swh.indexer.indexer module

swh.indexer.indexer.write_to_temp(filename: str, data: bytes, working_directory: str) → Iterator[str]

Write the content identified by a sha1 to a temporary file.

Parameters
  • filename – one of the filenames associated with the sha1

  • data – the content to write to the temporary file

  • working_directory – the directory in which the file is written

Returns

The path to the temporary file that was created, filled with the raw content data.
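The Iterator[str] return type suggests a generator-based context manager, although the text above does not say so. The following is a minimal sketch under that assumption. write_to_temp_sketch is a hypothetical stand-in, not the swh.indexer implementation, and the clean-up on exit is likewise assumed.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def write_to_temp_sketch(
    filename: str, data: bytes, working_directory: str
) -> Iterator[str]:
    """Hypothetical stand-in for write_to_temp, for illustration only."""
    os.makedirs(working_directory, exist_ok=True)
    # A fresh sub-directory avoids clashes between contents sharing a filename.
    temp_dir = tempfile.mkdtemp(dir=working_directory)
    content_path = os.path.join(temp_dir, filename)
    with open(content_path, "wb") as f:
        f.write(data)
    try:
        yield content_path
    finally:
        # Assumption: the temporary file is removed once the caller is done.
        shutil.rmtree(temp_dir)
```

A caller would then read or process the file inside a with block, for example: with write_to_temp_sketch("README", b"data", "/tmp/indexer") as path: ...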

class swh.indexer.indexer.BaseIndexer(config=None, **kw)

Bases: swh.core.config.SWHConfig

Base class for indexers to inherit from.

The main entry point is the run() function, which triggers the computations on the batch of dicts/ids received.

Indexers can:

  • filter out ids whose data has already been indexed.

  • retrieve ids data from storage or objstorage

  • index this data depending on the object and store the result in storage.

To implement a new object type indexer, inherit from BaseIndexer and implement indexing:

run():

object_ids differ depending on the object type. For example: sha1 for content; sha1_git for revision, directory and release; and id for origin.

To implement a new concrete indexer, inherit from the object level classes: ContentIndexer, RevisionIndexer, OriginIndexer.

Then you need to implement the following functions:

filter():

filter out data already indexed (in storage).

index():

compute the index for id with data (retrieved from the storage or the objstorage by the id key) and return the result of the computation.

persist_index_computations():

persist the results of multiple index computations in the storage.

The new indexer implementation can also override the following functions:

prepare():

Configuration preparation for the indexer. An override must call super().prepare().

check():

Configuration check for the indexer. An override must call super().check().

register_tools():

This should return a dict of the tool(s) to use when indexing or filtering.
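The hooks described above can be sketched with a toy indexer. Everything here is a stand-in so that the example runs on its own. SketchBaseIndexer is a hypothetical replacement for BaseIndexer. The constructor calling prepare() and check(), the run() signature, the in-memory storage and the summary key are all assumptions, not the library's behaviour.

```python
from typing import Any, Dict, Iterator, List


class SketchBaseIndexer:
    """Hypothetical stand-in for BaseIndexer; only mimics the documented hooks."""

    def __init__(self) -> None:
        # Assumption: configuration is prepared, then checked, at construction.
        self.prepare()
        self.check()

    def prepare(self) -> None:
        self.config: Dict[str, Any] = {"tools": {"name": "noop", "version": "0"}}

    def check(self) -> None:
        if "tools" not in self.config:
            raise ValueError("missing 'tools' in configuration")


class LengthIndexer(SketchBaseIndexer):
    """Toy concrete indexer: records the length of each content."""

    def prepare(self) -> None:
        super().prepare()  # an override must call the parent implementation
        self.indexed: Dict[bytes, Dict[str, Any]] = {}  # in-memory "storage"

    def filter(self, ids: List[bytes]) -> Iterator[bytes]:
        # Keep only the ids that have not been indexed yet.
        yield from (id_ for id_ in ids if id_ not in self.indexed)

    def index(self, id: bytes, data: bytes) -> Dict[str, Any]:
        # The returned dict is whatever persist_index_computations() expects.
        return {"id": id, "length": len(data)}

    def persist_index_computations(
        self, results: List[Dict[str, Any]], policy_update: str
    ) -> Dict[str, int]:
        for result in results:
            self.indexed[result["id"]] = result
        return {"length:add": len(results)}  # hypothetical summary key

    def run(self, contents: Dict[bytes, bytes], policy_update: str) -> Dict[str, int]:
        # Assumed flow: filter, then index, then persist.
        results = [
            self.index(id_, contents[id_]) for id_ in self.filter(list(contents))
        ]
        return self.persist_index_computations(results, policy_update)
```

Running the toy indexer twice on overlapping inputs shows the filtering step: the second run only indexes the ids that the first run did not store.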

results: List[Dict]
CONFIG = 'indexer/base'
DEFAULT_CONFIG = {'indexer_storage': ('dict', {'cls': 'remote', 'args': {'url': 'http://localhost:5007/'}}), 'objstorage': ('dict', {'cls': 'remote', 'args': {'url': 'http://localhost:5003/'}}), 'storage': ('dict', {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}})}
ADDITIONAL_CONFIG = {}
USE_TOOLS = True
catch_exceptions = True

Prevents exceptions raised in index() from propagating. Set to False in tests so that all exceptions surface.
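The effect of such a flag can be illustrated with a small loop. This is a sketch of the general pattern, not the library's code: index_all and its parameters are hypothetical names, and logging the failure is an assumption.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def index_all(
    ids: List[bytes],
    index: Callable[[bytes], Dict[str, Any]],
    catch_exceptions: bool = True,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: index each id, optionally swallowing failures."""
    results = []
    for id_ in ids:
        try:
            results.append(index(id_))
        except Exception:
            if not catch_exceptions:
                raise  # let tests see the original exception
            # Assumption: failures are logged and the batch continues.
            logger.exception("Problem when indexing %r", id_)
    return results
```

With the flag set, one failing id does not abort the batch; with the flag unset, the first failure is raised to the caller.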

scheduler: Any
prepare() → None

Prepare the runtime configuration the indexer needs. Without this step, the indexer cannot run.

property tool
check() → None

Check that the indexer’s configuration is valid before proceeding. Does nothing if it is; raises an error otherwise.

register_tools(tools: Union[Dict[str, Any], List[Dict[str, Any]]]) → List[Dict[str, Any]]

Register tools in the storage.

Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool.)

Expects the self.config['tools'] property to be set with one or more tools.

Parameters

tools – either a dict or a list of dicts.

Returns

List of dicts, each with an additional id key.

Return type

list

Raises

ValueError – if tools is neither a list nor a dict.
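The input handling described above (a dict or a list of dicts in, a list of dicts with an id key out, ValueError otherwise) can be sketched as follows. Both function names are hypothetical, and the consecutive integer ids stand in for whatever the storage actually assigns.

```python
from typing import Any, Dict, List, Union

Tool = Dict[str, Any]


def normalize_tools(tools: Union[Tool, List[Tool]]) -> List[Tool]:
    """Hypothetical helper: accept one tool or several, always return a list."""
    if isinstance(tools, list):
        return tools
    if isinstance(tools, dict):
        return [tools]
    raise ValueError("tools must be a dict or a list of dicts")


def register_tools_sketch(tools: Union[Tool, List[Tool]]) -> List[Tool]:
    # Stand-in for the storage call: copy each tool and attach an id.
    return [
        dict(tool, id=i) for i, tool in enumerate(normalize_tools(tools), start=1)
    ]
```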

index(id: Union[bytes, Dict, swh.model.model.Revision], data: Optional[bytes] = None, **kwargs) → Dict[str, Any]

Index computation for the id and associated raw data.

Parameters
  • id – identifier or Dict object

  • data – id’s data from storage or objstorage depending on object type

Returns

a dict that makes sense for the persist_index_computations() method.

Return type

dict

filter(ids: List[bytes]) → Iterator[bytes]

Filter missing ids for that particular indexer.

Parameters

ids – list of ids

Yields

iterator of missing ids

abstract persist_index_computations(results, policy_update) → Dict[str, int]

Persist the results of the index computations.

Parameters
  • results ([result]) – list of results, each one being the return value of index().

  • policy_update ([str]) – either ‘update-dups’ or ‘ignore-dups’ to respectively update duplicates or ignore them

Returns

a summary dict of what has been inserted in the storage
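The two policy_update values can be illustrated against an in-memory dict. This is only a sketch of the semantics stated above: persist_sketch, the store argument, the "id" key and the summary key are hypothetical, and a real implementation would delegate to the indexer storage.

```python
from typing import Any, Dict, List


def persist_sketch(
    store: Dict[bytes, Dict[str, Any]],
    results: List[Dict[str, Any]],
    policy_update: str,
) -> Dict[str, int]:
    """Hypothetical persistence step honouring the duplicate policy."""
    if policy_update not in ("update-dups", "ignore-dups"):
        raise ValueError(f"unknown policy_update: {policy_update!r}")
    written = 0
    for result in results:
        key = result["id"]
        if key in store and policy_update == "ignore-dups":
            continue  # keep the existing entry untouched
        store[key] = result  # new entry, or overwrite under 'update-dups'
        written += 1
    return {"sketch:write": written}
```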

class swh.indexer.indexer.ContentIndexer(config=None, **kw)

Bases: swh.indexer.indexer.BaseIndexer

A content indexer working on a list of ids directly.

To work on a partition of contents, use the ContentPartitionIndexer instead.

Note: ContentIndexer is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the BaseIndexer class.

run(ids: Union[List[bytes], bytes, str], policy_update: str, **kwargs) → Dict

Given a list of ids:

  • retrieve the content from the storage

  • execute the indexing computations

  • store the results (according to policy_update)

Parameters
  • ids (Iterable[Union[bytes, str]]) – list of sha1 identifiers

  • policy_update (str) – either ‘update-dups’ or ‘ignore-dups’ to respectively update duplicates or ignore them

  • **kwargs – passed to the index method

Returns

A summary Dict of the task’s status
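Since ids may arrive as either bytes or str, a caller-side normalisation step might look like the sketch below. It assumes that str identifiers are hexadecimal digests, which the text above does not state, and to_sha1_bytes is a hypothetical helper name.

```python
from typing import Iterable, List, Union


def to_sha1_bytes(ids: Iterable[Union[bytes, str]]) -> List[bytes]:
    """Hypothetical helper: convert hex-string ids to bytes, keep bytes as-is."""
    return [bytes.fromhex(id_) if isinstance(id_, str) else id_ for id_ in ids]
```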

results: List[Dict]
scheduler: Any
class swh.indexer.indexer.ContentPartitionIndexer(config=None, **kw)

Bases: swh.indexer.indexer.BaseIndexer

A content partition indexer.

This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition.

To work on a list of ids, use the ContentIndexer instead.

Note: ContentPartitionIndexer is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the BaseIndexer class.

abstract indexed_contents_in_partition(partition_id: int, nb_partitions: int, page_token: Optional[str] = None) → swh.core.api.classes.PagedResult[bytes, str]

Retrieve the indexed contents within the partition partition_id.

Parameters
  • partition_id – Index of the partition to fetch

  • nb_partitions – Total number of partitions to split into

  • page_token – opaque token used for pagination

Returns

A PagedResult of sha1s. If next_page_token is None, there is no more data to fetch.
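Walking all pages of a partition follows the usual token loop: call again with the returned token until it is None. The sketch below uses a stand-in dataclass, PagedResultSketch, rather than the real PagedResult; its results attribute and the fetch_page callable are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List, Optional


@dataclass
class PagedResultSketch:
    """Hypothetical stand-in for PagedResult[bytes, str]."""

    results: List[bytes] = field(default_factory=list)
    next_page_token: Optional[str] = None


def iter_partition(
    fetch_page: Callable[[int, int, Optional[str]], PagedResultSketch],
    partition_id: int,
    nb_partitions: int,
) -> Iterator[bytes]:
    """Yield every sha1 of a partition by following next_page_token."""
    page_token: Optional[str] = None
    while True:
        page = fetch_page(partition_id, nb_partitions, page_token)
        yield from page.results
        page_token = page.next_page_token
        if page_token is None:
            break  # no more data to fetch
```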

run(partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs) → Dict

Given a partition of content ids, index the contents within.

The indexer is either incremental (it filters out data that has already been computed) or computes everything from scratch.

Parameters
  • partition_id – Index of the partition to fetch

  • nb_partitions – Total number of partitions to split into

  • skip_existing – Skip existing indexed data (default) or not

  • **kwargs – passed to the index method

Returns

dict with the indexing task status
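The difference between the incremental mode and the from-scratch mode comes down to which ids are kept for indexing. A minimal sketch, assuming the already indexed ids are available as a set; ids_to_index is a hypothetical helper, not part of the class.

```python
from typing import Iterable, Iterator, Set


def ids_to_index(
    partition_ids: Iterable[bytes],
    already_indexed: Set[bytes],
    skip_existing: bool = True,
) -> Iterator[bytes]:
    """Yield the ids to process, dropping indexed ones when skip_existing."""
    for id_ in partition_ids:
        if skip_existing and id_ in already_indexed:
            continue  # incremental mode: this id already has a result
        yield id_
```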

results: List[Dict]
scheduler: Any
class swh.indexer.indexer.OriginIndexer(config=None, **kw)

Bases: swh.indexer.indexer.BaseIndexer

An object type indexer that inherits from BaseIndexer and implements origin indexing through the run method.

Note: the OriginIndexer is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the BaseIndexer class.

run(origin_urls: List[str], policy_update: str = 'update-dups', **kwargs) → Dict

Given a list of origin urls:

  • retrieve origins from storage

  • execute the indexing computations

  • store the results (according to policy_update)

Parameters
  • origin_urls – list of origin urls.

  • policy_update – either ‘update-dups’ or ‘ignore-dups’ to respectively update duplicates (default) or ignore them

  • **kwargs – passed to the index method

index_list(origins: List[Any], **kwargs: Any) → List[Dict]
results: List[Dict]
scheduler: Any
class swh.indexer.indexer.RevisionIndexer(config=None, **kw)

Bases: swh.indexer.indexer.BaseIndexer

An object type indexer that inherits from BaseIndexer and implements revision indexing through the run method.

Note: the RevisionIndexer is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the BaseIndexer class.

results: List[Dict]
scheduler: Any
run(ids: Union[str, bytes], policy_update: str) → Dict

Given a list of sha1_gits:

  • retrieve revisions from storage

  • execute the indexing computations

  • store the results (according to policy_update)

Parameters
  • ids – list of sha1_git identifiers

  • policy_update – either ‘update-dups’ or ‘ignore-dups’ to respectively update duplicates or ignore them