ひらめの日常

日常のメモをつらつらと

『Akka実践バイブル』を読んだ(後半)

前半はこちらです

hiramekun.hatenablog.com

第9章 メッセージのルーティング

EIPのうち、ルーターパターンについて。メッセージのルーティングを行う理由がパフォーマンスやスケーリングである場合は、Akkaに組み込まれている最適化されたルーターを使用する。しかしながら、メッセージの内容が主要な関心ごとである場合は、通常のアクターを使って役割ごとにメッセージを振り分けるのが良いとされている。

Akkaのルーターを使った負荷分散

システムのパフォーマンスを向上させるために、異なるアクターに負荷を分散させる。 Akkaには組み込みのルーターが2種類存在している。

プールルーター

以下のようにしてプールルーターは設定ファイルを用いて生成できる。

val router = system.actorOf(
  FromConfig.props(Props(new Rootie(actorRef))),
  "poolRouter"
)
akka.actor.deployment {
  /poolRouter {
    router = balancing-pool
    nr-of-instances = 5
 }
}

リモートにルーティーを作成したい場合は、方法は複数あるが、簡単な方法としてFromConfigRemoteRouterConfigに変えるだけで良い。

また、動的にルーティーのサイズを変更したい場合も、設定ファイルにリサイザー機能のオプションをカスタマイズすることで細かくリサイズの条件を指定できる。

ルーターはルーティーを生成しているため、ルーティに対するスーパバイザーでもある。デフォルトのルーターを使用する場合、ルーティーは常にスーパバイザーに障害をエスカレーションする。ルーターがさらに障害をエスカレーションすると、障害の起こったルーティーのみではなく、ルーター自体が再起動されてしまい、全てのルーティーが再起動されてしまう。そこで、ルーターの生成寺に独自の戦略を与えることで障害の発生したルーティーだけが再起動されるように変更できる。

グループルーター

ルーティーを自分でインスタンス化し、明示的にルーターの管理下に置く。これにより、ルーティーを生成するタイミングの制御が可能になる。
グループの場合は、ルーティーパスのリストを指定することでルーティーを生成する。具体的には、CreatorアクターがRootieアクターをRootie1, Rootie2の二つ生成し、その二つのアクターをルーティーに指定する場合は以下のように設定する。

akka.actor.deployment {
  /groupRouter {
    router = round-robin-group
    routees.paths = [
      "/user/Creator/Rootie1",
      "/user/Creator/Rootie2"
    ]
  }
}

ルーティーが終了してもグループルーターは引き続きルーティーにメッセージを送信するため、ルーティーを管理しているアクターが、子アクターの終了に対してハンドリングする必要がある。

その他

よく出てくるbecomeメソッドについて定義を確認した。定義を見ればわかるように、Receiveを渡すことで、メッセージを受け取った時の挙動を変えることができる。なので、状態を持ったアクターのなかによく現れることがある。

  /**
   * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
   * Replaces the current behavior on the top of the behavior stack.
   */
  def become(behavior: Actor.Receive): Unit = become(behavior, discardOld = true)

第10章 メッセージチャネル

  • point to point channel:今まで扱ってきたアクターは全てこれ。
  • publish/subscribe channel:送信者がメッセージを必要としている受信者を知ることなく、複数の受信者にメッセージを送信する。

publish/subscribe channel

チャネルは、全てのsubscriberがメッセージを受け取れるようにする。受信者が自身でsubscribeするため、動的に受信者の数を変えることができ柔軟に変更できる。

満たされるべき要件は以下の通り。

  • 送信側は、メッセージをpublishできる必要がある。
  • 受信側は、チャネルのsubscribeとunsbscribeができる必要がある。

Akkaのイベントストリーム

Akkaでは EventStreamを使用することでpublish/subscribeできるようになる。全てのアクターシステムには一つの EventStream があり、どのアクターからでも利用できる。アクターは特定のメッセージ型をsubscribeでき、誰かがこれをpublishすると受け取ることができる

subscribeは、送受信するアクターが自分で設定する必要はない。必要なものは、次の2点のみで、これさえあれば任意の場所でsubscribeさせることができる。

  • subscribeするアクターの参照
  • subscribeの設定を行うEventStreamへの参照
system.eventStream.subscribe(
  actorRef,
  classOf[Message]
)

publishは以下のようにして行われる

system.eventStream.publish(msg)

ローカルシステム全体からメッセージを送信し、それを収集するための解決策として、EventStreamは活用できる。例えばActorLoggingは内部的にEventStreamを用いてシステム全体からログを収集している。以下は ActorLoggingapply関数。

def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
  val (str, clazz) = LogSource(logSource, system)
  new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}

カスタムイベントバス

「条件xを満たす時のみメッセージを送る」という場合を考えてみる。EventStreamではメッセージの型を元に送信するか否かを決定するため、フィルタリングができない。受信先でフィルタリングはできるが、それ以外の方法も考えてみる。独自のpublish/subscribe channel を作成することでこれを実現することができる。

EventBusというインターフェースを用いて作成することができる。EventBusってそもそも何かというと、お互いを知る必要がなく双方向に通知が行えるような仕組みのことのようだ。

RRiBbit – What is an Eventbus

An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know.

EventBus には3つのエンティティを指定する。

  • Event: busにpublishされるイベント全てを表す型。
  • Subscriber: イベントバスに登録するsubscriberの型。Akkaの場合はActorRefであり、ActorEventBus をミックスインすることで登録することが多い。
  • Classifier: イベントを送信するときのSubscriberの選択に使用する分類子を定義。

Classificationトレイトをミックスインすることで、classifyメソッドにより、EventからClassifierを抽出する。

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

つまり、

eventBus.subscribe(actorRef, classifier) // classifierの登録
eventBus.publish(msg) // ここで、msgが上でsubscribeしたclassifierであるならば、subscriberが選択され、実際に送信される。

特殊チャネル

デッドレターチャネル

処理または配信できない全てのメッセージを含むチャネル。(デッドメッセージキューとも呼ばれる。)   このチャネルを監視すると、処理されていないメッセージがわかる。
Akka内部では、DeadLetterListenerpreStartでDeadLetterを受け取るEventStreamをsubscribeする形で実装されている。

  override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter])

保証配信チャネル

リモートアクターを使用する場合、間のネットワークが死んだりしているとメッセージが消失してしまう。ReliableProxyを使うと、この問題が解決し、リモートアクターに対して送信できる可能性が高まる。
今コードを見たら @deprecated("Use AtLeastOnceDelivery instead", "2.5.0") と書いてあったので、非推奨になってるっぽい。

第11章 有限状態マシンとエージェント

状態を扱うための方法として、新しく二つの方法を紹介する。

有限状態マシン(Finite Stage Machine: FSM)

Akkaドキュメントより引用

FSMは、次の式の関係の集合として記述できます。
State(S) x Event(E) -> Actions (A), State(S')
これらの関係は、次のように解釈されます。
状態SでイベントEが発生した場合は、アクションAを実行して状態S'に遷移する必要があります

エージェントを使った状態共有の実装

現在エージェントは非推奨になっており、Akka Typed を使うことが推奨されている。

2.6.0-RC1でstableになったAkka Typedを試してみる - Candy, Vitamin or Painkiller

speakerdeck.com

第12章 ストリーミング

データのストリームとは、終わりがない要素の配列のことで、以下の時に存在している。

  • プロデューサーがストリームに要素を提供
  • コンシューマーがストリームから要素を読み込む

akka-streamは有限のバッファで無条件のストリームを処理する方法を提供する。また、akka-httpでは内部でakka-streamを使用している。

基本的なストリーム処理

以下の3つの存在を考える。

  • 要素のプロデューサー
  • 処理ノード
  • 要素のコンシューマー

sourceとsinkを使う

akka-streamを使うには、以下のステップが必要になる。

  • 処理フローの定義:ストリーム処理コンポーネントのグラフを定義。
  • 処理フローの実行:アクターシステムでの実行。グラフからアクターに変換される。

ファイルをコピーする簡単な例を考える。要素の供給源である Source と要素の吸収源である Sink をつなげることによって、
sourceからsinkにデータを直接送るGraphを定義する。

val source: Source[ByteString, Future[IOResult]] =
  FileIO.fromPath(inputFile)
                                                       
val sink: Sink[ByteString, Future[IOResult]] =
  FileIO.toPath(outputFile, Set(CREATE, WRITE, APPEND))
                                                       
val runnableGraph: RunnableGraph[Future[IOResult]] =
  source to sink

マテリアライズとは

runnableGraph.run() はimplicitな Materializer を必要とする。これは、RunnableGraph をアクターに変換してグラフを実行する。具体的には、

  • グラフに存在する入力と出力が全て接続されているか確認
  • SourceとSinkに紐づくアクターをそれぞれ生成。
  • そのアクターの間にpub/sub関係を結ぶ。

さらに、PublisherとSubscriberの間では、リクエストされた以上のメッセージを送信しない仕組みになっており、メモリがオーバーフローしないように送受信をしている。

SourceとSinkでは、グラフがまてリアライズされた時に補助値を提供する。ファイルの場合はFuture[IOResult]を提供する。どの結果をKeepするかを指定でき、それによってどのマテリアライズされた値を保持するかを指定できる。

フローによるイベントの処理

FlowはSourceとSinkの間に使えるコンポーネントで、全てのストリームロジックをキャプチャする。たくさんの処理を組み合わせてFlowとして構成される。

Flowの定義を見ておく。入力と出力が一つずつ定義されていることがわかる。第3の型指定はMaterializeなので、Flowの補助値(副作用のようなものと捉えている)が得られるものと考えられる。

/**
 * A `Flow` is a set of stream processing steps that has one open input and one open output.
 */
final class Flow[-In, +Out, +Mat]

例えば、ファイルからByteStringを受け取って、それをパースする処理を考える。すると、Flowの各コンポーネントは以下のようになる。

// ファイルから受け取ったByteStringをStringに変換するFlow
val frame: Flow[ByteString, String, NotUsed] =
  Framing.delimiter(ByteString("\n"), maxLine).map(_.decodeString("UTF8"))

// ByteStringをcase classであるEventにマッピングするFlow
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

// 合成されたフロー
val composedFlow: Flow[ByteString, Event, NotUsed] = flow via parse

// runnableグラフの構築
val runnableGraph: RunnableGraph[Future[IOResult]] =
    source.via(composedFlow).toMat(sink)(Keep.right)

ストリームのエラー処理

上記のようなFlowの場合にエラーが発生した時、Futureは失敗した値を返す。そこで、パースが失敗した時にその例外を無視して他の行のパースは行わせる事を考える。ストリームにもスーパバイザー戦略を定義できる。

val decider: Supervision.Decider = {
  case _: LogParseException => Supervision.Resume
  case _ => Supervision.Stop
}
                                                                 
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

BidiFlowとは

2つの入力と2つの出力を持つグラフコンポーネント。多分Bidirectionの略。ソースコードに添付してある図を見るとわかりやすい。

  /**
   * Wraps two Flows to create a ''BidiFlow''. The materialized value of the resulting BidiFlow is determined
   * by the combiner function passed in the second argument list.
   *
   * {{{
   *     +----------------------------+
   *     | Resulting BidiFlow         |
   *     |                            |
   *     |  +----------------------+  |
   * I1 ~~> |        Flow1         | ~~> O1
   *     |  +----------------------+  |
   *     |                            |
   *     |  +----------------------+  |
   * O2 <~~ |        Flow2         | <~~ I2
   *     |  +----------------------+  |
   *     +----------------------------+
   * }}}
   *
   */

つまり、I1が全体の入力。O2が全体の出力とする。そして、上記のO1とI2をインターフェースとして持っているFlowをつなげる事で、任意のフロート接続ができるようにするというもの。具体的には以下のように使う。

val bidiFlow = BidiFlow.fromFlows(inFlow, outFlow)
val flow = bidiFlow.join(filterFlow)

ストリーミングHTTP

実際にakka-httpを使ってPOSTとGETをどのように処理するかを学んだ。詳細は本のソースコードなどに譲る。

マーシャリングとアンマーシャリング

HTTPレスポンスを返す際に、アプリケーションで使われているオブジェクトからHTTPレスポンス用のデータフォーマットに変換する事をマーシャリングという。その逆をアンマーシャリングという。

グラフDSL

入力と出力を任意の数だけ持たせられるグラフDSL。例えば、Eventを受け取って以下の5つのイベントをそれぞれ別々のFlowで処理する場合はこのようなコードになる。

  • Jsonに変換し、ByteStringを返す
  • okのログ出力
  • warningのログ出力
  • errorのログ出力
  • criticalのログ出力
type FlowLike = Graph[FlowShape[Event, ByteString], NotUsed]
                                                                 
def processStates(logId: String): FlowLike = {
  val jsFlow = LogJson.jsonOutFlow
  Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // all logs, ok, warning, error, critical, so 5 outputs
      val bcast = builder.add(Broadcast[Event](5))
      val js = builder.add(jsFlow)
                                                                 
      val ok = Flow[Event].filter(_.state == Ok)
      val warning = Flow[Event].filter(_.state == Warning)
      val error = Flow[Event].filter(_.state == Error)
      val critical = Flow[Event].filter(_.state == Critical)
                                                                 
      bcast ~> js.in
      bcast ~> ok ~> jsFlow ~> logFileSink(logId, Ok)
      bcast ~> warning ~> jsFlow ~> logFileSink(logId, Warning)
      bcast ~> error ~> jsFlow ~> logFileSink(logId, Error)
      bcast ~> critical ~> jsFlow ~> logFileSink(logId, Critical)
                                                                 
      FlowShape(bcast.in, js.out)
    })
}

これまでで考えると、返り値が Flow[Event, ByteString, NotUsed]でないのが違和感を感じるかもしれないが、FlowはGraphを継承している。つまり入力と出力を一つずつ持つ特別な場合のShapeがFlowなのである。

コンシューマーとプロデューサーの仲介

需要と供給のバランスをどうやって取るかについて。バッファーを用いてこれを解決する。

デフォルトではバックプレッシャーが有効になっており、ストリームを処理できる。

Akka Streamsで実装するリアクティブストリーム | Think IT(シンクイット)より引用

バックプレッシャープロトコルには、下流のサブスクライバーが受信してバッファリングできる要素の数が定義されています。バックプレッシャーは、サブスクライバーが処理できる以上の要素を、パブリッシャーがパブリッシュしないことを保証します。

さらには、bufferやexpandのようなメソッドを使って変更を加えることもできる。

第13章 システム統合

  • Alpakkaを使用して、外部システムと連携する。
  • akka-httpを使用してHTTPプロトコルをサポートする。

メッセージエンドポイント

正規化パターン

  • 様々な種類のメッセージを共通の標準化されたメッセージに変換する。
  • システムは、メッセージが様々な外部システムから送られてくる事を気にせずメッセージを処理できる。
  • ルーターを介して、トランスレーターに送り、共通のメッセージに正規化して送るパターンもある。

標準データモデルパターン

システム間の接続要件が増加すると、エンドポイントが多くなり、正規化パターンでは複雑になってしまう。   標準データモデルパターンでは、全てのシステムで共通のインターフェースを実装して、共通メッセージを利用するエンドポイントを持たせる。

エンドポイントが共通形式のメッセージを受信し、そのあと独自のメッセージに変換する。逆にエンドポイントが独自のメッセージを共通形式のメッセージに変換して、共通インターフェースを介して送信する。

標準データモデルパターンが、アプリケーションの個々のデータ形式と外部システムで利用されるデータ形式に間接参照を提供する一方で、正規化パターンは1つのアプリケーションに閉じているという点です。この間接参照がもたらす利点は、新しいアプリケーションをシステムに追加する時に共通メッセージを処理するトランスレーターだけ用意すればよいということです。既存システムの変更は必要ありません。

Alpakkaを用いたエンドポイントの実装

Alpakkaで提供されているコンポーネントを使うと、簡単に外部システムと接続できる。 例として、ディレクトリ内のファイル変更検知や、AMQPを用いたメッセージ送信があげられる。今回は共通形式のメッセージを Order クラスに指定している。それぞれイベントをSourceに変換しているので、toMatRunnableGraphに変換する事でストリーム処理が可能になる。

外部システムからメッセージの受信

Alpakkaを用いてファイルの変更を検知するSourceを生成する。

object FileXmlOrderSource {
    def watch(dirPath: Path): Source[Order, NotUsed] =
      DirectoryChangesSource(dirPath, pollInterval = 500.millis, maxBufferSize = 1000)
        .collect {
          case (path, DirectoryChange.Creation) => path
        }
        .map(_.toFile)
        .filter(file => file.isFile && file.canRead)
        .map(scala.io.Source.fromFile(_).mkString)
        .via(parseOrderXmlFlow)
  }

// stream処理できる
val consumer: RunnableGraph[Future[Order]] =
  FileXmlOrderSource.watch(dir.toPath)
    .toMat(Sink.head[Order])(Keep.right)

AMQPの場合

object AmqpXmlOrderSource {
  def apply(amqpSourceSettings: AmqpSourceSettings): Source[Order, NotUsed] =
    AmqpSource.atMostOnceSource(amqpSourceSettings, bufferSize = 10)
      .map(_.bytes.utf8String)
      .via(parseOrderXmlFlow)
}

// stream処理できる
val consumer: RunnableGraph[Future[Order]] =
  AmqpXmlOrderSource(amqpSourceSettings)
    .toMat(Sink.head)(Keep.right)

AMQP自体なんぞやという感じだったのでこの辺の記事を読んだ。

www.slideshare.net

Advanced Message Queuing Protocol - Wikipedia

外部システムへのメッセージ送信

外部システムへのメッセージ送信も受信時と同様に、今度は AmqpSink を用いれば共通メッセージを介したSinkが生成できる。

HTTP

今回はakka-httpを用いてRESTインターフェースを実装する。

~を使う事で、ルートの定義やディレクティブを組み合わせることができる。以下の例は getOrderpostOrders のいずれかとマッチすると読むことがができる。

val routes = getOrder ~ postOrders

~の定義は以下の通り。

/**
 * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
 * chance to act upon the request.
 */
def ~(other: Route): Route = ...

具体的にディレクティブがどのようにリクエストを処理するかを追ってみる。以下はgetOrderの詳細。

 // getはMethodDirectivesの一つ。GETリクエスト以外をrejectする。
def getOrder = get {

  // pathPrefixはPathDirectivesの一つ。PathMatcherを受け取り、
  // スラッシュ以降のパターンマッチしていない部分に適用する。
  pathPrefix("orders" / IntNumber) { id => 
    // IntNumberはNumberMatcher, PathMatcherをextendしている。リクエストのpathから数値を取り出す。

    // onSuccessはFutureDirectivesの一つ。Futureの値を取り出して、inner scopeのrouteを実行する。
    onSuccess(processOrders.ask(OrderId(id))) {
      case result: TrackingOrder =>
        // completeはRouteDirectivesの一つ。引数からリクエストを完了する。
        complete(
          <statusResponse>
            <id>
              {result.id}
            </id>
            <status>
              {result.status}
            </status>
          </statusResponse>
        )
                                               
      case result: NoSuchOrder =>
        complete(StatusCodes.NotFound)
    }
  }
}

第14章 クラスタリング

6章では、決まった数のノードを利用して分散アプリケーションを構築した。詳細はこちら

『Akka実践バイブル』を読んだ(前半) - ひらめの日常

さらに、クラスターを使うと、分散アプリケーションで使用するノード数を動的に増減させることができる。

なぜクラスタリングを用いるか

クラスターは動的なノードのグループ。各ノードはアクターシステムを持っている。クラスターに所属するメンバーノードのリストは現在のクラスターの状態として維持される。アクターシステムはお互いにこの情報を伝達し合う。具体的には次のような機能を持っている。

クラスターメンバーシップ

クラスターはシードノード、マスターノード、ワーカーノードで構成される。

  • シードノード:クラスターを起動するために必要。クラスターの起点であり、他のノードとの最初の接点として機能する。
  • マスターノード:ジョブの制御と監督。
  • ワーカーノード:マスターに仕事を要求し、処理結果をマスターに返す。

シードノードが起動したのちは、全てのノードを独立に依存関係なく起動することができる。

これとは別に、リーダー という責務がある。これは、メンバーノードの状態が Up なのか Down なのかを判断する。そして、実際にクラスターにノードを参加させたり、離脱させたりする。クラスターに存在するどのノードもリーダーになる可能性がある。

障害が起きた時

障害が起きた時 Unreachable という状態になる。クラスターは到達不能なノードを検出する。到達不能なノードがある限り、リーダーはアクションを実行することができないので、まずは到達不能ノードを Down させる必要がある。

クラスターないのノードで起きた障害を通知されたい場合は、 subscribe する事でイベント通知をアクターで受けることができる。

クラスタリングされたジョブの処理

マスターは最初にワーカーを作成してから、メッセージをブロードキャストする必要がある。それはルーターで実現するが、クラスターとルーターを合わせるためには既に存在する有効なプールを ClusterRouterPoolへ渡すことが必要になる。 具体的な流れは次の通り

  • マスターが ClusterRouterPool, BroadcastPool を用いてルーターとなり、ワーカーをルーティーとして生成する。
  • マスター(=ルーター)では、ワーカー(=ルーティー)に対して、jobを始めるというメッセージをboradcastする
  • マスターは、ワーカーからタスク開始したいというリクエストを受信したら、タスクに必要なメッセージを送信する。(ここはルーターの機能は用いず、個々に対して送信する)

ワーカーはマスターに対して Enlistメッセージを送信することで、ジョブへの参加を表明する。マスターではEnlistメッセージで受け取った ActorRef を用いて監視したり、ジョブ終了時に全てのワーカーを停止したりできる。

第15章 アクターの永続化

akka-persistenceモジュールを使って、アクターの状態を永続化する方法について。akka-persistenceモジュールを使うと、クラスターのノードに障害が起きたり置き換えられたとしても継続して動作するようなアプリケーションを構築できる。

クラスター拡張の方法は二つある。

イベントソーシング

イベントソーシングとは

成功した全ての操作をイベントとしてジャーナルに保存する。そのイベント列から指示された操作を実行することで値を得る。

アクターのイベントソーシング

イベントソーシングを用いることによる大きなメリットは、データベースへの書き込みとデータベースへの読み込みを明確に分離できること。アクターを回復するときのみ、ジャーナルから読み取りが発生する。

永続アクター

永続アクターは1)イベントから状態を回復するか、2)コマンドを処理するか、の2つのもノードのうちどちらかで動作する。

  • イベント:アクターが処理を正しく実行したという証跡を残すためのもの。
  • コマンド:アクターに処理を実行させるために送信するメッセージ。

永続アクターではまず以下のことが特徴となる

  • PersistentActor トレイトを利用する
  • receiveメソッドを定義する代わりに、 receiveCommandreceiveRecover の2つのメソッドを定義する必要がある。receiveRecoverでは、アクターの回復中に過去のイベントとスナップショットを受け取るために使う。

persist メソッドでコマンドをイベントとして永続化する。二番目に渡されている引数は、永続化されたイベントを処理する関数であり、今回のupdateState はアクターの計算結果を更新する。

val receiveCommand: Receive = {
    case Add(value) => persist(Added(value))(updateState)
    case Subtract(value) => persist(Subtracted(value))(updateState)
    case Divide(value) => if (value != 0) persist(Divided(value))(updateState)
    case Multiply(value) => persist(Multiplied(value))(updateState)
    case PrintResult => println(s"the result is: ${state.result}")
    case GetResult => sender() ! state.result
    case Clear => persist(Reset)(updateState)
  }

永続化されたイベントを処理する関数は非同期に呼ばれるが、akka-persistenceではこの関数の処理が完了する前に次のコマンドが処理されないようにする。このためにメッセージをいくらか蓄えるために、パフォーマンス上のオーバーヘッドが多少ある。

receiveRecoverでは、コマンドが正しく処理された時と全く同じ処理を実行する必要がある。回復しようとしているアクターと同じ persistenceIdでジャーナルに保存されたイベントは以前と同じ結果を得るために処理される。アクターが起動したり再起動した際に receiveRecoverメソッドが使われる

val receiveRecover: Receive = {
  case event: Event => updateState(event)
  case RecoveryCompleted => log.info("Calculator recovery completed")
}

スナップショット

アクターが回復するまでの時間を短くするにはスナップショットが利用できる。スナップショットは別の SnapshotStore に保存される。アクターの回復時には、最新のスナップショットが渡され、その後にそのスナップショットが保存された時点から発生したイベントが渡される。最新のスナップショット以前のイベントは渡されてこない。

永続クエリー

永続アクターの回復処理以外でジャーナルを検索するためのモジュール。最適なユースケースは、永続アクターから連続的にイベントを読み取り、クエリーに適した形で別のデータベースに保存すること。基本的には2種類のクエリがある。具体的に eventsByPersistenceIdcurrentEventsByPersistenceId の違いを見てみる。

  • eventsByPersistenceId: PersistenceActor が受け取ったイベントをその順番に取得する。このストリームは終了することはなく、新しくアクターがイベントを受け取った時にそのイベントをpushする。
  • currentEventsByPersistenceId: 基本的には eventsByPersistenceId と同じ挙動が、現在の状態まで到達した時点で、このストリームは終了する。

シリアライズ

独自のシリアライザーを設定するには、設定ファイルから指定する。

akka {
  actor {
    serializers {
      basket = "aia.persistence.BasketEventSerializer" // 独自のシリアライザーを登録
      basketSnapshot = "aia.persistence.BasketSnapshotSerializer"
    }
    serialization-bindings {
      "aia.persistence.Basket$Event" = basket // シリアライズが必要なクラスにシリアライザーをバインド
      "aia.persistence.Basket$Snapshot" = basketSnapshot
    }
  }
}

クラスターシングルトンとクラスターシャーディング

概念

クラスターシングルトン: アクターのインスタンスがAkkaクラスター内の同じロールを持つノード上でただ一つだけ存在することを保証した上でそのアクターを実行できる。もしこのクラスターシングルトンが停止した場合、他のノード上で開始する。

シングルトンやシャーディングについて仕組みが理解しづらかったのでこちらの記事を読んだ。非常にわかりやすい記事だった。

Akka Clusterで超レジリエンスを手に入れる | Think IT(シンクイット)

Akka Clusterで超レジリエンスを手に入れる(その2) | Think IT(シンクイット)

クラスターの中にノードがいくつか存在していて、その中でさらにシャードが分かれている。(下記図は Akka Clusterで超レジリエンスを手に入れる(その2)より引用) f:id:thescript1210:20210121191342p:plain

ShardRegion がメッセージを受け取り、それが ShardCoordinator にシャードの位置を聞く。ShardCoordinatorからの回答を用いて、メッセージは目的のシャード・エンティティに届けられる。

ShardCoordinatorクラスタ内で1つのみ存在すべきであり、Akkaのクラスタシングルトンで実装されている。さらに ShardCoordinator は情報を復旧できる必要があるので、 Akka Distributed Data を使用して、データを復元することができるようになっている。

Akka実践バイブルにおけるシャーディング

  • ShardShoppersClusterSharding を起動し、ShardRegion への参照を取得する。
  • ShardRegionはメッセージを受け取った時、シャードである ShardedShopper にメッセージを転送する。
  • ShardedShopper は自身の一意となるentityIdの指定方法と、どのシャードに配置されるかのshardIdの指定方法を明示する。これは ClusterSharding がスタートする時に渡され、どこに何を配置するのかを把握するのに必要となる。
class ShardedShoppers extends Actor {

  ClusterSharding(context.system).start(
    ShardedShopper.shardName,
    ShardedShopper.props,
    ClusterShardingSettings(context.system),
    ShardedShopper.extractEntityId,  // アクターを一意に定めるid
    ShardedShopper.extractShardId // どのシャードに位置するかを定めるid
  )

  def shardedShopper = {
    ClusterSharding(context.system).shardRegion(ShardedShopper.shardName) // shardRegionへの参照を取得
  }

  def receive = {
    case cmd: Shopper.Command =>
      shardedShopper forward cmd 
  }
}