Source code for swh.scanner.output
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from abc import ABC, abstractmethod
from enum import Enum
import json
import os
import sys
from typing import Any
import ndjson
from swh.model.from_disk import Directory
from .dashboard.dashboard import run_app
from .data import MerkleNodeInfo, get_directory_data
from .plot import generate_sunburst, offline_plot
# Output mode used by get_output_class() when the caller does not pass one.
DEFAULT_OUTPUT = "text"
# Registry mapping an output mode name to the class registered for it; it is
# filled by the _register() decorator and read by get_output_class().
OUTPUT_MAP = {}
[docs]
class Color(Enum):
    """Terminal escape sequences used to colour the text output.

    Each member's value is the raw escape string; `END` is the sequence
    appended by `colorize` after the coloured text.
    """

    # foreground colours
    BLUE = "\033[94m"
    GREEN = "\033[92m"
    RED = "\033[91m"

    # terminator emitted after a coloured span
    END = "\033[0m"
[docs]
def colorize(text: str, color: Color):
    """Return `text` prefixed with the value of `color` and suffixed with
    the value of `Color.END`."""
    pieces = (color.value, text, Color.END.value)
    return "".join(pieces)
def _register(name):
    """Build a class decorator that records the decorated class in
    `OUTPUT_MAP` under the key `name`.

    The decorated class is returned unchanged, so the decorator only has the
    side effect of filling the registry.
    """

    def _record(output_cls):
        OUTPUT_MAP[name] = output_cls
        return output_cls

    return _record
[docs]
def get_output_class(mode=DEFAULT_OUTPUT):
    """Return the output class registered for `mode`.

    Args:
        mode: name of an output mode, i.e. a key of `OUTPUT_MAP`; defaults to
            `DEFAULT_OUTPUT`.

    Returns:
        The class stored in `OUTPUT_MAP` under `mode`.

    Raises:
        ValueError: if nothing is registered under `mode`. `ValueError` is a
            subclass of `Exception`, so callers that catch `Exception` keep
            working.
    """
    cls = OUTPUT_MAP.get(mode)
    if cls is None:
        # A specific exception type lets callers tell a bad mode name apart
        # from unrelated failures.
        raise ValueError(f"mode {mode} is not an output format")
    return cls
[docs]
class BaseOutput(ABC):
    """base class for object able to display scan result

    Instances keep the three constructor arguments as attributes of the same
    name; concrete subclasses implement `show`.
    """

    def __init__(
        self,
        root_path: str,
        nodes_data: MerkleNodeInfo,
        source_tree: Directory,
    ):
        """Store the scan root, the per-node data and the scanned tree."""
        self.root_path, self.nodes_data, self.source_tree = (
            root_path,
            nodes_data,
            source_tree,
        )

    def get_path_name(self, node):
        """Return the key of `node.data` to use for the node's path:
        "path" when that key is present, "data" otherwise."""
        if "path" in node.data.keys():
            return "path"
        return "data"

    @abstractmethod
    def show(self):
        """Display the scan result; must be provided by subclasses."""
[docs]
@_register("summary")
class SummaryOuput(BaseOutput):
    """display a summary of the scan results"""

    @staticmethod
    def _percentage(part: int, total: int) -> int:
        """Return `part` as a floored integer percentage of `total`.

        Returns 0 when `total` is 0, so that a tree without any file does not
        make the summary fail with ZeroDivisionError.
        """
        if not total:
            return 0
        return part * 100 // total

    def show(self):
        """Print file and directory counts with the share of known objects.

        A directory counts as fully known when its own entry in `nodes_data`
        is flagged "known". Otherwise it counts as partially known when its
        path is the parent directory of at least one known file.
        """
        contents = []
        directories = []
        for node in self.source_tree.iter_tree():
            if node.object_type == "content":
                contents.append(node)
            elif node.object_type == "directory":
                directories.append(node)
            else:
                # Raised explicitly (rather than `assert False`) so the check
                # is not stripped when Python runs with -O.
                raise AssertionError("unreachable")

        total_files = len(contents)
        known_files = 0
        directories_with_known_files = set()
        for content in contents:
            if self.nodes_data[content.swhid()]["known"]:
                known_files += 1
                path = content.data[self.get_path_name(content)]
                directories_with_known_files.add(os.path.dirname(path))

        total_directories = len(directories)
        full_known_directories = 0
        partially_known_directories = 0
        for directory in directories:
            if self.nodes_data[directory.swhid()]["known"]:
                full_known_directories += 1
            else:
                path = directory.data[self.get_path_name(directory)]
                if path in directories_with_known_files:
                    partially_known_directories += 1

        kp = self._percentage(known_files, total_files)
        fkp = self._percentage(full_known_directories, total_directories)
        pkp = self._percentage(partially_known_directories, total_directories)

        print(f"Files: {total_files:10d}")
        print(f" known: {known_files:10d} ({kp:3d}%)")
        print(f"directories: {total_directories:10d}")
        print(f" fully-known: {full_known_directories:10d} ({fkp:3d}%)")
        print(f" partially-known: {partially_known_directories:10d} ({pkp:3d}%)")
        print("(see other --output-format for more details)")
[docs]
@_register("text")
class TextOuput(BaseOutput):
    """display an exhaustive result of the scan in text form

    note: as soon as the scan target something larger than a toy project, the
    usability of this mode is poor."""

    def show(self) -> None:
        """Print one line per node of the source tree."""
        # Colours are only emitted when stdout is a terminal.
        isatty = sys.stdout.isatty()
        for node in self.source_tree.iter_tree():
            self.print_node(node, isatty, self._compute_level(node))

    def _compute_level(self, node: Any):
        """Return how many "/"-separated components deeper than the root of
        the source tree the path of `node` is."""
        node_path = str(node.data[self.get_path_name(node)]).split("/")
        source_path = str(self.source_tree.data["path"]).split("/")
        return len(node_path) - len(source_path)

    def print_node(self, node: Any, isatty: bool, level: int) -> None:
        """Print the base name of `node`, indented according to `level`.

        Args:
            node: tree node to print; its path is read from `node.data`.
            isatty: when true, the name is coloured: red if the node is not
                flagged "known", blue for a known directory, green for a
                known content.
            level: number of indentation prefixes to print before the name.
        """
        rel_path = os.path.basename(node.data[self.get_path_name(node)])
        # File names are raw bytes and are not guaranteed to be valid UTF-8;
        # for display purposes an undecodable byte is replaced rather than
        # aborting the whole listing with UnicodeDecodeError.
        rel_path = rel_path.decode(errors="replace")
        begin = "│ " * level
        end = "/" if node.object_type == "directory" else ""

        if isatty:
            if not self.nodes_data[node.swhid()]["known"]:
                rel_path = colorize(rel_path, Color.RED)
            elif node.object_type == "directory":
                rel_path = colorize(rel_path, Color.BLUE)
            elif node.object_type == "content":
                rel_path = colorize(rel_path, Color.GREEN)

        print(f"{begin}{rel_path}{end}")
[docs]
@_register("json")
class JsonOuput(BaseOutput):
    """display the scan result in json"""

    def data_as_json(self):
        """Return a dict describing every node of the source tree.

        Keys are node paths relative to the path of the source tree root.
        Each value is a dict holding the node's SWHID (as a string) under
        "swhid", followed by every key/value pair found for that node in
        `nodes_data`.
        """
        result = {}
        for node in self.source_tree.iter_tree():
            node_path = node.data[self.get_path_name(node)].decode()
            root_path = self.source_tree.data["path"].decode()
            rel_path = os.path.relpath(node_path, root_path)

            entry = {"swhid": str(node.swhid())}
            result[rel_path] = entry
            for key, value in self.nodes_data[node.swhid()].items():
                entry[key] = value
        return result

    def show(self):
        """Print the scan result as an indented, key-sorted JSON document."""
        document = json.dumps(self.data_as_json(), indent=4, sort_keys=True)
        print(document)
[docs]
# "nbjson" is the name this class was originally registered under; it looks
# like a misspelling of "ndjson" (newline-delimited json). Both names are
# registered so that the correctly spelled mode resolves while the historical
# one keeps working.
@_register("ndjson")
@_register("nbjson")
class NDJsonTextOuput(JsonOuput):
    """display the scan result in newline-delimited json"""

    def show(self):
        """Print one single-key JSON object per node, then flush stdout.

        Each object maps a relative path to the data that `data_as_json`
        returns for it.
        """
        records = ({path: info} for path, info in self.data_as_json().items())
        print(ndjson.dumps(records), flush=True)
[docs]
@_register("sunburst")
class SunburstOuput(BaseOutput):
    """display the scan result as a sunburst plot

    note: as soon as the scan target something larger than a toy project, the
    usability of this mode is poor."""

    def _make_sunburst(self):
        """Return the figure produced by `generate_sunburst` from the
        directory data of the scanned tree."""
        return generate_sunburst(
            get_directory_data(self.root_path, self.source_tree, self.nodes_data),
            self.root_path,
        )

    def show(self):
        """Build the sunburst figure and hand it to `offline_plot`."""
        offline_plot(self._make_sunburst())
[docs]
@_register("interactive")
class InteractiveSunburstOuput(SunburstOuput):
    """display the scan result as an interactive sunburst plot"""

    def show(self):
        """Build the sunburst figure and pass it, together with the source
        tree and the nodes data, to `run_app`."""
        run_app(self._make_sunburst(), self.source_tree, self.nodes_data)