Source code for drtsans.pixel_calibration

import copy
import enum
import itertools
import json
import numpy as np
import numexpr
import os
import stat
import sys
import warnings


r""" Hyperlinks to mantid algorithms
ApplyCalibration <https://docs.mantidproject.org/nightly/algorithms/ApplyCalibration-v1.html>
CloneWorkspace <https://docs.mantidproject.org/nightly/algorithms/CloneWorkspace-v1.html>
CreateEmptyTableWorkspace <https://docs.mantidproject.org/nightly/algorithms/CreateEmptyTableWorkspace-v1.html>
CreateWorkspace <https://docs.mantidproject.org/nightly/algorithms/CreateWorkspace-v1.html>
DeleteWorkspaces <https://docs.mantidproject.org/nightly/algorithms/DeleteWorkspaces-v1.html>
FilterEvents <https://docs.mantidproject.org/nightly/algorithms/FilterEvents-v1.html>
GenerateEventsFilter <https://docs.mantidproject.org/nightly/algorithms/GenerateEventsFilter-v1.html>
Integration <https://docs.mantidproject.org/nightly/algorithms/Integration-v1.html>
LoadEmptyInstrument <https://docs.mantidproject.org/nightly/algorithms/LoadEmptyInstrument-v1.html>
LoadInstrument <https://docs.mantidproject.org/nightly/algorithms/LoadInstrument-v1.html>
LoadNexus <https://docs.mantidproject.org/nightly/algorithms/LoadNexus-v1.html>
MaskDetectors <https://docs.mantidproject.org/nightly/algorithms/MaskDetectors-v1.html>
MaskDetectorsIf <https://docs.mantidproject.org/nightly/algorithms/MaskDetectorsIf-v1.html>
ReplaceSpecialValues <https://docs.mantidproject.org/nightly/algorithms/ReplaceSpecialValues-v1.html>
SaveNexus <https://docs.mantidproject.org/nightly/algorithms/SaveNexus-v1.html>
"""
from mantid.simpleapi import (
    AddSampleLog,
    ApplyCalibration,
    ClearMaskFlag,
    CloneWorkspace,
    CreateEmptyTableWorkspace,
    DeleteWorkspaces,
    FilterEvents,
    GenerateEventsFilter,
    Integration,
    Load,
    LoadEventNexus,
    LoadNexus,
    LoadNexusProcessed,
    MaskDetectors,
    MaskDetectorsIf,
    Rebin,
    ReplaceSpecialValues,
    SaveNexus,
)
from mantid.api import FileLoaderRegistry, mtd

r"""
Hyperlinks to drtsans functions
namedtuplefy <https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans/settings.py>
SampleLogs <https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans/samplelogs.py>
TubeCollection <https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans/tubecollection.py>
"""  # noqa: E501
from drtsans.instruments import (
    InstrumentEnumName,
    instrument_enum_name,
    instrument_standard_name,
)
from drtsans.mask_utils import apply_mask
from drtsans.path import exists as file_exists
from drtsans.settings import namedtuplefy
from drtsans.samplelogs import SampleLogs
from drtsans.tubecollection import TubeCollection


__all__ = [
    "apply_calibrations",
    "as_intensities",
    "calculate_apparent_tube_width",
    "day_stamp",
    "calculate_barscan_calibration",
    "load_calibration",
    "split_barscan_run",
]


class CalibrationNotFound(Exception):
    """Custom exception to be raised when no appropriate calibration is found in the database"""

    pass


r"""Flags a problem when running the barscan algorithm that identifies the pixel corresponding
to the bottom of the shadow cast by the bar on the detector array."""
INCORRECT_PIXEL_ASSIGNMENT = -1


r"""Default files storing the metadata of the pixel calibrations. There's one file for each instrument."""
database_file = {
    InstrumentEnumName.BIOSANS: "/HFIR/CG3/shared/calibration/pixel_calibration.json",
    InstrumentEnumName.EQSANS: "/SNS/EQSANS/shared/calibration/pixel_calibration.json",
    InstrumentEnumName.GPSANS: "/HFIR/CG2/shared/calibration/pixel_calibration.json",
}


class CalType(enum.Enum):
    r"""Enumerate the possible types of pixel calibrations"""
    BARSCAN = "BARSCAN"
    TUBEWIDTH = "TUBEWIDTH"


def loader_algorithm(input_file):
    r"""
    Determine which Mantid algorithm to use to load a file.

    If a specialized loading algorithm can't be found, then `Load` algorithm is returned.

    devs - Jose Borreguero <borreguerojm@ornl.gov>

    Parameters
    ----------
    input_file: str

    Returns
    -------
    ~mantid.api.Algorithm
    """
    loaders = {
        "LoadNexusProcessed": LoadNexusProcessed,
        "LoadEventNexus": LoadEventNexus,
    }
    loader_name = FileLoaderRegistry.Instance().chooseLoader(input_file).name()
    return loaders.get(loader_name, Load)


[docs] def day_stamp(input_workspace): r""" Find the day stamp (e.g 20200311 for March 11, 2020) using the "start_time" metadata from the Nexus events file as input. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace Workspace from which the day stamp is to be retrieved. Returns ------- int """ return int(SampleLogs(input_workspace).start_time.value[0:10].replace("-", ""))
class BarPositionFormula: # Default formulae for each instrument and detector component _default_formula = "565 - {y} + 0.0 * {tube}" # no dependency on the tube index _default_formulae = { ("BIOSANS", "detector1"): "565 - {y} + 0.0083115 * (191 - {tube})", ("BIOSANS", "wing_detector"): _default_formula, ("BIOSANS", "midrange_detector"): _default_formula, ("EQSANS", "detector1"): _default_formula, ("GPSANS", "detector1"): "565 - {y} - 0.0914267 * (191 - {tube})", } @staticmethod def _elucidate_formula(instrument_component): r""" Find the default formula for an instrument and detector array. Parameters ---------- instrument_component: tuple A two-item tuple. The first item is the standard name of the instrument, e.g. 'BIOSANS'; the second item is the name of the detector array (e.g. 'detector1', 'wing_detector' or 'midrange_detector'). Returns ------- str Warnings -------- UserWarning When ```instrument_component``` is not found in the default instrument formulae. """ default_formula = BarPositionFormula._default_formula default_formulae = BarPositionFormula._default_formulae if instrument_component not in default_formulae: warnings.warn( f"Unable to find a bar position formula for argument {instrument_component}.\n" f"Using default formula: {default_formula}.\n" f"Valid values are {default_formulae.keys()}\n" ) return default_formulae.get(instrument_component, default_formula) @staticmethod def _validate_symbols(formula): r""" Assess if a formula provider by the user contain the required symbols. Symbols '{y}' is required and symbol '{tube}' is optional. If '{tube}' is not present in the formulat, a null term '0.0 * {tube}' is appended to the formula. Parameters ---------- formula: str Formula to obtain the Y-coordinate of the bar in the frame of reference of the sample. Raises ------ ValueError When the formula fails to contain symbols '{y}' and '{tube}'. """ if "{y}" not in formula: raise ValueError(f'Formula does not contain "{{y}}", e.g. formula = "565-{{y}}+0.008*(191-{{tube}})"') if "{tube}" not in formula: warnings.warn(f'Formula does not contain "{{tube}}", e.g. formula = "565-{{y}}+0.008*(191-{{tube}})"') formula += " + 0.0 * {tube}" return formula def __init__(self, instrument_component=None, formula=None): r""" Formula to obtain the bar position in the frame of reference of the sample, in milimeters. There are default formulae for each instrument and detector array. Parameters ---------- instrument_component: tuple A two-item tuple. The first item is the standard name of the instrument, e.g. 'BIOSANS'; the second item is the name of the detector array (e.g. 'detector1', 'wing_detector' or 'midrange_detector'). formula: str Formula Raises ------ RuntimeError When neither ```instrument_component``` nor ```formula``` is supplied. """ if instrument_component is not None: self._formula = self._elucidate_formula(instrument_component) elif formula is not None: self._formula = self._validate_symbols(formula) else: raise RuntimeError("Insufficient input to create a bar position formula") def __str__(self): return self._formula def evaluate(self, bar_position, tube_index): r""" Y-coordinate of the bar in the frame of reference of the sample. Parameters ---------- bar_position: float Log value for the bar, in mili meters. tube_index: int Tube index. The first index (zero) corresponds to the leftmost tube when standing at the sample. position. Returns ------- float """ values_inserted = self._formula.format(y=bar_position, tube=tube_index) return float(numexpr.evaluate(values_inserted)) def validate_top_position(self, bar_top_position): r""" Assert that the formula evaluates to a positive Y-coordinate when the bar is at the top position. Parameters ---------- bar_top_position: float Log value when the bar is located at the top position. Raises ------ RuntimeError When the formula evaluates to a non-positive Y-coordinate """ if self.evaluate(bar_top_position, 0) <= 0: # evaluate for the leftmost tube raise RuntimeError( f"The Y-coordinate of the bar in the frame of reference of the sample when\n" f" the bar is placed at the top position has evaluated as non-negative.\n" f' The formula used is "{self._formula}"' ) class Table: r"""Container for a table of pixel calibration item data, plus metadata The table object holds two attributes: - metadata, dict, informs about the calibration run, instrument, detector array. - table, ~mantid.api.TableWorkspace, containing the actual calibration data. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- metadata: dict Dictionary with the following fields about the calibration: - caltype, str, the type of calibration (BARSCAN, TUBEWIDTH) - instrument, str, standard name of the instrument for which the calibration was carried out. - component, str, standard name of the double detector array for which the calibration was carried out. - daystamp, int, 8-digit integer whose digits are to be understood as YYYYMMDD. - run_numbers, list, list of run numbers that encompassed the calibration. detector_ids: list List of detector IDs for which a calibration has been carried out. positions: list List of Y-coordinates for each detector, in meters. heights: list List of detector heights (along the Y-), in meters. widths: list List of detector widths (along the X-axis), in meters. """ @classmethod def compose_table_name(cls, metadata): r"""Standard workspace name for a calibration table, built as a composite name using the calibration type, instrument, component, and daystamp. (e.g. "barscan_gpsans_detector1_20200311") Parameters ---------- metadata: dict Dictionary containing the metadata of one calibration Returns ------- str """ m = metadata # handy shortcut return f'{m["caltype"].lower()}_{m["instrument"]}_{m["component"]}_{str(m["daystamp"])}' @classmethod def load(cls, database, caltype, instrument, component, daystamp, output_workspace=None): r""" Loading a calibration requires loading the calibration metadata from a ```database``` file along with loading a Nexus file containing the actual data. Metadata and data are encapsulated in a ~drtsans.pixel_calibration.Table object. This object contains attribute ```table``` which holds the actual data in a ~mantid.api.TableWorkspace. **Mantid algorithms used:** :ref:`LoadNexus <algm-LoadNexus-v1>`, <https://docs.mantidproject.org/algorithms/LoadNexus-v1.html> Parameters ---------- database: str Path to JSON file containing metadata for different calibrations. caltype: str Type of calibration (BARSCAN, TUBEWIDHT). instrument: str Standard name of the instrument for which the calibration was carried out. component: str Standard name of the double detector array for which the calibration was carried out. daystamp: int 8-digit integer whose digits are to be understood as YYYYMMDD. The returned calibration will have a daystamp equal or more recent. output_workspace: str Name of the output ~mantid.api.TableWorkspace containing the calibration data. If :py:obj:`None`, a composite name is created using the calibration type, instrument, component, and daystamp. (e.g. "barscan_gpsans_detector1_20200311"). Returns -------' ~drtsans.pixel_calibration.Table """ # Search the database for a match to the required metadata not_found_message = f"No suitable {caltype}_{instrument}_{component} calibration found in {database}" with open(database, mode="r") as json_file: entries = json.load(json_file) # list of metadata entries stored in `database` required = { caltype, instrument, component, } # required metadata pieces of information # Filter the metadata entries, leaving out those not containing the required pieces of information candidates = [entry for entry in entries if required.issubset(set([str(v) for v in entry.values()]))] if len(candidates) == 0: raise CalibrationNotFound(not_found_message) candidates.sort(key=lambda c: c["daystamp"], reverse=True) # sort candidates by decreasing day stamp # Find the metadata entry with the closest (equal or smaller) day stamp to the input `daystamp` for candidate in candidates: if candidate["daystamp"] <= daystamp: if output_workspace is None: output_workspace = Table.compose_table_name(candidate) table = LoadNexus(candidate["tablefile"], OutputWorkspace=output_workspace) return Table(candidate, table=table) raise CalibrationNotFound(not_found_message) @classmethod def build_mantid_table(cls, output_workspace, detector_ids, positions=None, heights=None, widths=None): r""" Instantiate a Table workspace with input calibration data. The table contains a series of columns, the first one always being the list of detector IDs for which a calibration has been made. A 'BARSCAN' calibration will contains two additional columns, namely 'Detector Y Coordinate' and 'Detector Height' to store respective pixel positions and heights, in meters. A 'TUBEWIDTH' calibration table will contain one additional column, namely 'Detector Width' to store pixel widths, in meters. **Mantid algorithms used:** :ref:`CreateEmptyTableWorkspace <algm-CreateEmptyTableWorkspace-v1>`, <https://docs.mantidproject.org/algorithms/CreateEmptyTableWorkspace-v1.html> Parameters ---------- output_workspace: str Name of the output table workspace. detector_ids: list List of detector IDs for which a calibration has been carried out. positions: list List of Y-coordinates for each detector, in meters. heights: list List of detector heights (along the Y-), in meters. widths: list List of detector widths (along the X-axis), in meters. Returns ------- ~mantid.api.TableWorkspace """ # All possible columns names (and associated items) for a calibration table columns_data = { "Detector Y Coordinate": positions, "Detector Height": heights, "Detector Width": widths, } # Remove names not appropriate for the calibration type. For example, a barscan calibration will # not contain data for pixel widths (widths==None), so item 'Detector Width': widths is removed [columns_data.pop(column) for column in list(columns_data.keys()) if columns_data[column] is None] table = CreateEmptyTableWorkspace(OutputWorkspace=output_workspace) table.addColumn(type="int", name="Detector ID") [table.addColumn(type="double", name=column) for column in columns_data] # Loop over each calibrated detector and add a new row in the table for i in range(len(detector_ids)): row = {"Detector ID": detector_ids[i]} row.update({column: data[i] for column, data in columns_data.items()}) table.addRow(row) return table @classmethod def validate_metadata(cls, metadata): r""" Verify the metadata contains entries for: instrument, name of the double-detector-array, and day stamp. Parameters ---------- metadata: dict Returns ------- bool Raises ------ ValueError The metadata is missing one of the required entries. """ required_keys = {"instrument", "component", "daystamp"} if required_keys.issubset(metadata.keys()) is False: raise ValueError(f"Metadata is missing one or more of these entries: {required_keys}") def __init__( self, metadata, table=None, detector_ids=None, positions=None, heights=None, widths=None, ): Table.validate_metadata(metadata) self.metadata = copy.copy(metadata) if table is None: # Create table using column data output_workspace = Table.compose_table_name(metadata) self.table = Table.build_mantid_table( output_workspace, detector_ids, positions=positions, heights=heights, widths=widths, ) else: self.table = mtd[str(table)] def __getattr__(self, item): r""" Access metadata's keys as attributes of the ```Table``` object. For instance, self.metadata['instrument'] can be accessed as self.instrument """ if item not in self.__dict__: return self.__dict__["metadata"][item] return self.__dict__[item] def column_values(self, name): r""" Return a list of values for the selected table column. Possible names are 'Detector ID', 'Detector Y Coordinate', 'Detector Height', and 'Detector Width'. Parameters ---------- name: str Name of the column. Must match the name of one of the columns in the ~mantid.api.TableWorkspace ```table``` attribute. Returns ------- list """ column_names = self.table.getColumnNames() try: column_index = column_names.index(name) except ValueError: raise ValueError(f'"{name}" is not a column name of the calibration table') return self.table.column(column_index) @property def detector_ids(self): r"""List of pixel positions stored in the calibration table.""" return self.column_values("Detector ID") @property def positions(self): r"""List of pixel positions stored in the calibration table.""" return self.column_values("Detector Y Coordinate") @property def heights(self): r"""List of pixel heights stored in the calibration table.""" return self.column_values("Detector Height") @property def widths(self): r"""List of pixel widths stored in the calibration table.""" return self.column_values("Detector Width") def apply(self, input_workspace, output_workspace=None): r""" Apply a calibration to an input workspace, and return the calibrated workspace. **Mantid algorithms used:** :ref:`CloneWorkspace <algm-CloneWorkspace-v1>`, <https://docs.mantidproject.org/algorithms/CloneWorkspace-v1.html> :ref:`ApplyCalibration <algm-ApplyCalibration-v1>`, <https://docs.mantidproject.org/algorithms/ApplyCalibration-v1.html> Parameters ---------- input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace Workspace to which calibration needs to be applied. output_workspace: str Name of the output workspace with calibrated pixels. If :py:obj:`None`, the pixels of the input workspace will be calibrated and no new workspace is generated. Returns ------- ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace """ if output_workspace is None: output_workspace = str(input_workspace) # calibrate the pixels of the input workspace else: # Copy the input workspace and calibrate the pixels of the copy CloneWorkspace(InputWorkspace=input_workspace, OutputWorkspace=output_workspace) ApplyCalibration(Workspace=output_workspace, CalibrationTable=self.table) return mtd[output_workspace] def save(self, database=None, tablefile=None, overwrite=False): r""" Save the calibration metadata in a JSON file, and the calibration table workspace in a Nexus file. **Mantid algorithms used:** :ref:`SaveNexus <algm-SaveNexus-v1>`, <https://docs.mantidproject.org/algorithms/SaveNexus-v1.html> Parameters ---------- database: str Path to the JSON file where the ```metadata``` dictionary will be appended. If :py:obj:`None`, then the appropriate default file from ~drtsans.pixel_calibration.database_file is used. Currently, these are the default files: - BIOSANS, '/HFIR/CG3/shared/calibration/pixel_calibration.json', - EQSANS, '/SNS/EQSANS/shared/calibration/pixel_calibration.json', - GPSANS, '/HFIR/CG2/shared/calibration/pixel_calibration.json' tablefile: str Path to the Nexus file storing the pixel calibration data. If :py:obj:`None`, then a composite name is created using the calibration type, instrument, component, and daystamp. (e.g. "barscan_gpsans_detector1_20200311"). The file is saved under subdirectory 'tables', located within the directory of the ```database``` file. For instance, '/HFIR/CG3/shared/calibration/tables/barscan_gpsans_detector1_20200311.nxs' overwrite: bool Substitute existing entry with same metadata. Raises ------ ValueError If we save a calibration already in the database with option ```overwrite=False```. """ if database is None: database = database_file[instrument_enum_name(self.instrument)] # default database file # Load existing calibrations entries = list() if os.path.exists(database): # the database may not exist if we're not saving to the default database with open(database, mode="r") as json_file: entries = json.load(json_file) # list of metadata entries # Is there an entry with the same metadata? discard_index = None for i in range(len(entries)): if self.duplicate_metadata(entries[i]) is True: if overwrite is True: discard_index = i # index in the list of entries to replace break else: raise ValueError( "A calibration with the same metadata already exists in the database." 'Use "overwrite=True" if you want to overwrite the existing calibration' ) # Save the table containing the actual data. if tablefile is None: cal_dir = os.path.join(os.path.dirname(database), "tables") # directory where to save the table file os.makedirs(cal_dir, exist_ok=True) # Create directory, and don't complain if already exists tablefile = os.path.join(cal_dir, Table.compose_table_name(self.metadata)) + ".nxs" self.metadata["tablefile"] = tablefile # store the location in the metadata, used later when loading. # save new table and overwrite existing one if having the same name if os.path.exists(tablefile): os.remove(tablefile) SaveNexus(InputWorkspace=self.table, Filename=tablefile) os.chmod(tablefile, 0o666) # everybody can read and write # Save the metadata, and replace an existing duplicate entry if required to do so if discard_index is not None: entries[discard_index] = self.metadata # replace the metadata else: entries.append(self.metadata) # this is a new entry if os.path.exists(database): os.remove(database) # delete the old database with open(database, mode="w") as json_file: json.dump(entries, json_file, indent=2) # save the new database os.chmod(database, 0o666) # everybody can read and write def duplicate_metadata(self, metadata): r""" Find if the metadata coincides with a query metadata. Keys used for comparison are "caltype", "component", "daystamp", "instrument", and "runnumbers". Parameters ---------- metadata: dict Returns ------- bool """ for key in ("caltype", "instrument", "component", "daystamp"): if self.metadata[key] != metadata[key]: return False # We don't require the run numbers to be sorted if set(self.metadata["runnumbers"]) != set(self.metadata["runnumbers"]): return False return True @namedtuplefy def as_intensities(self, reference_workspace): r""" Returns one workspace for each pixel property that is calibrated (e.g., pixel height), and the calibration datum is stored as the intensity value for that pixel. Intended to visualize the calibration in MantidPlot's instrument viewer. Not required for calibration generation or for data reduction. For example, a BARSCAN calibration will generate workspaces ```tablename_positions``` and ```tablename_heights```, where ```tablename``` is the name of the ~mantid.api.TableWorkspace holding the calibration data. Note: Positions for visualization in Mantid's instrument view are shifted so that the lowest position (usually a negative number) becomes zero. The reason being that showing the instrument in Mantid will mask negative intensities, and we want to avoid this. **Mantid algorithms used:** :ref:`CreateWorkspace <algm-CreateWorkspace-v1>`, <https://docs.mantidproject.org/algorithms/CreateWorkspace-v1.html> :ref:`LoadEmptyInstrument <algm-LoadEmptyInstrument-v1>`, <https://docs.mantidproject.org/algorithms/LoadEmptyInstrument-v1.html> :ref:`LoadInstrument <algm-LoadInstrument-v1>`, <https://docs.mantidproject.org/algorithms/LoadInstrument-v1.html> Parameters ---------- reference_workspace: str, ~mantid.api.MatrixWorkspace, , ~mantid.api.IEventsWorkspace Workspace having the same embedded instrument as the instrument used in the barscan calculation Returns ------- namedtuple A namedtuple containing the ~mantid.api.MatrixWorkspace workspaces with the following fields - 'positions' and 'heights' for a BARSCAN calibration - 'widths' for a TUBEWIDTH calibration """ # Create the template workspace for the views. It will contain a single intensity value per histogram reference = mtd.unique_hidden_name() # We only need one intensity value per histogram Rebin( InputWorkspace=reference_workspace, OutputWorkspace=reference, Params=[0, 1000000, 1000000], PreserveEvents=False, ) # In the event that the reference workspace has any of the calibrated detector pixels masked, we have to # preemptively clear the mask of all pixels ClearMaskFlag(Workspace=reference) # Find workspace indexes having a calibrated detector, otherwise flag them to be masked detector_ids = self.detector_ids detector_ids_dict = {detector_ids[i]: i for i in range(0, len(detector_ids))} wi_to_ri = [] # from workspace index to table's row index workspace_indexes_to_mask = [] get_detector = mtd[reference].getDetector # find the min and max boundaries of detector_ids # to skip checks in the upcoming loop min_detector_id = min(detector_ids) max_detector_id = max(detector_ids) for workspace_index in range(mtd[reference].getNumberHistograms()): detector = get_detector(workspace_index) # associated detector pixel # Find the entry in the calibration table for the detector with a particular detector ID detector_id = detector.getID() if detector_id >= min_detector_id and detector_id <= max_detector_id and detector_id in detector_ids_dict: wi_to_ri.append((workspace_index, detector_ids_dict[detector_id])) else: # This detector was not calibrated, thus is not in detector_ids workspace_indexes_to_mask.append(workspace_index) # Mask those uncalibrated detectors MaskDetectors(Workspace=reference, WorkspaceIndexList=workspace_indexes_to_mask) def transfer_values_to_workspace(property_values, property_name): r"""Create a workspace with the modified intensities, and overlay them in the appropriate instrument""" output_workspace = f"{self.table.name()}_{property_name}" workspace = CloneWorkspace(InputWorkspace=reference, OutputWorkspace=output_workspace) # substitute the intensity in this histogram with the calibration datum for the detector pixel for wi, ri in wi_to_ri: workspace.dataY(wi)[:] = property_values[ri] return mtd[output_workspace] calibration_properties = ( ["positions", "heights"] if self.caltype == "BARSCAN" else [ "widths", ] ) returned_views = {} for cal_prop in calibration_properties: values = getattr(self, cal_prop) returned_views[cal_prop] = transfer_values_to_workspace(values, cal_prop) # Mantid cannot display negative intensities if "positions" in calibration_properties: values = np.array(self.positions) returned_views["positions_mantid"] = transfer_values_to_workspace( values - np.min(values), "positions_mantid" ) DeleteWorkspaces( [ reference, ] ) return returned_views
[docs] def load_calibration( input_workspace, caltype, component="detector1", database=None, output_workspace=None, ): r""" Load a calibration into a ~drtsans.pixel_calibration.Table object. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace Workspace from which calibration session is to be retrieved. caltype: str Either 'BARSCAN' or 'TUBEWIDTH'. A saved calibration can only contain one of these, but not both. component: str Name of one of the double detector array panels. For BIOSANS we have 'detector1', 'wing-detector' or 'midrange_detector' database: str Path to database file containing the metadata for the calibrations. If :py:obj:`None`, the default database is used. Currently, these are the default files: - BIOSANS, '/HFIR/CG3/shared/calibration/pixel_calibration.json', - EQSANS, '/SNS/EQSANS/shared/calibration/pixel_calibration.json', - GPSANS, '/HFIR/CG2/shared/calibration/pixel_calibration.json' output_workspace: str Name of the table workspace containing the calibration session values. If :py:obj:`None`, then a composite name is created using the calibration type, instrument, component, and daystamp. (e.g. "barscan_gpsans_detector1_20200311") Returns ------- ~drtsans.pixel_calibration.Table """ enum_instrument = instrument_enum_name(input_workspace) if database is None: database = database_file[enum_instrument] # default database name return Table.load( database, caltype, str(enum_instrument), component, day_stamp(input_workspace), output_workspace=output_workspace, )
[docs] def apply_calibrations( input_workspace, database=None, calibrations=[cal.name for cal in CalType], output_workspace=None, ): r""" Load and apply pixel calibrations to an input workspace. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventWorkspace Input workspace whose pixels are to be calibrated. database: str, None Path to JSON file containing metadata for different past calibrations. If :py:obj:`None`, the default database is used. Currently, these are the default files: - BIOSANS, '/HFIR/CG3/shared/calibration/pixel_calibration.json', - EQSANS, '/SNS/EQSANS/shared/calibration/pixel_calibration.json', - GPSANS, '/HFIR/CG2/shared/calibration/pixel_calibration.json' calibrations: str, list One or more of 'BARSCAN' and/or 'TUBEWIDTH'. output_workspace: str Name of the output workspace with calibrated pixels. If :py:obj:`None`, the pixels of the input workspace will be calibrated. Returns ------- ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace """ if output_workspace is None: output_workspace = str(input_workspace) else: CloneWorkspace(InputWorkspace=input_workspace, OutputWorkspace=output_workspace) if isinstance(calibrations, str): # we passed only one calibration calibrations = [ calibrations, ] # convert `calibrations` into a list components = { InstrumentEnumName.BIOSANS: ["detector1", "wing_detector", "midrange_detector"], InstrumentEnumName.EQSANS: ["detector1"], InstrumentEnumName.GPSANS: ["detector1"], } for component in components[instrument_enum_name(input_workspace)]: for caltype in calibrations: try: calibration = load_calibration(input_workspace, caltype, component, database=database) except CalibrationNotFound as e: calibration = None warnings.warn(str(e)) if calibration is not None: calibration.apply(output_workspace) return mtd[output_workspace]
def _consecutive_true_values(values, how_many, reverse=False, raise_message=None): r""" Find first array index of consecutive `how_many` True values. devs - Andrei Savici <saviciat@ornl.gov>, Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- values: list list of `True` and `False` items how_many: int Number of desired consecutive `True` values raise_message: str Exception message. No exception if :py:obj:`None`, but INCORRECT_PIXEL_ASSIGNMENT is returned Returns ------- int Raises ------ IndexError If no index is found for any of the edges RuntimeError If a faulty tube is found """ # use the array or the reverse one truth_array = values[::-1] if reverse else values # create a sub-array of length how_many of True values that we want to find pattern = [True] * how_many # loop over the input data and return the first index where the next # how_many elements match the pattern for i in range(len(truth_array) - how_many): if truth_array[i : i + how_many] == pattern: return len(values) - i - 1 if reverse else i # raise an error if the pattern is not found else: if raise_message is not None: raise IndexError(raise_message) return INCORRECT_PIXEL_ASSIGNMENT # signal for non-identified value @namedtuplefy def find_edges( intensities, tube_threshold=0.2, shadow_threshold=0.3, tube_edge_min_width=3, shadow_edge_min_width=4, min_illuminated_length=7, ): r""" Find the active length of the tube and the shadow region All pixel indexes start from the bottom of the tube, with the first index being zero. devs - Andrei Savici <saviciat@ornl.gov>, Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- intensities: list pixel pixel_intensities along the tube. tube_threshold: float fraction of the average intensity to determine the tube edges. shadow_threshold: float fraction of the average intensity to determine the shadow region. tube_edge_min_width: int required minimum number of consecutive good pixels above the tube threshold shadow_edge_min_width: int required minimum number of consecutive shadowed pixels min_illuminated_length: int minimum number of illuminated pixels on the active length Returns ------- namedtuple the fields of the name tuple are: - bottom_pixel: first illuminated pixel - top_pixel: last illuminated pixel - bottom_shadow_pixel: first shadowed pixel - above_shadow_pixel= first illuminated pixel above the shadow region """ # calculate minimum intensity thresholds for tube ends and shadows average_intensity = np.average(intensities) end_threshold = tube_threshold * average_intensity shadow_threshold = shadow_threshold * average_intensity # Find edges of the tube: want at least tube_edge_min_width pixels # (starting from the top or bottom of a tube) that have pixel_intensities greater # than the threshold illuminated = [bool(i > end_threshold) for i in intensities] # The bottom pixel is the first illuminated pixel. It is required that the next tube_edge_min_width pixels are # also illuminated bottom_pixel = _consecutive_true_values( illuminated, tube_edge_min_width, raise_message="Could not find bottom tube edge", ) # The top pixel is the last illuminated pixel. It is required that the previous tube_edge_min_width pixels are # also illuminated. top_pixel = _consecutive_true_values( illuminated, tube_edge_min_width, raise_message="Could not find top tube edge", reverse=True, ) # Find the shadow region: similar to tube edges, but in this case # we want shadow_edge_min_width pixel_intensities less than the shadow threshold, # followed by at least one intensity greater than the threshold # The bottom pixel shadowed by the bar is the first pixel below the intensity threshold. We require that the # next shadow_edge_min_width are also shadowed. shadowed = [bool(i < shadow_threshold) for i in intensities[bottom_pixel:]] bottom_shadow_pixel = bottom_pixel + _consecutive_true_values( shadowed, shadow_edge_min_width, raise_message="Could not find bottom shadow edge", ) # Find the first illuminated pixel above the bar. illuminated = [ bool(i > shadow_threshold) for i in intensities[bottom_shadow_pixel + shadow_edge_min_width : top_pixel + 1] ] # Don't raise if the pixel is not found above_shadow_pixel = ( bottom_shadow_pixel + shadow_edge_min_width + _consecutive_true_values(illuminated, 1, raise_message=None) ) # Check for a faulty tube: we want a certain number of pixels not in the bar shaddow active_tube_length = top_pixel - bottom_pixel + 1 shadow_length = above_shadow_pixel - bottom_shadow_pixel if active_tube_length < min_illuminated_length + shadow_length: raise RuntimeError("Faulty tube found") return dict( bottom_pixel=bottom_pixel, top_pixel=top_pixel, bottom_shadow_pixel=bottom_shadow_pixel, above_shadow_pixel=above_shadow_pixel, ) @namedtuplefy def fit_positions( edge_pixels, bar_positions, tube_pixels=256, order=5, ignore_value=INCORRECT_PIXEL_ASSIGNMENT, permissive=False, ): r""" Fit the position and heights of the pixels in a tube. The bar_positions as a function of edge pixels are fitted to a nth order polynomial (by default n=5). The positions of the pixels along the tube are the values of the polynomial at integer points, while the heights are the derivatives. Description from the master requirements document, section A2.1 All pixel indexes start from the bottom of the tube, with the first index being zero. Uses :ref:`~numpy.polynomial.polynomial.polyfit`. devs - Andrei Savici <saviciat@ornl.gov>, Parameters ---------- edge_pixels: list (or numpy array) the bottom pixel for each bar position, as found in `find_edges` function bar_positions: list (or numpy array) the bar position from the logs for each file in the bar scan tube_pixels: integer number of pixels for which to calculate positions and heights order: integer the order of polynomial to be used in the fit (default 5) ignore_value: int certain positions of the bar (close to the top and bottom of the tube) results in incorrect assignment of the edge pixel. In those cases it is expected that the edge pixel has a particular value that flags incorrect assignment. The default value is INCORRECT_PIXEL_ASSIGNMENT. These edge pixels will be ignored when carrying out the fit. permissive: bool If :py:obj:`True`, then fitted positions and heights are allowed to be non-physical. Only for debugging. Returns ------- namedtuple the fields of the name tuple are: - calculated_positions: calculated positions of the pixels - calculated_heights: calculated pixel heights """ message_len = "The positions of the bar and edge pixels have to be the same length" assert len(edge_pixels) == len(bar_positions), message_len # Ignore the incorrectly assigned edge pixels edge_pixels = np.array(edge_pixels) bar_positions = np.array(bar_positions) valid_edge_pixels = edge_pixels[np.where(edge_pixels != ignore_value)] valid_bar_positions = bar_positions[np.where(edge_pixels != ignore_value)] try: # if the algorithm was unable to find the pixel indexes associated with the bottom edge of the bar, it usually # assigns the top pixel (index 249). For instance, the first tube of the midrange front-panel has # [249, 249, ...,249] as edge_pixels. We check the number of unique indexes bigger than order if len(np.unique(valid_edge_pixels)) <= int(order): raise ValueError("Insufficient distinct number of pixel indexes") # fit the bar positions to a 5th degree polynomial in edge_pixels coefficients = np.polynomial.polynomial.polyfit(valid_edge_pixels, valid_bar_positions, int(order)) # calculate the coefficients of the derivative deriv_coefficients = np.polynomial.polynomial.polyder(coefficients) # evaluate the positions. Should be monotonically increasing calculated_positions = np.polynomial.polynomial.polyval(np.arange(tube_pixels), coefficients) position_jumps = np.diff(calculated_positions) if permissive is False and position_jumps[position_jumps <= 0.0].size > 0: raise ValueError( f"Pixel positions do not increase monotonically starting from the bottom of the tube\n" f"Positions = : {calculated_positions}" ) # evaluate the heights. All should be positive calculated_heights = np.polynomial.polynomial.polyval(np.arange(tube_pixels), deriv_coefficients) if permissive is False and calculated_heights[calculated_heights <= 0.0].size > 0: raise ValueError(f"Some of the calculated heights are negative.\n" f"Heights = {calculated_heights}") except Exception: coefficients = np.ones(int(order)) * np.nan calculated_positions = np.ones(tube_pixels) * np.nan calculated_heights = np.ones(tube_pixels) * np.nan return dict( calculated_positions=calculated_positions, calculated_heights=calculated_heights, coefficients=coefficients, ) def event_splitter( barscan_workspace, split_workspace=None, info_workspace=None, bar_position_log="dcal_Readback", ): r""" Split a Nexus events file containing a full bar scan, saving the splitting information into a 'split' and 'info' workspaces. This information is used later by ```barscan_workspace_generator```. It is assumed that the bar is shifted by a fixed amount every time we go on to the next scan. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- barscan_workspace: str Path to barscan run file containing multiple positions of the bar. split_workspace: str Name of the table workspace to be used as event splitter workpsce in algorithm FilterEvents. If :py:obj:`None`, a random name will be provided. info_workspace: str Name of the table workspace to be used along ``split_workspace`` in algorithm FilterEvents. If :py:obj:`None`, a random name will be provided. bar_position_log: str Name of the log entry in the barscan run file containing the position of the bar (Y-coordinate, in 'mm') with respect to some particular frame of reference, not necessarily the one located at the sample. Returns ------- list List of bar positions """ if split_workspace is None: split_workspace = mtd.unique_hidden_name() if info_workspace is None: info_workspace = mtd.unique_hidden_name() # Find the amount by which the position of the bar is shifted every time we go on to the next scan. # It is assumed that this shift is a fixed amount bar_positions = SampleLogs(barscan_workspace)[bar_position_log].value bar_delta_positions = bar_positions[1:] - bar_positions[:-1] # list of shifts in the position of the bar # Only retain shifts where the bar position increases bar_delta_positions = bar_delta_positions[bar_delta_positions > 0] # Find the most likely shift of the bar position, within two significant figures. Even thought the shift # is supposed to be constant, minute fluctuations over this value (<1%) are often encountered # We round-off bar shifts up to two decimal places, then create a histogram of these values, and pick # the shift having the largest count. bar_step = float(np.bincount(np.round(100 * bar_delta_positions).astype("int")).argmax()) / 100.0 # Mantid algorithm that creates the 'split' and 'info' workspaces using the bar positions stored in the # metadata GenerateEventsFilter( InputWorkspace=barscan_workspace, OutputWorkspace=split_workspace, InformationWorkspace=info_workspace, UnitOfTime="Nanoseconds", LogName=bar_position_log, LogValueInterval=bar_step, LogValueTolerance=bar_step / 2, MinimumLogValue=min(bar_positions), MaximumLogValue=max(bar_positions), ) # Read bar positions from the generated info_workspace. This is a table workspace whose the second column # has entries as strings of the form: "Log.dcal_Readback.From.{min}.To.{max}.Value-change-direction:both" # We parse this string to fetch the 'min' and 'max' values of the bar bar_positions = list() for min_max_bar_position in mtd[info_workspace].column(1): min_bar_position = float(min_max_bar_position.split(".From.")[-1].split(".To.")[0]) max_bar_position = float(min_max_bar_position.split(".To.")[-1].split(".Value")[0]) bar_positions.append((min_bar_position + max_bar_position) / 2) # average between 'min' and 'max' return bar_positions def barscan_workspace_generator(barscan_dataset, bar_position_log="dcal_Readback", mask=None, delete_workspaces=True): r""" A python generator to be used when the user wants to iterate over the runs that hold the bar at a fixed position. Each iteration prompts this generator to return the position of the bar and a workspace containing the intensities for the run that held the bar at the returned position. devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- barscan_dataset: str, list Path(s) to barscan run file(s), or list of workspaces. If only one file, it should contain multiple positions of the bar. The generator will split the file into multiple workpaces, each one containing the scan of the bar at a particular position. If a list of files, then each file contains the pixel_intensities recorded with a constant position for the bar. If a list of workspaces, each workspace must contain the same information as when passing a list of files. bar_position_log: str Name of the log entry in the barscan run file containing the position of the bar (Y-coordinate, in 'mm') with respect to some particular frame of reference, not necessarily the one located at the sample. mask: mask file path, ~mantid.api.MaskWorkspace, :py:obj:`list` A mask to be applied. If :py:obj:`list`, it is a list of detector ID's. delete_workspaces: Bool Delete temporary workspaces one we have iterated over all the scans. Returns ------- tuple A two-item tuple containing, in this order: - the position of the bar as stated in the logs the run currently being returned. - the name of the workspace containing the run currently being returned. """ temporary_workspaces = list() # store the names of the workspaces to be removed if isinstance(barscan_dataset, str): # the whole barscan is contained in a single file. Must be splitted into subruns. Each subrun will contain # intensities for a run with the bar held at a fixed position. spliter_workspace = mtd.unique_hidden_name() info_workspace = mtd.unique_hidden_name() barscan_workspace = mtd.unique_hidden_name() temporary_workspaces.extend([spliter_workspace, info_workspace, barscan_workspace]) # BOTTLENECK LoadEventNexus(barscan_dataset, OutputWorkspace=barscan_workspace) # Create the splitting scheme and save it in table workspaces `spliter_workspace` and `info_workspace`. bar_positions = event_splitter( barscan_workspace, split_workspace=spliter_workspace, info_workspace=info_workspace, bar_position_log=bar_position_log, ) splitted_workspace_group = mtd.unique_hidden_name() # group of subruns # Mantid algorithm using the 'spliter' and 'info' tables to carry out the splitting of # `barscans_workspace into a set of subruns. # BOTTLENECK FilterEvents( InputWorkspace=barscan_workspace, SplitterWorkspace=spliter_workspace, InformationWorkspace=info_workspace, OutputWorkspaceBaseName=splitted_workspace_group, GroupWorkspaces=True, TimeSeriesPropertyLogs=[bar_position_log], ExcludeSpecifiedLogs=False, ) temporary_workspaces.append(splitted_workspace_group) temporary_workspaces.append("TOFCorrectWS") # spurious workspace spawned by FilterEvents barscan_workspaces = [splitted_workspace_group + "_" + str(i) for i in range(len(bar_positions))] else: # of a set of files or workspaces, each contains intensities for a scan with the bar fixed # determine if the list contains files or workspaces first_scan = barscan_dataset[0] if isinstance(first_scan, str) and os.path.exists(first_scan): # list of files, thus load into workspaces loader = loader_algorithm(barscan_dataset[0]) barscan_workspaces = list() barscan_workspace_basename = mtd.unique_hidden_name() for scan_index, scan_data in enumerate(barscan_dataset): barscan_workspace = f"{barscan_workspace_basename}_{scan_index:03d}" temporary_workspaces.append(barscan_workspace) loader(scan_data, OutputWorkspace=barscan_workspace) barscan_workspaces.append(barscan_workspace) else: # barscan_dataset is a set of workspaces barscan_workspaces = barscan_dataset # bar_positions = [SampleLogs(barscan_workspace).find_log_with_units(bar_position_log, 'mm') # for barscan_workspace in barscan_workspaces] bar_positions = list() for barscan_workspace in barscan_workspaces: try: bar_position = SampleLogs(barscan_workspace).find_log_with_units(bar_position_log, "mm") except RuntimeError as run_error: raise RuntimeError(f"Workspace {str(barscan_workspace)}: {run_error}") bar_positions.append(bar_position) # Serve bar positions and workspaces, one at a time for bar_position, barscan_workspace in zip(bar_positions, barscan_workspaces): if mask is not None: apply_mask(barscan_workspace, mask=mask) yield bar_position, barscan_workspace # Clean up workspaces we instantiated for each scan if delete_workspaces is True: DeleteWorkspaces(temporary_workspaces) # flake8: noqa: C901
[docs] def calculate_barscan_calibration( barscan_dataset, component="detector1", bar_position_log="dcal_Readback", formula=None, order=5, mask=None, inspect_data=False, permissive_fit=False, ): r""" Calculate pixel positions (only Y-coordinae) as well as pixel heights from a barscan calibration session. **Mantid Algorithms used:** :ref:`Load <algm-Load-v1>`, devs - Andrei Savici <saviciat@ornl.gov>, Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- barscan_dataset: str, list Path(s) to barscan run file(s), or list of workspaces. If only one file, it should contain multiple positions of the bar. If a list of files, then each file contains the pixel_intensities recorded with a constant position for the bar. If a list of workspaces, each workspace must contain the same information as when passing a list of files. component: str Name of the detector panel scanned with the bar. Usually, 'detector1`. bar_position_log: str Name of the log entry in the barscan run file containing the position of the bar (Y-coordinate, in 'mm') with respect to some particular frame of reference, not necessarily the one located at the sample. formula: str Formula to obtain the position of the bar (Y-coordinate) in the frame of reference located at the sample. order: int Highest degree for the polynomial that will fit the observed positions of the bar. mask: mask file path, ~mantid.api.MaskWorkspace, :py:obj:`list` A mask to be applied. If :py:obj:`list`, it is a list of detector ID's. inspect_data: Bool Additional pieces of data returned by this function in order to assess the correctness of the barscan calculation. These data are returned as a dictionary with the current entries: - bar_positions: list of Y-coordinates of the bar for each scan holding the bar at a particular position. - bar workspaces: list of ~mantid.api.MatrixWorkspace objects, each containing the bar at a particular position. - bottom_shadow_pixels: ~numpy.ndarray of shape (number of scans, number of tubes) listing the indexes for the pixels shadowed by the lower portion of the bar. permissive_fit: bool If :py:obj:`True`, then fitted positions and heights are allowed to be non-physical. Only for debugging. Returns ------- ~drtsans.pixel_calbration.Table, dict If ```inspect_data``` is :py:obj:`False`, only a table object is returned. Otherwise a tube is returned where the first component is the table, and the second item is a dictionary with the additional pieces of data. """ addons = dict(bar_positions=[], bar_workspaces=[]) # for inspecting the result of the calibration instrument_name, number_pixels_in_tube, number_tubes = ( None, None, None, ) # initialize some variables run_numbers, daystamp = set(), None # initialize some variables bar_positions = [] # Y-coordinates of the bar for each tube and scan # `bottom_shadow_pixels` is a 2D array defining the position of the bar on the detector, in pixel coordinates # The first index corresponds to the Y-axis (along each tube), the second to the X-axis (across tubes) # Thus, bottom_shadow_pixels[:, 0] indicates bottom shadow pixel coordinates along the very first tube # bottom_shadow_pixels.shape = (number of scans, number of tubes) bottom_shadow_pixels = [] delete_workspaces = True if inspect_data is False else False # retain workspace per scan if we want to inspect barscan_list = barscan_workspace_generator( barscan_dataset, bar_position_log=bar_position_log, mask=mask, delete_workspaces=delete_workspaces, ) for bar_position, barscan_workspace in barscan_list: if instrument_name is None: # retrieve some info from the first bar position instrument_name = instrument_standard_name(barscan_workspace) daystamp = day_stamp(barscan_workspace) # instantiate a bar formula using either the instrument default or a user's formula if formula is None: bar_formula = BarPositionFormula(instrument_component=(instrument_name, component)) else: bar_formula = BarPositionFormula(formula=formula) bar_formula.validate_top_position(bar_position) run_numbers.add(int(SampleLogs(barscan_workspace).run_number.value)) # Find out the Y-coordinates of the bar in the reference-of-frame located at the sample bottom_shadow_pixels_per_scan = [] # For the current scan, we have one bottom shadow pixel for each tube if number_pixels_in_tube is None: # We create a tube collection to figure out the pixel indexes for each tube. # A TubeCollection is a list of TubeSpectrum objects, each representing a physical tube. Here # we obtain the list of tubes for the selected double-detector-panel array. # The view 'decreasing X' sort the tubes by decreasing value of their corresponding X-coordinate. # In this view, a double detector panel looks like a single detector panel. When looking at # the panel standing at the sample, the leftmost tube has the highest X-coordinate, so the # 'decreasing X' view orders the tubes from left to right. collection = TubeCollection(barscan_workspace, component).sorted(view="decreasing X") # pixel_indexes is a list of length equal the number of tubes. Each item in the list is itself a list, # containing the pixel spectrumInfo indexes for a particular tube. pixel_indexes = [tube.spectrum_info_index for tube in collection] number_tubes, number_pixels_in_tube = len(collection), len(collection[0]) # Find the detector ID's for the tube collection, that is, the selected component # BOTTLENECK, but it's run one time detector_ids = list(itertools.chain.from_iterable(tube.detector_ids for tube in collection)) bar_positions.append([bar_formula.evaluate(bar_position, i) for i in range(number_tubes)]) # pixel_intensities is a list, whose items are the integrated intensities for each pixel. The index of this # list coincides with the spectrumInfo index. pixel_intensities = np.sum(mtd[barscan_workspace].extractY(), axis=1) for pixel_indexes_in_tube in pixel_indexes: # iterate over each tube, retrieving its pixel indexes try: # Find the bottom shadow pixel for the current tube and current barscan run pixel_intensities_in_tube = pixel_intensities[pixel_indexes_in_tube] bottom_shadow_pixels_per_scan.append(find_edges(pixel_intensities_in_tube).bottom_shadow_pixel) except IndexError: # tube masked or malfunctioning bottom_shadow_pixels_per_scan.append(INCORRECT_PIXEL_ASSIGNMENT) except RuntimeError: # tube masked or malfunctioning bottom_shadow_pixels_per_scan.append(INCORRECT_PIXEL_ASSIGNMENT) bottom_shadow_pixels.append(bottom_shadow_pixels_per_scan) # Add iteration info to the addons addons["bar_positions"].append(bar_position) addons["bar_workspaces"].append(barscan_workspace) bottom_shadow_pixels = np.array(bottom_shadow_pixels) bar_positions = np.array(bar_positions) # Deal with corner cases not resolved with the find_edges algorithm resolve_incorrect_pixel_assignments(bottom_shadow_pixels, bar_positions) addons["bottom_shadow_pixels"] = bottom_shadow_pixels if len(bottom_shadow_pixels) <= order: raise ValueError(f"There are not enough bar positions to fo a fit with a polynomyal of order {order}.") # fit pixel positions for each tube positions, heights = [], [] for tube_index in range(number_tubes): # iterate over the tubes in the collection # Fit the pixel numbers and Y-coordinates of the bar for the current tube with a polynomial try: fit_results = fit_positions( bottom_shadow_pixels[:, tube_index], bar_positions[:, tube_index], order=order, tube_pixels=number_pixels_in_tube, permissive=permissive_fit, ) except ValueError as e: raise ValueError(f"In tube index {tube_index}: {e}") # Store the fitted Y-coordinates and heights of each pixel in the current tube # Store as lists so that they can be easily serializable positions.append(list(1.0e-03 * fit_results.calculated_positions)) # store with units of meters heights.append(list(1.0e-03 * fit_results.calculated_heights)) # store with units of meters # Find the average pixel positions and heights in a tube using all good tubes. Then apply these to # any bad tube positions = np.array(positions) # shape = (number_tubes, number_pixels_in_tube) heights = np.array(heights) bad_tube_indexes = np.isnan(np.sum(positions, axis=1)) # array containing True at the index of a bad tube if np.any(bad_tube_indexes): # there's at least one bad tube average_positions = np.average(positions[~bad_tube_indexes], axis=0) average_heights = np.average(heights[~bad_tube_indexes], axis=0) for tube_index in np.where(bad_tube_indexes)[0]: # insert averages in the bad tubes positions[tube_index] = average_positions heights[tube_index] = average_heights positions = list(positions.ravel()) heights = list(heights.ravel()) metadata = dict( caltype=CalType.BARSCAN.name, instrument=instrument_name, component=component, daystamp=daystamp, runnumbers=sorted(list(run_numbers)), ) calibration = Table(metadata, detector_ids=detector_ids, positions=positions, heights=heights) # decide on what to return if inspect_data is True: return calibration, addons else: return calibration
def bad_tube(bottom_shadow_pixels): r""" Check if all pixels for this tube have been flagged as ```INCORRECT_PIXEL_ASSIGNMENT```. Happens when the tube has been masked or it was malfunctioning when the scans were taken. Parameters ---------- bottom_shadow_pixels: list Pixel indexes defining the positions of the bottom of the bar in the barscan Returns ------- bool """ return list(set(bottom_shadow_pixels)) == [ INCORRECT_PIXEL_ASSIGNMENT, ] def resolve_incorrect_pixel_assignments(bottom_shadow_pixels, bar_positions): r""" The algorithm finding the position of the bottom of the bar sometimes fails when the bar is close to the bottom of the detector, finding two corner cases. Corner case 1: When the bar approaches the bottom of the tubes, the bottom edge of its shadow blends with the non-functioning pixels at the bottom of the tubes. In this scenario, the bottom edge of the bar is assigned as the first non-functioning pixel at the top of the tube. Thus, we must find out for each tube a sudden jump in the identified bottom shadow pixel. Example: the bottom shadow pixels for the first tube as we change the position of the bar have been identified: 249, 230, 209, 187, 165, 145, 129, 103, 82, 67, 52, 39, 21, 249, 249 In the example, the last two bottom pixels are erroneous. They must be set to INCORRECT_PIXEL_ASSIGNMENT Corner case 2: When the identified bottom pixel in a tube is far from the boundary between the illuminated pixels and the non illuminated pixels, it leaves a few non-illuminated pixels as active pixels. The bottom shadow pixel can then be incorrectly assigned as one of these non-illuminated pixels. The results are outliers pixel indexes. Example: 249 248 246 4 244 242.... Here "4" is in incorrectly assigned pixel index as bottom of the bar We must find these outliers and assign them as incorrect. We use a linear fit to find out outliers. Fixes to corner cases are skippedd for bad tubes (see ~drtsans.pixel_calibration.bad_tube). devs - Jose Borreguero <borreguerojm@ornl.gov> Parameters ---------- bottom_shadow_pixels: numpy.ndarray 2D array defining the position of the bar on the detector, in pixel coordinates. The first array index corresponds to the Y-axis (along each tube), the second to the X-axis (across tubes). Thus, bottom_shadow_pixels[:, 0] indicates bottom shadow pixel coordinates along the very first tube bar_positions: numpy.ndarray 2D array defining the position of the bar on the detector, in mili meters. The first array index corresponds to the Y-axis (along each tube), the second to the X-axis (across tubes). Thus, bar_positions[:, 0] indicates bottom shadow Y-coordinates along the very first tube """ number_bar_positions, number_tubes = bottom_shadow_pixels.shape # Iterate over each tube for tube_index in range(number_tubes): if bad_tube(bottom_shadow_pixels[:, tube_index]): continue # this tube has all pixels marked as incorrectly assigned. Don't attempt to fix corner issues. # # Correct corner case 1 # We scan the end of the tube for possible missassignments jump_index = None y = bottom_shadow_pixels[:, tube_index] begin_index = number_bar_positions - 2 # start next to last index end_index = max(0, number_bar_positions - 12) # look at the last 12 pixels for i in range(begin_index, end_index, -1): # start from the bottom of the tube, work upwards if abs(y[i] - y[i - 1]) > 10 * max(1, abs(y[i + 1] - y[i])): # value/factor of 10 selected as threshold jump_index = i # The position of the bar jumped by more than 10 pixels. Flags an incorrect assigment break if jump_index is not None: y[jump_index:] = INCORRECT_PIXEL_ASSIGNMENT # # Correct corner case 2 y = bottom_shadow_pixels[:, tube_index] # bar positions in pixel coordinates x = bar_positions[:, tube_index] # bar positions in mili meters # Filter out bad pixels y_correct = y[y != INCORRECT_PIXEL_ASSIGNMENT] x_correct = x[y != INCORRECT_PIXEL_ASSIGNMENT] # bar positions, in mili-meters # Linear fit between bar positions in pixel coordinates and mili-meters try: coefficients = np.polynomial.polynomial.polyfit(x_correct, y_correct, 1) except TypeError: # empty array `x_correct`, meaning this tube was masked or malfunctioning continue # Find residuals of correct pixels y_fitted = np.polynomial.polynomial.polyval(x_correct, coefficients) residuals = np.abs(y_correct - y_fitted) # deviations between the linear fit and the actual positions large_residual = np.average(residuals) + 1.5 * np.std(residuals) # Find residuals of correct and incorrect pixels. Residuals for incorrect pixels are nonsense, # but we include them because we need array `residuals` and array `y` of same length. y_fitted = np.polynomial.polynomial.polyval(x, coefficients) residuals = np.abs(y - y_fitted) # We flag as incorrect assignments those correct pixels with bar positions largely deviating from the linear # fit. The incorrect pixels already have nonsense large residuals y[(residuals > large_residual) & (y != INCORRECT_PIXEL_ASSIGNMENT)] = INCORRECT_PIXEL_ASSIGNMENT
[docs] def calculate_apparent_tube_width(flood_input, component="detector1", load_barscan_calibration=True, db_file=None): r""" Determine the tube width most efficient for detecting neutrons. An effective tube (or pixel) diameter is determined for tubes in the front panel, and likewise for the tubes in the back panel. devs - Jose Borreguero <borreguerojm@ornl.gov> **Mantid algorithms used:** :ref:`DeleteWorkspaces <algm-DeleteWorkspaces-v1>`, :ref:`Integration <algm-Integration-v1>`, :ref:`MaskDetectors <algm-MaskDetectors-v1>`, :ref:`MaskDetectorsIf <algm-MaskDetectorsIf-v1>`, :ref:`ReplaceSpecialValues <algm-ReplaceSpecialValues-v1>`, Parameters ---------- flood_input: str, ~mantid.api.IEventWorkspace, ~mantid.api.MatrixWorkspace Path to flood run, flood workspace name, or flood workspace object. component: str Name of the instrument component containing the detector array consisting of two parallel panels of tubes. load_barscan_calibration: bool Load pixel positions and heights from the pixel-calibrations database appropriate to ```input_workspace```. If :py:obj:`False`, then the pixel positions and heigths will be those of ```input_workspace```. db_file: str, None database file (json format). None will load the default database for the selected instrument. Otherwise the combination load_barscan_calibration=True, db_file=None may come across as some data contradictory Returns ------- dict Dictionary containing the following keys: - instrument, str, Standard name of the instrument. - component, str, name of the double detector array, usually "detector1". - run, int, run number of ``input_workspace``. - unit: str, the units for the tube widths. Set to 'mm' for mili-meters. - widths, list, A two-item list containing the apparent widths for the front and back tubes. """ # Determine the type of input for the flood data if file_exists(flood_input): input_workspace = mtd.unique_hidden_name() Load(Filename=flood_input, OutputWorkspace=input_workspace) else: input_workspace = str(flood_input) # workspace object or workspace name integrated_intensities = mtd.unique_hidden_name() Integration(InputWorkspace=input_workspace, OutputWorkspace=integrated_intensities) # Mask non-finite pixel_intensities (nan, inf). They can't be used in the calculation. # # Replace non-finite pixel_intensities with a value of -1 ReplaceSpecialValues( InputWorkspace=integrated_intensities, OutputWorkspace=integrated_intensities, NanValue=-1, NanError=-1, InfinityValue=-1, InfinityError=-1, ) # Mask detectors with negative pixel_intensities mask_workspace = mtd.unique_hidden_name() MaskDetectorsIf( InputWorkspace=integrated_intensities, Operator="Less", Value=0.0, OutputWorkspace=mask_workspace, ) MaskDetectors(Workspace=integrated_intensities, MaskedWorkspace=mask_workspace) # Update pixel positions and heights with the appropriate calibration, if so requested. if load_barscan_calibration is True: calibration = load_calibration(input_workspace, "BARSCAN", component=component, database=db_file) calibration.apply(integrated_intensities) # Calculate the count density for each tube. Notice that if the whole tube is masked, then the associated # intensity is stored as nan. # # Sort the tubes according to the X-coordinate in decreasing value. This is the order when sitting on the # sample and iterating over the tubes "from left to right" collection = TubeCollection(integrated_intensities, component).sorted(view="fbfb") # BOTTLENECK detector_ids = list(itertools.chain.from_iterable(tube.detector_ids for tube in collection)) count_densities = list() for tube in collection: weighted_intensities = tube.readY.ravel() / tube.pixel_heights d = np.mean(weighted_intensities[~tube.isMasked]) count_densities.append(d) count_densities = np.array(count_densities) # is convenient to cast densities into a numpy array data structure. # Determine the count densities per panel and for the whole detector array. # We must be careful to pick only tubes with finite densities (avoid 'nan') average_count_density = np.mean(count_densities[np.isfinite(count_densities)]) front_count_density = np.mean(count_densities[::2][np.isfinite(count_densities[::2])]) # front tubes, even indexes back_count_density = np.mean(count_densities[1::2][np.isfinite(count_densities[1::2])]) # back tubes, odd indexes # Determine the front and back pixel widths nominal_width = collection[0][0].width # width of the first pixel in the first tube front_width = (front_count_density / average_count_density) * nominal_width back_width = (back_count_density / average_count_density) * nominal_width # Generate a list of pixel widths. It is assumed that front tubes have an even tube index widths = list() for tube_index, tube in enumerate(collection): pixel_width = front_width if tube_index % 2 == 0 else back_width widths.extend([pixel_width] * len(tube)) DeleteWorkspaces(integrated_intensities, mask_workspace) metadata = dict( caltype=CalType.TUBEWIDTH.name, instrument=instrument_standard_name(input_workspace), component=component, daystamp=day_stamp(input_workspace), runnumbers=[ int(SampleLogs(input_workspace).run_number.value), ], ) return Table(metadata, detector_ids=detector_ids, widths=widths)
[docs] @namedtuplefy def as_intensities(input_workspace, component="detector1", views=["positions", "heights", "widths"]): r""" Returns one workspace for each pixel property that is calibrated (e.g., pixel height), and the calibration datum is stored as the intensity value for that pixel. Intended to visualize the calibration in MantidPlot's instrument viewer. Not required for calibration generation or for data reduction. Generated workspaces are ```input_name_positions```, ```input_name_heights```, and ```input_name_widths```, where ```input_name``` is the name of the input workspace. Note: Positions for visualization in Mantid's instrument view are shifted so that the lowest position (usually a negative number) becomes zero. The reason being that showing the instrument in Mantid will mask negative intensities, and we want to avoid this. **Mantid algorithms used:** :ref:`CreateWorkspace <algm-CreateWorkspace-v1>`, <https://docs.mantidproject.org/algorithms/CreateWorkspace-v1.html> :ref:`LoadEmptyInstrument <algm-LoadEmptyInstrument-v1>`, <https://docs.mantidproject.org/algorithms/LoadEmptyInstrument-v1.html> :ref:`LoadInstrument <algm-LoadInstrument-v1>`, <https://docs.mantidproject.org/algorithms/LoadInstrument-v1.html> Parameters ---------- input_workspace: str, ~mantid.api.MatrixWorkspace, ~mantid.api.IEventsWorkspace Workspace from which pixel properties are retrieved. component: str, list Name or list of names for the double detector array panels. For BIOSANS we have 'detector1', 'wing_detector' or 'midrange_detector'. views: list Generate views for the pixel properties provided. Returns ------- namedtuple A namedtuple containing the ~mantid.api.MatrixWorkspace workspaces with fields 'positions', 'positions_mantid', 'heights', and 'widths' """ if isinstance(component, str): components = [ component, ] else: components = component # methods TubeSpectrum.pixel_y, TubeSpectrum.pixel_heights, and TubeSpectrum.pixel_widths tube_properties = { "positions": "pixel_y", "heights": "pixel_heights", "widths": "pixel_widths", } # collect pixel properties and associated workspace indexes workspace_indexes = np.array([], dtype=int) pixel_props = dict() # on entry for each view, each entry is a 1D numpy array for current_component in components: collection = TubeCollection(input_workspace, current_component).sorted(view="decreasing X") for tube in collection: workspace_indexes = np.hstack( (workspace_indexes, tube.spectrum_info_index) ) # workspace indexes for the tube for view in views: # 'positions', 'heights', 'widths' pixel_props_collected = pixel_props.get(view, np.array([])) # pixel properties up to the current tube addition = getattr(tube, tube_properties[view]) # pixel properties for the current tube pixel_props[view] = np.hstack((pixel_props_collected, addition)) # Mantid can only show positive quantities in the instrument view if "positions" in pixel_props: pixel_props["positions_mantid"] = pixel_props["positions"] - np.min(pixel_props["positions"]) number_histograms = mtd[str(input_workspace)].getNumberHistograms() intensities = np.zeros(number_histograms) returned_views = {} for cal_prop in pixel_props: # 'positions', 'heights', 'widths', 'positions_mantid' output_workspace = f"{str(input_workspace)}_{cal_prop}" # Workspace containing the property as intensity # intensties will be non-zero only for workpace indexes that have associated pixels of interests intensities[workspace_indexes] = pixel_props[cal_prop] workspace = Integration(InputWorkspace=input_workspace, OutputWorkspace=output_workspace) for index in range(number_histograms): workspace.dataY(index)[:] = intensities[index] returned_views[cal_prop] = mtd[output_workspace] return returned_views
[docs] def split_barscan_run(input_file, output_directory, bar_position_log="dcal_Readback"): r""" Split a barscan file containing many positions of the bar into a set of files each holding the bar at a unique position. The input file must be an events file. The output files contain only the total intensity per pixel. If input file is of the name 'INST_1234.nxs.h5', the output files' names are 'INST_1234_0.nxs', 'INST_1234_1.nxs', and so on. Parameters ---------- input_file: str Events Nexus file containing a full barscan (many positions of the bar) ouput_directory: str Path where the individual scans are saved """ barscan_workspace = mtd.unique_hidden_name() spliter_workspace = mtd.unique_hidden_name() info_workspace = mtd.unique_hidden_name() splitted_workspace_group = mtd.unique_hidden_name() LoadEventNexus(input_file, OutputWorkspace=barscan_workspace) bar_positions = event_splitter( barscan_workspace, split_workspace=spliter_workspace, info_workspace=info_workspace, bar_position_log=bar_position_log, ) FilterEvents( InputWorkspace=barscan_workspace, SplitterWorkspace=spliter_workspace, InformationWorkspace=info_workspace, OutputWorkspaceBaseName=splitted_workspace_group, GroupWorkspaces=True, TimeSeriesPropertyLogs=[bar_position_log], ExcludeSpecifiedLogs=False, ) os.makedirs(output_directory, exist_ok=True) # Create directory, and don't complain if already exists basename = os.path.basename(input_file).split(".nxs")[0] for i in range(len(bar_positions)): workspace = splitted_workspace_group + "_" + str(i) AddSampleLog( Workspace=workspace, LogName=bar_position_log, LogText=str(bar_positions[i]), LogType="Number Series", LogUnit="mm", ) Rebin( workspace, Params=[0, 1000000, 1000000], PreserveEvents=False, OutputWorkspace=workspace, ) out_file = os.path.join(output_directory, f"{basename}_{i}.nxs") SaveNexus(InputWorkspace=workspace, Filename=out_file) # Clean up all workspaces DeleteWorkspaces([splitted_workspace_group + "_" + str(i) for i in range(len(bar_positions))]) DeleteWorkspaces([splitted_workspace_group, info_workspace, spliter_workspace, barscan_workspace])