"""Methods used to process data, usually calling Mantid."""
# pylint: disable=invalid-name, too-many-instance-attributes, line-too-long, multiple-statements, bare-except, protected-access, wrong-import-position
import logging
import math
import time
from typing import List, Optional
import h5py
import mantid
import mantid.simpleapi as api
import numpy as np
from mantid.api import MatrixWorkspace
from quicknxs.interfaces.data_handling.data_set import NexusData, NexusMetaData
from quicknxs.interfaces.data_handling.instrument import Instrument
[docs]
class NormalizeToUnityQCutoffError(Exception):
"""When normalizing to unity fails due to no data below Q cutoff."""
pass
[docs]
def generate_short_script(reduction_list):
"""Generate a simple reduction script for Mantid."""
if len(reduction_list) == 0:
return "# No data in reduction list\n"
xs = list(reduction_list[0].cross_sections.keys())[0]
script = "# Mantid version %s\n" % mantid.__version__
script += "# Date: %s\n\n" % time.strftime("%Y-%m-%d %H:%M:%S")
script += "from mantid.simpleapi import *\n"
logging.info("Cross section for script %s", xs)
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup is None:
logging.info(" No workspace: %s", i)
continue
ws_name = str(reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup)
# NOTE: It is possible that only one cross section is present in the run, therefore
# api.mtd[ws_name] could be a Workspace2D instead of a workspace group.
# Given that a modernization process is scheduled, we are going to do a ducktape
# fixing here.
contain_single_crosssection = False
try:
if len(api.mtd[ws_name]) == 0:
logging.info(" No entry in %s workspace group", ws_name)
except TypeError:
# NOTE: based on the information we have, the only possible data type here should be
# a workspace2D
contain_single_crosssection = True
logging.info(" single cross section in %s (Workspace2D)", ws_name)
# short-hand it
ws_iterable = [api.mtd[ws_name]] if contain_single_crosssection else api.mtd[ws_name]
script += "# Run:%s\n" % reduction_list[i].cross_sections[xs].number
script_text = api.GeneratePythonScript(ws_iterable[0])
script += script_text.replace(", ", ",\n ")
script += "\n\n"
script += "# Crop points on each end\n"
q = ws_iterable[0].readX(0)
p_0 = reduction_list[i].cross_sections[xs].configuration.cut_first_n_points
p_f = len(q) - reduction_list[i].cross_sections[xs].configuration.cut_last_n_points - 1
q0 = q[p_0]
qf = q[p_f]
logging.info("%s %s %s %s", q0, qf, p_0, p_f)
for item in ws_iterable:
script += "CropWorkspace(InputWorkspace='%s', XMin=%s, XMax=%s, " % (str(item), q0, qf)
script += "OutputWorkspace='%s')\n" % str(item)
script += "Scale(InputWorkspace='%s', Operation='Multiply', " % str(item)
script += "Factor=%s, " % reduction_list[i].cross_sections[xs].configuration.scaling_factor
script += "OutputWorkspace='%s')\n\n" % str(item)
return script
[docs]
def generate_script(reduction_list: List[NexusData], pol_state: str):
"""Generate a Mantid script for the reflectivity reduction.
Parameters
----------
reduction_list:
list of NexusData objects
pol_state:
cross-section name
"""
ws_list = get_scaled_workspaces(reduction_list, pol_state)
# If the reflectivity calculation failed, we may not have data to work with
# for this cross-section.
if not ws_list:
return ""
script = "# Cross-section: %s\n" % pol_state
for ws in ws_list:
script += "# Run:%s\n" % ws.getRunNumber()
script_text = api.GeneratePythonScript(ws)
script += script_text.replace(", ", ",\n ")
script += "\n"
return script
def _prepare_workspace_for_stitching(
cross_sections: dict, xs_input: str, global_fit: bool, ws_name: str
) -> MatrixWorkspace:
"""Create a workspace from a CrossSectionData object that we can call Stitch1D on.
Parameters
----------
cross_sections:
Dictionary with cross-section name and CrossSectionData objects
xs:
Which cross-section to use
global_fit:
if True, merge data from all cross-sections
ws_name:
Name for the output Mantid workspace
"""
if global_fit:
xs_names = cross_sections.keys()
else:
xs_names = [xs_input]
ws_list = []
for xs in xs_names:
cross_section = cross_sections[xs]
n_total = len(cross_section.q)
p_0 = cross_section.configuration.cut_first_n_points
p_n = n_total - cross_section.configuration.cut_last_n_points
# remove leading/trailing indices where cross_section._r is masked
while p_0 < p_n and np.ma.is_masked(cross_section._r[p_0]):
p_0 += 1
while p_n > p_0 and np.ma.is_masked(cross_section._r[p_n - 1]):
p_n -= 1
if p_n <= p_0:
raise ValueError(f"No valid data points in cross-section {xs}")
ws_xs = api.CreateWorkspace(
DataX=cross_section.q[p_0:p_n],
DataY=cross_section._r[p_0:p_n],
DataE=cross_section._dr[p_0:p_n],
OutputWorkspace="tmp_prepare_stitching_" + xs,
)
ws_list.append(str(ws_xs))
ws_merge = api.MergeRuns(ws_list)
ws_merge.setDistribution(True)
ws = api.ConvertToHistogram(ws_merge, OutputWorkspace=ws_name)
# Delete temporary workspaces
api.DeleteWorkspaces(ws_list)
api.DeleteWorkspace(ws_merge)
return ws
def _get_stitching_overlap_region(ws_lo: MatrixWorkspace, ws_hi: MatrixWorkspace, n_points_outside_overlap: int = 3):
"""Get the x boundary values of the overlap region between two workspaces.
Parameters
----------
ws_lo:
The low-Q reflectivity curve
ws_hi:
The high-Q reflectivity curve
n_points_outside_overlap:
Number of additional points on each end of the overlap region to include in the fit
Returns
-------
tuple
(min, max) defining the overlap region
Raises
------
ValueError:
If the x-range of workspace ws_lo is higher than the x-range of ws_hi
"""
if ws_lo.readX(0)[0] > ws_hi.readX(0)[0] or ws_lo.readX(0)[-1] > ws_hi.readX(0)[-1]:
raise ValueError("x-range for ws_lo must not be higher than x-range for ws_hi")
# get initial boundaries from lowest Q of the high-Q curve and highest Q of the low-Q curve
x_lo_max = ws_lo.readX(0)[-1]
x_hi_min = ws_hi.readX(0)[0]
if x_hi_min > x_lo_max:
# no overlap, use only extra points
min_idx_lo = -n_points_outside_overlap
max_idx_hi = n_points_outside_overlap - 1
else:
# use overlap plus extra points
min_idx_lo = np.where(ws_lo.readX(0) >= x_hi_min)[0][0]
min_idx_lo = max(min_idx_lo - n_points_outside_overlap, 0)
max_idx_hi = np.where(ws_hi.readX(0) <= x_lo_max)[0][-1]
max_idx_hi = min(max_idx_hi + n_points_outside_overlap, ws_hi.readX(0).size - 1)
# get x values corresponding to the found indices
xmin = ws_lo.readX(0)[min_idx_lo]
xmax = ws_hi.readX(0)[max_idx_hi]
return xmin, xmax
def _get_polynomial_fit_stitching_scaling_factor(
ws_lo: MatrixWorkspace,
ws_hi: MatrixWorkspace,
n_polynom: int,
n_points_outside_overlap: int = 3,
):
"""Get the scaling factor for stitching two curves by fitting a polynomial and a scaling factor.
For example, if the polynomial degree is 3, the scaling factor is obtained by minimizing the function
f(ws_lo, ws_hi) = sum_{ws_lo}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3} +
scaling_factor * sum_{ws_hi}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3}
where the independent variables are A0, A1, A2, A3 and scaling_factor.
Parameters
----------
ws_lo:
Low-Q reflectivity curve
ws_hi:
High-Q reflectivity curve
n_polynom:
Degree of the polynomial to fit the curves to
n_points_outside_overlap:
Number of additional points on each end of the overlap region to include in the fit
Returns
-------
Dictionary with keys `scale_factor_value`, `scale_factor_error`, `polynomial_coeffients`
Raises
------
RunTimeError:
If there are too few data points compared to the number of independent variables
ValueError:
If the x-range of workspace ws_lo is higher than the x-range of ws_hi
"""
# crop to the overlap region of the curves plus extra points
xmin, xmax = _get_stitching_overlap_region(ws_lo, ws_hi, n_points_outside_overlap)
ws_lo_crop = api.CropWorkspace(InputWorkspace=ws_lo, XMin=xmin, XMax=xmax)
ws_hi_crop = api.CropWorkspace(InputWorkspace=ws_hi, XMin=xmin, XMax=xmax)
# build the strings to represent the polynomial, initial values and ties in the Mantid function definition
formula_str = "A0"
initial_val_str = "A0=1"
ties_str = "f1.A0=f0.A0"
for i in range(1, n_polynom + 1):
formula_str += f"+A{i}*x^{i}" # "A0 + A1*x + A2*x^2 + ..."
initial_val_str += f", A{i}=1" # "A0=1, A1=1, ..."
ties_str += f",f1.A{i}=f0.A{i}" # "f1.A0=f0.A0, f1.A1=f0.A1, ..."
poly_func = f";name=UserFunction, Formula={formula_str}, {initial_val_str}, $domains=0"
scaled_poly_func = (
f";name=UserFunction, Formula=poly_scale*({formula_str}), poly_scale=1, {initial_val_str}, $domains=1"
)
ties = f";ties=({ties_str})"
multi_func = "composite=MultiDomainFunction, NumDeriv=1" + poly_func + scaled_poly_func + ties
# fit MultiDomain function to the low-Q and high-Q workspaces
fit = api.Fit(
Function=multi_func,
InputWorkspace=ws_lo_crop,
WorkspaceIndex=0,
InputWorkspace_1=ws_hi_crop,
WorkspaceIndex_1=0,
Output="fit",
)
# the scaling factor poly_scale scaled the polynomial to match the high-Q reflectivity curve, therefore, take the
# inverse to get the scaling factor for matching the high-Q reflectivity curve with the low-Q curve
output = {}
i_scale = n_polynom + 1
poly_scale = fit.Function.function.getParameterValue(i_scale)
poly_scale_error = fit.Function.function.getError(i_scale)
output["scale_factor_value"] = 1 / poly_scale
output["scale_factor_error"] = poly_scale_error / poly_scale**2
output["polynomial_coeff"] = fit.OutputParameters.column(1)[: n_polynom + 1]
# delete temporary workspaces
api.DeleteWorkspace(ws_lo_crop)
api.DeleteWorkspace(ws_hi_crop)
return output
[docs]
def stitch_reflectivity(
reduction_list: List[NexusData],
xs: Optional[str] = None,
normalize_to_unity: bool = True,
q_cutoff: float = 0.01,
global_fit: bool = False,
poly_degree: Optional[int] = None,
poly_points: int = 3,
):
"""Stitch and normalize data sets.
Parameters
----------
reduction_list:
list of data sets to stitch
xs:
name of the cross-section to use for the first data set
normalize_to_unity:
if `True`, the specular ridge will be normalized to 1
q_cutoff:
used if `normalize_to_unity` = `True`, data with q < `q_cutoff` are part of the specular ridge
global_fit:
if `True`, use data from all cross-sections to calculate scaling factors
poly_degree:
if not `None`, find the scaling factor by simultaneously fitting a polynomial and scaling factor to the two curves
poly_points:
number of additional points on each end of the overlap region to include in the fit
Returns
-------
Tuple
(list of scaling factors, list of scaling factor errors)
Raises
------
NormalizeToUnityQCutoffError:
if `normalize_to_unity` = `True`, but there is no data below `q_cutoff`
"""
if not reduction_list:
return []
# Select the cross-section we will use to determine the scaling factors
if xs is None:
xs = list(reduction_list[0].cross_sections.keys())[0]
# First, determine the overall scaling factor as needed
scaling_factor = 1.0
scaling_error = 0.0
if normalize_to_unity:
# Calculate scaling factor for normalizing the specular ridge to 1
idx_list = reduction_list[0].cross_sections[xs].q < q_cutoff
if not any(idx_list):
raise NormalizeToUnityQCutoffError(
f"""No data below Q cutoff.\n
Critical Q cutoff: {q_cutoff}\n
Smallest Q value: {reduction_list[0].cross_sections[xs].q.min():.5f}"""
)
total = 0
weights = 0
for i in range(len(reduction_list[0].cross_sections[xs]._r)):
if idx_list[i]:
w = 1.0 / float(reduction_list[0].cross_sections[xs]._dr[i]) ** 2
total += w * float(reduction_list[0].cross_sections[xs]._r[i])
weights += w
if weights > 0 and total > 0:
# Since weighted_average = \sum{ r_i / dr_i^2 } / \sum{ 1 / dr_i^2 }, the error is
# weighted_average_error = 1 / \sqrt{ \sum{ 1 / dr_i^2 } } = 1 / \sqrt{weights}
weighted_average = total / weights
weighted_average_error = 1.0 / np.sqrt(weights)
# Scaling factor S = 1 / u, error dS = du / u^2
scaling_factor = 1 / weighted_average
scaling_error = weighted_average_error / weighted_average**2
reduction_list[0].set_parameter("scaling_factor", scaling_factor)
reduction_list[0].set_parameter("scaling_error", scaling_error)
else:
scaling_factor = reduction_list[0].cross_sections[xs].configuration.scaling_factor
# Stitch the data sets together
running_scale = scaling_factor
running_error = scaling_error
scaling_factors = [running_scale]
scaling_errors = [running_error]
for i in range(len(reduction_list) - 1):
# Low-Q data set
_previous_ws = _prepare_workspace_for_stitching(
reduction_list[i].cross_sections, xs, global_fit, "low_q_workspace"
)
# High-Q data set
ws = _prepare_workspace_for_stitching(reduction_list[i + 1].cross_sections, xs, global_fit, "high_q_workspace")
# Determine overlap between workspaces after removing leading/trailing zeroes
# Note: Stitch1D can find the overlap automatically, but it fails if the second workspace x_min is smaller
# than the first workspace x_min, which can arise especially with constant-Q binning
start_overlap = max(_previous_ws.readX(0)[0], ws.readX(0)[0])
end_overlap = min(_previous_ws.readX(0)[-1], ws.readX(0)[-1])
if isinstance(poly_degree, int) and poly_degree > 0:
output_poly = _get_polynomial_fit_stitching_scaling_factor(_previous_ws, ws, poly_degree, poly_points)
scale = output_poly["scale_factor_value"]
scale_error = output_poly["scale_factor_error"]
else:
_, scale = api.Stitch1D(
_previous_ws,
ws,
StartOverlap=start_overlap,
EndOverlap=end_overlap,
OutputScalingWorkspace="ws_stitching_scale",
)
scale_error = api.mtd["ws_stitching_scale"].readE(0)[0]
# Calculate the error in the product of two scaling factors, f1 * f2, as \sqrt{(df2 * f1)^2 + (df1 * f2)^2}
running_error = np.sqrt((scale_error * running_scale) ** 2 + (running_error * scale) ** 2)
scaling_errors.append(running_error)
reduction_list[i + 1].set_parameter("scaling_error", running_error)
running_scale *= scale
scaling_factors.append(running_scale)
reduction_list[i + 1].set_parameter("scaling_factor", running_scale)
return scaling_factors, scaling_errors
[docs]
def merge_reflectivity(reduction_list, xs, q_min=0.001, q_step=-0.01):
"""
Combine the workspaces for a given cross-section into a single workspace.
TODO: trim workspaces
trim_first = [item.cross_sections[pol_state].configuration.cut_first_n_points for item in self.data_manager.reduction_list]
trim_last = [item.cross_sections[pol_state].configuration.cut_last_n_points for item in self.data_manager.reduction_list]
"""
ws_list = []
scaling_factors = []
q_max = q_min
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs].reflectivity_workspace is None:
continue
_, _q_max = reduction_list[i].get_q_range()
q_max = max(q_max, _q_max)
ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace)
# Stitch1DMany only scales workspaces relative to the first one
if i == 0:
api.Scale(
InputWorkspace=ws_name,
OutputWorkspace=ws_name + "_histo",
factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor,
Operation="Multiply",
)
api.ConvertToHistogram(InputWorkspace=ws_name + "_histo", OutputWorkspace=ws_name + "_histo")
else:
scaling_factors.append(reduction_list[i].cross_sections[xs].configuration.scaling_factor)
api.ConvertToHistogram(InputWorkspace=ws_name, OutputWorkspace=ws_name + "_histo")
ws_list.append(ws_name + "_histo")
params = "%s, %s, %s" % (q_min, q_step, q_max)
if len(ws_list) > 1:
merged_ws, _ = api.Stitch1DMany(
InputWorkspaces=ws_list,
Params=params,
UseManualScaleFactors=True,
ManualScaleFactors=scaling_factors,
OutputWorkspace=ws_name + "_merged",
)
# sort WorkspaceGroup by theta
merged_ws = sorted(merged_ws, key=lambda ws: ws.getRun().getProperty("two_theta").value)
elif len(ws_list) == 1:
merged_ws = api.CloneWorkspace(ws_list[0], OutputWorkspace=ws_name + "_merged")
else:
return None
# Remove temporary workspaces
for ws in ws_list:
api.DeleteWorkspace(ws)
api.SaveAscii(InputWorkspace=merged_ws, Filename="/tmp/test.txt")
return merged_ws
[docs]
def get_scaled_workspaces(reduction_list: List[NexusData], xs: str):
"""Return a list of scaled workspaces."""
ws_list = []
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs].reflectivity_workspace is None:
continue
ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace)
ws_tmp = api.Scale(
InputWorkspace=ws_name,
OutputWorkspace=ws_name + "_scaled",
factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor,
Operation="Multiply",
)
api.AddSampleLog(
Workspace=ws_tmp,
LogName="scaling_factor",
LogText=str(reduction_list[i].cross_sections[xs].configuration.scaling_factor),
LogType="Number",
LogUnit="",
)
ws_list.append(ws_tmp)
return ws_list
[docs]
def read_log(ws: api.Workspace, name: str, target_units: str = "", assumed_units: str = ""):
"""Read a log value, taking care of units.
If the log entry has no units, the target units are assumed.
Parameters
----------
ws:
Mantid workspace
name:
Name of the property to read
target_units:
Units to convert to
assumed_units:
Units of origin, if not specified in the log itself
"""
_units = {
"m": {
"mm": 1000.0,
},
"mm": {
"m": 0.001,
},
"deg": {
"rad": math.pi / 180.0,
},
"rad": {
"deg": 180.0 / math.pi,
},
}
prop = ws.getRun().getProperty(name)
value = prop.getStatistics().mean
# If the property has units we don't recognize, use the assumed units
units = prop.units if prop.units in _units else assumed_units
if units in _units and target_units in _units[units]:
return value * _units[units][target_units]
return value