Source code for swh.loader.core.loader

# Copyright (C) 2015-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import hashlib
import logging
import os
from typing import Any, Dict, Iterable, Optional

from swh.core.config import load_from_envvar
from swh.loader.exception import NotFound
from swh.model.model import (
    BaseContent,
    Content,
    Directory,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Release,
    Revision,
    Sha1Git,
    SkippedContent,
    Snapshot,
)
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
from swh.storage.utils import now

DEFAULT_CONFIG: Dict[str, Any] = {
    "max_content_size": 100 * 1024 * 1024,
}


[docs]class BaseLoader: """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g PyPI, Npm, CRAN, ...) A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... package artifacts), ingests the contents/directories/revisions/releases/snapshot read from those artifacts and send them to the archive through the storage backend. The main entry point for the loader is the :func:`load` function. 2 static methods (:func:`from_config`, :func:`from_configfile`) centralizes and eases the loader instantiation from either configuration dict or configuration file. Some class examples: - :class:`SvnLoader` - :class:`GitLoader` - :class:`PyPILoader` - :class:`NpmLoader` """ visit_date: Optional[datetime.datetime] origin: Optional[Origin] origin_metadata: Dict[str, Any] loaded_snapshot_id: Optional[Sha1Git] def __init__( self, storage: StorageInterface, logging_class: Optional[str] = None, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, ): super().__init__() self.storage = storage self.max_content_size = int(max_content_size) if max_content_size else None if logging_class is None: logging_class = "%s.%s" % ( self.__class__.__module__, self.__class__.__name__, ) self.log = logging.getLogger(logging_class) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) # possibly overridden in self.prepare method self.visit_date = None self.origin = None if not hasattr(self, "visit_type"): self.visit_type: Optional[str] = None self.origin_metadata = {} self.loaded_snapshot_id = None if save_data_path: path = save_data_path os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) self.save_data_path = save_data_path
[docs] @classmethod def from_config(cls, storage: Dict[str, Any], **config: Any): """Instantiate a loader from a configuration dict. This is basically a backwards-compatibility shim for the CLI. Args: storage: instantiation config for the storage config: the configuration dict for the loader, with the following keys: - credentials (optional): credentials list for the scheduler - any other kwargs passed to the loader. Returns: the instantiated loader """ # Drop the legacy config keys which aren't used for this generation of loader. for legacy_key in ("storage", "celery"): config.pop(legacy_key, None) # Instantiate the storage storage_instance = get_storage(**storage) return cls(storage=storage_instance, **config)
[docs] @classmethod def from_configfile(cls, **kwargs: Any): """Instantiate a loader from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to the loader instantiation """ config = dict(load_from_envvar(DEFAULT_CONFIG)) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(**config)
[docs] def save_data(self) -> None: """Save the data associated to the current load""" raise NotImplementedError
[docs] def get_save_data_path(self) -> str: """The path to which we archive the loader's raw data""" if not hasattr(self, "__save_data_path"): year = str(self.visit_date.year) # type: ignore assert self.origin url = self.origin.url.encode("utf-8") origin_url_hash = hashlib.sha1(url).hexdigest() path = "%s/sha1:%s/%s/%s" % ( self.save_data_path, origin_url_hash[0:2], origin_url_hash, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path
[docs] def flush(self) -> None: """Flush any potential buffered data not sent to swh-storage. """ self.storage.flush()
[docs] def cleanup(self) -> None: """Last step executed by the loader. """ raise NotImplementedError
[docs] def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ raise NotImplementedError
def _store_origin_visit(self) -> None: """Store origin and visit references. Sets the self.visit references. """ assert self.origin self.storage.origin_add([self.origin]) if not self.visit_date: # now as default visit_date if not provided self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) assert isinstance(self.visit_date, datetime.datetime) assert isinstance(self.visit_type, str) self.visit = list( self.storage.origin_visit_add( [ OriginVisit( origin=self.origin.url, date=self.visit_date, type=self.visit_type, ) ] ) )[0]
[docs] def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. Raises NotFound exception if the origin to ingest is not found. """ raise NotImplementedError
[docs] def get_origin(self) -> Origin: """Get the origin that is currently being loaded. self.origin should be set in :func:`prepare_origin` Returns: dict: an origin ready to be sent to storage by :func:`origin_add`. """ assert self.origin return self.origin
[docs] def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading (ex: git/hg/svn/... repository). Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ raise NotImplementedError
[docs] def store_data(self): """Store fetched data in the database. Should call the :func:`maybe_load_xyz` methods, which handle the bundles sent to storage, rather than send directly. """ raise NotImplementedError
[docs] def store_metadata(self) -> None: """Store fetched metadata in the database. For more information, see implementation in :class:`DepositLoader`. """ pass
[docs] def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { "status": "eventful", }
[docs] def post_load(self, success: bool = True) -> None: """Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading's status. Defaults to doing nothing. This is up to the implementer of this method to make sure this does not break. Args: success (bool): the success status of the loading """ pass
[docs] def visit_status(self) -> str: """Detailed visit status. Defaults to logging a full visit. """ return "full"
[docs] def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ pass
[docs] def load(self) -> Dict[str, str]: r"""Loading logic for the loader to follow: - 1. Call :meth:`prepare_origin_visit` to prepare the origin and visit we will associate loading data to - 2. Store the actual ``origin_visit`` to storage - 3. Call :meth:`prepare` to prepare any eventual state - 4. Call :meth:`get_origin` to get the origin we work with and store - while True: - 5. Call :meth:`fetch_data` to fetch the data to store - 6. Call :meth:`store_data` to store the data - 7. Call :meth:`cleanup` to clean up any eventual state put in place in :meth:`prepare` method. """ try: self.pre_cleanup() except Exception: msg = "Cleaning up dangling data failed! Continue loading." self.log.warning(msg) self.prepare_origin_visit() self._store_origin_visit() assert ( self.origin ), "The method `prepare_origin_visit` call should set the origin (Origin)" assert ( self.visit.visit ), "The method `_store_origin_visit` should set the visit (OriginVisit)" self.log.info( "Load origin '%s' with type '%s'", self.origin.url, self.visit.type ) try: self.prepare() while True: more_data_to_fetch = self.fetch_data() self.store_data() if not more_data_to_fetch: break self.store_metadata() visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), status=self.visit_status(), snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load() except Exception as e: if isinstance(e, NotFound): status = "not_found" task_status = "uneventful" else: status = "partial" if self.loaded_snapshot_id else "failed" task_status = "failed" self.log.exception( "Loading failure, updating to `%s` status", status, extra={ "swh_task_args": [], "swh_task_kwargs": {"origin": self.origin.url}, }, ) visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), status=status, snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load(success=False) return {"status": task_status} finally: self.flush() self.cleanup() return self.load_status()
[docs]class DVCSLoader(BaseLoader): """This base class is a pattern for dvcs loaders (e.g. git, mercurial). Those loaders are able to load all the data in one go. For example, the loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), inherit directly from :class:`BaseLoader`. """
[docs] def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass
[docs] def has_contents(self) -> bool: """Checks whether we need to load contents""" return True
[docs] def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded""" raise NotImplementedError
[docs] def has_directories(self) -> bool: """Checks whether we need to load directories""" return True
[docs] def get_directories(self) -> Iterable[Directory]: """Get the directories that need to be loaded""" raise NotImplementedError
[docs] def has_revisions(self) -> bool: """Checks whether we need to load revisions""" return True
[docs] def get_revisions(self) -> Iterable[Revision]: """Get the revisions that need to be loaded""" raise NotImplementedError
[docs] def has_releases(self) -> bool: """Checks whether we need to load releases""" return True
[docs] def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded""" raise NotImplementedError
[docs] def get_snapshot(self) -> Snapshot: """Get the snapshot that needs to be loaded""" raise NotImplementedError
[docs] def eventful(self) -> bool: """Whether the load was eventful""" raise NotImplementedError
[docs] def store_data(self) -> None: assert self.origin if self.save_data_path: self.save_data() if self.has_contents(): for obj in self.get_contents(): if isinstance(obj, Content): self.storage.content_add([obj]) elif isinstance(obj, SkippedContent): self.storage.skipped_content_add([obj]) else: raise TypeError(f"Unexpected content type: {obj}") if self.has_directories(): for directory in self.get_directories(): self.storage.directory_add([directory]) if self.has_revisions(): for revision in self.get_revisions(): self.storage.revision_add([revision]) if self.has_releases(): for release in self.get_releases(): self.storage.release_add([release]) snapshot = self.get_snapshot() self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id