Source code for swh.storage
# Copyright (C) 2015-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import importlib.metadata
from typing import TYPE_CHECKING, Any, Dict, List
import warnings
if TYPE_CHECKING:
from .interface import StorageInterface
StorageSpec = Dict[str, Any]
[docs]
def get_storage(cls: str, **kwargs) -> "StorageInterface":
"""Get a storage object of class ``cls`` with arguments ``**kwargs``.
``cls`` must be one of the ``swh.storage.classes`` `entry-points
<https://setuptools.pypa.io/en/latest/userguide/entry_point.html>`_
Args:
cls (str):
storage's class, can be:
- ``local`` to use a postgresql database
- ``cassandra`` to use a cassandra database
- ``remote`` to connect to a swh-storage server
- ``memory`` for an in-memory storage, useful for fast tests
- ``filter``, ``buffer``, ... to use specific storage "proxies", see their
respective documentations
args (dict): dictionary with keys
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
if cls == "pipeline":
return get_storage_pipeline(**kwargs)
if cls == "local":
warnings.warn(
'The "local" storage class is deprecated, use "postgresql" instead.',
DeprecationWarning,
)
entry_points = importlib.metadata.entry_points(group="swh.storage.classes")
try:
entry_point = entry_points[cls]
except KeyError:
raise ValueError(
"Unknown storage class `%s`. Supported: %s"
% (cls, ", ".join(entry_point.name for entry_point in entry_points))
) from None
Storage = entry_point.load()
check_config = kwargs.pop("check_config", {})
storage = Storage(**kwargs)
if check_config:
if not storage.check_config(**check_config):
raise EnvironmentError("storage check config failed")
return storage
get_datastore = get_storage
[docs]
def get_storage_pipeline(
steps: List[Dict[str, Any]], check_config=None
) -> "StorageInterface":
"""Recursively get a storage object that may use other storage objects
as backends.
Args:
steps (List[dict]): List of dicts that may be used as kwargs for
`get_storage`.
Returns:
an instance of swh.storage.Storage or compatible class
Raises:
ValueError if passed an unknown storage class.
"""
storage_config = None
for step in reversed(steps):
if "args" in step:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly ' "instead.",
DeprecationWarning,
)
step = {
"cls": step["cls"],
**step["args"],
}
if storage_config:
step["storage"] = storage_config
step["check_config"] = check_config
storage_config = step
if storage_config is None:
raise ValueError("'pipeline' has no steps.")
return get_storage(**storage_config)