# Source code for h5rdmtoolbox.layout.core

"""Layout validation module"""
import enum
import logging
import pathlib
import types
import uuid
import warnings
from typing import Dict, Union, List, Protocol, Optional, Callable, Tuple

import h5py

import h5rdmtoolbox as h5tbx

logger = logging.getLogger('h5rdmtoolbox')


class VALIDATION_FLAGS(enum.Enum):
    """Outcome markers of a single specification call.

    The members are ordinary ``enum.Enum`` entries. Their integer ``.value``
    is what this module adds together and tests with ``&``; every non-zero
    value occupies its own bit.
    """
    UNCALLED = 0
    SUCCESSFUL = 1 << 0
    FAILED = 1 << 1
    ALTERNATIVE_CALLED = 1 << 2  # an alternative spec succeeded
    INVALID_NUMBER = 1 << 3
    OPTIONAL = 1 << 4  # if a spec is optional, it is successful independent of the number of results


def _get_flag_explanations(flag):
    """Translate an integer validation flag into the names it is composed of.

    Parameters
    ----------
    flag: int
        Sum of one or more ``VALIDATION_FLAGS`` values.

    Returns
    -------
    str
        The matching member names joined by ``', '``, in declaration order
        (e.g. ``'SUCCESSFUL, OPTIONAL'`` for 17).
    """
    if flag == VALIDATION_FLAGS.UNCALLED.value:
        # UNCALLED has the value 0, which a bitwise AND can never detect.
        return VALIDATION_FLAGS.UNCALLED.name
    return ', '.join(member.name for member in VALIDATION_FLAGS if flag & member.value)


def _replace_callables_with_names(dict_with_callables: Dict) -> Dict:
    """Return a copy of a dictionary in which callables are shown as ``'name()'``.

    Used by ``__repr__`` of LayoutSpecification. Nested dictionaries are
    processed recursively; the input dictionary (and any nested one) is left
    untouched.

    Parameters
    ----------
    dict_with_callables: Dict
        Dictionary whose values may be callables or further dictionaries.

    Returns
    -------
    Dict
        Copy of the input with every callable value replaced by a string.
    """
    replaced = dict_with_callables.copy()
    for key, value in dict_with_callables.items():
        if callable(value):
            # not every callable has __name__ (e.g. functools.partial objects),
            # fall back to the name of its type
            name = getattr(value, '__name__', type(value).__name__)
            replaced[key] = f'{name}()'
        elif isinstance(value, dict):
            # the recursive call works on a copy, so its result must be stored
            replaced[key] = _replace_callables_with_names(value)
    return replaced


class QueryCallable(Protocol):
    """Protocol class for classes passed to the LayoutSpecification class.

    Those classes must implement the method find() and must be callable
    (``__call__`` is annotated to return a list).
    """

    # Query entry point: `target` may be a file name/path or a group object.
    def find(self, target: Union[str, pathlib.Path, h5tbx.Group], **kwargs): ...

    # Calling the object directly is part of the protocol as well.
    def __call__(self, *args, **kwargs) -> List: ...


def is_single_result(res) -> bool:
    """Tell whether a query result is one object rather than a collection.

    ``None`` and generator, tuple or list objects are not single results;
    everything else is.
    """
    collection_types = (types.GeneratorType, tuple, list)
    if res is None or isinstance(res, collection_types):
        return False
    return True


class SpecificationResult:
    """Record of one specification call on one target."""

    def __init__(self, target):
        self.target = target

        # a string target is its own name, any other target provides `.name`
        if isinstance(target, str):
            self.target_name = target
        else:
            self.target_name = target.name

        # everything that is not a h5py.Dataset is reported as 'Group'
        if isinstance(target, h5py.Dataset):
            self.target_type = 'Dataset'
        else:
            self.target_type = 'Group'

        self.validation_flag = VALIDATION_FLAGS.UNCALLED.value
        self.res = []

    @property
    def n_res(self) -> int:
        """Length of the stored result list"""
        found = self.res
        return len(found)


class LayoutSpecification:
    """Specification for a layout

    Parameters
    ----------
    func: QueryCallable
        Callable query according to protocol class QueryCallable to be called on the hdf5 file.
        The first argument of the function will be an opened h5py.File or h5py.Group or
        h5py.Dataset object.
    kwargs: Dict
        Keyword arguments passed to the func.
    n: Union[int, None, Dict]
        Number of matches or condition. Only applicable if query function (`func`) returns an
        iterable object. None means that the number can be zero as well, which makes the
        specification optional.
        Example: n=1 means that the specification is successful if the query function
        returns exactly one.
        If n is a dictionary, the key must be a comparison operator
        (e.g. '$eq', '$gt', '$lt', '$gte', '$lte'), e.g. {'$eq': 1} means that the query
        function must return exactly one result. {'$gt': 1} means that the query function
        must return more than one result.
    rebase: bool
        If True and the target is a `h5tbx.Dataset` or `h5tbx.Group`, the query is applied
        to `target.rootparent` instead of the target itself.
    description: Optional[str]
        Optional description explaining the specification
    parent: Optional[LayoutSpecification]
        Parent specification. If the specification is a conditional one, it has a parent.
        If this is the case, the function `func` is called on the parent's results.
        If the parent's specification fails, the specification is not applied.
        If `parent` is None, the specification has no parent and is applied to the hdf5
        root group.
    """

    def __init__(self,
                 func: QueryCallable,
                 kwargs: Dict,
                 n: Union[int, None, Dict],
                 rebase: bool = False,
                 description: Optional[str] = None,
                 parent: Optional["LayoutSpecification"] = None):
        self.func = func
        self.kwargs = kwargs
        self.rebase = rebase
        # `n` is split into the expected number and the comparison callable
        self.n, self.number_of_result_comparison = self._parse_n_def(n)
        self.id = uuid.uuid4()
        self.specifications: List[LayoutSpecification] = []  # conditional (child) specs
        self.alt_specifications: List[LayoutSpecification] = []  # tried if this spec finds nothing
        self.parent: Optional["LayoutSpecification"] = parent
        self.description: str = description or ''
        self.results: List[Optional[SpecificationResult]] = []  # one entry per call
        self._n_calls = 0
        self._n_fails = 0

    @property
    def validation_flag(self) -> int:
        """No longer available; flags are stored per call in `results`."""
        raise NotImplementedError('validation_flag is moved')

    def __eq__(self, other) -> bool:
        """A specification is equal to another if the ID is identical or the function, kwargs,
        description and parent are identical."""
        if not isinstance(other, LayoutSpecification):
            return False
        if isinstance(other, Layout):
            return False
        same_id = self.id == other.id
        if same_id:
            return True
        # parents are compared with `==`, i.e. the comparison walks up the parent chain
        same_parent = self.parent == other.parent
        same_comment = self.description == other.description
        same_kwargs = self.kwargs == other.kwargs
        same_func = self.func == other.func
        same_n = self.n == other.n
        return all([same_parent, same_comment, same_kwargs, same_func, same_n])

    @property
    def failed(self) -> bool:
        """Return True if the specification failed"""
        failed_bit = VALIDATION_FLAGS.FAILED.value
        return any(r.validation_flag & failed_bit == failed_bit for r in self.results)

    @property
    def n_calls(self) -> int:
        """Return number of calls"""
        return self._n_calls

    @property
    def n_fails(self) -> int:
        """Return number of failed calls"""
        return self._n_fails

    def reset(self) -> None:
        """Reset the specification and all its children"""
        self._n_calls = 0
        self._n_fails = 0
        self.results = []
        for spec in self.specifications:
            spec.reset()

    @property
    def called(self) -> bool:
        """Return True if the specification has been called at least once.
        This is determined by the number of calls."""
        return self.n_calls > 0

    @property
    def n_successes(self):
        """Return number of successful calls

        Raises
        ------
        ValueError
            If the specification has not been called yet.
        """
        if self.n_calls == 0:
            raise ValueError('Not called')
        return self.n_calls - self._n_fails

    @staticmethod
    def _parse_n_def(n: Union[int, None, Dict]) -> Tuple[Union[int, None], Callable]:
        """Parse the number of expected results

        Parameters
        ----------
        n: Union[int, None, Dict]
            None (optional spec), a positive integer (shortcut for ``{'$eq': n}``) or a
            dictionary with exactly one ``operator: integer`` item.

        Returns
        -------
        Tuple[Union[int, None], Callable]
            The expected number (None for optional specs) and the comparison callable,
            which is called as ``comparison(number_of_results, expected_number)``.

        Raises
        ------
        ValueError
            If an integer smaller than 1 is passed.
        TypeError
            If `n` is neither None, an integer nor a dictionary.
        KeyError
            If the operator is unknown.
        AssertionError
            If the dictionary does not have exactly one item or its value is not an integer.
        """
        if n is None:
            # optional specification: any number of results is accepted
            return None, lambda x, y: True

        if isinstance(n, int):
            if n < 1:
                raise ValueError('n must be greater than 0')
            n = {'$eq': n}
        if not isinstance(n, dict):
            raise TypeError(f'n must be an integer or dictionary, but got {type(n)}')

        from ..database.hdfdb import query

        # Raised explicitly (instead of using `assert`) so that the checks are not
        # stripped when python runs with -O. The exception type is unchanged.
        if len(n) != 1:
            raise AssertionError('n must be a dictionary with exactly one key')
        (operator_name, expected_number), = n.items()
        try:
            number_of_result_comparison = query.operator[operator_name]
        except KeyError:
            raise KeyError(f'Unexpected operator. Valid ones are: {list(query.operator.keys())}') from None
        if not isinstance(expected_number, int):
            raise AssertionError('n must be an integer')
        return expected_number, number_of_result_comparison

    def __repr__(self):
        _kwargs = _replace_callables_with_names(self.kwargs)
        if self.description:
            return f'{self.__class__.__name__}(description="{self.description}", kwargs={_kwargs})'
        return f'{self.__class__.__name__}(kwargs={_kwargs})'

    @staticmethod
    def _as_result_list(res) -> List:
        """Turn a raw query result (None, single object or iterable) into a list."""
        if res is None:
            return []
        if is_single_result(res):
            return [res]
        return list(res)

    def __call__(self, target: Union[h5py.Group, h5py.Dataset]):
        """Apply the specification to `target` and store the outcome in `self.results`.

        The query function is called on the target. If it finds nothing and the
        specification is not optional, the alternative specifications are tried. The
        child specifications are called on every truthy result. Finally, the number of
        results is checked against `n` (if given).

        Parameters
        ----------
        target: Union[h5py.Group, h5py.Dataset]
            Object to apply the query function to. Lazy objects
            (`h5tbx.wrapper.lazy.LHDFObject`) are opened first.
        """
        if isinstance(target, h5tbx.wrapper.lazy.LHDFObject):
            # open the lazy object and repeat the call on the opened object
            with target as _target:
                return self.__call__(_target)

        if self.rebase and isinstance(target, (h5tbx.Dataset, h5tbx.Group)):
            target = target.rootparent

        flag_successful = VALIDATION_FLAGS.SUCCESSFUL.value
        flag_failed = VALIDATION_FLAGS.FAILED.value
        flag_alternative = VALIDATION_FLAGS.ALTERNATIVE_CALLED.value
        flag_optional_success = flag_successful + VALIDATION_FLAGS.OPTIONAL.value

        self._n_calls += 1

        scr = SpecificationResult(target)

        if self.n is None:
            # per definition successful since n is None
            scr.validation_flag = flag_optional_success

        logger.debug(f'Calling spec {self} on hdf obj {target}.')
        res = self.func(target, **self.kwargs)

        if res is None:
            res = []
        elif is_single_result(res):
            res = [res]
            if self.n is None:
                # NOTE(review): this permanently turns an optional specification into
                # an "exactly one" specification (reset() does not undo it), so later
                # calls that find nothing fail. Kept as is; confirm that it is intended.
                self.n, self.number_of_result_comparison = self._parse_n_def(1)
        else:
            res = list(res)

        # assign result to scr
        scr.res = res

        if not res:
            # first assume failure unless n=None. There might be an alternative
            # spec registered though.
            if self.n is None:
                # query is optional!
                scr.validation_flag = flag_optional_success
            else:
                scr.validation_flag = flag_failed

            # If alternative specifications exist, try them. This will only be
            # performed if self.n is not None
            if self.alt_specifications and self.n is not None:
                alt_spec_successes = []
                res = []
                for alt_spec in self.alt_specifications:
                    # normalise like the primary result, so that an alternative query
                    # returning None or a single object does not crash
                    alt_res = self._as_result_list(alt_spec.func(target, **alt_spec.kwargs))
                    alt_spec_successes.append(bool(alt_res))
                    res.extend(alt_res)
                    scr.res.extend(alt_res)

                if any(alt_spec_successes):
                    logger.debug('An alternative succeeded!')
                    if scr.validation_flag & flag_failed:
                        scr.validation_flag -= flag_failed
                    scr.validation_flag += flag_alternative
                else:
                    self._n_fails += 1
                    logger.error(f'Applying spec. "{self}" on "{target}" failed.')
                    scr.validation_flag = flag_failed + flag_alternative

        # `res` is always a list at this point, so no single-result handling is needed.
        # For truthy results, apply the sub-specifications.
        n_res = len(res)
        for r in res:
            if not r:
                logger.error(f'Applying spec. "{self}" on "{target}" failed.')
                self._n_fails += 1
            else:
                for sub_spec in self.specifications:
                    logger.debug(f'Calling spec {sub_spec} to hdf obj {r}.')
                    sub_spec(r)

        # If the number of successful results is not specified, it means, that
        # the spec is optional. so it is successful in any case!
        if self.n is not None:
            if self.number_of_result_comparison(n_res, self.n):
                scr.validation_flag = flag_successful
            else:
                self._n_fails += 1
                scr.validation_flag = flag_failed + VALIDATION_FLAGS.INVALID_NUMBER.value
                logger.error(f'Applying spec. "{self}" failed due to not '
                             f'matching the number of results: {self.n} != {n_res}')
        self.results.append(scr)

    def add(self,
            func: QueryCallable,
            *,
            n: Optional[Union[int, None, Dict]] = None,
            rebase: bool = False,
            description: Optional[str] = None,
            **kwargs):
        """
        Add a specification by providing a callable query obj. Optionally, the number
        of exact matches can be provided as well as a description string.
        The kwargs are passed to the callable

        Parameters
        ----------
        func: Callable
            Function to be called on the hdf5 file
        n: int
            Number of matches or query dictionary. None indicates optional specification.
        rebase: bool
            Passed on to the new specification, see class documentation.
        description: Optional[str]
            Optional description explaining the specification
        kwargs: Dict
            Keyword arguments passed to the func

        Returns
        -------
        LayoutSpecification
            The new specification or, if an equal one is already registered, the
            existing one (a UserWarning is issued in this case).

        Examples
        --------
        >>> from h5rdmtoolbox.database import hdfdb
        >>> lay = Layout()
        >>> spec1 = lay.add(hdfdb.FileDB.find, flt={'$name': '/u'}, n=1)
        >>> spec2 = lay.add(...)  # add another spec to layout
        >>> spec_sub1 = spec1.add(...)  # add spec to `spec1` if it succeeds and apply to all results of `spec1`
        """
        new_spec = LayoutSpecification(func=func,
                                       kwargs=kwargs,
                                       n=n,
                                       rebase=rebase,
                                       description=description,
                                       parent=self)
        for spec in self.specifications:
            if spec == new_spec:
                warnings.warn(f'Specification "{new_spec}" already exists. Skipping.', UserWarning)
                return spec
        self.specifications.append(new_spec)
        return new_spec

    def add_alternative(self,
                        func: QueryCallable,
                        *,
                        n: Union[int, None, Dict],
                        description: Optional[str] = None,
                        **kwargs):
        """Add an alternative specification by providing a callable query obj.
        Optionally, the number of exact matches can be provided as well as a
        description string. The kwargs are passed to the callable

        Either the parent or the alternative specification must be successful.

        .. note::

            An alternative query can only added to specifications which have a
            number of expected results (n!=None).

        Parameters
        ----------
        func: Callable
            Function to be called on the hdf5 file
        n: Union[int, None, Dict]
            Number of matches or query dictionary. None indicates optional specification.
        description: Optional[str]
            Optional description explaining the specification
        kwargs: Dict
            Keyword arguments passed to the func

        Returns
        -------
        LayoutSpecification
            The new alternative specification or, if an equal one is already registered,
            the existing one (a UserWarning is issued in this case).

        Raises
        ------
        ValueError
            If this specification is optional (n is None).
        """
        if self.n is None:
            raise ValueError('Parent specification must not be an optional specification. Please provide a number of '
                             'expected results.')
        new_spec = LayoutSpecification(func=func,
                                       kwargs=kwargs,
                                       n=n,
                                       description=description,
                                       parent=self)
        for spec in self.alt_specifications:
            if spec == new_spec:
                warnings.warn(f'Specification "{new_spec}" already exists. Skipping.', UserWarning)
                return spec
        self.alt_specifications.append(new_spec)
        return new_spec

    def is_valid(self):
        """Return True if the specification is valid

        A specification that has not been called is invalid. An optional one
        (n is None) is valid once called. Otherwise it is valid if neither the
        specification itself nor one of its child specifications failed.
        """
        if self.n_calls == 0:
            print(f'{self} has not been called yet')
            return False
        if self.n is None:
            return True
        if self.failed:
            print(f'{self} failed')
            return False
        return all(spec.is_valid() for spec in self.specifications)

    def get_valid(self) -> List['LayoutSpecification']:
        """Return all successful specifications"""
        if self.n_calls == 0:
            return []  # has not been called yet, thus cannot be valid
        valid: List[LayoutSpecification] = []
        if not self.failed:
            valid.append(self)
        for spec in self.specifications:
            valid.extend(spec.get_valid())
        return valid

    def get_failed(self) -> List['LayoutSpecification']:
        """Return a list of failed specifications

        If the specification itself failed, only the specification is returned.
        Otherwise, the failed child specifications are collected recursively.
        """
        if self.failed:
            return [self]
        return [failed_spec for spec in self.specifications for failed_spec in spec.get_failed()]

    def get_summary(self, exclude_keys: Optional[Union[str, List[str]]] = None) -> List[Dict]:
        """return a summary as a list of dictionaries

        One dictionary is created per call of this specification, followed by the
        summaries of the child specifications.

        Parameters
        ----------
        exclude_keys: Optional[Union[str, List[str]]]
            Key or list of keys to remove from every dictionary. Keys that do not
            exist are ignored.
        """
        if exclude_keys is None:
            exclude_keys = []
        elif isinstance(exclude_keys, str):
            exclude_keys = [exclude_keys, ]

        data = []
        for res in self.results:
            entry = {'id': self.id,
                     'called': len(self.results) > 0,
                     'flag': res.validation_flag,
                     'flag description': _get_flag_explanations(res.validation_flag),
                     'description': self.description,
                     'target_type': res.target_type,
                     'target_name': res.target_name,
                     'func': f'{self.func.__module__}.{self.func.__name__}',
                     'kwargs': self.kwargs,
                     }
            for key in exclude_keys:
                entry.pop(key, None)
            data.append(entry)

        for spec in self.specifications:
            data.extend(spec.get_summary(exclude_keys))
        return data
class LayoutResult:
    """Container for the result of a layout validation.

    It holds the (already called) specifications of a layout and offers
    helpers to query and print their outcome."""

    def __init__(self, specifications: List[LayoutSpecification]):
        self.specifications: List[LayoutSpecification] = specifications

    def get_failed(self) -> List[LayoutSpecification]:
        """Return a list of failed specifications"""
        return [failed_spec for spec in self.specifications for failed_spec in spec.get_failed()]

    def get_valid(self) -> List[LayoutSpecification]:
        """Return a list of valid specifications"""
        return [valid_spec for spec in self.specifications for valid_spec in spec.get_valid()]

    def is_valid(self) -> bool:
        """Return True if the layout is valid, which is the case if no specs failed"""
        return len(self.get_failed()) == 0

    def get_summary(self,
                    exclude_keys: Optional[Union[str, List[str]]] = None,
                    failed_only: bool = False) -> List[Dict]:
        """return a list of dictionaries containing information about a specification call

        Parameters
        ----------
        exclude_keys: Optional[Union[str, List[str]]]
            Key or list of keys to remove from every dictionary. Keys that do not
            exist are ignored.
        failed_only: bool
            If True, only entries whose flag contains the FAILED bit are returned.
        """
        if exclude_keys is None:
            excluded = []
        elif isinstance(exclude_keys, str):
            excluded = [exclude_keys, ]
        else:
            excluded = list(exclude_keys)

        failed_bit = VALIDATION_FLAGS.FAILED.value
        data = []
        for spec in self.specifications:
            # Request the full summary and drop the keys afterwards: the filter
            # below needs 'flag' even if the caller wants it excluded.
            for entry in spec.get_summary():
                if failed_only and not entry['flag'] & failed_bit:
                    continue
                for key in excluded:
                    entry.pop(key, None)
                data.append(entry)
        return data

    def print_summary(self,
                      exclude_keys: Optional[Union[str, List[str]]] = None,
                      failed_only: bool = False):
        """Prints a summary of the specification. Requires the tabulate package.

        Raises
        ------
        ImportError
            If the package tabulate is not installed.
        """
        try:
            from tabulate import tabulate
        except ImportError as err:
            raise ImportError('Please install tabulate to use this method') from err
        print('\nSummary of layout validation')
        print(tabulate(self.get_summary(exclude_keys, failed_only), headers='keys', tablefmt='psql'))
        if self.is_valid():
            print('--> Layout is valid')
        else:
            print('--> Layout validation found issues!')
class Layout(LayoutSpecification):
    """A layout is a collection of specifications that can be applied to an HDF5 file or group.

    The class is inherited from LayoutSpecification. Some methods are overwritten.

    Examples
    --------
    >>> from h5rdmtoolbox import layout
    >>> from h5rdmtoolbox.database import hdfdb
    >>> lay = layout.Layout()
    >>> spec_all_dataset = lay.add(
    ...     hdfdb.FileDB.find,  # query function
    ...     flt={},
    ...     objfilter='dataset'
    ... )
    >>>
    >>> # all datasets must be compressed with gzip (conditional spec. only called if parent spec is successful)
    >>> spec_compression = spec_all_dataset.add(
    ...     hdfdb.FileDB.find_one,  # query function
    ...     flt={'$compression': 'gzip'}  # query parameter
    ... )
    >>>
    >>> # the file must have the dataset "/u"
    >>> spec_ds_u = lay.add(
    ...     hdfdb.FileDB.find,  # query function
    ...     flt={'$name': '/u'},
    ...     objfilter='dataset'
    ... )
    >>> lay.validate('path/to/file.h5')
    """

    def __init__(self, description: str = ''):
        # A layout has no query of its own. Initialising the base class anyway
        # provides all inherited state (results, counters, alt_specifications, ...),
        # so that inherited members work before `validate()` has been called.
        super().__init__(func=None, kwargs={}, n=None, description=description)
        self.description = description  # description of the layout class
        self.specifications: List[LayoutSpecification] = []

    def __repr__(self):
        return f'{self.__class__.__name__} (description="{self.description}")'

    @staticmethod
    def _same_specification(spec: LayoutSpecification, other: LayoutSpecification) -> bool:
        """Compare two top-level specifications without following their parent link."""
        if spec is other:
            return True
        return (spec.description == other.description
                and spec.kwargs == other.kwargs
                and spec.func == other.func
                and spec.n == other.n)

    def __eq__(self, other):
        """Two layouts are equal if their specifications are pairwise equal.

        The specifications are compared by function, kwargs, description and n only.
        Their parent is the layout itself, so comparing it as well would call this
        method again and never terminate for two distinct layouts.
        """
        if not isinstance(other, Layout):
            return False
        if self is other:
            return True
        if len(self.specifications) != len(other.specifications):
            return False
        return all(self._same_specification(mine, theirs)
                   for mine, theirs in zip(self.specifications, other.specifications))

    def validate(self, filename_or_root_group: Union[str, pathlib.Path, h5py.Group]) -> LayoutResult:
        """Validate the layout by passing a filename or an opened root group

        The argument is handed to the specifications unchanged.

        Returns
        -------
        LayoutResult
            Container holding the called specifications of this layout.
        """
        # resets the layout and, recursively, all registered specifications
        # (n_calls = 0, _n_fails = 0, results = [])
        self.reset()

        for spec in self.specifications:
            spec(filename_or_root_group)

        return LayoutResult(self.specifications)

    def __call__(self, *args, **kwargs):
        """Not supported for layouts, always raises a RuntimeError."""
        raise RuntimeError('Layout cannot be called. Use `.validate()` to validate')

    def is_valid(self) -> bool:
        """Return True if all specifications are valid. A specification is valid if it
        has been called and has not failed"""
        return all(spec.is_valid() for spec in self.specifications)

    def get_failed(self) -> List[LayoutSpecification]:
        """Return a list of failed specifications"""
        return [failed_spec for spec in self.specifications for failed_spec in spec.get_failed()]