• フロー型の処理をStageを用いて抽象化してより簡潔に記述することができる抽象型を提供する

  • Stageには生産者(Producer)、消費者(Consumer)、生産者/消費者(Producer-Consumer)の3つの役職がある

  • それぞれのプロセスをSupervisorに乗せることができる

  • 強化版GenEvent

最もシンプルな形のデータ処理フロー

この例だとそれぞれが一対一だけど一対多も可能 プロセスがSupervisor上に乗っているので、それぞれのプロセスが並列に動作する

バックプレッシャー

GenStageにはバックプレッシャーという概念がある。これはConsumerがデータを要求した時のみProducerがデータを送信するというもの。 この概念があることで、ProducerがConsumerの許容範囲を超えたデータを送信する事を避けることができる。

導入

Hex.pm

使い方

まず始めに--supオプションを付けて新しいプロジェクトを作成する。

Producer

一定数のストリームを生成する

  • init():producerを指定している
  • handle_demandでConsumerから要求があった時の処理を行っている。この場合だとstateに応じたリストを生成する
defmodule Sample do
  use GenStage
 
  def start_link(_initial) do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end
 
  def init(counter), do: {:producer, counter}
 
  def handle_demand(demand, state) do
    events = Enum.to_list(state..state + state - 1)
    {:noreply, events, state + demand}
  end
end

Consumer-Prosucer

Prosucerからデータを受け取り、