# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from types import TracebackType
from typing import NewType, Optional, Type, cast
from cffi import FFI
from swh.perfecthash._hash_cffi import lib
# Distinct static type for shard keys. At runtime a ``Key`` is a plain
# ``bytes`` object; the classes below reject keys whose length differs from
# ``Shard.key_len()``.
Key = NewType("Key", bytes)
class ShardCreator:
    """Write a new Shard file, one key/object pair at a time.

    All the work is delegated to the native library through the
    ``lib.shard_*`` functions. The intended use is as a context manager:
    :meth:`prepare` runs on entry and :meth:`finalize` runs on a clean exit.
    When the ``with`` body raises, the native handle is released without
    finalizing the Shard.
    """

    def __init__(self, path: str, object_count: int):
        """Create a Shard.

        The file at ``path`` will be truncated if it already exists.

        ``object_count`` must match the number of objects that will be added
        using the :meth:`write` method. A ``RuntimeError`` will be raised
        on :meth:`finalize` in case of inconsistencies.

        Ideally this should be done using a ``with`` statement, as such:

        .. code-block:: python

            with ShardCreator("shard", len(objects)) as shard:
                for key, object in objects.items():
                    shard.write(key, object)

        Otherwise, :meth:`prepare`, :meth:`write` and :meth:`finalize` must be
        called in sequence.

        Args:
            path: path to the Shard file or device that will be written.
            object_count: number of objects that will be written to the Shard.
        """
        self.ffi = FFI()
        self.path = path
        self.object_count = object_count
        # Native handle; None until prepare() and again after _destroy().
        self.shard = None
        # Defined here so the attribute exists even before prepare().
        self.written_object_count = 0

    def __enter__(self) -> "ShardCreator":
        """Call :meth:`prepare` and return ``self``."""
        self.prepare()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Finalize on a clean exit; only release the handle on error."""
        if exc_type is not None:
            # Do not finalize a partially written Shard. _destroy() is a
            # no-op when nothing is open, so it cannot mask the exception
            # that is currently propagating.
            self._destroy()
            return
        self.finalize()

    def __del__(self):
        """Release the native handle if it is still open."""
        self._destroy()

    def _destroy(self) -> None:
        """Release the native handle, if any, and forget it.

        Safe to call repeatedly, and on an instance whose ``__init__`` did
        not run to completion.
        """
        # getattr(): __init__ may have raised before ``shard`` was assigned.
        shard = getattr(self, "shard", None)
        if shard:
            _ = lib.shard_destroy(shard)
        self.shard = None

    def prepare(self) -> None:
        """Initialize the shard.

        Raises:
            AssertionError: :meth:`prepare` has already been called.
            OSError: something went wrong while creating the Shard.
        """
        assert self.shard is None, "prepare() has already been called"

        self.shard = lib.shard_init(self.path.encode("utf-8"))

        # Reset errno so that a stale value is never reported.
        self.ffi.errno = 0
        ret = lib.shard_prepare(self.shard, self.object_count)
        if ret != 0:
            errno = self.ffi.errno
            raise OSError(errno, os.strerror(errno), self.path)
        self.written_object_count = 0

    def finalize(self) -> None:
        """Finalize the Shard.

        Write the index and the perfect hash table
        that will be used to find the content of the objects from
        their key.

        Raises:
            AssertionError: :meth:`prepare` has not been called.
            RuntimeError: if the number of written objects does not match
                ``object_count``, or if saving failed without setting errno.
            OSError: if saving failed and errno was set.
        """
        assert self.shard, "prepare() has not been called"

        if self.object_count != self.written_object_count:
            raise RuntimeError(
                f"Only {self.written_object_count} objects were written "
                f"when {self.object_count} were declared."
            )

        self.ffi.errno = 0
        ret = lib.shard_finalize(self.shard)
        if ret != 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    "shard_finalize failed. Was there a duplicate key by any chance?"
                )
            raise OSError(errno, os.strerror(errno), self.path)
        self._destroy()

    def write(self, key: Key, object: bytes) -> None:
        """Add the key/object pair to the Read Shard.

        Args:
            key: the unique key associated with the object.
            object: the object

        Raises:
            AssertionError: :meth:`prepare` has not been called.
            ValueError: if the key length is wrong, or if enough objects
                have already been written.
            OSError: if something wrong happens when writing the object.
        """
        assert self.shard, "prepare() has not been called"

        if len(key) != Shard.key_len():
            raise ValueError(f"key length is {len(key)} instead of {Shard.key_len()}")
        if self.written_object_count >= self.object_count:
            raise ValueError("The declared number of objects has already been written")

        self.ffi.errno = 0
        ret = lib.shard_object_write(self.shard, key, object, len(object))
        if ret != 0:
            errno = self.ffi.errno
            raise OSError(errno, os.strerror(errno), self.path)
        self.written_object_count += 1
class Shard:
    """Files storing objects indexed with a perfect hash table.

    This class opens an existing Read Shard and allows looking up the
    content of an object when given its key, or deleting an object.
    Writing a new Shard is done with :class:`ShardCreator`.

    This class can act as a context manager, like so:

    .. code-block:: python

        with Shard("shard") as shard:
            return shard.lookup(key)
    """

    def __init__(self, path: str):
        """Open an existing Read Shard.

        Args:
            path: path to an existing Read Shard file or device

        Raises:
            OSError: the Shard could not be loaded.
        """
        # Assigned first so that __del__ is safe if anything below raises.
        self.shard = None
        self.ffi = FFI()
        self.path = path
        self.shard = lib.shard_init(self.path.encode("utf-8"))

        # Reset errno so that a stale value is never reported.
        self.ffi.errno = 0
        ret = lib.shard_load(self.shard)
        if ret != 0:
            errno = self.ffi.errno
            raise OSError(errno, os.strerror(errno), self.path)

    def __del__(self) -> None:
        """Release the native handle if it is still open."""
        # getattr(): the instance may not have been fully initialized.
        shard = getattr(self, "shard", None)
        if shard:
            _ = lib.shard_destroy(shard)
            self.shard = None

    def close(self) -> None:
        """Release the native handle.

        Raises:
            AssertionError: the Shard has already been closed.
        """
        assert self.shard, "Shard has been closed already"

        _ = lib.shard_destroy(self.shard)
        self.shard = None

    def __enter__(self) -> "Shard":
        """Return ``self``; the Shard is already loaded by ``__init__``."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Close the Shard when leaving the ``with`` block."""
        self.close()

    @staticmethod
    def key_len() -> int:
        """Return the key length, in bytes, expected by the native library."""
        return lib.shard_key_len

    def lookup(self, key: Key) -> bytes:
        """Fetch the object matching the key in the Read Shard.

        Fetching an object is O(1): one lookup in the index to obtain
        the offset of the object in the Read Shard and one read to get
        the payload.

        Args:
            key: the key associated with the object to retrieve.

        Returns:
            the object as bytes.

        Raises:
            AssertionError: the Shard has been closed.
            ValueError: the key length is wrong.
            KeyError: the object has been deleted
            RuntimeError: the lookup or the read failed without setting errno
            OSError: the lookup or the read failed and errno was set
        """
        assert self.shard, "Shard has been closed already"

        if len(key) != Shard.key_len():
            raise ValueError(f"key length is {len(key)} instead of {Shard.key_len()}")

        # First step: find the object and learn its size.
        self.ffi.errno = 0
        object_size_pointer = self.ffi.new("uint64_t*")
        ret = lib.shard_find_object(self.shard, key, object_size_pointer)
        if ret == 1:
            raise KeyError(key)
        elif ret < 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    f"shard_find_object failed. Mismatching key for {key.hex()} in the index?"
                )
            raise OSError(errno, os.strerror(errno), self.path)

        # Second step: read the payload into a buffer of exactly that size.
        object_size = object_size_pointer[0]
        object_pointer = self.ffi.new("char[]", object_size)
        self.ffi.errno = 0
        ret = lib.shard_read_object(self.shard, object_pointer, object_size)
        if ret != 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    f"shard_read_object failed. {self.path} might be corrupted."
                )
            raise OSError(errno, os.strerror(errno), self.path)
        return cast(bytes, self.ffi.unpack(object_pointer, object_size))

    @staticmethod
    def delete(path: str, key: Key) -> None:
        """Open the Shard file and delete the given key.

        The object size and data will be overwritten by zeros. The Shard
        file size and offsets are not changed for safety.

        Args:
            path: path to an existing Read Shard file or device.
            key: the key associated with the object to delete.

        Raises:
            OSError: the Shard could not be loaded.
            ValueError: the key length is wrong.
            KeyError: the object has been deleted
            RuntimeError: something went wrong during deletion
        """
        with Shard(path) as shard:
            shard._delete(key)

    def _delete(self, key: Key) -> None:
        """Delete ``key`` from the currently open Shard.

        Applies the same checks as :meth:`lookup` before handing the key to
        the native library, which is given no length for it.

        Raises:
            AssertionError: the Shard has been closed.
            ValueError: the key length is wrong.
            KeyError: the native call returned 1.
            RuntimeError: the native call returned a negative value.
        """
        assert self.shard, "Shard has been closed already"

        if len(key) != Shard.key_len():
            raise ValueError(f"key length is {len(key)} instead of {Shard.key_len()}")

        ret = lib.shard_delete(self.shard, key)
        if ret == 1:
            raise KeyError(key)
        elif ret < 0:
            raise RuntimeError("shard_delete failed")