# Create Event NeXus file
import numpy as np
import dateutil
import datetime
import math
import drtsans
from drtsans.files.hdf5_rw import DataSetNode, GroupNode
__all__ = [
"MonitorNode",
"BankNode",
"DataSetNode",
"DasLogsCollectionNode",
"InstrumentNode",
]
[docs]
class MonitorNode(drtsans.files.hdf5_rw.GroupNode):
"""
Node to record monitor counts
"""
def __init__(self, name, monitor_name):
"""
Parameters
----------
name
monitor_name
"""
self._monitor_name = monitor_name
super(MonitorNode, self).__init__(name)
# add NX_class
self.add_attributes({"NX_class": b"NXmonitor"})
[docs]
def set_monitor_events(
self,
event_index_array,
event_time_offset_array,
run_start_time,
event_time_zero_array,
):
"""Monitor counts are recorded as events
Parameters
----------
event_time_offset_array: numpy.ndarray
TOF of each neutron event. Size is equal to number of events
event_time_zero_array: numpy.ndarray
Staring time, as seconds offset to run start time, of each pulse
run_start_time: str
Run start time in ISO standard
event_index_array: numpy.ndarray
Index of staring event in each pulse. Size is equal to number of pulses
Returns
-------
"""
# Check inputs
assert event_time_zero_array.shape == event_index_array.shape
# Total counts
total_counts = len(event_time_offset_array)
# For all children except event time
for child_name, child_value, child_units in [
("event_index", event_index_array, None),
("event_time_offset", event_time_offset_array, b"microsecond"),
("total_counts", [total_counts], None),
]:
child_node = DataSetNode(name=self._create_child_name(child_name))
child_node.set_value(np.array(child_value))
if child_units is not None:
child_node.add_attributes({"units": child_units})
self.set_child(child_node)
# Set pulse time node (event time zero)
node_name = self._create_child_name("event_time_zero")
pulse_time_node = generate_event_time_zero_node(node_name, event_time_zero_array, run_start_time)
# link to self/its parent
self.set_child(pulse_time_node)
[docs]
class BankNode(drtsans.files.hdf5_rw.GroupNode):
"""Node for bank entry such as /entry/bank12"""
def __init__(self, name, bank_name):
"""Initialization
Parameters
----------
name: str
Bank node name
bank_name: str
name of bank, such as 'bank10' or 'bank39'
"""
self._bank_name = bank_name
super(BankNode, self).__init__(name)
# add NX_class
self.add_attributes({"NX_class": b"NXevent_data"})
[docs]
def set_events(
self,
event_id_array,
event_index_array,
event_time_offset_array,
run_start_time,
event_time_zero_array,
):
"""
Parameters
----------
event_id_array: numpy.ndarray
pixel IDs for each event. Size is equal to number of events
event_time_offset_array: numpy.ndarray
TOF of each neutron event. Size is equal to number of events
event_time_zero_array: numpy.ndarray
Staring time, as seconds offset to run start time, of each pulse
run_start_time: str
Run start time in ISO standard
event_index_array: numpy.ndarray
Index of staring event in each pulse. Size is equal to number of pulses
Returns
-------
"""
# Check inputs
assert event_id_array.shape == event_time_offset_array.shape
assert event_time_zero_array.shape == event_index_array.shape
# Total counts
total_counts = len(event_id_array)
# For all children except event time
for child_name, child_value, child_units in [
("event_id", event_id_array, None),
("event_index", event_index_array, None),
("event_time_offset", event_time_offset_array, b"microsecond"),
("total_counts", [total_counts], None),
]:
child_node = DataSetNode(name=self._create_child_name(child_name))
child_node.set_value(np.array(child_value))
# add target
target_value = f"/entry/instrument/{self._bank_name}/{child_name}".encode()
child_node.add_attributes({"target": target_value})
if child_units is not None:
child_node.add_attributes({"units": child_units})
self.set_child(child_node)
# Set pulse time node
self._set_pulse_time_node(event_time_zero_array, run_start_time)
def _set_pulse_time_node(self, event_time_zero_array, run_start_time):
"""Set pulse time zero node
# add attriutes including
# offset : run start time
# offset_nanoseconds
# offset_seconds
Parameters
----------
event_time_zero_array: numpy.ndarray
Staring time, as seconds offset to run start time, of each pulse
run_start_time: str
run start time in ISO format
Returns
-------
"""
# create child node name with full path
node_name = self._create_child_name("event_time_zero")
# create child node for event time zero (pulse time)
pulse_time_node = generate_event_time_zero_node(node_name, event_time_zero_array, run_start_time)
# link to self/its parent
self.set_child(pulse_time_node)
def generate_event_time_zero_node(node_name, event_time_zero_array, run_start_time):
# calculate run start time offset
offset_second, offset_ns = calculate_time_offsets(run_start_time)
# Special for event_time_zero node
pulse_time_node = DataSetNode(name=node_name)
# set value
pulse_time_node.set_value(event_time_zero_array)
# set up attribution dictionary
pulse_attr_dict = {
"units": b"second",
"target": b"/entry/DASlogs/frequency/time",
"offset": np.bytes_(run_start_time),
"offset_nanoseconds": offset_ns,
"offset_seconds": offset_second,
}
pulse_time_node.add_attributes(pulse_attr_dict)
return pulse_time_node
[docs]
class InstrumentNode(drtsans.files.hdf5_rw.GroupNode):
"""
Node for instrument entry (i.e., /entry/instrument)
"""
def __init__(self):
""" """
super(InstrumentNode, self).__init__(name="/entry/instrument")
# add the NeXus class attributes
self.add_attributes({"NX_class": b"NXinstrument"})
[docs]
def set_instrument_info(self, target_station_number, beam_line, name, short_name):
"""Set instrument information
Parameters
----------
target_station_number: int
target station number. 1 is used for HFIR
beam_line: Bytes
CG2, CG3
name: Bytes
CG2, CG3
short_name: Bytes
CG2, CG3
Returns
-------
None
"""
# target station node
target_station_node = DataSetNode(name=f"{self.name}/target_station_number")
target_station_node.set_value(np.array(target_station_number))
self.set_child(target_station_node)
# beam line
beam_line_node = DataSetNode(name=f"{self.name}/beamline")
beam_line_node.set_string_value(beam_line)
self.set_child(beam_line_node)
# beam line name
name_node = DataSetNode(name=f"{self.name}/name")
name_node.set_string_value(name)
self.set_child(name_node)
name_node.add_attributes({"short_name": short_name})
[docs]
def set_idf(self, idf_str, idf_type, description):
"""Set instrument xml
Parameters
----------
idf_str: Bytes
IDF XML string
idf_type: Bytes
IDF type
description: Bytes
Description
Returns
-------
"""
# Create the instrument_xml node
xml_node_name = f"{self.name}/instrument_xml"
xml_node = GroupNode(name=xml_node_name)
xml_node.add_attributes({"NX_class": b"NXnote"})
self.set_child(xml_node)
# add data node
data_node = DataSetNode(name=f"{xml_node_name}/data")
data_node.set_string_value(idf_str)
xml_node.set_child(data_node)
# add description
des_node = DataSetNode(name=f"{xml_node_name}/description")
des_node.set_string_value(description)
xml_node.set_child(des_node)
# add type
type_node = DataSetNode(name=f"{xml_node_name}/type")
type_node.set_string_value(idf_type)
xml_node.set_child(type_node)
class DasLogNode(drtsans.files.hdf5_rw.GroupNode):
"""
Node for one specific DAS log such as /entry/DASlogs/sample_detector_distance
"""
def __init__(self, log_name, log_times, start_time, log_values, log_unit):
"""DAS log node for specific
Parameters
----------
log_name: str
full path log name as /entry/DASlogs/{log_name}
log_times: numpy.ndarray
relative sample log time
start_time: Bytes
ISO standard time for run start
log_values: numpy.ndarray
sample log values
log_unit: Byes
log unit
"""
super(DasLogNode, self).__init__(name=log_name)
self._log_times = log_times
self._run_start = start_time
self._log_values = log_values
self._log_unit = log_unit
# Standard NX_class type for DASlogs' node
self.add_attributes({"NX_class": b"NXlog"})
# Set log value and related terms
self._set_log_values()
# Set log times
self._set_log_times()
def _set_log_times(self):
"""Set log times' node
- time
Returns
-------
"""
time_offset_second, time_offset_ns = calculate_time_offsets(self._run_start)
# Now I set up time related attributes
time_node = DataSetNode(name=self._create_child_name("time"))
time_node.set_value(self._log_times)
time_node.add_attributes(
{
"offset_nanoseconds": time_offset_ns,
"offset_seconds": time_offset_second,
"start": self._run_start,
"units": b"second",
}
)
self.set_child(time_node)
def _set_log_values(self):
"""Set time and value including
- average_value
- average_value_error
- maximum_value
- minimum_value
- value
Returns
-------
"""
if any([isinstance(me, np.bytes_) for me in self._log_values]):
# string type log requires no calculation
child_node = DataSetNode(name=self._create_child_name("value"))
child_node.set_value(self._log_values)
child_node.add_attributes({"units": self._log_unit})
self.set_child(child_node)
else:
# value type log requires some calculation
average_value = self._log_values.mean()
average_value_error = self._log_values.std()
min_value = np.min(self._log_values)
max_value = np.max(self._log_values)
for child_name, child_value in [
("average_value", [average_value]),
("average_value_error", [average_value_error]),
("maximum_value", [max_value]),
("minimum_value", [min_value]),
("value", self._log_values),
]:
child_node = DataSetNode(name=self._create_child_name(child_name))
child_node.set_value(np.array(child_value))
child_node.add_attributes({"units": self._log_unit})
self.set_child(child_node)
def set_device_info(self, device_id, device_name, target):
"""Set node for device related information
Parameters
----------
device_id
device_name
target
Returns
-------
"""
# Create Device ID node
# Need to make sure all strings are Bytes
for node_name, info_value in [
("device_id", [device_id]),
("device_name", [np.bytes_(device_name)]),
("target", [np.bytes_(target)]),
]:
child_node = DataSetNode(name=self._create_child_name(node_name))
child_node.set_value(np.array(info_value))
self.set_child(child_node)
[docs]
class DasLogsCollectionNode(drtsans.files.hdf5_rw.GroupNode):
"""
Node for '/entry/DASlogs'
"""
def __init__(self):
"""
Initialization
"""
super(DasLogsCollectionNode, self).__init__(name="/entry/DASlogs")
self.add_attributes({"NX_class": b"NXcollection"})
def calculate_time_offsets(iso_time):
"""Calculate time offset from 1990.01.01
Parameters
----------
iso_time: str, bytes
time in ISO format
Returns
-------
tuple
offset time in second (whole number), offset time in nanosecond (whole number)
"""
# convert
if isinstance(iso_time, bytes):
iso_time = iso_time.decode()
# convert date time in IOS string to datetime instance
run_start_time = dateutil.parser.parse(iso_time)
epoch_time = datetime.datetime(1990, 1, 1, tzinfo=datetime.timezone(datetime.timedelta(0)))
# offsets
time_offset = run_start_time.timestamp() - epoch_time.timestamp()
time_offset_second = int(time_offset)
# nanosecond shift
if iso_time.count(".") == 0:
# zero sub-second offset
time_offset_ns = 0
elif iso_time.count(".") == 1:
# non-zero sub-second offset
# has HH:MM:SS.nnnsssnnnss-05 format
sub_second_str = iso_time.split(".")[1].split("-")[0]
sub_seconds = float(sub_second_str)
# convert from sub seconds to nano seconds
# example: 676486982
digits = int(math.log(sub_seconds) / math.log(10)) + 1
time_offset_ns = int(sub_seconds * 10 ** (9 - digits))
else:
# more than 1 '.': not knowing the case. Use robust solution
time_offset_ns = int((time_offset - time_offset_second) * 1e9)
return time_offset_second, time_offset_ns