Source code for drtsans.tof.eqsans.elastic_correction

# Main method in this module implement step 2 of
# wavelength dependent inelastic incoherent scattering correction
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/-/issues/689
import os
from dataclasses import dataclass
from typing import Optional, Tuple, List

import numpy as np
from mantid.kernel import logger

from drtsans.dataobjects import IQazimuthal, IQmod, verify_same_q_bins, I1DAnnular, save_i1d, getDataType, DataType

__all__ = [
    "normalize_by_elastic_reference_all",
    "normalize_by_elastic_reference_1d",
    "normalize_by_elastic_reference_2d",
    "apply_elastic_normalization_to_unbinned_data",
    "determine_reference_wavelength_intensity_mesh",
    "reshape_intensity_domain_meshgrid",
    "build_i1d_from_intensity_domain_meshgrid",
    "build_i1d_one_wl_from_intensity_domain_meshgrid",
    "determine_common_domain_range_mesh",
    "elastic_correction",
]


@dataclass
class ReferenceWavelengths:
    """
    Class for keeping track of reference wavelength for each bin (Q or phi)

    Parameters
    ----------
    x_vec: ~numpy.ndarray
        vector for Q or phi
    ref_wl_vec: ~numpy.ndarray
        vector for reference wavelength vector for each Q or phi
    intensity_vec: ~numpy.ndarray
        vector for intensities of (Q, reference wavelength) or (phi, reference wavelength)
    error_vec: ~numpy.ndarray
        vector for errors of (Q, reference wavelength) or (phi, reference wavelength)
    """

    x_vec: np.ndarray
    ref_wl_vec: np.ndarray
    intensity_vec: np.ndarray
    error_vec: np.ndarray


[docs] def reshape_intensity_domain_meshgrid(i1d: IQmod | I1DAnnular) -> tuple: """Reshape I(Q)/I(phi) into a mesh grid of (Q, wavelength) or (phi, wavelength) Parameters ---------- i1d: IQmod | I1DAnnular Input I(Q, wavelength) or I(phi, wavelength) to find common range from Returns ------- tuple wavelength vector, Q/phi vector, intensity (2D), error (2D), dq array (2D) or None """ # Create a matrix for Q/phi, wavelength, intensity and error # x is either momentum transfer, Q, or azimuthal angle, phi if i1d.delta_x is None: i_x_wl_matrix = np.array([i1d.x, i1d.wavelength, i1d.intensity, i1d.error]) else: i_x_wl_matrix = np.array( [ i1d.x, i1d.wavelength, i1d.intensity, i1d.error, i1d.delta_x, ] ) i_x_wl_matrix = i_x_wl_matrix.transpose() # Order by wavelength and x i_x_wl_matrix = i_x_wl_matrix[np.lexsort((i_x_wl_matrix[:, 1], i_x_wl_matrix[:, 0]))] # Unique wavelength and unique x wl_vector = np.unique(i1d.wavelength) x_vector = np.unique(i1d.x) # verify whether (x, wl) are on mesh grid by checking unique x and wavelength assert wl_vector.shape[0] * x_vector.shape[0] == i1d.intensity.shape[0] # Reformat intensity_array = i_x_wl_matrix[:, 2].reshape((x_vector.shape[0], wl_vector.shape[0])) error_array = i_x_wl_matrix[:, 3].reshape((x_vector.shape[0], wl_vector.shape[0])) if i1d.delta_x is not None: delta_x_array = i_x_wl_matrix[:, 4].reshape((x_vector.shape[0], wl_vector.shape[0])) else: delta_x_array = None return wl_vector, x_vector, intensity_array, error_array, delta_x_array
[docs] def normalize_by_elastic_reference_all( i_of_q_2d, i_1d, ref_i_1d, output_wavelength_dependent_profile=False, output_dir=None ): """Normalize I(Q2D) and I(1D) by elastic reference run Parameters ---------- i_of_q_2d: ~drtsans.dataobjects.IQazimuthal Input I(Q2D, wavelength) to normalize i_1d: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular Input I(Q1D, wavelength) or I(phi, wavelength) to normalize ref_i_1d: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular Input I(Q1D, wavelength) or I(phi, wavelength) as elastic reference run output_wavelength_dependent_profile: bool If True then output I(1D) for each wavelength before and after k correction output_dir: str output directory for I(1D) profiles Returns ------- tuple normalized I(Q2D), normalized I(1D), K vector and delta K vector """ # check i_of_q and ref_i_of_q shall have same binning if not verify_same_q_bins(i_1d, ref_i_1d, False, tolerance=1e-3): raise RuntimeError("Input I(1D) and elastic reference I(1D) have different binning") wl_vec = np.unique(i_1d.wavelength) # x is either momentum transfer, Q, or azimuthal angle, phi x_vec = np.unique(i_1d.x) # Calculate the normalization factor for each wavelength k_vec, k_error_vec = calculate_elastic_reference_normalization(wl_vec, x_vec, ref_i_1d) # 1D normalization i1d_wl = normalize_by_elastic_reference_1d( i_1d, k_vec, k_error_vec, output_wavelength_dependent_profile, output_dir, ) # 2D normalization iq2d_wl = normalize_by_elastic_reference_2d(i_of_q_2d, k_vec, k_error_vec) return iq2d_wl, i1d_wl, k_vec, k_error_vec
def calculate_elastic_reference_normalization(wl_vec, x_vec, ref_i_1d): """Calculate the elastic reference normalization factor (K) for each wavelength Parameters ---------- wl_vec: ~numpy.ndarray Vector of wavelengths in I(1D) to normalize x_vec: ~numpy.ndarray Vector of Q:s, or phi:s, in I(1D) to normalize ref_i_1d: ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular Elastic reference run I(Q, wavelength) or I(phi, wavelength) Returns ------- tuple K vector and delta K vector """ # Reshape Q/phi, wavelength, intensities and errors to unique 1D array or 2D array _, _, ref_i_array, ref_error_array, _ = reshape_intensity_domain_meshgrid(ref_i_1d) # Calculate Qmin and Qmax qmin_index, qmax_index = determine_common_domain_range_mesh(x_vec, ref_i_array) # Calculate reference ref_wl_ie = determine_reference_wavelength_intensity_mesh( wl_vec, x_vec, ref_i_array, ref_error_array, qmin_index, qmax_index ) # Calculate scale factor k_vec, k_error_vec = calculate_scale_factor_mesh_grid( wl_vec, ref_i_array, ref_error_array, ref_wl_ie, qmin_index, qmax_index ) return k_vec, k_error_vec
[docs] def normalize_by_elastic_reference_1d( i1d: IQmod | I1DAnnular, k_vec: np.ndarray, k_error_vec: np.ndarray, output_wavelength_dependent_profile: bool = False, output_dir: Optional[str] = None, ) -> IQmod | I1DAnnular: """Normalize I(Q1D) or I(phi) by wavelength-dependent elastic reference normalization factor Parameters ---------- i1d: IQmod | I1DAnnular Input I(Q, wavelength) or I(phi, wavelength) to normalize k_vec: ~numpy.ndarray Elastic reference normalization factors (one for each wavelength) k_error_vec: ~numpy.ndarray Elastic reference normalization factor errors (one for each wavelength) output_wavelength_dependent_profile: bool If True then output I for each wavelength before and after k correction output_dir: str output directory for intensity profiles Returns ------- tuple normalized I(Q1D) or I(phi), K vector and delta K vector """ # Reshape Q/phi, wavelength, intensities and errors to unique 1D array or 2D array wl_vec, x_vec, i_array, error_array, dq_array = reshape_intensity_domain_meshgrid(i1d) i1d_type = getDataType(i1d) if output_wavelength_dependent_profile and output_dir: os.makedirs(output_dir, exist_ok=True) for tmpwlii, wl in enumerate(wl_vec): tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_before_k_correction.dat") i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid( x_vec, i_array, error_array, dq_array, tmpwlii, i1d_type ) save_i1d(i1d_wl, tmpfn) # Normalize normalized = normalize_intensity_1d( wl_vec, x_vec, i_array, error_array, k_vec, k_error_vec, ) # Convert normalized intensities and errors to object of type `i1d_type` normalized_i1d = build_i1d_from_intensity_domain_meshgrid( wl_vec, x_vec, normalized[0], normalized[1], dq_array, i1d_type ) if output_wavelength_dependent_profile and output_dir: for tmpwlii, wl in enumerate(wl_vec): tmpfn = os.path.join(output_dir, f"IQ_{wl:.3f}_after_k_correction.dat") i1d_wl = build_i1d_one_wl_from_intensity_domain_meshgrid( x_vec, normalized[0], normalized[1], dq_array, tmpwlii, i1d_type ) save_i1d(i1d_wl, tmpfn) return normalized_i1d
[docs] def normalize_by_elastic_reference_2d(i_of_q, k_vec, k_error_vec): """Normalize I(Q2D) by wavelength-dependent elastic reference normalization factor Parameters ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal Input I(Q2D, wavelength) to normalize k_vec: ~numpy.ndarray Elastic reference normalization factors (one for each wavelength) k_error_vec: ~numpy.ndarray Elastic reference normalization factor errors (one for each wavelength) Returns ------- ~drtsans.dataobjects.IQazimuthal normalized I(Q2D) """ intensity_array = i_of_q.intensity error_array = i_of_q.error # Reshape vectors to be easily indexed by wavelength num_wl = len(np.unique(i_of_q.wavelength)) sizeX = i_of_q.qx.shape[0] sizeY = i_of_q.qy.shape[0] intensity_3d = intensity_array.transpose().reshape((num_wl, sizeX, sizeY)) error_3d = error_array.transpose().reshape((num_wl, sizeX, sizeY)) # Scale each wavelength by the corresponding normalization factor, K normalized_intensity_array = np.zeros_like(intensity_3d) for i_wl in range(num_wl): normalized_intensity_array[i_wl] = intensity_3d[i_wl] * k_vec[i_wl] # Lowest wavelength bin does not require normalization as K = 1, i_wl = 0 normalized_error2_array = np.zeros_like(error_3d) normalized_error2_array[0, :, :] = error_3d[0, :, :] ** 2 for i_wl in range(1, num_wl): # Calculate error as dI^2 = dK^2 * I^2 + K^2 * dI^2 normalized_error2_array[i_wl] = ( k_error_vec[i_wl] ** 2 * intensity_3d[i_wl] ** 2 + k_vec[i_wl] ** 2 * error_3d[i_wl] ** 2 ) normalized_error_array = np.sqrt(normalized_error2_array) # Transform vectors back to their original shape normalized_intensity_array = normalized_intensity_array.reshape((sizeX * num_wl, sizeY)).transpose() normalized_error_array = normalized_error_array.reshape((sizeX * num_wl, sizeY)).transpose() normalized_i_of_q = IQazimuthal( intensity=normalized_intensity_array, error=normalized_error_array, qx=i_of_q.qx, qy=i_of_q.qy, wavelength=i_of_q.wavelength, delta_qx=i_of_q.delta_qx, delta_qy=i_of_q.delta_qy, ) return normalized_i_of_q
[docs] def build_i1d_from_intensity_domain_meshgrid( wl_vector, x_vector, intensity_array, error_array, delta_q_array, i1d_type ) -> IQmod | I1DAnnular: """Build I(1D) from a mesh grid representation of intensity, error and delta Q as a function of Q (or phi) and wavelength This is the reversed operation to method :py:meth:`~drtsans.tof.eqsans.elastic_correction.reshape_intensity_domain_meshgrid` Parameters ---------- wl_vector: ~numpy.ndarray wavelength (1D) x_vector: ~numpy.ndarray Q or phi (1D) intensity_array: ~numpy.ndarray intensities (2D) error_array: ~numpy.ndarray intensity errors (2D) delta_q_array: ~numpy.ndarray delta Q (1D) size = number wavelength * number Q, not used for i1d_type = DataType.I_ANNULAR i1d_type: DataType output data type for the intensity profile Returns ------- ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular constructed I(Q, wavelength) or I(phi, wavelength) """ # assume that intensity, error and delta q have the same as (num_q, num_wl) assert intensity_array.shape[0] == x_vector.shape[0] and intensity_array.shape[1] == wl_vector.shape[0] # tile wave length wl_array_1d = np.tile(wl_vector, x_vector.shape[0]) q_array_1d = np.repeat(x_vector, wl_vector.shape[0]) # flatten intensity, error and optionally delta q intensity_array = intensity_array.flatten() error_array = error_array.flatten() if delta_q_array is not None: delta_q_array = delta_q_array.flatten() if i1d_type == DataType.IQ_MOD: i1d = IQmod( intensity=intensity_array, error=error_array, mod_q=q_array_1d, wavelength=wl_array_1d, delta_mod_q=delta_q_array, ) elif i1d_type == DataType.I_ANNULAR: i1d = I1DAnnular( intensity=intensity_array, error=error_array, phi=q_array_1d, wavelength=wl_array_1d, ) else: raise TypeError(f"{i1d_type} is not supported") return i1d
[docs] def build_i1d_one_wl_from_intensity_domain_meshgrid( x_vector, intensity_array, error_array, delta_q_array, wl_index, i1d_type ) -> IQmod | I1DAnnular: """Build I(1D) for one wavelength from a mesh grid representation of intensity, error and delta Q as a function of Q (or phi) and wavelength Parameters ---------- x_vector: ~numpy.ndarray Q or phi (1D) intensity_array: ~numpy.ndarray intensities (2D) error_array: ~numpy.ndarray intensity errors (2D) delta_q_array: ~numpy.ndarray delta Q (1D) size = number wavelength * number Q, not used for i1d_type = DataType.I_ANNULAR wl_index: int Wavelength axis index to extract i1d_type: DataType output data type for the intensity profile Returns ------- ~drtsans.dataobjects.IQmod | ~drtsans.dataobjects.I1DAnnular constructed I(Q, wavelength) or I(phi, wavelength) """ if i1d_type == DataType.IQ_MOD: return IQmod( intensity=intensity_array[:, wl_index], error=error_array[:, wl_index], mod_q=x_vector, delta_mod_q=delta_q_array[:, wl_index], ) elif i1d_type == DataType.I_ANNULAR: return I1DAnnular( intensity=intensity_array[:, wl_index], error=error_array[:, wl_index], phi=x_vector, ) else: raise TypeError(f"{i1d_type} is not supported")
[docs] def determine_common_domain_range_mesh(x_vec, intensity_array): """Determine the common Q or phi range among all the wavelengths such that I(Q/phi, lambda) does exist. This method assumes that I(Q/phi, wavelength) are on mesh grid of Q/phi and wavelength Detailed requirement: Determine q_min and q_max that exist in all I(q, lambda) for the fitting (minimization) process Parameters ---------- x_vec: numpy.ndarray vector of sorted unique Q or phi, depending on whether the 1D binning is scalar or annular intensity_array: numpy.ndarray 2D array of intensity. Each row is of same wavelength Returns ------- tuple index of x_min and x_max """ xmin_index = None xmax_index = None # Sanity check assert x_vec.shape[0] == intensity_array.shape[0], "Shape mismatch" num_x = x_vec.shape[0] for x_index in range(num_x): if len(np.where(np.isnan(intensity_array[x_index]))[0]) == 0: xmin_index = x_index break for x_index in range(num_x - 1, -1, -1): if len(np.where(np.isnan(intensity_array[x_index]))[0]) == 0: xmax_index = x_index break if xmin_index is None: raise RuntimeError("Unable to find common range") return xmin_index, xmax_index
def calculate_scale_factor_mesh_grid(wl_vec, intensity_array, error_array, ref_wl_intensities, xmin_index, xmax_index): """Same functionality as calculate_scale_factor but the algorithm is improved as I(Q, wavelength) or I(phi, wavelength) are in meshgrid Parameters ---------- wl_vec: numpy.array wavelength vector intensity_array: numpy.array intensity 2D array error_array: numpy.array error 2D array ref_wl_intensities: ReferenceWavelengths reference wavelength intensity/error xmin_index: int index of min x in x vector (actually Q or phi) xmax_index: int index of max x in x vector (actually Q or phi) Returns ------- tuple K vector, K error vector """ # Check input assert wl_vec.shape[0] == intensity_array.shape[1] k_vec = np.zeros_like(wl_vec) k_error2_vec = np.zeros_like(wl_vec) for i_wl, lambda_i in enumerate(wl_vec): # P(wl) = sum_q I(q, ref_wl) * I(q, wl) p_value = np.sum( ref_wl_intensities.intensity_vec[xmin_index : xmax_index + 1] * intensity_array[:, i_wl][xmin_index : xmax_index + 1] ) # S(wl) = sum_q I(q, wl)**2 s_value = np.sum(intensity_array[:, i_wl][xmin_index : xmax_index + 1] ** 2) # Calculate K(wl) k_vec[i_wl] = p_value / s_value # Calculate K(wl) error term0 = error_array[:, i_wl][xmin_index : xmax_index + 1] term1 = ( ref_wl_intensities.intensity_vec[xmin_index : xmax_index + 1] * s_value - 2.0 * intensity_array[:, i_wl][xmin_index : xmax_index + 1] * p_value ) / s_value**2 term2 = ref_wl_intensities.error_vec[xmin_index : xmax_index + 1] term3 = intensity_array[:, i_wl][xmin_index : xmax_index + 1] / s_value k_error2_vec[i_wl] = np.sum((term0 * term1) ** 2 + (term2 * term3) ** 2) return k_vec, np.sqrt(k_error2_vec)
[docs] def determine_reference_wavelength_intensity_mesh( wavelength_vec, x_vec, intensity_array, error_array, xmin_index, xmax_index, min_wl_index=0, ): """Determine the reference wavelength for each x (Q or phi). The reference wavelength of a specific Q or (qx, qy) or phi is defined as the shortest wavelength for all the finite I(Q, wavelength) or I(qx, qy, wavelength) or I(phi, wavelength) Parameters ---------- wavelength_vec: numpy.ndarray Wavelength vector x_vec: numpy.ndarray Vector of Q or phi intensity_array: numpy.ndarray Intensity array (2D) error_array: numpy.ndarray Error array (2D) xmin_index: int index of xmin in x-vector xmax_index: int index of xmax in x-vector Returns ------- ReferenceWavelengths Reference wavelengths for each momentum transfer Q, or azimuthal angle phi, and the corresponding intensity and error """ # Sanity check assert wavelength_vec.shape[0] == intensity_array.shape[1], ( f"Wavelength dimension = {wavelength_vec.shape} ,Intensity dimension = {intensity_array.shape}" ) # Minimum wavelength bin is the reference wavelength min_wl_vec = np.zeros_like(x_vec) + wavelength_vec[min_wl_index] # Minimum intensity and error min_intensity_vec = np.copy(intensity_array[:, min_wl_index]) min_error_vec = np.copy(error_array[:, min_wl_index]) # Set the unused defined reference wavelength (outside of xmin and xmax)'s # intensity and error to nan min_intensity_vec[0:xmin_index] = np.nan min_intensity_vec[xmax_index + 1 :] = np.nan min_error_vec[0:xmin_index] = np.nan min_error_vec[xmax_index + 1 :] = np.nan return ReferenceWavelengths(x_vec, min_wl_vec, min_intensity_vec, min_error_vec)
def normalize_intensity_1d( wl_vec, x_vec, intensity_array, error_array, k_vec, k_error_vec, ): """Normalize 1D intensities and errors Parameters ---------- wl_vec: ~numpy.ndarray 1D vector of wavelength (in ascending order) x_vec: ~numpy.ndarray 1D vector of Q or phi (in ascending order) intensity_array: ~numpy.ndarray 2D array of intensities, shape[0] = number of Q or phi, shape[1] = number of wavelength error_array: ~numpy.ndarray 2D array of errors, shape[0] = number of Q or phi, shape[1] = number of wavelength k_vec: ~numpy.ndarray calculated K vector k_error_vec: ~numpy.ndarray calculated K error vector Returns ------- tuple normalized I(1D), normalized error(1D) """ # Sanity check assert wl_vec.shape[0] == intensity_array.shape[1] # wavelength as lambda assert x_vec.shape[0] == error_array.shape[0] # points as Q or phi assert intensity_array.shape == error_array.shape # Normalized intensities normalized_intensity_array = intensity_array * k_vec normalized_error2_array = np.zeros_like(error_array) # Lowest wavelength bin does not require normalization as K = 1, i_wl = 0 normalized_error2_array[:, 0] = error_array[:, 0] ** 2 # Loop over wavelength num_wl = wl_vec.shape[0] for i_wl in range(1, num_wl): # Calculate error as dI^2 = dK^2 * I^2 + K^2 * dI^2 normalized_error2_array[:, i_wl] = ( k_error_vec[i_wl] ** 2 * intensity_array[:, i_wl] ** 2 + k_vec[i_wl] ** 2 * error_array[:, i_wl] ** 2 ) return normalized_intensity_array, np.sqrt(normalized_error2_array)
[docs] def apply_elastic_normalization_to_unbinned_data( i_of_q_2d, i_of_q_1d, wl_vec, k_vec, k_error_vec, ): """Apply elastic normalization to unbinned I(Q, λ) and I(Qx, Qy, λ) data This function applies pre-calculated k(λ) normalization factors to unbinned data. The normalized unbinned data can then be binned in any mode (scalar, wedge, annular) and will produce consistent results. Parameters ---------- i_of_q_2d: ~drtsans.dataobjects.IQazimuthal Unbinned I(Qx, Qy, wavelength) data i_of_q_1d: ~drtsans.dataobjects.IQmod Unbinned I(Q, wavelength) data wl_vec: ~numpy.ndarray Wavelength bins corresponding to k_vec k_vec: ~numpy.ndarray Elastic reference normalization factors (one for each wavelength) k_error_vec: ~numpy.ndarray Elastic reference normalization factor errors (one for each wavelength) Returns ------- tuple[IQazimuthal, IQmod] Normalized unbinned I(Qx, Qy, λ) and I(Q, λ) """ # Helper function to create numpy-based interpolation with edge handling def _make_interp_fn(x, y): """ Create a 1D interpolation function using numpy.interp with linear interpolation and edge handling (extrapolate with nearest values at edges). Parameters ---------- x: array-like x coordinates (must be sorted) y: array-like y values corresponding to x Returns ------- callable Interpolation function that accepts new x values """ x = np.asarray(x) y = np.asarray(y) left = y[0] right = y[-1] def _interp(new_x): return np.interp(new_x, x, y, left=left, right=right) return _interp # Create interpolation functions for k and k_error k_interp = _make_interp_fn(wl_vec, k_vec) k_error_interp = _make_interp_fn(wl_vec, k_error_vec) # Apply normalization to 2D unbinned data if i_of_q_2d is not None and len(i_of_q_2d.intensity) > 0: # Interpolate k factors to match wavelengths in unbinned data k_vals_2d = k_interp(i_of_q_2d.wavelength) k_errs_2d = k_error_interp(i_of_q_2d.wavelength) # Apply normalization: I_normalized = I * k(λ) normalized_intensity_2d = i_of_q_2d.intensity * k_vals_2d # Error propagation: σ²_normalized = k² * σ²_I + I² * σ²_k normalized_error_2d = np.sqrt(k_vals_2d**2 * i_of_q_2d.error**2 + i_of_q_2d.intensity**2 * k_errs_2d**2) normalized_i_of_q_2d = IQazimuthal( intensity=normalized_intensity_2d, error=normalized_error_2d, qx=i_of_q_2d.qx, qy=i_of_q_2d.qy, wavelength=i_of_q_2d.wavelength, delta_qx=i_of_q_2d.delta_qx, delta_qy=i_of_q_2d.delta_qy, ) else: # Return input as-is if None or empty normalized_i_of_q_2d = i_of_q_2d # Apply normalization to 1D unbinned data if i_of_q_1d is not None and len(i_of_q_1d.intensity) > 0: # Interpolate k factors to match wavelengths in unbinned data k_vals_1d = k_interp(i_of_q_1d.wavelength) k_errs_1d = k_error_interp(i_of_q_1d.wavelength) # Apply normalization: I_normalized = I * k(λ) normalized_intensity_1d = i_of_q_1d.intensity * k_vals_1d # Error propagation: σ²_normalized = k² * σ²_I + I² * σ²_k normalized_error_1d = np.sqrt(k_vals_1d**2 * i_of_q_1d.error**2 + i_of_q_1d.intensity**2 * k_errs_1d**2) normalized_i_of_q_1d = IQmod( intensity=normalized_intensity_1d, error=normalized_error_1d, mod_q=i_of_q_1d.mod_q, delta_mod_q=i_of_q_1d.delta_mod_q, wavelength=i_of_q_1d.wavelength, ) else: # Return input as-is if None or empty normalized_i_of_q_1d = i_of_q_1d return normalized_i_of_q_2d, normalized_i_of_q_1d
[docs] def elastic_correction( iq2d_unbinned: IQazimuthal, iq1d_unbinned: IQmod, iq2d_elastic_ref: IQazimuthal, iq1d_elastic_ref: IQmod, num_x_bins: int, num_y_bins: int, num_q1d_bins: int, num_q1d_bins_per_decade: int, decade_on_center: bool, bin1d_type: str, log_binning: bool, user_qmin: Optional[float], user_qmax: Optional[float], annular_bin: float, wedges: List[Tuple[int, int]], symmetric_wedges: bool, weighted_errors: bool, output_wavelength_profile: bool, output_dir: str, output_filename: str, raw_name: str, ) -> Tuple[IQazimuthal, IQmod]: """Apply elastic reference normalization to unbinned I(Q) data. This function encapsulates all elastic correction logic: 1. Determines Q ranges from data if not provided by user 2. Temporarily bins sample and elastic reference data to calculate k(λ) factors 3. Saves k(λ) to file 4. Applies k(λ) correction to unbinned sample data 5. Returns corrected unbinned data Parameters ---------- iq2d_unbinned : IQazimuthal Unbinned 2D I(Qx, Qy) sample data iq1d_unbinned : IQmod Unbinned 1D I(Q) sample data iq2d_elastic_ref : IQazimuthal Unbinned 2D I(Qx, Qy) elastic reference data iq1d_elastic_ref : IQmod Unbinned 1D I(Q) elastic reference data num_x_bins : int Number of Qx bins for temporary binning num_y_bins : int Number of Qy bins for temporary binning num_q1d_bins : int Number of Q bins for temporary 1D binning num_q1d_bins_per_decade : int Number of bins per decade for logarithmic binning decade_on_center : bool Whether decade boundaries are on bin centers bin1d_type : str Type of 1D binning ('scalar', 'wedge', 'annular') log_binning : bool Whether to use logarithmic binning user_qmin : float, optional Minimum Q value (if None, determined from data) user_qmax : float, optional Maximum Q value (if None, determined from data) annular_bin : float Width of annular bin in degrees wedges : list List of (angle_min, angle_max) tuples for wedges symmetric_wedges : bool Whether to add symmetric wedges weighted_errors : bool Whether to use error-weighted binning output_wavelength_profile : bool Whether to output wavelength-dependent profiles output_dir : str Output directory for correction files output_filename : str Base filename for output raw_name : str Prefix for correction file names Returns ------- tuple (corrected_iq2d_unbinned, corrected_iq1d_unbinned) """ # Import bin_all here to avoid circular import from drtsans.iq import bin_all from drtsans.tof.eqsans.correction_api import save_k_vector logger.notice("Applying elastic reference normalization") # Determine Q ranges from data if not provided by user qmin = user_qmin if user_qmin is not None else iq1d_unbinned.mod_q.min() qmax = user_qmax if user_qmax is not None else iq1d_unbinned.mod_q.max() qxrange = (np.min(iq2d_unbinned.qx), np.max(iq2d_unbinned.qx)) qyrange = (np.min(iq2d_unbinned.qy), np.max(iq2d_unbinned.qy)) # Temporarily bin sample data to calculate k(λ) factors # These binned results are ONLY used for factor calculation, then discarded iq2d_temp_binned, iq1d_temp_binned = bin_all( iq2d_unbinned, iq1d_unbinned, num_x_bins, num_y_bins, n1dbins=num_q1d_bins, n1dbins_per_decade=num_q1d_bins_per_decade, decade_on_center=decade_on_center, bin1d_type="scalar" if bin1d_type == "wedge" else bin1d_type, log_scale=log_binning, qmin=qmin, qmax=qmax, qxrange=qxrange, qyrange=qyrange, annular_angle_bin=annular_bin, wedges=wedges, symmetric_wedges=symmetric_wedges, error_weighted=weighted_errors, n_wavelength_bin=None, ) # Temporarily bin elastic reference data iq2d_elastic_temp, iq1d_elastic_temp = bin_all( iq2d_elastic_ref, iq1d_elastic_ref, num_x_bins, num_y_bins, n1dbins=num_q1d_bins, n1dbins_per_decade=num_q1d_bins_per_decade, decade_on_center=decade_on_center, bin1d_type="scalar" if bin1d_type == "wedge" else bin1d_type, log_scale=log_binning, qmin=qmin, qmax=qmax, qxrange=qxrange, qyrange=qyrange, annular_angle_bin=annular_bin, wedges=wedges, symmetric_wedges=symmetric_wedges, error_weighted=weighted_errors, n_wavelength_bin=None, ) if len(iq1d_temp_binned) != 1 or len(iq1d_elastic_temp) != 1: raise NotImplementedError("Expected exactly one IQmod from temporary binning") # Calculate k(λ) correction factors # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) _, _, k_vec, k_error_vec = normalize_by_elastic_reference_all( iq2d_temp_binned, iq1d_temp_binned[0], iq1d_elastic_temp[0], output_wavelength_profile, output_dir, ) # Save k(λ) to file wl_vec = np.unique(iq1d_temp_binned[0].wavelength) save_k_vector( wl_vec, k_vec, k_error_vec, path=os.path.join(output_dir, f"{output_filename}_elastic_k1d_{raw_name}.dat"), ) # Apply k(λ) to unbinned sample data logger.notice("Applying elastic normalization to unbinned data") iq2d_corrected, iq1d_corrected = apply_elastic_normalization_to_unbinned_data( iq2d_unbinned, iq1d_unbinned, wl_vec, k_vec, k_error_vec ) return iq2d_corrected, iq1d_corrected