# Copyright (C) 2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
from functools import partial
import logging
from typing import Any, Dict, List, Tuple
from urllib.parse import unquote_plus
from requests import HTTPError
from swh.fuse import LOGGER_NAME
from swh.fuse.backends import ContentBackend, GraphBackend
from swh.fuse.cache import FuseCache
from swh.model.swhids import CoreSWHID, ObjectType
from swh.web.client.client import WebAPIClient
class WebApiBackend(GraphBackend, ContentBackend):
    """
    A Backend querying everything via Software Heritage's public API.

    This is simpler to configure and deploy, but expect long response times.

    It gets a pointer to the cache because some endpoints are more verbose than
    needed, but it allows to pre-cache data for further use. That's not elegant,
    but can avoid being rate-limited because of a simple
    ``ls archive/rev.../history``.
    """

    def __init__(self, conf: Dict, cache: FuseCache):
        """
        Only needs the ``web-api`` key of ``conf``, searching for ``url`` and maybe
        ``auth-token`` keys.
        """
        # ``auth-token`` is optional (cf. docstring): use .get() so a missing key
        # yields an unauthenticated client instead of raising a KeyError.
        self.web_api = WebAPIClient(
            conf["web-api"]["url"], conf["web-api"].get("auth-token")
        )
        self.logger = logging.getLogger(LOGGER_NAME)
        self.cache = cache

    async def get_blob(self, swhid) -> bytes:
        """
        Fetch the raw bytes of the content object identified by ``swhid``.

        Raises:
            requests.HTTPError: if the Web API request fails (logged then
                re-raised for the caller to handle).
        """
        try:
            self.logger.debug("Retrieving blob %s via web API...", swhid)
            # The client is requests-based (blocking), so run it in the default
            # executor to avoid stalling the event loop.
            loop = asyncio.get_running_loop()
            resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
            # content_raw yields chunks; join them directly, no intermediate list.
            return b"".join(resp)
        except HTTPError as err:
            self.logger.error("Cannot fetch blob for object %s: %s", swhid, err)
            raise

    async def get_history(self, swhid: CoreSWHID) -> List[Tuple[str, str]]:
        """
        Fetch a thousand ``(entry, parent)`` edges from ``swhid``.

        As the ``/log`` endpoint provides complete metadata objects, we pre-fill
        the cache by the way: it will be used afterwards by
        ``RevisionHistoryShardBy*`` artifacts.

        On HTTP failure, the error is logged and the edges collected so far are
        returned (best effort, possibly empty).
        """
        edges: List[Tuple[str, str]] = []
        limit = 1000
        try:
            self.logger.debug(
                "Retrieving %d revs before %s via Web API...", limit, swhid
            )
            # NOTE: uses the client's private ``_call`` because the public
            # helpers do not expose the raw ``/log`` endpoint with a limit.
            call = f"revision/{swhid.object_id.hex()}/log/?limit={limit}"
            loop = asyncio.get_running_loop()
            request = await loop.run_in_executor(None, self.web_api._call, call)
            history = request.json()
            for revision in history:
                entry_swhid = CoreSWHID(
                    object_type=ObjectType.REVISION,
                    object_id=bytes.fromhex(revision["id"]),
                )
                # Pre-cache the full revision metadata to spare later API calls
                # (and rate-limit budget).
                await self.cache.metadata.set(entry_swhid, revision)
                for parent in revision["parents"]:
                    parent_swhid = CoreSWHID(
                        object_type=ObjectType.REVISION,
                        object_id=bytes.fromhex(parent["id"]),
                    )
                    edges.append((str(entry_swhid), str(parent_swhid)))
        except HTTPError as err:
            self.logger.error("Cannot fetch history for object %s: %s", swhid, err)
        return edges

    async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]:
        """
        List the archive's visits of the origin ``url_encoded``
        (a percent-encoded origin URL).

        Raises:
            ValueError: if the origin is unknown to the archive.
            requests.HTTPError: if a Web API request fails.
        """
        try:
            self.logger.debug(
                "Retrieving visits for origin '%s' via web API...", url_encoded
            )
            loop = asyncio.get_running_loop()
            # Web API only takes non-encoded URL
            url = unquote_plus(url_encoded)
            origin_exists = await loop.run_in_executor(
                None, self.web_api.origin_exists, url
            )
            if not origin_exists:
                raise ValueError("origin does not exist")
            # typify=False keeps plain dicts instead of swh.model objects.
            visits_it = await loop.run_in_executor(
                None, partial(self.web_api.visits, url, typify=False)
            )
            return list(visits_it)
        except (ValueError, HTTPError) as err:
            self.logger.error(
                "Cannot fetch visits for origin '%s': %s", url_encoded, err
            )
            raise