Source code for drtsans.mono.biosans.reduce_spice

import os
from typing import List, Tuple, Union
import numpy as np
import time
from mantid.simpleapi import mtd
from drtsans.mono.spice_data import map_to_nexus
from drtsans.mono.biosans import (
    load_all_files,
    reduce_single_configuration,
    plot_reduction_output,
    reduction_parameters,
    update_reduction_parameters,
)
import warnings

warnings.filterwarnings("ignore")

CG3 = "CG3"


def clear_buffer():
    """Free memory by clearing every workspace held in Mantid's ``mtd`` registry."""
    mtd.clear()


def _first_unused_directory(directory: str) -> str:
    """Return ``directory`` if it does not exist, else the first free ``<directory>_<n>/`` variant."""
    if not os.path.exists(directory):
        return directory
    # Strip the trailing separator so the numeric suffix is appended to the folder name itself
    stem = directory.rstrip("/")
    suffix = 0
    candidate = stem + "_" + str(suffix) + "/"
    while os.path.exists(candidate):
        suffix += 1
        candidate = stem + "_" + str(suffix) + "/"
    return candidate


def reduce_biosans_nexus(
    ipts_number: int,
    experiment_number: int,
    sample_names: List[str],
    sample_runs: List[Tuple[int, int]],
    background_runs: List[Union[None, Tuple[int, int]]],
    sample_transmission_runs: List[Tuple[int, int]],
    background_transmission_runs: List[Union[None, Tuple[int, int]]],
    empty_transmission_run: Tuple[int, int],
    beam_center_runs: Tuple[int, int],
    sample_thickness_list: List[Union[str, float]],
    sample_identifier: str,
    overwrite_reduced_data: bool,
    main_detector_dark_run: Tuple[int, int],
    wing_detector_dark_run: Tuple[int, int],
    base_output_directory: str,
    main_detector_sensitivities_file: str,
    wing_detector_sensitivities_file: str,
    scale_factor: Union[float, str, None],
    scaling_beam_radius: Union[float, str, None],
    number_linear_q1d_bins_main_detector: Union[int, str],
    number_linear_q1d_bins_wing_detector: Union[int, str],
    number_linear_q2d_bins_main_detector: Union[int, str],
    number_linear_q2d_bins_wing_detector: Union[int, str],
    plot_type: str,
    plot_binning: str,
    number_log_bins_per_decade_main_detector: int,
    number_log_bins_per_decade_wing_detector: int,
    q_range_main: Union[List[float], Tuple[float, float]],
    q_range_wing: Union[List[float], Tuple[float, float]],
    overlap_stitch_range: Union[List[float], Tuple[float, float]],
    flexible_pixel_sizes: bool,
    wedge_min_angles: Union[List[float], None],
    wedge_max_angles: Union[List[float], None],
    auto_wedge_qmin: float,
    auto_wedge_qmax: float,
    auto_wedge_delta_q: float,
    auto_wedge_peak_width: float,
    auto_wedge_delta_azimuthal_angle: float,
    auto_wedge_background_width: float,
    auto_wedge_minimum_signal_to_noise_ratio: float,
    q_range_main_wedge0: Union[Tuple[float, float], List[float]],
    q_range_wing_wedge0: Union[Tuple[float, float], List[float]],
    OL_range_wedge0: Union[Tuple[float, float], List[float]],
    q_range_main_wedge1: Union[Tuple[float, float], List[float]],
    q_range_wing_wedge1: Union[Tuple[float, float], List[float]],
    OL_range_wedge1: Union[Tuple[float, float], List[float]],
    refresh_cycle: int,
    nexus_dir: Union[str, None],
):
    """Reduce a series of BIOSANS (CG3) samples given as SPICE (scan, pt) pairs.

    Every run argument is first translated with ``map_to_nexus``. A common reduction
    configuration is then built and each sample is loaded, reduced and plotted in turn.

    Parameters
    ----------
    ipts_number, experiment_number
        Passed to ``map_to_nexus`` together with ``nexus_dir`` to resolve the runs.
        ``ipts_number`` is also stored under ``iptsNumber``.
    sample_names
        One name per sample. Drives the reduction loop and the output file names.
    sample_runs, sample_transmission_runs, background_runs, background_transmission_runs
        Per-sample run lists. A single-element background (or background transmission)
        list is repeated to match the number of sample (or sample transmission) runs.
    empty_transmission_run, beam_center_runs, main_detector_dark_run, wing_detector_dark_run
        Single runs, each mapped individually.
    sample_thickness_list
        Per-sample thickness. A single-element list is repeated for all sample runs.
    sample_identifier
        When truthy, output goes to ``base_output_directory + sample_identifier + "/"``.
        This takes precedence over ``overwrite_reduced_data``.
    overwrite_reduced_data
        When false (and no ``sample_identifier`` is given) and ``base_output_directory``
        already exists, output goes to the first unused ``<base>_<n>/`` directory.
    base_output_directory
        Base output directory. The suffix logic assumes a trailing ``/``.
    main_detector_sensitivities_file, wing_detector_sensitivities_file, scale_factor,
    scaling_beam_radius, number_linear_*_bins_*, plot_type, plot_binning,
    number_log_bins_per_decade_*, q_range_*, overlap_stitch_range, flexible_pixel_sizes,
    wedge_*_angles, auto_wedge_*, *_wedge0, *_wedge1
        Copied verbatim into the ``configuration`` section of the reduction parameters.
        The ``*_wedge0`` values fill the ``wedge1*`` keys and ``*_wedge1`` the ``wedge2*`` keys.
    refresh_cycle
        The Mantid workspaces are cleared after every sample whose zero-based index is a
        positive multiple of this value.
    nexus_dir
        Forwarded to ``map_to_nexus``.

    Returns
    -------
    list of str
        Paths of the reduction log files, one per entry of ``sample_runs``.

    Raises
    ------
    AssertionError
        If an expected reduction log file is missing from the output directory.
    """

    def to_nexus(runs):
        # All conversions share the same instrument, IPTS, experiment and NeXus directory
        return map_to_nexus(CG3, ipts_number, experiment_number, runs, nexus_dir=nexus_dir)

    # Convert SPICE scan-pt tuples to NeXus files
    sample_runs = to_nexus(sample_runs)
    sample_transmission_runs = to_nexus(sample_transmission_runs)
    background_runs = to_nexus(background_runs)
    background_transmission_runs = to_nexus(background_transmission_runs)
    beam_center_runs = to_nexus([beam_center_runs])[0]
    empty_transmission_run = to_nexus([empty_transmission_run])[0]
    main_detector_dark_run = to_nexus([main_detector_dark_run])[0]
    wing_detector_dark_run = to_nexus([wing_detector_dark_run])[0]

    # Reduction parameters common to all the reduction runs
    common_configuration = {
        "iptsNumber": ipts_number,
        "beamCenter": {"runNumber": beam_center_runs},
        "emptyTransmission": {"runNumber": empty_transmission_run},
        "configuration": {
            "outputDir": base_output_directory,
            "darkMainFileName": main_detector_dark_run,
            "darkWingFileName": wing_detector_dark_run,
            "sensitivityMainFileName": main_detector_sensitivities_file,
            "sensitivityWingFileName": wing_detector_sensitivities_file,
            "defaultMask": [
                {"Pixel": "1-18,239-256"},
                {"Bank": "18-24,42-48"},
                {"Bank": "49", "Tube": "1"},
            ],
            "StandardAbsoluteScale": scale_factor,
            "DBScalingBeamRadius": scaling_beam_radius,
            "mmRadiusForTransmission": "",
            "absoluteScaleMethod": "standard",
            "numMainQBins": number_linear_q1d_bins_main_detector,
            "numWingQBins": number_linear_q1d_bins_wing_detector,
            "numMainQxQyBins": number_linear_q2d_bins_main_detector,
            "numWingQxQyBins": number_linear_q2d_bins_wing_detector,
            "1DQbinType": plot_type,
            "QbinType": plot_binning,
            "LogQBinsPerDecadeMain": number_log_bins_per_decade_main_detector,
            "LogQBinsPerDecadeWing": number_log_bins_per_decade_wing_detector,
            "useLogQBinsDecadeCenter": False,
            "useLogQBinsEvenDecade": False,
            "sampleApertureSize": 14,
            "QminMain": q_range_main[0],
            "QmaxMain": q_range_main[1],
            "QminWing": q_range_wing[0],
            "QmaxWing": q_range_wing[1],
            "overlapStitchQmin": overlap_stitch_range[0],
            "overlapStitchQmax": overlap_stitch_range[1],
            "usePixelCalibration": flexible_pixel_sizes,
            "useTimeSlice": False,
            "timeSliceInterval": 60,
            "WedgeMinAngles": wedge_min_angles,
            "WedgeMaxAngles": wedge_max_angles,
            "autoWedgeQmin": auto_wedge_qmin,
            "autoWedgeQmax": auto_wedge_qmax,
            "autoWedgeQdelta": auto_wedge_delta_q,
            "autoWedgePeakWidth": auto_wedge_peak_width,
            "autoWedgeAzimuthalDelta": auto_wedge_delta_azimuthal_angle,
            "autoWedgeBackgroundWidth": auto_wedge_background_width,
            "autoWedgeSignalToNoiseMin": auto_wedge_minimum_signal_to_noise_ratio,
            "wedge1QminMain": q_range_main_wedge0[0],
            "wedge1QmaxMain": q_range_main_wedge0[1],
            "wedge1QminWing": q_range_wing_wedge0[0],
            "wedge1QmaxWing": q_range_wing_wedge0[1],
            "wedge1overlapStitchQmin": OL_range_wedge0[0],
            "wedge1overlapStitchQmax": OL_range_wedge0[1],
            "wedge2QminMain": q_range_main_wedge1[0],
            "wedge2QmaxMain": q_range_main_wedge1[1],
            "wedge2QminWing": q_range_wing_wedge1[0],
            "wedge2QmaxWing": q_range_wing_wedge1[1],
            "wedge2overlapStitchQmin": OL_range_wedge1[0],
            "wedge2overlapStitchQmax": OL_range_wedge1[1],
        },
    }
    common_configuration_full = reduction_parameters(common_configuration, "BIOSANS", validate=False)

    # Broadcast single-valued inputs so that every sample has a matching entry
    if len(background_runs) == 1 and len(sample_runs) > len(background_runs):
        background_runs = background_runs * len(sample_runs)
    if len(background_transmission_runs) == 1 and len(sample_transmission_runs) > len(background_transmission_runs):
        background_transmission_runs = background_transmission_runs * len(sample_transmission_runs)
    if len(sample_thickness_list) == 1 and len(sample_runs) > len(sample_thickness_list):
        sample_thickness_list = sample_thickness_list * len(sample_runs)

    # Select the effective output directory.
    # A sample identifier takes precedence; otherwise, when overwriting is not allowed,
    # pick a directory that does not exist yet.
    if sample_identifier:
        output_dir = base_output_directory + str(sample_identifier) + "/"
    elif not overwrite_reduced_data:
        output_dir = _first_unused_directory(base_output_directory)
    else:
        output_dir = base_output_directory

    # Keep the reduction configuration in sync with the directory actually used,
    # otherwise the reduction would still write into the base directory
    if output_dir != base_output_directory:
        change_outputdir = {
            "configuration": {
                "outputDir": output_dir,
            },
        }
        common_configuration_full = update_reduction_parameters(
            common_configuration_full, change_outputdir, validate=False
        )

    # Create the output sub-folders if they do not exist yet
    for subfolder in ["1D", "2D"]:
        os.makedirs(os.path.join(output_dir, subfolder), exist_ok=True)

    start_time = time.time()
    # Loop for samples
    for i, sample_name in enumerate(sample_names):
        start_time_loop = time.time()

        # form base output file name
        output_file_name = generate_output_base_name(sample_runs[i], sample_name)

        run_data = {
            "sample": {
                "runNumber": sample_runs[i],
                "thickness": sample_thickness_list[i],
                "transmission": {"runNumber": sample_transmission_runs[i]},
            },
            "background": {
                "runNumber": background_runs[i],
                "transmission": {"runNumber": background_transmission_runs[i]},
            },
            "outputFileName": output_file_name,
        }

        # Update our common settings with the particulars of the current reduction
        reduction_input = update_reduction_parameters(common_configuration_full, run_data, validate=True)
        # Re-apply the wedge angles as given by the caller after the validated update
        reduction_input["configuration"]["WedgeMinAngles"] = wedge_min_angles
        reduction_input["configuration"]["WedgeMaxAngles"] = wedge_max_angles

        # Load all files
        loaded = load_all_files(reduction_input, use_nexus_idf=True)

        # Reduce from the workspaces loaded from NeXus files
        out = reduce_single_configuration(loaded, reduction_input)
        plot_reduction_output(out, reduction_input)

        print("\nloop_" + str(i + 1) + ": ", time.time() - start_time_loop)

        # Periodically release the Mantid workspaces to bound memory usage
        if np.remainder(i, refresh_cycle) == 0 and i > 0:
            clear_buffer()

    print("Total Time : ", time.time() - start_time)

    # Collect the reduction log files; they live in the directory the reduction wrote to
    test_log_files = []
    for i_s, sample in enumerate(sample_runs):
        output_file_name = generate_output_base_name(sample, sample_names[i_s])
        test_log_file = generate_output_log_file(output_dir, output_file_name, "")
        assert os.path.exists(test_log_file), f"Output log file {test_log_file} cannot be found."
        test_log_files.append(test_log_file)

    return test_log_files


def generate_output_base_name(sample_run_i: Union[str, int], sample_name_i: str) -> str:
    """Build the base name shared by the output files of one sample.

    Parameters
    ----------
    sample_run_i
        Either a run identifier or the path to a NeXus file. When it is a string naming
        an existing path, only the file name up to its first ``.`` is used.
    sample_name_i
        Name of the sample.

    Returns
    -------
    str
        ``r<run part>_<sample name>``
    """
    # Default: use the run identifier unchanged
    part_1 = sample_run_i

    points_to_existing_file = isinstance(sample_run_i, str) and os.path.exists(sample_run_i)
    if points_to_existing_file:
        # A full path to a NeXus file is given: keep the file name without any extension
        file_name = os.path.basename(sample_run_i)
        part_1, _, _ = file_name.partition(".")

    return f"r{part_1}_{sample_name_i}"


def generate_output_log_file(output_directory: str, file_base_name: str, file_suffix: str) -> str:
    """Compose the path of a reduction log file.

    Parameters
    ----------
    output_directory
        Directory that holds the log file.
    file_base_name
        Base name of the output files.
    file_suffix
        Text inserted between ``_reduction_log`` and the ``.hdf`` extension.

    Returns
    -------
    str
        ``<output_directory>/<file_base_name>_reduction_log<file_suffix>.hdf``
    """
    log_file_name = f"{file_base_name}_reduction_log{file_suffix}.hdf"
    return os.path.join(output_directory, log_file_name)