Source code for swh.objstorage.backends.winery.housekeeping

# Copyright (C) 2022-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from time import monotonic
from typing import Callable, Iterable, Optional, Tuple

from psycopg.errors import UniqueViolation

from swh.core.statsd import statsd
from swh.core.utils import grouper
from swh.shard import Shard
from swh.shard.cli import NULLKEY

from . import roshard, settings
from .pools import Pool, pool_from_settings
from .rwshard import RWShard
from .sharedbase import ShardState, SharedBase
from .sleep import sleep_exponential

logger = logging.getLogger(__name__)


class AbortOperation(Exception):
    """Signal that an in-progress operation was aborted on request.

    Raised by :func:`pack` when its ``abort_packing`` callback returns true.
    """
def never_stop(_: int) -> bool:
    """Callback that never asks to stop: return False whatever the counter is."""
    return False
def stop_after_shards(max_shards_packed: int) -> Callable[[int], bool]:
    """Build a stop callback that triggers once enough shards were processed.

    Arguments:
      max_shards_packed: threshold at which the returned callback starts
        returning True

    Returns:
      a callable taking the current number of processed shards and returning
      True when that number is greater than or equal to `max_shards_packed`
    """

    def limit_reached(shards_packed: int) -> bool:
        return max_shards_packed <= shards_packed

    return limit_reached
def shard_packer(
    database: settings.Database,
    shards: settings.Shards,
    shards_pool: settings.ShardsPool,
    packer: Optional[settings.Packer] = None,
    stop_packing: Callable[[int], bool] = never_stop,
    abort_packing: Callable[[int], bool] = never_stop,
    wait_for_shard: Callable[[int], None] = sleep_exponential(
        min_duration=5,
        factor=2,
        max_duration=60,
        message="No shards to pack",
    ),
) -> int:
    """Pack shards until the `stop_packing` function returns True.

    Each iteration tries to lock one shard in the FULL state (moving it to
    PACKING) and hands it to :func:`pack`. When no shard could be locked, the
    `wait_for_shard` function is called with the number of consecutive
    unsuccessful attempts, and the loop tries again.

    Arguments:
      database: database settings (e.g. db connection string)
      shards: shards settings (e.g. max_size)
      shards_pool: shards pool settings (e.g. Ceph RBD settings)
      packer: packer settings
      stop_packing: callback to determine whether the packer should exit; it
        receives the number of shards packed so far
      abort_packing: callback to determine whether the packer should abort;
        it is forwarded as is to :func:`pack`
      wait_for_shard: sleep function called when no shards are available to be
        packed

    Returns:
      the number of shards that were packed

    Raises:
      RuntimeError: if a lock was obtained but it carries no shard name
      ValueError: if :func:`pack` returns a false value
    """
    config = settings.populate_default_settings(
        database=database,
        shards=shards,
        shards_pool=shards_pool,
        packer=(packer or {}),
    )
    db_config = config["database"]

    # Fall back to a descriptive name when the settings leave it empty.
    application_name = db_config["application_name"] or "Winery Shard Packer"
    dsn = db_config["db"]

    base = SharedBase(base_dsn=dsn, application_name=application_name)

    packed_count = 0
    idle_rounds = 0

    while not stop_packing(packed_count):
        lock = base.maybe_lock_one_shard(
            current_state=ShardState.FULL, new_state=ShardState.PACKING
        )

        if not lock:
            # Nothing to do yet: let the callback decide how long to sleep.
            wait_for_shard(idle_rounds)
            idle_rounds += 1
            continue

        # A shard was found, so the next idle period starts from scratch.
        idle_rounds = 0

        with lock:
            shard_name = lock.name
            if shard_name is None:
                raise RuntimeError("No shard has been locked?")

            logger.info("shard_packer: Locked shard %s to pack", shard_name)

            packed = pack(
                shard=shard_name,
                base_dsn=dsn,
                packer_settings=config["packer"],
                shards_settings=config["shards"],
                shards_pool_settings=config["shards_pool"],
                shared_base=base,
                abort_packing=abort_packing,
            )
            if not packed:
                raise ValueError("Packing shard %s failed" % shard_name)

            packed_count += 1

    return packed_count
def pack(
    shard: str,
    base_dsn: str,
    packer_settings: settings.Packer,
    shards_settings: settings.Shards,
    shards_pool_settings: settings.ShardsPool,
    shared_base: Optional[SharedBase] = None,
    abort_packing: Callable[[int], bool] = never_stop,
) -> bool:
    """Copy every object of the RW shard `shard` into a new RO shard.

    Once all objects have been added and the RO shard creator has been
    exited, the end of packing is recorded in the shared database through
    `shared_base.shard_packing_ends`.

    Arguments:
      shard: name of the shard to pack
      base_dsn: connection string of the shared database
      packer_settings: packer settings; `create_images` is forwarded to the RO
        shard creator, and `clean_immediately` only triggers a warning
      shards_settings: shards settings (`max_size` is used for the RW shard
        and reported as a gauge)
      shards_pool_settings: shards pool settings, used to build the pool
      shared_base: shared database handle; a new one is created from
        `base_dsn` when a false value is given
      abort_packing: callback called after each object is added, with the
        zero-based index of that object; packing is aborted when it returns
        True

    Returns:
      True once the shard has been packed

    Raises:
      AbortOperation: if `abort_packing` returns True
    """
    max_size = shards_settings["max_size"]
    rw_shard = RWShard(shard, shard_max_size=max_size, base_dsn=base_dsn)

    shared_base = shared_base or SharedBase(base_dsn=base_dsn)

    total = rw_shard.count()
    logger.info("Creating RO shard %s for %s objects", shard, total)

    pool = pool_from_settings(
        shards_settings=shards_settings, shards_pool_settings=shards_pool_settings
    )
    statsd.gauge("swh_objstorage_winery_packer_shard_max_size_bytes", max_size)

    metric_tags = {"pool_name": pool.pool_name}
    started_at = monotonic()

    with statsd.timed("swh_objstorage_winery_packer_seconds", tags=metric_tags):
        creator = roshard.ROShardCreator(
            name=shard,
            count=total,
            pool=pool,
            rbd_create_images=packer_settings["create_images"],
        )
        with creator as ro_shard:
            logger.info("Created RO shard %s", shard)

            for index, (obj_id, content) in enumerate(rw_shard.all()):
                ro_shard.add(content, obj_id)

                if abort_packing(index):
                    logger.info("Aborting packing of %s", shard)
                    raise AbortOperation("Packing shard %s aborted" % shard)

                # Progress report every hundred objects.
                added = index + 1
                if added % 100 == 0:
                    logger.debug(
                        "RO shard %s: added %s/%s objects", shard, added, total
                    )

                statsd.increment(
                    "swh_objstorage_winery_packer_objects", tags=metric_tags
                )
                statsd.increment(
                    "swh_objstorage_winery_packer_volume_bytes",
                    value=len(content),
                    tags=metric_tags,
                )

            logger.debug("RO shard %s: added %s objects, saving", shard, total)

        logger.info("RO shard %s: saved (in %ds)", shard, monotonic() - started_at)

    shared_base.shard_packing_ends(shard)

    if packer_settings.get("clean_immediately"):
        logger.warning(
            "clean_immediately has been disabled. Please use a "
            "'swh objstorage winery rw-shard-cleaner' service instead. "
            "Cleaning will NOT be executed now."
        )

    return True
def rw_shard_cleaner(
    database: settings.Database,
    min_mapped_hosts: int,
    stop_cleaning: Callable[[int], bool] = never_stop,
    wait_for_shard: Callable[[int], None] = sleep_exponential(
        min_duration=5,
        factor=2,
        max_duration=60,
        message="No shards to clean up",
    ),
) -> int:
    """Clean up RW shards until the `stop_cleaning` function returns True.

    Each iteration tries to lock one shard in the PACKED state (moving it to
    CLEANING) and hands it to :func:`cleanup_rw_shard`. When no shards are
    available for cleaning, call the `wait_for_shard` function with the number
    of consecutive unsuccessful attempts.

    Arguments:
      database: database settings (e.g. db connection string)
      min_mapped_hosts: how many hosts should have mapped the image read-only
        before cleaning it
      stop_cleaning: callback to determine whether the cleaner should exit; it
        receives the number of shards cleaned so far
      wait_for_shard: sleep function called when no shards are available to be
        cleaned

    Returns:
      the number of shards that were cleaned

    Raises:
      RuntimeError: if a lock was obtained but it carries no shard name
      ValueError: if :func:`cleanup_rw_shard` returns a false value
    """
    database = settings.database_settings_with_defaults(database)

    base = SharedBase(base_dsn=database["db"])

    shards_cleaned = 0
    waited_for_shards = 0

    while not stop_cleaning(shards_cleaned):
        locked = base.maybe_lock_one_shard(
            current_state=ShardState.PACKED,
            new_state=ShardState.CLEANING,
            min_mapped_hosts=min_mapped_hosts,
        )

        if not locked:
            wait_for_shard(waited_for_shards)
            waited_for_shards += 1
            continue

        # A shard was found, so the next idle period starts from scratch.
        waited_for_shards = 0

        with locked:
            # Same sanity check as in shard_packer: never hand a missing
            # shard name over to the cleanup code.
            if locked.name is None:
                raise RuntimeError("No shard has been locked?")

            logger.info("rw_shard_cleaner: Locked shard %s to clean", locked.name)

            ret = cleanup_rw_shard(
                locked.name,
                base_dsn=base.dsn,
            )
            if not ret:
                raise ValueError("Cleaning shard %s failed" % locked.name)

            shards_cleaned += 1

    return shards_cleaned
def cleanup_rw_shard(shard, base_dsn) -> bool:
    """Drop the RW shard `shard`, then mark it READONLY in the shared database.

    Arguments:
      shard: name of the shard to clean up
      base_dsn: connection string of the shared database

    Returns:
      True once both steps have been performed
    """
    # The maximum size is irrelevant here: the shard is only opened to be dropped.
    rw_shard = RWShard(name=shard, shard_max_size=0, base_dsn=base_dsn)
    rw_shard.drop()

    base = SharedBase(base_dsn=base_dsn)
    base.set_shard_state(name=shard, new_state=ShardState.READONLY)

    return True
def deleted_objects_cleaner(
    base: SharedBase,
    pool: Pool,
    stop_running: Callable[[], bool],
) -> None:
    """Clean up deleted objects from RO shards and the shared database.

    This requires the ability to map RBD images in read-write mode. Images will be
    left mapped by this process as it is meant to be executed in a transient host
    dedicated to this purpose.

    For every entry yielded by `base.deleted_objects()`, the object is deleted
    from its RO shard when the shard state is read-only, then the entry is
    cleaned from the shared database. The number of cleaned objects is logged
    at the end, including when the loop was interrupted by `stop_running`.

    Arguments:
      base: handle on the shared database listing the deleted objects
      pool: Ceph RBD pool for Winery shards
      stop_running: callback that returns True when the manager should stop
        running; it is checked before each object is processed
    """
    count = 0

    for obj_id, shard_name, shard_state in base.deleted_objects():
        if stop_running():
            break

        # Only shards in a read-only state are edited through the RO shard
        # code; for other states only the database entry is cleaned.
        if shard_state.readonly:
            roshard.ROShard.delete(pool, shard_name, obj_id)

        base.clean_deleted_object(obj_id)
        count += 1

    logger.info("Cleaned %d deleted objects", count)
def import_ro_shards(
    base: SharedBase, pool: Pool, shards: Iterable[str] | None = None
) -> Tuple[int, int]:
    """Import existing shard files in the winery database.

    Shards that already have a state in the database are skipped. For the
    others, a shard entry is created in the PACKING state, the keys found in
    the shard file are recorded (except the ones the database already knows),
    and the shard is finally marked READONLY and mapped read-only.

    Arguments:
      base: handle on the shared database
      pool: pool holding the shard images
      shards: names of the images to import; when empty or None, all the
        images listed by `pool.image_list()` are considered

    Returns:
      a `(n_obj, n_shard)` tuple: the number of newly recorded object ids and
      the number of imported shards
    """
    n_obj = 0
    n_shard = 0

    if not shards:
        shards = pool.image_list()

    for imgname in shards:
        with Shard(pool.image_path(imgname)) as s:
            if base.get_shard_state(name=imgname) is not None:
                logger.info("Shard %s already exists, skipping", imgname)
                continue

            try:
                base._locked_shard = base.create_shard(ShardState.PACKING, name=imgname)
            except UniqueViolation:
                # Should not happen given the check above, but better safe
                # than sorry: the shard already exists in the winery DB, skip it.
                logger.info("Shard %s already exists, skipping", imgname)
                # TODO: check stored entries match?
                continue

            with base.pool.connection() as db, db.transaction():
                for keys in grouper(s, 10000):
                    # Split the batch in a single pass instead of testing
                    # membership against a list for each key.
                    known = []
                    new_keys = []
                    for key in keys:
                        if key == NULLKEY:
                            continue
                        if base.contains(key):
                            known.append(key)
                        else:
                            new_keys.append(key)

                    if known:
                        logger.info(
                            "Keys %s are already known, skipping",
                            [key.hex() for key in known],
                        )

                    base.record_new_obj_ids(db, new_keys)
                    n_obj += len(new_keys)

            base.shard_packing_ends(imgname)
            n_shard += 1
            base.set_shard_state(name=imgname, new_state=ShardState.READONLY)
            pool.image_map(imgname, options="ro")

    return n_obj, n_shard