Source code for swh.journal.writer.inmemory

# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from multiprocessing import Manager
from typing import Any, Callable, Dict, Iterable, List, Tuple

from .interface import ValueProtocol

logger = logging.getLogger(__name__)


[docs] class InMemoryJournalWriter: objects: List[Tuple[str, ValueProtocol]] privileged_objects: List[Tuple[str, ValueProtocol]] def __init__( self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]], anonymize: bool = False, use_shared_memory: bool = False, ): # Share the list of objects across processes, for RemoteAPI tests. if use_shared_memory: self.manager = Manager() self.objects = self.manager.list() # type: ignore[assignment] self.privileged_objects = self.manager.list() # type: ignore[assignment] else: self.objects = [] self.privileged_objects = [] self.anonymize = anonymize
[docs] def write_addition(self, object_type: str, object_: ValueProtocol) -> None: object_.unique_key() # Check this does not error, to mimic the kafka writer anon_object_ = None if self.anonymize: anon_object_ = object_.anonymize() if anon_object_ is not None: self.privileged_objects.append((object_type, object_)) self.objects.append((object_type, anon_object_)) else: self.objects.append((object_type, object_))
[docs] def write_additions( self, object_type: str, objects: Iterable[ValueProtocol] ) -> None: for object_ in objects: self.write_addition(object_type, object_)
[docs] def flush(self) -> None: pass