Source code for h5rdmtoolbox

"""h5rdmtoolbox repository"""

import logging
import pathlib
from logging.handlers import RotatingFileHandler
from typing import Optional, Dict

import rdflib

from ._version import __version__
from .user import USER_LOG_DIR, USER_DATA_DIR

# Threshold applied to the console (stream) handler below.
DEFAULT_LOGGING_LEVEL = logging.WARNING
# Record layout shared by both handlers: timestamp, level, source file/line, message.
_formatter = logging.Formatter(
    '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
    datefmt='%Y-%m-%d_%H:%M:%S')

# Console handler: emits records at DEFAULT_LOGGING_LEVEL and above.
_stream_handler = logging.StreamHandler()
_stream_handler.setLevel(DEFAULT_LOGGING_LEVEL)
_stream_handler.setFormatter(_formatter)

# File handler writing to 'h5rdmtoolbox.log' inside USER_LOG_DIR.
# delay=True defers opening the file until the first record is emitted.
# NOTE(review): no maxBytes/backupCount is passed, so with the stdlib defaults
# the file is never rolled over - confirm this is intended.
_file_handler = RotatingFileHandler(USER_LOG_DIR / 'h5rdmtoolbox.log', delay=True)
# The handler itself accepts everything from DEBUG upwards.
# NOTE(review): the logger's own level is not set here, so which records reach
# this handler depends on the effective level of the logger - see set_loglevel().
_file_handler.setLevel(logging.DEBUG)  # log everything to file!
_file_handler.setFormatter(_formatter)

# Package logger with both handlers attached.
logger = logging.getLogger(__package__)
logger.addHandler(_stream_handler)
logger.addHandler(_file_handler)

import atexit
# noinspection PyUnresolvedReferences
import pint_xarray
import shutil
import xarray as xr
from typing import Union, Callable
from pydantic import HttpUrl

from h5rdmtoolbox._cfg import set_config, get_config, get_ureg

pint_xarray.unit_registry = get_ureg()

from . import convention
from .convention.core import Convention
from . import wrapper
from .user import UserDir
from ._version import __version__
from . import utils
from .wrapper.core import lower, Lower, File, Group, Dataset
from . import errors
from .wrapper import jsonld
from .database.lazy import lazy
from .wrapper.h5attr import Attribute
import json
from .wrapper.accessor import register_accessor

from .ld.shacl import validate_hdf, ValidationResult
from .ld._types import RDFMappingEntry
# noinspection PyUnresolvedReferences
from .utils import DownloadFileManager

# Package name.
name = 'h5rdmtoolbox'
# Directory that contains this module file.
__this_dir__ = pathlib.Path(__file__).parent
__author__ = 'Matthias Probst'
# ORCID URL of the author; also used as the contact of the 'h5py' convention below.
__author_orcid__ = 'https://orcid.org/0000-0001-8729-0482'


def get_package_meta() -> Dict:
    """Read ``codemeta.json`` and return its content as a dict.

    The file is looked up one directory above the directory of this module.

    Returns
    -------
    Dict
        The parsed JSON content of ``codemeta.json``.

    Raises
    ------
    FileNotFoundError
        If ``codemeta.json`` does not exist at the expected location.
    json.JSONDecodeError
        If the file content is not valid JSON.
    """
    # JSON is UTF-8 encoded; do not rely on the locale-dependent default of open().
    with open(__this_dir__ / '../codemeta.json', 'r', encoding='utf-8') as f:
        return json.load(f)


# Create a convention named 'h5py' (contact: the author's ORCID) and register it.
cv_h5py = convention.Convention(name='h5py',
                                contact=__author_orcid__)
cv_h5py.register()

# Expose convention.core.use at package level and call it once with None at import time.
# NOTE(review): presumably use(None) selects the default convention - confirm in convention.core.
use = convention.core.use
use(None)


def dump(src: Union[str, File, pathlib.Path],
         *args, **kwargs) -> None:
    """Call h5.dump() on the provided HDF5 file

    The file is always (re-)opened via ``File(<filename>)``; a passed ``File``
    object is only used to look up its ``hdf_filename``.

    Parameters
    ----------
    src : str, File, pathlib.Path
        the HDF5 file or filename to dump. An object which has a hdf_filename attribute can also be provided.
    args
        positional arguments passed to h5.dump().
    kwargs
        kwargs passed to h5.dump(). See h5.dump() for more information.

    Returns
    -------
    Whatever ``h5.dump()`` returns.
    """
    if isinstance(src, File):
        filename = src.hdf_filename
    elif isinstance(src, (str, pathlib.Path)) or not hasattr(src, 'hdf_filename'):
        # plain filename, or an unknown object that is handed to File() unchanged
        filename = src
    else:
        filename = src.hdf_filename

    with File(filename) as h5:
        # Forward positional arguments for every kind of input. Previously they were
        # only forwarded when `src` was a File and silently dropped otherwise.
        return h5.dump(*args, **kwargs)


def dumps(src: Union[str, File, pathlib.Path]):
    """Call h5.dumps() on the provided HDF5 file

    The file is opened via ``File(<filename>)`` and the result of ``h5.dumps()``
    is returned.

    Parameters
    ----------
    src : str, File, pathlib.Path
        the HDF5 file or filename to dump. An object which has a hdf_filename attribute can also be provided.
    """
    if isinstance(src, File):
        target = src.hdf_filename
    elif isinstance(src, (str, pathlib.Path)) or not hasattr(src, 'hdf_filename'):
        # filenames (and objects without hdf_filename) go to File() as they are
        target = src
    else:
        target = src.hdf_filename

    with File(target) as h5:
        return h5.dumps()


def get_ld(
        hdf_filename: Union[str, pathlib.Path],
        structural: bool = True,
        contextual: bool = True,
        skipND: int = 1,
        file_uri: Optional[str] = None,
        context: Optional[Dict] = None
) -> rdflib.Graph:
    """Return the HDF file content as a rdflib.Graph object.

    Thin package-level wrapper: all arguments are forwarded unchanged to
    ``ld.get_ld``. The ``ld`` subpackage is imported on first call.
    """
    from .ld import get_ld as _build_graph

    options = dict(
        structural=structural,
        contextual=contextual,
        skipND=skipND,
        file_uri=file_uri,
        context=context,
    )
    return _build_graph(hdf_filename, **options)


def compute_metrics(
        hdf_filename: Union[str, pathlib.Path],
        structural: bool = True,
        contextual: bool = True,
        skipND: int = 1,
        file_uri: Optional[str] = None,
        context: Optional[Dict] = None,
        rdf_mappings: Optional[Dict[str, RDFMappingEntry]] = None,
) -> Dict[str, object]:
    """Compute RDF knowledge graph metrics for an HDF5 file.

    Thin package-level wrapper: all arguments are forwarded unchanged to
    ``ld.metrics.compute_metrics``, which is imported on first call.

    Parameters
    ----------
    hdf_filename : str, pathlib.Path
        The HDF5 file to analyse.
    structural, contextual, skipND, file_uri, context, rdf_mappings
        Forwarded as keyword arguments to ``ld.metrics.compute_metrics``.

    Returns
    -------
    Dict[str, object]
        The result of ``ld.metrics.compute_metrics``.
    """
    from .ld.metrics import compute_metrics as _compute_metrics
    return _compute_metrics(
        hdf_filename,
        structural=structural,
        contextual=contextual,
        skipND=skipND,
        file_uri=file_uri,
        context=context,
        rdf_mappings=rdf_mappings,
    )


def _optimize_ld_context(graph: rdflib.Graph, context: Optional[Dict]) -> Dict:
    """Return the result of ``ld.optimize_context`` for ``graph``.

    A ``None`` (or otherwise falsy) ``context`` is replaced by an empty dict
    before it is passed on.
    """
    from .ld import optimize_context
    return optimize_context(graph, context or {})


def _resolve_serialize_format(fmt: str, kwargs: Dict) -> str:
    """Return the serialization format, preferring ``kwargs["format"]``.

    If ``kwargs`` contains the key ``"format"``, it is removed from ``kwargs``
    (in place) and its value is returned; otherwise ``fmt`` is returned.
    """
    return kwargs.pop("format", fmt)


def dump_jsonld(
        hdf_filename: Union[str, pathlib.Path],
        skipND: int = 1,
        indent: int = 2,
        structural: bool = True,
        contextual: bool = True,
        context: Optional[Dict] = None,
        file_uri: Optional[str] = None
):
    """Return the file content as a JSON-LD string.

    The graph is built with :func:`get_ld` and serialized with
    ``format="json-ld"`` and ``auto_compact=True``. ``context`` is only used
    for the serialization (via ``_optimize_ld_context``), it is not passed to
    :func:`get_ld`.
    """
    graph = get_ld(hdf_filename,
                   structural=structural,
                   contextual=contextual,
                   file_uri=file_uri,
                   skipND=skipND)
    return graph.serialize(
        format="json-ld",
        indent=indent,
        auto_compact=True,
        context=_optimize_ld_context(graph, context)
    )


# def shacl_validate(
#         *,
#         hdf_data: Union[str, rdflib.Graph] = None,
#         hdf_source: Union[h5py.File, pathlib.Path] = None,
#         shacl_data: Union[str, rdflib.Graph] = None,
#         shacl_source: Union[str, pathlib.Path] = None,
#         hdf_file_uri="https://example.org/hdf5file#",
#         shacl_format: str = 'turtle',
#         hdf_data_format: str = 'turtle',
#         **pyshacl_kwargs
# ) -> ValidationResult:
#     """Validate HDF5 file content against SHACL shapes.
#
#     Parameters
#     ----------
#     hdf_data : Union[str, rdflib.Graph], optional
#         RDF data of the HDF5 file as string or rdflib.Graph. If not
#         provided, `hdf_source` must be provided.
#     hdf_source : Union[h5py.File, pathlib.Path], optional
#         HDF5 file or h5py.File object to extract RDF data from. If not
#         provided, `hdf_data` must be provided.
#     shacl_data : Union[str, rdflib.Graph], optional
#         SHACL shapes as string or rdflib.Graph. If not provided,
#         `shacl_source` must be provided.
#     shacl_source : Union[str, pathlib.Path], optional
#         File path to SHACL shapes. If not provided, `shacl_data`
#         must be provided.
#     hdf_file_uri : str, optional
#         The file URI to use for the HDF5 file when extracting RDF data.
#         Default is "https://example.org/hdf5file#".
#     shacl_format : str, optional
#         The format of the SHACL shapes if `shacl_data` is provided as
#         string. Default is 'turtle'.
#     hdf_data_format : str, optional
#         The format of the HDF5 RDF data if `hdf_data` is provided as
#         string. Default is 'turtle'.
#     **pyshacl_kwargs
#         Additional keyword arguments passed to pyshacl.validate().
#
#     Returns
#     -------
#     ValidationResult
#         The result of the SHACL validation.
#     """
#     return validate_hdf(
#         hdf_data=hdf_data,
#         hdf_source=hdf_source,
#         shacl_data=shacl_data,
#         shacl_source=shacl_source,
#         hdf_file_uri=hdf_file_uri,
#         shacl_format=shacl_format,
#         hdf_data_format=hdf_data_format,
#         **pyshacl_kwargs
#     )


def dump_jsonld_depr(hdf_filename: Union[str, pathlib.Path],
                     skipND: int = 1,
                     structural: bool = True,
                     contextual: bool = True,
                     context: Optional[Dict] = None,
                     blank_node_iri_base: Optional[str] = None
                     ) -> str:
    """Dump the JSON-LD representation of the file.

    The call is delegated to ``ld.hdf.file.get_serialized_ld`` with
    ``format="json-ld"``.

    NOTE(review): ``structural`` and ``contextual`` are only validated (at
    least one must be True). They are not forwarded, so every valid
    combination produces the same call - confirm whether this is intended.

    Parameters
    ----------
    hdf_filename : str, pathlib.Path
        the HDF5 file to dump.
    skipND : int=1
        Skip writing data of datasets with more than `skipND` dimensions.
    structural : bool=True
        Include structural information in the JSON-LD output.
    contextual : bool=True
        Include contextual information in the JSON-LD output.
    context: Optional[Dict]
        context in form of {prefix: IRI}, e.g.
        "ssno": "https://matthiasprobst.github.io/ssno#"
    blank_node_iri_base: Optional[str]
        IRI base used for blank nodes

    Raises
    ------
    ValueError
        If ``blank_node_iri_base`` is given but is neither a ``str`` nor a
        ``HttpUrl``, or if both ``structural`` and ``contextual`` are False.
    """
    if blank_node_iri_base is not None and not isinstance(blank_node_iri_base, (str, HttpUrl)):
        raise ValueError('blank_node_iri_base must be a valid URL')

    if not structural and not contextual:
        raise ValueError('At least one of structural or contextual must be True.')

    from h5rdmtoolbox.ld.hdf.file import get_serialized_ld

    # The former "structural only" branch issued exactly the same call, so a
    # single call covers all valid flag combinations.
    return get_serialized_ld(
        hdf_filename,
        blank_node_iri_base,
        format="json-ld",
        context=context,
        skipND=skipND)


def serialize(hdf_filename,
              fmt: str = "ttl",
              skipND: int = 1,
              structural: bool = True,
              contextual: bool = True,
              file_uri: Optional[Union[str, Dict[str, str]]] = None,
              rdf_mappings: Optional[Dict[str, RDFMappingEntry]] = None,
              **kwargs):
    """Alternative to json-ld but allows multiple serialization options

    Opens the file with ``File`` and returns ``h5.serialize(...)``.

    Parameters
    ----------
    hdf_filename
        The HDF5 file to serialize.
    fmt : str
        Serialization format. A ``format=...`` keyword argument, if given,
        takes precedence over ``fmt``.
    skipND, structural, contextual, file_uri, rdf_mappings
        Forwarded to ``h5.serialize``.
    kwargs
        Only the key ``format`` is evaluated; any other keyword argument is
        accepted but not forwarded.
    """
    fmt = _resolve_serialize_format(fmt, kwargs)
    with File(hdf_filename) as h5:
        return h5.serialize(fmt=fmt,
                            skipND=skipND,
                            structural=structural,
                            contextual=contextual,
                            file_uri=file_uri,
                            rdf_mappings=rdf_mappings)


def sparql(
        hdf_filename: Union[str, pathlib.Path],
        query: str,
        structural: bool = True,
        contextual: bool = True,
        skipND: int = 1,
        file_uri: Optional[Union[str, Dict[str, str]]] = None,
        rdf_mappings: Optional[Dict[str, RDFMappingEntry]] = None,
        as_dataframe: bool = False,
        **kwargs
) -> rdflib.query.Result:
    """Run a SPARQL query on the HDF5 file content.

    All arguments are forwarded to ``ld.sparql`` (``hdf_filename`` is passed
    as ``source``).

    Returns a rdflib.query.Result object.
    NOTE(review): ``as_dataframe`` is forwarded unchanged; the return type may
    differ when it is True - confirm against ``ld.sparql``.
    """
    # Imported under an alias so the package-level function name is not shadowed.
    from .ld import sparql as _run_sparql
    return _run_sparql(
        source=hdf_filename,
        query=query,
        structural=structural,
        contextual=contextual,
        skipND=skipND,
        file_uri=file_uri,
        rdf_mappings=rdf_mappings,
        as_dataframe=as_dataframe,
        **kwargs
    )


def build_pyvis_graph(hdf_filename,
                      output_filename="kg-graph.html",
                      notebook=False,
                      structural: bool = True,
                      contextual: bool = True,
                      style: Optional[Dict] = None):
    """Calls `build_pyvis_graph` of kglab library. Requires kglab and pyvis

    The file is serialized to turtle via :func:`serialize`, loaded into a
    ``kglab.KnowledgeGraph`` and rendered to ``output_filename``.

    Parameters
    ----------
    hdf_filename
        The HDF5 file to visualize.
    output_filename
        Target passed to ``pyvis_graph.show()``.
    notebook
        Forwarded to ``build_pyvis_graph`` of kglab.
    structural, contextual
        Forwarded to :func:`serialize`.
    style : Optional[Dict]
        Style passed to kglab. If None or empty, a default style for the
        prefixes "hdf" and "ind" is used.

    Returns
    -------
    The pyvis graph object created by kglab.

    Raises
    ------
    ImportError
        If kglab is not installed.
    """
    try:
        import kglab
    except ImportError as exc:
        # keep the original cause attached to the more helpful message
        raise ImportError('kglab is required for this function. Install it using: pip install kglab') from exc

    kg = kglab.KnowledgeGraph().load_rdf_text(
        serialize(hdf_filename, fmt="ttl", structural=structural, contextual=contextual)
    )

    vis_style = style or {
        "hdf": {
            "color": "orange",
            "size": 40,
        },
        "ind": {
            "color": "blue",
            "size": 30,
        },
    }

    subgraph = kglab.SubgraphTensor(kg)
    pyvis_graph = subgraph.build_pyvis_graph(notebook=notebook, style=vis_style)

    pyvis_graph.force_atlas_2based()
    pyvis_graph.show(output_filename)
    return pyvis_graph


def get_filesize(hdf_filename: Union[str, pathlib.Path]) -> int:
    """Get the size of the HDF5 file in bytes"""
    return utils.get_filesize(hdf_filename)


def get_checksum(hdf_filename: Union[str, pathlib.Path]) -> str:
    """Get the checksum of the HDF5 file"""
    return utils.get_checksum(hdf_filename)


def register_dataset_decoder(decoder: Callable, decoder_name: Optional[str] = None, overwrite: bool = False):
    """A decoder function takes a xarray.DataArray and a dataset as input and returns a xarray.DataArray
    It is called after the dataset is loaded into memory and before being returned to the user. Be careful:
    Multiple decoders can be registered, and they are called in the order of registration. Hence, your decoder
    may behave unexpectedly!

    Parameters
    ----------
    decoder : Callable
        The decoder function to register.
    decoder_name : Optional[str]
        Name under which the decoder is stored. Defaults to ``decoder.__name__``.
    overwrite : bool
        If False, registering a name or a function that is already registered
        raises an error.

    Raises
    ------
    ValueError
        If the name or the function is already registered and ``overwrite``
        is False.
    """
    from .wrapper import ds_decoder
    if decoder_name is None:
        decoder_name = decoder.__name__
    registered_decoders = ds_decoder.registered_dataset_decoders
    already_registered = decoder_name in registered_decoders or decoder in registered_decoders.values()
    if already_registered and not overwrite:
        raise ValueError(f'decoder "{decoder_name}" already registered. Name and function must be unique.')
    ds_decoder.registered_dataset_decoders[decoder_name] = decoder


# If True, clean_temp_data() prints progress information to stdout.
_ATEXIT_VERBOSE = False


def set_loglevel(level: Union[int, str]):
    """Set the logging level of the h5rdmtoolbox logger

    The level is applied to the logger named 'h5rdmtoolbox' and to every
    handler currently attached to it.
    """
    _logger = logging.getLogger('h5rdmtoolbox')
    _logger.setLevel(level)
    for h in _logger.handlers:
        h.setLevel(level)


@atexit.register
def clean_temp_data(full: bool = False):
    """cleaning up the tmp directory

    Registered via ``atexit`` and therefore called without arguments at
    interpreter exit.

    Directories that could not be deleted because of a ``PermissionError``
    are recorded, one per line, in ``failed.txt`` inside ``UserDir['tmp']``.
    Entries found in that file are retried on the next call.

    Parameters
    ----------
    full : bool
        If True, the whole ``USER_DATA_DIR / 'tmp'`` folder is removed and
        re-created, and nothing else is done. If False, only
        ``UserDir['tmp']`` is removed.
    """
    failed_dirs = []
    failed_dirs_file = UserDir['tmp'] / 'failed.txt'
    if full:
        root_tmp = USER_DATA_DIR / 'tmp'
        if root_tmp.exists():
            try:
                shutil.rmtree(root_tmp)
                root_tmp.mkdir(exist_ok=True, parents=True)
            except PermissionError as e:
                print(f'removing tmp folder "{root_tmp}" failed due to "{e}".')
        return

    for _tmp_session_dir in [UserDir['tmp'], ]:
        if _ATEXIT_VERBOSE:
            print(f'cleaning up tmp directory "{_tmp_session_dir}"')
        if _tmp_session_dir.exists():
            try:
                if _ATEXIT_VERBOSE:
                    print(f'try deleting tmp in session dir: {_tmp_session_dir}')
                shutil.rmtree(_tmp_session_dir)
            except PermissionError as e:
                if _ATEXIT_VERBOSE:
                    print(f'[!] failed deleting tmp session dir: {_tmp_session_dir}')
                failed_dirs.append(UserDir['tmp'])
                if _ATEXIT_VERBOSE:
                    print(f'removing tmp folder "{_tmp_session_dir}" failed due to "{e}". Best is you '
                          f'manually delete the directory.')
            finally:
                # Retry directories recorded by an earlier call (only possible if
                # the file survived the rmtree above).
                lines = []
                if failed_dirs_file.exists():
                    with open(failed_dirs_file, 'r') as f:
                        lines = f.readlines()
                for line in lines:
                    # Work on the stripped entry: the raw line still carries its
                    # trailing newline, which made the exists() check miss the
                    # directory and would have doubled the newline on re-write.
                    leftover = line.strip()
                    if not leftover:
                        continue
                    try:
                        shutil.rmtree(leftover)
                    except Exception:
                        # best effort at interpreter exit: remember it for next time
                        if pathlib.Path(leftover).exists():
                            failed_dirs.append(leftover)

                if lines or failed_dirs:
                    with open(failed_dirs_file, 'w') as f:
                        for fd in failed_dirs:
                            f.write(f'{fd}\n')
                else:
                    failed_dirs_file.unlink(missing_ok=True)
        else:
            logger.debug('User tmp dir not found: %s', _tmp_session_dir)


# Do not expand the data section in the xarray representation.
xr.set_options(display_expand_data=False)

# Names exported by ``from h5rdmtoolbox import *``.
__all__ = ('__version__', '__author__', '__author_orcid__', 'UserDir', 'use',
           'File', 'Group', 'Dataset', 'Attribute',
           'dump', 'dumps', 'cv_h5py', 'lower', 'Lower',
           'set_config', 'get_config', 'get_ureg',
           'Convention', 'jsonld', 'lazy', 'DownloadFileManager',
           'clean_temp_data')