Source code for swh.storage.proxies.masking.cli

# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import csv
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    TextIO,
    Tuple,
)
import warnings

import click

from ...cli import storage as storage_cli_group

if TYPE_CHECKING:
    from swh.model.swhids import ExtendedSWHID

    from .db import MaskedState, MaskingRequest


[docs] def storage_config_iter(storage_config: Dict[str, Any]) -> Iterator[Dict[str, Any]]: yield storage_config if storage_config["cls"] == "pipeline": for step in storage_config["steps"]: yield step elif "storage" in storage_config: yield storage_config["storage"]
@storage_cli_group.group(name="masking") @click.pass_context def masking_cli_group(ctx: click.Context) -> click.Context: """Configure masking on archived objects These tools require read/write access to the masking database. An entry must be added to the configuration file as follow:: \b storage: \b masking_admin: cls: postgresql db: "service=swh-masking-admin" """ if "masking_admin" not in ctx.obj["config"] or ( "masking_db" not in ctx.obj["config"]["masking_admin"] and "db" not in ctx.obj["config"]["masking_admin"] ): ctx.fail( "You must have a masking_admin, with a db entry, " "configured in your config file." ) if "masking_db" in ctx.obj["config"]["masking_admin"]: warnings.warn( "Please use the db field for the masking admin configuration, which " "makes it compatible with the `swh db` command-line utilities", DeprecationWarning, ) from psycopg2 import OperationalError from .db import MaskingAdmin try: db = None for key in ("db", "masking_db"): db = ctx.obj["config"]["masking_admin"].get(key) if db: break if not db: assert False, "Existence of config entries has been checked earlier" ctx.obj["masking_admin"] = MaskingAdmin.connect(db) except OperationalError as ex: raise click.ClickException(str(ex)) return ctx
[docs] class EditAborted(BaseException): pass
[docs] def edit_message(prompt: str, extra_lines: List[str] = []): """Edit a message through click.edit() adding some extra context, filtering comments, and raising an exception on empty messages""" message = ( "\n\n" f"# {prompt}\n" "# Lines starting with “#” will be ignored. " "An empty message will abort the operation.\n" ) if extra_lines: message += "#\n# " + "\n# ".join(extra_lines) message = click.edit(message) if message is None: raise EditAborted() message = "\n".join( line.strip() for line in message.lstrip().split("\n") if not line.startswith("#") ).rstrip() if message == "": raise EditAborted() return message
[docs] class RequestType(click.ParamType): name = "request slug or uuid"
[docs] def convert(self, value, param, ctx) -> "MaskingRequest": # Try from UUID first from uuid import UUID try: uuid = UUID(value) except ValueError: pass else: request = ctx.obj["masking_admin"].find_request_by_id(uuid) if not request: raise click.ClickException(f"Request “{uuid}” not found from id.") return request # Try from slug request = ctx.obj["masking_admin"].find_request(value) if not request: raise click.ClickException(f"Request “{value}” not found.") return request
[docs] class SWHIDType(click.ParamType): name = "swhid"
[docs] def convert(self, value, param, ctx) -> "ExtendedSWHID": from swh.model.swhids import ExtendedSWHID, ValidationError try: return ExtendedSWHID.from_string(value) except ValidationError: raise click.ClickException(f"Unable to parse “{value}” as a SWHID.")
@masking_cli_group.command(name="new-request") @click.option( "-m", "--message", "reason", metavar="REASON", help="why the request was made" ) @click.argument("slug") @click.pass_context def new_request(ctx: click.Context, slug: str, reason: Optional[str] = None) -> None: """Create a new request to mask objects SLUG is a human-readable unique identifier for the request. It is an internal identifier that will be used in subsequent commands to address this newly recorded request. A reason for the request must be specified, either using the `-m` option or via the provided editor. """ from .db import DuplicateRequest if reason is None or reason == "": try: reason = edit_message(f"Please enter a reason for the request “{slug}”.") except EditAborted: raise click.ClickException("Aborting due to an empty reason.") try: request = ctx.obj["masking_admin"].create_request(slug, reason) except DuplicateRequest: raise click.ClickException(f"Request “{slug}” already exists.") click.echo( f"New request “{click.style(slug, fg='green', bold=True)}” " f"recorded with identifier {request.id}" ) @masking_cli_group.command(name="list-requests") @click.option( "-a", "--include-cleared-requests/--exclude-cleared-requests", default=False, help="Show requests without any masking state", ) @click.pass_context def list_requests(ctx: click.Context, include_cleared_requests: bool) -> None: """List masking requests""" def list_output() -> Iterator[str]: for request, mask_count in ctx.obj["masking_admin"].get_requests( include_cleared_requests=include_cleared_requests ): yield f"📄 {click.style(request.slug, bold=True)}\n" yield f"📊 {mask_count} object{'s' if mask_count != 1 else ''}\n" yield f"🪪 {request.id}\n" # XXX: humanize would be nice here as well yield f"📅 {request.date.strftime('%c %z')}\n" yield from output_message(request.reason) yield "\n" click.echo_via_pager(list_output())
[docs] class MaskedStateType(click.Choice): def __init__(self): from .db import MaskedState super().__init__( [format_masked_state(state) for state in MaskedState], case_sensitive=False )
[docs] def parse_masked_state(str: str) -> "MaskedState": from .db import MaskedState return MaskedState[str.replace("-", "_").upper()]
[docs] def format_masked_state(state: "MaskedState") -> str: return state.name.lower().replace("_", "-")
[docs] def read_swhids(file: TextIO) -> List["ExtendedSWHID"]: import re from swh.model.swhids import ExtendedSWHID filter_re = re.compile(r"^(#|$)") return [ ExtendedSWHID.from_string(line.strip()) for line in file.read().split("\n") if not filter_re.match(line) ]
@masking_cli_group.command(name="update-objects") @click.option("-m", "--message", help="an explanation for this change") @click.option("-f", "--file", type=click.File(), help="a file with on SWHID per line") @click.argument("request", metavar="SLUG", type=RequestType()) @click.argument("new-state", metavar="NEW_STATE", type=MaskedStateType()) @click.pass_context def update_objects( ctx: click.Context, request: "MaskingRequest", new_state: str, message: Optional[str] = None, file: Optional[TextIO] = None, ) -> None: """Update the state of given objects The masked state of the provided SWHIDs will be updated to NEW_STATE for the request SLUG. NEW_STATE must be one of “visible”, “decision-pending” or “restricted”. SWHIDs must be provided one per line, either via the standard input or a file specified via the `-f` option. `-` is synonymous for the standard input. An explanation for this change must be added to the request history. It can either be specified by the `-m` option or via the provided editor. """ import sys from swh.model.swhids import ValidationError from .db import RequestNotFound if file is None: file = sys.stdin masked_state = parse_masked_state(new_state) try: swhids = read_swhids(file) except ValidationError as ex: raise click.ClickException(ex.messages[0]) if len(swhids) == 0: raise click.ClickException("No SWHIDs given!") if message is None or message == "": try: message = edit_message( f"Please enter an explanation for this update on request “{request.slug}”." ) except EditAborted: raise click.ClickException("Aborting due to an empty explanation.") try: ctx.obj["masking_admin"].set_object_state(request.id, masked_state, swhids) ctx.obj["masking_admin"].record_history(request.id, message) except RequestNotFound: raise click.ClickException(f"Request with id “{request.id}” not found.") click.echo( f"Updated masking state for {len(swhids)} objects in request “{request.slug}”." ) @masking_cli_group.command(name="status") @click.argument("request", metavar="SLUG", type=RequestType()) @click.pass_context def status_cmd(ctx: click.Context, request: "MaskingRequest") -> None: """Get the masking states defined by a request""" states = ctx.obj["masking_admin"].get_states_for_request(request.id) for swhid, state in states.items(): click.echo(f"{swhid} {format_masked_state(state)}")
[docs] def output_message(message: str) -> Iterator[str]: import textwrap for i, line in enumerate(message.splitlines()): yield textwrap.fill( line, initial_indent="🗩 " if i == 0 else " ", subsequent_indent=" ", replace_whitespace=False, ) yield "\n"
@masking_cli_group.command(name="history") @click.argument("request", metavar="SLUG", type=RequestType()) @click.pass_context def history_cmd(ctx: click.Context, request: "MaskingRequest") -> None: """Get the history for a request""" def history_output() -> Iterator[str]: # XXX: we have no way to tie a record to a set of swhids or their state # the only thing we can do is display the messages yield f"History for request “{request.slug}” ({request.id}) \n" yield from output_message(request.reason) yield "\n" for record in ctx.obj["masking_admin"].get_history(request.id): # XXX: if we agree to bring humanize in the dependencies # relative_time = humanize.naturaltime( # dt.datetime.now(tz=dt.timezone.utc) - record.date # ) yield f"📅 {record.date.strftime('%c %z')}\n" for i, line in enumerate(record.message.splitlines()): yield from output_message(record.message) click.echo_via_pager(history_output()) @masking_cli_group.command(name="object-state") @click.argument("swhids", metavar="SWHID", nargs=-1, type=SWHIDType()) @click.pass_context def object_state(ctx: click.Context, swhids: List["ExtendedSWHID"]) -> None: """Get the masking state for a set of SWHIDs If an object given in the arguments is not listed in the output, it means no masking state is set in any requests. """ import itertools from .db import MaskedState STATE_TO_COLORS = { MaskedState.VISIBLE: "bright_green", MaskedState.DECISION_PENDING: "bright_yellow", MaskedState.RESTRICTED: "bright_red", } STATE_TO_LABEL = { MaskedState.VISIBLE: "visible", MaskedState.DECISION_PENDING: "decision-pending", MaskedState.RESTRICTED: "restricted", } def state_output() -> Iterator[str]: # find_masks() will group masks for the same SWHID for swhid, masks_iter in itertools.groupby( ctx.obj["masking_admin"].find_masks(swhids), key=lambda mask: mask.swhid ): masks = list(masks_iter) masked = any(mask.state != MaskedState.VISIBLE for mask in masks) yield ( f"{'masked ' if masked else 'visible'} " f"{click.style(str(masks[0].swhid), bold=masked)}\n" ) for mask in masks: yield click.style( f" {mask.request_slug}: {STATE_TO_LABEL[mask.state]}\n", fg=STATE_TO_COLORS[mask.state], ) click.echo_via_pager(state_output()) @masking_cli_group.command(name="clear-request") @click.option("-m", "--message", help="an explanation for this change") @click.argument("request", metavar="SLUG", type=RequestType()) @click.pass_context def clear_request( ctx: click.Context, request: "MaskingRequest", message: Optional[str] ) -> None: """Remove all masking states for the given request""" from .db import RequestNotFound if message is None or message == "": states = ctx.obj["masking_admin"].get_states_for_request(request.id) extra_lines = ["Associated object states:", ""] for swhid, state in states.items(): extra_lines.append(f"{swhid} {format_masked_state(state)}") try: message = edit_message( "Please enter an explanation for clearing masks" f" from request “{request.slug}”.", extra_lines=extra_lines, ) except EditAborted: raise click.ClickException("Aborting due to an empty explanation.") try: ctx.obj["masking_admin"].delete_masks(request.id) ctx.obj["masking_admin"].record_history(request.id, message) except RequestNotFound: raise click.ClickException(f"Request with id “{request.id}” not found.") click.echo(f"Masks cleared for request “{request.slug}”.") @masking_cli_group.group(name="patching") @click.pass_context def patching_cli_group(ctx: click.Context) -> click.Context: """Tools to manage the patching of objects""" return ctx
[docs] def read_display_names(file: Iterable) -> List[Tuple[bytes, bytes]]: data = list(csv.reader(file)) assert all(len(x) == 2 for x in data) return [(name.encode(), newname.encode()) for (name, newname) in data]
@patching_cli_group.command(name="set") @click.argument("input", type=click.File("r")) @click.option( "--clear/--keep", help="Clear the display names table before inserting new entries", ) @click.pass_context def set_patching_entries( ctx: click.Context, input, clear: bool, ) -> None: """Set display names (patching entries)""" display_names = read_display_names(input) db = ctx.obj["masking_admin"] db.set_display_names(display_names, clear=clear) click.echo("Display names updated")