import copy
import numbers
import functools
from inspect import signature
import numpy as np
from mantid.kernel import V3D
from mantid.api import mtd
[docs]
def unpack_v3d(function, *args):
r"""
if the return value of ```function(args)``` is a V3D object, then cast to numpy.ndarray and return,
otherwise return ```function(*args)```
Parameters
----------
function: function
Callable receiving argument ```index``` and returning a V3D object.
args: list
Arguments to be passed to function.
"""
result = function(*args)
return np.array([result.X(), result.Y(), result.Z()]) if isinstance(result, V3D) else result
def _decrement_arity(attribute, index):
r"""
Decrement the arity of callable ``attribute`` by either evaluating this function, or creating a partial
function.
If the arity of ``attribute`` is one, then we evaluate ``attribute`` by passing ``index`` as its argument,
finally returning the value resulting from the evaluation. If the arity is bigger than one, then we create a
partial function by passing ``index`` as the first argument of ``attribute``.
Parameters
----------
attribute: function
Callable to evaluate or reduce it's arity with a partial function.
index: int
Value to use as fist argument of callable ``attribute``.
Returns
-------
PyObject, function
"""
if callable(attribute) is True:
try:
parameters = list(signature(attribute).parameters)
except ValueError: # no signature found for builtin <Boost.Python.function object at ...>
# Crappy hack. Determine number of arguments from docstring ! :(
doc_string = attribute.__doc__
n_parenthesis, first_index, last_index = (
1,
1 + doc_string.find("("),
len(doc_string),
)
counter_incremental = {"(": 1, ")": -1}
for i in range(first_index, len(doc_string)):
n_parenthesis += counter_incremental.get(doc_string[i], 0)
if n_parenthesis == 0:
last_index = i
break
parameters = doc_string[first_index:last_index].split(",")
if parameters[0][-4:] in ("self", "arg1"): # `attribute` is a bound method
parameters.pop()
if len(parameters) == 0:
return unpack_v3d(attribute) # attribute is a function of no arguments
if len(parameters) == 1:
return unpack_v3d(attribute, index) # attribute is a function of one argument
return functools.partial(attribute, index) # attribute is a function of more than one argument
return attribute # nothing to do is `attribute` is not callable
def _inverse_map(list_of_functions, *args, **kwargs):
r"""Apply a list of functions to a set of given arguments
Parameters
----------
list_of_functions: list
List whose items are callables.
args: list
Required argumens to be passed on to the functions of '`list_of_functions``.
kwargs: dict
Optional argumens to be passed on to the functions of '`list_of_functions``.
"""
return [function(*args, **kwargs) for function in list_of_functions]
[docs]
class SpectrumInfo:
def __init__(self, input_workspace, workspace_index):
r"""Wrapper to ~mantid.api.SpectrumInfo. We reduce the arity for the methods of `SpectrumInfo`, thus
converting them into methods of `SpectrumInfo`.
Example: function ~mantid.api.SpectrumInfo.isMasked(index) becomes simple attribute SpectrumInfo.isMasked.
The class contains additional methods for SpectrumInfo that are wrappers to methods
of ~mantid.api.Workspace. For example, function ~mantid.api.Workspace.readY(index) become simple
attribute SpectrumInfo.readY.
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace
workspace_index: int, list
Single index or list of indexes to be used with the ``spectrum_info`` object.
"""
self._workspace = mtd[str(input_workspace)]
self._spectrum_info = self._workspace.spectrumInfo()
self.spectrum_info_index = workspace_index
def __getattr__(self, item):
r"""
Overload methods of ~mantid.api.SpectrumInfo so that they act as methods of SpectrumInfo.
Methods of ~mantid.api.SpectrumInfo with only one argument, the component info index, become
read-only properties of SpectrumInfo. Example: ``spectrum_info.hasUniqueDetectors(90177)``
becomes ``SpectrumInfo.hasUniqueDetectors``
Methods of ~mantid.geometry.SpectrumInfo with more than one argument where the first argument is the
component info index become methods of SpectrumInfo with this argument removed.
Parameters
----------
item: str
Name of anyone of the functions of ~mantid.geometry.SpectrumInfo.
"""
_spectrum_info = self.__dict__["_spectrum_info"]
try:
attribute = getattr(_spectrum_info, item)
if isinstance(self.spectrum_info_index, int):
return _decrement_arity(attribute, self.spectrum_info_index)
# `self.spectrum_info_index` is a list of spectrum indexes
arity_decremented_attributes = [_decrement_arity(attribute, i) for i in self.spectrum_info_index]
if callable(arity_decremented_attributes[0]) is True: # attribute's arity was bigger than one
# A partial function that will call each arity-decremented attribute with remaining method arguments
return functools.partial(_inverse_map, arity_decremented_attributes)
return np.array(arity_decremented_attributes) # attribute's arity was less than two
except AttributeError:
return super().__getattr__(item) # Next class in the Method Resolution Order
def __len__(self):
if isinstance(self.spectrum_info_index, int):
return 1
return len(self.spectrum_info_index)
def _iterate_with_indexes(self, function_name, array_type=np.array):
r"""
Utility function to discriminate between one or more indexes contained in ``spectrum_info_index`` when
evaluating a method invoked via a Mantid's workspace object.
Parameters
----------
function_name: str
Method name invoked with the workspace handle ``self._workspace``
array_type: type
Data structure constructor when ``self.spectrum_info_index`` is a list of indexes. Examples: list,
numpy.array
Returns
-------
Object
Object returned by invoking the function over the indexes in ``self.spectrum_info_index``.
"""
function = getattr(self._workspace, function_name)
if isinstance(self.spectrum_info_index, int):
return function(self.spectrum_info_index)
# copy.deepcopy is necessary if we're dealing with event workspaces, as only 50 event lists are
# stored at a time in memory. See function void EventWorkspaceMRU::ensureEnoughBuffersY() in Mantid's source.
return array_type([copy.deepcopy(function(index)) for index in self.spectrum_info_index])
@property
def readX(self):
return self._iterate_with_indexes("readX")
@property
def readY(self):
return self._iterate_with_indexes("readY")
@property
def readE(self):
return self._iterate_with_indexes("readE")
[docs]
class ElementComponentInfo:
def __init__(self, component_info, component_info_index):
r"""Wrapper to one of the components in ~mantid.ExperimentInfo.ComponentInfo
Parameters
----------
component_info: ~mantid.geometry.ComponentInfo
component_info_index: int
Index to be used with the component_info object
"""
self._component_info = component_info
self.component_info_index = component_info_index
def __getattr__(self, item):
r"""
Overload methods of ~mantid.geometry.ComponentInfo so that they act as methods of ElementComponentInfo.
Methods of ~mantid.geometry.ComponentInfo with only one argument, the component info index, become
read-only properties of ElementComponentInfo. Example: ``component_info.children(90177)``
becomes ``element_component_info.children``
Methods of ~mantid.geometry.ComponentInfo with more than one argument where the first argument is the
component info index become methods of ElementComponentInfo with this argument removed. Example:
``component_info.setPosition(90177, V3D(0.0, 0.0, 0.0)) becomes
``element_component_info.setPosition(V3D(0.0, 0.0, 0.0))``
"""
_component_info = self.__dict__["_component_info"]
try:
attribute = getattr(_component_info, item)
return _decrement_arity(attribute, self.component_info_index)
except AttributeError:
return super().__getattr__(item) # Next class in the Method Resolution Order
@property
def children(self):
return [int(index) for index in self._component_info.children(self.component_info_index)] # cast to int
def __len__(self):
return len(self.children)
def _resolve_indexes(input_workspace, component_info_index, workspace_index):
r"""Resolve the component info index or the workspace index when only one of the two indexes is provided"""
if workspace_index is not None and component_info_index is not None:
return (
component_info_index,
workspace_index,
) # nothing to do if both were provided
if workspace_index is None and component_info_index is None:
raise RuntimeError('Either "component/detector_info_index" or "spectrum_info/workspace_index" must be passed')
input_workspace = mtd[str(input_workspace)]
get_spectrum_definition = input_workspace.spectrumInfo().getSpectrumDefinition
if component_info_index is None:
component_info_index = get_spectrum_definition(workspace_index)[0][0]
if workspace_index is None:
def get_component_index(index):
return get_spectrum_definition(index)[0][0]
# Expensive search, since no a priori sorting between workspace indexes and component info indexes!!
for index in range(0, input_workspace.getNumberHistograms()):
if get_component_index(index) == component_info_index:
workspace_index = index
break
return component_info_index, workspace_index
[docs]
class PixelSpectrum(ElementComponentInfo, SpectrumInfo):
def __init__(self, input_workspace, component_info_index=None, workspace_index=None):
r"""
Wrapper of ~mantid.geometry.ComponentInfo, ~mantid.api.DetectorInfo, and ~mantid.api.SpectrumInfo for a pixel.
Additionally, the class contains additional methods invoked through objects of type ~mantid.api.Workspace
such as `readY`.
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace
component_info_index: int
Index to be used with the ``ElementComponentInfo`` object. If not supplied, then ``workspace_index``
is used for initialization.
workspace_index: int
If not supplied, then ``detector_info_index`` is used for initialization.
"""
component_info_index, workspace_index = _resolve_indexes(
input_workspace, component_info_index, workspace_index
)
input_workspace = mtd[str(input_workspace)]
self._detector_info = input_workspace.detectorInfo()
ElementComponentInfo.__init__(self, input_workspace.componentInfo(), component_info_index)
SpectrumInfo.__init__(self, input_workspace, workspace_index)
def __getattr__(self, item):
try:
_detector_info = self.__dict__["_detector_info"]
attribute = getattr(_detector_info, item)
return _decrement_arity(attribute, self.component_info_index)
except AttributeError:
return super().__getattr__(item) # next class in the Method Resolution Order, (SpectrumInfo.__getattr__)
@property
def detector_id(self):
# remember that the componentInfo index is the same as the detectorInfo index
return self.detectorIDs[self.component_info_index]
@property
def position(self):
r"""Cartesian coordinates of the pixel detector. Overrides componentInfo().position
Coordinates typically correspond to the center of a cuboid detector, or the center for the base of a
cylindrical pixel.
"""
return unpack_v3d(self._component_info.position, self.component_info_index)
@position.setter
def position(self, xyz):
r"""Update the coordinates of the pixel. Can be used in place of componentInfo().setPosition
Parameters
----------
xyz: list
Either a:
- three-item iterable with the new X, Y, and Z coordinates
- two-item tuple of the form ('x', float), or ('y', float), or ('z', float) if we only want to update one
of the three coordinates.
"""
new_position = self.position
if len(xyz) == 3:
new_position = xyz # substitute with new position
else: # xyz of the form ('x', 34.5), or ('y', 34.5) or ('z', 34.5)
new_position["xyz".find(xyz[0])] = xyz[1] # update selected coordinate
self.setPosition(V3D(*list(new_position)))
@property
def width(self):
r"""Extent of the pixel detector along the Y-axis"""
return self.scaleFactor[0] * self.shape.getBoundingBox().width()[0]
@width.setter
def width(self, w):
scale_factor = self.scaleFactor
scale_factor[0] = w / self.shape.getBoundingBox().width()[0]
self.setScaleFactor(V3D(*scale_factor))
@property
def height(self):
r"""Extent of the pixel detector along the Z-axis"""
return self.scaleFactor[1] * self.shape.getBoundingBox().width()[1]
@height.setter
def height(self, h):
scale_factor = self.scaleFactor
scale_factor[1] = h / self.shape.getBoundingBox().width()[1]
self.setScaleFactor(V3D(*scale_factor))
@property
def area(self):
r"""Product of pixel width and height"""
return self.width * self.height
@property
def intensity(self):
r"""Summed intensity for the spectrum associated to this pixel"""
return sum(self.readY)
[docs]
class TubeSpectrum(ElementComponentInfo, SpectrumInfo):
[docs]
@staticmethod
def is_valid_tube(component_info, component_index):
r"""
Determine if all the immediate children-components are detectors, as in the case of a tube.
Warnings
--------
This function does not invalidate other components than tubes whose immediate children-components are
all detectors.
Parameters
----------
component_info: ~mantid.geometry.ComponentInfo
component_index: int
Returns
-------
bool
"""
children_indexes = [int(i) for i in component_info.children(component_index)]
if len(children_indexes) == 0:
return False # we hit a detector
if component_info.isDetector(children_indexes[0]) is False:
return False # quick check. The first child is not a detector
children_are_detectors = [component_info.isDetector(index) for index in children_indexes]
if len(set(children_are_detectors)) != 1:
return False # at least one child is not a detector
return True
def __init__(self, input_workspace, component_info_index, workspace_indexes):
r"""Wrapper of ~mantid.geometry.ComponentInfo when the component is a tube of detector pixels.
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace
component_info_index: int
Index corresponding to the tube
"""
workspace = mtd[str(input_workspace)]
if self.is_valid_tube(workspace.componentInfo(), component_info_index) is False:
raise ValueError("The component index is not associated to a valid tube")
self._detector_info = workspace.detectorInfo()
self._pixels = list()
SpectrumInfo.__init__(self, workspace, workspace_indexes)
ElementComponentInfo.__init__(self, workspace.componentInfo(), component_info_index)
def __len__(self):
return len(self.spectrum_info_index)
@property
def detector_info_index(self):
r"""Array of detector info indexes for the pixels contained in the tube."""
return self.children
@property
def detector_ids(self):
all_detector_ids = self._detector_info.detectorIDs()
return all_detector_ids[self.detector_info_index]
@property
def pixels(self):
r"""List of ~drtsans.tubecollection.PixelSpectrum objects making up the tube.
Returns
-------
list
"""
if len(self._pixels) == 0:
for component_info_index, workspace_index in zip(self.children, self.spectrum_info_index):
self._pixels.append(
PixelSpectrum(
self._workspace,
component_info_index=component_info_index,
workspace_index=workspace_index,
)
)
return self._pixels
def __getitem__(self, item):
return self.pixels[item] # iterate over the pixels
# Below are a few properties to hide complexity when coding for the bar-scan calibration
@property
def pixel_scale_factors(self):
r"""Convenience property to get the size scale factors for each pixel"""
return np.array([unpack_v3d(self._component_info.scaleFactor, i) for i in self.detector_info_index])
@property
def pixel_heights(self):
r"""Convenience property to get/set the pixel heights"""
first_index = self.detector_info_index[0] # component info index of the first pixel in the tube
nominal_height = self._component_info.shape(first_index).getBoundingBox().width().Y()
return nominal_height * self.pixel_scale_factors[:, 1]
@pixel_heights.setter
def pixel_heights(self, heights):
r"""
Parameters
----------
heights: float or list
Either a list of heights or a single number if all pixel heights are to be the same.
"""
heights = (
[
heights,
]
* len(self)
if isinstance(heights, numbers.Real)
else heights
)
for pixel, height in zip(self.pixels, heights):
pixel.height = height
@property
def pixel_widths(self):
r"""Convenience property to get/set the pixel widths"""
first_index = self.detector_info_index[0] # component info index of the first pixel in the tube
nominal_width = self._component_info.shape(first_index).getBoundingBox().width().X()
return nominal_width * self.pixel_scale_factors[:, 0]
@pixel_widths.setter
def pixel_widths(self, widths):
r"""
Parameters
----------
widths: float or list
Either a list of widths or a single number if all pixel widths are to be the same.
"""
widths = (
[
widths,
]
* len(self)
if isinstance(widths, numbers.Real)
else widths
)
for pixel, width in zip(self.pixels, widths):
pixel.width = width
@property
def pixel_positions(self):
return np.array([unpack_v3d(self._component_info.position, i) for i in self.detector_info_index])
@property
def pixel_y(self):
r"""Convenience property to get/set the pixel Y-coordinate"""
return self.pixel_positions[:, 1]
@pixel_y.setter
def pixel_y(self, y_coordinates):
r"""
Parameters
----------
y_coordinates: list
List, or any other iterable, to update the Y-coordinates of the pixels in the tube.
"""
for pixel, y in zip(self.pixels, y_coordinates):
pixel.position = ("y", y)
@property
def x_boundaries(self):
r"""Coordinates along the X-axis of the tube boundaries."""
first_index = self.detector_info_index[0] # component info index of the lowest pixel in the tube
x = self._component_info.position(first_index).X() # X-coordinate for the center of the first pixel
dx = self.width
return [x - dx / 2, x + dx / 2]
@property
def pixel_y_boundaries(self):
r"""Coordinates along the Y-axis of the pixel boundaries"""
first_index = self.detector_info_index[0] # component info index of the lowest pixel in the tube
first_y = self._component_info.position(first_index).Y() # Y-coordinate for the center of the first pixel
heights = np.cumsum(self.pixel_heights) # cummulative sum of the pixel heights
return np.insert(heights, 0, 0.0) + (first_y - heights[0] / 2) # pixel boundaries
@property
def pixel_intensities(self):
r"""Array of pixel_intensities for every pixel"""
return np.array([pixel.intensity for pixel in self.pixels])
@property
def isMasked(self):
r"""Mask flag for each pixel in the tube"""
return np.array([self._spectrum_info.isMasked(i) for i in self.spectrum_info_index])
@property
def width(self):
r"""Tube width, taken as width of the first pixel"""
first_index = self.detector_info_index[0] # component info index of the first pixel in the tube
nominal_width = self._component_info.shape(first_index).getBoundingBox().width().X()
scaling = self._component_info.scaleFactor(first_index).X()
return nominal_width * scaling
[docs]
class TubeCollection(ElementComponentInfo):
[docs]
@staticmethod
def map_detector_to_spectrum(input_workspace):
r"""A mapping from detector info index (or component info index) to spectrum index (or workspace index)
Parameters
----------
input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace
Returns
-------
dict
"""
input_workspace = mtd[str(input_workspace)]
get_spectrum_definition = input_workspace.spectrumInfo().getSpectrumDefinition
def get_detector_info_index(workspace_index):
return get_spectrum_definition(workspace_index)[0][0]
detector_to_spectrum = dict()
for spectrum_index in range(input_workspace.getNumberHistograms()):
detector_to_spectrum[get_detector_info_index(spectrum_index)] = spectrum_index
return detector_to_spectrum
def __init__(self, input_workspace, component_name):
r"""
A list of ~drtsans.tubecollection.TubeInfo objects making up one of the instrument components, such as the
front panel, a double panel, or the whole instrument.
Parameters
----------
input_workspace: ~mantid.api.Workspace
component_name: str
One of the named components in the instrument geometry file.
"""
input_workspace = mtd[str(input_workspace)]
component_info = input_workspace.componentInfo()
if component_info is None:
raise RuntimeError(f'Could not find component info for Workspace "{str(input_workspace)}"')
for component_index in range(component_info.root(), -1, -1):
if component_info.name(component_index) == component_name:
super().__init__(component_info, component_index)
break
if component_info.isDetector(component_index) is True: # we reached the detector with the highest index
raise RuntimeError(f'Could not find a component with name "{component_name}"')
self._tubes = list()
self._sorting_permutations = {}
self._input_workspace = input_workspace
# A map from detectorInfo index (or componentInfo index) to workspace spectrum index
self.detector_to_spectrum = None
def __getitem__(self, item):
return self.tubes[item]
def __len__(self):
return len(self.tubes)
@property
def tubes(self):
r"""List of ~drtsans.tubecollection.TubeSpectrum objects ordered using their component info indexes,
from smallest to highest index."""
if len(self._tubes) == 0:
# Initialize the mapping between spectrum indexes and detectorInfo indexes
self.detector_to_spectrum = self.map_detector_to_spectrum(self._input_workspace)
# Iterate over the components of the instrument that are not detectors
non_detector_indexes = sorted(
[int(i) for i in set(self.componentsInSubtree) - set(self.detectorsInSubtree)]
)
for component_info_index in non_detector_indexes:
if TubeSpectrum.is_valid_tube(self._component_info, component_info_index) is True:
tube_info = ElementComponentInfo(self._component_info, component_info_index)
# Find workspace indexes associated to the component/detector info indexes
workspace_indexes = [self.detector_to_spectrum[index] for index in tube_info.children]
self._tubes.append(
TubeSpectrum(
self._input_workspace,
component_info_index,
workspace_indexes,
)
)
return self._tubes
@property
def sorted_views(self):
r"""Dictionary mapping a particular sorting of the tubes to the permutation of their component info indexes
required to attain the desired sorting."""
return self._sorting_permutations.keys()
[docs]
def sorted(self, key=None, reverse=False, view="decreasing X"):
r"""
Sort the list of ~drtsans.tubecollection.TubeInfo objects in the prescribed order.
Parameters
----------
key: :py:obj:`function`
Function of one argument to extract a comparison key from each element in ``self.tubes``. If None, then
the selected ``view`` determines the order
reverse: bool
Reverse the order resulting from application of ``key`` or ``view``
view: str
Built-in permutations of the tubes prescribing a particular order. Valid views are:
- 'fbfb': order the tubes alternating front tube and back tube. It is assumed that all front tubes
are first listed in the instrument definition file for the double panel, followed by the back tubes. It
is also assumed the number of front and back tubes is the same.
- 'decreasing X': order the tubes by decreasing X-coordinate. This view can "flatten" a double
detector panel when viewed from the sample "from left to right".
- 'workspace index': order the tubes by increasing workspace index for the first pixel of each tube.
"""
if key is not None:
return sorted(self._tubes, key=key, reverse=reverse)
permutation = self._sorting_permutations.get(view, None)
if permutation is None:
if view == "fbfb":
number_front_tubes = int(len(self.tubes) / 2) # also the number of back tubes
permutation = []
for i in range(number_front_tubes):
permutation.extend([i, i + number_front_tubes])
self._sorting_permutations["fbfb"] = permutation
elif view == "decreasing X": # initialize this view
x_coords = [tube.position[0] for tube in self.tubes] # X coords of each tube
permutation = np.flip(np.argsort(x_coords), axis=0).tolist()
self._sorting_permutations["decreasing X"] = permutation
elif view == "workspace index": # initialize this view
# spectrum index of first pixel for each tube
permutation = np.argsort([tube.spectrum_info_index[0] for tube in self.tubes])
self._sorting_permutations["workspace index"] = permutation
sorted_list = [self._tubes[i] for i in permutation]
return sorted_list if reverse is False else sorted_list[::-1]