Source code for swh.objstorage.backends.winery.rwshard

# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


import psycopg2

from .database import Database, DatabaseAdmin


[docs]class RWShard(Database): def __init__(self, name, **kwargs): self._name = name DatabaseAdmin(kwargs["base_dsn"], self.name).create_database() super().__init__(kwargs["shard_dsn"], self.name) self.create_tables() self.db = self.connect_database() self.size = self.total_size() self.limit = kwargs["shard_max_size"]
[docs] def uninit(self): if hasattr(self, "db"): self.db.close() del self.db
@property def lock(self): return 452343 # an arbitrary unique number @property def name(self): return self._name
[docs] def is_full(self): return self.size > self.limit
[docs] def drop(self): DatabaseAdmin(self.dsn, self.dbname).drop_database()
@property def database_tables(self): return [ """ CREATE TABLE IF NOT EXISTS objects( key BYTEA PRIMARY KEY, content BYTEA ) """, ]
[docs] def total_size(self): with self.db.cursor() as c: c.execute("SELECT SUM(LENGTH(content)) FROM objects") size = c.fetchone()[0] if size is None: return 0 else: return size
[docs] def add(self, obj_id, content): try: with self.db.cursor() as c: c.execute( "INSERT INTO objects (key, content) VALUES (%s, %s)", (obj_id, content), ) self.db.commit() self.size += len(content) except psycopg2.errors.UniqueViolation: pass
[docs] def get(self, obj_id): with self.db.cursor() as c: c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) if c.rowcount == 0: return None else: return c.fetchone()[0].tobytes()
[docs] def all(self): with self.db.cursor() as c: c.execute("SELECT key,content FROM objects") for row in c: yield row[0].tobytes(), row[1].tobytes()
[docs] def count(self): with self.db.cursor() as c: c.execute("SELECT COUNT(*) FROM objects") return c.fetchone()[0]