#!/usr/bin/env python
import os
import yaml
import numpy as np
from typing import List, Union
from drtsans.mono.convert_xml_to_nexus import EventNeXusWriter
from drtsans.files.event_nexus_rw import parse_event_nexus
from drtsans.mono.convert_xml_to_nexus import EventNexusConverter
[docs]
class CG3EventNexusConvert(EventNexusConverter):
"""
Converting legacy SPICE file for BIOSANS to Event NeXus
"""
def __init__(self):
"""
initialization
work for 88 banks
"""
super(CG3EventNexusConvert, self).__init__("CG3", "CG3", self.num_banks)
@property
def num_banks(self):
"""For BioSANS (CG3), the total number of banks is a fixed value: 88"""
return 88
def _map_detector_and_counts(self):
"""Map detector counts and pixel IDs from SPICE-era IDF to NeXus-era IDF
SPICE: pixel ID is consecutive from lower left corner of detector, up to the top and then from the next tube
as
255 511 ...
. ... ...
. ... ...
1 257 ...
0 256 512 ....
----------------------------------------
tube0 tube1 tube2 ...
NeXus: in each bank, the pixel IDs start with 4 front tubes and then 4 back tubes as
255 1279 ...
. ... ...
. ... ...
1 1025 257
0 1024 256 ....
---------------------------
bank1 bank 25 bank 2 bank 26 ......
and the front 4 tubes are in one bank and the back 4 tubes are in another bank
Therefore, the algorithm shall re-assign the counts to tubes
"""
# map SPICE tube to NeXus bank/tube
# TODO - make these into constants and do a sanity check for any data
num_pixel_per_tube = 256
num_main_8packs = 192 // 8
num_wing_8packs = 160 // 8
# sanity_check()
# initialize the output
for nexus_bank_id in range(1, 1 + self._num_banks):
# NeXus PID range for each bank
start_pid, end_pid = self.get_pid_range(nexus_bank_id)
# assign to tubes: pixel ID shall be ordered according to SPICE workspace indexes
pix_ids = np.arange(start_pid, end_pid + 1)
# initialize dictionary items for PID and counts
self._bank_pid_dict[nexus_bank_id] = pix_ids
self._bank_counts_dict[nexus_bank_id] = np.zeros_like(pix_ids)
# map from SPICE tubes to Nexus bank/tube
for tube_group in range(num_main_8packs + num_wing_8packs):
# each 8 pack/tube group has 2 banks: bank shift is for the front bank in the 8 pack's shift from
# first bank
if tube_group < num_main_8packs:
group_bank_shift = tube_group
else:
group_bank_shift = tube_group + num_main_8packs
for tube_index in range(8):
# event tube: front panel
# odd tube: back panel shift another half detector (i.e., 1/2 banks in detector or number of 8 packs)
tube_bank_shift = tube_index % 2
# consider main and wing
if tube_group < num_main_8packs:
# main
bank_id = group_bank_shift + tube_bank_shift * num_main_8packs
else:
# wing
bank_id = group_bank_shift + tube_bank_shift * num_wing_8packs
bank_id += 1 # Nexus bank ID starts from 1
# spice tube index
spice_tube_index = tube_group * 8 + tube_index
bank_tube_index = tube_index // 2
# map counts to
spice_count_start_index = spice_tube_index * num_pixel_per_tube
bank_count_start_index = bank_tube_index * num_pixel_per_tube
self._bank_counts_dict[bank_id][
bank_count_start_index : bank_count_start_index + num_pixel_per_tube
] = self._spice_detector_counts[spice_count_start_index : spice_count_start_index + num_pixel_per_tube]
[docs]
def get_pid_range(self, bank_id):
"""Set GPSANS bank and pixel ID relation
Parameters
----------
bank_id: int
bank ID from 1 to 88
Returns
-------
tuple
start PID, end PID (assuming PID are consecutive in a bank and end PID is inclusive)
"""
# NOTE:
# For legacy data, the hardware configuration is fixed, therefore it is hardcoded
# in this method. DO NOT TOUCH!!!
# Check input valid
if bank_id < 1 or bank_id > self.num_banks:
raise RuntimeError(f"CG3 (BioSANS) has 88 banks indexed from 1 to 88. " f"Bank {bank_id} is out of range.")
# calculate starting PID
if bank_id <= 24:
# from 1 to 24: front panel
start_pid = (bank_id - 1) * 2 * 1024
elif bank_id <= 48:
# from 25 to 48: back panel
start_pid = ((bank_id - 25) * 2 + 1) * 1024
elif bank_id <= 68:
# from 49 to 68: even bank from 49152 (main detector pixel number)
start_pid = (bank_id - 49) * 2 * 1024 + 48 * 1024
else:
# from 69 to 88
start_pid = ((bank_id - 69) * 2 + 1) * 1024 + 48 * 1024
# calculate end PID
end_pid = start_pid + 1023
return start_pid, end_pid
[docs]
def convert_spice_to_nexus(
ipts_number: int,
exp_number: int,
scan_number: int,
pt_number: int,
template_nexus: str,
masked_detector_pixels: List[int] = list(),
output_dir: str = None,
spice_dir: str = None,
spice_data: str = Union[None, str],
):
"""
Convert legacy SPICE file for bioSANS/cg3 to Event NeXus
Parameters
----------
ipts_number: int
IPTS
exp_number: int
experiment number
scan_number: int
scan
pt_number: int
pt
masked_detector_pixels: ~list
List of pixels (mantid workspace indexes) to mask
template_nexus: str
path to a GPSANS nED event Nexus file especially for IDF
output_dir: None or str
output directory of the converted data
spice_dir: None or str
data file directory for SPICE file. None using default
spice_data: None or str
full data file. It is specified, there is no need to construct SPICE file name anymore
Returns
-------
str
generated event Nexus file
"""
if spice_dir is not None:
# construct SPICE file from IPTS, experiment and etc.
# path processing
spice_dir = f"/HFIR/CG3/IPTS-{ipts_number}/exp{exp_number}/Datafiles" if spice_dir is None else spice_dir
# construct SPICE
spice_data = f"BioSANS_exp{exp_number}_scan{scan_number:04}_{pt_number:04}.xml"
spice_data = os.path.join(spice_dir, spice_data)
output_dir = f"/HFIR/CG3/IPTS-{ipts_number}/shared/spice_nexus" if output_dir is None else output_dir
# Input (Path&File) validation
assert os.path.exists(spice_dir), f"SPICE data directory {spice_dir} cannot be found"
# check SPICE file
assert os.path.exists(spice_data), f"SPICE file {spice_data} cannot be located"
assert os.path.exists(template_nexus), f"Template NeXus file {template_nexus} cannot be located"
# Check output directory exist. If not, create it
if not os.path.exists(output_dir):
try:
os.mkdir(output_dir)
except (OSError, IOError) as dir_err:
raise RuntimeError(
f"Output directory {output_dir} doesn't exist." f"Unable to create {output_dir} due to {dir_err}"
)
# output file name
out_nexus_file = f"CG3_{exp_number:04}{scan_number:04}{pt_number:04}.nxs.h5"
out_nexus_file = os.path.join(output_dir, out_nexus_file)
# load mapping reference from yaml
_file_parent_dir = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(_file_parent_dir, "cg3_to_nexus_mapping.yml"), "r") as stream:
das_log_map = yaml.safe_load(stream)
# init converter
converter = CG3EventNexusConvert()
# load instrument definition (IDF)
converter.load_idf(template_nexus)
# load SPICE (xml file)
converter.load_sans_xml(spice_data, das_log_map)
# mask detector
converter.mask_spice_detector_pixels(masked_detector_pixels)
# generate event nexus
converter.generate_event_nexus(out_nexus_file)
return out_nexus_file
# note: the following function is a legacy function
[docs]
def generate_event_nexus(source_nexus, target_nexus, das_log_list=None):
"""Generate event NeXus properly from a source Nexus file
This method will be migrated to drtsans.mono.biaosans
Parameters
----------
source_nexus: str
source nexus file
target_nexus: str
target nexus file
das_log_list: ~list
list of DAS logs
Returns
-------
"""
cg3_num_banks = 88
DAS_LOGs = [
"CG3:CS:SampleToSi",
"sample_detector_distance",
"wavelength",
"wavelength_spread",
"source_aperture_diameter",
"sample_aperture_diameter",
"detector_trans_Readback",
"ww_rot_Readback",
"source_aperture_sample_aperture_distance",
]
das_log_list = DAS_LOGs if das_log_list is None else das_log_list
# Import essential experimental data from source event nexus file
nexus_contents = parse_event_nexus(source_nexus, 88, das_log_list)
# Generate event nexus writer
event_nexus_writer = EventNeXusWriter(beam_line="CG3", instrument_name="CG3")
# set instrument: 88 banks (2 detectors)
event_nexus_writer.set_instrument_info(cg3_num_banks, nexus_contents[0])
# set counts: 88 banks (2 detectors)
for bank_id in range(1, cg3_num_banks + 1):
event_nexus_writer.set_bank_histogram(bank_id, nexus_contents[1][bank_id])
# set meta
for das_log in nexus_contents[5].values():
event_nexus_writer.set_meta_data(das_log)
# time
start_time = nexus_contents[3]
end_time = nexus_contents[4]
# Write file
event_nexus_writer.generate_event_nexus(target_nexus, start_time, end_time, nexus_contents[2])
if __name__ == "__main__":
print("converting Legacy cg3 SPICE file to Event Nexus")
print("e.g.")
print("cg3_spice_to_nexus.py SPICE_FILE_TO_CONVERT")