Source code for swh.vault.cookers
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import os
from typing import Any, Dict, List, Type
from swh.core.config import load_named_config
from swh.core.config import read as read_config
from swh.model.swhids import CoreSWHID, ObjectType
from swh.objstorage.factory import get_objstorage
from swh.storage import get_storage
from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH, BaseVaultCooker
from swh.vault.cookers.directory import DirectoryCooker
from swh.vault.cookers.git_bare import GitBareCooker
from swh.vault.cookers.revision_flat import RevisionFlatCooker
from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
_COOKER_CLS: List[Type[BaseVaultCooker]] = [
DirectoryCooker,
RevisionFlatCooker,
RevisionGitfastCooker,
GitBareCooker,
]
COOKER_TYPES: Dict[str, List[Type[BaseVaultCooker]]] = {}
for _cooker_cls in _COOKER_CLS:
COOKER_TYPES.setdefault(_cooker_cls.BUNDLE_TYPE, []).append(_cooker_cls)
[docs]
def get_cooker_cls(bundle_type: str, object_type: ObjectType):
cookers = COOKER_TYPES.get(bundle_type)
if not cookers:
raise ValueError(f"{bundle_type} is not a valid bundle type.")
for cooker in cookers:
try:
cooker.check_object_type(object_type)
except ValueError:
pass
else:
return cooker
raise ValueError(
f"{object_type.name.lower()} objects do not have a {bundle_type} cooker"
)
[docs]
def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
"""Ensure the configuration is ok to run a vault worker, and propagate defaults
Raises:
EnvironmentError if the configuration is not for remote instance
ValueError if one of the following keys is missing: vault, storage
Returns:
New configuration dict to instantiate a vault worker instance
"""
cfg = cfg.copy()
if "vault" not in cfg:
raise ValueError("missing 'vault' configuration")
vcfg = cfg["vault"]
if vcfg["cls"] != "remote":
raise EnvironmentError(
"This vault backend can only be a 'remote' configuration"
)
# Default to top-level value if any
for key in ("storage", "objstorage", "graph"):
if key not in vcfg and key in cfg:
vcfg[key] = cfg[key]
if not vcfg.get("storage"):
raise ValueError("invalid configuration: missing 'storage' config entry.")
return cfg
[docs]
def get_cooker(bundle_type: str, swhid: CoreSWHID):
"""Instantiate a cooker class of type bundle_type.
Returns:
Cooker class in charge of cooking the bundle_type with id swhid.
Raises:
ValueError in case of a missing top-level vault key configuration or a storage
key.
EnvironmentError in case the vault configuration reference a non remote class.
"""
from swh.vault import get_vault
if "SWH_CONFIG_FILENAME" in os.environ:
cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG)
else:
cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
cooker_cls = get_cooker_cls(bundle_type, swhid.object_type)
cfg = check_config(cfg)
vcfg = cfg["vault"]
storage = get_storage(**vcfg.pop("storage"))
backend = get_vault(**vcfg)
try:
from swh.graph.http_client import RemoteGraphClient # optional dependency
graph = RemoteGraphClient(**vcfg["graph"]) if vcfg.get("graph") else None
except ModuleNotFoundError:
if vcfg.get("graph"):
raise EnvironmentError(
"Graph configuration required but module is not installed."
)
else:
graph = None
if vcfg.get("objstorage"):
objstorage = get_objstorage(**vcfg["objstorage"])
else:
objstorage = None
kwargs = {
k: v for (k, v) in cfg.items() if k in ("max_bundle_size", "thread_pool_size")
}
return cooker_cls(
swhid,
backend=backend,
storage=storage,
objstorage=objstorage,
graph=graph,
**kwargs,
)