# Copyright (C) 2022-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
import logging
from multiprocessing import Process
from typing import Callable, Dict, Iterator, List, Optional
from swh.objstorage.constants import LiteralPrimaryHash
from swh.objstorage.exc import ObjNotFoundError, ReadOnlyObjStorageError
from swh.objstorage.interface import HashDict
from swh.objstorage.objstorage import ObjStorage, timed
from . import roshard, settings
from .rwshard import RWShard
from .sharedbase import ShardState, SharedBase
from .sleep import sleep_exponential
from .throttler import Throttler
logger = logging.getLogger(__name__)
[docs]
class WineryObjStorage(ObjStorage):
primary_hash: LiteralPrimaryHash = "sha256"
name: str = "winery"
def __init__(
self,
database: settings.Database,
shards: settings.Shards,
shards_pool: settings.ShardsPool,
throttler: settings.Throttler,
packer: Optional[settings.Packer] = None,
readonly: bool = False,
allow_delete: bool = False,
name: str = "winery",
) -> None:
super().__init__(allow_delete=allow_delete, name=name)
if self.primary_hash != "sha256":
raise TypeError("Winery backend only support the sha256 primary hash")
self.settings = settings.populate_default_settings(
database=database,
shards=shards,
shards_pool=shards_pool,
throttler=throttler,
packer=(packer or {}),
)
self.throttler = Throttler.from_settings(self.settings)
self.pool = roshard.pool_from_settings(
shards_settings=self.settings["shards"],
shards_pool_settings=self.settings["shards_pool"],
)
self.reader: WineryReader = WineryReader(
throttler=self.throttler, pool=self.pool, database=self.settings["database"]
)
self.writer: Optional[WineryWriter] = None
if not readonly:
self.writer = WineryWriter(
packer_settings=self.settings["packer"],
throttler_settings=self.settings.get("throttler"),
shards_settings=self.settings["shards"],
shards_pool_settings=self.settings["shards_pool"],
database_settings=self.settings["database"],
)
[docs]
@timed
def get(self, obj_id: HashDict) -> bytes:
try:
return self.reader.get(self._hash(obj_id))
except ObjNotFoundError as exc:
# re-raise exception with the passed obj_id instead of the internal winery obj_id.
raise ObjNotFoundError(obj_id) from exc
[docs]
def check_config(self, *, check_write: bool) -> bool:
return True
@timed
def __contains__(self, obj_id: HashDict) -> bool:
return self._hash(obj_id) in self.reader
[docs]
@timed
def add(
self, content: bytes, obj_id: HashDict, check_presence: bool = True
) -> None:
if not self.writer:
raise ReadOnlyObjStorageError("add")
internal_obj_id = self._hash(obj_id)
if check_presence and internal_obj_id in self.reader:
return
self.writer.add(content, internal_obj_id)
[docs]
def delete(self, obj_id: HashDict):
if not self.writer:
raise ReadOnlyObjStorageError("delete")
if not self.allow_delete:
raise PermissionError("Delete is not allowed.")
try:
return self.writer.delete(self._hash(obj_id))
# Re-raise ObjNotFoundError with the full object id
except ObjNotFoundError as exc:
raise ObjNotFoundError(obj_id) from exc
def _hash(self, obj_id: HashDict) -> bytes:
return obj_id[self.primary_hash]
[docs]
def on_shutdown(self):
self.reader.on_shutdown()
if self.writer:
self.writer.on_shutdown()
[docs]
class WineryReader:
def __init__(
self, throttler: Throttler, pool: roshard.Pool, database: settings.Database
):
self.throttler = throttler
self.pool = pool
self.base = SharedBase(
base_dsn=database["db"], application_name=database["application_name"]
)
self.ro_shards: Dict[str, roshard.ROShard] = {}
self.rw_shards: Dict[str, RWShard] = {}
def __contains__(self, obj_id):
return self.base.contains(obj_id)
[docs]
def list_signatures(
self, after_id: Optional[bytes] = None, limit: Optional[int] = None
) -> Iterator[bytes]:
yield from self.base.list_signatures(after_id, limit)
[docs]
def roshard(self, name) -> Optional[roshard.ROShard]:
if name not in self.ro_shards:
try:
shard = roshard.ROShard(
name=name,
throttler=self.throttler,
pool=self.pool,
)
except roshard.ShardNotMapped:
return None
self.ro_shards[name] = shard
if name in self.rw_shards:
del self.rw_shards[name]
return self.ro_shards[name]
[docs]
def rwshard(self, name) -> RWShard:
if name not in self.rw_shards:
shard = RWShard(
name, shard_max_size=0, base_dsn=self.base.dsn, readonly=True
)
self.rw_shards[name] = shard
return self.rw_shards[name]
[docs]
def get(self, obj_id: bytes) -> bytes:
shard_info = self.base.get(obj_id)
if shard_info is None:
raise ObjNotFoundError(obj_id)
name, state = shard_info
content: Optional[bytes] = None
if state.image_available:
roshard = self.roshard(name)
if roshard:
content = roshard.get(obj_id)
if content is None:
rwshard = self.rwshard(name)
content = rwshard.get(obj_id)
if content is None:
raise ObjNotFoundError(obj_id)
return content
[docs]
def on_shutdown(self):
for shard in self.ro_shards.values():
shard.close()
self.ro_shards = {}
self.rw_shards = {}
[docs]
def pack(
shard: str,
base_dsn: str,
packer_settings: settings.Packer,
throttler_settings: Optional[settings.Throttler],
shards_settings: settings.Shards,
shards_pool_settings: settings.ShardsPool,
shared_base: Optional[SharedBase] = None,
) -> bool:
rw = RWShard(shard, shard_max_size=shards_settings["max_size"], base_dsn=base_dsn)
count = rw.count()
logger.info("Creating RO shard %s for %s objects", shard, count)
throttler = Throttler.from_settings({"throttler": throttler_settings})
pool = roshard.pool_from_settings(
shards_settings=shards_settings, shards_pool_settings=shards_pool_settings
)
with roshard.ROShardCreator(
name=shard,
count=count,
throttler=throttler,
pool=pool,
rbd_create_images=packer_settings["create_images"],
) as ro:
logger.info("Created RO shard %s", shard)
for i, (obj_id, content) in enumerate(rw.all()):
ro.add(content, obj_id)
if i % 100 == 99:
logger.debug("RO shard %s: added %s/%s objects", shard, i + 1, count)
logger.debug("RO shard %s: added %s objects, saving", shard, count)
logger.info("RO shard %s: saved", shard)
if not shared_base:
shared_base = SharedBase(base_dsn=base_dsn)
shared_base.shard_packing_ends(shard)
if packer_settings["clean_immediately"]:
cleanup_rw_shard(shard, shared_base=shared_base)
return True
[docs]
def cleanup_rw_shard(shard, base_dsn=None, shared_base=None) -> bool:
if shared_base is not None and not base_dsn:
base_dsn = shared_base.dsn
rw = RWShard(name=shard, shard_max_size=0, base_dsn=base_dsn)
rw.drop()
if not shared_base:
shared_base = SharedBase(base_dsn=base_dsn)
shared_base.set_shard_state(name=shard, new_state=ShardState.READONLY)
return True
[docs]
class WineryWriter:
def __init__(
self,
packer_settings: settings.Packer,
throttler_settings: Optional[settings.Throttler],
shards_settings: settings.Shards,
shards_pool_settings: settings.ShardsPool,
database_settings: settings.Database,
):
self.packer_settings = packer_settings
self.throttler_settings = throttler_settings
self.shards_settings = shards_settings
self.shards_pool_settings = shards_pool_settings
self.base = SharedBase(
base_dsn=database_settings["db"],
application_name=database_settings["application_name"],
)
self.shards_filled: List[str] = []
self.packers: List[Process] = []
self._shard: Optional[RWShard] = None
self.idle_timeout = shards_settings.get("rw_idle_timeout", 300)
[docs]
def release_shard(
self,
shard: Optional[RWShard] = None,
from_idle_handler: bool = False,
new_state: ShardState = ShardState.STANDBY,
):
"""Release the currently locked shard"""
if not shard:
shard = self._shard
if not shard:
return
logger.debug("WineryWriter releasing shard %s", shard.name)
self.base.set_shard_state(new_state=new_state, name=shard.name)
if not from_idle_handler:
logger.debug("Shard released, disabling idle handler")
shard.disable_idle_handler()
self._shard = None
@property
def shard(self):
"""Lock a shard to be able to use it. Release it after :attr:`idle_timeout`."""
if not self._shard:
self._shard = RWShard(
name=self.base.locked_shard,
base_dsn=self.base.dsn,
shard_max_size=self.shards_settings["max_size"],
idle_timeout_cb=partial(self.release_shard, from_idle_handler=True),
idle_timeout=self.idle_timeout,
)
logger.debug(
"WineryBase: locked RWShard %s, releasing it in %s",
self._shard.name,
self.idle_timeout,
)
return self._shard
[docs]
def add(self, content: bytes, obj_id: bytes) -> None:
with self.base.pool.connection() as db, db.transaction():
shard = self.base.record_new_obj_id(db, obj_id)
if shard != self.base.locked_shard_id:
# this object is the responsibility of another shard
return
self.shard.add(db, obj_id, content)
if self.shard.is_full():
filled_name = self.shard.name
self.release_shard(new_state=ShardState.FULL)
self.shards_filled.append(filled_name)
if self.packer_settings["pack_immediately"]:
self.pack(filled_name)
[docs]
def delete(self, obj_id: bytes):
shard_info = self.base.get(obj_id)
if shard_info is None:
raise ObjNotFoundError(obj_id)
name, state = shard_info
# We only care about RWShard for now. ROShards will be
# taken care in a batch job.
if not state.image_available:
rwshard = RWShard(name, shard_max_size=0, base_dsn=self.base.dsn)
try:
rwshard.delete(obj_id)
except KeyError:
logger.warning(
"Shard %s does not seem to know about object %s, but we "
"had an entry in SharedBase (which is going to "
"be removed just now)",
rwshard.name,
obj_id,
)
self.base.delete(obj_id)
return True
[docs]
def check(self, obj_id: HashDict) -> None:
# load all shards packing == True and not locked (i.e. packer
# was interrupted for whatever reason) run pack for each of them
pass
[docs]
def pack(self, shard_name: str):
self.base.shard_packing_starts(shard_name)
p = Process(
target=pack,
kwargs={
"shard": shard_name,
"base_dsn": self.base.dsn,
"packer_settings": self.packer_settings,
"throttler_settings": self.throttler_settings,
"shards_settings": self.shards_settings,
"shards_pool_settings": self.shards_pool_settings,
},
)
p.start()
self.packers.append(p)
[docs]
def on_shutdown(self):
self.release_shard()
for p in self.packers:
p.join()
def __del__(self):
for p in getattr(self, "packers", []):
if not p.is_alive():
continue
logger.warning("Killing packer %s", p)
p.kill()
p.join()
[docs]
def never_stop(_: int) -> bool:
return False
[docs]
def stop_after_shards(max_shards_packed: int) -> Callable[[int], bool]:
def stop(shards_packed: int):
return shards_packed >= max_shards_packed
return stop
[docs]
def shard_packer(
database: settings.Database,
shards: settings.Shards,
shards_pool: settings.ShardsPool,
throttler: settings.Throttler,
packer: Optional[settings.Packer] = None,
stop_packing: Callable[[int], bool] = never_stop,
wait_for_shard: Callable[[int], None] = sleep_exponential(
min_duration=5,
factor=2,
max_duration=60,
message="No shards to pack",
),
) -> int:
"""Pack shards until the `stop_packing` function returns True.
When no shards are available for packing, call the `wait_for_shard` function.
Arguments:
database: database settings (e.g. db connection string)
shards: shards settings (e.g. max_size)
shards_pool: shards pool settings (e.g. Ceph RBD settings)
throttler: throttler settings
packer: packer settings
stop_packing: callback to determine whether the packer should exit
wait_for_shard: sleep function called when no shards are available to be packed
"""
all_settings = settings.populate_default_settings(
database=database,
shards=shards,
shards_pool=shards_pool,
throttler=throttler,
packer=(packer or {}),
)
application_name = (
all_settings["database"]["application_name"] or "Winery Shard Packer"
)
base = SharedBase(
base_dsn=all_settings["database"]["db"],
application_name=application_name,
)
shards_packed = 0
waited_for_shards = 0
while not stop_packing(shards_packed):
locked = base.maybe_lock_one_shard(
current_state=ShardState.FULL, new_state=ShardState.PACKING
)
if not locked:
wait_for_shard(waited_for_shards)
waited_for_shards += 1
continue
waited_for_shards = 0
with locked:
if locked.name is None:
raise RuntimeError("No shard has been locked?")
logger.info("shard_packer: Locked shard %s to pack", locked.name)
ret = pack(
shard=locked.name,
base_dsn=all_settings["database"]["db"],
packer_settings=all_settings["packer"],
throttler_settings=all_settings["throttler"],
shards_settings=all_settings["shards"],
shards_pool_settings=all_settings["shards_pool"],
shared_base=base,
)
if not ret:
raise ValueError("Packing shard %s failed" % locked.name)
shards_packed += 1
return shards_packed
[docs]
def rw_shard_cleaner(
database: settings.Database,
min_mapped_hosts: int,
stop_cleaning: Callable[[int], bool] = never_stop,
wait_for_shard: Callable[[int], None] = sleep_exponential(
min_duration=5,
factor=2,
max_duration=60,
message="No shards to clean up",
),
) -> int:
"""Clean up RW shards until the `stop_cleaning` function returns True.
When no shards are available for packing, call the `wait_for_shard` function.
Arguments:
database: database settings (e.g. db connection string)
min_mapped_hosts: how many hosts should have mapped the image read-only before
cleaning it
stop_cleaning: callback to determine whether the cleaner should exit
wait_for_shard: sleep function called when no shards are available to be cleaned
"""
database = settings.database_settings_with_defaults(database)
base = SharedBase(base_dsn=database["db"])
shards_cleaned = 0
waited_for_shards = 0
while not stop_cleaning(shards_cleaned):
locked = base.maybe_lock_one_shard(
current_state=ShardState.PACKED,
new_state=ShardState.CLEANING,
min_mapped_hosts=min_mapped_hosts,
)
if not locked:
wait_for_shard(waited_for_shards)
waited_for_shards += 1
continue
waited_for_shards = 0
with locked:
logger.info("rw_shard_cleaner: Locked shard %s to clean", locked.name)
ret = cleanup_rw_shard(
locked.name,
base_dsn=database["db"],
shared_base=base,
)
if not ret:
raise ValueError("Cleaning shard %s failed" % locked.name)
shards_cleaned += 1
return shards_cleaned
[docs]
def deleted_objects_cleaner(
base: SharedBase,
pool: roshard.Pool,
stop_running: Callable[[], bool],
):
"""Clean up deleted objects from RO shards and the shared database.
This requires the ability to map RBD images in read-write mode. Images will be
left mapped by this process as it is meant to be executed in a transient host
dedicated to this purpose.
Arguments:
base_dsn: PostgreSQL dsn for the shared database
pool: Ceph RBD pool for Winery shards
stop_running: callback that returns True when the manager should stop running
"""
count = 0
for obj_id, shard_name, shard_state in base.deleted_objects():
if stop_running():
break
if shard_state.readonly:
roshard.ROShard.delete(pool, shard_name, obj_id)
base.clean_deleted_object(obj_id)
count += 1
logger.info("Cleaned %d deleted objects", count)