TECHSCORE BLOG

クラウドCRMを提供するシナジーマーケティングのエンジニアブログです。

AWS Step Functionsレシピ集

はじめに

AWSで定期的なデータ連携バッチを書く方法の一つにAWS Step Functions(以下Step Functions)があります。Step FunctionsはAWS Lambda(以下Lambda)を始めとするAWSの様々なサービスを組み合わせたワークフローを記述できるサービスです。LambdaおよびStep Functionsはバッチ用のサーバを構築・管理する必要がなく安価にサービスを運用できるため、私のチームでも活用しています。

LambdaおよびStep Functionsにはそれぞれ制約があり工夫が必要となる場合もあります。例えばパイプラインを定義するためにASL(Amazon State Language)というDSLを使いますが、ASLで何ができるかを理解するには時間が掛かります。細かい書き方などはすぐ忘れてしまって、以前書いたのを探すのに苦労したりもします。この記事は主に自分のために、用途別の「レシピ」をまとめようというものです。

西尾 義英(ニシオ ヨシヒデ)
開発から離れていた時期もありましたが最近プロダクトづくりに戻ってきました。
データを活用できる製品・基盤づくりがテーマです。

概要

後の説明のためにStep Functionsで取り扱う概念や用語について説明します。

ワークフロー

Step Functionsではいわゆるパイプライン、つまり複数の処理を有向無循環グラフ(DAG)でつなげたワークフローを書けます。開発者ガイドではワークフローのことを状態遷移機械(state machine)、個々の処理を状態(state)またはタスク(task)と呼んでいます。状態遷移図で書いたワークフローの例を示します。

  • 処理の始まりは「Start At: タスク名」で宣言します
  • 最後のstateには「End: true」という属性を付与します
  • あるstateの直後に起動する stateを「Next」で指定します
  • 「Next」は一つだけ指定できますが、分岐やループなどの制御を実現する特別な stateがあります(下図)
  • 分岐やループの中身が一つ以上の stateをもったサブ状態群となる場合があります

データの流れ

state間の入出力は JSON形式のテキストで、Lambdaとの連携が自然に行なえます。デベロッパーズガイドでは payloadと呼ばれることがあります。デフォルトではstateに入力されたJSONがそのままLambdaに渡り、Lambdaの出力が次のstateに渡ることになりますが、ASLの中でJSONを定義し直したり、JSONPath を使って抽出・変形することもできます。設定が可能な箇所を以下に示します。

基本的な書き方(SAM)

Step FunctionsはAWSコンソールからGUI形式で開発することもできますが、実際の開発ではAWS SAMなどを使ってコードとして記述した方が良いと思われます。SAMで開発する時の書き方を説明します。

  1. 処理を行うLambdaを書く
  2. SAMテンプレートにAWS::Serverless::Functionリソースを定義する
  3. state machineを定義するためのASLをJSONまたはYAML形式で書く
    • 3-1. 2で定義したLambdaのARNは変数置換(substitution)が可能です
  4. template.yamlAWS::Serverless::StateMachineリソースを定義する
    • 4-1. ASLでの変数置換(substitution)を定義します。2のLambdaのARNを参照します。
    • コピペミスや、ARN(!GetAtt リソース名.Arn)ではなく論理名(!Ref リソース名)を書いてしまう誤りが起こりがち
    • 4-2. Lambdaを呼び出せるようなRoleを定義します
# main.asl.yaml
StartAt: Task1

States:  
  Task1:
    Type: Task
    Comment: |
      2.で定義した AWS::Serverless::Function を呼び出します。
    Resource: ${TaskFunctionArn}
    End: true
# template.yaml(一部)
    TaskFunction:  #2.
      Type: AWS::Serverless::Function
      ~省略~
    BatchStateMachine:
      Type: AWS::Serverless::StateMachine
      Properties: statemachines/main.asl.yaml
        DefinitionUri: statemachines/main.asl.yaml
        DefinitionSubstitutions: #4-1.
          TaskFunctionArn: !GetAtt TaskFunction.Arn  #3-1.
        Role: !GetAtt BatchStateMachineRole.Arn  #4-2.
    BatchStateMachineRole:  #4-2.
      Type: AWS::IAM::Role
      Properties:
          AssumeRolePolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: StsAssumeRole
                Effect: Allow
                Principal:
                  Service: states.amazonaws.com
                Action: sts:AssumeRole
          Path: /
          Policies:
          - PolicyName: InvokeLambdas
              PolicyDocument:
                Version: '2012-10-17'
                Statement:
                - Effect: Allow
                  Action:
                    - lambda:InvokeFunction
                  Resource: "*"

Lambda以外のサービスと連携(SNSで通知)

LambdaはResourceにARNを書くだけで連携できますが、Lambda以外のサービスを呼び出すこともできます。以下はSNSで通知を行う例です。

    Publish Success to SNS: 
      Type: Task
      Resource: arn:aws:states:::sns:publish
      Comment: |
        SNSでバッチ処理成功を通知する。
        件名や本文には固定の文字列だけでなく payload(の一部)を指定することもできる。
      Parameters: 
        Subject: Batch job succeeded
        Message: $.result
        TopicArn: $SuccessTopicArn
      End: true

State Machineの分割

状態遷移の数に制限があったり、ASLが長すぎて読みづらい、複数のstateで発生する例外をまとめて一箇所でキャッチしたいなど、いくつかの理由からASLを分割してサブルーチンのように呼び出す場合があります。

# main.asl.yaml
  Process Purchases:
    Type: Task
    Resource: arn:aws:states:::states:startExecution.sync  # StateMachineを同期的に呼び出す組み込みTask。後ろの`.sync`がポイント
    Parameters:
      StateMachineArn: ${PurchasesStateMachineArn}  # 呼び出すStateMachineのARN
      Input:  # StateMachineにわたすPayload
        updated.$: $.updated
        from.$: $.from
        sales_detail.$: $.keys.sales_detail
    ResultPath: null  # StateMachineからのoutputは入れ子が深く使いにくいので破棄してしまう
    Catch:  # 呼び出したStateMachineのどこかで例外が発生したらここでまとめてキャッチできる
      - ErrorEquals: ["States.TaskFailed"]
        Next: Publish Failure to SNS
    End: true

StateMachineからStateMachineを呼び出すにはStateMachineのRoleにポリシーが必要です。詳細は https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/stepfunctions-iam.html に説明されています。

入出力

{Input|Output}Pathの指定方法

以下のパターンを使い分ければおおよそ大丈夫かと思います。詳細はリファレンスを参照。

加工前 Path 加工後(Python object)
{"input": "somevalue"} "$" {"input": "somevalue"}
{"input": "somevalue"} null None
{"input": "somevalue"} "$.input" "somevalue"
{"parent": {"child1": "v1", "child2": "v2"}} "$.parent.child1" "v1" | | |
[1, 2, 3] "$[0]" 1

ResultPathの意味

InputPathやOutputPathの指定は、入力されたJSONのどの要素を取り出すかを意味しますが、ResultPathは、stateから出力されるJSONのどこに結果を挿入するかを指定します。以下の3つの使い方があります。

  1. ResultPath: $
    • $はJSONのroot要素を意味します。つまり入力されたJSONを破棄して結果で上書きします
  2. ResultPath: null
    • 逆に挿入先を指定しないということは、結果を破棄し入力されたJSONをそのまま出力する意味です
  3. 入力されたJSONの要素に結果をマージする
    • ResultPath: $.resultは、入力されたJSONに"result"という要素を追加します。

3の入力例

{
  "job_id": 1,
  "process_at": "2022-04-01T00:00:00+09:00"
}

3の出力例

{
  "job_id": 1,
  "process_at": "2022-04-01T00:00:00+09:00",
  "result": {
    "status": 200,
    "key": "processed.json"
  }
}

{Parameters|ResultSelector}の指定方法

出力したいJSONをそのまま書けばよいのですが、入力されたJSONを参照する時だけプロパティ名の末尾に.$を付けるのがポイントです。

同じく詳細は https://states-language.net/spec.html#payload-template を参照。

# 指定例
  Parameters:
    flagged: true
    parts:
      first.$: $.vals[0]
      last3.$: $.vals[3:]

同期的な制御

ファイルの分割処理

Map stateは、JSON配列の要素ごとの処理を記述することができます。可能な場合は各タスクは並行に実行されるので、ループと言うよりMap-ReduceのMapに相当します。Mapへの入力が [1, 2, 3]の場合、Map下のIterator先頭のstateは1, 2, 3をそれぞれ受け取ります。他のパラメータが必要な場合、ItemsPathを使って配列を参照するJSONPathを指定するとともに、MapのParametersに子のTaskが受け取るJSONを定義することができます。

よくある使い方は、Lambdaのメモリまたは実行時間の制約のためデータを分割処理する場合です。

# ファイルの分割処理
    1.Split Users in Files:
      Type: Task
      Resource: ${SplitUsersInFilesFunctionArn}
      Comment: |
        入力されたファイルを固定行数で分割してS3に保存。
        input:
          key_users(str): 処理するファイルを取り出すS3キー
        output:
          s3_keys(array of str): 分割したファイルのS3キー
      ResultPath: $.s3_keys # <-- ここに配列を入れる
      Next: 2.Import Users
    }
    2.Import Users:
      Type: Map
      ItemsPath: $.s3_keys # ここから配列を取り出す
      Parameters:
        job_id.$: $.job_id  # バッチ全体の共通変数
        s3_key.$: $$.Map.Item.Value  # Iteratorが受け取る配列要素(ItemsPathで指定)を参照する
      ResultPath: null # 実行結果は捨てる
      MaxConcurrency: 1  # 並行実行させない場合単なるループ処理
      Next: 3.Another process
      Iterator: 
        StartAt: Load Database
        States: 
          Load Database: 
            Type: Task
            Resource: ${LoadDatabaseFunctionArn}
            Comment: |
              入力されたファイル一つをデータベースにロードする。
              input:
                job_id(int): ジョブID
                s3_key(str): 処理するファイルを取り出すS3キー
              output:
                なし
            End: true
    3.Another process: # 省略

固定回ループする

ItemPathは、State間で受け渡されるJSONの一部を指定することができるのみで、固定要素の繰り返しを書くことができませんが、Passを組み合わせると実現できます。

  Enumerate Reset Data:
    Type: Pass
    Comment: |
      ループ変数を生成
      input:
        job_id(int): ジョブID
      output:
        types(array of str): 処理するデータの種類
    Result: ["customers", "purchases", "purchase_items"]
    ResultPath: $.types  # 上の配列をこのPathで参照できるようにする
    Next: Reset Data

  Reset Data:
    Type: Map
    ItemsPath: $.types  # ループで回したい要素
    Parameters:
      job_id: $.job_id
      type.$: $$.Map.Item.Value
    ResultPath: null # 結果は捨てる
    End: true
    Iterator:
      StartAt: Revert Data
      States:
        Revert Data:
          Type: Task
          Resource: ${RevertDataFunctionArn}
          Comment: |
            指定された種類のデータをリセットする
            input:
                job_id(int): ジョブID
                type(str): 処理するデータの種類
            output:
                なし
          ResultPath: null
          End: true

カウンタを使ったループ

Stepfunctionsで Choiceが分岐を書けるのでループを実現することはできますが、複雑なので基本的には Mapを使うことを考えたほうが良いと思います。 https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/tutorial-create-iterate-pattern-section.html

入力が空だったら終了

  Concatenate:
    Type: Task
    Resource: ${ConcatenatePurchasesFunctionArn}
    Comment: |
      日次に分割されたファイルを結合する
      input:
        job_id(int): ジョブID
        keys(array of str): ファイルを取り出すS3のキー
      output:
        key(str | null): 結合した結果を保存したS3のキー。入力されるファイルが0件ならnull
    ResultPath: $.key
    Next: Test Empty Data  # 処理するデータが空なら終了させたい

  Test Empty Data:
    Type: Choice  # 分岐を書く
    Choices:
      - Variable: $.key  # 評価した値のPath
        IsNull: true  # 評価式
        Next: Return If Empty  # 当てはまった場合の遷移先
    Default: Convert Date  # 全ての Choice要素 の評価が失敗した場合の遷移先

  Return If Empty: { Type: Succeed }  # 終端のState。Succeedは何もせずここで状態遷移が正常に終了したことを表す

Lambdaの実行結果で分岐する

Step FunctionsはLambdaの例外をキャッチして分岐することもできますが、Lambdaの出力結果で処理を継続するか失敗させるかを判断したい場合はChoice stateで分岐を書くこともできます。

    Tell Status :
      Type: Task
      Resource: ${TellStatusFunctionArn}
      ResultPath: $.result # {statusCode: 200} または {statusCode: 500}
      Next: Check Status

    Check Status: 
      Type: Choice
      Choices:
        - Variable: $.result.statusCode <-- ここで取り出す
          NumericEquals: 500
          Next: FailState
      Default: SuccessState

    SuccessState: { Type: Succeed }
    FailState: { Type: Fail }

非同期処理

子のStateMachineの完了を待たない

別のStateMachineを呼び出す方法については「State Machineの分割」で説明しました。子のStateMachineの完了を待ちたい場合が多いですが、待たずに終了してしまうこともできます。

  Delegate:
    Type: Task
    Resource: arn:aws:states:::states:startExecution  # 非同期実行
    Parameters:
      StateMachineArn: ${IntegrationStateMachineArn}
      Input:
        updated.$: $.updated
        job_id.$: $.job_id
    End: true

SNSやEvent、SQSから復帰する

StateMachineから明示的にLambdaやStateMachineを起動せずにSNSなどで間接的に処理を起動する場合、普通はそのまま制御が戻ってきませんが、必要であればコールバックしてもらうことができます。 https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/callback-task-sample-sqs.html に解説されていますが、Stepfunctionsから制御の外れてしまったLambdaからトークン付きのコールバックを待ち受けることができます。

実装例

初期化バッチが特定のS3バケットにデータを投げると、自動的に取り込み処理が走ります。取り込みが完了すると初期化バッチに処理が戻り、別の処理を続けます。少々複雑な例ですが、トークンを複数のLambda間でリレーできます。なお、LambdaのコードはPython3.8ランタイム向けのものです。

# main.asl.yaml
StartAt: Upload Datalake
States:
  Upload Datalake:
    Type: Task
    Comment: |
      S3バケットにファイルをアップロードする。
      アップロードした後はバケットのOnPlaceイベントで別のLambdaが呼び出される。
      そちらの処理が終わるとtask_tokenをもってコールバックされて次のTaskへ遷移。
    Resource: arn:aws:states:::lambda:invoke.waitForTaskToken
    HeartbeatSeconds: 60  # https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-to-resource.html#wait-token-hearbeat
    Parameters:
      FunctionName: ${UploadFunctionArn}
      Payload:
        task_token.$: $$.Task.Token
    Next: Do Something
    
    # 省略
  BucketXInput:
    # ファイルを置くバケット
    Type: AWS::S3::Bucket
    Properties:
      NotificationConfiguration:
        TopicConfigurations:
          - Event: 's3:ObjectCreated:Put'
            Topic: !Ref TopicXOnPlaceFile

  TopicXOnPlaceFile:
    # 通知内容
    Type: AWS::SNS::Topic

  FunctionPermissionXOnPlaceFile:
    # SNSきっかけでLambdaを起動する場合に必要
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:* #InvokeFunction
      FunctionName: !GetAtt FunctionXOnPlaceFile.Arn
      Principal: sns.amazonaws.com
      SourceArn: !Ref TopicXOnPlaceFile

 SubscriptionXOnPlaceFile:
    # SNS::TopicにLambdaをひもづける
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt FunctionXOnPlaceFile.Arn
      Protocol: lambda
      TopicArn: !Ref TopicXOnPlaceFile

  FunctionXOnPlaceFile:
    # ファイルが設置されたときに呼ばれる関数
    Type: AWS::Serverless::Function
    Properties:
        CodeUrl: src/
        Handler: on_place.lambda_handler
      # 省略

  InvokeConfigXOnPlaceFile:
    # FunctionXOnPlaceFileの実行が成功・失敗した場合にSNS通知する設定
    Type: AWS::Lambda::EventInvokeConfig
    Properties:
      FunctionName: !Ref FunctionXOnPlaceFile
      MaximumRetryAttempts: 0
      DestinationConfig:
        OnSuccess:
          Destination: !Ref TopicXOnSuccess
        OnFailure:
          Destination: !Ref TopicXOnFailure  # 以下省略
      Qualifier: "$LATEST"

  TopicXOnSuccess:
    Type: AWS::SNS::Topic

  SubscriptionXOnSuccess:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt FunctionXOnResult.Arn
      Protocol: lambda
      TopicArn: !Ref TopicXOnSuccess

  FunctionXOnResult:
    # 関数の処理が成功あるいは失敗したときに呼ばれる関数
    Type: AWS::Serverless::Function
    Properties:
        CodeUrl: src/
        Handler: on_result.lambda_handler
        # 省略
# src/main.py
def lambda_handler(event: LambdaEvent, context: LambdaContext) -> None:
    result = parse_sns(event)
    print("do something")
    if result.task_token is None:
        return  # 常にstateMachineに戻る必要はない

    try:
        # TODO 失敗時にsend_task_failureしたほうが良い
        sfn_client = boto3.client("stepfunctions")
        sfn_client.send_task_success(
            taskToken=result.task_token,
            output=result.to_json()
        )
    except Exception as e:
        print(e)


def parse_sns(event: LambdaEvent) -> ResultOutput:
    # ResultOutput は、on_place_file Lambdaの戻り値をラップしたdataclass
    record = event['Records'][0]
    message = json.loads(record['Sns']['Message'])
    payload = message["responsePayload"]
    return ResultOutput.from_dict(payload)

コールバックするための権限が必要です。on_result lambda のRoleに以下Policyを追加します。

  - PolicyName: StateMachineCallback
     PolicyDocument:
       Version: '2012-10-17'
       Statement:
          - Effect: Allow
            Action:
              - states:SendTaskSuccess
            Resource: "*"

おわりに

私たちがプロジェクト内で使っているパターンはおおよそこんなところです。最後にStep Functionsを使うときのポイントをまとめます。

  • ASLはYAMLで書くのがおすすめ
    • カッコやクォーテーションを書く量が減る
    • 行コメントも書ける
  • 繰り返し処理は、まず Map の利用を考える
    • Mapはデフォルト並列実行しようとするので注意
  • 大きくなってきたらStep Functionsを分割する
    • 見通しを良くするため
    • 状態遷移数の上限に引っかかることもある
  • Payloadの加工を良く間違うので、ワークフローを流れるJSONの構造を先に決めておくと良い
    • pydanticを使ってタスク間で共有するJSONをPythonのクラスに対応付けておき、Lambda間で共有する
    • この方法がまとまったらまた記事を書くかもしれません
  • 複雑なワークフローを組む場合、Lambdaで時間がかかる処理を書く前にTask間の入出力を確かめておくと良い
    • Pass stateを使って状態遷移だけ作っておく
    • 中身のない入出力だけのLambdaを定義する