# local imports
from drtsans.dataobjects import IQmod
# third party imports
from mantid.simpleapi import logger
import numpy as np
# standard imports
__all__ = [
"stitch_profiles",
]
[docs]
def stitch_profiles(profiles, overlaps, target_profile_index=0):
r"""
Stitch together a sequence of intensity profiles with overlapping domains, returning a single encompassing profile.
**drtsans objects used**:
~drtsans.dataobjects.IQmod
<https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans/dataobjects.py>
Parameters
----------
profiles: list
A list of ~drtsans.dataobjects.IQmod objects, ordered with increasing Q-values
overlaps: list of lists or list
The overlap regions either as: [(start_1, end_1), (start_2, end_2), (start_3, end_3), ...] or (for backwards
compatibility): [start_1, end_1, start_2, end_2, start_3, end_3, ...]
target_profile_index: int
Index of the ``profiles`` list indicating the target profile, that is, the profile defining the final scaling.
Returns
-------
~drtsans.dataobjects.IQmod
Raises
-------
ValueError
If either the arguments are incorrect ((i) profiles not in order or increasing Q or (ii) the number of overlaps
not congruent with the number of profiles or (iii) overlaps is not a list of numbers or list of
lists/tuples) or a stitching scaling factor <= 0 is calculated.
"""
# Guard clause to verify the profiles are ordered with increasing Q-values
first_q_values = np.array([profile.mod_q[0] for profile in profiles]) # collect first Q-value for each profile
if np.all(np.diff(first_q_values) > 0) is False:
raise ValueError("The profiles are not ordered with increasing Q-values")
# Check overlaps parameter and if needed convert from list of numbers to list of pairs of numbers
if not overlaps:
raise ValueError("Must provide list of overlaps")
if all(isinstance(x, (int, float)) for x in overlaps) and len(overlaps) == 2 * (len(profiles) - 1):
# Convert to new format by pairing the overlaps into (start_q, end_q) pairs
overlaps = [overlaps[i : i + 2] for i in range(0, len(overlaps), 2)]
if not all(isinstance(x, (list, tuple)) and len(x) == 2 for x in overlaps):
raise ValueError("Parameter overlaps must be either list of numbers or list of lists of numbers")
# Guard clause to validate that the number of overlap boundaries is congruent with the number of intensity profiles
if len(overlaps) != len(profiles) - 1:
raise ValueError("The number of overlaps is not appropriate to the number of intensity profiles")
def scaling(target, to_target, starting_q, ending_q):
r"""Utility function to find the scaling factor bringing the to_target profile to the target profile scaling"""
# Find the data points of the "target" profile in the overlap region
indexes_in_overlap = (target.mod_q > starting_q) & (target.mod_q < ending_q) & np.isfinite(target.intensity)
q_values_in_overlap = target.mod_q[indexes_in_overlap]
# Interpolate the "to_target" profile intensities at the previously found Q values
good_values = np.isfinite(to_target.intensity)
to_target_interpolated = np.interp(
q_values_in_overlap,
to_target.mod_q[good_values],
to_target.intensity[good_values],
)
scale = sum(target_profile.intensity[indexes_in_overlap]) / sum(to_target_interpolated)
if scale <= 0:
raise ValueError(
f"Scale number: {scale}. The scaling number for stitching cannot be negative. "
+ "Please check the stitching range or profile pattern"
)
else:
return scale
# We begin stitching to the target profile the neighboring profile with lower Q-values, then proceed until we
# run out of profiles with lower Q-values than the target profile
target_profile = profiles[target_profile_index]
current_index = target_profile_index - 1
while current_index >= 0:
to_target_profile = profiles[current_index]
start_q, end_q = overlaps[current_index]
# Rescale the "to_target" profile to match the scaling of the target profile
scale = scaling(target_profile, to_target_profile, start_q, end_q)
to_target_profile = to_target_profile * scale
print(f"Stitching profile {current_index} to profile {target_profile_index}. Scale factor is {scale:.3e}")
# Discard extrema points
to_target_profile = to_target_profile.extract(to_target_profile.mod_q < end_q) # keep data with Q < end_q
target_profile = target_profile.extract(target_profile.mod_q > start_q) # keep data with Q > start_q
# Stitch by concatenation followed by sorting, save the result into a new target profile
target_profile = to_target_profile.concatenate(target_profile) # just put one profile after the other
target_profile = target_profile.sort() # sort data points by increasing Q
# Move to the next to-target profile
current_index = current_index - 1
# We continue stitching to the target profile the neighboring profile with higher Q-values, then proceed until we
# run out of profiles with higher Q-values than the target profile
current_index = target_profile_index + 1
while current_index < len(profiles):
to_target_profile = profiles[current_index]
start_q, end_q = overlaps[current_index - 1]
# Rescale the "to_target" profile to match the scaling of the target profile
scale = scaling(target_profile, to_target_profile, start_q, end_q)
to_target_profile = to_target_profile * scale
print(f"Stitching profile {current_index} to profile {target_profile_index}. Scale factor is {scale:.3e}")
# Discard extrema points
to_target_profile = to_target_profile.extract(to_target_profile.mod_q > start_q) # keep data with Q < end_q
target_profile = target_profile.extract(target_profile.mod_q < end_q) # keep data with Q > start_q
# Stitch by concatenation followed by sorting, save the result into a new target profile
target_profile = target_profile.concatenate(to_target_profile) # just put one profile after the other
target_profile = target_profile.sort() # sort data points by increasing Q
# Move to the next to-target profile
current_index = current_index + 1
return target_profile
def get_stitch_boundaries(reduction_config, iq1d_unbinned, anisotropic=False):
r"""Initialize the stitching boundaries when a list of boundaries has not been specified
Parameters
----------
reduction_config: dict
The dictionary of reduction parameters
iq1d_unbinned: list of :py:obj:`~drtsans.dataobjects.IQmod`
The unbinned IQmod profiles, one for each detector.
anisotropic: bool
If True, two binnings exist and the applicable boundaries are "wedge1overlapStitchMin",
"wedge1overlapStitchMax", "wedge2overlapStitchMin", and "wedge2overlapStitchMax".
If False, one binning exists and the applicable boundaries are "overlapStitchMin" and "overlapStitchMax".
Returns
-------
list
A list of lists of boundary values, for example for anisotropic=True:
[
[(start_overlap1, end_overlap1), (start_overlap2, end_overlap2)], # wedge 1
[(start_overlap1, end_overlap1), (start_overlap2, end_overlap2)] # wedge 2
]
For anisotropic=False:
[
[(start_overlap1, end_overlap1), (start_overlap2, end_overlap2)]
]
"""
bounds = []
def profile_boundaries(qmin_values, qmax_values):
r"""
Parameters
----------
qmin_values: None or list
A list of Qmin values to use, one per stitching overlap region. For py:obj:`None`, uses the min value of
the higher-Q profile of each pair of profiles to stitch.
qmax_values: None or list
A list of Qmax values to use, one per stitching overlap region. For py:obj:`None`, uses the max value of
the higher-Q profile of each pair of profiles to stitch.
Returns
-------
list
A list of boundary values [(start, end), (start,end), ...]
"""
if not qmin_values or not qmax_values: # catches None or empty list
# use min/max from the Q range for the higher Q profile, e.g. for [p1, p2, p3]
# stitching boundaries for [p1, p2] will come from min(p2), max(p2)
# stitching boundaries for [p2, p3] will come from min(p3), max(p3)
qmin_values = [x.mod_q.min() for x in iq1d_unbinned[1:]]
qmax_values = [x.mod_q.max() for x in iq1d_unbinned[1:]]
logger.notice("Stitch Qmin/max is None. Getting stitch boundaries from min(I(Q)) and max(I(Q))")
profile_bounds = []
for qmin, qmax in zip(qmin_values, qmax_values):
profile_bounds.append((qmin, qmax))
return profile_bounds
if not anisotropic:
qmin_values = reduction_config["overlapStitchQmin"]
qmax_values = reduction_config["overlapStitchQmax"]
bounds.append(profile_boundaries(qmin_values, qmax_values))
else:
for wedge in ["wedge1", "wedge2"]:
qmin_values = reduction_config[f"{wedge}overlapStitchQmin"]
qmax_values = reduction_config[f"{wedge}overlapStitchQmax"]
bounds.append(profile_boundaries(qmin_values, qmax_values))
return bounds
def stitch_binned_profiles(iq1d_unbinned, iq1d_binned, reduction_config):
"""Stitch together sequences of intensity profiles from different detector panels (main, wing and midrange),
that cover a different range of Q-values, returning a single encompassing profile per sequence.
When "1DQbinType" is "scalar" or "annular", there is one sequence of intensity profiles, i.e. one intensity profile
binning per detector panel, and one combined intensity profile is returned.
When "1DQbinType" is "wedge", there are two sequences of intensity profiles, i.e. two intensity profile binnings
per detector panel, and two combined intensity profiles are returned.
Parameters
----------
iq1d_unbinned: list
A list of ~drtsans.dataobjects.IQmod objects for different detectors. The intensity profiles to stitch together
ordered by increasing Q-values.
iq1d_binned: list of lists of ~drtsans.dataobjects.IQmod
A list of lists of ~drtsans.dataobjects.IQmod objects. The outer list is the list of detectors ordered by
increasing Q-values. The inner lists are for different binnings of the original data from the detector.
Example: [[IQ_main_wedge1, IQ_main_wedge2], [IQ_wing_wedge1, IQ_wing_wedge2]]
reduction_config: dict
The dictionary of reduction parameters.
Returns
-------
list of ~drtsans.dataobjects.IQmod objects
The list of combined (stitched) profiles. When "1DQbinType" is "scalar" or "annular", the list contains one
combined intensity profile. When "1DQbinType" is "wedge", the list contains two combined intensity profiles,
one per wedge.
"""
iq1d_combined_out = []
boundaries = get_stitch_boundaries(reduction_config, iq1d_unbinned, anisotropic=len(iq1d_binned[0]) > 1)
iq1d_binned_main = iq1d_binned[0]
for ibinning in range(len(iq1d_binned_main)):
profiles = [detector_profiles[ibinning] for detector_profiles in iq1d_binned]
overlaps = boundaries[ibinning]
# do stitching
try:
iq1d_combined = stitch_profiles(
profiles=profiles,
overlaps=overlaps,
target_profile_index=0,
)
except ValueError:
iq1d_combined = IQmod(intensity=[], error=[], mod_q=[], delta_mod_q=[])
iq1d_combined_out.append(iq1d_combined)
return iq1d_combined_out