swh.lister.pattern module

class swh.lister.pattern.ListerStats(pages: int = 0, origins: int = 0)

Bases: object

pages: int = 0
origins: int = 0
dict() → Dict[str, int]
class swh.lister.pattern.Lister(scheduler: SchedulerInterface, url: str | None = None, instance: str | None = None, credentials: Dict[str, Dict[str, List[Dict[str, str]]]] | None = None, max_origins_per_page: int | None = None, max_pages: int | None = None, enable_origins: bool = True, with_github_session: bool = False, record_batch_size: int = 1000, first_visits_queue_prefix: str | None = None)

Bases: Generic[StateType, PageType]

The base class for a Software Heritage lister.

A lister scrapes a page-by-page list of origins from an upstream (a forge, the API of a package manager, …), and converts the results of that scrape into a list of origins that are recorded by the scheduler backend.

The main loop of the lister, run(), basically revolves around the get_pages() iterator, which sets up the lister state, then yields the scrape results page by page. The get_origins_from_page() method converts the pages into a list of model.ListedOrigin, sent to the scheduler at every page. The commit_page() method can be used to update the lister state after a page of origins has been recorded in the scheduler backend.

The finalize() method is called at lister teardown (whether the run has been successful or not) to update the local state object before it’s sent to the database. This method must set the updated attribute if an updated state needs to be sent to the scheduler backend. This method can call get_state_from_scheduler() to refresh and merge the lister state from the scheduler before it’s finalized (and potentially minimize the risk of race conditions between concurrent runs of the lister).

The state of the lister is serialized and deserialized from the dict stored in the scheduler backend, using the state_from_dict() and state_to_dict() methods.
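
The control flow described above can be sketched as follows. This is a simplified, in-memory stand-in and not the actual swh.lister code: ToyLister, ToyStats, the recorded list and the shape of the pages are all hypothetical, and the real class records origins in a scheduler backend rather than in a Python list.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List

Page = Dict[str, List[str]]  # hypothetical page shape: {"urls": [...]}


@dataclass
class ToyStats:
    """Stand-in for ListerStats (illustration only)."""

    pages: int = 0
    origins: int = 0


class ToyLister:
    """Hypothetical imitation of the run() control flow described above."""

    def __init__(self, pages: List[Page]) -> None:
        self._pages = pages
        self.recorded: List[str] = []  # stands in for the scheduler backend
        self.last_page_seen = -1  # stands in for the lister state
        self.updated = False
        self.finalized = False

    def get_pages(self) -> Iterator[Page]:
        yield from self._pages

    def get_origins_from_page(self, page: Page) -> Iterator[str]:
        yield from page["urls"]

    def commit_page(self, page: Page) -> None:
        # Called only after the origins of this page have been recorded.
        self.last_page_seen += 1
        self.updated = True

    def finalize(self) -> None:
        self.finalized = True

    def run(self) -> ToyStats:
        stats = ToyStats()
        try:
            for page in self.get_pages():
                origins = list(self.get_origins_from_page(page))
                self.recorded.extend(origins)  # "send to the scheduler"
                self.commit_page(page)
                stats.pages += 1
                stats.origins += len(origins)
        finally:
            # Runs whether or not the loop above raised.
            self.finalize()
        return stats


lister = ToyLister(
    [
        {"urls": ["https://forge.example/r1"]},
        {"urls": ["https://forge.example/r2", "https://forge.example/r3"]},
    ]
)
stats = lister.run()
print(stats)
```

The try/finally mirrors the statement that finalize() runs whether or not the listing succeeded; everything else in the sketch is deliberately minimal.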

Parameters:
  • scheduler – the instance of the Scheduler being used to register the origins listed by this lister

  • url – a URL representing this lister, e.g. the API’s base URL

  • instance – the instance name, used to uniquely identify this lister instance; if not provided, the network location of the URL is used

  • credentials – dictionary of credentials for all listers. The first level identifies the LISTER_NAME, the second level the lister instance. The final level is a list of dicts containing the expected credentials for the given instance of that lister.

  • max_pages – the maximum number of pages listed in a full listing operation

  • max_origins_per_page – the maximum number of origins processed per page

  • enable_origins – whether the created origins should be enabled or not

  • record_batch_size – maximum number of records to flush to the scheduler at once.
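
The nesting of the credentials parameter can be illustrated with a small sketch. The lister name, instance name and the "username"/"password" keys below are assumptions chosen for illustration; the source only specifies the three levels of nesting. The credentials_for helper is likewise hypothetical.

```python
from typing import Dict, List

Credentials = Dict[str, Dict[str, List[Dict[str, str]]]]

# Hypothetical values; the "username"/"password" keys are assumptions.
credentials: Credentials = {
    "gitlab": {  # first level: LISTER_NAME
        "gitlab.example.org": [  # second level: lister instance
            {"username": "alice", "password": "not-a-real-secret"},
            {"username": "bob", "password": "also-not-real"},
        ],
    },
}


def credentials_for(
    creds: Credentials, lister_name: str, instance: str
) -> List[Dict[str, str]]:
    """Return the credential dicts for one lister instance, or an empty list."""
    return creds.get(lister_name, {}).get(instance, [])


gitlab_creds = credentials_for(credentials, "gitlab", "gitlab.example.org")
missing_creds = credentials_for(credentials, "gitea", "gitea.example.org")
print(len(gitlab_creds), missing_creds)
```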

Generic types:
  • StateType: concrete type of the lister state; should usually be a dataclass for stricter typing

  • PageType: type of scrape results; can usually be a requests.Response, or a dict

LISTER_NAME: str = ''
github_session: GitHubSession | None = None
build_url(instance: str) → str

Optionally build the URL of the forge to list. This method is called when no url is provided to the constructor, and should compute the actual URL used to list the forge.

This is particularly useful for forges that expose an API: it simplifies CLI calls, which then only need to provide the instance (its domain).

For example:
  • gitlab: https://{instance}/api/v4

  • gitea: https://{instance}/api/v1

  • …
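
As a minimal sketch of the URL patterns listed above, written as standalone functions rather than as methods of a Lister subclass (the function names are hypothetical):

```python
def build_gitlab_url(instance: str) -> str:
    """Derive the API base URL from the instance domain (GitLab pattern)."""
    return f"https://{instance}/api/v4"


def build_gitea_url(instance: str) -> str:
    """Derive the API base URL from the instance domain (Gitea pattern)."""
    return f"https://{instance}/api/v1"


print(build_gitlab_url("gitlab.example.org"))
print(build_gitea_url("gitea.example.org"))
```

In a real lister, the equivalent logic would presumably live in an override of build_url(), so that only the instance needs to be passed on the command line.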

http_request(url: str, method='GET', **kwargs) → Response
run() → ListerStats

Run the lister.

Returns:

A counter with the number of pages and origins seen for this run of the lister.

get_state_from_scheduler() → StateType

Update the state in the current instance from the state in the scheduler backend.

This updates lister_obj, and returns its (deserialized) current state, to allow for comparison with the local state.

Returns:

the state retrieved from the scheduler backend

set_state_in_scheduler(with_listing_finished_date: bool = False, force_state: bool = False) → None

Update the state in the scheduler backend from the state of the current instance.

Parameters:
  • with_listing_finished_date – update the last_listing_finished_at column for this lister in the scheduler database if set to True.

  • force_state – update the lister state even when the lister's updated attribute is set to False; this is useful for tests

Raises:

swh.scheduler.exc.StaleData – in case of a race condition between concurrent listers (from swh.scheduler.Scheduler.update_lister()).

state_from_dict(d: Dict[str, Any]) → StateType

Convert the state stored in the scheduler backend (as a dict), to the concrete StateType for this lister.

state_to_dict(state: StateType) → Dict[str, Any]

Convert the StateType for this lister to its serialization as dict for storage in the scheduler.

Values must be JSON-compatible as that’s what the backend database expects.
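
A round trip between a dataclass state and its JSON-compatible dict might look like the sketch below. ExampleState and its fields are hypothetical, and the functions are written standalone rather than as methods; the point is only that non-JSON values such as datetime need an explicit conversion in both directions.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class ExampleState:
    """Hypothetical lister state."""

    last_seen_id: int = 0
    last_update: Optional[datetime] = None


def state_to_dict(state: ExampleState) -> Dict[str, Any]:
    d = asdict(state)
    # datetime is not JSON-compatible, so store it as an ISO 8601 string.
    if d["last_update"] is not None:
        d["last_update"] = d["last_update"].isoformat()
    return d


def state_from_dict(d: Dict[str, Any]) -> ExampleState:
    last_update = d.get("last_update")
    return ExampleState(
        last_seen_id=d.get("last_seen_id", 0),
        last_update=datetime.fromisoformat(last_update) if last_update else None,
    )


state = ExampleState(
    last_seen_id=42, last_update=datetime(2024, 1, 1, tzinfo=timezone.utc)
)
serialized = state_to_dict(state)
json.dumps(serialized)  # raises TypeError if a value is not JSON-compatible
restored = state_from_dict(serialized)
print(restored == state)
```

Using defaults in state_from_dict lets an empty dict, as would be stored before the first run, deserialize to a fresh state.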

finalize() → None

Custom hook to finalize the lister state before returning from the main loop.

This method must set updated if the lister has done some work.

If relevant, this method can use get_state_from_scheduler() to merge the current lister state with the one from the scheduler backend, reducing the risk of race conditions if we’re running concurrent listings.

This method is called in a finally block, which means it will also run when the lister fails.
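
One possible merge strategy for a finalize() implementation is sketched below, assuming a state that only tracks the highest id listed so far. CounterState and merge_for_finalize are hypothetical names, and the choice of max() as the merge rule is an assumption that suits monotonically increasing cursors, not something the source prescribes.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class CounterState:
    """Hypothetical lister state tracking the highest id listed so far."""

    last_seen_id: int = 0


def merge_for_finalize(
    local: CounterState, remote: CounterState
) -> Tuple[CounterState, bool]:
    """Merge the local state with the one refreshed from the scheduler.

    Returns the merged state and whether it differs from the remote one,
    i.e. whether the updated attribute would need to be set.
    """
    merged = CounterState(last_seen_id=max(local.last_seen_id, remote.last_seen_id))
    return merged, merged != remote


# A concurrent run already went further: nothing new to record.
behind = merge_for_finalize(CounterState(10), CounterState(25))
# This run went further than what the scheduler knows: state must be sent.
ahead = merge_for_finalize(CounterState(30), CounterState(25))
print(behind, ahead)
```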

get_pages() → Iterator[PageType]

Retrieve a list of pages of listed results. This is the main loop of the lister.

Returns:

an iterator of raw pages fetched from the platform currently being listed.

get_origins_from_page(page: PageType) → Iterator[ListedOrigin]

Extract a list of model.ListedOrigin from a raw page of results.

Parameters:

page – a single page of results

Returns:

an iterator for the origins present on the given page of results
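
The page-to-origins conversion can be sketched as a generator. ToyOrigin is a stand-in for model.ListedOrigin with an assumed, reduced set of fields, and the page layout (a list of dicts with a "clone_url" key) is hypothetical; a real lister would build ListedOrigin objects from whatever its upstream API returns.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List


@dataclass(frozen=True)
class ToyOrigin:
    """Stand-in for model.ListedOrigin; the field names are assumptions."""

    url: str
    visit_type: str


def get_origins_from_page(page: List[Dict[str, Any]]) -> Iterator[ToyOrigin]:
    """Yield one origin per repository entry in a raw page of results."""
    for repo in page:
        yield ToyOrigin(url=repo["clone_url"], visit_type="git")


page = [
    {"id": 1, "clone_url": "https://forge.example/alice/project.git"},
    {"id": 2, "clone_url": "https://forge.example/bob/tool.git"},
]
origins = list(get_origins_from_page(page))
print(origins)
```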

commit_page(page: PageType) → None

Custom hook called after the current page has been committed in the scheduler backend.

This method can be used to update the state after a page of origins has been successfully recorded in the scheduler backend. If the new state should be recorded at the point the lister completes, the updated attribute must be set.

send_origins(origins: Iterable[ListedOrigin]) → List[str]

Record the stream of valid model.ListedOrigin in the scheduler.

This filters out invalid URLs before recording the origins in the scheduler.

Returns:

the list of origin URLs recorded in scheduler database
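
A rough sketch of the filter-then-record behaviour, working on plain URL strings instead of ListedOrigin objects. The validity check used here (a scheme and a network location must both be present) is an assumption and may differ from the check swh.lister actually applies; the batching by record_batch_size follows the constructor parameter of the same name, and the function names are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List
from urllib.parse import urlparse


def is_plausible_url(url: str) -> bool:
    """Assumed validity rule: both a scheme and a network location are set."""
    parsed = urlparse(url)
    return bool(parsed.scheme) and bool(parsed.netloc)


def batched(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def send_urls(urls: Iterable[str], record_batch_size: int = 1000) -> List[str]:
    """Return the URLs that passed the filter, processed batch by batch."""
    recorded: List[str] = []
    valid = (u for u in urls if is_plausible_url(u))
    for batch in batched(valid, record_batch_size):
        recorded.extend(batch)  # a real lister would call the scheduler here
    return recorded


sent = send_urls(
    ["https://forge.example/a", "not a url", "https:///no-host"],
    record_batch_size=1,
)
print(sent)
```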

classmethod from_config(scheduler: Dict[str, Any], **config: Any)

Instantiate a lister from a configuration dict.

This is basically a backwards-compatibility shim for the CLI.

Parameters:
  • scheduler – instantiation config for the scheduler

  • config – the configuration dict for the lister, with the following keys:

      • credentials (optional): credentials list for the scheduler

      • any other kwargs passed to the lister.

Returns:

the instantiated lister

classmethod from_configfile(**kwargs: Any)

Instantiate a lister from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None.

Parameters:

kwargs – kwargs passed to the lister instantiation

class swh.lister.pattern.StatelessLister(scheduler: SchedulerInterface, url: str | None = None, instance: str | None = None, credentials: Dict[str, Dict[str, List[Dict[str, str]]]] | None = None, max_origins_per_page: int | None = None, max_pages: int | None = None, enable_origins: bool = True, with_github_session: bool = False, record_batch_size: int = 1000, first_visits_queue_prefix: str | None = None)

Bases: Lister[None, PageType], Generic[PageType]

state_from_dict(d: Dict[str, Any]) → None

Always return empty state

state_to_dict(state: None) → Dict[str, Any]

Always set empty state