Blogaomu

WEBアプリケーション開発とその周辺のメモをゆるふわに書いていきます。

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(1)

最近、業務でAkka Streamsを使った処理に触れることがあって、詳しく知りたいなあと思ったのでドキュメントを読みながら触ってみようと思う。 一度には読み切れないのでいい感じに区切りながら進めていきたい。

Basics and working with Flows — Akka Documentation

主要なコンセプト

Core concepts

用語が定義されてるので雑に訳してみる。

  • Stream
    • データの移動、変形を含むアクティブなプロセス
  • Element
    • ストリームの処理単位。すべての処理は要素を変形して上流から下流に流す。
    • Buffer sizeのくだりはよく分からず...
  • Back-pressure
    • フロー制御の手段
    • コンシューマーが可用性についてプロデューサーに通知する方法
    • 消費のスピードに一致するように上流のデータ生産を遅らせる
    • Akka streamの文脈ではノンブロッキング・非同期として理解される(謎)
  • Non-Blocking
    • ある操作が完了まで長い時間がかかっても呼び出し元のスレッドの進行を妨げない、ということ
  • Graph
    • ストリームが実行されているときに要素が流れるべき経路を定義する
    • ストリーム処理topologyとは...
  • Processing Stage
    • グラフを構築するブロックの共通する一般名称(?)
    • 例: map, filter, カスタムGraphStage, MergeやBloadcastのようなグラフ接続

ストリームの定義と実行

Defining and running streams の項を見て基礎的なところを押さえておく。

Source, Flow, Sink という各段階があって、Sourceは1つの出力、Flowは1つの入出力、Sinkは1つの入力を持ってる。 これらSourceSinkFlowの前後に付けたものをRunnableGraphと言っている。 データが上流から途中で処理を通りながら下流まで流れていくというイメージだろうか。

サンプルコードを実行しようと思うとimport文など足りないので補足しています。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Future

implicit val system = ActorSystem("basic")
implicit val materializer = ActorMaterializer()

val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
 
// connect the Source to the Sink, obtaining a RunnableGraph
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
 
// materialize the flow and get the value of the FoldSink
val sum: Future[Int] = runnable.run()

解説によると、SourceとSinkを接続してRunnableGraphを作成したあとでもmaterializedするまではデータは流れないとのこと。 ふむふむ。上記のコードでいうと source.toMat(sink)(Keep.right) するとパイプラインはできたけどデータの放流はまだっすよ、というイメージだろうか。 run() することでmaterializeされ実行結果(Future[Int]型の値)が得られる。 Materializeは、グラフで表した計算を実行するのに必要なすべてのリソースを割り当てるプロセスと解説しています。なるほど。

source.toMat(sink)(Keep.right) でmaterializeした値しかもSinkの値だけほしい(Keep.right)と表しています。しかし、Sourceをmaterializeした値が欲しい場面ってどんなときなんだろうか? 上記の例は runWith() で置き換えられて source.runWith(sink) とかけます。

複数回materializeできて、それぞれで計算される。結果としては異なる値となる。 (実際に例のコード実行するとFuture[Int]型の値が返ってきて別のhashcodeを持っている。valueは55で等しい)

Defining sources, sinks and flows

SourceやSinkのさまざまな定義方法や、これらをつなぎ合わせる方法がいくつかあるようです。 こういうふうに役割が別れてることでパーツ化できてうれしいんだろうなという気がする。

Illegal stream elements

あとは要素として null を渡すことが許可されてないというのはおもしろい。普段Scalaで実装していて null を使うことは極力避けてる(Option型使うなど)ので当たり前な感じもするが大事なことなのであろう。

雑感

半分くらいの理解だけど、こんなもんかーと思って進めていきたい。 今回のサンプルコードだとあんまり良さがわからないけど、バックプレッシャーというのがきっといいんだろうなという気がする。