# Copyright (C) 2020-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
from contextlib import contextmanager
import hashlib
import math
from pathlib import Path
import tempfile
import pyorc
import pytest
from swh.export.exporters import orc
from swh.export.relational import MAIN_TABLES, RELATION_TABLES
from swh.model.model import (
Content,
Directory,
ModelObjectType,
Origin,
OriginVisit,
OriginVisitStatus,
Release,
Revision,
SkippedContent,
Snapshot,
TimestampWithTimezone,
)
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.objstorage.factory import get_objstorage
@contextmanager
def orc_tmpdir(tmpdir):
    """Yield ``tmpdir`` as a :class:`Path`, or a self-cleaning temporary one.

    If ``tmpdir`` is falsy, a fresh temporary directory is created for the
    duration of the context and removed on exit.
    """
    if tmpdir:
        yield Path(tmpdir)
    else:
        # tempfile handles creation and recursive cleanup of the directory
        with tempfile.TemporaryDirectory() as tmpdir:
            yield Path(tmpdir)
@contextmanager
def orc_sensitive_tmpdir(tmpdir):
    """Yield ``tmpdir`` as a :class:`Path`, or a self-cleaning temporary one.

    Same contract as ``orc_tmpdir``, used for the exporter's sensitive-data
    output directory.
    """
    if tmpdir:
        yield Path(tmpdir)
    else:
        # tempfile handles creation and recursive cleanup of the directory
        with tempfile.TemporaryDirectory() as tmpdir:
            yield Path(tmpdir)
@contextmanager
def orc_export(messages, config=None, tmpdir=None, sensitive_tmpdir=None):
    """Export ``messages`` to ORC files and yield the export directory.

    Args:
        messages: mapping of object type to an iterable of model objects.
        config: exporter configuration dict; defaults to an empty dict.
        tmpdir: output directory; a temporary one is used if not given.
        sensitive_tmpdir: sensitive-data directory; temporary if not given.
    """
    if config is None:
        config = {}
    with orc_tmpdir(tmpdir) as tmpdir:
        with orc_sensitive_tmpdir(sensitive_tmpdir) as sensitive_tmpdir:
            with orc.ORCExporter(config, tmpdir, sensitive_tmpdir) as exporter:
                for object_type, objects in messages.items():
                    for obj in objects:
                        exporter.process_object(object_type, obj)
            yield tmpdir
def orc_load(rootdir):
    """Read back every ORC file under ``rootdir``.

    Returns a defaultdict mapping each file's ``swh_object_type`` metadata
    value to the list of its rows; the export root is stored under the
    ``"rootdir"`` key for convenience.
    """
    res = collections.defaultdict(list)
    res["rootdir"] = rootdir
    # layout is <rootdir>/<object_type>/<file>.orc
    for obj_type_dir in rootdir.iterdir():
        for orc_file in obj_type_dir.iterdir():
            with orc_file.open("rb") as orc_obj:
                reader = pyorc.Reader(
                    orc_obj,
                    # decode ORC timestamps with the exporter's own converter
                    converters={pyorc.TypeKind.TIMESTAMP: orc.SWHTimestampConverter},
                )
                obj_type = reader.user_metadata["swh_object_type"].decode()
                res[obj_type].extend(reader)
    return res
def exporter(messages, config=None, tmpdir=None, sensitive_tmpdir=None):
    """Export ``messages`` to ORC and load the result back as a dict.

    ``sensitive_tmpdir`` is forwarded to ``orc_export`` (new, defaulted
    parameter — backward compatible with existing callers).
    """
    with orc_export(messages, config, tmpdir, sensitive_tmpdir) as exportdir:
        return orc_load(exportdir)
def test_export_origin():
    """Each origin must be exported as a (sha1(url), url) row."""
    obj_type = Origin.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        sha1 = hashlib.sha1(obj.url.encode()).hexdigest()
        assert (sha1, obj.url) in output[obj_type.value]
def test_export_origin_visit():
    """Each origin visit row carries (origin, visit, date tuple, type)."""
    obj_type = OriginVisit.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            obj.origin,
            obj.visit,
            orc.datetime_to_tuple(obj.date),
            obj.type,
        ) in output[obj_type.value]
def test_export_origin_visit_status():
    """Visit-status rows include status, snapshot hex digest, and type."""
    obj_type = OriginVisitStatus.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            obj.origin,
            obj.visit,
            orc.datetime_to_tuple(obj.date),
            obj.status,
            orc.hash_to_hex_or_none(obj.snapshot),
            obj.type,
        ) in output[obj_type.value]
def test_export_snapshot():
    """Snapshots produce a ``snapshot`` row plus one ``snapshot_branch``
    row per non-null branch."""
    obj_type = Snapshot.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (orc.hash_to_hex_or_none(obj.id),) in output["snapshot"]
        for branch_name, branch in obj.branches.items():
            # dangling branches are not exported as rows
            if branch is None:
                continue
            assert (
                orc.hash_to_hex_or_none(obj.id),
                branch_name,
                orc.hash_to_hex_or_none(branch.target),
                str(branch.target_type.value),
            ) in output["snapshot_branch"]
def test_export_release():
    """Release rows flatten author/date fields next to target metadata."""
    obj_type = Release.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            orc.hash_to_hex_or_none(obj.id),
            obj.name,
            obj.message,
            orc.hash_to_hex_or_none(obj.target),
            obj.target_type.value,
            # author is optional on releases
            obj.author.fullname if obj.author else None,
            # swh_date_to_tuple yields several columns, spliced in-line
            *orc.swh_date_to_tuple(getattr(obj, "date", None)),
            obj.raw_manifest,
        ) in output[obj_type.value]
def test_export_revision():
    """Revisions export a ``revision`` row and one ordered
    ``revision_history`` row per parent."""
    obj_type = Revision.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            orc.hash_to_hex_or_none(obj.id),
            obj.message,
            obj.author.fullname,
            *orc.swh_date_to_tuple(getattr(obj, "date", None)),
            obj.committer.fullname,
            *orc.swh_date_to_tuple(getattr(obj, "committer_date", None)),
            orc.hash_to_hex_or_none(obj.directory),
            obj.type.value,
            obj.raw_manifest,
        ) in output["revision"]
        # parent order is preserved via the index column
        for i, parent in enumerate(obj.parents):
            assert (
                orc.hash_to_hex_or_none(obj.id),
                orc.hash_to_hex_or_none(parent),
                i,
            ) in output["revision_history"]
def test_export_directory():
    """Directories export a ``directory`` row and one ``directory_entry``
    row per entry."""
    obj_type = Directory.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (orc.hash_to_hex_or_none(obj.id), obj.raw_manifest) in output[
            "directory"
        ]
        for entry in obj.entries:
            assert (
                orc.hash_to_hex_or_none(obj.id),
                entry.name,
                entry.type,
                orc.hash_to_hex_or_none(entry.target),
                entry.perms,
            ) in output["directory_entry"]
def test_export_content():
    """Content rows carry the four hashes, length and status; the ``data``
    column is None when the exporter runs without ``with_data``."""
    obj_type = Content.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            orc.hash_to_hex_or_none(obj.sha1),
            orc.hash_to_hex_or_none(obj.sha1_git),
            orc.hash_to_hex_or_none(obj.sha256),
            orc.hash_to_hex_or_none(obj.blake2s256),
            obj.length,
            obj.status,
            None,  # no objstorage configured, so no data column
        ) in output[obj_type.value]
def test_export_skipped_content():
    """Skipped contents export the same hash columns plus the skip reason."""
    obj_type = SkippedContent.object_type
    output = exporter({obj_type: TEST_OBJECTS[obj_type]})
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            orc.hash_to_hex_or_none(obj.sha1),
            orc.hash_to_hex_or_none(obj.sha1_git),
            orc.hash_to_hex_or_none(obj.sha256),
            orc.hash_to_hex_or_none(obj.blake2s256),
            obj.length,
            obj.status,
            obj.reason,
        ) in output[obj_type.value]
def test_date_to_tuple():
    """``swh_date_to_tuple`` returns ((seconds, microseconds), offset_minutes,
    offset_bytes); an explicit ``offset_bytes`` takes precedence over the
    legacy ``offset``/``negative_utc`` fields."""
    ts = {"seconds": 123456, "microseconds": 1515}
    assert orc.swh_date_to_tuple(
        TimestampWithTimezone.from_dict({"timestamp": ts, "offset_bytes": b"+0100"})
    ) == (
        (123456, 1515),
        60,
        b"+0100",
    )
    # offset_bytes wins over the conflicting legacy offset=120
    assert orc.swh_date_to_tuple(
        TimestampWithTimezone.from_dict(
            {
                "timestamp": ts,
                "offset": 120,
                "negative_utc": False,
                "offset_bytes": b"+0100",
            }
        )
    ) == ((123456, 1515), 60, b"+0100")
    # without offset_bytes, the legacy offset is rendered as "+0200"
    assert orc.swh_date_to_tuple(
        TimestampWithTimezone.from_dict(
            {
                "timestamp": ts,
                "offset": 120,
                "negative_utc": False,
            }
        )
    ) == ((123456, 1515), 120, b"+0200")
    # negative UTC is preserved as the "-0000" spelling
    assert orc.swh_date_to_tuple(
        TimestampWithTimezone.from_dict(
            {
                "timestamp": ts,
                "offset": 0,
                "negative_utc": True,
            }
        )
    ) == (
        (123456, 1515),
        0,
        b"-0000",
    )
# mapping of related tables for each main table (if any)
# NOTE(review): not referenced anywhere in this chunk — presumably used by
# tests outside the visible region; confirm before removing.
RELATED = {
    "snapshot": ["snapshot_branch"],
    "revision": ["revision_history", "revision_extra_headers"],
    "directory": ["directory_entry"],
}
@pytest.mark.parametrize("table_name", RELATION_TABLES.keys())
def test_export_invalid_max_rows(table_name):
    """Setting ``max_rows`` on a relation table must be rejected."""
    config = {"orc": {"max_rows": {table_name: 10}}}
    with pytest.raises(ValueError):
        exporter({}, config=config)
def test_export_content_with_data(monkeypatch, tmpdir):
    """With ``with_data`` enabled, the content row's last column holds the
    raw bytes fetched from the (mocked) objstorage."""
    obj_type = Content.object_type
    objstorage = get_objstorage("memory")
    for content in TEST_OBJECTS[obj_type]:
        objstorage.add(content=content.data, obj_id=content.hashes())

    def get_objstorage_mock(**kw):
        # hand the pre-filled in-memory objstorage to the exporter
        if kw.get("cls") == "mock":
            return objstorage

    monkeypatch.setattr(orc, "get_objstorage", get_objstorage_mock)
    config = {
        "orc": {
            "with_data": True,
            "objstorage": {"cls": "mock"},
        },
    }
    output = exporter(
        {obj_type: TEST_OBJECTS[obj_type]},
        config=config,
        tmpdir=tmpdir,
    )
    for obj in TEST_OBJECTS[obj_type]:
        assert (
            orc.hash_to_hex_or_none(obj.sha1),
            orc.hash_to_hex_or_none(obj.sha1_git),
            orc.hash_to_hex_or_none(obj.sha256),
            orc.hash_to_hex_or_none(obj.blake2s256),
            obj.length,
            obj.status,
            obj.data,
        ) in output[obj_type.value]