AWS SAM で SNS + SQS + Lambda 通知システムを構築しよう【Pub/Sub・ファンアウト・DLQ / コンソール版との比較付き】

AWS Basic
スポンサーリンク
スポンサーリンク

はじめに

「通知システムをコードで管理して、チームで同じ環境を再現できるようにしたい」という要件に、AWS SAM(Serverless Application Model) は最適な選択肢のひとつです。

この記事では、AWS SAM を使って、SNS + SQS + Lambda によるPub/Sub通知システム(ファンアウトパターン)をゼロから構築するハンズオンを紹介します。

Publisher(aws sns publish)
  ↓ メッセージを1回発行するだけ
SNS Topic(sns-sqs-lambda-stack-topic)
  │
  ├─ サブスクリプション(フィルターなし)
  │    → LogQueue → LogFunction → CloudWatch Logs(全メッセージを記録)
  │
  └─ サブスクリプション(level=warning/critical のみ)
       → AlertQueue → AlertFunction → CloudWatch Logs(アラート処理)
                          ↓ 失敗(maxReceiveCount=3)
                        AlertDLQ

このハンズオンで体験できること:

  • AWS::SNS::Topic / AWS::SQS::Queue / AWS::SNS::Subscription の SAM 定義
  • AWS::SQS::QueuePolicy — コンソールでは自動追加されるが SAM では明示的な定義が必要な理由
  • FilterPolicy で SNS フィルターポリシーを JSON 文字列で指定する方法
  • SQSPollerPolicy による1行での SQS ポーリング権限付与

このハンズオンの特徴:

  • sam build + sam deploy2コマンドで全リソースをデプロイ
  • SNS Topic + SQS キュー × 3 + Lambda × 2 + IAM ロール × 2 を template.yaml 1ファイルで管理
  • sam delete全リソースを一括削除(コンソール版では SNS / SQS × 3 / Lambda × 2 / IAM / ロググループを個別に削除)

Amazon検索[本 AWS 開発]

スポンサーリンク

SAM vs コンソール:どれだけ違うか

比較項目SAM(コード)コンソール(手動)
SQS キューポリシーAWS::SQS::QueuePolicy を明示的に定義が必要SNS サブスクリプション作成時に自動追加
SNS フィルターポリシーFilterPolicy: '{"level": ...}' で定義JSON を直接入力
Lambda の SQS 権限SQSPollerPolicy 1行で付与AWSLambdaSQSQueueExecutionRole を手動アタッチ
削除sam delete 1コマンドSNS / SQS × 3 / Lambda × 2 / IAM / ロググループを個別に削除
再現性チームで同じ環境を即座に再現できる手順書が必要

スポンサーリンク

キーワード解説

用語意味
SNS Topicメッセージの送信先。複数のサブスクライバーに同時配信できる
Pub/SubPublisher(発行者)と Subscriber(購読者)が疎結合で通信するパターン
ファンアウト1つのメッセージを複数の宛先に同時配信するパターン
フィルターポリシーメッセージ属性に基づき、配信するかどうかを制御するルール
DLQ(デッドレターキュー)処理に失敗したメッセージを退避させるキュー
SNS エンベロープSNS が SQS に配信する際にメッセージを包む JSON 構造
SQSPollerPolicySAM 組み込みポリシー。sqs:ReceiveMessage / sqs:DeleteMessage 権限を1行で付与
RedrivePolicySQS の DLQ 設定。失敗した場合の退避先キューと最大受信数を定義する

前提条件

ツール確認コマンド最低バージョン目安
AWS CLI v2aws --version2.x
AWS SAM CLIsam --version1.x
Python 3.12python --version3.12

AWS認証確認:

aws sts get-caller-identity

アカウントIDが表示されれば認証設定済みです。


使用するAWSサービス

サービス役割料金
SNSメッセージの発行・ファンアウト配信月100万リクエストまで無料
SQSメッセージキューイング(LogQueue / AlertQueue / AlertDLQ)月100万リクエストまで無料
Lambdaキューからメッセージを受信して処理(LogFunction / AlertFunction)月100万リクエスト・400,000 GB-秒まで無料
IAMLambda の実行権限管理無料
S3SAM デプロイパッケージ置き場少量のため実質無料
CloudWatch LogsLambda の実行ログ月5GBまで無料

Step 1: プロジェクトフォルダを開く

cd C:\my-aws\aws-learning-projects\sns-sqs-lambda

フォルダ構造

sns-sqs-lambda/
├── template.yaml           # SAMテンプレート(SNS + SQS × 3 + Lambda × 2)
├── samconfig.toml          # デプロイ設定(gitignore 対象・毎回手動作成が必要)
├── docs/
│   ├── 1_console.md        # AWSコンソール版手順
│   └── 2_sam.md            # SAM版手順
└── src/
    ├── log_handler.py      # 全ログ処理 Lambda(LogQueue 用)
    └── alert_handler.py    # アラート処理 Lambda(AlertQueue 用・DLQ テスト機能付き)

Step 2: SAMテンプレートの確認(template.yaml)

template.yaml は全 AWSリソースの設計図です。コンソール版との違いに注目しながらポイントを確認します。

SNS Topic とキューの定義

Resources:
  NotificationTopic:
    Type: AWS::SNS::Topic          # メッセージの発行点

  LogQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180       # 可視性タイムアウト(DLQテストで 180秒×3回 待つ)

  AlertDLQ:
    Type: AWS::SQS::Queue          # AlertFunction が 3回失敗したメッセージの退避先

  AlertQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt AlertDLQ.Arn
        maxReceiveCount: 3         # 3回失敗で DLQ へ

SQS キューポリシー(SNS からの受信許可)

SAM版で最も注意が必要なポイントです。コンソールでは SNS サブスクリプション作成時に自動追加されますが、CloudFormation/SAM では明示的な定義が必要です。

  LogQueuePolicy:
    Type: AWS::SQS::QueuePolicy    # SNS が LogQueue に送信することを許可
    Properties:
      Queues:
        - !Ref LogQueue
      PolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: sns.amazonaws.com
            Action: sqs:SendMessage
            Resource: !GetAtt LogQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref NotificationTopic   # この Topic からのみ許可

  AlertQueuePolicy:
    Type: AWS::SQS::QueuePolicy    # SNS が AlertQueue に送信することを許可
    Properties:
      Queues:
        - !Ref AlertQueue
      PolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service: sns.amazonaws.com
            Action: sqs:SendMessage
            Resource: !GetAtt AlertQueue.Arn
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref NotificationTopic

Condition: ArnEquals を付ける理由:
この条件がないと、同じ AWS アカウント内の他の SNS Topicからも SQS にメッセージを送れてしまいます。セキュリティのベストプラクティスとして、特定の Topic からのみ許可するよう制限します。


SNS サブスクリプション(ファンアウトとフィルター)

  LogQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      TopicArn: !Ref NotificationTopic
      Endpoint: !GetAtt LogQueue.Arn
      # FilterPolicy なし → 全メッセージが届く

  # フィルターあり: level=warning/critical のみ AlertQueue に配信
  AlertQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      TopicArn: !Ref NotificationTopic
      Endpoint: !GetAtt AlertQueue.Arn
      FilterPolicy: '{"level": ["warning", "critical"]}'
      FilterPolicyScope: MessageAttributes   # メッセージ属性でフィルター(デフォルト)

FilterPolicy は JSON 文字列として指定:
YAML 内では '{"level": ["warning", "critical"]}' のようにシングルクォートで囲んだ JSON 文字列として記述します。オブジェクト形式ではないことに注意してください。


Lambda 関数の定義

  LogFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: log_handler.lambda_handler
      Runtime: python3.12
      Timeout: 30
      Policies:
        - SQSPollerPolicy:               # ReceiveMessage / DeleteMessage を自動付与
            QueueName: !GetAtt LogQueue.QueueName
      Events:
        LogQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt LogQueue.Arn
            BatchSize: 5                 # 1回の呼び出しで最大5件処理

  AlertFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: alert_handler.lambda_handler
      Runtime: python3.12
      Timeout: 30
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt AlertQueue.QueueName
      Events:
        AlertQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt AlertQueue.Arn
            BatchSize: 1    # 1件失敗しても他のメッセージに影響しないよう 1 に設定

template.yaml のポイント:

ポイント説明
AWS::SQS::QueuePolicyコンソール版では自動付与されるが SAM では明示的に定義が必要
FilterPolicyJSON 文字列で指定。level 属性が warning または critical のメッセージのみ通過
SQSPollerPolicySAM 組み込みポリシー。コンソール版では AWSLambdaSQSQueueExecutionRole を手動アタッチ
BatchSize: 1(AlertFunction)1件ずつ処理することで、失敗時に DLQ に移動するメッセージを1件に限定

Step 3: Lambda コードの確認(src/)

コードはすでに作成済みです。各ファイルの役割を確認しておきます。

log_handler.py のポイント

for record in event["Records"]:
    # SNS → SQS 配信では、SQS のボディに SNS エンベロープ(JSON文字列)が格納される
    sns_envelope = json.loads(record["body"])   # ← json.loads が必要
    message = sns_envelope["Message"]           # 実際のメッセージ本文
    attributes = sns_envelope.get("MessageAttributes", {})
    level = attributes.get("level", {}).get("Value", "info")

SNS エンベロープの構造(SQS に届く body の中身):

{
  "Type": "Notification",
  "MessageId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "TopicArn": "arn:aws:sns:ap-northeast-1:...:topic",
  "Message": "警告メッセージ",
  "MessageAttributes": {
    "level": {"Type": "String", "Value": "warning"}
  }
}

SQS を直接使う場合(record["body"] がそのまま文字列)とは構造が異なります。
SNS 経由の場合は必ず json.loads してエンベロープを展開してください。

alert_handler.py のポイント

  • message == "error" の場合は意図的に ValueError を発生させる(DLQ テスト用)
  • BatchSize: 1 のため、1件失敗すると AlertQueue に戻り、3回試行後 AlertDLQ へ移動する

Step 4: samconfig.toml を作成する

samconfig.toml.gitignore で管理外のため、毎回手動で作成する必要があります。

sns-sqs-lambda/samconfig.toml を新規作成して以下を貼り付けます。

version = 0.1

[default.deploy.parameters]
stack_name = "sns-sqs-lambda-stack"
resolve_s3 = true
s3_prefix = "sns-sqs-lambda-stack"
region = "ap-northeast-1"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
disable_rollback = true
image_repositories = []

[default.global.parameters]
region = "ap-northeast-1"

samconfig.toml の置き場所:
sns-sqs-lambda/ フォルダの直下に置く必要があります。


Step 5: sam build(ビルド)

cd C:\my-aws\aws-learning-projects\sns-sqs-lambda
sam build

成功すると以下のように表示されます。

Build Succeeded

Built Artifacts  : .aws-sam/build
Built Template   : .aws-sam/build/template.yaml

sam build が行っていること:
src/ フォルダを ZIP パッケージ化して .aws-sam/build/ に配置し、Lambda デプロイの準備を整えます。


Step 6: sam deploy(デプロイ)

sam deploy

変更内容が表示されて確認を求められます。

Deploy this changeset? [y/N]: y

y を入力して進めます。数分でデプロイが完了します。

デプロイ完了の確認

ターミナルに Outputs が表示されます。

Outputs
----------------------------------------------------------------------
Key    TopicArn
Value  arn:aws:sns:ap-northeast-1:123456789012:sns-sqs-lambda-stack-topic

Key    LogQueueUrl
Value  https://sqs.ap-northeast-1.amazonaws.com/123456789012/sns-sqs-lambda-stack-log-queue

Key    AlertQueueUrl
Value  https://sqs.ap-northeast-1.amazonaws.com/123456789012/sns-sqs-lambda-stack-alert-queue

Key    AlertDLQUrl
Value  https://sqs.ap-northeast-1.amazonaws.com/123456789012/sns-sqs-lambda-stack-alert-dlq
----------------------------------------------------------------------

TopicArn を控えておきます。

デプロイされるリソース一覧

SNS Topic × 1          : sns-sqs-lambda-stack-topic
SNS サブスクリプション × 2  : LogQueue用(フィルターなし)/ AlertQueue用(フィルターあり)
SQS キュー × 3          : LogQueue / AlertQueue / AlertDLQ
Lambda 関数 × 2        : sns-sqs-lambda-stack-LogFunction-XXXX 他
IAM ロール × 2          : Lambda 用
S3                     : SAM デプロイパッケージ
CloudWatch Logs        : Lambda 自動生成ログ × 2

Step 7: 動作テスト

事前準備: ARN を変数に設定する

set TOPIC_ARN=arn:aws:sns:ap-northeast-1:123456789012:sns-sqs-lambda-stack-topic
set ALERT_DLQ_URL=https://sqs.ap-northeast-1.amazonaws.com/123456789012/sns-sqs-lambda-stack-alert-dlq

123456789012 を自分のアカウントIDに置き換えてください(Outputs の値をそのままコピーすればよいです)。


テスト1: level=info(LogFunction のみ受信)

aws sns publish ^
  --topic-arn %TOPIC_ARN% ^
  --message "情報メッセージ" ^
  --message-attributes "{\"level\":{\"DataType\":\"String\",\"StringValue\":\"info\"}}" ^
  --region ap-northeast-1

レスポンス例:

{
    "MessageId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}

CloudWatch Logs で LogFunction を確認(数秒後):

aws logs tail /aws/lambda/sns-sqs-lambda-stack-LogFunction-XXXX --follow
{"queue": "log", "level": "info", "message": "情報メッセージ"}
{"processedCount": 1}

AlertFunction は動作しないことを確認:

Lambda コンソールで AlertFunction の呼び出し回数が増えていないことを確認します。


テスト2: level=warning(ファンアウト確認)

aws sns publish ^
  --topic-arn %TOPIC_ARN% ^
  --message "警告メッセージ" ^
  --message-attributes "{\"level\":{\"DataType\":\"String\",\"StringValue\":\"warning\"}}" ^
  --region ap-northeast-1

期待する動作: 1回の publish で LogFunction と AlertFunction の両方が動作する(ファンアウト)

LogFunction のログ:

{"queue": "log", "level": "warning", "message": "警告メッセージ"}

AlertFunction のログ:

{"queue": "alert", "level": "warning", "message": "警告メッセージ", "alert_triggered": true}


テスト3: level=warning + body=error(DLQ テスト)

aws sns publish ^
  --topic-arn %TOPIC_ARN% ^
  --message "error" ^
  --message-attributes "{\"level\":{\"DataType\":\"String\",\"StringValue\":\"warning\"}}" ^
  --region ap-northeast-1

期待する動作:

  • LogFunction → 正常完了
  • AlertFunctionValueError で失敗 → 180秒後に再試行 × 3回 → AlertDLQ に移動

DLQ に移動するまでの待機時間: 最大 180秒 × 3回 ≒ 9分。
CloudWatch Logs で AlertFunction[ERROR] ValueError が 3回記録されたことを確認してから DLQ を確認するとよいです。

DLQ にメッセージが届いたか確認:

aws sqs receive-message ^
  --queue-url %ALERT_DLQ_URL% ^
  --region ap-northeast-1

レスポンスの "Body" フィールドに SNS エンベロープが格納されていれば DLQ 動作確認完了。


Step 8: AWSコンソールで確認(任意)

SAMでデプロイしたリソースはコンソールでも確認できます。

  • SNS: SNS → sns-sqs-lambda-stack-topic → 「サブスクリプション」タブ → 2つのサブスクリプションと各フィルターポリシーを確認
  • SQS: SQS → sns-sqs-lambda-stack-alert-queue → 「デッドレターキュー」タブ → AlertDLQ の ARN が設定されていることを確認
  • Lambda: CloudWatch Logs で LogFunction / AlertFunction の呼び出し履歴を比較確認

Step 9: リソースの削除

課金を止めるために、ハンズオン完了後は必ずリソースを削除してください。

sam delete --stack-name sns-sqs-lambda-stack --region ap-northeast-1
Are you sure you want to delete the stack sns-sqs-lambda-stack? [y/N]: y
Are you sure you want to delete the folder sns-sqs-lambda-stack in S3? [y/N]: y

削除完了の確認:

aws cloudformation describe-stacks --stack-name sns-sqs-lambda-stack --region ap-northeast-1

Stack with id sns-sqs-lambda-stack does not exist が表示されれば削除完了。

sam delete で削除されるリソース一覧

リソース
SNS Topic + サブスクリプション× 1 + × 2
SQS キュー(LogQueue / AlertQueue / AlertDLQ)× 3
Lambda 関数(LogFunction / AlertFunction)× 2
IAM ロール× 2
Lambda デプロイパッケージ(S3)× 1
CloudWatch Logs ロググループ× 2

コンソール版との大きな違い(SAMのメリット):
コンソール版では SNS / SQS × 3 / Lambda × 2 / IAM / ロググループを個別に削除する必要があります。
SAMでは sam delete 1コマンドで全リソースを一括削除できます。


トラブルシューティング

症状原因対処
sam deploy でキューポリシーエラーAWS::SQS::QueuePolicy の記述ミスtemplate.yaml の LogQueuePolicy / AlertQueuePolicy を確認
sns publish 後 LogFunction が動作しないSNS → SQS のサブスクリプションまたはキューポリシーが未設定sam deploy が正常完了しているか、スタックイベントを確認
level=warning でも AlertFunction が動作しないフィルターポリシーの属性名が間違っている--message-attributes"level" がフィルターの "level" と一致しているか確認
json.loads(record["body"]) でエラーSQS を直接テストした(SNS 経由でない)SNS publish コマンドを使っているか確認
DLQ にメッセージが届かない可視性タイムアウト(180秒)× 3回 の待機中最大 9分待つ
sam deployCAPABILITY_IAM エラーsamconfig.tomlcapabilities が未設定capabilities = "CAPABILITY_IAM" を確認

まとめ

今回のハンズオンで実現したこと:

確認項目内容
SNS + SQS のファンアウト1回の publish で LogFunction と AlertFunction が並行して動作
SNS フィルターポリシーlevel=info は LogQueue のみ、level=warning/critical は両キューに配信
DLQAlertFunction が 3回失敗したメッセージが AlertDLQ に退避
AWS::SQS::QueuePolicyコンソールでは自動追加、SAM では明示的定義が必要という差を体験
一括削除sam delete で全リソースをクリーンアップ

SAMのメリットを実感できたポイント

  • SQSPollerPolicy により Lambda の SQS ポーリング権限が1行で完結
  • FilterPolicy で SNS フィルターポリシーをコードとして管理
  • sam delete で SNS / SQS × 3 / Lambda × 2 / IAM を一括削除

コンソール版と比較してみる

SAMが裏で何をやっているか、同じ構成をAWSコンソールのみで構築する手順をまとめました。コンソール版では SNS サブスクリプション作成時にキューポリシーが自動追加される仕組みや、フィルターポリシーの GUI 入力方法など、コンソールならではの操作を解説しています。


関連記事

Amazon検索[本 AWS 開発]

コメント