# standard imports
import ast
from copy import deepcopy
import itertools
import json
import os
import re
import warnings
# third-party imports
import jsonschema
from jsonschema.exceptions import relevance
from numpy import greater
from mantid.kernel import logger
import referencing
warnings.filterwarnings("ignore")
from mantid.kernel import IntArrayProperty # noqa: E404
# drtsans imports
from drtsans import configdir # noqa: E404, non-source files
from drtsans.instruments import (
instrument_filesystem_name,
instrument_standard_name,
instrument_standard_names,
) # noqa: E404
from drtsans.path import abspath, abspaths # noqa: E404
COMMON_SCHEMA_FILEPATH = os.path.join(configdir, "schema", "common.json")
__all__ = [
"reduction_parameters",
"update_reduction_parameters",
"validate_reduction_parameters",
]
def type_selector(preferred_type): # noqa: C901
r"""
Callable the will cast an object to a preferred type.
Example: type_enforcer('str')(42) returns '42'
Parameters
----------
preferred_type: str
One of 'str', 'int', 'float', '[str]', '[float]', '[dict]'
Returns
-------
:py:obj:`function`
"""
def list_comprehension_exec(item_type):
r"""
Select the list below comprehension executor based on the type of the list item
Parameters
----------
item_type: type
The preferred type of the list items
Returns
-------
function
a list comprehension executor in charge of casting the items in a list to a particular type.
"""
def comprehension_executor(a_list):
r"""
Conversion of the items in a list to a selected type. The list can be in a string representation,
or a number
Examples:
- [1, 2], ['1', '2'], '1, 2', '1 2' are all cast into [item_type('1'), item_type('2')
- 1 is cast into [item_type(1)]
- 1.0 is cast into [item_type(1.0)]
Parameters
----------
a_list: int, float, str, list
If list, then list items must be either of int, float, str.
Returns
-------
list
items in this list are of type `item_type`
"""
if isinstance(a_list, (int, float)):
a_list = [a_list]
elif isinstance(a_list, str) is True: # a_list is the string representation of a list
for brace in ("[", "]", "(", ")"):
a_list = a_list.replace(brace, "")
if "," in a_list:
a_list = a_list.split(",")
else:
a_list = a_list.split(None)
return [item_type(x) for x in a_list]
return comprehension_executor
def list_comprehension_str2dict(instance):
r"""
Convert the items of a list into dictionaries. It is understood that each list items is the
string representation of a dictionary.
Example: ["{'Pixel':'1-18,239-256'}", "{'Bank':'18-24,42-48'}"]
Parameters
----------
instance: list
Returns
-------
dict
"""
list_of_dictionaries = list()
for item in instance:
if isinstance(item, str):
try:
list_of_dictionaries.append(ast.literal_eval(item))
except ValueError:
raise ValueError(f'Could not translate "{item}" into a python dictionary')
elif isinstance(item, dict):
list_of_dictionaries.append(item)
else:
raise ValueError(f"{instance} must be a list of strings or a list of dictionaries")
return list_of_dictionaries
def run_str(instance):
r"""
Cast to a list of run numbers into a string, iterating over items when the object to cast is a list
Examples: [1, '2', '3 - 5', '6:8'] and '1, 2, 3-5, 6 : 8' both become '1, 2, 3, 4, 5, 6, 7, 8'
Parameters
----------
instance:str, list
Returns
-------
str
"""
if isinstance(instance, (list, tuple)):
return ", ".join([run_str(item) for item in instance])
# expand any run range, such as 12345-12349
instance = str(instance)
# Expand
for run_range in re.findall(r"(\d+\s*[-:]\s*\d+)", instance):
run_range_no_whitespaces = run_range.replace(" ", "")
try:
all_runs = IntArrayProperty("_", run_range_no_whitespaces).valueAsStr
except RuntimeError as e:
raise ValueError("Invalid range format: {}".format(run_range_no_whitespaces)) from e
instance = instance.replace(run_range, all_runs)
return instance
# the type_selector cast the input instance to desired data-type
dispatcher = {
"int": int,
"float": float,
"str": str,
"runstr": run_str,
"[float]": list_comprehension_exec(float),
"[str]": list_comprehension_exec(str),
"[dict]": list_comprehension_str2dict,
}
return dispatcher[preferred_type]
class ReferenceResolver:
r"""
Uncompress dictionary items of the sort `{'$ref': URI}` by substituting string `URI`
with the corresponding contents in the provided schema file.
Also recursive uncompression, when the URI-associated value contains itself more {'$ref': URI} items.
URI strings allways begin as `base_uri#/`, where `base_uri` is a string. The assumed convention is
that `base_uri` is the file name of the schema file. For instance,
if `ReferenceResolver("/tmp/common.json"), then `base_uri == "common.json"`.
Parameters
----------
referred: str
Absolute path to the JSON schema containing the definitions for the references. Default
is common.json
"""
def __init__(self, referred):
with open(
referred,
) as file_handle:
schema_dict = json.load(file_handle)
resource = referencing.Resource.from_contents(schema_dict)
base_uri = os.path.basename(referred)
registry = referencing.Registry().with_resource(uri=base_uri, resource=resource)
self._resolver = registry.resolver()
def dereference(self, to_resolve):
r"""
Resolve all '$ref' keys found in a dictionary. Do a recursive resolution for nested
dictionaries containing '$ref' keys
Parameters
----------
to_resolve: dict
property dictionary containing '$ref' keys
Returns
-------
dict
property dictionary where items containing '$ref' keys have been resolved
"""
while "$ref" in to_resolve:
resolved = self._resolver.lookup(to_resolve["$ref"]).contents
to_resolve.pop("$ref")
to_resolve.update(resolved)
for name, value in to_resolve.items():
if isinstance(value, dict):
to_resolve[name] = self.dereference(value) # nested dictionaries must be looked, just in case
return to_resolve
class Suggest:
r"""
Set of property names in a schema. The purpose of this class if to offer property name suggestions
when a user enters a property name that is not in the set.
Parameters
----------
entries: list
A list of strings representing the property names in a schema
"""
@staticmethod
def levenshtein(query, entry):
r"""
Calculate similarity between two strings.
Credit to <https://www.python-course.eu/levenshtein_distance.php>
Parameters
----------
query: str
potential entry typed in by the user.
entry: str
One of the entries in the JSON file/object.
Returns
-------
int
"""
rows, cols = len(query) + 1, len(entry) + 1
dist = [[0 for x in range(cols)] for x in range(rows)]
# source prefixes can be transformed into empty strings by deletions:
for i in range(1, rows):
dist[i][0] = i
# entry prefixes can be created from an empty source string by inserting the characters
for i in range(1, cols):
dist[0][i] = i
for col in range(1, cols):
for row in range(1, rows):
if query[row - 1] == entry[col - 1]:
cost = 0
else:
cost = 1
dist[row][col] = min(
dist[row - 1][col] + 1, # deletion
dist[row][col - 1] + 1, # insertion
dist[row - 1][col - 1] + cost,
) # substitution
return dist[row][col]
def __init__(self, entries):
self.entries = set(entries)
def __len__(self):
return len(self.entries)
def top_match(self, query):
r"""
Find the similarity between a query word and the reduction parameter entries, then return the
most similar entry
Parameters
----------
query: str
Potential entry typed in by the user
Returns
-------
str
"""
scores = [(Suggest.levenshtein(query, entry), entry) for entry in self.entries]
return sorted(scores)[0][1]
resolver_common = ReferenceResolver(os.path.join(configdir, "schema", "common.json"))
class DefaultJson:
r"""
JSON dictionary containing only keywords related to physical properties (i.e., the domain
of the business)
This version of the schema is easy to review by the instrument scientists because it contains only
keywords related to the instrument. Keywords specifying the data-types and the validators are
omitted.
Parameters
----------
schema: dict
field: str
entry in the parameter's schema that we want to use as default value for the parameter.
"""
# public class/static methods:
@staticmethod
def trim_schema(schema, field="default"):
r"""
Trim the (key, val) items from a full schema so that keys are assigned their default values,
or :py:obj:`None`
Parameters
----------
schema: dict
field: str
entry in the schema for each parameter that we want to use as value
Returns
-------
dict
Property names and default values
"""
# Bring items under 'properties' as items of the current schema
if "properties" in schema:
dict_slim = dict()
for name, value in schema["properties"].items():
dict_slim[name] = DefaultJson.trim_schema(value, field=field)
return dict_slim
return schema.get(field, None)
# public bound methods:
@property
def parameters(self):
return self._json
@property
def property_names(self):
return set(self._property_names())
def dumps(self, **kwargs):
r"""
JSON-dump schema to string after removing certain names from the dictionary.
The goal is to yield a dictionary containing only instrument-related property names and values.
Parameters
----------
output_json: str, io.StringIO
File name or File-like object where to save the reduction parameters. If passing file name,
the file extension must by ".json"
kwargs: dict
Optional arguments for json.dump
Returns
-------
str
"""
# default formatting
if "indent" not in kwargs:
kwargs["indent"] = 2
return json.dumps(self._json, **kwargs)
def dump(self, output_json, **kwargs):
r"""
JSON dump
Parameters
----------
output_json: str, io.StringIO
File name or File-like object where to save the reduction parameters. If passing file name,
the file extension must by ".json"
kwargs: dict
Optional arguments for json.dump
"""
file_handle = output_json # initialize assuming the file handle is a file-like object
if isinstance(output_json, str):
if output_json.endswith(".json") is False:
raise RuntimeError(f'"{output_json}" extension is not ".json"')
file_handle = open(output_json, "w")
# default formatting
if "indent" not in kwargs:
kwargs["indent"] = 2
json.dump(self._json, file_handle, **kwargs)
file_handle.close()
def to_rest(self, line_length_max=75):
r"""
Represent the default parameters as a restructuredtext string.
Returns
-------
str
"""
def quote_in(lines):
r"""Fix the quote marks when splitting a long line into a list of shorter lines"""
if len(lines) == 1:
return lines # the line was not splited, so nothing to do
quoted_lines = [lines[0] + '"'] # first line needs ending '"'
for line in lines[1:-1]:
quoted_lines.append('"' + line + '"') # middle lines must be quoted
quoted_lines.append('"' + lines[-1]) # last line needs beginning '"'
return quoted_lines
name = self.parameters["instrumentName"]
doc = f"{name}\n" + "".join(["="] * len(name)) + "\n\n" # instrument header
doc += ".. code-block:: python\n\n"
def_dict = list()
# Split long lines
for line in self.dumps(indent=2).split("\n"):
def_dict.extend(quote_in([line[i : i + line_length_max] for i in range(0, len(line), line_length_max)]))
# Three white spaces for proper indentation of restructuredtext blockquotes
def_dict = "\n ".join(def_dict)
# Change back some keywords from JSON representation to Python
for json_key, python_key in {
": true": ": True",
": false": ": False",
": null": ": None",
}.items():
def_dict = def_dict.replace(json_key, python_key)
doc += " " + def_dict + "\n\n"
return r"{}".format(doc)
# private bound methods:
def __init__(self, schema, field="default"):
self._field = field
self._json = self.trim_schema(schema, field=field)
def __getitem__(self, item):
return self._json.__getitem__(item)
def __setitem__(self, key, value):
self._json.__setitem__(key, value)
def __str__(self):
return self._to_string()
def _to_string(self, parent_dictionary=None, n=0):
r"""
Pretty print the schema dictionary. Includes indentation and default values.
Parameters
----------
parent_dictionary: dict
format this dictionary as a string
n: int
indentation level
Returns
-------
str
"""
if parent_dictionary is None:
s = f"#\n# property-name ({self._field} value)\n#\n"
parent_dictionary = self._json
else:
s = ""
for name, value in parent_dictionary.items():
s += " " * n + f"{name}"
if isinstance(value, dict) is False:
print_value = "" if value is None else f" = {value}"
s += f"{print_value}\n"
else:
s += ":\n"
s += self._to_string(parent_dictionary=value, n=n + 1)
return s
def _iteritems_recursive(self, parent_dictionary=None):
r"""
Iterate over the schema's items, descending recursively into lower-level dictionaries"""
if parent_dictionary is None:
parent_dictionary = self._json
for name, value in parent_dictionary.items():
if isinstance(value, dict):
self._iteritems_recursive(parent_dictionary=value)
yield (name, value)
def _property_names(self, parent_dictionary=None):
r"""List of all property names, as we navigate the schema dictionary. Duplicates in the list are OK."""
if parent_dictionary is None:
parent_dictionary = self._json
names = list(parent_dictionary.keys())
for value in parent_dictionary.values():
if isinstance(value, dict):
names.extend(self._property_names(parent_dictionary=value))
return names
class ReductionParameters:
r"""
Validation and sanity checks for all parameters defining a reduction.
Broadly speaking, two types of validation against the JSON schema are enforced:
1. validators built-in jsonschema package
2. method of this class that start with "_validate". This methods are inserted in the
JSON schema via dedicated keywords ('dataSource', 'useEntry',..)
Parameters
----------
parameters: dict
The dictionary of reduction parameters to be validated.
schema_instrument: dict
One of the instrument schemae under subdirectory 'schema' of ~drtsans.configdir
permissible: bool
If `False`, raise an exception if a parameter in the parameters dictionary is not found in the instrument's
schema, and a warning otherwise.
"""
# Routines organization
# 1. private class variables
# 2. public class methods and static functions
# 3. member initialization
# 4. public bound methods
# 5. private bound methods
# 1. private class variables
_validators = {
"dataSource": "_validate_data_source",
"evaluateCondition": "_validate_evaluate_condition",
"lessThan": "_validate_less_than",
"exclusiveOr": "_validate_exclusive_or",
"fluxFileTOF": "_validate_flux_file_tof",
"pairedTo": "_validate_is_paired_to",
"onlyOneTrue": "_validate_only_one_true",
"sameLen": "_validate_equal_len",
"useEntry": "_validate_use_entry",
"wedgeSources": "_validate_wedge_sources",
"pairwiseLessThan": "_validate_pairwise_less_than",
}
# 2. public class methods and static functions
@staticmethod
def initialize_suggestions(schema_instrument):
schema_resolved = resolver_common.dereference(schema_instrument)
default_json = DefaultJson(schema_resolved)
return Suggest(default_json.property_names)
# 3. member initialization
def __init__(self, parameters, schema_instrument, permissible=False):
self._parameters = parameters
self._schema = resolver_common.dereference(schema_instrument) # schema instrument without `$ref` entries
self._permissible = permissible
# object in charge of offering a suggestion when the user enters the wrong property name
self._entries = ReductionParameters.initialize_suggestions(schema_instrument)
self._initialize_parameters()
self.validator = self.initialize_validator()
# 4. public bound methods
def validate(self):
r"""
Run all the validators on the reduction parameters
This code reproduces jsonschema.validate omitting error selection when more
than one error is found
Raises
------
jsonschema.ValidationError
"""
errors = iter(self.validator.iter_errors(self._parameters))
best = next(errors, None)
if best is None:
return
error = max(itertools.chain([best], errors), key=relevance)
if error is not None:
raise error
def dump(self, output_json, target="parameters", **kwargs):
r"""
Save the reduction parameters dictionary to a JSON object
Parameters
----------
output_json: str, io.StringIO
File name or File-like object where to save the reduction parameters. If passing file name,
the file extension must by ".json"
target: str
One of 'parameters' or 'schema', depending on what dictionary we want to dump
kwargs: dict
Optional arguments for json.dump
"""
target_selector = dict(parameters=self.parameters, schema=self._schema)
file_handle = output_json # initialize assuming the file handle is a file-like object
if isinstance(output_json, str):
if output_json.endswith(".json") is False:
raise RuntimeError(f'"{output_json}" extension is not ".json"')
file_handle = open(output_json, "w")
# default formatting
if "indent" not in kwargs:
kwargs["indent"] = 2
json.dump(target_selector[target], file_handle, **kwargs)
file_handle.close()
def dumps(self, target="parameters", **kwargs):
r"""
print the reduction parameters dictionary to a string
Parameters
----------
target: str
One of 'parameters' or 'schema', depending on what dictionary we want to dump
kwargs: dict
Optional arguments for json.dump
Returns
-------
str
"""
target_selector = dict(parameters=self.parameters, schema=self._schema)
# default formatting
if "indent" not in kwargs:
kwargs["indent"] = 2
return json.dumps(target_selector[target], **kwargs)
def get_parameter_value(self, composite_key):
r"""
Find the value for a composite key uniquely identifying one of the reduction parameter properties.
Composite keys (optionally) start with character '#', and keys are joined with backslash '/' character.
Example: '#configuration/Qmin' will search for `_parameters['configuration']['Qmin']`
Parameters
----------
composite_key: str
Returns
-------
bool, str, int, float
"""
unrooted_composite_key = composite_key.replace("#", "")
value = self._parameters
for name in unrooted_composite_key.split("/"):
value = value[name]
return value
@property
def parameters(self):
r"""Accessor-only to private _parameters"""
return self._parameters
# 5. private bound methods
def __getitem__(self, item):
try:
return self._parameters.__getitem__(item)
except KeyError as e:
raise KeyError(f'{e}. Did you mean "{self._entries.top_match(item)}"?')
def __setitem__(self, key, value):
top_match = self._entries.top_match(key)
if top_match != key:
warnings.warn(f'{key} not found in the parameters dictionary. Closest match is "{top_match}"')
self._parameters.__setitem__(key, value)
def initialize_validator(self):
# Find which schema-draft version in self._schema
draft = re.search("(draft-[0-9]+)", self._schema["$schema"]).groups()[0]
# select schema appropriate to the schema-draft version
meta_schema = {"draft-07": jsonschema.Draft7Validator}[draft] # include more in the future
# Create a new class Validator by including the custom validator functions listed in
# ReductionParameters._validators into the standard class Validator
all_validators = dict(meta_schema.VALIDATORS)
for keyword, function_name in self._validators.items():
validator_function = getattr(self, function_name)
all_validators[keyword] = validator_function
validator_factory = jsonschema.validators.create(
meta_schema=meta_schema.META_SCHEMA, validators=all_validators
)
# create the validator as an instance of our custom class Validator, `validator_factory`
validator_factory.check_schema(self._schema) # sanity check with static method `check_schema`
return validator_factory(self._schema)
def _initialize_json_validator(self):
# Elucidate the draft version for the meta-schema
meta_schemas = {"draft-07": jsonschema.Draft7Validator}
# Find which schema-draft version in self._schema
meta_schema_key = re.search("(draft-[0-9]+)", self._schema["$schema"]).groups()[0]
meta_schema = meta_schemas[meta_schema_key] # select schema appropriate to the schema-draft version
#
all_validators = dict(meta_schema.VALIDATORS)
for keyword, function_name in self._validators.items():
function = getattr(self, function_name)
all_validators[keyword] = function
return jsonschema.validators.create(meta_schema=meta_schema.META_SCHEMA, validators=all_validators)
def _initialize_parameters(self, schema=None, parameters=None):
r"""
1. If value is empty string, set to None
2. If value is None and has a default, set to the default value
2. If value is a non-empty string but can be a number, set to integer or float
"""
if schema is None:
schema = self._schema
parameters = self._parameters
else:
schema = resolver_common.dereference(schema) # dereference entries with key `$ref`
# We need list() in order to capture the initial state of parameter values
for name, parameter_value in list(parameters.items()):
try:
schema_value = schema["properties"][name] # schema dictionary associated to parameter_value
except KeyError:
try:
schema_value = schema["additionalProperties"][name]
except KeyError as key_err:
not_found_message = f"Parameter {name} not found in the schema."
if self._permissible:
logger.warning(not_found_message)
else:
properties_keys = schema["properties"].keys()
if "additionalProperties" in schema.keys():
properties_keys.extend(schema["additionalProperties"].keys())
errmsg = f"{not_found_message}. Available properties are: {properties_keys}"
raise KeyError(errmsg + ". " + str(key_err))
if isinstance(parameter_value, dict) is True:
# recursive call for nested dictionaries. We pass references to the child dictionaries
# for the schema and the reduction parameters
self._initialize_parameters(schema=schema_value, parameters=parameter_value)
else:
# initialization of parameters[name begins
if parameter_value in ("", None):
parameters[name] = schema_value.get("default", None) # is there a default value?
elif "preferredType" in schema_value: # parameter_value is not empty
cast = type_selector(schema_value["preferredType"])
parameters[name] = cast(parameter_value)
def _validate_data_source(self, validator, value, instance, schema):
r"""
Check for the existence of the data source. Typically applied to find out if a file exists.
If looking for event data, look also in the list of directories given by entry "eventDataDirectories".
Parameters
----------
validator: ~jsonschema.IValidator
value: str
One of 'file' or 'events'.
- 'file' triggers a call to os.path.exists for a search of `instance` in the local file system.
- 'events' triggers a call to ~drtsans.path.abspath to search the nexus events file associated
to `instance`. Preconditions are that the JSON files contains entries 'iptsNumber' and 'instrumentName'.
instance: str
file path or run number to be validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
if instance is not None:
if isinstance(instance, str) is False:
yield jsonschema.ValidationError(f"{instance} is not a string")
data_directories = self.get_parameter_value("#dataDirectories")
if value == "file":
try:
abspath(instance, directory=data_directories)
except RuntimeError:
yield jsonschema.ValidationError(f"Cannot find file {instance}")
elif value == "events": # run number(s)
instrument_name = instrument_filesystem_name(self["instrumentName"])
try:
# the runNumber preferredType is a list of strings
abspaths(
instance,
instrument=instrument_name,
ipts=self["iptsNumber"],
directory=data_directories,
search_archive=True,
)
except RuntimeError:
yield jsonschema.ValidationError(f"Cannot find events file associated to {instance}")
else:
sources = ("file", "events")
yield jsonschema.ValidationError(f"{value} is not valid data source. Try one of {sources}")
def _validate_evaluate_condition(self, validator, value, instance, schema):
r"""
Checks a condition evaluates to :py:obj:`True`
Example: len({this}) == len(#configuration/WedgeMinAngles) checks that the length of
`instance` and the length of the value associated to
#configuration/WedgeMinAngles is :py:obj:`True`
Parameters
----------
validator: ~jsonschema.IValidator
value: str
condition to be evaluated
instance: float, list
current reduction parameter to be compared to other reduction parameters in the condition.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the condition in `value` evaluates to :py:obj:`False`
"""
condition = value.replace("{this}", str(instance))
# replace keywords of other reduction parameters with their corresponding values
composite_keys = re.findall(r"{#([\w,/]+)}", value)
condition = condition.replace("#", "") # python has trouble formatting strings containing '#'
for composite_key in composite_keys:
other_instance = self.get_parameter_value(composite_key)
other_instance_key = f"{{{composite_key}}}" # the key is enclosed by curly braces
condition = condition.replace(other_instance_key, str(other_instance))
if eval(condition) is False:
yield jsonschema.ValidationError(f"{value} condition has evaluated to False")
def _validate_less_than(self, validator, value, instance, schema):
r"""
Check the parameter value is smaller than the value of other parameters
Example: Qmin should be smaller than Qmax
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry path(s) of the other parameters to compare to (e.g. '#configuration/Qmax')
instance: str
file path or run number to be validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
if instance is not None and isinstance(instance, (int, float)) is False:
yield jsonschema.ValidationError(f"{instance} is not a number")
entry_paths = value
if isinstance(value, str):
entry_paths = [value]
for entry_path in entry_paths:
other_instance = self.get_parameter_value(entry_path)
if other_instance is not None:
if isinstance(other_instance, (int, float)) is False:
yield jsonschema.ValidationError(f"{entry_path} is not a number")
if instance >= other_instance:
yield jsonschema.ValidationError(f"{instance} is not smaller than {entry_path}")
def _validate_pairwise_less_than(self, validator, value, instance, schema):
r"""Check that values in a list are smaller than the value in the same position in another list
Example: Qmin[0] should be smaller than Qmax[0], Qmin[1] should be smaller than Qmax[1], etc
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry path(s) of the other parameters to compare to (e.g. '#configuration/Qmax')
instance: object
value for the entry being validated
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
if isinstance(instance, (int, float)):
instance = [instance]
entry_paths = value
if isinstance(value, str):
entry_paths = [value]
for entry_path in entry_paths:
other_instance = self.get_parameter_value(entry_path)
if [instance, other_instance].count(None) == 1:
yield jsonschema.ValidationError(f"{entry_path} or its companion parameter is empty")
if isinstance(other_instance, (int, float)):
other_instance = [other_instance]
if instance and other_instance and len(instance) != len(other_instance):
yield jsonschema.ValidationError(f"{entry_path} and its companion parameter have different lengths")
if instance and other_instance:
if not greater(other_instance, instance).all():
yield jsonschema.ValidationError(f"Pairwise less than failed for {instance} < {other_instance}")
def _validate_exclusive_or(self, validator, value, instance, schema):
r"""
Check that only one of two related entries entries is not :py:obj:`None`.
Parameters
----------
validator: ~jsonschema.IValidator
value: str
entry name for the datum entry associated with the boolean entry.
instance: str
file path or run number to be validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
other_instance = self.get_parameter_value(value)
both_true = instance is not None and other_instance is not None
both_false = instance is None and other_instance is None
if both_true or both_false:
yield jsonschema.ValidationError(f"{value}")
def _validate_use_entry(self, validator, value, instance, schema):
r"""
Verify that parameters associated to a 'use' boolean entry is not-empty when the boolean entry evalues
to :py:obj:`True`.
Example: if 'useDefaultMask' is True, check entry 'defaultMask' exists and is not empty.
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry(ies) name(s) for the datum entry associated with the boolean entry.
instance: bool
only check for the datum entry if :py:obj:`True`
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
if not isinstance(instance, bool):
yield jsonschema.ValidationError(f"{instance} is not a boolean")
if instance is True:
composite_keys = (
[
value,
]
if isinstance(value, str)
else value
) # either a string or a list
for composite_key in composite_keys:
other_instance = self.get_parameter_value(composite_key)
if other_instance is None:
yield jsonschema.ValidationError(f"{composite_key} is empty")
def _validate_is_paired_to(self, validator, value, instance, schema):
r"""
Verify two parameters must be both either empty or not-empty
Example: if 'wavelength' parameter is not :py:obj:`None`, check 'wavelengthSpread' parameter is also
not :py:obj:`None`
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry(ies) associated with this `instance`.
instance: object
value for the entry being validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the validator fails or when `value` is not one of the allowed values
"""
if instance is not None:
composite_key = value
other_instance = self.get_parameter_value(composite_key)
if other_instance is None:
yield jsonschema.ValidationError(f"{composite_key} is empty")
def _validate_only_one_true(self, validator, value, instance, schema):
r"""
Check that only one boolean entry is :py:obj:`True`. All can be :py:obj:`False`.
Example: either 'configuration/useTimeSlice' or 'configuration/useLogSlice' can be True,
but not both.
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry(ies) associated with this `instance`.
instance: object
value for the entry being validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the the type of `instance` is not Bool or when more than one entry in `value` plus
`instance` evaluates to :py:obj:`True`.
"""
if not isinstance(instance, bool):
yield jsonschema.ValidationError(f"{instance} is not a boolean")
truth_count = 1 if instance is True else 0
composite_keys = (
[
value,
]
if isinstance(value, str)
else value
) # either a string or a list
for composite_key in composite_keys:
other_instance = self.get_parameter_value(composite_key)
if other_instance is True:
truth_count += 1
if truth_count > 1:
yield jsonschema.ValidationError(f"More than {value} entry is True")
def _validate_equal_len(self, validator, value, instance, schema):
r"""
Check that two list instances have equal evaluation of len()
Example: check than WedgeMinAngles and WedgeMaxAngles are list of same length
Parameters
----------
validator: ~jsonschema.IValidator
value: str, list
entry(ies) associated with this `instance`.
instance: object
value for the entry being validated. Should be a list or be casted into a list
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when the the type of `instance` is not Bool or when more than one entry in `value` plus
`instance` evaluates to :py:obj:`True`.
"""
if instance is not None:
if isinstance(instance, str): # instance is a string representation of a list
instance = instance.replace("[", "").replace("]", "").split(",")
target_len = len(instance)
composite_keys = (
[
value,
]
if isinstance(value, str)
else value
) # either a string or a list
for composite_key in composite_keys:
other_instance = self.get_parameter_value(composite_key)
if instance is None:
yield jsonschema.ValidationError(f"list(s) {composite_keys} have different length than instance")
if isinstance(other_instance, str): # instance is a string representation of a list
instance = instance.replace("[", "").replace("]", "").split(",")
if len(other_instance) != target_len:
yield jsonschema.ValidationError(f"list(s) {composite_keys} have different length than instance")
def _validate_wedge_sources(self, validator, value, instance, schema):
r"""
Check that we can derive wedge angels from the given parameters.
Two sources of wedge parameter are identified:
1. custom WedgeMinAngles and WedgeMaxAngles.
2. automatic wedges, requires autoWedgeQmin, autoWedgeQmax, autoWedgeQdelta, autoWedgeAzimuthalDelta,
autoWedgePeakWidth, autoWedgeBackgroundWidth, and autoWedgeSignalToNoiseMin.
Parameters
----------
validator: ~jsonschema.IValidator
value: list
sets of wedge specifications. Each item in this list is a list itself, containing the parameter names
specifying the wedge specifications.
instance: object
value for the entry being validated. Perform validation only if this instance evaluates to 'wedge'..
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when every set of wedge specifications contains at least one empty parameter.
"""
if instance == "wedge":
source_set_valid_found = False # True if we find one source set that allows us to specify the wedge angles
for source_set in value:
for composite_key in source_set:
other_instance = self.get_parameter_value(composite_key)
if other_instance is None:
break # the source set is invalid because we're missing this one parameter
else:
source_set_valid_found = True # all instances in the source set are not empty. It's a valid set
if source_set_valid_found is False:
yield jsonschema.ValidationError(
f"We cannot define the wedge angles given the current" f"values or parameters {value}"
)
def _validate_flux_file_tof(self, validator, value, instance, schema):
r"""
Check that the entry specifying the flux file is not-empty for the selected normalization.
For example, if `instance` evaluates to 'Monitor', check parameter 'fluxMonitorRatioFile' is not
empty.
Parameters
----------
validator: ~jsonschema.IValidator
value: list
items in this list are pairs of normalization type and parameter name containing the location of the
associated flux file
instance: object
value for the entry being validated.
schema: dict
schema related to `instance`
Raises
------
~jsonschema.ValidationError
when every set of wedge specifications contains at least one empty parameter.
"""
# assigns `None` for 'Time' normalization, since there's no flux file associated to this type of normalization
composite_key = value.get(instance, None)
if composite_key is not None:
other_instance = self.get_parameter_value(composite_key) # path to the flux file
if other_instance is None:
yield jsonschema.ValidationError(f"No flux file was specified for {instance} normalization")
def _instrument_json_generator(instrument=None, field="default"):
r"""
For each instrument schema, yield a resolved ~drtsans.redparms.DefaultJson instance.
Parameters
----------
instrument: str
Name of the instrument. If :py:obj:`None` then generate for all instruments
field: str
entry in the parameter's schema that we want to use as default value for the parameter.
Returns
-------
tuple
A two item tuple. First item is instrument name and second item is the schema trimmed instance
"""
schema_dir = os.path.join(configdir, "schema")
instrument_names = instrument_standard_names() if instrument is None else [instrument_standard_name(instrument)]
for name in instrument_names:
schema_file = os.path.join(schema_dir, f"{name}.json")
with open(schema_file, "r") as file_handle:
schema_unresolved = json.load(file_handle)
schema_resolved = resolver_common.dereference(schema_unresolved)
yield name, DefaultJson(schema_resolved, field=field)
def default_reduction_parameters(instrument_name):
r"""
Get the dictionary of reduction parameters with default values from the schema
Parameters
----------
instrument_name: str
Returns
-------
dict
"""
_, generated_parameters = list(_instrument_json_generator(instrument=instrument_name))[0]
return generated_parameters.parameters
def pretty_print_schemae(save_dir):
r"""
Save a trimmed version of the instrument schemae for revision to file (BIOSANS.txt,...)
Parameters
----------
save_dir: str
Absolute path where printed schemae are saved
"""
for name, default_json in _instrument_json_generator():
save_path = os.path.join(save_dir, f"{name}.txt")
open(save_path, "w").write(str(default_json))
def generate_json_files(save_dir, timestamp=True, field="default"):
r"""
For each instrument schema, dump only the physical properties and their default
values into a JSON formatted file.
Parameters
----------
save_dir: str
Absolute path where printed schemae are saved
timestamp: bool
Include a 'timestamp' entry
"""
for name, default_json in _instrument_json_generator(field=field):
save_path = os.path.join(save_dir, f"{name}.json")
default_json.dump(save_path)
[docs]
def validate_reduction_parameters(parameters, permissible=False):
r"""
Validate reduction parameters against the instrument's schema.
Parameters
----------
parameters: dict, ~drtsans.redparms.ReductionParameters
Reduction configuration
permissible: bool
If `False`, raise an exception if a parameter in the parameters dictionary is not found in the instrument's
schema, and a warning otherwise.
Returns
-------
dict
Validated reduction parameters
"""
if isinstance(parameters, ReductionParameters):
parameters = parameters.parameters
instrument_name = instrument_standard_name(parameters["instrumentName"])
schema = load_schema(instrument_name)
parameters = ReductionParameters(parameters, schema, permissible=permissible)
parameters.validate()
return deepcopy(parameters.parameters)
[docs]
def update_reduction_parameters(parameters_original, parameter_changes, validate=True, permissible=False):
r"""
Update the values of a reduction parameters dictionary with values from another dictionary. Handles nested
dictionaries. Validate after update is done.
Dictionary `parameters_original` is not modified, but a new copy is produced an updated with
`parameter_changes`
Parameters
----------
parameters_original: dict
parameter_changes: dict
validate: bool
Perform validation of the parameters
permissible: bool
If `False`, raise an exception if a parameter in the overall parameters dictionary is not found in the
instrument's schema, and a warning otherwise.
Returns
-------
dict
"""
parameters_updated = deepcopy(parameters_original)
_update_reduction_parameters(parameters_updated, parameter_changes)
if validate is False:
return parameters_updated
return validate_reduction_parameters(parameters_updated, permissible=permissible)
def _update_reduction_parameters(parameters_original: dict, parameter_changes: dict) -> None:
r"""
Update the values of a reduction parameters dictionary with values from another dictionary. Handles nested
dictionaries. Update is performed in-place.
Parameters
----------
parameters_original
parameter_changes
Returns
-------
dict
"""
for name, value in parameter_changes.items():
if isinstance(value, dict):
if name not in parameters_original:
parameters_original[name] = {}
_update_reduction_parameters(parameters_original[name], value)
else:
parameters_original[name] = value
[docs]
def reduction_parameters(parameters_particular=None, instrument_name=None, validate=True, permissible=False):
r"""
Serve all necessary (and validated if so desired) parameters for a reduction session of
a particular instrument.
Parameters
----------
parameters_particular: dict
Non-default parameters, particular to the reduction session. If :py:obj:`None`, then the
default parameters for the specified instrument are passed.
instrument_name: str
Mix the non-default parameters with the remaining default parameters appropriate for this instrument.
If left as :py:obj:`None`, the instrument name is looked under keyword 'instrumentName' in
dictionary `parameters_particular`
validate: bool
Perform validation of the parameters
permissible: bool
If `False`, raise an exception if a parameter in the parameters dictionary is not found in the instrument's
schema, and a warning otherwise.
Returns
-------
dict
"""
if parameters_particular is None and instrument_name is None:
raise RuntimeError("Either `parameters_particular` or `instrument_name` must be specified")
if instrument_name is None:
instrument_name = parameters_particular["instrumentName"]
instrument_name = instrument_standard_name(instrument_name) # e.g. change CG2 to GPSANS
reduction_input = default_reduction_parameters(instrument_name)
if parameters_particular is None:
if validate is False:
return reduction_input # nothing else to do
return validate_reduction_parameters(reduction_input, permissible=permissible)
return update_reduction_parameters(
reduction_input, parameters_particular, validate=validate, permissible=permissible
)
def load_schema(instrument_name):
r"""
Load the schema appropriate to an instrument.
Parameters
----------
instrument_name: str
One of the standard instrument names (BIOSANS, EQSANS, GPSANS)
Returns
-------
dict
"""
file_path = os.path.join(configdir, "schema", f"{instrument_name}.json")
return json.load(open(file_path, "r"))