Source code for swh.indexer.metadata_dictionary.codemeta
# Copyright (C) 2018-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple, Union
import xml.etree.ElementTree as ET
import iso8601
from pyld import jsonld
import xmltodict
from swh.indexer.codemeta import (
CODEMETA_TERMS,
CODEMETA_V2_CONTEXT_URL,
compact,
expand,
)
from .base import BaseExtrinsicMapping, SingleFileIntrinsicMapping
ATOM_URI = "http://www.w3.org/2005/Atom"
_TAG_RE = re.compile(r"\{(?P<namespace>.*?)\}(?P<localname>.*)")
_IGNORED_NAMESPACES = ("http://www.w3.org/2005/Atom",)
_DATE_RE = re.compile("^[0-9]{4}-[0-9]{1,2}-[0-9]{1,2}$")
logger = logging.getLogger(__name__)
[docs]
class CodemetaMapping(SingleFileIntrinsicMapping):
"""
dedicated class for CodeMeta (codemeta.json) mapping and translation
"""
name = "codemeta"
filename = b"codemeta.json"
string_fields = None
[docs]
@classmethod
def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
[docs]
def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
return None
[docs]
class SwordCodemetaMapping(BaseExtrinsicMapping):
"""
dedicated class for mapping and translation from JSON-LD statements
embedded in SWORD documents, optionally using Codemeta contexts,
as described in the :ref:`deposit-protocol`.
"""
name = "sword-codemeta"
[docs]
@classmethod
def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
return (
"sword-v2-atom-codemeta",
"sword-v2-atom-codemeta-v2",
)
[docs]
@classmethod
def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
[docs]
def xml_to_jsonld(self, e: ET.Element) -> Union[str, Dict[str, Any]]:
# Keys are JSON-LD property names (URIs or terms).
# Values are either a single string (if key is "type") or list of
# other dicts with the same type recursively.
# To simply annotations, we omit the single string case here.
doc: Dict[str, List[Union[str, Dict[str, Any]]]] = collections.defaultdict(list)
for child in e:
m = _TAG_RE.match(child.tag)
assert m, f"Tag with no namespace: {child}"
namespace = m.group("namespace")
localname = m.group("localname")
if namespace == ATOM_URI and localname in ("title", "name"):
# Convert Atom to Codemeta name; in case codemeta:name
# is not provided or different
doc["name"].append(self.xml_to_jsonld(child))
elif namespace == ATOM_URI and localname in ("author", "email"):
# ditto for these author properties (note that author email is also
# covered by the previous test)
doc[localname].append(self.xml_to_jsonld(child))
elif namespace in _IGNORED_NAMESPACES:
# SWORD-specific namespace that is not interesting to translate
pass
elif namespace.lower() == CODEMETA_V2_CONTEXT_URL:
# It is a term defined by the context; write is as-is and JSON-LD
# expansion will convert it to a full URI based on
# "@context": CODEMETA_V2_CONTEXT_URL
jsonld_child = self.xml_to_jsonld(child)
if (
localname
in (
"dateCreated",
"dateModified",
"datePublished",
)
and isinstance(jsonld_child, str)
and _DATE_RE.match(jsonld_child)
):
# Dates missing a leading zero for their day/month, used
# to be allowed by the deposit; so we need to reformat them
# to be valid ISO8601.
jsonld_child = iso8601.parse_date(jsonld_child).date().isoformat()
if localname == "id":
# JSON-LD only allows a single id, and they have to be strings.
if localname in doc:
logger.error(
"Duplicate <id>s in SWORD document: %r and %r",
doc[localname],
jsonld_child,
)
continue
elif not jsonld_child:
logger.error("Empty <id> value in SWORD document")
continue
elif not isinstance(jsonld_child, str):
logger.error(
"Unexpected <id> value in SWORD document: %r", jsonld_child
)
continue
else:
doc[localname] = jsonld_child # type: ignore[assignment]
else:
doc[localname].append(jsonld_child)
else:
# Otherwise, we already know the URI
doc[f"{namespace}{localname}"].append(self.xml_to_jsonld(child))
# The above needed doc values to be list to work; now we allow any type
# of value as key "@value" cannot have a list as value.
doc_: Dict[str, Any] = doc
text = e.text.strip() if e.text else None
if text:
# TODO: check doc is empty, and raise mixed-content error otherwise?
return text
return doc_
[docs]
def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
# Parse XML
try:
root = ET.fromstring(content)
except ET.ParseError:
logger.error("Failed to parse XML document: %s", content)
return None
else:
# Transform to JSON-LD document
doc = self.xml_to_jsonld(root)
assert isinstance(doc, dict), f"Root object is not a dict: {doc}"
# Add @context to JSON-LD expansion replaces the "codemeta:" prefix
# hash (which uses the context URL as namespace URI for historical
# reasons) into properties in `http://schema.org/` and
# `https://codemeta.github.io/terms/` namespaces
doc["@context"] = CODEMETA_V2_CONTEXT_URL
# Normalize as a Codemeta document
return self.normalize_translation(expand(doc))
[docs]
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata, forgefed=False)
[docs]
def iter_keys(d):
"""Recursively iterates on dictionary keys"""
if isinstance(d, dict):
yield from d
for value in d:
yield from iter_keys(value)
elif isinstance(d, list):
for value in d:
yield from iter_keys(value)
else:
pass
[docs]
class JsonSwordCodemetaMapping(SwordCodemetaMapping):
"""
Variant of :class:`SwordCodemetaMapping` that reads the legacy
``sword-v2-atom-codemeta-v2-in-json`` format and converts it back to
``sword-v2-atom-codemeta-v2`` XML
"""
name = "json-sword-codemeta"
[docs]
@classmethod
def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
return ("sword-v2-atom-codemeta-v2-in-json",)
[docs]
def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
# ``content`` was generated by calling ``xmltodict.parse()`` on a XML document,
# so ``xmltodict.unparse()`` is guaranteed to return a document that is
# semantically equivalent to the original and pass it to SwordCodemetaMapping.
try:
json_doc = json.loads(content)
except json.JSONDecodeError:
logger.error("Failed to parse JSON document: %s", content)
return None
else:
if "@xmlns" not in json_doc:
# Technically invalid, but old versions of the deposit dropped
# XMLNS information
json_doc["@xmlns"] = ATOM_URI
if "@xmlns:codemeta" not in json_doc and any(
key.startswith("codemeta:") for key in iter_keys(json_doc)
):
# ditto
json_doc["@xmlns:codemeta"] = CODEMETA_V2_CONTEXT_URL
if json_doc["@xmlns"] not in (ATOM_URI, [ATOM_URI]):
# Technically, non-default XMLNS were allowed, but no one used them,
# and we don't write this format anymore, so they do not need to be
# implemented here.
raise NotImplementedError(f"Unexpected XMLNS set: {json_doc}")
# Root tag was stripped by swh-deposit
json_doc = {"entry": json_doc}
return super().translate(xmltodict.unparse(json_doc).encode())
[docs]
def load_and_compact_notification(content: bytes | str) -> dict[str, Any] | None:
"""Load and compact a notification from the REMS.
Errors logs will be written if something went wrong in the process.
Args:
content: the expanded COAR Notification
Returns:
The compacted form of the COAR Notification or None if we weren't able to
read it
"""
try:
raw_json = json.loads(content)
notification = jsonld.compact(
raw_json,
{
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://coar-notify.net",
]
},
)
except json.JSONDecodeError:
logger.error("Failed to parse JSON document: %s", content)
return None
except jsonld.JsonLdError:
logger.error("Failed to compact JSON-LD document: %s", content)
return None
return notification
[docs]
def validate_mention(notification: dict[str, Any]) -> bool:
"""Validate minimal notification's requirements before indexation.
Args:
notification: a compact form of a COAR Notification
Returns:
False if the we can't find required props in the notification
"""
subject = notification.get("object", {}).get("as:subject")
if subject is None:
logger.error("Missing object[as:subject] key in %s", notification)
return False
if not isinstance(subject, str):
logger.error("object[as:subject] value is not a string in %s", notification)
return False
paper = notification.get("context", {}).get("id")
if paper is None:
logger.error("Missing context[id] key in %s", notification)
return False
if not isinstance(paper, str):
logger.error("context[id] value is not a string in %s", notification)
return False
if subject != paper:
logger.error(
"Mismatch between context[id] and object[as:subject] in %s", notification
)
return False
notification_id = notification.get("id")
if notification_id is None:
logger.error("missing id key in %s", notification)
return False
if not isinstance(notification_id, str):
logger.error("id value is not a string in %s", notification)
return False
return True
[docs]
class CoarNotifyMentionMapping(BaseExtrinsicMapping):
"""Map & translate a COAR Notify software mention in a CodeMeta format.
COAR Notify mentions are received by ``swh-coarnotify`` and saved expanded.
Mentions contains metadata on a scientific paper that cites a software.
"""
name = "coarnotify-mention-codemeta"
[docs]
@classmethod
def supported_terms(cls) -> list[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
[docs]
@classmethod
def extrinsic_metadata_formats(cls) -> tuple[str, ...]:
return ("coarnotify-mention-v1",)
[docs]
def translate(self, content: bytes) -> dict[str, Any] | None:
"""Parse JSON and compact the payload to access the mention.
The whole `context` of the `AnnounceRelationship` notification will be indexed
as it contains metadata about the scientific paper citing the software.
TODO: At some point we might need to fetch metadata from the paper URL as COAR
Notifications are not made to contain **all** the metadata but to indicate
where we should find them.
TODO: We will need to handle cancellations of a mention if it was made by
mistake. Maybe we could use the original notification id and an empty context
to overwrite the previous citation when merging documents ? It is with this in
mind that the notification ID is added to the citation.
Args:
content: the raw expanded COAR Notification
Returns:
A CodeMeta citation if the notification was valid or None
"""
notification = load_and_compact_notification(content)
if not notification:
return None
if not validate_mention(notification):
return None
citation = {
"@context": ["http://schema.org/", "https://w3id.org/codemeta/3.0"],
"citation": [
{"id": notification["id"], "ScholarlyArticle": notification["context"]}
],
}
return citation