# Source code for swh.journal.writer.stream
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, BinaryIO, Callable, Dict, Iterable
from swh.journal.serializers import value_to_kafka
from .interface import ValueProtocol
logger = logging.getLogger(__name__)
[docs]
class StreamJournalWriter:
    """A simple JournalWriter which serializes objects in a stream.

    Might be used to serialize a storage in a file to generate a test dataset.

    Every object is converted with its ``to_dict()`` method, passed through
    ``value_sanitizer``, serialized with ``value_to_kafka`` as an
    ``(object_type, dict)`` pair and written to the output stream. Records
    are written one after the other; this class adds no separator of its own.
    """

    def __init__(
        self,
        output_stream: BinaryIO,
        value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]],
    ):
        """Store the destination stream and the sanitizer callback.

        Args:
            output_stream: binary stream the serialized objects are written
                to. It is neither opened nor closed by this class.
            value_sanitizer: callable invoked as
                ``value_sanitizer(object_type, object_dict)``; the dict it
                returns is the one that gets serialized.
        """
        self.output = output_stream
        self.value_sanitizer = value_sanitizer

    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
        """Serialize a single object and write it to the output stream.

        Args:
            object_type: type name passed to the sanitizer and stored
                alongside the object in the serialized record.
            object_: object to write; it must provide ``unique_key()`` and
                ``to_dict()``.
        """
        object_.unique_key()  # Check this does not error, to mimic the kafka writer
        dict_ = self.value_sanitizer(object_type, object_.to_dict())
        self.output.write(value_to_kafka((object_type, dict_)))

    def write_additions(
        self, object_type: str, objects: Iterable[ValueProtocol]
    ) -> None:
        """Write each object of ``objects`` in iteration order.

        Args:
            object_type: type name shared by all the given objects.
            objects: objects to write; an empty iterable writes nothing.
        """
        for object_ in objects:
            self.write_addition(object_type, object_)

    def flush(self) -> None:
        """Flush the underlying output stream."""
        self.output.flush()