# Read an arbitrary h5 file in order to study its structure
# Goal: reveal and duplicate a CANSAS file such that it can be imported by SASVIEW
import h5py
import numpy as np
# Names exported by a star-import of this module. The module-level helper
# ``parse_h5_entry`` is not listed, so it must be imported explicitly.
__all__ = ["HDFNode", "GroupNode", "FileNode", "DataSetNode"]
def parse_h5_entry(h5_entry):
    """Build the buffer node that mirrors an h5py entry, including its sub entries.

    Parameters
    ----------
    h5_entry: ~h5py._hl.dataset.Dataset, ~h5py._hl.group.Group, ~h5py._hl.files.File
        h5py entry to convert: a data set, a group or a whole file

    Returns
    -------
    HDFNode
        ``FileNode``, ``GroupNode`` or ``DataSetNode`` populated from ``h5_entry``

    Raises
    ------
    RuntimeError
        if ``h5_entry`` is none of the supported h5py types
    """
    # The first matching pair wins, so the order of this table is significant:
    # the file type is tested ahead of the group type.
    node_class_by_entry_type = (
        (h5py._hl.files.File, FileNode),
        (h5py._hl.group.Group, GroupNode),
        (h5py._hl.dataset.Dataset, DataSetNode),
    )

    for entry_type, node_class in node_class_by_entry_type:
        if not isinstance(h5_entry, entry_type):
            continue
        # Instantiate the matching node and let it read the entry itself
        node = node_class()
        node.parse_h5_entry(h5_entry)
        return node

    # No supported type matched
    raise RuntimeError("HDF entry of type {} is not supported".format(type(h5_entry)))
[docs]
class HDFNode(object):
    """
    Base buffer node for an HDF5 entry: holds the entry name and its attributes.

    Sub classes add the payload (children or data value) and implement ``write``.
    """

    def __init__(self, name=None):
        """initialization

        Parameters
        ----------
        name: str, None
            entry name
        """
        self._name = name

        # Attribute name -> attribute value
        self._attributes = dict()

    @staticmethod
    def _attribute_values_equal(this_value, that_value):
        """Tell whether two attribute values are equal, array values included

        A plain ``!=`` between numpy arrays yields an array whose truth value is
        ambiguous, so arrays are compared as a whole with ``np.array_equal``.

        Parameters
        ----------
        this_value: object
            attribute value of this node
        that_value: object
            attribute value of the other node

        Returns
        -------
        bool
            True if both values are considered equal
        """
        if isinstance(this_value, np.ndarray) or isinstance(that_value, np.ndarray):
            return bool(np.array_equal(this_value, that_value))
        return not (this_value != that_value)

    def match(self, other_node):
        """Compare 2 HDFNode to see whether they are same

        If mismatch, an exception will be raised including
        - TypeError: for nodes are not same type
        - ValueError: attribute or node name value mismatch
        - KeyError: some attribute does not exist in both node

        Parameters
        ----------
        other_node: HDFNode
            other node to compare

        Returns
        -------
        None

        Raises
        ------
        TypeError
            ``other_node`` is not an instance of this node's class
        ValueError
            node names differ, or at least one attribute value differs
        KeyError
            the two nodes do not carry the same set of attribute names
        """
        # compare class type
        if not isinstance(other_node, type(self)):
            raise TypeError(
                "Try to match instance of class {} (other) to {} (self)".format(type(other_node), type(self))
            )

        # compare name
        if self._name != other_node.name:
            raise ValueError("self.name = {}; other.name = {}".format(self.name, other_node.name))

        # compare attribute names
        self_keys = set(self._attributes.keys())
        other_keys = set(other_node.attributes.keys())
        if self_keys != other_keys:
            message = "Data node {} Attributes are not same:\nself - other = {}]\nother - self = {}".format(
                self.name,
                self_keys - other_keys,
                other_keys - self_keys,
            )
            # The message is echoed to stdout in addition to being raised
            print(message)
            raise KeyError(message)

        # compare attribute values: collect every mismatch before reporting
        mismatches = [
            "Mismatch attribute {} value: self = {}, other = {}".format(
                attr_name,
                self._attributes[attr_name],
                other_node.attributes[attr_name],
            )
            for attr_name in self._attributes
            if not self._attribute_values_equal(self._attributes[attr_name], other_node.attributes[attr_name])
        ]
        if mismatches:
            raise ValueError("\n".join(mismatches))

    def parse_h5_entry(self, h5_entry):
        """Parse an HDF5 entry: read its name and all its attributes

        Parameters
        ----------
        h5_entry
            entry exposing ``name`` and a mapping-like ``attrs``

        Returns
        -------
        None
        """
        # Name
        self._name = h5_entry.name

        # Reset, then copy every attribute; iterating ``attrs`` yields attribute names
        self._attributes = dict()
        for attr_name in h5_entry.attrs:
            self._attributes[attr_name] = h5_entry.attrs[attr_name]

    @property
    def name(self):
        """Entry name as given at construction or read by ``parse_h5_entry``"""
        return self._name

    @property
    def attributes(self):
        """Attribute dictionary of this node (the internal dict, not a copy)"""
        return self._attributes

    def add_attributes(self, attributes):
        """Add a set of attributes to the HDF5 node

        An attribute that already exists is overwritten.

        Parameters
        ----------
        attributes: ~dict
            Attributes to add. key = attribute name, value = attribute value

        Returns
        -------
        None
        """
        for attr_name in attributes.keys():
            self._attributes[attr_name] = attributes[attr_name]

    def write(self, inputs):
        """Write this node; must be implemented by sub classes

        Parameters
        ----------
        inputs: str, ~h5py._hl.group.Group, ~h5py._hl.files.File
            Node to input

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            always, as this is a virtual method
        """
        raise NotImplementedError("Virtual method to write {}".format(inputs))

    def write_attributes(self, curr_entry):
        """Write the attributes of this node to an entry

        Parameters
        ----------
        curr_entry
            entry exposing a mapping-like ``attrs`` that accepts item assignment

        Returns
        -------
        None

        Raises
        ------
        TypeError
            if an attribute value is rejected by the entry; the message names the
            attribute and the type of its value
        """
        for attr_name, attr_value in self._attributes.items():
            # ignore if an attribute is None (might be missing)
            if attr_value is None:
                continue
            try:
                curr_entry.attrs[attr_name] = attr_value
            except TypeError as type_error:
                # Report the type of the rejected value (the attribute name is always a key)
                message = f"[ERROR] {self._name}-node attribute {attr_name} is of type {type(attr_value)}"
                print(message)
                raise TypeError(f"{message}: {type_error}") from type_error
[docs]
class GroupNode(HDFNode):
    """
    Node for an HDF Group: an HDFNode that additionally owns a list of child nodes
    """

    def __init__(self, name=None):
        """
        Initialization

        Parameters
        ----------
        name: str, None
            entry name
        """
        super(GroupNode, self).__init__(name)

        # Child nodes in insertion order
        self._children = list()

    @property
    def children(self):
        """Shallow copy of the child list, so callers cannot alter the internal list"""
        return self._children[:]

    def _create_child_name(self, short_name):
        """Create name of a child with full path from a short name

        For example:
            self._name = '/entry/DASlogs/Sampleid'
            child short name = 'time'
            child name with full path = '/entry/DASlogs/Sampleid/time'

        A node name that already ends with '/' (such as the root '/') is not
        given a second separator.

        Parameters
        ----------
        short_name: str
            short name of child without full path

        Returns
        -------
        str
            child name with full path
        """
        separator = "" if str(self._name).endswith("/") else "/"
        return f"{self._name}{separator}{short_name}"

    def match(self, other_node):
        """Compare this node with another node, recursively through the children

        Parameters
        ----------
        other_node: GroupNode
            other node to compare

        Returns
        -------
        None

        Raises
        ------
        TypeError, ValueError, KeyError
            mismatch detected by the base class or by a child
        RuntimeError
            a child of this node has no counterpart of the same name in ``other_node``
        """
        # call base class
        super(GroupNode, self).match(other_node)

        # NOTE(review): only the children of this node are looked up in other_node;
        # children that exist solely in other_node are not reported.
        for child in self._children:
            child.match(other_node.get_child(child.name))

    def get_child(self, child_name, is_short_name=False):
        """Get a child

        Parameters
        ----------
        child_name: str
            name of the child
        is_short_name: bool
            If True, concatenate the child name with current self._name

        Returns
        -------
        GroupNode, DataSetNode
            Child HDFNode

        Raises
        ------
        RuntimeError
            if no child has the requested name
        """
        # process name
        if is_short_name:
            child_name = self._create_child_name(child_name)

        for candidate in self._children:
            if candidate.name == child_name:
                return candidate

        raise RuntimeError(f"There is no child node with name {child_name} for node {self.name})")

    def remove_child(self, child_node_name):
        """Remove the child(ren) with the given name; do nothing if there is none

        Parameters
        ----------
        child_node_name: str
            name of the child to remove, compared against each child's ``name``

        Returns
        -------
        None
        """
        # Rebuild in place rather than removing while iterating, which skips elements
        self._children[:] = [child for child in self._children if child.name != child_node_name]

    def set_child(self, child_node):
        """Append a child node

        Parameters
        ----------
        child_node: GroupNode, DataSetNode
            child node to append

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            if a child with the same name exists already
        """
        # Check whether a child with same name exists
        if any(existing.name == child_node.name for existing in self._children):
            raise RuntimeError(f"Node {self.name} has child with name {child_node.name} already!")

        # Attach
        self._children.append(child_node)

    def parse_h5_entry(self, h5_entry):
        """Parse HDF5 entry: name, attributes and, recursively, all children

        Parameters
        ----------
        h5_entry: ~h5py._hl.group.Group
            hdf5 entry

        Returns
        -------
        None
        """
        # Parse in general way
        super(GroupNode, self).parse_h5_entry(h5_entry)

        # Start from an empty child list, as the base class does for attributes,
        # so that parsing a second time does not duplicate the children.
        # ``parse_h5_entry`` below is the module-level function, not this method.
        self._children = [parse_h5_entry(h5_entry[child_name]) for child_name in h5_entry.keys()]

    def write(self, parent_entry):
        """Write buffer node to an HDF entry

        Parameters
        ----------
        parent_entry: ~h5py._hl.dataset.Dataset, ~h5py._hl.group.Group, ~h5py._hl.files.File
            parent HDF node

        Returns
        -------
        None
        """
        # create the group named after this node, then fill it
        curr_entry = parent_entry.create_group(self._name)
        self.write_content(curr_entry)

    def write_content(self, curr_entry):
        """Write the children and then the attributes of this node to an entry

        Parameters
        ----------
        curr_entry: ~h5py._hl.group.Group, ~h5py._hl.files.File
            entry that corresponds to this node

        Returns
        -------
        None
        """
        # write child
        for child in self._children:
            child.write(curr_entry)

        # attributes
        self.write_attributes(curr_entry)
[docs]
class FileNode(GroupNode):
    """
    Node for an HDF file, i.e. the group named "/"
    """

    def __init__(self):
        """
        Initialization
        """
        super(FileNode, self).__init__("/")

    def write(self, file_name):
        """Write to a file

        The file is opened in "w" mode.

        Parameters
        ----------
        file_name: str
            Name of file to write to

        Returns
        -------
        None
        """
        # The context manager closes the file even if writing the content fails
        with h5py.File(file_name, "w") as h5:
            self.write_content(h5)
[docs]
class DataSetNode(HDFNode):
    """
    Node for data set: an HDFNode that additionally holds the data value
    """

    def __init__(self, name=None):
        """
        Initialization

        Parameters
        ----------
        name: str, None
            entry name
        """
        super(DataSetNode, self).__init__(name)

        # Data value; None until parsed or set
        self._value = None

    def match(self, other_node):
        """Match this node with other

        Parameters
        ----------
        other_node: DataSetNode
            another node to match against

        Returns
        -------
        None

        Raises
        ------
        TypeError, KeyError
            mismatch detected by the base class
        ValueError
            name or attribute mismatch detected by the base class, or the data
            values differ in shape or content
        """
        # call base class's match
        super(DataSetNode, self).match(other_node)

        # compare this one
        try:
            np.testing.assert_allclose(self._value, other_node.value)
        except AssertionError as ass_err:
            raise ValueError(ass_err) from ass_err
        except TypeError:
            # in case value is not float or integer.
            # Convert first: a value need not be an ndarray (for example a bare
            # bytes object), in which case it has neither shape nor flatten().
            this_value = np.asarray(self._value)
            that_value = np.asarray(other_node.value)
            if this_value.shape != that_value.shape:
                raise ValueError(
                    f"Node {self._name}: Value have different shape: self = {this_value.shape}, "
                    f"other = {that_value.shape}"
                )
            if not np.array_equal(this_value, that_value):
                raise ValueError("Different values:\n 1: {}\n 2: {}".format(self._value, other_node.value))

    def parse_h5_entry(self, h5_entry):
        """Parse HDF5 entry: name, attributes and the whole data value

        Parameters
        ----------
        h5_entry: ~h5py._hl.dataset.Dataset
            hdf5 entry

        Returns
        -------
        None
        """
        # Parse in general way
        super(DataSetNode, self).parse_h5_entry(h5_entry)

        # Parse value: indexing with an empty tuple reads the entire content
        self._value = h5_entry[()]

    @property
    def value(self):
        """Data value of this node (the stored object, not a copy)"""
        return self._value

    def set_value(self, data_array):
        """Set data value (as numpy array)

        Parameters
        ----------
        data_array: np.ndarray
            data value

        Returns
        -------
        None
        """
        self._value = data_array

    def set_string_value(self, str_value):
        """Set value from a single string (object)

        The string is stored as a 1-element array of ``np.bytes_``.

        Parameters
        ----------
        str_value: str
            string to be written to an entry's only value

        Returns
        -------
        None
        """
        # it is possible that input string is of type as unicode. so it is better
        # to enforce it to be a string (encoded string) that can be accepted by h5py
        self._value = np.array([np.bytes_(str_value)])

    def write(self, parent_entry):
        """Write buffer node to an HDF entry

        Parameters
        ----------
        parent_entry: ~h5py._hl.group.Group, ~h5py._hl.files.File
            parent HDF node

        Returns
        -------
        None
        """
        # Generate current entry and set the data
        curr_entry = parent_entry.create_dataset(self._name, data=self._value)

        # attributes
        self.write_attributes(curr_entry)