Source code for quicknxs.interfaces.data_handling.data_info

"""
Meta-data information for MR reduction
"""
# pylint: disable=too-few-public-methods, wrong-import-position, too-many-instance-attributes, wrong-import-order

import copy
import logging
import math
import sys
import time

import mantid.simpleapi as api
import numpy as np
import scipy.optimize as opt
from scipy import ndimage

from quicknxs.interfaces.data_handling.peak_finding import find_peaks, peak_prominences, peak_widths

NX_PIXELS = 304
NY_PIXELS = 256


[docs] class DataInfo(object): """ Class to hold the relevant information from a run (scattering or direct beam). """ peak_range_offset = 0 tolerance = 0.02 def __init__(self, ws, cross_section, configuration): self.cross_section = cross_section self.run_number = ws.getRunNumber() self.is_direct_beam = False self.data_type = 1 self.peak_position = 0 self.peak_range = [0, 0] self.low_res_range = [0, 0] self.background = configuration.bck_roi self.n_events_cutoff = 100 # ROI information self.roi_peak = [0, 0] self.roi_low_res = [0, 0] self.roi_background = [0, 0] # Options to override the ROI self.forced_peak_roi = configuration.peak_roi self.forced_low_res_roi = configuration.low_res_roi self.force_bck_roi = configuration.force_bck_roi self.forced_bck_roi = configuration.bck_roi self.use_peak_finder = configuration.use_peak_finder self.use_low_res_finder = configuration.use_low_res_finder # Peak found before fitting for the central position self.found_peak = [0, 0] self.found_low_res = [0, 0] # Processing options # Use the ROI rather than finding the ranges self.use_roi = configuration.use_roi self.use_roi_actual = False # Use the 2nd ROI as the background, if available self.use_roi_bck = configuration.use_roi_bck # Use background as a region on each side of the peak self.use_tight_bck = configuration.use_tight_bck # Width of the background on each side of the peak self.bck_offset = configuration.bck_offset # Update the specular peak range after finding the peak # within the ROI self.update_peak_range = configuration.update_peak_range self.wl_bandwidth = configuration.wl_bandwidth self.tof_range = self.get_tof_range(ws) self.calculated_scattering_angle = 0.0 self.theta_d = 0.0 t_0 = time.time() self.determine_data_type(ws) logging.info("INSPECT: %s sec" % (time.time() - t_0))
[docs] def get_tof_range(self, ws): """ Determine TOF range from the data :param workspace ws: workspace to work with """ run_object = ws.getRun() sample_detector_distance = run_object["SampleDetDis"].getStatistics().mean source_sample_distance = run_object["ModeratorSamDis"].getStatistics().mean # Check units if run_object["SampleDetDis"].units not in ["m", "meter"]: sample_detector_distance /= 1000.0 if run_object["ModeratorSamDis"].units not in ["m", "meter"]: source_sample_distance /= 1000.0 source_detector_distance = source_sample_distance + sample_detector_distance h = 6.626e-34 # m^2 kg s^-1 m = 1.675e-27 # kg wl = run_object.getProperty("LambdaRequest").value[0] chopper_speed = run_object.getProperty("SpeedRequest1").value[0] wl_offset = 0 cst = source_detector_distance / h * m half_width = self.wl_bandwidth / 2.0 tof_min = cst * (wl + wl_offset * 60.0 / chopper_speed - half_width * 60.0 / chopper_speed) * 1e-4 tof_max = cst * (wl + wl_offset * 60.0 / chopper_speed + half_width * 60.0 / chopper_speed) * 1e-4 self.tof_range = [tof_min, tof_max] return [tof_min, tof_max]
[docs] def process_roi(self, ws): """ Process the ROI information and determine the peak range, the low-resolution range, and the background range. Starting in June 2018, with the DAS upgrade, the ROIs are specified with a start/width rather than start/stop. :param workspace ws: workspace to work with """ roi_peak = [0, 0] roi_low_res = [0, 0] # Read ROI 1 roi1_valid = True if "ROI1StartX" in ws.getRun(): roi1_x0 = ws.getRun()["ROI1StartX"].getStatistics().mean roi1_y0 = ws.getRun()["ROI1StartY"].getStatistics().mean if "ROI1SizeX" in ws.getRun(): size_x = ws.getRun()["ROI1SizeX"].getStatistics().mean size_y = ws.getRun()["ROI1SizeY"].getStatistics().mean roi1_x1 = roi1_x0 + size_x roi1_y1 = roi1_y0 + size_y else: roi1_x1 = ws.getRun()["ROI1EndX"].getStatistics().mean roi1_y1 = ws.getRun()["ROI1EndY"].getStatistics().mean if roi1_x1 > roi1_x0: peak1 = [int(roi1_x0), int(roi1_x1)] else: peak1 = [int(roi1_x1), int(roi1_x0)] if roi1_y1 > roi1_y0: low_res1 = [int(roi1_y0), int(roi1_y1)] else: low_res1 = [int(roi1_y1), int(roi1_y0)] if peak1 == [0, 0] and low_res1 == [0, 0]: roi1_valid = False # Read ROI 2 if "ROI2StartX" in ws.getRun(): roi2_valid = True roi2_x0 = ws.getRun()["ROI2StartX"].getStatistics().mean roi2_y0 = ws.getRun()["ROI2StartY"].getStatistics().mean if "ROI2SizeX" in ws.getRun(): size_x = ws.getRun()["ROI2SizeX"].getStatistics().mean size_y = ws.getRun()["ROI2SizeY"].getStatistics().mean roi2_x1 = roi2_x0 + size_x roi2_y1 = roi2_y0 + size_y else: roi2_x1 = ws.getRun()["ROI2EndX"].getStatistics().mean roi2_y1 = ws.getRun()["ROI2EndY"].getStatistics().mean if roi2_x1 > roi2_x0: peak2 = [int(roi2_x0), int(roi2_x1)] else: peak2 = [int(roi2_x1), int(roi2_x0)] if roi2_y1 > roi2_y0: low_res2 = [int(roi2_y0), int(roi2_y1)] else: low_res2 = [int(roi2_y1), int(roi2_y0)] if peak2 == [0, 0] and low_res2 == [0, 0]: roi2_valid = False else: roi2_valid = False else: roi1_valid = False roi2_valid = False # Pick the ROI that describes the reflectivity peak if roi1_valid and not roi2_valid: roi_peak = peak1 roi_low_res = low_res1 roi_background = [0, 0] elif roi2_valid and not roi1_valid: roi_peak = peak2 roi_low_res = low_res2 roi_background = [0, 0] elif roi1_valid and roi2_valid: # If ROI 2 is within ROI 1, treat it as the peak, # otherwise, use ROI 1 if peak1[0] >= peak2[0] and peak1[1] <= peak2[1]: roi_peak = peak1 roi_low_res = low_res1 elif peak2[0] >= peak1[0] and peak2[1] <= peak1[1]: roi_peak = peak2 roi_low_res = low_res2 else: roi_peak = peak1 roi_low_res = low_res1 # After all this, update the ROI according to reduction options self.roi_peak = roi_peak self.roi_low_res = roi_low_res self.meta_data_peak2 = peak2 if self.force_bck_roi == True: self.background = peak2 self.roi_background = peak2
[docs] def determine_data_type(self, ws): """ Inspect the data and determine peak locations and data type. :param workspace ws: Workspace to inspect """ # Skip empty data entries if ws.getNumberEvents() < self.n_events_cutoff: self.data_type = -1 logging.info("No data for %s %s" % (self.run_number, self.cross_section)) return # Find reflectivity peak and low resolution ranges # fitter = Fitter(ws, True) fitter = Fitter2(ws) peak, low_res = fitter.fit_2d_peak() self.found_peak = copy.copy(peak) self.found_low_res = copy.copy(low_res) logging.info("Run %s [%s]: Peak found %s" % (self.run_number, self.cross_section, peak)) logging.info("Run %s [%s]: Low-res found %s" % (self.run_number, self.cross_section, str(low_res))) # Process the ROI information try: self.process_roi(ws) except: logging.info("Could not process ROI\n%s" % sys.exc_info()[1]) # Keep track of whether we actually used the ROI self.use_roi_actual = False # If we were asked to use the ROI but no peak is in it, use the peak we found # If we were asked to use the ROI and there's a peak in it, use the ROI if self.use_roi and not self.update_peak_range and not self.roi_peak == [0, 0]: logging.warning(f"Using ROI peak range: {self.roi_peak}") self.use_roi_actual = True peak = copy.copy(self.roi_peak) if not self.roi_low_res == [0, 0]: low_res = copy.copy(self.roi_low_res) else: logging.warning(f"Using fit peak range: {peak}") # Background if self.use_tight_bck: bck_range = [int(max(0.0, peak[0] - self.bck_offset)), int(min(NX_PIXELS, peak[1] + self.bck_offset))] else: bck_range = self.background logging.warning(f"Using ROI low-res range: {low_res}") logging.warning(f"Using ROI background range: {bck_range}") # Store the information we found self.peak_position = (peak[1] + peak[0]) / 2.0 self.peak_range = [int(max(0, peak[0])), int(min(peak[1], NX_PIXELS))] self.low_res_range = [int(max(0, low_res[0])), int(min(low_res[1], NY_PIXELS))] self.background = [int(max(0, bck_range[0])), int(min(bck_range[1], NY_PIXELS))] # Computed scattering angle self.calculated_scattering_angle = api.MRGetTheta(ws, SpecularPixel=self.peak_position) self.calculated_scattering_angle *= 180.0 / math.pi # Determine whether we have a direct beam run_object = ws.getRun() try: self.is_direct_beam = run_object.getProperty("data_type").value[0] == 1 self.data_type = 0 if self.is_direct_beam else 1 except: self.is_direct_beam = False self.data_type = 1
[docs] def chi2(data, model): """Returns the chi^2 for a data set and model pair""" err = np.fabs(data) err[err <= 0] = 1 return np.sum((data - model) ** 2 / err) / len(data)
[docs] class Fitter2(object): DEAD_PIXELS = 10 def __init__(self, workspace): self.workspace = workspace self._prepare_data() def _prepare_data(self): """ Read in the data and create arrays for fitting """ # Prepare data to fit self.n_x = int(self.workspace.getInstrument().getNumberParameter("number-of-x-pixels")[0]) self.n_y = int(self.workspace.getInstrument().getNumberParameter("number-of-y-pixels")[0]) _integrated = api.Integration(InputWorkspace=self.workspace) signal = _integrated.extractY() self.z = np.reshape(signal, (self.n_x, self.n_y)) self.y = np.arange(0, self.n_y)[self.DEAD_PIXELS : -self.DEAD_PIXELS] # 1D data x/y vs counts self.x_vs_counts = np.sum(self.z, 1) self.y_vs_counts = np.sum(self.z, 0) self.guess_x = np.argmax(self.x_vs_counts) self.guess_wx = 6.0 def _scan_peaks(self): f1 = ndimage.gaussian_filter(self.x_vs_counts, 3) peaks, _ = find_peaks(f1) prom, _, _ = peak_prominences(f1, peaks) peaks_w, _, _, _ = peak_widths(f1, peaks) # The quality factor is the size of the peak (height*width) multiply by # a factor that peaks in the middle of the detector, where the peak usually is. nx = 304.0 delta = 100.0 mid_point = 150.0 quality_pos = np.exp(-((mid_point - peaks) ** 2.0) / 2000.0) low_peaks = peaks < delta high_peaks = peaks > nx - delta quality_pos[low_peaks] = quality_pos[low_peaks] * (1 - np.abs(delta - peaks[low_peaks]) / delta) ** 3 quality_pos[high_peaks] = quality_pos[high_peaks] * (1 - np.abs(nx - delta - peaks[high_peaks]) / delta) ** 3 quality = -peaks_w * prom * quality_pos zipped = zip(peaks, peaks_w, quality, prom) ordered = sorted(zipped, key=lambda a: a[2]) found_peaks = [p[0] for p in ordered] if found_peaks: # self.guess_x = ordered[0][0] # self.guess_ws = ordered[0][1] i_final = 0 if ( len(ordered) > 1 and (ordered[0][2] - ordered[1][2]) / ordered[0][2] < 0.75 and ordered[1][0] < ordered[0][0] ): i_final = 1 self.guess_x = ordered[i_final][0] self.guess_ws = ordered[i_final][1] return found_peaks
[docs] def fit_2d_peak(self): """Backward compatibility""" spec_peak = self.fit_peak() beam_peak = self.fit_beam_width() return spec_peak, beam_peak
[docs] def fit_peak(self): self.peaks = self._scan_peaks() # Package the best results x_min = max(0, int(self.guess_x - np.fabs(self.guess_wx))) x_max = min(self.n_x - 1, int(self.guess_x + np.fabs(self.guess_wx))) return [x_min, x_max]
[docs] def gaussian_1d(self, value, *p): """ 1D Gaussian """ A, center_x, width_x, background = p A = np.abs(A) values = A * np.exp(-((value - center_x) ** 2) / (2.0 * width_x**2)) values += background return values
[docs] def peak_derivative(self, value, *p): """ Double Gaussian to fit the first derivative of a plateau/peak. """ A, center_x, width_x, edge_width, background = p mu_right = center_x + width_x / 2.0 mu_left = center_x - width_x / 2.0 A = np.abs(A) values = A * np.exp(-((value - mu_left) ** 2) / (2.0 * edge_width**2)) - A * np.exp( -((value - mu_right) ** 2) / (2.0 * edge_width**2) ) values += background return values
def _perform_beam_fit(self, y_d, derivative, derivative_err, y_r=None, signal_r=None, gaussian_first=False): if gaussian_first: _running_err = np.sqrt(signal_r) _gauss, _ = opt.curve_fit( self.gaussian_1d, y_r, signal_r, p0=[np.max(signal_r), 140, 50, 0], sigma=_running_err ) p0 = [np.max(derivative), _gauss[1], 2.0 * _gauss[2], 5, 0] else: p0 = [np.max(derivative), 140, 60, 5, 0] # p = A, center_x, width_x, edge_width, background _coef, _ = opt.curve_fit(self.peak_derivative, y_d, derivative, p0=p0, sigma=derivative_err) return _coef
[docs] def fit_beam_width(self): """ Fit the data distribution in y and get its range. """ peak_min = 0 peak_max = self.n_x try: _integral = [np.sum(self.y_vs_counts[:i]) for i in range(len(self.y_vs_counts))] _running = 0.1 * np.convolve(self.y_vs_counts, np.ones(10), mode="valid") _deriv = np.asarray([_running[i + 1] - _running[i] for i in range(len(_running) - 1)]) _deriv_err = np.sqrt(_running)[:-1] _deriv_err[_deriv_err < 1] = 1 _y = np.arange(len(self.y_vs_counts))[5:-5] _coef = self._perform_beam_fit(_y, _deriv, _deriv_err, gaussian_first=False) peak_min = _coef[1] - np.abs(_coef[2]) / 2.0 - 2.0 * np.abs(_coef[3]) peak_max = _coef[1] + np.abs(_coef[2]) / 2.0 + 2.0 * np.abs(_coef[3]) if peak_max - peak_min < 10: logging.error("Low statisting: trying again") _y_running = self.y[5:-4] _coef = self._perform_beam_fit(_y, _deriv, _deriv_err, _y_running, _running, gaussian_first=True) self.guess_y = _coef[1] self.guess_wy = (peak_max - peak_min) / 2.0 peak_min = max(peak_min, self.DEAD_PIXELS) peak_max = min(peak_max, self.n_x - self.DEAD_PIXELS) except: logging.error("Could not fit the beam width") return [peak_min, peak_max]