Laravel on ECSで動かすQueueとScheduleワーカー

f:id:kubotak:20220323214652p:plain

皆さんこんにちは。kubotak(@kubotak_public)です。

この記事ではLaravelをECS Fargateで動かす際のQueueとScheduleに関して、弊社で行った知見を紹介したいと思います。

Laravel on ECSに関しては以下の記事も是非どうぞ

※なお、本稿においてはLaravel8系を利用しています。(おそらくLaravel9系でも問題ありません)

ECS FargateでQueueを動かす

弊社ではもともとAWS ElasticBeanstalkのWorker環境(以下EB Worker)でQueueおよびScheduleを実行していました。
この環境を簡単に説明すると、EB WorkerはSQSと接続されていて、SQSにメッセージが送られるとEB Worker環境のlocalhostの指定したエンドポイントにPOSTでリクエストしてくれる仕組みを持っています。
この仕組を利用して、HTTPリクエストとしてQueueを処理できるライブラリによって実行していました。

詳しくは導入当初の記事を参照ください。

tech.macloud.jp

ECS Fargateでは類似の環境を用意することができないのでLaravelのArtisanによるQueueのデーモンコマンドを利用することにしました。

Queues - Laravel - The PHP Framework For Web Artisans

DockerコンテナのエントリーポイントでこのArtisanコマンドを実行するだけで良さそうですね。
しかし、意外な落とし穴がありました。

EB Workerで動かしていた際は、HTTPリクエストによる処理のためQueueが失敗した場合はステータスコード500のエラーとしてレスポンスを返し、それを受け取ったEB WorkerはQueueメッセージを消費せず可視性タイムアウトになり再度Queueに詰み直されるという挙動で動いています。
そしてSQS側のメッセージの保持数の上限、つまりリトライ回数を超えるとDead Letter Queue(以下DLQ)に送られる仕組みになっています。

f:id:kubotak:20220323213812j:plain

しかし、ArtisanコマンドではQueueが失敗した場合、Laravel独自のリトライ処理を経て、それでも実行できない場合はfailed_jobとして扱われるような仕組みになっています。
ここで重要なのは、Queueが失敗してfailed扱いになった場合に、SQSに対して該当のメッセージを削除するロジックが入っていることです。
つまりSQSから見た場合、メッセージは正常に処理されているものとして扱われます。

今までの運用方法を変えたくなかったため、引き続きDLQを利用した仕組みに乗せたいと思いました。
DLQにも再送の仕組みがあるので失敗したJobの再実行は可能ですし、死活監視などもDLQを対象にCloudWatch Alarmを仕込んでいるので引き続き同じ仕組みにしたい意図がありました。

そこで今回はQueueServiceProviderを独自に上書きして登録するように変更しました。

app/Providers/Sqs/QueueServiceProvider.php

<?php
declare(strict_types=1);

namespace App\Providers\Sqs;

use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Queue\QueueServiceProvider as PackageQueueServiceProvider;

class QueueServiceProvider extends PackageQueueServiceProvider
{
    protected function registerSqsConnector($manager): void
    {
        $manager->addConnector('sqs', function () {
            return new SqsConnector;
        });
    }


    protected function registerFailedJobServices(): void
    {
        // FYI SQSを利用する際はfailed_jobを利用しないでDead Letter Queueを使う
        $this->app->singleton('queue.failer', function () {
            return new NullFailedJobProvider;
        });
    }
}

IlluminateのQueueServiceProviderクラスを継承して上書きしたいメソッドのみ変更しています。
まず、SQSコネクタを独自のクラスに変更しています。
そしてfailed_jobが不要になるのでNullFailedJobProviderを利用すようにしています。

app/Providers/Sqs/SqsConnector.php

<?php
// 略
class SqsConnector extends PackageSqsConnector
{
    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if (! empty($config['key']) && ! empty($config['secret'])) {
            $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
        }

        return new SqsQueueWithDLQ(
            new SqsClient($config),
            $config['queue'],
            $config['prefix'] ?? '',
            $config['suffix'] ?? '',
            $config['after_commit'] ?? null
        );
    }
}

SQSコネクタもProvider同様にIlluminateのライブラリを継承して必要なメソッドのみ上書きします。
ここではSqsQueueWithDLQクラスを使うように変更しています。

app/Providers/Sqs/SqsQueueWithDLQ.php

<?php
// 略
class SqsQueueWithDLQ extends SqsQueue
{
    public function pop($queue = null)
    {
        $response = $this->sqs->receiveMessage([
            'QueueUrl' => $queue = $this->getQueue($queue),
            'AttributeNames' => ['ApproximateReceiveCount'],
        ]);

        if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
            return new SqsJobWithDLQ(
                $this->container, $this->sqs, $response['Messages'][0],
                $this->connectionName, $queue
            );
        }
    }
}

app/Providers/Sqs/SqsJobWithDLQ.php

<?php
// 略
class SqsJobWithDLQ extends SqsJob
{
    public function fail($e = null): void
    {
        $this->markAsFailed();

        if ($this->isDeleted()) {
            return;
        }

        try {
            // FYI 失敗した場合SQSからメッセージを削除しないで可視性タイムアウトを待つ挙動にする
            // $this->delete();

            $this->failed($e);
        } finally {
            $this->resolve(Dispatcher::class)->dispatch(new JobFailed(
                $this->connectionName, $this, $e ?: new ManuallyFailedException
            ));
        }
    }
}

やりたかったことはSqsJobクラスのfailメソッドで$this->deleteを呼んでいる箇所を消したかっただけです。
これでQueue(Job)が失敗した場合にSQSメッセージを削除しない挙動になります。

最後にProviderの登録を独自のものに差し替えます。

config/app.php

<?php
// 略
// Illuminate\Queue\QueueServiceProvider::class, FYI 独自のProviderを使うためコメントアウト
App\Providers\Sqs\QueueServiceProvider::class,

ECS FargateでScheduleを動かす

LaravelのScheduleといえば任意の時間に特定のコマンドを実行してくれる便利な機能です。

Task Scheduling - Laravel - The PHP Framework For Web Artisans

この機能はphp artisan schedule:runコマンドをcronによって毎分実行し、登録された時間のコマンドを実行してくれるという使い方が一般的です。
しかしECS Fargateでコンテナ化する際に「cronも同梱させるのか?」「リリース時にいい感じに切り替わるのか?」とう懸念を覚えました。

実はこのScheduleの機能、php artisan schedule:workというデーモンコマンドも用意されています。
ドキュメントを見る限りでは開発時に使えるコマンドとして紹介していて、本番運用向けではなさそうです。
実際にコードを見てみるとQueueとは異なりSIGTERM(Linuxの終了シグナル)を検知して安全に終了する仕組みが入っていませんでした。
そのため、安全に切り替わるように独自のScheduleコマンドを作成しました。

<?php
declare(strict_types=1);

namespace App\Console\Commands;

use Carbon\CarbonImmutable;
use Illuminate\Console\Command;
use Symfony\Component\Process\Process;

/**
 * @see https://www.egeniq.com/blog/how-gracefully-stop-laravel-cli-command
 */
class GracefulScheduleWorkCommand extends Command
{
    protected $name = 'schedule:work-graceful';

    protected static $defaultName = 'schedule:work-graceful';

    protected $description = 'Start the schedule worker for graceful';

    private bool $run = true;

    /**
     * Execute the console command.
     *
     * @see https://github.com/laravel/framework/blob/9.x/src/Illuminate/Console/Scheduling/ScheduleWorkCommand.php
     * @return void
     */
    public function handle()
    {
        $this->info('Schedule worker started successfully.');

        [$lastExecutionStartedAt, $keyOfLastExecutionWithOutput, $executions] = [null, null, []];

        if ($this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }

        // NOTE PHPプロセスが実行中もしくはタスクが実行中かどうか
        while (
            $this->run ||
            ! $this->canBeStopped($executions)
        ) {
            usleep(100 * 1000);

            $now = CarbonImmutable::now();
            if (
                $this->run &&
                $now->second === 0 &&
                ! $now->startOfMinute()->equalTo($lastExecutionStartedAt)
            ) {
                $executions[] = $execution = new Process([PHP_BINARY, 'artisan', 'schedule:run']);

                $execution->start();

                $lastExecutionStartedAt = CarbonImmutable::now()->startOfMinute();
            }

           // 略
        }
    }

    private function canBeStopped(array $executions): bool
    {
        /** @var Process $execution */
        foreach ($executions as $execution) {
            // NOTE 実行中のタスクがあれば止められない
            if ($execution->isRunning()) {
                $this->info($execution->getCommandLine() . ': running!');
                return false;
            }
        }
        $this->info('Executions is nothing.');
        return true;
    }

    private function listenForSignals(): void
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGINT, [$this, 'shutdown']); // Call $this->shutdown() on SIGINT
        pcntl_signal(SIGTERM, [$this, 'shutdown']); // Call $this->shutdown() on SIGTERM

        $this->info('Ready to work gracefully.');
    }

    /**
     * @see https://github.com/laravel/framework/blob/9.x/src/Illuminate/Queue/Worker.php#L696-L699
     */
    private function supportsAsyncSignals(): bool
    {
        return extension_loaded('pcntl');
    }

    public function shutdown(): void
    {
        $this->info('Gracefully stopping worker...');

        // When set to false, worker will finish current item and stop.
        $this->run = false;
    }
}

処理自体はIlluminateのScheduleWorkCommandクラスとほぼ同じ実装になり、whileの条件でSIGTERM等を受け付けた場合にfalseになり緩やかに終了するようになっています。
とはいえ、runInBackgroundなどで実行されるコマンドであれば別プロセスで動くはずなのであまり意味はないかもしれませんが、cronで動かす場合でも同じことは言えるのではないかと思います。

移行してみて

今回は初めてECSでLaravelのWorker処理を動かすにあたって色々模索してみました。
似たような事例があまり見られなかったので他の方はどうやって運用しているのか気になります!

また、Queueワーカーに関して言うとSQSのメッセージ数に応じてスケールするようにオートスケール設定を入れたのでコンテナ化の恩恵を非常に感じていますが、Scheduleワーカーは常に高いスペックのコンテナ1台で動かしているのでこれだけEC2インスタンスのほうが良さそうだな・・・と思っている次第です。
LaravelをECSで運用している方、知見交換ぜひお願いします!

では今回の記事はここまで。