Laravel のキューの処理を並行並列にし 4 倍高速化しました

こんにちは!トラーナのエンジニアの @m3m0r7 です。 弊社では Laravel 6 を利用していて、重たい処理は Swoole のキューと Amazon SQS と分けて処理するようにしています。

キューの課題

Laravel の queue:listen を使用していたのですが、Amazon SQS をキュードライバとして使用している場合、キューが 1 つずつしか処理できず、systemd を用いて複数起動するようにしてはいたものの、パフォーマンスに課題がありました。 queue:listen は内部的には queue:work を呼んでいるようで、いわゆる supervisor のような存在なのですが、この queue:work の数を増やす方法が提供されていなようです。 ちなみに queue:work は一度キューを拾うと、実行が終了するような仕組みです。

Laravel の Illuminate\Queue\Console\ListenCommand は以下のようなパラメータしか引き受けていないことがわかります。

<?php
protected $signature = 'queue:listen
                        {connection? : The name of connection}
                        {--delay=0 : The number of seconds to delay failed jobs}
                        {--force : Force the worker to run even in maintenance mode}
                        {--memory=128 : The memory limit in megabytes}
                        {--queue= : The queue to listen on}
                        {--sleep=3 : Number of seconds to sleep when no job is available}
                        {--timeout=60 : The number of seconds a child process can run}
                        {--tries=1 : Number of times to attempt a job before logging it failed}';

Laravel Horizon というものもあるのですが残念ながら Amazon SQS には対応しておらず Redis のみの対応となっています。

また、他にもパフォーマンスにも課題がありました。社内システムの Elasticsearch への書き込みが情報量のある複数のインデックスを更新するため、とても重く(1 回あたり 10 秒程度かかる)、同期的に待っているとレスポンスのパフォーマンスが悪くなってしまうので、弊社ではキューを使って課題の解決をしています。

最終的に 複数の queue:work プロセスを並行並列管理するような仕組みを自作することにしました。

プロセスの破棄と生成

PHP には多重化するために stream_select と呼ばれる関数と、プロセスの双方向通信を実現する proc_open というのがあるので、これを利用して実装をしました。

proc_open で queue:work のプロセスを指定したプロセス数だけ起動し queue:work が終了したら、再度同様にプロセスを立ち上げるようにしています。

<?php
$processorSize = (int) $this->option('processors');

// ...

for ($i = 0; $i < $processorSize; ++$i) {
   // プロセスが実行済み(もしくは死んでいる)か判定
    if (!$this->isAlive($i)) {
        // 念の為終了処理
        $this->closeProcess($i);

        // プロセスの作成
        $this->createProcess($i);
    }
}

isAlive は書き込み用のストリーム(stdout)、エラー用のストリーム(stderr)、プロセス本体のリソースのいずれかが動作終了もしくは EOF に到達している場合は false を返すようになっています。

<?php
protected function isAlive(int $index): bool
{
    if (!isset($this->processors[$index])) {
        return false;
    }

    [$resource, $out, $err] = $this->processors[$index];

    $isResourcesAll = is_resource($out) && is_resource($err) && is_resource($resource);

    // いずれかがリソースではなくなっているものがある場合
    if (!$isResourcesAll) {
        return false;
    }

    $outMeta = stream_get_meta_data($out);
    $errMeta = stream_get_meta_data($err);
    $resourceMeta = proc_get_status($resource);

    return $outMeta['eof'] === false
        && $errMeta['eof'] === false
        && $resourceMeta['running'] === true;
}

そして closeProcess でプロセスやストリームの終了をさせます。すでに終了しているプロセスやストリームもあるので、終了できるかどうか分岐してあげる必要があります。

<?php
protected function closeProcess(int $index): void
{
    if (!isset($this->processors[$index])) {
        return;
    }

    [$resource, $out, $err] = $this->processors[$index];

    if (is_resource($out)) {
        fclose($out);
    }

    if (is_resource($err)) {
        fclose($err);
    }

    if (is_resource($resource)) {
        proc_close($resource);
    }

    unset($this->processors[$index]);
}

次に実際にプロセスを起動させるところを記述します。

<?php
protected function createProcess(int $index): void
{
    $resource = proc_open(
        // queue:work を実行するためのコマンド
        [
            'php',
            'artisan',
            'queue:work',
            'sqs',
            '--queue=' . config('queue.connections.sqs.queue'),
            '--memory=256',
        ],
        [
            // stdout
            1 => ['pipe', 'w'],

            // stderr
            2 => ['pipe', 'w'],
        ],
        $pipes,
        getcwd(),
        // (1)
        array_merge(
            $_ENV,
            [
                'CACHE_DRIVER' => 'array',
            ],
        )
    );

    // それぞれノンブロッキングモードにします。
    stream_set_blocking($pipes[1], false);
    stream_set_blocking($pipes[2], false);

    $this->processors[$index] = [$resource, $pipes[1], $pipes[2]];
}

(1) … 弊社では Laravel から Redis への接続のためのドライバそのものを Swoole に対応させるために自作しています。その際に $_ENV['CACHE_DRIVER'] に custom-redis-driver のように値が入ってきます。 proc_open は 第 5 パラメータを省略した場合親プロセスの$_ENV をそのまま引き継ぐ仕様になっています。 しかし、Swoole 対応のドライバはコルーチン内もしくはスケジューラ内でしか動作しないため queue:work にそのまま渡すと、プロセスが一瞬で終了してしまう問題がありました。

Illuminate\Queue\Worker の getTimestampOfLastQueueRestart() で $this->cache->get(...) と呼び出しているところがあるのですが、ここが正常に実行できていないためです。これは queue:work 起動時に呼ばれるdaemon($connectionName, $queue, WorkerOptions $options) 内で呼び出されています。

Redis にわざわざ繋いで保存する必要もないので array で渡してあげるようにして解決しました。

本当は [...$_ENV, 'CACHE_DRIVER' => 'array'] と書きたかったのですが、この記法は PHP8.1 からなので、未だ使えず。 なお、弊社のチームリードが最近 PHP8.0 へアップグレードしてくれました!

多重化の対応

そして、次に多重化の対応です。複数のプロセスを createProcess で起動したものを stream_select で取りまとめて処理をします。 実は Symfony に多重化に対応したものとして Symfony\Component\Process\Process があるのですが、 PHP8.0 への移行の際、Swoole との相性が悪いことが判明しており、あえて stream_select で実装する方針としました。

stream_select は要約すると監視対象のストリームに書き込み等があれば、参照渡しとしている引数を変更があったものだけに絞ってくれる、といったものです。つまりどのストリームに書き込みがあったのか、ウォッチすることができるので、多重化の際には重宝します。 stream_select 自体は変更のあったストリームの数を返り値として返してくれます。

そこで createProcess で使っている proc_open から生えた stdoutstderr を監視するようにすればいいのです。

<?php

$outs = array_column(
    $this->processors,
    1
);
$errs = array_column(
    $this->processors,
    2
);

$changes = stream_select(
    $outs,
    $in,
    $errs,
    0,
    200000
);

上記のようにすると $outs または $errs に変更があった時に $changes が 1 以上になり、何かしらの処理を行うことできるようになります。 また $changes はエラー発生時やタイムアウト時に 0 や false を返すこともあるので、その場合は、スキップするようにしてあげればよいです。

<?php

foreach ($outs as $out) {
    $index = $this->getIndexByResource(
        $out,
        1
    );

    if ($index === null) {
        continue;
    }
    $this->processOut($out, $index);
}

foreach ($errs as $out) {
    $index = $this->getIndexByResource(
        $out,
        2
    );
    if ($index === null) {
        continue;
    }
    $this->processErr($out, $index);
}

processOut と processErr はそれぞれ、以下のようになっています。

<?php

protected function processOut($resource, int $index): void
{
    [$pid, $message] = $this->process($resource, $index);

    if ($message === '') {
        return;
    }

    $this->getOutput()->writeln(
        "[PID: {$pid}] {$message}"
    );
}

protected function processErr($resource, int $index): void
{
    [$pid, $message] = $this->process($resource, $index);
    if ($message === '') {
        return;
    }

    $this->getOutput()->writeln(
        "[PID: {$pid}] {$message}"
    );

    \Log::error($message);

    $this->closeProcess($index);
}

isAlive から、stream_select 、出力までの上記の処理をひっくるめて、イベントループ(while(true) { ... })にしてあげると、常にプロセスの生死を見つつ、多重化の実装ができます。

どれくらい早くなったのか

キューを処理しているサーバーのプロダクションが c5.xlarge で vCPU が 4、ステージングが t3.large で vCPU 2 なのですが、ステージングで本番の倍以上のキューの処理数を実現しているため 4 倍ほどとなりました。

本当はプロセッサアフィニティ等も考えられると、より効率の良い処理になるのですが、まぁそこまでやる必要も一旦ないかなと…。

このような、取り組みをしている弊社に興味があればぜひカジュアル面談からいかがでしょうか!

 

herp.careers