Source code for drtsans.tof.eqsans.chopper

r"""
This module provides class `EQSANSDiskChopperSet` representing the set of disk choppers.
Prior to 2026, EQSANS had four disk choppers (two of them paired as a double chopper).
Starting in 2026, EQSANS has six disk choppers (three double choppers).
The main goal of the module is to find the set of neutron wavelength
bands transmitted by the chopper set, given definite chopper settings such as aperture and starting phase.
"""

import importlib
import json

import numpy as np
from drtsans.chopper import DiskChopper, DiskChopperSetConfiguration
from drtsans.samplelogs import SampleLogs
from drtsans.frame_mode import FrameMode
from drtsans.path import exists
from mantid.api import Run
from mantid.simpleapi import LoadNexusProcessed, mtd

from drtsans.wavelength import Wbands


class EQSANSDiskChopperSet:
    r"""
    Set of disk choppers installed in EQSANS.

    The speed and phase of every chopper, the pulse frequency, and the run start time are read from
    the sample logs of the input object. The run start time selects the chopper configuration
    (number of choppers, apertures, distances to source, offsets).

    Parameters
    ----------
    other: file name, workspace, Run object, run number
        Load the chopper settings from this object.

    Raises
    ------
    RuntimeError
        If ``other`` is not a Run object, is not the name of an object registered in ``mtd``,
        and is rejected by ``drtsans.path.exists``.
    """

    #: Neutrons of a given wavelength :math:`\lambda` emitted from the moderator follow a distribution of delayed
    #: emission times that depends on the wavelength, and is characterized by function
    #: :math:`FWHM(\lambda) \simeq pulsewidth \cdot \lambda`.
    #: This is the default :math:`pulsewidth` in micro-sec/Angstrom.
    _pulse_width = 20

    #: The number of wavelength bands transmitted by a disk chopper is determined by the slowest emitted neutron,
    #: expressed as the maximum wavelength. This is the default cut-off maximum wavelength, in Angstroms.
    _cutoff_wl = 35

    def __init__(self, other):
        # Load choppers settings from the logs
        if isinstance(other, Run) or str(other) in mtd:
            sample_logs = SampleLogs(other)
        elif exists(other):
            # NOTE(review): mantid's simple API presumably names the output workspace after the
            # assigned variable ("ws") - confirm before renaming this local.
            ws = LoadNexusProcessed(other)
            sample_logs = SampleLogs(ws)
        else:
            raise RuntimeError(f"{other} is not a valid file name, workspace, Run object or run number")

        # Get the chopper configuration (4 or 6 choppers)
        self.chopper_config: DiskChopperSetConfiguration = self.get_chopper_configuration(sample_logs.start_time.value)

        # Instantiate one DiskChopper per configured chopper, using the time-averaged logged speed and phase
        self._choppers = []
        for chopper_index in range(self._n_choppers):
            aperture = self._aperture[chopper_index]
            to_source = self._to_source[chopper_index]
            log_number = 1 + chopper_index  # log entries are numbered starting at 1
            speed = sample_logs[f"Speed{log_number}"].value.mean()
            sensor_phase = sample_logs[f"Phase{log_number}"].value.mean()
            ch = DiskChopper(to_source, aperture, speed, sensor_phase)
            ch.pulse_width = self._pulse_width
            ch.cutoff_wl = self._cutoff_wl
            self._choppers.append(ch)

        # Determine period and if frame skipping mode from the first chopper
        first_chopper = self._choppers[0]
        # example of frame skipping: chopper speed 30 Hz, pulse frequency 60 Hz: abs(30 - 60) / 2 = 15
        condition = abs(first_chopper.speed - sample_logs.frequency.value.mean()) / 2 > 1
        self.frame_mode = FrameMode.skip if condition else FrameMode.not_skip

        # Select appropriate offsets, based on the frame-skip mode.
        mode_offsets = self._offsets[self.frame_mode]
        for chopper_index, ch in enumerate(self._choppers):
            ch.offset = mode_offsets[chopper_index]

    def transmission_bands(self, cutoff_wl: float = None, delay: float = 0, pulsed: bool = False) -> "Wbands":
        r"""
        Wavelength bands transmitted by the chopper apertures. The number of bands is determined by the
        slowest neutrons emitted from the moderator.

        Choppers with (numerically) zero speed are ignored. If no chopper is moving, an empty
        ``Wbands()`` object is returned.

        Parameters
        ----------
        cutoff_wl: float
            maximum wavelength of incoming neutrons. Discard slower neutrons when finding the transmission bands.
            If :py:obj:`None`, the class default ``_cutoff_wl`` is used.
        delay: float
            Additional time-of-flight to include in the calculations. For instance, this could be a multiple
            of the pulse period.
        pulsed: bool
            Include a correction due to delayed emission of neutrons from the moderator. See
            :const:`~drtsans.tof.eqsans.chopper.EQSANSDiskChopperSet._pulse_width` for a
            more detailed explanation.

        Returns
        -------
        ~drtsans.wavelength.Wbands
            Set of wavelength bands transmitted by the chopper.
        """
        if cutoff_wl is None:
            cutoff_wl = self._cutoff_wl

        # Filter out the choppers with zero speed, which do not contribute to the transmission bands
        moving_choppers = [ch for ch in self._choppers if not np.isclose(ch.speed, 0.0)]
        if not moving_choppers:
            return Wbands()

        # Transmission bands of the first chopper
        wb = moving_choppers[0].transmission_bands(cutoff_wl, delay, pulsed)
        # Find the common transmitted bands between the first chopper and the ensuing choppers
        for ch in moving_choppers[1:]:
            wb *= ch.transmission_bands(cutoff_wl, delay, pulsed)
        # We end up with the transmission bands of the chopper set
        return wb

    def get_chopper_configuration(self, start_time: str) -> "DiskChopperSetConfiguration":
        r"""
        Get the chopper configuration (number of choppers, apertures, distances to source, offsets) from the
        JSON configuration file based on the log "start_time".

        Parameters
        ----------
        start_time
            String representing the run start time in the format: "YYYY-MM-DDThh:mm:ssZ"

        Returns
        -------
        DiskChopperSetConfiguration
            Configuration of the disk choppers.

        Raises
        ------
        ValueError
            If the first ten characters of ``start_time``, with dashes removed, are not an integer.
        """
        # ``import importlib`` alone does not guarantee that the ``resources`` submodule is loaded,
        # so import it explicitly where it is used.
        from importlib import resources

        # Get daystamp from sample logs (format: YYYYMMDD)
        start_time_str = start_time[0:10]  # "YYYY-MM-DD"
        daystamp = int(start_time_str.replace("-", ""))  # Convert to YYYYMMDD integer

        # Load configuration from JSON file
        with resources.open_text("drtsans.configuration", "EQSANS_chopper_configurations.json") as file:
            configs = json.load(file)

        return DiskChopperSetConfiguration.from_json(configs, daystamp)

    @property
    def period(self):
        r"""Period of the first chopper in the set."""
        return self._choppers[0].period

    def __getitem__(self, item):
        r"""Chopper (or choppers, for a slice) stored at position ``item`` of the internal list."""
        return self._choppers[item]

    @property
    def pulse_width(self):
        r"""Value of ``_pulse_width``, which the constructor assigns to every chopper."""
        return self._pulse_width

    @property
    def _n_choppers(self):
        r"""Number of choppers, as stated by the chopper configuration."""
        return self.chopper_config.n_choppers

    @property
    def _aperture(self):
        r"""Chopper apertures, indexed by chopper, as stated by the chopper configuration."""
        return self.chopper_config.aperture

    @property
    def _to_source(self):
        r"""Chopper distances to the source, indexed by chopper, as stated by the chopper configuration."""
        return self.chopper_config.to_source

    @property
    def _offsets(self):
        r"""Chopper offsets, indexed first by frame mode and then by chopper."""
        return self.chopper_config.offsets