RDS(PostgreSQL)がCloudwatchLogsに出力しているメッセージをサブスクリプションフィルター機能により特定のメッセージが出力されたらLambdaを起動させてSQSにメッセージを書き込んだ後にEC2からメッセージを取り出してみます。
構成イメージ

手順の流れ
【前提】
・RDS(PostgreSQL)の用意。※ログをCloudwatchLogsに出力する設定にしていること。
・SQSからメッセージを取り出す用のEC2インスタンス(Linux)の用意。
【手順の流れ】
- SQSの作成
- Lambdaの作成
- CloudwatchLogsでサブスクリプションフィルターの作成
- (EC2がSQSにアクセスするための)VPCエンドポイント作成
- (EC2がSQSにアクセスするための)IAMロールの作成
- 動作確認
1.SQSの作成
Lambdaからメッセージを受け取るためのSQSを作成する。

タイプは標準にして設定は特に変更せずにデフォルトを使用。
名前は「Queue-Lambda-RDSLog」とする。


作成されたことを確認。

2.Lambdaの作成
CloudwatchLogsのサブスクリプションフィルターを作成する前に先にLambdaを作成する。
※サブスクリプションフィルター作成時にLambdaを指定する必要があるため。

関数名は「rdss-log-sqs」としておきます。
ランタイムはPython3.9を使用。


作成されたことを確認。

Lambdaの「コード」としては以下を記述。
import logging import json import base64 import gzip import os import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): sqs = boto3.resource('sqs',region_name='ap-northeast-1') queue = sqs.get_queue_by_name(QueueName='Queue-Lambda-RDSlog') # CloudWatchLogsからのデータはbase64エンコードされているのでデコード decoded_data = base64.b64decode(event['awslogs']['data']) # バイナリに圧縮されているため展開 json_data = json.loads(gzip.decompress(decoded_data)) logger.info("EVENT: " + json.dumps(json_data)) # ログデータ取得 message = json_data['logEvents'][0]['message'] # SQSのBodyにCloudWatchLogsのメッセージ内容を送信 responce = queue.send_message(MessageBody=message)
次にLambdaがSQSにアクセスできるようにロールにアクセス権限を付与します。
ロール(rds-log-sqs-role-j0e0t4lk)はLambda作成時にデフォルトで作成されるものを使用し、
“AmazonSQSFullAccess”を付与します。

3.サブスクリプションフィルターの作成
CloudwatchLogsで特定のメッセージが出力された場合にLambdaを実行させるためにサブスクリプションフィルターを作成します。
まずはRDSのロググループが事前に作成されていることを確認。(RDSを作成すると自動的に作成される。)

対象のロググループを選択して、[作成] > [Lambdaサブスクリプションフィルターの作成]を選択する。

Lambda関数は先ほど作成した「rds-log-sqs」を指定。
その他については下記のとおり指定。
[ログの形式]:その他
[サブスクリプションフィルターのパターン]:ERROR
[サブスクリプションフィルター名]:subscription-filter-rds

テストするログデータを選択では「rds-postgres.0」を選択してみますが、
ERRORを含むメッセージが出力されていないのでテスト結果には何も表示されないです。
このままストリーミング開始を実行します。

サブスクリプションフィルター作成後、Lambda関数を確認するとトリガーとしてCloudwatchLogsが追加されます。

4.(EC2がSQSにアクセスするための)VPCエンドポイント作成
EC2がSQSにアクセスするためのVPCエンドポイントを作成。

サービは「com.amazonaws.ap-northeast-1.sqs」を指定して作成。

VPCエンドポイントにアタッチするセキュリティグループはインバウンドとしてEC2からTCP/443の通信を許可する設定にする。

5.(EC2がSQSにアクセスするための)IAMロールの作成
EC2がSQSにアクセスするためのIAMロールを作成。
“AmazonSQSFullAccess”を付与する。

作成したIAMロールをEC2にアタッチする。

6.動作確認
まずはEC2からSQSのリストが表示されることを確認する。
# aws sqs list-queues 【実行結果】 { "QueueUrls": [ "https://sqs.ap-northeast-1.amazonaws.com/336137140379/Queue-Lambda-RDSl og" ] }
EC2からRDSに接続してERRORを発生させてみる。
# select count * from mybook; 【実行結果】 ERROR: syntax error at or near "from" LINE 1: select count * from mybook;
CloudwatchLogsのログストリームにERRORが表示されたことを確認。

EC2からAWS CLIでSQSのreceive-messageコマンドでキュー(Queue-Lambda-RDSlog)からメッセージを取得してみる。
※–queue-urlにはSQSのURLを指定。
# aws sqs receive-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/Queue-Lambda-RDSlog" 【実行結果】 { "Messages": [ { "MessageId": "3d2badcd-8475-4de8-97e2-d5a5fe5debb4", "ReceiptHandle": "AQEBbDcbpbQilODmlKuBsunJf6kRyGUYYGZ2e93H0kTxHI8fKtwUvj3cTtFLMVFw54jtLT6jJHDlfgBRcmP/tEn/AZm+MZ9J4Dg4/f4q5RHEKM5w9gTsOe3yqv1hJq3YzYHI/oum2dNcRvbh94sJsU6zRWrC0L7H7enFIe/SLfGheAfETwdhVWNyzNOsmhVrV1AOUVLndzex+aNEOWvQ2bNqhomZMfy2PE7y+mzvkmGqK5sn8eHkqWefjbWlLuk1+OYsTKsrTnTOjrjZGV8awnIIQo+hkpsT4e4GzHljXSBAd38byXKEBTaCvr3Cz2kuo9mMM/97GFnbWmgfALAbYspM6b4JLLvxVVUchAmoDizK8zEdY6siSABs/CdeKb2d3Z0dP1FzeDXsNQRS7KlBxpztvbjvBInuDEgL8SHvIkk0R/0=", "MD5OfBody": "b852dfb322bc79b9accad65e967ccf73", "Body": "2022-10-16 11:59:10 UTC:10.0.2.10(49140):postgres@shimmy:[6239]:ERROR: syntax error at or near "from" at character 16" } ] }
Body部のみを出力する場合はjqを使用して抽出する。
# aws sqs receive-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/336137140379/Queue-Lambda-RDSlog" | jq -r '.Messages[].Body' 【実行結果】 2022-10-16 11:59:10 UTC:10.0.2.10(49140):postgres@shimmy:[6239]:ERROR: syntax error at or near "from" at character 16
キューのメッセージを削除する場合は、delete-messageコマンドを使用する。
–receipt-handleにはreceive-messageでメッセージを取り出した時に表示されていた値を指定する。
# aws sqs delete-message --queue-url "https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/Queue-Lambda-RDSlog" --receipt-handle AQEBbDcbpbQilODmlKuBsunJf6kRyGUYYGZ2e93H0kTxHI8fKtwUvj3cTtFLMVFw54jtLT6jJHDlfgBRcmP/tEn/AZm+MZ9J4Dg4/f4q5RHEKM5w9gTsOe3yqv1hJq3YzYHI/oum2dNcRvbh94sJsU6zRWrC0L7H7enFIe/SLfGheAfETwdhVWNyzNOsmhVrV1AOUVLndzex+aNEOWvQ2bNqhomZMfy2PE7y+mzvkmGqK5sn8eHkqWefjbWlLuk1+OYsTKsrTnTOjrjZGV8awnIIQo+hkpsT4e4GzHljXSBAd38byXKEBTaCvr3Cz2kuo9mMM/97GFnbWmgfALAbYspM6b4JLLvxVVUchAmoDizK8zEdY6siSABs/CdeKb2d3Z0dP1FzeDXsNQRS7KlBxpztvbjvBInuDEgL8SHvIkk0R/0=
コメント