Source code for swh.webhooks.journal_client

# Copyright (C) 2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from functools import partial
from typing import Any, Dict, List

import sentry_sdk

from swh.core.sentry import init_sentry
from swh.journal.client import JournalClient
from swh.model.swhids import CoreSWHID, ObjectType
from swh.webhooks.interface import Webhooks


[docs] def process_journal_objects( messages: Dict[str, List[Dict[str, Any]]], webhooks: Webhooks ): process_origins(messages.get("origin", []), webhooks) process_origin_visit_statuses(messages.get("origin_visit_status", []), webhooks)
[docs] def process_origins(origins: List[Dict[str, Any]], webhooks: Webhooks): for origin in origins: try: webhooks.event_send( "origin.create", {"origin_url": origin["url"]}, channel=origin["url"] ) except Exception as e: sentry_sdk.capture_exception(e)
[docs] def process_origin_visit_statuses( origin_visit_statuses: List[Dict[str, Any]], webhooks: Webhooks ): for origin_visit_status in origin_visit_statuses: try: webhooks.event_send( "origin.visit", { "origin_url": origin_visit_status["origin"], "visit_type": origin_visit_status["type"], "visit_date": origin_visit_status["date"].isoformat(), "visit_status": origin_visit_status["status"], "snapshot_swhid": ( str( CoreSWHID( object_type=ObjectType.SNAPSHOT, object_id=origin_visit_status["snapshot"], ) ) if origin_visit_status["snapshot"] else None ), }, channel=origin_visit_status["origin"], ) except Exception as e: sentry_sdk.capture_exception(e)
[docs] def process(client: JournalClient, webhooks: Webhooks): init_sentry() return client.process(partial(process_journal_objects, webhooks=webhooks))