swh.graph.shell module#

This module implements a shell-like command pipeline system in pure-Python.

Pipelines are built like this:

>>> from swh.graph.shell import Command, Sink
>>> (
...     Command.echo("foo")
...     | Command.zstdmt()
...     | Command.cat("-", Command.echo("bar") | Command.zstdmt())
...     | Command.zstdcat()
...     > Sink()
... ).run()
b'foo\nbar\n'

which is the equivalent of this bash command:

echo foo \
| zstdmt \
| cat - <(echo bar | zstdmt) \
| zstdcat

Sink is mainly meant for tests; it causes .run() to return the stdout of the last process.

Actual pipelines will usually write to a file instead, using AtomicFileSink. This calls is similar to > in bash, with a twist: it is only written after all other commands in the pipeline succeeded (but unlike sponge from moreutils, it buffers to disk and rename the file at the end).

swh.graph.shell.LOGBACK_CONF = b'<configuration>\n  <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">\n    <target>System.err</target>\n    <encoder>\n      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %msg%n</pattern>\n    </encoder>\n  </appender>\n\n  <root level="debug">\n    <appender-ref ref="STDERR" />\n  </root>\n</configuration>\n'#

Overrides the default config, to log to stderr instead of stdout

exception swh.graph.shell.CommandException(command, returncode)[source]#

Bases: Exception

swh.graph.shell.base_cgroup() Path | None[source]#

Returns the cgroup that should be used as parent for child processes.

As cgroups with children should not contain processes themselves, this is the parent of the cgroup this process was started in.

swh.graph.shell.create_cgroup(base_name: str, parent: Path | None = None, add_suffix: bool = True) Path | None[source]#
swh.graph.shell.move_to_cgroup(cgroup: Path, pid: int | None = None) bool[source]#

Returns whether the process was successfully moved.

class swh.graph.shell.Command(*args: str | Path | LocalTarget, check: bool = True, **kwargs)[source]#

Bases: object

Runs a command with the given name and arguments. **kwargs is passed to subprocess.Popen.

If check is True (the default), raises an exception if the command returns a non-zero exit code.

run() None[source]#
class swh.graph.shell.Rust(bin_name, *args: str | Path | LocalTarget, conf: Dict[str, Any] | None = None, env: Dict[str, str] | None = None)[source]#

Bases: Command

class swh.graph.shell.Pipe(children: List[Command | Pipe])[source]#

Bases: object

run() None[source]#
swh.graph.shell.wc(source: Command | Pipe, *args: str) int[source]#
class swh.graph.shell.Sink[source]#

Bases: _BaseSink

Captures the final output instead of sending it to the process’ stdout

run() bytes[source]#
class swh.graph.shell.AtomicFileSink(path: Path | LocalTarget)[source]#

Bases: _BaseSink

Similar to > path at the end of a command, but writes only if the whole command succeeded.

run() None[source]#
class swh.graph.shell.RunResult(cgroup: 'Optional[Path]', command: 'Tuple[str, ...]', cgroup_stats: 'Dict[str, str]')[source]#

Bases: object

cgroup: Path | None#
command: Tuple[str, ...]#
cgroup_stats: Dict[str, str]#