swh.journal.pytest_plugin module

swh.journal.pytest_plugin.ensure_lists(value: Any) -> Any
>>> ensure_lists(["foo", 42])
['foo', 42]
>>> ensure_lists(("foo", 42))
['foo', 42]
>>> ensure_lists({"a": ["foo", 42]})
{'a': ['foo', 42]}
>>> ensure_lists({"a": ("foo", 42)})
{'a': ['foo', 42]}
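
The doctests above suggest that tuples are converted to lists recursively, including inside dict values. A minimal sketch of that behaviour, inferred only from these examples and not taken from the actual implementation (the name `ensure_lists_sketch` is hypothetical):

```python
from typing import Any


def ensure_lists_sketch(value: Any) -> Any:
    """Recursively convert tuples to lists (hypothetical re-implementation)."""
    if isinstance(value, (list, tuple)):
        # Lists and tuples both come back as lists, element by element.
        return [ensure_lists_sketch(item) for item in value]
    if isinstance(value, dict):
        # Dict keys are kept as-is; only the values are normalized.
        return {key: ensure_lists_sketch(item) for key, item in value.items()}
    return value


print(ensure_lists_sketch({"a": ("foo", 42)}))  # {'a': ['foo', 42]}
```

A normalization of this kind is handy when comparing objects before and after a serialization round trip that may not preserve the tuple/list distinction.
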
swh.journal.pytest_plugin.consume_messages(consumer, kafka_prefix, expected_messages)

Consume expected_messages from the consumer and sort them all into a consumed_objects dict.
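
The "sorting" step can be pictured as grouping messages by the object type encoded in their topic name. The sketch below works on plain `(topic, key, value)` tuples rather than real Kafka messages. The helper name `group_by_object_type`, the shape of the resulting dict, and the choice to file privileged and unprivileged topics under the same object type are all assumptions made for illustration; the actual function may key its result differently.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_by_object_type(
    messages: Iterable[Tuple[str, bytes, bytes]], kafka_prefix: str
) -> Dict[str, List[Tuple[bytes, bytes]]]:
    """Group (topic, key, value) triples by object type (hypothetical helper)."""
    consumed_objects: Dict[str, List[Tuple[bytes, bytes]]] = defaultdict(list)
    prefixes = (f"{kafka_prefix}.", f"{kafka_prefix}_privileged.")
    for topic, key, value in messages:
        for prefix in prefixes:
            if topic.startswith(prefix):
                # Whatever follows the prefix is treated as the object type.
                consumed_objects[topic[len(prefix):]].append((key, value))
                break
        else:
            # No known prefix matched: the message came from a foreign topic.
            raise ValueError(f"unexpected topic: {topic}")
    return dict(consumed_objects)


messages = [
    ("test.revision", b"k1", b"v1"),
    ("test_privileged.revision", b"k2", b"v2"),
    ("test.release", b"k3", b"v3"),
]
grouped = group_by_object_type(messages, "test")
print(sorted(grouped))  # ['release', 'revision']
```
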

swh.journal.pytest_plugin.assert_all_objects_consumed(consumed_messages: Dict, exclude: Collection | None = None)

Check whether all objects from TEST_OBJECTS have been consumed

exclude can be a list of object types for which we do not want to compare the values (e.g. for anonymized objects).

swh.journal.pytest_plugin.kafka_prefix()

Pick a random prefix for Kafka topics on each call
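
A random prefix keeps topics created by one test from colliding with those of another. As a rough sketch of how such a prefix could be generated (the helper name, the length of 10 and the lowercase alphabet are assumptions, not the plugin's documented behaviour):

```python
import random
import string


def random_kafka_prefix(length: int = 10) -> str:
    """Return a random lowercase string usable as a topic prefix (hypothetical)."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


prefix = random_kafka_prefix()
print(f"{prefix}.revision")  # a topic name that is different on every run
```
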

swh.journal.pytest_plugin.kafka_consumer_group(kafka_prefix: str)

Pick a random consumer group for Kafka consumers on each call

swh.journal.pytest_plugin.object_types()

Set of object types to precreate topics for.

swh.journal.pytest_plugin.privileged_object_types()

Set of object types to precreate privileged topics for.

swh.journal.pytest_plugin.kafka_server(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str]) -> str

A Kafka server with existing topics

Unprivileged topics are built as {kafka_prefix}.{object_type} with object_type from the object_types list.

Privileged topics are built as {kafka_prefix}_privileged.{object_type} with object_type from the privileged_object_types list.
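
The two naming rules above can be illustrated with a small helper. The function name `build_topic_names` and the sample prefix and object types are made up for this example; only the two name patterns come from the description above.

```python
from typing import Iterable, List


def build_topic_names(
    kafka_prefix: str,
    object_types: Iterable[str],
    privileged_object_types: Iterable[str],
) -> List[str]:
    """List the topic names implied by the two naming rules (hypothetical helper)."""
    # Unprivileged topics: {kafka_prefix}.{object_type}
    topics = [f"{kafka_prefix}.{object_type}" for object_type in object_types]
    # Privileged topics: {kafka_prefix}_privileged.{object_type}
    topics += [
        f"{kafka_prefix}_privileged.{object_type}"
        for object_type in privileged_object_types
    ]
    return topics


topics = build_topic_names("abcdefghij", ["content", "revision"], ["revision"])
print(topics)
# ['abcdefghij.content', 'abcdefghij.revision', 'abcdefghij_privileged.revision']
```
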

swh.journal.pytest_plugin.kafka_server_base() -> Iterator[str]

Create a mock Kafka cluster suitable for tests.

Yield a connection string.

Note: this is a generator to keep the mock broker alive during the whole test session.

see edenhill/librdkafka
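
The note about using a generator can be illustrated without Kafka. In the sketch below, `FakeBroker` is a stand-in invented for this example (it is not part of librdkafka or of this plugin). The resource stays alive while the generator is suspended at `yield` and is torn down only when the generator is closed, which is broadly how pytest drives a yield-style fixture.

```python
from typing import Iterator, List

events: List[str] = []


class FakeBroker:
    """Stand-in for a mock cluster (hypothetical, not a real Kafka API)."""

    def __init__(self) -> None:
        events.append("started")
        self.address = "localhost:9092"

    def close(self) -> None:
        events.append("stopped")


def broker_connection_string() -> Iterator[str]:
    broker = FakeBroker()
    try:
        # The broker object stays referenced while the generator is suspended here.
        yield broker.address
    finally:
        # Runs when the generator is closed, i.e. at teardown.
        broker.close()


gen = broker_connection_string()
address = next(gen)  # setup: the broker is created and its address handed out
events.append("tests run against " + address)
gen.close()  # teardown: the finally block stops the broker
print(events)
# ['started', 'tests run against localhost:9092', 'stopped']
```
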

swh.journal.pytest_plugin.test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str])

Test configuration needed for producer/consumer

swh.journal.pytest_plugin.consumer(kafka_server: str, test_config: Dict, kafka_consumer_group: str) -> Consumer

Get a connected Kafka consumer.