# Source code for swh.indexer.codemeta

# Copyright (C) 2018-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import collections
import csv
from functools import lru_cache, partial
import itertools
import json
import os.path
import re
from typing import Any, Dict, List, Set, TextIO, Tuple

from pyld import jsonld
from pyld.documentloader.requests import requests_document_loader
import rdflib

import swh.indexer
from swh.indexer.namespaces import ACTIVITYSTREAMS, CODEMETA, FORGEFED, SCHEMA, XSD

# Directory of data files (crosswalk table, JSON-LD contexts) bundled with
# the swh.indexer package.
_DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), "data")

# CSV table mapping ecosystem-specific metadata terms (npm, PyPI, ...) to
# canonical CodeMeta terms; parsed by read_crosstable() below.
CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, "codemeta", "crosswalk.csv")

# Local copies of the CodeMeta v2/v3 JSON-LD context documents, used so the
# contexts never need to be fetched over the network.
CODEMETA_V2_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta-2.0.jsonld")
CODEMETA_V3_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta-3.0.jsonld")


# Load both CodeMeta JSON-LD contexts from the bundled files at import time.
with open(CODEMETA_V2_CONTEXT_PATH) as fdv2, open(CODEMETA_V3_CONTEXT_PATH) as fdv3:
    CODEMETA_V2_CONTEXT = json.load(fdv2)
    CODEMETA_V3_CONTEXT = json.load(fdv3)

# Local copy of the schema.org JSON-LD context, served by _document_loader()
# below in place of a network fetch.
with open(os.path.join(_DATA_DIR, "schema.org", "schemaorgcontext.jsonld")) as fd:
    _SCHEMA_DOT_ORG_CONTEXT = json.load(fd)

# Pre-processed ("active") CodeMeta v2 context, built once so that
# make_absolute_uri() can resolve short term names to absolute IRIs without
# re-processing the context on every call.
_EMPTY_PROCESSED_CONTEXT: Any = {"mappings": {}}
_PROCESSED_CODEMETA_CONTEXT = jsonld.JsonLdProcessor().process_context(
    _EMPTY_PROCESSED_CONTEXT, CODEMETA_V2_CONTEXT, None
)

# Canonical URL of the CodeMeta v2 JSON-LD context.
CODEMETA_V2_CONTEXT_URL = "https://doi.org/10.5063/schema/codemeta-2.0"
# Other URLs seen in the wild for the same v2 context; _document_loader()
# treats them as equivalent to the canonical one.
CODEMETA_V2_ALTERNATE_CONTEXT_URLS = {
    "https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld",
    "https://raw.githubusercontent.com/codemeta/codemeta/2.0/codemeta.jsonld",
    "https://doi.org/doi:10.5063/schema/codemeta-2.0",
    "http://purl.org/codemeta/2.0",
}

# Canonical URL of the CodeMeta v3 JSON-LD context.
CODEMETA_V3_CONTEXT_URL = "https://w3id.org/codemeta/3.0"
# Alternate URL for the v3 context, handled like the canonical one.
CODEMETA_V3_ALTERNATE_CONTEXT_URLS = {
    "https://raw.githubusercontent.com/codemeta/codemeta/3.0/codemeta.jsonld"
}

# Properties skipped by read_crosstable() when building the term set and
# crosswalk dictionaries.
PROPERTY_BLACKLIST = {
    # CodeMeta properties that we cannot properly represent.
    SCHEMA.softwareRequirements,
    CODEMETA.softwareSuggestions,
    # Duplicate of 'author'
    SCHEMA.creator,
}

# Separator between multiple ecosystem terms in a single crosswalk-table cell
# (comma or slash, with optional surrounding whitespace).
_codemeta_field_separator = re.compile(r"\s*[,/]\s*")


@lru_cache(maxsize=128)
def _requests_document_loader(url):
    """Fetch a remote JSON-LD document using pyld's requests-based loader.

    Results are memoized per URL so repeated resolutions of the same context
    do not trigger repeated HTTP requests.
    """
    fetch = requests_document_loader()
    return fetch(url)


def make_absolute_uri(local_name):
    """Parses codemeta.jsonld, and returns the @id of terms it defines.

    >>> make_absolute_uri("name")
    'http://schema.org/name'
    >>> make_absolute_uri("downloadUrl")
    'http://schema.org/downloadUrl'
    >>> make_absolute_uri("referencePublication")
    'https://codemeta.github.io/terms/referencePublication'
    """
    absolute_uri = jsonld.JsonLdProcessor.get_context_value(
        _PROCESSED_CODEMETA_CONTEXT, local_name, "@id"
    )
    # Every known term must resolve either to a JSON-LD keyword ("@...") or
    # into the CodeMeta / schema.org vocabularies.
    assert absolute_uri.startswith(("@", CODEMETA, SCHEMA)), (local_name, absolute_uri)
    return absolute_uri
def read_crosstable(fd: TextIO) -> Tuple[Set[str], Dict[str, Dict[str, rdflib.URIRef]]]:
    """
    Given a file-like object to a `CodeMeta crosswalk table` (either the main
    cross-table with all columns, or an auxiliary table with just the CodeMeta
    column and one ecosystem-specific table); returns a list of all CodeMeta
    terms, and a dictionary ``{ecosystem: {ecosystem_term: codemeta_term}}``

    .. _CodeMeta crosswalk table: https://codemeta.github.io/crosswalk/
    """
    rows = csv.reader(fd)
    try:
        columns = next(rows)
    except StopIteration:
        raise ValueError("empty file")

    # Every column other than the four descriptive ones names a metadata
    # source (an ecosystem: npm, PyPI, ...).
    data_sources = set(columns) - {"Parent Type", "Property", "Type", "Description"}
    codemeta_translation: Dict[str, Dict[str, rdflib.URIRef]] = {
        source: {} for source in data_sources
    }
    terms = set()

    for row in rows:
        # The "Property" column holds the canonical CodeMeta short name.
        property_name = dict(zip(columns, row))["Property"]
        if not property_name:
            continue
        canonical_name = make_absolute_uri(property_name)
        if rdflib.URIRef(canonical_name) in PROPERTY_BLACKLIST:
            continue
        terms.add(canonical_name)

        for column, cell in zip(columns, row):
            if column not in data_sources:
                # Skip the Parent Type / Property / Type / Description cells.
                continue
            # A single cell may list several source-specific terms, separated
            # by commas or slashes.
            for source_term in _codemeta_field_separator.split(cell):
                source_term = source_term.strip()
                if source_term:
                    codemeta_translation[column][source_term] = rdflib.URIRef(
                        canonical_name
                    )

    return (terms, codemeta_translation)
# Parse the bundled crosswalk table once at import time.
with open(CROSSWALK_TABLE_PATH) as fd:
    (CODEMETA_TERMS, CROSSWALK_TABLE) = read_crosstable(fd)


def _document_loader(url, options=None, resolve_unknown_context_url: bool = False):
    """Document loader for pyld.

    Reads the local codemeta.jsonld file instead of fetching it
    from the Internet every single time.

    Args:
        url: JSON-LD context URL to resolve
        options: passed by pyld; unused here
        resolve_unknown_context_url: if :const:`True`, unknown context URLs
            are fetched over HTTP instead of raising an exception

    Raises:
        Exception: if the URL is unknown (and ``resolve_unknown_context_url``
            is :const:`False`), or if the CodeMeta namespace URI was passed
            instead of a context URL.
    """
    # Known context URLs are matched case-insensitively and ignoring any
    # trailing slash. Normalizing once here also applies the trailing-slash
    # handling to the v2 alternate URLs, which the previous version only
    # applied to the v3 ones.
    normalized_url = url.lower().rstrip("/")

    if (
        normalized_url == CODEMETA_V2_CONTEXT_URL.lower()
        or normalized_url in CODEMETA_V2_ALTERNATE_CONTEXT_URLS
    ):
        return {
            "contextUrl": None,
            "documentUrl": url,
            "document": CODEMETA_V2_CONTEXT,
        }
    elif (
        normalized_url == CODEMETA_V3_CONTEXT_URL.lower()
        or normalized_url in CODEMETA_V3_ALTERNATE_CONTEXT_URLS
    ):
        return {
            "contextUrl": None,
            "documentUrl": url,
            "document": CODEMETA_V3_CONTEXT,
        }
    elif url == CODEMETA:
        raise Exception(
            "{} is CodeMeta's URI, use {} as context url".format(
                CODEMETA, CODEMETA_V2_CONTEXT_URL
            )
        )
    elif normalized_url in ("http://schema.org", "https://schema.org"):
        return {
            "contextUrl": None,
            "documentUrl": url,
            "document": _SCHEMA_DOT_ORG_CONTEXT,
        }
    else:
        if resolve_unknown_context_url:
            # Best-effort remote fetch; disabled by default because it can
            # send requests to arbitrary URLs.
            return _requests_document_loader(url)
        else:
            raise Exception(f"Unknown context URL: {url}")
def compact(
    doc: Dict[str, Any], forgefed: bool, resolve_unknown_context_url: bool = False
) -> Dict[str, Any]:
    """Same as `pyld.jsonld.compact`, but in the context of CodeMeta.

    Args:
        doc: parsed ``codemeta.json`` file
        forgefed: Whether to add ForgeFed and ActivityStreams as compact
            URIs. This is typically used for extrinsic metadata documents,
            which frequently use properties from these namespaces.
        resolve_unknown_context_url: if :const:`True` unknown JSON-LD context
            URL will be fetched using ``requests`` instead of raising an
            exception, :const:`False` by default as it can lead sending
            requests to arbitrary URLs so use with caution

    Returns:
        A compacted JSON-LD document.
    """
    loader = partial(
        _document_loader,
        resolve_unknown_context_url=resolve_unknown_context_url,
    )
    contexts: List[Any] = [CODEMETA_V2_CONTEXT_URL]
    if forgefed:
        # Expose ForgeFed / ActivityStreams / XSD terms as compact URIs.
        contexts.append(
            {"as": str(ACTIVITYSTREAMS), "forge": str(FORGEFED), "xsd": str(XSD)}
        )
    return jsonld.compact(doc, contexts, options={"documentLoader": loader})
def expand(
    doc: Dict[str, Any], resolve_unknown_context_url: bool = False
) -> Dict[str, Any]:
    """Same as `pyld.jsonld.expand`, but in the context of CodeMeta.

    Args:
        doc: parsed ``codemeta.json`` file
        resolve_unknown_context_url: if :const:`True` unknown JSON-LD context
            URL will be fetched using ``requests`` instead of raising an
            exception, :const:`False` by default as it can lead sending
            requests to arbitrary URLs so use with caution

    Returns:
        An expanded JSON-LD document.
    """
    loader = partial(
        _document_loader,
        resolve_unknown_context_url=resolve_unknown_context_url,
    )
    return jsonld.expand(doc, options={"documentLoader": loader})
def merge_documents(documents):
    """Takes a list of metadata dicts, each generated from a different
    metadata file, and merges them.

    Removes duplicates, if any.
    """
    expanded_docs = list(itertools.chain.from_iterable(map(expand, documents)))

    merged_document = collections.defaultdict(list)
    for document in expanded_docs:
        for key, values in document.items():
            if key == "@id":
                # @id does not get expanded to a list
                value = values

                # Only one @id is allowed, move it to sameAs
                if "@id" not in merged_document:
                    merged_document["@id"] = value
                elif value != merged_document["@id"]:
                    if value not in merged_document[SCHEMA.sameAs]:
                        merged_document[SCHEMA.sameAs].append(value)
            else:
                for value in values:
                    if isinstance(value, dict) and set(value) == {"@list"}:
                        # Value is of the form {'@list': [item1, item2]}
                        # instead of the usual [item1, item2].
                        # We need to merge the inner lists (and mostly
                        # preserve order).
                        target = merged_document.setdefault(key, {"@list": []})
                        for item in value["@list"]:
                            # target must be of the form
                            # {'@list': [item1, item2]}; as it is the same
                            # type as value, which is an @list.
                            if item not in target["@list"]:
                                target["@list"].append(item)
                    elif value not in merged_document[key]:
                        merged_document[key].append(value)

    # XXX: we should set forgefed=True when merging extrinsic_metadata
    # documents. However, this function is only used to merge multiple files
    # of the same directory (which is only for intrinsic-metadata), so it is
    # not an issue for now.
    return compact(merged_document, forgefed=False)