Blogaomu

WEBアプリケーション開発とその周辺のメモをゆるふわに書いていきます。

fluentd の filter_record_transformer プラグイン便利っぽい

fluentd使っていて、入力データの特定フィールドの値をごにょごにょして出力したい(次の工程に渡したい)っていう需要がありました。それでどうやるんだろうとググっていたら filter_record_transformer というプラグインが使えそうだ、ということで使ってみました。fluentdバージョンは 0.12.x です。

record_transformer Filter Plugin | Fluentd

fluentd に内包されているのでプラグインのインストールは必要ありません😄

書き方はこのようになります。 <record> ディレクティブの中にフィールド名とその値を書きます。NEW_FIELD と言っていますが、入力に同名のフィールドがあればその値が NEW_VALUE にマップされる動作になっています。

<filter foo.bar>
  @type record_transformer
  <record>
    NEW_FIELD NEW_VALUE
  </record>
</filter>

また enable_ruby というオプションを有効にすると Ruby 式の評価結果を値にできるようです。計算処理や文字列操作なんかで使えそうな感じですね。便利。

自分のやりたかったこととしては、以下のように値に含まれる特定の文字列を取り除きたかったので、 gsub メソッドを使いました。

<filter foo.bar>
  @type record_transformer
  enable_ruby
  <record>
    hoge ${record["hoge"].gsub("P-", "")}
  </record>
</filter>
{"hoge":"P-1234A567890"}

このような入力があると、出力は以下になります。map関数を適用した感じになって便利だなあと思ったのでした。

{"hoge":"1234A567890"}

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(2)

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(1)の続き。

バックプレッシャー

Back-pressure explained

  • Akka Streamsの利用者はバックプレッシャーをハンドリングするコードを書く必要がない。
    • ライブラリ側でいい感じに処理してくれる
  • 下流のSubscriberが受け取ることのできるそしてバッファすることのできる要素数(demand)という観点でバックプレッシャープロトコルが定義されている。
  • データソース(Reactive StrteamsではPublisher, Akka StreamsではSource)は与えられるどのSubscriberのdemandの総数よりも多くの要素を出さないことを保証する。

遅いPublisher、速いSubscriber

  • このケースではPublisherを遅くする必要がない。
    • たしかにもともと下流の方が要素をより多く受け取れるので詰まることがなく調整しなくてよい。
  • しかし、signaling rates(?)はめったに一定ではないし、任意の時点で変わりうる、最終的にはSubscriberがPublisherよりも遅い状況に至る。
    • リソースの状況によってはPublisher, Subscriberの処理速度が変わるということを言ってるのだろうか?
  • これらの状況から保護するためにバックプレッシャープロトコルは有効なままにしておく必要がある。でもこれを有効のままにしておくことによる影響を受けたくはない。
  • Reactive Streamsプロトコルは、SubscriberからPublisherに Request(n:Long) *1の信号を非同期に送ることでこの問題を解決している。
  • このプロトコルは、送られてきたdemandよりも多い要素をPublisherが送らないことを保証している。

速いPublisher、遅いSubscriber

  • Publisherをバックプレッシャーするのが必須なケースである。
  • Publisherは以下の戦略の一つを適用してバックプレッシャーに従う必要がある。
    • 生成速度をコントロールできるならば、要素を生成しない
    • より多くのdemandが通知されるまで、bounded(?)な方法で要素をバッファリングする
    • より多くのdemandが通知されるまで、要素を使わない(破棄する?)
    • 上記の戦略を適用できない場合、ストリームを取り壊す

Stream Materialization

Stream Materialization

  • MaterializationはAkka Streamsにおいてはアクターの起動、ファイルオープン、ソケット接続などを意味する。
    • 必要なリソースに依存した処理を行う。
  • run()runWith()runForeach(el => ...) というメソッドでmaterializationが始まる。
    • runForeach(el => ...)runWith(Sink.foreach(el => ...))シンタックスシュガーである
  • Materializationはmaterializeしているスレッド(?)で同期的に行われる。
  • ストリーム処理はmaterializationの間に起動したアクターによって扱われる。
    • これらアクターは ActorMaterializer の作成時に MaterializationSettings でセットされるディスパッチャーで設定されたスレッドプールを使う。
    • 今までの例だと ActorMaterializer() で特に設定を渡していないのでデフォルトのディスパッチャーが使われている。

Operator Fusion

  • Fusing
  • asyncBoundary
  • ストリーム処理のステージを融合して、同期的に計算する方が早い場合は効率化できるっていうイメージなのか?

Combining materialized values

  • materializeされた値の合成
    • run したあと(materializeしたあと)に得られる値を RunnableGraph生成時に指定できる
  • Keep.right Keep.both
  • materializeされた値をmapできる
  • GraphDSL という便利DSLが使える

GraphDSLの例(抜粋)。

// An source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
 
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
 
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]

// The result of r11 can be also achieved by using the Graph API
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
  RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) =>
    import GraphDSL.Implicits._
    src ~> f ~> dst
    ClosedShape
  })

Stream ordering

  • Akka Streamsではほぼ全ての計算ステージで要素の入力順序を保持する
  • mapAsync のような非同期操作でもこれは維持される(mapAsyncUnorderdという順序を保証しない版のメソッドもある)
  • ただ、複数の入力を扱うもの(Mergeなど)では出力順は定義されていない。
  • Zip では出力順は保証されている。
  • ファン・インの場合に出力順をコントロールしたい場合は、 MergePreferred または GraphStage を使う

所感

  • バックプレッシャーをライブラリ側でいい感じにやってくれるのは非常にありがたい
  • boudaryの概念がまだつかめていない。
    • 文字通り境界を表すのだろうけども...

*1:ドキュメントでは引数の型がIntになっているがソースコードではLongになっている https://github.com/akka/akka/blob/v2.4.10/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala#L48

Akka Streams ドキュメントを何回かに分けてちらちら見てみる(1)

最近、業務でAkka Streamsを使った処理に触れることがあって、詳しく知りたいなあと思ったのでドキュメントを読みながら触ってみようと思う。 一度には読み切れないのでいい感じに区切りながら進めていきたい。

Basics and working with Flows — Akka Documentation

主要なコンセプト

Core concepts

用語が定義されてるので雑に訳してみる。

  • Stream
    • データの移動、変形を含むアクティブなプロセス
  • Element
    • ストリームの処理単位。すべての処理は要素を変形して上流から下流に流す。
    • Buffer sizeのくだりはよく分からず...
  • Back-pressure
    • フロー制御の手段
    • コンシューマーが可用性についてプロデューサーに通知する方法
    • 消費のスピードに一致するように上流のデータ生産を遅らせる
    • Akka streamの文脈ではノンブロッキング・非同期として理解される(謎)
  • Non-Blocking
    • ある操作が完了まで長い時間がかかっても呼び出し元のスレッドの進行を妨げない、ということ
  • Graph
    • ストリームが実行されているときに要素が流れるべき経路を定義する
    • ストリーム処理topologyとは...
  • Processing Stage
    • グラフを構築するブロックの共通する一般名称(?)
    • 例: map, filter, カスタムGraphStage, MergeやBloadcastのようなグラフ接続

ストリームの定義と実行

Defining and running streams の項を見て基礎的なところを押さえておく。

Source, Flow, Sink という各段階があって、Sourceは1つの出力、Flowは1つの入出力、Sinkは1つの入力を持ってる。 これらSourceSinkFlowの前後に付けたものをRunnableGraphと言っている。 データが上流から途中で処理を通りながら下流まで流れていくというイメージだろうか。

サンプルコードを実行しようと思うとimport文など足りないので補足しています。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Future

implicit val system = ActorSystem("basic")
implicit val materializer = ActorMaterializer()

val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
 
// connect the Source to the Sink, obtaining a RunnableGraph
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
 
// materialize the flow and get the value of the FoldSink
val sum: Future[Int] = runnable.run()

解説によると、SourceとSinkを接続してRunnableGraphを作成したあとでもmaterializedするまではデータは流れないとのこと。 ふむふむ。上記のコードでいうと source.toMat(sink)(Keep.right) するとパイプラインはできたけどデータの放流はまだっすよ、というイメージだろうか。 run() することでmaterializeされ実行結果(Future[Int]型の値)が得られる。 Materializeは、グラフで表した計算を実行するのに必要なすべてのリソースを割り当てるプロセスと解説しています。なるほど。

source.toMat(sink)(Keep.right) でmaterializeした値しかもSinkの値だけほしい(Keep.right)と表しています。しかし、Sourceをmaterializeした値が欲しい場面ってどんなときなんだろうか? 上記の例は runWith() で置き換えられて source.runWith(sink) とかけます。

複数回materializeできて、それぞれで計算される。結果としては異なる値となる。 (実際に例のコード実行するとFuture[Int]型の値が返ってきて別のhashcodeを持っている。valueは55で等しい)

Defining sources, sinks and flows

SourceやSinkのさまざまな定義方法や、これらをつなぎ合わせる方法がいくつかあるようです。 こういうふうに役割が別れてることでパーツ化できてうれしいんだろうなという気がする。

Illegal stream elements

あとは要素として null を渡すことが許可されてないというのはおもしろい。普段Scalaで実装していて null を使うことは極力避けてる(Option型使うなど)ので当たり前な感じもするが大事なことなのであろう。

雑感

半分くらいの理解だけど、こんなもんかーと思って進めていきたい。 今回のサンプルコードだとあんまり良さがわからないけど、バックプレッシャーというのがきっといいんだろうなという気がする。

kanazawa.rb meetup #48 に参加しました #kzrb

金沢に戻ってきてかねてから行ってみたかったkanazawa.rbに参加してきました。

kanazawa.rb meetup #48 - kanazawa.rb | Doorkeeper Meetup #48 - Kanazawarb

この回は「祝4周年LT大会」ということで、参加者は1人1本以上LTを発表するという縛りがありました。私は初参加だったので自己紹介も兼ねて2本発表しました。

『長めの自己紹介』

Rubyとの関わりや影響を受けたShibuya.rbの紹介、最近の興味などについて話しました。順番としてはまずこれを発表して次にリモートワークの話ができればスムーズだったのですが、shuffleの結果自己紹介LTが後になってしまいましたw 他のコミュニティに参加する機会はそれほど多くないと思うので、Shibuya.rbの話はみなさん興味を持ってくれた印象がありました。

『リモートワーク1ヶ月経った』

もう一本はリモートワークについて1ヶ月やってみた感想を話しました。会社や組織によってもリモートワークの形態や考え方は異なると思うので、あまり一般的な話というよりかは個人の感想として話しました。意識してやってることのチャットに現れたりWIPでもPRを出すっていうのは、リモートでも「いる感」を出すっていうのに繋がってるのかなと思いました。

また、会議のくだりはいい感じに集音できるマイクだったり安定した回線が必要だろうなあと思っています。教育については私一人ではどうにもならないっていうところはあるんで組織全体でどうしていくのか考えないといけないなと資料を作ってて思いました。オンライン上でうまくやる方法を見つけるのか、一定の期間人のいるところで力をつけてリモートなり実践に送り込むのか、方法はいろいろあると思うんで何か最適なものを見つけられたら良いです。

あとはオフィスで常勤しているメンバーとリモートワークしてるメンバーがN:1の関係性になるので、もう少しリモートワークするメンバーが増えてくるとチームの考え方も変わるのかなあとうっすら思いました。

その他感想

今回初めて Google slides でスライドの作成からそのまま公開までやってみたんですが、なかなか便利でした。Webブラウザがあればどの端末でも編集できるし、公開するのもボタンポチッとすればいいだけ(アップロードしないので即公開できる)なんで楽ちんですね。SlideShareやSpeakerDeckのように、特定のユーザーのスライドリストみたいのはなさそう(用途がちがう)なんで、その辺はどうするのがいいか悩ましいところはありますね。

LT祭りということで他の人のLTもたくさん聞いたわけですが、皆さんそれぞれ興味のある分野が異なってて面白かったです。参加者だったりコミュニティのベースにあるものとしてはRubyなんだと思いますが。それだけに捕らわれないところも良いなあと思いました。

4周年ということでkanazawa.rb手ぬぐいを頂きました!

#kzrb 手ぬぐいもろた

まだ余っていそうだったので、次回のmeetupに来たらもしかしたら貰えるのでは?(宣伝)次回は9/11日曜日開催とのことです。

懇親会

火鍋でした。ひたすら肉食べてた気がするw

火鍋ー

金沢でリモートワーク始めました

今週から金沢の実家でリモートワークを始めました。まだ始めて4日間ですが金曜まで終えて一区切りと言うことで感想でも書いてみようと思います。

背景

もともと東京のオフィスに通って働いていたのですが、家庭の事情により実家に戻ることになりました。金沢に拠点を置く企業に転職するという選択肢もありましたが、積極的に今の会社を辞める理由も無かったので上司や社長に事情を話したところOKが出たので今に到っています。

会社では他のチームで週一のリモートワークが認められており運用実績があったこと、また私が携わっているチームでも別拠点で働いているメンバーがいたり深夜メンテナンス作業で自宅で作業したりなどと自分自身でも少ないながらも実績があったことがOKを出しやすかった要因なのかなと思っています。ただ、私のように完全リモートかつ関東圏以外に拠点を置くのは社内初の試みなので人柱感半端ないですw

そもそもどういう仕事をしているのかという話なのですが、現在携わっているのはコンシューマー向けアプリの開発・運用で、私の行う作業としては、APIの設計・実装・コードレビュー、お問い合わせに関する調査や対応、AWS上で構築されているサーバー群の運用というところがメインです。こういう具合なのであまり一人でもくもくと作業するという感じではなく、同期/非同期にコミュニケーション取りながら進めていくことが多いです。

準備

準備としてここ1ヶ月くらいは約週1回のペースで東京もしくは金沢の家で仕事するようにしていました。私もオフィスにいるメンバーもお互いに慣れる期間として設けていました。普段からSlackやSkypeのチャットベースでやり取りしていたので、リモートワークでもそこは普段通りです。まだ自分だけリモート越しの会議をやっていないのでこれはどうなるか気になってます。当初はオフィスだけで話が進んでてリモートで自分がやるべき対応が遅れてしまったなんてこともありましたが、このところはそういう問題も少ない気がします。

あとは、他のメンバーがQiita:Teamに書いたリモートワーク制度についての記事を見て参考にしたり、実際にリモートワークしているメンバーにどんな感じですか〜というのを聞いたりして知見を増やしていました。

実際やってみて

気付いたら作業しっぱなしになる

休憩取るのは自分のペースで行けるんですが、逆に休憩を取るタイミングが無さすぎて気付いたら昼過ぎてたとか日が暮れてたというのはあります。オフィスにいるときはSlackの雑談チャンネルで腹減ったーとか言って昼飯を食べに行ってたので、半強制的?に作業が切れていました。今は実家(両親が住んでる)なので母親がご飯を準備してくれてじゃあできたので食べようとなるんですが、初日に仕事のキリが悪くあとでーとなったので、もう自分のタイミングで食べてくれってなりました(汗) キリが悪くても一緒に食べるのが理想ではありますが。

通勤が無くなった

自由に使える時間が増えました。といっても今週はまだ慣れてないので疲れて横になってしまってますが。ただ、歩くことも減ってしまったので意識的に動かないと体が鈍りそうだなと思ってます。

お土産が貰えない

弊社では、週末にどこどこ行ってきてお土産買ってきましたー、という風景がしばしば見られるのですがリモートワークだとその恩恵にあずかれないことが分かってしまいました。。まさかこんなデメリットがあるとは思いませんでした(汗)

おわりに

まだ始まったばかりなので、これからいろいろなことが起こるんだとは思いますが、ひとまずは1週間終えたというところで良かったです。慣れてきたら周辺の技術コミュニティーにも顔出していこうかなと思ってます。

f:id:TAKAyuki_atkwsk:20160719095803j:plain

2015年個人的振り返り

今年の初postが振り返り記事という何とも残念な感じですが、毎年のように振り返っていきます。

仕事

今年はひたすら新規プロジェクトをリリースまで持っていくということをやってました。前半はPlay(Scala)を使ってWebAPIを量産し、後半はチーム管理をやりつつコード書きつつインフラ整備しつつという感じでした。

Scalaについては1年間触ってきて、また経験者と一緒に仕事することでPlay上である程度はやりたいことはできるようになってきたと感じています。しかし非同期処理やファイル操作に関してはまだパッと書けるレベルではないので来年の課題です。

上記でもあるように年の後半は複数の役割をこなさないとならない状況でだいぶ苦しかったですね。慣れない管理や調整、コードを書きたいのに時間がないジレンマ、などなど思うように仕事が進まないストレスもだいぶありました。しかし、最近になって自分はコードを書かない(必要なら書く)というスタンスに切り替えてからは時間の余裕もでき、心の余裕もできつつあるかなと思います。逆に全くコードを書かなくなってしまうのも微妙感あるので、この辺りのバランスを取るのが来年の課題でしょうかね。

仕事以外

前半はScala勉強会(rpscala)やScala TV、Effective Akka読書会などScala系の勉強会に顔を出していたんですが、業務が忙しくなるにつれて全く参加できなくなってしまいました。また社内でも実践DDD読書会を@tyabeさんとやってたんですが、業務の忙しさを理由に休止となってしまいました(来年からは復活するぞ)。

今年のOSS活動ですが以下になります。どちらもドキュメントの修正でした。ドキュメント読んでて、あれ何か実装とずれてる?と思い修正出してみました。

つい先日ですが初めて北陸のコミュニティに顔を出しました。東京でずっと活動していて地元の開発者のつながりがほとんど無かったので新鮮でした。

仕事以外ではありますが、仕事の比重が大きくなってプライベートがおろそかになったのが反省すべき点でした。体作りや交友関係、勉強や趣味の時間をこれでもかというぐらい削ったので来年はこちらの方にも時間を振り分けられるようにしていきたいです。

来年に向けて

だいたい既に書いてきている事ではありますが、足りないスキルを身に付けること、仕事におけるリソース配分、生活におけるリソース配分というところを意識して過ごしていきたいです。また秋ぐらいから数学欲がまた湧いてきたので知識を身に付けていきたいです。

それでは皆様今年もお世話になりました。良いお年を。

2014年個人的振り返り

今年ももうすぐ終わりそうなので、毎年恒例の振り返りを書こうと思います。

なにやってたっけ?

仕事ではあいかわらず Rails / Ruby メインでコードを書くことが多かったです。 今年は(も?)あんまり Angular.js は触ってません。私の代わりに @mknkisk ががんばってくれましたw

去年と異なるのはインフラだとか開発基盤を見ることが多かったのかなということです。

バージョンアップ系

Ruby 1.9 系から 2.1 系へのバージョンアップを行ったのですが、バージョンを上げる際に既存の処理が動かなくなる心配があったので、 テストコードを充実させることを私が中心となって開発チームで進めました。 これをきっかけに自分以外の人もテスト書くようになってきたので副産物的によかったです。

あとは MongoDB を 2.4 -> 2.6 に上げるサポートをしました。というかこれはほぼ @mknkisk 先生のがんばりです。 ありがとうございました!

コアじゃない部分は外部サービスに任せる

今まで自前で管理していたこと(メール配信、CI)を外部サービスを使うようにしました。 メール配信、移行前は大量のメールを配信する難しさ、暗黙知みたいなところが多くてとても苦労しました。 移行自体は @sakagami3 がやってくれました、ありがとうございます。

CIも今までは Jenkins を自分たちで運用していて、出勤したらいつの間にか OutOfMemoryError で動かなくなっていたり などなかなか見守りが大変でした。Shibuya.rb の常連さんである @ryonext さんが CircleCI を導入したという話をブログで書いていたので 直接知見を聞き、弊社も無事に導入できました。ありがとうございます。

餅は餅屋に任せておけばいい的な考え方が今年は会社に浸透したのかなあという気がします。

インフラ周りその他

セキュリティに関してのEC2インスタンスメンテナンスアップデートが割と大規模に適用されててんやわんやしてましたw
事前に再起動すれば回避できるとかできないとか情報が錯綜していて混乱してましたが、なんとか対応できました。

オペレーションの自動化や Infrastructure as Code に関しては、 hubotを導入したり(まだ導入しただけでchat-opsまではできてない)、ほっかい道場をきっかけに Ansible 触ってみたり ということをやりました。ちょっと中途半端に終わってしまいましたね...。

開発しやすくする仕組みとか

どうやったら開発しやすくなるかな、というのを考えたり実践してみたりしたのも今年っぽかった項目だと思います。 ペアプロをやってみたり、スクラム的なことをやってみたり、プルリクを通したコードレビューをやってみたりなどなど。 個人レベルでは、去年から引き続きテストコードを書き続け、知見をドキュメントに残したり、設定ファイルをGitで管理する みたいなことをやってました。開発チーム内で習慣になりつつあることも増えてきたので、そこは良かったなと思います。

仕事以外の話

去年の年末ぐらいから関数型言語を触り始めました。最初は Haskell を勉強してましたが最近は Scala 寄りになっています。 関数や代数的データ型の考え方にはまだ慣れない部分が多いですが、こういう考え方やアプローチもあるのか! と新たな発見があったりしておもしろいです。まだまだ時間をかけて勉強と実践を続けていきたいと思います。

自分でも驚いたのですが Shibuya.rb に顔を出すようになったのも今年からでした。 存在自体は去年から知っていたのですがタイミングが合わずにいけなかったのです。 何回か参加するうちに顔なじみの人が増えてきて一月に一度の密かな楽しみの一つとなっています。

EC Night というミートアップにて発表を行いました。40-50人くらいの規模での発表は初めてだったので かなり緊張しましたがいい経験になりました。

OSS活動はあまりできませんでしたが、こんな感じでした。Vimまわりで貢献できたのはとてもうれしいです!

mina-slackは会社でごくごく普通に使われているのでいい感じです。

とかなんとかやってたら1年終わってました。まさにあっという間だった...。

自分の限界?

やってたことの範囲が広すぎて、深く学ぶことがなかなかできなかったような気がしました。 とはいえ、去年に挙げたやりたいこと項目はひと通りできたように思います。

何かを作るモチベーションと自分の力の無さ

「あー、これあったら便利だなあ」と思って作り始めてみるものの、いらんところでハマったりしてそのままズルズルモチベーションが下がったことが多かったです。 プロトタイプぐらいはさくっと作れるようにしておきたいと強く感じました。とは言えども、2つ世の中に出せたのは良かったのかなと思います。

新しいこともやってみたいけど、それを実現するための基礎がしっかりしていないとだめだなあと実感した1年でした。

来年やりたいこと

アプリケーションでもライブラリでも思いついたものをとりあえず作りきる、ことをやりたいです。 これを補完するものとしてRubyだとかデザインだとか、引き続き関数型言語とかやる感じですかね。

会社に所属していて(インターネットコンテンツとして)私個人がWebメディアに出る機会が増えてきたので エンジニアとしての実力も伴っていかないといかんなーと気を引き締める2014年末であります。

また、今年はさまざまな人に助けられた1年でした。この場を借りて御礼申し上げます。

それでは良いお年を。