Source code for swh.journal.serializers

# Copyright (C) 2016-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from enum import Enum
from typing import Any, BinaryIO, Union

import msgpack

from swh.model.model import KeyType


[docs] class MsgpackExtTypeCodes(Enum): LONG_INT = 1 LONG_NEG_INT = 2
# this as been copied from swh.core.api.serializer # TODO refactor swh.core to make this function available def _msgpack_encode_longint(value): # needed because msgpack will not handle long integers with more than 64 bits # which we unfortunately happen to have to deal with from time to time if value > 0: code = MsgpackExtTypeCodes.LONG_INT.value else: code = MsgpackExtTypeCodes.LONG_NEG_INT.value value = -value length, rem = divmod(value.bit_length(), 8) if rem: length += 1 return msgpack.ExtType(code, int.to_bytes(value, length, "big"))
[docs] def msgpack_ext_encode_types(obj): if isinstance(obj, int): return _msgpack_encode_longint(obj) return obj
[docs] def msgpack_ext_hook(code, data): if code == MsgpackExtTypeCodes.LONG_INT.value: return int.from_bytes(data, "big") if code == MsgpackExtTypeCodes.LONG_NEG_INT.value: return -int.from_bytes(data, "big") raise ValueError("Unknown msgpack extended code %s" % code)
# for BW compat
[docs] def decode_types_bw(obj): if set(obj.keys()) == {b"d", b"swhtype"} and obj[b"swhtype"] == "datetime": return datetime.datetime.fromisoformat(obj[b"d"]) return obj
[docs] def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex()
[docs] def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key
[docs] def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key)
[docs] def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False)
[docs] def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack.packb( value, use_bin_type=True, datetime=True, # encode datetime as msgpack.Timestamp default=msgpack_ext_encode_types, )
[docs] def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" return msgpack.unpackb( kafka_value, raw=False, object_hook=decode_types_bw, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) )
[docs] def kafka_stream_to_value(file_like: BinaryIO) -> msgpack.Unpacker: """Return a deserializer for data stored in kafka""" return msgpack.Unpacker( file_like, raw=False, object_hook=decode_types_bw, ext_hook=msgpack_ext_hook, strict_map_key=False, use_list=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) )