# Source code for swh.indexer.metadata

# Copyright (C) 2017-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from copy import deepcopy
import datetime
import hashlib
from importlib.metadata import version
import logging
import re
import time
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    cast,
)
import urllib.parse
from urllib.parse import urlparse

import sentry_sdk

from swh.core.config import merge_configs
from swh.core.utils import grouper
from swh.indexer.codemeta import merge_documents
from swh.indexer.indexer import (
    BaseIndexer,
    ContentIndexer,
    DirectoryIndexer,
    ObjectsDict,
    OriginIndexer,
)
from swh.indexer.metadata_detector import detect_metadata
from swh.indexer.metadata_mapping import get_extrinsic_mappings, get_intrinsic_mappings
from swh.indexer.metadata_mapping.base import DirectoryLsEntry
from swh.indexer.origin_head import get_head_swhid
from swh.indexer.storage import INDEXER_CFG_KEY
from swh.indexer.storage.model import (
    ContentMetadataRow,
    DirectoryIntrinsicMetadataRow,
    OriginExtrinsicMetadataRow,
    OriginIntrinsicMetadataRow,
)
from swh.model.hashutil import HashDict, hash_to_hex
from swh.model.model import (
    Directory,
    MetadataAuthorityType,
    Origin,
    RawExtrinsicMetadata,
    ReleaseTargetType,
    Sha1Git,
)
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ObjectType

# Default batch size per object type (can be overridden through indexer configuration)
# Keys match the object types fetched by OriginMetadataIndexer.index_list.
DEFAULT_BATCH_SIZE = {
    "revision": 10,
    "release": 10,
    "origin": 10,
}


# Generic type variables used by the batched-fetch helpers below
# (T1: argument/id type, T2: fetched object type).
T1 = TypeVar("T1")
T2 = TypeVar("T2")

logger = logging.getLogger(__name__)


def fetch_in_batches(
    fetch_fn: Callable[[List[T1]], Iterable[T2]],
    args: List[T1],
    batch_size: int,
) -> Iterator[T2]:
    """Call ``fetch_fn`` on successive batches of ``args`` and yield the results.

    When a whole batch raises, each of its elements is retried individually so
    that a single bad object does not discard the rest of the batch; an object
    that fails again is logged and skipped. Callers therefore get a *partial*
    result set instead of a total failure.
    """
    for chunk in grouper(args, batch_size):
        items: List[T1] = list(chunk)
        try:
            yield from fetch_fn(items)
        except Exception:
            # The batched call failed; narrow down to the offending object(s)
            # by retrying one element at a time.
            for item in items:
                try:
                    yield from fetch_fn([item])
                except Exception as exc:
                    # Second failure: this is the problematic object. Log it
                    # and move on.
                    logger.error(
                        "Failure to retrieve object %s when calling %r: %s",
                        item,
                        fetch_fn,
                        exc,
                        exc_info=True,
                    )
def fetch_as_dict(
    fetch_fn: Callable[[List[T1]], Iterable[T2]],
    ids: List[T1],
    batch_size: int,
) -> Dict[T1, T2]:
    """Return a dict ``{id: object}``; missing items are logged."""
    return {
        fetched.id: fetched  # type: ignore
        for fetched in fetch_in_batches(fetch_fn, ids, batch_size)
        if fetched is not None
    }
class ExtrinsicMetadataIndexer(
    BaseIndexer[Sha1Git, RawExtrinsicMetadata, OriginExtrinsicMetadataRow]
):
    """Indexer for Raw Extrinsic Metadata

    For supported extrinsic metadata formats, translate the original format into
    CodeMeta, and attach the result to the Origin.

    Use :func:`swh.indexer.metadata_mapping.get_extrinsic_mappings` to get
    registered mapping formats.
    """

    # Journal/object type this indexer consumes.
    object_types = ["raw_extrinsic_metadata"]
[docs] def process_journal_objects(self, objects: ObjectsDict) -> Dict: summary: Dict[str, Any] = {"status": "uneventful"} try: results = {} for item in objects.get("raw_extrinsic_metadata", []): remd = RawExtrinsicMetadata.from_dict(item) sentry_sdk.set_tag("swh-indexer-remd-swhid", str(remd.swhid())) for result in self.index(remd.id, data=remd): results[result.id] = result except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(list(results.values())) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary
[docs] def index( self, id: Sha1Git, data: Optional[RawExtrinsicMetadata], **kwargs, ) -> List[OriginExtrinsicMetadataRow]: if data is None: raise NotImplementedError( "ExtrinsicMetadataIndexer.index() without RawExtrinsicMetadata data" ) if data.target.object_type == ExtendedObjectType.ORIGIN: origin_sha1 = data.target.object_id elif data.origin is not None: if ( data.fetcher.name == "swh-deposit" and data.discovery_date < datetime.datetime( 2024, 5, 16, 15, 20, 0, tzinfo=datetime.timezone.utc ) ): # Workaround for deposits while swh.model.swhids did not unescape origin # URLs while parsing <swh:object swhid="..." />, which was fixed in # https://gitlab.softwareheritage.org/swh/devel/swh-model/-/merge_requests/348 # itself deployed shortly after 2024-05-16T15:18:07 by # https://gitlab.softwareheritage.org/swh/infra/swh-apps/-/commit/70bd86aafcbc1787183e5d2cd52c392ae012e65e if "%" in data.origin: assert re.match( "^https://cran.r-project.org/package%3D[a-zA-Z]+$", data.origin ), data origin_url = urllib.parse.unquote_to_bytes(data.origin) else: origin_url = data.origin.encode() # HACK: As swh-search does not (yet?) support searching on directories # and traversing back to origins, we index metadata on non-origins with # an origin context as if they were on the origin itself. 
origin_sha1 = hashlib.sha1(origin_url).digest() else: # other types are not supported yet return [] metadata_items = [] mappings: List[str] = [] for mapping_cls in get_extrinsic_mappings().values(): if data.format in mapping_cls.extrinsic_metadata_formats(): mapping = mapping_cls() metadata_item = mapping.translate(data.metadata) if metadata_item is not None: metadata_items.append(metadata_item) mappings.append(mapping.name) if not metadata_items: # Don't have any mapping to parse it, ignore return [] # TODO: batch requests to origin_get_by_sha1() num_retries = 6 sleep_delay = 10 for _ in range(num_retries): origins = self.storage.origin_get_by_sha1([origin_sha1]) try: (origin,) = origins if origin is not None: break except ValueError: pass # The origin does not exist. This may be due to some replication lag # between the loader's DB/journal and the DB we are consuming from. # Wait a bit and try again logger.debug( "Origin %s not found, sleeping for %ss.", data.target, sleep_delay ) time.sleep(sleep_delay) else: # Does not exist, or replication lag > 60s. raise ValueError( f"Unknown origin swh:1:ori:{origin_sha1.hex()} for metadata target: " f"{data.target}. Is the swh-storage database replication lag " f"over {num_retries * sleep_delay}s?" ) from None authority_base_url = urlparse(data.authority.url).netloc origin_base_url = urlparse(origin["url"]).netloc if ( data.authority.type != MetadataAuthorityType.REGISTRY and authority_base_url != origin_base_url ): # Registries are allowed to push metadata provided related to origins that # do not match their own URL # TODO: add ways to define trusted authorities logger.debug( "Authority URL %s and origin URL %s do not match, ignoring.", authority_base_url, origin_base_url, ) return [] metadata = merge_documents(metadata_items) return [ OriginExtrinsicMetadataRow( id=origin["url"], indexer_configuration_id=self.tool["id"], from_remd_id=data.id, mappings=mappings, metadata=metadata, ) ]
[docs] def persist_index_computations( self, results: List[OriginExtrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.origin_extrinsic_metadata_add(results)
class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]):
    """Content-level indexer

    This indexer is in charge of:

    - filtering out content already indexed in content_metadata
    - reading content from objstorage with the content's id sha1
    - computing metadata by given context
    - using the metadata_mapping as the 'swh-metadata-translator' tool
    - store result in content_metadata table
    """
[docs] def filter(self, ids: List[HashDict]): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": id["sha1"], "indexer_configuration_id": self.tool["id"], } for id in ids ) )
[docs] def index( self, id: HashDict, data: Optional[bytes] = None, log_suffix="unknown directory", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert "sha1" in id assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hash_to_hex(id["sha1"]) metadata = get_intrinsic_mappings()[mapping_name](log_suffix).translate( data ) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hash_to_hex(id["sha1"]) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id["sha1"], indexer_configuration_id=self.tool["id"], metadata=metadata, ) ]
[docs] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.content_metadata_add(results)
# Default configuration merged into DirectoryMetadataIndexer's config: the
# tool entry records this package's name and installed version.
DEFAULT_CONFIG: Dict[str, Any] = {
    "tools": {
        "name": "swh.indexer.metadata",
        "version": version("swh.indexer"),
        "configuration": {},
    },
}
class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]):
    """Directory-level indexer

    This indexer is in charge of:

    - filtering directories already indexed in directory_intrinsic_metadata table
      with defined computation tool
    - retrieve all entry_files in directory
    - use metadata_detector for file_names containing metadata
    - compute metadata translation if necessary and possible (depends on tool)
    - send sha1s to content indexing if possible
    - store the results for directory
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Overlay the user-provided configuration on top of DEFAULT_CONFIG so
        # the "tools" entry is always populated.
        self.config = merge_configs(DEFAULT_CONFIG, self.config)
[docs] def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.directory_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) )
[docs] def index( self, id: Sha1Git, data: Optional[Directory] = None, **kwargs ) -> List[DirectoryIntrinsicMetadataRow]: """Index directory by processing it and organizing result. use metadata_detector to iterate on filenames, passes them to the content indexers, then merges (if more than one) Args: id: sha1_git of the directory data: should always be None Returns: dict: dictionary representing a directory_intrinsic_metadata, with keys: - id: directory's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ dir_: List[DirectoryLsEntry] assert data is None, "Unexpected directory object" dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(id, recursive=False)), ) try: if [entry["type"] for entry in dir_] == ["dir"]: # If the root is just a single directory, recurse into it # eg. PyPI packages, GNU tarballs subdir = dir_[0]["target"] dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(subdir, recursive=False)), ) files = [entry for entry in dir_ if entry["type"] == "file"] (mappings, metadata) = self.translate_directory_intrinsic_metadata( files, log_suffix="directory=%s" % hash_to_hex(id), ) except Exception as e: self.log.exception("Problem when indexing dir: %r", e) sentry_sdk.capture_exception() return [] return [ DirectoryIntrinsicMetadataRow( id=id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ]
[docs] def persist_index_computations( self, results: List[DirectoryIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" # TODO: add functions in storage to keep data in # directory_intrinsic_metadata return self.idx_storage.directory_intrinsic_metadata_add(results)
    def translate_directory_intrinsic_metadata(
        self, files: List[DirectoryLsEntry], log_suffix: str
    ) -> Tuple[List[Any], Any]:
        """
        Determine plan of action to translate metadata in the given root directory

        Args:
            files: list of file entries, as returned by
              :meth:`swh.storage.interface.StorageInterface.directory_ls`

        Returns:
            (List[str], dict): list of mappings used and dict with
            translated metadata according to the CodeMeta vocabulary
        """
        metadata = []
        # TODO: iterate on each context, on each file
        # -> get raw_contents
        # -> translate each content
        # Restrict the config passed to the per-content indexer to the keys it
        # needs; "tools" is mutated per mapping below, hence the deepcopy.
        config = {
            k: self.config[k]
            for k in [INDEXER_CFG_KEY, "objstorage", "storage", "tools"]
        }
        # Map each detected mapping name to the file entries it applies to.
        all_detected_files = detect_metadata(files)
        used_mappings = [
            get_intrinsic_mappings()[context].name for context in all_detected_files
        ]
        for mapping_name, detected_files in all_detected_files.items():
            cfg = deepcopy(config)
            # The content indexer reads the mapping to apply from
            # tools.configuration.context (see ContentMetadataIndexer.index).
            cfg["tools"]["configuration"]["context"] = mapping_name
            c_metadata_indexer = ContentMetadataIndexer(config=cfg)
            # sha1s that are in content_metadata table
            sha1s_in_storage = []
            metadata_generator = self.idx_storage.content_metadata_get(
                [f["sha1"] for f in detected_files]
            )
            for c in metadata_generator:
                # extracting metadata
                sha1 = c.id
                sha1s_in_storage.append(sha1)
                local_metadata = c.metadata
                # local metadata is aggregated
                if local_metadata:
                    metadata.append(local_metadata)

            # Contents not already indexed: run the content indexer on them now.
            sha1s_filtered = [
                item for item in detected_files if item["sha1"] not in sha1s_in_storage
            ]

            if sha1s_filtered:
                # content indexing
                try:
                    _, results = c_metadata_indexer.run(
                        sha1s_filtered,
                        log_suffix=log_suffix,
                    )
                    # on the fly possibility:
                    for result in results:
                        local_metadata = result.metadata
                        metadata.append(local_metadata)

                except Exception:
                    self.log.exception("Exception while indexing metadata on contents")
                    sentry_sdk.capture_exception()

        # Merge all per-file documents into a single CodeMeta document.
        metadata = merge_documents(metadata)
        return (used_mappings, metadata)
class OriginMetadataIndexer(
    OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]
):
    """Indexer for intrinsic metadata found within origin's root directory

    If there is a metadata file corresponding to a known format in the root
    directory of an origin's head (i.e. the directory targeted by the head
    revision or release), read and translate it, and store the result for
    both the directory and the origin.
    """

    # This indexer delegates tool handling to DirectoryMetadataIndexer.
    USE_TOOLS = False

    def __init__(self, config=None, **kwargs) -> None:
        super().__init__(config=config, **kwargs)
        self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config)
        # Per-object-type batch sizes; overridable via the "batch_size" config key.
        self.batch_size = (
            config.get("batch_size", DEFAULT_BATCH_SIZE)
            if config
            else DEFAULT_BATCH_SIZE
        )
    def index_list(
        self,
        origins: List[Origin],
        *,
        check_origin_known: bool = True,
        **kwargs,
    ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]:
        """Index the intrinsic metadata of the head directory of each origin.

        Args:
            origins: origins to index
            check_origin_known: when True, drop origins that storage does not
                know about before resolving their heads

        Returns:
            one ``(origin row, directory row)`` pair per origin whose head
            directory yielded metadata
        """
        head_rev_ids = []
        head_rel_ids = []
        origin_heads: Dict[Origin, CoreSWHID] = {}

        # Filter out origins missing from the storage
        if check_origin_known:
            # NOTE(review): despite its name, known_urls presumably holds the
            # Origin objects (possibly including None) returned by origin_get,
            # since the membership test below compares Origin instances, not
            # URL strings — confirm against storage.origin_get's return type.
            known_urls = set(
                fetch_in_batches(
                    self.storage.origin_get,
                    [origin.url for origin in origins],
                    self.batch_size["origin"],
                )
            )
            known_origins = [o for o in origins if o in known_urls]
        else:
            known_origins = list(origins)

        # Scan origins once, collect head IDs per object type {release, revision}
        for origin in known_origins:
            if origin is None:
                continue
            head_swhid = get_head_swhid(self.storage, origin.url)
            if head_swhid is None:
                continue
            if head_swhid.object_type == ObjectType.REVISION:
                head_rev_ids.append(head_swhid.object_id)
            elif head_swhid.object_type == ObjectType.RELEASE:
                head_rel_ids.append(head_swhid.object_id)
            else:
                self.log.error(
                    "Unexpected object type %s for origin %s. Skipping",
                    head_swhid,
                    origin.url,
                )
                continue
            # Skip origin already whose head is already detected
            # NOTE(review): the head id was already appended above, so a
            # duplicate origin appends its head id twice; fetch_as_dict keys
            # by id so this is harmless — confirm intended.
            if origin in origin_heads:
                continue
            origin_heads[origin] = head_swhid

        # fetch revisions (and releases) as dict. If revision_get (or release_get)
        # raises, this will skip such objects. It will receive less results but
        # continue indexation.
        head_revs = fetch_as_dict(
            self.storage.revision_get, head_rev_ids, self.batch_size["revision"]
        )
        head_rels = fetch_as_dict(
            self.storage.release_get, head_rel_ids, self.batch_size["release"]
        )

        results = []
        for origin, head_swhid in origin_heads.items():
            sentry_sdk.set_tag("swh-indexer-origin-url", origin.url)
            sentry_sdk.set_tag("swh-indexer-origin-head-swhid", str(head_swhid))
            # Resolve the head object to the directory it targets.
            if head_swhid.object_type == ObjectType.REVISION:
                rev = head_revs.get(head_swhid.object_id)
                if not rev:
                    self.log.warning(
                        "Missing revision head object %s of origin %r",
                        head_swhid,
                        origin.url,
                    )
                    continue
                directory_id = rev.directory
            elif head_swhid.object_type == ObjectType.RELEASE:
                rel = head_rels.get(head_swhid.object_id)
                if not rel:
                    self.log.warning(
                        "Missing release head object %s of origin %r",
                        head_swhid,
                        origin.url,
                    )
                    continue
                if rel.target_type != ReleaseTargetType.DIRECTORY:
                    # TODO
                    self.log.warning(
                        "Head release %s of %r has unexpected target type %s",
                        head_swhid,
                        origin.url,
                        rel.target_type,
                    )
                    continue
                assert rel.target, rel
                directory_id = rel.target
            else:
                self.log.error("Unhandled head type %s for %s", head_swhid, origin.url)
                continue

            for dir_metadata in self.directory_metadata_indexer.index(directory_id):
                # There is at most one dir_metadata
                orig_metadata = OriginIntrinsicMetadataRow(
                    from_directory=dir_metadata.id,
                    id=origin.url,
                    metadata=dir_metadata.metadata,
                    mappings=dir_metadata.mappings,
                    indexer_configuration_id=dir_metadata.indexer_configuration_id,
                )
                results.append((orig_metadata, dir_metadata))
        return results
[docs] def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate directories dir_metadata: Dict[bytes, DirectoryIntrinsicMetadataRow] = {} orig_metadata: Dict[str, OriginIntrinsicMetadataRow] = {} summary: Dict = {} for orig_item, dir_item in results: assert dir_item.metadata == orig_item.metadata if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if dir_item.id not in dir_metadata: dir_metadata[dir_item.id] = dir_item if orig_item.id not in orig_metadata: orig_metadata[orig_item.id] = orig_item if dir_metadata: summary_dir = self.idx_storage.directory_intrinsic_metadata_add( list(dir_metadata.values()) ) summary.update(summary_dir) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add( list(orig_metadata.values()) ) summary.update(summary_ori) return summary