# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
from contextlib import contextmanager
import hashlib
import math
from pathlib import Path
import tempfile
import pyorc
import pytest
from swh.dataset.exporters import orc
from swh.dataset.relational import MAIN_TABLES, RELATION_TABLES
from swh.model.model import ModelObjectType
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.objstorage.factory import get_objstorage
[docs]
@contextmanager
def orc_tmpdir(tmpdir):
    """Yield a directory, as a ``pathlib.Path``, to export ORC files into.

    Args:
        tmpdir: directory to use. When falsy (e.g. ``None``), a fresh
            temporary directory is created instead.

    Yields:
        ``Path(tmpdir)`` when ``tmpdir`` is truthy; otherwise the path of a
        ``tempfile.TemporaryDirectory`` that is removed when the context
        exits.
    """
    if tmpdir:
        # Caller-provided directory: nothing is created or removed here.
        yield Path(tmpdir)
        return
    # Use a separate local name instead of rebinding the ``tmpdir`` parameter.
    with tempfile.TemporaryDirectory() as scratch_dir:
        yield Path(scratch_dir)
[docs]
@contextmanager
def orc_export(messages, config=None, tmpdir=None):
    """Export ``messages`` with ``orc.ORCExporter`` and yield the output directory.

    Args:
        messages: mapping of object type to an iterable of objects; each
            object is passed to the exporter as ``obj.to_dict()``.
        config: exporter configuration; an empty dict is used when ``None``.
        tmpdir: directory to export into, forwarded to ``orc_tmpdir``.

    Yields:
        The directory the exporter was given.
    """
    with orc_tmpdir(tmpdir) as export_dir:
        exporter_config = {} if config is None else config
        with orc.ORCExporter(exporter_config, export_dir) as orc_exporter:
            for object_type, objects in messages.items():
                for obj in objects:
                    orc_exporter.process_object(object_type, obj.to_dict())
        # Yield once the exporter context has exited, while the directory
        # context is still open.
        yield export_dir
[docs]
def orc_load(rootdir):
    """Load the rows of every ORC file located in the sub-directories of ``rootdir``.

    Args:
        rootdir: directory whose sub-directories contain the ORC files.

    Returns:
        A ``defaultdict(list)`` where each file's rows are appended under the
        value of its ``swh_object_type`` user metadata entry. The special key
        ``"rootdir"`` holds ``rootdir`` itself.
    """
    loaded = collections.defaultdict(list)
    loaded["rootdir"] = rootdir
    timestamp_converters = {pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter}
    for type_dir in rootdir.iterdir():
        for orc_path in type_dir.iterdir():
            with orc_path.open("rb") as stream:
                reader = pyorc.Reader(stream, converters=timestamp_converters)
                # Rows are grouped by the type recorded in the file itself,
                # not by the name of the directory containing it.
                table_name = reader.user_metadata["swh_object_type"].decode()
                loaded[table_name].extend(reader)
    return loaded
[docs]
def exporter(messages, config=None, tmpdir=None):
    """Export ``messages`` with ``orc_export`` and return what ``orc_load`` reads back.

    The export directory is loaded while the ``orc_export`` context is still
    open.
    """
    with orc_export(messages, config, tmpdir) as exportdir:
        loaded = orc_load(exportdir)
    return loaded
[docs]
def test_export_origin():
    """Each origin is exported as ``(sha1 hex digest of its URL, URL)``."""
    obj_type = ModelObjectType.ORIGIN
    origins = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: origins})[obj_type]
    for origin in origins:
        url_digest = hashlib.sha1(origin.url.encode()).hexdigest()
        assert (url_digest, origin.url) in rows
[docs]
def test_export_origin_visit():
    """Each origin visit is exported as ``(origin, visit, date tuple, type)``."""
    obj_type = ModelObjectType.ORIGIN_VISIT
    visits = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: visits})[obj_type]
    for visit in visits:
        expected = (
            visit.origin,
            visit.visit,
            orc.datetime_to_tuple(visit.date),
            visit.type,
        )
        assert expected in rows
[docs]
def test_export_origin_visit_status():
    """Each visit status is exported with its origin, visit, date, status,
    snapshot id (hex or ``None``) and type."""
    obj_type = ModelObjectType.ORIGIN_VISIT_STATUS
    statuses = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: statuses})[obj_type]
    for visit_status in statuses:
        expected = (
            visit_status.origin,
            visit_status.visit,
            orc.datetime_to_tuple(visit_status.date),
            visit_status.status,
            orc.hash_to_hex_or_none(visit_status.snapshot),
            visit_status.type,
        )
        assert expected in rows
[docs]
def test_export_snapshot():
    """Snapshots land in ``snapshot``; their non-``None`` branches land in
    ``snapshot_branch``."""
    obj_type = ModelObjectType.SNAPSHOT
    snapshots = TEST_OBJECTS[obj_type]
    output = exporter({obj_type: snapshots})
    for snapshot in snapshots:
        snapshot_id = orc.hash_to_hex_or_none(snapshot.id)
        assert (snapshot_id,) in output["snapshot"]
        for branch_name, branch in snapshot.branches.items():
            # Branches whose value is None are not checked.
            if branch is not None:
                expected_branch = (
                    snapshot_id,
                    branch_name,
                    orc.hash_to_hex_or_none(branch.target),
                    str(branch.target_type.value),
                )
                assert expected_branch in output["snapshot_branch"]
[docs]
def test_export_release():
    """Each release is exported as one row of the release table."""
    obj_type = ModelObjectType.RELEASE
    releases = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: releases})[obj_type]
    for release in releases:
        # Both the author and the date may be missing on a release.
        author_fullname = release.author.fullname if release.author else None
        date_dict = release.date.to_dict() if release.date is not None else None
        expected = (
            orc.hash_to_hex_or_none(release.id),
            release.name,
            release.message,
            orc.hash_to_hex_or_none(release.target),
            release.target_type.value,
            author_fullname,
            *orc.swh_date_to_tuple(date_dict),
            release.raw_manifest,
        )
        assert expected in rows
[docs]
def test_export_revision():
    """Revisions land in ``revision``; their parents land in
    ``revision_history`` together with the parent's rank."""
    obj_type = ModelObjectType.REVISION
    revisions = TEST_OBJECTS[obj_type]
    output = exporter({obj_type: revisions})
    for revision in revisions:
        revision_id = orc.hash_to_hex_or_none(revision.id)
        author_date = revision.date.to_dict() if revision.date is not None else None
        committer_date = (
            revision.committer_date.to_dict()
            if revision.committer_date is not None
            else None
        )
        expected = (
            revision_id,
            revision.message,
            revision.author.fullname,
            *orc.swh_date_to_tuple(author_date),
            revision.committer.fullname,
            *orc.swh_date_to_tuple(committer_date),
            orc.hash_to_hex_or_none(revision.directory),
            revision.type.value,
            revision.raw_manifest,
        )
        assert expected in output["revision"]
        for rank, parent in enumerate(revision.parents):
            parent_row = (revision_id, orc.hash_to_hex_or_none(parent), rank)
            assert parent_row in output["revision_history"]
[docs]
def test_export_directory():
    """Directories land in ``directory``; their entries land in
    ``directory_entry``."""
    obj_type = ModelObjectType.DIRECTORY
    directories = TEST_OBJECTS[obj_type]
    output = exporter({obj_type: directories})
    for directory in directories:
        directory_id = orc.hash_to_hex_or_none(directory.id)
        assert (directory_id, directory.raw_manifest) in output["directory"]
        for entry in directory.entries:
            entry_row = (
                directory_id,
                entry.name,
                entry.type,
                orc.hash_to_hex_or_none(entry.target),
                entry.perms,
            )
            assert entry_row in output["directory_entry"]
[docs]
def test_export_content():
    """With the default configuration, content rows end with ``None`` in the
    last column."""
    obj_type = ModelObjectType.CONTENT
    contents = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: contents})[obj_type]
    for content in contents:
        expected = (
            orc.hash_to_hex_or_none(content.sha1),
            orc.hash_to_hex_or_none(content.sha1_git),
            orc.hash_to_hex_or_none(content.sha256),
            orc.hash_to_hex_or_none(content.blake2s256),
            content.length,
            content.status,
            None,
        )
        assert expected in rows
[docs]
def test_export_skipped_content():
    """Skipped content rows carry the four hashes, length, status and reason."""
    obj_type = ModelObjectType.SKIPPED_CONTENT
    skipped_contents = TEST_OBJECTS[obj_type]
    rows = exporter({obj_type: skipped_contents})[obj_type]
    for skipped in skipped_contents:
        expected = (
            orc.hash_to_hex_or_none(skipped.sha1),
            orc.hash_to_hex_or_none(skipped.sha1_git),
            orc.hash_to_hex_or_none(skipped.sha256),
            orc.hash_to_hex_or_none(skipped.blake2s256),
            skipped.length,
            skipped.status,
            skipped.reason,
        )
        assert expected in rows
[docs]
def test_date_to_tuple():
    """Check ``orc.swh_date_to_tuple`` on dates with and without ``offset_bytes``."""
    ts = {"seconds": 123456, "microseconds": 1515}
    expected_ts = (123456, 1515)

    # Only ``offset_bytes`` is provided.
    bytes_only = {"timestamp": ts, "offset_bytes": b"+0100"}
    assert orc.swh_date_to_tuple(bytes_only) == (expected_ts, 60, b"+0100")

    # ``offset`` disagrees with ``offset_bytes``: the expected result follows
    # ``offset_bytes``.
    conflicting = {
        "timestamp": ts,
        "offset": 120,
        "negative_utc": False,
        "offset_bytes": b"+0100",
    }
    assert orc.swh_date_to_tuple(conflicting) == (expected_ts, 60, b"+0100")

    # No ``offset_bytes``: the expected bytes match the numeric ``offset``.
    offset_only = {"timestamp": ts, "offset": 120, "negative_utc": False}
    assert orc.swh_date_to_tuple(offset_only) == (expected_ts, 120, b"+0200")

    # Zero offset flagged as negative UTC.
    negative_utc = {"timestamp": ts, "offset": 0, "negative_utc": True}
    assert orc.swh_date_to_tuple(negative_utc) == (expected_ts, 0, b"-0000")
# Names of the related tables of each main table, keyed by the main table's
# name; main tables without any related table are not listed.
RELATED = {
    "snapshot": ["snapshot_branch"],
    "revision": ["revision_history", "revision_extra_headers"],
    "directory": ["directory_entry"],
}
[docs]
@pytest.mark.parametrize("table_name", RELATION_TABLES.keys())
def test_export_invalid_max_rows(table_name):
    """Configuring ``max_rows`` for a table of ``RELATION_TABLES`` raises
    ``ValueError``."""
    max_rows = {table_name: 10}
    config = {"orc": {"max_rows": max_rows}}
    with pytest.raises(ValueError):
        exporter({}, config=config)
[docs]
def test_export_content_with_data(monkeypatch, tmpdir):
    """With ``with_data`` enabled, content rows end with the content's data."""
    obj_type = "content"
    contents = TEST_OBJECTS[obj_type]

    # Fill an in-memory object storage with the data of every test content.
    objstorage = get_objstorage("memory")
    for content in contents:
        objstorage.add(content=content.data, obj_id=content.hashes())

    def get_objstorage_mock(**kw):
        # Only the "mock" class resolves to the pre-filled storage.
        return objstorage if kw.get("cls") == "mock" else None

    monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock)

    orc_config = {
        "with_data": True,
        "objstorage": {"cls": "mock"},
    }
    output = exporter(
        {ModelObjectType(obj_type): contents},
        config={"orc": orc_config},
        tmpdir=tmpdir,
    )
    rows = output[obj_type]
    for content in contents:
        expected = (
            orc.hash_to_hex_or_none(content.sha1),
            orc.hash_to_hex_or_none(content.sha1_git),
            orc.hash_to_hex_or_none(content.sha256),
            orc.hash_to_hex_or_none(content.blake2s256),
            content.length,
            content.status,
            content.data,
        )
        assert expected in rows