Laravel のキューの処理を並行並列にし 4 倍高速化しました

こんにちは!トラーナのエンジニアの @m3m0r7 です。 弊社では Laravel 6 を利用していて、重たい処理は Swoole のキューと Amazon SQS と分けて処理するようにしています。

キューの課題

Laravel の queue:listen を使用していたのですが、Amazon SQS をキュードライバとして使用している場合、キューが 1 つずつしか処理できず、systemd を用いて複数起動するようにしてはいたものの、パフォーマンスに課題がありました。 queue:listen は内部的には queue:work を呼んでいるようで、いわゆる supervisor のような存在なのですが、この queue:work の数を増やす方法が提供されていなようです。 ちなみに queue:work は一度キューを拾うと、実行が終了するような仕組みです。

Laravel の Illuminate\Queue\Console\ListenCommand は以下のようなパラメータしか引き受けていないことがわかります。

<?php
protected $signature = 'queue:listen
                        {connection? : The name of connection}
                        {--delay=0 : The number of seconds to delay failed jobs}
                        {--force : Force the worker to run even in maintenance mode}
                        {--memory=128 : The memory limit in megabytes}
                        {--queue= : The queue to listen on}
                        {--sleep=3 : Number of seconds to sleep when no job is available}
                        {--timeout=60 : The number of seconds a child process can run}
                        {--tries=1 : Number of times to attempt a job before logging it failed}';

Laravel Horizon というものもあるのですが残念ながら Amazon SQS には対応しておらず Redis のみの対応となっています。

また、他にもパフォーマンスにも課題がありました。社内システムの Elasticsearch への書き込みが情報量のある複数のインデックスを更新するため、とても重く(1 回あたり 10 秒程度かかる)、同期的に待っているとレスポンスのパフォーマンスが悪くなってしまうので、弊社ではキューを使って課題の解決をしています。

最終的に 複数の queue:work プロセスを並行並列管理するような仕組みを自作することにしました。

プロセスの破棄と生成

PHP には多重化するために stream_select と呼ばれる関数と、プロセスの双方向通信を実現する proc_open というのがあるので、これを利用して実装をしました。

proc_open で queue:work のプロセスを指定したプロセス数だけ起動し queue:work が終了したら、再度同様にプロセスを立ち上げるようにしています。

<?php
$processorSize = (int) $this->option('processors');

// ...

for ($i = 0; $i < $processorSize; ++$i) {
   // プロセスが実行済み(もしくは死んでいる)か判定
    if (!$this->isAlive($i)) {
        // 念の為終了処理
        $this->closeProcess($i);

        // プロセスの作成
        $this->createProcess($i);
    }
}

isAlive は書き込み用のストリーム(stdout)、エラー用のストリーム(stderr)、プロセス本体のリソースのいずれかが動作終了もしくは EOF に到達している場合は false を返すようになっています。

<?php
protected function isAlive(int $index): bool
{
    if (!isset($this->processors[$index])) {
        return false;
    }

    [$resource, $out, $err] = $this->processors[$index];

    $isResourcesAll = is_resource($out) && is_resource($err) && is_resource($resource);

    // いずれかがリソースではなくなっているものがある場合
    if (!$isResourcesAll) {
        return false;
    }

    $outMeta = stream_get_meta_data($out);
    $errMeta = stream_get_meta_data($err);
    $resourceMeta = proc_get_status($resource);

    return $outMeta['eof'] === false
        && $errMeta['eof'] === false
        && $resourceMeta['running'] === true;
}

そして closeProcess でプロセスやストリームの終了をさせます。すでに終了しているプロセスやストリームもあるので、終了できるかどうか分岐してあげる必要があります。

<?php
protected function closeProcess(int $index): void
{
    if (!isset($this->processors[$index])) {
        return;
    }

    [$resource, $out, $err] = $this->processors[$index];

    if (is_resource($out)) {
        fclose($out);
    }

    if (is_resource($err)) {
        fclose($err);
    }

    if (is_resource($resource)) {
        proc_close($resource);
    }

    unset($this->processors[$index]);
}

次に実際にプロセスを起動させるところを記述します。

<?php
protected function createProcess(int $index): void
{
    $resource = proc_open(
        // queue:work を実行するためのコマンド
        [
            'php',
            'artisan',
            'queue:work',
            'sqs',
            '--queue=' . config('queue.connections.sqs.queue'),
            '--memory=256',
        ],
        [
            // stdout
            1 => ['pipe', 'w'],

            // stderr
            2 => ['pipe', 'w'],
        ],
        $pipes,
        getcwd(),
        // (1)
        array_merge(
            $_ENV,
            [
                'CACHE_DRIVER' => 'array',
            ],
        )
    );

    // それぞれノンブロッキングモードにします。
    stream_set_blocking($pipes[1], false);
    stream_set_blocking($pipes[2], false);

    $this->processors[$index] = [$resource, $pipes[1], $pipes[2]];
}

(1) … 弊社では Laravel から Redis への接続のためのドライバそのものを Swoole に対応させるために自作しています。その際に $_ENV['CACHE_DRIVER'] に custom-redis-driver のように値が入ってきます。 proc_open は 第 5 パラメータを省略した場合親プロセスの$_ENV をそのまま引き継ぐ仕様になっています。 しかし、Swoole 対応のドライバはコルーチン内もしくはスケジューラ内でしか動作しないため queue:work にそのまま渡すと、プロセスが一瞬で終了してしまう問題がありました。

Illuminate\Queue\Worker の getTimestampOfLastQueueRestart() で $this->cache->get(...) と呼び出しているところがあるのですが、ここが正常に実行できていないためです。これは queue:work 起動時に呼ばれるdaemon($connectionName, $queue, WorkerOptions $options) 内で呼び出されています。

Redis にわざわざ繋いで保存する必要もないので array で渡してあげるようにして解決しました。

本当は [...$_ENV, 'CACHE_DRIVER' => 'array'] と書きたかったのですが、この記法は PHP8.1 からなので、未だ使えず。 なお、弊社のチームリードが最近 PHP8.0 へアップグレードしてくれました!

多重化の対応

そして、次に多重化の対応です。複数のプロセスを createProcess で起動したものを stream_select で取りまとめて処理をします。 実は Symfony に多重化に対応したものとして Symfony\Component\Process\Process があるのですが、 PHP8.0 への移行の際、Swoole との相性が悪いことが判明しており、あえて stream_select で実装する方針としました。

stream_select は要約すると監視対象のストリームに書き込み等があれば、参照渡しとしている引数を変更があったものだけに絞ってくれる、といったものです。つまりどのストリームに書き込みがあったのか、ウォッチすることができるので、多重化の際には重宝します。 stream_select 自体は変更のあったストリームの数を返り値として返してくれます。

そこで createProcess で使っている proc_open から生えた stdoutstderr を監視するようにすればいいのです。

<?php

$outs = array_column(
    $this->processors,
    1
);
$errs = array_column(
    $this->processors,
    2
);

$changes = stream_select(
    $outs,
    $in,
    $errs,
    0,
    200000
);

上記のようにすると $outs または $errs に変更があった時に $changes が 1 以上になり、何かしらの処理を行うことできるようになります。 また $changes はエラー発生時やタイムアウト時に 0 や false を返すこともあるので、その場合は、スキップするようにしてあげればよいです。

<?php

foreach ($outs as $out) {
    $index = $this->getIndexByResource(
        $out,
        1
    );

    if ($index === null) {
        continue;
    }
    $this->processOut($out, $index);
}

foreach ($errs as $out) {
    $index = $this->getIndexByResource(
        $out,
        2
    );
    if ($index === null) {
        continue;
    }
    $this->processErr($out, $index);
}

processOut と processErr はそれぞれ、以下のようになっています。

<?php

protected function processOut($resource, int $index): void
{
    [$pid, $message] = $this->process($resource, $index);

    if ($message === '') {
        return;
    }

    $this->getOutput()->writeln(
        "[PID: {$pid}] {$message}"
    );
}

protected function processErr($resource, int $index): void
{
    [$pid, $message] = $this->process($resource, $index);
    if ($message === '') {
        return;
    }

    $this->getOutput()->writeln(
        "[PID: {$pid}] {$message}"
    );

    \Log::error($message);

    $this->closeProcess($index);
}

isAlive から、stream_select 、出力までの上記の処理をひっくるめて、イベントループ(while(true) { ... })にしてあげると、常にプロセスの生死を見つつ、多重化の実装ができます。

どれくらい早くなったのか

キューを処理しているサーバーのプロダクションが c5.xlarge で vCPU が 4、ステージングが t3.large で vCPU 2 なのですが、ステージングで本番の倍以上のキューの処理数を実現しているため 4 倍ほどとなりました。

本当はプロセッサアフィニティ等も考えられると、より効率の良い処理になるのですが、まぁそこまでやる必要も一旦ないかなと…。

このような、取り組みをしている弊社に興味があればぜひカジュアル面談からいかがでしょうか!

 

herp.careers

PHP 7.4-&gt;8.0にバージョンアップしました~Swoole 4.8.2を添えて~

@watarukuraです。 社内向け管理画面のMadrasPHPバージョンを7.4から8.0へメジャーバージョンアップしました。 苦労話を共有させてください。

PHP7.4->8.0移行計画

まず、ドキュメントを書きました。 これにより、なんでこれをやってるんだっけ、ということを明確にします。

f:id:watarukura:20211125101346p:plain
esa

 

公式ドキュメントを熟読します。 下位互換性のない変更点

↓こんな移行作戦を立てました。

  1. ローカル開発用のDocker環境がPHP8.0で動くようにする
  2. ローカルでのテストが全部通るようにする
  3. CI用のDockerイメージをPHP8.0でも作って、手動でCIを回してテストが全部通るようにする
  4. 通常のCIはPHP7.4で動作させておき、本番deploy後にPHP8.0イメージでCIを回す
  5. stg環境にPHP8.0を導入して動作検証する
  6. 本番移行手順を作る
  7. 本番移行する

ポイント

PHP7.4とPHP8.0でcomposer.jsonを共通化する

  "require": {
    "php": "^7.4||^8.0",

毎回composer.jsonを修正して検証用ブランチにpushしてGitHub Actionsのworkflow_dispatchで手動実行していたのです。(修正しないとcomposer validateで落ちる) 上記の修正後は検証用の専用ブランチではなくdefaultブランチで検証できるようになったので、本番へのdeploy後にGitHub Actionsのworkflow_run機能でPHP8.0用のCIを回すところが自動化できました。

演算子は、致命的なエラー を隠さなくなりました

急にテストでHTTPステータス400エラーを期待している箇所で500エラーが出るようになりました。 ログを見ると、array_keys(): Argument #1 ($array) must be of type array, null given というエラーが出ています。

$hasX = @count(array_keys($hash['x'])) !== 1;

なるほど。

- $hasX = @count(array_keys($hash['x'])) !== 1;
+ $hasX = count(array_keys($hash['x'] ?? [])) !== 1;

\Symfony\Component\Process\Processは生きてるのに\Swoole\Coroutineは死んでる

これが一番のハマりどころで、移行が1週間遅れました...。

バッチ処理の高速化のために\Symfony\Component\Process\Processを使ってマルチプロセスで実行している箇所があるのですが、$process->isRunning()がtrueを返すのにSwooleのCoroutineがWARNING swoole_signalfd_event_callback(): read from signalfd failed, Error: Resource temporarily unavailable[11]って警告を吐いて死んでしまっていました。

(余談ですが、signalfdってsignal用のファイルディスクリプタなんですね。初めて知りました Man page of SIGNALFD)

原因として推測したのは、PHP8.0ではexit()は例外と同じ挙動になるようで、この辺りはSwooleでもIssueがいくつか上がっていました。 移行を始めた時点はSwoole4.8.1を使用していたのですが、バグにハマっている間に4.8.2がでており、ReleaseNoteをみると、Fixed cannot exit directly when a fatal error occurs in PHP8 environmentとの記述が! しめしめとバージョンアップしてみたものの、残念ながら同じ警告がでました...。

SwooleのソースもPHPのソースも読んでも何もわからん(CとC++を読んで原因までたどり着ける力量がない)ので、ワークアラウンドとしてtimeoutを設定するようにしました。 これで、異常時は検知できるので、拾ってリトライすれば良さそうです。

  $process = new Process(['ls', '-lsa']);
+ $process->setTimeout(3600);
  $process->start();

  while ($process->isRunning()) {
      // waiting for process to finish
+    // check if the timeout is reached
+    $process->checkTimeout();

      usleep(200000);
  }

  echo $process->getOutput();

まとめ

2021/10/01から開始したPHP8.0移行プロジェクトは、本日11/26の動作検証を以て完了となります。 PHP8.0に起因していない障害(cronの記述ミス...)こそあったものの、日次のバッチ処理は正常に稼働しており、まずは成功と言えるのではないでしょうか。

さて、昨日11/25に待望のPHP8.1が来ました! Fibersにより、いよいよPHPで非同期処理が公式サポートされるようになります。 PHP8.0移行プロジェクトの終了と同時に、PHP8.1移行プロジェクトの幕開けとなりました。 (その前にLaravel6->8移行プロジェクトもあるのですが...) では、次のプロジェクト完了報告でお会いしましょう...!

プロダクト開発部マネージャー兼テックリードからプロダクト開発部マネージャーへ

みなさん、こんにちは!プロダクト開発部マネージャーの @m3m0r7 です。 10 月 1 日時点で、実はプロダクト開発部所属のエンジニアが 13 人ほどとなり、プロダクト開発部の組織編成そのものを考え直す機会が訪れました。


プロダクト開発部自体は、エンジニア正社員第 1 号の私ともうひとり業務委託の方がいるエンジニア 2 名体制から昨年 2020 の 4 月にスタートしました。 もともと何から整備すればいいのか、何をしたらテクニカルな最適化が行えるのか、組織最適化などの前提条件が一切土台がないフェーズからのスタートでした。

私の前に居た前任のエンジニアが作った SaaS 上のシステムで既存のオペレーションが回っていたのですが、日が経つにつれて、その SaaS はバンドルされたファイルしか残っていなかったなどいくつかの理由で掌握できないシステムと化し、最終的にメンテナンス不能な状態にまで陥りました。

詳しくは以下のスライドをご覧ください。

speakerdeck.com

ここから、組織が掌握できるシステムへ移行するために何をするべきかロードマップを敷き、そして既存のオペレーション改善など推し進めるためエンジニアの採用を加速させ、 昨年から各種イベント、例えば PHP カンファレンスなどへ積極的にスポンサードしていくようになりました。

採用がうまくいかない月もあり、歯がゆい時期もありましたが、メンテナンス不能なものを「MADRAS」と呼ばれる社内システムへの リプレイスを推し進めるために、プロダクト開発の傍らにエンジニアを募り、新しくジョイン、そして信じて付いてきてくれたエンジニアたちの助力もあり、今年 2021 の 7 月にようやく 1 年ほどの月日をかけ、祈願のシステムリプレイスを果たしました。 システムリプレイスに至る過程は次の機会に記事にまとめられたらと思います。

MADRAS への移行インタビューは以下です。

note.com

そして、昨年はあまり見えていなかった会社として本当はやりたいことなどが、システムリプレイスを伴って出てくるようになってきました。 やりたいことに対して、圧倒的にエンジニアが足りなさすぎる、常々そう思いながらプロダクト開発をしている傍らで採用活動を続け、今では 13 人ほどの組織規模となってきました。


前置きは以上にして本題に入ります。もともとプロダクト開発部では「フロントエンドチーム」と「バックエンドチーム」の大きな括りで採用と組織運用を続けていたのですが、ある時不都合をいくつか感じるようになりました。

それは、テックリード業を兼任していた私にメンションが集中してしまう私自身が単一障害点になってしまっていたことです。私は、組織運用として誰かの意思によって過剰に左右される、または停滞するような組織設計は健全ではないと考えていた中で、相反する状況だなと感じてきました。

個人ではなくチームが意思決定をすべきであり、我々はチームで行動をすべきという考えのもと、このままでは良くないと考え組織設計を見直すことにしました。

スキルの得意領域で分断するのではなく、プロダクトに対する専門性を軸にチームそれぞれの得意なドメインを作ることで、チームが意思決定し、チームが行動できるように編成を行いました。

f:id:m3m0r7:20211029085428j:plain

いま現状、プラットフォーム開発セクションが主軸となっていますが、ここから来年 2022 には実際にお客様に提供するサービスを設計・開発・運用する「ユーザーサービス開発セクション」を設立する予定です。

そして、私自身は組織設計、採用活動、技術投資・予算に係る業務に主軸を置くためテックリード業から離れ、現場は設立したチームのチームリードに一任して、プロダクト開発部マネージャー兼テックリードからプロダクト開発部マネージャーへと役職転換しました。

急成長を続ける組織の中で、どのようにうまく立ち回り、エンジニアが働きやすい環境を作れるか、どのように組織設計をすればプロダクト開発が円滑に回るかなど、常々熟考しています。

チームとしての能力が最大限に発揮されるように試行錯誤を重ねている組織で働いてみたいエンジニアを、積極採用中です!

PHP カンファレンス 2021 へスポンサード &amp;&amp; 登壇しました

みなさま、こんにちは! プロダクト開発部マネージャーの @m3m0r7 です。

スポンサード

弊社では PHP カンファレンス 2021 でゴールドスポンサーとしてスポンサードさせていただいておりました。

f:id:m3m0r7:20211004164720p:plain

去年から、本格的に各 PHP 系のイベントでスポンサードしており、通算 3 回目となりました。 そして、来年開催される PHPerKaigi にもスポンサード予定です。

カンファレンスなどのテック系イベントに対してスポンサードすることでコミュニティへの貢献を少しでも実現するために、スポンサードの取り組みを続けています。これは、昨年エンジニア組織を立ち上げる際に決めた取り組みの一つでもあります。

登壇について

Laravel でも非同期処理が扱える!? PHP 8 から初める非同期処理 〜Laravel Octane〜 で登壇させていただきました。

speakerdeck.com

弊社の使用している技術として Laravel と Swoole があります。 トーク中に少しお話したのですが、開発途中で Laravel Octane が発表されたりしたものの、当時 PHP 7.4 で開発をしており PHP 8 ではなかったので、laravel-swoole のまま継続して開発をしていました。

そして、弊社のプロダクト開発部では Laravel と Swoole を組み合わせて使うことに関して多くのナレッジを得ることができたのではないかと思っています。実はスライドに書いていない内容がまだあるほど、お伝えしたいナレッジがたくさんありました。例えば Laravel 向けの MySQL ドライバーの自作の話であるとかです。完全にお伝えするには 25 分では足りなかったと思っています。

寄稿について

実は PHP カンファレンス 2021 への登壇前に、Laravel Octane についてより実践的に記載したものをソフトウェアデザインに寄稿しているので、ぜひ見ていただければと思います。

www.amazon.co.jp

おわりに

弊社代表の @sdx_ の登壇と、スポンサーツアー、そして私の登壇を見てくださった皆様、本当にありがとうございました。 また、PHP カンファレンスを運営していただいたスタッフのみなさま、本当にお疲れさまでした。

ちなみに、弊社ではプロダクトをより良くしていくために、フロントエンド、バックエンド問わずエンジニアを絶賛募集中です!