"""Methods used to process data, usually calling Mantid."""
import logging
import math
import time
import h5py
import mantid
import mantid.simpleapi as api
import numpy as np
from mantid.api import MatrixWorkspace
from quicknxs.exceptions import NormalizeToUnityQCutoffError
from quicknxs.models.data_set import NexusData, NexusMetaData
from quicknxs.models.instrument import Instrument
[docs]
def generate_short_script(reduction_list):
"""Generate a simple reduction script for Mantid."""
if len(reduction_list) == 0:
return "# No data in reduction list\n"
xs = list(reduction_list[0].cross_sections.keys())[0]
script = f"# Mantid version {mantid.__version__}\n"
script += f"# Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
script += "from mantid.simpleapi import *\n"
logging.info("Cross section for script %s", xs)
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup is None:
logging.info(" No workspace: %s", i)
continue
ws_name = str(reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup)
# NOTE: It is possible that only one cross section is present in the run, therefore
# api.mtd[ws_name] could be a Workspace2D instead of a workspace group.
# Given that a modernization process is scheduled, we are going to do a ducktape
# fixing here.
contain_single_crosssection = False
try:
if len(api.mtd[ws_name]) == 0:
logging.info(" No entry in %s workspace group", ws_name)
except TypeError:
# NOTE: based on the information we have, the only possible data type here should be
# a workspace2D
contain_single_crosssection = True
logging.info(" single cross section in %s (Workspace2D)", ws_name)
# short-hand it
ws_iterable = [api.mtd[ws_name]] if contain_single_crosssection else api.mtd[ws_name]
script += f"# Run:{reduction_list[i].cross_sections[xs].run_number}\n"
script_text = api.GeneratePythonScript(ws_iterable[0])
script += script_text.replace(", ", ",\n ")
script += "\n\n"
script += "# Crop points on each end\n"
q = ws_iterable[0].readX(0)
p_0 = reduction_list[i].cross_sections[xs].configuration.cut_first_n_points
p_f = len(q) - reduction_list[i].cross_sections[xs].configuration.cut_last_n_points - 1
q0 = q[p_0]
qf = q[p_f]
logging.info("%s %s %s %s", q0, qf, p_0, p_f)
for item in ws_iterable:
script += f"CropWorkspace(InputWorkspace='{str(item)}', XMin={q0}, XMax={qf}, "
script += f"OutputWorkspace='{str(item)}')\n"
script += f"Scale(InputWorkspace='{str(item)}', Operation='Multiply', "
script += f"Factor={reduction_list[i].cross_sections[xs].configuration.scaling_factor}, "
script += f"OutputWorkspace='{str(item)}')\n\n"
return script
[docs]
def generate_script(reduction_list: list[NexusData], pol_state: str):
"""Generate a Mantid script for the reflectivity reduction.
Parameters
----------
reduction_list:
list of NexusData objects
pol_state:
cross-section name
"""
ws_list = get_scaled_workspaces(reduction_list, pol_state)
# If the reflectivity calculation failed, we may not have data to work with
# for this cross-section.
if not ws_list:
return ""
script = f"# Cross-section: {pol_state}\n"
for ws in ws_list:
script += f"# Run:{ws.getRunNumber()}\n"
script_text = api.GeneratePythonScript(ws)
script += script_text.replace(", ", ",\n ")
script += "\n"
return script
def _prepare_workspace_for_stitching(
cross_sections: dict, xs_input: str, global_fit: bool, ws_name: str
) -> MatrixWorkspace:
"""Create a workspace from a CrossSectionData object that we can call Stitch1D on.
Parameters
----------
cross_sections:
Dictionary with cross-section name and CrossSectionData objects
xs:
Which cross-section to use
global_fit:
if True, merge data from all cross-sections
ws_name:
Name for the output Mantid workspace
"""
if global_fit:
xs_names = cross_sections.keys()
else:
xs_names = [xs_input]
ws_list = []
for xs in xs_names:
cross_section = cross_sections[xs]
n_total = len(cross_section.q)
p_0 = cross_section.configuration.cut_first_n_points
p_n = n_total - cross_section.configuration.cut_last_n_points
# remove leading/trailing indices where cross_section._r is masked
while p_0 < p_n and np.ma.is_masked(cross_section._r[p_0]):
p_0 += 1
while p_n > p_0 and np.ma.is_masked(cross_section._r[p_n - 1]):
p_n -= 1
if p_n <= p_0:
raise ValueError(f"No valid data points in cross-section {xs}")
ws_xs = api.CreateWorkspace(
DataX=cross_section.q[p_0:p_n],
DataY=cross_section._r[p_0:p_n],
DataE=cross_section._dr[p_0:p_n],
OutputWorkspace="tmp_prepare_stitching_" + xs,
)
ws_list.append(str(ws_xs))
ws_merge = api.MergeRuns(ws_list)
ws_merge.setDistribution(True)
ws = api.ConvertToHistogram(ws_merge, OutputWorkspace=ws_name)
# Delete temporary workspaces
api.DeleteWorkspaces(ws_list)
api.DeleteWorkspace(ws_merge)
return ws
def _get_stitching_overlap_region(ws_lo: MatrixWorkspace, ws_hi: MatrixWorkspace, n_points_outside_overlap: int = 3):
"""Get the x boundary values of the overlap region between two workspaces.
Parameters
----------
ws_lo:
The low-Q reflectivity curve
ws_hi:
The high-Q reflectivity curve
n_points_outside_overlap:
Number of additional points on each end of the overlap region to include in the fit
Returns
-------
tuple
(min, max) defining the overlap region
Raises
------
ValueError:
If the x-range of workspace ws_lo is higher than the x-range of ws_hi
"""
if ws_lo.readX(0)[0] > ws_hi.readX(0)[0] or ws_lo.readX(0)[-1] > ws_hi.readX(0)[-1]:
raise ValueError("x-range for ws_lo must not be higher than x-range for ws_hi")
# get initial boundaries from lowest Q of the high-Q curve and highest Q of the low-Q curve
x_lo_max = ws_lo.readX(0)[-1]
x_hi_min = ws_hi.readX(0)[0]
if x_hi_min > x_lo_max:
# no overlap, use only extra points
min_idx_lo = -n_points_outside_overlap
max_idx_hi = n_points_outside_overlap - 1
else:
# use overlap plus extra points
min_idx_lo = np.where(ws_lo.readX(0) >= x_hi_min)[0][0]
min_idx_lo = max(min_idx_lo - n_points_outside_overlap, 0)
max_idx_hi = np.where(ws_hi.readX(0) <= x_lo_max)[0][-1]
max_idx_hi = min(max_idx_hi + n_points_outside_overlap, ws_hi.readX(0).size - 1)
# get x values corresponding to the found indices
xmin = ws_lo.readX(0)[min_idx_lo]
xmax = ws_hi.readX(0)[max_idx_hi]
return xmin, xmax
def _get_polynomial_fit_stitching_scaling_factor(
ws_lo: MatrixWorkspace,
ws_hi: MatrixWorkspace,
n_polynom: int,
n_points_outside_overlap: int = 3,
):
"""Get the scaling factor for stitching two curves by fitting a polynomial and a scaling factor.
For example, if the polynomial degree is 3, the scaling factor is obtained by minimizing the function
f(ws_lo, ws_hi) = sum_{ws_lo}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3} +
scaling_factor * sum_{ws_hi}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3}
where the independent variables are A0, A1, A2, A3 and scaling_factor.
Parameters
----------
ws_lo:
Low-Q reflectivity curve
ws_hi:
High-Q reflectivity curve
n_polynom:
Degree of the polynomial to fit the curves to
n_points_outside_overlap:
Number of additional points on each end of the overlap region to include in the fit
Returns
-------
Dictionary with keys `scale_factor_value`, `scale_factor_error`, `polynomial_coeffients`
Raises
------
RunTimeError:
If there are too few data points compared to the number of independent variables
ValueError:
If the x-range of workspace ws_lo is higher than the x-range of ws_hi
"""
# crop to the overlap region of the curves plus extra points
xmin, xmax = _get_stitching_overlap_region(ws_lo, ws_hi, n_points_outside_overlap)
ws_lo_crop = api.CropWorkspace(InputWorkspace=ws_lo, XMin=xmin, XMax=xmax)
ws_hi_crop = api.CropWorkspace(InputWorkspace=ws_hi, XMin=xmin, XMax=xmax)
# build the strings to represent the polynomial, initial values and ties in the Mantid function definition
formula_str = "A0"
initial_val_str = "A0=1"
ties_str = "f1.A0=f0.A0"
for i in range(1, n_polynom + 1):
formula_str += f"+A{i}*x^{i}" # "A0 + A1*x + A2*x^2 + ..."
initial_val_str += f", A{i}=1" # "A0=1, A1=1, ..."
ties_str += f",f1.A{i}=f0.A{i}" # "f1.A0=f0.A0, f1.A1=f0.A1, ..."
poly_func = f";name=UserFunction, Formula={formula_str}, {initial_val_str}, $domains=0"
scaled_poly_func = (
f";name=UserFunction, Formula=poly_scale*({formula_str}), poly_scale=1, {initial_val_str}, $domains=1"
)
ties = f";ties=({ties_str})"
multi_func = "composite=MultiDomainFunction, NumDeriv=1" + poly_func + scaled_poly_func + ties
# fit MultiDomain function to the low-Q and high-Q workspaces
fit = api.Fit(
Function=multi_func,
InputWorkspace=ws_lo_crop,
WorkspaceIndex=0,
InputWorkspace_1=ws_hi_crop,
WorkspaceIndex_1=0,
Output="fit",
)
# the scaling factor poly_scale scaled the polynomial to match the high-Q reflectivity curve, therefore, take the
# inverse to get the scaling factor for matching the high-Q reflectivity curve with the low-Q curve
output = {}
i_scale = n_polynom + 1
poly_scale = fit.Function.function.getParameterValue(i_scale)
poly_scale_error = fit.Function.function.getError(i_scale)
output["scale_factor_value"] = 1 / poly_scale
output["scale_factor_error"] = poly_scale_error / poly_scale**2
output["polynomial_coeff"] = fit.OutputParameters.column(1)[: n_polynom + 1]
# delete temporary workspaces
api.DeleteWorkspace(ws_lo_crop)
api.DeleteWorkspace(ws_hi_crop)
return output
[docs]
def stitch_reflectivity(
reduction_list: list[NexusData],
xs: str | None = None,
normalize_to_unity: bool = True,
q_cutoff: float = 0.01,
global_fit: bool = False,
poly_degree: int | None = None,
poly_points: int = 3,
):
"""Stitch and normalize data sets.
Parameters
----------
reduction_list:
list of data sets to stitch
xs:
name of the cross-section to use for the first data set
normalize_to_unity:
if `True`, the specular ridge will be normalized to 1
q_cutoff:
used if `normalize_to_unity` = `True`, data with q < `q_cutoff` are part of the specular ridge
global_fit:
if `True`, use data from all cross-sections to calculate scaling factors
poly_degree:
if not `None`, find the scaling factor by simultaneously fitting a polynomial and scaling factor to the two curves
poly_points:
number of additional points on each end of the overlap region to include in the fit
Returns
-------
Tuple
(list of scaling factors, list of scaling factor errors)
Raises
------
NormalizeToUnityQCutoffError:
if `normalize_to_unity` = `True`, but there is no data below `q_cutoff`
"""
if not reduction_list:
return []
# Select the cross-section we will use to determine the scaling factors
if xs is None:
xs = list(reduction_list[0].cross_sections.keys())[0]
# First, determine the overall scaling factor as needed
scaling_factor = 1.0
scaling_error = 0.0
if normalize_to_unity:
# Calculate scaling factor for normalizing the specular ridge to 1
idx_list = reduction_list[0].cross_sections[xs].q < q_cutoff
if not any(idx_list):
raise NormalizeToUnityQCutoffError(
f"""No data below Q cutoff.\n
Critical Q cutoff: {q_cutoff}\n
Smallest Q value: {reduction_list[0].cross_sections[xs].q.min():.5f}"""
)
total = 0
weights = 0
for i in range(len(reduction_list[0].cross_sections[xs]._r)):
if idx_list[i]:
w = 1.0 / float(reduction_list[0].cross_sections[xs]._dr[i]) ** 2
total += w * float(reduction_list[0].cross_sections[xs]._r[i])
weights += w
if weights > 0 and total > 0:
# Since weighted_average = \sum{ r_i / dr_i^2 } / \sum{ 1 / dr_i^2 }, the error is
# weighted_average_error = 1 / \sqrt{ \sum{ 1 / dr_i^2 } } = 1 / \sqrt{weights}
weighted_average = total / weights
weighted_average_error = 1.0 / np.sqrt(weights)
# Scaling factor S = 1 / u, error dS = du / u^2
scaling_factor = 1 / weighted_average
scaling_error = weighted_average_error / weighted_average**2
reduction_list[0].set_parameter("scaling_factor", scaling_factor)
reduction_list[0].set_parameter("scaling_error", scaling_error)
else:
scaling_factor = reduction_list[0].cross_sections[xs].configuration.scaling_factor
# Stitch the data sets together
running_scale = scaling_factor
running_error = scaling_error
scaling_factors = [running_scale]
scaling_errors = [running_error]
for i in range(len(reduction_list) - 1):
# Low-Q data set
_previous_ws = _prepare_workspace_for_stitching(
reduction_list[i].cross_sections, xs, global_fit, "low_q_workspace"
)
# High-Q data set
ws = _prepare_workspace_for_stitching(reduction_list[i + 1].cross_sections, xs, global_fit, "high_q_workspace")
# Determine overlap between workspaces after removing leading/trailing zeroes
# Note: Stitch1D can find the overlap automatically, but it fails if the second workspace x_min is smaller
# than the first workspace x_min, which can arise especially with constant-Q binning
start_overlap = max(_previous_ws.readX(0)[0], ws.readX(0)[0])
end_overlap = min(_previous_ws.readX(0)[-1], ws.readX(0)[-1])
if isinstance(poly_degree, int) and poly_degree > 0:
output_poly = _get_polynomial_fit_stitching_scaling_factor(_previous_ws, ws, poly_degree, poly_points)
scale = output_poly["scale_factor_value"]
scale_error = output_poly["scale_factor_error"]
else:
_, scale = api.Stitch1D(
_previous_ws,
ws,
StartOverlap=start_overlap,
EndOverlap=end_overlap,
OutputScalingWorkspace="ws_stitching_scale",
)
scale_error = api.mtd["ws_stitching_scale"].readE(0)[0]
# Calculate the error in the product of two scaling factors, f1 * f2, as \sqrt{(df2 * f1)^2 + (df1 * f2)^2}
running_error = np.sqrt((scale_error * running_scale) ** 2 + (running_error * scale) ** 2)
scaling_errors.append(running_error)
reduction_list[i + 1].set_parameter("scaling_error", running_error)
running_scale *= scale
scaling_factors.append(running_scale)
reduction_list[i + 1].set_parameter("scaling_factor", running_scale)
return scaling_factors, scaling_errors
[docs]
def merge_reflectivity(reduction_list, xs, q_min=0.001, q_step=-0.01):
"""
Combine the workspaces for a given cross-section into a single workspace.
TODO: trim workspaces
trim_first = [item.cross_sections[pol_state].configuration.cut_first_n_points for item in self.data_manager.reduction_list]
trim_last = [item.cross_sections[pol_state].configuration.cut_last_n_points for item in self.data_manager.reduction_list]
"""
ws_list = []
scaling_factors = []
q_max = q_min
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs].reflectivity_workspace is None:
continue
_, _q_max = reduction_list[i].get_q_range()
q_max = max(q_max, _q_max)
ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace)
# Stitch1DMany only scales workspaces relative to the first one
if i == 0:
api.Scale(
InputWorkspace=ws_name,
OutputWorkspace=ws_name + "_histo",
factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor,
Operation="Multiply",
)
api.ConvertToHistogram(InputWorkspace=ws_name + "_histo", OutputWorkspace=ws_name + "_histo")
else:
scaling_factors.append(reduction_list[i].cross_sections[xs].configuration.scaling_factor)
api.ConvertToHistogram(InputWorkspace=ws_name, OutputWorkspace=ws_name + "_histo")
ws_list.append(ws_name + "_histo")
params = f"{q_min}, {q_step}, {q_max}"
if len(ws_list) > 1:
merged_ws, _ = api.Stitch1DMany(
InputWorkspaces=ws_list,
Params=params,
UseManualScaleFactors=True,
ManualScaleFactors=scaling_factors,
OutputWorkspace=ws_name + "_merged",
)
# sort WorkspaceGroup by theta
merged_ws = sorted(merged_ws, key=lambda ws: ws.getRun().getProperty("two_theta").value)
elif len(ws_list) == 1:
merged_ws = api.CloneWorkspace(ws_list[0], OutputWorkspace=ws_name + "_merged")
else:
return None
# Remove temporary workspaces
for ws in ws_list:
api.DeleteWorkspace(ws)
api.SaveAscii(InputWorkspace=merged_ws, Filename="/tmp/test.txt")
return merged_ws
[docs]
def get_scaled_workspaces(reduction_list: list[NexusData], xs: str):
"""Return a list of scaled workspaces."""
ws_list = []
for i in range(len(reduction_list)):
# If we couldn't calculate the reflectivity, we won't have a workspace available
if reduction_list[i].cross_sections[xs].reflectivity_workspace is None:
continue
ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace)
ws_tmp = api.Scale(
InputWorkspace=ws_name,
OutputWorkspace=ws_name + "_scaled",
factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor,
Operation="Multiply",
)
api.AddSampleLog(
Workspace=ws_tmp,
LogName="scaling_factor",
LogText=str(reduction_list[i].cross_sections[xs].configuration.scaling_factor),
LogType="Number",
LogUnit="",
)
ws_list.append(ws_tmp)
return ws_list
[docs]
def read_log(ws: api.Workspace, name: str, target_units: str = "", assumed_units: str = ""):
"""Read a log value, taking care of units.
If the log entry has no units, the target units are assumed.
Parameters
----------
ws:
Mantid workspace
name:
Name of the property to read
target_units:
Units to convert to
assumed_units:
Units of origin, if not specified in the log itself
"""
_units = {
"m": {
"mm": 1000.0,
},
"mm": {
"m": 0.001,
},
"deg": {
"rad": math.pi / 180.0,
},
"rad": {
"deg": 180.0 / math.pi,
},
}
prop = ws.getRun().getProperty(name)
value = prop.getStatistics().mean
# If the property has units we don't recognize, use the assumed units
units = prop.units if prop.units in _units else assumed_units
if units in _units and target_units in _units[units]:
return value * _units[units][target_units]
return value