"""
This module provides the functionality to correct I(Q, wavelength) accounting for wavelength-dependent incoherent
inelastic scattering, for both 1D and 2D data (despite its name).
"""
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/-/issues/689
# Step 3: Correct wave-dependent incoherence intensity for I(Q, wavelength)
from drtsans.tof.eqsans.elastic_reference_normalization import (
build_i1d_from_intensity_domain_meshgrid,
build_i1d_one_wl_from_intensity_domain_meshgrid,
determine_common_domain_range_mesh,
determine_reference_wavelength_intensity_mesh,
reshape_intensity_domain_meshgrid,
)
from drtsans.tof.eqsans.incoherence_correction_2d import correct_incoherence_inelastic_2d
from drtsans.dataobjects import getDataType, DataType, save_i1d
from collections import namedtuple
from mantid.kernel import logger
import numpy as np
import os
__all__ = ["correct_incoherence_inelastic_all", "correct_incoherence_inelastic_1d", "CorrectedI1D"]
# Output of corrected 1D case
CorrectedI1D = namedtuple("CorrectedI1D", "i1d b_factor b_error")
[docs]
def correct_incoherence_inelastic_all(
i_of_q_2d,
i1d,
select_minimum_incoherence,
intensity_weighted=False,
qmin=None,
qmax=None,
factor=None,
output_wavelength_dependent_profile=False,
output_dir=None,
):
"""Correct I(Q2D) and I(1D) accounting wavelength dependant incoherent inelastic scattering
This is the envelope method for the complete workflow to correct I(1D) and I(Q2D) accounting
wavelength-dependent incoherent inelastic scattering
Parameters
----------
i_of_q_2d: ~drtsans.dataobjects.IQazimuthal or None
I(Q2D, wavelength) and error
i1d: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular
I(1D, wavelength) and error
select_minimum_incoherence: bool
flag to determine correction B by minimum incoherence
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
qmin: float
manually set the qmin used for incoherent calculation
qmax: float
manually set the qmax used for incoherent calculation
factor: float
automatically determine the qmin qmax by checking the intensity profile
output_wavelength_dependent_profile: bool
If True then output intensity profile for each wavelength before and after b correction
output_dir: str
output directory for intensity profiles
Returns
-------
tuple[CorrectedIQ2D, CorrectedI1D]
"""
# Convert to mesh grid I(Q) and delta I(Q), or I(phi) for annular binning
# x is either Q or phi
wl_vec, x_vec, i_array, error_array, delta_x_array = reshape_intensity_domain_meshgrid(i1d)
if qmin is not None and qmax is not None:
xmin_index, xmax_index = np.searchsorted(x_vec, [qmin, qmax])
xmax_index = min(xmax_index, len(x_vec) - 1)
else:
# determine x min and x max that exists in all I(Q, wl) or I(phi, wl)
xmin_index, xmax_index = determine_common_domain_range_mesh(x_vec, i_array)
if factor is not None:
logger.notice(f"Using automated (xmin, xmax) finder with factor={factor}")
xmin_index, xmax_index = tune_xmin(xmin_index, xmax_index, i_array, factor=factor)
logger.notice(
f"Incoherent correction using xmin={x_vec[xmin_index]} xmax={x_vec[xmax_index]} "
f"with xmin_index={xmin_index}, xmax_index={xmax_index}"
)
# calculate B factors and errors
b_array, ref_wl_ie, ref_wl_index = calculate_b_factors(
wl_vec,
x_vec,
i_array,
error_array,
select_minimum_incoherence,
xmin_index,
xmax_index,
intensity_weighted=intensity_weighted,
)
# Correct 1D
i1d_type = getDataType(i1d)
corrected_1d = correct_incoherence_inelastic_1d(
wl_vec,
x_vec,
i_array,
error_array,
delta_x_array,
b_array,
xmin_index,
xmax_index,
ref_wl_ie,
output_wavelength_dependent_profile,
output_dir,
i1d_type,
)
# Correct 2D
if i_of_q_2d:
corrected_2d = correct_incoherence_inelastic_2d(i_of_q_2d, b_array, ref_wl_index)
else:
corrected_2d = None
return corrected_2d, corrected_1d
[docs]
def correct_incoherence_inelastic_1d(
wl_vec,
x_vec,
i_array,
error_array,
delta_x_array,
b_array,
xmin_index,
xmax_index,
ref_wl_ie,
output_wavelength_dependent_profile=False,
output_dir=None,
i1d_type=DataType.IQ_MOD,
):
"""
Parameters
----------
wl_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
i_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q or phi, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array if intensity errors
delta_x_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q or phi, shape[1] = number of wavelength
b_array: ~numpy.ndarray
incoherence inelastic correction factor B, row 0: B factor, row 1: delta B
xmin_index: int
index of minimum common Q or phi in x vector
xmax_index: int
index of maximum common Q or phi in x vector
ref_wl_ie: ReferenceWavelength
the reference wavelength data used to calculate B
output_wavelength_dependent_profile: bool
If True then output Iq for each wavelength before and after b correction
output_dir: str
output directory for Iq profiles
i1d_type: DataType
the data type of the 1D intensity profile to output
Returns
-------
CorrectedI1D
I(Q1D) or I(phi) with inelastic incoherent correction applied
"""
if output_wavelength_dependent_profile and output_dir:
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_before_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, i_array, error_array, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
# correct intensities and errors
corrected_intensities, corrected_errors = correct_intensity_error(
wl_vec, x_vec, i_array, error_array, b_array, xmin_index, xmax_index, ref_wl_ie
)
# construct the output and return
corrected_i1d = build_i1d_from_intensity_domain_meshgrid(
wl_vec, x_vec, corrected_intensities, corrected_errors, delta_x_array, i1d_type
)
if output_wavelength_dependent_profile and output_dir:
for tmpwlii, wl in enumerate(wl_vec):
tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_after_b_correction.dat")
i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid(
x_vec, corrected_intensities, corrected_errors, delta_x_array, tmpwlii, i1d_type
)
save_i1d(i1d_wl, tmpfn)
corrected = {
"i1d": corrected_i1d,
"b_factor": b_array[0],
"b_error": b_array[1],
}
return CorrectedI1D(**corrected)
def tune_xmin(xmin_idx, xmax_idx, i_arr, factor):
"""get I(xmin_index) and I(xmax_index) from the default xmin_index and
xmax_index if I(xmin_index) > 10 * I(xmax_index), then xmin_index
= xmin_index + 1 repeat comparison until I(xmin_index) <= factor*
I(xmax_index)
"""
assert xmin_idx < xmax_idx
imin = np.nansum(i_arr[xmin_idx])
imax = np.nansum(i_arr[xmax_idx])
if imin > factor * imax:
return tune_xmin(xmin_idx + 1, xmax_idx, i_arr, factor)
return (xmin_idx, xmax_idx)
def calculate_b_factors(
wl_vec,
x_vec,
intensity_array,
error_array,
select_min_incoherence,
xmin_index,
xmax_index,
intensity_weighted=False,
):
"""Determine reference wavelength and then calculate B factor, B error factor.
With option select_min_incoherence, reference wavelength will be reselected according to first
round of calculation of B
Parameters
----------
wl_vec: ~numpy.ndarray
wavelength vector
x_vec: ~numpy.ndarray
Q or phi vector
intensity_array: ~numpy.ndarray
intenisty 2D array
error_array: ~numpy.ndarray
intensity error 2D array
select_min_incoherence: bool
flag to apply select minimum incoherence algorithm
xmin_index: int
index of minimum common Q or phi
xmax_index: int
index of maximum common Q or phi (included)
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
Returns
-------
tuple[~numpy.ndarray, ReferenceWavelength, int]
- row 0: B factor, row 1: delta B
- the reference wavelength used to calculate B
- the index of the reference wavelength in the wavelength vector
"""
# Sanity check
assert intensity_array.shape == error_array.shape
assert wl_vec.shape[0] == intensity_array.shape[1]
assert x_vec.shape[0] == error_array.shape[0]
# determine the reference wavelength: minimum wavelength bin
ref_wl_ie = determine_reference_wavelength_intensity_mesh(
wl_vec, x_vec, intensity_array, error_array, xmin_index, xmax_index, 0
)
# calculate b(lambda_i) and delta b(lambda_i) if it is final
b_array = calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_ie,
calculate_b_error=not select_min_incoherence,
intensity_weighted=intensity_weighted,
)
# If JSON parameter “selectMinIncoh” is true
if select_min_incoherence and np.argmin(b_array[0]) > 0:
# reselect reference wavelength to the wavelength bin with minimum b
ref_wl_index = np.argmin(b_array[0])
# (re)determine the reference wavelengths' intensities and errors
ref_wl_ie = determine_reference_wavelength_intensity_mesh(
wl_vec,
x_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_index,
)
# (re)calculate b array
b_array = calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wl_ie,
calculate_b_error=True,
intensity_weighted=intensity_weighted,
)
"""
We would rather see where the negative values happen if it happens.
In addition, if we later decide to discard b(lambda) from last wavelength
bins for getting min(b[lambda]), b[lambda_max] or b[lambda_min] may have negative values.
"""
# verify
"""
assert (
b_array[np.isfinite(b_array)].min() >= -1e-20
), f"B array has negative values: {b_array}"
"""
else:
ref_wl_index = 0
return b_array, ref_wl_ie, ref_wl_index
def calculate_b_error_b(
wl_vec,
intensity_array,
error_array,
xmin_index,
xmax_index,
ref_wavelengths,
calculate_b_error,
intensity_weighted=False,
):
"""Calculate B factor and its error
Parameters
----------
wl_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
intensity_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q or phi, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q or phi, shape[1] = number of wavelength
xmin_index: int
index of common Q or phi min (included)
xmax_index: int
index of common Q or phi max (included)
ref_wavelengths: ReferenceWavelengths
instance of ReferenceWavelengths containing intensities and errors
calculate_b_error: bool
flag to calculate B factor's error
intensity_weighted: bool
if set to true, the B factor is calculated using weighted function by intensity
Returns
-------
~numpy.ndarray
row 0: B factor, row 1: delta B
"""
# Declare B factor array
b_factor_array = np.zeros(shape=(2, len(wl_vec)), dtype="float")
print(f"Using intensity weighted B factor calculation: {intensity_weighted}")
from uncertainties import unumpy
if intensity_weighted:
# if use reference intensity to normalize the function then b value should be
# adjusted according to the I(Q) profile on each Q
# Equation from https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/-/issues/896
# Calculate B factors
# b[wl] =
# - 1/N sum_{q_k=q_min}^{q_max}(1/RefI(q_k)) * sum_{q_k=q_min}^{q_max} ([RefI(q_k) - I(q_k, wl)]/RefI(q_k))
num_x = xmax_index + 1 - xmin_index
# operation into a (num_x, num_wl) 2D array
ref_intensity_vec = unumpy.uarray(
ref_wavelengths.intensity_vec[xmin_index : xmax_index + 1].reshape((num_x, 1)),
ref_wavelengths.error_vec[xmin_index : xmax_index + 1].reshape((num_x, 1)),
)
intensity_vec = unumpy.uarray(
intensity_array[xmin_index : xmax_index + 1, :], error_array[xmin_index : xmax_index + 1, :]
)
b_vec = (ref_intensity_vec - intensity_vec) / ref_intensity_vec
b_array = -1.0 * np.sum(b_vec, axis=0) * 1 / np.sum(1 / ref_intensity_vec)
b_factor_array[0] = unumpy.nominal_values(b_array)
# Calculate B error (delta B) as an option
if calculate_b_error:
# delta b(wl)^2 = 1/N^2 sum_{q_k=qmin}^{qmax} [(delta I(q_k, ref_wl))^2 + (delta I(q_k, wl))^2]
# operation into a (num_x, num_wl) 2D array
b_factor_array[1] = unumpy.std_devs(b_array)
else:
# Calculate B factors
# b[wl] = - 1/N sum_{q_k=q_min}^{q_max} [RefI(q_k) - I(q_k, wl)]
num_x = xmax_index + 1 - xmin_index
# operation into a (num_x, num_wl) 2D array
b_vec = (
ref_wavelengths.intensity_vec[xmin_index : xmax_index + 1].reshape((num_x, 1))
- intensity_array[xmin_index : xmax_index + 1, :]
)
b_factor_array[0] = -1.0 / num_x * np.sum(b_vec, axis=0)
# Calculate B error (delta B) as an option
if calculate_b_error:
# delta b(wl)^2 = 1/N^2 sum_{q_k=qmin}^{qmax} [(delta I(q_k, ref_wl))^2 + (delta I(q_k, wl))^2]
# operation into a (num_x, num_wl) 2D array
b2_vec = (ref_wavelengths.error_vec[xmin_index : xmax_index + 1].reshape((num_x, 1))) ** 2 + (
error_array[xmin_index : xmax_index + 1, :]
) ** 2
b_factor_array[1] = 1.0 / num_x * np.sqrt(b2_vec.sum(axis=0))
return b_factor_array
def correct_intensity_error(
wavelength_vec,
x_vec,
intensity_array,
error_array,
b_array2d,
xmin_index,
xmax_index,
ref_wl_ie,
):
"""Correct intensity and error
Parameters
----------
wavelength_vec: ~numpy.ndarray
1D vector of wavelength (in ascending order)
x_vec: ~numpy.ndarray
1D vector of Q or phi (in ascending order)
intensity_array: ~numpy.ndarray
2D array of intensities, shape[0] = number of Q, shape[1] = number of wavelength
error_array: ~numpy.ndarray
2D array of errors, shape[0] = number of Q, shape[1] = number of wavelength
xmin_index: int
index of minimum common Q or phi (included)
xmax_index: int
index of maximum common Q or phi (included)
ref_wl_ie: ReferenceWavelengths
instance of ReferenceWavelengths containing intensities and errors
b_array2d: ~numpy.ndarray
2D numpy array for B[wavelength], B error[wavelength]
Returns
-------
tuple
2D arrays for (1) corrected intensities (2) corrected intensity errors
"""
# Sanity checks
assert intensity_array.shape == error_array.shape
assert wavelength_vec.shape[0] == intensity_array.shape[1]
assert x_vec.shape[0] == error_array.shape[0]
assert len(b_array2d.shape) == 2 and b_array2d.shape[0] == 2, (
f"Expected input B and B error but not of shape {b_array2d.shape}"
)
assert b_array2d.shape[1] == wavelength_vec.shape[0]
# Init data structure
num_common_q = xmax_index - xmin_index + 1
corrected_intensities = np.zeros_like(intensity_array)
corrected_errors = np.zeros_like(error_array)
# Loop around wavelength
for i_wl in range(wavelength_vec.shape[0]):
# Correct intensity: I'(q, wl_i) = I(q, wl_i) - b(wl_i)
corrected_intensities[:, i_wl] = intensity_array[:, i_wl] - b_array2d[0][i_wl]
# Correct intensity error
# outside q_min and q_max
# term1[q_j] = (error[q_j, wl]^2
term1 = error_array[:, i_wl] ** 2
# term2 = 1/N^2 sum_{q_k=q_min:qmax}[RefError(q_k)^2 + error(q_k, wl)^2]
# term2 is a single value
term2 = (
1.0
/ num_common_q**2
* np.sum(
ref_wl_ie.error_vec[xmin_index : xmax_index + 1] ** 2
+ error_array[xmin_index : xmax_index + 1, i_wl] ** 2
)
)
# make correct term1 for those inside qmin and qmax
# term1[q_j] = (error[q_j, wl]^2 * (1 - 2/N)
term1[xmin_index : xmax_index + 1] *= 1 - 2 / num_common_q
# sum
term1 += term2
# assign to output
corrected_errors[:, i_wl] = term1
return corrected_intensities, np.sqrt(corrected_errors)