Source code for swh.web.utils.origin_visits

# Copyright (C) 2018-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import datetime
from typing import List, Optional

from swh.web.utils import archive, cache_get, cache_set, parse_iso8601_date_to_utc
from swh.web.utils.exc import NotFoundExc
from swh.web.utils.typing import OriginVisitInfo


def get_origin_visits(
    origin_url: str,
    lookup_similar_urls: bool = True,
    visit_type: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[OriginVisitInfo]:
    """Return the cached-and-refreshed list of visits for a swh origin.

    The visit list is stored in cache under a key derived from the origin
    URL so that browsing the swh web ui does not refetch it every time.
    On each call the cached list (if any) is completed with the visits
    that are more recent than its last entry, then written back to cache.

    Visits fetched by this call are sorted by ``(date, visit id)`` in
    ascending order before being appended to the cached ones.

    Args:
        origin_url: URL of origin
        lookup_similar_urls: forwarded to ``archive.lookup_origin``; if
            :const:`True`, lookup origin with and without trailing slash
            in its URL
        visit_type: if not :const:`None`, only visits whose ``"type"``
            equals that value are returned
        limit: retrieve recent visits up to that limit if provided

    Returns:
        A list of dict describing the origin visits

    Raises:
        swh.web.utils.exc.NotFoundExc: if the origin is not found
    """
    # Function-scope import kept on purpose: it shadows the module-level
    # name with whatever ``swh.web.utils.archive`` is at call time.
    from swh.web.utils import archive

    def _matching_type(candidates: List[OriginVisitInfo]) -> List[OriginVisitInfo]:
        # Always hand back a fresh list, whether or not a filter applies.
        if visit_type is None:
            return list(candidates)
        return [entry for entry in candidates if entry["type"] == visit_type]

    def _chronological(entry):
        # Sort key: visit date first, visit id as tie-breaker.
        return datetime.fromisoformat(entry["date"]), entry["visit"]

    # Resolve the canonical URL of the origin before building the cache key.
    origin_url = archive.lookup_origin(
        origin_url, lookup_similar_urls=lookup_similar_urls
    )["url"]

    # NOTE(review): the cache key does not include ``limit`` while the list
    # written back below may have been truncated to ``limit`` entries --
    # confirm callers are fine with sharing that entry.
    cache_key = "origin_visits_%s" % origin_url
    cached_visits = cache_get(cache_key)

    page_size = archive.MAX_LIMIT
    if limit is not None and limit < page_size:
        page_size = limit

    def _collect_pages(cursor, collected, **lookup_options):
        # Fetch pages of visits starting after ``cursor`` and append them to
        # ``collected`` until a short page is received or ``limit`` visits
        # have been gathered; return the cursor the paging stopped at.
        while limit is None or len(collected) < limit:
            page = list(
                archive.lookup_origin_visits(
                    origin_url,
                    last_visit=cursor,
                    per_page=page_size,
                    **lookup_options,
                )
            )
            collected += page
            if len(page) < page_size:
                break
            cursor = page[-1]["visit"]
        return cursor

    origin_visits = []
    fetched = []

    if cached_visits:
        origin_visits = cached_visits
        newest_known = origin_visits[-1]["visit"]
        # First page of visits created after the last cached one.
        fetched = list(
            archive.lookup_origin_visits(
                origin_url, last_visit=newest_known, per_page=page_size
            )
        )
        cursor = newest_known + len(fetched)
        if not fetched:
            if limit is not None and len(origin_visits) > limit:
                origin_visits = origin_visits[-limit:]
            latest_snapshot = archive.lookup_latest_origin_snapshot(origin_url)
            # Nothing new and the latest snapshot is already the one of the
            # last cached visit: the cached list can be served as is.
            if (
                not latest_snapshot
                or latest_snapshot["id"] == origin_visits[-1]["snapshot"]
            ):
                return _matching_type(origin_visits)
    else:
        # retrieve visits from the most recent to the oldest
        cursor = _collect_pages(None, fetched, desc_order=True)
        if fetched:
            # first fetched entry is the most recent visit seen so far
            cursor = fetched[0]["visit"]

    # get new visits that we did not retrieve yet
    _collect_pages(cursor, fetched)

    # cache entry is already sorted with oldest visits
    origin_visits += sorted(fetched, key=_chronological)

    if limit is not None and len(origin_visits) > limit:
        origin_visits = origin_visits[-limit:]

    cache_set(cache_key, origin_visits)

    return _matching_type(origin_visits)


def get_origin_visit(
    origin_url: str,
    visit_ts: Optional[str] = None,
    visit_id: Optional[int] = None,
    snapshot_id: Optional[str] = None,
    visit_type: Optional[str] = None,
) -> OriginVisitInfo:
    """Return information about one visit of a given origin.

    The latest full visit with a valid snapshot (or, failing that, the
    latest partial visit with a valid snapshot) is looked up first. It is
    returned directly when no search hint is provided, or when its snapshot
    or visit id equals the requested one.

    Otherwise the hints are honoured in that order: visit id, then
    timestamp, then snapshot identifier (first matching entry of the list
    returned by :func:`get_origin_visits`).

    Args:
        origin_url: URL of origin
        visit_ts: an ISO 8601 datetime string to parse
        visit_id: a visit identifier
        snapshot_id: a snapshot identifier
        visit_type: if provided, restrict the lookups to visits of that type

    Returns:
        A dict containing the visit info.

    Raises:
        swh.web.utils.exc.NotFoundExc: if no visit can be found
    """
    # latest full visit with a valid snapshot, or the latest partial visit
    # with a valid snapshot otherwise
    visit = None
    for status in ("full", "partial"):
        visit = archive.lookup_origin_visit_latest(
            origin_url,
            allowed_statuses=[status],
            require_snapshot=True,
            type=visit_type,
        )
        if visit:
            break

    if not (visit_ts or visit_id or snapshot_id):
        # no search hint: the latest valid visit is the answer
        if not visit:
            raise NotFoundExc(f"No valid visit for origin with url {origin_url} found!")
        return visit

    # no need to fetch all visits list and search in it if the latest
    # visit matches some criteria
    if visit and (visit["snapshot"] == snapshot_id or visit["visit"] == visit_id):
        return visit

    if visit_id:
        return archive.lookup_origin_visit(origin_url, visit_id)

    error_message_prefix = "Visit"
    if visit_type is not None:
        error_message_prefix += f" of type {visit_type}"

    if visit_ts:
        dated_visit = archive.origin_visit_find_by_date(
            origin_url,
            parse_iso8601_date_to_utc(visit_ts),
            greater_or_equal=False,
            type=visit_type,
        )
        if dated_visit is None:
            raise NotFoundExc(
                (
                    f"{error_message_prefix} with timestamp {visit_ts} for origin with "
                    f"url {origin_url} not found!"
                )
            )
        return dated_visit

    visits = get_origin_visits(origin_url, visit_type=visit_type)

    if not visits:
        raise NotFoundExc(f"No visits associated to origin with url {origin_url}!")

    if snapshot_id:
        with_snapshot = [v for v in visits if v.get("snapshot") == snapshot_id]
        if not with_snapshot:
            raise NotFoundExc(
                (
                    f"{error_message_prefix} with snapshot id {snapshot_id} "
                    f"for origin with url {origin_url} not found!"
                )
            )
        return with_snapshot[0]

    # NOTE(review): every path with a falsy ``snapshot_id`` has already
    # returned or raised above, so this fallback looks unreachable -- confirm
    # before removing.
    return visits[-1]