# -*- coding: utf-8 -*-
# pylint: disable=invalid-name, line-too-long, too-many-public-methods, too-many-instance-attributes, wrong-import-order, \
# bare-except, protected-access, too-many-arguments, too-many-statements
"""Manage file-related and UI events."""
import glob
import logging
import math
import os
import time
import traceback
from typing import List, Optional, Union
import numpy as np
from mantid.simpleapi import DeleteWorkspace, LoadEventNexus
from qtpy import QtCore, QtWidgets
from quicknxs.config import Settings
from quicknxs.config.gui import QColors
from quicknxs.interfaces.configuration import BinningType, Configuration
from quicknxs.interfaces.data_handling.data_manipulation import NormalizeToUnityQCutoffError
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
from quicknxs.interfaces.data_handling.filepath import FilePath, RunNumbers
from quicknxs.interfaces.data_handling.instrument import InsufficientEventCountError
from quicknxs.interfaces.data_manager import DataManager
from quicknxs.interfaces.enums import DirectBeamTableColumn, ReductionTableColumn
from quicknxs.interfaces.event_handlers.progress_reporter import ProgressReporter
from quicknxs.interfaces.event_handlers.status_bar_handler import StatusBarHandler
from quicknxs.interfaces.event_handlers.widgets import AcceptRejectDialog
from quicknxs.ui.active_radio_button import ActiveDataRadioButton
from quicknxs.ui.binningtype_combobox import BinningTypeSelection
[docs]
class MainHandler:
"""Event handler for the main application window."""
# Index of the direct beam tab in the reduction table tab widget
DIRECT_BEAM_TAB_INDEX = 0
# Index of the first (and always visible) data tab in the reduction table tab widget
MAIN_DATA_TAB_INDEX = 1
def __init__(self, main_window):
    """Bind the handler to the main window and populate the status bar.

    Parameters
    ----------
    main_window:
        Main application window; its ``ui`` and ``data_manager`` attributes are used.
    """
    self.ui = main_window.ui
    self.main_window = main_window
    self._data_manager: DataManager = main_window.data_manager
    status_bar = self.ui.statusbar

    # One button group per table keeps the "active data" radio buttons mutually exclusive.
    # The reduction-table groups are keyed by tab index: {tab_index: QButtonGroup}
    self.direct_beam_button_group = QtWidgets.QButtonGroup(self.main_window)
    self.reduction_table_button_groups = {
        self.MAIN_DATA_TAB_INDEX: QtWidgets.QButtonGroup(self.main_window),
    }

    # Refresh the file list whenever the watched directory changes
    watched_paths = [self._data_manager.current_directory]
    self._path_watcher = QtCore.QFileSystemWatcher(watched_paths, self.main_window)
    self._path_watcher.directoryChanged.connect(self.update_file_list)

    # Permanent status-bar widgets: cache counter, "Empty Cache" button, progress bar
    cache_label = QtWidgets.QLabel("Files loaded: 0")
    cache_label.setMargin(5)
    cache_label.setSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Preferred)
    cache_label.setMinimumWidth(110)
    self.cache_indicator = cache_label
    status_bar.addPermanentWidget(cache_label)

    empty_cache_button = QtWidgets.QPushButton("Empty Cache")
    empty_cache_button.setFlat(False)
    empty_cache_button.setMaximumSize(150, 20)
    empty_cache_button.pressed.connect(self.empty_cache)
    status_bar.addPermanentWidget(empty_cache_button)

    self.progress_bar = QtWidgets.QProgressBar(status_bar)
    self.progress_bar.setMinimumSize(20, 14)
    self.progress_bar.setMaximumSize(140, 100)
    status_bar.addPermanentWidget(self.progress_bar)
    self.status_bar_handler = StatusBarHandler(status_bar)

    # Log-level selector. The change signal is connected only after the initial
    # level is selected, so that initial selection does not invoke change_log_level.
    self.LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    self.log_level = QtWidgets.QComboBox(status_bar)
    self.log_level.addItems(self.LOG_LEVELS)
    self.log_level.setCurrentText(self.get_log_level())
    self.log_level.setToolTip(
        "Set the logging level\nDefault log level can be set with environment variable QUICKNXS_LOGLEVEL"
    )
    self.log_level.currentTextChanged.connect(self.change_log_level)
    status_bar.addWidget(QtWidgets.QLabel("Log Level:"))
    status_bar.addWidget(self.log_level)
@property
def reduction_table(self) -> QtWidgets.QTableWidget:
    """Table widget of the data tab in use.

    When the direct beam tab is selected, the table of the main data tab is
    returned; otherwise the table found in the currently selected tab.
    """
    tabs = self.ui.tabWidget
    if tabs.currentIndex() == self.DIRECT_BEAM_TAB_INDEX:
        page = tabs.widget(self.MAIN_DATA_TAB_INDEX)
    else:
        page = tabs.currentWidget()
    return page.findChild(QtWidgets.QTableWidget)
@property
def direct_beam_table(self) -> QtWidgets.QTableWidget:
    """Table widget that lists the direct beam runs (``ui.directBeamTable``)."""
    table = self.ui.directBeamTable
    return table
[docs]
def get_reduction_table_by_index(self, tab_index: int) -> QtWidgets.QTableWidget:
    """Look up the table widget hosted by a given tab.

    Parameters
    ----------
    tab_index:
        Index of the tab in ``ui.tabWidget``

    Returns
    -------
    QtWidgets.QTableWidget:
        The QTableWidget child found in that tab's page
    """
    page = self.ui.tabWidget.widget(tab_index)
    return page.findChild(QtWidgets.QTableWidget)
[docs]
def new_progress_reporter(self):
    """Create a ProgressReporter wired to the status-bar progress bar and message handler."""
    reporter = ProgressReporter(status_bar=self.status_bar_handler, progress_bar=self.progress_bar)
    return reporter
[docs]
def empty_cache(self):
    """Clear the data manager's cache and reset the "Files loaded" counter to zero."""
    manager = self._data_manager
    manager.clear_cache()
    indicator = self.cache_indicator
    indicator.setText("Files loaded: 0")
[docs]
def hide_sidebar(self):
    """Toggle the visibility of ``main_window.leftEntries``: show it if hidden, hide it otherwise."""
    sidebar = self.main_window.leftEntries
    toggle = sidebar.show if sidebar.isHidden() else sidebar.hide
    toggle()
[docs]
def hide_run_data(self):
    """Toggle the visibility of ``main_window.runDataFrame``: show it if hidden, hide it otherwise."""
    frame = self.main_window.runDataFrame
    toggle = frame.show if frame.isHidden() else frame.hide
    toggle()
[docs]
def hide_data_table(self):
    """Toggle the visibility of ``main_window.frame_2``: show it if hidden, hide it otherwise."""
    frame = self.main_window.frame_2
    toggle = frame.show if frame.isHidden() else frame.hide
    toggle()
[docs]
def open_file(self, file_path: str, force: bool = False, silent: bool = False) -> None:
    """Load one data file, or several files whose data is merged.

    Parameters
    ----------
    file_path:
        Absolute path to the data file. Several paths may be joined with
        ``FilePath.merge_symbol``, in which case every one of them must exist.
    force:
        Passed to ``DataManager.load``; if true, the file is reloaded even if
        it was loaded previously
    silent:
        if true, ``self.file_loaded()`` is not invoked, so the plots currently
        shown in the interface are NOT updated
    """
    if file_path is None:
        self.report_message("No file selected", pop_up=True)
        return

    # Splitting also works for a single path: it yields a one-element list.
    # Only the first missing file is reported.
    first_missing = next(
        (path for path in file_path.split(FilePath.merge_symbol) if not os.path.isfile(path)),
        None,
    )
    if first_missing is not None:
        self.report_message(
            "File does not exist",
            detailed_message="The following file does not exist:\n %s" % first_missing,
            pop_up=True,
            is_error=True,
        )
        return

    started = time.time()
    reporter = None
    self.main_window.auto_change_active = True
    try:
        self.report_message(f"Loading file(s) {file_path}")
        reporter = ProgressReporter(progress_bar=self.progress_bar, status_bar=self.status_bar_handler)
        self._data_manager.load(file_path, self.get_configuration_from_ui(), force=force, progress=reporter)
        self.report_message(f"Loaded file(s) {self._data_manager.current_file_name}")
    except (RuntimeError, InsufficientEventCountError) as load_error:
        self.report_message(
            f"Error loading file(s) {self._data_manager.current_file_name} due to:\n{load_error}",
            detailed_message=str(traceback.format_exc()),
            pop_up=True,
            is_error=True,
        )
        self.main_window.auto_change_active = False
        # The reporter may not exist yet if the failure happened before it was created
        if reporter is not None:
            reporter.reset()
        return

    if not silent:
        self.file_loaded()
    self.main_window.auto_change_active = False
    logging.info("DONE: %s sec", time.time() - started)
[docs]
def file_loaded(self):
    """Refresh cross-section selectors, plots and the cache counter after a load.

    One selector widget (``ui.selectedCrossSection<i>``) is shown and labelled per
    loaded cross-section; the remaining selectors, up to index 11, are hidden.
    """
    window = self.main_window
    window.auto_change_active = True
    self._set_data_manager_active_cross_section()

    data_sets = self._data_manager.data_sets
    names = list(data_sets.keys())
    for position, name in enumerate(names):
        selector = getattr(self.ui, "selectedCrossSection%i" % position)
        selector.show()
        label = name.replace("_", "-")
        described_as = data_sets[name].cross_section_label
        # Append the cross-section label only when it adds information
        if label != described_as:
            label = f"{label}: {described_as}"
        selector.setText(label)
    for position in range(len(names), 12):
        getattr(self.ui, "selectedCrossSection%i" % position).hide()
    window.auto_change_active = False

    # Emit signals to update the UI and plots
    window.file_loaded_signal.emit()
    window.initiate_reflectivity_or_intensity_plot.emit()
    window.initiate_projection_plot.emit(False)
    self.cache_indicator.setText("Files loaded: %s" % (self._data_manager.get_cachesize()))
[docs]
def active_cross_section_changed(self):
    """React to a change of the selected cross-section.

    The data manager is synchronised with the UI selection first; then the
    overview metadata and the plots are refreshed.
    """
    self._set_data_manager_active_cross_section()
    self.update_cross_section_info()

    window = self.main_window
    window.plotActiveTab()
    window.initiate_reflectivity_or_intensity_plot.emit()
    window.initiate_projection_plot.emit(False)
def _set_data_manager_active_cross_section(self):
    """Push the cross-section selected in the UI to the data manager.

    The index of the first checked ``ui.selectedCrossSection<i>`` widget (0 if none
    is checked) is passed on. If the data manager reports failure, the first
    selector is checked instead.
    """
    self.main_window.auto_change_active = True
    selected_index = next(
        (i for i in range(12) if getattr(self.ui, "selectedCrossSection%i" % i).isChecked()),
        0,
    )
    if not self._data_manager.set_active_cross_section(selected_index):
        self.ui.selectedCrossSection0.setChecked(True)
    self.main_window.auto_change_active = False
def _congruency_fail_report(self, file_paths: List[str], log_names: Optional[List[str]] = None):
    """Check whether these files can be merged.

    The mean value of each selected log is collected from every file, and the spread
    (max - min) across files is compared with the tolerance configured for that log.

    Parameters
    ----------
    file_paths:
        List of Nexus files (full paths)
    log_names:
        List of log names to compare. If None, all logs from Settings are used.
        If a log name is not in Settings, an error message is returned.

    Returns
    -------
    str:
        Error message; empty string if no error is found,
    """
    assert len(file_paths) > 1, "We require more than one data file in order to compare their metadata"

    # Log names validation and collect tolerances
    open_sum_settings = Settings()["OpenSum"]
    all_tolerances: List[float] = open_sum_settings["Tolerances"]
    all_log_names: List[str] = open_sum_settings["LogNames"]
    assert all_log_names  # failsafe if structure of settings.json changes
    if log_names is None:
        log_names = all_log_names
    tolerances = {}  # store the tolerance value for each log name
    for log_name in log_names:
        try:
            position = all_log_names.index(log_name)
        except ValueError:
            return f"{log_name} is not a valid Log for comparison"
        tolerances[log_name] = all_tolerances[position]

    # simple data structure to collect the log values from all files
    log_values = {name: [] for name in log_names}
    for file_path in file_paths:
        # NOTE: keep the local name `workspace`; mantid's simpleapi presumably derives
        # the output workspace name from the assignment target - verify before renaming.
        workspace = None
        for entry in ["", "-Off_Off", "-On_Off", "-Off_On", "-On_On"]:  # for new and old nexus files
            try:
                workspace = LoadEventNexus(Filename=file_path, NXentryName="entry" + entry, MetaDataOnly=True)
                break
            except RuntimeError:
                continue
        if workspace is None:
            return f"Could not load {file_path}"
        try:
            metadata = workspace.getRun()
            for log_name in log_names:
                try:
                    log_property = metadata.getProperty(log_name)
                except RuntimeError as e:
                    # Python 3 exceptions have no `.message` attribute
                    return str(e)
                log_values[log_name].append(log_property.getStatistics().mean)
        finally:
            # Release the workspace on every exit path, including the early return above
            DeleteWorkspace(workspace)

    # Find the minimum and maximum values for each log, and compare to the tolerance
    message = ""
    for log_name, values in log_values.items():
        if max(values) - min(values) > tolerances[log_name]:
            runs = FilePath(file_paths).run_numbers(string_representation="statement")
            message_template = "Runs {0} contain values for log {1} that differ above tolerance {2}"
            message = message + message_template.format(runs, log_name, tolerances[log_name]) + "\n"
    return message  # empty string if no failures
[docs]
def update_tables(self):
    """Refresh the table rows that display the active data set.

    The active data set may be listed in the reduction table, in the direct beam
    table, in both, or in neither; only the rows where it is found are updated.
    """
    manager = self._data_manager

    # Row in the reduction table, if the active data set is listed there
    reduction_row = manager.find_active_data_id()
    if reduction_row is not None:
        self.update_reduction_table(self.reduction_table, reduction_row, manager.active_cross_section)

    # Row in the direct beam table, if the active data set is listed there
    beam_row = manager.find_active_direct_beam_id()
    if beam_row is not None:
        self.update_direct_beam_table(beam_row, manager.active_cross_section)
        self.update_reduction_table_from_direct_beam(manager.direct_beam_list[beam_row])
[docs]
def update_calculated_data(self):
    """Refresh the calculated entries of the overview tab.

    Meant to be called after the peak ranges change, or after any change that
    affects the displayed results. Does nothing when there is no active cross-section.
    """
    xs = self._data_manager.active_cross_section
    if xs is None:
        return
    ui = self.ui

    ui.datasetAi.setText("%.3f°" % (xs.scattering_angle))

    try:
        wl_min, wl_max = xs.wavelength_range
    except ZeroDivisionError:
        wl_min = wl_max = float("NaN")
    ui.datasetLambda.setText("%.2f (%.2f-%.2f) Å" % (xs.lambda_center, wl_min, wl_max))

    # DIRPIX and DANGLE0 overwrite: show the overwrite value, then the stored one in parentheses
    if ui.set_dangle0_checkbox.isChecked():
        dangle0_text = "%.3f° (%.3f°)" % (float(ui.dangle0Overwrite.text()), xs._angle_offset)
    else:
        dangle0_text = "%.3f°" % (xs.angle_offset)
    ui.datasetDangle0.setText(dangle0_text)

    if ui.set_dirpix_checkbox.isChecked():
        pixel_text = "%.1f (%.1f)" % (float(ui.directPixelOverwrite.value()), xs._direct_pixel)
    else:
        pixel_text = "%.1f" % xs.direct_pixel
    ui.datasetDirectPixel.setText(pixel_text)

    matched_beam = xs.configuration.direct_beam
    ui.matched_direct_beam_label.setText("None" if matched_beam is None else "%s" % matched_beam)
[docs]
def update_info(self):
    """Update metadata shown in the overview tab.

    The UI is first populated from the configuration of the active cross-section;
    then the metadata labels, the calculated entries and the tables are refreshed.
    """
    self.main_window.auto_change_active = True
    d = self._data_manager.active_cross_section
    self.populate_from_configuration(d.configuration)
    self.main_window.initiate_projection_plot.emit(False)
    QtWidgets.QApplication.instance().processEvents()

    if self.ui.set_dangle0_checkbox.isChecked():
        dangle0 = "%.3f° (%.3f°)" % (float(self.ui.dangle0Overwrite.text()), d.angle_offset)
    else:
        dangle0 = "%.3f°" % (d.angle_offset)
    if self.ui.set_dirpix_checkbox.isChecked():
        dpix = "%.1f (%.1f)" % (float(self.ui.directPixelOverwrite.value()), d.direct_pixel)
    else:
        dpix = "%.1f" % d.direct_pixel

    # Same guard as in update_calculated_data: without it, a ZeroDivisionError
    # here would abort this method before that guarded call is ever reached.
    try:
        wl_min, wl_max = d.wavelength_range
    except ZeroDivisionError:
        wl_min, wl_max = float("NaN"), float("NaN")
    self.ui.datasetLambda.setText("%.2f (%.2f-%.2f) Å" % (d.lambda_center, wl_min, wl_max))
    self.ui.datasetPCharge.setText("%.3e" % d.proton_charge)
    self.ui.datasetTime.setText("%i s" % d.total_time)
    self.ui.datasetTotCounts.setText("%.4e" % d.total_counts)
    try:
        self.ui.datasetRate.setText("%.1f cps" % (d.total_counts / d.total_time))
    except ZeroDivisionError:
        self.ui.datasetRate.setText("NaN")
    self.ui.datasetDangle.setText("%.3f°" % d.dangle)
    self.ui.datasetDangle0.setText(dangle0)
    self.ui.datasetSangle.setText("%.3f°" % d.sangle)
    self.ui.datasetDirectPixel.setText(dpix)
    self.ui.currentCrossSection.setText(
        "<b>%s</b> (%s) Type: %s Current State: "
        "<b>%s</b>" % (d.number, d.experiment, d.measurement_type, d.name)
    )

    # Update direct beam indicator
    if d.is_direct_beam:
        self.ui.is_direct_beam_label.setText("Direct beam")
    else:
        self.ui.is_direct_beam_label.setText("")

    # Update the calculated data
    self.update_calculated_data()

    self.ui.roi_used_value.setText("%s" % d.use_roi_actual)
    self.ui.roi_peak_value.setText(str(d.configuration.metadata_roi_peak))
    self.ui.roi_bck_value.setText(str(d.configuration.metadata_roi_bck))

    # Update reduction tables
    self.update_tables()
    self.active_data_changed()
    self.main_window.auto_change_active = False
[docs]
def update_cross_section_info(self):
    """Refresh the cross-section specific entries of the overview tab.

    Covers proton charge, total time, total counts, count rate and the summary
    line; the calculated entries are refreshed afterwards.
    """
    xs = self._data_manager.active_cross_section
    ui = self.ui

    ui.datasetPCharge.setText("%.3e" % xs.proton_charge)
    ui.datasetTime.setText("%i s" % xs.total_time)
    ui.datasetTotCounts.setText("%.4e" % xs.total_counts)

    # A zero total time makes the rate undefined
    try:
        rate_text = "%.1f cps" % (xs.total_counts / xs.total_time)
    except ZeroDivisionError:
        rate_text = "NaN"
    ui.datasetRate.setText(rate_text)

    summary = (
        "<b>%s</b> (%s) Type: %s Current State: "
        "<b>%s</b>" % (xs.number, xs.experiment, xs.measurement_type, xs.name)
    )
    ui.currentCrossSection.setText(summary)

    # Update the calculated data
    self.update_calculated_data()
[docs]
def update_file_list(self, query_path: Optional[str] = None) -> None:
    """Update the list of data files.

    Parameters
    ----------
    query_path:
        Full path of a directory, a Nexus file, or a list of Nexus files.
        If a list of files, their paths are joined by the plus symbol '+'.
        If None, the listing of the current directory is refreshed.
    """
    data_manager = self._data_manager
    file_list_widget = self.ui.file_list  # a QtWidgets.QListWidget object

    def _listed_composites():
        """Composite (merged) entries currently shown in widget self.ui.file_list."""
        names = (file_list_widget.item(row).text() for row in range(file_list_widget.count()))
        return [name for name in names if FilePath.merge_symbol in name]

    def _updated_current_list():
        r"""Most updated list of single and composite files from the current directory."""
        return sorted(data_manager.current_event_files + _listed_composites())

    def _with_composite(names, composite):
        r"""Sorted copy of `names` that contains `composite`, without duplicating an existing entry."""
        if composite in names:
            return sorted(names)
        return sorted(names + [composite])

    def _reset_ui_file_list(fresh_list):
        r"""Reset widget self.ui.file_list and highlight the current file_name."""
        file_list_widget.clear()
        assert isinstance(fresh_list, list), f"fresh_list must be list but not {type(fresh_list)}"
        for item in fresh_list:
            listitem = QtWidgets.QListWidgetItem(item, file_list_widget)
            if item == data_manager.current_file_name:
                # Changing the current selection will trigger self.main_window.file_open_from_list() to be called,
                # however, the current setting self.main_window.auto_change_active == True will cause
                # self.main_window.file_open_from_list() to return before any statement is executed
                file_list_widget.setCurrentItem(listitem)

    def _update_current_directory(new_dir):
        r"""Update the directory path in the main window and the path watcher."""
        self.main_window.settings.setValue("current_directory", new_dir)
        self._path_watcher.removePath(data_manager.current_directory)
        data_manager.current_directory = new_dir
        self._path_watcher.addPath(data_manager.current_directory)

    # This setting prevents automatic read-in of the currently selected item in the list
    # when the currently selected items changes due to the list update.
    self.main_window.auto_change_active = True
    try:
        file_path = None if query_path is None else FilePath(query_path)
        new_list = None

        # Use case 1: the contents of the current directory may have changed with the addition of new
        # event files. This could happen if the experiment is running, producing new event files.
        if file_path is None:
            new_list = _updated_current_list()
        # Use case 2: a composite from using Open Sum
        elif file_path.is_composite:
            file_dir, _ = file_path.split()
            data_manager.current_file_name = file_path.basename
            if file_dir == data_manager.current_directory:
                # Use case 2.1: the composite is made up of files in the current directory.
                # The composite may already be listed (e.g. the same sum is opened again).
                new_list = _with_composite(_updated_current_list(), file_path.basename)
            else:
                # Use case 2.2: the composite is made up of files in a new directory
                _update_current_directory(file_dir)
                new_list = _with_composite(data_manager.current_event_files, file_path.basename)
        # Use case 3.1: a single path pointing to a directory
        elif os.path.isdir(query_path):
            if query_path != data_manager.current_directory:  # User changed directory
                _update_current_directory(query_path)
                event_files = data_manager.current_event_files
                # An empty directory has no first file to select
                if event_files:
                    data_manager.current_file_name = event_files[0]
                new_list = event_files
            # else: TODO FIXME discovered from #93. This path is not checked and thus problematic;
            # new_list stays None and the widget is left untouched.
        # Use case 3.2: a single path pointing to a file in the current or new directory
        else:
            file_dir, file_name = file_path.split()
            data_manager.current_file_name = file_name
            if file_dir == data_manager.current_directory:  # User selected a new file in the directory
                new_list = _updated_current_list()
            else:  # User selected a new file in a new directory
                _update_current_directory(file_dir)
                new_list = data_manager.current_event_files

        # TODO FIXME - new_list is not None has not been defined by stakeholder
        if new_list is not None:
            _reset_ui_file_list(new_list)
    finally:
        # Always release the flag, otherwise the UI would ignore later list selections
        self.main_window.auto_change_active = False
[docs]
def automated_file_selection(self):
    """Automatically select files in the current directory based on incident angle.

    The current data set is added to its table first. Then the event files of the
    current directory are visited in sorted order; at most 10 files following the
    current one are examined, and each is loaded and added as long as its mid-Q
    does not decrease and it is of the same type (direct beam or not) as the
    current data.

    ``main_window.auto_change_active`` is always reset to False on exit, including
    when the current data set cannot be added.
    """
    self.main_window.auto_change_active = True
    try:
        data_manager = self._data_manager

        # Update the list of files
        event_file_list = glob.glob(os.path.join(data_manager.current_directory, "*event.nxs"))
        event_file_list.extend(glob.glob(os.path.join(data_manager.current_directory, "*.nxs.h5")))
        event_file_list.sort()
        event_file_list = [os.path.basename(name) for name in event_file_list]

        current_file_found = False
        n_count = 0
        # Informational only, hence not logged as an error
        logging.info("Current file: %s", data_manager.current_file_name)
        q_current = data_manager.extract_metadata().mid_q

        # Add the current data set to the reduction table
        # Do nothing if the data is incompatible
        is_direct_beam = data_manager.active_cross_section.is_direct_beam
        added = self.add_direct_beam() if is_direct_beam else self.add_reflectivity()
        if not added:
            return

        for f in event_file_list:
            file_path = str(os.path.join(data_manager.current_directory, f))
            if current_file_found and n_count < 10:
                n_count += 1
                metadata = data_manager.extract_metadata(file_path)
                if q_current <= metadata.mid_q and is_direct_beam == metadata.is_direct_beam:
                    q_current = metadata.mid_q
                    self.open_file(file_path, silent=True)
                    d = data_manager.active_cross_section
                    # If we find data of another type, stop here
                    if is_direct_beam != d.is_direct_beam:
                        break
                    # open_file() released the flag; take it again for the UI update
                    self.main_window.auto_change_active = True
                    self.populate_from_configuration(d.configuration)
                    if data_manager.active_cross_section.is_direct_beam:
                        self.add_direct_beam()
                    else:
                        self.add_reflectivity()
            if f == data_manager.current_file_name:
                current_file_found = True

        # At the very end, update the UI and plot reflectivity
        if n_count > 0:
            self.main_window.auto_change_active = True
            self.file_loaded()
    finally:
        self.main_window.auto_change_active = False
[docs]
def open_reduced_file_dialog(self):
    """Let the user pick a reduced file, then load it with the data files it refers to.

    The current reduction and direct beam lists are cleared first. After loading,
    the direct beam table and one data table per peak are filled, and the first
    run of the main reduction list becomes the active (plotted) data.
    """
    start_dir = self.main_window.settings.value("output_directory", os.path.expanduser("~"))
    file_path, _ = QtWidgets.QFileDialog.getOpenFileName(
        self.main_window,
        "Open reduced file...",
        directory=start_dir,
        filter="QuickNXS files (*.dat);;All (*.*)",
    )
    if not file_path:
        return

    started = time.time()
    manager = self._data_manager
    window = self.main_window

    # Clear the reduction lists first so that we don't create problems later
    window.reset_data_tabs()
    self.clear_direct_beams()
    self.clear_reflectivity()

    configuration = self.get_configuration_from_ui()
    reporter = self.new_progress_reporter()
    manager.load_data_from_reduced_file(file_path, configuration=configuration, progress=reporter)

    # Remember the directory of the reduced file as the output directory
    window.settings.setValue("output_directory", os.path.split(str(file_path))[0])

    window.auto_change_active = True

    # Update UI direct beam table
    self.ui.directBeamTable.setRowCount(len(manager.direct_beam_list))
    for row, _ in enumerate(manager.direct_beam_list):
        manager.set_active_data_from_direct_beam_list(row)
        self.update_direct_beam_table(row, manager.active_cross_section)

    # Update UI data table(s) with the loaded data, one tab per peak
    for ipeak, _ in manager.peak_reduction_lists.items():
        manager.set_active_reduction_list_index(ipeak)
        window.add_data_tab_by_index(ipeak)
        table_widget = self.get_reduction_table_by_index(ipeak)
        table_widget.setRowCount(len(manager.reduction_list))
        for row, _ in enumerate(manager.reduction_list):
            manager.set_active_data_from_reduction_list(row)
            self.update_reduction_table(table_widget, row, manager.active_cross_section)

    # Set the first reduction table and its first run as the active (plotted) data
    manager.set_active_reduction_list_index(manager.MAIN_REDUCTION_LIST_INDEX)
    manager.set_active_data_from_reduction_list(0)

    run_labels = [str(run.number) for run in manager.direct_beam_list]
    self.ui.direct_beam_list_label.setText(", ".join(run_labels))

    self.file_loaded()
    if manager.active_cross_section is not None:
        self.populate_from_configuration(manager.active_cross_section.configuration)
        self.update_file_list(manager.current_file)
    window.auto_change_active = False
    logging.info("UI updated: %s", time.time() - started)
[docs]
def initialize_additional_reduction_table(self, tab_index: int):
    """Fill the table of an additional data tab from the main reduction list.

    A radio-button group is created for the tab if it has none yet. Each row shows
    the cross-section with the same name as the currently active cross-section.

    Parameters
    ----------
    tab_index: int
        Index of the additional tab/peak
    """
    groups = self.reduction_table_button_groups
    if tab_index not in groups:
        groups[tab_index] = QtWidgets.QButtonGroup(self.main_window)

    main_list = self._data_manager.main_reduction_list
    if not main_list:
        return  # nothing to copy over

    table_widget = self.get_reduction_table_by_index(tab_index)
    table_widget.setRowCount(len(main_list))
    xs_name: str = self._data_manager.active_cross_section.name
    for row, nexus_data in enumerate(main_list):
        self.update_reduction_table(table_widget, row, nexus_data.cross_sections[xs_name])
def _file_open_dialog(self, filter_: Optional[str] = None) -> Optional[str]:
    """Show a file dialog, starting in the current directory, to select one file.

    Parameters
    ----------
    filter_:
        File-name filter passed on to the dialog

    Returns
    -------
    str:
        Path of the selected file, as returned by the dialog
    """
    selection = QtWidgets.QFileDialog.getOpenFileName(
        self.main_window,
        "Open NXS file...",
        directory=self._data_manager.current_directory,
        filter=filter_,
    )
    # getOpenFileName returns the pair (path, selected filter)
    chosen_path, _ = selection
    return chosen_path
def _file_open_sum_dialog(self, filter_: Optional[str] = None) -> Optional[str]:
    """Show a file dialog to select two or more files to be summed.

    The selected files are checked for congruency of their metadata. If the check
    reports a problem, the user is asked whether to proceed anyway.

    Parameters
    ----------
    filter_:
        File-name filter passed on to the dialog

    Returns
    -------
    str:
        The ``path`` of the FilePath built from the selection, or None if the user
        cancelled, selected a single file, or declined to proceed
    """
    selected, _ = QtWidgets.QFileDialog.getOpenFileNames(
        self.main_window,
        "Select multiple NXS files to sum before data reduction.",
        directory=self._data_manager.current_directory,
        filter=filter_,
    )
    if not selected:  # user cancelled the operation
        return None
    if len(selected) == 1:
        self.report_message("Merge mode must have more than 1 file selected")
        return None

    # A non-empty report means the files cannot be merged without the user's consent
    report = self._congruency_fail_report(selected, log_names=None)
    if report and not self._user_gives_permission(report):
        return None
    return FilePath(selected).path
def _process_file_path(self, dialog_opening_method: str) -> None:
    """Run one of the file-opening dialogs and process the selection.

    The file-name filter offered by the dialog depends on whether the histogram
    mode is checked. If the dialog returns a selection, the file list widget is
    updated and the file(s) are read in.

    Parameters
    ----------
    dialog_opening_method:
        Name of the method of this object that opens the dialog. It is called with
        keyword argument ``filter_``.
    """
    histogram_mode = self.ui.histogramActive.isChecked()
    filter_ = (
        "All (*.*);;histo.nxs (*histo.nxs)"
        if histogram_mode
        else "All (*.*);;nxs.h5 (*nxs.h5);;event.nxs (*event.nxs)"
    )
    open_dialog = getattr(self, dialog_opening_method)
    selected_path = open_dialog(filter_=filter_)
    if not selected_path:
        return
    self.update_file_list(selected_path)
    self.open_file(selected_path)
[docs]
def file_open_dialog(self):
    """GUI callback: open a single file through ``MainHandler._file_open_dialog``."""
    self._process_file_path(dialog_opening_method="_file_open_dialog")
[docs]
def file_open_sum_dialog(self):
    """GUI callback: open several files to be summed through ``MainHandler._file_open_sum_dialog``."""
    self._process_file_path(dialog_opening_method="_file_open_sum_dialog")
def _user_gives_permission(self, message: str) -> bool:
    """Ask the user whether to proceed with Open Sum although the selected runs differ in their sample logs.

    Parameters
    ----------
    message:
        Report describing the differences; shown in the confirmation dialog

    Returns
    -------
    bool:
        True if the dialog result is truthy (presumably "accepted" - confirm against
        AcceptRejectDialog), False otherwise
    """
    # The congruency report ends every line with a newline; strip trailing whitespace
    # so the closing period is not left dangling on a line of its own.
    prompt = message.rstrip() + ".\nProceed with Open Sum?"
    dialog = AcceptRejectDialog(self.main_window, title="Open Sum Confirmation", message=prompt)
    # Coerce the dialog result so that the annotated return type is honored
    return bool(dialog.exec_())
[docs]
def open_run_number(self, number: Union[List[int], List[str], int, str, None] = None):
    """Open a data file by typing a run number or a composite run number for merging data sets.

    Parameters
    ----------
    number:
        Run number(s) to open. If None, the text of ``ui.numberSearchEntry`` is used.

    Returns
    -------
    bool:
        True if the file(s) were located and passed on to ``open_file``; False if
        nothing was entered, a run could not be located, or the user declined to
        merge incongruent runs.

    Example
    -------
    "120:123+125+127:132" opens files with run numbers from 120 to 132 except 124 and 126
    """
    self.main_window.auto_change_active = True
    try:
        if number is None:
            number = str(self.ui.numberSearchEntry.text())  # cast from unicode to string
        if number == "":
            self.report_message("No run number entered", pop_up=True)
            return False
        QtWidgets.QApplication.instance().processEvents()

        run_numbers = RunNumbers(number)
        configuration = self.get_configuration_from_ui()
        file_list = []
        for run_number in run_numbers.numbers:
            # Look for new-style nexus file name
            search_string = configuration.instrument.file_search_template % run_number
            matches = glob.glob(search_string + ".nxs.h5")  # type: Optional[List[str]]
            if not matches:  # Look for old-style nexus file name
                search_string = configuration.instrument.legacy_search_template % run_number
                matches = glob.glob(search_string + "_event.nxs")
            if not matches:
                self.report_message("Could not locate run number %s" % run_number, pop_up=True)
                return False
            file_list.append(matches[0])  # there should be only one match, since we query with one run number
        self.ui.numberSearchEntry.setText("")  # empty the contents of in the LineEdit widget

        if not file_list:
            self.report_message("Could not locate one or more of file(s) %s" % run_numbers.short, pop_up=True)
            return False

        file_path = FilePath(file_list).path  # single path or a composite of file paths
        # If opening more than one file, check whether files can be merged
        if len(file_list) > 1:
            message = self._congruency_fail_report(file_list, log_names=None)
            if message and not self._user_gives_permission(message):
                return False
        self.update_file_list(file_path)
        self.open_file(file_path)
        return True
    finally:
        # Release the flag on every exit path; the early returns used to leave it set
        self.main_window.auto_change_active = False
[docs]
def update_daslog(self):
    """Fill the table of the daslog tab with the log values of every cross-section.

    One row per log of the active cross-section (sorted by name, ignoring case):
    the log name, one value column per cross-section, and the unit in the last column.
    """
    data_sets = self._data_manager.data_sets
    unit_column = len(data_sets) + 1

    table = self.ui.daslogTableBox
    table.setRowCount(0)
    table.sortItems(-1)
    table.setColumnCount(len(data_sets) + 2)
    table.setHorizontalHeaderLabels(["Name"] + list(data_sets.keys()) + ["Unit"])

    log_names = sorted(self._data_manager.active_cross_section.logs.keys(), key=lambda s: s.lower())
    for row, key in enumerate(log_names):
        table.insertRow(row)
        table.setItem(row, 0, QtWidgets.QTableWidgetItem(key))
        table.setItem(
            row,
            unit_column,
            QtWidgets.QTableWidgetItem(self._data_manager.active_cross_section.log_units[key]),
        )
        # Value columns start right after the name column
        for column, xs in enumerate(data_sets, start=1):
            cell = QtWidgets.QTableWidgetItem("%g" % data_sets[xs].logs[key])
            cell.setToolTip("MIN: %g MAX: %g" % (data_sets[xs].log_minmax[key]))
            table.setItem(row, column, cell)
    table.resizeColumnsToContents()
[docs]
def add_reflectivity(self, silent=False):
    """Add the active data set, with the current extraction settings, to the reduction list.

    Parameters
    ----------
    silent:
        if true, no message pops up when the data set is rejected

    Returns
    -------
    bool:
        True if everything is ok, false otherwise.

    Raises
    ------
    RuntimeError
        If the data set was accepted but cannot be found in the reduction list
    """
    manager = self._data_manager

    # When a data set is first loaded, the peaks may have a different range for each
    # cross-section. With the "common ranges" option on, the ranges currently in the
    # UI are applied to all cross-sections.
    if self.ui.action_use_common_ranges.isChecked():
        manager.update_configuration(configuration=self.get_configuration_from_ui(), active_only=False)

    # The data manager rejects data that is inconsistent with the existing entries
    accepted = manager.add_active_to_reduction()
    if not accepted:
        if not silent:
            self.report_message("(Add reflectivity) Data incompatible or already in the list.", pop_up=True)
        return False

    window = self.main_window
    window.auto_change_active = True
    row = manager.find_data_in_reduction_list(manager._nexus_data)
    if row is None:
        raise RuntimeError("It could be None but not likely")
    self.ui.reductionTable.insertRow(row)
    self.update_tables()
    window.initiate_reflectivity_or_intensity_plot.emit()
    window.update_specular_viewer.emit()
    window.auto_change_active = False
    return True
[docs]
def update_reduction_table(self, table_widget: QtWidgets.QTableWidget, idx: int, data: CrossSectionData):
    """Fill one row of a reduction table from a cross-section.

    Parameters
    ----------
    table_widget:
        Table widget of the table to update
    idx:
        Row to update
    data:
        Cross-section data
    """

    def _place(column, text, background=None, read_only=False):
        """Create a text cell and put it in row ``idx`` at ``column``."""
        cell = QtWidgets.QTableWidgetItem(text)
        if background is not None:
            cell.setBackground(background)
        if read_only:
            # clear the editable flag (bitwise AND with its negation)
            cell.setFlags(cell.flags() & ~QtCore.Qt.ItemFlag.ItemIsEditable)
        table_widget.setItem(idx, column, cell)

    # Mark the changes below as programmatic so the change handlers ignore them
    self.main_window.auto_change_active = True

    # One button group per tab keeps the "active" radio buttons mutually exclusive.
    # NOTE(review): the group is looked up by the *currently shown* tab, not by
    # ``table_widget`` - confirm this is intended when updating another tab's table.
    tab_index = self.ui.tabWidget.currentIndex()
    groups = self.reduction_table_button_groups
    if tab_index not in groups:
        groups[tab_index] = QtWidgets.QButtonGroup(self.main_window)

    # Radio button for active data (the widget centers the button in the cell)
    is_active = data == self._data_manager.active_cross_section
    selector = ActiveDataRadioButton(self, is_active=is_active, idx=idx)
    groups[tab_index].addButton(selector.radio_button)
    table_widget.setCellWidget(idx, ReductionTableColumn.ACTIVE, selector)

    # Run number: highlighted when active, never editable
    _place(
        ReductionTableColumn.RUN_NUMBER,
        str(data.number),
        background=QColors.yellow if is_active else QColors.white,
        read_only=True,
    )

    # Slice column (non-editable); "0" is a defensive fallback for an out-of-range row
    if idx < len(self._data_manager.reduction_list):
        slice_text = str(self._data_manager.reduction_list[idx].slice)
    else:
        slice_text = "0"
    _place(ReductionTableColumn.SLICE, slice_text, read_only=True)

    cfg = data.configuration
    _place(ReductionTableColumn.SCALE_FACTOR, "%.4f" % (cfg.scaling_factor))
    _place(ReductionTableColumn.NUM_LEFT, str(cfg.cut_first_n_points))
    _place(ReductionTableColumn.NUM_RIGHT, str(cfg.cut_last_n_points))

    # Position columns get a dark grey background, width columns keep the default
    _place(ReductionTableColumn.PEAK_POSITION, str(cfg.peak_position), background=QColors.dark_grey)
    _place(ReductionTableColumn.PEAK_WIDTH, str(cfg.peak_width))
    _place(ReductionTableColumn.LOW_RES_POSITION, str(cfg.low_res_position), background=QColors.dark_grey)
    _place(ReductionTableColumn.LOW_RES_WIDTH, str(cfg.low_res_width))
    _place(ReductionTableColumn.BCK_POSITION, str(cfg.bck_position), background=QColors.dark_grey)
    _place(ReductionTableColumn.BCK_WIDTH, str(cfg.bck_width))

    _place(ReductionTableColumn.DPIX, str(data.direct_pixel))
    _place(ReductionTableColumn.THETA, "%.4f" % data.scattering_angle, read_only=True)

    direct_beam = cfg.direct_beam if cfg.direct_beam is not None else "none"
    _place(ReductionTableColumn.DIRECT_BEAM, str(direct_beam))

    # Binning type column: set the index with signals blocked so the handler does not fire
    binning_selector = BinningTypeSelection(on_change_handler=self.reduction_table_binning_type_changed, row=idx)
    binning_selector.blockSignals(True)
    binning_selector.setCurrentIndex(cfg.binning_type_run)
    binning_selector.blockSignals(False)
    table_widget.setCellWidget(idx, ReductionTableColumn.BINNING_TYPE, binning_selector)

    # Q steps column, greyed out when no binning is applied to the run
    q_step_cell = QtWidgets.QTableWidgetItem(f"{cfg.binning_q_step_run:.3f}")
    if cfg.binning_type_run == BinningType.NONE:
        q_step_cell.setForeground(QColors.dark_grey)
        q_step_cell.setBackground(QColors.light_grey)
    table_widget.setItem(idx, ReductionTableColumn.Q_STEPS, q_step_cell)

    self.main_window.auto_change_active = False
[docs]
def clear_reflectivity(self):
    """Empty every reduction list and the tables that display them."""
    # drop the stored reduction items first
    self._data_manager.clear_reduction_lists()

    # then empty the table of each data tab; the direct beam tab is left untouched
    data_tabs = (tab for tab in range(self.ui.tabWidget.count()) if tab != self.DIRECT_BEAM_TAB_INDEX)
    for tab in data_tabs:
        self.get_reduction_table_by_index(tab).setRowCount(0)

    # remove additional data tabs
    self.main_window.reset_data_tabs()
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def remove_reflectivity(self):
    """Delete the currently selected row from the reduction list and its table.

    Nothing happens when the table has no current row.
    """
    selected_row = self.reduction_table.currentRow()
    if selected_row >= 0:
        self._data_manager.reduction_list.pop(selected_row)
        self.reduction_table.removeRow(selected_row)
        self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def reduction_table_changed(self, item: QtWidgets.QTableWidgetItem):
"""Perform action upon change in data reduction list.
Parameters
----------
item : QtWidgets.QTableWidgetItem
The changed table item
"""
if self.main_window.auto_change_active:
return
row = item.row()
column = item.column()
column = ReductionTableColumn(column)
refl = self._data_manager.reduction_list[row]
keys = {
# ReductionTableColumn.ACTIVE: "active",
# ReductionTableColumn.RUN_NUMBER: "number", ## run number, not editable
ReductionTableColumn.SCALE_FACTOR: "scaling_factor",
ReductionTableColumn.NUM_LEFT: "cut_first_n_points",
ReductionTableColumn.NUM_RIGHT: "cut_last_n_points",
ReductionTableColumn.PEAK_POSITION: "peak_position",
ReductionTableColumn.PEAK_WIDTH: "peak_width",
ReductionTableColumn.LOW_RES_POSITION: "low_res_position",
ReductionTableColumn.LOW_RES_WIDTH: "low_res_width",
ReductionTableColumn.BCK_POSITION: "bck_position",
ReductionTableColumn.BCK_WIDTH: "bck_width",
ReductionTableColumn.DPIX: "direct_pixel_overwrite",
# ReductionTableColumn.THETA: "scattering_angle",
ReductionTableColumn.DIRECT_BEAM: "direct_beam",
# ReductionTableColumn.BINNING_TYPE: "binning_type_run", ## handled by combobox
ReductionTableColumn.Q_STEPS: "binning_q_step_run",
}
if column not in keys:
return
# Update settings from selected option using match/case
match column:
case (
ReductionTableColumn.SCALE_FACTOR
| ReductionTableColumn.PEAK_POSITION
| ReductionTableColumn.PEAK_WIDTH
| ReductionTableColumn.LOW_RES_POSITION
| ReductionTableColumn.LOW_RES_WIDTH
| ReductionTableColumn.BCK_POSITION
| ReductionTableColumn.BCK_WIDTH
| ReductionTableColumn.DPIX
):
refl.set_parameter(keys[column], float(item.text()))
case ReductionTableColumn.NUM_LEFT | ReductionTableColumn.NUM_RIGHT:
refl.set_parameter(keys[column], int(item.text()))
case ReductionTableColumn.DIRECT_BEAM:
direct_beam_name = item.text()
direct_beam = self._data_manager.find_direct_beam_by_name(direct_beam_name)
if direct_beam:
refl.set_parameter(keys[column], direct_beam_name)
# use direct beam peak position as dpix overwrite for matched ref runs
dpix = direct_beam.get_parameter("peak_position")
refl.set_parameter("direct_pixel_overwrite", dpix)
dpix_item = self.reduction_table.item(row, ReductionTableColumn.DPIX)
dpix_item.setText(str(dpix))
else:
self.report_message("Not a valid direct beam run number, or the direct beam is not in the list.")
refl.set_parameter(keys[column], None)
self.main_window.auto_change_active = True
item.setText("none")
self.main_window.auto_change_active = False
# TODO: reset dpix overwrite to DAS value? (Glass)
case ReductionTableColumn.Q_STEPS:
try:
new_value = round(float(item.text()), 3)
if -0.1 <= new_value <= 0.1:
refl.set_parameter(keys[column], new_value)
except:
refl.set_parameter(keys[column], None)
recalculate = (
False
if column
in [
ReductionTableColumn.SCALE_FACTOR,
ReductionTableColumn.NUM_LEFT,
ReductionTableColumn.NUM_RIGHT,
]
else True
)
# Recalculate and replot
self.reduction_table_cell_changed(refl, recalculate)
[docs]
def reduction_table_binning_type_changed(self, combobox_index: int, row: int):
    """Store a new binning type for a run after its table combobox changed.

    Parameters
    ----------
    combobox_index : int
        The selected index in the combobox
    row : int
        The row in the reduction table to update.
    """
    # translate the combobox index into the binning type and store it on the run
    selected_type = BinningType(combobox_index)
    run = self._data_manager.reduction_list[row]
    run.set_parameter("binning_type_run", selected_type)
    # a different binning always needs a new reflectivity calculation and a replot
    self.reduction_table_cell_changed(run, recalculate=True)
[docs]
def reduction_table_cell_changed(self, refl: NexusData, recalculate: bool = True):
    """Refresh the UI and the plots after a run in the reduction table changed.

    Updates the calculated values of the run, redraws the table rows that
    show it and, if requested, recalculates its reflectivity.

    Parameters
    ----------
    refl : NexusData
        The data set to update.
    recalculate : bool, optional
        Whether to recalculate the reflectivity, by default True
    """
    # Update calculated data
    refl.update_calculated_values()

    if self._data_manager.is_active(refl):
        # If the changed data set is the active data, also change the UI.
        # The flag is cleared in ``finally`` so a failure cannot leave the
        # change handlers disabled.
        self.main_window.auto_change_active = True
        try:
            self.update_info()
        finally:
            self.main_window.auto_change_active = False
    elif not refl.is_direct_beam():
        # If the changed data set is another data run, only need to update the UI reduction table,
        # to take into account changes in one column affecting another column
        idx = self._data_manager.find_data_in_reduction_list(refl)
        if idx is not None:
            active_cross_section_name: str = self._data_manager.active_cross_section.name
            active_cross_section = refl.cross_sections[active_cross_section_name]
            self.update_reduction_table(self.reduction_table, idx, active_cross_section)

    # Update the direct beam table if this data set is in it
    idx = self._data_manager.find_data_in_direct_beam_list(refl)
    if idx is not None:
        first_cross_section = next(iter(refl.cross_sections.values()))
        self.update_direct_beam_table(idx, first_cross_section)

    # Only recalculate if we need to, otherwise just replot
    if recalculate:
        try:
            self._data_manager.calculate_reflectivity(nexus_data=refl)
        except Exception:
            # Best effort: report the failure and still refresh the plots below
            self.report_message(
                "Could not compute reflectivity for %s" % self._data_manager.current_file_name,
                detailed_message=str(traceback.format_exc()),
                pop_up=False,
                is_error=False,
            )
    self.main_window.plotActiveTab()
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
    self.main_window.update_specular_viewer.emit()
[docs]
def add_direct_beam(self, silent=False):
    """Append the active data set to the direct beam list and show it in the table.

    Parameters
    ----------
    silent : bool
        If True, no message is reported to the user.

    Returns
    -------
    bool:
        True if the run was added, False if it was already in the list.
    """
    data_manager = self._data_manager

    # Update all cross-section parameters as needed.
    if self.ui.action_use_common_ranges.isChecked():
        data_manager.update_configuration(configuration=self.get_configuration_from_ui(), active_only=False)

    # Result codes: 0 = not added (already in the list), 1 = added but not
    # labeled as a direct beam, 2 = added and labeled as a direct beam
    outcome = data_manager.add_active_to_direct_beam_list()
    if outcome == 0:
        if not silent:
            self.report_message("(Add direct beam) Data already in the list.", pop_up=True)
        return False
    if outcome == 1 and not silent:
        self.report_message(
            f"Run {data_manager._nexus_data.number} added to direct beam list.\n\n"
            "Note: This run is not labeled as a direct beam in the metadata "
            "(data_type PV ≠ 1). This may occur for runs started with 'Start RUN' "
            "command in EPICS.",
            pop_up=False,
        )

    # The new direct beam is the last entry of the list: give it a table row
    n_beams = len(data_manager.direct_beam_list)
    self.ui.directBeamTable.setRowCount(n_beams)
    last_row = n_beams - 1
    self.update_direct_beam_table(last_row, data_manager.direct_beam_list[last_row].get_main_cross_section_data())

    # Refresh the summary label listing all direct beam run numbers
    run_numbers = ", ".join(str(run.number) for run in data_manager.direct_beam_list)
    self.ui.direct_beam_list_label.setText(run_numbers)
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
    return True
[docs]
def remove_direct_beam(self):
    """Remove the currently selected item from the direct beam list.

    The run is removed from the data manager's list and from the table, and
    the label listing the direct beam run numbers is brought up to date.
    Nothing happens when the table has no current row.
    """
    index = self.ui.directBeamTable.currentRow()
    if index < 0:
        return
    self._data_manager.direct_beam_list.pop(index)
    self.ui.directBeamTable.removeRow(index)
    # Keep the summary label in sync with the list, using the same text that
    # add_direct_beam (joined run numbers) and clear_direct_beams ("None") show
    remaining = [str(run.number) for run in self._data_manager.direct_beam_list]
    self.ui.direct_beam_list_label.setText(", ".join(remaining) if remaining else "None")
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def clear_direct_beams(self):
    """Empty the direct beam list, its table and the label summarizing it."""
    ui = self.ui
    self._data_manager.clear_direct_beam_list()
    # no rows left in the table, and the summary label reads "None"
    ui.directBeamTable.setRowCount(0)
    ui.direct_beam_list_label.setText("None")
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def update_reduction_table_from_direct_beam(self, direct_beam: NexusData):
    """Propagate a direct beam's peak position to the runs normalized with it.

    Parameters
    ----------
    direct_beam:
        Direct beam data
    """
    # runs reference their direct beam by run number; compare as strings
    beam_id = str(direct_beam.number)
    dependent_runs = [
        run for run in self._data_manager.reduction_list if str(run.get_parameter("direct_beam")) == beam_id
    ]
    new_dpix = direct_beam.get_parameter("peak_position")
    for run in dependent_runs:
        # update the overwrite value for all runs with this direct beam
        run.set_parameter("direct_pixel_overwrite", new_dpix)
        # only update the UI table for runs with set_direct_pixel ("overwrite") enabled,
        # if it is disabled, the value from the DAS will be used and shown instead
        if not run.get_parameter("set_direct_pixel"):
            continue
        row = self._data_manager.find_data_in_reduction_list(run)
        dpix_cell = self.reduction_table.item(row, ReductionTableColumn.DPIX)
        # flag the edit as programmatic so the table change handler ignores it
        self.main_window.auto_change_active = True
        dpix_cell.setText(str(new_dpix))
        self.main_window.auto_change_active = False
[docs]
def update_direct_beam_table(self, idx: int, data: CrossSectionData) -> None:
    """Fill one row of the direct beam table from a cross-section.

    Parameters
    ----------
    idx:
        Row to update
    data:
        Cross-section data
    """

    def _place(column, text, background=None, read_only=False):
        """Create a text cell and put it in row ``idx`` at ``column``."""
        cell = QtWidgets.QTableWidgetItem(text)
        if read_only:
            cell.setFlags(cell.flags() & ~QtCore.Qt.ItemFlag.ItemIsEditable)
        if background is not None:
            cell.setBackground(background)
        self.ui.directBeamTable.setItem(idx, column, cell)

    # Flag the changes as programmatic to prevent recursion through the change handler
    self.main_window.auto_change_active = True

    # radio button for active data (the widget centers the button in the cell)
    is_active = data == self._data_manager.active_cross_section
    selector = ActiveDataRadioButton(self, is_active=is_active, idx=idx, is_direct_beam=True)
    self.direct_beam_button_group.addButton(selector.radio_button)
    self.ui.directBeamTable.setCellWidget(idx, DirectBeamTableColumn.ACTIVE, selector)

    # run number: highlighted when active, never editable
    _place(
        DirectBeamTableColumn.RUN_NUMBER,
        str(data.number),
        background=QColors.yellow if is_active else QColors.white,
        read_only=True,
    )

    # position columns get a dark grey background, width columns keep the default
    cfg = data.configuration
    _place(DirectBeamTableColumn.PEAK_POSITION, str(cfg.peak_position), background=QColors.dark_grey)
    _place(DirectBeamTableColumn.PEAK_WIDTH, str(cfg.peak_width))
    _place(DirectBeamTableColumn.LOW_RES_POSITION, str(cfg.low_res_position), background=QColors.dark_grey)
    _place(DirectBeamTableColumn.LOW_RES_WIDTH, str(cfg.low_res_width))
    _place(DirectBeamTableColumn.BCK_POSITION, str(cfg.bck_position), background=QColors.dark_grey)
    _place(DirectBeamTableColumn.BCK_WIDTH, str(cfg.bck_width))

    # wavelength range shown as "first - last", read-only
    wavelength_range = "%s - %s" % (data.wavelength[0], data.wavelength[-1])
    _place(DirectBeamTableColumn.WAVELENGTH, wavelength_range, read_only=True)

    # Done: user edits are handled again
    self.main_window.auto_change_active = False
[docs]
def direct_beam_table_changed(self, item: QtWidgets.QTableWidgetItem):
    """Apply a user edit of a direct beam table cell to the corresponding run.

    Text that is not a valid number is reported to the user and the cell is
    reset to the value stored for the edited run.

    Parameters
    ----------
    item : QtWidgets.QTableWidgetItem
        The changed table item
    """
    # Programmatic table updates raise this flag; only user edits are handled
    if self.main_window.auto_change_active:
        return
    # Editable columns and the run parameter each of them sets.
    # ACTIVE, RUN_NUMBER and WAVELENGTH are not editable.
    keys = {
        DirectBeamTableColumn.PEAK_POSITION: "peak_position",
        DirectBeamTableColumn.PEAK_WIDTH: "peak_width",
        DirectBeamTableColumn.LOW_RES_POSITION: "low_res_position",
        DirectBeamTableColumn.LOW_RES_WIDTH: "low_res_width",
        DirectBeamTableColumn.BCK_POSITION: "bck_position",
        DirectBeamTableColumn.BCK_WIDTH: "bck_width",
    }
    col = DirectBeamTableColumn(item.column())
    if col not in keys:
        return
    key = keys[col]
    data = self._data_manager.direct_beam_list[item.row()]

    try:
        data.set_parameter(key, float(item.text()))
    except ValueError:
        self.report_message(
            f"Invalid value for {key}:\n\t{item.text()}\nPlease enter a valid number.",
            pop_up=True,
            is_error=True,
        )
        # Reset to the value stored for the edited run (which is not necessarily
        # the active data set). The edit is flagged as programmatic so that it
        # does not re-enter this handler and trigger a recalculation.
        self.main_window.auto_change_active = True
        try:
            item.setText(str(data.get_parameter(key)))
        finally:
            self.main_window.auto_change_active = False
        return

    # If peak position changed, also update direct pixel overwrite in all reflectivity runs using this direct beam
    if col == DirectBeamTableColumn.PEAK_POSITION:
        self.update_reduction_table_from_direct_beam(data)

    # Update calculated data
    data.update_calculated_values()

    # Update UI if this data set is the active one
    if self._data_manager.is_active(data):
        self.main_window.auto_change_active = True
        try:
            self.update_info()
        finally:
            self.main_window.auto_change_active = False

    # Recalculate reflectivity for runs with this direct beam
    try:
        self._data_manager.reduce_spec(direct_beam=data.number)
    except Exception:
        # Best effort: report the failure and still refresh the plots below
        self.report_message(
            "Could not compute reflectivity for %s" % self._data_manager.current_file_name,
            detailed_message=str(traceback.format_exc()),
            pop_up=False,
            is_error=False,
        )
    self.main_window.plotActiveTab()
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
    self.main_window.initiate_projection_plot.emit(True)
[docs]
def active_data_changed(self):
    """Highlight the active data set in the reduction and direct beam tables.

    In each table the run number cell of the active row is painted yellow
    (all others white) and only the radio button of the active row is checked.
    """

    def _mark_active_row(table, active_row, run_number_column, radio_column):
        """Highlight ``active_row`` in ``table`` and un-highlight every other row."""
        for row in range(table.rowCount()):
            is_active_row = row == active_row
            run_number_cell = table.item(row, run_number_column)
            if run_number_cell is not None:
                run_number_cell.setBackground(QColors.yellow if is_active_row else QColors.white)
            # the radio button lives inside the cell widget
            radio_holder = table.cellWidget(row, radio_column)
            if radio_holder is not None:
                radio_holder.findChild(QtWidgets.QRadioButton).setChecked(is_active_row)

    # the changes below are programmatic: keep the change handlers quiet
    self.main_window.auto_change_active = True

    active_run_row = self._data_manager.find_active_data_id()
    _mark_active_row(
        self.reduction_table,
        active_run_row,
        ReductionTableColumn.RUN_NUMBER,
        ReductionTableColumn.ACTIVE,
    )

    active_beam_row = self._data_manager.find_active_direct_beam_id()
    _mark_active_row(
        self.ui.directBeamTable,
        active_beam_row,
        DirectBeamTableColumn.RUN_NUMBER,
        DirectBeamTableColumn.ACTIVE,
    )

    self.main_window.auto_change_active = False
[docs]
def reduction_table_right_click(self, pos: QtCore.QPoint, is_reduction_table: bool = True):
    """Show the context menu for a right-click on a reduction or direct beam table.

    Parameters
    ----------
    pos:
        Mouse position
    is_reduction_table:
        True if the reduction table is active, False if the direct beam table is active
    """
    # The table that was clicked and the list of runs it displays
    if is_reduction_table:
        table_widget = self.reduction_table
        data_table = self._data_manager.reduction_list
    else:
        table_widget = self.ui.directBeamTable
        data_table = self._data_manager.direct_beam_list

    def _clicked_run():
        """Return the run shown on the clicked row, or None if there is none."""
        row = table_widget.rowAt(pos.y())
        if 0 <= row < len(data_table):
            return data_table[row]
        return None

    # Callbacks for the actions in the context menu
    def _export_data():
        run = _clicked_run()
        if run is not None:
            self.save_run_data(run)

    def _propagate_run():
        # If the run comes from the direct beam table, make sure the user
        # really wants it in the lists of reflectivity runs
        if not is_reduction_table and not self.ask_question(
            "Run is labeled as direct beam. Do you still want to add it to the list of reflectivity runs?"
        ):
            return
        run = _clicked_run()
        if run is None:
            return
        active_name = self._data_manager.active_cross_section.name
        cross_section = run.cross_sections[active_name]
        for ipeak, peak_data in self._data_manager.peak_reduction_lists.items():
            if not self._data_manager.copy_nexus_data_to_reduction(run, ipeak):
                continue
            # get widget for target reduction table
            target_widget = self.get_reduction_table_by_index(ipeak)
            target_row = self._data_manager.find_run_number_in_reduction_list(run.number, peak_data)
            if target_row is None:
                raise RuntimeError("Run number not in reduction list")
            target_widget.insertRow(target_row)
            # update UI table widget
            self.update_reduction_table(target_widget, target_row, cross_section)

    def _remove_run():
        if is_reduction_table:
            self.remove_reflectivity()
        else:
            self.remove_direct_beam()

    menu = QtWidgets.QMenu(table_widget)

    def _add_entry(label, callback):
        """Create a menu action with ``label`` that runs ``callback`` when triggered."""
        action = QtWidgets.QAction(label)
        action.triggered.connect(lambda: callback())
        menu.addAction(action)
        return action

    # The actions are created without a parent; this list keeps them referenced
    # while the menu is shown
    actions = [
        _add_entry("Export data", _export_data),
        _add_entry("Propagate run to all tabs", _propagate_run),
        _add_entry("Remove run from this tab", _remove_run),
    ]
    menu.exec_(table_widget.mapToGlobal(pos))
[docs]
def save_run_data(self, nexus_data: NexusData):
    """Write the TOF counts table of each cross-section of a run to a text file.

    The user picks the target directory and a base name; one file named
    ``<basename>_<cross-section>.dat`` is written per cross-section. Nothing
    is written if the user cancels either dialog.

    Parameters
    ----------
    nexus_data:
        The run to export
    """
    directory = QtWidgets.QFileDialog.getExistingDirectory(self.main_window, "Select directory")
    if not directory:
        return

    # ask user for base name for files (one file for each cross-section, e.g. "REF_M_1234_data_Off-Off.dat")
    suggested_basename = f"REF_M_{nexus_data.number}_data"
    # keep asking until the user cancels, picks unused names or agrees to overwrite
    while True:
        basename, accepted = QtWidgets.QInputDialog.getText(
            self.main_window, "Base name", "Save file base name:", text=suggested_basename
        )
        if not accepted or not basename:
            # user cancels
            return
        filenames = {xs: f"{basename}_{xs}.dat" for xs in nexus_data.cross_sections.keys()}
        targets = {xs: os.path.join(directory, filename) for xs, filename in filenames.items()}
        clashes = [filenames[xs] for xs, target in targets.items() if os.path.isfile(target)]
        if not clashes:
            break
        listing = "\n".join(clashes)
        if self.ask_question(f"Overwrite existing file(s):\n{listing}?"):
            break

    # save one file per cross-section
    for xs, target in targets.items():
        counts_table, header = nexus_data.cross_sections[xs].get_tof_counts_table()
        np.savetxt(target, counts_table, header=header)
[docs]
def compute_offspec_on_change(self, force=False):
    """Reduce the off-specular data if it is missing or out of date.

    Parameters
    ----------
    force : bool
        If True, reduce even when nothing changed and the data already exists.
    """
    reporter = self.new_progress_reporter()
    change_level = self.check_region_values_changed()
    already_available = self._data_manager.is_offspec_available()
    logging.info("Exists %s %s", change_level, already_available)

    # a change level of 0 or more means the UI values differ from the stored ones
    needs_update = force or change_level >= 0 or not already_available
    if not needs_update:
        return

    logging.info("Updating....")
    self._data_manager.update_configuration(configuration=self.get_configuration_from_ui(), active_only=False)
    self._data_manager.reduce_offspec(progress=reporter)
[docs]
def compute_gisans_on_change(self, force=False, active_only=True):
    """Compute GISANS if it is missing or out of date.

    Parameters
    ----------
    force : bool
        If True, compute even when nothing changed and the data already exists.
    active_only : bool
        If True, only the active data set is computed and a failure is
        reported to the user; otherwise ``reduce_gisans`` is called on the
        data manager.
    """
    prog = self.new_progress_reporter()
    has_changed_values = self.check_region_values_changed()
    gisans_data_exists = self._data_manager.is_gisans_available(active_only=active_only)
    logging.info("Exists %s %s %s", force, has_changed_values, gisans_data_exists)

    # a change level of 0 or more means the UI values differ from the stored ones
    if not (force or has_changed_values >= 0 or not gisans_data_exists):
        return

    logging.info("Updating....")
    config = self.get_configuration_from_ui()
    self._data_manager.update_configuration(configuration=config, active_only=False)
    if not active_only:
        self._data_manager.reduce_gisans(progress=prog)
        return

    result = self._data_manager.calculate_gisans(progress=prog)
    if not result:
        # The failure is signalled by the return value, not by an exception, so
        # there is no traceback to show here (traceback.format_exc() would only
        # yield "NoneType: None").
        self.report_message(
            f"Could not compute GISANS for {self._data_manager.current_file_name}",
            detailed_message="The GISANS calculation did not return a result for the active data set.",
            pop_up=True,
            is_error=True,
        )
[docs]
def check_region_values_changed(self):
    """Classify how the UI values differ from the active cross-section's configuration.

    Some parameters are tied to the changeRegionValues() slot.
    There are time-consuming actions that we only want to take
    if those values actually changed, as opposed to the user simply
    clicking outside the box.

    Some parameters don't require a recalculation but simply a
    refreshing of the plots. Those are parameters such as scaling
    factors or the number of points clipped.

    Returns
    -------
    int:
        -1 = no valid change (also returned when there is no active cross-section),
        0 = replot needed,
        1 = recalculation needed
    """
    if self._data_manager.active_cross_section is None:
        return -1
    configuration = self._data_manager.active_cross_section.configuration
    ui = self.ui

    # Parameters whose change requires a recalculation
    valid_change = (
        # ROI parameters
        configuration.peak_position != ui.refXPos.value()
        or configuration.peak_width != ui.refXWidth.value()
        or configuration.low_res_position != ui.refYPos.value()
        or configuration.low_res_width != ui.refYWidth.value()
        or configuration.bck_position != ui.bgCenter.value()
        or configuration.bck_width != ui.bgWidth.value()
        # reduction options
        or configuration.subtract_background != ui.bgActive.isChecked()
        or configuration.use_dangle != ui.trustDANGLE.isChecked()
        or configuration.set_direct_pixel != ui.set_dirpix_checkbox.isChecked()
        or configuration.set_direct_angle_offset != ui.set_dangle0_checkbox.isChecked()
        # the overwrite values only matter while their option is enabled
        or (
            configuration.set_direct_pixel
            and configuration.direct_pixel_overwrite != ui.directPixelOverwrite.value()
        )
        or (
            configuration.set_direct_angle_offset
            and configuration.direct_angle_offset_overwrite != ui.dangle0Overwrite.value()
        )
        # per-run binning
        or configuration.binning_type_run != ui.binning_type_selector_run.currentIndex()
        or configuration.binning_q_step_run != ui.q_rebin_spinbox_run.value()
    )
    if valid_change:
        return 1

    # Parameters whose change only requires a replot.
    # The UI holds log10 of the scaling factor; fall back to 1 if it cannot be converted.
    try:
        scale = math.pow(10.0, ui.refScale.value())
    except (OverflowError, ValueError, TypeError):
        scale = 1
    replot_change = (
        configuration.scaling_factor != scale
        or configuration.cut_first_n_points != ui.rangeStart.value()
        or configuration.cut_last_n_points != ui.rangeEnd.value()
    )
    return 0 if replot_change else -1
[docs]
def get_configuration_from_ui(self) -> Configuration:
    """Gather the reduction options from the graphical interface.

    The options are written to the configuration of the active cross-section
    (which is modified in place), or to a new ``Configuration`` when there is
    no active cross-section.

    Note that some options are global (static members of Configuration class),
    while others are per-cross-section (members of Configuration instance).
    This is because some options are applied to all cross-sections
    (e.g. whether to use a ROI or not), while others are specific to each cross-section
    (e.g. the peak position and width).

    Returns
    -------
    Configuration:
        The configuration holding the current UI values
    """
    ui = self.ui
    if self._data_manager.active_cross_section is not None:
        configuration = self._data_manager.active_cross_section.configuration
    else:
        configuration = Configuration()
    configuration.tof_bins = ui.eventTofBins.value()
    configuration.tof_bin_type = ui.eventBinMode.currentIndex()

    # Peak finder options
    Configuration.use_roi = ui.use_roi_checkbox.isChecked()
    Configuration.update_peak_range = ui.fit_within_roi_checkbox.isChecked()
    Configuration.use_peak_finder = ui.actionAutoXROI.isChecked()
    Configuration.use_low_res_finder = ui.actionAutoYROI.isChecked()
    Configuration.use_metadata_bck_roi = ui.use_bck_roi_checkbox.isChecked()
    Configuration.use_tight_bck = ui.use_side_bck_checkbox.isChecked()
    Configuration.bck_offset = ui.side_bck_width.value()

    # Default ranges, using the current values
    configuration.peak_position = ui.refXPos.value()
    configuration.peak_width = ui.refXWidth.value()
    configuration.low_res_position = ui.refYPos.value()
    configuration.low_res_width = ui.refYWidth.value()
    configuration.bck_position = ui.bgCenter.value()
    configuration.bck_width = ui.bgWidth.value()
    configuration.match_direct_beam = ui.actionAutoNorm.isChecked()
    configuration.binning_type_run = ui.binning_type_selector_run.currentIndex()
    configuration.binning_q_step_run = ui.q_rebin_spinbox_run.value()

    # Other reduction options
    configuration.subtract_background = ui.bgActive.isChecked()
    # The UI holds log10 of the scaling factor; fall back to 1 if it cannot be converted
    try:
        scale = math.pow(10.0, ui.refScale.value())
    except (OverflowError, ValueError, TypeError):
        scale = 1
    configuration.scaling_factor = scale
    configuration.cut_first_n_points = ui.rangeStart.value()
    configuration.cut_last_n_points = ui.rangeEnd.value()
    Configuration.normalize_to_unity = ui.normalize_to_unity_checkbox.isChecked()
    Configuration.total_reflectivity_q_cutoff = ui.normalization_q_cutoff_spinbox.value()
    Configuration.global_stitching = ui.global_fit_checkbox.isChecked()
    Configuration.polynomial_stitching = ui.polynomial_stitching_checkbox.isChecked()
    Configuration.polynomial_stitching_degree = ui.polynomial_stitching_degree_spinbox.value()
    Configuration.polynomial_stitching_points = ui.polynomial_stitching_points_spinbox.value()
    Configuration.wl_bandwidth = ui.bandwidth_spinbox.value()
    configuration.use_dangle = ui.trustDANGLE.isChecked()
    configuration.set_direct_pixel = ui.set_dirpix_checkbox.isChecked()
    configuration.set_direct_angle_offset = ui.set_dangle0_checkbox.isChecked()
    configuration.direct_pixel_overwrite = ui.directPixelOverwrite.value()
    configuration.direct_angle_offset_overwrite = ui.dangle0Overwrite.value()
    Configuration.sample_size = ui.sample_size_spinbox.value()
    Configuration.binning_q_step_global = ui.q_rebin_spinbox_global.value()
    Configuration.apply_deadtime = ui.deadtime_entry.applyCheckBox.isChecked()
    Configuration.lock_direct_beam_y = ui.direct_beam_y_lock_checkbox.isChecked()

    # UI elements
    configuration.normalize_x_tof = ui.normalizeXTof.isChecked()
    configuration.x_wl_map = ui.xLamda.isChecked()
    configuration.angle_map = ui.tthPhi.isChecked()
    configuration.log_1d = ui.logarithmic_y.isChecked()
    configuration.log_2d = ui.logarithmic_colorscale.isChecked()

    # Off-specular options
    if ui.kizmkfzVSqz.isChecked():
        configuration.off_spec_x_axis = Configuration.DELTA_KZ_VS_QZ
    elif ui.qxVSqz.isChecked():
        configuration.off_spec_x_axis = Configuration.QX_VS_QZ
    else:
        configuration.off_spec_x_axis = Configuration.KZI_VS_KZF
    configuration.off_spec_slice = ui.offspec_slice_checkbox.isChecked()
    configuration.off_spec_slice_qz_min = ui.slice_qz_min_spinbox.value()
    configuration.off_spec_slice_qz_max = ui.slice_qz_max_spinbox.value()
    # try:
    #    qz_list = self.ui.offspec_qz_list_edit.text()
    #    if len(qz_list) > 0:
    #        configuration.off_spec_qz_list = [float(x) for x in self.ui.offspec_qz_list_edit.text().split(',')]
    # except:
    #    logging.error("Could not parse off_spec_qz_list: %s", configuration.off_spec_qz_list)
    configuration.off_spec_err_weight = ui.offspec_err_weight_checkbox.isChecked()
    configuration.off_spec_nxbins = ui.offspec_rebin_x_bins_spinbox.value()
    configuration.off_spec_nybins = ui.offspec_rebin_y_bins_spinbox.value()
    configuration.off_spec_x_min = ui.offspec_x_min_spinbox.value()
    configuration.off_spec_x_max = ui.offspec_x_max_spinbox.value()
    configuration.off_spec_y_min = ui.offspec_y_min_spinbox.value()
    configuration.off_spec_y_max = ui.offspec_y_max_spinbox.value()

    # Off-spec smoothing options
    configuration.apply_smoothing = ui.offspec_smooth_checkbox.isChecked()

    # GISANS options
    configuration.gisans_wl_min = ui.gisans_wl_min_spinbox.value()
    configuration.gisans_wl_max = ui.gisans_wl_max_spinbox.value()
    configuration.gisans_wl_npts = ui.gisans_wl_npts_spinbox.value()
    configuration.gisans_qz_npts = ui.gisans_qz_npts_spinbox.value()
    configuration.gisans_qy_npts = ui.gisans_qy_npts_spinbox.value()
    configuration.gisans_use_pf = ui.gisans_pf_radio.isChecked()
    configuration.gisans_slice = ui.gisans_slice_checkbox.isChecked()
    configuration.gisans_slice_qz_min = ui.gisans_qz_min_spinbox.value()
    configuration.gisans_slice_qz_max = ui.gisans_qz_max_spinbox.value()
    return configuration
[docs]
def populate_from_configuration(self, configuration=None):
    """Set reduction options in UI, usually after loading a reduced data set.

    Each widget is written, in a fixed order, from the matching attribute of
    ``configuration``. When ``configuration`` is None, a default-constructed
    ``Configuration`` is used instead.
    """
    if configuration is None:
        configuration = Configuration()
    ui = self.ui
    ui.eventTofBins.setValue(configuration.tof_bins)
    ui.eventBinMode.setCurrentIndex(configuration.tof_bin_type)
    # Peak finder settings
    ui.use_roi_checkbox.setChecked(configuration.use_roi)
    ui.use_bck_roi_checkbox.setChecked(configuration.use_metadata_bck_roi)
    ui.fit_within_roi_checkbox.setChecked(configuration.update_peak_range)
    # Both automatic ROI actions are forced off whenever `use_roi` is set
    ui.actionAutoXROI.setChecked(False if configuration.use_roi else configuration.use_peak_finder)
    ui.actionAutoYROI.setChecked(False if configuration.use_roi else configuration.use_low_res_finder)
    # Use background on each side of the peak
    ui.use_side_bck_checkbox.setChecked(configuration.use_tight_bck)
    ui.side_bck_width.setValue(configuration.bck_offset)
    # Update reduction parameters
    ui.refXPos.setValue(configuration.peak_position)
    ui.refXWidth.setValue(configuration.peak_width)
    ui.refYPos.setValue(configuration.low_res_position)
    ui.refYWidth.setValue(configuration.low_res_width)
    ui.bgCenter.setValue(configuration.bck_position)
    ui.bgWidth.setValue(configuration.bck_width)
    # Subtract background
    ui.bgActive.setChecked(configuration.subtract_background)
    # Scaling factor: the widget receives log10 of the stored factor. Only the
    # failures log10 itself can produce are handled (non-positive value ->
    # ValueError, non-numeric value -> TypeError); the widget then shows 0.0.
    try:
        scale = math.log10(configuration.scaling_factor)
    except (TypeError, ValueError):
        scale = 0.0
    ui.refScale.setValue(scale)
    # Cut first and last points
    ui.rangeStart.setValue(configuration.cut_first_n_points)
    ui.rangeEnd.setValue(configuration.cut_last_n_points)
    ui.normalize_to_unity_checkbox.setChecked(configuration.normalize_to_unity)
    ui.normalization_q_cutoff_spinbox.setValue(configuration.total_reflectivity_q_cutoff)
    ui.global_fit_checkbox.setChecked(configuration.global_stitching)
    ui.polynomial_stitching_checkbox.setChecked(configuration.polynomial_stitching)
    ui.polynomial_stitching_degree_spinbox.setValue(configuration.polynomial_stitching_degree)
    ui.polynomial_stitching_points_spinbox.setValue(configuration.polynomial_stitching_points)
    ui.bandwidth_spinbox.setValue(configuration.wl_bandwidth)
    ui.trustDANGLE.setChecked(configuration.use_dangle)
    ui.set_dirpix_checkbox.setChecked(configuration.set_direct_pixel)
    ui.set_dangle0_checkbox.setChecked(configuration.set_direct_angle_offset)
    ui.directPixelOverwrite.setValue(configuration.direct_pixel_overwrite)
    ui.dangle0Overwrite.setValue(configuration.direct_angle_offset_overwrite)
    ui.sample_size_spinbox.setValue(configuration.sample_size)
    ui.q_rebin_spinbox_global.setValue(configuration.binning_q_step_global)
    ui.deadtime_entry.applyCheckBox.setChecked(configuration.apply_deadtime)
    ui.direct_beam_y_lock_checkbox.setChecked(configuration.lock_direct_beam_y)
    ui.binning_type_selector_run.setCurrentIndex(configuration.binning_type_run)
    # A falsy per-run Q step (None or 0) leaves the spinbox at its current value
    if configuration.binning_q_step_run:
        ui.q_rebin_spinbox_run.setValue(configuration.binning_q_step_run)
    # UI elements
    ui.normalizeXTof.setChecked(configuration.normalize_x_tof)
    ui.xLamda.setChecked(configuration.x_wl_map)
    ui.tthPhi.setChecked(configuration.angle_map)
    ui.logarithmic_y.setChecked(configuration.log_1d)
    ui.logarithmic_colorscale.setChecked(configuration.log_2d)
    # Off-specular options: any axis value other than the two tested below
    # selects the kzi-vs-kzf radio button
    if configuration.off_spec_x_axis == Configuration.DELTA_KZ_VS_QZ:
        ui.kizmkfzVSqz.setChecked(True)
    elif configuration.off_spec_x_axis == Configuration.QX_VS_QZ:
        ui.qxVSqz.setChecked(True)
    else:
        ui.kizVSkfz.setChecked(True)
    ui.offspec_slice_checkbox.setChecked(configuration.off_spec_slice)
    # self.ui.offspec_qz_list_edit.setText(','.join([str(x) for x in configuration.off_spec_qz_list]))
    ui.slice_qz_min_spinbox.setValue(configuration.off_spec_slice_qz_min)
    ui.slice_qz_max_spinbox.setValue(configuration.off_spec_slice_qz_max)
    ui.offspec_err_weight_checkbox.setChecked(configuration.off_spec_err_weight)
    ui.offspec_rebin_x_bins_spinbox.setValue(configuration.off_spec_nxbins)
    ui.offspec_rebin_y_bins_spinbox.setValue(configuration.off_spec_nybins)
    ui.offspec_x_min_spinbox.setValue(configuration.off_spec_x_min)
    ui.offspec_x_max_spinbox.setValue(configuration.off_spec_x_max)
    ui.offspec_y_min_spinbox.setValue(configuration.off_spec_y_min)
    ui.offspec_y_max_spinbox.setValue(configuration.off_spec_y_max)
    # Off-spec smoothing options
    ui.offspec_smooth_checkbox.setChecked(configuration.apply_smoothing)
    # GISANS options
    ui.gisans_wl_min_spinbox.setValue(configuration.gisans_wl_min)
    ui.gisans_wl_max_spinbox.setValue(configuration.gisans_wl_max)
    ui.gisans_wl_npts_spinbox.setValue(configuration.gisans_wl_npts)
    ui.gisans_qz_npts_spinbox.setValue(configuration.gisans_qz_npts)
    ui.gisans_qy_npts_spinbox.setValue(configuration.gisans_qy_npts)
    ui.gisans_pf_radio.setChecked(configuration.gisans_use_pf)
    ui.gisans_slice_checkbox.setChecked(configuration.gisans_slice)
    ui.gisans_qz_min_spinbox.setValue(configuration.gisans_slice_qz_min)
    ui.gisans_qz_max_spinbox.setValue(configuration.gisans_slice_qz_max)
[docs]
def stitch_reflectivity(self):
    """Stitch the data sets through the data manager and refresh the table.

    Stitching options (normalize-to-unity, Q cutoff, global fit, polynomial
    degree and points) are read from the UI. A ``RuntimeError``,
    ``ValueError`` or ``NormalizeToUnityQCutoffError`` raised while stitching
    is reported in a pop-up instead of propagating. On success the
    scale-factor cell of every row is rewritten. The plot-refresh signal is
    emitted in both cases.
    """
    # Called for its side effect: the configuration picks up the current UI
    # values (e.g. a changed cutoff) before stitching starts
    self.get_configuration_from_ui()
    ui = self.ui
    # The polynomial degree is only forwarded when polynomial stitching is enabled
    poly_degree = None
    if ui.polynomial_stitching_checkbox.isChecked():
        poly_degree = ui.polynomial_stitching_degree_spinbox.value()
    try:
        self._data_manager.stitch_data_sets(
            normalize_to_unity=ui.normalize_to_unity_checkbox.isChecked(),
            q_cutoff=ui.normalization_q_cutoff_spinbox.value(),
            global_stitching=ui.global_fit_checkbox.isChecked(),
            poly_degree=poly_degree,
            poly_points=ui.polynomial_stitching_points_spinbox.value(),
        )
    except (RuntimeError, ValueError) as err:
        headline = f"Error in stitching:\n{str(err)}"
        self.report_message(
            headline,
            detailed_message=str(traceback.format_exc()),
            pop_up=True,
            is_error=True,
        )
    except NormalizeToUnityQCutoffError as err:
        headline = f"Error in normalize to unity when stitching:\n{str(err)}"
        self.report_message(
            headline,
            detailed_message=str(traceback.format_exc()),
            pop_up=True,
            is_error=True,
        )
    else:
        # Show the scaling factor of the active cross-section for each run
        for row, nexus_data in enumerate(self._data_manager.reduction_list):
            xs_name = self._data_manager.active_cross_section.name
            run_config = nexus_data.cross_sections[xs_name].configuration
            cell = QtWidgets.QTableWidgetItem("%.4f" % (run_config.scaling_factor))
            self.reduction_table.setItem(row, ReductionTableColumn.SCALE_FACTOR, cell)
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def trim_data_to_normalization(self):
    """Apply the trim values proposed by the data manager to the cut-point widgets.

    ``DataManager.get_trim_values`` decides the values (the previous docstring
    described them as the points below 5% of the maximum intensity; that
    threshold is not visible in this method). When it returns None, only a
    status message is reported and nothing else changes.
    """
    trim_points = self._data_manager.get_trim_values()
    if trim_points is None:
        self.report_message("No direct beam found to trim data", pop_up=False)
        return
    # Element 0 -> points cut at the start, element 1 -> points cut at the end
    self.ui.rangeStart.setValue(trim_points[0])
    self.ui.rangeEnd.setValue(trim_points[1])
    self.update_tables()
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def strip_overlap(self):
    """Strip overlapping points via the data manager and refresh the table.

    The stripping itself is delegated to ``DataManager.strip_overlap``.
    Afterwards the NUM_RIGHT cell of every row is rewritten with the
    ``cut_last_n_points`` of the active cross-section, and a plot refresh is
    requested.
    """
    self._data_manager.strip_overlap()
    for row, nexus_data in enumerate(self._data_manager.reduction_list):
        xs_name = self._data_manager.active_cross_section.name
        points_cut = nexus_data.cross_sections[xs_name].configuration.cut_last_n_points
        cell = QtWidgets.QTableWidgetItem(str(points_cut))
        self.reduction_table.setItem(row, ReductionTableColumn.NUM_RIGHT, cell)
    self.main_window.initiate_reflectivity_or_intensity_plot.emit()
[docs]
def reload_all_files(self):
    """Reload all files upon change in loading configuration.

    Nothing happens when the data manager reports an empty cache. Otherwise
    unused cached data is cleared first, the files are reloaded with the
    configuration currently shown in the UI, the direct-beam table and every
    reduction table are rebuilt, and the previously active tab and run are
    restored before the plots are refreshed.

    ``main_window.auto_change_active`` is set while the tables are rebuilt and
    is always cleared again, even if one of the update steps raises.
    """
    manager = self._data_manager
    if manager.get_cachesize() == 0:
        return
    # Remember the active (plotted) run: look in the reduction list first and
    # fall back to the direct beam list when no reduction run is active
    active_idx = manager.find_active_data_id()
    is_active_data_direct_beam = active_idx is None
    if is_active_data_direct_beam:
        active_idx = manager.find_active_direct_beam_id()
    # Remember the active data tab
    active_data_tab = manager.active_reduction_list_index
    # Reload files
    manager.clear_cached_unused_data()
    configuration = self.get_configuration_from_ui()
    prog = ProgressReporter(progress_bar=self.progress_bar, status_bar=self.status_bar_handler)
    manager.reload_files(configuration, prog)
    # Update the tables in the UI.
    # NOTE(review): auto_change_active presumably tells the change handlers to
    # ignore these programmatic edits - confirm in the main window.
    self.main_window.auto_change_active = True
    try:
        self.ui.directBeamTable.setRowCount(len(manager.direct_beam_list))
        for idx, _ in enumerate(manager.direct_beam_list):
            manager.set_active_data_from_direct_beam_list(idx)
            self.update_direct_beam_table(idx, manager.active_cross_section)
        self.ui.reductionTable.setRowCount(len(manager.reduction_list))
        for ipeak, _ in manager.peak_reduction_lists.items():
            # `reduction_list` is re-read after each switch of the active list index
            manager.set_active_reduction_list_index(ipeak)
            table_widget = self.get_reduction_table_by_index(ipeak)
            table_widget.setRowCount(len(manager.reduction_list))
            for idx, _ in enumerate(manager.reduction_list):
                manager.set_active_data_from_reduction_list(idx)
                self.update_reduction_table(table_widget, idx, manager.active_cross_section)
        direct_beam_ids = [str(r.number) for r in manager.direct_beam_list]
        self.ui.direct_beam_list_label.setText(", ".join(direct_beam_ids))
        # Restore the active data tab
        manager.set_active_reduction_list_index(active_data_tab)
        # Restore the active run
        if is_active_data_direct_beam:
            manager.set_active_data_from_direct_beam_list(active_idx)
        else:
            manager.set_active_data_from_reduction_list(active_idx)
        # Update plots
        self.file_loaded()
    finally:
        # Never leave the flag stuck at True when an update step fails
        self.main_window.auto_change_active = False
[docs]
def report_message(
    self,
    message: str,
    informative_message: Optional[str] = None,
    detailed_message: Optional[str] = None,
    pop_up: bool = False,
    is_error: bool = False,
):
    """Show a message in the status bar, log it and optionally raise a pop-up.

    Logging channel: errors go to ``logging.error`` (followed by
    `detailed_message` when given); otherwise the message is logged as a
    warning when `pop_up` is set and as info when it is not.

    When `pop_up` is True, a modal message box with a warning icon, the title
    "Information" and a single Ok button is shown; `informative_message` and
    `detailed_message` are added to it when they are not None.
    """
    self.status_bar_handler.show_message(message)

    if is_error:
        logging.error(message)
        if detailed_message is not None:
            logging.error(detailed_message)
    else:
        log_call = logging.warning if pop_up else logging.info
        log_call(message)

    if not pop_up:
        return

    box = QtWidgets.QMessageBox()
    box.setIcon(QtWidgets.QMessageBox.Warning)
    box.setText(message)
    box.setWindowTitle("Information")
    if informative_message is not None:
        box.setInformativeText(informative_message)
    if detailed_message is not None:
        box.setDetailedText(detailed_message)
    box.setStandardButtons(QtWidgets.QMessageBox.Ok)
    # Blocks until the user dismisses the dialog
    box.exec_()
[docs]
def ask_question(self, message: str) -> bool:
    """Ask the user to confirm `message` in a modal warning dialog.

    The dialog is parented to the main window, is titled "Warning" and offers
    "Ok" (the default button) and "Cancel".

    Returns False only when the dialog reports Cancel, True otherwise.
    """
    box = QtWidgets.QMessageBox
    answer = box.warning(
        self.main_window,
        "Warning",
        message,
        buttons=box.Ok | box.Cancel,
        defaultButton=box.Ok,
    )
    return answer != box.Cancel
[docs]
def show_results(self):
    """Open a new result viewer window wired to the main window's update signals.

    The viewer is created with the main window as parent, its reflectivity
    preview checkbox is ticked, and the specular, off-specular and GISANS
    update signals of the main window are connected to the viewer's matching
    slots before it is shown (non-modal).
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import - not verified here)
    from quicknxs.interfaces.result_viewer import ResultViewer

    window = self.main_window
    viewer = ResultViewer(window, self._data_manager)
    viewer.specular_compare_widget.ui.refl_preview_checkbox.setChecked(True)
    connections = (
        (window.update_specular_viewer, viewer.update_specular),
        (window.update_off_specular_viewer, viewer.update_off_specular),
        (window.update_gisans_viewer, viewer.update_gisans),
    )
    for signal, slot in connections:
        signal.connect(slot)
    viewer.show()
[docs]
def propagate_binning_options_to_run_config(self):
    """Copy the global binning type and Q step to every run of the active data tab.

    For each run in the active reduction list, the configuration of the
    active cross-section receives the global values, is pushed back to the
    run with ``update_configuration`` and the corresponding table row is
    refreshed. The per-run spinbox and combobox are then set to the same
    values with their signals blocked.

    Note: this method does not trigger recalculation or replotting; the
    caller is responsible for that.
    """
    ui = self.ui
    bin_type_global = ui.binning_type_selector_global.currentIndex()
    q_step_global = ui.q_rebin_spinbox_global.value()

    # Update the internal state and the table row of each run
    for row, run in enumerate(self._data_manager.reduction_list):
        xs_name: str = self._data_manager.active_cross_section.name
        cross_section = run.cross_sections[xs_name]
        run_config = cross_section.configuration
        run_config.binning_type_run = bin_type_global
        run_config.binning_q_step_run = q_step_global
        run.update_configuration(run_config)
        # NOTE(review): update_reduction_table is expected to block the table
        # signals itself - confirm in that method
        self.update_reduction_table(self.reduction_table, row, cross_section)

    # Mirror the values in the per-run widgets without emitting change signals
    step_box = ui.q_rebin_spinbox_run
    type_box = ui.binning_type_selector_run
    silent_updates = (
        (step_box, step_box.setValue, q_step_global),
        (type_box, type_box.setCurrentIndex, bin_type_global),
    )
    for widget, setter, new_value in silent_updates:
        widget.blockSignals(True)
        setter(new_value)
        widget.blockSignals(False)
[docs]
def get_log_level(self):
    """Return the name of the root logger's effective level (e.g. ``"INFO"``).

    The value is intended for the GUI dropdown. For a numeric level without a
    registered name, ``logging.getLevelName`` yields ``"Level <n>"``.
    """
    root_logger = logging.getLogger()
    return logging.getLevelName(root_logger.getEffectiveLevel())
[docs]
def change_log_level(self, level: str):
    """Set the root logger and all of its handlers to `level`.

    `level` is upper-cased before it is looked up in ``self.LOG_LEVELS``; a
    name that is not listed there falls back to ``"INFO"``.
    """
    requested = level.upper()
    lvl = requested if requested in self.LOG_LEVELS else "INFO"
    root = logging.getLogger()
    root.setLevel(lvl)
    # Handlers filter on their own level, so they are updated as well
    for handler in root.handlers:
        handler.setLevel(lvl)