Source code for

import numpy

from drtsans.dataobjects import (
from enum import Enum
from typing import List, Any, Tuple
import numpy as np

from drtsans.determine_bins import (

# To ignore warning:   invalid value encountered in true_divide
np.seterr(divide="ignore", invalid="ignore")

__all__ = [

[docs] class BinningMethod(Enum): """ Binning method """ NOWEIGHT = 1 # no-weight binning WEIGHTED = 2 # weighted binning
[docs] def check_iq_for_binning(i_of_q): """Check I(Q) for binning. Binning I(Q) assumes that 1. there is no NaN or Infinity in intensities 2. there is no NaN, Infinity or Zero in intensity errors :exception : RuntimeError raise exception if input I(Q) does not meet assumption :param i_of_q: ~drtsans.dataobjects.IQmod or IQazimuthal I(Q) """ error_message = "" # Check intensity if np.where(np.isnan(i_of_q.intensity))[0].size > 0: error_message += "Intensity has {} NaNs: {}\n".format( len(np.where(np.isnan(i_of_q.intensity))[0]), np.where(np.isnan(i_of_q.intensity))[0], ) if np.where(np.isinf(i_of_q.intensity))[0].size > 0: error_message += "Intensity has {} Infinities: {}\n".format( len(np.where(np.isnan(i_of_q.intensity))[0]), np.where(np.isnan(i_of_q.intensity))[0], ) # Check error if np.where(np.isnan(i_of_q.error))[0].size > 0: error_message += "Intensity error has {} NaNs: {}\n".format( len(np.where(np.isnan(i_of_q.error))[0]), np.where(np.isnan(i_of_q.error))[0], ) if np.where(np.isinf(i_of_q.error))[0].size > 0: error_message += "Intensity error has Inf: {}\n".format(np.where(np.isnan(i_of_q.error))[0]) if np.where(np.abs(i_of_q.error) < 1e-20)[0].size > 0: error_message += "Intensity error has zero {}\n".format(np.where(np.abs(i_of_q.error) < 1e-20)[0]) if len(error_message) > 0: raise RuntimeError("Input I(Q) for binning does not meet assumption:\n{}".format(error_message))
def valid_wedge(min_angle, max_angle) -> List[Tuple[float, float]]: """ Helper function to validate wedge. It checks that the values are in the [-90,270) range and that the wedge angle is less than 180 degrees Parameters ---------- min_angle: float max_angle: float Returns ------- ~list (min_angle, max_angle) tuple. If min_angle < 270 and max_angle > -90 the function returns two wedges [(min_angle,270.1),(-90.1,max_angle)] """ if min_angle >= 270.0 or min_angle < -90: raise ValueError("minimum angle not in the [-90,270) range: {:.1f}".format(min_angle)) if max_angle >= 270.0 or max_angle < -90: raise ValueError("maximum angle not in the [-90,270) range: {:.1f}".format(max_angle)) if max_angle == min_angle: raise ValueError("maximum angle = minimum angle: {:.1f} == {:.1f}".format(min_angle, max_angle)) if max_angle > min_angle: diff = max_angle - min_angle if diff < 180.0: return [(min_angle, max_angle)] raise ValueError( "wedge angle is greater than 180 degrees: {:.1f} - {:.1f} = {:.1f} < 180" "".format(max_angle, min_angle, diff) ) diff = min_angle - max_angle if diff <= 180: raise ValueError( "wedge angle is greater than 180 degrees: {:.1f} - {:.1f} = {:.1f} <= 180" "".format(min_angle, max_angle, diff) ) return [(min_angle, 270.1), (-90.1, max_angle)] def get_wedges(min_angle: float, max_angle: float, symmetric_wedges=True) -> List[Tuple[float, float]]: """ Helper function to return all wedges defined by the min_angle and max_angle, including the wedge offset by 180 degrees Parameters ---------- min_angle: float lower boundary of the wedge angle in degree max_angle: float upper boundary of the wedge angle in degree symmetric_wedges: bool Add the wedge offset by 180 degrees if True Returns ------- ~list (min_angle, max_angle) tuples. """ if symmetric_wedges: wedges = valid_wedge(min_angle, max_angle) # create the opposite side and add it opp_min = min_angle + 180.0 opp_max = max_angle + 180.0 if opp_min >= 270: opp_min -= 360.0 if opp_max >= 270.0: opp_max -= 360 wedges.extend(valid_wedge(opp_min, opp_max)) elif isinstance(min_angle, (float, int)): # also a tuple for a single wedge (min angle, max angle) # but not symmetric wedges = valid_wedge(min_angle, max_angle) else: # in this case min_angle and max_angle are actually two wedges # that should be summed together raise NotImplementedError("use case to have min_angle and max_angle as 2-tuple is disabled") return wedges
[docs] def bin_all( i_qxqy, i_modq, nxbins, nybins, n1dbins=None, n1dbins_per_decade=None, bin1d_type="scalar", log_scale=False, decade_on_center=False, qmin=None, qmax=None, wedge1_qmin=None, wedge1_qmax=None, wedge2_qmin=None, wedge2_qmax=None, qxrange=None, qyrange=None, annular_angle_bin=1.0, wedges: List[Any] = None, symmetric_wedges: bool = True, error_weighted=False, n_wavelength_bin=1, ) -> Tuple[IQazimuthal, List[IQmod]]: r"""Do all 1D and 2D binning for a configuration or detector Parameters ---------- i_qxqy: ~drtsans.dataobjects.IQazimuthal Object containing 2D unbinned data I(Qx, Qy). It will be used for 2D binned data, and 1D wedge or annular binned data i_modq: ~drtsans.dataobjects.IQmod Object containing 1D unbinned data I(\|Q\|). It will be used for scalar binned data nxbins: int number of bins in the x direction for 2D binning nybins: int number of bins in the y direction for 2D binning n1dbins: int number of bins for the 1d binning. n1dbins_per_decade: int Total number of bins will be this value multiplied by number of decades from X min to X max bin1d_type: str type of binning for 1D data. Possible choices are 'scalar', 'annular', or 'wedge' log_scale: bool if True, 1D scalar or wedge binning will be logarithmic. Ignored for anything else decade_on_center: bool Flag to have the min X and max X on bin center; Otherwise, they will be on bin boundary qmin: float Minimum value of the momentum transfer modulus Q qmax: float Maximum value of the momentum transfer modulus Q wedge1_qmin: float Minimum value of the momentum transfer modulus Q for the first wedge when ``bin1d_type = 'wedge'`` wedge1_qmax: float Maximum value of the momentum transfer modulus Q for the first wedge when ``bin1d_type = 'wedge'`` wedge2_qmin: float Minimum value of the momentum transfer modulus Q for the second wedge when ``bin1d_type = 'wedge'`` wedge2_qmax: float Maximum value of the momentum transfer modulus Q for the second wedge when ``bin1d_type = 'wedge'`` qxrange: ~tuple qx min and qx max qyrange: ~tuple qy min and qy max annular_angle_bin: float width of annular bin in degrrees. Annular binning is linear wedges: list list of tuples (angle_min, angle_max) for the wedges. Both numbers have to be in the [-90,270) range. It will add the wedge offset by 180 degrees dependent on ``symmetric_wedges`` symmetric_wedges: bool It will add the wedge offset by 180 degrees if True error_weighted: bool if True, the binning is done using the Weighted method n_wavelength_bin: None, int None: keep original wavelength vector. int: number of wavelength bins. 1 to sum all Returns ------- (~drtsans.dataobjects.IQazimuthal, ~list) binned IQazimuthal list of binned ~drtsans.dataobjects.IQmod objects. The list has length 1, unless the 'wedge' mode is selected, when the length is the number of original wedges """ # Sanity check if n_wavelength_bin is None: pass elif n_wavelength_bin > 1: raise NotImplementedError(f"Case for n_wavelength_bin = {n_wavelength_bin} has not been implemented") method = BinningMethod.NOWEIGHT if error_weighted: method = BinningMethod.WEIGHTED # 2D binning if qxrange is None: # default: data's qx range qx_min = np.min(i_qxqy.qx) qx_max = np.max(i_qxqy.qx) else: qx_min, qx_max = qxrange binning_x = determine_1d_linear_bins(qx_min, qx_max, nxbins) if qyrange is None: # default: data's qy range qy_min = np.min(i_qxqy.qy) qy_max = np.max(i_qxqy.qy) else: qy_min, qy_max = qyrange binning_y = determine_1d_linear_bins(qy_min, qy_max, nybins) # bin 2D # FIXME - 2D binning does not support weighted binning well. Force it to no-weight binning bin_2d_method = BinningMethod.NOWEIGHT binned_q2d = bin_intensity_into_q2d( i_qxqy, binning_x, binning_y, method=bin_2d_method, wavelength_bins=n_wavelength_bin, ) # 1D binning binned_q1d_list = [] if bin1d_type == "annular": # annular binning bin_params = BinningParams(0.0, 360.0, int(360.0 / annular_angle_bin)) kwargs = {"method": method} if qmin is not None: kwargs["q_min"] = qmin if qmax is not None: kwargs["q_max"] = qmax binned_q1d_list.append(bin_annular_into_q1d(i_qxqy, bin_params, **kwargs)) else: # regular binning including 'scalar' and 'wedge' if bin1d_type == "scalar": unbinned_1d = [i_modq] qmin_list = [qmin] if qmin is not None else [i_modq.mod_q.min()] qmax_list = [qmax] if qmax is not None else [i_modq.mod_q.max()] elif bin1d_type == "wedge": # select Q's by wedge angles unbinned_1d = bin_into_wedges(i_qxqy, wedges, symmetric_wedges) unbinned_1d_flattened = concatenate(unbinned_1d) # there are two wedges with individual (qmin, qmax) limits qmin_list = [None, None] qmin_list[0] = wedge1_qmin if wedge1_qmin is not None else unbinned_1d_flattened.mod_q.min() qmin_list[1] = wedge2_qmin if wedge2_qmin is not None else unbinned_1d_flattened.mod_q.min() qmax_list = [None, None] qmax_list[0] = wedge1_qmax if wedge1_qmax is not None else unbinned_1d_flattened.mod_q.max() qmax_list[1] = wedge2_qmax if wedge2_qmax is not None else unbinned_1d_flattened.mod_q.max() else: raise ValueError(f"bin1d_type of type {bin1d_type} is not available") if log_scale: for iub, ub1d in enumerate(unbinned_1d): # log bins bins_1d = determine_1d_log_bins( qmin_list[iub], qmax_list[iub], n_bins_per_decade=n1dbins_per_decade, n_bins=n1dbins, decade_on_center=decade_on_center, ) # The filter is needed for logarithmic binning so that # the qmin and qmax are correctly taken into account q_filter = np.where(np.logical_and(ub1d.mod_q >= qmin_list[iub], ub1d.mod_q <= qmax_list[iub])) ub1d_filtered = IQmod( ub1d.intensity[q_filter], ub1d.error[q_filter], ub1d.mod_q[q_filter], ub1d.delta_mod_q[q_filter] if ub1d.delta_mod_q is not None else None, ub1d.wavelength[q_filter] if ub1d.wavelength is not None else None, ) binned_q1d = bin_intensity_into_q1d( ub1d_filtered, bins_1d, bin_method=method, wavelength_bins=n_wavelength_bin, ) binned_q1d_list.append(binned_q1d) else: for iub, ub1d in enumerate(unbinned_1d): # linear bins bins_1d = determine_1d_linear_bins(qmin_list[iub], qmax_list[iub], n1dbins) print(f"Linear binning for {iub}!") print( f"Number of NaNs = {np.where(np.isnan(ub1d.intensity))[0].shape}; " f"Number of data points = {ub1d.intensity.shape}" ) binned_q1d_list.append( bin_intensity_into_q1d( ub1d, bins_1d, bin_method=method, wavelength_bins=n_wavelength_bin, ) ) return binned_q2d, binned_q1d_list
def bin_into_wedges(i_qxqy, wedges: List[Any], symmetric_wedges: bool) -> List[Any]: """ Parameters ---------- i_qxqy wedges symmetric_wedges Returns ------- list list of ~drtsans.dataobjects.IQmod """ unbinned_1d = list() # Group is an element of the list # Each group is a list of 2-tuples, each is for an individual wedge validated_wedge_angles_groups = validate_wedges_groups(wedges, symmetric_wedges) # Bin! for wedge_angles in validated_wedge_angles_groups: # select I(Q) by wedge angles wedge_pieces = [select_i_of_q_by_wedge(i_qxqy, min_angle, max_angle) for min_angle, max_angle in wedge_angles] # concatenate selected I(Q) unbinned_1d.append(q_azimuthal_to_q_modulo(concatenate(wedge_pieces))) return unbinned_1d def validate_wedges_groups(wedges, symmetric_wedges) -> List[List[Tuple[float, float]]]: """Validate a list of wedges groups Parameters ---------- wedges: ~list List of wedges group. Each wedge group is either a list of 2-tuples or a 2-tuple. Each 2-tuple is a wedge symmetric_wedges: bool Flag to include all the symmetric wedges of the given wedges to output Returns ------- ~list List of wedges group. Each wedge group is a list of 2-tuples. Each 2-tupel is a wedge """ validated_wedge_angles_groups = list() for wedge in wedges: # For a given wedge group, it can be a single wedge or a list of wedges if isinstance(wedge, tuple): # manual wedge: each wedge group contains 1 and only 1 wedge. min_wedge_angle, max_wedge_angle = wedge wedge_angles = get_wedges(min_wedge_angle, max_wedge_angle, symmetric_wedges) elif isinstance(wedge, list): # auto wedge: each wedge group contain 2 wedges if len(wedges) != 2 or symmetric_wedges: # Note: auto wedge shall not have a pair of wedges sent to this method # It is worth to discuss how to work with auto/manual wedge with symmetric/asymmetric combination # by unified data structure raise NotImplementedError( f"Unsupported scenario for automated wedge: number of wedges {len(wedges)}" f" is not equal to 2. And/or symmetric wedge option {symmetric_wedges} " f"cannot be True." ) wedge_angles = get_wedges(wedge[0][0], wedge[0][1], symmetric_wedges) wedge_angles.extend(get_wedges(wedge[1][0], wedge[1][1], symmetric_wedges)) else: # supported wedge group type raise TypeError(f"Wedge group {wedges} of type {type(wedges)} is not supported") # add the corrected wedge angles validated_wedge_angles_groups.append(wedge_angles) return validated_wedge_angles_groups
[docs] def bin_intensity_into_q1d(i_of_q, q_bins, bin_method=BinningMethod.NOWEIGHT, wavelength_bins=1) -> IQmod: """Binning I(Q) from scalar Q (1D) with linear binning on Q Replace intensity, intensity_error, scalar_q, scalar_dq by IQmod Replace bins, q_min=None, q_max=None by BinningParams bins: number of bins for linear binning; step per decade for logarithm binning q_min : Default to min(scalar_q) q_max : Default to max(scalar_q) Parameters ---------- i_of_q : ~drtsans.dataobjects.IQmod Scalar I(Q) including intensity, intensity_error, scalar_q, scalar_dq in 1d nparray including: intensity error mod_q delta_mod_q q_bins : Bins namedtuple for arbitrary bin edges and bin centers bin_method : ~drtsans.BinningMethod weighted binning or no-weight binning method wavelength_bins: None, int number of binned wavelength. If None, do not bin. If equal to 1, bin all wavelength together Returns ------- drtsans.dataobjects.IQmod the one dimensional data as a named tuple """ # Check input I(Q) whether it meets assumptions check_iq_for_binning(i_of_q) # bin I(Q) if bin_method == BinningMethod.WEIGHTED: # weighed binning binned_q = _do_1d_weighted_binning( i_of_q.mod_q, i_of_q.delta_mod_q, i_of_q.intensity, i_of_q.error, q_bins, i_of_q.wavelength, wavelength_bins, ) else: # no-weight binning binned_q = _do_1d_no_weight_binning( i_of_q.mod_q, i_of_q.delta_mod_q, i_of_q.intensity, i_of_q.error, q_bins, i_of_q.wavelength, wavelength_bins, ) return binned_q
[docs] def select_i_of_q_by_wedge(i_of_q, min_wedge_angle, max_wedge_angle): """Select a sub set of I(Q) by 2D wedge Parameters ---------- i_of_q : ~drtsans.dataobjects.IQazimuthal "intensity": intensity, "error": sigma(I), "qx": qx, "qy": qy, "delta_qx": dqx, "delta_qy", dqy min_wedge_angle : float minimum value of theta/azimuthal angle for wedge max_wedge_angle : float maximum value of theta/azimuthal angle for wedge Returns ------- drtsans.dataobjects.IQazimuthal subset of input I(Qx, Qy) with (Qx, Qy) inside defined wedge """ # Calculate wedge angles for each I(Qx, Qy) # calculate azimuthal angles from -180 to 180 degrees azimuthal_array = np.arctan2(i_of_q.qy, i_of_q.qx) * 180.0 / np.pi # correct azimuthal angles to -90 to 270 degrees azimuthal_array[azimuthal_array < -90.0] += 360.0 # Define the filter (mask/ROI) for pixels falling into preferred wedge wedge_indexes = (azimuthal_array > min_wedge_angle) & (azimuthal_array < max_wedge_angle) # Create a new IQazimuthal with selected subset wedge_i_of_q = IQazimuthal( i_of_q.intensity[wedge_indexes], i_of_q.error[wedge_indexes], i_of_q.qx[wedge_indexes], i_of_q.qy[wedge_indexes], i_of_q.delta_qx[wedge_indexes], i_of_q.delta_qy[wedge_indexes], ) return wedge_i_of_q
def _toQmodAndAzimuthal( data: IQazimuthal, ) -> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]: """This function returns the values of qmod and azimuthal that are parallel to the original data array. It requires that the data is IQazimuthal Parameters ========== data: ~drtsans.dataobjects.IQazimuthal Results ======= tuple ``(intensity, error, qmod, dqmod, azimuthal)`` as 1d arrays with Q in angstrom and azimuthal angle in degrees""" if not getDataType(data) == DataType.IQ_AZIMUTHAL: raise RuntimeError("Calculating qmod and azimuthal only works for IQazimuthal") # reshape the qx and qy if intensity array is 2d if len(data.intensity.shape) == 2 and len(data.qx.shape) == 1 and len(data.qy.shape) == 1: qx = np.tile(data.qx, (data.qy.shape[0], 1)) qy = np.tile(data.qy, (data.qx.shape[0], 1)).transpose() else: qx = data.qx qy = data.qy # calculate q-scalar q = np.sqrt(np.square(qx) + np.square(qy)).ravel() # calculate dQ from dQx and dQy if data.delta_qx is None or data.delta_qy is None: dq = None else: dq = np.sqrt(np.square(data.delta_qx) + np.square(data.delta_qy)).ravel() # azimuthal is expected to be positive so use cyclical nature of trig functions azimuthal = np.rad2deg(np.arctan2(qy, qx)).ravel() azimuthal[azimuthal < 0.0] += 360.0 return data.intensity.ravel(), data.error.ravel(), q, dq, azimuthal
[docs] def bin_annular_into_q1d(i_of_q, theta_bin_params, q_min=0.001, q_max=0.4, method=BinningMethod.NOWEIGHT): """Annular 1D binning Calculates: I(azimuthal), sigma I and dazmuthal by assigning pixels to proper azimuthal angle bins Given I(Qx, Qy) and will convert to :py:obj:`~drtsans.IQmod` in the code. The independent axis is actually the azimuthal angle around the ring. Parameters ---------- i_of_q : ~drtsans.dataobjects.IQazimuthal I(Qx, Qy), sigma I(Qx, Qy), Qx, Qy, dQx and dQy theta_bin_params : ~drtsans.BinningParams binning parameters on annular angle 'theta' theta_min : float minimum value of theta/azimuthal angle theta_max : float maximum value of theta/azimuthal angle bins : int or sequence of scalars, optional See `scipy.stats.binned_statistic`. If `bins` is an int, it defines the number of equal-width bins in the given range (10 by default). If `bins` is a sequence, it defines the bin edges, including the rightmost edge, allowing for non-uniform bin widths. Values in `x` that are smaller than lowest bin edge areassigned to bin number 0, values beyond the highest bin are assigned to ``bins[-1]``. If the bin edges are specified, the number of bins will be, (nx = len(bins)-1). q_min : float, optional , by default q_max : float, optional , by default method : ~drtsans.BinningMethod binning method, no-weight or weighed Returns ------- drtsans.dataobjects.IQmod Annular-binned I(azimuthal) in 1D """ # Determine azimuthal angle bins (i.e., theta bins) theta_bins = determine_1d_linear_bins(theta_bin_params.min, theta_bin_params.max, theta_bin_params.bins) if theta_bins.centers.min() < 0.0 or theta_bins.centers.max() > 360.0: msg = "must specify range 0<=theta<=360deg found {}<=theta<={}deg".format( theta_bin_params.min, theta_bin_params.max ) raise ValueError(msg) # Check input I(Q) whether it meets assumptions check_iq_for_binning(i_of_q) # convert the data to q and azimuthal angle intensity, error, q_array, dq_array, theta_array = _toQmodAndAzimuthal(i_of_q) # Filter by q_min and q_max allowed_q_index = np.logical_and((q_array > q_min), (q_array < q_max)) # select binning method # the methods call the independent axis "Q", but are generic to whatever values are passed in if method == BinningMethod.NOWEIGHT: # no weight binning do_1d_binning = _do_1d_no_weight_binning elif method == BinningMethod.WEIGHTED: # weighted binning do_1d_binning = _do_1d_weighted_binning else: # not supported case raise RuntimeError("Binning method {} is not recognized".format(method)) # apply the selected binning method by either using or skipping the dq_array if dq_array is None: binned_i_of_azimuthal = do_1d_binning( theta_array[allowed_q_index], None, intensity[allowed_q_index], error[allowed_q_index], theta_bins, None, 1, ) else: binned_i_of_azimuthal = do_1d_binning( theta_array[allowed_q_index], dq_array[allowed_q_index], intensity[allowed_q_index], error[allowed_q_index], theta_bins, None, 1, ) return binned_i_of_azimuthal
def _do_1d_no_weight_binning(q_array, dq_array, iq_array, sigmaq_array, q_bins, wl_array, wavelength_bins): """Bin I(Q) by given bin edges and do no-weight binning This method implements equation 11.34, 11.35 and 11.36 in master document. If there is no Q in a certain Qk bin, NaN will be set to both I(Qk) and sigma I(Qk) Parameters ---------- q_array: ndarray scalar momentum transfer Q in 1D array flattened from 2D detector dq_array: ndarray, None scalar momentum transfer (Q) resolution in 1D array flattened from 2D detector iq_array: ndarray I(Q) in 1D array flattened from 2D detector sigmaq_array: ndarray sigma I(Q) in 1D array flattened from 2D detector q_bins: ~drtsans.determine_bins.Bins Bin centers and edges Returns ------- ~drtsans.dataobjects.IQmod IQmod is a class for holding 1D binned data. """ def _bin_iq1d(bin_edges, q_vec, dq_vec, i_vec, error_vec): """Bin I(Q1D), dI(Q1D) and dQ(Q1D) by no weight binning algorithm Parameters ---------- bin_edges: ~numpy.ndarray bin edges q_vec: ~numpy.ndarray vector of Q1D dq_vec: ~numpy.ndarray, None vector for Q1D resolution. could be left as None i_vec: ~numpy.ndarray 1d vector of intensity error_vec: ~numpy.ndarray 1D vector of intensity error Returns ------- ~tuple binned intensity vector, binned intensity error vector, binned q resolution vector """ # Count number of Q in 'q_array' in each Q-bin when they are binned (histogram) to 'bin_edges' num_pt_vec, _ = np.histogram(q_vec, bins=bin_edges) # Counts per bin: I_{k, raw} = \sum I(i, j) for each bin i_raw_vec, _ = np.histogram(q_vec, bins=bin_edges, weights=i_vec) # Square of summed uncertainties for each bin sigma_sqr_vec, _ = np.histogram(q_vec, bins=bin_edges, weights=error_vec**2) # Final I(Q): I_k = \frac{I_{k, raw}}{N_k} i_final_vec = i_raw_vec / num_pt_vec # Final sigma(Q): sigmaI_k = \frac{sigmaI_{k, raw}}{N_k} sigma_final_vec = np.sqrt(sigma_sqr_vec) / num_pt_vec # Calculate Q resolution of binned if dq_vec is None: bin_dq_vec = None else: binned_vec, _ = np.histogram(q_vec, bins=bin_edges, weights=dq_vec) bin_dq_vec = binned_vec / num_pt_vec return i_final_vec, sigma_final_vec, bin_dq_vec # check input assert q_bins.centers.shape[0] + 1 == q_bins.edges.shape[0] if wavelength_bins == 1 or wl_array is None: # bin I(Q, wl) regardless of wl value i_final_array, sigma_final_array, bin_q_resolution = _bin_iq1d( q_bins.edges, q_array, dq_array, iq_array, sigmaq_array ) # construct output without wavelength vector binned_iq1d = IQmod( intensity=i_final_array, error=sigma_final_array, mod_q=q_bins.centers, delta_mod_q=bin_q_resolution, ) elif wavelength_bins is None: # bin I(Q) with same value of wavelength unique_wl_vec = np.unique(wl_array) unique_wl_vec.sort() # construct a 2D array for filtering if dq_array is None: wl_matrix = np.array([wl_array, q_array, iq_array, sigmaq_array]) else: wl_matrix = np.array([wl_array, q_array, iq_array, sigmaq_array, dq_array]) wl_matrix = wl_matrix.transpose() # define output binned_q_vec = binned_dq_vec = binned_i_vec = binned_sigma_vec = binned_wl_vec = np.ndarray( shape=(0,), dtype=float ) for wl_i in unique_wl_vec: # filter filtered_matrix = wl_matrix[wl_matrix[:, 0] == wl_i] # special work with q resolution if dq_array is None: dq_array_i = None else: dq_array_i = filtered_matrix[:, 4] # bin by Q1D i_final_array, sigma_final_array, bin_q_resolution = _bin_iq1d( q_bins.edges, filtered_matrix[:, 1], dq_array_i, filtered_matrix[:, 2], filtered_matrix[:, 3], ) # build up the final output binned_q_vec = np.concatenate((binned_q_vec, q_bins.centers)) binned_i_vec = np.concatenate((binned_i_vec, i_final_array)) binned_sigma_vec = np.concatenate((binned_sigma_vec, sigma_final_array)) if dq_array is not None: binned_dq_vec = np.concatenate((binned_dq_vec, bin_q_resolution)) binned_wl_vec = np.concatenate((binned_wl_vec, np.zeros_like(i_final_array) + wl_i)) # END-FOR (wl_i) # Construct output # Get the final result by constructing an IQmod object defined in ~drtsans.dataobjects. # IQmod is a class for holding 1D binned data. if dq_array is None: binned_dq_vec = None binned_iq1d = IQmod( intensity=binned_i_vec, error=binned_sigma_vec, mod_q=binned_q_vec, delta_mod_q=binned_dq_vec, wavelength=binned_wl_vec, ) else: raise RuntimeError(f"Number of wavlength bins = {wavelength_bins} is not supported") return binned_iq1d def _do_1d_weighted_binning(q_array, dq_array, iq_array, sigma_iq_array, q_bins, wl_array, wavelength_bins): """Bin I(Q) by given bin edges and do weighted binning This method implements equation 11.22, 11.23 and 11.24 in master document for 1-dimensional Q If there is no Q in a certain Qk bin, NaN will be set to both I(Qk) and sigma I(Qk) General description of algorithm: Equation 11.26 I(Q') = sum_{Q, lambda}^{K} (I(Q, lambda) / sigma(Q, lambda)^2) / sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2) Equation 11.27 sigmaI(Q') = sqrt(sum_{Q, lambda}^{K} (sigma(Q, lambda / sigma(Q, lambda)^2)^2) / sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2) = sqrt(sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2)) / sum_{Q, lambda}^{K}(1/sigma(Q, lambda)^2) = 1 / sqrt(sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2)) Equation 11.28 sigmaQ(Q') = sum_{Q, lambda}^{K}(sigmaQ(Q, lambda)/sigma^2(Q, lambda)^2) / sum_{Q, lambda}^{K}(1/sigma(Q, lambda)^2) Parameters ---------- q_array: ndarray scalar momentum transfer Q in 1D array flattened from 2D detector dq_array: ndarray, None scalar momentum transfer (Q) resolution in 1D array flattened from 2D detector iq_array: ndarray I(Q) in 1D array flattened from 2D detector sigma_iq_array: ndarray sigma I(Q) in 1D array flattened from 2D detector q_bins: ~drtsans.determine_bins.Bins Bin centers and edges Returns ------- ~drtsans.dataobjects.IQmod IQmod is a class for holding 1D binned data. """ def _bin_q1d_weighted(mod_q_array, delta_q_array, intensity_array, error_array, bin_edges): """Do 1D weighed binning""" # bin I(Q, wl) regardless of wl value # Calculate 1/sigma^2 for multiple uses invert_sigma2_array = 1.0 / (error_array**2) # Histogram on 1/sigma^2, i.e., nominator part in Equation 11.22, 11.23 and 11.24 # sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2) w_array, _ = np.histogram(mod_q_array, bins=bin_edges, weights=invert_sigma2_array) # Calculate Equation 11.26: I(Q) # I(Q') = sum_{Q, lambda}^{K} (I(Q, lambda) / sigma(Q, lambda)^2) / # sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2) # denominator in Equation 11.22: sum_{Q, lambda}^{K} (I(Q, lambda) / sigma(Q, lambda)^2) i_raw_array, _ = np.histogram(mod_q_array, bins=bin_edges, weights=intensity_array * invert_sigma2_array) # numerator divided by denominator (11.26) binned_intensity_array = i_raw_array / w_array # Calculate equation 11.27: sigmaI(Q) # sigmaI(Q') = sqrt(sum_{Q, lambda}^{K} (sigma(Q, lambda / sigma(Q, lambda)^2)^2) / # sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2) # = sqrt(sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2)) / # sum_{Q, lambda}^{K}(1/sigma(Q, lambda)^2) # = 1 / sqrt(sum_{Q, lambda}^{K} (1 / sigma(Q, lambda)^2)) # Thus histogrammed sigmaI can be obtained from histogrammed invert_sigma2_array directly binned_error_array = 1 / np.sqrt(w_array) # Calculate equation 11.28: sigmaQ (i.e., Q resolution) # sigmaQ(Q') = sum_{Q, lambda}^{K}(sigmaQ(Q, lambda)/sigma^2(Q, lambda)^2) / # sum_{Q, lambda}^{K}(1/sigma(Q, lambda)^2) # denominator in Equation 11.28: sum_{Q, lambda}^{K}(1/sigma(Q, lambda)^2) if delta_q_array is None: binned_dq_array = None else: binned_dq, _ = np.histogram(mod_q_array, bins=bin_edges, weights=delta_q_array * invert_sigma2_array) # numerator divided by denominator (11.28) binned_dq_array = binned_dq / w_array return binned_intensity_array, binned_error_array, binned_dq_array # Check input assert q_bins.centers.shape[0] + 1 == q_bins.edges.shape[0] if wl_array is None or wavelength_bins == 1: # bin I(Q, wl) regardless of wl value i_final_array, sigma_final_array, bin_q_resolution = _bin_q1d_weighted( q_array, dq_array, iq_array, sigma_iq_array, q_bins.edges ) binned_i_of_q = IQmod( intensity=i_final_array, error=sigma_final_array, mod_q=q_bins.centers, delta_mod_q=bin_q_resolution, ) elif wavelength_bins is None: # bin I(Q, wl) per wavelength unique_wl_vec = np.unique(wl_array) unique_wl_vec.sort() # construct a 2D array for filtering if dq_array is None: wl_matrix = np.array([wl_array, q_array, iq_array, sigma_iq_array]) else: wl_matrix = np.array([wl_array, q_array, iq_array, sigma_iq_array, dq_array]) wl_matrix = wl_matrix.transpose() # define output binned_q_vec = binned_dq_vec = binned_i_vec = binned_sigma_vec = binned_wl_vec = np.ndarray( shape=(0,), dtype=float ) for wl_i in unique_wl_vec: # filter filtered_matrix = wl_matrix[wl_matrix[:, 0] == wl_i] # special work with q resolution if dq_array is None: dq_array_i = None else: dq_array_i = filtered_matrix[:, 4] # bin by Q1D binned = _bin_q1d_weighted( mod_q_array=filtered_matrix[:, 1], delta_q_array=dq_array_i, intensity_array=filtered_matrix[:, 2], error_array=filtered_matrix[:, 3], bin_edges=q_bins.edges, ) i_final_array, sigma_final_array, bin_q_resolution = binned # build up the final output binned_q_vec = np.concatenate((binned_q_vec, q_bins.centers)) binned_i_vec = np.concatenate((binned_i_vec, i_final_array)) binned_sigma_vec = np.concatenate((binned_sigma_vec, sigma_final_array)) if dq_array is not None: binned_dq_vec = np.concatenate((binned_dq_vec, bin_q_resolution)) binned_wl_vec = np.concatenate((binned_wl_vec, np.zeros_like(i_final_array) + wl_i)) # END-FOR (wl_i) # Construct output # Get the final result by constructing an IQmod object defined in ~drtsans.dataobjects. # IQmod is a class for holding 1D binned data. if dq_array is None: binned_dq_vec = None binned_i_of_q = IQmod( intensity=binned_i_vec, error=binned_sigma_vec, mod_q=binned_q_vec, delta_mod_q=binned_dq_vec, wavelength=binned_wl_vec, ) else: raise RuntimeError(f"Binning with wavelength bins = {wavelength_bins} is not supported") # Get the final result by constructing an IQmod object defined in ~drtsans.dataobjects. # IQmod is a class for holding 1D binned data. return binned_i_of_q
[docs] def bin_intensity_into_q2d(i_of_q, qx_bins, qy_bins, method=BinningMethod.NOWEIGHT, wavelength_bins=1): """Bin I(Qx, Qy) into to new (Qx, Qy) bins Note 1: for binning parameters: - 'min': float or None. If None, set to default as min(Qx) (or Qy) - 'max': float or None. If None, set to default as max(Qx) (or Qy) - 'bins': integer as number of bins Note 2: output Intensity, error, dqx an dqy are in following order - qx = [[qx0, qx1, ...], [qx0, qx1, ...], ...] - qy = [[qy0, qy0, ...], [qy1, qy1, ...], ...] Parameters ---------- i_of_q: ~drtsans.dataobjects.IQazimuthal class IQazimuthal(namedtuple('IQazimuthal', 'intensity error qx qy delta_qx delta_qy wavelength')) qx_bins : Bins namedtuple for arbitrary bin edges and bin centers for Qx qy_bins : Bins namedtuple for arbitrary bin edges and bin centers for Qy method: ~drtsans.BinningMethod Weighted binning or no weight binning wavelength_bins: None, int number of binned wavelength. If None, do not bin. If equal to 1, bin all wavelength together Returns ------- ~drtsans.dataobjects.IQazimuthal binned IQazimuthal (important: must read Note 2) """ # Check input I(Q) whether it meets assumptions check_iq_for_binning(i_of_q) # Check whether it needs to bin wavelength if wavelength_bins == 1 or i_of_q.wavelength is None: # no need to do 2D binning by filtering wavelength filter_wavelength = False else: # bin I(qx, qy, wl) by each wave length filter_wavelength = True if method == BinningMethod.NOWEIGHT: # Calculate no-weight binning binned_arrays = _do_2d_no_weight_binning( i_of_q.qx, i_of_q.delta_qx, i_of_q.qy, i_of_q.delta_qy, i_of_q.wavelength, i_of_q.intensity, i_of_q.error, qx_bins.edges, qy_bins.edges, debug_filter_wl=filter_wavelength, sum_all_wavelengths=not filter_wavelength, ) else: # Calculate weighed binning # FIXME - _do_2d_weighted_binning API shall be changed as _do_2d_no_weight_binning() for sum_all_wavelength binned_arrays = _do_2d_weighted_binning( i_of_q.qx, i_of_q.delta_qx, i_of_q.qy, i_of_q.delta_qy, i_of_q.wavelength, i_of_q.intensity, i_of_q.error, qx_bins.edges, qy_bins.edges, ) # END-IF-ELSE # construct return binned_intensities, binned_sigmas, binned_dqx, binned_dqy, binned_wl = binned_arrays # create Qx and Qy meshgrid explicitly # this must agree with the return from histogram2D, which is as # qx = [[qx0, qx0, ...], # [qx1, qx1, ...], # ...] # qy = [[qy0, qy1, ...], # [qy0, qy1, ...], # ...] # Thus indexing='ij' is used qx_matrix, qy_matrix = np.meshgrid(qx_bins.centers, qy_bins.centers, indexing="ij") binned_qx_array = qx_matrix binned_qy_array = qy_matrix unique_wl_vec = np.unique(i_of_q.wavelength) unique_wl_vec.sort() if i_of_q.wavelength is not None and filter_wavelength: for wl_i in unique_wl_vec[1:]: binned_qx_array = np.concatenate((binned_qx_array, qx_matrix), axis=1) binned_qy_array = np.concatenate((binned_qy_array, qy_matrix), axis=1) return IQazimuthal( intensity=binned_intensities, error=binned_sigmas, qx=binned_qx_array, delta_qx=binned_dqx, qy=binned_qy_array, delta_qy=binned_dqy, wavelength=binned_wl, )
def _bin_iq2d(qx_bin_edges, qy_bin_edges, qx_vec, qy_vec, dqx_vec, dqy_vec, i_vec, error_vec): """Bin I(Q2D), dI(Q2D) and dQ(Q2D) by no weight binning algorithm Parameters ---------- qx_bin_edges: ~numpy.ndarray bin edges qy_bin_edges: ~numpy.ndarray bin edges qx_vec: ~numpy.ndarray array of Q2D qy_vec: ~numpy.ndarray array of Q2D dqx_vec: ~numpy.ndarray or None array for Q2D resolution. May be None dqy_vec: ~numpy.ndarray or None array for Q2D resolution. May be None i_vec: ~numpy.ndarray 2D array of intensity error_vec: ~numpy.ndarray 2D array of intensity error Returns ------- ~tuple binned intensity vector, binned intensity error vector, binned qx resolution vector, binned qy resolution vector """ # Number of I(q) in each target Q bin num_pt_array, *_ = np.histogram2d(qx_vec, qy_vec, bins=(qx_bin_edges, qy_bin_edges)) # Counts per bin: I_{k, raw} = \sum I(i, j) for each bin i_raw_array, *_ = np.histogram2d(qx_vec, qy_vec, bins=(qx_bin_edges, qy_bin_edges), weights=i_vec) # Square of summed uncertainties for each bin sigma_sqr_array, *_ = np.histogram2d(qx_vec, qy_vec, bins=(qx_bin_edges, qy_bin_edges), weights=error_vec**2) # Q resolution: simple average dqx_raw_array, *_ = np.histogram2d(qx_vec, qy_vec, bins=(qx_bin_edges, qy_bin_edges), weights=dqx_vec) dqy_raw_array, *_ = np.histogram2d(qx_vec, qy_vec, bins=(qx_bin_edges, qy_bin_edges), weights=dqy_vec) # Final I(Q): I_{k, final} = \frac{I_{k, raw}}{Nk} # sigma = 1/sqrt(w_k) i_final_array = i_raw_array / num_pt_array sigma_final_array = np.sqrt(sigma_sqr_array) / num_pt_array dqx_final_array = dqx_raw_array / num_pt_array dqy_final_array = dqy_raw_array / num_pt_array return i_final_array, sigma_final_array, dqx_final_array, dqy_final_array def _do_2d_no_weight_binning( qx_array, dqx_array, qy_array, dqy_array, wl_array, iq_array, sigma_iq_array, qx_bin_edges, qy_bin_edges, sum_all_wavelengths: bool = True, debug_filter_wl: bool = False, ): """Perform 2D no-weight binning on I(Qx, Qy) General description of the algorithm: I_{i, j} = sum^{(i, j)}_k I_{k} / N_{i, j} sigma I_{i, j} = sqrt(sum^{(i, j)}_k sigma I_k^2) / N_{i, j} Parameters ---------- qx_array: ndarray Qx array dqx_array: ndarray Qx resolution qy_array : ndarray Qy array dqy_array: ndarray Qy resolution wl_array: ndarray or None wavelengths iq_array: ndarray intensities sigma_iq_array: ndarray intensities error qx_bin: ~drtsans.determine_bins.Bins Bin centers and edges qy_bin: Bin centers and edges sum_all_wavelengths: bool Flag to bin I(qx, qy, wavelength) by qx and qy only. Wavelength term will then be thrown away Returns ------- ndarray, ndarray, ndarray, ndarray, ndarray intensities (n x m x o), sigma intensities (n x m x o), Qx resolution (n x m x o), Qy resolution (n x m x o), Wavelengths (o) """ if wl_array is None or sum_all_wavelengths: # bin only by (qx, qy). all I(qx, qy, wavelength) with binned regardless of wavelength value # output will be I(Qx, Qy) ( binned_iq_array, binned_sigma_iq_array, binned_dqx_array, binned_dqy_array, ) = _bin_iq2d( qx_bin_edges, qy_bin_edges, qx_array, qy_array, dqx_array, dqy_array, iq_array, sigma_iq_array, ) binned_wl_array = None else: # separate I(qx, qy, wavelength) by wavelength value and bin (qx, qy) # output will be I(Qx, Qy, wavelength) if debug_filter_wl is False and len(wl_array) > 1: raise RuntimeError("It is not supposed to do binning with wavelength term kept.") unique_wl_vec = np.unique(wl_array) unique_wl_vec.sort() # construct a 2D array for filtering if dqx_array is None: wl_matrix = np.array([wl_array, qx_array, qy_array, iq_array, sigma_iq_array]) else: wl_matrix = np.array( [ wl_array, qx_array, qy_array, iq_array, sigma_iq_array, dqx_array, dqy_array, ] ) wl_matrix = wl_matrix.transpose() binned_iq_array = np.ndarray(shape=(0,), dtype=float) binned_sigma_iq_array = binned_wl_array = np.ndarray(shape=(0,), dtype=float) # Initialize binned dqx and dqy arrays if dqx_array is not None: binned_dqx_array = np.ndarray(shape=(0,), dtype=float) binned_dqy_array = np.ndarray(shape=(0,), dtype=float) else: binned_dqx_array = binned_dqy_array = None for wl_i in unique_wl_vec: filtered_matrix = wl_matrix[wl_matrix[:, 0] == wl_i] # special work with q resolution if dqx_array is None: dqx_array_i = None dqy_array_i = None else: dqx_array_i = filtered_matrix[:, 5] dqy_array_i = filtered_matrix[:, 6] # bin by Q2D ( i_final_array, sigma_final_array, dqx_final_array, dqy_final_array, ) = _bin_iq2d( qx_bin_edges, qy_bin_edges, filtered_matrix[:, 1], filtered_matrix[:, 2], dqx_array_i, dqy_array_i, filtered_matrix[:, 3], filtered_matrix[:, 4], ) # build up the final output binned_iq_array = ( np.concatenate((binned_iq_array, i_final_array), axis=1) if binned_iq_array.size else i_final_array ) binned_sigma_iq_array = ( np.concatenate((binned_sigma_iq_array, sigma_final_array), axis=1) if binned_sigma_iq_array.size else sigma_final_array ) if dqx_array is not None: binned_dqx_array = ( np.concatenate((binned_dqx_array, dqx_final_array), axis=1) if binned_dqx_array.size else dqx_final_array ) binned_dqy_array = ( np.concatenate((binned_dqy_array, dqy_final_array), axis=1) if binned_dqy_array.size else dqy_final_array ) binned_wl_array = ( np.concatenate((binned_wl_array, np.zeros_like(i_final_array) + wl_i), axis=1) if binned_wl_array.size else np.zeros_like(i_final_array) + wl_i ) # END-FOR (wl_i) if dqx_array is None: binned_dqx_array = None binned_dqy_array = None return ( binned_iq_array, binned_sigma_iq_array, binned_dqx_array, binned_dqy_array, binned_wl_array, ) def _do_2d_weighted_binning( qx_array, dqx_array, qy_array, dqy_array, wl_array, iq_array, sigma_iq_array, x_bin_edges, y_bin_edges, ): """Perform 2D weighted binning General description of algorithm: I(x', y') = sum_{x, y, lambda}^{K} (I(x, y, lambda) / sigma(x, y, lambda)^2) / sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2) sigmaI(x', y') = sqrt(sum_{x, y, lambda}^{K} (sigma(x, y, lambda / sigma(x, y, lambda)^2)^2) / sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2) = sqrt(sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2)) / sum_{x, y, lambda}^{K}(1/sigma(x, y, lambda)^2) = 1 / sqrt(sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2)) sigmaQ(x', y') = sum_{x, y, lambda}^{K}(sigmaQ(x, y, lambda)/sigma^2(x, y, lambda)^2) / sum_{x, y, lambda}^{K}(1/sigma(x, y, lambda)^2) where K is the set of (x, y, sigma) such that (x, y, sigma) in the same Q_bin Parameters ---------- qx_array : ndarray qx dqx_array : ndarray Qx resolution qy_array: ndarray qy dqy_array: ndarray Qy resolution wl_array : ndarray wavelengths iq_array : ndarray intensities sigma_iq_array : ndarray intensity errors x_bin_edges : ndarray X bin edges y_bin_edges Y bin edges Returns ------- ndarray, ndarray, ndarray, ndarray binned intensities (n x m), binned sigmas (n x m), binned Qx resolution (n x m), binned Qy resolution (n x m), binned wavelength (n x m) """ unique_wl_vec = np.unique(wl_array) unique_wl_vec.sort() if wl_array is None or len(unique_wl_vec) == 1: # Calculate 1/sigma^2 for multiple uses invert_sigma2_array = 1.0 / (sigma_iq_array**2) # 1D # Histogram on 1/sigma^2, i.e., nominator part in Equation 11.22, 11.23 and 11.24 # sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2) w_2d_array, *_ = np.histogram2d( qx_array, qy_array, bins=(x_bin_edges, y_bin_edges), weights=invert_sigma2_array, ) # 2D # Calculate Equation 11.22: I(Qx, Qy) # I(x', y') = sum_{x, y, lambda}^{K} (I(x, y, lambda) / sigma(x, y, lambda)^2) / # sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2) # denominator in Equation 11.22: sum_{x, y, lambda}^{K} (I(x, y, lambda) / sigma(x, y, lambda)^2) i_raw_2d_array, *_ = np.histogram2d( qx_array, qy_array, bins=(x_bin_edges, y_bin_edges), weights=iq_array * invert_sigma2_array, ) # 2D # denominator divided by nominator (11.22) i_final_array = i_raw_2d_array / w_2d_array # Calculate equation 11.23: sigmaI(Q) # sigmaI(x', y') = sqrt(sum_{x, y, lambda}^{K} (sigma(x, y, lambda / sigma(x, y, lambda)^2)^2) / # sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2) # = sqrt(sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2)) / # sum_{x, y, lambda}^{K}(1/sigma(x, y, lambda)^2) # = 1 / sqrt(sum_{x, y, lambda}^{K} (1 / sigma(x, y, lambda)^2)) # Thus histogrammed sigmaI can be obtained from histogrammed invert_sigma2_array directly sigma_final_array = 1.0 / np.sqrt(w_2d_array) # Calculate equation 11.24: sigmaQx and sigmaQy (i.e., Q resolution) # sigmaQ(x', y') = sum_{x, y, lambda}^{K}(sigmaQ(x, y, lambda)/sigma^2(x, y, lambda)^2) / # sum_{x, y, lambda}^{K}(1/sigma(x, y, lambda)^2) # denominator in Equation 11.24: sum_{x, y, lambda}^{K}(sigmaQ(x, y, lambda)/sigma^2(x, y, lambda)^2) dqx_raw_array, *_ = np.histogram2d( qx_array, qy_array, bins=(x_bin_edges, y_bin_edges), weights=dqx_array * invert_sigma2_array, ) # 2D dqy_raw_array, *_ = np.histogram2d( qx_array, qy_array, bins=(x_bin_edges, y_bin_edges), weights=dqy_array * invert_sigma2_array, ) # 2D # denominator divided by nominator (11.24) dqx_final_array = dqx_raw_array / w_2d_array # dQx dqy_final_array = dqy_raw_array / w_2d_array # dQy if wl_array is None: wl_final_array = None else: wl_final_array = np.full_like(i_final_array, unique_wl_vec[0]) else: raise NotImplementedError("2D binning with multiple wavelengths is not supported") return ( i_final_array, sigma_final_array, dqx_final_array, dqy_final_array, wl_final_array, ) def explore_binning_issue(ub_index, n_wavelength_bin, ub1d: IQmod, bins_1d): # TODO FIXME - Remove after binning issue is resolved completely # DEBUG BINNING print(f"[PROOF] [{ub_index}] Wavelength bins = {n_wavelength_bin}") if n_wavelength_bin is None: wl_vec = np.unique(ub1d.wavelength) print(f"number of wavelength: {len(wl_vec)}: {wl_vec}") else: wl_vec = None for i_bin in range(5): # print(f'{i_bin}-bin: boundary: {bins_1d.edges[i_bin]}, {bins_1d.edges[i_bin + 1]}') bin_qmin = bins_1d.edges[i_bin] bin_qmax = bins_1d.edges[i_bin + 1] # filter the I(Q) in boundary # >= q_min in_range_i_arrays = ub1d.intensity[ub1d.mod_q >= bin_qmin] in_range_q_arrays = ub1d.mod_q[ub1d.mod_q >= bin_qmin] # < qmax in_range_i_arrays = in_range_i_arrays[in_range_q_arrays < bin_qmax] in_range_q_arrays = in_range_q_arrays[in_range_q_arrays < bin_qmax] sum_intensity = in_range_i_arrays.sum() print( f"{i_bin}-bin: boundary: {bins_1d.edges[i_bin]}, {bins_1d.edges[i_bin + 1]}:" f"num points = {len(in_range_q_arrays)}, " f"sum = {sum_intensity}," f"average = {sum_intensity / len(in_range_q_arrays)}" ) if wl_vec is not None: # wavelength in details in_range_wl_arrays = ub1d.wavelength[ub1d.mod_q >= bin_qmin] in_range_q_arrays = ub1d.mod_q[ub1d.mod_q >= bin_qmin] # < qmax in_range_wl_arrays = in_range_wl_arrays[in_range_q_arrays < bin_qmax] sum_i_per_wl_vec = list() num_pt_per_wl_vec = list() for wl in wl_vec: selected_i_array = in_range_i_arrays[np.abs(in_range_wl_arrays - wl) < 0.001] sum_i_per_wl_vec.append(np.sum(selected_i_array)) num_pt_per_wl_vec.append(len(selected_i_array)) print(f"wl = {wl}: sum = {np.sum(selected_i_array)}, num I(Q) = {len(selected_i_array)}") sum_i_per_wl_vec = np.array(sum_i_per_wl_vec) num_pt_per_wl_vec = np.array(num_pt_per_wl_vec) sum_i = sum_i_per_wl_vec.sum() # binned value can be a little more complicated valid_sum_i_vec = sum_i_per_wl_vec[num_pt_per_wl_vec > 0] valid_num_pt_vec = num_pt_per_wl_vec[num_pt_per_wl_vec > 0] num_valid_ws = len(valid_num_pt_vec) bin_int = np.sum(valid_sum_i_vec / valid_num_pt_vec) / num_valid_ws print(f"Number of I(Q) = {num_pt_per_wl_vec.sum()}, Total I(Q) = {sum_i}, Binned I = {bin_int}")