"""Data processing workflow, taking results and writing them to files."""
# pylint: disable=bare-except, too-many-locals
import copy
import io
import logging
import math
import os
import smtplib
import sys
import time
import zipfile
from email import encoders
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Optional, Tuple
import numpy as np
from mantid.simpleapi import CopyLogs, CreateWorkspace
from mr_reduction import io_orso
from mr_reduction.types import MantidWorkspace
from quicknxs.config import Settings
from quicknxs.interfaces.configuration import Configuration
from quicknxs.interfaces.data_handling import data_manipulation, gisans, off_specular, quicknxs_io
from quicknxs.interfaces.data_manager import DataManager
from quicknxs.interfaces.event_handlers.progress_reporter import ProgressReporter
DEFAULT_OPTIONS = dict(
export_specular=True,
export_asym=False,
export_gisans=False,
export_offspec=False,
export_offspec_smooth=False,
format_matlab=False,
format_mantid=True,
format_numpy=False,
format_5cols=False,
output_sample_size=10,
output_directory="",
output_file_template="{instrument}_{numbers}_{peak}_{item}_{state}.{type}",
email_send=False,
email_zip_data=False,
email_send_plots=False,
email_send_data=False,
email_to="",
email_cc="",
email_subject="",
email_test="",
)
[docs]
class ProcessingWorkflow(object):
"""Carry out the reduction process for a set of data runs and manages outputs."""
def __init__(self, data_manager: DataManager, output_options: Optional[dict] = None):
"""All the reduced data shall come from data manager."""
self.data_manager = data_manager
self.output_options = output_options if output_options else DEFAULT_OPTIONS
self.exported_data_files = []
self.exported_data_plots = []
[docs]
def execute(self, progress: Optional[ProgressReporter] = None):
"""Process data and write output files."""
if not self.data_manager.reduction_states:
return
# store current peak shown in the UI
active_peak = self.data_manager.active_reduction_list_index
active_data_index = self.data_manager.find_active_data_id()
for peak_index in self.data_manager.peak_reduction_lists.keys():
# set active data based on peak index
self.data_manager.set_active_reduction_list_index(peak_index)
self.data_manager.set_active_data_from_reduction_list(0)
if self.output_options["export_specular"]:
if progress is not None:
progress(10, "Computing reflectivity")
self.specular_reflectivity()
if self.output_options["export_offspec"] or self.output_options["export_offspec_smooth"]:
if progress is not None:
progress(20, "Computing off-specular reflectivity")
self.offspec(
raw=self.output_options["export_offspec"], binned=self.output_options["export_offspec_smooth"]
)
if self.output_options["export_gisans"]:
if progress is not None:
progress(60, "Computing GISANS")
# FIXME 66 - could be an AttributeError from self.gisans(). Catch it!
# -> Is this good enough?
try:
self.gisans(progress=progress)
except AttributeError:
logging.error("GISANS failed: %s", sys.exc_info()[1])
# restore current peak shown in the UI
self.data_manager.set_active_reduction_list_index(active_peak)
if active_data_index is not None:
self.data_manager.set_active_data_from_reduction_list(active_data_index)
if self.output_options["email_send"]:
self.send_email()
if progress is not None:
progress(100, "Complete")
[docs]
def get_file_name(
self,
run_list: Optional[list] = None,
pol_state: Optional[str] = None,
data_type: str = "dat",
process_type: str = "Specular",
):
"""Construct a file name according to the measurement type.
Parameters
----------
run_list:
List of run numbers
pol_state:
Name for the polarization state
data_type:
File extension
process_type:
Descriptor for the process type
"""
if run_list is None:
run_list = []
base_name = self.output_options["output_file_template"].replace("{numbers}", "+".join(run_list))
base_name = base_name.replace(
"{instrument}", self.data_manager.active_cross_section.configuration.instrument.instrument_name
)
base_name = base_name.replace("{item}", process_type)
if pol_state is not None:
base_name = base_name.replace("{state}", pol_state)
base_name = base_name.replace("{type}", data_type)
base_name = base_name.replace("{peak}", f"peak{self.data_manager.active_reduction_list_index}")
return os.path.join(self.output_options["output_directory"], base_name)
[docs]
def write_quicknxs(
self,
output_data: Dict[str, np.ndarray],
output_file_base: str,
xs: Optional[list] = None,
include_gisans: bool = False,
include_offspec: bool = False,
):
"""Write QuickNXS output reflectivity file.
Parameters
----------
output_data:
Dictionary of numpy arrays
output_file_base:
Template for output file paths
xs:
List of cross-sections available in the output_data
include_gisans:
Include GISANS specifc variables in the reduction output file
include_offspec:
Include Offspec specific variables in the reduction output file
"""
# Get the column names
units = output_data["units"]
cols = output_data["columns"]
col_names = ["%s [%s]" % (cols[i], units[i]) for i in range(len(cols))]
# List of all output states we have to deal with
if xs is not None:
output_states = xs
else:
output_states = copy.copy(self.data_manager.reduction_states)
if self.output_options["export_asym"] and "SA" in output_data:
output_states.append("SA")
# Sanity check
if len(output_states) == 0:
return
# Write out the cross-section data
five_cols = self.output_options["format_5cols"]
for pol_state in output_states:
# The cross-sections might have different names
if pol_state in self.data_manager.reduction_list[0].cross_sections:
_pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label
else:
_pol_state = pol_state
# We might not have data for a given cross-section
if pol_state not in output_data:
continue
state_output_path = output_file_base.replace("{state}", pol_state)
quicknxs_io.write_reflectivity_header(
self.data_manager.peak_reduction_lists,
self.data_manager.active_reduction_list_index,
self.data_manager.direct_beam_list,
state_output_path,
_pol_state,
include_gisans,
include_offspec,
)
quicknxs_io.write_reflectivity_data(state_output_path, output_data[pol_state], col_names, as_5col=five_cols)
self.exported_data_files.append(state_output_path)
[docs]
def write_orso(self, output_data: Dict[str, np.ndarray]):
"""
Save individual and combined reflectivity curves to ORSO format.
This function collects reflectivity data and metadata and saves one individual ORSO ASCII
file per run and peak and one combined (stitched) ORSO ASCII file per peak.
Parameters
----------
output_data: Dict[str, np.ndarray]
Dictionary of scaled and stitched data where the key is the cross-section name and the
value is an array of reflectivity data with shape (number of Q points, 5), where the 5
columns are:
- Qz: normal momentum transfer
- R: reflectivity
- dR: reflectivity error
- dQz: normal momentum transfer error
- theta: scattering angle
"""
# Save the individual runs to ORSO
individual_paths = {}
for nexus_data in self.data_manager.reduction_list:
run = str(nexus_data.number)
reflectivity_workspaces = nexus_data.get_reflectivity_workspace_group()
filepath = self.get_file_name([run], "all", "ort")
io_orso.save_cross_sections(reflectivity_workspaces, filepath)
individual_paths[run] = filepath
def _create_combined_reflectivity_workspace(_ws: MantidWorkspace, _xs: str):
"""Create a combined reflectivity workspace.
Create a new workspace with metadata copied from the given workspace
and output data for the given cross-section
"""
output_xs = output_data[_xs]
_ws_combined = CreateWorkspace(
DataX=output_xs[:, 0],
DataY=output_xs[:, 1],
DataE=output_xs[:, 2],
Dx=output_xs[:, 3],
OutputWorkspace=str(_ws) + "-combined",
)
CopyLogs(_ws, _ws_combined, MergeStrategy="WipeExisting")
return _ws_combined
# Assemble the combined reflectivity workspaces
combined_reflectivity_workspaces = []
cross_sections = self.data_manager.reduction_states
for xs in cross_sections:
# use the first reflectivity workspace to copy metadata from
ws_first = self.data_manager.reduction_list[0].cross_sections[xs].reflectivity_workspace
# create a workspace from the stitched reflectivity data and add metadata
ws_combined = _create_combined_reflectivity_workspace(ws_first, xs)
combined_reflectivity_workspaces.append(ws_combined)
# Save the combined reflectivity to ORSO
run_list = [str(item.number) for item in self.data_manager.reduction_list]
combined_path = self.get_file_name(run_list, "all", "ort")
io_orso.save_cross_sections(combined_reflectivity_workspaces, combined_path)
[docs]
def specular_reflectivity(self):
"""Retrieve the computed reflectivity and save it to file."""
# The reflectivity should always be up to date, so we don't need to recalculate it.
# The following would be used to recalculate it:
# self.data_manager.calculate_reflectivity(specular=True)
# self.data_manager.merge_data_sets(asymmetry=self.output_options['export_asym'])
run_list = [str(item.number) for item in self.data_manager.reduction_list]
output_data = self.get_output_data()
# QuickNXS format
output_file_base = self.get_file_name(run_list)
self.write_quicknxs(output_data, output_file_base)
# ORSO format
self.write_orso(output_data)
# Numpy arrays
if self.output_options["format_numpy"]:
output_file = self.get_file_name(run_list, data_type="npz", pol_state="all")
np.savez(output_file, **output_data)
self.exported_data_files.append(output_file)
# Matlab output
if self.output_options["format_matlab"]:
try:
from scipy.io import savemat
output_file = self.get_file_name(run_list, data_type="mat", pol_state="all")
savemat(output_file, output_data, oned_as="column")
self.exported_data_files.append(output_file)
except:
logging.error("Could not save in matlab format: %s", sys.exc_info([1]))
if self.output_options["format_mantid"]:
output_file = self.get_file_name(run_list, data_type="py", pol_state="all")
script = data_manipulation.generate_short_script(self.data_manager.reduction_list)
with open(output_file, "w") as file_object:
file_object.write(script)
self.exported_data_files.append(output_file)
[docs]
def gisans(self, progress=None):
"""Export GISANS."""
run_list = [str(item.number) for item in self.data_manager.reduction_list]
# Refresh the reflectivity calculation
if progress is not None:
progress(65, "Reducing GISANS...")
self.data_manager.cached_gisans = None
self.data_manager.reduce_gisans(progress=None)
if progress is not None:
progress(75, "Binning GISANS...")
data_dict, slice_data_dict = self.get_gisans_data(progress=None)
self.data_manager.cached_gisans = data_dict
if progress is not None:
progress(90, "Writing data")
output_file_base = self.get_file_name(run_list, process_type="GISANS")
self.write_quicknxs(data_dict, output_file_base, xs=data_dict["cross_sections"].keys(), include_gisans=True)
# Write out slice data
if slice_data_dict is not None and "cross_sections" in slice_data_dict:
output_file_base = self.get_file_name(run_list, process_type="GISANSSlice")
self.write_quicknxs(
slice_data_dict, output_file_base, xs=slice_data_dict["cross_sections"].keys(), include_gisans=True
)
if progress is not None:
progress(100, "GISANS complete")
[docs]
def offspec(self, raw: bool = True, binned: bool = False):
"""Export off-specular reflectivity.
Parameters
----------
raw:
If true, the raw results will be saved
binned:
If true, the raw results will be binned and saved
"""
run_list = [str(item.number) for item in self.data_manager.reduction_list]
# Refresh the reflectivity calculation
self.data_manager.cached_offspec = None
self.data_manager.reduce_offspec()
if raw or binned:
output_data = self.get_offspec_data()
# Export raw result
if raw:
# QuickNXS format
output_file_base = self.get_file_name(run_list, process_type="OffSpec")
self.write_quicknxs(output_data, output_file_base, include_offspec=True)
# Export binned result
if binned:
if self.data_manager.active_cross_section.configuration.apply_smoothing:
# "Smooth" version
try:
smooth_output, slice_data_dict = self.smooth_offspec(output_data)
output_file_base = self.get_file_name(run_list, process_type="OffSpecSmooth")
self.write_quicknxs(smooth_output, output_file_base, include_offspec=True)
if slice_data_dict is not None and "cross_sections" in slice_data_dict:
output_file_base = self.get_file_name(run_list, process_type="OffSpecSmoothSlice")
self.write_quicknxs(
slice_data_dict,
output_file_base,
xs=slice_data_dict["cross_sections"].keys(),
include_offspec=True,
)
self.data_manager.cached_offspec = smooth_output
except:
raise
logging.error("Problem writing smooth off-spec output")
else:
# Binned version
binned_data, slice_data_dict = self.get_rebinned_offspec_data()
# QuickNXS format ['smooth' is an odd name but we keep it for backward compatibility]
output_file_base = self.get_file_name(run_list, process_type="OffSpecBinned")
self.write_quicknxs(binned_data, output_file_base, include_offspec=True)
if slice_data_dict is not None and "cross_sections" in slice_data_dict:
output_file_base = self.get_file_name(run_list, process_type="OffSpecSlice")
self.write_quicknxs(
slice_data_dict,
output_file_base,
xs=slice_data_dict["cross_sections"].keys(),
include_offspec=True,
)
self.data_manager.cached_offspec = binned_data
[docs]
def get_rebinned_offspec_data(self):
"""Get a data dictionary ready for saving."""
data_dict = None
slice_data_dict = {}
# Extract common information
if len(self.data_manager.reduction_states) == 0:
logging.error("List of cross-sections is empty")
return {}
for pol_state in self.data_manager.reduction_states:
r, dr, x, y, labels = off_specular.rebin_extract(
self.data_manager.reduction_list,
pol_state,
axes=self.data_manager.active_cross_section.configuration.off_spec_x_axis,
use_weights=self.data_manager.active_cross_section.configuration.off_spec_err_weight,
n_bins_x=self.data_manager.active_cross_section.configuration.off_spec_nxbins,
n_bins_y=self.data_manager.active_cross_section.configuration.off_spec_nybins,
x_min=self.data_manager.active_cross_section.configuration.off_spec_x_min,
x_max=self.data_manager.active_cross_section.configuration.off_spec_x_max,
y_min=self.data_manager.active_cross_section.configuration.off_spec_y_min,
y_max=self.data_manager.active_cross_section.configuration.off_spec_y_max,
)
if data_dict is None:
data_dict = dict(
units=["1/A", "1/A", "a.u.", "a.u."], columns=[labels[0], labels[1], "I", "dI"], cross_sections={}
)
# Create array of x-values
x_tiled = np.tile(x, len(y))
x_tiled = x_tiled.reshape([len(y), len(x)])
# Create array of y-values
y_tiled = np.tile(y, len(x))
y_tiled = y_tiled.reshape([len(x), len(y)])
y_tiled = y_tiled.T
rdata = np.array([x_tiled, y_tiled, r, dr]).transpose((1, 2, 0))
if pol_state in self.data_manager.reduction_list[0].cross_sections:
_pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label
else:
_pol_state = pol_state
data_dict[pol_state] = [np.nan_to_num(rdata)]
data_dict["cross_sections"][pol_state] = _pol_state
# Slices
slice_data_dict = self.get_slice_output_data(x, y, r, dr, pol_state, labels[1], **slice_data_dict)
return data_dict, slice_data_dict
[docs]
def get_gisans_data(self, progress=None) -> dict | Tuple[dict, dict]:
wl_npts = self.data_manager.active_cross_section.configuration.gisans_wl_npts
wl_min = self.data_manager.active_cross_section.configuration.gisans_wl_min
wl_max = self.data_manager.active_cross_section.configuration.gisans_wl_max
qy_npts = self.data_manager.active_cross_section.configuration.gisans_qy_npts
qz_npts = self.data_manager.active_cross_section.configuration.gisans_qz_npts
use_pf = self.data_manager.active_cross_section.configuration.gisans_use_pf
data_dict = {"units": ["1/A", "1/A", "a.u.", "a.u."], "cross_sections": {}, "cross_section_bins": {}}
if use_pf:
data_dict["columns"] = ["Qy", "pf", "I", "dI"]
else:
data_dict["columns"] = ["Qy", "Qz", "I", "dI"]
# Extract common information
if not self.data_manager.reduction_states or not self.data_manager.reduction_list:
logging.error("List of cross-sections is empty")
return data_dict
t_0 = time.time()
_parallel = False
slice_data_dict = {}
for pol_state in self.data_manager.reduction_states:
if _parallel:
binned_data = gisans.rebin_parallel(
self.data_manager.reduction_list,
pol_state,
wl_min=wl_min,
wl_max=wl_max,
wl_npts=wl_npts,
qy_npts=qy_npts,
qz_npts=qz_npts,
use_pf=use_pf,
)
data_dict["cross_section_bins"][pol_state] = []
for i in range(wl_npts):
wl_step = (wl_max - wl_min) / wl_npts
_wl_min = wl_min + i * wl_step
_wl_max = wl_min + (i + 1) * wl_step
if _parallel:
_intensity, _qy, _qz_axis, _intensity_err = binned_data[i]
else:
_intensity, _qy, _qz_axis, _intensity_err = self.data_manager.rebin_gisans(
pol_state, wl_min=_wl_min, wl_max=_wl_max, qy_npts=qy_npts, qz_npts=qz_npts, use_pf=use_pf
)
qz, qy = np.meshgrid(_qz_axis, _qy)
rdata = np.array([qy, qz, _intensity, _intensity_err]).transpose((1, 2, 0))
if pol_state in self.data_manager.reduction_list[0].cross_sections:
_pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label
else:
_pol_state = pol_state
_pol_state = "%.3f-%.3f_%s" % (_wl_min, _wl_max, pol_state)
_pol_state_clean = "%.3f-%.3f_%s" % (_wl_min, _wl_max, _pol_state)
data_dict[_pol_state] = [np.nan_to_num(rdata)]
data_dict["cross_sections"][_pol_state] = _pol_state_clean
data_dict["cross_section_bins"][pol_state].append(_pol_state)
# Slices
label = "pf" if use_pf else "Qz"
slice_data_dict = self.get_gisans_slice_output_data(
_qy, _qz_axis, _intensity.T, _intensity_err.T, _pol_state, label, **slice_data_dict
)
logging.info("GISANS processing time: %s sec", (time.time() - t_0))
return data_dict, slice_data_dict
[docs]
def get_offspec_data(self):
"""Get a data dictionary ready for saving."""
data_dict = dict(
units=["1/A", "1/A", "1/A", "1/A", "1/A", "a.u.", "a.u."],
columns=["Qx", "Qz", "ki_z", "kf_z", "ki_z-kf_z", "I", "dI"],
cross_sections={},
)
# Extract common information
if not self.data_manager.reduction_states or not self.data_manager.reduction_list:
logging.error("List of cross-sections is empty")
return data_dict
first_state = self.data_manager.reduction_states[0]
p_0 = [
item.cross_sections[first_state].configuration.cut_first_n_points
for item in self.data_manager.reduction_list
]
p_n = [
item.cross_sections[first_state].configuration.cut_last_n_points
for item in self.data_manager.reduction_list
]
ki_max = 0.01
for pol_state in self.data_manager.reduction_states:
# The scaling factors should have been determined at this point. Just use them
# to merge the different runs in a set.
combined_data = []
for item in self.data_manager.reduction_list:
offspec = item.cross_sections[pol_state].off_spec
Qx, Qz, ki_z, kf_z, S, dS = (offspec.Qx, offspec.Qz, offspec.ki_z, offspec.kf_z, offspec.S, offspec.dS)
n_total = len(S[0])
# P_0 and P_N are the number of points to cut in TOF on each side
p_0 = item.cross_sections[pol_state].configuration.cut_first_n_points
p_n = n_total - item.cross_sections[pol_state].configuration.cut_last_n_points
rdata = np.array(
[
Qx[:, p_0:p_n],
Qz[:, p_0:p_n],
ki_z[:, p_0:p_n],
kf_z[:, p_0:p_n],
ki_z[:, p_0:p_n] - kf_z[:, p_0:p_n],
S[:, p_0:p_n],
dS[:, p_0:p_n],
]
).transpose((1, 2, 0))
combined_data.append(rdata)
ki_max = max(ki_max, ki_z.max())
if pol_state in self.data_manager.reduction_list[0].cross_sections:
_pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label
else:
_pol_state = pol_state
data_dict[pol_state] = combined_data
data_dict["cross_sections"][pol_state] = _pol_state
data_dict["ki_max"] = ki_max
return data_dict
[docs]
def smooth_offspec(self, data_dict: dict) -> Tuple[dict, dict]:
"""Create a smoothed dataset from the off-specular scattering.
Note for my own integrity (MD):
I don't think one should smooth data distributions and do any quantitative
work with it following this process. The way this was implemented in the
previous QuickNXS, replicated here, is equivalent to adding an extra resolution,
which then would have to be properly taken into account when fitting.
In addition, the process doesn't produce errors in intensity.
It effectively only produces a pretty picture and should only be used as such.
Parameters
----------
data_dict:
The output of `get_offspec_data()`
Returns
-------
Tuple[dict, dict]
A tuple containing the smoothed data dictionary and a slice data dictionary.
"""
axes = self.data_manager.active_cross_section.configuration.off_spec_x_axis
output_data = {
"units": [],
"columns": [],
"cross_sections": {},
}
slice_data_dict = {}
for xs in data_dict["cross_sections"].keys():
data = np.hstack(data_dict[xs])
I = data[:, :, 5].flatten()
Qzmax = data[:, :, 2].max() * 2.0
y_label = "Qz"
if axes == Configuration.QX_VS_QZ:
x = data[:, :, 0].flatten()
y = data[:, :, 1].flatten()
output_data["units"] = ["1/A", "1/A", "a.u."]
output_data["columns"] = ["Qx", "Qz", "I"]
axis_sigma_scaling = 2
xysigma0 = Qzmax / 3.0
elif axes == Configuration.KZI_VS_KZF:
y_label = "kf_z"
x = data[:, :, 2].flatten()
y = data[:, :, 3].flatten()
output_data["units"] = ["1/A", "1/A", "a.u."]
output_data["columns"] = ["ki_z", "kf_z", "I"]
axis_sigma_scaling = 3
xysigma0 = Qzmax / 6.0
else:
x = data[:, :, 4].flatten()
y = data[:, :, 1].flatten()
output_data["units"] = ["1/A", "1/A", "a.u."]
output_data["columns"] = ["ki_z-kf_z", "Qz", "I"]
axis_sigma_scaling = 2
xysigma0 = Qzmax / 3.0
x, y, I = off_specular.smooth_data(
x,
y,
I,
sigmas=self.output_options["off_spec_sigmas"],
gridx=self.output_options["off_spec_nxbins"],
gridy=self.output_options["off_spec_nybins"],
sigmax=self.output_options["off_spec_sigmax"],
sigmay=self.output_options["off_spec_sigmay"],
x1=self.output_options["off_spec_x_min"],
x2=self.output_options["off_spec_x_max"],
y1=self.output_options["off_spec_y_min"],
y2=self.output_options["off_spec_y_max"],
axis_sigma_scaling=axis_sigma_scaling,
xysigma0=xysigma0,
)
output_data[xs] = [np.array([x, y, I]).transpose((1, 2, 0))]
output_data["cross_sections"][xs] = data_dict["cross_sections"][xs]
# Slices
slice_data_dict = self.get_slice_output_data(x[0], y.T[0], I, None, xs, y_label, **slice_data_dict)
output_data["ki_max"] = data_dict["ki_max"]
return output_data, slice_data_dict
[docs]
def get_slice_output_data(self, qx, qz, r, dr, pol_state, label, **slice_data_dict):
"""Produce a data dictionary with a slice of the data."""
if slice_data_dict == {}:
slice_data_dict = dict(units=["1/A", "a.u.", "a.u."], columns=[label, "I", "dI"], cross_sections={})
q_min = self.data_manager.active_cross_section.configuration.off_spec_slice_qz_min
q_max = self.data_manager.active_cross_section.configuration.off_spec_slice_qz_max
result, error = off_specular.get_slice(qz, r, dr, q_min, q_max)
if error is not None:
# Is x what we need here, or the middle of the bin
_to_save = np.asarray([qx, result, error]).T
else:
_to_save = np.asarray([qx, result]).T
slice_label = "%s_%s_%s-%s" % (pol_state, label, q_min, q_max)
slice_data_dict["cross_sections"][slice_label] = slice_label
slice_data_dict[slice_label] = _to_save
return slice_data_dict
[docs]
def get_gisans_slice_output_data(self, qy, qz, r, dr, pol_state, label, **slice_data_dict):
"""Produce a data dictionary with a slice of the data."""
if slice_data_dict == {}:
slice_data_dict = dict(units=["1/A", "a.u.", "a.u."], columns=[label, "I", "dI"], cross_sections={})
q_min = self.data_manager.active_cross_section.configuration.gisans_slice_qz_min
q_max = self.data_manager.active_cross_section.configuration.gisans_slice_qz_max
# We can use the off-specular get_slice() function because it's not specific to off-specular
result, error = off_specular.get_slice(qz, r, dr, q_min, q_max)
if error is not None:
# Is x what we need here, or the middle of the bin
_to_save = np.asarray([qy, result, error]).T
else:
_to_save = np.asarray([qy, result]).T
slice_label = "%s_%s_%s-%s" % (pol_state, label, q_min, q_max)
slice_data_dict["cross_sections"][slice_label] = slice_label
slice_data_dict[slice_label] = _to_save
return slice_data_dict
[docs]
def get_output_data(self):
"""Re-format the reflectivity data for output.
The QuickNXS format cannot be written from the merged reflectivity, so we
have to treat it differently and give it the workspaces for each angle.
"""
data_dict = dict(
units=["1/A", "a.u.", "a.u.", "1/A", "rad"], columns=["Qz", "R", "dR", "dQz", "theta"], cross_sections={}
)
# Extract common information
if not self.data_manager.reduction_states or not self.data_manager.reduction_list:
logging.error("List of cross-sections is empty")
return data_dict
first_state = self.data_manager.reduction_states[0]
p_0 = [
item.cross_sections[first_state].configuration.cut_first_n_points
for item in self.data_manager.reduction_list
]
p_n = [
item.cross_sections[first_state].configuration.cut_last_n_points
for item in self.data_manager.reduction_list
]
for pol_state in self.data_manager.reduction_states:
# The scaling factors should have been determined at this point. Just use them
# to merge the different runs in a set.
ws_list = data_manipulation.get_scaled_workspaces(self.data_manager.reduction_list, pol_state)
# If the reflectivity calculation failed, we may not have data to work with
# for this cross-section.
if len(ws_list) == 0:
continue
combined_data = []
for i, ws in enumerate(ws_list):
_x = ws.readX(0)
n_total = len(_x)
x = ws.readX(0)[p_0[i] : n_total - p_n[i]]
y = ws.readY(0)[p_0[i] : n_total - p_n[i]]
dy = ws.readE(0)[p_0[i] : n_total - p_n[i]]
dx = ws.readDx(0)[p_0[i] : n_total - p_n[i]]
# This should be actual theta calculated
tth_value = ws.getRun().getProperty("two_theta").value / 2.0 * np.pi / 180.0
# tth_value = ws.getRun().getProperty("SANGLE").getStatistics().mean * math.pi / 180.0
tth = np.ones(len(x)) * tth_value
combined_data.append(np.vstack((x, y, dy, dx, tth)).transpose())
_output_data = np.vstack(combined_data)
ordered = np.argsort(_output_data, axis=0).transpose()[0]
output_data = _output_data[ordered]
if pol_state in self.data_manager.reduction_list[0].cross_sections:
_pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label
else:
_pol_state = pol_state
data_dict[pol_state] = output_data
data_dict["cross_sections"][pol_state] = _pol_state
# Asymmetry
if self.output_options["export_asym"]:
p_state, m_state = self.data_manager.determine_asymmetry_states()
if p_state and m_state:
# Get the list of workspaces
asym_data = []
if p_state in data_dict and m_state in data_dict:
# Sometimes the number of points may be different if the last few points had no signal.
for i in range(len(data_dict[p_state])):
p_point = data_dict[p_state][i]
i_m = len(data_dict[m_state][data_dict[m_state].T[0] < p_point[0]])
if i_m < len(data_dict[m_state]):
m_point = data_dict[m_state][i_m]
if p_point[1] > 0 and m_point[1] > 0:
ratio = (p_point[1] - m_point[1]) / (p_point[1] + m_point[1])
d_ratio = 2.0 / (p_point[1] + m_point[1]) ** 2
d_ratio *= math.sqrt(
m_point[1] ** 2 * p_point[2] ** 2 + p_point[1] ** 2 * m_point[2] ** 2
)
asym_data.append([p_point[0], ratio, d_ratio, p_point[3], p_point[4]])
data_dict["SA"] = np.asarray(asym_data)
else:
logging.error(
"Asym request but failed: %s %s %s %s",
p_state,
m_state,
len(data_dict[p_state]),
len(data_dict[m_state]),
)
return data_dict
def _email_replace(self, text):
"""Replace token templates in text."""
run_list = [str(item.number) for item in self.data_manager.reduction_list]
return (
text.replace("{ipts}", "")
.replace("{numbers}", "+".join(run_list))
.replace("{instrument}", self.data_manager.active_cross_section.configuration.instrument.instrument_name)
)
[docs]
def send_email(self):
"""Collect all files and send them to the user via smtp mail."""
SMTP_SERVER = Settings()["SMTP_SERVER"]
msg = MIMEMultipart()
msg["Subject"] = self._email_replace(self.output_options["email_subject"])
msg["From"] = "BL4A@ornl.gov"
msg["To"] = self.output_options["email_to"].replace(";", ", ")
msg["CC"] = self.output_options["email_cc"].replace(";", ", ")
text = self._email_replace(self.output_options["email_text"])
msg.preamble = text
msg.attach(MIMEText(text))
if self.output_options["email_send_data"]:
exported_files = self.exported_data_files
elif self.output_options["email_send_plots"]:
exported_files = self.exported_data_plots
else:
exported_files = self.exported_data_files
exported_files.extend(self.exported_data_plots)
if self.output_options["email_zip_data"]:
# Create an in-memory zip file which gets attached to the mail
fobj = io.BytesIO()
_file = zipfile.ZipFile(fobj, "w", zipfile.ZIP_DEFLATED)
for item in exported_files:
_file.write(item, arcname=os.path.basename(item))
_file.close()
fobj.seek(0)
mitem = MIMEBase("application", "zip")
mitem.set_payload(fobj.read())
encoders.encode_base64(mitem)
mitem.add_header("Content-Disposition", "attachment", filename=self._email_replace("results_{numbers}.zip"))
msg.attach(mitem)
else:
# read each file, which was exported and attach it to the mail
for item in exported_files:
try:
if item.endswith(".png"):
mitem = MIMEImage(open(item, "r").read(), "png")
elif item.endswith(".pdf"):
mitem = MIMEText(open(item, "r").read(), "pdf")
elif item.endswith(".dat") or item.endswith(".gp"):
mitem = MIMEText(open(item, "r").read())
else:
mitem = MIMEBase("application", item[-3:])
mitem.set_payload(open(item, "r").read())
encoders.encode_base64(mitem)
except:
logging.error("Could not package files for email: %s", sys.exc_info()[1])
mitem.add_header("Content-Disposition", "attachment", filename=os.path.basename(item))
msg.attach(mitem)
try:
smtp = smtplib.SMTP(SMTP_SERVER, timeout=10)
to_addr = msg["To"].replace(",", ";") + ";" + msg["CC"].replace(",", ";")
smtp.sendmail(msg["From"], to_addr, msg.as_string())
smtp.quit()
except:
logging.error("Could not send email: %s", sys.exc_info()[1])