Blogaomu

WEBアプリケーション開発とその周辺のメモをゆるふわに書いていきます。

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(2)

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(1)の続き。

バックプレッシャー

Back-pressure explained

  • Akka Streamsの利用者はバックプレッシャーをハンドリングするコードを書く必要がない。
    • ライブラリ側でいい感じに処理してくれる
  • 下流のSubscriberが受け取ることのできるそしてバッファすることのできる要素数(demand)という観点でバックプレッシャープロトコルが定義されている。
  • データソース(Reactive StrteamsではPublisher, Akka StreamsではSource)は与えられるどのSubscriberのdemandの総数よりも多くの要素を出さないことを保証する。

遅いPublisher、速いSubscriber

  • このケースではPublisherを遅くする必要がない。
    • たしかにもともと下流の方が要素をより多く受け取れるので詰まることがなく調整しなくてよい。
  • しかし、signaling rates(?)はめったに一定ではないし、任意の時点で変わりうる、最終的にはSubscriberがPublisherよりも遅い状況に至る。
    • リソースの状況によってはPublisher, Subscriberの処理速度が変わるということを言ってるのだろうか?
  • これらの状況から保護するためにバックプレッシャープロトコルは有効なままにしておく必要がある。でもこれを有効のままにしておくことによる影響を受けたくはない。
  • Reactive Streamsプロトコルは、SubscriberからPublisherに Request(n:Long) *1の信号を非同期に送ることでこの問題を解決している。
  • このプロトコルは、送られてきたdemandよりも多い要素をPublisherが送らないことを保証している。

速いPublisher、遅いSubscriber

  • Publisherをバックプレッシャーするのが必須なケースである。
  • Publisherは以下の戦略の一つを適用してバックプレッシャーに従う必要がある。
    • 生成速度をコントロールできるならば、要素を生成しない
    • より多くのdemandが通知されるまで、bounded(?)な方法で要素をバッファリングする
    • より多くのdemandが通知されるまで、要素を使わない(破棄する?)
    • 上記の戦略を適用できない場合、ストリームを取り壊す

Stream Materialization

Stream Materialization

  • MaterializationはAkka Streamsにおいてはアクターの起動、ファイルオープン、ソケット接続などを意味する。
    • 必要なリソースに依存した処理を行う。
  • run()runWith()runForeach(el => ...) というメソッドでmaterializationが始まる。
    • runForeach(el => ...)runWith(Sink.foreach(el => ...))シンタックスシュガーである
  • Materializationはmaterializeしているスレッド(?)で同期的に行われる。
  • ストリーム処理はmaterializationの間に起動したアクターによって扱われる。
    • これらアクターは ActorMaterializer の作成時に MaterializationSettings でセットされるディスパッチャーで設定されたスレッドプールを使う。
    • 今までの例だと ActorMaterializer() で特に設定を渡していないのでデフォルトのディスパッチャーが使われている。

Operator Fusion

  • Fusing
  • asyncBoundary
  • ストリーム処理のステージを融合して、同期的に計算する方が早い場合は効率化できるっていうイメージなのか?

Combining materialized values

  • materializeされた値の合成
    • run したあと(materializeしたあと)に得られる値を RunnableGraph生成時に指定できる
  • Keep.right Keep.both
  • materializeされた値をmapできる
  • GraphDSL という便利DSLが使える

GraphDSLの例(抜粋)。

// An source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
 
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
 
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]

// The result of r11 can be also achieved by using the Graph API
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
  RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) =>
    import GraphDSL.Implicits._
    src ~> f ~> dst
    ClosedShape
  })

Stream ordering

  • Akka Streamsではほぼ全ての計算ステージで要素の入力順序を保持する
  • mapAsync のような非同期操作でもこれは維持される(mapAsyncUnorderdという順序を保証しない版のメソッドもある)
  • ただ、複数の入力を扱うもの(Mergeなど)では出力順は定義されていない。
  • Zip では出力順は保証されている。
  • ファン・インの場合に出力順をコントロールしたい場合は、 MergePreferred または GraphStage を使う

所感

  • バックプレッシャーをライブラリ側でいい感じにやってくれるのは非常にありがたい
  • boudaryの概念がまだつかめていない。
    • 文字通り境界を表すのだろうけども...

*1:ドキュメントでは引数の型がIntになっているがソースコードではLongになっている https://github.com/akka/akka/blob/v2.4.10/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala#L48