Source code for swh.indexer.bibtex

# Copyright (C) 2023-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import calendar
import collections
import json
import secrets
import sys
from typing import Any, Dict, List, Optional
import uuid

import iso8601
from pybtex.database import Entry, Person
from pybtex.database.output.bibtex import Writer
from pybtex.plugin import register_plugin
import rdflib

from swh.indexer.codemeta import compact, expand
from swh.indexer.metadata_dictionary.cff import CffMapping
from swh.indexer.namespaces import RDF, SCHEMA, SPDX_LICENSES
from swh.model.swhids import ObjectType, QualifiedSWHID

TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""IRI used for `skolemization <https://www.w3.org/TR/rdf11-concepts/#section-skolemization>`_;
it is not used outside :func:`codemeta_to_bibtex`.
"""

# Marker generated randomly at import time. A field value of the form
# f"{MACRO_PREFIX}:<text>" is written by BibTeXWithMacroWriter.quote() as the
# bare <text>, without the usual BibTeX quoting; codemeta_to_bibtex() uses this
# to emit month macros such as ``jan``.
# NOTE(review): underscores are removed from the random token, presumably so
# the marker survives BibTeX/LaTeX escaping unchanged -- confirm.
MACRO_PREFIX = "macro" + secrets.token_urlsafe(16).replace("_", "")


class BibTeXWithMacroWriter(Writer):
    """BibTeX writer that can emit selected values as unquoted macros.

    Values starting with ``f"{MACRO_PREFIX}:"`` are written as the text that
    follows the marker; every other value is quoted by the parent writer.
    """

    def quote(self, s):
        r"""Return ``s`` ready to be written as a BibTeX field value.

        >>> w = BibTeXWithMacroWriter()
        >>> print(w.quote(f'{MACRO_PREFIX}:jan'))
        jan
        """
        marker = f"{MACRO_PREFIX}:"
        if not s.startswith(marker):
            # Ordinary value: defer to the stock quoting rules.
            return super().quote(s)

        # Macro reference: drop the marker and emit the remainder verbatim.
        return s[len(marker) :]
# Expose the writer to pybtex under the "bibtex_with_macro" output format name,
# which codemeta_to_bibtex() passes as ``bib_format`` to ``Entry.to_string()``.
register_plugin("pybtex.database.output", "bibtex_with_macro", BibTeXWithMacroWriter)
def codemeta_to_bibtex(
    doc: Dict[str, Any], swhid: Optional[QualifiedSWHID] = None
) -> str:
    """Convert a CodeMeta document into a single BibTeX entry.

    The document is compacted, given a root node id if it has none, expanded,
    and loaded into an RDF graph; the BibTeX fields and person lists are then
    read from the triples whose subject is the root node.

    Args:
        doc: the CodeMeta (JSON-LD) document, as a dictionary.
        swhid: optional SWHID of the described object. When given, it is
            written to the ``swhid`` field and its object type selects the
            entry type (``software`` for snapshots, ``codefragment`` for
            contents, ``softwareversion`` otherwise).

    Returns:
        The entry serialized with the ``bibtex_with_macro`` output format.
        Its key is the first identifier that does not contain ``/``, or
        ``REPLACEME`` when there is none.
    """
    doc = compact(doc, False)

    identifiers = []
    if "id" in doc:
        identifiers.append(doc["id"])
    else:
        # No root id: invent a blank node so the root can still be looked up.
        doc["id"] = f"_:{uuid.uuid4()}"

    id_: rdflib.term.Node
    if doc["id"].startswith("_:"):
        id_ = rdflib.term.BNode(doc["id"][2:])
    else:
        # using a base in case the id is not an absolute URI
        id_ = rdflib.term.URIRef(doc["id"], base=TMP_ROOT_URI_PREFIX)
        doc["id"] = str(id_)

    # workaround for https://github.com/codemeta/codemeta/pull/322
    raw_identifiers = doc.get("identifier")
    if isinstance(raw_identifiers, str):
        raw_identifiers = [raw_identifiers]
    if isinstance(raw_identifiers, list):
        identifiers.extend(
            candidate
            for candidate in raw_identifiers
            if isinstance(candidate, str) and "/" not in candidate
        )

    doc = expand(doc)
    g = rdflib.Graph().parse(
        data=json.dumps(doc),
        format="json-ld",
        # replace invalid URIs with blank node ids, instead of discarding whole nodes:
        generalized_rdf=True,
    )

    persons: Dict[str, List[Person]] = collections.defaultdict(list)
    fields: Dict[str, Any] = {}

    def first_value(*predicates: rdflib.term.Node) -> Optional[rdflib.term.Node]:
        """Return one object of the first given predicate set on the root node.

        Predicates are tried in order; ``None`` is returned if none is set.
        """
        for predicate in predicates:
            for _, _, value in g.triples((id_, predicate, None)):
                return value
        return None

    def add_person(persons: List[Person], person_id: rdflib.term.Node) -> None:
        """Build a ``Person`` from node ``person_id`` and append it to ``persons``.

        Nothing is appended if the person renders as an empty string or is
        already in the list.
        """
        person = Person()
        for _, _, name in g.triples((person_id, SCHEMA.name, None)):
            if (person_id, RDF.type, SCHEMA.Organization) in g:
                # prevent interpreting the name as "Firstname Lastname" and reformatting
                # it to "Lastname, Firstname"
                person.last_names.append(name)
            else:
                person = Person(name)
        for _, _, given_name in g.triples((person_id, SCHEMA.givenName, None)):
            person.first_names.append(given_name)
        for _, _, family_name in g.triples((person_id, SCHEMA.familyName, None)):
            person.last_names.append(family_name)
        if str(person) and person not in persons:
            persons.append(person)

    def add_affiliations(person: rdflib.term.Node) -> None:
        """Record every affiliation of ``person`` under the ``organization`` role."""
        for _, _, organization in g.triples((person, SCHEMA.affiliation, None)):
            add_person(persons["organization"], organization)

    # abstract
    description = first_value(SCHEMA.description)
    if description is not None:
        fields["abstract"] = description

    # author
    for _, _, author_or_author_list in g.triples((id_, SCHEMA.author, None)):
        # schema.org-style authors, which are single values
        add_person(persons["author"], author_or_author_list)

        # codemeta-style authors, which are an ordered list
        if author_or_author_list == RDF.nil:
            # Workaround for https://github.com/RDFLib/rdflib/pull/2818
            continue
        for author in rdflib.collection.Collection(g, author_or_author_list):
            add_person(persons["author"], author)
            add_affiliations(author)

    # date: publication date first, then creation date, then modification date
    date = first_value(
        SCHEMA.datePublished, SCHEMA.dateCreated, SCHEMA.dateModified
    )
    if date is not None:
        fields["date"] = date
        try:
            parsed_date = iso8601.parse_date(date)
        except iso8601.ParseError:
            # Best effort: keep the raw date, just without year/month fields.
            pass
        else:
            fields["year"] = str(parsed_date.year)
            # Emitted as an unquoted BibTeX month macro (jan, feb, ...).
            fields["month"] = (
                f"{MACRO_PREFIX}:{calendar.month_abbr[parsed_date.month].lower()}"
            )

    # identifier, doi, hal_id
    entry_key = None
    for _, _, identifier in g.triples((id_, SCHEMA.identifier, None)):
        identifiers.append(identifier)
    for identifier in identifiers:
        if entry_key is None and "/" not in identifier:
            # Avoid URLs
            entry_key = identifier
        if identifier.startswith("https://doi.org/"):
            fields["doi"] = identifier
        if identifier.startswith("hal-"):
            fields["hal_id"] = identifier

    # editor
    for _, _, editor in g.triples((id_, SCHEMA.editor, None)):
        add_person(persons["editor"], editor)
        add_affiliations(editor)

    # file
    download_url = first_value(SCHEMA.downloadUrl)
    if download_url is not None:
        fields["file"] = download_url

    # license (represented by "Person" as it's the only way to make pybtex format
    # them as a list)
    spdx_prefix = str(SPDX_LICENSES)
    for _, _, license_node in g.triples((id_, SCHEMA.license, None)):
        license_name = str(license_node)
        if license_name.startswith(spdx_prefix):
            # Reduce SPDX license URLs to the bare license identifier.
            license_name = license_name[len(spdx_prefix) :]
            if license_name.endswith(".html"):
                license_name = license_name[:-5]
        persons["license"].append(Person(last=license_name))

    # publisher
    for _, _, publisher in g.triples((id_, SCHEMA.publisher, None)):
        add_person(persons["publisher"], publisher)
        add_affiliations(publisher)

    # repository
    code_repository = first_value(SCHEMA.codeRepository)
    if code_repository is not None:
        fields["repository"] = code_repository

    # title
    title = first_value(SCHEMA.name)
    if title is not None:
        fields["title"] = title

    # url
    url = first_value(SCHEMA.url)
    if url is not None:
        fields["url"] = url

    # version: softwareVersion takes precedence over version; as for the other
    # single-valued fields, the first value found is used.
    version = first_value(SCHEMA.softwareVersion, SCHEMA.version)
    if version is not None:
        fields["version"] = version

    # entry_type
    if swhid:
        fields["swhid"] = str(swhid)
        if swhid.object_type == ObjectType.SNAPSHOT:
            entry_type = "software"
        elif swhid.object_type == ObjectType.CONTENT:
            entry_type = "codefragment"
        else:
            entry_type = "softwareversion"
    elif "version" in fields:
        entry_type = "softwareversion"
    else:
        entry_type = "software"

    entry = Entry(
        entry_type,
        persons=persons,
        fields=fields,
    )

    entry.key = entry_key or "REPLACEME"

    return entry.to_string(bib_format="bibtex_with_macro")
def cff_to_bibtex(content: str, swhid: Optional[QualifiedSWHID] = None) -> str:
    """Convert the text of a CFF file into a BibTeX entry.

    The content is encoded as UTF-8 and translated to CodeMeta with
    ``CffMapping``; the result (or an empty document when the translation
    returns ``None``) is then passed to :func:`codemeta_to_bibtex` together
    with ``swhid``.
    """
    translated = CffMapping().translate(raw_content=content.encode("utf-8"))
    return codemeta_to_bibtex({} if translated is None else translated, swhid)
if __name__ == "__main__":
    # Command-line use: convert each CodeMeta JSON document named on the
    # command line ("-" meaning standard input) and print its BibTeX entry.
    for filename in sys.argv[1:]:
        if filename == "-":
            document = json.load(sys.stdin)
        else:
            # JSON text is UTF-8; do not depend on the locale's default
            # encoding when reading the file.
            with open(filename, encoding="utf-8") as f:
                document = json.load(f)
        print(codemeta_to_bibtex(document))