# pylint: disable=bare-except
"""
Data manager. Holds information about the current data location
and manages the data cache.
"""
import copy
import glob
import logging
import os
import sys
import time
from typing import Dict, List, Optional
import numpy as np
from quicknxs.interfaces.data_handling import data_manipulation, gisans, quicknxs_io
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
from quicknxs.interfaces.data_handling.filepath import FilePath, RunNumbers
[docs]
class DataManager(object):
MAX_CACHE = 50 # maximum number of loaded datasets (either single-file or merged-files types)
MAIN_REDUCTION_LIST_INDEX = 1
def __init__(self, current_directory: str):
self.current_directory = current_directory
# current file name is used for file list table to set the current item
self.current_file_name: str = None
# Current data set
self._nexus_data: NexusData = None
self.active_channel: Optional[CrossSectionData] = None
# Cache of loaded data: list of NexusData instances
self._cache: List[NexusData] = list()
# Current data tab (ROI)
self.active_reduction_list_index: int = 1
# Main data structure holding the reduction list for each ROI/peak
# key: reduction list index, corresponds to the reduction table tab in the UI
# value: list of NexusData
self.peak_reduction_lists: Dict[int, List[NexusData]] = {self.active_reduction_list_index: []}
self.direct_beam_list: List[NexusData] = []
# List of cross-sections common to all reduced data sets
self.reduction_states: List[str] = []
self.final_merged_reflectivity = {}
# Cached outputs
self.cached_offspec = None
self.cached_gisans = None
@property
def data_sets(self):
"""Reduced cross sections
Returns
-------
dict
dictionary of cross sections
"""
if self._nexus_data is None:
return None
return self._nexus_data.cross_sections
@property
def current_file(self):
if self._nexus_data is None:
return None
return self._nexus_data.file_path
@property
def reduction_list(self):
"""
Returns the reduction list for the active data tab
Returns
-------
list[NexusData]
The reduction list
"""
return self.peak_reduction_lists[self.active_reduction_list_index]
@reduction_list.setter
def reduction_list(self, value):
"""
Sets the reduction list for the active data tab
Parameters
----------
value: list[NexusData]
The reduction list
"""
self.peak_reduction_lists[self.active_reduction_list_index] = value
@property
def main_reduction_list(self):
"""
Returns the reduction list for the first (mandatory) data tab
Returns
-------
list[NexusData]
The reduction list
"""
return self.peak_reduction_lists[self.MAIN_REDUCTION_LIST_INDEX]
[docs]
def get_cachesize(self):
return len(self._cache)
[docs]
def clear_cache(self):
self._cache = []
[docs]
def clear_cached_unused_data(self):
"""
Delete cached files that are not in the reduction list or direct beam list
"""
def is_used_in_reduction(f: NexusData):
return (self.find_data_in_reduction_list(f) is not None) or (
self.find_data_in_direct_beam_list(f) is not None
)
self._cache[:] = [file for file in self._cache if is_used_in_reduction(file)]
[docs]
def set_active_data_from_reduction_list(self, index):
"""
Set a data set in the reduction list as the active
data set according to its index.
:param int index: index in the reduction list
"""
if index < len(self.reduction_list):
self._nexus_data = self.reduction_list[index]
self.set_channel(0)
[docs]
def set_active_data_from_direct_beam_list(self, index):
"""
Set a data set in the direct beam list as the active
data set according to its index.
:param int index: index in the direct beam list
"""
if index < len(self.direct_beam_list):
self._nexus_data = self.direct_beam_list[index]
self.set_channel(0)
[docs]
def set_channel(self, index):
"""Set the current channel to the specified index, or zero
if it doesn't exist.
Parameters
----------
index: int
channel index
Returns
-------
bool
"""
if self.data_sets is None:
return False
channels = list(self.data_sets.keys())
if index < len(channels):
# channel index is allowed
self.active_channel = self.data_sets[channels[index]]
return True
elif len(channels) == 0:
# no channel
logging.error("Could not set active channel: no data available")
else:
# default
self.active_channel = self.data_sets[channels[0]]
return False
[docs]
def is_active(self, data_set):
"""
Returns True of the given data set is the active data set.
:param NexusData: data set object
"""
return data_set == self._nexus_data
[docs]
def is_nexus_data_compatible(self, nexus_data: NexusData, reduction_list: list):
"""
Determine if the data set is compatible with the data sets in the reduction list.
A data set is compatible if the polarization cross-section states matches those of the
first run in the reduction list, both the same number of states and the same states.
Parameters
----------
nexus_data: NexusData
The data set to check if compatible with reduction list
reduction_list: list[NexusData]
The reduction list
Returns
-------
bool
"""
# If we are starting a new reduction list, just proceed
if not reduction_list:
return True
nexus_data_states = list(nexus_data.cross_sections.keys())
reduction_list_states = list(reduction_list[0].cross_sections.keys())
# First, check that we have the same number of states
if not len(reduction_list_states) == len(nexus_data_states):
logging.error(
f"Nexus data cross-sections ({reduction_list_states}) different than those of the"
f" reduction list ({nexus_data_states})"
)
return False
# Second, make sure the states match
for cross_section_state in nexus_data_states:
if cross_section_state not in reduction_list_states:
logging.error(
f"Nexus data cross-section {cross_section_state} not found in those of the reduction list"
)
return False
return True
[docs]
def find_run_number_in_reduction_list(self, run_number: int, reduction_list: list[NexusData]):
"""
Look for the given run number in the reduction list.
Parameters
----------
run_number: int
Run number to look for
reduction_list: list[NexusData]
The reduction list to search
Returns
-------
int | None
The index in the reduction list or None
"""
for i, nexus_data in enumerate(reduction_list):
if nexus_data.number == run_number:
return i
return None
[docs]
def find_data_in_reduction_list(self, nexus_data):
"""
Look for the given data in the reduction list.
Return the index within the reduction list or none.
:param NexusData: data set object
"""
for i in range(len(self.reduction_list)):
if nexus_data == self.reduction_list[i]:
return i
return None
[docs]
def find_data_in_direct_beam_list(self, nexus_data):
"""
Look for the given data in the direct beam list.
Return the index within the direct beam list or none.
:param NexusData: data set object
"""
for i in range(len(self.direct_beam_list)):
if nexus_data == self.direct_beam_list[i]:
return i
return None
[docs]
def find_active_data_id(self):
"""
Look for the active data in the reduction list.
Return the index within the reduction list or none.
"""
return self.find_data_in_reduction_list(self._nexus_data)
[docs]
def find_active_direct_beam_id(self):
"""
Look for the active data in the direct beam list.
Return the index within the direct beam list or none.
"""
return self.find_data_in_direct_beam_list(self._nexus_data)
[docs]
def add_active_to_reduction(self, peak_index=MAIN_REDUCTION_LIST_INDEX):
r"""
Add active data set to reduction list
New data sets are always added to the main reduction list. Data sets are added to secondary
reduction lists by initializing from the main reduction list (button to add new data tab)
or by propagating individual data sets to other tabs (right-click menu).
Parameters
----------
peak_index: int
The index of the peak in peak_reduction_lists
Returns
-------
bool
True if the active data set was added to the reduction list, False if it was not added
"""
reduct_list = self.peak_reduction_lists[peak_index]
if self._nexus_data not in reduct_list:
if self.is_nexus_data_compatible(self._nexus_data, reduct_list):
if len(reduct_list) == 0:
self.reduction_states = list(self.data_sets.keys())
is_inserted = False
q_min, _ = self._nexus_data.get_q_range()
if q_min is None:
logging.error("Could not get q range information")
return False
for i in range(len(reduct_list)):
_q_min, _ = reduct_list[i].get_q_range()
if q_min <= _q_min:
reduct_list.insert(i, self._nexus_data)
is_inserted = True
break
if not is_inserted:
reduct_list.append(self._nexus_data)
return True
else:
logging.error("The data you are trying to add has different cross-sections")
return False
[docs]
def copy_nexus_data_to_reduction(self, nexus_data_to_copy: NexusData, peak_index: int):
r"""
Add data set to the reduction list specified by `peak_index`
Parameters
----------
nexus_data_to_copy: NexusData
Data set to copy
peak_index: int
Peak (reduction list) to copy data set to
Returns
-------
bool
True if the data set was added successfully, otherwise False
"""
reduction_list = self.peak_reduction_lists[peak_index]
# check if run already exists in this reduction list
if any(run_data.number == nexus_data_to_copy.number for run_data in reduction_list):
return False
nexus_data = copy.deepcopy(nexus_data_to_copy)
if self.is_nexus_data_compatible(nexus_data, reduction_list):
is_inserted = False
q_min, _ = nexus_data.get_q_range()
for i in range(len(reduction_list)):
_q_min, _ = reduction_list[i].get_q_range()
if q_min <= _q_min:
reduction_list.insert(i, nexus_data)
is_inserted = True
break
if not is_inserted:
reduction_list.append(nexus_data)
return True
else:
logging.error("The data you are trying to add has different cross-sections")
return False
[docs]
def add_active_to_normalization(self):
"""
Add active data set to the direct beam list
"""
if self._nexus_data not in self.direct_beam_list:
self.direct_beam_list.append(self._nexus_data)
return True
return False
[docs]
def remove_active_from_normalization(self):
"""
Remove the active data set from the direct beam list
"""
for i in range(len(self.direct_beam_list)):
if self.direct_beam_list[i] == self._nexus_data:
self.direct_beam_list.pop(i)
return i
return -1
[docs]
def remove_from_active_reduction_list(self, index: int):
"""
Remove item from the active reduction list
Parameters
----------
index: int
Index of the item to remove
"""
self.reduction_list.pop(index)
[docs]
def clear_direct_beam_list(self):
"""
Remove all items from the direct beam list, and make
sure to remove links to those items in the scattering data sets.
TODO: remove links from scattering data sets.
"""
self.direct_beam_list = []
def _loading_progress(self, call_back, start_value, stop_value, value, message=None):
_value = start_value + (stop_value - start_value) * value
call_back(_value, message)
[docs]
def load(self, file_path, configuration, force=False, update_parameters=True, progress=None):
# type: (str, Configuration, Optional[bool], Optional[bool], Optional[ProgressReporter]) -> bool
r"""
@brief Load one ore more Nexus data files
@param file_path: absolute path to one or more files. If more than one, files are concatenated with the
merge symbol '+'.
@param configuration: configuration to use to load the data
@param force: it True, existing data in the cache will be replaced by reading from file.
@param update_parameters: if True, we will find peak ranges
@param progress: aggregator to estimate percent of time allotted to this function
@returns True if the data is retrieved from the cache of past loading events
"""
# Actions taken in this function:
# 1. Find if the file has been loaded in the past. Retrieve the cache when force==False
# 2. If file not in cache, or if force==True: invoke NexusData.load()
# 3. Update attributes _nexus_data, current_directory, and current_file_name
# 4. If we're overwriting cached data that was allocated in the reduction_list and direct_beam_list,
# then assign the new data to the proper indexes in lists reduction_list and direct_beam_list
# 5. Compute reflectivity if data is loaded from file
nexus_data = None # type: NexusData
is_from_cache = False # if True, the file has been loaded before
reduction_list_id = None
direct_beam_list_id = None
file_path = FilePath(file_path, sort=True).path # force sorting by increasing run number
if progress is not None:
progress(10, "Loading data...")
# Check whether the file has already been loaded (in cache)
for i in range(len(self._cache)):
if self._cache[i].file_path == file_path:
if force:
# Check whether the data is in the reduction list before removing it
reduction_list_id = self.find_data_in_reduction_list(self._cache[i])
direct_beam_list_id = self.find_data_in_direct_beam_list(self._cache[i])
self._cache.pop(i)
else:
nexus_data = self._cache[i]
is_from_cache = True
break
# If we don't have the data, load it
if nexus_data is None:
nexus_data = NexusData(file_path, configuration)
sub_task = progress.create_sub_task(max_value=70) if progress else None
nexus_data.load(progress=sub_task, update_parameters=update_parameters)
if progress is not None:
progress(80, "Calculating...")
if nexus_data is not None:
self._nexus_data = nexus_data
# Example: '/SNS/REF_M/IPTS-25531/nexus/REF_M_38198.nxs.h5+/SNS/REF_M/IPTS-25531/nexus/REF_M_38199.nxs.h5'
# will be split into directory='/SNS/REF_M/IPTS-25531/nexus' and
# file_name='REF_M_38198.nxs.h5+REF_M_38199.nxs.h5'
directory, file_name = FilePath(file_path).split()
self.current_directory = directory
self.current_file_name = file_name
self.set_channel(0)
# If we didn't get this data set from our cache, add it and compute its reflectivity.
if not is_from_cache:
# Find suitable direct beam
if configuration.match_direct_beam:
self.find_best_direct_beam()
# Replace reduction and normalization entries as needed
if reduction_list_id is not None:
self.reduction_list[reduction_list_id] = nexus_data
if direct_beam_list_id is not None:
self.direct_beam_list[direct_beam_list_id] = nexus_data
# Compute reflectivity
if not nexus_data.is_direct_beam():
try:
self.calculate_reflectivity()
except Exception as e:
logging.error(f"Reflectivity calculation failed for {file_name}: {e}")
# if cached reduced data exceeds maximum cache size, remove the oldest reduced data
while len(self._cache) >= self.MAX_CACHE:
self._cache.pop(0)
self._cache.append(nexus_data)
if progress is not None:
progress(100)
return is_from_cache
[docs]
def update_configuration(self, configuration, active_only: bool = False, nexus_data: NexusData = None):
"""
Update configuration
"""
if active_only:
self.active_channel.update_configuration(configuration)
elif nexus_data is not None:
nexus_data.update_configuration(configuration)
else:
self._nexus_data.update_configuration(configuration)
[docs]
def get_active_direct_beam(self):
"""
Return the direct beam data object for the active data
"""
return self._find_direct_beam(self._nexus_data)
def _find_direct_beam(self, nexus_data):
"""
Determine whether we have a direct beam data set available
for a given reflectivity data set.
The object returned is a CrossSectionData object.
:param NexusData or CrossSectionData nexus_data: data set to find a direct beam for
"""
direct_beam = None
# Find the CrossSectionData object to work with
if isinstance(nexus_data, NexusData):
# Get the direct beam info from the configuration
# All the cross sections should have the same direct beam file.
data_keys = list(nexus_data.cross_sections.keys())
if len(data_keys) == 0:
logging.error("DataManager._find_direct_beam: no data available in NexusData object")
return
data_xs = nexus_data.cross_sections[data_keys[0]]
else:
data_xs = nexus_data
if data_xs.configuration is not None and data_xs.configuration.normalization is not None:
for item in self.direct_beam_list:
# convert _run_number to int if it can be
try:
_run_number = int(data_xs.configuration.normalization)
except (ValueError, TypeError):
_run_number = data_xs.configuration.normalization
# convert item.number to int if it can be
try:
item_number = int(item.number)
except (ValueError, TypeError):
item_number = item.number
if item_number == _run_number:
keys = list(item.cross_sections.keys())
if len(keys) >= 1:
if len(keys) > 1:
logging.error("More than one cross-section for the direct beam, using the first one")
direct_beam = item.cross_sections[keys[0]]
if direct_beam is None:
logging.error("The specified direct beam is not available: skipping")
return direct_beam
[docs]
def reduce_gisans(self, progress=None):
"""
Since the specular reflectivity is prominently displayed, it is updated as
soon as parameters change. This is not the case for GISANS, which is
computed on-demand.
This method goes through the data sets in the reduction list and re-calculate
the GISANS.
"""
if progress is not None:
progress(1, "Reducing GISANS...")
for i, nexus_data in enumerate(self.reduction_list):
try:
self.calculate_gisans(nexus_data=nexus_data, progress=None)
if progress is not None:
progress(100.0 / len(self.reduction_list) * (i + 1))
except:
logging.error("Could not compute GISANS for %s\n %s", nexus_data.number, sys.exc_info()[1])
if progress is not None:
progress(100)
[docs]
def calculate_gisans(self, nexus_data=None, progress=None):
"""
Compute GISANS for a single data set
"""
t_0 = time.time()
# Select the data to work on
if nexus_data is None:
nexus_data = self._nexus_data
# We must have a direct beam data set to normalize with
direct_beam = self._find_direct_beam(nexus_data)
if direct_beam is None:
# TODO 67 Handle this error with GUI prompt GUI
raise RuntimeError("Please select a direct beam data set for your data.")
nexus_data.calculate_gisans(direct_beam=direct_beam, progress=progress)
logging.info("Calculate GISANS: %s %s sec", nexus_data.number, (time.time() - t_0))
[docs]
def is_offspec_available(self):
"""
Verify that all data sets and all cross-sections have calculated
off-specular data available.
"""
for nexus_data in self.reduction_list:
if not nexus_data.is_offspec_available():
return False
return True
[docs]
def is_gisans_available(self, active_only=True):
"""
Verify that all data sets and all cross-sections have calculated
GISANS data available.
"""
if active_only:
return self._nexus_data.is_gisans_available()
for nexus_data in self.reduction_list:
if not nexus_data.is_gisans_available():
return False
return True
[docs]
def reduce_spec(self):
"""
Calculate reflectivity for all runs in all reduction lists
"""
for reduct_list in self.peak_reduction_lists.values():
for nexus_data in reduct_list:
try:
self.calculate_reflectivity(nexus_data=nexus_data)
except:
logging.error("Could not compute reflectivity for %s\n %s", nexus_data.number, sys.exc_info()[1])
[docs]
def reduce_offspec(self, progress=None):
"""
Since the specular reflectivity is prominently displayed, it is updated as
soon as parameters change. This is not the case for the off-specular, which is
computed on-demand.
This method goes through the data sets in the reduction list and re-calculate
the off-specular reflectivity.
"""
for nexus_data in self.reduction_list:
try:
self.calculate_reflectivity(nexus_data=nexus_data, specular=False)
except:
logging.error("Could not compute reflectivity for %s\n %s", nexus_data.number, sys.exc_info()[1])
[docs]
def rebin_gisans(self, pol_state, wl_min=0, wl_max=100, qy_npts=50, qz_npts=50, use_pf=False):
"""
Merge all the off-specular reflectivity data and rebin.
"""
return gisans.rebin_extract(
self.reduction_list,
pol_state=pol_state,
wl_min=wl_min,
wl_max=wl_max,
qy_npts=qy_npts,
qz_npts=qz_npts,
use_pf=use_pf,
)
# TODO 67 FInd out whether it can work with merged data
[docs]
def calculate_reflectivity(self, configuration=None, active_only=False, nexus_data=None, specular=True):
"""
Calculate reflectivity using the current configuration
"""
# Select the data to work on
if nexus_data is None:
nexus_data = self._nexus_data
# Try to find the direct beam in the list of direct beam data sets
direct_beam = self._find_direct_beam(nexus_data)
if not specular:
nexus_data.calculate_offspec(direct_beam=direct_beam)
elif active_only:
self.active_channel.reflectivity(direct_beam=direct_beam, configuration=configuration)
else:
nexus_data.calculate_reflectivity(
direct_beam=direct_beam, configuration=configuration, ws_suffix=str(self.active_reduction_list_index)
)
[docs]
def find_best_direct_beam(self):
"""
Find the best direct beam in the direct beam list for the active data
Returns a run number.
Returns True if we have updated the data with a new normalization run.
"""
# TODO 65+ Can it work with merged data?
# Select the first run number if the active channel cross section is derived from more than one run
active_channel_number = RunNumbers(self.active_channel.number).numbers[0]
closest = None
for item in self.direct_beam_list:
item_number = int(item.number)
xs_keys = list(item.cross_sections.keys())
if len(xs_keys) > 0:
channel = item.cross_sections[list(item.cross_sections.keys())[0]]
if self.active_channel.configuration.instrument.direct_beam_match(self.active_channel, channel):
if closest is None:
closest = item_number
elif abs(item_number - active_channel_number) < abs(closest - active_channel_number):
closest = item_number
if closest is None:
# If we didn't find a direct beam, try with just the wavelength
for item in self.direct_beam_list:
xs_keys = list(item.cross_sections.keys())
if len(xs_keys) > 0:
channel = item.cross_sections[list(item.cross_sections.keys())[0]]
if self.active_channel.configuration.instrument.direct_beam_match(
self.active_channel, channel, skip_slits=True
):
if closest is None:
closest = item_number
elif abs(item_number - active_channel_number) < abs(closest - active_channel_number):
closest = item_number
if closest is not None:
return self._nexus_data.set_parameter("normalization", closest)
return False
[docs]
def get_trim_values(self):
"""
Cut the start and end of the active data set to 5% of its
maximum intensity.
"""
if (
self.active_channel is not None
and self.active_channel.q is not None
and self.active_channel.configuration.normalization is not None
):
direct_beam = self._find_direct_beam(self.active_channel)
if direct_beam is None:
logging.error("The specified direct beam is not available: skipping")
return
region = np.where(direct_beam.r >= (direct_beam.r.max() * 0.05))[0]
p_0 = region[0]
p_n = len(direct_beam.r) - region[-1] - 1
self._nexus_data.set_parameter("cut_first_n_points", p_0)
self._nexus_data.set_parameter("cut_last_n_points", p_n)
return [p_0, p_n]
return
[docs]
def strip_overlap(self):
"""
Remove overlapping points in the reflecitviy, cutting always from the lower Qz
measurements.
"""
if len(self.reduction_list) < 2:
logging.error("You need to have at least two datasets in the reduction table")
return
xs = self.active_channel.name
for idx, item in enumerate(self.reduction_list[:-1]):
next_item = self.reduction_list[idx + 1]
end_idx = next_item.cross_sections[xs].configuration.cut_first_n_points
overlap_idx = np.where(item.cross_sections[xs].q >= next_item.cross_sections[xs].q[end_idx])
logging.error(overlap_idx[0])
if len(overlap_idx[0]) > 0:
n_points = len(item.cross_sections[xs].q) - overlap_idx[0][0]
item.set_parameter("cut_last_n_points", n_points)
[docs]
def stitch_data_sets(
self, normalize_to_unity=True, q_cutoff=0.01, global_stitching=False, poly_degree=None, poly_points=3
):
"""
Determine scaling factors for each data set
:param bool normalize_to_unity: If True, the reflectivity plateau will be normalized to 1.
:param float q_cutoff: critical q-value below which we expect R=1
:param bool global_stitching: If True, use data from all cross-sections to calculate scaling factors
:param int poly_degree: if not None, find the scaling factor by simultaneously fitting a polynomial and scaling factor to the curves
:param int poly_points: number of additional points on each end of the overlap region to include in the fit
"""
data_manipulation.smart_stitch_reflectivity(
self.reduction_list,
self.active_channel.name,
normalize_to_unity,
q_cutoff,
global_stitching,
poly_degree,
poly_points,
)
[docs]
def merge_data_sets(self, asymmetry=True):
self.final_merged_reflectivity = {}
for pol_state in self.reduction_states:
# The scaling factors should have been determined at this point. Just use them
# to merge the different runs in a set.
merged_ws = data_manipulation.merge_reflectivity(
self.reduction_list, xs=pol_state, q_min=0.001, q_step=-0.01
)
self.final_merged_reflectivity[pol_state] = merged_ws
# Compute asymmetry
if asymmetry:
self.asymmetry()
[docs]
def determine_asymmetry_states(self):
"""
Determine which cross-section to use to compute asymmetry.
"""
# Inspect cross-section
# - For two states, just calculate the asymmetry using those two
p_state = None
m_state = None
if len(self.reduction_states) == 2:
if self.reduction_states[0].lower() in ["off_off", "off-off"]:
p_state = self.reduction_states[0]
m_state = self.reduction_states[1]
else:
p_state = self.reduction_states[1]
m_state = self.reduction_states[0]
else:
_p_state_data = None
_m_state_data = None
for item in self.reduction_states:
if item.lower() in ["off_off", "off-off"]:
_p_state_data = item
if item.lower() in ["on_on", "on-on"]:
_m_state_data = item
if _p_state_data is None or _m_state_data is None:
_p_state_data = None
_m_state_data = None
for item in self.reduction_states:
if self.data_sets[item].cross_section_label == "++":
_p_state_data = item
if self.data_sets[item].cross_section_label == "--":
_m_state_data = item
if _p_state_data is None or _m_state_data is None:
p_state = None
m_state = None
# - If we haven't made sense of it yet, take the first and last cross-sections
if p_state is None and m_state is None and len(self.reduction_states) >= 2:
p_state = self.reduction_states[0]
m_state = self.reduction_states[-1]
return p_state, m_state
[docs]
def asymmetry(self):
"""
Determine which cross-section to use to compute asymmetry, and compute it.
"""
p_state, m_state = self.determine_asymmetry_states()
# Get the list of workspaces
if p_state in self.final_merged_reflectivity and m_state in self.final_merged_reflectivity:
p_ws = self.final_merged_reflectivity[p_state]
m_ws = self.final_merged_reflectivity[m_state]
ratio_ws = (p_ws - m_ws) / (p_ws + m_ws)
self.final_merged_reflectivity["SA"] = ratio_ws
[docs]
def load_data_from_reduced_file(self, file_path, configuration=None, progress=None):
"""
Load the information from a reduced file, the load the data.
Ask the main event handler to update the UI once we are done.
:param str file_path: reduced file to load
:param Configuration configuration: configuration to base the loaded data on
:param ProgressReporter progress: progress reporter
"""
t_0 = time.time()
db_files, data_files, additional_peaks, has_scaling_error = quicknxs_io.read_reduced_file(
file_path, configuration
)
logging.info("Reduced file loaded: %s sec", time.time() - t_0)
n_total = len(db_files) + len(data_files)
if progress and n_total > 0:
progress.set_value(1, message="Loaded %s" % os.path.basename(file_path), out_of=n_total)
self.load_direct_beam_and_data_files(db_files, data_files, additional_peaks, configuration, progress, t_0)
if progress and not has_scaling_error:
progress.set_value(
1,
"NOTE: Initial error bars may be inaccurate - please run stitching to update scaling factor errors.",
1,
)
logging.info("DONE: %s sec", time.time() - t_0)
[docs]
def load_direct_beam_and_data_files(
self, db_files, data_files, additional_peaks=None, configuration=None, progress=None, force=False, t_0=None
):
"""
Load direct beam and data files and add them to the direct beam list and reduction
list, respectively
Parameters
----------
db_files: list
List of (run_number, run_file, conf) for direct beam files
data_files: list
List of (run_number, run_file, conf) for data files
additional_peaks: list | None
List of (peak_index, run_number, run_file, conf) for data files for additional peaks
configuration: Configuration
Configuration to base the loaded data on
progress: ProgressReporter
Progress reporter
force: bool
If True, ignore cache and force reloading from file
t_0: float
Start time for logging data loading time
"""
if not t_0:
t_0 = time.time()
n_loaded = 0
n_total = len(db_files) + len(data_files)
for r_id, run_file, conf in db_files:
t_i = time.time()
if os.path.isfile(run_file):
is_from_cache = self.load(run_file, conf, force=force, update_parameters=False)
if is_from_cache:
configuration.normalization = None
self._nexus_data.update_configuration(conf)
self.add_active_to_normalization()
logging.info("%s loaded: %s sec [%s]", r_id, time.time() - t_i, time.time() - t_0)
if progress:
progress.set_value(n_loaded, message="%s loaded" % os.path.basename(run_file), out_of=n_total)
else:
logging.error("File does not exist: %s", run_file)
if progress:
progress.set_value(n_loaded, message="ERROR: %s does not exist" % run_file, out_of=n_total)
n_loaded += 1
for r_id, run_file, conf in data_files:
t_i = time.time()
do_files_exist = []
for name in run_file.split("+"):
do_files_exist.append((os.path.isfile(name)))
if all(do_files_exist):
is_from_cache = self.load(run_file, conf, force=force, update_parameters=False)
if is_from_cache:
configuration.normalization = None
self._nexus_data.update_configuration(conf)
self.calculate_reflectivity()
if self.add_active_to_reduction():
logging.info("%s loaded: %s sec [%s]", r_id, time.time() - t_i, time.time() - t_0)
else:
logging.error("Could not load %s", r_id)
if progress:
progress.set_value(n_loaded, message="%s loaded" % os.path.basename(run_file), out_of=n_total)
else:
logging.error("File does not exist: %s", run_file)
if progress:
progress.set_value(n_loaded, message="ERROR: %s does not exist" % run_file, out_of=n_total)
n_loaded += 1
if progress:
progress.set_value(n_total, message="Done", out_of=n_total)
# Initialize any additional peak reduction lists by copying the data from the main reduction list
if additional_peaks:
for peak_index, r_id, run_file, conf in additional_peaks:
if peak_index not in self.peak_reduction_lists:
self.peak_reduction_lists[peak_index] = []
self.set_active_reduction_list_index(peak_index)
# find run in main reduction list and make a copy TODO: what if it is missing?
run_index = [i for i, data in enumerate(self.main_reduction_list) if data.number == str(r_id)][0]
self._nexus_data = copy.deepcopy(self.main_reduction_list[run_index])
configuration.normalization = None
self.update_configuration(conf)
self.calculate_reflectivity()
self.add_active_to_reduction(peak_index)
@property
def current_event_files(self):
# type: () -> List[str]
r"""
@brief Sorted list of event files in the current directory
@details return only file names with pattern '*event.nxs' or '*.nxs.h5'
"""
event_file_list = glob.glob(os.path.join(self.current_directory, "*event.nxs"))
h5_file_list = glob.glob(os.path.join(self.current_directory, "*.nxs.h5"))
event_file_list.extend(h5_file_list)
return sorted([os.path.basename(name) for name in event_file_list])
[docs]
def reload_files(self, configuration=None, progress=None):
"""
Force reload of files in the reduction lists and direct beam list
"""
def _get_nexus_conf(nexus_data):
"""Returns the configuration for the main cross-section of the run"""
return nexus_data.cross_sections[nexus_data.main_cross_section].configuration
# Get files to reload
db_files = [(nexus.number, nexus.file_path, _get_nexus_conf(nexus)) for nexus in self.direct_beam_list]
data_files = []
additional_peaks = []
for ipeak, reduction_list in self.peak_reduction_lists.items():
if ipeak == self.MAIN_REDUCTION_LIST_INDEX:
data_files = [(nexus.number, nexus.file_path, _get_nexus_conf(nexus)) for nexus in reduction_list]
else:
additional_peaks = [
(ipeak, nexus.number, nexus.file_path, _get_nexus_conf(nexus)) for nexus in reduction_list
]
# Clear the lists
self.direct_beam_list.clear()
[reduct_list.clear() for reduct_list in self.peak_reduction_lists.values()]
# Reload files and add to reduction and direct beam lists
self.load_direct_beam_and_data_files(db_files, data_files, additional_peaks, configuration, progress, True)
[docs]
def add_additional_reduction_list(self, tab_index: int):
"""
Add reduction list for an additional ROI/peak
Parameters
----------
tab_index: int
Index of the peak in `self.peak_reduction_lists`
"""
if self.main_reduction_list is not None and tab_index not in self.peak_reduction_lists:
self.peak_reduction_lists[tab_index] = copy.deepcopy(self.main_reduction_list)
[docs]
def remove_additional_reduction_list(self, tab_index: int):
"""
Remove reduction list for additional ROI/peak
Parameters
----------
tab_index: int
Index of the peak in `self.peak_reduction_lists`
"""
if tab_index in self.peak_reduction_lists:
self.peak_reduction_lists.pop(tab_index)
[docs]
def set_active_reduction_list_index(self, tab_index: int):
"""
Set the active reduction list index
Parameters
----------
tab_index: int
Index of the peak in `self.peak_reduction_lists`
"""
self.active_reduction_list_index = tab_index
[docs]
def update_active_reduction_list(self, tab_index: int):
"""
Updates the active reduction list and run
Parameters
----------
tab_index: int
Index of the peak in `self.peak_reduction_lists`
"""
active_data_idx = self.find_active_data_id()
self.set_active_reduction_list_index(tab_index)
# keep same selected run if available in the other reduction table
if active_data_idx and active_data_idx < len(self.reduction_list):
self.set_active_data_from_reduction_list(active_data_idx)
else:
self.set_active_data_from_reduction_list(0)
[docs]
def clear_reduction_lists(self):
"""
Resets to one empty reduction list
"""
self.active_reduction_list_index = 1
self.peak_reduction_lists = {self.active_reduction_list_index: []}