Source code for swh.dataset.utils

# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import sqlite3
import subprocess

try:
    # Plyvel shouldn't be a hard dependency if we want to use sqlite instead
    import plyvel
except ImportError:
    plyvel = None


[docs] class ZSTFile: """ Object-like wrapper around a ZST file. Uses a subprocess of the "zstd" command to compress and deflate the objects. """ def __init__(self, path: str, mode: str = "r"): if mode not in ("r", "rb", "w", "wb"): raise ValueError(f"ZSTFile mode {mode} is invalid.") self.path = path self.mode = mode def __enter__(self) -> "ZSTFile": is_text = not (self.mode in ("rb", "wb")) writing = self.mode in ("w", "wb") if writing: cmd = ["zstd", "-f", "-q", "-o", self.path] else: cmd = ["zstdcat", self.path] self.process = subprocess.Popen( cmd, text=is_text, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) return self def __exit__(self, exc_type, exc_value, tb): self.process.stdin.close() self.process.stdout.close() self.process.wait()
[docs] def read(self, *args): return self.process.stdout.read(*args)
[docs] def write(self, buf): self.process.stdin.write(buf)
[docs] class SQLiteSet: """ On-disk Set object for hashes using SQLite as an indexer backend. Used to deduplicate objects when processing large queues with duplicates. """ def __init__(self, db_path): self.db_path = db_path def __enter__(self): self.db = sqlite3.connect(str(self.db_path)) self.db.execute( "CREATE TABLE IF NOT EXISTS" " tmpset (val TEXT NOT NULL PRIMARY KEY)" " WITHOUT ROWID" ) self.db.execute("PRAGMA synchronous = OFF") self.db.execute("PRAGMA journal_mode = OFF") return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.commit() self.db.close()
[docs] def add(self, v: bytes) -> bool: """ Add an item to the set. Args: v: The value to add to the set. Returns: True if the value was added to the set, False if it was already present. """ try: self.db.execute("INSERT INTO tmpset(val) VALUES (?)", (v.hex(),)) except sqlite3.IntegrityError: return False else: return True
[docs] class LevelDBSet: """ On-disk Set object for hashes using LevelDB as an indexer backend. Used to deduplicate objects when processing large queues with duplicates. """ def __init__(self, db_path): self.db_path = db_path if plyvel is None: raise ImportError("Plyvel library not found, required for LevelDBSet") def __enter__(self): self.db = plyvel.DB(str(self.db_path), create_if_missing=True) return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.close()
[docs] def add(self, v: bytes) -> bool: """ Add an item to the set. Args: v: The value to add to the set. Returns: True if the value was added to the set, False if it was already present. """ if self.db.get(v): return False else: self.db.put(v, b"T") return True
[docs] def remove_pull_requests(snapshot): """ Heuristic to filter out pull requests in snapshots: remove all branches that start with refs/ but do not start with refs/heads or refs/tags. """ # Copy the items with list() to remove items during iteration for branch_name, branch in list(snapshot["branches"].items()): original_branch_name = branch_name while branch and branch.get("target_type") == "alias": branch_name = branch["target"] branch = snapshot["branches"].get(branch_name) if branch is None or not branch_name: continue if branch_name.startswith(b"refs/") and not ( branch_name.startswith(b"refs/heads") or branch_name.startswith(b"refs/tags") ): snapshot["branches"].pop(original_branch_name)