Source code for swh.indexer.citation.codemeta_data

# Copyright (C) 2026  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
import json
from typing import Any, Dict, Optional
import uuid

import rdflib

from swh.indexer.codemeta import CODEMETA_V3_CONTEXT_URL, compact, expand
from swh.indexer.namespaces import CODEMETA, RDF, SCHEMA
from swh.model.swhids import QualifiedSWHID

# Passed as ``base=`` to ``rdflib.term.URIRef`` by ``normalize_doc_id()`` for
# every document id that does not start with "_:", so that ids which are not
# absolute URIs still yield a usable node identifier.
TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""IRI used for skolemization.

See <https://www.w3.org/TR/rdf11-concepts/#section-skolemization>.
"""


@dataclass
class CodeMetaData:
    """Class to store data extracted from a codemeta.json

    Every field is optional and defaults to ``None``.

    Within this module, instances are built by ``extract_codemeta_data()``,
    which only fills in ``id`` plus the keys returned by
    ``extract_rdf_metadata()`` (``swhid``, ``affiliation``, ``author``,
    ``editor``, ``publisher``, ``identifier``, ``license``,
    ``developmentStatus``, ``issueTracker``, ``referencePublication``,
    ``applicationCategory``, ``codeRepository``, ``dateCreated``,
    ``dateModified``, ``datePublished``, ``description``, ``downloadUrl``,
    ``installUrl``, ``name``, ``relatedLink``, ``softwareVersion``, ``url``,
    ``version``, ``operatingSystem`` and ``programmingLanguage``); the
    remaining fields keep their default.
    """

    # NOTE: the field order below defines the positional order of the
    # generated ``__init__``; keep it stable.

    # Document id as a string (generated as "_:<uuid4>" by
    # ``normalize_doc_id()`` when the document has none).
    id: Optional[str] = None
    # SWHID passed by the caller of ``extract_codemeta_data()``, stored as-is.
    swhid: Optional[QualifiedSWHID] = None
    address: Optional[str] = None
    # Deduplicated affiliations of authors, editors and publishers.
    affiliation: Optional[list["CodeMetaPerson"]] = None
    applicationCategory: Optional[str] = None
    applicationSubCategory: Optional[str] = None
    author: Optional[list["CodeMetaPerson"]] = None
    buildInstructions: Optional[str] = None
    citation: Optional[str] = None
    codeRepository: Optional[str] = None
    continuousIntegration: Optional[str] = None
    contributor: Optional[str] = None
    copyrightHolder: Optional[str] = None
    copyrightYear: Optional[str] = None
    dateCreated: Optional[str] = None
    dateModified: Optional[str] = None
    datePublished: Optional[str] = None
    description: Optional[str] = None
    developmentStatus: Optional[str] = None
    downloadUrl: Optional[str] = None
    editor: Optional[list["CodeMetaPerson"]] = None
    email: Optional[str] = None
    embargoEndDate: Optional[str] = None
    encoding: Optional[str] = None
    endDate: Optional[str] = None
    familyName: Optional[str] = None
    fileFormat: Optional[str] = None
    fileSize: Optional[str] = None
    funder: Optional[str] = None
    funding: Optional[str] = None
    givenName: Optional[str] = None
    hasPart: Optional[str] = None
    hasSourceCode: Optional[str] = None
    # RDF identifiers, preceded by the explicit document id (if any) and
    # followed by the "compact" identifiers; see ``merge_identifiers()``.
    identifier: Optional[list[str]] = None
    installUrl: Optional[str] = None
    isAccessibleForFree: Optional[str] = None
    isPartOf: Optional[str] = None
    isSourceCodeOf: Optional[str] = None
    issueTracker: Optional[str] = None
    keywords: Optional[str] = None
    license: Optional[list[str]] = None
    maintainer: Optional[str] = None
    memoryRequirements: Optional[str] = None
    name: Optional[str] = None
    operatingSystem: Optional[list[str]] = None
    permissions: Optional[str] = None
    position: Optional[str] = None
    processorRequirements: Optional[str] = None
    producer: Optional[str] = None
    programmingLanguage: Optional[list[str]] = None
    provider: Optional[str] = None
    publisher: Optional[list["CodeMetaPerson"]] = None
    readme: Optional[str] = None
    referencePublication: Optional[str] = None
    relatedLink: Optional[str] = None
    releaseNotes: Optional[str] = None
    review: Optional[str] = None
    reviewAspect: Optional[str] = None
    reviewBody: Optional[str] = None
    roleName: Optional[str] = None
    runtimePlatform: Optional[str] = None
    sameAs: Optional[str] = None
    softwareHelp: Optional[str] = None
    softwareRequirements: Optional[str] = None
    softwareSuggestions: Optional[str] = None
    softwareVersion: Optional[str] = None
    sponsor: Optional[str] = None
    startDate: Optional[str] = None
    storageRequirements: Optional[str] = None
    supportingData: Optional[str] = None
    targetProduct: Optional[str] = None
    url: Optional[str] = None
    version: Optional[str] = None
@dataclass
class CodeMetaPerson:
    """Names attached to one person or organization node of the RDF graph.

    Instances are produced by ``extract_person()``, which is also used for
    affiliations.
    """

    # Every ``schema:name`` value of the node, as strings.
    names: tuple[str, ...] = ()
    # Every ``schema:givenName`` value of the node, as strings.
    given_names: tuple[str, ...] = ()
    # Every ``schema:familyName`` value of the node, as strings.
    family_names: tuple[str, ...] = ()
    # True when the node has ``rdf:type schema:Organization`` in the graph.
    is_organization: bool = False
def rdf_str_values(
    graph: rdflib.Graph,
    subject: rdflib.term.Node,
    predicate: rdflib.term.URIRef,
) -> tuple[str, ...]:
    """Return all objects of ``(subject, predicate, *)`` in ``graph`` as strings.

    Values are returned in the order ``graph.triples()`` yields them. The
    result is an empty tuple when no triple matches.
    """
    matching_triples = graph.triples((subject, predicate, None))
    values = [str(value) for _, _, value in matching_triples]
    return tuple(values)
def rdf_first_str_value(
    graph: rdflib.Graph,
    subject: rdflib.term.Node,
    predicate: rdflib.term.URIRef,
) -> Optional[str]:
    """Return the first object of ``(subject, predicate, *)`` as a string.

    "First" means the first triple yielded by ``graph.triples()``; the
    remaining matches are not consumed. Returns ``None`` when no triple
    matches.
    """
    matching_triples = iter(graph.triples((subject, predicate, None)))
    first_match = next(matching_triples, None)
    if first_match is None:
        return None
    return str(first_match[2])
def extract_person(
    graph: rdflib.Graph, person_id: rdflib.term.Node
) -> Optional[CodeMetaPerson]:
    """Build a :class:`CodeMetaPerson` from the name triples of ``person_id``.

    Collects the ``schema:name``, ``schema:givenName`` and
    ``schema:familyName`` values of the node.

    Returns:
        ``None`` when the node carries none of these three properties;
        otherwise a :class:`CodeMetaPerson`, flagged as an organization when
        the graph contains ``(person_id, rdf:type, schema:Organization)``.
    """
    names, given_names, family_names = (
        rdf_str_values(graph, person_id, name_property)
        for name_property in (SCHEMA.name, SCHEMA.givenName, SCHEMA.familyName)
    )

    # A node without any kind of name is not worth reporting.
    if not any((names, given_names, family_names)):
        return None

    is_organization = (person_id, RDF.type, SCHEMA.Organization) in graph
    return CodeMetaPerson(
        names=names,
        given_names=given_names,
        family_names=family_names,
        is_organization=is_organization,
    )
def resolve_role_nodes(
    graph: rdflib.Graph,
    person_id: rdflib.term.Node,
    role_property: rdflib.term.URIRef,
    seen: Optional[set[rdflib.term.Node]] = None,
) -> list[rdflib.term.Node]:
    """Unwrap ``schema:Role`` nodes until actual (non-Role) nodes are reached.

    Args:
        graph: graph to read from.
        person_id: node to resolve.
        role_property: property followed from a Role node to the nodes it
            wraps.
        seen: nodes already visited. It is updated in place and shared with
            the recursive calls; a fresh set is created when omitted.

    Returns:
        ``[person_id]`` when the node is not typed ``schema:Role``; otherwise
        the concatenated resolutions of every object of
        ``(person_id, role_property, *)``. A node that is already in ``seen``
        resolves to ``[]``, which prevents infinite recursion on cyclic
        Role chains.
    """
    visited = set() if seen is None else seen
    if person_id in visited:
        return []
    visited.add(person_id)

    if (person_id, RDF.type, SCHEMA.Role) in graph:
        # Depth-first, in the order the graph yields the wrapped nodes.
        return [
            resolved
            for _, _, wrapped in graph.triples((person_id, role_property, None))
            for resolved in resolve_role_nodes(
                graph, wrapped, role_property=role_property, seen=visited
            )
        ]

    return [person_id]
def extract_people(
    graph: rdflib.Graph,
    entity_id: rdflib.term.Node,
    role_property: rdflib.term.URIRef,
) -> tuple[list[CodeMetaPerson], list[CodeMetaPerson]]:
    """Collect the people linked to ``entity_id`` through ``role_property``.

    Each object of ``(entity_id, role_property, *)`` is resolved with
    :func:`resolve_role_nodes`. When the object is the head of an RDF list
    (it has an ``rdf:first`` triple and is not ``rdf:nil``), every member of
    that list is resolved as well. For each resulting node, the person is
    extracted with :func:`extract_person` and its ``schema:affiliation``
    values are resolved and extracted the same way.

    Returns:
        A ``(people, affiliations)`` pair; both lists are deduplicated and
        keep first-seen order.
    """
    people: list[CodeMetaPerson] = []
    affiliations: list[CodeMetaPerson] = []

    def remember(
        entry: Optional[CodeMetaPerson], bucket: list[CodeMetaPerson]
    ) -> None:
        # Keep only actual, not-yet-recorded entries.
        if entry and entry not in bucket:
            bucket.append(entry)

    for _, _, value in graph.triples((entity_id, role_property, None)):
        candidates = resolve_role_nodes(graph, value, role_property)

        is_rdf_list = value != RDF.nil and (value, RDF.first, None) in graph
        if is_rdf_list:
            for member in rdflib.collection.Collection(graph, value):
                candidates.extend(resolve_role_nodes(graph, member, role_property))

        for candidate in candidates:
            remember(extract_person(graph, candidate), people)

            # Affiliations are collected for every candidate node, whether or
            # not a person was recorded for it.
            for _, _, organization in graph.triples(
                (candidate, SCHEMA.affiliation, None)
            ):
                for organization_id in resolve_role_nodes(
                    graph, organization, role_property=SCHEMA.affiliation
                ):
                    remember(extract_person(graph, organization_id), affiliations)

    return (people, affiliations)
def extract_rdf_metadata(
    graph: rdflib.Graph, entity_id: rdflib.term.Node, *, swhid: Optional[QualifiedSWHID]
) -> dict[str, Any]:
    """Read the metadata of ``entity_id`` from ``graph`` into a plain dict.

    Args:
        graph: RDF graph holding the document.
        entity_id: node describing the software.
        swhid: stored unchanged under the ``"swhid"`` key.

    Returns:
        A dict keyed by CodeMeta term name. People-valued terms (``author``,
        ``editor``, ``publisher``, ``affiliation``) are lists of
        :class:`CodeMetaPerson`; ``identifier``, ``license``,
        ``operatingSystem`` and ``programmingLanguage`` are lists of strings;
        all other terms hold the first value found, as a string. Terms
        without any value are set to ``None`` (empty lists included).
    """
    people_by_role: list[list[CodeMetaPerson]] = []
    affiliations: list[CodeMetaPerson] = []
    for role in (SCHEMA.author, SCHEMA.editor, SCHEMA.publisher):
        role_people, role_affiliations = extract_people(graph, entity_id, role)
        people_by_role.append(role_people)
        # Merge affiliations across roles, dropping duplicates but keeping
        # first-seen order.
        for candidate in role_affiliations:
            if candidate not in affiliations:
                affiliations.append(candidate)
    authors, editors, publishers = people_by_role

    identifiers = [
        str(value)
        for _, _, value in graph.triples((entity_id, SCHEMA.identifier, None))
    ]
    licenses = [
        str(value)
        for _, _, value in graph.triples((entity_id, SCHEMA.license, None))
        if value is not None
    ]

    metadata: dict[str, Any] = {
        "swhid": swhid,
        "affiliation": affiliations or None,
        "author": authors or None,
        "developmentStatus": rdf_first_str_value(
            graph, entity_id, CODEMETA.developmentStatus
        ),
        "editor": editors or None,
        "identifier": identifiers or None,
        "issueTracker": rdf_first_str_value(graph, entity_id, CODEMETA.issueTracker),
        "license": licenses or None,
        "publisher": publishers or None,
        "referencePublication": rdf_first_str_value(
            graph, entity_id, CODEMETA.referencePublication
        ),
    }

    # schema.org terms for which only the first value is kept
    single_valued_terms = (
        "applicationCategory",
        "codeRepository",
        "dateCreated",
        "dateModified",
        "datePublished",
        "description",
        "downloadUrl",
        "installUrl",
        "name",
        "relatedLink",
        "softwareVersion",
        "url",
        "version",
    )
    metadata.update(
        (term, rdf_first_str_value(graph, entity_id, SCHEMA[term]))
        for term in single_valued_terms
    )

    # schema.org terms for which every value is kept
    multi_valued_terms = ("operatingSystem", "programmingLanguage")
    metadata.update(
        (term, list(rdf_str_values(graph, entity_id, SCHEMA[term])) or None)
        for term in multi_valued_terms
    )

    return metadata
def extract_compact_identifiers(doc: Dict[str, Any]) -> list[str]:
    """Return the string identifiers of ``doc`` that do not contain a ``/``.

    ``doc["identifier"]`` may be a single string or a list; list items that
    are not strings are ignored. Any other shape (or a missing key) yields
    an empty list.
    """
    # workaround for https://github.com/codemeta/codemeta/pull/322
    raw = doc.get("identifier")
    if isinstance(raw, list):
        candidates = raw
    elif isinstance(raw, str):
        candidates = [raw]
    else:
        candidates = []

    return [
        candidate
        for candidate in candidates
        if isinstance(candidate, str) and "/" not in candidate
    ]
def merge_identifiers(
    *,
    rdf_data: dict[str, Any],
    codemeta_id: str,
    had_explicit_id: bool,
    compact_identifiers: list[str],
) -> None:
    """Rewrite ``rdf_data["identifier"]`` with all known identifiers, in place.

    The resulting list is, in order: ``codemeta_id`` (only when
    ``had_explicit_id`` is true), the identifiers already present in
    ``rdf_data``, then ``compact_identifiers``.

    Args:
        rdf_data: metadata dict to update. Its ``"identifier"`` entry may be
            a list, ``None`` or missing; the list it held, if any, is not
            modified.
        codemeta_id: id of the document.
        had_explicit_id: whether the id was present in the document, as
            opposed to generated.
        compact_identifiers: extra identifiers to append.

    Returns:
        Nothing. When no identifier is left, the ``"identifier"`` key is
        removed from ``rdf_data`` rather than set to an empty list.
    """
    # A default is given to pop() so a dict without the key is handled like
    # one whose value is None, instead of raising KeyError.
    identifiers = list(rdf_data.pop("identifier", None) or [])
    if had_explicit_id:
        identifiers.insert(0, codemeta_id)
    identifiers.extend(compact_identifiers)
    if identifiers:
        rdf_data["identifier"] = identifiers
def normalize_doc_id(
    doc: Dict[str, Any],
) -> tuple[bool, str, rdflib.term.Node]:
    """Make sure ``doc`` has an ``"id"`` and return the matching RDF node.

    ``doc`` is modified in place: a missing id is replaced with a generated
    blank node id (``"_:<uuid4>"``), and an id that is not a blank node id
    is replaced with its form resolved against ``TMP_ROOT_URI_PREFIX``.

    Returns:
        A ``(had_explicit_id, codemeta_id, entity_id)`` triple, where
        ``had_explicit_id`` tells whether ``doc`` had an ``"id"`` key on
        entry, ``codemeta_id`` is the id as a string before resolution
        against the base, and ``entity_id`` is the ``BNode`` or ``URIRef``
        to look up in the graph.
    """
    had_explicit_id = "id" in doc
    if not had_explicit_id:
        doc["id"] = f"_:{uuid.uuid4()}"
    codemeta_id = str(doc["id"])

    entity_id: rdflib.term.Node
    if codemeta_id.startswith("_:"):
        entity_id = rdflib.term.BNode(codemeta_id[2:])
    else:
        # using a base in case the id is not an absolute URI
        entity_id = rdflib.term.URIRef(codemeta_id, base=TMP_ROOT_URI_PREFIX)
        doc["id"] = str(entity_id)

    return (had_explicit_id, codemeta_id, entity_id)
def extract_codemeta_data(
    doc: Dict[str, Any],
    swhid: Optional[QualifiedSWHID] = None,
    *,
    resolve_unknown_context_url: bool = False,
    force_codemeta_context: bool = False,
) -> CodeMetaData:
    """Extract a :class:`CodeMetaData` from a CodeMeta JSON-LD document.

    The document is compacted, given a usable id, expanded, and parsed into
    an RDF graph from which the metadata is read.

    Args:
        doc: the CodeMeta document, as a dict.
        swhid: stored as-is in the result.
        resolve_unknown_context_url: forwarded to ``compact()`` and
            ``expand()``.
        force_codemeta_context: when true, the document's ``"@context"`` is
            replaced with ``CODEMETA_V3_CONTEXT_URL`` before processing.
            ``doc`` is shallow-copied first, so forcing the context does not
            write to the caller's dict.

    Returns:
        The extracted metadata. ``id`` is the document's id, or a generated
        blank node id when the document has none.
    """
    if force_codemeta_context:
        # Override the context on a copy rather than on the caller's dict.
        doc = {**doc, "@context": CODEMETA_V3_CONTEXT_URL}
    doc = compact(
        doc, forgefed=False, resolve_unknown_context_url=resolve_unknown_context_url
    )

    had_explicit_id, codemeta_id, entity_id = normalize_doc_id(doc)
    # Must be read from the compacted document, before expansion.
    compact_identifiers = extract_compact_identifiers(doc)

    expanded_doc = expand(doc, resolve_unknown_context_url=resolve_unknown_context_url)
    graph = rdflib.Graph().parse(
        data=json.dumps(expanded_doc),
        format="json-ld",
        # replace invalid URIs with blank node ids, instead of discarding whole nodes:
        generalized_rdf=True,
    )

    rdf_data = extract_rdf_metadata(graph, entity_id, swhid=swhid)
    merge_identifiers(
        rdf_data=rdf_data,
        codemeta_id=codemeta_id,
        had_explicit_id=had_explicit_id,
        compact_identifiers=compact_identifiers,
    )
    return CodeMetaData(
        id=codemeta_id,
        **rdf_data,
    )