Source code for swh.objstorage.backends.winery.cli

# Copyright (C) 2015-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import timedelta
import logging
import re
from types import FrameType
from typing import TYPE_CHECKING, Callable, Dict, List, Tuple

import click

from swh.objstorage.cli import objstorage_cli_group

if TYPE_CHECKING:
    from datetime import datetime

    from .sharedbase import ShardState

# WARNING: do not import unnecessary things here to keep cli startup time under
# control


logger = logging.getLogger(__name__)


[docs] def install_signal_handlers(signal_handler: Callable[[int, FrameType | None], None]): """Install the signal handler for SIGINT and SIGTERM""" import os # This is critical for tests not to hang... See, the way ServerTestFixture # -- used in rpc/api tests -- is shutting down the flask server on teardown # is by calling process.terminate()... So if some other test (e.g. in # test_objstorage_winery) call this command BEFORE executing rpc/api tests, # then the SIGTERM aiming at the Flask server will be caught by this very # 'set_signal_received' hook if we install it here... if os.environ.get("PYTEST_VERSION") is not None: return import signal for signum in (signal.SIGINT, signal.SIGTERM): signal.signal(signum, signal_handler)
@objstorage_cli_group.group("winery") @click.pass_context def winery(ctx): "Winery related commands" config = ctx.obj["config"]["objstorage"] if config["cls"] != "winery": raise click.ClickException("winery packer only works on a winery objstorage") from swh.objstorage.backends.winery.settings import ( SETTINGS, populate_default_settings, ) ctx.obj["winery_settings"] = populate_default_settings( **{k: v for k, v in config.items() if k in SETTINGS} ) @winery.command("packer") @click.option("--stop-after-shards", type=click.INT, default=None) @click.pass_context def winery_packer(ctx, stop_after_shards: int | None = None): """Run the winery packer process This process is in charge of creating (packing) shard files when a winery writer has accumulated enough file objects to reach the shard's `max_size` size. When a shard becomes full, it gets locked by this packer service. The shard creation can then occur either as part of the packing step (within this process) when `create_images` configuration option is set, or waited for (in this case, the shard creation processing is delegated to the shard managenent tool, aka `swh objstorage winery rdb`). When the shard file is ready, the shard gets packed. Note: when using a `cls: directory` type for `shards_pool` configuration, it is advisable to set `create_images` to True; the `rdb` management process is then unnecessary (when writing directly in shard files, there is no need for provisionning the RDB volume). """ import signal from swh.objstorage.backends.winery.housekeeping import AbortOperation, shard_packer settings = ctx.obj["winery_settings"] signal_stop = False signal_abort = False def stop_packing(num_shards: int) -> bool: """Stop packing when a signal is received or when stop_after_shards is reached""" return signal_stop or ( stop_after_shards is not None and num_shards >= stop_after_shards ) def abort_packing(num_shards: int) -> bool: """Abort packing when a signal is received.""" return signal_abort def set_signal_received(signum: int, _stack_frame: FrameType | None) -> None: nonlocal signal_abort nonlocal signal_stop if signum == signal.SIGTERM: signal_abort = True else: signal_stop = True logger.warning( "Received signal %s, %s", signal.strsignal(signum), signal_stop and "exiting" or "aborting", ) install_signal_handlers(set_signal_received) logger.info("Image packer starting") try: ret = shard_packer( **settings, stop_packing=stop_packing, abort_packing=abort_packing ) logger.info("Packed %s shards", ret) except AbortOperation: logger.warning("Packing aborted, exiting") @winery.command("rbd") @click.option("--stop-instead-of-waiting", is_flag=True) @click.option("--manage-rw-images", is_flag=True) @click.option("--only-prefix") @click.pass_context def winery_rbd( ctx, stop_instead_of_waiting: bool = False, manage_rw_images: bool = True, only_prefix: str | None = None, ): """Run a winery RBD image manager process This process is in charge of creating and mapping image files for shards. This is required for `shards_pool` of type `cls: rbd`. It will: - Map all `readonly` shards (if need be). - If `manage_rw_images` is true, provision a new RBD image in the Ceph cluster each time a shard appears in the `standby` or `writing` state. - When a shard packing completes (shrd status becomes one of `packed`, `cleaning`, `readonly`), the image is mapped read-only. - Record mapping event in the database. """ import signal from swh.objstorage.backends.winery.pools import pool_from_settings from swh.objstorage.backends.winery.roshard import manage_images from swh.objstorage.backends.winery.sleep import sleep_exponential settings = ctx.obj["winery_settings"] stop_on_next_iteration = False def stop_running() -> bool: """Stop running when a signal is received, or when there's nothing to do.""" return stop_on_next_iteration def wait_for_image(attempt: int): nonlocal stop_on_next_iteration if stop_instead_of_waiting: stop_on_next_iteration = True return return sleep_exponential( min_duration=1, max_duration=60, factor=2, message="No new RBD images", )(attempt) def set_signal_received(signum: int, _stack_frame: FrameType | None) -> None: nonlocal stop_on_next_iteration logger.warning("Received signal %s, exiting", signal.strsignal(signum)) stop_on_next_iteration = True install_signal_handlers(set_signal_received) pool = pool_from_settings( shards_settings=settings["shards"], shards_pool_settings=settings["shards_pool"], ) manage_images( pool=pool, base_dsn=settings["database"]["db"], manage_rw_images=manage_rw_images, wait_for_image=wait_for_image, only_prefix=only_prefix, stop_running=stop_running, ) logger.info("Image manager exiting") @winery.command("rw-shard-cleaner") @click.option("--stop-after-shards", type=click.INT, default=None) @click.option("--stop-instead-of-waiting", is_flag=True) @click.option( "--min-mapped-hosts", type=click.INT, default=1, help="Number of hosts on which the image should be mapped read-only before cleanup", ) @click.pass_context def winery_rw_shard_cleaner( ctx, stop_after_shards: int | None = None, stop_instead_of_waiting: bool = False, min_mapped_hosts: int = 1, ): """Run the winery database image manager process This process is responsible for cleaning winery DB tables for shards that have been packed. It performs clean up of the `packed` read-write shards, as soon as they are recorded as mapped on enough (`--min-mapped-hosts`) hosts (when using a rbd shards pool). They get locked in the `cleaning` state, the database cleanup is performed, then the shard gets moved in the final `readonly` state. This process should run continuously as a background process. """ import signal from swh.objstorage.backends.winery.housekeeping import rw_shard_cleaner from swh.objstorage.backends.winery.sleep import sleep_exponential settings = ctx.obj["winery_settings"] stop_on_next_iteration = False def stop_cleaning(num_shards: int) -> bool: """Stop running when requested, or when the max number of shards was reached.""" return ( stop_after_shards is not None and num_shards >= stop_after_shards ) or stop_on_next_iteration def wait_for_shard(attempt: int): nonlocal stop_on_next_iteration if stop_instead_of_waiting: stop_on_next_iteration = True return return sleep_exponential( min_duration=1, max_duration=60, factor=2, message="No shards to clean up", )(attempt) def set_signal_received(signum: int, _stack_frame: FrameType | None) -> None: nonlocal stop_on_next_iteration logger.warning("Received signal %s, exiting", signal.strsignal(signum)) stop_on_next_iteration = True install_signal_handlers(set_signal_received) ret = rw_shard_cleaner( database=settings["database"], min_mapped_hosts=min_mapped_hosts, stop_cleaning=stop_cleaning, wait_for_shard=wait_for_shard, ) logger.info("RW shard cleaner exiting, %d shards cleaned", ret) @winery.command("clean-deleted-objects") @click.pass_context def winery_clean_deleted_objects(ctx): """Clean deleted objects from Winery""" import signal from swh.objstorage.backends.winery.housekeeping import deleted_objects_cleaner from swh.objstorage.backends.winery.pools import pool_from_settings from swh.objstorage.backends.winery.sharedbase import SharedBase settings = ctx.obj["winery_settings"] stop_on_next_iteration = False def stop_running() -> bool: """Stop running when a signal is received, or when there's nothing to do.""" return stop_on_next_iteration def set_signal_received(signum: int, _stack_frame: FrameType | None) -> None: nonlocal stop_on_next_iteration logger.warning("Received signal %s, exiting", signal.strsignal(signum)) stop_on_next_iteration = True install_signal_handlers(set_signal_received) base = SharedBase(base_dsn=settings["database"]["db"]) pool = pool_from_settings( shards_settings=settings["shards"], shards_pool_settings=settings["shards_pool"], ) deleted_objects_cleaner(base, pool, stop_running) # Stolen from pypi's click_pendulum 0.2.1 package: # swh:1:rel:51dbe55356f79c1d87662fe77a40ee5c28074e67; # origin=https://pypi.org/project/click-pendulum/; # visit=swh:1:snp:9ff3b835b09011a44c27baec70e3e3f8cb9e114a # author: Dawson Reid (@ddaws), MIT License. # Adapted to straight datetime. # Note: Could probably migrate to swh-core at some point.
[docs] class Duration(click.ParamType): """A Duration object. The pattern used for matching must include the following named groups: - years: Matches the number of years. - weeks: Matches the number of weeks. - days: Matches the number of days. - hours: Matches the number of hours. - minutes: Matches the number of minutes. - seconds: Matches the number of seconds. Each group is optional, but the pattern must be structured to capture these groups if present. """ name = "duration" DEFAULT_PATTERN = ( r"(?:(?P<years>\d+)\s*y(?:ears?)?)?\s*" r"(?:(?P<weeks>\d+)\s*w(?:eeks?)?)?\s*" r"(?:(?P<days>\d+)\s*d(?:ays?)?)?\s*" r"(?:(?P<hours>\d+)\s*h(?:ours?)?)?\s*" r"(?:(?P<minutes>\d+)\s*m(?:inutes?)?)?\s*" r"(?:(?P<seconds>\d+)\s*s(?:econds?)?)?" ) _pattern: re.Pattern def __init__(self, pattern: str = DEFAULT_PATTERN): self._pattern = re.compile(pattern)
[docs] def convert(self, value: str | None, param, ctx) -> timedelta | None: if value is None: return value try: match = self._pattern.match(value) if not match or not match.group(0).strip(): raise ValueError("Invalid duration format: no matches found") duration_kwargs = { "weeks": int(match.group("weeks") or 0), "days": int(match.group("days") or 0), "hours": int(match.group("hours") or 0), "minutes": int(match.group("minutes") or 0), "seconds": int(match.group("seconds") or 0), } return timedelta(**duration_kwargs) except ValueError as ex: self.fail( f'Could not parse duration string "{value}" ({ex})', param, ctx, )
[docs] def shards_by_locker( shards: List[Tuple[str, "ShardState", "datetime", str | None]], ) -> List[Tuple[str, List[Tuple[str, "ShardState", "datetime"]]]]: by_locker: Dict[str, List[Tuple[str, "ShardState", "datetime"]]] = {} for name, state, locker_ts, locker in shards: if locker is None: locker = "N/A" by_locker.setdefault(locker, []).append((name, state, locker_ts)) return sorted(by_locker.items())
@winery.command("list-open-shards") @click.option( "--state", "-s", type=click.Choice(["standby", "writing", "full", "packing", "packed", "cleaning"]), help="Only list shards in the given state (rather than all non-readonly shards)", default=None, ) @click.option( "--long", "-l", is_flag=True, help="Long output (can be slow)", default=False, ) @click.option( "--humanize/--no-humanize", "humanize_results", is_flag=True, help="Do / do not humalize results", default=True, ) @click.pass_context def winery_list_open_shards(ctx, state, long, humanize_results): """List open shards""" from datetime import UTC, datetime from humanize import intcomma, naturaldelta, naturalsize from swh.objstorage.backends.winery.rwshard import RWShard from swh.objstorage.backends.winery.sharedbase import ShardState, SharedBase settings = ctx.obj["winery_settings"] base = SharedBase(base_dsn=settings["database"]["db"]) max_size = settings["shards"]["max_size"] shardstate = ShardState(state) if state is not None else None shards = list(base.list_open_shards(state=shardstate)) if shards: click.echo("Open shards:") for locker, entries in shards_by_locker(shards): click.echo(f"{locker}:") for name, state, locker_ts in entries: since = "" if locker_ts is not None: since = f" since {naturaldelta(datetime.now(UTC) - locker_ts)}" extra = "" if long: try: rwshard = RWShard( name=name, base_dsn=base.dsn, readonly=True, shard_max_size=0, ) n = rwshard.obj_count.count size = rwshard.obj_count.volume if size >= max_size: full = "full" else: full = f"{size / max_size * 100.0:.2f}%" if humanize_results: extra = ( f", N={intcomma(n)}, size={naturalsize(size)} ({full})" ) else: extra = f", N={n}, size={size} ({full})" except Exception: logger.warning( f"Failed to retrieve detailed information on {name}" ) click.echo(f" {name}: {state.name}{extra}{since}") else: if state is not None: click.echo(f"No shard in the state '{state}'") else: click.echo("No open shard") @winery.command("list-stale-shards") @click.option( "--duration", "-d", type=Duration(), help="How long the shard must have been stuck in its state to be considered as stale", default="48h", ) @click.option( "--humanize/--no-humanize", "humanize_results", is_flag=True, help="Do / do not humalize results", default=True, ) @click.pass_context def winery_list_stale_shards(ctx, duration, humanize_results): """List open shards that look stale for some reason""" from datetime import UTC, datetime from humanize import naturaldelta from swh.objstorage.backends.winery.sharedbase import SharedBase settings = ctx.obj["winery_settings"] base = SharedBase(base_dsn=settings["database"]["db"]) shards = list(base.list_stale_shards(delay=duration)) if shards: click.echo("Potentially stale shards:") for locker, entries in shards_by_locker(shards): click.echo(f"{locker}:") for name, state, locker_ts in entries: since = datetime.now(UTC) - locker_ts if humanize_results: click.echo(f" {name}: {state.name} since {naturaldelta(since)}") else: click.echo(f" {name}: {state.name} since {since}") else: click.echo("No identified stale shard") @winery.command("release-stale-shards") @click.option("--shard", "shard_ids", help="shard name to release", multiple=True) @click.option( "--locker", "lockers", help="limit shards to release to this locker ID", multiple=True, ) @click.option( "--duration", "-d", help="How long the shard must have been stuck in its state to be considered as stale", type=Duration(), default="48h", ) @click.option( "--dry-run", help="Do not perform the state change", is_flag=True, default=False ) @click.pass_context def winery_release_stale_shards(ctx, shard_ids, lockers, duration, dry_run): """Release WRITING shards that look stale""" from datetime import UTC, datetime from humanize import naturaldelta from swh.objstorage.backends.winery.sharedbase import ShardState, SharedBase settings = ctx.obj["winery_settings"] base = SharedBase(base_dsn=settings["database"]["db"]) dst_state = { ShardState.WRITING: ShardState.STANDBY, ShardState.PACKING: ShardState.FULL, ShardState.CLEANING: ShardState.PACKED, } shards = list(base.list_stale_shards(delay=duration)) if lockers: shards = [shard for shard in shards if str(shard[3]) in lockers] if shard_ids: shards = [shard for shard in shards if shard[0] in shard_ids] if shards: if dry_run: click.echo("Would release (dry run):") else: click.echo("Releasing:") for locker, entries in shards_by_locker(shards): click.echo(f"{locker}:") for name, state, locker_ts in entries: since = naturaldelta(datetime.now(UTC) - locker_ts) if state not in dst_state: click.echo(f" {name} is in unexpected state {state}, ignoring") continue dst = dst_state[state] if not dry_run: click.echo( f" {name} stuck in {state.name} for {since} --> {dst.name}" ) base.set_shard_state(new_state=dst, name=name) else: click.echo( f" {name} stuck in {state.name} for {since} --> {dst.name}" ) else: click.echo("No identified stale shards to release") @winery.command("import-shards") @click.pass_context def winery_import_shards(ctx): """Populate the winery database from existing shard files""" from swh.objstorage.backends.winery.housekeeping import import_ro_shards from swh.objstorage.backends.winery.pools import pool_from_settings from swh.objstorage.backends.winery.sharedbase import SharedBase settings = ctx.obj["winery_settings"] if settings["shards_pool"]["type"] != "directory": raise click.ClickException("winery import only works on a directory shard_pool") pool = pool_from_settings( shards_settings=settings["shards"], shards_pool_settings=settings["shards_pool"], ) base = SharedBase(base_dsn=settings["database"]["db"]) n_obj, n_shard = import_ro_shards(base, pool) if n_obj: click.echo( "Pool %s: imported %s objects from %s shards" % (pool.pool_name, n_obj, n_shard) ) else: click.echo("Pool %s: nothing to do" % (pool.pool_name,))