# standard imports
import os
from typing import List, Union
# third-party imports
from mantid.kernel import logger
from mantid.simpleapi import LoadHFIRSANS, HFIRSANS2Wavelength, mtd, SaveNexusProcessed
# the generic version is feature complete for monochromatic data
# local imports
from drtsans.geometry import sample_detector_distance
from drtsans.instruments import extract_run_number, instrument_enum_name
from drtsans.load import load_events, move_instrument, sum_data
from drtsans.load import load_and_split as generic_load_and_split
from drtsans.mono.meta_data import get_sample_detector_offset
from drtsans.process_uncertainties import set_init_uncertainties
from drtsans.samplelogs import SampleLogs
# Public API of this module. ``load_events``, ``sum_data`` and ``set_init_uncertainties`` are
# re-exported from ``drtsans.load`` and ``drtsans.process_uncertainties`` (see the imports above).
# NOTE(review): ``transform_to_wavelength`` is exported here but is neither imported nor defined in
# the portion of the file visible in this review - confirm it is defined elsewhere in the module.
__all__ = [
"load_events",
"sum_data",
"load_histogram",
"transform_to_wavelength",
"load_mono",
"load_events_and_histogram",
"load_and_split",
"set_init_uncertainties",
]
[docs]
def load_histogram(
    filename,
    output_workspace=None,
    wavelength=None,
    wavelength_spread=None,
    sample_det_cent=None,
):
    """Load a histogram SANS data file produced by the HFIR instruments at ORNL.

    The loading is delegated to Mantid's ``LoadHFIRSANS`` algorithm. The instrument
    geometry is also loaded, and the center of the detector is placed at
    (0, 0, :ref:`sample_det_cent <devdocs-standardnames>` )

    Parameters
    ----------
    filename : str
        The name of the input xml file to load
    output_workspace : str, optional
        Name of the output workspace. If :py:obj:`None`, the base name of ``filename``
        truncated at its first ``.`` is used.
    wavelength : float
        The wavelength value to use when loading the data file (Angstrom).
        This value will be used instead of the value found in the data file.
    wavelength_spread : float
        Wavelength spread value to use when loading the data file (Angstrom).
        This value will be used instead of the value found in the data file.
    sample_det_cent : float
        Sample to detector distance to use (overrides meta data) in mm

    Returns
    -------
    ~mantid.api.MatrixWorkspace
        A reference for the workspace created.
    """
    workspace_name = output_workspace
    if workspace_name is None:
        # default name: everything before the first dot of the file's base name
        workspace_name, _, _ = os.path.basename(filename).partition(".")

    loaded = LoadHFIRSANS(
        Filename=filename,
        Wavelength=wavelength,
        WavelengthSpread=wavelength_spread,
        SampleDetectorDistance=sample_det_cent,
        OutputWorkspace=workspace_name,
    )
    return loaded
[docs]
def load_mono(filename, **kwargs):
    r"""
    Load a SANS data file produced by the HFIR instruments at ORNL.

    The file is first loaded as event data with ``load_events``. If that attempt raises
    any :py:obj:`Exception`, the reason is logged at debug level and the file is loaded
    with ``load_histogram`` instead.

    Parameters
    ----------
    filename: int, str
        Examples: ``55555`` or ``CG3_55555`` or file path.
    kwargs:
        keyword arguments for load_events or load_histogram. The same keyword arguments
        are forwarded to whichever loader ends up being used.

    Returns
    -------
    ~mantid.api.MatrixWorkspace
    """
    try:
        return load_events(filename, **kwargs)
    except Exception as load_events_error:
        # Falling back is the expected path for histogram files, but do not lose the reason
        # why the event loader failed: it is the only hint when the fallback fails as well.
        logger.debug(f"load_events failed for {filename} ({load_events_error!r}); falling back to load_histogram")
        return load_histogram(filename, **kwargs)
[docs]
def load_events_and_histogram(
    run,
    data_dir=None,
    output_workspace=None,
    output_suffix="",
    overwrite_instrument=True,
    pixel_calibration=False,
    reuse_workspace=False,
    sample_to_si_name=None,
    si_nominal_distance=None,
    sample_to_si_value=None,
    sample_detector_distance_value=None,
    **kwargs,
):
    r"""Load one or more event Nexus file produced by the instruments at
    HFIR. Convert to wavelength and sums the data.

    Each run is loaded without applying any sample or detector offset, then the sample and
    detector are positioned with ``set_sample_detector_position`` and the data is transformed
    to wavelength. When several runs are given they are loaded into temporary workspaces
    named ``__tmp_ws_<index>``, summed into ``output_workspace`` and the temporary workspaces
    are removed. Initial uncertainties are (re)calculated on the final workspace.

    Parameters
    ----------
    run: str, list of runs to load
        Examples: ``CG3_55555``, ``CG355555``, file path, ``CG3_55555,CG3_55556``
    data_dir: str, list
        Additional data search directories
    output_workspace: str
        If not specified it will be ``BIOSANS_55555`` determined from the supplied value of ``run``.
    output_suffix: str
        If the ``output_workspace`` is not specified, this is appended to the automatically generated
        output workspace name.
    overwrite_instrument: bool, str
        If not :py:obj:`False`, ignore the instrument embedded in the Nexus file. If :py:obj:`True`, use the
        latest instrument definition file (IDF) available in Mantid. If ``str``, then it should be the filepath to the
        desired IDF.
    pixel_calibration: bool
        Adjust pixel heights and widths according to bar-scan and tube-width calibrations.
    reuse_workspace: bool
        When true, return the ``output_workspace`` if it already exists
    sample_to_si_name: str
        Meta data name for sample to Silicon window distance. Required: must not be :py:obj:`None`.
    si_nominal_distance: float
        distance between nominal position to silicon window. unit = meter
    sample_to_si_value: float or None
        Sample to silicon window distance to overwrite the EPICS value. None for no operation. unit = meter
    sample_detector_distance_value: float or None
        Sample to detector distance to overwrite the EPICS value. None for no operation. unit = meter
    kwargs: dict
        Additional keyword arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`.

    Returns
    -------
    ~mantid.api.MatrixWorkspace

    Raises
    ------
    NotImplementedError
        If ``sample_to_si_name`` is :py:obj:`None`.
    RuntimeError
        If ``run`` is neither a string nor a list.
    """
    # Check inputs
    if sample_to_si_name is None:
        raise NotImplementedError(f"For {run} Sample to Si window name must be specified thus cannot be None")

    # If needed convert comma separated string of runs into a list of strings
    if isinstance(run, str):
        runs = [r.strip() for r in run.split(",")]
    else:
        runs = run

    # sanity check
    if not isinstance(runs, list):
        raise RuntimeError(f"runs {runs} of type {type(runs)} must be a list at this stage")

    single_run = len(runs) == 1

    # Specify a default name for non-single run output workspace
    # (``not output_workspace`` already covers ``None`` and the empty string)
    if not single_run and (not output_workspace or output_workspace == "None"):
        # create default name for output workspace, uses all input
        instrument_unique_name = instrument_enum_name(runs[0])  # determine which SANS instrument
        output_workspace = "{}_{}{}".format(
            instrument_unique_name,
            "_".join(str(extract_run_number(r)) for r in runs),
            output_suffix,
        )

    # Load NeXus file(s): collect the workspaces to sum.
    # The loop variable is deliberately not called ``run`` so the argument is not rebound.
    temp_workspaces = []
    for index, single in enumerate(runs):
        # a single run is loaded straight into its final workspace, otherwise into a temporary one
        output_ws_name = output_workspace if single_run else "__tmp_ws_{}".format(index)

        # Load event but not move sample or detector position by meta data
        ws = load_events(
            run=single,
            data_dir=data_dir,
            output_workspace=output_ws_name,
            overwrite_instrument=overwrite_instrument,
            output_suffix=output_suffix,
            pixel_calibration=pixel_calibration,
            detector_offset=0.0,
            sample_offset=0.0,
            reuse_workspace=reuse_workspace,
            **kwargs,
        )

        # Calculate offset with overwriting to sample-detector-distance; keep the returned
        # workspace reference, as load_and_split does
        ws = set_sample_detector_position(
            ws,
            sample_to_si_window_name=sample_to_si_name,
            si_window_to_nominal_distance=si_nominal_distance,
            sample_si_window_overwrite_value=sample_to_si_value,
            sample_detector_distance_overwrite_value=sample_detector_distance_value,
        )

        # Transform to wavelength
        ws = transform_to_wavelength(ws)
        temp_workspaces.append(ws)

    # Sum over all the workspaces if needed
    if single_run:
        out_ws = temp_workspaces[0]
    else:
        # Sum temporary loaded workspaces
        out_ws = sum_data(temp_workspaces, output_workspace=output_workspace)
        # Remove temporary workspaces
        for ws_i in temp_workspaces:
            ws_name = str(ws_i)
            if mtd.doesExist(ws_name):
                mtd.remove(ws_name)

    # Set uncertainty: After summing data re-calculate initial uncertainties
    out_ws = set_init_uncertainties(out_ws)
    return out_ws
def set_sample_detector_position(
    ws,
    sample_to_si_window_name,
    si_window_to_nominal_distance,
    sample_si_window_overwrite_value,
    sample_detector_distance_overwrite_value,
):
    """Calculate sample and detector offset from default position from geometry-related meta data
    and move the main detector and/or sample to correct position

    The sample-to-detector distance (SDD) found in the logs is first compared with the one
    calculated from the instrument geometry. After moving the instrument, the calculated SDD
    is compared with the expected one; on mismatch the diagnostics are logged, the workspace
    is saved to ``mono_sans_run_geometry_error.nxs`` and an error is raised.

    Parameters
    ----------
    ws: ~mantid.api.MatrixWorkspace
        Workspace where the instrument is for sample detector position to set correctly
    sample_to_si_window_name: str
        meta data name for Sample to Silicon window distance
    si_window_to_nominal_distance: float
        Silicon window to nominal position distance in unit of meter
    sample_si_window_overwrite_value: float or None
        value to overwrite sample to silicon window distance in unit of meter
        None for not overwriting
    sample_detector_distance_overwrite_value: float or None
        value to overwrite sample to detector distance in unit of meter
        None for not overwriting

    Returns
    -------
    ~mantid.api.MatrixWorkspace
        The workspace returned by ``move_instrument`` after the sample and detector are moved.

    Raises
    ------
    RuntimeError
        If the SDD from the logs and the SDD calculated from the geometry differ by more than
        1 mm before moving, or if the calculated SDD differs from the expected SDD by more than
        1e-3 mm after moving.
    """
    # Information output before
    logs = SampleLogs(ws)

    # Input verification: DAS record SDD must be same as calculated SDD
    das_sdd = sample_detector_distance(ws, search_logs=True, unit="mm", forbid_calculation=True)
    real_sdd = sample_detector_distance(ws, search_logs=False, unit="mm")
    if abs(das_sdd - real_sdd) > 1.0:
        raise RuntimeError(
            f"Workspace {str(ws)}: after loading and initial setup, DAS SDD ({das_sdd}) "
            f"is not equal to calculated/real SDD ({real_sdd}) by proportion as "
            f"{abs(das_sdd - real_sdd)/das_sdd}"
        )

    # Get original sample detector distance: find expected SDD (in mm) for further verification
    if sample_detector_distance_overwrite_value is None:
        # respect the das-recorded SDD
        expected_sdd = sample_detector_distance(ws, search_logs=True, unit="mm")
        if sample_si_window_overwrite_value is not None:
            # the log value is scaled by 1e-3 to get meter, i.e. it is presumably stored in mm
            das_sample_si_distance = ws.getRun().getProperty(sample_to_si_window_name).value.mean() * 1e-3
            shift = sample_si_window_overwrite_value - das_sample_si_distance  # meter
            expected_sdd += shift * 1e3
    else:
        # SDD overwrite value: input is meter
        expected_sdd = sample_detector_distance_overwrite_value * 1000

    # record some raw (prior to any processing) geometry information
    prior_geom_info = (
        f"{ws}: \n"
        f"Prior to any geometry correction:\n"
        f"Sample to detector distance = {sample_detector_distance(ws, search_logs=False)} "
        f"(calculated) vs {sample_detector_distance(ws, search_logs=True)} (meta) mm.\n"
        f' SampleToSi = {logs.find_log_with_units(sample_to_si_window_name, unit="mm")} mm\n'
        f"Overwrite Values = {sample_si_window_overwrite_value}, "
        f"{sample_detector_distance_overwrite_value}\n"
    )

    # Calculate sample and detector offsets for moving
    sample_offset, detector_offset = get_sample_detector_offset(
        ws,
        sample_si_meta_name=sample_to_si_window_name,
        zero_sample_offset_sample_si_distance=si_window_to_nominal_distance,
        overwrite_sample_si_distance=sample_si_window_overwrite_value,
        overwrite_sample_detector_distance=sample_detector_distance_overwrite_value,
    )
    # log
    prior_geom_info += "Sample offset = {}, Detector offset = {}\n".format(sample_offset, detector_offset)

    # Move sample and detector
    ws = move_instrument(
        ws,
        sample_offset,
        detector_offset,
        is_mono=True,
        sample_si_name=sample_to_si_window_name,
        si_window_to_nominal_distance=si_window_to_nominal_distance,
    )

    # Check current instrument setup and meta data (sample logs)
    logger.notice(
        "{} Sample to detector distance = {} (calculated) vs {} (meta) mm"
        "".format(
            str(ws),
            sample_detector_distance(ws, search_logs=False),
            sample_detector_distance(ws, search_logs=True),
        )
    )

    # Verification: the SDD calculated from the moved instrument must match the expected SDD.
    calculated_sdd = sample_detector_distance(ws, search_logs=False, unit="mm")
    # FIXME (inherited): earlier comments here quoted 0.01 mm and 0.02 mm as the tolerance,
    # but the value actually enforced is 1e-3 mm - revisit which tolerance is intended.
    criteria_mm = 1e-3
    if abs(expected_sdd - calculated_sdd) > criteria_mm:
        error_file = "mono_sans_run_geometry_error.nxs"

        # the logs are read again because the workspace has been moved since the first read
        logs = SampleLogs(ws)
        prior_geom_info += (
            f"Result from geometry operation:\n"
            f"Sample position = {ws.getInstrument().getSample().getPos()}\n"
            f'SampleToSi = {logs.find_log_with_units(sample_to_si_window_name, unit="mm")} '
            f"mm (From Log)\n"
        )
        # add detector information
        # NOTE(review): 192 * 256 - 1 is presumably the index of the last pixel of the main
        # detector - confirm it is valid for every instrument using this function
        last_detector_index = 192 * 256 - 1
        prior_geom_info += f"Detector[0] pos = {ws.getDetector(0).getPos()}\n"
        prior_geom_info += f"Detector[{last_detector_index}] = {ws.getDetector(last_detector_index).getPos()}\n"
        detector_translation = ws.getRun().getProperty("detector_trans_Readback")
        shift_det_x = detector_translation.value
        shift_det_x_unit = detector_translation.units
        prior_geom_info += f"Detector translation X-axis = {shift_det_x} ({shift_det_x_unit})\n"

        # form error message
        error_msg = (
            f"Error: ws = {str(ws)}:\n"
            f"Expected SDD = {expected_sdd} (mm), "
            f"Overwrite SDD = {sample_detector_distance_overwrite_value}, "
            f"Calculated SDD = {calculated_sdd} (mm). "
            f"Error = {abs(expected_sdd - calculated_sdd)} > {criteria_mm}.\n"
            f"FYI:\n {prior_geom_info}\n"
            f"Failed workspace is saved to {error_file}"
        )
        logger.error(error_msg)

        # Save workspace for further investigation
        SaveNexusProcessed(
            InputWorkspace=ws,
            Filename=error_file,
            Title=f"from workspace {str(ws)}",
        )
        raise RuntimeError(error_msg)

    return ws
[docs]
def load_and_split(
    run,
    sample_to_si_name,
    si_nominal_distance,
    data_dir=None,
    output_workspace=None,
    output_suffix="",
    overwrite_instrument=True,
    pixel_calibration=False,
    time_interval: Union[float, List[float]] = None,
    time_offset: float = 0.0,
    time_period: float = None,
    log_name=None,
    log_value_interval=None,
    sample_to_si_value=None,
    sample_detector_distance_value=None,
    reuse_workspace=False,
    monitors=False,
    **kwargs,
):
    r"""Load an event NeXus file, fix the sample/detector geometry and split the events.

    The run is loaded (always with ``LoadMonitors=True``) into the workspace ``_load_tmp``
    without applying any sample or detector offset. The sample and detector are then
    positioned with ``set_sample_detector_position`` and the resulting workspace is handed
    to the generic ``drtsans.load.load_and_split`` for filtering. Either a ``time_interval``
    or a ``log_name`` together with a ``log_value_interval`` must be provided.

    Metadata added to output workspace includes the ``slice`` number,
    ``number_of_slices``, ``slice_parameter``, ``slice_interval``,
    ``slice_start`` and ``slice_end``.

    Parameters
    ----------
    run: str, ~mantid.api.IEventWorkspace
        Examples: ``CG3_55555``, ``CG355555`` or file path.
    sample_to_si_name: str
        Meta data name for sample to Silicon window distance
    si_nominal_distance: float
        distance between nominal position to silicon window. unit = meter
    data_dir: str, list
        Additional data search directories
    output_workspace: str
        If not specified (``None``, empty or the string ``"None"``) the name is built from the
        instrument name, the run number (only when ``run`` is a string) and ``output_suffix``,
        e.g. ``BIOSANS_55555``.
    output_suffix: str
        If the ``output_workspace`` is not specified, this is appended to the automatically generated
        output workspace name.
    overwrite_instrument: bool, str
        If not :py:obj:`False`, ignore the instrument embedded in the Nexus file. If :py:obj:`True`, use the
        latest instrument definition file (IDF) available in Mantid. If ``str``, then it should be the filepath to the
        desired IDF.
    pixel_calibration: bool
        Adjust pixel heights and widths according to bar-scan and tube-width calibrations.
    time_interval
        Array for lengths of time intervals for splitters. If the array has one value,
        then all splitters will have same time intervals. If the size of the array is larger
        than one, then the splitters can have various time interval values.
    time_offset
        Offset to be added to the start time of the first splitter, in seconds.
    time_period
        A multiple integer of the time interval. If specified, it indicates that the time
        slicing is periodic so that events in time intervals separated by one (or more) period
        should be reduced together.
    log_name: string
        Name of the sample log to use to filter. For example, the pulse charge is recorded in 'ProtonCharge'.
    log_value_interval: float
        Delta of log value to be sliced into from min log value and max log value.
    sample_to_si_value: float or None
        Sample to silicon window distance to overwrite the EPICS value. None for no operation. unit = meter
    sample_detector_distance_value: float or None
        Sample to detector distance to overwrite the EPICS value. None for no operation. unit = meter
    reuse_workspace: bool
        When true, return the ``output_workspace`` if it already exists
    monitors: bool
        flag to load monitors; forwarded to the generic ``load_and_split``
    kwargs: dict
        Additional keyword arguments for :ref:`LoadEventNexus <algm-LoadEventNexus-v1>`.

    Returns
    -------
    WorkspaceGroup
        Reference to the workspace groups containing all the split workspaces, as returned by
        the generic ``load_and_split``.
    """
    # Step 1: load the events untouched (no offsets applied); monitors are always requested
    load_options = {**kwargs, "LoadMonitors": True}
    loaded_ws = load_events(
        run=run,
        data_dir=data_dir,
        output_workspace="_load_tmp",
        overwrite_instrument=overwrite_instrument,
        pixel_calibration=pixel_calibration,
        output_suffix=output_suffix,
        detector_offset=0.0,
        sample_offset=0.0,
        reuse_workspace=reuse_workspace,
        **load_options,
    )

    # Step 2: place sample and detector, honoring the optional overwrite values
    positioned_ws = set_sample_detector_position(
        loaded_ws,
        sample_to_si_window_name=sample_to_si_name,
        si_window_to_nominal_distance=si_nominal_distance,
        sample_si_window_overwrite_value=sample_to_si_value,
        sample_detector_distance_overwrite_value=sample_detector_distance_value,
    )

    # Step 3: figure out the instrument and, if needed, a default output workspace name
    instrument_name = instrument_enum_name(run)
    if not output_workspace or output_workspace == "None":
        if isinstance(run, str):
            run_number = extract_run_number(run)
        else:
            # no run number can be extracted when a workspace was passed in
            run_number = ""
        output_workspace = "{}_{}{}".format(instrument_name, run_number, output_suffix)

    # Step 4: let the generic implementation split the already positioned workspace
    return generic_load_and_split(
        run=positioned_ws,
        data_dir=data_dir,
        output_workspace=output_workspace,
        overwrite_instrument=overwrite_instrument,
        output_suffix=output_suffix,
        detector_offset=0.0,
        sample_offset=0.0,
        time_interval=time_interval,
        time_offset=time_offset,
        time_period=time_period,
        log_name=log_name,
        log_value_interval=log_value_interval,
        reuse_workspace=reuse_workspace,
        monitors=monitors,
        instrument_unique_name=instrument_name,
        is_mono=True,
        **kwargs,
    )