# Source code for swh.journal.writer.interface
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterable, Optional, TypeVar
from typing_extensions import Protocol, runtime_checkable
from swh.model.model import KeyType
# Type variable used to annotate ``self`` in ``ValueProtocol.anonymize`` so
# that the declared return type follows the type of the implementing class.
TSelf = TypeVar("TSelf")
[docs]
class ValueProtocol(Protocol):
    """Structural type of the objects handed to a journal writer.

    Any object providing the three methods below satisfies this protocol;
    explicit subclassing is not required.
    """

    def anonymize(self: TSelf) -> Optional[TSelf]:
        """Return an anonymized object of the same type as ``self``, or ``None``.

        NOTE(review): when ``None`` is returned is decided by the
        implementing class -- presumably when there is nothing to
        anonymize; confirm against the concrete model classes.
        """
        ...

    def unique_key(self) -> KeyType:
        """Return the key of this object, as a ``KeyType`` value.

        NOTE(review): presumably this key identifies the object within its
        object type in the journal -- confirm against the writer backends.
        """
        ...

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation of this object.

        Returns:
            A mapping from string keys to arbitrary values.
        """
        ...
[docs]
@runtime_checkable
class JournalWriterInterface(Protocol):
    """Interface provided by journal writers.

    The protocol is runtime-checkable, so it can be used with
    ``isinstance()``. Such a check only verifies that the methods below
    exist on the object, not that their signatures match.
    """

    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
        """Add a SWH object of type object_type in the journal.

        Args:
            object_type: name of the type of the object being added.
            object_: the object to add; it must satisfy ``ValueProtocol``.
        """
        ...

    def write_additions(
        self, object_type: str, objects: Iterable[ValueProtocol]
    ) -> None:
        """Add a list of SWH objects of type object_type in the journal.

        Args:
            object_type: name of the type shared by all given objects.
            objects: the objects to add; any iterable is accepted, and each
                item must satisfy ``ValueProtocol``.
        """
        ...

    def flush(self) -> None:
        """Flush the pending object additions in the backend, if any."""
        ...