"""Layout validation module"""
import enum
import logging
import pathlib
import types
import uuid
import warnings
from typing import Dict, Union, List, Protocol, Optional, Callable, Tuple
import h5py
import h5rdmtoolbox as h5tbx
logger = logging.getLogger('h5rdmtoolbox')
class VALIDATION_FLAGS(enum.Enum):
"""Validation flags used in layout validation"""
UNCALLED = 0
SUCCESSFUL = 1
FAILED = 2
ALTERNATIVE_CALLED = 4 # an alternative spec succeeded
INVALID_NUMBER = 8
OPTIONAL = 16 # if a spec is optional, it is successful independent of the number of results
def _get_flag_explanations(flag):
explanations = []
for i in VALIDATION_FLAGS:
if flag & i.value:
explanations.append(i.name)
return ', '.join(explanations)
def _replace_callables_with_names(dict_with_callables: Dict) -> Dict:
"""Replace all callables in a dictionary with their names plus '()'
Used by __repr__ of LayoutSpecification"""
dict_with_callables = dict_with_callables.copy()
for key in dict_with_callables.keys():
if callable(dict_with_callables[key]):
dict_with_callables[key] = f'{dict_with_callables[key].__name__}()'
elif isinstance(dict_with_callables[key], dict):
_replace_callables_with_names(dict_with_callables[key])
return dict_with_callables
class QueryCallable(Protocol):
"""Protocol class for classes passed to the LayoutSpecification class.
Those classes must implement the method find()
"""
def find(self, target: Union[str, pathlib.Path, h5tbx.Group], **kwargs): ...
def __call__(self, *args, **kwargs) -> List: ...
def is_single_result(res) -> bool:
"""Return True if the result is a single result. This is the case if the result is not an iterable object."""
if res is None:
return False
return not isinstance(res, (types.GeneratorType, tuple, list))
class SpecificationResult:
"""Stores the result of a specification call"""
def __init__(self, target):
self.target = target
self.target_name = target if isinstance(target, str) else target.name
self.target_type = 'Dataset' if isinstance(target, h5py.Dataset) else 'Group'
self.validation_flag = VALIDATION_FLAGS.UNCALLED.value
self.res = []
@property
def n_res(self) -> int:
"""Number of found objects (results)"""
return len(self.res)
[docs]class LayoutSpecification:
"""Specification for a layout
Parameters
----------
func: QueryCallable
Callable query according to protocol class QueryCallable to be called on the hdf5 file.
The first argument of the function will be an opened h5py.File or h5py.Group or h5py.Dataset object.
kwargs: Dict
Keyword arguments passed to the func.
n: Union[int, None, Dict]
Number of matches or condition. Only applicable if query function (`func`) returns an iterable
object. None means that the number can be zero as well, which makes the specification optional.
Example: n=1 means that the specification is successful if the query function returns exactly one.
If n is a dictionary, the key must be a comparison operator (e.g. '$eq', '$gt', '$lt', '$gte', '$lte'),
e.g. {'$eq': 1} means that the query function must return exactly one result.
{'$gt': 1} means that the query function must return more than one result.
description: Optional[str]
Optional description explaining the specification
parent: Optional[LayoutSpecification]
Parent specification. If the specification is a conditional one, it has a parent.
If this is the case, the function `func` is called on the parent's results.
If the parent's specification fails, the specification is not applied.
If `parent` is None, the specification has no parent and is applied to the
hdf5 root group.
"""
[docs] def __init__(self,
func: QueryCallable,
kwargs: Dict,
n: Union[int, None, Dict],
rebase: bool = False,
description: Optional[str] = None,
parent: Optional["LayoutSpecification"] = None):
self.func = func
self.kwargs = kwargs
self.rebase = rebase
self.n, self.number_of_result_comparison = self._parse_n_def(n)
self.id = uuid.uuid4()
self.specifications: List[LayoutSpecification] = []
self.alt_specifications: List[LayoutSpecification] = []
self.parent: Optional["LayoutSpecification"] = parent
self.description: str = description or ''
self.results: List[Optional[SpecificationResult]] = []
self._n_calls = 0
self._n_fails = 0
@property
def validation_flag(self) -> int:
raise NotImplementedError('validation_flag is moved')
def __eq__(self, other) -> bool:
"""A specification is equal to another if the ID is identical or the
function, kwargs, description and parent are identical."""
if not isinstance(other, LayoutSpecification):
return False
if isinstance(other, Layout):
return False
same_id = self.id == other.id
if same_id:
return True
same_parent = self.parent == other.parent
same_comment = self.description == other.description
same_kwargs = self.kwargs == other.kwargs
same_func = self.func == other.func
same_n = self.n == other.n
return all([same_parent, same_comment, same_kwargs, same_func, same_n])
@property
def failed(self) -> bool:
"""Return True if the specification failed"""
return any(
r.validation_flag & VALIDATION_FLAGS.FAILED.value == VALIDATION_FLAGS.FAILED.value for r in self.results)
@property
def n_calls(self) -> int:
"""Return number of calls"""
return self._n_calls
@property
def n_fails(self) -> int:
"""Return number of failed calls"""
return self._n_fails
def reset(self) -> None:
"""Reset the specification and all its children"""
self._n_calls = 0
self._n_fails = 0
self.results = []
for spec in self.specifications:
spec.reset()
@property
def called(self) -> bool:
"""Return True if the specification has been called at least once.
This is determined by the number of calls."""
return self.n_calls > 0
@property
def n_successes(self):
"""Return number of successful calls"""
if self.n_calls == 0:
raise ValueError('Not called')
return self.n_calls - self._n_fails
@staticmethod
def _parse_n_def(n: int) -> Tuple[Union[int, None], Callable]:
"""Parse the number of expected results"""
if n is None:
return None, lambda x, y: True
# number_of_result_comparison = lambda x, y: True
# n = None
else:
if isinstance(n, int):
if n < 1:
raise ValueError('n must be greater than 0')
n = {'$eq': n}
if not isinstance(n, dict):
raise TypeError(f'n must be an integer or dictionary, but got {type(n)}')
from ..database.hdfdb import query
assert len(n) == 1, 'n must be a dictionary with exactly one key'
for k, v in n.items():
try:
number_of_result_comparison = query.operator[k]
except KeyError:
raise KeyError(f'Unexpected operator. Valid ones are: {list(query.operator.keys())}')
assert isinstance(v, int), 'n must be an integer'
n = v
return n, number_of_result_comparison
def __repr__(self):
_kwargs = _replace_callables_with_names(self.kwargs)
if self.description:
return f'{self.__class__.__name__}(description="{self.description}", kwargs={_kwargs})'
return f'{self.__class__.__name__}(kwargs={_kwargs})'
def __call__(self, target: Union[h5py.Group, h5py.Dataset]):
if isinstance(target, h5tbx.wrapper.lazy.LHDFObject):
with target as _target:
return self.__call__(_target)
if self.rebase and isinstance(target, (h5tbx.Dataset, h5tbx.Group)):
target = target.rootparent
self._n_calls += 1
scr = SpecificationResult(target)
if self.n is None:
# per definition successful since n is None
scr.validation_flag = VALIDATION_FLAGS.SUCCESSFUL.value + VALIDATION_FLAGS.OPTIONAL.value
logger.debug(f'Calling spec {self} on hdf obj {target}.')
res = self.func(target, **self.kwargs)
if res is None:
res = []
elif is_single_result(res):
res = [res]
if self.n is None:
self.n, self.number_of_result_comparison = self._parse_n_def(1)
if not is_single_result(res) and res is not None:
res = list(res)
# assign result to scr
scr.res = res
if not res:
# first assume failure unless n=None. There might be an alternative spec registered though.
# The following will update `res`
if self.n is None: # query is optional!
scr.validation_flag = VALIDATION_FLAGS.SUCCESSFUL.value + VALIDATION_FLAGS.OPTIONAL.value
else:
scr.validation_flag = VALIDATION_FLAGS.FAILED.value
# If alternative specifications exist, try them. This will only be performed if self.n is not None
if len(self.alt_specifications) > 0 and self.n is not None:
alt_spec_successes = []
res = []
for alt_spec in self.alt_specifications:
alt_res = alt_spec.func(target, **alt_spec.kwargs)
alt_sr = SpecificationResult(target)
alt_sr.res = alt_res
if not is_single_result(alt_res):
alt_res = list(alt_res)
if alt_res:
alt_sr.validation_flag = VALIDATION_FLAGS.SUCCESSFUL.value
alt_spec_successes.append(True)
else:
alt_sr.validation_flag = VALIDATION_FLAGS.FAILED.value
alt_spec_successes.append(False)
res.extend(alt_res)
scr.res.extend(alt_res)
# failed = not any(alt_spec_successes)
if any(alt_spec_successes):
logger.debug('An alternative succeeded!')
if scr.validation_flag & VALIDATION_FLAGS.FAILED.value:
scr.validation_flag -= VALIDATION_FLAGS.FAILED.value
scr.validation_flag += VALIDATION_FLAGS.ALTERNATIVE_CALLED.value
else:
self._n_fails += 1
logger.error(f'Applying spec. "{self}" on "{target}" failed.')
scr.validation_flag = VALIDATION_FLAGS.FAILED.value + VALIDATION_FLAGS.ALTERNATIVE_CALLED.value
# now, for successful results, let's apply the sub-specifications if exist
# and check how many results we have and if the number of results is correct (if n is specified)
if is_single_result(res):
# as there is a result, the specification is successful as n=1 is implicit
scr.validation_flag = VALIDATION_FLAGS.SUCCESSFUL.value
# check sub-specifications
for sub_spec in self.specifications:
sub_spec(res)
self.results.append(scr)
return
n_res = len(res)
if res: # if no alternative was defined, res may still be None!
# continue here if res is an iterable object
for r in res:
if not r: # first result failed
logger.error(f'Applying spec. "{self}" on "{target}" failed.')
self._n_fails += 1
else:
# if it was successful, we can check the sub-specifications
for sub_spec in self.specifications:
logger.debug(f'Calling spec {sub_spec} to hdf obj {r}.')
sub_spec(r)
# logger.debug(f'Validation {self} found {n_res} results from which {self._n_fails} failed.')
# If the number of successful results is not specified, it means, that
# the spec is optional. so it is successful in any case!
if self.n is not None:
if self.number_of_result_comparison(n_res, self.n):
scr.validation_flag = VALIDATION_FLAGS.SUCCESSFUL.value
else: # if self.n != n_res:
self._n_fails += 1
scr.validation_flag = VALIDATION_FLAGS.FAILED.value + VALIDATION_FLAGS.INVALID_NUMBER.value
logger.error(f'Applying spec. "{self}" failed due to not '
f'matching the number of results: {self.n} != {n_res}')
self.results.append(scr)
def add(self,
func: QueryCallable,
*,
n: Optional[Union[int, None, Dict]] = None,
rebase: bool = False,
description: Optional[str] = None,
**kwargs):
"""
Add a specification by providing a callable query obj. Optionally, the
number of exact matches can be provided as well as the a description string.
The kwargs are passed to the callable
Parameters
----------
func: Callable
Function to be called on the hdf5 file
n: int
Number of matches or query dictionary. None indicates optional specification.
description: Optional[str]
Optional description explaining the specification
kwargs: Dict
Keyword arguments passed to the func
Returns
-------
LayoutSpecification
Examples
--------
>>> from h5rdmtoolbox.database import FileDB
>>> lay = LayoutSpecification()
>>> spec1 = lay.__set_meta_field__(hdfdb.FileDB.find, flt={'$name': '/u'}, n=1)
>>> spec2 = lay.__set_meta_field__(...) # add another spec to layout
>>> spec_sub1 = spec1.__set_meta_field__(...) # add spec to `spec1` if it succeeds and apply to all results of `spec1`
"""
new_spec = LayoutSpecification(func=func,
kwargs=kwargs,
n=n,
rebase=rebase,
description=description,
parent=self)
for spec in self.specifications:
if spec == new_spec:
warnings.warn(f'Specification "{new_spec}" already exists. Skipping.',
UserWarning)
return spec
self.specifications.append(new_spec)
return new_spec
def add_alternative(self,
func: QueryCallable,
*,
n: Union[int, None, Dict],
description: Optional[str] = None,
**kwargs):
"""Add an alternative specification by providing a callable query obj. Optionally, the
number of exact matches can be provided as well as the a description string.
The kwargs are passed to the callable
Either the parent or the alternative specification must be successful.
.. note::
An alternative query can only added to specifications which have a number of expected results (n!=None).
Parameters
----------
func: Callable
Function to be called on the hdf5 file
n: Union[int, None, Dict]
Number of matches or query dictionary. None indicates optional specification.
description: Optional[str]
Optional description explaining the specification
kwargs: Dict
Keyword arguments passed to the func
"""
if self.n is None:
raise ValueError('Parent specification must not be an optional specification. Please provide a number of '
'expected results.')
new_spec = LayoutSpecification(func=func,
kwargs=kwargs,
n=n,
description=description,
parent=self)
for spec in self.alt_specifications:
if spec == new_spec:
warnings.warn(f'Specification "{new_spec}" already exists. Skipping.',
UserWarning)
return spec
self.alt_specifications.append(new_spec)
return new_spec
def is_valid(self):
"""Return True if the specification is valid"""
if self.n_calls == 0:
print(f'{self} has not been called yet')
return False
if self.n is None:
return True
if self.failed:
print(f'{self} failed')
return False
return all(spec.is_valid() for spec in self.specifications)
def get_valid(self) -> List['LayoutSpecification']:
"""Return all successful specifications"""
if self.n_calls == 0:
return [] # has not been called yet, thus cannot be valid
valid: List[LayoutSpecification] = []
if not self.failed and self.n_calls > 0 and not self.failed:
valid.append(self)
if self.specifications:
for spec in self.specifications:
valid.extend(spec.get_valid())
return valid
def get_failed(self) -> List['LayoutSpecification']:
"""Return a list of failed specifications"""
if self.failed is True:
return [self]
failed = []
if self.failed:
failed.append(self)
if self.specifications:
failed.extend(spec.get_failed() for spec in self.specifications)
# flatten list:
return [item for sublist in failed for item in sublist]
return failed
def get_summary(self, exclude_keys: Optional[Union[str, List[str]]] = None) -> List[Dict]:
"""return a summary as dictionary"""
if isinstance(exclude_keys, str):
exclude_keys = [exclude_keys, ]
data = []
for res in self.results:
data.append({'id': self.id,
'called': len(self.results) > 0,
# 'n_calls/n_res/n_fails': f'{self.n_calls}/{n_res}/{self.n_fails}' if self.called else '-(-)',
'flag': res.validation_flag,
'flag description': _get_flag_explanations(res.validation_flag),
'description': self.description,
'target_type': res.target_type,
'target_name': res.target_name,
# '_n_fails': self._n_fails,
'func': f'{self.func.__module__}.{self.func.__name__}',
'kwargs': self.kwargs,
})
if exclude_keys is None:
exclude_keys = []
elif isinstance(exclude_keys, str):
exclude_keys = [exclude_keys, ]
for key in exclude_keys:
# pop keys from dict:
for i, d in enumerate(data):
data[i].pop(key)
for spec in self.specifications:
data.extend(spec.get_summary(exclude_keys))
return data
class LayoutResult:
"""Container for the result of a layout validation. It only contains a list of failed specs."""
def __init__(self, specifications: List[LayoutSpecification]):
self.specifications: List[LayoutSpecification] = specifications
def get_failed(self) -> List[LayoutSpecification]:
"""Return a list of failed specifications"""
failed = [spec.get_failed() for spec in self.specifications]
# flatten list:
return [item for sublist in failed for item in sublist]
def get_valid(self) -> List[LayoutSpecification]:
"""Return a list of valid specifications"""
valid_specs = [spec.get_valid() for spec in self.specifications]
# flatten list:
return [item for sublist in valid_specs for item in sublist]
def is_valid(self) -> bool:
"""Return True if the layout is valid, which is the case if no specs failed"""
return len(self.get_failed()) == 0
def get_summary(self, exclude_keys: Optional[Union[str, List[str]]] = None,
failed_only: bool = False) -> List[Dict]:
"""return a list of dictionaries containing information about a specification call"""
data = []
for spec in self.specifications:
s = spec.get_summary(exclude_keys=exclude_keys)
if failed_only:
data.extend([d for d in s if d['flag'] & 2 == 2])
else:
data.extend(s)
return data
def print_summary(self, exclude_keys: Optional[Union[str, List[str]]] = None,
failed_only: bool = False):
"""Prints a summary of the specification. Requires the tabulate package."""
try:
from tabulate import tabulate
except ImportError:
raise ImportError('Please install tabulate to use this method')
print('\nSummary of layout validation')
print(tabulate(self.get_summary(exclude_keys, failed_only), headers='keys', tablefmt='psql'))
if self.is_valid():
print('--> Layout is valid')
else:
print('--> Layout validation found issues!')
[docs]class Layout(LayoutSpecification):
"""A layout is a collection of specifications that can be applied to an HDF5 file or group.
The class is inherited from LayoutSpecification. Some methods are overwritten.
Examples
--------
>>> from h5rdmtoolbox import layout
>>> lay = layout.Layout()
>>> spec_all_dataset = lay.__set_meta_field__(
>>> hdfdb.FileDB.find, # query function
>>> flt={},
>>> objfilter='dataset'
>>> )
>>>
>>> # all datasets must be compressed with gzip (conditional spec. only called if parent spec is successful)
>>> spec_compression = spec_all_dataset.__set_meta_field__(
>>> hdfdb.FileDB.find_one, # query function
>>> flt={'$compression': 'gzip'} # query parameter
>>> )
>>>
>>> # the file must have the dataset "/u"
>>> spec_ds_u = lay.__set_meta_field__(
>>> hdfdb.FileDB.find, # query function
>>> flt={'$name': '/u'},
>>> objfilter='dataset'
>>> )
>>> lay.validate('path/to/file.h5')
"""
[docs] def __init__(self, description: str = ''):
self.description = description # description of the layout class
self.specifications: List[LayoutSpecification] = []
def __repr__(self):
return f'{self.__class__.__name__} (description="{self.description}")'
def __eq__(self, other):
if not isinstance(other, Layout):
return False
return self.specifications == other.specifications
def validate(self, filename_or_root_group: Union[str, pathlib.Path, h5py.Group]) -> LayoutResult:
"""Validate the layout by passing a filename or an opened root group"""
self.reset()
# if isinstance(filename_or_root_group, (str, pathlib.Path)):
# with h5tbx.File(filename_or_root_group, mode='r') as h5:
# return self.validate(h5)
# if isinstance(filename_or_root_group, h5py.Group):
# if not filename_or_root_group.name == '/':
# raise ValueError('If passing an HDF5 group, a root group must be passed')
# first reset all specs (n_calls = 0, _n_fails = 0, failed = None)
for spec in self.specifications:
spec.reset()
for spec in self.specifications:
spec(filename_or_root_group)
return LayoutResult(self.specifications)
def __call__(self, *args, **kwargs):
raise RuntimeError('Layout cannot be called. User `.validate()` to validate')
def is_valid(self) -> bool:
"""Return True if all specifications are valid.
A specification is valid it is has been called and has not been failed"""
return all(spec.is_valid() for spec in self.specifications)
def get_failed(self) -> List[LayoutSpecification]:
"""Return a list of failed specifications"""
failed = [spec.get_failed() for spec in self.specifications]
# flatten list:
return [item for sublist in failed for item in sublist]