# standard modules
import h5py
from typing import List, Union
# third party modules
from mantid.dataobjects import Workspace2D
from mantid.api import AnalysisDataService
from mantid.kernel import Logger, amend_config
from mantid.simpleapi import (
AddSampleLogMultiple,
FilterEvents,
LoadEventNexus,
LoadEventAsWorkspace2D,
MergeRuns,
mtd,
ScaleInstrumentComponent,
)
from drtsans.geometry import (
translate_detector_by_z,
translate_sample_by_z,
translate_source_by_z,
)
# package modules
from drtsans.instruments import (
extract_run_number,
instrument_enum_name,
InstrumentEnumName,
instrument_facility_name,
is_time_of_flight,
)
from drtsans.path import abspath, registered_workspace, exists as path_exists
from drtsans.pixel_calibration import apply_calibrations
from drtsans.polarization import polarized_sample
from drtsans.samplelogs import SampleLogs
from drtsans.filterevents.factory import create_filter_strategy
__all__ = ["load_events", "sum_data", "load_and_split", "move_instrument"]
logger = Logger("drtsans.load")
def __monitor_counts(filename, monitor_name="monitor1"):
r"""Get the total number of counts in a single monitor
Parameters
----------
filename: str
Absolute path to the HDF5 file to be read
monitor_name: str
Name of the monitor to determine the total counts of
Raises
------
RuntimeError
The HDF5 file does not contain a monitor entry
"""
counts = 0 # default value is zero
with h5py.File(filename, "r") as handle:
if monitor_name not in handle["entry"]:
raise RuntimeError('File "{}" does not contain /entry/{}'.format(filename, monitor_name))
# open the monitor group
nxmonitor = handle["entry"][monitor_name]
# get the number of counts from the total counts array or the monitor array
if "total_counts" in nxmonitor:
counts = nxmonitor["total_counts"][0]
else:
counts = nxmonitor["event_time_offset"].shape[0]
return int(counts)
[docs]
def load_events(
run,
data_dir=None,
output_workspace=None,
overwrite_instrument=True,
output_suffix="",
scale_components=None,
pixel_calibration=False,
detector_offset=0.0,
sample_offset=0.0,
reuse_workspace=False,
**kwargs,
):
r"""
Load an event Nexus file produced by the instruments at ORNL.
Parameters
----------
run: str, ~mantid.api.IEventWorkspace
Examples: ``CG3_55555``, ``CG355555`` or file path.
output_workspace: str
If not specified it will be ``BIOSANS_55555`` determined from the supplied value of ``run``.
data_dir: str, list
Additional data search directories
overwrite_instrument: bool, str
If not :py:obj:`False`, ignore the instrument embedeed in the Nexus file. If :py:obj:`True`,
use the latest instrument definition file (IDF) available in Mantid. If ``str``,
then it should be the filepath to the desired IDF.
output_suffix: str
If the ``output_workspace`` is not specified, this is appended to the automatically generated
output workspace name.
scale_components: Optional[dict]
Dictionary of component names and scaling factors in the form of a three-element list,
indicating rescaling of the pixels in the component along the X, Y, and Z axes.
For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2,
leaving the other pixel dimensions unchanged.
pixel_calibration: bool, str
Adjust pixel heights and widths according to bar-scan and tube-width calibrations. Options are
(1) No calibration (2) Using default calibration file (True) and
(3) User specified calibration file (str)
detector_offset: float
Additional translation of the detector along the Z-axis, in mm. Positive
moves the detector downstream.
sample_offset: float
Additional translation of the sample, in mm. The sample flange remains
at the origin of coordinates. Positive moves the sample downstream.
reuse_workspace: bool
When true, return the ``output_workspace`` if it already exists
kwargs: dict
Additional positional arguments for loading algorithm;
:ref:`LoadEventNexus <algm-LoadEventNexus-v1>`,
or :ref:`LoadEventAsWorkspace2D <algm-LoadEventAsWorkspace2D-v1>`.
May contain filtering options such as ``FilterByTimeStart`` and ``FilterByTimeStop``.
Returns
-------
~mantid.api.IEventWorkspace
Reference to the events workspace
"""
instrument_unique_name = instrument_enum_name(run) # determine which SANS instrument
run_number = extract_run_number(run) if isinstance(run, str) else ""
filename = run if path_exists(run) else "{}{}".format(instrument_unique_name, run_number)
# create default name for output workspace
if (output_workspace is None) or (not output_workspace) or (output_workspace == "None"):
output_workspace = "{}_{}{}".format(instrument_unique_name, run_number, output_suffix)
# determine if this is a monochromatic measurement
is_mono = not is_time_of_flight(instrument_unique_name)
# retrieve or load workspace
if reuse_workspace and mtd.doesExist(output_workspace):
# if it exists skip loading
return mtd[output_workspace]
else:
# load the data into the appropriate workspace
facility = instrument_facility_name(instrument_unique_name)
with amend_config(facility=facility, instrument=str(instrument_unique_name), data_dir=data_dir):
# not loading the instrument xml from the nexus file will use the correct one that is inside mantid
# decide the value of LoadNexusInstrumentXML
# if not specified, determine by overwrite_instrument
if "LoadNexusInstrumentXML" not in kwargs:
kwargs["LoadNexusInstrumentXML"] = not overwrite_instrument
logger.notice(f"Loading {filename} to {output_workspace}")
# MetaDataOnly doesn't require loading events and transform to wavelength
if is_mono and all([kwarg not in kwargs for kwarg in ["LoadMonitors", "NumberOfBins", "MetaDataOnly"]]):
# For monochromatic non-event filtering workflows, LoadEventAsWorkspace2D is more efficient
# LoadEventAsWorkspace2D does not have MetaDataOnly as an argument parameter "MetaDataOnly"
LoadEventAsWorkspace2D(Filename=filename, OutputWorkspace=output_workspace, **kwargs)
else:
LoadEventNexus(Filename=filename, OutputWorkspace=output_workspace, **kwargs)
if isinstance(scale_components, dict):
for component, scalings in scale_components.items():
if isinstance(scalings, list) and len(scalings) == 3:
ScaleInstrumentComponent(
Workspace=mtd[output_workspace],
ComponentName=component,
Scalings=scalings,
ScalePixelSizes=True,
)
if pixel_calibration is not False: # pixel_calibration can be True or a string
# pixel calibration is specified as not False
if isinstance(pixel_calibration, str):
calib_file = pixel_calibration
else:
calib_file = None
apply_calibrations(output_workspace, database=calib_file)
# insert monitor counts for monochromatic instruments
if is_mono:
# determine the fully qualified file path
if "Filename" in mtd[output_workspace].run():
# from the existing workspace
filename = str(mtd[output_workspace].run()["Filename"].value)
else:
# use archive search
filename = str(abspath(filename))
# create new log with the monitor counts if monitor counts exists and the log is not already present
if "monitor" not in SampleLogs(output_workspace):
try:
SampleLogs(output_workspace).insert("monitor", __monitor_counts(filename))
except RuntimeError as e:
logger.warning(str(e)) # log a warning that monitor info not found in filename
# move instrument components - sample position must happen first
# Translate source along Z-axis
translate_source_by_z(output_workspace, z=None, relative=False)
# FIXME (485) - This shall be modified accordingly
from drtsans.geometry import sample_detector_distance
if is_mono:
# HFIR-SANS: use new method
out_ws = mtd[str(output_workspace)]
logger.notice(
f"Before translate source and sample:\n"
f"Sample position = {out_ws.getInstrument().getSample().getPos()}\n"
f"DAS SDD = "
f"{sample_detector_distance(output_workspace, search_logs=True, forbid_calculation=True)}\n"
f"Calculated SDD = {sample_detector_distance(output_workspace, search_logs=False)}"
)
# Determine detector and sample offset from meta data afterwards
else:
# For TOF (i.e., EQSANS), still translate sample and detector as usual
try:
# get DAS recorded SDD
das_sdd = sample_detector_distance(output_workspace, search_logs=True, forbid_calculation=True)
except RuntimeError as run_err:
# it may not exist
if "Unable to find any meta data related to SDD" in str(run_err):
# it is fine if das recorded SDD does not exist
das_sdd = None
else:
raise run_err
translate_sample_by_z(output_workspace, 1e-3 * float(sample_offset)) # convert sample offset from mm to meter
translate_detector_by_z(output_workspace, None) # search logs and translate if necessary
translate_detector_by_z(output_workspace, 1e-3 * float(detector_offset))
real_sdd = sample_detector_distance(output_workspace, search_logs=False)
logger.notice(f"EQSANS workspace {str(output_workspace)} SDD is equal to {real_sdd}")
if das_sdd is not None:
assert real_sdd == das_sdd, f"EQSANS DAS SDD = {das_sdd}, Calculated SDD = {real_sdd}"
return mtd[output_workspace]
[docs]
def move_instrument(
workspace,
sample_offset,
detector_offset,
is_mono=False,
sample_si_name=None,
si_window_to_nominal_distance=None,
):
"""Move instrument sample and detector
Parameters
----------
workspace:
Workspace with instrument's sample or detector translated along z-axis
sample_offset: float
sample offset in unit meter
detector_offset: float
detector offset in unit meter
is_mono: bool
Flag that it belongs to a mono-SANS
sample_si_name: str
Name of Sample to silicon window name
si_window_to_nominal_distance : float or None
distance between Silicon window and sample in unit of meter
Returns
-------
~mantid.api.IEventWorkspace, ~mantid.api.MatrixWorkspace
Workspace with instrument moved
"""
# Get workspace name
workspace_name = str(workspace)
# Move sample and detector
translate_sample_by_z(workspace, sample_offset)
translate_detector_by_z(workspace, detector_offset)
# Reset the SampleToSi log and sample_detector_distance log for mono-SANS
if is_mono:
logs = SampleLogs(workspace)
# Update sample-silicon-window distance
# curr_value = logs.find_log_with_units(sample_si_name, unit='mm')
# sample offset is at same direction to +Y, while 'SampleToSi' is toward -Y
# convert from meter to mm
new_value = (si_window_to_nominal_distance - sample_offset) * 1e3
AddSampleLogMultiple(
Workspace=workspace,
LogNames="{}".format(sample_si_name),
LogValues="{}".format(new_value),
LogUnits="mm",
)
# Adjust sample_to_detector_distance
curr_sdd = logs.find_log_with_units("sample_detector_distance", unit="m")
# shift shall be (-sample_offset + detector_offset)
curr_sdd += -sample_offset + detector_offset
# Set
AddSampleLogMultiple(
Workspace=workspace,
LogNames="{}".format("sample_detector_distance"),
LogValues="{}".format(curr_sdd),
LogUnits="m",
)
# END-IF
return mtd[workspace_name]
def _monitor_split_and_log(monitor: str, monitor_group: str, sample_group: str, is_mono: bool, filter_events: dict):
r"""
Split the monitor workspace and insert the total count from each split workspace as log entry
'monitor' in the corresponding split sample workspace.
Parameters
----------
monitor
name of the input events monitor workspace
monitor_group
name of the output WorkspaceGroup containing the split workspaces for the sample monitor
sample_group
name of the WorkspaceGroup containing the split workspaces for the sample
is_mono
monochromatic or time-of-flight instrument?
filter_events
options to be passed on to Mantid algorithm `FilterEvents`
"""
FilterEvents(
InputWorkspace=monitor,
OutputWorkspaceBaseName=monitor_group,
SpectrumWithoutDetector="Skip only if TOF correction",
**filter_events,
)
if is_mono:
splitted_workspaces_count = mtd[sample_group].getNumberOfEntries()
# sanity check
if mtd[monitor_group].getNumberOfEntries() != splitted_workspaces_count:
message = "Mismatch: the number of sample and monitor slices differ"
logger.error(message)
raise ValueError(message)
# insert sample-log entry "monitor"
for n in range(splitted_workspaces_count):
monitor_slice = mtd[monitor_group].getItem(n)
sample_slice = mtd[sample_group].getItem(n)
SampleLogs(sample_slice).insert("monitor", monitor_slice.getNumberEvents())
[docs]
def load_and_split(
run,
data_dir=None,
output_workspace=None,
output_suffix="",
overwrite_instrument=True,
scale_components=None,
pixel_calibration=False,
detector_offset=0.0,
sample_offset=0.0,
time_interval: Union[float, List[float]] = None,
time_offset: float = 0.0,
time_period: float = None,
log_name=None,
log_value_interval=None,
reuse_workspace=False,
monitors=False,
instrument_unique_name=None,
is_mono=None,
reduction_config: dict = None,
**kwargs,
):
r"""Load an event NeXus file and filter into a WorkspaceGroup depending
on the provided filter options. Either a time_interval must be
provided or a log_name and log_value_interval.
Metadata added to output workspace includes the ``slice`` number,
``number_of_slices``, ``slice_parameter``, ``slice_interval``,
``slice_start`` and ``slice_end``.
For EQSANS two WorkspaceGroup's are return, one for the filtered data and one for filtered monitors
Parameters
----------
run: str, ~mantid.api.IEventWorkspace
Examples: ``CG3_55555``, ``CG355555`` or file path.
data_dir: str, list
Additional data search directories
output_workspace: str
If not specified it will be ``BIOSANS_55555`` determined from the supplied value of ``run``.
output_suffix: str
If the ``output_workspace`` is not specified, this is appended to the automatically generated
output workspace name.
overwrite_instrument: bool, str
If not :py:obj:`False`, ignore the instrument embedeed in the Nexus file. If :py:obj:`True`, use the
latest instrument definition file (IDF) available in Mantid. If ``str``, then it should be the filepath to the
desired IDF.
scale_components: Optional[dict]
Dictionary of component names and scaling factors in the form of a three-element list,
indicating rescaling of the pixels in the component along the X, Y, and Z axes.
For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2,
leaving the other pixel dimensions unchanged.
pixel_calibration: bool
Adjust pixel heights and widths according to bar-scan and tube-width calibrations.
detector_offset: float
Additional translation of the detector along the Z-axis, in mm. Positive
moves the detector downstream.
sample_offset: float
Additional translation of the sample, in mm. The sample flange remains
at the origin of coordinates. Positive moves the sample downstream.
time_interval: float or list of floats
Array for lengths of time intervals for splitters. If the array has one value,
then all splitters will have same time intervals. If the size of the array is larger
than one, then the splitters can have various time interval values.
time_offset
Offset to be added to the start time of the first splitter, in seconds.
time_period
A multiple integer of the time interval, in seconds. If specified, it indicates that the time
slicing is periodic so that events in time intervals separated by one (or more) period
should be reduced together.
log_name: string
Name of the sample log to use to filter. For example, the pulse charge is recorded in 'ProtonCharge'.
log_value_interval: float
Delta of log value to be sliced into from min log value and max log value.
reuse_workspace: bool
When true, return the ``output_workspace`` if it already exists
reduction_config: dict
Dictionary containing all parameters under entry "configuration" of the input reduction options.
kwargs: dict
Additional positional arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`.
Returns
-------
Tuple[WorkspaceGroup, Optional[WorkspaceGroup]]
A tuple containing:
- The workspace group containing all the split sample data workspaces
- The workspace group containing all the split monitor workspaces (or None if monitors were not loaded)
"""
# check whether we have some slicing to do
polarized = polarized_sample(reduction_config) if reduction_config is not None else False
if not (time_interval or polarized or (log_name and log_value_interval)):
raise ValueError("Load and split called with no slicing parameters")
# find the unique instrument name, if necessary. Check `is_mono` consistent with the instrument's name
run = str(run)
if instrument_unique_name is None:
instrument_unique_name = instrument_enum_name(run)
if is_mono is None:
is_mono = instrument_unique_name in (InstrumentEnumName.BIOSANS, InstrumentEnumName.GPSANS)
elif is_mono is False:
assert instrument_unique_name == InstrumentEnumName.EQSANS, (
f"Invalid instrument name '{instrument_unique_name}' when is_mono=False"
)
elif is_mono is True:
assert instrument_unique_name in (InstrumentEnumName.BIOSANS, InstrumentEnumName.GPSANS), (
f"Invalid instrument name '{instrument_unique_name}' when is_mono=True"
)
# Check whether we need to load or not
if registered_workspace(run): # run is a workspace, or the name of a workspace in the AnalysisDataService
sample_workspace_already_exists = True
all_events_workspace = run
monitors = monitors or is_mono
else:
sample_workspace_already_exists = False
all_events_workspace = mtd.unique_hidden_name() # temporary workspace
monitors = monitors or is_mono
load_events(
run=run,
data_dir=data_dir,
output_workspace=all_events_workspace,
overwrite_instrument=overwrite_instrument,
scale_components=scale_components,
pixel_calibration=pixel_calibration,
output_suffix=output_suffix,
detector_offset=detector_offset,
sample_offset=sample_offset,
reuse_workspace=reuse_workspace,
**dict(kwargs, LoadMonitors=monitors),
)
# create default name for output workspace
if (output_workspace is None) or (not output_workspace) or (output_workspace == "None"):
run_number = extract_run_number(run) if isinstance(run, str) else ""
output_workspace = "{}_{}{}".format(instrument_unique_name, run_number, output_suffix)
# Create and apply filter strategy
filter_strategy = create_filter_strategy(
workspace=all_events_workspace,
reduction_config=reduction_config,
time_interval=time_interval,
time_offset=time_offset,
time_period=time_period,
log_name=log_name,
log_value_interval=log_value_interval,
)
# Apply filtering
filter_strategy.apply_filter(output_workspace)
assert is_mono is not None, "is_mono shall be either set or specified"
if monitors:
# Use the strategy's splitter and info workspace names for monitor filtering
filter_events_opts = dict(
SplitterWorkspace=filter_strategy.splitter_workspace,
InformationWorkspace=filter_strategy.info_workspace,
FilterByPulseTime=True,
GroupWorkspaces=True,
OutputWorkspaceIndexedFrom1=True,
)
_monitor_split_and_log(
monitor=all_events_workspace + "_monitors",
monitor_group=output_workspace + "_monitors",
sample_group=output_workspace,
is_mono=is_mono,
filter_events=filter_events_opts,
)
# Remove empty workspaces from event filtering
split_ws_list = [mtd[output_workspace].getItem(n) for n in range(mtd[output_workspace].getNumberOfEntries())]
if monitors:
group_workspace = mtd[output_workspace + "_monitors"]
split_monitors = [str(group_workspace.getItem(n)) for n in range(group_workspace.getNumberOfEntries())]
for i, split_ws in enumerate(split_ws_list):
num_events = split_ws.getNumberEvents()
if num_events == 0:
logger.notice(f"Remove empty sliced workspace {str(split_ws)}")
mtd[output_workspace].remove(str(split_ws))
if monitors:
mtd[output_workspace + "_monitors"].remove(split_monitors[i])
# Inject metadata using the strategy
filter_strategy.inject_metadata(output_workspace)
# Clean up temporary workspaces
if sample_workspace_already_exists is True:
logger.warning("Removing sample workspace, which already existed")
for name in [all_events_workspace, filter_strategy.splitter_workspace, filter_strategy.info_workspace]:
AnalysisDataService.remove(name)
if monitors:
AnalysisDataService.remove(all_events_workspace + "_monitors")
return mtd[output_workspace], mtd[output_workspace + "_monitors"]
else:
return mtd[output_workspace], None
[docs]
def sum_data(data_list, output_workspace, sum_logs=("duration", "timer", "monitor", "monitor1")):
r"""
Merge data set together, summing the listed logs
Parameters
----------
data_list: list of Workspace2D, list of workspace names, comma separated list of workspace names, WorkspaceGroup
Examples: [ws1, ws1], ['ws1', 'ws2'] or 'ws1, ws2'
output_workspace: str
Name of output workspace, required
sum_logs: list of str
numeric logs that will be summed during merging
Returns
-------
Workspace2D
"""
# If needed convert comma separated string list of workspaces in list of strings
if isinstance(data_list, str):
data_list = [data.strip() for data in data_list.split(",")]
# If only one input workpsace then just return that workspace
if len(data_list) == 0:
return mtd[data_list[0]]
# Check workspaces are of correct type
for data in data_list:
if not mtd.doesExist(str(data)):
raise ValueError("Workspace " + data + " does not exist")
if not isinstance(mtd[str(data)], Workspace2D):
raise ValueError(data + " is not a Workspace2D, this currently only works correctly for Workspace2D")
# Filter sum_logs list to only include logs that exist in data
sum_logs = [log for log in sum_logs if mtd[str(data_list[0])].getRun().hasProperty(log)]
# Merge workspaces together
MergeRuns(
InputWorkspaces=",".join(str(data) for data in data_list),
OutputWorkspace=output_workspace,
SampleLogsSum=",".join(sum_logs),
)
return mtd[output_workspace]