Source code for quicknxs.models.data_manipulation

"""Methods used to process data, usually calling Mantid."""

import logging
import math
import time

import h5py
import mantid
import mantid.simpleapi as api
import numpy as np
from mantid.api import MatrixWorkspace

from quicknxs.exceptions import NormalizeToUnityQCutoffError
from quicknxs.models.data_set import NexusData, NexusMetaData
from quicknxs.models.instrument import Instrument


[docs] def generate_short_script(reduction_list): """Generate a simple reduction script for Mantid.""" if len(reduction_list) == 0: return "# No data in reduction list\n" xs = list(reduction_list[0].cross_sections.keys())[0] script = f"# Mantid version {mantid.__version__}\n" script += f"# Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n" script += "from mantid.simpleapi import *\n" logging.info("Cross section for script %s", xs) for i in range(len(reduction_list)): # If we couldn't calculate the reflectivity, we won't have a workspace available if reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup is None: logging.info(" No workspace: %s", i) continue ws_name = str(reduction_list[i].cross_sections[xs]._reflectivity_workspacegroup) # NOTE: It is possible that only one cross section is present in the run, therefore # api.mtd[ws_name] could be a Workspace2D instead of a workspace group. # Given that a modernization process is scheduled, we are going to do a ducktape # fixing here. contain_single_crosssection = False try: if len(api.mtd[ws_name]) == 0: logging.info(" No entry in %s workspace group", ws_name) except TypeError: # NOTE: based on the information we have, the only possible data type here should be # a workspace2D contain_single_crosssection = True logging.info(" single cross section in %s (Workspace2D)", ws_name) # short-hand it ws_iterable = [api.mtd[ws_name]] if contain_single_crosssection else api.mtd[ws_name] script += f"# Run:{reduction_list[i].cross_sections[xs].run_number}\n" script_text = api.GeneratePythonScript(ws_iterable[0]) script += script_text.replace(", ", ",\n ") script += "\n\n" script += "# Crop points on each end\n" q = ws_iterable[0].readX(0) p_0 = reduction_list[i].cross_sections[xs].configuration.cut_first_n_points p_f = len(q) - reduction_list[i].cross_sections[xs].configuration.cut_last_n_points - 1 q0 = q[p_0] qf = q[p_f] logging.info("%s %s %s %s", q0, qf, p_0, p_f) for item in ws_iterable: script += f"CropWorkspace(InputWorkspace='{str(item)}', XMin={q0}, XMax={qf}, " script += f"OutputWorkspace='{str(item)}')\n" script += f"Scale(InputWorkspace='{str(item)}', Operation='Multiply', " script += f"Factor={reduction_list[i].cross_sections[xs].configuration.scaling_factor}, " script += f"OutputWorkspace='{str(item)}')\n\n" return script
[docs] def generate_script(reduction_list: list[NexusData], pol_state: str): """Generate a Mantid script for the reflectivity reduction. Parameters ---------- reduction_list: list of NexusData objects pol_state: cross-section name """ ws_list = get_scaled_workspaces(reduction_list, pol_state) # If the reflectivity calculation failed, we may not have data to work with # for this cross-section. if not ws_list: return "" script = f"# Cross-section: {pol_state}\n" for ws in ws_list: script += f"# Run:{ws.getRunNumber()}\n" script_text = api.GeneratePythonScript(ws) script += script_text.replace(", ", ",\n ") script += "\n" return script
def _prepare_workspace_for_stitching( cross_sections: dict, xs_input: str, global_fit: bool, ws_name: str ) -> MatrixWorkspace: """Create a workspace from a CrossSectionData object that we can call Stitch1D on. Parameters ---------- cross_sections: Dictionary with cross-section name and CrossSectionData objects xs: Which cross-section to use global_fit: if True, merge data from all cross-sections ws_name: Name for the output Mantid workspace """ if global_fit: xs_names = cross_sections.keys() else: xs_names = [xs_input] ws_list = [] for xs in xs_names: cross_section = cross_sections[xs] n_total = len(cross_section.q) p_0 = cross_section.configuration.cut_first_n_points p_n = n_total - cross_section.configuration.cut_last_n_points # remove leading/trailing indices where cross_section._r is masked while p_0 < p_n and np.ma.is_masked(cross_section._r[p_0]): p_0 += 1 while p_n > p_0 and np.ma.is_masked(cross_section._r[p_n - 1]): p_n -= 1 if p_n <= p_0: raise ValueError(f"No valid data points in cross-section {xs}") ws_xs = api.CreateWorkspace( DataX=cross_section.q[p_0:p_n], DataY=cross_section._r[p_0:p_n], DataE=cross_section._dr[p_0:p_n], OutputWorkspace="tmp_prepare_stitching_" + xs, ) ws_list.append(str(ws_xs)) ws_merge = api.MergeRuns(ws_list) ws_merge.setDistribution(True) ws = api.ConvertToHistogram(ws_merge, OutputWorkspace=ws_name) # Delete temporary workspaces api.DeleteWorkspaces(ws_list) api.DeleteWorkspace(ws_merge) return ws def _get_stitching_overlap_region(ws_lo: MatrixWorkspace, ws_hi: MatrixWorkspace, n_points_outside_overlap: int = 3): """Get the x boundary values of the overlap region between two workspaces. Parameters ---------- ws_lo: The low-Q reflectivity curve ws_hi: The high-Q reflectivity curve n_points_outside_overlap: Number of additional points on each end of the overlap region to include in the fit Returns ------- tuple (min, max) defining the overlap region Raises ------ ValueError: If the x-range of workspace ws_lo is higher than the x-range of ws_hi """ if ws_lo.readX(0)[0] > ws_hi.readX(0)[0] or ws_lo.readX(0)[-1] > ws_hi.readX(0)[-1]: raise ValueError("x-range for ws_lo must not be higher than x-range for ws_hi") # get initial boundaries from lowest Q of the high-Q curve and highest Q of the low-Q curve x_lo_max = ws_lo.readX(0)[-1] x_hi_min = ws_hi.readX(0)[0] if x_hi_min > x_lo_max: # no overlap, use only extra points min_idx_lo = -n_points_outside_overlap max_idx_hi = n_points_outside_overlap - 1 else: # use overlap plus extra points min_idx_lo = np.where(ws_lo.readX(0) >= x_hi_min)[0][0] min_idx_lo = max(min_idx_lo - n_points_outside_overlap, 0) max_idx_hi = np.where(ws_hi.readX(0) <= x_lo_max)[0][-1] max_idx_hi = min(max_idx_hi + n_points_outside_overlap, ws_hi.readX(0).size - 1) # get x values corresponding to the found indices xmin = ws_lo.readX(0)[min_idx_lo] xmax = ws_hi.readX(0)[max_idx_hi] return xmin, xmax def _get_polynomial_fit_stitching_scaling_factor( ws_lo: MatrixWorkspace, ws_hi: MatrixWorkspace, n_polynom: int, n_points_outside_overlap: int = 3, ): """Get the scaling factor for stitching two curves by fitting a polynomial and a scaling factor. For example, if the polynomial degree is 3, the scaling factor is obtained by minimizing the function f(ws_lo, ws_hi) = sum_{ws_lo}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3} + scaling_factor * sum_{ws_hi}{A0 + A1*x_i + A2*x_i^2 + A3*x_i^3} where the independent variables are A0, A1, A2, A3 and scaling_factor. Parameters ---------- ws_lo: Low-Q reflectivity curve ws_hi: High-Q reflectivity curve n_polynom: Degree of the polynomial to fit the curves to n_points_outside_overlap: Number of additional points on each end of the overlap region to include in the fit Returns ------- Dictionary with keys `scale_factor_value`, `scale_factor_error`, `polynomial_coeffients` Raises ------ RunTimeError: If there are too few data points compared to the number of independent variables ValueError: If the x-range of workspace ws_lo is higher than the x-range of ws_hi """ # crop to the overlap region of the curves plus extra points xmin, xmax = _get_stitching_overlap_region(ws_lo, ws_hi, n_points_outside_overlap) ws_lo_crop = api.CropWorkspace(InputWorkspace=ws_lo, XMin=xmin, XMax=xmax) ws_hi_crop = api.CropWorkspace(InputWorkspace=ws_hi, XMin=xmin, XMax=xmax) # build the strings to represent the polynomial, initial values and ties in the Mantid function definition formula_str = "A0" initial_val_str = "A0=1" ties_str = "f1.A0=f0.A0" for i in range(1, n_polynom + 1): formula_str += f"+A{i}*x^{i}" # "A0 + A1*x + A2*x^2 + ..." initial_val_str += f", A{i}=1" # "A0=1, A1=1, ..." ties_str += f",f1.A{i}=f0.A{i}" # "f1.A0=f0.A0, f1.A1=f0.A1, ..." poly_func = f";name=UserFunction, Formula={formula_str}, {initial_val_str}, $domains=0" scaled_poly_func = ( f";name=UserFunction, Formula=poly_scale*({formula_str}), poly_scale=1, {initial_val_str}, $domains=1" ) ties = f";ties=({ties_str})" multi_func = "composite=MultiDomainFunction, NumDeriv=1" + poly_func + scaled_poly_func + ties # fit MultiDomain function to the low-Q and high-Q workspaces fit = api.Fit( Function=multi_func, InputWorkspace=ws_lo_crop, WorkspaceIndex=0, InputWorkspace_1=ws_hi_crop, WorkspaceIndex_1=0, Output="fit", ) # the scaling factor poly_scale scaled the polynomial to match the high-Q reflectivity curve, therefore, take the # inverse to get the scaling factor for matching the high-Q reflectivity curve with the low-Q curve output = {} i_scale = n_polynom + 1 poly_scale = fit.Function.function.getParameterValue(i_scale) poly_scale_error = fit.Function.function.getError(i_scale) output["scale_factor_value"] = 1 / poly_scale output["scale_factor_error"] = poly_scale_error / poly_scale**2 output["polynomial_coeff"] = fit.OutputParameters.column(1)[: n_polynom + 1] # delete temporary workspaces api.DeleteWorkspace(ws_lo_crop) api.DeleteWorkspace(ws_hi_crop) return output
[docs] def stitch_reflectivity( reduction_list: list[NexusData], xs: str | None = None, normalize_to_unity: bool = True, q_cutoff: float = 0.01, global_fit: bool = False, poly_degree: int | None = None, poly_points: int = 3, ): """Stitch and normalize data sets. Parameters ---------- reduction_list: list of data sets to stitch xs: name of the cross-section to use for the first data set normalize_to_unity: if `True`, the specular ridge will be normalized to 1 q_cutoff: used if `normalize_to_unity` = `True`, data with q < `q_cutoff` are part of the specular ridge global_fit: if `True`, use data from all cross-sections to calculate scaling factors poly_degree: if not `None`, find the scaling factor by simultaneously fitting a polynomial and scaling factor to the two curves poly_points: number of additional points on each end of the overlap region to include in the fit Returns ------- Tuple (list of scaling factors, list of scaling factor errors) Raises ------ NormalizeToUnityQCutoffError: if `normalize_to_unity` = `True`, but there is no data below `q_cutoff` """ if not reduction_list: return [] # Select the cross-section we will use to determine the scaling factors if xs is None: xs = list(reduction_list[0].cross_sections.keys())[0] # First, determine the overall scaling factor as needed scaling_factor = 1.0 scaling_error = 0.0 if normalize_to_unity: # Calculate scaling factor for normalizing the specular ridge to 1 idx_list = reduction_list[0].cross_sections[xs].q < q_cutoff if not any(idx_list): raise NormalizeToUnityQCutoffError( f"""No data below Q cutoff.\n Critical Q cutoff: {q_cutoff}\n Smallest Q value: {reduction_list[0].cross_sections[xs].q.min():.5f}""" ) total = 0 weights = 0 for i in range(len(reduction_list[0].cross_sections[xs]._r)): if idx_list[i]: w = 1.0 / float(reduction_list[0].cross_sections[xs]._dr[i]) ** 2 total += w * float(reduction_list[0].cross_sections[xs]._r[i]) weights += w if weights > 0 and total > 0: # Since weighted_average = \sum{ r_i / dr_i^2 } / \sum{ 1 / dr_i^2 }, the error is # weighted_average_error = 1 / \sqrt{ \sum{ 1 / dr_i^2 } } = 1 / \sqrt{weights} weighted_average = total / weights weighted_average_error = 1.0 / np.sqrt(weights) # Scaling factor S = 1 / u, error dS = du / u^2 scaling_factor = 1 / weighted_average scaling_error = weighted_average_error / weighted_average**2 reduction_list[0].set_parameter("scaling_factor", scaling_factor) reduction_list[0].set_parameter("scaling_error", scaling_error) else: scaling_factor = reduction_list[0].cross_sections[xs].configuration.scaling_factor # Stitch the data sets together running_scale = scaling_factor running_error = scaling_error scaling_factors = [running_scale] scaling_errors = [running_error] for i in range(len(reduction_list) - 1): # Low-Q data set _previous_ws = _prepare_workspace_for_stitching( reduction_list[i].cross_sections, xs, global_fit, "low_q_workspace" ) # High-Q data set ws = _prepare_workspace_for_stitching(reduction_list[i + 1].cross_sections, xs, global_fit, "high_q_workspace") # Determine overlap between workspaces after removing leading/trailing zeroes # Note: Stitch1D can find the overlap automatically, but it fails if the second workspace x_min is smaller # than the first workspace x_min, which can arise especially with constant-Q binning start_overlap = max(_previous_ws.readX(0)[0], ws.readX(0)[0]) end_overlap = min(_previous_ws.readX(0)[-1], ws.readX(0)[-1]) if isinstance(poly_degree, int) and poly_degree > 0: output_poly = _get_polynomial_fit_stitching_scaling_factor(_previous_ws, ws, poly_degree, poly_points) scale = output_poly["scale_factor_value"] scale_error = output_poly["scale_factor_error"] else: _, scale = api.Stitch1D( _previous_ws, ws, StartOverlap=start_overlap, EndOverlap=end_overlap, OutputScalingWorkspace="ws_stitching_scale", ) scale_error = api.mtd["ws_stitching_scale"].readE(0)[0] # Calculate the error in the product of two scaling factors, f1 * f2, as \sqrt{(df2 * f1)^2 + (df1 * f2)^2} running_error = np.sqrt((scale_error * running_scale) ** 2 + (running_error * scale) ** 2) scaling_errors.append(running_error) reduction_list[i + 1].set_parameter("scaling_error", running_error) running_scale *= scale scaling_factors.append(running_scale) reduction_list[i + 1].set_parameter("scaling_factor", running_scale) return scaling_factors, scaling_errors
[docs] def merge_reflectivity(reduction_list, xs, q_min=0.001, q_step=-0.01): """ Combine the workspaces for a given cross-section into a single workspace. TODO: trim workspaces trim_first = [item.cross_sections[pol_state].configuration.cut_first_n_points for item in self.data_manager.reduction_list] trim_last = [item.cross_sections[pol_state].configuration.cut_last_n_points for item in self.data_manager.reduction_list] """ ws_list = [] scaling_factors = [] q_max = q_min for i in range(len(reduction_list)): # If we couldn't calculate the reflectivity, we won't have a workspace available if reduction_list[i].cross_sections[xs].reflectivity_workspace is None: continue _, _q_max = reduction_list[i].get_q_range() q_max = max(q_max, _q_max) ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace) # Stitch1DMany only scales workspaces relative to the first one if i == 0: api.Scale( InputWorkspace=ws_name, OutputWorkspace=ws_name + "_histo", factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor, Operation="Multiply", ) api.ConvertToHistogram(InputWorkspace=ws_name + "_histo", OutputWorkspace=ws_name + "_histo") else: scaling_factors.append(reduction_list[i].cross_sections[xs].configuration.scaling_factor) api.ConvertToHistogram(InputWorkspace=ws_name, OutputWorkspace=ws_name + "_histo") ws_list.append(ws_name + "_histo") params = f"{q_min}, {q_step}, {q_max}" if len(ws_list) > 1: merged_ws, _ = api.Stitch1DMany( InputWorkspaces=ws_list, Params=params, UseManualScaleFactors=True, ManualScaleFactors=scaling_factors, OutputWorkspace=ws_name + "_merged", ) # sort WorkspaceGroup by theta merged_ws = sorted(merged_ws, key=lambda ws: ws.getRun().getProperty("two_theta").value) elif len(ws_list) == 1: merged_ws = api.CloneWorkspace(ws_list[0], OutputWorkspace=ws_name + "_merged") else: return None # Remove temporary workspaces for ws in ws_list: api.DeleteWorkspace(ws) api.SaveAscii(InputWorkspace=merged_ws, Filename="/tmp/test.txt") return merged_ws
[docs] def get_scaled_workspaces(reduction_list: list[NexusData], xs: str): """Return a list of scaled workspaces.""" ws_list = [] for i in range(len(reduction_list)): # If we couldn't calculate the reflectivity, we won't have a workspace available if reduction_list[i].cross_sections[xs].reflectivity_workspace is None: continue ws_name = str(reduction_list[i].cross_sections[xs].reflectivity_workspace) ws_tmp = api.Scale( InputWorkspace=ws_name, OutputWorkspace=ws_name + "_scaled", factor=reduction_list[i].cross_sections[xs].configuration.scaling_factor, Operation="Multiply", ) api.AddSampleLog( Workspace=ws_tmp, LogName="scaling_factor", LogText=str(reduction_list[i].cross_sections[xs].configuration.scaling_factor), LogType="Number", LogUnit="", ) ws_list.append(ws_tmp) return ws_list
[docs] def extract_metadata(file_path: str | None = None, cross_section_data=None): """Get mid Q-value from metadata.""" metadata = NexusMetaData() if cross_section_data is not None: metadata.mid_q = Instrument.mid_q_value(cross_section_data.event_workspace) metadata.is_direct_beam = cross_section_data.is_direct_beam return metadata elif file_path is None: raise RuntimeError("Either a file path or a data object must be supplied") nxs = h5py.File(file_path, mode="r") keys = list(nxs.keys()) keys.sort() nxs.close() if len(keys) == 0: logging.error("No entry in data file %s", file_path) return metadata try: ws = api.LoadEventNexus(str(file_path), MetaDataOnly=True, NXentryName=str(keys[0])) metadata.mid_q = Instrument.mid_q_value(ws) metadata.is_direct_beam = Instrument.check_direct_beam(ws) except (RuntimeError, IndexError): logging.error("Exception extracting metadata") raise RuntimeError(f"Could not load file {file_path} [{keys[0]}]") return metadata
[docs] def read_log(ws: api.Workspace, name: str, target_units: str = "", assumed_units: str = ""): """Read a log value, taking care of units. If the log entry has no units, the target units are assumed. Parameters ---------- ws: Mantid workspace name: Name of the property to read target_units: Units to convert to assumed_units: Units of origin, if not specified in the log itself """ _units = { "m": { "mm": 1000.0, }, "mm": { "m": 0.001, }, "deg": { "rad": math.pi / 180.0, }, "rad": { "deg": 180.0 / math.pi, }, } prop = ws.getRun().getProperty(name) value = prop.getStatistics().mean # If the property has units we don't recognize, use the assumed units units = prop.units if prop.units in _units else assumed_units if units in _units and target_units in _units[units]: return value * _units[units][target_units] return value