r"""
Links to mantid algorithms
CloneWorkspace <https://docs.mantidproject.org/nightly/algorithms/CloneWorkspace-v1.html>
ConvertToHistogram <https://docs.mantidproject.org/nightly/algorithms/ConvertToHistogram-v1.html>
DeleteWorkspace <https://docs.mantidproject.org/nightly/algorithms/DeleteWorkspace-v1.html>
Divide <https://docs.mantidproject.org/nightly/algorithms/Divide-v1.html>
Load <https://docs.mantidproject.org/nightly/algorithms/Load-v1.html>
LoadAscii <https://docs.mantidproject.org/nightly/algorithms/LoadAscii-v1.html>
Multiply <https://docs.mantidproject.org/nightly/algorithms/Multiply-v1.html>
NormaliseByCurrent <https://docs.mantidproject.org/nightly/algorithms/NormaliseByCurrent-v1.html>
RebinToWorkspace <https://docs.mantidproject.org/nightly/algorithms/RebinToWorkspace-v1.html>
RemoveSpectra <https://docs.mantidproject.org/nightly/algorithms/RemoveSpectra-v1.html>
Scale <https://docs.mantidproject.org/nightly/algorithms/Scale-v1.html>
SplineInterpolation <https://docs.mantidproject.org/nightly/algorithms/SplineInterpolation-v1.html>
"""
from mantid.simpleapi import (
CloneWorkspace,
ConvertToHistogram,
DeleteWorkspace,
Divide,
Load,
LoadAscii,
Multiply,
NormaliseByCurrent,
RebinToWorkspace,
RemoveSpectra,
Scale,
SplineInterpolation,
)
from mantid.api import mtd
import numpy as np
r"""
Hyperlinks to drtsans functions
SampleLogs <https://github.com/neutrons/drtsans/blob/next/src/drtsans/samplelogs.py>
path <https://github.com/neutrons/drtsans/blob/next/src/drtsans/path.py>
duration <https://github.com/neutrons/drtsans/blob/next/src/drtsans/tof/eqsans/dark_current.py>
""" # noqa: E501
from drtsans.samplelogs import SampleLogs
from drtsans import path
from drtsans.dark_current import duration as run_duration
__all__ = [
"normalize_by_flux",
"normalize_by_time",
"normalize_by_monitor",
"normalize_by_proton_charge_and_flux",
"ZeroNeutronFluxError",
]
[docs]
class ZeroNeutronFluxError(ValueError):
pass
def load_beam_flux_file(flux, data_workspace=None, output_workspace=None):
r"""
Loads the beam flux spectrum file.
**Mantid algorithms used:**
:ref:`LoadAscii <algm-LoadAscii-v1>`,
:ref:`ConvertToHistogram <algm-ConvertToHistogram-v1>`,
:ref:`ConvertToDistribution <algm-ConvertToDistribution-v1>`,
:ref:`NormaliseToUnity <algm-NormaliseToUnity-v1>`,
:ref:`RebinToWorkspace <algm-RebinToWorkspace-v1>`,
Parameters
----------
flux: str
Path to file with the wavelength distribution of the neutron
flux. Loader is Mantid `LoadAscii` algorithm.
data_workspace : str, MatrixWorkspace
Workspace to rebin the flux to. If None, then no rebin is performed
output_workspace: str
Name of the output workspace. If None, a hidden random name
will be assigned.
Returns
-------
MatrixWorkspace
"""
if output_workspace is None:
output_workspace = mtd.unique_hidden_name() # make a hidden workspace
# Load flux filename to a point-data workspace (we have as many intensities as wavelength values)
LoadAscii(
Filename=flux,
Separator="Tab",
Unit="Wavelength",
OutputWorkspace=output_workspace,
)
# In histogram data we have as many intensities as wavelength bins
ConvertToHistogram(InputWorkspace=output_workspace, OutputWorkspace=output_workspace)
if data_workspace is not None:
RebinToWorkspace(
WorkspaceToRebin=output_workspace,
WorkspaceToMatch=data_workspace,
OutputWorkspace=output_workspace,
)
return mtd[output_workspace]
[docs]
def normalize_by_proton_charge_and_flux(input_workspace, flux, output_workspace=None):
r"""
Normalizes the input workspace by proton charge and measured flux
**Mantid algorithms used:**
:ref:`RebinToWorkspace <algm-Divide-v1>`,
:ref:`Divide <algm-Divide-v1>`,
:ref:`DeleteWorkspace <algm-DeleteWorkspace-v1>`,
:ref:`NormaliseByCurrent <algm-NormaliseByCurrent-v1>`,
Parameters
----------
input_workspace : str, MatrixWorkspace
Workspace to be normalized, rebinned in wavelength.
flux : Workspace
Measured beam flux file ws, usually the output of `load_beam_flux_file`
output_workspace : str
Name of the normalized workspace. If None, the name of the input
workspace is chosen (the input workspace is overwritten).
Returns
-------
MatrixWorkspace
"""
if output_workspace is None:
output_workspace = str(input_workspace)
# Match the binning of the input workspace prior to carry out the division
rebinned_flux = mtd.unique_hidden_name()
RebinToWorkspace(
WorkspaceToRebin=flux,
WorkspaceToMatch=input_workspace,
OutputWorkspace=rebinned_flux,
)
# Normalize by the flux
Divide(
LHSWorkspace=input_workspace,
RHSWorkspace=rebinned_flux,
OutputWorkspace=output_workspace,
)
DeleteWorkspace(rebinned_flux) # remove the temporary rebinned flux workspace
# Normalize by the proton charge
try:
NormaliseByCurrent(InputWorkspace=output_workspace, OutputWorkspace=output_workspace)
except RuntimeError:
raise ZeroNeutronFluxError(f"Zero neutron flux for workspace: {output_workspace}")
return mtd[output_workspace]
def load_flux_to_monitor_ratio_file(
flux_to_monitor_ratio_file,
data_workspace=None,
loader_kwargs=dict(),
output_workspace=None,
):
r"""
Loads the flux-to-monitor ratio
**Mantid algorithms used:**
:ref:`Load <algm-Load-v1>`,
:ref:`ConvertToHistogram <algm-Divide-v1>`,
:ref:`SplineInterpolation <algm-Divide-v1>`,
Parameters
----------
flux_to_monitor_ratio_file: str
Path to file with the flux-to-monitor ratio data. Loader is
Mantid `LoadAscii` algorithm.
data_workspace: str, MatrixWorkspace
Match the binning of the flux-to-monitor workspace to that of the data workspace.
loader_kwargs: dict
optional keyword arguments to Mantid's Load algorithm.
output_workspace: str
Name of the output workspace. If None, a hidden random name
will be assigned.
Returns
-------
MatrixWorkspace
"""
if output_workspace is None:
output_workspace = mtd.unique_hidden_name() # make a hidden workspace
# Let Mantid figure out what kind file format is the flux file
Load(Filename=flux_to_monitor_ratio_file, OutputWorkspace=output_workspace, **loader_kwargs)
ConvertToHistogram(InputWorkspace=output_workspace, OutputWorkspace=output_workspace)
if data_workspace is not None:
SplineInterpolation(
WorkspaceToMatch=data_workspace,
WorkspaceToInterpolate=output_workspace,
OutputWorkspace=output_workspace,
)
return mtd[output_workspace]
[docs]
def normalize_by_monitor(input_workspace, flux_to_monitor, monitor_workspace, output_workspace=None):
r"""
Normalizes the input workspace by monitor count and flux-to-monitor
ratio.
**Mantid algorithms used:**
:ref:`RebinToWorkspace <algm-Divide-v1>`,
:ref:`RemoveSpectra <algm-RemoveSpectra-v1>`,
:ref:`CloneWorkspace <algm-CloneWorkspace-v1>`,
:ref:`SplineInterpolation <algm-SplineInterpolation-v1>`,
:ref:`Multiply <algm-Multiply-v1>`,
:ref:`Divide <algm-Divide-v1>`,
:ref:`DeleteWorkspace <algm-DeleteWorkspace-v1>`
Parameters
----------
input_workspace : str, MatrixWorkspace
Workspace to be normalized, rebinned in wavelength.
flux_to_monitor : str, MatrixWorkspace
Flux to monitor ratio. A file path or a workspace resulting from
calling `load_flux_to_monitor_ratio_file`.
monitor_workspace : str, MatrixWorkspace
Counts from the monitor.
output_workspace : str
Name of the normalized workspace. If None, the name of the input
workspace is chosen (the input workspace is overwritten).
Returns
-------
MatrixWorkspace
"""
if output_workspace is None:
output_workspace = str(input_workspace)
# Check non-skip mode
if bool(SampleLogs(input_workspace).is_frame_skipping.value) is True:
msg = "Normalization by monitor not possible in frame-skipping mode"
raise ValueError(msg)
# Only the first spectrum of the monitor is required
monitor_workspace_rebinned = mtd.unique_hidden_name()
RebinToWorkspace(monitor_workspace, input_workspace, OutputWorkspace=monitor_workspace_rebinned)
excess_idx = range(1, mtd[monitor_workspace_rebinned].getNumberHistograms()) # only one spectrum is needed
RemoveSpectra(
monitor_workspace_rebinned,
WorkspaceIndices=excess_idx,
OutputWorkspace=monitor_workspace_rebinned,
)
# Elucidate the nature of the flux to monitor input
flux_to_monitor_workspace = mtd.unique_hidden_name()
if isinstance(flux_to_monitor, str) and path.exists(flux_to_monitor):
load_flux_to_monitor_ratio_file(
flux_to_monitor,
data_workspace=input_workspace,
output_workspace=flux_to_monitor_workspace,
)
else:
CloneWorkspace(flux_to_monitor, OutputWorkspace=flux_to_monitor_workspace)
# Match the binning to that of the input workspace. Necessary prior to division
SplineInterpolation(
WorkspaceToMatch=input_workspace,
WorkspaceToInterpolate=flux_to_monitor_workspace,
OutputWorkspace=flux_to_monitor_workspace,
)
# the neutron flux integrated over the duration of the run is the product of the monitor counts and the
# flux-to-monitor ratios
flux_workspace = mtd.unique_hidden_name()
Multiply(
monitor_workspace_rebinned,
flux_to_monitor_workspace,
OutputWorkspace=flux_workspace,
)
# if the neutron flux is zero we don't want to continue, raise an error
if np.count_nonzero(mtd[flux_workspace].readY(0)) == 0:
raise ZeroNeutronFluxError(f"Zero neutron flux for workspace: {output_workspace}")
# Normalize our input workspace
Divide(
LHSWorkspace=input_workspace,
RHSWorkspace=flux_workspace,
OutputWorkspace=output_workspace,
)
# Clean the dust balls
[
DeleteWorkspace(name)
for name in (
flux_to_monitor_workspace,
flux_workspace,
monitor_workspace_rebinned,
)
]
return mtd[output_workspace]
[docs]
def normalize_by_time(input_workspace, log_key=None, output_workspace=None):
r"""
Divide the counts by the duration of the run
**Mantid algorithms used:**
:ref:`Scale <algm-Scale-v1>`
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace
log_key: str
Use this log entry to figure out the run duration. If :py:obj:`None`,
logs are sequentially searched for keys ``duration``, ``start_time``,
``proton_charge``, and ``timer``, in order to find out the duration.
output_workspace : str
Name of the normalized workspace. If :py:obj:`None`, the name of the input
workspace is chosen (and the input workspace is overwritten).
Returns
-------
~mantid.api.MatrixWorkspace
"""
if output_workspace is None:
output_workspace = str(input_workspace)
duration = run_duration(input_workspace, log_key=log_key)
Scale(
input_workspace,
Factor=1.0 / duration.value,
Operation="Multiply",
OutputWorkspace=output_workspace,
)
SampleLogs(output_workspace).insert("normalizing_duration", duration.log_key)
return mtd[output_workspace]
[docs]
def normalize_by_flux(
input_workspace,
flux,
method="proton charge",
monitor_workspace=None,
output_workspace=None,
):
r"""
Normalize counts by several methods to estimate the neutron flux.
This function calls specialized normalizing functions based on ``method`` argument.
Those functions are:
- normalize_by_time
- normalize_by_monitor
- normalize_by_proton_charge_and_flux
Parameters
----------
input_workspace: ~mantid.api.MatrixWorkspace
Input workspace, binned in wavelength
flux: str
If ``method`` is 'proton charge', flux is the path to the file
containing the wavelength distribution of the neutron flux. If
``method`` is 'monitor', then flux is the path to the file containing
a pre-measured flux-to-monitor ratio spectrum. If ``flux_method``
is 'time', then pass one log entry name such as 'duration' or pass
:py:obj:`None` for automatic log search.
method: str
Either 'proton charge', 'monitor', or 'time'
monitor_workspace: str, ~mantid.api.MatrixWorkspace
Prepared monitor workspace
output_workspace : str
Name of the normalized workspace. If :py:obj:`None`, the name of the input
workspace is chosen (the input workspace is overwritten).
Returns
-------
~mantid.api.MatrixWorkspace
"""
if output_workspace is None:
output_workspace = str(input_workspace)
# Use the appropriate flux file loader
if method == "proton charge":
w_flux = load_beam_flux_file(flux, data_workspace=input_workspace)
elif method == "monitor":
w_flux = load_flux_to_monitor_ratio_file(flux, data_workspace=input_workspace)
else:
w_flux = None
# Select the normalization function
normalizer = {
"proton charge": normalize_by_proton_charge_and_flux,
"monitor": normalize_by_monitor,
"time": normalize_by_time,
}
# Arguments specific to the normalizer
args = {"proton charge": [w_flux], "monitor": [w_flux, monitor_workspace]}
args = args.get(method, list())
kwargs = {"time": dict(log_key=flux)}
kwargs = kwargs.get(method, dict())
normalizer[method](input_workspace, *args, output_workspace=output_workspace, **kwargs)
# A bit of cleanup
if w_flux:
w_flux.delete()
return mtd[output_workspace]