swh.loader.core.loader module#

class swh.loader.core.loader.BaseLoader(storage: StorageInterface, origin_url: str, logging_class: str | None = None, save_data_path: str | None = None, max_content_size: int | None = None, lister_name: str | None = None, lister_instance_name: str | None = None, metadata_fetcher_credentials: Dict[str, Dict[str, List[Dict[str, str]]]] | None = None, create_partial_snapshot: bool = False)[source]#

Bases: object

Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, …) or PackageLoader (e.g PyPI, Npm, CRAN, …)

A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/… package artifacts), ingests the contents/directories/revisions/releases/snapshot read from those artifacts and send them to the archive through the storage backend.

The main entry point for the loader is the load() function.

2 static methods (from_config(), from_configfile()) centralizes and eases the loader instantiation from either configuration dict or configuration file.

Some class examples:

  • SvnLoader

  • GitLoader

  • PyPILoader

  • NpmLoader

Parameters:
  • lister_name – Name of the lister which triggered this load. If provided, the loader will try to use the forge’s API to retrieve extrinsic metadata

  • lister_instance_name – Name of the lister instance which triggered this load. Must be None iff lister_name is, but it may be the empty string for listers with a single instance.

visit_type: str#
origin: Origin#
loaded_snapshot_id: bytes | None#
parent_origins: List[Origin] | None#

If the given origin is a “forge fork” (ie. created with the “Fork” button of GitHub-like forges), build_extrinsic_origin_metadata() sets this to a list of origins it was forked from; closest parent first.

classmethod from_config(storage: Dict[str, Any], overrides: Dict[str, Any] | None = None, **extra_kwargs: Any)[source]#

Instantiate a loader from a configuration dict.

This is basically a backwards-compatibility shim for the CLI.

Parameters:
  • storage – instantiation config for the storage

  • overrides – A dict of extra configuration for loaders. Maps fully qualified class names (e.g. "swh.loader.git.loader.GitLoader") to a dict of extra keyword arguments to pass to this (and only this) loader.

  • extra_kwargs – all extra keyword arguments are passed to all loaders

Returns:

the instantiated loader

classmethod from_configfile(**kwargs: Any)[source]#

Instantiate a loader from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None.

Parameters:

kwargs – kwargs passed to the loader instantiation

save_data() None[source]#

Save the data associated to the current load

get_save_data_path() str[source]#

The path to which we archive the loader’s raw data

flush() Dict[str, int][source]#

Flush any potential buffered data not sent to swh-storage. Returns the same value as swh.storage.interface.StorageInterface.flush().

cleanup() None[source]#

Last step executed by the loader.

prepare() None[source]#
Second step executed by the loader to prepare some state needed by

the loader.

Raises

NotFound exception if the origin to ingest is not found.

get_origin() Origin[source]#

Get the origin that is currently being loaded. self.origin should be set in prepare_origin()

Returns:

an origin ready to be sent to storage by origin_add().

Return type:

dict

fetch_data() bool[source]#
Fetch the data from the source the loader is currently loading

(ex: git/hg/svn/… repository).

Returns:

a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading.

process_data() bool[source]#

Run any additional processing between fetching and storing the data

Returns:

a value that is interpreted as a boolean. If True, fetch_data() needs to be called again to complete loading. Ignored if fetch_data() already returned False.

store_data() None[source]#

Store fetched and processed data in the storage.

This should call the storage.<object>_add methods, which handle the objects to store in the storage.

load_status() Dict[str, str][source]#

Detailed loading status.

Defaults to logging an eventful load.

Returns: a dictionary that is eventually passed back as the task’s

result to the scheduler, allowing tuning of the task recurrence mechanism.

post_load(success: bool = True) None[source]#

Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading’s status.

Defaults to doing nothing.

This is up to the implementer of this method to make sure this does not break.

Parameters:

success (bool) – the success status of the loading

visit_status() str[source]#

Detailed visit status.

Defaults to logging a full visit.

pre_cleanup() None[source]#

As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues.

build_partial_snapshot() Snapshot | None[source]#

When the loader is configured to serialize partial snapshot, this allows the loader to give an implementation that builds a partial snapshot. This is used when the ingestion is taking multiple calls to fetch_data() and store_data(). Ignored when the loader is not configured to serialize partial snapshot.

load() Dict[str, str][source]#

Loading logic for the loader to follow:

load_metadata_objects(metadata_objects: List[RawExtrinsicMetadata]) None[source]#
build_extrinsic_origin_metadata() List[RawExtrinsicMetadata][source]#

Builds a list of full RawExtrinsicMetadata objects, using a metadata fetcher returned by get_fetcher_classes().

statsd_timed(name: str, tags: Dict[str, Any] = {}) ContextManager[source]#

Wrapper for swh.core.statsd.Statsd.timed(), which uses the standard metric name and tags for loaders.

statsd_timing(name: str, value: float, tags: Dict[str, Any] = {}) None[source]#

Wrapper for swh.core.statsd.Statsd.timing(), which uses the standard metric name and tags for loaders.

statsd_average(name: str, value: int | float, tags: Dict[str, Any] = {}) None[source]#

Increments both {name}_sum (by the value) and {name}_count (by 1), allowing to prometheus to compute the average value over time.

class swh.loader.core.loader.NodeLoader(storage: StorageInterface, url: str, checksums: Dict[str, str], checksums_computation: str | None = None, checksum_layout: str | None = None, fallback_urls: List[str] | None = None, **kwargs)[source]#

Bases: BaseLoader, ABC

Common abstract class for ContentLoader and Directoryloader.

The “checksums” field is a dictionary of hex hashes on the object retrieved (content or directory). When “checksum_layout” is “standard”, the checksums are computed on the content of the remote file to retrieve itself (as unix cli allows, “sha1sum”, “sha256sum”, …). When “checksum_layout” is “nar”, the checks is delegated to Nar class (which does an equivalent hash computation as the nix store –dump cli). It’s actually checksums on the content of the remote artifact retrieved (be it a file or an archive). Other “checksum_layout” will raise UnsupportedChecksumLayout.

The multiple “fallback” urls received are mirror urls only used to fetch the object if the main origin is no longer available. Those are not stored.

Ingestion is considered eventful on the first ingestion. Subsequent load of the same object should end up being an uneventful visit (matching snapshot).

extid_version = 1#
prepare() None[source]#
Second step executed by the loader to prepare some state needed by

the loader.

Raises

NotFound exception if the origin to ingest is not found.

load_status() Dict[str, Any][source]#

Detailed loading status.

Defaults to logging an eventful load.

Returns: a dictionary that is eventually passed back as the task’s

result to the scheduler, allowing tuning of the task recurrence mechanism.

cleanup() None[source]#

Last step executed by the loader.

abstract fetch_artifact() Iterator[Path][source]#

This fetches an artifact representation and yields its associated local representation (as Path). Depending on the implementation, this may yield contents coming from a remote location, or directories coming from tarball, svn tree, git tree, hg tree, …

Raises

NotFound if nothing is found; ValueError in case of mismatched checksums

abstract process_artifact(artifact_path: Path) None[source]#

Build the DAG objects out of the locally retrieved artifact.

fetch_data() bool[source]#

Fetch artifact (e.g. content, directory), checks and ingests the DAG objects coming from the artifact.

This iterates over the generator fetch_artifact() to retrieve artifact. As soon as one is retrieved and pass the checks (e.g. nar checks if the “checksum_layout” is “nar”), the method proceeds with the DAG ingestion as usual. If the artifact does not pass the check, this tries to retrieve the next mirrored artifact. If no artifacts is retrievable, this raises.

Raises

NotFound if no artifact is found; ValueError in case of mismatched checksums

store_extids(node: Content | Directory) None[source]#

Store the checksums provided as extids for node.

This stores as much ExtID types as there are keys in the provided self.checksums dict.

class swh.loader.core.loader.ContentLoader(*args, **kwargs)[source]#

Bases: NodeLoader

Basic loader for edge case content ingestion.

The output snapshot is of the form:

id: <bytes>
branches:
  HEAD:
    target_type: content
    target: <content-id>
visit_type: str = 'content'#
fetch_artifact() Iterator[Path][source]#

Iterates over the mirror urls to find a content.

Raises

NotFound if nothing is found; ValueError in case of any error when fetching/computing (length, checksums mismatched…)

process_artifact(artifact_path: Path)[source]#

Build the Content out of the remote artifact retrieved.

This needs to happen in this method because it’s within a context manager block.

process_data() bool[source]#

Build Snapshot out of the artifact retrieved.

store_data() None[source]#

Store newly retrieved Content and Snapshot.

visit_status()[source]#

Detailed visit status.

Defaults to logging a full visit.

class swh.loader.core.loader.BaseDirectoryLoader(*args, path_filter: ~typing.Callable[[bytes, bytes, ~typing.Iterable[bytes] | None], bool] = <function accept_all_paths>, dir_filter: None | str | ~typing.Callable[[bytes, bytes, ~typing.Iterable[bytes]], bool] = None, **kwargs)[source]#

Bases: NodeLoader

Abstract base Directory Loader for ‘tree’ ingestion (through any media).

Implementations should inherit from this class and provide the:

  • required fetch_artifact() method to retrieve the Directory (from the proper media protocol, e.g. git, svn, hg, …)

  • optional build_snapshot() method to build the Snapshot with the proper structure if the default is not enough.

visit_type: str = 'directory'#
path_filter(path: bytes, name: bytes, entries: Iterable[bytes] | None) bool[source]#
process_artifact(artifact_path: Path) None[source]#

Build the Directory and other DAG objects out of the remote artifact retrieved (self.artifact_path).

This needs to happen in this method because it’s within a context manager block.

build_snapshot() Snapshot[source]#

Build and return the snapshot to store in the archive.

By default, this builds the snapshot with the structure:

id: <bytes>
branches:
  HEAD:
    target_type: directory
    target: <directory-id>

Other directory loader implementations could override this method to build a more specific snapshot.

store_data() None[source]#

Store newly retrieved Content and Snapshot.

visit_status()[source]#

Detailed visit status.

Defaults to logging a full visit.

class swh.loader.core.loader.TarballDirectoryLoader(*args, path_filter: ~typing.Callable[[bytes, bytes, ~typing.Iterable[bytes] | None], bool] = <function accept_all_paths>, dir_filter: None | str | ~typing.Callable[[bytes, bytes, ~typing.Iterable[bytes]], bool] = None, **kwargs)[source]#

Bases: BaseDirectoryLoader

TarballDirectoryLoader in charge of ingesting Directory coming from a tarball.

visit_type: str = 'tarball-directory'#
fetch_artifact() Iterator[Path][source]#

Iterates over the mirror urls to find a directory packaged in a tarball.

Raises

NotFound if nothing is found; ValueError in case of any error when fetching/computing (length, checksums mismatched…)