swh.loader.core.loader module#

class swh.loader.core.loader.BaseLoader(storage: StorageInterface, origin_url: str, logging_class: Optional[str] = None, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, lister_name: Optional[str] = None, lister_instance_name: Optional[str] = None, metadata_fetcher_credentials: Optional[Dict[str, Dict[str, List[Dict[str, str]]]]] = None, create_partial_snapshot: bool = False)[source]#

Bases: object

Base class for (D)VCS loaders (e.g. Svn, Git, Mercurial, …) and for PackageLoader subclasses (e.g. PyPI, Npm, CRAN, …).

A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/… package artifacts), ingests the contents/directories/revisions/releases/snapshots read from those artifacts, and sends them to the archive through the storage backend.

The main entry point for the loader is the load() function.

Two class methods (from_config(), from_configfile()) centralize and ease loader instantiation from either a configuration dict or a configuration file.

Some class examples:

  • SvnLoader

  • GitLoader

  • PyPILoader

  • NpmLoader

Parameters:
  • lister_name – Name of the lister which triggered this load. If provided, the loader will try to use the forge’s API to retrieve extrinsic metadata

  • lister_instance_name – Name of the lister instance which triggered this load. Must be None iff lister_name is, but it may be the empty string for listers with a single instance.

visit_type: str#
origin: Origin#
loaded_snapshot_id: Optional[bytes]#
parent_origins: Optional[List[Origin]]#

If the given origin is a “forge fork” (i.e. created with the “Fork” button of GitHub-like forges), build_extrinsic_origin_metadata() sets this to a list of origins it was forked from; closest parent first.

classmethod from_config(storage: Dict[str, Any], **config: Any)[source]#

Instantiate a loader from a configuration dict.

This is basically a backwards-compatibility shim for the CLI.

Parameters:
  • storage – instantiation config for the storage

  • config – the configuration dict for the loader, with the following keys:

      • credentials (optional): credentials list for the scheduler

      • any other keys are passed as kwargs to the loader

Returns:

the instantiated loader
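As an illustration of the shape from_config() expects, here is a sketch of the two dicts involved. The concrete key values below ("memory", the path, the size limit) are assumptions for the example, not defaults of the library:

```python
# Illustrative configuration for BaseLoader.from_config(); the
# concrete values below are made up for the example.
storage_config = {
    "cls": "memory",  # instantiation config for the storage backend
}

loader_config = {
    # optional credentials list for the scheduler
    "credentials": {},
    # any other keys are forwarded as kwargs to the loader, e.g.:
    "save_data_path": "/tmp/loader-data",
    "max_content_size": 100_000_000,
}

# A concrete loader subclass would then be instantiated as:
#   loader = SomeLoader.from_config(storage=storage_config, **loader_config)
print(sorted(loader_config))
```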

classmethod from_configfile(**kwargs: Any)[source]#

Instantiate a loader from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None.

Parameters:

kwargs – kwargs passed to the loader instantiation

save_data() None[source]#

Save the data associated with the current load

get_save_data_path() str[source]#

The path to which we archive the loader’s raw data

flush() Dict[str, int][source]#

Flush any potential buffered data not sent to swh-storage. Returns the same value as swh.storage.interface.StorageInterface.flush().

cleanup() None[source]#

Last step executed by the loader.

prepare() None[source]#

Second step executed by the loader, to prepare some state needed by the loader.

Raises:

NotFound exception if the origin to ingest is not found.

get_origin() Origin[source]#

Get the origin that is currently being loaded. self.origin should be set in prepare_origin()

Returns:

an origin ready to be sent to storage by origin_add().

Return type:

Origin

fetch_data() bool[source]#

Fetch the data from the source the loader is currently loading (e.g. a git/hg/svn/… repository).

Returns:

a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading.

process_data() bool[source]#

Run any additional processing between fetching and storing the data

Returns:

a value that is interpreted as a boolean. If True, fetch_data() needs to be called again to complete loading. Ignored if fetch_data() already returned False.
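The interplay between these return values can be sketched as a driver loop. This is a simplified illustration with a made-up ToyLoader, not the actual implementation:

```python
# Simplified sketch of how a driver loop combines fetch_data(),
# process_data() and store_data(); ToyLoader is hypothetical.
class ToyLoader:
    def __init__(self, chunks):
        self.chunks = list(chunks)
        self.stored = []
        self.current = None

    def fetch_data(self):
        # Fetch one chunk; returning True means "call me again".
        self.current = self.chunks.pop(0)
        return bool(self.chunks)

    def process_data(self):
        self.current = self.current.upper()
        return True  # ignored once fetch_data() returned False

    def store_data(self):
        self.stored.append(self.current)

loader = ToyLoader(["a", "b", "c"])
more_data = True
while more_data:
    more_data = loader.fetch_data()
    # process_data()'s verdict only matters while fetch_data() wants more:
    more_data = loader.process_data() and more_data
    loader.store_data()

print(loader.stored)  # → ['A', 'B', 'C']
```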

store_data() None[source]#

Store fetched and processed data in the storage.

This should call the storage.<object>_add methods, which handle the objects to store in the storage.

load_status() Dict[str, str][source]#

Detailed loading status.

Defaults to logging an eventful load.

Returns:

a dictionary that is eventually passed back as the task’s result to the scheduler, allowing tuning of the task recurrence mechanism.

post_load(success: bool = True) None[source]#

Allows the loader to perform additional actions after the loading is done, depending on its status. The success flag indicates whether the loading succeeded.

Defaults to doing nothing.

It is up to the implementer of this method to make sure it does not break.

Parameters:

success (bool) – the success status of the loading

visit_status() str[source]#

Detailed visit status.

Defaults to logging a full visit.

pre_cleanup() None[source]#

As a first step, tries to check for dangling data to clean up. Implementations should do their best to avoid raising issues here.

build_partial_snapshot() Optional[Snapshot][source]#

When the loader is configured to serialize partial snapshots, this allows the loader to provide an implementation that builds a partial snapshot. This is used when the ingestion takes multiple calls to fetch_data() and store_data(). Ignored when the loader is not configured to serialize partial snapshots.

load() Dict[str, str][source]#

Main loading logic the loader follows.
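The overall sequence load() drives through the hooks documented above can be sketched as follows. This is a hypothetical minimal re-implementation for illustration; the real method also handles origin visits, errors, partial snapshots and statsd metrics:

```python
# Hypothetical sketch of the load() sequence using stub hooks;
# real loaders inherit the actual logic from BaseLoader.
class SketchLoader:
    def __init__(self):
        self.calls = []

    def pre_cleanup(self):
        self.calls.append("pre_cleanup")

    def prepare(self):
        self.calls.append("prepare")

    def fetch_data(self):
        self.calls.append("fetch_data")
        return False  # no more data after one pass

    def process_data(self):
        self.calls.append("process_data")
        return True

    def store_data(self):
        self.calls.append("store_data")

    def cleanup(self):
        self.calls.append("cleanup")

    def load_status(self):
        return {"status": "eventful"}

    def load(self):
        self.pre_cleanup()
        self.prepare()
        more_data = True
        while more_data:
            more_data = self.fetch_data()
            more_data = self.process_data() and more_data
            self.store_data()
        self.cleanup()
        return self.load_status()

loader = SketchLoader()
result = loader.load()
print(result)  # → {'status': 'eventful'}
```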

load_metadata_objects(metadata_objects: List[RawExtrinsicMetadata]) None[source]#
build_extrinsic_origin_metadata() List[RawExtrinsicMetadata][source]#

Builds a list of full RawExtrinsicMetadata objects, using a metadata fetcher returned by get_fetcher_classes().

statsd_timed(name: str, tags: Dict[str, Any] = {}) AbstractContextManager[source]#

Wrapper for swh.core.statsd.Statsd.timed(), which uses the standard metric name and tags for loaders.

statsd_timing(name: str, value: float, tags: Dict[str, Any] = {}) None[source]#

Wrapper for swh.core.statsd.Statsd.timing(), which uses the standard metric name and tags for loaders.

statsd_average(name: str, value: Union[int, float], tags: Dict[str, Any] = {}) None[source]#

Increments both {name}_sum (by the value) and {name}_count (by 1), allowing Prometheus to compute the average value over time.
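The sum/count pattern behind this method can be sketched with plain counters. The dict below stands in for the statsd/Prometheus backend; the metric name is made up:

```python
# Sketch of the sum/count pattern: emitting {name}_sum and
# {name}_count lets Prometheus derive an average, e.g. as
# rate(name_sum) / rate(name_count). Counters simulated as a dict.
counters = {}

def statsd_average(name, value):
    counters[name + "_sum"] = counters.get(name + "_sum", 0) + value
    counters[name + "_count"] = counters.get(name + "_count", 0) + 1

for batch_size in (10, 30, 20):
    statsd_average("batch_size", batch_size)

average = counters["batch_size_sum"] / counters["batch_size_count"]
print(average)  # → 20.0
```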

class swh.loader.core.loader.NodeLoader(storage: StorageInterface, url: str, checksums: Dict[str, str], checksums_computation: str = 'standard', fallback_urls: Optional[List[str]] = None, **kwargs)[source]#

Bases: BaseLoader

Common class for ContentLoader and DirectoryLoader.

The “checksums” field is a dictionary of hex hashes of the object to retrieve (content or directory). When “checksums_computation” is “standard”, the checksums are computed directly on the content of the remote file to retrieve (as the unix CLI tools “sha1sum”, “sha256sum”, … do). When “checksums_computation” is “nar”, the computation is delegated to the nix-store --dump command: the checksums are then computed on the NAR serialization of the retrieved artifact. Any other “checksums_computation” value raises UnsupportedChecksumComputation.

The multiple “fallback” urls received are mirror urls, used only to fetch the object if the main origin is no longer available. They are not stored.

Ingestion is considered eventful on the first load. Subsequent loads of the same object should result in an uneventful visit (matching snapshot).
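A “standard” checksums dict can be built with hashlib, computing hex digests directly on the raw bytes of the remote file (equivalent to the sha1sum/sha256sum CLI tools). The payload below is made up for the example:

```python
import hashlib

# Sketch of a "standard" checksums dict as NodeLoader expects:
# hex digests of the raw remote bytes. The payload is made up.
payload = b"hello"

checksums = {
    "sha256": hashlib.sha256(payload).hexdigest(),
    "sha1": hashlib.sha1(payload).hexdigest(),
}

print(checksums["sha256"])
# → 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
```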

prepare() None[source]#

Second step executed by the loader, to prepare some state needed by the loader.

Raises:

NotFound exception if the origin to ingest is not found.

load_status() Dict[str, Any][source]#

Detailed loading status.

Defaults to logging an eventful load.

Returns:

a dictionary that is eventually passed back as the task’s result to the scheduler, allowing tuning of the task recurrence mechanism.

cleanup() None[source]#

Last step executed by the loader.

visit_type: str#
origin: Origin#
loaded_snapshot_id: Optional[bytes]#
parent_origins: Optional[List[Origin]]#

If the given origin is a “forge fork” (i.e. created with the “Fork” button of GitHub-like forges), build_extrinsic_origin_metadata() sets this to a list of origins it was forked from; closest parent first.

class swh.loader.core.loader.ContentLoader(*args, **kwargs)[source]#

Bases: NodeLoader

Basic loader for edge case content ingestion.

The output snapshot is of the form:

id: <bytes>
branches:
  HEAD:
    target_type: content
    target: <content-id>
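The single-branch shape above can be sketched as a plain mapping; the 20-byte content id below is a made-up placeholder, not a real SWHID:

```python
# Sketch of the single-branch snapshot a ContentLoader produces;
# the content id is a made-up 20-byte placeholder.
content_id = bytes.fromhex("aa" * 20)

snapshot_branches = {
    b"HEAD": {
        "target_type": "content",
        "target": content_id,
    },
}

print(snapshot_branches[b"HEAD"]["target_type"])  # → content
```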
visit_type: str = 'content'#
fetch_data() bool[source]#

Retrieve the content file as a Content Object

process_data() bool[source]#

Build the snapshot out of the Content retrieved.

store_data() None[source]#

Store newly retrieved Content and Snapshot.

visit_status()[source]#

Detailed visit status.

Defaults to logging a full visit.

origin: Origin#
loaded_snapshot_id: Optional[bytes]#
parent_origins: Optional[List[Origin]]#

If the given origin is a “forge fork” (i.e. created with the “Fork” button of GitHub-like forges), build_extrinsic_origin_metadata() sets this to a list of origins it was forked from; closest parent first.

snapshot: Optional[Snapshot]#
mirror_urls: List[str]#
class swh.loader.core.loader.DirectoryLoader(*args, **kwargs)[source]#

Bases: NodeLoader

Basic loader for edge case directory ingestion (through one tarball).

The output snapshot is of the form:

id: <bytes>
branches:
  HEAD:
    target_type: directory
    target: <directory-id>
visit_type: str = 'directory'#
fetch_data() bool[source]#

Fetch directory as a tarball amongst the self.mirror_urls.

Raises:

NotFound if no tarball is found.
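The mirror-fallback behaviour described here can be sketched as trying each url in turn. The NotFound class, fetch_tarball helper and its results below are stand-ins for this example, not the library's API:

```python
# Sketch of mirror fallback: try each url in turn, raise NotFound
# if none yields the tarball. All names here are hypothetical.
class NotFound(Exception):
    pass

def fetch_tarball(url, available):
    """Pretend download: succeeds only for urls listed as available."""
    if url in available:
        return b"tarball-bytes-from-" + url.encode()
    raise NotFound(url)

def fetch_from_mirrors(urls, available):
    for url in urls:
        try:
            return fetch_tarball(url, available)
        except NotFound:
            continue  # fall back to the next mirror
    raise NotFound(f"no tarball found amongst {urls}")

data = fetch_from_mirrors(
    ["https://main.example/t.tgz", "https://mirror.example/t.tgz"],
    available={"https://mirror.example/t.tgz"},
)
print(data)  # → b'tarball-bytes-from-https://mirror.example/t.tgz'
```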

origin: Origin#
loaded_snapshot_id: Optional[bytes]#
parent_origins: Optional[List[Origin]]#

If the given origin is a “forge fork” (i.e. created with the “Fork” button of GitHub-like forges), build_extrinsic_origin_metadata() sets this to a list of origins it was forked from; closest parent first.

snapshot: Optional[Snapshot]#
mirror_urls: List[str]#
process_data() bool[source]#

Build the snapshot out of the Directory retrieved.

store_data() None[source]#

Store newly retrieved Content and Snapshot.

visit_status()[source]#

Detailed visit status.

Defaults to logging a full visit.