Source code for swh.model.hypothesis_strategies

# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import functools
import string
from typing import Any, Callable, List, Sequence, Set, Tuple, Union

from deprecated import deprecated
from hypothesis import assume
from hypothesis.extra.dateutil import timezones
from hypothesis.strategies import (
    SearchStrategy,
    binary,
    booleans,
    builds,
    characters,
    composite,
    datetimes,
    dictionaries,
    from_regex,
    integers,
    just,
    lists,
    none,
    one_of,
    sampled_from,
    sets,
    text,
    tuples,
)

from .from_disk import DentryPerms
from .model import (
    BaseContent,
    BaseModel,
    Content,
    Directory,
    DirectoryEntry,
    MetadataAuthority,
    MetadataFetcher,
    ModelObjectType,
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Person,
    RawExtrinsicMetadata,
    Release,
    ReleaseTargetType,
    Revision,
    RevisionType,
    SkippedContent,
    Snapshot,
    SnapshotBranch,
    SnapshotTargetType,
    Timestamp,
    TimestampWithTimezone,
)
from .swhids import ExtendedObjectType, ExtendedSWHID

pgsql_alphabet = characters(
    blacklist_categories=["Cs"],
    blacklist_characters=["\u0000"],
)  # postgresql does not like these


[docs] def optional(strategy): return one_of(none(), strategy)
[docs] def pgsql_text(): return text(alphabet=pgsql_alphabet)
[docs] def sha1_git(): return binary(min_size=20, max_size=20)
[docs] def sha1(): return binary(min_size=20, max_size=20)
[docs] def binaries_without_bytes(blacklist: Sequence[int]): """Like hypothesis.strategies.binary, but takes a sequence of bytes that should not be included.""" return lists(sampled_from([i for i in range(256) if i not in blacklist])).map(bytes)
[docs] @composite def extended_swhids(draw): object_type = draw(sampled_from(ExtendedObjectType)) object_id = draw(sha1_git()) return ExtendedSWHID(object_type=object_type, object_id=object_id)
[docs] def aware_datetimes(): # datetimes in Software Heritage are not used for software artifacts # (which may be much older than 2000), but only for objects like scheduler # task runs, and origin visits, which were created by Software Heritage, # so at least in 2015. # We're forbidding old datetimes, because until 1956, many timezones had seconds # in their "UTC offsets" (see # <https://en.wikipedia.org/wiki/Time_zone#Worldwide_time_zones>), which is not # encodable in ISO8601; and we need our datetimes to be ISO8601-encodable in the # RPC protocol min_value = datetime.datetime(2000, 1, 1, 0, 0, 0) return datetimes(min_value=min_value, timezones=timezones())
[docs] @composite def iris(draw): protocol = draw(sampled_from(["git", "http", "https", "deb"])) domain = draw(from_regex(r"\A([a-z]([a-z0-9é🏛️-]*)\.){1,3}([a-z0-9é])+\Z")) return "%s://%s" % (protocol, domain)
[docs] @composite def persons_d(draw): fullname = draw(binary()) email = draw(optional(binary())) name = draw(optional(binary())) assume(not (len(fullname) == 32 and email is None and name is None)) return dict(fullname=fullname, name=name, email=email)
[docs] def persons(**kwargs): return persons_d(**kwargs).map(Person.from_dict)
[docs] def timestamps_d(**kwargs): max_seconds = datetime.datetime.max.replace( tzinfo=datetime.timezone.utc ).timestamp() min_seconds = datetime.datetime.min.replace( tzinfo=datetime.timezone.utc ).timestamp() # in Python 3.9, datetime.datetime.max is 9999-12-31T23:59:59.999999, which # means its .timestamp() is 253402300799.999999 in UTC. Unfortunately, because of # flotting-point loss of precision, this is rounded up to 253402300800.0, which # is the timestamp of 10000-01-01T00:00:00 in UTC, which cannot be passed to # datetime.datetime.fromtimestamp because it overflows. # To work around this issue, we move from max_seconds and min_seconds one second # closer to Epoch, which is more than enough (actually, subtracting 20ms from # max_seconds is enough). max_seconds -= 1 min_seconds += 1 defaults = dict( seconds=integers(min_seconds, max_seconds), microseconds=integers(0, 1000000 - 1), ) return builds(dict, **{**defaults, **kwargs})
[docs] def timestamps(): return timestamps_d().map(Timestamp.from_dict)
[docs] @composite def timestamps_with_timezone_d( draw, *, timestamp=timestamps_d(), offset=integers(min_value=-14 * 60, max_value=14 * 60), negative_utc=booleans(), ): timestamp = draw(timestamp) offset = draw(offset) negative_utc = draw(negative_utc) assume(not (negative_utc and offset)) return dict(timestamp=timestamp, offset=offset, negative_utc=negative_utc)
timestamps_with_timezone = timestamps_with_timezone_d().map( TimestampWithTimezone.from_dict )
[docs] def origins_d(*, url=iris().filter(lambda iri: len(iri.encode()) < 2048)): return builds(dict, url=url)
[docs] def origins(**kwargs): return origins_d(**kwargs).map(Origin.from_dict)
[docs] def origin_visits_d(**kwargs): defaults = dict( visit=integers(1, 1000), origin=iris(), date=aware_datetimes(), type=pgsql_text(), ) return builds(dict, **{**defaults, **kwargs})
[docs] def origin_visits(**kwargs): return origin_visits_d(**kwargs).map(OriginVisit.from_dict)
[docs] def metadata_dicts(): return dictionaries(pgsql_text(), pgsql_text())
[docs] def origin_visit_statuses_d(**kwargs): defaults = dict( visit=integers(1, 1000), origin=iris(), type=optional(sampled_from(["git", "svn", "pypi", "debian"])), status=sampled_from( ["created", "ongoing", "full", "partial", "not_found", "failed"] ), date=aware_datetimes(), snapshot=optional(sha1_git()), metadata=optional(metadata_dicts()), ) return builds(dict, **{**defaults, **kwargs})
[docs] def origin_visit_statuses(**kwargs): return origin_visit_statuses_d(**kwargs).map(OriginVisitStatus.from_dict)
[docs] @composite def releases_d(draw, **kwargs): defaults = dict( target_type=sampled_from([x.value for x in ReleaseTargetType]), name=binary(), message=optional(binary()), synthetic=booleans(), target=sha1_git(), metadata=optional(revision_metadata()), raw_manifest=optional(binary()), ) d = draw( one_of( # None author/date: builds(dict, author=none(), date=none(), **{**defaults, **kwargs}), # non-None author/date: builds( dict, date=timestamps_with_timezone_d(), author=persons_d(), **{**defaults, **kwargs}, ), # it is also possible for date to be None but not author, but let's not # overwhelm hypothesis with this edge case ) ) if d["raw_manifest"] is None: del d["raw_manifest"] return d
[docs] def releases(**kwargs): return releases_d(**kwargs).map(Release.from_dict)
revision_metadata = metadata_dicts
[docs] def extra_headers(): return lists( tuples(binary(min_size=0, max_size=50), binary(min_size=0, max_size=500)) ).map(tuple)
[docs] @composite def revisions_d(draw, **kwargs): defaults = dict( message=optional(binary()), synthetic=booleans(), parents=tuples(sha1_git()), directory=sha1_git(), type=sampled_from([x.value for x in RevisionType]), metadata=optional(revision_metadata()), extra_headers=extra_headers(), raw_manifest=optional(binary()), ) d = draw( one_of( # None author/committer/date/committer_date builds( dict, author=none(), committer=none(), date=none(), committer_date=none(), **{**defaults, **kwargs}, ), # non-None author/committer/date/committer_date builds( dict, author=persons_d(), committer=persons_d(), date=timestamps_with_timezone_d(), committer_date=timestamps_with_timezone_d(), **{**defaults, **kwargs}, ), # There are many other combinations, but let's not overwhelm hypothesis # with these edge cases ) ) # TODO: metadata['extra_headers'] can have binary keys and values if d["raw_manifest"] is None: del d["raw_manifest"] return d
[docs] def revisions(**kwargs): return revisions_d(**kwargs).map(Revision.from_dict)
[docs] def directory_entries_d(**kwargs): defaults = dict( name=binaries_without_bytes(b"/"), target=sha1_git(), ) return one_of( builds( dict, type=just("file"), perms=one_of( integers(min_value=0o100000, max_value=0o100777), # regular file integers(min_value=0o120000, max_value=0o120777), # symlink ), **{**defaults, **kwargs}, ), builds( dict, type=just("dir"), perms=integers( min_value=DentryPerms.directory, max_value=DentryPerms.directory + 0o777, ), **{**defaults, **kwargs}, ), builds( dict, type=just("rev"), perms=integers( min_value=DentryPerms.revision, max_value=DentryPerms.revision + 0o777, ), **{**defaults, **kwargs}, ), )
[docs] def directory_entries(**kwargs): return directory_entries_d(**kwargs).map(DirectoryEntry)
[docs] @composite def directories_d(draw, raw_manifest=optional(binary())): d = draw(builds(dict, entries=tuples(directory_entries_d()))) d["raw_manifest"] = draw(raw_manifest) if d["raw_manifest"] is None: del d["raw_manifest"] return d
[docs] def directories(**kwargs): return directories_d(**kwargs).map(Directory.from_dict)
[docs] def contents_d(): return one_of(present_contents_d(), skipped_contents_d())
[docs] def contents(): return one_of(present_contents(), skipped_contents())
[docs] def present_contents_d(**kwargs): defaults = dict( data=binary(max_size=4096), ctime=optional(aware_datetimes()), status=one_of(just("visible"), just("hidden")), ) return builds(dict, **{**defaults, **kwargs})
[docs] def present_contents(**kwargs): return present_contents_d().map(lambda d: Content.from_data(**d))
[docs] @composite def skipped_contents_d( draw, reason=pgsql_text(), status=just("absent"), ctime=optional(aware_datetimes()) ): result = BaseContent._hash_data(draw(binary(max_size=4096))) result.pop("data") nullify_attrs = draw( sets(sampled_from(["sha1", "sha1_git", "sha256", "blake2s256"])) ) for k in nullify_attrs: result[k] = None result["reason"] = draw(reason) result["status"] = draw(status) result["ctime"] = draw(ctime) return result
[docs] def skipped_contents(**kwargs): return skipped_contents_d().map(SkippedContent.from_dict)
[docs] def branch_names(): return binary(min_size=1)
[docs] def snapshot_targets_object_d(): return builds( dict, target=sha1_git(), target_type=sampled_from( [x.value for x in SnapshotTargetType if x.value not in ("alias",)] ), )
branch_targets_object_d = deprecated( version="v6.13.0", reason="use snapshot_targets_object_d" )(snapshot_targets_object_d)
[docs] def snapshot_targets_alias_d(): return builds( dict, target=sha1_git(), target_type=just("alias") ) # SnapshotTargetType.ALIAS.value))
branch_targets_alias_d = deprecated( version="v6.13.0", reason="use snapshot_targets_alias_d" )(snapshot_targets_alias_d)
[docs] def snapshot_targets_d(*, only_objects=False): if only_objects: return snapshot_targets_object_d() else: return one_of(snapshot_targets_alias_d(), snapshot_targets_object_d())
branch_targets_d = deprecated(version="v6.13.0", reason="use snapshot_targets_d")( snapshot_targets_d )
[docs] def snapshot_targets(*, only_objects=False): return builds( SnapshotBranch.from_dict, snapshot_targets_d(only_objects=only_objects) )
[docs] @composite def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw( dictionaries( keys=branch_names(), values=optional(snapshot_targets_d(only_objects=only_objects)), min_size=min_size, max_size=max_size, ) ) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { branch: target["target"] for branch, target in branches.items() if ( target and target["target_type"] == "alias" and target["target"] not in branches ) } for alias_name, alias_target in unresolved_aliases.items(): # Override alias branch with one pointing to a real object # if max_size constraint is reached alias = alias_target if len(branches) < max_size else alias_name branches[alias] = draw(snapshot_targets_d(only_objects=True)) # Ensure no cycles between aliases while True: try: snapshot = Snapshot.from_dict( { "branches": { name: branch or None for (name, branch) in branches.items() } } ) except ValueError as e: for source, target in e.args[1]: branches[source] = draw(snapshot_targets_d(only_objects=True)) else: break return snapshot.to_dict()
[docs] def snapshots(*, min_size=0, max_size=100, only_objects=False): return snapshots_d( min_size=min_size, max_size=max_size, only_objects=only_objects ).map(Snapshot.from_dict)
[docs] def metadata_authorities(url=iris()): return builds(MetadataAuthority, url=url, metadata=just(None))
[docs] def metadata_fetchers(**kwargs): defaults = dict( name=text(min_size=1, alphabet=string.printable), version=text( min_size=1, alphabet=string.ascii_letters + string.digits + string.punctuation, ), ) return builds( MetadataFetcher, metadata=just(None), **{**defaults, **kwargs}, )
[docs] def raw_extrinsic_metadata(**kwargs): defaults = dict( target=extended_swhids(), discovery_date=aware_datetimes(), authority=metadata_authorities(), fetcher=metadata_fetchers(), format=text(min_size=1, alphabet=string.printable), ) return builds(RawExtrinsicMetadata, **{**defaults, **kwargs})
[docs] def raw_extrinsic_metadata_d(**kwargs): return raw_extrinsic_metadata(**kwargs).map(RawExtrinsicMetadata.to_dict)
def _tuplify(object_type: ModelObjectType, obj: BaseModel): return (object_type, obj)
[docs] def objects( # remove the Union once deprecated usage have been migrated blacklist_types: Union[Set[ModelObjectType] | Any] = { ModelObjectType.ORIGIN_VISIT_STATUS, }, split_content: bool = False, ): """generates a random couple (type, obj) which obj is an instance of the Model class corresponding to obj_type. `blacklist_types` is a list of obj_type to exclude from the strategy. If `split_content` is True, generates Content and SkippedContent under different obj_type, resp. "content" and "skipped_content". """ strategies: List[ Tuple[ModelObjectType, Callable[[], SearchStrategy[BaseModel]]] ] = [ (ModelObjectType.ORIGIN, origins), (ModelObjectType.ORIGIN_VISIT, origin_visits), (ModelObjectType.ORIGIN_VISIT_STATUS, origin_visit_statuses), (ModelObjectType.SNAPSHOT, snapshots), (ModelObjectType.RELEASE, releases), (ModelObjectType.REVISION, revisions), (ModelObjectType.DIRECTORY, directories), (ModelObjectType.RAW_EXTRINSIC_METADATA, raw_extrinsic_metadata), ] if split_content: strategies.append((ModelObjectType.CONTENT, present_contents)) strategies.append((ModelObjectType.SKIPPED_CONTENT, skipped_contents)) else: strategies.append((ModelObjectType.CONTENT, contents)) candidates = [ obj_gen().map(functools.partial(_tuplify, obj_type)) for (obj_type, obj_gen) in strategies if obj_type not in blacklist_types ] return one_of(*candidates)
[docs] def object_dicts( blacklist_types=(ModelObjectType.ORIGIN_VISIT_STATUS,), split_content=False ): """generates a random couple (type, dict) which dict is suitable for <ModelForType>.from_dict() factory methods. `blacklist_types` is a list of obj_type to exclude from the strategy. If `split_content` is True, generates Content and SkippedContent under different obj_type, resp. "content" and "skipped_content". """ strategies = [ (ModelObjectType.ORIGIN, origins_d), (ModelObjectType.ORIGIN_VISIT, origin_visits_d), (ModelObjectType.ORIGIN_VISIT_STATUS, origin_visit_statuses_d), (ModelObjectType.SNAPSHOT, snapshots_d), (ModelObjectType.RELEASE, releases_d), (ModelObjectType.REVISION, revisions_d), (ModelObjectType.DIRECTORY, directories_d), (ModelObjectType.RAW_EXTRINSIC_METADATA, raw_extrinsic_metadata_d), ] if split_content: strategies.append((ModelObjectType.CONTENT, present_contents_d)) strategies.append((ModelObjectType.SKIPPED_CONTENT, skipped_contents_d)) else: strategies.append((ModelObjectType.CONTENT, contents_d)) args = [ obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x)) for (obj_type, obj_gen) in strategies if obj_type not in blacklist_types ] return one_of(*args)