Source code for drtsans.reductionlog

from datetime import datetime
import h5py
import glob
import os
import re
import json
import socket
from mantid import __version__ as mantid_version
from mantid.kernel import logger

# from mantid.simpleapi import mtd, SaveNexusProcessed
import numpy as np
from drtsans import __version__ as drtsans_version
from os import environ, path


__all__ = ["savereductionlog"]


def _createnxgroup(parent, name, klass):
    child = parent.create_group(name)
    child.attrs["NX_class"] = klass
    return child


def _savenxnote(nxentry, name, mimetype, file_name, data):
    """Write an NXnote inside an entry
    http://download.nexusformat.org/doc/html/classes/base_classes/NXnote.html

    nxentry: HDF handle
        NXentry to put the NXnote into
    name: str
        Name of the NXnote group
    mimetype: str
        Mimetype of the data
    file_name: str
        Name of the file that the note was taken from
    data: str
        The contents of the note
    """
    nxnote = _createnxgroup(nxentry, name, "NXnote")

    nxnote.create_dataset(name="file_name", data=[np.string_(file_name)])
    nxnote.create_dataset(name="type", data=[np.string_(mimetype)])
    nxnote.create_dataset(name="data", data=[np.string_(data)])

    return nxnote


def _savepythonscript(nxentry, pythonfile, pythonscript):
    """Write the python script as a NXnote

    nxentry: HDF handle
        NXentry to put the NXnote into
    pythonfile: str
        Filename that was supplied to reduction
    pythonscript: str
        The python script itself
    """
    # read the contents of the script
    if not pythonscript and path.exists(pythonfile):
        with open(pythonfile, "r") as script:
            pythonscript = script.read()

    return _savenxnote(nxentry, "reduction_script", "text/x-python", pythonfile, pythonscript)


def _savereductionjson(nxentry, parameters):
    """Save the all of the reduction parameters as an NXnote

    nxentry: HDF handle
        NXentry to put the NXnote into
    parameters: dict or str
        The parameters supplied to the reduction script. This will be converted
        to a json string if it isn't one already.
    """

    if "filename" in parameters.keys():
        filename = parameters["filename"]
    else:
        filename = ""

    # convert the parameters into a string to save
    if not isinstance(parameters["data"], str):
        parameters = json.dumps(parameters["data"])

    return _savenxnote(
        nxentry,
        "reduction_json",
        "application/json",
        file_name=filename,
        data=parameters,
    )


def _savereductionparams(nxentry, parameters, name_of_entry):
    """save the reduction parameters as a nice tree structure"""
    if parameters is None:
        return

    nxentry = _createnxgroup(nxentry, name_of_entry, "NXnote")

    for _key, _value in parameters.items():
        if isinstance(_value, dict):
            _savereductionparams(nxentry, _value, _key)
        elif isinstance(_value, list):
            _new_entry = nxentry.create_dataset(name=_key, data=str(_value))
            _new_entry.attrs["NX_class"] = "NXdata"
        else:
            if _value is None:
                _value = ""
            _new_entry = nxentry.create_dataset(name=_key, data=_value)
            _new_entry.attrs["NX_class"] = "NXdata"


def _savenxprocess(nxentry, program, version):
    """Create a NXprocess for the specified program

    Parameters
    ----------
    nxentry: HDF handle
        NXentry to put the NXprocess into
    program: str
        name of the program
    version: str
        program's version string
    """
    nxprocess = _createnxgroup(nxentry, program, "NXprocess")
    nxprocess.create_dataset(name="program", data=[np.string_(program)])
    nxprocess.create_dataset(name="version", data=[np.string_(version)])


def _savenxlog(nxcollection, property):
    """Create a NXlog from the supplied property

    Parameters
    ----------
    nxcollection: HDF handle
        NXcollection that the NXlog should be added to
    property: PropertyWithValue
        log item to inspect and write its values to disk
    """
    nxlog = _createnxgroup(nxcollection, property.name, "NXlog")

    try:
        if isinstance(property.value, str):
            value = nxlog.create_dataset(name="value", data=[np.string_(property.value)])
        elif len(property.value) > 1:
            value = nxlog.create_dataset(name="value", data=property.value)
        else:
            raise RuntimeError("Should never have gotten here")
    except TypeError:
        value = nxlog.create_dataset(name="value", data=[property.value])

    if value and property.units:
        value.attrs["units"] = property.units

    # TODO should get time from the logs but current examples don't have that
    # this code should work
    try:
        times = property.times
        if len(times) > 0:
            # convert to float in seconds
            epoch = times[0].toISO8601String()
            times = (times - times[0]) / np.timedelta64(1, "s")
            times = nxlog.create_dataset(name="time", data=times)
            times.attrs["offset"] = np.string_(epoch)
            times.attrs["units"] = "second"
    except AttributeError:
        pass  # doesn't have times

    return nxlog


def _savespecialparameters(nxentry, dict_special_parameters, name_of_entry):
    """Save the special parameters

    Parameters
    ----------
    nxentry: HDF handle
        Entry group to put information in
    dict_special_parameters: dict
        dictionary where to get the parameters from
    name_of_entry: String
        Name of the top tree structure
    """
    nxentry = _createnxgroup(nxentry, name_of_entry, "NXnote")

    for _key, _value in dict_special_parameters.items():
        if isinstance(_value, dict):
            _savereductionparams(nxentry, _value, _key)
        else:
            if _value is None:
                _value = ""
            _new_entry = nxentry.create_dataset(name=_key, data=_value)
            _new_entry.attrs["NX_class"] = "NXdata"


def _savesamplelogs(nxentry, dict_sample_logs, name_of_entry):
    """Save all the DAS logs infos

    Parameters
    ----------
    nxentry: HDF handle
        Entry group to put information in
    dict_sample_logs: SampleLogs object
    name_of_entry: String
        Name of the top tree structure
    """
    nxentry = _createnxgroup(nxentry, name_of_entry, "NXnote")

    for _sample_key in dict_sample_logs.keys():
        nxentry_log = _createnxgroup(nxentry, _sample_key, "NXnote")
        local_dict_sample_logs = dict_sample_logs[_sample_key]
        for _key in local_dict_sample_logs.keys():
            _value = str(local_dict_sample_logs[_key].value)
            if _value is None:
                _value = ""
            _units = str(local_dict_sample_logs[_key].units)
            if _units is None:
                _units = ""
            _new_entry = nxentry_log.create_dataset(name=_key, data=_value)
            _new_entry.attrs["NX_class"] = "NXdata"
            _new_entry.attrs["units"] = _units


def _create_groupe(entry=None, name="Default", data=[], units=""):
    if data is not None:
        _entry_group = entry.create_dataset(name=name, data=data)
        _entry_group.attrs["units"] = units


def _save_logslicedata(logslicedata={}, index=0, topEntry=None):
    if logslicedata == {}:
        return

    nameGroup = logslicedata[str(index)]["name"]
    entry = topEntry.create_group(nameGroup)
    entry.attrs["NX_class"] = "NXdata"

    _create_groupe(
        entry=entry,
        name="data",
        data=logslicedata[str(index)]["data"],
        units=logslicedata[str(index)]["units"],
    )


def _save_iqxqy_to_log(iqxqy=None, topEntry=None):
    entry = topEntry.create_group("I(QxQy)")
    entry.attrs["NX_class"] = "NXdata"

    # intensity
    _create_groupe(entry=entry, name="I", data=iqxqy.intensity, units="1/A")

    # errors
    _create_groupe(entry=entry, name="Idev", data=iqxqy.error, units="1/cm")

    # qx
    if iqxqy.qx is not None:
        _create_groupe(entry=entry, name="Qx", data=iqxqy.qx, units="1/A")

        _create_groupe(entry=entry, name="Qxdev", data=iqxqy.delta_qx, units="1/A")

    # qy
    if iqxqy.qy is not None:
        _create_groupe(entry=entry, name="Qy", data=iqxqy.qy, units="1/A")

        _create_groupe(entry=entry, name="Qydev", data=iqxqy.delta_qy, units="1/A")
    # wavelength
    if iqxqy.wavelength is not None:
        wavelength = "{}".format(iqxqy.wavelength)
        _create_groupe(entry=entry, name="Wavelength", data=wavelength, units="A")


def __save_individual_iq_to_log(iq=None, topEntry=None, entryNameExt=""):
    entry_name = "I(Q)"
    if entryNameExt:
        entry_name += "_" + entryNameExt
    entry = topEntry.create_group(entry_name)
    entry.attrs["NX_class"] = "NXdata"
    entry.attrs["signal"] = "I"
    entry.attrs["axes"] = "Q"

    # intensity
    _create_groupe(entry=entry, name="I", data=iq.intensity, units="1/cm")

    # errors
    _create_groupe(entry=entry, name="Idev", data=iq.error, units="1/cm")

    # mod_q
    if iq.mod_q is not None:
        _create_groupe(entry=entry, name="Q", data=iq.mod_q, units="1/A")

        logger.debug(f"delta mod q: {iq.delta_mod_q}")
        _create_groupe(entry=entry, name="Qdev", data=iq.delta_mod_q, units="1/A")

    # wavelength
    if iq.wavelength:
        wavelength = "{}".format(iq.wavelength)
        _create_groupe(entry=entry, name="Wavelength", data=wavelength, units="A")


def _save_iq_to_log(iq=None, topEntry=None):
    if (type(iq) is list) and len(iq) > 1:
        for _index, _iq in enumerate(iq):
            __save_individual_iq_to_log(iq=_iq, topEntry=topEntry, entryNameExt="wedge{}".format(_index))
    else:
        __save_individual_iq_to_log(iq=iq[0], topEntry=topEntry, entryNameExt="")


def _retrieve_beam_radius_from_out_file(outfolder=""):
    name_of_out_file = glob.glob(os.path.join(outfolder, "*.out"))
    if name_of_out_file == []:
        return ""

    with open(name_of_out_file[0], "r") as handler:
        file_contain = handler.readlines()
    string_to_look_for = "Radius calculated from the input workspace ="
    for _line in file_contain:
        if string_to_look_for in _line:
            regular_exp = r".*= (?P<radius>.*) mm\n"
            m = re.search(regular_exp, _line)
            if m:
                return m.group("radius")
    return ""


def _appendCalculatedBeamRadius(specialparameters=None, json=None, outfolder=""):
    if json is None:
        return specialparameters

    try:
        beam_radius_in_json = json["configuration"]["mmRadiusForTransmission"]
    except KeyError:
        return specialparameters
    except TypeError:
        return specialparameters

    if not beam_radius_in_json:
        beam_radius_in_json = _retrieve_beam_radius_from_out_file(outfolder=outfolder)

    if specialparameters is None:
        specialparameters = {"transmission_radius_used (mm)": beam_radius_in_json}
    else:
        specialparameters = {
            **specialparameters,
            "transmission_radius_used (mm)": beam_radius_in_json,
        }
    return specialparameters


[docs] def savereductionlog(filename="", detectordata=None, **kwargs): r"""Save the reduction log There are three ``NXentry``. The first is for the 1d reduced data, second is for the 2d reduced data, and the third is for the extra information about how the data was processed. The options read from the ``kwargs`` parameter are listed with the other parameters. Parameters ---------- detectordata: dict for each key (name of detector), will have iq: Iqmod and iqxqy: IQazimuthal where Iqmod is a tuple with the following informations: intensity, error, mod_q, delta_mode_q and IQazimuthal is a tuple with the following informations: intensity, error, qx, delta_qx, qy, delta_y python: string The script used to create everything (optional) pythonfile: string The name of the file containing the python script. Will be read into ``python`` argument if not already supplied (optional) reductionparams: str, dict The parameters supplied to the reduction script as either a nested :py:obj:`dict` or a json formatted :py:obj:`str` (optional) logslicedata: dict data corresponding to the various slices starttime: str When the original script was started (optional, default: now) hostname: str Name of the computer used. If not provided, will be gotten from the system environment ``HOSTNAME`` (optional) user: str User-id of who reduced the data (as in xcamms). If not provided will be gotten from the system environment ``USER`` (optional) username: str Username of who reduced the data (as in actual name). If not provided will be gotten from the system environment ``USERNAME`` (optional) specialparameters: dict dictionary of any other arguments you want to keep in the log file samplelogs: SampleLogs SampleLogs object of all the EPICS infos logged into the NeXus (and visible on ONCat) """ if not filename: filename = "_reduction_log.hdf" # raise RuntimeError('Cannot write to file "{}"'.format(filename)) if detectordata is None: raise RuntimeError("Provide at least one detector data {}".format(filename)) if type(detectordata) is not dict: raise RuntimeError( "detectordata has the wrong type. It should be a dictionary " "and not a {}".format(type(detectordata)) ) for _slice_name in detectordata.keys(): if type(detectordata[_slice_name]) is not dict: raise RuntimeError( "detectordata value has the wrong type. It should be a dictionary " "and not a {}".format(type(detectordata[_slice_name])) ) for _detector_name in detectordata[_slice_name].keys(): if type(detectordata[_slice_name][_detector_name]) is not dict: raise RuntimeError( f"detectordata[{_slice_name}][{_detector_name}] value has the wrong type. It " f"should be a dictionary " f"and not a {type(detectordata[_slice_name][_detector_name])}" ) if ( "iq" not in detectordata[_slice_name][_detector_name].keys() and "iqxqy" not in detectordata[_slice_name][_detector_name].keys() ): raise KeyError("Provide at least a iq and/or iqxqy keys to {}".format(filename)) logslicedata = kwargs.get("logslicedata", {}) if logslicedata: if type(logslicedata) is not dict: raise RuntimeError( "logslicedata has the wrong type. It should a dictionary " "and not a {}".format(type(logslicedata)) ) if len(logslicedata.keys()) > len(detectordata.keys()): raise ValueError( f"Can not have more logs slice data ({len(logslicedata.keys())}) than " f"slices ({len(detectordata.keys())})" ) # end of testing inputs writing_flag = "w" for _index, _slice_name in enumerate(detectordata.keys()): if _index > 0: writing_flag = "a" with h5py.File(filename, writing_flag) as handle: topEntry = handle.create_group(_slice_name) topEntry.attrs["NX_class"] = "NXdata" _current_detectordata = detectordata[_slice_name] for _frame_index, _frame_name in enumerate(_current_detectordata.keys()): _current_frame = _current_detectordata[_frame_name] midEntry = _createnxgroup(topEntry, _frame_name, "NXdata") cfkeys = list(_current_frame.keys()) logger.debug(f"current frame keys: {cfkeys}") if "iq" in _current_frame.keys() and "iqxqy" in _current_frame.keys(): logger.debug(str(_current_frame["iq"])) _save_iq_to_log(iq=_current_frame["iq"], topEntry=midEntry) logger.debug(str(_current_frame["iqxqy"])) _save_iqxqy_to_log(iqxqy=_current_frame["iqxqy"], topEntry=midEntry) elif "iq" in _current_frame.keys(): _save_iq_to_log(iq=_current_frame["iq"], topEntry=midEntry) else: _save_iqxqy_to_log(iqxqy=_current_frame["iqxqy"], topEntry=midEntry) _save_logslicedata(logslicedata=logslicedata, index=_index, topEntry=topEntry) # re-open the file to append other information with h5py.File(filename, "a") as handle: entry = _createnxgroup(handle, "reduction_information", "NXentry") # read the contents of the script _pythonfile = kwargs.get("pythonfile", None) if _pythonfile: _savepythonscript( entry, pythonfile=kwargs.get("pythonfile", None), pythonscript=kwargs.get("python", ""), ) _reduction_parameters = kwargs.get("reductionparams", "") if _reduction_parameters: _savereductionjson(entry, parameters=_reduction_parameters) _savereductionparams( entry, parameters=_reduction_parameters["data"], name_of_entry="reduction_parameters", ) # timestamp of when it happened - default to now starttime = kwargs.get("starttime", datetime.now().isoformat()) entry.create_dataset(name="start_time", data=[np.string_(starttime)]) # computer it was on hostname = kwargs.get("hostname", None) if not hostname: hostname = socket.gethostname() if hostname: entry.create_dataset(name="hostname", data=[np.string_(hostname)]) # software involved _savenxprocess(entry, "mantid", mantid_version) _savenxprocess(entry, "drtsans", drtsans_version) # user information user = kwargs.get("user", environ.get("USER", "")) if user: nxuser = _createnxgroup(entry, "user", "NXuser") nxuser.create_dataset(name="facility_user_id", data=[np.string_(user)]) username = kwargs.get("username", environ.get("USERNAME", "")) if username: nxuser.create_dataset(name="name", data=[np.string_(username)]) specialparameters = kwargs.get("specialparameters", None) if specialparameters: # add calculated beam radius if beam radius is None if _reduction_parameters: json_entry = _reduction_parameters["data"] else: json_entry = None specialparameters = _appendCalculatedBeamRadius( specialparameters, json=json_entry, outfolder=os.path.dirname(filename) ) if specialparameters: _savespecialparameters(entry, specialparameters, "special_parameters") samplelogs = kwargs.get("samplelogs", None) if samplelogs: _savesamplelogs(entry, samplelogs, "sample_logs")