6. AWS LambdaのLambda function作成

本章のゴール: AWS Lambdaを経由して投入したテストデータがAmazon ES上のKibanaで表示される

作業の位置づけ;

_images/bx1_04_overview.png

6.1. IAMロールの作成

AWS Lambdaのfuctionを作成する前に、functionを実行するための権限ロールを作成します

6.1.1. ロールの作成

IAMコンソールのロール一覧から “新しいロールの作成” をクリックし、ウィザードを開始します

ウィザードの各項目は下記のようにしてください

ロール名 jawsdays20160312_lambda_exec
ロールタイプの選択 AWS サービスロール => AWS Lambda を選択
ポリシーのアタッチ <なにも選択せず>

6.1.2. インラインポリシーの設定

  1. IAMコンソールのロール一覧から、先ほど作成した jawsdays20160312_lambda_exec ロールを選択
  2. “インラインポリシー” を選択 => “ここをクリックしてください” をクリック
  3. “カスタムポリシー” => “選択” をクリック
  4. 下記のとおりポリシー名とポリシードキュメントを設定し “ポリシーの検証” をした後 “ポリシーの適用” をクリック
ポリシー名 jawsdays20160312_lambda_exec_adhoc_policy

ポリシードキュメント

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}

以上でロールの作成は完了です

6.2. AWS Lambda function作成

Lambdaのコンソールから Get started Now もしくは Create a Lambda function から開始します

テンプレート選択画面では “Skip” を選択し、コード入力画面に遷移し、それぞれ下記の通り入力します

Name jawsdays20160312_to_es
Description Convert from AWS IoT Data to Amazon ES
Runtime Python 2.7
Lambda function code [Edit code inline] にチェックをつけ、後述のpythonコードを貼り付ける
Role jawsdays20160312_lambda_exec

6.2.1. 貼り付けるPythonコード

警告

貼付け時にコード内の YOUR_ES_ENDPOINT はAmazon ESのダッシュボードで得た Endpoint に書き換えるようにしてください

注釈

Amazon ESのインスタンス作成が完了してない場合は、とりあえず YOUR_ES_ENDPOINT のまま進め、その他の確認を終えた後に再度 Endpoint を指定して保存する方法があります

## example) es_endpoint = "search-jawsdays20160312handson-***.ap-northeast-1.es.amazonaws.com"
es_endpoint = "YOUR_ES_ENDPOINT"

es_index = "es-test"
es_type  = "awsiot0"

# REF: https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/python-logging.html
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

http_req_headers = {
    "pragma"      : "no-cache",
    "content-type": "application/json"
}

# REF: http://takatoshiono.hatenablog.com/entry/2014/12/18/192726
from datetime import timedelta, tzinfo
class JST(tzinfo):
    def utcoffset(self, dt):
        return timedelta(hours=9)
    def dst(self, dt):
        return timedelta(0)
    def tzname(self, dt):
        return "JST"
# Usage;
from datetime import datetime
#print datetime.now(tz=JST()).strftime("%Y-%m-%dT%H:%M:%S%z")

import urllib2, json
def lambda_handler(event, context):
    logger.info('got event: {}'.format(event))
    body = []

    es_bulk_header = {"index": {}}
    body.append(es_bulk_header)

    payload = event["state"]["reported"] # When data of client pass through the AWS IoT, become {state: {reported: {PAYLOAD}}}
    es_bulk_row = {
        "deviceId"  : payload["deviceId"],
        "@timestamp": datetime.now(tz=JST()).strftime("%Y-%m-%dT%H:%M:%S%z"),
        "payload"   : payload
    }
    body.append(es_bulk_row)

    post_body = "\n".join(map(json.dumps, body)) + "\n"
    logger.debug('post_body: {}'.format(post_body))

    # REF: http://takuya-1st.hatenablog.jp/entry/2014/08/23/023707
    invoke_url = "http://" + "/".join([es_endpoint, es_index, es_type, "_bulk"])
    logger.debug('invoke url: {}'.format(invoke_url))
    req = urllib2.Request(invoke_url, post_body, http_req_headers)
    res = urllib2.urlopen(req)
    logger.info('res body   :{}'.format(res.read()))

    return "done"

6.3. テスト

6.3.1. テストデータを設定

[Actions] - [Configure test event] に下記JSONを入力して “Save” してください

{ "state":
  { "reported":
    { "deviceId": "lambda-test0",
      "field1" : 2,
      "field2" : "lambda_test"
    }
  }
}
_images/bx1_04_lambda-test.png

注釈

{state:{reported: {PAYLOAD...}}} はAWS IoTからの出力フォーマットに合わせたものです

6.3.2. テストを実行し、Amazon ESでデータを確認する

“Test” でLambda functionのテストを実行できます

“Execution result: succeeded” (グレー部分には “done”) の表示が出れば、Amazon ESのKibana上で今投入したテストデータが確認できます (“payload.キー: 値 という形になります”)

_images/bx1_04_kibana.png

ここまで到達できればゴールです

AWS IoTの設定 へ進む

6.4. トラブルシュート

6.4.1. コードの保存失敗

たまに [Save] が失敗することがあります

  1. Pythonコードの文法をチェックしてみてください
  2. 再度アップロードを実行してみてください

6.4.2. Lambda functionの実行失敗

CloudWatchにLambdaの実行ログが表示されています

  1. ロールは適切に作成されていますか?
  2. ログが出ない場合、ポリシーは適切に設定されていますか?

6.4.3. Amazon ESへの接続失敗

Amazon ESが立ち上がっていなかったり、Endpointが間違ってたりすると接続エラーが発生します

その場合のAWS Lambdaの実行ログは下記のようになります

<urlopen error [Errno -2] Name or service not known>: URLError
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 51, in lambda_handler
    res = urllib2.urlopen(req)
  1. Amazon ESが起動しているか確認してください
  2. Amazon ESの Endpoint が間違っていないか確認してください

6.4.4. Amazon ESにデータが出ない

  1. テストデータの形式確認してみてください