ハマログ

株式会社イーツー・インフォの社員ブログ

AWS CDKで作った定期的なタスク起動がたまに失敗するのでStep Functionsを導入したお話

昔々あるところに、毎日起動しているはずのAmazon ECSのタスクが起動しなかったことがありました。この記事ではその原因と対策について記します

TL;DR

要点は次の3つです。AWS CDKのサンプルコードは記事後半にあります。

  • AWS基盤のキャパシティ不足が原因で、Fargateのタスク起動に失敗することがある
  • EventBridgeから直接起動するときは、起動失敗時にもリトライされない
  • 対策として、AWS Step Functionsを使って失敗時にリトライするようにすると良い

 

背景とシステム概要

とあるシステムで毎朝8時に送信されるはずのメールがなぜか送信されない、ということがありました。今回は関係なかったアプリケーション部分も含めましたが、概略図がこちらです。

メール送信は、EventBridge SchedulerでAWS ECSのタスク起動を設定することで実現していました。AWS CDKを使っているので、実際にはScheduledFargateTaskクラスをベースにしたコードです。AWS マネジメントコンソールで「スケジュールされたタスク」を作成したときとほぼ同じはずです。ちなみにこのタスク起動については、今年の6月に次の記事を書いてます。コードもあるので、ご参考ください

AWS CDKを活用して既存Webサービスをコンテナ化・インフラ刷新した案件の事例紹介

原因

調べてみると、AWS CloudWatch Logsに該当するFargateのログが見当たりませんでした。そもそもバッチ処理が起動した形跡がなかったので、その旨をAWSのサポートに相談したところ、

結論としては、タスク起動時にの AWS 基盤側のキャパシティ不足が原因。回避策としてStep Functionsを使ってタスクを起動するとよい。

■ 原因について
CloudTrailのイベント履歴を見ると、RunTask APIの呼び出し履歴があり、”Capacity is unavailable at this time. Please try again later or in a different availability zone”というエラーが記録されている。一時的にキャパシティが不足し、タスクがプロビジョニングできなかったことを示している。

■ 対策について
この対策としては、Fargate タスク実行をリトライするように変更する。タスク実行に失敗したとしても、RunTask API の呼び出し自体は正常終了しているため EventBridge 側でのリトライは行われない。そのため、EventBridge ルールから直接 RunTask を呼び出す形を、EventBridge ルールから Step Functions を呼び出して、Step Functions から RunTask を呼び出す形に変更するとよい。

<参考資料>
Step Functions で Amazon ECS または Fargate タスクを管理する
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-ecs.html

(外部サイト) Step Functions で ECS タスク(Fargate)を定期実行してみる
https://dev.classmethod.jp/articles/try-to-retry-ecs-tasks-using-step-functions/

という上記返答(要約)をいただきました。改めて調べてみると、この回答内容が的確だったので、案内いただいた Step Functions を使った方法に変更しました。

改善対応

EventBridgeからそのままタスク起動するだけだと稀に失敗することがあるため、間にStep Functionsを挟むことで失敗時にリトライできるようにします。対策の前後での構成図がこちらです。

この改善策をAWS CDK (v2)で実装すると、次のコードになりました。サンプルコードでは、alpine のコンテナイメージで echo "Hello" しているだけです。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class StepfunctionsSampleCdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const vpc = new cdk.aws_ec2.Vpc(this, 'VPC', {
      natGateways: 1,
      maxAzs: 2,
    });

    const fargateSecurityGroup = new cdk.aws_ec2.SecurityGroup(this, 'SecurityGroup', {
      vpc,
      allowAllOutbound: true,
    });

    const logGroup = new cdk.aws_logs.LogGroup(this, 'LogGroup', {
      retention: cdk.aws_logs.RetentionDays.ONE_WEEK,
    });

    const cluster = new cdk.aws_ecs.Cluster(this, 'Cluster', {
      vpc,
    });

    // ECSのタスク定義(適当なDockerイメージ)
    const taskDefinition = new cdk.aws_ecs.FargateTaskDefinition(this, 'TaskDefinition', {
      memoryLimitMiB: 512,
      cpu: 256,
    });
    taskDefinition.addContainer('app', {
      image: cdk.aws_ecs.ContainerImage.fromRegistry('public.ecr.aws/docker/library/alpine:latest'),
      command: ['echo', 'Hello World'],
      // コンテナからCloudWatch Logsにログを出力
      logging: cdk.aws_ecs.LogDrivers.awsLogs({
        streamPrefix: 'StepFunctionTest',
        logGroup,
      }),
    });

    // 定期起動でエラー時に実行するAWS Lambda
    const alertLambda = new cdk.aws_lambda.Function(this, 'AlertLambda', {
      runtime: cdk.aws_lambda.Runtime.NODEJS_20_X,
      handler: 'index.handler',
      code: cdk.aws_lambda.Code.fromInline(`
        exports.handler = async (event) => {
          // eventをパースしてSlackに通知などする。今回はログ出力のみ
          console.log(event);
          return;
        };
      `),
    });

    // EventBridge & Step Functions でタスクの定期起動を設定
    new ScheduledStepFunctionsFargateTask(this, 'ScheduledFargateTask', {
      ruleName: 'run-once-a-hour',
      schedule: cdk.aws_events.Schedule.expression('cron(0 * * * ? *)'),
      cluster,
      taskDefinition,
      containerOverrides: [
        {
          containerDefinition: taskDefinition.defaultContainer!,
          command: ['echo', 'This is scheduled task as run-once-a-hour'],
        },
      ],
      securityGroups: [fargateSecurityGroup],
      alertLambda,
    });
  }
}


type ScheduledStepFunctionsFargateTaskProps = {
  ruleName: string;
  schedule: cdk.aws_events.Schedule;
  enabled?: boolean;
  cluster: cdk.aws_ecs.ICluster;
  taskDefinition: cdk.aws_ecs.FargateTaskDefinition;
  containerOverrides: cdk.aws_stepfunctions_tasks.ContainerOverride[];
  securityGroups: cdk.aws_ec2.ISecurityGroup[];
  alertLambda: cdk.aws_lambda.Function;
};

class ScheduledStepFunctionsFargateTask extends Construct {
  constructor(scope: Construct, id: string, props: ScheduledStepFunctionsFargateTaskProps) {
    super(scope, id);
    const { ruleName, schedule, enabled, cluster, taskDefinition, containerOverrides, securityGroups, alertLambda } = props;

    // Step Functionsを定義
    const runTask = new cdk.aws_stepfunctions_tasks.EcsRunTask(this, 'RunTask', {
      cluster,
      taskDefinition,
      containerOverrides,
      subnets: {
        subnetType: cdk.aws_ec2.SubnetType.PRIVATE_WITH_EGRESS, // プライベートサブネットで実行
      },
      launchTarget: new cdk.aws_stepfunctions_tasks.EcsFargateLaunchTarget({
        platformVersion: cdk.aws_ecs.FargatePlatformVersion.VERSION1_4,
      }),
      resultPath: "$.RunTask",
      securityGroups,
      integrationPattern: cdk.aws_stepfunctions.IntegrationPattern.RUN_JOB, // ".sync"
    });

    const definition = runTask.addRetry({
      maxAttempts: 3,
      backoffRate: 2,
      interval: cdk.Duration.seconds(1),
    }).addCatch(
      new cdk.aws_stepfunctions_tasks.LambdaInvoke(this, 'NotifyError', {
        lambdaFunction: alertLambda,
        payload: cdk.aws_stepfunctions.TaskInput.fromObject({
          Response: cdk.aws_stepfunctions.JsonPath.stringAt('$'),
          Execution: cdk.aws_stepfunctions.JsonPath.stringAt('$$.Execution'),
        }),
      }),
    );

    const stateMachine = new cdk.aws_stepfunctions.StateMachine(this, 'StateMachine', {
      definition,
      stateMachineName: `${ruleName}-state-machine`,
    });

    // EventBridgeのルールを定義
    const rule = new cdk.aws_events.Rule(this, 'Rule', {
      ruleName,
      schedule,
      enabled,
    });
    
    rule.addTarget(new cdk.aws_events_targets.SfnStateMachine(stateMachine));
  }
}

StepfunctionsSampleCdkStackではVPCやCloudWatchの準備などをして、ScheduledStepFunctionsFargateTaskに引数 (props)を渡しています。そのため、このコードのポイントは、Step Functionsを構築しているScheduledStepFunctionsFargateTaskの部分です。

runTask変数が「ECSのタスク起動」をするStep Functionsのタスクにあたります。これに .addRetry で失敗時のリトライと addCatch でさらに失敗時のエラー処理を追加して、Step Functionsのステートマシンの定義として設定しています。

ステートマシンの定義が一見分かりづらいかもしれませんが、AWSマネジメントコンソールで見るとシンプルな構成なのが分かります。JSONでステートマシンを定義することもできますが、このCDKのメソッドチェーンの方は入力補完が効くので、こちらの方が読み書きしやすかったです。

また、alertLambda変数にエラー通知用のAWS Lambdaを作成していて、ステートマシン定義の addCatch の呼び出し先として使っています。サンプルでの中身はNode.jsでconsole.logするだけですが、実案件ではエラー内容を頑張ってパースしてChatworkにこのような通知が来るようにしました。Lambda関数の中身については、別途機会があれば紹介します。

ちなみに、エラー通知は Amazon SNS(Simple Notification Service)を使うのが定石かもしれません。Lambdaで独自コードを書くよりも確実に通知が届きますし、そのSNSからLambdaを呼び出すこともできます。今回はシンプルな構成にしたかったのと、リトライ機構があるのでエラー発生はほぼないはずという見込みでAmazon SNSは使っていません。

おわりに

つい先日の re:Invent 2023 で「Step Functionsの”Call third-party API”という新機能で、外部APIを直接呼べるようになる」という発表(参考:ブログ)がありました。それと合わせてStep Functionsがデバッグしやすくする機能追加もあります。

私はこの件で初めてStep Functionsを使ったのですが、AWS CDKでもステートマシンを定義できますし、マネジメントコンソールでも見やすく、かなり便利そうです。

  koni   2023年12月5日


関連記事

NetbeansのFormatをPSRに準拠させる

こんにちは、かねこです。 いま挑戦しているPHPのプロジェクトで、NetBean…

集計処理の実装

大分間が空いてしまいましたが、前回の キーブレークを使った集計の実装を行ってみま…

AWSのRDSをアップグレードのためテスト

こんにちはいけしまです。たてつづけにブログ投稿。 もうひとつ前のブログで書いたR…


← 前の投稿

次の投稿 →