Source code for drtsans.mono.biosans.prepare_sensitivities_correction

# local imports
from drtsans.mask_utils import circular_mask_from_beam_center, apply_mask
from drtsans.mono.biosans import apply_transmission_correction, calculate_transmission
from drtsans.mono.biosans.api import prepare_data
from drtsans.mono.biosans.geometry import has_midrange_detector
from drtsans.mono.spice_data import SpiceRun
from drtsans.path import abspath
from drtsans.prepare_sensivities_correction import PrepareSensitivityCorrection as PrepareBase
from drtsans.process_uncertainties import set_init_uncertainties

# third party imports
from mantid.api import Workspace as MantidWorkspace
from mantid.kernel import logger
from mantid.simpleapi import DeleteWorkspace, LoadEventNexus, LoadNexusProcessed, MaskAngle, mtd
import numpy as np

# standard imports
import os
from typing import List, Union


# Constants
PIXEL = "Pixel"


[docs] class PrepareSensitivityCorrection(PrepareBase): def __init__(self, component="detector1"): super().__init__("CG3", component=component) self._curved_detectors = None # either ["wing_detector"] or ["wing_detector", "midrange_detector"] self._theta_dep_correction = False self._biosans_beam_trap_factor = 2 # mask the area around the direct beam to remove it and the associated parasitic scattering # Mask angles on main detector pixels to mask on beam center. self._main_det_mask_angle = None @property def curved_detectors(self) -> List[str]: r""" List of curved detectors by inspecting the first of the flood files. Returns ------- Either ``["wing_detector"]`` or ``["wing_detector", "midrange_detector"]`` depending on whether the data files contain the midrange detector. """ if self._curved_detectors is None: # initialize the cache self._curved_detectors = ["wing_detector"] # load the first flood run to check if midrange detector is present first_flood_run = self._flood_runs[0] if isinstance(first_flood_run, str) and first_flood_run.isdigit(): first_flood_run = int(first_flood_run) # convert string to integer if isinstance(first_flood_run, int): first_flood_run = abspath(first_flood_run, instrument="CG3") try: _first_flood_run = LoadEventNexus( Filename=first_flood_run, MetaDataOnly=True, # no need to load the events LoadNexusInstrumentXML=self._enforce_use_nexus_idf, OutputWorkspace=mtd.unique_hidden_name(), ) except RuntimeError: # loading an events file saved as a nexus file by Mantid _first_flood_run = LoadNexusProcessed(Filename=first_flood_run) if has_midrange_detector(_first_flood_run): self._curved_detectors.append("midrange_detector") DeleteWorkspace(_first_flood_run) return self._curved_detectors
[docs] def set_masks(self, default_mask, pixels, main_det_mask_angle=None, **kwargs) -> None: """Set masks Parameters ---------- default_mask : str or ~mantid.api.MaskWorkspace or :py:obj:`list` or None Mask to be applied. If :py:obj:`list`, it is a list of detector ID's. mask file name pixels : str or None pixels to mask. Example: '1-8,249-256' main_det_mask_angle : float or None angle to mask (MaskAngle) to main detector kwargs : dict additional keyword arguments to accept deprecated argument "wing_det_mask_angle". Unused. """ super().set_masks(default_mask, pixels) if main_det_mask_angle is not None: self._main_det_mask_angle = main_det_mask_angle
def _get_beam_center_run(self, index): r""" Input beam center associated to a particular flood run. if direct beam center runs are not defined, then the flood run itself is used as direct beam center run Parameters ---------- index : int beam center run index mapped to flood run index Returns ------- int, str beam center run number (as an int) or absolute file path (as a string) """ if self._direct_beam_center_runs is None: return self._flood_runs[index] else: return super()._get_beam_center_run(index) def _prepare_data_opts(self, beam_center): r"""Set additional options for function prepare_data Parameters ---------- beam_center : List[float] beam center for the main detector (X,Y), the wing detector (Y) and optionally the midrange detector (Y) Returns ------- dict """ opts = super()._prepare_data_opts(beam_center) if len(beam_center) > 2: opts["center_y_wing"] = beam_center[2] if len(beam_center) > 3: opts["center_y_midrange"] = beam_center[3] return opts def _get_beam_center_workspace(self, beam_center_run): r""" Load and prepare the beam center run with customary corrections. Also masks the curved detectors. Parameters ---------- beam_center_run: str instrument name plus run number or absolute file path Returns ------- ~mantid.api.Workspace2D """ beam_center_workspace = super()._get_beam_center_workspace(beam_center_run) # Mask curved detectors, unnecessary but reduces the execution time of algorithm FindCenterOfMassPosition apply_mask(beam_center_workspace, Components=self.curved_detectors) return beam_center_workspace def _mask_beam_center(self, flood_ws: MantidWorkspace, beam_center: str) -> MantidWorkspace: """Mask the beam center pixels and all detector panels but the one of interest. Mask beam center with three algorithms, tried in sequence: 1. if beam center mask file is present, use it. It's assumed it will mask the beam center pixels and all detector panels but the one of interest. 2. Mask all detector panels but the one of interest. If the panel of interest is the main panel, mask all pixels in the main detector subtending TwoTheta < main_det_mask_angle. 3. Mask all detector pixels within a distance of property "beam_center_radius" from the beam axis. Parameters ---------- flood_ws Mantid workspace for flood data beam_center mask file to mask the bean center pixels """ if isinstance(beam_center, str): apply_mask(flood_ws, mask=beam_center) elif self._main_det_mask_angle is not None: components_to_mask = ["detector1"] + self.curved_detectors components_to_mask.remove(self._component) # mask everything but the component of interest apply_mask(flood_ws, Components=components_to_mask) # mask all pixels subtending TwoTheta < main_det_mask_angle. It's unlikely this will affect any pixel in # the wing or midrange detectors, but it's possible if they're located close enough to the beam axis MaskAngle(Workspace=flood_ws, MaxAngle=self._main_det_mask_angle, Angle="TwoTheta") else: masking = list(circular_mask_from_beam_center(flood_ws, self._beam_center_radius)) apply_mask(flood_ws, mask=masking) # data_ws reference shall not be invalidated here! # Set uncertainties # output: masked are zero intensity and zero error masked_flood_ws = set_init_uncertainties(flood_ws) return masked_flood_ws
[docs] def set_transmission_correction(self, transmission_flood_runs, transmission_reference_runs, beam_trap_factor=2): """Set transmission beam run and transmission flood runs Parameters ---------- transmission_flood_runs : int or tuple or list transmission flood runs transmission_reference_runs : int or tuple or list transmission reference runs beam_trap_factor : float, int factor to beam trap size for masking angle Returns ------- """ def format_run_or_runs(run_s): """Format input run or runs to list of run or file names Parameters ---------- run_s: int or str or ~list a run, a NeXus file name, or a list of runs Returns ------- ~list """ if isinstance(run_s, (int, str)): # an integer or string as run number run_list = [run_s] else: # a sequence, tuple or list run_list = list(run_s) return run_list # transmission reference self._transmission_reference_runs = format_run_or_runs(transmission_reference_runs) # transmission flood self._transmission_flood_runs = format_run_or_runs(transmission_flood_runs) # Set the beam trap factor for transmission reference and flood run to mask angle self._biosans_beam_trap_factor = beam_trap_factor
[docs] def set_theta_dependent_correction_flag(self, flag): """Set the flag to do theta dep with transmission correction Parameters ---------- flag : bool True to do the correction Returns ------- """ self._theta_dep_correction = flag
def _apply_transmission_correction(self, flood_ws, transmission_beam_run, transmission_flood_run, beam_center): """Calculate and apply transmission correction Parameters ---------- flood_ws : MarixWorkspace Flood run workspace to transmission correct workspace transmission_beam_run : int or str run number for transmission beam run transmission_flood_run : int or str run number for transmission flood run beam_center : ~tuple detector center Returns ------- MatrixWorkspace Flood workspace with transmission corrected """ if isinstance(transmission_beam_run, str) and os.path.exists(transmission_beam_run): sans_data = transmission_beam_run elif ( isinstance(transmission_beam_run, str) and transmission_beam_run.isdigit() or isinstance(transmission_beam_run, int) ): sans_data = "{}_{}".format(self._instrument, transmission_beam_run) else: raise TypeError( f"Transmission run {transmission_beam_run} of type {type(transmission_beam_run)} " f"is not supported to load a NeXus run from it" ) transmission_workspace = prepare_data( data=sans_data, pixel_calibration=self._apply_calibration, mask=self._default_mask, btp=self._extra_mask_dict, solid_angle=False, output_workspace="TRANS_{}_{}".format(self._instrument, transmission_beam_run), **self._prepare_data_opts(beam_center), ) # Apply mask apply_mask(transmission_workspace, Components=self.curved_detectors) MaskAngle( Workspace=transmission_workspace, MinAngle=self._biosans_beam_trap_factor * self._main_det_mask_angle, Angle="TwoTheta", ) if not os.path.exists(transmission_flood_run): # given run number: form to CG3_XXX mtd_trans_run = "{}_{}".format(self._instrument, transmission_flood_run) else: # already a file path mtd_trans_run = transmission_flood_run transmission_flood_ws = prepare_data( data=mtd_trans_run, pixel_calibration=self._apply_calibration, mask=self._default_mask, btp=self._extra_mask_dict, solid_angle=False, output_workspace="TRANS_{}_{}".format(self._instrument, transmission_flood_run), **self._prepare_data_opts(beam_center), ) # Apply mask apply_mask(transmission_flood_ws, Components=self.curved_detectors) MaskAngle( Workspace=transmission_flood_ws, MinAngle=self._biosans_beam_trap_factor * self._main_det_mask_angle, Angle="TwoTheta", ) # Zero-Angle Transmission Co-efficients transmission_corr_ws = calculate_transmission(transmission_flood_ws, transmission_workspace) average_zero_angle = np.mean(transmission_corr_ws.readY(0)) average_zero_angle_error = np.linalg.norm(transmission_corr_ws.readE(0)) logger.notice( f"Transmission Coefficient is {average_zero_angle:.3f} +/- " f"{average_zero_angle_error:.3f}." f"Transmission flood {str(transmission_flood_ws)} and " f"transmission {str(transmission_workspace)}" ) # Apply calculated transmission flood_ws = apply_transmission_correction( flood_ws, trans_workspace=transmission_corr_ws, theta_dependent=self._theta_dep_correction, ) return flood_ws
[docs] def prepare_spice_sensitivities_correction( component: str, flood_run: SpiceRun, direct_beam_run: SpiceRun, dark_current_run: SpiceRun, apply_solid_angle_correction: bool, transmission_flood_run: SpiceRun, transmission_reference_run: SpiceRun, beam_trap_size_factor: float, apply_theta_dependent_correction: bool, universal_mask_file: Union[str, None], pixels_to_mask: str, beam_center_mask_radius: float, main_detector_mask_angle: float, wing_detector_mask_angle: float, min_count_threshold: float, max_count_threshold: float, sensitivity_file_name: str, nexus_dir: str = None, ): """Prepare sensitivities from SPICE files Parameters ---------- component: str Detector panel to use. One of "detector1" or "wing_detector" flood_run: SpiceRun flood run direct_beam_run: SpiceRun or None direct beam run dark_current_run: SpiceRun dark current run apply_solid_angle_correction: bool Flag to apply solid angle correction to flood run transmission_flood_run: SpiceRun transmission flood run transmission_reference_run: SpiceRun transmission reference run beam_trap_size_factor: float size factor of beam trap given by user apply_theta_dependent_correction: bool Flag to apply theta dependent correction to transmission run universal_mask_file: str path to mask file applied to all the runs pixels_to_mask: str lists of pixels (IDs) to mask beam_center_mask_radius: float radius of mask for beam center in mm main_detector_mask_angle: float angle for main detector mask wing_detector_mask_angle: float deprecated min_count_threshold: float minimum normalized count threshold as a good pixel max_count_threshold: float maximum normalized count threshold as a good pixel sensitivity_file_name: str output file name with full path nexus_dir: str or None directory for nexus file. None for default. """ if wing_detector_mask_angle is not None: logger.notice("Option wing_detector_mask_angle is deprecated.") # Set up sensitivities preparation configurations if component not in ["detector1", "wing_detector"]: # old spice data lacks the midrange detector raise ValueError(f"Unknown component {component}. Must be one of 'detector1' or 'wing_detector'") preparer = PrepareSensitivityCorrection(component=component) # Load flood runs preparer.set_flood_runs([flood_run.unique_nexus_name(nexus_dir, True)]) # Process beam center runs if direct_beam_run is not None: preparer.set_direct_beam_runs([direct_beam_run.unique_nexus_name(nexus_dir, True)]) # Set extra masks preparer.set_masks( universal_mask_file, pixels_to_mask, main_det_mask_angle=main_detector_mask_angle, ) # Set beam center radius if beam_center_mask_radius is not None: preparer.set_beam_center_radius(beam_center_mask_radius) # Transmission if transmission_reference_run is not None: trans_flood_file = transmission_flood_run.unique_nexus_name(nexus_dir, True) trans_ref_file = transmission_reference_run.unique_nexus_name(nexus_dir, True) preparer.set_transmission_correction( transmission_flood_runs=[trans_flood_file], transmission_reference_runs=[trans_ref_file], beam_trap_factor=beam_trap_size_factor, ) preparer.set_theta_dependent_correction_flag(apply_theta_dependent_correction) # Dark runs if dark_current_run is not None: dark_curr_file = dark_current_run.unique_nexus_name(nexus_dir, True) preparer.set_dark_current_runs([dark_curr_file]) # solid angle correction preparer.set_solid_angle_correction_flag(apply_solid_angle_correction) # Run moving_detector = False preparer.execute( moving_detector, min_count_threshold, max_count_threshold, sensitivity_file_name, enforce_use_nexus_idf=True, debug_mode=True, )