Source code for swh.scanner.data

# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from pathlib import Path
import subprocess
from typing import Callable, Dict, List, Optional, Tuple
from xml.etree import ElementTree

from swh.model.exceptions import ValidationError
from swh.model.from_disk import Directory
from swh.model.swhids import CoreSWHID

from .client import Client

SUPPORTED_INFO = {"known", "origin"}

logger = logging.getLogger(__name__)


class MerkleNodeInfo(dict):
    """Store additional information about Merkle DAG nodes, using SWHIDs as keys"""

    def __setitem__(self, key, value):
        """The keys must be valid Software Heritage Persistent Identifiers
        while values must be dict.
        """
        if not isinstance(key, CoreSWHID):
            raise ValidationError("keys must be valid SWHID(s)")
        if not isinstance(value, dict):
            raise ValidationError(f"values must be dict, not {type(value)}")
        super(MerkleNodeInfo, self).__setitem__(key, value)

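# Example (illustrative sketch, not part of the original module): keys must be
# CoreSWHID instances and values must be dicts; anything else raises
# ValidationError. The SWHID below is only a syntactically valid placeholder.
#
#   >>> info = MerkleNodeInfo()
#   >>> swhid = CoreSWHID.from_string(
#   ...     "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
#   ... )
#   >>> info[swhid] = {"known": True, "origin": None}  # OK
#   >>> info["not-a-swhid"] = {}                       # raises ValidationError
#   >>> info[swhid] = "known"                          # raises ValidationError
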
def init_merkle_node_info(source_tree: Directory, data: MerkleNodeInfo, info: set):
    """Populate the MerkleNodeInfo with the SWHIDs of the given source tree
    and the attributes that will be stored.
    """
    if not info:
        raise Exception("Data initialization requires node attribute values.")
    nodes_info: Dict[str, Optional[str]] = {}
    for ainfo in info:
        if ainfo in SUPPORTED_INFO:
            nodes_info[ainfo] = None
        else:
            raise Exception(f"Information {ainfo} is not supported.")
    for node in source_tree.iter_tree():
        data[node.swhid()] = nodes_info.copy()  # type: ignore

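# Example (hedged sketch): initializing node info for a tree built from disk,
# requesting the supported "known" attribute. The path is hypothetical.
#
#   >>> source_tree = Directory.from_disk(path=b"/tmp/my-project")
#   >>> nodes_data = MerkleNodeInfo()
#   >>> init_merkle_node_info(source_tree, nodes_data, {"known"})
#   >>> # every node now maps to {"known": None}, to be filled by the scan
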
async def add_origin(source_tree: Directory, data: MerkleNodeInfo, client: Client):
    """Store origin information about software artifacts retrieved from the
    Software Heritage graph service.
    """
    queue = []
    queue.append(source_tree)
    while queue:
        for node in queue.copy():
            queue.remove(node)
            node_ori = await client.get_origin(node.swhid())
            if node_ori:
                data[node.swhid()]["origin"] = node_ori
                if node.object_type == "directory":
                    for sub_node in node.iter_tree():
                        data[sub_node.swhid()]["origin"] = node_ori  # type: ignore
            else:
                if node.object_type == "directory":
                    children = [sub_node for sub_node in node.iter_tree()]
                    children.remove(node)
                    queue.extend(children)  # type: ignore

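# Example (hedged sketch): add_origin is a coroutine, so it must be driven by
# an event loop; constructing the Client is assumed to happen elsewhere (see
# swh.scanner.client for the real constructor).
#
#   >>> import asyncio
#   >>> # asyncio.run(add_origin(source_tree, nodes_data, client))
#
# When a node's origin is found, the whole subtree inherits it and is pruned
# from the queue; otherwise its children are enqueued for individual lookups.
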
def get_directory_data(
    root_path: str,
    source_tree: Directory,
    nodes_data: MerkleNodeInfo,
    directory_data: Optional[Dict] = None,
) -> Dict[Path, dict]:
    """Get content information for each directory inside source_tree.

    Returns:
        A dictionary with a directory path as key and the relative
        contents information as values.
    """
    # Avoid a mutable default argument: a shared {} default would leak state
    # between calls.
    if directory_data is None:
        directory_data = {}

    def _get_directory_data(
        source_tree: Directory, nodes_data: MerkleNodeInfo, directory_data: Dict
    ):
        directories = list(
            filter(
                lambda n: n.object_type == "directory",
                map(lambda n: n[1], source_tree.items()),
            )
        )
        for node in directories:
            directory_info = directory_content(node, nodes_data)
            rel_path = Path(node.data["path"].decode()).relative_to(Path(root_path))
            directory_data[rel_path] = directory_info
            if has_dirs(node):
                _get_directory_data(node, nodes_data, directory_data)

    _get_directory_data(source_tree, nodes_data, directory_data)
    return directory_data

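# Example (illustrative, hypothetical paths): the returned mapping is keyed by
# directory paths relative to root_path, with the (total, known) tuples
# produced by directory_content() below as values:
#
#   >>> get_directory_data("/tmp/my-project", source_tree, nodes_data)
#   {PosixPath('src'): (10, 7), PosixPath('src/utils'): (3, 3)}
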
def directory_content(node: Directory, nodes_data: MerkleNodeInfo) -> Tuple[int, int]:
    """Count known contents inside the given directory.

    Returns:
        A tuple with the total number of contents inside the directory
        and the number of known contents.
    """
    known_cnt = 0
    node_contents = list(
        filter(lambda n: n.object_type == "content", map(lambda n: n[1], node.items()))
    )
    for sub_node in node_contents:
        if nodes_data[sub_node.swhid()]["known"]:
            known_cnt += 1
    return (len(node_contents), known_cnt)

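# Example (illustrative): for a directory holding four contents, three of
# which are marked known in nodes_data:
#
#   >>> directory_content(node, nodes_data)
#   (4, 3)
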
def has_dirs(node: Directory) -> bool:
    """Check if the given directory has other directories inside."""
    for _, sub_node in node.items():
        if isinstance(sub_node, Directory):
            return True
    return False

def get_content_from(
    node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeInfo
) -> Dict[bytes, dict]:
    """Get content information from the given directory node."""
    # root in model.from_disk.Directory should be accessed with b""
    directory = source_tree[node_path if node_path != source_tree.data["path"] else b""]
    node_contents = list(
        filter(
            lambda n: n.object_type == "content", map(lambda n: n[1], directory.items())
        )
    )
    files_data = {}
    for node in node_contents:
        node_info = nodes_data[node.swhid()]
        node_info["swhid"] = str(node.swhid())
        path_name = "path" if "path" in node.data.keys() else "data"
        files_data[node.data[path_name]] = node_info
    return files_data

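# Example (illustrative, hypothetical paths): the result maps raw on-disk
# paths (bytes) to each content's stored info, augmented with its "swhid":
#
#   >>> get_content_from(b"/tmp/my-project/src", source_tree, nodes_data)
#   {b'/tmp/my-project/src/main.py': {'known': True, 'swhid': 'swh:1:cnt:...'}}
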
def _call_vcs(command, cwd: Optional[Path], **cmd_kwargs):
    """Separate function for ease of overriding in tests"""
    return subprocess.run(
        command, check=True, capture_output=True, cwd=cwd, **cmd_kwargs
    )

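# Example (hedged sketch): tests can swap _call_vcs out to avoid spawning real
# VCS processes, e.g. with pytest's monkeypatch fixture (assumed test setup):
#
#   >>> def fake_call_vcs(command, cwd, **kwargs):
#   ...     return subprocess.CompletedProcess(command, 0, stdout=b"", stderr=b"")
#   >>> # monkeypatch.setattr("swh.scanner.data._call_vcs", fake_call_vcs)
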
def get_git_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(["git", "status", "--ignored", "--no-renames", "-z"], cwd)
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to git [%d]: %s", e.returncode, e.stderr)
        return False, []
    patterns = []
    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []
    # The `-z` CLI flag gives us a stable, null byte-separated output
    lines = stdout.split(b"\0")
    for line in lines:
        if not line:
            continue
        status, name = line.split(b" ", 1)
        if status != b"!!":
            # skip non-ignored files
            continue
        patterns.append(name.rstrip(b"/"))
    return True, patterns

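# Example (illustrative): with `-z`, git separates entries with NUL bytes and
# prefixes ignored ones with "!!", so output such as
#
#   b"!! build/\0?? notes.txt\0!! secret.key\0"
#
# parses to (True, [b"build", b"secret.key"]): the "??" (untracked) entry is
# skipped and trailing slashes on directories are stripped.
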
def get_hg_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(
            ["hg", "status", "--ignored", "--no-status", "-0"],
            cwd,
            env={"HGPLAIN": "1"},
        )
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to hg [%d]: %s", e.returncode, e.stderr)
        return False, []
    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []
    # The `-0` CLI flag gives us a stable, null byte-separated output
    patterns = [line for line in stdout.split(b"\0") if line]
    return True, patterns

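# Example (illustrative): with `--no-status` and `-0`, hg prints only the
# ignored paths, NUL-separated, so b"build\0secret.key\0" parses to
# (True, [b"build", b"secret.key"]). HGPLAIN=1 disables user configuration
# that could otherwise alter the output format.
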
def get_svn_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(["svn", "status", "--no-ignore", "--xml"], cwd)
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to svn [%d]: %s", e.returncode, e.stderr)
        return False, []
    patterns = []
    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []
    # We've asked for XML output since it's easily parsable and stable, unlike
    # the normal Subversion output.
    root = ElementTree.fromstring(stdout)
    status = root.find("target")
    assert status is not None
    for entry in status:
        path = entry.attrib["path"]
        wc_status = entry.find("wc-status")
        assert wc_status is not None
        entry_status = wc_status.attrib["item"]
        if entry_status == "ignored":
            # SVN uses UTF8 for all paths
            patterns.append(path.encode())
    return True, patterns

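# Example (illustrative): the XML output wraps entries in <status><target>;
# only entries whose <wc-status> has item="ignored" are kept, so
#
#   <status><target path=".">
#     <entry path="build"><wc-status item="ignored"/></entry>
#     <entry path="notes.txt"><wc-status item="unversioned"/></entry>
#   </target></status>
#
# parses to (True, [b"build"]).
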
# Associates a Version Control System to its on-disk folder and a method of
# getting its ignore patterns.
VCS_IGNORE_PATTERNS_METHODS: Dict[
    str, Tuple[str, Callable[[Optional[Path]], Tuple[bool, List[bytes]]]]
] = {
    "git": (".git", get_git_ignore_patterns),
    "hg": (".hg", get_hg_ignore_patterns),
    "svn": (".svn", get_svn_ignore_patterns),
}

def vcs_detected(folder_path: str) -> bool:
    try:
        return Path(folder_path).is_dir()
    except Exception as e:
        logger.debug("Got an exception while looking for %s: %s", folder_path, e)
        return False

def get_vcs_ignore_patterns(cwd: Optional[Path] = None) -> List[bytes]:
    """Return a list of all patterns to ignore according to the VCS used for
    the project being scanned, if any."""
    ignore_patterns = []
    for vcs, (folder_path, method) in VCS_IGNORE_PATTERNS_METHODS.items():
        if vcs_detected(folder_path):
            logger.debug("Trying to get ignore patterns from '%s'", vcs)
            success, patterns = method(cwd)
            if success:
                logger.debug("Successfully obtained ignore patterns from '%s'", vcs)
                ignore_patterns.extend(patterns)
            break
    else:
        logger.debug("No VCS found in the current working directory")
    return ignore_patterns

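# Example (hedged sketch): a caller would typically run this from the root of
# the project being scanned (VCS detection checks the process working
# directory) and feed the byte patterns to its path filtering; the decoding
# below is an assumption, not the scanner's actual wiring.
#
#   >>> patterns = get_vcs_ignore_patterns(cwd=Path("/tmp/my-project"))
#   >>> [p.decode() for p in patterns]
#   ['build', 'secret.key']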