Sionの技術ブログ

SREとして日々の学習を書いて行きます。twitterは@sion_cojp

Fargateでfirelensを利用し、datadog logsとs3にログを保存する

f:id:sion_cojp:20200207102418p:plain

firelens is 何?

fluentd/fluent-bitに連携することが可能なログドライバーです。

なぜfirelensか

firelensが出るまでFargateはawslogsドライバくらいしか選択肢がありませんでした。

s3に生ログの保存、解析+アラート用にdatadog logsに送るためには

fargate -> cloudwatch -> firehose -> lambda -> datadog logs, s3 といった構成を取る必要があり、そこには2つ問題がありました。

  • cloudwatch使わないのに無駄な費用が発生
  • lambdaがたまにこけていた(これは我々の実装の問題だと思います)

fargate(firelens) -> firehose -> datadog logs, s3 とfirelensを使うとこの構成になり、上記解決 + cloudwatch, lambda費用もかからなくなります。

fluent-bitのイメージについて

今回サイドカーで立てるfluent-bitのイメージは

https://hub.docker.com/r/sioncojp/fluent-bit-datadog-firehose を使います。

なぜこのイメージを使うかというと、Fargateはs3にあるconfigファイルを利用できないので、firehose, datadogに送るconfigを設定したカスタムイメージを作成する必要がありました。

ここで注意するのは、 /fluent-bit/etc/fluent-bit.conf/fluentd/etc/fluent.conf のファイルパスは予約されてるので、/fluent-bit/etc/extra.conf などの名前で保管しましょう。

ref: s3にあるconfigが使えない + ファイルパスの予約

ECS task definition

タイミーではecs-deployというGoの自社ツールでecsのtask設定 + service updateをしてますが、そちらで自動的にfluent-bitのContainerDefinition設定がされるようになってます。

fluent-bitの管理はSREなので、開発者には動かしたいコンテナの設定だけしてもらうよう意識してます。

設定されるjsonについて、firelens部分だけ紹介するとこのような感じになります

# サービス側
      "logConfiguration": {
        "logDriver": "awsfirelens",
        "secretOptions": null,
        "options": null
      },
# fluent-bit側
      "environment": [
        {
          "name": "AWS_REGION",
          "value": "ap-northeast-1"
        },
        {
          "name": "DD_SERVICE",
          "value": "koyama"
        },
        {
          "name": "DD_SOURCE",
          "value": "fargate"
        },
        {
          "name": "DD_TAGS",
          "value": "env:sandbox"
        },
        {
          "name": "DELIVERY_STREAM",
          "value": "koyama_koyama"
        }
      ],
      "secrets": [
        {
          "valueFrom": "SSMのDD_API_KEYパス",
          "name": "DD_API_KEY"
        }
      ],
      "firelensConfiguration": {
        "type": "fluentbit",
        "options": {
          "config-file-type": "file",
          "config-file-value": "/fluent-bit/etc/extra.conf"
        }
      },

全task definitionはこちら。

datadogに飛ばす

firelens -> サイドカーのfluentbit -> datadog

### タグ付け
- source: fargate/cron/runtask
- service: コンテナのサービス名
- env: prod/stg/sandbox...

s3に飛ばす

firelens -> fluentbit-> firehose -> s3

https://github.com/cosmo0920/fluent-bit-go-s3 も検証しましたが、1リクエスト1ファイルだったので管理が辛く断念しました。

またfirehoseは各サービスで1つ作るのがベストです。そうしないと同じs3ディレクトリに全ファイルが集約されてしまいます。

### 共有firehose
s3://生ログ保存バケット/共通ディレクトリ/YYYY/MM/DD/ファイル名

### サービス毎にfirehose
s3://生ログ保存バケット/クラスタ名/サービス名/YYYY/MM/DD/ファイル名

1つずつ作り クラスタ名/サービス名 にfirehose prefixを設定することでディレクティブに保管できます。データ分析の際に使いやすくなるよう意識しました。

fluent-bit自体のロギング

awslogsドライバーでcloudwatchに吐くことができますが、特に必要な情報が今の所なさそうだったので出してません。

もし必要になった場合、cloudwatch logsで全fluent-bitログを保管するlog groupを作り、そこのグループにlogstreamにサービス名を定義して保管するのがベストだと思います。

fluent-bit自体のモニタリング

今の所はしてません。問題が起きたらそれに合わせてすると思います。

またessential:trueなので、落ちた場合、taskにある全コンテナ落ちますので、datadogでサービス単位のコンテナ変動をモニタリングはしております。

Fargate + cloudwatch eventでcronシステム モニタリング編

前回のお話

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

non exit0かどうか

こちらはlambdaで監視してます。

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

ロジックとしては、タスク終了時のイベントをトリガーに、exitCodeの値をとって、0じゃない場合slackに通知してます。

タスクが正常に動いてるかどうか

こちらはdatadog logsの量が0以上なのをチェックしてます。

例えば1分間毎のcronの場合、「1分間でログが0以上あるよね」というチェックをしてます。

クラスタのタスク数と、タスクが並列で動いてないかどうか

クラスタのタスク数

1Clusterにおける、Fargateのタスク上限数は50です

Amazon ECS サービス制限 - Amazon Elastic Container Service

datadogで監視出来るのはFargateのserviceのみ。今回はcloudwatch eventsを通してタスクを動かしてるので取得できませんでした。

これが監視できないと、上限値に達した場合タスクが起動できなくなってしまいます。

その前に検知して上限緩和をしたいですね。

タスクが並列で動いてないか

最初は「datadog logsで決めた量の2倍以上になったら2つ動いてる」というロジックで監視してましたが、

2倍以上にならない場合もあるので、悩みました。

双方の監視はGoで実装し、Fargate service上で動かして監視してます。

github.com

上記でやってることは

- stg, prodのcronクラスター内のタスクの数を監視し、slackに通知。 
- タスクがn個以上実行されてるかを監視します。15分ごとにまとめをslackに通知。(nは設定できます)

となってます。

大変だったところ

メトリクスが取れない?

datadogではfargate service + clusterの場合メトリクスが取れます。

が、今回はfargate + cloudwatch eventなのでクラスタからメトリクスが取れません。

なのでもし中々taskが終わらない...などの調査用に取りたい場合は、サイドカーでdatadog/agentをいつでも立ち上げれるよう、準備するだけでとどめてます。

datadog/agentが立ち上がる前にメインコンテナが起動終了する?

https://hub.docker.com/r/sioncojp/docker-slack

curlでslackにメッセージPOSTするイメージで検証してました。

あまりにも起動、終了が早いため、datadog/agentより先に立ち上がって終了します。

dependsOnでdatadogに向ければ、datadogが立ち上がった後にアプリケーションが起動するので、メトリクスが取れるようになります。

datadog/agentが永遠に立ち上がる?

dependsOnサイドカーのdatadogを向けた場合、永遠に立ち上がってしまいます。

https://github.com/sioncojp/fargate-sidecar-datadog-agent

のようにタスクが全部終了したら、検知してdatadogをshutdownするソリューションを考えましたが、

app = essential: true + dependsOn: datadog/agent
datadog/agent = essential: false

の設定を施せばこれは解決しました

タスクが永久にpending?

例えば、

app:latest = essential: true + dependsOn: datadog/agent
datadog/agent:hogeeee = essential: false

でdatadogをサイドカーで立ててる場合、datadog/agent:hogeeee のような存在しないイメージを指定すると永久にタスクがpendingになります。

細かい原因はよくわかりませんが、もしこのようなパターンを使う場合は、app側のwrapperでサイドカーの起動を失敗検知する設定が必要です

ref: https://docs.docker.com/compose/startup-order/

Fargate + cloudwatch eventでcronシステム ロギング設計編

f:id:sion_cojp:20190805140304p:plain

前回のお話

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

技術

  • aws-cloudwatch-logging module: terraform module。kinesis firehose + lambda。(cloudwatch logsに書き込まれたらs3とdatadog logsに吐き出す)
  • s3: 長期間保存するログを格納
  • datadog logs: リアルタイムな1ヶ月間のログを保管。障害対応用

詳細

loggingは1つのmoduleで作られます。

module "logging_cron" {
  source        = "../../../../../modules/aws-cloudwatch-logging"
  cluster_name  = "cron"
  s3_bucket     = "xxxxxxxxxx"
  dd_tag_system = "cron"
  dd_tag_env    = "sandbox"

  services = [
    "koyama_test",
  ]
}

services に送りたい対象のservice名を書いていけばdatadog logとs3に保管するように設計しております。

firehoseはサービス分作っており、上限が懸念点ではありますが、緩和すれば良い + いつか出るkinesis直結ログドライバーが解決するかもしれない。のでそれまでの辛抱ということで許容してます。

こだわったところ

ネームスペース

最初のロギング周りは下記でした。

logGroup: /Service名
logStream: ランダム

この場合、lambdaのプログラムが影響し、aws-cloudwatch-logging moduleをサービス毎に1つ作らないといけませんでした。

しかし、ネームスペースを下記のようにすることで、lambdaが適切な値を引っ張れるようになり、 services に追加したいものを入れるmoduleにすることができました。

logGroup:/cron/Service名
logStream:/cron/Service名/コンテナ名/taskArn

s3側のロギング

s3については、ソースレコードのバックアップを使えばlambdaを介さずs3にログを保管できます。

lambdaに影響せず、生ログが保管でき安全性が担保できますが、これをやると全てのログが1行で出力されてしまいます。

全てのログがjsonであれば、jqを使ってログ調査が用意になりますが、そうであるとは限らないため、lambdaで改行したデータを保管するようにしました。

datadog側のタグ

logStream:/cron/Service名/コンテナ名/taskArn を設定したことにより、理想のタグをlambdaでつけて送信することが出来ました

system: cron
service: service名
application: コンテナ名
task: taskArn

こうすることで、タグ指定で調査することが出来、より素早い障害対応が可能になります。

datadogのエラータグ

f:id:sion_cojp:20190805204958p:plain

json形式で loglevel:error で出力すると、datadog logsでもErrorタグとして検知されます。

「もしerrorがxx回あったら通知する」という監視が可能になるので、出来る限りjsonで吐くと良いです。

上が loglevel:fatal 下が loglevel:error です。

lambdaファイル

firehoseに紐づけてる、datadog logsに送るlambdaは下記となります。

# coding: utf-8

# cloudwatchのデータをdatadog-logsに流しこむ
# またfirehoseでs3にjsonデータを保管するために整形しreturnする
# ref: https://github.com/DataDog/dd-aws-lambda-functions/blob/master/Log/lambda_function.py

from __future__ import print_function

import io
import base64
import gzip
import json
import socket
import ssl
import os

import boto3

DEFAULT_REGION = 'ap-northeast-1'

DD_URL = os.getenv("DD_URL", default="lambda-intake.logs.datadoghq.com")
#Define the proxy port to forward the logs to
DD_PORT = os.environ.get('DD_PORT', 10516)
DD_SOURCE = "ddsource"
DD_CUSTOM_TAGS = "ddtags"
DD_SERVICE = "service"

# metadata: Additional metadata to send with the logs
metadata = {
    "ddsourcecategory": "aws",
}

# DD_API_KEY: Datadog API Key
DD_API_KEY = boto3.client('ssm').get_parameters(
    Names=[
        "xxxxxxx", # pathは隠します
    ],
    WithDecryption=True
)['Parameters'][0]['Value']

# Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar!
DD_TAGS = os.environ.get('DD_TAGS', "")
ONLY_JSON = os.environ.get('ONLY_JSON')

class DatadogConnection(object):
    def __init__(self, host, port, ddApiKey):
        self.host = host
        self.port = port
        self.api_key = ddApiKey
        self._sock = None

    def _connect(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s = ssl.wrap_socket(s)
        s.connect((self.host, self.port))
        return s

    def safe_submit_log(self, log):
        try:
            self.send_entry(log)
        except Exception as e:
            # retry once
            self._sock = self._connect()
            self.send_entry(log)
        return self

    def send_entry(self, log_entry):
        # The log_entry can only be a string or a dict
        if isinstance(log_entry, str):
            log_entry = {"message": log_entry}
        elif not isinstance(log_entry, dict):
            raise Exception(
                "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry)
            )

        # Merge with metadata
        log_entry = merge_dicts(log_entry, metadata)

        # Send to Datadog
        str_entry = json.dumps(log_entry)
        #For debugging purpose uncomment the following line
        prefix = "%s " % self.api_key
        return self._sock.send((prefix + str_entry + "\n").encode("UTF-8"))

    def __enter__(self):
        self._sock = self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        if self._sock:
            self._sock.close()
        if ex_type is not None:
            print("DatadogConnection exit: ", ex_type, ex_value, traceback)

def merge_dicts(a, b, path=None):
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                raise Exception(
                    'Conflict while merging metadatas and the log entry at %s' % '.'.join(path + [str(key)])
                )
        else:
            a[key] = b[key]
    return a

def processRecords(records):
    """
    cloudwatch logsから送られてきたデータをデコード + 解凍し、firehoseに適したデータに整形する

    :param records: cloudwatchから送られてくるevent['records']
    """
    for r in records:
        compressed = base64.b64decode(r['data'])
        expand = gzip.decompress(compressed)
        data = json.loads(expand)
        record_id = r['recordId']


        # 送信先が到達可能かどうか
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': record_id
            }
        # 通常のデータ
        elif data['messageType'] == 'DATA_MESSAGE':
            yield {
                'data': base64.b64encode(expand).decode('utf-8'),
                'result': 'Ok',
                'recordId': record_id
            }
        # それ以外は失敗扱い
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': record_id
            }

def datadog_handler(records, con):
    """
    datadog logsにデータを送信する

    :param records: firehose用に整形されたデータのリスト
    :param con: datadogのconnection

    :return output: firehose用のデータ。applicationのjson部分だけをdataに格納
    """

    metadata[DD_SOURCE] = "firehose"
    output = []

    for r in records:
        payload = ""
        # result OK以外は送信しない
        if r['result'] != 'Ok':
            continue

        # base64デコードとbyte to stringにする
        data = base64.b64decode(r['data']).decode()
        log = json.loads(data)

        if log['messageType'] == 'DATA_MESSAGE':
            for e in log['logEvents']:
                # firehose to s3用データ
                payload += e['message'] + "\n"

                # datadogのserviceタグを付与
                metadata[DD_SERVICE] = log['logGroup'].split("/")[-1]
                metadata[DD_CUSTOM_TAGS] = ",".join(
                    [
                        DD_TAGS,
                        "task:" + log['logStream'].split("/")[-1],
                        "application:" + log['logStream'].split("/")[-2]
                    ]
                )

                # jsonはloadして送信
                if isValidJson(e['message']):
                    # タグを付与
                    # datadog logsに送信
                    con.safe_submit_log(json.loads(e['message']))

                # ONLY_JSONがfalseのときはJSONじゃないものの処理は飛ばす
                elif ONLY_JSON != "false":
                    continue

                # JSON以外は普通に送信
                else:
                    con.safe_submit_log(e['message'])


        output_record = {
            'recordId': r['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(gzip.compress(payload.encode("utf-8"))).decode("utf-8")
        }
        output.append(output_record)

    return output



def isValidJson(json_string):
    """
    isValidJson ... 渡された string が json フォーマットかどうかを確認する
    """
    try:
        json_object = json.loads(json_string)
    except ValueError as e:
        return False
    return True


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    """
    firehose streamにデータをputする

    :param streamName: firehose
    :param records: firehose用に整形されたdata list
    :param client: boto3.client('firehose')
    :param attemptsMade: 現在の呼び出しリクエストの試行回数
    :param maxAttempts: 最大試行回数

    """
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def lambda_handler(event, context):
    # Lambda 関数の設計図 kinesis-firehose-cloudwatch-logs-processor-python をベースに
    # 6MB溢れたらfirehoseにputする実装をしてます

    # 最終的にreturnする値
    output = []

    # Check prerequisites
    if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "":
        raise Exception(
            "You must configure your API key before starting this lambda function (see #Parameters section)"
        )

    # パラメータ等のチェック
    if not 'records' in event or not event['records']:
        return {"status": "ok", "msg": "no records"}

    # define variable
    streamARN = event['deliveryStreamArn'] # 溢れたデータを送信するためのkinesis firehose arn
    projectedSize = 0          # 全データのバイト数
    recordsReingestedSoFar = 0 # 溢れたデータが発生したときに、ログにどれくらい溢れたかを測るための変数
    putRecordBatches = []      # 溢れたデータのグループリスト
    recordsToReingest = []     # 溢れたデータリスト

    # cloudwatchからきたデータをデコードして、firehose用のデータに変換
    records = list(processRecords(event['records']))

    # 溢れたデータ用に、転送するための整形データ
    dataByRecordId = {r['recordId']: {'data': base64.b64decode(r['data'])} for r in event['records']}

    # 溢れたデータの処理
    # データが6MB以上になったらlambdaがこけてしまうので、別の配列に入れてあとでfirehoseにputRecordする
    for idx, r in enumerate(records):
        if r['result'] != 'Ok':
            continue
        projectedSize += len(r['data']) + len(r['recordId'])
        if projectedSize > 6000000:
            # 転送するための整形データをappendしてく
            recordsToReingest.append(
                {'Data': dataByRecordId[r['recordId']]['data']}
            )
            records[idx]['result'] = 'Dropped'
            # 既存データから削除
            del(records[idx]['data'])
        # レコード数を1グループ最大500レコードにする
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    # 6MB超えた + 500レコード未満のやつをputRecordBatches 配列に入れる
    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # 溢れたデータをputRecordsでfirehoseに流し込む
    if len(putRecordBatches) > 0:
        client = boto3.client('firehose')
        for recordBatch in putRecordBatches:
            putRecordsToFirehoseStream(streamARN, recordBatch, client, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d records out of %d' % (recordsReingestedSoFar, len(event['records'])))


    # datadogのsocket生成
    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as con:
        # Add the context to meta
        if "aws" not in metadata:
            metadata["aws"] = {}
        aws_meta = metadata["aws"]
        aws_meta["function_version"] = context.function_version
        aws_meta["invoked_function_arn"] = context.invoked_function_arn

        # datadog logsに送信 + firehose用データを取得
        output = datadog_handler(records, con)

    return {'records': output}

Fargate + cloudwatch eventでcronシステム構築

FOLIOでFargate + cloudwatch eventを使ったcron(マイクロバッチ)システムを設計し、実際に本番で動いてるので紹介します。

(ロギング、モニタリングは別記事で紹介したいと思います)

構成図

f:id:sion_cojp:20190903204310p:plain

技術

  • fargate: アプリケーション
  • cloudwatch event schedule: cron形式でFargateをさせる発火
  • cloudwatch logs: awslogsドライバーでアプリのログを出力
  • ecscli: Go製の社内cliツール。ECSタスク登録、cloudwatch eventのschedule put(update)をする

詳細

terraformでcluster / serviceの2つのmoduleがあります。

/* cluster生成 */
module "cron" {
  source       = "modules/aws-ecs/fargate_batch_cron_cluster"
  cluster_name = "cron"
  vpc_id       = "${data.aws_vpc.sandbox.id}"
  dd_tag_env   = "sandbox"

  # monitor sandbox
  slack_webhook_url = "${var.SLACK_WEBHOOK_MONITOR_SANDBOX}"
}

/* koyama_testサービス生成 */
module "koyama_test" {
  source       = "modules/aws-ecs/fargate-cron"
  cluster_name = "cron"
  task_name    = "koyama_test"

  subnets = [
    "${data.aws_subnet.private-subnet-1a.id}",
    "${data.aws_subnet.private-subnet-1c.id}",
  ]
}

clusterは下記を行ってます。

  • ecs cluster作成
  • デフォルトで使える、iam作成
  • non exit 0 の場合、slackに通知するlambda

serviceは下記を行ってます。

  • 対象ecs fargate taskをターゲットとした、cloudwatch event作成
  • cloudwatch eventからecsが操作できる + ecsで使うiam作成

下記のコマンドでdeploy, disable, enable が実行できます。

### deploy
$ ecscli schedule update --env sandbox -t ecs.yml

### 停止
$ aws --profile sandbox events disable-rule --name cron-koyama_test

### 再開
$ aws --profile sandbox events enable-rule --name cron-koyama_test

こだわったところ

applyの負担軽減

$ tree
├── cluster
│   ├── ecs.tf
├── koyama_test
│   ├── ecs.tf
└── logs
    ├── main.tf

のようにサービス毎のディレクトリに分割することで、apply時に他サービスが影響しないようにしました。

SREと開発者の責務分け

taskDefinitionはSRE。containerDefinitionは開発者に設定してもらうようにしました。

理由は、例えばFargateのネットワークはaws-vpcモードなのでSecurityGroupとSubnetIDがタスク定義に必要ですが、そこの設定を開発者にさせたくなかった(というか開発者は分からない)からです。

SRE管轄はecscliのGoコードにハードコーディング。 開発者には下記yamlを書いてもらいdeployしてもらってます。

$ vim ecs.yml
type: "FARGATE"
cluster: "cron"
name: "cron-koyama_test"
containers:
- name: app
  cpu: 256
  memory: 512
  image: sioncojp/docker-slack:latest
  environment:
    WEBHOOK_URL: "https://hooks.slack.com/services/xxxxxxxxx"
schedule:
  scheduleExpression: "rate(1 minute)"
  taskCount: 1

deploy = updateであり、開発者にcreate権限は与えてません。

理由はSREが把握できないcronを自由にdeployされてしまうのはよくないため、事前にSREが作った枠に対し、開発者がupdateする仕組みにしてます。

Fargateのtask CPU, memoryの自動選択

開発者にはcontainerDefinitionを設定してもらってますが、Fargateだと全体のCPU, memoryを決めないといけません。

開発者にはコンテナのCPU, memoryだけ決めさせたかった + 良いことにFargateの全体値は決め打ちだったので、全てのコンテナのCPU, memoryを加算した値から、適切な値を自動で選択させるようにしました。

$ vim internal/fargate/fargate.go
package fargate

import (
    "strconv"
)

// NewTaskCpuMemoryValue ... Fargateのタスク全体のCPU, Memoryを算出
func NewTaskCpuMemoryValue(cpus, mems []int64) (string, string) {
    var c, m int64

    for _, v := range cpus {
        c += v
    }

    for _, v := range mems {
        m += v
    }

    taskCpu, taskMemory := convertTaskValueToString(c, supportMemoryValue(m))

    return taskCpu, taskMemory
}

// supportMemoryValue ... 512 or 1024, 2048 ... のように1024の倍数を算出
func supportMemoryValue(m int64) int64 {
    if m <= 512 {
        return 512
    }
    return (m/1024 + 1) * 1024
}

// convertTaskValueToString ... CPUの範囲をベースに、Taskがサポートしてる値を算出
// https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/task-cpu-memory-error.html
func convertTaskValueToString(c, m int64) (string, string) {
    memoryStr := strconv.FormatInt(m, 10)

    switch {
    case c == 256:
        if 512 <= m && m <= 2048 {
            return "256", memoryStr
        }
        fallthrough
    case 256 < c && c <= 512:
        if 1024 <= m && m <= 4096 {
            return "512", memoryStr
        }
        fallthrough
    case 512 < c && c <= 1024:
        if 2048 <= m && m <= 8192 {
            return "1024", memoryStr
        }
        fallthrough
    case 1024 < c && c <= 2048:
        if 4096 <= m && m <= 16384 {
            return "2048", memoryStr
        }
        fallthrough
    case 2048 < c && c <= 4096:
        if 8192 <= m && m <= 30720 {
            return "4096", memoryStr
        }
        fallthrough
    default:
        return "", ""
    }
}

GoでSlackに定常アラートが出たら、スレッドで自動コメントする

github.com

Slackに定常アラートが出たら、スレッドで自動コメントするGoのプログラムです。

今はアラート撲滅に着手出来ないけど、それまで周りに分かりやすいようにコメントを自動でしたい!という時に便利に使えるなぁと思って作りました。

動作

例えばこんなtomlを設定したら

$ vim examples/monitoring.toml
[[action]]
channel = "xxxxx"
in      = "test"
out     = "hoge"

testというワードに反応してスレッドにhogeを書いてくれます。

https://github.com/sioncojp/go-slack-auto-comment/blob/master/docs/go-slack-auto-comment01.png?raw=true


https://github.com/sioncojp/go-slack-auto-comment/blob/master/docs/go-slack-auto-comment02.png?raw=true

https://github.com/sioncojp/go-slack-auto-comment/blob/master/examples/monitoring-channel.toml

にサンプルがあります。

inは正規表現が使えたり、outは改行した書き方が出来ます。

またディレクトリ配下のtomlを全て読み込むので、チャンネルごとにtomlファイルを分離すると良いでしょう。

Usage

事前準備

Customize Slack -> API -> Your Apps -> Create New App

でAppを作り、

  • Bot UsersをONにし、Web版SlackからBOT ID
  • OAuth & PermissionsでBot User OAuth Access Token
  • BasicInformationにあるVerification Token

を取得してください。

動かしてみる

https://github.com/sioncojp/go-slack-auto-comment/tree/master/examples

を参考にconfigを設定してください。

Parameter Storeを使ってる場合は GitHub - sioncojp/tomlssm を使ってるので、 ssm://SSMのAlias名 と書けばIAM, KMS権限があればDecodeしてくれます。

### build
$ make build

### help
$ ./bin/go-slack-auto-comment help
NAME:
   go-slack-auto-comment - A new cli application

USAGE:
   go-slack-auto-comment [global options] command [command options] [arguments...]

VERSION:
   0.0.0

COMMANDS:
     help, h  Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --config-dir value, -c value  Load configuration *.toml in target dir
   --region value, -r value      Setting AWS region for tomlssm (default: "ap-northeast-1")
   --help, -h                    show help
   --version, -v                 print the version

### run。configが入ってるdirectoryを指定
$ ./bin/go-slack-auto-comment -c examples/
{"level":"info","ts":1562750380.530795,"caller":"go-slack-auto-comment/main.go:64","msg":"start..."}

実際にどう利用してる?

datadogからの通知、slowQueryの通知などに対して、設定してたりします f:id:sion_cojp:20190719145235p:plain

[[action]]
channel = "xxxxx"
in      = "^(Re-Triggered|Triggered|Warn):(.*)ECS cluster CPU reservation high.*"
out     = "autoscaleでCPUが下がらなければ、Terraformからcluster台数を増やしましょう"

課題

正規表現に癖があるので、そこが少し難しいです。

例えば

RDS Slow Log [test]

[IP: xxx.xxx.xxx.xxx]

'''
DELETE FROM `xxxx`
'''

にヒットさせようとしたら下記を書きます。

in = "^RDS Slow Log \\[test\\](?s)(.*)FROM `xxx`"

# (?s)(.*) ... 改行含む全ての文字

デバッグは容易ですが、さくっとやるにはサンプル数を増やしたり慣れる必要があります。

最後に

定常アラートをなくすことが大事なので、忘れずに。。

FOLIOに入社して1年でやったこと

やったことをまとめてみます。

terraformのコマンドラッパー(Makefile)作成

  • terraform周りの自動セットアップ
  • terraformのバージョンが固定
  • terraform providerが自動更新
  • 実行ログをs3に保管するようにしたりなどなど。

コンテナ本番導入

コンテナイメージの脆弱性診断チャレンジ

ECS用イメージをcodebuild + packer + bashで生成

batch/cronのコンテナ設計

  • batch: fargate runTask
  • cron: fargate + cloudwatch event
  • でterraform module作成し、検証まで。
  • これから移行していきたいなぁというお気持ちです。

slack deploy

社内ラジオ、映像サイト構築

landing pageサイト構築

  • s3 hosting + cloudfront + waf
  • deployツールがtypescriptだったので、IAM認証系周りのPRを投げたり。

CodeBuild + PackerでAMIを焼く

CodeBuild + Packerを使って、AmazonLinuxベースのimageを焼きます。

完成品こちらです

github.com

構成図

f:id:sion_cojp:20190128200331p:plain

CodeBuildの準備

Terraformでセットアップします。

https://github.com/sioncojp/codebuild-packer/tree/master/terraform/codebuild/packer を参考にして terraform apply してください。

やってることは、

  • CodeBuild作成
  • CodeBuildで使うファイル群を保管するs3作成
  • CodeBuildのIAM作成
  • Packerで使うEC2のIAM instance profile作成
  • Packerで使うEC2のSG作成

instance profileを作っておくことで、後からAMIを焼くための権限をインスタンスに付与することができます。

s3にimageに必要なファイルをsyncし、codebuildにqueueを飛ばす

https://github.com/sioncojp/codebuild-packer/blob/master/Makefile

$ make build IMAGE=images/amazon-linux.json

やってることは、

  • images配下をs3にsync
  • codebuildにqueueを飛ばす

これでCodeBuildが起動し、s3にあるbuildspec.ymlを元に、AMIを焼いてくれます。