Source code for swh.objstorage.backends.winery.rwshard

# Copyright (C) 2021-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


import logging

import psycopg
import psycopg.errors

from .database import Database, DatabaseAdmin

logger = logging.getLogger(__name__)


[docs] class RWShard(Database): def __init__(self, name, application_name=None, **kwargs): self._name = name self.application_name = application_name if application_name is None: self.application_name = f"SWH Winery RW Shard {name}" DatabaseAdmin( kwargs["base_dsn"], self.name, application_name=f"Admin {self.application_name}", ).create_database() super().__init__(kwargs["shard_dsn"], self.name, self.application_name) self.create_tables() self.size = self.total_size() self.limit = kwargs["shard_max_size"] @property def lock(self): return 452343 # an arbitrary unique number @property def name(self): return self._name
[docs] def is_full(self): return self.size >= self.limit
[docs] def drop(self): DatabaseAdmin(self.dsn, self.dbname).drop_database()
@property def database_tables(self): return [ """ CREATE TABLE IF NOT EXISTS objects( key BYTEA PRIMARY KEY, content BYTEA ) WITH (autovacuum_enabled = false) """, ]
[docs] def total_size(self): with self.pool.connection() as db, db.cursor() as c: c.execute("SELECT SUM(LENGTH(content)) FROM objects") size = c.fetchone()[0] if size is None: return 0 else: return size
[docs] def add(self, obj_id, content): try: with self.pool.connection() as db, db.cursor() as c: c.execute( "INSERT INTO objects (key, content) VALUES (%s, %s)", (obj_id, content), binary=True, ) self.size += len(content) except psycopg.errors.UniqueViolation: pass
[docs] def get(self, obj_id): with self.pool.connection() as db, db.cursor() as c: c.execute( "SELECT content FROM objects WHERE key = %s", (obj_id,), binary=True ) if c.rowcount == 0: return None else: return c.fetchone()[0]
[docs] def delete(self, obj_id): with self.pool.connection() as db, db.cursor() as c: c.execute("DELETE FROM objects WHERE key = %s", (obj_id,)) if c.rowcount == 0: raise KeyError(obj_id)
[docs] def all(self): with self.pool.connection() as db, db.cursor() as c: with c.copy( "COPY objects (key, content) TO STDOUT (FORMAT BINARY)" ) as copy: copy.set_types(["bytea", "bytea"]) yield from copy.rows()
[docs] def count(self): with self.pool.connection() as db, db.cursor() as c: c.execute("SELECT COUNT(*) FROM objects") return c.fetchone()[0]