# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
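"""Tests for swh.dataset's graph edges exporter (GraphEdgesExporter) and the
graph node sorting pipeline (sort_graph_nodes)."""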
from base64 import b64encode
import collections
import hashlib
from typing import Tuple
from unittest.mock import Mock, call

import pytest

from swh.dataset.exporters.edges import GraphEdgesExporter, sort_graph_nodes
from swh.dataset.utils import ZSTFile
from swh.model.hashutil import MultiHash, hash_to_bytes
from swh.model.model import ModelObjectType

DATE = {
    "timestamp": {"seconds": 1234567891, "microseconds": 0},
    "offset": 120,
    "negative_utc": False,
}

TEST_CONTENT = {
    **MultiHash.from_data(b"foo").digest(),
    "length": 3,
    "status": "visible",
}

TEST_REVISION = {
    "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
    "message": b"hello",
    "date": DATE,
    "committer": {"fullname": b"foo", "name": b"foo", "email": b""},
    "author": {"fullname": b"foo", "name": b"foo", "email": b""},
    "committer_date": DATE,
    "type": "git",
    "directory": b"\x01" * 20,
    "synthetic": False,
    "metadata": None,
    "parents": [],
}

TEST_RELEASE = {
    "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
    "name": b"v0.0.1",
    "date": {
        "timestamp": {"seconds": 1234567890, "microseconds": 0},
        "offset": 120,
        "negative_utc": False,
    },
    "author": {"fullname": b"foo", "name": b"foo", "email": b""},
    "target_type": "revision",
    "target": b"\x04" * 20,
    "message": b"foo",
    "synthetic": False,
}

TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"}
TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"}
TEST_ORIGIN_VISIT_STATUS = {
    "origin": TEST_ORIGIN["url"],
    "visit": 1,
    "date": "2013-05-07 04:20:39.369271+00:00",
    "snapshot": None,  # TODO
    "status": "ongoing",  # TODO
    "metadata": {"foo": "bar"},
}


class FakeDiskSet(set):
"""
A set with an add() method that returns whether the item has been added
or was already there. Used to replace disk sets in unittests.
"""
    def add(self, v):
        assert isinstance(v, bytes)
        r = v not in self
        super().add(v)
        return r


@pytest.fixture
def exporter():
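    """Build a GraphEdgesExporter whose output writers are replaced by mocks.

    The returned callable feeds a {object_type: [objects]} mapping through the
    exporter, then returns the mocked node and edge ``write`` methods so tests
    can inspect what was written.
    """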
    def wrapped(messages, config=None) -> Tuple[Mock, Mock]:
        if config is None:
            config = {}
        exporter = GraphEdgesExporter(config, "/dummy_path")
        node_writer = Mock()
        edge_writer = Mock()
        exporter.get_writers_for = lambda *a, **k: (  # type: ignore
            node_writer,
            edge_writer,
        )
        for object_type, objects in messages.items():
            for obj in objects:
                exporter.process_object(object_type, obj)
        return node_writer.write, edge_writer.write

    return wrapped


def binhash(s):
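    """Return the SHA1 digest of a string, as bytes."""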
    return hashlib.sha1(s.encode()).digest()


def hexhash(s):
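    """Return the SHA1 digest of a string, as a hex string."""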
    return hashlib.sha1(s.encode()).hexdigest()


def b64e(s: str) -> str:
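    """Base64-encode a string and decode the result back to a string."""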
    return b64encode(s.encode()).decode()


def test_export_origin(exporter):
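    """Origins are exported as nodes only; they produce no edges."""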
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.ORIGIN: [
                {"url": "ori1"},
                {"url": "ori2"},
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:ori:{hexhash('ori1')}\n"),
        call(f"swh:1:ori:{hexhash('ori2')}\n"),
    ]
    assert edge_writer.mock_calls == []


def test_export_origin_visit_status(exporter):
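    """Each origin visit status yields an origin -> snapshot edge, no node."""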
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.ORIGIN_VISIT_STATUS: [
                {
                    **TEST_ORIGIN_VISIT_STATUS,
                    "origin": "ori1",
                    "snapshot": binhash("snp1"),
                },
                {
                    **TEST_ORIGIN_VISIT_STATUS,
                    "origin": "ori2",
                    "snapshot": binhash("snp2"),
                },
            ]
        }
    )
    assert node_writer.mock_calls == []
    assert edge_writer.mock_calls == [
        call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
        call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
    ]


def test_export_snapshot_simple(exporter):
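    """Snapshots are exported as nodes, with one edge per branch, labeled with
    the base64-encoded branch name."""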
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.SNAPSHOT: [
                {
                    "id": binhash("snp1"),
                    "branches": {
                        b"refs/heads/master": {
                            "target": binhash("rev1"),
                            "target_type": "revision",
                        },
                        b"HEAD": {"target": binhash("rev1"), "target_type": "revision"},
                    },
                },
                {
                    "id": binhash("snp2"),
                    "branches": {
                        b"refs/heads/master": {
                            "target": binhash("rev1"),
                            "target_type": "revision",
                        },
                        b"HEAD": {"target": binhash("rev2"), "target_type": "revision"},
                        b"bcnt": {"target": binhash("cnt1"), "target_type": "content"},
                        b"bdir": {
                            "target": binhash("dir1"),
                            "target_type": "directory",
                        },
                        b"brel": {"target": binhash("rel1"), "target_type": "release"},
                        b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"},
                    },
                },
                {"id": binhash("snp3"), "branches": {}},
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:snp:{hexhash('snp1')}\n"),
        call(f"swh:1:snp:{hexhash('snp2')}\n"),
        call(f"swh:1:snp:{hexhash('snp3')}\n"),
    ]
    assert edge_writer.mock_calls == [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('HEAD')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}"
            f" {b64e('HEAD')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}"
            f" {b64e('bcnt')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}"
            f" {b64e('bdir')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}"
            f" {b64e('brel')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}"
            f" {b64e('bsnp')}\n"
        ),
    ]


def test_export_snapshot_aliases(exporter):
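    """Alias branches are resolved, transitively, to their final target."""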
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.SNAPSHOT: [
                {
                    "id": binhash("snp1"),
                    "branches": {
                        b"origin_branch": {
                            "target": binhash("rev1"),
                            "target_type": "revision",
                        },
                        b"alias1": {"target": b"origin_branch", "target_type": "alias"},
                        b"alias2": {"target": b"alias1", "target_type": "alias"},
                        b"alias3": {"target": b"alias2", "target_type": "alias"},
                    },
                },
            ]
        }
    )
    assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]
    assert edge_writer.mock_calls == [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('origin_branch')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias1')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias2')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias3')}\n"
        ),
    ]


def test_export_snapshot_no_pull_requests(exporter):
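    """With remove_pull_requests set, branches under refs/pull,
    refs/merge-requests and refs/patch are dropped; heads and tags are kept."""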
    snp = {
        "id": binhash("snp1"),
        "branches": {
            b"refs/heads/master": {
                "target": binhash("rev1"),
                "target_type": "revision",
            },
            b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"},
            b"refs/merge-requests/lol": {
                "target": binhash("rev3"),
                "target_type": "revision",
            },
            b"refs/tags/v1.0.0": {
                "target": binhash("rev4"),
                "target_type": "revision",
            },
            b"refs/patch/123456abc": {
                "target": binhash("rev5"),
                "target_type": "revision",
            },
        },
    }

    node_writer, edge_writer = exporter({ModelObjectType.SNAPSHOT: [snp]})
    assert edge_writer.mock_calls == [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}"
            f" {b64e('refs/pull/42')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}"
            f" {b64e('refs/merge-requests/lol')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
            f" {b64e('refs/tags/v1.0.0')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}"
            f" {b64e('refs/patch/123456abc')}\n"
        ),
    ]

    node_writer, edge_writer = exporter(
        {ModelObjectType.SNAPSHOT: [snp]}, config={"remove_pull_requests": True}
    )
    assert edge_writer.mock_calls == [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
            f" {b64e('refs/tags/v1.0.0')}\n"
        ),
    ]


def test_export_releases(exporter):
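    """Releases are exported as nodes with a single edge to their target,
    whatever the target type."""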
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.RELEASE: [
                {
                    **TEST_RELEASE,
                    "id": binhash("rel1"),
                    "target": binhash("rev1"),
                    "target_type": "revision",
                },
                {
                    **TEST_RELEASE,
                    "id": binhash("rel2"),
                    "target": binhash("rel1"),
                    "target_type": "release",
                },
                {
                    **TEST_RELEASE,
                    "id": binhash("rel3"),
                    "target": binhash("dir1"),
                    "target_type": "directory",
                },
                {
                    **TEST_RELEASE,
                    "id": binhash("rel4"),
                    "target": binhash("cnt1"),
                    "target_type": "content",
                },
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:rel:{hexhash('rel1')}\n"),
        call(f"swh:1:rel:{hexhash('rel2')}\n"),
        call(f"swh:1:rel:{hexhash('rel3')}\n"),
        call(f"swh:1:rel:{hexhash('rel4')}\n"),
    ]
    assert edge_writer.mock_calls == [
        call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"),
        call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"),
        call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"),
    ]


def test_export_revision(exporter):
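    """Revisions yield an edge to their directory, then one edge per parent."""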
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.REVISION: [
                {
                    **TEST_REVISION,
                    "id": binhash("rev1"),
                    "directory": binhash("dir1"),
                    "parents": [binhash("rev2"), binhash("rev3")],
                },
                {
                    **TEST_REVISION,
                    "id": binhash("rev2"),
                    "directory": binhash("dir2"),
                    "parents": [],
                },
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:rev:{hexhash('rev1')}\n"),
        call(f"swh:1:rev:{hexhash('rev2')}\n"),
    ]
    assert edge_writer.mock_calls == [
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"),
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"),
        call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"),
    ]


def test_export_directory(exporter):
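    """Directory entries yield edges labeled with the base64-encoded entry
    name and its permissions."""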
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.DIRECTORY: [
                {
                    "id": binhash("dir1"),
                    "entries": [
                        {
                            "type": "file",
                            "target": binhash("cnt1"),
                            "name": b"cnt1",
                            "perms": 0o644,
                        },
                        {
                            "type": "dir",
                            "target": binhash("dir2"),
                            "name": b"dir2",
                            "perms": 0o755,
                        },
                        {
                            "type": "rev",
                            "target": binhash("rev1"),
                            "name": b"rev1",
                            "perms": 0o160000,
                        },
                    ],
                },
                {"id": binhash("dir2"), "entries": []},
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:dir:{hexhash('dir2')}\n"),
    ]
    assert edge_writer.mock_calls == [
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}"
            f" {b64e('cnt1')} {0o644}\n"
        ),
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}"
            f" {b64e('dir2')} {0o755}\n"
        ),
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('rev1')} {0o160000}\n"
        ),
    ]


def test_export_content(exporter):
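    """Contents are exported as nodes only, identified by their sha1_git."""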
    node_writer, edge_writer = exporter(
        {
            ModelObjectType.CONTENT: [
                {**TEST_CONTENT, "sha1_git": binhash("cnt1")},
                {**TEST_CONTENT, "sha1_git": binhash("cnt2")},
            ]
        }
    )
    assert node_writer.mock_calls == [
        call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
        call(f"swh:1:cnt:{hexhash('cnt2')}\n"),
    ]
    assert edge_writer.mock_calls == []


def zstwrite(fp, lines):
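    """Write the given lines to a zstandard-compressed file."""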
with ZSTFile(fp, "w") as writer:
for line in lines:
writer.write(line + "\n")
def zstread(fp):
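    """Read back the full content of a zstandard-compressed file."""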
with ZSTFile(fp, "r") as reader:
return reader.read()
def test_sort_pipeline(tmp_path):
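    """Exercise sort_graph_nodes end to end: per-type node and edge shards are
    merged into sorted graph-wide files, duplicate edge lines are preserved,
    edge destinations missing from the node shards are added to the node list,
    and the label, count and per-type stats files match the inputs.
    """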
    short_type_mapping = {
        "origin_visit_status": "ori",
        "snapshot": "snp",
        "release": "rel",
        "revision": "rev",
        "directory": "dir",
        "content": "cnt",
    }
    input_nodes = [
        f"swh:1:{short}:{hexhash(short + str(x))}"
        for short in short_type_mapping.values()
        for x in range(4)
    ]
    input_edges = [
        f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}",
        f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
        f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
        f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}",  # missing dest
        f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup1')}",
        f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup2')}",
        f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')}",
        f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')} {b64e('r1')}",
        f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
        f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
        f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}",
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}",  # dup
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}",  # dup
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}",  # missing dest
        f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
        f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')} 42",
        f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')} {b64e('d1')} 1337",
        f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')} {b64e('r1')} 0",
    ]

    for obj_type, short_obj_type in short_type_mapping.items():
        p = tmp_path / obj_type
        p.mkdir()
        edges = [e for e in input_edges if e.startswith(f"swh:1:{short_obj_type}")]
        zstwrite(p / "00.edges.csv.zst", edges[0::2])
        zstwrite(p / "01.edges.csv.zst", edges[1::2])

        nodes = [n for n in input_nodes if n.startswith(f"swh:1:{short_obj_type}")]
        zstwrite(p / "00.nodes.csv.zst", nodes[0::2])
        zstwrite(p / "01.nodes.csv.zst", nodes[1::2])

    sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"})

    output_nodes = zstread(tmp_path / "graph.nodes.csv.zst").split("\n")
    output_edges = zstread(tmp_path / "graph.edges.csv.zst").split("\n")
    output_labels = zstread(tmp_path / "graph.labels.csv.zst").split("\n")
    output_nodes = list(filter(bool, output_nodes))
    output_edges = list(filter(bool, output_edges))
    output_labels = list(filter(bool, output_labels))

    expected_nodes = set(input_nodes) | set(e.split()[1] for e in input_edges)
    assert output_nodes == sorted(expected_nodes)
    assert int((tmp_path / "graph.nodes.count.txt").read_text()) == len(expected_nodes)

    assert sorted(output_edges) == sorted(input_edges)
    assert int((tmp_path / "graph.edges.count.txt").read_text()) == len(input_edges)

    expected_labels = {
        parts[2] for parts in (e.split() for e in input_edges) if len(parts) > 2
    }
    assert output_labels == sorted(expected_labels)

    actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip()
    expected_node_stats = "\n".join(
        sorted(
            "{} {}".format(k, v)
            for k, v in collections.Counter(
                node.split(":")[2] for node in expected_nodes
            ).items()
        )
    )
    assert actual_node_stats == expected_node_stats

    actual_edge_stats = (tmp_path / "graph.edges.stats.txt").read_text().strip()
    expected_edge_stats = "\n".join(
        sorted(
            "{} {}".format(k, v)
            for k, v in collections.Counter(
                "{}:{}".format(edge.split(":")[2], edge.split(":")[5])
                for edge in input_edges
            ).items()
        )
    )
    assert actual_edge_stats == expected_edge_stats