swh.storage.validate module

swh.storage.validate.convert_validation_exceptions()

Catches validation errors raised on arguments and re-raises them as a StorageArgumentException.
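The catch-and-re-raise pattern described here can be sketched as a context manager. This is an illustration only, not the module's actual code: the `StorageArgumentException` class below is a local stand-in, the name `convert_validation_exceptions_sketch` is hypothetical, and the choice of `TypeError` and `ValueError` as the "validation errors" is an assumption.

```python
import contextlib


class StorageArgumentException(Exception):
    """Local stand-in for the real exception class (assumption)."""


@contextlib.contextmanager
def convert_validation_exceptions_sketch():
    """Hypothetical sketch: turn validation errors into one exception type."""
    try:
        yield
    except (TypeError, ValueError) as exc:
        # Assumption: TypeError and ValueError are treated as validation errors.
        raise StorageArgumentException(str(exc)) from exc


def demo():
    """Return the name of the exception that escapes the context manager."""
    try:
        with convert_validation_exceptions_sketch():
            int("not a number")  # raises ValueError inside the block
    except StorageArgumentException as exc:
        return type(exc).__name__
    return None


result = demo()
```

With this shape, callers only need to handle a single exception type, whichever validation error occurred inside the `with` block.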

swh.storage.validate.dict_converter(model: Type[ModelObject], obj: Union[Dict, ModelObject]) → ModelObject

Converts dicts to model objects; model objects are passed through unchanged.
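A minimal sketch of this convert-or-pass-through behaviour follows. It does not use the real swh-model classes: `FakeOrigin` and its `from_dict` constructor are hypothetical stand-ins, and `dict_converter_sketch` is an illustrative name, so the real function may differ in how it builds the object and how it reports invalid input.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type, TypeVar, Union

M = TypeVar("M")


@dataclass(frozen=True)
class FakeOrigin:
    """Hypothetical stand-in for a model class."""

    url: str

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "FakeOrigin":
        # Assumption: model classes can be built from a plain dict.
        return cls(**d)


def dict_converter_sketch(model: Type[M], obj: Union[Dict[str, Any], M]) -> M:
    """Convert a dict to `model`; return anything else untouched."""
    if isinstance(obj, dict):
        return model.from_dict(obj)
    return obj


from_dict_result = dict_converter_sketch(FakeOrigin, {"url": "https://example.org/repo"})
existing = FakeOrigin(url="https://example.org/other")
passed_through = dict_converter_sketch(FakeOrigin, existing)
```

Here `from_dict_result` is a new `FakeOrigin`, while `passed_through` is the very same object that was passed in.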

class swh.storage.validate.ValidatingProxyStorage(storage)

Bases: object

Storage implementation that converts dictionaries to swh-model objects before calling its backend, and converts the results back to dicts before returning them.

For test purposes.
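The proxy idea can be sketched as follows: dicts are converted on the way in, model objects are converted back to dicts on the way out. Everything in this sketch is hypothetical (`FakeRevision`, `FakeBackend`, `ValidatingProxySketch`, and the `{"revision:add": n}` summary format); it only mirrors the shape of two of the methods documented for this class and is not the actual implementation.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterable, Iterator, List, Optional, Union


@dataclass(frozen=True)
class FakeRevision:
    """Hypothetical stand-in for a model object."""

    id: bytes
    message: bytes


class FakeBackend:
    """Hypothetical backend that only accepts and returns model objects."""

    def __init__(self) -> None:
        self._revisions: Dict[bytes, FakeRevision] = {}

    def revision_add(self, revisions: List[FakeRevision]) -> Dict[str, int]:
        added = 0
        for rev in revisions:
            if rev.id not in self._revisions:
                self._revisions[rev.id] = rev
                added += 1
        # Assumption: the summary is a dict of counters.
        return {"revision:add": added}

    def revision_get(self, ids: Iterable[bytes]) -> Iterator[Optional[FakeRevision]]:
        for rev_id in ids:
            yield self._revisions.get(rev_id)


class ValidatingProxySketch:
    """Converts dicts to model objects on input and back to dicts on output."""

    def __init__(self, storage: FakeBackend) -> None:
        self.storage = storage

    def revision_add(
        self, revisions: Iterable[Union[FakeRevision, Dict[str, Any]]]
    ) -> Dict[str, int]:
        objs = [FakeRevision(**r) if isinstance(r, dict) else r for r in revisions]
        return self.storage.revision_add(objs)

    def revision_get(self, revisions: Iterable[bytes]) -> Iterator[Optional[Dict]]:
        for rev in self.storage.revision_get(revisions):
            # Unknown ids stay None; known revisions are returned as dicts.
            yield None if rev is None else asdict(rev)


proxy = ValidatingProxySketch(FakeBackend())
summary = proxy.revision_add(
    [{"id": b"\x01", "message": b"init"}, FakeRevision(b"\x02", b"second")]
)
fetched = list(proxy.revision_get([b"\x01", b"\x03"]))
```

Callers written against a dict-based interface can keep passing and receiving dicts, while the wrapped backend only ever sees model objects.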

content_add(content: Iterable[Union[swh.model.model.Content, Dict]]) → Dict
content_add_metadata(content: Iterable[Union[swh.model.model.Content, Dict]]) → Dict
skipped_content_add(content: Iterable[Union[swh.model.model.SkippedContent, Dict]]) → Dict
directory_add(directories: Iterable[Union[swh.model.model.Directory, Dict]]) → Dict
revision_add(revisions: Iterable[Union[swh.model.model.Revision, Dict]]) → Dict
revision_get(revisions: Iterable[bytes]) → Iterator[Optional[Dict]]
revision_log(revisions: Iterable[bytes], limit: Optional[int] = None) → Iterator[Dict]
revision_shortlog(revisions: Iterable[bytes], limit: Optional[int] = None) → Iterator[Tuple[bytes, Tuple]]
release_add(releases: Iterable[Union[Dict, swh.model.model.Release]]) → Dict
snapshot_add(snapshots: Iterable[Union[Dict, swh.model.model.Snapshot]]) → Dict
origin_visit_add(visits: Iterable[swh.model.model.OriginVisit]) → Iterable[swh.model.model.OriginVisit]
origin_add(origins: Iterable[Union[Dict, swh.model.model.Origin]]) → Dict[str, int]
origin_add_one(origin: Union[Dict, swh.model.model.Origin]) → int
clear_buffers(object_types: Optional[Iterable[str]] = None) → None
flush(object_types: Optional[Iterable[str]] = None) → Dict