Sionの技術ブログ

SREとして日々の学習を書いて行きます。twitterは@sion_cojp

Fargate + cloudwatch eventでcronシステム ロギング設計編

f:id:sion_cojp:20190805140304p:plain

前回のお話

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

技術

  • aws-cloudwatch-logging module: terraform module。kinesis firehose + lambda。(cloudwatch logsに書き込まれたらs3とdatadog logsに吐き出す)
  • s3: 長期間保存するログを格納
  • datadog logs: リアルタイムな1ヶ月間のログを保管。障害対応用

詳細

loggingは1つのmoduleで作られます。

module "logging_cron" {
  source        = "../../../../../modules/aws-cloudwatch-logging"
  cluster_name  = "cron"
  s3_bucket     = "xxxxxxxxxx"
  dd_tag_system = "cron"
  dd_tag_env    = "sandbox"

  services = [
    "koyama_test",
  ]
}

services に送りたい対象のservice名を書いていけばdatadog logとs3に保管するように設計しております。

firehoseはサービス分作っており、上限が懸念点ではありますが、緩和すれば良い + いつか出るkinesis直結ログドライバーが解決するかもしれない。のでそれまでの辛抱ということで許容してます。

こだわったところ

ネームスペース

最初のロギング周りは下記でした。

logGroup: /Service名
logStream: ランダム

この場合、lambdaのプログラムが影響し、aws-cloudwatch-logging moduleをサービス毎に1つ作らないといけませんでした。

しかし、ネームスペースを下記のようにすることで、lambdaが適切な値を引っ張れるようになり、 services に追加したいものを入れるmoduleにすることができました。

logGroup:/cron/Service名
logStream:/cron/Service名/コンテナ名/taskArn

s3側のロギング

s3については、ソースレコードのバックアップを使えばlambdaを介さずs3にログを保管できます。

lambdaに影響せず、生ログが保管でき安全性が担保できますが、これをやると全てのログが1行で出力されてしまいます。

全てのログがjsonであれば、jqを使ってログ調査が用意になりますが、そうであるとは限らないため、lambdaで改行したデータを保管するようにしました。

datadog側のタグ

logStream:/cron/Service名/コンテナ名/taskArn を設定したことにより、理想のタグをlambdaでつけて送信することが出来ました

system: cron
service: service名
application: コンテナ名
task: taskArn

こうすることで、タグ指定で調査することが出来、より素早い障害対応が可能になります。

datadogのエラータグ

f:id:sion_cojp:20190805204958p:plain

json形式で loglevel:error で出力すると、datadog logsでもErrorタグとして検知されます。

「もしerrorがxx回あったら通知する」という監視が可能になるので、出来る限りjsonで吐くと良いです。

上が loglevel:fatal 下が loglevel:error です。

lambdaファイル

firehoseに紐づけてる、datadog logsに送るlambdaは下記となります。

# coding: utf-8

# cloudwatchのデータをdatadog-logsに流しこむ
# またfirehoseでs3にjsonデータを保管するために整形しreturnする
# ref: https://github.com/DataDog/dd-aws-lambda-functions/blob/master/Log/lambda_function.py

from __future__ import print_function

import io
import base64
import gzip
import json
import socket
import ssl
import os

import boto3

DEFAULT_REGION = 'ap-northeast-1'

DD_URL = os.getenv("DD_URL", default="lambda-intake.logs.datadoghq.com")
#Define the proxy port to forward the logs to
DD_PORT = os.environ.get('DD_PORT', 10516)
DD_SOURCE = "ddsource"
DD_CUSTOM_TAGS = "ddtags"
DD_SERVICE = "service"

# metadata: Additional metadata to send with the logs
metadata = {
    "ddsourcecategory": "aws",
}

# DD_API_KEY: Datadog API Key
DD_API_KEY = boto3.client('ssm').get_parameters(
    Names=[
        "xxxxxxx", # pathは隠します
    ],
    WithDecryption=True
)['Parameters'][0]['Value']

# Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar!
DD_TAGS = os.environ.get('DD_TAGS', "")
ONLY_JSON = os.environ.get('ONLY_JSON')

class DatadogConnection(object):
    def __init__(self, host, port, ddApiKey):
        self.host = host
        self.port = port
        self.api_key = ddApiKey
        self._sock = None

    def _connect(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s = ssl.wrap_socket(s)
        s.connect((self.host, self.port))
        return s

    def safe_submit_log(self, log):
        try:
            self.send_entry(log)
        except Exception as e:
            # retry once
            self._sock = self._connect()
            self.send_entry(log)
        return self

    def send_entry(self, log_entry):
        # The log_entry can only be a string or a dict
        if isinstance(log_entry, str):
            log_entry = {"message": log_entry}
        elif not isinstance(log_entry, dict):
            raise Exception(
                "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry)
            )

        # Merge with metadata
        log_entry = merge_dicts(log_entry, metadata)

        # Send to Datadog
        str_entry = json.dumps(log_entry)
        #For debugging purpose uncomment the following line
        prefix = "%s " % self.api_key
        return self._sock.send((prefix + str_entry + "\n").encode("UTF-8"))

    def __enter__(self):
        self._sock = self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        if self._sock:
            self._sock.close()
        if ex_type is not None:
            print("DatadogConnection exit: ", ex_type, ex_value, traceback)

def merge_dicts(a, b, path=None):
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                raise Exception(
                    'Conflict while merging metadatas and the log entry at %s' % '.'.join(path + [str(key)])
                )
        else:
            a[key] = b[key]
    return a

def processRecords(records):
    """
    cloudwatch logsから送られてきたデータをデコード + 解凍し、firehoseに適したデータに整形する

    :param records: cloudwatchから送られてくるevent['records']
    """
    for r in records:
        compressed = base64.b64decode(r['data'])
        expand = gzip.decompress(compressed)
        data = json.loads(expand)
        record_id = r['recordId']


        # 送信先が到達可能かどうか
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': record_id
            }
        # 通常のデータ
        elif data['messageType'] == 'DATA_MESSAGE':
            yield {
                'data': base64.b64encode(expand).decode('utf-8'),
                'result': 'Ok',
                'recordId': record_id
            }
        # それ以外は失敗扱い
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': record_id
            }

def datadog_handler(records, con):
    """
    datadog logsにデータを送信する

    :param records: firehose用に整形されたデータのリスト
    :param con: datadogのconnection

    :return output: firehose用のデータ。applicationのjson部分だけをdataに格納
    """

    metadata[DD_SOURCE] = "firehose"
    output = []

    for r in records:
        payload = ""
        # result OK以外は送信しない
        if r['result'] != 'Ok':
            continue

        # base64デコードとbyte to stringにする
        data = base64.b64decode(r['data']).decode()
        log = json.loads(data)

        if log['messageType'] == 'DATA_MESSAGE':
            for e in log['logEvents']:
                # firehose to s3用データ
                payload += e['message'] + "\n"

                # datadogのserviceタグを付与
                metadata[DD_SERVICE] = log['logGroup'].split("/")[-1]
                metadata[DD_CUSTOM_TAGS] = ",".join(
                    [
                        DD_TAGS,
                        "task:" + log['logStream'].split("/")[-1],
                        "application:" + log['logStream'].split("/")[-2]
                    ]
                )

                # jsonはloadして送信
                if isValidJson(e['message']):
                    # タグを付与
                    # datadog logsに送信
                    con.safe_submit_log(json.loads(e['message']))

                # ONLY_JSONがfalseのときはJSONじゃないものの処理は飛ばす
                elif ONLY_JSON != "false":
                    continue

                # JSON以外は普通に送信
                else:
                    con.safe_submit_log(e['message'])


        output_record = {
            'recordId': r['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(gzip.compress(payload.encode("utf-8"))).decode("utf-8")
        }
        output.append(output_record)

    return output



def isValidJson(json_string):
    """
    isValidJson ... 渡された string が json フォーマットかどうかを確認する
    """
    try:
        json_object = json.loads(json_string)
    except ValueError as e:
        return False
    return True


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    """
    firehose streamにデータをputする

    :param streamName: firehose
    :param records: firehose用に整形されたdata list
    :param client: boto3.client('firehose')
    :param attemptsMade: 現在の呼び出しリクエストの試行回数
    :param maxAttempts: 最大試行回数

    """
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def lambda_handler(event, context):
    # Lambda 関数の設計図 kinesis-firehose-cloudwatch-logs-processor-python をベースに
    # 6MB溢れたらfirehoseにputする実装をしてます

    # 最終的にreturnする値
    output = []

    # Check prerequisites
    if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "":
        raise Exception(
            "You must configure your API key before starting this lambda function (see #Parameters section)"
        )

    # パラメータ等のチェック
    if not 'records' in event or not event['records']:
        return {"status": "ok", "msg": "no records"}

    # define variable
    streamARN = event['deliveryStreamArn'] # 溢れたデータを送信するためのkinesis firehose arn
    projectedSize = 0          # 全データのバイト数
    recordsReingestedSoFar = 0 # 溢れたデータが発生したときに、ログにどれくらい溢れたかを測るための変数
    putRecordBatches = []      # 溢れたデータのグループリスト
    recordsToReingest = []     # 溢れたデータリスト

    # cloudwatchからきたデータをデコードして、firehose用のデータに変換
    records = list(processRecords(event['records']))

    # 溢れたデータ用に、転送するための整形データ
    dataByRecordId = {r['recordId']: {'data': base64.b64decode(r['data'])} for r in event['records']}

    # 溢れたデータの処理
    # データが6MB以上になったらlambdaがこけてしまうので、別の配列に入れてあとでfirehoseにputRecordする
    for idx, r in enumerate(records):
        if r['result'] != 'Ok':
            continue
        projectedSize += len(r['data']) + len(r['recordId'])
        if projectedSize > 6000000:
            # 転送するための整形データをappendしてく
            recordsToReingest.append(
                {'Data': dataByRecordId[r['recordId']]['data']}
            )
            records[idx]['result'] = 'Dropped'
            # 既存データから削除
            del(records[idx]['data'])
        # レコード数を1グループ最大500レコードにする
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    # 6MB超えた + 500レコード未満のやつをputRecordBatches 配列に入れる
    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # 溢れたデータをputRecordsでfirehoseに流し込む
    if len(putRecordBatches) > 0:
        client = boto3.client('firehose')
        for recordBatch in putRecordBatches:
            putRecordsToFirehoseStream(streamARN, recordBatch, client, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d records out of %d' % (recordsReingestedSoFar, len(event['records'])))


    # datadogのsocket生成
    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as con:
        # Add the context to meta
        if "aws" not in metadata:
            metadata["aws"] = {}
        aws_meta = metadata["aws"]
        aws_meta["function_version"] = context.function_version
        aws_meta["invoked_function_arn"] = context.invoked_function_arn

        # datadog logsに送信 + firehose用データを取得
        output = datadog_handler(records, con)

    return {'records': output}