Source code for swh.alter.cli

# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set, cast

import click

from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group

if TYPE_CHECKING:
    from swh.model.swhids import ExtendedSWHID

    from .recovery_bundle import ObjectDecryptionKeyProvider, ShareDecryptionKeys


[docs] class SwhidOrUrlParamType(click.ParamType): name = "swhid or origin URL"
[docs] def convert(self, value, param, ctx): import hashlib from swh.model.exceptions import ValidationError from swh.model.swhids import ExtendedSWHID if value.startswith("swh:1:"): try: return ExtendedSWHID.from_string(value) except ValidationError: self.fail(f"expected extended SWHID, got {value!r}", param, ctx) else: click.secho(f"Assuming {value} is an origin URL.", fg="cyan") sha1 = hashlib.sha1(value.encode("utf-8")).hexdigest() swhid = ExtendedSWHID.from_string(f"swh:1:ori:{sha1}") return swhid
[docs] class ClickLoggingHandler(logging.Handler): """Handler displaying logs using click.secho(), passing the style extra attribute."""
[docs] def emit(self, record): if hasattr(record, "style"): click.secho(self.format(record), **record.style) else: click.echo(self.format(record))
DEFAULT_CONFIG = { "storage": { "cls": "postgresql", "db": "dbname=softwareheritage host=db.internal.softwareheritage.org user=guest", "objstorage": { "cls": "memory", }, }, "graph": { "url": "http://granet.internal.softwareheritage.org:5009/graph", # timeout is in seconds # see https://requests.readthedocs.io/en/latest/user/advanced/#timeouts "timeout": 10, }, "recovery_bundles": { "secret_sharing": { "minimum_required_groups": 2, "groups": {}, } }, } @swh_cli_group.group(name="alter", context_settings=CONTEXT_SETTINGS) @click.pass_context def alter_cli_group(ctx): """Archive alteration tools. Location of the configuration should be specified through the environment variable ``SWH_CONFIG_FILENAME``. Expected config format: \b storage: cls: remote url: https://storage-cassandra-ro.softwareheritage.org \b graph: url: "http://granet.internal.softwareheritage.org:5009/graph" \b restoration_storage: cls: remote url: https://storage-rw.softwareheritage.org \b removal_searches: main: cls: elasticsearch hosts: - elasticsearch:9200 \b removal_storages: old_primary: cls: postgresql db: "service=swh" new_primary: cls: cassandra hosts: - cassandra-seed keyspace: swh \b removal_objstorages: main: cls: remote url: https://objstorage.softwareheritage.org \b removal_journals: main_journal: cls: kafka brokers: - kafka1.internal.softwareheritage.org prefix: swh.journal.objects client_id: swh.alter.removals \b recovery_bundles: secret_sharing: minimum_required_groups: 2 groups: legal: minimum_required_shares: 1 recipient_keys: "YubiKey serial 4245067 slot 1": age1yubikey1q2e37f74zzazz75mtggzql3at66pegemfnul0dtd7axctahljkvsqezscaq "YubiKey serial 2284622 slot 3": age1yubikey4o1aypv83isatti92q1zasv1hkpuozlkoak4zd66t7poud23rftqrcszjgul sysadmins: minimum_required_shares: 1 recipient_keys: "YubiKey serial 3862152 slot 1": age1yubikeyrupnxsu6uneqxw146g9szaofyxexiy4nhnzqg1ayb9b85g8h4oardwj6c212 "Ruby": age1y6epp27nq8n4faj8g8hkw8thcvj744y5vnr8jyfmp4857d6npc3qn9k7jz The identifier for the recipient key must be in the form of “YubiKey serial ####### slot #” if the secret key is stored on a YubiKey. Keys specified by any other identifiers will be considered as plain age identities. """ # noqa: B950 from swh.core import config conf = config.load_from_envvar(default_config=DEFAULT_CONFIG) ctx.ensure_object(dict) ctx.obj["config"] = conf return ctx @alter_cli_group.command() @click.option( "--dry-run", type=click.Choice( ["stop-before-recovery-bundle", "stop-before-removal"], case_sensitive=False ), help="perform a trial run", ) @click.option( "--output-inventory-subgraph", type=click.File(mode="w", atomic=True), ) @click.option( "--output-removable-subgraph", type=click.File(mode="w", atomic=True), ) @click.option( "--output-pruned-removable-subgraph", type=click.File(mode="w", atomic=True), ) @click.option( "--identifier", metavar="IDENTIFIER", required=True, help="identifier for this removal operation", ) @click.option( "--reason", metavar="REASON", help="reason for this removal operation", ) @click.option( "--expire", metavar="YYYY-MM-DD", type=click.DateTime(formats=["%Y-%m-%d"]), help="date when the recovery bundle should be removed", ) @click.option( "--recovery-bundle", metavar="PATH", type=click.Path(writable=True, dir_okay=False), required=True, help="path to the recovery bundle that will be created", ) @click.argument( "swhids", metavar="<SWHID|URL>..", type=SwhidOrUrlParamType(), required=True, nargs=-1, ) @click.pass_context def remove( ctx, swhids: List["ExtendedSWHID"], dry_run: bool, output_inventory_subgraph, output_removable_subgraph, output_pruned_removable_subgraph, identifier, reason, expire, recovery_bundle, ) -> None: """Remove the given SWHIDs or URLs from the archive.""" from swh.core.api import RemoteException from swh.graph.http_client import GraphAPIError, RemoteGraphClient from swh.journal.writer import get_journal_writer from swh.journal.writer.kafka import KafkaJournalWriter from swh.objstorage.factory import get_objstorage from swh.objstorage.interface import ObjStorageInterface from swh.search import get_search from swh.search.interface import SearchInterface from swh.storage import get_storage from swh.storage.interface import ObjectDeletionInterface from .operations import Remover, RemoverError, logger from .recovery_bundle import SecretSharing logger.propagate = False logger.addHandler(ClickLoggingHandler()) logger.setLevel(logging.INFO) conf = ctx.obj["config"] try: graph_client = RemoteGraphClient(**conf["graph"]) except GraphAPIError as e: raise click.ClickException(f"Unable to connect to the graph server: {e.args}") storage = get_storage(**conf["storage"]) if not dry_run: if "restoration_storage" not in conf: raise click.ClickException( "Configuration does not define `restoration_storage`" ) if "removal_searches" not in conf or len(conf["removal_searches"]) == 0: raise click.ClickException( "Configuration does not define any `removal_searches`" ) if "removal_storages" not in conf or len(conf["removal_storages"]) == 0: raise click.ClickException( "Configuration does not define any `removal_storages`" ) if "removal_objstorages" not in conf or len(conf["removal_objstorages"]) == 0: raise click.ClickException( "Configuration does not define any `removal_objstorages`" ) if "removal_journals" not in conf or len(conf["removal_journals"]) == 0: raise click.ClickException( "Configuration does not define any `removal_journals`" ) restoration_storage = ( get_storage(**conf["restoration_storage"]) if "restoration_storage" in conf else None ) removal_searches = {} for name, d in conf.get("removal_searches", {}).items(): removal_searches[name] = get_search(**d) try: removal_searches[name].check() except RemoteException as e: raise click.ClickException(f"Search “{name}” is unreachable: {e}") removal_storages = {} for name, d in conf.get("removal_storages", {}).items(): removal_storage = get_storage(**d) assert hasattr( removal_storage, "object_delete" ), f"storage “{name}” does not implement ObjectDeletionInterface" removal_storages[name] = removal_storage removal_objstorages = {} for name, d in conf.get("removal_objstorages", {}).items(): removal_objstorages[name] = get_objstorage(**d) removal_journals = {} for name, d in conf.get("removal_journals", {}).items(): journal_writer = get_journal_writer(**d) assert isinstance( journal_writer, KafkaJournalWriter ), "journal writer is not kafka-based" removal_journals[name] = journal_writer try: secret_sharing = SecretSharing.from_dict( conf["recovery_bundles"]["secret_sharing"] ) except ValueError as e: raise click.ClickException(f"Wrong secret sharing configuration: {e.args[0]}") remover = Remover( storage=storage, graph_client=graph_client, restoration_storage=restoration_storage, removal_searches=cast(Dict[str, SearchInterface], removal_searches), removal_storages=cast(Dict[str, ObjectDeletionInterface], removal_storages), removal_objstorages=cast(Dict[str, ObjStorageInterface], removal_objstorages), removal_journals=cast(Dict[str, KafkaJournalWriter], removal_journals), ) try: removable_swhids = remover.get_removable( swhids, output_inventory_subgraph=output_inventory_subgraph, output_removable_subgraph=output_removable_subgraph, output_pruned_removable_subgraph=output_pruned_removable_subgraph, ) if dry_run == "stop-before-recovery-bundle": click.echo(f"We would remove {len(removable_swhids)} objects:") for swhid in removable_swhids: click.echo(f" - {swhid}") ctx.exit(0) if dry_run is None: click.confirm( click.style( f"Proceed with removing {len(removable_swhids)} SWHIDs?", fg="yellow", bold=True, ), abort=True, ) remover.create_recovery_bundle( secret_sharing=secret_sharing, removable_swhids=removable_swhids, recovery_bundle_path=recovery_bundle, removal_identifier=identifier, reason=reason, expire=expire.astimezone() if expire else None, ) except RemoverError as e: click.secho(e.args[0], err=True, fg="red") ctx.exit(1) if dry_run == "stop-before-removal": click.echo("Stopping before removal.") ctx.exit(0) try: remover.remove() except Exception as e: click.secho(str(e), err=True, fg="red", bold=True) remover.restore_recovery_bundle() ctx.exit(1) @alter_cli_group.group(name="recovery-bundle", context_settings=CONTEXT_SETTINGS) @click.pass_context def recovery_bundle_cli_group(ctx): """Recovery bundle related tools.""" return ctx @recovery_bundle_cli_group.command(name="info") @click.option( "--dump-manifest", is_flag=True, default=False, help="Show raw manifest in YAML format.", ) @click.option( "--show-encrypted-secrets", is_flag=True, default=False, help="Show encrypted secrets.", ) @click.argument( "recovery-bundle", type=click.Path(exists=True, dir_okay=False, readable=True), required=True, ) @click.pass_context def info(ctx, recovery_bundle, dump_manifest, show_encrypted_secrets) -> None: """Display the manifest of the given recovery bundle.""" from .recovery_bundle import RecoveryBundle bundle = RecoveryBundle(recovery_bundle) if dump_manifest: click.echo(bundle.dump_manifest(), nl=False) ctx.exit() title = f"Recovery bundle “{bundle.removal_identifier}”" click.echo(title) click.echo("=" * len(title)) click.echo("") click.echo(f"Created: {bundle.created.isoformat()}") if bundle.reason: lines = bundle.reason.rstrip().split("\n") lines[0] = f"Reason: {lines[0]}" click.echo("\n ".join(lines)) if bundle.expire: click.echo(f"Expire: {bundle.expire}") click.echo("List of SWHID objects:") for swhid in bundle.swhids: click.echo(f"- {swhid}") click.echo("Secret share holders:") for share_id in sorted(bundle.share_ids): click.echo(f"- {share_id}") if show_encrypted_secrets: click.echo(bundle.encrypted_secret(share_id)) def _share_decryption_keys_provider(share_ids: Set[str]) -> ShareDecryptionKeys: import subprocess import sys from .recovery_bundle import list_yubikey_identities for attempt in range(1, 10): if not any(share_id.startswith("YubiKey") for share_id in share_ids): # No shares require a YubiKey, so there is nothing we can do here break try: for share_id, secret_key in list_yubikey_identities(): if share_id not in share_ids: continue share_ids.remove(share_id) click.echo( "🔧 Decrypting share using " f"{click.style(share_id, fg='magenta', bold=True)}…" ) click.echo("💭 You might need to tap the right YubiKey when it blinks.") yield share_id, secret_key click.echo() except subprocess.CalledProcessError as ex: if "age-plugin-yubikey" not in ex.cmd[0]: raise click.echo( f"""💥 {click.style('age-plugin-yubikey failed to ' 'list connected YubiKeys.', bold=True, fg='red')}""" ) click.echo("💭 Please disconnect all YubiKeys and retry.") sys.exit(1) if share_ids: yubikey_ids = list(sorted(share_ids)) if len(yubikey_ids) > 1: yubikeys = ", ".join( click.style(share_id, fg="magenta", bold=True) for share_id in yubikey_ids[:-1] ) yubikeys += " or " + click.style( yubikey_ids[-1], fg="magenta", bold=True ) else: yubikeys = click.style(yubikey_ids[0], fg="magenta", bold=True) click.prompt( f"🔐 Please insert {yubikeys} and press " f"{click.style('Enter', fg='green', bold=True)}…", default="Ok", show_default=False, hide_input=True, prompt_suffix="", ) click.echo( f"""💥 {click.style('Unable to decrypt enough shared secrets to recover ' 'the object decryption key. Aborting.', bold=True, fg='red')}""" ) sys.exit(1) def _print_decrypted_mnemonic(mnemonic: str, share_id: Optional[str] = None) -> None: fmt_from = "" if share_id: fmt_from = f" from {click.style(share_id, fg='magenta', bold=True)}" click.echo(f"🔑 Recovered shared secret{fmt_from}:") # Quoting from SLIP-0039: This construction yields a beneficial # property where the random identifier and the iteration exponent # transform into the first two words of the mnemonic code, so the user # can immediately tell whether the correct shares are being combined, # i.e. they have to have the same first two words. Moreover, the third # word encodes the group index, group threshold and part of the group # count. Since the group threshold and group count are constant, all # **shares belonging to the same group start with the same three words**. words = mnemonic.split() click.echo( " ".join( click.style(word, fg="blue", bold=index < 3) for index, word in enumerate(words) ) ) def _recover_mnemonics_from_identity_files( manifest, share_ids, identity_files, show_decrypted_mnemonics ): from .recovery_bundle import WrongDecryptionKey, age_decrypt_from_identity # As we can’t know which identity file corresponds to which encrypted shared # secret, we have to try them all and see which one we can actually decrypt. recovered = {} for identity_file in identity_files: for share_id in share_ids: try: recovered[share_id] = age_decrypt_from_identity( identity_file, manifest.decryption_key_shares[share_id] ).decode("us-ascii") if show_decrypted_mnemonics: _print_decrypted_mnemonic(recovered[share_id], share_id) except WrongDecryptionKey: pass return recovered
[docs] def prompting_object_decryption_key_provider( manifest, known_mnemonics=None, identity_files=None, show_decrypted_mnemonics=False ) -> str: import functools from .recovery_bundle import recover_object_decryption_key_from_encrypted_shares decrypted_mnemonic_processor = None if show_decrypted_mnemonics: decrypted_mnemonic_processor = _print_decrypted_mnemonic share_ids = set(manifest.decryption_key_shares.keys()) # Normalize known_mnemonics known_mnemonics = list(known_mnemonics or []) if identity_files: recovered = _recover_mnemonics_from_identity_files( manifest, share_ids, identity_files, show_decrypted_mnemonics ) share_ids.difference_update(recovered.keys()) known_mnemonics.extend(recovered.values()) yubikey_share_ids = set( share_id for share_id in share_ids if share_id.startswith("YubiKey") ) missing_ids = share_ids - yubikey_share_ids if missing_ids: fmt_ids = ", ".join( click.style(share_id, fg="magenta", bold=True) for share_id in missing_ids ) click.echo( f"""\n🚸 {click.style('The following secret shares will not be ' 'decrypted:', fg='yellow')} {fmt_ids}\n""" ) return recover_object_decryption_key_from_encrypted_shares( manifest.decryption_key_shares, functools.partial(_share_decryption_keys_provider, yubikey_share_ids), decrypted_mnemonic_processor=decrypted_mnemonic_processor, known_mnemonics=known_mnemonics, )
[docs] def get_object_decryption_key_provider(ctx) -> ObjectDecryptionKeyProvider: import functools secrets = ctx.params.get("secret") identity_files = ctx.params.get("identity") object_decryption_key_provider: ObjectDecryptionKeyProvider = functools.partial( prompting_object_decryption_key_provider, known_mnemonics=secrets, identity_files=identity_files, ) decryption_key = ctx.params.get("decryption_key") if decryption_key: if not decryption_key.lower().startswith("age-secret-key-"): ctx.fail( "The given decryption key does not look like a decryption key. " "It should start with “AGE-SECRET-KEY-”" ) def known_key_provider(_): return decryption_key object_decryption_key_provider = known_key_provider return object_decryption_key_provider
[docs] class ContentSWHID(click.ParamType): name = "swhid of a content object"
[docs] def convert(self, value, param, ctx): from swh.model.swhids import ExtendedObjectType, ExtendedSWHID, ValidationError try: swhid = ExtendedSWHID.from_string(value) except ValidationError: self.fail(f"expected SWHID, got {value!r}", param, ctx) if swhid.object_type != ExtendedObjectType.CONTENT: self.fail("We can only extract data for Content objects", param, ctx) return swhid
@recovery_bundle_cli_group.command(name="extract-content") @click.option( "-o", "--output", type=click.File("wb"), metavar="FILE", required=True, help="write data to FILE", ) @click.option( "--decryption-key", metavar="AGE_SECRET_KEY", help="use the given decryption key instead of the bundle shared secrets", ) @click.option( "-s", "--secret", metavar="MNEMONIC", multiple=True, help="Known shared secret. May be repeated.", ) @click.option( "-i", "--identity", metavar="IDENTITY", type=click.Path(exists=True, readable=True, dir_okay=False), multiple=True, help="Path to file with age identities. May be repeated.", ) @click.argument( "recovery-bundle", type=click.Path(exists=True, dir_okay=False, readable=True), required=True, ) @click.argument( "SWHID", type=ContentSWHID(), required=True, ) @click.pass_context def extract_content( ctx, output, recovery_bundle, swhid, decryption_key=None, identity=None, secret=None, ) -> None: """Extract data from content stored in a recovery bundle.""" from .recovery_bundle import RecoveryBundle, WrongDecryptionKey secret_key_provider = get_object_decryption_key_provider(ctx) bundle = RecoveryBundle(recovery_bundle, secret_key_provider) if str(swhid) not in bundle.swhids: click.secho( f"“{swhid}” is not in the recovery bundle", err=True, fg="red", bold=True ) ctx.exit(1) try: bundle.write_content_data(swhid, output) except WrongDecryptionKey: click.secho( f"Wrong decryption key for this bundle ({bundle.removal_identifier})", err=True, fg="red", bold=True, ) ctx.exit(2) @recovery_bundle_cli_group.command(name="restore") @click.option( "--decryption-key", metavar="AGE_SECRET_KEY", help="use the given decryption key instead of the bundle shared secrets", ) @click.option( "-s", "--secret", metavar="MNEMONIC", multiple=True, help="Known shared secret. May be repeated.", ) @click.option( "-i", "--identity", metavar="IDENTITY", type=click.Path(exists=True, readable=True, dir_okay=False), multiple=True, help="Path to file with age identities. May be repeated.", ) @click.argument( "recovery-bundle", type=click.Path(exists=True, dir_okay=False, readable=True), required=True, ) @click.pass_context def restore( ctx, recovery_bundle, decryption_key=None, identity=None, secret=None ) -> None: """Restore a recovery bundle to Software Heritage archive.""" from .recovery_bundle import RecoveryBundle, WrongDecryptionKey conf = ctx.obj["config"] from swh.storage import get_storage restoration_storage = get_storage(**conf["restoration_storage"]) secret_key_provider = get_object_decryption_key_provider(ctx) bundle = RecoveryBundle(recovery_bundle, secret_key_provider) try: # XXX: we could use click.progressbar here results = bundle.restore(restoration_storage) except WrongDecryptionKey: click.echo( f"Wrong decryption key for this bundle ({bundle.removal_identifier})" ) ctx.exit(2) click.echo( "Restoration complete. Results: \n" f"- Content objects added: {results['content:add']}\n" f"- Total bytes added to objstorage: {results['content:add:bytes']}\n" f"- SkippedContent objects added: {results['skipped_content:add']}\n" f"- Directory objects added: {results['directory:add']}\n" f"- Revision objects added: {results['revision:add']}\n" f"- Release objects added: {results['release:add']}\n" f"- Snapshot objects added: {results['snapshot:add']}\n" f"- Origin objects added: {results['origin:add']}\n" ) def _strip_rage_report(output): # rage prompts for report when it errors like this: # [ Did rage not do what you expected? Could an error be more useful? ] # [ Tell us: https://str4d.xyz/rage/report ] # This can be confusing in our case so strip them from the output. return b"\n".join( line for line in output.split(b"\n") if not line.startswith(b"[") and not line.endswith(b"]") ) @recovery_bundle_cli_group.command(name="recover-decryption-key") @click.option( "-s", "--secret", metavar="MNEMONIC", multiple=True, help="Known shared secret. May be repeated.", ) @click.option( "-i", "--identity", metavar="IDENTITY", type=click.Path(exists=True, readable=True, dir_okay=False), multiple=True, help="Path to file with age identities. May be repeated.", ) @click.option( "--show-recovered-secrets", is_flag=True, default=False, help="Show recovered shared secrets. Useful for remote/distributed recoveries.", ) @click.argument( "recovery-bundle", type=click.Path(exists=True, dir_okay=False, readable=True), required=True, ) def recover_decryption_key( recovery_bundle, secret, identity, show_recovered_secrets ) -> None: """Recover the decryption key using shared secrets.""" import subprocess import sys from .recovery_bundle import RecoveryBundle def object_decryption_key_provider(*args, **kwargs): kwargs["known_mnemonics"] = list(secret) kwargs["identity_files"] = list(identity) kwargs["show_decrypted_mnemonics"] = show_recovered_secrets return prompting_object_decryption_key_provider(*args, **kwargs) try: bundle = RecoveryBundle(recovery_bundle, object_decryption_key_provider) decryption_key = bundle.object_decryption_key click.echo( f"\n🔓 Recovered decryption key:\n{click.style(decryption_key, bold=True)}" ) except subprocess.CalledProcessError as ex: if "rage" not in ex.cmd[0] and ex.cmd[1] != "--decrypt": raise click.echo( f"""💥 {click.style('rage decryption failed:', bold=True, fg='red')}""" ) click.echo(_strip_rage_report(ex.stderr)) sys.exit(1) @recovery_bundle_cli_group.command(name="rollover") @click.option( "--decryption-key", metavar="AGE_SECRET_KEY", help="use the given decryption key instead of the bundle shared secrets", ) @click.option( "-s", "--secret", metavar="MNEMONIC", multiple=True, help="Known shared secret. May be repeated.", ) @click.option( "-i", "--identity", metavar="IDENTITY", type=click.Path(exists=True, readable=True, dir_okay=False), multiple=True, help="Path to file with age identities. May be repeated.", ) @click.argument( "recovery-bundles", metavar="[RECOVERY_BUNDLE]…", type=click.Path(exists=True, dir_okay=False, readable=True), nargs=-1, ) @click.pass_context def rollover( ctx, recovery_bundles, decryption_key=None, identity=None, secret=None ) -> None: """Rollover recovery bundles to new shared secrets.""" conf = ctx.obj["config"] from .recovery_bundle import RecoveryBundle, SecretSharing, WrongDecryptionKey secret_key_provider = get_object_decryption_key_provider(ctx) secret_sharing = SecretSharing.from_dict(conf["recovery_bundles"]["secret_sharing"]) click.secho("New shared secret holders:") for share_id in sorted(secret_sharing.share_ids): click.echo(f"- {click.style(share_id, fg='magenta', bold=True)}") click.confirm( click.style( "Proceed with rolling over the shared secrets?", fg="yellow", bold=True, ), abort=True, ) for recovery_bundle in recovery_bundles: bundle = RecoveryBundle(recovery_bundle, secret_key_provider) # Ensure that we can decrypt at least some objects with the provided key try: origin = list(bundle.origins()) assert len(origin) > 0, "Oops! No Origin objects in this recovery bundle." except WrongDecryptionKey: click.secho( f"Wrong decryption key for this bundle ({bundle.removal_identifier})", err=True, fg="red", bold=True, ) ctx.exit(2) bundle.rollover(secret_sharing) click.secho("Shared secrets for ", fg="green", nl=False) click.secho(bundle.removal_identifier, fg="green", bold=True, nl=False) click.secho(" have been rolled over.", fg="green")