Source code for swh.objstorage.backends.generator

from itertools import count, islice, repeat
import logging
import os
import random
from typing import Generator, Iterator, Optional, cast

from swh.objstorage.constants import ID_HASH_ALGO
from swh.objstorage.interface import CompositeObjId, ObjId
from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage, timed

# we decorate methods with timed here to make tests pass without special care

logger = logging.getLogger(__name__)


class Randomizer:
    """Cheap source of random byte strings backed by a reusable buffer.

    A buffer of random bytes is kept in ``self.data`` (its length in
    ``self.size``). Each call to :meth:`read` returns a randomly positioned
    slice of that buffer; the buffer is only regenerated when the requested
    size is at least as large as the buffer itself.
    """

    def __init__(self):
        self.size = 0
        self.read(1024)  # create a not-so-small initial buffer

    def read(self, size):
        """Return ``size`` random bytes.

        Args:
            size (int): number of bytes to return.

        Returns:
            bytes: a ``size``-long slice taken at a random offset of the
            internal buffer.
        """
        if size >= self.size:
            # Buffer too small for this request: replace it with a fresh one
            # twice as large as what was asked for. os.urandom is the
            # portable way to get OS randomness (no /dev/urandom needed).
            self.data = os.urandom(2 * size)
            self.size = len(self.data)
        # pick a random subset of our existing buffer; randint is inclusive
        # on both ends, so the last valid start offset is self.size - size
        idx = random.randint(0, self.size - size)
        return self.data[idx : idx + size]
def gen_sizes():
    """Yield an endless stream of random file sizes.

    The sizes follow the rough distribution of file sizes in the SWH
    archive: a histogram bucket is drawn uniformly, then a size is drawn
    uniformly inside that bucket (lower edge included, upper edge excluded).
    """
    # these are the histogram bounds of the pg content.length column
    edges = (
        0, 2, 72, 119, 165, 208, 256, 300, 345, 383,
        429, 474, 521, 572, 618, 676, 726, 779, 830, 879,
        931, 992, 1054, 1119, 1183, 1244, 1302, 1370, 1437, 1504,
        1576, 1652, 1725, 1806, 1883, 1968, 2045, 2133, 2236, 2338,
        2433, 2552, 2659, 2774, 2905, 3049, 3190, 3322, 3489, 3667,
        3834, 4013, 4217, 4361, 4562, 4779, 5008, 5233, 5502, 5788,
        6088, 6396, 6728, 7094, 7457, 7835, 8244, 8758, 9233, 9757,
        10313, 10981, 11693, 12391, 13237, 14048, 14932, 15846, 16842, 18051,
        19487, 20949, 22595, 24337, 26590, 28840, 31604, 34653, 37982, 41964,
        46260, 51808, 58561, 66584, 78645, 95743, 122883, 167016, 236108, 421057,
        1047367, 55056238,
    )
    n_edges = len(edges)
    while True:
        # index of the bucket's upper edge, in [1, n_edges - 1]
        pick = random.randrange(1, n_edges)
        low, high = edges[pick - 1], edges[pick]
        yield random.randrange(low, high)
def gen_random_content(total=None, filesize=None):
    """Yield random byte strings usable as fake file contents.

    By default the size of each generated object is drawn from
    ``gen_sizes()``, i.e. it roughly follows the SWH archive file size
    distribution.

    Args:
        total (int): the total number of objects to generate. Infinite if
            unset.
        filesize (int): generate objects with fixed size instead of random
            ones.
    """
    # Either the same size forever, or sizes drawn from the archive histogram.
    sizes = repeat(filesize) if filesize else gen_sizes()
    if total:
        sizes = islice(sizes, total)
    source = Randomizer()
    for nbytes in sizes:
        yield source.read(nbytes)
class RandomGeneratorObjStorage(ObjStorage):
    """A stupid read-only storage that generates blobs for testing purpose.

    Object ids are the decimal representation (as bytes) of a counter
    starting at 1; contents are random blobs produced lazily by
    ``gen_random_content``.
    """

    name: str = "generator"

    def __init__(self, filesize=None, total=None, **kwargs):
        """Set up the generator storage.

        Args:
            filesize: if truthy, converted with ``int()`` and used as the
                fixed size of every generated blob.
            total: if truthy, converted with ``int()`` and used as the number
                of objects this storage pretends to hold.
            **kwargs: forwarded to the parent class constructor.
        """
        super().__init__(**kwargs)
        if filesize:
            filesize = int(filesize)
        self.filesize = filesize
        if total:
            total = int(total)
        self.total = total
        # created lazily by the content_generator property
        self._content_generator = None

    @property
    def content_generator(self):
        """The shared blob generator, created on first access."""
        if self._content_generator is None:
            self._content_generator = gen_random_content(self.total, self.filesize)
        return self._content_generator

    def check_config(self, *, check_write):
        """Always report the configuration as valid."""
        return True

    @timed
    def __contains__(self, obj_id, *args, **kwargs):
        """Always report ``obj_id`` as absent."""
        return False

    def __iter__(self) -> Iterator[CompositeObjId]:
        """Yield object ids ``{ID_HASH_ALGO: b"1"}``, ``b"2"``, ...

        The generator also accepts a value through ``send()``: the sent
        integer replaces the current counter, and iteration continues from
        there. When ``self.total`` is set, the generator stops once the
        counter value that was just yielded has reached it.
        """
        i = 1
        while True:
            j = yield {ID_HASH_ALGO: b"%d" % i}
            # the bound is checked against the value that was just yielded,
            # before any sent value is taken into account
            if self.total and i >= self.total:
                logger.debug("DONE")
                break
            if j is not None:
                i = j
            else:
                i += 1

    @timed
    def get(self, obj_id, *args, **kwargs):
        """Return the next generated blob; ``obj_id`` is not used."""
        return next(self.content_generator)

    @timed
    def add(self, content, obj_id, check_presence=True, *args, **kwargs):
        """Do nothing: the added content is discarded."""
        pass

    def check(self, obj_id, *args, **kwargs):
        """Always report the object as valid."""
        return True

    def delete(self, obj_id, *args, **kwargs):
        """Pretend the deletion succeeded."""
        return True

    def list_content(
        self,
        last_obj_id: Optional[ObjId] = None,
        limit: Optional[int] = DEFAULT_LIMIT,
    ) -> Iterator[CompositeObjId]:
        """Iterate over at most ``limit`` object ids following ``last_obj_id``.

        Args:
            last_obj_id: id to resume after; either the raw id or a dict
                holding it under ``ID_HASH_ALGO``. When falsy, listing starts
                from the first id.
            limit: maximum number of ids to return (``None`` for no limit).

        Returns:
            an iterator of object ids; empty when the id generator is already
            exhausted while being positioned on ``last_obj_id``.
        """
        if isinstance(last_obj_id, dict):
            last_obj_id = last_obj_id[ID_HASH_ALGO]
        it = cast(Generator[CompositeObjId, int, None], iter(self))
        if last_obj_id:
            # prime the generator so that it can receive a value
            next(it)
            try:
                # reposition the counter; the id yielded back by send() is
                # last_obj_id itself, which must not be listed again
                it.send(int(last_obj_id))
            except StopIteration:
                # the generator hit its ``total`` bound while being
                # repositioned: nothing is left to list, so return an empty
                # iterator instead of leaking StopIteration to the caller
                return iter(())
        return islice(it, limit)