# Copyright (C) 2018-2026 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
from typing import List, Optional
from swh.web.utils import archive, cache_get, cache_set, parse_iso8601_date_to_utc
from swh.web.utils.exc import NotFoundExc
from swh.web.utils.typing import OriginVisitInfo
[docs]
def get_origin_visits(
    origin_url: str,
    lookup_similar_urls: bool = True,
    visit_type: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[OriginVisitInfo]:
    """Function that returns the list of visits for a swh origin.

    That list is put in cache in order to speedup the navigation
    in the swh web browse ui.

    The returned visits are sorted according to their date in
    ascending order.

    Args:
        origin_url: URL of origin
        lookup_similar_urls: if :const:`True`, lookup origin with and
            without trailing slash in its URL
        visit_type: filter visits by type
        limit: retrieve recent visits up to that limit if provided

    Returns:
        A list of dict describing the origin visits

    Raises:
        swh.web.utils.exc.NotFoundExc: if the origin is not found
    """
    # NOTE(review): this re-imports a name that is already imported at module
    # level; presumably kept so that it is resolved at call time -- confirm
    # before removing.
    from swh.web.utils import archive

    def _filter_visits_by_type(
        visits: List[OriginVisitInfo], visit_type: Optional[str] = None
    ) -> List[OriginVisitInfo]:
        """Keep only the visits of the given type (all of them if ``None``)."""
        return [
            visit
            for visit in visits
            if visit_type is None or visit["type"] == visit_type
        ]

    # use the URL returned by the archive, it is part of the cache key and
    # is the one used for all subsequent lookups
    origin_url = archive.lookup_origin(
        origin_url, lookup_similar_urls=lookup_similar_urls
    )["url"]

    cache_entry_id = "origin_visits_%s" % origin_url
    cache_entry = cache_get(cache_entry_id)

    last_visit = None
    origin_visits = []
    new_visits = []

    # never request more visits per page than the requested limit
    per_page = archive.MAX_LIMIT
    if limit is not None and limit < per_page:
        per_page = limit

    if cache_entry:
        # resume from the last cached visit and fetch what came after it
        origin_visits = cache_entry
        last_visit = origin_visits[-1]["visit"]
        new_visits = list(
            archive.lookup_origin_visits(
                origin_url, last_visit=last_visit, per_page=per_page
            )
        )
        if new_visits:
            # advance the pagination cursor to the last retrieved visit, as
            # done in the pagination loops below, instead of assuming that
            # visit identifiers are contiguous
            last_visit = new_visits[-1]["visit"]
        else:
            if limit is not None and len(origin_visits) > limit:
                origin_visits = origin_visits[-limit:]
            # no new visit: the cached list can be returned as is when the
            # latest snapshot of the origin is the one of the last cached visit
            last_snp = archive.lookup_latest_origin_snapshot(origin_url)
            if not last_snp or last_snp["id"] == origin_visits[-1]["snapshot"]:
                return _filter_visits_by_type(origin_visits, visit_type=visit_type)
    else:
        # retrieve visits from the most recent to the oldest
        while limit is None or len(new_visits) < limit:
            visits = list(
                archive.lookup_origin_visits(
                    origin_url,
                    last_visit=last_visit,
                    per_page=per_page,
                    desc_order=True,
                )
            )
            new_visits += visits
            if len(visits) < per_page:
                # incomplete page: nothing more to retrieve
                break
            last_visit = visits[-1]["visit"]

        if new_visits:
            # visits were fetched in descending order, so the first one is the
            # most recent: continue from it in the loop below
            last_visit = new_visits[0]["visit"]

    # get new visits that we did not retrieve yet
    while limit is None or len(new_visits) < limit:
        visits = list(
            archive.lookup_origin_visits(
                origin_url, last_visit=last_visit, per_page=per_page
            )
        )
        new_visits += visits
        if len(visits) < per_page:
            break
        last_visit = visits[-1]["visit"]

    # cache entry is already sorted with oldest visits
    origin_visits += sorted(
        new_visits, key=lambda v: (datetime.fromisoformat(v["date"]), v["visit"])
    )
    if limit is not None and len(origin_visits) > limit:
        origin_visits = origin_visits[-limit:]
    # NOTE(review): when ``limit`` is set, the truncated list is stored under
    # the same cache key that calls without ``limit`` read -- confirm this is
    # the intended behavior.
    cache_set(cache_entry_id, origin_visits)

    return _filter_visits_by_type(origin_visits, visit_type=visit_type)
[docs]
def get_origin_visit(
    origin_url: str,
    visit_ts: Optional[str] = None,
    visit_id: Optional[int] = None,
    snapshot_id: Optional[str] = None,
    visit_type: Optional[str] = None,
) -> OriginVisitInfo:
    """Return information about one visit of an origin, selected by optional hints.

    Without any hint, the most recent full visit with a valid snapshot is
    returned, falling back to the most recent partial visit with a valid
    snapshot.

    With a visit identifier, that visit is looked up in the archive.

    With a timestamp, the closest visit from that timestamp is returned.

    With a snapshot identifier, the first visit with that snapshot is
    returned.

    Args:
        origin_url: URL of origin
        visit_ts: an ISO 8601 datetime string to parse
        visit_id: a visit identifier
        snapshot_id: a snapshot identifier
        visit_type: only consider visits of that type

    Returns:
        A dict containing the visit info.

    Raises:
        swh.web.utils.exc.NotFoundExc: if no visit can be found
    """
    # latest visit with a valid snapshot: a full one is preferred, a partial
    # one is used otherwise
    latest_visit = None
    for status in ("full", "partial"):
        latest_visit = archive.lookup_origin_visit_latest(
            origin_url,
            allowed_statuses=[status],
            require_snapshot=True,
            type=visit_type,
        )
        if latest_visit:
            break

    has_search_hint = bool(visit_ts or visit_id or snapshot_id)
    if not has_search_hint:
        if not latest_visit:
            raise NotFoundExc(f"No valid visit for origin with url {origin_url} found!")
        return latest_visit

    # the latest visit may already be the requested one, in which case
    # fetching and scanning the whole visits list is not needed
    if latest_visit and (
        latest_visit["snapshot"] == snapshot_id or latest_visit["visit"] == visit_id
    ):
        return latest_visit

    if visit_id:
        return archive.lookup_origin_visit(origin_url, visit_id)

    error_message_prefix = "Visit"
    if visit_type is not None:
        error_message_prefix += f" of type {visit_type}"

    if visit_ts:
        dated_visit = archive.origin_visit_find_by_date(
            origin_url,
            parse_iso8601_date_to_utc(visit_ts),
            greater_or_equal=False,
            type=visit_type,
        )
        if dated_visit is None:
            raise NotFoundExc(
                (
                    f"{error_message_prefix} with timestamp {visit_ts} for origin with "
                    f"url {origin_url} not found!"
                )
            )
        return dated_visit

    all_visits = get_origin_visits(origin_url, visit_type=visit_type)
    if not all_visits:
        raise NotFoundExc(f"No visits associated to origin with url {origin_url}!")

    if not snapshot_id:
        return all_visits[-1]

    # keep the visits targeting the requested snapshot, the first one wins
    matching_visits = [v for v in all_visits if v.get("snapshot") == snapshot_id]
    if not matching_visits:
        raise NotFoundExc(
            (
                f"{error_message_prefix} with snapshot id {snapshot_id} "
                f"for origin with url {origin_url} not found!"
            )
        )
    return matching_visits[0]