Source code for drtsans.prepare_sensivities_correction

# local imports
from drtsans.load import load_events
from drtsans.geometry import panel_names
from drtsans.instruments import extract_run_number
from drtsans.mask_utils import circular_mask_from_beam_center, apply_mask
import drtsans.mono.gpsans
import drtsans.mono.biosans
from drtsans.process_uncertainties import set_init_uncertainties
from drtsans.sensitivity_correction_moving_detectors import (
    calculate_sensitivity_correction as calculate_sensitivity_correction_moving,
)
from drtsans.sensitivity_correction_patch import (
    calculate_sensitivity_correction as calculate_sensitivity_correction_patch,
)
import drtsans.tof.eqsans

# third party imports
import h5py
from mantid.api import mtd
from mantid.kernel import logger
from mantid.simpleapi import (
    SaveNexusProcessed,
    Integration,
    MaskDetectors,
    CreateWorkspace,
)
import numpy as np

# standard imports
import os


# Constants
CG2 = "CG2"
CG3 = "CG3"
EQSANS = "EQSANS"
PIXEL = "Pixel"
MOVING_DETECTORS = "Moving Detectors"
PATCHING_DETECTORS = "Patching Detectors"

# As this script is a wrapper to handle script prepare_sensitivity
# (e.g. https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/scripts%2Fprepare_sensitivities_biosans.py),
# by which user just needs to set up instrument name but not is required to import the right modules
# (for example drtsans.mono.gpsans or drtsans.tof.eqsans),
# therefore it has to import the correct ones according instrument name in string.
# Using dictionary with instrument name as key is solution for it.

# prepare data  in .../api.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fmono%2Fgpsans%2Fapi.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fmono%2Fbiosans%2Fapi.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Ftof%2Feqsans%2Fapi.py
PREPARE_DATA = {
    CG2: drtsans.mono.gpsans.api.prepare_data,
    CG3: drtsans.mono.biosans.api.prepare_data,
    EQSANS: drtsans.tof.eqsans.api.prepare_data,
}

# Find beam center in .../find_beam_center.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fmono%2Fbiosans%2Fbeam_finder.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fbeam_finder.py
FIND_BEAM_CENTER = {
    CG2: drtsans.mono.gpsans.find_beam_center,
    CG3: drtsans.mono.biosans.find_beam_center,
    EQSANS: drtsans.tof.eqsans.find_beam_center,
}

# Center detector in
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fmono%2Fbiosans%2Fbeam_finder.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fbeam_finder.py
CENTER_DETECTOR = {
    CG2: drtsans.mono.gpsans.center_detector,
    CG3: drtsans.mono.biosans.center_detector,
    EQSANS: drtsans.tof.eqsans.center_detector,
}

# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fsolid_angle.py
SOLID_ANGLE_CORRECTION = {
    CG2: drtsans.mono.gpsans.solid_angle_correction,
    CG3: drtsans.mono.biosans.solid_angle_correction,
    EQSANS: drtsans.solid_angle_correction,
}

# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fsensitivity_correction_moving_detectors.py
# https://code.ornl.gov/sns-hfir-scse/sans/sans-backend/blob/next/drtsans%2Fsensitivity_correction_patch.py
CALCULATE_SENSITIVITY_CORRECTION = {
    MOVING_DETECTORS: calculate_sensitivity_correction_moving,
    PATCHING_DETECTORS: calculate_sensitivity_correction_patch,
}


[docs] class PrepareSensitivityCorrection(object): """Workflow (container) class to prepare sensitivities correction file It tries to manage the various configuration and approaches for instrument scientists to prepare sensitivities file for EQSANS, GPSANS and BIOSANS. """ @staticmethod def _get_masked_detectors(workspace): """Get the detector masking information Parameters ---------- workspace : ~mantid.api.MatrixWorkspace Workspace to get masked detectors' masking status Returns ------- numpy.ndarray (N, 1) bool array, True for being masked """ # The masked pixels after `set_uncertainties()` will have zero uncertainty. # Thus, it is an efficient way to identify them by check uncertainties (E) close to zero masked_array = workspace.extractE() < 1e-5 return masked_array @staticmethod def _set_mask_value(flood_workspace, det_mask_array, use_moving_detector_method=True): """Set masked pixels' values to NaN or -infinity according to mask type and sensitivity correction algorithm Parameters ---------- flood_workspace : Flood data workspace space to have masked pixels' value set det_mask_array : numpy.ndarray Array to indicate pixel to be masked or not use_moving_detector_method : bool True for calculating sensitivities by moving detector algorithm; otherwise for detector patching algorithm Returns ------- """ # Complete mask array. Flood workspace has been processed by set_uncertainties. Therefore all the masked # pixels' uncertainties are zero, which is different from other pixels total_mask_array = flood_workspace.extractE() < 1e-6 # Loop through each detector pixel to check its masking state to determine whether its value shall be # set to NaN, -infinity or not changed (i.e., for pixels without mask) num_spec = flood_workspace.getNumberHistograms() problematic_pixels = list() for i in range(num_spec): if total_mask_array[i][0] and use_moving_detector_method: # Moving detector algorithm. Any masked detector pixel is set to NaN flood_workspace.dataY(i)[0] = np.nan flood_workspace.dataE(i)[0] = np.nan elif total_mask_array[i][0] and not use_moving_detector_method and det_mask_array[i][0]: # Patch detector method: Masked as the bad pixels and thus set to NaN flood_workspace.dataY(i)[0] = np.nan flood_workspace.dataE(i)[0] = np.nan elif total_mask_array[i][0]: # Patch detector method: Pixels that have not been masked as bad pixels, but have been # identified as needing to have values set by the patch applied. To identify them, the # value is set to -INF. flood_workspace.dataY(i)[0] = np.NINF flood_workspace.dataE(i)[0] = np.NINF elif not total_mask_array[i][0] and not use_moving_detector_method and det_mask_array[i][0]: # Logic error: impossible case problematic_pixels.append(i) # END-FOR # Array if len(problematic_pixels) > 0: raise RuntimeError( f"Impossible case: pixels {problematic_pixels} has local detector mask is on, " f"but total mask is off" ) logger.debug( "Patch detector method: Pixels that have not been masked as bad pixels, but have been" "identified as needing to have values set by the patch applied. To identify them, the" "value is set to -INF." ) logger.notice("Number of infinities = {}".format(len(np.where(np.isinf(flood_workspace.extractY()))[0]))) logger.debug("Moving/Patch detector algorithm. Any masked detector pixel is set to NaN") logger.notice("Number of NaNs = {}".format(len(np.where(np.isnan(flood_workspace.extractY()))[0]))) return flood_workspace
[docs] @staticmethod def sum_input_runs(flood_workspaces): """Do NaN sum to all input flood workspaces Parameters ---------- flood_workspaces Returns ------- """ from mantid.simpleapi import CloneWorkspace # Do NaN sum y_list = [ws.extractY() for ws in flood_workspaces] y_matrix = np.array(y_list) nan_sum_matrix = np.nansum(y_matrix, axis=0) # clone a workspace cloned = CloneWorkspace(InputWorkspace=flood_workspaces[0], OutputWorkspace="FloodSum") for iws in range(cloned.getNumberHistograms()): cloned.dataY(iws)[0] = nan_sum_matrix[iws][0] # output SaveNexusProcessed(InputWorkspace=cloned, Filename="SummedFlood.nxs")
def __init__(self, instrument, component="detector1"): """Initialization Parameters ---------- instrument : str instrument name. One of CG2, CG3, EQSANS component : str One of the detector panels of the instrument (e.g. detector1, wing_detector, midrange_detector) """ if instrument not in [CG2, CG3, EQSANS]: raise RuntimeError("Instrument {} is not supported".format(instrument)) self._instrument = instrument # validate the component as part of the instrument if component not in panel_names(instrument): raise RuntimeError(f"Component {component} is not part of {instrument}") self._component = component # flood runs self._flood_runs = None # direct beam (center) runs self._direct_beam_center_runs = None # mask self._default_mask = None self._extra_mask_dict = dict() self._beam_center_radius = None # mm # flag to enforce to use IDF XML in NeXus file; otherwise, it may use IDF from Mantid library self._enforce_use_nexus_idf = False # Transmission correction self._transmission_reference_runs = None self._transmission_flood_runs = None # Dark current self._dark_current_runs = None # Pixel calibration default self._apply_calibration = False # Apply solid angle correction or not? self._solid_angle_correction = False @property def beam_center_radius(self) -> float: return self._beam_center_radius
[docs] def set_pixel_calibration_flag(self, apply_calibration): """Set the flag to apply pixel calibrations. Parameters ---------- apply_calibration : bool, str Flag for applying the pixel calibration. No calibration (False), Default (True), Calibration file (str) """ self._apply_calibration = apply_calibration
[docs] def set_solid_angle_correction_flag(self, apply_correction): """Set the flag to apply solid angle correction Parameters ---------- apply_correction : bool Flag for applying Returns ------- """ self._solid_angle_correction = apply_correction
[docs] def set_flood_runs(self, flood_runs): """Set flood run numbers Parameters ---------- flood_runs : ~list or int or tuple list of run number as integers Returns ------- None """ # Process flood runs if isinstance(flood_runs, (int, str)): self._flood_runs = [flood_runs] else: self._flood_runs = list(flood_runs)
[docs] def set_dark_current_runs(self, dark_current_runs): """Set dark current runs Parameters ---------- dark_current_runs : ~list or ~tuple or int Dark current run(s)'s run number(s) Returns ------- None """ if dark_current_runs is None: self._dark_current_runs = None elif isinstance(dark_current_runs, (int, str)): self._dark_current_runs = [dark_current_runs] else: self._dark_current_runs = list(dark_current_runs)
[docs] def set_direct_beam_runs(self, direct_beam_runs): """Set direct beam runs Parameters ---------- direct_beam_runs : ~list or int or tuple Returns ------- """ if isinstance(direct_beam_runs, (int, str)): # defined as a single value self._direct_beam_center_runs = [direct_beam_runs] else: # shall be a list or tuple self._direct_beam_center_runs = list(direct_beam_runs)
[docs] def set_masks(self, default_mask, pixels): """Set masks Parameters ---------- default_mask : str or ~mantid.api.MaskWorkspace or :py:obj:`list` or None Mask to be applied. If :py:obj:`list`, it is a list of detector ID's. mask file name pixels : str or None pixels to mask. Example: '1-8,249-256' Returns ------- None """ # default mask file or list of detector IDS or MaskWorkspace if default_mask is not None: self._default_mask = default_mask # pixels to mask if pixels is not None: self._extra_mask_dict[PIXEL] = pixels
[docs] def set_beam_center_radius(self, radius): """Set beam center radius Parameters ---------- radius : float radius in mm Returns ------- """ self._beam_center_radius = radius
def _get_beam_center_run(self, index): r""" Input beam center associated to a particular flood run. Parameters ---------- index : int beam center run index mapped to flood run index Returns ------- int, str beam center run number (as an int) or absolute file path (as a string) """ if self._direct_beam_center_runs is None: raise RuntimeError("Beam center runs must be given for {}".format(self._instrument)) return self._direct_beam_center_runs[index] def _prepare_data_opts(self, beam_center): r""" Set additional options for function prepare_data Parameters ---------- beam_center : list beam center for each of the double panels Returns ------- dict """ opts = dict(center_x=beam_center[0], center_y=beam_center[1], flux_method=None) if self._instrument in [CG2, CG3]: opts["flux_method"] = "monitor" opts["overwrite_instrument"] = False opts["enforce_use_nexus_idf"] = self._enforce_use_nexus_idf return opts def _get_beam_center_workspace(self, beam_center_run): r""" Load and prepare the beam center run with customary corrections Parameters ---------- beam_center_run : str instrument name plus run number or absolute file path Returns ------- ~mantid.api.Workspace2D """ # elucidate the name for the output workspace if os.path.exists(beam_center_run): output_workspace = f"BC_{self._instrument}_{extract_run_number(beam_center_run)}" else: output_workspace = f"BC_{beam_center_run}" beam_center_workspace = PREPARE_DATA[self._instrument]( data=beam_center_run, pixel_calibration=self._apply_calibration, mask=self._default_mask, btp=self._extra_mask_dict, solid_angle=False, output_workspace=output_workspace, **self._prepare_data_opts(beam_center=[0.0, 0.0]), ) return beam_center_workspace def _calculate_beam_center(self, index): """Find beam centers for all flood runs Beam center run shall be (1) masked properly (default mask + top/bottom) (2) NOT corrected by solid angle Parameters ---------- index : int beam center run index mapped to flood run Returns ------- ~tuple Beam center as xc, yc and possible wc for BIOSANS """ # Process the input run chosen to calculate the beam center beam_center_run = self._get_beam_center_run(index) if isinstance(beam_center_run, str) and beam_center_run.isdigit(): beam_center_run = int(beam_center_run) # convert to int if possible if isinstance(beam_center_run, int): beam_center_run = "{}_{}".format(self._instrument, beam_center_run) elif isinstance(beam_center_run, str): assert os.path.exists(beam_center_run), f"Bean center run {beam_center_run} cannot be found" else: raise RuntimeError(f"Beam center run {beam_center_run} is not recognized") beam_center_workspace = self._get_beam_center_workspace(beam_center_run) beam_center = FIND_BEAM_CENTER[self._instrument](beam_center_workspace) return beam_center[:-1] # last item is the fit results, useless here def _prepare_flood_data(self, flood_run, beam_center, dark_current_run): """Prepare flood data including (1) load (2) mask: default, pixels (3) center detector (4) optionally solid angle correction (5) optionally dark current correction (6) normalization Parameters ---------- flood_run: int, str flood run number of flood file path beam_center dark_current_run enforce_use_nexus_idf: bool flag to enforce to use IDF XML in NeXus file; otherwise, it may use IDF from Mantid library Returns ------- """ if dark_current_run is not None: if isinstance(dark_current_run, str) and os.path.exists(dark_current_run): pass # dark current run (given) is a data file: do nothing else: # dark current is a run number either as an integer or a string cast from integer dark_current_run = "{}_{}".format(self._instrument, dark_current_run) # Load data with masking: returning to a list of workspace references # processing includes: load, mask, normalize by monitor if isinstance(flood_run, str) and flood_run.isdigit(): # run number passed on as a string flood_run = int(flood_run) # unequivocally convert run numbers to integers if isinstance(flood_run, int): flood_run = "{}_{}".format(self._instrument, flood_run) elif isinstance(flood_run, str): # if still a string, it must be an absolute file path assert os.path.exists(flood_run) else: raise ValueError(f"Flood run {flood_run} is not recognized") flood_ws = PREPARE_DATA[self._instrument]( data=flood_run, pixel_calibration=self._apply_calibration, mask=self._default_mask, btp=self._extra_mask_dict, dark_current=dark_current_run, solid_angle=self._solid_angle_correction, **self._prepare_data_opts(beam_center), ) # Integrate all the wavelength bins if necessary if flood_ws.blocksize() != 1: flood_ws = Integration(InputWorkspace=flood_ws, OutputWorkspace=str(flood_ws)) return flood_ws def _mask_beam_center(self, flood_ws, beam_center): """Mask beam center Mask beam center with two algorithms, tried in sequence: 1. if beam center mask file is present, use it. It's assumed it will mask the beam center pixels and all detector panels but the one of interest. 1. Mask all detector pixels within a distance of property "beam_center_radius" from the beam axis. Parameters ---------- flood_ws : ~mantid.api.MatrixWorkspace Mantid workspace for flood data beam_center : tuple or str if tuple, beam centers (xc, yc, wc) / (xc, yc); str: beam center masks file Returns ------- """ if isinstance(beam_center, str): apply_mask(flood_ws, mask=beam_center) else: masking = list(circular_mask_from_beam_center(flood_ws, self._beam_center_radius)) apply_mask(flood_ws, mask=masking) # data_ws reference shall not be invalidated here! # Set uncertainties # output: masked are zero intensity and zero error masked_flood_ws = set_init_uncertainties(flood_ws) return masked_flood_ws def _export_sensitivity(self, sensitivity_ws, output_nexus_name, parent_flood_run): """Process and export sensitivities to a processed NeXus file Parameters ---------- sensitivity_ws : ~mantid.api.MatrixWorkspace MatrixWorkspace containing sensitivity and error output_nexus_name : str Output NeXus file name parent_flood_run : int Flood run number to create parent workspace for sensitivity workspace Returns ------- None """ # Create a new workspace for output instrument_name = {CG2: "GPSANS", CG3: "BIOSANS", EQSANS: "EQSANS_"}[self._instrument] if isinstance(parent_flood_run, int): event_nexus = "{}{}".format(instrument_name, parent_flood_run) else: # must be a nexus file already event_nexus = parent_flood_run assert os.path.exists(event_nexus) parent_ws = load_events(run=event_nexus, MetaDataOnly=True, LoadNexusInstrumentXML=self._enforce_use_nexus_idf) # Create new sensitivity workspace new_sens_name = "{}_new".format(str(sensitivity_ws)) new_sensitivity_ws = CreateWorkspace( DataX=sensitivity_ws.extractX().flatten(), DataY=sensitivity_ws.extractY().flatten(), DataE=sensitivity_ws.extractE().flatten(), NSpec=parent_ws.getNumberHistograms(), ParentWorkspace=parent_ws, OutputWorkspace=new_sens_name, ) # Mask detectors mask_ws_indexes = list() for iws in range(new_sensitivity_ws.getNumberHistograms()): # get the workspace with -infinity or NaN for masking if np.isnan(new_sensitivity_ws.readY(iws)[0]) or np.isinf(new_sensitivity_ws.readY(iws)[0]): mask_ws_indexes.append(iws) MaskDetectors(Workspace=new_sensitivity_ws, WorkspaceIndexList=mask_ws_indexes) # Set all the mask values to NaN new_sensitivity_ws = mtd[new_sens_name] for iws in mask_ws_indexes: new_sensitivity_ws.dataY(iws)[0] = np.nan # Save SaveNexusProcessed(InputWorkspace=new_sensitivity_ws, Filename=output_nexus_name) def _apply_transmission_correction(self, *args): r"""Calculate and apply transmission correction Warnings -------- This method is implemented only for CG3 Raises ------ NotImplementedError """ raise NotImplementedError("Transmission correction is not implemented")
[docs] def execute( self, use_moving_detector_method, min_threshold, max_threshold, output_nexus_name, enforce_use_nexus_idf=False, debug_mode=False, ): """Main workflow method to calculate sensitivities correction Parameters ---------- use_moving_detector_method : bool Flag to use 'moving detectors' method; Otherwise, use 'detector patch' method min_threshold : float minimum threshold of normalized count for GOOD pixels max_threshold : float maximum threshold of normalized count for GOOD pixels output_nexus_name : str path to the output processed NeXus file enforce_use_nexus_idf: bool flag to enforce to use IDF XML in NeXus file; otherwise, it may use IDF from Mantid library debug_mode: bool flag for debugging mode Returns ------- None """ # Number of pair of workspaces to process num_workspaces_set = len(self._flood_runs) self._enforce_use_nexus_idf = enforce_use_nexus_idf # Load beam center runs and calculate beam centers beam_centers = list() for i in range(num_workspaces_set): beam_center_i = self._calculate_beam_center(i) beam_centers.append(beam_center_i) logger.notice("Calculated beam center ({}-th) = {}".format(i, beam_center_i)) # Set default value to dark current runs if self._dark_current_runs is None: self._dark_current_runs = [None] * num_workspaces_set # Load and process flood data with (1) mask (2) center detector and (3) solid angle correction flood_workspaces = list() for i in range(num_workspaces_set): flood_ws_i = self._prepare_flood_data(self._flood_runs[i], beam_centers[i], self._dark_current_runs[i]) flood_workspaces.append(flood_ws_i) logger.notice(f"Load {i}-th flood run {self._flood_runs[i]} to " f"{flood_ws_i}") # Retrieve masked detectors before masking the beam center. These are termed "bad pixels" if not use_moving_detector_method: bad_pixels_list = list() for i in range(num_workspaces_set): bad_pixels_list.append(self._get_masked_detectors(flood_workspaces[i])) else: bad_pixels_list = [None] * num_workspaces_set # Mask beam centers for i in range(num_workspaces_set): flood_workspaces[i] = self._mask_beam_center(flood_workspaces[i], beam_centers[i]) # Transmission correction as an option if self._transmission_reference_runs is not None: for i in range(num_workspaces_set): flood_workspaces[i] = self._apply_transmission_correction( flood_workspaces[i], self._transmission_reference_runs[i], self._transmission_flood_runs[i], beam_centers[i], ) # Set the masked pixels' counts to nan and -infinity for i in range(num_workspaces_set): flood_workspaces[i] = self._set_mask_value( flood_workspaces[i], bad_pixels_list[i], use_moving_detector_method ) info = "Preparation of data is over.\n" for fws in flood_workspaces: info += ( f"{str(fws)}: Number of infinities = {len(np.where(np.isinf(fws.extractY()))[0])}," f"Number of NaNs = {len(np.where(np.isnan(fws.extractY()))[0])}\n" ) logger.notice(info) # Debug output if debug_mode: for flood_ws in flood_workspaces: SaveNexusProcessed(InputWorkspace=flood_ws, Filename=f"{str(flood_ws)}_flood.nxs") # Decide algorithm to prepare sensitivities if self._instrument in [CG2, CG3] and use_moving_detector_method is True: if debug_mode: # nan.sum all the input flood runs to check the coverage of summed counts self.sum_input_runs(flood_workspaces) # Prepare by using moving detector algorithm calculate_sensitivity_correction = CALCULATE_SENSITIVITY_CORRECTION[MOVING_DETECTORS] # Calculate sensitivities for each file sens_ws = calculate_sensitivity_correction( flood_workspaces, threshold_min=min_threshold, threshold_max=max_threshold, ) else: # Prepare by using the sensitivity patch method for a single detector (image) # Such as GPSANS, BIOSANS Main detector, BIOSANS wing detector, EQSANS calculate_sensitivity_correction = CALCULATE_SENSITIVITY_CORRECTION[PATCHING_DETECTORS] # Default polynomial order: CG3 uses order 3. Others use order 2. if self._instrument == CG3: polynomial_order = 3 else: polynomial_order = 2 # This only processes a single image, even for the Bio-SANS. # Each detector on the Bio-SANS must be treated independently sens_ws = calculate_sensitivity_correction( flood_workspaces[0], min_threshold=min_threshold, max_threshold=max_threshold, poly_order=polynomial_order, min_detectors_per_tube=50, component_name=self._component, ) # Export self._export_sensitivity(sens_ws, output_nexus_name, self._flood_runs[0])
[docs] def debug_output(workspace, output_file): """Exporting a workspace to NeXus file and HDF5 for debugging purpose Parameters ---------- workspace : numpy.ndarray data to plot output_file : str output file name as reference Returns ------- None """ # Save Nexus SaveNexusProcessed(InputWorkspace=workspace, Filename=output_file) data = workspace.extractY() data_error = workspace.extractE() # Export sensitivities calculated in file for quick review # Export to hdf5 for a quick review sens_h5 = h5py.File("{}.h5".format(output_file.split(".")[0]), "w") sens_group = sens_h5.create_group("Data") sens_group.create_dataset("Data", data=data) if data_error is not None: sens_group.create_dataset("Data error", data=data_error) sens_h5.close()