Python Tips: 外部コマンドを実行して出力をリアルタイムに取得したい

Python で外部コマンドを実行してその出力(標準出力・標準エラー)をリアルタイムにキャプチャする方法についてです。

「 Python で外部コマンド実行」といえばまっさきに思いつくのは標準ライブラリの subprocess ですが、(私の理解が正しければ) subprocess を使うとコマンドが終了するまでその出力を Python でキャプチャできません。 外部コマンドの出力をある程度リアルタイムに Python でキャプチャしたい場合は asynciosubprocess 系の機能が有用です subprocess 。 具体的には asyncio.create_subprocess_exec() または asyncio.create_subprocess_shell() を使用します。

追記 2022/01/24

この方法で stdout.readline()stderr.readline() を使用すると、まれに while ループを抜けられないことがあります。 Python 公式ドキュメントにも次のような警告が書かれています。

Warning: Use the communicate() method rather than process.stdin.write(), await process.stdout.read() or await process.stderr.read. This avoids deadlocks due to streams pausing reading or writing and blocking the child process.

参考にされる際はご注意ください。

次のコードは Python 公式の asyncio.create_subprocess_exec() のサンプルコード(こちら)を少し変更したものです。

asyncio_run.py:

import asyncio
import sys


async def run(program: str, args: list[str]) -> None:
    """外部コマンドを実行する"""
    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    while True:
        if proc.stdout.at_eof() and proc.stderr.at_eof():
            break

        stdout = (await proc.stdout.readline()).decode()
        if stdout:
            print(f'[stdout] {stdout}', end='', flush=True)
        stderr = (await proc.stderr.readline()).decode()
        if stderr:
            print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)

        await asyncio.sleep(1)

    await proc.communicate()

    print(f'{program} {" ".join(args)} exited with {proc.returncode}')


asyncio.run(run('sh', ['./never-ending-script.sh']))

このスクリプトを実行すると、カレントディレクトリにある never-ending-script.sh を実行し、その出力を 1 秒間隔で Python でキャプチャしてから出力します。

たとえば never-ending-script.sh に次の内容を書き込んで実行すると挙動を確認することができます。

echo 'started.'
sleep 5
echo 'in progress.'
sleep 5
echo 'finished.'

ポイントは、 await asyncio.create_subprocess_exec() で生成した proc に対して proc.communicate() をすぐに実行して完了まで待機するのではなく、 proc.stdout proc.stderr をチェックするループを回すことです。 proc.stdout.at_eof()proc.stderr.at_eof() の両方が True であれば出力がなくなった(≒処理が終了した)とみなすことができます。 このようにすることで、実行に時間がかかる処理を実行した場合でも途中経過を Python 側でウォッチできます。

ちなみに、もし標準出力と標準エラーを Python 側でキャプチャする必要がなければ(=目視確認でよければ)書くべきコードは非常にシンプルです。 asyncio を使う場合は次のような感じで書けます(し、わざわざ asyncio を使わずとも subprocess で十分です)。

asyncio_run_2.py:

import asyncio
import sys


async def run2(program: str, args: list[str]) -> None:
    """外部コマンドを実行する(出力のキャプチャは行わない)"""
    proc = await asyncio.create_subprocess_exec(program, *args)
    await proc.communicate()
    print(f'{program} {" ".join(args)} exited with {proc.returncode}')


asyncio.run(run2('sh', ['./never-ending-script.sh']))

ちなみに、コマンドの出力をキャプチャしてさらに終了ステータスも取得したい場合は、もう少し長いコードを書く必要があります。 一連のロジックをラップしたクラスを書くと、たとえば次のような感じになるでしょうか。

asyncio_run_3.py:

import asyncio


class Runner:
    """外部コマンドを実行する"""

    def __init__(self, program, args, interval=1):
        self.program = program
        self.args = args
        self.interval = interval
        self.proc = None

    async def start(self):
        """コマンドの実行を開始する"""
        self.proc = await asyncio.create_subprocess_exec(
            self.program,
            *self.args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    async def stream(self):
        """実行中のコマンドの出力(標準出力・標準エラー)を返す async generator"""
        while True:
            if self.proc.stdout.at_eof() and self.proc.stderr.at_eof():
                break

            stdout = await self.proc.stdout.readline()
            stderr = await self.proc.stderr.readline()
            yield stdout.decode(), stderr.decode()

            await asyncio.sleep(self.interval)

    async def wait(self):
        """コマンドが終了するまで待機してリターンコードを返す"""
        if self.proc is None:
            return None

        await self.proc.communicate()
        return self.proc.returncode

この Runner クラスは次のようにして使用します。

import sys


async def main():
    runner = Runner('sh', ['./never-ending-script.sh'])

    # コマンドを実行する
    await runner.start()

    # コマンドの出力を取得して標準出力・標準エラーに流す
    async for stdout, stderr in runner.stream():
        if stdout:
            print(f'[stdout] {stdout}', end='', flush=True)
        if stderr:
            print(f'[sdterr] {stderr}', end='', flush=True, file=sys.stderr)

    # コマンドの終了を待つ
    returncode = await runner.wait()
    print(f'return code is {returncode}')


asyncio.run(main())

この場合 Runner.stream() が async generator なので async for を使って中身を取り出すことができます。 Runner インスタンスの利用には async キーワードを使う必要があるため、 async 関数 main() を定義して asyncio.run(main()) で実行しています。

ということで、Python で外部コマンドを実行してその出力をリアルタイムに取得する方法についてでした。

動くサンプルを GitHub Gist に置いたので興味のある方はそちらもご覧ください。

Python: Stream output of asyncio.create_subprocess_exec() · GitHub


  1. 標準出力・標準エラーを Python でキャプチャせずにそのまま流せばよい場合は subprocess が使えます。