"""Meta-data information for MR reduction."""
# pylint: disable=too-few-public-methods, wrong-import-position, too-many-instance-attributes, wrong-import-order
import copy
import logging
import math
import sys
import time
from typing import Tuple
import mantid.simpleapi as api
import numpy as np
import scipy.optimize as opt
from scipy import ndimage
from quicknxs.interfaces.data_handling.peak_finding import find_peaks, peak_prominences, peak_widths
NX_PIXELS = 304
NY_PIXELS = 256
[docs]
class DataInfo(object):
"""Class to hold the relevant information from a run (scattering or direct beam)."""
peak_range_offset = 0
tolerance = 0.02
def __init__(self, ws, cross_section, configuration):
self.cross_section = cross_section
self.run_number = ws.getRunNumber()
self.is_direct_beam = False
self.data_type = 1
self.peak_position = 0
self.peak_range = [0, 0]
self.low_res_range = [0, 0]
self.background = configuration.bck_roi
self.n_events_cutoff = 100
# ROI information
self.roi_peak = [0, 0]
self.roi_low_res = [0, 0]
self.roi_background = [0, 0]
# Options to override the ROI
self.forced_peak_roi = configuration.peak_roi
self.forced_low_res_roi = configuration.low_res_roi
self.use_metadata_bck_roi = configuration.use_metadata_bck_roi
self.forced_bck_roi = configuration.bck_roi
self.use_peak_finder = configuration.use_peak_finder
self.use_low_res_finder = configuration.use_low_res_finder
# Peak found before fitting for the central position
self.found_peak = [0, 0]
self.found_low_res = [0, 0]
# Processing options
# Use the ROI rather than finding the ranges
self.use_roi = configuration.use_roi
self.use_roi_actual = False
# Use background as a region on each side of the peak
self.use_tight_bck = configuration.use_tight_bck
# Width of the background on each side of the peak
self.bck_offset = configuration.bck_offset
# Update the specular peak range after finding the peak
# within the ROI
self.update_peak_range = configuration.update_peak_range
self.wl_bandwidth = configuration.wl_bandwidth
self.tof_range = self.get_tof_range(ws)
self.calculated_scattering_angle = 0.0
self.theta_d = 0.0
t_0 = time.time()
self.determine_data_type(ws)
logging.info("INSPECT: %s sec" % (time.time() - t_0))
[docs]
def get_tof_range(self, ws) -> Tuple[float, float]:
"""Determine TOF range from the data."""
run_object = ws.getRun()
sample_detector_distance = run_object["SampleDetDis"].getStatistics().mean
source_sample_distance = run_object["ModeratorSamDis"].getStatistics().mean
# Check units
if run_object["SampleDetDis"].units not in ["m", "meter"]:
sample_detector_distance /= 1000.0
if run_object["ModeratorSamDis"].units not in ["m", "meter"]:
source_sample_distance /= 1000.0
source_detector_distance = source_sample_distance + sample_detector_distance
h = 6.626e-34 # m^2 kg s^-1
m = 1.675e-27 # kg
wl = run_object.getProperty("LambdaRequest").value[0]
chopper_speed = run_object.getProperty("SpeedRequest1").value[0]
wl_offset = 0
cst = source_detector_distance / h * m
half_width = self.wl_bandwidth / 2.0
tof_min = cst * (wl + wl_offset * 60.0 / chopper_speed - half_width * 60.0 / chopper_speed) * 1e-4
tof_max = cst * (wl + wl_offset * 60.0 / chopper_speed + half_width * 60.0 / chopper_speed) * 1e-4
self.tof_range = [tof_min, tof_max]
return [tof_min, tof_max]
[docs]
def process_roi(self, ws):
"""Process the ROI information from a Mantid workspace.
Determines the peak range, low-resolution range, and background range.
Starting in June 2018, with the DAS upgrade, the ROIs are
specified with a start/width rather than start/stop.
"""
roi_peak = [0, 0]
roi_low_res = [0, 0]
# Read ROI 1
roi1_valid = True
if "ROI1StartX" in ws.getRun():
roi1_x0 = ws.getRun()["ROI1StartX"].getStatistics().mean
roi1_y0 = ws.getRun()["ROI1StartY"].getStatistics().mean
if "ROI1SizeX" in ws.getRun():
size_x = ws.getRun()["ROI1SizeX"].getStatistics().mean
size_y = ws.getRun()["ROI1SizeY"].getStatistics().mean
roi1_x1 = roi1_x0 + size_x
roi1_y1 = roi1_y0 + size_y
else:
roi1_x1 = ws.getRun()["ROI1EndX"].getStatistics().mean
roi1_y1 = ws.getRun()["ROI1EndY"].getStatistics().mean
if roi1_x1 > roi1_x0:
peak1 = [int(roi1_x0), int(roi1_x1)]
else:
peak1 = [int(roi1_x1), int(roi1_x0)]
if roi1_y1 > roi1_y0:
low_res1 = [int(roi1_y0), int(roi1_y1)]
else:
low_res1 = [int(roi1_y1), int(roi1_y0)]
if peak1 == [0, 0] and low_res1 == [0, 0]:
roi1_valid = False
# Read ROI 2
if "ROI2StartX" in ws.getRun():
roi2_valid = True
roi2_x0 = ws.getRun()["ROI2StartX"].getStatistics().mean
roi2_y0 = ws.getRun()["ROI2StartY"].getStatistics().mean
if "ROI2SizeX" in ws.getRun():
size_x = ws.getRun()["ROI2SizeX"].getStatistics().mean
size_y = ws.getRun()["ROI2SizeY"].getStatistics().mean
roi2_x1 = roi2_x0 + size_x
roi2_y1 = roi2_y0 + size_y
else:
roi2_x1 = ws.getRun()["ROI2EndX"].getStatistics().mean
roi2_y1 = ws.getRun()["ROI2EndY"].getStatistics().mean
if roi2_x1 > roi2_x0:
peak2 = [int(roi2_x0), int(roi2_x1)]
else:
peak2 = [int(roi2_x1), int(roi2_x0)]
if roi2_y1 > roi2_y0:
low_res2 = [int(roi2_y0), int(roi2_y1)]
else:
low_res2 = [int(roi2_y1), int(roi2_y0)]
if peak2 == [0, 0] and low_res2 == [0, 0]:
roi2_valid = False
else:
roi2_valid = False
else:
roi1_valid = False
roi2_valid = False
# Pick the ROI that describes the reflectivity peak
if roi1_valid and not roi2_valid:
roi_peak = peak1
roi_low_res = low_res1
roi_background = [0, 0]
elif roi2_valid and not roi1_valid:
roi_peak = peak2
roi_low_res = low_res2
roi_background = [0, 0]
elif roi1_valid and roi2_valid:
# If ROI 2 is within ROI 1, treat it as the peak,
# otherwise, use ROI 1
if peak1[0] >= peak2[0] and peak1[1] <= peak2[1]:
roi_peak = peak1
roi_low_res = low_res1
elif peak2[0] >= peak1[0] and peak2[1] <= peak1[1]:
roi_peak = peak2
roi_low_res = low_res2
else:
roi_peak = peak1
roi_low_res = low_res1
# After all this, update the ROI according to reduction options
self.roi_peak = roi_peak
self.roi_low_res = roi_low_res
self.metadata_peak2 = peak2
if self.use_metadata_bck_roi == True:
self.background = peak2
self.roi_background = peak2
[docs]
def determine_data_type(self, ws):
"""Inspect the data and determine peak locations and data type."""
# Skip empty data entries
if ws.getNumberEvents() < self.n_events_cutoff:
self.data_type = -1
logging.info("No data for %s %s" % (self.run_number, self.cross_section))
return
# Find reflectivity peak and low resolution ranges
# fitter = Fitter(ws, True)
fitter = Fitter2(ws)
peak, low_res = fitter.fit_2d_peak()
self.found_peak = copy.copy(peak)
self.found_low_res = copy.copy(low_res)
logging.info("Run %s [%s]: Peak found %s" % (self.run_number, self.cross_section, peak))
logging.info("Run %s [%s]: Low-res found %s" % (self.run_number, self.cross_section, str(low_res)))
# Process the ROI information
try:
self.process_roi(ws)
except:
logging.info("Could not process ROI\n%s" % sys.exc_info()[1])
# Keep track of whether we actually used the ROI
self.use_roi_actual = False
# If we were asked to use the ROI but no peak is in it, use the peak we found
# If we were asked to use the ROI and there's a peak in it, use the ROI
if self.use_roi and not self.update_peak_range and not self.roi_peak == [0, 0]:
logging.warning(f"Using ROI peak range: {self.roi_peak}")
self.use_roi_actual = True
peak = copy.copy(self.roi_peak)
if not self.roi_low_res == [0, 0]:
low_res = copy.copy(self.roi_low_res)
else:
logging.warning(f"Using fit peak range: {peak}")
# Background
if self.use_tight_bck:
bck_range = [int(max(0.0, peak[0] - self.bck_offset)), int(min(NX_PIXELS, peak[1] + self.bck_offset))]
else:
bck_range = self.background
logging.warning(f"Using ROI low-res range: {low_res}")
logging.warning(f"Using ROI background range: {bck_range}")
# Store the information we found
self.peak_position = (peak[1] + peak[0]) / 2.0
self.peak_range = [int(max(0, peak[0])), int(min(peak[1], NX_PIXELS))]
self.low_res_range = [int(max(0, low_res[0])), int(min(low_res[1], NY_PIXELS))]
self.background = [int(max(0, bck_range[0])), int(min(bck_range[1], NX_PIXELS))]
# Computed scattering angle
self.calculated_scattering_angle = api.MRGetTheta(ws, SpecularPixel=self.peak_position)
self.calculated_scattering_angle *= 180.0 / math.pi
# Determine whether we have a direct beam
run_object = ws.getRun()
try:
self.is_direct_beam = run_object.getProperty("data_type").value[0] == 1
self.data_type = 0 if self.is_direct_beam else 1
except:
self.is_direct_beam = False
self.data_type = 1
[docs]
def chi2(data, model):
"""Returns the chi^2 for a data set and model pair."""
err = np.fabs(data)
err[err <= 0] = 1
return np.sum((data - model) ** 2 / err) / len(data)
[docs]
class Fitter2(object):
"""Class to fit the data and find the peak and beam width."""
DEAD_PIXELS = 10
def __init__(self, workspace):
self.workspace = workspace
self._prepare_data()
def _prepare_data(self):
"""Read in the data and create arrays for fitting."""
# Prepare data to fit
self.n_x = int(self.workspace.getInstrument().getNumberParameter("number-of-x-pixels")[0])
self.n_y = int(self.workspace.getInstrument().getNumberParameter("number-of-y-pixels")[0])
_integrated = api.Integration(InputWorkspace=self.workspace)
signal = _integrated.extractY()
self.z = np.reshape(signal, (self.n_x, self.n_y))
self.y = np.arange(0, self.n_y)[self.DEAD_PIXELS : -self.DEAD_PIXELS]
# 1D data x/y vs counts
self.x_vs_counts = np.sum(self.z, 1)
self.y_vs_counts = np.sum(self.z, 0)
self.guess_x = np.argmax(self.x_vs_counts)
self.guess_wx = 6.0
def _scan_peaks(self):
f1 = ndimage.gaussian_filter(self.x_vs_counts, 3)
peaks, _ = find_peaks(f1)
prom, _, _ = peak_prominences(f1, peaks)
peaks_w, _, _, _ = peak_widths(f1, peaks)
# The quality factor is the size of the peak (height*width) multiply by
# a factor that peaks in the middle of the detector, where the peak usually is.
nx = 304.0
delta = 100.0
mid_point = 150.0
quality_pos = np.exp(-((mid_point - peaks) ** 2.0) / 2000.0)
low_peaks = peaks < delta
high_peaks = peaks > nx - delta
quality_pos[low_peaks] = quality_pos[low_peaks] * (1 - np.abs(delta - peaks[low_peaks]) / delta) ** 3
quality_pos[high_peaks] = quality_pos[high_peaks] * (1 - np.abs(nx - delta - peaks[high_peaks]) / delta) ** 3
quality = -peaks_w * prom * quality_pos
zipped = zip(peaks, peaks_w, quality, prom)
ordered = sorted(zipped, key=lambda a: a[2])
found_peaks = [p[0] for p in ordered]
if found_peaks:
# self.guess_x = ordered[0][0]
# self.guess_ws = ordered[0][1]
i_final = 0
if (
len(ordered) > 1
and (ordered[0][2] - ordered[1][2]) / ordered[0][2] < 0.75
and ordered[1][0] < ordered[0][0]
):
i_final = 1
self.guess_x = ordered[i_final][0]
self.guess_ws = ordered[i_final][1]
return found_peaks
[docs]
def fit_2d_peak(self):
"""Backward compatibility."""
spec_peak = self.fit_peak()
beam_peak = self.fit_beam_width()
return spec_peak, beam_peak
[docs]
def fit_peak(self):
self.peaks = self._scan_peaks()
# Package the best results
x_min = max(0, int(self.guess_x - np.fabs(self.guess_wx)))
x_max = min(self.n_x - 1, int(self.guess_x + np.fabs(self.guess_wx)))
return [x_min, x_max]
[docs]
def gaussian_1d(self, value, *p):
"""1D Gaussian."""
A, center_x, width_x, background = p
A = np.abs(A)
values = A * np.exp(-((value - center_x) ** 2) / (2.0 * width_x**2))
values += background
return values
[docs]
def peak_derivative(self, value, *p):
"""Double Gaussian to fit the first derivative of a plateau/peak."""
A, center_x, width_x, edge_width, background = p
mu_right = center_x + width_x / 2.0
mu_left = center_x - width_x / 2.0
A = np.abs(A)
values = A * np.exp(-((value - mu_left) ** 2) / (2.0 * edge_width**2)) - A * np.exp(
-((value - mu_right) ** 2) / (2.0 * edge_width**2)
)
values += background
return values
def _perform_beam_fit(self, y_d, derivative, derivative_err, y_r=None, signal_r=None, gaussian_first=False):
if gaussian_first:
_running_err = np.sqrt(signal_r)
_gauss, _ = opt.curve_fit(
self.gaussian_1d, y_r, signal_r, p0=[np.max(signal_r), 140, 50, 0], sigma=_running_err
)
p0 = [np.max(derivative), _gauss[1], 2.0 * _gauss[2], 5, 0]
else:
p0 = [np.max(derivative), 140, 60, 5, 0]
# p = A, center_x, width_x, edge_width, background
_coef, _ = opt.curve_fit(self.peak_derivative, y_d, derivative, p0=p0, sigma=derivative_err)
return _coef
[docs]
def fit_beam_width(self):
"""Fit the data distribution in y and get its range."""
peak_min = 0
peak_max = self.n_x
try:
_integral = [np.sum(self.y_vs_counts[:i]) for i in range(len(self.y_vs_counts))]
_running = 0.1 * np.convolve(self.y_vs_counts, np.ones(10), mode="valid")
_deriv = np.asarray([_running[i + 1] - _running[i] for i in range(len(_running) - 1)])
_deriv_err = np.sqrt(_running)[:-1]
_deriv_err[_deriv_err < 1] = 1
_y = np.arange(len(self.y_vs_counts))[5:-5]
_coef = self._perform_beam_fit(_y, _deriv, _deriv_err, gaussian_first=False)
peak_min = _coef[1] - np.abs(_coef[2]) / 2.0 - 2.0 * np.abs(_coef[3])
peak_max = _coef[1] + np.abs(_coef[2]) / 2.0 + 2.0 * np.abs(_coef[3])
if peak_max - peak_min < 10:
logging.error("Low statisting: trying again")
_y_running = self.y[5:-4]
_coef = self._perform_beam_fit(_y, _deriv, _deriv_err, _y_running, _running, gaussian_first=True)
self.guess_y = _coef[1]
self.guess_wy = (peak_max - peak_min) / 2.0
peak_min = max(peak_min, self.DEAD_PIXELS)
peak_max = min(peak_max, self.n_x - self.DEAD_PIXELS)
except:
logging.error("Could not fit the beam width")
return [peak_min, peak_max]