# Source code for h5rdmtoolbox.utils.download

"""Download utilities for h5rdmtoolbox."""

import atexit
import json
import pathlib
import time
import uuid
from typing import Dict, Optional, Union, List

import requests
from pydantic import HttpUrl, validate_call
from rdflib.plugins.shared.jsonld.context import Context

from .file_io import (
    _request_with_backoff,
    _download_file,
    get_checksum,
)
from .. import user, USER_DATA_DIR
from .._version import __version__

# Reuse the package-level logger: ``__import__`` with a non-empty ``fromlist``
# returns the ``h5rdmtoolbox`` package itself, whose ``logger`` attribute is read.
logger = __import__("h5rdmtoolbox", fromlist=["logger"]).logger
# HTTP header identifying this package (name, version, repository URL).
USER_AGENT_HEADER = {
    "User-Agent": f"h5rdmtoolbox/{__version__} (https://github.com/matthiasprobst/h5rdmtoolbox)",
}

# URL that the legacy metadata4ing context URLs below are normalized to
# (see ``_normalize_context_url``).
M4I_CONTEXT_URL = "https://nfdi4ing.pages.rwth-aachen.de/metadata4ing/metadata4ing/ontology.jsonld"
# Older context URLs that are rewritten to ``M4I_CONTEXT_URL`` before download.
M4I_LEGACY_CONTEXT_URLS = {
    "https://w3id.org/nfdi4ing/metadata4ing/m4i_context.jsonld",
    "https://w3id.org/nfdi4ing/metadata4ing/ontology.jsonld",
}
# Every URL (current and legacy) for which the built-in fallback context applies.
M4I_CONTEXT_URLS = {M4I_CONTEXT_URL, *M4I_LEGACY_CONTEXT_URLS}
# Built-in term -> IRI mapping returned by ``_context_fallback_for_url`` for the
# URLs in ``M4I_CONTEXT_URLS``.
M4I_CONTEXT_FALLBACK = {
    "@vocab": "http://w3id.org/nfdi4ing/metadata4ing#",
    "m4i": "http://w3id.org/nfdi4ing/metadata4ing#",
    "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    "label": "http://www.w3.org/2000/01/rdf-schema#label",
    "description": "http://purl.org/dc/terms/description",
    "ProcessingStep": "http://w3id.org/nfdi4ing/metadata4ing#ProcessingStep",
    "processing step": "http://w3id.org/nfdi4ing/metadata4ing#ProcessingStep",
    "Tool": "http://w3id.org/nfdi4ing/metadata4ing#Tool",
    "Method": "http://w3id.org/nfdi4ing/metadata4ing#Method",
    "NumericalVariable": "http://w3id.org/nfdi4ing/metadata4ing#NumericalVariable",
    "FileSet": "http://w3id.org/nfdi4ing/metadata4ing#FileSet",
    "has participant": "http://w3id.org/nfdi4ing/metadata4ing#hasParticipant",
    "start time": "http://w3id.org/nfdi4ing/metadata4ing#startTime",
    "has employed tool": "http://w3id.org/nfdi4ing/metadata4ing#hasEmployedTool",
    "realizes method": "http://w3id.org/nfdi4ing/metadata4ing#realizesMethod",
    "investigates": "http://w3id.org/nfdi4ing/metadata4ing#investigates",
    "investigatesProperty": "http://w3id.org/nfdi4ing/metadata4ing#investigatesProperty",
    "has input": "http://purl.obolibrary.org/obo/RO_0002233",
    "has output": "http://purl.obolibrary.org/obo/RO_0002234",
    "has ORCID ID": "http://w3id.org/nfdi4ing/metadata4ing#orcidId",
    "first name": "http://xmlns.com/foaf/0.1/firstName",
    "last name": "http://xmlns.com/foaf/0.1/lastName",
    "has parameter": "http://w3id.org/nfdi4ing/metadata4ing#hasParameter",
    "has kind of quantity": "http://w3id.org/nfdi4ing/metadata4ing#hasKindOfQuantity",
    "has numerical value": "http://w3id.org/nfdi4ing/metadata4ing#hasNumericalValue",
    "has unit": "http://w3id.org/nfdi4ing/metadata4ing#hasUnit",
    "includes": "http://w3id.org/nfdi4ing/metadata4ing#includes",
}


def _normalize_context_url(url: str) -> str:
    """Map a legacy metadata4ing context URL onto the current one.

    Parameters
    ----------
    url : str
        Context URL as requested by the caller.

    Returns
    -------
    str
        ``M4I_CONTEXT_URL`` if ``url`` is one of ``M4I_LEGACY_CONTEXT_URLS``,
        otherwise ``url`` unchanged.
    """
    return M4I_CONTEXT_URL if url in M4I_LEGACY_CONTEXT_URLS else url


def _context_fallback_for_url(original_url: str, resolved_url: str) -> Optional[Dict]:
    """Return the built-in fallback context for known metadata4ing URLs.

    Parameters
    ----------
    original_url : str
        URL as requested by the caller.
    resolved_url : str
        URL after normalization.

    Returns
    -------
    dict or None
        A fresh copy of ``M4I_CONTEXT_FALLBACK`` if either URL is listed in
        ``M4I_CONTEXT_URLS``; ``None`` otherwise.
    """
    candidates = (original_url, resolved_url)
    if not any(candidate in M4I_CONTEXT_URLS for candidate in candidates):
        return None
    # Hand out a copy so callers cannot mutate the module-level mapping.
    return M4I_CONTEXT_FALLBACK.copy()


def _write_context_fallback(context_file: pathlib.Path, fallback_context: Dict) -> None:
    """Write a fallback context to the cache file as a JSON-LD document.

    Parameters
    ----------
    context_file : pathlib.Path
        Cache file to (over)write.
    fallback_context : Dict
        Context mapping; it is stored under the ``"@context"`` key.
    """
    document = {"@context": fallback_context}
    with open(context_file, mode="w", encoding="utf-8") as handle:
        json.dump(document, handle)


def _has_valid_context_cache(context_file: pathlib.Path) -> bool:
    """Check whether a cached context file exists and contains parseable JSON.

    Parameters
    ----------
    context_file : pathlib.Path
        Path of the cached context file.

    Returns
    -------
    bool
        True if the file exists and can be read and parsed as UTF-8 encoded
        JSON, False otherwise. The parsed content itself is discarded.
    """
    if not context_file.exists():
        return False
    try:
        with open(context_file, encoding="utf-8") as f:
            json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError: a truncated/binary cache file is not valid UTF-8.
        # It is a ValueError, not an OSError, so it must be listed explicitly;
        # otherwise this probe would raise instead of reporting "not valid".
        return False
    return True


def _handle_transient_context_download_error(
        *,
        context_file: pathlib.Path,
        original_url: str,
        resolved_url: str,
        reason: str,
) -> None:
    """Recover from a transient download failure, or raise if that is impossible.

    Recovery is attempted in this order: keep an existing valid cache file,
    then write the built-in fallback context (if one exists for the URL) to the
    cache file. If neither is possible, the cache file is removed and an error
    is raised.

    Parameters
    ----------
    context_file : pathlib.Path
        Cache file for the context.
    original_url : str
        URL as requested by the caller.
    resolved_url : str
        URL that was actually requested after normalization.
    reason : str
        Short description of the failure; only used in the log message.

    Raises
    ------
    RuntimeError
        If there is neither a valid cache file nor a built-in fallback.
    """
    if _has_valid_context_cache(context_file):
        logger.warning("Using cached context file after %s while downloading %s", reason, resolved_url)
        return

    builtin_context = _context_fallback_for_url(original_url, resolved_url)
    if builtin_context is None:
        # Nothing usable: drop any partial cache file and report the failure.
        context_file.unlink(missing_ok=True)
        raise _context_download_error(original_url, resolved_url, "download")

    logger.warning("Using built-in context fallback after %s while downloading %s", reason, resolved_url)
    _write_context_fallback(context_file, builtin_context)


def _context_download_error(original_url: str, resolved_url: str, reason: str = "download") -> RuntimeError:
    """Build (but do not raise) the error for a failed context retrieval.

    Parameters
    ----------
    original_url : str
        URL as requested by the caller.
    resolved_url : str
        URL that was actually used.
    reason : str, optional
        Verb inserted into the message ("Failed to <reason> ..."), by default
        "download".

    Returns
    -------
    RuntimeError
        Error whose message names the resolved URL and, if it differs, the
        original URL as well.
    """
    if original_url == resolved_url:
        origin = resolved_url
    else:
        origin = f"{original_url} via {resolved_url}"
    return RuntimeError(f"Failed to {reason} context file from {origin}")


def download_context(
        url_source: Union[HttpUrl, List[HttpUrl]],
        force_download: bool = False,
        *,
        timeout: int = 30,
        max_retries: int = 8,
) -> Context:
    """Build an RDFLib ``Context`` from one or more JSON-LD context URLs.

    Each URL is first normalized (legacy metadata4ing URLs are mapped to the
    current one). The context file is cached in ``UserDir["cache"]`` under the
    last path component of the normalized URL and is only downloaded when no
    cache file exists or ``force_download`` is True.

    Failure handling during download:

    * ``requests.Timeout`` and HTTP status 500, 502, 503 or 504 are treated as
      transient: an existing valid cache file is kept; otherwise, if a built-in
      fallback exists for the URL, it is written to the cache file.
    * Any other HTTP error or request error, and transient failures without
      cache or fallback, remove the cache file and raise ``RuntimeError``.

    For URLs that have a built-in fallback (currently the known
    metadata4ing/m4i context URLs), the built-in mapping is used as context
    data after the cache file has been read.

    Parameters
    ----------
    url_source : HttpUrl or List[HttpUrl]
        URL or list of URLs to download from.
    force_download : bool, optional
        Download even if a cache file exists, by default False.
    timeout : int, optional
        Request timeout in seconds, by default 30.
    max_retries : int, optional
        Maximum number of retries passed to the request helper, by default 8.

    Returns
    -------
    Context
        RDFLib Context built from the collected context data, one source per
        URL in input order.

    Raises
    ------
    RuntimeError
        If a context cannot be downloaded and neither a valid cache file nor a
        built-in fallback is available.

    Examples
    --------
    >>> from h5rdmtoolbox.utils import download_context
    >>> context = download_context('https://raw.githubusercontent.com/codemeta/codemeta/2.0/codemeta.jsonld')
    """
    urls = url_source if isinstance(url_source, list) else [url_source]
    transient_statuses = {500, 502, 503, 504}

    loaded_contexts = []
    contexts_by_url = {}
    for entry in urls:
        requested_url = str(entry)
        resolved_url = _normalize_context_url(requested_url)
        cache_path = user.UserDir["cache"] / resolved_url.rsplit("/", 1)[-1]

        if not cache_path.exists() or force_download:
            logger.debug(f"Downloading context file from {resolved_url} to {cache_path}")
            try:
                response = _request_with_backoff(
                    "GET", resolved_url, timeout=timeout, max_retries=max_retries
                )
                response.raise_for_status()
                with open(cache_path, "wb") as sink:
                    sink.write(response.content)
            except requests.Timeout:
                _handle_transient_context_download_error(
                    context_file=cache_path,
                    original_url=requested_url,
                    resolved_url=resolved_url,
                    reason="timeout",
                )
            except requests.HTTPError as http_error:
                code = getattr(http_error.response, "status_code", None)
                if code not in transient_statuses:
                    # Non-retryable HTTP error: no cache/fallback recovery.
                    cache_path.unlink(missing_ok=True)
                    raise _context_download_error(requested_url, resolved_url, "download")
                _handle_transient_context_download_error(
                    context_file=cache_path,
                    original_url=requested_url,
                    resolved_url=resolved_url,
                    reason=f"HTTP {code}",
                )
            except requests.RequestException:
                cache_path.unlink(missing_ok=True)
                raise _context_download_error(requested_url, resolved_url, "download")

        with open(cache_path, encoding="utf-8") as source:
            data = json.load(source)
        # A built-in fallback, when available, takes the place of the file content.
        builtin_context = _context_fallback_for_url(requested_url, resolved_url)
        if builtin_context is not None:
            data = builtin_context
        loaded_contexts.append(data)
        contexts_by_url[resolved_url] = data

    result = Context(loaded_contexts)
    result._context_cache.update(contexts_by_url)
    return result


def download_file(
        url,
        known_hash=None,
        target_folder: Optional[pathlib.Path] = None,
        checksum: Optional[str] = None,
        params: Optional[Dict] = None,
):
    """Download a file through the shared ``DownloadFileManager``.

    Thin convenience wrapper: all arguments are forwarded to
    ``DownloadFileManager.download``, which may return an already downloaded
    file instead of downloading again.

    Parameters
    ----------
    url : str
        URL to download from.
    known_hash : str, optional
        Expected hash of the file, forwarded to the manager.
    target_folder : pathlib.Path, optional
        Folder in which the file is stored.
    checksum : str, optional
        Checksum of the file, used by the manager to look up earlier downloads.
    params : dict, optional
        Query parameters for the request.

    Returns
    -------
    pathlib.Path
        Path to the downloaded file.
    """
    options = {
        "target_folder": target_folder,
        "known_hash": known_hash,
        "checksum": checksum,
        "params": params,
    }
    return DownloadFileManager().download(url, **options)


class DownloadFileManager:
    """Manager for downloading files.

    By registering checksums and filenames, the manager can be used to download
    files from a remote location. Registered files are looked up by checksum so
    that an already downloaded file can be returned without downloading again.
    The registry is stored as JSON in ``registry_filename``.

    This class is a singleton: every ``DownloadFileManager()`` call returns the
    same instance. Note that ``__init__`` still runs on every call and reloads
    the registry from disk, which is why all methods that modify the registry
    also persist it.
    """

    _instance = None
    # Becomes True on the shared instance once ``save_registry`` has been
    # registered with ``atexit``, so repeated instantiation does not stack up
    # duplicate exit handlers.
    _atexit_registered = False

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        """Set up the download directory and (re)load the registry from disk."""
        from ..user import CACHE_DIR

        self.file_directory = CACHE_DIR
        self.file_directory.mkdir(parents=True, exist_ok=True)
        self.registry: Dict[str, Dict[str, str]] = self.load_registry()
        if not self._atexit_registered:
            atexit.register(self.save_registry)
            self._atexit_registered = True

    def __len__(self):
        """Return the number of registry entries."""
        return len(self.registry)

    def __repr__(self):
        return f"{self.__class__.__name__}({self.file_directory})"

    @property
    def registry_filename(self) -> pathlib.Path:
        """Path of the JSON file in which the registry is stored."""
        return USER_DATA_DIR / "download_registry.json"

    def add(
            self,
            *,
            url: str,
            filepath: pathlib.Path,
            filename: str,
            checksum: Optional[str] = None,
    ):
        """Add to registry. Computes the checksum if not provided.

        Parameters
        ----------
        url : str
            URL the file was downloaded from.
        filepath : pathlib.Path
            Path to the file.
        filename : str
            Original filename.
        checksum : str, optional
            SHA256 checksum of the file.

        Raises
        ------
        FileNotFoundError
            If ``filepath`` does not exist.
        """
        filepath = pathlib.Path(filepath)
        if not filepath.exists():
            raise FileNotFoundError(f"File {filepath} does not exist!")

        if checksum is None:
            checksum = get_checksum(filepath)
            logger.debug(f"Checksum for {filepath} computed: {checksum}")

        self.registry[checksum] = {
            "url": str(url),
            "filepath": str(filepath.resolve()),
            "filename": filename,
        }
        self.save_registry()

    def get(self, checksum: str, filename: str) -> Optional[pathlib.Path]:
        """Returns the file path from the registry based on checksum and filename.

        Parameters
        ----------
        checksum : str
            SHA256 checksum.
        filename : str
            Filename to look up.

        Returns
        -------
        pathlib.Path or None
            Path to the file if an entry with matching checksum and filename
            exists and the file is present on disk, None otherwise.
        """
        entry = self.registry.get(checksum)
        if entry and entry.get("filename") == filename:
            path = pathlib.Path(entry["filepath"])
            if path.exists():
                return path
        return None

    def remove(self, checksum: str, filename: str):
        """Removes a file from the registry based on checksum and filename.

        Only the registry entry is removed; the file itself is not deleted.

        Parameters
        ----------
        checksum : str
            SHA256 checksum.
        filename : str
            Filename to remove.
        """
        entry = self.registry.get(checksum)
        if entry and entry.get("filename") == filename:
            self.registry.pop(checksum)
            self.save_registry()
            logger.info(f"File removed: {filename} with checksum: {checksum}")
        else:
            logger.warning(f"No entry found for: {filename} with checksum: {checksum}")

    def remove_corrupted_file(self, filename: pathlib.Path):
        """Removes a corrupted file from the registry.

        All entries whose stored file path equals ``filename`` are dropped and
        the registry is persisted if anything changed.

        Parameters
        ----------
        filename : pathlib.Path
            Path to the corrupted file.
        """
        logger.info(f"Removing corrupted file from registry: {filename}")
        target = pathlib.Path(filename)
        # Entries without a "filepath" cannot match and are skipped
        # (pathlib.Path(None) would raise TypeError).
        stale_keys = [
            key
            for key, entry in self.registry.items()
            if entry.get("filepath") is not None
            and pathlib.Path(entry["filepath"]) == target
        ]
        for key in stale_keys:
            self.registry.pop(key)
        if stale_keys:
            # Persist, otherwise the removal is undone when the registry is
            # reloaded on the next instantiation.
            self.save_registry()

    def save_registry(self):
        """Save the registry to disk.

        Retries up to 10 times (0.1 s apart) on ``PermissionError``; if the file
        still cannot be written, this is only logged.
        """
        max_tries = 10
        n_tries = 0
        self.registry_filename.parent.mkdir(parents=True, exist_ok=True)
        while n_tries < max_tries:
            try:
                with open(self.registry_filename, "w") as f:
                    json.dump(self.registry, f, indent=2)
                return
            except PermissionError:
                logger.debug("Could not save registry. Trying again in 0.1s")
                n_tries += 1
                time.sleep(0.1)
        logger.debug(
            f"Could not save registry after {max_tries} tries. File seems to be locked."
        )

    def load_registry(self) -> Dict[str, Dict[str, str]]:
        """Load the registry from disk.

        A registry file that is not valid JSON is deleted.

        Returns
        -------
        Dict
            Registry dictionary; empty if no (valid) registry file exists.
        """
        registry_filename = self.registry_filename
        if registry_filename.exists():
            try:
                with open(registry_filename, "r") as f:
                    return json.load(f)
            except json.JSONDecodeError as e:
                logger.error(
                    f"Could not load registry file {registry_filename}: {e}. Deleting the file."
                )
                registry_filename.unlink()
        return {}

    def reset_registry(self):
        """Resets the registry. This will also delete the downloaded files."""
        for entry in self.registry.values():
            fpath = entry.get("filepath", None)
            if fpath:
                pathlib.Path(fpath).unlink(missing_ok=True)
        self.registry_filename.unlink(missing_ok=True)
        self.registry = self.load_registry()

    @validate_call
    def download(
            self,
            url: HttpUrl,
            *,
            target_folder: Optional[pathlib.Path] = None,
            params: Optional[Dict] = None,
            checksum: Optional[str] = None,
            known_hash: Optional[str] = None,
    ) -> pathlib.Path:
        """Returns the downloaded file.

        Based on an optionally provided checksum already downloaded files can be
        quickly returned. After a download the file is registered under its
        checksum (computed if not provided) together with URL, path and
        filename, and the registry is saved.

        Parameters
        ----------
        url : HttpUrl
            URL to download from.
        target_folder : pathlib.Path, optional
            Target folder to save the file. Defaults to the manager's
            ``file_directory``.
        params : dict, optional
            Query parameters for the request.
        checksum : str, optional
            SHA256 checksum.
        known_hash : str, optional
            Known hash for verification, forwarded to the download routine.

        Returns
        -------
        pathlib.Path
            Path to the downloaded file.
        """
        from .file_io import sanitize_filename

        if checksum and checksum in self.registry:
            filepath = pathlib.Path(self.registry[checksum]["filepath"])
            if filepath.exists():
                logger.debug("Returning already downloaded file")
                return filepath
            # The registered file vanished: forget the entry and download again.
            self.registry.pop(checksum)

        filename = sanitize_filename(str(url).rsplit("/", 1)[-1])
        if filename == "":
            # URL ends with "/" (or sanitizes to nothing): use a random name.
            filename = uuid.uuid4().hex
        assert len(filename) > 0, f"Could not extract filename from URL {url}"

        if target_folder is None:
            file_path = self.file_directory / filename
        else:
            file_path = pathlib.Path(target_folder) / filename

        downloaded_filename = _download_file(
            url, known_hash, target=file_path, params=params
        )
        assert downloaded_filename == file_path, (
            f"Expected {file_path}, got {downloaded_filename}"
        )

        if not checksum:
            checksum = get_checksum(downloaded_filename)
        self.registry[checksum] = {
            "url": str(url),
            "filepath": str(downloaded_filename.absolute().resolve()),
            # Stored so that get()/remove(), which match on filename, can find
            # files registered through download() as well.
            "filename": filename,
        }
        # Persist right away: __init__ reloads the registry from disk on every
        # DownloadFileManager() call, which would drop an unsaved entry.
        self.save_registry()
        return downloaded_filename