Source code for quicknxs.interfaces.data_handling.processing_workflow

"""Data processing workflow, taking results and writing them to files."""
# pylint: disable=bare-except, too-many-locals

import copy
import io
import logging
import math
import os
import smtplib
import sys
import time
import zipfile
from email import encoders
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, Optional, Tuple

import numpy as np
from mantid.simpleapi import CopyLogs, CreateWorkspace
from mr_reduction import io_orso
from mr_reduction.types import MantidWorkspace

from quicknxs.config import Settings
from quicknxs.interfaces.configuration import Configuration
from quicknxs.interfaces.data_handling import data_manipulation, gisans, off_specular, quicknxs_io
from quicknxs.interfaces.data_manager import DataManager
from quicknxs.interfaces.event_handlers.progress_reporter import ProgressReporter

DEFAULT_OPTIONS = dict(
    export_specular=True,
    export_asym=False,
    export_gisans=False,
    export_offspec=False,
    export_offspec_smooth=False,
    format_matlab=False,
    format_mantid=True,
    format_numpy=False,
    format_5cols=False,
    output_sample_size=10,
    output_directory="",
    output_file_template="{instrument}_{numbers}_{peak}_{item}_{state}.{type}",
    email_send=False,
    email_zip_data=False,
    email_send_plots=False,
    email_send_data=False,
    email_to="",
    email_cc="",
    email_subject="",
    email_test="",
)


[docs] class ProcessingWorkflow(object): """Carry out the reduction process for a set of data runs and manages outputs.""" def __init__(self, data_manager: DataManager, output_options: Optional[dict] = None): """All the reduced data shall come from data manager.""" self.data_manager = data_manager self.output_options = output_options if output_options else DEFAULT_OPTIONS self.exported_data_files = [] self.exported_data_plots = []
[docs] def execute(self, progress: Optional[ProgressReporter] = None): """Process data and write output files.""" if not self.data_manager.reduction_states: return # store current peak shown in the UI active_peak = self.data_manager.active_reduction_list_index active_data_index = self.data_manager.find_active_data_id() for peak_index in self.data_manager.peak_reduction_lists.keys(): # set active data based on peak index self.data_manager.set_active_reduction_list_index(peak_index) self.data_manager.set_active_data_from_reduction_list(0) if self.output_options["export_specular"]: if progress is not None: progress(10, "Computing reflectivity") self.specular_reflectivity() if self.output_options["export_offspec"] or self.output_options["export_offspec_smooth"]: if progress is not None: progress(20, "Computing off-specular reflectivity") self.offspec( raw=self.output_options["export_offspec"], binned=self.output_options["export_offspec_smooth"] ) if self.output_options["export_gisans"]: if progress is not None: progress(60, "Computing GISANS") # FIXME 66 - could be an AttributeError from self.gisans(). Catch it! # -> Is this good enough? try: self.gisans(progress=progress) except AttributeError: logging.error("GISANS failed: %s", sys.exc_info()[1]) # restore current peak shown in the UI self.data_manager.set_active_reduction_list_index(active_peak) if active_data_index is not None: self.data_manager.set_active_data_from_reduction_list(active_data_index) if self.output_options["email_send"]: self.send_email() if progress is not None: progress(100, "Complete")
[docs] def get_file_name( self, run_list: Optional[list] = None, pol_state: Optional[str] = None, data_type: str = "dat", process_type: str = "Specular", ): """Construct a file name according to the measurement type. Parameters ---------- run_list: List of run numbers pol_state: Name for the polarization state data_type: File extension process_type: Descriptor for the process type """ if run_list is None: run_list = [] base_name = self.output_options["output_file_template"].replace("{numbers}", "+".join(run_list)) base_name = base_name.replace( "{instrument}", self.data_manager.active_cross_section.configuration.instrument.instrument_name ) base_name = base_name.replace("{item}", process_type) if pol_state is not None: base_name = base_name.replace("{state}", pol_state) base_name = base_name.replace("{type}", data_type) base_name = base_name.replace("{peak}", f"peak{self.data_manager.active_reduction_list_index}") return os.path.join(self.output_options["output_directory"], base_name)
[docs] def write_quicknxs( self, output_data: Dict[str, np.ndarray], output_file_base: str, xs: Optional[list] = None, include_gisans: bool = False, include_offspec: bool = False, ): """Write QuickNXS output reflectivity file. Parameters ---------- output_data: Dictionary of numpy arrays output_file_base: Template for output file paths xs: List of cross-sections available in the output_data include_gisans: Include GISANS specifc variables in the reduction output file include_offspec: Include Offspec specific variables in the reduction output file """ # Get the column names units = output_data["units"] cols = output_data["columns"] col_names = ["%s [%s]" % (cols[i], units[i]) for i in range(len(cols))] # List of all output states we have to deal with if xs is not None: output_states = xs else: output_states = copy.copy(self.data_manager.reduction_states) if self.output_options["export_asym"] and "SA" in output_data: output_states.append("SA") # Sanity check if len(output_states) == 0: return # Write out the cross-section data five_cols = self.output_options["format_5cols"] for pol_state in output_states: # The cross-sections might have different names if pol_state in self.data_manager.reduction_list[0].cross_sections: _pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label else: _pol_state = pol_state # We might not have data for a given cross-section if pol_state not in output_data: continue state_output_path = output_file_base.replace("{state}", pol_state) quicknxs_io.write_reflectivity_header( self.data_manager.peak_reduction_lists, self.data_manager.active_reduction_list_index, self.data_manager.direct_beam_list, state_output_path, _pol_state, include_gisans, include_offspec, ) quicknxs_io.write_reflectivity_data(state_output_path, output_data[pol_state], col_names, as_5col=five_cols) self.exported_data_files.append(state_output_path)
[docs] def write_orso(self, output_data: Dict[str, np.ndarray]): """ Save individual and combined reflectivity curves to ORSO format. This function collects reflectivity data and metadata and saves one individual ORSO ASCII file per run and peak and one combined (stitched) ORSO ASCII file per peak. Parameters ---------- output_data: Dict[str, np.ndarray] Dictionary of scaled and stitched data where the key is the cross-section name and the value is an array of reflectivity data with shape (number of Q points, 5), where the 5 columns are: - Qz: normal momentum transfer - R: reflectivity - dR: reflectivity error - dQz: normal momentum transfer error - theta: scattering angle """ # Save the individual runs to ORSO individual_paths = {} for nexus_data in self.data_manager.reduction_list: run = str(nexus_data.number) reflectivity_workspaces = nexus_data.get_reflectivity_workspace_group() filepath = self.get_file_name([run], "all", "ort") io_orso.save_cross_sections(reflectivity_workspaces, filepath) individual_paths[run] = filepath def _create_combined_reflectivity_workspace(_ws: MantidWorkspace, _xs: str): """Create a combined reflectivity workspace. Create a new workspace with metadata copied from the given workspace and output data for the given cross-section """ output_xs = output_data[_xs] _ws_combined = CreateWorkspace( DataX=output_xs[:, 0], DataY=output_xs[:, 1], DataE=output_xs[:, 2], Dx=output_xs[:, 3], OutputWorkspace=str(_ws) + "-combined", ) CopyLogs(_ws, _ws_combined, MergeStrategy="WipeExisting") return _ws_combined # Assemble the combined reflectivity workspaces combined_reflectivity_workspaces = [] cross_sections = self.data_manager.reduction_states for xs in cross_sections: # use the first reflectivity workspace to copy metadata from ws_first = self.data_manager.reduction_list[0].cross_sections[xs].reflectivity_workspace # create a workspace from the stitched reflectivity data and add metadata ws_combined = _create_combined_reflectivity_workspace(ws_first, xs) combined_reflectivity_workspaces.append(ws_combined) # Save the combined reflectivity to ORSO run_list = [str(item.number) for item in self.data_manager.reduction_list] combined_path = self.get_file_name(run_list, "all", "ort") io_orso.save_cross_sections(combined_reflectivity_workspaces, combined_path)
[docs] def specular_reflectivity(self): """Retrieve the computed reflectivity and save it to file.""" # The reflectivity should always be up to date, so we don't need to recalculate it. # The following would be used to recalculate it: # self.data_manager.calculate_reflectivity(specular=True) # self.data_manager.merge_data_sets(asymmetry=self.output_options['export_asym']) run_list = [str(item.number) for item in self.data_manager.reduction_list] output_data = self.get_output_data() # QuickNXS format output_file_base = self.get_file_name(run_list) self.write_quicknxs(output_data, output_file_base) # ORSO format self.write_orso(output_data) # Numpy arrays if self.output_options["format_numpy"]: output_file = self.get_file_name(run_list, data_type="npz", pol_state="all") np.savez(output_file, **output_data) self.exported_data_files.append(output_file) # Matlab output if self.output_options["format_matlab"]: try: from scipy.io import savemat output_file = self.get_file_name(run_list, data_type="mat", pol_state="all") savemat(output_file, output_data, oned_as="column") self.exported_data_files.append(output_file) except: logging.error("Could not save in matlab format: %s", sys.exc_info([1])) if self.output_options["format_mantid"]: output_file = self.get_file_name(run_list, data_type="py", pol_state="all") script = data_manipulation.generate_short_script(self.data_manager.reduction_list) with open(output_file, "w") as file_object: file_object.write(script) self.exported_data_files.append(output_file)
[docs] def gisans(self, progress=None): """Export GISANS.""" run_list = [str(item.number) for item in self.data_manager.reduction_list] # Refresh the reflectivity calculation if progress is not None: progress(65, "Reducing GISANS...") self.data_manager.cached_gisans = None self.data_manager.reduce_gisans(progress=None) if progress is not None: progress(75, "Binning GISANS...") data_dict, slice_data_dict = self.get_gisans_data(progress=None) self.data_manager.cached_gisans = data_dict if progress is not None: progress(90, "Writing data") output_file_base = self.get_file_name(run_list, process_type="GISANS") self.write_quicknxs(data_dict, output_file_base, xs=data_dict["cross_sections"].keys(), include_gisans=True) # Write out slice data if slice_data_dict is not None and "cross_sections" in slice_data_dict: output_file_base = self.get_file_name(run_list, process_type="GISANSSlice") self.write_quicknxs( slice_data_dict, output_file_base, xs=slice_data_dict["cross_sections"].keys(), include_gisans=True ) if progress is not None: progress(100, "GISANS complete")
[docs] def offspec(self, raw: bool = True, binned: bool = False): """Export off-specular reflectivity. Parameters ---------- raw: If true, the raw results will be saved binned: If true, the raw results will be binned and saved """ run_list = [str(item.number) for item in self.data_manager.reduction_list] # Refresh the reflectivity calculation self.data_manager.cached_offspec = None self.data_manager.reduce_offspec() if raw or binned: output_data = self.get_offspec_data() # Export raw result if raw: # QuickNXS format output_file_base = self.get_file_name(run_list, process_type="OffSpec") self.write_quicknxs(output_data, output_file_base, include_offspec=True) # Export binned result if binned: if self.data_manager.active_cross_section.configuration.apply_smoothing: # "Smooth" version try: smooth_output, slice_data_dict = self.smooth_offspec(output_data) output_file_base = self.get_file_name(run_list, process_type="OffSpecSmooth") self.write_quicknxs(smooth_output, output_file_base, include_offspec=True) if slice_data_dict is not None and "cross_sections" in slice_data_dict: output_file_base = self.get_file_name(run_list, process_type="OffSpecSmoothSlice") self.write_quicknxs( slice_data_dict, output_file_base, xs=slice_data_dict["cross_sections"].keys(), include_offspec=True, ) self.data_manager.cached_offspec = smooth_output except: raise logging.error("Problem writing smooth off-spec output") else: # Binned version binned_data, slice_data_dict = self.get_rebinned_offspec_data() # QuickNXS format ['smooth' is an odd name but we keep it for backward compatibility] output_file_base = self.get_file_name(run_list, process_type="OffSpecBinned") self.write_quicknxs(binned_data, output_file_base, include_offspec=True) if slice_data_dict is not None and "cross_sections" in slice_data_dict: output_file_base = self.get_file_name(run_list, process_type="OffSpecSlice") self.write_quicknxs( slice_data_dict, output_file_base, xs=slice_data_dict["cross_sections"].keys(), include_offspec=True, ) self.data_manager.cached_offspec = binned_data
[docs] def get_rebinned_offspec_data(self): """Get a data dictionary ready for saving.""" data_dict = None slice_data_dict = {} # Extract common information if len(self.data_manager.reduction_states) == 0: logging.error("List of cross-sections is empty") return {} for pol_state in self.data_manager.reduction_states: r, dr, x, y, labels = off_specular.rebin_extract( self.data_manager.reduction_list, pol_state, axes=self.data_manager.active_cross_section.configuration.off_spec_x_axis, use_weights=self.data_manager.active_cross_section.configuration.off_spec_err_weight, n_bins_x=self.data_manager.active_cross_section.configuration.off_spec_nxbins, n_bins_y=self.data_manager.active_cross_section.configuration.off_spec_nybins, x_min=self.data_manager.active_cross_section.configuration.off_spec_x_min, x_max=self.data_manager.active_cross_section.configuration.off_spec_x_max, y_min=self.data_manager.active_cross_section.configuration.off_spec_y_min, y_max=self.data_manager.active_cross_section.configuration.off_spec_y_max, ) if data_dict is None: data_dict = dict( units=["1/A", "1/A", "a.u.", "a.u."], columns=[labels[0], labels[1], "I", "dI"], cross_sections={} ) # Create array of x-values x_tiled = np.tile(x, len(y)) x_tiled = x_tiled.reshape([len(y), len(x)]) # Create array of y-values y_tiled = np.tile(y, len(x)) y_tiled = y_tiled.reshape([len(x), len(y)]) y_tiled = y_tiled.T rdata = np.array([x_tiled, y_tiled, r, dr]).transpose((1, 2, 0)) if pol_state in self.data_manager.reduction_list[0].cross_sections: _pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label else: _pol_state = pol_state data_dict[pol_state] = [np.nan_to_num(rdata)] data_dict["cross_sections"][pol_state] = _pol_state # Slices slice_data_dict = self.get_slice_output_data(x, y, r, dr, pol_state, labels[1], **slice_data_dict) return data_dict, slice_data_dict
[docs] def get_gisans_data(self, progress=None) -> dict | Tuple[dict, dict]: wl_npts = self.data_manager.active_cross_section.configuration.gisans_wl_npts wl_min = self.data_manager.active_cross_section.configuration.gisans_wl_min wl_max = self.data_manager.active_cross_section.configuration.gisans_wl_max qy_npts = self.data_manager.active_cross_section.configuration.gisans_qy_npts qz_npts = self.data_manager.active_cross_section.configuration.gisans_qz_npts use_pf = self.data_manager.active_cross_section.configuration.gisans_use_pf data_dict = {"units": ["1/A", "1/A", "a.u.", "a.u."], "cross_sections": {}, "cross_section_bins": {}} if use_pf: data_dict["columns"] = ["Qy", "pf", "I", "dI"] else: data_dict["columns"] = ["Qy", "Qz", "I", "dI"] # Extract common information if not self.data_manager.reduction_states or not self.data_manager.reduction_list: logging.error("List of cross-sections is empty") return data_dict t_0 = time.time() _parallel = False slice_data_dict = {} for pol_state in self.data_manager.reduction_states: if _parallel: binned_data = gisans.rebin_parallel( self.data_manager.reduction_list, pol_state, wl_min=wl_min, wl_max=wl_max, wl_npts=wl_npts, qy_npts=qy_npts, qz_npts=qz_npts, use_pf=use_pf, ) data_dict["cross_section_bins"][pol_state] = [] for i in range(wl_npts): wl_step = (wl_max - wl_min) / wl_npts _wl_min = wl_min + i * wl_step _wl_max = wl_min + (i + 1) * wl_step if _parallel: _intensity, _qy, _qz_axis, _intensity_err = binned_data[i] else: _intensity, _qy, _qz_axis, _intensity_err = self.data_manager.rebin_gisans( pol_state, wl_min=_wl_min, wl_max=_wl_max, qy_npts=qy_npts, qz_npts=qz_npts, use_pf=use_pf ) qz, qy = np.meshgrid(_qz_axis, _qy) rdata = np.array([qy, qz, _intensity, _intensity_err]).transpose((1, 2, 0)) if pol_state in self.data_manager.reduction_list[0].cross_sections: _pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label else: _pol_state = pol_state _pol_state = "%.3f-%.3f_%s" % (_wl_min, _wl_max, pol_state) _pol_state_clean = "%.3f-%.3f_%s" % (_wl_min, _wl_max, _pol_state) data_dict[_pol_state] = [np.nan_to_num(rdata)] data_dict["cross_sections"][_pol_state] = _pol_state_clean data_dict["cross_section_bins"][pol_state].append(_pol_state) # Slices label = "pf" if use_pf else "Qz" slice_data_dict = self.get_gisans_slice_output_data( _qy, _qz_axis, _intensity.T, _intensity_err.T, _pol_state, label, **slice_data_dict ) logging.info("GISANS processing time: %s sec", (time.time() - t_0)) return data_dict, slice_data_dict
[docs] def get_offspec_data(self): """Get a data dictionary ready for saving.""" data_dict = dict( units=["1/A", "1/A", "1/A", "1/A", "1/A", "a.u.", "a.u."], columns=["Qx", "Qz", "ki_z", "kf_z", "ki_z-kf_z", "I", "dI"], cross_sections={}, ) # Extract common information if not self.data_manager.reduction_states or not self.data_manager.reduction_list: logging.error("List of cross-sections is empty") return data_dict first_state = self.data_manager.reduction_states[0] p_0 = [ item.cross_sections[first_state].configuration.cut_first_n_points for item in self.data_manager.reduction_list ] p_n = [ item.cross_sections[first_state].configuration.cut_last_n_points for item in self.data_manager.reduction_list ] ki_max = 0.01 for pol_state in self.data_manager.reduction_states: # The scaling factors should have been determined at this point. Just use them # to merge the different runs in a set. combined_data = [] for item in self.data_manager.reduction_list: offspec = item.cross_sections[pol_state].off_spec Qx, Qz, ki_z, kf_z, S, dS = (offspec.Qx, offspec.Qz, offspec.ki_z, offspec.kf_z, offspec.S, offspec.dS) n_total = len(S[0]) # P_0 and P_N are the number of points to cut in TOF on each side p_0 = item.cross_sections[pol_state].configuration.cut_first_n_points p_n = n_total - item.cross_sections[pol_state].configuration.cut_last_n_points rdata = np.array( [ Qx[:, p_0:p_n], Qz[:, p_0:p_n], ki_z[:, p_0:p_n], kf_z[:, p_0:p_n], ki_z[:, p_0:p_n] - kf_z[:, p_0:p_n], S[:, p_0:p_n], dS[:, p_0:p_n], ] ).transpose((1, 2, 0)) combined_data.append(rdata) ki_max = max(ki_max, ki_z.max()) if pol_state in self.data_manager.reduction_list[0].cross_sections: _pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label else: _pol_state = pol_state data_dict[pol_state] = combined_data data_dict["cross_sections"][pol_state] = _pol_state data_dict["ki_max"] = ki_max return data_dict
[docs] def smooth_offspec(self, data_dict: dict) -> Tuple[dict, dict]: """Create a smoothed dataset from the off-specular scattering. Note for my own integrity (MD): I don't think one should smooth data distributions and do any quantitative work with it following this process. The way this was implemented in the previous QuickNXS, replicated here, is equivalent to adding an extra resolution, which then would have to be properly taken into account when fitting. In addition, the process doesn't produce errors in intensity. It effectively only produces a pretty picture and should only be used as such. Parameters ---------- data_dict: The output of `get_offspec_data()` Returns ------- Tuple[dict, dict] A tuple containing the smoothed data dictionary and a slice data dictionary. """ axes = self.data_manager.active_cross_section.configuration.off_spec_x_axis output_data = { "units": [], "columns": [], "cross_sections": {}, } slice_data_dict = {} for xs in data_dict["cross_sections"].keys(): data = np.hstack(data_dict[xs]) I = data[:, :, 5].flatten() Qzmax = data[:, :, 2].max() * 2.0 y_label = "Qz" if axes == Configuration.QX_VS_QZ: x = data[:, :, 0].flatten() y = data[:, :, 1].flatten() output_data["units"] = ["1/A", "1/A", "a.u."] output_data["columns"] = ["Qx", "Qz", "I"] axis_sigma_scaling = 2 xysigma0 = Qzmax / 3.0 elif axes == Configuration.KZI_VS_KZF: y_label = "kf_z" x = data[:, :, 2].flatten() y = data[:, :, 3].flatten() output_data["units"] = ["1/A", "1/A", "a.u."] output_data["columns"] = ["ki_z", "kf_z", "I"] axis_sigma_scaling = 3 xysigma0 = Qzmax / 6.0 else: x = data[:, :, 4].flatten() y = data[:, :, 1].flatten() output_data["units"] = ["1/A", "1/A", "a.u."] output_data["columns"] = ["ki_z-kf_z", "Qz", "I"] axis_sigma_scaling = 2 xysigma0 = Qzmax / 3.0 x, y, I = off_specular.smooth_data( x, y, I, sigmas=self.output_options["off_spec_sigmas"], gridx=self.output_options["off_spec_nxbins"], gridy=self.output_options["off_spec_nybins"], sigmax=self.output_options["off_spec_sigmax"], sigmay=self.output_options["off_spec_sigmay"], x1=self.output_options["off_spec_x_min"], x2=self.output_options["off_spec_x_max"], y1=self.output_options["off_spec_y_min"], y2=self.output_options["off_spec_y_max"], axis_sigma_scaling=axis_sigma_scaling, xysigma0=xysigma0, ) output_data[xs] = [np.array([x, y, I]).transpose((1, 2, 0))] output_data["cross_sections"][xs] = data_dict["cross_sections"][xs] # Slices slice_data_dict = self.get_slice_output_data(x[0], y.T[0], I, None, xs, y_label, **slice_data_dict) output_data["ki_max"] = data_dict["ki_max"] return output_data, slice_data_dict
[docs] def get_slice_output_data(self, qx, qz, r, dr, pol_state, label, **slice_data_dict): """Produce a data dictionary with a slice of the data.""" if slice_data_dict == {}: slice_data_dict = dict(units=["1/A", "a.u.", "a.u."], columns=[label, "I", "dI"], cross_sections={}) q_min = self.data_manager.active_cross_section.configuration.off_spec_slice_qz_min q_max = self.data_manager.active_cross_section.configuration.off_spec_slice_qz_max result, error = off_specular.get_slice(qz, r, dr, q_min, q_max) if error is not None: # Is x what we need here, or the middle of the bin _to_save = np.asarray([qx, result, error]).T else: _to_save = np.asarray([qx, result]).T slice_label = "%s_%s_%s-%s" % (pol_state, label, q_min, q_max) slice_data_dict["cross_sections"][slice_label] = slice_label slice_data_dict[slice_label] = _to_save return slice_data_dict
[docs] def get_gisans_slice_output_data(self, qy, qz, r, dr, pol_state, label, **slice_data_dict): """Produce a data dictionary with a slice of the data.""" if slice_data_dict == {}: slice_data_dict = dict(units=["1/A", "a.u.", "a.u."], columns=[label, "I", "dI"], cross_sections={}) q_min = self.data_manager.active_cross_section.configuration.gisans_slice_qz_min q_max = self.data_manager.active_cross_section.configuration.gisans_slice_qz_max # We can use the off-specular get_slice() function because it's not specific to off-specular result, error = off_specular.get_slice(qz, r, dr, q_min, q_max) if error is not None: # Is x what we need here, or the middle of the bin _to_save = np.asarray([qy, result, error]).T else: _to_save = np.asarray([qy, result]).T slice_label = "%s_%s_%s-%s" % (pol_state, label, q_min, q_max) slice_data_dict["cross_sections"][slice_label] = slice_label slice_data_dict[slice_label] = _to_save return slice_data_dict
[docs] def get_output_data(self): """Re-format the reflectivity data for output. The QuickNXS format cannot be written from the merged reflectivity, so we have to treat it differently and give it the workspaces for each angle. """ data_dict = dict( units=["1/A", "a.u.", "a.u.", "1/A", "rad"], columns=["Qz", "R", "dR", "dQz", "theta"], cross_sections={} ) # Extract common information if not self.data_manager.reduction_states or not self.data_manager.reduction_list: logging.error("List of cross-sections is empty") return data_dict first_state = self.data_manager.reduction_states[0] p_0 = [ item.cross_sections[first_state].configuration.cut_first_n_points for item in self.data_manager.reduction_list ] p_n = [ item.cross_sections[first_state].configuration.cut_last_n_points for item in self.data_manager.reduction_list ] for pol_state in self.data_manager.reduction_states: # The scaling factors should have been determined at this point. Just use them # to merge the different runs in a set. ws_list = data_manipulation.get_scaled_workspaces(self.data_manager.reduction_list, pol_state) # If the reflectivity calculation failed, we may not have data to work with # for this cross-section. if len(ws_list) == 0: continue combined_data = [] for i, ws in enumerate(ws_list): _x = ws.readX(0) n_total = len(_x) x = ws.readX(0)[p_0[i] : n_total - p_n[i]] y = ws.readY(0)[p_0[i] : n_total - p_n[i]] dy = ws.readE(0)[p_0[i] : n_total - p_n[i]] dx = ws.readDx(0)[p_0[i] : n_total - p_n[i]] # This should be actual theta calculated tth_value = ws.getRun().getProperty("two_theta").value / 2.0 * np.pi / 180.0 # tth_value = ws.getRun().getProperty("SANGLE").getStatistics().mean * math.pi / 180.0 tth = np.ones(len(x)) * tth_value combined_data.append(np.vstack((x, y, dy, dx, tth)).transpose()) _output_data = np.vstack(combined_data) ordered = np.argsort(_output_data, axis=0).transpose()[0] output_data = _output_data[ordered] if pol_state in self.data_manager.reduction_list[0].cross_sections: _pol_state = self.data_manager.reduction_list[0].cross_sections[pol_state].cross_section_label else: _pol_state = pol_state data_dict[pol_state] = output_data data_dict["cross_sections"][pol_state] = _pol_state # Asymmetry if self.output_options["export_asym"]: p_state, m_state = self.data_manager.determine_asymmetry_states() if p_state and m_state: # Get the list of workspaces asym_data = [] if p_state in data_dict and m_state in data_dict: # Sometimes the number of points may be different if the last few points had no signal. for i in range(len(data_dict[p_state])): p_point = data_dict[p_state][i] i_m = len(data_dict[m_state][data_dict[m_state].T[0] < p_point[0]]) if i_m < len(data_dict[m_state]): m_point = data_dict[m_state][i_m] if p_point[1] > 0 and m_point[1] > 0: ratio = (p_point[1] - m_point[1]) / (p_point[1] + m_point[1]) d_ratio = 2.0 / (p_point[1] + m_point[1]) ** 2 d_ratio *= math.sqrt( m_point[1] ** 2 * p_point[2] ** 2 + p_point[1] ** 2 * m_point[2] ** 2 ) asym_data.append([p_point[0], ratio, d_ratio, p_point[3], p_point[4]]) data_dict["SA"] = np.asarray(asym_data) else: logging.error( "Asym request but failed: %s %s %s %s", p_state, m_state, len(data_dict[p_state]), len(data_dict[m_state]), ) return data_dict
def _email_replace(self, text): """Replace token templates in text.""" run_list = [str(item.number) for item in self.data_manager.reduction_list] return ( text.replace("{ipts}", "") .replace("{numbers}", "+".join(run_list)) .replace("{instrument}", self.data_manager.active_cross_section.configuration.instrument.instrument_name) )
[docs] def send_email(self): """Collect all files and send them to the user via smtp mail.""" SMTP_SERVER = Settings()["SMTP_SERVER"] msg = MIMEMultipart() msg["Subject"] = self._email_replace(self.output_options["email_subject"]) msg["From"] = "BL4A@ornl.gov" msg["To"] = self.output_options["email_to"].replace(";", ", ") msg["CC"] = self.output_options["email_cc"].replace(";", ", ") text = self._email_replace(self.output_options["email_text"]) msg.preamble = text msg.attach(MIMEText(text)) if self.output_options["email_send_data"]: exported_files = self.exported_data_files elif self.output_options["email_send_plots"]: exported_files = self.exported_data_plots else: exported_files = self.exported_data_files exported_files.extend(self.exported_data_plots) if self.output_options["email_zip_data"]: # Create an in-memory zip file which gets attached to the mail fobj = io.BytesIO() _file = zipfile.ZipFile(fobj, "w", zipfile.ZIP_DEFLATED) for item in exported_files: _file.write(item, arcname=os.path.basename(item)) _file.close() fobj.seek(0) mitem = MIMEBase("application", "zip") mitem.set_payload(fobj.read()) encoders.encode_base64(mitem) mitem.add_header("Content-Disposition", "attachment", filename=self._email_replace("results_{numbers}.zip")) msg.attach(mitem) else: # read each file, which was exported and attach it to the mail for item in exported_files: try: if item.endswith(".png"): mitem = MIMEImage(open(item, "r").read(), "png") elif item.endswith(".pdf"): mitem = MIMEText(open(item, "r").read(), "pdf") elif item.endswith(".dat") or item.endswith(".gp"): mitem = MIMEText(open(item, "r").read()) else: mitem = MIMEBase("application", item[-3:]) mitem.set_payload(open(item, "r").read()) encoders.encode_base64(mitem) except: logging.error("Could not package files for email: %s", sys.exc_info()[1]) mitem.add_header("Content-Disposition", "attachment", filename=os.path.basename(item)) msg.attach(mitem) try: smtp = smtplib.SMTP(SMTP_SERVER, timeout=10) to_addr = msg["To"].replace(",", ";") + ";" + msg["CC"].replace(",", ";") smtp.sendmail(msg["From"], to_addr, msg.as_string()) smtp.quit() except: logging.error("Could not send email: %s", sys.exc_info()[1])