Source code for drtsans.tof.eqsans.api

"""Top-level API for EQSANS"""

# standard imports
import copy
import os
from collections import namedtuple
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

# third party imports
import matplotlib.pyplot as plt
from mantid.simpleapi import mtd, logger, RebinToWorkspace, SaveNexus, RemoveWorkspaceHistory

# local imports
import drtsans  # noqa E402
from drtsans import (
    NoDataProcessedError,
    apply_sensitivity_correction,
    getWedgeSelection,
    load_sensitivity_workspace,
    solid_angle_correction,
    subtract_background,  # noqa E402
)  # noqa E402
from drtsans.beam_finder import fbc_options_json, find_beam_center  # noqa E402
from drtsans.dataobjects import save_i1d, I1DAnnular  # noqa E402
from drtsans.instruments import extract_run_number  # noqa E402
from drtsans.iq import bin_all  # noqa E402
from drtsans.load import resolve_slicing
from drtsans.mask_utils import apply_mask, load_mask  # noqa E402
from drtsans.path import (  # noqa E402
    abspath,
    abspaths,
    allow_overwrite,  # noqa E402
    registered_workspace,
)
from drtsans.plots import plot_IQazimuthal, plot_i1d  # noqa E402
from drtsans.process_uncertainties import set_init_uncertainties  # noqa E402
from drtsans.samplelogs import SampleLogs  # noqa E402
from drtsans.save_2d import save_nexus, save_nist_dat  # noqa E402
from drtsans.save_ascii import save_ascii_1D, save_ascii_binned_2D  # noqa E402
from drtsans.save_cansas import save_cansas_nx, save_cansas_xml_1D
from drtsans.settings import namedtuplefy  # noqa E402
from drtsans.thickness_normalization import normalize_by_thickness  # noqa E402
from drtsans.tof.eqsans.cfg import load_config  # noqa E402
from drtsans.tof.eqsans.correction_api import (
    parse_correction_config,
)
from drtsans.tof.eqsans.dark_current import subtract_dark_current  # noqa E402
from drtsans.tof.eqsans.load import (
    load_and_split_and_histogram,
    load_events,
    load_events_and_histogram,
)  # noqa E402
from drtsans.tof.eqsans.meta_data import set_meta_data  # noqa E402
from drtsans.tof.eqsans.momentum_transfer import (
    convert_to_q,
    split_by_frame,
)  # noqa E402
from drtsans.tof.eqsans.normalization import normalize_by_flux, ZeroNeutronFluxError  # noqa E402
from drtsans.tof.eqsans.reduction_api import (
    bin_i_with_correction,
    prepare_data_workspaces,
    process_transmission,
)
from drtsans.tof.eqsans.transmission import calculate_transmission  # noqa E402
from drtsans.transmission import apply_transmission_correction  # noqa E402

__all__ = [
    "apply_solid_angle_correction",
    "subtract_background",
    "prepare_data",
    "save_ascii_1D",
    "save_cansas_xml_1D",
    "save_cansas_nx",
    "save_nist_dat",
    "save_nexus",
    "set_init_uncertainties",
    "load_all_files",
    "prepare_data_workspaces",
    "pre_process_single_configuration",
    "reduce_single_configuration",
    "plot_reduction_output",
]

I_output = namedtuple("I_output", ["I2D_main", "I1D_main"])


def _get_configuration_file_parameters(sample_run, directory=None):
    r"""
    Configuration options from a eqsans_configuration.* configuration file.
    See files in /SNS/EQSANS/shared/instrument_configuration/ for examples

    Parameters
    ----------
    sample_run: int
        run number (e.g. 12345)
    directory: str
        absolute path to the directory containing configuration files

    Returns
    -------
    dict
        contents of the configuration file
    """
    try:
        configuration_file_parameters = load_config(source=sample_run, config_dir=directory)
    except RuntimeError as e:
        logger.error(e)
        logger.warning("Not using previous configuration")
        configuration_file_parameters = {}
    return configuration_file_parameters


[docs] @namedtuplefy def load_all_files(reduction_input, prefix="", load_params=None): r""" overwrites metadata for sample workspace Workflow: 1. parse reduction_input 2. remove existing related workspaces with same run numbers 3. process beam center - output: load_params, reduction_input 4. adjust pixel heights and widths - output: load_params, reduction_input 5. load and optionally slice sample runs 6. load other runs: bkgd, empty, sample_trans, bkgd_trans Returned namedtuple: - sample, background, empty, sample_transmission, background_transmission: namedtuple(data[ws], monitor[ws]) - dark_current, sensitivity, mask: workspace Returns ------- namedtuple Named tuple including all loaded workspaces """ reduction_config = reduction_input["configuration"] # a handy shortcut to the configuration parameters dictionary instrument_name = reduction_input["instrumentName"] ipts = reduction_input["iptsNumber"] sample = reduction_input["sample"]["runNumber"] sample_trans = reduction_input["sample"]["transmission"]["runNumber"] sample_load_options = reduction_input["sample"]["loadOptions"] bkgd = reduction_input["background"]["runNumber"] bkgd_trans = reduction_input["background"]["transmission"]["runNumber"] empty = reduction_input["emptyTransmission"]["runNumber"] center = reduction_input["beamCenter"]["runNumber"] # elastic reference and background: incoherence correction elastic_ref_run = reduction_config["elasticReference"].get("runNumber") elastic_ref_bkgd_run = reduction_config["elasticReferenceBkgd"].get("runNumber") elastic_ref_trans_run = reduction_config["elasticReference"]["transmission"].get("runNumber") elastic_ref_bkgd_trans_run = reduction_config["elasticReferenceBkgd"]["transmission"].get("runNumber") from drtsans.tof.eqsans.reduction_api import remove_workspaces remove_workspaces( reduction_config, instrument_name, prefix, sample, center, extra_run_numbers=[ sample, bkgd, empty, sample_trans, bkgd_trans, elastic_ref_run, elastic_ref_bkgd_run, elastic_ref_trans_run, elastic_ref_bkgd_trans_run, ], ) default_mask = None if reduction_config["useDefaultMask"]: first_run = sample.split(",")[0].strip() configuration_file_parameters = _get_configuration_file_parameters( extract_run_number(first_run), directory=reduction_config["instrumentConfigurationDir"] ) default_mask = configuration_file_parameters["combined mask"] filenames: Set[str] = set() load_params = set_beam_center(reduction_input, prefix, default_mask, load_params, filenames) # Adjust pixel pixel positions, heights and widths load_params["scale_components"] = reduction_config.get("scaleComponents", None) load_params["pixel_calibration"] = reduction_config.get("usePixelCalibration", False) if reduction_config["detectorOffset"] is not None: load_params["detector_offset"] = reduction_config["detectorOffset"] if reduction_config["sampleOffset"] is not None: load_params["sample_offset"] = reduction_config["sampleOffset"] load_params["low_tof_clip"] = reduction_config["cutTOFmin"] load_params["high_tof_clip"] = reduction_config["cutTOFmax"] if reduction_config["wavelengthStep"] is not None: # account for wavelengthStepType step_type = 1 if reduction_config["wavelengthStepType"] == "constant Delta lambda/lambda": step_type = -1 load_params["bin_width"] = step_type * reduction_config["wavelengthStep"] load_params["monitors"] = reduction_config["normalization"] == "Monitor" # FIXME the issues with the monitor on EQSANS has not been fixed. Enable normalization by monitor (issue #538) if load_params["monitors"]: raise RuntimeError("Normalization by monitor option will be enabled in a later drt-sans release") # ----- END OF SETUP of load_params ----- # check for time/log slicing timeslice, logslice = resolve_slicing(reduction_input) load_params_sample = {**sample_load_options, **load_params} # Load (and optionally slice) sample runs # special loading case for sample to allow the slicing options logslice_data_dict = {} if timeslice or logslice: ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group" if not registered_workspace(ws_name): filename = abspath( sample.strip(), instrument=instrument_name, ipts=ipts, directory=reduction_input["dataDirectories"] ) print(f"Loading filename {filename}") filenames.add(filename) if timeslice: logslicename = None load_and_split_and_histogram( filename, output_workspace=ws_name, time_interval=reduction_config["timeSliceInterval"], time_offset=reduction_config["timeSliceOffset"], time_period=reduction_config["timeSlicePeriod"], **load_params_sample, ) elif logslice: logslicename = reduction_config["logSliceName"] load_and_split_and_histogram( filename, output_workspace=ws_name, log_name=logslicename, log_value_interval=reduction_config["logSliceInterval"], **load_params_sample, ) for _w in mtd[ws_name]: if default_mask: apply_mask(_w, mask=default_mask) if logslicename is not None: for n in range(mtd[ws_name].getNumberOfEntries()): samplelogs = SampleLogs(mtd[ws_name].getItem(n)) logslice_data_dict[str(n)] = { "data": list(samplelogs[logslicename].value), "units": samplelogs[logslicename].units, "name": logslicename, } sample_bands = None else: # load Nexus file or files without splitting ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo" if not registered_workspace(ws_name): filename = abspaths( sample.strip(), instrument=instrument_name, ipts=ipts, directory=reduction_input["dataDirectories"] ) print(f"Loading filename {filename}") filenames.add(filename) loaded_sample_tup = load_events_and_histogram(filename, output_workspace=ws_name, **load_params_sample) sample_bands = loaded_sample_tup.bands if default_mask: apply_mask(ws_name, mask=default_mask) else: sample_bands = None reduction_input["logslice_data"] = logslice_data_dict # Load all other files without further processing # background, empty, sample transmission, background transmission, # elastic reference and its transmission, background, background transmission other_ws_list = list() for irun, run_number in enumerate( [ bkgd, empty, sample_trans, bkgd_trans, elastic_ref_run, elastic_ref_bkgd_run, elastic_ref_trans_run, elastic_ref_bkgd_trans_run, ] ): def is_elastic_ref_run(index): r"""Human way of saying the index refers to one of the elastic ref runs""" return index > 3 if run_number: # run number is given ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo" if not registered_workspace(ws_name): filename = abspaths( run_number.strip(), instrument=instrument_name, ipts=ipts, directory=reduction_input["dataDirectories"], ) logger.notice(f"Loading filename {filename}") filenames.add(filename) if is_elastic_ref_run(irun): # elastic reference run and background run, the bands must be same as sample's load_events_and_histogram( filename, output_workspace=ws_name, sample_bands=sample_bands, **load_params, ) else: load_events_and_histogram(filename, output_workspace=ws_name, **load_params) if default_mask: apply_mask(ws_name, mask=default_mask) other_ws_list.append(mtd[ws_name]) else: # run number is not given other_ws_list.append(None) # workspaces associated to the elastic reference correction elastic_ref_ws = other_ws_list[4] elastic_ref_bkgd_ws = other_ws_list[5] elastic_ref_trans_ws = other_ws_list[6] elastic_ref_bkgd_trans_ws = other_ws_list[7] # dark run (aka dark current run) dark_current_ws = None dark_current_mon_ws = None dark_current_file = reduction_config["darkFileName"] if dark_current_file is not None: run_number = extract_run_number(dark_current_file) ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo" if not registered_workspace(ws_name): dark_current_file = abspath(dark_current_file) print(f"Loading filename {dark_current_file}") filenames.add(dark_current_file) loaded_dark = load_events_and_histogram(dark_current_file, output_workspace=ws_name, **load_params) dark_current_ws = loaded_dark.data if default_mask: apply_mask(ws_name, mask=default_mask) else: dark_current_ws = mtd[ws_name] if registered_workspace(f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"): sample_ws = mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"] sample_ws_list = [w for w in sample_ws] else: sample_ws_list = [mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo"]] background_ws = mtd[f"{prefix}_{instrument_name}_{bkgd}_raw_histo"] if bkgd else None empty_ws = mtd[f"{prefix}_{instrument_name}_{empty}_raw_histo"] if empty else None sample_transmission_ws = mtd[f"{prefix}_{instrument_name}_{sample_trans}_raw_histo"] if sample_trans else None background_transmission_ws = mtd[f"{prefix}_{instrument_name}_{bkgd_trans}_raw_histo"] if bkgd_trans else None if load_params["monitors"]: sample_mon_ws = mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo_monitors"] background_mon_ws = mtd[f"{prefix}_{instrument_name}_{bkgd}_raw_histo_monitors"] if bkgd else None empty_mon_ws = mtd[f"{prefix}_{instrument_name}_{empty}_raw_histo_monitors"] if empty else None sample_transmission_mon_ws = ( mtd[f"{prefix}_{instrument_name}_{sample_trans}" + "_raw_histo_monitors"] if sample_trans else None ) background_transmission_mon_ws = ( mtd[f"{prefix}_{instrument_name}_{bkgd_trans}" + "_raw_histo_monitors"] if bkgd_trans else None ) else: sample_mon_ws = None background_mon_ws = None empty_mon_ws = None sample_transmission_mon_ws = None background_transmission_mon_ws = None # load required processed_files sensitivity_ws = None flood_file = reduction_input["configuration"]["sensitivityFileName"] if flood_file: sensitivity_ws_name = f"{prefix}_sensitivity" if not registered_workspace(sensitivity_ws_name): flood_file = abspath(flood_file) print(f"Loading filename {flood_file}") filenames.add(flood_file) load_sensitivity_workspace(flood_file, output_workspace=sensitivity_ws_name) sensitivity_ws = mtd[sensitivity_ws_name] # load mask mask_ws = None custom_mask_file = reduction_config["maskFileName"] if custom_mask_file is not None: mask_ws_name = f"{prefix}_mask" if not registered_workspace(mask_ws_name): custom_mask_file = abspath(custom_mask_file) print(f"Loading filename {custom_mask_file}") filenames.add(custom_mask_file) mask_ws = load_mask(custom_mask_file, output_workspace=mask_ws_name) else: mask_ws = mtd[mask_ws_name] # TODO load these files only once # beam_flux_ws = None # monitor_flux_ratio_ws = None sample_aperture_diameter = reduction_config["sampleApertureSize"] sample_thickness = reduction_input["sample"]["thickness"] smearing_pixel_size_x = reduction_config["smearingPixelSizeX"] smearing_pixel_size_y = reduction_config["smearingPixelSizeY"] # Sample workspace: set meta data for ws in sample_ws_list: set_meta_data( ws, wave_length=None, wavelength_spread=None, sample_offset=load_params["sample_offset"], sample_aperture_diameter=sample_aperture_diameter, sample_thickness=sample_thickness, source_aperture_diameter=None, smearing_pixel_size_x=smearing_pixel_size_x, smearing_pixel_size_y=smearing_pixel_size_y, ) # Set meta data to elastic reference run optionally if elastic_ref_run: reference_thickness = reduction_config["elasticReference"].get("thickness") set_meta_data( elastic_ref_ws, wave_length=None, wavelength_spread=None, sample_offset=load_params["sample_offset"], sample_aperture_diameter=sample_aperture_diameter, sample_thickness=reference_thickness, source_aperture_diameter=None, smearing_pixel_size_x=smearing_pixel_size_x, smearing_pixel_size_y=smearing_pixel_size_y, ) # There is no extra setup for elastic reference background following typical background run logger.notice("FILE PATH, FILE SIZE:") total_size = 0 for comma_separated_names in filenames: for name in comma_separated_names.split(","): try: file_size = os.path.getsize(name) except FileNotFoundError: hint = "EQSANS_{}".format(drtsans.instruments.extract_run_number(name)) name = drtsans.path.abspath(hint, instrument="EQSANS") # noqa: PLW2901 file_size = os.path.getsize(name) total_size += file_size print(name + ",", "{:.2f} MiB".format(file_size / 1024**2)) print("TOTAL: ", "{:.2f} MB".format(total_size / 1024**2)) ws_mon_pair = namedtuple("ws_mon_pair", ["data", "monitor"]) loaded_ws_dict = dict( sample=[ws_mon_pair(data=ws, monitor=sample_mon_ws) for ws in sample_ws_list], background=ws_mon_pair(data=background_ws, monitor=background_mon_ws), empty=ws_mon_pair(data=empty_ws, monitor=empty_mon_ws), sample_transmission=ws_mon_pair(data=sample_transmission_ws, monitor=sample_transmission_mon_ws), background_transmission=ws_mon_pair(data=background_transmission_ws, monitor=background_transmission_mon_ws), dark_current=ws_mon_pair(data=dark_current_ws, monitor=dark_current_mon_ws), sensitivity=sensitivity_ws, mask=mask_ws, elastic_reference=ws_mon_pair(elastic_ref_ws, None), elastic_reference_background=ws_mon_pair(elastic_ref_bkgd_ws, None), elastic_reference_transmission=ws_mon_pair(elastic_ref_trans_ws, None), elastic_reference_background_transmission=ws_mon_pair(elastic_ref_bkgd_trans_ws, None), ) return loaded_ws_dict
[docs] def pre_process_single_configuration( sample_ws_raw: namedtuple, sample_trans_ws=None, sample_trans_value=None, bkg_ws_raw=None, bkg_trans_ws=None, bkg_trans_value=None, theta_dependent_transmission=True, dark_current=None, flux_method=None, # normalization (time/monitor/proton charge) flux=None, # file for flux mask_ws=None, # apply a custom mask from workspace mask_panel=None, # mask back or front panel mask_btp=None, # mask bank/tube/pixel solid_angle=True, sensitivity_workspace=None, output_workspace=None, output_suffix="", thickness=1.0, absolute_scale_method="standard", empty_beam_ws=None, beam_radius=None, absolute_scale=1.0, keep_processed_workspaces=True, ): r""" This function provides full data processing for a single experimental configuration, starting from workspaces (no data loading is happening inside this function) Parameters ---------- sample_ws_raw: namedtuple (~mantid.dataobjects.Workspace2D, ~mantid.dataobjects.Workspace2D) raw data histogram workspace and monitor sample_trans_ws: ~mantid.dataobjects.Workspace2D optional histogram workspace for sample transmission (already prepared) sample_trans_value: float optional value for sample transmission bkg_ws_raw: ~mantid.dataobjects.Workspace2D optional raw histogram workspace for background bkg_trans_ws: ~mantid.dataobjects.Workspace2D optional histogram workspace for background transmission bkg_trans_value: float optional value for background transmission theta_dependent_transmission: bool flag to apply angle dependent transmission dark_current: ~mantid.dataobjects.Workspace2D dark current workspace flux_method: str normalization by time or monitor mask_ws: ~mantid.dataobjects.Workspace2D user defined mask mask_panel: str mask fron or back panel mask_btp: dict optional bank, tube, pixel to mask solid_angle: bool flag to apply solid angle sensitivity_workspace: ~mantid.dataobjects.Workspace2D workspace containing sensitivity output_workspace: str output workspace name output_suffix:str suffix for output workspace thickness: float sample thickness (cm) absolute_scale_method: str method to do absolute scaling (standard or direct_beam) empty_beam_ws: ~mantid.dataobjects.Workspace2D empty beam workspace for absolute scaling beam_radius: float beam radius for absolute scaling absolute_scale: float absolute scaling value for standard method keep_processed_workspaces: bool flag to keep the processed background workspace Returns ------- ~mantid.dataobjects.Workspace2D Reference to the processed workspace """ if not output_workspace: output_workspace = output_suffix + "_sample" # create a common configuration for prepare data prepare_data_conf = { "dark_current": dark_current, "flux_method": flux_method, "flux": flux, "mask_ws": mask_ws, "mask_panel": mask_panel, "mask_btp": mask_btp, "solid_angle": solid_angle, "sensitivity_workspace": sensitivity_workspace, } # process sample sample_ws = prepare_data_workspaces(sample_ws_raw, output_workspace=output_workspace, **prepare_data_conf) # apply transmission to the sample if sample_trans_ws or sample_trans_value: if sample_trans_ws: RebinToWorkspace( WorkspaceToRebin=sample_trans_ws, WorkspaceToMatch=sample_ws, OutputWorkspace=sample_trans_ws, ) sample_ws = apply_transmission_correction( sample_ws, trans_workspace=sample_trans_ws, trans_value=sample_trans_value, theta_dependent=theta_dependent_transmission, output_workspace=output_workspace, ) # process background, if not already processed if bkg_ws_raw.data: # process background run bkgd_ws_name = output_suffix + "_background" if not registered_workspace(bkgd_ws_name): bkgd_ws = prepare_data_workspaces(bkg_ws_raw, output_workspace=bkgd_ws_name, **prepare_data_conf) # apply transmission to background if bkg_trans_ws or bkg_trans_value: if bkg_trans_ws: RebinToWorkspace( WorkspaceToRebin=bkg_trans_ws, WorkspaceToMatch=bkgd_ws, OutputWorkspace=bkg_trans_ws, ) bkgd_ws = apply_transmission_correction( bkgd_ws, trans_workspace=bkg_trans_ws, trans_value=bkg_trans_value, theta_dependent=theta_dependent_transmission, output_workspace=bkgd_ws_name, ) else: bkgd_ws = mtd[bkgd_ws_name] # subtract background sample_ws = subtract_background(sample_ws, bkgd_ws) if not keep_processed_workspaces: bkgd_ws.delete() # finalize with absolute scale and thickness sample_ws = normalize_by_thickness(sample_ws, thickness) # standard method assumes absolute scale from outside if absolute_scale_method == "direct_beam": raise NotImplementedError("This feature is not yet implemented for EQSANS") else: sample_ws *= absolute_scale return mtd[output_workspace]
[docs] def reduce_single_configuration( loaded_ws: namedtuple, reduction_input, prefix="", skip_nan=True, not_apply_elastic_correction: bool = False, not_apply_incoherence_correction: bool = False, ): # TODO: finish this incomplete docstring """Reduce samples from raw workspaces including 1. prepare data 1. This is the main entry point of reduction Input loaded workspaces as namedtuple: - sample, background, empty, sample_transmission, background_transmission: namedtuple(data[ws], monitor[ws]) - dark_current, sensitivity, mask: workspace Parameters ---------- loaded_ws: namedtuple loaded workspaces, usually the return value of `load_all_files()` reduction_input: dict reduction configuration prefix skip_nan not_apply_elastic_correction: bool If true, then no elastic scattering correction will be applied to reduction overriding JSON not_apply_incoherence_correction: bool If true, then no inelastic incoherence scattering correction will be applied to reduction overriding JSON Returns ------- ~list list of I_output: ['I2D_main', 'I1D_main'] """ # Process reduction input: configuration and etc. reduction_config = reduction_input["configuration"] # Process elastic and inelastic/incoherent scattering correction configuration if user does not specify assert isinstance(not_apply_elastic_correction, bool), "Only boolean for skip flag is allowed" assert isinstance(not_apply_incoherence_correction, bool), "Only boolean for skip flag is allowed" correction_setup = parse_correction_config( reduction_input, not_apply_elastic_correction, not_apply_incoherence_correction ) # process: flux, monitor, proton charge, ... flux_method_translator = { "Monitor": "monitor", "Total charge": "proton charge", "Time": "time", } flux_method = flux_method_translator.get(reduction_config["normalization"], None) flux_translator = { "Monitor": reduction_config["fluxMonitorRatioFile"], "Total charge": reduction_config["beamFluxFileName"], "Time": "duration", } flux = flux_translator.get(reduction_config["normalization"], None) solid_angle = reduction_config["useSolidAngleCorrection"] transmission_radius = reduction_config["mmRadiusForTransmission"] sample_trans_value = reduction_input["sample"]["transmission"]["value"] bkg_trans_value = reduction_input["background"]["transmission"]["value"] theta_dependent_transmission = reduction_config["useThetaDepTransCorrection"] mask_panel = "back" if reduction_config["useMaskBackTubes"] is True else None output_suffix = "" thickness = reduction_input["sample"]["thickness"] absolute_scale_method = reduction_config["absoluteScaleMethod"] beam_radius = None # EQSANS doesn't use keyword DBScalingBeamRadius absolute_scale = reduction_config["StandardAbsoluteScale"] output_dir = reduction_config["outputDir"] # process binning # Note: option {even_decades = reduction_config["useLogQBinsEvenDecade"]} is removed nybins_main = nxbins_main = reduction_config["numQxQyBins"] bin1d_type = reduction_config["1DQbinType"] log_binning = reduction_config["QbinType"] == "log" # FIXME - note: fixed to log binning decade_on_center = reduction_config["useLogQBinsDecadeCenter"] nbins_main = reduction_config["numQBins"] nbins_main_per_decade = reduction_config["LogQBinsPerDecade"] outputFilename = reduction_input["outputFileName"] weighted_errors = reduction_config["useErrorWeighting"] qmin = reduction_config["Qmin"] qmax = reduction_config["Qmax"] annular_bin = reduction_config["AnnularAngleBin"] wedges_min = reduction_config["WedgeMinAngles"] wedges_max = reduction_config["WedgeMaxAngles"] wedges = None if wedges_min is None or wedges_max is None else list(zip(wedges_min, wedges_max)) # set the found wedge values to the reduction input, this will allow correct plotting reduction_config["wedges"] = wedges reduction_config["symmetric_wedges"] = True time_slice = reduction_config["useTimeSlice"] # automatically determine wedge binning if it wasn't explicitly set autoWedgeOpts, symmetric_wedges = parse_auto_wedge_setup(reduction_config, bin1d_type, wedges_min) # set up subpixel binning options FIXME - it does not seem to work subpixel_kwargs = dict() if reduction_config["useSubpixels"]: subpixel_kwargs = { "n_horizontal": reduction_config["subpixelsX"], "n_vertical": reduction_config["subpixelsY"], } ############################################## # PROCESS SAMPLE AND BACKGROUND TRANSMISSIONS ############################################## # Prepare empty beam transmission workspace # if loaded_ws.empty.data is not None: empty_trans_ws_name = f"{prefix}_empty" empty_trans_ws = prepare_data_workspaces( loaded_ws.empty, flux_method=flux_method, flux=flux, solid_angle=False, sensitivity_workspace=loaded_ws.sensitivity, output_workspace=empty_trans_ws_name, ) else: empty_trans_ws = None # Sample transmission # sample_returned = process_transmission( loaded_ws.sample_transmission, empty_trans_ws, transmission_radius, loaded_ws.sensitivity, flux_method, flux, prefix, "sample", output_dir, outputFilename, ) (sample_trans_ws, sample_transmission_dict, sample_transmission_raw_dict) = sample_returned # Background transmission # # TODO 781 - to test and check for sanity # specific output filename (base) for background trans if loaded_ws.background_transmission.data: bkgd_trans = reduction_input["background"]["transmission"]["runNumber"].strip() base_out_name = f"{outputFilename}_bkgd_{bkgd_trans}" # process transmission bkgd_returned = process_transmission( loaded_ws.background_transmission, empty_trans_ws, transmission_radius, loaded_ws.sensitivity, flux_method, flux, prefix, "bkgd", output_dir, base_out_name, ) else: bkgd_returned = (None, None, None) bkgd_trans_ws, background_transmission_dict, background_transmission_raw_dict = bkgd_returned ########################################## # PROCESS ELASTIC REFERENCE NORMALIZATION ########################################## if correction_setup.do_elastic_correction and correction_setup.elastic_reference: # sanity check assert loaded_ws.elastic_reference.data, f"Reference run is not loaded: {correction_setup.elastic_reference}" ############################################## # PROCESS SAMPLE AND BACKGROUND TRANSMISSIONS ############################################## # Elastic reference transmission # if loaded_ws.elastic_reference_transmission.data: # process transmission (returns a 3-item tuple) output = process_transmission( loaded_ws.elastic_reference_transmission, empty_trans_ws, transmission_radius, loaded_ws.sensitivity, flux_method, flux, prefix, "elasticref", None, None, ) else: output = (None, None, None) elasticref_trans_ws, _, _ = output # Elastic reference background transmission # if loaded_ws.elastic_reference_background_transmission.data: # process transmission (returns a 3-item tuple) output = process_transmission( loaded_ws.elastic_reference_background_transmission, empty_trans_ws, transmission_radius, loaded_ws.sensitivity, flux_method, flux, prefix, "elasticrefbkgd", None, None, ) else: output = (None, None, None) elasticrefbkgd_trans_ws, _, _ = output ############################ # I(Q) OF ELASTIC REFERENCE ############################ elastic_ref = correction_setup.elastic_reference processed_elastic_ref = pre_process_single_configuration( loaded_ws.elastic_reference, sample_trans_ws=elasticref_trans_ws, sample_trans_value=elastic_ref.transmission_value, bkg_ws_raw=loaded_ws.elastic_reference_background, bkg_trans_ws=elasticrefbkgd_trans_ws, bkg_trans_value=elastic_ref.background_transmission_value, # noqa E502 theta_dependent_transmission=theta_dependent_transmission, # noqa E502 dark_current=loaded_ws.dark_current, flux_method=flux_method, flux=flux, mask_ws=loaded_ws.mask, mask_panel=mask_panel, solid_angle=solid_angle, sensitivity_workspace=loaded_ws.sensitivity, output_workspace="processed_elastic_ref", output_suffix=output_suffix, thickness=elastic_ref.thickness, absolute_scale_method=absolute_scale_method, empty_beam_ws=empty_trans_ws, beam_radius=beam_radius, absolute_scale=absolute_scale, keep_processed_workspaces=False, ) # convert to I(Q) iq1d_elastic_ref = convert_to_q(processed_elastic_ref, mode="scalar", **subpixel_kwargs) iq2d_elastic_ref = convert_to_q(processed_elastic_ref, mode="azimuthal", **subpixel_kwargs) # split to frames iq1d_elastic_ref_frames = split_by_frame(processed_elastic_ref, iq1d_elastic_ref, verbose=True) iq2d_elastic_ref_frames = split_by_frame(processed_elastic_ref, iq2d_elastic_ref, verbose=True) else: iq1d_elastic_ref_frames = iq2d_elastic_ref_frames = None # --------------- END OF PROCESS ELASTIC REFERENCE ------------- # Define output data structure output = [] detectordata = {} processed_data_main = None for i, raw_sample_ws in enumerate(loaded_ws.sample): slice_name = f"slice_{i}" if len(loaded_ws.sample) > 1: output_suffix = f"_{i}" raw_name = f"EQSANS_{raw_sample_ws.data.getRunNumber()}" # process data without correction try: processed_data_main = pre_process_single_configuration( raw_sample_ws, sample_trans_ws=sample_trans_ws, sample_trans_value=sample_trans_value, bkg_ws_raw=loaded_ws.background, bkg_trans_ws=bkgd_trans_ws, bkg_trans_value=bkg_trans_value, theta_dependent_transmission=theta_dependent_transmission, # noqa E502 dark_current=loaded_ws.dark_current, flux_method=flux_method, flux=flux, mask_ws=loaded_ws.mask, mask_panel=mask_panel, solid_angle=solid_angle, sensitivity_workspace=loaded_ws.sensitivity, output_workspace="processed_data_main", output_suffix=output_suffix, thickness=thickness, absolute_scale_method=absolute_scale_method, empty_beam_ws=empty_trans_ws, beam_radius=beam_radius, absolute_scale=absolute_scale, keep_processed_workspaces=False, ) except ZeroNeutronFluxError as e: if time_slice: logger.warning(f"Skipping slice {slice_name}: {e}.") continue else: raise # convert to Q iq1d_main_in = convert_to_q(processed_data_main, mode="scalar", **subpixel_kwargs) iq2d_main_in = convert_to_q(processed_data_main, mode="azimuthal", **subpixel_kwargs) # split to frames iq1d_main_in_fr = split_by_frame(processed_data_main, iq1d_main_in, verbose=True) iq2d_main_in_fr = split_by_frame(processed_data_main, iq2d_main_in, verbose=True) # Save nexus processed # For EQSANS the suffix has to add _processed to tell the output file apart # from the original data file # remove history to write less data and speed up I/O if reduction_config["removeAlgorithmHistory"]: RemoveWorkspaceHistory(processed_data_main) filename = os.path.join(output_dir, f"{outputFilename}{output_suffix}_processed.nxs") SaveNexus(processed_data_main, Filename=filename) print(f"SaveNexus to {filename}") # Work with wedges if bool(autoWedgeOpts): # determine wedges automatically from the main detectora wedges = process_auto_wedge( autoWedgeOpts, iq2d_main_in, output_dir, reduction_config, symmetric_wedges, ) num_of_frames = len(iq2d_main_in_fr) _inside_detectordata = {} # Process each frame separately for frameskip_frame in range(num_of_frames): if num_of_frames > 1: fr_log_label = f"_frame_{frameskip_frame}" fr_label = fr_log_label else: fr_log_label = "frame" fr_label = "" assert iq1d_main_in_fr[frameskip_frame] is not None, "Input I(Q) main input cannot be None." assert iq2d_main_in_fr[frameskip_frame] is not None, "Input I(qx, qy) main input cannot be None." iq2d_main_out, i1d_main_out = bin_i_with_correction( iq1d_main_in_fr, iq2d_main_in_fr, frameskip_frame, slice_name, weighted_errors, qmin, qmax, nxbins_main, nybins_main, nbins_main, nbins_main_per_decade, decade_on_center, bin1d_type, log_binning, annular_bin, wedges, symmetric_wedges, correction_setup, iq1d_elastic_ref_frames, iq2d_elastic_ref_frames, raw_name, output_dir, outputFilename, ) _inside_detectordata[fr_log_label] = { "i1d": i1d_main_out, "iqxqy": iq2d_main_out, } # save ASCII and NXCANSAS files filename = os.path.join(output_dir, f"{outputFilename}{output_suffix}{fr_label}_Iqxqy") if iq2d_main_out: save_ascii_binned_2D(f"{filename}.dat", "I(Qx,Qy)", iq2d_main_out) save_cansas_nx(iq2d_main_out.to_workspace(), f"{filename}.h5") binning_suffix = "Iq" if isinstance(i1d_main_out[0], I1DAnnular): binning_suffix = "Iphi" for j in range(len(i1d_main_out)): add_suffix = "" if len(i1d_main_out) > 1: add_suffix = f"_wedge_{j}" add_suffix += fr_label ascii_1D_filename = os.path.join( output_dir, f"{outputFilename}{output_suffix}{add_suffix}_{binning_suffix}" ) save_i1d(i1d_main_out[j], f"{ascii_1D_filename}.dat", skip_nan=skip_nan) current_output = I_output(I2D_main=iq2d_main_out, I1D_main=i1d_main_out) output.append(current_output) # END binning loop over frame detectordata[slice_name] = _inside_detectordata # END reduction loop over sample workspaces if processed_data_main is None: raise NoDataProcessedError # Save reduction log save_reduction_log( reduction_input, outputFilename, processed_data_main, sample_transmission_dict, sample_transmission_raw_dict, background_transmission_dict, background_transmission_raw_dict, detectordata, output_dir, ) return output
def save_reduction_log( reduction_input: Dict, output_file_name: str, processed_data_main, sample_transmission_dict: Dict, sample_transmission_raw_dict: Dict, background_transmission_dict: Dict, background_transmission_raw_dict, detector_data, output_dir: str, ): """Save reduction log to an HDF5 file""" # create reduction log filename = os.path.join( reduction_input["configuration"]["outputDir"], output_file_name + "_reduction_log.hdf", ) starttime = datetime.now().isoformat() # try: # pythonfile = __file__ # except NameError: # pythonfile = "Launched from notebook" reductionparams = {"data": copy.deepcopy(reduction_input)} beam_center_dict = reduction_input["beam_center"] specialparameters = { "beam_center": { "x": beam_center_dict["x"], "y": beam_center_dict["y"], "type": beam_center_dict["type"], }, "fit_results": beam_center_dict["fit_results"], "sample_transmission": sample_transmission_dict, "sample_transmission_raw": sample_transmission_raw_dict, "background_transmission": background_transmission_dict, "background_transmission_raw": background_transmission_raw_dict, } # [#689] TODO FIXME - Reincarnate this section! # FIXME - check original code. processed_data_main outside a loop is a BUG! # The correction workflow does not output processed data workspace yet! assert processed_data_main is not None samplelogs = {"main": SampleLogs(processed_data_main)} logslice_data_dict = reduction_input["logslice_data"] drtsans.savereductionlog( filename=filename, detectordata=detector_data, reductionparams=reductionparams, starttime=starttime, specialparameters=specialparameters, logslicedata=logslice_data_dict, samplelogs=samplelogs, ) # change permissions to all files to allow overwrite allow_overwrite(output_dir) def process_auto_wedge( auto_wedge_setup: Dict, iq2d_input, output_dir: str, reduction_config: Dict, symmetric_wedges, ) -> List: """Process and set up auto wedge Returns ------- ~list list containing 2 lists each contains 2 2-tuples as ``[[(peak1_min, peak1_max), (peak2_min, peak2_max)], [(..., ...), (..., ...)]]`` """ logger.notice(f"Auto wedge options: {auto_wedge_setup}") auto_wedge_setup["debug_dir"] = output_dir auto_wedge_setup["auto_wedge_phi_min"] = reduction_config["autoWedgePhiMin"] auto_wedge_setup["auto_wedge_phi_max"] = reduction_config["autoWedgePhiMax"] auto_wedge_setup["auto_symmetric_wedges"] = reduction_config[ "autoSymmetricWedges" ] # this is a parameter from json file wedges = getWedgeSelection(iq2d_input, **auto_wedge_setup) logger.notice(f"found wedge angles:\n peak: {wedges[0]}\n background: {wedges[1]}") # sanity check assert len(wedges) == 2, f"Auto-wedges {wedges} shall have 2 2-tuples" # set automated wedge to reduction configuration for correct plotting. # reduction_config is an in/out function argument reduction_config["wedges"] = wedges reduction_config["symmetric_wedges"] = symmetric_wedges return wedges def parse_auto_wedge_setup(reduction_config: Dict, bin1d_type: str, wedges_min) -> Tuple[Dict, bool]: """Parse JSON input for automatic wedge setup""" autoWedgeOpts = {} symmetric_wedges = True if bin1d_type == "wedge" and len(wedges_min) == 0: # the JSON validator "wedgesources" guarantees that the parameters to be collected are all non-empty autoWedgeOpts = { "q_min": reduction_config["autoWedgeQmin"], "q_delta": reduction_config["autoWedgeQdelta"], "q_max": reduction_config["autoWedgeQmax"], "azimuthal_delta": reduction_config["autoWedgeAzimuthalDelta"], "peak_width": reduction_config["autoWedgePeakWidth"], "background_width": reduction_config["autoWedgeBackgroundWidth"], "signal_to_noise_min": reduction_config["autoWedgeSignalToNoiseMin"], } # auto-aniso returns all of the wedges symmetric_wedges = False return autoWedgeOpts, symmetric_wedges
[docs] def plot_reduction_output(reduction_output, reduction_input, imshow_kwargs=None): reduction_config = reduction_input["configuration"] output_dir = reduction_config["outputDir"] outputFilename = reduction_input["outputFileName"] output_suffix = "" bin1d_type = reduction_config["1DQbinType"] if imshow_kwargs is None: imshow_kwargs = {} for i, out in enumerate(reduction_output): if len(reduction_output) > 1: output_suffix = f"_{i}" wedges = reduction_config["wedges"] if bin1d_type == "wedge" else None symmetric_wedges = reduction_config.get("symmetric_wedges", True) qmin = reduction_config["Qmin"] qmax = reduction_config["Qmax"] filename = os.path.join(output_dir, f"{outputFilename}{output_suffix}_Iqxqy.png") plot_IQazimuthal( out.I2D_main, filename, backend="mpl", imshow_kwargs=imshow_kwargs, title="Main", wedges=wedges, symmetric_wedges=symmetric_wedges, qmin=qmin, qmax=qmax, ) plt.clf() binning_suffix = "Iq" if isinstance(out.I1D_main[0], I1DAnnular): binning_suffix = "Iphi" for j in range(len(out.I1D_main)): add_suffix = "" if len(out.I1D_main) > 1: add_suffix = f"_wedge_{j}" filename = os.path.join(output_dir, f"{outputFilename}{output_suffix}{add_suffix}_{binning_suffix}.png") plot_i1d( [out.I1D_main[j]], filename, log_scale=True, backend="mpl", errorbar_kwargs={"label": "main"}, ) plt.clf() plt.close() # change permissions to all files to allow overwrite allow_overwrite(output_dir)
[docs] def apply_solid_angle_correction(input_workspace): """Apply solid angle correction. This uses :func:`drtsans.solid_angle_correction`.""" return solid_angle_correction(input_workspace, detector_type="VerticalTube")
def set_beam_center( reduction_input: dict, prefix: str, default_mask: str, load_params: Optional[Dict], filenames: Set[str] ) -> dict: r""" Helping method to find the beam center coordinates. Parameters ---------- reduction_input reduction configurations, holding all parameter values prefix String to prepend to the workspace name that either containing or to contain the beam center events default_mask The default mask to use load_params: The load parameters filenames (Output) Will add to this set the absolute path to the beam center file """ # find the center first, if so required center = reduction_input["beamCenter"]["runNumber"] instrument_name = reduction_input["instrumentName"] if center: center_ws_name = f"{prefix}_{instrument_name}_{center}_raw_events" if not registered_workspace(center_ws_name): reduction_config = reduction_input["configuration"] center_filename = abspath( center, instrument=instrument_name, ipts=reduction_input["iptsNumber"], directory=reduction_input["dataDirectories"], ) load_events( center_filename, pixel_calibration=reduction_config.get("usePixelCalibration", False), output_workspace=center_ws_name, ) filenames.add(center_filename) if reduction_config["useDefaultMask"]: apply_mask(center_ws_name, mask=default_mask) fbc_options = fbc_options_json(reduction_input) center_x, center_y, fit_results = find_beam_center(center_ws_name, **fbc_options) logger.notice(f"calculated center ({center_x}, {center_y})") beam_center_type = "calculated" else: # use default EQSANS center # TODO - it is better to have these hard code value defined outside of this method center_x = 0.025239 center_y = 0.0170801 logger.notice(f"use default center ({center_x}, {center_y})") beam_center_type = "default" fit_results = None # set beam center to reduction configuration reduction_input["beam_center"] = { "type": beam_center_type, "x": center_x, "y": center_y, "fit_results": fit_results, } # update to 'load_params' if load_params is None: load_params = dict(center_x=center_x, center_y=center_y, keep_events=False) elif isinstance(load_params, dict): load_params["center_x"] = center_x load_params["center_y"] = center_y load_params["keep_events"] = False else: raise RuntimeError(f"load_param of type {type(load_params)} is not allowed.") return load_params
[docs] def prepare_data( data, scale_components=None, pixel_calibration=False, detector_offset=0, sample_offset=0, bin_width=0.1, low_tof_clip=500, high_tof_clip=2000, center_x=None, center_y=None, dark_current=None, flux_method=None, flux=None, mask=None, mask_panel=None, btp=dict(), solid_angle=True, sensitivity_file_path=None, sensitivity_workspace=None, sample_aperture_diameter=None, sample_thickness=None, source_aperture_diameter=None, smearing_pixel_size_x=None, smearing_pixel_size_y=None, output_workspace=None, output_suffix="", ): r""" Load an EQSANS data file and bring the data to a point where it can be used. This includes applying basic corrections that are always applied regardless of whether the data is background or scattering data. Parameters ---------- data: int, str, ~mantid.api.IEventWorkspace Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace` scale_components: Optional[dict] Dictionary of component names and scaling factors in the form of a three-element list, indicating rescaling of the pixels in the component along the X, Y, and Z axes. For instance, ``{"detector1": [1.0, 2.0, 1.0]}`` scales pixels along the Y-axis by a factor of 2, leaving the other pixel dimensions unchanged. pixel_calibration: bool Adjust pixel heights and widths according to barscan and tube-width calibrations. detector_offset: float Additional translation of the detector along Z-axis, in mili-meters. sample_offset: float Additional translation of the sample along the Z-axis, in mili-meters. bin_width: float Bin width for the output workspace, in Angstroms. low_tof_clip: float Ignore events with a time-of-flight (TOF) smaller than the minimal TOF plus this quantity. high_tof_clip: float Ignore events with a time-of-flight (TOF) bigger than the maximal TOF minus this quantity. center_x: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``x=0``. center_y: float Move the center of the detector to this X-coordinate. If :py:obj:`None`, the detector will be moved such that the X-coordinate of the intersection point between the neutron beam and the detector array will have ``y=0``. dark_current: int, str, ~mantid.api.IEventWorkspace Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace` flux_method: str Method for flux normalization. Either 'proton charge', 'monitor', or 'time'. flux: str if ``flux_method`` is proton charge, then path to file containing the wavelength distribution of the neutron flux. If ``flux method`` is monitor, then path to file containing the flux-to-monitor ratios. if ``flux_method`` is time, then pass one log entry name such as ``duration`` or leave it as :py:obj:`None` for automatic log search. panel: str Either 'front' or 'back' to mask a whole panel mask_panel: str Either 'front' or 'back' to mask whole front or back panel. mask: mask file path, MaskWorkspace, list Additional mask to be applied. If `list`, it is a list of detector ID's. btp: dict Additional properties to Mantid's MaskBTP algorithm solid_angle: bool Apply the solid angle correction sensitivity_file_path: str file containing previously calculated sensitivity correction sensitivity_workspace: str, ~mantid.api.MatrixWorkspace workspace containing previously calculated sensitivity correction. This overrides the sensitivity_filename if both are provided. sample_aperture_diameter: float, None sample aperture diameter in unit mm sample_thickness: None, float sample thickness in unit cm source_aperture_diameter: float, None source aperture diameter in unit meter smearing_pixel_size_x: float, None pixel size in x direction in unit as meter, only for Q-resolution calculation smearing_pixel_size_y: float, None pixel size in Y direction in unit as meter, only for Q-resolutio calculation output_workspace: str Name of the output workspace. If not supplied, will be determined from the supplied value of ``data``. output_suffix: str If the ``output_workspace`` is not specified, this is appended to the automatically generated output workspace name. Returns ------- ~mantid.api.IEventWorkspace Reference to the events workspace """ # First, load the event stream data into a workspace # The output_workspace name is for the Mantid workspace workspaces = load_events_and_histogram( data, scale_components=scale_components, pixel_calibration=pixel_calibration, detector_offset=detector_offset, sample_offset=sample_offset, output_workspace=output_workspace, output_suffix=output_suffix, center_x=center_x, center_y=center_y, bin_width=bin_width, low_tof_clip=low_tof_clip, high_tof_clip=high_tof_clip, keep_events=(dark_current is None), monitors=(flux_method == "monitor"), ) output_workspace = workspaces.data # Next, we subtract dark current, if it exists. # Note that the function handles the normalization internally. if dark_current is not None: output_workspace = subtract_dark_current(output_workspace, dark_current) # The solid angle is corrected for next if solid_angle: if solid_angle is True: output_workspace = apply_solid_angle_correction(output_workspace) else: # assume the solid_angle parameter is a workspace output_workspace = apply_solid_angle_correction(output_workspace, solid_angle_ws=solid_angle) # Interestingly, this is the only use of the btp dictionary. # The BTP stands for banks, tubes and pixels - it is a Mantid thing. apply_mask(output_workspace, panel=mask_panel, mask=mask, **btp) # returns the mask # Correct for the detector sensitivity (the per pixel relative response) if sensitivity_file_path is not None or sensitivity_workspace is not None: kw = dict( sensitivity_filename=sensitivity_file_path, sensitivity_workspace=sensitivity_workspace, ) output_workspace = apply_sensitivity_correction(output_workspace, **kw) # We can perform the desired normalization here. if flux_method is not None: kw = dict(method=flux_method) if flux_method == "monitor": kw["monitor_workspace"] = str(workspaces.monitor) output_workspace = normalize_by_flux(output_workspace, flux, **kw) # Overwrite meta data set_meta_data( output_workspace, None, None, sample_offset, sample_aperture_diameter, sample_thickness, source_aperture_diameter, smearing_pixel_size_x, smearing_pixel_size_y, ) if isinstance(output_workspace, str): return mtd[output_workspace] # shouldn't happen else: return output_workspace