Source code for quicknxs.interfaces.data_handling.data_info

"""Meta-data information for MR reduction."""
# pylint: disable=too-few-public-methods, wrong-import-position, too-many-instance-attributes, wrong-import-order

import copy
import logging
import math
import sys
import time
from typing import Tuple

import mantid.simpleapi as api
import numpy as np
import scipy.optimize as opt
from scipy import ndimage

from quicknxs.interfaces.data_handling.peak_finding import find_peaks, peak_prominences, peak_widths

NX_PIXELS = 304
NY_PIXELS = 256


[docs] class DataInfo(object): """Class to hold the relevant information from a run (scattering or direct beam).""" peak_range_offset = 0 tolerance = 0.02 def __init__(self, ws, cross_section, configuration): self.cross_section = cross_section self.run_number = ws.getRunNumber() self.is_direct_beam = False self.data_type = 1 self.peak_position = 0 self.peak_range = [0, 0] self.low_res_range = [0, 0] self.background = configuration.bck_roi self.n_events_cutoff = 100 # ROI information self.roi_peak = [0, 0] self.roi_low_res = [0, 0] self.roi_background = [0, 0] # Options to override the ROI self.forced_peak_roi = configuration.peak_roi self.forced_low_res_roi = configuration.low_res_roi self.use_metadata_bck_roi = configuration.use_metadata_bck_roi self.forced_bck_roi = configuration.bck_roi self.use_peak_finder = configuration.use_peak_finder self.use_low_res_finder = configuration.use_low_res_finder # Peak found before fitting for the central position self.found_peak = [0, 0] self.found_low_res = [0, 0] # Processing options # Use the ROI rather than finding the ranges self.use_roi = configuration.use_roi self.use_roi_actual = False # Use background as a region on each side of the peak self.use_tight_bck = configuration.use_tight_bck # Width of the background on each side of the peak self.bck_offset = configuration.bck_offset # Update the specular peak range after finding the peak # within the ROI self.update_peak_range = configuration.update_peak_range self.wl_bandwidth = configuration.wl_bandwidth self.tof_range = self.get_tof_range(ws) self.calculated_scattering_angle = 0.0 self.theta_d = 0.0 t_0 = time.time() self.determine_data_type(ws) logging.info("INSPECT: %s sec" % (time.time() - t_0))
[docs] def get_tof_range(self, ws) -> Tuple[float, float]: """Determine TOF range from the data.""" run_object = ws.getRun() sample_detector_distance = run_object["SampleDetDis"].getStatistics().mean source_sample_distance = run_object["ModeratorSamDis"].getStatistics().mean # Check units if run_object["SampleDetDis"].units not in ["m", "meter"]: sample_detector_distance /= 1000.0 if run_object["ModeratorSamDis"].units not in ["m", "meter"]: source_sample_distance /= 1000.0 source_detector_distance = source_sample_distance + sample_detector_distance h = 6.626e-34 # m^2 kg s^-1 m = 1.675e-27 # kg wl = run_object.getProperty("LambdaRequest").value[0] chopper_speed = run_object.getProperty("SpeedRequest1").value[0] wl_offset = 0 cst = source_detector_distance / h * m half_width = self.wl_bandwidth / 2.0 tof_min = cst * (wl + wl_offset * 60.0 / chopper_speed - half_width * 60.0 / chopper_speed) * 1e-4 tof_max = cst * (wl + wl_offset * 60.0 / chopper_speed + half_width * 60.0 / chopper_speed) * 1e-4 self.tof_range = [tof_min, tof_max] return [tof_min, tof_max]
[docs] def process_roi(self, ws): """Process the ROI information from a Mantid workspace. Determines the peak range, low-resolution range, and background range. Starting in June 2018, with the DAS upgrade, the ROIs are specified with a start/width rather than start/stop. """ roi_peak = [0, 0] roi_low_res = [0, 0] # Read ROI 1 roi1_valid = True if "ROI1StartX" in ws.getRun(): roi1_x0 = ws.getRun()["ROI1StartX"].getStatistics().mean roi1_y0 = ws.getRun()["ROI1StartY"].getStatistics().mean if "ROI1SizeX" in ws.getRun(): size_x = ws.getRun()["ROI1SizeX"].getStatistics().mean size_y = ws.getRun()["ROI1SizeY"].getStatistics().mean roi1_x1 = roi1_x0 + size_x roi1_y1 = roi1_y0 + size_y else: roi1_x1 = ws.getRun()["ROI1EndX"].getStatistics().mean roi1_y1 = ws.getRun()["ROI1EndY"].getStatistics().mean if roi1_x1 > roi1_x0: peak1 = [int(roi1_x0), int(roi1_x1)] else: peak1 = [int(roi1_x1), int(roi1_x0)] if roi1_y1 > roi1_y0: low_res1 = [int(roi1_y0), int(roi1_y1)] else: low_res1 = [int(roi1_y1), int(roi1_y0)] if peak1 == [0, 0] and low_res1 == [0, 0]: roi1_valid = False # Read ROI 2 if "ROI2StartX" in ws.getRun(): roi2_valid = True roi2_x0 = ws.getRun()["ROI2StartX"].getStatistics().mean roi2_y0 = ws.getRun()["ROI2StartY"].getStatistics().mean if "ROI2SizeX" in ws.getRun(): size_x = ws.getRun()["ROI2SizeX"].getStatistics().mean size_y = ws.getRun()["ROI2SizeY"].getStatistics().mean roi2_x1 = roi2_x0 + size_x roi2_y1 = roi2_y0 + size_y else: roi2_x1 = ws.getRun()["ROI2EndX"].getStatistics().mean roi2_y1 = ws.getRun()["ROI2EndY"].getStatistics().mean if roi2_x1 > roi2_x0: peak2 = [int(roi2_x0), int(roi2_x1)] else: peak2 = [int(roi2_x1), int(roi2_x0)] if roi2_y1 > roi2_y0: low_res2 = [int(roi2_y0), int(roi2_y1)] else: low_res2 = [int(roi2_y1), int(roi2_y0)] if peak2 == [0, 0] and low_res2 == [0, 0]: roi2_valid = False else: roi2_valid = False else: roi1_valid = False roi2_valid = False # Pick the ROI that describes the reflectivity peak if roi1_valid and not roi2_valid: roi_peak = peak1 roi_low_res = low_res1 roi_background = [0, 0] elif roi2_valid and not roi1_valid: roi_peak = peak2 roi_low_res = low_res2 roi_background = [0, 0] elif roi1_valid and roi2_valid: # If ROI 2 is within ROI 1, treat it as the peak, # otherwise, use ROI 1 if peak1[0] >= peak2[0] and peak1[1] <= peak2[1]: roi_peak = peak1 roi_low_res = low_res1 elif peak2[0] >= peak1[0] and peak2[1] <= peak1[1]: roi_peak = peak2 roi_low_res = low_res2 else: roi_peak = peak1 roi_low_res = low_res1 # After all this, update the ROI according to reduction options self.roi_peak = roi_peak self.roi_low_res = roi_low_res self.metadata_peak2 = peak2 if self.use_metadata_bck_roi == True: self.background = peak2 self.roi_background = peak2
[docs] def determine_data_type(self, ws): """Inspect the data and determine peak locations and data type.""" # Skip empty data entries if ws.getNumberEvents() < self.n_events_cutoff: self.data_type = -1 logging.info("No data for %s %s" % (self.run_number, self.cross_section)) return # Find reflectivity peak and low resolution ranges # fitter = Fitter(ws, True) fitter = Fitter2(ws) peak, low_res = fitter.fit_2d_peak() self.found_peak = copy.copy(peak) self.found_low_res = copy.copy(low_res) logging.info("Run %s [%s]: Peak found %s" % (self.run_number, self.cross_section, peak)) logging.info("Run %s [%s]: Low-res found %s" % (self.run_number, self.cross_section, str(low_res))) # Process the ROI information try: self.process_roi(ws) except: logging.info("Could not process ROI\n%s" % sys.exc_info()[1]) # Keep track of whether we actually used the ROI self.use_roi_actual = False # If we were asked to use the ROI but no peak is in it, use the peak we found # If we were asked to use the ROI and there's a peak in it, use the ROI if self.use_roi and not self.update_peak_range and not self.roi_peak == [0, 0]: logging.warning(f"Using ROI peak range: {self.roi_peak}") self.use_roi_actual = True peak = copy.copy(self.roi_peak) if not self.roi_low_res == [0, 0]: low_res = copy.copy(self.roi_low_res) else: logging.warning(f"Using fit peak range: {peak}") # Background if self.use_tight_bck: bck_range = [int(max(0.0, peak[0] - self.bck_offset)), int(min(NX_PIXELS, peak[1] + self.bck_offset))] else: bck_range = self.background logging.warning(f"Using ROI low-res range: {low_res}") logging.warning(f"Using ROI background range: {bck_range}") # Store the information we found self.peak_position = (peak[1] + peak[0]) / 2.0 self.peak_range = [int(max(0, peak[0])), int(min(peak[1], NX_PIXELS))] self.low_res_range = [int(max(0, low_res[0])), int(min(low_res[1], NY_PIXELS))] self.background = [int(max(0, bck_range[0])), int(min(bck_range[1], NX_PIXELS))] # Computed scattering angle self.calculated_scattering_angle = api.MRGetTheta(ws, SpecularPixel=self.peak_position) self.calculated_scattering_angle *= 180.0 / math.pi # Determine whether we have a direct beam run_object = ws.getRun() try: self.is_direct_beam = run_object.getProperty("data_type").value[0] == 1 self.data_type = 0 if self.is_direct_beam else 1 except: self.is_direct_beam = False self.data_type = 1
[docs] def chi2(data, model): """Returns the chi^2 for a data set and model pair.""" err = np.fabs(data) err[err <= 0] = 1 return np.sum((data - model) ** 2 / err) / len(data)
[docs] class Fitter2(object): """Class to fit the data and find the peak and beam width.""" DEAD_PIXELS = 10 def __init__(self, workspace): self.workspace = workspace self._prepare_data() def _prepare_data(self): """Read in the data and create arrays for fitting.""" # Prepare data to fit self.n_x = int(self.workspace.getInstrument().getNumberParameter("number-of-x-pixels")[0]) self.n_y = int(self.workspace.getInstrument().getNumberParameter("number-of-y-pixels")[0]) _integrated = api.Integration(InputWorkspace=self.workspace) signal = _integrated.extractY() self.z = np.reshape(signal, (self.n_x, self.n_y)) self.y = np.arange(0, self.n_y)[self.DEAD_PIXELS : -self.DEAD_PIXELS] # 1D data x/y vs counts self.x_vs_counts = np.sum(self.z, 1) self.y_vs_counts = np.sum(self.z, 0) self.guess_x = np.argmax(self.x_vs_counts) self.guess_wx = 6.0 def _scan_peaks(self): f1 = ndimage.gaussian_filter(self.x_vs_counts, 3) peaks, _ = find_peaks(f1) prom, _, _ = peak_prominences(f1, peaks) peaks_w, _, _, _ = peak_widths(f1, peaks) # The quality factor is the size of the peak (height*width) multiply by # a factor that peaks in the middle of the detector, where the peak usually is. nx = 304.0 delta = 100.0 mid_point = 150.0 quality_pos = np.exp(-((mid_point - peaks) ** 2.0) / 2000.0) low_peaks = peaks < delta high_peaks = peaks > nx - delta quality_pos[low_peaks] = quality_pos[low_peaks] * (1 - np.abs(delta - peaks[low_peaks]) / delta) ** 3 quality_pos[high_peaks] = quality_pos[high_peaks] * (1 - np.abs(nx - delta - peaks[high_peaks]) / delta) ** 3 quality = -peaks_w * prom * quality_pos zipped = zip(peaks, peaks_w, quality, prom) ordered = sorted(zipped, key=lambda a: a[2]) found_peaks = [p[0] for p in ordered] if found_peaks: # self.guess_x = ordered[0][0] # self.guess_ws = ordered[0][1] i_final = 0 if ( len(ordered) > 1 and (ordered[0][2] - ordered[1][2]) / ordered[0][2] < 0.75 and ordered[1][0] < ordered[0][0] ): i_final = 1 self.guess_x = ordered[i_final][0] self.guess_ws = ordered[i_final][1] return found_peaks
[docs] def fit_2d_peak(self): """Backward compatibility.""" spec_peak = self.fit_peak() beam_peak = self.fit_beam_width() return spec_peak, beam_peak
[docs] def fit_peak(self): self.peaks = self._scan_peaks() # Package the best results x_min = max(0, int(self.guess_x - np.fabs(self.guess_wx))) x_max = min(self.n_x - 1, int(self.guess_x + np.fabs(self.guess_wx))) return [x_min, x_max]
[docs] def gaussian_1d(self, value, *p): """1D Gaussian.""" A, center_x, width_x, background = p A = np.abs(A) values = A * np.exp(-((value - center_x) ** 2) / (2.0 * width_x**2)) values += background return values
[docs] def peak_derivative(self, value, *p): """Double Gaussian to fit the first derivative of a plateau/peak.""" A, center_x, width_x, edge_width, background = p mu_right = center_x + width_x / 2.0 mu_left = center_x - width_x / 2.0 A = np.abs(A) values = A * np.exp(-((value - mu_left) ** 2) / (2.0 * edge_width**2)) - A * np.exp( -((value - mu_right) ** 2) / (2.0 * edge_width**2) ) values += background return values
def _perform_beam_fit(self, y_d, derivative, derivative_err, y_r=None, signal_r=None, gaussian_first=False): if gaussian_first: _running_err = np.sqrt(signal_r) _gauss, _ = opt.curve_fit( self.gaussian_1d, y_r, signal_r, p0=[np.max(signal_r), 140, 50, 0], sigma=_running_err ) p0 = [np.max(derivative), _gauss[1], 2.0 * _gauss[2], 5, 0] else: p0 = [np.max(derivative), 140, 60, 5, 0] # p = A, center_x, width_x, edge_width, background _coef, _ = opt.curve_fit(self.peak_derivative, y_d, derivative, p0=p0, sigma=derivative_err) return _coef
[docs] def fit_beam_width(self): """Fit the data distribution in y and get its range.""" peak_min = 0 peak_max = self.n_x try: _integral = [np.sum(self.y_vs_counts[:i]) for i in range(len(self.y_vs_counts))] _running = 0.1 * np.convolve(self.y_vs_counts, np.ones(10), mode="valid") _deriv = np.asarray([_running[i + 1] - _running[i] for i in range(len(_running) - 1)]) _deriv_err = np.sqrt(_running)[:-1] _deriv_err[_deriv_err < 1] = 1 _y = np.arange(len(self.y_vs_counts))[5:-5] _coef = self._perform_beam_fit(_y, _deriv, _deriv_err, gaussian_first=False) peak_min = _coef[1] - np.abs(_coef[2]) / 2.0 - 2.0 * np.abs(_coef[3]) peak_max = _coef[1] + np.abs(_coef[2]) / 2.0 + 2.0 * np.abs(_coef[3]) if peak_max - peak_min < 10: logging.error("Low statisting: trying again") _y_running = self.y[5:-4] _coef = self._perform_beam_fit(_y, _deriv, _deriv_err, _y_running, _running, gaussian_first=True) self.guess_y = _coef[1] self.guess_wy = (peak_max - peak_min) / 2.0 peak_min = max(peak_min, self.DEAD_PIXELS) peak_max = min(peak_max, self.n_x - self.DEAD_PIXELS) except: logging.error("Could not fit the beam width") return [peak_min, peak_max]