Source code for swh.webhooks.journal_client
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Any, Dict, List
import sentry_sdk
from swh.core.sentry import init_sentry
from swh.journal.client import JournalClient
from swh.model.swhids import CoreSWHID, ObjectType
from swh.webhooks.interface import Webhooks
[docs]
def process_journal_objects(
messages: Dict[str, List[Dict[str, Any]]], webhooks: Webhooks
):
process_origins(messages.get("origin", []), webhooks)
process_origin_visit_statuses(messages.get("origin_visit_status", []), webhooks)
[docs]
def process_origins(origins: List[Dict[str, Any]], webhooks: Webhooks):
for origin in origins:
try:
webhooks.event_send(
"origin.create", {"origin_url": origin["url"]}, channel=origin["url"]
)
except Exception as e:
sentry_sdk.capture_exception(e)
[docs]
def process_origin_visit_statuses(
origin_visit_statuses: List[Dict[str, Any]], webhooks: Webhooks
):
for origin_visit_status in origin_visit_statuses:
try:
webhooks.event_send(
"origin.visit",
{
"origin_url": origin_visit_status["origin"],
"visit_type": origin_visit_status["type"],
"visit_date": origin_visit_status["date"].isoformat(),
"visit_status": origin_visit_status["status"],
"snapshot_swhid": (
str(
CoreSWHID(
object_type=ObjectType.SNAPSHOT,
object_id=origin_visit_status["snapshot"],
)
)
if origin_visit_status["snapshot"]
else None
),
},
channel=origin_visit_status["origin"],
)
except Exception as e:
sentry_sdk.capture_exception(e)
[docs]
def process(client: JournalClient, webhooks: Webhooks):
init_sentry()
return client.process(partial(process_journal_objects, webhooks=webhooks))