""" BIOSANS API """
# local imports
import drtsans
from drtsans import getWedgeSelection, subtract_background
from drtsans.dataobjects import save_iqmod, IQazimuthal
from drtsans.instruments import extract_run_number
from drtsans.iq import bin_all
from drtsans.load import move_instrument
from drtsans.mask_utils import apply_mask, load_mask
from drtsans.mono import biosans
from drtsans.mono import meta_data
from drtsans.mono.biosans import solid_angle_correction
from drtsans.mono.biosans.geometry import has_midrange_detector
from drtsans.mono.dark_current import subtract_dark_current
from drtsans.mono.load import load_events, transform_to_wavelength, set_init_uncertainties
from drtsans.mono.meta_data import get_sample_detector_offset, parse_json_meta_data, set_meta_data
from drtsans.mono.normalization import normalize_by_monitor, normalize_by_time
from drtsans.mono.transmission import apply_transmission_correction, calculate_transmission
from drtsans.path import abspath, abspaths, allow_overwrite, registered_workspace
from drtsans.plots import plot_detector
from drtsans.samplelogs import SampleLogs
from drtsans.save_ascii import save_ascii_binned_2D
from drtsans.sensitivity import apply_sensitivity_correction, load_sensitivity_workspace
from drtsans.settings import namedtuplefy
from drtsans.stitch import stitch_binned_profiles
from drtsans.reductionlog import savereductionlog
from drtsans.thickness_normalization import normalize_by_thickness
# third party imports
from mantid.dataobjects import Workspace2D
from mantid.kernel import Logger
from mantid.simpleapi import mtd, MaskDetectors, LoadEventNexus, LoadNexusProcessed, DeleteWorkspace
from matplotlib.colors import LogNorm
import matplotlib.pyplot as plt
# standard imports
from collections import namedtuple
import copy
from datetime import datetime
import os
from typing import Union, List
# Functions exposed to the general user (public) API
__all__ = [
"prepare_data",
"load_all_files",
"plot_reduction_output",
"prepare_data_workspaces",
"process_single_configuration",
"reduce_single_configuration",
]
# silicon window to nominal position (origin) distance in meter
SI_WINDOW_NOMINAL_DISTANCE_METER = 0.071
SAMPLE_SI_META_NAME = "CG3:CS:SampleToSi"
# setup logger
# NOTE: If logging information is not showing up, please check the mantid log level.
# If problem persists, please visit:
# https://docs.mantidproject.org/nightly/concepts/PropertiesFile.html#logging-properties
logger = Logger("BioSANS")
[docs]
@namedtuplefy
def load_all_files(
reduction_input: dict,
prefix: str = "",
load_params: dict = {},
path: str = None,
use_nexus_idf: bool = False,
debug_output: bool = False,
) -> dict:
"""Load all required files at the beginning, and transform them into histograms.
Parameters
----------
reduction_input: dict
Dictionary containing the reduction input
prefix: str
Prefix to be used for the workspaces
load_params: dict
Dictionary containing the parameters to be passed to the Load event algorithm
path: str
Additional path to search the for NeXus files
use_nexus_idf: bool
Flag to enforce to use IDF from NeXus file. It must be true for SPICE-converted NeXus
debug_output: bool
Flag to save internal data for debugging
Returns
-------
dict
Dictionary containing the configuration used to load the workspaces
"""
# a handy shortcut to the configuration parameters dictionary
reduction_config = reduction_input["configuration"]
instrument_name = reduction_input["instrumentName"]
ipts = reduction_input["iptsNumber"]
sample = reduction_input["sample"]["runNumber"]
# on the fly check to see if mid-range detector is present in data
reduction_input["has_midrange_detector"] = file_has_midrange_detector(
sample=sample,
ipts=ipts,
instrument_name=instrument_name,
directory=path,
)
# check that the configuration is consistent with the presence/absence of the midrange detector in the run
check_overlap_stitch_configuration(reduction_input)
sample_trans = reduction_input["sample"]["transmission"]["runNumber"]
bkgd = reduction_input["background"]["runNumber"]
bkgd_trans = reduction_input["background"]["transmission"]["runNumber"]
empty = reduction_input["emptyTransmission"]["runNumber"]
center = reduction_input["beamCenter"]["runNumber"]
blocked_beam = reduction_config["blockedBeamRunNumber"]
# Remove existing workspaces, this is to guarantee that all the data is loaded correctly
# In the future this should be made optional
ws_to_remove = [
f"{prefix}_{instrument_name}_{run_number}_raw_histo"
for run_number in (
sample,
center,
bkgd,
empty,
sample_trans,
bkgd_trans,
blocked_beam,
)
]
ws_to_remove.append(f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group")
ws_to_remove.append(f"{prefix}_main_sensitivity")
ws_to_remove.append(f"{prefix}_wing_sensitivity")
ws_to_remove.append(f"{prefix}_midrange_sensitivity")
ws_to_remove.append(f"{prefix}_mask")
if reduction_config["darkMainFileName"]:
run_number = extract_run_number(reduction_config["darkMainFileName"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
if reduction_config["darkWingFileName"]:
run_number = extract_run_number(reduction_config["darkWingFileName"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
if reduction_config.get("darkMidrangeFileName", None):
run_number = extract_run_number(reduction_config["darkMidrangeFileName"])
ws_to_remove.append(f"{prefix}_{instrument_name}_{run_number}_raw_histo")
for ws_name in ws_to_remove:
if registered_workspace(ws_name):
mtd.remove(ws_name)
# load nexus idf
if use_nexus_idf:
load_params["LoadNexusInstrumentXML"] = use_nexus_idf
# Adjust pixel heights and widths
load_params["pixel_calibration"] = reduction_config.get("usePixelCalibration", False)
# wave length and wave length spread
(
wave_length_dict,
wave_length_spread_dict,
) = meta_data.parse_json_wave_length_and_spread(reduction_input)
if reduction_config["useDefaultMask"]:
# reduction_config["defaultMask"] is a list of python dictionaries
default_mask = reduction_config["defaultMask"] if reduction_config["defaultMask"] is not None else []
else:
default_mask = []
# check for time/log slicing
timeslice = reduction_config["useTimeSlice"]
logslice = reduction_config["useLogSlice"]
if timeslice or logslice:
if len(sample.split(",")) > 1:
logger.error("Can't do slicing on summed data sets")
raise ValueError("Can't do slicing on summed data sets")
# thickness is written to sample log if it is defined...
thickness = reduction_input["sample"]["thickness"]
sample_aperture_diameter = reduction_config["sampleApertureSize"] # in mm
source_aperture_diameter = reduction_config["sourceApertureDiameter"] # in mm
# Parse smearing pixel size x and y for all runs
smearing_pixel_size_x_dict = parse_json_meta_data(
reduction_input,
"smearingPixelSizeX",
1e-3,
beam_center_run=True,
background_run=True,
empty_transmission_run=True,
transmission_run=True,
background_transmission=True,
block_beam_run=True,
dark_current_run=True,
)
smearing_pixel_size_y_dict = parse_json_meta_data(
reduction_input,
"smearingPixelSizeY",
1e-3,
beam_center_run=True,
background_run=True,
empty_transmission_run=True,
transmission_run=True,
background_transmission=True,
block_beam_run=True,
dark_current_run=True,
)
# special loading case for sample to allow the slicing options
logslice_data_dict = {}
# Retrieve parameters for overwriting geometry related meta data
swd_value_dict = parse_json_meta_data(
reduction_input,
"sampleToSi",
1e-3,
beam_center_run=True,
background_run=True,
empty_transmission_run=True,
transmission_run=True,
background_transmission=True,
block_beam_run=True,
dark_current_run=False,
)
# Sample to detector distance
sdd_value_dict = parse_json_meta_data(
reduction_input,
"sampleDetectorDistance",
1.0,
beam_center_run=True,
background_run=True,
empty_transmission_run=True,
transmission_run=True,
background_transmission=True,
block_beam_run=True,
dark_current_run=False,
)
if timeslice or logslice:
ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"
if not registered_workspace(ws_name):
filename = abspath(sample.strip(), instrument=instrument_name, ipts=ipts, directory=path)
logger.notice(f"Loading filename {filename}")
if timeslice:
timesliceinterval = float(reduction_config["timeSliceInterval"])
timesliceperiod = reduction_config["timeSlicePeriod"]
logslicename = logsliceinterval = None
elif logslice:
timesliceinterval, timesliceperiod = None, None
logslicename = reduction_config["logSliceName"]
logsliceinterval = reduction_config["logSliceInterval"]
biosans.load_and_split(
filename,
output_workspace=ws_name,
time_interval=timesliceinterval,
time_offset=reduction_config["timeSliceOffset"],
time_period=timesliceperiod,
log_name=logslicename,
log_value_interval=logsliceinterval,
sample_to_si_name=SAMPLE_SI_META_NAME,
si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER,
sample_to_si_value=swd_value_dict[meta_data.SAMPLE],
sample_detector_distance_value=sdd_value_dict[meta_data.SAMPLE],
**load_params,
)
for _w in mtd[ws_name]:
# Overwrite meta data
set_meta_data(
str(_w),
wave_length=wave_length_dict[meta_data.SAMPLE],
wavelength_spread=wave_length_spread_dict[meta_data.SAMPLE],
sample_thickness=thickness,
sample_aperture_diameter=sample_aperture_diameter,
source_aperture_diameter=source_aperture_diameter,
smearing_pixel_size_x=smearing_pixel_size_x_dict[meta_data.SAMPLE],
smearing_pixel_size_y=smearing_pixel_size_y_dict[meta_data.SAMPLE],
)
# Transform X-axis to wave length with spread
_w = transform_to_wavelength(_w)
_w = set_init_uncertainties(_w)
for btp_params in default_mask:
apply_mask(_w, **btp_params)
if logslicename is not None:
for n in range(mtd[ws_name].getNumberOfEntries()):
samplelogs = SampleLogs(mtd[ws_name].getItem(n))
logslice_data_dict[str(n)] = {
"data": list(samplelogs[logslicename].value),
"units": samplelogs[logslicename].units,
"name": logslicename,
}
else:
# Load raw without slicing
ws_name = f"{prefix}_{instrument_name}_{sample}_raw_histo"
if not registered_workspace(ws_name):
# if sample is not an absolute path to nexus file, convert it to the absolute path
filename = abspaths(sample, instrument=instrument_name, ipts=ipts, directory=path)
logger.notice(f"Loading filename {filename} from sample {sample}")
biosans.load_events_and_histogram(
filename,
output_workspace=ws_name,
sample_to_si_name=SAMPLE_SI_META_NAME,
si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER,
sample_to_si_value=swd_value_dict[meta_data.SAMPLE],
sample_detector_distance_value=sdd_value_dict[meta_data.SAMPLE],
**load_params,
)
# Overwrite meta data
set_meta_data(
ws_name,
wave_length=wave_length_dict[meta_data.SAMPLE],
wavelength_spread=wave_length_spread_dict[meta_data.SAMPLE],
sample_thickness=thickness,
sample_aperture_diameter=sample_aperture_diameter,
source_aperture_diameter=source_aperture_diameter,
smearing_pixel_size_x=smearing_pixel_size_x_dict[meta_data.SAMPLE],
smearing_pixel_size_y=smearing_pixel_size_y_dict[meta_data.SAMPLE],
)
# Re-transform to wave length if overwriting values are specified
if wave_length_dict[meta_data.SAMPLE]:
transform_to_wavelength(ws_name)
# Apply mask
for btp_params in default_mask:
apply_mask(ws_name, **btp_params)
reduction_input["logslice_data"] = logslice_data_dict
# load all other files
# for run_number in [center, bkgd, empty, sample_trans, bkgd_trans, blocked_beam]:
for run_number, run_type in [
(center, meta_data.BEAM_CENTER),
(bkgd, meta_data.BACKGROUND),
(empty, meta_data.EMPTY_TRANSMISSION),
(sample_trans, meta_data.TRANSMISSION),
(bkgd_trans, meta_data.TRANSMISSION_BACKGROUND),
(blocked_beam, meta_data.BLOCK_BEAM),
]:
if run_number:
ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo"
if not registered_workspace(ws_name):
filename = abspaths(run_number, instrument=instrument_name, ipts=ipts, directory=path)
logger.notice(f"Loading filename {filename}")
biosans.load_events_and_histogram(
filename,
output_workspace=ws_name,
sample_to_si_name=SAMPLE_SI_META_NAME,
si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER,
sample_to_si_value=swd_value_dict[run_type],
sample_detector_distance_value=sdd_value_dict[run_type],
**load_params,
)
# Set the wave length and wave length spread
set_meta_data(
ws_name,
wave_length=wave_length_dict[run_type],
wavelength_spread=wave_length_spread_dict[run_type],
sample_thickness=None,
sample_aperture_diameter=None,
source_aperture_diameter=None,
smearing_pixel_size_x=smearing_pixel_size_x_dict[run_type],
smearing_pixel_size_y=smearing_pixel_size_y_dict[run_type],
)
# Re-transform X-axis to wave length with spread due to overwriting wave length
if wave_length_dict[run_type]:
transform_to_wavelength(ws_name)
for btp_params in default_mask:
apply_mask(ws_name, **btp_params)
# load dark current
# step 0: check if mid-range detector is used
if reduction_input["has_midrange_detector"]:
dark_current_file_midrange = reduction_config.get("darkMidrangeFileName", None)
else:
dark_current_file_midrange = "no_midrange_detector"
# step 1: gather main and wing detector dark current
dark_current_file_main = reduction_config.get("darkMainFileName", None)
dark_current_file_wing = reduction_config.get("darkWingFileName", None)
# step 2: logic
# if and only if when none of the dark current files are None, we will load dark current
# otherwise set all dark current to None
if dark_current_file_main and dark_current_file_wing and dark_current_file_midrange:
# main
dark_current_main = dark_current_correction(
dark_current_file_main,
default_mask,
instrument_name,
ipts,
load_params,
path,
prefix,
wave_length_dict[meta_data.DARK_CURRENT],
wave_length_spread_dict[meta_data.DARK_CURRENT],
swd_value_dict[meta_data.DARK_CURRENT],
sdd_value_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_x_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_y_dict[meta_data.DARK_CURRENT],
)
# wing
dark_current_wing = dark_current_correction(
dark_current_file_wing,
default_mask,
instrument_name,
ipts,
load_params,
path,
prefix,
wave_length_dict[meta_data.DARK_CURRENT],
wave_length_spread_dict[meta_data.DARK_CURRENT],
swd_value_dict[meta_data.DARK_CURRENT],
sdd_value_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_x_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_y_dict[meta_data.DARK_CURRENT],
)
# midrange
if dark_current_file_midrange != "no_midrange_detector":
dark_current_midrange = dark_current_correction(
dark_current_file_midrange,
default_mask,
instrument_name,
ipts,
load_params,
path,
prefix,
wave_length_dict[meta_data.DARK_CURRENT],
wave_length_spread_dict[meta_data.DARK_CURRENT],
swd_value_dict[meta_data.DARK_CURRENT],
sdd_value_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_x_dict[meta_data.DARK_CURRENT],
smearing_pixel_size_y_dict[meta_data.DARK_CURRENT],
)
else:
dark_current_midrange = None
else:
dark_current_main = None
dark_current_wing = None
dark_current_midrange = None
# load required processed_files
# step 0: check if mid-range detector is used
if reduction_input["has_midrange_detector"]:
flood_file_midrange = reduction_config.get("sensitivityMidrangeFileName", None)
else:
flood_file_midrange = "no_midrange_detector"
# step 1: gather main and wing detector processed_files
flood_file_main = reduction_config.get("sensitivityMainFileName", None)
flood_file_wing = reduction_config.get("sensitivityWingFileName", None)
# step 2: logic
# if and only if when none of the flood file are None, we will load teh sensitivity
# workspace, otherwise set all sensitivity workspace to None
if flood_file_main and flood_file_wing and flood_file_midrange:
# main
sensitivity_main_ws_name = f"{prefix}_main_sensitivity"
if not registered_workspace(sensitivity_main_ws_name):
logger.notice(f"Loading filename {flood_file_main}")
load_sensitivity_workspace(flood_file_main, output_workspace=sensitivity_main_ws_name)
# wing
sensitivity_wing_ws_name = f"{prefix}_wing_sensitivity"
if not registered_workspace(sensitivity_wing_ws_name):
logger.notice(f"Loading filename {flood_file_wing}")
load_sensitivity_workspace(flood_file_wing, output_workspace=sensitivity_wing_ws_name)
# midrange
if flood_file_midrange != "no_midrange_detector":
sensitivity_midrange_ws_name = f"{prefix}_midrange_sensitivity"
if not registered_workspace(sensitivity_midrange_ws_name):
logger.notice(f"Loading filename {flood_file_midrange}")
load_sensitivity_workspace(flood_file_midrange, output_workspace=sensitivity_midrange_ws_name)
else:
sensitivity_midrange_ws_name = None
else:
sensitivity_main_ws_name = None
sensitivity_wing_ws_name = None
sensitivity_midrange_ws_name = None
mask_ws = None
custom_mask_file = reduction_input["configuration"]["maskFileName"]
if custom_mask_file is not None:
mask_ws_name = f"{prefix}_mask"
if not registered_workspace(mask_ws_name):
logger.notice(f"Loading filename {custom_mask_file}")
mask_ws = load_mask(custom_mask_file, output_workspace=mask_ws_name)
else:
mask_ws = mtd[mask_ws_name]
if registered_workspace(f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"):
raw_sample_ws = mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo_slice_group"]
raw_sample_ws_list = [w for w in raw_sample_ws]
else:
raw_sample_ws_list = [mtd[f"{prefix}_{instrument_name}_{sample}_raw_histo"]]
raw_bkgd_ws = mtd[f"{prefix}_{instrument_name}_{bkgd}_raw_histo"] if bkgd else None
raw_blocked_ws = mtd[f"{prefix}_{instrument_name}_{blocked_beam}_raw_histo"] if blocked_beam else None
raw_center_ws = mtd[f"{prefix}_{instrument_name}_{center}_raw_histo"]
raw_empty_ws = mtd[f"{prefix}_{instrument_name}_{empty}_raw_histo"] if empty else None
raw_sample_trans_ws = mtd[f"{prefix}_{instrument_name}_{sample_trans}_raw_histo"] if sample_trans else None
raw_bkg_trans_ws = mtd[f"{prefix}_{instrument_name}_{bkgd_trans}_raw_histo"] if bkgd_trans else None
sensitivity_main_ws = mtd[sensitivity_main_ws_name] if sensitivity_main_ws_name else None
sensitivity_wing_ws = mtd[sensitivity_wing_ws_name] if sensitivity_wing_ws_name else None
sensitivity_midrange_ws = mtd[sensitivity_midrange_ws_name] if sensitivity_midrange_ws_name else None
if debug_output:
for raw_sample in raw_sample_ws_list:
plot_detector(
input_workspace=str(raw_sample),
filename=form_output_name(raw_sample),
backend="mpl",
)
for ws in [
raw_bkgd_ws,
raw_center_ws,
raw_empty_ws,
raw_sample_trans_ws,
raw_bkg_trans_ws,
raw_blocked_ws,
dark_current_main,
dark_current_wing,
dark_current_midrange,
]:
if ws is not None:
plot_detector(
input_workspace=str(ws),
filename=form_output_name(ws),
backend="mpl",
imshow_kwargs={"norm": LogNorm(vmin=1)},
)
return dict(
sample=raw_sample_ws_list,
background=raw_bkgd_ws,
center=raw_center_ws,
empty=raw_empty_ws,
sample_transmission=raw_sample_trans_ws,
background_transmission=raw_bkg_trans_ws,
blocked_beam=raw_blocked_ws,
dark_current_main=dark_current_main,
dark_current_wing=dark_current_wing,
dark_current_midrange=dark_current_midrange,
sensitivity_main=sensitivity_main_ws,
sensitivity_wing=sensitivity_wing_ws,
sensitivity_midrange=sensitivity_midrange_ws,
mask=mask_ws,
)
def check_overlap_stitch_configuration(reduction_input: dict) -> None:
"""Validate the overlap stitch configuration depending on whether the midrange detector is present
If the midrange detector is absent, there are two detectors and one overlap region, i.e. one Qmin/Qmax.
If the midrange detector is present, there are three detectors and two overlap regions, i.e. two Qmin/Qmax.
Parameters
----------
reduction_input
Reduction configuration parameters
Raises
-------
ValueError
If one of the overlap stitch boundary parameters has the wrong number of values.
"""
reduction_config = reduction_input["configuration"]
num_params_overlap_stitch = 1
if reduction_input["has_midrange_detector"] and not reduction_config["overlapStitchIgnoreMidrange"]:
num_params_overlap_stitch = 2
params_overlap_stitch = ["overlapStitchQmin", "overlapStitchQmax"]
if reduction_config["1DQbinType"] == "wedge":
wedge_names = ["wedge1", "wedge2"]
params_overlap_stitch = [wedge + boundary for wedge in wedge_names for boundary in params_overlap_stitch]
for param in params_overlap_stitch:
# these parameters are optional, only check values if not None
if reduction_config[param] and len(reduction_config[param]) != num_params_overlap_stitch:
has_midrange_str = "present" if reduction_input["has_midrange_detector"] else "not present"
raise ValueError(
f'Configuration "{param}" must have length {num_params_overlap_stitch} since midrange '
f"detector {has_midrange_str}"
)
def dark_current_correction(
dark_current_file,
default_mask,
instrument_name,
ipts,
load_params,
path,
prefix,
wavelength,
wavelength_spread_user,
user_sample_si_distance,
user_sample_detector_distance,
smearing_pixel_size_x,
smearing_pixel_size_y,
):
"""Calculate the dark current correction
Parameters
----------
dark_current_file
default_mask
instrument_name
ipts
load_params
path
prefix
wavelength
wavelength_spread_user
user_sample_si_distance: float, None
user specified (overwriting) sample to Si-window distance in unit of meter
user_sample_detector_distance: float, None
user specified (overwriting) sample to detector distance in unit of meter
smearing_pixel_size_x: float, None
user specified smearing pixel size along X-direction
smearing_pixel_size_y: float, None
user specified smearing pixel size along Y-direction
Returns
-------
~mantid.api.MatrixWorkspace
dark current correction workspace
"""
run_number = extract_run_number(dark_current_file)
ws_name = f"{prefix}_{instrument_name}_{run_number}_raw_histo"
if not registered_workspace(ws_name):
logger.notice(f"Loading filename {dark_current_file}")
# identify to use exact given path to NeXus or use OnCat instead
temp_name = abspath(dark_current_file, instrument=instrument_name, ipts=ipts, directory=path)
if os.path.exists(temp_name):
dark_current_file = temp_name
biosans.load_events_and_histogram(
dark_current_file,
output_workspace=ws_name,
sample_to_si_name=SAMPLE_SI_META_NAME,
si_nominal_distance=SI_WINDOW_NOMINAL_DISTANCE_METER,
sample_to_si_value=user_sample_si_distance,
sample_detector_distance_value=user_sample_detector_distance,
**load_params,
)
# Set the wave length and wave length spread
set_meta_data(
ws_name,
wave_length=wavelength,
wavelength_spread=wavelength_spread_user,
sample_thickness=None,
sample_aperture_diameter=None,
source_aperture_diameter=None,
smearing_pixel_size_x=smearing_pixel_size_x,
smearing_pixel_size_y=smearing_pixel_size_y,
)
# Re-Transform X-axis to wave length with spread
if wavelength:
transform_to_wavelength(ws_name)
for btp_params in default_mask:
apply_mask(ws_name, **btp_params)
dark_current = mtd[ws_name]
else:
dark_current = mtd[ws_name]
return dark_current
[docs]
def prepare_data_workspaces(
data,
center_x=None,
center_y=None,
center_y_wing=None,
center_y_midrange=None,
dark_current=None,
flux_method=None, # normalization (time/monitor)
monitor_fail_switch=False,
mask_ws=None, # apply a custom mask from workspace
mask_detector=None, # main and/or wing and/or midrange
mask_panel=None, # mask back or front panel
mask_btp=None, # mask bank/tube/pixel
solid_angle=True,
sensitivity_workspace=None,
output_workspace=None,
):
r"""
Given a " raw"data workspace, this function provides the following:
- centers the detector
- subtracts dark current
- normalize by time or monitor
- applies masks
- corrects for solid angle
- corrects for sensitivity
All steps are optional. data, mask_ws, dark_current are either None
or histogram workspaces. This function does not load any file.
Parameters
----------
data: ~mantid.dataobjects.Workspace2D
raw workspace (histogram)
center_x: float
Move the center of the detector to this X-coordinate. If :py:obj:`None`, the
detector will be moved such that the X-coordinate of the intersection
point between the neutron beam and the detector array will have ``x=0``.
center_y: float
Move the center of the detector to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
center_y_wing: float
Move the center r of the detector to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
center_y_midrange: float
Move the center of the detector vertically to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
dark_current: ~mantid.dataobjects.Workspace2D
histogram workspace containing the dark current measurement
flux_method: str
Method for flux normalization. Either 'monitor', or 'time'.
monitor_fail_switch: bool
Resort to normalization by 'time' if 'monitor' was selected but no monitor counts are available
mask_ws: ~mantid.dataobjects.Workspace2D
Mask workspace
mask_detector: str, list
Name of one or more instrument components to mask: (e.g `detector1,wing_detector`)
mask_panel: str
Either 'front' or 'back' to mask whole front or back panel.
mask_btp: dict
Additional properties to Mantid's MaskBTP algorithm
solid_angle: bool
Apply the solid angle correction
sensitivity_workspace: str, ~mantid.api.MatrixWorkspace
workspace containing previously calculated sensitivity correction. This
overrides the sensitivity_filename if both are provided.
output_workspace: str
The output workspace name. If None will create data.name()+output_suffix
Returns
-------
~mantid.dataobjects.Workspace2D
Reference to the processed workspace
"""
if not output_workspace:
output_workspace = str(data)
output_workspace = output_workspace.replace("_raw_histo", "") + "_processed_histo"
mtd[str(data)].clone(OutputWorkspace=output_workspace) # name gets into workspace
if center_x is not None and center_y is not None and center_y_wing is not None:
# check whether:
# (1) there is no midrange detector information provided or
# (2) it is provided and the center_y_midrange should be there
if (not has_midrange_detector(output_workspace)) or (
has_midrange_detector(output_workspace) and center_y_midrange is not None
):
biosans.center_detector(
output_workspace,
center_x=center_x,
center_y=center_y,
center_y_wing=center_y_wing,
center_y_midrange=center_y_midrange,
)
# Dark current
if dark_current is not None:
subtract_dark_current(output_workspace, dark_current)
# Normalization
if str(flux_method).lower() == "monitor":
try:
normalize_by_monitor(output_workspace)
except RuntimeError as e:
if monitor_fail_switch:
logger.warning(f"{e}. Resorting to normalization by time")
normalize_by_time(output_workspace)
else:
msg = (
'. Setting "normalizationResortToTime": True will cause the'
" reduction to normalize by time if monitor counts are not available"
)
raise RuntimeError(str(e) + msg)
elif str(flux_method).lower() == "time":
normalize_by_time(output_workspace)
else:
logger.notice("No time or monitor normalization is carried out")
# Mask either detector
if mask_detector is not None:
MaskDetectors(output_workspace, ComponentList=mask_detector)
# Additional masks
if mask_btp is None:
mask_btp = dict()
apply_mask(output_workspace, panel=mask_panel, mask=mask_ws, **mask_btp)
# Solid angle
if solid_angle:
solid_angle_correction(output_workspace)
# Sensitivity
if sensitivity_workspace is not None:
apply_sensitivity_correction(output_workspace, sensitivity_workspace=sensitivity_workspace)
return mtd[output_workspace]
[docs]
def process_single_configuration(
sample_ws_raw: Union[str, Workspace2D],
sample_trans_ws: Workspace2D = None,
sample_trans_value: float = None,
bkg_ws_raw: Workspace2D = None,
bkg_trans_ws: Workspace2D = None,
bkg_trans_value: float = None,
blocked_ws_raw: Workspace2D = None,
theta_dependent_transmission: bool = True,
center_x: float = None,
center_y: float = None,
center_y_wing: float = None,
center_y_midrange: float = None,
dark_current: Workspace2D = None,
flux_method: str = None,
monitor_fail_switch: bool = False,
mask_ws: Workspace2D = None,
mask_detector: Union[str, List] = None,
mask_panel: str = None,
mask_btp: dict = None,
solid_angle: bool = True,
sensitivity_workspace: Workspace2D = None,
thickness: float = 1.0,
absolute_scale_method: str = "standard",
absolute_scale: float = 1.0,
keep_processed_workspaces: bool = True,
output_workspace: str = None,
output_prefix: str = "",
):
r"""
This function provides full data processing for a single experimental configuration,
starting from workspaces (no data loading is happening inside this function)
Steps applied to the sample and background workspaces:
1. centering the detector
2. dark current subtraction
3. normalization by time or monitor
4. masking
5. solid angle correction
6. sensitivity correction
7. transmission correction
Additional steps applied to the sample workspace:
1. background subtraction
2. thickness correction
3. absolute scaling
Parameters
----------
sample_ws_raw
raw data histogram workspace, or a string with the workspace name
sample_trans_ws
optional histogram workspace for sample transmission
sample_trans_value
optional value for sample transmission
bkg_ws_raw
optional raw histogram workspace for background
bkg_trans_ws
optional histogram workspace for background transmission
bkg_trans_value
optional value for background transmission
blocked_ws_raw:
optional histogram workspace for blocked beam
theta_dependent_transmission: bool
flag to apply angle dependent transmission
center_x
x center for the beam
center_y
y center for the beam
center_y_wing
y center for the wing detector
center_y_midrange
y center for the midrange detector
dark_current
dark current workspace
flux_method
normalization by 'time' or 'monitor'
monitor_fail_switch
resort to normalization by 'time' if 'monitor' was selected but no monitor counts are available
mask_ws
user-defined mask
mask_detector
Name of one or more instrument components to mask: (e.g `wing_detector,midrange_detector`)
mask_panel
mask front or back panel of tubes
mask_btp
optional bank, tube, pixel to mask. These are options passed as optional arguments to Mantid algorithm MaskBTP
solid_angle
flag to apply solid angle
sensitivity_workspace
workspace containing sensitivity
thickness
sample thickness (cm)
absolute_scale_method
method to do absolute scaling. One of ["standard", "direct_beam"]
absolute_scale
absolute scaling value for standard method
keep_processed_workspaces
flag to keep the processed blocked beam and background workspaces
output_workspace
output workspace name for the sample. If ``None``, the name will be ``output_suffix + "_sample"``
output_prefix
prefix for output workspaces (``output_suffix + "_sample"``, ``output_suffix + "_blocked"``, and
``output_suffix + "_background"``)
Returns
-------
~mantid.dataobjects.Workspace2D
Reference to the processed workspace
"""
if not output_workspace:
output_workspace = output_prefix + "_sample"
# create a common configuration for prepare data
prepare_data_conf = {
"center_x": center_x,
"center_y": center_y,
"center_y_wing": center_y_wing,
"center_y_midrange": center_y_midrange,
"dark_current": dark_current,
"flux_method": flux_method,
"monitor_fail_switch": monitor_fail_switch,
"mask_ws": mask_ws,
"mask_detector": mask_detector,
"mask_panel": mask_panel,
"mask_btp": mask_btp,
"solid_angle": solid_angle,
"sensitivity_workspace": sensitivity_workspace,
}
# process the optional histogram workspace for blocked beam
if blocked_ws_raw:
blocked_ws_name = output_prefix + "_blocked"
if not registered_workspace(blocked_ws_name):
blocked_ws = prepare_data_workspaces(blocked_ws_raw, output_workspace=blocked_ws_name, **prepare_data_conf)
else:
blocked_ws = mtd[blocked_ws_name]
# process sample
sample_ws = prepare_data_workspaces(sample_ws_raw, output_workspace=output_workspace, **prepare_data_conf)
if blocked_ws_raw:
sample_ws = subtract_background(sample_ws, blocked_ws)
# apply transmission to the sample
transmission_dict = {}
if sample_trans_ws or sample_trans_value:
if sample_trans_value:
transmission_dict = {"value": float(sample_trans_value), "error": ""}
else:
transmission_dict = {
"value": sample_trans_ws.extractY(),
"error": sample_trans_ws.extractE(),
}
sample_ws = apply_transmission_correction(
sample_ws,
trans_workspace=sample_trans_ws,
trans_value=sample_trans_value,
theta_dependent=theta_dependent_transmission,
output_workspace=output_workspace,
)
# process background, if not already processed
background_transmission_dict = {}
if bkg_ws_raw:
bkgd_ws_name = output_prefix + "_background"
if not registered_workspace(bkgd_ws_name):
bkgd_ws = prepare_data_workspaces(bkg_ws_raw, output_workspace=bkgd_ws_name, **prepare_data_conf)
if blocked_ws_raw:
bkgd_ws = subtract_background(bkgd_ws, blocked_ws)
# apply transmission to bkgd
if bkg_trans_ws or bkg_trans_value:
if bkg_trans_value:
background_transmission_dict = {
"value": float(bkg_trans_value),
"error": "",
}
else:
background_transmission_dict = {
"value": bkg_trans_ws.extractY(),
"error": bkg_trans_ws.extractE(),
}
bkgd_ws = apply_transmission_correction(
bkgd_ws,
trans_workspace=bkg_trans_ws,
trans_value=bkg_trans_value,
theta_dependent=theta_dependent_transmission,
output_workspace=bkgd_ws_name,
)
else:
bkgd_ws = mtd[bkgd_ws_name]
# subtract background
sample_ws = subtract_background(sample_ws, bkgd_ws, output_workspace=output_workspace)
if not keep_processed_workspaces:
bkgd_ws.delete()
if blocked_ws_raw and not keep_processed_workspaces:
blocked_ws.delete()
# finalize with absolute scale and thickness
sample_ws = normalize_by_thickness(sample_ws, thickness)
# standard method assumes absolute scale from outside
if absolute_scale_method == "direct_beam":
raise NotImplementedError("This method is not yet implemented for BIOSANS")
else:
sample_ws *= absolute_scale
return mtd[str(sample_ws)], {
"sample": transmission_dict,
"background": background_transmission_dict,
}
[docs]
def plot_reduction_output(
reduction_output: list,
reduction_input: dict,
loglog: bool = True,
imshow_kwargs: dict = {},
):
"""
Generate reduction plot per slice per detector.
Parameters
----------
reduction_output:
List of 1D and 2D I(Q) profile objects, for each detector panel
reduction_input:.
Dictionary of all possible options to properly configure the data reduction workflow.
loglog:
Whether to plot in loglog scale, by default True.
imshow_kwargs:
Keyword arguments to pass to imshow, by default {}.
"""
reduction_config = reduction_input["configuration"]
output_dir = reduction_config["outputDir"]
outputFilename = reduction_input["outputFileName"]
output_suffix = ""
bin1d_type = reduction_config["1DQbinType"]
imshow_kwargs = {} if imshow_kwargs is None else imshow_kwargs
has_midrange_detector = reduction_input.get("has_midrange_detector", False)
for i, out in enumerate(reduction_output):
if len(reduction_output) > 1:
output_suffix = f"_{i}"
wedges = reduction_config["wedges"] if bin1d_type == "wedge" else None
symmetric_wedges = reduction_config.get("symmetric_wedges", True)
qmin_main = reduction_config["QminMain"]
qmax_main = reduction_config["QmaxMain"]
qmin_wing = reduction_config["QminWing"]
qmax_wing = reduction_config["QmaxWing"]
# special case for mid-range detector
if has_midrange_detector:
qmin_midrange = reduction_config["QminMidrange"]
qmax_midrange = reduction_config["QmaxMidrange"]
# NOTE: due to the api design in drtsans, pytest monkey patching does
# not work with the standard import, therefore we need to use
# full path import here to make unit test work.
plot_IQazimuthal = drtsans.plots.api.plot_IQazimuthal
plot_IQmod = drtsans.plots.api.plot_IQmod
allow_overwrite = drtsans.path.allow_overwrite
# main detector
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_main.png")
plot_IQazimuthal(
out.I2D_main,
filename,
backend="mpl",
imshow_kwargs=imshow_kwargs,
title="Main",
wedges=wedges,
symmetric_wedges=symmetric_wedges,
qmin=qmin_main,
qmax=qmax_main,
)
plt.clf()
# wing detector
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_wing.png")
plot_IQazimuthal(
out.I2D_wing,
filename,
backend="mpl",
imshow_kwargs=imshow_kwargs,
title="Wing",
wedges=wedges,
symmetric_wedges=symmetric_wedges,
qmin=qmin_wing,
qmax=qmax_wing,
)
plt.clf()
# special case for mid-range detector
if has_midrange_detector:
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_midrange.png")
plot_IQazimuthal(
out.I2D_midrange,
filename,
backend="mpl",
imshow_kwargs=imshow_kwargs,
title="Midrange",
wedges=wedges,
symmetric_wedges=symmetric_wedges,
qmin=qmin_midrange,
qmax=qmax_midrange,
)
plt.clf()
for j in range(len(out.I1D_main)):
add_suffix = ""
if len(out.I1D_main) > 1:
add_suffix = f"_wedge_{j}"
filename = os.path.join(output_dir, "1D", f"{outputFilename}{output_suffix}_1D{add_suffix}.png")
if has_midrange_detector:
plot_IQmod(
[out.I1D_main[j], out.I1D_wing[j], out.I1D_midrange[j], out.I1D_combined[j]],
filename,
loglog=loglog,
backend="mpl",
errorbar_kwargs={"label": "main,wing,midrange,both"},
)
else:
plot_IQmod(
[out.I1D_main[j], out.I1D_wing[j], out.I1D_combined[j]],
filename,
loglog=loglog,
backend="mpl",
errorbar_kwargs={"label": "main,wing,both"},
)
plt.clf()
plt.close()
# allow overwrite
allow_overwrite(os.path.join(output_dir, "1D"))
allow_overwrite(os.path.join(output_dir, "2D"))
[docs]
def reduce_single_configuration(
loaded_ws: dict,
reduction_input: dict,
prefix: str = "",
skip_nan: bool = True,
debug_output: bool = False,
) -> list:
"""Do data reduction for the loaded workspaces according to the given reduction configuration
Saves to file the resulting I(Q) profiles in ASCII format as well as a reduction log.
Parameters
----------
loaded_ws
Dictionary with loaded workspaces (e.g. background, sensitivity, etc.) to use in the reduction
reduction_input
Reduction configuration parameters
prefix
String to prepend to output file names
skip_nan
If True, data points with value NaN will be ignored when writing I(Q) profiles to file.
debug_output
If True, saves 2D plots representative of the workspace at different stages of the reduction
Returns
-------
list
List of tuple of I(Q):s, one tuple per sample in the workspace. Each tuple holds I(Q):s with 1D and 2D binning
for the individual detector panels, as well as a stitched 1D I(Q) profile.
"""
reduction_config = reduction_input["configuration"]
flux_method = reduction_config["normalization"]
monitor_fail_switch = reduction_config["normalizationResortToTime"]
transmission_radius = reduction_config["mmRadiusForTransmission"]
solid_angle = reduction_config["useSolidAngleCorrection"]
sample_trans_value = reduction_input["sample"]["transmission"]["value"]
bkg_trans_value = reduction_input["background"]["transmission"]["value"]
theta_dependent_transmission = reduction_config["useThetaDepTransCorrection"]
mask_panel = None
if reduction_config["useMaskBackTubes"] is True:
mask_panel = "back"
output_suffix = ""
thickness = reduction_input["sample"]["thickness"] # default thickness set in BIOSANS.json schema
absolute_scale_method = reduction_config["absoluteScaleMethod"]
absolute_scale = reduction_config["StandardAbsoluteScale"]
time_slice_transmission = reduction_config["useTimeSlice"] and reduction_config["useTimeSliceTransmission"]
output_dir = reduction_config["outputDir"]
nxbins_main = reduction_config["numMainQxQyBins"]
nybins_main = nxbins_main
nxbins_wing = reduction_config["numWingQxQyBins"]
nybins_wing = nxbins_wing
nxbins_midrange = reduction_config["numMidrangeQxQyBins"]
nybins_midrange = nxbins_midrange
bin1d_type = reduction_config["1DQbinType"]
log_binning = reduction_config["QbinType"] == "log"
decade_on_center = reduction_config["useLogQBinsDecadeCenter"] # default set in the schema
nbins_main = reduction_config["numMainQBins"]
nbins_main_per_decade = reduction_config["LogQBinsPerDecadeMain"]
nbins_wing = reduction_config["numWingQBins"]
nbins_wing_per_decade = reduction_config["LogQBinsPerDecadeWing"]
nbins_midrange = reduction_config["numMidrangeQBins"]
nbins_midrange_per_decade = reduction_config["LogQBinsPerDecadeMidrange"]
outputFilename = reduction_input["outputFileName"]
weighted_errors = reduction_config["useErrorWeighting"]
qmin_main = reduction_config["QminMain"]
qmax_main = reduction_config["QmaxMain"]
qmin_wing = reduction_config["QminWing"]
qmax_wing = reduction_config["QmaxWing"]
qmin_midrange = reduction_config["QminMidrange"]
qmax_midrange = reduction_config["QmaxMidrange"]
wedge1_qmin_main = reduction_config["wedge1QminMain"]
wedge1_qmax_main = reduction_config["wedge1QmaxMain"]
wedge1_qmin_wing = reduction_config["wedge1QminWing"]
wedge1_qmax_wing = reduction_config["wedge1QmaxWing"]
wedge1_qmin_midrange = reduction_config["wedge1QminMidrange"]
wedge1_qmax_midrange = reduction_config["wedge1QmaxMidrange"]
wedge2_qmin_main = reduction_config["wedge2QminMain"]
wedge2_qmax_main = reduction_config["wedge2QmaxMain"]
wedge2_qmin_wing = reduction_config["wedge2QminWing"]
wedge2_qmax_wing = reduction_config["wedge2QmaxWing"]
wedge2_qmin_midrange = reduction_config["wedge2QminMidrange"]
wedge2_qmax_midrange = reduction_config["wedge2QmaxMidrange"]
annular_bin = reduction_config["AnnularAngleBin"]
wedges_min = reduction_config["WedgeMinAngles"]
wedges_max = reduction_config["WedgeMaxAngles"]
wedges = None if wedges_min is None or wedges_max is None else list(zip(wedges_min, wedges_max))
# automatically determine wedge binning if it wasn't explicitly set
autoWedgeOpts = {}
symmetric_wedges = True
if bin1d_type == "wedge" and wedges_min is None:
# the JSON validator "wedgesources" guarantees that the parameters to be collected are all non-empty
autoWedgeOpts = {
"q_min": reduction_config["autoWedgeQmin"],
"q_delta": reduction_config["autoWedgeQdelta"],
"q_max": reduction_config["autoWedgeQmax"],
"azimuthal_delta": reduction_config["autoWedgeAzimuthalDelta"],
"peak_width": reduction_config["autoWedgePeakWidth"],
"background_width": reduction_config["autoWedgeBackgroundWidth"],
"signal_to_noise_min": reduction_config["autoWedgeSignalToNoiseMin"],
}
# auto-aniso returns all of the wedges
symmetric_wedges = False
fbc_options = biosans.fbc_options_json(reduction_input)
xc, yc, yw, ym, fit_results = biosans.find_beam_center(loaded_ws.center, **fbc_options)
logger.notice(f"Find beam center = {xc}, {yc}, {yw}, {ym}")
# does the run include the midrange detector? check the geometry of the first sample
reduction_config["has_midrange_detector"] = has_midrange_detector(loaded_ws.sample[0])
all_detectors_except_main = ["wing_detector"]
all_detectors_except_wing = ["detector1"]
if reduction_config["has_midrange_detector"]:
all_detectors_except_main.append("midrange_detector")
all_detectors_except_wing.append("midrange_detector")
# empty beam transmission workspace
if loaded_ws.empty is not None:
empty_trans_ws_name = f"{prefix}_empty"
empty_trans_ws = prepare_data_workspaces(
loaded_ws.empty,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=all_detectors_except_main,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
solid_angle=False,
sensitivity_workspace=loaded_ws.sensitivity_main,
output_workspace=empty_trans_ws_name,
)
if debug_output:
plot_detector(
empty_trans_ws,
form_output_name(empty_trans_ws),
backend="mpl",
imshow_kwargs={"norm": LogNorm(vmin=1)},
)
else:
empty_trans_ws = None
# background transmission
if loaded_ws.background_transmission is not None and empty_trans_ws is not None:
bkgd_trans_ws_name = f"{prefix}_bkgd_trans"
bkgd_trans_ws_processed = prepare_data_workspaces(
loaded_ws.background_transmission,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=all_detectors_except_main,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
solid_angle=False,
sensitivity_workspace=loaded_ws.sensitivity_main,
output_workspace=bkgd_trans_ws_name,
)
bkgd_trans_ws = calculate_transmission(
bkgd_trans_ws_processed,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
)
logger.notice(f"Background transmission = {bkgd_trans_ws.extractY()[0, 0]}")
if debug_output:
plot_detector(
bkgd_trans_ws_processed,
form_output_name(bkgd_trans_ws_processed),
backend="mpl",
imshow_kwargs={"norm": LogNorm(vmin=1)},
)
else:
bkgd_trans_ws = None
# sample transmission
def _prepare_sample_transmission_ws(_sample_transmission):
"""
inline function that prepare the sample transmission workspace for
normalization usage
"""
_ws_processed = prepare_data_workspaces(
_sample_transmission,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=all_detectors_except_main,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
solid_angle=False,
sensitivity_workspace=loaded_ws.sensitivity_main,
output_workspace=f"{prefix}_sample_trans",
)
return _ws_processed, calculate_transmission(
_ws_processed,
empty_trans_ws,
radius=transmission_radius,
radius_unit="mm",
)
sample_trans_ws = None
if loaded_ws.sample_transmission is not None and empty_trans_ws is not None:
sample_trans_ws_processed, sample_trans_ws = _prepare_sample_transmission_ws(loaded_ws.sample_transmission)
logger.notice(f"Sample transmission = {sample_trans_ws.extractY()[0, 0]}")
if debug_output:
plot_detector(
sample_trans_ws_processed,
form_output_name(sample_trans_ws_processed),
backend="mpl",
imshow_kwargs={"norm": LogNorm(vmin=1)},
)
output = []
detectordata = {}
for i, raw_sample_ws in enumerate(loaded_ws.sample):
sample_name = "_slice_{}".format(i + 1)
if len(loaded_ws.sample) > 1:
output_suffix = f"_{i}"
if time_slice_transmission:
_, sample_trans_ws = _prepare_sample_transmission_ws(raw_sample_ws)
processed_data_main, trans_main = process_single_configuration(
raw_sample_ws,
sample_trans_ws=sample_trans_ws,
sample_trans_value=sample_trans_value,
bkg_ws_raw=loaded_ws.background,
bkg_trans_ws=bkgd_trans_ws,
bkg_trans_value=bkg_trans_value,
blocked_ws_raw=loaded_ws.blocked_beam,
theta_dependent_transmission=theta_dependent_transmission,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
dark_current=loaded_ws.dark_current_main,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=all_detectors_except_main,
mask_ws=loaded_ws.mask,
mask_panel=mask_panel,
solid_angle=solid_angle,
sensitivity_workspace=loaded_ws.sensitivity_main,
output_workspace=f"processed_data_main_{i}",
output_prefix=output_suffix,
thickness=thickness,
absolute_scale_method=absolute_scale_method,
absolute_scale=absolute_scale,
keep_processed_workspaces=False,
)
processed_data_wing, trans_wing = process_single_configuration(
raw_sample_ws,
sample_trans_ws=sample_trans_ws,
sample_trans_value=sample_trans_value,
bkg_ws_raw=loaded_ws.background,
bkg_trans_ws=bkgd_trans_ws,
bkg_trans_value=bkg_trans_value,
blocked_ws_raw=loaded_ws.blocked_beam,
theta_dependent_transmission=theta_dependent_transmission,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
dark_current=loaded_ws.dark_current_wing,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=all_detectors_except_wing,
mask_ws=loaded_ws.mask,
mask_panel=mask_panel,
solid_angle=solid_angle,
sensitivity_workspace=loaded_ws.sensitivity_wing,
output_workspace=f"processed_data_wing_{i}",
output_prefix=output_suffix,
thickness=thickness,
absolute_scale_method=absolute_scale_method,
absolute_scale=absolute_scale,
keep_processed_workspaces=False,
)
if reduction_config["has_midrange_detector"]:
processed_data_midrange, trans_midrange = process_single_configuration(
raw_sample_ws,
sample_trans_ws=sample_trans_ws,
sample_trans_value=sample_trans_value,
bkg_ws_raw=loaded_ws.background,
bkg_trans_ws=bkgd_trans_ws,
bkg_trans_value=bkg_trans_value,
blocked_ws_raw=loaded_ws.blocked_beam,
theta_dependent_transmission=theta_dependent_transmission,
center_x=xc,
center_y=yc,
center_y_wing=yw,
center_y_midrange=ym,
dark_current=loaded_ws.dark_current_midrange,
flux_method=flux_method,
monitor_fail_switch=monitor_fail_switch,
mask_detector=["detector1", "wing_detector"],
mask_ws=loaded_ws.mask,
mask_panel=mask_panel,
solid_angle=solid_angle,
sensitivity_workspace=loaded_ws.sensitivity_midrange,
output_workspace=f"processed_data_midrange_{i}",
output_prefix=output_suffix,
thickness=thickness,
absolute_scale_method=absolute_scale_method,
absolute_scale=absolute_scale,
keep_processed_workspaces=False,
)
else:
processed_data_midrange, trans_midrange = (
None,
{
"sample": None,
"background": None,
},
)
if debug_output:
from mantid.simpleapi import SaveNexusProcessed
main_name = f'{form_output_name(processed_data_main).split(".")[0]}.nxs'
wing_name = f'{form_output_name(processed_data_wing).split(".")[0]}.nxs'
SaveNexusProcessed(InputWorkspace=processed_data_main, Filename=main_name)
SaveNexusProcessed(InputWorkspace=processed_data_wing, Filename=wing_name)
plot_detector(
processed_data_main,
form_output_name(processed_data_main),
backend="mpl",
) # imshow_kwargs={'norm': LogNorm(vmin=1)})
# FIXME - using LogNorm option results in exception from matplotlib as some negative value detected
plot_detector(
processed_data_wing,
form_output_name(processed_data_wing),
backend="mpl",
) # , imshow_kwargs={'norm': LogNorm(vmin=1)})
if reduction_config["has_midrange_detector"]:
midrange_name = f'{form_output_name(processed_data_midrange).split(".")[0]}.nxs'
SaveNexusProcessed(InputWorkspace=processed_data_midrange, Filename=midrange_name)
plot_detector(
processed_data_midrange,
form_output_name(processed_data_midrange),
backend="mpl",
)
_loginfo = f"Transmission (main detector)@{str(raw_sample_ws)}: {trans_main}\n"
_loginfo += f"Transmission (midrange detector)@{str(raw_sample_ws)}: {trans_midrange}\n"
_loginfo += f"Transmission (wing detector)@{str(raw_sample_ws)}: {trans_wing}"
logger.notice(_loginfo)
# binning
subpixel_kwargs = dict()
if reduction_config["useSubpixels"] is True:
subpixel_kwargs = {
"n_horizontal": reduction_config["subpixelsX"],
"n_vertical": reduction_config["subpixelsY"],
}
iq1d_main_in = biosans.convert_to_q(processed_data_main, mode="scalar", **subpixel_kwargs)
iq2d_main_in = biosans.convert_to_q(processed_data_main, mode="azimuthal", **subpixel_kwargs)
if bool(autoWedgeOpts): # determine wedges automatically from the main detector
logger.notice(f"Auto wedge options: {autoWedgeOpts}")
autoWedgeOpts["debug_dir"] = output_dir
wedges = getWedgeSelection(iq2d_main_in, **autoWedgeOpts)
logger.notice("found wedge angles:")
peak_wedge, back_wedge = wedges
logger.notice(" peak: ", peak_wedge)
logger.notice(" background:", back_wedge)
del peak_wedge, back_wedge
logger.notice(f"wedges: {wedges}")
# set the found wedge values to the reduction input, this will allow correct plotting
reduction_config["wedges"] = wedges
reduction_config["symmetric_wedges"] = symmetric_wedges
iq2d_main_out, iq1d_main_out = bin_all(
iq2d_main_in,
iq1d_main_in,
nxbins_main,
nybins_main,
n1dbins=nbins_main,
n1dbins_per_decade=nbins_main_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin_main,
qmax=qmax_main,
wedge1_qmin=wedge1_qmin_main,
wedge1_qmax=wedge1_qmax_main,
wedge2_qmin=wedge2_qmin_main,
wedge2_qmax=wedge2_qmax_main,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
)
iq1d_wing_in = biosans.convert_to_q(processed_data_wing, mode="scalar")
iq2d_wing_in = biosans.convert_to_q(processed_data_wing, mode="azimuthal")
iq2d_wing_out, iq1d_wing_out = bin_all(
iq2d_wing_in,
iq1d_wing_in,
nxbins_wing,
nybins_wing,
n1dbins=nbins_wing,
n1dbins_per_decade=nbins_wing_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin_wing,
qmax=qmax_wing,
wedge1_qmin=wedge1_qmin_wing,
wedge1_qmax=wedge1_qmax_wing,
wedge2_qmin=wedge2_qmin_wing,
wedge2_qmax=wedge2_qmax_wing,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
)
if reduction_config["has_midrange_detector"]:
iq1d_midrange_in = biosans.convert_to_q(processed_data_midrange, mode="scalar")
iq2d_midrange_in = biosans.convert_to_q(processed_data_midrange, mode="azimuthal")
iq2d_midrange_out, iq1d_midrange_out = bin_all(
iq2d_midrange_in,
iq1d_midrange_in,
nxbins_midrange,
nybins_midrange,
n1dbins=nbins_midrange,
n1dbins_per_decade=nbins_midrange_per_decade,
decade_on_center=decade_on_center,
bin1d_type=bin1d_type,
log_scale=log_binning,
qmin=qmin_midrange,
qmax=qmax_midrange,
wedge1_qmin=wedge1_qmin_midrange,
wedge1_qmax=wedge1_qmax_midrange,
wedge2_qmin=wedge2_qmin_midrange,
wedge2_qmax=wedge2_qmax_midrange,
annular_angle_bin=annular_bin,
wedges=wedges,
symmetric_wedges=symmetric_wedges,
error_weighted=weighted_errors,
)
else:
iq2d_midrange_out, iq1d_midrange_out = (IQazimuthal(intensity=[], error=[], qx=[], qy=[]), [])
# create output directories
create_output_dir(output_dir)
# save ASCII files
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_main.dat")
save_ascii_binned_2D(filename, "I(Qx,Qy)", iq2d_main_out)
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_wing.dat")
save_ascii_binned_2D(filename, "I(Qx,Qy)", iq2d_wing_out)
filename = os.path.join(output_dir, "2D", f"{outputFilename}{output_suffix}_2D_midrange.dat")
save_ascii_binned_2D(filename, "I(Qx,Qy)", iq2d_midrange_out)
# intensity profiles in the order of stitching (lower to higher Q)
if not reduction_config["has_midrange_detector"] or reduction_config["overlapStitchIgnoreMidrange"]:
iq1d_profiles_in = [iq1d_main_in, iq1d_wing_in]
iq1d_profiles_out = [iq1d_main_out, iq1d_wing_out]
else:
iq1d_profiles_in = [iq1d_main_in, iq1d_midrange_in, iq1d_wing_in]
iq1d_profiles_out = [iq1d_main_out, iq1d_midrange_out, iq1d_wing_out]
iq1d_combined_out = stitch_binned_profiles(iq1d_profiles_in, iq1d_profiles_out, reduction_config)
save_iqmod_all(
iq1d_main_out,
iq1d_midrange_out,
iq1d_wing_out,
iq1d_combined_out,
outputFilename,
output_dir,
output_suffix,
skip_nan,
)
IofQ_output = namedtuple(
"IofQ_output",
["I2D_main", "I2D_midrange", "I2D_wing", "I1D_main", "I1D_midrange", "I1D_wing", "I1D_combined"],
)
current_output = IofQ_output(
I2D_main=iq2d_main_out,
I2D_midrange=iq2d_midrange_out,
I2D_wing=iq2d_wing_out,
I1D_main=iq1d_main_out,
I1D_midrange=iq1d_midrange_out,
I1D_wing=iq1d_wing_out,
I1D_combined=iq1d_combined_out,
)
output.append(current_output)
detectordata[sample_name] = get_sample_detectordata(
iq1d_main_out,
iq1d_midrange_out,
iq1d_wing_out,
iq1d_combined_out,
iq2d_main_out,
iq2d_midrange_out,
iq2d_wing_out,
reduction_config["has_midrange_detector"],
)
# save reduction log
filename = os.path.join(
reduction_config["outputDir"],
outputFilename + f"_reduction_log{output_suffix}.hdf",
)
starttime = datetime.now().isoformat()
reductionparams = {"data": copy.deepcopy(reduction_input)}
specialparameters = {
"beam_center": {"x": xc, "y": yc, "y_midrange": ym, "y_wing": yw},
"fit results": fit_results,
"sample_transmission": {
"main": trans_main["sample"],
"midrange": trans_midrange["sample"],
"wing": trans_wing["sample"],
},
"background_transmission": {
"main": trans_main["background"],
"midrange": trans_midrange["background"],
"wing": trans_wing["background"],
},
}
samplelogs = {
"main": SampleLogs(processed_data_main),
"wing": SampleLogs(processed_data_wing),
}
if reduction_config["has_midrange_detector"]:
samplelogs["midrange"] = SampleLogs(processed_data_midrange)
logslice_data_dict = reduction_input["logslice_data"]
savereductionlog(
filename=filename,
detectordata=detectordata,
reductionparams=reductionparams,
# pythonfile=pythonfile,
starttime=starttime,
specialparameters=specialparameters,
logslicedata=logslice_data_dict,
samplelogs=samplelogs,
)
# change permissions to all files to allow overwrite
allow_overwrite(reduction_config["outputDir"])
allow_overwrite(os.path.join(reduction_config["outputDir"], "1D"))
allow_overwrite(os.path.join(reduction_config["outputDir"], "2D"))
return output
def save_iqmod_all(
iq1d_main, iq1d_midrange, iq1d_wing, iq1d_combined, output_filename, output_dir, output_suffix, skip_nan
):
"""Save to file the intensity profiles for the individual detectors, as well as the combined (stitched) profile
Parameters
----------
iq1d_main: ~drtsans.dataobjects.IQmod
Intensity profile for the main detector
iq1d_midrange: ~drtsans.dataobjects.IQmod
Intensity profile for the midrange detector
iq1d_wing: ~drtsans.dataobjects.IQmod
Intensity profile for the wing detector
iq1d_combined: ~drtsans.dataobjects.IQmod
Combined (stitched) intensity profile from different detectors
output_filename: str
The basename of the output files
output_dir: str
The directory to write files to
output_suffix:str
Suffix for output file names
skip_nan: bool
If true, any data point where intensity is NAN will not be written to file
"""
for j in range(len(iq1d_main)):
add_suffix = ""
if len(iq1d_main) > 1:
add_suffix = f"_wedge_{j}"
main_1D_filename = os.path.join(
output_dir,
"1D",
f"{output_filename}{output_suffix}_1D_main{add_suffix}.txt",
)
save_iqmod(iq1d_main[j], main_1D_filename, skip_nan=skip_nan)
wing_1D_filename = os.path.join(
output_dir,
"1D",
f"{output_filename}{output_suffix}_1D_wing{add_suffix}.txt",
)
save_iqmod(iq1d_wing[j], wing_1D_filename, skip_nan=skip_nan)
if iq1d_midrange:
midrange_1D_filename = os.path.join(
output_dir,
"1D",
f"{output_filename}{output_suffix}_1D_midrange{add_suffix}.txt",
)
save_iqmod(iq1d_midrange[j], midrange_1D_filename, skip_nan=skip_nan)
combined_1D_filename = os.path.join(
output_dir,
"1D",
f"{output_filename}{output_suffix}_1D_combined{add_suffix}.txt",
)
save_iqmod(iq1d_combined[j], combined_1D_filename, skip_nan=skip_nan)
def get_sample_detectordata(
iq1d_main, iq1d_midrange, iq1d_wing, iq1d_combined, iq2d_main, iq2d_midrange, iq2d_wing, has_midrange
):
"""Construct a dictionary of I(Q) output for the reduction logs
Parameters
----------
iq1d_main: ~drtsans.dataobjects.IQmod
Intensity profile for the main detector
iq1d_midrange: ~drtsans.dataobjects.IQmod
Intensity profile for the midrange detector
iq1d_wing: ~drtsans.dataobjects.IQmod
Intensity profile for the wing detector
iq1d_combined: ~drtsans.dataobjects.IQmod
Combined (stitched) intensity profile from different detectors
iq2d_main: ~drtsans.dataobjects.IQazimuthal
I(Qx, Qy) for the main detector
iq2d_midrange: ~drtsans.dataobjects.IQazimuthal
I(Qx, Qy) for the midrange detector
iq2d_wing: ~drtsans.dataobjects.IQazimuthal
I(Qx, Qy) for the wing detector
has_midrange: bool
If True, the run includes the midrange detector
"""
detector_data = {}
# TODO: fix this bug - only one combined profile is saved in the reduction log, but for wedges there are two
if iq1d_combined[-1].intensity.size > 0:
detector_data["combined"] = {"iq": [iq1d_combined[-1]]}
# one 1D I(Q) per detector for scalar and annular binning and two 1D I(Q) per detector for wedge binning
for index, (_iq1d_main, _iq1d_wing) in enumerate(zip(iq1d_main, iq1d_wing)):
detector_data[f"main_{index}"] = {"iq": [_iq1d_main]}
detector_data[f"wing_{index}"] = {"iq": [_iq1d_wing]}
if has_midrange:
for index, (_iq1d_midrange) in enumerate(iq1d_midrange):
detector_data[f"midrange_{index}"] = {"iq": [_iq1d_midrange]}
# one 2D I(Q) per detector, assign to index 0
index = 0
detector_data[f"main_{index}"]["iqxqy"] = iq2d_main
detector_data[f"wing_{index}"]["iqxqy"] = iq2d_wing
if has_midrange:
detector_data[f"midrange_{index}"]["iqxqy"] = iq2d_midrange
return detector_data
[docs]
def prepare_data(
data,
data_dir=None,
pixel_calibration=False,
mask_detector=None,
detector_offset=0,
sample_offset=0,
center_x=None,
center_y=None,
center_y_wing=None,
center_y_midrange=None,
dark_current=None,
flux_method=None,
monitor_fail_switch=False,
mask=None,
mask_panel=None,
btp=dict(),
solid_angle=True,
sensitivity_file_path=None,
sensitivity_workspace=None,
wave_length=None,
wavelength_spread=None,
sample_aperture_diameter=None,
sample_thickness=None,
source_aperture_diameter=None,
smearing_pixel_size_x=None,
smearing_pixel_size_y=None,
output_workspace=None,
output_suffix="",
**kwargs,
):
r"""
Load a BIOSANS data file and bring the data to a point where it can be used. This includes applying basic
corrections that are always applied regardless of whether the data is background or scattering data.
Parameters
----------
data: int, str, ~mantid.api.IEventWorkspace
Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace`
data_dir: str, list
Additional data search directories
pixel_calibration: bool
Adjust pixel heights and widths according to barscan and tube-width calibrations.
mask_detector: str, list
Name of one or more instrument components to mask: (e.g `detector1,wing_detector`)
detector_offset: float
Additional translation of the detector along Z-axis, in mili-meters.
sample_offset: float
Additional translation of the sample along the Z-axis, in mili-meters.
center_x: float
Move the center of the detector to this X-coordinate. If :py:obj:`None`, the
detector will be moved such that the X-coordinate of the intersection
point between the neutron beam and the detector array will have ``x=0``.
center_y: float
Move the center of the detector to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
center_y_wing: float
Move the center of the wing detector to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
center_y_midrange: float
Move the center of the detector vertically to this Y-coordinate. If :py:obj:`None`, the
detector will be moved such that the Y-coordinate of the intersection
point between the neutron beam and the detector array will have ``y=0``.
dark_current: int, str, ~mantid.api.IEventWorkspace
Run number as int or str, file path, :py:obj:`~mantid.api.IEventWorkspace`
flux_method: str
Method for flux normalization. Either 'monitor', or 'time'.
monitor_fail_switch: bool
Resort to normalization by 'time' if 'monitor' was selected but no monitor counts are available
panel: str
Either 'front' or 'back' to mask a whole panel
mask_panel: str
Either 'front' or 'back' to mask whole front or back panel.
mask: mask file path, MaskWorkspace, list
Additional mask to be applied. If `list`, it is a list of
detector ID's.
btp: dict
Additional properties to Mantid's MaskBTP algorithm
solid_angle: bool
Apply the solid angle correction
sensitivity_file_path: str
file containing previously calculated sensitivity correction
sensitivity_workspace: str, ~mantid.api.MatrixWorkspace
workspace containing previously calculated sensitivity correction. This
overrides the sensitivity_filename if both are provided.
wave_length: float, None
wave length in Angstrom
wavelength_spread: float, None
wave length spread in Angstrom
sample_aperture_diameter: float, None
sample aperture diameter in mm
sample_thickness: None, float
sample thickness in unit cm
source_aperture_diameter: float, None
source aperture size radius in unit mm
smearing_pixel_size_x: float, None
pixel size in x direction in unit as meter, only for Q-resolution calculation
smearing_pixel_size_y: float, None
pixel size in Y direction in unit as meter, only for Q-resolution calculation
output_workspace: str
Name of the output workspace. If not supplied, will be determined from the supplied value of ``data``.
output_suffix: str
If the ``output_workspace`` is not specified, this is appended to the automatically generated
output workspace name.
Returns
-------
~mantid.api.IEventWorkspace
Reference to the events workspace
"""
# Detector offset and sample offset are disabled
if abs(detector_offset) > 1e-8 or abs(sample_offset) > 1e-8:
raise RuntimeError("biosans.api.prepare_data does not work with detector_offset or sample_offset")
# Load data and enforce to use nexus IDF
if "enforce_use_nexus_idf" in kwargs:
enforce_use_nexus_idf = kwargs["enforce_use_nexus_idf"]
else:
enforce_use_nexus_idf = False
# Load event without moving detector and sample after loading NeXus and instrument
ws = load_events(
data,
data_dir=data_dir,
overwrite_instrument=True,
output_workspace=output_workspace,
output_suffix=output_suffix,
pixel_calibration=pixel_calibration,
detector_offset=0.0,
sample_offset=0.0,
LoadNexusInstrumentXML=enforce_use_nexus_idf,
)
# Reset the offset
sample_offset, detector_offset = get_sample_detector_offset(
ws, SAMPLE_SI_META_NAME, SI_WINDOW_NOMINAL_DISTANCE_METER
)
# Translate instrument with offsets
move_instrument(ws, sample_offset, detector_offset)
ws_name = str(ws)
transform_to_wavelength(ws_name)
set_init_uncertainties(ws_name)
if center_x is not None and center_y is not None and center_y_wing is not None:
# check whether:
# (1) there is no midrange detector information provided or
# (2) it is provided and the center_y_midrange should be there
if (not has_midrange_detector(ws_name)) or (has_midrange_detector(ws_name) and center_y_midrange is not None):
biosans.center_detector(
ws_name,
center_x=center_x,
center_y=center_y,
center_y_wing=center_y_wing,
center_y_midrange=center_y_midrange,
)
# Mask either detector
if mask_detector is not None:
MaskDetectors(ws_name, ComponentList=mask_detector)
# Dark current
if dark_current is not None:
if mtd.doesExist(str(dark_current)):
dark_ws = mtd[str(dark_current)]
else:
# load dark current
dark_ws = load_events(
dark_current,
overwrite_instrument=True,
LoadNexusInstrumentXML=enforce_use_nexus_idf,
)
dark_ws = transform_to_wavelength(dark_ws)
dark_ws = set_init_uncertainties(dark_ws)
subtract_dark_current(ws_name, dark_ws)
# Normalization
if str(flux_method).lower() == "monitor":
try:
normalize_by_monitor(ws_name)
except RuntimeError as e:
if monitor_fail_switch:
logger.warning(f"{e}. Resorting to normalization by time")
normalize_by_time(ws_name)
else:
msg = (
'. Setting configuration "normalizationResortToTime": True will cause the'
" reduction to normalize by time if monitor counts are not available"
)
raise RuntimeError(str(e) + msg)
elif str(flux_method).lower() == "time":
normalize_by_time(ws_name)
else:
logger.notice("No time or monitor normalization is carried out")
# Additional masks
apply_mask(ws_name, panel=mask_panel, mask=mask, **btp)
# Solid angle
if solid_angle:
if solid_angle is True:
solid_angle_correction(ws_name)
else: # assume the solid_angle parameter is a workspace
solid_angle_correction(ws_name, solid_angle_ws=solid_angle)
# Sensitivity
if sensitivity_file_path is not None or sensitivity_workspace is not None:
drtsans.apply_sensitivity_correction(
ws_name,
sensitivity_filename=sensitivity_file_path,
sensitivity_workspace=sensitivity_workspace,
)
# Overwrite meta data
set_meta_data(
ws_name,
wave_length,
wavelength_spread,
sample_offset,
sample_aperture_diameter,
sample_thickness,
source_aperture_diameter,
smearing_pixel_size_x,
smearing_pixel_size_y,
)
return mtd[ws_name]
def create_output_dir(output_dir):
# output root
if not os.path.exists(output_dir):
os.mkdir(output_dir)
# 1D and 2D
for sub_dir in ["1D", "2D"]:
n_d_dir = os.path.join(output_dir, sub_dir)
if not os.path.exists(n_d_dir):
os.mkdir(n_d_dir)
def form_output_name(workspace):
workspace_name = str(workspace)
file_name = workspace_name.split("/")[-1].split(".")[0]
return f"{file_name}.png"
def file_has_midrange_detector(sample: str, instrument_name: str, ipts: str, directory: str) -> bool:
"""
Check if the sample has midrange detector
Parameters
----------
sample
Sample name
instrument_name
Instrument name
ipts
IPTS number
directory
Directory of the data
Returns
-------
bool
True if the sample has midrange detector
"""
filename = abspaths(
sample.strip(),
instrument=instrument_name,
ipts=ipts,
directory=directory,
).split(
","
)[0]
out_ws_name = mtd.unique_hidden_name()
try:
workspace = LoadEventNexus(Filename=filename, MetadataOnly=True, OutputWorkspace=out_ws_name)
except RuntimeError:
workspace = LoadNexusProcessed(Filename=filename, OutputWorkspace=out_ws_name)
has_midrange = workspace.getInstrument().getComponentByName("midrange_detector") is not None
# cleanup
DeleteWorkspace(workspace)
return has_midrange