# Copyright (C) 2021-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import math
import os
from pathlib import Path
import subprocess
from typing import Iterable, List, Literal, Optional, Protocol, Tuple
from . import settings
# Module logger: RBDPool.run logs every command it executes at DEBUG level;
# warnings are emitted for stale file-backed images and redundant RBD unmaps.
logger = logging.getLogger(__name__)
class Pool(Protocol):
    """Interface of a pool of Winery shard images.

    Images are created with :meth:`image_create`, made usable with
    :meth:`image_map` and made unavailable again with :meth:`image_unmap`.
    A mapped image can be opened at the path returned by :meth:`image_path`.
    """

    # Name of the pool holding the images.
    pool_name: str

    def image_exists(self, image: str) -> bool:
        """Check whether the named image exists (it does not have to be mapped)"""
        ...

    def image_mapped(self, image: str) -> Optional[Literal["ro", "rw"]]:
        """Check whether the image is already mapped, read-only or read-write.

        The state is derived from the permission bits reported by
        :func:`os.stat` for :meth:`image_path`:

        - no such file: the image is not mapped, return ``None``;
        - no read bit set at all (e.g. mode ``0o000``, which is how
          :class:`FileBackedPool` marks unmapped images): return ``None``;
        - any write bit set: return ``"rw"``;
        - otherwise: return ``"ro"``.
        """
        try:
            mode = os.stat(self.image_path(image)).st_mode
        except FileNotFoundError:
            return None
        if not mode & 0o444:
            # A file nobody may read is an unmapped image, not a read-only
            # one: without this check a 0o000 image was reported as "ro".
            return None
        return "rw" if mode & 0o222 else "ro"

    def image_list(self) -> List[str]:
        """List all known images, mapped or not"""
        ...

    def image_path(self, image: str) -> str:
        """Return a path to the image, that can be opened with :func:`open`."""
        ...

    def image_create(self, image: str) -> None:
        """Create a new image named `image` and allocate the right amount of space."""
        ...

    def image_map(self, image: str, options: str) -> None:
        """Map an image for use. Options can be `"ro"` to map the image read-only, or
        `"rw"` to map the image read-write."""
        ...

    def image_unmap(self, image: str) -> None:
        """Unmap the image. Once this is done, the image is unavailable for use."""
        ...

    def image_remap_ro(self, image: str) -> None:
        """Remap the image read-only: unmap it, then map it again with ``"ro"``."""
        self.image_unmap(image)
        self.image_map(image, "ro")
class RBDPool(Pool):
    """Manage a Ceph RBD pool for Winery shards.

    Images are managed by running the ``rbd`` command line tool, through
    ``sudo`` when ``rbd_use_sudo`` is set.

    Arguments:
      shard_max_size: max size of shard contents
      rbd_use_sudo: whether to use sudo for rbd commands
      rbd_pool_name: name of the pool used for RBD images (metadata)
      rbd_data_pool_name: name of the pool used for RBD images (data);
        when not given (or empty), ``"<rbd_pool_name>-data"`` is used
      rbd_image_features_unsupported: features not supported by the kernel
        mounting the rbd images; they are disabled on every new image
      rbd_map_options: options to pass to ``rbd device map``, e.g.
        ``ms_mode=prefer-secure`` to connect to a ceph cluster with encryption
        enabled
    """

    # Constructor keyword arguments picked up by :meth:`from_kwargs`.
    POOL_CONFIG: Tuple[str, ...] = (
        "shard_max_size",
        "rbd_use_sudo",
        "rbd_pool_name",
        "rbd_data_pool_name",
        "rbd_image_features_unsupported",
        "rbd_map_options",
    )

    def __init__(
        self,
        shard_max_size: int,
        rbd_use_sudo: bool = True,
        rbd_pool_name: str = "shards",
        rbd_data_pool_name: Optional[str] = None,
        rbd_image_features_unsupported: Tuple[
            str, ...
        ] = settings.DEFAULT_IMAGE_FEATURES_UNSUPPORTED,
        rbd_map_options: str = "",
    ) -> None:
        self.use_sudo = rbd_use_sudo
        self.pool_name = rbd_pool_name
        self.data_pool_name = rbd_data_pool_name or f"{self.pool_name}-data"
        self.features_unsupported = rbd_image_features_unsupported
        self.map_options = rbd_map_options
        # Value passed to ``rbd create --size``: twice the maximum shard size,
        # divided by 1024 * 1024 (presumably bytes -> MiB) and rounded up.
        # NOTE(review): the factor 2 presumably leaves headroom — confirm.
        self.image_size = math.ceil((shard_max_size * 2) / (1024 * 1024))

    @classmethod
    def from_kwargs(cls, **kwargs) -> "RBDPool":
        """Create a Pool from a set of arbitrary keyword arguments.

        Only the keys listed in :attr:`POOL_CONFIG` are forwarded to the
        constructor; any other key is silently ignored.
        """
        return cls(**{k: kwargs[k] for k in cls.POOL_CONFIG if k in kwargs})

    def run(self, *cmd: str) -> Iterable[str]:
        """Run the given command, and return its output as lines.

        The command is prefixed with ``sudo`` when ``use_sudo`` is set, and is
        logged at DEBUG level before being run.

        Return: the standard output of the run command, split into lines
        Raises: CalledProcessError if the command doesn't exit with exit code 0;
          the captured standard error is available as its ``stderr`` attribute.
        """
        sudo = ("sudo",) if self.use_sudo else ()
        cmd = sudo + cmd
        # Quote arguments containing spaces so the logged line stays readable.
        logger.debug(" ".join(repr(item) if " " in item else item for item in cmd))
        result = subprocess.check_output(cmd, encoding="utf-8", stderr=subprocess.PIPE)
        return result.splitlines()

    def rbd(self, *arguments: str) -> Iterable[str]:
        """Run rbd with the given arguments, scoped to this pool"""
        return self.run("rbd", f"--pool={self.pool_name}", *arguments)

    def image_exists(self, image: str) -> bool:
        """Check whether ``rbd info`` succeeds for the image.

        Any failure of ``rbd info`` is reported as ``False``.
        """
        try:
            self.rbd("info", image)
        except subprocess.CalledProcessError:
            return False
        else:
            return True

    def image_list(self) -> List[str]:
        """List the images of the pool, as reported by ``rbd ls``.

        Returns an empty list when ``rbd ls`` exits with status 2 and reports
        "No such file or directory" (presumably: the pool does not exist yet);
        any other failure is re-raised.
        """
        try:
            images = self.rbd("ls")
        except subprocess.CalledProcessError as exc:
            if exc.returncode == 2 and "No such file or directory" in exc.stderr:
                return []
            else:
                raise
        return [image.strip() for image in images]

    def image_path(self, image: str) -> str:
        """Return the device path under which a mapped image is exposed."""
        return f"/dev/rbd/{self.pool_name}/{image}"

    def image_create(self, image: str) -> None:
        """Create the image in the data pool, disable unsupported features,
        then map it read-write."""
        self.rbd(
            "create",
            f"--size={self.image_size}",
            f"--data-pool={self.data_pool_name}",
            image,
        )
        if self.features_unsupported:
            # These features must be off for the kernel to map the image.
            self.rbd(
                "feature",
                "disable",
                f"{self.pool_name}/{image}",
                *self.features_unsupported,
            )
        self.image_map(image, "rw")

    def image_map(self, image: str, options: str) -> None:
        """Map the image with ``rbd device map``.

        ``options`` (e.g. ``"ro"`` or ``"rw"``) is combined with the pool-wide
        ``map_options`` into a single ``-o`` argument.
        """
        self.rbd(
            "device",
            "map",
            "-o",
            f"{options},{self.map_options}" if self.map_options else options,
            image,
        )

    def image_unmap(self, image: str) -> None:
        """Unmap the image with ``rbd device unmap``.

        Nothing is done when the device path does not exist. An unmap failing
        with exit status 22 and "Invalid argument" is only logged as a warning
        (the image is assumed to be already unmapped); other failures are
        re-raised.
        """
        if os.path.exists(self.image_path(image)):
            try:
                self.rbd("device", "unmap", self.image_path(image))
            except subprocess.CalledProcessError as exc:
                if exc.returncode == 22 and "Invalid argument" in exc.stderr:
                    logger.warning(
                        "Image %s already unmapped? stderr: %s", image, exc.stderr
                    )
                else:
                    raise
class FileBackedPool(Pool):
    """File-backed pool for Winery shards mimicking a Ceph RBD pool.

    Each image is a regular file in ``base_directory / pool_name``. Its
    permission bits encode the mapping state:

    - ``0o600``: mapped read-write;
    - ``0o400``: mapped read-only;
    - ``0o000``: unmapped.

    Arguments:
      base_directory: existing directory in which the pool directory is
        created; a plain string path is accepted as well
      pool_name: name of the pool, used as the sub-directory name
      shard_max_size: max size of shard contents, stored as ``image_size``
        (image files are created empty, not preallocated)
    """

    def __init__(
        self,
        base_directory: Path,
        pool_name: str,
        shard_max_size: int,
    ) -> None:
        # Coerce to Path so that a str argument does not break the ``/`` below.
        self.base_directory = Path(base_directory)
        self.pool_name = pool_name
        self.image_size = shard_max_size
        self.pool_dir = self.base_directory / self.pool_name
        # Only the pool directory itself is created (no ``parents=True``).
        self.pool_dir.mkdir(exist_ok=True)

    def image_exists(self, image: str) -> bool:
        """Check whether a regular file exists for the image (mapped or not)."""
        return (self.pool_dir / image).is_file()

    def image_list(self) -> List[str]:
        """List the names of all regular files in the pool directory."""
        return [entry.name for entry in self.pool_dir.iterdir() if entry.is_file()]

    def image_path(self, image: str) -> str:
        """Return the path of the image file inside the pool directory."""
        return str(self.pool_dir / image)

    def image_create(self, image: str) -> None:
        """Create an empty image file and map it read-write.

        A file that already exists with mode exactly ``-rw-------`` (regular
        file, permissions ``0o600``) is considered a stale image left by a
        crashed or aborted packing process: it is truncated and reused.

        Raises:
          ValueError: if the image already exists in any other state.
        """
        path = self.image_path(image)
        if os.path.exists(path):
            # 0o100600 == regular file type bits + permissions 0o600.
            if os.stat(path).st_mode == 0o100600:
                # If the image exists but is -rw------- it is expected to be a
                # dangling/stale shard file left by a crashed/aborted packing
                # process
                logger.warning("Stale image found. Reusing it")
            else:
                raise ValueError(f"Image {image} already exists")
        # Mode "w" creates the file, or truncates a reused stale image.
        with open(path, "w"):
            pass
        self.image_map(image, "rw")

    def image_map(self, image: str, options: str) -> None:
        """Map the image: 0o400 if ``"ro"`` appears in options, else 0o600."""
        if "ro" in options:
            os.chmod(self.image_path(image), 0o400)
        else:
            os.chmod(self.image_path(image), 0o600)

    def image_unmap(self, image: str) -> None:
        """Unmap the image by clearing all its permission bits (0o000)."""
        os.chmod(self.image_path(image), 0o000)

    def image_unmap_all(self) -> None:
        """Unmap every regular file of the pool directory (chmod 0o000)."""
        for entry in self.pool_dir.iterdir():
            if entry.is_file():
                entry.chmod(0o000)
def pool_from_settings(
    shards_settings: settings.Shards,
    shards_pool_settings: settings.ShardsPool,
) -> Pool:
    """Build the shards pool described by the configuration.

    ``shards_pool_settings["type"]`` selects the implementation: ``"rbd"``
    gives an :class:`RBDPool`, ``"directory"`` a :class:`FileBackedPool`.
    Missing pool settings are filled in by the matching ``settings`` helper;
    the shard size limit comes from ``shards_settings["max_size"]``.

    Raises:
      ValueError: for any other pool type.
    """
    kind = shards_pool_settings["type"]

    if kind == "directory":
        directory_cfg = settings.directory_shards_pool_settings_with_defaults(
            shards_pool_settings
        )
        return FileBackedPool(
            shard_max_size=shards_settings["max_size"],
            base_directory=Path(directory_cfg["base_directory"]),
            pool_name=directory_cfg["pool_name"],
        )

    if kind == "rbd":
        rbd_cfg = settings.rbd_shards_pool_settings_with_defaults(
            shards_pool_settings
        )
        return RBDPool(
            shard_max_size=shards_settings["max_size"],
            rbd_use_sudo=rbd_cfg["use_sudo"],
            rbd_pool_name=rbd_cfg["pool_name"],
            rbd_data_pool_name=rbd_cfg["data_pool_name"],
            rbd_image_features_unsupported=rbd_cfg["image_features_unsupported"],
            rbd_map_options=rbd_cfg["map_options"],
        )

    raise ValueError(f"Unknown shards pool type: {kind}")