# Copyright (C) 2022-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import OrderedDict
from functools import partial
import logging
from typing import Dict, Iterator, List, Optional, Tuple
from swh.core.statsd import statsd
from swh.objstorage.constants import LiteralPrimaryHash
from swh.objstorage.exc import ObjNotFoundError, ReadOnlyObjStorageError
from swh.objstorage.interface import HashDict
from swh.objstorage.objstorage import ObjStorage, timed
from . import settings
from .housekeeping import pack
from .pools import Pool, pool_from_settings
from .roshard import ROShard, ShardNotMapped
from .rwshard import RWShard
from .sharedbase import ShardState, SharedBase
# Names of the statsd metrics reported by the shard caches of the reader:
# one counts cache lookups, the other times how long opening a shard takes.
SHARD_CACHE_METRIC = "swh_objstorage_winery_shard_request_count"
SHARD_OPEN_DURATION_METRIC = "swh_objstorage_winery_shard_open_seconds"

# Logger named after this module.
logger = logging.getLogger(__name__)
class WineryObjStorage(ObjStorage):
    """Object storage backend keeping its objects in winery shards.

    Reads go through a :class:`WineryReader`. Writes and deletions go through
    a :class:`WineryWriter`, which only exists when the storage was not
    created with ``readonly=True``. Internally, objects are addressed by the
    value found under :attr:`primary_hash` in the ``HashDict`` object ids.
    """

    primary_hash: LiteralPrimaryHash = "sha256"
    name: str = "winery"

    def __init__(
        self,
        database: settings.Database,
        shards: settings.Shards,
        shards_pool: settings.ShardsPool,
        packer: Optional[settings.Packer] = None,
        readonly: bool = False,
        allow_delete: bool = False,
        name: str = "winery",
        readers_cache_size: int = 1000,
    ) -> None:
        """Set up the reader and, unless ``readonly``, the writer.

        Args:
            database: database settings
            shards: shards settings
            shards_pool: shards pool settings
            packer: packer settings; an empty mapping is used when falsy
            readonly: when true, no writer is created, so ``add``,
              ``add_batch`` and ``delete`` raise
              :exc:`ReadOnlyObjStorageError`
            allow_delete: forwarded to :class:`ObjStorage`
            name: forwarded to :class:`ObjStorage`
            readers_cache_size: passed as ``cache_size`` to the
              :class:`WineryReader`

        All settings are completed by
        ``settings.populate_default_settings`` before being used.

        Raises:
            TypeError: if :attr:`primary_hash` is not ``"sha256"``
        """
        super().__init__(allow_delete=allow_delete, name=name)
        if self.primary_hash != "sha256":
            raise TypeError("Winery backend only support the sha256 primary hash")

        self.settings = settings.populate_default_settings(
            database=database,
            shards=shards,
            shards_pool=shards_pool,
            packer=(packer or {}),
        )

        self.pool = pool_from_settings(
            shards_settings=self.settings["shards"],
            shards_pool_settings=self.settings["shards_pool"],
        )
        self.reader: WineryReader = WineryReader(
            pool=self.pool,
            database=self.settings["database"],
            cache_size=readers_cache_size,
        )
        # Stays None for read-only storages; the mutating methods check it.
        self.writer: Optional[WineryWriter] = None
        if not readonly:
            self.writer = WineryWriter(
                packer_settings=self.settings["packer"],
                shards_settings=self.settings["shards"],
                shards_pool_settings=self.settings["shards_pool"],
                database_settings=self.settings["database"],
            )

    @timed
    def get(self, obj_id: HashDict) -> bytes:
        """Return the content of the object identified by ``obj_id``.

        Raises:
            ObjNotFoundError: if the reader does not find the object; the
              exception carries the ``obj_id`` that was passed in.
        """
        try:
            return self.reader.get(self._hash(obj_id))
        except ObjNotFoundError as exc:
            # re-raise exception with the passed obj_id instead of the
            # internal winery obj_id.
            raise ObjNotFoundError(obj_id) from exc

    def check_config(self, *, check_write: bool) -> bool:
        """Always return ``True``; ``check_write`` is not looked at."""
        return True

    @timed
    def __contains__(self, obj_id: HashDict) -> bool:
        """Tell whether the reader knows the object identified by ``obj_id``."""
        return self._hash(obj_id) in self.reader

    @timed
    def add(
        self, content: bytes, obj_id: HashDict, check_presence: bool = True
    ) -> None:
        """Add a single object.

        Args:
            content: the object's content
            obj_id: the object's identifiers
            check_presence: when true, the object is not written again if the
              reader already knows it

        Raises:
            ReadOnlyObjStorageError: if the storage has no writer
        """
        # check_presence must be forwarded: otherwise _add_batch falls back to
        # its default and always performs the presence lookup.
        self._add_batch([(obj_id, content)], check_presence)

    @timed
    def add_batch(
        self, contents: list[tuple[HashDict, bytes]], check_presence: bool = True
    ) -> Dict:
        """``contents`` should be pairs of ``(obj_id, content)``"""
        return self._add_batch(contents, check_presence)

    def _add_batch(
        self, contents: list[tuple[HashDict, bytes]], check_presence: bool = True
    ) -> Dict:
        """Same as ``add_batch``, but not wrapped by ``@timed``, so ``add()`` is not
        double-counted

        Returns:
            what the writer's ``add_batch`` returns, or zeroed
            ``object:add`` / ``object:add:bytes`` counters when nothing is
            left to write.

        Raises:
            ReadOnlyObjStorageError: if the storage has no writer
        """
        if not self.writer:
            raise ReadOnlyObjStorageError("add")

        hashed_contents = [
            (self._hash(obj_id), content) for (obj_id, content) in contents
        ]
        if check_presence:
            # filter out contents that already exist
            hashed_contents = [
                (internal_obj_id, content)
                for (internal_obj_id, content) in hashed_contents
                if internal_obj_id not in self.reader
            ]

        if not hashed_contents:
            return {"object:add": 0, "object:add:bytes": 0}
        return self.writer.add_batch(hashed_contents)

    def delete(self, obj_id: HashDict):
        """Delete the object identified by ``obj_id`` through the writer.

        Raises:
            ReadOnlyObjStorageError: if the storage has no writer
            PermissionError: if ``allow_delete`` is false
            ObjNotFoundError: if the writer does not find the object; the
              exception carries the ``obj_id`` that was passed in.
        """
        if not self.writer:
            raise ReadOnlyObjStorageError("delete")
        if not self.allow_delete:
            raise PermissionError("Delete is not allowed.")
        try:
            return self.writer.delete(self._hash(obj_id))
        except ObjNotFoundError as exc:
            # Re-raise ObjNotFoundError with the full object id
            raise ObjNotFoundError(obj_id) from exc

    def _hash(self, obj_id: HashDict) -> bytes:
        """Return the entry of ``obj_id`` stored under :attr:`primary_hash`."""
        return obj_id[self.primary_hash]

    def on_shutdown(self):
        """Shut down the reader, then the writer if there is one."""
        self.reader.on_shutdown()
        if self.writer:
            self.writer.on_shutdown()
[docs]
class LRUDict(OrderedDict):
def __init__(self, capacity: int):
assert capacity > 0
super().__init__()
self.capacity = capacity
def __getitem__(self, key):
if key in self:
self.move_to_end(key)
return self.get(key)
return None
def __setitem__(self, key, value):
super().__setitem__(key, value)
self.move_to_end(key)
if len(self) > self.capacity:
self.popitem(last=False)
class WineryReader:
    """Read access to the objects stored in winery shards.

    The shard holding an object is looked up in the :class:`SharedBase`.
    Opened read-only shards are kept in an :class:`LRUDict` of ``cache_size``
    entries, opened read-write shards in a plain dict.
    """

    def __init__(self, pool: Pool, database: settings.Database, cache_size: int = 1000):
        """Connect to the shared database and create empty shard caches.

        Args:
            pool: pool given to the :class:`ROShard` instances being opened
            database: database settings; the ``db`` and ``application_name``
              entries are used
            cache_size: capacity of the cache of read-only shards
        """
        self.pool = pool
        self.base = SharedBase(
            base_dsn=database["db"], application_name=database["application_name"]
        )
        self.ro_shards: Dict[str, ROShard] = LRUDict(cache_size)
        self.rw_shards: Dict[str, RWShard] = {}

    def __contains__(self, obj_id):
        """Tell whether the shared database contains ``obj_id``."""
        return self.base.contains(obj_id)

    def list_signatures(
        self, after_id: Optional[bytes] = None, limit: Optional[int] = None
    ) -> Iterator[bytes]:
        """Yield everything ``self.base.list_signatures(after_id, limit)``
        yields."""
        yield from self.base.list_signatures(after_id, limit)

    def roshard(self, name) -> Optional[ROShard]:
        """Return the read-only shard called ``name``, opening it if needed.

        Every call increments ``SHARD_CACHE_METRIC`` with a ``cache_status``
        tag of ``hit``, ``miss`` or ``fallthrough``; opening a shard is timed
        with ``SHARD_OPEN_DURATION_METRIC``.

        Returns:
            the shard, or ``None`` if opening it raised
            :exc:`ShardNotMapped`.
        """
        if name in self.ro_shards:
            statsd.increment(
                SHARD_CACHE_METRIC,
                tags={
                    "shard_type": "ro",
                    "cache_status": "hit",
                    "pool_name": self.pool.pool_name,
                },
            )
            return self.ro_shards[name]

        try:
            with statsd.timed(
                SHARD_OPEN_DURATION_METRIC,
                tags={"shard_type": "ro", "pool_name": self.pool.pool_name},
            ):
                opened = ROShard(name=name, pool=self.pool)
            statsd.increment(
                SHARD_CACHE_METRIC,
                tags={
                    "shard_type": "ro",
                    "cache_status": "miss",
                    "pool_name": self.pool.pool_name,
                },
            )
        except ShardNotMapped:
            statsd.increment(
                SHARD_CACHE_METRIC,
                tags={
                    "shard_type": "ro",
                    "cache_status": "fallthrough",
                    "pool_name": self.pool.pool_name,
                },
            )
            return None

        self.ro_shards[name] = opened
        # The read-only shard takes over: forget any cached read-write shard
        # of the same name.
        self.rw_shards.pop(name, None)
        return self.ro_shards[name]

    def rwshard(self, name) -> RWShard:
        """Return the read-write shard called ``name``, opening it if needed.

        New shards are opened with ``readonly=True`` and
        ``shard_max_size=0``. Every call increments ``SHARD_CACHE_METRIC``
        with a ``cache_status`` tag of ``hit`` or ``miss``; opening a shard is
        timed with ``SHARD_OPEN_DURATION_METRIC``.
        """
        if name in self.rw_shards:
            statsd.increment(
                SHARD_CACHE_METRIC, tags={"shard_type": "rw", "cache_status": "hit"}
            )
            return self.rw_shards[name]

        statsd.increment(
            SHARD_CACHE_METRIC, tags={"shard_type": "rw", "cache_status": "miss"}
        )
        with statsd.timed(SHARD_OPEN_DURATION_METRIC, tags={"shard_type": "rw"}):
            opened = RWShard(
                name, shard_max_size=0, base_dsn=self.base.dsn, readonly=True
            )
        self.rw_shards[name] = opened
        return self.rw_shards[name]

    def get(self, obj_id: bytes) -> bytes:
        """Return the content of the object ``obj_id``.

        The read-only shard is tried first when the shard state says an image
        is available; if that yields nothing, the read-write shard is used.

        Raises:
            ObjNotFoundError: if the shared database does not know the object
              or no shard returned a content for it.
        """
        located = self.base.get(obj_id)
        if located is None:
            raise ObjNotFoundError(obj_id)
        shard_name, shard_state = located

        data: Optional[bytes] = None
        ro = self.roshard(shard_name) if shard_state.image_available else None
        if ro:
            data = ro.get(obj_id)
        if data is None:
            # No usable read-only shard, or it did not return the object.
            data = self.rwshard(shard_name).get(obj_id)
        if data is None:
            raise ObjNotFoundError(obj_id)
        return data

    def on_shutdown(self):
        """Close every cached read-only shard and empty both caches."""
        for cached in self.ro_shards.values():
            cached.close()
        self.ro_shards.clear()
        self.rw_shards = {}
class WineryWriter:
    """Write access to winery shards.

    Objects are registered in the :class:`SharedBase` and written to the
    read-write shard currently locked by this writer (see :attr:`shard`).
    Names of the shards that became full are collected in
    ``shards_filled``.
    """

    def __init__(
        self,
        packer_settings: settings.Packer,
        shards_settings: settings.Shards,
        shards_pool_settings: settings.ShardsPool,
        database_settings: settings.Database,
    ):
        """Keep the settings and connect to the shared database.

        ``idle_timeout`` is read from the ``rw_idle_timeout`` entry of
        ``shards_settings`` and defaults to 300.
        """
        self.packer_settings = packer_settings
        self.shards_settings = shards_settings
        self.shards_pool_settings = shards_pool_settings
        self.base = SharedBase(
            base_dsn=database_settings["db"],
            application_name=database_settings["application_name"],
        )
        self.shards_filled: List[str] = []
        self._shard: Optional[RWShard] = None
        self.idle_timeout = shards_settings.get("rw_idle_timeout", 300)

    def release_shard(
        self,
        shard: Optional[RWShard] = None,
        from_idle_handler: bool = False,
        new_state: ShardState = ShardState.STANDBY,
    ):
        """Release the currently locked shard

        Args:
            shard: shard to release; the writer's current shard when omitted.
              Nothing happens if there is no shard at all.
            from_idle_handler: when false, the idle handler of the shard is
              disabled as part of the release
            new_state: state recorded for the shard in the shared database
        """
        target = shard or self._shard
        if not target:
            return

        logger.debug("WineryWriter releasing shard %s", target.name)
        self.base.set_shard_state(new_state=new_state, name=target.name)
        if not from_idle_handler:
            logger.debug("Shard released, disabling idle handler")
            target.disable_idle_handler()
        self._shard = None

    @property
    def shard(self):
        """Lock a shard to be able to use it. Release it after :attr:`idle_timeout`."""
        if self._shard:
            return self._shard

        self._shard = RWShard(
            name=self.base.locked_shard,
            base_dsn=self.base.dsn,
            shard_max_size=self.shards_settings["max_size"],
            idle_timeout_cb=partial(self.release_shard, from_idle_handler=True),
            idle_timeout=self.idle_timeout,
        )
        logger.debug(
            "WineryBase: locked RWShard %s, releasing it in %s",
            self._shard.name,
            self.idle_timeout,
        )
        return self._shard

    def add(self, content: bytes, obj_id: bytes) -> None:
        """Add a single object; shorthand for a one-element :meth:`add_batch`."""
        self.add_batch([(obj_id, content)])

    def add_batch(self, contents: List[Tuple[bytes, bytes]]) -> Dict:
        """``contents`` should be pairs of ``(obj_id, content)``

        The object ids are recorded in the shared database, then the objects
        attributed to the shard locked by this writer are written to it, all
        within one database transaction. If the shard is full afterwards, it
        is released in the ``FULL`` state and its name is appended to
        ``shards_filled``.

        Returns:
            what the shard's ``add_batch`` returned.
        """
        with self.base.pool.connection() as db, db.transaction():
            new_ids = [key for (key, _blob) in contents]
            shards = self.base.record_new_obj_ids(db, new_ids)
            owned = [
                (key, blob)
                for (key, blob) in contents
                # if not equal, this object is the responsibility of another shard:
                if shards[key] == self.base.locked_shard_id
            ]
            stats = self.shard.add_batch(db, owned)

        if not self.shard.is_full():
            return stats

        # Remember the name before releasing: release_shard() resets the
        # writer's current shard.
        filled_name = self.shard.name
        self.release_shard(new_state=ShardState.FULL)
        self.shards_filled.append(filled_name)
        if self.packer_settings["pack_immediately"]:
            logger.warning(
                "pack_immediately has been disabled. Please use a "
                "'swh objstorage winery packer' service instead. "
                "Packing will NOT be executed now."
            )
        return stats

    def delete(self, obj_id: bytes):
        """Delete the object ``obj_id`` and return ``True``.

        The object is removed from its read-write shard when the shard state
        reports no available image, and in any case from the shared database.

        Raises:
            ObjNotFoundError: if the shared database does not know the object
        """
        located = self.base.get(obj_id)
        if located is None:
            raise ObjNotFoundError(obj_id)
        shard_name, shard_state = located

        # We only care about RWShard for now. ROShards will be
        # taken care in a batch job.
        if not shard_state.image_available:
            rwshard = RWShard(shard_name, shard_max_size=0, base_dsn=self.base.dsn)
            try:
                rwshard.delete(obj_id)
            except KeyError:
                logger.warning(
                    "Shard %s does not seem to know about object %s, but we "
                    "had an entry in SharedBase (which is going to "
                    "be removed just now)",
                    rwshard.name,
                    obj_id,
                )

        self.base.delete(obj_id)
        return True

    def check(self, obj_id: HashDict) -> None:
        """Do nothing for now."""
        # load all shards packing == True and not locked (i.e. packer
        # was interrupted for whatever reason) run pack for each of them
        pass

    def pack(self, shard_name: str):
        """Flag ``shard_name`` as being packed in the shared database, then
        return the result of ``pack()`` called with this writer's settings."""
        self.base.shard_packing_starts(shard_name)
        return pack(
            shard=shard_name,
            base_dsn=self.base.dsn,
            packer_settings=self.packer_settings,
            shards_settings=self.shards_settings,
            shards_pool_settings=self.shards_pool_settings,
        )

    def on_shutdown(self):
        """Release the shard currently locked by this writer, if any."""
        self.release_shard()