swh.loader.core package

Submodules

swh.loader.core.converters module

Convert objects to dictionaries suitable for swh.storage

swh.loader.core.converters.content_for_storage(content, log=None, max_content_size=None, origin_url=None)[source]

Prepare content to be ready for storage

Note: ‘data’ is returned only if max_content_size is not reached.

Returns: content with added data (or reason for being missing)
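
The behaviour described above can be pictured with a minimal sketch. This is not the library's implementation: the function name content_for_storage_sketch and the dictionary keys 'length', 'status', 'reason' and 'origin' are assumptions made for illustration only.

```python
def content_for_storage_sketch(content, max_content_size=None, origin_url=None):
    """Return a copy of `content` prepared for storage (illustrative only)."""
    ret = dict(content)
    # Assumption: the content dict carries its size under the 'length' key.
    if max_content_size is not None and ret["length"] > max_content_size:
        # Too large: drop the payload and record why it is missing.
        ret.pop("data", None)
        ret.update({
            "status": "absent",             # hypothetical key and value
            "reason": "Content too large",  # hypothetical key and value
            "origin": origin_url,           # hypothetical key
        })
        return ret
    ret["status"] = "visible"               # hypothetical key and value
    return ret


small = content_for_storage_sketch({"length": 3, "data": b"abc"}, max_content_size=10)
large = content_for_storage_sketch(
    {"length": 30, "data": b"x" * 30},
    max_content_size=10,
    origin_url="https://example.org/repo",
)
```

Here small keeps its 'data' entry, while large loses it and carries a reason instead.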

swh.loader.core.loader module

swh.loader.core.loader.send_in_packets(objects, sender, packet_size, packet_size_bytes=None)[source]

Send objects, using the sender, in packets of packet_size objects (and of max packet_size_bytes).
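
A minimal sketch of this kind of packet batching follows. The name send_in_packets_sketch is hypothetical, and the sketch assumes that each object is a dict exposing its byte size under a 'length' key when packet_size_bytes is given; the real function may measure size differently.

```python
def send_in_packets_sketch(objects, sender, packet_size, packet_size_bytes=None):
    """Call sender(packet) each time a packet is full, then once for leftovers."""
    packet = []
    packet_bytes = 0
    for obj in objects:
        packet.append(obj)
        if packet_size_bytes is not None:
            # Assumption: objects carry their size in bytes under 'length'.
            packet_bytes += obj["length"]
        full_by_count = len(packet) >= packet_size
        full_by_bytes = (
            packet_size_bytes is not None and packet_bytes >= packet_size_bytes
        )
        if full_by_count or full_by_bytes:
            sender(packet)
            packet = []
            packet_bytes = 0
    if packet:
        # Send whatever is left once the input is exhausted.
        sender(packet)


sent = []
send_in_packets_sketch([{"length": 1}] * 5, sent.append, packet_size=2)
packet_lengths = [len(p) for p in sent]
```

With five objects and packet_size=2, the sender is called three times, the last call carrying the single leftover object.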

swh.loader.core.loader.retry_loading(error)[source]

Retry policy when the database raises an integrity error

class swh.loader.core.loader.BufferedLoader(logging_class=None, config=None)[source]

Bases: swh.core.config.SWHConfig

Mixin base class for loaders.

To use this class, you must:

  • inherit from this class
  • and implement the @abstractmethod methods:
    • prepare(): First step executed by the loader to prepare some state needed by the load() method.
    • get_origin(): Retrieve the origin that is currently being loaded.
    • fetch_data(): Fetch the data to inject into swh (through the store_data() method).
    • store_data(): Store data fetched.
    • visit_status(): Explicit status of the visit (‘partial’ or ‘full’)
    • load_status(): Explicit status of the loading, for use by the scheduler (eventful/uneventful/temporary failure/permanent failure).
    • cleanup(): Last step executed by the loader.

The entry point for the resulting loader is load().
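
To illustrate how a load() entry point can drive the abstract steps, here is a self-contained sketch. It does not import swh: LoaderSketch and ChunkLoader are hypothetical stand-ins, and the call order shown (prepare_origin_visit, prepare, repeated fetch_data/store_data, then cleanup) is an assumption based on the method descriptions on this page, not the actual implementation.

```python
from abc import ABC, abstractmethod


class LoaderSketch(ABC):
    """Stand-in base class; NOT swh.loader.core.loader.BufferedLoader."""

    @abstractmethod
    def prepare_origin_visit(self): ...

    @abstractmethod
    def prepare(self): ...

    @abstractmethod
    def fetch_data(self): ...

    @abstractmethod
    def store_data(self): ...

    @abstractmethod
    def cleanup(self): ...

    def load(self):
        try:
            self.prepare_origin_visit()
            self.prepare()
            while True:
                # A truthy return value means fetch_data must be called again.
                more = self.fetch_data()
                self.store_data()
                if not more:
                    break
        finally:
            # cleanup is the last step, even if an earlier step raised.
            self.cleanup()


class ChunkLoader(LoaderSketch):
    """Toy loader that 'fetches' one chunk from a list per fetch_data call."""

    def __init__(self, chunks):
        self.chunks = list(chunks)
        self.current = None
        self.stored = []
        self.calls = []

    def prepare_origin_visit(self):
        self.calls.append("prepare_origin_visit")

    def prepare(self):
        self.calls.append("prepare")

    def fetch_data(self):
        self.calls.append("fetch_data")
        self.current = self.chunks.pop(0)
        return bool(self.chunks)

    def store_data(self):
        self.calls.append("store_data")
        self.stored.append(self.current)

    def cleanup(self):
        self.calls.append("cleanup")


loader = ChunkLoader(["a", "b"])
loader.load()
```

After load() returns, loader.stored holds both chunks and loader.calls records the order in which the steps ran.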

You can take a look at some example classes:

  • BaseSvnLoader
  • TarLoader
  • DirLoader
  • DebianLoader
CONFIG_BASE_FILENAME = None
DEFAULT_CONFIG = {'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 104857600), 'directory_packet_size': ('int', 25000), 'occurrence_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'revision_packet_size': ('int', 100000), 'save_data': ('bool', False), 'save_data_path': ('str', ''), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_releases': ('bool', True), 'send_revisions': ('bool', True), 'send_snapshot': ('bool', True), 'storage': ('dict', {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}})}
ADDITIONAL_CONFIG = {}
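
Each DEFAULT_CONFIG entry pairs a type name with a default value. The sketch below copies a few entries from the listing above and shows one way to read them as plain defaults and overlay user settings. The helper plain_defaults and the override step are assumptions for illustration; how SWHConfig actually merges configuration is not described here.

```python
# A few entries copied from DEFAULT_CONFIG above.
DEFAULT_CONFIG_EXCERPT = {
    "content_packet_size": ("int", 10000),
    "content_packet_size_bytes": ("int", 104857600),
    "directory_packet_size": ("int", 25000),
    "send_contents": ("bool", True),
}


def plain_defaults(typed_config):
    """Drop the type names and keep only the default values (hypothetical helper)."""
    return {key: default for key, (_type_name, default) in typed_config.items()}


defaults = plain_defaults(DEFAULT_CONFIG_EXCERPT)

# Hypothetical user override: send smaller content packets.
effective = {**defaults, "content_packet_size": 500}

# The default byte limit for a content packet is exactly 100 MiB.
limit_in_mib = defaults["content_packet_size_bytes"] // (1024 * 1024)
```
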
__init__(logging_class=None, config=None)[source]

Initialize self. See help(type(self)) for accurate signature.

save_data()[source]

Save the data associated to the current load

get_save_data_path()[source]

The path to which we archive the loader’s raw data

send_origin(origin)[source]
send_origin_visit(visit_date)[source]
send_tool(tool)[source]
send_provider(provider)[source]
send_origin_metadata(visit_date, provider_id, tool_id, metadata)[source]
update_origin_visit(status)[source]
send_contents(content_list)[source]

Actually send properly formatted contents to the database.

send_directories(directory_list)[source]

Actually send properly formatted directories to the database.

send_revisions(revision_list)[source]

Actually send properly formatted revisions to the database.

send_releases(release_list)[source]

Actually send properly formatted releases to the database.

send_snapshot(snapshot)[source]
filter_missing_contents(contents)[source]

Return only the contents missing from swh

bulk_send_contents(contents)[source]

Format contents as swh contents and send them to the database.

filter_missing_directories(directories)[source]

Return only directories missing from swh

bulk_send_directories(directories)[source]

Send missing directories to the database

filter_missing_revisions(revisions)[source]

Return only revisions missing from swh

bulk_send_revisions(revisions)[source]

Send missing revisions to the database

filter_missing_releases(releases)[source]

Return only releases missing from swh

bulk_send_releases(releases)[source]

Send missing releases to the database

bulk_send_snapshot(snapshot)[source]

Send missing snapshot to the database

maybe_load_contents(contents)[source]

Load contents in swh-storage if need be.

maybe_load_directories(directories)[source]

Load directories in swh-storage if need be.

maybe_load_revisions(revisions)[source]

Load revisions in swh-storage if need be.

maybe_load_releases(releases)[source]

Load releases in swh-storage if need be.

maybe_load_snapshot(snapshot)[source]

Load the snapshot in swh-storage if need be.

send_batch_contents(contents)[source]

Send contents batches to the storage

send_batch_directories(directories)[source]

Send directories batches to the storage

send_batch_revisions(revisions)[source]

Send revisions batches to the storage

send_batch_releases(releases)[source]

Send releases batches to the storage

flush()[source]

Flush any potential dangling data not sent to swh-storage.

Bypass the maybe_load_* methods, which wait for a threshold-reached signal. We actually want to store the remaining data, as we are done loading.
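
The relationship between the maybe_load_* methods and flush() can be sketched with a small threshold buffer. BufferSketch, its threshold parameter and its send callback are hypothetical names; the real loader keeps one queue per object type and talks to swh-storage.

```python
class BufferSketch:
    """Accumulate objects and send them once a threshold is reached."""

    def __init__(self, threshold, send):
        self.threshold = threshold
        self.send = send
        self.pending = []

    def maybe_load(self, objects):
        # Buffer the objects; only send when enough have accumulated.
        self.pending.extend(objects)
        if len(self.pending) >= self.threshold:
            self._send_pending()

    def flush(self):
        # Ignore the threshold: loading is done, so send what remains.
        if self.pending:
            self._send_pending()

    def _send_pending(self):
        batch, self.pending = self.pending, []
        self.send(batch)


batches = []
buf = BufferSketch(threshold=3, send=batches.append)
buf.maybe_load([1, 2])  # below the threshold, nothing is sent
buf.maybe_load([3, 4])  # threshold reached, one batch is sent
buf.maybe_load([5])     # buffered again
buf.flush()             # the dangling element is sent
```

Without the final flush(), the last element would stay in the buffer and never reach the sender.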

prepare_metadata()[source]

First step for origin_metadata insertion: resolve the provider_id and the tool_id by fetching data from the storage, or create the tool and provider on the fly if the data isn’t available.

cleanup()[source]

Last step executed by the loader.

prepare_origin_visit(*args, **kwargs)[source]

First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date.

_store_origin_visit()[source]

Store origin and visit references. Sets the self.origin_visit and self.visit references.

prepare(*args, **kwargs)[source]

Second step executed by the loader to prepare some state needed by the loader.

get_origin()[source]

Get the origin that is currently being loaded. self.origin should be set in prepare_origin_visit().

Returns: an origin ready to be sent to storage by origin_add_one().
Return type: dict

fetch_data()[source]

Fetch the data from the source the loader is currently loading (e.g. a git/hg/svn/… repository).

Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading.
store_data()[source]

Store fetched data in the database.

Should call the maybe_load_xyz() methods, which handle the bundles sent to storage, rather than send directly.

store_metadata()[source]

Store fetched metadata in the database.

For more information, see implementation in DepositLoader.

load_status()[source]

Detailed loading status.

Defaults to logging an eventful load.

Returns: a dictionary that is eventually passed back as the task’s result to the scheduler, allowing tuning of the task recurrence mechanism.
post_load(success=True)[source]

Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading’s status.

Defaults to doing nothing.

It is up to the implementer of this method to make sure it does not break.

Parameters: success (bool) – the success status of the loading
visit_status()[source]

Detailed visit status.

Defaults to logging a full visit.

pre_cleanup()[source]

As a first step, try to check for dangling data to clean up. This should do its best to avoid raising issues.

load(*args, **kwargs)[source]

Loading logic for the loader to follow:

__abstractmethods__ = frozenset({'prepare', 'store_data', 'prepare_origin_visit', 'cleanup', 'fetch_data'})
__module__ = 'swh.loader.core.loader'
class swh.loader.core.loader.UnbufferedLoader(logging_class=None, config=None)[source]

Bases: swh.loader.core.loader.BufferedLoader

This base class is a pattern for unbuffered loaders.

Unbuffered loaders are able to load all the data in one go, for example the BulkUpdater loader defined in swh-loader-git.

Other loaders (stateful ones, e.g. SWHSvnLoader) should inherit directly from BufferedLoader.
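
The has_*()/get_*() pairs listed below suggest a store_data() that sends each kind of object in one go when there is something to send. The following stand-in illustrates that pattern. UnbufferedSketch and its send_batch helper are hypothetical, and this is an assumption about the pattern rather than the class's actual code.

```python
class UnbufferedSketch:
    """Stand-in showing the has_*/get_* pattern; NOT the real UnbufferedLoader."""

    def __init__(self, contents, directories):
        self._contents = list(contents)
        self._directories = list(directories)
        self.sent = []

    def has_contents(self):
        return bool(self._contents)

    def get_contents(self):
        return self._contents

    def has_directories(self):
        return bool(self._directories)

    def get_directories(self):
        return self._directories

    def send_batch(self, kind, objects):
        # Hypothetical sender; a real loader would write to swh-storage here.
        self.sent.append((kind, list(objects)))

    def store_data(self):
        # Everything is available at once, so each kind is sent in one batch.
        if self.has_contents():
            self.send_batch("content", self.get_contents())
        if self.has_directories():
            self.send_batch("directory", self.get_directories())


sketch = UnbufferedSketch(contents=[b"a", b"b"], directories=[])
sketch.store_data()
```

In this run only the contents are sent, because has_directories() reports nothing to load.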

ADDITIONAL_CONFIG = {}
__init__(logging_class=None, config=None)[source]

Initialize self. See help(type(self)) for accurate signature.

cleanup()[source]

Clean up any state set up for computations.

has_contents()[source]

Checks whether we need to load contents

get_contents()[source]

Get the contents that need to be loaded

has_directories()[source]

Checks whether we need to load directories

get_directories()[source]

Get the directories that need to be loaded

has_revisions()[source]

Checks whether we need to load revisions

get_revisions()[source]

Get the revisions that need to be loaded

has_releases()[source]

Checks whether we need to load releases

get_releases()[source]

Get the releases that need to be loaded

get_snapshot()[source]

Get the snapshot that needs to be loaded

eventful()[source]

Whether the load was eventful

save_data()[source]

Save the data associated to the current load

flush()[source]

Unbuffered loader does not flush since it has no state to flush.

__abstractmethods__ = frozenset({'prepare', 'prepare_origin_visit', 'fetch_data'})
__module__ = 'swh.loader.core.loader'
store_data()[source]

Store fetched data in the database.

Should call the maybe_load_xyz() methods, which handle the bundles sent to storage, rather than send directly.

swh.loader.core.queue module

class swh.loader.core.queue.QueuePerNbElements(max_nb_elements)[source]

Bases: object

Basic queue which tracks the number of elements it contains.

__init__(max_nb_elements)[source]

Initialize self. See help(type(self)) for accurate signature.

add(elements)[source]
pop()[source]
reset()[source]
__module__ = 'swh.loader.core.queue'
__weakref__

list of weak references to the object (if defined)
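
A queue with this add/pop/reset interface could look like the sketch below. CountingQueueSketch is a hypothetical name, and having add() return whether the threshold has been reached is an assumption about how callers learn that it is time to pop.

```python
class CountingQueueSketch:
    """Queue that counts its elements and reports when a threshold is reached."""

    def __init__(self, max_nb_elements):
        self.max_nb_elements = max_nb_elements
        self.reset()

    def add(self, elements):
        elements = list(elements)
        self.elements.extend(elements)
        self.count += len(elements)
        # Assumption: True tells the caller the queue should now be popped.
        return self.count >= self.max_nb_elements

    def pop(self):
        # Hand over everything queued so far and start again from empty.
        elements = self.elements
        self.reset()
        return elements

    def reset(self):
        self.elements = []
        self.count = 0


queue = CountingQueueSketch(max_nb_elements=3)
first = queue.add([1, 2])  # below the threshold
second = queue.add([3])    # threshold reached
popped = queue.pop()
```
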

class swh.loader.core.queue.QueuePerSizeAndNbUniqueElements(max_nb_elements, max_size, key)[source]

Bases: object

Queue which allows adding unknown elements and tracks the current size of the queue.

__init__(max_nb_elements, max_size, key)[source]

Initialize self. See help(type(self)) for accurate signature.

_add_element(e)[source]
add(elements)[source]
pop()[source]
reset()[source]
__module__ = 'swh.loader.core.queue'
__weakref__

list of weak references to the object (if defined)
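
The sketch below illustrates one reading of a queue bounded by both element count and total size that ignores elements it has already seen. Several points are assumptions: the class name, the use of a 'length' entry as each element's size, the choice that pop() keeps the set of seen keys while reset() clears it, and the boolean returned by add().

```python
class UniqueSizedQueueSketch:
    """Queue of unique elements bounded by count and by total size."""

    def __init__(self, max_nb_elements, max_size, key):
        self.max_nb_elements = max_nb_elements
        self.max_size = max_size
        self.key = key
        self.keys = set()
        self.elements = []
        self.size = 0

    def add(self, elements):
        for element in elements:
            identifier = element[self.key]
            if identifier in self.keys:
                continue  # already known, skip it
            self.keys.add(identifier)
            self.elements.append(element)
            # Assumption: each element stores its size under 'length'.
            self.size += element["length"]
        return (
            len(self.elements) >= self.max_nb_elements
            or self.size >= self.max_size
        )

    def pop(self):
        # Assumption: seen keys survive a pop, so duplicates stay filtered.
        elements, self.elements = self.elements, []
        self.size = 0
        return elements

    def reset(self):
        self.keys = set()
        self.elements = []
        self.size = 0


queue = UniqueSizedQueueSketch(max_nb_elements=10, max_size=100, key="id")
queue.add([{"id": 1, "length": 60}, {"id": 1, "length": 60}])  # duplicate ignored
threshold_reached = queue.add([{"id": 2, "length": 50}])       # size limit crossed
```
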

class swh.loader.core.queue.QueuePerNbUniqueElements(max_nb_elements, key)[source]

Bases: object

Queue which allows adding unknown elements and tracks the count of elements it holds.

__init__(max_nb_elements, key)[source]

Initialize self. See help(type(self)) for accurate signature.

_add_element(e)[source]
add(elements)[source]
pop()[source]
reset()[source]
__module__ = 'swh.loader.core.queue'
__weakref__

list of weak references to the object (if defined)

swh.loader.core.utils module

swh.loader.core.utils.clean_dangling_folders(dirpath, pattern_check, log=None)[source]
Clean up potential dangling temporary working folders rooted at dirpath. Those folders must match a dedicated pattern and not belong to a live pid.

Parameters:
  • dirpath (str) – Path to check for dangling files
  • pattern_check (str) – A dedicated pattern to check on first-level directories (e.g. swh.loader.mercurial., swh.loader.svn.)
  • log (Logger) – Optional logger
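
A rough sketch of this kind of cleanup follows. It rests on assumptions that are not confirmed by this page: folder names are taken to be the pattern immediately followed by a pid and an optional dot-separated suffix, the liveness probe uses the POSIX convention of sending signal 0, and the is_alive parameter is a hypothetical addition that makes the check replaceable.

```python
import os
import shutil
import tempfile


def pid_is_alive(pid):
    """POSIX-style probe: signal 0 checks for existence without killing."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def clean_dangling_folders_sketch(dirpath, pattern_check, is_alive=pid_is_alive):
    """Remove first-level folders matching the pattern whose pid is not alive."""
    removed = []
    for name in sorted(os.listdir(dirpath)):
        path = os.path.join(dirpath, name)
        if not (os.path.isdir(path) and name.startswith(pattern_check)):
            continue
        # Assumption: the pid directly follows the pattern in the folder name.
        pid_text = name[len(pattern_check):].split(".", 1)[0]
        if not pid_text.isdigit():
            continue
        if not is_alive(int(pid_text)):
            shutil.rmtree(path)
            removed.append(name)
    return removed


# Usage with a throwaway directory and a fake liveness check.
root = tempfile.mkdtemp()
for folder in ("swh.loader.svn.111.tmp", "swh.loader.svn.222.tmp", "other.111"):
    os.mkdir(os.path.join(root, folder))
removed = clean_dangling_folders_sketch(
    root, "swh.loader.svn.", is_alive=lambda pid: pid == 111
)
```

In this run only the folder tied to pid 222 is removed: pid 111 is reported alive, and other.111 does not match the pattern.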

Module contents