6. AWS LambdaのLambda function作成¶
本章のゴール: AWS Lambdaを経由して投入したテストデータがAmazon ES上のKibanaで表示される
作業の位置づけ;
6.1. IAMロールの作成¶
AWS Lambdaのfuctionを作成する前に、functionを実行するための権限ロールを作成します
6.1.1. ロールの作成¶
IAMコンソールのロール一覧から “新しいロールの作成” をクリックし、ウィザードを開始します
ウィザードの各項目は下記のようにしてください
ロール名 | jawsdays20160312_lambda_exec |
ロールタイプの選択 | AWS サービスロール => AWS Lambda を選択 |
ポリシーのアタッチ | <なにも選択せず> |
6.1.2. インラインポリシーの設定¶
- IAMコンソールのロール一覧から、先ほど作成した jawsdays20160312_lambda_exec ロールを選択
- “インラインポリシー” を選択 => “ここをクリックしてください” をクリック
- “カスタムポリシー” => “選択” をクリック
- 下記のとおりポリシー名とポリシードキュメントを設定し “ポリシーの検証” をした後 “ポリシーの適用” をクリック
ポリシー名 | jawsdays20160312_lambda_exec_adhoc_policy |
ポリシードキュメント
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
以上でロールの作成は完了です
6.2. AWS Lambda function作成¶
Lambdaのコンソールから Get started Now もしくは Create a Lambda function から開始します
テンプレート選択画面では “Skip” を選択し、コード入力画面に遷移し、それぞれ下記の通り入力します
Name | jawsdays20160312_to_es |
Description | Convert from AWS IoT Data to Amazon ES |
Runtime | Python 2.7 |
Lambda function code | [Edit code inline] にチェックをつけ、後述のpythonコードを貼り付ける |
Role | jawsdays20160312_lambda_exec |
6.2.1. 貼り付けるPythonコード¶
警告
貼付け時にコード内の YOUR_ES_ENDPOINT はAmazon ESのダッシュボードで得た Endpoint に書き換えるようにしてください
注釈
Amazon ESのインスタンス作成が完了してない場合は、とりあえず YOUR_ES_ENDPOINT のまま進め、その他の確認を終えた後に再度 Endpoint を指定して保存する方法があります
## example) es_endpoint = "search-jawsdays20160312handson-***.ap-northeast-1.es.amazonaws.com"
es_endpoint = "YOUR_ES_ENDPOINT"
es_index = "es-test"
es_type = "awsiot0"
# REF: https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/python-logging.html
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
http_req_headers = {
"pragma" : "no-cache",
"content-type": "application/json"
}
# REF: http://takatoshiono.hatenablog.com/entry/2014/12/18/192726
from datetime import timedelta, tzinfo
class JST(tzinfo):
def utcoffset(self, dt):
return timedelta(hours=9)
def dst(self, dt):
return timedelta(0)
def tzname(self, dt):
return "JST"
# Usage;
from datetime import datetime
#print datetime.now(tz=JST()).strftime("%Y-%m-%dT%H:%M:%S%z")
import urllib2, json
def lambda_handler(event, context):
logger.info('got event: {}'.format(event))
body = []
es_bulk_header = {"index": {}}
body.append(es_bulk_header)
payload = event["state"]["reported"] # When data of client pass through the AWS IoT, become {state: {reported: {PAYLOAD}}}
es_bulk_row = {
"deviceId" : payload["deviceId"],
"@timestamp": datetime.now(tz=JST()).strftime("%Y-%m-%dT%H:%M:%S%z"),
"payload" : payload
}
body.append(es_bulk_row)
post_body = "\n".join(map(json.dumps, body)) + "\n"
logger.debug('post_body: {}'.format(post_body))
# REF: http://takuya-1st.hatenablog.jp/entry/2014/08/23/023707
invoke_url = "http://" + "/".join([es_endpoint, es_index, es_type, "_bulk"])
logger.debug('invoke url: {}'.format(invoke_url))
req = urllib2.Request(invoke_url, post_body, http_req_headers)
res = urllib2.urlopen(req)
logger.info('res body :{}'.format(res.read()))
return "done"
6.3. テスト¶
6.3.1. テストデータを設定¶
[Actions] - [Configure test event] に下記JSONを入力して “Save” してください
{ "state":
{ "reported":
{ "deviceId": "lambda-test0",
"field1" : 2,
"field2" : "lambda_test"
}
}
}
注釈
{state:{reported: {PAYLOAD...}}}
はAWS IoTからの出力フォーマットに合わせたものです
6.3.2. テストを実行し、Amazon ESでデータを確認する¶
“Test” でLambda functionのテストを実行できます
“Execution result: succeeded” (グレー部分には “done”) の表示が出れば、Amazon ESのKibana上で今投入したテストデータが確認できます (“payload.キー: 値 という形になります”)
ここまで到達できればゴールです
AWS IoTの設定 へ進む
6.4. トラブルシュート¶
6.4.2. Lambda functionの実行失敗¶
CloudWatchにLambdaの実行ログが表示されています
- ロールは適切に作成されていますか?
- ログが出ない場合、ポリシーは適切に設定されていますか?
6.4.3. Amazon ESへの接続失敗¶
Amazon ESが立ち上がっていなかったり、Endpointが間違ってたりすると接続エラーが発生します
その場合のAWS Lambdaの実行ログは下記のようになります
<urlopen error [Errno -2] Name or service not known>: URLError
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 51, in lambda_handler
res = urllib2.urlopen(req)
- Amazon ESが起動しているか確認してください
- Amazon ESの Endpoint が間違っていないか確認してください
6.4.4. Amazon ESにデータが出ない¶
- テストデータの形式確認してみてください