Source code for swh.indexer.storage.writer
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterable, Optional
try:
from swh.journal.writer import JournalWriterInterface, get_journal_writer
except ImportError:
get_journal_writer = None # type: ignore
# mypy limitation, see https://github.com/python/mypy/issues/1153
from .model import BaseRow
[docs]
class JournalWriter:
"""Journal writer storage collaborator. It's in charge of adding objects to
the journal.
"""
journal: Optional[JournalWriterInterface]
def __init__(self, journal_writer: Dict[str, Any]):
"""
Args:
journal_writer: configuration passed to
`swh.journal.writer.get_journal_writer`
"""
if journal_writer:
if get_journal_writer is None:
raise EnvironmentError(
"You need the swh.journal package to use the "
"journal_writer feature"
)
self.journal = get_journal_writer(
**journal_writer,
value_sanitizer=lambda object_type, value_dict: value_dict,
)
else:
self.journal = None
[docs]
def write_additions(self, obj_type, entries: Iterable[BaseRow]) -> None:
if not self.journal:
return
translated = []
for entry in entries:
assert entry.object_type == obj_type # type: ignore
# ids are internal to the database and should not be sent to postgresql
if entry.indexer_configuration_id is not None:
raise ValueError(
f"{entry} passed to JournalWriter.write_additions has "
f"indexer_configuration_id instead of full tool dict"
)
assert entry.tool, "Missing both indexer_configuration_id and tool dict"
if "id" in entry.tool:
raise ValueError(
f"{entry} passed to JournalWriter.write_additions "
f"contains a tool id"
)
translated.append(entry)
# write to kafka
self.journal.write_additions(obj_type, translated)