Source code for swh.perfecthash

# Copyright (C) 2021-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
from types import TracebackType
from typing import NewType, Optional, Type, cast

from cffi import FFI

from swh.perfecthash._hash_cffi import lib

Key = NewType("Key", bytes)


class ShardCreator:
    """Write a new Shard file from a declared number of key/object pairs."""

    def __init__(self, path: str, object_count: int):
        """Create a Shard.

        The file at ``path`` will be truncated if it already exists.

        ``object_count`` must match the number of objects that will be added
        using the :meth:`write` method. A ``RuntimeError`` will be raised
        on :meth:`finalize` in case of inconsistencies.

        Ideally this should be done using a ``with`` statement, as such:

        .. code-block:: python

            with ShardCreator("shard", len(objects)) as shard:
                for key, object in objects.items():
                    shard.write(key, object)

        Otherwise, :meth:`prepare`, :meth:`write` and :meth:`finalize` must be
        called in sequence.

        Args:
            path: path to the Shard file or device that will be written.
            object_count: number of objects that will be written to the Shard.
        """
        # Assigned first so that __del__ and the other methods always find
        # these attributes, even if a later statement of __init__ raises.
        self.shard = None
        self.written_object_count = 0
        self.ffi = FFI()
        self.path = path
        self.object_count = object_count

    def __enter__(self) -> "ShardCreator":
        self.prepare()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # When the ``with`` body failed, release the handle without
        # finalizing; otherwise finalize (which releases it on success).
        if exc_type is not None:
            self._destroy()
            return
        self.finalize()

    def __del__(self):
        # getattr: the attribute is missing when the instance was never
        # initialized (e.g. __init__ raised very early).
        if getattr(self, "shard", None):
            _ = lib.shard_destroy(self.shard)

    def _destroy(self) -> None:
        """Release the native shard handle, if one is currently open."""
        if self.shard:
            _ = lib.shard_destroy(self.shard)
        self.shard = None

    def prepare(self) -> None:
        """Initialize the shard.

        Raises:
            OSError: something went wrong while creating the Shard. The
                native handle is released, so :meth:`prepare` may be called
                again.
        """
        assert self.shard is None, "prepare() has already been called"

        self.shard = lib.shard_init(self.path.encode("utf-8"))

        self.ffi.errno = 0
        ret = lib.shard_prepare(self.shard, self.object_count)
        if ret != 0:
            # Read errno before releasing the handle: the release is another
            # C call and may replace the saved value.
            errno = self.ffi.errno
            self._destroy()
            raise OSError(errno, os.strerror(errno), self.path)
        self.written_object_count = 0

    def finalize(self) -> None:
        """Finalize the Shard.

        Write the index and the perfect hash table that will be used to find
        the content of the objects from their key. The native handle is
        released once this succeeds.

        Raises:
            RuntimeError: if the number of written objects does not match
                ``object_count``, or if saving failed without setting errno.
            OSError: if saving failed and errno was set.
        """
        assert self.shard, "prepare() has not been called"

        if self.object_count != self.written_object_count:
            raise RuntimeError(
                f"Only {self.written_object_count} objects were written "
                f"when {self.object_count} were declared."
            )

        self.ffi.errno = 0
        ret = lib.shard_finalize(self.shard)
        if ret != 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    "shard_finalize failed. Was there a duplicate key by any chance?"
                )
            raise OSError(errno, os.strerror(errno), self.path)
        self._destroy()

    def write(self, key: Key, object: bytes) -> None:
        """Add the key/object pair to the Read Shard.

        Args:
            key: the unique key associated with the object.
            object: the object

        Raises:
            ValueError: if the key length is wrong, or if enough objects
                have already been written.
            OSError: if something wrong happens when writing the object.
        """
        assert self.shard, "prepare() has not been called"

        if len(key) != Shard.key_len():
            raise ValueError(f"key length is {len(key)} instead of {Shard.key_len()}")
        if self.written_object_count >= self.object_count:
            raise ValueError("The declared number of objects has already been written")

        self.ffi.errno = 0
        ret = lib.shard_object_write(self.shard, key, object, len(object))
        if ret != 0:
            errno = self.ffi.errno
            raise OSError(errno, os.strerror(errno), self.path)
        self.written_object_count += 1
class Shard:
    """Files storing objects indexed with a perfect hash table.

    This class opens an existing Read Shard and looks up the content of an
    object when given its key.

    This class can act as a context manager, like so:

    .. code-block:: python

        with Shard("shard") as shard:
            return shard.lookup(key)
    """

    def __init__(self, path: str):
        """Open an existing Read Shard.

        Args:
            path: path to an existing Read Shard file or device

        Raises:
            OSError: if the Shard could not be loaded. The native handle is
                released before the exception propagates.
        """
        # Assigned first so that __del__ always finds the attribute, even if
        # a later statement of __init__ raises.
        self.shard = None
        self.ffi = FFI()
        self.path = path
        self.shard = lib.shard_init(self.path.encode("utf-8"))

        self.ffi.errno = 0
        ret = lib.shard_load(self.shard)
        if ret != 0:
            # Read errno before releasing the handle: the release is another
            # C call and may replace the saved value.
            errno = self.ffi.errno
            _ = lib.shard_destroy(self.shard)
            self.shard = None
            raise OSError(errno, os.strerror(errno), self.path)

    def __del__(self) -> None:
        # getattr: the attribute is missing when the instance was never
        # initialized (e.g. __init__ raised very early).
        if getattr(self, "shard", None):
            _ = lib.shard_destroy(self.shard)

    def close(self) -> None:
        """Release the native shard handle.

        Must be called at most once; the instance cannot be used for
        lookups afterwards.
        """
        assert self.shard, "Shard has been closed already"

        _ = lib.shard_destroy(self.shard)
        self.shard = None

    def __enter__(self) -> "Shard":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self.close()

    @staticmethod
    def key_len() -> int:
        """Return the key length, in bytes, that shards expect."""
        return lib.shard_key_len

    def lookup(self, key: Key) -> bytes:
        """Fetch the object matching the key in the Read Shard.

        Fetching an object is O(1): one lookup in the index to obtain the
        offset of the object in the Read Shard and one read to get the payload.

        Args:
            key: the key associated with the object to retrieve.

        Returns:
            the object as bytes.

        Raises:
            ValueError: if the key length is wrong.
            RuntimeError: if finding or reading the object failed without
                setting errno.
            OSError: if finding or reading the object failed and errno was set.
        """
        assert self.shard, "Shard has been closed already"

        if len(key) != Shard.key_len():
            raise ValueError(f"key length is {len(key)} instead of {Shard.key_len()}")

        # First call: locate the object and learn its size.
        self.ffi.errno = 0
        object_size_pointer = self.ffi.new("uint64_t*")
        ret = lib.shard_find_object(self.shard, key, object_size_pointer)
        if ret != 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    f"shard_find_object failed. Mismatching key for {key.hex()} in the index?"
                )
            raise OSError(errno, os.strerror(errno), self.path)

        # Second call: read the payload into a buffer of exactly that size.
        object_size = object_size_pointer[0]
        object_pointer = self.ffi.new("char[]", object_size)
        self.ffi.errno = 0
        ret = lib.shard_read_object(self.shard, object_pointer, object_size)
        if ret != 0:
            errno = self.ffi.errno
            if errno == 0:
                raise RuntimeError(
                    f"shard_read_object failed. " f"{self.path} might be corrupted."
                )
            raise OSError(errno, os.strerror(errno), self.path)
        return cast(bytes, self.ffi.unpack(object_pointer, object_size))