Source code for swh.objstorage.backends.winery.roshard

# Copyright (C) 2021-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
import logging
import os
import random
import shlex
import socket
import stat
import subprocess
import time
from types import TracebackType
from typing import Callable, Dict, Literal, Optional, Type

from systemd.daemon import notify

from swh.shard import Shard, ShardCreator

from .pools import Pool
from .sharedbase import ShardState, SharedBase
from .sleep import sleep_exponential

logger = logging.getLogger(__name__)


class ShardNotMapped(Exception):
    """Raised when the RBD image backing a shard cannot be read.

    :class:`ROShard` raises it when the image is not mapped read-only, and
    when nothing is found at the expected image path.
    """
def record_shard_mapped(
    base: SharedBase, shard_name: str, max_attempts: int = 5
) -> None:
    """Record a shard as mapped on this host, bailing out after a few attempts.

    Multiple attempts are used to handle a race condition when two hosts
    attempt to record the shard as mapped at the same time. In this
    situation, one of the two hosts will succeed and the other one will fail,
    so the sleep delay can be kept short and linear.

    Arguments:
      base: the SharedBase in which the mapping is recorded
      shard_name: name of the shard that has been mapped
      max_attempts: how many times the recording is tried before giving up

    Raises:
      ValueError: if ``max_attempts`` is lower than 1
      Exception: whatever the last attempt raised, once all attempts failed
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    hostname = socket.gethostname()
    for attempt in range(1, max_attempts + 1):
        try:
            base.record_shard_mapped(host=hostname, name=shard_name)
        except Exception:
            if attempt == max_attempts:
                # No retry will follow: propagate the error right away
                # instead of sleeping once more before giving up.
                raise
            logger.warning("Failed to mark shard %s as mapped, retrying...", shard_name)
            # Linear back-off: 1s, 2s, 3s, ...
            time.sleep(attempt)
        else:
            return
def manage_images(
    pool: Pool,
    base_dsn: str,
    manage_rw_images: bool,
    wait_for_image: Callable[[int], None],
    stop_running: Callable[[], bool],
    only_prefix: Optional[str] = None,
    application_name: Optional[str] = None,
) -> None:
    """Manage RBD image creation and mapping automatically.

    Loops until ``stop_running()`` returns True. Each iteration lists the
    shards known to the SharedBase, maps (or remaps) read-only the images of
    shards whose state reports ``image_available``, and, when
    ``manage_rw_images`` is set, creates or maps read-write the images of the
    other shards. The progress is reported to systemd through ``notify``.

    Arguments:
      pool: the :class:`Pool` used to query, create, map and remap images
      base_dsn: the DSN of the connection to the SharedBase
      manage_rw_images: whether RW images should be created and mapped
      wait_for_image: function which is called at each loop iteration, with
        an attempt number, if no images had to be mapped recently
      stop_running: callback that returns True when the manager should stop
        running
      only_prefix: only map images with the given name prefix
      application_name: the application name sent to PostgreSQL
    """
    application_name = application_name or "Winery RBD image manager"
    base = SharedBase(base_dsn=base_dsn, application_name=application_name)

    # Shards already handled by this process, with the mode recorded for them
    mapped_images: Dict[str, Literal["ro", "rw"]] = {}

    # Number of consecutive idle iterations, passed to wait_for_image
    attempt = 0
    notified_systemd = False
    while not stop_running():
        did_something = False
        logger.debug("Listing shards")
        start = time.monotonic()
        shards = [
            (shard_name, shard_state)
            for shard_name, shard_state in base.list_shards()
            if not only_prefix or shard_name.startswith(only_prefix)
        ]
        # Process the shards in a random order at each iteration
        # (presumably to spread work between concurrent managers -- TODO confirm)
        random.shuffle(shards)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "Listed %d shards in %.02f seconds",
                len(shards),
                time.monotonic() - start,
            )
            logger.debug("Mapped images: %s", Counter(mapped_images.values()))

        for shard_name, shard_state in shards:
            mapped_state = mapped_images.get(shard_name)
            if mapped_state == "ro":
                # Already mapped read-only by this process: nothing to map,
                # but the mapping is recorded again while the shard is PACKED.
                if shard_state == ShardState.PACKED:
                    record_shard_mapped(base, shard_name)
                continue
            elif shard_state.image_available:
                # The image has to end up mapped read-only; what needs to be
                # done depends on how it is currently mapped on this host.
                check_mapped = pool.image_mapped(shard_name)
                if check_mapped == "ro":
                    logger.debug(
                        "Detected %s shard %s, already mapped read-only",
                        shard_state.name,
                        shard_name,
                    )
                elif check_mapped == "rw":
                    logger.info(
                        "Detected %s shard %s, remapping read-only",
                        shard_state.name,
                        shard_name,
                    )
                    pool.image_remap_ro(shard_name)
                    # NOTE: this reuses the outer idle counter; it is reset to
                    # 0 at the end of the iteration since did_something is set
                    # below.
                    attempt = 0
                    # Poll every 0.1s until the image shows up as read-only,
                    # warning every 100 polls (attempt / 10 is the time waited,
                    # in seconds).
                    while pool.image_mapped(shard_name) != "ro":
                        attempt += 1
                        time.sleep(0.1)
                        if attempt % 100 == 0:
                            logger.warning(
                                "Waiting for %s shard %s to be remapped "
                                "read-only (for %ds)",
                                shard_state.name,
                                shard_name,
                                attempt / 10,
                            )
                    record_shard_mapped(base, shard_name)
                    did_something = True
                else:
                    logger.debug(
                        "Detected %s shard %s, mapping read-only",
                        shard_state.name,
                        shard_name,
                    )
                    pool.image_map(shard_name, options="ro")
                    record_shard_mapped(base, shard_name)
                    did_something = True
                mapped_images[shard_name] = "ro"
            elif manage_rw_images:
                if os.path.exists(pool.image_path(shard_name)):
                    # Image already mapped, nothing to do
                    pass
                elif not pool.image_exists(shard_name):
                    logger.info(
                        "Detected %s shard %s, creating RBD image",
                        shard_state.name,
                        shard_name,
                    )
                    # presumably image_create also maps the new image
                    # -- TODO confirm against Pool
                    pool.image_create(shard_name)
                    did_something = True
                else:
                    logger.warning(
                        "Detected %s shard %s and RBD image exists, mapping read-write",
                        shard_state.name,
                        shard_name,
                    )
                    pool.image_map(shard_name, "rw")
                    did_something = True
                # Now the shard is mapped
                mapped_images[shard_name] = "rw"
            else:
                logger.debug("%s shard %s, skipping", shard_state.name, shard_name)

        notify(
            "STATUS="
            f"Enumerated {len(shards)} shards, "
            f"mapped {len(mapped_images)} images"
        )
        if not notified_systemd:
            # The first iteration has happened, all known shards should be ready
            notify("READY=1")
            notified_systemd = True

        if did_something:
            # Work was done: start over without waiting and reset the counter
            attempt = 0
        else:
            # Sleep using the current value
            wait_for_image(attempt)
            attempt += 1
class ROShard:
    """Read access to a shard stored in an RBD image mapped read-only.

    Arguments:
      name: name of the shard, which is also the name of its RBD image
      pool: object used to query the mapping status and path of the image

    Raises:
      ShardNotMapped: if the image is not mapped read-only, or if nothing is
        found at the image path
    """

    def __init__(self, name, pool):
        self.pool = pool
        image_status = self.pool.image_mapped(name)
        if image_status != "ro":
            raise ShardNotMapped(
                f"RBD image for {name} isn't mapped"
                f"{' read-only' if image_status == 'rw' else ''}"
            )
        self.name = name
        self.path = self.pool.image_path(self.name)
        self.shard = None
        self.open()
        logger.debug("ROShard %s: loaded", self.name)

    def open(self):
        """Open the shard stored at ``self.path``.

        Raises:
          ShardNotMapped: if no file exists at ``self.path``; the original
            :exc:`FileNotFoundError` is kept as the cause of the exception
        """
        try:
            self.shard = Shard(self.path)
        except FileNotFoundError as exc:
            raise ShardNotMapped(
                f"RBD image for {self.name} not found at {self.path}"
            ) from exc

    def get(self, key):
        """Look ``key`` up in the shard, reopening the shard if it was closed."""
        if not self.shard:
            self.open()
        return self.shard.lookup(key)

    def close(self):
        """Close the underlying shard, if one is open.

        This is safe to call several times, and on an instance whose
        ``__init__`` failed before ``self.shard`` was set.
        """
        if shard := getattr(self, "shard", None):
            shard.close()
            self.shard = None

    def __del__(self):
        self.close()

    @staticmethod
    def delete(pool, shard_name, obj_id):
        """Delete the object ``obj_id`` from the image of ``shard_name``.

        The image is mapped read-write first if it is not mapped at all.

        Raises:
          PermissionError: if the image is currently mapped read-only
        """
        image_status = pool.image_mapped(shard_name)
        if image_status == "ro":
            raise PermissionError(
                f"Cannot delete object from {shard_name}, mapped read-only"
            )
        if not image_status:
            pool.image_map(shard_name, options="rw")
        Shard.delete(pool.image_path(shard_name), obj_id)
class ROShardCreator:
    """Helper for Read-Only shard creation.

    Used as a context manager: entering makes sure the RBD image is
    available and empty, then starts the shard creation; leaving without an
    exception finalizes the shard.

    Arguments:
      name: Name of the shard to be initialized
      count: Number of objects to provision in the shard
      pool: the :class:`Pool` holding the RBD image of the shard
      rbd_create_images: whether the ROShardCreator should create the rbd
        image, or delegate to the rbd_shard_manager
      rbd_wait_for_image: function called, with an attempt number, when
        waiting for a shard to be mapped
      kwargs: accepted for compatibility with callers, and ignored
    """

    def __init__(
        self,
        name: str,
        count: int,
        pool: Pool,
        rbd_create_images: bool = True,
        rbd_wait_for_image: Callable[[int], None] = sleep_exponential(
            min_duration=5,
            factor=2,
            max_duration=60,
            message="Waiting for RBD image mapping",
        ),
        **kwargs,
    ):
        self.pool = pool
        self.name = name
        self.count = count
        self.path = self.pool.image_path(self.name)
        self.rbd_create_images = rbd_create_images
        self.rbd_wait_for_image = rbd_wait_for_image

    def __enter__(self) -> "ROShardCreator":
        if self.rbd_create_images:
            self.pool.image_create(self.name)
        else:
            # The image is managed elsewhere: wait until its path shows up
            attempt = 0
            while not os.path.exists(self.path):
                self.rbd_wait_for_image(attempt)
                attempt += 1

        self.zero_image_if_needed()

        self.shard = ShardCreator(self.path, self.count)
        logger.debug("ROShard %s: created", self.name)
        self.shard.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Only finalize the shard (and remap the image read-only when this
        # creator owns it) if the block completed without an exception.
        if exc_type is None:
            self.shard.__exit__(exc_type, exc_val, exc_tb)
            if self.rbd_create_images:
                self.pool.image_remap_ro(self.name)

    def zero_image_if_needed(self):
        """Check whether the image is empty, and zero it out if it's not.

        We really check only the first 1kB, as we assume that the SWHShard
        marker will have been written at the beginning of the image under all
        circumstances if the RO Shard creation has been interrupted.

        A failure of the cleanup command is logged as a warning and is not
        raised.
        """
        with open(self.path, "rb") as f:
            start = f.read(1024)

        if not start or set(start) == {0}:
            return

        logger.warning("RO Shard %s isn't empty, cleaning it up", self.path)
        st = os.stat(self.path)
        if stat.S_ISBLK(st.st_mode):
            # Block device, use DISCARD
            command = ["/usr/sbin/blkdiscard", self.path]
        else:
            # Regular file, use fallocate --punch-hole
            command = [
                "/usr/bin/fallocate",
                "--punch-hole",
                "-l",
                str(st.st_size),
                self.path,
            ]

        try:
            subprocess.run(command, check=True, capture_output=True)
        except subprocess.CalledProcessError as exc:
            # The path is already part of the command line. The output of the
            # command was captured, so report its stderr explicitly.
            logger.warning(
                "%s failed: %s",
                shlex.join(command),
                (exc.stderr or b"").decode(errors="replace").strip(),
                exc_info=True,
            )

    def add(self, content, obj_id):
        """Write ``content`` to the shard being created, under ``obj_id``."""
        return self.shard.write(obj_id, content)