# Copyright (C) 2017-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import enum
from importlib import import_module
from itertools import cycle
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import warnings
from confluent_kafka import (
OFFSET_BEGINNING,
Consumer,
KafkaError,
KafkaException,
TopicPartition,
)
from confluent_kafka.admin import AdminClient, NewTopic
from swh.core.statsd import Statsd
from swh.journal import DEFAULT_PREFIX
from .serializers import kafka_to_value
logger = logging.getLogger(__name__)
rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")
# Only these offset reset policies are accepted
ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
# Errors that Kafka raises too often and that are not useful; we therefore
# lower their log level to DEBUG instead of INFO.
_SPAMMY_ERRORS = [
KafkaError._NO_OFFSET,
]
class EofBehavior(enum.Enum):
"""Possible behaviors when reaching the end of the log"""
CONTINUE = "continue"
STOP = "stop"
RESTART = "restart"
def get_journal_client(cls: str, **kwargs: Any):
"""Factory function to instantiate a journal client object.
Currently, only the "kafka" journal client is supported.
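
Example: a minimal sketch of instantiating a client through this factory (the
broker address, group id and object types below are illustrative). The
``stats_cb`` keyword argument, if given as a ``"path.to.module:function"``
string, is resolved to the corresponding callable before being passed on::

    client = get_journal_client(
        "kafka",
        brokers=["localhost:9092"],
        group_id="my-consumer-group",
        object_types=["origin"],
    )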
"""
if cls == "kafka":
if "stats_cb" in kwargs:
stats_cb = kwargs["stats_cb"]
if isinstance(stats_cb, str):
try:
module_path, func_name = stats_cb.split(":")
except ValueError:
raise ValueError(
"Invalid stats_cb configuration option: "
"it should be a string like 'path.to.module:function'"
)
try:
module = import_module(module_path, package=__package__)
except ModuleNotFoundError:
raise ValueError(
"Invalid stats_cb configuration option: "
f"module {module_path} not found"
)
try:
kwargs["stats_cb"] = getattr(module, func_name)
except AttributeError:
raise ValueError(
"Invalid stats_cb configuration option: "
f"function {func_name} not found in module {module_path}"
)
return JournalClient(**kwargs)
raise ValueError("Unknown journal client class `%s`" % cls)
def _error_cb(error):
if error.fatal():
raise KafkaException(error)
if error.code() in _SPAMMY_ERRORS:
logger.debug("Received non-fatal kafka error: %s", error)
else:
logger.info("Received non-fatal kafka error: %s", error)
def _on_commit(error, partitions):
if error is not None:
_error_cb(error)
class JournalClient:
"""A base client for the Software Heritage journal.
The current implementation of the journal uses Apache Kafka
brokers to publish messages under a given topic prefix, with each
object type using a specific topic under that prefix. If the ``prefix``
argument is None (the default), it falls back to ``'swh.journal.objects'``.
Clients subscribe to events specific to each object type as listed in the
``object_types`` argument (if unset, it defaults to all existing kafka topics under
the prefix).
Clients can be sharded by setting the ``group_id`` to a common
value across instances. The journal will share the message
throughput across the nodes sharing the same group_id.
Messages are processed by the ``worker_fn`` callback passed to the ``process``
method, in batches of at most ``batch_size`` messages (defaults to 200).
The objects passed to the ``worker_fn`` callback are the result of the kafka
message converted by the ``value_deserializer`` function. By default (if this
argument is not given), it will produce dicts (using the ``kafka_to_value``
function). The signature of the function is::
value_deserializer(object_type: str, kafka_msg: bytes) -> Any
If the value returned by ``value_deserializer`` is None, it is ignored and
not passed to the ``worker_fn`` callback.
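
For example (an illustrative sketch, not a provided helper), a deserializer
that decodes messages but drops objects of one type could look like::

    from swh.journal.serializers import kafka_to_value

    def my_deserializer(object_type: str, msg: bytes):
        if object_type == "origin_visit":
            return None  # ignored, not passed to worker_fn
        return kafka_to_value(msg)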
Arguments:
stop_after_objects: If set, the processing stops after processing
this number of messages in total.
on_eof: What to do when reaching the end of each partition (keep consuming,
stop, or restart from earliest offsets); defaults to continuing.
This can be either a :class:`EofBehavior` variant or a string containing the
name of one of the variants.
stop_on_eof: (deprecated) equivalent to passing ``on_eof=EofBehavior.STOP``
auto_offset_reset: sets the behavior of the client when the consumer group
initializes: ``'earliest'`` (the default) processes all objects since the
inception of the topics; ``'latest'`` only processes objects published after
the consumer group initialized.
create_topics: Create kafka topics if they do not exist. Not meant for
production; this flag should only be enabled in development environments.
Any other named argument is passed directly to KafkaConsumer().
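
Example: a minimal sketch of a complete consumer (the broker address,
group id and object types below are illustrative)::

    client = JournalClient(
        brokers="localhost:9092",
        group_id="my-consumer-group",
        object_types=["origin", "origin_visit"],
        on_eof="stop",
    )

    def worker_fn(all_objects):
        for object_type, objects in all_objects.items():
            print(f"{object_type}: {len(objects)} objects")

    nb_processed = client.process(worker_fn)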
"""
def __init__(
self,
brokers: Union[str, List[str]],
group_id: str,
prefix: Optional[str] = None,
object_types: Optional[List[str]] = None,
privileged: bool = False,
stop_after_objects: Optional[int] = None,
batch_size: int = 200,
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: Optional[bool] = None,
on_eof: Optional[Union[EofBehavior, str]] = None,
value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
create_topics: bool = False,
**kwargs,
):
if prefix is None:
prefix = DEFAULT_PREFIX
if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
raise ValueError(
"Option 'auto_offset_reset' only accept %s, not %s"
% (ACCEPTED_OFFSET_RESET, auto_offset_reset)
)
if batch_size <= 0:
raise ValueError("Option 'batch_size' needs to be positive")
if value_deserializer:
self.value_deserializer = value_deserializer
else:
self.value_deserializer = lambda _, value: kafka_to_value(value)
if stop_on_eof is not None:
if on_eof is not None:
raise TypeError(
"stop_on_eof and on_eof are mutually exclusive (the former is "
"deprecated)"
)
elif stop_on_eof:
warnings.warn(
"stop_on_eof=True should be replaced with "
"on_eof=EofBehavior.STOP ('on_eof: stop' in YAML)",
DeprecationWarning,
2,
)
on_eof = EofBehavior.STOP
else:
warnings.warn(
"stop_on_eof=False should be replaced with "
"on_eof=EofBehavior.CONTINUE ('on_eof: continue' in YAML)",
DeprecationWarning,
2,
)
on_eof = EofBehavior.CONTINUE
self.on_eof = EofBehavior(on_eof or EofBehavior.CONTINUE)
if isinstance(brokers, str):
brokers = [brokers]
debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
if debug_logging and "debug" not in kwargs:
kwargs["debug"] = "consumer"
# Static group instance id management
group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
if group_instance_id:
kwargs["group.instance.id"] = group_instance_id
if "group.instance.id" in kwargs:
# When doing static consumer group membership, set a higher default
# session timeout. The session timeout is the duration after which
# the broker considers that a consumer has left the consumer group
# for good, and triggers a rebalance. Considering our current
# processing pattern, 10 minutes gives the consumer ample time to
# restart before that happens.
if "session.timeout.ms" not in kwargs:
kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes
if "session.timeout.ms" in kwargs:
# When the session timeout is set, rdkafka requires the max poll
# interval to be set to a higher value; the max poll interval is
# rdkafka's way of figuring out whether the client's message
# processing thread has stalled: when the max poll interval lapses
# between two calls to consumer.poll(), rdkafka leaves the consumer
# group and terminates the connection to the brokers.
#
# We default to 1.5 times the session timeout
if "max.poll.interval.ms" not in kwargs:
kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3
consumer_settings = {
**kwargs,
"bootstrap.servers": ",".join(brokers),
"auto.offset.reset": auto_offset_reset,
"group.id": group_id,
"on_commit": _on_commit,
"error_cb": _error_cb,
"enable.auto.commit": False,
"logger": rdkafka_logger,
}
if self.on_eof != EofBehavior.CONTINUE:
consumer_settings["enable.partition.eof"] = True
if logger.isEnabledFor(logging.DEBUG):
filtered_keys = {"sasl.password"}
logger.debug("Consumer settings:")
for k, v in consumer_settings.items():
if k in filtered_keys:
v = "**filtered**"
logger.debug(" %s: %s", k, v)
self.statsd = Statsd(
namespace="swh_journal_client", constant_tags={"group": group_id}
)
self.consumer = Consumer(consumer_settings)
if privileged:
privileged_prefix = f"{prefix}_privileged"
else: # do not attempt to subscribe to privileged topics
privileged_prefix = f"{prefix}"
existing_topics = [
topic
for topic in self.consumer.list_topics(timeout=10).topics.keys()
if (
topic.startswith(f"{prefix}.")
or topic.startswith(f"{privileged_prefix}.")
)
]
if not create_topics and not existing_topics:
raise ValueError(
f"The prefix {prefix} does not match any existing topic "
"on the kafka broker"
)
if not object_types:
object_types = list({topic.split(".")[-1] for topic in existing_topics})
self.subscription = []
unknown_types = []
for object_type in object_types:
topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
for topic in topics:
if create_topics or topic in existing_topics:
self.subscription.append(topic)
break
else:
unknown_types.append(object_type)
if create_topics:
topic_list = [
NewTopic(topic, 1, 1)
for topic in self.subscription
if topic not in existing_topics
]
if topic_list:
logger.debug("Creating topics: %s", topic_list)
admin_client = AdminClient({"bootstrap.servers": ",".join(brokers)})
for topic in admin_client.create_topics(topic_list).values():
try:
# wait for topic to be created
topic.result() # type: ignore[attr-defined]
except KafkaException:
# topic already exists
pass
if unknown_types:
raise ValueError(
f"Topic(s) for object types {','.join(unknown_types)} "
"are unknown on the kafka broker"
)
logger.debug(f"Upstream topics: {existing_topics}")
self.subscribe()
self.stop_after_objects = stop_after_objects
self.eof_reached: Set[Tuple[str, str]] = set()
self.batch_size = batch_size
if process_timeout is not None:
raise DeprecationWarning(
"'process_timeout' argument is not supported anymore by "
"JournalClient; please remove it from your configuration.",
)
def subscribe(self):
"""Subscribe to topics listed in self.subscription
This can be overridden if you need, for instance, to manually assign partitions.
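
For instance, a subclass could bypass consumer-group balancing and assign
partitions explicitly (a sketch; the partition number is illustrative)::

    from confluent_kafka import TopicPartition

    class MyJournalClient(JournalClient):
        def subscribe(self):
            self.consumer.assign(
                [TopicPartition(topic, 0) for topic in self.subscription]
            )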
"""
logger.debug(f"Subscribing to: {self.subscription}")
self.consumer.subscribe(topics=self.subscription)
def process(self, worker_fn: Callable[[Dict[str, List[dict]]], None]):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
Args:
worker_fn: Function called with the messages as argument.
Returns:
the total number of processed messages.
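
Example worker function (illustrative)::

    def worker_fn(all_objects):
        # all_objects maps an object type to a list of deserialized objects
        for object_type, objects in all_objects.items():
            logger.debug("received %s %s objects", len(objects), object_type)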
"""
total_objects_processed = 0
# timeout for message poll
timeout = 1.0
with self.statsd.status_gauge(
"status", statuses=["idle", "processing", "waiting"]
) as set_status:
set_status("idle")
while True:
batch_size = self.batch_size
if self.stop_after_objects:
if total_objects_processed >= self.stop_after_objects:
break
# clamp batch size to avoid overrunning stop_after_objects
batch_size = min(
self.stop_after_objects - total_objects_processed,
batch_size,
)
set_status("waiting")
for i in cycle(reversed(range(10))):
messages = self.consumer.consume(
timeout=timeout, num_messages=batch_size
)
if messages:
break
# only check for an EOF condition if we have already consumed
# messages; otherwise we could detect an EOF condition before
# messages had a chance to reach us (e.g. in tests)
if total_objects_processed > 0 and i == 0:
if self.on_eof == EofBehavior.STOP:
at_eof = all(
(tp.topic, tp.partition) in self.eof_reached
for tp in self.consumer.assignment()
)
if at_eof:
break
elif self.on_eof == EofBehavior.RESTART:
for tp in self.consumer.assignment():
if (tp.topic, tp.partition) in self.eof_reached:
self.eof_reached.remove((tp.topic, tp.partition))
self.statsd.increment("partition_restart_total")
new_tp = TopicPartition(
tp.topic,
tp.partition,
OFFSET_BEGINNING,
)
self.consumer.seek(new_tp)
elif self.on_eof == EofBehavior.CONTINUE:
pass # Nothing to do, we'll just keep consuming
else:
assert False, f"Unexpected on_eof behavior: {self.on_eof}"
if messages:
set_status("processing")
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
set_status("idle")
# report the number of handled messages
self.statsd.increment("handle_message_total", value=batch_processed)
total_objects_processed += batch_processed
if self.on_eof == EofBehavior.STOP and at_eof:
self.statsd.increment("stop_total")
break
return total_objects_processed
def handle_messages(
self, messages, worker_fn: Callable[[Dict[str, List[dict]]], None]
) -> Tuple[int, bool]:
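"""Deserialize a batch of kafka messages, group the resulting objects by
object type and, if any, pass them to ``worker_fn`` and commit the consumed
offsets.

Returns a tuple (number of handled messages, whether all assigned
partitions have reached the end of the log).
"""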
objects: Dict[str, List[Any]] = defaultdict(list)
nb_processed = 0
for message in messages:
error = message.error()
if error is not None:
if error.code() == KafkaError._PARTITION_EOF:
self.eof_reached.add((message.topic(), message.partition()))
else:
_error_cb(error)
continue
if message.value() is None:
# ignore messages with no payload; these can be generated in tests
continue
nb_processed += 1
object_type = message.topic().split(".")[-1]
deserialized_object = self.deserialize_message(
message, object_type=object_type
)
if deserialized_object is not None:
objects[object_type].append(deserialized_object)
if objects:
worker_fn(dict(objects))
self.consumer.commit()
if self.on_eof in (EofBehavior.STOP, EofBehavior.RESTART):
at_eof = all(
(tp.topic, tp.partition) in self.eof_reached
for tp in self.consumer.assignment()
)
elif self.on_eof == EofBehavior.CONTINUE:
at_eof = False
else:
assert False, f"Unexpected on_eof behavior: {self.on_eof}"
return nb_processed, at_eof
def deserialize_message(self, message, object_type=None):
return self.value_deserializer(object_type, message.value())
def close(self):
self.consumer.close()