# Source code for swh.objstorage.backends.winery.objstorage
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from multiprocessing import Process
from swh.objstorage import exc
from swh.objstorage.interface import ObjId
from swh.objstorage.objstorage import ObjStorage
from .roshard import ROShard
from .rwshard import RWShard
from .sharedbase import SharedBase
from .stats import Stats
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class WineryObjStorage(ObjStorage):
    """Object storage front-end that forwards operations to a Winery backend.

    The backend is stored in ``self.winery``: a ``WineryReader`` when the
    ``readonly`` keyword argument is truthy, a ``WineryWriter`` otherwise.

    NOTE(review): ``add`` and ``check`` are forwarded unconditionally, but the
    ``WineryReader`` class visible in this file defines neither method, so they
    look like they fail with ``AttributeError`` on a read-only instance --
    confirm whether that is intended.
    """

    def __init__(self, **kwargs):
        """Initialise the parent class, then build the backend from ``kwargs``."""
        super().__init__(**kwargs)
        backend_cls = WineryReader if kwargs.get("readonly") else WineryWriter
        self.winery = backend_cls(**kwargs)

    def uninit(self):
        """Tear down the backend."""
        self.winery.uninit()

    def get(self, obj_id: ObjId) -> bytes:
        """Return whatever the backend's ``get`` yields for ``obj_id``."""
        content = self.winery.get(obj_id)
        return content

    def check_config(self, *, check_write):
        """Report the configuration as valid; ``check_write`` is not consulted."""
        return True

    def __contains__(self, obj_id):
        """Tell whether the backend knows ``obj_id``."""
        return obj_id in self.winery

    def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
        """Forward ``content`` and ``obj_id`` to the backend's ``add``."""
        self.winery.add(content, obj_id, check_presence)

    def check(self, obj_id: ObjId) -> None:
        """Forward to the backend's ``check`` and hand back its result."""
        outcome = self.winery.check(obj_id)
        return outcome

    def delete(self, obj_id: ObjId):
        """Always refuse: deletion is not supported by this storage.

        Raises:
            PermissionError: unconditionally.
        """
        raise PermissionError("Delete is not allowed.")
class WineryBase:
    """Common foundation of the Winery reader and writer.

    Keeps the constructor keyword arguments in ``self.args`` and a
    ``SharedBase`` built from them in ``self.base``.
    """

    def __init__(self, **kwargs):
        """Remember ``kwargs`` and run ``init()``.

        ``init()`` is resolved on the instance, so a subclass override is the
        one that runs here.
        """
        self.args = kwargs
        self.init()

    def init(self):
        """Create the ``SharedBase`` from the stored keyword arguments."""
        shared = SharedBase(**self.args)
        self.base = shared

    def uninit(self):
        """Tear down the ``SharedBase``."""
        self.base.uninit()

    def __contains__(self, obj_id):
        """Ask the ``SharedBase`` whether it contains ``obj_id``."""
        known = self.base.contains(obj_id)
        return known
class WineryReader(WineryBase):
    """Read access to objects stored in Winery shards.

    The shared base is asked which shard holds an object and whether that
    shard is read-only; the content is then fetched from an ``ROShard`` or an
    ``RWShard`` accordingly.
    """

    def __init__(self, **kwargs):
        """Initialise the base class and start with an empty shard cache."""
        super().__init__(**kwargs)
        # Loaded read-only shards, keyed by shard name.
        self.shards = {}

    def roshard(self, name):
        """Return the read-only shard called ``name``, loading it on first use.

        The loaded shard is kept in ``self.shards`` so later calls with the
        same name reuse it.
        """
        if name not in self.shards:
            shard = ROShard(name, **self.args)
            shard.load()
            self.shards[name] = shard
        return self.shards[name]

    def get(self, obj_id: ObjId) -> bytes:
        """Return the content stored under ``obj_id``.

        Raises:
            exc.ObjNotFoundError: if the shared base has no entry for
                ``obj_id``, or the shard returns ``None`` for it.
        """
        shard_info = self.base.get(obj_id)
        if shard_info is None:
            raise exc.ObjNotFoundError(obj_id)
        name, readonly = shard_info
        if readonly:
            # Read-only shards are cached by roshard(); nothing to release.
            content = self.roshard(name).get(obj_id)
        else:
            # This RWShard only serves this one lookup. Tear it down with
            # uninit(), as the rest of the file does for every RWShard,
            # instead of leaving it to garbage collection.
            shard = RWShard(name, **self.args)
            try:
                content = shard.get(obj_id)
            finally:
                shard.uninit()
        if content is None:
            raise exc.ObjNotFoundError(obj_id)
        return content
def pack(shard, **kwargs):
    """Build a ``Packer`` for ``shard`` and return the result of its ``run()``.

    This is the callable handed to ``multiprocessing.Process`` by
    ``WineryWriter.pack``.
    """
    packer = Packer(shard, **kwargs)
    return packer.run()
class Packer:
    """Copy every object of a read-write shard into a read-only shard.

    ``shard`` is the shard name, used for both the ``RWShard`` source and the
    ``ROShard`` destination.
    """

    def __init__(self, shard, **kwargs):
        """Set up statistics, remember the arguments and open both shards."""
        self.stats = Stats(kwargs.get("output_dir"))
        self.args = kwargs
        self.shard = shard
        self.init()

    def init(self):
        """Open the read-write source and the read-only destination."""
        self.rw = RWShard(self.shard, **self.args)
        self.ro = ROShard(self.shard, **self.args)

    def uninit(self):
        """Drop the read-only shard reference and tear down the read-write one."""
        del self.ro
        self.rw.uninit()

    def run(self):
        """Pack the shard and return ``True``.

        In order: create the read-only shard sized for the current object
        count, copy every object across (recording a read and a write in the
        stats when they are active), save the read-only shard, tell the shared
        base that packing ended, then tear down and drop the read-write shard.
        """
        object_count = self.rw.count()
        self.ro.create(object_count)
        for obj_id, content in self.rw.all():
            self.ro.add(content, obj_id)
            if not self.stats.stats_active:
                continue
            self.stats.stats_read(obj_id, content)
            self.stats.stats_write(obj_id, content)
        self.ro.save()

        shared = SharedBase(**self.args)
        shared.shard_packing_ends(self.shard)
        shared.uninit()

        self.rw.uninit()
        self.rw.drop()
        return True
class WineryWriter(WineryReader):
    """Winery backend that can also add objects.

    On top of the reader's state it holds ``self.shard``, the ``RWShard``
    named by ``self.base.whoami``, and ``self.packers``, the list of packer
    processes it has started.
    """

    def __init__(self, **kwargs):
        """Initialise the reader part and start with no packer processes.

        ``WineryBase.__init__`` already calls ``self.init()``, which resolves
        to the override below, so ``self.base`` and ``self.shard`` exist once
        ``super().__init__`` returns. Calling ``init()`` again here would
        build a second ``SharedBase``/``RWShard`` pair and abandon the first
        one without ``uninit()``.
        """
        super().__init__(**kwargs)
        self.packers = []

    def init(self):
        """Create the shared base, then the read-write shard this writer fills."""
        super().init()
        self.shard = RWShard(self.base.whoami, **self.args)

    def uninit(self):
        """Tear down the read-write shard, then the shared base."""
        self.shard.uninit()
        super().uninit()

    def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> None:
        """Store ``content`` under ``obj_id`` in the current read-write shard.

        Nothing is written when ``check_presence`` is true and the object is
        already known, or when ``add_phase_1`` attributes the object to a
        shard other than ``self.base.id``. When the shard reports itself full
        after the write, packing is started.
        """
        if check_presence and obj_id in self:
            return
        shard = self.base.add_phase_1(obj_id)
        if shard != self.base.id:
            # this object is the responsibility of another shard
            return
        self.shard.add(obj_id, content)
        self.base.add_phase_2(obj_id)
        if self.shard.is_full():
            self.pack()

    def check(self, obj_id: ObjId) -> None:
        """Do nothing for now; see the TODO below."""
        # TODO: load all shards with packing == True that are not locked
        # (i.e. the packer was interrupted for whatever reason) and run pack
        # for each of them.

    def pack(self):
        """Start a process packing the current shard, then move to a fresh one.

        The process is recorded in ``self.packers``.
        """
        self.base.shard_packing_starts()
        # Capture the shard name before uninit() tears the shard down.
        p = Process(target=pack, args=(self.shard.name,), kwargs=self.args)
        # NOTE(review): uninit() runs before start(), presumably so the child
        # process does not inherit this writer's open resources -- confirm.
        self.uninit()
        p.start()
        self.packers.append(p)
        self.init()

    def __del__(self):
        """Kill and reap every packer process started by this writer."""
        # ``packers`` is missing when __init__ raised before assigning it;
        # finalisation must not fail with AttributeError in that case.
        for p in getattr(self, "packers", ()):
            p.kill()
            p.join()