"""Loader for event nexus files using Mantid Framework."""
# pylint: disable=invalid-name, too-many-instance-attributes, line-too-long, multiple-statements, bare-except, wrong-import-order, \
# too-many-locals, too-few-public-methods, wrong-import-position, too-many-public-methods
import copy
import logging
import math
import time
import traceback
from collections import OrderedDict
from typing import TYPE_CHECKING, Callable, Dict, Optional, Union
import mantid.simpleapi as api
import numpy as np
from mantid.dataobjects import Workspace2D
from quicknxs.interfaces.configuration import BinningType, Configuration, get_direct_beam_low_res_roi
from quicknxs.interfaces.data_handling.data_info import DataInfo
from quicknxs.interfaces.data_handling.filepath import FilePath
from quicknxs.interfaces.data_handling.gisans import GISANS
from quicknxs.interfaces.data_handling.off_specular import OffSpecular
if TYPE_CHECKING:
from quicknxs.interfaces.configuration import Configuration
# Set Mantid logging level to warnings
api.ConfigService.setLogLevel(3)
# Parameters needed for some calculations.
H_OVER_M_NEUTRON = 3.956034e-7 # h/m_n [m^2/s]
# Number of events under which we throw away a workspace
# TODO: This should be a parameter
N_EVENTS_CUTOFF = 100
# If there's an empty reflectivity curve, add a small value to it so that it can be plotted.
REFLECTIVITY_THRESHOLD_VALUE = 1e-6
def _is_empty_reflectivity_curve(input_workspace: Union[str, Workspace2D]) -> bool:
r"""
Check that the reflectivity values are not all zero.
Parameters
----------
input_workspace
Reflectivity workspace with only one spectrum.
Returns
-------
True if all reflectivity values are zero, False otherwise.
"""
workspace = api.mtd[str(input_workspace)]
return np.all(workspace.readY(0) < REFLECTIVITY_THRESHOLD_VALUE)
def _shift_empty_reflectivity_curve(
input_workspace: Union[str, Workspace2D], shift=REFLECTIVITY_THRESHOLD_VALUE
) -> None:
r"""
Shift the reflectivity values by a small amount so that it can be plotted.
Parameters
----------
input_workspace
Reflectivity workspace with only one spectrum.
"""
workspace = api.mtd[str(input_workspace)]
workspace.dataY(0)[:] += shift
workspace.dataE(0)[:] += shift**2 # arbitrary small error
[docs]
def getIxyt(nxs_data: Workspace2D) -> tuple:
"""Return [x, y, TOF] array and [x, y, TOF] error array from a Nexus file."""
_tof_axis = nxs_data.readX(0)[:].copy()
nbr_tof = len(_tof_axis)
sz_y_axis = int(nxs_data.getInstrument().getNumberParameter("number-of-y-pixels")[0]) # 256
sz_x_axis = int(nxs_data.getInstrument().getNumberParameter("number-of-x-pixels")[0]) # 304
_y_axis = np.zeros((sz_x_axis, sz_y_axis, nbr_tof - 1))
_y_error_axis = np.zeros((sz_x_axis, sz_y_axis, nbr_tof - 1))
for x in range(sz_x_axis):
for y in range(sz_y_axis):
_index = int(sz_y_axis * x + y)
_y_axis[x, y, :] = nxs_data.readY(_index)[:]
_y_error_axis[x, y, :] = nxs_data.readE(_index)[:]
return _y_axis, _y_error_axis
[docs]
class CrossSectionData(object):
"""Data object to hold loaded reflectivity data."""
# Instrument specific attributes filled out by the instrument object
dist_sam_det = 0.0
dist_mod_det = 0.0
dist_mod_mon = 0.0
dangle = 0.0
det_size_x = 0.0007
det_size_y = 0.0007
lambda_center = 0.0
def __init__(self, name, configuration: "Configuration", entry_name="entry", workspace=None):
self.name = name
self.entry_name = entry_name
self.cross_section_label = entry_name
self.measurement_type = "polarized"
self.configuration = copy.deepcopy(configuration)
self.number: str = "" # can be single number (e.g. '1234') or a composite (e.g '1234:1239+1245')
self.q = None
self._r = None
self._dr = None
self._event_workspace = None
# Flag to tell us whether we succeeded in using the meta data ROI
self.use_roi_actual = True
# Flag to tell us whether we found this data to be a direct beam data set
self.is_direct_beam = False
self._active_area_x = None
self._active_area_y = None
self.logs = {}
self.log_minmax = {}
self.log_units = {}
self.proton_charge = 0
self.total_counts = 0
self.total_time = 0
self.experiment = ""
self.merge_warnings = ""
self.tof_edges = None
# Data used for plotting
self.data = None
self.xydata = None
self.xtofdata = None
self.raw_error = None
self._direct_pixel = 0
self._angle_offset = 0
self.scattering_angle = 0
self._reflectivity_workspace = None
# Offset data
self.off_spec = None
# GISANS data
self.gisans_data = None
if workspace:
self.collect_info(workspace)
# Properties for easy data access #
# return the size of the data stored in memory for this dataset
# pylint: disable=missing-docstring
@property
def event_workspace(self):
if str(self._event_workspace) in api.mtd:
return api.mtd[self._event_workspace]
return None
@property
def reflectivity_workspace(self):
if str(self._reflectivity_workspace) in api.mtd:
return api.mtd[self._reflectivity_workspace]
return None
@property
def dpix(self):
logging.error("CrossSectionData.dpix is deprecated")
return self.direct_pixel
@property
def dangle0(self):
logging.error("CrossSectionData.dangle0 is deprecated")
return self.angle_offset
@property
def direct_pixel(self):
if self.configuration.set_direct_pixel:
return self.configuration.direct_pixel_overwrite
return self._direct_pixel
@direct_pixel.setter
def direct_pixel(self, value):
self._direct_pixel = value
@property
def angle_offset(self):
if self.configuration.set_direct_angle_offset:
return self.configuration.direct_angle_offset_overwrite
return self._angle_offset
@angle_offset.setter
def angle_offset(self, value):
self._angle_offset = value
@property
def xdata(self):
return self.xydata.mean(axis=0)
@property
def ydata(self):
return self.xydata.mean(axis=1)
@property
def tofdata(self):
return self.xtofdata.mean(axis=0)
# coordinates corresponding to the data items
@property
def x(self):
return np.arange(self.xydata.shape[1])
@property
def y(self):
return np.arange(self.xydata.shape[0])
@property
def xy(self):
return np.meshgrid(self.x, self.y)
@property
def tof(self):
return (self.tof_edges[:-1] + self.tof_edges[1:]) / 2.0
@property
def xtof(self):
return np.meshgrid(self.tof, self.x)
@property
def r(self):
if self._r is None:
return None
return self._r * self.configuration.scaling_factor
@property
def raw_r(self):
return self._r
@property
def raw_dr(self):
return self._dr
@property
def dr(self):
if self._dr is None:
return None
return np.sqrt(
(self._dr * self.configuration.scaling_factor) ** 2 + (self.configuration.scaling_error * self._r) ** 2
)
@property
def wavelength_range(self):
"""Returns the wavelength range."""
# TODO: use the skipped points to trim the wl band.
# The following would work, but not if we perform a final rebin.
# We should use the final Q binning to determine the wl range.
# That's because we cut points in Q, but it has to be consistent
# throughout the application even when wavelength is plotted.
# PN = len(self.tof)-self.configuration.cut_first_n_points-1
# P0 = self.configuration.cut_last_n_points
wl_min = H_OVER_M_NEUTRON / self.dist_mod_det * self.tof[0] * 1.0e4
wl_max = H_OVER_M_NEUTRON / self.dist_mod_det * self.tof[-1] * 1.0e4
return wl_min, wl_max
@property
def wavelength(self):
h = 6.626e-34 # m^2 kg s^-1
m = 1.675e-27 # kg
v_n = self.dist_mod_det / self.tof * 1e6 # m/s
return h / m / v_n * 1e10 # A
@property
def active_area_x(self):
if self._active_area_x is None:
return (0, self.xydata.shape[0])
return self._active_area_x
@active_area_x.setter
def active_area_x(self, value):
self._active_area_x = value
@property
def active_area_y(self):
if self._active_area_y is None:
return (0, self.xydata.shape[1])
return self._active_area_y
@active_area_y.setter
def active_area_y(self, value):
self._active_area_y = value
# pylint: enable=missing-docstring
# Properties for easy data access #
[docs]
def collect_info(self, workspace):
"""
Extract meta data from DASLogs.
TODO: get average of values post filtering so that it truly represents the data
"""
self._event_workspace = str(workspace)
data = workspace.getRun()
self.logs = {}
self.log_minmax = {}
self.log_units = {}
for motor in data.keys():
if motor in ["proton_charge", "frequency", "Veto_pulse"]:
continue
item = data[motor]
try:
self.log_units[motor] = str(item.units)
if item.type == "string":
pass
# self.logs[motor] = item.value
# self.log_minmax[motor] = (item.value, item.value)
elif item.type == "number":
self.logs[motor] = np.float64(item.value)
self.log_minmax[motor] = (np.float64(item.value), np.float64(item.value))
else:
stats = item.getStatistics()
self.logs[motor] = np.float64(stats.mean)
self.log_minmax[motor] = (np.float64(stats.minimum), np.float64(stats.maximum))
except:
logging.error("Error reading DASLogs %s", motor)
self.proton_charge = data["gd_prtn_chrg"].value
self.total_counts = workspace.getNumberEvents()
self.total_time = data["duration"].value
self.experiment = str(data["experiment_identifier"].value)
self.number = workspace.getRun().getProperty("run_numbers").value
self.merge_warnings = ""
# Retrieve instrument-specific information
self.configuration.instrument.get_info(workspace, self)
[docs]
def process_configuration(self):
"""Process loaded data."""
self.scattering_angle = self.configuration.instrument.scattering_angle_from_data(self)
# Determine binning
# TODO: only the TOF binning is implemented
if self.configuration.tof_overwrite is not None:
tof_edges = self.configuration.tof_overwrite
else:
if self.configuration.tof_bin_type == 1: # constant Q
tof_edges = 1.0 / np.linspace(
1.0 / self.configuration.tof_range[0],
1.0 / self.configuration.tof_range[1],
self.configuration.tof_bins + 1,
)
elif self.configuration.tof_bin_type == 2: # constant 1/wavelength
tof_edges = self.configuration.tof_range[0] * (
(
(self.configuration.tof_range[1] / self.configuration.tof_range[0])
** (1.0 / self.configuration.tof_bins)
)
** np.arange(self.configuration.tof_bins + 1)
)
else:
tof_edges = np.arange(
self.configuration.tof_range[0], self.configuration.tof_range[1], self.configuration.tof_bins
)
self.tof_edges = tof_edges
[docs]
def prepare_plot_data(self):
"""Bin events to be used for plotting and in-app calculations."""
workspace = api.mtd[self._event_workspace]
if self.xtofdata is None:
t_0 = time.time()
binning_ws = api.CreateWorkspace(DataX=self.tof_edges, DataY=np.zeros(len(self.tof_edges) - 1))
data_rebinned = api.RebinToWorkspace(WorkspaceToRebin=workspace, WorkspaceToMatch=binning_ws)
Ixyt, Ixyt_error = getIxyt(data_rebinned)
# Create projections for the 2D datasets
Ixy = Ixyt.sum(axis=2)
Ixt = Ixyt.sum(axis=1)
# Store the data
self.data = Ixyt.astype(float) # 3D dataset
self.raw_error = Ixyt_error.astype(float) # 3D dataset
self.xydata = Ixy.transpose().astype(float) # 2D dataset
self.xtofdata = Ixt.astype(float) # 2D dataset
logging.info("Plot data generated: %s sec", time.time() - t_0)
[docs]
def get_reduction_parameters(self, update_parameters=True):
"""Determine reduction parameters from the Nexus file.
Parameters
----------
update_parameters:
if True, we will find peak ranges
"""
workspace = api.mtd[self._event_workspace]
data_info = DataInfo(workspace, self.name, self.configuration)
self.configuration.tof_range = data_info.tof_range
if update_parameters:
# data_info = DataInfo(workspace, self.name, self.configuration)
self.use_roi_actual = data_info.use_roi_actual
self.is_direct_beam = data_info.is_direct_beam
self.configuration.metadata_roi_peak = data_info.roi_peak
self.configuration.metadata_roi_bck = data_info.roi_background
if self.configuration.use_peak_finder or self.configuration.use_roi:
self.configuration.peak_roi = data_info.peak_range
if self.configuration.use_low_res_finder or self.configuration.use_roi:
self.configuration.low_res_roi = data_info.low_res_range
self.configuration.bck_roi = data_info.background
self.process_configuration()
[docs]
def get_counts_vs_TOF(self):
"""Used for normalization, returns ROI counts vs TOF."""
self.prepare_plot_data()
# Calculate ROI intensities and normalize by number of points
raw_data = self.data[
self.configuration.peak_roi[0] : self.configuration.peak_roi[1],
self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1],
:,
]
size_roi = float(
(self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0])
* (self.configuration.peak_roi[1] - self.configuration.peak_roi[0])
)
summed_raw = raw_data.sum(axis=0).sum(axis=0)
# Remove the background
bck = self.get_background_vs_TOF()
return (summed_raw / math.fabs(size_roi) - bck) / self.proton_charge
[docs]
def get_tof_counts_table(self):
"""Get a table of TOF vs counts in the region-of-interest (ROI).
The table columns are:
- TOF
- wavelength
- counts normalized by proton charge
- error in counts normalized by proton charge
- counts
- error in counts
- size of the ROI
"""
self.prepare_plot_data()
# Calculate ROI intensities and normalize by number of points
data_roi = self.data[
self.configuration.peak_roi[0] : self.configuration.peak_roi[1],
self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1],
:,
]
counts_roi = data_roi.sum(axis=(0, 1))
raw_error_roi = self.raw_error[
self.configuration.peak_roi[0] : self.configuration.peak_roi[1],
self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1],
:,
]
counts_roi_error = np.linalg.norm(raw_error_roi, axis=(0, 1)) # square root of sum of squares
if self.proton_charge > 0.0:
counts_roi_normalized = counts_roi / self.proton_charge
counts_roi_normalized_error = counts_roi_error / self.proton_charge
else:
counts_roi_normalized = counts_roi
counts_roi_normalized_error = counts_roi_error
size_roi = len(counts_roi) * [
float(
(self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0])
* (self.configuration.peak_roi[1] - self.configuration.peak_roi[0])
)
]
data_table = np.vstack(
(
self.tof,
self.wavelength,
counts_roi_normalized,
counts_roi_normalized_error,
counts_roi,
counts_roi_error,
size_roi,
)
).T
header = "tof wavelength counts_normalized counts_normalized_error counts counts_error size_roi"
return data_table, header
[docs]
def get_background_vs_TOF(self):
"""Returns the background counts vs TOF."""
# Find the background pixels to use, excluding the peak if there's an overlap.
dims = self.data.shape
indices = [
(i >= self.configuration.bck_roi[0] and i < self.configuration.bck_roi[1])
and not (i >= self.configuration.peak_roi[0] and i < self.configuration.peak_roi[1])
for i in range(dims[0])
]
indices = np.asarray(indices)
n_bins = len(indices[indices == True])
raw_bck = self.data[indices, self.configuration.low_res_roi[0] : self.configuration.low_res_roi[1], :]
summed_bck = raw_bck.sum(axis=0).sum(axis=0)
size_bck = float(n_bins * (self.configuration.low_res_roi[1] - self.configuration.low_res_roi[0]))
return summed_bck / math.fabs(size_bck)
[docs]
def update_calculated_values(self):
"""Update parameters that are calculated from the configuration."""
self.scattering_angle = self.configuration.instrument.scattering_angle_from_data(self)
[docs]
def update_configuration(self, configuration: Configuration):
"""Update configuration."""
if configuration is not None:
self.configuration = copy.deepcopy(configuration)
self.update_calculated_values()
self.process_configuration()
# We want to keep consistency between the specular calculations
# and the off-spec and GISANS ones. So clear the off-spec and GISANS
# to force a recalculation.
# TODO: This is a problem when switching beteewn the Off-spec and GISANS tabs
self.off_spec = None
self.gisans_data = None
[docs]
def reflectivity(self, direct_beam=None, configuration=None):
"""Compute reflectivity."""
self.q = None
self._r = None
self._dr = None
if configuration is not None:
self.configuration = copy.deepcopy(configuration)
if self.configuration is None:
return
# If a direct beam object was passed, use it.
apply_norm = direct_beam is not None # and not self.is_direct_beam
if not apply_norm:
direct_beam = CrossSectionData("none", self.configuration, "none")
logging.info(
"%s:%s Reduction with DB: %s [config: %s]",
self.number,
self.entry_name,
direct_beam.number,
self.configuration.direct_beam,
)
angle_offset = 0 # Offset from dangle0, in radians
def _as_ints(a):
return [int(round(a[0])), int(round(a[1]))]
output_ws = "r%s_%s" % (self.number, str(self.entry_name))
ws_norm = None
if apply_norm and direct_beam._event_workspace is not None:
ws_norm = direct_beam._event_workspace
logging.info(
"Calc: %s %s %s",
str(_as_ints(self.configuration.peak_roi)),
str(_as_ints(self.configuration.bck_roi)),
str(_as_ints(self.configuration.low_res_roi)),
)
_dirpix = configuration.direct_pixel_overwrite if configuration.set_direct_pixel else None
_dangle0 = configuration.direct_angle_offset_overwrite if configuration.set_direct_angle_offset else None
direct_beam_low_res_roi = get_direct_beam_low_res_roi(conf, direct_beam.configuration)
ws = api.MagnetismReflectometryReduction(
InputWorkspace=self._event_workspace,
NormalizationWorkspace=ws_norm,
SignalPeakPixelRange=_as_ints(self.configuration.peak_roi),
SubtractSignalBackground=True,
SignalBackgroundPixelRange=_as_ints(self.configuration.bck_roi),
ApplyNormalization=apply_norm,
NormPeakPixelRange=_as_ints(direct_beam.configuration.peak_roi),
SubtractNormBackground=True,
NormBackgroundPixelRange=_as_ints(direct_beam.configuration.bck_roi),
CutLowResDataAxis=True,
LowResDataAxisPixelRange=_as_ints(self.configuration.low_res_roi),
CutLowResNormAxis=True,
LowResNormAxisPixelRange=direct_beam_low_res_roi,
CutTimeAxis=True,
QMin=0.001,
QStep=-0.01,
AngleOffset=angle_offset,
UseWLTimeAxis=False,
TimeAxisStep=self.configuration.tof_bins,
UseSANGLE=not self.configuration.use_dangle,
TimeAxisRange=self.configuration.tof_range,
SpecularPixel=self.configuration.peak_position,
ConstantQBinning=self.configuration.use_constant_q,
# EntryName=str(self.entry_name),
ConstQTrim=0.1,
AcceptNullReflectivity=True, # return empty reflectivity curves (all intensities are zero)
ErrorWeightedBackground=False,
SampleLength=self.configuration.sample_size,
DAngle0Overwrite=_dangle0,
DirectPixelOverwrite=_dirpix,
OutputWorkspace=output_ws,
)
# If there's an empty reflectivity curve, add a small value to it so that it can be plotted.
if _is_empty_reflectivity_curve(ws):
_shift_empty_reflectivity_curve(ws)
# FOR COMPATIBILITY WITH QUICKNXS #
run_object = ws.getRun()
peak_min = run_object.getProperty("scatt_peak_min").value
peak_max = run_object.getProperty("scatt_peak_max").value
low_res_min = run_object.getProperty("scatt_low_res_min").value
low_res_max = run_object.getProperty("scatt_low_res_max").value
norm_x_min = run_object.getProperty("norm_peak_min").value
norm_x_max = run_object.getProperty("norm_peak_max").value
norm_y_min = run_object.getProperty("norm_low_res_min").value
norm_y_max = run_object.getProperty("norm_low_res_max").value
tth = ws.getRun().getProperty("SANGLE").getStatistics().mean * math.pi / 180.0
quicknxs_scale = (float(norm_x_max) - float(norm_x_min)) * (float(norm_y_max) - float(norm_y_min))
quicknxs_scale /= (float(peak_max) - float(peak_min)) * (float(low_res_max) - float(low_res_min))
quicknxs_scale *= 0.005 / math.sin(tth)
ws = api.Scale(InputWorkspace=output_ws, OutputWorkspace=output_ws, factor=quicknxs_scale, Operation="Multiply")
#
self.q = ws.readX(0)[:].copy()
self._r = ws.readY(0)[:].copy() # * self.configuration.scaling_factor
self._dr = ws.readE(0)[:].copy() # * self.configuration.scaling_factor
# DeleteWorkspace(ws)
self._reflectivity_workspace = str(ws)
[docs]
def offspec(self, direct_beam: Optional["CrossSectionData"] = None):
"""Extract off-specular scattering from 4D dataset (x,y,ToF,I).
Uses a window in y to filter the 4D data,
then sums all I values for each ToF and x channel.
Qz, Qx, kiz, kfz are calculated using the x and ToF positions
together with the tth-bank and direct pixel values.
Parameters
----------
direct_beam:
if given, this data will be used to normalize the output
"""
self.prepare_plot_data()
if direct_beam:
direct_beam.prepare_plot_data()
self.off_spec = OffSpecular(self)
return self.off_spec(direct_beam)
[docs]
def gisans(self, direct_beam: Optional["CrossSectionData"] = None):
"""Compute GISANS.
Parameters
----------
direct_beam:
if given, this data will be used to normalize the output
"""
self.prepare_plot_data()
if direct_beam:
direct_beam.prepare_plot_data()
self.gisans_data = GISANS(self)
self.gisans_data(direct_beam)
self.SGrid = self.gisans_data.SGrid
self.QyGrid = self.gisans_data.QyGrid
self.QzGrid = self.gisans_data.QzGrid
return self.gisans_data
[docs]
class NexusData(object):
"""Read a nexus file with multiple cross-section data."""
def __init__(self, file_path: str, configuration: Configuration) -> None:
"""Structure to read in one or more Nexus data files.
Parameters
----------
file_path: str
absolute path to one or more files. If more than one, paths are concatenated with the plus symbol '+'
configuration: Configuration
Reduction configuration
"""
self.file_path = FilePath(file_path).path # sort the paths if more than one
self.number: str = "" # can be a single number (e.g. '1234') or a composite (e.g '1234:1239+1245')
self.configuration = configuration
self.cross_sections: Dict[str, CrossSectionData] = {}
self.main_cross_section: str = None
self.slice: int = 0 # slice index for multiple slices per run (default 0)
[docs]
def get_highest_cross_section(self, n_points=10):
"""Get the cross-section with the largest signal at the lower end of its Q range.
Parameters
----------
n_points: int
number of points to average over
"""
n_events = 0
large_xs = None
for xs in self.cross_sections:
if self.cross_sections[xs].raw_r is not None:
_r = self.cross_sections[xs].raw_r
_dr = self.cross_sections[xs].raw_dr
npts = min(len(_r), n_points)
_n_events = np.sum(_r[:npts] / _dr[:npts] ** 2) / np.sum(1 / _dr[:npts] ** 2)
if _n_events > n_events:
n_events = _n_events
large_xs = xs
return large_xs
[docs]
def get_q_range(self):
"""Return the Q range for the cross-sections."""
q_min = None
q_max = None
for xs in self.cross_sections:
if self.cross_sections[xs].q is not None:
if q_min is None:
q_min = self.cross_sections[xs].q.min()
q_max = self.cross_sections[xs].q.max()
else:
q_min = min(q_min, self.cross_sections[xs].q.min())
q_max = max(q_max, self.cross_sections[xs].q.max())
return q_min, q_max
[docs]
def get_reflectivity_workspace_group(self):
ws_list = [self.cross_sections[xs]._reflectivity_workspace for xs in self.cross_sections]
wsg = api.GroupWorkspaces(InputWorkspaces=ws_list)
return wsg
[docs]
def get_parameter(self, param):
"""Get a parameter value from the main cross-section data set."""
if self.main_cross_section is None:
return None
try:
if hasattr(self.cross_sections[self.main_cross_section].configuration, param):
return getattr(self.cross_sections[self.main_cross_section].configuration, param)
except Exception as e:
logging.error(f"Could not get parameter {param}: {e}")
return None
[docs]
def set_parameter(self, param, value):
"""Loop through the cross-section data sets and update a parameter."""
has_changed = False
try:
for xs in self.cross_sections:
if hasattr(self.cross_sections[xs].configuration, param):
if not getattr(self.cross_sections[xs].configuration, param) == value:
setattr(self.cross_sections[xs].configuration, param, value)
has_changed = True
except Exception as e:
logging.error(f"Could not set parameter {param} {value}: {e}")
return has_changed
[docs]
def calculate_reflectivity(
self,
direct_beam: Optional[CrossSectionData] = None,
configuration: Optional[Configuration] = None,
ws_suffix: str = "",
):
"""
Loop through the cross-section data sets and update the reflectivity.
Parameters
----------
direct_beam: CrossSectionData | None
Direct beam data
configuration: Configuration | None
The configuration
ws_suffix: str
String to add to reflectivity workspace name
Example
-------
`ws_suffix` is used when reducing multiple ROIs for the same run and cross-section, to
differentiate the workspace names in the Mantid data service
peak_index = 2
# update the active reduction list
data_manager.set_active_reduction_list_index(peak_index)
# get the first data set of the active reduction list
nexus_data = data_manager.reduction_list[0]
# calculate the reflectivity for this data set
nexus_data.calculate_reflectivity(ws_suffix=str(peak_index))
"""
if configuration is not None:
self.configuration = copy.deepcopy(configuration)
if self.configuration is None:
return
# If a direct beam object was passed, use it.
apply_norm = direct_beam is not None # and not self.is_direct_beam
if not apply_norm:
direct_beam = CrossSectionData("none", self.configuration, "none")
logging.info(
"%s Reduction with DB: %s [config: %s]", self.number, direct_beam.number, self.configuration.direct_beam
)
angle_offset = 0 # Offset from dangle0, in radians
def _as_ints(a):
return [int(round(a[0])), int(round(a[1])) - 1]
output_ws = "r%s_%s" % (self.number, ws_suffix)
ws_norm = None
if apply_norm and direct_beam._event_workspace is not None:
ws_norm = direct_beam._event_workspace
ws_list = [self.cross_sections[xs]._event_workspace for xs in self.cross_sections]
conf = self.cross_sections[self.main_cross_section].configuration
wsg = api.GroupWorkspaces(InputWorkspaces=ws_list)
_dirpix = conf.direct_pixel_overwrite if conf.set_direct_pixel else None
_dangle0 = conf.direct_angle_offset_overwrite if conf.set_direct_angle_offset else None
direct_beam_low_res_roi = get_direct_beam_low_res_roi(conf, direct_beam.configuration)
# Resolve binning options
final_rebin = False
const_q_binning = False
if conf.binning_type_run == BinningType.NORMAL:
final_rebin = True
elif conf.binning_type_run == BinningType.CONST_Q:
const_q_binning = True
# The reduced data workspace may be a group or a single
# workspace depending on the InputWorkspace parameter
ws = api.MagnetismReflectometryReduction(
InputWorkspace=wsg,
NormalizationWorkspace=ws_norm,
SignalPeakPixelRange=_as_ints(conf.peak_roi),
SubtractSignalBackground=conf.subtract_background,
SignalBackgroundPixelRange=_as_ints(conf.bck_roi),
ApplyNormalization=apply_norm,
NormPeakPixelRange=_as_ints(direct_beam.configuration.peak_roi),
SubtractNormBackground=conf.subtract_background,
NormBackgroundPixelRange=_as_ints(direct_beam.configuration.bck_roi),
CutLowResDataAxis=True,
LowResDataAxisPixelRange=_as_ints(conf.low_res_roi),
CutLowResNormAxis=True,
LowResNormAxisPixelRange=direct_beam_low_res_roi,
CutTimeAxis=True,
FinalRebin=final_rebin,
QMin=0.001,
QStep=conf.binning_q_step_run,
RoundUpPixel=False,
AngleOffset=angle_offset,
UseWLTimeAxis=False,
TimeAxisStep=conf.tof_bins,
UseSANGLE=not conf.use_dangle,
TimeAxisRange=conf.tof_range,
SpecularPixel=conf.peak_position,
ConstantQBinning=const_q_binning,
ConstQTrim=0.1,
CropFirstAndLastPoints=False,
CleanupBadData=final_rebin,
AcceptNullReflectivity=True, # return empty reflectivity curves (all intensities are zero)
ErrorWeightedBackground=False,
SampleLength=conf.sample_size,
DAngle0Overwrite=_dangle0,
DirectPixelOverwrite=_dirpix,
OutputWorkspace=output_ws,
)
# If there's an empty reflectivity curve, add a small value to it so that it can be plotted.
_xs_ws = [ws] if isinstance(ws, Workspace2D) else ws
for i in range(len(ws_list)):
if _is_empty_reflectivity_curve(_xs_ws[i]):
_shift_empty_reflectivity_curve(_xs_ws[i])
# FOR COMPATIBILITY WITH QUICKNXS #
_ws = ws[0] if len(ws_list) > 1 else ws
run_object = _ws.getRun()
peak_min = run_object.getProperty("scatt_peak_min").value
peak_max = run_object.getProperty("scatt_peak_max").value + 1.0
low_res_min = run_object.getProperty("scatt_low_res_min").value
low_res_max = run_object.getProperty("scatt_low_res_max").value + 1.0
norm_x_min = run_object.getProperty("norm_peak_min").value
norm_x_max = run_object.getProperty("norm_peak_max").value + 1.0
norm_y_min = run_object.getProperty("norm_low_res_min").value
norm_y_max = run_object.getProperty("norm_low_res_max").value + 1.0
tth = run_object.getProperty("two_theta").value * math.pi / 360.0
quicknxs_scale = (float(norm_x_max) - float(norm_x_min)) * (float(norm_y_max) - float(norm_y_min))
quicknxs_scale /= (float(peak_max) - float(peak_min)) * (float(low_res_max) - float(low_res_min))
logging.warning("Scale size = %s", str(quicknxs_scale))
logging.warning("Alpha_i = %s", str(tth))
_scale = 0.005 / math.sin(tth) if tth > 0.0002 else 1.0
quicknxs_scale *= _scale
ws = api.Scale(InputWorkspace=output_ws, OutputWorkspace=output_ws, factor=quicknxs_scale, Operation="Multiply")
_ws = ws if len(ws_list) > 1 else [ws]
for xs in _ws:
# add suffix to avoid overwriting ws in mantid data service, needed for multiple peaks
api.RenameWorkspace(str(xs), str(xs) + ws_suffix)
xs_id = xs.getRun().getProperty("cross_section_id").value
self.cross_sections[xs_id].q = xs.readX(0)[:].copy()
self.cross_sections[xs_id]._r = np.ma.masked_equal(xs.readY(0)[:].copy(), 0)
self.cross_sections[xs_id]._dr = np.ma.masked_equal(xs.readE(0)[:].copy(), 0)
self.cross_sections[xs_id]._reflectivity_workspace = str(xs)
self.cross_sections[xs_id]._reflectivity_workspacegroup = str(ws)
[docs]
def calculate_gisans(self, direct_beam, progress=None):
"""Compute GISANS."""
has_errors = False
detailed_msg = ""
if progress is not None:
progress(1, "Computing GISANS", out_of=100.0)
for i, xs in enumerate(self.cross_sections):
try:
self.cross_sections[xs].gisans(direct_beam=direct_beam)
except:
has_errors = True
detailed_msg += "Could not calculate GISANS reflectivity for %s\n %s\n\n" % (
xs,
traceback.format_exc(),
)
logging.error("Could not calculate GISANS reflectivity for %s", xs)
if progress:
progress(i, message="Computed GISANS %s" % xs, out_of=len(self.cross_sections))
if has_errors:
raise RuntimeError(detailed_msg)
if progress is not None:
progress(100, "Complete", out_of=100.0)
[docs]
def is_offspec_available(self):
"""Verify whether we have off-specular data calculated for all cross-sections."""
for xs in self.cross_sections:
if self.cross_sections[xs].off_spec is None:
return False
return True
[docs]
def is_gisans_available(self):
"""Verify whether we have GISANS data calculated for all cross-sections."""
for xs in self.cross_sections:
if self.cross_sections[xs].gisans_data is None:
return False
return True
[docs]
def calculate_offspec(self, direct_beam=None):
"""Update the off-specular reflectivity for all cross sections."""
has_errors = False
detailed_msg = ""
for xs in self.cross_sections:
try:
self.cross_sections[xs].offspec(direct_beam=direct_beam)
except Exception:
has_errors = True
detailed_msg += "Could not calculate off-specular reflectivity for %s\n %s\n\n" % (
xs,
traceback.format_exc(),
)
logging.error(detailed_msg)
if has_errors:
raise RuntimeError(detailed_msg)
[docs]
def update_configuration(self, configuration):
"""Update the configuration for all cross sections."""
for xs in self.cross_sections:
try:
self.cross_sections[xs].update_configuration(configuration)
except Exception as err:
logging.error(f"Could not update configuration for {xs}: {err}")
[docs]
def update_calculated_values(self):
"""Update calculated values for all cross sections."""
for xs in self.cross_sections:
self.cross_sections[xs].update_calculated_values()
[docs]
def load(self, update_parameters: bool = True, progress: Optional[Callable] = None):
"""Load cross-sections from a nexus file.
Parameters
----------
update_parameters:
if True, we will find peak ranges
progress:
call-back function to track progress
"""
# sanity check
if self.file_path is None:
raise RuntimeError("self.file_path is None")
self.cross_sections = OrderedDict()
if progress is not None:
progress(5, "Filtering data...", out_of=100.0)
try:
xs_list = self.configuration.instrument.load_data(self.file_path, self.configuration)
logging.info("%s loaded: %s xs", self.file_path, len(xs_list))
except RuntimeError as run_err:
logging.error(f"Could not load file(s) {str(self.file_path)}\n {run_err}")
return self.cross_sections
progress_value = 0
# Keep track of cross-section with max counts so we can use it to
# select peak regions
_max_counts = 0
_max_xs = None
_loaded_with_getDI = False
for ws in xs_list:
# Get the unique name for the cross-section, determined by the filtering
xs_id = ws.getRun().getProperty("cross_section_id").value
if ws.getRun().hasProperty("loaded_with_getDI"):
_loaded_with_getDI = True
if progress is not None:
progress_value += int(100.0 / len(xs_list))
progress(progress_value, "Loading %s..." % str(xs_id), out_of=100.0)
# Get rid of empty workspaces
logging.info("Loading %s: %s events", str(xs_id), ws.getNumberEvents())
if ws.getNumberEvents() < self.configuration.nbr_events_min:
logging.warning("Too few events for %s: %s", xs_id, ws.getNumberEvents())
continue
name = ws.getRun().getProperty("cross_section_id").value
cross_section = CrossSectionData(name, self.configuration, entry_name=xs_id, workspace=ws)
self.cross_sections[name] = cross_section
self.number = cross_section.number # e.g '1234:1238+1239' if more than one run made up this cross section
if cross_section.total_counts > _max_counts:
_max_counts = cross_section.total_counts
_max_xs = name
# Set direct_pixel_overwrite to the value read from the file
cross_section.configuration.direct_pixel_overwrite = cross_section.direct_pixel
# Now that we know which cross section has the most data,
# use that one to get the reduction parameters
self.main_cross_section = _max_xs
self.cross_sections[_max_xs].get_reduction_parameters(update_parameters=update_parameters)
# Push the configuration (reduction options and peak regions) from the
# cross-section with the most data to all other cross-sections.
for xs in self.cross_sections:
if xs == _max_xs:
continue
self.cross_sections[xs].update_configuration(self.cross_sections[_max_xs].configuration)
if progress is not None:
progress(100, "Complete", out_of=100.0)
if _loaded_with_getDI:
progress(100, "WARNING: Analyzer/polarizer states not available - loaded with getDI", out_of=100.0)
return self.cross_sections
[docs]
def is_direct_beam(self):
"""Returns True if the main cross-section is a direct beam."""
return self.cross_sections[self.main_cross_section].is_direct_beam
[docs]
def set_is_direct_beam(self, is_direct_beam: bool):
"""Sets the value of `is_direct_beam` for all cross-sections."""
for xs in self.cross_sections:
self.cross_sections[xs].is_direct_beam = is_direct_beam
[docs]
def get_main_cross_section_data(self) -> CrossSectionData:
"""Returns the CrossSectionData object corresponding to the main cross section"""
xs_label = self.main_cross_section
return self.cross_sections[xs_label]