# Copyright (C) 2021-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import gzip
import logging
import os
import re
import shutil
import subprocess
import tempfile
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from lxml import etree
import ndjson
import requests
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
RepoPage = Optional[Dict[str, Any]]
SUPPORTED_SCM_TYPES = ("git", "svn", "hg", "cvs", "bzr")
[docs]
@dataclass
class MavenListerState:
"""State of the MavenLister"""
last_seen_doc: int = -1
"""Last doc ID ingested during an incremental pass
"""
last_seen_pom: int = -1
"""Last doc ID related to a pom and ingested during
an incremental pass
"""
[docs]
class MavenLister(Lister[MavenListerState, RepoPage]):
"""List origins from a Maven repository.
Maven Central provides artifacts for Java builds.
It includes POM files and source archives, which we download to get
the source code of artifacts and links to their scm repository.
This lister yields origins of types: git/svn/hg or whatever the Artifacts
use as repository type, plus maven types for the maven loader (tarball, source jar).
The lister relies on the use of the maven index exporter tool allowing to
convert the binary content of a maven repository index to NDJSON format
(https://gitlab.softwareheritage.org/swh/devel/fixtures/maven-index-exporter).
To be able to execute the tool, Java runtime environment >= 17 must be available
in the lister execution environment.
"""
LISTER_NAME = "maven"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
# keep no longer used parameter for backward compatibility of
# existing lister tasks in scheduler database
index_url: Optional[str] = None,
instance: Optional[str] = None,
credentials: CredentialsType = None,
max_origins_per_page: Optional[int] = None,
max_pages: Optional[int] = None,
enable_origins: bool = True,
incremental: bool = True,
with_github_session=True,
process_pom_files: bool = True,
):
"""Lister class for Maven repositories.
Args:
url: main URL of the Maven repository, i.e. url of the base index
used to fetch maven artifacts. For Maven central use
https://repo1.maven.org/maven2/
instance: Name of maven instance. Defaults to url's network location
if unset.
incremental: defaults to :const:`True`. Defines if incremental listing
is activated or not.
with_github_session: defaults to :const:`True`. Defines if canonical
URL for extracted github repository should be retrieved with the
GitHub REST API.
"""
self.BASE_URL = url.rstrip("/") + "/"
self.incremental = incremental
super().__init__(
scheduler=scheduler,
credentials=credentials,
url=self.BASE_URL,
instance=instance,
with_github_session=with_github_session,
max_origins_per_page=max_origins_per_page,
max_pages=max_pages,
enable_origins=enable_origins,
)
self.session.headers.update({"Accept": "application/json"})
self.jar_origin: Optional[ListedOrigin] = None
self.jar_origin_docs: List[int] = []
self.last_origin_url: Optional[str] = None
self.last_seen_doc = self.state.last_seen_doc
self.last_seen_pom = self.state.last_seen_pom
self.process_pom_files = process_pom_files
[docs]
def state_from_dict(self, d: Dict[str, Any]) -> MavenListerState:
return MavenListerState(**d)
[docs]
def state_to_dict(self, state: MavenListerState) -> Dict[str, Any]:
return asdict(state)
[docs]
def get_pages(self) -> Iterator[RepoPage]:
"""Retrieve and parse exported maven indexes to
identify all pom files and src archives.
"""
# Example of returned RepoPage's:
# [
# {
# "type": "maven",
# "url": "https://maven.xwiki.org/..-5.4.2-sources.jar",
# "time": 1626109619335,
# "gid": "org.xwiki.platform",
# "aid": "xwiki-platform-wikistream-events-xwiki",
# "version": "5.4.2"
# },
# {
# "type": "scm",
# "url": "scm:git:git://github.com/openengsb/openengsb-framework.git",
# "project": "openengsb-framework",
# },
# ...
# ]
out_pom: Dict = {}
with tempfile.TemporaryDirectory() as tmpdir:
work_dir = os.path.join(tmpdir, "work")
publish_dir = os.path.join(tmpdir, "publish")
os.makedirs(work_dir, exist_ok=True)
os.makedirs(publish_dir, exist_ok=True)
# Execute maven index exporter tool to dump maven repository
# index to NDJSON format
subprocess.check_call(
[
"python3",
"/opt/maven-index-exporter/run_full_export.py",
"--base-url",
self.BASE_URL,
"--work-dir",
work_dir,
"--publish-dir",
publish_dir,
],
)
# Remove no longer needed files to save some disk space
shutil.rmtree(work_dir)
# Read NDJSON file line by line and process it, documents are sorted
# by groupId and artifactId so every versions of a given maven package
# are processed sequentially.
with gzip.open(
os.path.join(publish_dir, "maven-index-export.ndjson.gz"),
mode="rt",
encoding="utf-8",
errors="ignore",
) as f:
reader = ndjson.reader(f)
for entry in reader:
doc_id = entry["doc"]
gid, aid, version, classifier, ext = entry["u"].split("|")
ext = ext.strip()
path = "/".join(gid.split("."))
if (
self.process_pom_files
and classifier == "NA"
and ext.lower() == "pom"
):
# Store pom file URL to extract SCM URLs at end of listing
# process.If incremental mode, we don't record any pom file
# that is before our last recorded doc id.
if self.incremental and self.last_seen_pom >= doc_id:
continue
url_path = f"{path}/{aid}/{version}/{aid}-{version}.{ext}"
url_pom = urljoin(
self.BASE_URL,
url_path,
)
out_pom[url_pom] = doc_id
elif (
classifier.lower() == "sources" or ("src" in classifier)
) and ext.lower() in ("zip", "jar"):
# Yield maven source package info
url_path = (
f"{path}/{aid}/{version}/{aid}-{version}-{classifier}.{ext}"
)
url_src = urljoin(self.BASE_URL, url_path)
m_time = entry["i"].split("|")[1]
artifact_metadata_d = {
"type": "maven",
"url": url_src,
"doc": doc_id,
"gid": gid,
"aid": aid,
"version": version,
"time": int(m_time),
}
logger.debug(
"* Yielding jar %s: %s", url_src, artifact_metadata_d
)
yield artifact_metadata_d
# Notify that maven index listing has finished
yield None
if self.process_pom_files:
# Now fetch pom files and scan them for scm info.
logger.info("Found %s poms.", len(out_pom))
logger.info("Fetching poms ...")
for pom_url in out_pom:
try:
response = self.http_request(pom_url)
parsed_pom = BeautifulSoup(response.content, "xml")
connection = parsed_pom.select_one("project scm connection")
if connection is not None:
artifact_metadata_d = {
"type": "scm",
"doc": out_pom[pom_url],
"url": connection.text,
}
logger.debug(
"* Yielding pom %s: %s", pom_url, artifact_metadata_d
)
yield artifact_metadata_d
else:
logger.debug("No project.scm.connection in pom %s", pom_url)
except requests.HTTPError:
logger.warning(
"POM info page could not be fetched, skipping project '%s'",
pom_url,
)
except etree.Error as error:
logger.info("Could not parse POM %s XML: %s.", pom_url, error)
[docs]
def get_scm(self, page: RepoPage) -> Optional[ListedOrigin]:
"""Retrieve scm origin out of the page information. Only called when type of the
page is scm.
Try and detect an scm/vcs repository. Note that official format is in the form:
scm:{type}:git://example.org/{user}/{repo}.git but some projects directly put
the repo url (without the "scm:type"), so we have to check against the content
to extract the type and url properly.
Raises
AssertionError when the type of the page is not 'scm'
Returns
ListedOrigin with proper canonical scm url (for github) if any is found,
None otherwise.
"""
assert page and page["type"] == "scm"
visit_type: Optional[str] = None
url: Optional[str] = None
m_scm = re.match(r"^scm:(?P<type>[^:]+):(?P<url>.*)$", page["url"])
if m_scm is None:
return None
scm_type = m_scm.group("type")
if scm_type and scm_type in SUPPORTED_SCM_TYPES:
url = m_scm.group("url")
visit_type = scm_type
elif page["url"].endswith(".git"):
url = page["url"].lstrip("scm:")
visit_type = "git"
else:
return None
if self.github_session and url and visit_type == "git":
# Non-github urls will be returned as is, github ones will be canonical ones
url = self.github_session.get_canonical_url(url)
if not url:
return None
if "${" in url:
# A handful of URLs contain templated strings (21, as of 2023-01-30)
# We could implement support for
# https://maven.apache.org/guides/introduction/introduction-to-the-pom.html#Project_Interpolation_and_Variables
# but most of them seem to use variables not defined in this spec.
return None
assert visit_type is not None
assert self.lister_obj.id is not None
return ListedOrigin(
lister_id=self.lister_obj.id,
url=url,
visit_type=visit_type,
)
[docs]
def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
"""Convert a page of Maven repositories into a list of ListedOrigins."""
if page is None:
# maven index listing has finished, yield last ListedOrigin if any
if self.jar_origin and (
not self.incremental
or (any(doc > self.last_seen_doc for doc in self.jar_origin_docs))
):
yield self.jar_origin
elif page["type"] == "scm":
listed_origin = self.get_scm(page)
if listed_origin:
yield listed_origin
elif page["type"] == "maven":
# Origin is gathering source archives:
last_update_dt = None
last_update_iso = ""
try:
last_update_seconds = str(page["time"])[:-3]
last_update_dt = datetime.fromtimestamp(int(last_update_seconds))
last_update_dt = last_update_dt.astimezone(timezone.utc)
except (OverflowError, ValueError):
logger.warning("- Failed to convert datetime %s.", last_update_seconds)
if last_update_dt:
last_update_iso = last_update_dt.isoformat()
# Origin URL will target page holding sources for all versions of
# an artifactId (package name) inside a groupId (namespace)
path = "/".join(page["gid"].split("."))
origin_url = urljoin(self.BASE_URL, f"{path}/{page['aid']}")
artifact = {
**{k: v for k, v in page.items() if k != "doc"},
"time": last_update_iso,
"base_url": self.BASE_URL,
}
if origin_url != self.last_origin_url:
# All versions of a given maven package have been processed,
# current ListedOrigin can be yielded
if self.jar_origin and (
not self.incremental
# In incremental mode, yield maven origin only if it has new
# versions since last listing
or (any(doc > self.last_seen_doc for doc in self.jar_origin_docs))
):
yield self.jar_origin
# Keep track of maven index documents ids for package versions
self.jar_origin_docs = [page["doc"]]
# Create ListedOrigin instance for newly seen maven package
assert self.lister_obj.id is not None
self.jar_origin = ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type=page["type"],
last_update=last_update_dt,
extra_loader_arguments={"artifacts": [artifact]},
)
elif self.jar_origin:
# Update list of source artifacts for the current ListedOrigin
artifacts = self.jar_origin.extra_loader_arguments["artifacts"]
if artifact not in artifacts:
artifacts.append(artifact)
self.jar_origin_docs.append(page["doc"])
if (
self.jar_origin.last_update
and last_update_dt
and last_update_dt > self.jar_origin.last_update
):
self.jar_origin.last_update = last_update_dt
self.last_origin_url = origin_url
[docs]
def commit_page(self, page: RepoPage) -> None:
"""Update currently stored state using the latest listed doc.
Note: this is a noop for full listing mode
"""
if self.incremental and self.state:
# We need to differentiate the two state counters according
# to the type of origin.
if (
page
and page["type"] == "maven"
and page["doc"] > self.state.last_seen_doc
):
self.state.last_seen_doc = page["doc"]
elif (
page
and page["type"] == "scm"
and page["doc"] > self.state.last_seen_pom
):
self.state.last_seen_doc = page["doc"]
self.state.last_seen_pom = page["doc"]
[docs]
def finalize(self) -> None:
"""Finalize the lister state, set update if any progress has been made.
Note: this is a noop for full listing mode
"""
if self.incremental and self.state:
last_seen_doc = self.state.last_seen_doc
last_seen_pom = self.state.last_seen_pom
if last_seen_doc and last_seen_pom:
if (self.last_seen_doc < last_seen_doc) or (
self.last_seen_pom < last_seen_pom
):
self.updated = True