Source code for quicknxs.interfaces.event_handlers.main_handler

# -*- coding: utf-8 -*-
# pylint: disable=invalid-name, line-too-long, too-many-public-methods, too-many-instance-attributes, wrong-import-order, \
# bare-except, protected-access, too-many-arguments, too-many-statements
"""
Manage file-related and UI events
"""

import glob
import logging
import math
import os
import time
import traceback
from typing import List, Optional

import numpy as np
from mantid.simpleapi import DeleteWorkspace, LoadEventNexus
from PyQt5 import QtCore, QtGui, QtWidgets

from quicknxs.config import Settings
from quicknxs.interfaces.configuration import Configuration
from quicknxs.interfaces.data_handling.data_manipulation import NormalizeToUnityQCutoffError
from quicknxs.interfaces.data_handling.data_set import CrossSectionData, NexusData
from quicknxs.interfaces.data_handling.filepath import FilePath, RunNumbers
from quicknxs.interfaces.data_manager import DataManager
from quicknxs.interfaces.event_handlers.progress_reporter import ProgressReporter
from quicknxs.interfaces.event_handlers.status_bar_handler import StatusBarHandler
from quicknxs.interfaces.event_handlers.widgets import AcceptRejectDialog


[docs] class MainHandler(object): """ Event handler for the main application window. """ # Index of the direct beam tab in the reduction table tab widget DIRECT_BEAM_TAB_INDEX = 0 # Index of the first (and always visible) data tab in the reduction table tab widget MAIN_DATA_TAB_INDEX = 1 def __init__(self, main_window): self.ui = main_window.ui self.main_window = main_window self._data_manager: DataManager = main_window.data_manager # Update file list when changes are made self._path_watcher = QtCore.QFileSystemWatcher([self._data_manager.current_directory], self.main_window) self._path_watcher.directoryChanged.connect(self.update_file_list) self.cache_indicator = QtWidgets.QLabel("Files loaded: 0") self.cache_indicator.setMargin(5) self.cache_indicator.setSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Preferred) self.cache_indicator.setMinimumWidth(110) self.ui.statusbar.addPermanentWidget(self.cache_indicator) button = QtWidgets.QPushButton("Empty Cache") self.ui.statusbar.addPermanentWidget(button) button.pressed.connect(self.empty_cache) button.setFlat(False) button.setMaximumSize(150, 20) # Create progress bar in statusbar self.progress_bar = QtWidgets.QProgressBar(self.ui.statusbar) self.progress_bar.setMinimumSize(20, 14) self.progress_bar.setMaximumSize(140, 100) self.ui.statusbar.addPermanentWidget(self.progress_bar) self.status_bar_handler = StatusBarHandler(self.ui.statusbar) @property def reduction_table(self): """ Returns the active reduction table widget if one of the data tabs is active, else the first one Returns ------- QTableWidget """ if self.ui.tabWidget.currentIndex() == self.DIRECT_BEAM_TAB_INDEX: # get the table for the main data tab current_table = self.ui.tabWidget.widget(self.MAIN_DATA_TAB_INDEX).findChild(QtWidgets.QTableWidget) else: current_table = self.ui.tabWidget.currentWidget().findChild(QtWidgets.QTableWidget) return current_table
[docs] def get_reduction_table_by_index(self, tab_index: int) -> QtWidgets.QTableWidget: """Return the QTableWidget for the data tab with the given index""" return self.ui.tabWidget.widget(tab_index).findChild(QtWidgets.QTableWidget)
[docs] def new_progress_reporter(self): """Return a progress reporter""" return ProgressReporter(progress_bar=self.progress_bar, status_bar=self.status_bar_handler)
[docs] def empty_cache(self): """ Empty the data cache """ self._data_manager.clear_cache() self.cache_indicator.setText("Files loaded: 0")
[docs] def hide_sidebar(self): if self.main_window.leftEntries.isHidden(): self.main_window.leftEntries.show() else: self.main_window.leftEntries.hide()
[docs] def hide_run_data(self): if self.main_window.runDataFrame.isHidden(): self.main_window.runDataFrame.show() else: self.main_window.runDataFrame.hide()
[docs] def hide_data_table(self): if self.main_window.frame_2.isHidden(): self.main_window.frame_2.show() else: self.main_window.frame_2.hide()
[docs] def open_file(self, file_path: str, force: Optional[bool] = False, silent: Optional[bool] = False) -> None: r""" @brief Read one or more data files. If more than one, merge their data. @param file_path: absolute path to data files. If more than one file, paths are joined with the plus symbol '+' @param force: if true, the file will be reloaded even if it was loaded previously @param silent: if true, the plots currently shown in the interface will NOT be updated """ # Actions carried out: # 1. check the file exists # 2. Invoke DataManager.load() # 3. if silent==False, invoke DataManager.file_loaded() for single_file_path in file_path.split( FilePath.merge_symbol ): # also works when file_path is just the path to one file if not os.path.isfile(single_file_path): self.report_message( "File does not exist", detailed_message="The following file does not exist:\n %s" % single_file_path, pop_up=True, is_error=True, ) return t_0 = time.time() self.main_window.auto_change_active = True try: self.report_message(f"Loading file(s) {file_path}") prog = ProgressReporter(progress_bar=self.progress_bar, status_bar=self.status_bar_handler) configuration = self.get_configuration() self._data_manager.load(file_path, configuration, force=force, progress=prog) self.report_message(f"Loaded file(s) {self._data_manager.current_file_name}") except RuntimeError as run_err: # FIXME - need to find out what kind of error it could have self.report_message( f"Error loading file(s) {self._data_manager.current_file_name} due to {run_err}", detailed_message=str(traceback.format_exc()), pop_up=False, is_error=True, ) if not silent: self.file_loaded() self.main_window.auto_change_active = False logging.info("DONE: %s sec", time.time() - t_0)
[docs] def file_loaded(self): """ Update UI after a file is loaded """ self.main_window.auto_change_active = True self._set_data_manager_active_channel() channels = list(self._data_manager.data_sets.keys()) for i, channel in enumerate(channels): getattr(self.ui, "selectedChannel%i" % i).show() good_label = channel.replace("_", "-") if not good_label == self._data_manager.data_sets[channel].cross_section_label: good_label = "%s: %s" % (good_label, self._data_manager.data_sets[channel].cross_section_label) getattr(self.ui, "selectedChannel%i" % i).setText(good_label) for i in range(len(channels), 12): getattr(self.ui, "selectedChannel%i" % i).hide() self.main_window.auto_change_active = False self.main_window.file_loaded_signal.emit() if self.main_window.data_manager.active_channel.is_direct_beam: self.main_window.initiate_intensity_plot.emit(False) else: self.main_window.initiate_reflectivity_plot.emit(False) self.main_window.initiate_projection_plot.emit(False) self.cache_indicator.setText("Files loaded: %s" % (self._data_manager.get_cachesize()))
[docs] def active_channel_changed(self): """ Update UI metadata and plots after the active channel is changed """ self._set_data_manager_active_channel() self.update_channel_info() self.main_window.plotActiveTab() if self.main_window.data_manager.active_channel.is_direct_beam: self.main_window.initiate_intensity_plot.emit(False) else: self.main_window.initiate_reflectivity_plot.emit(False) self.main_window.initiate_projection_plot.emit(False)
def _set_data_manager_active_channel(self): """ Set the data manager's active channel from the active channel in the UI """ self.main_window.auto_change_active = True current_channel = 0 for i in range(12): if getattr(self.ui, "selectedChannel%i" % i).isChecked(): current_channel = i break success = self._data_manager.set_channel(current_channel) if not success: self.ui.selectedChannel0.setChecked(True) self.main_window.auto_change_active = False def _congruency_fail_report(self, file_paths, log_names=None): r""" # type: List[str], Optional(List[str]) -> str @brief Check whether these files can be merged @param file_paths : List of Nexus files (full paths) @returns str : Error message; empty string if no error is found, """ assert len(file_paths) > 1, "We require more than one data file in order to compare their metadata" # Log names validation and collect tolerances tolerances = dict() # store the tolerance value for each log name all_tolerances: List[float] = Settings()["OpenSum"]["Tolerances"] all_log_names: List[str] = Settings()["OpenSum"]["LogNames"] assert all_log_names # failsafe if structure of settings.json changes if log_names is None: log_names = all_log_names for log_name in log_names: try: i = all_log_names.index(log_name) except ValueError: return f"{log_name} is not a valid Log for comparison" tolerances[log_name] = all_tolerances[i] # simple data structure to collect the log values from all files log_values = {name: list() for name in log_names} for file_path in file_paths: for entry in ["", "-Off_Off", "-On_Off", "-Off_On", "-On_On"]: # for new and old nexus files workspace = None try: workspace = LoadEventNexus(Filename=file_path, NXentryName="entry" + entry, MetaDataOnly=True) break except RuntimeError: continue if workspace is None: return f"Could not load {file_path}" metadata = workspace.getRun() for log_name in log_names: try: log_property = metadata.getProperty(log_name) except RuntimeError as e: return e.message log_values[log_name].append(log_property.getStatistics().mean) DeleteWorkspace(workspace) # Find the minimum and maximum values for each log, and compare to the tolerance message = "" for log_name, values in log_values.items(): if max(values) - min(values) > tolerances[log_name]: runs = FilePath(file_paths).run_numbers(string_representation="statement") message_template = "Runs {0} contain values for log {1} that differ above tolerance {2}" message = message + message_template.format(runs, log_name, tolerances[log_name]) + "\n" return message # empty string if no failures
[docs] def update_tables(self): """ Update a data set that may be in the reduction table or the direct beam table. """ # Update the reduction table if this data set is in it idx = self._data_manager.find_active_data_id() if idx is not None: table_widget = self.reduction_table self.update_reduction_table(table_widget, idx, self._data_manager.active_channel) # Update the direct beam table if this data set is in it idx = self._data_manager.find_active_direct_beam_id() if idx is not None: self.update_direct_beam_table(idx, self._data_manager.active_channel)
[docs] def update_calculated_data(self): """ Update the calculated entries in the overview tab. We should call this after the peak ranges change, or after a change is made that will affect the displayed results. """ d = self._data_manager.active_channel self.ui.datasetAi.setText("%.3f°" % (d.scattering_angle)) try: wl_min, wl_max = d.wavelength_range except ZeroDivisionError: wl_min, wl_max = float("NaN"), float("NaN") self.ui.datasetLambda.setText("%.2f (%.2f-%.2f) Å" % (d.lambda_center, wl_min, wl_max)) # DIRPIX and DANGLE0 overwrite if self.ui.set_dangle0_checkbox.isChecked(): dangle0 = "%.3f° (%.3f°)" % (float(self.ui.dangle0Overwrite.text()), d._angle_offset) else: dangle0 = "%.3f°" % (d.angle_offset) self.ui.datasetDangle0.setText(dangle0) if self.ui.set_dirpix_checkbox.isChecked(): dpix = "%.1f (%.1f)" % (float(self.ui.directPixelOverwrite.value()), d._direct_pixel) else: dpix = "%.1f" % d.direct_pixel self.ui.datasetDirectPixel.setText(dpix) if d.configuration.normalization is not None: self.ui.matched_direct_beam_label.setText("%s" % d.configuration.normalization) else: self.ui.matched_direct_beam_label.setText("None")
[docs] def update_info(self): """ Update metadata shown in the overview tab. """ self.main_window.auto_change_active = True d = self._data_manager.active_channel self.populate_from_configuration(d.configuration) self.main_window.initiate_projection_plot.emit(False) QtWidgets.QApplication.instance().processEvents() if self.ui.set_dangle0_checkbox.isChecked(): dangle0 = "%.3f° (%.3f°)" % (float(self.ui.dangle0Overwrite.text()), d.angle_offset) else: dangle0 = "%.3f°" % (d.angle_offset) if self.ui.set_dirpix_checkbox.isChecked(): dpix = "%.1f (%.1f)" % (float(self.ui.directPixelOverwrite.value()), d.direct_pixel) else: dpix = "%.1f" % d.direct_pixel wl_min, wl_max = d.wavelength_range self.ui.datasetLambda.setText("%.2f (%.2f-%.2f) Å" % (d.lambda_center, wl_min, wl_max)) self.ui.datasetPCharge.setText("%.3e" % d.proton_charge) self.ui.datasetTime.setText("%i s" % d.total_time) self.ui.datasetTotCounts.setText("%.4e" % d.total_counts) try: self.ui.datasetRate.setText("%.1f cps" % (d.total_counts / d.total_time)) except ZeroDivisionError: self.ui.datasetRate.setText("NaN") self.ui.datasetDangle.setText("%.3f°" % d.dangle) self.ui.datasetDangle0.setText(dangle0) self.ui.datasetSangle.setText("%.3f°" % d.sangle) self.ui.datasetDirectPixel.setText(dpix) self.ui.currentChannel.setText( "<b>%s</b> (%s)&nbsp;&nbsp;&nbsp;Type: %s&nbsp;&nbsp;&nbsp;Current State: " "<b>%s</b>" % (d.number, d.experiment, d.measurement_type, d.name) ) # Update direct beam indicator if d.is_direct_beam: self.ui.is_direct_beam_label.setText("Direct beam") else: self.ui.is_direct_beam_label.setText("") # Update the calculated data self.update_calculated_data() self.ui.roi_used_value.setText("%s" % d.use_roi_actual) self.ui.roi_peak_value.setText("%s" % str(d.meta_data_roi_peak)) self.ui.roi_bck_value.setText("%s" % str(d.meta_data_roi_bck)) # Update reduction tables self.update_tables() self.active_data_changed() self.main_window.auto_change_active = False
[docs] def update_channel_info(self): """ Update channel metadata shown in the overview tab. """ # Update cross-section specific information in the overview tab d = self._data_manager.active_channel self.ui.datasetPCharge.setText("%.3e" % d.proton_charge) self.ui.datasetTime.setText("%i s" % d.total_time) self.ui.datasetTotCounts.setText("%.4e" % d.total_counts) try: self.ui.datasetRate.setText("%.1f cps" % (d.total_counts / d.total_time)) except ZeroDivisionError: self.ui.datasetRate.setText("NaN") self.ui.currentChannel.setText( "<b>%s</b> (%s)&nbsp;&nbsp;&nbsp;Type: %s&nbsp;&nbsp;&nbsp;Current State: " "<b>%s</b>" % (d.number, d.experiment, d.measurement_type, d.name) ) # Update the calculated data self.update_calculated_data()
[docs] def update_file_list(self, query_path=None): # type: (Optional[str]) -> None r""" @brief Update the list of data files @param query_path: full path of a directory, a Nexus file, or a list of Nexus files. If a list of files, their paths are joined by the plus symbol '+'. """ def _split_composites(): r"""Split the list of files in widget self.ui.file_list into a list of single files and a list of composite files""" singles, composites = list(), list() for i in range(self.ui.file_list.count()): file_base_name = self.ui.file_list.item(i).text() if FilePath.merge_symbol in file_base_name: composites.append(file_base_name) else: singles.append(file_base_name) return singles, composites def _updated_current_list(): r"""Most updated list of single and composite files from the current directory""" _, composites = _split_composites() return sorted(self._data_manager.current_event_files + composites) def _reset_ui_file_list(fresh_list): r"""reset widget self.ui.file_list and highlight the current file_name""" self.ui.file_list.clear() # Reset ui.file_list, a QtWidgets.QListWidget object assert isinstance(fresh_list, list), f"fresh_list must be list but not {type(fresh_list)}" for item in fresh_list: listitem = QtWidgets.QListWidgetItem(item, self.ui.file_list) if item == self._data_manager.current_file_name: # Changing the current selection will trigger self.main_window.file_open_from_list() to be called, # however, the current setting self.main_window.auto_change_active == True will cause # self.main_window.file_open_from_list() to return before any statement is executed self.ui.file_list.setCurrentItem(listitem) def _update_current_directory(new_dir): r"""Update the directory path in the main window and the path watcher""" self.main_window.settings.setValue("current_directory", new_dir) self._path_watcher.removePath(self._data_manager.current_directory) self._data_manager.current_directory = new_dir self._path_watcher.addPath(self._data_manager.current_directory) # This setting prevents automatic read-in of the currently selected item in the list # when the currently selected items changes due to the list update. self.main_window.auto_change_active = True file_path = None if query_path is None else FilePath(query_path) # Use case 1: the contents of the current directory may have changed with the addition of new # event files. This could happen if the experiment is running, producing new event files. if file_path is None: new_list = _updated_current_list() # Use case 2: a composite from using Open Sum elif file_path.is_composite: file_dir, file_name = file_path.split() self._data_manager.current_file_name = file_path.basename # Use case 2.1: the composite is made up of files in the current directory if file_dir == self._data_manager.current_directory: new_list = sorted(_updated_current_list() + [file_path.basename]) # Use case 2.2: the composite is made up of files in a new directory else: _update_current_directory(file_dir) new_list = sorted(self._data_manager.current_event_files + [file_path.basename]) # Use case 3: a single path pointing to a file or a directory else: # Use case 3.1: a single path pointing to a new directory if os.path.isdir(query_path): file_dir = query_path if file_dir != self._data_manager.current_directory: # User changed directory _update_current_directory(file_dir) self._data_manager.current_file_name = self._data_manager.current_event_files[0] new_list = self._data_manager.current_event_files else: # TODO FIXME discovered from #93. This path is not checked and thus problematic new_list = None # Use case 3.2: a single path pointing to a file in the current or new directory else: file_dir, file_name = file_path.split() self._data_manager.current_file_name = file_name if file_dir == self._data_manager.current_directory: # User selected a new file in the directory new_list = _updated_current_list() else: # User selected a new file in a new directory _update_current_directory(file_dir) new_list = self._data_manager.current_event_files # TODO FIXME - new_list is not None has not been defined by stakeholder if new_list is not None: _reset_ui_file_list(new_list) self.main_window.auto_change_active = False
[docs] def automated_file_selection(self): """ Go through the files in the current in order of run numbers, and load files until the incident angle is no longer increasing. """ self.main_window.auto_change_active = True # Update the list of files event_file_list = glob.glob(os.path.join(self._data_manager.current_directory, "*event.nxs")) h5_file_list = glob.glob(os.path.join(self._data_manager.current_directory, "*.nxs.h5")) event_file_list.extend(h5_file_list) event_file_list.sort() event_file_list = [os.path.basename(name) for name in event_file_list] current_file_found = False n_count = 0 logging.error("Current file: %s", self._data_manager.current_file_name) q_current = self._data_manager.extract_meta_data().mid_q # Add the current data set to the reduction table # Do nothing if the data is incompatible is_direct_beam = self._data_manager.active_channel.is_direct_beam if is_direct_beam: if not self.add_direct_beam(): return else: if not self.add_reflectivity(): return for f in event_file_list: file_path = str(os.path.join(self._data_manager.current_directory, f)) if current_file_found and n_count < 10: n_count += 1 meta_data = self._data_manager.extract_meta_data(file_path) if q_current <= meta_data.mid_q and is_direct_beam == meta_data.is_direct_beam: q_current = meta_data.mid_q self.open_file(file_path, silent=True) d = self._data_manager.active_channel # If we find data of another type, stop here if not is_direct_beam == self._data_manager.active_channel.is_direct_beam: break self.main_window.auto_change_active = True self.populate_from_configuration(d.configuration) if self._data_manager.active_channel.is_direct_beam: self.add_direct_beam() else: self.add_reflectivity() if f == self._data_manager.current_file_name: current_file_found = True # At the very end, update the UI and plot reflectivity if n_count > 0: self.main_window.auto_change_active = True self.file_loaded() self.main_window.auto_change_active = False
[docs] def open_reduced_file_dialog(self): """ Open a reduced file and all the data files needed to reproduce it. """ # Open file dialog filter_ = "QuickNXS files (*.dat);;All (*.*)" output_dir = self.main_window.settings.value("output_directory", os.path.expanduser("~")) file_path, _ = QtWidgets.QFileDialog.getOpenFileName( self.main_window, "Open reduced file...", directory=output_dir, filter=filter_ ) t_0 = time.time() if file_path: # Clear the reduction lists first so that we don't create problems later self.main_window.reset_data_tabs() self.clear_direct_beams() self.clear_reflectivity() configuration = self.get_configuration() prog = self.new_progress_reporter() self._data_manager.load_data_from_reduced_file(file_path, configuration=configuration, progress=prog) # Update output directory file_dir, _ = os.path.split(str(file_path)) self.main_window.settings.setValue("output_directory", file_dir) self.main_window.auto_change_active = True # Update UI direct beam table self.ui.normalizeTable.setRowCount(len(self._data_manager.direct_beam_list)) for idx, _ in enumerate(self._data_manager.direct_beam_list): self._data_manager.set_active_data_from_direct_beam_list(idx) self.update_direct_beam_table(idx, self._data_manager.active_channel) # Update UI data table(s) with the loaded data for ipeak, _ in self._data_manager.peak_reduction_lists.items(): self._data_manager.set_active_reduction_list_index(ipeak) self.main_window.add_data_tab_by_index(ipeak) table_widget = self.get_reduction_table_by_index(ipeak) table_widget.setRowCount(len(self._data_manager.reduction_list)) for idx, _ in enumerate(self._data_manager.reduction_list): self._data_manager.set_active_data_from_reduction_list(idx) self.update_reduction_table(table_widget, idx, self._data_manager.active_channel) # Set the first reduction table and its first run as the active (plotted) data self._data_manager.set_active_reduction_list_index(self._data_manager.MAIN_REDUCTION_LIST_INDEX) self._data_manager.set_active_data_from_reduction_list(0) direct_beam_ids = [str(r.number) for r in self._data_manager.direct_beam_list] self.ui.normalization_list_label.setText(", ".join(direct_beam_ids)) self.file_loaded() if self._data_manager.active_channel is not None: self.populate_from_configuration(self._data_manager.active_channel.configuration) self.update_file_list(self._data_manager.current_file) self.main_window.auto_change_active = False logging.info("UI updated: %s", time.time() - t_0)
[docs] def initialize_additional_reduction_table(self, tab_index: int): """ Initialize new reduction table from the main reduction table Parameters ---------- tab_index: int Index of the additional tab/peak """ if self._data_manager.main_reduction_list: table_widget = self.get_reduction_table_by_index(tab_index) table_widget.setRowCount(len(self._data_manager.main_reduction_list)) active_cross_section = self._data_manager.active_channel.name for idx, nexus_data in enumerate(self._data_manager.main_reduction_list): active_channel = nexus_data.cross_sections[active_cross_section] self.update_reduction_table(table_widget, idx, active_channel)
def _file_open_dialog(self, filter_=None): # type: (Optional[str]) -> Optional[str] r""" @brief Pop a File dialog window for the user to select one file @param filter_: show files with only selected extensions @returns absolute path to the selected file """ file_path, _ = QtWidgets.QFileDialog.getOpenFileName( self.main_window, "Open NXS file...", directory=self._data_manager.current_directory, filter=filter_ ) return file_path def _file_open_sum_dialog(self, filter_: Optional[str] = None) -> Optional[str]: r""" @brief Pop a File dialog Window for the user to select two or more files @details Congruency among the selected files is checked by comparing the values of selected metadata. User is asked to override if congruency fails. @param filter_: show files with only selected extensions @returns absolute paths to the selected files, joined by the plus symbol '+' """ file_paths, _ = QtWidgets.QFileDialog.getOpenFileNames( self.main_window, "Select multiple NXS files to sum before data reduction.", directory=self._data_manager.current_directory, filter=filter_, ) # user cancel operation if len(file_paths) == 0: return elif len(file_paths) == 1: self.report_message("Merge mode must have more than 1 file selected") return # check whether files can be merged without further notice message = self._congruency_fail_report(file_paths, log_names=None) if message and not self._user_gives_permission(message): return return FilePath(file_paths).path def _process_file_path(self, dialog_opening_method): # type: (str) -> None r""" @brief Wrapper of the opening-file dialogs @details This wrapper defines the extension of the files to be shown by the file dialog, and process the file(s) selected by the user. It updates the file list widget as well as reads-in the file(s) """ if self.ui.histogramActive.isChecked(): filter_ = "All (*.*);;histo.nxs (*histo.nxs)" else: filter_ = "All (*.*);;nxs.h5 (*nxs.h5);;event.nxs (*event.nxs)" file_path = getattr(self, dialog_opening_method)(filter_=filter_) if file_path: self.update_file_list(file_path) self.open_file(file_path)
[docs] def file_open_dialog(self): r"""GUI callback for backend MainHandler._file_open_dialog.""" self._process_file_path("_file_open_dialog")
[docs] def file_open_sum_dialog(self): r"""GUI callback for backend MainHandler._file_open_sum_dialog.""" self._process_file_path("_file_open_sum_dialog")
def _user_gives_permission(self, message): # type: (str) -> bool r""" @brief Ask user's permission to proceed or quit if the select runs do not have same sample logs @param message: message to show in the dialog box """ message += ".\nProceed with Open Sum?" dialog = AcceptRejectDialog(self.main_window, title="Open Sum Confirmation", message=message) proceed = dialog.exec_() return proceed
[docs] def open_run_number(self, number=None): r""" @brief Open a data file by typing a run number or a composite run number for merging data sets @details Example: 120:123+125+127:132 opens files with run numbers from 120 to 132 except 124 and 126 """ self.main_window.auto_change_active = True if number is None: number = str(self.ui.numberSearchEntry.text()) # cast from unicode to string QtWidgets.QApplication.instance().processEvents() run_numbers = RunNumbers(number) file_list = list() # Look for new-style nexus file name configuration = self.get_configuration() for run_number in run_numbers.numbers: search_string = configuration.instrument.file_search_template % run_number matches = glob.glob(search_string + ".nxs.h5") # type: Optional[List[str]] if not matches: # Look for old-style nexus file name search_string = configuration.instrument.legacy_search_template % run_number matches = glob.glob(search_string + "_event.nxs") file_list.append(matches[0]) # there should be only one match, since we query with one run number self.ui.numberSearchEntry.setText("") # empty the contents of in the LineEdit widget success = False if len(file_list) > 0: file_path = FilePath(file_list).path # single path or a composite of file paths # If opening more than one file, check whether files can be merged if len(file_list) > 1: message = self._congruency_fail_report(file_list, log_names=None) if message and not self._user_gives_permission(message): return self.update_file_list(file_path) self.open_file(file_path) success = True else: self.report_message("Could not locate one or more of file(s) %s" % run_numbers.short, pop_up=True) self.main_window.auto_change_active = False return success
[docs] def update_daslog(self): """ Write parameters from all file daslogs to the table in the daslog tab. """ table = self.ui.daslogTableBox table.setRowCount(0) table.sortItems(-1) table.setColumnCount(len(self._data_manager.data_sets) + 2) table.setHorizontalHeaderLabels(["Name"] + list(self._data_manager.data_sets.keys()) + ["Unit"]) for j, key in enumerate(sorted(self._data_manager.active_channel.logs.keys(), key=lambda s: s.lower())): table.insertRow(j) table.setItem(j, 0, QtWidgets.QTableWidgetItem(key)) table.setItem( j, len(self._data_manager.data_sets) + 1, QtWidgets.QTableWidgetItem(self._data_manager.active_channel.log_units[key]), ) i = 0 for xs in self._data_manager.data_sets: item = QtWidgets.QTableWidgetItem("%g" % self._data_manager.data_sets[xs].logs[key]) item.setToolTip("MIN: %g MAX: %g" % (self._data_manager.data_sets[xs].log_minmax[key])) table.setItem(j, i + 1, item) i += 1 table.resizeColumnsToContents()
[docs] def add_reflectivity(self, silent=False): """ Collect information about the current extraction settings and store them in the list of reduction items. Returns true if everything is ok, false otherwise. """ # Update the configuration according to current parameters # Note that when a data set is first loaded, the peaks may have a different # range for each cross-section. If the option to use a common set of ranges # was turned on, we pick the ranges from the currently active cross-section # and apply then to all cross-sections. if self.ui.action_use_common_ranges.isChecked(): config = self.get_configuration() self._data_manager.update_configuration(configuration=config, active_only=False) # Verify that the new data is consistent with existing data in the table if not self._data_manager.add_active_to_reduction(): if not silent: self.report_message("(Add reflectivity) Data incompatible or already in the list.", pop_up=True) return False self.main_window.auto_change_active = True idx = self._data_manager.find_data_in_reduction_list(self._data_manager._nexus_data) if idx is None: raise RuntimeError("It could be None but not likely") self.ui.reductionTable.insertRow(idx) self.update_tables() self.main_window.initiate_reflectivity_plot.emit(True) self.main_window.update_specular_viewer.emit() self.main_window.auto_change_active = False return True
[docs] def update_reduction_table(self, table_widget: QtWidgets.QTableWidget, idx: int, d: CrossSectionData): """ Update the reduction table Parameters ---------- table_widget: QtWidgets.QTableWidget Table widget of the table to update idx: int Row to update d: CrossSectionData Cross-section data """ self.main_window.auto_change_active = True item = QtWidgets.QTableWidgetItem(str(d.number)) if d == self._data_manager.active_channel: item.setBackground(QtGui.QColor(246, 213, 16)) else: item.setBackground(QtGui.QColor(255, 255, 255)) item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable) table_widget.setItem(idx, 0, item) table_widget.setItem(idx, 1, QtWidgets.QTableWidgetItem("%.4f" % (d.configuration.scaling_factor))) table_widget.setItem(idx, 2, QtWidgets.QTableWidgetItem(str(d.configuration.cut_first_n_points))) table_widget.setItem(idx, 3, QtWidgets.QTableWidgetItem(str(d.configuration.cut_last_n_points))) item = QtWidgets.QTableWidgetItem(str(d.configuration.peak_position)) item.setBackground(QtGui.QColor(200, 200, 200)) table_widget.setItem(idx, 4, item) table_widget.setItem(idx, 5, QtWidgets.QTableWidgetItem(str(d.configuration.peak_width))) item = QtWidgets.QTableWidgetItem(str(d.configuration.low_res_position)) item.setBackground(QtGui.QColor(200, 200, 200)) table_widget.setItem(idx, 6, item) table_widget.setItem(idx, 7, QtWidgets.QTableWidgetItem(str(d.configuration.low_res_width))) item = QtWidgets.QTableWidgetItem(str(d.configuration.bck_position)) item.setBackground(QtGui.QColor(200, 200, 200)) table_widget.setItem(idx, 8, item) table_widget.setItem(idx, 9, QtWidgets.QTableWidgetItem(str(d.configuration.bck_width))) table_widget.setItem(idx, 10, QtWidgets.QTableWidgetItem(str(d.direct_pixel))) table_widget.setItem(idx, 11, QtWidgets.QTableWidgetItem("%.4f" % d.scattering_angle)) norma = "none" if d.configuration.normalization is not None: norma = d.configuration.normalization table_widget.setItem(idx, 12, QtWidgets.QTableWidgetItem(str(norma))) if d.configuration.do_final_rebin_run: item = QtWidgets.QTableWidgetItem(str(d.configuration.final_rebin_step_run)) else: item = QtWidgets.QTableWidgetItem("") if d.configuration.do_final_rebin_global: item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable) item.setBackground(QtGui.QColor(220, 220, 220)) table_widget.setItem(idx, 13, item) self.main_window.auto_change_active = False
[docs] def clear_reflectivity(self): """ Remove all items from the reduction lists. """ # clear the reduction lists in the data manager self._data_manager.clear_reduction_lists() # clear the reflectivity UI table widgets for itab in range(self.ui.tabWidget.count()): if itab == self.DIRECT_BEAM_TAB_INDEX: continue tab_widget = self.get_reduction_table_by_index(itab) tab_widget.setRowCount(0) # remove additional data tabs self.main_window.reset_data_tabs() if self.main_window.refl.isVisible(): self.main_window.initiate_reflectivity_plot.emit(False)
[docs] def clear_direct_beams(self): """ Remove all items from the direct beam list. """ self._data_manager.clear_direct_beam_list() self.ui.normalizeTable.setRowCount(0) self.ui.normalization_list_label.setText("None") self.main_window.initiate_intensity_plot.emit(False)
[docs] def remove_reflectivity(self): """ Remove one item from the reduction list. """ index = self.reduction_table.currentRow() if index < 0: return self._data_manager.reduction_list.pop(index) self.reduction_table.removeRow(index) self.main_window.initiate_reflectivity_plot.emit(False)
[docs] def remove_direct_beam(self): """ Remove one item from the direct beam list. """ index = self.ui.normalizeTable.currentRow() if index < 0: return self._data_manager.direct_beam_list.pop(index) self.ui.normalizeTable.removeRow(index) self.main_window.initiate_intensity_plot.emit(False)
[docs] def reduction_table_changed(self, item): """ Perform action upon change in data reduction list. """ if self.main_window.auto_change_active: return entry = item.row() column = item.column() refl = self._data_manager.reduction_list[entry] # TODO: If we changed the normalization run, make sure it's in the list # of direct beams we know about. keys = [ "number", "scaling_factor", "cut_first_n_points", "cut_last_n_points", "peak_position", "peak_width", "low_res_position", "low_res_width", "bck_position", "bck_width", "direct_pixel", "scattering_angle", "normalization", "final_rebin_step_run", ] # Update settings from selected option if column in [1, 4, 5, 6, 7, 8, 9, 10]: refl.set_parameter(keys[column], float(item.text())) elif column in [2, 3]: refl.set_parameter(keys[column], int(item.text())) elif column == 12: try: refl.set_parameter(keys[column], item.text()) except: refl.set_parameter(keys[column], None) item.setText("none") elif column == 13: try: new_value = round(float(item.text()), 3) if -0.1 <= new_value <= 0.1: refl.set_parameter(keys[column], new_value) refl.set_parameter("do_final_rebin_run", True) except: refl.set_parameter(keys[column], None) refl.set_parameter("do_final_rebin_run", False) # Update calculated data refl.update_calculated_values() # If the changed data set is the active data, also change the UI if self._data_manager.is_active(refl): self.main_window.auto_change_active = True self.update_info() self.main_window.auto_change_active = False # Update the direct beam table if this data set is in it idx = self._data_manager.find_data_in_direct_beam_list(refl) if idx is not None: channels = list(refl.cross_sections.keys()) self.update_direct_beam_table(idx, refl.cross_sections[channels[0]]) # Only recalculate if we need to, otherwise just replot if column not in [1, 2, 3]: try: self._data_manager.calculate_reflectivity(nexus_data=refl) except: self.report_message( "Could not compute reflectivity for %s" % self._data_manager.current_file_name, detailed_message=str(traceback.format_exc()), pop_up=False, is_error=False, ) self.main_window.initiate_reflectivity_plot.emit(True) self.main_window.update_specular_viewer.emit()
[docs] def add_direct_beam(self, silent=False): """ Add / remove dataset to the available normalizations or clear the normalization list. """ # Update all cross-section parameters as needed. if self.ui.action_use_common_ranges.isChecked(): config = self.get_configuration() self._data_manager.update_configuration(configuration=config, active_only=False) # Verify that the new data is consistent with existing data in the table if not self._data_manager.add_active_to_normalization(): if not silent: self.report_message("(Add direct beam) Data incompatible or already in the list.", pop_up=True) return False self.ui.normalizeTable.setRowCount(len(self._data_manager.direct_beam_list)) self.update_tables() direct_beam_ids = [str(r.number) for r in self._data_manager.direct_beam_list] self.ui.normalization_list_label.setText(", ".join(direct_beam_ids)) self.main_window.initiate_intensity_plot.emit(False) return True
[docs] def update_direct_beam_table(self, idx, d): """ Update a direct beam table entry :param int idx: row index :param CrossSectionData d: data object """ self.main_window.auto_change_active = True item = QtWidgets.QTableWidgetItem(str(d.number)) item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable) if d == self._data_manager.active_channel: item.setBackground(QtGui.QColor(246, 213, 16)) else: item.setBackground(QtGui.QColor(255, 255, 255)) self.ui.normalizeTable.setItem(idx, 0, QtWidgets.QTableWidgetItem(item)) wl = "%s - %s" % (d.wavelength[0], d.wavelength[-1]) self.ui.normalizeTable.setItem(idx, 7, QtWidgets.QTableWidgetItem(wl)) item = QtWidgets.QTableWidgetItem(str(d.configuration.peak_position)) item.setBackground(QtGui.QColor(200, 200, 200)) self.ui.normalizeTable.setItem(idx, 1, QtWidgets.QTableWidgetItem(item)) self.ui.normalizeTable.setItem(idx, 2, QtWidgets.QTableWidgetItem(str(d.configuration.peak_width))) item = QtWidgets.QTableWidgetItem(str(d.configuration.low_res_position)) item.setBackground(QtGui.QColor(200, 200, 200)) self.ui.normalizeTable.setItem(idx, 3, QtWidgets.QTableWidgetItem(item)) self.ui.normalizeTable.setItem(idx, 4, QtWidgets.QTableWidgetItem(str(d.configuration.low_res_width))) item = QtWidgets.QTableWidgetItem(str(d.configuration.bck_position)) item.setBackground(QtGui.QColor(200, 200, 200)) self.ui.normalizeTable.setItem(idx, 5, QtWidgets.QTableWidgetItem(item)) self.ui.normalizeTable.setItem(idx, 6, QtWidgets.QTableWidgetItem(str(d.configuration.bck_width))) self.main_window.auto_change_active = False
[docs] def active_data_changed(self): """ Actions to be taken once the active data set has changed """ # If we update an entry, it's because that data is currently active. # Highlight it and un-highlight the other ones. self.main_window.auto_change_active = True idx = self._data_manager.find_active_data_id() for i in range(self.reduction_table.rowCount()): item = self.reduction_table.item(i, 0) if item is not None: if i == idx: item.setBackground(QtGui.QColor(246, 213, 16)) else: item.setBackground(QtGui.QColor(255, 255, 255)) idx = self._data_manager.find_active_direct_beam_id() for i in range(self.ui.normalizeTable.rowCount()): item = self.ui.normalizeTable.item(i, 0) if item is not None: if i == idx: item.setBackground(QtGui.QColor(246, 213, 16)) else: item.setBackground(QtGui.QColor(255, 255, 255)) self.main_window.auto_change_active = False
[docs] def reduction_table_right_click(self, pos, is_reduction_table=True): """ Handle right-click on the reduction table. :param QPoint pos: mouse position :param bool is_reduction_table: True if the reduction table is active, False if the direct beam table is active """ if is_reduction_table: table_widget = self.reduction_table data_table = self._data_manager.reduction_list else: table_widget = self.ui.normalizeTable data_table = self._data_manager.direct_beam_list def _export_data(_pos): """callback function to right-click action: Export data""" row = table_widget.rowAt(pos.y()) if 0 <= row < len(data_table): nexus_data = data_table[row] self.save_run_data(nexus_data) def _propagate_run(_pos): """callback function to right-click action: Propagate run to all tabs""" row = table_widget.rowAt(pos.y()) if 0 <= row < len(data_table): nexus_data = data_table[row] active_cross_section = self._data_manager.active_channel.name active_channel = nexus_data.cross_sections[active_cross_section] for ipeak, peak_data in self._data_manager.peak_reduction_lists.items(): if self._data_manager.copy_nexus_data_to_reduction(nexus_data, ipeak): # get widget for target reduction table target_widget = self.get_reduction_table_by_index(ipeak) idx = self._data_manager.find_run_number_in_reduction_list(nexus_data.number, peak_data) if idx is None: raise RuntimeError("Run number not in reduction list") target_widget.insertRow(idx) # update UI table widget self.update_reduction_table(target_widget, idx, active_channel) def _remove_run(_pos): """callback function to right-click action: Remove run from this tab""" self.remove_reflectivity() reduction_table_menu = QtWidgets.QMenu(table_widget) export_data_action = QtWidgets.QAction("Export data") export_data_action.triggered.connect(lambda: _export_data(pos)) reduction_table_menu.addAction(export_data_action) propagate_run_action = QtWidgets.QAction("Propagate run to all tabs") propagate_run_action.triggered.connect(lambda: _propagate_run(pos)) reduction_table_menu.addAction(propagate_run_action) remove_run_action = QtWidgets.QAction("Remove run from this tab") remove_run_action.triggered.connect(lambda: _remove_run(pos)) reduction_table_menu.addAction(remove_run_action) reduction_table_menu.exec_(table_widget.mapToGlobal(pos))
[docs] def save_run_data(self, nexus_data: NexusData): """ Save run data to file :param NexusData nexus_data: run data object """ path = QtWidgets.QFileDialog.getExistingDirectory(self.main_window, "Select directory") if not path: return # ask user for base name for files (one file for each cross-section, e.g. "REF_M_1234_data_Off-Off.dat") default_basename = f"REF_M_{nexus_data.number}_data" while True: # to ask again for new basename if the user does not want to overwrite existing files basename, ok = QtWidgets.QInputDialog.getText( self.main_window, "Base name", "Save file base name:", text=default_basename ) if not (ok and basename): # user cancels return save_filepaths = {} existing_filenames = [] for xs in nexus_data.cross_sections.keys(): filename = f"{basename}_{xs}.dat" filepath = os.path.join(path, filename) save_filepaths[xs] = filepath if os.path.isfile(filepath): existing_filenames.append(filename) newline = "\n" if len(existing_filenames) == 0 or self.ask_question( f"Overwrite existing file(s):\n{newline.join(existing_filenames)}?" ): break # save one file per cross-section for xs, filepath in save_filepaths.items(): cross_section = nexus_data.cross_sections[xs] data_to_save, header = cross_section.get_tof_counts_table() np.savetxt(filepath, data_to_save, header=header)
[docs] def compute_offspec_on_change(self, force=False): """ Compute off-specular as needed """ prog = self.new_progress_reporter() has_changed_values = self.check_region_values_changed() offspec_data_exists = self._data_manager.is_offspec_available() logging.info("Exists %s %s", has_changed_values, offspec_data_exists) if force or has_changed_values >= 0 or not offspec_data_exists: logging.info("Updating....") config = self.get_configuration() self._data_manager.update_configuration(configuration=config, active_only=False) self._data_manager.reduce_offspec(progress=prog)
[docs] def compute_gisans_on_change(self, force=False, active_only=True): """ Compute GISANS as needed """ prog = self.new_progress_reporter() has_changed_values = self.check_region_values_changed() gisans_data_exists = self._data_manager.is_gisans_available(active_only=active_only) logging.info("Exists %s %s %s", force, has_changed_values, gisans_data_exists) if force or has_changed_values >= 0 or not gisans_data_exists: logging.info("Updating....") config = self.get_configuration() self._data_manager.update_configuration(configuration=config, active_only=False) if active_only: self._data_manager.calculate_gisans(progress=prog) else: self._data_manager.reduce_gisans(active_only=active_only, progress=prog)
[docs] def check_region_values_changed(self): """ Return true if any of the parameters tied to a particular slot has changed. Some parameters are tied to the changeRegionValues() slot. There are time-consuming actions that we only want to take if those values actually changed, as opposed to the use simply clicking outside the box. Some parameters don't require a recalculation but simply a refreshing of the plots. Those are parameters such as scaling factors or the number of points clipped. Return values: -1 = no valid change 0 = replot needed 1 = recalculation needed """ if self._data_manager.active_channel is None: return -1 configuration = self._data_manager.active_channel.configuration valid_change = False replot_change = False # ROI parameters x_pos = self.ui.refXPos.value() x_width = self.ui.refXWidth.value() y_pos = self.ui.refYPos.value() y_width = self.ui.refYWidth.value() bck_pos = self.ui.bgCenter.value() bck_width = self.ui.bgWidth.value() valid_change = ( valid_change or not configuration.peak_position == x_pos or not configuration.peak_width == x_width ) valid_change = ( valid_change or not configuration.low_res_position == y_pos or not configuration.low_res_width == y_width ) valid_change = ( valid_change or not configuration.bck_position == bck_pos or not configuration.bck_width == bck_width ) try: scale = math.pow(10.0, self.ui.refScale.value()) except: scale = 1 replot_change = replot_change or not configuration.scaling_factor == scale replot_change = replot_change or not configuration.cut_first_n_points == self.ui.rangeStart.value() replot_change = replot_change or not configuration.cut_last_n_points == self.ui.rangeEnd.value() valid_change = valid_change or not configuration.subtract_background == self.ui.bgActive.isChecked() valid_change = valid_change or not configuration.use_constant_q == self.ui.fanReflectivity.isChecked() valid_change = valid_change or not configuration.use_dangle == self.ui.trustDANGLE.isChecked() valid_change = valid_change or not configuration.set_direct_pixel == self.ui.set_dirpix_checkbox.isChecked() valid_change = ( valid_change or not configuration.set_direct_angle_offset == self.ui.set_dangle0_checkbox.isChecked() ) if configuration.set_direct_pixel: valid_change = ( valid_change or not configuration.direct_pixel_overwrite == self.ui.directPixelOverwrite.value() ) if configuration.set_direct_angle_offset: valid_change = ( valid_change or not configuration.direct_angle_offset_overwrite == self.ui.dangle0Overwrite.value() ) # Final rebin valid_change = ( valid_change or not configuration.do_final_rebin_global == self.ui.final_rebin_checkbox_global.isChecked() ) valid_change = ( valid_change or not configuration.final_rebin_step_global == self.ui.q_rebin_spinbox_global.value() ) valid_change = ( valid_change or not configuration.do_final_rebin_run == self.ui.final_rebin_checkbox_run.isChecked() ) valid_change = valid_change or not configuration.final_rebin_step_run == self.ui.q_rebin_spinbox_run.value() if valid_change: return 1 if replot_change: return 0 return -1
[docs] def get_configuration(self) -> Configuration: r""" @brief Gather the reduction options. @details Retrieve the reduction options either from the active channel or from the current settings in the graphical interface. """ if self._data_manager.active_channel is not None: configuration = self._data_manager.active_channel.configuration else: configuration = Configuration(self.main_window.settings) configuration.tof_bins = self.ui.eventTofBins.value() configuration.tof_bin_type = self.ui.eventBinMode.currentIndex() # Peak finder options Configuration.use_roi = self.ui.use_roi_checkbox.isChecked() Configuration.update_peak_range = self.ui.fit_within_roi_checkbox.isChecked() Configuration.use_roi_bck = self.ui.use_bck_roi_checkbox.isChecked() Configuration.use_peak_finder = self.ui.actionAutoXROI.isChecked() Configuration.use_low_res_finder = self.ui.actionAutoYROI.isChecked() Configuration.force_bck_roi = self.ui.use_bck_roi_checkbox.isChecked() Configuration.use_tight_bck = self.ui.use_side_bck_checkbox.isChecked() Configuration.bck_offset = self.ui.side_bck_width.value() # Default ranges, using the current values configuration.peak_position = self.ui.refXPos.value() configuration.peak_width = self.ui.refXWidth.value() configuration.low_res_position = self.ui.refYPos.value() configuration.low_res_width = self.ui.refYWidth.value() configuration.bck_position = self.ui.bgCenter.value() configuration.bck_width = self.ui.bgWidth.value() configuration.match_direct_beam = self.ui.actionAutoNorm.isChecked() configuration.do_final_rebin_run = self.ui.final_rebin_checkbox_run.isChecked() configuration.final_rebin_step_run = self.ui.q_rebin_spinbox_run.value() # Other reduction options configuration.subtract_background = self.ui.bgActive.isChecked() try: scale = math.pow(10.0, self.ui.refScale.value()) except: scale = 1 configuration.scaling_factor = scale configuration.cut_first_n_points = self.ui.rangeStart.value() configuration.cut_last_n_points = self.ui.rangeEnd.value() Configuration.normalize_to_unity = self.ui.normalize_to_unity_checkbox.isChecked() Configuration.total_reflectivity_q_cutoff = self.ui.normalization_q_cutoff_spinbox.value() Configuration.global_stitching = self.ui.global_fit_checkbox.isChecked() Configuration.polynomial_stitching = self.ui.polynomial_stitching_checkbox.isChecked() Configuration.polynomial_stitching_degree = self.ui.polynomial_stitching_degree_spinbox.value() Configuration.polynomial_stitching_points = self.ui.polynomial_stitching_points_spinbox.value() Configuration.wl_bandwidth = self.ui.bandwidth_spinbox.value() Configuration.use_constant_q = self.ui.fanReflectivity.isChecked() configuration.use_dangle = self.ui.trustDANGLE.isChecked() configuration.set_direct_pixel = self.ui.set_dirpix_checkbox.isChecked() configuration.set_direct_angle_offset = self.ui.set_dangle0_checkbox.isChecked() configuration.direct_pixel_overwrite = self.ui.directPixelOverwrite.value() configuration.direct_angle_offset_overwrite = self.ui.dangle0Overwrite.value() Configuration.sample_size = self.ui.sample_size_spinbox.value() Configuration.do_final_rebin_global = self.ui.final_rebin_checkbox_global.isChecked() Configuration.final_rebin_step_global = self.ui.q_rebin_spinbox_global.value() Configuration.apply_deadtime = self.ui.deadtime_entry.applyCheckBox.isChecked() Configuration.lock_direct_beam_y = self.ui.direct_beam_y_lock_checkbox.isChecked() # UI elements configuration.normalize_x_tof = self.ui.normalizeXTof.isChecked() configuration.x_wl_map = self.ui.xLamda.isChecked() configuration.angle_map = self.ui.tthPhi.isChecked() configuration.log_1d = self.ui.logarithmic_y.isChecked() configuration.log_2d = self.ui.logarithmic_colorscale.isChecked() # Off-specular options if self.ui.kizmkfzVSqz.isChecked(): configuration.off_spec_x_axis = Configuration.DELTA_KZ_VS_QZ elif self.ui.qxVSqz.isChecked(): configuration.off_spec_x_axis = Configuration.QX_VS_QZ else: configuration.off_spec_x_axis = Configuration.KZI_VS_KZF configuration.off_spec_slice = self.ui.offspec_slice_checkbox.isChecked() configuration.off_spec_slice_qz_min = self.ui.slice_qz_min_spinbox.value() configuration.off_spec_slice_qz_max = self.ui.slice_qz_max_spinbox.value() # try: # qz_list = self.ui.offspec_qz_list_edit.text() # if len(qz_list) > 0: # configuration.off_spec_qz_list = [float(x) for x in self.ui.offspec_qz_list_edit.text().split(',')] # except: # logging.error("Could not parse off_spec_qz_list: %s", configuration.off_spec_qz_list) configuration.off_spec_err_weight = self.ui.offspec_err_weight_checkbox.isChecked() configuration.off_spec_nxbins = self.ui.offspec_rebin_x_bins_spinbox.value() configuration.off_spec_nybins = self.ui.offspec_rebin_y_bins_spinbox.value() configuration.off_spec_x_min = self.ui.offspec_x_min_spinbox.value() configuration.off_spec_x_max = self.ui.offspec_x_max_spinbox.value() configuration.off_spec_y_min = self.ui.offspec_y_min_spinbox.value() configuration.off_spec_y_max = self.ui.offspec_y_max_spinbox.value() # Off-spec smoothing options configuration.apply_smoothing = self.ui.offspec_smooth_checkbox.isChecked() # GISANS options configuration.gisans_wl_min = self.ui.gisans_wl_min_spinbox.value() configuration.gisans_wl_max = self.ui.gisans_wl_max_spinbox.value() configuration.gisans_wl_npts = self.ui.gisans_wl_npts_spinbox.value() configuration.gisans_qz_npts = self.ui.gisans_qz_npts_spinbox.value() configuration.gisans_qy_npts = self.ui.gisans_qy_npts_spinbox.value() configuration.gisans_use_pf = self.ui.gisans_pf_radio.isChecked() configuration.gisans_slice = self.ui.gisans_slice_checkbox.isChecked() configuration.gisans_slice_qz_min = self.ui.gisans_qz_min_spinbox.value() configuration.gisans_slice_qz_max = self.ui.gisans_qz_max_spinbox.value() return configuration
[docs] def populate_from_configuration(self, configuration=None): """Set reduction options in UI, usually after loading a reduced data set.""" if configuration is None: configuration = Configuration(self.main_window.settings) self.ui.eventTofBins.setValue(configuration.tof_bins) self.ui.eventBinMode.setCurrentIndex(configuration.tof_bin_type) # Peak finder settings self.ui.use_roi_checkbox.setChecked(configuration.use_roi) self.ui.use_bck_roi_checkbox.setChecked(configuration.use_roi_bck) self.ui.fit_within_roi_checkbox.setChecked(configuration.update_peak_range) self.ui.actionAutoXROI.setChecked(False if configuration.use_roi else configuration.use_peak_finder) self.ui.actionAutoYROI.setChecked(False if configuration.use_roi else configuration.use_low_res_finder) # Use background on each side of the peak self.ui.use_side_bck_checkbox.setChecked(configuration.use_tight_bck) self.ui.side_bck_width.setValue(configuration.bck_offset) # Update reduction parameters self.ui.refXPos.setValue(configuration.peak_position) self.ui.refXWidth.setValue(configuration.peak_width) self.ui.refYPos.setValue(configuration.low_res_position) self.ui.refYWidth.setValue(configuration.low_res_width) self.ui.bgCenter.setValue(configuration.bck_position) self.ui.bgWidth.setValue(configuration.bck_width) # Subtract background self.ui.bgActive.setChecked(configuration.subtract_background) # Scaling factor try: scale = math.log10(configuration.scaling_factor) except: scale = 0.0 self.ui.refScale.setValue(scale) # Cut first and last points self.ui.rangeStart.setValue(configuration.cut_first_n_points) self.ui.rangeEnd.setValue(configuration.cut_last_n_points) self.ui.normalize_to_unity_checkbox.setChecked(configuration.normalize_to_unity) self.ui.normalization_q_cutoff_spinbox.setValue(configuration.total_reflectivity_q_cutoff) self.ui.global_fit_checkbox.setChecked(configuration.global_stitching) self.ui.polynomial_stitching_checkbox.setChecked(configuration.polynomial_stitching) self.ui.polynomial_stitching_degree_spinbox.setValue(configuration.polynomial_stitching_degree) self.ui.polynomial_stitching_points_spinbox.setValue(configuration.polynomial_stitching_points) self.ui.bandwidth_spinbox.setValue(configuration.wl_bandwidth) self.ui.fanReflectivity.setChecked(configuration.use_constant_q) self.ui.trustDANGLE.setChecked(configuration.use_dangle) self.ui.set_dirpix_checkbox.setChecked(configuration.set_direct_pixel) self.ui.set_dangle0_checkbox.setChecked(configuration.set_direct_angle_offset) self.ui.directPixelOverwrite.setValue(configuration.direct_pixel_overwrite) self.ui.dangle0Overwrite.setValue(configuration.direct_angle_offset_overwrite) self.ui.sample_size_spinbox.setValue(configuration.sample_size) self.ui.final_rebin_checkbox_global.setChecked(configuration.do_final_rebin_global) self.ui.q_rebin_spinbox_global.setValue(configuration.final_rebin_step_global) self.ui.deadtime_entry.applyCheckBox.setChecked(configuration.apply_deadtime) self.ui.direct_beam_y_lock_checkbox.setChecked(configuration.lock_direct_beam_y) self.ui.final_rebin_checkbox_run.setChecked(configuration.do_final_rebin_run) if configuration.final_rebin_step_run: self.ui.q_rebin_spinbox_run.setValue(configuration.final_rebin_step_run) # UI elements self.ui.normalizeXTof.setChecked(configuration.normalize_x_tof) self.ui.xLamda.setChecked(configuration.x_wl_map) self.ui.tthPhi.setChecked(configuration.angle_map) self.ui.logarithmic_y.setChecked(configuration.log_1d) self.ui.logarithmic_colorscale.setChecked(configuration.log_2d) # Off-specular options if configuration.off_spec_x_axis == Configuration.DELTA_KZ_VS_QZ: self.ui.kizmkfzVSqz.setChecked(True) elif configuration.off_spec_x_axis == Configuration.QX_VS_QZ: self.ui.qxVSqz.setChecked(True) else: self.ui.kizVSkfz.setChecked(True) self.ui.offspec_slice_checkbox.setChecked(configuration.off_spec_slice) # self.ui.offspec_qz_list_edit.setText(','.join([str(x) for x in configuration.off_spec_qz_list])) self.ui.slice_qz_min_spinbox.setValue(configuration.off_spec_slice_qz_min) self.ui.slice_qz_max_spinbox.setValue(configuration.off_spec_slice_qz_max) self.ui.offspec_err_weight_checkbox.setChecked(configuration.off_spec_err_weight) self.ui.offspec_rebin_x_bins_spinbox.setValue(configuration.off_spec_nxbins) self.ui.offspec_rebin_y_bins_spinbox.setValue(configuration.off_spec_nybins) self.ui.offspec_x_min_spinbox.setValue(configuration.off_spec_x_min) self.ui.offspec_x_max_spinbox.setValue(configuration.off_spec_x_max) self.ui.offspec_y_min_spinbox.setValue(configuration.off_spec_y_min) self.ui.offspec_y_max_spinbox.setValue(configuration.off_spec_y_max) # Off-spec smoothing options self.ui.offspec_smooth_checkbox.setChecked(configuration.apply_smoothing) # GISANS options self.ui.gisans_wl_min_spinbox.setValue(configuration.gisans_wl_min) self.ui.gisans_wl_max_spinbox.setValue(configuration.gisans_wl_max) self.ui.gisans_wl_npts_spinbox.setValue(configuration.gisans_wl_npts) self.ui.gisans_qz_npts_spinbox.setValue(configuration.gisans_qz_npts) self.ui.gisans_qy_npts_spinbox.setValue(configuration.gisans_qy_npts) self.ui.gisans_pf_radio.setChecked(configuration.gisans_use_pf) self.ui.gisans_slice_checkbox.setChecked(configuration.gisans_slice) self.ui.gisans_qz_min_spinbox.setValue(configuration.gisans_slice_qz_min) self.ui.gisans_qz_max_spinbox.setValue(configuration.gisans_slice_qz_max)
[docs] def stitch_reflectivity(self): """ Stitch the reflectivity parts and normalize to 1. """ # Update the configuration so we can remember the cutoff value # later if it was changed self.get_configuration() if self.ui.polynomial_stitching_checkbox.isChecked(): poly_degree = self.ui.polynomial_stitching_degree_spinbox.value() else: poly_degree = None try: self._data_manager.stitch_data_sets( normalize_to_unity=self.ui.normalize_to_unity_checkbox.isChecked(), q_cutoff=self.ui.normalization_q_cutoff_spinbox.value(), global_stitching=self.ui.global_fit_checkbox.isChecked(), poly_degree=poly_degree, poly_points=self.ui.polynomial_stitching_points_spinbox.value(), ) except (RuntimeError, ValueError) as err: self.report_message( f"Error in stitching:\n{str(err)}", detailed_message=str(traceback.format_exc()), pop_up=True, is_error=True, ) except NormalizeToUnityQCutoffError as err: self.report_message( f"Error in normalize to unity when stitching:\n{str(err)}", detailed_message=str(traceback.format_exc()), pop_up=True, is_error=True, ) else: for i in range(len(self._data_manager.reduction_list)): xs = self._data_manager.active_channel.name d = self._data_manager.reduction_list[i].cross_sections[xs] self.reduction_table.setItem( i, 1, QtWidgets.QTableWidgetItem("%.4f" % (d.configuration.scaling_factor)) ) self.main_window.initiate_reflectivity_plot.emit(False)
[docs] def trim_data_to_normalization(self): """ Cut the start and end of the active data set to 5% of its maximum intensity. """ trim_points = self._data_manager.get_trim_values() if trim_points is not None: self.ui.rangeStart.setValue(trim_points[0]) self.ui.rangeEnd.setValue(trim_points[1]) self.update_tables() self.main_window.initiate_reflectivity_plot.emit(False) else: self.report_message("No direct beam found to trim data", pop_up=False)
[docs] def strip_overlap(self): """ Remove overlapping points in the reflectivity, cutting always from the lower Qz measurements. """ self._data_manager.strip_overlap() for i in range(len(self._data_manager.reduction_list)): xs = self._data_manager.active_channel.name d = self._data_manager.reduction_list[i].cross_sections[xs] self.reduction_table.setItem(i, 3, QtWidgets.QTableWidgetItem(str(d.configuration.cut_last_n_points))) self.main_window.initiate_reflectivity_plot.emit(False)
[docs] def reload_all_files(self): """ Reload all files upon change in loading configuration To speed up reloading, the file cache is first cleared of files that are not used in the reduction list or direct beam list. """ if self._data_manager.get_cachesize() == 0: return # Store the active (plotted) run index active_idx = self._data_manager.find_active_data_id() if active_idx is not None: is_active_data_direct_beam = False else: is_active_data_direct_beam = True active_idx = self._data_manager.find_active_direct_beam_id() # Store the active data tab active_data_tab = self._data_manager.active_reduction_list_index # Reload files self._data_manager.clear_cached_unused_data() configuration = self.get_configuration() prog = ProgressReporter(progress_bar=self.progress_bar, status_bar=self.status_bar_handler) self._data_manager.reload_files(configuration, prog) # Update the tables in the UI self.main_window.auto_change_active = True self.ui.normalizeTable.setRowCount(len(self._data_manager.direct_beam_list)) for idx, _ in enumerate(self._data_manager.direct_beam_list): self._data_manager.set_active_data_from_direct_beam_list(idx) self.update_direct_beam_table(idx, self._data_manager.active_channel) self.ui.reductionTable.setRowCount(len(self._data_manager.reduction_list)) for ipeak, peak_data in self._data_manager.peak_reduction_lists.items(): self._data_manager.set_active_reduction_list_index(ipeak) table_widget = self.get_reduction_table_by_index(ipeak) table_widget.setRowCount(len(self._data_manager.reduction_list)) for idx, _ in enumerate(self._data_manager.reduction_list): self._data_manager.set_active_data_from_reduction_list(idx) self.update_reduction_table(table_widget, idx, self._data_manager.active_channel) direct_beam_ids = [str(r.number) for r in self._data_manager.direct_beam_list] self.ui.normalization_list_label.setText(", ".join(direct_beam_ids)) # Restore the active data tab self._data_manager.set_active_reduction_list_index(active_data_tab) # Restore the active run if is_active_data_direct_beam: self._data_manager.set_active_data_from_direct_beam_list(active_idx) else: self._data_manager.set_active_data_from_reduction_list(active_idx) # Update plots self.file_loaded() self.main_window.auto_change_active = False
[docs] def report_message(self, message, informative_message=None, detailed_message=None, pop_up=False, is_error=False): r""" Report a message or error to the status bar at the bottom of the window. :param str message: message string to be reported :param str informative_message: extra information :param str detailed_message: detailed message for the log :param bool pop_up: if True, a dialog will pop up :param bool is_error: if True, the message is logged on the error channel """ self.status_bar_handler.show_message(message) if is_error: logging.error(message) if detailed_message is not None: logging.error(detailed_message) elif pop_up: logging.warning(message) else: logging.info(message) if pop_up: msg = QtWidgets.QMessageBox() msg.setIcon(QtWidgets.QMessageBox.Warning) msg.setText(message) msg.setWindowTitle("Information") if informative_message is not None: msg.setInformativeText(informative_message) if detailed_message is not None: msg.setDetailedText(detailed_message) msg.setStandardButtons(QtWidgets.QMessageBox.Ok) msg.exec_()
[docs] def ask_question(self, message): """ Display a popup dialog with a message and choices "Ok" and "Cancel" :param str message: question to ask :returns: bool """ ret = QtWidgets.QMessageBox.warning( self.main_window, "Warning", message, buttons=QtWidgets.QMessageBox.Ok | QtWidgets.QMessageBox.Cancel, defaultButton=QtWidgets.QMessageBox.Ok, ) if ret == QtWidgets.QMessageBox.Cancel: return False return True
[docs] def show_results(self): """ Pop up the result viewer """ from quicknxs.interfaces.result_viewer import ResultViewer dialog = ResultViewer(self.main_window, self._data_manager) dialog.specular_compare_widget.ui.refl_preview_checkbox.setChecked(True) self.main_window.update_specular_viewer.connect(dialog.update_specular) self.main_window.update_off_specular_viewer.connect(dialog.update_off_specular) self.main_window.update_gisans_viewer.connect(dialog.update_gisans) dialog.show()
[docs] def toggle_final_rebin_global(self, state): r"""When the global rebin checkbox is toggled, update the run rebin checkbox so that only one of them can be checked at a time.""" # **Ensure mutual exclusivity of the checkboxes** if state == QtCore.Qt.Checked: self.ui.final_rebin_checkbox_run.blockSignals(True) self.ui.final_rebin_checkbox_run.setChecked(False) self.ui.final_rebin_checkbox_run.blockSignals(False) # col_index = self.ui.reductionTable.get_column_index("Q-Steps") col_index = 13 self.ui.reductionTable.blockSignals(True) for row in range(self.ui.reductionTable.rowCount()): item = self.ui.reductionTable.item(row, col_index) if item is None: item = QtWidgets.QTableWidgetItem("") if item: if state == QtCore.Qt.Checked: _item = QtWidgets.QTableWidgetItem(item.text()) _item.setFlags(item.flags() & ~QtCore.Qt.ItemIsEditable) _item.setBackground(QtGui.QColor(220, 220, 220)) self.ui.reductionTable.setItem(row, col_index, _item) else: _item = QtWidgets.QTableWidgetItem(item.text()) _item.setFlags(item.flags() | QtCore.Qt.ItemIsEditable) _item.setBackground(QtGui.QColor(255, 255, 255)) self.ui.reductionTable.setItem(row, col_index, _item) self.ui.reductionTable.blockSignals(False)
[docs] def toggle_final_rebin_run(self, state): """When the run rebin checkbox is toggled, update the global rebin checkbox so that only one of them can be checked at a time.""" # **Ensure mutual exclusivity of the checkboxes** if state == QtCore.Qt.Checked: self.ui.final_rebin_checkbox_global.setChecked(False)