Source code for swh.objstorage.backends.winery.objstorage

# Copyright (C) 2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from multiprocessing import Process

from swh.objstorage import exc
from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import ObjStorage

from .roshard import ROShard
from .rwshard import RWShard
from .sharedbase import SharedBase
from .stats import Stats

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

class WineryObjStorage(ObjStorage):
    """Object storage front-end that forwards every operation to a backend.

    The backend, kept in ``self.winery``, is a ``WineryReader`` when the
    ``readonly`` keyword argument is truthy and a ``WineryWriter`` otherwise.
    """

    def __init__(self, **kwargs):
        """Initialise the parent ``ObjStorage`` and build the backend.

        The same keyword arguments are handed, unchanged, to the parent
        class and to the selected backend class.
        """
        super().__init__(**kwargs)
        backend_cls = WineryReader if kwargs.get("readonly") else WineryWriter
        self.winery = backend_cls(**kwargs)

    def uninit(self):
        """Forward the teardown request to the backend."""
        self.winery.uninit()

    def get(self, obj_id: ObjId) -> bytes:
        """Return what the backend's ``get`` returns for ``obj_id``."""
        content = self.winery.get(obj_id)
        return content

    def check_config(self, *, check_write):
        """Always return ``True``; ``check_write`` is not consulted."""
        return True

    def __contains__(self, obj_id):
        """Tell whether ``obj_id`` is in the backend."""
        return obj_id in self.winery

    def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
        """Forward the addition of ``content`` under ``obj_id`` to the backend.

        NOTE(review): in this module only ``WineryWriter`` defines ``add``,
        so a storage created with ``readonly`` set raises ``AttributeError``
        here.
        """
        self.winery.add(content, obj_id, check_presence)

    def check(self, obj_id: ObjId) -> None:
        """Forward the integrity check of ``obj_id`` to the backend.

        NOTE(review): in this module only ``WineryWriter`` defines ``check``,
        so a storage created with ``readonly`` set raises ``AttributeError``
        here.
        """
        return self.winery.check(obj_id)

    def delete(self, obj_id: ObjId):
        """Refuse deletion.

        Raises:
            PermissionError: always.
        """
        raise PermissionError("Delete is not allowed.")
class WineryBase:
    """Holder of the construction arguments and of a ``SharedBase``.

    The keyword arguments given at construction are kept in ``self.args``
    and are used to build the ``SharedBase`` stored in ``self.base``.
    """

    def __init__(self, **kwargs):
        """Remember ``kwargs`` and run ``init()``."""
        self.args = kwargs
        self.init()

    def init(self):
        """Create a ``SharedBase`` from the stored keyword arguments."""
        shared = SharedBase(**self.args)
        self.base = shared

    def uninit(self):
        """Call ``uninit()`` on the ``SharedBase``."""
        self.base.uninit()

    def __contains__(self, obj_id):
        """Return the answer of the ``SharedBase`` ``contains`` lookup."""
        return self.base.contains(obj_id)
class WineryReader(WineryBase):
    """Read access to objects, whichever kind of shard holds them."""

    def __init__(self, **kwargs):
        """Initialise the base class and start with no loaded shard."""
        super().__init__(**kwargs)
        # ROShard instances already loaded, keyed by shard name.
        self.shards = {}

    def roshard(self, name):
        """Return the ``ROShard`` called ``name``, loading it on first use.

        A shard is built and ``load()``-ed only the first time its name is
        requested; later calls return the instance kept in ``self.shards``.
        """
        try:
            return self.shards[name]
        except KeyError:
            pass
        loaded = ROShard(name, **self.args)
        loaded.load()
        self.shards[name] = loaded
        return loaded

    def get(self, obj_id: ObjId) -> bytes:
        """Return the content stored under ``obj_id``.

        The ``SharedBase`` is asked which shard holds the object and whether
        that shard is read-only. A read-only shard goes through
        ``roshard()``; otherwise a new ``RWShard`` is built for the lookup.

        Raises:
            exc.ObjNotFoundError: when the ``SharedBase`` has no entry for
                ``obj_id`` or the shard returns ``None`` for it.
        """
        shard_info = self.base.get(obj_id)
        if shard_info is None:
            raise exc.ObjNotFoundError(obj_id)

        name, readonly = shard_info
        shard = self.roshard(name) if readonly else RWShard(name, **self.args)
        content = shard.get(obj_id)

        if content is None:
            raise exc.ObjNotFoundError(obj_id)
        return content
def pack(shard, **kwargs):
    """Build a ``Packer`` for ``shard`` and return the result of its ``run()``.

    ``kwargs`` are handed to the ``Packer`` constructor unchanged.
    """
    packer = Packer(shard, **kwargs)
    return packer.run()
class Packer:
    """Copy the objects of a read-write shard into a read-only shard.

    NOTE(review): in the text under review every ``self.rw`` / ``self.ro``
    reference was missing, leaving invalid statements such as
    ``= RWShard(...)``, a bare ``del`` and ``for obj_id, content in,
    obj_id)``. They are restored here. The attribute names and every call
    marked "restored" below must be confirmed against the ``RWShard`` and
    ``ROShard`` classes before relying on this code.
    """

    def __init__(self, shard, **kwargs):
        """Keep the shard name and arguments, then open both shards.

        Args:
            shard: name handed to both ``RWShard`` and ``ROShard``.
            **kwargs: handed unchanged to the shard classes and to
                ``SharedBase``; ``output_dir`` (if present) is also given
                to ``Stats``.
        """
        self.stats = Stats(kwargs.get("output_dir"))
        self.args = kwargs
        self.shard = shard
        self.init()

    def init(self):
        """Open the read-write source and the read-only destination."""
        self.rw = RWShard(self.shard, **self.args)  # restored target name
        self.ro = ROShard(self.shard, **self.args)  # restored target name

    def uninit(self):
        """Drop the read-only shard reference and release the read-write one."""
        del self.ro  # restored operand
        self.rw.uninit()  # restored statement -- confirm

    def run(self):
        """Pack the shard and report completion to the ``SharedBase``.

        Returns:
            ``True`` once the packing sequence has completed.
        """
        # restored statement -- confirm: size the destination from the
        # number of objects in the source.
        self.ro.create(self.rw.count())
        for obj_id, content in self.rw.all():  # restored iterable -- confirm
            self.ro.add(content, obj_id)  # restored callee -- confirm
            if self.stats.stats_active:
                self.stats.stats_read(obj_id, content)
                self.stats.stats_write(obj_id, content)
        self.ro.save()  # restored statement -- confirm

        # A SharedBase is opened just long enough to record the end of packing.
        base = SharedBase(**self.args)
        base.shard_packing_ends(self.shard)
        base.uninit()

        self.rw.uninit()  # restored statement -- confirm
        self.rw.drop()  # restored statement -- confirm
        return True
class WineryWriter(WineryReader):
    """Reader that can also add objects to its own read-write shard.

    When the shard reports it is full, packing is handed to a separate
    process and a new shard is opened.

    NOTE(review): two expressions were missing from the text under review:
    the right-hand side of ``shard !=`` in ``add`` and the positional
    argument given to ``Process`` in ``pack``. They are restored as
    ``self.base.id`` and ``self.shard.name``; confirm both against
    ``SharedBase`` and ``RWShard``.
    """

    def __init__(self, **kwargs):
        """Initialise the reader part and start with no packer process."""
        # WineryBase.__init__ already calls self.init(), which dispatches to
        # WineryWriter.init(). Calling init() a second time here would build
        # another SharedBase/RWShard pair and drop the first one without
        # uninit(), so it is not repeated.
        super().__init__(**kwargs)
        self.packers = []

    def init(self):
        """Create the ``SharedBase``, then the ``RWShard`` it designates."""
        super().init()
        self.shard = RWShard(self.base.whoami, **self.args)

    def uninit(self):
        """Release the read-write shard, then the ``SharedBase``."""
        self.shard.uninit()
        super().uninit()

    def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
        """Store ``content`` under ``obj_id`` in this writer's shard.

        Nothing is written when ``check_presence`` is true and the object is
        already known, or when the first phase assigns the object to another
        shard. Packing is started once the shard reports it is full.
        """
        if check_presence and obj_id in self:
            return

        shard = self.base.add_phase_1(obj_id)
        if shard != self.base.id:  # restored operand -- confirm
            # this object is the responsibility of another shard
            return

        self.shard.add(obj_id, content)
        self.base.add_phase_2(obj_id)

        if self.shard.is_full():
            self.pack()

    def check(self, obj_id: ObjId) -> None:
        """Do nothing for now."""
        # load all shards packing == True and not locked (i.e. packer
        # was interrupted for whatever reason) run pack for each of them
        pass

    def pack(self):
        """Start packing the current shard in a child process.

        The writer's own shard and ``SharedBase`` are released before the
        child starts and re-created afterwards.
        """
        self.base.shard_packing_starts()
        p = Process(
            target=pack,
            args=(self.shard.name,),  # restored argument -- confirm
            kwargs=self.args,
        )
        self.uninit()
        p.start()
        self.packers.append(p)
        self.init()

    def __del__(self):
        """Kill and reap every packer process started by this writer."""
        # ``packers`` does not exist when __init__ raised before assigning it.
        for p in getattr(self, "packers", ()):
            p.kill()
            p.join()