Source code for drtsans.samplelogs

# package imports

# third party imports
from mantid.api import Run, MatrixWorkspace
from mantid.kernel import (
    BoolTimeSeriesProperty,
    DateAndTime,
    FloatTimeSeriesProperty,
    Int64TimeSeriesProperty,
    StringTimeSeriesProperty,
)
from mantid.simpleapi import mtd
import numpy as np

# standard library imports
from typing import List, Union

TimeSeriesProperty = Union[
    BoolTimeSeriesProperty, FloatTimeSeriesProperty, Int64TimeSeriesProperty, StringTimeSeriesProperty
]
SECONDS_TO_NANOSECONDS = 1.0e09  # from seconds to nanoseconds


[docs] def time_series( name: str, elapsed_times: Union[np.ndarray, List[DateAndTime]], values: List[Union[bool, float, int, str]], start_time: Union[DateAndTime, str] = "2000-01-01T00:00:00", unit: str = "", ) -> TimeSeriesProperty: r""" Create time series log Parameters ---------- name log entry name elapsed_times List of elapsed times after ```start_time```, in seconds. start_time Starting time for the run values List of log values, same length as the list of times unit Log unit Returns ------- mantid log that can be added as a property to a Run object Raises ------ ValueError If items in `sequence `values`` is not one of ``bool, ``float``, ``int``, ``str`` """ if len(elapsed_times) != len(values): raise ValueError("elapsed_times and values must have the same length") python_types = (bool, float, int, str, object) mantid_types = ( BoolTimeSeriesProperty, FloatTimeSeriesProperty, Int64TimeSeriesProperty, StringTimeSeriesProperty, FloatTimeSeriesProperty, ) first_value = values[0] for python_type, mantid_type in zip(python_types, mantid_types): if isinstance(first_value, python_type): series_property = mantid_type(name) break else: raise ValueError(f"Cannot create time series log for values of type {type(first_value)}") start = start_time if isinstance(start_time, str): start = DateAndTime(start_time) # Insert one pair of (time, elapsed_time) at a time for elapsed_time, value in zip(elapsed_times, values): series_property.addValue(start + int(elapsed_time * SECONDS_TO_NANOSECONDS), value) series_property.units = unit return series_property
[docs] def periodic_index_log( period: float, interval: float, duration: float, run_start: Union[str, DateAndTime], offset: float = 0.0, step: float = 1.0, name: str = "periodic_index", ) -> FloatTimeSeriesProperty: r""" Generate a periodic log whose values are integers ranging from 0 to ``period / interval``. The first log entry is at ``run_start + offset`` with value 0. The next entry at ``run_start + offset + interval`` with value 1, and so on. The log wraps around at ``run_start + offset + period`` so the next value is again 0. Parameters ---------- period Period of the log, in seconds interval Interval between consecutive log entries, in seconds duration Duration of the log from ``run_start``, in seconds run_start Time of the first log entry, unless ``offset`` is also specified offset Time of the first log entry after ``run_start``, in seconds step Absolute value of the change in the log value between two consecutive entries name Name of the log Returns ------- A Mantid timeseries property object which can be attached to a Run object. Raises ------ ValueError If ``period`` is not a multiple of ``interval``. """ if SECONDS_TO_NANOSECONDS * period % interval > 1: # allow for rounding errors of 1 nanosecond raise ValueError(f"period {period} must be a multiple of interval {interval}") times = np.arange(offset, duration, interval) # times at which we insert a new log entry values_in_period = step * np.arange(0, int(period / interval)) # 0, 1,.., period/interval period_count = 1 + int((duration - offset) / period) # additional period if "/" truncates times entries = np.tile(values_in_period, period_count)[: len(times)].tolist() # cast to python's int type return time_series(name, times, entries, run_start, unit="")
[docs] class SampleLogs(object): r""" Log reader, a bit more pythonic source: PyObject Instrument object, MatrixWorkspace, workspace name, file name, run number """ def __init__(self, source): self._ws = None self._run = self.find_run(source) def __getitem__(self, item): if item in self._run.keys(): return self._run[item] raise KeyError('"{}" not found in sample logs'.format(item)) def __getattr__(self, item): _run = self.__dict__["_run"] try: return getattr(_run, item) except AttributeError: if item in _run.keys(): return _run.getProperty(item) else: raise AttributeError('"{}" not found in sample logs'.format(item)) def __contains__(self, item): """Called when using python's ``in`` operation""" return item in self._run
[docs] def insert(self, name: str, value: Union[str, int, float, list, TimeSeriesProperty], unit: str = None): r""" Wrapper to Mantid AddSampleLog algorithm The following properties of AddSampleLog are determined by inspection of `value`: LogText, LogType, NumberType Parameters ---------- name log entry name. If ``value`` is one of Mantid's time series property objects, it is expected that this is the name of the series. value Value to insert. If ``value`` is a ``list`` object, it will insert only the first value of the list. unit: str Log unit. If ``value`` is one of Mantid's time series property objects, it is expected that this is the unit of the series. """ if not unit: unit = "" if isinstance(value, list): value = value[0] # copies AddSampleLog behavior if isinstance( value, (BoolTimeSeriesProperty, FloatTimeSeriesProperty, Int64TimeSeriesProperty, StringTimeSeriesProperty) ): assert name == value.name assert unit == value.units self._ws.mutableRun().addProperty(name, value, unit, replace=True)
[docs] def insert_time_series(self, name, elapsed_times, values, start_time="2000-01-01T00:00:00", unit=""): r""" Insert a ~mantid.kernel.FloatTimeSeriesProperty in the logs Parameters ---------- name: str log entry name elapsed_times: list List of elapsed times after ```start_time```, in seconds. values: list List of log values, same length as the list of times start_time: str Starting time for the run unit: str units of the log values """ series_property = time_series(name, elapsed_times, values, start_time, unit) self._ws.mutableRun().addProperty(name, series_property, units=unit, replace=True)
@property def mantid_logs(self): return self._run @property def workspace(self): return self._ws
[docs] def single_value(self, log_key, operation=np.mean): r"""Cast the log entry to a single value Parameters ---------- log_key: str the name of the log operation: funct The casting operation to perform on the values when the log entry is a time series. Returns ------- str, float, None The single value associated with the log entry. Raises ------ RuntimeError When the log_key is not found in the sample logs """ _run = self.__dict__["_run"] value = _run[log_key].value if isinstance(value, str): return value else: return float(operation(value))
[docs] def find_run(self, other): r""" Retrieve the Run object Parameters ---------- other: Run, str, MatrixWorkspace Returns ------- Run Reference to the run object """ def from_ws(ws): self._ws = ws return ws.getRun() def from_run(a_run): return a_run def from_string(s): # see if it is a file if s in mtd: return self.find_run(mtd[s]) else: raise RuntimeError("{} is not a valid workspace name".format(s)) dispatch = {Run: from_run, MatrixWorkspace: from_ws, str: from_string} # If others is not None: raise exception if other is None: a = "[DEBUG 187] dispatch: {}".format(dispatch.items()) b = "[DEBUG 187] other: {}".format(other) raise NotImplementedError("{}\n{}".format(a, b)) finders = [v for k, v in dispatch.items() if isinstance(other, k)] if len(finders) == 0: # In case no items found raise RuntimeError( 'Input "other" of value {} is not supported to retrieve Mantid ' '"run" object'.format(other) ) finder = finders[0] return finder(other)
[docs] def find_log_with_units(self, log_key, unit=None): r""" Find a log entry in the logs, and ensure it has the right units Parameters ---------- log_key: string key of the log to find unit: None or string units string to enforce Returns ------- log value """ if log_key in self.keys(): if bool(unit) and not self[log_key].units == unit: error_msg = "Found %s with wrong units" % log_key error_msg += " [%s]" % self[log_key].units raise RuntimeError(error_msg) return np.average(self[log_key].value) raise RuntimeError(f"Could not find {log_key} with unit {unit} in logs: {self.keys()}")
[docs] def records_json(self, log=None, logs=None, default=None): r"""Save the `single_value` of the requested log entries to a dictionary, useful for reporting. If both `log` and `logs` are passed, the `log` entry is appended to the `logs` list Parameters ---------- log: str log entry name logs: List[str] list of log entry names default Value assigned to log entry if no entry is found Returns ------- dict log names and single values for each of the requested log entries """ if logs is None: logs = list() if log is not None: logs.append(log) output = {} for log_entry in logs: try: value = self.single_value(log_entry) except (KeyError, RuntimeError): value = default output[log_entry] = value return output