# pylint: disable=bare-except, too-many-locals, too-many-statements, too-many-branches, wrong-import-order, too-many-arguments
"""
Read and write quicknxs reduced files
"""
import copy
import logging
import math
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Union
import mantid
import mr_reduction
import numpy as np
from quicknxs import __version__
from quicknxs.interfaces.configuration import Configuration
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
def _find_h5_data(filename: str):
"""
Because we have legacy data and new data re-processed for QuickNXS, we have to
ensure that we get the proper data file.
"""
if filename.endswith(".nxs"):
_new_filename = filename.replace("_histo.nxs", ".nxs.h5")
_new_filename = _new_filename.replace("_event.nxs", ".nxs.h5")
_new_filename = _new_filename.replace("data", "nexus")
if os.path.isfile(_new_filename):
logging.warning("Using %s" % _new_filename)
return _new_filename
return filename
def _get_cross_section_config_values(cross_section_data: CrossSectionData, i_direct_beam: int):
"""
Get dictionary of cross-section data configuration to write to QuickNXS file
Parameters
----------
cross_section_data: ~quicknxs.interfaces.data_handling.data_set.CrossSectionData
Cross-section to get parameter values from
i_direct_beam: int
Current direct beam index in the reduction list
Returns
-------
dict
"""
conf = cross_section_data.configuration
ws = cross_section_data.reflectivity_workspace
run_object = ws.getRun()
dpix = run_object.getProperty("DIRPIX").getStatistics().mean
filename = run_object.getProperty("Filename").value
constant_q_binning = run_object.getProperty("constant_q_binning").value
scatt_pos = run_object.getProperty("specular_pixel").value
scaling_factor = conf.scaling_factor
scaling_error = conf.scaling_error
# For some reason, the tth value that QuickNXS expects is offset.
# It seems to be because that same offset is applied later in the QuickNXS calculation.
# Correct tth here so that it can load properly in QuickNXS and produce the same result.
tth = run_object.getProperty("two_theta").value
det_distance = run_object["SampleDetDis"].getStatistics().mean / 1000.0
direct_beam_pix = run_object["DIRPIX"].getStatistics().mean
# Get pixel size from instrument properties
if ws.getInstrument().hasParameter("pixel-width"):
pixel_width = float(ws.getInstrument().getNumberParameter("pixel-width")[0]) / 1000.0
else:
pixel_width = 0.0007
tth -= ((direct_beam_pix - scatt_pos) * pixel_width) / det_distance * 180.0 / math.pi
normalization_run = run_object.getProperty("normalization_run").value
if normalization_run == "None":
db_id = 0
else:
i_direct_beam += 1
db_id = i_direct_beam
item = dict(
scale=scaling_factor,
scale_err=scaling_error,
DB_ID=db_id,
P0=conf.cut_first_n_points,
PN=conf.cut_last_n_points,
tth=tth,
fan=constant_q_binning,
x_pos=conf.peak_position,
x_width=conf.peak_width,
y_pos=conf.low_res_position,
y_width=conf.low_res_width,
bg_pos=conf.bck_position,
bg_width=conf.bck_width,
dpix=dpix,
number=str(ws.getRunNumber()),
File=filename,
)
parameter_values = {}
for key in item:
if isinstance(item[key], str):
parameter_values[key] = "%8s" % item[key]
else:
parameter_values[key] = "%8g" % item[key]
return parameter_values
[docs]
def write_reflectivity_data(
output_path: str, data: Union[list, np.ndarray], col_names: List[str], as_5col: bool = True
):
"""
Write out reflectivity header in a format readable by QuickNXS
:param str output_path: output file path
:param ndarray or list data: data to be written
:param list col_names: list of column names
:param bool as_5col: if True, a 5-column ascii will be written (theta is the last column)
"""
with open(output_path, "a") as fd:
# Determine how many columns to write
if isinstance(data, list):
four_cols = True
else:
four_cols = not as_5col and data.shape[1] > 4
# write header
fd.write("# [Data]\n")
if four_cols:
toks = ["%12s" % item for item in col_names[:4]]
else:
toks = ["%12s" % item for item in col_names]
fd.write("# %s\n" % "\t".join(toks))
# write numerical columns
if isinstance(data, list):
# [TOF][pixel][parameter]
for tof_item in data:
for pixel_item in tof_item:
np.savetxt(fd, pixel_item, delimiter="\t", fmt="%-18e")
fd.write("\n")
else:
if four_cols:
np.savetxt(fd, data[:, :4], delimiter=" ", fmt="%-18e")
else:
np.savetxt(fd, data, delimiter="\t", fmt="%-18e")
[docs]
def read_reduced_file(file_path, configuration=None):
"""
Read in configurations from a reduced data file.
:param str file_path: reduced data file
"""
direct_beam_runs = []
data_runs = []
additional_peaks = []
def _get_tok(col_name: str, cols: List[str], toks: List) -> Union[int, None]:
"""Get the item in a list of index matching the column name"""
try:
idx = cols.index(col_name)
return toks[idx]
except ValueError:
return None
# reading is mocked. The file_path is the prefix of the path. File name is obtained from the mocked data
with open(file_path, "r") as file_content:
# Section identifier
# 0: None
# 1: direct beams
# 2: data runs
# 3: additional peak data runs
# 4: global options
_in_section = 0
_file_start = True
for line in file_content.readlines():
if _file_start and not line.startswith("# Datafile created by QuickNXS"):
raise RuntimeError("The selected file does not conform to the QuickNXS format")
_file_start = False
if "Input file indices" in line:
data_file_indices = line
if "[Direct Beam Runs]" in line:
_in_section = 1
elif "[Data Runs]" in line:
_in_section = 2
elif "[Peak 1 Runs]" in line:
# if existing, use this section instead of "[Data Runs]"
_in_section = 2
data_runs = []
elif "[Peak" in line:
_in_section = 3
peak_index = int(line.split("[Peak ")[1].split(" Runs]")[0])
elif "[Global Options]" in line:
_in_section = 4
# Process direct beam runs
if _in_section == 1:
toks = line.split()
if "DB_ID" in toks:
cols = toks
continue
if len(toks) < 14:
continue
try:
if configuration is not None:
conf = copy.deepcopy(configuration)
else:
conf = Configuration()
conf.cut_first_n_points = int(_get_tok("P0", cols, toks))
conf.cut_last_n_points = int(_get_tok("PN", cols, toks))
conf.peak_position = float(_get_tok("x_pos", cols, toks))
conf.peak_width = float(_get_tok("x_width", cols, toks))
conf.low_res_position = float(_get_tok("y_pos", cols, toks))
conf.low_res_width = float(_get_tok("y_width", cols, toks))
conf.bck_position = float(_get_tok("bg_pos", cols, toks))
conf.bck_width = float(_get_tok("bg_width", cols, toks))
conf.direct_pixel_overwrite = float(_get_tok("dpix", cols, toks))
run_number = int(_get_tok("number", cols, toks))
run_file = _get_tok("File", cols, toks)
if not Path(run_file).is_absolute():
run_file = str(Path(file_path).parent / run_file)
# This application only deals with event data, to be able to load
# reduced files created with histo nexus files, we have to
# use the corresponding event file instead.
# Similarly, the number of points cut on each side probably
# doesn't make sense, so reset those options.
if run_file.endswith("histo.nxs"):
run_file = run_file.replace("histo.", "event.")
# conf.cut_first_n_points = 0
# conf.cut_last_n_points = 0
# Catch data files meant for QuickNXS and use the raw file instead
run_file = _find_h5_data(run_file)
direct_beam_runs.append([run_number, run_file, conf])
except:
logging.error("Could not parse reduced data file:\n %s", sys.exc_info()[1])
logging.error(line)
# Process data runs and additional peaks
if _in_section == 2 or _in_section == 3:
toks = line.split()
if "DB_ID" in toks:
cols = toks
continue
if len(toks) < 16:
continue
try:
if configuration is not None:
conf = copy.deepcopy(configuration)
else:
conf = Configuration()
conf.scaling_factor = float(_get_tok("scale", cols, toks))
if (tok := _get_tok("scale_err", cols, toks)) is not None:
has_scaling_error = True
conf.scaling_error = float(tok)
else:
has_scaling_error = False
conf.cut_first_n_points = int(_get_tok("P0", cols, toks))
conf.cut_last_n_points = int(_get_tok("PN", cols, toks))
conf.peak_position = float(_get_tok("x_pos", cols, toks))
conf.peak_width = float(_get_tok("x_width", cols, toks))
conf.low_res_position = float(_get_tok("y_pos", cols, toks))
conf.low_res_width = float(_get_tok("y_width", cols, toks))
conf.bck_position = float(_get_tok("bg_pos", cols, toks))
conf.bck_width = float(_get_tok("bg_width", cols, toks))
fan = _get_tok("fan", cols, toks) or _get_tok("extract_fan", cols, toks)
Configuration.use_constant_q = fan.lower() == "true"
conf.direct_pixel_overwrite = float(_get_tok("dpix", cols, toks))
DB_ID = int(_get_tok("DB_ID", cols, toks))
if DB_ID > 0 and len(direct_beam_runs) > DB_ID - 1:
conf.normalization = direct_beam_runs[DB_ID - 1][0]
run_number = int(_get_tok("number", cols, toks))
run_file = _get_tok("File", cols, toks)
if not Path(run_file).is_absolute():
run_file = str(Path(file_path).parent / run_file)
if run_file.endswith("histo.nxs"):
run_file = run_file.replace("histo.", "event.")
# conf.cut_first_n_points = 0
# conf.cut_last_n_points = 0
run_file = _find_h5_data(run_file)
run_file = determine_which_files_to_sum(run_file, data_file_indices)
if _in_section == 2:
data_runs.append([run_number, run_file, conf])
else:
additional_peaks.append([peak_index, run_number, run_file, conf])
except:
logging.error(
f"Could not parse reduced data file:\n {sys.exc_info()[1]} at line {sys.exc_info()[2].tb_lineno}"
)
logging.error("data_file_indices: %s", data_file_indices)
logging.error("run_file: %s", run_file)
logging.error(line)
# Options
if _in_section == 4:
if line.startswith("# sample_length"):
try:
conf.sample_size = float((line[len("# sample_length") :]).strip())
except:
logging.error("Could not extract sample size: %s" % line)
if line.startswith("# lock_direct_beam_y"):
try:
Configuration.lock_direct_beam_y = (
(line[len("# lock_direct_beam_y") :]).strip().lower() == "true"
)
except:
logging.error("Could not extract direct beam y lock: %s" % line)
return direct_beam_runs, data_runs, additional_peaks, has_scaling_error
[docs]
def determine_which_files_to_sum(run_file, data_file_indices):
# Determeine which files are summed when reading a saved reduction file
# The saved file has the correct run numbers (numors) in the line
# that begins # Input file indices, however the file does not contain the corect paths
# the way the file is read ignores any files that were summed in the processing from which the
# saved file was created.
if "+" in data_file_indices:
runs = str.split(str.split(data_file_indices)[-1], "+")
else:
runs = str.split(str.split(data_file_indices)[-1], ",")
for run in runs:
numors = str.split(run, ":")
if len(numors) > 1 and (str.split(run, ":")[0] in run_file):
outfile = ""
for i in range(int(numors[0]), int(numors[-1]) + 1):
outfile = outfile + "+" + run_file.replace(numors[0], str(i))
outfile = outfile[1:]
if len(numors) == 1 and (str.split(run, ":")[0] in run_file):
outfile = run_file
return outfile