Source code for swh.journal.writer.inmemory
# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from multiprocessing import Manager
from typing import Any, Callable, Dict, Iterable, List, Tuple
from .interface import ValueProtocol
logger = logging.getLogger(__name__)
[docs]
class InMemoryJournalWriter:
objects: List[Tuple[str, ValueProtocol]]
privileged_objects: List[Tuple[str, ValueProtocol]]
def __init__(
self,
value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]],
anonymize: bool = False,
use_shared_memory: bool = False,
):
# Share the list of objects across processes, for RemoteAPI tests.
if use_shared_memory:
self.manager = Manager()
self.objects = self.manager.list() # type: ignore[assignment]
self.privileged_objects = self.manager.list() # type: ignore[assignment]
else:
self.objects = []
self.privileged_objects = []
self.anonymize = anonymize
[docs]
def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
object_.unique_key() # Check this does not error, to mimic the kafka writer
anon_object_ = None
if self.anonymize:
anon_object_ = object_.anonymize()
if anon_object_ is not None:
self.privileged_objects.append((object_type, object_))
self.objects.append((object_type, anon_object_))
else:
self.objects.append((object_type, object_))
[docs]
def write_additions(
self, object_type: str, objects: Iterable[ValueProtocol]
) -> None:
for object_ in objects:
self.write_addition(object_type, object_)
[docs]
def flush(self) -> None:
pass