swh.journal.pytest_plugin module

swh.journal.pytest_plugin.ensure_lists(value: Any) Any[source]
>>> ensure_lists(["foo", 42])
['foo', 42]
>>> ensure_lists(("foo", 42))
['foo', 42]
>>> ensure_lists({"a": ["foo", 42]})
{'a': ['foo', 42]}
>>> ensure_lists({"a": ("foo", 42)})
{'a': ['foo', 42]}
swh.journal.pytest_plugin.consume_messages(consumer, kafka_prefix, expected_messages)[source]

Consume expected_messages from the consumer; Sort them all into a consumed_objects dict

swh.journal.pytest_plugin.assert_all_objects_consumed(consumed_messages: Dict, exclude: Optional[Collection] = None)[source]

Check whether all objects from TEST_OBJECTS have been consumed

exclude can be a list of object types for which we do not want to compare the values (eg. for anonymized object).


Pick a random prefix for kafka topics on each call

swh.journal.pytest_plugin.kafka_consumer_group(kafka_prefix: str)[source]

Pick a random consumer group for kafka consumers on each call


Set of object types to precreate topics for.


Set of object types to precreate privileged topics for.

swh.journal.pytest_plugin.kafka_server(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str]) str[source]

A kafka server with existing topics

Unprivileged topics are built as {kafka_prefix}.{object_type} with object_type from the object_types list.

Privileged topics are built as {kafka_prefix}_privileged.{object_type} with object_type from the privileged_object_types list.

swh.journal.pytest_plugin.kafka_server_base() Iterator[str][source]

Create a mock kafka cluster suitable for tests.

Yield a connection string.

Note: this is a generator to keep the mock broker alive during the whole test session.

see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h

swh.journal.pytest_plugin.test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str])[source]

Test configuration needed for producer/consumer

swh.journal.pytest_plugin.consumer(kafka_server: str, test_config: Dict, kafka_consumer_group: str) cimpl.Consumer[source]

Get a connected Kafka consumer.