# Move part of the methods from api.py to avoid importing in loops
import os
from collections import namedtuple
from typing import Dict, List, Tuple
import numpy as np
from mantid.simpleapi import (
SaveAscii,
mtd,
) # noqa E402
# Import rolled up to complete a single top-level API
from drtsans import ( # noqa E402
apply_sensitivity_correction,
solid_angle_correction,
subtract_background, # noqa E402
)
from drtsans.dataobjects import IQmod, IQazimuthal # noqa E402
from drtsans.iq import bin_all # noqa E402
from drtsans.mask_utils import apply_mask # noqa E402
from drtsans.thickness_normalization import normalize_by_thickness # noqa E402
from drtsans.tof.eqsans.correction_api import (
CorrectionConfiguration,
do_inelastic_incoherence_correction,
save_k_vector,
)
from drtsans.tof.eqsans.dark_current import subtract_dark_current # noqa E402
from drtsans.tof.eqsans.elastic_reference_normalization import (
normalize_by_elastic_reference_all,
)
from drtsans.tof.eqsans.normalization import normalize_by_flux # noqa E402
from drtsans.tof.eqsans.transmission import calculate_transmission # noqa E402
from drtsans.transmission import apply_transmission_correction # noqa E402
# Binning parameters
BinningSetup = namedtuple(
"binning_setup",
"nxbins_main nybins_main n1dbins n1dbins_per_decade "
"decade_on_center bin1d_type log_scale qmin, qmax, qxrange, qyrange",
)
[docs]
def prepare_data_workspaces(
data: namedtuple,
dark_current=None,
flux_method=None, # normalization (proton charge/time/monitor)
flux=None, # additional file for normalization
mask_ws=None, # apply a custom mask from workspace
mask_panel=None, # mask back or front panel
mask_btp=None, # mask bank/tube/pixel
solid_angle=True,
sensitivity_workspace=None,
output_workspace=None,
):
r"""
Given a " raw"data workspace, this function provides the following:
- subtracts dark current
- normalize by time or monitor
- applies masks
- corrects for solid angle
- corrects for sensitivity
All steps are optional. data, mask_ws, dark_current are either None
or histogram workspaces. This function does not load any file.
Parameters
----------
data: namedtuple
(~mantid.dataobjects.Workspace2D, ~mantid.dataobjects.Workspace2D)
raw workspace (histogram) for data and monitor
dark_current: ~mantid.dataobjects.Workspace2D
histogram workspace containing the dark current measurement
flux_method: str
Method for flux normalization. Either 'monitor', or 'time'.
flux: str
if ``flux_method`` is proton charge, then path to file containing the
wavelength distribution of the neutron flux. If ``flux method`` is
monitor, then path to file containing the flux-to-monitor ratios.
if ``flux_method`` is time, then pass one log entry name such
as ``duration`` or leave it as :py:obj:`None` for automatic log search.
mask_ws: ~mantid.dataobjects.Workspace2D
Mask workspace
mask_panel: str
Either 'front' or 'back' to mask whole front or back panel.
mask_btp: dict
Additional properties to Mantid's MaskBTP algorithm
solid_angle: bool
Apply the solid angle correction
sensitivity_workspace: str, ~mantid.api.MatrixWorkspace
workspace containing previously calculated sensitivity correction. This
overrides the sensitivity_filename if both are provided.
output_workspace: str
The output workspace name. If None will create data.name()+output_suffix
Returns
-------
~mantid.dataobjects.Workspace2D
Reference to the processed workspace
"""
if not output_workspace:
output_workspace = str(data.data)
output_workspace = output_workspace.replace("_raw_histo", "") + "_processed_histo"
mtd[str(data.data)].clone(OutputWorkspace=output_workspace) # name gets into workspace
# Dark current
if dark_current is not None and dark_current.data is not None:
subtract_dark_current(output_workspace, dark_current.data)
# Normalization
if flux_method is not None:
kw = dict(method=flux_method, output_workspace=output_workspace)
if flux_method == "monitor":
kw["monitor_workspace"] = data.monitor
normalize_by_flux(output_workspace, flux, **kw)
# Additional masks
if mask_btp is None:
mask_btp = dict()
apply_mask(output_workspace, panel=mask_panel, mask=mask_ws, **mask_btp)
# Solid angle
if solid_angle:
solid_angle_correction(output_workspace)
# Sensitivity
if sensitivity_workspace is not None:
apply_sensitivity_correction(output_workspace, sensitivity_workspace=sensitivity_workspace)
return mtd[output_workspace]
# NOTE: transformed from block of codes inside reduce_single_configuration
# for calculating transmission
[docs]
def process_transmission(
transmission_ws,
empty_trans_ws,
transmission_radius,
sensitivity_ws,
flux_method,
flux,
prefix,
type_name,
output_dir,
output_file_name,
):
# sample transmission
processed_transmission_dict = {} # for output log
raw_transmission_dict = {} # for output log
if transmission_ws.data is not None and empty_trans_ws is not None:
# process transition workspace from raw
processed_trans_ws_name = f"{prefix}_{type_name}_trans" # type_name: sample/background
processed_trans_ws = prepare_data_workspaces(
transmission_ws,
flux_method=flux_method,
flux=flux,
solid_angle=False,
sensitivity_workspace=sensitivity_ws,
output_workspace=processed_trans_ws_name,
)
# calculate transmission with fit function (default) Formula=a*x+b'
calculated_trans_ws = calculate_transmission(
processed_trans_ws,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
)
print(f"{type_name} transmission =", calculated_trans_ws.extractY()[0, 0])
# optionally save
if output_dir:
# save calculated transmission
transmission_filename = os.path.join(output_dir, f"{output_file_name}_trans.txt")
SaveAscii(calculated_trans_ws, Filename=transmission_filename)
# Prepare result for drtsans.savereductionlog
processed_transmission_dict["value"] = calculated_trans_ws.extractY()
processed_transmission_dict["error"] = calculated_trans_ws.extractE()
processed_transmission_dict["wavelengths"] = calculated_trans_ws.extractX()
# Prepare result for drtsans.savereductionlog including raw sample transmission
sample_trans_raw_ws = calculate_transmission(
processed_trans_ws,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
fit_function="",
)
raw_tr_fn = os.path.join(output_dir, f"{output_file_name}_raw_trans.txt")
SaveAscii(sample_trans_raw_ws, Filename=raw_tr_fn)
# Prepare result for drtsans.savereductionlog
raw_transmission_dict["value"] = sample_trans_raw_ws.extractY()
raw_transmission_dict["error"] = sample_trans_raw_ws.extractE()
raw_transmission_dict["wavelengths"] = sample_trans_raw_ws.extractX()
else:
calculated_trans_ws = None
return calculated_trans_ws, processed_transmission_dict, raw_transmission_dict
[docs]
def bin_i_with_correction(
iq1d_in_frames: List[IQmod],
iq2d_in_frames: List[IQazimuthal],
frameskip_frame: int,
slice_name: str,
weighted_errors: bool,
user_qmin: float,
user_qmax: float,
num_x_bins: int,
num_y_bins: int,
num_q1d_bins: int,
num_q1d_bins_per_decade: int,
decade_on_center: bool,
bin1d_type: str,
log_binning: bool,
annular_bin: float,
wedges: List[Tuple[int, int]],
symmetric_wedges: bool,
correction_setup: CorrectionConfiguration,
iq1d_elastic_ref_fr: List[IQmod],
iq2d_elastic_ref_fr: List[IQazimuthal],
raw_name: str,
output_dir: str,
output_filename: str = "",
) -> Tuple[IQazimuthal, List[IQmod]]:
"""Bin I(Q) in 1D and 2D with the option to do elastic and/or inelastic incoherent correction
Parameters
----------
iq1d_in_frames: list[~drtsans.dataobjects.IQmod]
Objects containing 1D unbinned data I(\|Q\|). It will be used for scalar binned data
iq2d_in_frames: list[~drtsans.dataobjects.IQazimuthal]
Objects containing 2D unbinned data I(Qx, Qy). It will be used for 2D binned data,
and 1D wedge or annular binned data
frameskip_frame: int
Index of the frame in ``iq1d_in_frames`` and ``iq2d_in_frames`` to bin and correct
weighted_errors: bool
If True, the binning is done using the Weighted method
user_qmin: float
Minimum value of the momentum transfer modulus Q
user_qmax: float
Maximum value of the momentum transfer modulus Q
num_x_bins: int
Number of bins in the x direction for 2D binning
num_y_bins: int
Number of bins in the y direction for 2D binning
num_q1d_bins: int
Number of bins for the 1d binning.
num_q1d_bins_per_decade: int
Total number of bins will be this value multiplied by
number of decades from X min to X max
decade_on_center: bool
Flag to have the min X and max X on bin center; Otherwise, they will be on bin boundary
bin1d_type: str
Type of binning for 1D data. Possible choices are 'scalar', 'annular', or 'wedge'
log_binning: bool
If True, 1D scalar or wedge binning will be logarithmic. Ignored for anything else
annular_bin: float
Width of annular bin in degrees. Annular binning is linear
wedges: list
List of tuples (angle_min, angle_max) for the wedges. Both numbers have to be in
the [-90,270) range. It will add the wedge offset by 180 degrees dependent
on ``symmetric_wedges``
symmetric_wedges: bool
It will add the wedge offset by 180 degrees if True
correction_setup: ~drtsans.tof.eqsans.correction_api.CorrectionConfiguration
Parameters for elastic and inelastic/incoherence scattering correction
iq1d_elastic_ref_fr: list[~drtsans.dataobjects.IQmod]
Objects containing 1D unbinned data I(\|Q\|) for the elastic reference run
iq2d_elastic_ref_fr: list[~drtsans.dataobjects.IQazimuthal]
Objects containing 2D unbinned data I(Qx, Qy) for the elastic reference run
raw_name: str
Prefix for file to save inelastic incoherent correction (B) data
output_dir: str
Output directory for I(Q) profiles
output_filename: str
Output filename parsed from input configuration file (JSON)
Returns
-------
(~drtsans.dataobjects.IQazimuthal, list[~drtsans.dataobjects.IQmod])
- Binned and optionally corrected IQazimuthal
- List of binned and optionally corrected IQmod objects. The list has length
1, unless the 'wedge' mode is selected, when the length is the number of
original wedges
"""
# Setup for corrections
if correction_setup.do_elastic_correction or any(correction_setup.do_inelastic_correction):
# If any correction is turned on, then weighted_errors is always true
weighted_errors = True
# Define qmin and qmax for this frame
if user_qmin is None:
qmin = iq1d_in_frames[frameskip_frame].mod_q.min()
else:
qmin = user_qmin
if user_qmax is None:
qmax = iq1d_in_frames[frameskip_frame].mod_q.max()
else:
qmax = user_qmax
# Set qxrange and qyrange for this frame
qxrange = np.min(iq2d_in_frames[frameskip_frame].qx), np.max(iq2d_in_frames[frameskip_frame].qx)
qyrange = np.min(iq2d_in_frames[frameskip_frame].qy), np.max(iq2d_in_frames[frameskip_frame].qy)
# Bin I(Q1D, wl) and I(Q2D, wl) in Q and (Qx, Qy) space respectively but not wavelength
iq2d_main_wl, iq1d_main_wl = bin_all(
iq2d_in_frames[frameskip_frame],
iq1d_in_frames[frameskip_frame],
num_x_bins,
num_y_bins,
n1dbins=num_q1d_bins,
n1dbins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin,
qmax=qmax,
qxrange=qxrange,
qyrange=qyrange,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
n_wavelength_bin=None,
)
# Check due to functional limitation
assert isinstance(iq1d_main_wl, list), f"Output I(Q) must be a list but not a {type(iq1d_main_wl)}"
if len(iq1d_main_wl) != 1:
raise NotImplementedError(f"Not expected that there are more than 1 IQmod main but {len(iq1d_main_wl)}")
else:
...
# Elastic correction
if correction_setup.do_elastic_correction:
elastic_output_dir = os.path.join(
output_dir, "info", "elastic_norm", output_filename, slice_name, f"frame_{frameskip_frame}"
)
os.makedirs(elastic_output_dir, exist_ok=True)
k_file_prefix = f"{raw_name}"
# Bin elastic reference run
if iq1d_elastic_ref_fr:
# bin the reference elastic runs of the current frame
iq2d_elastic_wl, iq1d_elastic_wl = bin_all(
iq2d_elastic_ref_fr[frameskip_frame],
iq1d_elastic_ref_fr[frameskip_frame],
num_x_bins,
num_y_bins,
n1dbins=num_q1d_bins,
n1dbins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin,
qmax=qmax,
qxrange=qxrange,
qyrange=qyrange,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
n_wavelength_bin=None,
)
if len(iq1d_elastic_wl) != 1:
raise NotImplementedError("Not expected that there are more than 1 IQmod of elastic reference run.")
iq2d_main_wl, iq1d_wl, k_vec, k_error_vec = normalize_by_elastic_reference_all(
iq2d_main_wl,
iq1d_main_wl[0],
iq1d_elastic_wl[0],
correction_setup.output_wavelength_dependent_profile,
elastic_output_dir,
)
iq1d_main_wl[0] = iq1d_wl
# write
save_k_vector(
iq1d_wl.wavelength,
k_vec,
k_error_vec,
path=os.path.join(elastic_output_dir, f"{output_filename}_elastic_k1d_{k_file_prefix}.dat"),
)
# Inelastic incoherence correction
if correction_setup.do_inelastic_correction[frameskip_frame]:
inelastic_output_dir = os.path.join(
output_dir, "info", "inelastic_incoh", output_filename, slice_name, f"frame_{frameskip_frame}"
)
os.makedirs(inelastic_output_dir, exist_ok=True)
b_file_prefix = f"{raw_name}"
# 1D correction
iq2d_main_wl, iq1d_wl = do_inelastic_incoherence_correction(
iq2d_main_wl,
iq1d_main_wl[0],
frameskip_frame,
correction_setup,
b_file_prefix,
inelastic_output_dir,
output_filename,
)
iq1d_main_wl[0] = iq1d_wl
if not correction_setup.do_elastic_correction and not any(correction_setup.do_inelastic_correction):
finite_iq1d = iq1d_in_frames[frameskip_frame]
finite_iq2d = iq2d_in_frames[frameskip_frame]
qmin = user_qmin
qmax = user_qmax
else:
# Be finite
finite_iq1d = iq1d_main_wl[0].be_finite()
finite_iq2d = iq2d_main_wl.be_finite()
# Bin binned I(Q1D, wl) and and binned I(Q2D, wl) in wavelength space
assert len(iq1d_main_wl) == 1, (
f"It is assumed that output I(Q) list contains 1 I(Q) but not {len(iq1d_main_wl)}"
)
# Bin output in wavelength space
iq2d_main_out, iq1d_main_out = bin_all(
finite_iq2d,
finite_iq1d,
num_x_bins,
num_y_bins,
n1dbins=num_q1d_bins,
n1dbins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin,
qmax=qmax,
qxrange=None,
qyrange=None,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
# When set to true, reduces high uncertainty in the high-Q limit when low statistics
error_weighted=weighted_errors,
)
return iq2d_main_out, iq1d_main_out
[docs]
def remove_workspaces(
reduction_config: Dict,
instrument_name: str,
prefix: str,
sample_run_number,
center_run_number,
extra_run_numbers: List,
):
"""Helping method to remove existing workspaces"""
from drtsans.instruments import extract_run_number # noqa E402
from drtsans.path import registered_workspace # noqa E402
# In the future this should be made optional
ws_to_remove = [f"{prefix}_{instrument_name}_{run_number}_raw_histo" for run_number in extra_run_numbers]
# List special workspaces and workspace groups
ws_to_remove.append(f"{prefix}_{instrument_name}_{sample_run_number}_raw_histo_slice_group")
ws_to_remove.append(f"{prefix}_{instrument_name}_{center_run_number}_raw_events")
ws_to_remove.append(f"{prefix}_sensitivity")
ws_to_remove.append(f"{prefix}_mask")
if reduction_config["darkFileName"]:
run_number = extract_run_number(reduction_config["darkFileName"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
for ws_name in ws_to_remove:
# Remove existing workspaces, this is to guarantee that all the data is loaded correctly
if registered_workspace(ws_name):
mtd.remove(ws_name)