Source code for

# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

r"""This module implements a shell-like command pipeline system in Python.

Pipelines are built like this:

>>> from swh.graph.shell import Command, Sink
>>> (
...     Command.echo("foo")
...     | Command.zstdmt()
...     | Command.cat("-", Command.echo("bar") | Command.zstdmt())
...     | Command.zstdcat()
...     > Sink()
... ).run()

which is the equivalent of this bash command:

.. code-block:: bash

    echo foo \
    | zstdmt \
    | cat - <(echo bar | zstdmt) \
    | zstdcat

:class:`Sink` is mainly meant for tests; it causes ``.run()`` to return
the stdout of the last process.

Actual pipelines will usually write to a file instead, using
:class:`AtomicFileSink`. This class is similar to ``>`` in bash,
with a twist: it is only written after all other commands in the pipeline
succeeded (but unlike ``sponge`` from moreutils, it buffers to disk and
renames the file at the end).
"""

from __future__ import annotations

import dataclasses
import functools
import logging
import os
from pathlib import Path
import shlex
import signal
import subprocess
from typing import Any, Dict, List, NoReturn, Optional, Tuple, TypeVar, Union

try:
    # luigi is optional: fall back to a placeholder class when it is missing,
    # so ``isinstance(x, LocalTarget)`` checks keep working (and never match)
    from luigi import LocalTarget
except ImportError:

    class LocalTarget:  # type: ignore
        """Placeholder for ``luigi.LocalTarget`` if it could not be imported"""


logger = logging.getLogger(__name__)

# Kept as bytes: it is written as-is to a temporary file opened in binary mode.
LOGBACK_CONF = b"""\
<configuration>
  <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %msg%n</pattern>
    </encoder>
    <target>System.err</target>
  </appender>

  <root level="debug">
    <appender-ref ref="STDERR" />
  </root>
</configuration>
"""
"""Overrides the default config, to log to stderr instead of stdout"""

class CommandException(Exception):
    """Error carrying the command that failed and the code it returned.

    Attributes:
        command: argument sequence of the command; its first item is used in
            the error message
        returncode: the code the command returned
    """

    def __init__(self, command, returncode):
        super().__init__(f"{command[0]} returned: {returncode}")
        self.command = command
        self.returncode = returncode

    def __reduce__(self):
        # By default exceptions are pickled/copied as ``cls(*self.args)``, but
        # ``args`` only holds the formatted message, which does not match the
        # two-argument constructor. Rebuild from the real arguments instead.
        return (self.__class__, (self.command, self.returncode))
_PROC = Path("/proc/")
""":file:`/proc/`"""

_CGROUP_ROOT = Path("/sys/fs/cgroup/")
"""Base path of the cgroup filesystem"""


@functools.lru_cache(1)
def base_cgroup() -> Optional[Path]:
    """Returns the cgroup that should be used as parent for child processes.

    As cgroups with children should not contain processes themselves,
    this is the parent of the cgroup this process was started in.

    The result is computed on the first call and cached. ``None`` is returned
    when no suitable cgroup is available: the cgroup filesystem is not mounted,
    :file:`/proc/<pid>/cgroup` is missing, the process is not in a cgroupv2,
    it runs directly in the root cgroup, or the new cgroup could not be created.

    On success, a ``swh.graph@<pid>`` cgroup is created next to the cgroup of
    the current process, and its removal is scheduled for interpreter exit.
    """
    import atexit

    if not _CGROUP_ROOT.is_dir():
        logger.info("%s is not mounted", _CGROUP_ROOT)
        return None

    proc_cgroup_path = _PROC / str(os.getpid()) / "cgroup"
    if not proc_cgroup_path.is_file():
        logger.info("%s does not exist", proc_cgroup_path)
        return None

    my_cgroup = proc_cgroup_path.read_text().strip()

    # cgroupv2 entries have the form "0::/<path>"
    if not my_cgroup.startswith("0::/"):
        logger.warning("Process was started in %s which is not a cgroupv2", my_cgroup)
        return None

    # this is the cgroup that contains the current process, plus whatever process
    # spawned it (eg. pytest or bash); and neither cgroupv2 nor systemd allows a cgroup
    # to both contain processes itself and have child cgroups; so we have to use the
    # parent cgroup as root for the cgroups we are going to create.
    original_cgroup_path = _CGROUP_ROOT / my_cgroup[4:]
    if original_cgroup_path == _CGROUP_ROOT:
        # Running directly in the root cgroup, so there is no parent.
        # TODO: this means we are running in a container, so this is probably the only
        # process in the cgroup, we could try moving it to a child cgroup.
        return None

    assert (original_cgroup_path.parent / "cgroup.procs").read_text().strip() == ""

    # create a cgroup that will encapsulate all the cgroups created for
    # child processes
    base_cgroup_path = create_cgroup(
        f"swh.graph@{os.getpid()}", original_cgroup_path.parent, add_suffix=False
    )
    if base_cgroup_path is None:
        return None

    assert (base_cgroup_path / "cgroup.procs").read_text().strip() == ""

    # best effort: a controller that cannot be enabled is only logged
    for controller in ("cpu", "memory"):
        try:
            with (base_cgroup_path / "cgroup.subtree_control").open("wt") as f:
                f.write(f"+{controller}\n")
        except OSError as e:
            logger.warning(
                "Failed to enable %r controller for %s: %s",
                controller,
                base_cgroup_path,
                e,
            )

    def cleanup():
        # Clean up the base cgroup we created. The removal fails if child
        # cgroups are still present; log it instead of letting the exception
        # escape from the atexit handler.
        try:
            base_cgroup_path.rmdir()
        except OSError as e:
            logger.warning("Failed to remove %s: %s", base_cgroup_path, e)

    atexit.register(cleanup)

    return base_cgroup_path
_num_child_cgroups = 0


def create_cgroup(
    base_name: str, parent: Optional[Path] = None, add_suffix: bool = True
) -> Optional[Path]:
    """Creates a cgroup directory under ``parent`` and returns its path.

    Arguments:
        base_name: name of the new cgroup
        parent: directory to create the cgroup in; when omitted, the result of
            :func:`base_cgroup` is used
        add_suffix: whether to append ``@<n>`` to the name, where ``<n>`` is a
            module-wide counter incremented on every suffixed call

    Returns ``None`` if there is no parent to create the cgroup in, or if the
    directory could not be created (in which case a warning is logged).
    """
    global _num_child_cgroups

    if not parent:
        parent = base_cgroup()
    if parent is None:
        return None

    name = base_name
    if add_suffix:
        name = f"{base_name}@{_num_child_cgroups}"
        _num_child_cgroups += 1

    cgroup_path = parent / name
    try:
        cgroup_path.mkdir()
    except OSError as exc:
        logger.warning("Failed to create %s: %s", cgroup_path, exc)
        return None
    return cgroup_path
def move_to_cgroup(cgroup: Path, pid: Optional[int] = None) -> bool:
    """Moves a process to the given cgroup.

    Arguments:
        cgroup: directory of the cgroup to move the process to
        pid: the process to move; defaults to the current process

    Returns whether the process was successfully moved. On failure, a warning
    is logged and removal of the cgroup is attempted.
    """
    if pid is None:
        pid = os.getpid()
    try:
        with (cgroup / "cgroup.procs").open("at") as f:
            f.write(f"{pid}\n")
    except OSError as e:
        logger.warning("Failed to move process to %s: %s", cgroup, e)
        # The removal itself may fail (eg. the directory does not exist);
        # this must not turn the ``False`` result into an exception.
        try:
            cgroup.rmdir()
        except OSError as rmdir_error:
            logger.warning("Failed to remove %s: %s", cgroup, rmdir_error)
        return False
    else:
        return True
class _MetaCommand(type):
    """Metaclass turning ``Command.<name>`` into a :class:`Command` factory
    whose first argument is ``<name>``."""

    def __getattr__(self, name):
        # only called for attributes that regular lookup did not find
        return functools.partial(Command, name)


class Command(metaclass=_MetaCommand):
    """Runs a command with the given name and arguments. ``**kwargs`` is passed to
    :class:`subprocess.Popen`.

    Arguments that are themselves a :class:`Command` or :class:`Pipe` are run
    too, and replaced with a :file:`/dev/fd/<n>` path their output can be read
    from. ``LocalTarget`` and :class:`pathlib.Path` arguments are replaced with
    their path.
    """

    def __init__(self, *args: Union[str, Path], **kwargs):
        self.args = args
        self.kwargs = dict(kwargs)
        # wrapped by _preexec_fn, so it is not passed directly to Popen
        self.preexec_fn = self.kwargs.pop("preexec_fn", lambda: None)
        # the cgroup is named after the basename of the executable
        self.cgroup = create_cgroup(str(args[0]).split("/")[-1])

    def _preexec_fn(self):
        """Runs in the child process: joins the cgroup, then calls the
        user-provided ``preexec_fn``."""
        if self.cgroup is not None:
            move_to_cgroup(self.cgroup)
        self.preexec_fn()

    def _run(self, stdin, stdout) -> _RunningCommand:
        """Starts the command (and the commands passed as arguments) without
        waiting for them, and returns a handle on the running process."""
        pass_fds = []
        children = []
        final_args = []
        try:
            for arg in self.args:
                if isinstance(arg, (Command, Pipe)):
                    # command stdout piped to a non-stdin FD
                    (r, w) = os.pipe()
                    pass_fds.append(r)
                    final_args.append(f"/dev/fd/{r}")
                    try:
                        children.append(arg._run(None, w))
                    finally:
                        # only the sub-command should hold the write end
                        os.close(w)
                elif isinstance(arg, LocalTarget):
                    final_args.append(arg.path)
                elif isinstance(arg, Path):
                    final_args.append(str(arg))
                else:
                    final_args.append(arg)

            proc = subprocess.Popen(
                final_args,
                stdin=stdin,
                stdout=stdout,
                pass_fds=pass_fds,
                preexec_fn=self._preexec_fn,
                **self.kwargs,
            )
        finally:
            # The child process inherited the read ends through pass_fds (or
            # could not be started at all); either way the copies held by this
            # process are no longer needed and would otherwise leak.
            for fd in pass_fds:
                os.close(fd)
        return _RunningCommand(self, proc, children, self.cgroup)

    def run(self) -> None:
        """Runs the command and waits for it to complete."""
        self._run(None, None).wait()

    def __or__(self, other: Union[Command, Pipe]) -> Pipe:
        """``self | other``: pipe self's stdout to other's stdin"""
        if isinstance(other, Command):
            return Pipe([self, other])
        elif isinstance(other, Pipe):
            return Pipe([self, *other.children])
        else:
            raise NotImplementedError(
                f"{self.__class__.__name__} | {other.__class__.__name__}"
            )

    def __str__(self) -> str:
        return f"{' '.join(shlex.quote(str(arg)) for arg in self.args)}"

    def _cleanup(self) -> None:
        """Removes the cgroup created for this command, if any."""
        if self.cgroup is not None:
            try:
                self.cgroup.rmdir()
            except FileNotFoundError:
                # already removed, eg. by move_to_cgroup() after it failed to
                # move the child process into it
                pass
class Java(Command):
    """Runs ``java`` with the given arguments.

    The environment of the process is the current one, with ``CLASSPATH`` and
    ``JAVA_TOOL_OPTIONS`` set from the checked configuration;
    ``JAVA_TOOL_OPTIONS`` additionally points logback to a temporary file
    holding ``LOGBACK_CONF``.

    Arguments:
        max_ram: if set, stored as ``max_ram`` in the configuration before it
            is checked
        conf: configuration, passed through ``check_config``
    """

    def __init__(
        self,
        *args: Union[str, Path],
        max_ram: Optional[int] = None,
        conf: Optional[Dict[str, Any]] = None,
    ):
        import tempfile

        from .config import check_config

        conf = dict(conf or {})
        if max_ram:
            conf["max_ram"] = max_ram

        conf = check_config(conf)
        assert conf is not None  # for mypy

        # kept open for the lifetime of the command: the file is deleted when
        # it is closed, which _cleanup() does
        self.logback_conf = tempfile.NamedTemporaryFile(
            prefix="logback_", suffix=".xml"
        )
        try:
            self.logback_conf.write(LOGBACK_CONF)
            self.logback_conf.flush()

            java_tool_options = [
                f"-Dlogback.configurationFile={self.logback_conf.name}",
                conf["java_tool_options"],
            ]

            env = {
                **os.environ.copy(),
                "JAVA_TOOL_OPTIONS": " ".join(java_tool_options),
                "CLASSPATH": conf["classpath"],
            }

            super().__init__("java", *args, env=env)
        except BaseException:
            # do not leave the temporary file behind if construction fails
            self.logback_conf.close()
            raise

    def _cleanup(self) -> None:
        self.logback_conf.close()
        super()._cleanup()
class Rust(Command):
    """Runs the executable ``bin_name`` with the given arguments, after
    appending the configured ``rust_executable_dir`` to ``PATH``.

    Arguments:
        bin_name: name of the executable to run
        conf: configuration, passed through ``check_config``
        env: environment of the process; defaults to the current environment
            (also used when an empty dict is given). It is not modified.
    """

    def __init__(
        self,
        bin_name,
        *args: Union[str, Path],
        conf: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
    ):
        from .config import check_config

        conf = dict(conf or {})
        conf = check_config(conf)
        assert conf is not None  # for mypy

        # work on a copy, so the PATH change below does not leak into the
        # dict provided by the caller
        env = dict(env or os.environ)
        path = env.get("PATH")
        if path:
            env["PATH"] = f"{path}:{conf['rust_executable_dir']}"
        else:
            env["PATH"] = conf["rust_executable_dir"]

        super().__init__(bin_name, *args, env=env)
class _RunningCommand: def __init__( self, command: Command, proc: subprocess.Popen, running_children: List[Union[_RunningCommand, _RunningPipe]], cgroup: Optional[Path], ): self.command = command self.proc = proc self.running_children = running_children self.cgroup = cgroup def stdout(self): return self.proc.stdout def is_alive(self) -> bool: return self.proc.poll() is None def wait(self) -> List[RunResult]: results = [] try: self.proc.wait() results.append( RunResult( command=tuple(map(str, self.command.args)), cgroup=self.cgroup, cgroup_stats={ p.read_text().strip() for p in (self.cgroup.iterdir() if self.cgroup else []) if"cpu.", "memory.", "io.", "pids.")) # exclude writeable files (they are for control, not statistics) and p.stat().st_mode & 0o600 == 0o400 }, ) ) self.command._cleanup() if self.proc.returncode not in (0, -int(signal.SIGPIPE)): raise CommandException(self.command.args, self.proc.returncode) for child in self.running_children: results.extend(child.wait()) except BaseException: self.kill() raise return results def kill(self) -> None: for child in self.running_children: child.kill() if self.proc.returncode is not None: self.proc.kill()
class Pipe:
    """A sequence of commands (or pipes), where the stdout of each item is
    connected to the stdin of the next one."""

    def __init__(self, children: List[Union[Command, Pipe]]):
        self.children = children

    def _run(self, stdin, stdout) -> _RunningPipe:
        """Starts all the children without waiting for them.

        ``stdin`` is given to the first child and ``stdout`` to the last one;
        consecutive children are connected through OS pipes.
        """
        read_pipes: List[Any] = [stdin]
        write_pipes: List[Any] = []
        own_pipe_ends: List[Any] = []
        try:
            for _ in range(len(self.children) - 1):
                (r, w) = os.pipe()
                read_end = os.fdopen(r, "rb")
                own_pipe_ends.append(read_end)
                write_end = os.fdopen(w, "wb")
                own_pipe_ends.append(write_end)
                read_pipes.append(read_end)
                write_pipes.append(write_end)
            write_pipes.append(stdout)

            running_children = [
                child._run(r, w)
                for (r, w, child) in zip(read_pipes, write_pipes, self.children)
            ]
        finally:
            # Only the children should hold the ends of the pipes created
            # here: as long as this process keeps a write end open, readers
            # never see the end of their input. Close them explicitly rather
            # than relying on garbage collection, which is delayed when an
            # exception (and its traceback) keeps this frame alive.
            for pipe_end in own_pipe_ends:
                pipe_end.close()
        return _RunningPipe(self, running_children)

    def run(self) -> None:
        """Runs all the children and waits for them to complete."""
        self._run(None, None).wait()

    def __or__(self, other) -> Pipe:
        """``self | other``: returns a new pipe with ``other`` (or its
        children, if it is a pipe) appended"""
        if isinstance(other, Pipe):
            return Pipe([*self.children, *other.children])
        elif isinstance(other, Command):
            return Pipe([*self.children, other])
        else:
            raise NotImplementedError(
                f"{self.__class__.__name__} | {other.__class__.__name__}"
            )

    def __str__(self) -> str:
        children = "\n| ".join(map(str, self.children))
        return f"( {children}\n)"
def wc(source: Union[Command, Pipe], *args: str) -> int:
    """Pipes ``source`` to ``wc`` (called with ``args``), and returns its
    output parsed as an integer."""
    counting_pipeline = source | Command.wc(*args)
    sink = counting_pipeline > Sink()
    output = sink.run()
    return int(output.strip())
class _RunningPipe: def __init__( self, pipe: Pipe, children: List[Union[_RunningCommand, _RunningPipe]] ): self.pipe = pipe self.children = children def stdout(self): return self.children[-1].stdout() def is_alive(self) -> bool: return all(child.is_alive() for child in self.children) def wait(self) -> List[RunResult]: results = [] try: for child in self.children: results.extend(child.wait()) except BaseException: self.kill() raise return results def kill(self) -> None: for child in self.children: child.kill() TSink = TypeVar("TSink", bound="_BaseSink") class _BaseSink: def __init__(self) -> None: self.source_pipe: Union[None, Command, Pipe] = None def _run(self, stdin, stdout) -> NoReturn: raise TypeError(f"{self.__class__.__name__} must be the end of a pipeline.") def __lt__(self: TSink, other: Union[Command, Pipe]) -> TSink: """``other > self``""" if isinstance(other, (Command, Pipe)): if self.source_pipe is not None: raise TypeError(f"{self!r} is already piped to {self.source_pipe!r}") self.source_pipe = other return self else: raise NotImplementedError( f"{other.__class__.__name__} > {self.__class__.__name__}" )
[docs] class Sink(_BaseSink): """Captures the final output instead of sending it to the process' stdout"""
[docs] def run(self) -> bytes: if self.source_pipe is None: raise TypeError("AtomicFileSink has no stdin") source = self.source_pipe._run(stdin=None, stdout=subprocess.PIPE) chunks = [] while True: new_chunk = source.stdout().read(10240) if not new_chunk and not source.is_alive(): break chunks.append(new_chunk) source.wait() return b"".join(chunks)
class AtomicFileSink(_BaseSink):
    """Similar to ``> path`` at the end of a command, but writes only if the whole
    command succeeded."""

    def __init__(self, path: Union[Path, LocalTarget]):
        super().__init__()
        if isinstance(path, LocalTarget):
            path = Path(path.path)
        self.path = path

    def run(self) -> None:
        """Runs the source pipeline with its output sent to ``<path>.tmp``, then
        renames that file to ``path`` if the pipeline succeeded.

        If starting or waiting for the pipeline raises, the temporary file is
        removed and the exception is re-raised.

        Raises:
            TypeError: if nothing was piped to this sink
        """
        if self.source_pipe is None:
            raise TypeError("AtomicFileSink has no stdin")

        tmp_path = Path(f"{self.path}.tmp")
        if tmp_path.exists():
            tmp_path.unlink()

        tmp_fd = tmp_path.open("wb")
        try:
            # the file is closed on both the success and the failure path
            with tmp_fd:
                running_source = self.source_pipe._run(stdin=None, stdout=tmp_fd)
                running_source.wait()
        except BaseException:
            tmp_path.unlink()
            raise
        else:
            tmp_path.replace(self.path)

    def __str__(self) -> str:
        return f"{self.source_pipe} > AtomicFileSink({self.path})"
@dataclasses.dataclass
class RunResult:
    """Information collected about one command after it completed."""

    # cgroup the command was assigned to, or None if there was none
    cgroup: Optional[Path]
    # arguments of the command, converted to strings
    command: Tuple[str, ...]
    # stripped text read from the statistics files of the cgroup
    cgroup_stats: Dict[str, str]