Source code for swh.web.utils.origin_visits
# Copyright (C) 2018-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import List, Optional
from swh.web.utils import archive, cache_get, cache_set, parse_iso8601_date_to_utc
from swh.web.utils.exc import NotFoundExc
from swh.web.utils.typing import OriginVisitInfo
[docs]
def get_origin_visits(
origin_url: str,
lookup_similar_urls: bool = True,
visit_type: Optional[str] = None,
) -> List[OriginVisitInfo]:
"""Function that returns the list of visits for a swh origin.
That list is put in cache in order to speedup the navigation
in the swh web browse ui.
The returned visits are sorted according to their date in
ascending order.
Args:
origin_url: URL of origin
lookup_similar_urls: if :const:`True`, lookup origin with and
without trailing slash in its URL
Returns:
A list of dict describing the origin visits
Raises:
swh.web.utils.exc.NotFoundExc: if the origin is not found
"""
from swh.web.utils import archive
def _filter_visits_by_type(
visits: List[OriginVisitInfo], visit_type: Optional[str] = None
) -> List[OriginVisitInfo]:
return [
visit
for visit in visits
if visit_type is None or visit["type"] == visit_type
]
origin_url = archive.lookup_origin(
origin_url, lookup_similar_urls=lookup_similar_urls
)["url"]
cache_entry_id = "origin_visits_%s" % origin_url
cache_entry = cache_get(cache_entry_id)
last_visit = 0
origin_visits = []
new_visits = []
per_page = archive.MAX_LIMIT
if cache_entry:
origin_visits = cache_entry
last_visit = cache_entry[-1]["visit"]
new_visits = list(
archive.lookup_origin_visits(
origin_url, last_visit=last_visit, per_page=per_page
)
)
last_visit += len(new_visits)
if not new_visits:
last_snp = archive.lookup_latest_origin_snapshot(origin_url)
if not last_snp or last_snp["id"] == cache_entry[-1]["snapshot"]:
return _filter_visits_by_type(cache_entry, visit_type=visit_type)
# get new visits that we did not retrieve yet
while 1:
visits = list(
archive.lookup_origin_visits(
origin_url, last_visit=last_visit, per_page=per_page
)
)
new_visits += visits
if len(visits) < per_page:
break
last_visit += per_page
def _visit_sort_key(visit):
ts = parse_iso8601_date_to_utc(visit["date"]).timestamp()
return ts + (float(visit["visit"]) / 10e3)
# cache entry is already sorted with oldest visits
origin_visits += sorted(new_visits, key=lambda v: _visit_sort_key(v))
cache_set(cache_entry_id, origin_visits)
return _filter_visits_by_type(origin_visits, visit_type=visit_type)
[docs]
def get_origin_visit(
origin_url: str,
visit_ts: Optional[str] = None,
visit_id: Optional[int] = None,
snapshot_id: Optional[str] = None,
visit_type: Optional[str] = None,
) -> OriginVisitInfo:
"""Function that returns information about a visit for a given origin.
If a timestamp is provided, the closest visit from that
timestamp is returned.
If a snapshot identifier is provided, the first visit with that snapshot
is returned.
If no search hints are provided, return the most recent full visit with
a valid snapshot or the most recent partial visit with a valid snapshot
otherwise.
Args:
origin_url: URL of origin
visit_ts: an ISO 8601 datetime string to parse
snapshot_id: a snapshot identifier
Returns:
A dict containing the visit info.
Raises:
swh.web.utils.exc.NotFoundExc: if no visit can be found
"""
# returns the latest full visit with a valid snapshot
visit = archive.lookup_origin_visit_latest(
origin_url,
allowed_statuses=["full"],
require_snapshot=True,
type=visit_type,
)
if not visit:
# or the latest partial visit with a valid snapshot otherwise
visit = archive.lookup_origin_visit_latest(
origin_url,
allowed_statuses=["partial"],
require_snapshot=True,
type=visit_type,
)
if not visit_ts and not visit_id and not snapshot_id:
if visit:
return visit
else:
raise NotFoundExc(f"No valid visit for origin with url {origin_url} found!")
# no need to fetch all visits list and search in it if the latest
# visit matches some criteria
if visit and (visit["snapshot"] == snapshot_id or visit["visit"] == visit_id):
return visit
if visit_id:
return archive.lookup_origin_visit(origin_url, visit_id)
error_message_prefix = "Visit"
if visit_type is not None:
error_message_prefix += f" of type {visit_type}"
if visit_ts:
visit = archive.origin_visit_find_by_date(
origin_url,
parse_iso8601_date_to_utc(visit_ts),
greater_or_equal=False,
type=visit_type,
)
if visit is not None:
return visit
else:
raise NotFoundExc(
(
f"{error_message_prefix} with timestamp {visit_ts} for origin with "
f"url {origin_url} not found!"
)
)
visits = get_origin_visits(origin_url, visit_type=visit_type)
if not visits:
raise NotFoundExc(f"No visits associated to origin with url {origin_url}!")
if snapshot_id:
visits = [v for v in visits if v["snapshot"] == snapshot_id]
if len(visits) == 0:
raise NotFoundExc(
(
f"{error_message_prefix} with snapshot id {snapshot_id} "
f"for origin with url {origin_url} not found!"
)
)
return visits[0]
return visits[-1]