Blogaomu

WEBアプリケーション開発とその周辺のメモをゆるふわに書いていきます。

Akka Streamsでgzip圧縮されたテキストファイルを読み込んでデータを扱うサンプル

Akka Streamsのドキュメントをパラパラと見ていたら Dealing with compressed data streams という項目があって、なんか便利そうだなあと思ってサンプルコードを書いて試してみました。

Streams Cookbook — Akka Documentation

package sample

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl._
import akka.util.ByteString

/**
  * Sample: reads the gzip-compressed text file `in.txt.gz`, upper-cases its contents and
  * writes the result to `out.txt`, logging every compressed and decompressed chunk at INFO.
  *
  * Created by takayuki_atkwsk on 2017/02/05.
  */
object DecompressExample {

  import scala.util.{Failure, Success}

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("decompress-example")
    implicit val materializer = ActorMaterializer()
    // ExecutionContext for the completion callback below.
    import system.dispatcher

    val written =
      FileIO.fromPath(Paths.get("in.txt.gz"), chunkSize = 100) // read the file 100 bytes at a time
        .log("compressed").withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
        .via(Compression.gunzip()) // akka.stream.scaladsl.Compression
        .log("decompressed").withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
        // Decompressed chunk boundaries are arbitrary: decode without splitting a multi-byte
        // UTF-8 character across two strings.
        .via(utf8Decoder)
        .map(_.toUpperCase)
        .map(ByteString.fromString) // FileIO.toPath expects ByteString elements
        .runWith(FileIO.toPath(Paths.get("out.txt")))

    // Report the outcome and shut the ActorSystem down; otherwise the JVM never exits.
    written.onComplete { outcome =>
      outcome.flatMap(_.status) match {
        case Success(_) => system.log.info("wrote out.txt")
        case Failure(e) => system.log.error(e, "decompression failed")
      }
      system.terminate()
    }
  }

  /**
    * Decodes a stream of UTF-8 byte chunks into strings. An incomplete multi-byte sequence at
    * the end of a chunk is held back and prepended to the next chunk instead of being decoded
    * on its own (which `ByteString.utf8String` would turn into U+FFFD). Bytes still pending
    * when upstream completes are decoded as-is.
    */
  private val utf8Decoder: Flow[ByteString, String, akka.NotUsed] =
    Flow[ByteString]
      .map(chunk => Option(chunk))
      .concat(Source.single(Option.empty[ByteString])) // end-of-stream marker used to flush
      .statefulMapConcat[String] { () =>
        // Per-materialization state: bytes of a not-yet-complete trailing sequence.
        var pending = ByteString.empty
        (element: Option[ByteString]) =>
          element match {
            case Some(chunk) =>
              val bytes = pending ++ chunk
              val complete = completeUtf8PrefixLength(bytes)
              pending = bytes.drop(complete)
              List(bytes.take(complete).utf8String)
            case None =>
              if (pending.isEmpty) Nil else List(pending.utf8String)
          }
      }

  /**
    * Returns the length of the longest prefix of `bytes` that does not end in the middle of a
    * UTF-8 multi-byte sequence. Malformed tails are not held back, so they still reach
    * `utf8String` and get replaced there.
    */
  private def completeUtf8PrefixLength(bytes: ByteString): Int = {
    val size = bytes.size
    // A UTF-8 sequence is at most 4 bytes long, so its lead byte must be among the last 4.
    val floor = math.max(0, size - 4)
    // Scan backwards for the first byte that is not a continuation byte (10xxxxxx).
    (size - 1 to floor by -1).find(i => (bytes(i) & 0xC0) != 0x80) match {
      case None => size // empty input, or only continuation bytes (malformed): pass through
      case Some(lead) =>
        val b = bytes(lead) & 0xFF
        val sequenceLength =
          if (b >= 0xF0) 4
          else if (b >= 0xE0) 3
          else if (b >= 0xC0) 2
          else 1
        if (size - lead >= sequenceLength) size else lead
    }
  }
}

テキストファイルをgzipした in.txt.gz を用意しておいて上記を実行すると out.txt に大文字になったテキストが出力されます。

Lorem ipsumを書いて圧縮したファイルを用意して実行するとログはこんな感じで吐かれます。

[INFO] [02/08/2017 20:44:36.092] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(31, -117, 8, 8, 36, 12, -105, 88, 0, 3, 104, 111, 103, 101, 46, 116, 120, 116, 0, 85, 83, 75, -118, -36, 64, 12, -35, -49, 41, 116, -128, -90, 111, -47, -112, 64, 2, 3, 33, -39, -53, 46, -75, 91, 80, 31, -113, -86, -28, 76, 114, -6, 60, -107, 103, -96, -77, -15, -62, -91, -49, -5, -23, 91, 51, 41, -92, 123, -9, 66, -87, -27, 102, -44, 117, 16, 23, 25, 23, 122, 115, -91, 71, 43, 98, -115, -122, -23, -32, -86, -108, 116, 21, -109, 65, -15, 124, 104, 39, -82)
[INFO] [02/08/2017 20:44:36.095] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(76, 111, 114, 101, 109, 32, 105, 112, 115, 117, 109, 32, 100, 111, 108, 111, 114, 32, 115, 105, 116, 32, 97, 109, 101, 116, 44, 32, 113, 117, 105, 32, 104, 111, 109, 101, 114, 111, 32, 116, 114, 105, 116, 97, 110, 105, 32, 100, 105, 99, 101, 114, 101, 116, 32, 101, 116, 44, 32, 118, 105, 115, 32, 97, 110)
[INFO] [02/08/2017 20:44:36.096] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(36, -17, 98, -85, 18, -37, -26, 69, -22, -16, 114, -95, -38, -86, -105, 63, 81, 60, -72, -45, 46, 70, 85, -82, -12, 106, 74, -19, 126, -105, -102, -80, 1, 63, 117, -11, -116, 17, -30, 87, -6, -71, -24, -101, 11, -83, -83, 30, 82, -43, -124, -6, 106, -70, -60, 52, -61, 59, 61, -72, -57, -70, 119, 76, -20, -79, -97, -12, -77, 14, 29, 93, -122, 68, 29, -41, 11, -3, -42, -82, -12, -41, 52, 83, 102, 79, 32, -42, -123, 1, -16, 74, 55, 107, -99, -114, 103, 38, 111, -34)
[INFO] [02/08/2017 20:44:36.096] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(32, 101, 120, 101, 114, 99, 105, 32, 97, 114, 103, 117, 109, 101, 110, 116, 117, 109, 44, 32, 110, 111, 110, 117, 109, 121, 32, 100, 105, 99, 116, 97, 115, 32, 112, 101, 114, 32, 110, 101, 46, 32, 80, 114, 105, 32, 111, 102, 102, 101, 110, 100, 105, 116, 32, 112, 101, 114, 105, 99, 117, 108, 105, 115, 32, 101, 117, 46, 32, 85, 98, 105, 113, 117, 101, 32, 99, 111, 110, 118, 101, 110, 105, 114, 101, 32, 115, 99, 114, 105, 98, 101, 110, 116, 117, 114, 32, 101, 117, 32)... and [86] more
[INFO] [02/08/2017 20:44:36.097] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(-120, -63, 34, -48, 29, 26, -112, 38, 125, -12, -84, 122, -24, -30, -63, -19, 74, 63, -128, 99, 31, 76, -100, 116, -57, 110, -83, -37, 19, 81, -88, 64, -107, -53, -11, -27, -27, -26, 84, -76, 106, -63, 84, -106, 96, 114, 23, -33, 24, 51, 37, -28, -24, 31, 67, -110, -36, 81, 36, 11, 7, -23, 2, -107, 46, -76, -6, -108, -79, -107, -118, 111, -26, 5, 0, 58, -108, 106, 61, 92, 48, -39, 77, -53, 84, -26, -127, 103, 5, -98, -17, 14, -11, -64, 122, -41, -42, 89, -94, 29)
[INFO] [02/08/2017 20:44:36.097] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(111, 32, 97, 116, 44, 32, 112, 101, 114, 32, 118, 105, 100, 105, 116, 32, 100, 111, 108, 111, 114, 101, 109, 32, 99, 105, 118, 105, 98, 117, 115, 32, 97, 110, 46, 32, 83, 99, 114, 105, 112, 116, 97, 32, 97, 100, 105, 112, 105, 115, 99, 105, 110, 103, 32, 97, 114, 103, 117, 109, 101, 110, 116, 117, 109, 32, 101, 116, 32, 110, 97, 109, 46, 10, 10, 69, 117, 32, 109, 105, 110, 105, 109, 32, 113, 117, 97, 101, 113, 117, 101, 32, 102, 101, 117, 103, 97, 105, 116, 32)... and [98] more
[INFO] [02/08/2017 20:44:36.098] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(37, -105, 96, -90, 36, -8, 14, 37, 31, -42, -94, -91, -74, 120, 3, 5, 76, -38, -116, 101, -59, -120, -46, -78, -12, -95, -24, 60, 116, 56, -88, -13, -28, 118, 33, -127, 60, 24, 6, -47, -108, -53, 7, -48, -95, -83, -54, 100, -57, 51, 20, 107, 51, -13, -78, -1, 7, -20, 76, 65, -103, -88, -4, 52, -79, 84, -23, 108, -21, 99, -22, 115, -91, 95, 33, 122, -25, 125, -58, -90, 72, 6, -66, 8, 8, -90, -107, -46, 18, 60, -56, -24, -46, 61, -100, 28, 80, 53, 118, -10)
[INFO] [02/08/2017 20:44:36.098] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(112, 114, 105, 44, 32, 118, 101, 114, 105, 32, 101, 118, 101, 114, 116, 105, 32, 117, 116, 114, 111, 113, 117, 101, 32, 110, 111, 32, 112, 114, 105, 46, 32, 83, 105, 116, 32, 103, 114, 97, 101, 99, 105, 115, 32, 109, 111, 108, 101, 115, 116, 105, 97, 101, 32, 118, 105, 116, 117, 112, 101, 114, 97, 116, 97, 32, 97, 100, 44, 32, 101, 111, 115, 32, 99, 117, 32, 97, 117, 100, 105, 97, 109, 32, 100, 101, 102, 105, 110, 105, 116, 105, 111, 110, 101, 109, 46, 10, 10, 69)... and [112] more
[INFO] [02/08/2017 20:44:36.098] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(19, -8, -51, -89, -25, 40, 80, 12, -128, 1, 83, 127, 56, -122, -127, -64, 90, -37, -126, -43, 80, 97, 23, 0, -50, 18, -128, 103, -109, -124, 17, -110, 117, -117, -92, -63, -29, 86, 26, 112, 99, 86, 112, 121, -123, -35, 120, 27, -92, 57, 123, -102, -36, -69, -108, 8, -63, 84, -97, 11, -116, -113, -48, 12, 80, 11, 121, -112, -118, 18, -82, 67, 91, 97, 3, -8, 32, 124, -118, 122, 120, -34, 125, -16, -128, -104, 45, 59, 12, 54, 101, -46, 20, 38, -83, 26, 99, -72, -82, 35)
[INFO] [02/08/2017 20:44:36.098] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(32, 112, 114, 105, 46, 32, 69, 117, 32, 118, 105, 120, 32, 97, 108, 105, 105, 32, 115, 97, 100, 105, 112, 115, 99, 105, 110, 103, 44, 32, 112, 114, 111, 32, 99, 117, 32, 110, 111, 98, 105, 115, 32, 115, 97, 101, 112, 101, 32, 100, 101, 108, 101, 110, 105, 116, 105, 46, 32, 69, 117, 32, 101, 110, 105, 109, 32, 101, 108, 105, 103, 101, 110, 100, 105, 32, 97, 116, 111, 109, 111, 114, 117, 109, 32, 118, 105, 109, 46, 10, 10, 80, 101, 114, 32, 101, 108, 105, 116, 32)... and [103] more
[INFO] [02/08/2017 20:44:36.099] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(114, -12, -119, 0, 49, -98, 38, 126, 1, -18, -107, -69, 80, 91, -14, -108, 44, 75, 96, -123, 5, 2, 47, 59, 60, 2, 60, -88, -79, 33, -32, -56, -41, 60, -98, -101, 126, -58, -125, 30, -88, 53, 24, -114, 116, 49, -46, -123, 19, 60, -11, 127, -118, 11, -8, -32, -88, 112, -124, -92, -90, 30, -9, 36, -72, -109, 104, -87, -78, -50, -32, 33, -2, -119, 0, 30, 122, -97, -7, 60, -97, -67, 59, 40, -52, 92, 34, -111, 25, 96, 42, -64, 36, -127, -9, 18, 33, 9, 5, -35)
[INFO] [02/08/2017 20:44:36.099] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(117, 115, 32, 97, 116, 111, 109, 111, 114, 117, 109, 32, 104, 97, 115, 32, 105, 110, 46, 32, 72, 105, 115, 32, 99, 97, 115, 101, 32, 111, 98, 108, 105, 113, 117, 101, 32, 108, 101, 103, 101, 110, 100, 111, 115, 32, 101, 105, 44, 32, 115, 111, 108, 101, 116, 32, 105, 110, 116, 101, 103, 114, 101, 32, 110, 97, 109, 32, 110, 101, 46, 32, 69, 105, 32, 108, 97, 98, 111, 114, 101, 115, 32, 104, 101, 110, 100, 114, 101, 114, 105, 116, 32, 101, 97, 109, 44, 32, 97, 110)... and [112] more
[INFO] [02/08/2017 20:44:36.099] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(-58, -109, 125, 41, 104, 77, 124, 9, 23, 117, -9, 13, 83, 70, 51, 28, -62, 104, -72, 90, 77, -28, 80, 26, 87, -11, 104, -16, -19, 89, 41, 71, -37, -49, -48, 51, 33, 52, -100, -89, 68, 90, -5, 48, 95, 1, -53, 34, -13, 87, -6, 90, 103, -20, 118, -88, -36, 117, -85, 122, 23, -108, 5, -59, -72, -96, 64, -22, 39, 104, -104, 9, -87, 31, -68, 8, 28, 60, -81, 26, 85, -123, -78, 46, 6, -71, 55, 95, -60, -96, 103, 13, -125, 94, -2, 1, -71, 19, 69, 70)
[INFO] [02/08/2017 20:44:36.100] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [decompressed] Element: ByteString(114, 116, 111, 32, 99, 111, 109, 109, 111, 100, 111, 32, 97, 100, 32, 104, 97, 115, 46, 32, 69, 105, 32, 100, 117, 111, 32, 102, 117, 103, 105, 116, 32, 116, 111, 114, 113, 117, 97, 116, 111, 115, 44, 32, 105, 100, 32, 117, 108, 108, 117, 109, 32, 99, 104, 111, 114, 111, 32, 118, 111, 108, 117, 112, 116, 97, 114, 105, 97, 32, 105, 117, 115, 46, 32, 85, 116, 32, 118, 105, 100, 101, 32, 109, 97, 108, 111, 114, 117, 109, 32, 105, 110, 115, 116, 114, 117, 99, 116, 105)... and [93] more
[INFO] [02/08/2017 20:44:36.103] [decompress-example-akka.actor.default-dispatcher-3] [akka.stream.Log(akka://decompress-example/user/StreamSupervisor-0)] [compressed] Element: ByteString(-11, 4, 0, 0)

ちょっと実践的な使いどころが怪しげですが、gzip圧縮されたデータを扱う場合には役に立ちそうな感じです。