swh.journal.pytest_plugin module
- swh.journal.pytest_plugin.ensure_lists(value: Any) → Any
>>> ensure_lists(["foo", 42])
['foo', 42]
>>> ensure_lists(("foo", 42))
['foo', 42]
>>> ensure_lists({"a": ["foo", 42]})
{'a': ['foo', 42]}
>>> ensure_lists({"a": ("foo", 42)})
{'a': ['foo', 42]}
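The doctests above suggest that tuples are converted to lists, including tuples stored as dict values. A minimal sketch of that behaviour follows. The name `ensure_lists_sketch` is hypothetical, the recursion into nested sequences is an assumption that the doctests do not confirm, and this is not the library's actual implementation.

```python
from typing import Any


def ensure_lists_sketch(value: Any) -> Any:
    """Hypothetical stand-in mirroring the documented doctests.

    Tuples and lists become lists; dicts keep their keys and have
    their values converted. Anything else is returned unchanged.
    """
    if isinstance(value, (list, tuple)):
        # Assumption: nested sequences are converted recursively.
        return [ensure_lists_sketch(item) for item in value]
    if isinstance(value, dict):
        return {key: ensure_lists_sketch(item) for key, item in value.items()}
    return value


# The four cases shown in the doctests.
print(ensure_lists_sketch(["foo", 42]))
print(ensure_lists_sketch(("foo", 42)))
print(ensure_lists_sketch({"a": ["foo", 42]}))
print(ensure_lists_sketch({"a": ("foo", 42)}))
```

A normalisation like this can be useful when comparing objects after a serialisation round trip, since some formats do not distinguish tuples from lists.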
- swh.journal.pytest_plugin.consume_messages(consumer, kafka_prefix, expected_messages)
Consume expected_messages from the consumer and sort them all into a consumed_objects dict.
- swh.journal.pytest_plugin.assert_all_objects_consumed(consumed_messages: Dict, exclude: Collection | None = None)
Check whether all objects from TEST_OBJECTS have been consumed.
exclude can be a list of object types for which we do not want to compare the values (e.g. for anonymized objects).
- swh.journal.pytest_plugin.kafka_prefix()
Pick a random prefix for Kafka topics on each call.
- swh.journal.pytest_plugin.kafka_consumer_group(kafka_prefix: str)
Pick a random consumer group for Kafka consumers on each call.
- swh.journal.pytest_plugin.privileged_object_types()
Set of object types to precreate privileged topics for.
- swh.journal.pytest_plugin.kafka_server(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str]) → str
A Kafka server with existing topics.
Unprivileged topics are built as {kafka_prefix}.{object_type}, with object_type from the object_types list.
Privileged topics are built as {kafka_prefix}_privileged.{object_type}, with object_type from the privileged_object_types list.
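The two naming patterns above can be illustrated with a short sketch. The helper name `expected_topics` and the sample prefix and object types are hypothetical, chosen only for illustration. The fixture itself creates the topics on the test broker, which this sketch does not attempt.

```python
from typing import Iterable, List


def expected_topics(
    kafka_prefix: str,
    object_types: Iterable[str],
    privileged_object_types: Iterable[str],
) -> List[str]:
    """Hypothetical helper: list topic names following the documented patterns."""
    # Unprivileged topics: {kafka_prefix}.{object_type}
    topics = [f"{kafka_prefix}.{object_type}" for object_type in object_types]
    # Privileged topics: {kafka_prefix}_privileged.{object_type}
    topics += [
        f"{kafka_prefix}_privileged.{object_type}"
        for object_type in privileged_object_types
    ]
    return topics


# Illustrative values only; the real prefix is picked at random per test.
for topic in expected_topics("abcdef", ["release", "revision"], ["revision"]):
    print(topic)
```

With these sample inputs, the names are `abcdef.release`, `abcdef.revision`, and `abcdef_privileged.revision`.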
- swh.journal.pytest_plugin.kafka_server_base() → Iterator[str]
Create a mock Kafka cluster suitable for tests.
Yield a connection string.
Note: this is a generator to keep the mock broker alive during the whole test session.