後半はこちらです
目的
とても分厚いAkka実践バイブルを読んでいるので、自分の印象に残ったところをメモ程度にまとめます。全部で17章あるので、前後半で半分に分けて行きます。
第1章 Akkaの紹介
アプリケーションをスケールさせる時は、次の2つが目標となる。
- 複雑性は可能な限り低く留めておくこと
- リソースを効率的に使用するいこと
Akkaとは何か?
Lightbendによって開発されたオープンソースプロジェクト。Akkaは並行・分散アプリケーションをよりシンプルに単一のモデルで実装するプログラミングモデルを提供する。そのモデルは アクターモデル と呼ばれている。
Akkaの特徴としては、アプリケーションのスケールアップとスケールアウトをアクターによって提供している。
アクターの概要
- Akkaはアクターが中心となる。アクターはメッセージをやりとりし、そのメッセージによって実行する。メッセージはイミュータブル。
- アクターは全て非同期で実行する。
- メッセージの送受信でアプリケーションを構築する。
スケーリングのための2つのアプローチ
双方向での情報のやり取りは、情報をポーリングして取得するのではなく、イベント駆動でイベントが発生したときに通知するアプローチ。 さらにネットワーク上では非同期なメッセージ送受信、ノンブロッキングI/Oをメインにする。
Akkaのアクター
- Akkaのアプリケーションで最初にアクターシステムを作成する。アクターシステムは、トップレベルのアクターを作成できる。アプリケーション内の全てのアクターに対してトップレベルアクターを一つだけ作成するのが一般的なパターン。
- アクターシステムは、作成したトップレベルアクターにアクターのアドレスを返す。これを ActorRef という。ActorRefはアクターにメッセージを送信するために使用できる。
- アクターシステム内でアクターを探したい時は、アクターパス を用いる。URLパス構造とアクターの階層をマッピングできる。
- メッセージはアクターのActorRefに送信される。全てのアクターはメールボックスを持っており、メッセージをそこに保存する。そして、そこから到着順に1つずつ処理していく。
- ディスパッチャーはアクターを呼び出す。このディスパッチャーの種類によって、どのスレッドモデルを使用してメッセージをプッシュするかが決まる。
第2章 最小のAkkaアプリケーション
アプリケーションを構築して、実際に動くところを確認する。個人的にはAkka HTTPのルーティングDSL周りが読めなくてちょっと混乱していた。
また、この中に出てくる各種の要素はあとの章で詳細に説明されるとのことなので、いったんは動かすことと、中身コードの概略を理解するだけに留めた。
第3章 アクターによるテスト駆動開発
Akkaによるアクターモデルは、TDDと相性が良い。
- アクターは振る舞いを持つため、より直接的にテストの対象になる。
- アクターはメッセージングを用いて構築されており、メッセージを送信することで振る舞いを容易にシミュレートできる。これはテストの実行しやすさにつながる。
アクターのテスト
アクターのテストは、通常のオブジェクトのテストよりも困難。
- メッセージ送信は非同期なため、単体テストで期待値をアサートするタイミングが難しい。
- アクターは内部状態を隠蔽し、この状態へのアクセスを許可していない。(ステートレス)などなど...
これを解決するためにも、Akkaのakka-testkitモジュールが役に立つ。
- シングルスレッドでの単体テスト。
TestActorRef
を使用すると、直接アクターインスタンスをテストしたり、通常のオブジェクトをテストするようにシングルスレッド環境でテストができる。 - マルチスレッドの単体テスト。
TestKit
とTestProbe
クラスが提供されている。メッセージの期待値をアサートするためのメソッドも用意されている。 - マルチJVMのテスト(第6章で)。
テスト専用の CallingThreadDispatcher
にディスパッチャーを指定すると、呼び出し元と同じスレッドでアクターを動作させることができる。本番環境に近いのは、TestKit
を用いてマルチスレッドに焦点を当てたテスト。
SilenceActorの例
アクターの振る舞いは外部から直接確認ができない場合。テストしたいことは、アクターがメッセージを処理する際に例外をスローせず、アクターが終了し、内部状態の変化が意図してものであるかどうか。
- シングルスレッドテスト用の
TestActorRef
を作成する。 - 内部アクターを
underlyingActor
で取得し、状態をアサートする。
val silentActor = TestActorRef[SilentActor] silentActor ! SilentMessage("whisper") silentActor.underlyingActor.state must (contain("whisper"))
これをマルチスレッド版にすると以下のようになる。
- マルチスレッドでは、
ActorSystem
を用いる。アクターは常にその生成する方法を指示するためのオブジェクトであるProps
オブジェクトから作られる。 - 状態変化を確認するには、作成したアクターの内部状態をゲットするメッセージを追加し、それを用いて内部状態をテストする。
val silentActor = system.actorOf(Props[SilentActor], "s3") silentActor ! SilentMessage("whisper1") silentActor ! SilentMessage("whisper2") silentActor ! GetState(testActor) expectMsg(Vector("whisper1", "whisper2"))
アクターが呼び出し元のActorRef
を受け取ることで、アクターからメッセージを受け取ることができる。これは一般的で、以下のようにしてコンストラクタにActorRef
を渡す実装が一般的である。コンパニオンオブジェクトでProps
の生成を行うのは、他のアクターからこのアクターの生成時に内部状態を隠蔽できる利点がある。
object SilentActor { def props(receiver: ActorRef) = Props(new SilentActor(receiver)) // SilentActor.props でPropsをgetし、そこから呼び出しもとでインスタンス化する。 // val silentActor = system.actorOf(SilentActor.props, "s3") //... }
SideEffectingActorの例
受信した挨拶文をコンソールに出力するアクターの振る舞いをテストすることを考える。
TestEventListener
: 設定すると、ログ出力される全てのイベントを制御できる。EventFilter
: ログメッセージを確認するために使う。イメージ下記。
EventFilter.info(...).intercept { actor ! message }
しかし、このテストは複雑さが増してしまう。テスト対象のクラスにlistenerとしてActorRefを渡してやり、ログ出力されるたびにlistenerに同じ文章をメッセージ送信してやるとより簡単にテストができる。
双方向のメッセージ
ImplicitSender
トレイトを使用することで、テストで用いる暗黙的なsenderをTestKitのActorRefに変更する。
その他tips
- ミュータブルなデータ構造と組み合わせたvalよりも、イミュータブルなデータ構造を組み合わせたvarを優先する。別のアクターに内部状態を送信し、ミュータブルな状態を共有してしまうのを防ぐ。
receiveWhile()
: 引数に与えた部分関数にマッチしなくなるまで繰り返す。ignoreMsg
: パターンにマッチしたメッセージを無視する。expectNoMsg
: ある一定時間の間にtestActorにもうこれ以上メッセージがこないということをアサートする。
第4章 耐障害性
耐障害性とはなんなのか
実際にはどんなシステムでも可用性の高いシステムを分散環境で構築するのは難しい。そこで let it crashという概念が生まれた。全ての障害の発生を防ぐことはできないため、以下のような点に留意して戦略を組み立てる。
- システムが壊れることは起こり得るので、実行し続けるために耐障害性を持つ必要がある。これは、回復可能な障害によって壊滅的な障害に陥らないことを表す。
- 障害の発生した部分を停止してシステムから切り離してでも、システムの最も重要な機能をできるだけ長く利用可能な状態で維持する。
- 重要なコンポーネントに対して、バックアップを取っておく。
- システムの特定箇所における障害によって、システム全体がクラッシュするようなことがあってはならない。
let it crash
Akkaのアクターは、通常ロジック用と障害回復ロジック用の2つのフローを提供している。障害回復フローは、通常フローのアクターを監視するアクター(=アクターを生成するアクター、スーパーバイザー)で構成される。アクターで例外をキャッチするのではなく、アクターをクラッシュさせるようにする。クラッシュしたアクターのメールボックスは、障害回復フロー内のスーパーバイザーが例外の処理方法を決定するまで保留される。
スーパーバイザーは例外をキャッチするのではなく、クラッシュしたアクターをどうするべきかその原因に基づいて決定する。以下の4つの選択肢がある。
- 再起動:Propsから再生成される。再起動した後に、アクターは引き続きメッセージを処理する。
- 再開:同じアクターインスタンスがメッセージを処理し続け、クラッシュは無視される。
- 停止:メッセージの処理を行わない。
- エスカレート:スーパーバイザーは問題を自身の親アクターに対してエスカレーションする。
障害は隔離され、関心は分離されている。具体的には、通常のアクターのメッセージ処理と、スーパーバイザーの障害からのリカバリーフローは無関係であり、互いに完全に独立して定義し、開発できる。
アクターのライフサイクル
以下の3つのイベントがあり、それぞれライフサイクルの変更イベントが発生した時に呼び出される関数がいくつかある。
- 開始イベント(start)
- 再起動イベント(restart)
- 停止イベント(stop)
それぞれメソッドが定義されており、overrideすることでフックに応じて処理を行うことができる。
ライフサイクルの監視
- アクターのライフサイクルは監視ができる。
ActorContext
にはアクターの停止を監視するためのwatch
メソッドと、監視を解除するためのunwatch
メソッドが存在している。 - アクターがActorRefのwatchメソッドを呼び出すと、そのActorRefを監視することになる。
- 監視対象のアクターが終了すると、
Terminated
メッセージがアクターに送信される。
監督は親アクターから子アクターに対してのみできるが、監視はどのアクターに対してもできる。
監督(supervision)
スーパーバイザーヒエラルキー
アクターがお互いを生成する機能。アクターを生成する全てのアクターは、生成された子アクターのスーパーバイザーになる。この親子関係はずっと変わることはない。子アクターが終了した時のみ、その親子関係が終了する。
最もクラッシュする可能性の高いアクターは、できるだけ低い階層に配置する。これは、より多くのスーパーバイザーが処理できる可能性などがあるためである。
戦略(strategy)
スーパーバイザーは、障害に対してどのように対応するか、戦略を持っている。Default strategyは、アクターが停止した時を除いて、あらゆるExceptionで子アクターを再起動する。さらにAkkaでは、子アクターへの戦略をどのように適用するか2つの選択肢を持っている。
- 1つの子アクターがクラッシュした時、全ての子アクターに対して、一斉に同じ障害回復方法を適用する。
- クラッシュしたアクターのみを対象として障害回復方法を適用する。
デフォルトで定義されている戦略をoverrideすることで、独自の戦略を適用することができる。
第5章 Future
Futureのユースケース
ActorよりもFutureの方がふさわしいケースがあり、それを取り上げていく。Futureのユースケースは以下のようになる。
- 関数の結果を処理するためにブロッキングしない。
- 関数を一度だけ呼び出し、その結果をどこかで使う。
- 多くの関数を合成する。
- 結果の一部のみ使用する。
- 関数をパイプライン化する。
AkkaではaskメソッドのレスポンスとしてFutureを受け取ることになる。Futureの結果はsenderにpipeすると、senderを参照しなくて良くなる。
具体的にAkkaの公式ドキュメントで見てみた。
このドキュメントの例を見ればわかるように、Actorが Future
の実行結果を pipeTo sender
を用いることで、senderに対してFutureの結果を送信することができる。
class UserProxyActor(userData: ActorRef, userActivities: ActorRef) extends Actor { import UserProxyActor._ import akka.pattern.{ ask, pipe } implicit val ec: ExecutionContext = context.dispatcher implicit val timeout = Timeout(5 seconds) def receive = { case GetUserData => (userData ? UserDataActor.Get).pipeTo(sender()) case GetUserActivities => (userActivities ? UserActivityActor.Get).pipeTo(sender()) } }
pipeTo
は ?
と一緒に使うのがよくある使い方。Futureの結果を受け取るために?
でリクエストするのがよくあるからだと思う。
その他tips
recover
メソッドを使うことで、 Futureのなかで例外が発生した時にデフォルト値を返したり、別の例外にして投げたりすることができる。firstCompleteOf
メソッドを使うことで、最初に完了したFutureを取得できる。find
を使うと、条件にマッチするFutureを取得することができる。
第6章 Akkaによるはじめての分散アプリケーション
Akkaはネットワークを介してアプリケーションをスケールアウトする際に、RPCとは異なるアプローチを採用している。表面上はローカル呼び出しと同じように見えるようになっている。
リモートノードのアクターへの参照は、1) パスによってアクターを検索する方法、2) リモートにデプロイする方法。の2種類がある。
パスによってアクターを検索する方法
まずリモート処理を有効化する設定をconfigファイルに記載する。すると、これを読み込んだアクターのActorRefはリモートアクターのものになり、指定したホスト、ポートをリッスンするようになる。
akka { 中略... actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "0.0.0.0" port = 2552 } } 中略... }
リモートアクターへの参照パスは、以下のようにして定まる。ガーディアンは常にuser
である。
$プロトコル//$アクターシステム@$サーバーホスト:$ポート/$ガーディアン/$トップレベルアクター
このパスを文字列としてactionSelection
メソッドに渡すと、リモートに存在するアクターを検索できる。
フロントエンドとバックエンドに分割
フロントエンドの設定には、バックエンドのアクターを参照するための設定セクションを追加する。そうすることで、フロントエンドからバックエンドのアクターへの参照パスを組み立てて参照することができる。
RemoteLookupProxy
の導入
actorFor
メソッドで直接ActorRefを参照するのは、deprecated。ローカルとリモートどちらのActorRefも取得できるが、それぞれActorRefがリモートのものかローカルのものかによって挙動が異なることが大きな理由の一つ。
そこで、RemoteLookupProxy
は、フロントエンドノードからバックエンドノードへの橋渡し役として存在し、
- リモートに存在するアクターシステム自体の障害による停止などをハンドリングする
- リモートに存在するアクター自体の障害による停止などをハンドリングする
などの役割を持つ。そのためにリモートのアクターを監視し、それに応じて再接続を試したり、メッセージを転送したり挙動を変える。
リモートにデプロイする方法
設定ファイルを通してデプロイする方法が推奨されている。これは、アプリケーションを再ビルドすることなくクラスター設定を変更できるため。
アクターへのローカルパスを指定し、デプロイ先のリモートアドレスを追記する必要がある。
actor { provider = "akka.remote.RemoteActorRefProvider" deployment { /boxOffice { remote = "akka.tcp://backend@0.0.0.0:2552" } } }
一応
デプロイとは | 「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典
第7章 設定とロギングとデプロイ
簡単にまとめるに留める
- 設定ファイルにはデフォルト値を設定でき、それを上書きすることで細かい設定の変更が可能になる。
- 設定ファイルをimportして設定を上書きすることもできる。
- ロギングはAkkaが非同期に出力するため、アプリケーションのパフォーマンスに支障をきたすことは避けられている。
- sbt-native-packagerを使うと非常に簡単にアプリケーションのディストリビューションの作成ができる。
第8章 アクターの構造パターン
アクターベースプログラミングで直面する課題の一つとして、複数の登場人物がそれぞれの作業をどのように並列実行して、それをまとめるか。そしてそれをどのようにコードに落とし込むかということ。Enterprise Integration Patterns: EIP をいくつか実装し、Akkaの特徴を生かしつつ設計することを目指す。
パイプ&フィルターパターン
パイプとは、あるプロセスまたはスレッドがその結果を別のプロセッサーに受け渡して追加の処理を行うことを指す。そのパイプで送られる途中にフィルターが存在し、受け取ったメッセージを次に送るかどうかチェックがなされる。どのパイプも、どのフィルターも、プロセスの追加や順序変更などに耐えられるように、同じインターフェースを持つ必要がある。
Akkaでは、アクターでフィルターを実装し、メッセージングでパイプを実装することになる。
スキャッタギャザーパターン
タスクの並列実行を可能にするパターン。Scatter部分でメッセージを並列に送信し、Gather部分で並列に分散したメッセージを集約する。具体的には、競争パターンと並列協調処理パターンの二つの場合がある。
特に並列協調処理パターンにおいては、片方からメッセージが届いているのにもう片方から届いていない時がある。その状態でActorが再起動するとメッセージが失われてしまうので、preRestart
メソッドで自分自身にメッセージを送信し直すことで解決する。
override def preRestart(reason: Throwable, message: Option[Any]): Unit = { super.preRestart(reason, message) messages.foreach(self ! _) messages.clear() }
ルーティングスリップ(回覧表)パターン
パイプ&フィルターパターンの動的なバージョンと捉えることができる。パイプ&フィルターのルートは静的に決まっていたが、それが受け取ったメッセージによってルートが動的に決定される。ここでも、各ステップのインターフェースが同じでそれぞれのタスクが独立していることが重要。
具体的には、全てのアクターは以下のtraitをミックスインし、sendMessageToNextTask
によって次のアクターへメッセージパッシングする。全てのルートが配列として入っていて、それのheadをタスクとして扱い、tailを次の配列として渡していくところがポイント。
def sendMessageToNextTask(routeSlip: Seq[ActorRef], message: AnyRef) { val nextTask = routeSlip.head val newSlip = routeSlip.tail // 省略... }