"""Attribute module"""
import ast
import json
import logging
import warnings
from datetime import datetime
from typing import Dict, Union, Tuple, Optional, Any
import numpy as np
import pint
import pydantic
import rdflib
from h5py._hl.attrs import AttributeManager
from h5py._hl.base import with_phil
from h5py._objects import ObjectID, phil
from pydantic import HttpUrl
from .. import errors
from .. import get_config, convention, utils
from .. import get_ureg
from .. import protected_attributes
from ..convention import consts
logger = logging.getLogger("h5rdmtoolbox")
H5_DIM_ATTRS = protected_attributes.h5rdmtoolbox
class AttrDescriptionError(Exception):
    """Generic error raised when the description of an attribute is invalid."""
class Attribute:
    """Helper class for quick assignment of RDF attributes to the HDF5 file.

    An instance bundles an attribute value with optional semantic metadata
    (a definition and predicate/object IRIs). The ``rdf_*`` and ``frdf_*``
    variants of the same field are mutually exclusive.

    Examples
    --------
    >>> import h5rdmtoolbox as h5tbx
    >>> from ontolutils import M4I
    >>> rdf_attr = h5tbx.Attribute('0000-0001-8729-0482', rdf_predicate=M4I.orcidId,
    ...                            rdf_object='https://orcid.org/0000-0001-8729-0482')
    >>> with h5tbx.File('test.h5', 'w') as h5:
    ...     grp = h5.create_group('person')
    ...     grp.attrs['orcid'] = rdf_attr
    ...     # equal to:
    ...     # grp.attrs['orcid'] = '0000-0001-8729-0482'
    ...     # grp.rdf.predicate['orcid'] = str(M4I.orcidId)
    ...     # grp.rdf.object['orcid'] = 'https://orcid.org/0000-0001-8729-0482'
    """

    def __init__(
        self,
        value,
        *,
        definition: Optional[str] = None,
        rdf_predicate=None,
        frdf_predicate=None,
        rdf_object=None,
        frdf_object=None,
    ):
        """Store the value and validate the optional RDF information.

        Parameters
        ----------
        value
            The attribute value.
        definition : Optional[str]
            Human-readable definition of the attribute (skos:definition).
        rdf_predicate, frdf_predicate
            Predicate IRI. Only one of the two may be given.
        rdf_object, frdf_object
            Object IRI. Only one of the two may be given.

        Raises
        ------
        ValueError
            If both variants of the predicate or of the object are given.
        AttrDescriptionError
            If one of the given IRIs is not a valid URL.
        """
        self.value = value
        self.definition = definition  # skos:definition
        if rdf_predicate is not None and frdf_predicate is not None:
            raise ValueError(
                "You cannot set both rdf_predicate and frdf_predicate at the same time."
            )
        if rdf_object is not None and frdf_object is not None:
            raise ValueError(
                "You cannot set both rdf_object and frdf_object at the same time."
            )
        self.rdf_predicate = self._validate_rdf(rdf_predicate)
        self.frdf_predicate = self._validate_rdf(frdf_predicate)
        self.rdf_object = self._validate_rdf(rdf_object)
        self.frdf_object = self._validate_rdf(frdf_object)

    @staticmethod
    def _validate_rdf(value):
        """Return `value` unchanged if it is None or passes URL validation.

        Raises
        ------
        AttrDescriptionError
            If pydantic rejects `value` as an HTTP URL.
        """
        if value is None:
            return None
        try:
            str(HttpUrl(value))
        except pydantic.ValidationError as e:
            # chain explicitly so the pydantic error is kept as the cause
            raise AttrDescriptionError(
                f'Invalid URL: "{value}". This was validated with pydantic. Pydantic error: {e}'
            ) from e
        return value

    def __repr__(self) -> str:
        """Return ``ClassName(value, field=..., ...)`` listing every field that is set."""
        parts = [f"{self.__class__.__name__}({self.value}"]
        for field_name in (
            "rdf_predicate",
            "frdf_predicate",
            "rdf_object",
            "frdf_object",
            "definition",
        ):
            field_value = getattr(self, field_name)
            if field_value is not None:
                parts.append(f"{field_name}={field_value}")
        return ", ".join(parts) + ")"

    def __str__(self) -> str:
        """Same text as `__repr__`."""
        return self.__repr__()
def pop_hdf_attributes(attrs: Dict) -> Dict:
    """Return a copy of `attrs` without the HDF attributes like NAME, CLASS, ....

    Parameters
    ----------
    attrs: Dict
        Input dictionary. It is not modified.

    Returns
    -------
    dict
        New dictionary holding every entry of `attrs` whose key is not
        registered in `H5_DIM_ATTRS`.
    """
    return {
        attr_name: attrs[attr_name]
        for attr_name in attrs.keys()
        if attr_name not in H5_DIM_ATTRS
    }
def _check_iri(url, timeout: float = 10.0):
    """Check that `url` answers an HTTP GET request with status code 200.

    Parameters
    ----------
    url
        The URL to request.
    timeout : float
        Seconds to wait for the server before giving up. Without a timeout
        the request could block indefinitely on an unresponsive host.

    Returns
    -------
    bool
        Always True; a failing check raises instead of returning False.

    Raises
    ------
    ConnectionError
        If the response status code is not 200.
    """
    import requests

    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise ConnectionError(f"URL {url} does not exist.")
    return True
class AttributeString(str):
    """A `str` subclass adding convenience methods such as `to_pint()`."""

    def to_pint(self) -> "pint.util.Quantity":
        """Pass this string to the unit registry and return the result (a pint.Quantity object)."""
        # the registry's default format must agree with the configured "ureg_format"
        registry_format = get_ureg().formatter.default_format
        assert registry_format == get_config("ureg_format")
        return get_ureg()(self)
class WrapperAttributeManager(AttributeManager):
    """
    Subclass of h5py's Attribute Manager.

    Allows storing dictionaries as json strings. String values that look like
    a json object, a tuple or a list are converted back into the respective
    python object when read; all other strings are returned as
    `AttributeString`.
    """

    def __init__(self, parent):
        """Private constructor."""
        super().__init__(parent)
        self._parent = parent

    @staticmethod
    def _parse_return_value(_id, ret):
        """Convert a raw attribute value into a richer python object.

        Parameters
        ----------
        _id
            Identifier of the owning HDF5 object. Only passed on in the
            recursive call; not used otherwise.
        ret
            The raw value as read from the file.

        Returns
        -------
        any
            * ``""`` is returned unchanged.
            * A string starting with ``{`` is decoded as json.
            * A string enclosed in ``(...)`` or ``[...]`` is evaluated as a
              python literal.
            * If decoding/evaluating fails, the raw string is returned.
            * Any other string is wrapped in `AttributeString`.
            * An object-dtype numpy array is converted to the string of its
              list form and parsed again, unless it holds HDF5 object
              references.
            * Everything else is returned unchanged.
        """
        if isinstance(ret, str):
            if ret == "":
                return ret
            first_char = ret[0]
            if first_char == "{":
                try:
                    return json.loads(ret)
                except json.JSONDecodeError:
                    # a plain string that merely starts with "{"
                    return ret
            if first_char in ("(", "["):
                closing_char = ")" if first_char == "(" else "]"
                if ret[-1] == closing_char:
                    # might be a tuple or list object
                    try:
                        return ast.literal_eval(ret)
                    except (
                        ValueError,
                        TypeError,
                        SyntaxError,
                        NameError,
                        AttributeError,
                    ):
                        return ret
                return ret
            return AttributeString(ret)
        if isinstance(ret, np.ndarray) and ret.dtype.name == "object":
            vstr = str(ret.tolist())
            if "<HDF5 object reference>" in vstr:
                return ret
            return WrapperAttributeManager._parse_return_value(_id, vstr)
        return ret

    @with_phil
    def __getitem__(self, name: str):
        """Read attribute `name`.

        If the config option "expose_user_prop_to_attrs" is set and the
        current convention registers a property called `name` for the parent's
        class, the value is obtained from that property. Otherwise the raw
        value is post-processed by `_parse_return_value`.
        """
        ret = super(WrapperAttributeManager, self).__getitem__(name)
        parent = self._parent
        cv_props = parent._convention.properties
        if get_config("expose_user_prop_to_attrs") and parent.__class__ in cv_props:
            if name in cv_props[parent.__class__]:
                return cv_props[parent.__class__][name].get(parent)
        return WrapperAttributeManager._parse_return_value(self._id, ret)

    @with_phil
    def __delitem__(self, name):
        """Delete attribute `name` and the RDF information attached to it."""
        super().__delitem__(name)
        self._parent.rdf.delete(name)

    def create(
        self,
        name,
        data,
        shape=None,
        dtype=None,
        rdf_predicate: Union[str, rdflib.URIRef] = None,
        rdf_object: Optional[Union[str, rdflib.URIRef]] = None,
        frdf_predicate: Union[str, rdflib.URIRef] = None,
        frdf_object: Optional[Union[str, rdflib.URIRef]] = None,
        definition: Optional[str] = None,
        **kwargs,
    ) -> Any:
        """
        Create a new attribute.

        .. note:: Via the config setting "ignore_none" (`h5tbx.set_config(ignore_none=True)`) attribute values, that are None are not written.

        Parameters
        ----------
        name: str
            Name of the attribute.
        data: any
            Attribute value.
        shape: tuple, optional
            Shape of the attribute. If None, the shape is determined from the data.
        dtype:
            Data type of the attribute. If None, the data type is determined from the data.
        rdf_predicate: Union[str, rdflib.URIRef], optional
            IRI of the predicate
        rdf_object: Union[str, rdflib.URIRef], optional
            IRI of the object
        frdf_predicate: Union[str, rdflib.URIRef], optional
            IRI of the predicate, written via the parent's `frdf` accessor
        frdf_object: Union[str, rdflib.URIRef], optional
            IRI of the object, written via the parent's `frdf` accessor
        definition: str, optional
            Definition of the attribute
        **kwargs
            Only the deprecated keys "predicate" and "object" are evaluated.
            They override `rdf_predicate` and `rdf_object`, respectively.

        Returns
        -------
        any
            The return value of h5py's `AttributeManager.create`, or None if
            the attribute was skipped because of "ignore_none".

        Raises
        ------
        AttributeError
            If `frdf_predicate` or `frdf_object` is given but the parent has
            no `frdf` accessor.
        """
        if data is None and get_config("ignore_none"):
            logger.debug(
                f'Attribute "{name}" is None and "ignore_none" in config is True. Attribute is not created.'
            )
            return None

        r = super().create(
            name, utils.parse_object_for_attribute_setting(data), shape, dtype
        )

        # deprecated aliases take precedence over the new keyword arguments
        _predicate = kwargs.get("predicate", None)
        if _predicate is not None:
            rdf_predicate = _predicate
            warnings.warn(
                'The "predicate" argument is deprecated. Use "rdf_predicate" instead.',
                DeprecationWarning,
            )
        _object = kwargs.get("object", None)
        if _object is not None:
            rdf_object = _object
            warnings.warn(
                'The "object" argument is deprecated. Use "rdf_object" instead.',
                DeprecationWarning,
            )

        if rdf_predicate is not None:
            self._parent.rdf.predicate[name] = rdf_predicate
        if rdf_object is not None:
            self._parent.rdf.object[name] = rdf_object
        if definition is not None:
            self._parent.rdf[name].definition = definition

        _frdf = None
        if frdf_predicate is not None or frdf_object is not None:
            try:
                _frdf = self._parent.frdf
            except AttributeError as err:
                raise AttributeError(
                    f'You try to assign a rdf to the file level, however, "{self._parent.name}" is not the root level'
                ) from err
        if _frdf:
            if frdf_predicate is not None:
                _frdf.predicate[name] = frdf_predicate
            if frdf_object is not None:
                _frdf.object[name] = frdf_object
        return r

    @with_phil
    def __setitem__(
        self, name: Union[str, Tuple[str, str]], value, attrs: Optional[Dict] = None
    ):
        """Set a new attribute, overwriting any existing attribute.

        The type and shape of the attribute are determined from the data. To
        use a specific type or shape, or to preserve the type of attribute,
        use the methods create() and modify().

        Parameters
        ----------
        name : Union[str, Tuple[str, str]]
            Name of the attribute. If it is a tuple, the second element is the IRI of the attribute.
        value : any
            Attribute value. Can also be of type `Attribute` to set a value
            together with its RDF information, or a `rdflib.Literal`.
        attrs : Optional[Dict]
            The other attributes provided alongside this one. Passed on to the
            standard attribute setter and used to look up an alternative
            standard attribute.

        Raises
        ------
        ValueError
            If `name` is a tuple of length other than 2, or if a predicate is
            given both through an `Attribute` and through the tuple syntax.
        TypeError
            If `name` is neither a tuple nor a str.
        errors.StandardAttributeError
            If a mandatory standard attribute is missing and no alternative
            is provided.
        """
        if name == "_parent":
            return

        if isinstance(value, Attribute):
            object_iri = value.rdf_object
            predicate_iri = value.rdf_predicate
            fpredicate_iri = value.frdf_predicate
            frdf_object_iri = value.frdf_object
            attr_def = value.definition
            value = value.value
            if not isinstance(name, tuple):
                self.create(
                    name,
                    value,
                    rdf_predicate=predicate_iri,
                    frdf_predicate=fpredicate_iri,
                    rdf_object=object_iri,
                    frdf_object=frdf_object_iri,
                    definition=attr_def,
                )
        elif isinstance(value, rdflib.Literal):
            object_iri = value
            value = value.value
            self.create(
                name, value, rdf_predicate=None, rdf_object=object_iri, definition=None
            )
        else:
            object_iri = None
            predicate_iri = None
            attr_def = None

        if isinstance(name, tuple):
            # length must be 2, second element must be a IRI (not checked though)
            if not len(name) == 2:
                raise ValueError(
                    "Tuple must have length 2 in order to interpret it as an "
                    "attribute name and its IRI"
                )
            if predicate_iri is not None:
                raise ValueError(
                    "You cannot set the predicate iri at the same time by RDFAttribute and through "
                    "the tuple syntax."
                )
            _name, predicate_iri = name
            self.create(
                _name,
                value,
                rdf_predicate=predicate_iri,
                rdf_object=object_iri,
                definition=attr_def,
            )
            return

        if not isinstance(name, str):
            raise TypeError(f"Attribute name must be a str but got {type(name)}")

        curr_cv = self._parent._convention
        parent = self._parent
        if parent.__class__ in curr_cv.properties:
            sattr = curr_cv.properties[parent.__class__].get(name, None)
            if sattr is not None:
                logger.debug(f"validating {name} with {sattr}")
                if value is consts.DefaultValue.NONE:
                    # no value given and not mandatory. just not set it and do nothing
                    return
                # only compare strings: `array == "None"` has no unambiguous truth value
                if isinstance(value, str) and value == "None":
                    value = None
                if value is consts.DefaultValue.EMPTY:
                    # no value given, but is mandatory. check if there's an alternative
                    _alternative_sattr = sattr.alternative_standard_attribute
                    if _alternative_sattr is None:
                        raise errors.StandardAttributeError(
                            f'Convention "{curr_cv.name}" expects standard attribute "{name}" to be provided '
                            f"as an argument during {self._parent.__class__.__name__.lower()} creation."
                        )
                    if attrs[_alternative_sattr] is None:
                        other_provided_attrs = {
                            k: v
                            for k, v in attrs.items()
                            if v is not None
                            and not isinstance(v, consts._SpecialDefaults)
                        }
                        raise errors.StandardAttributeError(
                            f'Convention "{curr_cv.name}" expects standard attribute "{name}" to be provided '
                            f"as an argument during {self._parent.__class__.__name__.lower()} creation. Alternative "
                            f'standard attribute for it is "{_alternative_sattr}" but is not found in the other '
                            f"provided attributes: {other_provided_attrs}."
                        )
                    # the alternative is provided, so this attribute is not written
                    return
                if isinstance(value, consts.DefaultValue):
                    value = value.value
                return sattr.set(parent=parent, value=value, attrs=attrs)
        utils.create_special_attribute(self, name, value)

    def __repr__(self):
        """Return the representation of the h5py base class."""
        return super().__repr__()

    def __str__(self):
        """Return one "name value" line per attribute with the names padded to equal width.

        An empty string is returned if there are no attributes (`__str__`
        must never return None).
        """
        adict = dict(self.items())
        if not adict:
            return ""
        keylen = max(len(k) for k in adict)
        return "\n".join(f"{k:{keylen}} {v}" for k, v in adict.items())

    def __getattr__(self, item):
        """Return the attribute `item` via dot-access if "natural_naming" is enabled."""
        if get_config("natural_naming"):
            if item in self.__dict__:
                return super().__getattribute__(item)
            if item in self.keys():
                return self[item]
        return super().__getattribute__(item)

    def __setattr__(self, key, value):
        """Write an HDF5 attribute via dot-assignment if "natural_naming" is enabled.

        `_parent` and values of type `ObjectID` are set as regular python
        attributes on the manager.

        Raises
        ------
        RuntimeError
            If natural naming is disabled and `value` is not an `ObjectID`.
        """
        if key in ("_parent",):
            super().__setattr__(key, value)
            return
        if not isinstance(value, ObjectID):
            if get_config("natural_naming"):
                self.__setitem__(key, value)
                return
            raise RuntimeError(
                "Natural naming is disabled. Use the setitem method to set attributes."
            )
        super().__setattr__(key, value)

    def rename(self, key, new_name):
        """Rename an existing attribute.

        The value is written under `new_name` and the attribute `key` is
        deleted afterwards. Renaming an attribute to its current name is a
        no-op (otherwise the attribute would be written and then deleted).
        """
        tmp_val = self[key]  # raises if `key` does not exist
        if key == new_name:
            return
        self[new_name] = tmp_val
        assert tmp_val == self[new_name]
        del self[key]

    def sdump(self, show_private=True) -> None:
        """Print all attributes. Hides all attributes that start with __ and end with __ if show_private is False.

        Parameters
        ----------
        show_private : bool, optional
            If True, all attributes are shown, by default True. If False, all attributes that start with
            "__" and end with "__" are hidden.
        """
        first_line = f'Attributes of "{self._parent.name}":'
        print(first_line)
        print("-" * len(first_line))

        adict = dict(self.items())
        if not show_private:
            # one filter for both the column width and the printed lines
            adict = {
                k: v
                for k, v in adict.items()
                if not (k.startswith("__") and k.endswith("__"))
            }
        if not adict:
            return None
        keylen = max(len(k) for k in adict)
        for k, v in adict.items():
            print(f"{k:{keylen}}: {v}")

    @property
    def raw(self) -> AttributeManager:
        """Return the original h5py attribute object manager"""
        with phil:
            return AttributeManager(self._parent)

    def write_uuid(
        self,
        uuid: Optional[str] = None,
        name: Optional[str] = None,
        overwrite: bool = False,
    ) -> str:
        """Write an uuid to the attribute of the object.

        Parameters
        ----------
        uuid : str=None
            The uuid to write. If None, a new uuid is generated.
        name: str=None
            Name of the attribute. If None, the default name is taken from the configuration.
        overwrite: bool=False
            If the attribute already exists, it is not overwritten if overwrite is False.

        Returns
        -------
        str
            The uuid as string.

        Raises
        ------
        ValueError
            If the attribute already exists and `overwrite` is False.
        """
        if name is None:
            name = get_config("uuid_name")
        if name in self and not overwrite:
            raise ValueError(
                f'The attribute "{name}" cannot be written. It already exists and '
                '"overwrite" is set to False'
            )
        if uuid is None:
            from uuid import uuid4

            uuid = uuid4()
        suuid = str(uuid)
        self.create(name=name, data=suuid)
        return suuid

    def write_iso_timestamp(
        self,
        name="timestamp",
        dt: Optional[datetime] = None,
        overwrite: bool = False,
        **kwargs,
    ):
        """Write the iso timestamp to the attribute of the object.

        Parameters
        ----------
        name : str
            Name of the attribute, by default "timestamp".
        dt : Optional[datetime]
            The time to write. If None, `datetime.now()` is used.
        overwrite : bool
            If the attribute already exists, it is not overwritten if overwrite is False.
        **kwargs
            Passed on to `datetime.isoformat()`.

        Raises
        ------
        ValueError
            If the attribute already exists and `overwrite` is False.
        TypeError
            If `dt` is given but is not a `datetime`.
        """
        if name in self and not overwrite:
            raise ValueError(
                f'The attribute "{name}" cannot be written. It already exists and '
                '"overwrite" is set to False'
            )
        if dt is None:
            dt = datetime.now()
        elif not isinstance(dt, datetime):
            raise TypeError(
                f'Invalid type for parameter "dt". Expected type datetime but got "{type(dt)}"'
            )
        self.create(name=name, data=dt.isoformat(**kwargs))