Source code for swh.scanner.data

# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import concurrent.futures
import json
import logging
from os import path
from pathlib import Path
import subprocess
from typing import Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast
from xml.etree import ElementTree

import requests

from swh.core.utils import grouper
from swh.model.exceptions import ValidationError
from swh.model.from_disk import Content, Directory, FromDiskType
from swh.model.swhids import CoreSWHID, ObjectType, QualifiedSWHID
from swh.web.client.client import WebAPIClient

logger = logging.getLogger(__name__)


class MerkleNodeInfo(dict):
    """Store additional information about Merkle DAG nodes, using SWHIDs as keys"""

    def __setitem__(self, key, value):
        """The keys must be valid Software Heritage Persistent Identifiers
        while values must be dict.
        """
        if not isinstance(key, CoreSWHID):
            raise ValidationError("keys must be valid SWHID(s)")
        if not isinstance(value, dict):
            raise ValidationError(f"values must be dict, not {type(value)}")
        super(MerkleNodeInfo, self).__setitem__(key, value)

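# Illustrative sketch (hypothetical helper, not part of the scanner API):
# minimal MerkleNodeInfo usage. The SWHID below is only an example value;
# keys must be CoreSWHID instances and values must be dicts, anything else
# raises ValidationError.
def _example_merkle_node_info() -> MerkleNodeInfo:
    info = MerkleNodeInfo()
    swhid = CoreSWHID.from_string(
        "swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2"
    )
    info[swhid] = {"known": True}  # accepted: CoreSWHID key, dict value
    # info["not-a-swhid"] = {}     # would raise ValidationError
    return info
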
def init_merkle_node_info(
    source_tree: Directory, data: MerkleNodeInfo, provenance: bool
) -> None:
    """Populate the MerkleNodeInfo with the SWHIDs of the given source tree

    The dictionary values are pre-filled with dictionaries holding the
    information about the nodes. The "known" key is always stored as it is
    always fetched. The "provenance" key is stored if the `provenance`
    parameter is :const:`True`.
    """
    nodes_info: Dict[str, Optional[str]] = {"known": None}
    if provenance:
        nodes_info["provenance"] = None
    for node in source_tree.iter_tree():
        data[node.swhid()] = nodes_info.copy()
    return None

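# Sketch of how init_merkle_node_info is typically combined with a source
# tree built from disk (hypothetical helper; the project root path is an
# assumption): every node of the tree gets its own
# {"known": None, "provenance": None} entry.
def _example_init_merkle_node_info(project_root: bytes) -> MerkleNodeInfo:
    source_tree = Directory.from_disk(path=project_root)
    data = MerkleNodeInfo()
    init_merkle_node_info(source_tree, data, provenance=True)
    return data
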
class NoProvenanceAPIAccess(RuntimeError):
    """Raised when the user does not have access to the Provenance API"""

def _get_provenance_info(client, swhid: CoreSWHID) -> Optional[QualifiedSWHID]:
    """Find a revision or release and origin containing this object

    XXX This function is now only used by the "on demand" query that can be
    requested from the dashboard. Remove it whenever relevant XXX

    Revision and Release might not be found; we prioritize finding a Release
    over finding a Revision when possible.

    note: The quality of the result is not guaranteed whatsoever. Since the
    definition of "best" likely varies from one usage to the next, this API
    will evolve in the future when this notion gets better defined.

    For example, if we are looking for provenance information to detect prior
    art, we search for the first appearance of a content, so the "best answer"
    is the oldest content, something a bit tricky to determine as we can't
    fully trust the date of the revision. On the other hand, if we try to know
    which libraries are used and at which version, to detect CVEs or outdated
    dependencies, the best answer is the most recent release/revision in the
    authoritative origin relevant to a content. Finding the authoritative
    origin is a challenge in itself.

    This function exists until we have some proper provenance entry point at
    the archive level, and will, hopefully, soon be removed.

    Args:
        swhid: the SWHID of the Content or Directory to find info for

    Returns:
        None or QualifiedSWHID for the current Content or Directory.

        The QualifiedSWHID will have the following qualifiers set:

        - anchor: swhid of a Release or Revision containing it
        - origin: the origin containing this Release or Revision

        If no anchor could be found, this function returns None.

    Raises:
        requests.HTTPError: if HTTP request fails
    """
    if swhid.object_type not in (ObjectType.DIRECTORY, ObjectType.CONTENT):
        msg = "swhid should be %r or %r as parameter, not: %r"
        msg %= (ObjectType.DIRECTORY, ObjectType.CONTENT, swhid.object_type)
        raise ValueError(msg)
    return _call_whereis(client, swhid)


def _call_whereis(client, swhid: CoreSWHID) -> Optional[QualifiedSWHID]:
    """Manually call provenance's `whereis` endpoint

    The WebAPIClient will eventually support this natively. At that point this
    function should be removed in favor of calling the associated method on
    WebAPIClient.
    """
    query = f"provenance/whereis/{swhid}/"
    try:
        with client._call(query) as r:
            raw_json = r.text
            if raw_json:
                result = json.loads(raw_json)
            else:
                result = None
    except requests.HTTPError as exc:
        r = exc.response
        if r.status_code == requests.codes.UNAUTHORIZED:
            raise NoProvenanceAPIAccess(r.text)
        raise
    if result is None:
        return None
    return QualifiedSWHID.from_string(result)


# We tried 1000, but the API was suffering (504 and 503 returns).
# Bump again when this gets more reliable.
MAX_WHEREARE_BATCH = 100


def _call_whereare(client, swhids: List[CoreSWHID]) -> List[Optional[QualifiedSWHID]]:
    """Manually call provenance's `whereare` endpoint

    The WebAPIClient will eventually support this natively. At that point this
    function should be removed in favor of calling the associated method on
    WebAPIClient.
    """
    query = "provenance/whereare/"
    args = [str(s) for s in swhids]
    try:
        with client._call(query, http_method="post", json=args) as r:
            result = r.json()
    except requests.HTTPError as exc:
        r = exc.response
        if r.status_code == requests.codes.UNAUTHORIZED:
            raise NoProvenanceAPIAccess(r.text)
        raise
    to_q = QualifiedSWHID.from_string
    return [to_q(q) if q is not None else q for q in result]


_IN_MEM_NODE = Union[Directory, Content]

MAX_CONCURRENT_PROVENANCE_QUERIES = 5

Item = TypeVar("Item")


def _get_many_provenance_info(
    client, swhids: List[CoreSWHID]
) -> Iterator[Tuple[CoreSWHID, Optional[QualifiedSWHID]]]:
    """Yield provenance data for multiple SWHIDs

    For each SWHID we can find provenance data for, we will yield a
    (CoreSWHID, QualifiedSWHID) pair (see provenance's API "whereis"
    documentation for the details on the QualifiedSWHID). SWHIDs for which we
    cannot find provenance yield a None value.

    note: We could drop the SWHID part of the pair and only return
    QualifiedSWHID, if there were some easy method for QualifiedSWHID →
    CoreSWHID conversion."""
    # XXX note that this concurrency can be dealt with by
    # WebAPIClient._call_groups once the WebAPIClient has grown a function to
    # fetch provenance.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=MAX_CONCURRENT_PROVENANCE_QUERIES
    ) as executor:
        pending = {}
        for chunk in grouper(swhids, MAX_WHEREARE_BATCH):
            chunk = list(chunk)
            f = executor.submit(_call_whereare, client, chunk)
            pending[f] = chunk
        for future in concurrent.futures.as_completed(list(pending.keys())):
            provenances = future.result()
            sources = pending[future]
            yield from zip(sources, provenances)


def _no_update_progress(*args, **kwargs):
    pass

def add_provenance(
    source_tree: Directory,
    data: MerkleNodeInfo,
    client: WebAPIClient,
    update_progress: Optional[Callable[[int, int], None]] = _no_update_progress,
):
    """Store provenance information about software artifacts retrieved from the
    Software Heritage graph service.
    """
    # XXX Note that the current provenance handling highlights some limitations
    # in the underlying way we handle data in the scanner.
    #
    # Right now, we store unique data for each unique SWHID. However the same
    # SWHID appearing in different contexts might have different data relevant
    # to it. For example, the GPL license file might be found in multiple
    # places, and the best provenance for each of these locations will likely
    # vary.
    #
    # So we might want to overhaul the way we connect data to be more path
    # centric (for some of it at least). It would also help us to put the
    # "path" qualifier of QualifiedSWHID to use.
    if update_progress is None:
        update_progress = _no_update_progress

    all_queries: set[_IN_MEM_NODE] = set()
    done_queries: set[_IN_MEM_NODE] = set()
    seen: set[_IN_MEM_NODE] = set()
    current_boundary: dict[CoreSWHID, _IN_MEM_NODE] = {}
    next_boundary: dict[CoreSWHID, _IN_MEM_NODE] = {}

    # search for the initial boundary of the "known" set
    initial_walk_queue: set[_IN_MEM_NODE] = {source_tree}
    while initial_walk_queue:
        node = initial_walk_queue.pop()
        if node in seen:
            continue
        seen.add(node)
        known: Optional[bool] = data[node.swhid()]["known"]
        if known is None or known:
            # We found a "root" for a known set, we should query it.
            current_boundary[node.swhid()] = node
        elif node.object_type == FromDiskType.DIRECTORY:
            # that node is unknown, no need to query it, but there might be
            # known sets of descendants that need provenance queries.
            initial_walk_queue.update(node.values())
    all_queries.update(current_boundary.values())
    update_progress(len(done_queries), len(all_queries))

    while current_boundary:
        boundary = list(current_boundary.keys())
        for info in _get_many_provenance_info(client, boundary):
            swhid, qualified_swhid = info
            node = current_boundary.pop(swhid)
            done_queries.add(node)
            if qualified_swhid is not None:
                data[node.swhid()]["provenance"] = qualified_swhid
                if node.object_type == FromDiskType.DIRECTORY:
                    node = cast(Directory, node)
                    for sub_node in node.iter_tree():
                        if sub_node in seen:
                            continue
                        seen.add(sub_node)
                        data[sub_node.swhid()]["provenance"] = qualified_swhid
            elif node.object_type == FromDiskType.DIRECTORY:
                for sub_node in node.values():
                    if sub_node in seen:
                        continue
                    seen.add(sub_node)
                    all_queries.add(sub_node)
                    next_boundary[sub_node.swhid()] = sub_node
            update_progress(len(done_queries), len(all_queries))
        current_boundary = next_boundary
        next_boundary = {}

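# Illustrative driver for add_provenance (hypothetical helper), assuming a
# source tree and node data prepared as above; the progress callback simply
# logs how many of the boundary queries have completed so far.
def _example_add_provenance(
    source_tree: Directory, data: MerkleNodeInfo, client: WebAPIClient
) -> None:
    def log_progress(done: int, total: int) -> None:
        logger.info("provenance queries: %d/%d", done, total)

    add_provenance(source_tree, data, client, update_progress=log_progress)
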
def has_dirs(node: Directory) -> bool:
    """Check if the given directory has other directories inside."""
    for _, sub_node in node.items():
        if isinstance(sub_node, Directory):
            return True
    return False

def get_content_from(
    node_path: bytes, source_tree: Directory, nodes_data: MerkleNodeInfo
) -> Dict[bytes, dict]:
    """Get content information from the given directory node."""
    # root in model.from_disk.Directory should be accessed with b""
    directory = source_tree[
        node_path if node_path != source_tree.data["path"] else b""
    ]
    node_contents = list(
        filter(
            lambda n: n.object_type == "content",
            map(lambda n: n[1], directory.items()),
        )
    )
    files_data = {}
    for node in node_contents:
        node_info = nodes_data[node.swhid()]
        node_info["swhid"] = str(node.swhid())
        path_name = "path" if "path" in node.data.keys() else "data"
        files_data[node.data[path_name]] = node_info
    return files_data

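# Sketch (hypothetical helper): collect per-file information for the root of
# a scanned tree. This assumes the tree was built with Directory.from_disk,
# so its root data carries a "path" entry; passing that path selects the
# top-level directory, and each returned key is an on-disk file path mapped
# to its node info dict.
def _example_root_contents(
    source_tree: Directory, data: MerkleNodeInfo
) -> Dict[bytes, dict]:
    return get_content_from(source_tree.data["path"], source_tree, data)
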
def _call_vcs(command, cwd: Optional[Path], **cmd_kwargs):
    """Separate function for ease of overriding in tests"""
    return subprocess.run(
        command, check=True, capture_output=True, cwd=cwd, **cmd_kwargs
    )

def get_git_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(["git", "status", "--ignored", "--no-renames", "-z"], cwd)
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to git [%d]: %s", e.returncode, e.stderr)
        return False, []

    patterns = []
    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []

    # The `-z` CLI flag gives us a stable, null byte-separated output
    lines = stdout.split(b"\0")
    for line in lines:
        if not line:
            continue
        status, name = line.split(b" ", 1)
        if status != b"!!":
            # skip non-ignored files
            continue
        patterns.append(name.rstrip(b"/"))
    return True, patterns

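# Example (hypothetical helper) of the null-separated records that
# `git status --ignored -z` produces and that the parser above expects:
# each entry is "<XY status> <path>", and only "!!" (ignored) entries are
# kept. The sample bytes below are illustrative only.
def _example_parse_git_status_z() -> List[bytes]:
    sample = b"!! build/\0 M src/main.py\0?? notes.txt\0"
    patterns: List[bytes] = []
    for line in sample.split(b"\0"):
        if not line:
            continue
        status, name = line.split(b" ", 1)
        if status == b"!!":
            patterns.append(name.rstrip(b"/"))
    return patterns  # -> [b"build"]
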
def get_hg_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(
            ["hg", "status", "--ignored", "--no-status", "-0"],
            cwd,
            env={"HGPLAIN": "1"},
        )
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to hg [%d]: %s", e.returncode, e.stderr)
        return False, []

    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []

    # The `-0` CLI flag gives us a stable, null byte-separated output
    patterns = [line for line in stdout.split(b"\0") if line]
    return True, patterns

def get_svn_ignore_patterns(cwd: Optional[Path]):
    try:
        res = _call_vcs(["svn", "status", "--no-ignore", "--xml"], cwd)
    except subprocess.CalledProcessError as e:
        logger.debug("Failed to call out to svn [%d]: %s", e.returncode, e.stderr)
        return False, []

    patterns = []
    stdout = res.stdout
    if not stdout:
        # No status output, so no ignored files
        return True, []

    # We've asked for XML output since it's easily parsable and stable, unlike
    # the normal Subversion output.
    root = ElementTree.fromstring(stdout)
    status = root.find("target")
    assert status is not None
    for entry in status:
        path = entry.attrib["path"]
        wc_status = entry.find("wc-status")
        assert wc_status is not None
        entry_status = wc_status.attrib["item"]
        if entry_status == "ignored":
            # SVN uses UTF8 for all paths
            patterns.append(path.encode())
    return True, patterns

# Associates a Version Control System to its on-disk folder and a method of
# getting its ignore patterns.
VCS_IGNORE_PATTERNS_METHODS: Dict[
    str, Tuple[str, Callable[[Optional[Path]], Tuple[bool, List[bytes]]]]
] = {
    "git": (".git", get_git_ignore_patterns),
    "hg": (".hg", get_hg_ignore_patterns),
    "svn": (".svn", get_svn_ignore_patterns),
}

def vcs_detected(folder_path: Path) -> bool:
    try:
        return folder_path.is_dir()
    except Exception as e:
        logger.debug("Got an exception while looking for %s: %s", folder_path, e)
        return False

def get_vcs_ignore_patterns(cwd: Optional[Path] = None) -> List[bytes]:
    """Return a list of all patterns to ignore according to the VCS used for
    the project being scanned, if any."""
    ignore_patterns = []

    for vcs, (folder_name, method) in VCS_IGNORE_PATTERNS_METHODS.items():
        if cwd is not None:
            folder_path = cwd / folder_name
        else:
            folder_path = Path(folder_name)
        if vcs_detected(folder_path):
            logger.debug("Trying to get ignore patterns from '%s'", vcs)
            success, patterns = method(cwd)
            if success:
                logger.debug("Successfully obtained ignore patterns from '%s'", vcs)
                ignore_patterns.extend(patterns)
                break
    else:
        logger.debug("No VCS found in the current working directory")

    return ignore_patterns

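# Usage sketch (hypothetical helper) for get_vcs_ignore_patterns on a given
# checkout directory: the returned byte patterns can then be passed to the
# scanner's exclusion handling so that VCS-ignored files are skipped.
def _example_ignore_patterns_for(project_root: Path) -> List[bytes]:
    return get_vcs_ignore_patterns(cwd=project_root)
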
def get_ignore_patterns_templates() -> Dict[str, Path]:
    """Return a dict where keys are ignore template names and values are paths
    to the ignore definition files."""
    here = Path(path.abspath(path.dirname(__file__)))
    gitignore_path = here / "resources" / "gitignore"
    assert gitignore_path.exists()
    skip = [".git", ".github"]
    templates = {
        item.stem: item
        for item in gitignore_path.rglob("*.gitignore")
        if set(item.parts).isdisjoint(skip)
    }
    return templates

def parse_ignore_patterns_template(source: Path) -> List[bytes]:
    """Given a file path to a gitignore template, return an ignore patterns
    list"""
    patterns: List[bytes] = []
    assert source.exists()
    assert source.is_file()
    patterns_str = source.read_text()
    patterns_list = patterns_str.splitlines()
    for pattern in patterns_list:
        pattern = pattern.strip()
        if pattern and not pattern.startswith("#"):
            patterns.append(pattern.encode())
    return patterns

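# Sketch (hypothetical helper) combining the two helpers above: look up the
# "Python" gitignore template, assumed to be among the bundled templates, and
# turn it into a list of byte patterns; returns an empty list if it is absent.
def _example_python_template_patterns() -> List[bytes]:
    templates = get_ignore_patterns_templates()
    template = templates.get("Python")
    if template is None:
        return []
    return parse_ignore_patterns_template(template)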