# Copyright (C) 2022-2025 zimoun and the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import contextlib
import hashlib
import io
import os
from pathlib import Path
import stat
import struct
import tempfile
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Protocol, Union
from typing_extensions import Buffer
from swh.core.tarball import uncompress
CHUNK_SIZE = 65536
def _identity(hash: bytes) -> bytes:
return hash
def _convert_hex(hash: bytes) -> str:
return hash.hex()
def _convert_b64(hash: bytes) -> str:
    return base64.b64encode(hash).decode()
_chars = "0123456789abcdfghijklmnpqrsvwxyz"
# base32 digest used in nix-hash, Python implementation found on
# https://bombrary.github.io/blog/posts/nix-impl-digest/
def _convert_b32(hash: bytes) -> str:
hash_bits = 8 * len(hash)
nix32_len = (hash_bits - 1) // 5 + 1
s = ""
for n in range(nix32_len - 1, -1, -1):
b = n * 5
i = b // 8
j = b % 8
c = (hash[i] >> j) | (hash[i + 1] << (8 - j) if i + 1 < len(hash) else 0)
s = s + _chars[c & 0x1F]
return s
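# Illustrative sketch (not part of the module API): the converters above map the
# same raw digest to its usual textual encodings, e.g.
#
#   digest = hashlib.sha256(b"nix-archive-1").digest()
#   _convert_hex(digest)  # lowercase hexadecimal
#   _convert_b64(digest)  # standard base64
#   _convert_b32(digest)  # Nix-specific base32, using the `_chars` alphabet above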
class Nar:
"""NAR serializer.
    This builds the NAR structure and serializes it as described in Eelco
    Dolstra's PhD thesis. See https://edolstra.github.io/pubs/phd-thesis.pdf.
For example, this tree on a filesystem:
.. code::
$ tree foo
foo
├── bar
│ └── exe
└── baz
1 directory, 2 files
serializes as:
.. code::
nix-archive-1(typedirectoryentry(namebarnode(typedirectoryentry(nameexenode(typeregularexecutablecontents<Content of file foo/bar/exe>))))entry(namebaznode(typeregularcontents<Content of file foo/baz>)))
    For readability, the debug mode prints the following:
.. code::
nix-archive-1
(
type
directory
entry
(
name
bar
node
(
type
directory
entry
(
name
exe
node
(
type
regular
executable
contents
<Content of file foo/bar/exe>
)
)
)
)
entry
(
name
baz
node
(
type
regular
contents
<Content of file foo/baz>
)
)
)
Note: "<Content of file $name>" is a placeholder for the actual file content
""" # noqa
def __init__(
self,
hash_names: List[str],
exclude_vcs: bool = False,
vcs_type: Optional[str] = "git",
debug: bool = False,
):
self.hash_names = hash_names
self.updater = {
hash_name: (
hashlib.sha256() if hash_name.lower() == "sha256" else hashlib.sha1()
)
for hash_name in hash_names
}
self.exclude_vcs = exclude_vcs
self.vcs_type = vcs_type
self.debug = debug
self.indent = 0
self.nar_serialization = b""
def str_(self, thing: Union[str, io.BufferedReader, list]) -> None:
"""Compute the nar serialization format on 'thing' and compute its hash.
        This is the function named 'str' in Figure 5.2, p. 93 (page 101 of the PDF) [1]
[1] https://edolstra.github.io/pubs/phd-thesis.pdf
"""
if self.debug and isinstance(thing, (str, io.BufferedReader)):
            indent = " " * self.indent
if isinstance(thing, io.BufferedReader):
msg = f"{indent} <Content of file {thing.name}>"
else:
msg = f"{indent}{thing}"
print(msg)
        # the length prefix, named 'int' in the thesis
if isinstance(thing, str):
byte_sequence = thing.encode("utf-8")
length = len(byte_sequence)
elif isinstance(thing, io.BufferedReader):
length = os.stat(thing.name).st_size
        # accept a list of things, to ease reading of _serialize
elif isinstance(thing, list):
for stuff in thing:
self.str_(stuff)
return
else:
            raise ValueError("neither a string nor a file")
blen = length.to_bytes(8, byteorder="little") # 64-bit little endian
self.update(blen)
# first part of 'pad'
if isinstance(thing, str):
self.update(byte_sequence)
elif isinstance(thing, io.BufferedReader):
for chunk in iter(lambda: thing.read(CHUNK_SIZE), b""):
self.update(chunk)
        # second part of 'pad'
m = length % 8
if m == 0:
offset = 0
else:
offset = 8 - m
boffset = bytearray(offset)
self.update(boffset)
def update(self, chunk: bytes) -> None:
self.nar_serialization += chunk
for hash_name in self.hash_names:
self.updater[hash_name].update(chunk)
def _serialize_directory(self, fso: Path) -> None:
"""On the first level of the main tree, we may have to skip some paths (e.g.
        .git, ...). Once those are ignored, the remaining entries are serialized.
"""
path_to_ignore = (
f"{fso}/.{self.vcs_type}" if self.exclude_vcs and self.vcs_type else None
)
for path in sorted(Path(fso).iterdir()):
if path_to_ignore is None or not path.match(path_to_ignore):
self._serializeEntry(path)
def _serialize(self, fso: Path) -> None:
if self.debug:
self.indent += 1
self.str_("(")
mode = os.lstat(fso).st_mode
if stat.S_ISREG(mode):
self.str_(["type", "regular"])
if mode & 0o111 != 0:
self.str_(["executable", ""])
self.str_("contents")
with open(str(fso), "rb") as f:
self.str_(f)
elif stat.S_ISLNK(mode):
self.str_(["type", "symlink", "target"])
self.str_(os.readlink(fso))
elif stat.S_ISDIR(mode):
self.str_(["type", "directory"])
self._serialize_directory(fso)
else:
raise ValueError("unsupported file type")
self.str_(")")
if self.debug:
self.indent -= 1
def _serializeEntry(self, fso: Path) -> None:
if self.debug:
self.indent += 1
self.str_(["entry", "(", "name", fso.name, "node"])
self._serialize(fso)
self.str_(")")
if self.debug:
self.indent -= 1
def serialize(self, fso: Path) -> bytes:
self.nar_serialization = b""
self.str_("nix-archive-1")
self._serialize(fso)
return self.nar_serialization
def _compute_result(self, convert_fn) -> Dict[str, Any]:
return {
hash_name: convert_fn(self.updater[hash_name].digest())
for hash_name in self.hash_names
}
def digest(self) -> Dict[str, bytes]:
"""Compute the hash results with bytes format."""
return self._compute_result(_identity)
def hexdigest(self) -> Dict[str, str]:
"""Compute the hash results with hex format."""
return self._compute_result(_convert_hex)
def b64digest(self) -> Dict[str, str]:
"""Compute the hash results with b64 format."""
return self._compute_result(_convert_b64)
def b32digest(self) -> Dict[str, str]:
"""Compute the hash results with b32 format."""
return self._compute_result(_convert_b32)
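# Usage sketch (hypothetical path, assuming a directory exists at /tmp/foo):
#
#   nar = Nar(hash_names=["sha256"], exclude_vcs=True)
#   raw_nar = nar.serialize(Path("/tmp/foo"))  # raw NAR byte stream of the tree
#   nar.hexdigest()                            # {"sha256": "<hex digest of raw_nar>"}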
def compute_nar_hashes(
filepath: Path,
hash_names: List[str] = ["sha256"],
    is_tarball: bool = True,
    top_level: bool = True,
) -> Dict[str, str]:
"""Compute nar checksums dict out of a filepath (tarball or plain file).
If it's a tarball, this uncompresses the tarball in a temporary directory to compute
the nar hashes (and then cleans it up).
Args:
filepath: The tarball (if is_tarball is True) or a filepath
hash_names: The list of checksums to compute
is_tarball: Whether filepath represents a tarball or not
        top_level: Whether to compute the hashes of the tarball's top-level
            directory. This is only meaningful when 'is_tarball' is True.
Returns:
The dict of checksums values whose keys are present in hash_names.
"""
with tempfile.TemporaryDirectory() as tmpdir:
if is_tarball:
directory_path = Path(tmpdir)
directory_path.mkdir(parents=True, exist_ok=True)
uncompress(str(filepath), dest=str(directory_path))
if top_level:
# Default behavior, pass the extracted tarball path root directory
path_on_disk = directory_path
else:
# Pass along the first directory of the tarball
path_on_disk = next(iter(directory_path.iterdir()))
else:
path_on_disk = filepath
nar = Nar(hash_names)
nar.serialize(path_on_disk)
hashes = nar.hexdigest()
return hashes
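# Usage sketch (hypothetical path): NAR sha256 of a local tarball's contents.
#
#   hashes = compute_nar_hashes(Path("/tmp/project.tar.gz"), hash_names=["sha256"])
#   hashes["sha256"]  # hex-encoded NAR sha256 of the uncompressed tree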
def nar_serialize(
path: Path,
exclude_vcs: bool = False,
vcs_type: Optional[str] = "git",
) -> bytes:
"""Return the NAR serialization of a path.
Args:
path: The path to NAR serialize, can be a file or a directory.
exclude_vcs: Whether to exclude VCS related directories (.git for instance).
vcs_type: The type of VCS to exclude related directories, default to git.
Returns:
The NAR serialization of the path.
"""
nar = Nar(hash_names=["sha256"], exclude_vcs=exclude_vcs, vcs_type=vcs_type)
return nar.serialize(path)
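# Usage sketch (hypothetical paths): the returned bytes should correspond to what
# `nix-store --dump` emits for the same tree, so they can be written to a .nar file.
#
#   Path("/tmp/foo.nar").write_bytes(nar_serialize(Path("/tmp/foo"), exclude_vcs=True))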
# The code below is adapted from the narflinger project (https://github.com/wh0/narflinger)
# MIT License
#
# Copyright (c) 2023 wh0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
class _NARFileReader(Protocol):
def read1(self, size: int) -> bytes: ...
def finish(self) -> None: ...
def close(self) -> None: ...
def _reader_read_limit(input: _NARFileReader, size: int) -> bytes:
return input.read1(size)
def _reader_read_exact(input: _NARFileReader, size: int) -> bytes:
piece = input.read1(size)
piece_len = len(piece)
if piece_len == size:
return piece
remaining = size - piece_len
pieces = [piece]
while remaining:
piece = input.read1(remaining)
pieces.append(piece)
remaining -= len(piece)
return b"".join(pieces)
def _reader_skip_exact(input: _NARFileReader, size: int) -> None:
remaining = size
while remaining:
piece = input.read1(remaining)
remaining -= len(piece)
def _nar_read_int(input: _NARFileReader) -> int:
b = _reader_read_exact(input, 8)
return struct.unpack("<Q", b)[0]
def _nar_skip_padding(input: _NARFileReader, length: int) -> None:
modulo = length & 7
if modulo:
_reader_skip_exact(input, 8 - modulo)
def _nar_read_bytes(input: _NARFileReader) -> bytes:
length = _nar_read_int(input)
if not length:
return b""
b = _reader_read_exact(input, length)
_nar_skip_padding(input, length)
return b
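# Wire-format sketch: every NAR string is a 64-bit little-endian length followed by
# the payload, zero-padded to the next 8-byte boundary (mirroring `Nar.str_` above).
# For instance, the token "foo" is encoded as
#
#   b"\x03\x00\x00\x00\x00\x00\x00\x00" + b"foo" + b"\x00" * 5
#
# and `_nar_read_bytes` on such input returns b"foo".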
def _nar_generate_binary(input: _NARFileReader) -> Iterator[bytes]:
length = _nar_read_int(input)
remaining = length
while remaining:
piece = _reader_read_limit(input, remaining)
yield piece
remaining -= len(piece)
_nar_skip_padding(input, length)
def _nar_expect_bytes(input: _NARFileReader, expected: bytes) -> None:
b = _nar_read_bytes(input)
if b != expected:
raise Exception("unexpected %r, expected %r" % (b, expected))
def _nar_generate_pair_keys(input: _NARFileReader) -> Iterator[bytes]:
_nar_expect_bytes(input, b"(")
while True:
k = _nar_read_bytes(input)
if k == b")":
break
yield k
def _nar_unpack_dir_entry(dest_path: str, input: _NARFileReader) -> None:
name = None
for k in _nar_generate_pair_keys(input):
if k == b"name":
name = _nar_read_bytes(input)
elif k == b"node":
assert name is not None
_nar_unpack_node(os.path.join(dest_path, name.decode("utf-8")), input)
else:
raise Exception("dir entry unrecognized key %r" % k)
def _nar_unpack_node(dest_path: str, input: _NARFileReader) -> None:
type = None
executable = False
for k in _nar_generate_pair_keys(input):
if k == b"type":
type = _nar_read_bytes(input)
if type == b"regular":
pass
elif type == b"symlink":
pass
elif type == b"directory":
os.mkdir(dest_path)
else:
raise Exception("unrecognized type %r" % type)
elif k == b"executable":
_nar_expect_bytes(input, b"")
executable = True
elif k == b"contents":
dst_fd = os.open(
dest_path, os.O_WRONLY | os.O_CREAT, 0o777 if executable else 0o666
)
for b in _nar_generate_binary(input):
os.write(dst_fd, b)
os.close(dst_fd)
elif k == b"target":
target = _nar_read_bytes(input)
os.symlink(target, dest_path)
elif k == b"entry":
_nar_unpack_dir_entry(dest_path, input)
else:
raise Exception("node unrecognized key %r" % k)
def _nar_unpack(dest_path: str, reader: _NARFileReader) -> None:
_nar_expect_bytes(reader, b"nix-archive-1")
_nar_unpack_node(dest_path, reader)
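# Parsing sketch: for a NAR containing a single regular file, `_nar_unpack`
# consumes the token stream (matching the serialization produced by `Nar` above):
#
#   "nix-archive-1" "(" "type" "regular" "contents" <file bytes> ")"
#
# and writes the file contents to `dest_path`.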
decompress_empty = b""
class _Decompressor(Protocol):
def decompress(self, data: Buffer, max_length: int = -1) -> bytes: ...
@property
def eof(self) -> bool: ...
@property
def needs_input(self) -> bool: ...
class _DecompressReader(_NARFileReader):
def __init__(self, input: BinaryIO, decompressor: _Decompressor):
self.input = input
self.decompressor = decompressor
def read1(self, size):
while self.decompressor.needs_input:
piece_in = self.input.read1(8192)
piece = self.decompressor.decompress(piece_in, size)
if piece:
return piece
piece = self.decompressor.decompress(decompress_empty, size)
return piece
def finish(self):
piece_in = self.input.read()
if not self.decompressor.eof:
self.decompressor.decompress(piece_in)
def close(self):
self.input.close()
class _IdentityReader(_NARFileReader):
def __init__(self, input: BinaryIO):
self.input = input
def read1(self, size):
return self.input.read1(size)
def finish(self):
self.input.read()
def close(self):
self.input.close()
def _get_nar_reader(nar_archive_path: str) -> _NARFileReader:
decompressor: Optional[_Decompressor] = None
nar_reader: _NARFileReader
if nar_archive_path.endswith(".bz2"):
import bz2
decompressor = bz2.BZ2Decompressor()
elif nar_archive_path.endswith(".xz"):
import lzma
decompressor = lzma.LZMADecompressor(lzma.FORMAT_XZ)
if decompressor is None:
nar_reader = _IdentityReader(open(nar_archive_path, "rb"))
else:
nar_reader = _DecompressReader(open(nar_archive_path, "rb"), decompressor)
return nar_reader
def nar_unpack(nar_path: str, dest_path: str) -> None:
"""Unpack a NAR archive (possibly compressed with xz or bz2) to a path.
    Please note that a NAR archive can contain a single file instead of multiple
    files and directories; in that case, ``dest_path`` will be a file after
    unpacking.
Args:
        nar_path: A path to a NAR archive.
dest_path: The destination path where the NAR archive is extracted.
"""
with contextlib.closing(_get_nar_reader(nar_path)) as nar_reader:
_nar_unpack(dest_path, nar_reader)
nar_reader.finish()
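# Usage sketch (hypothetical paths): extracting a compressed NAR into a fresh
# directory. When the archive's root node is a directory, `dest_path` must not
# already exist, since `_nar_unpack_node` creates it with os.mkdir.
#
#   nar_unpack("/tmp/source.nar.xz", "/tmp/extracted")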