Source code for drtsans.instruments

# third party imports
from mantid.kernel import ConfigService
from mantid.api import mtd, MatrixWorkspace
from mantid.dataobjects import EventWorkspace
from mantid.simpleapi import (
    config,
    LoadInstrument,
    LoadEmptyInstrument,
    MergeRuns,
    RemoveSpectra,
    RenameWorkspace,
)

# standard imports
import enum
import os
import shutil
import subprocess
from typing import Optional, Union


__all__ = [
    "InstrumentEnumName",
    "instrument_enum_name",
    "instrument_standard_name",
    "is_time_of_flight",
]

INSTRUMENT_LABELS = ["CG3", "BIOSANS", "EQ-SANS", "EQSANS", "CG2", "GPSANS"]


[docs] @enum.unique class InstrumentEnumName(enum.Enum):
[docs] @staticmethod def names(): r"""Standard names for all instruments, in alphabetical order""" names_all = list(map(str, InstrumentEnumName)) names_all.remove("UNDEFINED") return sorted(names_all)
r"""Unique names labelling each instrument""" UNDEFINED = None # usually the dummy instrument used for testing BIOSANS = ConfigService.getFacility("HFIR").instrument("BIOSANS") EQSANS = ConfigService.getFacility("SNS").instrument("EQSANS") GPSANS = ConfigService.getFacility("HFIR").instrument("GPSANS") def __str__(self): return self.name
[docs] def instrument_enum_name(input_query): r""" Resolve the instrument name as a unique enumeration. Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace string representing a filepath, a valid instrument name, or a Mantid workspace containing an instrument Returns ------- InstrumentEnumName The name of the instrument as one of the InstrumentName enumerations """ string_to_enum = { "CG3": InstrumentEnumName.BIOSANS, "BIOSANS": InstrumentEnumName.BIOSANS, "EQ-SANS": InstrumentEnumName.EQSANS, "EQSANS": InstrumentEnumName.EQSANS, "CG2": InstrumentEnumName.GPSANS, "GPSANS": InstrumentEnumName.GPSANS, } # convert to a string name = str(input_query) if name in mtd: # convert mantid workspace into a instrument string name = mtd[str(name)].getInstrument().getName() else: # see if `name` contains any of the instrument labels name = name.upper() for instrument_string_label in sorted(string_to_enum.keys()): if instrument_string_label in name: name = instrument_string_label break return string_to_enum.get(name.upper(), InstrumentEnumName.UNDEFINED)
[docs] def instrument_standard_name(input_query): r""" Resolve the standard instrument name. Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace string representing a filepath, a valid instrument name, or a Mantid workspace containing an instrument Returns ------- str The name of the instrument as the string representation of one of the InstrumentName enumerations """ return str(instrument_enum_name(input_query))
def instrument_standard_names(): r"""Standard names for all instruments, in alphabetical order""" return InstrumentEnumName.names() def instrument_filesystem_name(input_query): r""" Resolve the name of the instrument that is the subdirectory name under /SNS or /HFIR Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace string representing a filepath, a valid instrument name, or a Mantid workspace containing an instrument Returns ------- str """ filesystem_name = {"BIOSANS": "CG3", "EQSANS": "EQSANS", "GPSANS": "CG2"} return filesystem_name[instrument_standard_name(input_query)] def instrument_label(input_query): r""" Resolve the instrument name. Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace string representing a filepath, a valid instrument name, or a Mantid workspace containing an instrument Returns ------- str """ # convert to a string name = str(input_query) if name in mtd: # convert mantid workspace into a instrument string return mtd[str(name)].getInstrument().getName() else: # see if `name` contains any of the instrument labels name = name.upper() for instrument_string_label in INSTRUMENT_LABELS: if instrument_string_label in name: return instrument_string_label raise RuntimeError('Instrument name can not be resolved from "{}"'.format(input_query)) def extract_run_number(input_query): r""" Extract the run number from string Example: input string '/HFIR/..../CG3_961.nxs.h5', 'CG3_961.nxs.h5', 'CG3961', and 'CG3_961' should all return run number 961 Parameters ---------- input_query: str Returns ------- int """ try: # see if `input_query` is an integer run_number = int(input_query) except ValueError: # name of the file without path run_number = os.path.basename(input_query) # everything up to the extension run_number = run_number.split(".")[0] # remove the instrument name for label in INSTRUMENT_LABELS: run_number = run_number.replace(label, "") # remove any remaining '_' if "_" in run_number: run_number = run_number.split("_")[1] # convert to an integer return int(run_number)
[docs] def is_time_of_flight(input_query): r""" Find if the instrument is a time-of-flight one Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace, InstrumentEnumName string representing a valid instrument name, or a Mantid workspace containing an instrument Returns ------- bool """ return instrument_enum_name(input_query) is InstrumentEnumName.EQSANS # we only have one, for the moment
def fetch_idf(idf_xml: str, output_directory: str = os.getcwd()): r""" Download an IDF from the Mantid GitHub repository to a temporary directory. If the download fails, attempt to find the IDF in the local instrument directories and copy to the temporary directory. Parameters ---------- idf_xml The name of the IDF file to download. output_directory: Final location of the IDF file. Returns ------- str absolute path to the downloaded IDF file. """ def _empty_download(filepath): r"""The curl command may return and exit code of 0, signaling success, yet the file may contain only the string '404: Not Found'. This function checks for this scenario.""" return "404: Not Found" in open(filepath).read() idf = os.path.join(str(output_directory), idf_xml) url = f"https://raw.githubusercontent.com/mantidproject/mantid/main/instrument/{idf_xml}" result = subprocess.run(f"curl -o {idf} {url}", shell=True, capture_output=True, text=True) if result.returncode == 0 and not _empty_download(idf): return idf else: print(f"Dowloading {idf_xml} failed with error: {result.stderr}.") print("Attempting to find the IDF in the local instrument directories.") local_dirs = config.getInstrumentDirectories() for instrument_directory in local_dirs: idf_local = os.path.join(instrument_directory, idf_xml) if os.path.isfile(idf_local): shutil.copy(idf_local, idf) return idf raise FileNotFoundError(f"IDF {idf_xml} not found in the local instrument directories {local_dirs}.") def empty_instrument_workspace( output_workspace: str, filename: Optional[str] = None, instrument_name: Optional[str] = None, event_workspace: Optional[bool] = False, monitors_have_spectra: Optional[bool] = False, ) -> Union[MatrixWorkspace, EventWorkspace]: r""" Create an emtpy workspace for one of the standard instruments. By default, monitors do not have associated spectra. Invokes ~mantid.simpleapi.LoadEmptyInstrument to create an empty instrument workspace. Parameters ---------- output_workspace Name of the output workspace filename Path to the instrument filename. If not absolute path, mantid will search in the instrument directory. instrument_name Alternative of option ``filename``. Mantid will search for the latest instrument file in the instrument directory. event_workspace If True, create an event workspace, otherwise a histogram workspace. monitors_have_spectra If True, create a workspace with spectra for the monitors. Returns ------- A handle to the empty workspace """ workspace = LoadEmptyInstrument( OutputWorkspace=output_workspace, Filename=filename, InstrumentName=instrument_name, MakeEventWorkspace=event_workspace, ) if monitors_have_spectra is False: # get the list of non-negative detector IDs (i.e. excluding monitors) detector_ids = workspace.detectorInfo().detectorIDs() monitor_count = detector_ids[detector_ids < 0].size # monitors have negative IDs always detector_ids = detector_ids[detector_ids >= 0] # these are the detector IDs for detector pixels # iterate over the spectra, assigning one detector ID to each spectrum for wi, detid in zip(range(workspace.getNumberHistograms()), detector_ids): spectrum = workspace.getSpectrum(wi) spectrum.setDetectorID(int(detid)) # the number of spectra should be equal to the number of monitors plus the number detector pixels. Thus, # we have an excess of spectra in the amount of `monitor_count`. They need to be removed now to_remove = list(range(workspace.getNumberHistograms() - monitor_count, workspace.getNumberHistograms())) workspace = RemoveSpectra( Inputworkspace=output_workspace, OutputWorkspace=output_workspace, WorkspaceIndices=to_remove ) return workspace def copy_to_newest_instrument( input_workspace: Union[str, MatrixWorkspace, EventWorkspace], output_workspace: Optional[str] = None, ) -> Union[MatrixWorkspace, EventWorkspace]: r""" Copy the workspace intensities and/or events to the latest instrument file. Will also copy the logs and preserve the original locations of the main and wing detectors. Parameters ---------- input_workspace Workspace containing an old instrument definition file (IDF). output_workspace Workspace containing the intensities and/or events of input_workspace but with the latest IDF. If ``None``, the input workspace is overwritten """ origin = mtd[str(input_workspace)] if output_workspace is None: target_workspace = mtd.unique_hidden_name() # temporary name else: target_workspace = output_workspace instrument_file = fetch_idf(f"{origin.getInstrument().getName()}_Definition.xml") target = empty_instrument_workspace( output_workspace=target_workspace, filename=instrument_file, event_workspace=isinstance(origin, EventWorkspace), monitors_have_spectra=(origin.getSpectrum(0).getDetectorIDs()[0] < 0), ) # for algorithm MergeRuns to work, units of origin and target workspace must match origin_unit = origin.getAxis(0).getUnit().unitID() target.getAxis(0).setUnit(origin_unit) target.setYUnit(origin.YUnit()) MergeRuns( InputWorkspaces=[target_workspace, input_workspace], OutputWorkspace=target_workspace # order is necessary ) # Move components to the positions they have in input_workspace by reading their positions # in the logs. This is implicitly done when invoking algorithm LoadInstrument. LoadInstrument(Workspace=target_workspace, Filename=instrument_file, RewriteSpectraMap=False) if output_workspace is None: # overwrite the input workspace target = RenameWorkspace(InputWorkspace=target_workspace, OutputWorkspace=str(input_workspace)) return target def instrument_facility_name(input_query): """Get the facility name for the instrument Parameters ---------- input_query: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace string representing a filepath, a valid instrument name, or a Mantid workspace containing an instrument Returns ------- str The name of facility for the instrument. """ try: instrument = ConfigService.getInstrument(instrument_standard_name(input_query)) except RuntimeError as exc: raise ValueError(f"Failed to find instrument: {str(input_query)}") from exc return str(instrument.facility())