Sionの技術ブログ

SREとして日々の学習を書いて行きます。twitterは@sion_cojp

AWS S3 Glacierの取り出しをRubyで検証してみる

f:id:sion_cojp:20210316152558p:plain

What

  • S3のGlacier, Glacier Deep Archiveがあるが、実際に取り出したことがあまりない
  • 取り出すにはリクエストが必要
  • 今働いてる chikakurailsをメインにしてるので、rubyで検証しました

Glacier周りの料金や仕様

f:id:sion_cojp:20210316145443p:plain:w400

ref: https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/dev/restoring-objects.html

やったこと

  • s3にオブジェクト(動画)を保存
  • 動画は https://www.home-movie.biz/free_movie.html から拝借しております
  • 下記のように3種類ストレージクラスを用意し、ダウンロードしてみる
    • STANDARD
    • Glacier...今回は迅速取り出し(Expedited)をやります
    • Deep Archive

f:id:sion_cojp:20210316143845p:plain

今回使ったコード

Gemfile

source 'https://rubygems.org'

gem 'aws-sdk-s3'

main.rb

### 秘匿情報はxxxxxで隠してます
### awsにはsaml認証経由でアクセスしてます
require 'aws-sdk-s3'

ENV['AWS_REGION'] = "ap-northeast-1"

def main(object_key)
  bucket_name = 'xxxxx'
  local_path = "./#{object_key}"
  role_arn = "arn:aws:iam::xxxxx:role/xxxxx"
  role_session_name = "xxxxx"

  sts_client = Aws::STS::Client.new(profile: 'saml')
  role_credentials = Aws::AssumeRoleCredentials.new(
    client: sts_client,
    role_arn: role_arn,
    role_session_name: role_session_name
  )

  client = Aws::S3::Client.new(credentials: role_credentials)

  s3_download(client, bucket_name, object_key, local_path)
end

# glacierはrestoreしてからダウンロードする
def s3_download(s3_client, bucket_name, object_key, local_path)
  # storageクラスによってはrestoreする必要があるので、取得する
  resp = s3_client.list_objects_v2(
    {
      bucket: bucket_name,
      prefix: object_key,
    }
  )

  case resp.contents[0].storage_class
  when "STANDARD"
  when "GLACIER"
    restore(s3_client, bucket_name, object_key, local_path, "Expedited")
  when "DEEP_ARCHIVE"
    # こいつだけrestoreが終わるまで下記エラーが出て実行できない
    # Error getting object: Object restore is already in progress
    restore(s3_client, bucket_name, object_key, local_path, "Standard")
  end
  download(s3_client, bucket_name, object_key, local_path)

rescue StandardError => e
  puts "Error getting object: #{e.message}"
end

# glacierはrestoreする
def restore(s3_client, bucket_name, object_key, local_path, tier)
  s3_client.restore_object(
    {
      bucket: bucket_name,
      key: object_key,
      restore_request: {
        days: 1,
        glacier_job_parameters: {
          # https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/dev/restoring-objects.html
          # Standard...標準取り出し
          # Bulk...大容量取り出し
          # Expedited...迅速取り出し。deep archiveは対応してない
          tier: tier
        },
      },
    }
  )
end

# download
def download(s3_client, bucket_name, object_key, local_path)
  s3_client.get_object(
    response_target: local_path,
    bucket: bucket_name,
    key: object_key
  )
end

# 動作確認用
def list_bukets(s3_client)
  puts s3_client.list_buckets
end

# 動作確認用
def list_objects(s3_client, bucket_name)
  s3_client.list_objects(bucket: bucket_name).contents.each do |object|
    p object.key
  end
end

# STANDARD
main(mov_hts-samp001.mp4)

# Glaicer
main(mov_hts-samp002.mp4)

# Deep Archive
main(mov_hts-samp003.mp4)

実行

$ bundle install --path vendor/bundle

# 最初はglacier両方ともエラー
$ bundle exec ruby main.rb
Error getting object: Object restore is already in progress
Error getting object: Object restore is already in progress

# 1分後くらい
$ bundle exec ruby main.rb
Error getting object: Object restore is already in progress

# Glacierはダウンロードできたけど、Deep Archiveはだめだった
# 30分すぎても取り出せないので諦めた

結論

terraform 0.13でローカルproviderを利用する方法

はじめに

terraform 0.13からproviderが https://registry.terraform.io/ から配布され、下記のように記述すると自動取得するようになりました。

# ref: https://www.terraform.io/upgrade-guides/0-13.html#explicit-provider-source-locations
terraform {
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 2.12"
    }
    newrelic = {
      source  = "newrelic/newrelic"
      version = "~> 2.1.1"
    }
  }
}

0.13でprovider開発などでローカルで利用する場合や、まだregistryに上がってないcommunity providerはどうするのか?

0.12では ~/.terraform.d/plugins に置いてました。

しかし、これをやっても読み込めずエラーが出てしまいます。それを私が実際にどうやって解決したかを書きます。

他に良い方法を知ってる方がいらっしゃれば別途記事を書いてもらえると有難いです。

今回は https://github.com/anasinnyk/terraform-provider-1password を例として紹介します。

1. providerのバイナリを配置します

### https://github.com/anasinnyk/terraform-provider-1password/releases/tag/1.1.0
### をダウンロードして下記のように配置します。このファイル名形式じゃないとダメです
/Users/sion_cojp/.terraform.d/plugins/registry.terraform.io/hashicorp/onepassword/1.1.0/darwin_amd64/terraform-provider-onepassword_v1.1.0


### hashicorp じゃないと下記エラーが出ます
# Error: Failed to query available provider packages
# 
# Could not retrieve the list of available versions for provider
# anasinnyk/onepassword: provider registry.terraform.io/anasinnyk/onepassword
# was not found in any of the search locations
# 
# - /Users/sion_cojp/.terraform.d/plugins

2. required_providerを書きます

$ vim backend.tf
terraform {
  backend "s3" {
    必要であれば書く
  }

  required_providers {
    onepassword = {
      source  = "hashicorp/onepassword"
      email = "sion_cojp@xxxxx"
      password = "xxxxxxx"
      secret_key = "xxxxxx"
      subdomain = "organization_name"
    }
  }
}

3. 1passwordにcliからsign inしてセッションキーをexportします

### signinしたらOP_SESSIONが出力されます
$ op signin organization_name.1password.com sion_cojp@xxxxx

export OP_SESSION_organization_name="xxxxxxxx"

### OP_SESSION_myに変えてexportする必要があります。理由はわかりませんが、やらないと下記エラーのどれかがでます
# 1. Error: email, password or secret_key is empty and environment variable OP_SESSION_my is not set
# 2. Output: [ERROR] 2020/10/13 17:07:29 You are not currently signed in. Please run `op signin --help` for instructions
export OP_SESSION_my="xxxxxxxx"

4. applyやshowをします

$ vim main.tf
data onepassword_user this {
  email = "sion_cojp@xxx"
}

### initでpluginの場所を指定するのがポイント。絶対パスじゃないとダメです
$ terraform init -plugin-dir=/Users/sion_cojp/.terraform.d/plugins
$ terraform show
# module.users.data.onepassword_user.this:
data "onepassword_user" "this" {
    email     = "sion_cojp@xxxxxx"
    firstname = "Shohei Koyama"
    id        = "xxxxx"
    state     = "A"
}

Reference

タイミーのEMとしてどのようにマネジメントしているか

はじめに

16歳からゲーマーとしてずっとチームマネジメントをやっており、

自身の強みなので本腰入れてやりたいという思いで1月にタイミーに入社しました。

結果、今はSREマネージャー(2人) + BI基盤チームマネージャー(3人) + コーポレートエンジニア(3人)として働いてます。

そんなにメンバーはいませんがマネージャーはやれてると思ってるので、色々と書いてみたいと思います。

そんなに掛け持ちして大丈夫なの?

スタートアップは仕方ないです。ぜひ強い方募集してます。

自分の脳内の長期計画は全てアウトプットしてチームと精査してるので出来る人がやればいい状態です。

私自身マネジメントしながら、業務でコードも書いてレビューもしてるので、自身の時間を作るために自走できるチームを作り上げるためのマネジメントでもあります。

マネージャーとしてどういう動きしてるの?

- ローテーション可能のものとする
- マネージャーを無くす意思って良いチームを
- 1on1でフィードバックをもらいましょう
- チーム内で最適な人がやりましょう

基本方針はこれです。(私がマネジメントしてるチームのオンボーディングに書いてます)

ローテーションは、それが出来れば理想的だなぁと思ってます。

なので、チームの権限は同じで、by nameで呼ばれた会議やメッセージはチームメンションに促してます。

もちろんベンダーとのメールのやりとりはslackに飛ばしてます。

あとは拾うべき役割の人がやってくれるでしょう。

私はマネージャーというのは偉い立場ではなく、あくまで役割だと思ってるのでチームメンバーと同じ扱いでいいと思ってます。

1on1

チームメンバーと1on1をまずは週1。徐々に週2くらいの間隔で30分行なってます。

質問は、その人のslackでの動きや発言の仕草、仕事ぶりを見て「その人が今どう悩んでるか」を分析し、それを踏まえて聞きたいことを考えてます。

もちろん1on1をやる前に項目を毎回伝えてます。またテンプレートで聞く部分もあります。

最近は

「もし10万円与えられたら、より効率的なリモートワークをするには何に使いますか?」

「変えたいと思ってる会社全体の文化はありますか?」

とか聞いたりしました。

ちなみに最初の1on1で送るメッセージはテンプレートでこれです。

### 最初なので1週間に1日/30分ほど作っていきたい気持ちです。
### そこから徐々に頻度を下げていきます
- キャリア
- 今後の休暇や働き方
- 目標とアクション決定
- 目標を阻害する問題に対して助けられること
- 時間があればその他

なぜ1on1をやるの?

# メンバー向け
- チームの意思を統一するためのコーチング
- チームワークのためのコーチング
- キャリア開発
- 可能性を広げていく

# 自身向け
- メンバーを知り、チームの可能性が広がるピースを考える(採用)
- メンバーがやりたいことを知り、その役割に当てはめる
- 自身のフィードバックをもらう
- 会社やチームがより成長するアドバイスをもらう

などを意識してます。

「良いチームを作るためにメンバーの変化に柔軟に対応して常にベストな状態にする」ために、個性やステータス(特に体調不良やモチベーション)を理解して、チームにどう適用するか戦略を立てていく。

ゲームもそうですが、他スポーツもそうですね。

意思の統一?

意思の統一 = ベクトルが同じかどうかです。

無駄にだらだらコミュニケーション(意思疎通)をやるより、まず先に意思を統一すれば相互理解も早まってシンプルに解決しやすい思ってます。

そうすることで、必要なときに助けを求めやすくなるし、ダイバーシティーなチームでも回答も一致しやすくなります。そのためには相手が自身や相手がどんな人かを理解する必要があります。

- 自分がどういうポジションで動きたいか、動いてるか理解して働きましょう
- 相手がどういう役割なのか、相手がどういう思考をする人なのかを意識しましょう
- 仕事には感情入れずシンプルに会話しましょう

シンプルな会話はリモートワークにおいて非常に効果的なテクニックであり難しいと思ってます。

「会話の途中途中に絵文字はなるべく使わない」「できる限り短い文章にする」など色々あります。(ただし雑談は別

これを意識し、ベクトルが同じであれば意見はスピーディーに収束すると思ってます。

また「多部署に対して、同じ意見を言える」「役割を理解して連携し合う」ところも重視したいですね。

どんなチームを目指してるの?

# メンバー全員が
- 責任持って自走する
- 他部署に対して、同じ意見を言える
- お互いに役割を理解して連携し合う

みたいな感じですね。

SREチームとしてどう適用してるの?

- 未来予測しながら、仕事する
- 建設的でロジカルな議論をする
- 健康的である
- 成果させ出してれば自由。色々試していけばよい
- 意思疎通より意思の統一(ベクトルが同じかどうか)。意思疎通はPR。(深い、長くなりそうならchat or meets)
- HRT
    - 謙虚さを持って相手より勉学に励む(「おれについてこい」くらいの度量で)
    - 信頼を持ってあまり口を出さない。任せる
    - 尊敬して讃えあう(thumbsup)

というのを掲げてます。

HRTを意識することで、シンプルな会話になりやすくなります。

例えば会話の中で「人格否定してるわけではないですが」「非難してるわけではないですが」のような無駄な枕詞をなくした会話になりやすいです。

目指すは 誠実に言い合えるチーム です。

「自由を与えながら成果を出させる」というのは非常に環境が影響しやすいと思ってます。

なので我々は「失敗しても良い。何やってもいい。その代わり責任は持ちましょう。」という環境にしてます。

「迷ったり不安になったら、相談やレビューが絶対くる」とメンバーを信頼しましょう。来なかったらフィードバックし成長しあいましょう。

また働く時間も休みも自由にしてます。要所で会議や相談をセットすると思いますし、例えば新入社員が入ったばかりなら、相手が働いてる時間帯で働くメンバーだと信頼してます。

やりたい仕事をさせる

バックログにはいっぱいタスクが積んであります。

メンバーが自由にとってやらせれば良い仕組みにしており、なるべくslackや1on1で様子を伺いながら、成長したいorしてほしい分野をやらせます。

被ったら一緒にやらせれば良いのですが、問題はやりたくない仕事が余ることです。

そこはマネージャーである私がやってもよいし、成長のためにやらせてもよいし、そういうのが得意なメンバーを雇った良いチームを作り上げたいと思ってます。

会議はどれくらいやってるの?

  • 1週間何やったかを共有 + datadog dashboardを見る(15分。ドキュメントさえ見てれば参加自由)
  • 1on1(2週間に1回、30分)

各チーム会議はこれだけです。

これ以上必要ないと思いますし、全員時間を有意義に使うためです。

相手が何やってるか把握出来ないのでは?

まずマネージャーだから全部把握する必要はないかなぁと思ってます。

今何をやってるかに関しては、チーム全体で「相手に情報を出す」より「必要そうだったら出して、それを各々pull型で必要だったら拾う」を意識してます。

今後何やっていくかに関しては、基本github issueのissueやPRの通知や動きで把握出来ます。

その上でslackでは議論、issueやPRに5W1Hで決まったこととslackのURLだけ貼って簡潔にしてます。

なので、よくプレイヤーが疲弊しがちな、マネージャーのためにバックログにリアルタイム経緯を書いたりはしません。

物事の成果が出るタイミングにレビュー会を開いて、そこで「コメントに経緯を書いておくと、後からjoinした人がわかりやすいよね」というツッコミをすることが多いです。

最後に

多様な人材をうまく組み合わせることで、良いチームが出来上がります。

今でも良いチームですが、これからもっと高みを目指すために日々勤勉を積んで、爆速に成果を出せるチームにしたいと思います。

Fargateでfirelensを利用し、datadog logsとs3にログを保存する

f:id:sion_cojp:20200207102418p:plain

firelens is 何?

fluentd/fluent-bitに連携することが可能なログドライバーです。

なぜfirelensか

firelensが出るまでFargateはawslogsドライバくらいしか選択肢がありませんでした。

s3に生ログの保存、解析+アラート用にdatadog logsに送るためには

fargate -> cloudwatch -> firehose -> lambda -> datadog logs, s3 といった構成を取る必要があり、そこには2つ問題がありました。

  • cloudwatch使わないのに無駄な費用が発生
  • lambdaがたまにこけていた(これは我々の実装の問題だと思います)

fargate(firelens) -> firehose -> datadog logs, s3 とfirelensを使うとこの構成になり、上記解決 + cloudwatch, lambda費用もかからなくなります。

fluent-bitのイメージについて

今回サイドカーで立てるfluent-bitのイメージは

https://hub.docker.com/r/sioncojp/fluent-bit-datadog-firehose を使います。

なぜこのイメージを使うかというと、Fargateはs3にあるconfigファイルを利用できないので、firehose, datadogに送るconfigを設定したカスタムイメージを作成する必要がありました。

ここで注意するのは、 /fluent-bit/etc/fluent-bit.conf/fluentd/etc/fluent.conf のファイルパスは予約されてるので、/fluent-bit/etc/extra.conf などの名前で保管しましょう。

ref: s3にあるconfigが使えない + ファイルパスの予約

ECS task definition

タイミーではecs-deployというGoの自社ツールでecsのtask設定 + service updateをしてますが、そちらで自動的にfluent-bitのContainerDefinition設定がされるようになってます。

fluent-bitの管理はSREなので、開発者には動かしたいコンテナの設定だけしてもらうよう意識してます。

設定されるjsonについて、firelens部分だけ紹介するとこのような感じになります

# サービス側
      "logConfiguration": {
        "logDriver": "awsfirelens",
        "secretOptions": null,
        "options": null
      },
# fluent-bit側
      "environment": [
        {
          "name": "AWS_REGION",
          "value": "ap-northeast-1"
        },
        {
          "name": "DD_SERVICE",
          "value": "koyama"
        },
        {
          "name": "DD_SOURCE",
          "value": "fargate"
        },
        {
          "name": "DD_TAGS",
          "value": "env:sandbox"
        },
        {
          "name": "DELIVERY_STREAM",
          "value": "koyama_koyama"
        }
      ],
      "secrets": [
        {
          "valueFrom": "SSMのDD_API_KEYパス",
          "name": "DD_API_KEY"
        }
      ],
      "firelensConfiguration": {
        "type": "fluentbit",
        "options": {
          "config-file-type": "file",
          "config-file-value": "/fluent-bit/etc/extra.conf"
        }
      },

全task definitionはこちら。

datadogに飛ばす

firelens -> サイドカーのfluentbit -> datadog

### タグ付け
- source: fargate/cron/runtask
- service: コンテナのサービス名
- env: prod/stg/sandbox...

s3に飛ばす

firelens -> fluentbit-> firehose -> s3

https://github.com/cosmo0920/fluent-bit-go-s3 も検証しましたが、1リクエスト1ファイルだったので管理が辛く断念しました。

またfirehoseは各サービスで1つ作るのがベストです。そうしないと同じs3ディレクトリに全ファイルが集約されてしまいます。

### 共有firehose
s3://生ログ保存バケット/共通ディレクトリ/YYYY/MM/DD/ファイル名

### サービス毎にfirehose
s3://生ログ保存バケット/クラスタ名/サービス名/YYYY/MM/DD/ファイル名

1つずつ作り クラスタ名/サービス名 にfirehose prefixを設定することでディレクティブに保管できます。データ分析の際に使いやすくなるよう意識しました。

fluent-bit自体のロギング

awslogsドライバーでcloudwatchに吐くことができますが、特に必要な情報が今の所なさそうだったので出してません。

もし必要になった場合、cloudwatch logsで全fluent-bitログを保管するlog groupを作り、そこのグループにlogstreamにサービス名を定義して保管するのがベストだと思います。

fluent-bit自体のモニタリング

今の所はしてません。問題が起きたらそれに合わせてすると思います。

またessential:trueなので、落ちた場合、taskにある全コンテナ落ちますので、datadogでサービス単位のコンテナ変動をモニタリングはしております。

Fargate + cloudwatch eventでcronシステム モニタリング編

前回のお話

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

non exit0かどうか

こちらはlambdaで監視してます。

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

ロジックとしては、タスク終了時のイベントをトリガーに、exitCodeの値をとって、0じゃない場合slackに通知してます。

タスクが正常に動いてるかどうか

こちらはdatadog logsの量が0以上なのをチェックしてます。

例えば1分間毎のcronの場合、「1分間でログが0以上あるよね」というチェックをしてます。

クラスタのタスク数と、タスクが並列で動いてないかどうか

クラスタのタスク数

1Clusterにおける、Fargateのタスク上限数は50です

Amazon ECS サービス制限 - Amazon Elastic Container Service

datadogで監視出来るのはFargateのserviceのみ。今回はcloudwatch eventsを通してタスクを動かしてるので取得できませんでした。

これが監視できないと、上限値に達した場合タスクが起動できなくなってしまいます。

その前に検知して上限緩和をしたいですね。

タスクが並列で動いてないか

最初は「datadog logsで決めた量の2倍以上になったら2つ動いてる」というロジックで監視してましたが、

2倍以上にならない場合もあるので、悩みました。

双方の監視はGoで実装し、Fargate service上で動かして監視してます。

github.com

上記でやってることは

- stg, prodのcronクラスター内のタスクの数を監視し、slackに通知。 
- タスクがn個以上実行されてるかを監視します。15分ごとにまとめをslackに通知。(nは設定できます)

となってます。

大変だったところ

メトリクスが取れない?

datadogではfargate service + clusterの場合メトリクスが取れます。

が、今回はfargate + cloudwatch eventなのでクラスタからメトリクスが取れません。

なのでもし中々taskが終わらない...などの調査用に取りたい場合は、サイドカーでdatadog/agentをいつでも立ち上げれるよう、準備するだけでとどめてます。

datadog/agentが立ち上がる前にメインコンテナが起動終了する?

https://hub.docker.com/r/sioncojp/docker-slack

curlでslackにメッセージPOSTするイメージで検証してました。

あまりにも起動、終了が早いため、datadog/agentより先に立ち上がって終了します。

dependsOnでdatadogに向ければ、datadogが立ち上がった後にアプリケーションが起動するので、メトリクスが取れるようになります。

datadog/agentが永遠に立ち上がる?

dependsOnサイドカーのdatadogを向けた場合、永遠に立ち上がってしまいます。

https://github.com/sioncojp/fargate-sidecar-datadog-agent

のようにタスクが全部終了したら、検知してdatadogをshutdownするソリューションを考えましたが、

app = essential: true + dependsOn: datadog/agent
datadog/agent = essential: false

の設定を施せばこれは解決しました

タスクが永久にpending?

例えば、

app:latest = essential: true + dependsOn: datadog/agent
datadog/agent:hogeeee = essential: false

でdatadogをサイドカーで立ててる場合、datadog/agent:hogeeee のような存在しないイメージを指定すると永久にタスクがpendingになります。

細かい原因はよくわかりませんが、もしこのようなパターンを使う場合は、app側のwrapperでサイドカーの起動を失敗検知する設定が必要です

ref: https://docs.docker.com/compose/startup-order/

Fargate + cloudwatch eventでcronシステム ロギング設計編

f:id:sion_cojp:20190805140304p:plain

前回のお話

Fargate + cloudwatch eventでcronシステム構築 - Sionの技術ブログ

技術

  • aws-cloudwatch-logging module: terraform module。kinesis firehose + lambda。(cloudwatch logsに書き込まれたらs3とdatadog logsに吐き出す)
  • s3: 長期間保存するログを格納
  • datadog logs: リアルタイムな1ヶ月間のログを保管。障害対応用

詳細

loggingは1つのmoduleで作られます。

module "logging_cron" {
  source        = "../../../../../modules/aws-cloudwatch-logging"
  cluster_name  = "cron"
  s3_bucket     = "xxxxxxxxxx"
  dd_tag_system = "cron"
  dd_tag_env    = "sandbox"

  services = [
    "koyama_test",
  ]
}

services に送りたい対象のservice名を書いていけばdatadog logとs3に保管するように設計しております。

firehoseはサービス分作っており、上限が懸念点ではありますが、緩和すれば良い + いつか出るkinesis直結ログドライバーが解決するかもしれない。のでそれまでの辛抱ということで許容してます。

こだわったところ

ネームスペース

最初のロギング周りは下記でした。

logGroup: /Service名
logStream: ランダム

この場合、lambdaのプログラムが影響し、aws-cloudwatch-logging moduleをサービス毎に1つ作らないといけませんでした。

しかし、ネームスペースを下記のようにすることで、lambdaが適切な値を引っ張れるようになり、 services に追加したいものを入れるmoduleにすることができました。

logGroup:/cron/Service名
logStream:/cron/Service名/コンテナ名/taskArn

s3側のロギング

s3については、ソースレコードのバックアップを使えばlambdaを介さずs3にログを保管できます。

lambdaに影響せず、生ログが保管でき安全性が担保できますが、これをやると全てのログが1行で出力されてしまいます。

全てのログがjsonであれば、jqを使ってログ調査が用意になりますが、そうであるとは限らないため、lambdaで改行したデータを保管するようにしました。

datadog側のタグ

logStream:/cron/Service名/コンテナ名/taskArn を設定したことにより、理想のタグをlambdaでつけて送信することが出来ました

system: cron
service: service名
application: コンテナ名
task: taskArn

こうすることで、タグ指定で調査することが出来、より素早い障害対応が可能になります。

datadogのエラータグ

f:id:sion_cojp:20190805204958p:plain

json形式で loglevel:error で出力すると、datadog logsでもErrorタグとして検知されます。

「もしerrorがxx回あったら通知する」という監視が可能になるので、出来る限りjsonで吐くと良いです。

上が loglevel:fatal 下が loglevel:error です。

lambdaファイル

firehoseに紐づけてる、datadog logsに送るlambdaは下記となります。

# coding: utf-8

# cloudwatchのデータをdatadog-logsに流しこむ
# またfirehoseでs3にjsonデータを保管するために整形しreturnする
# ref: https://github.com/DataDog/dd-aws-lambda-functions/blob/master/Log/lambda_function.py

from __future__ import print_function

import io
import base64
import gzip
import json
import socket
import ssl
import os

import boto3

DEFAULT_REGION = 'ap-northeast-1'

DD_URL = os.getenv("DD_URL", default="lambda-intake.logs.datadoghq.com")
#Define the proxy port to forward the logs to
DD_PORT = os.environ.get('DD_PORT', 10516)
DD_SOURCE = "ddsource"
DD_CUSTOM_TAGS = "ddtags"
DD_SERVICE = "service"

# metadata: Additional metadata to send with the logs
metadata = {
    "ddsourcecategory": "aws",
}

# DD_API_KEY: Datadog API Key
DD_API_KEY = boto3.client('ssm').get_parameters(
    Names=[
        "xxxxxxx", # pathは隠します
    ],
    WithDecryption=True
)['Parameters'][0]['Value']

# Pass custom tags as environment variable, ensure comma separated, no trailing comma in envvar!
DD_TAGS = os.environ.get('DD_TAGS', "")
ONLY_JSON = os.environ.get('ONLY_JSON')

class DatadogConnection(object):
    def __init__(self, host, port, ddApiKey):
        self.host = host
        self.port = port
        self.api_key = ddApiKey
        self._sock = None

    def _connect(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s = ssl.wrap_socket(s)
        s.connect((self.host, self.port))
        return s

    def safe_submit_log(self, log):
        try:
            self.send_entry(log)
        except Exception as e:
            # retry once
            self._sock = self._connect()
            self.send_entry(log)
        return self

    def send_entry(self, log_entry):
        # The log_entry can only be a string or a dict
        if isinstance(log_entry, str):
            log_entry = {"message": log_entry}
        elif not isinstance(log_entry, dict):
            raise Exception(
                "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry)
            )

        # Merge with metadata
        log_entry = merge_dicts(log_entry, metadata)

        # Send to Datadog
        str_entry = json.dumps(log_entry)
        #For debugging purpose uncomment the following line
        prefix = "%s " % self.api_key
        return self._sock.send((prefix + str_entry + "\n").encode("UTF-8"))

    def __enter__(self):
        self._sock = self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        if self._sock:
            self._sock.close()
        if ex_type is not None:
            print("DatadogConnection exit: ", ex_type, ex_value, traceback)

def merge_dicts(a, b, path=None):
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                raise Exception(
                    'Conflict while merging metadatas and the log entry at %s' % '.'.join(path + [str(key)])
                )
        else:
            a[key] = b[key]
    return a

def processRecords(records):
    """
    cloudwatch logsから送られてきたデータをデコード + 解凍し、firehoseに適したデータに整形する

    :param records: cloudwatchから送られてくるevent['records']
    """
    for r in records:
        compressed = base64.b64decode(r['data'])
        expand = gzip.decompress(compressed)
        data = json.loads(expand)
        record_id = r['recordId']


        # 送信先が到達可能かどうか
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': record_id
            }
        # 通常のデータ
        elif data['messageType'] == 'DATA_MESSAGE':
            yield {
                'data': base64.b64encode(expand).decode('utf-8'),
                'result': 'Ok',
                'recordId': record_id
            }
        # それ以外は失敗扱い
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': record_id
            }

def datadog_handler(records, con):
    """
    datadog logsにデータを送信する

    :param records: firehose用に整形されたデータのリスト
    :param con: datadogのconnection

    :return output: firehose用のデータ。applicationのjson部分だけをdataに格納
    """

    metadata[DD_SOURCE] = "firehose"
    output = []

    for r in records:
        payload = ""
        # result OK以外は送信しない
        if r['result'] != 'Ok':
            continue

        # base64デコードとbyte to stringにする
        data = base64.b64decode(r['data']).decode()
        log = json.loads(data)

        if log['messageType'] == 'DATA_MESSAGE':
            for e in log['logEvents']:
                # firehose to s3用データ
                payload += e['message'] + "\n"

                # datadogのserviceタグを付与
                metadata[DD_SERVICE] = log['logGroup'].split("/")[-1]
                metadata[DD_CUSTOM_TAGS] = ",".join(
                    [
                        DD_TAGS,
                        "task:" + log['logStream'].split("/")[-1],
                        "application:" + log['logStream'].split("/")[-2]
                    ]
                )

                # jsonはloadして送信
                if isValidJson(e['message']):
                    # タグを付与
                    # datadog logsに送信
                    con.safe_submit_log(json.loads(e['message']))

                # ONLY_JSONがfalseのときはJSONじゃないものの処理は飛ばす
                elif ONLY_JSON != "false":
                    continue

                # JSON以外は普通に送信
                else:
                    con.safe_submit_log(e['message'])


        output_record = {
            'recordId': r['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(gzip.compress(payload.encode("utf-8"))).decode("utf-8")
        }
        output.append(output_record)

    return output



def isValidJson(json_string):
    """
    isValidJson ... 渡された string が json フォーマットかどうかを確認する
    """
    try:
        json_object = json.loads(json_string)
    except ValueError as e:
        return False
    return True


def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    """
    firehose streamにデータをputする

    :param streamName: firehose
    :param records: firehose用に整形されたdata list
    :param client: boto3.client('firehose')
    :param attemptsMade: 現在の呼び出しリクエストの試行回数
    :param maxAttempts: 最大試行回数

    """
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))


def lambda_handler(event, context):
    # Lambda 関数の設計図 kinesis-firehose-cloudwatch-logs-processor-python をベースに
    # 6MB溢れたらfirehoseにputする実装をしてます

    # 最終的にreturnする値
    output = []

    # Check prerequisites
    if DD_API_KEY == "<your_api_key>" or DD_API_KEY == "":
        raise Exception(
            "You must configure your API key before starting this lambda function (see #Parameters section)"
        )

    # パラメータ等のチェック
    if not 'records' in event or not event['records']:
        return {"status": "ok", "msg": "no records"}

    # define variable
    streamARN = event['deliveryStreamArn'] # 溢れたデータを送信するためのkinesis firehose arn
    projectedSize = 0          # 全データのバイト数
    recordsReingestedSoFar = 0 # 溢れたデータが発生したときに、ログにどれくらい溢れたかを測るための変数
    putRecordBatches = []      # 溢れたデータのグループリスト
    recordsToReingest = []     # 溢れたデータリスト

    # cloudwatchからきたデータをデコードして、firehose用のデータに変換
    records = list(processRecords(event['records']))

    # 溢れたデータ用に、転送するための整形データ
    dataByRecordId = {r['recordId']: {'data': base64.b64decode(r['data'])} for r in event['records']}

    # 溢れたデータの処理
    # データが6MB以上になったらlambdaがこけてしまうので、別の配列に入れてあとでfirehoseにputRecordする
    for idx, r in enumerate(records):
        if r['result'] != 'Ok':
            continue
        projectedSize += len(r['data']) + len(r['recordId'])
        if projectedSize > 6000000:
            # 転送するための整形データをappendしてく
            recordsToReingest.append(
                {'Data': dataByRecordId[r['recordId']]['data']}
            )
            records[idx]['result'] = 'Dropped'
            # 既存データから削除
            del(records[idx]['data'])
        # レコード数を1グループ最大500レコードにする
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    # 6MB超えた + 500レコード未満のやつをputRecordBatches 配列に入れる
    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # 溢れたデータをputRecordsでfirehoseに流し込む
    if len(putRecordBatches) > 0:
        client = boto3.client('firehose')
        for recordBatch in putRecordBatches:
            putRecordsToFirehoseStream(streamARN, recordBatch, client, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d records out of %d' % (recordsReingestedSoFar, len(event['records'])))


    # datadogのsocket生成
    with DatadogConnection(DD_URL, DD_PORT, DD_API_KEY) as con:
        # Add the context to meta
        if "aws" not in metadata:
            metadata["aws"] = {}
        aws_meta = metadata["aws"]
        aws_meta["function_version"] = context.function_version
        aws_meta["invoked_function_arn"] = context.invoked_function_arn

        # datadog logsに送信 + firehose用データを取得
        output = datadog_handler(records, con)

    return {'records': output}

Fargate + cloudwatch eventでcronシステム構築

FOLIOでFargate + cloudwatch eventを使ったcron(マイクロバッチ)システムを設計し、実際に本番で動いてるので紹介します。

(ロギング、モニタリングは別記事で紹介したいと思います)

構成図

f:id:sion_cojp:20190903204310p:plain

技術

  • fargate: アプリケーション
  • cloudwatch event schedule: cron形式でFargateをさせる発火
  • cloudwatch logs: awslogsドライバーでアプリのログを出力
  • ecscli: Go製の社内cliツール。ECSタスク登録、cloudwatch eventのschedule put(update)をする

詳細

terraformでcluster / serviceの2つのmoduleがあります。

/* cluster生成 */
module "cron" {
  source       = "modules/aws-ecs/fargate_batch_cron_cluster"
  cluster_name = "cron"
  vpc_id       = "${data.aws_vpc.sandbox.id}"
  dd_tag_env   = "sandbox"

  # monitor sandbox
  slack_webhook_url = "${var.SLACK_WEBHOOK_MONITOR_SANDBOX}"
}

/* koyama_testサービス生成 */
module "koyama_test" {
  source       = "modules/aws-ecs/fargate-cron"
  cluster_name = "cron"
  task_name    = "koyama_test"

  subnets = [
    "${data.aws_subnet.private-subnet-1a.id}",
    "${data.aws_subnet.private-subnet-1c.id}",
  ]
}

clusterは下記を行ってます。

  • ecs cluster作成
  • デフォルトで使える、iam作成
  • non exit 0 の場合、slackに通知するlambda

serviceは下記を行ってます。

  • 対象ecs fargate taskをターゲットとした、cloudwatch event作成
  • cloudwatch eventからecsが操作できる + ecsで使うiam作成

下記のコマンドでdeploy, disable, enable が実行できます。

### deploy
$ ecscli schedule update --env sandbox -t ecs.yml

### 停止
$ aws --profile sandbox events disable-rule --name cron-koyama_test

### 再開
$ aws --profile sandbox events enable-rule --name cron-koyama_test

こだわったところ

applyの負担軽減

$ tree
├── cluster
│   ├── ecs.tf
├── koyama_test
│   ├── ecs.tf
└── logs
    ├── main.tf

のようにサービス毎のディレクトリに分割することで、apply時に他サービスが影響しないようにしました。

SREと開発者の責務分け

taskDefinitionはSRE。containerDefinitionは開発者に設定してもらうようにしました。

理由は、例えばFargateのネットワークはaws-vpcモードなのでSecurityGroupとSubnetIDがタスク定義に必要ですが、そこの設定を開発者にさせたくなかった(というか開発者は分からない)からです。

SRE管轄はecscliのGoコードにハードコーディング。 開発者には下記yamlを書いてもらいdeployしてもらってます。

$ vim ecs.yml
type: "FARGATE"
cluster: "cron"
name: "cron-koyama_test"
containers:
- name: app
  cpu: 256
  memory: 512
  image: sioncojp/docker-slack:latest
  environment:
    WEBHOOK_URL: "https://hooks.slack.com/services/xxxxxxxxx"
schedule:
  scheduleExpression: "rate(1 minute)"
  taskCount: 1

deploy = updateであり、開発者にcreate権限は与えてません。

理由はSREが把握できないcronを自由にdeployされてしまうのはよくないため、事前にSREが作った枠に対し、開発者がupdateする仕組みにしてます。

Fargateのtask CPU, memoryの自動選択

開発者にはcontainerDefinitionを設定してもらってますが、Fargateだと全体のCPU, memoryを決めないといけません。

開発者にはコンテナのCPU, memoryだけ決めさせたかった + 良いことにFargateの全体値は決め打ちだったので、全てのコンテナのCPU, memoryを加算した値から、適切な値を自動で選択させるようにしました。

$ vim internal/fargate/fargate.go
package fargate

import (
    "strconv"
)

// NewTaskCpuMemoryValue ... Fargateのタスク全体のCPU, Memoryを算出
func NewTaskCpuMemoryValue(cpus, mems []int64) (string, string) {
    var c, m int64

    for _, v := range cpus {
        c += v
    }

    for _, v := range mems {
        m += v
    }

    taskCpu, taskMemory := convertTaskValueToString(c, supportMemoryValue(m))

    return taskCpu, taskMemory
}

// supportMemoryValue ... 512 or 1024, 2048 ... のように1024の倍数を算出
func supportMemoryValue(m int64) int64 {
    if m <= 512 {
        return 512
    }
    return (m/1024 + 1) * 1024
}

// convertTaskValueToString ... CPUの範囲をベースに、Taskがサポートしてる値を算出
// https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/task-cpu-memory-error.html
func convertTaskValueToString(c, m int64) (string, string) {
    memoryStr := strconv.FormatInt(m, 10)

    switch {
    case c == 256:
        if 512 <= m && m <= 2048 {
            return "256", memoryStr
        }
        fallthrough
    case 256 < c && c <= 512:
        if 1024 <= m && m <= 4096 {
            return "512", memoryStr
        }
        fallthrough
    case 512 < c && c <= 1024:
        if 2048 <= m && m <= 8192 {
            return "1024", memoryStr
        }
        fallthrough
    case 1024 < c && c <= 2048:
        if 4096 <= m && m <= 16384 {
            return "2048", memoryStr
        }
        fallthrough
    case 2048 < c && c <= 4096:
        if 8192 <= m && m <= 30720 {
            return "4096", memoryStr
        }
        fallthrough
    default:
        return "", ""
    }
}