"""Download utilities for h5rdmtoolbox."""
import atexit
import json
import pathlib
import time
import uuid
from typing import Dict, Optional, Union, List
import requests
from pydantic import HttpUrl, validate_call
from rdflib.plugins.shared.jsonld.context import Context
from .file_io import (
_request_with_backoff,
_download_file,
get_checksum,
)
from .. import user, USER_DATA_DIR
from .._version import __version__
# Package-wide logger, read as the ``logger`` attribute of the top-level
# ``h5rdmtoolbox`` package (looked up by name via ``__import__``).
logger = __import__("h5rdmtoolbox", fromlist=["logger"]).logger

# HTTP header dict carrying a "User-Agent" that names this library, its
# version and the project URL. Not referenced by the code visible in this
# part of the file.
USER_AGENT_HEADER = {
    "User-Agent": f"h5rdmtoolbox/{__version__} (https://github.com/matthiasprobst/h5rdmtoolbox)",
}

# URL that legacy metadata4ing context URLs are rewritten to before
# downloading (see ``_normalize_context_url``).
M4I_CONTEXT_URL = "https://nfdi4ing.pages.rwth-aachen.de/metadata4ing/metadata4ing/ontology.jsonld"

# Older metadata4ing URLs that are still accepted as input and mapped to
# ``M4I_CONTEXT_URL``.
M4I_LEGACY_CONTEXT_URLS = {
    "https://w3id.org/nfdi4ing/metadata4ing/m4i_context.jsonld",
    "https://w3id.org/nfdi4ing/metadata4ing/ontology.jsonld",
}

# Every URL (current and legacy) for which the built-in fallback context below
# is available.
M4I_CONTEXT_URLS = {M4I_CONTEXT_URL, *M4I_LEGACY_CONTEXT_URLS}

# Built-in JSON-LD term -> IRI mapping used for the metadata4ing URLs above.
# ``_context_fallback_for_url`` hands out copies of this dict.
M4I_CONTEXT_FALLBACK = {
    # Default vocabulary and namespace prefixes.
    "@vocab": "http://w3id.org/nfdi4ing/metadata4ing#",
    "m4i": "http://w3id.org/nfdi4ing/metadata4ing#",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    # Generic annotation terms.
    "label": "http://www.w3.org/2000/01/rdf-schema#label",
    "description": "http://purl.org/dc/terms/description",
    # Class terms ("ProcessingStep" and "processing step" map to the same IRI).
    "ProcessingStep": "http://w3id.org/nfdi4ing/metadata4ing#ProcessingStep",
    "processing step": "http://w3id.org/nfdi4ing/metadata4ing#ProcessingStep",
    "Tool": "http://w3id.org/nfdi4ing/metadata4ing#Tool",
    "Method": "http://w3id.org/nfdi4ing/metadata4ing#Method",
    "NumericalVariable": "http://w3id.org/nfdi4ing/metadata4ing#NumericalVariable",
    "FileSet": "http://w3id.org/nfdi4ing/metadata4ing#FileSet",
    # Property terms.
    "has participant": "http://w3id.org/nfdi4ing/metadata4ing#hasParticipant",
    "start time": "http://w3id.org/nfdi4ing/metadata4ing#startTime",
    "has employed tool": "http://w3id.org/nfdi4ing/metadata4ing#hasEmployedTool",
    "realizes method": "http://w3id.org/nfdi4ing/metadata4ing#realizesMethod",
    "investigates": "http://w3id.org/nfdi4ing/metadata4ing#investigates",
    "investigatesProperty": "http://w3id.org/nfdi4ing/metadata4ing#investigatesProperty",
    "has input": "http://purl.obolibrary.org/obo/RO_0002233",
    "has output": "http://purl.obolibrary.org/obo/RO_0002234",
    "has ORCID ID": "http://w3id.org/nfdi4ing/metadata4ing#orcidId",
    "first name": "http://xmlns.com/foaf/0.1/firstName",
    "last name": "http://xmlns.com/foaf/0.1/lastName",
    "has parameter": "http://w3id.org/nfdi4ing/metadata4ing#hasParameter",
    "has kind of quantity": "http://w3id.org/nfdi4ing/metadata4ing#hasKindOfQuantity",
    "has numerical value": "http://w3id.org/nfdi4ing/metadata4ing#hasNumericalValue",
    "has unit": "http://w3id.org/nfdi4ing/metadata4ing#hasUnit",
    "includes": "http://w3id.org/nfdi4ing/metadata4ing#includes",
}
def _normalize_context_url(url: str) -> str:
    """Map a legacy metadata4ing context URL to the current one.

    Parameters
    ----------
    url : str
        Context URL as given by the caller.

    Returns
    -------
    str
        ``M4I_CONTEXT_URL`` if ``url`` is one of ``M4I_LEGACY_CONTEXT_URLS``,
        otherwise ``url`` unchanged.
    """
    return M4I_CONTEXT_URL if url in M4I_LEGACY_CONTEXT_URLS else url
def _context_fallback_for_url(original_url: str, resolved_url: str) -> Optional[Dict]:
    """Return a copy of the built-in fallback context for a known m4i URL.

    Parameters
    ----------
    original_url : str
        URL as passed in by the caller.
    resolved_url : str
        URL after normalization.

    Returns
    -------
    dict or None
        A fresh shallow copy of ``M4I_CONTEXT_FALLBACK`` if either URL is in
        ``M4I_CONTEXT_URLS``; ``None`` when neither URL is known.
    """
    if original_url not in M4I_CONTEXT_URLS and resolved_url not in M4I_CONTEXT_URLS:
        return None
    # Hand out a copy so callers cannot mutate the module-level constant.
    return M4I_CONTEXT_FALLBACK.copy()
def _write_context_fallback(context_file: pathlib.Path, fallback_context: Dict) -> None:
with open(context_file, "w", encoding="utf-8") as f:
json.dump({"@context": fallback_context}, f)
def _has_valid_context_cache(context_file: pathlib.Path) -> bool:
if not context_file.exists():
return False
try:
with open(context_file, encoding="utf-8") as f:
json.load(f)
except (OSError, json.JSONDecodeError):
return False
return True
def _handle_transient_context_download_error(
        *,
        context_file: pathlib.Path,
        original_url: str,
        resolved_url: str,
        reason: str,
) -> None:
    """Recover from a transient download failure, or raise if that is impossible.

    The recovery order is: keep an existing valid cache file; otherwise write
    the built-in fallback context (if one exists for the URL) to the cache
    file; otherwise delete the cache file and raise.

    Parameters
    ----------
    context_file : pathlib.Path
        Cache file belonging to the context.
    original_url : str
        URL as passed in by the caller.
    resolved_url : str
        URL that was actually requested.
    reason : str
        Short description of the failure, used in the warning log message.

    Raises
    ------
    RuntimeError
        If neither a valid cache file nor a built-in fallback is available.
    """
    if _has_valid_context_cache(context_file):
        logger.warning("Using cached context file after %s while downloading %s", reason, resolved_url)
    else:
        builtin_context = _context_fallback_for_url(original_url, resolved_url)
        if builtin_context is None:
            # Nothing usable: drop whatever is left in the cache and give up.
            context_file.unlink(missing_ok=True)
            raise _context_download_error(original_url, resolved_url, "download")
        logger.warning("Using built-in context fallback after %s while downloading %s", reason, resolved_url)
        _write_context_fallback(context_file, builtin_context)
def _context_download_error(original_url: str, resolved_url: str, reason: str = "download") -> RuntimeError:
if original_url == resolved_url:
return RuntimeError(f"Failed to {reason} context file from {resolved_url}")
return RuntimeError(f"Failed to {reason} context file from {original_url} via {resolved_url}")
def download_context(
        url_source: Union[HttpUrl, List[HttpUrl]],
        force_download: bool = False,
        *,
        timeout: int = 30,
        max_retries: int = 8,
) -> Context:
    """Download one or more JSON-LD context files and return an RDFLib Context.

    Context files are cached in ``UserDir["cache"]`` using the final URL path
    component as filename. Existing cache files are reused unless
    ``force_download`` is True or the cached file cannot be parsed as JSON,
    in which case it is downloaded again.

    If a transient download failure occurs, the function tries to keep context
    resolution usable without hiding unrelated errors. For ``requests.Timeout``
    and retryable HTTP responses (500, 502, 503, 504), an existing valid cached
    context file is used. If no valid cache file exists and the URL has a
    built-in fallback context, that fallback is written to the cache and used.
    Currently, built-in fallback data is provided for known metadata4ing/m4i
    context URLs.

    Non-transient request errors, non-retryable HTTP errors, and transient
    failures without a valid cache or built-in fallback raise ``RuntimeError``.
    Partial cache files are removed on non-fallback failures.

    Parameters
    ----------
    url_source : HttpUrl or List[HttpUrl]
        URL or list (or tuple) of URLs to download from.
    force_download : bool, optional
        Force download even if a cache file exists, by default False.
    timeout : int, optional
        Request timeout in seconds, by default 30.
    max_retries : int, optional
        Maximum number of retries for retryable HTTP responses, by default 8.

    Returns
    -------
    Context
        RDFLib Context object built from the downloaded, cached, or fallback
        context data.

    Raises
    ------
    RuntimeError
        If a context cannot be downloaded and no valid cache or built-in
        fallback can be used.

    Examples
    --------
    >>> from h5rdmtoolbox.utils import download_context
    >>> context = download_context('https://raw.githubusercontent.com/codemeta/codemeta/2.0/codemeta.jsonld')
    """
    # Accept a single URL as well as a list/tuple of URLs.
    if not isinstance(url_source, (list, tuple)):
        url_source = [url_source]

    context_sources = []
    context_cache = {}
    for url in url_source:
        original_url = str(url)
        _url = _normalize_context_url(original_url)
        _fname = _url.rsplit("/", 1)[-1]
        context_file = user.UserDir["cache"] / _fname

        # A cache file that exists but is not parsable JSON (e.g. left over
        # from an interrupted write) is treated like a missing one; otherwise
        # every later call would fail in json.load until the user deletes the
        # file by hand.
        if force_download or not _has_valid_context_cache(context_file):
            logger.debug("Downloading context file from %s to %s", _url, context_file)
            try:
                r = _request_with_backoff(
                    "GET", _url, timeout=timeout, max_retries=max_retries
                )
                r.raise_for_status()
                with open(context_file, "wb") as f:
                    f.write(r.content)
            except requests.Timeout:
                # Must precede RequestException: Timeout is a subclass of it.
                _handle_transient_context_download_error(
                    context_file=context_file,
                    original_url=original_url,
                    resolved_url=_url,
                    reason="timeout",
                )
            except requests.HTTPError as exc:
                status_code = getattr(exc.response, "status_code", None)
                if status_code in {500, 502, 503, 504}:
                    # Server-side hiccup: fall back to cache/built-in context.
                    _handle_transient_context_download_error(
                        context_file=context_file,
                        original_url=original_url,
                        resolved_url=_url,
                        reason=f"HTTP {status_code}",
                    )
                else:
                    context_file.unlink(missing_ok=True)
                    raise _context_download_error(original_url, _url, "download") from exc
            except requests.RequestException as exc:
                context_file.unlink(missing_ok=True)
                raise _context_download_error(original_url, _url, "download") from exc

        with open(context_file, encoding="utf-8") as f:
            context_data = json.load(f)

        # NOTE(review): for known m4i URLs the built-in mapping always replaces
        # whatever was read from the cache file. Presumably intentional (the
        # remote document may not be a plain context) -- confirm.
        fallback_context = _context_fallback_for_url(original_url, _url)
        if fallback_context is not None:
            context_data = fallback_context

        context_sources.append(context_data)
        context_cache[_url] = context_data

    context = Context(context_sources)
    # Pre-seed the context's cache with the resolved URL -> data mapping.
    context._context_cache.update(context_cache)
    return context
def download_file(
        url,
        known_hash=None,
        target_folder: Optional[pathlib.Path] = None,
        checksum: Optional[str] = None,
        params: Optional[Dict] = None,
):
    """Download a file, or return it if it has been downloaded before.

    Convenience wrapper that forwards all arguments to
    ``DownloadFileManager.download``.

    Parameters
    ----------
    url : str
        URL to download from.
    known_hash : str, optional
        Expected SHA256 hash of the file.
    target_folder : pathlib.Path, optional
        Folder in which the file is stored.
    checksum : str, optional
        Checksum of the file.
    params : dict, optional
        Query parameters for the request.

    Returns
    -------
    pathlib.Path
        Path to the downloaded file.
    """
    manager = DownloadFileManager()
    downloaded_path = manager.download(
        url,
        target_folder=target_folder,
        known_hash=known_hash,
        checksum=checksum,
        params=params,
    )
    return downloaded_path
# (stray "[docs]" link marker from the rendered documentation removed; it is not part of the module)
class DownloadFileManager:
    """Manager for downloading files. By registering checksums and filenames, the manager can be used to
    download files from a remote location. The manager will check if the file is already downloaded and if the
    checksum matches. If the file is not downloaded, it will be downloaded and the checksum will be checked.

    The registry maps a checksum to a dict with the keys ``"url"``,
    ``"filepath"`` and ``"filename"`` and is persisted as JSON in
    ``registry_filename``.

    This class is a singleton, hence only one instance can be created."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            # object.__new__ accepts no extra arguments, so none are forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        from ..user import CACHE_DIR

        self.file_directory = CACHE_DIR
        self.file_directory.mkdir(parents=True, exist_ok=True)
        self.registry: Dict[str, Dict[str, str]] = self.load_registry()
        # __init__ runs on every ``DownloadFileManager()`` call because the
        # class is a singleton; register the exit hook only once so handlers
        # do not pile up.
        if not getattr(self, "_atexit_registered", False):
            atexit.register(self.save_registry)
            self._atexit_registered = True

    def __len__(self):
        return len(self.registry)

    def __repr__(self):
        return f"{self.__class__.__name__}({self.file_directory})"

    @property
    def registry_filename(self) -> pathlib.Path:
        """Path of the JSON file in which the registry is persisted."""
        return USER_DATA_DIR / "download_registry.json"

    def add(
            self,
            *,
            url: str,
            filepath: pathlib.Path,
            filename: str,
            checksum: Optional[str] = None,
    ):
        """Add to registry. Computes the checksum if not provided.

        Parameters
        ----------
        url : str
            URL the file was downloaded from.
        filepath : pathlib.Path
            Path to the file.
        filename : str
            Original filename.
        checksum : str, optional
            SHA256 checksum of the file.

        Raises
        ------
        FileNotFoundError
            If ``filepath`` does not exist.
        """
        filepath = pathlib.Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError(f"File {filepath} does not exist!")
        if checksum is None:
            checksum = get_checksum(filepath)
            logger.debug(f"Checksum for {filepath} computed: {checksum}")
        self.registry[checksum] = {
            "url": str(url),
            "filepath": str(filepath.resolve()),
            "filename": filename,
        }
        self.save_registry()

    def get(self, checksum: str, filename: str) -> Optional[pathlib.Path]:
        """Returns the file path from the registry based on checksum and filename.

        Parameters
        ----------
        checksum : str
            SHA256 checksum.
        filename : str
            Filename to look up.

        Returns
        -------
        pathlib.Path or None
            Path to the file if it is registered under this checksum and
            filename and still exists on disk, None otherwise.
        """
        entry = self.registry.get(checksum)
        if entry and entry.get("filename") == filename:
            path = pathlib.Path(entry["filepath"])
            if path.exists():
                return path
        return None

    def remove(self, checksum: str, filename: str):
        """Removes a file from the registry based on checksum and filename.

        Only the registry entry is removed; the file itself is not deleted.

        Parameters
        ----------
        checksum : str
            SHA256 checksum.
        filename : str
            Filename to remove.
        """
        entry = self.registry.get(checksum)
        if entry and entry.get("filename") == filename:
            self.registry.pop(checksum)
            self.save_registry()
            logger.info(f"File removed: {filename} with checksum: {checksum}")
        else:
            logger.warning(f"No entry found for: {filename} with checksum: {checksum}")

    def remove_corrupted_file(self, filename: pathlib.Path):
        """Removes a corrupted file from the registry.

        Every entry whose stored file path equals ``filename`` is dropped and
        the registry is saved if anything changed. The file itself is not
        deleted.

        Parameters
        ----------
        filename : pathlib.Path
            Path to the corrupted file.
        """
        logger.info(f"Removing corrupted file from registry: {filename}")
        target = pathlib.Path(filename)
        # Collect first, then pop: the dict must not change size while it is
        # being iterated. Entries without a "filepath" are skipped.
        remove_keys = [
            key
            for key, entry in self.registry.items()
            if entry.get("filepath") is not None
            and pathlib.Path(entry["filepath"]) == target
        ]
        for key in remove_keys:
            self.registry.pop(key)
        if remove_keys:
            self.save_registry()

    def save_registry(self):
        """Save the registry to disk.

        Retries up to ten times (0.1 s apart) if the file cannot be written
        due to a ``PermissionError``. Gives up with a warning instead of
        raising.
        """
        max_tries = 10
        self.registry_filename.parent.mkdir(parents=True, exist_ok=True)
        for _ in range(max_tries):
            try:
                with open(self.registry_filename, "w", encoding="utf-8") as f:
                    json.dump(self.registry, f, indent=2)
                return
            except PermissionError:
                logger.debug("Could not save registry. Trying again in 0.1s")
                time.sleep(0.1)
        # Best effort only: losing the registry merely means re-downloading.
        logger.warning(
            f"Could not save registry after {max_tries} tries. File seems to be locked."
        )

    def load_registry(self) -> Dict[str, Dict[str, str]]:
        """Load the registry from disk.

        A registry file that cannot be decoded, or that does not contain a
        JSON object, is deleted and an empty registry is returned.

        Returns
        -------
        Dict
            Registry dictionary (empty if no usable registry file exists).
        """
        registry_filename = self.registry_filename
        if not registry_filename.exists():
            return {}
        try:
            with open(registry_filename, "r", encoding="utf-8") as f:
                data = json.load(f)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(
                f"Could not load registry file {registry_filename}: {e}. Deleting the file."
            )
            registry_filename.unlink(missing_ok=True)
            return {}
        if not isinstance(data, dict):
            # Valid JSON but not a mapping: unusable as a registry.
            logger.error(
                f"Could not load registry file {registry_filename}: "
                f"expected a JSON object. Deleting the file."
            )
            registry_filename.unlink(missing_ok=True)
            return {}
        return data

    def reset_registry(self):
        """Resets the registry. This will also delete the downloaded files."""
        for entry in self.registry.values():
            fpath = entry.get("filepath", None)
            if fpath:
                pathlib.Path(fpath).unlink(missing_ok=True)
        self.registry_filename.unlink(missing_ok=True)
        self.registry = self.load_registry()

    @validate_call
    def download(
            self,
            url: HttpUrl,
            *,
            target_folder: Optional[pathlib.Path] = None,
            params: Optional[Dict] = None,
            checksum: Optional[str] = None,
            known_hash: Optional[str] = None,
    ) -> pathlib.Path:
        """Returns the downloaded file. Based on an optionally provided checksum
        already downloaded files can be quickly returned.

        After a successful download the file is registered under its checksum
        (computed if not provided) together with URL, path and filename, and
        the registry is saved.

        Parameters
        ----------
        url : HttpUrl
            URL to download from.
        target_folder : pathlib.Path, optional
            Target folder to save the file.
        params : dict, optional
            Query parameters for the request.
        checksum : str, optional
            SHA256 checksum.
        known_hash : str, optional
            Known hash for verification.

        Returns
        -------
        pathlib.Path
            Path to the downloaded file.
        """
        from .file_io import sanitize_filename

        if checksum and checksum in self.registry:
            filepath = pathlib.Path(self.registry[checksum]["filepath"])
            if filepath.exists():
                logger.debug("Returning already downloaded file")
                return filepath
            # Registered file vanished from disk: forget it and download again.
            self.registry.pop(checksum)

        filename = sanitize_filename(str(url).rsplit("/", 1)[-1])
        if filename == "":
            # URL has no usable last path component; invent a unique name.
            filename = uuid.uuid4().hex

        if target_folder is None:
            file_path = self.file_directory / filename
        else:
            file_path = pathlib.Path(target_folder) / filename

        downloaded_filename = _download_file(
            url, known_hash, target=file_path, params=params
        )
        # Internal invariant of _download_file, not input validation.
        assert downloaded_filename == file_path, (
            f"Expected {file_path}, got {downloaded_filename}"
        )
        if not checksum:
            checksum = get_checksum(downloaded_filename)
        # Store "filename" too, so get()/remove() can find downloaded files,
        # and persist right away like add() does.
        self.registry[checksum] = {
            "url": str(url),
            "filepath": str(downloaded_filename.absolute().resolve()),
            "filename": filename,
        }
        self.save_registry()
        return downloaded_filename