Source code for swh.indexer.metadata

# Copyright (C) 2017-2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import defaultdict
from copy import deepcopy
import datetime
import hashlib
from importlib.metadata import version
import logging
import re
import time
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
)
import urllib.parse
from urllib.parse import urlparse

import sentry_sdk

from swh.core.config import merge_configs
from swh.core.statsd import statsd
from swh.core.utils import grouper
from swh.indexer.codemeta import merge_documents
from swh.indexer.indexer import (
    BaseIndexer,
    ContentIndexer,
    DirectoryIndexer,
    ObjectsDict,
    OriginIndexer,
)
from swh.indexer.metadata_detector import detect_metadata
from swh.indexer.metadata_mapping import get_extrinsic_mappings, get_intrinsic_mappings
from swh.indexer.origin_head import get_head_swhid
from swh.indexer.storage import INDEXER_CFG_KEY
from swh.indexer.storage.model import (
    ContentMetadataRow,
    DirectoryIntrinsicMetadataRow,
    OriginExtrinsicMetadataRow,
    OriginIntrinsicMetadataRow,
)
from swh.model.hashutil import HashDict, hash_to_hex
from swh.model.model import (
    Content,
    Directory,
    MetadataAuthorityType,
    Origin,
    RawExtrinsicMetadata,
    ReleaseTargetType,
    Sha1Git,
)
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ObjectType
from swh.objstorage.factory import get_objstorage
from swh.storage.interface import StorageInterface

# Default batch size per object type (can be overridden through indexer configuration)
DEFAULT_BATCH_SIZE = {
    "revision": 10,
    "release": 10,
    "origin": 10,
}
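
# A minimal sketch (not part of the original module) of how these defaults could be
# overridden through the indexer configuration; the ``batch_size`` key is read by
# OriginMetadataIndexer below. The surrounding configuration keys are elided and the
# values are illustrative only:
#
#     config = {
#         # ... storage / indexer storage configuration goes here ...
#         "batch_size": {"revision": 50, "release": 50, "origin": 100},
#     }
#     indexer = OriginMetadataIndexer(config=config)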


T1 = TypeVar("T1")
T2 = TypeVar("T2")

logger = logging.getLogger(__name__)

# Count directories found with metadata (or not) and truncated (or not)
METRIC_INTRINSIC_DIRECTORY_COUNT = "swh_indexer_intrinsic_metadata_directory_count"
# Count actual content metadata indexed
METRIC_INTRINSIC_CONTENT_INDEXED_COUNT = (
    "swh_indexer_intrinsic_metadata_content_indexed_count"
)
# Count content metadata indexed (per mapping)
METRIC_INTRINSIC_CONTENT_PER_MAPPING_COUNT = (
    "swh_indexer_intrinsic_metadata_content_per_mapping_count"
)


def fetch_in_batches(
    fetch_fn: Callable[[List[T1]], Iterable[T2]],
    args: List[T1],
    batch_size: int,
) -> Iterator[T2]:
    """Call ``fetch_fn`` on batches of ``args`` and yield the results as they come.

    When a whole batch raises, processing continues with the next batch, and the
    failed batch is then re-read one object at a time; any object that still fails
    is logged and skipped, so callers receive a *partial* result set rather than a
    total failure.
    """
    batches = grouper(args, batch_size)
    # Read and yield the objects we successfully fetch through `fetch_fn`
    for batch in batches:
        batch_list: List[T1] = list(batch)
        try:
            yield from fetch_fn(batch_list)
        except Exception:
            # If the whole batch failed, fall back to fetching objects individually
            # in case a single object is causing the exception
            for obj in batch_list:
                try:
                    # Try to fetch one object at a time
                    yield from fetch_fn([obj])
                except Exception as exc:
                    # If it failed again, we found the problematic object; this
                    # time, we just log it and skip it
                    logger.error(
                        "Failure to retrieve object %s when calling %r: %s",
                        obj,
                        fetch_fn,
                        exc,
                        exc_info=True,
                    )

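# A hedged usage sketch (not part of the original module): fetching revisions in
# batches of 10 while tolerating per-object failures. ``storage`` and
# ``revision_ids`` are hypothetical names standing for a StorageInterface instance
# and a list of revision ids.
#
#     revisions = list(
#         fetch_in_batches(storage.revision_get, revision_ids, batch_size=10)
#     )
#     # ``revisions`` may be shorter than ``revision_ids`` if some objects failed.
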
def fetch_as_dict(
    fetch_fn: Callable[[List[T1]], Iterable[T2]],
    ids: List[T1],
    batch_size: int,
) -> Dict[T1, T2]:
    """Return a dict ``{id: object}``; missing items are logged."""
    result: Dict[T1, T2] = {}
    for obj in fetch_in_batches(fetch_fn, ids, batch_size):
        if obj is None:
            continue
        result[obj.id] = obj  # type: ignore
    return result

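# Similarly, a hypothetical sketch for ``fetch_as_dict``: the result maps each
# fetched object's ``.id`` attribute to the object, so later lookups by head id are
# dictionary lookups. ``storage``, ``head_rev_ids`` and ``some_rev_id`` are
# placeholder names.
#
#     head_revs = fetch_as_dict(storage.revision_get, head_rev_ids, batch_size=10)
#     rev = head_revs.get(some_rev_id)  # None if the object could not be fetched
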
class ExtrinsicMetadataIndexer(
    BaseIndexer[Sha1Git, RawExtrinsicMetadata, OriginExtrinsicMetadataRow]
):
    """Indexer for raw extrinsic metadata.

    For supported extrinsic metadata formats, translate the original format into
    CodeMeta and attach the result to the origin. Use ``get_extrinsic_mappings()``
    to get the registered mapping formats.
    """

    object_types = ["raw_extrinsic_metadata"]

    def process_journal_objects(self, objects: ObjectsDict) -> Dict:
        summary: Dict[str, Any] = {"status": "uneventful"}
        try:
            results = {}
            for item in objects.get("raw_extrinsic_metadata", []):
                remd = RawExtrinsicMetadata.from_dict(item)
                sentry_sdk.set_tag("swh-indexer-remd-swhid", str(remd.swhid()))
                for result in self.index(remd.id, data=remd):
                    results[result.id] = result
        except Exception:
            if not self.catch_exceptions:
                raise
            summary["status"] = "failed"
            return summary

        summary_persist = self.persist_index_computations(list(results.values()))
        if summary_persist:
            for value in summary_persist.values():
                if value > 0:
                    summary["status"] = "eventful"
            summary.update(summary_persist)
        return summary

    def index(
        self,
        id: Sha1Git,
        data: Optional[RawExtrinsicMetadata],
        **kwargs,
    ) -> List[OriginExtrinsicMetadataRow]:
        if data is None:
            raise NotImplementedError(
                "ExtrinsicMetadataIndexer.index() without RawExtrinsicMetadata data"
            )
        if data.target.object_type == ExtendedObjectType.ORIGIN:
            origin_sha1 = data.target.object_id
        elif data.origin is not None:
            if (
                data.fetcher.name == "swh-deposit"
                and data.discovery_date
                < datetime.datetime(
                    2024, 5, 16, 15, 20, 0, tzinfo=datetime.timezone.utc
                )
            ):
                # Workaround for deposits while swh.model.swhids did not unescape origin
                # URLs while parsing <swh:object swhid="..." />, which was fixed in
                # https://gitlab.softwareheritage.org/swh/devel/swh-model/-/merge_requests/348
                # itself deployed shortly after 2024-05-16T15:18:07 by
                # https://gitlab.softwareheritage.org/swh/infra/swh-apps/-/commit/70bd86aafcbc1787183e5d2cd52c392ae012e65e
                if "%" in data.origin:
                    assert re.match(
                        "^https://cran.r-project.org/package%3D[a-zA-Z]+$", data.origin
                    ), data
                origin_url = urllib.parse.unquote_to_bytes(data.origin)
            else:
                origin_url = data.origin.encode()

            # HACK: As swh-search does not (yet?) support searching on directories
            # and traversing back to origins, we index metadata on non-origins with
            # an origin context as if they were on the origin itself.
            origin_sha1 = hashlib.sha1(origin_url).digest()
        else:
            # other types are not supported yet
            return []

        metadata_items = []
        mappings: List[str] = []
        for mapping_cls in get_extrinsic_mappings().values():
            if data.format in mapping_cls.extrinsic_metadata_formats():
                mapping = mapping_cls()
                metadata_item = mapping.translate(data.metadata)
                if metadata_item is not None:
                    metadata_items.append(metadata_item)
                    mappings.append(mapping.name)

        if not metadata_items:
            # We do not have any mapping able to parse it, ignore
            return []

        # TODO: batch requests to origin_get_by_sha1()
        num_retries = 6
        sleep_delay = 10
        for _ in range(num_retries):
            origins = self.storage.origin_get_by_sha1([origin_sha1])
            try:
                (origin,) = origins
                if origin is not None:
                    break
            except ValueError:
                pass
            # The origin does not exist. This may be due to some replication lag
            # between the loader's DB/journal and the DB we are consuming from.
            # Wait a bit and try again
            logger.debug(
                "Origin %s not found, sleeping for %ss.", data.target, sleep_delay
            )
            time.sleep(sleep_delay)
        else:
            # Does not exist, or replication lag > 60s.
            raise ValueError(
                f"Unknown origin swh:1:ori:{origin_sha1.hex()} for metadata target: "
                f"{data.target}. Is the swh-storage database replication lag "
                f"over {num_retries * sleep_delay}s?"
            ) from None

        authority_base_url = urlparse(data.authority.url).netloc
        origin_base_url = urlparse(origin["url"]).netloc
        if (
            data.authority.type != MetadataAuthorityType.REGISTRY
            and authority_base_url != origin_base_url
        ):
            # Registries are allowed to provide metadata about origins that do not
            # match their own URL
            # TODO: add ways to define trusted authorities
            logger.debug(
                "Authority URL %s and origin URL %s do not match, ignoring.",
                authority_base_url,
                origin_base_url,
            )
            return []

        metadata = merge_documents(metadata_items)

        return [
            OriginExtrinsicMetadataRow(
                id=origin["url"],
                indexer_configuration_id=self.tool["id"],
                from_remd_id=data.id,
                mappings=mappings,
                metadata=metadata,
            )
        ]

    def persist_index_computations(
        self, results: List[OriginExtrinsicMetadataRow]
    ) -> Dict[str, int]:
        """Persist the results in storage."""
        return self.idx_storage.origin_extrinsic_metadata_add(results)

class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]):
    """Content-level indexer

    This indexer is in charge of:

    - filtering out content already indexed in content_metadata
    - reading content from objstorage with the content's sha1 id
    - computing metadata for the given context
    - using the metadata_mapping as the 'swh-metadata-translator' tool
    - storing the results in the content_metadata table
    """

    def __init__(self, *args, objstorage=None, **kwargs):
        kwargs["objstorage"] = objstorage
        super().__init__(*args, **kwargs)

    def filter(self, ids: List[HashDict]):
        """Filter out known sha1s and return only missing ones."""
        yield from self.idx_storage.content_metadata_missing(
            (
                {
                    "id": id["sha1"],
                    "indexer_configuration_id": self.tool["id"],
                }
                for id in ids
            )
        )

    def index(
        self,
        id: HashDict,
        data: Optional[bytes] = None,
        log_suffix="unknown directory",
        **kwargs,
    ) -> List[ContentMetadataRow]:
        """Index a content and store the result.

        Args:
            id: content's identifier
            data: raw content in bytes

        Returns:
            list containing a single :class:`ContentMetadataRow` with the
            translated metadata, or an empty list if the translation was not
            successful
        """
        assert "sha1" in id
        assert data is not None
        metadata = None
        content_id = id["sha1"]
        content_id_str = hash_to_hex(content_id)
        try:
            mapping_name = self.tool["tool_configuration"]["context"]
            log_suffix += f", content_id={content_id_str}"
            metadata = get_intrinsic_mappings()[mapping_name](log_suffix).translate(
                data
            )
            self.log.info(
                "Translated metadata for content %s with mapping %s",
                content_id_str,
                mapping_name,
            )
        except Exception:
            self.log.exception(
                "Problem during metadata translation for content %s",
                content_id,
            )
            sentry_sdk.capture_exception()
            statsd.increment(
                METRIC_INTRINSIC_CONTENT_INDEXED_COUNT,
                1,
                tags={
                    "content_found": False,
                    "metadata_found": False,
                },
            )
        statsd.increment(
            METRIC_INTRINSIC_CONTENT_INDEXED_COUNT,
            1,
            tags={
                "content_found": True,
                "metadata_found": metadata is not None,
            },
        )
        if metadata is None:
            return []
        return [
            ContentMetadataRow(
                id=content_id,
                indexer_configuration_id=self.tool["id"],
                metadata=metadata,
            )
        ]

    def persist_index_computations(
        self, results: List[ContentMetadataRow]
    ) -> Dict[str, int]:
        """Persist the results in storage."""
        return self.idx_storage.content_metadata_add(results)

DEFAULT_CONFIG: Dict[str, Any] = {
    "tools": {
        "name": "swh.indexer.metadata",
        "version": version("swh.indexer"),
        "configuration": {},
    },
}

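# A small illustrative sketch (assumptions flagged): DirectoryMetadataIndexer below
# merges DEFAULT_CONFIG into its own configuration with swh.core.config.merge_configs,
# then sets a per-mapping "context" under tools.configuration. Roughly:
#
#     cfg = merge_configs(DEFAULT_CONFIG, user_config)  # ``user_config`` is hypothetical
#     cfg["tools"]["configuration"]["context"] = "npm"   # "npm" is a placeholder mapping name
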
def directory_get(
    storage: StorageInterface,
    directory_id: Sha1Git,
) -> Tuple[Optional[Directory], bool]:
    """Get the directory from the storage.

    This uses a more efficient implementation to read the directory from storage,
    but it is currently limited: it can only read a directory partially (the first
    page of entries).

    Args:
        storage: the storage instance
        directory_id: the directory's identifier

    Returns:
        * The directory if it could be properly put back together, None otherwise.
        * Whether the list of entries was truncated
    """
    directory_page = storage.directory_get_entries(directory_id, limit=100)
    # The directory does not exist, we just stop
    if not directory_page:
        return None, False
    # Detected a potentially big directory; flag it so the caller can increment
    # metrics about it and continue to try and index it as-is
    truncated_dir = directory_page.next_page_token is not None
    return (
        Directory(
            id=directory_id,
            entries=tuple(directory_page.results),
        ),
        truncated_dir,
    )

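# A hedged usage sketch: how a caller might handle the (directory, truncated) pair
# returned by directory_get. ``storage`` and ``dir_id`` are hypothetical names.
#
#     directory, truncated = directory_get(storage, dir_id)
#     if directory is None:
#         pass  # the directory is unknown to the storage
#     elif truncated:
#         pass  # only the first 100 entries are available in ``directory.entries``
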
class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]):
    """Directory-level indexer

    This indexer is in charge of:

    - filtering directories already indexed in the directory_intrinsic_metadata
      table with the defined computation tool
    - retrieving all entry_files in the directory
    - using metadata_detector for file_names containing metadata
    - computing metadata translation if necessary and possible (depends on tool)
    - sending sha1s to content indexing if possible
    - storing the results for the directory
    """

    def __init__(self, *args, **kwargs):
        # Instantiate an objstorage once (for specific cases like the multiplexer,
        # this can spawn threads so we need to limit such instantiations)
        if "objstorage" in kwargs:
            kwargs["objstorage"] = get_objstorage(**kwargs["objstorage"])
        super().__init__(*args, **kwargs)
        self.config = merge_configs(DEFAULT_CONFIG, self.config)

        # Build a dict of ContentMetadataIndexer per mapping
        intrinsic_mappings = get_intrinsic_mappings()
        template_tool_config = {
            k: self.config[k]
            for k in [INDEXER_CFG_KEY, "objstorage", "storage", "tools"]
        }
        content_metadata_indexers = {}
        for mapping_name, mapping_config in intrinsic_mappings.items():
            cfg = deepcopy(template_tool_config)
            cfg["tools"]["configuration"]["context"] = mapping_name
            content_metadata_indexers[mapping_name] = ContentMetadataIndexer(
                config=cfg, objstorage=self.objstorage
            )
        self.content_metadata_indexers = content_metadata_indexers

    def filter(self, sha1_gits):
        """Filter out known sha1s and return only missing ones."""
        yield from self.idx_storage.directory_intrinsic_metadata_missing(
            (
                {
                    "id": sha1_git,
                    "indexer_configuration_id": self.tool["id"],
                }
                for sha1_git in sha1_gits
            )
        )

    def index(
        self, id: Sha1Git, data: Optional[Directory] = None, **kwargs
    ) -> List[DirectoryIntrinsicMetadataRow]:
        """Index a directory by processing it and organizing the result.

        Uses metadata_detector to iterate on filenames, passes them to the content
        indexers, then merges the results (if there is more than one).

        Args:
            id: sha1_git of the directory
            data: should always be None

        Returns:
            list containing a single :class:`DirectoryIntrinsicMetadataRow`, with:

            - id: directory's identifier (sha1_git)
            - indexer_configuration_id (bytes): tool used
            - metadata: dict of retrieved metadata
        """
        assert data is None, "Unexpected directory object"

        directory, truncated_dir = directory_get(self.storage, id)
        assert directory is not None

        metadata: Dict = {}
        try:
            subdirs = [entry for entry in directory.entries]
            if len(subdirs) == 1:
                subdir = subdirs[0]
                if subdir.type == "dir":
                    # If the root is just a single directory, recurse into it,
                    # e.g. PyPI packages, GNU tarballs
                    directory, truncated_dir = directory_get(
                        self.storage,
                        subdir.target,
                    )
                    if not directory:
                        statsd.increment(
                            METRIC_INTRINSIC_DIRECTORY_COUNT,
                            1,
                            tags={
                                "directory_found": False,
                                "directory_truncated": False,
                                "metadata_found": False,
                            },
                        )
                        return []

            metadata_sha1_gits = []
            # Map from file direntry to detected mapping
            entry_to_mapping = defaultdict(set)
            # Keep only the relevant metadata file entries
            for mapping_dir_entry, entries in detect_metadata(
                list(directory.entries)
            ).items():
                for entry in entries:
                    if entry is None:
                        continue
                    content_id = entry.target  # It's a sha1_git
                    metadata_sha1_gits.append(content_id)
                    entry_to_mapping[content_id].add(mapping_dir_entry)

            # We have to transform the list of directory entries returned by the
            # storage into Content objects (so ids are correct): DirectoryEntry
            # currently uses sha1_git as id but we need the sha1.
            mapping_contents: Dict[str, Set[Content]] = defaultdict(set)

            # Now that we have filtered the interesting file entries, we can
            # retrieve the filtered list of interesting associated contents to have
            # their full ids, and keep the mapping dict updated with a set of
            # Content references instead of DirectoryEntry ones
            for content in self.storage.content_get(
                metadata_sha1_gits, algo="sha1_git"
            ):
                if content is None:
                    continue
                for mapping_name in entry_to_mapping[content.sha1_git]:
                    mapping_contents[mapping_name].add(content)

            # We can now translate into relevant metadata information
            (mappings, metadata) = self.translate_directory_intrinsic_metadata(
                mapping_contents, log_suffix=f"directory={hash_to_hex(id)}"
            )
        except Exception as e:
            self.log.exception("Problem when indexing dir: %r", e)
            sentry_sdk.capture_exception()
            return []
        finally:
            statsd.increment(
                METRIC_INTRINSIC_DIRECTORY_COUNT,
                1,
                tags={
                    "directory_found": True,
                    "directory_truncated": truncated_dir,
                    "metadata_found": metadata and len(metadata) > 0,
                },
            )

        return [
            DirectoryIntrinsicMetadataRow(
                id=id,
                indexer_configuration_id=self.tool["id"],
                mappings=mappings,
                metadata=metadata,
            )
        ]

    def persist_index_computations(
        self, results: List[DirectoryIntrinsicMetadataRow]
    ) -> Dict[str, int]:
        """Persist the results in storage."""
        # TODO: add functions in storage to keep data in
        # directory_intrinsic_metadata
        return self.idx_storage.directory_intrinsic_metadata_add(results)

    def translate_directory_intrinsic_metadata(
        self, mapping_contents: Dict[str, Set[Content]], log_suffix: str
    ) -> Tuple[List[Any], Any]:
        """Determine how to translate metadata from the directory file entries.

        Args:
            mapping_contents: maps each detected mapping name to the set of
                Content objects to translate with it

        Returns:
            (List[str], dict): list of mappings used and dict with translated
            metadata according to the CodeMeta vocabulary
        """
        metadata = []
        # Load/Retrieve intrinsic mappings
        intrinsic_mappings = get_intrinsic_mappings()
        used_mappings = []
        for mapping_name, detected_contents in mapping_contents.items():
            # Append mapping to the list
            used_mappings.append(intrinsic_mappings[mapping_name].name)

            # Whether we found content to index or already indexed content
            content_already_indexed = False
            content_newly_indexed = False

            # sha1s that are in the content_metadata table
            sha1s_in_idx_storage = []
            for c in self.idx_storage.content_metadata_get(
                [content.sha1 for content in detected_contents]
            ):
                # extracting metadata
                sha1s_in_idx_storage.append(c.id)  # id is a sha1
                local_metadata = c.metadata
                # local metadata is aggregated
                if local_metadata:
                    metadata.append(local_metadata)
                    content_already_indexed = True

            sha1s_to_index = [
                content.hashes()
                for content in detected_contents
                if content.sha1 not in sha1s_in_idx_storage
            ]

            # If we have not indexed some files yet
            if sha1s_to_index:
                # Retrieve the metadata indexer to use
                c_metadata_indexer = self.content_metadata_indexers[mapping_name]
                # content indexing
                try:
                    _, results = c_metadata_indexer.run(
                        sha1s_to_index,
                        log_suffix=log_suffix,
                    )
                    indexed_count = 0
                    for result in results:
                        local_metadata = result.metadata
                        metadata.append(local_metadata)
                        indexed_count += 1
                    content_newly_indexed = indexed_count > 0
                except Exception:
                    self.log.exception("Exception while indexing metadata on contents")
                    sentry_sdk.capture_exception()

            statsd.increment(
                METRIC_INTRINSIC_CONTENT_PER_MAPPING_COUNT,
                1,
                tags={
                    "content_already_indexed": content_already_indexed,
                    "content_newly_indexed": content_newly_indexed,
                    "mapping": mapping_name,
                    "metadata_found": len(metadata) > 0,
                },
            )

        metadata = merge_documents(metadata)
        return (used_mappings, metadata)

class OriginMetadataIndexer(
    OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]
):
    """Indexer for intrinsic metadata found within an origin's root directory

    If there is a metadata file corresponding to a known format in the root
    directory of an origin (i.e. in the root directory of its head revision or
    release), read it, translate it to CodeMeta, and store the result for both
    the directory and the origin.
    """

    USE_TOOLS = False

    def __init__(self, config=None, **kwargs) -> None:
        super().__init__(config=config, **kwargs)
        self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config)
        self.batch_size = (
            config.get("batch_size", DEFAULT_BATCH_SIZE)
            if config
            else DEFAULT_BATCH_SIZE
        )

    def index_list(
        self,
        origins: List[Origin],
        *,
        check_origin_known: bool = True,
        **kwargs,
    ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]:
        head_rev_ids = []
        head_rel_ids = []
        origin_heads: Dict[Origin, CoreSWHID] = {}

        # Filter out origins missing from the storage
        if check_origin_known:
            known_urls = set(
                fetch_in_batches(
                    self.storage.origin_get,
                    [origin.url for origin in origins],
                    self.batch_size["origin"],
                )
            )
            known_origins = [o for o in origins if o in known_urls]
        else:
            known_origins = list(origins)

        # Scan origins once, collecting head ids per object type {release, revision}
        for origin in known_origins:
            if origin is None:
                continue
            head_swhid = get_head_swhid(self.storage, origin.url)
            if head_swhid is None:
                continue
            if head_swhid.object_type == ObjectType.REVISION:
                head_rev_ids.append(head_swhid.object_id)
            elif head_swhid.object_type == ObjectType.RELEASE:
                head_rel_ids.append(head_swhid.object_id)
            else:
                self.log.error(
                    "Unexpected object type %s for origin %s. Skipping",
                    head_swhid,
                    origin.url,
                )
                continue

            # Skip origins whose head has already been detected
            if origin in origin_heads:
                continue
            origin_heads[origin] = head_swhid

        # Fetch revisions (and releases) as dicts. If revision_get (or release_get)
        # raises, the failing objects are skipped: we receive fewer results but
        # indexing continues.
        head_revs = fetch_as_dict(
            self.storage.revision_get, head_rev_ids, self.batch_size["revision"]
        )
        head_rels = fetch_as_dict(
            self.storage.release_get, head_rel_ids, self.batch_size["release"]
        )

        results = []
        for origin, head_swhid in origin_heads.items():
            sentry_sdk.set_tag("swh-indexer-origin-url", origin.url)
            sentry_sdk.set_tag("swh-indexer-origin-head-swhid", str(head_swhid))
            if head_swhid.object_type == ObjectType.REVISION:
                rev = head_revs.get(head_swhid.object_id)
                if not rev:
                    self.log.warning(
                        "Missing revision head object %s of origin %r",
                        head_swhid,
                        origin.url,
                    )
                    continue
                directory_id = rev.directory
            elif head_swhid.object_type == ObjectType.RELEASE:
                rel = head_rels.get(head_swhid.object_id)
                if not rel:
                    self.log.warning(
                        "Missing release head object %s of origin %r",
                        head_swhid,
                        origin.url,
                    )
                    continue
                if rel.target_type != ReleaseTargetType.DIRECTORY:
                    # TODO
                    self.log.warning(
                        "Head release %s of %r has unexpected target type %s",
                        head_swhid,
                        origin.url,
                        rel.target_type,
                    )
                    continue
                assert rel.target, rel
                directory_id = rel.target
            else:
                self.log.error("Unhandled head type %s for %s", head_swhid, origin.url)
                continue

            for dir_metadata in self.directory_metadata_indexer.index(directory_id):
                # There is at most one dir_metadata
                orig_metadata = OriginIntrinsicMetadataRow(
                    from_directory=dir_metadata.id,
                    id=origin.url,
                    metadata=dir_metadata.metadata,
                    mappings=dir_metadata.mappings,
                    indexer_configuration_id=dir_metadata.indexer_configuration_id,
                )
                results.append((orig_metadata, dir_metadata))
        return results

    def persist_index_computations(
        self,
        results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]],
    ) -> Dict[str, int]:
        # Deduplicate directories
        dir_metadata: Dict[bytes, DirectoryIntrinsicMetadataRow] = {}
        orig_metadata: Dict[str, OriginIntrinsicMetadataRow] = {}
        summary: Dict = {}
        for orig_item, dir_item in results:
            assert dir_item.metadata == orig_item.metadata
            if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}):
                # Only store non-empty metadata sets
                if dir_item.id not in dir_metadata:
                    dir_metadata[dir_item.id] = dir_item
                if orig_item.id not in orig_metadata:
                    orig_metadata[orig_item.id] = orig_item

        if dir_metadata:
            summary_dir = self.idx_storage.directory_intrinsic_metadata_add(
                list(dir_metadata.values())
            )
            summary.update(summary_dir)
        if orig_metadata:
            summary_ori = self.idx_storage.origin_intrinsic_metadata_add(
                list(orig_metadata.values())
            )
            summary.update(summary_ori)

        return summary