# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Pattern,
Tuple,
TypeVar,
Union,
cast,
)
import uuid
import xml.parsers.expat
from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml
from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1
from swh.objstorage.interface import CompositeObjId, objid_from_dict
from .utils import add_url_if_valid
# DictMapping.get_root_uri() appends a random UUID to this prefix to build a
# temporary root identifier, and DictMapping._translate_dict() drops any "@id"
# starting with this prefix from the framed output.
TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""Prefix used to generate temporary URIs for root nodes being translated."""
[docs]
class DirectoryLsEntry(TypedDict):
    """Typed dictionary with the keys listed below.

    NOTE(review): presumably describes one entry of a directory listing; the
    meaning of each key is inferred from its name and type only -- confirm
    against the code that produces these dictionaries.
    """

    # Sha1-typed identifier; always present.
    target: Sha1
    # Hash fields; each of the three may be None.
    sha1: Optional[Sha1]
    sha1_git: Optional[bytes]
    sha256: Optional[bytes]
    # Entry name, kept as raw bytes (not decoded).
    name: bytes
    # Entry kind as a string; the set of allowed values is not visible here.
    type: str
# Type variable bound to callables that take four positional arguments (typed
# Any, rdflib.Graph, rdflib.term.BNode, Any) and return None. produce_terms()
# uses it so that the decorator it returns keeps the decorated function's type.
TTranslateCallable = TypeVar(
    "TTranslateCallable",
    bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None],
)
[docs]
def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Build a decorator recording which terms a translation function produces.

    The returned decorator appends ``uris`` to the ``produced_terms`` list
    attribute of the decorated function (creating the list on first use) and
    returns that same function object unchanged otherwise."""

    def decorator(f: TTranslateCallable) -> TTranslateCallable:
        try:
            registered = f.produced_terms  # type: ignore
        except AttributeError:
            # First time this function is decorated: start a fresh list.
            registered = f.produced_terms = []  # type: ignore
        registered.extend(uris)
        return f

    return decorator
[docs]
class BaseMapping:
    """Common ancestor of :class:`BaseExtrinsicMapping` and
    :class:`BaseIntrinsicMapping`; it is not meant to be subclassed directly."""

    def __init__(self, log_suffix=""):
        klass = self.__class__
        self.log_suffix = log_suffix
        # One logger per concrete mapping class, named "<module>.<class>".
        self.log = logging.getLogger(f"{klass.__module__}.{klass.__name__}")

    @property
    def name(self):
        """Identifier of this mapping, as used in the indexer storage.

        Subclasses must override it; this base version always raises."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Parse a bytestring holding mapping-specific data and translate it to
        JSON-LD using the Codemeta and ForgeFed vocabularies.

        Subclasses must override it; this base version always raises.

        Args:
            raw_content: raw content to translate

        Returns:
            translated metadata in JSON friendly form needed for the content
            if parseable, :const:`None` otherwise.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize translated ``metadata``; must be overridden by subclasses."""
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")
[docs]
class BaseExtrinsicMapping(BaseMapping):
    """Parent class for extrinsic_metadata mappings.

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``metadata`` passed through ``compact`` with ``forgefed=True``."""
        compacted = compact(metadata, forgefed=True)
        return compacted
[docs]
class BaseIntrinsicMapping(BaseMapping):
    """Parent class for intrinsic-metadata mappings.

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``metadata`` passed through ``compact`` with ``forgefed=False``."""
        compacted = compact(metadata, forgefed=False)
        return compacted
[docs]
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file as input."""

    # Either an exact byte string or a compiled bytes pattern. Declared only
    # (no value assigned here), so concrete subclasses are expected to set it.
    # NOTE(review): how it is matched against file names is not visible here.
    filename: Union[bytes, Pattern[bytes]]
[docs]
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    string_fields: List[str] = []
    """List of fields that are simple strings, and don't need any
    normalization."""

    date_fields: List[str] = []
    """List of fields that are strings that should be typed as http://schema.org/Date
    """

    uri_fields: List[str] = []
    """List of fields that are simple URIs, and don't need any
    normalization."""

    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")

    @staticmethod
    def _normalize_method_name(name: str) -> str:
        """Turn an input key into a valid method-name suffix (``-`` becomes ``_``)."""
        return name.replace("-", "_")

    @classmethod
    def supported_terms(cls):
        """Return the set of terms (as strings) this mapping class can produce.

        It is the union of the terms mapped from keys handled generically
        (listed in one of the ``*_fields`` lists or having a ``normalize_*``
        method) and the terms declared through ``produced_terms`` on the
        ``translate_*`` methods.
        """
        # Build the lookup set once instead of concatenating the three lists
        # for every key of the mapping.
        plain_fields = {*cls.string_fields, *cls.date_fields, *cls.uri_fields}

        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            str(term)
            for (key, term) in cls.mapping.items()
            if key in plain_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }

        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            str(term)
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }

        return simple_terms | complex_terms

    def get_root_uri(self, content_dict: Dict) -> rdflib.URIRef:
        """Returns an URI for the SoftwareSourceCode or Repository being described.

        The default implementation uses a temporary URI that is stripped before
        normalization by :meth:`_translate_dict`.
        """
        # The main object being described (the SoftwareSourceCode) may or may
        # not have an id.
        # If it does, it will need to be set by a subclass.
        # If it doesn't we temporarily use this URI to identify it. Unfortunately,
        # we cannot use a blank node as we need to use it for JSON-LD framing later,
        # and blank nodes cannot be used for framing in JSON-LD >= 1.1
        root_id = TMP_ROOT_URI_PREFIX + str(uuid.uuid4())
        return rdflib.URIRef(root_id)

    def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): content dict to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer

        Raises:
            ValueError: propagated from :meth:`sanitize` when the graph
                contains a triple whose subject is the empty URI.
        """
        graph = rdflib.Graph()

        root = self.get_root_uri(content_dict)

        self._translate_to_graph(graph, root, content_dict)

        self.sanitize(graph)

        # Convert from rdflib's internal graph representation to JSON
        s = graph.serialize(format="application/ld+json")

        # Load from JSON to a list of Python objects
        jsonld_graph = json.loads(s)

        # Use JSON-LD framing, selecting the root node by its @id, to turn the
        # graph into a rooted tree
        translated_metadata = jsonld.frame(
            jsonld_graph,
            {"@id": str(root)},
            options={
                "documentLoader": _document_loader,
                "processingMode": "json-ld-1.1",
            },
        )

        # Remove the temporary id we added at the beginning
        assert isinstance(translated_metadata["@id"], str)
        if translated_metadata["@id"].startswith(TMP_ROOT_URI_PREFIX):
            del translated_metadata["@id"]

        return self.normalize_translation(translated_metadata)

    def _translate_to_graph(
        self, graph: rdflib.Graph, root: rdflib.term.Identifier, content_dict: Dict
    ) -> None:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping to the graph passed as parameter

        For each key, in order of preference: a ``translate_<key>`` method is
        called if it exists; otherwise, if the key is in :attr:`mapping`, the
        value goes through ``normalize_<key>`` when defined, or is added as a
        plain literal / date literal / URL depending on which ``*_fields`` list
        names the key. Keys matching none of these are ignored.

        Args:
            graph: graph the triples are added to
            root: subject of every triple added here
            content_dict (dict): content dict to translate
        """
        graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode))

        for k, v in content_dict.items():
            if not isinstance(k, str):
                # Some input formats (eg. YAML) allow non-string keys; they can
                # neither name a method nor be normalized with str.replace, so
                # skip them instead of crashing on AttributeError.
                continue

            method_suffix = self._normalize_method_name(k)

            # First, check if there is a specific translation
            # method for this key
            translation_method = getattr(self, "translate_" + method_suffix, None)
            if translation_method:
                translation_method(graph, root, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table
                codemeta_key = self.mapping[k]

                # if there is a normalization method, use it on the value,
                # and add its results to the triples
                normalization_method = getattr(
                    self, "normalize_" + method_suffix, None
                )
                if normalization_method:
                    v = normalization_method(v)
                    if v is None:
                        pass
                    elif isinstance(v, list):
                        for item in reversed(v):
                            if isinstance(item, rdflib.URIRef):
                                add_url_if_valid(graph, root, codemeta_key, str(item))
                            else:
                                graph.add((root, codemeta_key, item))
                    else:
                        if isinstance(v, rdflib.URIRef):
                            add_url_if_valid(graph, root, codemeta_key, str(v))
                        else:
                            graph.add((root, codemeta_key, v))
                elif k in self.string_fields and isinstance(v, str):
                    graph.add((root, codemeta_key, rdflib.Literal(v)))
                elif k in self.string_fields and isinstance(v, list):
                    for item in v:
                        graph.add((root, codemeta_key, rdflib.Literal(item)))
                elif k in self.date_fields and isinstance(v, str):
                    typed_v = rdflib.Literal(v, datatype=SCHEMA.Date)
                    graph.add((root, codemeta_key, typed_v))
                elif k in self.date_fields and isinstance(v, list):
                    for item in v:
                        if isinstance(item, str):
                            typed_item = rdflib.Literal(item, datatype=SCHEMA.Date)
                            graph.add((root, codemeta_key, typed_item))
                elif k in self.uri_fields and isinstance(v, str):
                    add_url_if_valid(graph, root, codemeta_key, v)
                elif k in self.uri_fields and isinstance(v, list):
                    for item in v:
                        add_url_if_valid(graph, root, codemeta_key, item)

        # NOTE(review): extra_translation is not defined in the part of the
        # file visible here; presumably provided elsewhere (subclass or a
        # trimmed part of this class) -- confirm.
        self.extra_translation(graph, root, content_dict)

    def sanitize(self, graph: rdflib.Graph) -> None:
        """Drop triples with an empty-URI object; reject empty-URI subjects.

        Args:
            graph: graph to clean up, modified in place

        Raises:
            ValueError: if any triple has the empty URI as its subject.
        """
        empty_uri = rdflib.URIRef("")

        # Remove triples that make PyLD crash. The matches are copied to a list
        # first so the graph is never modified while it is being iterated.
        for subject, predicate, _ in list(graph.triples((None, None, empty_uri))):
            graph.remove((subject, predicate, empty_uri))

        # Should not happen, but we'd better check as this may lead to incorrect data
        invalid_triples = list(graph.triples((empty_uri, None, None)))
        for triple in invalid_triples:
            # Use the mapping's own logger, like the other methods, rather than
            # the root logger (logging.error may implicitly call basicConfig).
            self.log.error("Empty triple subject URI: %r", triple)
        if invalid_triples:
            raise ValueError("Empty triple subject(s)")
[docs]
class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Decode and parse ``raw_content`` as JSON, then translate it.

        Returns :const:`None` (after logging a warning) when the bytes cannot
        be decoded or are not valid JSON, and :const:`None` without logging
        when the top-level JSON value is not an object.
        """
        try:
            text: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None

        # Only a JSON object can be handled as a key-value store.
        if not isinstance(parsed, dict):
            return None
        return self._translate_dict(parsed)
[docs]
class XmlMapping(DictMapping):
    """Base class for all mappings that use XML data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Parse ``raw_content`` with ``xmltodict`` and translate the result.

        Every parse failure handled below logs a warning and yields
        :const:`None`; so does a parse result that is not a dict.
        """
        try:
            parsed = xmltodict.parse(raw_content)
        except xml.parsers.expat.ExpatError:
            self.log.warning("Error parsing XML from %s", self.log_suffix)
            return None
        except UnicodeDecodeError:
            # Must stay ahead of the ValueError clause below, as
            # UnicodeDecodeError is a ValueError subclass.
            self.log.warning("Error unidecoding XML from %s", self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
            return None

        if isinstance(parsed, dict):
            return self._translate_dict(parsed)

        self.log.warning("Skipping ill-formed XML content: %s", raw_content)
        return None
[docs]
class SafeLoader(yaml.SafeLoader):
    """``yaml.SafeLoader`` variant without the implicit timestamp resolver.

    The resolver table is copied from ``yaml.SafeLoader`` with every entry
    whose first element is the ``tag:yaml.org,2002:timestamp`` tag left out.
    """

    yaml_implicit_resolvers = {
        key: [
            resolver
            for resolver in resolvers
            if resolver[0] != "tag:yaml.org,2002:timestamp"
        ]
        for key, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }
[docs]
class YamlMapping(DictMapping):
    """Base class for all mappings that use Yaml data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        """Decode and parse ``raw_content`` as YAML, then translate it.

        Args:
            raw_content: raw bytes of the YAML document

        Returns:
            the translated metadata, or :const:`None` when the bytes cannot be
            decoded, are not valid YAML, or the top-level value is not a
            mapping.
        """
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            # Undecodable content is "not parseable": log and return None, as
            # JsonMapping.translate does, instead of letting the error escape.
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None

        try:
            # SafeLoader is this module's yaml.SafeLoader subclass.
            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
        except yaml.YAMLError:
            self.log.warning("Error parsing YAML from %s", self.log_suffix)
            return None

        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)

        return None