import numpy as np
import os
from typing import List, Tuple
from drtsans.dataobjects import I1DAnnular
from drtsans.determine_bins import determine_1d_linear_bins
from drtsans.iq import BinningMethod, BinningParams, bin_annular_into_q1d
# https://docs.mantidproject.org/nightly/algorithms/DeleteWorkspace-v1.html
from mantid.simpleapi import DeleteWorkspace, logger, Gaussian, FlatBackground, mtd
# https://docs.mantidproject.org/nightly/algorithms/Fit-v1.html
from mantid.simpleapi import Fit
from mantid.api import IFunction
import h5py
from matplotlib import pyplot as plt
__all__ = ["getWedgeSelection"]
# factor to convert from a sigma to full width half max
# https://docs.mantidproject.org/nightly/fitting/fitfunctions/Gaussian.html
SIGMA_TO_FWHM = 2.0 * np.sqrt(2.0 * np.log(2.0))
MANTID_FUNCTION_MAP = {Gaussian().name: Gaussian, FlatBackground().name: FlatBackground}
def _create_fit_function(combo_function_str):
"""Create fit function from a Mantid combo function string
An example:
name=FlatBackground,A0=5894.8246694221725;name=Gaussian,Height=16626.818784787953,
PeakCentre=100.43905821664455,Sigma=5.917594016360494;name=Gaussian,Height=18517.052761733517,
PeakCentre=304.6715713990027,Sigma=36.943377164429435
Parameters
----------
combo_function_str: str
function string
Returns
-------
~list
list of Mantid IFunction in order of
"""
# split to multiple functions
single_function_str_list = combo_function_str.split(";")
single_functions = list()
# convert to function
for single_func in single_function_str_list:
# split with ','
params = single_func.split(",")
# create function
func_name = params[0].split("=")[1]
func_i = MANTID_FUNCTION_MAP[func_name]()
# start to set parameters
for value_index in range(1, len(params)):
# split in PARAM_NAME=PARAM_VALUE style
param_list = params[value_index].split("=")
param_name_i = param_list[0]
param_value_i = param_list[1]
if param_name_i == "constraints":
func_i.constrain(param_value_i)
else:
func_i.__setattr__(param_name_i, float(param_value_i))
# END-FOR
single_functions.append(func_i)
return single_functions
def _set_function_param_value(functions, param_name, param_value):
"""set function parameter value from the result of a combo function
Parameters
----------
functions: ~list
list of functions
param_name: str
function parameter name may have 2 forms: PARAM_NAME or fI.PARAM_NAME, where I is an integer
param_value: float
parameter value
Returns
-------
None
"""
# Check parameter name
if "." in param_name:
terms = param_name.split(".")
function_index = int(terms[0].split("f")[-1])
param_name = terms[1]
else:
function_index = 0
# set
functions[function_index].__setattr__(param_name, param_value)
def _calculate_function(fit_functions, vec_x):
"""Calculate a function (set)
Parameters
----------
fit_functions: ~list[IFunction]
List of fit function
vec_x: numpy.ndarray
1D array for X values
Returns
-------
numpy.ndarray
1D array for calculated function values
"""
# Create the combo function from a list of functions
combo_function = fit_functions[0]
for f_i in range(1, len(fit_functions)):
combo_function += fit_functions[f_i]
# Call the create
vec_y = combo_function.__call__(vec_x)
return vec_y
def _plot_fit_results(rings, peak_fit_dict, output_dir, auto_symmetric_wedges):
"""Plot original data and fit result
Parameters
----------
rings: ~list
List of I(Qx, Qy) in rings
peak_fit_dict: ~dict
dictionary containing all the peak fitting result
output_dir: str
full path of the output directory
auto_symmetric_wedges: bool
if True, label the second peak as mirrored
Returns
-------
"""
def _set_function_params_and_calculate(fit_functions: list[IFunction], peak_fitted_dict: dict):
for param_name in peak_fitted_dict:
if param_name not in ["fit_function", "error", "used"]:
# parameter value is recorded as tuple as value and error
_set_function_param_value(fit_function_set, param_name, peak_fitted_dict[param_name][0])
# calculate
model_y = _calculate_function(fit_functions, ring.phi)
return model_y
for index, ring in enumerate(rings):
# add fitting related information to hdf file
if peak_fit_dict[index]["error"] is not None:
# bad fit
continue
# construct function from estimated
fit_function_str = peak_fit_dict[index]["fit_function"]
fit_function_set = _create_fit_function(fit_function_str)
# calculate estimated peaks
estimated_y = _calculate_function(fit_function_set, ring.phi)
if auto_symmetric_wedges:
# separate fitted peak ("f1" params) from mirrored peak (fake "f2" params) to plot them separately
# the fit function is a single gaussian with only "f1" entries
peak_fitted_dict = {}
peak_mirrored_dict = {}
for key, val in peak_fit_dict[index].items():
if key.startswith("f2"):
new_key = key.replace("f2", "f1")
# shift mirrored peak into plot angle range
new_value = (val[0] - 360.0, val[1]) if "PeakCentre" in key and val[0] > 360.0 else val
peak_mirrored_dict[new_key] = new_value
else:
peak_fitted_dict[key] = val
else:
peak_fitted_dict = peak_fit_dict[index]
model_y = _set_function_params_and_calculate(fit_function_set, peak_fitted_dict)
# plot
plt.cla()
plt.plot(ring.phi, ring.intensity, label="observed", color="black")
plt.plot(ring.phi, estimated_y, label="estimated", color="blue")
plt.plot(ring.phi, model_y, label="fitted", color="red")
if auto_symmetric_wedges:
mirrored_model_y = _set_function_params_and_calculate(fit_function_set, peak_mirrored_dict)
plt.plot(ring.phi, mirrored_model_y, label="mirrored", color="red", linestyle="dashed")
plt.legend(loc="upper left")
plt.savefig(os.path.join(output_dir, f"ring_{index:01}.png"))
def _export_to_h5(iq2d, rings, azimuthal_delta, peak_fit_dict, output_dir):
"""write annular binned I(Qx, Qy) and intensity rings to hdf5 for further review
Parameters
----------
iq2d: ~drtsans.dataobjects.Azimuthal
rings: ~list
List of I(Qx, Qy) in rings
peak_fit_dict: ~dict
dictionary containing all the peak fitting result
output_dir: str
full path of the output directory
Returns
-------
"""
# annular binning on the full range
azimuthal_offset = 0.5 * azimuthal_delta
azimuthal_binning = BinningParams(
0.0 - azimuthal_offset,
360.0 - azimuthal_offset,
bins=int(360.0 / azimuthal_delta),
)
q1d = np.sqrt(iq2d.qx**2 + iq2d.qy**2)
logger.notice(f"[DEBUG] I(Qx, Qy) Q range: {q1d.min()}, {q1d.max()}")
full_range_annular = bin_annular_into_q1d(iq2d, azimuthal_binning, q1d.min(), q1d.max(), BinningMethod.NOWEIGHT)
# open
debug_h5 = h5py.File(os.path.join(output_dir, "auto_wedge_fit.h5"), "w")
# define h5py string type
h5_string_type = h5py.special_dtype(vlen=bytes)
# create 3 groups for result
ring_group = debug_h5.create_group("rings")
fit_group = debug_h5.create_group("good fit")
nofit_group = debug_h5.create_group("no fit")
# write full range
group = ring_group.create_group("full range")
group.create_dataset("phi", data=full_range_annular.phi)
group.create_dataset("intensity", data=full_range_annular.intensity)
# write out for each ring
func_param_dict = dict()
for index, ring in enumerate(rings):
# add data to hdf file
group = ring_group.create_group(f"ring {index}")
group.create_dataset("phi", data=ring.phi)
group.create_dataset("intensity", data=ring.intensity)
# add fitting related information to hdf file
if peak_fit_dict[index]["error"] is None:
# good fit
group = fit_group.create_group(f"ring {index}")
for param_name in peak_fit_dict[index]:
# ignore non function parameter
if param_name in ["fit_function", "error", "used"]:
continue
# create sub dictionary if it does not exist
if param_name not in func_param_dict:
func_param_dict[param_name] = list()
# add value to dictionary from fit result dictionary
param_value, param_error = peak_fit_dict[index][param_name]
func_param_dict[param_name].append([index, param_value, param_error])
else:
# no fit
group = nofit_group.create_group(f"ring {index}")
# error message
string_data_set = group.create_dataset("error", (1,), dtype=h5_string_type)
string_data_set[0] = peak_fit_dict[index]["error"]
# fit function
if "fit_function" in peak_fit_dict[index]:
function_data_set = group.create_dataset("function", (1,), dtype=h5_string_type)
function_data_set[0] = peak_fit_dict[index]["fit_function"]
# add peak fitting result
for param_name, param_value in func_param_dict.items():
# form data set
data_set = np.array(param_value)
fit_group.create_dataset(param_name, data=data_set)
# close
debug_h5.close()
[docs]
def getWedgeSelection(
data2d,
q_min,
q_delta,
q_max,
azimuthal_delta,
peak_width=0.25,
background_width=1.5,
signal_to_noise_min=2.0,
peak_search_window_size_factor=0.6,
debug_dir="/tmp/",
auto_wedge_phi_min=0,
auto_wedge_phi_max=360,
auto_symmetric_wedges=False,
) -> List[List[Tuple[float, float]]]:
r"""
Calculate azimuthal binning ranges automatically based on finding peaks in the annular ring. The
output of this is intended to be used in :py:func:`~drtsans.iq.select_i_of_q_by_wedge`.
Parameters
==========
data2d: ~drtsans.dataobjects.Azimuthal
The 2D data object containing the azimuthal data.
q_min: float
The left bin boundary for the first Q-bin.
q_delta: float
The size of the bins in Q.
q_max: float
The left bin boundary for the last Q-bin.
azimuthal_delta: float
The size of the bins in azimuthal angle.
peak_width: float
Percent of full-width-half-max (FWHM) of the peak to define the signal to be within when
determining the final range for azimuthal binning.
background_width: float
Percent of full-width-half-max (FWHM) of the peak to define the background between peaks
to be within when determining the final range for azimuthal binning.
signal_to_noise_min: float
Minimum signal to noise ratio for the data to be considered fittable.
peak_search_window_size_factor: float
Factor of 360 / (num peaks) to construct the search range for wedge peak.
debug_dir: str
Full path of the output directory for debugging output files
auto_wedge_phi_min: float
minimum angle to find a wedge for subsequent symmetrisation. Used if symmetric_wedges is true
auto_wedge_phi_max: float
maximum angle to find a wedge for subsequent symmetrisation. Used if symmetric_wedges is true
auto_symmetric_wedges: bool
find a wedge between auto_wedge_phi_min and auto_wedge_phi_max, then symmetrize the findings to the remaining
phi angle domain
Returns
=======
list
A two-item list. The first item contains the azimuthal ranges for the two wedges
that enclose the intensity maxima. The second item contains the azimuthal ranges for the
background regions associated to the wedges.
The first item is `[(wedge1\_phimin, wedge1\_phimax), (wedge2\_phimin, wedge2\_phimax)]`,
and the second item is `[(back1\_phimin, back1\_phimax), (back2\_phimin, back2\_phimax)]]`.
"""
# Bin azimuthal
q, azimuthal_rings = _binInQAndAzimuthal(
data2d,
q_min=q_min,
q_max=q_max,
q_delta=q_delta,
azimuthal_delta=azimuthal_delta,
)
fit_results_tuple = _fitQAndAzimuthal(
azimuthal_rings,
q_bins=q,
signal_to_noise_min=signal_to_noise_min,
azimuthal_start=110.0,
maxchisq=1000.0,
peak_search_window_size_factor=peak_search_window_size_factor,
auto_wedge_phi_min=auto_wedge_phi_min,
auto_wedge_phi_max=auto_wedge_phi_max,
auto_symmetric_wedges=auto_symmetric_wedges,
)
center_vec, fwhm_vec, fit_dict = fit_results_tuple
# Export fitting result
_export_to_h5(
iq2d=data2d,
rings=azimuthal_rings,
azimuthal_delta=azimuthal_delta,
peak_fit_dict=fit_dict,
output_dir=debug_dir,
)
_plot_fit_results(azimuthal_rings, fit_dict, debug_dir, auto_symmetric_wedges)
# verify that the results didn't predict wedges larger than half of the data
if np.any(np.array(fwhm_vec) > 360.0 / 2):
values = ["{:.1f}deg".format(value) for value in fwhm_vec]
raise RuntimeError("Encountered large fwhm values: {}".format(", ".join(values)))
# convert to min and max ranges
min_vec, max_vec = [], []
# azimuthal range for the first wedge enclosing an intensity maximum
min_vec.append(center_vec[0] - peak_width * fwhm_vec[0])
max_vec.append(center_vec[0] + peak_width * fwhm_vec[0])
# azimuthal range enclosing the background for the first wedge
min_vec.append(center_vec[0] + background_width * fwhm_vec[0])
max_vec.append(center_vec[1] - background_width * fwhm_vec[1])
# azimuthal range for the second wedge enclosing an intensity maximum
min_vec.append(center_vec[1] - peak_width * fwhm_vec[1])
max_vec.append(center_vec[1] + peak_width * fwhm_vec[1])
# azimuthal range enclosing the background for the second wedge
min_vec.append(center_vec[1] + background_width * fwhm_vec[1])
max_vec.append(center_vec[0] - background_width * fwhm_vec[0])
# clean up the data to be in the form expected by select_i_of_q_by_wedge
min_vec = np.array(min_vec)
max_vec = np.array(max_vec)
min_vec[min_vec < -90.0] += 360.0
max_vec[max_vec < -90.0] += 360.0
min_vec[min_vec > 270.0] -= 360.0
max_vec[max_vec > 270.0] -= 360.0
# rearrange the ranges: first the wedges, then the backgrounds
raw_wedges = list(zip(min_vec, max_vec))
summing_wedges = []
for i in range(len(raw_wedges) // 2): # iterate over half the list
summing_wedges.append([raw_wedges[i], raw_wedges[i + 2]])
return summing_wedges
def _binInQAndAzimuthal(data, q_min, q_delta, q_max, azimuthal_delta):
"""This function bins the data in Qmod and azimuthal accoring to the supplied parameters. The maximum
azimuthal is 540deg to allow for finding a peak at/near azimuthal=0deg.
Parameters
==========
data: ~drtsans.dataobjects.IQazimuthal
q_min: float
The left bin boundary for the first Q-bin
q_delta: float
The size of the bins in Q
q_max: float
The left bin boundary for the last Q-bin
azimuthal_delta: float
The size of the bins in azimuthal
Results
=======
tuple
Histogram of ```(intensity, error, azimuthal_bins, q_bins)```
"""
# Export information for Q
data_q_vec = np.sqrt(data.qx**2 + data.qy**2)
logger.notice(f"Raw I(Q). Q range: {data_q_vec.min()}, {data_q_vec.max()}")
# the bonus two steps is to get the end-point in the array
q_bins = np.arange(q_min, q_max + q_delta, q_delta, dtype=float)
logger.notice(f"1D Q bins: {q_bins}")
# create azimuthal binning BinningParams takes number of steps
azimuthal_offset = 0.5 * azimuthal_delta
azimuthal_binning = BinningParams(
0.0 - azimuthal_offset,
360.0 - azimuthal_offset,
bins=int(360.0 / azimuthal_delta),
)
# create the I(azimuthal) for each q-ring
data_of_q_rings = []
# debugging output file
for qmin_ring, qmax_ring in zip(q_bins[:-1], q_bins[1:]):
# bin into I(azimuthal)
i1d_annular = bin_annular_into_q1d(data, azimuthal_binning, qmin_ring, qmax_ring, BinningMethod.NOWEIGHT)
# Create a copy of the arrays with the 360->540deg region repeated
# ignore - delta_mod_q wavelength
phi_new = determine_1d_linear_bins(
x_min=0.0,
x_max=540.0 + azimuthal_delta,
bins=1 + int(540.0 / azimuthal_delta),
).centers
num_orig_bins = i1d_annular.phi.size
num_repeated_bins = phi_new.size - num_orig_bins
intensity_new = np.zeros(phi_new.size)
intensity_new[:num_orig_bins] = i1d_annular.intensity
intensity_new[-1 * num_repeated_bins :] = i1d_annular.intensity[:num_repeated_bins]
error_new = np.zeros(phi_new.size)
error_new[:num_orig_bins] = i1d_annular.error
error_new[-1 * num_repeated_bins :] = i1d_annular.error[:num_repeated_bins]
i1d_annular = I1DAnnular(intensity=intensity_new, error=error_new, phi=phi_new)
# append to the list of spectra
data_of_q_rings.append(i1d_annular)
return q_bins, data_of_q_rings
def _estimatePeakParameters(intensity, azimuthal, azimuthal_start, window_half_width):
"""Estimate the peak parameters by determining a window around a bright point in the data then using
moment calculations to estimate the parameters for a Gaussian that approximates the actual peak.
This is done to aid the fitting which does better with better starting values.
Parameters
==========
intensity: numpy.ndarray
Array of intensities. This must not have nans in it.
azimuthal: numpy.ndarray
Array of azimuthal angles centers
azimuthal_start: float
Starting guess of peak center
window_half_width: float
The window used is this amount on either side of what is determined to be the peak center
Results
=======
tuple
``(intensity, center, sigma)`` where intensity is the full height including background
"""
# Look for the highest point in a section of the data. This is an iterative approach that starts with a window
# centered at `azimuthal_start`, the repeats until the maximum within the window doesn't move more than 1deg.
azimuthal_new = azimuthal_start # where to search around
# azimuthal_last = azimuthal_start # last known value
while True:
# determine new windows staying at least 90.deg inside the edges
window_min = np.max((azimuthal_new - window_half_width, azimuthal.min() + 90.0))
window_max = np.min((azimuthal_new + window_half_width, azimuthal.max() - 90.0))
# create a search window around azimuthal_new
left_index = azimuthal.searchsorted(window_min, side="right")
right_index = azimuthal.searchsorted(window_max, side="right")
# the highest value in the window
max_value = intensity[left_index:right_index].max()
# where that is in the window
max_index = np.where(intensity[left_index:right_index] == max_value)[0].max() + left_index
# update values
azimuthal_last = azimuthal_new
azimuthal_new = azimuthal[max_index]
# stop searching if the value hasn't changed by less than one degree
if abs(azimuthal_new - azimuthal_last) < 1.0:
break
# output
logger.debug(
f"[WEDGE FIT] azimuthal: {azimuthal_new}, {azimuthal_last} with left and right as {left_index}, {right_index}"
)
# now use the first two moments of the data within the window to give an improved center position (first moment)
# and width (derived from second moment)
# the position of the center of the peak is the first moment of the data. "mean" can be thought of as the
# center of mass of the peak in azimuthal angle.
mean = np.sum(intensity[left_index:right_index] * azimuthal[left_index:right_index]) / np.sum(
intensity[left_index:right_index]
)
# the fit uses sigma rather than fwhm
# calculate the second moment about the mean as an approximation to a Gaussian's "sigma" parameter
sigma = np.sum(intensity[left_index:right_index] * np.square(azimuthal[left_index:right_index] - mean)) / np.sum(
intensity[left_index:right_index]
)
sigma = np.sqrt(sigma)
return max_value, mean, sigma
def _fitSpectrum(
spectrum,
q_value,
signal_to_noise_min,
azimuthal_start,
peak_search_window_size_factor,
verbose=True,
auto_wedge_phi_min=0,
auto_wedge_phi_max=360,
auto_symmetric_wedges=False,
):
"""Extract the peak fit parameters for the data. This is done by observing where 2 maxima are in the
spectrum then fitting for the peak parameters. This makes the assumption that the two peaks are 180deg
apart.
Parameters
----------
spectrum:
q_value: float
center (of q_min and q_max) for given annular binned Q-ring
signal_to_noise_min: float
Minimum signal to noise ratio for the data to be considered "fittable"
azimuthal_start: float
First position to look for peaks around
peak_search_window_size_factor: float
Factor of 360 / (num peaks) to construct the search range for wedge peak
verbose: bool
Flag to output fitting information
auto_wedge_phi_min: float
minimum angle to find a wedge for subsequent symmetrisation. Used if symmetric_wedges is true
auto_wedge_phi_max: float
maximum angle to find a wedge for subsequent symmetrisation. Used if symmetric_wedges is true
auto_symmetric_wedges: bool
find a wedge between auto_wedge_phi_min and auto_wedge_phi_max, then symmetrize the findings to the remaining
azimuthal angle domain
Returns
-------
dict
dict[name] = (value, error) where all of the fit parameters are converted.
f0 is background, then f1...fn are the fitted peaks
"""
# define a default window size based on the number of peaks the function supports
# currently only two peaks that are approximately 180deg apart is supported
# window_factor = 0.6 # default is 0.6 about 108 degree with 2 peaks .. for strong anisotropic: 0.1
peak_search_window_size = peak_search_window_size_factor * 180
if auto_symmetric_wedges:
if auto_wedge_phi_min < 0.0:
auto_wedge_phi_min += 360.0
auto_wedge_phi_max += 360.0
azimuthal_start = (auto_wedge_phi_min + auto_wedge_phi_max) / 2.0
peak_search_window_size = (auto_wedge_phi_max - auto_wedge_phi_min) / 2.0
logger.information(
f"[WEDGE] Fixed window size = {peak_search_window_size}"
f"from factor {peak_search_window_size_factor} Number of peaks = 2"
)
# filter out the nans
mask = np.logical_not(np.isnan(spectrum.intensity))
if np.sum(mask) < 10: # do not allow fitting less points than there are parameters
raise RuntimeError("Less than 8 points being fit with 7 parameters (found {} points)".format(np.sum(mask)))
# first estimate background as minimum value
# this will be subtracted off from found intensities during estimation
background = spectrum.intensity[mask].min()
# check if there is signal to noise greater than 2
# this calculation assumes that the background is positive
signal_to_noise = np.sum(spectrum.intensity[mask]) / (float(np.sum(mask)) * background)
if signal_to_noise < signal_to_noise_min:
raise RuntimeError(
"Estimated signal to noise is smaller than {}: found {:.2f}".format(signal_to_noise_min, signal_to_noise)
)
# start of what will eventually be the fit function by specifying the background
function = ["name=FlatBackground,A0={}".format(background)]
# template for describing initial peak guess
gaussian_str = "name=Gaussian,Height={},PeakCentre={},Sigma={},constraints=(0<Height)"
# guess where one peak might be, start with a window of WINDOW_SIZE each side around 110
intensity_peak, azimuthal_first, sigma = _estimatePeakParameters(
spectrum.intensity[mask],
spectrum.phi[mask],
azimuthal_start=azimuthal_start,
window_half_width=peak_search_window_size,
)
function.append(gaussian_str.format(intensity_peak - background, azimuthal_first, sigma))
if not auto_symmetric_wedges:
# assume the other peak is 180 degrees away
azimuthal_start = azimuthal_first + 180.0
intensity_peak, azimuthal_second, sigma = _estimatePeakParameters(
spectrum.intensity[mask],
spectrum.phi[mask],
azimuthal_start=azimuthal_start,
window_half_width=peak_search_window_size,
)
function.append(gaussian_str.format(intensity_peak - background, azimuthal_second, sigma))
# create workspace version of data
# this includes the nans so `Fit` has to be told to ignore them
q_azimuthal_workspace = spectrum.to_workspace()
# fit the positions of the two suspected peaks
fit_workspace_prefix = mtd.unique_hidden_name()
fit_function = ";".join(function)
try:
if auto_symmetric_wedges:
fitresult = Fit(
Function=fit_function,
InputWorkspace=q_azimuthal_workspace,
Output=fit_workspace_prefix,
StartX=spectrum.phi.min() + auto_wedge_phi_min,
EndX=spectrum.phi.min() + auto_wedge_phi_max,
OutputParametersOnly=True,
IgnoreInvalidData=True,
)
else:
fitresult = Fit(
Function=fit_function,
InputWorkspace=q_azimuthal_workspace,
Output=fit_workspace_prefix,
StartX=spectrum.phi.min() + 90.0,
EndX=spectrum.phi.min() + 90.0 + 360.0,
OutputParametersOnly=True,
IgnoreInvalidData=True,
)
except RuntimeError as e:
raise RuntimeError("Failed to fit Q={} with fit function {}".format(q_value, fit_function)) from e
finally:
DeleteWorkspace(q_azimuthal_workspace)
if fitresult.OutputStatus != "success":
raise RuntimeError("Failed to fit Q={}".format(q_value))
# convert the table into a dict[name] = (value, error)
result = {"fit_function": fit_function}
for i in range(fitresult.OutputParameters.rowCount()):
row = fitresult.OutputParameters.row(i)
logger.debug(f"[DEBUG-TEST] row: {row} of type {type(row)}")
name = row["Name"]
if name.startswith("Cost function"):
name = "chisq"
result[name] = (row["Value"], row["Error"])
# delete fit results
for label in ["_Parameters", "_NormalisedCovarianceMatrix"]:
DeleteWorkspace(Workspace=fit_workspace_prefix + label)
# symmetrize the result
if auto_symmetric_wedges:
result["f2.Height"] = result["f1.Height"]
result["f2.PeakCentre"] = (result["f1.PeakCentre"][0] + 180.0, result["f1.PeakCentre"][1])
result["f2.Sigma"] = result["f1.Sigma"]
if verbose:
logger.notice(f"Fit result: {result}")
return result
def _toPositionAndFWHM(fitresult, peak_label, maxchisq):
"""Returns ((center, center_error), (width, width_error))
If chisq is too large or any of the errors is nan, all return values are set to nan
This also generates the weights as height / parameter_error"""
if fitresult["chisq"][0] > maxchisq:
return (np.nan, np.nan), (np.nan, np.nan)
else:
# height = fitresult[peak_label + '.Height']
center = fitresult[peak_label + ".PeakCentre"]
fwhm = tuple([value * SIGMA_TO_FWHM for value in fitresult[peak_label + ".Sigma"]])
# Anything being nan suggests that a fit failed. Set everything to nan so they do not
# contribute to the weighted average.
if np.isnan(center[1]) or np.isnan(fwhm[1]) or center[1] == 0.0 or fwhm[1] == 0.0:
return (np.nan, np.nan), (np.nan, np.nan)
# Weights for are height divided by uncertainty. This results in stronger peaks with lower fitting
# uncertainty contributing more to the parameters in azimuthal angle.
center = center[0], 1.0 / center[1]
fwhm = fwhm[0], 1.0 / fwhm[1]
return center, fwhm
def _weighted_position_and_width(peaks):
"""For a set of peaks, calculate the weighted average position and weighted average fwhm
Parameters
==========
peaks: list
[((position, weight), (fwhm, weight)), ...] Each is a peak for a single Q-value
as a function of azimuthal angle.
Results
=======
tuple
(position, fwhm)
"""
if len(peaks) <= 0:
raise RuntimeError("Encountered zero fitted peaks")
pos_accum, pos_weight_accum = 0.0, 0.0
fwhm_accum, fwhm_weight_accum = 0.0, 0.0
for peak in peaks:
# friendlier names
pos, pos_weight = peak[0] # position and weight
fwhm, fwhm_weight = peak[1] # fwhm and weight
if np.isnan(pos_weight) or np.isnan(fwhm_weight) or pos_weight <= 0.0 or fwhm_weight <= 0.0:
continue # don't use these points
pos_accum += pos * pos_weight
pos_weight_accum += pos_weight
fwhm_accum += fwhm * fwhm_weight
fwhm_weight_accum += fwhm_weight
try:
return (pos_accum / pos_weight_accum), (fwhm_accum / fwhm_weight_accum)
except ZeroDivisionError as e:
raise RuntimeError("Cannot determine fitted positions from zero weights") from e
def _fitQAndAzimuthal(
azimuthal_rings,
q_bins,
signal_to_noise_min,
azimuthal_start,
maxchisq,
peak_search_window_size_factor,
verbose=True,
auto_wedge_phi_min=0,
auto_wedge_phi_max=360,
auto_symmetric_wedges=False,
):
"""Find the peaks in the azimuthal spectra, then combine them into
two composite centers and fwhm. This is currently coded to only
look for two peaks.
Parameters
==========
intensity: numpy.ndarray
The intensity as a 2-d array of Q and azimuthal
error: numpy.ndarray
The uncertainties as a 2-d array of Q and azimuthal
azimuthal_bins: numpy.ndarray
Array of azimuthal angles bin boundaries
q_bins: numpy.ndarray
array of Q bin boundaries
signal_to_noise_min: float
Minimum signal to noise ratio for the data to be considered "fittable"
azimuthal_start: float
First position to look for peaks around
maxchisq: float
The maximum chisq value for a fit result to be used in calculating the composite peak
peak_search_window_size_factor: float
Factor of 360 / (num peaks) to construct the search range for wedge peak
verbose: bool
Flag to turn on fitting information output
Results
=======
list, list, dict
The first list is the peak centers, the second is the peak fwhm, the third is a dictionary for fit result
"""
if len(azimuthal_rings) != len(q_bins) - 1:
raise RuntimeError("must supply q-bin boundaries")
# change to centers in Q for messages
q_centers = 0.5 * (q_bins[:-1] + q_bins[1:])
# select out a single spectrum
peakResults = [[], []]
q_centers_used = []
index = -1 # ring index: set to -1 due to too many 'continue' in the loop
used_index = list()
unfit_message = ""
fitted_peaks_message = ""
fit_result_dict = dict()
for spectrum, q_center in zip(azimuthal_rings, q_centers):
# increase the index number
index += 1
# init dict
fit_result_dict[index] = dict()
try:
fitresult = _fitSpectrum(
spectrum,
q_center,
signal_to_noise_min=signal_to_noise_min,
azimuthal_start=azimuthal_start,
peak_search_window_size_factor=peak_search_window_size_factor,
verbose=verbose,
auto_wedge_phi_min=auto_wedge_phi_min,
auto_wedge_phi_max=auto_wedge_phi_max,
auto_symmetric_wedges=auto_symmetric_wedges,
)
newlyFittedPeaks = [_toPositionAndFWHM(fitresult, label, maxchisq) for label in ["f1", "f2"]]
# record the fit result
fit_result_dict[index] = fitresult
if np.isnan(newlyFittedPeaks[0][0][0]) or np.isnan(newlyFittedPeaks[1][0][0]):
error_reason = f"spectrum {index}: failed to fit peaks due to NaN in fit result\n"
unfit_message += error_reason
fit_result_dict[index]["error"] = error_reason
continue
else:
fitted_peaks_message += f"spectrum {index - 1}: Fitted peaks: {newlyFittedPeaks}\n"
for i in range(len(peakResults)):
peakResults[i].append(newlyFittedPeaks[i])
q_centers_used.append(q_center)
used_index.append(index)
except RuntimeError as e:
error_reason = "spectrum {}: Not using information from Q-slice ({}A):".format(index, q_center)
error_reason += f"Encountered runtime error: {e}\n" # don't worry about it
unfit_message += error_reason
fit_result_dict[index]["error"] = error_reason
continue
except ValueError as val_err:
# in case user specifies a range containing no Q values
error_reason = f"Spectrum {index}: unable to fit peaks due to {val_err}\n"
unfit_message += error_reason
fit_result_dict[index]["error"] = error_reason
continue
else:
# this ring is used
fit_result_dict[index]["error"] = None
logger.notice(f"Q-rings used to determine overall wedges: {q_centers_used}")
logger.information(f"used annular binning index: {used_index}")
logger.notice(unfit_message)
peakResults = [_weighted_position_and_width(peak) for peak in peakResults]
# convert into parallel arrays of centers and fwhm
center_list = []
fwhm_list = []
for center, fwhm in peakResults:
center_list.append(center)
fwhm_list.append(fwhm)
logger.notice(f"Fitted peak centers:\n{fitted_peaks_message}\n")
logger.notice(f"Summed peak centers: {center_list}\nFWHMs : {fwhm_list}")
return center_list, fwhm_list, fit_result_dict