# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from codecs import escape_decode
import json
from pathlib import Path
import re
import subprocess
from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Union
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.loader.mercurial.utils import get_minimum_env
from swh.model.cli import identify_object
from swh.model.git_objects import normalize_timestamp
from swh.model.hashutil import hash_to_bytehex
from swh.model.model import RevisionType
from swh.model.swhids import CoreSWHID, ObjectType
# Matches a line made of a 40-character hexadecimal node id, one or more
# spaces, and a non-empty name (group 1: node id, group 2: name).
TAG_PATTERN = re.compile(rb"([0-9A-Fa-f]{40}) +(.+)")
[docs]
class HgAuthor(NamedTuple):
    """Author of a Mercurial revision, split into its components."""

    fullname: bytes
    """complete author string"""
    name: Optional[bytes]
    """name part of the author string, when there is one"""
    email: Optional[bytes]
    """email part of the author string, when there is one"""

    @staticmethod
    def from_bytes(data: bytes) -> "HgAuthor":
        """Build an HgAuthor from a raw author string.

        The parsing itself is delegated to
        ``swh.loader.mercurial.converters.parse_author``; the expected
        input looks like "name <email>".
        """
        # Imported lazily so that loading this module stays cheap.
        from swh.loader.mercurial.converters import parse_author

        parsed = parse_author(data)
        return HgAuthor(**{field: parsed[field] for field in HgAuthor._fields})

    def to_dict(self) -> Dict[str, Optional[bytes]]:
        """Return the author as a ``{"fullname", "name", "email"}`` dict."""
        return dict(zip(self._fields, self))
# Log template for HgRevision.from_bytes: one "key:value" line per field,
# with no trailing newline.  The "\n" inside the last entry is a real newline
# character embedded in the template's join() separator.
HG_REVISION_TEMPLATE = (
    "node_id:{node}\n"
    "author:{author}\n"
    "timestamp_offset:{date|json}\n"
    "p1:{p1.node}\n"
    "p2:{p2.node}\n"
    "extras:{join(extras, '\nextras:')}"
)

# Value used when no parent
NULL_NODE_ID = b"0" * 40
[docs]
class HgRevision(NamedTuple):
    """Represent a Mercurial revision."""

    node_id: bytes
    """hex bytes of the revision hash"""
    author: HgAuthor
    """author of the revision"""
    timestamp: Union[int, float]
    """timestamp of the revision (number decoded from ``{date|json}``)"""
    offset: Union[int, float]
    """timezone offset of the revision (number decoded from ``{date|json}``)"""
    parents: List[bytes]
    """hex bytes of the revision's parents"""
    extras: Dict[bytes, bytes]
    """metadata of the revision"""
    description: bytes
    """description of the revision"""

    @staticmethod
    def from_bytes(data: bytes, description: bytes) -> "HgRevision":
        """Convert bytes to an HgRevision named tuple.

        Expected data format (one ``key:value`` pair per line)::

            node_id:{node}
            author:{author}
            timestamp_offset:[{timestamp}, {offset}]
            p1:{p1}
            p2:{p2}
            extras:{key1}={value1}
            ...
            extras:{keyn}={valuen}

        Args:
            data: revision fields in the format above
            description: revision description, stored as is

        Raises:
            ValueError: if a non-empty line has no ``:`` separator or an
                ``extras`` value has no ``=`` separator
            TypeError: if a required field is missing from ``data`` or an
                unknown key is present
        """
        fields: Dict[str, Any] = {
            "parents": [],
            "extras": {},
            "description": description,
        }

        for line in data.split(b"\n"):
            if not line:
                # Blank lines (e.g. a trailing newline) carry no field.
                continue
            key, value = line.split(b":", 1)
            if key == b"timestamp_offset":
                # The value is a JSON array: [timestamp, offset]
                timestamp, offset = json.loads(value)
                fields["timestamp"] = timestamp
                fields["offset"] = offset
            elif key in (b"p1", b"p2"):
                # An absent parent is reported as the null node id
                if value != NULL_NODE_ID:
                    fields["parents"].append(value)
            elif key == b"extras":
                extra_key, extra_value = value.split(b"=", 1)
                fields["extras"][extra_key] = extra_value
            elif key == b"author":
                fields["author"] = HgAuthor.from_bytes(value)
            else:
                fields[key.decode()] = value

        return HgRevision(**fields)

    def branch(self) -> bytes:
        """Return the ``branch`` extra of the revision, ``b"default"`` if unset."""
        return self.extras.get(b"branch", b"default")

    def to_dict(self) -> Dict:
        """Convert a HgRevision to a dict for SWHID computation"""
        date = normalize_timestamp(int(self.timestamp))

        extra_headers = [
            (b"time_offset_seconds", str(self.offset).encode("utf-8")),
        ]

        for key, value in self.extras.items():
            if key == b"branch" and value == b"default":
                # branch default is skipped to match historical implementation
                continue
            if key == b"transplant_source":
                # transplant_source is converted to hex
                # to match historical implementation
                value = hash_to_bytehex(escape_decode(value)[0])
            extra_headers.append((key, value))

        # The same author dict is used for both author and committer.
        author = self.author.to_dict()

        return {
            "author": author,
            "date": date,
            "committer": author,
            "committer_date": date,
            "type": RevisionType.MERCURIAL.value,
            "message": self.description,
            "metadata": {"node": self.node_id},
            "extra_headers": tuple(extra_headers),
            "synthetic": False,
            "parents": self.parents,
        }
[docs]
class HgBranch(NamedTuple):
    """A Mercurial named branch and the revision it points to."""

    name: bytes
    """branch name"""
    node_id: bytes
    """node id of the target revision, as printed by the ``{node}`` keyword"""
[docs]
class HgTag(NamedTuple):
    """A Mercurial tag and the revision it points to."""

    name: bytes
    """tag name"""
    node_id: bytes
    """hex bytes of the tagged revision"""
[docs]
class Hg:
    """Provide methods to extract data from a Mercurial repository.

    Queries are answered by running the ``hg`` executable with the repository
    root as working directory and the environment returned by
    ``get_minimum_env()``.
    """

    def __init__(self, repository_root: Path) -> None:
        self._root = repository_root

    def _output(self, *args) -> bytes:
        """Return the output of a `hg` call.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status
        """
        return subprocess.check_output(
            ["hg", *args], cwd=self._root, env=get_minimum_env()
        )

    def _call(self, *args) -> None:
        """Perform a `hg` call, discarding whatever it prints.

        Raises:
            subprocess.CalledProcessError: if `hg` exits with a non-zero status
        """
        # subprocess.run() drains both pipes while waiting for the child.
        # check_call() never reads them, so a chatty command could block
        # forever once the OS pipe buffer is full.
        subprocess.run(
            ["hg", *args],
            cwd=self._root,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            env=get_minimum_env(),
            check=True,
        )

    def root(self) -> Path:
        """Return the root of the Mercurial repository."""
        return self._root

    def log(self, rev: Optional[Union[bytes, str]] = None) -> List[HgRevision]:
        """Return the specified revisions of the Mercurial repository.

        Mercurial revsets are supported. (See `hg help revsets`)

        If no revision range is specified, return all revisions.

        The revisions are returned in the reverse of the order in which
        `hg log` printed them.
        """
        args: List[Union[bytes, str]] = ["log"]
        if rev:
            args += ["-r", rev]
        args += ["-T", "{node}\n"]

        node_ids = self._output(*args).splitlines()
        return [self._revision(node_id) for node_id in reversed(node_ids)]

    def _revision(self, revision: bytes) -> HgRevision:
        """Load a single revision, including its unstripped description."""
        data = self._output("log", "-r", revision, "-T", HG_REVISION_TEMPLATE)

        # hg log strips the description so the raw description has to be taken
        # from debugdata
        # The description follows some metadata and is separated from them
        # by an empty line
        _, desc = self._output("debugdata", "-c", revision).split(b"\n\n", 1)

        return HgRevision.from_bytes(data, desc)

    def up(self, rev: bytes) -> None:
        """Update the repository working directory to the specified revision."""
        self._call("up", rev)

    def branches(self) -> List[HgBranch]:
        """List the repository named branches.

        Returns an empty list when `hg branches` prints nothing.
        """
        output = self._output("branches", "-T", "{branch}\n{node}\n\n").strip()

        branches = []
        for block in output.split(b"\n\n"):
            if not block:
                # Splitting an empty output yields a single empty block,
                # which cannot be unpacked into (name, node_id).
                continue
            name, node_id = block.splitlines()
            branches.append(HgBranch(name=name, node_id=node_id))

        return branches

    def tip(self) -> HgRevision:
        """Return the revision selected by the `tip` revset."""
        return self.log("tip")[0]

    def tags(self) -> List[HgTag]:
        """Return the tags declared in the `.hgtags` file of the working directory.

        `.hgtags` is read from disk, so the result depends on the revision the
        working directory is currently updated to. Lines that do not match
        `TAG_PATTERN` are ignored. When a tag name appears several times the
        last entry wins, and an entry pointing to the null node id removes the
        tag.

        Returns an empty list when there is no `.hgtags` file.
        """
        hgtags = self._root / ".hgtags"
        if not hgtags.is_file():
            return []

        node_id_by_name: Dict[bytes, bytes] = {}
        for line in hgtags.read_bytes().splitlines():
            match = TAG_PATTERN.match(line)
            if match is None:
                continue
            node_id, name = match.groups()
            if node_id == NULL_NODE_ID:
                node_id_by_name.pop(name, None)
            else:
                node_id_by_name[name] = node_id

        return [
            HgTag(name=name, node_id=node_id)
            for name, node_id in node_id_by_name.items()
        ]
@click.group()
@click.option(
    "--directory",
    "-d",
    help=("Path to the Mercurial repository. If unset, the current directory is used"),
)
@click.pass_context
def main(ctx, directory=None):
    """Compute the Software Heritage persistent identifier (SWHID) for the given
    source code object(s).
    For more details about SWHIDs see:
    https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
    """
    # The sub-commands read the repository root from ctx.obj, so make sure it
    # is a dict even when click did not create one for us.
    ctx.ensure_object(dict)

    # An empty Path() stands for the current directory.
    if directory:
        root = Path(directory)
    else:
        root = Path()

    if not root.exists():
        raise IOError(f"{root!r} does not exists")

    ctx.obj["HG_ROOT"] = root
[docs]
def identify_directory(path: Path) -> CoreSWHID:
    """Compute the directory SWHID of ``path``.

    Symlinks are followed and entries matching ``.hg`` are excluded from the
    computation.
    """
    swhid_text = identify_object(
        "directory",
        follow_symlinks=True,
        exclude_patterns=[b".hg"],
        obj=str(path),
    )
    return CoreSWHID.from_string(swhid_text)
[docs]
class RevisionIdentity(NamedTuple):
    """SWHIDs computed for one Mercurial revision."""

    swhid: CoreSWHID
    """SWHID of the revision"""
    node_id: bytes
    """hex bytes of the Mercurial node id"""
    directory_swhid: CoreSWHID
    """SWHID of the revision's root directory"""

    def dir_uri(self) -> str:
        """Return the directory SWHID and the node id, separated by a tab."""
        return "{}\t{}".format(self.directory_swhid, self.node_id.decode())

    def __str__(self) -> str:
        """Return the revision SWHID and the node id, separated by a tab."""
        return "{}\t{}".format(self.swhid, self.node_id.decode())
[docs]
def identify_revision(
    hg: Hg,
    rev: Optional[bytes] = None,
    node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None,
) -> Iterator[RevisionIdentity]:
    """Yield the identity of each selected revision of the repository.

    Args:
        hg: A `Hg` repository instance
        rev: An optional revision or Mercurial revsets (See `hg help revsets`)
            If not provided all the repository revisions will be computed.
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            It will be updated in place with new mappings.

    Note:
        The working directory of the repository is updated to every revision
        that gets identified, including parents resolved on the fly.
    """
    from swh.model.model import Revision

    cache = {} if node_id_2_swhid is None else node_id_2_swhid

    for hg_revision in hg.log(rev):
        revision_dict = hg_revision.to_dict()

        # Check the revision out first: the directory SWHID is computed from
        # the files found under the repository root.
        hg.up(hg_revision.node_id)
        directory_swhid = identify_directory(hg.root())
        revision_dict["directory"] = directory_swhid.object_id

        parent_ids = []
        for parent_node in revision_dict["parents"]:
            if parent_node not in cache:
                # Unknown parent: identify it (recursively) before going on.
                parent_identity = next(identify_revision(hg, parent_node, cache))
                cache[parent_node] = parent_identity.swhid
            parent_swhid = cache[parent_node]
            assert parent_swhid.object_type == ObjectType.REVISION
            parent_ids.append(parent_swhid.object_id)
        revision_dict["parents"] = parent_ids

        swhid = Revision.from_dict(revision_dict).swhid()
        cache[hg_revision.node_id] = swhid

        yield RevisionIdentity(
            swhid=swhid,
            node_id=hg_revision.node_id,
            directory_swhid=directory_swhid,
        )
[docs]
class ReleaseIdentity(NamedTuple):
    """SWHID computed for one release (Mercurial tag)."""

    swhid: CoreSWHID
    """SWHID of the release"""
    node_id: bytes
    """hex bytes of the node id the release targets"""
    name: bytes
    """release name"""

    def __str__(self) -> str:
        """Return the release SWHID and the release name, separated by a tab."""
        return "{}\t{}".format(self.swhid, self.name.decode())
[docs]
def identify_release(
    hg: Hg,
    node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None,
) -> Iterator[ReleaseIdentity]:
    """Yield the identity of each release (tag) of the repository.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.
    """
    from swh.model.model import Release, ReleaseTargetType

    if node_id_2_swhid is None:
        node_id_2_swhid = {}
        for identity in identify_revision(hg):
            node_id_2_swhid[identity.node_id] = identity.swhid

    for tag in hg.tags():
        target_swhid = node_id_2_swhid[tag.node_id]
        assert target_swhid.object_type == ObjectType.REVISION

        # The release carries nothing but a name and a target: no message,
        # no date, and an empty author.
        release = Release.from_dict(
            {
                "name": tag.name,
                "target": target_swhid.object_id,
                "target_type": ReleaseTargetType.REVISION.value,
                "message": None,
                "metadata": None,
                "synthetic": False,
                "author": {"name": None, "email": None, "fullname": b""},
                "date": None,
            }
        )

        yield ReleaseIdentity(
            swhid=release.swhid(),
            node_id=tag.node_id,
            name=tag.name,
        )
[docs]
def identify_snapshot(
    hg: Hg,
    node_id_2_swhid: Optional[Dict[bytes, CoreSWHID]] = None,
    releases: Optional[List[ReleaseIdentity]] = None,
) -> CoreSWHID:
    """Compute the SWHID of the repository snapshot.

    Args:
        hg: A `Hg` repository instance
        node_id_2_swhid: An optional cache mapping hg node ids to SWHIDs
            If not provided it will be computed using `identify_revision`.
        releases: an optional list of `ReleaseIdentity`.
            If not provided it will be computed using `identify_release`.
    """
    from swh.model.model import Snapshot, SnapshotTargetType

    if node_id_2_swhid is None:
        node_id_2_swhid = {
            identity.node_id: identity.swhid for identity in identify_revision(hg)
        }

    if releases is None:
        releases = list(identify_release(hg, node_id_2_swhid))

    # HEAD is an alias to the branch of the tip revision.
    branches = {
        b"HEAD": {
            "target": hg.tip().branch(),
            "target_type": SnapshotTargetType.ALIAS.value,
        }
    }

    for hg_branch in hg.branches():
        branch_swhid = node_id_2_swhid[hg_branch.node_id]
        assert branch_swhid.object_type == ObjectType.REVISION
        branches[hg_branch.name] = {
            "target": branch_swhid.object_id,
            "target_type": SnapshotTargetType.REVISION.value,
        }

    # Releases are added last: on a name clash they replace the entry above.
    for release_identity in releases:
        assert release_identity.swhid.object_type == ObjectType.RELEASE
        branches[release_identity.name] = {
            "target": release_identity.swhid.object_id,
            "target_type": SnapshotTargetType.RELEASE.value,
        }

    return Snapshot.from_dict({"branches": branches}).swhid()
@main.command()
@click.argument("rev", required=False)
@click.pass_context
def revision(ctx, rev):
    """Compute the SWHID of a given revision.
    If specified REV allow to select a single or multiple revisions
    (using the Mercurial revsets language: `hg help revsets`)
    """
    repository = Hg(ctx.obj["HG_ROOT"])

    # One output line per revision, formatted by RevisionIdentity.__str__
    for revision_identity in identify_revision(repository, rev):
        click.echo(revision_identity)
@main.command()
@click.pass_context
def snapshot(ctx):
    """Compute the SWHID of the snapshot."""
    repository_root = ctx.obj["HG_ROOT"]

    # Output: the snapshot SWHID and the repository root, tab separated.
    swhid = identify_snapshot(Hg(repository_root))
    click.echo(f"{swhid}\t{repository_root}")
@main.command()
@click.pass_context
def all(ctx):
    """Compute the SWHID of all the repository objects."""
    root = ctx.obj["HG_ROOT"]
    hg = Hg(root)

    # Identify everything first; nothing is printed until all SWHIDs are known.
    revision_identities = list(identify_revision(hg))
    node_id_2_swhid = {
        identity.node_id: identity.swhid for identity in revision_identities
    }
    release_identities = list(identify_release(hg, node_id_2_swhid))
    snapshot_swhid = identify_snapshot(hg, node_id_2_swhid, release_identities)

    # Output order: directories, revisions, releases, then the snapshot.
    output_lines = [identity.dir_uri() for identity in revision_identities]
    output_lines.extend(str(identity) for identity in revision_identities)
    output_lines.extend(str(identity) for identity in release_identities)

    for line in output_lines:
        click.echo(line)
    click.echo(f"{snapshot_swhid}\t{root}")
# Run the click command group when this module is executed as a script.
if __name__ == "__main__":
    main()