Source code for drtsans.auto_wedge

import numpy as np
import os
from typing import List, Tuple
from drtsans.dataobjects import IQmod
from drtsans.determine_bins import determine_1d_linear_bins
from drtsans.iq import BinningMethod, BinningParams, bin_annular_into_q1d

# https://docs.mantidproject.org/nightly/algorithms/DeleteWorkspace-v1.html
from mantid.simpleapi import DeleteWorkspace, logger, Gaussian, FlatBackground, mtd

# https://docs.mantidproject.org/nightly/algorithms/Fit-v1.html
from mantid.simpleapi import Fit
import h5py
from matplotlib import pyplot as plt

__all__ = ["getWedgeSelection"]

# factor to convert from a sigma to full width half max
# https://docs.mantidproject.org/nightly/fitting/fitfunctions/Gaussian.html
SIGMA_TO_FWHM = 2.0 * np.sqrt(2.0 * np.log(2.0))

MANTID_FUNCTION_MAP = {Gaussian().name: Gaussian, FlatBackground().name: FlatBackground}


def _create_fit_function(combo_function_str):
    """Create fit function from a Mantid combo function string

    An example:
    name=FlatBackground,A0=5894.8246694221725;name=Gaussian,Height=16626.818784787953,
    PeakCentre=100.43905821664455,Sigma=5.917594016360494;name=Gaussian,Height=18517.052761733517,
    PeakCentre=304.6715713990027,Sigma=36.943377164429435

    Parameters
    ----------
    combo_function_str: str
        function string

    Returns
    -------
    ~list
        list of Mantid IFunction in order of

    """
    # split to multiple functions
    single_function_str_list = combo_function_str.split(";")
    single_functions = list()

    # convert to function
    for single_func in single_function_str_list:
        # split with ','
        params = single_func.split(",")
        # create function
        func_name = params[0].split("=")[1]
        func_i = MANTID_FUNCTION_MAP[func_name]()
        # start to set parameters
        for value_index in range(1, len(params)):
            # split in PARAM_NAME=PARAM_VALUE style
            param_list = params[value_index].split("=")
            param_name_i = param_list[0]
            param_value_i = float(param_list[1])
            func_i.__setattr__(param_name_i, param_value_i)
        # END-FOR
        single_functions.append(func_i)

    return single_functions


def _set_function_param_value(functions, param_name, param_value):
    """set function parameter value from the result of a combo function

    Parameters
    ----------
    functions: ~list
        list of functions
    param_name: str
        function parameter name may have 2 forms: PARAM_NAME or fI.PARAM_NAME, where I is an integer
    param_value: float
        parameter value

    Returns
    -------
    None

    """
    # Check parameter name
    if "." in param_name:
        terms = param_name.split(".")
        function_index = int(terms[0].split("f")[-1])
        param_name = terms[1]
    else:
        function_index = 0

    # set
    functions[function_index].__setattr__(param_name, param_value)


def _calculate_function(fit_functions, vec_x):
    """Calculate a function (set)

    Parameters
    ----------
    fit_functions: ~list
        List of fit function
    vec_x: numpy.ndarray
        1D array for X values

    Returns
    -------
    numpy.ndarray
        1D array for calculated function values

    """
    # Create the combo function from a list of functions
    combo_function = fit_functions[0]
    for f_i in range(1, len(fit_functions)):
        combo_function += fit_functions[f_i]

    # Call the create
    vec_y = combo_function.__call__(vec_x)

    return vec_y


def _plot_fit_results(rings, peak_fit_dict, output_dir):
    """Plot original data and fit result

    Parameters
    ----------
    rings:  ~list
        List of I(Qx, Qy) in rings
    peak_fit_dict: ~dict
        dictionary containing all the peak fitting result
    output_dir: str
        full path of the output directory

    Returns
    -------

    """
    for index, ring in enumerate(rings):
        # add fitting related information to hdf file
        if peak_fit_dict[index]["error"] is not None:
            # bad fit
            continue

        # construct function from estimated
        fit_function_str = peak_fit_dict[index]["fit_function"]
        fit_function_set = _create_fit_function(fit_function_str)
        # calculate estimated peaks
        estimated_y = _calculate_function(fit_function_set, ring.mod_q)

        for param_name in peak_fit_dict[index]:
            if param_name not in ["fit_function", "error", "used"]:
                # parameter value is recorded as tuple as value and error
                _set_function_param_value(fit_function_set, param_name, peak_fit_dict[index][param_name][0])

        # calculate
        model_y = _calculate_function(fit_function_set, ring.mod_q)
        plt.cla()
        plt.plot(ring.mod_q, ring.intensity, label="observed", color="black")
        plt.plot(ring.mod_q, model_y, label="fitted", color="red")
        plt.plot(ring.mod_q, estimated_y, label="estimated", color="blue")
        plt.savefig(os.path.join(output_dir, f"ring_{index:01}.png"))


def _export_to_h5(iq2d, rings, azimuthal_delta, peak_fit_dict, output_dir):
    """write annular binned I(Qx, Qy) and intensity rings to hdf5 for further review

    Parameters
    ----------
    iq2d:  ~drtsans.dataobjects.Azimuthal
    rings: ~list
        List of I(Qx, Qy) in rings
    peak_fit_dict: ~dict
        dictionary containing all the peak fitting result
    output_dir: str
        full path of the output directory

    Returns
    -------

    """
    # annular binning on the full range
    azimuthal_offset = 0.5 * azimuthal_delta
    azimuthal_binning = BinningParams(
        0.0 - azimuthal_offset,
        360.0 - azimuthal_offset,
        bins=int(360.0 / azimuthal_delta),
    )
    q1d = np.sqrt(iq2d.qx**2 + iq2d.qy**2)
    logger.notice(f"[DEBUG] I(Qx, Qy) Q range: {q1d.min()}, {q1d.max()}")
    full_range_annular = bin_annular_into_q1d(iq2d, azimuthal_binning, q1d.min(), q1d.max(), BinningMethod.NOWEIGHT)

    # open
    debug_h5 = h5py.File(os.path.join(output_dir, "auto_wedge_fit.h5"), "w")
    # define h5py string type
    h5_string_type = h5py.special_dtype(vlen=bytes)

    # create 3 groups for result
    ring_group = debug_h5.create_group("rings")
    fit_group = debug_h5.create_group("good fit")
    nofit_group = debug_h5.create_group("no fit")

    # write full range
    group = ring_group.create_group("full range")
    group.create_dataset("q", data=full_range_annular.mod_q)
    group.create_dataset("intensity", data=full_range_annular.intensity)

    # write out for each ring
    func_param_dict = dict()

    for index, ring in enumerate(rings):
        # add data to hdf file
        group = ring_group.create_group(f"ring {index}")
        group.create_dataset("q", data=ring.mod_q)
        group.create_dataset("intensity", data=ring.intensity)

        # add fitting related information to hdf file
        if peak_fit_dict[index]["error"] is None:
            # good fit
            group = fit_group.create_group(f"ring {index}")
            for param_name in peak_fit_dict[index]:
                # ignore non function parameter
                if param_name in ["fit_function", "error", "used"]:
                    continue

                # create sub dictionary if it does not exist
                if param_name not in func_param_dict:
                    func_param_dict[param_name] = list()

                # add value to dictionary from fit result dictionary
                param_value, param_error = peak_fit_dict[index][param_name]
                func_param_dict[param_name].append([index, param_value, param_error])
        else:
            # no fit
            group = nofit_group.create_group(f"ring {index}")
            # error message
            string_data_set = group.create_dataset("error", (1,), dtype=h5_string_type)
            string_data_set[0] = peak_fit_dict[index]["error"]

        # fit function
        if "fit_function" in peak_fit_dict[index]:
            function_data_set = group.create_dataset("function", (1,), dtype=h5_string_type)
            function_data_set[0] = peak_fit_dict[index]["fit_function"]

    # add peak fitting result
    for param_name in func_param_dict:
        # form data set
        data_set = np.array(func_param_dict[param_name])
        fit_group.create_dataset(param_name, data=data_set)

    # close
    debug_h5.close()


[docs] def getWedgeSelection( data2d, q_min, q_delta, q_max, azimuthal_delta, peak_width=0.25, background_width=1.5, signal_to_noise_min=2.0, peak_search_window_size_factor=0.6, debug_dir="/tmp/", ) -> List[List[Tuple[float, float]]]: """ Calculate azimuthal binning ranges automatically based on finding peaks in the annular ring. The output of this is intended to be used in :py:func:`~drtsans.iq.select_i_of_q_by_wedge`. Parameters ========== data2d: ~drtsans.dataobjects.Azimuthal q_min: float The left bin boundary for the first Q-bin q_delta: float The size of the bins in Q q_max: float The left bin boundary for the last Q-bin azimuthal_delta: float The size of the bins in azimuthal angle peak_width: float Percent of full-width-half-max (FWHM) of the peak to define the signal to be within when determining the final range for azimuthal binning. background_width: float Percent of full-width-half-max (FWHM) of the peak to define the background between peaks to be within when determining the final range for azimuthal binning. signal_to_noise_min: float Minimum signal to noise ratio for the data to be considered "fittable" peak_search_window_size_factor: float Factor of 360 / (num peaks) to construct the search range for wedge peak debug_dir: str Full path of the output directory for debugging output files Results ======= ~list list containing 2 lists each contains 2 2-tuples as ``[[(peak1_min, peak1_max), (peak2_min, peak2_max)], [(..., ...), (..., ...)]]`` """ # Bin azimuthal q, azimuthal_rings = _binInQAndAzimuthal( data2d, q_min=q_min, q_max=q_max, q_delta=q_delta, azimuthal_delta=azimuthal_delta, ) fit_results_tuple = _fitQAndAzimuthal( azimuthal_rings, q_bins=q, signal_to_noise_min=signal_to_noise_min, azimuthal_start=110.0, maxchisq=1000.0, peak_search_window_size_factor=peak_search_window_size_factor, ) center_vec, fwhm_vec, fit_dict = fit_results_tuple # Export fitting result _export_to_h5( iq2d=data2d, rings=azimuthal_rings, azimuthal_delta=azimuthal_delta, peak_fit_dict=fit_dict, output_dir=debug_dir, ) _plot_fit_results(azimuthal_rings, fit_dict, debug_dir) # verify that the results didn't predict wedges larger than half of the data if np.any(np.array(fwhm_vec) > 360.0 / 2): values = ["{:.1f}deg".format(value) for value in fwhm_vec] raise RuntimeError("Encountered large fwhm values: {}".format(", ".join(values))) # convert to min and max ranges min_vec, max_vec = [], [] min_vec.append(center_vec[0] - peak_width * fwhm_vec[0]) max_vec.append(center_vec[0] + peak_width * fwhm_vec[0]) min_vec.append(center_vec[0] + background_width * fwhm_vec[0]) max_vec.append(center_vec[1] - background_width * fwhm_vec[1]) min_vec.append(center_vec[1] - peak_width * fwhm_vec[1]) max_vec.append(center_vec[1] + peak_width * fwhm_vec[1]) min_vec.append(center_vec[1] + background_width * fwhm_vec[1]) max_vec.append(center_vec[0] - background_width * fwhm_vec[0]) # clean up the data to be in the form expected by select_i_of_q_by_wedge min_vec = np.array(min_vec) max_vec = np.array(max_vec) min_vec[min_vec < -90.0] += 360.0 max_vec[max_vec < -90.0] += 360.0 min_vec[min_vec > 270.0] -= 360.0 max_vec[max_vec > 270.0] -= 360.0 # put wedges on opposite sides together raw_wedges = list(zip(min_vec, max_vec)) summing_wedges = [] for i in range(len(raw_wedges) // 2): # iterate over half the list summing_wedges.append([raw_wedges[i], raw_wedges[i + 2]]) return summing_wedges
def _binInQAndAzimuthal(data, q_min, q_delta, q_max, azimuthal_delta): """This function bins the data in Qmod and azimuthal accoring to the supplied parameters. The maximum azimuthal is 540deg to allow for finding a peak at/near azimuthal=0deg. Parameters ========== data: ~drtsans.dataobjects.IQazimuthal q_min: float The left bin boundary for the first Q-bin q_delta: float The size of the bins in Q q_max: float The left bin boundary for the last Q-bin azimuthal_delta: float The size of the bins in azimuthal Results ======= tuple Histogram of ```(intensity, error, azimuthal_bins, q_bins)``` """ # Export information for Q data_q_vec = np.sqrt(data.qx**2 + data.qy**2) logger.notice(f"Raw I(Q). Q range: {data_q_vec.min()}, {data_q_vec.max()}") # the bonus two steps is to get the end-point in the array q_bins = np.arange(q_min, q_max + q_delta, q_delta, dtype=float) logger.notice(f"1D Q bins: {q_bins}") # create azimuthal binning BinningParams takes number of steps azimuthal_offset = 0.5 * azimuthal_delta azimuthal_binning = BinningParams( 0.0 - azimuthal_offset, 360.0 - azimuthal_offset, bins=int(360.0 / azimuthal_delta), ) # create the I(azimuthal) for each q-ring data_of_q_rings = [] # debugging output file for qmin_ring, qmax_ring in zip(q_bins[:-1], q_bins[1:]): # bin into I(azimuthal) I_azimuthal = bin_annular_into_q1d(data, azimuthal_binning, qmin_ring, qmax_ring, BinningMethod.NOWEIGHT) # Create a copy of the arrays with the 360->540deg region repeated # ignore - delta_mod_q wavelength mod_q_new = determine_1d_linear_bins( x_min=0.0, x_max=540.0 + azimuthal_delta, bins=1 + int(540.0 / azimuthal_delta), ).centers num_orig_bins = I_azimuthal.mod_q.size num_repeated_bins = mod_q_new.size - num_orig_bins intensity_new = np.zeros(mod_q_new.size) intensity_new[:num_orig_bins] = I_azimuthal.intensity intensity_new[-1 * num_repeated_bins :] = I_azimuthal.intensity[:num_repeated_bins] error_new = np.zeros(mod_q_new.size) error_new[:num_orig_bins] = I_azimuthal.error error_new[-1 * num_repeated_bins :] = I_azimuthal.error[:num_repeated_bins] I_azimuthal = IQmod(intensity=intensity_new, error=error_new, mod_q=mod_q_new) # append to the list of spectra data_of_q_rings.append(I_azimuthal) return q_bins, data_of_q_rings def _estimatePeakParameters(intensity, azimuthal, azimuthal_start, window_half_width): """Estimate the peak parameters by determining a window around a bright point in the data then using moment calculations to estimate the parameters for a Gaussian that approximates the actual peak. This is done to aid the fitting which does better with better starting values. Parameters ========== intensity: numpy.ndarray Array of intensities. This must not have nans in it. azimuthal: numpy.ndarray Array of azimuthal angles centers azimuthal_start: float Starting guess of peak center window_half_width: float The window used is this amount on either side of what is determined to be the peak center Results ======= tuple ``(intensity, center, sigma)`` where intensity is the full height including background """ # Look for the highest point in a section of the data. This is an iterative approach that starts with a window # centered at `azimuthal_start`, the repeats until the maximum within the window doesn't move more than 1deg. azimuthal_new = azimuthal_start # where to search around # azimuthal_last = azimuthal_start # last known value while True: # determine new windows staying at least 90.deg inside the edges window_min = np.max((azimuthal_new - window_half_width, azimuthal.min() + 90.0)) window_max = np.min((azimuthal_new + window_half_width, azimuthal.max() - 90.0)) # create a search window around azimuthal_new left_index = azimuthal.searchsorted(window_min, side="right") right_index = azimuthal.searchsorted(window_max, side="right") # the highest value in the window max_value = intensity[left_index:right_index].max() # where that is in the window max_index = np.where(intensity[left_index:right_index] == max_value)[0].max() + left_index # update values azimuthal_last = azimuthal_new azimuthal_new = azimuthal[max_index] # stop searching if the value hasn't changed by less than one degree if abs(azimuthal_new - azimuthal_last) < 1.0: break # output print( f"[WEDGE FIT] azimuthal: {azimuthal_new}, {azimuthal_last} with " f"left and right as {left_index}, {right_index}" ) # now use the first two moments of the data within the window to give an improved center position (first moment) # and width (derived from second moment) # the position of the center of the peak is the first moment of the data. "mean" can be thought of as the # center of mass of the peak in azimuthal angle. mean = np.sum(intensity[left_index:right_index] * azimuthal[left_index:right_index]) / np.sum( intensity[left_index:right_index] ) # the fit uses sigma rather than fwhm # calculate the second moment about the mean as an approximation to a Gaussian's "sigma" parameter sigma = np.sum(intensity[left_index:right_index] * np.square(azimuthal[left_index:right_index] - mean)) / np.sum( intensity[left_index:right_index] ) sigma = np.sqrt(sigma) return max_value, mean, sigma def _fitSpectrum( spectrum, q_value, signal_to_noise_min, azimuthal_start, peak_search_window_size_factor, verbose=True, ): """Extract the peak fit parameters for the data. This is done by observing where 2 maxima are in the spectrum then fitting for the peak parameters. This makes the assumption that the two peaks are 180deg apart. Parameters ---------- spectrum: q_value: float center (of q_min and q_max) for given annular binned Q-ring signal_to_noise_min: float Minimum signal to noise ratio for the data to be considered "fittable" azimuthal_start: float First position to look for peaks around peak_search_window_size_factor: float Factor of 360 / (num peaks) to construct the search range for wedge peak verbose: bool Flag to output fitting information Returns ------- dict dict[name] = (value, error) where all of the fit parameters are converted. f0 is background, then f1...fn are the fitted peaks """ # define a default window size based on the number of peaks the function supports # currently only two peaks that are approximately 180deg apart is supported NUM_PEAK = 2 # window_factor = 0.6 # default is 0.6 about 108 degree with 2 peaks .. for strong anisotropic: 0.1 peak_search_window_size = peak_search_window_size_factor * (360.0 / NUM_PEAK) print( f"[WEDGE] Fixed window size = {peak_search_window_size}" f"from factor {peak_search_window_size_factor} Number of peaks = {NUM_PEAK}" ) # filter out the nans mask = np.logical_not(np.isnan(spectrum.intensity)) if np.sum(mask) < 10: # do not allow fitting less points than there are parameters raise RuntimeError("Less than 8 points being fit with 7 parameters (found {} points)".format(np.sum(mask))) # first estimate background as minimum value # this will be subtracted off from found intensities during estimation background = spectrum.intensity[mask].min() # check if there is signal to noise greater than 2 # this calculation assumes that the background is positive signal_to_noise = np.sum(spectrum.intensity[mask]) / (float(np.sum(mask)) * background) if signal_to_noise < signal_to_noise_min: raise RuntimeError( "Estimated signal to noise is smaller than {}: found {:.2f}".format(signal_to_noise_min, signal_to_noise) ) # start of what will eventually be the fit function by specifying the background function = ["name=FlatBackground,A0={}".format(background)] # template for describing initial peak guess gaussian_str = "name=Gaussian,Height={},PeakCentre={},Sigma={}" # guess where one peak might be, start with a window of WINDOW_SIZE each side around 110 intensity_peak, azimuthal_first, sigma = _estimatePeakParameters( spectrum.intensity[mask], spectrum.mod_q[mask], azimuthal_start=azimuthal_start, window_half_width=peak_search_window_size, ) function.append(gaussian_str.format(intensity_peak - background, azimuthal_first, sigma)) for peak_index in range(1, NUM_PEAK): # assume the other peak is 360 / NUM_PEAK degrees away azimuthal_start = azimuthal_first + (360.0 / NUM_PEAK * peak_index) intensity_peak, azimuthal_second, sigma = _estimatePeakParameters( spectrum.intensity[mask], spectrum.mod_q[mask], azimuthal_start=azimuthal_start, window_half_width=peak_search_window_size, ) function.append(gaussian_str.format(intensity_peak - background, azimuthal_second, sigma)) # create workspace version of data # this includes the nans so `Fit` has to be told to ignore them q_azimuthal_workspace = spectrum.to_workspace() # fit the positions of the two suspected peaks fit_workspace_prefix = mtd.unique_hidden_name() fit_function = ";".join(function) try: fitresult = Fit( Function=";".join(function), InputWorkspace=q_azimuthal_workspace, Output=fit_workspace_prefix, StartX=spectrum.mod_q.min() + 90.0, EndX=spectrum.mod_q.min() + 90.0 + 360.0, OutputParametersOnly=True, IgnoreInvalidData=True, ) except RuntimeError as e: raise RuntimeError("Failed to fit Q={} with fit function {}".format(q_value, fit_function)) from e finally: DeleteWorkspace(q_azimuthal_workspace) if fitresult.OutputStatus != "success": raise RuntimeError("Failed to fit Q={}".format(q_value)) # convert the table into a dict[name] = (value, error) result = {"fit_function": fit_function} for i in range(fitresult.OutputParameters.rowCount()): row = fitresult.OutputParameters.row(i) print(f"[DEBUG-TEST] row: {row} of type {type(row)}") name = row["Name"] if name.startswith("Cost function"): name = "chisq" result[name] = (row["Value"], row["Error"]) # delete fit results for label in ["_Parameters", "_NormalisedCovarianceMatrix"]: DeleteWorkspace(Workspace=fit_workspace_prefix + label) if verbose: print(f"Fit result: {result}") return result def _toPositionAndFWHM(fitresult, peak_label, maxchisq): """Returns ((center, center_error), (width, width_error)) If chisq is too large or any of the errors is nan, all return values are set to nan This also generates the weights as height / parameter_error""" if fitresult["chisq"][0] > maxchisq: return (np.nan, np.nan), (np.nan, np.nan) else: # height = fitresult[peak_label + '.Height'] center = fitresult[peak_label + ".PeakCentre"] fwhm = tuple([value * SIGMA_TO_FWHM for value in fitresult[peak_label + ".Sigma"]]) # Anything being nan suggests that a fit failed. Set everything to nan so they do not # contribute to the weighted average. if np.isnan(center[1]) or np.isnan(fwhm[1]) or center[1] == 0.0 or fwhm[1] == 0.0: return (np.nan, np.nan), (np.nan, np.nan) # Weights for are height divided by uncertainty. This results in stronger peaks with lower fitting # uncertainty contributing more to the parameters in azimuthal angle. center = center[0], 1.0 / center[1] fwhm = fwhm[0], 1.0 / fwhm[1] return center, fwhm def _weighted_position_and_width(peaks): """For a set of peaks, calculate the weighted average position and weighted average fwhm Parameters ========== peaks: list [((position, weight), (fwhm, weight)), ...] Each is a peak for a single Q-value as a function of azimuthal angle. Results ======= tuple (position, fwhm) """ if len(peaks) <= 0: raise RuntimeError("Encountered zero fitted peaks") pos_accum, pos_weight_accum = 0.0, 0.0 fwhm_accum, fwhm_weight_accum = 0.0, 0.0 for peak in peaks: # friendlier names pos, pos_weight = peak[0] # position and weight fwhm, fwhm_weight = peak[1] # fwhm and weight if np.isnan(pos_weight) or np.isnan(fwhm_weight) or pos_weight <= 0.0 or fwhm_weight <= 0.0: continue # don't use these points pos_accum += pos * pos_weight pos_weight_accum += pos_weight fwhm_accum += fwhm * fwhm_weight fwhm_weight_accum += fwhm_weight try: return (pos_accum / pos_weight_accum), (fwhm_accum / fwhm_weight_accum) except ZeroDivisionError as e: raise RuntimeError("Cannot determine fitted positions from zero weights") from e def _fitQAndAzimuthal( azimuthal_rings, q_bins, signal_to_noise_min, azimuthal_start, maxchisq, peak_search_window_size_factor, verbose=True, ): """Find the peaks in the azimuthal spectra, then combine them into two composite centers and fwhm. This is currently coded to only look for two peaks. Parameters ========== intensity: numpy.ndarray The intensity as a 2-d array of Q and azimuthal error: numpy.ndarray The uncertainties as a 2-d array of Q and azimuthal azimuthal_bins: numpy.ndarray Array of azimuthal angles bin boundaries q_bins: numpy.ndarray array of Q bin boundaries signal_to_noise_min: float Minimum signal to noise ratio for the data to be considered "fittable" azimuthal_start: float First position to look for peaks around maxchisq: float The maximum chisq value for a fit result to be used in calculating the composite peak peak_search_window_size_factor: float Factor of 360 / (num peaks) to construct the search range for wedge peak verbose: bool Flag to turn on fitting information output Results ======= list, list, dict The first list is the peak centers, the second is the peak fwhm, the third is a dictionary for fit result """ if len(azimuthal_rings) != len(q_bins) - 1: raise RuntimeError("must supply q-bin boundaries") # change to centers in Q for messages q_centers = 0.5 * (q_bins[:-1] + q_bins[1:]) # select out a single spectrum peakResults = [[], []] q_centers_used = [] index = -1 # ring index: set to -1 due to too many 'continue' in the loop used_index = list() unfit_message = "" fitted_peaks_message = "" fit_result_dict = dict() for spectrum, q_center in zip(azimuthal_rings, q_centers): # increase the index number index += 1 # init dict fit_result_dict[index] = dict() try: fitresult = _fitSpectrum( spectrum, q_center, signal_to_noise_min=signal_to_noise_min, azimuthal_start=azimuthal_start, peak_search_window_size_factor=peak_search_window_size_factor, verbose=verbose, ) newlyFittedPeaks = [_toPositionAndFWHM(fitresult, label, maxchisq) for label in ["f1", "f2"]] # record the fit result fit_result_dict[index] = fitresult if np.isnan(newlyFittedPeaks[0][0][0]) or np.isnan(newlyFittedPeaks[1][0][0]): error_reason = f"spectrum {index}: failed to fit peaks due to NaN in fit result\n" unfit_message += error_reason fit_result_dict[index]["error"] = error_reason continue else: fitted_peaks_message += f"spectrum {index-1}: Fitted peaks: {newlyFittedPeaks}\n" for i in range(len(peakResults)): peakResults[i].append(newlyFittedPeaks[i]) q_centers_used.append(q_center) used_index.append(index) except RuntimeError as e: error_reason = "spectrum {}: Not using information from Q-slice ({}A):".format(index, q_center) error_reason += f"Encountered runtime error: {e}\n" # don't worry about it unfit_message += error_reason fit_result_dict[index]["error"] = error_reason continue except ValueError as val_err: # in case user specifies a range containing no Q values error_reason = f"Spectrum {index}: unable to fit peaks due to {val_err}\n" unfit_message += error_reason fit_result_dict[index]["error"] = error_reason continue else: # this ring is used fit_result_dict[index]["error"] = None logger.notice(f"Q-rings used to determine overall wedges: {q_centers_used}") logger.information(f"used annular binning index: {used_index}") logger.notice(unfit_message) peakResults = [_weighted_position_and_width(peak) for peak in peakResults] # convert into parallel arrays of centers and fwhm center_list = [] fwhm_list = [] for center, fwhm in peakResults: center_list.append(center) fwhm_list.append(fwhm) logger.notice(f"Fitted peak centers:\n{fitted_peaks_message}\n") logger.notice(f"Summed peak centers: {center_list}\nFWHMs : {fwhm_list}") return center_list, fwhm_list, fit_result_dict