# Copyright (C) 2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import dataclasses
import enum
import graphlib
import logging
import textwrap
from typing import Callable, Iterable, Optional, Sequence
from .cql import CqlRunner, create_table
from .model import MigrationRow
logger = logging.getLogger(__name__)
[docs]
class MigrationStatus(enum.Enum):
PENDING = "pending"
"""The migration was not applied yet"""
RUNNING = "running"
COMPLETED = "completed"
[docs]
@dataclasses.dataclass
class Migration:
id: str
"""Unique identifier of this migration.
Should have the format: ``YYYY-MM-DD_developer_readable_name``"""
dependencies: set[str]
"""Set of identifiers of migrations this migration depends on"""
min_read_version: str
"""Lowest version of the Python code that should be allowed to read the database
if this migration is applied"""
script: Optional[Callable[[CqlRunner], None]]
"""If provided, this is a function that runs the migration.
If not provided, the migration must be run manually, using steps described
in the documentation"""
help: Optional[str]
"""Documentation of the migration
Typically describes what to do if ``script`` is :const:`None`."""
required: bool
"""Whether this migration must be applied for the current version of the Python code
to allow instantiating :class:`swh.storage.cassandra.CassandraStorage`."""
MIGRATIONS: tuple[Migration, ...] = (
Migration(
id="2024-12-12_init",
dependencies=set(),
min_read_version="2.9.0",
script=lambda _cql_runner: None,
help="Dummy migration that represents the database schema as of v2.9.0" "",
required=True,
),
)
[docs]
def list_migrations(
cql_runner: CqlRunner, rows: Optional[Sequence[MigrationRow]] = None
) -> list[tuple[Migration, MigrationStatus]]:
"""Returns all known migrations, in topological order
``rows``, should be the value returned by ``cql_runner.migration_list``.
This includes migrations that are not required to instantiate
:class:`swh.storage.cassandra.CassandraStorage`."""
dependency_graph = {m.id: m.dependencies for m in MIGRATIONS}
if rows is None:
rows = list(cql_runner.migration_list())
statuses = {row.id: row.status for row in rows}
migrations = {migration.id: migration for migration in MIGRATIONS}
return [
(
migrations[migration_id],
MigrationStatus(statuses.get(migration_id, "pending")),
)
for migration_id in graphlib.TopologicalSorter(dependency_graph).static_order()
]
[docs]
def apply_migrations(
cql_runner: CqlRunner, ids_to_apply: Iterable[str]
) -> tuple[bool, Sequence[Migration], Sequence[Migration]]:
"""Applies migrations with the given ids (unless they already are).
Returns:
* whether any was run, and
* which migrations still need to be run manually.
* which migrations cannot run because they are missing dependencies
"""
applied_any = False
remaining_manual_migrations = []
remaining_migrations_missing_dependencies = []
statuses = {
migration.id: status for (migration, status) in list_migrations(cql_runner)
}
for migration_id in ids_to_apply:
if migration_id not in statuses:
raise ValueError(f"Unknown migration: {migration_id}")
migrations_to_apply = [
migration for migration in MIGRATIONS if migration.id in ids_to_apply
]
for migration in migrations_to_apply:
status = statuses[migration.id]
if status == MigrationStatus.PENDING:
missing_dependencies = {
dependency
for dependency in migration.dependencies
if statuses[dependency] != MigrationStatus.COMPLETED
}
if missing_dependencies:
logger.warning(
"Cannot apply %s: depends on %s",
migration.id,
", ".join(missing_dependencies),
)
remaining_migrations_missing_dependencies.append(migration)
continue
cql_runner.migration_add_one(
MigrationRow(
id=migration.id,
dependencies=migration.dependencies,
min_read_version=migration.min_read_version,
status=MigrationStatus.RUNNING.value,
)
)
if migration.script is None:
logger.info("Skipping %s", migration.id)
if migration.help:
logger.info("%s", textwrap.indent(migration.help, " "))
remaining_manual_migrations.append(migration)
else:
logger.info("Running %s...", migration.id)
migration.script(cql_runner)
cql_runner.migration_add_one(
MigrationRow(
id=migration.id,
dependencies=migration.dependencies,
min_read_version=migration.min_read_version,
status=MigrationStatus.COMPLETED.value,
)
)
logger.info("Done.")
statuses[migration.id] = MigrationStatus.COMPLETED
applied_any = True
return (
applied_any,
remaining_manual_migrations,
remaining_migrations_missing_dependencies,
)
[docs]
def create_migrations_table_if_needed(cql_runner: CqlRunner) -> None:
if not list(
cql_runner.execute_with_retries(
"""
SELECT table_name
FROM system_schema.tables
WHERE keyspace_name=%s AND table_name='migration'
""",
[cql_runner.keyspace],
)
):
logger.info("'migrations' table does not exist yet, creating it.")
# 'migration' table does not exist. Create it:
cql_runner.execute_with_retries(f'USE "{cql_runner.keyspace}"', [])
create_table(cql_runner, "migration")
# And mark the dummy initial migration as done, as it corresponds to the schema
# of the last swh-storage version before adding the 'migrations' table.
# Other migrations could not have run before this is done.
(migration,) = [
migration for migration in MIGRATIONS if migration.id == "2024-12-12_init"
]
migration_row = MigrationRow(
id=migration.id,
dependencies=migration.dependencies,
min_read_version=migration.min_read_version,
status=MigrationStatus.COMPLETED.value,
)
cql_runner.migration_add_concurrent([migration_row])
logger.info("'migrations' table created.")