最近、業務でAkka Streamsを使った処理に触れることがあって、詳しく知りたいなあと思ったのでドキュメントを読みながら触ってみようと思う。 一度には読み切れないのでいい感じに区切りながら進めていきたい。
Basics and working with Flows — Akka Documentation
主要なコンセプト
用語が定義されてるので雑に訳してみる。
- Stream
- データの移動、変形を含むアクティブなプロセス
- Element
- ストリームの処理単位。すべての処理は要素を変形して上流から下流に流す。
- Buffer sizeのくだりはよく分からず...
- Back-pressure
- フロー制御の手段
- コンシューマーが可用性についてプロデューサーに通知する方法
- 消費のスピードに一致するように上流のデータ生産を遅らせる
- Akka streamの文脈ではノンブロッキング・非同期として理解される(謎)
- Non-Blocking
- ある操作が完了まで長い時間がかかっても呼び出し元のスレッドの進行を妨げない、ということ
- Graph
- ストリームが実行されているときに要素が流れるべき経路を定義する
- ストリーム処理topologyとは...
- Processing Stage
- グラフを構築するブロックの共通する一般名称(?)
- 例: map, filter, カスタムGraphStage, MergeやBloadcastのようなグラフ接続
ストリームの定義と実行
Defining and running streams の項を見て基礎的なところを押さえておく。
Source
, Flow
, Sink
という各段階があって、Source
は1つの出力、Flow
は1つの入出力、Sink
は1つの入力を持ってる。
これらSource
とSink
をFlow
の前後に付けたものをRunnableGraph
と言っている。
データが上流から途中で処理を通りながら下流まで流れていくというイメージだろうか。
サンプルコードを実行しようと思うとimport文など足りないので補足しています。
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source} import scala.concurrent.Future implicit val system = ActorSystem("basic") implicit val materializer = ActorMaterializer() val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) // connect the Source to the Sink, obtaining a RunnableGraph val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // materialize the flow and get the value of the FoldSink val sum: Future[Int] = runnable.run()
解説によると、SourceとSinkを接続してRunnableGraphを作成したあとでもmaterializedするまではデータは流れないとのこと。
ふむふむ。上記のコードでいうと source.toMat(sink)(Keep.right)
するとパイプラインはできたけどデータの放流はまだっすよ、というイメージだろうか。
run()
することでmaterializeされ実行結果(Future[Int]
型の値)が得られる。
Materializeは、グラフで表した計算を実行するのに必要なすべてのリソースを割り当てるプロセスと解説しています。なるほど。
source.toMat(sink)(Keep.right)
でmaterializeした値しかもSinkの値だけほしい(Keep.right
)と表しています。しかし、Sourceをmaterializeした値が欲しい場面ってどんなときなんだろうか?
上記の例は runWith()
で置き換えられて source.runWith(sink)
とかけます。
複数回materializeできて、それぞれで計算される。結果としては異なる値となる。 (実際に例のコード実行するとFuture[Int]型の値が返ってきて別のhashcodeを持っている。valueは55で等しい)
Defining sources, sinks and flows
SourceやSinkのさまざまな定義方法や、これらをつなぎ合わせる方法がいくつかあるようです。 こういうふうに役割が別れてることでパーツ化できてうれしいんだろうなという気がする。
あとは要素として null
を渡すことが許可されてないというのはおもしろい。普段Scalaで実装していて null
を使うことは極力避けてる(Option型使うなど)ので当たり前な感じもするが大事なことなのであろう。
雑感
半分くらいの理解だけど、こんなもんかーと思って進めていきたい。 今回のサンプルコードだとあんまり良さがわからないけど、バックプレッシャーというのがきっといいんだろうなという気がする。