"""Computations for GISANS."""
import logging
from multiprocessing import Pool
from typing import TYPE_CHECKING, List
import numpy as np
from quicknxs.interfaces.configuration import get_direct_beam_low_res_roi
if TYPE_CHECKING:
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
H_OVER_M_NEUTRON = 3.956034e-7 # h/m_n [m^2/s]
[docs]
class GISANS(object):
"""Compute grazing-incident SANS."""
def __init__(self, cross_section_data: "CrossSectionData"):
"""Initialize GISANS calculations.
The calculations here are meant to match QuickNXS v1.
The following are items to improve on:
- Background subtraction
- Trim the TOF distribution to the chopper bandwidth
"""
self.data_set = cross_section_data
def __call__(self, direct_beam=None):
x_pos = self.data_set.configuration.peak_position
y_pos = self.data_set.configuration.low_res_position
scale = 1.0 / self.data_set.proton_charge * self.data_set.configuration.scaling_factor
rad_per_pixel = self.data_set.det_size_x / self.data_set.dist_sam_det / self.data_set.xydata.shape[1]
# QuickNXS v1 uses the whole detector for GISANS calculations and doesn't trim the edges
# active_area_x = self.data_set.active_area_x
# active_area_y = self.data_set.active_area_y
active_area_x = [0, 304]
active_area_y = [0, 256]
xtth = self.data_set.direct_pixel - np.arange(self.data_set.data.shape[0])[active_area_x[0] : active_area_x[1]]
pix_offset_spec = self.data_set.direct_pixel - x_pos
delta_dangle = self.data_set.dangle - self.data_set.angle_offset
tth_spec = delta_dangle * np.pi / 180.0 + pix_offset_spec * rad_per_pixel
af = delta_dangle * np.pi / 180.0 + xtth * rad_per_pixel - tth_spec / 2.0
ai = np.ones_like(af) * tth_spec / 2.0
phi = (np.arange(self.data_set.data.shape[1])[active_area_y[0] : active_area_y[1]] - y_pos) * rad_per_pixel
# To be compatible with QuickNXS v1, take the whole TOF range rather than trimming the edges
# ws = self.data_set.event_workspace
# tof_edges = np.arange(ws.getTofMin(), ws.getTofMax(), self.data_set.configuration.tof_bins)
tof_edges = self.data_set.tof_edges
v_edges = self.data_set.dist_mod_det / tof_edges * 1e6 # m/s
lambda_edges = H_OVER_M_NEUTRON / v_edges * 1e10 # A
wavelengths = np.asarray((lambda_edges[:-1] + lambda_edges[1:]) / 2.0)
k = 2.0 * np.pi / wavelengths
# calculate ROI intensities and normalize by number of points
# Note: the number of points here may need to be adjusted from the specular reflectivity
# calculation if we used a final rebining.
P0 = self.data_set.configuration.cut_first_n_points
PN = len(self.data_set.tof) - self.data_set.configuration.cut_last_n_points
self.wavelengths = wavelengths[P0:PN]
# calculate reciprocal space, incident and outgoing perpendicular wave vectors
self.Qy = k[np.newaxis, np.newaxis, P0:PN] * (np.sin(phi) * np.cos(af)[:, np.newaxis])[:, :, np.newaxis]
p_i = k[np.newaxis, np.newaxis, P0:PN] * ((0.0 * phi) + np.sin(ai)[:, np.newaxis])[:, :, np.newaxis]
self.p_f = k[np.newaxis, np.newaxis, P0:PN] * ((0.0 * phi) + np.sin(af)[:, np.newaxis])[:, :, np.newaxis]
self.Qz = p_i + self.p_f
raw = self.data_set.data[active_area_x[0] : active_area_x[1], active_area_y[0] : active_area_y[1], P0:PN]
intensity = scale * np.array(raw)
d_intensity = scale * np.sqrt(raw)
if direct_beam is not None:
if not direct_beam.configuration.tof_bins == self.data_set.configuration.tof_bins:
logging.error("Trying to normalize with a direct beam data set with different binning")
norm_y_min, norm_y_max = get_direct_beam_low_res_roi(self.data_set.configuration, direct_beam.configuration)
norm_x_min, norm_x_max = direct_beam.configuration.peak_roi
norm_raw_multi_dim = direct_beam.data[norm_x_min:norm_x_max, norm_y_min:norm_y_max, P0:PN]
norm_raw = norm_raw_multi_dim.sum(axis=0).sum(axis=0)
norm_d_raw = np.sqrt(norm_raw)
norm_scale = (float(norm_x_max) - float(norm_x_min)) * (float(norm_y_max) - float(norm_y_min))
norm_raw /= norm_scale * direct_beam.proton_charge
norm_d_raw /= norm_scale * direct_beam.proton_charge
idxs = norm_raw > 0.0
d_intensity[:, :, idxs] = np.sqrt(
(d_intensity[:, :, idxs] / norm_raw[idxs][np.newaxis, np.newaxis, :]) ** 2
+ (
intensity[:, :, idxs]
/ norm_raw[idxs][np.newaxis, np.newaxis, :] ** 2
* norm_d_raw[idxs][np.newaxis, np.newaxis, :]
)
** 2
)
intensity[:, :, idxs] /= norm_raw[idxs][np.newaxis, np.newaxis, :]
intensity[:, :, np.logical_not(idxs)] = 0.0
d_intensity[:, :, np.logical_not(idxs)] = 0.0
self.S = intensity
self.dS = d_intensity
# Create plotting data
# TODO: use options to plot the right data (for instance: qz or pf)
self.SGrid, qy, qz = np.histogram2d(
self.Qy.flatten(), self.Qz.flatten(), bins=(50, 50), weights=intensity.flatten()
)
npoints, _, _ = np.histogram2d(self.Qy.flatten(), self.Qz.flatten(), bins=(50, 50))
self.SGrid[npoints > 0] /= npoints[npoints > 0]
self.SGrid = self.SGrid.transpose()
qy = (qy[:-1] + qy[1:]) / 2.0
qz = (qz[:-1] + qz[1:]) / 2.0
self.QyGrid, self.QzGrid = np.meshgrid(qy, qz)
[docs]
def merge(reduction_list: List["NexusData"], pol_state: str, wl_min: float = 0, wl_max: float = 100):
"""Merge the off-specular data from a reduction list.
The scaling factors should have been determined at this point.
Just use them to merge the different runs in a set.
TODO: This doesn't deal with the overlap properly.
It assumes that the user has cut the overlapping points by hand.
Parameters
----------
reduction_list:
List of NexusData objects
pol_state:
Polarization state to consider
wl_min:
Minimum wavelength to consider
wl_max:
Maximum wavelength to consider
"""
_qy = np.zeros(0)
_qz = np.zeros(0)
_pf = np.zeros(0)
_s = np.zeros(0)
_ds = np.zeros(0)
_wl = np.zeros(0)
for item in reduction_list:
gisans = item.cross_sections[pol_state].gisans_data
Qy, Qz, pf, S, dS = (gisans.Qy, gisans.Qz, gisans.p_f, gisans.S, gisans.dS)
# Filter according to wavelength
filtered = np.where((gisans.wavelengths >= wl_min) & (gisans.wavelengths <= wl_max))
_qy = np.concatenate((_qy, Qy[:, :, filtered].flatten()))
_qz = np.concatenate((_qz, Qz[:, :, filtered].flatten()))
_pf = np.concatenate((_pf, pf[:, :, filtered].flatten()))
_s = np.concatenate((_s, S[:, :, filtered].flatten()))
_ds = np.concatenate((_ds, dS[:, :, filtered].flatten()))
_wl = np.concatenate((_wl, gisans.wavelengths[filtered]))
return _qy, _qz, _pf, _s, _ds, _wl
def _rebin_proc(data: dict):
# Filter data
wl = data["wl"]
filtered = np.where((wl >= data["wl_min"]) & (wl <= data["wl_max"]))
qy = data["qy"][filtered]
_z_axis = data["qz"][filtered]
intensity = data["intensity"][filtered]
d_intensity = data["d_intensity"][filtered]
# Perform binning
n_points, _qy, _qz_axis = np.histogram2d(qy, _z_axis, bins=data["binning"])
_intensity_summed, _, _ = np.histogram2d(qy, _z_axis, bins=(_qy, _qz_axis), weights=intensity)
_intensity_err, _, _ = np.histogram2d(qy, _z_axis, bins=(_qy, _qz_axis), weights=d_intensity**2)
_intensity_summed[n_points > 0] /= n_points[n_points > 0]
_intensity_err = np.sqrt(_intensity_err)
_intensity_err[n_points > 0] /= n_points[n_points > 0]
_qy = (_qy[:-1] + _qy[1:]) / 2.0
_qz_axis = (_qz_axis[:-1] + _qz_axis[1:]) / 2.0
return _intensity_summed, _qy, _qz_axis, _intensity_err
[docs]
def rebin_parallel(
reduction_list: List["NexusData"],
pol_state: str,
wl_min: float,
wl_max: float,
wl_npts: int = 2,
qy_npts: int = 50,
qz_npts: int = 50,
use_pf: bool = False,
):
"""Process the wavelength bands in parallel."""
# First, merge all the data
binning = (qy_npts + 1, qz_npts + 1)
qy, qz, pf, intensity, d_intensity, wl_array = merge(reduction_list, pol_state, wl_min=0, wl_max=100.0)
if use_pf:
_z_axis = pf
else:
_z_axis = qz
# Create one job per wavelength band
inputs = []
for i in range(wl_npts):
wl_step = (wl_max - wl_min) / wl_npts
_wl_min = wl_min + i * wl_step
_wl_max = wl_min + (i + 1) * wl_step
_d = dict(
qy=qy,
qz=_z_axis,
intensity=intensity,
d_intensity=d_intensity,
wl=wl_array,
wl_min=_wl_min,
wl_max=_wl_max,
binning=binning,
)
inputs.append(_d)
pool = Pool(wl_npts)
results = pool.map(_rebin_proc, inputs)
return results