概要
ソーシャル経済メディア「NewsPicks」SREチームの中川です。
皆さんはバッチ処理基盤はどうされていますでしょうか。 NewsPicks では少し前まではそれらをEC2、cronの組み合わせで動作させていました。
何年も前からこの仕組みだったのですがSREとしてはEC2の面倒見るのも手間ですし、それ以上にcronを変更する際のオペレーションミスが目立ったのが懸念点でした。
その為、まずはAWSマネージド化するための基盤を整備し、その後バッチアプリを載せ替えていくようにしました。
対応前の基盤構成
同じSREチームの安藤さんが CloudNative Days Tokyo 2023 で登壇されたときの資料をお借りします。 ご覧の通り、大体のサービスはマネージド化していましたがバッチ基盤だけは旧来のままEC2インスタンスを利用していました。
10年モノのサービスのインフラを漸進的に改善する、頑張りすぎないクラウドネイティブ | ドクセル
対応後の基盤構成
これも上記資料からスライドをお借りします。 マネージド化対応後はEC2を廃止し、StepFunctions + ECS on Fargate + EventBridge Rule に載せ替えることができました。
EventBridge Ruleでcron定義できるのでこれをトリガーとしてStepFunctionsを起動し、その中でECS RunTask を実行することでバッチ処理が実行できます。
実装
それではサービス毎に見ていきますが、NewsPicks ではIaC化を推進しておりマネージド化の際には AWS CDK(Typescript) を用いてコード化していきましたので適宜それらも例示しながら解説していきます。
ECS
定時バッチ用のタスク定義を作成
もともと定時バッチについては共通実行ファイルを持っており、実行の際は引数を渡して各々のバッチを呼び出せる様になっていました。 バッチ側には極力変更を入れないようにするため、ECSタスクとして実行するときも同じようにバッチ毎に実行コマンドを引数として渡せる設計にしました。
それ以外は他のアプリケーションと同様にタスク定義を作ってるだけです。 タスク定義ではCPU/Memory、コマンド引数、JAVA_OPTS用の環境変数とかを個別に渡せる様にしております。
StepFunctions
ステートマシン定義
以下の様に定義しています。
ロジックとしては以下の3つに大別できるので解説していきます。
- 二重起動防止
- ECS RunTask
- エラーハンドリング
1. 二重起動防止
定時バッチには複数起動するとバグに繋がるものがあるので基本的には一つだけしか動かない様にしています。
Describe Scheduled Task
ではDescribeTasks API を使って、動かそうとしているバッチが既に起動しているかの結果を取得します。
Verify Describe Task
では上記結果を見て既に起動していれば何もせず終了し、起動していなければ実行に入ります。
2. ECS RunTask
StateMachine で ECS RunTask を呼び出すには CustomState を使いました。 普通はL2コンストラクトで用意されているEcsRunTaskクラスを使えば良いのですが、執筆時点では既存ECSタスク定義をそのまま使おうとするのはできないみたいですね。
Note: this must be TaskDefinition, and not ITaskDefinition, as it requires properties that are not known for imported task definitions If you want to run a RunTask with an imported task definition, consider using CustomState
なので CustomState を使う事にします。パラメータを柔軟に指定できるのは良いのですが記述量が多くなるのが難点でした。 その一部を以下に示します。
const runTaskStateParam = { Type: "Task", Resource: "arn:aws:states:::ecs:runTask.sync", ResultPath: "$.results.ECSRunTask", TimeoutSeconds: defaultTimeoutHours * 60 * 60, // ECS起因で失敗したときはリトライ処理させる Retry: [ { ErrorEquals: ["ECS.AmazonECSException", "ECS.ServerException"], IntervalSeconds: 60, MaxAttempts: 3, }, ], // StateMachine の中で発生したエラーは失敗ステートに遷移させる Catch: [ { ErrorEquals: ["States.~~~"], Next: failedState.id, ResultPath: "$.results.ECSRunTaskFailed", }, ], Parameters: { LaunchType: "FARGATE", // 中略(ECSのデプロイ設定) PropagateTags: "TASK_DEFINITION", // バッチ個別の設定をオーバライドする Overrides: { Cpu: task.cpu, Memory: task.memory, ContainerOverrides: [ { Name: "batch", // バッチ名(タスク名)を指定し起動させる、引数ある場合はそれも渡す Command: [task.name, ...(task.args ?? [])], }, ], }, }, }; const RunTaskState = new sfn.CustomState(this, "ECS RunTask.", { stateJson: runTaskStateParam, });
要点だけ絞って話すと
Resource: "arn:aws:states:::ecs:runTask.sync",
では同期的にタスクを起動してるのがわかります。
その後の Timeout, Retry, Catch は後述するので飛ばして Parameters
では実行パラメータを指定していきます。
実行はFargateで PropagateTags
ではタスク定義にあるタグをECSタスクに伝播させています。
Overrides
ではバッチ処理による個別設定があればそれを上書きしていきます。
具体的にはコマンド引数、使用リソース量とかですね。
最後に new sfn.CustomState
でステートを定義してます。
3. エラーハンドリング
まず気になるのが Parse Exit Code.
って何をしているのかですが、これは ECS RunTask のコンテナ毎の終了コードを見ています。
バッチ処理では1つのタスクで複数コンテナを起動しているので、それらコンテナの ExitCode
が 0 で終わっているかを見ています。
const ParseExitCodeState = new sfn.Pass(this, "Parse Exit Code.", { resultPath: "$.results.parseExitCode", parameters: { // areIncludeExitCodesZero は true が期待値 "areIncludeExitCodesZero.$": "States.ArrayContains($.results.ECSRunTask.Containers[*].ExitCode, 0)", // uniqueExitCodesArrayLength は 1 が期待値 "uniqueExitCodesArrayLength.$": "States.ArrayLength(States.ArrayUnique($.results.ECSRunTask.Containers[*].ExitCode))", }, });
Error Choice.
の部分では上記の結果を判定し、適切なステートへ遷移させます。
問題なければ Succeed
に飛んで処理終了です。
コンテナ終了コードだけだとエラーハンドルとして心許ないので、先述したRunTaskのエラー処理についてもみていきます。 肝となる部分は上記で示した、TimeoutSeconds、Retry、Catchの部分ですね。
TimeoutSeconds: defaultTimeoutHours * 60 * 60, // ECS起因で失敗したときはリトライ処理させる Retry: [ { ErrorEquals: ["ECS.AmazonECSException", "ECS.ServerException"], IntervalSeconds: 60, MaxAttempts: 3, }, ], // StateMachine の中で発生したエラーは失敗ステートに遷移させる Catch: [ { ErrorEquals: ["States.~~~"], Next: failedState.id, ResultPath: "$.results.ECSRunTaskFailed", }, ],
まずタイムアウトについては TimeoutSeconds
を指定していますが、これはECSタスクが正常終了せずに残存し続ける可能性があり、その場合でも StateMachine としてはタイムアウトで落ちて欲しいので明示してます。これはバッチ実行時間を計測されて個別に設定される方が良いかと思います。
Retry
についてはリトライする条件や試行回数などを指定できます。ここではECS起因のエラーは偶に起きるとの想定でキャッチしています。
Catch
についてはコメントに書いてある通り 任意のState句でキャッチしたとすれば失敗ステートへ遷移することになっています。
EventBridge
次はイベントルールを登録しましょう。 上記で定義した StateMachine に対してスケジュール登録するだけです。 これもCDKで良い感じにEventBridgeRuleを定義できます!
schedule
プロパティでは cronをベタ書きすることもできますが是非とも CronOptions を使うのをお薦めします。ベタ書きだと月曜と日曜どっちが0だっけとか、分/時間/日のフィールド間違えとか経験上 typo も結構ありましたのでそれを防止できます。
import * as events from "aws-cdk-lib/aws-events"; import * as targets from "aws-cdk-lib/aws-events-targets"; new events.Rule(this, "EventRule", { ruleName: "ruleName", schedule: events.Schedule.cron({ hour: "1", minute: "0" }), targets: [new targets.SfnStateMachine(this.stateMachine)], });
これで一通りの設定は完了です!
まとめ
これら基盤構築と、実際に処理を載せ替えていく工数は結構かかってしまいましたが、それ以上の恩恵は受けられています。 コスト的にはEC2をずっと立てる必要なく、スポットインスタンスも利用でき、タスク毎にCPU/Memory割り振れるなど柔軟に対応できるようになりました。
他には最初に述べた通り、EC2管理のトイル、オペレーションミスの根本的撲滅でしょうか。 またコード化したことで変更が容易になり、担当チームが自発的にバッチ処理改善に取り組んでくれることになったことも大きいです。
告知
NewsPicks ではエンジニアを募集中です!ご興味のある方はこちらまで。