# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from abc import ABC, abstractmethod
from enum import Enum
import json
import os
import sys
from typing import Any, Dict, Set
import ndjson
from swh.model.from_disk import Directory
from swh.model.swhids import CoreSWHID, ExtendedSWHID, QualifiedSWHID
from swh.web.client.client import WebAPIClient
from .dashboard.dashboard import run_app
from .data import MerkleNodeInfo
DEFAULT_OUTPUT = "text"
OUTPUT_MAP = {}
[docs]
class Color(Enum):
BLUE = "\033[94m"
GREEN = "\033[92m"
RED = "\033[91m"
END = "\033[0m"
[docs]
def colorize(text: str, color: Color):
return color.value + text + Color.END.value
def _register(name):
"""decorator to register an output class under mode `name`"""
def dec(cls):
OUTPUT_MAP[name] = cls
return cls
return dec
[docs]
def get_output_class(mode=DEFAULT_OUTPUT):
"""return the output class that correspond to `mode`"""
cls = OUTPUT_MAP.get(mode)
if cls is None:
raise Exception(f"mode {mode} is not an output format")
return cls
[docs]
class BaseOutput(ABC):
"""base class for object able to display scan result"""
def __init__(
self,
root_path: str,
nodes_data: MerkleNodeInfo,
source_tree: Directory,
config: Dict[str, Any],
web_client: WebAPIClient,
):
self.root_path = root_path
self.nodes_data = nodes_data
self.source_tree = source_tree
self.config = config
self.web_client = web_client
[docs]
def get_path_name(self, node):
return "path" if "path" in node.data.keys() else "data"
[docs]
@abstractmethod
def show(self):
pass
[docs]
@_register("summary")
class SummaryOutput(BaseOutput):
"""display a summary of the scan results"""
[docs]
def compute_summary(self):
directories_with_known_files = set()
total_files = 0
total_directories = 0
known_files = 0
full_known_directories = set()
partially_known_directories = set()
contents = set()
directories = set()
for node in self.source_tree.iter_tree():
if node.object_type == "content":
contents.add(node)
elif node.object_type == "directory":
directories.add(node)
else:
assert False, "unreachable"
total_files = len(contents)
for c in contents:
if self.nodes_data[c.swhid()]["known"]:
known_files += 1
path = c.data[self.get_path_name(c)]
dir_name = os.path.dirname(path)
directories_with_known_files.add(dir_name)
for d in directories:
if self.nodes_data[d.swhid()]["known"]:
path_name = self.get_path_name(d)
path = d.data[path_name]
full_known_directories.add(path)
self.compute_partially_known_recursive(
directories_with_known_files,
partially_known_directories,
full_known_directories,
self.source_tree,
)
total_directories = len(directories)
kp = known_files * 100 // total_files
fkp = len(full_known_directories) * 100 // total_directories
pkp = len(partially_known_directories) * 100 // total_directories
return {
"total_files": total_files,
"known_files": known_files,
"known_files_percent": kp,
"total_directories": total_directories,
"full_known_directories": full_known_directories,
"full_known_directories_percent": fkp,
"partially_known_directories": partially_known_directories,
"partially_known_directories_percent": pkp,
}
[docs]
def compute_partially_known_recursive(
self,
directories_with_known_files: Set[bytes],
partially_known_directories: Set[bytes],
full_known_directories: Set[bytes],
d: Directory,
):
"""Recursively compute partially known directories."""
path_name = self.get_path_name(d)
path = d.data[path_name]
partially_known = False
if path in full_known_directories:
return False
if path not in full_known_directories and path in directories_with_known_files:
partially_known_directories.add(path)
partially_known = True
for entry in d.values():
if entry.object_type == "directory":
partially_known_child = self.compute_partially_known_recursive(
directories_with_known_files,
partially_known_directories,
full_known_directories,
entry,
)
if partially_known_child:
partially_known_directories.add(path)
partially_known = partially_known or partially_known_child
return partially_known
[docs]
def show(self):
summary = self.compute_summary()
kp = summary["known_files_percent"]
fkp = summary["full_known_directories_percent"]
pkp = summary["partially_known_directories_percent"]
print(f"Files: {summary['total_files']:10d}")
print(f" known: {summary['known_files']:10d} ({kp:3d}%)")
print(f"directories: {summary['total_directories']:10d}")
print(
f" fully-known: {len(summary['full_known_directories']):10d} ({fkp:3d}%)"
)
print(
f" partially-known: {len(summary['partially_known_directories']):10d} ({pkp:3d}%)"
)
[docs]
@_register("text")
class TextOutput(BaseOutput):
"""display an exhaustive result of the scan in text form
note: as soon as the scan target something larger than a toy project, the
usability of this mode is poor."""
[docs]
def show(self) -> None:
isatty = sys.stdout.isatty()
for node in self.source_tree.iter_tree():
self.print_node(node, isatty, self._compute_level(node))
def _compute_level(self, node: Any):
node_path = str(node.data[self.get_path_name(node)]).split("/")
source_path = str(self.source_tree.data["path"]).split("/")
return len(node_path) - len(source_path)
[docs]
def print_node(self, node: Any, isatty: bool, level: int) -> None:
rel_path = os.path.basename(node.data[self.get_path_name(node)])
rel_path = rel_path.decode()
begin = "│ " * level
end = "/" if node.object_type == "directory" else ""
if isatty:
if not self.nodes_data[node.swhid()]["known"]:
rel_path = colorize(rel_path, Color.RED)
elif node.object_type == "directory":
rel_path = colorize(rel_path, Color.BLUE)
elif node.object_type == "content":
rel_path = colorize(rel_path, Color.GREEN)
print(f"{begin}{rel_path}{end}")
[docs]
class SWHIDEncoder(json.JSONEncoder):
[docs]
def default(self, value):
if isinstance(value, (CoreSWHID, ExtendedSWHID, QualifiedSWHID)):
return str(value)
else:
return super().default(value)
[docs]
@_register("json")
class JsonOutput(BaseOutput):
"""display the scan result in json"""
[docs]
def data_as_json(self):
json = {}
for node in self.source_tree.iter_tree():
rel_path = os.path.relpath(
node.data[self.get_path_name(node)].decode(),
self.source_tree.data["path"].decode(),
)
json[rel_path] = {"swhid": str(node.swhid())}
for k, v in self.nodes_data[node.swhid()].items():
json[rel_path][k] = v
return json
[docs]
def show(self):
print(
json.dumps(
self.data_as_json(),
indent=4,
sort_keys=True,
cls=SWHIDEncoder,
)
)
[docs]
@_register("ndjson")
class NDJsonTextOutput(JsonOutput):
"""display the scan result in newline-delimited json"""
[docs]
def show(self):
print(ndjson.dumps({k: v} for k, v in self.data_as_json().items()), flush=True)
[docs]
@_register("interactive")
class InteractiveDashboardOutput(SummaryOutput):
"""Dashboard to explore the scan results"""
[docs]
def show(self) -> None:
run_app(
self.config,
self.root_path,
self.source_tree,
self.nodes_data,
self.compute_summary(),
web_client=self.web_client,
)