"""
This module provides the functionality to correct I(Q, wavelength) accounting for wavelength-dependent
inelastic scattering, for both 1D and 2D data.
"""
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/-/issues/689
# Step 3: Correct wave-dependent incoherence intensity for I(Q, wavelength)
from drtsans.tof.eqsans.elastic_correction import (
build_i1d_from_intensity_domain_meshgrid,
build_i1d_one_wl_from_intensity_domain_meshgrid,
determine_common_domain_range_mesh,
determine_reference_wavelength_intensity_mesh,
reshape_intensity_domain_meshgrid,
)
from drtsans.dataobjects import getDataType, DataType, save_i1d, IQazimuthal, IQmod
from collections import namedtuple
from mantid.kernel import logger
from typing import Tuple, List, Optional
import numpy as np
import os
__all__ = [
"correct_incoherence_inelastic_all",
"calculate_incoherence_correction_factors",
"apply_incoherence_correction_to_unbinned_data",
"correct_incoherence_inelastic_1d",
"correct_incoherence_inelastic_2d",
"CorrectedI1D",
"CorrectedIQ2D",
"CorrectionFactors",
"inelastic_correction",
]
# Output of corrected 1D case
CorrectedI1D = namedtuple("CorrectedI1D", "i1d b_factor b_error")
# Output of corrected 2D case
CorrectedIQ2D = namedtuple("CorrectedIQ2D", "iq2d b_factor b_error")
# Output of correction factors calculation
CorrectionFactors = namedtuple("CorrectionFactors", "b_factor b_error wavelength")
# ====================================================================================
# 2D Incoherence Correction Functions
# ====================================================================================
def reshape_q_azimuthal(i_of_q):
"""Enforce flattened and sorted IQazimuthal setup
2D incoherence correction as implemented operates on IQazimuthal data assuming that
the numpy arrays are one dimensional and sorted such that a 3D array of each IQazimuthal
attribute structured [Qx index, Qy index, wavelength index] would equal the desired array
when flattened by numpy.ndarray.flatten
Parameters
----------
i_of_q: ~drtsans.dataobjects.IQazimuthal
Input I(Qx, Qy, wavelength)
Returns
-------
~drtsans.dataobject.IQazimuthal
flattened and sorted input
"""
flat_i_of_q = i_of_q.ravel()
# lexsort sorts from last argument to first (qx, then qy, then wavelength in this case)
# this recreates the ordering of numpy.ndarray.flatten on array form [Qx, Qy, wavelength]
index_sorted = np.lexsort((flat_i_of_q.wavelength, flat_i_of_q.qy, flat_i_of_q.qx))
kwargs = dict()
if flat_i_of_q.delta_qx is not None:
kwargs["delta_qx"] = flat_i_of_q.delta_qx[index_sorted]
if flat_i_of_q.delta_qy is not None:
kwargs["delta_qy"] = flat_i_of_q.delta_qy[index_sorted]
return IQazimuthal(
intensity=flat_i_of_q.intensity[index_sorted],
error=flat_i_of_q.error[index_sorted],
qx=flat_i_of_q.qx[index_sorted],
qy=flat_i_of_q.qy[index_sorted],
wavelength=flat_i_of_q.wavelength[index_sorted],
**kwargs,
)
[docs]
def correct_incoherence_inelastic_2d(i_of_q, b_array):
"""Correct I(Q2D) with wavelength dependent incoherence inelastic scattering
This method implements the workflow for correcting I(Q2D) with
wavelength-dependent incoherent inelastic scattering
The correction of I(Q2D) uses the correction term b calculated from the 1D I(Q, lambda), since
this gives a lower statistical error in b. The error in the corrected I(Q2D) is calculated
assuming b is independent of I(Q2D).
Parameters
----------
i_of_q: ~drtsans.dataobjects.IQazimuthal
I(Qx, Qy, wavelength) with error
b_array: ~numpy.ndarray
2D numpy array for B[wavelength], B error[wavelength]
Returns
-------
CorrectedIQ2D
named tuple of corrected I(Qx, Qy, wavelength), b1d, b1d error
"""
# coerce IQazimuthal data to desired shapes
_i_of_q = reshape_q_azimuthal(i_of_q)
# grab unique lengths
_qx_len = np.unique(_i_of_q.qx).shape[0]
_qy_len = np.unique(_i_of_q.qy).shape[0]
# get b values
b1d, b1d_error = b_array
corrected_intensity = _i_of_q.intensity - np.tile(b1d, _qx_len * _qy_len)
corrected_error = np.sqrt(_i_of_q.error**2 + np.tile(b1d_error**2, _qx_len * _qy_len))
corrected_i_of_q = IQazimuthal(
intensity=corrected_intensity,
error=corrected_error,
qx=_i_of_q.qx,
qy=_i_of_q.qy,
wavelength=_i_of_q.wavelength,
delta_qx=_i_of_q.delta_qx,
delta_qy=_i_of_q.delta_qy,
)
corrected = CorrectedIQ2D(iq2d=corrected_i_of_q, b_factor=b1d, b_error=b1d_error)
return corrected
# ====================================================================================
# 1D Incoherence Correction Functions
# ====================================================================================
[docs]
def correct_incoherence_inelastic_all(
i_of_q_2d,
i1d,
select_minimum_incoherence,
intensity_weighted=False,
qmin=None,
qmax=None,
factor=None,
output_wavelength_dependent_profile=False,
output_dir=None,
):
"""Correct I(Q2D) and I(1D) accounting wavelength dependant incoherent inelastic scattering
This is the envelope method for the complete workflow to correct I(1D) and I(Q2D) accounting
wavelength-dependent incoherent inelastic scattering
Parameters
----------
i_of_q_2d: ~drtsans.dataobjects.IQazimuthal or None
I(Q2D, wavelength) and error
i1d: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular
I(1D, wavelength) and error
select_minimum_incoherence: bool
flag to determine correction B by minimum incoherence
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
qmin: float
manually set the qmin used for incoherent calculation
qmax: float
manually set the qmax used for incoherent calculation
factor: float
automatically determine the qmin qmax by checking the intensity profile
output_wavelength_dependent_profile: bool
If True then output intensity profile for each wavelength before and after b correction
output_dir: str
output directory for intensity profiles
Returns
-------
tuple[CorrectedIQ2D, CorrectedI1D]
"""
# Convert to mesh grid I(Q) and delta I(Q), or I(phi) for annular binning
# x is either Q or phi
wl_vec, x_vec, i_array, error_array, delta_x_array = reshape_intensity_domain_meshgrid(i1d)
if qmin is not None and qmax is not None:
xmin_index, xmax_index = np.searchsorted(x_vec, [qmin, qmax])
xmax_index = min(xmax_index, len(x_vec) - 1)
else:
# determine x min and x max that exists in all I(Q, wl) or I(phi, wl)
xmin_index, xmax_index = determine_common_domain_range_mesh(x_vec, i_array)
if factor is not None:
logger.notice(f"Using automated (xmin, xmax) finder with factor={factor}")
xmin_index, xmax_index = tune_xmin(xmin_index, xmax_index, i_array, factor=factor)
logger.notice(
f"Incoherent correction using xmin={x_vec[xmin_index]} xmax={x_vec[xmax_index]} "
f"with xmin_index={xmin_index}, xmax_index={xmax_index}"
)
# calculate B factors and errors
b_array, ref_wl_ie, ref_wl_index = calculate_b_factors(
wl_vec,
x_vec,
i_array,
error_array,
select_minimum_incoherence,
xmin_index,
xmax_index,
intensity_weighted=intensity_weighted,
)
# Correct 1D
i1d_type = getDataType(i1d)
corrected_1d = correct_incoherence_inelastic_1d(
wl_vec,
x_vec,
i_array,
error_array,
delta_x_array,
b_array,
xmin_index,
xmax_index,
ref_wl_ie,
output_wavelength_dependent_profile,
output_dir,
i1d_type,
)
# Correct 2D
if i_of_q_2d:
corrected_2d = correct_incoherence_inelastic_2d(i_of_q_2d, b_array)
else:
corrected_2d = None
return corrected_2d, corrected_1d
[docs]
def calculate_incoherence_correction_factors(
i1d_scalar_binned,
select_minimum_incoherence,
intensity_weighted=False,
qmin=None,
qmax=None,
factor=None,
):
"""Calculate wavelength-dependent incoherence correction factors from scalar-binned I(Q, λ)
This function calculates the b(λ) correction factors that are independent of binning mode.
It should be called with scalar-binned I(Q, λ) data to ensure consistent correction factors
regardless of whether the final output is scalar, wedge, or annular binning.
Parameters
----------
i1d_scalar_binned: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular
Scalar-binned I(Q, wavelength) data used to calculate correction factors
select_minimum_incoherence: bool
flag to determine correction B by minimum incoherence
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
qmin: float
manually set the qmin used for incoherent calculation
qmax: float
manually set the qmax used for incoherent calculation
factor: float
automatically determine the qmin qmax by checking the intensity profile
Returns
-------
CorrectionFactors
Named tuple containing b_factor, b_error, and wavelength arrays
"""
# Convert to mesh grid I(Q) and delta I(Q), or I(phi) for annular binning
wl_vec, x_vec, i_array, error_array, _ = reshape_intensity_domain_meshgrid(i1d_scalar_binned)
if qmin is not None and qmax is not None:
xmin_index, xmax_index = np.searchsorted(x_vec, [qmin, qmax])
xmax_index = min(xmax_index, len(x_vec) - 1)
else:
# determine x min and x max that exists in all I(Q, wl) or I(phi, wl)
xmin_index, xmax_index = determine_common_domain_range_mesh(x_vec, i_array)
if factor is not None:
logger.notice(f"Using automated (xmin, xmax) finder with factor={factor}")
xmin_index, xmax_index = tune_xmin(xmin_index, xmax_index, i_array, factor=factor)
logger.notice(
f"Incoherent correction using xmin={x_vec[xmin_index]} xmax={x_vec[xmax_index]} "
f"with xmin_index={xmin_index}, xmax_index={xmax_index}"
)
# calculate B factors and errors
b_array, _, _ = calculate_b_factors(
wl_vec,
x_vec,
i_array,
error_array,
select_minimum_incoherence,
xmin_index,
xmax_index,
intensity_weighted=intensity_weighted,
)
return CorrectionFactors(b_factor=b_array[0], b_error=b_array[1], wavelength=wl_vec)
[docs]
def apply_incoherence_correction_to_unbinned_data(
i_of_q_2d,
i_of_q_1d,
correction_factors,
):
"""Apply incoherence correction factors to unbinned I(Q, λ) and I(Qx, Qy, λ) data
This function applies pre-calculated b(λ) correction factors to unbinned data.
The corrected unbinned data can then be binned in any mode (scalar, wedge, annular)
and will produce consistent results.
Parameters
----------
i_of_q_2d: ~drtsans.dataobjects.IQazimuthal
Unbinned I(Qx, Qy, wavelength) data
i_of_q_1d: ~drtsans.dataobjects.IQmod
Unbinned I(Q, wavelength) data
correction_factors: CorrectionFactors
Correction factors from calculate_incoherence_correction_factors()
Returns
-------
tuple[IQazimuthal, IQmod]
Corrected unbinned I(Qx, Qy, λ) and I(Q, λ)
"""
from drtsans.dataobjects import IQmod
# Helper function to create numpy-based interpolation with edge handling
def _make_interp_fn(x, y):
"""
Create a 1D interpolation function using numpy.interp with
linear interpolation and edge handling equivalent to
scipy.interpolate.interp1d(..., bounds_error=False,
fill_value=(y[0], y[-1])).
Parameters
----------
x: array-like
x coordinates (must be sorted)
y: array-like
y values corresponding to x
Returns
-------
callable
Interpolation function that accepts new x values
"""
x = np.asarray(x)
y = np.asarray(y)
left = y[0]
right = y[-1]
def _interp(new_x):
return np.interp(new_x, x, y, left=left, right=right)
return _interp
# Create interpolation functions for b_factor and b_error
# Use linear interpolation, extrapolate with nearest values at edges
b_factor_interp = _make_interp_fn(
correction_factors.wavelength,
correction_factors.b_factor,
)
b_error_interp = _make_interp_fn(
correction_factors.wavelength,
correction_factors.b_error,
)
# Apply correction to 2D unbinned data
if i_of_q_2d is not None and len(i_of_q_2d.intensity) > 0:
# Interpolate b factors to match wavelengths in unbinned data
b_vals_2d = b_factor_interp(i_of_q_2d.wavelength)
b_errs_2d = b_error_interp(i_of_q_2d.wavelength)
# Apply correction: I_corrected = I - b(λ)
corrected_intensity_2d = i_of_q_2d.intensity - b_vals_2d
# Error propagation: σ²_corrected = σ²_I + σ²_b
# Note: We treat I and b as independent because:
# 1. b(λ) is calculated from scalar-binned I(Q, λ) at specific Q values within [qmin, qmax]
# 2. The unbinned data being corrected consists of individual events at arbitrary (Qx, Qy)
# values that were NOT used in the b(λ) calculation
# Therefore, the standard independent error propagation is appropriate here.
# This differs from the correlation-aware error model in correct_intensity_error(),
# which applies when correcting the SAME binned data used to calculate b(λ).
corrected_error_2d = np.sqrt(i_of_q_2d.error**2 + b_errs_2d**2)
corrected_i_of_q_2d = IQazimuthal(
intensity=corrected_intensity_2d,
error=corrected_error_2d,
qx=i_of_q_2d.qx,
qy=i_of_q_2d.qy,
wavelength=i_of_q_2d.wavelength,
delta_qx=i_of_q_2d.delta_qx,
delta_qy=i_of_q_2d.delta_qy,
)
else:
# Return input as-is if None or empty
corrected_i_of_q_2d = i_of_q_2d
# Apply correction to 1D unbinned data
if i_of_q_1d is not None and len(i_of_q_1d.intensity) > 0:
# Interpolate b factors to match wavelengths in unbinned data
b_vals_1d = b_factor_interp(i_of_q_1d.wavelength)
b_errs_1d = b_error_interp(i_of_q_1d.wavelength)
# Apply correction: I_corrected = I - b(λ)
corrected_intensity_1d = i_of_q_1d.intensity - b_vals_1d
# Error propagation: σ²_corrected = σ²_I + σ²_b (independent errors, see comment above)
corrected_error_1d = np.sqrt(i_of_q_1d.error**2 + b_errs_1d**2)
corrected_i_of_q_1d = IQmod(
intensity=corrected_intensity_1d,
error=corrected_error_1d,
mod_q=i_of_q_1d.mod_q,
delta_mod_q=i_of_q_1d.delta_mod_q,
wavelength=i_of_q_1d.wavelength,
)
else:
# Return input as-is if None or empty
corrected_i_of_q_1d = i_of_q_1d
return corrected_i_of_q_2d, corrected_i_of_q_1d
[docs]
def correct_incoherence_inelastic_1d(
wl_vec,
x_vec,
i_array,
error_array,
delta_x_array,
b_array,
xmin_index,
xmax_index,
ref_wl_ie,
output_wavelength_dependent_profile=False,
output_dir=None,
i1d_type=DataType.IQ_MOD,
):
"""
Parameters
----------
wl_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
i_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q or phi, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array if intensity errors
delta_x_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q or phi, shape[1] = number of wavelength
b_array: ~numpy.ndarray
incoherence inelastic correction factor B, row 0: B factor, row 1: delta B
xmin_index: int
index of minimum common Q or phi in x vector
xmax_index: int
index of maximum common Q or phi in x vector
ref_wl_ie: ReferenceWavelength
the reference wavelength data used to calculate B
output_wavelength_dependent_profile: bool
If True then output Iq for each wavelength before and after b correction
output_dir: str
output directory for Iq profiles
i1d_type: DataType
the data type of the 1D intensity profile to output
Returns
-------
CorrectedI1D
I(Q1D) or I(phi) with inelastic incoherent correction applied
"""
if output_wavelength_dependent_profile and output_dir:
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_before_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, i_array, error_array, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
# correct intensities and errors
corrected_intensities, corrected_errors = correct_intensity_error(
wl_vec, x_vec, i_array, error_array, b_array, xmin_index, xmax_index, ref_wl_ie
)
# construct the output and return
corrected_i1d = build_i1d_from_intensity_domain_meshgrid(
wl_vec, x_vec, corrected_intensities, corrected_errors, delta_x_array, i1d_type
)
if output_wavelength_dependent_profile and output_dir:
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_after_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, corrected_intensities, corrected_errors, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
corrected = {
"i1d": corrected_i1d,
"b_factor": b_array[0],
"b_error": b_array[1],
}
return CorrectedI1D(**corrected)
def tune_xmin(xmin_idx, xmax_idx, i_arr, factor):
"""get I(xmin_index) and I(xmax_index) from the default xmin_index and
xmax_index if I(xmin_index) > 10 * I(xmax_index), then xmin_index
= xmin_index + 1 repeat comparison until I(xmin_index) <= factor*
I(xmax_index)
"""
assert xmin_idx < xmax_idx
imin = np.nansum(i_arr[xmin_idx])
imax = np.nansum(i_arr[xmax_idx])
if imin > factor * imax:
return tune_xmin(xmin_idx + 1, xmax_idx, i_arr, factor)
return (xmin_idx, xmax_idx)
def calculate_b_factors(
wl_vec,
x_vec,
intensity_array,
error_array,
select_min_incoherence,
xmin_index,
xmax_index,
intensity_weighted=False,
):
"""Determine reference wavelength and then calculate B factor, B error factor.
With option select_min_incoherence, reference wavelength will be reselected according to first
round of calculation of B
Parameters
----------
wl_vec: ~numpy.ndarray
wavelength vector
x_vec: ~numpy.ndarray
Q or phi vector
intensity_array: ~numpy.ndarray
intenisty 2D array
error_array: ~numpy.ndarray
intensity error 2D array
select_min_incoherence: bool
flag to apply select minimum incoherence algorithm
xmin_index: int
index of minimum common Q or phi
xmax_index: int
index of maximum common Q or phi (included)
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
Returns
-------
tuple[~numpy.ndarray, ReferenceWavelength, int]
- row 0: B factor, row 1: delta B
- the reference wavelength used to calculate B
- the index of the reference wavelength in the wavelength vector
"""
# Sanity check
assert intensity_array.shape == error_array.shape
assert wl_vec.shape[0] == intensity_array.shape[1]
assert x_vec.shape[0] == error_array.shape[0]
# determine the reference wavelength: minimum wavelength bin
ref_wl_ie = determine_reference_wavelength_intensity_mesh(
wl_vec, x_vec, intensity_array, error_array, xmin_index, xmax_index, 0
)
# calculate b(lambda_i) and delta b(lambda_i) if it is final
b_array = calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_ie,
calculate_b_error=not select_min_incoherence,
intensity_weighted=intensity_weighted,
)
# If JSON parameter "selectMinIncoh" is true
if select_min_incoherence and np.argmin(b_array[0]) > 0:
# reselect reference wavelength to the wavelength bin with minimum b
ref_wl_index = np.argmin(b_array[0])
# (re)determine the reference wavelengths' intensities and errors
ref_wl_ie = determine_reference_wavelength_intensity_mesh(
wl_vec,
x_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_index,
)
# (re)calculate b array
b_array = calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_ie,
calculate_b_error=True,
intensity_weighted=intensity_weighted,
)
"""
We would rather see where the negative values happen if it happens.
In addition, if we later decide to discard b(lambda) from last wavelength
bins for getting min(b[lambda]), b[lambda_max] or b[lambda_min] may have negative values.
"""
# verify
"""
assert (
b_array[np.isfinite(b_array)].min() >= -1e-20
), f"B array has negative values: {b_array}"
"""
else:
ref_wl_index = 0
return b_array, ref_wl_ie, ref_wl_index
def calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wavelengths,
calculate_b_error,
intensity_weighted=False,
):
"""Calculate B factor and its error
Parameters
----------
wl_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
intensity_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q or phi, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q or phi, shape[1] = number of wavelength
xmin_index: int
index of common Q or phi min (included)
xmax_index: int
index of common Q or phi max (included)
ref_wavelengths: ReferenceWavelengths
instance of ReferenceWavelengths containing intensities and errors
calculate_b_error: bool
flag to calculate B factor's error
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
Returns
-------
~numpy.ndarray
row 0: B factor, row 1: delta B
"""
# Declare B factor array
b_factor_array = np.zeros(shape=(2, len(wl_vec)), dtype="float")
print(f"Using intensity weighted B factor calculation: {intensity_weighted}")
from uncertainties import unumpy
if intensity_weighted:
# if use reference intensity to normalize the function then b value should be
# adjusted according to the I(Q) profile on each Q
# Equation from https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/-/issues/896
# Calculate B factors
# b[wl] =
# - 1/N sum_{q_k=q_min}^{q_max}(1/RefI(q_k)) * sum_{q_k=q_min}^{q_max} ([RefI(q_k) - I(q_k, wl)]/RefI(q_k))
num_x = xmax_index + 1 - xmin_index
# operation into a (num_x, num_wl) 2D array
ref_intensity_vec = unumpy.uarray(
ref_wavelengths.intensity_vec[xmin_index : xmax_index + 1].reshape((num_x, 1)),
ref_wavelengths.error_vec[xmin_index : xmax_index + 1].reshape((num_x, 1)),
)
intensity_vec = unumpy.uarray(
intensity_array[xmin_index : xmax_index + 1, :], error_array[xmin_index : xmax_index + 1, :]
)
b_vec = (ref_intensity_vec - intensity_vec) / ref_intensity_vec
b_array = -1.0 * np.sum(b_vec, axis=0) * 1 / np.sum(1 / ref_intensity_vec)
b_factor_array[0] = unumpy.nominal_values(b_array)
# Calculate B error (delta B) as an option
if calculate_b_error:
# delta b(wl)^2 = 1/N^2 sum_{q_k=qmin}^{qmax} [(delta I(q_k, ref_wl))^2 + (delta I(q_k, wl))^2]
# operation into a (num_x, num_wl) 2D array
b_factor_array[1] = unumpy.std_devs(b_array)
else:
# Calculate B factors
# b[wl] = - 1/N sum_{q_k=q_min}^{q_max} [RefI(q_k) - I(q_k, wl)]
num_x = xmax_index + 1 - xmin_index
# operation into a (num_x, num_wl) 2D array
b_vec = (
ref_wavelengths.intensity_vec[xmin_index : xmax_index + 1].reshape((num_x, 1))
- intensity_array[xmin_index : xmax_index + 1, :]
)
b_factor_array[0] = -1.0 / num_x * np.sum(b_vec, axis=0)
# Calculate B error (delta B) as an option
if calculate_b_error:
# delta b(wl)^2 = 1/N^2 sum_{q_k=qmin}^{qmax} [(delta I(q_k, ref_wl))^2 + (delta I(q_k, wl))^2]
# operation into a (num_x, num_wl) 2D array
b2_vec = (ref_wavelengths.error_vec[xmin_index : xmax_index + 1].reshape((num_x, 1))) ** 2 + (
error_array[xmin_index : xmax_index + 1, :]
) ** 2
b_factor_array[1] = 1.0 / num_x * np.sqrt(b2_vec.sum(axis=0))
return b_factor_array
def correct_intensity_error(
wavelength_vec,
x_vec,
intensity_array,
error_array,
b_array2d,
xmin_index,
xmax_index,
ref_wl_ie,
):
"""Correct intensity and error
Parameters
----------
wavelength_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
intensity_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q, shape[1] = number of wavelength
xmin_index: int
index of minimum common Q or phi (included)
xmax_index: int
index of maximum common Q or phi (included)
ref_wl_ie: ReferenceWavelengths
instance of ReferenceWavelengths containing intensities and errors
b_array2d: ~numpy.ndarray
2D numpy array for B[wavelength], B error[wavelength]
Returns
-------
tuple
2D arrays for (1) corrected intensities (2) corrected intensity errors
"""
# Sanity checks
assert intensity_array.shape == error_array.shape
assert wavelength_vec.shape[0] == intensity_array.shape[1]
assert x_vec.shape[0] == error_array.shape[0]
assert len(b_array2d.shape) == 2 and b_array2d.shape[0] == 2, (
f"Expected input B and B error but not of shape {b_array2d.shape}"
)
assert b_array2d.shape[1] == wavelength_vec.shape[0]
# Init data structure
num_common_q = xmax_index - xmin_index + 1
corrected_intensities = np.zeros_like(intensity_array)
corrected_errors = np.zeros_like(error_array)
# Loop around wavelength
for i_wl in range(wavelength_vec.shape[0]):
# Correct intensity: I'(q, wl_i) = I(q, wl_i) - b(wl_i)
corrected_intensities[:, i_wl] = intensity_array[:, i_wl] - b_array2d[0][i_wl]
# Correct intensity error
# outside q_min and q_max
# term1[q_j] = (error[q_j, wl]^2
term1 = error_array[:, i_wl] ** 2
# term2 = 1/N^2 sum_{q_k=q_min:qmax}[RefError(q_k)^2 + error(q_k, wl)^2]
# term2 is a single value
term2 = (
1.0
/ num_common_q**2
* np.sum(
ref_wl_ie.error_vec[xmin_index : xmax_index + 1] ** 2
+ error_array[xmin_index : xmax_index + 1, i_wl] ** 2
)
)
# make correct term1 for those inside qmin and qmax
# term1[q_j] = (error[q_j, wl]^2 * (1 - 2/N)
term1[xmin_index : xmax_index + 1] *= 1 - 2 / num_common_q
# sum
term1 += term2
# assign to output
corrected_errors[:, i_wl] = term1
return corrected_intensities, np.sqrt(corrected_errors)
[docs]
def inelastic_correction(
iq2d_unbinned: IQazimuthal,
iq1d_unbinned: IQmod,
num_x_bins: int,
num_y_bins: int,
num_q1d_bins: int,
num_q1d_bins_per_decade: int,
decade_on_center: bool,
bin1d_type: str,
log_binning: bool,
user_qmin: Optional[float],
user_qmax: Optional[float],
annular_bin: float,
wedges: List[Tuple[int, int]],
symmetric_wedges: bool,
weighted_errors: bool,
select_min_incoherence: bool,
intensity_weighted: bool,
incoh_qmin: Optional[float],
incoh_qmax: Optional[float],
incoh_factor: Optional[float],
output_wavelength_profile: bool,
output_dir: str,
output_filename: str,
raw_name: str,
) -> Tuple[IQazimuthal, IQmod]:
"""Apply inelastic/incoherence correction to unbinned I(Q) data.
This function encapsulates all inelastic correction logic:
1. Determines Q ranges from data if not provided by user
2. Temporarily bins data in scalar mode to calculate b(λ) factors
3. Saves b(λ) to file
4. Applies b(λ) correction to unbinned sample data
5. Returns corrected unbinned data
Parameters
----------
iq2d_unbinned : IQazimuthal
Unbinned 2D I(Qx, Qy) sample data
iq1d_unbinned : IQmod
Unbinned 1D I(Q) sample data
num_x_bins : int
Number of Qx bins for temporary binning
num_y_bins : int
Number of Qy bins for temporary binning
num_q1d_bins : int
Number of Q bins for temporary 1D binning
num_q1d_bins_per_decade : int
Number of bins per decade for logarithmic binning
decade_on_center : bool
Whether decade boundaries are on bin centers
bin1d_type : str
Type of 1D binning ('scalar', 'wedge', 'annular')
log_binning : bool
Whether to use logarithmic binning
user_qmin : float, optional
Minimum Q value for binning (if None, determined from data)
user_qmax : float, optional
Maximum Q value for binning (if None, determined from data)
annular_bin : float
Width of annular bin in degrees
wedges : list
List of (angle_min, angle_max) tuples for wedges
symmetric_wedges : bool
Whether to add symmetric wedges
weighted_errors : bool
Whether to use error-weighted binning
select_min_incoherence : bool
Flag to determine correction B by minimum incoherence
intensity_weighted : bool
Whether to use intensity-weighted B factor calculation
incoh_qmin : float, optional
Minimum Q for incoherence correction calculation
incoh_qmax : float, optional
Maximum Q for incoherence correction calculation
incoh_factor : float, optional
Factor for automatic Q range determination
output_wavelength_profile : bool
If True, output I(Q) for each wavelength before and after b correction
output_dir : str
Output directory for correction files
output_filename : str
Base filename for output
raw_name : str
Prefix for correction file names
Returns
-------
tuple
(corrected_iq2d_unbinned, corrected_iq1d_unbinned)
"""
# Import bin_all here to avoid circular import
from drtsans.iq import bin_all
from drtsans.tof.eqsans.correction_api import save_b_factor
logger.notice("Applying inelastic/incoherent correction")
# Determine Q ranges from data if not provided by user
qmin = user_qmin if user_qmin is not None else iq1d_unbinned.mod_q.min()
qmax = user_qmax if user_qmax is not None else iq1d_unbinned.mod_q.max()
qxrange = (np.min(iq2d_unbinned.qx), np.max(iq2d_unbinned.qx))
qyrange = (np.min(iq2d_unbinned.qy), np.max(iq2d_unbinned.qy))
# Temporarily bin data in scalar mode to calculate b(λ) factors
# These binned results are ONLY used for factor calculation, then discarded
iq2d_temp_binned, iq1d_temp_binned = bin_all(
iq2d_unbinned,
iq1d_unbinned,
num_x_bins,
num_y_bins,
n1dbins=num_q1d_bins,
n1dbins_per_decade=num_q1d_bins_per_decade,
decade_on_center=decade_on_center,
bin1d_type="scalar" if bin1d_type == "wedge" else bin1d_type,
log_scale=log_binning,
qmin=qmin,
qmax=qmax,
qxrange=qxrange,
qyrange=qyrange,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
n_wavelength_bin=None,
)
if len(iq1d_temp_binned) != 1:
raise NotImplementedError("Expected exactly one IQmod from temporary binning")
# Calculate b(λ) correction factors from scalar-binned I(Q, λ)
logger.notice("Calculating inelastic/incoherent correction factors from scalar-binned I(Q, lambda)")
correction_factors = calculate_incoherence_correction_factors(
iq1d_temp_binned[0],
select_min_incoherence,
intensity_weighted,
incoh_qmin,
incoh_qmax,
incoh_factor,
)
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Save b(λ) to file
WavelengthContainer = namedtuple("WavelengthContainer", ["wavelength"])
wl_container = WavelengthContainer(wavelength=correction_factors.wavelength)
save_b_factor(
CorrectedI1D(wl_container, correction_factors.b_factor, correction_factors.b_error),
os.path.join(output_dir, f"{output_filename}_inelastic_b1d_{raw_name}.dat"),
)
# Output wavelength-dependent I(Q) profiles if requested
if output_wavelength_profile:
os.makedirs(output_dir, exist_ok=True)
logger.notice("Writing wavelength-dependent I(Q) profiles before and after b correction")
# Get the binned I(Q, λ) data and reshape to meshgrid format
iq1d_binned = iq1d_temp_binned[0]
wl_vec, x_vec, i_array, error_array, delta_x_array = reshape_intensity_domain_meshgrid(iq1d_binned)
# Get data type (IQmod or I1DAnnular)
i1d_type = getDataType(iq1d_binned)
# Write "before b correction" files for each wavelength
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_before_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, i_array, error_array, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
# Apply b correction to intensities and errors
b_factor_array = correction_factors.b_factor
b_error_array = correction_factors.b_error
# Correct each wavelength slice
corrected_i_array = i_array.copy()
corrected_error_array = error_array.copy()
for wl_idx in range(len(wl_vec)):
corrected_i_array[:, wl_idx] = i_array[:, wl_idx] - b_factor_array[wl_idx]
# Error propagation: σ²_corrected = σ²_intensity + σ²_b
corrected_error_array[:, wl_idx] = np.sqrt(error_array[:, wl_idx] ** 2 + b_error_array[wl_idx] ** 2)
# Write "after b correction" files for each wavelength
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_after_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, corrected_i_array, corrected_error_array, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
# Apply b(λ) to unbinned sample data
logger.notice("Applying inelastic/incoherent correction to unbinned data")
iq2d_corrected, iq1d_corrected = apply_incoherence_correction_to_unbinned_data(
iq2d_unbinned, iq1d_unbinned, correction_factors
)
return iq2d_corrected, iq1d_corrected