# Source code for swh.storage.pytest_plugin

# Copyright (C) 2019-2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from functools import partial
import os
import resource
import signal
import socket
import subprocess
import time

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
import pytest
from pytest_postgresql import factories

from swh.core.db.db_utils import initialize_database_for_module
from swh.storage import get_storage
from swh.storage.postgresql.db import Db as PostgreSQLDb
from swh.storage.postgresql.storage import Storage as StorageDatastore
from swh.storage.tests.storage_data import StorageData

# Force a fixed locale for this process and its children (the spawned
# Cassandra/Scylla servers) — presumably to keep their output and collation
# behavior deterministic across developer machines; TODO confirm intent.
os.environ["LC_ALL"] = "C.UTF-8"


_CASSANDRA_CONFIG_TEMPLATE = """
data_file_directories:
    - {data_dir}/data
commitlog_directory: {data_dir}/commitlog
hints_directory: {data_dir}/hints
saved_caches_directory: {data_dir}/saved_caches

commitlog_sync: periodic
commitlog_sync_period_in_ms: 1000000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
endpoint_snitch: SimpleSnitch
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"

storage_port: {storage_port}
native_transport_port: {native_transport_port}
start_native_transport: true
listen_address: 127.0.0.1

enable_user_defined_functions: true

# speed-up by disabling period saving to disk
key_cache_save_period: 0
row_cache_save_period: 0
trickle_fsync: false
commitlog_sync_period_in_ms: 100000
authenticator: PasswordAuthenticator
"""

# Extra scylla.yaml settings appended to _CASSANDRA_CONFIG_TEMPLATE when
# SWH_USE_SCYLLADB is set: enables the UDF experimental feature and disables
# the auxiliary servers (prometheus, thrift) the tests do not need.
# Adds two more placeholders: {data_dir} and {api_port}.
_SCYLLA_EXTRA_CONFIG_TEMPLATE = """
experimental_features:
    - udf
view_hints_directory: {data_dir}/view_hints
prometheus_port: 0  # disable prometheus server
start_rpc: false  # disable thrift server
api_port: {api_port}
"""


def _free_port():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 0))
    port = sock.getsockname()[1]
    sock.close()
    return port


def _wait_for_peer(addr, port):
    wait_until = time.time() + 60
    while time.time() < wait_until:
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((addr, port))
        except ConnectionRefusedError:
            time.sleep(0.1)
        else:
            sock.close()
            return True
    return False


@pytest.fixture(scope="session")
def cassandra_auth_provider_config():
    """Auth-provider configuration matching the test cluster's credentials.

    Returns the swh-storage style config dict for a
    ``PlainTextAuthProvider`` using Cassandra's default superuser account.
    """
    return dict(
        cls="cassandra.auth.PlainTextAuthProvider",
        username="cassandra",
        password="cassandra",
    )
@pytest.fixture(scope="session")
def swh_storage_cassandra_cluster(tmpdir_factory):
    """Spawn a throw-away single-node Cassandra (or ScyllaDB) server.

    Session-scoped: one server is shared by all tests.  Configuration is
    rendered from the module templates into per-session temp dirs, the
    server process is started in its own session (process group), and the
    whole group is SIGKILLed at teardown.

    Environment knobs:
        SWH_USE_SCYLLADB: if non-empty, run ScyllaDB instead of Cassandra.
        SWH_CASSANDRA_BIN: path to the server binary (defaults per backend).
        SWH_CASSANDRA_LOG: if set, inherit stdout/stderr and dump debug.log.

    Yields:
        (hosts, port): a (["127.0.0.1"], native_transport_port) pair.
    """
    cassandra_conf = tmpdir_factory.mktemp("cassandra_conf")
    cassandra_data = tmpdir_factory.mktemp("cassandra_data")
    cassandra_log = tmpdir_factory.mktemp("cassandra_log")
    # All ports are picked fresh so parallel sessions do not collide.
    native_transport_port = _free_port()
    storage_port = _free_port()
    jmx_port = _free_port()
    api_port = _free_port()

    use_scylla = bool(os.environ.get("SWH_USE_SCYLLADB", ""))

    cassandra_bin = os.environ.get(
        "SWH_CASSANDRA_BIN", "/usr/bin/scylla" if use_scylla else "/usr/sbin/cassandra"
    )

    if use_scylla:
        # Scylla reads its config from $SCYLLA_HOME/conf/scylla.yaml and
        # needs the extra template keys (api_port, view_hints, ...).
        os.makedirs(cassandra_conf.join("conf"))
        config_path = cassandra_conf.join("conf/scylla.yaml")
        config_template = _CASSANDRA_CONFIG_TEMPLATE + _SCYLLA_EXTRA_CONFIG_TEMPLATE
    else:
        config_path = cassandra_conf.join("cassandra.yaml")
        config_template = _CASSANDRA_CONFIG_TEMPLATE

    with open(str(config_path), "w") as fd:
        fd.write(
            config_template.format(
                data_dir=str(cassandra_data),
                storage_port=storage_port,
                native_transport_port=native_transport_port,
                api_port=api_port,
            )
        )

    # Inherit the server's output only when explicitly asked; otherwise
    # silence it entirely.
    if os.environ.get("SWH_CASSANDRA_LOG"):
        stdout = stderr = None
    else:
        stdout = stderr = subprocess.DEVNULL

    # Small JVM heap: enough for tests, keeps CI memory usage down.
    env = {
        "MAX_HEAP_SIZE": "300M",
        "HEAP_NEWSIZE": "50M",
        "JVM_OPTS": "-Xlog:gc=error:file=%s/gc.log" % cassandra_log,
    }
    if "JAVA_HOME" in os.environ:
        env["JAVA_HOME"] = os.environ["JAVA_HOME"]

    if use_scylla:
        env = {
            **env,
            "SCYLLA_HOME": cassandra_conf,
        }
        # prevent "NOFILE rlimit too low (recommended setting 200000,
        # minimum setting 10000; refusing to start."
        resource.setrlimit(resource.RLIMIT_NOFILE, (200000, 200000))
        proc = subprocess.Popen(
            [
                cassandra_bin,
                "--developer-mode=1",
            ],
            start_new_session=True,
            env=env,
            stdout=stdout,
            stderr=stderr,
        )
    else:
        proc = subprocess.Popen(
            [
                cassandra_bin,
                "-Dcassandra.config=file://%s/cassandra.yaml" % cassandra_conf,
                "-Dcassandra.logdir=%s" % cassandra_log,
                "-Dcassandra.jmx.local.port=%d" % jmx_port,
                "-Dcassandra-foreground=yes",
            ],
            start_new_session=True,
            env=env,
            stdout=stdout,
            stderr=stderr,
        )

    # Block until the native-transport port accepts TCP (or 60s elapse).
    listening = _wait_for_peer("127.0.0.1", native_transport_port)

    if listening:
        # Wait for initialization: the port can accept TCP before CQL auth
        # is actually ready, so retry a real driver connection up to 10 times.
        auth_provider = PlainTextAuthProvider(
            username="cassandra", password="cassandra"
        )
        cluster = Cluster(
            ["127.0.0.1"],
            port=native_transport_port,
            auth_provider=auth_provider,
            connect_timeout=30,
            control_connection_timeout=30,
        )
        session = None
        retry = 0
        while (not session) and retry < 10:
            try:
                session = cluster.connect()
            except Exception:
                time.sleep(1)
                retry += 1
        cluster.shutdown()
        yield (["127.0.0.1"], native_transport_port)

    # On failure (or on request), dump the server's debug log to help
    # diagnose why startup did not complete.
    if not listening or os.environ.get("SWH_CASSANDRA_LOG"):
        debug_log_path = str(cassandra_log.join("debug.log"))
        if os.path.exists(debug_log_path):
            with open(debug_log_path) as fd:
                print(fd.read())

    if not listening:
        if proc.poll() is None:
            raise Exception("cassandra process unexpectedly not listening.")
        else:
            raise Exception("cassandra process unexpectedly stopped.")

    # Kill the whole process group (start_new_session=True above) so any
    # children the server forked die with it.
    pgrp = os.getpgid(proc.pid)
    os.killpg(pgrp, signal.SIGKILL)
@pytest.fixture(scope="session")
def swh_storage_cassandra_keyspace(
    swh_storage_cassandra_cluster, cassandra_auth_provider_config
):
    """Create one randomly-named keyspace on the session's test cluster.

    Returns:
        the keyspace name ("test" + 20 hex chars), unique per session so
        concurrent sessions on a shared cluster cannot collide.
    """
    from swh.storage.cassandra import create_keyspace

    hosts, port = swh_storage_cassandra_cluster
    name = "test" + os.urandom(10).hex()
    create_keyspace(hosts, name, port, auth_provider=cassandra_auth_provider_config)
    return name
@pytest.fixture
def swh_storage_cassandra_backend_config(
    swh_storage_cassandra_cluster,
    swh_storage_cassandra_keyspace,
    cassandra_auth_provider_config,
):
    """Cassandra storage configuration dict, with per-test cleanup.

    Function-scoped on top of the session-scoped cluster/keyspace: after
    each test, every non-empty table in the keyspace is truncated so tests
    do not see each other's data.

    Yields:
        a config dict suitable for ``get_storage(**config)`` (in-memory
        journal writer and objstorage collaborators).
    """
    from swh.storage.cassandra.schema import TABLES

    (hosts, port) = swh_storage_cassandra_cluster
    keyspace = swh_storage_cassandra_keyspace

    storage_config = dict(
        cls="cassandra",
        hosts=hosts,
        port=port,
        keyspace=keyspace,
        journal_writer={"cls": "memory"},
        objstorage={"cls": "memory"},
        auth_provider=cassandra_auth_provider_config,
    )

    yield storage_config

    # Teardown: open a fresh storage to reach the underlying CQL session.
    storage = get_storage(**storage_config)

    for table in TABLES:
        # Probe with LIMIT 1 first — TRUNCATE is expensive, so only issue it
        # for tables that actually contain rows.
        table_rows = storage._cql_runner._session.execute(
            f"SELECT * from {keyspace}.{table} LIMIT 1"
        )
        if table_rows.one() is not None:
            storage._cql_runner._session.execute(f"TRUNCATE TABLE {keyspace}.{table}")

    storage._cql_runner._cluster.shutdown()
def create_object_references_partition(**kwargs):
    """Create the object_references partition for the current ISO week.

    Used as a ``load`` callback for the pytest-postgresql process factory
    below; ``kwargs`` are the connection parameters the factory passes to
    :meth:`PostgreSQLDb.connect`.
    """
    db = PostgreSQLDb.connect(**kwargs)
    with db.transaction() as cur:
        # isocalendar()[0:2] is (ISO year, ISO week number) for today.
        db.object_references_create_partition(
            *datetime.date.today().isocalendar()[0:2], cur=cur
        )
# pytest-postgresql process fixture: starts a PostgreSQL server and, on
# startup, initializes the swh-storage schema at its current version and
# creates this week's object_references partition.
swh_storage_postgresql_proc = factories.postgresql_proc(
    load=[
        partial(
            initialize_database_for_module,
            modname="storage",
            version=StorageDatastore.current_version,
        ),
        create_object_references_partition,
    ],
)

# Per-test database fixture (connection) on top of the process fixture above.
swh_storage_postgresql = factories.postgresql(
    "swh_storage_postgresql_proc",
)
@pytest.fixture
def swh_storage_postgresql_backend_config(swh_storage_postgresql):
    """Basic pg storage configuration with no journal collaborator
    (to avoid pulling optional dependency on clients of this fixture)
    """
    yield dict(
        cls="postgresql",
        db=swh_storage_postgresql.info.dsn,
        objstorage={"cls": "memory"},
        check_config={"check_write": True},
        max_pool_conns=100,
    )
@pytest.fixture
def swh_storage_backend_config(swh_storage_postgresql_backend_config):
    """Configuration to use for :func:`swh_storage_backend`.

    Defaults to :func:`swh_storage_postgresql_backend_config`; override this
    fixture in a test module to run against a different backend.
    """
    return swh_storage_postgresql_backend_config
@pytest.fixture
def swh_storage_backend(swh_storage_backend_config):
    """
    By default, this fixture aliases ``swh_storage``.

    However, when ``swh_storage`` is overridden to be a proxy storage, this
    fixture returns the storage instance behind all proxies.

    This is useful to introspect the state of backends from proxy tests.
    """
    return get_storage(**swh_storage_backend_config)
@pytest.fixture
def swh_storage(swh_storage_backend):
    """The storage under test; by default a plain alias of
    :func:`swh_storage_backend`.  Override to wrap the backend in proxies."""
    return swh_storage_backend
@pytest.fixture
def sample_data() -> StorageData:
    """Pre-defined sample storage object data to manipulate

    Returns:
        StorageData whose attribute keys are data model objects. Either
        multiple objects: contents, directories, revisions, releases, ... or
        simple ones: content, directory, revision, release, ...
    """
    return StorageData()