import numpy as np
import os
from mantid.kernel import logger
r"""
Links to mantid algorithms
https://docs.mantidproject.org/nightly/algorithms/DeleteWorkspace-v1.html
https://docs.mantidproject.org/nightly/algorithms/SaveNexusProcessed-v1.html
https://docs.mantidproject.org/nightly/algorithms/Integration-v1.html
https://docs.mantidproject.org/nightly/algorithms/CreateWorkspace-v1.html
"""
from mantid.simpleapi import (
mtd,
DeleteWorkspace,
SaveNexusProcessed,
Integration,
CreateWorkspace,
)
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fdetector.py
from drtsans.detector import Component
[docs]
def calculate_sensitivity_correction(
input_workspace,
min_threshold=0.5,
max_threshold=2.0,
poly_order=2,
min_detectors_per_tube=50,
filename=None,
component_name="detector1",
output_workspace=None,
):
"""
Calculate the detector sensitivity
Prerequisites for input workspace:
1. All previously masked values to NaN as required by Numpy functions but not masked pixels for beam centers
2. All masked pixels at beam centers are set to -infinity
**Mantid algorithms used:**
:ref:`SaveNexusProcessed <algm-SaveNexusProcessed-v1>`
:ref:`CreateWorkspace <algm-CreateWorkspace-v1>`
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace
Workspace to calculate the sensitivity from
min_threshold: float
Minimum threshold for efficiency value.
max_threshold: float
Maximum threshold for efficiency value
poly_order : int
ploy order. default to 2
min_detectors_per_tube : int, optional
Minimum detectors with a value existing in the tube to fit. Only fits
tubes with at least `min_detectors_per_tube` (the default is 50).
component_name : str, optional
Component name to (the default is 'detector1', which is the main
detector)
filename: str
Name of the file to save the sensitivity calculation to
output_workspace: ~mantid.api.MatrixWorkspace
The calculated sensitivity workspace
"""
if output_workspace is None:
output_workspace = "{}_sensitivity".format(input_workspace)
# Wavelength bins are summed together to remove the time-of-flight nature according
# to equations A3.1 and A3.2
input_workspace = mtd[str(input_workspace)]
# Process input workspace such that each spectra shall only have 1 value, i.e., the total
# neutron counts received on that pixel because this ALGORITHM requires total counts on each detector pixel
if input_workspace.blocksize() != 1:
# More than 1 bins in spectra: do integration to single bin
# This is for EQSANS specially
# output workspace name shall be unique and thus won't overwrite any existing one
input_workspace = Integration(InputWorkspace=input_workspace, OutputWorkspace=mtd.unique_hidden_name())
# set flag to delete input workspace later
delete_input_workspace = True
else:
# set flag to keep input workspace
delete_input_workspace = False
# The average and uncertainty in the average are determined from the masked pattern
# according to equations A3.3 and A3.4
# numpy.flatten() used to more easily find the mean and uncertainty using numpy.
y = input_workspace.extractY().flatten()
y_uncertainty = input_workspace.extractE().flatten()
# Normalize the counts and uncertainty on each pixel by total average counts on detector
# calculate the number of spectra that is not masked out (i.e., either NaN or -Infinity)
n_elements = (
input_workspace.getNumberHistograms() - np.count_nonzero(np.isnan(y)) - np.count_nonzero(np.isneginf(y))
)
# calculate average counts on non-masked spectra
# F = sum_i^{|N|}(Y_i)/N: i \in spectra such that Y_i is not NaN or -Infinity
F = np.sum([value for value in y if not np.isnan(value) and not np.isneginf(value)]) / n_elements
# Calculate the average uncertainties on non-masked spectra
dF = (
np.sqrt(np.sum([value**2 for value in y_uncertainty if not np.isnan(value) and not np.isneginf(value)]))
/ n_elements
)
# calculate the normalized counts on each pixel by average count
II = y / F
# calculate the normalized uncertainty on each pixel
dI = II * np.sqrt(np.square(y_uncertainty / y) + np.square(dF / F))
# Any pixel in II less than min_threshold or greater than max_threshold is masked
counts = 0
for i, value in enumerate(II):
if not np.isnan(value) and not np.isneginf(value):
if value < min_threshold or value > max_threshold:
II[i] = np.nan
dI[i] = np.nan
counts += 1
# Get the detector (component) to calculate sensitivity correction for
# 'detector1' for EQSANS, GPSANS and BIOSANS's main detector
# 'wing_detector' or 'midrange_detector' for BIOSANS's wing detector
comp = Component(input_workspace, component_name)
# The next step is to fit the data in each tube with a second order polynomial as shown in
# Equations A3.9 and A3.10. Use result to fill in NaN values.
num_interpolated_tubes = 0
# Loop over all the tubes
num_tubes = comp.dim_x
num_pixels_per_tube = comp.dim_y
for j in range(0, num_tubes):
xx = []
yy = []
ee = []
# beam center masked pixels
beam_center_masked_pixels = []
for i in range(0, num_pixels_per_tube):
index = num_pixels_per_tube * j + i
if np.isneginf(II[index]):
beam_center_masked_pixels.append([i, index])
elif not np.isnan(II[index]):
xx.append(i)
yy.append(II[index])
ee.append(dI[index])
if len(beam_center_masked_pixels) == 0:
# no masked/center masked pixels
# no need to do interpolation
continue
# This shall be an option later
if len(xx) < min_detectors_per_tube:
logger.error(
"Skipping tube with indices {} with {} non-masked value. Too many "
"masked or dead pixels.".format(j, len(xx))
)
continue
# Do poly fit
num_interpolated_tubes += 1
# the weights in a real sensitivity measurement are all going to be very similar,
# so it should not really matter in practice.
# covariance matrix is calculated differently from numpy 1.6
# Refer to: https://numpy.org/devdocs/release/
# 1.16.0-notes.html#the-scaling-of-the-covariance-matrix-in-np-polyfit-is-different
polynomial_coeffs, cov_matrix = np.polyfit(xx, yy, poly_order, w=np.array(ee), cov=True)
# Errors in the least squares is the sqrt of the covariance matrix
# (correlation between the coefficients)
e_coeffs = np.sqrt(np.diag(cov_matrix))
beam_center_masked_pixels = np.array(beam_center_masked_pixels)
# The patch is applied to the results of the previous step to produce S2(m,n).
y_new = np.polyval(polynomial_coeffs, beam_center_masked_pixels[:, 0])
# errors of the polynomial
e_new = np.sqrt(
e_coeffs[2] ** 2
+ (e_coeffs[1] * beam_center_masked_pixels[:, 0]) ** 2
+ (e_coeffs[0] * beam_center_masked_pixels[:, 0] ** 2) ** 2
)
for i, index in enumerate(beam_center_masked_pixels[:, 1]):
II[index] = y_new[i]
dI[index] = e_new[i]
# The final sensitivity, S(m,n), is produced by dividing this result by the average value
# per Equations A3.13 and A3.14
# numpy.flatten() used to more easily find the mean and uncertainty using numpy.
n_elements = (
input_workspace.getNumberHistograms() - np.count_nonzero(np.isnan(II)) - np.count_nonzero(np.isneginf(II))
)
F = np.sum([value for value in II if not np.isnan(value) and not np.isneginf(value)]) / n_elements
dF = np.sqrt(np.sum([value**2 for value in dI if not np.isnan(value) and not np.isneginf(value)])) / n_elements
output = II / F
# propagate uncertainties from covariance matrix to sensitivities' uncertainty
output_uncertainty = output * np.sqrt(np.square(dI / II) + np.square(dF / F))
# Export the calculated sensitivities via Mantid Workspace
# Create a workspace to have sensitivity value and error for each spectrum
CreateWorkspace(
DataX=[1.0, 2.0],
DataY=output,
DataE=output_uncertainty,
Nspec=input_workspace.getNumberHistograms(),
UnitX="wavelength",
OutputWorkspace=output_workspace,
)
# If the input workspace is flagged to delete, delete it
if delete_input_workspace:
DeleteWorkspace(input_workspace)
# If output file name is given, save the workspace as Processed NeXus file
if filename:
path = os.path.join(os.path.expanduser("~"), filename)
SaveNexusProcessed(InputWorkspace=output_workspace, Filename=path)
return mtd[output_workspace]