Source code for swh.indexer.metadata_dictionary.base

# Copyright (C) 2017-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
import logging
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Pattern,
    Tuple,
    TypeVar,
    Union,
    cast,
)
import uuid
import xml.parsers.expat

from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml

from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1
from swh.objstorage.interface import CompositeObjId, objid_from_dict

from .utils import add_url_if_valid

TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""Prefix used to generate temporary URIs for root nodes being translated."""


[docs] class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Optional[Sha1] sha1_git: Optional[bytes] sha256: Optional[bytes] name: bytes type: str
TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None], )
[docs] def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): f.produced_terms = [] # type: ignore f.produced_terms.extend(uris) # type: ignore return f return decorator
[docs] class BaseMapping: """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`, not to be inherited directly.""" def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name")
[docs] def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing mapping-specific data and translating with the appropriate mapping to JSON-LD using the Codemeta and ForgeFed vocabularies. Args: raw_content: raw content to translate Returns: translated metadata in JSON friendly form needed for the content if parseable, :const:`None` otherwise. """ raise NotImplementedError(f"{self.__class__.__name__}.translate")
[docs] def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")
[docs] class BaseExtrinsicMapping(BaseMapping): """Base class for extrinsic_metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """
[docs] @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")
[docs] def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=True)
[docs] class BaseIntrinsicMapping(BaseMapping): """Base class for intrinsic-metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """
[docs] @classmethod def detect_metadata_files( cls, file_entries: List[DirectoryLsEntry] ) -> List[CompositeObjId]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
[docs] def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=False)
[docs] class SingleFileIntrinsicMapping(BaseIntrinsicMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" filename: Union[bytes, Pattern[bytes]]
[docs] @classmethod def detect_metadata_files( cls, file_entries: List[DirectoryLsEntry] ) -> List[CompositeObjId]: filename = cls.filename # Check if filename is a regex or bytes: if isinstance(filename, bytes): for entry in file_entries: if entry["name"].lower() == filename.lower(): if entry["sha1"] is not None: # ignore skipped_content and dangling return [objid_from_dict(cast(dict, entry))] else: for entry in file_entries: if filename.match(entry["name"]): if entry["sha1"] is not None: # ignore skipped_content and dangling return [objid_from_dict(cast(dict, entry))] return []
[docs] class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields: List[str] = [] """List of fields that are simple strings, and don't need any normalization.""" date_fields: List[str] = [] """List of fields that are strings that should be typed as http://schema.org/Date """ uri_fields: List[str] = [] """List of fields that are simple URIs, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_")
[docs] @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { str(term) for (key, term) in cls.mapping.items() if key in cls.string_fields + cls.date_fields + cls.uri_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { str(term) for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms
[docs] def get_root_uri(self, content_dict: Dict) -> rdflib.URIRef: """Returns an URI for the SoftwareSourceCode or Repository being described. The default implementation uses a temporary URI that is stripped before normalization by :meth:`_translate_dict`. """ # The main object being described (the SoftwareSourceCode) does not necessarily # may or may not have an id. # If it does, it will need to be set by a subclass. # If it doesn't we temporarily use this URI to identify it. Unfortunately, # we cannot use a blank node as we need to use it for JSON-LD framing later, # and blank nodes cannot be used for framing in JSON-LD >= 1.1 root_id = TMP_ROOT_URI_PREFIX + str(uuid.uuid4()) return rdflib.URIRef(root_id)
def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ graph = rdflib.Graph() root = self.get_root_uri(content_dict) self._translate_to_graph(graph, root, content_dict) self.sanitize(graph) # Convert from rdflib's internal graph representation to JSON s = graph.serialize(format="application/ld+json") # Load from JSON to a list of Python objects jsonld_graph = json.loads(s) # Use JSON-LD framing to turn the graph into a rooted tree # frame = {"@type": str(SCHEMA.SoftwareSourceCode)} translated_metadata = jsonld.frame( jsonld_graph, {"@id": str(root)}, options={ "documentLoader": _document_loader, "processingMode": "json-ld-1.1", }, ) # Remove the temporary id we added at the beginning assert isinstance(translated_metadata["@id"], str) if translated_metadata["@id"].startswith(TMP_ROOT_URI_PREFIX): del translated_metadata["@id"] return self.normalize_translation(translated_metadata) def _translate_to_graph( self, graph: rdflib.Graph, root: rdflib.term.Identifier, content_dict: Dict ) -> None: """ Translates content by parsing content from a dict object and translating with the appropriate mapping to the graph passed as parameter Args: content_dict (dict): content dict to translate """ graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode)) for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(graph, root, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value, # and add its results to the triples normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) if v is None: pass elif isinstance(v, list): for item in reversed(v): if isinstance(item, rdflib.URIRef): add_url_if_valid(graph, root, codemeta_key, str(item)) else: graph.add((root, codemeta_key, item)) else: if isinstance(v, rdflib.URIRef): add_url_if_valid(graph, root, codemeta_key, str(v)) else: graph.add((root, codemeta_key, v)) elif k in self.string_fields and isinstance(v, str): graph.add((root, codemeta_key, rdflib.Literal(v))) elif k in self.string_fields and isinstance(v, list): for item in v: graph.add((root, codemeta_key, rdflib.Literal(item))) elif k in self.date_fields and isinstance(v, str): typed_v = rdflib.Literal(v, datatype=SCHEMA.Date) graph.add((root, codemeta_key, typed_v)) elif k in self.date_fields and isinstance(v, list): for item in v: if isinstance(item, str): typed_item = rdflib.Literal(item, datatype=SCHEMA.Date) graph.add((root, codemeta_key, typed_item)) elif k in self.uri_fields and isinstance(v, str): add_url_if_valid(graph, root, codemeta_key, v) elif k in self.uri_fields and isinstance(v, list): for item in v: add_url_if_valid(graph, root, codemeta_key, item) else: continue self.extra_translation(graph, root, content_dict)
[docs] def sanitize(self, graph: rdflib.Graph) -> None: # Remove triples that make PyLD crash for subject, predicate, _ in graph.triples((None, None, rdflib.URIRef(""))): graph.remove((subject, predicate, rdflib.URIRef(""))) # Should not happen, but we's better check as this may lead to incorrect data invalid = False for triple in graph.triples((rdflib.URIRef(""), None, None)): invalid = True logging.error("Empty triple subject URI: %r", triple) if invalid: raise ValueError("Empty triple subject(s)")
[docs] def extra_translation( self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any] ) -> None: """Called at the end of the translation process, and may add arbitrary triples to ``graph`` based on the input dictionary (passed as ``d``). """ pass
[docs] class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input."""
[docs] def translate(self, raw_content: bytes) -> Optional[Dict]: try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None
[docs] class XmlMapping(DictMapping): """Base class for all mappings that use XML data as input."""
[docs] def translate(self, raw_content: bytes) -> Optional[Dict]: try: d = xmltodict.parse(raw_content) except xml.parsers.expat.ExpatError: self.log.warning("Error parsing XML from %s", self.log_suffix) return None except UnicodeDecodeError: self.log.warning("Error unidecoding XML from %s", self.log_suffix) return None except (LookupError, ValueError): # unknown encoding or multi-byte encoding self.log.warning("Error detecting XML encoding from %s", self.log_suffix) return None if not isinstance(d, dict): self.log.warning("Skipping ill-formed XML content: %s", raw_content) return None return self._translate_dict(d)
[docs] class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() }
[docs] class YamlMapping(DictMapping): """Base class for all mappings that use Yaml data as input."""
[docs] def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.YAMLError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None