Source code for swh.objstorage.backends.winery.database
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
from psycopg_pool import ConnectionPool
logger = logging.getLogger(__name__)
[docs]
class Database(abc.ABC):
def __init__(self, dsn: str, application_name: str):
self.dsn = dsn
self.application_name = application_name
self._pool = None
@property
def pool(self):
if not self._pool:
self._pool = ConnectionPool(
conninfo=self.dsn,
kwargs={
"application_name": self.application_name,
"fallback_application_name": "SWH Winery",
"autocommit": True,
},
name=f"pool-{self.application_name}"
if self.application_name
else "pool",
min_size=0,
max_size=4,
open=True,
max_idle=5,
check=ConnectionPool.check_connection,
)
return self._pool
[docs]
def list_shard_tables(self):
QUERY = """
SELECT c.relname
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_am am ON am.oid = c.relam
WHERE c.relkind = 'r'
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
AND c.relname ~ '^shard_'
AND c.relname <> 'shard_template'
"""
with self.pool.connection() as db:
c = db.execute(QUERY)
return [r[0].removeprefix("shard_") for r in c]
def __del__(self):
# Release the connection pool
if self._pool:
self._pool.close()
self._pool = None