# This module contains the class and static method to convert legacy
# CG2's data file in XML format to event Nexus
import os
import numpy as np
from drtsans.mono.spice_xml_parser import SpiceXMLParser
from drtsans.files.event_nexus_rw import DasLog, EventNeXusWriter, TofHistogram
import h5py
from typing import List
from mantid.simpleapi import LoadHFIRSANS
from mantid.simpleapi import mtd, logger
import abc
from abc import ABC
# SPICE/NeXus metadata unit-name conversion map.
# Key: NeXus DAS log name; value: the unit name to use when writing the log to
# the event NeXus file. The physical units are the same on both sides but are
# spelled differently; a value of None overrides the SPICE unit with no unit
# name (applied in EventNexusConverter.convert_log_units).
SPICE_NEXUS_UNIT_NAME_MAP = {
    "wavelength": "A",
    "wavelength_spread": None,
    "ww_rot_Readback": "deg",
    "attenuator": None,
}
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
class EventNexusConverter(ABC):
    """
    Class to provide service to convert to event NeXus from various input
    (legacy SPICE XML data files).  Concrete subclasses supply the
    instrument-specific bank/pixel mapping via ``get_pid_range`` and
    ``_map_detector_and_counts``.
    """

    def __init__(self, beam_line, instrument_name, num_banks):
        """
        Parameters
        ----------
        beam_line: str
            beam line such as CG2, CG3
        instrument_name: str
            instrument name such as CG2, CG3
        num_banks: int
            number of detector banks (e.g. CG2 = 48, CG3 = 88)
        """
        # beam line name
        self._beam_line = beam_line
        self._instrument_name = instrument_name
        self._num_banks = num_banks

        # instrument XML IDF content (set by load_idf)
        self._idf_content = None

        # counts
        self._spice_detector_counts = None  # 1D array ranging from PID 0 (aka workspace index 0)
        # bank ID -> pixel IDs and bank ID -> counts; populated by _map_detector_and_counts
        self._bank_pid_dict = dict()
        self._bank_counts_dict = dict()
        self._monitor_counts = None

        # sample logs: NeXus DAS log name -> DasLog (set by load_sans_xml)
        self._das_logs = dict()

        # run time (start/stop strings taken from the loaded workspace)
        self._run_start = None
        self._run_stop = None

        # run number (Pt. number derived from the SPICE file)
        self._run_number = None
@property
def detector_counts(self):
"""Get detector counts
Returns
-------
~numpy.array
detector counts
"""
return self._spice_detector_counts[:]
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
def generate_event_nexus(self, target_nexus):
"""Generate event NeXus properly
num_banks: int
CG2 = 48, CG3 = 88
Parameters
----------
target_nexus: str
name of the output Nexus file
Returns
-------
"""
# Map detector counts from SPICE to detector counts/bank/detector ID of NeXus
# detector counts
self._map_detector_and_counts()
num_banks = self._num_banks
# Set constants
pulse_duration = 0.1 # second
tof_min = 1000.0
tof_max = 20000.0
# Generate event nexus writer
event_nexus_writer = EventNeXusWriter(beam_line=self._beam_line, instrument_name=self._instrument_name)
# set instrument
event_nexus_writer.set_instrument_info(num_banks, self._idf_content)
# set counts
for bank_id in range(1, num_banks + 1):
# create TofHistogram instance
# start_pid, end_pid = self.get_pid_range(bank_id)
# pix_ids = np.arange(start_pid, end_pid + 1)
# counts = self._detector_counts[start_pid:end_pid + 1]
# counts = counts.astype("int64")
pix_ids = self._bank_pid_dict[bank_id]
counts = self._bank_counts_dict[bank_id]
assert pix_ids is not None
assert counts is not None
histogram = TofHistogram(pix_ids, counts, pulse_duration, tof_min, tof_max)
# set to writer
event_nexus_writer.set_bank_histogram(bank_id, histogram)
# set meta
for das_log in self._das_logs.values():
event_nexus_writer.set_meta_data(das_log)
# set fake run number
event_nexus_writer.set_run_number(self._run_number)
# Write file
event_nexus_writer.generate_event_nexus(target_nexus, self._run_start, self._run_stop, self._monitor_counts)
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
def load_sans_xml(self, xml_file_name, das_log_map, prefix=""):
"""Load data and meta data from legacy SANS XML data file
Parameters
----------
xml_file_name: str
name of SANS XML file
prefix: str
prefix for output workspace name
das_log_map: ~dict
meta data map between event NeXus and SPICE
Returns
-------
"""
spice_log_dict, pt_number = self._retrieve_meta_data(xml_file_name, das_log_map)
self._das_logs = self.convert_log_units(spice_log_dict)
# output workspace name
sans_ws_name = os.path.basename(xml_file_name).split(".xml")[0]
sans_ws_name = f"{prefix}{sans_ws_name}"
# load
logger.notice(f"Load {xml_file_name}")
LoadHFIRSANS(Filename=xml_file_name, OutputWorkspace=sans_ws_name)
# get counts and reshape to (N, )
sans_ws = mtd[sans_ws_name]
counts = sans_ws.extractY().transpose().reshape((sans_ws.getNumberHistograms(),))
# monitor counts
monitor_counts = int(counts[0])
self._monitor_counts = monitor_counts
# get detector counts and convert to int 64
self._spice_detector_counts = counts[2:].astype("int64")
# NOTE:
# monitor counts cannot be zero since we need it as the denominator during
# normalization.
if abs(self._monitor_counts) < 1e-6:
logger.warning("current XML contains: monitor_count=0")
# get run start time: force to a new time
self._run_start = sans_ws.run().getProperty("run_start").value
self._run_stop = sans_ws.run().getProperty("end_time").value
self._run_number = pt_number
def _map_detector_and_counts(self):
# self._detector_counts = counts[2:]
raise RuntimeError("This is virtual")
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
def load_idf(self, template_nexus_file):
"""Load IDF content from a template NeXus file
Parameters
----------
template_nexus_file
Returns
-------
"""
# Import source
try:
source_nexus_h5 = h5py.File(template_nexus_file, "r")
except OSError as os_err:
raise RuntimeError(f"Unable to load {template_nexus_file} due to {os_err}")
# IDF in XML
self._idf_content = source_nexus_h5["entry"]["instrument"]["instrument_xml"]["data"][0]
# Close
source_nexus_h5.close()
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
def mask_spice_detector_pixels(self, pixel_index_list: List[int]):
"""Mask detector pixels with SPICE counts by set the counts to zero
Parameters
----------
pixel_index_list: ~list
list of integers as workspace index detector pixels
"""
# Sanity check
if self._spice_detector_counts is None:
raise RuntimeError("Detector counts array has not been set up yet. Load data first")
# Set masked pixels
for pid in pixel_index_list:
try:
self._spice_detector_counts[pid] = 0
except IndexError as index_error:
raise RuntimeError(
f"Pixel ID {pid} is out of range {self._spice_detector_counts.shape}. " f"FYI: {index_error}"
)
@staticmethod
def _retrieve_meta_data(spice_file_name, das_spice_log_map):
"""Retrieve meta from workspace
Parameters
----------
spice_file_name: str
full path of SPICE data file in XML format
das_spice_log_map: ~dict
DAS log conversion map between event NeXus and spice
Returns
-------
~dict
key: Nexus das log name, value: (log value, log unit). if das log is not found, value will be None
"""
# Load SPICE file
spice_reader = SpiceXMLParser(spice_file_name)
# Mandatory key list
# NOTE: Missing node listed in this list will leads to error in processing.
# Early failure is enfored for missing entry in this list
mandatory_das_logs = [
"CG3:CS:SampleToSi",
"sample_detector_distance",
"wavelength",
"wavelength_spread",
"source_aperture_diameter",
"sample_aperture_diameter",
"detector_trans_Readback",
"ww_rot_Readback",
"source_aperture_sample_aperture_distance",
]
das_log_values = dict()
for nexus_log_name, spice_tuple in das_spice_log_map.items():
# read value from XML node
# FIXME - this is a temporary solution in order to work with both new and old
# FIXME - meta data map
spice_log_name = spice_tuple[0]
default_unit = spice_tuple[1]
if len(spice_tuple) >= 3:
data_type = spice_tuple[2]
else:
data_type = float # default
# try to query the value from XML tree
try:
value, unit = spice_reader.get_node_value(spice_log_name, data_type)
except KeyError as key_err:
if nexus_log_name in mandatory_das_logs:
raise ValueError(f"!Aborting: Cannot find mandaory node {nexus_log_name}")
else:
logger.warning(str(key_err))
logger.warning(f"skipping {nexus_log_name}")
value = None
unit = None
if value is not None:
# set default
if unit is None:
unit = default_unit
else:
# check unit
if unit != default_unit:
raise RuntimeError(
f"SPICE log {spice_log_name} has unit {unit} different from " f"expected {default_unit}"
)
das_log_values[nexus_log_name] = value, unit
# Get pt number
# NOTE: the image path contains some information we can use for sanity check
_expn, _scnn, _scpn = 0, 0, 0
try:
imgpath, _ = spice_reader.get_node_value("ImagePath", str)
tmp = imgpath.split(".")[0].split("_")[1:]
except KeyError as key_err:
logger.warning(str(key_err))
imgpath = ""
tmp = []
if len(tmp) == 3:
_expn = int(tmp[0].replace("exp", ""))
_scnn = int(tmp[1].replace("scan", ""))
_scpn = int(tmp[2].replace("scan", ""))
_pt_parts = [_expn, _scnn, _scpn]
if "bio" in imgpath.lower():
logger.notice("Possible BioSANS data found")
logger.notice("run_number = exp#scan#scanPt#")
lbs = ["Experiment_number", "Scan_Number", "Scan_Point_Number"]
for i, lb in enumerate(lbs):
try:
_pt_parts[i], _ = spice_reader.get_node_value(lb, int)
except KeyError as key_err:
logger.warning(str(key_err))
logger.warning(f"Defaulting {lb} to {_pt_parts[i]}")
# stich to form a pt_number
pt_number = "".join([str(me).zfill(4) for me in _pt_parts])
else:
# Default method to handle non BioSANS type SPICE data
try:
pt_number, unit = spice_reader.get_node_value("Scan_Point_Number", int)
except KeyError as key_err:
pt_number = key_err
# Close file
spice_reader.close()
return das_log_values, pt_number
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
@staticmethod
def convert_log_units(spice_log_dict):
"""Convert log unit from SPICE log to Nexus log
Parameters
----------
spice_log_dict: ~dict
key: Nexus das log name, value: (log value, log unit)
Returns
-------
~dict
key: DAS log name, value: DasLog
"""
nexus_log_dict = dict()
for nexus_das_log_name in spice_log_dict:
# get value
log_value, log_unit = spice_log_dict[nexus_das_log_name]
# use the name of the NeXus das log value unit
if nexus_das_log_name in SPICE_NEXUS_UNIT_NAME_MAP:
log_unit = SPICE_NEXUS_UNIT_NAME_MAP[nexus_das_log_name]
# form das log
nexus_das_log = DasLog(
nexus_das_log_name,
np.array([0.0]),
np.array([log_value]),
log_unit,
None,
)
# add
nexus_log_dict[nexus_das_log_name] = nexus_das_log
return nexus_log_dict
# [docs]  -- Sphinx "view source" artifact from the scraped documentation page; not part of the module
    @abc.abstractmethod
    def get_pid_range(self, bank_id):
        """Get the pixel ID range of a bank (abstract; instrument-specific).

        Parameters
        ----------
        bank_id: int
            bank ID starting from 1 (e.g. 1 to 48 for CG2/GPSANS)

        Returns
        -------
        tuple
            start PID, end PID (assuming PID are consecutive in a bank and end PID is inclusive)
        """
        # concrete converters define the bank -> pixel mapping
        raise RuntimeError("Class method EventNexusConverter.get_pid_range is abstract")