Source code for drtsans.polarization

# standard library imports
from collections import namedtuple
from dataclasses import dataclass
from typing import ClassVar, Generator, List, Optional

# third party imports
from mantid.simpleapi import CreateSingleValuedWorkspace, DeleteWorkspace, mtd, RenameWorkspace
import numpy as np

# drtsans imports
from drtsans.api import _set_uncertainty_from_numpy
from drtsans.samplelogs import SampleLogs
from drtsans.type_hints import MantidWorkspace


__all__ = [
    "PV_POLARIZER",
    "PV_POLARIZER_FLIPPER",
    "PV_POLARIZER_VETO",
    "PV_ANALYZER",
    "PV_ANALYZER_FLIPPER",
    "half_polarization",
    "SimulatedPolarizationLogs",
]

# Names of processing variables related to polarization, stored in the sample logs of the Nexus Event file
PV_POLARIZER = "Polarizer"
PV_POLARIZER_FLIPPER = "PolarizerFlipper"
PV_POLARIZER_VETO = "PolarizerVeto"

PV_ANALYZER = "Analyzer"
PV_ANALYZER_FLIPPER = "AnalyzerFlipper"
PV_ANALYZER_VETO = "AnalyzerVeto"


def _calc_flipping_ratio(polarization):
    """Calculates the flipping ratio from the polarization state

    Parameters
    ----------
    polarization: str, ~mantid.api.MatrixWorkspace
        Polarization state

    Returns
    -------
    ~mantid.api.MatrixWorkspace
        The ratio of flipping
    """
    value = polarization.extractY()[0]
    uncertainty = polarization.extractE()[0]
    if len(value) == 1 and len(uncertainty) == 1:
        value, uncertainty = value[0], uncertainty[0]

    uncertainty = 2.0 * uncertainty / np.square(1 - value)
    value = (1 + value) / (1 - value)

    if isinstance(value, float):  # create a single value workspace
        return CreateSingleValuedWorkspace(
            DataValue=value,
            ErrorValue=uncertainty,
            OutputWorkspace=mtd.unique_hidden_name(),
            EnableLogging=False,
        )
    else:
        # if it is an array should call CreateWorkspace(EnableLogging=False)
        raise NotImplementedError("Somebody needs to create an output from {} (type={})".format(value, type(value)))


def _calc_half_polarization_up(flipper_off, flipper_on, efficiency, flipping_ratio):
    """This calculates the spin up workspace

    Parameters
    ----------
    flipper_off_workspace: str, ~mantid.api.MatrixWorkspace
        Flipper off measurement
    flipper_on_workspace: str, ~mantid.api.MatrixWorkspace
        Flipper on measurement
    efficiency: ~mantid.api.MatrixWorkspace
        Flipper efficiency
    flipping_ratio: ~mantid.api.MatrixWorkspace
        The ratio of flipping

    Returns
    -------
    ~mantid.api.MatrixWorkspace
        The spin up workspace
    """
    __spin_up = flipper_off + (flipper_off - flipper_on) / (efficiency * (flipping_ratio - 1.0))

    e = efficiency.extractY()[0]
    F = flipping_ratio.extractY()[0]

    # the uncertainties (numerically) aren't correct because of build-up of numerical errors.
    # Recalculate them based on the proper equation based on a hand calculation of the partial derivatives
    m0_part = np.square(flipper_off.extractE()[0][0] * (1 + 1 / (e * (F - 1))))
    m1_part = np.square(flipper_on.extractE()[0][0] * (1 / (e * (F - 1))))

    mixed = np.square((flipper_off.extractY()[0][0] - flipper_on.extractY()[0][0]) / (e * (F - 1)))
    mixed *= np.square(efficiency.extractE()[0][0] / e) + np.square(flipping_ratio.extractE()[0][0] / (F - 1))
    sup_err = np.sqrt(m0_part + m1_part + mixed)

    # set the uncertainty in the workspace
    __spin_up = _set_uncertainty_from_numpy(__spin_up, sup_err)

    return __spin_up


def _calc_half_polarization_down(flipper_off, flipper_on, efficiency, flipping_ratio):
    """This calculates the spin down workspace

    Parameters
    ----------
    flipper_off_workspace: str, ~mantid.api.MatrixWorkspace
        Flipper off measurement
    flipper_on_workspace: str, ~mantid.api.MatrixWorkspace
        Flipper on measurement
    efficiency: ~mantid.api.MatrixWorkspace
        Flipper efficiency
    flipping_ratio: ~mantid.api.MatrixWorkspace
        The ratio of flipping

    Returns
    -------
    ~mantid.api.MatrixWorkspace
        The spin down workspace
    """
    __spin_down = flipper_off - (flipper_off - flipper_on) / (efficiency * (1.0 - 1.0 / flipping_ratio))

    e = efficiency.extractY()[0]
    F = flipping_ratio.extractY()[0]

    # the uncertainties (numerically) aren't correct because of build-up of numerical errors.
    # Recalculate them based on the proper equation based on a hand calculation of the partial derivatives
    m0_part = np.square(flipper_off.extractE()[0][0] * (1 - 1 / (e * (1 - 1 / F))))
    m1_part = np.square(flipper_on.extractE()[0][0] * (1 / (e * (1 - 1 / F))))
    mixed = np.square((flipper_off.extractY()[0][0] - flipper_on.extractY()[0][0]) / (e * (1 - 1 / F)))
    mixed *= np.square(efficiency.extractE()[0][0] / e) + np.square(
        flipping_ratio.extractE()[0][0] / (F * F * (1 - 1 / F))
    )

    sdn_err = np.sqrt(m0_part + m1_part + mixed)

    # set the uncertainty in the workspace
    __spin_down = _set_uncertainty_from_numpy(__spin_down, sdn_err)

    return __spin_down


[docs] def half_polarization( flipper_off_workspace, flipper_on_workspace, polarization, efficiency, spin_up_workspace=None, spin_down_workspace=None, ): """Calculate the spin up/down workspaces from flipper on/off. **Mantid algorithms used:** :ref:`RenameWorkspace <algm-RenameWorkspace-v1>` Parameters ---------- flipper_off_workspace: str, ~mantid.api.MatrixWorkspace Flipper off measurement flipper_on_workspace: str, ~mantid.api.MatrixWorkspace Flipper on measurement polarization: str, ~mantid.api.MatrixWorkspace Polarization state efficiency: str, ~mantid.api.MatrixWorkspace Flipper efficiency spin_up_workspace: str Name of the resulting spin up workspace. If :py:obj:`None`, then ``flipper_off_workspace`` will be overwritten. spin_down_workspace: str Name of the resulting spin down workspace. If :py:obj:`None`, then ``flipper_on_workspace`` will be overwritten. Returns ------- py:obj:`tuple` of 2 ~mantid.api.MatrixWorkspace """ if spin_up_workspace is None: spin_up_workspace = str(flipper_off_workspace) if spin_down_workspace is None: spin_down_workspace = str(flipper_on_workspace) # this is denoted as "F" in the master document flipping_ratio = _calc_flipping_ratio(polarization) __spin_up = _calc_half_polarization_up(flipper_off_workspace, flipper_on_workspace, efficiency, flipping_ratio) __spin_down = _calc_half_polarization_down(flipper_off_workspace, flipper_on_workspace, efficiency, flipping_ratio) spin_up_workspace = RenameWorkspace(InputWorkspace=__spin_up, OutputWorkspace=spin_up_workspace) spin_down_workspace = RenameWorkspace(InputWorkspace=__spin_down, OutputWorkspace=spin_down_workspace) DeleteWorkspace(flipping_ratio) return (spin_up_workspace, spin_down_workspace)
# A simple way to encode the name and specifications for one of the time generators methods of class SimulatedLogs # Example: polarizer_veto=TimesGeneratorSpecs("binary_pulse", {"interval": 1.0, "veto_duration": 0.2}) TimesGeneratorSpecs = namedtuple("TimesGeneratorSpecs", ["name", "kwargs"])
[docs] @dataclass class SimulatedPolarizationLogs: """A simulated log for testing purposes.""" polarizer: int = 0 polarizer_flipper: Optional[TimesGeneratorSpecs] = None polarizer_veto: Optional[TimesGeneratorSpecs] = None analyzer: int = 0 analyzer_flipper: Optional[TimesGeneratorSpecs] = None analyzer_veto: Optional[TimesGeneratorSpecs] = None # Class variables flipper_generators: ClassVar[List[str]] = ["heartbeat"] # available time-generators for flipper devices veto_generators: ClassVar[List[str]] = ["binary_pulse"] # available time-generators for veto intervals def __post_init__(self): # Validate input polarizer and analyzer flipper generators for flipper, device in zip([self.polarizer_flipper, self.analyzer_flipper], ["polarizer", "analyzer"]): if flipper and flipper.name not in self.flipper_generators: raise ValueError( f"The {device} flipper generator must be one of {self.flipper_generators}, got '{flipper.name}'" ) # Validate input polarizer and analyzer veto generators for veto, device in zip([self.polarizer_veto, self.analyzer_veto], ["polarizer", "analyzer"]): if veto and veto.name not in self.veto_generators: raise ValueError( f"The {device} veto generator must be one of {self.veto_generators}, got '{veto.name}'" )
[docs] def heartbeat( self, interval: float, dead_time: Optional[float] = 0.0, upper_bound: Optional[float] = None ) -> Generator[float, None, None]: """ Generate a sequence of timestamps at regular intervals, starting at or later than dead_time. Parameters ---------- interval : float The time interval between consecutive timestamps, in seconds. dead_time: float The initial time period, in seconds, during which no times are generated. Defaults to 0.0. upper_bound : float, optional The maximum time value to generate, in seconds. If None, the generator will continue indefinitely. Yields ------ float The next timestamp in the sequence. Examples -------- >>> log = SimulatedPolarizationLogs() >>> list(log.heartbeat(interval=1.0, upper_bound=5.0)) [0, 1.0, 2.0, 3.0, 4.0, 5.0] """ elapsed = 0 while elapsed <= upper_bound if upper_bound is not None else True: if elapsed >= dead_time: yield elapsed elapsed += interval
[docs] def binary_pulse( self, interval: float, veto_duration: float, dead_time: Optional[float] = 0.0, upper_bound: Optional[float] = None, ) -> Generator[float, None, None]: """ Generate a sequence of timestamps with a binary pulse pattern, starting at or later than dead_time. The timestamps alternate between the start and end of a veto period,which is centered within each interval. Parameters ---------- interval : float The time interval between consecutive pulses, in seconds. veto_duration : float The duration of the veto period, in seconds. Must be less than the interval. dead_time: float The initial time period, in seconds, during which no times are generated. Defaults to 0.0. upper_bound : float, optional The maximum time value to generate, in seconds. If None, the generator will continue indefinitely. Yields ------ float The next timestamp in the binary pulse sequence. Examples -------- >>> log = SimulatedPolarizationLogs() >>> list(log.binary_pulse(interval=3.0, veto_duration=1.0, upper_bound=10)) [0, 2.5, 3.5, 5.5, 6.5, 8.5, 9.5] """ if not veto_duration < interval: raise ValueError("Veto duration must be less than the interval") elapsed, latest_pulse_time, veto_half, continue_while = 0.0, interval, veto_duration / 2, True if dead_time == 0.0: yield elapsed while continue_while: for elapsed in [latest_pulse_time - veto_half, latest_pulse_time + veto_half]: if (upper_bound is None) or (elapsed <= upper_bound): if elapsed >= dead_time: yield elapsed else: continue_while = False # exit the outer while loop break # exit the immediate `for` loop latest_pulse_time += interval
[docs] def times_generator(self, pv_name: str, **options: dict) -> Optional[Generator[float, None, None]]: """ Create a generator that yields time points This method selects the appropriate time generator function (e.g., `heartbeat` or `binary_pulse`) based on the PV name and its associated generator specifications. Additional options can be passed to override or extend the generator specifications. Parameters ---------- pv_name : str The name of the process variable (e.g., 'PolarizerFlipper', 'PolarizerVeto', etc.). **options : dict Additional keyword arguments to override or extend the generator's default arguments. Raises ------ KeyError If the provided PV name does not match any known process variable. AttributeError If the generator function associated with the PV name is not found. Examples -------- >>> logs = SimulatedPolarizationLogs( ... polarizer_flipper=TimesGeneratorSpecs("heartbeat", {"interval": 1.0}), ... polarizer_veto=TimesGeneratorSpecs("binary_pulse", {"interval": 1.0, "veto_duration": 0.4}) ... ) >>> list(logs.times_generator(PV_POLARIZER_FLIPPER, upper_bound=6.3)) [0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0] >>> list(logs.times_generator(PV_POLARIZER_VETO, upper_bound=6.3)) [0.0, 0.8, 1.2, 1.8, 2.2, 3.8, 4.2, 4.8, 5.2, 5.8, 6.2] >>> logs.times_generator(PV_ANALYZER_FLIPPER) None >>> logs.times_generator(PV_ANALYZER_VETO) None """ # conversion between PV name and class field converter = { PV_POLARIZER_FLIPPER: self.polarizer_flipper, PV_POLARIZER_VETO: self.polarizer_veto, PV_ANALYZER_FLIPPER: self.analyzer_flipper, PV_ANALYZER_VETO: self.analyzer_veto, } specs_field = converter[pv_name] if specs_field is None: return None generator_function = getattr(self, specs_field.name) kwargs = {**specs_field.kwargs, **options} return generator_function(**kwargs)
[docs] def inject(self, input_workspace: MantidWorkspace): """ Injects simulated log data into a Mantid workspace. This method adds polarizer and analyzer values as single-valued logs, and generates time-series logs for flippers and veto process variables based on their associated time-generator specifications. Values for flipper and veto time-series are either 0 or 1, and always start with 0 for simplicity. Parameters ---------- input_workspace : MantidWorkspace The Mantid workspace into which the simulated logs will be injected. Raises ------ AttributeError If the workspace does not contain required sample log entries like `run_start` or `duration`. Examples -------- >>> workspace = CreateSingleValuedWorkspace(OutputWorkspace="example") >>> sample_logs = SampleLogs(workspace) >>> sample_logs.insert("start_time", "2023-10-01T00:00:00") >>> sample_logs.insert("duration", 300) >>> logs = SimulatedPolarizationLogs( ... polarizer=1, ... polarizer_flipper=TimesGeneratorSpecs("heartbeat", {"interval": 1.0}), ... polarizer_veto=TimesGeneratorSpecs("binary_pulse", {"interval": 2.0, "veto_duration": 0.5}) ... ) >>> logs.inject(workspace) """ sample_logs = SampleLogs(input_workspace) # Retrieve the run start time and duration from the sample logs, handling potential attribute differences. try: run_start = sample_logs.run_start.value except AttributeError: run_start = sample_logs.start_time.value duration: float = sample_logs.duration.value # in seconds # insert polarizer and analyzer types sample_logs.insert(name=PV_POLARIZER, value=self.polarizer) sample_logs.insert(name=PV_ANALYZER, value=self.analyzer) # insert the time series for pv in [PV_POLARIZER_FLIPPER, PV_POLARIZER_VETO, PV_ANALYZER_FLIPPER, PV_ANALYZER_VETO]: times = self.times_generator(pv, upper_bound=duration) if times is None: # no time generator specifications for this PV continue times = list(times) # run the generator to get all times values = [i % 2 for i in range(len(times))] # Alternating zeros and ones sample_logs.insert_time_series(name=pv, start_time=run_start, elapsed_times=times, values=values)