from collections import namedtuple
from collections.abc import Iterable
import h5py
from enum import Enum
import numpy as np
from typing import Union
# https://docs.mantidproject.org/nightly/algorithms/CreateWorkspace-v1.html
from mantid.simpleapi import mtd, CreateWorkspace
__all__ = [
"getDataType",
"DataType",
"IQmod",
"IQazimuthal",
"IQcrystal",
"I1DAnnular",
"save_i1d",
"verify_same_q_bins",
]
QBINS_EQUAL_DECIMALS = 8
[docs]
class DataType(Enum):
WORKSPACE2D = "Workspace2D"
IQ_MOD = "IQmod"
IQ_AZIMUTHAL = "IQazimuthal"
IQ_CRYSTAL = "IQcrystal"
I_ANNULAR = "I1DAnnular"
class HeaderType(Enum):
MANTID_ASCII = "MantidAscii"
PANDAS = "Pandas"
[docs]
def getDataType(obj):
try:
return DataType(obj.id())
except AttributeError:
name = str(obj)
if name not in mtd:
raise ValueError("Do not know how to get id from: {}".format(obj))
return DataType(mtd[name].id())
def _check_parallel(*args):
"""This makes sure that all input arrays are parallel to each
other. It assumes that the inputs are ndarrays."""
shape = args[0].shape
for arg in args[1:]:
if arg.shape != shape:
raise TypeError("Shape mismatch ({} != {})".format(shape, arg.shape))
def _nary_operation(iq_objects, operation, unpack=True, **kwargs):
r"""
Carry out an operation on the component arrays for each of the IQ objects.
Examples:
- _nary_operation((iq_1, iq_2), numpy.append, unpack=True)
- _nary_operation((iq_1, iq_2), numpy.concatenate, unpack=False)
Parameters
----------
iq_objects: list
A list of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
or ~drtsans.dataobjects.I1DAnnular objects.
operation: function
A function operating on a list of :ref:`~numpy.ndarray` objects, e.g. numpy.concatenate((array1, array2))
unpack: bool
If set to :py:obj:`True`, then ``operation`` receives an unpacked list of arrays. If set to :py:obj:`False`,
then ``operation`` receives the list of arrays as a single argument.
Examples: numpy.append(*(array1, array2)) versus numpy.concatenate((array1, array2))
kwargs: dict
Additional options to be passed to ``operation``.
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal or
~drtsans.dataobjects.I1DAnnular
"""
reference_object = iq_objects[0]
assert len(set([type(iq_object) for iq_object in iq_objects])) == 1 # check all objects of same type
new_components = list()
for i in range(len(reference_object)): # iterate over the IQ object components
i_components = [iq_object[i] for iq_object in iq_objects] # collect the ith components of each object
if True in [i_component is None for i_component in i_components]: # is any of these None?
new_components.append(None)
elif unpack is True:
new_components.append(operation(*i_components, **kwargs))
else:
new_components.append(operation(i_components, **kwargs))
return reference_object.__class__(*new_components)
def _extract(iq_object, selection):
r"""
Extract a subset of data points onto a new IQ object.
Examples:
- iq_object.extract(42) # extract data point number 42
- iq_object.extract(slice(None, None, 2)) # extract every other data point
- iq_object.extract(IQmod().mod_q < 0.1) # extract points with Q < 0.1
Parameters
----------
iq_object: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal,
or ~drtsans.dataobjects.I1DAnnular
selection: int, slice, :ref:`~numpy.ndarray`
Any selection that can be passed onto a :ref:`~numpy.ndarray`
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
"""
component_fragments = list()
for component in iq_object:
if component is None:
component_fragments.append(None)
else:
fragment = component.__getitem__(selection)
if isinstance(fragment, Iterable) is False: # selection extracts only one data point
fragment = [
fragment,
]
component_fragments.append(fragment)
return iq_object.__class__(*component_fragments)
def scale_intensity(iq_object, scaling):
r"""Rescale intensity and error for one IQ object.
Relies on fields 'intensity' and 'error' being the first two components
Parameters
----------
iq_object: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal,
or ~drtsans.dataobjects.I1DAnnular
scaling: float
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal,
~drtsans.dataobjects.I1DAnnular
"""
intensity = scaling * iq_object.intensity
error = scaling * iq_object.error
return iq_object.__class__(intensity, error, *[iq_object[i] for i in range(2, len(iq_object))])
[docs]
def verify_same_q_bins(iq0, iq1, raise_exception_if_diffrent=False, tolerance=None):
"""Verify whether 2 I(Q) has the same range of Q
Parameters
----------
iq0: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal, ~drtsans.dataobjects.I1DAnnular
iq1: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal, ~drtsans.dataobjects.I1DAnnular
Returns
-------
bool
True if they are same
""" # noqa: E501
# Same class
if iq0.__class__ != iq1.__class__:
raise RuntimeError(f"Input I(Q)s are of different type: {type(iq0)} and {type(iq1)}")
# IQmod
if isinstance(iq0, IQmod) or isinstance(iq0, I1DAnnular):
# Q1D
if iq0.wavelength is None or iq1.wavelength is None:
# no wave length
q0vec = iq0.x
q1vec = iq1.x
else:
# also comparing the wavelength bins if they do exist
q0vec = np.array([iq0.x, iq0.wavelength])
q1vec = np.array([iq1.x, iq1.wavelength])
elif isinstance(iq0, IQazimuthal) or isinstance(iq0, IQcrystal):
# Q2D
if iq0.wavelength is None or iq1.wavelength is None:
# no wavelength
q0vec = np.array([iq0.qx, iq0.qy])
q1vec = np.array([iq1.qx, iq1.qy])
else:
q0vec = np.array([iq0.qx, iq0.qy, iq0.wavelength])
q1vec = np.array([iq1.qx, iq1.qy, iq0.wavelength])
else:
raise RuntimeError(f"I(Q) of type {type(iq0)} is not supported by verify same binning")
# Verify
same = True
try:
if tolerance:
np.testing.assert_allclose(q0vec, q1vec, atol=tolerance)
else:
np.testing.assert_allclose(q0vec, q1vec)
except AssertionError as assert_error:
same = False
if raise_exception_if_diffrent:
raise assert_error
return same
def q_azimuthal_to_q_modulo(Iq):
"""this method converts Qazimuthal to Qmodulo using
(1) Q = sqrt(Qx**2 + Qy**2)
(2) sigmaQ = sqrt(sigmaQx**2 + sigmaQy**2)
Parameters:
----------
Iq: IQazimuthal object
Returns:
-------
Iqmod: IQmod object
"""
qx = Iq.qx
qy = Iq.qy
delta_qx = Iq.delta_qx
delta_qy = Iq.delta_qy
mod_q = np.sqrt(np.square(qx) + np.square(qy))
delta_mode_q = np.sqrt(np.square(delta_qx) + np.square(delta_qy))
q_azimuthal_to_q_modulo = namedtuple("q_azimuthal_to_q_modulo", "mod_q, delta_mod_q")
q_azimuthal_to_q_modulo.mod_q = mod_q
q_azimuthal_to_q_modulo.delta_mod_q = delta_mode_q
iqmod = IQmod(intensity=Iq.intensity, error=Iq.error, mod_q=mod_q, delta_mod_q=delta_mode_q)
return iqmod
def concatenate(iq_objects):
r"""
Join a sequence IQ objects; concatenate((iq1, iq2, iq3,...))
Parameters
----------
iq_objects: list
A sequence of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal,
~drtsans.dataobjects.IQcrystal, or ~drtsans.dataobjects.I1DAnnular objects.
All objects must be of the same type
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal,
~drtsans.dataobjects.I1DAnnular
"""
return _nary_operation(iq_objects, np.concatenate, unpack=False)
class I1DWorker:
"""Common methods for 1D intensity objects"""
def to_csv(self, file_name, sep=" ", float_format="%.6E", skip_nan=True):
r"""
Write the intensity profile into an ASCII file.
Parameters
----------
file_name: str
Path to output file
sep: str
String of length 1. Field delimiter for the output file.
float_format: str
Format string for floating point numbers.
skip_nan: bool
If true, any data point where intensity is NAN will not be written to file
"""
# Convert to dictionary to construct a pandas DataFrame instance
from pandas import DataFrame
frame = DataFrame({label: value for label, value in self._asdict().items() if value is not None})
# Create the order of the columns
if isinstance(self, IQmod):
i_q_mod_cols = ["mod_q", "intensity", "error"] # 3 mandatory columns
elif isinstance(self, I1DAnnular):
i_q_mod_cols = ["phi", "intensity", "error"] # 3 mandatory columns
else:
raise NotImplementedError("Only IQmod and I1DAnnular are supported")
if "delta_mod_q" in frame.keys():
i_q_mod_cols.append("delta_mod_q")
if "wavelength" in frame.keys():
i_q_mod_cols.append("wavelength")
mode_nan = "w" # write mode for csv file. If we add a header first, it will be 'a'
# delete NANs if requested
if skip_nan:
finites = np.isfinite(frame["intensity"])
if np.count_nonzero(finites) < len(frame):
frame = frame[finites]
mode_nan = "a"
with open(file_name, "w") as f:
f.write("# NANs have been skipped\n")
# Write to file
frame.to_csv(
file_name,
columns=i_q_mod_cols,
index=False,
sep=sep,
float_format=float_format,
na_rep="NAN",
mode=mode_nan,
)
[docs]
class IQmod(namedtuple("IQmod", "intensity error mod_q delta_mod_q wavelength"), I1DWorker):
r"""This class holds the information for I(Q) scalar. All of the arrays must be 1-dimensional
and parallel (same length). The ``delta_mod_q`` and ``wavelength`` fields are optional."""
[docs]
@staticmethod
def read_csv(file, sep=" "):
r"""
Load an intensity profile into a :py:obj:`~drtsans.dataobjects.IQmod` object.
Required file format:
The first row must include the names for the file columns. The order of the columns is irrelevant and
the names of the columns must be:
- 'intensity' for profile intensities. This column is required.
- 'error' for uncertainties in the profile intensities. This column is required.
- 'mod_q' for values of Q. This column is required.
- 'delta_mod_q' for uncertainties in the Q values. This column is optional.
- 'wavelength' This column is optional.
Example of file contents::
intensity error mod_q
1000.0 89.0 0.001
90.0 8.0 0.01
4.7 0.9 0.1
Usage example::
from drtsans.mono.gpsans import IQmod
iq = IQmod.read_csv(file_name)
Parameters
----------
file: str
Path to input file
sep: str
String of length 1. Field delimiter in the input file.
Returns
-------
~drtsans.dataobjects.IQmod
"""
from pandas import read_csv as pd_read_csv
frame = pd_read_csv(file, sep=sep, dtype=np.float64, na_values="NAN", comment="#")
args = [frame[label].values for label in ["intensity", "error", "mod_q"]]
kwargs = {
label: frame[label].values for label in ["delta_mod_q", "wavelength"] if label in list(frame.columns)
}
return IQmod(*args, **kwargs)
def __new__(cls, intensity, error, mod_q, delta_mod_q=None, wavelength=None):
# these conversions do nothing if the supplied information is already a numpy.ndarray
intensity = np.array(intensity)
error = np.array(error)
mod_q = np.array(mod_q)
# if intensity is 1d, then everything else will be if they are parallel
if len(intensity.shape) != 1:
raise TypeError('"intensity" must be a 1-dimensional array, found shape={}'.format(intensity.shape))
# check that the mandatory fields are parallel
_check_parallel(intensity, error, mod_q)
# work with optional fields
if delta_mod_q is not None:
delta_mod_q = np.array(delta_mod_q)
_check_parallel(intensity, delta_mod_q)
if wavelength is not None:
wavelength = np.array(wavelength)
_check_parallel(intensity, wavelength)
# pass everything to namedtuple
return super(IQmod, cls).__new__(cls, intensity, error, mod_q, delta_mod_q, wavelength)
def __mul__(self, scaling):
r"""Scale intensities and their uncertainties by a number"""
return scale_intensity(self, scaling)
def __rmul__(self, scaling):
return self.__mul__(scaling)
def __truediv__(self, divisor):
r"""Divide intensities and their uncertainties by a number"""
return self.__mul__(1.0 / divisor)
@property
def x(self):
return self.mod_q
@property
def delta_x(self):
return self.delta_mod_q
[docs]
def concatenate(self, other):
r"""
Append additional data points from another :py:obj:`~drtsans.dataobjects.IQmod` object and
return the composite as a new :py:obj:`~drtsans.dataobjects.IQmod` object.
Parameters
----------
other: ~drtsans.dataobjects.IQmod
Additional data points.
Returns
-------
~drtsans.dataobjects.IQmod
"""
return _nary_operation((self, other), np.concatenate, unpack=False)
[docs]
def sort(self, key="mod_q"):
r"""
Sort the data points according to one of the components of the :py:obj:`~drtsans.dataobjects.IQmod` object.
Parameters
----------
key: str
Component prescribing the sorting order. Default sorting is by increasing Q value.
Returns
-------
~drtsans.dataobjects.IQmod
"""
return _extract(self, np.argsort(getattr(self, key)))
[docs]
def id(self):
return DataType.IQ_MOD
[docs]
def be_finite(self):
# Remove NaN
finite_locations = np.isfinite(self.intensity)
finite_delta_mod_q = None if self.delta_mod_q is None else self.delta_mod_q[finite_locations]
finite_binned_iq_wl = IQmod(
intensity=self.intensity[finite_locations],
error=self.error[finite_locations],
mod_q=self.mod_q[finite_locations],
delta_mod_q=finite_delta_mod_q,
wavelength=self.wavelength[finite_locations],
)
return finite_binned_iq_wl
[docs]
def to_workspace(self, name=None):
# create a name if one isn't provided
if name is None:
name = mtd.unique_hidden_name()
dq = self.delta_mod_q
if dq is None:
dq = self.mod_q * 0.0
return CreateWorkspace(
DataX=self.mod_q,
DataY=self.intensity,
DataE=self.error,
UnitX="momentumtransfer",
OutputWorkspace=name,
Dx=dq,
EnableLogging=False,
)
[docs]
class I1DAnnular(namedtuple("I1DAnnular", "intensity error phi wavelength"), I1DWorker):
"""
This class holds the information for the 1D annular profile, I(phi).
The field ``wavelength`` is optional.
The annular profile is obtained by integrating the 2D I(x,y) over a limited scalar Q range
[Qmin, Qmax] to produce the intensity as a function of the angle around the beam center.
"""
def __new__(cls, intensity, error, phi, wavelength=None):
# these conversions do nothing if the supplied information is already a numpy.ndarray
intensity = np.array(intensity)
error = np.array(error)
phi = np.array(phi)
# if intensity is 1d, then everything else will be if they are parallel
if len(intensity.shape) != 1:
raise TypeError('"intensity" must be a 1-dimensional array, found shape={}'.format(intensity.shape))
# check that the mandatory fields are parallel
_check_parallel(intensity, error, phi)
# work with optional fields
if wavelength is not None:
wavelength = np.array(wavelength)
_check_parallel(intensity, wavelength)
# pass everything to namedtuple
return super(I1DAnnular, cls).__new__(cls, intensity, error, phi, wavelength)
def __mul__(self, scaling):
r"""Scale intensities and their uncertainties by a number"""
return scale_intensity(self, scaling)
def __rmul__(self, scaling):
return self.__mul__(scaling)
def __truediv__(self, divisor):
r"""Divide intensities and their uncertainties by a number"""
return self.__mul__(1.0 / divisor)
@property
def x(self):
return self.phi
@property
def delta_x(self):
return None
[docs]
def concatenate(self, other):
r"""
Append additional data points from another :py:obj:`~drtsans.dataobjects.I1DAnnular` object
and return the composite as a new :py:obj:`~drtsans.dataobjects.I1DAnnular` object.
Parameters
----------
other: ~drtsans.dataobjects.I1DAnnular
Additional data points.
Returns
-------
~drtsans.dataobjects.I1DAnnular
"""
return _nary_operation((self, other), np.concatenate, unpack=False)
[docs]
def sort(self, key="phi"):
r"""
Sort the data points according to one of the components of the
:py:obj:`~drtsans.dataobjects.I1DAnnular` object.
Parameters
----------
key: str
Component prescribing the sorting order. Default sorting is by increasing phi value.
Returns
-------
~drtsans.dataobjects.I1DAnnular
"""
return _extract(self, np.argsort(getattr(self, key)))
[docs]
def id(self):
return DataType.I_ANNULAR
[docs]
def be_finite(self):
# Remove NaN
finite_locations = np.isfinite(self.intensity)
finite_binned_i_wl = I1DAnnular(
intensity=self.intensity[finite_locations],
error=self.error[finite_locations],
phi=self.phi[finite_locations],
wavelength=self.wavelength[finite_locations],
)
return finite_binned_i_wl
[docs]
def to_workspace(self, name=None):
# create a name if one isn't provided
if name is None:
name = mtd.unique_hidden_name()
return CreateWorkspace(
DataX=self.phi,
DataY=self.intensity,
DataE=self.error,
UnitX="azimuthalAngle",
OutputWorkspace=name,
EnableLogging=False,
)
[docs]
@staticmethod
def read_csv(file, sep=" "):
r"""
Load an intensity profile into a :py:obj:`~drtsans.dataobjects.I1DAnnular` object.
Required file format:
The first row must include the names for the file columns. The order of the columns is
irrelevant and the names of the columns must be:
- 'intensity' for profile intensities. This column is required.
- 'error' for uncertainties in the profile intensities. This column is required.
- 'phi' for values of annular angle. This column is required.
- 'wavelength' This column is optional.
Example of file contents::
intensity error phi
1000.0 89.0 10.0
90.0 8.0 20.0
4.7 0.9 30.0
Example::
from drtsans.mono.gpsans import I1DAnnular
iq = I1DAnnular.read_csv(file_name)
Parameters
----------
file: str
Path to input file
sep: str
String of length 1. Field delimiter in the input file.
Returns
-------
~drtsans.dataobjects.I1DAnnular
"""
from pandas import read_csv as pd_read_csv
frame = pd_read_csv(file, sep=sep, dtype=np.float64, na_values="NAN", comment="#")
args = [frame[label].values for label in ["intensity", "error", "phi"]]
kwargs = {label: frame[label].values for label in ["wavelength"] if label in list(frame.columns)}
return I1DAnnular(*args, **kwargs)
[docs]
def load_iqmod(file, sep=" ", header_type=HeaderType.PANDAS.value):
r"""
Load an intensity profile into a :py:obj:`~drtsans.dataobjects.I1DAnnular` object.
Required file format:
The first row must include the names for the file columns. The order of the columns is irrelevant and
the names of the columns must be:
- 'intensity' for profile intensities. This column is required.
- 'error' for uncertainties in the profile intensities. This column is required.
- 'mod_q' for values of Q. This column is required.
- 'delta_mod_q' for uncertainties in the Q values. This column is optional.
- 'wavelength' This column is optional.
Example of file contents::
intensity error mod_q
1000.0 89.0 0.001
90.0 8.0 0.01
4.7 0.9 0.1
Usage example::
from drtsans.mono.gpsans import load_iqmod
iq = load_iqmod(file_name)
Parameters
----------
file: str
Path to input file
sep: str
String of length 1. Field delimiter in the input file.
Returns
-------
~drtsans.dataobjects.IQmod
"""
if header_type == HeaderType.MANTID_ASCII.value:
csv_data = np.genfromtxt(file, comments="#", dtype=np.float64, skip_header=2)
num_cols = len(csv_data[0])
assert num_cols == 4, "Incompatible number of colums: {} should be 4".format(num_cols)
return IQmod(csv_data[:, 1], csv_data[:, 2], csv_data[:, 0], csv_data[:, 3])
else:
return IQmod.read_csv(file, sep=sep)
[docs]
def save_i1d(
iq,
file,
sep=" ",
float_format="%.6E",
skip_nan=True,
header_type=HeaderType.MANTID_ASCII.value,
):
r"""
Write the 1D intensity profile object into an ASCII file.
Parameters
----------
iq: IQmod | I1DAnnular
Profile to be saved
file: str
Path to output file
sep: str
String of length 1. Field delimiter for the output file.
float_format: str
Format string for floating point numbers.
skip_nan: bool
If true, any data point where intensity is NAN will not be written to file
header_type: str
Determine the header type to make 1D data compatible with panda or Mantid
possible values:
HeaderType.MANTID_ASCII.value
HeaderType.PANDAS.value
"""
if isinstance(iq, IQmod):
save_iqmod(iq, file, sep, float_format, skip_nan, header_type)
elif isinstance(iq, I1DAnnular):
save_i1d_annular(iq, file, sep, float_format, skip_nan, header_type)
else:
raise NotImplementedError("Only IQmod and I1DAnnular are supported")
[docs]
def save_iqmod(
iq,
file,
sep=" ",
float_format="%.6E",
skip_nan=True,
header_type=HeaderType.MANTID_ASCII.value,
):
r"""
Write the :py:obj:`~drtsans.dataobjects.IQmod` object into an ASCII file.
Output columns
(Line 0: ) mod_q intensity error mod_q_error
Parameters
----------
iq: ~drtsans.dataobjects.IQmod
Profile to be saved
file: str
Path to output file
sep: str
String of length 1. Field delimiter for the output file.
float_format: str
Format string for floating point numbers.
skip_nan: bool
If true, any data point where intensity is NAN will not be written to file
header_type: str
Determine the header type to make 1D data compatible with panda or Mantid
possible values:
HeaderType.MANTID_ASCII.value
HeaderType.PANDAS.value
"""
if header_type == HeaderType.MANTID_ASCII.value:
from drtsans.save_ascii import save_ascii_binned_1D
save_ascii_binned_1D(file, "I(Q)", iq)
else:
iq.to_csv(file, sep=sep, float_format=float_format, skip_nan=skip_nan)
def save_i1d_annular(
iq,
file,
sep=" ",
float_format="%.6E",
skip_nan=True,
header_type=HeaderType.MANTID_ASCII.value,
):
r"""
Write the :py:obj`~drtsans.dataobjects.I1DAnnular` object into an ASCII file.
Output columns
(Line 0: ) phi intensity error
Parameters
----------
iq: ~drtsans.dataobjects.I1DAnnular
Profile to be saved
file: str
Path to output file
sep: str
String of length 1. Field delimiter for the output file.
float_format: str
Format string for floating point numbers.
skip_nan: bool
If true, any data point where intensity is NAN will not be written to file
header_type: str
Determine the header type to make 1D data compatible with panda or Mantid
possible values:
HeaderType.MANTID_ASCII.value
HeaderType.PANDAS.value
"""
if header_type == HeaderType.MANTID_ASCII.value:
from drtsans.save_ascii import save_ascii_binned_1D_annular
save_ascii_binned_1D_annular(file, "I(phi)", iq)
else:
iq.to_csv(file, sep=sep, float_format=float_format, skip_nan=skip_nan)
[docs]
class IQazimuthal(namedtuple("IQazimuthal", "intensity error qx qy delta_qx delta_qy wavelength")):
r"""
This class holds the information for the azimuthal projection, I(Qx, Qy). The resolution terms,
(``delta_qx``, ``delta_qy``) and ``wavelength`` fields are optional.
All of the arrays must be 1-dimensional or 2-dimensional and matching length. For the 1-dimensional
case, all of the arrays must be parallel (same length). For the 2-dimensional case, (``intensity``,
``error``, ``delta_qx``, ``delta_qy``, ``wavelength``) must all be parallel. However, for (``qx``,
``qy``), they must either (both) be 2-dimensional and parallel, or (both) 1-dimensional with
``len(qx) == intensity.shape[0]`` and ``len(qy) == intensity.shape[1]``.
if intensity is 2D, and qx and qy are 1D: In this constructor, it is assumed that intensity 2D array
will match
qx = [[qx0, qx0, ...], [qx1, qx1, ...], ...]
qy = [[qy0, qy1, ...], [qy0, qy1, ...], ...]
because qx and qy will be created in such style.
"""
def __new__(cls, intensity, error, qx, qy, delta_qx=None, delta_qy=None, wavelength=None): # noqa: C901
# these conversions do nothing if the supplied information is already a numpy.ndarray
intensity = np.array(intensity)
error = np.array(error)
qx = np.array(qx)
qy = np.array(qy)
# check that the mandatory fields are parallel
if len(intensity.shape) == 1:
_check_parallel(intensity, error, qx, qy)
elif len(intensity.shape) == 2:
if len(qx.shape) == 1:
# Qx and Qy are given in 1D array (not meshed)
_check_parallel(intensity, error)
if intensity.shape[0] != qx.shape[0]:
raise TypeError(
"Incompatible dimensions intensity[{}] and qx[{}]".format(intensity.shape, qx.shape[0])
)
if intensity.shape[1] != qy.shape[0]:
raise TypeError(
"Incompatible dimensions intensity[{}] and qy[{}]".format(intensity.shape, qy.shape[0])
)
elif len(qx.shape) == 2:
# Qx and Qy are given in meshed 2D
_check_parallel(intensity, error, qx, qy)
else:
raise TypeError("Qx can only be of dimension 1 or 2, found {}".format(len(qx.shape)))
else:
raise TypeError("intensity can only be of dimension 1 or 2, found {}".format(len(intensity.shape)))
# work with optional fields
if np.logical_xor(delta_qx is None, delta_qy is None):
raise TypeError("Must specify either both or neither of delta_qx and delta_qy")
if delta_qx is not None:
delta_qx = np.array(delta_qx)
delta_qy = np.array(delta_qy)
_check_parallel(intensity, delta_qx, delta_qy)
if wavelength is not None:
wavelength = np.array(wavelength)
_check_parallel(intensity, wavelength)
# make the qx and qy have the same shape as the data
if len(intensity.shape) == 2 and len(qx.shape) == 1 and len(qy.shape) == 1:
# Using meshgrid to construct the Qx and Qy 2D arrays. This is consistent with the algorithm
# that is used in bin_iq_2d()
qx, qy = np.meshgrid(qx, qy, indexing="ij")
# Sanity check
assert qx.shape == intensity.shape, (
f"qx and intensity must have same shapes. It is not now: {qx.shape} vs {intensity.shape}"
)
assert qy.shape == intensity.shape, (
f"qy and intensity must have same shapes. It is not now: {qy.shape} vs {intensity.shape}"
)
# pass everything to namedtuple
return super(IQazimuthal, cls).__new__(cls, intensity, error, qx, qy, delta_qx, delta_qy, wavelength)
[docs]
def be_finite(self):
"""Remove NaN by flattening first
Returns
-------
IQazimuthal
I(qx, qy, wavelength) with NaN removed
"""
# Check whether is any need to recontruct the IQazimuthal
num_finite_points = len(np.where(np.isfinite(self.intensity))[0])
if num_finite_points == self.intensity.size:
return self
# Flatten
intensity = self.intensity.flatten()
# finite values
finite_locations = np.isfinite(intensity)
# construct output
intensity = intensity[finite_locations]
error = self.error.flatten()[finite_locations]
qx = self.qx.flatten()[finite_locations]
qy = self.qy.flatten()[finite_locations]
dqx = None if self.delta_qx is None else self.delta_qx.flatten()[finite_locations]
dqy = None if self.delta_qy is None else self.delta_qy.flatten()[finite_locations]
wavelength = None if self.wavelength is None else self.wavelength.flatten()[finite_locations]
finite_iq2d = IQazimuthal(
intensity=intensity,
error=error,
qx=qx,
qy=qy,
delta_qx=dqx,
delta_qy=dqy,
wavelength=wavelength,
)
return finite_iq2d
[docs]
def id(self):
return DataType.IQ_AZIMUTHAL
[docs]
def concatenate(self, other):
r"""
Append additional data points from another ~drtsans.dataobjects.IQazimuthal object and
return the composite as a new ~drtsans.dataobjects.IQazimuthal object.
Parameters
----------
other: ~drtsans.dataobjects.IQazimuthal
Additional data points.
Returns
-------
~drtsans.dataobjects.IQazimuthal
"""
return _nary_operation((self, other), np.concatenate, unpack=False)
[docs]
def ravel(self):
"""Create a new ~drtsans.dataobjects.IQazimuthal with all of the arrays flattened"""
kwargs = dict()
if self.delta_qx is not None:
kwargs["delta_qx"] = self.delta_qx.ravel()
if self.delta_qx is not None:
kwargs["delta_qy"] = self.delta_qy.ravel()
if self.wavelength is not None:
kwargs["wavelength"] = self.wavelength.ravel()
return IQazimuthal(
intensity=self.intensity.ravel(),
error=self.error.ravel(),
qx=self.qx.ravel(),
qy=self.qy.ravel(),
**kwargs,
)
[docs]
def to_workspace(self, name=None):
"""Return a workspace representing I(Qx, Qy) with error E"""
# create a name if one isn't provided
if name is None:
name = mtd.unique_hidden_name()
def flatten_unique_vals(a: np.ndarray):
"""
Helper function to get the unique values with a tolerance preserving order
Steps:
1. Flatten the array
2. Round the array to QBINS_EQUAL_DECIMALS decimals
3. Find the indices of the first location of each unique element in the rounded array
4. Use the indices to return the unique elements from the original array
"""
b = a.flatten()
return b[np.unique(b.round(QBINS_EQUAL_DECIMALS), return_index=True)[1]]
# get the unique qx and qy values
qx_ws = flatten_unique_vals(self.qx)
qy_ws = flatten_unique_vals(self.qy)
# reshape intensity and error as needed for CreateWorkspace with qy as vertical axis
qx_ws_len = len(qx_ws)
qy_ws_len = len(qy_ws)
if len(self.intensity.shape) == 1 and len(self.intensity) == len(qx_ws):
intensity_ws = self.intensity
error_ws = self.error
else:
intensity_ws = self.intensity.ravel().reshape((qx_ws_len, qy_ws_len)).T
error_ws = self.error.ravel().reshape((qx_ws_len, qy_ws_len)).T
ws = CreateWorkspace(
DataX=qx_ws,
UnitX="MomentumTransfer",
VerticalAxisValues=qy_ws,
VerticalAxisUnit="MomentumTransfer",
NSpec=qy_ws.size,
DataY=intensity_ws,
DataE=error_ws,
OutputWorkspace=name,
# dx=
EnableLogging=False,
)
return ws
[docs]
class IQcrystal(namedtuple("IQazimuthal", "intensity error qx qy qz delta_qx delta_qy delta_qz wavelength")):
"""This class holds the information for the crystallographic projection, I(Qx, Qy, Qz). All of the
arrays must be 1-dimensional and parallel (same length). The resolution terms, (``delta_qx``,
``delta_qy``, ``delta_qz``) and ``wavelength`` fields are optional."""
def __new__(
cls,
intensity,
error,
qx,
qy,
qz,
delta_qx=None,
delta_qy=None,
delta_qz=None,
wavelength=None,
):
# these conversions do nothing if the supplied information is already a numpy.ndarray
intensity = np.array(intensity)
error = np.array(error)
qx = np.array(qx)
qy = np.array(qy)
qz = np.array(qz)
# check that the mandatory fields are parallel
if len(intensity.shape) != 1:
raise NotImplementedError("Do not currently support dimension != 1, found {}".format(len(intensity.shape)))
_check_parallel(intensity, error)
_check_parallel(intensity, qx, qy, qz) # TODO make more generic
# work with optional fields
count = 0
if delta_qx is not None:
count += 1
if delta_qy is not None:
count += 1
if delta_qz is not None:
count += 1
if not (count == 0 or count == 3):
raise TypeError("Must specify either all or none of delta_qx, delta_qy, delta_qz")
if delta_qx is not None:
delta_qx = np.array(delta_qx)
delta_qy = np.array(delta_qy)
delta_qz = np.array(delta_qz)
_check_parallel(intensity, delta_qx, delta_qy, delta_qz)
if wavelength is not None:
wavelength = np.array(wavelength)
_check_parallel(intensity, wavelength)
# pass everything to namedtuple
return super(IQcrystal, cls).__new__(
cls, intensity, error, qx, qy, qz, delta_qx, delta_qy, delta_qz, wavelength
)
[docs]
def id(self):
return DataType.IQ_CRYSTAL
def save_i_of_q_to_h5(iq: Union[IQmod, IQazimuthal], h5_name: str):
"""Export I of Q, in form of namedtuple, to an HDF5"""
# assert isinstance(iq, namedtuple), f'I of Q must be of type namedtuple but not {type(iq)}'
# Init h5
iq_h5 = h5py.File(h5_name, "w")
# create group
data_group = iq_h5.create_group(iq.__class__.__name__)
# Write field
for index, field in enumerate(iq._fields):
# add data
data = iq[index]
if data is not None:
data_group.create_dataset(field, data=data)
# Close
iq_h5.close()
def load_iq1d_from_h5(h5_name: str) -> IQmod:
"""Load an HDF5 for I(Q1D)"""
# Open file
with h5py.File(h5_name, "r") as iq_h5:
data_group = iq_h5["IQmod"]
value_dict = dict()
# get tuple element
for field in ["intensity", "error", "mod_q", "delta_mod_q", "wavelength"]:
try:
value_dict[field] = data_group[field][()]
except KeyError:
value_dict[field] = None
iqmod = IQmod(**value_dict)
return iqmod
def load_iq2d_from_h5(h5_name: str) -> IQazimuthal:
# Open file
with h5py.File(h5_name, "r") as iq_h5:
data_group = iq_h5["IQazimuthal"]
value_dict = dict()
# get tuple element
print(f"DEBUG field: {IQazimuthal._fields}")
for field in IQazimuthal._fields:
try:
value_dict[field] = data_group[field][()]
except KeyError:
value_dict[field] = None
iq2d = IQazimuthal(**value_dict)
return iq2d
class _Testing:
r"""
Mimic the numpy.testing module by applying functions of this module to the component arrays of the IQ objects
"""
@staticmethod
def _nary_assertion(iq_objects, assertion_function, unpack=True, **kwargs):
r"""
Carry out an assertion on the component arrays for each of the IQ objects.
Examples:
- _nary_assertion((iq_1, iq_2), numpy.append, unpack=True)
- _nary_assertion((iq_1, iq_2), numpy.concatenate, unpack=False)
Parameters
----------
iq_objects: list
A list of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, or ~drtsans.dataobjects.IQcrystal
objects.
assertion_function: function
A function operating on a list of :ref:`~numpy.ndarray` objects, e.g. numpy.concatenate((array1, array2))
unpack: bool
If set to :py:obj:`True`, then ``assertion_function`` receives an unpacked list of arrays.
If set to :py:obj:`False`, then ``assertion_function`` receives the list of arrays as a single argument.
Examples: numpy.append(*(array1, array2)) versus numpy.concatenate((array1, array2))
kwargs: dict
Additional options to be passed
Raises
------
AssertionError
"""
reference_object = iq_objects[0] # pick the first of the list as reference object
assert len(set([type(iq_object) for iq_object in iq_objects])) == 1 # check all objects of same type
for i in range(len(reference_object)): # iterate over the IQ object components
component_name = reference_object._fields[i]
print(f"all_close on {component_name}")
i_components = [iq_object[i] for iq_object in iq_objects] # collect the ith components of each object
if True in [i_component is None for i_component in i_components]: # is any of these None?
if set(i_components) == set([None]):
continue # all arrays are actually None, so they are identical
else:
raise AssertionError(f"field {component_name} is None for some of the iQ objects")
elif unpack is True:
assertion_function(*i_components, **kwargs)
else:
assertion_function(i_components, **kwargs)
@staticmethod
def assert_allclose(actual, desired, **kwargs):
r"""Apply :ref:`~numpy.testing.assert_allclose on each component array"""
_Testing._nary_assertion(
(actual, desired),
assertion_function=np.testing.assert_allclose,
unpack=True,
**kwargs,
)
testing = _Testing() # use it as if it were a module, e.g. testing.assert_allclose(iq_1, iq_2, atol=1.e-6)