Source code for swh.storage.proxies.record_references

# Copyright (C) 2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


from typing import Dict, List, Mapping

from swh.model.model import Directory, OriginVisitStatus, Release, Revision, Snapshot
from swh.storage import get_storage
from swh.storage.interface import ObjectReference, StorageInterface


[docs] class RecordReferencesProxyStorage: """Automatically store object references when adding objects that have them""" def __init__(self, storage: Mapping): self.storage: StorageInterface = get_storage(**storage) def __getattr__(self, key: str): if key == "storage": raise AttributeError(key) return getattr(self.storage, key)
[docs] def directory_add(self, directories: List[Directory]) -> Dict[str, int]: object_references = [ ObjectReference( source=directory.swhid().to_extended(), target=entry.swhid().to_extended(), ) for directory in directories for entry in directory.entries ] stats = self.storage.object_references_add(object_references) stats.update(self.storage.directory_add(directories)) return stats
[docs] def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: object_references = [] for revision in revisions: object_references.append( ObjectReference( source=revision.swhid().to_extended(), target=revision.directory_swhid().to_extended(), ) ) for swhid in revision.parent_swhids(): object_references.append( ObjectReference( source=revision.swhid().to_extended(), target=swhid.to_extended(), ) ) stats = self.storage.object_references_add(object_references) stats.update(self.storage.revision_add(revisions)) return stats
[docs] def release_add(self, releases: List[Release]) -> Dict[str, int]: object_references = [] for release in releases: target_swhid = release.target_swhid() if target_swhid: object_references.append( ObjectReference( source=release.swhid().to_extended(), target=target_swhid.to_extended(), ) ) stats = self.storage.object_references_add(object_references) stats.update(self.storage.release_add(releases)) return stats
[docs] def snapshot_add(self, snapshots: List[Snapshot]) -> Dict[str, int]: object_references = [] for snapshot in snapshots: snapshot_swhid = snapshot.swhid().to_extended() for branch in snapshot.branches.values(): if not branch: continue target_swhid = branch.swhid() if target_swhid: object_references.append( ObjectReference( source=snapshot_swhid, target=target_swhid.to_extended(), ) ) stats = self.storage.object_references_add(object_references) stats.update(self.storage.snapshot_add(snapshots)) return stats
[docs] def origin_visit_status_add( self, visit_statuses: List[OriginVisitStatus], ) -> Dict[str, int]: object_references = [] for visit_status in visit_statuses: snapshot_swhid = visit_status.snapshot_swhid() if snapshot_swhid: object_references.append( ObjectReference( source=visit_status.origin_swhid(), target=snapshot_swhid.to_extended(), ) ) stats = self.storage.object_references_add(object_references) stats.update(self.storage.origin_visit_status_add(visit_statuses)) return stats