<-- mermaid -->

GoでBigQueryに貯めたIstioのアクセスログを分析するツールをつくる

こんにちは。SaaS Product Team SREの八代です。

はじめに

弊社が開発しているSPEEDAでは、KubernetesとIstioを利用してサービスメッシュ基盤を構築しています。オンプレミス上に構築したK8s、GKE、Google Anthosなど10個以上のK8sクラスタを管理しており、その上でIstioが導入された数100個のPodが稼働しています。

今回は、Istioを導入しているPodのアクセス分析を行う上での課題を共有するとともに、それを改善するための仕組みを作り始めたので、それについて書きますので、同じような問題を抱えている方の参考になれば嬉しいです。

ログの収集基盤について

ログ収集の方式

弊社では多くのサーバー、PodのログをBigQuery上に蓄積しており、開発者はWeb UI、Metabase、Grafanaなどからこれらのデータを参照しています。

中でも、Istioで取得するマイクロサービスへのアクセスログは、複数クラスタに渡ったデータ分析をしやすくするため、1つのテーブルに集約しています。

各クラスタにはDaemonSetのFluentdが配備され、PodのログをBigQueryに転送しています。

Istioのログ収集方式

私たちの開発チームでは、マイクロサービスの開発を様々な言語で行っているため、それぞれのマイクロサービスで異なる形式のログを出力している場合が多いです。このため、Istioの機能によってアクセスログのフォーマットが統一されることにより得られる恩恵が大きく、同一のSQLクエリを利用して分析できる点で非常に重宝しています。

課題

上記の方式はメリットも大きいのですが、テーブルの容量が膨らんでしまうことが課題になります。

テーブルは日付分割パーティションテーブル(分割テーブルの概要  |  BigQuery  |  Google Cloud)で分離されるよう構成されているのですが、1日あたりのデータ量は40GBほどにはなってしまいます。東京リージョンにおいては、読み込みが1TBごとに$6ほどの料金がかかるため、1日のデータを取得するクエリを発行するたびに30円ほどの課金が発生します。

1クエリで見ると大きな金額ではないのですが、データの分析などを行う際には、SQL文を試行錯誤しながら書いたり、複数日程にわたりデータを取得することが多いので、ちりつも式に思いのほか課金額が膨らんでしまう場合があります。

特に、MetabaseやGrafanaなどから分析した情報を示すダッシュボードを作成するケースを考えると、週単位の期間の情報を取得するクエリを定期的に発行することになり、課金額は馬鹿にできない額になってしまいます。

また、クエリが重いため描画に時間がかかってしまうことや、クライアントから煩雑なSQL文を都度考えないといけないことから、使用体験が悪くなってしまうことも継続的に分析・改善を進めるための阻害要因になってしまっているのではないかと考えています。

改善案

上記の課題を改善するため、定期的にアクセスログを一定の粒度(クラスタ、Namespace、リクエストパス)でグルーピングしたのち、主要な分析結果(アクセス回数、最大・最小・平均レイテンシ、レイテンシの頻度分布、エラーレートなど)を別のテーブルに格納するジョブを作成しようと考えました。

ツールの実装

Istioのアクセスログを分析する目的に絞れば、BigQueryのコマンドラインツール bq を利用して、サクッとスクリプトで書くこともできるのですが、同様の仕組みを他のアプリケーションやミドルウェアに流用できる汎用性を持たせたかったため、Goでツールを書きました。

以下、実装の主要部分について、具体的に紹介していきたいと思います。

設定ファイル

まず、ツールの動作をイメージしていただくため、用意した設定ファイルの形式をサンプルとともに見ていただきたいです。yaml形式で以下のような構造になっています。

bigQueryRules:
  - name: <ルール名>
    dest: // insert先の情報
      projectId: <プロジェクトID>
      dataset: <データセット名>
      table: <テーブル名>
      field: // テーブルスキーマの情報
        - name: <フィールド1の名前>
          type: <フィールド1の型>
        - name: <フィールド2の名前>
          type:  <フィールド2の型>
    query: "\
    SELECT \
          <フィールド1の値>, \
          <フィールド2の値> \
      FROM <テーブル名> \
      WHERE <フィルタ条件>
      GROUP BY <グループ条件> \
      ;"

データの取得元に関する情報は全て bigQueryRules.queryに含んでおり、通常BigQueryで利用しているSQL文を記載します。

bigQueryRules.dest以下で出力先のテーブル情報を記載していきます。

このとき、クエリで取得するカラムの順序に合わせて、bigQueryRules.dest.field以下でそれぞれのカラム名と型を指定するようにします。(型については後述)

基本的にはツールは汎用性を持たせるため、クエリで取得した結果をもとにそのままカラム名と型をつけて別テーブルに流し込む実装をしているので、フィールド値の加工や取得期間の絞りこみはクエリ部分に担ってもらうことにしています。

イメージは持っていただけたでしょうか。以降でツールの実装を見ていきます。

設定ファイルの読み込み

まず、設定ファイルを読み込むため以下の構造体を定義します。

type Config struct {
    BigQueryRules []Rule `yaml:"bigQueryRules"`
}

type Rule struct {
    Name  string `yaml:"name"`
    Dest  Dest   `yaml:"dest"`
    Query string `yaml:"query"`
}

type Dest struct {
    ProjectID string  `yaml:"projectId"`
    Dataset   string  `yaml:"dataset"`
    Table     string  `yaml:"table"`
    Fields    []Field `yaml:"field"`
}

type Field struct {
    Name string `yaml:"name"`
    Type string `yaml:"type"`
}

パースは以下の関数で行います。

func ReadOnStruct(fileBuffer []byte) ([]Rule, error) {
    var config Config
    err := yaml.Unmarshal(fileBuffer, &config)
    if err != nil {
        return nil, err
    }
    return config.BigQueryRules, nil
}

クエリ結果の取得

BigQueryに対する操作は、Googleにより提供されている公式ライブラリ(cloud.google.com/go/bigquery)を使用して行います。bigQueryRules.query で指定したクエリは以下の関数で取得します。

func queryBasic(rule Rule) ([][]bigquery.Value, error) {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, rule.Dest.ProjectID)
    // エラー処理
    defer client.Close()

    q := client.Query(rule.Query)
    q.Location = "asia-northeast1"

    job, err := q.Run(ctx)
    // エラー処理

    status, err := job.Wait(ctx)
    // エラー処理

    var rows [][]bigquery.Value
    it, err := job.Read(ctx)
    for {
        var row []bigquery.Value
        err := it.Next(&row)
        if err == iterator.Done {
            break
        }
        // エラー処理
        rows = append(rows, row)
    }

    return rows, nil
}

クエリ結果の行を順番に []bigquery.Valueに格納していき、そのスライスを返しています。 こちらは以下のサンプルコードと同等なので、参考にしていただければと思います。

cloud.google.com

スキーマの作成

クエリ結果を別テーブルにinsertするにあたり、先にスキーマを定義しておく必要があります。今回の場合、設定ファイルの内容をもとに動的にスキーマを定義する必要があるため、以下のようにします。

func createSchema(fields []Field) bigquery.Schema {
    var schema bigquery.Schema

        // 設定ファイルをもとにフィールドを順に定義
    for _, field := range fields {

        var fieldType bigquery.FieldType

                 // フィールドの型をセット
        switch field.Type {
        case "string":
            fieldType = bigquery.StringFieldType
        case "int":
            fieldType = bigquery.IntegerFieldType
        case "float":
            fieldType = bigquery.FloatFieldType
        case "timestamp":
            fieldType = bigquery.TimestampFieldType
        case "date":
            fieldType = bigquery.DateFieldType
        default:
            fieldType = bigquery.StringFieldType
        }

        s := bigquery.FieldSchema{
            Name:     field.Name,
            Required: false,
            Type:     fieldType,
        }
        schema = append(schema, &s)
    }

    return schema
}

for文の中では、bigQueryRules.dest.field に記載した配列要素それぞれで、BigQueryに対応した型をセットしたあと、bigquery.FieldSchemaを生成し、スライスに順に格納しています。bigquery.Schemaはこのスライスのエイリアスとなっているので、テーブルのスキーマとして使用することができます。

テーブルの作成

insertしようとしているテーブルが存在しない場合は、insert前に事前にテーブルを作成する必要がありますので、以下のように作成します。

   ctx := context.Background()
    client, err := bigquery.NewClient(ctx, Dest.ProjectID)
    if err != nil {
        return err
    }

    dataSet := client.Dataset(Dest.Dataset)
    _ = dataSet.Table(Dest.Table).Create(ctx, &bigquery.TableMetadata{
        Name:   Dest.Table,
        Schema: schema,
    })

まず、接続用のクライアントを生成したあと、Create()メソッドを利用しテーブルの作成を行います。第2引数には、テーブルのメタデータ情報をもった bigquery.TableMetadata を渡すことができ、今回は最低限必要なテーブル名とスキーマを設定しています。ここにフィールドを追加することで、データの有効期限なども設定することができます。

Create()メソッドは既にテーブルが存在する場合エラーを返すので、本来であればテーブルの有無に応じて処理を分岐させるべきなのですが、ドキュメントからいいメソッドを見つけることができなかったため、一旦エラーを握りつぶしています。改善したら追記したいと思います。

行の挿入

前項までで、データの取得と行を挿入するための準備が完了したので、いよいよinsertを行う部分です。以下のように書きました。

func insertRecord(rows [][]bigquery.Value, schema bigquery.Schema, Dest Dest) error {

    // 前項のクライアントとテーブル作成の処理はこの部分に記述

    ins := client.Dataset(Dest.Dataset).Table(Dest.Table).Inserter()
    var vss []*bigquery.ValuesSaver

    // 取得した各行のデータを順にセットし、bigquery.ValuesSaverに格納していく
    // 取得した行が1万行を超える場合は、1万行ごとに挿入する
    for i, row := range rows {
        if i%10000 == 0 {
            if err := ins.Put(ctx, vss); err != nil {
                return err
            }
            vss = nil
        }

        vss = append(vss, &bigquery.ValuesSaver{
            Schema: schema,
            Row:    row,
        })
    }

    if err := ins.Put(ctx, vss); err != nil {
        return err
    }

    return nil
}

順序としてはこれまでと似ており、取得したそれぞれの行とスキーマを渡して bigquery.ValuesSaveを生成し、スライスに格納していきます。 そして格納が終わったら、Putメソッドを利用してまとめてinsertを行います。

ハマりどころとして、一度に格納する行が大きくなりすぎると、Put()の処理で失敗してしまいます。エラーログに直接の原因は出力されないので、大きい行数をinsertした際に原因不明のエラーが発生したら疑ってみるといいと思います。

BigQuery APIのドキュメントを参照しても該当する制限が見つからなかったのですが、10000行分溜まったところで都度Putするような実装にすることで安定して動作するようになりましたので、一旦そうすることにしています。なお、その後はvssにnilを代入し、スライスのクリアを行っています。

以上で、ツールで使う主要な処理の紹介は終了です。最後に、これらの関数を呼び出すメイン部分の処理を記載します。

メインの処理

以下のようにメイン処理を行っています。

func Run(args []string) int {

    // 設定ファイルの読み込み
    buf, err := ioutil.ReadFile("./rules.yaml")
    if err != nil {
        fmt.Println(err)
        return 1
    }

    // 設定ファイルのパース
    rules, err := ReadOnStruct(buf)
    if err != nil {
        fmt.Println(err)
        return 1
    }

    // それぞれのルールについて、処理を実行
    for _, rule := range rules {
        // クエリ結果を取得
        rows, err := queryBasic(rule)
        if err != nil {
            fmt.Println((err))
            return 1
        }

        // スキーマを作成
        schema := createSchema(rule.Dest.Fields)

        // insert処理
        err = insertRecord(rows, schema, rule.Dest)
        if err != nil {
            fmt.Println(err)
            return 1
        }

    }

    return 0
}

ツールの紹介は以上になります。あとはこのコードを定期的に実行するようにすれば完成です。

ツールの実行

今回は、1日のデータを集計する目的で、GKE上でCronJobとして日次で稼働させるようにしています。

おそらくIstioを利用している方であれば、コンテナ化やCronJobのマニフェスト内容などの説明は不要だと思うので、そこの説明は割愛するのですが、BigQueryの認証についてTIPSを共有しておきます。

Workload Identityを利用した認証について

GCPに関連するAPIに接続するアプリケーションを稼働させるためには、GCPの認証が通るように設定を入れる必要があります。 アプリケーションをK8s上で稼働させることを想定した場合、まず思いつくのはコンソールから秘密鍵をダウンロードし、Secretリソースとしてコンテナにマウントする方法ではないかと思います。

もちろん問題はないのですが、アプリケーションが増えていくにつれ、管理する秘密鍵の個数が多くなってしまう課題があります。 SREチームでは、GCPのAPIを利用した運用ツールを作成することもよくあるのでGCPのWorkload Identityの機能を利用して、GCPのService Accountの情報だけを用いて認証する仕組みを利用しています。

本筋とずれるためこれ以上の説明はしないのですが、便利に使えるので気になる方はドキュメントを読んでみてください。

cloud.google.com

実例

最後に、本ツールを使って実際にデータの分析結果を可視化する一例を示したいと思います。

以下、各マイクロサービスについて、1日で集計したレイテンシの平均値、99パーセンタイル値を第2パスまでの値でグルーピングして集計する場合を想定します。

設定ファイルは以下のように記述します。

bigQueryRules:
  - name: "istio-log"
    dest:
      projectId: "<project ID>"
      dataset: "<dataset name>"
      table: "test_bq_log_reporter"
      field:
        - name: date
          type: date
        - name: k8s_cluster
          type: string
        - name: namespace
          type: string
        - name: pod_id
          type: string
        - name: path
          type: string
        - name: avg_resp_time
          type: int
        - name: p99
          type: int
    query: "\
     SELECT \
        CURRENT_DATE('Asia/Tokyo') as today, \
        resource.labels.cluster_name as cluster, \
        resource.labels.namespace_name as namespace, \
        REGEXP_REPLACE(resource.labels.pod_name, '-[a-z0-9]+-[a-z0-9]+$', '') as pod_id, \
        REGEXP_SUBSTR(jsonPayload.path, '^/[^/]+/[^/]+/') as path, \
        round(avg(cast (jsonPayload.duration as INT64))) as avg, \
        APPROX_QUANTILES(cast (jsonPayload.duration as INT64), 1000)[offset(990)] AS p99 \
      FROM `<your datasource table>` \
      WHERE jsonPayload.path IS NOT NULL \
        AND DATE(timestamp, 'Asia/Tokyo') = CURRENT_DATE('Asia/Tokyo') \
      GROUP BY cluster, namespace, pod_id, path \
      ORDER BY cluster, namespace, pod_id DESC;"

いくつかクエリ部分で補足します。 - pod_idのカラムはDeployment, ReplicaSetにより付与されるサフィックスを除去しています - pathのカラムでは第3パス(3つめの / 以降)の値は除去しています - APPROX_QUANTILES 関数を利用してパーセンタイル値を算出しています - カラム名はFluentdの転送設定次第なので、対応する情報に置換してください

この設定でツールが実行されると、以下のようなスキーマを持つテーブルが作成され、実際にデータが格納されます。

テーブルのスキーマ

可視化

上記で格納したデータをMetabaseから可視化する例を紹介します。

k8sクラスタ、Namespace、pod、第2パスまでの値でグルーピングした本日のアクセス統計情報を以下のクエリで可視化してみます。

SELECT k8s_cluster, namespace, pod_id, path, avg_resp_time, p99
FROM `<your-table-name>`
WHERE date = CURRENT_DATE('Asia/Tokyo')
  AND path IS NOT NULL
ORDER BY p99 DESC

以下のように各Podごとのアクセス統計(レイテンシの単位はms)を確認できます。今回の場合それなりに件数が多いので出力は表形式とし、レイテンシが大きい順に表示させています。

アクセスの統計情報

上記の表を見ると、開発者がパフォーマンスに問題のあるパスを認識することで、改善の打ち手の解像度をあげられるのではないかと考えています。ここで改善のきっかけを発見し、別のクエリで時系列・パスの粒度などをより小さくし詳細に解析を行う、といったアプローチに繋げられることを期待しています。

なお、今回の例では、テックブログ用にツールでinsertするカラム数は少なくしていますが、予め多くの統計値を格納するようにしておくと、分析の際使い勝手が良いのではないかと思います。

また、Metabaseの代わりにGrafanaのBigQueryプラグインを利用してGrafanaにダッシュボードを作成することもできます。GrafanaのBigQueryプラグインに関しては、同じSREチームの鈴木が記事を書いておりますので、よろしければご覧になってください。

tech.uzabase.com

簡単な例ではありますが、以上で紹介をおわります。

まとめ

BigQuery APIを利用して、Istioのアクセスログをコストリーズナブルに分析する方法を紹介しました。 本ツールにより、アプリケーションの状態を認識し、継続的にパフォーマンスを改善できる取り組みをより進めていきたいと考えています。

今回はツールで取得したクエリ結果を流し込む仕組みを用意したに過ぎないので、今後はいかに真に有用なデータを、開発者に使いやすいインターフェースで提供していくかが肝だと思うので、SaaS Product Team内でコミュニケーションをとりつつ、ブラッシュアップしていきたいと思います。

仲間募集中

私たちSaaS Product TeamのSREチームでは、今回紹介したSPEEDAに加えて、INITIAL/FORCAS/FORCAS Salesなど複数のプロダクトの開発に携わっており、オンプレ/GCP/AWSなど様々な環境で開発を行っています!

現在 SRE ポジションも募集を行っており、Wantedly弊社公式ホームページ で募集要項等を記載していますので、興味があればぜひご応募ください。お待ちしております!

Page top