# Move part of the methods from api.py to avoid importing in loops
import os
from collections import namedtuple
from typing import Dict, List, Tuple
from mantid.simpleapi import (
SaveAscii,
mtd,
) # noqa E402
from mantid.kernel import logger # noqa E402
# Import rolled up to complete a single top-level API
from drtsans import ( # noqa E402
apply_sensitivity_correction,
solid_angle_correction,
subtract_background, # noqa E402
)
from drtsans.dataobjects import IQmod, IQazimuthal # noqa E402
from drtsans.iq import bin_all # noqa E402
from drtsans.mask_utils import apply_mask # noqa E402
from drtsans.thickness_normalization import normalize_by_thickness # noqa E402
from drtsans.tof.eqsans.blocked_beam import subtract_blocked_beam # noqa E402
from drtsans.tof.eqsans.correction_api import (
CorrectionConfiguration,
)
from drtsans.tof.eqsans.dark_current import subtract_dark_current # noqa E402
from drtsans.tof.eqsans.elastic_correction import (
elastic_correction,
)
from drtsans.tof.eqsans.inelastic_correction import inelastic_correction
from drtsans.tof.eqsans.normalization import normalize_by_flux # noqa E402
from drtsans.tof.eqsans.transmission import calculate_transmission # noqa E402
from drtsans.transmission import apply_transmission_correction # noqa E402
# Binning parameters
BinningSetup = namedtuple(
"binning_setup",
"nxbins_main nybins_main n1dbins n1dbins_per_decade "
"decade_on_center bin1d_type log_scale qmin, qmax, qxrange, qyrange",
)
[docs]
def prepare_data_workspaces(
data: namedtuple,
dark_current=None,
flux_method=None, # normalization (proton charge/time/monitor)
flux=None, # additional file for normalization
mask_ws=None, # apply a custom mask from workspace
mask_panel=None, # mask back or front panel
mask_btp=None, # mask bank/tube/pixel
solid_angle=True,
sensitivity_workspace=None,
blocked_beam=None, # blocked beam workspace
output_workspace=None,
):
r"""
Given a " raw"data workspace, this function provides the following:
- subtracts dark current
- normalize by time or monitor
- applies masks
- corrects for solid angle
- corrects for sensitivity
All steps are optional. data, mask_ws, dark_current are either None
or histogram workspaces. This function does not load any file.
Parameters
----------
data: namedtuple
(~mantid.dataobjects.Workspace2D, ~mantid.dataobjects.Workspace2D)
raw workspace (histogram) for data and monitor
dark_current: ~mantid.dataobjects.Workspace2D
histogram workspace containing the dark current measurement
flux_method: str
Method for flux normalization. Either 'monitor', or 'time'.
flux: str
if ``flux_method`` is proton charge, then path to file containing the
wavelength distribution of the neutron flux. If ``flux method`` is
monitor, then path to file containing the flux-to-monitor ratios.
if ``flux_method`` is time, then pass one log entry name such
as ``duration`` or leave it as :py:obj:`None` for automatic log search.
mask_ws: ~mantid.dataobjects.Workspace2D
Mask workspace
mask_panel: str
Either 'front' or 'back' to mask whole front or back panel.
mask_btp: dict
Additional properties to Mantid's MaskBTP algorithm
solid_angle: bool
Apply the solid angle correction
sensitivity_workspace: str, ~mantid.api.MatrixWorkspace
workspace containing previously calculated sensitivity correction. This
overrides the sensitivity_filename if both are provided.
blocked_beam: ~mantid.dataobjects.Workspace2D
histogram workspace containing the blocked beam measurement
output_workspace: str
The output workspace name. If None will create data.name()+output_suffix
Returns
-------
~mantid.dataobjects.Workspace2D
Reference to the processed workspace
"""
if not output_workspace:
output_workspace = str(data.data)
output_workspace = output_workspace.replace("_raw_histo", "") + "_processed_histo"
mtd[str(data.data)].clone(OutputWorkspace=output_workspace) # name gets into workspace
# Dark current
if dark_current is not None and dark_current.data is not None:
subtract_dark_current(output_workspace, dark_current.data)
# Normalization
if flux_method is not None:
kw = dict(method=flux_method, output_workspace=output_workspace)
if flux_method == "monitor":
kw["monitor_workspace"] = data.monitor
normalize_by_flux(output_workspace, flux, **kw)
# Blocked Beam
subtract_blocked_beam(
output_workspace, blocked_beam, flux_method=flux_method, flux=flux, dark_current=dark_current
)
# Additional masks
if mask_btp is None:
mask_btp = dict()
apply_mask(output_workspace, panel=mask_panel, mask=mask_ws, **mask_btp)
# Solid angle
if solid_angle:
solid_angle_correction(output_workspace)
# Sensitivity
if sensitivity_workspace is not None:
apply_sensitivity_correction(output_workspace, sensitivity_workspace=sensitivity_workspace)
return mtd[output_workspace]
# NOTE: transformed from block of codes inside reduce_single_configuration
# for calculating transmission
[docs]
def process_transmission(
transmission_ws,
empty_trans_ws,
transmission_radius,
sensitivity_ws,
flux_method,
flux,
prefix,
type_name,
output_dir,
output_file_name,
):
# sample transmission
processed_transmission_dict = {} # for output log
raw_transmission_dict = {} # for output log
if transmission_ws.data is not None and empty_trans_ws is not None:
# process transition workspace from raw
processed_trans_ws_name = f"{prefix}_{type_name}_trans" # type_name: sample/background
processed_trans_ws = prepare_data_workspaces(
transmission_ws,
flux_method=flux_method,
flux=flux,
solid_angle=False,
sensitivity_workspace=sensitivity_ws,
output_workspace=processed_trans_ws_name,
)
# calculate transmission with fit function (default) Formula=a*x+b'
calculated_trans_ws = calculate_transmission(
processed_trans_ws,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
)
print(f"{type_name} transmission =", calculated_trans_ws.extractY()[0, 0])
# optionally save
if output_dir:
# save calculated transmission
transmission_filename = os.path.join(output_dir, f"{output_file_name}_trans.txt")
SaveAscii(calculated_trans_ws, Filename=transmission_filename)
# Prepare result for drtsans.savereductionlog
processed_transmission_dict["value"] = calculated_trans_ws.extractY()
processed_transmission_dict["error"] = calculated_trans_ws.extractE()
processed_transmission_dict["wavelengths"] = calculated_trans_ws.extractX()
# Prepare result for drtsans.savereductionlog including raw sample transmission
sample_trans_raw_ws = calculate_transmission(
processed_trans_ws,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
fit_function="",
)
raw_tr_fn = os.path.join(output_dir, f"{output_file_name}_raw_trans.txt")
SaveAscii(sample_trans_raw_ws, Filename=raw_tr_fn)
# Prepare result for drtsans.savereductionlog
raw_transmission_dict["value"] = sample_trans_raw_ws.extractY()
raw_transmission_dict["error"] = sample_trans_raw_ws.extractE()
raw_transmission_dict["wavelengths"] = sample_trans_raw_ws.extractX()
else:
calculated_trans_ws = None
return calculated_trans_ws, processed_transmission_dict, raw_transmission_dict
[docs]
def bin_i_with_correction(
iq1d_in_frames: List[IQmod],
iq2d_in_frames: List[IQazimuthal],
frameskip_frame: int,
slice_name: str,
weighted_errors: bool,
user_qmin: float,
user_qmax: float,
num_x_bins: int,
num_y_bins: int,
num_q1d_bins: int,
num_q1d_bins_per_decade: int,
decade_on_center: bool,
bin1d_type: str,
log_binning: bool,
annular_bin: float,
wedges: List[Tuple[int, int]],
symmetric_wedges: bool,
correction_setup: CorrectionConfiguration,
iq1d_elastic_ref_fr: List[IQmod],
iq2d_elastic_ref_fr: List[IQazimuthal],
raw_name: str,
output_dir: str,
output_filename: str = "",
) -> Tuple[IQazimuthal, List[IQmod]]:
"""Bin I(Q) in 1D and 2D with the option to do elastic and/or inelastic incoherent correction
Parameters
----------
iq1d_in_frames: list[~drtsans.dataobjects.IQmod]
Objects containing 1D unbinned data I(|Q|). It will be used for scalar binned data
iq2d_in_frames: list[~drtsans.dataobjects.IQazimuthal]
Objects containing 2D unbinned data I(Qx, Qy). It will be used for 2D binned data,
and 1D wedge or annular binned data
frameskip_frame: int
Index of the frame in ``iq1d_in_frames`` and ``iq2d_in_frames`` to bin and correct
weighted_errors: bool
If True, the binning is done using the Weighted method
user_qmin: float
Minimum value of the momentum transfer modulus Q
user_qmax: float
Maximum value of the momentum transfer modulus Q
num_x_bins: int
Number of bins in the x direction for 2D binning
num_y_bins: int
Number of bins in the y direction for 2D binning
num_q1d_bins: int
Number of bins for the 1d binning.
num_q1d_bins_per_decade: int
Total number of bins will be this value multiplied by
number of decades from X min to X max
decade_on_center: bool
Flag to have the min X and max X on bin center; Otherwise, they will be on bin boundary
bin1d_type: str
Type of binning for 1D data. Possible choices are 'scalar', 'annular', or 'wedge'
log_binning: bool
If True, 1D scalar or wedge binning will be logarithmic. Ignored for anything else
annular_bin: float
Width of annular bin in degrees. Annular binning is linear
wedges: list
List of tuples (angle_min, angle_max) for the wedges. Both numbers have to be in
the [-90,270) range. It will add the wedge offset by 180 degrees dependent
on ``symmetric_wedges``
symmetric_wedges: bool
It will add the wedge offset by 180 degrees if True
correction_setup: ~drtsans.tof.eqsans.correction_api.CorrectionConfiguration
Parameters for elastic and inelastic/incoherence scattering correction
iq1d_elastic_ref_fr: list[~drtsans.dataobjects.IQmod]
Objects containing 1D unbinned data I(|Q|) for the elastic reference run
iq2d_elastic_ref_fr: list[~drtsans.dataobjects.IQazimuthal]
Objects containing 2D unbinned data I(Qx, Qy) for the elastic reference run
raw_name: str
Prefix for file to save inelastic incoherent correction (B) data
output_dir: str
Output directory for I(Q) profiles
output_filename: str
Output filename parsed from input configuration file (JSON)
Returns
-------
(~drtsans.dataobjects.IQazimuthal, list[~drtsans.dataobjects.IQmod])
- Binned and optionally corrected IQazimuthal
- List of binned and optionally corrected IQmod objects. The list has length
1, unless the 'wedge' mode is selected, when the length is the number of
original wedges
"""
# Get unbinned data for this frame
iq2d = iq2d_in_frames[frameskip_frame]
iq1d = iq1d_in_frames[frameskip_frame]
# EWM-13940: Preserve original errors for binning consistency
iq2d_orig_errors = iq2d.error.copy()
iq1d_orig_errors = iq1d.error.copy()
# Apply elastic correction if requested
if correction_setup.do_elastic_correction and iq1d_elastic_ref_fr and iq2d_elastic_ref_fr:
# Build output directory with slice and frame info
elastic_dir = os.path.join(
output_dir, "info", "elastic_norm", output_filename, slice_name, f"frame_{frameskip_frame}"
)
iq2d, iq1d = elastic_correction(
iq2d_unbinned=iq2d,
iq1d_unbinned=iq1d,
iq2d_elastic_ref=iq2d_elastic_ref_fr[frameskip_frame],
iq1d_elastic_ref=iq1d_elastic_ref_fr[frameskip_frame],
num_x_bins=num_x_bins,
num_y_bins=num_y_bins,
num_q1d_bins=num_q1d_bins,
num_q1d_bins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_binning=log_binning,
user_qmin=user_qmin,
user_qmax=user_qmax,
annular_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
weighted_errors=True, # allways use weighted errors when computing the elastic correction
output_wavelength_profile=correction_setup.output_wavelength_dependent_profile,
output_dir=elastic_dir,
output_filename=output_filename,
raw_name=raw_name,
)
# EWM-13940: After elastic correction, replace errors with originals for binning consistency
# This ensures scalar and wedge modes use the same binning weights
iq2d = IQazimuthal(
intensity=iq2d.intensity,
error=iq2d_orig_errors,
qx=iq2d.qx,
qy=iq2d.qy,
wavelength=iq2d.wavelength,
delta_qx=iq2d.delta_qx,
delta_qy=iq2d.delta_qy,
)
iq1d = IQmod(
intensity=iq1d.intensity,
error=iq1d_orig_errors,
mod_q=iq1d.mod_q,
wavelength=iq1d.wavelength,
delta_mod_q=iq1d.delta_mod_q,
)
# Apply inelastic correction if requested
if correction_setup.do_inelastic_correction[frameskip_frame]:
# Build output directory with slice and frame info
inelastic_dir = os.path.join(
output_dir, "info", "inelastic_incoh", output_filename, slice_name, f"frame_{frameskip_frame}"
)
iq2d, iq1d = inelastic_correction(
iq2d_unbinned=iq2d,
iq1d_unbinned=iq1d,
num_x_bins=num_x_bins,
num_y_bins=num_y_bins,
num_q1d_bins=num_q1d_bins,
num_q1d_bins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_binning=log_binning,
user_qmin=user_qmin,
user_qmax=user_qmax,
annular_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
weighted_errors=True, # allways use weighted errors when computing the inelastic correction
select_min_incoherence=correction_setup.select_min_incoherence,
intensity_weighted=correction_setup.select_intensityweighted[frameskip_frame],
incoh_qmin=correction_setup.qmin[frameskip_frame],
incoh_qmax=correction_setup.qmax[frameskip_frame],
incoh_factor=correction_setup.factor[frameskip_frame],
output_wavelength_profile=correction_setup.output_wavelength_dependent_profile,
output_dir=inelastic_dir,
output_filename=output_filename,
raw_name=raw_name,
)
# EWM-13940: After inelastic correction, replace errors with originals for binning consistency
# This ensures scalar and wedge modes use the same binning weights
iq2d = IQazimuthal(
intensity=iq2d.intensity,
error=iq2d_orig_errors,
qx=iq2d.qx,
qy=iq2d.qy,
wavelength=iq2d.wavelength,
delta_qx=iq2d.delta_qx,
delta_qy=iq2d.delta_qy,
)
iq1d = IQmod(
intensity=iq1d.intensity,
error=iq1d_orig_errors,
mod_q=iq1d.mod_q,
wavelength=iq1d.wavelength,
delta_mod_q=iq1d.delta_mod_q,
)
# Remove non-finite values before final binning
finite_iq2d = iq2d.be_finite()
finite_iq1d = iq1d.be_finite()
# ONE FINAL BINNING: Bin corrected (or uncorrected) unbinned data
# This is the ONLY binning that produces output - the "One Rebin Only" pattern
iq2d_main_out, iq1d_main_out = bin_all(
finite_iq2d,
finite_iq1d,
num_x_bins,
num_y_bins,
n1dbins=num_q1d_bins,
n1dbins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=user_qmin,
qmax=user_qmax,
qxrange=None,
qyrange=None,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
)
return iq2d_main_out, iq1d_main_out
[docs]
def remove_workspaces(
reduction_config: Dict,
instrument_name: str,
prefix: str,
sample_run_number,
center_run_number,
extra_run_numbers: List,
):
"""Helping method to remove existing workspaces"""
from drtsans.instruments import extract_run_number # noqa E402
from drtsans.path import registered_workspace # noqa E402
# In the future this should be made optional
ws_to_remove = [f"{prefix}_{instrument_name}_{run_number}_raw_histo" for run_number in extra_run_numbers]
# List special workspaces and workspace groups
ws_to_remove.append(f"{prefix}_{instrument_name}_{sample_run_number}_raw_histo_slice_group")
ws_to_remove.append(f"{prefix}_{instrument_name}_{center_run_number}_raw_events")
ws_to_remove.append(f"{prefix}_sensitivity")
ws_to_remove.append(f"{prefix}_mask")
if "darkFileName" in reduction_config and reduction_config["darkFileName"]:
run_number = extract_run_number(reduction_config["darkFileName"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
if "blockedBeamRunNumber" in reduction_config and reduction_config["blockedBeamRunNumber"]:
run_number = extract_run_number(reduction_config["blockedBeamRunNumber"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
for ws_name in ws_to_remove:
# Remove existing workspaces, this is to guarantee that all the data is loaded correctly
if registered_workspace(ws_name):
mtd.remove(ws_name)