Source code for drtsans.load

# standard modules
import h5py
import re
from typing import List, Tuple, Union

# third party modules
import mantid
from mantid.dataobjects import Workspace2D, EventWorkspace
from mantid.simpleapi import mtd
from mantid.simpleapi import (
    DeleteWorkspace,
    LoadEventNexus,
    MergeRuns,
    GenerateEventsFilter,
    FilterEvents,
)
from mantid.simpleapi import AddSampleLogMultiple
from mantid.kernel import Logger, amend_config
from drtsans.geometry import (
    translate_detector_by_z,
    translate_sample_by_z,
    translate_source_by_z,
)

# package modules
from drtsans.instruments import (
    extract_run_number,
    instrument_enum_name,
    InstrumentEnumName,
    instrument_facility_name,
    is_time_of_flight,
)
from drtsans.path import abspath, registered_workspace, exists as path_exists
from drtsans.pixel_calibration import apply_calibrations
from drtsans.samplelogs import SampleLogs, periodic_index_log


__all__ = ["load_events", "sum_data", "load_and_split", "move_instrument"]

logger = Logger("drtsans.load")


def __monitor_counts(filename, monitor_name="monitor1"):
    r"""Get the total number of counts in a single monitor

    Parameters
    ----------
    filename: str
        Absolute path to the HDF5 file to be read
    monitor_name: str
        Name of the monitor to determine the total counts of

    Raises
    ------
    RuntimeError
        The HDF5 file does not contain a monitor entry
    """
    counts = 0  # default value is zero
    with h5py.File(filename, "r") as handle:
        if monitor_name not in handle["entry"]:
            raise RuntimeError('File "{}" does not contain /entry/{}'.format(filename, monitor_name))
        # open the monitor group
        nxmonitor = handle["entry"][monitor_name]

        # get the number of counts from the total counts array or the monitor array
        if "total_counts" in nxmonitor:
            counts = nxmonitor["total_counts"][0]
        else:
            counts = nxmonitor["event_time_offset"].shape[0]
    return int(counts)


def _insert_periodic_timeslice_log(
    input_workspace: Union[str, Workspace2D, EventWorkspace],
    name: str,
    time_interval: float,
    time_period: float,
    time_offset: float = 0,
):
    r"""
    Insert a log into the input workspace that emulates the periodicity of the time intervals. Within a given
    period, the log value is incremented by one every ``time_interval`` seconds, starting from zero.

    Parameters
    ----------
    input_workspace
    name
        Name of the log to insert
    time_interval
        Interval between consecutive log entries, in seconds.
    time_period
        The time in between equal log values, in seconds. Must be a multiple integer of ``time_interval``.
    time_offset
    """
    sample_logs = SampleLogs(input_workspace)
    try:
        run_start = sample_logs.run_start.value
    except AttributeError:
        run_start = sample_logs.start_time.value
    log = periodic_index_log(
        period=time_period,
        interval=time_interval,
        duration=sample_logs.duration.value,
        run_start=run_start,
        offset=time_offset,
        step=1.0,
        name=name,
    )
    sample_logs.insert(name=name, value=log)


[docs] def load_events( run, data_dir=None, output_workspace=None, overwrite_instrument=True, output_suffix="", pixel_calibration=False, detector_offset=0.0, sample_offset=0.0, reuse_workspace=False, **kwargs, ): r""" Load an event Nexus file produced by the instruments at ORNL. Parameters ---------- run: str, ~mantid.api.IEventWorkspace Examples: ``CG3_55555``, ``CG355555`` or file path. output_workspace: str If not specified it will be ``BIOSANS_55555`` determined from the supplied value of ``run``. data_dir: str, list Additional data search directories overwrite_instrument: bool, str If not :py:obj:`False`, ignore the instrument embedeed in the Nexus file. If :py:obj:`True`, use the latest instrument definition file (IDF) available in Mantid. If ``str``, then it should be the filepath to the desired IDF. output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. pixel_calibration: bool, str Adjust pixel heights and widths according to bar-scan and tube-width calibrations. Options are (1) No calibration (2) Using default calibration file (True) and (3) User specified calibration file (str) detector_offset: float Additional translation of the detector along the Z-axis, in mm. Positive moves the detector downstream. sample_offset: float Additional translation of the sample, in mm. The sample flange remains at the origin of coordinates. Positive moves the sample downstream. reuse_workspace: bool When true, return the ``output_workspace`` if it already exists kwargs: dict Additional positional arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`. Returns ------- ~mantid.api.IEventWorkspace Reference to the events workspace """ instrument_unique_name = instrument_enum_name(run) # determine which SANS instrument run_number = extract_run_number(run) if isinstance(run, str) else "" filename = run if path_exists(run) else "{}{}".format(instrument_unique_name, run_number) # create default name for output workspace if (output_workspace is None) or (not output_workspace) or (output_workspace == "None"): output_workspace = "{}_{}{}".format(instrument_unique_name, run_number, output_suffix) # determine if this is a monochromatic measurement is_mono = not is_time_of_flight(instrument_unique_name) # retrieve or load workspace if reuse_workspace and mtd.doesExist(output_workspace): # if it exists skip loading return mtd[output_workspace] else: # load the data into the appropriate workspace facility = instrument_facility_name(instrument_unique_name) with amend_config(facility=facility, instrument=str(instrument_unique_name), data_dir=data_dir): # not loading the instrument xml from the nexus file will use the correct one that is inside mantid # decide the value of LoadNexusInstrumentXML # if not specified, determine by overwrite_instrument if "LoadNexusInstrumentXML" not in kwargs: kwargs["LoadNexusInstrumentXML"] = not overwrite_instrument logger.notice(f"Loading {filename} to {output_workspace}") LoadEventNexus(Filename=filename, OutputWorkspace=output_workspace, **kwargs) # FIXME - what is the difference from: pixel_calibration is True? if pixel_calibration is not False: # pixel calibration is specified as not False if isinstance(pixel_calibration, str): calib_file = pixel_calibration else: calib_file = None apply_calibrations(output_workspace, database=calib_file) # insert monitor counts for monochromatic instruments if is_mono: # determine the fully qualified file path if "Filename" in mtd[output_workspace].run(): # from the existing workspace filename = str(mtd[output_workspace].run()["Filename"].value) else: # use archive search filename = str(abspath(filename)) # create new log with the monitor counts if monitor counts exists and the log is not already present if "monitor" not in SampleLogs(output_workspace): try: SampleLogs(output_workspace).insert("monitor", __monitor_counts(filename)) except RuntimeError as e: logger.warning(str(e)) # log a warning that monitor info not found in filename # move instrument components - sample position must happen first # Translate source along Z-axis translate_source_by_z(output_workspace, z=None, relative=False) # FIXME (485) - This shall be modified accordingly from drtsans.geometry import sample_detector_distance if is_mono: # HFIR-SANS: use new method out_ws = mtd[str(output_workspace)] logger.notice( f"Before translate source and sample:\n" f"Sample position = {out_ws.getInstrument().getSample().getPos()}\n" f"DAS SDD = " f"{sample_detector_distance(output_workspace, search_logs=True, forbid_calculation=True)}\n" f"Calculated SDD = {sample_detector_distance(output_workspace, search_logs=False)}" ) # Determine detector and sample offset from meta data afterwards else: # For TOF (i.e., EQSANS), still translate sample and detector as usual try: # get DAS recorded SDD das_sdd = sample_detector_distance(output_workspace, search_logs=True, forbid_calculation=True) except RuntimeError as run_err: # it may not exist if "Unable to find any meta data related to SDD" in str(run_err): # it is fine if das recorded SDD does not exist das_sdd = None else: raise run_err translate_sample_by_z(output_workspace, 1e-3 * float(sample_offset)) # convert sample offset from mm to meter translate_detector_by_z(output_workspace, None) # search logs and translate if necessary translate_detector_by_z(output_workspace, 1e-3 * float(detector_offset)) real_sdd = sample_detector_distance(output_workspace, search_logs=False) logger.notice(f"EQSANS workspace {str(output_workspace)} SDD is equal to {real_sdd}") if das_sdd is not None: assert real_sdd == das_sdd, f"EQSANS DAS SDD = {das_sdd}, Calculated SDD = {real_sdd}" return mtd[output_workspace]
[docs] def move_instrument( workspace, sample_offset, detector_offset, is_mono=False, sample_si_name=None, si_window_to_nominal_distance=None, ): """Move instrument sample and detector Parameters ---------- workspace: Workspace with instrument's sample or detector translated along z-axis sample_offset: float sample offset in unit meter detector_offset: float detector offset in unit meter is_mono: bool Flag that it belongs to a mono-SANS sample_si_name: str Name of Sample to silicon window name si_window_to_nominal_distance : float or None distance between Silicon window and sample in unit of meter Returns ------- ~mantid.api.IEventWorkspace, ~mantid.api.MatrixWorkspace Workspace with instrument moved """ # Get workspace name workspace_name = str(workspace) # Move sample and detector translate_sample_by_z(workspace, sample_offset) translate_detector_by_z(workspace, detector_offset) # Reset the SampleToSi log and sample_detector_distance log for mono-SANS if is_mono: logs = SampleLogs(workspace) # Update sample-silicon-window distance # curr_value = logs.find_log_with_units(sample_si_name, unit='mm') # sample offset is at same direction to +Y, while 'SampleToSi' is toward -Y # convert from meter to mm new_value = (si_window_to_nominal_distance - sample_offset) * 1e3 AddSampleLogMultiple( Workspace=workspace, LogNames="{}".format(sample_si_name), LogValues="{}".format(new_value), LogUnits="mm", ) # Adjust sample_to_detector_distance curr_sdd = logs.find_log_with_units("sample_detector_distance", unit="m") # shift shall be (-sample_offset + detector_offset) curr_sdd += -sample_offset + detector_offset # Set AddSampleLogMultiple( Workspace=workspace, LogNames="{}".format("sample_detector_distance"), LogValues="{}".format(curr_sdd), LogUnits="m", ) # END-IF return mtd[workspace_name]
def _monitor_split_and_log(monitor: str, monitor_group: str, sample_group: str, is_mono: bool, filter_events: dict): r""" Split the monitor workspace and insert the total count in each splitted workspace as log entry 'monitor' in the corresponding splited sample workspaces Parameters ---------- monitor name of the input events monitor workspace monitor_group name of the output WorkspaceGroup containing the splited workspaces for the sample monitor sample_group name of the WorkspaceGroup containing the splited workspaces for the sample is_mono monochromatic or time-of-flight instrument? filter_events options to be passed on to Mantid algorithm `FilterEvents` """ FilterEvents( InputWorkspace=monitor, OutputWorkspaceBaseName=monitor_group, SpectrumWithoutDetector="Skip only if TOF correction", **filter_events, ) if is_mono: splitted_workspaces_count = mtd[sample_group].getNumberOfEntries() for n in range(splitted_workspaces_count): SampleLogs(mtd[sample_group].getItem(n)).insert("monitor", mtd[monitor_group].getItem(n).getNumberEvents())
[docs] def load_and_split( run, data_dir=None, output_workspace=None, output_suffix="", overwrite_instrument=True, pixel_calibration=False, detector_offset=0.0, sample_offset=0.0, time_interval: Union[float, List[float]] = None, time_offset: float = 0.0, time_period: float = None, log_name=None, log_value_interval=None, reuse_workspace=False, monitors=False, instrument_unique_name=None, is_mono=None, **kwargs, ): r"""Load an event NeXus file and filter into a WorkspaceGroup depending on the provided filter options. Either a time_interval must be provided or a log_name and log_value_interval. Metadata added to output workspace includes the ``slice`` number, ``number_of_slices``, ``slice_parameter``, ``slice_interval``, ``slice_start`` and ``slice_end``. For EQSANS two WorkspaceGroup's are return, one for the filtered data and one for filtered monitors Parameters ---------- run: str, ~mantid.api.IEventWorkspace Examples: ``CG3_55555``, ``CG355555`` or file path. data_dir: str, list Additional data search directories output_workspace: str If not specified it will be ``BIOSANS_55555`` determined from the supplied value of ``run``. output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. overwrite_instrument: bool, str If not :py:obj:`False`, ignore the instrument embedeed in the Nexus file. If :py:obj:`True`, use the latest instrument definition file (IDF) available in Mantid. If ``str``, then it should be the filepath to the desired IDF. pixel_calibration: bool Adjust pixel heights and widths according to bar-scan and tube-width calibrations. detector_offset: float Additional translation of the detector along the Z-axis, in mm. Positive moves the detector downstream. sample_offset: float Additional translation of the sample, in mm. The sample flange remains at the origin of coordinates. Positive moves the sample downstream. time_interval: float or list of floats Array for lengths of time intervals for splitters. If the array has one value, then all splitters will have same time intervals. If the size of the array is larger than one, then the splitters can have various time interval values. time_offset Offset to be added to the start time of the first splitter, in seconds. time_period A multiple integer of the time interval, in seconds. If specified, it indicates that the time slicing is periodic so that events in time intervals separated by one (or more) period should be reduced together. log_name: string Name of the sample log to use to filter. For example, the pulse charge is recorded in 'ProtonCharge'. log_value_interval: float Delta of log value to be sliced into from min log value and max log value. reuse_workspace: bool When true, return the ``output_workspace`` if it already exists kwargs: dict Additional positional arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`. Returns ------- WorkspaceGroup Reference to the workspace groups containing all the split workspaces """ if not (time_interval or (log_name and log_value_interval)): raise ValueError("Must provide with time_interval or log_name and log_value_interval") # Check whether we need to load or not run = str(run) if registered_workspace(run): all_events_workspace = run # run is a workspace, or the name of a workspace in the AnalysisDataService monitors = monitors or is_mono assert instrument_unique_name is not None, "Instrument name must be given!" else: # determine if this is a monochromatic measurement instrument_unique_name = instrument_enum_name(run) # determine which SANS instrument is_mono = (instrument_unique_name == InstrumentEnumName.BIOSANS) or ( instrument_unique_name == InstrumentEnumName.GPSANS ) # monitors are required for gpsans and biosans monitors = monitors or is_mono all_events_workspace = "_load_tmp" # temporary workspace load_events( run=run, data_dir=data_dir, output_workspace=all_events_workspace, overwrite_instrument=overwrite_instrument, pixel_calibration=pixel_calibration, output_suffix=output_suffix, detector_offset=detector_offset, sample_offset=sample_offset, reuse_workspace=reuse_workspace, **dict(kwargs, LoadMonitors=monitors or is_mono), ) # create default name for output workspace if (output_workspace is None) or (not output_workspace) or (output_workspace == "None"): run_number = extract_run_number(run) if isinstance(run, str) else "" output_workspace = "{}_{}{}".format(instrument_unique_name, run_number, output_suffix) # in the case of periodic time slicing, we replace the time slicing with a log slicing by creating a log emulating # the periodicity of the time intervals. This is so because Mantid algorithm GenerateEventsFilter # does not support periodic time slicing, but it does support the slicing of a periodic log. if isinstance(time_interval, float) and time_period: log_name = "periodic_time_slicing" _insert_periodic_timeslice_log( all_events_workspace, name=log_name, time_interval=time_interval, time_period=time_period, time_offset=time_offset, ) time_interval, log_value_interval = None, 1 # Create event filter workspace GenerateEventsFilter( InputWorkspace=all_events_workspace, OutputWorkspace="_filter", InformationWorkspace="_info", StartTime=str(time_offset), # the algorithm requires a string object TimeInterval=time_interval, UnitOfTime="Seconds", LogName=log_name, LogValueInterval=log_value_interval, ) # Filter data filter_events_opts = dict( SplitterWorkspace="_filter", InformationWorkspace="_info", FilterByPulseTime=True, GroupWorkspaces=True, OutputWorkspaceIndexedFrom1=True, ) FilterEvents(InputWorkspace=all_events_workspace, OutputWorkspaceBaseName=output_workspace, **filter_events_opts) # Remove empty workspaces from event filtering split_ws_list = [mtd[output_workspace].getItem(n) for n in range(mtd[output_workspace].getNumberOfEntries())] for split_ws in split_ws_list: num_events = split_ws.getNumberEvents() if num_events == 0: logger.notice(f"Remove empty sliced workspace {str(split_ws)}") mtd[output_workspace].remove(str(split_ws)) assert is_mono is not None, "is_mono shall be either set or specified" if monitors: _monitor_split_and_log( monitor=all_events_workspace + "_monitors", monitor_group=output_workspace + "_monitors", sample_group=output_workspace, is_mono=is_mono, filter_events=filter_events_opts, ) # Add metadata for each slice with details for n in range(mtd[output_workspace].getNumberOfEntries()): samplelogs = SampleLogs(mtd[output_workspace].getItem(n)) samplelogs.insert("slice", n + 1) samplelogs.insert("number_of_slices", mtd[output_workspace].getNumberOfEntries()) slice_info = mtd[output_workspace].getItem(n).getComment() samplelogs.insert("slice_info", slice_info) if time_interval: samplelogs.insert("slice_parameter", "relative time from start") samplelogs.insert("slice_interval", time_interval) # Calculate relative start and end time samplelogs.insert( "slice_start", (mtd["_filter"].cell(n, 0) - samplelogs.startTime().totalNanoseconds()) / 1e9, "seconds", ) samplelogs.insert( "slice_end", (mtd["_filter"].cell(n, 1) - samplelogs.startTime().totalNanoseconds()) / 1e9, "seconds", ) else: samplelogs.insert("slice_parameter", log_name) samplelogs.insert("slice_interval", log_value_interval) slice_start, slice_end = re.sub(r".*\.From\.|\.Value.*", "", slice_info).split(".To.") samplelogs.insert("slice_start", float(slice_start), samplelogs[log_name].units) samplelogs.insert("slice_end", float(slice_end), samplelogs[log_name].units) # Clean up for name in [all_events_workspace, "_filter", "_info"]: DeleteWorkspace(name) if is_mono or not monitors: return mtd[output_workspace] else: # If EQSANS and the filtered monitors are also being returned DeleteWorkspace(all_events_workspace + "_monitors") return mtd[output_workspace], mtd[output_workspace + "_monitors"]
[docs] def sum_data(data_list, output_workspace, sum_logs=("duration", "timer", "monitor", "monitor1")): r""" Merge data set together, summing the listed logs Parameters ---------- data_list: list of Workspace2D, list of workspace names, comma separated list of workspace names, WorkspaceGroup Examples: [ws1, ws1], ['ws1', 'ws2'] or 'ws1, ws2' output_workspace: str Name of output workspace, required sum_logs: list of str numeric logs that will be summed during merging Returns ------- Workspace2D """ # If needed convert comma separated string list of workspaces in list of strings if isinstance(data_list, str): data_list = [data.strip() for data in data_list.split(",")] # If only one input workpsace then just return that workspace if len(data_list) == 0: return mtd[data_list[0]] # Check workspaces are of correct type for data in data_list: if not mtd.doesExist(str(data)): raise ValueError("Workspace " + data + " does not exist") if not isinstance(mtd[str(data)], mantid.dataobjects.Workspace2D): raise ValueError(data + " is not a Workspace2D, this currently only works correctly for Workspace2D") # Filter sum_logs list to only include logs that exist in data sum_logs = [log for log in sum_logs if mtd[str(data_list[0])].getRun().hasProperty(log)] # Merge workspaces together MergeRuns( InputWorkspaces=",".join(str(data) for data in data_list), OutputWorkspace=output_workspace, SampleLogsSum=",".join(sum_logs), ) return mtd[output_workspace]
def resolve_slicing(reduction_input: dict) -> Tuple[bool, bool]: r""" Resolve if the reduction configuration parameters specify time or log slicing Parameters ---------- reduction_input Dictionary of reduction configuration parameters Returns ------- boolean values for time and log slicing, respectively Raises ------ ValueError - If the sample input data is composed of more than one run - If both time and log slicing are ``True`` """ parameters = reduction_input["configuration"] timeslice, logslice = parameters["useTimeSlice"], parameters["useLogSlice"] if timeslice and logslice: raise ValueError("Can't do both time and log slicing") if timeslice or logslice: sample = reduction_input["sample"]["runNumber"] if len(sample.split(",")) > 1: raise ValueError("Can't do slicing on summed data sets") return timeslice, logslice