Source code for quicknxs.interfaces.data_handling.data_set

"""
Loader for event nexus files.
Uses Mantid Framework
"""
# pylint: disable=invalid-name, too-many-instance-attributes, line-too-long, multiple-statements, bare-except, wrong-import-order, \
# too-many-locals, too-few-public-methods, wrong-import-position, too-many-public-methods

import copy
import logging
import math
import time
import traceback
from collections import OrderedDict
from typing import Dict, Union

import mantid.simpleapi as api
import numpy as np
from mantid.dataobjects import Workspace2D

from quicknxs.interfaces.configuration import Configuration, get_direct_beam_low_res_roi
from quicknxs.interfaces.data_handling.data_info import DataInfo
from quicknxs.interfaces.data_handling.filepath import FilePath
from quicknxs.interfaces.data_handling.gisans import GISANS
from quicknxs.interfaces.data_handling.off_specular import OffSpecular

# Set Mantid logging level to warnings
api.ConfigService.setLogLevel(3)

# Parameters needed for some calculations.
H_OVER_M_NEUTRON = 3.956034e-7  # h/m_n [m^2/s]

# Number of events under which we throw away a workspace
# TODO: This should be a parameter
N_EVENTS_CUTOFF = 100

# If there's an empty reflectivity curve, add a small value to it so that it can be plotted.
REFLECTIVITY_THRESHOLD_VALUE = 1e-6


def _is_empty_reflectivity_curve(input_workspace: Union[str, Workspace2D]) -> bool:
    r"""
    Check that the reflectivity values are not all zero.

    Parameters
    ----------
    input_workspace
        Reflectivity workspace with only one spectrum.

    Returns
    -------
    True if all reflectivity values are zero, False otherwise.
    """
    workspace = api.mtd[str(input_workspace)]
    return np.all(workspace.readY(0) < REFLECTIVITY_THRESHOLD_VALUE)


def _shift_empty_reflectivity_curve(
    input_workspace: Union[str, Workspace2D], shift=REFLECTIVITY_THRESHOLD_VALUE
) -> None:
    r"""
    Shift the reflectivity values by a small amount so that it can be plotted.

    Parameters
    ----------
    input_workspace
        Reflectivity workspace with only one spectrum.
    """
    workspace = api.mtd[str(input_workspace)]
    workspace.dataY(0)[:] += shift
    workspace.dataE(0)[:] += shift**2  # arbitrary small error


[docs] def getIxyt(nxs_data): """ Return [x, y, TOF] array @param nxs_data: Mantid workspace """ _tof_axis = nxs_data.readX(0)[:].copy() nbr_tof = len(_tof_axis) sz_y_axis = int(nxs_data.getInstrument().getNumberParameter("number-of-y-pixels")[0]) # 256 sz_x_axis = int(nxs_data.getInstrument().getNumberParameter("number-of-x-pixels")[0]) # 304 _y_axis = np.zeros((sz_x_axis, sz_y_axis, nbr_tof - 1)) _y_error_axis = np.zeros((sz_x_axis, sz_y_axis, nbr_tof - 1)) for x in range(sz_x_axis): for y in range(sz_y_axis): _index = int(sz_y_axis * x + y) _y_axis[x, y, :] = nxs_data.readY(_index)[:] _y_error_axis[x, y, :] = nxs_data.readE(_index)[:] return _y_axis, _y_error_axis
[docs] class CrossSectionData(object): """ Data object to hold loaded reflectivity data """ # Instrument specific attributes filled out by the instrument object dist_sam_det = 0 dist_mod_det = 0 dist_mod_mon = 0 dist_mod_mon = 0 dist_mod_mon = 0 dangle = 0 dangle0 = 0 det_size_x = 0.0007 det_size_y = 0.0007 lambda_center = 0 def __init__(self, name, configuration, entry_name="entry", workspace=None): self.name = name self.entry_name = entry_name self.cross_section_label = entry_name self.measurement_type = "polarized" self.configuration = copy.deepcopy(configuration) self.number: str = "" # can be singe number (e.g. '1234') or a composite (e.g '1234:1239+1245') self.q = None self._r = None self._dr = None self._event_workspace = None # Flag to tell us whether we succeeded in using the meta data ROI self.use_roi_actual = True # Flag to tell us whether we found this data to be a direct beam data set self.is_direct_beam = False self._active_area_x = None self._active_area_y = None self.logs = {} self.log_minmax = {} self.log_units = {} self.proton_charge = 0 self.total_counts = 0 self.total_time = 0 self.experiment = "" self.merge_warnings = "" self.tof_edges = None # Data used for plotting self.data = None self.xydata = None self.xtofdata = None self.raw_error = None self.meta_data_roi_peak = None self.meta_data_roi_bck = None self._direct_pixel = 0 self._angle_offset = 0 self.scattering_angle = 0 self._reflectivity_workspace = None # Offset data self.off_spec = None # GISANS data self.gisans_data = None if workspace: self.collect_info(workspace) # Properties for easy data access # # return the size of the data stored in memory for this dataset # pylint: disable=missing-docstring @property def event_workspace(self): if str(self._event_workspace) in api.mtd: return api.mtd[self._event_workspace] return None @property def reflectivity_workspace(self): if str(self._reflectivity_workspace) in api.mtd: return api.mtd[self._reflectivity_workspace] return None @property def dpix(self): logging.error("CrossSectionData.dpix is deprecated") return self.direct_pixel @property def dangle0(self): logging.error("CrossSectionData.dangle0 is deprecated") return self.angle_offset @property def direct_pixel(self): if self.configuration.set_direct_pixel: return self.configuration.direct_pixel_overwrite return self._direct_pixel @direct_pixel.setter def direct_pixel(self, value): self._direct_pixel = value @property def angle_offset(self): if self.configuration.set_direct_angle_offset: return self.configuration.direct_angle_offset_overwrite return self._angle_offset @angle_offset.setter def angle_offset(self, value): self._angle_offset = value @property def xdata(self): return self.xydata.mean(axis=0) @property def ydata(self): return self.xydata.mean(axis=1) @property def tofdata(self): return self.xtofdata.mean(axis=0) # coordinates corresponding to the data items @property def x(self): return np.arange(self.xydata.shape[1]) @property def y(self): return np.arange(self.xydata.shape[0]) @property def xy(self): return np.meshgrid(self.x, self.y) @property def tof(self): return (self.tof_edges[:-1] + self.tof_edges[1:]) / 2.0 @property def xtof(self): return np.meshgrid(self.tof, self.x) @property def r(self): if self._r is None: return None return self._r * self.configuration.scaling_factor @property def raw_r(self): return self._r @property def raw_dr(self): return self._dr @property def dr(self): if self._dr is None: return None return np.sqrt( (self._dr * self.configuration.scaling_factor) ** 2 + (self.configuration.scaling_error * self._r) ** 2 ) @property def wavelength_range(self): """ Returns the wavelength range """ # TODO: use the skipped points to trim the wl band. # The following would work, but not if we perform a final rebin. # We should use the final Q binning to determine the wl range. # That's because we cut points in Q, but it has to be consistent # throughout the application even when wavelength is plotted. # PN = len(self.tof)-self.configuration.cut_first_n_points-1 # P0 = self.configuration.cut_last_n_points wl_min = H_OVER_M_NEUTRON / self.dist_mod_det * self.tof[0] * 1.0e4 wl_max = H_OVER_M_NEUTRON / self.dist_mod_det * self.tof[-1] * 1.0e4 return wl_min, wl_max @property def wavelength(self): h = 6.626e-34 # m^2 kg s^-1 m = 1.675e-27 # kg v_n = self.dist_mod_det / self.tof * 1e6 # m/s return h / m / v_n * 1e10 # A @property def active_area_x(self): if self._active_area_x is None: return (0, self.xydata.shape[0]) return self._active_area_x @active_area_x.setter def active_area_x(self, value): self._active_area_x = value @property def active_area_y(self): if self._active_area_y is None: return (0, self.xydata.shape[1]) return self._active_area_y @active_area_y.setter def active_area_y(self, value): self._active_area_y = value # pylint: enable=missing-docstring # Properties for easy data access #
[docs] def collect_info(self, workspace): """ Extract meta data from DASLogs. TODO: get average of values post filtering so that it truly represents the data """ self._event_workspace = str(workspace) data = workspace.getRun() self.logs = {} self.log_minmax = {} self.log_units = {} for motor in data.keys(): if motor in ["proton_charge", "frequency", "Veto_pulse"]: continue item = data[motor] try: self.log_units[motor] = str(item.units) if item.type == "string": pass # self.logs[motor] = item.value # self.log_minmax[motor] = (item.value, item.value) elif item.type == "number": self.logs[motor] = np.float64(item.value) self.log_minmax[motor] = (np.float64(item.value), np.float64(item.value)) else: stats = item.getStatistics() self.logs[motor] = np.float64(stats.mean) self.log_minmax[motor] = (np.float64(stats.minimum), np.float64(stats.maximum)) except: logging.error("Error reading DASLogs %s", motor) self.proton_charge = data["gd_prtn_chrg"].value self.total_counts = workspace.getNumberEvents() self.total_time = data["duration"].value self.experiment = str(data["experiment_identifier"].value) self.number = workspace.getRun().getProperty("run_numbers").value self.merge_warnings = "" # Retrieve instrument-specific information self.configuration.instrument.get_info(workspace, self)
[docs] def process_configuration(self): """ Process loaded data :param bool update_parameters: If true, we will determine reduction parameters """ self.scattering_angle = self.configuration.instrument.scattering_angle_from_data(self) # Determine binning # TODO: only the TOF binning is implemented if self.configuration.tof_overwrite is not None: tof_edges = self.configuration.tof_overwrite else: if self.configuration.tof_bin_type == 1: # constant Q tof_edges = 1.0 / np.linspace( 1.0 / self.configuration.tof_range[0], 1.0 / self.configuration.tof_range[1], self.configuration.tof_bins + 1, ) elif self.configuration.tof_bin_type == 2: # constant 1/wavelength tof_edges = self.configuration.tof_range[0] * ( ( (self.configuration.tof_range[1] / self.configuration.tof_range[0]) ** (1.0 / self.configuration.tof_bins) ) ** np.arange(self.configuration.tof_bins + 1) ) else: tof_edges = np.arange( self.configuration.tof_range[0], self.configuration.tof_range[1], self.configuration.tof_bins ) self.tof_edges = tof_edges
[docs] def prepare_plot_data(self): """ Bin events to be used for plotting and in-app calculations """ workspace = api.mtd[self._event_workspace] if self.xtofdata is None: t_0 = time.time() binning_ws = api.CreateWorkspace(DataX=self.tof_edges, DataY=np.zeros(len(self.tof_edges) - 1)) data_rebinned = api.RebinToWorkspace(WorkspaceToRebin=workspace, WorkspaceToMatch=binning_ws) Ixyt, Ixyt_error = getIxyt(data_rebinned) # Create projections for the 2D datasets Ixy = Ixyt.sum(axis=2) Ixt = Ixyt.sum(axis=1) # Store the data self.data = Ixyt.astype(float) # 3D dataset self.raw_error = Ixyt_error.astype(float) # 3D dataset self.xydata = Ixy.transpose().astype(float) # 2D dataset self.xtofdata = Ixt.astype(float) # 2D dataset logging.info("Plot data generated: %s sec", time.time() - t_0)
[docs] def get_reduction_parameters(self, update_parameters=True): """ Determine reduction parameter :param bool update_parameters: if True, we will find peak ranges """ workspace = api.mtd[self._event_workspace] data_info = DataInfo(workspace, self.name, self.configuration) self.configuration.tof_range = data_info.tof_range if update_parameters: # data_info = DataInfo(workspace, self.name, self.configuration) self.use_roi_actual = data_info.use_roi_actual self.is_direct_beam = data_info.is_direct_beam self.meta_data_roi_peak = data_info.roi_peak self.meta_data_roi_bck = data_info.roi_background if self.configuration.use_peak_finder or self.configuration.use_roi: self.configuration.peak_roi = data_info.peak_range if self.configuration.use_low_res_finder or self.configuration.use_roi: self.configuration.low_res_roi = data_info.low_res_range self.configuration.bck_roi = data_info.background self.process_configuration()
[docs] def get_counts_vs_TOF(self): """ Used for normalization, returns ROI counts vs TOF. """ self.prepare_plot_data() # Calculate ROI intensities and normalize by number of points raw_data = self.data[ self.configuration.peak_roi[0] : self.configuration.peak_roi[1], self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1], :, ] size_roi = float( (self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0]) * (self.configuration.peak_roi[1] - self.configuration.peak_roi[0]) ) summed_raw = raw_data.sum(axis=0).sum(axis=0) # Remove the background bck = self.get_background_vs_TOF() return (summed_raw / math.fabs(size_roi) - bck) / self.proton_charge
[docs] def get_tof_counts_table(self): """ Get a table of TOF vs counts in the region-of-interest (ROI) The table columns are: - TOF - wavelength - counts normalized by proton charge - error in counts normalized by proton charge - counts - error in counts - size of the ROI """ self.prepare_plot_data() # Calculate ROI intensities and normalize by number of points data_roi = self.data[ self.configuration.peak_roi[0] : self.configuration.peak_roi[1], self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1], :, ] counts_roi = data_roi.sum(axis=(0, 1)) raw_error_roi = self.raw_error[ self.configuration.peak_roi[0] : self.configuration.peak_roi[1], self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1], :, ] counts_roi_error = np.linalg.norm(raw_error_roi, axis=(0, 1)) # square root of sum of squares if self.proton_charge > 0.0: counts_roi_normalized = counts_roi / self.proton_charge counts_roi_normalized_error = counts_roi_error / self.proton_charge else: counts_roi_normalized = counts_roi counts_roi_normalized_error = counts_roi_error size_roi = len(counts_roi) * [ float( (self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0]) * (self.configuration.peak_roi[1] - self.configuration.peak_roi[0]) ) ] data_table = np.vstack( ( self.tof, self.wavelength, counts_roi_normalized, counts_roi_normalized_error, counts_roi, counts_roi_error, size_roi, ) ).T header = "tof wavelength counts_normalized counts_normalized_error counts counts_error size_roi" return data_table, header
[docs] def get_background_vs_TOF(self): """ Returns the background counts vs TOF """ # Find the background pixels to use, excluding the peak if there's an overlap. dims = self.data.shape indices = [ (i >= self.configuration.bck_roi[0] and i < self.configuration.bck_roi[1]) and not (i >= self.configuration.peak_roi[0] and i < self.configuration.peak_roi[1]) for i in range(dims[0]) ] indices = np.asarray(indices) n_bins = len(indices[indices == True]) raw_bck = self.data[indices, self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1], :] summed_bck = raw_bck.sum(axis=0).sum(axis=0) size_bck = float(n_bins * (self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0])) return summed_bck / math.fabs(size_bck)
[docs] def update_calculated_values(self): """ Update parameters that are calculated from the configuration """ self.scattering_angle = self.configuration.instrument.scattering_angle_from_data(self)
[docs] def update_configuration(self, configuration): """ Update configuration """ if configuration is not None: self.configuration = copy.deepcopy(configuration) self.update_calculated_values() self.process_configuration() # We want to keep consistency between the specular calculations # and the off-spec and GISANS ones. So clear the off-spec and GISANS # to force a recalculation. # TODO: This is a problem when switching beteewn the Off-spec and GISANS tabs self.off_spec = None self.gisans_data = None
[docs] def reflectivity(self, direct_beam=None, configuration=None): """ Compute reflectivity """ self.q = None self._r = None self._dr = None if configuration is not None: self.configuration = copy.deepcopy(configuration) if self.configuration is None: return # If a direct beam object was passed, use it. apply_norm = direct_beam is not None # and not self.is_direct_beam if not apply_norm: direct_beam = CrossSectionData("none", self.configuration, "none") logging.info( "%s:%s Reduction with DB: %s [config: %s]", self.number, self.entry_name, direct_beam.number, self.configuration.normalization, ) angle_offset = 0 # Offset from dangle0, in radians def _as_ints(a): return [int(round(a[0])), int(round(a[1]))] output_ws = "r%s_%s" % (self.number, str(self.entry_name)) ws_norm = None if apply_norm and direct_beam._event_workspace is not None: ws_norm = direct_beam._event_workspace logging.info( "Calc: %s %s %s", str(_as_ints(self.configuration.peak_roi)), str(_as_ints(self.configuration.bck_roi)), str(_as_ints(self.configuration.low_res_roi)), ) _dirpix = configuration.direct_pixel_overwrite if configuration.set_direct_pixel else None _dangle0 = configuration.direct_angle_offset_overwrite if configuration.set_direct_angle_offset else None direct_beam_low_res_roi = get_direct_beam_low_res_roi(conf, direct_beam.configuration) ws = api.MagnetismReflectometryReduction( InputWorkspace=self._event_workspace, NormalizationWorkspace=ws_norm, SignalPeakPixelRange=_as_ints(self.configuration.peak_roi), SubtractSignalBackground=True, SignalBackgroundPixelRange=_as_ints(self.configuration.bck_roi), ApplyNormalization=apply_norm, NormPeakPixelRange=_as_ints(direct_beam.configuration.peak_roi), SubtractNormBackground=True, NormBackgroundPixelRange=_as_ints(direct_beam.configuration.bck_roi), CutLowResDataAxis=True, LowResDataAxisPixelRange=_as_ints(self.configuration.low_res_roi), CutLowResNormAxis=True, LowResNormAxisPixelRange=direct_beam_low_res_roi, CutTimeAxis=True, QMin=0.001, QStep=-0.01, AngleOffset=angle_offset, UseWLTimeAxis=False, TimeAxisStep=self.configuration.tof_bins, UseSANGLE=not self.configuration.use_dangle, TimeAxisRange=self.configuration.tof_range, SpecularPixel=self.configuration.peak_position, ConstantQBinning=self.configuration.use_constant_q, # EntryName=str(self.entry_name), ConstQTrim=0.1, AcceptNullReflectivity=True, # return empty reflectivity curves (all intensities are zero) ErrorWeightedBackground=False, SampleLength=self.configuration.sample_size, DAngle0Overwrite=_dangle0, DirectPixelOverwrite=_dirpix, OutputWorkspace=output_ws, ) # If there's an empty reflectivity curve, add a small value to it so that it can be plotted. if _is_empty_reflectivity_curve(ws): _shift_empty_reflectivity_curve(ws) # FOR COMPATIBILITY WITH QUICKNXS # run_object = ws.getRun() peak_min = run_object.getProperty("scatt_peak_min").value peak_max = run_object.getProperty("scatt_peak_max").value low_res_min = run_object.getProperty("scatt_low_res_min").value low_res_max = run_object.getProperty("scatt_low_res_max").value norm_x_min = run_object.getProperty("norm_peak_min").value norm_x_max = run_object.getProperty("norm_peak_max").value norm_y_min = run_object.getProperty("norm_low_res_min").value norm_y_max = run_object.getProperty("norm_low_res_max").value tth = ws.getRun().getProperty("SANGLE").getStatistics().mean * math.pi / 180.0 quicknxs_scale = (float(norm_x_max) - float(norm_x_min)) * (float(norm_y_max) - float(norm_y_min)) quicknxs_scale /= (float(peak_max) - float(peak_min)) * (float(low_res_max) - float(low_res_min)) quicknxs_scale *= 0.005 / math.sin(tth) ws = api.Scale(InputWorkspace=output_ws, OutputWorkspace=output_ws, factor=quicknxs_scale, Operation="Multiply") # self.q = ws.readX(0)[:].copy() self._r = ws.readY(0)[:].copy() # * self.configuration.scaling_factor self._dr = ws.readE(0)[:].copy() # * self.configuration.scaling_factor # DeleteWorkspace(ws) self._reflectivity_workspace = str(ws)
[docs] def offspec(self, direct_beam=None): """ Extract off-specular scattering from 4D dataset (x,y,ToF,I). Uses a window in y to filter the 4D data and than sums all I values for each ToF and x channel. Qz,Qx,kiz,kfz is calculated using the x and ToF positions together with the tth-bank and direct pixel values. :param CrossSectionData direct_beam: if given, this data will be used to normalize the output """ self.prepare_plot_data() if direct_beam: direct_beam.prepare_plot_data() self.off_spec = OffSpecular(self) return self.off_spec(direct_beam)
[docs] def gisans(self, direct_beam=None): """ Compute GISANS :param CrossSectionData direct_beam: if given, this data will be used to normalize the output """ self.prepare_plot_data() if direct_beam: direct_beam.prepare_plot_data() self.gisans_data = GISANS(self) self.gisans_data(direct_beam) self.SGrid = self.gisans_data.SGrid self.QyGrid = self.gisans_data.QyGrid self.QzGrid = self.gisans_data.QzGrid return self.gisans_data
[docs] class NexusData(object): """Read a nexus file with multiple cross-section data.""" def __init__(self, file_path: str, configuration: Configuration) -> None: """Structure to read in one or more Nexus data files Parameters ---------- file_path: str absolute path to one or more files. If more than one, paths are concatenated with the plus symbol '+' configuration: Configuration Reduction configuration """ self.file_path = FilePath(file_path).path # sort the paths if more than one self.number: str = "" # can be a singe number (e.g. '1234') or a composite (e.g '1234:1239+1245') self.configuration = configuration self.cross_sections: Dict[str, CrossSectionData] = {} self.main_cross_section: str = None
[docs] def get_highest_cross_section(self, n_points=10): """Get the cross-section with the largest signal at the lower end of its Q range. Parameters ---------- n_points: int number of points to average over """ n_events = 0 large_xs = None for xs in self.cross_sections: if self.cross_sections[xs].raw_r is not None: _r = self.cross_sections[xs].raw_r _dr = self.cross_sections[xs].raw_dr npts = min(len(_r), n_points) _n_events = np.sum(_r[:npts] / _dr[:npts] ** 2) / np.sum(1 / _dr[:npts] ** 2) if _n_events > n_events: n_events = _n_events large_xs = xs return large_xs
[docs] def get_q_range(self): """ Return the Q range for the cross-sections """ q_min = None q_max = None for xs in self.cross_sections: if self.cross_sections[xs].q is not None: if q_min is None: q_min = self.cross_sections[xs].q.min() q_max = self.cross_sections[xs].q.max() else: q_min = min(q_min, self.cross_sections[xs].q.min()) q_max = max(q_max, self.cross_sections[xs].q.max()) return q_min, q_max
[docs] def get_reflectivity_workspace_group(self): ws_list = [self.cross_sections[xs]._reflectivity_workspace for xs in self.cross_sections] wsg = api.GroupWorkspaces(InputWorkspaces=ws_list) return wsg
[docs] def set_parameter(self, param, value): """ Loop through the cross-section data sets and update a parameter. """ has_changed = False try: for xs in self.cross_sections: if hasattr(self.cross_sections[xs].configuration, param): if not getattr(self.cross_sections[xs].configuration, param) == value: setattr(self.cross_sections[xs].configuration, param, value) has_changed = True except: logging.error("Could not set parameter %s %s", param, value) return has_changed
[docs] def calculate_reflectivity(self, direct_beam=None, configuration=None, ws_suffix: str = ""): """ Loop through the cross-section data sets and update the reflectivity. Parameters ---------- direct_beam: CrossSectionData | None Direct beam data configuration: Configuration | None The configuration ws_suffix: str String to add to reflectivity workspace name Example ------- `ws_suffix` is used when reducing multiple ROIs for the same run and cross-section, to differentiate the workspace names in the Mantid data service peak_index = 2 # update the active reduction list data_manager.set_active_reduction_list_index(peak_index) # get the first data set of the active reduction list nexus_data = data_manager.reduction_list[0] # calculate the reflectivity for this data set nexus_data.calculate_reflectivity(ws_suffix=str(peak_index)) """ if configuration is not None: self.configuration = copy.deepcopy(configuration) if self.configuration is None: return # If a direct beam object was passed, use it. apply_norm = direct_beam is not None # and not self.is_direct_beam if not apply_norm: direct_beam = CrossSectionData("none", self.configuration, "none") logging.info( "%s Reduction with DB: %s [config: %s]", self.number, direct_beam.number, self.configuration.normalization ) angle_offset = 0 # Offset from dangle0, in radians def _as_ints(a): return [int(round(a[0])), int(round(a[1])) - 1] output_ws = "r%s_%s" % (self.number, ws_suffix) ws_norm = None if apply_norm and direct_beam._event_workspace is not None: ws_norm = direct_beam._event_workspace ws_list = [self.cross_sections[xs]._event_workspace for xs in self.cross_sections] conf = self.cross_sections[self.main_cross_section].configuration wsg = api.GroupWorkspaces(InputWorkspaces=ws_list) _dirpix = conf.direct_pixel_overwrite if conf.set_direct_pixel else None _dangle0 = conf.direct_angle_offset_overwrite if conf.set_direct_angle_offset else None direct_beam_low_res_roi = get_direct_beam_low_res_roi(conf, direct_beam.configuration) final_rebin = False q_step = 0.0 if conf.do_final_rebin_global: final_rebin = True q_step = conf.final_rebin_step_global else: final_rebin = conf.do_final_rebin_run q_step = conf.final_rebin_step_run # The reduced data workspace may be a group or a single # workspace depending on the InputWorkspace parameter ws = api.MagnetismReflectometryReduction( InputWorkspace=wsg, NormalizationWorkspace=ws_norm, SignalPeakPixelRange=_as_ints(conf.peak_roi), SubtractSignalBackground=conf.subtract_background, SignalBackgroundPixelRange=_as_ints(conf.bck_roi), ApplyNormalization=apply_norm, NormPeakPixelRange=_as_ints(direct_beam.configuration.peak_roi), SubtractNormBackground=conf.subtract_background, NormBackgroundPixelRange=_as_ints(direct_beam.configuration.bck_roi), CutLowResDataAxis=True, LowResDataAxisPixelRange=_as_ints(conf.low_res_roi), CutLowResNormAxis=True, LowResNormAxisPixelRange=direct_beam_low_res_roi, CutTimeAxis=True, FinalRebin=final_rebin, QMin=0.001, QStep=q_step, RoundUpPixel=False, AngleOffset=angle_offset, UseWLTimeAxis=False, TimeAxisStep=conf.tof_bins, UseSANGLE=not conf.use_dangle, TimeAxisRange=conf.tof_range, SpecularPixel=conf.peak_position, ConstantQBinning=conf.use_constant_q, ConstQTrim=0.1, CropFirstAndLastPoints=False, CleanupBadData=final_rebin, AcceptNullReflectivity=True, # return empty reflectivity curves (all intensities are zero) ErrorWeightedBackground=False, SampleLength=conf.sample_size, DAngle0Overwrite=_dangle0, DirectPixelOverwrite=_dirpix, OutputWorkspace=output_ws, ) # If there's an empty reflectivity curve, add a small value to it so that it can be plotted. _xs_ws = [ws] if isinstance(ws, Workspace2D) else ws for i in range(len(ws_list)): if _is_empty_reflectivity_curve(_xs_ws[i]): _shift_empty_reflectivity_curve(_xs_ws[i]) # FOR COMPATIBILITY WITH QUICKNXS # _ws = ws[0] if len(ws_list) > 1 else ws run_object = _ws.getRun() peak_min = run_object.getProperty("scatt_peak_min").value peak_max = run_object.getProperty("scatt_peak_max").value + 1.0 low_res_min = run_object.getProperty("scatt_low_res_min").value low_res_max = run_object.getProperty("scatt_low_res_max").value + 1.0 norm_x_min = run_object.getProperty("norm_peak_min").value norm_x_max = run_object.getProperty("norm_peak_max").value + 1.0 norm_y_min = run_object.getProperty("norm_low_res_min").value norm_y_max = run_object.getProperty("norm_low_res_max").value + 1.0 tth = run_object.getProperty("two_theta").value * math.pi / 360.0 quicknxs_scale = (float(norm_x_max) - float(norm_x_min)) * (float(norm_y_max) - float(norm_y_min)) quicknxs_scale /= (float(peak_max) - float(peak_min)) * (float(low_res_max) - float(low_res_min)) logging.warning("Scale size = %s", str(quicknxs_scale)) logging.warning("Alpha_i = %s", str(tth)) _scale = 0.005 / math.sin(tth) if tth > 0.0002 else 1.0 quicknxs_scale *= _scale ws = api.Scale(InputWorkspace=output_ws, OutputWorkspace=output_ws, factor=quicknxs_scale, Operation="Multiply") _ws = ws if len(ws_list) > 1 else [ws] for xs in _ws: # add suffix to avoid overwriting ws in mantid data service, needed for multiple peaks api.RenameWorkspace(str(xs), str(xs) + ws_suffix) xs_id = xs.getRun().getProperty("cross_section_id").value self.cross_sections[xs_id].q = xs.readX(0)[:].copy() self.cross_sections[xs_id]._r = np.ma.masked_equal(xs.readY(0)[:].copy(), 0) self.cross_sections[xs_id]._dr = np.ma.masked_equal(xs.readE(0)[:].copy(), 0) self.cross_sections[xs_id]._reflectivity_workspace = str(xs) self.cross_sections[xs_id]._reflectivity_workspacegroup = str(ws)
[docs] def calculate_gisans(self, direct_beam, progress=None): """ Compute GISANS """ has_errors = False detailed_msg = "" if progress is not None: progress(1, "Computing GISANS", out_of=100.0) for i, xs in enumerate(self.cross_sections): try: self.cross_sections[xs].gisans(direct_beam=direct_beam) except: has_errors = True detailed_msg += "Could not calculate GISANS reflectivity for %s\n %s\n\n" % ( xs, traceback.format_exc(), ) logging.error("Could not calculate GISANS reflectivity for %s", xs) if progress: progress(i, message="Computed GISANS %s" % xs, out_of=len(self.cross_sections)) if has_errors: raise RuntimeError(detailed_msg) if progress is not None: progress(100, "Complete", out_of=100.0)
[docs] def is_offspec_available(self): """ Verify whether we have off-specular data calculated for all cross-sections """ for xs in self.cross_sections: if self.cross_sections[xs].off_spec is None: return False return True
[docs] def is_gisans_available(self): """ Verify whether we have GISANS data calculated for all cross-sections """ for xs in self.cross_sections: if self.cross_sections[xs].gisans_data is None: return False return True
[docs] def calculate_offspec(self, direct_beam=None): """ Loop through the cross-section data sets and update the reflectivity. """ has_errors = False detailed_msg = "" for xs in self.cross_sections: try: self.cross_sections[xs].offspec(direct_beam=direct_beam) except Exception: has_errors = True detailed_msg += "Could not calculate off-specular reflectivity for %s\n %s\n\n" % ( xs, traceback.format_exc(), ) logging.error(detailed_msg) if has_errors: raise RuntimeError(detailed_msg)
[docs] def update_configuration(self, configuration): """ Loop through the cross-section data sets and update the reflectivity. """ for xs in self.cross_sections: try: self.cross_sections[xs].update_configuration(configuration) except Exception as err: logging.error(f"Could not update configuration for {xs}: {err}")
[docs] def update_calculated_values(self): """ Loop through the cross-section data sets and update. """ for xs in self.cross_sections: self.cross_sections[xs].update_calculated_values()
[docs] def load(self, update_parameters=True, progress=None): """ Load cross-sections from a nexus file. :param function progress: call-back function to track progress :param bool update_parameters: if True, we will find peak ranges """ # sanity check if self.file_path is None: raise RuntimeError("self.file_path is None") self.cross_sections = OrderedDict() if progress is not None: progress(5, "Filtering data...", out_of=100.0) try: xs_list = self.configuration.instrument.load_data(self.file_path, self.configuration) logging.info("%s loaded: %s xs", self.file_path, len(xs_list)) except RuntimeError as run_err: logging.error(f"Could not load file(s) {str(self.file_path)}\n {run_err}") return self.cross_sections progress_value = 0 # Keep track of cross-section with max counts so we can use it to # select peak regions _max_counts = 0 _max_xs = None _loaded_with_getDI = False for ws in xs_list: # Get the unique name for the cross-section, determined by the filtering channel = ws.getRun().getProperty("cross_section_id").value if ws.getRun().hasProperty("loaded_with_getDI"): _loaded_with_getDI = True if progress is not None: progress_value += int(100.0 / len(xs_list)) progress(progress_value, "Loading %s..." % str(channel), out_of=100.0) # Get rid of empty workspaces logging.info("Loading %s: %s events", str(channel), ws.getNumberEvents()) if ws.getNumberEvents() < self.configuration.nbr_events_min: logging.warning("Too few events for %s: %s", channel, ws.getNumberEvents()) continue name = ws.getRun().getProperty("cross_section_id").value cross_section = CrossSectionData(name, self.configuration, entry_name=channel, workspace=ws) self.cross_sections[name] = cross_section self.number = cross_section.number # e.g '1234:1238+1239' if more than one run made up this cross section if cross_section.total_counts > _max_counts: _max_counts = cross_section.total_counts _max_xs = name # Now that we know which cross section has the most data, # use that one to get the reduction parameters self.main_cross_section = _max_xs self.cross_sections[_max_xs].get_reduction_parameters(update_parameters=update_parameters) # Push the configuration (reduction options and peak regions) from the # cross-section with the most data to all other cross-sections. for xs in self.cross_sections: if xs == _max_xs: continue self.cross_sections[xs].update_configuration(self.cross_sections[_max_xs].configuration) if progress is not None: progress(100, "Complete", out_of=100.0) if _loaded_with_getDI: progress(100, "WARNING: Analyzer/polarizer states not available - loaded with getDI", out_of=100.0) return self.cross_sections
[docs] def is_direct_beam(self): """Returns True if the main cross-section is a direct beam""" return self.cross_sections[self.main_cross_section].is_direct_beam
[docs] class NexusMetaData(object): """ Class used to hold meta-data read before loading the neutron events """ mid_q = 0 is_direct_beam = False