from collections import namedtuple
from collections.abc import Iterable
import h5py
from enum import Enum
import numpy as np
from typing import Union
# https://docs.mantidproject.org/nightly/algorithms/CreateWorkspace-v1.html
from mantid.simpleapi import mtd, CreateWorkspace
__all__ = [
"getDataType",
"DataType",
"IQmod",
"IQazimuthal",
"IQcrystal",
"verify_same_q_bins",
]
class DataType(Enum):
    """Enumeration of the data types understood by :func:`getDataType`.

    The enum values match the strings returned by the ``id()`` method of the
    corresponding object (Mantid workspace or IQ namedtuple subclass).
    """

    WORKSPACE2D = "Workspace2D"
    IQ_MOD = "IQmod"
    IQ_AZIMUTHAL = "IQazimuthal"
    IQ_CRYSTAL = "IQcrystal"
class HeaderType(Enum):
    """Header conventions for 1D profile ASCII files (used by ``load_iqmod`` / ``save_iqmod``)."""

    MANTID_ASCII = "MantidAscii"
    PANDAS = "Pandas"
def getDataType(obj):
    r"""
    Determine the :class:`DataType` of an object.

    Parameters
    ----------
    obj
        Either an object exposing an ``id()`` method (an IQ object or a Mantid
        workspace), or the name of a workspace registered with Mantid.

    Returns
    -------
    DataType

    Raises
    ------
    ValueError
        If ``obj`` has no ``id()`` method and is not the name of a registered workspace.
    """
    try:
        return DataType(obj.id())
    except AttributeError:
        # no id() method: fall back to treating obj as a workspace name in Mantid's ADS
        name = str(obj)
        if name not in mtd:
            raise ValueError("Do not know how to get id from: {}".format(obj))
        return DataType(mtd[name].id())
def _check_parallel(*args):
"""This makes sure that all input arrays are parallel to each
other. It assumes that the inputs are ndarrays."""
shape = args[0].shape
for arg in args[1:]:
if arg.shape != shape:
raise TypeError("Shape mismatch ({} != {})".format(shape, arg.shape))
def _nary_operation(iq_objects, operation, unpack=True, **kwargs):
r"""
Carry out an operation on the component arrays for each of the IQ objects.
Examples:
- _nary_operation((iq_1, iq_2), numpy.append, unpack=True)
- _nary_operation((iq_1, iq_2), numpy.concatenate, unpack=False)
Parameters
----------
iq_objects: list
A list of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, or ~drtsans.dataobjects.IQcrystal
objects.
operation: function
A function operating on a list of :ref:`~numpy.ndarray` objects, e.g. numpy.concatenate((array1, array2))
unpack: bool
If set to :py:obj:`True`, then ``operation`` receives an unpacked list of arrays. If set to :py:obj:`False`,
then ``operation`` receives the list of arrays as a single argument.
Examples: numpy.append(*(array1, array2)) versus numpy.concatenate((array1, array2))
kwargs: dict
Additional options to be passed to ``operation``.
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, or ~drtsans.dataobjects.IQcrystal
"""
reference_object = iq_objects[0]
assert len(set([type(iq_object) for iq_object in iq_objects])) == 1 # check all objects of same type
new_components = list()
for i in range(len(reference_object)): # iterate over the IQ object components
i_components = [iq_object[i] for iq_object in iq_objects] # collect the ith components of each object
if True in [i_component is None for i_component in i_components]: # is any of these None?
new_components.append(None)
elif unpack is True:
new_components.append(operation(*i_components, **kwargs))
else:
new_components.append(operation(i_components, **kwargs))
return reference_object.__class__(*new_components)
def _extract(iq_object, selection):
r"""
Extract a subset of data points onto a new IQ object.
Examples:
- iq_object.extract(42) # extract data point number 42
- iq_object.extract(slice(None, None, 2)) # extract every other data point
- iq_object.extract(IQmod().mod_q < 0.1) # extract points with Q < 0.1
Parameters
----------
iq_object: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
selection: int, slice, :ref:`~numpy.ndarray`
Any selection that can be passed onto a :ref:`~numpy.ndarray`
Returns
-------
~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
"""
component_fragments = list()
for component in iq_object:
if component is None:
component_fragments.append(None)
else:
fragment = component.__getitem__(selection)
if isinstance(fragment, Iterable) is False: # selection extracts only one data point
fragment = [
fragment,
]
component_fragments.append(fragment)
return iq_object.__class__(*component_fragments)
def scale_intensity(iq_object, scaling):
    r"""Rescale the intensity and its uncertainty in one IQ object.

    Relies on fields 'intensity' and 'error' being the first two components;
    every other component is carried over unchanged.

    Parameters
    ----------
    iq_object: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
    scaling: float

    Returns
    -------
    ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
    """
    rescaled = [scaling * iq_object.intensity, scaling * iq_object.error]
    rescaled.extend(iq_object[2:])  # remaining components pass through untouched
    return iq_object.__class__(*rescaled)
def verify_same_q_bins(iq0, iq1, raise_exception_if_diffrent=False, tolerance=None):
    """Verify whether two I(Q) objects have the same Q bins (and wavelength bins, when present).

    Parameters
    ----------
    iq0: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
    iq1: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
    raise_exception_if_diffrent: bool
        If True, re-raise the comparison's AssertionError instead of returning False.
    tolerance: float
        Absolute tolerance forwarded to numpy.testing.assert_allclose; if None (or 0),
        numpy's default tolerances are used.

    Returns
    -------
    bool
        True if they are same

    Raises
    ------
    RuntimeError
        If the two inputs are of different types, or of a type not supported here.
    """
    # Same class
    if iq0.__class__ != iq1.__class__:
        raise RuntimeError(f"Input I(Q)s are of different type: {type(iq0)} and {type(iq1)}")
    # IQmod
    if isinstance(iq0, IQmod):
        # Q1D
        if iq0.wavelength is None or iq1.wavelength is None:
            # no wavelength: compare Q only
            q0vec = iq0.mod_q
            q1vec = iq1.mod_q
        else:
            # also compare the wavelength bins when both exist
            q0vec = np.array([iq0.mod_q, iq0.wavelength])
            q1vec = np.array([iq1.mod_q, iq1.wavelength])
    elif isinstance(iq0, (IQazimuthal, IQcrystal)):
        # Q2D
        if iq0.wavelength is None or iq1.wavelength is None:
            # no wavelength: compare (Qx, Qy) only
            q0vec = np.array([iq0.qx, iq0.qy])
            q1vec = np.array([iq1.qx, iq1.qy])
        else:
            q0vec = np.array([iq0.qx, iq0.qy, iq0.wavelength])
            # BUGFIX: previously used iq0.wavelength here, comparing it against itself
            q1vec = np.array([iq1.qx, iq1.qy, iq1.wavelength])
    else:
        raise RuntimeError(f"I(Q) of type {type(iq0)} is not supported by verify same binning")
    # Verify
    same = True
    try:
        if tolerance:
            np.testing.assert_allclose(q0vec, q1vec, atol=tolerance)
        else:
            np.testing.assert_allclose(q0vec, q1vec)
    except AssertionError as assert_error:
        same = False
        if raise_exception_if_diffrent:
            raise assert_error
    return same
def q_azimuthal_to_q_modulo(Iq):
    """Convert Qazimuthal data to Qmodulo using

    (1) Q = sqrt(Qx**2 + Qy**2)
    (2) sigmaQ = sqrt(sigmaQx**2 + sigmaQy**2)

    Parameters
    ----------
    Iq: IQazimuthal object

    Returns
    -------
    IQmod object
    """
    # NOTE: a stray local namedtuple that shadowed this function's name was removed; it was never used.
    mod_q = np.sqrt(np.square(Iq.qx) + np.square(Iq.qy))
    delta_mod_q = np.sqrt(np.square(Iq.delta_qx) + np.square(Iq.delta_qy))
    return IQmod(intensity=Iq.intensity, error=Iq.error, mod_q=mod_q, delta_mod_q=delta_mod_q)
def concatenate(iq_objects):
    r"""
    Join a sequence of IQ objects into one: concatenate((iq1, iq2, iq3, ...)).

    Thin wrapper over :func:`_nary_operation` with :func:`numpy.concatenate`;
    optional fields that are None in any input stay None in the result.

    Parameters
    ----------
    iq_objects: list
        A sequence of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, or
        ~drtsans.dataobjects.IQcrystal objects. All objects must be of the same type.

    Returns
    -------
    ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, ~drtsans.dataobjects.IQcrystal
    """
    return _nary_operation(iq_objects, np.concatenate, unpack=False)
class IQmod(namedtuple("IQmod", "intensity error mod_q delta_mod_q wavelength")):
    r"""This class holds the information for I(Q) scalar. All of the arrays must be 1-dimensional
    and parallel (same length). The ``delta_mod_q`` and ``wavelength`` fields are optional."""

    @staticmethod
    def read_csv(file, sep=" "):
        r"""
        Load an intensity profile into a ~drtsans.dataobjects.IQmod object.

        Required file format:
        The first row must include the names for the file columns. The order of the columns is irrelevant and
        the names of the columns must be:

        - 'intensity' for profile intensities. This column is required.
        - 'error' for uncertainties in the profile intensities. This column is required.
        - 'mod_q' for values of Q. This column is required.
        - 'delta_mod_q' for uncertainties in the Q values. This column is optional.
        - 'wavelength' This column is optional.

        Example of file contents:
            intensity error mod_q
            1000.0 89.0 0.001
            90.0 8.0 0.01
            4.7 0.9 0.1

        Usage example:
            from drtsans.mono.gpsans import IQmod
            iq = IQmod.read_csv(file_name)

        Parameters
        ----------
        file: str
            Path to input file
        sep: str
            String of length 1. Field delimiter in the input file.

        Returns
        -------
        ~drtsans.dataobjects.IQmod
        """
        from pandas import read_csv as pd_read_csv

        frame = pd_read_csv(file, sep=sep, dtype=np.float64, na_values="NAN", comment="#")
        # mandatory columns become positional arguments; optional columns only if present
        args = [frame[label].values for label in ["intensity", "error", "mod_q"]]
        kwargs = {
            label: frame[label].values for label in ["delta_mod_q", "wavelength"] if label in list(frame.columns)
        }
        return IQmod(*args, **kwargs)

    def __new__(cls, intensity, error, mod_q, delta_mod_q=None, wavelength=None):
        r"""Build the namedtuple after converting every field to a numpy array and validating shapes."""
        # these conversions do nothing if the supplied information is already a numpy.ndarray
        intensity = np.array(intensity)
        error = np.array(error)
        mod_q = np.array(mod_q)
        # if intensity is 1d, then everything else will be if they are parallel
        if len(intensity.shape) != 1:
            raise TypeError('"intensity" must be a 1-dimensional array, found shape={}'.format(intensity.shape))
        # check that the mandatory fields are parallel
        _check_parallel(intensity, error, mod_q)
        # work with optional fields
        if delta_mod_q is not None:
            delta_mod_q = np.array(delta_mod_q)
            _check_parallel(intensity, delta_mod_q)
        if wavelength is not None:
            wavelength = np.array(wavelength)
            _check_parallel(intensity, wavelength)
        # pass everything to namedtuple
        return super(IQmod, cls).__new__(cls, intensity, error, mod_q, delta_mod_q, wavelength)

    def __mul__(self, scaling):
        r"""Scale intensities and their uncertainties by a number"""
        return scale_intensity(self, scaling)

    def __rmul__(self, scaling):
        r"""Scale intensities and their uncertainties by a number (right multiplication)"""
        return self.__mul__(scaling)

    def __truediv__(self, divisor):
        r"""Divide intensities and their uncertainties by a number"""
        return self.__mul__(1.0 / divisor)

    def concatenate(self, other):
        r"""
        Append additional data points from another ~drtsans.dataobjects.IQmod object and return the composite as a
        new ~drtsans.dataobjects.IQmod object.

        Parameters
        ----------
        other: ~drtsans.dataobjects.IQmod
            Additional data points.

        Returns
        -------
        ~drtsans.dataobjects.IQmod
        """
        return _nary_operation((self, other), np.concatenate, unpack=False)

    def sort(self, key="mod_q"):
        r"""
        Sort the data points according to one of the components of the ~drtsans.dataobjects.IQmod object.

        Parameters
        ----------
        key: str
            Component prescribing the sorting order. Default sorting is by increasing Q value.

        Returns
        -------
        ~drtsans.dataobjects.IQmod
        """
        return _extract(self, np.argsort(getattr(self, key)))

    def id(self):
        r"""Identify this object as DataType.IQ_MOD (see :func:`getDataType`)."""
        return DataType.IQ_MOD

    def be_finite(self):
        r"""
        Return a new ~drtsans.dataobjects.IQmod with the data points of non-finite intensity removed.

        Optional fields that are None remain None.

        Returns
        -------
        ~drtsans.dataobjects.IQmod
        """
        finite_locations = np.isfinite(self.intensity)
        return IQmod(
            intensity=self.intensity[finite_locations],
            error=self.error[finite_locations],
            mod_q=self.mod_q[finite_locations],
            delta_mod_q=None if self.delta_mod_q is None else self.delta_mod_q[finite_locations],
            # BUGFIX: previously indexed self.wavelength unconditionally, crashing when it was None
            wavelength=None if self.wavelength is None else self.wavelength[finite_locations],
        )

    def to_workspace(self, name=None):
        r"""Convert this profile to a single-spectrum Mantid MatrixWorkspace (unit: momentum transfer)."""
        # create a name if one isn't provided
        if name is None:
            name = mtd.unique_hidden_name()
        # Mantid requires Dx; substitute zeros when no Q resolution is available
        dq = self.delta_mod_q
        if dq is None:
            dq = self.mod_q * 0.0
        return CreateWorkspace(
            DataX=self.mod_q,
            DataY=self.intensity,
            DataE=self.error,
            UnitX="momentumtransfer",
            OutputWorkspace=name,
            Dx=dq,
            EnableLogging=False,
        )

    def to_csv(self, file_name, sep=" ", float_format="%.6E", skip_nan=True):
        r"""
        Write the ~drtsans.dataobjects.IQmod object into an ASCII file.

        Parameters
        ----------
        file_name: str
            Path to output file
        sep: str
            String of length 1. Field delimiter for the output file.
        float_format: str
            Format string for floating point numbers.
        skip_nan: bool
            If true, any data point where intensity is NAN will not be written to file
        """
        # Convert to dictionary to construct a pandas DataFrame instance
        from pandas import DataFrame

        frame = DataFrame({label: value for label, value in self._asdict().items() if value is not None})
        # Create the order of the columns
        i_q_mod_cols = ["mod_q", "intensity", "error"]  # 3 mandatory columns
        if "delta_mod_q" in frame.keys():
            i_q_mod_cols.append("delta_mod_q")
        if "wavelength" in frame.keys():
            i_q_mod_cols.append("wavelength")
        mode_nan = "w"  # write mode for csv file. If we add a header first, it will be 'a'
        # delete NANs if requested
        if skip_nan:
            finites = np.isfinite(frame["intensity"])
            if np.count_nonzero(finites) < len(frame):
                frame = frame[finites]
                # write an explanatory header first, then append the data below it
                mode_nan = "a"
                with open(file_name, "w") as f:
                    f.write("# NANs have been skipped\n")
        # Write to file
        frame.to_csv(
            file_name,
            columns=i_q_mod_cols,
            index=False,
            sep=sep,
            float_format=float_format,
            na_rep="NAN",
            mode=mode_nan,
        )
def load_iqmod(file, sep=" ", header_type=HeaderType.PANDAS.value):
    r"""
    Load an intensity profile into a ~drtsans.dataobjects.IQmod object.

    Required file format (for the pandas header type):
    The first row must include the names for the file columns. The order of the columns is irrelevant and
    the names of the columns must be:

    - 'intensity' for profile intensities. This column is required.
    - 'error' for uncertainties in the profile intensities. This column is required.
    - 'mod_q' for values of Q. This column is required.
    - 'delta_mod_q' for uncertainties in the Q values. This column is optional.
    - 'wavelength' This column is optional.

    Example of file contents:
        intensity error mod_q
        1000.0 89.0 0.001
        90.0 8.0 0.01
        4.7 0.9 0.1

    Usage example:
        from drtsans.mono.gpsans import load_iqmod
        iq = load_iqmod(file_name)

    Parameters
    ----------
    file: str
        Path to input file
    sep: str
        String of length 1. Field delimiter in the input file.
    header_type: str
        Expected header convention, one of HeaderType.MANTID_ASCII.value or
        HeaderType.PANDAS.value (default).

    Returns
    -------
    ~drtsans.dataobjects.IQmod
    """
    if header_type == HeaderType.MANTID_ASCII.value:
        # Mantid ASCII layout: two header lines, then columns mod_q, intensity, error, delta_mod_q
        csv_data = np.genfromtxt(file, comments="#", dtype=np.float64, skip_header=2)
        num_cols = len(csv_data[0])
        if num_cols != 4:
            # explicit check (instead of assert) so validation survives ``python -O``
            raise ValueError("Incompatible number of columns: {} should be 4".format(num_cols))
        return IQmod(csv_data[:, 1], csv_data[:, 2], csv_data[:, 0], csv_data[:, 3])
    else:
        return IQmod.read_csv(file, sep=sep)
def save_iqmod(
    iq,
    file,
    sep=" ",
    float_format="%.6E",
    skip_nan=True,
    header_type=HeaderType.MANTID_ASCII.value,
):
    r"""
    Write the ~drtsans.dataobjects.IQmod object into an ASCII file.

    Parameters
    ----------
    iq: ~drtsans.dataobjects.IQmod
        Profile to be saved
    file: str
        Path to output file
    sep: str
        String of length 1. Field delimiter for the output file. Only used with the pandas header type.
    float_format: str
        Format string for floating point numbers. Only used with the pandas header type.
    skip_nan: bool
        If true, any data point where intensity is NAN will not be written to file.
        Only used with the pandas header type.
    header_type: str
        Determines the header type, making the 1D data compatible with either pandas or Mantid.
        Possible values:
        HeaderType.MANTID_ASCII.value
        HeaderType.PANDAS.value
    """
    if header_type == HeaderType.MANTID_ASCII.value:
        # delegate to the Mantid-compatible writer
        from drtsans.save_ascii import save_ascii_binned_1D

        save_ascii_binned_1D(file, "I(Q)", iq)
    else:
        iq.to_csv(file, sep=sep, float_format=float_format, skip_nan=skip_nan)
class IQazimuthal(namedtuple("IQazimuthal", "intensity error qx qy delta_qx delta_qy wavelength")):
    r"""
    This class holds the information for the azimuthal projection, I(Qx, Qy). The resolution terms,
    (``delta_qx``, ``delta_qy``) and ``wavelength`` fields are optional.

    All of the arrays must be 1-dimensional or 2-dimensional and matching length. For the 1-dimensional
    case, all of the arrays must be parallel (same length). For the 2-dimensional case, (``intensity``,
    ``error``, ``delta_qx``, ``delta_qy``, ``wavelength``) must all be parallel. However, for (``qx``,
    ``qy``), they must either (both) be 2-dimensional and parallel, or (both) 1-dimensional with
    ``len(qx) == intensity.shape[0]`` and ``len(qy) == intensity.shape[1]``.

    if intensity is 2D, and qx and qy are 1D: In this constructor, it is assumed that intensity 2D array
    will match
        qx = [[qx0, qx1, ...], [qx0, qx1, ...], ...]
        qy = [[qy0, qy0, ...], [qy1, qy1, ...], ...]
    because qx and qy will be created in such style.
    """

    def __new__(cls, intensity, error, qx, qy, delta_qx=None, delta_qy=None, wavelength=None):  # noqa: C901
        r"""Build the namedtuple after converting fields to numpy arrays, validating shapes,
        and meshing 1D (qx, qy) into 2D grids matching a 2D intensity."""
        # these conversions do nothing if the supplied information is already a numpy.ndarray
        intensity = np.array(intensity)
        error = np.array(error)
        qx = np.array(qx)
        qy = np.array(qy)
        # check that the mandatory fields are parallel
        if len(intensity.shape) == 1:
            _check_parallel(intensity, error, qx, qy)
        elif len(intensity.shape) == 2:
            if len(qx.shape) == 1:
                # Qx and Qy are given in 1D array (not meshed)
                _check_parallel(intensity, error)
                if intensity.shape[0] != qx.shape[0]:
                    raise TypeError(
                        "Incompatible dimensions intensity[{}] and qx[{}]".format(intensity.shape, qx.shape[0])
                    )
                if intensity.shape[1] != qy.shape[0]:
                    raise TypeError(
                        "Incompatible dimensions intensity[{}] and qy[{}]".format(intensity.shape, qy.shape[0])
                    )
            elif len(qx.shape) == 2:
                # Qx and Qy are given in meshed 2D
                _check_parallel(intensity, error, qx, qy)
            else:
                raise TypeError("Qx can only be of dimension 1 or 2, found {}".format(len(qx.shape)))
        else:
            raise TypeError("intensity can only be of dimension 1 or 2, found {}".format(len(intensity.shape)))
        # work with optional fields: resolutions must be supplied as a pair or not at all
        if np.logical_xor(delta_qx is None, delta_qy is None):
            raise TypeError("Must specify either both or neither of delta_qx and delta_qy")
        if delta_qx is not None:
            delta_qx = np.array(delta_qx)
            delta_qy = np.array(delta_qy)
            _check_parallel(intensity, delta_qx, delta_qy)
        if wavelength is not None:
            wavelength = np.array(wavelength)
            _check_parallel(intensity, wavelength)
        # make the qx and qy have the same shape as the data
        if len(intensity.shape) == 2 and len(qx.shape) == 1 and len(qy.shape) == 1:
            # Using meshgrid to construct the Qx and Qy 2D arrays. This is consistent with the algorithm
            # that is used in bin_iq_2d()
            qx, qy = np.meshgrid(qx, qy, indexing="ij")
            # Sanity check
            assert qx.shape == intensity.shape, (
                f"qx and intensity must have same shapes. " f"It is not now: {qx.shape} vs {intensity.shape}"
            )
            assert qy.shape == intensity.shape, (
                f"qy and intensity must have same shapes. " f"It is not now: {qy.shape} vs {intensity.shape}"
            )
        # pass everything to namedtuple
        return super(IQazimuthal, cls).__new__(cls, intensity, error, qx, qy, delta_qx, delta_qy, wavelength)

    def be_finite(self):
        """Remove NaN intensities by flattening first.

        Returns
        -------
        IQazimuthal
            I(qx, qy, wavelength) with NaN removed
        """
        # Check whether there is any need to reconstruct the IQazimuthal
        num_finite_points = len(np.where(np.isfinite(self.intensity))[0])
        if num_finite_points == self.intensity.size:
            return self
        # Flatten
        intensity = self.intensity.flatten()
        # finite values
        finite_locations = np.isfinite(intensity)
        # construct output; optional fields that are None stay None
        intensity = intensity[finite_locations]
        error = self.error.flatten()[finite_locations]
        qx = self.qx.flatten()[finite_locations]
        qy = self.qy.flatten()[finite_locations]
        dqx = None if self.delta_qx is None else self.delta_qx.flatten()[finite_locations]
        dqy = None if self.delta_qy is None else self.delta_qy.flatten()[finite_locations]
        wavelength = None if self.wavelength is None else self.wavelength.flatten()[finite_locations]
        return IQazimuthal(
            intensity=intensity,
            error=error,
            qx=qx,
            qy=qy,
            delta_qx=dqx,
            delta_qy=dqy,
            wavelength=wavelength,
        )

    def id(self):
        r"""Identify this object as DataType.IQ_AZIMUTHAL (see :func:`getDataType`)."""
        return DataType.IQ_AZIMUTHAL

    def concatenate(self, other):
        r"""
        Append additional data points from another ~drtsans.dataobjects.IQazimuthal object and
        return the composite as a new ~drtsans.dataobjects.IQazimuthal object.

        Parameters
        ----------
        other: ~drtsans.dataobjects.IQazimuthal
            Additional data points.

        Returns
        -------
        ~drtsans.dataobjects.IQazimuthal
        """
        return _nary_operation((self, other), np.concatenate, unpack=False)

    def ravel(self):
        """Create a new ~drtsans.dataobjects.IQazimuthal with all of the arrays flattened"""
        kwargs = dict()
        if self.delta_qx is not None:
            kwargs["delta_qx"] = self.delta_qx.ravel()
        # BUGFIX: this condition previously tested delta_qx instead of delta_qy
        if self.delta_qy is not None:
            kwargs["delta_qy"] = self.delta_qy.ravel()
        if self.wavelength is not None:
            kwargs["wavelength"] = self.wavelength.ravel()
        return IQazimuthal(
            intensity=self.intensity.ravel(),
            error=self.error.ravel(),
            qx=self.qx.ravel(),
            qy=self.qy.ravel(),
            **kwargs,
        )
class IQcrystal(namedtuple("IQcrystal", "intensity error qx qy qz delta_qx delta_qy delta_qz wavelength")):
    """This class holds the information for the crystallographic projection, I(Qx, Qy, Qz). All of the
    arrays must be 1-dimensional and parallel (same length). The resolution terms, (``delta_qx``,
    ``delta_qy``, ``delta_qz``) and ``wavelength`` fields are optional.

    Note: the underlying namedtuple is now named "IQcrystal"; it previously carried the
    copy-pasted name "IQazimuthal", which produced a misleading repr.
    """

    def __new__(
        cls,
        intensity,
        error,
        qx,
        qy,
        qz,
        delta_qx=None,
        delta_qy=None,
        delta_qz=None,
        wavelength=None,
    ):
        r"""Build the namedtuple after converting every field to a numpy array and validating shapes."""
        # these conversions do nothing if the supplied information is already a numpy.ndarray
        intensity = np.array(intensity)
        error = np.array(error)
        qx = np.array(qx)
        qy = np.array(qy)
        qz = np.array(qz)
        # check that the mandatory fields are parallel
        if len(intensity.shape) != 1:
            raise NotImplementedError("Do not currently support dimension != 1, found {}".format(len(intensity.shape)))
        _check_parallel(intensity, error)
        _check_parallel(intensity, qx, qy, qz)  # TODO make more generic
        # work with optional fields: resolutions must be supplied as a triple or not at all
        count = 0
        if delta_qx is not None:
            count += 1
        if delta_qy is not None:
            count += 1
        if delta_qz is not None:
            count += 1
        if not (count == 0 or count == 3):
            raise TypeError("Must specify either all or none of delta_qx, delta_qy, delta_qz")
        if delta_qx is not None:
            delta_qx = np.array(delta_qx)
            delta_qy = np.array(delta_qy)
            delta_qz = np.array(delta_qz)
            _check_parallel(intensity, delta_qx, delta_qy, delta_qz)
        if wavelength is not None:
            wavelength = np.array(wavelength)
            _check_parallel(intensity, wavelength)
        # pass everything to namedtuple
        return super(IQcrystal, cls).__new__(
            cls, intensity, error, qx, qy, qz, delta_qx, delta_qy, delta_qz, wavelength
        )

    def id(self):
        r"""Identify this object as DataType.IQ_CRYSTAL (see :func:`getDataType`)."""
        return DataType.IQ_CRYSTAL
def save_i_of_q_to_h5(iq: Union[IQmod, IQazimuthal], h5_name: str):
    """Export I of Q, in the form of a namedtuple, to an HDF5 file.

    One group is created, named after the concrete IQ class, holding one dataset
    per non-None field.

    Parameters
    ----------
    iq: ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal
        Profile to export.
    h5_name: str
        Path to the output HDF5 file.
    """
    # context manager guarantees the file is closed even if a write raises
    with h5py.File(h5_name, "w") as iq_h5:
        data_group = iq_h5.create_group(iq.__class__.__name__)
        # Write one dataset per field; None (absent optional) fields are skipped
        for field, data in zip(iq._fields, iq):
            if data is not None:
                data_group.create_dataset(field, data=data)
def load_iq1d_from_h5(h5_name: str) -> IQmod:
    """Load an HDF5 for I(Q1D)"""
    field_names = ("intensity", "error", "mod_q", "delta_mod_q", "wavelength")
    with h5py.File(h5_name, "r") as iq_h5:
        data_group = iq_h5["IQmod"]
        values = {}
        for name in field_names:
            # fields absent from the file are restored as None
            try:
                values[name] = data_group[name][()]
            except KeyError:
                values[name] = None
        return IQmod(**values)
def load_iq2d_from_h5(h5_name: str) -> IQazimuthal:
    """Load a 2D profile, I(Qx, Qy), from an HDF5 file written by ``save_i_of_q_to_h5``.

    Parameters
    ----------
    h5_name: str
        Path to the input HDF5 file; must contain an "IQazimuthal" group.

    Returns
    -------
    ~drtsans.dataobjects.IQazimuthal
    """
    # Open file
    with h5py.File(h5_name, "r") as iq_h5:
        data_group = iq_h5["IQazimuthal"]
        value_dict = dict()
        # fields absent from the file are restored as None
        for field in IQazimuthal._fields:
            try:
                value_dict[field] = data_group[field][()]
            except KeyError:
                value_dict[field] = None
        iq2d = IQazimuthal(**value_dict)
    return iq2d
class _Testing:
    r"""
    Mimic the numpy.testing module by applying functions of this module to the component arrays of the IQ objects
    """

    @staticmethod
    def _nary_assertion(iq_objects, assertion_function, unpack=True, **kwargs):
        r"""
        Carry out an assertion on the component arrays for each of the IQ objects.

        Examples:
        - _nary_assertion((iq_1, iq_2), numpy.append, unpack=True)
        - _nary_assertion((iq_1, iq_2), numpy.concatenate, unpack=False)

        Parameters
        ----------
        iq_objects: list
            A list of ~drtsans.dataobjects.IQmod, ~drtsans.dataobjects.IQazimuthal, or ~drtsans.dataobjects.IQcrystal
            objects.
        assertion_function: function
            A function operating on a list of :ref:`~numpy.ndarray` objects, e.g. numpy.concatenate((array1, array2))
        unpack: bool
            If set to :py:obj:`True`, then ``assertion_function`` receives an unpacked list of arrays.
            If set to :py:obj:`False`, then ``assertion_function`` receives the list of arrays as a single argument.
            Examples: numpy.append(*(array1, array2)) versus numpy.concatenate((array1, array2))
        kwargs: dict
            Additional options to be passed

        Raises
        ------
        AssertionError
        """
        reference_object = iq_objects[0]  # pick the first of the list as reference object
        assert len(set([type(iq_object) for iq_object in iq_objects])) == 1  # check all objects of same type
        for i in range(len(reference_object)):  # iterate over the IQ object components
            component_name = reference_object._fields[i]
            i_components = [iq_object[i] for iq_object in iq_objects]  # collect the ith components of each object
            if any(i_component is None for i_component in i_components):  # is any of these None?
                # BUGFIX: the old code built set(i_components), which raised TypeError
                # (ndarray is unhashable) whenever only SOME components were None;
                # compare against None explicitly instead.
                if all(i_component is None for i_component in i_components):
                    continue  # all components are None, so they are identical
                raise AssertionError(f"field {component_name} is None for some of the iQ objects")
            elif unpack is True:
                assertion_function(*i_components, **kwargs)
            else:
                assertion_function(i_components, **kwargs)

    @staticmethod
    def assert_allclose(actual, desired, **kwargs):
        r"""Apply :ref:`~numpy.testing.assert_allclose` on each component array"""
        _Testing._nary_assertion(
            (actual, desired),
            assertion_function=np.testing.assert_allclose,
            unpack=True,
            **kwargs,
        )


testing = _Testing()  # use it as if it were a module, e.g. testing.assert_allclose(iq_1, iq_2, atol=1.e-6)