Source code for swh.export.test.test_edges

# Copyright (C) 2020-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from base64 import b64encode
import collections
from datetime import datetime
import hashlib
from typing import Tuple
from unittest.mock import Mock, call

import pytest

from swh.export.exporters.edges import GraphEdgesExporter, sort_graph_nodes
from swh.export.journalprocessor import _turn_message_into_objects
from swh.export.utils import ZSTFile
from swh.model.hashutil import MultiHash, hash_to_bytes
from swh.model.model import ModelObjectType

# Date payload shared by the "date" and "committer_date" fields of
# TEST_REVISION below.
# NOTE(review): "offset" is presumably expressed in minutes -- confirm against
# the swh.model timestamp format.
DATE = {
    "timestamp": {"seconds": 1234567891, "microseconds": 0},
    "offset": 120,
    "negative_utc": False,
}

# Baseline content message: every digest MultiHash computes for b"foo", plus
# its length and status. test_export_content overrides "sha1_git" per case.
TEST_CONTENT = {
    **MultiHash.from_data(b"foo").digest(),
    "length": 3,
    "status": "visible",
}

# Baseline revision message. test_export_revision spreads it and overrides
# "id", "directory" and "parents".
TEST_REVISION = {
    "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
    "message": b"hello",
    "date": DATE,
    "committer": {"fullname": b"foo", "name": b"foo", "email": b""},
    "author": {"fullname": b"foo", "name": b"foo", "email": b""},
    "committer_date": DATE,
    "type": "git",
    "directory": b"\x01" * 20,
    "synthetic": False,
    "metadata": None,
    "parents": [],
}

# Baseline release message. test_export_releases spreads it and overrides
# "id", "target" and "target_type".
# Note that its date is spelled out inline (seconds=1234567890) rather than
# reusing DATE (seconds=1234567891).
TEST_RELEASE = {
    "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
    "name": b"v0.0.1",
    "date": {
        "timestamp": {"seconds": 1234567890, "microseconds": 0},
        "offset": 120,
        "negative_utc": False,
    },
    "author": {"fullname": b"foo", "name": b"foo", "email": b""},
    "target_type": "revision",
    "target": b"\x04" * 20,
    "message": b"foo",
    "synthetic": False,
}

# Minimal origin messages; only TEST_ORIGIN's URL is used below (by
# TEST_ORIGIN_VISIT_STATUS).
TEST_ORIGIN = {"url": "https://somewhere.org/den/fox"}
TEST_ORIGIN_2 = {"url": "https://somewhere.org/den/fox/2"}

# Baseline origin-visit-status message. test_export_origin_visit_status
# spreads it and overrides "origin" and "snapshot".
TEST_ORIGIN_VISIT_STATUS = {
    "origin": TEST_ORIGIN["url"],
    "visit": 1,
    "date": datetime.fromisoformat("2013-05-07 04:20:39.369271+00:00"),
    "snapshot": None,  # TODO
    "status": "ongoing",  # TODO
    "metadata": {"foo": "bar"},
}


class FakeDiskSet(set):
    """
    In-memory stand-in for a disk-backed set, meant for unit tests.

    Unlike ``set.add()``, :meth:`add` tells the caller whether the item was
    newly inserted or was already a member.
    """

    def add(self, v):
        """
        Insert ``v`` into the set.

        ``v`` must be a ``bytes`` instance (checked with an assertion).

        Returns:
            ``True`` if ``v`` was not in the set before this call, ``False``
            if it was already there.
        """
        assert isinstance(v, bytes)
        # Record membership *before* inserting; afterwards it is always True.
        newly_added = v not in self
        super().add(v)
        return newly_added
@pytest.fixture
def exporter():
    """
    Fixture returning a callable that runs messages through a
    ``GraphEdgesExporter`` whose writers are replaced by mocks.

    The returned callable takes ``messages`` (a mapping from object type to a
    list of message dicts) and an optional ``config`` dict, feeds every
    message to the exporter, and returns the ``write`` attributes of the node
    and edge writer mocks so tests can inspect their ``mock_calls``.
    """

    def run_export(
        messages: dict[ModelObjectType, list[dict]], config=None
    ) -> Tuple[Mock, Mock]:
        effective_config = {} if config is None else config
        edges_exporter = GraphEdgesExporter(
            effective_config, "/dummy_path", "/dummy_sensitive_path"
        )
        node_writer = Mock()
        edge_writer = Mock()

        def fake_get_writers_for(*args, **kwargs):
            # Whatever is asked for, always hand back the same mock pair.
            return node_writer, edge_writer

        edges_exporter.get_writers_for = fake_get_writers_for  # type: ignore

        for object_type, message_list in messages.items():
            for message in message_list:
                converted = _turn_message_into_objects(
                    object_type.value, (b"", message)
                )
                obj = converted[1]
                assert obj is not None
                edges_exporter.process_object(object_type, obj)

        return node_writer.write, edge_writer.write

    return run_export
def binhash(s):
    """Return the raw SHA-1 digest (bytes) of the encoded string ``s``."""
    hasher = hashlib.sha1()
    hasher.update(s.encode())
    return hasher.digest()
def hexhash(s):
    """Return the hexadecimal SHA-1 digest (str) of the encoded string ``s``."""
    hasher = hashlib.sha1()
    hasher.update(s.encode())
    return hasher.hexdigest()
def b64e(s: str) -> str:
    """Return the base64 encoding of ``s`` as a ``str``."""
    raw = s.encode()
    encoded = b64encode(raw)
    return encoded.decode()
def test_export_origin(exporter):
    """Each origin yields one ``swh:1:ori:`` node line and no edge."""
    messages = {
        ModelObjectType.ORIGIN: [
            {"url": "ori1"},
            {"url": "ori2"},
        ]
    }

    node_writer, edge_writer = exporter(messages)

    expected_nodes = [
        call(f"swh:1:ori:{hexhash('ori1')}\n"),
        call(f"swh:1:ori:{hexhash('ori2')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes
    assert edge_writer.mock_calls == []
def test_export_origin_visit_status(exporter):
    """Each visit status yields one origin -> snapshot edge and no node."""
    first_status = {
        **TEST_ORIGIN_VISIT_STATUS,
        "origin": "ori1",
        "snapshot": binhash("snp1"),
    }
    second_status = {
        **TEST_ORIGIN_VISIT_STATUS,
        "origin": "ori2",
        "snapshot": binhash("snp2"),
    }

    node_writer, edge_writer = exporter(
        {ModelObjectType.ORIGIN_VISIT_STATUS: [first_status, second_status]}
    )

    expected_edges = [
        call(f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}\n"),
        call(f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}\n"),
    ]
    assert node_writer.mock_calls == []
    assert edge_writer.mock_calls == expected_edges
def test_export_snapshot_simple(exporter):
    """
    Snapshots yield one node each, plus one labelled edge per branch.

    Covers branches targeting revisions, contents, directories, releases and
    snapshots, as well as a snapshot without any branch.
    """
    two_branch_snapshot = {
        "id": binhash("snp1"),
        "branches": {
            b"refs/heads/master": {
                "target": binhash("rev1"),
                "target_type": "revision",
            },
            b"HEAD": {"target": binhash("rev1"), "target_type": "revision"},
        },
    }
    all_target_types_snapshot = {
        "id": binhash("snp2"),
        "branches": {
            b"refs/heads/master": {
                "target": binhash("rev1"),
                "target_type": "revision",
            },
            b"HEAD": {"target": binhash("rev2"), "target_type": "revision"},
            b"bcnt": {"target": binhash("cnt1"), "target_type": "content"},
            b"bdir": {
                "target": binhash("dir1"),
                "target_type": "directory",
            },
            b"brel": {"target": binhash("rel1"), "target_type": "release"},
            b"bsnp": {"target": binhash("snp1"), "target_type": "snapshot"},
        },
    }
    empty_snapshot = {"id": binhash("snp3"), "branches": {}}

    node_writer, edge_writer = exporter(
        {
            ModelObjectType.SNAPSHOT: [
                two_branch_snapshot,
                all_target_types_snapshot,
                empty_snapshot,
            ]
        }
    )

    expected_nodes = [
        call(f"swh:1:snp:{hexhash('snp1')}\n"),
        call(f"swh:1:snp:{hexhash('snp2')}\n"),
        call(f"swh:1:snp:{hexhash('snp3')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes

    # Edge lines are "<source> <target> <base64 branch name>".
    expected_edges = [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('HEAD')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rev:{hexhash('rev2')}"
            f" {b64e('HEAD')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:cnt:{hexhash('cnt1')}"
            f" {b64e('bcnt')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:dir:{hexhash('dir1')}"
            f" {b64e('bdir')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:rel:{hexhash('rel1')}"
            f" {b64e('brel')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp2')} swh:1:snp:{hexhash('snp1')}"
            f" {b64e('bsnp')}\n"
        ),
    ]
    assert edge_writer.mock_calls == expected_edges
def test_export_snapshot_aliases(exporter):
    """
    Alias branches, including aliases of aliases, yield an edge to the
    revision their chain ends at, labelled with the alias' own name.
    """
    aliased_snapshot = {
        "id": binhash("snp1"),
        "branches": {
            b"origin_branch": {
                "target": binhash("rev1"),
                "target_type": "revision",
            },
            b"alias1": {"target": b"origin_branch", "target_type": "alias"},
            b"alias2": {"target": b"alias1", "target_type": "alias"},
            b"alias3": {"target": b"alias2", "target_type": "alias"},
        },
    }

    node_writer, edge_writer = exporter(
        {
            ModelObjectType.SNAPSHOT: [
                aliased_snapshot,
            ]
        }
    )

    assert node_writer.mock_calls == [call(f"swh:1:snp:{hexhash('snp1')}\n")]

    expected_edges = [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('origin_branch')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias1')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias2')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('alias3')}\n"
        ),
    ]
    assert edge_writer.mock_calls == expected_edges
def test_export_snapshot_no_pull_requests(exporter):
    """
    With ``remove_pull_requests`` enabled, only the ``refs/heads/`` and
    ``refs/tags/`` branches of the snapshot yield edges; with the default
    configuration every branch does.
    """
    snapshot = {
        "id": binhash("snp1"),
        "branches": {
            b"refs/heads/master": {
                "target": binhash("rev1"),
                "target_type": "revision",
            },
            b"refs/pull/42": {"target": binhash("rev2"), "target_type": "revision"},
            b"refs/merge-requests/lol": {
                "target": binhash("rev3"),
                "target_type": "revision",
            },
            b"refs/tags/v1.0.0": {
                "target": binhash("rev4"),
                "target_type": "revision",
            },
            b"refs/patch/123456abc": {
                "target": binhash("rev5"),
                "target_type": "revision",
            },
        },
    }
    messages = {ModelObjectType.SNAPSHOT: [snapshot]}

    # Default configuration: every branch is exported.
    node_writer, edge_writer = exporter(messages)
    expected_all_branches = [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev2')}"
            f" {b64e('refs/pull/42')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev3')}"
            f" {b64e('refs/merge-requests/lol')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
            f" {b64e('refs/tags/v1.0.0')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev5')}"
            f" {b64e('refs/patch/123456abc')}\n"
        ),
    ]
    assert edge_writer.mock_calls == expected_all_branches

    # Same snapshot again, this time asking for pull requests to be dropped.
    node_writer, edge_writer = exporter(
        messages, config={"remove_pull_requests": True}
    )
    expected_filtered_branches = [
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('refs/heads/master')}\n"
        ),
        call(
            f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev4')}"
            f" {b64e('refs/tags/v1.0.0')}\n"
        ),
    ]
    assert edge_writer.mock_calls == expected_filtered_branches
def test_export_releases(exporter):
    """
    Releases yield one node each and one edge to their target, whether that
    target is a revision, a release, a directory or a content.
    """
    release_to_revision = {
        **TEST_RELEASE,
        "id": binhash("rel1"),
        "target": binhash("rev1"),
        "target_type": "revision",
    }
    release_to_release = {
        **TEST_RELEASE,
        "id": binhash("rel2"),
        "target": binhash("rel1"),
        "target_type": "release",
    }
    release_to_directory = {
        **TEST_RELEASE,
        "id": binhash("rel3"),
        "target": binhash("dir1"),
        "target_type": "directory",
    }
    release_to_content = {
        **TEST_RELEASE,
        "id": binhash("rel4"),
        "target": binhash("cnt1"),
        "target_type": "content",
    }

    node_writer, edge_writer = exporter(
        {
            ModelObjectType.RELEASE: [
                release_to_revision,
                release_to_release,
                release_to_directory,
                release_to_content,
            ]
        }
    )

    expected_nodes = [
        call(f"swh:1:rel:{hexhash('rel1')}\n"),
        call(f"swh:1:rel:{hexhash('rel2')}\n"),
        call(f"swh:1:rel:{hexhash('rel3')}\n"),
        call(f"swh:1:rel:{hexhash('rel4')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes

    expected_edges = [
        call(f"swh:1:rel:{hexhash('rel1')} swh:1:rev:{hexhash('rev1')}\n"),
        call(f"swh:1:rel:{hexhash('rel2')} swh:1:rel:{hexhash('rel1')}\n"),
        call(f"swh:1:rel:{hexhash('rel3')} swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:rel:{hexhash('rel4')} swh:1:cnt:{hexhash('cnt1')}\n"),
    ]
    assert edge_writer.mock_calls == expected_edges
def test_export_revision(exporter):
    """
    Revisions yield one node each, an edge to their root directory, and one
    edge per parent revision.
    """
    revision_with_parents = {
        **TEST_REVISION,
        "id": binhash("rev1"),
        "directory": binhash("dir1"),
        "parents": [binhash("rev2"), binhash("rev3")],
    }
    root_revision = {
        **TEST_REVISION,
        "id": binhash("rev2"),
        "directory": binhash("dir2"),
        "parents": [],
    }

    node_writer, edge_writer = exporter(
        {ModelObjectType.REVISION: [revision_with_parents, root_revision]}
    )

    expected_nodes = [
        call(f"swh:1:rev:{hexhash('rev1')}\n"),
        call(f"swh:1:rev:{hexhash('rev2')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes

    # For each revision: the directory edge first, then the parents in order.
    expected_edges = [
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}\n"),
        call(f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev3')}\n"),
        call(f"swh:1:rev:{hexhash('rev2')} swh:1:dir:{hexhash('dir2')}\n"),
    ]
    assert edge_writer.mock_calls == expected_edges
def test_export_directory(exporter):
    """
    Directories yield one node each and one edge per entry, carrying the
    base64-encoded entry name and its permissions.
    """
    populated_directory = {
        "id": binhash("dir1"),
        "entries": [
            {
                "type": "file",
                "target": binhash("cnt1"),
                "name": b"cnt1",
                "perms": 0o644,
            },
            {
                "type": "dir",
                "target": binhash("dir2"),
                "name": b"dir2",
                "perms": 0o755,
            },
            {
                "type": "rev",
                "target": binhash("rev1"),
                "name": b"rev1",
                "perms": 0o160000,
            },
        ],
    }
    empty_directory = {"id": binhash("dir2"), "entries": []}

    node_writer, edge_writer = exporter(
        {ModelObjectType.DIRECTORY: [populated_directory, empty_directory]}
    )

    expected_nodes = [
        call(f"swh:1:dir:{hexhash('dir1')}\n"),
        call(f"swh:1:dir:{hexhash('dir2')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes

    # The octal literals below are formatted by the f-strings as decimal
    # integers, which is how the permissions appear in the edge lines.
    expected_edges = [
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')}"
            f" {b64e('cnt1')} {0o644}\n"
        ),
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir2')}"
            f" {b64e('dir2')} {0o755}\n"
        ),
        call(
            f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')}"
            f" {b64e('rev1')} {0o160000}\n"
        ),
    ]
    assert edge_writer.mock_calls == expected_edges
def test_export_content(exporter):
    """Each content yields one ``swh:1:cnt:`` node line and no edge."""
    first_content = {**TEST_CONTENT, "sha1_git": binhash("cnt1")}
    second_content = {**TEST_CONTENT, "sha1_git": binhash("cnt2")}

    node_writer, edge_writer = exporter(
        {ModelObjectType.CONTENT: [first_content, second_content]}
    )

    expected_nodes = [
        call(f"swh:1:cnt:{hexhash('cnt1')}\n"),
        call(f"swh:1:cnt:{hexhash('cnt2')}\n"),
    ]
    assert node_writer.mock_calls == expected_nodes
    assert edge_writer.mock_calls == []
def zstwrite(fp, lines):
    """Write each item of ``lines`` to the ``ZSTFile`` at ``fp``, appending a
    newline after every item."""
    with ZSTFile(fp, "w") as sink:
        for entry in lines:
            sink.write(entry + "\n")
def zstread(fp):
    """Return everything ``read()`` yields from the ``ZSTFile`` at ``fp``."""
    with ZSTFile(fp, "r") as source:
        payload = source.read()
    return payload
def test_sort_pipeline(tmp_path):
    """
    Run ``sort_graph_nodes`` on per-object-type node and edge files.

    Two node files and two edge files are written in one sub-directory per
    object type. The test then checks the merged node, edge and label files,
    the node and edge count files, and the node and edge statistics files.
    """
    short_type_mapping = {
        "origin_visit_status": "ori",
        "snapshot": "snp",
        "release": "rel",
        "revision": "rev",
        "directory": "dir",
        "content": "cnt",
    }

    input_nodes = [
        f"swh:1:{short}:{hexhash(short + str(x))}"
        for short in short_type_mapping.values()
        for x in range(4)
    ]

    input_edges = [
        f"swh:1:ori:{hexhash('ori1')} swh:1:snp:{hexhash('snp1')}",
        f"swh:1:ori:{hexhash('ori2')} swh:1:snp:{hexhash('snp2')}",
        f"swh:1:ori:{hexhash('ori3')} swh:1:snp:{hexhash('snp3')}",
        f"swh:1:ori:{hexhash('ori4')} swh:1:snp:{hexhash('snpX')}",  # missing dest
        f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup1')}",
        f"swh:1:snp:{hexhash('snp1')} swh:1:rev:{hexhash('rev1')} {b64e('dup2')}",
        f"swh:1:snp:{hexhash('snp3')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')}",
        f"swh:1:snp:{hexhash('snp4')} swh:1:rel:{hexhash('rel1')} {b64e('r1')}",
        f"swh:1:rel:{hexhash('rel1')} swh:1:rel:{hexhash('rel2')}",
        f"swh:1:rel:{hexhash('rel2')} swh:1:rev:{hexhash('rev1')}",
        f"swh:1:rel:{hexhash('rel3')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rel:{hexhash('rel4')} swh:1:dir:{hexhash('dir1')}",
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}",  # dup
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev1')}",  # dup
        f"swh:1:rev:{hexhash('rev1')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rev:{hexhash('rev2')} swh:1:rev:{hexhash('revX')}",  # missing dest
        f"swh:1:rev:{hexhash('rev3')} swh:1:rev:{hexhash('rev2')}",
        f"swh:1:rev:{hexhash('rev4')} swh:1:dir:{hexhash('dir1')}",
        f"swh:1:dir:{hexhash('dir1')} swh:1:cnt:{hexhash('cnt1')} {b64e('c1')} 42",
        f"swh:1:dir:{hexhash('dir1')} swh:1:dir:{hexhash('dir1')} {b64e('d1')} 1337",
        f"swh:1:dir:{hexhash('dir1')} swh:1:rev:{hexhash('rev1')} {b64e('r1')} 0",
    ]

    # One sub-directory per object type; its edges and nodes are split across
    # two files each (even-indexed items in "00", odd-indexed ones in "01").
    for obj_type, short_obj_type in short_type_mapping.items():
        type_dir = tmp_path / obj_type
        type_dir.mkdir()
        swhid_prefix = f"swh:1:{short_obj_type}"

        type_edges = [edge for edge in input_edges if edge.startswith(swhid_prefix)]
        zstwrite(type_dir / "00.edges.csv.zst", type_edges[0::2])
        zstwrite(type_dir / "01.edges.csv.zst", type_edges[1::2])

        type_nodes = [node for node in input_nodes if node.startswith(swhid_prefix)]
        zstwrite(type_dir / "00.nodes.csv.zst", type_nodes[0::2])
        zstwrite(type_dir / "01.nodes.csv.zst", type_nodes[1::2])

    sort_graph_nodes(tmp_path, config={"sort_buffer_size": "1M"})

    def read_nonempty_lines(filename):
        # Split on "\n" and discard empty strings (e.g. after a final newline).
        content = zstread(tmp_path / filename)
        return [line for line in content.split("\n") if line]

    output_nodes = read_nonempty_lines("graph.nodes.csv.zst")
    output_edges = read_nonempty_lines("graph.edges.csv.zst")
    output_labels = read_nonempty_lines("graph.labels.csv.zst")

    # Expected nodes: the explicit ones, plus every edge destination.
    edge_destinations = {edge.split()[1] for edge in input_edges}
    expected_nodes = set(input_nodes) | edge_destinations
    assert output_nodes == sorted(expected_nodes)
    nodes_count = int((tmp_path / "graph.nodes.count.txt").read_text())
    assert nodes_count == len(expected_nodes)

    assert sorted(output_edges) == sorted(input_edges)
    edges_count = int((tmp_path / "graph.edges.count.txt").read_text())
    assert edges_count == len(input_edges)

    # Labels are the third whitespace-separated field of the edges having one.
    split_edges = (edge.split() for edge in input_edges)
    expected_labels = {fields[2] for fields in split_edges if len(fields) > 2}
    assert output_labels == sorted(expected_labels)

    # Node statistics: one "<type> <count>" line per node type, sorted.
    actual_node_stats = (tmp_path / "graph.nodes.stats.txt").read_text().strip()
    node_type_counts = collections.Counter(
        node.split(":")[2] for node in expected_nodes
    )
    expected_node_stats = "\n".join(
        sorted("{} {}".format(k, v) for k, v in node_type_counts.items())
    )
    assert actual_node_stats == expected_node_stats

    # Edge statistics: one "<src type>:<dst type> <count>" line, sorted.
    actual_edge_stats = (tmp_path / "graph.edges.stats.txt").read_text().strip()
    edge_type_counts = collections.Counter(
        "{}:{}".format(parts[2], parts[5])
        for parts in (edge.split(":") for edge in input_edges)
    )
    expected_edge_stats = "\n".join(
        sorted("{} {}".format(k, v) for k, v in edge_type_counts.items())
    )
    assert actual_edge_stats == expected_edge_stats