はじめに
AWSで定期的なデータ連携バッチを書く方法の一つにAWS Step Functions(以下Step Functions)があります。Step FunctionsはAWS Lambda(以下Lambda)を始めとするAWSの様々なサービスを組み合わせたワークフローを記述できるサービスです。LambdaおよびStep Functionsはバッチ用のサーバを構築・管理する必要がなく安価にサービスを運用できるため、私のチームでも活用しています。
LambdaおよびStep Functionsにはそれぞれ制約があり工夫が必要となる場合もあります。例えばパイプラインを定義するためにASL(Amazon State Language)というDSLを使いますが、ASLで何ができるかを理解するには時間が掛かります。細かい書き方などはすぐ忘れてしまって、以前書いたのを探すのに苦労したりもします。この記事は主に自分のために、用途別の「レシピ」をまとめようというものです。

データを活用できる製品・基盤づくりがテーマです。
概要
後の説明のためにStep Functionsで取り扱う概念や用語について説明します。
ワークフロー
Step Functionsではいわゆるパイプライン、つまり複数の処理を有向無循環グラフ(DAG)でつなげたワークフローを書けます。開発者ガイドではワークフローのことを状態遷移機械(state machine)、個々の処理を状態(state)またはタスク(task)と呼んでいます。状態遷移図で書いたワークフローの例を示します。
- 処理の始まりは「Start At: タスク名」で宣言します
- 最後のstateには「End: true」という属性を付与します
- あるstateの直後に起動する stateを「Next」で指定します
- 「Next」は一つだけ指定できますが、分岐やループなどの制御を実現する特別な stateがあります(下図)
- 分岐やループの中身が一つ以上の stateをもったサブ状態群となる場合があります
データの流れ
state間の入出力は JSON形式のテキストで、Lambdaとの連携が自然に行なえます。デベロッパーズガイドでは payloadと呼ばれることがあります。デフォルトではstateに入力されたJSONがそのままLambdaに渡り、Lambdaの出力が次のstateに渡ることになりますが、ASLの中でJSONを定義し直したり、JSONPath を使って抽出・変形することもできます。設定が可能な箇所を以下に示します。
基本的な書き方(SAM)
Step FunctionsはAWSコンソールからGUI形式で開発することもできますが、実際の開発ではAWS SAMなどを使ってコードとして記述した方が良いと思われます。SAMで開発する時の書き方を説明します。
- 処理を行うLambdaを書く
- SAMテンプレートに
AWS::Serverless::Function
リソースを定義する - state machineを定義するためのASLをJSONまたはYAML形式で書く
- 3-1. 2で定義したLambdaのARNは変数置換(substitution)が可能です
template.yaml
にAWS::Serverless::StateMachine
リソースを定義する- 4-1. ASLでの変数置換(substitution)を定義します。2のLambdaのARNを参照します。
- コピペミスや、ARN(
!GetAtt リソース名.Arn
)ではなく論理名(!Ref リソース名
)を書いてしまう誤りが起こりがち - 4-2. Lambdaを呼び出せるようなRoleを定義します
# main.asl.yaml StartAt: Task1 States: Task1: Type: Task Comment: | 2.で定義した AWS::Serverless::Function を呼び出します。 Resource: ${TaskFunctionArn} End: true
# template.yaml(一部) TaskFunction: #2. Type: AWS::Serverless::Function ~省略~ BatchStateMachine: Type: AWS::Serverless::StateMachine Properties: statemachines/main.asl.yaml DefinitionUri: statemachines/main.asl.yaml DefinitionSubstitutions: #4-1. TaskFunctionArn: !GetAtt TaskFunction.Arn #3-1. Role: !GetAtt BatchStateMachineRole.Arn #4-2. BatchStateMachineRole: #4-2. Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Sid: StsAssumeRole Effect: Allow Principal: Service: states.amazonaws.com Action: sts:AssumeRole Path: / Policies: - PolicyName: InvokeLambdas PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - lambda:InvokeFunction Resource: "*"
Lambda以外のサービスと連携(SNSで通知)
LambdaはResourceにARNを書くだけで連携できますが、Lambda以外のサービスを呼び出すこともできます。以下はSNSで通知を行う例です。
Publish Success to SNS: Type: Task Resource: arn:aws:states:::sns:publish Comment: | SNSでバッチ処理成功を通知する。 件名や本文には固定の文字列だけでなく payload(の一部)を指定することもできる。 Parameters: Subject: Batch job succeeded Message: $.result TopicArn: $SuccessTopicArn End: true
State Machineの分割
状態遷移の数に制限があったり、ASLが長すぎて読みづらい、複数のstateで発生する例外をまとめて一箇所でキャッチしたいなど、いくつかの理由からASLを分割してサブルーチンのように呼び出す場合があります。
# main.asl.yaml Process Purchases: Type: Task Resource: arn:aws:states:::states:startExecution.sync # StateMachineを同期的に呼び出す組み込みTask。後ろの`.sync`がポイント Parameters: StateMachineArn: ${PurchasesStateMachineArn} # 呼び出すStateMachineのARN Input: # StateMachineにわたすPayload updated.$: $.updated from.$: $.from sales_detail.$: $.keys.sales_detail ResultPath: null # StateMachineからのoutputは入れ子が深く使いにくいので破棄してしまう Catch: # 呼び出したStateMachineのどこかで例外が発生したらここでまとめてキャッチできる - ErrorEquals: ["States.TaskFailed"] Next: Publish Failure to SNS End: true
StateMachineからStateMachineを呼び出すにはStateMachineのRoleにポリシーが必要です。詳細は https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/stepfunctions-iam.html に説明されています。
入出力
{Input|Output}Pathの指定方法
以下のパターンを使い分ければおおよそ大丈夫かと思います。詳細はリファレンスを参照。
加工前 | Path | 加工後(Python object) |
---|---|---|
{"input": "somevalue"} |
"$" |
{"input": "somevalue"} |
{"input": "somevalue"} |
null |
None |
{"input": "somevalue"} |
"$.input" |
"somevalue" |
{"parent": {"child1": "v1", "child2": "v2"}} |
"$.parent.child1" |
"v1" | | | |
[1, 2, 3] |
"$[0]" |
1 |
ResultPathの意味
InputPathやOutputPathの指定は、入力されたJSONのどの要素を取り出すかを意味しますが、ResultPathは、stateから出力されるJSONのどこに結果を挿入するかを指定します。以下の3つの使い方があります。
ResultPath: $
$
はJSONのroot要素を意味します。つまり入力されたJSONを破棄して結果で上書きします
ResultPath: null
- 逆に挿入先を指定しないということは、結果を破棄し入力されたJSONをそのまま出力する意味です
- 入力されたJSONの要素に結果をマージする
ResultPath: $.result
は、入力されたJSONに"result"という要素を追加します。
3の入力例
{ "job_id": 1, "process_at": "2022-04-01T00:00:00+09:00" }
3の出力例
{ "job_id": 1, "process_at": "2022-04-01T00:00:00+09:00", "result": { "status": 200, "key": "processed.json" } }
{Parameters|ResultSelector}の指定方法
出力したいJSONをそのまま書けばよいのですが、入力されたJSONを参照する時だけプロパティ名の末尾に.$
を付けるのがポイントです。
同じく詳細は https://states-language.net/spec.html#payload-template を参照。
# 指定例 Parameters: flagged: true parts: first.$: $.vals[0] last3.$: $.vals[3:]
同期的な制御
ファイルの分割処理
Map stateは、JSON配列の要素ごとの処理を記述することができます。可能な場合は各タスクは並行に実行されるので、ループと言うよりMap-ReduceのMapに相当します。Mapへの入力が [1, 2, 3]
の場合、Map下のIterator先頭のstateは1, 2, 3をそれぞれ受け取ります。他のパラメータが必要な場合、ItemsPathを使って配列を参照するJSONPathを指定するとともに、MapのParametersに子のTaskが受け取るJSONを定義することができます。
よくある使い方は、Lambdaのメモリまたは実行時間の制約のためデータを分割処理する場合です。
# ファイルの分割処理 1.Split Users in Files: Type: Task Resource: ${SplitUsersInFilesFunctionArn} Comment: | 入力されたファイルを固定行数で分割してS3に保存。 input: key_users(str): 処理するファイルを取り出すS3キー output: s3_keys(array of str): 分割したファイルのS3キー ResultPath: $.s3_keys # <-- ここに配列を入れる Next: 2.Import Users } 2.Import Users: Type: Map ItemsPath: $.s3_keys # ここから配列を取り出す Parameters: job_id.$: $.job_id # バッチ全体の共通変数 s3_key.$: $$.Map.Item.Value # Iteratorが受け取る配列要素(ItemsPathで指定)を参照する ResultPath: null # 実行結果は捨てる MaxConcurrency: 1 # 並行実行させない場合単なるループ処理 Next: 3.Another process Iterator: StartAt: Load Database States: Load Database: Type: Task Resource: ${LoadDatabaseFunctionArn} Comment: | 入力されたファイル一つをデータベースにロードする。 input: job_id(int): ジョブID s3_key(str): 処理するファイルを取り出すS3キー output: なし End: true 3.Another process: # 省略
固定回ループする
ItemPathは、State間で受け渡されるJSONの一部を指定することができるのみで、固定要素の繰り返しを書くことができませんが、Passを組み合わせると実現できます。
Enumerate Reset Data: Type: Pass Comment: | ループ変数を生成 input: job_id(int): ジョブID output: types(array of str): 処理するデータの種類 Result: ["customers", "purchases", "purchase_items"] ResultPath: $.types # 上の配列をこのPathで参照できるようにする Next: Reset Data Reset Data: Type: Map ItemsPath: $.types # ループで回したい要素 Parameters: job_id: $.job_id type.$: $$.Map.Item.Value ResultPath: null # 結果は捨てる End: true Iterator: StartAt: Revert Data States: Revert Data: Type: Task Resource: ${RevertDataFunctionArn} Comment: | 指定された種類のデータをリセットする input: job_id(int): ジョブID type(str): 処理するデータの種類 output: なし ResultPath: null End: true
カウンタを使ったループ
Stepfunctionsで Choiceが分岐を書けるのでループを実現することはできますが、複雑なので基本的には Mapを使うことを考えたほうが良いと思います。 https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/tutorial-create-iterate-pattern-section.html
入力が空だったら終了
Concatenate: Type: Task Resource: ${ConcatenatePurchasesFunctionArn} Comment: | 日次に分割されたファイルを結合する input: job_id(int): ジョブID keys(array of str): ファイルを取り出すS3のキー output: key(str | null): 結合した結果を保存したS3のキー。入力されるファイルが0件ならnull ResultPath: $.key Next: Test Empty Data # 処理するデータが空なら終了させたい Test Empty Data: Type: Choice # 分岐を書く Choices: - Variable: $.key # 評価した値のPath IsNull: true # 評価式 Next: Return If Empty # 当てはまった場合の遷移先 Default: Convert Date # 全ての Choice要素 の評価が失敗した場合の遷移先 Return If Empty: { Type: Succeed } # 終端のState。Succeedは何もせずここで状態遷移が正常に終了したことを表す
Lambdaの実行結果で分岐する
Step FunctionsはLambdaの例外をキャッチして分岐することもできますが、Lambdaの出力結果で処理を継続するか失敗させるかを判断したい場合はChoice stateで分岐を書くこともできます。
Tell Status : Type: Task Resource: ${TellStatusFunctionArn} ResultPath: $.result # {statusCode: 200} または {statusCode: 500} Next: Check Status Check Status: Type: Choice Choices: - Variable: $.result.statusCode <-- ここで取り出す NumericEquals: 500 Next: FailState Default: SuccessState SuccessState: { Type: Succeed } FailState: { Type: Fail }
非同期処理
子のStateMachineの完了を待たない
別のStateMachineを呼び出す方法については「State Machineの分割」で説明しました。子のStateMachineの完了を待ちたい場合が多いですが、待たずに終了してしまうこともできます。
Delegate: Type: Task Resource: arn:aws:states:::states:startExecution # 非同期実行 Parameters: StateMachineArn: ${IntegrationStateMachineArn} Input: updated.$: $.updated job_id.$: $.job_id End: true
SNSやEvent、SQSから復帰する
StateMachineから明示的にLambdaやStateMachineを起動せずにSNSなどで間接的に処理を起動する場合、普通はそのまま制御が戻ってきませんが、必要であればコールバックしてもらうことができます。 https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/callback-task-sample-sqs.html に解説されていますが、Stepfunctionsから制御の外れてしまったLambdaからトークン付きのコールバックを待ち受けることができます。
実装例
初期化バッチが特定のS3バケットにデータを投げると、自動的に取り込み処理が走ります。取り込みが完了すると初期化バッチに処理が戻り、別の処理を続けます。少々複雑な例ですが、トークンを複数のLambda間でリレーできます。なお、LambdaのコードはPython3.8ランタイム向けのものです。
# main.asl.yaml StartAt: Upload Datalake States: Upload Datalake: Type: Task Comment: | S3バケットにファイルをアップロードする。 アップロードした後はバケットのOnPlaceイベントで別のLambdaが呼び出される。 そちらの処理が終わるとtask_tokenをもってコールバックされて次のTaskへ遷移。 Resource: arn:aws:states:::lambda:invoke.waitForTaskToken HeartbeatSeconds: 60 # https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-to-resource.html#wait-token-hearbeat Parameters: FunctionName: ${UploadFunctionArn} Payload: task_token.$: $$.Task.Token Next: Do Something # 省略
BucketXInput: # ファイルを置くバケット Type: AWS::S3::Bucket Properties: NotificationConfiguration: TopicConfigurations: - Event: 's3:ObjectCreated:Put' Topic: !Ref TopicXOnPlaceFile TopicXOnPlaceFile: # 通知内容 Type: AWS::SNS::Topic FunctionPermissionXOnPlaceFile: # SNSきっかけでLambdaを起動する場合に必要 Type: AWS::Lambda::Permission Properties: Action: lambda:* #InvokeFunction FunctionName: !GetAtt FunctionXOnPlaceFile.Arn Principal: sns.amazonaws.com SourceArn: !Ref TopicXOnPlaceFile SubscriptionXOnPlaceFile: # SNS::TopicにLambdaをひもづける Type: AWS::SNS::Subscription Properties: Endpoint: !GetAtt FunctionXOnPlaceFile.Arn Protocol: lambda TopicArn: !Ref TopicXOnPlaceFile FunctionXOnPlaceFile: # ファイルが設置されたときに呼ばれる関数 Type: AWS::Serverless::Function Properties: CodeUrl: src/ Handler: on_place.lambda_handler # 省略 InvokeConfigXOnPlaceFile: # FunctionXOnPlaceFileの実行が成功・失敗した場合にSNS通知する設定 Type: AWS::Lambda::EventInvokeConfig Properties: FunctionName: !Ref FunctionXOnPlaceFile MaximumRetryAttempts: 0 DestinationConfig: OnSuccess: Destination: !Ref TopicXOnSuccess OnFailure: Destination: !Ref TopicXOnFailure # 以下省略 Qualifier: "$LATEST" TopicXOnSuccess: Type: AWS::SNS::Topic SubscriptionXOnSuccess: Type: AWS::SNS::Subscription Properties: Endpoint: !GetAtt FunctionXOnResult.Arn Protocol: lambda TopicArn: !Ref TopicXOnSuccess FunctionXOnResult: # 関数の処理が成功あるいは失敗したときに呼ばれる関数 Type: AWS::Serverless::Function Properties: CodeUrl: src/ Handler: on_result.lambda_handler # 省略
# src/main.py def lambda_handler(event: LambdaEvent, context: LambdaContext) -> None: result = parse_sns(event) print("do something") if result.task_token is None: return # 常にstateMachineに戻る必要はない try: # TODO 失敗時にsend_task_failureしたほうが良い sfn_client = boto3.client("stepfunctions") sfn_client.send_task_success( taskToken=result.task_token, output=result.to_json() ) except Exception as e: print(e) def parse_sns(event: LambdaEvent) -> ResultOutput: # ResultOutput は、on_place_file Lambdaの戻り値をラップしたdataclass record = event['Records'][0] message = json.loads(record['Sns']['Message']) payload = message["responsePayload"] return ResultOutput.from_dict(payload)
コールバックするための権限が必要です。on_result lambda のRoleに以下Policyを追加します。
- PolicyName: StateMachineCallback PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - states:SendTaskSuccess Resource: "*"
おわりに
私たちがプロジェクト内で使っているパターンはおおよそこんなところです。最後にStep Functionsを使うときのポイントをまとめます。
- ASLはYAMLで書くのがおすすめ
- カッコやクォーテーションを書く量が減る
- 行コメントも書ける
- 繰り返し処理は、まず Map の利用を考える
- Mapはデフォルト並列実行しようとするので注意
- 大きくなってきたらStep Functionsを分割する
- 見通しを良くするため
- 状態遷移数の上限に引っかかることもある
- Payloadの加工を良く間違うので、ワークフローを流れるJSONの構造を先に決めておくと良い
- pydanticを使ってタスク間で共有するJSONをPythonのクラスに対応付けておき、Lambda間で共有する
- この方法がまとまったらまた記事を書くかもしれません
- 複雑なワークフローを組む場合、Lambdaで時間がかかる処理を書く前にTask間の入出力を確かめておくと良い
- Pass stateを使って状態遷移だけ作っておく
- 中身のない入出力だけのLambdaを定義する