Source code for

# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""This module implements a model of the frequency of updates of an origin
and how long it takes to load it.

For each origin, a commit frequency is chosen deterministically based on the
hash of its URL and assume all origins were created on an arbitrary epoch.
From this we compute a number of commits, that is the product of these two.

And the run time of a load task is approximated as proportional to the number
of commits since the previous visit of the origin (possibly 0)."""

from datetime import datetime, timedelta, timezone
import hashlib
import logging
from typing import Dict, Generator, Iterator, List, Optional, Tuple
import uuid

import attr
from simpy import Event

from swh.model.model import OriginVisitStatus
from swh.scheduler.model import ListedOrigin

from .common import Environment, Queue, Task, TaskEvent

logger = logging.getLogger(__name__)

_nb_generated_origins = 0
_visit_times: Dict[Tuple[str, str], datetime] = {}
"""Cache of the time of the last visit of (visit_type, origin_url),
to spare an SQL query (high latency)."""

[docs] def generate_listed_origin( lister_id: uuid.UUID, now: Optional[datetime] = None ) -> ListedOrigin: """Returns a globally unique new origin. Seed the `last_update` value according to the OriginModel and the passed timestamp. Arguments: lister: instance of the lister that generated this origin now: time of listing, to emulate last_update (defaults to :func:``) """ global _nb_generated_origins _nb_generated_origins += 1 assert _nb_generated_origins < 10**6, "Too many origins!" if now is None: now = url = f"{_nb_generated_origins:06d}.git" visit_type = "test-git" origin = OriginModel(visit_type, url) return ListedOrigin( lister_id=lister_id, url=url, visit_type=visit_type, last_update=origin.get_last_update(now), )
[docs] class OriginModel: MIN_RUN_TIME = 0.5 """Minimal run time for a visit (retrieved from production data)""" MAX_RUN_TIME = 7200 """Max run time for a visit""" PER_COMMIT_RUN_TIME = 0.1 """Run time per commit""" EPOCH = datetime(2015, 9, 1, 0, 0, 0, tzinfo=timezone.utc) """The origin of all origins (at least according to Software Heritage)""" def __init__(self, type: str, origin: str): self.type = type self.origin = origin
[docs] def seconds_between_commits(self): """Returns a random 'average time between two commits' of this origin, used to estimate the run time of a load task, and how much the loading architecture is lagging behind origin updates.""" n_bytes = 2 num_buckets = 2 ** (8 * n_bytes) # Deterministic seed to generate "random" characteristics of this origin bucket = int.from_bytes( hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little" ) # minimum: 1 second (bucket == 0) # max: 10 years (bucket == num_buckets - 1) ten_y = 10 * 365 * 24 * 3600 return ten_y ** (bucket / num_buckets)
# return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
[docs] def get_last_update(self, now: datetime) -> datetime: """Get the last_update value for this origin. We assume that the origin had its first commit at `EPOCH`, and that one commit happened every `self.seconds_between_commits()`. This returns the last commit date before or equal to `now`. """ _, time_since_last_commit = divmod( (now - self.EPOCH).total_seconds(), self.seconds_between_commits() ) return now - timedelta(seconds=time_since_last_commit)
[docs] def get_current_snapshot_id(self, now: datetime) -> bytes: """Get the current snapshot for this origin. To generate a snapshot id, we calculate the number of commits since the EPOCH, and hash it alongside the origin type and url. """ commits_since_epoch, _ = divmod( (now - self.EPOCH).total_seconds(), self.seconds_between_commits() ) return hashlib.sha1( f"{self.type} {self.origin} {commits_since_epoch}".encode() ).digest()
[docs] def load_task_characteristics( self, now: datetime ) -> Tuple[float, str, Optional[bytes]]: """Returns the (run_time, end_status, snapshot id) of the next origin visit.""" current_snapshot = self.get_current_snapshot_id(now) key = (self.type, self.origin) last_visit = _visit_times.get(key, now - timedelta(days=365)) time_since_last_successful_run = now - last_visit _visit_times[key] = now seconds_between_commits = self.seconds_between_commits() seconds_since_last_successful = time_since_last_successful_run.total_seconds() n_commits = int(seconds_since_last_successful / seconds_between_commits) logger.debug( "%s characteristics %s origin=%s: Interval: %s, n_commits: %s", now, self.type, self.origin, timedelta(seconds=seconds_between_commits), n_commits, ) run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits if run_time > self.MAX_RUN_TIME: # Long visits usually fail return (self.MAX_RUN_TIME, "partial", None) else: return (run_time, "full", current_snapshot)
[docs] def lister_process( env: Environment, lister_id: uuid.UUID ) -> Generator[Event, Event, None]: """Every hour, generate new origins and update the `last_update` field for the ones this process generated in the past""" NUM_NEW_ORIGINS = 100 origins: List[ListedOrigin] = [] while True: updated_origins = [] for origin in origins: model = OriginModel(origin.visit_type, origin.url) updated_origins.append( attr.evolve(origin, last_update=model.get_last_update(env.time)) ) origins = updated_origins origins.extend( generate_listed_origin(lister_id, now=env.time) for _ in range(NUM_NEW_ORIGINS) ) env.scheduler.record_listed_origins(origins) yield env.timeout(3600)
[docs] def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. """ status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) origin_model = OriginModel(task.visit_type, task.origin) (run_time, end_status, snapshot) = origin_model.load_task_characteristics(env.time) yield env.timeout(run_time) logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) yield status_queue.put( TaskEvent( task=task, status=attr.evolve( status, status=end_status, date=env.time, snapshot=snapshot ), ) ) (task.visit_type, task.origin), run_time, end_status, snapshot )