Source code for drtsans.mono.gpsans.api

""" GPSANS API """
# standard imports
from collections import namedtuple
import copy
from datetime import datetime
import os
from typing import List, Union

# third party imports
from mantid.simpleapi import logger, mtd, MaskDetectors, MoveInstrumentComponent, SaveNexusProcessed
from matplotlib.colors import LogNorm
import matplotlib.pyplot as plt

# local imports
from drtsans import getWedgeSelection, subtract_background
from drtsans.beam_finder import center_detector, fbc_options_json, find_beam_center
from drtsans.dataobjects import save_iqmod
from drtsans.instruments import extract_run_number, instrument_filesystem_name
from drtsans.iq import bin_all
from drtsans.load import move_instrument, resolve_slicing
from drtsans.mask_utils import apply_mask, load_mask
from drtsans.mono import meta_data
from drtsans.mono.absolute_units import empty_beam_scaling
from drtsans.mono.dark_current import subtract_dark_current
from drtsans.mono.gpsans import convert_to_q
from drtsans.mono.gpsans.attenuation import attenuation_factor
from drtsans.mono.load import (
    load_and_split,
    load_events,
    load_events_and_histogram,
    set_init_uncertainties,
    transform_to_wavelength,
)
from drtsans.mono.meta_data import get_sample_detector_offset, parse_json_meta_data, set_meta_data
from drtsans.mono.normalization import normalize_by_monitor, normalize_by_time
from drtsans.path import allow_overwrite
from drtsans.mono.transmission import apply_transmission_correction, calculate_transmission
from drtsans.path import abspath, abspaths, registered_workspace
from drtsans.plots import plot_detector, plot_IQazimuthal, plot_IQmod
from drtsans.reductionlog import savereductionlog
from drtsans.samplelogs import SampleLogs
from drtsans.sensitivity import apply_sensitivity_correction, load_sensitivity_workspace
from drtsans.settings import namedtuplefy
from drtsans.solid_angle import solid_angle_correction
from drtsans.thickness_normalization import normalize_by_thickness
from drtsans.save_ascii import save_ascii_binned_2D

# Functions exposed to the general user (public) API
__all__ = [
    "prepare_data",
    "prepare_data_workspaces",
    "process_single_configuration",
    "load_all_files",
    "plot_reduction_output",
    "reduce_single_configuration",
    "adjust_back_panels_to_effective_position",
]


SI_WINDOW_NOMINAL_DISTANCE_METER = 0.0  # meter, (i.e., 0. mm)
SAMPLE_SI_META_NAME = "CG2:CS:SampleToSi"


[docs] @namedtuplefy def load_all_files( reduction_input, prefix="", load_params=None, path: Union[str, List[str]] = None, use_nexus_idf: bool = False, debug_output: bool = False, back_panel_correction: bool = False, ): """load all required files at the beginning, and transform them to histograms Parameters ---------- reduction_input prefix load_params path Absolute path to one or more directories where to search for the NeXus files use_nexus_idf: bool Flag to use IDF inside NeXus file. True for SPICE data debug_output: bool Flag to save out internal result back_panel_correction: bool Move the z direction of back panel to remove artifacts in azimuthal plots Returns ------- """ reduction_config = reduction_input["configuration"] instrument_name = reduction_input["instrumentName"] ipts = reduction_input["iptsNumber"] sample = reduction_input["sample"]["runNumber"] sample_trans = reduction_input["sample"]["transmission"]["runNumber"] bkgd = reduction_input["background"]["runNumber"] bkgd_trans = reduction_input["background"]["transmission"]["runNumber"] empty = reduction_input["emptyTransmission"]["runNumber"] center = reduction_input["beamCenter"]["runNumber"] blocked_beam = reduction_config["blockedBeamRunNumber"] # Remove existing workspaces, this is to guarantee that all the data is loaded correctly # In the future this should be made optional ws_to_remove = [ f"{prefix}_{instrument_name}_{run_number}_raw_histo" for run_number in ( sample, center, bkgd, empty, sample_trans, bkgd_trans, blocked_beam, ) ] ws_to_remove.append(f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group") ws_to_remove.append(f"{prefix}_sensitivity") ws_to_remove.append(f"{prefix}_mask") if reduction_config["darkFileName"]: run_number = extract_run_number(reduction_config["darkFileName"]) ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo") for ws_name in ws_to_remove: if registered_workspace(ws_name): mtd.remove(ws_name) # sample offsets, etc if load_params is None: load_params = {} # load nexus idf if use_nexus_idf: load_params["LoadNexusInstrumentXML"] = use_nexus_idf # Adjust pixel heights and widths load_params["pixel_calibration"] = reduction_config.get("usePixelCalibration", False) # wave length and wave length spread ( wave_length_dict, wave_length_spread_dict, ) = meta_data.parse_json_wave_length_and_spread(reduction_input) if reduction_config["useDefaultMask"]: # reduction_config["defaultMask"] is a list of python dictionaries default_mask = reduction_config["defaultMask"] if reduction_config["defaultMask"] is not None else [] else: default_mask = [] if path is None: path = f"/HFIR/{instrument_filesystem_name(instrument_name)}/IPTS-{ipts}/nexus" assert os.path.exists(path), "NeXus file path {} does not exist".format(path) # sample thickness # thickness is written to sample log if it is defined... thickness = reduction_input["sample"]["thickness"] # sample aperture diameter in mm sample_aperture_diameter = reduction_config["sampleApertureSize"] # source aperture diameter in mm source_aperture_diameter = reduction_config["sourceApertureDiameter"] # Overwriting pixel size X and pixel size Y smearing_pixel_size_x_dict = parse_json_meta_data( reduction_input, "smearingPixelSizeX", 1e-3, beam_center_run=True, background_run=True, empty_transmission_run=True, transmission_run=True, background_transmission=True, block_beam_run=True, dark_current_run=True, ) smearing_pixel_size_y_dict = parse_json_meta_data( reduction_input, "smearingPixelSizeY", 1e-3, beam_center_run=True, background_run=True, empty_transmission_run=True, transmission_run=True, background_transmission=True, block_beam_run=True, dark_current_run=True, ) # Retrieve parameters for overwriting geometry related meta data # Sample to Si-window distance swd_value_dict = parse_json_meta_data( reduction_input, "sampleToSi", 1e-3, beam_center_run=True, background_run=True, empty_transmission_run=True, transmission_run=True, background_transmission=True, block_beam_run=True, dark_current_run=False, ) # Sample to detector distance sdd_value_dict = parse_json_meta_data( reduction_input, "sampleDetectorDistance", 1.0, beam_center_run=True, background_run=True, empty_transmission_run=True, transmission_run=True, background_transmission=True, block_beam_run=True, dark_current_run=False, ) # check for time/log slicing timeslice, logslice = resolve_slicing(reduction_input) # special loading case for sample to allow the slicing options logslice_data_dict = {} if timeslice or logslice: # Load data and split ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group" if not registered_workspace(ws_name): filename = abspath(sample.strip(), instrument=instrument_name, ipts=ipts, directory=path) logger.notice(f"Loading filename {filename} to slice") if timeslice: timesliceinterval = reduction_config["timeSliceInterval"] timesliceperiod = reduction_config["timeSlicePeriod"] logslicename = logsliceinterval = None elif logslice: timesliceinterval, timesliceperiod = None, None logslicename = reduction_config["logSliceName"] logsliceinterval = reduction_config["logSliceInterval"] load_and_split( filename, output_workspace=ws_name, time_interval=timesliceinterval, time_offset=reduction_config["timeSliceOffset"], time_period=timesliceperiod, log_name=logslicename, log_value_interval=logsliceinterval, sample_to_si_name=SAMPLE_SI_META_NAME, si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER, sample_to_si_value=swd_value_dict[meta_data.SAMPLE], sample_detector_distance_value=sdd_value_dict[meta_data.SAMPLE], **load_params, ) for _w in mtd[ws_name]: # Overwrite meta data set_meta_data( str(_w), wave_length=wave_length_dict[meta_data.SAMPLE], wavelength_spread=wave_length_spread_dict[meta_data.SAMPLE], sample_thickness=thickness, sample_aperture_diameter=sample_aperture_diameter, source_aperture_diameter=source_aperture_diameter, smearing_pixel_size_x=smearing_pixel_size_x_dict[meta_data.SAMPLE], smearing_pixel_size_y=smearing_pixel_size_y_dict[meta_data.SAMPLE], ) # Transform X-axis to wave length with spread _w = transform_to_wavelength(_w) _w = set_init_uncertainties(_w) for btp_params in default_mask: apply_mask(_w, **btp_params) if logslicename is not None: for n in range(mtd[ws_name].getNumberOfEntries()): samplelogs = SampleLogs(mtd[ws_name].getItem(n)) logslice_data_dict[str(n)] = { "data": list(samplelogs[logslicename].value), "units": samplelogs[logslicename].units, "name": logslicename, } else: # Load single data ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo" if not registered_workspace(ws_name): filename = abspaths(sample, instrument=instrument_name, ipts=ipts, directory=path) logger.notice(f"Loading filename {filename} to {ws_name}") load_events_and_histogram( filename, output_workspace=ws_name, sample_to_si_name=SAMPLE_SI_META_NAME, si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER, sample_to_si_value=swd_value_dict[meta_data.SAMPLE], sample_detector_distance_value=sdd_value_dict[meta_data.SAMPLE], **load_params, ) # Overwrite meta data set_meta_data( ws_name, wave_length=wave_length_dict[meta_data.SAMPLE], wavelength_spread=wave_length_spread_dict[meta_data.SAMPLE], sample_thickness=thickness, sample_aperture_diameter=sample_aperture_diameter, source_aperture_diameter=source_aperture_diameter, smearing_pixel_size_x=smearing_pixel_size_x_dict[meta_data.SAMPLE], smearing_pixel_size_y=smearing_pixel_size_y_dict[meta_data.SAMPLE], ) # Re-transform to wave length if overwriting values are specified if wave_length_dict[meta_data.SAMPLE]: transform_to_wavelength(ws_name) logger.information( "[META] Wavelength range is from {} to {}" "".format(mtd[ws_name].readX(0)[0], mtd[ws_name].readX(0)[1]) ) # Apply mask for btp_params in default_mask: apply_mask(ws_name, **btp_params) reduction_input["logslice_data"] = logslice_data_dict # load all other files for run_number, run_type in [ (center, meta_data.BEAM_CENTER), (bkgd, meta_data.BACKGROUND), (empty, meta_data.EMPTY_TRANSMISSION), (sample_trans, meta_data.TRANSMISSION), (bkgd_trans, meta_data.TRANSMISSION_BACKGROUND), (blocked_beam, meta_data.BLOCK_BEAM), ]: if run_number: ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo" if not registered_workspace(ws_name): filename = abspaths(run_number, instrument=instrument_name, ipts=ipts, directory=path) logger.notice(f"Loading {run_type} filename {filename} to {ws_name}") load_events_and_histogram( filename, output_workspace=ws_name, sample_to_si_name=SAMPLE_SI_META_NAME, si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER, sample_to_si_value=swd_value_dict[run_type], sample_detector_distance_value=sdd_value_dict[run_type], **load_params, ) # Set the wave length and wave length spread set_meta_data( ws_name, wave_length=wave_length_dict[run_type], wavelength_spread=wave_length_spread_dict[run_type], sample_thickness=None, sample_aperture_diameter=None, source_aperture_diameter=None, smearing_pixel_size_x=smearing_pixel_size_x_dict[run_type], smearing_pixel_size_y=smearing_pixel_size_y_dict[run_type], ) if wave_length_dict[run_type]: # Transform X-axis to wave length with spread transform_to_wavelength(ws_name) for btp_params in default_mask: apply_mask(ws_name, **btp_params) # do the same for dark current if exists dark_current_file = reduction_config["darkFileName"] if dark_current_file: # dark current file is specified and thus loaded run_number = extract_run_number(dark_current_file) ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo" if not registered_workspace(ws_name): # load dark current file logger.notice(f"Loading dark current file {dark_current_file} to {ws_name}") # identify to use exact given path to NeXus or use OnCat instead dark_current_path = abspath(str(run_number), instrument=instrument_name, directory=path) if os.path.exists(dark_current_path): dark_current_file = dark_current_path load_events_and_histogram( dark_current_file, output_workspace=ws_name, sample_to_si_name=SAMPLE_SI_META_NAME, si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER, sample_to_si_value=swd_value_dict[meta_data.DARK_CURRENT], sample_detector_distance_value=sdd_value_dict[meta_data.DARK_CURRENT], **load_params, ) # Set the wave length and wave length spread set_meta_data( ws_name, wave_length=wave_length_dict[meta_data.DARK_CURRENT], wavelength_spread=wave_length_spread_dict[meta_data.DARK_CURRENT], sample_thickness=None, sample_aperture_diameter=None, source_aperture_diameter=None, smearing_pixel_size_x=smearing_pixel_size_x_dict[meta_data.DARK_CURRENT], smearing_pixel_size_y=smearing_pixel_size_y_dict[meta_data.DARK_CURRENT], ) # Re-transform X-axis to wave length with spread if wave_length_dict[meta_data.DARK_CURRENT]: transform_to_wavelength(ws_name) for btp_params in default_mask: apply_mask(ws_name, **btp_params) dark_current = mtd[ws_name] else: # being loaded previously dark_current = mtd[ws_name] else: # No dark current (correction) is specified dark_current = None # load required processed_files sensitivity_ws_name = None flood_file = reduction_config["sensitivityFileName"] if flood_file: sensitivity_ws_name = f"{prefix}_sensitivity" if not registered_workspace(sensitivity_ws_name): logger.notice(f"Loading flood file {flood_file} to {sensitivity_ws_name}") load_sensitivity_workspace(flood_file, output_workspace=sensitivity_ws_name) mask_ws = None custom_mask_file = reduction_config["maskFileName"] if custom_mask_file is not None: mask_ws_name = f"{prefix}_mask" if not registered_workspace(mask_ws_name): logger.notice(f"Loading user mask file {custom_mask_file} to {mask_ws_name}") mask_ws = load_mask(custom_mask_file, output_workspace=mask_ws_name) else: mask_ws = mtd[mask_ws_name] if registered_workspace(f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"): raw_sample_ws = mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"] raw_sample_ws_list = [w for w in raw_sample_ws] else: raw_sample_ws_list = [mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo"]] raw_bkgd_ws = mtd[f"{prefix}_{instrument_name}_{bkgd}_raw_histo"] if bkgd else None raw_blocked_ws = mtd[f"{prefix}_{instrument_name}_{blocked_beam}_raw_histo"] if blocked_beam else None raw_center_ws = mtd[f"{prefix}_{instrument_name}_{center}_raw_histo"] raw_empty_ws = mtd[f"{prefix}_{instrument_name}_{empty}_raw_histo"] if empty else None raw_sample_trans_ws = mtd[f"{prefix}_{instrument_name}_{sample_trans}_raw_histo"] if sample_trans else None raw_bkg_trans_ws = mtd[f"{prefix}_{instrument_name}_{bkgd_trans}_raw_histo"] if bkgd_trans else None sensitivity_ws = mtd[sensitivity_ws_name] if sensitivity_ws_name else None # Plot if debug_output: for raw_sample in raw_sample_ws_list: plot_detector( input_workspace=str(raw_sample), filename=form_output_name(raw_sample), backend="mpl", ) for ws in [ raw_bkgd_ws, raw_center_ws, raw_empty_ws, raw_sample_trans_ws, raw_bkg_trans_ws, raw_blocked_ws, dark_current, ]: if ws is not None: plot_detector( input_workspace=str(ws), filename=form_output_name(ws), backend="mpl", ) # Apply the backpanel correction on z direction to remove the # artifacts in 2D azimuthal patterns if back_panel_correction == True: for ws in raw_sample_ws_list: if ws is not None: adjust_back_panels_to_effective_position(ws) if raw_bkgd_ws is not None: adjust_back_panels_to_effective_position(raw_bkgd_ws) if raw_center_ws is not None: adjust_back_panels_to_effective_position(raw_center_ws) if raw_empty_ws is not None: adjust_back_panels_to_effective_position(raw_empty_ws) if raw_sample_trans_ws is not None: adjust_back_panels_to_effective_position(raw_sample_trans_ws) if raw_bkg_trans_ws is not None: adjust_back_panels_to_effective_position(raw_bkg_trans_ws) if raw_blocked_ws is not None: adjust_back_panels_to_effective_position(raw_blocked_ws) if dark_current is not None: adjust_back_panels_to_effective_position(dark_current) if sensitivity_ws is not None: adjust_back_panels_to_effective_position(sensitivity_ws) if mask_ws is not None: adjust_back_panels_to_effective_position(mask_ws) return dict( sample=raw_sample_ws_list, background=raw_bkgd_ws, center=raw_center_ws, empty=raw_empty_ws, sample_transmission=raw_sample_trans_ws, background_transmission=raw_bkg_trans_ws, blocked_beam=raw_blocked_ws, dark_current=dark_current, sensitivity=sensitivity_ws, mask=mask_ws, )
[docs] def prepare_data( data, pixel_calibration=False, mask_detector=None, detector_offset=0, sample_offset=0, center_x=None, center_y=None, dark_current=None, flux_method=None, monitor_fail_switch=False, mask=None, mask_panel=None, btp=dict(), solid_angle=True, sensitivity_file_path=None, sensitivity_workspace=None, wave_length=None, wavelength_spread=None, sample_aperture_diameter=None, sample_thickness=None, source_aperture_diameter=None, smearing_pixel_size_x=None, smearing_pixel_size_y=None, output_workspace=None, output_suffix="", **kwargs, ): r""" Load a GPSANS data file and bring the data to a point where it can be used. This includes applying basic corrections that are always applied regardless of whether the data is background or scattering data. Parameters ---------- data: int, str, ~mantid.api.IEventWorkspace Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace` pixel_calibration: bool, str Adjust pixel heights and widths according to barscan and tube-width calibrations. Options are (1) No calibration (False), (2) default calibration file (True) (3) user specified calibration file (str) mask_detector: str Name of an instrument component to mask detector_offset: float Additional translation of the detector along Z-axis, in millimeters. sample_offset: float Additional translation of the sample along the Z-axis, in millimeters. center_x: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``x=0``. center_y: float Move the center of the detector to this Y-coordinate. If :py:obj:`None`, the detector will be moved such that the Y-coordinate of the intersection point between the neutron beam and the detector array will have ``y=0``. dark_current: int, str, ~mantid.api.IEventWorkspace Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace` flux_method: str Method for flux normalization. Either 'monitor', or 'time'. monitor_fail_switch: bool Resort to 'time' normalization if 'monitor' was selected but no monitor counts are available mask_panel: str Either 'front' or 'back' to mask whole front or back panel. mask: mask file path, MaskWorkspace, list Additional mask to be applied. If `list`, it is a list of detector ID's. btp: dict Additional properties to Mantid's MaskBTP algorithm solid_angle: bool Apply the solid angle correction sensitivity_file_path: str file containing previously calculated sensitivity correction sensitivity_workspace: str, ~mantid.api.MatrixWorkspace workspace containing previously calculated sensitivity correction. This overrides the sensitivity_filename if both are provided. wave_length: float, None wave length in Angstrom wavelength_spread: float, None wave length spread in Angstrom sample_aperture_diameter: float, None sample aperture diameter in mm sample_thickness: None, float sample thickness in unit cm source_aperture_diameter: float, None source aperture size radius in unit mm smearing_pixel_size_x: float, None pixel size in x direction in unit as meter, only for Q-resolution calculation smearing_pixel_size_y: float, None pixel size in Y direction in unit as meter, only for Q-resolution calculation output_workspace: str Name of the output workspace. If not supplied, will be determined from the supplied value of ``data``. output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. Returns ------- ~mantid.api.IEventWorkspace Reference to the events workspace """ # Detector offset and sample offset are disabled if abs(detector_offset) > 1e-8 or abs(sample_offset) > 1e-8: raise RuntimeError("gpsans.api.prepare_data does not work with detector_offset or sample_offset") # Load data and enforce to use nexus IDF if "enforce_use_nexus_idf" in kwargs: enforce_use_nexus_idf = kwargs["enforce_use_nexus_idf"] else: enforce_use_nexus_idf = False # GPSANS: detector offset is fixed to 0. Only detector sample distance is essential. # So one offset is sufficient ws = load_events( data, overwrite_instrument=True, output_workspace=output_workspace, output_suffix=output_suffix, pixel_calibration=pixel_calibration, detector_offset=0, sample_offset=sample_offset, LoadNexusInstrumentXML=enforce_use_nexus_idf, ) # Reset the offset sample_offset, detector_offset = get_sample_detector_offset( ws, SAMPLE_SI_META_NAME, SI_WINDOW_NOMINAL_DISTANCE_METER ) # Translate instrument with offsets move_instrument(ws, sample_offset, detector_offset) ws_name = str(ws) transform_to_wavelength(ws_name) set_init_uncertainties(ws_name) if center_x is not None and center_y is not None: center_detector(ws_name, center_x=center_x, center_y=center_y) # Dark current if dark_current is not None: if registered_workspace(str(dark_current)): dark_ws = mtd[str(dark_current)] else: dark_ws = load_events(dark_current, overwrite_instrument=True) dark_ws = transform_to_wavelength(dark_ws) dark_ws = set_init_uncertainties(dark_ws) else: dark_ws = None # load sensitivity if sensitivity_workspace is None and sensitivity_file_path: sensitivity_workspace = os.path.split(sensitivity_file_path)[-1] sensitivity_workspace = sensitivity_workspace.split(".")[0] sensitivity_workspace = load_sensitivity_workspace(sensitivity_file_path, sensitivity_workspace) # Mask either detector if mask_detector is not None: MaskDetectors(ws_name, ComponentList=mask_detector) # Overwrite meta data set_meta_data( ws_name, wave_length, wavelength_spread, sample_offset, sample_aperture_diameter, sample_thickness, source_aperture_diameter, smearing_pixel_size_x, smearing_pixel_size_y, ) return prepare_data_workspaces( ws_name, center_x=center_x, center_y=center_y, dark_current=dark_current, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, mask_ws=mask, mask_panel=mask_panel, mask_btp=btp, solid_angle=solid_angle, sensitivity_workspace=sensitivity_workspace, output_workspace_name=output_workspace, debug=False, )
[docs] def prepare_data_workspaces( data, center_x=None, center_y=None, dark_current=None, flux_method=None, # normalization (time/monitor) monitor_fail_switch=False, mask_ws=None, # apply a custom mask from workspace mask_panel=None, # mask back or front panel mask_btp=None, # mask bank/tube/pixel solid_angle=True, sensitivity_workspace=None, output_workspace_name=None, debug=False, ): r""" Given a " raw"data workspace, this function provides the following: - centers the detector - subtracts dark current - normalize by time or monitor - applies masks - corrects for solid angle - corrects for sensitivity All steps are optional. data, mask_ws, dark_current are either None or histogram workspaces. This function does not load any file. Parameters ---------- data: ~mantid.dataobjects.Workspace2D raw workspace (histogram) center_x: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``x=0``. center_y: float Move the center of the detector to this Y-coordinate. If :py:obj:`None`, the detector will be moved such that the Y-coordinate of the intersection point between the neutron beam and the detector array will have ``y=0``. dark_current: ~mantid.dataobjects.Workspace2D histogram workspace containing the dark current measurement flux_method: str Method for flux normalization. Either 'monitor', or 'time'. monitor_fail_switch: bool Resort to 'time' normalization if 'monitor' was selected but no monitor counts are available mask_ws: ~mantid.dataobjects.Workspace2D Mask workspace mask_panel: str Either 'front' or 'back' to mask whole front or back panel. mask_btp: dict Additional properties to Mantid's MaskBTP algorithm solid_angle: bool Apply the solid angle correction sensitivity_workspace: str, ~mantid.api.MatrixWorkspace workspace containing previously calculated sensitivity correction. This overrides the sensitivity_filename if both are provided. output_workspace_name: str The output workspace name. If None will create data.name()+output_suffix debug: bool Flag for debugging output Returns ------- ~mantid.dataobjects.Workspace2D Reference to the processed workspace """ if not data: raise RuntimeError("Input workspace of prepare data cannot be None!") if not output_workspace_name: output_workspace_name = str(data) output_workspace_name = output_workspace_name.replace("_raw_histo", "") + "_processed_histo" mtd[str(data)].clone(OutputWorkspace=output_workspace_name) # name gets into workspace if center_x is not None and center_y is not None: center_detector(output_workspace_name, center_x=center_x, center_y=center_y) if debug: SaveNexusProcessed( InputWorkspace=output_workspace_name, Filename=f"{output_workspace_name}_after_center.nxs", ) # Dark current if dark_current is not None: subtract_dark_current(output_workspace_name, dark_current) # Normalization if str(flux_method).lower() == "monitor": try: normalize_by_monitor(output_workspace_name) except RuntimeError as e: if monitor_fail_switch: logger.warning(f"{e}. Resorting to normalization by time") normalize_by_time(output_workspace_name) else: msg = ( '. Setting configuration "normalizationResortToTime": True will cause the' " reduction to normalize by time if monitor counts are not available" ) raise RuntimeError(str(e) + msg) elif str(flux_method).lower() == "time": normalize_by_time(output_workspace_name) else: logger.notice("No time or monitor normalization is carried out") # Additional masks if mask_btp is None: mask_btp = dict() if debug: # output masking information logger.notice(f"mask panel: {mask_panel}\n" f"mask ws : {str(mask_ws)}\n" f"mask btp : {mask_btp}") if mask_ws is not None: SaveNexusProcessed( InputWorkspace=mask_ws, Filename=f"{output_workspace_name}_{str(mask_ws)}.nxs", ) apply_mask(output_workspace_name, panel=mask_panel, mask=mask_ws, **mask_btp) if debug: SaveNexusProcessed( InputWorkspace=output_workspace_name, Filename=f"{output_workspace_name}_after_mask.nxs", ) # Solid angle if solid_angle: solid_angle_correction(output_workspace_name) if debug: SaveNexusProcessed( InputWorkspace=output_workspace_name, Filename=f"{output_workspace_name}_before_sens.nxs", ) # Sensitivity if sensitivity_workspace is not None: apply_sensitivity_correction(output_workspace_name, sensitivity_workspace=sensitivity_workspace) if debug: out_ws = mtd[output_workspace_name] plot_detector( input_workspace=str(out_ws), filename=f"prepared{form_output_name(out_ws)}", backend="mpl", imshow_kwargs={"norm": LogNorm(vmin=1)}, ) return mtd[output_workspace_name]
[docs] def process_single_configuration( sample_ws_raw, sample_trans_ws=None, sample_trans_value=None, bkg_ws_raw=None, bkg_trans_ws=None, bkg_trans_value=None, blocked_ws_raw=None, theta_deppendent_transmission=True, center_x=None, center_y=None, dark_current=None, flux_method=None, # normalization (time/monitor) monitor_fail_switch=False, mask_ws=None, # apply a custom mask from workspace mask_panel=None, # mask back or front panel mask_btp=None, # mask bank/tube/pixel solid_angle=True, sensitivity_workspace=None, output_workspace=None, output_suffix="", thickness=1.0, absolute_scale_method="standard", empty_beam_ws=None, beam_radius=None, absolute_scale=1.0, keep_processed_workspaces=True, debug=False, ): r""" This function provides full data processing for a single experimental configuration, starting from workspaces (no data loading is happening inside this function) Parameters ---------- sample_ws_raw: ~mantid.dataobjects.Workspace2D raw data histogram workspace sample_trans_ws: ~mantid.dataobjects.Workspace2D optional histogram workspace for sample transmission sample_trans_value: float optional value for sample transmission bkg_ws_raw: ~mantid.dataobjects.Workspace2D optional raw histogram workspace for background bkg_trans_ws: ~mantid.dataobjects.Workspace2D optional histogram workspace for background transmission bkg_trans_value: float optional value for background transmission blocked_ws_raw: ~mantid.dataobjects.Workspace2D optional histogram workspace for blocked beam theta_deppendent_transmission: bool flag to apply angle dependent transmission center_x: float x center for the beam center_y: float y center for the beam dark_current: ~mantid.dataobjects.Workspace2D dark current workspace flux_method: str normalization by time or monitor monitor_fail_switch: bool resort to 'time' normalization if 'monitor' was selected but no monitor counts are available mask_ws: ~mantid.dataobjects.Workspace2D user defined mask mask_panel: str mask fron or back panel mask_btp: dict optional bank, tube, pixel to mask solid_angle: bool flag to apply solid angle sensitivity_workspace: ~mantid.dataobjects.Workspace2D workspace containing sensitivity output_workspace: str output workspace name output_suffix:str suffix for output workspace thickness: float sample thickness (cm) absolute_scale_method: str method to do absolute scaling (standard or direct_beam) empty_beam_ws: ~mantid.dataobjects.Workspace2D empty beam workspace for absolute scaling beam_radius: float beam radius for absolute scaling absolute_scale: float absolute scaling value for standard method keep_processed_workspaces: bool flag to keep the processed blocked beam and background workspaces debug: bool flag to do some debugging output Returns ------- ~mantid.dataobjects.Workspace2D Reference to the processed workspace """ if debug: SaveNexusProcessed(InputWorkspace=sample_ws_raw, Filename="sample_raw", Title="Raw") if not output_workspace: output_workspace = output_suffix + "_sample.nxs" # create a common configuration for prepare data prepare_data_conf = { "center_x": center_x, "center_y": center_y, "dark_current": dark_current, "flux_method": flux_method, "monitor_fail_switch": monitor_fail_switch, "mask_ws": mask_ws, "mask_panel": mask_panel, "mask_btp": mask_btp, "solid_angle": solid_angle, "sensitivity_workspace": sensitivity_workspace, } # process blocked if blocked_ws_raw: blocked_ws_name = output_suffix + "_blocked" if not registered_workspace(blocked_ws_name): blocked_ws = prepare_data_workspaces( blocked_ws_raw, output_workspace_name=blocked_ws_name, debug=debug, **prepare_data_conf, ) else: blocked_ws = mtd[blocked_ws_name] # process sample sample_ws = prepare_data_workspaces( sample_ws_raw, output_workspace_name=output_workspace, debug=debug, **prepare_data_conf, ) if debug: SaveNexusProcessed( InputWorkspace=sample_ws, Filename="sample_prepared.nxs", Title=f"Prepared Data Workspace: From {str(sample_ws_raw)} to {str(sample_ws)}", ) # raise NotImplementedError('DEBUG STOP') if blocked_ws_raw: sample_ws = subtract_background(sample_ws, blocked_ws) if debug: SaveNexusProcessed( InputWorkspace=sample_ws, Filename="sample_block_subtracted", Title="Block run subtracted", ) # apply transmission to the sample if sample_trans_ws or sample_trans_value: sample_ws = apply_transmission_correction( sample_ws, trans_workspace=sample_trans_ws, trans_value=sample_trans_value, theta_dependent=theta_deppendent_transmission, output_workspace=output_workspace, ) if debug: SaveNexusProcessed( InputWorkspace=sample_ws, Filename="sample_trans_correction", Title="Transmission corrected", ) # process background, if not already processed if bkg_ws_raw: bkgd_ws_name = output_suffix + "_background" if not registered_workspace(bkgd_ws_name): bkgd_ws = prepare_data_workspaces(bkg_ws_raw, output_workspace_name=bkgd_ws_name, **prepare_data_conf) if blocked_ws_raw: bkgd_ws = subtract_background(bkgd_ws, blocked_ws) # apply transmission to bkgd if bkg_trans_ws or bkg_trans_value: bkgd_ws = apply_transmission_correction( bkgd_ws, trans_workspace=bkg_trans_ws, trans_value=bkg_trans_value, theta_dependent=theta_deppendent_transmission, output_workspace=bkgd_ws_name, ) else: bkgd_ws = mtd[bkgd_ws_name] # subtract background sample_ws = subtract_background(sample_ws, bkgd_ws) if not keep_processed_workspaces: bkgd_ws.delete() if blocked_ws_raw and not keep_processed_workspaces: blocked_ws.delete() # finalize with absolute scale and thickness sample_ws = normalize_by_thickness(sample_ws, thickness) # standard method assumes absolute scale from outside if absolute_scale_method == "direct_beam": try: empty = mtd[str(empty_beam_ws)] except KeyError: raise ValueError(f"Could not find empty beam {str(empty_beam_ws)}") ac, ace = attenuation_factor(empty) empty_beam_scaling( sample_ws, empty, beam_radius=beam_radius, unit="mm", attenuator_coefficient=ac, attenuator_error=ace, output_workspace=output_workspace, ) else: sample_ws *= absolute_scale # Final debug output if debug: plot_detector( input_workspace=str(output_workspace), filename=f"final_processed_{form_output_name(output_workspace)}", backend="mpl", imshow_kwargs={"norm": LogNorm(vmin=1)}, ) SaveNexusProcessed( InputWorkspace=output_workspace, Filename=f"final_processed_{form_output_name(output_workspace)}.nxs", ) return mtd[output_workspace]
[docs] def reduce_single_configuration(loaded_ws, reduction_input, prefix="", skip_nan=True, debug_output=False): reduction_config = reduction_input["configuration"] flux_method = reduction_config["normalization"] monitor_fail_switch = reduction_config["normalizationResortToTime"] transmission_radius = reduction_config["mmRadiusForTransmission"] solid_angle = reduction_config["useSolidAngleCorrection"] sample_trans_value = reduction_input["sample"]["transmission"]["value"] bkg_trans_value = reduction_input["background"]["transmission"]["value"] theta_deppendent_transmission = reduction_config["useThetaDepTransCorrection"] mask_panel = None if reduction_config["useMaskBackTubes"]: mask_panel = "back" output_suffix = "" thickness = reduction_input["sample"]["thickness"] absolute_scale_method = reduction_config["absoluteScaleMethod"] beam_radius = reduction_config["DBScalingBeamRadius"] absolute_scale = reduction_config["StandardAbsoluteScale"] output_dir = reduction_config["outputDir"] nybins_main = nxbins_main = reduction_config["numQxQyBins"] bin1d_type = reduction_config["1DQbinType"] log_binning = reduction_config["QbinType"] == "log" # FIXME - NO MORE EVENT DECADES: even_decades = reduction_config.get("LogQBinsEvenDecade", False) decade_on_center = reduction_config.get("useLogQBinsDecadeCenter", False) nbins_main = reduction_config.get("numQBins") nbins_main_per_decade = reduction_config.get("LogQBinsPerDecade") outputFilename = reduction_input["outputFileName"] weighted_errors = reduction_config["useErrorWeighting"] qmin = reduction_config["Qmin"] qmax = reduction_config["Qmax"] annular_bin = reduction_config["AnnularAngleBin"] wedges_min = reduction_config["WedgeMinAngles"] wedges_max = reduction_config["WedgeMaxAngles"] wedges = None if wedges_min is None or wedges_max is None else list(zip(wedges_min, wedges_max)) # automatically determine wedge binning if it wasn't explicitly set autoWedgeOpts = {} symmetric_wedges = True if bin1d_type == "wedge" and wedges_min is None: # the JSON validator "wedgesources" guarantees that the parameters to be collected are all non-empty autoWedgeOpts = { "q_min": reduction_config["autoWedgeQmin"], "q_delta": reduction_config["autoWedgeQdelta"], "q_max": reduction_config["autoWedgeQmax"], "azimuthal_delta": reduction_config["autoWedgeAzimuthalDelta"], "peak_width": reduction_config["autoWedgePeakWidth"], "background_width": reduction_config["autoWedgeBackgroundWidth"], "signal_to_noise_min": reduction_config["autoWedgeSignalToNoiseMin"], "peak_search_window_size_factor": reduction_config["autoWedgePeakSearchWindowSizeFactor"], } # auto-aniso returns all of the wedges symmetric_wedges = False logger.debug(f'Wedge peak search window size factor: {autoWedgeOpts["peak_search_window_size_factor"]}') fbc_options = fbc_options_json(reduction_input) xc, yc, fit_results = find_beam_center(loaded_ws.center, **fbc_options) logger.notice(f"Find beam center = {xc}, {yc}") # process the center if using it in absolute scaling if absolute_scale_method == "direct_beam": processed_center_ws_name = f"{prefix}_processed_center" processed_center_ws = prepare_data_workspaces( loaded_ws.center, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, center_x=xc, center_y=yc, solid_angle=False, sensitivity_workspace=loaded_ws.sensitivity, output_workspace_name=processed_center_ws_name, debug=debug_output, ) else: processed_center_ws = None # empty beam transmission workspace if loaded_ws.empty is not None: empty_trans_ws_name = f"{prefix}_empty" empty_trans_ws = prepare_data_workspaces( loaded_ws.empty, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, center_x=xc, center_y=yc, solid_angle=False, sensitivity_workspace=loaded_ws.sensitivity, output_workspace_name=empty_trans_ws_name, ) else: empty_trans_ws = None # background transmission background_transmission_dict = {} if loaded_ws.background_transmission is not None and empty_trans_ws is not None: bkgd_trans_ws_name = f"{prefix}_bkgd_trans" bkgd_trans_ws_processed = prepare_data_workspaces( loaded_ws.background_transmission, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, center_x=xc, center_y=yc, solid_angle=False, sensitivity_workspace=loaded_ws.sensitivity, output_workspace_name=bkgd_trans_ws_name, ) bkgd_trans_ws = calculate_transmission( bkgd_trans_ws_processed, empty_trans_ws, radius=transmission_radius, radius_unit="mm", ) logger.notice(f"Background transmission = {bkgd_trans_ws.extractY()[0, 0]}") background_transmission_dict = { "value": bkgd_trans_ws.extractY(), "error": bkgd_trans_ws.extractE(), } else: bkgd_trans_ws = None # sample transmission sample_transmission_dict = {} if loaded_ws.sample_transmission is not None and empty_trans_ws is not None: sample_trans_ws_name = f"{prefix}_sample_trans" sample_trans_ws_processed = prepare_data_workspaces( loaded_ws.sample_transmission, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, center_x=xc, center_y=yc, solid_angle=False, sensitivity_workspace=loaded_ws.sensitivity, output_workspace_name=sample_trans_ws_name, ) sample_trans_ws = calculate_transmission( sample_trans_ws_processed, empty_trans_ws, radius=transmission_radius, radius_unit="mm", ) logger.notice(f"Sample transmission = {sample_trans_ws.extractY()[0, 0]}") sample_transmission_dict = { "value": sample_trans_ws.extractY(), "error": sample_trans_ws.extractE(), } else: sample_trans_ws = None output = [] detectordata = {} for i, raw_sample_ws in enumerate(loaded_ws.sample): name = "_slice_{}".format(i + 1) if len(loaded_ws.sample) > 1: output_suffix = f"_{i}" processed_data_main = process_single_configuration( raw_sample_ws, sample_trans_ws=sample_trans_ws, sample_trans_value=sample_trans_value, bkg_ws_raw=loaded_ws.background, bkg_trans_ws=bkgd_trans_ws, bkg_trans_value=bkg_trans_value, blocked_ws_raw=loaded_ws.blocked_beam, theta_deppendent_transmission=theta_deppendent_transmission, center_x=xc, center_y=yc, dark_current=loaded_ws.dark_current, flux_method=flux_method, monitor_fail_switch=monitor_fail_switch, mask_ws=loaded_ws.mask, mask_panel=mask_panel, solid_angle=solid_angle, sensitivity_workspace=loaded_ws.sensitivity, output_workspace="processed_data_main", output_suffix=output_suffix, thickness=thickness, absolute_scale_method=absolute_scale_method, empty_beam_ws=processed_center_ws, beam_radius=beam_radius, absolute_scale=absolute_scale, keep_processed_workspaces=False, debug=debug_output, ) # binning subpixel_kwargs = dict() if reduction_config["useSubpixels"] is True: subpixel_kwargs = { "n_horizontal": reduction_config["subpixelsX"], "n_vertical": reduction_config["subpixelsY"], } iq1d_main_in = convert_to_q(processed_data_main, mode="scalar", **subpixel_kwargs) iq2d_main_in = convert_to_q(processed_data_main, mode="azimuthal", **subpixel_kwargs) if bool(autoWedgeOpts): # determine wedges automatically logger.notice(f"Auto wedge options: {autoWedgeOpts}") autoWedgeOpts["debug_dir"] = output_dir wedges = getWedgeSelection(iq2d_main_in, **autoWedgeOpts) logger.notice( f"found wedge angles:\n" f" peak: {wedges[0]}\n" f" background: {wedges[1]}" ) # sanity check assert len(wedges) == 2, f"Auto-wedges {wedges} shall have 2 2-tuples" # set the found wedge values to the reduction input, this will allow correct plotting reduction_config["wedges"] = wedges reduction_config["symmetric_wedges"] = symmetric_wedges iq2d_main_out, iq1d_main_out = bin_all( iq2d_main_in, iq1d_main_in, nxbins_main, nybins_main, n1dbins=nbins_main, n1dbins_per_decade=nbins_main_per_decade, decade_on_center=decade_on_center, bin1d_type=bin1d_type, log_scale=log_binning, qmin=qmin, qmax=qmax, annular_angle_bin=annular_bin, wedges=wedges, symmetric_wedges=symmetric_wedges, error_weighted=weighted_errors, ) # save ASCII files filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D.dat") save_ascii_binned_2D(filename, "I(Qx,Qy)", iq2d_main_out) for j in range(len(iq1d_main_out)): add_suffix = "" if len(iq1d_main_out) > 1: add_suffix = f"_wedge_{j}" ascii_1D_filename = os.path.join(output_dir, "1D", f"{outputFilename}{output_suffix}_1D{add_suffix}.txt") save_iqmod(iq1d_main_out[j], ascii_1D_filename, skip_nan=skip_nan) IofQ_output = namedtuple("IofQ_output", ["I2D_main", "I1D_main"]) current_output = IofQ_output(I2D_main=iq2d_main_out, I1D_main=iq1d_main_out) output.append(current_output) detectordata[name] = {"main": {"iq": iq1d_main_out, "iqxqy": iq2d_main_out}} # save reduction log filename = os.path.join( reduction_config["outputDir"], outputFilename + f"_reduction_log{output_suffix}.hdf", ) starttime = datetime.now().isoformat() # try: # pythonfile = __file__ # except NameError: # pythonfile = "Launched from notebook" reductionparams = {"data": copy.deepcopy(reduction_input)} specialparameters = { "beam_center": {"x": xc, "y": yc}, "fit_results": fit_results, "sample_transmission": sample_transmission_dict, "background_transmission": background_transmission_dict, } samplelogs = {"main": SampleLogs(processed_data_main)} logslice_data_dict = reduction_input["logslice_data"] savereductionlog( filename=filename, detectordata=detectordata, reductionparams=reductionparams, # pythonfile=pythonfile, starttime=starttime, specialparameters=specialparameters, logslicedata=logslice_data_dict, samplelogs=samplelogs, ) # change permissions to all files to allow overwrite allow_overwrite(reduction_config["outputDir"]) allow_overwrite(os.path.join(reduction_config["outputDir"], "1D")) allow_overwrite(os.path.join(reduction_config["outputDir"], "2D")) return output
def form_output_name(workspace): workspace_name = str(workspace) file_name = workspace_name.split("/")[-1].split(".")[0] return f"{file_name}.png"
[docs] def plot_reduction_output( reduction_output, reduction_input, loglog=True, imshow_kwargs=None, close_figures=False, ): reduction_config = reduction_input["configuration"] output_dir = reduction_config["outputDir"] outputFilename = reduction_input["outputFileName"] output_suffix = "" bin1d_type = reduction_config["1DQbinType"] if imshow_kwargs is None: imshow_kwargs = {} for i, out in enumerate(reduction_output): if len(reduction_output) > 1: output_suffix = f"_{i}" filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D.png") wedges = reduction_config["wedges"] if bin1d_type == "wedge" else None symmetric_wedges = reduction_config.get("symmetric_wedges", True) qmin = reduction_config["Qmin"] qmax = reduction_config["Qmax"] plot_IQazimuthal( out.I2D_main, filename, backend="mpl", imshow_kwargs=imshow_kwargs, title="Main", wedges=wedges, symmetric_wedges=symmetric_wedges, qmin=qmin, qmax=qmax, ) if close_figures: plt.clf() for j in range(len(out.I1D_main)): add_suffix = "" if len(out.I1D_main) > 1: add_suffix = f"_wedge_{j}" filename = os.path.join(output_dir, "1D", f"{outputFilename}{output_suffix}_1D{add_suffix}.png") plot_IQmod( [out.I1D_main[j]], filename, loglog=loglog, backend="mpl", errorbar_kwargs={"label": "main"}, ) if close_figures: plt.clf() if close_figures: plt.close() # allow overwrite allow_overwrite(os.path.join(output_dir, "1D")) allow_overwrite(os.path.join(output_dir, "2D"))
[docs] def adjust_back_panels_to_effective_position(workspace): """ As the back panels are shadowed by its neighboring front panels when the incident beam is hitting at an angle, the effective position of the back panels needs to be adjusted to the center of the gap between its two neighboring front panels. :param workspace: intput workspace @NOTE: this adjustment must be done right before convert to q space """ MoveInstrumentComponent( Workspace=workspace, ComponentName="detector1/back-panel", Z=-0.0082, RelativePosition=True )