swh.graph.luigi.shell module

This module implements a shell-like command pipeline system in pure Python.

Pipelines are built like this:

>>> from swh.graph.luigi.shell import Command, Sink
>>> (
...     Command.echo("foo")
...     | Command.zstdmt()
...     | Command.cat("-", Command.echo("bar") | Command.zstdmt())
...     | Command.zstdcat()
...     > Sink()
... ).run()

which is the equivalent of this bash command:

echo foo \
| zstdmt \
| cat - <(echo bar | zstdmt) \
| zstdcat

Sink is mainly meant for tests; it causes .run() to return the stdout of the last process.
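The capture behaviour described above, where the stdout of the last process is returned to the caller, can be approximated with the standard library alone. The following is a minimal sketch and not this module's implementation: run_pipeline is a hypothetical helper, and the child processes are Python one-liners so that the example does not depend on echo or zstd being installed.

```python
import subprocess
import sys


def run_pipeline(*commands):
    """Chain commands like a shell pipe and return the last stdout as bytes.

    Hypothetical helper for illustration; not part of swh.graph.luigi.shell.
    """
    if not commands:
        raise ValueError("at least one command is required")
    procs = []
    upstream = None
    for argv in commands:
        proc = subprocess.Popen(
            argv,
            # The first process reads nothing; later ones read the previous stdout.
            stdin=upstream if upstream is not None else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
        )
        if upstream is not None:
            # Close the parent's copy so only the child holds the read end.
            upstream.close()
        upstream = proc.stdout
        procs.append(proc)
    # Only the last process's stdout is read by the parent.
    output = upstream.read()
    upstream.close()
    for proc in procs:
        if proc.wait() != 0:
            raise RuntimeError(f"{proc.args!r} exited with status {proc.returncode}")
    return output


producer = [sys.executable, "-c", "print('foo')"]
to_upper = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
result = run_pipeline(producer, to_upper)
print(result)
```

In this sketch a non-zero exit status from any stage raises an error, which is one reasonable policy; the module's own error handling may differ.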

Actual pipelines will usually write to a file instead, using AtomicFileSink. This class is similar to > in bash, with a twist: the file is only written after all other commands in the pipeline have succeeded (but unlike sponge from moreutils, it buffers to disk and renames the file at the end).
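The buffer-to-disk-then-rename idea can be illustrated with the standard library. This is a sketch under stated assumptions rather than the code behind AtomicFileSink: run_to_file_atomically is a hypothetical helper, it handles a single command instead of a whole pipeline, and it assumes the temporary file can be created in the same directory as the target so that the final rename stays on one filesystem.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def run_to_file_atomically(argv, path):
    """Run argv and publish its stdout at `path` only if it exits with status 0.

    Hypothetical helper for illustration; not part of swh.graph.luigi.shell.
    """
    path = Path(path)
    # Buffer the output in a temporary file next to the target.
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as tmp_file:
            # check=True raises CalledProcessError on a non-zero exit status.
            subprocess.run(argv, stdout=tmp_file, check=True)
        # Rename over the target only after the command succeeded.
        os.replace(tmp_name, path)
    except BaseException:
        # On failure, drop the partial output and leave the target untouched.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise


with tempfile.TemporaryDirectory() as demo_dir:
    demo_target = Path(demo_dir) / "out.txt"
    run_to_file_atomically([sys.executable, "-c", "print('ok')"], demo_target)
    demo_content = demo_target.read_bytes()
```

If the command fails halfway, a previous version of the target file stays in place, which is the property that a plain > redirection does not give.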

swh.graph.luigi.shell.LOGBACK_CONF = b'<configuration>\n  <appender name="STDERR" class="ch.qos.logback.core.ConsoleAppender">\n    <target>System.err</target>\n    <encoder>\n      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %msg%n</pattern>\n    </encoder>\n  </appender>\n\n  <root level="debug">\n    <appender-ref ref="STDERR" />\n  </root>\n</configuration>\n'

Overrides the default Logback configuration so that logs go to stderr instead of stdout.

exception swh.graph.luigi.shell.CommandException

Bases: Exception

class swh.graph.luigi.shell.Command(*args: str, **kwargs)

Bases: object

Runs a command with the given name and arguments. **kwargs is passed to subprocess.Popen.
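To make the keyword forwarding concrete, here is a minimal sketch of a class that stores its keyword arguments and hands them to subprocess.Popen unchanged. MiniCommand is a hypothetical stand-in written for this example, not the module's Command class; in particular, its run() returns the captured stdout so the effect is visible, whereas Command.run() is documented as returning None.

```python
import os
import subprocess
import sys


class MiniCommand:
    """Hypothetical stand-in for Command, only to show **kwargs forwarding."""

    def __init__(self, *args: str, **kwargs):
        self.args = args
        # Kept as-is and passed to subprocess.Popen in run().
        self.kwargs = kwargs

    def run(self) -> bytes:
        # Assumption of this sketch: callers do not pass stdout= themselves,
        # since that would clash with the stdout=PIPE used for capturing.
        proc = subprocess.Popen(
            list(self.args), stdout=subprocess.PIPE, **self.kwargs
        )
        stdout, _ = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"{self.args!r} exited with status {proc.returncode}")
        return stdout


# env= is a regular subprocess.Popen parameter; GREETING is an arbitrary name.
cmd = MiniCommand(
    sys.executable,
    "-c",
    "import os; print(os.environ['GREETING'])",
    env={**os.environ, "GREETING": "hello"},
)
output = cmd.run()
```

Any other Popen parameter, such as cwd, would travel the same way in this sketch.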

run() → None

class swh.graph.luigi.shell.Java(*args: str, max_ram: Optional[int] = None)

Bases: Command

class swh.graph.luigi.shell.Pipe(children: List[Union[Command, Pipe]])

Bases: object

run() → None

swh.graph.luigi.shell.wc(source: Union[Command, Pipe], *args: str) → int

class swh.graph.luigi.shell.Sink

Bases: _BaseSink

Captures the final output instead of sending it to the process’s stdout.

run() → bytes

class swh.graph.luigi.shell.AtomicFileSink(path: Union[Path, LocalTarget])

Bases: _BaseSink

Similar to > path at the end of a command, but writes only if the whole command succeeded.

run() → None