# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import logging
import pathlib
import sys
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
Optional,
Set,
TextIO,
Tuple,
cast,
)
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
if TYPE_CHECKING:
from swh.model.model import Origin
from swh.model.swhids import ExtendedSWHID
from .operations import Remover
from .progressbar import ProgressBar, V
from .recovery_bundle import ObjectDecryptionKeyProvider, ShareDecryptionKeys
[docs]
class SWHIDType(click.ParamType):
    """Click parameter type parsing its value as an extended SWHID."""

    name = "swhid"

    def convert(self, value, param, ctx) -> "ExtendedSWHID":
        """Parse ``value`` into an ``ExtendedSWHID``.

        A ``click.ClickException`` is raised when the value cannot be parsed.
        """
        from swh.model.swhids import ExtendedSWHID, ValidationError

        try:
            parsed = ExtendedSWHID.from_string(value)
        except ValidationError:
            raise click.ClickException(f"Unable to parse “{value}” as a SWHID.")
        else:
            return parsed
[docs]
class SwhidOrUrlParamType(click.ParamType):
    """Click parameter type accepting an extended SWHID or an origin URL."""

    name = "swhid or origin URL"

    def convert(self, value, param, ctx):
        """Turn ``value`` into an ``ExtendedSWHID`` or an ``Origin``.

        Values starting with ``swh:1:`` are parsed as extended SWHIDs and
        rejected through ``self.fail()`` when invalid. Any other value is
        used as the URL of an ``Origin``.
        """
        from swh.model.exceptions import ValidationError
        from swh.model.model import Origin
        from swh.model.swhids import ExtendedSWHID

        # Anything not looking like a SWHID is taken as an origin URL
        if not value.startswith("swh:1:"):
            return Origin(url=value)

        try:
            return ExtendedSWHID.from_string(value)
        except ValidationError:
            self.fail(f"expected extended SWHID, got {value!r}", param, ctx)
[docs]
class ClickLoggingHandler(logging.Handler):
    """Logging handler writing records through click.

    Records carrying a ``style`` extra attribute are displayed with
    ``click.secho()``, the attribute being expanded as keyword arguments.
    Other records are displayed with ``click.echo()``.
    """

    def emit(self, record):
        """Format ``record`` and display it using click."""
        message = self.format(record)
        if not hasattr(record, "style"):
            click.echo(message)
            return
        click.secho(message, **record.style)
[docs]
def progressbar(
    iterable: Optional[Iterable[V]] = None,
    length: Optional[int] = None,
    label: Optional[str] = None,
    show_eta: bool = True,
    show_pos: bool = False,
    show_percent: Optional[bool] = None,
    item_show_func: Optional[Callable[[V], str]] = None,
) -> ProgressBar[V]:
    """Create a click progress bar that is displayed on stderr.

    All arguments are handed over unchanged to ``click.progressbar()``.
    """
    # We have to use `cast()` to reconcile the case where
    # length is used and `click.progressbar()` returns a
    # `ProgressBar[int]`. But in that case, iterable is not
    # given, so V is not bound and it is safe to assume
    # that V = int.
    return cast(
        "ProgressBar[V]",
        click.progressbar(
            iterable=iterable,
            length=length,
            label=label,
            show_eta=show_eta,
            show_pos=show_pos,
            show_percent=show_percent,
            item_show_func=item_show_func,
            file=sys.stderr,
        ),
    )
@swh_cli_group.group(name="alter", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def alter_cli_group(ctx):
    """Archive alteration tools.

    Location of the configuration should be specified through the environment
    variable ``SWH_CONFIG_FILENAME``.

    Expected config format:

        \b
        storage:
          cls: remote
          url: https://storage-cassandra-ro.softwareheritage.org

        \b
        graph:
          url: "http://granet.internal.softwareheritage.org:5009/graph"

        \b
        restoration_storage:
          cls: remote
          url: https://storage-rw.softwareheritage.org

        \b
        removal_searches:
          main:
            cls: elasticsearch
            hosts:
            - elasticsearch:9200

        \b
        removal_storages:
          old_primary:
            cls: postgresql
            db: "service=swh"
          new_primary:
            cls: cassandra
            hosts:
            - cassandra-seed
            keyspace: swh

        \b
        removal_objstorages:
          main:
            cls: remote
            url: https://objstorage.softwareheritage.org

        \b
        removal_journals:
          main_journal:
            cls: kafka
            brokers:
            - kafka1.internal.softwareheritage.org
            prefix: swh.journal.objects
            client_id: swh.alter.removals

        \b
        recovery_bundles:
          secret_sharing:
            minimum_required_groups: 2
            groups:
              legal:
                minimum_required_shares: 1
                recipient_keys:
                  "YubiKey serial 4245067 slot 1": age1yubikey1q2e37f74zzazz75mtggzql3at66pegemfnul0dtd7axctahljkvsqezscaq
                  "YubiKey serial 2284622 slot 3": age1yubikey4o1aypv83isatti92q1zasv1hkpuozlkoak4zd66t7poud23rftqrcszjgul
              sysadmins:
                minimum_required_shares: 1
                recipient_keys:
                  "YubiKey serial 3862152 slot 1": age1yubikeyrupnxsu6uneqxw146g9szaofyxexiy4nhnzqg1ayb9b85g8h4oardwj6c212
                  "Ruby": age1y6epp27nq8n4faj8g8hkw8thcvj744y5vnr8jyfmp4857d6npc3qn9k7jz

        \b
        journal_writer:
          cls: kafka
          brokers:
          - kafka1.internal.softwareheritage.org
          prefix: swh.journal
          client_id: swh.alter.removals

    The identifier for the recipient key must be in the form of
    “YubiKey serial ####### slot #” if the secret key is stored
    on a YubiKey. Keys specified by any other identifiers will be
    considered as plain age identities.
    """  # noqa: B950
    from swh.core import config

    from .operations import logger as operations_logger
    from .recovery_bundle import logger as recovery_bundle_logger

    try:
        conf = config.load_from_envvar()
    except AssertionError as ex:
        # Report the failure as a regular click error carrying the
        # first argument of the assertion
        raise click.ClickException(ex.args[0])
    # Make the loaded configuration available to every subcommand
    ctx.ensure_object(dict)
    ctx.obj["config"] = conf

    # Display messages from these loggers through click instead of
    # letting them propagate to the parent loggers
    for logger in (operations_logger, recovery_bundle_logger):
        if not logger.propagate:
            # Avoid configuring the logger twice
            continue
        logger.propagate = False
        logger.addHandler(ClickLoggingHandler())

    return ctx
[docs]
def read_swhids(file: TextIO) -> Set["ExtendedSWHID"]:
    """Read a set of extended SWHIDs from a text file.

    The file is expected to hold one SWHID per line. Blank lines and lines
    starting with ``#`` are skipped. Surrounding whitespace (including a
    trailing carriage return) is ignored, so whitespace-only lines and
    indented comments are skipped as well instead of being handed to the
    SWHID parser.

    Args:
        file: text stream to read from

    Returns:
        the set of SWHIDs found in the file

    Raises:
        swh.model.exceptions.ValidationError: if a remaining line is not
            a valid extended SWHID
    """
    from swh.model.swhids import ExtendedSWHID

    swhids = set()
    for raw_line in file.read().split("\n"):
        # Strip before filtering so that "   " or "  # comment" are
        # recognized as lines to skip
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        swhids.add(ExtendedSWHID.from_string(line))
    return swhids
[docs]
def get_remover(
    ctx: click.Context,
    dry_run: bool = False,
    require_masking_admin: bool = False,
    ignore_backends: Optional[Iterable[str]] = None,
) -> "Remover":
    """Build a ``Remover`` from the configuration stored in the click context.

    Args:
        ctx: click context; ``ctx.obj["config"]`` holds the configuration
            and ``ctx.params`` may provide ``known_missing_swhids`` and
            ``known_missing_file``
        dry_run: any truthy value skips the checks on the configuration
            sections that are only mandatory for an actual removal
        require_masking_admin: when true, connect to the masking database
            given by ``masking_admin.db`` and pass it to the remover
        ignore_backends: names among ``graph``, ``search`` and ``journal``
            for which a missing configuration is tolerated

    Returns:
        the configured ``Remover``

    Raises:
        click.ClickException: if the configuration is incomplete, if a
            configured backend is not of the expected kind, or if a
            checked service is unreachable
    """
    from psycopg2 import OperationalError, ProgrammingError

    from swh.core.api import RemoteException
    from swh.graph.http_client import GraphAPIError, RemoteGraphClient
    from swh.journal.writer import get_journal_writer
    from swh.journal.writer.kafka import KafkaJournalWriter
    from swh.objstorage.factory import get_objstorage
    from swh.objstorage.interface import ObjStorageInterface
    from swh.search import get_search
    from swh.search.interface import SearchInterface
    from swh.storage import get_storage
    from swh.storage.interface import ObjectDeletionInterface

    from .operations import Remover

    conf = ctx.obj["config"]
    # Materialized once as it is tested for membership several times
    ignored = set(ignore_backends or ())

    if "graph" not in conf:
        if "graph" not in ignored:
            raise click.ClickException("Configuration does not define `graph`")
        graph_client = None
    else:
        try:
            graph_client = RemoteGraphClient(**conf["graph"])
        except GraphAPIError as e:
            raise click.ClickException(
                f"Unable to connect to the graph server: {e.args}"
            )

    storage = get_storage(**conf["storage"])

    if not dry_run:
        if "restoration_storage" not in conf:
            raise click.ClickException(
                "Configuration does not define `restoration_storage`"
            )
        # Each section must define at least one backend, unless the
        # matching backend name has been explicitly ignored
        mandatory_sections = (
            ("removal_searches", "search"),
            ("removal_storages", None),
            ("removal_objstorages", None),
            ("removal_journals", "journal"),
        )
        for section, backend in mandatory_sections:
            if backend is not None and backend in ignored:
                continue
            if not conf.get(section):
                raise click.ClickException(
                    f"Configuration does not define any `{section}`"
                )

    restoration_storage = (
        get_storage(**conf["restoration_storage"])
        if "restoration_storage" in conf
        else None
    )

    removal_searches = {}
    for name, d in (conf.get("removal_searches") or {}).items():
        removal_searches[name] = get_search(**d)
        try:
            removal_searches[name].check()
        except RemoteException as e:
            raise click.ClickException(f"Search “{name}” is unreachable: {e}")

    removal_storages = {}
    for name, d in (conf.get("removal_storages") or {}).items():
        removal_storage = get_storage(**d)
        # Explicit check rather than `assert`, which is skipped when
        # Python runs with optimizations enabled
        if not hasattr(removal_storage, "object_delete"):
            raise click.ClickException(
                f"storage “{name}” does not implement ObjectDeletionInterface"
            )
        removal_storages[name] = removal_storage

    removal_objstorages = {}
    for name, d in (conf.get("removal_objstorages") or {}).items():
        removal_objstorages[name] = get_objstorage(**d)

    removal_journals = {}
    for name, d in (conf.get("removal_journals") or {}).items():
        journal_writer = get_journal_writer(**d)
        if not isinstance(journal_writer, KafkaJournalWriter):
            raise click.ClickException(
                f"journal writer “{name}” is not kafka-based"
            )
        removal_journals[name] = journal_writer

    # Objects known to be missing come both from repeated options
    # and from an optional file
    known_missing = set(ctx.params.get("known_missing_swhids", set()))
    if known_missing_file := ctx.params.get("known_missing_file"):
        known_missing.update(read_swhids(known_missing_file))

    if require_masking_admin:
        from swh.storage.proxies.masking.db import MaskingAdmin

        if "masking_admin" not in conf or "db" not in conf["masking_admin"]:
            raise click.ClickException("masking_admin.db not found in configuration")
        try:
            masking_admin = MaskingAdmin.connect(conf["masking_admin"]["db"])
        except (OperationalError, ProgrammingError) as e:
            raise click.ClickException(f"Unable to connect to masking database: {e}")
    else:
        masking_admin = None

    return Remover(
        storage=storage,
        graph_client=graph_client,
        restoration_storage=restoration_storage,
        removal_searches=cast(Dict[str, SearchInterface], removal_searches),
        removal_storages=cast(Dict[str, ObjectDeletionInterface], removal_storages),
        removal_objstorages=cast(Dict[str, ObjStorageInterface], removal_objstorages),
        removal_journals=cast(Dict[str, KafkaJournalWriter], removal_journals),
        masking_admin=masking_admin,
        known_missing=known_missing,
        progressbar=progressbar,
    )
@alter_cli_group.command()
@click.option(
    "--dry-run",
    type=click.Choice(
        ["stop-before-recovery-bundle", "stop-before-removal"], case_sensitive=False
    ),
    help="perform a trial run",
)
@click.option(
    "--output-inventory-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--output-removable-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--output-pruned-removable-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--identifier",
    metavar="IDENTIFIER",
    required=True,
    help="identifier for this removal operation",
)
@click.option(
    "--reason",
    metavar="REASON",
    help="reason for this removal operation",
)
@click.option(
    "--expire",
    metavar="YYYY-MM-DD",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    help="date when the recovery bundle should be removed",
)
@click.option(
    "--recovery-bundle",
    metavar="PATH",
    type=click.Path(dir_okay=False),
    required=True,
    help="path to the recovery bundle that will be created",
)
@click.option(
    "--known-missing",
    "known_missing_swhids",
    metavar="SWHID",
    type=SWHIDType(),
    multiple=True,
    help="object known to be missing from storage",
)
@click.option(
    "--known-missing-file",
    "known_missing_file",
    metavar="PATH",
    type=click.File(),
    help=(
        "file (or '-') with object known to be missing from storage, "
        "one SWHID per line"
    ),
)
@click.option(
    "--allow-empty-content-objects/--disallow-empty-content-objects",
    "allow_empty_content_objects",
    default=False,
    help="Create recovery bundle even when data for Content object cannot be found",
)
@click.option(
    "--ignore",
    "ignore_backends",
    default=None,
    multiple=True,
    type=click.Choice(["search", "journal", "graph"], case_sensitive=False),
    help=(
        "Do not make the given backend mandatory when checking the configuration; "
        "this command is usually meant to remove objects from all possible data silos, "
        "so the default behavior is to have all of them mandatory in the configuration "
        "file. Using this option allows to explicitly ignore the given 'backend'"
    ),
)
@click.argument(
    "requested",
    metavar="<SWHID|URL>..",
    type=SwhidOrUrlParamType(),
    required=True,
    nargs=-1,
)
@click.pass_context
def remove(
    ctx,
    requested: Tuple["Origin" | "ExtendedSWHID", ...],
    dry_run: Optional[str],
    output_inventory_subgraph,
    output_removable_subgraph,
    output_pruned_removable_subgraph,
    identifier,
    reason,
    expire,
    recovery_bundle,
    known_missing_swhids,
    known_missing_file,
    allow_empty_content_objects,
    ignore_backends,
) -> None:
    """Remove the given SWHIDs or URLs from the archive."""
    # Steps: validate the secret sharing configuration, compute and print
    # what can be removed, create the recovery bundle, perform the removal
    # (restoring the bundle on failure), then optionally publish a removal
    # notification. `dry_run` is either None or one of the `--dry-run`
    # choices, and stops the command before the matching step.
    from swh.journal.writer import JournalWriterInterface, get_journal_writer
    from swh.model.model import Origin

    from .inventory import RootsNotFound, StuckInventoryException
    from .notifications import RemovalNotification
    from .operations import RemoverError
    from .recovery_bundle import ContentDataNotFound, SecretSharing

    try:
        secret_sharing = SecretSharing.from_dict(
            ctx.obj["config"]["recovery_bundles"]["secret_sharing"]
        )
    except ValueError as e:
        raise click.ClickException(f"Wrong secret sharing configuration: {e.args[0]}")

    if dry_run != "stop-before-recovery-bundle":
        # Fail early if the recovery bundle already exists or cannot be
        # created: create an empty file at that path and delete it right away
        try:
            p = pathlib.Path(recovery_bundle)
            p.touch(exist_ok=False)
            p.unlink()
        except FileExistsError:
            raise click.ClickException(f"File “{recovery_bundle}” already exists")
        except PermissionError:
            raise click.ClickException(f"Permission denied: “{recovery_bundle}”")

    # Any `--dry-run` choice is a non-empty string and so is truthy here
    remover = get_remover(ctx, dry_run, ignore_backends=ignore_backends)

    # Origins are referred to by their SWHID from now on
    swhids = [x.swhid() if isinstance(x, Origin) else x for x in requested]

    # The journal writer is optional: notifications are only sent when
    # `journal_writer` is configured
    journal_writer: Optional[JournalWriterInterface] = None
    if "journal_writer" in ctx.obj["config"]:
        cfg = ctx.obj["config"]["journal_writer"]
        journal_writer = get_journal_writer(**cfg)

    try:
        removable = remover.get_removable(
            swhids,
            output_inventory_subgraph=output_inventory_subgraph,
            output_removable_subgraph=output_removable_subgraph,
            output_pruned_removable_subgraph=output_pruned_removable_subgraph,
        )
        removable.print_plan()
        if dry_run == "stop-before-recovery-bundle":
            click.echo("Stopping before creating the recovery bundle as requested.")
            ctx.exit(0)
        # Confirmation is only asked when this is not a dry run
        if dry_run is None:
            click.confirm(
                click.style(
                    "Proceed?",
                    fg="yellow",
                    bold=True,
                ),
                abort=True,
            )
        decryption_key = remover.create_recovery_bundle(
            secret_sharing=secret_sharing,
            requested=list(requested),
            removable=removable,
            recovery_bundle_path=recovery_bundle,
            removal_identifier=identifier,
            reason=reason,
            expire=expire.astimezone() if expire else None,
            allow_empty_content_objects=allow_empty_content_objects,
        )
        click.secho(f"Recovery bundle decryption key: {decryption_key}", fg="blue")
    except RemoverError as e:
        click.secho(e.args[0], err=True, fg="red")
        ctx.exit(1)
    except RootsNotFound as e:
        click.secho(
            "Some requested objects were not found:",
            err=True,
            fg="red",
            bold=True,
        )
        for label in e.get_labels(requested):
            click.secho(f"- {label}", err=True)
        ctx.exit(1)
    except StuckInventoryException as e:
        click.secho(
            "Inventory phase got stuck. "
            "Unable to learn the complete set of what these objects reference:",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho("\n".join(f"- {swhid}" for swhid in e.swhids), err=True, fg="red")
        ctx.exit(1)
    except ContentDataNotFound as e:
        click.secho(
            f"Content “{e.swhid}” exists, but its data was not found.",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho(
            "Consider using `--allow-empty-content-objects` but only "
            "if the above is expected.",
            err=True,
            fg="yellow",
        )
        ctx.exit(1)

    if dry_run == "stop-before-removal":
        click.echo("Stopping before removal.")
        ctx.exit(0)

    try:
        remover.remove()
    except Exception as e:
        # Whatever went wrong, restore the recovery bundle before exiting
        click.secho(str(e), err=True, fg="red", bold=True)
        click.secho("Rolling back…", fg="cyan")
        remover.restore_recovery_bundle()
        ctx.exit(1)
    else:
        if journal_writer is not None:
            click.secho("Sending removal notification…", fg="cyan")
            notif = RemovalNotification(
                removal_identifier=identifier,
                reason=reason,
                requested=list(swhids),
                removed_objects=remover.swhids_to_remove,
            )
            journal_writer.write_addition("removal_notification", notif)
            journal_writer.flush()
            click.secho("Removal notification sent.", fg="green")
@alter_cli_group.command("list-candidates")
@click.option(
    "--omit-referenced/--no-omit-referenced",
    default=True,
    help="Omit candidates that are referenced by other objects",
)
@click.argument(
    "requested",
    metavar="<SWHID|URL>..",
    type=SwhidOrUrlParamType(),
    required=True,
    nargs=-1,
)
@click.pass_context
def list_candidates(
    ctx: click.Context,
    requested: Tuple["Origin" | "ExtendedSWHID", ...],
    omit_referenced: bool,
):
    """List candidates for an altering operation (e.g. removal)

    Display a list of SWHIDs of objects that would be affected by an altering
    operation targeting the SWHIDs (or origin URLs) given as arguments.

    Candidates referenced by objects in the graph outside the set of candidates
    will be filtered out, unless `--no-omit-referenced` is given.
    """
    from swh.graph.http_client import GraphAPIError, RemoteGraphClient
    from swh.model.model import Origin
    from swh.storage import get_storage

    from .inventory import (
        RootsNotFound,
        StuckInventoryException,
        get_raw_extrinsic_metadata,
        make_inventory,
    )
    from .removable import mark_removable

    config = ctx.obj["config"]

    try:
        graph = RemoteGraphClient(**config["graph"])
    except GraphAPIError as exc:
        raise click.ClickException(
            f"Unable to connect to the graph server: {exc.args}"
        )
    archive = get_storage(**config["storage"])

    # Origins are looked up through their SWHID
    swhids = [
        item.swhid() if isinstance(item, Origin) else item for item in requested
    ]

    try:
        subgraph = make_inventory(archive, graph, swhids, progressbar=progressbar)
    except StuckInventoryException as exc:
        click.secho(
            "Inventory phase got stuck. "
            "Unable to learn the complete set of what these objects reference:",
            err=True,
            fg="red",
            bold=True,
        )
        stuck = "\n".join(f"- {swhid}" for swhid in exc.swhids)
        click.secho(stuck, err=True, fg="red")
        ctx.exit(1)
    except RootsNotFound as exc:
        click.secho(
            "Some requested objects were not found:",
            err=True,
            fg="red",
            bold=True,
        )
        for label in exc.get_labels(requested):
            click.secho(f"- {label}", err=True)
        ctx.exit(1)

    if omit_referenced:
        # Only keep what nothing outside of the candidates refers to
        subgraph = mark_removable(archive, graph, subgraph, progressbar=progressbar)
        subgraph.delete_unremovable()

    candidates = list(subgraph.swhids())
    candidates.extend(
        get_raw_extrinsic_metadata(archive, candidates, progressbar=progressbar)
    )
    for candidate in candidates:
        click.echo(candidate)
@alter_cli_group.command("run-mirror-notification-watcher")
@click.pass_context
def run_mirror_notification_watcher(ctx: click.Context):
    """Watch the journal for notifications from the main archive.

    For removal notifications, we mask the associated objects until
    a decision is made by the mirror operators.

    Example configuration:

        \b
        journal_client:
          brokers: kafka.example.org:9092
          group_id: mirror-notification-watcher
          prefix: swh.journal
        storage:
          cls: remote
          url: https://storage-ro
        masking_admin:
          cls: postgresql
          db: service=masking-db-rw
        emails:
          from: swh-mirror@example.org
          recipients:
          - trinity@example.org
          - neo@example.org
        smtp:
          host: localhost
          port: 25

    The addresses listed as “recipients” in the “emails” section will receive
    an email to let them know that a decision needs to be taken.
    """
    # Every piece of configuration is validated before the watcher is
    # created, and any problem is reported as a `click.ClickException`.
    from swh.alter.mirror_notification_watcher import MirrorNotificationWatcher
    from swh.journal.client import get_journal_client
    from swh.storage import get_storage
    from swh.storage.proxies.masking.db import MaskingAdmin

    conf = ctx.obj["config"]

    try:
        storage = get_storage(**conf["storage"])
        storage.check_config(check_write=False)
    except Exception as e:
        raise click.ClickException(f"Unable to query to the storage: {e}")

    try:
        journal_client = get_journal_client(
            **{
                **conf["journal_client"],
                # Forced whatever the configuration says
                "cls": "kafka",
                "object_types": ["removal_notification"],
            }
        )
    except Exception as e:
        raise click.ClickException(f"Unable to setup the journal client: {repr(e)}")

    try:
        masking_admin_dsn = conf["masking_admin"]["db"]
        # Only used to check that the database can be reached; the watcher
        # is given the DSN, not this connection
        _ = MaskingAdmin.connect(masking_admin_dsn)
    except Exception as e:
        raise click.ClickException(
            f"Unable to connect to the masking proxy database: {repr(e)}"
        )

    # A missing (or empty) `emails` section is handled like a section
    # without any key, so that a proper error message is displayed
    # instead of a KeyError
    emails_conf = conf.get("emails") or {}
    emails_from = emails_conf.get("from")
    if emails_from is None:
        raise click.ClickException("“emails.from” has not been set.")
    emails_recipients = emails_conf.get("recipients")
    if emails_recipients is None:
        raise click.ClickException("“emails.recipients” has not been set.")
    if not isinstance(emails_recipients, list) or len(emails_recipients) < 1:
        raise click.ClickException(
            "“emails.recipients” must be a list and contain at least one email address."
        )

    # Same treatment for a missing `smtp` section
    smtp_conf = conf.get("smtp") or {}
    smtp_host = smtp_conf.get("host")
    if smtp_host is None:
        raise click.ClickException("“smtp.host” has not been set.")
    try:
        smtp_port = int(smtp_conf.get("port"))
    except Exception as e:
        raise click.ClickException(f"“smtp.port” must be set to a port number: {e}")

    watcher = MirrorNotificationWatcher(
        storage=storage,
        journal_client=journal_client,
        masking_admin_dsn=masking_admin_dsn,
        emails_from=emails_from,
        emails_recipients=emails_recipients,
        smtp_host=smtp_host,
        smtp_port=smtp_port,
    )
    try:
        watcher.watch()
    except KeyboardInterrupt:
        # Being interrupted by the user is a normal way to stop
        ctx.exit(0)
@alter_cli_group.group(name="recovery-bundle", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def recovery_bundle_cli_group(ctx):
    """Recovery bundle related tools."""
    # Nothing to set up for this group: the context is returned unchanged
    return ctx
@recovery_bundle_cli_group.command(name="info")
@click.option(
    "--dump-manifest",
    is_flag=True,
    default=False,
    help="Show raw manifest in YAML format.",
)
@click.option(
    "--show-encrypted-secrets",
    is_flag=True,
    default=False,
    help="Show encrypted secrets.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def info(ctx, recovery_bundle, dump_manifest, show_encrypted_secrets) -> None:
    """Display the manifest of the given recovery bundle."""
    from swh.model.model import Origin

    from .recovery_bundle import RecoveryBundle

    bundle = RecoveryBundle(recovery_bundle)

    # The raw manifest replaces the human-readable summary
    if dump_manifest:
        click.echo(bundle.dump_manifest(), nl=False)
        ctx.exit()

    heading = f"Recovery bundle “{bundle.removal_identifier}”"
    click.echo(heading)
    click.echo("=" * len(heading))
    click.echo("")
    click.echo(f"Created: {bundle.created.isoformat()}")

    reason = bundle.reason
    if reason:
        first_line, *other_lines = reason.rstrip().split("\n")
        click.echo("\n ".join([f"Reason: {first_line}", *other_lines]))

    if bundle.expire:
        click.echo(f"Expire: {bundle.expire}")

    if bundle.version >= 3:
        click.echo("Removal requested for:")
        for item in bundle.requested:
            # Origins are shown by their URL, anything else as is
            if isinstance(item, Origin):
                click.echo(f"- {item.url}")
            else:
                click.echo(f"- {item}")

    click.echo("SWHID of the objects present in the bundle:")
    for swhid in bundle.swhids:
        click.echo(f"- {swhid}")

    if bundle.version >= 3 and len(bundle.referencing):
        click.echo("SWHID referenced by objects in this bundle:")
        for swhid in bundle.referencing:
            click.echo(f"- {swhid}")

    click.echo("Secret share holders:")
    for holder in sorted(bundle.share_ids):
        click.echo(f"- {holder}")
        if show_encrypted_secrets:
            click.echo(bundle.encrypted_secret(holder))
def _share_decryption_keys_provider(share_ids: Set[str]) -> ShareDecryptionKeys:
    """Yield ``(share_id, secret_key)`` pairs for the connected YubiKeys.

    Identities returned by ``list_yubikey_identities()`` are matched against
    ``share_ids``. While some identifiers remain, the user is asked to insert
    one of the matching YubiKeys and the listing is done again, for at most
    nine rounds.

    The given ``share_ids`` set is modified in place: an identifier is removed
    from it right before the corresponding pair is yielded.

    If the generator runs to its end, meaning the consumer kept asking for
    more shares than could be provided, an error is displayed and the process
    exits with status 1.
    """
    import subprocess
    import sys

    from .recovery_bundle import list_yubikey_identities

    # `attempt` itself is not used: the range only bounds the number of rounds
    for attempt in range(1, 10):
        if not any(share_id.startswith("YubiKey") for share_id in share_ids):
            # No shares require a YubiKey, so there is nothing we can do here
            break
        try:
            for share_id, secret_key in list_yubikey_identities():
                if share_id not in share_ids:
                    continue
                # Do not offer the same share twice
                share_ids.remove(share_id)
                click.echo(
                    "🔧 Decrypting share using "
                    f"{click.style(share_id, fg='magenta', bold=True)}…"
                )
                click.echo("💭 You might need to tap the right YubiKey when it blinks.")
                yield share_id, secret_key
                click.echo()
        except subprocess.CalledProcessError as ex:
            # Only failures of the YubiKey plugin are handled here
            if "age-plugin-yubikey" not in ex.cmd[0]:
                raise
            click.echo(
                f"""💥 {click.style('age-plugin-yubikey failed to '
                                    'list connected YubiKeys.', bold=True, fg='red')}"""
            )
            click.echo("💭 Please disconnect all YubiKeys and retry.")
            sys.exit(1)
        if share_ids:
            # Ask for one of the remaining YubiKeys, listed in sorted order
            # as “a, b or c”
            yubikey_ids = list(sorted(share_ids))
            if len(yubikey_ids) > 1:
                yubikeys = ", ".join(
                    click.style(share_id, fg="magenta", bold=True)
                    for share_id in yubikey_ids[:-1]
                )
                yubikeys += " or " + click.style(
                    yubikey_ids[-1], fg="magenta", bold=True
                )
            else:
                yubikeys = click.style(yubikey_ids[0], fg="magenta", bold=True)
            # The prompt is only used to wait for the Enter key:
            # its value is discarded
            click.prompt(
                f"🔐 Please insert {yubikeys} and press "
                f"{click.style('Enter', fg='green', bold=True)}…",
                default="Ok",
                show_default=False,
                hide_input=True,
                prompt_suffix="",
            )
    # Reached when no more share can be provided while the consumer is still
    # asking for some
    click.echo(
        f"""💥 {click.style('Unable to decrypt enough shared secrets to recover '
                            'the object decryption key. Aborting.',
                            bold=True, fg='red')}"""
    )
    sys.exit(1)
def _print_decrypted_mnemonic(mnemonic: str, share_id: Optional[str] = None) -> None:
    """Display a recovered shared secret.

    The words of the mnemonic are printed in blue with the first three in
    bold. When ``share_id`` is given, it is mentioned in the line announcing
    the secret.
    """
    origin = ""
    if share_id:
        origin = f" from {click.style(share_id, fg='magenta', bold=True)}"
    click.echo(f"🔑 Recovered shared secret{origin}:")

    # Quoting from SLIP-0039: This construction yields a beneficial
    # property where the random identifier and the iteration exponent
    # transform into the first two words of the mnemonic code, so the user
    # can immediately tell whether the correct shares are being combined,
    # i.e. they have to have the same first two words. Moreover, the third
    # word encodes the group index, group threshold and part of the group
    # count. Since the group threshold and group count are constant, all
    # **shares belonging to the same group start with the same three words**.
    styled_words = []
    for position, word in enumerate(mnemonic.split()):
        styled_words.append(click.style(word, fg="blue", bold=position < 3))
    click.echo(" ".join(styled_words))
def _recover_mnemonics_from_identity_files(
    manifest, share_ids, identity_files, show_decrypted_mnemonics
):
    """Decrypt as many shared secrets as possible using age identity files.

    Returns a dictionary mapping each share identifier that could be
    decrypted to its mnemonic. Decrypted mnemonics are also displayed
    when ``show_decrypted_mnemonics`` is true.
    """
    from .recovery_bundle import WrongDecryptionKey, age_decrypt_from_identity

    # As we can’t know which identity file corresponds to which encrypted shared
    # secret, we have to try them all and see which one we can actually decrypt.
    mnemonics = {}
    for identity_file in identity_files:
        for share_id in share_ids:
            try:
                decrypted = age_decrypt_from_identity(
                    identity_file, manifest.decryption_key_shares[share_id]
                )
                mnemonic = decrypted.decode("us-ascii")
            except WrongDecryptionKey:
                continue
            mnemonics[share_id] = mnemonic
            if show_decrypted_mnemonics:
                _print_decrypted_mnemonic(mnemonic, share_id)
    return mnemonics
[docs]
def prompting_object_decryption_key_provider(
    manifest, known_mnemonics=None, identity_files=None, show_decrypted_mnemonics=False
) -> str:
    """Recover the object decryption key of a bundle, prompting if needed.

    Shared secrets are gathered from ``known_mnemonics``, from what can be
    decrypted using ``identity_files``, and from YubiKeys the user is asked
    to insert. A warning lists the shares that are neither decrypted by an
    identity file nor attached to a YubiKey.
    """
    import functools

    from .recovery_bundle import recover_object_decryption_key_from_encrypted_shares

    mnemonic_processor = (
        _print_decrypted_mnemonic if show_decrypted_mnemonics else None
    )

    remaining_ids = set(manifest.decryption_key_shares.keys())
    # Normalize known_mnemonics
    mnemonics = list(known_mnemonics or [])

    if identity_files:
        from_files = _recover_mnemonics_from_identity_files(
            manifest, remaining_ids, identity_files, show_decrypted_mnemonics
        )
        remaining_ids.difference_update(from_files.keys())
        mnemonics.extend(from_files.values())

    yubikey_share_ids = {
        share_id for share_id in remaining_ids if share_id.startswith("YubiKey")
    }
    missing_ids = remaining_ids - yubikey_share_ids
    if missing_ids:
        fmt_ids = ", ".join(
            click.style(share_id, fg="magenta", bold=True) for share_id in missing_ids
        )
        click.echo(
            f"""\n🚸 {click.style('The following secret shares will not be '
                                  'decrypted:', fg='yellow')} {fmt_ids}\n"""
        )

    share_keys_provider = functools.partial(
        _share_decryption_keys_provider, yubikey_share_ids
    )
    return recover_object_decryption_key_from_encrypted_shares(
        manifest.decryption_key_shares,
        share_keys_provider,
        decrypted_mnemonic_processor=mnemonic_processor,
        known_mnemonics=mnemonics,
    )
[docs]
def get_object_decryption_key_provider(ctx) -> ObjectDecryptionKeyProvider:
    """Select how the object decryption key will be obtained.

    When the ``decryption_key`` parameter of the command is set, the returned
    provider gives back that key, after checking that it starts with
    “AGE-SECRET-KEY-” (case-insensitively). Otherwise, the returned provider
    recovers the key from the shared secrets, using the ``secret`` and
    ``identity`` parameters of the command.
    """
    import functools

    decryption_key = ctx.params.get("decryption_key")
    if not decryption_key:
        return functools.partial(
            prompting_object_decryption_key_provider,
            known_mnemonics=ctx.params.get("secret"),
            identity_files=ctx.params.get("identity"),
        )

    if not decryption_key.lower().startswith("age-secret-key-"):
        ctx.fail(
            "The given decryption key does not look like a decryption key. "
            "It should start with “AGE-SECRET-KEY-”"
        )

    def known_key_provider(_):
        return decryption_key

    return known_key_provider
[docs]
class ContentSWHID(click.ParamType):
    """Click parameter type accepting only SWHIDs of content objects."""

    name = "swhid of a content object"

    def convert(self, value, param, ctx):
        """Parse ``value`` as an extended SWHID designating a content object.

        ``self.fail()`` is called when the value is not a valid SWHID or when
        the SWHID is for another type of object.
        """
        from swh.model.swhids import ExtendedObjectType, ExtendedSWHID, ValidationError

        try:
            parsed = ExtendedSWHID.from_string(value)
        except ValidationError:
            self.fail(f"expected SWHID, got {value!r}", param, ctx)

        if parsed.object_type != ExtendedObjectType.CONTENT:
            self.fail("We can only extract data for Content objects", param, ctx)
        return parsed
@recovery_bundle_cli_group.command(name="extract-content")
@click.option(
    "-o",
    "--output",
    type=click.File("wb"),
    metavar="FILE",
    required=True,
    help="write data to FILE",
)
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.argument(
    "SWHID",
    type=ContentSWHID(),
    required=True,
)
@click.pass_context
def extract_content(
    ctx,
    output,
    recovery_bundle,
    swhid,
    decryption_key=None,
    identity=None,
    secret=None,
) -> None:
    """Extract data from content stored in a recovery bundle."""
    from .recovery_bundle import RecoveryBundle, WrongDecryptionKey

    # The key provider reads the decryption key, secrets and identities
    # from the parameters of the command held by the context
    key_provider = get_object_decryption_key_provider(ctx)
    bundle = RecoveryBundle(recovery_bundle, key_provider)

    if swhid not in bundle.swhids:
        not_found = f"“{swhid}” is not in the recovery bundle"
        click.secho(not_found, err=True, fg="red", bold=True)
        ctx.exit(1)

    try:
        bundle.write_content_data(swhid, output)
    except WrongDecryptionKey:
        wrong_key = f"Wrong decryption key for this bundle ({bundle.removal_identifier})"
        click.secho(wrong_key, err=True, fg="red", bold=True)
        ctx.exit(2)
@recovery_bundle_cli_group.command(name="restore")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def restore(
    ctx, recovery_bundle, decryption_key=None, identity=None, secret=None
) -> None:
    """Restore a recovery bundle to Software Heritage archive."""
    from swh.storage import get_storage

    from .recovery_bundle import (
        RecoveryBundle,
        UnsupportedFeatureException,
        WrongDecryptionKey,
    )

    config = ctx.obj["config"]
    target_storage = get_storage(**config["restoration_storage"])

    key_provider = get_object_decryption_key_provider(ctx)
    bundle = RecoveryBundle(recovery_bundle, key_provider)

    try:
        missing = bundle.get_missing_referenced_objects(target_storage)
        if len(missing) > 0:
            # Restoring would create dangling references: let the
            # operator decide
            click.secho(
                "Objects to be restored are referencing objects that "
                "are missing from storage:",
                fg="yellow",
                bold=True,
            )
            for swhid in missing:
                click.secho(f"- {swhid}", fg="yellow")
            question = click.style(
                "Proceed with restoration though it will create "
                "references to missing objects?",
                fg="yellow",
                bold=True,
            )
            click.confirm(question, abort=True)
    except UnsupportedFeatureException:
        click.secho(
            "Skipping checks for missing referenced objects: "
            f"recovery bundle “{recovery_bundle}” is too old.",
            fg="yellow",
            bold=True,
        )

    try:
        bundle.restore(target_storage, progressbar)
    except WrongDecryptionKey:
        click.echo(
            f"Wrong decryption key for this bundle ({bundle.removal_identifier})"
        )
        ctx.exit(2)
@recovery_bundle_cli_group.command(name="resume-removal")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    prompt=True,
    help="use the given decryption key instead of the bundle shared secrets",
    envvar="SWH_BUNDLE_DECRYPTION_KEY",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def resume_removal(
    ctx,
    recovery_bundle,
    decryption_key=None,
) -> None:
    """Resume a removal operation from a recovery bundle."""
    from swh.journal.writer import JournalWriterInterface, get_journal_writer

    from .notifications import RemovalNotification
    from .recovery_bundle import WrongDecryptionKey

    remover = get_remover(ctx)

    # A notification is only sent when a journal writer is configured
    journal_writer: Optional[JournalWriterInterface] = None
    if "journal_writer" in ctx.obj["config"]:
        journal_writer = get_journal_writer(**ctx.obj["config"]["journal_writer"])

    try:
        bundle = remover.register_objects_from_bundle(
            recovery_bundle_path=recovery_bundle, object_secret_key=decryption_key
        )
    except WrongDecryptionKey:
        click.echo("Wrong decryption key for this bundle")
        ctx.exit(2)

    try:
        remover.remove()
    except Exception as exc:
        # Put back what is in the bundle before giving up
        click.secho(str(exc), err=True, fg="red", bold=True)
        remover.restore_recovery_bundle()
        ctx.exit(1)
    else:
        if journal_writer is not None:
            notification = RemovalNotification(
                removal_identifier=bundle.removal_identifier,
                reason=bundle.reason or "",
                requested=bundle.requested,
                removed_objects=remover.swhids_to_remove,
            )
            journal_writer.write_addition("removal_notification", notification)
            journal_writer.flush()
def _strip_rage_report(output):
# rage prompts for report when it errors like this:
# [ Did rage not do what you expected? Could an error be more useful? ]
# [ Tell us: https://str4d.xyz/rage/report ]
# This can be confusing in our case so strip them from the output.
return b"\n".join(
line
for line in output.split(b"\n")
if not line.startswith(b"[") and not line.endswith(b"]")
)
@recovery_bundle_cli_group.command(name="recover-decryption-key")
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.option(
    "--show-recovered-secrets",
    is_flag=True,
    default=False,
    help="Show recovered shared secrets. Useful for remote/distributed recoveries.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
def recover_decryption_key(
    recovery_bundle, secret, identity, show_recovered_secrets
) -> None:
    """Recover the decryption key using shared secrets."""
    # Parameters (all filled in by click):
    #   recovery_bundle: path to an existing recovery bundle file
    #   secret: tuple of already known mnemonics (``--secret``, repeatable)
    #   identity: tuple of age identity file paths (``--identity``, repeatable)
    #   show_recovered_secrets: forwarded as ``show_decrypted_mnemonics``
    # Exits with status 1 when a ``rage --decrypt`` invocation fails; any
    # other subprocess failure is re-raised untouched.
    import subprocess
    import sys

    from .recovery_bundle import RecoveryBundle

    def object_decryption_key_provider(*args, **kwargs):
        # Wrap the prompting provider so that the command line options are
        # always forwarded, whatever the bundle passes in.
        kwargs["known_mnemonics"] = list(secret)
        kwargs["identity_files"] = list(identity)
        kwargs["show_decrypted_mnemonics"] = show_recovered_secrets
        return prompting_object_decryption_key_provider(*args, **kwargs)

    try:
        bundle = RecoveryBundle(recovery_bundle, object_decryption_key_provider)
        decryption_key = bundle.object_decryption_key
        click.echo(
            f"\n🔓 Recovered decryption key:\n{click.style(decryption_key, bold=True)}"
        )
    except subprocess.CalledProcessError as ex:
        # Only a failed ``rage --decrypt`` gets the friendly message below.
        # The previous test joined the two negations with ``and``, so it only
        # re-raised when *both* were true and mislabelled other failures.
        # ``ex.cmd`` may also be a plain string or a one-element sequence, in
        # which case the original exception is re-raised instead of crashing
        # on the indexing.
        cmd = ex.cmd
        is_rage_decrypt = (
            isinstance(cmd, (list, tuple))
            and len(cmd) > 1
            and "rage" in str(cmd[0])
            and cmd[1] == "--decrypt"
        )
        if not is_rage_decrypt:
            raise
        click.echo(
            f"""💥 {click.style('rage decryption failed:', bold=True, fg='red')}"""
        )
        # stderr is None when the failing process output was not captured.
        click.echo(_strip_rage_report(ex.stderr or b""))
        sys.exit(1)
@recovery_bundle_cli_group.command(name="rollover")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundles",
    metavar="[RECOVERY_BUNDLE]…",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    nargs=-1,
)
@click.pass_context
def rollover(
    ctx, recovery_bundles, decryption_key=None, identity=None, secret=None
) -> None:
    """Rollover recovery bundles to new shared secrets."""
    config = ctx.obj["config"]

    from .recovery_bundle import RecoveryBundle, SecretSharing, WrongDecryptionKey

    # decryption_key, identity and secret are not referenced in this body:
    # only ``ctx`` is handed to get_object_decryption_key_provider().
    key_provider = get_object_decryption_key_provider(ctx)
    new_sharing = SecretSharing.from_dict(
        config["recovery_bundles"]["secret_sharing"]
    )

    # Show who will hold the new shares and ask before touching any bundle.
    click.secho("New shared secret holders:")
    for holder in sorted(new_sharing.share_ids):
        styled_holder = click.style(holder, fg="magenta", bold=True)
        click.echo(f"- {styled_holder}")
    question = click.style(
        "Proceed with rolling over the shared secrets?",
        fg="yellow",
        bold=True,
    )
    click.confirm(question, abort=True)

    for bundle_path in recovery_bundles:
        bundle = RecoveryBundle(bundle_path, key_provider)
        # Ensure that we can decrypt at least some objects with the provided
        # key before rewriting anything.
        try:
            origins = list(bundle.origins())
            assert origins, "Oops! No Origin objects in this recovery bundle."
        except WrongDecryptionKey:
            click.secho(
                f"Wrong decryption key for this bundle ({bundle.removal_identifier})",
                err=True,
                fg="red",
                bold=True,
            )
            ctx.exit(2)
        bundle.rollover(new_sharing)
        # One green line, with only the identifier in bold.
        click.secho("Shared secrets for ", fg="green", nl=False)
        click.secho(bundle.removal_identifier, fg="green", bold=True, nl=False)
        click.secho(" have been rolled over.", fg="green")
@alter_cli_group.group(
    name="handle-removal-notification",
    context_settings=CONTEXT_SETTINGS,
)
@click.pass_context
def handle_removal_notification_cli_group(ctx):
    """Tools to handle removal notifications."""
    # Nothing to set up here: the group only namespaces its subcommands and
    # hands the click context back unchanged.
    return ctx
@handle_removal_notification_cli_group.command(name="remove")
@click.option(
    "--recovery-bundle",
    type=click.Path(dir_okay=False),
    required=True,
)
@click.option(
    "--allow-empty-content-objects/--disallow-empty-content-objects",
    "allow_empty_content_objects",
    default=False,
    help="Create recovery bundle even when data for Content object cannot be found",
)
@click.option(
    "--ignore-requested",
    "ignore_requested",
    metavar="ORIGIN_OR_SWHID",
    multiple=True,
    type=SwhidOrUrlParamType(),
    help="object that should be ignored from the list of "
    "requested objects to be removed",
)
@click.option(
    "--ignore",
    "ignore_backends",
    default=None,
    multiple=True,
    type=click.Choice(["search", "journal", "graph"], case_sensitive=False),
    help=(
        "Do not make the given backend mandatory when checking the configuration; "
        "this command is usually meant to remove objects from all possible data silos, "
        "so the default behavior is to have all of them mandatory in the configuration "
        "file. Using this option allows to explicitly ignore the given 'backend'"
    ),
)
@click.option(
    "--recompute/--no-recompute",
    default=False,
    help=(
        "If set, recompute locally the list of swhids to remove to apply "
        "the given removal request, otherwise, use the list of swhids "
        "sent in the notification message. Note that an access to swh-graph "
        "is required to recompute this list. Implies '--ignore graph' if not set."
    ),
)
@click.argument(
    "removal-identifier",
    required=True,
)
@click.pass_context
def handle_removal_notification_with_removal(
    ctx,
    recovery_bundle,
    removal_identifier,
    allow_empty_content_objects,
    ignore_requested,
    ignore_backends,
    recompute,
):
    """Handle removal notification by removing the request objects."""
    # Parameters (all filled in by click):
    #   recovery_bundle: path where the recovery bundle will be created;
    #       it must not exist yet
    #   removal_identifier: identifier of the notification to handle
    #   allow_empty_content_objects, ignore_requested, recompute: forwarded
    #       to Remover.handle_removal_notification_with_removal()
    #   ignore_backends: backends not required in the configuration
    # Every handled failure below ends the command with exit status 1;
    # any other exception propagates unchanged.
    from .inventory import RootsNotFound, StuckInventoryException
    from .operations import MaskingRequestNotFound, RemoverError
    from .recovery_bundle import ContentDataNotFound, SecretSharing

    # The graph backend is required exactly when the list of SWHIDs has to
    # be recomputed, whatever was given through ``--ignore``.
    ignore_backends = set(ignore_backends or [])
    if recompute:
        ignore_backends.discard("graph")
    else:
        ignore_backends.add("graph")

    remover = get_remover(
        ctx,
        require_masking_admin=True,
        ignore_backends=ignore_backends,
    )

    try:
        secret_sharing = SecretSharing.from_dict(
            ctx.obj["config"]["recovery_bundles"]["secret_sharing"]
        )
    except ValueError as e:
        raise click.ClickException(
            f"Wrong secret sharing configuration: {e.args[0]}"
        ) from e

    # Probe the target path before doing any work: touch(exist_ok=False)
    # fails if the file already exists or cannot be created, and the empty
    # probe file is removed right away.
    try:
        p = pathlib.Path(recovery_bundle)
        p.touch(exist_ok=False)
        p.unlink()
    except FileExistsError as e:
        raise click.ClickException(f"File “{recovery_bundle}” already exists") from e
    except PermissionError as e:
        raise click.ClickException(f"Permission denied: “{recovery_bundle}”") from e

    try:
        remover.handle_removal_notification_with_removal(
            notification_removal_identifier=removal_identifier,
            secret_sharing=secret_sharing,
            recovery_bundle_path=recovery_bundle,
            ignore_requested=ignore_requested or [],
            allow_empty_content_objects=allow_empty_content_objects,
            recompute_swhids_to_remove=recompute,
        )
    except MaskingRequestNotFound as e:
        click.secho(
            f"Masking request “{e.masking_request_slug}” has not been found.",
            err=True,
            fg="red",
        )
        click.secho(
            f"Please double-check that “{removal_identifier}” is the identifier of "
            "the notification. Otherwise, there probably was an error with the "
            "processing of the notification."
        )
        ctx.exit(1)
    except RootsNotFound:
        # handle_removal_notification_with_removal() has already
        # displayed the missing objects
        click.secho(
            "You might want to use “--ignore-requested=https://…” "
            "to specify which requested object should be ignored.",
            err=True,
            fg="yellow",
            # A real boolean: this used to be the string "True", which only
            # worked because any non-empty string is truthy.
            bold=True,
        )
        ctx.exit(1)
    except StuckInventoryException as e:
        click.secho(
            "Inventory phase got stuck. "
            "Unable to learn the complete set of what these objects reference:",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho("\n".join(f"- {swhid}" for swhid in e.swhids), err=True, fg="red")
        ctx.exit(1)
    except ContentDataNotFound as e:
        click.secho(
            f"Content “{e.swhid}” exists, but its data was not found.",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho(
            "Consider using `--allow-empty-content-objects` but only "
            "if the above is expected.",
            err=True,
            fg="yellow",
        )
        ctx.exit(1)
    except RemoverError:
        # Showing the exception and rolling back has been handled by
        # handle_removal_notification_with_removal()
        ctx.exit(1)
    # No catch-all handler: it would only re-raise what it caught.
@handle_removal_notification_cli_group.command(name="restrict-permanently")
@click.argument("removal-identifier", required=True)
@click.pass_context
def handle_removal_notification_with_permanent_restriction(ctx, removal_identifier):
    """Handle removal notification by permanently restricting access
    to objects removed from the main archive."""
    from swh.storage.proxies.masking.db import MaskedState

    from .operations import MaskingRequestNotFound

    # None of the graph, search and journal backends is required here.
    remover = get_remover(
        ctx,
        require_masking_admin=True,
        ignore_backends=["graph", "search", "journal"],
    )

    try:
        remover.handle_removal_notification_by_changing_masked_status(
            notification_removal_identifier=removal_identifier,
            masked_state=MaskedState.RESTRICTED,
        )
    except MaskingRequestNotFound as exc:
        not_found = f"Masking request “{exc.masking_request_slug}” has not been found."
        click.secho(not_found, err=True, fg="red")
        hint = (
            f"Please double-check that “{removal_identifier}” is the identifier of "
            "the notification. Otherwise, there probably was an error with the "
            "processing of the notification."
        )
        click.secho(hint)
        ctx.exit(1)

    # Reached only on success: ctx.exit() above leaves the command.
    outcome = (
        f"Removal notification “{removal_identifier}” handled by "
        "permanently restricting access to the objects removed from the main archive."
    )
    click.secho(outcome, fg="green", bold=True)
@handle_removal_notification_cli_group.command(name="dismiss")
@click.argument("removal-identifier", required=True)
@click.pass_context
def handle_removal_notification_with_dismissal(ctx, removal_identifier):
    """Handle removal notification by making the objects removed from the
    main archive visible again."""
    from swh.storage.proxies.masking.db import MaskedState

    from .operations import MaskingRequestNotFound

    # None of the graph, search and journal backends is required here.
    remover = get_remover(
        ctx,
        require_masking_admin=True,
        ignore_backends=["graph", "search", "journal"],
    )

    try:
        remover.handle_removal_notification_by_changing_masked_status(
            notification_removal_identifier=removal_identifier,
            masked_state=MaskedState.VISIBLE,
        )
    except MaskingRequestNotFound as exc:
        not_found = f"Masking request “{exc.masking_request_slug}” has not been found."
        click.secho(not_found, err=True, fg="red")
        hint = (
            f"Please double-check that “{removal_identifier}” is the identifier of "
            "the notification. Otherwise, there probably was an error with the "
            "processing of the notification."
        )
        click.secho(hint)
        ctx.exit(1)

    # Reached only on success: ctx.exit() above leaves the command.
    outcome = (
        f"Removal notification “{removal_identifier}” has been dismissed: "
        "objects removed from the main archive are now visible again."
    )
    click.secho(outcome, fg="green", bold=True)