Source code for quicknxs.models.diagnostic_data

import mantid.simpleapi as api

from quicknxs.utils.filepath import FilePath


[docs] class DiagnosticData: """Diagnostic data for a run or group of runs to help understand and debug issues with QuickNXS workspaces""" def __init__(self, file_path: str | list[str] | None = None, message: str | None = None, min_num_evts: int = 100): self.min_num_events = min_num_evts self.bad_files = set() self.file_path = None self.xs_list = [] self.diagnostic_data = [] self.sample_logs = [] if file_path: self.file_path = FilePath(file_path) self.bad_files.add(self.file_path.split()[1]) self.diagnostic_data = self._extract_diagnostic_data() self.sample_logs = self._get_sample_logs() if message is None: message = f"No valid cross-sections found in file: {file_path}" self.message = message def _extract_diagnostic_data(self): """Extract diagnostic information from workspaces. Returns ------- list of dict List of dictionaries containing diagnostic information for each workspace. """ diagnostic_data = [] run_numbers = self.file_path.run_numbers() _xs_list = [] path_dict = dict() for idx, path in enumerate(self.file_path.single_paths): _, fpath = FilePath(path).split() path_dict[run_numbers[idx]] = fpath ws_name = api.mtd.unique_hidden_name() ws = api.LoadEventNexus(Filename=path, OutputWorkspace=ws_name) ws.mutableRun().addProperty("run_number", run_numbers[idx], True) _xs_list.append(ws) self.xs_list = _xs_list for ws in self.xs_list: run = ws.getRun() def get_prop_value(run, prop_name, default="N/A"): if run.hasProperty(prop_name): try: prop = run.getProperty(prop_name) if hasattr(prop, "getStatistics"): return prop.getStatistics().mean else: return prop.value except Exception: return default return default data = {} run_number = get_prop_value(run, "run_number") xs_id = get_prop_value(run, "cross_section_id") if xs_id != "N/A": data["cross_section_id"] = f"Run {run_number} ({xs_id})" else: data["cross_section_id"] = f"Run {run_number}" event_count = ws.getNumberEvents() if event_count < self.min_num_events: self.bad_files.add(path_dict[run_number]) data["event_count"] = event_count data["lambda_center"] = get_prop_value(run, "LambdaRequest") data["direct_pixel"] = get_prop_value(run, "DIRPIX") data["proton_charge"] = get_prop_value(run, "gd_prtn_chrg") sample_angle = get_prop_value(run, "SampleAngle") if sample_angle == "N/A": sample_angle = get_prop_value(run, "SANGLE") data["sample_angle"] = sample_angle data["dangle"] = get_prop_value(run, "DANGLE") data["dangle0"] = get_prop_value(run, "DANGLE0") counting_time = get_prop_value(run, "duration") data["counting_time"] = counting_time if counting_time != "N/A" and counting_time > 0: data["count_rate"] = event_count / counting_time else: data["count_rate"] = "N/A" diagnostic_data.append(data) return diagnostic_data def _get_sample_logs(self): """Extract all sample logs from workspaces. Returns ------- list of dict List of dictionaries, each containing: - 'run_id': identifier for the run - 'logs': list of tuples (property_name, property_value) """ sample_logs = [] for ws in self.xs_list: run = ws.getRun() run_id = run.getProperty("run_number").value logs = [] for prop in run.getProperties(): logs.append((prop.name, str(prop.value))) logs = sorted(logs, key=lambda x: x[0]) sample_logs.append({"run_id": run_id, "logs": logs}) return sample_logs