# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import logging
import pathlib
import sys
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
Optional,
Set,
TextIO,
Tuple,
cast,
)
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
if TYPE_CHECKING:
from swh.model.model import Origin
from swh.model.swhids import ExtendedSWHID
from .operations import Remover
from .progressbar import ProgressBar, V
from .recovery_bundle import ObjectDecryptionKeyProvider, ShareDecryptionKeys
[docs]
class SWHIDType(click.ParamType):
    """Click parameter type turning a command-line value into an ``ExtendedSWHID``."""

    name = "swhid"

    def convert(self, value, param, ctx) -> "ExtendedSWHID":
        """Parse ``value`` with ``ExtendedSWHID.from_string()``.

        Raises:
            click.ClickException: when parsing raises ``ValidationError``.
        """
        # Imported at call time rather than when this module is loaded.
        from swh.model.swhids import ExtendedSWHID, ValidationError

        try:
            parsed = ExtendedSWHID.from_string(value)
        except ValidationError:
            raise click.ClickException(f"Unable to parse “{value}” as a SWHID.")
        return parsed
[docs]
class SwhidOrUrlParamType(click.ParamType):
    """Click parameter type accepting either an extended SWHID or an origin URL."""

    name = "swhid or origin URL"

    def convert(self, value, param, ctx):
        """Convert ``value`` to an ``ExtendedSWHID`` or an ``Origin``.

        Values starting with ``swh:1:`` are parsed as extended SWHIDs, and a
        parse failure is reported through ``self.fail()``. Any other value is
        used as the URL of a new ``Origin``.
        """
        from swh.model.exceptions import ValidationError
        from swh.model.model import Origin
        from swh.model.swhids import ExtendedSWHID

        # Anything that does not look like a SWHID is taken to be an origin URL.
        if not value.startswith("swh:1:"):
            return Origin(url=value)

        try:
            return ExtendedSWHID.from_string(value)
        except ValidationError:
            self.fail(f"expected extended SWHID, got {value!r}", param, ctx)
[docs]
class ClickLoggingHandler(logging.Handler):
    """Logging handler printing records through click.

    Records carrying a ``style`` attribute are displayed with
    ``click.secho()``, the attribute being expanded as keyword arguments.
    Other records are displayed with ``click.echo()``.
    """

    def emit(self, record):
        """Print the formatted ``record``, styled when it has a ``style``."""
        message = self.format(record)
        if not hasattr(record, "style"):
            click.echo(message)
            return
        click.secho(message, **record.style)
[docs]
def progressbar(
    iterable: Optional[Iterable[V]] = None,
    length: Optional[int] = None,
    label: Optional[str] = None,
    show_eta: bool = True,
    show_pos: bool = False,
    show_percent: Optional[bool] = None,
    item_show_func: Optional[Callable[[V], str]] = None,
) -> ProgressBar[V]:
    """Create a click progress bar that is drawn on standard error.

    Every argument is forwarded unchanged to ``click.progressbar()``.
    """
    # `cast()` reconciles the case where only `length` is given and
    # `click.progressbar()` returns a `ProgressBar[int]`: no iterable is
    # passed then, so V is unbound and can safely be taken to be int.
    return cast(
        "ProgressBar[V]",
        click.progressbar(
            iterable=iterable,
            length=length,
            label=label,
            show_eta=show_eta,
            show_pos=show_pos,
            show_percent=show_percent,
            item_show_func=item_show_func,
            file=sys.stderr,
        ),
    )
@swh_cli_group.group(name="alter", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def alter_cli_group(ctx):
    """Archive alteration tools.

    Location of the configuration should be specified through the environment
    variable ``SWH_CONFIG_FILENAME``.

    Expected config format:

    \b
    storage:
      cls: remote
      url: https://storage-cassandra-ro.softwareheritage.org

    \b
    graph:
      url: "http://granet.internal.softwareheritage.org:5009/graph"

    \b
    restoration_storage:
      cls: remote
      url: https://storage-rw.softwareheritage.org

    \b
    removal_searches:
      main:
        cls: elasticsearch
        hosts:
        - elasticsearch:9200

    \b
    removal_storages:
      old_primary:
        cls: postgresql
        db: "service=swh"
      new_primary:
        cls: cassandra
        hosts:
        - cassandra-seed
        keyspace: swh

    \b
    removal_objstorages:
      main:
        cls: remote
        url: https://objstorage.softwareheritage.org

    \b
    removal_journals:
      main_journal:
        cls: kafka
        brokers:
        - kafka1.internal.softwareheritage.org
        prefix: swh.journal.objects
        client_id: swh.alter.removals

    \b
    recovery_bundles:
      secret_sharing:
        minimum_required_groups: 2
        groups:
          legal:
            minimum_required_shares: 1
            recipient_keys:
              "YubiKey serial 4245067 slot 1": age1yubikey1q2e37f74zzazz75mtggzql3at66pegemfnul0dtd7axctahljkvsqezscaq
              "YubiKey serial 2284622 slot 3": age1yubikey4o1aypv83isatti92q1zasv1hkpuozlkoak4zd66t7poud23rftqrcszjgul
          sysadmins:
            minimum_required_shares: 1
            recipient_keys:
              "YubiKey serial 3862152 slot 1": age1yubikeyrupnxsu6uneqxw146g9szaofyxexiy4nhnzqg1ayb9b85g8h4oardwj6c212
              "Ruby": age1y6epp27nq8n4faj8g8hkw8thcvj744y5vnr8jyfmp4857d6npc3qn9k7jz

    The identifier for the recipient key must be in the form of
    “YubiKey serial ####### slot #” if the secret key is stored
    on a YubiKey. Keys specified by any other identifiers will be
    considered as plain age identities.
    """  # noqa: B950
    from swh.core import config

    from .operations import logger as operations_logger
    from .recovery_bundle import logger as recovery_bundle_logger

    try:
        loaded_config = config.load_from_envvar()
    except AssertionError as ex:
        # Report the assertion message as a regular CLI error.
        raise click.ClickException(ex.args[0])

    ctx.ensure_object(dict)
    ctx.obj["config"] = loaded_config

    for module_logger in (operations_logger, recovery_bundle_logger):
        # A logger whose propagation is already disabled has been configured
        # by a previous invocation: do not attach a second handler to it.
        if module_logger.propagate:
            module_logger.propagate = False
            module_logger.addHandler(ClickLoggingHandler())

    return ctx
[docs]
def read_swhids(file: TextIO) -> Set["ExtendedSWHID"]:
    """Read a set of extended SWHIDs from a text file, one per line.

    Whitespace around each line is ignored. Empty or whitespace-only lines
    and lines whose first non-blank character is ``#`` are skipped.

    Args:
        file: text stream to read; it is consumed entirely.

    Returns:
        The set of SWHIDs parsed with ``ExtendedSWHID.from_string()``.
        Errors raised by that call for malformed lines are not caught.
    """
    from swh.model.swhids import ExtendedSWHID

    # Strip *before* filtering: otherwise whitespace-only lines, blank lines
    # ending in "\r" and indented comments would be passed to the parser.
    stripped_lines = (line.strip() for line in file.read().split("\n"))
    return {
        ExtendedSWHID.from_string(line)
        for line in stripped_lines
        if line and not line.startswith("#")
    }
[docs]
def get_remover(ctx: click.Context, dry_run: bool = False) -> "Remover":
    """Build a ``Remover`` from the configuration stored in the click context.

    Args:
        ctx: click context. ``ctx.obj["config"]`` must hold the loaded
            configuration; ``ctx.params`` may provide ``known_missing_swhids``
            and ``known_missing_file``.
        dry_run: when truthy, ``restoration_storage`` and the ``removal_*``
            sections are optional. Only its truthiness is used, so callers
            may pass the name of a dry-run mode.

    Returns:
        A ``Remover`` wired to the configured storages, searches, object
        storages and journals.

    Raises:
        click.ClickException: when the graph server cannot be reached, a
            required section is missing or empty, a search is unreachable,
            a removal storage has no ``object_delete`` attribute, or a
            journal writer is not a ``KafkaJournalWriter``.
    """
    from swh.core.api import RemoteException
    from swh.graph.http_client import GraphAPIError, RemoteGraphClient
    from swh.journal.writer import get_journal_writer
    from swh.journal.writer.kafka import KafkaJournalWriter
    from swh.objstorage.factory import get_objstorage
    from swh.objstorage.interface import ObjStorageInterface
    from swh.search import get_search
    from swh.search.interface import SearchInterface
    from swh.storage import get_storage
    from swh.storage.interface import ObjectDeletionInterface

    from .operations import Remover

    conf = ctx.obj["config"]

    try:
        graph_client = RemoteGraphClient(**conf["graph"])
    except GraphAPIError as e:
        raise click.ClickException(f"Unable to connect to the graph server: {e.args}")

    storage = get_storage(**conf["storage"])

    if not dry_run:
        # A real removal needs every target to be configured.
        if "restoration_storage" not in conf:
            raise click.ClickException(
                "Configuration does not define `restoration_storage`"
            )
        for section in (
            "removal_searches",
            "removal_storages",
            "removal_objstorages",
            "removal_journals",
        ):
            # Missing, empty and null sections are all rejected the same way.
            if not conf.get(section):
                raise click.ClickException(
                    f"Configuration does not define any `{section}`"
                )

    restoration_storage = (
        get_storage(**conf["restoration_storage"])
        if "restoration_storage" in conf
        else None
    )

    # `or {}` tolerates sections that are present but null during dry runs.
    removal_searches = {}
    for name, d in (conf.get("removal_searches") or {}).items():
        removal_searches[name] = get_search(**d)
        try:
            removal_searches[name].check()
        except RemoteException as e:
            raise click.ClickException(f"Search “{name}” is unreachable: {e}")

    removal_storages = {}
    for name, d in (conf.get("removal_storages") or {}).items():
        removal_storage = get_storage(**d)
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if not hasattr(removal_storage, "object_delete"):
            raise click.ClickException(
                f"storage “{name}” does not implement ObjectDeletionInterface"
            )
        removal_storages[name] = removal_storage

    removal_objstorages = {}
    for name, d in (conf.get("removal_objstorages") or {}).items():
        removal_objstorages[name] = get_objstorage(**d)

    removal_journals = {}
    for name, d in (conf.get("removal_journals") or {}).items():
        journal_writer = get_journal_writer(**d)
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if not isinstance(journal_writer, KafkaJournalWriter):
            raise click.ClickException("journal writer is not kafka-based")
        removal_journals[name] = journal_writer

    # Objects known to be missing can come from repeated options and a file.
    known_missing = set(ctx.params.get("known_missing_swhids", set()))
    if known_missing_file := ctx.params.get("known_missing_file"):
        known_missing.update(read_swhids(known_missing_file))

    return Remover(
        storage=storage,
        graph_client=graph_client,
        restoration_storage=restoration_storage,
        removal_searches=cast(Dict[str, SearchInterface], removal_searches),
        removal_storages=cast(Dict[str, ObjectDeletionInterface], removal_storages),
        removal_objstorages=cast(Dict[str, ObjStorageInterface], removal_objstorages),
        removal_journals=cast(Dict[str, KafkaJournalWriter], removal_journals),
        known_missing=known_missing,
        progressbar=progressbar,
    )
@alter_cli_group.command()
@click.option(
    "--dry-run",
    type=click.Choice(
        ["stop-before-recovery-bundle", "stop-before-removal"], case_sensitive=False
    ),
    help="perform a trial run",
)
@click.option(
    "--output-inventory-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--output-removable-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--output-pruned-removable-subgraph",
    type=click.File(mode="w", atomic=True),
)
@click.option(
    "--identifier",
    metavar="IDENTIFIER",
    required=True,
    help="identifier for this removal operation",
)
@click.option(
    "--reason",
    metavar="REASON",
    help="reason for this removal operation",
)
@click.option(
    "--expire",
    metavar="YYYY-MM-DD",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    help="date when the recovery bundle should be removed",
)
@click.option(
    "--recovery-bundle",
    metavar="PATH",
    type=click.Path(dir_okay=False),
    required=True,
    help="path to the recovery bundle that will be created",
)
@click.option(
    "--known-missing",
    "known_missing_swhids",
    metavar="SWHID",
    type=SWHIDType(),
    multiple=True,
    help="object known to be missing from storage",
)
@click.option(
    "--known-missing-file",
    "known_missing_file",
    metavar="PATH",
    type=click.File(),
    help=(
        "file (or '-') with object known to be missing from storage, "
        "one SWHID per line"
    ),
)
@click.option(
    "--allow-empty-content-objects/--disallow-empty-content-objects",
    "allow_empty_content_objects",
    default=False,
    help="Create recovery bundle even when data for Content object cannot be found",
)
@click.argument(
    "requested",
    metavar="<SWHID|URL>..",
    type=SwhidOrUrlParamType(),
    required=True,
    nargs=-1,
)
@click.pass_context
def remove(
    ctx,
    requested: Tuple["Origin" | "ExtendedSWHID", ...],
    dry_run: Optional[str],
    output_inventory_subgraph,
    output_removable_subgraph,
    output_pruned_removable_subgraph,
    identifier,
    reason,
    expire,
    recovery_bundle,
    known_missing_swhids,
    known_missing_file,
    allow_empty_content_objects,
) -> None:
    """Remove the given SWHIDs or URLs from the archive."""
    # Overall flow: validate the secret sharing configuration, check that the
    # recovery bundle path is usable, compute and print the removal plan,
    # create the recovery bundle, then remove. `dry_run` is either None or
    # one of the two `--dry-run` choices and decides where the flow stops.
    # `known_missing_swhids` and `known_missing_file` are not used directly
    # here: get_remover() reads them from `ctx.params`.
    from swh.model.model import Origin

    from .inventory import OriginNotFound, StuckInventoryException
    from .operations import RemoverError
    from .recovery_bundle import ContentDataNotFound, SecretSharing

    try:
        secret_sharing = SecretSharing.from_dict(
            ctx.obj["config"]["recovery_bundles"]["secret_sharing"]
        )
    except ValueError as e:
        raise click.ClickException(f"Wrong secret sharing configuration: {e.args[0]}")

    if dry_run != "stop-before-recovery-bundle":
        # Probe the bundle path before doing any expensive work: touch() with
        # exist_ok=False fails when the file already exists, and the probe
        # file is removed right away.
        try:
            p = pathlib.Path(recovery_bundle)
            p.touch(exist_ok=False)
            p.unlink()
        except FileExistsError:
            raise click.ClickException(f"File “{recovery_bundle}” already exists")
        except PermissionError:
            raise click.ClickException(f"Permission denied: “{recovery_bundle}”")

    # Any non-None dry-run mode is truthy, which relaxes the configuration
    # checks performed by get_remover().
    remover = get_remover(ctx, dry_run)

    # Origins are handled through their SWHID; SWHIDs are used as given.
    swhids = [x.swhid() if isinstance(x, Origin) else x for x in requested]
    try:
        removable = remover.get_removable(
            swhids,
            output_inventory_subgraph=output_inventory_subgraph,
            output_removable_subgraph=output_removable_subgraph,
            output_pruned_removable_subgraph=output_pruned_removable_subgraph,
        )
        removable.print_plan()
        if dry_run == "stop-before-recovery-bundle":
            click.echo("Stopping before creating the recovery bundle as requested.")
            ctx.exit(0)
        # Only a real run (no `--dry-run`) asks for confirmation; declining
        # aborts the command.
        if dry_run is None:
            click.confirm(
                click.style(
                    "Proceed?",
                    fg="yellow",
                    bold=True,
                ),
                abort=True,
            )
        decryption_key = remover.create_recovery_bundle(
            secret_sharing=secret_sharing,
            requested=list(requested),
            removable=removable,
            recovery_bundle_path=recovery_bundle,
            removal_identifier=identifier,
            reason=reason,
            # `--expire` is parsed as a naive datetime; astimezone() makes it
            # timezone-aware using the local timezone.
            expire=expire.astimezone() if expire else None,
            allow_empty_content_objects=allow_empty_content_objects,
        )
        click.secho(f"Recovery bundle decryption key: {decryption_key}", fg="blue")
    except RemoverError as e:
        click.secho(e.args[0], err=True, fg="red")
        ctx.exit(1)
    except OriginNotFound as e:
        click.secho(
            f"Origin “{e.get_label(requested)}” not found.",
            err=True,
            fg="red",
            bold=True,
        )
        ctx.exit(1)
    except StuckInventoryException as e:
        click.secho(
            "Inventory phase got stuck. "
            "Unable to learn the complete set of what these objects reference:\n",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho("\n".join(f"- {swhid}" for swhid in e.swhids), err=True, fg="red")
        ctx.exit(1)
    except ContentDataNotFound as e:
        click.secho(
            f"Content “{e.swhid}” exists, but its data was not found.",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho(
            "Consider using `--allow-empty-content-objects` but only "
            "if the above is expected.",
            err=True,
            fg="yellow",
        )
        ctx.exit(1)

    if dry_run == "stop-before-removal":
        click.echo("Stopping before removal.")
        ctx.exit(0)

    try:
        remover.remove()
    except Exception as e:
        # Whatever went wrong during removal, report it and roll back using
        # the recovery bundle created above.
        click.secho(str(e), err=True, fg="red", bold=True)
        click.secho("Rolling back…", fg="cyan")
        remover.restore_recovery_bundle()
        ctx.exit(1)
@alter_cli_group.command("list-candidates")
@click.option(
    "--omit-referenced/--no-omit-referenced",
    default=True,
    help="Omit candidates that are referenced by other objects",
)
@click.argument(
    "requested",
    metavar="<SWHID|URL>..",
    type=SwhidOrUrlParamType(),
    required=True,
    nargs=-1,
)
@click.pass_context
def list_candidates(
    ctx: click.Context,
    requested: Tuple["Origin" | "ExtendedSWHID", ...],
    omit_referenced: bool,
):
    """List candidates for an altering operation (e.g. removal)

    Display a list of SWHIDs of objects that would be affected by an altering
    operation targeting the SWHIDs (or origin URLs) given as arguments.

    Candidates referenced by objects in the graph outside the set of candidates
    will be filtered out, unless `--no-omit-referenced` is given.
    """
    from swh.graph.http_client import GraphAPIError, RemoteGraphClient
    from swh.model.model import Origin
    from swh.storage import get_storage

    from .inventory import (
        OriginNotFound,
        StuckInventoryException,
        get_raw_extrinsic_metadata,
        make_inventory,
    )
    from .removable import mark_removable

    config = ctx.obj["config"]

    try:
        graph_client = RemoteGraphClient(**config["graph"])
    except GraphAPIError as e:
        raise click.ClickException(f"Unable to connect to the graph server: {e.args}")

    storage = get_storage(**config["storage"])

    # Origins are looked up through their SWHID; SWHIDs are used as given.
    targets = [
        item.swhid() if isinstance(item, Origin) else item for item in requested
    ]

    try:
        subgraph = make_inventory(
            storage, graph_client, targets, progressbar=progressbar
        )
    except StuckInventoryException as e:
        stuck_listing = "\n".join(f"- {swhid}" for swhid in e.swhids)
        click.secho(
            "Inventory phase got stuck. "
            "Unable to learn the complete set of what these objects reference:\n",
            err=True,
            fg="red",
            bold=True,
        )
        click.secho(stuck_listing, err=True, fg="red")
        ctx.exit(1)
    except OriginNotFound as e:
        click.secho(
            f"Origin “{e.get_label(requested)}” not found.",
            err=True,
            fg="red",
            bold=True,
        )
        ctx.exit(1)

    if omit_referenced:
        # Keep only the objects that were marked as removable.
        subgraph = mark_removable(
            storage, graph_client, subgraph, progressbar=progressbar
        )
        subgraph.delete_unremovable()

    # The same list object is both the argument of the metadata lookup and
    # the list being extended with its results.
    candidates = list(subgraph.swhids())
    candidates.extend(
        get_raw_extrinsic_metadata(storage, candidates, progressbar=progressbar)
    )

    for candidate in candidates:
        click.echo(candidate)
@alter_cli_group.group(name="recovery-bundle", context_settings=CONTEXT_SETTINGS)
@click.pass_context
def recovery_bundle_cli_group(ctx):
    """Recovery bundle related tools."""
    # This group performs no setup of its own: it only hands back the context
    # and serves as the parent of the `recovery-bundle` subcommands.
    return ctx
@recovery_bundle_cli_group.command(name="info")
@click.option(
    "--dump-manifest",
    is_flag=True,
    default=False,
    help="Show raw manifest in YAML format.",
)
@click.option(
    "--show-encrypted-secrets",
    is_flag=True,
    default=False,
    help="Show encrypted secrets.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def info(ctx, recovery_bundle, dump_manifest, show_encrypted_secrets) -> None:
    """Display the manifest of the given recovery bundle."""
    from swh.model.model import Origin

    from .recovery_bundle import RecoveryBundle

    bundle = RecoveryBundle(recovery_bundle)

    # With `--dump-manifest`, print the raw manifest and nothing else.
    if dump_manifest:
        click.echo(bundle.dump_manifest(), nl=False)
        ctx.exit()

    # Title underlined with as many "=" as it has characters.
    heading = f"Recovery bundle “{bundle.removal_identifier}”"
    click.echo(heading)
    click.echo("=" * len(heading))
    click.echo("")

    click.echo(f"Created: {bundle.created.isoformat()}")
    if bundle.reason:
        # The label goes on the first line of the reason; the following
        # lines are joined with the same separator as the first.
        first_line, *other_lines = bundle.reason.rstrip().split("\n")
        click.echo("\n ".join([f"Reason: {first_line}", *other_lines]))
    if bundle.expire:
        click.echo(f"Expire: {bundle.expire}")

    # The requested items are only listed for bundle versions 3 and above.
    if bundle.version >= 3:
        click.echo("Removal requested for:")
        for item in bundle.requested:
            label = item.url if isinstance(item, Origin) else item
            click.echo(f"- {label}")

    click.echo("SWHID of the objects present in the bundle:")
    for swhid in bundle.swhids:
        click.echo(f"- {swhid}")

    if bundle.version >= 3 and len(bundle.referencing) > 0:
        click.echo("SWHID referenced by objects in this bundle:")
        for swhid in bundle.referencing:
            click.echo(f"- {swhid}")

    click.echo("Secret share holders:")
    for holder in sorted(bundle.share_ids):
        click.echo(f"- {holder}")
        if show_encrypted_secrets:
            click.echo(bundle.encrypted_secret(holder))
def _share_decryption_keys_provider(share_ids: Set[str]) -> ShareDecryptionKeys:
    # Generator yielding `(share_id, secret_key)` pairs for the connected
    # YubiKeys whose identity is listed in `share_ids`, prompting the user to
    # insert the missing ones between attempts.
    #
    # `share_ids` is modified in place: every yielded share id is removed
    # from it. If the generator is consumed until it runs out of options, it
    # prints an error and exits the process with status 1.
    import subprocess
    import sys

    from .recovery_bundle import list_yubikey_identities

    # At most 9 rounds of "scan the connected keys, then ask for more".
    for attempt in range(1, 10):
        if not any(share_id.startswith("YubiKey") for share_id in share_ids):
            # No shares require a YubiKey, so there is nothing we can do here
            break
        try:
            for share_id, secret_key in list_yubikey_identities():
                if share_id not in share_ids:
                    continue
                # Remove the id before yielding so that the same key is not
                # offered again in a later round.
                share_ids.remove(share_id)
                click.echo(
                    "🔧 Decrypting share using "
                    f"{click.style(share_id, fg='magenta', bold=True)}…"
                )
                click.echo("💭 You might need to tap the right YubiKey when it blinks.")
                yield share_id, secret_key
                click.echo()
        except subprocess.CalledProcessError as ex:
            # Only failures of `age-plugin-yubikey` are handled here.
            if "age-plugin-yubikey" not in ex.cmd[0]:
                raise
            click.echo(
                f"""💥 {click.style('age-plugin-yubikey failed to '
                                  'list connected YubiKeys.', bold=True, fg='red')}"""
            )
            click.echo("💭 Please disconnect all YubiKeys and retry.")
            sys.exit(1)
        if share_ids:
            # Some shares are still missing: ask for any of the remaining
            # keys, formatted as "A, B or C".
            yubikey_ids = list(sorted(share_ids))
            if len(yubikey_ids) > 1:
                yubikeys = ", ".join(
                    click.style(share_id, fg="magenta", bold=True)
                    for share_id in yubikey_ids[:-1]
                )
                yubikeys += " or " + click.style(
                    yubikey_ids[-1], fg="magenta", bold=True
                )
            else:
                yubikeys = click.style(yubikey_ids[0], fg="magenta", bold=True)
            # The prompt only waits for Enter: the typed value is discarded.
            click.prompt(
                f"🔐 Please insert {yubikeys} and press "
                f"{click.style('Enter', fg='green', bold=True)}…",
                default="Ok",
                show_default=False,
                hide_input=True,
                prompt_suffix="",
            )
    # Reached when the loop ends while the consumer is still asking for keys.
    click.echo(
        f"""💥 {click.style('Unable to decrypt enough shared secrets to recover '
                          'the object decryption key. Aborting.',
                          bold=True, fg='red')}"""
    )
    sys.exit(1)
def _print_decrypted_mnemonic(mnemonic: str, share_id: Optional[str] = None) -> None:
    """Print a recovered shared secret, naming its share when ``share_id`` is given.

    The first three words of the mnemonic are printed in bold.
    """
    source = (
        f" from {click.style(share_id, fg='magenta', bold=True)}" if share_id else ""
    )
    click.echo(f"🔑 Recovered shared secret{source}:")
    # Quoting from SLIP-0039: This construction yields a beneficial
    # property where the random identifier and the iteration exponent
    # transform into the first two words of the mnemonic code, so the user
    # can immediately tell whether the correct shares are being combined,
    # i.e. they have to have the same first two words. Moreover, the third
    # word encodes the group index, group threshold and part of the group
    # count. Since the group threshold and group count are constant, all
    # **shares belonging to the same group start with the same three words**.
    styled_words = [
        click.style(word, fg="blue", bold=position < 3)
        for position, word in enumerate(mnemonic.split())
    ]
    click.echo(" ".join(styled_words))
def _recover_mnemonics_from_identity_files(
    manifest, share_ids, identity_files, show_decrypted_mnemonics
):
    """Decrypt as many shares as possible using the given age identity files.

    Returns a dict mapping each share id that could be decrypted to its
    mnemonic. Mnemonics are printed as they are recovered when
    ``show_decrypted_mnemonics`` is true.
    """
    from .recovery_bundle import WrongDecryptionKey, age_decrypt_from_identity

    mnemonics = {}
    # Nothing tells which identity file matches which encrypted share, so
    # every combination is tried and the ones that decrypt are kept.
    for identity_path in identity_files:
        for share_id in share_ids:
            try:
                decrypted = age_decrypt_from_identity(
                    identity_path, manifest.decryption_key_shares[share_id]
                )
                mnemonics[share_id] = decrypted.decode("us-ascii")
                if show_decrypted_mnemonics:
                    _print_decrypted_mnemonic(mnemonics[share_id], share_id)
            except WrongDecryptionKey:
                # This identity does not open this share: try the next one.
                continue
    return mnemonics
[docs]
def prompting_object_decryption_key_provider(
    manifest, known_mnemonics=None, identity_files=None, show_decrypted_mnemonics=False
) -> str:
    """Recover the object decryption key of a bundle from its manifest.

    Shares are obtained from ``known_mnemonics``, from the age identity
    files in ``identity_files``, and from YubiKeys the user is prompted
    to insert. A warning lists remaining shares that are not on a YubiKey.
    """
    import functools

    from .recovery_bundle import recover_object_decryption_key_from_encrypted_shares

    mnemonic_processor = (
        _print_decrypted_mnemonic if show_decrypted_mnemonics else None
    )

    pending_ids = set(manifest.decryption_key_shares.keys())
    # Work on a fresh list so that the caller's sequence is never modified.
    mnemonics = list(known_mnemonics or [])

    if identity_files:
        recovered = _recover_mnemonics_from_identity_files(
            manifest, pending_ids, identity_files, show_decrypted_mnemonics
        )
        pending_ids.difference_update(recovered.keys())
        mnemonics.extend(recovered.values())

    yubikey_share_ids = {
        share_id for share_id in pending_ids if share_id.startswith("YubiKey")
    }
    undecryptable_ids = pending_ids - yubikey_share_ids
    if undecryptable_ids:
        fmt_ids = ", ".join(
            click.style(share_id, fg="magenta", bold=True)
            for share_id in undecryptable_ids
        )
        warning = click.style(
            "The following secret shares will not be decrypted:", fg="yellow"
        )
        click.echo(f"\n🚸 {warning} {fmt_ids}\n")

    return recover_object_decryption_key_from_encrypted_shares(
        manifest.decryption_key_shares,
        functools.partial(_share_decryption_keys_provider, yubikey_share_ids),
        decrypted_mnemonic_processor=mnemonic_processor,
        known_mnemonics=mnemonics,
    )
[docs]
def get_object_decryption_key_provider(ctx) -> ObjectDecryptionKeyProvider:
    """Select an object decryption key provider from the command parameters.

    When the ``decryption_key`` parameter is set, the returned provider
    always returns that key. Otherwise the provider is
    ``prompting_object_decryption_key_provider`` pre-filled with the
    ``secret`` and ``identity`` parameters.
    """
    import functools

    decryption_key = ctx.params.get("decryption_key")
    if not decryption_key:
        return functools.partial(
            prompting_object_decryption_key_provider,
            known_mnemonics=ctx.params.get("secret"),
            identity_files=ctx.params.get("identity"),
        )

    # Case-insensitive sanity check of the key prefix.
    if not decryption_key.lower().startswith("age-secret-key-"):
        ctx.fail(
            "The given decryption key does not look like a decryption key. "
            "It should start with “AGE-SECRET-KEY-”"
        )

    def known_key_provider(_):
        return decryption_key

    return known_key_provider
[docs]
class ContentSWHID(click.ParamType):
    """Click parameter type accepting only the SWHID of a Content object."""

    name = "swhid of a content object"

    def convert(self, value, param, ctx):
        """Parse ``value`` as an extended SWHID of type ``CONTENT``.

        Both an unparsable value and a SWHID of another object type are
        reported through ``self.fail()``.
        """
        from swh.model.swhids import ExtendedObjectType, ExtendedSWHID, ValidationError

        try:
            parsed = ExtendedSWHID.from_string(value)
        except ValidationError:
            self.fail(f"expected SWHID, got {value!r}", param, ctx)

        if parsed.object_type == ExtendedObjectType.CONTENT:
            return parsed
        self.fail("We can only extract data for Content objects", param, ctx)
@recovery_bundle_cli_group.command(name="extract-content")
@click.option(
    "-o",
    "--output",
    type=click.File("wb"),
    metavar="FILE",
    required=True,
    help="write data to FILE",
)
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.argument(
    "SWHID",
    type=ContentSWHID(),
    required=True,
)
@click.pass_context
def extract_content(
    ctx,
    output,
    recovery_bundle,
    swhid,
    decryption_key=None,
    identity=None,
    secret=None,
) -> None:
    """Extract data from content stored in a recovery bundle."""
    # `decryption_key`, `identity` and `secret` are not used directly here:
    # get_object_decryption_key_provider() reads them from `ctx.params`.
    from .recovery_bundle import RecoveryBundle, WrongDecryptionKey

    bundle = RecoveryBundle(recovery_bundle, get_object_decryption_key_provider(ctx))

    if swhid not in bundle.swhids:
        not_found = f"“{swhid}” is not in the recovery bundle"
        click.secho(not_found, err=True, fg="red", bold=True)
        ctx.exit(1)

    try:
        bundle.write_content_data(swhid, output)
    except WrongDecryptionKey:
        # Exit status 2 is used for a wrong key, 1 for a missing object.
        click.secho(
            f"Wrong decryption key for this bundle ({bundle.removal_identifier})",
            err=True,
            fg="red",
            bold=True,
        )
        ctx.exit(2)
@recovery_bundle_cli_group.command(name="restore")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def restore(
    ctx, recovery_bundle, decryption_key=None, identity=None, secret=None
) -> None:
    """Restore a recovery bundle to Software Heritage archive."""
    # `decryption_key`, `identity` and `secret` are not used directly here:
    # get_object_decryption_key_provider() reads them from `ctx.params`.
    from swh.storage import get_storage

    from .recovery_bundle import (
        RecoveryBundle,
        UnsupportedFeatureException,
        WrongDecryptionKey,
    )

    conf = ctx.obj["config"]
    # Report a missing section as a CLI error, like get_remover() does,
    # instead of letting a bare KeyError escape as a traceback.
    if "restoration_storage" not in conf:
        raise click.ClickException(
            "Configuration does not define `restoration_storage`"
        )
    restoration_storage = get_storage(**conf["restoration_storage"])

    secret_key_provider = get_object_decryption_key_provider(ctx)
    bundle = RecoveryBundle(recovery_bundle, secret_key_provider)

    try:
        missing = bundle.get_missing_referenced_objects(restoration_storage)
        if len(missing) > 0:
            # Restoring would create dangling references: list them and let
            # the operator decide. Declining aborts the command.
            click.secho(
                "Objects to be restored are referencing objects that "
                "are missing from storage:",
                fg="yellow",
                bold=True,
            )
            for swhid in missing:
                click.secho(f"- {swhid}", fg="yellow")
            click.confirm(
                click.style(
                    "Proceed with restoration though it will create "
                    "references to missing objects?",
                    fg="yellow",
                    bold=True,
                ),
                abort=True,
            )
    except UnsupportedFeatureException:
        # The check is skipped for bundles that do not support it.
        click.secho(
            "Skipping checks for missing referenced objects: "
            f"recovery bundle “{recovery_bundle}” is too old.",
            fg="yellow",
            bold=True,
        )

    try:
        bundle.restore(restoration_storage, progressbar)
    except WrongDecryptionKey:
        click.echo(
            f"Wrong decryption key for this bundle ({bundle.removal_identifier})"
        )
        ctx.exit(2)
@recovery_bundle_cli_group.command(name="resume-removal")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    prompt=True,
    help="use the given decryption key instead of the bundle shared secrets",
    envvar="SWH_BUNDLE_DECRYPTION_KEY",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
@click.pass_context
def resume_removal(
    ctx,
    recovery_bundle,
    decryption_key=None,
) -> None:
    """Resume a removal operation from a recovery bundle."""
    from .recovery_bundle import WrongDecryptionKey

    remover = get_remover(ctx)

    # First load the objects to remove from the existing bundle.
    try:
        remover.register_objects_from_bundle(
            recovery_bundle_path=recovery_bundle,
            object_secret_key=decryption_key,
        )
    except WrongDecryptionKey:
        click.echo("Wrong decryption key for this bundle")
        ctx.exit(2)

    # Then perform the removal, restoring the bundle on any failure.
    try:
        remover.remove()
    except Exception as exc:
        click.secho(str(exc), err=True, fg="red", bold=True)
        remover.restore_recovery_bundle()
        ctx.exit(1)
def _strip_rage_report(output):
# rage prompts for report when it errors like this:
# [ Did rage not do what you expected? Could an error be more useful? ]
# [ Tell us: https://str4d.xyz/rage/report ]
# This can be confusing in our case so strip them from the output.
return b"\n".join(
line
for line in output.split(b"\n")
if not line.startswith(b"[") and not line.endswith(b"]")
)
@recovery_bundle_cli_group.command(name="recover-decryption-key")
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.option(
    "--show-recovered-secrets",
    is_flag=True,
    default=False,
    help="Show recovered shared secrets. Useful for remote/distributed recoveries.",
)
@click.argument(
    "recovery-bundle",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    required=True,
)
def recover_decryption_key(
    recovery_bundle, secret, identity, show_recovered_secrets
) -> None:
    """Recover the decryption key using shared secrets."""
    import subprocess
    import sys

    from .recovery_bundle import RecoveryBundle

    def object_decryption_key_provider(*args, **kwargs):
        # Inject the command-line options into the prompting provider.
        kwargs["known_mnemonics"] = list(secret)
        kwargs["identity_files"] = list(identity)
        kwargs["show_decrypted_mnemonics"] = show_recovered_secrets
        return prompting_object_decryption_key_provider(*args, **kwargs)

    try:
        bundle = RecoveryBundle(recovery_bundle, object_decryption_key_provider)
        decryption_key = bundle.object_decryption_key
        click.echo(
            f"\n🔓 Recovered decryption key:\n{click.style(decryption_key, bold=True)}"
        )
    except subprocess.CalledProcessError as ex:
        # Only failures of `rage --decrypt` are reported here. Anything else
        # is re-raised: the command must be rage AND its first argument must
        # be `--decrypt` for the friendly message to be accurate.
        is_rage_decrypt = (
            len(ex.cmd) > 1 and "rage" in ex.cmd[0] and ex.cmd[1] == "--decrypt"
        )
        if not is_rage_decrypt:
            raise
        click.echo(
            f"""💥 {click.style('rage decryption failed:', bold=True, fg='red')}"""
        )
        # `stderr` is None when the failed process output was not captured.
        click.echo(_strip_rage_report(ex.stderr or b""))
        sys.exit(1)
@recovery_bundle_cli_group.command(name="rollover")
@click.option(
    "--decryption-key",
    metavar="AGE_SECRET_KEY",
    help="use the given decryption key instead of the bundle shared secrets",
)
@click.option(
    "-s",
    "--secret",
    metavar="MNEMONIC",
    multiple=True,
    help="Known shared secret. May be repeated.",
)
@click.option(
    "-i",
    "--identity",
    metavar="IDENTITY",
    type=click.Path(exists=True, readable=True, dir_okay=False),
    multiple=True,
    help="Path to file with age identities. May be repeated.",
)
@click.argument(
    "recovery-bundles",
    metavar="[RECOVERY_BUNDLE]…",
    type=click.Path(exists=True, dir_okay=False, readable=True),
    nargs=-1,
)
@click.pass_context
def rollover(
    ctx, recovery_bundles, decryption_key=None, identity=None, secret=None
) -> None:
    """Rollover recovery bundles to new shared secrets."""
    # `decryption_key`, `identity` and `secret` are not used directly here:
    # get_object_decryption_key_provider() reads them from `ctx.params`.
    conf = ctx.obj["config"]

    from .recovery_bundle import RecoveryBundle, SecretSharing, WrongDecryptionKey

    secret_key_provider = get_object_decryption_key_provider(ctx)
    # Report an invalid secret sharing configuration the same way the
    # `remove` command does, rather than with a traceback.
    try:
        secret_sharing = SecretSharing.from_dict(
            conf["recovery_bundles"]["secret_sharing"]
        )
    except ValueError as e:
        raise click.ClickException(f"Wrong secret sharing configuration: {e.args[0]}")

    # Show who will hold the new shares and ask for confirmation; declining
    # aborts the command before any bundle is touched.
    click.secho("New shared secret holders:")
    for share_id in sorted(secret_sharing.share_ids):
        click.echo(f"- {click.style(share_id, fg='magenta', bold=True)}")
    click.confirm(
        click.style(
            "Proceed with rolling over the shared secrets?",
            fg="yellow",
            bold=True,
        ),
        abort=True,
    )

    for recovery_bundle in recovery_bundles:
        bundle = RecoveryBundle(recovery_bundle, secret_key_provider)
        # Ensure that we can decrypt at least some objects with the provided key
        try:
            origin = list(bundle.origins())
            assert len(origin) > 0, "Oops! No Origin objects in this recovery bundle."
        except WrongDecryptionKey:
            click.secho(
                f"Wrong decryption key for this bundle ({bundle.removal_identifier})",
                err=True,
                fg="red",
                bold=True,
            )
            ctx.exit(2)
        bundle.rollover(secret_sharing)
        click.secho("Shared secrets for ", fg="green", nl=False)
        click.secho(bundle.removal_identifier, fg="green", bold=True, nl=False)
        click.secho(" have been rolled over.", fg="green")