# pylint: disable=bare-except
"""Data manager. Holds information about the current data location and manages the data cache."""
import copy
import glob
import logging
import os
import sys
import time
from typing import Callable, Dict, List, Optional, Union
import numpy as np
from quicknxs.interfaces.configuration import Configuration
from quicknxs.interfaces.data_handling import data_manipulation, gisans, quicknxs_io
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
from quicknxs.interfaces.data_handling.filepath import FilePath
from quicknxs.interfaces.event_handlers.progress_reporter import ProgressReporter
[docs]
class DataManager(object):
"""Holds information about the current data location and manages the data cache.
Attributes
----------
current_directory
Current directory
current_file_name
Current file name, used for file list table to set the current item
_nexus_data
Current data set
active_cross_section
Currently active CrossSectionData
_cache
Cache of loaded data
active_reduction_list_index
Index of current data (ROI) tab
peak_reduction_lists
Dictionary of reduction lists
key: reduction list index, corresponds to the reduction table tab in the UI
direct_beam_list
List of direct beam data sets
reduction_states
List of cross-sections common to all reduced data sets
final_merged_reflectivity
Merged reflectivity data
cached_offspec
Cached off-specular data
cached_gisans
Cached GISANS data
"""
MAX_CACHE = 50 # maximum number of loaded datasets (either single-file or merged-files types)
MAIN_REDUCTION_LIST_INDEX = 1
def __init__(self, current_directory: str):
self.current_directory: str = current_directory
self.current_file_name: Optional[str] = None
self._nexus_data: Optional[NexusData] = None
self.active_cross_section: Optional[CrossSectionData] = None
self.active_reduction_list_index: int = 1
# Main data structure holding the reduction list for each ROI/peak
# key: reduction list index, corresponds to the reduction table tab in the UI
# value: list of NexusData
self.peak_reduction_lists: Dict[int, List[NexusData]] = {self.active_reduction_list_index: []}
self.direct_beam_list: List[NexusData] = []
# Track the last selected row index for each reduction table tab and the direct beam tab
# This allows us to maintain the selection when switching between tabs
self.last_selected_reduction_row: Dict[int, int] = {} # key: tab index, value: row index
self.last_selected_direct_beam_row: int = 0
# List of cross-sections common to all reduced data sets
self.reduction_states: List[str] = []
self.final_merged_reflectivity = {}
# TODO: cache could be improved to be a dict or named tuple with file_path as key (Glass)
self._cache: List[NexusData] = list()
self.cached_offspec: Optional[dict] = None
self.cached_gisans: Optional[dict] = None
@property
def data_sets(self):
"""dict: Dict of reduced cross sections."""
if self._nexus_data is None:
return None
return self._nexus_data.cross_sections
@property
def current_file(self):
if self._nexus_data is None:
return None
return self._nexus_data.file_path
@property
def reduction_list(self) -> list[NexusData]:
"""list[NexusData]: Reduction list for the active data tab."""
return self.peak_reduction_lists[self.active_reduction_list_index]
@reduction_list.setter
def reduction_list(self, value):
self.peak_reduction_lists[self.active_reduction_list_index] = value
@property
def main_reduction_list(self) -> list[NexusData]:
"""Reduction list for the first (mandatory) data tab."""
return self.peak_reduction_lists[self.MAIN_REDUCTION_LIST_INDEX]
[docs]
def get_cachesize(self):
return len(self._cache)
[docs]
def clear_cache(self):
self._cache = []
[docs]
def clear_cached_unused_data(self):
"""Delete cached files that are not in the reduction list or direct beam list."""
def is_used_in_reduction(f: NexusData):
return (self.find_data_in_reduction_list(f) is not None) or (
self.find_data_in_direct_beam_list(f) is not None
)
self._cache[:] = [file for file in self._cache if is_used_in_reduction(file)]
[docs]
def set_active_data_from_reduction_list(self, index):
"""Set a data set in the reduction list as the active data set according to its index.
Args:
index (int): index in the reduction list
"""
if index < len(self.reduction_list):
self._nexus_data = self.reduction_list[index]
self.set_active_cross_section(0)
# Track the last selected row for this reduction table
self.last_selected_reduction_row[self.active_reduction_list_index] = index
[docs]
def set_active_data_from_direct_beam_list(self, index: int):
"""Set a data set in the direct beam list as the active data set according to its index.
Args:
index : index in the direct beam list
"""
if index < len(self.direct_beam_list):
self._nexus_data = self.direct_beam_list[index]
self.set_active_cross_section(0)
# Track the last selected row for the direct beam table
self.last_selected_direct_beam_row = index
[docs]
def set_active_cross_section(self, index: int) -> bool:
"""Set the current cross section to the specified index, or zero if it doesn't exist."""
if self.data_sets is None:
return False
cross_sections = list(self.data_sets.keys())
if index < len(cross_sections):
# cross section index is allowed
self.active_cross_section = self.data_sets[cross_sections[index]]
return True
elif len(cross_sections) == 0:
# no cross sections
logging.error("Could not set active cross section: no data available")
else:
# default
self.active_cross_section = self.data_sets[cross_sections[0]]
return False
[docs]
def is_active(self, data_set: NexusData):
"""Check if the given data set is the active data set."""
return data_set == self._nexus_data
[docs]
def is_nexus_data_compatible(self, nexus_data: NexusData, reduction_list: List[NexusData]) -> bool:
"""Determine if the data set is compatible with the data sets in the reduction list.
A data set is compatible if the polarization cross-section states matches those of the
first run in the reduction list, both the same number of states and the same states.
Parameters
----------
nexus_data : NexusData
The data set to check if compatible with reduction list
reduction_list : list[NexusData]
The reduction list
Returns
-------
bool
True if the data set is compatible with the reduction list, False otherwise.
"""
# If we are starting a new reduction list, just proceed
if not reduction_list:
return True
nexus_data_states = list(nexus_data.cross_sections.keys())
reduction_list_states = list(reduction_list[0].cross_sections.keys())
# First, check that we have the same number of states
if not len(reduction_list_states) == len(nexus_data_states):
logging.error(
f"Nexus data cross-sections ({reduction_list_states}) different than those of the"
f" reduction list ({nexus_data_states})"
)
return False
# Second, make sure the states match
for cross_section_state in nexus_data_states:
if cross_section_state not in reduction_list_states:
logging.error(
f"Nexus data cross-section {cross_section_state} not found in those of the reduction list"
)
return False
return True
[docs]
def find_run_number_in_reduction_list(self, run_number: int, reduction_list: list[NexusData]):
"""Look for the given run number in the reduction list.
Returns
-------
int | None
The index in the reduction list or None
"""
for i, nexus_data in enumerate(reduction_list):
if nexus_data.number == run_number:
return i
return None
[docs]
def find_data_in_reduction_list(self, nexus_data):
"""Look for the given data in the reduction list.
Returns
-------
int | None:
The index within the reduction list, or none.
"""
for i in range(len(self.reduction_list)):
if nexus_data == self.reduction_list[i]:
return i
return None
[docs]
def find_data_in_direct_beam_list(self, nexus_data: NexusData | None) -> Optional[int]:
"""Look for the given data in the direct beam list.
Returns
-------
int | None:
The index within the direct beam list, or none.
"""
if nexus_data is None:
return None
for i in range(len(self.direct_beam_list)):
if nexus_data == self.direct_beam_list[i]:
return i
return None
[docs]
def find_run_number_in_direct_beam_list(self, nexus_data: NexusData | None) -> Optional[int]:
"""Look for data with the same run number in the direct beam list.
This method compares by run number rather than object identity, which is useful
for detecting duplicates when deepcopied objects are involved.
Parameters
----------
nexus_data : NexusData | None
The data to search for by run number.
Returns
-------
int | None:
The index within the direct beam list, or None if not found.
"""
if nexus_data is None:
return None
for i in range(len(self.direct_beam_list)):
if nexus_data.number == self.direct_beam_list[i].number:
return i
return None
[docs]
def find_active_data_id(self) -> Optional[int]:
"""Look for the active data in the reduction list.
Returns
-------
int | None:
The index within the reduction list or none.
"""
return self.find_data_in_reduction_list(self._nexus_data)
[docs]
def find_active_direct_beam_id(self) -> Optional[int]:
"""Look for the active data in the direct beam list.
Returns
-------
int | None:
The index within the direct beam list or none.
"""
return self.find_data_in_direct_beam_list(self._nexus_data)
def _insert_into_reduction_list_by_q(self, nexus_data: NexusData, reduction_list: List[NexusData]) -> bool:
"""Insert NexusData into reduction list in ascending Q order.
Parameters
----------
nexus_data:
Data to insert
reduction_list:
Target reduction list
Returns
-------
bool
True if successfully inserted, False if Q range is unavailable
"""
q_min, _ = nexus_data.get_q_range()
if q_min is None:
logging.error("Could not get q range information")
return False
# Find insertion point to maintain ascending Q order
is_inserted = False
for i in range(len(reduction_list)):
_q_min, _ = reduction_list[i].get_q_range()
if q_min <= _q_min:
reduction_list.insert(i, nexus_data)
is_inserted = True
break
if not is_inserted:
reduction_list.append(nexus_data)
return True
[docs]
def add_active_to_reduction(self, peak_index=MAIN_REDUCTION_LIST_INDEX) -> bool:
"""Add active data set to reduction list.
New data sets are always added to the main reduction list. Data sets are added to secondary
reduction lists by initializing from the main reduction list (button to add new data tab)
or by propagating individual data sets to other tabs (right-click menu).
Parameters
----------
peak_index: int
The index of the peak in peak_reduction_lists
Returns
-------
bool
True if the active data set was added to the reduction list, False if it was not added
"""
reduction_list = self.peak_reduction_lists[peak_index]
if self._nexus_data not in reduction_list:
if self.is_nexus_data_compatible(self._nexus_data, reduction_list):
if len(reduction_list) == 0:
self.reduction_states = list(self.data_sets.keys())
return self._insert_into_reduction_list_by_q(self._nexus_data, reduction_list)
else:
logging.error("The data you are trying to add has different cross-sections")
return False
[docs]
def copy_nexus_data_to_reduction(self, nexus_data_to_copy: NexusData, peak_index: int):
"""Add data set to the reduction list specified by `peak_index`.
Parameters
----------
nexus_data_to_copy:
Data set to copy
peak_index:
reduction list to copy data set to
Returns
-------
bool
True if the data set was added successfully, otherwise False
"""
reduction_list = self.peak_reduction_lists[peak_index]
# check if run already exists in this reduction list
if any(run_data.number == nexus_data_to_copy.number for run_data in reduction_list):
return False
nexus_data = copy.deepcopy(nexus_data_to_copy)
if self.is_nexus_data_compatible(nexus_data, reduction_list):
return self._insert_into_reduction_list_by_q(nexus_data, reduction_list)
else:
logging.error("The data you are trying to add has different cross-sections")
return False
[docs]
def add_active_to_direct_beam_list(self):
"""Add active data set to the direct beam list.
This method allows adding any run to the direct beam list, even if it wasn't originally
acquired as a direct beam (i.e., when the PV data_type != 1). This is useful for
calibration and other runs started with "Start RUN" command in EPICS, which don't add
the "Direct Beam" PV-tag.
Returns
-------
int
2 if the run was added and is a true direct beam (data_type == 1)
1 if the run was added but is NOT a true direct beam (data_type != 1)
0 if the run was not added (already in the list)
"""
# Check if already in list by run number (since we may have deepcopied objects)
if self.find_run_number_in_direct_beam_list(self._nexus_data) is not None:
return 0
is_true_direct_beam = self._nexus_data.is_direct_beam()
if is_true_direct_beam:
# True direct beam - add directly
self.direct_beam_list.append(self._nexus_data)
else:
# Not a true direct beam - make a deep copy and change the data type of the copy
# This makes it possible to add the same run also as a data run while preventing them sharing state
direct_beam_nexus_data = copy.deepcopy(self._nexus_data)
direct_beam_nexus_data.set_is_direct_beam(True)
self.direct_beam_list.append(direct_beam_nexus_data)
logging.warning(
"Run %s was added to the direct beam list but is not labeled as a direct beam "
"(data_type PV != 1). This run may have been started with 'Start RUN' command.",
self._nexus_data.number,
)
return 1
return 2
[docs]
def remove_active_from_direct_beam_list(self):
"""Remove the active data set from the direct beam list.
Uses run number comparison to find the entry, since the active data may be
the original object while the list contains a deepcopy.
"""
index = self.find_run_number_in_direct_beam_list(self._nexus_data)
if index is not None:
self.direct_beam_list.pop(index)
return index
return -1
[docs]
def remove_from_active_reduction_list(self, index: int):
"""
Remove item from the active reduction list.
Parameters
----------
index: int
Index of the item to remove
"""
self.reduction_list.pop(index)
[docs]
def clear_direct_beam_list(self):
"""Remove all items from the direct beam list."""
# TODO: remove links from scattering data sets.
self.direct_beam_list = []
def _loading_progress(self, call_back, start_value, stop_value, value, message=None):
_value = start_value + (stop_value - start_value) * value
call_back(_value, message)
[docs]
def load(
    self,
    file_path: str,
    configuration: Configuration,
    force: bool = False,
    update_parameters: bool = True,
    progress: Optional[Callable] = None,
) -> bool:
    """Load one or more Nexus data files and make the result the active data set.

    The data is taken from the cache when an entry with the same (sorted) file path
    exists and ``force`` is False; otherwise it is read through ``NexusData.load``.
    Freshly read data is appended to the cache, evicting the oldest entries so the
    cache never holds more than ``MAX_CACHE`` data sets.

    Parameters
    ----------
    file_path:
        absolute path to one or more files.
        If more than one, files are concatenated with the merge symbol '+'.
    configuration:
        Configuration to use to load the data
    force:
        if True, existing data in the cache will be replaced by reading from file.
    update_parameters:
        if True, we will find peak ranges
    progress:
        aggregator to estimate percent of time allotted to this function.
        It is called as ``progress(value, message)`` and must provide
        ``create_sub_task(max_value=...)``.

    Returns
    -------
    bool:
        True if the data is retrieved from the cache of past loading events
    """
    # Actions taken in this function:
    # 1. Find if the file has been loaded in the past. Retrieve the cache when force==False
    # 2. If file not in cache, or if force==True: invoke NexusData.load()
    # 3. Update attributes _nexus_data, current_directory, and current_file_name
    # 4. If we're overwriting cached data that was allocated in the reduction_list and direct_beam_list,
    #    then assign the new data to the proper indexes in lists reduction_list and direct_beam_list
    # 5. Compute reflectivity if data is loaded from file
    nexus_data = None  # type: Optional[NexusData]
    is_from_cache = False  # if True, the file has been loaded before
    # Positions of the stale cached object in the active reduction list / direct beam
    # list; only filled in when a cached entry is being force-reloaded
    reduction_list_id = None
    direct_beam_list_id = None
    file_path = FilePath(file_path, sort=True).path  # force sorting by increasing run number
    if progress is not None:
        progress(10, "Loading data...")
    # Check whether the file has already been loaded (in cache)
    for i in range(len(self._cache)):
        if self._cache[i].file_path == file_path:
            if force:
                # Check whether the data is in the reduction list before removing it
                reduction_list_id = self.find_data_in_reduction_list(self._cache[i])
                direct_beam_list_id = self.find_data_in_direct_beam_list(self._cache[i])
                self._cache.pop(i)
            else:
                nexus_data = self._cache[i]
                is_from_cache = True
            # Stop at the first match; this also keeps the index loop safe after the pop above
            break
    # If we don't have the data, load it
    if nexus_data is None:
        nexus_data = NexusData(file_path, configuration)
        sub_task = progress.create_sub_task(max_value=70) if progress else None
        nexus_data.load(progress=sub_task, update_parameters=update_parameters)
    if progress is not None:
        progress(80, "Calculating...")
    self._nexus_data = nexus_data
    # Example: '/SNS/REF_M/IPTS-25531/nexus/REF_M_38198.nxs.h5+/SNS/REF_M/IPTS-25531/nexus/REF_M_38199.nxs.h5'
    # will be split into directory='/SNS/REF_M/IPTS-25531/nexus' and
    # file_name='REF_M_38198.nxs.h5+REF_M_38199.nxs.h5'
    directory, file_name = FilePath(file_path).split()
    self.current_directory = directory
    self.current_file_name = file_name
    self.set_active_cross_section(0)
    # If we didn't get this data set from our cache, add it and compute its reflectivity.
    if not is_from_cache:
        # Find suitable direct beam
        if configuration.match_direct_beam:
            match_found = self.find_best_direct_beam()
            direct_beam = self.get_active_direct_beam()
            if match_found and direct_beam is not None:
                # Use the peak position of the matched direct beam as the direct pixel
                dpix = direct_beam.configuration.peak_position
                self._nexus_data.set_parameter("direct_pixel_overwrite", dpix)
            else:
                logging.info(f"DataManager.load: No matching direct beam found - {match_found=}, {direct_beam=}")
        # Replace reduction and direct beam entries as needed
        if reduction_list_id is not None:
            self.reduction_list[reduction_list_id] = nexus_data
        if direct_beam_list_id is not None:
            self.direct_beam_list[direct_beam_list_id] = nexus_data
        # Compute reflectivity
        if not nexus_data.is_direct_beam():
            try:
                self.calculate_reflectivity()
            except Exception as e:
                # A failed reflectivity calculation is logged; loading still completes
                logging.error(f"Reflectivity calculation failed for {file_name}: {e}")
        # if cached reduced data exceeds maximum cache size, remove the oldest reduced data
        while len(self._cache) >= self.MAX_CACHE:
            self._cache.pop(0)
        self._cache.append(nexus_data)
    if progress is not None:
        progress(100)
    return is_from_cache
[docs]
def update_configuration(self, configuration, active_only: bool = False, nexus_data: Optional[NexusData] = None):
"""Update configuration."""
if active_only:
if self.active_cross_section is None:
logging.error("No active cross section to update configuration")
return
self.active_cross_section.update_configuration(configuration)
elif nexus_data is not None:
nexus_data.update_configuration(configuration)
else:
self._nexus_data.update_configuration(configuration)
[docs]
def get_active_direct_beam(self):
"""Return the direct beam data object for the active data."""
return self._find_direct_beam(self._nexus_data)
[docs]
def update_direct_pixel_from_direct_beam(self) -> Optional[float]:
"""Set `direct_pixel_overwrite` based on the matched direct beam peak position."""
if self._nexus_data is None:
return None
direct_beam = self.get_active_direct_beam()
if direct_beam is None:
return None
dpix = direct_beam.configuration.peak_position
self._nexus_data.set_parameter("direct_pixel_overwrite", dpix)
return dpix
[docs]
def is_same_run(self, run_number_a: str | int, run_number_b: str | int) -> bool:
"""
Returns True if two run numbers are considered the same.
Tries to compare the run numbers as integers if possible;
falls back to comparing them as-is (e.g., strings).
Parameters
----------
run_number_a : str or int
The first run number.
run_number_b : str or int
The second run number.
Returns
-------
bool
True if the run numbers are equal (after normalization), False otherwise.
"""
def normalize(run):
try:
return int(run)
except (ValueError, TypeError):
return run
return normalize(run_number_a) == normalize(run_number_b)
[docs]
def is_direct_beam_for_run(self, nexus_data: NexusData, direct_beam_run: str | int) -> bool:
"""Check if the direct beam is the configured direct beam for the given run.
Parameters
----------
nexus_data : NexusData
NexusData run object
direct_beam : str | int
Direct beam run number
"""
run_direct_beam = self._find_direct_beam(nexus_data)
return run_direct_beam is not None and self.is_same_run(run_direct_beam.number, direct_beam_run)
def _find_direct_beam(self, nexus_data: Union[NexusData, CrossSectionData]) -> Optional[CrossSectionData]:
    """Look up the direct beam cross-section configured for a reflectivity data set.

    The direct beam run number is read from the configuration of the data set (of its
    first cross-section for a ``NexusData``) and searched for in ``direct_beam_list``.

    Returns
    -------
    CrossSectionData or None:
        The first cross-section of the matching direct beam if found, otherwise None.

    Raises
    ------
    TypeError
        If ``nexus_data`` is neither a NexusData nor a CrossSectionData.
    """
    # Pick the cross-section whose configuration names the direct beam
    if isinstance(nexus_data, NexusData):
        # All the cross sections should have the same direct beam file,
        # so the first one is used.
        state_names = list(nexus_data.cross_sections.keys())
        if len(state_names) == 0:
            logging.error("DataManager._find_direct_beam: no data available in NexusData object")
            return None
        reference_xs = nexus_data.cross_sections[state_names[0]]
    elif isinstance(nexus_data, CrossSectionData):
        reference_xs = nexus_data
    else:
        raise TypeError("nexus_data must be a NexusData or CrossSectionData object")

    # No configuration, or no direct beam configured: nothing to look for
    if reference_xs.configuration is None or reference_xs.configuration.direct_beam is None:
        return None
    wanted_run = reference_xs.configuration.direct_beam

    matched = None
    for direct_beam_item in self.direct_beam_list:
        if not self.is_same_run(direct_beam_item.number, wanted_run):
            continue
        item_states = list(direct_beam_item.cross_sections.keys())
        if not item_states:
            logging.error(f"Direct beam {direct_beam_item.number} has no cross-sections")
            continue
        if len(item_states) > 1:
            logging.error("More than one cross-section for the direct beam, using the first one")
        matched = direct_beam_item.cross_sections[item_states[0]]
        break

    if matched is None:
        logging.error("The specified direct beam is not available: skipping")
    return matched
[docs]
def find_direct_beam_by_name(self, direct_beam_name: str) -> Optional[NexusData]:
"""Find a direct beam data set by its name.
Parameters
----------
direct_beam_name : str
Name of the direct beam run to find.
Returns
-------
NexusData or None
The direct beam data set if found, otherwise None.
"""
for db in self.direct_beam_list:
if db.number == str(direct_beam_name):
return db
return None
[docs]
def reduce_gisans(self, progress=None):
"""Calculate GISANS for all datasets in the reduction list.
Since the specular reflectivity is prominently displayed, it is updated as
soon as parameters change. This is not the case for GISANS, which is
computed on-demand.
"""
if progress is not None:
progress(1, "Reducing GISANS...")
for i, nexus_data in enumerate(self.reduction_list):
try:
self.calculate_gisans(nexus_data=nexus_data, progress=None)
if progress is not None:
progress(100.0 / len(self.reduction_list) * (i + 1))
except:
logging.error("Could not compute GISANS for %s\n %s", nexus_data.number, sys.exc_info()[1])
if progress is not None:
progress(100)
[docs]
def calculate_gisans(self, nexus_data=None, progress=None):
"""Compute GISANS for a single data set."""
t_0 = time.time()
# Select the data to work on
if nexus_data is None:
nexus_data = self._nexus_data
# We must have a direct beam data set to normalize with
direct_beam = self._find_direct_beam(nexus_data)
if direct_beam is None:
return False
nexus_data.calculate_gisans(direct_beam=direct_beam, progress=progress)
logging.info("Calculate GISANS: %s %s sec", nexus_data.number, (time.time() - t_0))
return True
[docs]
def is_offspec_available(self):
"""Verify that all data sets and all cross-sections have calculated off-specular data available."""
for nexus_data in self.reduction_list:
if not nexus_data.is_offspec_available():
return False
return True
[docs]
def is_gisans_available(self, active_only=True):
"""Verify that all data sets and all cross-sections have calculated GISANS data available."""
if active_only:
return self._nexus_data.is_gisans_available()
for nexus_data in self.reduction_list:
if not nexus_data.is_gisans_available():
return False
return True
[docs]
def reduce_spec(self, direct_beam: Optional[str | int] = None):
"""
Calculate reflectivity for all runs in all reduction lists.
If a direct beam is given, only calculate reflectivity for the runs using the given direct beam.
Parameters
----------
direct_beam : Optional[str | int]
Direct beam run number
"""
for reduct_list in self.peak_reduction_lists.values():
for nexus_data in reduct_list:
if direct_beam is not None and not self.is_direct_beam_for_run(nexus_data, direct_beam):
continue
try:
self.calculate_reflectivity(nexus_data=nexus_data)
except:
logging.error("Could not compute reflectivity for %s\n %s", nexus_data.number, sys.exc_info()[1])
[docs]
def reduce_offspec(self, progress=None):
"""Calculate off-specular reflectivity for all datasets in all reduction list.
Since the specular reflectivity is prominently displayed, it is updated as
soon as parameters change. This is not the case for the off-specular, which is
computed on-demand.
"""
for nexus_data in self.reduction_list:
try:
self.calculate_reflectivity(nexus_data=nexus_data, specular=False)
except:
logging.error("Could not compute reflectivity for %s\n %s", nexus_data.number, sys.exc_info()[1])
[docs]
def rebin_gisans(self, pol_state, wl_min=0, wl_max=100, qy_npts=50, qz_npts=50, use_pf=False):
    """Run ``gisans.rebin_extract`` on the active reduction list and return its result.

    All arguments are forwarded unchanged as keyword arguments.
    """
    rebin_options = dict(
        pol_state=pol_state,
        wl_min=wl_min,
        wl_max=wl_max,
        qy_npts=qy_npts,
        qz_npts=qz_npts,
        use_pf=use_pf,
    )
    return gisans.rebin_extract(self.reduction_list, **rebin_options)
# TODO 67 Find out whether it can work with merged data
[docs]
def calculate_reflectivity(self, configuration=None, active_only=False, nexus_data=None, specular=True):
"""Calculate reflectivity using the current configuration."""
# Select the data to work on
if nexus_data is None:
nexus_data = self._nexus_data
# Try to find the direct beam in the list of direct beam data sets
direct_beam = self._find_direct_beam(nexus_data)
if not specular:
nexus_data.calculate_offspec(direct_beam=direct_beam)
elif active_only:
self.active_cross_section.reflectivity(direct_beam=direct_beam, configuration=configuration)
else:
nexus_data.calculate_reflectivity(
direct_beam=direct_beam, configuration=configuration, ws_suffix=str(self.active_reduction_list_index)
)
[docs]
def find_best_direct_beam(self):
"""Find the best direct beam in the direct beam list for the active data.
Returns
-------
bool:
True if we have updated the data with a new normalization run.
"""
closest = None
# for each run in the beamline, compute a closeness score
# if the wavelengths do not match within tolerance, this is some overlarge value
# if the wavelengths do match, compute a euclidean difference of their slit widths
# pick the run with a matching wavelength and lowest euclidean slit difference
active_xs = self.active_cross_section
active_instrument = active_xs.configuration.instrument
closeness: Dict[int, float] = {}
for item in self.direct_beam_list:
item_number = int(item.number)
xs_keys = list(item.cross_sections.keys())
if len(xs_keys) > 0:
xs = item.cross_sections[xs_keys[0]]
if active_instrument.direct_beam_match(active_xs, xs, skip_slits=True):
closeness[item_number] = active_instrument.direct_beam_distance(active_xs, xs)
else:
closeness[item_number] = 1.0e16
if len(self.direct_beam_list) > 0:
closest = min(closeness.items(), key=lambda item: item[1])[0]
logging.info(f"Best direct beam for run {self._nexus_data.number} is {closest}")
if closest is not None:
try:
self._nexus_data.set_parameter("direct_beam", closest)
return True
except Exception as e:
logging.error(f"Could not set direct beam to {closest}: {e}")
return False
return False
[docs]
def get_trim_values(self) -> Optional[List[int]]:
"""Cut the start and end of the active data set to 5% of its maximum intensity."""
if (
self.active_cross_section is not None
and self.active_cross_section.q is not None
and self.active_cross_section.configuration.direct_beam is not None
):
direct_beam = self._find_direct_beam(self.active_cross_section)
if direct_beam is None:
logging.error("The specified direct beam is not available: skipping")
return None
region = np.where(direct_beam.r >= (direct_beam.r.max() * 0.05))[0]
p_0 = region[0]
p_n = len(direct_beam.r) - region[-1] - 1
self._nexus_data.set_parameter("cut_first_n_points", p_0)
self._nexus_data.set_parameter("cut_last_n_points", p_n)
return [p_0, p_n]
return None
[docs]
def strip_overlap(self):
"""Remove overlapping points in the reflectivity, cutting always from the lower Qz measurements."""
if len(self.reduction_list) < 2:
logging.error("You need to have at least two datasets in the reduction table")
return
xs = self.active_cross_section.name
for idx, item in enumerate(self.reduction_list[:-1]):
next_item = self.reduction_list[idx + 1]
end_idx = next_item.cross_sections[xs].configuration.cut_first_n_points
overlap_idx = np.where(item.cross_sections[xs].q >= next_item.cross_sections[xs].q[end_idx])
logging.error(overlap_idx[0])
if len(overlap_idx[0]) > 0:
n_points = len(item.cross_sections[xs].q) - overlap_idx[0][0]
item.set_parameter("cut_last_n_points", n_points)
[docs]
def stitch_data_sets(
    self,
    normalize_to_unity: bool = True,
    q_cutoff: float = 0.01,
    global_stitching: bool = False,
    poly_degree: Optional[int] = None,
    poly_points: int = 3,
):
    """Determine scaling factors for each data set of the active reduction list.

    The work is delegated to ``data_manipulation.stitch_reflectivity``, which receives
    the active reduction list, the name of the active cross-section and the options below.

    Parameters
    ----------
    normalize_to_unity:
        If True, the reflectivity plateau will be normalized to 1.
    q_cutoff:
        critical q-value below which we expect R=1
    global_stitching:
        If True, use data from all cross-sections to calculate scaling factors
    poly_degree:
        if not None, find the scaling factor by simultaneously fitting a polynomial and scaling factor to the curves
    poly_points:
        number of additional points on each end of the overlap region to include in the fit
    """
    # Arguments are passed positionally, in the order expected by stitch_reflectivity
    stitch_arguments = (
        self.reduction_list,
        self.active_cross_section.name,
        normalize_to_unity,
        q_cutoff,
        global_stitching,
        poly_degree,
        poly_points,
    )
    data_manipulation.stitch_reflectivity(*stitch_arguments)
[docs]
def merge_data_sets(self, asymmetry=True):
self.final_merged_reflectivity = {}
for pol_state in self.reduction_states:
# The scaling factors should have been determined at this point. Just use them
# to merge the different runs in a set.
merged_ws = data_manipulation.merge_reflectivity(
self.reduction_list, xs=pol_state, q_min=0.001, q_step=-0.01
)
self.final_merged_reflectivity[pol_state] = merged_ws
# Compute asymmetry
if asymmetry:
self.asymmetry()
[docs]
def determine_asymmetry_states(self):
"""Determine which cross-section to use to compute asymmetry."""
# Inspect cross-section
# - For two states, just calculate the asymmetry using those two
p_state = None
m_state = None
if len(self.reduction_states) == 2:
if self.reduction_states[0].lower() in ["off_off", "off-off"]:
p_state = self.reduction_states[0]
m_state = self.reduction_states[1]
else:
p_state = self.reduction_states[1]
m_state = self.reduction_states[0]
else:
_p_state_data = None
_m_state_data = None
for item in self.reduction_states:
if item.lower() in ["off_off", "off-off"]:
_p_state_data = item
if item.lower() in ["on_on", "on-on"]:
_m_state_data = item
if _p_state_data is None or _m_state_data is None:
_p_state_data = None
_m_state_data = None
for item in self.reduction_states:
if self.data_sets[item].cross_section_label == "++":
_p_state_data = item
if self.data_sets[item].cross_section_label == "--":
_m_state_data = item
if _p_state_data is None or _m_state_data is None:
p_state = None
m_state = None
# - If we haven't made sense of it yet, take the first and last cross-sections
if p_state is None and m_state is None and len(self.reduction_states) >= 2:
p_state = self.reduction_states[0]
m_state = self.reduction_states[-1]
return p_state, m_state
[docs]
def asymmetry(self):
"""Determine which cross-section to use to compute asymmetry, and compute it."""
p_state, m_state = self.determine_asymmetry_states()
# Get the list of workspaces
if p_state in self.final_merged_reflectivity and m_state in self.final_merged_reflectivity:
p_ws = self.final_merged_reflectivity[p_state]
m_ws = self.final_merged_reflectivity[m_state]
ratio_ws = (p_ws - m_ws) / (p_ws + m_ws)
self.final_merged_reflectivity["SA"] = ratio_ws
[docs]
def load_data_from_reduced_file(
    self, file_path: str, configuration: Optional[Configuration] = None, progress: ProgressReporter = None
):
    """Read a reduced file, then load the runs it refers to.

    The file is parsed with ``quicknxs_io.read_reduced_file`` and the
    resulting direct beam files, data files and additional peaks are passed
    to ``load_direct_beam_and_data_files`` with a forced reload.

    Parameters
    ----------
    file_path: str
        Path of the reduced file to read
    configuration: Configuration
        Configuration passed to the reader and to the loading step
    progress: ProgressReporter
        Progress reporter; when falsy, no progress is reported
    """
    start_time = time.time()
    parsed = quicknxs_io.read_reduced_file(file_path, configuration)
    db_files, data_files, additional_peaks, has_scaling_error = parsed
    logging.info("Reduced file loaded: %s sec", time.time() - start_time)

    file_count = len(db_files) + len(data_files)
    if progress and file_count > 0:
        loaded_message = "Loaded %s" % os.path.basename(file_path)
        progress.set_value(1, message=loaded_message, out_of=file_count)

    self.load_direct_beam_and_data_files(
        db_files, data_files, additional_peaks, configuration, progress, True, start_time
    )

    # The note is shown when the reader reports `has_scaling_error` as false
    if progress and not has_scaling_error:
        progress.set_value(
            1,
            "NOTE: Initial error bars may be inaccurate - please run stitching to update scaling factor errors.",
            1,
        )
    logging.info("DONE: %s sec", time.time() - start_time)
[docs]
def load_direct_beam_and_data_files(
    self,
    db_files: List[tuple],
    data_files: List[tuple],
    additional_peaks: Optional[list] = None,
    configuration: Optional[Configuration] = None,
    progress: Optional[ProgressReporter] = None,
    force: bool = False,
    t_0: Optional[float] = None,
):
    """Load direct beam and data files and add them to the direct beam list and reduction list, respectively.

    Files that do not exist are logged as errors and skipped. Additional
    peaks are filled by copying the matching run from the main reduction
    list; an additional peak whose run is not in the main reduction list
    (for instance because its file was skipped) is logged and skipped
    instead of raising ``IndexError``.

    Parameters
    ----------
    db_files: list
        List of (run_number, run_file, conf, slice_value) for direct beam files
    data_files: list
        List of (run_number, run_file, conf, slice_value) for data files.
        ``run_file`` may hold several paths joined with "+"; all must exist.
    additional_peaks: list | None
        List of (peak_index, run_number, run_file, conf, slice_value) for data files for additional peaks
    configuration: Configuration | None
        Configuration to base the loaded data on. Its ``direct_beam`` is reset
        to None before a per-run configuration is applied; when None, that
        reset is skipped.
    progress: ProgressReporter | None
        Progress reporter
    force: bool
        If True, ignore cache and force reloading from file
    t_0: float | None
        Start time for logging data loading time
    """
    if not t_0:
        t_0 = time.time()
    n_total = len(db_files) + len(data_files)

    def _report(value, message):
        """Forward a progress update when a reporter was supplied."""
        if progress:
            progress.set_value(value, message=message, out_of=n_total)

    def _reset_shared_direct_beam():
        """Clear the direct beam of the shared configuration, if there is one."""
        if configuration is not None:
            configuration.direct_beam = None

    n_loaded = 0

    # Direct beam runs
    for r_id, run_file, conf, slice_value in db_files:
        t_i = time.time()
        if os.path.isfile(run_file):
            is_from_cache = self.load(run_file, conf, force=force, update_parameters=False)
            if is_from_cache:
                _reset_shared_direct_beam()
                self._nexus_data.update_configuration(conf)
            # Set the slice value on the NexusData object
            self._nexus_data.slice = slice_value
            self.add_active_to_direct_beam_list()
            logging.info("%s loaded: %s sec [%s]", r_id, time.time() - t_i, time.time() - t_0)
            _report(n_loaded, "%s loaded" % os.path.basename(run_file))
        else:
            logging.error("File does not exist: %s", run_file)
            _report(n_loaded, "ERROR: %s does not exist" % run_file)
        n_loaded += 1

    # Data runs of the main reduction list
    for r_id, run_file, conf, slice_value in data_files:
        t_i = time.time()
        # A merged run is written as "file1+file2+..."; every part must exist
        if all(os.path.isfile(name) for name in run_file.split("+")):
            is_from_cache = self.load(run_file, conf, force=force, update_parameters=False)
            if is_from_cache:
                _reset_shared_direct_beam()
                self._nexus_data.update_configuration(conf)
                self.calculate_reflectivity()
            # Set the slice value on the NexusData object
            self._nexus_data.slice = slice_value
            if self.add_active_to_reduction():
                logging.info("%s loaded: %s sec [%s]", r_id, time.time() - t_i, time.time() - t_0)
            else:
                logging.error("Could not load %s", r_id)
            _report(n_loaded, "%s loaded" % os.path.basename(run_file))
        else:
            logging.error("File does not exist: %s", run_file)
            _report(n_loaded, "ERROR: %s does not exist" % run_file)
        n_loaded += 1

    _report(n_total, "Done")

    # Initialize any additional peak reduction lists by copying the data from the main reduction list
    for peak_index, r_id, run_file, conf, slice_value in additional_peaks or ():
        if peak_index not in self.peak_reduction_lists:
            self.peak_reduction_lists[peak_index] = []
        self.set_active_reduction_list_index(peak_index)
        # Find the run in the main reduction list.
        # Match by file path which handles both single and summed runs correctly
        source_run = next((run for run in self.main_reduction_list if run.file_path == run_file), None)
        if source_run is None:
            logging.error(
                "Run %s (%s) is not in the main reduction list, skipping it for peak %s",
                r_id,
                run_file,
                peak_index,
            )
            continue
        self._nexus_data = copy.deepcopy(source_run)
        _reset_shared_direct_beam()
        self.update_configuration(conf)
        self.calculate_reflectivity()
        # Set the slice value on the NexusData object
        self._nexus_data.slice = slice_value
        self.add_active_to_reduction(peak_index)
@property
def current_event_files(self) -> List[str]:
    """Sorted base names of the event files found in the current directory.

    Only files matching the patterns '*event.nxs' or '*.nxs.h5' are returned.
    """
    matches = []
    for pattern in ("*event.nxs", "*.nxs.h5"):
        matches += glob.glob(os.path.join(self.current_directory, pattern))
    return sorted(map(os.path.basename, matches))
[docs]
def reload_files(self, configuration: Optional[Configuration] = None, progress=None):
    """Force reload of files in the reduction lists and direct beam list.

    The runs currently held in the direct beam list and in every peak
    reduction list are recorded, all of those lists are emptied in place, and
    the recorded runs are loaded again with ``load_direct_beam_and_data_files``
    (cache ignored). Runs of every additional peak are reloaded, not only
    those of the last additional peak.

    Parameters
    ----------
    configuration: Configuration | None
        Configuration passed on to ``load_direct_beam_and_data_files``
    progress
        Progress reporter passed on to ``load_direct_beam_and_data_files``
    """

    def _get_nexus_conf(nexus_data: NexusData) -> Configuration:
        """Returns the configuration for the main cross-section of the run."""
        return nexus_data.cross_sections[nexus_data.main_cross_section].configuration

    def _run_entry(nexus_data: NexusData) -> tuple:
        """Returns the (run_number, run_file, conf, slice_value) tuple of a run."""
        return (nexus_data.number, nexus_data.file_path, _get_nexus_conf(nexus_data), nexus_data.slice)

    # Get files to reload
    db_files = [_run_entry(nexus) for nexus in self.direct_beam_list]
    data_files = []
    additional_peaks = []
    for ipeak, reduction_list in self.peak_reduction_lists.items():
        entries = [_run_entry(nexus) for nexus in reduction_list]
        if ipeak == self.MAIN_REDUCTION_LIST_INDEX:
            data_files = entries
        else:
            # Accumulate over all additional peaks; assigning here would keep
            # only the runs of the last peak visited
            additional_peaks.extend((ipeak, *entry) for entry in entries)

    # Clear the lists
    self.direct_beam_list.clear()
    for reduction_list in self.peak_reduction_lists.values():
        reduction_list.clear()

    # Reload files and add to reduction and direct beam lists
    self.load_direct_beam_and_data_files(db_files, data_files, additional_peaks, configuration, progress, True)
[docs]
def add_additional_reduction_list(self, tab_index: int):
    """Create the reduction list of an additional ROI/peak.

    The new list is a deep copy of the main reduction list. Nothing is done
    when the main reduction list is None or when a list already exists for
    `tab_index`.

    Parameters
    ----------
    tab_index: int
        Index of the peak in `self.peak_reduction_lists`
    """
    if self.main_reduction_list is None:
        return
    if tab_index in self.peak_reduction_lists:
        # Never overwrite an existing list
        return
    self.peak_reduction_lists[tab_index] = copy.deepcopy(self.main_reduction_list)
[docs]
def remove_additional_reduction_list(self, tab_index: int):
    """Drop the reduction list of an additional ROI/peak, if it exists.

    Parameters
    ----------
    tab_index: int
        Index of the peak in `self.peak_reduction_lists`
    """
    # A missing index is silently ignored
    self.peak_reduction_lists.pop(tab_index, None)
[docs]
def set_active_reduction_list_index(self, tab_index: int):
    """Record which reduction list is the active one.

    Only `self.active_reduction_list_index` is changed.

    Parameters
    ----------
    tab_index: int
        Index of the peak in `self.peak_reduction_lists`
    """
    self.active_reduction_list_index = tab_index
[docs]
def update_active_reduction_list(self, tab_index: int):
    """Make `tab_index` the active reduction list and pick its active run.

    The row is chosen in this order: the row last selected for this tab if
    it is within the list, else the row index of the currently active data
    if it is within the list, else the first row.

    Parameters
    ----------
    tab_index: int
        Index of the peak in `self.peak_reduction_lists`
    """
    self.set_active_reduction_list_index(tab_index)

    # First choice: the row that was last selected on this tab
    remembered = self.last_selected_reduction_row
    if tab_index in remembered and remembered[tab_index] < len(self.reduction_list):
        self.set_active_data_from_reduction_list(remembered[tab_index])
        return

    # Second choice: same row index as the current active data; default to the first row
    current_row = self.find_active_data_id()
    keep_current = bool(current_row) and current_row < len(self.reduction_list)
    self.set_active_data_from_reduction_list(current_row if keep_current else 0)
[docs]
def update_active_direct_beam(self):
    """Select the active direct beam when switching to the direct beam tab.

    If the active data is already in the direct beam list, its index is
    remembered and nothing else changes. Otherwise the last selected direct
    beam row is restored when it is within the list; failing that, the first
    direct beam is selected when the list is not empty.
    """
    current_idx = self.find_active_direct_beam_id()
    if current_idx is not None:
        # Active data is already a direct beam: just track its row
        self.last_selected_direct_beam_row = current_idx
        return

    remembered_row = self.last_selected_direct_beam_row
    if remembered_row < len(self.direct_beam_list):
        self.set_active_data_from_direct_beam_list(remembered_row)
        return

    # Remembered row is out of range: use the first direct beam, if any
    if self.direct_beam_list:
        self.set_active_data_from_direct_beam_list(0)
[docs]
def clear_reduction_lists(self):
    """Resets to one empty reduction list.

    The active reduction list index is set back to 1 and
    `self.peak_reduction_lists` is replaced by a new dictionary holding a
    single empty list under that index.
    """
    self.active_reduction_list_index = 1
    # Key the new, empty list by the index that was just made active
    self.peak_reduction_lists = {self.active_reduction_list_index: []}