Source code for swh.search.journal_client
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import sys
from typing import Dict, Optional

from swh.model.model import SnapshotTargetType
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.interface import StorageInterface

EXPECTED_MESSAGE_TYPES = {
"origin",
"origin_visit_status",
"origin_intrinsic_metadata",
"origin_extrinsic_metadata",
}
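# `process_journal_objects` below asserts that each batch of messages it
# receives only contains these object types.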
def fetch_last_revision_release_date(
snapshot_id: bytes, storage: StorageInterface
) -> Dict[str, str]:
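    """Return the dates of the most recent revision and release among the
    branch tips of the given snapshot, as a dict with ISO8601-formatted
    `last_revision_date` and `last_release_date` keys (each key is omitted
    when no such object is found)."""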
if "pytest" not in sys.modules:
# FIXME: This function is too slow to be reasonably used in the journal-client
# (at least the main one), we need to figure out a solution before this can
# be enabled again.
return {}
if not snapshot_id:
return {}
snapshot = snapshot_get_all_branches(storage, snapshot_id)
if not snapshot:
return {}
branches = snapshot.branches.values()
tip_revision_ids = []
tip_release_ids = []
for branch in branches:
if branch.target_type == SnapshotTargetType.REVISION:
tip_revision_ids.append(branch.target)
elif branch.target_type == SnapshotTargetType.RELEASE:
tip_release_ids.append(branch.target)
revision_datetimes = [
revision.date.to_datetime()
for revision in storage.revision_get(tip_revision_ids)
if revision and revision.date
]
release_datetimes = [
release.date.to_datetime()
for release in storage.release_get(tip_release_ids)
if release and release.date
]
ret = {}
if revision_datetimes:
ret["last_revision_date"] = max(revision_datetimes).isoformat()
if release_datetimes:
ret["last_release_date"] = max(release_datetimes).isoformat()
return ret
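# Example (an illustration only, not part of the module; the snapshot id
# and the resulting dates are hypothetical):
#
#     dates = fetch_last_revision_release_date(snapshot_id, storage)
#     # -> {"last_revision_date": "2022-04-01T12:00:00+00:00",
#     #     "last_release_date": "2022-03-15T09:30:00+00:00"}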
def process_journal_objects(messages, *, search, storage=None):
"""Worker function for `JournalClient.process(worker_fn)`, after
currification of `scheduler` and `task_names`."""
assert set(messages) <= EXPECTED_MESSAGE_TYPES, set(messages)
if "origin" in messages:
process_origins(messages["origin"], search)
if "origin_visit_status" in messages:
process_origin_visit_statuses(messages["origin_visit_status"], search, storage)
if "origin_intrinsic_metadata" in messages:
process_origin_intrinsic_metadata(messages["origin_intrinsic_metadata"], search)
if "origin_extrinsic_metadata" in messages:
process_origin_extrinsic_metadata(messages["origin_extrinsic_metadata"], search)
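# A minimal sketch of how this worker is typically wired up (the journal
# client configuration and the `search`/`storage` instances are assumed,
# not provided by this module):
#
#     import functools
#     worker_fn = functools.partial(
#         process_journal_objects, search=search, storage=storage
#     )
#     journal_client.process(worker_fn)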
def process_origins(origins, search):
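    """Index the given origin documents in the search backend."""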
logging.debug("processing origins %r", origins)
search.origin_update(origins)
def process_origin_visit_statuses(visit_statuses, search, storage):
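    """Turn visit statuses into origin search documents: every status
    records the origin's visit types, and a "full" visit additionally
    updates the visit count, visit dates and snapshot-derived revision
    and release dates."""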
logging.debug("processing origin visit statuses %r", visit_statuses)
def hexify(b: Optional[bytes]) -> Optional[str]:
if b is None:
return None
return b.hex()
processed_visit_statuses = []
for visit_status in visit_statuses:
visit_types = []
visit_type = visit_status.get("type")
if visit_type is None:
visit = storage.origin_visit_get_by(
origin=visit_status["origin"], visit=visit_status["visit"]
)
if visit is not None:
visit_types.append(visit.type)
else:
visit_types.append(visit_type)
processed_status = {
"url": visit_status["origin"],
"visit_types": visit_types,
}
if visit_status["status"] == "full":
processed_status.update(
{
"has_visits": True,
"nb_visits": visit_status["visit"],
"snapshot_id": hexify(visit_status.get("snapshot")),
"last_visit_date": visit_status["date"].isoformat(),
"last_eventful_visit_date": visit_status["date"].isoformat(),
**fetch_last_revision_release_date(
visit_status.get("snapshot"), storage
),
}
)
processed_visit_statuses.append(processed_status)
if processed_visit_statuses:
search.origin_update(processed_visit_statuses)
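# A "full" visit status thus yields a search document along these lines
# (all values hypothetical):
#
#     {
#         "url": "https://example.org/repo.git",
#         "visit_types": ["git"],
#         "has_visits": True,
#         "nb_visits": 3,
#         "snapshot_id": "17d2...",  # hex-encoded snapshot id
#         "last_visit_date": "2022-04-01T12:00:00+00:00",
#         "last_eventful_visit_date": "2022-04-01T12:00:00+00:00",
#     }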