前回のお話
Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ
技術
- aws-cloudwatch-logging module: terraform module。kinesis firehose + lambda。(cloudwatch logsに書き込まれたらs3とdatadog logsに吐き出す)
- s3: 長期間保存するログを格納
- datadog logs: リアルタイムな1ヶ月間のログを保管。障害対応用
詳細
loggingは1つのmoduleで作られます。
module "logging_cron" { source = "../../../../../modules/aws-cloudwatch-logging" cluster_name = "cron" s3_bucket = "xxxxxxxxxx" dd_tag_system = "cron" dd_tag_env = "sandbox" services = [ "koyama_test", ] }
services
に送りたい対象のservice名を書いていけばdatadog logとs3に保管するように設計しております。
firehoseはサービス分作っており、上限が懸念点ではありますが、緩和すれば良い + いつか出るkinesis直結ログドライバーが解決するかもしれない。のでそれまでの辛抱ということで許容してます。
こだわったところ
ネームスペース
最初のロギング周りは下記でした。
logGroup: /Service名 logStream: ランダム
この場合、lambdaのプログラムが影響し、aws-cloudwatch-logging moduleをサービス毎に1つ作らないといけませんでした。
しかし、ネームスペースを下記のようにすることで、lambdaが適切な値を引っ張れるようになり、 services
に追加したいものを入れるmoduleにすることができました。
logGroup:/cron/Service名 logStream:/cron/Service名/コンテナ名/taskArn
s3側のロギング
s3については、ソースレコードのバックアップを使えばlambdaを介さずs3にログを保管できます。
lambdaに影響せず、生ログが保管でき安全性が担保できますが、これをやると全てのログが1行で出力されてしまいます。
全てのログがjsonであれば、jqを使ってログ調査が用意になりますが、そうであるとは限らないため、lambdaで改行したデータを保管するようにしました。
datadog側のタグ
logStream:/cron/Service名/コンテナ名/taskArn
を設定したことにより、理想のタグをlambdaでつけて送信することが出来ました
system: cron service: service名 application: コンテナ名 task: taskArn
こうすることで、タグ指定で調査することが出来、より素早い障害対応が可能になります。
datadogのエラータグ
json形式で loglevel:error で出力すると、datadog logsでもErrorタグとして検知されます。
「もしerrorがxx回あったら通知する」という監視が可能になるので、出来る限りjsonで吐くと良いです。
上が loglevel:fatal
下が loglevel:error
です。
lambdaファイル
firehoseに紐づけてる、datadog logsに送るlambdaは下記となります。
# coding: utf-8 # cloudwatchのデータをdatadog-logsに流しこむ # またfirehoseでs3にjsonデータを保管するために整形しreturnする # ref: https://github.com/DataDog/dd-aws-lambda-functions/blob/master/Log/lambda_function.py from __future__ import print_function import io import base64 import gzip import json import socket import ssl import os import boto3 DEFAULT_REGION = 'ap-northeast-1' DD_URL = os.getenv("DD_URL", default="lambda-intake.logs.datadoghq.com") #Define the proxy port to forward the logs to DD_PORT = os.environ.get('DD_PORT', 10516) DD_SOURCE = "ddsource" DD_CUSTOM_TAGS = "ddtags" DD_SERVICE = "service" # metadata: Additional metadata to send with the logs metadata = { "ddsourcecategory": "aws", } # DD_API_KEY: Datadog API Key DD_API_KEY = boto3.client('ssm').get_parameters( Names=[ "xxxxxxx", # pathは隠します ], WithDecryption=True )['Parameters'][0]['Value'] # Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar! DD_TAGS = os.environ.get('DD_TAGS', "") ONLY_JSON = os.environ.get('ONLY_JSON') class DatadogConnection(object): def __init__(self, host, port, ddApiKey): self.host = host self.port = port self.api_key = ddApiKey self._sock = None def _connect(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = ssl.wrap_socket(s) s.connect((self.host, self.port)) return s def safe_submit_log(self, log): try: self.send_entry(log) except Exception as e: # retry once self._sock = self._connect() self.send_entry(log) return self def send_entry(self, log_entry): # The log_entry can only be a string or a dict if isinstance(log_entry, str): log_entry = {"message": log_entry} elif not isinstance(log_entry, dict): raise Exception( "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry) ) # Merge with metadata log_entry = merge_dicts(log_entry, metadata) # Send to Datadog str_entry = json.dumps(log_entry) #For debugging purpose uncomment the following line prefix = "%s " % self.api_key return self._sock.send((prefix + str_entry + "\n").encode("UTF-8")) def __enter__(self): self._sock = self._connect() return self def __exit__(self, ex_type, ex_value, traceback): if self._sock: self._sock.close() if ex_type is not None: print("DatadogConnection exit: ", ex_type, ex_value, traceback) def merge_dicts(a, b, path=None): if path is None: path = [] for key in b: if key in a: if isinstance(a[key], dict) and isinstance(b[key], dict): merge_dicts(a[key], b[key], path + [str(key)]) elif a[key] == b[key]: pass # same leaf value else: raise Exception( 'Conflict while merging metadatas and the log entry at %s' % '.'.join(path + [str(key)]) ) else: a[key] = b[key] return a def processRecords(records): """ cloudwatch logsから送られてきたデータをデコード + 解凍し、firehoseに適したデータに整形する :param records: cloudwatchから送られてくるevent['records'] """ for r in records: compressed = base64.b64decode(r['data']) expand = gzip.decompress(compressed) data = json.loads(expand) record_id = r['recordId'] # 送信先が到達可能かどうか if data['messageType'] == 'CONTROL_MESSAGE': yield { 'result': 'Dropped', 'recordId': record_id } # 通常のデータ elif data['messageType'] == 'DATA_MESSAGE': yield { 'data': base64.b64encode(expand).decode('utf-8'), 'result': 'Ok', 'recordId': record_id } # それ以外は失敗扱い else: yield { 'result': 'ProcessingFailed', 'recordId': record_id } def datadog_handler(records, con): """ datadog logsにデータを送信する :param records: firehose用に整形されたデータのリスト :param con: datadogのconnection :return output: firehose用のデータ。applicationのjson部分だけをdataに格納 """ metadata[DD_SOURCE] = "firehose" output = [] for r in records: payload = "" # result OK以外は送信しない if r['result'] != 'Ok': continue # base64デコードとbyte to stringにする data = base64.b64decode(r['data']).decode() log = json.loads(data) if log['messageType'] == 'DATA_MESSAGE': for e in log['logEvents']: # firehose to s3用データ payload += e['message'] + "\n" # datadogのserviceタグを付与 metadata[DD_SERVICE] = log['logGroup'].split("/")[-1] metadata[DD_CUSTOM_TAGS] = ",".join( [ DD_TAGS, "task:" + log['logStream'].split("/")[-1], "application:" + log['logStream'].split("/")[-2] ] ) # jsonはloadして送信 if isValidJson(e['message']): # タグを付与 # datadog logsに送信 con.safe_submit_log(json.loads(e['message'])) # ONLY_JSONがfalseのときはJSONじゃないものの処理は飛ばす elif ONLY_JSON != "false": continue # JSON以外は普通に送信 else: con.safe_submit_log(e['message']) output_record = { 'recordId': r['recordId'], 'result': 'Ok', 'data': base64.b64encode(gzip.compress(payload.encode("utf-8"))).decode("utf-8") } output.append(output_record) return output def isValidJson(json_string): """ isValidJson ... 渡された string が json フォーマットかどうかを確認する """ try: json_object = json.loads(json_string) except ValueError as e: return False return True def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts): """ firehose streamにデータをputする :param streamName: firehose :param records: firehose用に整形されたdata list :param client: boto3.client('firehose') :param attemptsMade: 現在の呼び出しリクエストの試行回数 :param maxAttempts: 最大試行回数 """ failedRecords = [] codes = [] errMsg = '' # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid # response will prevent this response = None try: response = client.put_record_batch(DeliveryStreamName=streamName, Records=records) except Exception as e: failedRecords = records errMsg = str(e) # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results if not failedRecords and response and response['FailedPutCount'] > 0: for idx, res in enumerate(response['RequestResponses']): # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest if 'ErrorCode' not in res or not res['ErrorCode']: continue codes.append(res['ErrorCode']) failedRecords.append(records[idx]) errMsg = 'Individual error codes: ' + ','.join(codes) if len(failedRecords) > 0: if attemptsMade + 1 < maxAttempts: print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg)) putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts) else: raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg)) def lambda_handler(event, context): # Lambda 関数の設計図 kinesis-firehose-cloudwatch-logs-processor-python をベースに # 6MB溢れたらfirehoseにputする実装をしてます # 最終的にreturnする値 output = [] # Check prerequisites if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "": raise Exception( "You must configure your API key before starting this lambda function (see #Parameters section)" ) # パラメータ等のチェック if not 'records' in event or not event['records']: return {"status": "ok", "msg": "no records"} # define variable streamARN = event['deliveryStreamArn'] # 溢れたデータを送信するためのkinesis firehose arn projectedSize = 0 # 全データのバイト数 recordsReingestedSoFar = 0 # 溢れたデータが発生したときに、ログにどれくらい溢れたかを測るための変数 putRecordBatches = [] # 溢れたデータのグループリスト recordsToReingest = [] # 溢れたデータリスト # cloudwatchからきたデータをデコードして、firehose用のデータに変換 records = list(processRecords(event['records'])) # 溢れたデータ用に、転送するための整形データ dataByRecordId = {r['recordId']: {'data': base64.b64decode(r['data'])} for r in event['records']} # 溢れたデータの処理 # データが6MB以上になったらlambdaがこけてしまうので、別の配列に入れてあとでfirehoseにputRecordする for idx, r in enumerate(records): if r['result'] != 'Ok': continue projectedSize += len(r['data']) + len(r['recordId']) if projectedSize > 6000000: # 転送するための整形データをappendしてく recordsToReingest.append( {'Data': dataByRecordId[r['recordId']]['data']} ) records[idx]['result'] = 'Dropped' # 既存データから削除 del(records[idx]['data']) # レコード数を1グループ最大500レコードにする if len(recordsToReingest) == 500: putRecordBatches.append(recordsToReingest) recordsToReingest = [] # 6MB超えた + 500レコード未満のやつをputRecordBatches 配列に入れる if len(recordsToReingest) > 0: # add the last batch putRecordBatches.append(recordsToReingest) # 溢れたデータをputRecordsでfirehoseに流し込む if len(putRecordBatches) > 0: client = boto3.client('firehose') for recordBatch in putRecordBatches: putRecordsToFirehoseStream(streamARN, recordBatch, client, maxAttempts=20) recordsReingestedSoFar += len(recordBatch) print('Reingested %d records out of %d' % (recordsReingestedSoFar, len(event['records']))) # datadogのsocket生成 with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as con: # Add the context to meta if "aws" not in metadata: metadata["aws"] = {} aws_meta = metadata["aws"] aws_meta["function_version"] = context.function_version aws_meta["invoked_function_arn"] = context.invoked_function_arn # datadog logsに送信 + firehose用データを取得 output = datadog_handler(records, con) return {'records': output}