"""
Module for algorithms to prepare sensitivity for instrument with moving detector
"""
import numpy as np
from drtsans.mask_utils import circular_mask_from_beam_center, apply_mask
import drtsans.mono.gpsans as gp
from mantid.simpleapi import CreateWorkspace, logger
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/issues/205
[docs]
def prepare_sensitivity(flood_data_matrix, flood_sigma_matrix, threshold_min, threshold_max):
"""Prepare sensitivity for moving detector
Data files are processed such that intensities and errors are stored in numpy.ndarray with shape (N, M), where
- N: number of data files to calculate sensitivities
- M: number of pixels (aka spectra) in instrument's detector; The 2D data from 2D detector are flattened to 1D
Prerequisite of the input data:
- Input data has been normalized by monitor counts already
- top and bottom of the detector shall be masked (set to value as NaN) due to edge effects
- in each file, beam center shall be found and masked out
- errors are then calculated from the flood intensities
Workflow to prepare sensitivities
- normalize the flood field data by monitor
- find weighted average for each fie and error
- apply bad pixel threshold to each file
- correct for beam stop and add all the flood files together to non-normalized sensitivities
- apply weighted average to sensitivities
Parameters
----------
flood_data_matrix : ~numpy.ndaray
multiple set of flood data intensities with shape = N, M
flood_sigma_matrix : ~numpy.ndaray
multiple set of flood data intensities' error with shape = N, M
threshold_min : float
minimum allowed detector counts to mask out 'bad' pixels
threshold_max : float
maximum allowed detector counts to mask out 'bad' pixels
Returns
-------
~numpy.ndaray, ~numpy.ndaray
sensitivities, sensitivities error
"""
# There might some zero-count pixels in the flood data, they shall be masked
_mask_zero_count_pixel(flood_data_matrix, flood_sigma_matrix)
# normalize the flood field data by monitor: Normalization is removed from this algorithm to integration
# inputs: (N, M) array; outputs: (N, M) array
# flood_data_matrix, flood_sigma_matrix = _normalize_by_monitor(flood_data_matrix, flood_sigma_matrix,
# monitor_counts)
# find weighted average for each fie and error
# inputs: (N, M) array; outputs: (N, M) array
returns = _calculate_weighted_average_with_error(flood_data_matrix, flood_sigma_matrix)
flood_data_matrix = returns[0]
flood_sigma_matrix = returns[1]
# apply bad pixel threshold to each file
# inputs: (N, M) array; outputs: (N, M) array
flood_data_matrix, flood_sigma_matrix = _apply_sensitivity_thresholds(
flood_data_matrix, flood_sigma_matrix, threshold_min, threshold_max
)
# correct for beam stop and add all the flood files together to non-normalized sensitivities
raw_sensitivities, raw_sensitivities_error = _calculate_pixel_wise_sensitivity(
flood_data_matrix, flood_sigma_matrix
)
# apply weighted average to sensitivities
(
sensitivities,
sensitivities_error,
sens_avg,
sigma_sens_avg,
) = _normalize_sensitivities(raw_sensitivities, raw_sensitivities_error)
return sensitivities, sensitivities_error
def _mask_zero_count_pixel(flood_data_matrix, flood_sigma_matrix):
"""Mask out pixels/elements in the flood data to NaN.
Because these zero count is not considered by the equations to calculate weighted average.
The value will be modified in place to the numpy arrays
Parameters
----------
flood_data_matrix: numpy.ndarray
normalized flood data
flood_sigma_matrix: numpy.ndarray
normalized flood data's error
Returns
-------
"""
# get the zero count elments
zero_count_elements = flood_data_matrix < 1e-12
logger.notice(f"Input flood runs: total {len(np.where(zero_count_elements)[0])} are " f"masked")
# set to NaN
flood_data_matrix[zero_count_elements] = np.nan
flood_sigma_matrix[zero_count_elements] = np.nan
def _calculate_weighted_average_with_error(normalized_data, normalized_error):
"""Calculated weighted average for normalized flood data and error
Average = (sum I(m, n) / sigma^(m, n)) / sum 1 / sigma^2
Parameters
----------
normalized_data : ndarray
normalized flood data
multiple set of flood data. flood_data.shape = (M, N) as M set of N pixels
normalized_error : ndarray
normalized flood data's error
multiple set of flood data. flood_data.shape = (M, N) as M set of N pixels
Returns
-------
ndarray, ndarray, float, float
data normalized by average, data's error normalized by average, Average, sigma(Average)
"""
# Calculate weighted average for each flood file/run (average = sum_{i, j} I(i, j) / sigma^2(i, j))
# For m-th flood file/run
# where (i, j) is the index of a pixel on 2D detector and n is the index of same pixel as the 2D array
# is flattened to 1D.
# np.nansum() is used to exclude NaN from summation
# np.nansum(): https://docs.scipy.org/doc/numpy/reference/generated/numpy.nansum.html
# Summation is done along axis=1, the return, weighted_sum, is a 1D array with shape (M,)
# calculate: sum_{n} I(m, n) / sigma^2(m, n)
weighted_sum = np.nansum(normalized_data / normalized_error**2, axis=1) # summing in row
# calculate: sum_n(1 / sigma^2(m, n))
weights_square = np.nansum(1.0 / normalized_error**2, axis=1)
# average[m] == sum_{n}(I(m, n) / sigma^2(m, n)) / sum_{n}(1 / sigma^2(m, n))
weighted_average = weighted_sum / weights_square
# reshape to (M, 1) for division to input 2D array with shape (M, N)
weighted_average = weighted_average.reshape((normalized_data.shape[0], 1)) # reshape to (N, 1) for division
# calculate: error for weighted average: sigma_avg[m] = 1 / sqrt(sum_{n}(1 / sigma^2(m, n)))
weighted_average_error = 1.0 / np.sqrt(weights_square)
# reshape to (M, 1) for division to input 2D array with shape (M, N)
weighted_average_error = weighted_average_error.reshape((normalized_data.shape[0], 1))
# Normalize data by weighted-average
avg_norm_data = normalized_data / weighted_average
# Propagate uncertainties: sigma S(n) = I(m, n) / avg * [(error(m, n)/I(m, n))^2 + (sigma Avg/Avg)^2]^1/2
# in the sqrt operation, first term is a N x M array and second term is a N x 1 array
avg_norm_error = (
normalized_data
/ weighted_average
* np.sqrt((normalized_error / normalized_data) ** 2 + (weighted_average_error / weighted_average) ** 2)
)
return avg_norm_data, avg_norm_error, weighted_average, weighted_average_error
def _apply_sensitivity_thresholds(data, data_error, threshold_min, threshold_max):
"""Apply bad pixel threshold to each data set including error
If any pixel with counts falling out of allowed threshold, i.e., out of range (min, max)
they will be specified as bad pixels.
For any bad pixel, the counts will then be set to '-inf'
Parameters
----------
data : ndarray
normalized data
data_error : ndarray
normalized data error
threshold_min: float
minimum value of allowed value
threshold_max: float
maximum value of allowed value
Returns
-------
ndarray, ndarray
data with bad pixels set to INF,
data error with bad pixels set to INF
"""
msg = "[drtsans._apply_sensitivity_thresholds]:\n"
msg += f"\tthreshold_min: {threshold_min}\n"
msg += f"\tthreshold_max: {threshold_max}\n"
msg += f"\tnumber of pixels below threshold_min: {len(np.where(data < threshold_min)[0])}\n"
msg += f"\tnumber of pixels above threshold_max: {len(np.where(data > threshold_max)[0])}\n"
logger.notice(msg)
# (data < threshold_min) | (data > threshold_max) returns the list of indexes in array data whose values
# are either smaller than minimum threshold or larger than maximum threshold.
data[(data < threshold_min) | (data > threshold_max)] = -np.inf
data_error[(data < threshold_min) | (data > threshold_max)] = -np.inf
return data, data_error
def _calculate_pixel_wise_sensitivity(flood_data, flood_error):
"""Calculate pixel-wise average of N files to create the new summed file for doing sensitivity correction
# data_a, data_a_error, data_b, data_b_error, data_c, data_c_error
D(m, n) = A_F(m, n) + B_F(m, n) + C_F(m, n) with average weight
Calculate Pixel-wise Average of 3 files to create the new summed file for
doing the sensitivity correction
Parameters
----------
flood_data : ~numpy.ndarray
processed multiple flood files in an N x M array
shape[0]: number of flood files
shape[1]: number of pixels
flood_error : ~numpy.ndarray
processed multiple flood files' error in an N x M array
Returns
-------
~numpy.nparray, ~numpy.nparray
non-normalized sensitivities, non-normalized sensitivities error
1D array as all the flood files are summed
"""
# Keep a record on the array elements with np.inf long axis=0, i.e., same detector pixel among different
# flood files
simple_sum = np.sum(flood_data, axis=0)
# Calculate D'(i, j) = sum_{k}^{A, B, C}M_k(i, j)/s_k^2(i, j)
# 1/s^2(i, j) = sum_{k}^{A, B, C}1/s_k^2(i, j)
# Do weighted summation to the subset and exclude the NaN by np.nansum()
# np.nansum(): https://docs.scipy.org/doc/numpy/reference/generated/numpy.nansum.html
# If there is any element in array that is infinity, the summation of all elements on the specified axis
# i.e., among all flood data files/runs, could be messed up (though every likely the summed value is inf).
s_ij = np.nansum(1.0 / flood_error**2, axis=0) # summation along axis 1: among files
d_ij = np.nansum(flood_data / flood_error**2, axis=0) / s_ij
s_ij = 1.0 / np.sqrt(s_ij)
# In case there is at least an inf in this subset of data along axis=0, i.e., among various flood runs/files,
# set sensitivities to -inf in case nansum() messes up
s_ij[np.isinf(simple_sum)] = -np.inf
d_ij[np.isinf(simple_sum)] = -np.inf
# sensitivities = d_ij
# sensitivities_error = s_ij
return d_ij, s_ij
def _normalize_sensitivities(d_array, sigam_d_array):
"""Do weighted average to pixel-wise sensitivities and propagate the error
And then apply the average to sensitivity
S_avg = sum_{i, j}{D(i, j) / sigma^2(i, j)} / sum_{i, j}{1 / sigma^2(i, j)}
Parameters
----------
d_array : ndarray
pixel-wise sensitivities in 1D array with shape (N,) where N is the number of pixels
sigam_d_array : ndarray
pixel-wise sensitivities error in 1D array with shape (N,) where N is the number of pixels
Returns
-------
ndarray, ndarray, float, float
normalized pixel-wise sensitivities, normalized pixel-wise sensitivities error
scalar sensitivity, error of scalar sensitivity
"""
# Calculate wighted-average of pixel-wise sensitivities: i.e., do the summation on the all pixels
# since the 2D detector is treated as a 1D array in this method.
# Each (i, j) has a unique value p to be mapped to n.
# Any NaN terms and Infinity terms (for bad pixels) shall be excluded from summation
# ~(np.isinf(d_array) | np.isnan(d_array) gives out the indexes of elements in d_array that are not NaN or Inf
# calculate denominator: denominator = sum_{i, j}{D(i, j) / sigma^2(i, j)} = sum_{n}(D(n) / sigma^2(n))
denominator = np.sum(
d_array[~(np.isinf(d_array) | np.isnan(d_array))]
/ sigam_d_array[~(np.isinf(d_array) | np.isnan(d_array))] ** 2
)
# calculate nominator: nominator = sum_{m, n}{1 / sigma^2(m, n)}
nominator = np.sum(1 / sigam_d_array[~(np.isinf(d_array) | np.isnan(d_array))] ** 2)
sens_avg = denominator / nominator
# Normalize pixel-wise sensitivities
sensitivities = d_array / sens_avg
# Calculate scalar sensitivity's error:
# sigma_S_avg = sqrt(1 / sum_{m, n}(1 / sigma_D(m, n)^2))
# for all D(m, n) are not NaN
# Thus, all the NaN terms shall be excluded from summation
# All the infinity terms shall be ignored because (1/inf) is zero and has no contribution in summation
# d_array[~(np.isinf(d_array) | np.isnan(d_array))] excludes all items that are either infinity or Nan
sigma_sens_avg = np.sqrt(1 / np.sum(1 / sigam_d_array[~(np.isinf(d_array) | np.isnan(d_array))] ** 2))
# Propagate the sensitivities
# sigma_sens(m, n) = D(m, n) / S_avg * [(sigma_D(m, n) / D(m, n))^2 + (sigma_S_avg / S_avg)^2]^{1/2}
# D(m, n) are the non-normalized sensitivities
sensitivities_error = (
d_array / sens_avg * np.sqrt((sigam_d_array / d_array) ** 2 + (sigma_sens_avg / sens_avg) ** 2)
)
# set sensitivities error to -infinity if sensitivities are
sensitivities_error[np.isinf(sensitivities)] = -np.inf
return sensitivities, sensitivities_error, sens_avg, sigma_sens_avg
[docs]
def mask_beam_center(data_ws, beam_center_ws, beam_center_radius):
"""Mask detectors in a workspace
Mask (1) beam center
Parameters
----------
data_ws : ~mantid.api.MatrixWorkspace
Flood data workspace
beam_center_ws : ~mantid.api.MatrixWorkspace
Beam center workspace used to generate beam center mask
beam_center_radius : float
beam center radius in unit of mm
Returns
-------
~mantid.api.MatrixWorkspace
"""
# Use beam center ws to find beam center
xc, yc = gp.find_beam_center(beam_center_ws)
# Center detector to the data workspace (change in geometry)
gp.center_detector(data_ws, xc, yc)
# Mask the new beam center by 65 mm (Lisa's magic number)
det = list(circular_mask_from_beam_center(data_ws, beam_center_radius))
apply_mask(data_ws, mask=det) # data_ws reference shall not be invalidated here!
return data_ws
[docs]
def calculate_sensitivity_correction(flood_run_ws_list, threshold_min, threshold_max):
"""Prepare sensitivities with
Parameters
----------
flood_run_ws_list : ~list
List of references to Mantid workspaces for normalized and masked (default and beam center)
threshold_min : float
minimum threshold
threshold_max : float
maximum threshold
Returns
-------
~mantid.api.MatrixWorkspace
Reference to the events workspace
"""
# Set the flood/beam center pair
num_ws_pairs = len(flood_run_ws_list)
# number of histograms
num_spec = flood_run_ws_list[0].getNumberHistograms()
# Combine to numpy arrays: N, M
flood_array = np.ndarray(shape=(num_ws_pairs, num_spec), dtype=float)
sigma_array = np.ndarray(shape=(num_ws_pairs, num_spec), dtype=float)
for f_index in range(num_ws_pairs):
flood_array[f_index][:] = flood_run_ws_list[f_index].extractY().transpose()[0]
sigma_array[f_index][:] = flood_run_ws_list[f_index].extractE().transpose()[0]
# Convert all masked pixels' counts to NaN
masked_items = np.where(sigma_array < 1e-16)
# Logging
msg = "[drtsans.calculate_sensitivity_correction]:\n"
msg += f"\tNumber of zero counts: {np.where(flood_array < 1e-16)[0].shape}\n"
msg += f"\tNumber of zero sigmas: {np.where(sigma_array < 1e-16)[0].shape}\n"
logger.notice(msg)
# set values to masked pixels
flood_array[masked_items] = np.nan
sigma_array[masked_items] = np.nan
# Calculate sensitivities
sens_array, sens_sigma_array = prepare_sensitivity(
flood_data_matrix=flood_array,
flood_sigma_matrix=sigma_array,
threshold_min=threshold_min,
threshold_max=threshold_max,
)
# Convert all -infinity to nan
sens_sigma_array[np.where(np.isinf(sens_array))] = np.nan
sens_array[np.where(np.isinf(sens_array))] = np.nan
# Export to a MatrixWorkspace
# Create a workspace for sensitivities
vec_x = flood_run_ws_list[0].extractX().flatten()
sens_ws_name = "sensitivities"
# Create output workspace
nexus_ws = CreateWorkspace(
DataX=vec_x,
DataY=sens_array,
DataE=sens_sigma_array,
NSpec=num_spec,
UnitX="wavelength",
OutputWorkspace=sens_ws_name,
)
return nexus_ws