import abc
import copy
import forge
import h5py
import inspect
import logging
import pathlib
import re
import shutil
import sys
import warnings
import yaml
from pydoc import locate
from typing import Union, List, Dict, Tuple, Any
from forge import kwargs
from h5rdmtoolbox import errors
from h5rdmtoolbox.repository import RepositoryInterface
from h5rdmtoolbox.wrapper import ds_decoder
from . import cfg
from . import consts
from . import errors
from .errors import ConventionNotFound
from .standard_attributes import StandardAttribute, __doc_string_parser__
from .utils import json2yaml
from .._repr import make_italic, make_bold
from ..user import UserDir
from ..repository import zenodo
from ..repository.zenodo.utils import recid_from_doi_or_redid
logger = logging.getLogger("h5rdmtoolbox")
CV_DIR = UserDir["convention"]
datetime_str = "%Y-%m-%dT%H:%M:%SZ%z"
class MissingAttribute:
def __init__(self, object_name: str, attribute_name: str):
self.object_name = object_name
self.attribute_name = attribute_name
def __str__(self):
return f'Attribute "{self.attribute_name}" is missing in "{self.object_name}".'
def __repr__(self):
return f"MissingAttribute({self.object_name}, {self.attribute_name})"
class InvalidAttribute:
def __init__(
self,
object_name: str,
attribute_name: str,
attribute_value: Any,
error_message: str,
):
self.object_name = object_name
self.attribute_name = attribute_name
self.attribute_value = attribute_value
self.error_message = error_message
def __str__(self):
return (
f'Attribute "{self.attribute_name}" in "{self.object_name}" has an invalid value '
f'"{self.attribute_value}". Error message: "{self.error_message}"'
)
def __repr__(self):
return f"InvalidAttribute({self.object_name}, {self.attribute_name}, {self.attribute_value}, {self.error_message})"
class AbstractConvention(abc.ABC):
"""Abstract class definition for convention"""
# Class interfaces:
# Reader interfaces:
@classmethod
@abc.abstractmethod
def from_yaml(cls, filename: Union[str, pathlib.Path]):
"""read a convention from a YAML file"""
@classmethod
@abc.abstractmethod
def from_json(cls, filename: Union[str, pathlib.Path]):
"""read a convention from a JSON file"""
# Validater:
@abc.abstractmethod
def validate(
self, file_or_filename: Union["h5tbx.File", h5py.File, str, pathlib.Path]
) -> List[Dict]:
"""Checking a file for compliance with the convention. Shall return dictionary indicating
invalid attributes."""
[docs]
class Convention(AbstractConvention):
"""Convention class
A convention is a set of standard attributes, which are defined in a YAML or JSON file.
Recommended initialization is via `Convention.from_yaml(<yaml_filename>)` or
`Convention.from_json(<json_filename>)`.
Parameters
----------
name : str
Name of the convention
contact : str
ORCID of the researcher
institution : str, optional
Institution of the researcher (if different from that of contact)
decoders: List[str], optional=None
List of decoders to be used for decoding datasets. If None, no decoder is used.
Decoders can be written by the user and registered with `h5tbx.register_dataset_decoder(<decoder_func>)`.
"""
[docs]
def __init__(
self,
*, # enforce keyword arguments
name: str,
contact: str, # ORCID of researcher
institution: str = None, # only if different than that from contact
standard_attributes: dict = None,
decoders: Union[str, List[str]] = None,
filename=None,
):
from ..wrapper.core import File, Group, Dataset
if decoders is None:
self._decoders = tuple()
else:
if isinstance(decoders, str):
self._decoders = (decoders,)
else:
self._decoders = tuple(decoders)
# a convention may be stored locally:
if filename is not None:
self.filename = pathlib.Path(filename).absolute()
else:
self.filename = filename
self.contact = contact
self.institution = institution
self._registered_standard_attributes = {}
self.name = name
self.properties = {}
self.methods = {File: {}, Group: {}, Dataset: {}}
if standard_attributes is None:
standard_attributes = {}
for std_name, std in standard_attributes.items():
self.add_standard_attribute(std)
def add_standard_attribute(self, std_attr: StandardAttribute) -> None:
"""Add a standard attribute to the convention."""
_registered_names = list(self._registered_standard_attributes.keys())
# check if the name is already registered:
_cls = std_attr.target_cls
std_attr_name = std_attr.name.split("-")[0]
std_attr.name = std_attr_name
prop = self.properties.get(_cls, None)
if prop is not None:
if std_attr_name in self.properties[_cls]:
raise errors.ConventionError(
f'A standard attribute with the name "{std_attr_name}" '
f'is already registered for "{std_attr.target_cls}".'
)
if std_attr.requirements is not None:
if not all(r in _registered_names for r in std_attr.requirements):
# collect the missing ones:
_missing_requirements = []
for r in std_attr.requirements:
if r not in _registered_names:
_missing_requirements.append(r)
raise errors.ConventionError(
f'Not all requirements for "{std_attr_name}" are registered. '
f"Please add them to the convention first: {_missing_requirements}"
)
self._registered_standard_attributes[std_attr_name] = std_attr
method_name = std_attr.target_method
target_cls = std_attr.target_cls
if target_cls not in self.properties:
self.properties[target_cls] = {}
self.properties[target_cls][std_attr_name] = std_attr
if target_cls not in self.methods:
self.methods[target_cls] = {}
add_to_method = True # for now all standard attributes are always added to the method (signature)
if add_to_method:
cls = StandardAttribute.METHOD_CLS_ASSIGNMENT[method_name]
if method_name not in self.methods[cls]:
self.methods[cls][method_name] = {}
self.methods[cls][method_name][std_attr_name] = std_attr
def add(self, std_attr: StandardAttribute) -> None:
"""Add a standard attribute to the convention."""
warnings.warn(
'The method "add" is deprecated. Please use "add_standard_attribute" instead.',
DeprecationWarning,
)
return self.add_standard_attribute(std_attr)
def delete(self):
"""Delete the convention from the user directory."""
delete(self.name.lower().replace("-", "_"))
def __repr__(self):
return f'{self.__class__.__name__}("{self.name}")'
def __str__(self):
header = f'Convention("{self.name}")'
out = f"{make_bold(header)}"
out += f"\ncontact: {self.contact}"
for cls, method_standard_attributes in self.methods.items():
for method_name, standard_attributes in method_standard_attributes.items():
out += f"\n {cls.__name__}.{method_name}():"
if len(standard_attributes) == 0:
out += f" ({make_italic('Nothing registered')})"
continue
# if props exist list them. first required, then optional
prop_dict = {"positional": {}, "keyword": {}}
for std_attr_name, std_attr in standard_attributes.items():
# for property_name, property_dict in methods.items():
if std_attr.is_positional():
prop_dict["positional"][std_attr_name] = std_attr
else:
prop_dict["keyword"][std_attr_name] = std_attr
for k, v in prop_dict["positional"].items():
out += (
f"\n * {make_bold(k + ' (obligatory)')} :\n\t\t"
f"{v.description}"
)
for k, v in prop_dict["keyword"].items():
default_value = v.default_value
if default_value == StandardAttribute.NONE:
out += f"\n * {make_italic(k)}:\n\t\t{v.description}"
else:
out += (
f"\n * {make_italic(k)} (default={default_value}):\n\t\t"
f"{v.description}"
)
out += "\n"
return out
def __enter__(self):
self._curr_cv = get_current_convention()
use(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
use(self._curr_cv.name)
@property
def registered_standard_attributes(self):
"""Return the registered standard attributes."""
return self._registered_standard_attributes
@property
def decoders(self) -> Tuple[str]:
"""Return registered decoders."""
return self._decoders
def add_decoder(self, decoder: str):
"""Add a decoder to the convention."""
if not isinstance(decoder, str):
raise TypeError(f"Expected a string, got {type(decoder)}")
from ..wrapper import ds_decoder
if decoder not in ds_decoder.registered_dataset_decoders:
raise KeyError(f'The decoder "{decoder}" is not registered.')
self._decoders += (decoder,)
def remove_decoder(self, decoder: str):
"""Remove a decoder from the convention.
Parameters
----------
decoder: str
name of the decoder to remove. TODO: Check if decoder is registered.
"""
decoders = list(self._decoders)
decoders.remove(decoder)
self._decoders = tuple(decoders)
return self._decoders
@classmethod
def from_json(
cls, json_filename: Union[str, pathlib.Path], overwrite: bool = False
) -> "Convention":
"""Create a convention from a json file."""
return cls.from_yaml(json2yaml(json_filename), overwrite=overwrite)
@classmethod
def from_yaml(
cls, yaml_filename: Union[str, pathlib.Path], overwrite: bool = False
):
"""Create a convention from a yaml file.
The YAML file must have the following structure:
# file content:
__name__ = "name of the convention"
__contact__ = "contact email or orcid or ..."
__version__ = "version of the convention"
<standard_attribute_name>:
target_cls: <class name>
target_method: <method name>
description: <description>
default_value: <default value> # optional, default is "$None"
requirements: [<list of required standard attributes>] # optional
# end of file
Note, that the name, author and version are required with the double underscores because they
need to be distinguished from the standard attributes. E.g. "contact" could be a standard attribute.
Parameters
----------
yaml_filename: str
path to the yaml file
overwrite: bool
if True, overwrite an existing (registered) convention with the same name
Returns
-------
Convention
The created conventionRaises
Raises
------
ValueError
If the YAML file does not contain "__name__" or "__contact__"
"""
if not isinstance(yaml_filename, (str, pathlib.Path)):
raise TypeError(
"Parameter yaml_filename must be a filename, i.e. str or pathlib.Path, "
f"got {type(yaml_filename)}"
)
yaml_filename = pathlib.Path(yaml_filename)
with open(yaml_filename, "r") as f:
attrs = _process_paths(yaml.safe_load(f), relative_to=yaml_filename.parent)
if "__name__" not in attrs:
raise ValueError(
f'YAML file {yaml_filename} does not contain "__name__". Is the file a valid convention?'
)
if "__contact__" not in attrs:
raise ValueError(
f'YAML file {yaml_filename} does not contain "__contact__". Is the file a valid convention?'
)
# check if name already exists!
convention_name = attrs["__name__"].lower().replace("-", "_")
if convention_name in [d.name for d in CV_DIR.glob("*")]:
if not overwrite:
return _get_convention_from_dir(attrs["__name__"])
# overwriting existing convention
delete(convention_name)
logger.debug(
f'Convention exists and overwrite is True: Deleting convention "{convention_name}"'
)
from . import generate
generate.write_convention_module_from_yaml(
yaml_filename, name=attrs["__name__"]
)
# add_convention(yaml_filename, name=attrs['__name__'])
# assert attrs['__name__'] in get_registered_conventions()
return _get_convention_from_dir(attrs["__name__"])
def pop(self, *names) -> "Convention":
"""removes the standard attribute with the given name from the convention
Parameters
----------
name: str
name of the standard attribute to remove
Returns
-------
Convention
a new convention without the given standard attribute
"""
new_conv = copy.deepcopy(self)
for prop in new_conv.properties.values():
for name in names:
prop.pop(name, None)
_new_methods_dict = new_conv.methods
for cls, meth_dict in new_conv.methods.items():
for meth_name, std_attr in meth_dict.items():
for name in names:
_new_methods_dict[cls][meth_name].pop(name, None)
new_conv.methods = _new_methods_dict
return new_conv
def _add_signature(self):
for cls, methods in self.methods.items():
for method_name, std_attrs in methods.items():
for std_attr_name, std_attr in std_attrs.items():
__doc_string_parser__[cls][method_name].add_additional_parameters(
{
std_attr_name: {
"default": std_attr.default_value,
"type": std_attr.type_hint,
"description": std_attr.description,
}
}
)
if isinstance(std_attr.position, dict):
position = std_attr.position
else:
signature = inspect.signature(cls.__dict__[method_name])
if std_attr.is_positional():
params = [
param
for param in signature.parameters.values()
if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
]
position = {"after": params[-1].name}
else:
params = [
param
for param in signature.parameters.values()
if param.kind == inspect.Parameter.KEYWORD_ONLY
]
if len(params) == 0:
params = [
param
for param in signature.parameters.values()
if param.kind
== inspect.Parameter.POSITIONAL_OR_KEYWORD
]
position = {"after": params[-1].name}
type_hint = locate(std_attr.type_hint)
setattr(
cls,
method_name,
forge.insert(
forge.arg(
f"{std_attr_name}",
default=std_attr.default_value,
type=type_hint,
),
**position,
)(cls.__dict__[method_name]),
)
for cls, methods in self.methods.items():
for name, props in methods.items():
__doc_string_parser__[cls][name].update_docstring()
def _delete_signature(self):
for cls, methods in self.methods.items():
for name, props in methods.items():
for prop_name, prop_attrs in props.items():
try: # try it. If a convention is created during runtime, this may happen!
setattr(
cls, name, forge.delete(f"{prop_name}")(cls.__dict__[name])
)
except ValueError:
pass
__doc_string_parser__[cls][name].restore_docstring()
# orig_docs[cls][name]['callable'].__doc__ = orig_docs[cls][name]['doc']
def register(self):
"""Register the convention in the convention directory"""
add_convention(self)
def validate(
self, file_or_filename: Union[str, pathlib.Path, "File"]
) -> List[Dict]:
"""Checks a file for compliance with the convention. It will NOT raise an error but
return a list of invalid attributes.
Parameters
----------
file_or_filename: str, pathlib.Path, File
path to the file or a File (h5tbx.File!) object
Returns
-------
List[Dict]
The invalid attributes
"""
from ..wrapper.core import File
if not isinstance(file_or_filename, (str, pathlib.Path)):
with File(file_or_filename, "r") as f:
return self.check(f)
failed = []
convention = self
def _is_str_dataset(node):
if node.dtype.kind == "S":
return True
return False
def _validate_convention(name, node):
"""Checks if the node (dataset or group) is compliant with the convention"""
for k, v in convention.properties.items():
if isinstance(node, k):
for ak, av in v.items():
if av.default_value is not consts.DefaultValue.EMPTY:
if ak in node.attrs:
try:
node.attrs[ak]
except errors.StandardAttributeError as e:
failed.append(
dict(
name=node.name,
attr_name=ak,
attr_value=node.attrs.raw[ak],
reason="invalid_value",
error_message=str(e),
)
)
else: # av.default_value is consts.DefaultValue.EMPTY:
if (
av.target_method == "create_string_dataset"
and not _is_str_dataset(node)
):
continue # not the responsibility of this validator
if av.target_method == "create_dataset" and _is_str_dataset(
node
):
continue # not the responsibility of this validator
if ak not in node.attrs:
logger.debug(
f'The attribute "{ak}" is missing in the dataset "{name}" but '
"is required by the convention"
)
failed.append(
MissingAttribute(
object_name=node.name, attribute_name=ak
)
)
else:
# just by accessing the standard attribute, the validation is performed
try:
_ = node.attrs[ak]
# av.validate(value_to_check, parent=node, attrs=node.attrs.raw)
logger.debug(f'The attribute "{ak}" is valid')
except errors.StandardAttributeError as e:
logger.debug(
f'The attribute "{ak}" exists but is invalid'
)
failed.append(
InvalidAttribute(
object_name=node.name,
attribute_name=ak,
attribute_value=node.attrs.raw[ak],
error_message=str(e),
)
)
with File(file_or_filename, "r") as f:
logger.debug(
f"Checking file {file_or_filename} for compliance with convention {self.name}"
)
_validate_convention("/", f)
f.visititems(_validate_convention)
return failed
def _clear_all_signatures():
"""Clear all convention parameters from all registered conventions.
This is needed when switching conventions to ensure no leftover parameters
from previous conventions remain on the class methods.
"""
for conv in get_registered_conventions().values():
for cls, methods in conv.methods.items():
for method_name, std_attrs in methods.items():
for std_attr_name in std_attrs:
try:
setattr(
cls,
method_name,
forge.delete(std_attr_name)(cls.__dict__[method_name]),
)
except (ValueError, KeyError):
pass
try:
__doc_string_parser__[cls][method_name].restore_docstring()
except KeyError:
pass
def _import_convention(convention_name) -> "module":
import importlib
try:
return importlib.import_module(f"{convention_name}")
except ImportError:
logger.error(
f"Failed to import module {convention_name}. Most likely the created convention file is erroneous."
)
def _get_convention_from_dir(convention_name: str) -> "Convention":
_convention_name = convention_name.lower().replace("-", "_")
assert "-" not in _convention_name
if _convention_name in get_registered_conventions():
return get_registered_conventions()[convention_name]
_convention_py_filename = CV_DIR / f"{_convention_name}" / f"{_convention_name}.py"
if not _convention_py_filename.exists():
raise ConventionNotFound(f'Convention "{convention_name}" not found.')
logger.debug(
f"Adding path {_convention_py_filename.parent.absolute()} to system path..."
)
sys.path.insert(0, str(_convention_py_filename.parent))
_import_convention(_convention_name)
cv = get_registered_conventions()[convention_name]
cv.filename = _convention_py_filename
return cv
def _is_builtin_convention(name: str) -> bool:
"""Check if convention name is a built-in convention that can be auto-generated."""
return name.lower().replace("-", "_") == "h5tbx"
def _ensure_builtin_convention(name: str) -> bool:
"""Ensure a built-in convention is built and registered.
Returns True if the convention was built, False if already existed or not a builtin.
"""
if not _is_builtin_convention(name):
return False
_convention_name = name.lower().replace("-", "_")
if _convention_name in get_registered_conventions():
return False
_convention_py_filename = CV_DIR / f"{_convention_name}" / f"{_convention_name}.py"
if not _convention_py_filename.exists():
logger.debug(f"Auto-building built-in convention '{name}' on first use")
from . import generate
h5tbx_yaml = pathlib.Path(__file__).parent.parent / "data/h5tbx.yaml"
generate.write_convention_module_from_yaml(h5tbx_yaml)
return True
return False
[docs]
class use:
"""Enable a convention.
To disable the convention, active an empty convention like so: cv.use(None)
Parameters
----------
convention_or_name: Union[str, Convention, None]
The convention name or object to enable.
"""
[docs]
def __init__(self, convention_or_name: Union[str, Convention, None]):
self._latest_convention = get_current_convention()
registered_conventions = get_registered_conventions()
if convention_or_name is None:
self._current_convention = _use(None)
else:
if isinstance(convention_or_name, Convention):
convention_name = convention_or_name.name
else:
convention_name = convention_or_name
_convention_name = convention_name.lower().replace("-", "_")
assert "-" not in _convention_name
registered_convention_names = [
n.lower().replace("-", "_") for n in registered_conventions
]
if _convention_name not in registered_convention_names:
_ensure_builtin_convention(convention_name)
if _convention_name not in registered_convention_names:
cv = _get_convention_from_dir(convention_name)
convention_name = cv.name
self._current_convention = _use(convention_name)
def __repr__(self):
return f'using("{self._current_convention.name}")'
def __enter__(self):
return self._current_convention
def __exit__(self, *args, **kwargs):
_use(self._latest_convention)
def _use(convention_or_name: Union[str, Convention, None]) -> Convention:
"""Use a convention by name or Convention object"""
if isinstance(convention_or_name, Convention):
convention_name = convention_or_name.name
else:
convention_name = convention_or_name
current_convention = get_current_convention()
if convention_name is None:
# reset to default convention
convention_name = "h5py"
if convention_name not in get_registered_conventions():
_ensure_builtin_convention(convention_name)
if convention_name not in get_registered_conventions():
raise ValueError(f'Convention "{convention_name}" is not registered')
logger.debug(f'Switching to convention "{convention_name}"')
if current_convention is not None:
if convention_name == current_convention.name:
return current_convention
# reset signature and dataset decoders:
current_convention._delete_signature()
ds_decoder.decoder_names = ()
# update signature:
current_convention = get_registered_conventions()[convention_name]
_clear_all_signatures() # Clear all convention parameters before adding new ones
current_convention._add_signature()
# update dataset decoders:
ds_decoder.decoder_names = current_convention.decoders
# set_current_convention(current_convention)
cfg._current_convention.set(current_convention)
return current_convention
def get_registered_conventions() -> Dict:
"""Return dictionary of registered convention"""
return cfg._registered_conventions
def add_convention(convention: Convention, name=None):
"""Add a convention to the list of registered convention"""
if not isinstance(convention, Convention):
raise ValueError(f'Convention "{convention}" is not a valid convention')
if name is None:
name = convention.name
cfg._registered_conventions[name] = convention
def get_current_convention() -> Union[None, Convention]:
"""Return the current convention (thread-safe)."""
return cfg._current_convention.get()
def _process_relpath(rel_filename, relative_to):
return str((relative_to / rel_filename).absolute())
def _process_paths(data: Union[Dict, str], relative_to) -> Dict:
# processed_data = {}
if isinstance(data, str):
match = re.search(r"relpath\((.*?)\)", data)
if match:
return _process_relpath(match.group(1), relative_to)
return data
elif isinstance(data, list):
return [_process_paths(item, relative_to) for item in data]
elif isinstance(data, dict):
_data = data.copy()
for key, value in data.items():
if isinstance(value, str):
match = re.search(r"relpath\((.*?)\)", value)
if match:
_data[key] = _process_relpath(match.group(1), relative_to)
elif isinstance(value, list):
_data[key] = [_process_paths(item, relative_to) for item in value]
elif isinstance(value, dict):
_data[key] = _process_paths(_data[key], relative_to)
return _data
return data
def delete(convention: Union[str, Convention]):
"""Delete convention from directory"""
if isinstance(convention, Convention):
convention_name = convention.name
else:
convention_name = convention
cv_dir = CV_DIR / convention_name
if cv_dir.exists():
shutil.rmtree(CV_DIR / convention_name)
cfg._registered_conventions.pop(convention_name, None)
if convention_name in sys.modules:
# if the convention (py script) already has been imported, remove it from the list of imported modules:
del sys.modules[convention_name]
def from_file(filename) -> Convention:
"""Load a convention from a file. Currently yaml and json files are supported"""
if filename.suffix == ".yaml":
return from_yaml(filename)
elif filename.suffix == ".json":
return from_json(filename)
else:
raise ValueError(f"File {filename} has an unknown suffix")
def from_yaml(
filename: Union[str, pathlib.Path], overwrite: bool = False
) -> Convention:
"""Load a convention from a YAML file. See Convention.from_yaml() for details"""
logger.debug(f"Reading Convention from yaml file: {filename}")
return Convention.from_yaml(filename, overwrite=overwrite)
def from_json(
filename: Union[str, pathlib.Path], overwrite: bool = False
) -> Convention:
"""Load a convention from a JSON file. See Convention.from_json() for details"""
return Convention.from_json(filename, overwrite=overwrite)
def from_repo(repo_interface: RepositoryInterface, name: str):
"""Download a YAML file from a repository
Parameters
----------
repo_interface: RepositoryInterface
The repository interface to use for downloading the file
name: str
Name of the file to download
"""
logger.debug(f"Downloading file {name} from repository {repo_interface}")
filename = repo_interface.download_file(name)
_suffix = pathlib.Path(name).suffix
vfunc_filename = name.split(_suffix, 1)
has_vfunc = False
if len(vfunc_filename) == 2:
vfunc_filename = f"{vfunc_filename[0]}_vfuncs.py"
try:
downloaded_vfunc_filename = repo_interface.download_file(vfunc_filename)
has_vfunc = True
except Exception as e:
logger.debug(f"No vfuncs file found for {name}: {e}")
if has_vfunc:
shutil.copy(
downloaded_vfunc_filename, filename.parent / downloaded_vfunc_filename.name
)
logger.debug(f"File downloaded to {filename}. Now loading convention from file.")
return from_file(filename)
def from_zenodo(
doi_or_recid: str,
name: str = None,
overwrite: bool = False,
force_download: bool = False,
) -> Convention:
"""Download a YAML file from a zenodo repository
Depreciated. Use `from_repo` in the future.
Parameters
----------
doi_or_recid: str
DOI of the zenodo repository. Can be a short DOI or a full DOI or the URL (e.g. 10428822 or
10.5281/zenodo.10428822 or https://doi.org/10.5281/zenodo.10428822 or only the record id, e.g. 10428822)
name: str=None
Name to be sed for the filename. If None, the name is taken from the zenodo record.
overwrite: bool = False
Whether to overwrite existing convention with the same name. Default is False
force_download: bool
Whether to force download the file even if it is already cached. Default is False
Returns
-------
cv: Convention
The convention object
"""
# depending on the input, try to convert to a valid DOI:
# parse record id:
warnings.warn("Please use `from_repo` instead of from_zenodo", DeprecationWarning)
rec_id = recid_from_doi_or_redid(doi_or_recid)
if name is None:
filename = UserDir["cache"] / f"{rec_id}"
else:
filename = UserDir["cache"] / f"{rec_id}/{name}"
if not filename.exists() or force_download:
record = zenodo.ZenodoRecord(rec_id)
filenames = list(record.files.keys())
if name is None:
yaml_matches = [
file for file in filenames if pathlib.Path(file).suffix == ".yaml"
]
vfuns_matches = [file for file in filenames if file.endswith("vfuncs.py")]
else:
yaml_matches = [file for file in filenames if file == name]
vfuns_matches = [file for file in filenames if file == f"{name}_vfuncs.py"]
if len(yaml_matches) == 0:
raise ValueError(
f'No file with name "{name}" found in record {doi_or_recid}'
)
found_filenames = [f for f in yaml_matches]
found_filenames.extend(vfuns_matches)
for match in found_filenames:
_filename = record.download_file(
match, target_folder=pathlib.Path(match).parent
)
shutil.move(_filename, match)
return from_yaml(yaml_matches[0], overwrite=overwrite)
def yaml2jsonld(
yaml_filename: Union[str, pathlib.Path],
file_url: str = None,
jsonld_filename: Union[str, pathlib.Path] = None,
) -> pathlib.Path:
"""Converts a convention stored in a YAML file to JSON-LD"""
yaml_filename = pathlib.Path(yaml_filename)
if jsonld_filename is None:
jsonld_filename = yaml_filename.with_suffix(".jsonld")
else:
jsonld_filename = pathlib.Path(jsonld_filename)
cv = Convention.from_yaml(yaml_filename)
from rdflib.namespace import DCAT, RDF, DCTERMS, PROV, FOAF
from ontolutils import M4I
from rdflib import Graph
import rdflib
person_orcid_id = cv.contact # m4i
g = Graph()
if file_url is None:
n_ds = rdflib.BNode()
else:
n_ds = rdflib.URIRef(file_url)
g.add((n_ds, RDF.type, DCAT.Dataset))
n_person = rdflib.URIRef(value=person_orcid_id)
n_affiliation = rdflib.URIRef(value=cv.institution)
g.add((n_person, RDF.type, FOAF.Person))
g.add((n_affiliation, RDF.type, PROV.Organization))
g.add((n_person, M4I.orcidId, rdflib.URIRef(person_orcid_id)))
g.add((n_person, PROV.hadRole, M4I.Researcher))
g.add((n_person, PROV.hadRole, M4I.ContactPerson))
g.add((n_person, rdflib.URIRef("https://schema.org/affiliation"), n_affiliation))
g.add((n_ds, DCTERMS.creator, n_person))
# as jsonld:
with open(jsonld_filename, "w", encoding="utf-8") as f:
f.write(
g.serialize(
format="json-ld",
indent=4,
context={"dcat": DCAT._NS, "dcterms": DCTERMS._NS, "m4i": M4I._NS},
compact=False,
)
)
return jsonld_filename
__all__ = ["datetime_str", "StandardAttribute", "Convention"]