Source code for swh.model.validators
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from . import fields
from .exceptions import NON_FIELD_ERRORS, ValidationError
from .hashutil import MultiHash, hash_to_bytes
[docs]
def validate_content(content):
"""Validate that a content has the correct schema.
Args: a content (dictionary) to validate."""
def validate_content_status(status):
return fields.validate_enum(status, {"absent", "visible", "hidden"})
def validate_keys(content):
hashes = {"sha1", "sha1_git", "sha256"}
errors = []
out = True
if content["status"] == "absent":
try:
out = out and fields.validate_all_keys(content, {"reason", "origin"})
except ValidationError as e:
errors.append(e)
try:
out = out and fields.validate_any_key(content, hashes)
except ValidationError as e:
errors.append(e)
else:
try:
out = out and fields.validate_all_keys(content, hashes)
except ValidationError as e:
errors.append(e)
if errors:
raise ValidationError(errors)
return out
def validate_hashes(content):
errors = []
if "data" in content:
hashes = MultiHash.from_data(content["data"]).digest()
for hash_type, computed_hash in hashes.items():
if hash_type not in content:
continue
content_hash = hash_to_bytes(content[hash_type])
if content_hash != computed_hash:
errors.append(
ValidationError(
"hash mismatch in content for hash %(hash)s",
params={"hash": hash_type},
code="content-hash-mismatch",
)
)
if errors:
raise ValidationError(errors)
return True
content_schema = {
"sha1": (False, fields.validate_sha1),
"sha1_git": (False, fields.validate_sha1_git),
"sha256": (False, fields.validate_sha256),
"status": (True, validate_content_status),
"length": (True, fields.validate_int),
"ctime": (True, fields.validate_datetime),
"reason": (False, fields.validate_str),
"origin": (False, fields.validate_int),
"data": (False, fields.validate_bytes),
NON_FIELD_ERRORS: [validate_keys, validate_hashes],
}
return fields.validate_against_schema("content", content_schema, content)