Source code for drtsans.tof.eqsans.load

# standard imports
import os
from typing import List, Union

# third-party imports
from mantid.simpleapi import mtd, LoadNexusMonitors
from mantid.kernel import amend_config

# local imports
from drtsans.beam_finder import center_detector, find_beam_center
from drtsans.instruments import extract_run_number, instrument_enum_name
from drtsans.load import (
    load_events as generic_load_events,
    sum_data,
    load_and_split as generic_load_and_split,
)
from drtsans.settings import namedtuplefy
from drtsans.samplelogs import SampleLogs
from drtsans.tof.eqsans.correct_frame import (
    correct_detector_frame,
    correct_monitor_frame,
    transform_to_wavelength,
    smash_monitor_spikes,
    set_init_uncertainties,
    correct_emission_time,
)
from drtsans.tof.eqsans.geometry import source_monitor_distance


__all__ = [
    "load_events",
    "load_events_monitor",
    "sum_data",
    "load_events_and_histogram",
    "load_and_split_and_histogram",
    "prepare_monitors",
]


[docs] def load_events_monitor(run, data_dir=None, output_workspace=None): r""" Load monitor events with initial corrections for geometry and time-of-flight Parameters ---------- run: int, str Examples: ``55555`` or ``EQSANS_55555`` or file path. data_dir: str, list Additional data search directories output_workspace: str If not specified it will be ``EQSANS_55555_monitors`` determined from the supplied value of ``run`` Returns ------- ~mantid.api.MatrixWorkspace """ suffix = "_monitors" if output_workspace is None: if isinstance(run, str): output_workspace = os.path.split(run)[-1] output_workspace = "_".join(output_workspace.split("_")[:2]) output_workspace = output_workspace.split(".")[0] + suffix else: output_workspace = "EQSANS_{}{}".format(run, suffix) with amend_config(facility="SNS", instrument="EQSANS", data_dir=data_dir): LoadNexusMonitors(Filename=str(run), LoadOnly="Events", OutputWorkspace=output_workspace) smd = source_monitor_distance(output_workspace, unit="mm") SampleLogs(output_workspace).insert("source-monitor-distance", smd, unit="mm") # DAS automatically corrects the frame for the monitor only on or after June 1 2019, day_stamp = int(SampleLogs(output_workspace).start_time.value[0:10].replace("-", "")) if day_stamp < 20190601: correct_monitor_frame(output_workspace) return mtd[output_workspace]
[docs] def prepare_monitors(data, bin_width=0.1, output_workspace=None): r""" Loads monitor counts, correct TOF, and transforms to wavelength. Parameters ---------- data: int, str, ~mantid.api.IEventWorkspace Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace` bin_width: float Bin width for the output workspace, in Angstroms. output_workspace: str Name of the output workspace. If None, then it will be ``EQSANS_XXXXX_monitors`` with number XXXXX determined from ``data``. Returns ------- ~mantid.api.MatrixWorkspace """ w = load_events_monitor(data, output_workspace=output_workspace) w = smash_monitor_spikes(w) w, bands = transform_to_wavelength(w, bin_width=bin_width) w = set_init_uncertainties(w) return w
[docs] def load_events( run, scale_components=None, pixel_calibration=False, detector_offset=0.0, sample_offset=0.0, path_to_pixel=True, data_dir=None, output_workspace=None, output_suffix="", **kwargs, ): r""" Load events with initial corrections for geometry and time-of-flight Note: Detector is translated along the Z-axis by the value specified in keyword ``detectorZ`` of the logs. The final sample to detector distance is detectorZ + detector_offset - sample_offset. **Mantid algorithms used:** :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`, Parameters ---------- run: int, str Examples: ``55555`` or ``EQSANS_55555`` or file path. scale_components: Optional[dict] Dictionary of component names and scaling factors in the form of a three-element list, indicating rescaling of the pixels in the component along the X, Y, and Z axes. For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2, leaving the other pixel dimensions unchanged. pixel_calibration: bool Adjust pixel heights and widths according to barscan and tube-width calibrations. detector_offset: float Additional translation of the detector along the Z-axis, in mm. Positive moves the detector downstream. sample_offset: float Additional translation of the sample, in mm. The sample flange remains at the origin of coordinates. Positive moves the sample downstream. path_to_pixel: bool When correcting the recorded time of flight of each neutron, use the path from the moderator to the detector pixel (`True`) or to the center of the detector panel (`False`). The latter for comparison to the old data reduction. data_dir: str, list Additional data search directories output_workspace: str If not specified it will be ``EQSANS_55555`` determined from the supplied value of ``run`` output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. kwargs: dict Additional positional arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`. Returns ------- ~mantid.api.IEventWorkspace Reference to the events workspace """ # use the generic functionality to do most of the work output_workspace = generic_load_events( run=run, data_dir=data_dir, output_workspace=output_workspace, output_suffix=output_suffix, scale_components=scale_components, pixel_calibration=pixel_calibration, detector_offset=detector_offset, sample_offset=sample_offset, **kwargs, ) # EQSANS specific part benefits from converting workspace to a string output_workspace = str(output_workspace) # Correct TOF of detector correct_detector_frame(output_workspace, path_to_pixel=path_to_pixel) # Correct TOF for emission time correct_emission_time(output_workspace) return mtd[output_workspace]
[docs] @namedtuplefy def load_events_and_histogram( run, scale_components=None, pixel_calibration=False, detector_offset=0.0, sample_offset=0.0, path_to_pixel=True, data_dir=None, output_workspace=None, output_suffix="", bin_width=0.1, low_tof_clip=500, high_tof_clip=2000, center_x=None, center_y=None, centering_method="center_of_mass", centering_options={}, mask=None, monitors=False, keep_events=True, sample_bands=None, **kwargs, ): r"""Load events from one or more NeXus files with initial corrections for geometry, time-of-flight and beam center. Convert to wavelength and sum. Note: Detector is translated along the Z-axis by the value specified in keyword ``detectorZ`` of the logs. The final sample to detector distance is detectorZ + detector_offset - sample_offset. Parameters ---------- run: list of runs to load Examples: ``55555`` or ``EQSANS_55555`` or file path or ``EQSANS_55555, EQSANS_55556`` scale_components: Optional[dict] Dictionary of component names and scaling factors in the form of a three-element list, indicating rescaling of the pixels in the component along the X, Y, and Z axes. For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2, leaving the other pixel dimensions unchanged. pixel_calibration: bool Adjust pixel heights and widths according to barscan and tube-width calibrations. detector_offset: float Additional translation of the detector along the Z-axis, in mm. Positive moves the detector downstream. sample_offset: float Additional translation of the sample, in mm. The sample flange remains at the origin of coordinates. Positive moves the sample downstream. path_to_pixel: bool When correcting the recorded time of flight of each neutron, use the path from the moderator to the detector pixel (`True`) or to the center of the detector panel (`False`). The latter for comparison to the old data reduction. data_dir: str, list Additional data search directories output_workspace: str If not specified it will be ``EQSANS_55555`` determined from the supplied value of ``run`` output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. bin_width: float Bin width for the output workspace, in Angstroms. low_tof_clip: float Ignore events with a time-of-flight (TOF) smaller than the minimal TOF plus this quantity. high_tof_clip: float Ignore events with a time-of-flight (TOF) bigger than the maximal TOF minus this quantity. center_x: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``x=0``. center_y: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``y=0``. centering_method: str Method to calculate the beam center. Available methods are: - 'center_of_mass', invokes :ref:`FindCenterOfMassPosition <algm-FindCenterOfMassPosition-v1>` - 'gaussian', 2D Gaussian fit to beam center data centering_options: dict Arguments to be passed on to the centering method. mask: mask file path, MaskWorkspace, list Additional mask to be applied. If `list`, it is a list of detector ID's. monitors: boolean Option to load the monitors as well as the data, if False monitor will be None keep_events: bool The final histogram will be an EventsWorkspace if True. sample_bands: bands or None sample bands kwargs: dict Additional positional arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`. Returns ------- namedtuple Fields of namedtuple data: the loaded data monitor: the monitor for the data, if monitors==True else None """ # If needed convert comma separated string list of workspaces in list of strings if isinstance(run, str): run = [r.strip() for r in run.split(",")] if output_workspace is not None: monitor_workspace = output_workspace + "_monitors" else: monitor_workspace = None # if only one run just load and transform to wavelength and return workspace if len(run) == 1: if monitors: ws_monitors = prepare_monitors(run[0], bin_width, monitor_workspace) else: ws_monitors = None ws = load_events( run=run[0], scale_components=scale_components, pixel_calibration=pixel_calibration, detector_offset=detector_offset, sample_offset=sample_offset, path_to_pixel=path_to_pixel, data_dir=data_dir, output_workspace=output_workspace, output_suffix=output_suffix, **kwargs, ) if center_x is None or center_y is None: center_x, center_y, _ = find_beam_center( ws, mask=mask, method=centering_method, centering_options=centering_options, ) center_detector(ws, center_x=center_x, center_y=center_y) # operates in-place ws, bands = transform_to_wavelength( ws, bin_width=bin_width, low_tof_clip=low_tof_clip, high_tof_clip=high_tof_clip, keep_events=keep_events, bands=sample_bands, ) ws = set_init_uncertainties(ws) return dict(data=ws, monitor=ws_monitors, bands=bands) else: # Load multiple runs if keep_events: raise NotImplementedError("Cannot merge runs together with keep_events=True.") instrument_unique_name = instrument_enum_name(run[0]) # determine which SANS instrument # create default name for output workspace, uses all input if (output_workspace is None) or (not output_workspace) or (output_workspace == "None"): output_workspace = "{}_{}{}".format( instrument_unique_name, "_".join(str(extract_run_number(r)) for r in run), output_suffix, ) # list of worksapce to sum temp_workspaces = [] temp_monitors_workspaces = [] # load and transform each workspace in turn for n, r in enumerate(run): temp_workspace_name = "__tmp_ws_{}".format(n) temp_monitors_workspace_name = temp_workspace_name + "_monitors" if monitors: prepare_monitors(r, bin_width, temp_monitors_workspace_name) load_events( run=r, scale_components=scale_components, pixel_calibration=pixel_calibration, detector_offset=detector_offset, sample_offset=sample_offset, path_to_pixel=path_to_pixel, data_dir=data_dir, output_workspace=temp_workspace_name, output_suffix=output_suffix, **kwargs, ) if center_x is None or center_y is None: center_x, center_y, _ = find_beam_center( temp_workspace_name, mask=mask, method=centering_method, centering_options=centering_options, ) center_detector(temp_workspace_name, center_x=center_x, center_y=center_y) # operates in-place # FIXME 792, whether the 2nd workspace shall use the bands from the first one? ws, bands = transform_to_wavelength( temp_workspace_name, bin_width=bin_width, low_tof_clip=low_tof_clip, high_tof_clip=high_tof_clip, keep_events=keep_events, ) assert ws temp_workspaces.append(temp_workspace_name) # Sum temporary loaded monitor workspaces if monitors: ws_monitors = sum_data(temp_monitors_workspaces, output_workspace=monitor_workspace) # After summing data re-calculate initial uncertainties ws_monitors = set_init_uncertainties(ws_monitors) else: ws_monitors = None # Sum temporary loaded workspaces # FIXME 792 sum data potential defect: wavelength range are different ws = sum_data(temp_workspaces, output_workspace=output_workspace) # After summing data re-calculate initial uncertainties ws = set_init_uncertainties(ws) # Remove temporary wokspace for ws_name in temp_workspaces: if mtd.doesExist(ws_name): mtd.remove(ws_name) return dict(data=ws, monitor=ws_monitors, bands=bands)
def load_and_split( run, detector_offset=0.0, sample_offset=0.0, path_to_pixel=True, data_dir=None, output_workspace=None, output_suffix="", overwrite_instrument=True, scale_components=None, pixel_calibration=False, time_interval: Union[float, List[float]] = None, time_offset: float = 0.0, time_period: float = None, log_name=None, log_value_interval=None, reuse_workspace=False, **kwargs, ): r"""Load an event NeXus file and filter into a WorkspaceGroup depending on the provided filter options. Either a time_interval must be provided or a log_name and log_value_interval. Parameters ---------- run: str, ~mantid.api.IEventWorkspace Examples: ``CG3_55555``, ``CG355555`` or file path. scale_components: Optional[dict] Dictionary of component names and scaling factors in the form of a three-element list, indicating rescaling of the pixels in the component along the X, Y, and Z axes. For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2, leaving the other pixel dimensions unchanged. pixel_calibration: bool Adjust pixel heights and widths according to bar-scan and tube-width calibrations. centering_method: str Method to calculate the beam center. Available methods are: - 'center_of_mass', invokes :ref:`FindCenterOfMassPosition <algm-FindCenterOfMassPosition-v1>` - 'gaussian', 2D Gaussian fit to beam center data centering_options: dict Arguments to be passed on to the centering method. time_interval Array for lengths of time intervals for splitters. If the array has one value, then all splitters will have same time intervals. If the size of the array is larger than one, then the splitters can have various time interval values. time_offset Offset to be added to the start time of the first splitter, in seconds. time_period A multiple integer of the time interval. If specified, it indicates that the time slicing is periodic so that events in time intervals separated by one (or more) period should be reduced together. Returns ------- ~tuple (WorkspaceGroup, Bands): Reference to the workspace groups containing all the split workspaces """ # FIXME the issues with the monitor on EQSANS has not been fixed. Enable normalization by monitor (issue #538) kwargs["monitors"] = False ws_group = generic_load_and_split( run=run, data_dir=data_dir, output_workspace=output_workspace, output_suffix=output_suffix, overwrite_instrument=overwrite_instrument, scale_components=scale_components, pixel_calibration=pixel_calibration, detector_offset=detector_offset, sample_offset=sample_offset, time_interval=time_interval, time_offset=time_offset, time_period=time_period, log_name=log_name, log_value_interval=log_value_interval, reuse_workspace=reuse_workspace, instrument_unique_name="EQSANS", **kwargs, ) for _w in ws_group: # Correct TOF of detector correct_detector_frame(_w, path_to_pixel=path_to_pixel) # Correct TOF for emission time correct_emission_time(_w) return ws_group
[docs] def load_and_split_and_histogram( run, detector_offset=0.0, sample_offset=0.0, path_to_pixel=True, data_dir=None, output_workspace=None, output_suffix="", overwrite_instrument=True, scale_components=None, pixel_calibration=False, bin_width=0.1, low_tof_clip=500, high_tof_clip=2000, center_x=None, center_y=None, centering_method="center_of_mass", centering_options={}, mask=None, keep_events=True, time_interval: Union[float, List[float]] = None, time_offset: float = 0.0, time_period: float = None, log_name=None, log_value_interval=None, reuse_workspace=False, **kwargs, ): r"""Load an event NeXus file and filter into a WorkspaceGroup depending on the provided filter options. Either a time_interval must be provided or a log_name and log_value_interval. Parameters ---------- run: str, ~mantid.api.IEventWorkspace Examples: ``CG3_55555``, ``CG355555`` or file path. scale_components: Optional[dict] Dictionary of component names and scaling factors in the form of a three-element list, indicating rescaling of the pixels in the component along the X, Y, and Z axes. For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2, leaving the other pixel dimensions unchanged. pixel_calibration: bool Adjust pixel heights and widths according to bar-scan and tube-width calibrations. centering_method: str Method to calculate the beam center. Available methods are: - 'center_of_mass', invokes :ref:`FindCenterOfMassPosition <algm-FindCenterOfMassPosition-v1>` - 'gaussian', 2D Gaussian fit to beam center data centering_options: dict Arguments to be passed on to the centering method. time_interval Array for lengths of time intervals for splitters. If the array has one value, then all splitters will have same time intervals. If the size of the array is larger than one, then the splitters can have various time interval values. time_offset Offset to be added to the start time of the first splitter, in seconds. time_period A multiple integer of the time interval. If specified, it indicates that the time slicing is periodic so that events in time intervals separated by one (or more) period should be reduced together. Returns ------- ~tuple (WorkspaceGroup, Bands): Reference to the workspace groups containing all the split workspaces """ ws_group = load_and_split( run, detector_offset=detector_offset, sample_offset=sample_offset, path_to_pixel=path_to_pixel, data_dir=data_dir, output_workspace=output_workspace, output_suffix=output_suffix, overwrite_instrument=overwrite_instrument, scale_components=scale_components, pixel_calibration=pixel_calibration, time_interval=time_interval, time_offset=time_offset, time_period=time_period, log_name=log_name, log_value_interval=log_value_interval, reuse_workspace=reuse_workspace, **kwargs, ) bands = None for _w in ws_group: if center_x is None or center_y is None: center_x, center_y, _ = find_beam_center( _w, mask=mask, method=centering_method, centering_options=centering_options, ) center_detector(_w, center_x=center_x, center_y=center_y) # operates in-place ws_i, c_bands = transform_to_wavelength( _w, bin_width=bin_width, low_tof_clip=low_tof_clip, high_tof_clip=high_tof_clip, keep_events=keep_events, ) set_init_uncertainties(_w) if bands is None: bands = c_bands return ws_group, bands