Source code for quicknxs.interfaces.data_handling.filepath

"""Classes to handle string representations of sets of run numbers and absolute paths to data files."""

import itertools
import operator
import os
import re
from typing import List, Optional, Union


[docs] class RunNumbers(object): """A helper class to handle string representations of one or more run numbers. Translates from a string representation to a list of run numbers, and viceversa """ merge_symbol = "+" range_symbol = ":" def __init__(self, numbers: Union[List[int], List[str], int, str]) -> None: """Initialize the RunNumbers object. Parameters ---------- numbers: a list of numbers or a string containing one or more numbers. For instance, '1:3+5' translates to [1, 2, 3, 5] """ self._numbers: List[int] if isinstance(numbers, int): self._numbers = [numbers] # just one run number elif isinstance(numbers, list): self._numbers = sorted([int(n) for n in numbers]) elif isinstance(numbers, str): if self.merge_symbol in numbers or self.range_symbol in numbers: self._numbers = sorted(self._uncompress(numbers)) else: self._numbers = [int(numbers)] # just one run number else: raise ValueError("Constructor requires a list or a string of run numbers as input") def _uncompress(self, numbers: str) -> List[int]: """Split a string representation of a set of run numbers into a list. Example: '1:3+6' becomes [1, 2, 3, 6] """ run_numbers = list() for run_range in numbers.split(self.merge_symbol): # e.g. 1:3+6' becomes ['1:3', '6'] if self.range_symbol in run_range: # e.g '2:7' first, last = [int(n) for n in run_range.split(self.range_symbol)] run_numbers.extend(list(range(first, last + 1))) else: # a single run number, e.g. '4' run_numbers.append(int(run_range)) return run_numbers @property def numbers(self) -> List[int]: """List of run numbers as a list of integers.""" return self._numbers @property def long(self) -> str: """Long string representation of the run numbers. Example: [1, 2, 3, 6] becomes '1+2+3+6' """ return self.merge_symbol.join([str(n) for n in self._numbers]) @property def short(self) -> str: """Short string representation of the run numbers. Example: [1, 2, 3, 6] becomes '1:3+6' """ ranges = list() for _, g in itertools.groupby(enumerate(self._numbers), lambda i_run_number: i_run_number[0] - i_run_number[1]): runs = list(map(operator.itemgetter(1), g)) # e.g. [3,4,5] run_range = str(runs[0]) if len(runs) == 1 else f"{runs[0]}{self.range_symbol}{runs[-1]}" ranges.append(run_range) return self.merge_symbol.join(ranges) @property def statement(self) -> str: """Human readable string representation. Examples: '12', '12 and 13', '12, 13, and 14' """ runs_str = [str(n) for n in self._numbers] if len(runs_str) == 1: return runs_str[0] runs = ", ".join(runs_str[:-1]) if len(runs_str) > 2: runs += "," runs += " and " + runs_str[-1] return runs
[docs] class FilePath(object): """Helper class to deal with string representation of one or more absolute file paths. Example: file_path = '/SNS/REF_M/IPTS-25531/nexus/REF_M_38202.nxs.h5+/SNS/REF_M/IPTS-25531/nexus/REF_M_38201.nxs.h5' NOTE: Paths are sorted """ merge_symbol = "+"
[docs] @classmethod def join(cls, dirname: str, basename: str, sort: bool = True) -> str: r"""Create the file path for a single file or a set of files using one directory. Example: u'/SNS/REF_M/IPTS-25531/nexus/REF_M_38198.nxs.h5+/SNS/REF_M/IPTS-25531/nexus/REF_M_38199.nxs.h5' Parameters ---------- dirname: absolute path to a directory basename: name of one or more files. If more than one file, they're concatenated with the merge symbol '+'. Example: u'REF_M_38198.nxs.h5+REF_M_38199.nxs.h5' sort: if True, sort the basenames according to increasing run number when more than one file. Returns ------- str: string representing the absolute path to the files. """ base_names = basename.split(cls.merge_symbol) file_paths = [os.path.join(dirname, name) for name in base_names] if sort: file_paths.sort() return str(cls.merge_symbol.join(file_paths))
[docs] @classmethod def unique_dirname(cls, file_path): """For composite file paths, check that the dirname of the paths is the same for all files.""" dirs = [os.path.dirname(path) for path in file_path.split(cls.merge_symbol)] if len(set(dirs)) > 1: return False return True
def __init__(self, file_path: Union[str, List[str]], sort: bool = True): if isinstance(file_path, list): file_path = self.merge_symbol.join(file_path) if not self.unique_dirname(file_path): raise ValueError(f"files in {file_path} reside in different directories") if self.merge_symbol in file_path: if sort: paths = sorted(file_path.split(self.merge_symbol)) self._file_path = str(self.merge_symbol.join(paths)) else: self._file_path = str(file_path) else: self._file_path = str(file_path) def __str__(self): return self._file_path @property def path(self): return self._file_path @property def single_paths(self): if self.is_composite: return self._file_path.split(self.merge_symbol) return [self._file_path] @property def is_composite(self): return self.merge_symbol in self._file_path @property def dirname(self): if self.merge_symbol in self._file_path: first_path = self._file_path.split(self.merge_symbol)[0] return os.path.dirname(first_path) return os.path.dirname(self._file_path) @property def basename(self): if self.merge_symbol in self._file_path: names = [os.path.basename(name) for name in self._file_path.split(self.merge_symbol)] return self.merge_symbol.join(names) return os.path.basename(self._file_path) @property def first_path(self): if self.is_composite: return self._file_path.split(self.merge_symbol)[0] return self._file_path
[docs] def split(self): return self.dirname, self.basename
[docs] def run_numbers(self, string_representation: Optional[str] = None) -> Union[List[int], str]: """Return the run number(s) associated to this file path. This function assumes the basename of each single file path has the pattern "REF_M_XXXX.*" where 'XXXX' is the run number to extract, and * is some file extension Parameters ---------- string_representation: One of [None, "long", "short", "statement"] to return the run numbers as a list of integers, a long string representation, a short string representation, or a human readable statement, respectively. """ numbers = list() for path in self.single_paths: match = re.search(r"REF_M_(\d+)", path) if match is None: raise ValueError(f"Could not extract run number in file path {path}") numbers.append(int(match.groups()[0])) numbers.sort() # this should be unnecessary, though, since self._file_path is already sorted if string_representation is None: return numbers elif string_representation in ("long", "short", "statement"): return getattr(RunNumbers(numbers), string_representation) else: raise ValueError('parameter string_representation must be one of [None, "long", "short", "statement"]')