swh.archiver package

Submodules

swh.archiver.checker module

class swh.archiver.checker.BaseContentChecker[source]

Bases: swh.core.config.SWHConfig

Abstract class of the content integrity checker.

This checker’s purpose is to iterate over the contents of a storage and check the integrity of each file. The behavior of the checker when facing a corrupted or missing content is specified by subclasses.

You should override the DEFAULT_CONFIG and CONFIG_BASE_FILENAME class variables if needed.

DEFAULT_CONFIG = {'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'storage': ('dict', {'args': {'slicing': '0:2/2:4/4:6', 'root': '/srv/softwareheritage/objects'}, 'cls': 'pathslicing'})}
CONFIG_BASE_FILENAME = 'objstorage/objstorage_checker'
run_as_daemon()[source]

Start the check routine and perform it forever.

Use this method to run the checker as a daemon, iterating over the contents forever in the background.

run()[source]

Check a batch of content.

corrupted_content(obj_id)[source]

Perform an action to deal with a corrupted content.

missing_content(obj_id)[source]

Perform an action to deal with a missing content.
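
A minimal subclass only needs to provide these two hooks. The following sketch is illustrative (the class name and logging behavior are made up; configuration loading follows the SWHConfig conventions described above, so a matching config file is assumed):

import logging

from swh.archiver.checker import BaseContentChecker

class LoggingOnlyChecker(BaseContentChecker):
    """Hypothetical checker that merely logs problems."""

    def corrupted_content(self, obj_id):
        logging.error('content %s is corrupted', obj_id)

    def missing_content(self, obj_id):
        logging.error('content %s is missing', obj_id)

checker = LoggingOnlyChecker()
checker.run()              # check one batch of contents
# checker.run_as_daemon()  # or iterate over the contents forever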

class swh.archiver.checker.LogContentChecker[source]

Bases: swh.archiver.checker.BaseContentChecker

Content integrity checker that just logs the detected errors.

CONFIG_BASE_FILENAME = 'objstorage/log_checker'
corrupted_content(obj_id)[source]

Perform an action to deal with a corrupted content.

missing_content(obj_id)[source]

Perform an action to deal with a missing content.

class swh.archiver.checker.RepairContentChecker[source]

Bases: swh.archiver.checker.LogContentChecker

Content integrity checker that will try to restore contents.

DEFAULT_CONFIG = {'backup_storages': ('dict', {'banco': {'args': {'url': 'http://banco:5003/'}, 'cls': 'remote'}}), 'batch_size': ('int', 1000), 'log_tag': ('str', 'objstorage.checker'), 'storage': ('dict', {'args': {'slicing': '0:2/2:4/4:6', 'root': '/srv/softwareheritage/objects'}, 'cls': 'pathslicing'})}
CONFIG_BASE_FILENAME = 'objstorage/repair_checker'
corrupted_content(obj_id)[source]

Perform an action to deal with a corrupted content.

missing_content(obj_id)[source]

Perform an action to deal with a missing content.

class swh.archiver.checker.ArchiveNotifierContentChecker[source]

Bases: swh.archiver.checker.LogContentChecker

Implementation of the checker that updates the archiver database.

Once the database is updated, the archiver may restore the content on its next scheduling, as the content will no longer be recorded as present; this status change will probably invalidate the retention policy.

DEFAULT_CONFIG = {'batch_size': ('int', 1000), 'dbconn': ('str', 'dbname=softwareheritage-archiver-dev'), 'log_tag': ('str', 'objstorage.checker'), 'storage': ('dict', {'args': {'slicing': '0:2/2:4/4:6', 'root': '/srv/softwareheritage/objects'}, 'cls': 'pathslicing'}), 'storage_name': ('str', 'banco')}
CONFIG_BASE_FILENAME = 'objstorage/archive_notifier_checker'
corrupted_content(obj_id)[source]

Perform an action to deal with a corrupted content.

missing_content(obj_id)[source]

Perform an action to deal with a missing content.

swh.archiver.copier module

class swh.archiver.copier.ArchiverCopier(source, destination, content_ids)[source]

Bases: object

This archiver copies files into a remote objstorage in order to create a backup.

run()[source]

Do the copy on the backup storage.

Run the archiver copier in order to copy the required contents to the current destination. The contents corresponding to the sha1s in self.content_ids are fetched from the master_storage and then copied into the backup object storage.

Returns:A boolean indicating whether the whole batch of contents has been copied.
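
As a sketch, the copier can be driven as follows; source and destination stand for already-instantiated objstorages, and the helper name is made up:

from swh.archiver.copier import ArchiverCopier

def backup_contents(source, destination, content_ids):
    # content_ids: list of sha1s to copy from source to destination
    copier = ArchiverCopier(source, destination, content_ids)
    if not copier.run():
        # run() returns False when not all contents could be copied
        raise RuntimeError('incomplete copy; batch needs rescheduling')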

swh.archiver.db module

swh.archiver.db.utcnow()[source]
class swh.archiver.db.ArchiverDb(conn, pool=None)[source]

Bases: swh.storage.db.BaseDb

Proxy to the SWH’s archiver DB

archive_ls(cur=None)[source]

Get all the archives registered on the server.

Yields:a tuple (server_id, server_url) for each archive server.
content_archive_get(content_id, cur=None)[source]

Get the archival status of a content.

Retrieve from the database the archival status of the given content.

Parameters:content_id – the sha1 of the content.
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.
content_archive_get_copies(last_content=None, limit=1000, cur=None)[source]

Get the list of copies for limit contents starting after last_content.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.
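
Since limit bounds each call, scanning the whole table means paging on the last sha1 returned; a sketch, assuming db is a connected ArchiverDb (the helper name is made up):

def iter_all_copies(db, page_size=1000):
    # yield (content_id, present_copies, ongoing_copies) for every content
    last_content = None
    while True:
        rows = list(db.content_archive_get_copies(last_content=last_content,
                                                  limit=page_size))
        if not rows:
            break
        yield from rows
        last_content = rows[-1][0]  # page on the last content_id seen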

content_archive_get_unarchived_copies(retention_policy, last_content=None, limit=1000, cur=None)[source]

Get the list of copies for limit contents starting after last_content, yielding only contents whose number of present copies is smaller than the retention policy.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • retention_policy – number of required present copies
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.

mktemp_content_archive(cur=None)[source]

Trigger the creation of the temporary table tmp_content_archive during the lifetime of the transaction.

content_archive_add_from_temp(cur=None)[source]

Add new content archive entries from the temporary table.

Usage from the archiver.storage module:

db.mktemp_content_archive(cur)
# copy data over to the temp table
db.copy_to([{'colname': id0}, {'colname': id1}],
           'tmp_content_archive',
           ['colname'], cur)
# insert into the main table
db.content_archive_add_from_temp(cur)
content_archive_get_missing(backend_name, cur=None)[source]

Retrieve the contents missing from backend_name.

content_archive_get_unknown(cur=None)[source]

Retrieve unknown sha1s from the archiver db.

content_archive_update(content_id, archive_id, new_status=None, cur=None)[source]

Update the status of an archived content and set its mtime to now.

Change the status of an archived content for the given archive and set its mtime to the current time.

Parameters:
  • content_id (str) – content sha1
  • archive_id (str) – name of the archive
  • new_status (str) – one of ‘missing’, ‘present’ or ‘ongoing’. This status will replace the previous one. If not given, the function only changes the mtime of the content for the given archive.
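
For example (a sketch; db is assumed to be an ArchiverDb instance and content_id a content sha1):

# record that the copy on the 'banco' archive is now complete
db.content_archive_update(content_id, 'banco', new_status='present')

# merely refresh the mtime of a copy that is still ongoing
db.content_archive_update(content_id, 'banco')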

swh.archiver.director module

class swh.archiver.director.ArchiverDirectorBase[source]

Bases: swh.core.config.SWHConfig

Abstract Director class

An archiver director is in charge of dispatching batches of contents to archiver workers (for them to archive).

Inherit from this class and provide:

  • ADDITIONAL_CONFIG: additional configuration needed for the director to work
  • CONFIG_BASE_FILENAME: relative path to look up the configuration file
  • def get_contents_to_archive(self): Implementation method to read contents to archive
DEFAULT_CONFIG = {'archiver_storage': ('dict', {'args': {'dbconn': 'dbname=softwareheritage-archiver-dev user=guest'}, 'cls': 'db'}), 'asynchronous': ('bool', True), 'batch_max_size': ('int', 1500), 'max_queue_length': ('int', 100000), 'queue_throttling_delay': ('int', 120)}
ADDITIONAL_CONFIG = {}
CONFIG_BASE_FILENAME = 'archiver/worker'
TASK_NAME = None
run()[source]

Run the archiver director.

The archiver director will check all the contents of the archiver database and do the required backup jobs.

run_async_worker(batch)[source]

Produce a worker that will be added to the task queue.

run_sync_worker(batch)[source]

Run synchronously a worker on the given batch.

read_batch_contents()[source]

Create batches of contents that need to be archived.

Yields:batches of sha1s corresponding to contents that need more archive copies.
get_contents_to_archive()[source]

Retrieve a generator of sha1s to archive.

Yields:sha1s to archive
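
As an illustration, a custom director could look like the following sketch (the class, configuration path, and input file are hypothetical):

from swh.archiver.director import ArchiverDirectorBase

class FileListDirector(ArchiverDirectorBase):
    """Hypothetical director reading hex-encoded sha1s from a file."""

    CONFIG_BASE_FILENAME = 'archiver/file-list-director'  # hypothetical
    TASK_NAME = 'swh.archiver.tasks.SWHArchiverToBackendTask'

    def get_contents_to_archive(self):
        with open('/srv/archiver/sha1s.txt') as f:  # hypothetical path
            for line in f:
                yield bytes.fromhex(line.strip())
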
class swh.archiver.director.ArchiverWithRetentionPolicyDirector(start_id)[source]

Bases: swh.archiver.director.ArchiverDirectorBase

Process the files in order to know which ones need a backup.

The archiver director processes the files in the local storage in order to know which ones need archival, and it delegates this task to archiver workers.

ADDITIONAL_CONFIG = {'retention_policy': ('int', 2)}
TASK_NAME = 'swh.archiver.tasks.SWHArchiverWithRetentionPolicyTask'
get_contents_to_archive()[source]

Create batches of contents that need to be archived.

Yields:Data about a content as a tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.
swh.archiver.director.read_sha1_from_stdin()[source]

Read sha1 from stdin.

class swh.archiver.director.ArchiverStdinToBackendDirector[source]

Bases: swh.archiver.director.ArchiverDirectorBase

A cloud archiver director in charge of reading contents and sending them in batches to the cloud.

The archiver director, in order:

  • Reads sha1s to send to a specific backend.
  • Checks whether those sha1s are known in the archiver; if they are not, adds them.
  • If the sha1s are missing from the backend, sends them to a worker for archival.

If the flag force_copy is set, the copy is sent for archival even if it has already been done.

ADDITIONAL_CONFIG = {'destination': ('str', 'azure'), 'force_copy': ('bool', False), 'source': ('str', 'uffizi'), 'storages': ('list[dict]', [{'host': 'uffizi', 'cls': 'pathslicing', 'args': {'slicing': '0:2/2:4/4:6', 'root': '/tmp/softwareheritage/objects'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}}])}
CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
TASK_NAME = 'swh.archiver.tasks.SWHArchiverToBackendTask'
get_contents_to_archive()[source]

Retrieve a generator of sha1s to archive.

Yields:sha1s to archive
run_async_worker(batch)[source]

Produce a worker that will be added to the task queue.

run_sync_worker(batch)[source]

Run synchronously a worker on the given batch.

swh.archiver.storage module

class swh.archiver.storage.ArchiverStorage(dbconn)[source]

Bases: object

SWH Archiver storage proxy, encompassing DB

get_db()[source]
archive_ls(db=None, cur=None)[source]

Get all the archives registered on the server.

Yields:a tuple (server_id, server_url) for each archive server.
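
For instance (a sketch; the connection string mirrors the defaults shown elsewhere on this page):

from swh.archiver.storage import ArchiverStorage

storage = ArchiverStorage('dbname=softwareheritage-archiver-dev')
for server_id, server_url in storage.archive_ls():
    print(server_id, server_url)
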
content_archive_get(content_id, db=None, cur=None)[source]

Get the archival status of a content.

Retrieve from the database the archival status of the given content.

Parameters:content_id – the sha1 of the content
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.
content_archive_get_copies(last_content=None, limit=1000, db=None, cur=None)[source]

Get the list of copies for limit contents starting after last_content.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.

content_archive_get_unarchived_copies(retention_policy, last_content=None, limit=1000, db=None, cur=None)[source]

Get the list of copies for limit contents starting after last_content, yielding only contents whose number of present copies is smaller than the retention policy.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • retention_policy – number of required present copies
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.

content_archive_get_missing(content_ids, backend_name, db=None, cur=None)[source]

Retrieve the sha1s missing from backend_name.

Parameters:
  • content_ids ([sha1s]) – list of sha1s to test
  • backend_name (str) – name of the backend to check for content
Yields:missing sha1s from backend_name

content_archive_get_unknown(content_ids, db=None, cur=None)[source]

Retrieve unknown sha1s from content_archive.

Parameters:content_ids ([sha1s]) – list of sha1s to test
Yields:Unknown sha1s from content_archive
content_archive_update(content_id, archive_id, new_status=None, db=None, cur=None)[source]

Update the status of an archive content and set its mtime to now

Change the status of an archived content for the given archive and set its mtime to the current time.

Parameters:
  • content_id (str) – content sha1
  • archive_id (str) – name of the archive
  • new_status (str) – one of ‘missing’, ‘present’ or ‘ongoing’. This status will replace the previous one. If not given, the function only changes the mtime of the content for the given archive.
content_archive_add(content_ids, sources_present, db=None, cur=None)[source]

Insert new entries in the db for the given content_ids.

Parameters:
  • content_ids ([bytes|str]) – content identifiers
  • sources_present ([str]) – List of source names where contents are present
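
For example (a sketch; storage is assumed to be an ArchiverStorage instance and the sha1 values are placeholders):

sha1_one = b'\x00' * 20  # placeholder content identifier
sha1_two = b'\x01' * 20  # placeholder content identifier

# register two contents known to be present on 'uffizi' and 'banco'
storage.content_archive_add([sha1_one, sha1_two],
                            sources_present=['uffizi', 'banco'])
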
class swh.archiver.storage.StubArchiverStorage(archives, present, missing, logfile_base)[source]

Bases: object

open_logfile()[source]
close_logfile()[source]
archive_ls(cur=None)[source]

Get all the archives registered on the server.

Yields:a tuple (server_id, server_url) for each archive server.
content_archive_get(content_id, cur=None)[source]

Get the archival status of a content.

Retrieve from the database the archival status of the given content.

Parameters:content_id – the sha1 of the content
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.
content_archive_get_copies(last_content=None, limit=1000, cur=None)[source]

Get the list of copies for limit contents starting after last_content.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.

content_archive_get_unarchived_copies(retention_policy, last_content=None, limit=1000, cur=None)[source]

Get the list of copies for limit contents starting after last_content, yielding only contents whose number of present copies is smaller than the retention policy.

Parameters:
  • last_content – sha1 of the last content retrieved. May be None to start at the beginning.
  • retention_policy – number of required present copies
  • limit – number of contents to retrieve. Can be None to retrieve all objects (will be slow).
Yields:A tuple (content_id, present_copies, ongoing_copies), where ongoing_copies is a dict mapping copy to mtime.

content_archive_get_missing(content_ids, backend_name, cur=None)[source]

Retrieve the sha1s missing from backend_name.

Parameters:
  • content_ids ([sha1s]) – list of sha1s to test
  • backend_name (str) – name of the backend to check for content
Yields:missing sha1s from backend_name

content_archive_get_unknown(content_ids, cur=None)[source]

Retrieve unknown sha1s from content_archive.

Parameters:content_ids ([sha1s]) – list of sha1s to test
Yields:Unknown sha1s from content_archive
content_archive_update(content_id, archive_id, new_status=None, cur=None)[source]

Update the status of an archive content and set its mtime to now

Change the status of an archived content for the given archive and set its mtime to the current time.

Parameters:
  • content_id (str) – content sha1
  • archive_id (str) – name of the archive
  • new_status (str) – one of ‘missing’, ‘present’ or ‘ongoing’. This status will replace the previous one. If not given, the function only changes the mtime of the content for the given archive.
content_archive_add(content_ids, sources_present, cur=None)[source]

Insert new entries in the db for the given content_ids.

Parameters:
  • content_ids ([bytes|str]) – content identifiers
  • sources_present ([str]) – List of source names where contents are present
swh.archiver.storage.get_archiver_storage(cls, args)[source]

Instantiate an archiver database with the proper class and arguments.
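
A sketch of its use; the args layout mirrors the ‘archiver_storage’ defaults shown earlier, and the exact values are assumptions:

from swh.archiver.storage import get_archiver_storage

archiver_storage = get_archiver_storage(
    'db', {'dbconn': 'dbname=softwareheritage-archiver-dev user=guest'})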

swh.archiver.tasks module

class swh.archiver.tasks.SWHArchiverWithRetentionPolicyTask[source]

Bases: swh.scheduler.task.Task

Main task that archives a batch of contents.

task_queue = 'swh_storage_archive_worker'
run_task(*args, **kwargs)[source]

Perform the task.

Must return a json-serializable value as it is passed back to the task scheduler using a celery event.

ignore_result = False
rate_limit = None
reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
serializer = 'json'
store_errors_even_if_ignored = False
track_started = False
typing = True
class swh.archiver.tasks.SWHArchiverToBackendTask[source]

Bases: swh.scheduler.task.Task

Main task that archives a batch of contents in the cloud.

task_queue = 'swh_storage_archive_worker_to_backend'
run_task(*args, **kwargs)[source]

Perform the task.

Must return a json-serializable value as it is passed back to the task scheduler using a celery event.

ignore_result = False
rate_limit = None
reject_on_worker_lost = None
request_stack = <celery.utils.threads._LocalStack object>
serializer = 'json'
store_errors_even_if_ignored = False
track_started = False
typing = True

swh.archiver.updater module

swh.archiver.worker module

class swh.archiver.worker.BaseArchiveWorker(batch)[source]

Bases: swh.core.config.SWHConfig

Base archive worker.

Inherit from this class and override:

  • ADDITIONAL_CONFIG: additional configuration needed for the worker to work
  • CONFIG_BASE_FILENAME: relative path to look up the configuration file
  • def need_archival(self, content_data): Determine if a content needs archival or not
  • def choose_backup_servers(self, present, missing): Choose which backup server to send copies to
DEFAULT_CONFIG = {'archiver_storage': ('dict', {'args': {'dbconn': 'dbname=softwareheritage-archiver-dev user=guest'}, 'cls': 'db'}), 'storages': ('list[dict]', [{'host': 'uffizi', 'cls': 'pathslicing', 'args': {'slicing': '0:2/2:4/4:6', 'root': '/tmp/softwareheritage/objects'}}, {'host': 'banco', 'cls': 'remote', 'args': {'base_url': 'http://banco:5003/'}}])}
ADDITIONAL_CONFIG = {}
CONFIG_BASE_FILENAME = 'archiver/worker'
objstorages = {}
run()[source]

Do the task expected from the archiver worker.

Process the contents in self.batch, ensure that the elements still need archival (using the archiver db), and spawn copiers to copy the files to each destination according to the archiver worker’s policy.

compute_copies(set_objstorages, content_id)[source]

From a content_id, return present and missing copies.

Parameters:
  • set_objstorages (set) – names of the objstorages to consider
  • content_id – the content concerned
Returns:

A dictionary with the following keys:

  • present: set of archives where the content is present
  • missing: set of archives where the content is missing
  • ongoing: dict mapping the archive id to the time the copy supposedly started.

Return type:

dict
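
An illustrative return value (archive names and timestamp are made up):

import datetime

copies = {
    'present': {'uffizi'},                                # content found here
    'missing': {'azure'},                                 # content absent here
    'ongoing': {'banco': datetime.datetime(2017, 5, 2)},  # copy started then
}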

run_copier(source, destination, content_ids)[source]

Run a copier in order to archive the given contents.

Upload the given contents from the source to the destination. If the process fails, the whole batch is considered uncopied and remains ‘ongoing’, waiting to be rescheduled after a delay.

Parameters:
  • source (str) – source storage’s identifier
  • destination (str) – destination storage’s identifier
  • content_ids ([sha1]) – list of content ids to archive.
copy_finished(content_ids)[source]

Hook to notify that the archive copy of content_ids is finished. (This is not an abstract method, as implementing it is optional.)

get_contents_error(content_ids, source_storage)[source]

Indicate the error associated with a content, when needed.

Check the given contents on the given storage. If an error is detected, it is reported through the returned dict.

Parameters:
  • content_ids ([sha1]) – list of content ids to check
  • source_storage (str) – the source storage holding the contents to check
Returns:a dict that maps {content_id -> error_status} for each content_id with an error. The error_status may be ‘missing’ or ‘corrupted’.
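
The returned mapping might look like this (the keys are placeholder sha1s):

content_id_1 = b'\x00' * 20  # placeholder sha1
content_id_2 = b'\x01' * 20  # placeholder sha1

errors = {
    content_id_1: 'missing',    # not found in source_storage
    content_id_2: 'corrupted',  # present but its checksum does not match
}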

need_archival(content_data)[source]

Indicate if the content needs to be archived.

Parameters:content_data (dict) – dict that contains two lists, ‘present’ and ‘missing’, with the copy ids corresponding to each status.
Returns:True if there are not enough copies, False otherwise.
choose_backup_servers(present, missing)[source]

Choose and yield the required number of (source, destination) pairs.

For each required copy, choose a unique destination server among the missing copies and a source server among the present ones.

Parameters:
  • present – set of objstorage source names where the content is present
  • missing – set of objstorage destination names where the content is missing
Yields:tuple (source (str), destination (str)) for each required copy.
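
An override could look like the following sketch (the class and the random pairing are illustrative, not necessarily what the shipped workers do):

import random

from swh.archiver.worker import BaseArchiveWorker

class RandomSourceWorker(BaseArchiveWorker):
    """Hypothetical worker pairing each missing archive with a random source."""

    def need_archival(self, content_data):
        # archive as long as at least one copy is missing
        return bool(content_data['missing'])

    def choose_backup_servers(self, present, missing):
        sources = list(present)
        for destination in missing:
            yield random.choice(sources), destination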

class swh.archiver.worker.ArchiverWithRetentionPolicyWorker(batch)[source]

Bases: swh.archiver.worker.BaseArchiveWorker

Do the required backups on a given batch of contents.

Process the contents of a batch in order to do the needed backups on the slave servers.

ADDITIONAL_CONFIG = {'archival_max_age': ('int', 3600), 'retention_policy': ('int', 2), 'sources': ('list[str]', ['uffizi', 'banco'])}
need_archival(content_data)[source]

Indicate if the content needs to be archived.

Parameters:content_data (dict) – dict that contains two lists, ‘present’ and ‘missing’, with the copy ids corresponding to each status.

Returns: True if there are not enough copies, False otherwise.

choose_backup_servers(present, missing)[source]

Choose and yield the required number of (source, destination) pairs.

For each required copy, choose a unique destination server among the missing copies and a source server among the present ones.

Each destination server is unique, so after archival the retention policy requirement will be fulfilled. However, the source server may be used multiple times.

Parameters:
  • present – set of objstorage source names where the content is present
  • missing – set of objstorage destination names where the content is missing
Yields:tuple (source, destination) for each required copy.
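
Putting it together, one batch can be handed to a worker like this (a sketch; the batch is assumed to match what the director yields, and the helper name is made up):

from swh.archiver.worker import ArchiverWithRetentionPolicyWorker

def archive_batch(batch):
    # batch: content data as yielded by the director
    ArchiverWithRetentionPolicyWorker(batch).run()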

class swh.archiver.worker.ArchiverToBackendWorker(destination, batch)[source]

Bases: swh.archiver.worker.BaseArchiveWorker

Worker that sends copies over from a source to another backend.

Process the contents of a batch from the source objstorage to the destination objstorage.

CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
ADDITIONAL_CONFIG = {'next_task': ('dict', {'batch_size': 10, 'name': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask'})}
need_archival(content_data)[source]

Indicate if the content needs to be archived.

Parameters:content_data (dict) – dict that contains three lists, ‘present’, ‘ongoing’ and ‘missing’, with the copy ids corresponding to each status.
Returns:True if we need to archive, False otherwise

choose_backup_servers(present, missing)[source]

The destination is fixed to the destination given at the worker’s creation.

The only variable here is the source of the copy, which is chosen at random among the ‘present’ copies.

Parameters:
  • present – set of objstorage source names where the content is present
  • missing – set of objstorage destination names where the content is missing
Yields:tuple (source, destination) for each required copy.

copy_finished(content_ids)[source]

Once the copy is finished, send the batch of contents as done to the destination queue.

Module contents