Akka Streams ドキュメントを何回かに分けてちらちら見てみる(1)の続き。
バックプレッシャー
- Akka Streamsの利用者はバックプレッシャーをハンドリングするコードを書く必要がない。
- ライブラリ側でいい感じに処理してくれる
- 下流のSubscriberが受け取ることのできるそしてバッファすることのできる要素数(demand)という観点でバックプレッシャープロトコルが定義されている。
- データソース(Reactive StrteamsではPublisher, Akka StreamsではSource)は与えられるどのSubscriberのdemandの総数よりも多くの要素を出さないことを保証する。
遅いPublisher、速いSubscriber
- このケースではPublisherを遅くする必要がない。
- たしかにもともと下流の方が要素をより多く受け取れるので詰まることがなく調整しなくてよい。
- しかし、signaling rates(?)はめったに一定ではないし、任意の時点で変わりうる、最終的にはSubscriberがPublisherよりも遅い状況に至る。
- リソースの状況によってはPublisher, Subscriberの処理速度が変わるということを言ってるのだろうか?
- これらの状況から保護するためにバックプレッシャープロトコルは有効なままにしておく必要がある。でもこれを有効のままにしておくことによる影響を受けたくはない。
- Reactive Streamsプロトコルは、SubscriberからPublisherに
Request(n:Long)
*1の信号を非同期に送ることでこの問題を解決している。 - このプロトコルは、送られてきたdemandよりも多い要素をPublisherが送らないことを保証している。
速いPublisher、遅いSubscriber
- Publisherをバックプレッシャーするのが必須なケースである。
- Publisherは以下の戦略の一つを適用してバックプレッシャーに従う必要がある。
- 生成速度をコントロールできるならば、要素を生成しない
- より多くのdemandが通知されるまで、bounded(?)な方法で要素をバッファリングする
- より多くのdemandが通知されるまで、要素を使わない(破棄する?)
- 上記の戦略を適用できない場合、ストリームを取り壊す
Stream Materialization
- MaterializationはAkka Streamsにおいてはアクターの起動、ファイルオープン、ソケット接続などを意味する。
- 必要なリソースに依存した処理を行う。
run()
やrunWith()
、runForeach(el => ...)
というメソッドでmaterializationが始まる。runForeach(el => ...)
はrunWith(Sink.foreach(el => ...))
のシンタックスシュガーである
- Materializationはmaterializeしているスレッド(?)で同期的に行われる。
- ストリーム処理はmaterializationの間に起動したアクターによって扱われる。
- これらアクターは
ActorMaterializer
の作成時にMaterializationSettings
でセットされるディスパッチャーで設定されたスレッドプールを使う。 - 今までの例だと
ActorMaterializer()
で特に設定を渡していないのでデフォルトのディスパッチャーが使われている。
- これらアクターは
Operator Fusion
Fusing
asyncBoundary
- ストリーム処理のステージを融合して、同期的に計算する方が早い場合は効率化できるっていうイメージなのか?
Combining materialized values
- materializeされた値の合成
- run したあと(materializeしたあと)に得られる値を RunnableGraph生成時に指定できる
Keep.right
Keep.both
- materializeされた値をmapできる
GraphDSL
という便利DSLが使える
GraphDSLの例(抜粋)。
// An source that can be signalled explicitly from the outside val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] // A flow that internally throttles elements to 1/second, and returns a Cancellable // which can be used to shut down the stream val flow: Flow[Int, Int, Cancellable] = throttler // A sink that returns the first element of a stream in the returned Future val sink: Sink[Int, Future[Int]] = Sink.head[Int] // The result of r11 can be also achieved by using the Graph API val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] = RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) => import GraphDSL.Implicits._ src ~> f ~> dst ClosedShape })
Stream ordering
- Akka Streamsではほぼ全ての計算ステージで要素の入力順序を保持する
mapAsync
のような非同期操作でもこれは維持される(mapAsyncUnorderd
という順序を保証しない版のメソッドもある)- ただ、複数の入力を扱うもの(
Merge
など)では出力順は定義されていない。 Zip
では出力順は保証されている。- ファン・インの場合に出力順をコントロールしたい場合は、
MergePreferred
またはGraphStage
を使う
所感
- バックプレッシャーをライブラリ側でいい感じにやってくれるのは非常にありがたい
- boudaryの概念がまだつかめていない。
- 文字通り境界を表すのだろうけども...
*1:ドキュメントでは引数の型がIntになっているがソースコードではLongになっている https://github.com/akka/akka/blob/v2.4.10/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala#L48