UZABASE Tech Blog

〜迷ったら挑戦する道を選ぶ〜 株式会社ユーザベースの技術チームブログです。

ElixirライブラリのFlowを使ってAPIを優しく呼び出す

こんにちは。SPEEDA開発チームの上村です。

先日同僚が書いたElixirのコードを眺めていた際に、とあるバッチ処理内でback-pressureを用いてAPIサーバを優しく呼び出している処理を見かけました。 そのコード内で用いられていたライブラリを調べてみるとなかなかに面白かったので折角なので今回まとめてみました。

本エントリでは、APIサーバを優しく呼び出すことを「APIサーバが過負荷にならないくらいの負荷で呼び出す」と定義し、それを実現する方法の1つとしてElixir製のライブラリであるFlowを用いたback-pressureの手法について述べていきます。

Flowとは

FlowはElixir製のコレクションを並列分散処理することに特化したライブラリです。 Flowを用いて、あるテキストファイルに含まれる単語の出現頻度を数え上げるコードを書くと以下のようになります。

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

細かなFlowのAPIについて特に知らなくても、コードを見るとこのコードが何をしたいのかの雰囲気が伝わってくると思います。

Flowは正確にはback-pressureをよしなに扱えるGenStageラッパーのことを指し、コレクション操作に特化したものとなっています (GenStageについては後述)。 また、GenStageのラッパーには、MessageQueueをGenStageに連携させることに特化しているBroadwayも存在します。

GenStageとは

GenStageのリポジトリを見ると、GenStageはproducerとconsumer間でイベントを交換するための仕様と述べられています。 イベントと言われるとちょっとふわっとしている印象を受けますが、データ処理のトリガーというニュアンスで捉えても大丈夫だと思います。

GenStageでは、イベントの処理を行う単位をステージと呼び、ステージはproducer、producer_consumer、consumerのいずれかに属するものとして扱います。 これらのステージを組み合わせてパイプラインを構成し、イベントを処理する仕組みを、GenStageは提供します。

各ステージの役割を把握するために下記のようなパイプラインを考えてみます。

[A] -> [B] -> [C]

この例において、Aはproducer、Bはproducer_consumer、Cはconsumerに対応します。 producer/producer_consumer/consumerはGenStageでは以下のような役割を持つものと説明されています。

  • producerは、イベントの送り元であるため、イベントをconsumerへと受け渡します(つまり、sourceとなります)
  • producerは、イベントの受け手であるため、イベントをprocuerから受け取ります(つまり、sinkとなります)
  • producer_consumerは、producerでもありconsumerでもあります(つまり、前後のステージからの見方次第で異なる役割に見えることになります)

イベントを処理するためにはconsumerがproducerを購読しなければなりません。 そのため、producerはconsumerからの催促を待ち受けていると表現されたりもします。

パイプライン内の各ステージにイベントが伝搬する際のトリガーは、consumerの役割を持つステージがproducerの役割を持つステージに対して処理対象のイベントの需要がどれくらいあるかを伝えることです。 producerはその需要ときっちり同じ量のイベントをconsumerに対して送出することをきっかけに、先のパイプラインのフローのようなイベントの処理が行われます。 このようなconsumer側が処理可能なイベント量のみをproducerに伝えることで処理中のイベント量を管理するような方法は一般的にback-pressureと呼ばれています。

GenStageにおいて、back-pressureはconsumerに対して :max_demand:min_demand のオプションを設定することでコントロールされます。 consumerは:max_demand で対応するproducerと連結しているフロー内で処理されるイベントの最大量を設定し、:max_demand で対応するproducerと連結しているフロー内に存在するイベントの数が少なくなりすぎないように、フロー内にイベントを補充する閾値を設定します。 つまり、パイプラインの起動時には :max_demand 個のイベントがproducer Aから送出され、producer Aを購読するconsumer Bがイベントを処理します。その結果、パイプライン内のイベントの総数が :min_demand 個になったら、:max_demand - :min_demand 個のイベントの需要を、consumer Bがprocucer Aに対して新たに伝えて処理を続けることになります。

:max_demand:min_demandのパラメータのイメージは下記のようになります。 (:max_demand=3, :min_demand=1でconsumer Bがproducer Aを購読している場合を想定しています)

まず、:max_demand相当のイベントが送出されます。

[A]-[B]
   -> 
   -> :max_demand個
   ->
(->が各イベントに対応。:max_demandの数だけイベントが存在)

処理が進むと、パイプラインのイベントは:min_demand個まで捌けた状態になります。

[A]-[B]
   -> :min_demand個 

パイプライン内のイベントの総量が減ったので、:max_demand - :mix_demand個のイベントの送出需要を伝えて、パイプライン内のイベント総量が増えた状態にします。つまり、最初の状態に近しい状態になります。

[A]-[B]
   -> 
   -> :max_demand個
   ->

以上のようなイメージで:max_demand:min_demandのパラメータは作用します。

ちなみに、FlowはGenStageがベースとなっているため、このようなback-pressureの仕組みが背後で動いていることを理解しておくとパラメータチューニングの際に役に立ちます。

Flowの利用例

Flowを使ってAPIを優しく呼び出すサンプルコードを示します。 利用するAPIはランダムに200のステータスコードか503のステータスコードを返します。 サンプルコード内では、503のステータスコードを受け取った際は最大3回リトライする処理を簡素ながらリトライ処理として実装しています。

サンプルコード

こちらに置いてあります。 リポジトリ内のディレクトリツリーは以下のようになっています。

.
├── fragile_api
└── fragile_api_client

ランダムにステータスコード200かステータスコード503をレスポンスとして返すAPI(fragile_api)、そのAPIをFlowを使って呼び出すクライアント(fragile_api_client)の2つのディレクトリから構成されます。 本エントリがElixir関連のものなので、折角なのでfragile_apiはElixirのWebフレームワーク群の中で著名なPhoenixを用いて実装しました。

Flowの依存関係

fragile_api_client内のmix.exsに以下のような依存関係を追加しています。

defp deps do
  [
    {:flow, "~> 0.15.0"}
  ]
end

Flowを用いたコード例

Flowを利用しているコードを先述のサンプルコード(fragile_api_client/lib/fragile_api_client.ex )から抜粋します。

  def main(_args) do
    1..30
    |> Flow.from_enumerable(stages: 3, min_demand: 0, max_demand: 1)
    |> Flow.map(fn _ -> call_api() end)
    |> Enum.to_list()
    |> Enum.reduce(%{}, fn status_code, acc ->
      Map.update(acc, status_code, 1, &(&1 + 1))
    end)
    |> IO.inspect
  end

APIを呼び出す総回数は何回でも良いのですが、この例では30回としています。 1..30で先のGenStageの説明におけるproducerの役割となるステージを1つ作成します。

次に、|> Flow.from_enumerable(stages: 3, min_demand: 0, max_demand: 1) でconsumerの役割のステージを3つ作成します。 それぞれのconsumerは同時に1つの処理のみを処理するようにmin_demand: 0, max_demand: 1と設定します。

Flowを使った処理はここまでで、最後にEnum.to_listEnum.reduceを使って処理結果のサマリーを作成しています。

上記のコードを実行した結果は以下となります。

%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 1, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 1, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 1, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 1, reason: "success", status_code: 200}
%{attempts: 1, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 3, reason: "max_retry_exceeds", status_code: 503}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 0, reason: "success", status_code: 200}
%{attempts: 3, reason: "max_retry_exceeds", status_code: 503}
%{attempts: 2, reason: "success", status_code: 200}
%{200 => 28, 503 => 2}

APIの利用状況について可視化することができればFlowでの処理状況がより分かりやすかったのですが、筆者の調べ方が浅いせいか、 Phoenixアプリのモニタリング状況の可視化までは至りませんでした…。

まとめと感想

本エントリではFlow経由でGenStageのback-presssureを用いてAPIを優しく呼び出す方法について述べました。 FlowはGenStageが抽象化したback-pressureを更に抽象化することで、クライアントコード側の処理をより直感的に書けるようにしてくれます。 consumer/producerの購読関係を明示的に一つ一つ設定することなく、サッとイベントのパイプラインを書くことができるのは非常に魅力的です。 バッチ処理などでデータを多段に処理する際の選択肢として、Flowの採用を考えてみるのも面白いのではないかと思います。