ひらめの日常

日常のメモをつらつらと

『Akka実践バイブル』を読んだ(後半)

前半はこちらです

hiramekun.hatenablog.com

第9章 メッセージのルーティング

EIPのうち、ルーターパターンについて。メッセージのルーティングを行う理由がパフォーマンスやスケーリングである場合は、Akkaに組み込まれている最適化されたルーターを使用する。しかしながら、メッセージの内容が主要な関心ごとである場合は、通常のアクターを使って役割ごとにメッセージを振り分けるのが良いとされている。

Akkaのルーターを使った負荷分散

システムのパフォーマンスを向上させるために、異なるアクターに負荷を分散させる。 Akkaには組み込みのルーターが2種類存在している。

プールルーター

以下のようにしてプールルーターは設定ファイルを用いて生成できる。

val router = system.actorOf(
  FromConfig.props(Props(new Rootie(actorRef))),
  "poolRouter"
)
akka.actor.deployment {
  /poolRouter {
    router = balancing-pool
    nr-of-instances = 5
 }
}

リモートにルーティーを作成したい場合は、方法は複数あるが、簡単な方法としてFromConfigRemoteRouterConfigに変えるだけで良い。

また、動的にルーティーのサイズを変更したい場合も、設定ファイルにリサイザー機能のオプションをカスタマイズすることで細かくリサイズの条件を指定できる。

ルーターはルーティーを生成しているため、ルーティに対するスーパバイザーでもある。デフォルトのルーターを使用する場合、ルーティーは常にスーパバイザーに障害をエスカレーションする。ルーターがさらに障害をエスカレーションすると、障害の起こったルーティーのみではなく、ルーター自体が再起動されてしまい、全てのルーティーが再起動されてしまう。そこで、ルーターの生成寺に独自の戦略を与えることで障害の発生したルーティーだけが再起動されるように変更できる。

グループルーター

ルーティーを自分でインスタンス化し、明示的にルーターの管理下に置く。これにより、ルーティーを生成するタイミングの制御が可能になる。
グループの場合は、ルーティーパスのリストを指定することでルーティーを生成する。具体的には、CreatorアクターがRootieアクターをRootie1, Rootie2の二つ生成し、その二つのアクターをルーティーに指定する場合は以下のように設定する。

akka.actor.deployment {
  /groupRouter {
    router = round-robin-group
    routees.paths = [
      "/user/Creator/Rootie1",
      "/user/Creator/Rootie2"
    ]
  }
}

ルーティーが終了してもグループルーターは引き続きルーティーにメッセージを送信するため、ルーティーを管理しているアクターが、子アクターの終了に対してハンドリングする必要がある。

その他

よく出てくるbecomeメソッドについて定義を確認した。定義を見ればわかるように、Receiveを渡すことで、メッセージを受け取った時の挙動を変えることができる。なので、状態を持ったアクターのなかによく現れることがある。

  /**
   * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
   * Replaces the current behavior on the top of the behavior stack.
   */
  def become(behavior: Actor.Receive): Unit = become(behavior, discardOld = true)

第10章 メッセージチャネル

  • point to point channel:今まで扱ってきたアクターは全てこれ。
  • publish/subscribe channel:送信者がメッセージを必要としている受信者を知ることなく、複数の受信者にメッセージを送信する。

publish/subscribe channel

チャネルは、全てのsubscriberがメッセージを受け取れるようにする。受信者が自身でsubscribeするため、動的に受信者の数を変えることができ柔軟に変更できる。

満たされるべき要件は以下の通り。

  • 送信側は、メッセージをpublishできる必要がある。
  • 受信側は、チャネルのsubscribeとunsbscribeができる必要がある。

Akkaのイベントストリーム

Akkaでは EventStreamを使用することでpublish/subscribeできるようになる。全てのアクターシステムには一つの EventStream があり、どのアクターからでも利用できる。アクターは特定のメッセージ型をsubscribeでき、誰かがこれをpublishすると受け取ることができる

subscribeは、送受信するアクターが自分で設定する必要はない。必要なものは、次の2点のみで、これさえあれば任意の場所でsubscribeさせることができる。

  • subscribeするアクターの参照
  • subscribeの設定を行うEventStreamへの参照
system.eventStream.subscribe(
  actorRef,
  classOf[Message]
)

publishは以下のようにして行われる

system.eventStream.publish(msg)

ローカルシステム全体からメッセージを送信し、それを収集するための解決策として、EventStreamは活用できる。例えばActorLoggingは内部的にEventStreamを用いてシステム全体からログを収集している。以下は ActorLoggingapply関数。

def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
  val (str, clazz) = LogSource(logSource, system)
  new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}

カスタムイベントバス

「条件xを満たす時のみメッセージを送る」という場合を考えてみる。EventStreamではメッセージの型を元に送信するか否かを決定するため、フィルタリングができない。受信先でフィルタリングはできるが、それ以外の方法も考えてみる。独自のpublish/subscribe channel を作成することでこれを実現することができる。

EventBusというインターフェースを用いて作成することができる。EventBusってそもそも何かというと、お互いを知る必要がなく双方向に通知が行えるような仕組みのことのようだ。

RRiBbit – What is an Eventbus

An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know.

EventBus には3つのエンティティを指定する。

  • Event: busにpublishされるイベント全てを表す型。
  • Subscriber: イベントバスに登録するsubscriberの型。Akkaの場合はActorRefであり、ActorEventBus をミックスインすることで登録することが多い。
  • Classifier: イベントを送信するときのSubscriberの選択に使用する分類子を定義。

Classificationトレイトをミックスインすることで、classifyメソッドにより、EventからClassifierを抽出する。

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

つまり、

eventBus.subscribe(actorRef, classifier) // classifierの登録
eventBus.publish(msg) // ここで、msgが上でsubscribeしたclassifierであるならば、subscriberが選択され、実際に送信される。

特殊チャネル

デッドレターチャネル

処理または配信できない全てのメッセージを含むチャネル。(デッドメッセージキューとも呼ばれる。)   このチャネルを監視すると、処理されていないメッセージがわかる。
Akka内部では、DeadLetterListenerpreStartでDeadLetterを受け取るEventStreamをsubscribeする形で実装されている。

  override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter])

保証配信チャネル

リモートアクターを使用する場合、間のネットワークが死んだりしているとメッセージが消失してしまう。ReliableProxyを使うと、この問題が解決し、リモートアクターに対して送信できる可能性が高まる。
今コードを見たら @deprecated("Use AtLeastOnceDelivery instead", "2.5.0") と書いてあったので、非推奨になってるっぽい。

第11章 有限状態マシンとエージェント

状態を扱うための方法として、新しく二つの方法を紹介する。

有限状態マシン(Finite Stage Machine: FSM)

Akkaドキュメントより引用

FSMは、次の式の関係の集合として記述できます。
State(S) x Event(E) -> Actions (A), State(S')
これらの関係は、次のように解釈されます。
状態SでイベントEが発生した場合は、アクションAを実行して状態S'に遷移する必要があります

エージェントを使った状態共有の実装

現在エージェントは非推奨になっており、Akka Typed を使うことが推奨されている。

2.6.0-RC1でstableになったAkka Typedを試してみる - Candy, Vitamin or Painkiller

speakerdeck.com

第12章 ストリーミング

データのストリームとは、終わりがない要素の配列のことで、以下の時に存在している。

  • プロデューサーがストリームに要素を提供
  • コンシューマーがストリームから要素を読み込む

akka-streamは有限のバッファで無条件のストリームを処理する方法を提供する。また、akka-httpでは内部でakka-streamを使用している。

基本的なストリーム処理

以下の3つの存在を考える。

  • 要素のプロデューサー
  • 処理ノード
  • 要素のコンシューマー

sourceとsinkを使う

akka-streamを使うには、以下のステップが必要になる。

  • 処理フローの定義:ストリーム処理コンポーネントのグラフを定義。
  • 処理フローの実行:アクターシステムでの実行。グラフからアクターに変換される。

ファイルをコピーする簡単な例を考える。要素の供給源である Source と要素の吸収源である Sink をつなげることによって、
sourceからsinkにデータを直接送るGraphを定義する。

val source: Source[ByteString, Future[IOResult]] =
  FileIO.fromPath(inputFile)
                                                       
val sink: Sink[ByteString, Future[IOResult]] =
  FileIO.toPath(outputFile, Set(CREATE, WRITE, APPEND))
                                                       
val runnableGraph: RunnableGraph[Future[IOResult]] =
  source to sink

マテリアライズとは

runnableGraph.run() はimplicitな Materializer を必要とする。これは、RunnableGraph をアクターに変換してグラフを実行する。具体的には、

  • グラフに存在する入力と出力が全て接続されているか確認
  • SourceとSinkに紐づくアクターをそれぞれ生成。
  • そのアクターの間にpub/sub関係を結ぶ。

さらに、PublisherとSubscriberの間では、リクエストされた以上のメッセージを送信しない仕組みになっており、メモリがオーバーフローしないように送受信をしている。

SourceとSinkでは、グラフがまてリアライズされた時に補助値を提供する。ファイルの場合はFuture[IOResult]を提供する。どの結果をKeepするかを指定でき、それによってどのマテリアライズされた値を保持するかを指定できる。

フローによるイベントの処理

FlowはSourceとSinkの間に使えるコンポーネントで、全てのストリームロジックをキャプチャする。たくさんの処理を組み合わせてFlowとして構成される。

Flowの定義を見ておく。入力と出力が一つずつ定義されていることがわかる。第3の型指定はMaterializeなので、Flowの補助値(副作用のようなものと捉えている)が得られるものと考えられる。

/**
 * A `Flow` is a set of stream processing steps that has one open input and one open output.
 */
final class Flow[-In, +Out, +Mat]

例えば、ファイルからByteStringを受け取って、それをパースする処理を考える。すると、Flowの各コンポーネントは以下のようになる。

// ファイルから受け取ったByteStringをStringに変換するFlow
val frame: Flow[ByteString, String, NotUsed] =
  Framing.delimiter(ByteString("\n"), maxLine).map(_.decodeString("UTF8"))

// ByteStringをcase classであるEventにマッピングするFlow
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

// 合成されたフロー
val composedFlow: Flow[ByteString, Event, NotUsed] = flow via parse

// runnableグラフの構築
val runnableGraph: RunnableGraph[Future[IOResult]] =
    source.via(composedFlow).toMat(sink)(Keep.right)

ストリームのエラー処理

上記のようなFlowの場合にエラーが発生した時、Futureは失敗した値を返す。そこで、パースが失敗した時にその例外を無視して他の行のパースは行わせる事を考える。ストリームにもスーパバイザー戦略を定義できる。

val decider: Supervision.Decider = {
  case _: LogParseException => Supervision.Resume
  case _ => Supervision.Stop
}
                                                                 
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

BidiFlowとは

2つの入力と2つの出力を持つグラフコンポーネント。多分Bidirectionの略。ソースコードに添付してある図を見るとわかりやすい。

  /**
   * Wraps two Flows to create a ''BidiFlow''. The materialized value of the resulting BidiFlow is determined
   * by the combiner function passed in the second argument list.
   *
   * {{{
   *     +----------------------------+
   *     | Resulting BidiFlow         |
   *     |                            |
   *     |  +----------------------+  |
   * I1 ~~> |        Flow1         | ~~> O1
   *     |  +----------------------+  |
   *     |                            |
   *     |  +----------------------+  |
   * O2 <~~ |        Flow2         | <~~ I2
   *     |  +----------------------+  |
   *     +----------------------------+
   * }}}
   *
   */

つまり、I1が全体の入力。O2が全体の出力とする。そして、上記のO1とI2をインターフェースとして持っているFlowをつなげる事で、任意のフロート接続ができるようにするというもの。具体的には以下のように使う。

val bidiFlow = BidiFlow.fromFlows(inFlow, outFlow)
val flow = bidiFlow.join(filterFlow)

ストリーミングHTTP

実際にakka-httpを使ってPOSTとGETをどのように処理するかを学んだ。詳細は本のソースコードなどに譲る。

マーシャリングとアンマーシャリング

HTTPレスポンスを返す際に、アプリケーションで使われているオブジェクトからHTTPレスポンス用のデータフォーマットに変換する事をマーシャリングという。その逆をアンマーシャリングという。

グラフDSL

入力と出力を任意の数だけ持たせられるグラフDSL。例えば、Eventを受け取って以下の5つのイベントをそれぞれ別々のFlowで処理する場合はこのようなコードになる。

  • Jsonに変換し、ByteStringを返す
  • okのログ出力
  • warningのログ出力
  • errorのログ出力
  • criticalのログ出力
type FlowLike = Graph[FlowShape[Event, ByteString], NotUsed]
                                                                 
def processStates(logId: String): FlowLike = {
  val jsFlow = LogJson.jsonOutFlow
  Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // all logs, ok, warning, error, critical, so 5 outputs
      val bcast = builder.add(Broadcast[Event](5))
      val js = builder.add(jsFlow)
                                                                 
      val ok = Flow[Event].filter(_.state == Ok)
      val warning = Flow[Event].filter(_.state == Warning)
      val error = Flow[Event].filter(_.state == Error)
      val critical = Flow[Event].filter(_.state == Critical)
                                                                 
      bcast ~> js.in
      bcast ~> ok ~> jsFlow ~> logFileSink(logId, Ok)
      bcast ~> warning ~> jsFlow ~> logFileSink(logId, Warning)
      bcast ~> error ~> jsFlow ~> logFileSink(logId, Error)
      bcast ~> critical ~> jsFlow ~> logFileSink(logId, Critical)
                                                                 
      FlowShape(bcast.in, js.out)
    })
}

これまでで考えると、返り値が Flow[Event, ByteString, NotUsed]でないのが違和感を感じるかもしれないが、FlowはGraphを継承している。つまり入力と出力を一つずつ持つ特別な場合のShapeがFlowなのである。

コンシューマーとプロデューサーの仲介

需要と供給のバランスをどうやって取るかについて。バッファーを用いてこれを解決する。

デフォルトではバックプレッシャーが有効になっており、ストリームを処理できる。

Akka Streamsで実装するリアクティブストリーム | Think IT(シンクイット)より引用

バックプレッシャープロトコルには、下流のサブスクライバーが受信してバッファリングできる要素の数が定義されています。バックプレッシャーは、サブスクライバーが処理できる以上の要素を、パブリッシャーがパブリッシュしないことを保証します。

さらには、bufferやexpandのようなメソッドを使って変更を加えることもできる。

第13章 システム統合

  • Alpakkaを使用して、外部システムと連携する。
  • akka-httpを使用してHTTPプロトコルをサポートする。

メッセージエンドポイント

正規化パターン

  • 様々な種類のメッセージを共通の標準化されたメッセージに変換する。
  • システムは、メッセージが様々な外部システムから送られてくる事を気にせずメッセージを処理できる。
  • ルーターを介して、トランスレーターに送り、共通のメッセージに正規化して送るパターンもある。

標準データモデルパターン

システム間の接続要件が増加すると、エンドポイントが多くなり、正規化パターンでは複雑になってしまう。   標準データモデルパターンでは、全てのシステムで共通のインターフェースを実装して、共通メッセージを利用するエンドポイントを持たせる。

エンドポイントが共通形式のメッセージを受信し、そのあと独自のメッセージに変換する。逆にエンドポイントが独自のメッセージを共通形式のメッセージに変換して、共通インターフェースを介して送信する。

標準データモデルパターンが、アプリケーションの個々のデータ形式と外部システムで利用されるデータ形式に間接参照を提供する一方で、正規化パターンは1つのアプリケーションに閉じているという点です。この間接参照がもたらす利点は、新しいアプリケーションをシステムに追加する時に共通メッセージを処理するトランスレーターだけ用意すればよいということです。既存システムの変更は必要ありません。

Alpakkaを用いたエンドポイントの実装

Alpakkaで提供されているコンポーネントを使うと、簡単に外部システムと接続できる。 例として、ディレクトリ内のファイル変更検知や、AMQPを用いたメッセージ送信があげられる。今回は共通形式のメッセージを Order クラスに指定している。それぞれイベントをSourceに変換しているので、toMatRunnableGraphに変換する事でストリーム処理が可能になる。

外部システムからメッセージの受信

Alpakkaを用いてファイルの変更を検知するSourceを生成する。

object FileXmlOrderSource {
    def watch(dirPath: Path): Source[Order, NotUsed] =
      DirectoryChangesSource(dirPath, pollInterval = 500.millis, maxBufferSize = 1000)
        .collect {
          case (path, DirectoryChange.Creation) => path
        }
        .map(_.toFile)
        .filter(file => file.isFile && file.canRead)
        .map(scala.io.Source.fromFile(_).mkString)
        .via(parseOrderXmlFlow)
  }

// stream処理できる
val consumer: RunnableGraph[Future[Order]] =
  FileXmlOrderSource.watch(dir.toPath)
    .toMat(Sink.head[Order])(Keep.right)

AMQPの場合

object AmqpXmlOrderSource {
  def apply(amqpSourceSettings: AmqpSourceSettings): Source[Order, NotUsed] =
    AmqpSource.atMostOnceSource(amqpSourceSettings, bufferSize = 10)
      .map(_.bytes.utf8String)
      .via(parseOrderXmlFlow)
}

// stream処理できる
val consumer: RunnableGraph[Future[Order]] =
  AmqpXmlOrderSource(amqpSourceSettings)
    .toMat(Sink.head)(Keep.right)

AMQP自体なんぞやという感じだったのでこの辺の記事を読んだ。

www.slideshare.net

Advanced Message Queuing Protocol - Wikipedia

外部システムへのメッセージ送信

外部システムへのメッセージ送信も受信時と同様に、今度は AmqpSink を用いれば共通メッセージを介したSinkが生成できる。

HTTP

今回はakka-httpを用いてRESTインターフェースを実装する。

~を使う事で、ルートの定義やディレクティブを組み合わせることができる。以下の例は getOrderpostOrders のいずれかとマッチすると読むことがができる。

val routes = getOrder ~ postOrders

~の定義は以下の通り。

/**
 * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
 * chance to act upon the request.
 */
def ~(other: Route): Route = ...

具体的にディレクティブがどのようにリクエストを処理するかを追ってみる。以下はgetOrderの詳細。

 // getはMethodDirectivesの一つ。GETリクエスト以外をrejectする。
def getOrder = get {

  // pathPrefixはPathDirectivesの一つ。PathMatcherを受け取り、
  // スラッシュ以降のパターンマッチしていない部分に適用する。
  pathPrefix("orders" / IntNumber) { id => 
    // IntNumberはNumberMatcher, PathMatcherをextendしている。リクエストのpathから数値を取り出す。

    // onSuccessはFutureDirectivesの一つ。Futureの値を取り出して、inner scopeのrouteを実行する。
    onSuccess(processOrders.ask(OrderId(id))) {
      case result: TrackingOrder =>
        // completeはRouteDirectivesの一つ。引数からリクエストを完了する。
        complete(
          <statusResponse>
            <id>
              {result.id}
            </id>
            <status>
              {result.status}
            </status>
          </statusResponse>
        )
                                               
      case result: NoSuchOrder =>
        complete(StatusCodes.NotFound)
    }
  }
}

第14章 クラスタリング

6章では、決まった数のノードを利用して分散アプリケーションを構築した。詳細はこちら

『Akka実践バイブル』を読んだ(前半) - ひらめの日常

さらに、クラスターを使うと、分散アプリケーションで使用するノード数を動的に増減させることができる。

なぜクラスタリングを用いるか

クラスターは動的なノードのグループ。各ノードはアクターシステムを持っている。クラスターに所属するメンバーノードのリストは現在のクラスターの状態として維持される。アクターシステムはお互いにこの情報を伝達し合う。具体的には次のような機能を持っている。

クラスターメンバーシップ

クラスターはシードノード、マスターノード、ワーカーノードで構成される。

  • シードノード:クラスターを起動するために必要。クラスターの起点であり、他のノードとの最初の接点として機能する。
  • マスターノード:ジョブの制御と監督。
  • ワーカーノード:マスターに仕事を要求し、処理結果をマスターに返す。

シードノードが起動したのちは、全てのノードを独立に依存関係なく起動することができる。

これとは別に、リーダー という責務がある。これは、メンバーノードの状態が Up なのか Down なのかを判断する。そして、実際にクラスターにノードを参加させたり、離脱させたりする。クラスターに存在するどのノードもリーダーになる可能性がある。

障害が起きた時

障害が起きた時 Unreachable という状態になる。クラスターは到達不能なノードを検出する。到達不能なノードがある限り、リーダーはアクションを実行することができないので、まずは到達不能ノードを Down させる必要がある。

クラスターないのノードで起きた障害を通知されたい場合は、 subscribe する事でイベント通知をアクターで受けることができる。

クラスタリングされたジョブの処理

マスターは最初にワーカーを作成してから、メッセージをブロードキャストする必要がある。それはルーターで実現するが、クラスターとルーターを合わせるためには既に存在する有効なプールを ClusterRouterPoolへ渡すことが必要になる。 具体的な流れは次の通り

  • マスターが ClusterRouterPool, BroadcastPool を用いてルーターとなり、ワーカーをルーティーとして生成する。
  • マスター(=ルーター)では、ワーカー(=ルーティー)に対して、jobを始めるというメッセージをboradcastする
  • マスターは、ワーカーからタスク開始したいというリクエストを受信したら、タスクに必要なメッセージを送信する。(ここはルーターの機能は用いず、個々に対して送信する)

ワーカーはマスターに対して Enlistメッセージを送信することで、ジョブへの参加を表明する。マスターではEnlistメッセージで受け取った ActorRef を用いて監視したり、ジョブ終了時に全てのワーカーを停止したりできる。

第15章 アクターの永続化

akka-persistenceモジュールを使って、アクターの状態を永続化する方法について。akka-persistenceモジュールを使うと、クラスターのノードに障害が起きたり置き換えられたとしても継続して動作するようなアプリケーションを構築できる。

クラスター拡張の方法は二つある。

イベントソーシング

イベントソーシングとは

成功した全ての操作をイベントとしてジャーナルに保存する。そのイベント列から指示された操作を実行することで値を得る。

アクターのイベントソーシング

イベントソーシングを用いることによる大きなメリットは、データベースへの書き込みとデータベースへの読み込みを明確に分離できること。アクターを回復するときのみ、ジャーナルから読み取りが発生する。

永続アクター

永続アクターは1)イベントから状態を回復するか、2)コマンドを処理するか、の2つのもノードのうちどちらかで動作する。

  • イベント:アクターが処理を正しく実行したという証跡を残すためのもの。
  • コマンド:アクターに処理を実行させるために送信するメッセージ。

永続アクターではまず以下のことが特徴となる

  • PersistentActor トレイトを利用する
  • receiveメソッドを定義する代わりに、 receiveCommandreceiveRecover の2つのメソッドを定義する必要がある。receiveRecoverでは、アクターの回復中に過去のイベントとスナップショットを受け取るために使う。

persist メソッドでコマンドをイベントとして永続化する。二番目に渡されている引数は、永続化されたイベントを処理する関数であり、今回のupdateState はアクターの計算結果を更新する。

val receiveCommand: Receive = {
    case Add(value) => persist(Added(value))(updateState)
    case Subtract(value) => persist(Subtracted(value))(updateState)
    case Divide(value) => if (value != 0) persist(Divided(value))(updateState)
    case Multiply(value) => persist(Multiplied(value))(updateState)
    case PrintResult => println(s"the result is: ${state.result}")
    case GetResult => sender() ! state.result
    case Clear => persist(Reset)(updateState)
  }

永続化されたイベントを処理する関数は非同期に呼ばれるが、akka-persistenceではこの関数の処理が完了する前に次のコマンドが処理されないようにする。このためにメッセージをいくらか蓄えるために、パフォーマンス上のオーバーヘッドが多少ある。

receiveRecoverでは、コマンドが正しく処理された時と全く同じ処理を実行する必要がある。回復しようとしているアクターと同じ persistenceIdでジャーナルに保存されたイベントは以前と同じ結果を得るために処理される。アクターが起動したり再起動した際に receiveRecoverメソッドが使われる

val receiveRecover: Receive = {
  case event: Event => updateState(event)
  case RecoveryCompleted => log.info("Calculator recovery completed")
}

スナップショット

アクターが回復するまでの時間を短くするにはスナップショットが利用できる。スナップショットは別の SnapshotStore に保存される。アクターの回復時には、最新のスナップショットが渡され、その後にそのスナップショットが保存された時点から発生したイベントが渡される。最新のスナップショット以前のイベントは渡されてこない。

永続クエリー

永続アクターの回復処理以外でジャーナルを検索するためのモジュール。最適なユースケースは、永続アクターから連続的にイベントを読み取り、クエリーに適した形で別のデータベースに保存すること。基本的には2種類のクエリがある。具体的に eventsByPersistenceIdcurrentEventsByPersistenceId の違いを見てみる。

  • eventsByPersistenceId: PersistenceActor が受け取ったイベントをその順番に取得する。このストリームは終了することはなく、新しくアクターがイベントを受け取った時にそのイベントをpushする。
  • currentEventsByPersistenceId: 基本的には eventsByPersistenceId と同じ挙動が、現在の状態まで到達した時点で、このストリームは終了する。

シリアライズ

独自のシリアライザーを設定するには、設定ファイルから指定する。

akka {
  actor {
    serializers {
      basket = "aia.persistence.BasketEventSerializer" // 独自のシリアライザーを登録
      basketSnapshot = "aia.persistence.BasketSnapshotSerializer"
    }
    serialization-bindings {
      "aia.persistence.Basket$Event" = basket // シリアライズが必要なクラスにシリアライザーをバインド
      "aia.persistence.Basket$Snapshot" = basketSnapshot
    }
  }
}

クラスターシングルトンとクラスターシャーディング

概念

クラスターシングルトン: アクターのインスタンスがAkkaクラスター内の同じロールを持つノード上でただ一つだけ存在することを保証した上でそのアクターを実行できる。もしこのクラスターシングルトンが停止した場合、他のノード上で開始する。

シングルトンやシャーディングについて仕組みが理解しづらかったのでこちらの記事を読んだ。非常にわかりやすい記事だった。

Akka Clusterで超レジリエンスを手に入れる | Think IT(シンクイット)

Akka Clusterで超レジリエンスを手に入れる(その2) | Think IT(シンクイット)

クラスターの中にノードがいくつか存在していて、その中でさらにシャードが分かれている。(下記図は Akka Clusterで超レジリエンスを手に入れる(その2)より引用) f:id:thescript1210:20210121191342p:plain

ShardRegion がメッセージを受け取り、それが ShardCoordinator にシャードの位置を聞く。ShardCoordinatorからの回答を用いて、メッセージは目的のシャード・エンティティに届けられる。

ShardCoordinatorクラスタ内で1つのみ存在すべきであり、Akkaのクラスタシングルトンで実装されている。さらに ShardCoordinator は情報を復旧できる必要があるので、 Akka Distributed Data を使用して、データを復元することができるようになっている。

Akka実践バイブルにおけるシャーディング

  • ShardShoppersClusterSharding を起動し、ShardRegion への参照を取得する。
  • ShardRegionはメッセージを受け取った時、シャードである ShardedShopper にメッセージを転送する。
  • ShardedShopper は自身の一意となるentityIdの指定方法と、どのシャードに配置されるかのshardIdの指定方法を明示する。これは ClusterSharding がスタートする時に渡され、どこに何を配置するのかを把握するのに必要となる。
class ShardedShoppers extends Actor {

  ClusterSharding(context.system).start(
    ShardedShopper.shardName,
    ShardedShopper.props,
    ClusterShardingSettings(context.system),
    ShardedShopper.extractEntityId,  // アクターを一意に定めるid
    ShardedShopper.extractShardId // どのシャードに位置するかを定めるid
  )

  def shardedShopper = {
    ClusterSharding(context.system).shardRegion(ShardedShopper.shardName) // shardRegionへの参照を取得
  }

  def receive = {
    case cmd: Shopper.Command =>
      shardedShopper forward cmd 
  }
}

『エンジニアリング組織論への招待』を読んだ

目的

エンジニアの先輩におすすめされたので、自分も気になっていたしこのタイミングで読んでみました。印象に残ったところと感想を簡単にまとめます。

印象に残ったところ

Chapter 1 思考のリファクタリング

エンジニアリングとは

エンジニアリングは、曖昧さを減らしていく過程である。そしてそれに向かって組織を作る必要がある。

  • 誰かの曖昧な要求からスタートし、それを具体的にしていく過程全てがエンジニアリングという行為。つまりは、曖昧さを減らし、具体性を増やす行為がエンジアリングである
    • 物を実現するというのは、不確実な状態から確実な状態に推移させていく過程だと理解できる。
  • マイクロマネジメント型組織
    • 不確実性の削減が少ししかできない。具体的で細かい指示を必要とする。
  • 自己組織化された組織
    • 不確実性の削減をより多く行うことができる。抽象的で自由度のある指示でも動ける。

不確実性を孕んでいるのは、

  • 未来
  • 他人

であり、これらに向き合って不確実性を少しでも減らしていくことが物事を実現させる手段である。

考え方のフレームワーク

  • 論理的思考の盲点を知る:自分がどんな時に論理的でなくなる可能性があるか知る
    • これは感情的になる瞬間を知り、その影響を少なくすることと同義。
    • 怒りを伝えるのではなく、悲しみを相手に伝えることが重要。
  • 経験主義・仮説思考
    • 複雑で不確実性の高いものに対しては、とりあえず手を動かしてみて次何をすれば良いか考える。
    • わからないということ自体が次の一手への重大なヒントになる。
    • コントロールできる物を操作し、そして観察できる事を通じて、その結果を知識にすることしかできない。
    • 何が仮説なのかを明らかにする。
  • システム思考
    • 同じ目的を持ったチームのメンバーが局所最適の言い争いを発生させることなく全体最適に向かうことができるように。
    • 一次元上の観点から問題を捉えて、システムの全体像を把握していく。
    • 視野を広く、視点を鋭く、視座を高く考える。

何が組織の理不尽を増幅させるか

情報の非対称性と限定合理性が人間の不完全さを加速させる。

f:id:thescript1210:20210112003610p:plain

(画像は本より引用)

組織で求められるコミュニケーション能力とは、コミュニケーションの不確実性を減少させる能力のこと。これによって情報の非対称性と限定合理性を低減させていく。

Chapter 2 メンタリングの技術

自立型人材とメンタリング

  • ある人が自立型人材であるには、上司が部下に自立的に考えてほしい期待値と、部下がどこまで自立的に考えるのが自分の仕事だと捉えているかの二つの期待値が一致している 必要がある。
  • メンタリングとは、自立型人材を作るために、信頼関係の上に期待値を調整して、適切に自己効力感を持てるようなフィードバックループを作り出すこと。

メンターメンティーの関係性

  • 謙虚:お互いに弱さを見せられる
  • 敬意:お互いに敬意を持っている
  • 信頼:お互いに成長の期待を持っている

メンタリングの知識

  • メンタリングでは、見えてない課題に自分から気付かせる事を重視する。
    • 自分自身で気がついた事を自己説得といい、これを重視する。
    • 答えではなく質問でメンティにとっての思考回路の盲点を外していく。
  • 悩みを聞き出し、気づきを促して、「考える」に変えていくことが大事。
  • 問題に対して向かう際に意識する流れ
    • 感情的に固執しているかもしれないので、傾聴する。
      • 全く同じ感情になるのではなく、個人的な感情を理解すること。
    • 客観視できないかもしれないので、可視化する。
      • 些細な言葉の使い方に、認知の歪みを見つけ出すことができる。
    • 前提を変える、リフレーミングをする。
      • 一旦この前提がなかったらどうなりますか?のように聞く。

本当の心理的安全性とは

ここでいうラーニングゾーンに導くことによって、初めて成長を促すことができる。 f:id:thescript1210:20210112004725p:plain

(画像は本より引用)

  • 心理的安全性のポジティブな影響を享受するのであれば、明確に対人リスクをとる事を促す必要がある。
  • メンティが存在することに対して、メンティがした行動に対して、理解をし、受け入れ、感謝を伝える
    • 必要なのは、結果でハンク行動、行動だけでなく存在への承認
    • 頼られるという体験は強烈な自己承認と自己効力感を生み出す。
  • アクノレッジメント:相手に対して興味関心を持ち、変化にいち早く気がつき、言葉や行動を通して伝える
  • 自己開示と感謝の共有。そして自己開示へのフィードバック。

どのようにして成長を促すか

  • 内心だけでなく行動に注目する。
    • 行動は見ることができる。さらにメンティ自身がコントロールできる。
    • SMARTの原則を意識する。(Specific, Measureable, Achievable, Related, Time-Bounded)
  • 能力は習慣の積分。習慣は行動の積分
  • 行動が取れない時は、促進する力よりも阻害する力の方が大きいからだと考える。
  • 行動を促進する要因はフィードバック機会を増やし、適切に承認していき強化する。

Chapter 3 アジャイルなチームの原理

ここからはあまり興味分野ではなかったので、そこまでしっかりとまとめていない。

アジャイル開発においては、最初期には大雑把に見積り、実際の開発工程にどの程度進んだかという実験的な知見をもとに、どの程度の期間がかるのかを推計します。そして、それを繰り返していくことで、徐々に方法不確実性を減少させ、スケジュールの精度を上げるように振る舞います。

Chapter 4 学習するチームと不確実性マネジメント

スケジュール不安とどのように向き合っていくのか、どのように可視化していけばよいのかを中心にマネジメントしていくことで、現場のストレスを削減しながら、同時に経営上のメリットも実現することができます。

Chapter 5 技術組織の力学とアーキテクチャ

権限と責任

  • 組織の情報処理能力を考えた時、人間同士の関係性の問題が大事。
  • 権限の異常には、明示的で連続的なコミュニケーションが必要不可欠。
    • 明示的でない権限は、最も不自由な状態と違いがない
  • 上司と部下の責任の不一致が生じると、情報の非対称性によって組織の情報処理能力は低下する。
  • 言う度にひっくり返されるのに、自分で考えろと言われてしまうのは、不透明な権限理解によるもの。

目標設定

  • 目標を管理するのは、従業員が納得して達成に臨む事を支援するため。
  • OKRでは、100%の努力に対して達成できそうなところを70%のところにし、それよりも高い目標を掲げる。
  • 組織の透明性とは、情報が整合性を持って、組織内に正しく伝達される事。
    • 上から下だけではなく、下がどのような状態で仕事をしているかも伝達される必要がある。

その他

  • 技術的負債は見えてしまえば、非機能要件として管理可能。
  • 根本的な問題が構造上の問題にあると気づけば、対立は消滅する。

感想

自分が仕事をしていて、責任と権限については非常に思うところがあった。確かに自分が任されていたものについても、自分がどれだけの責任を明確に持っているかがはっきりしていないために、自分で悩んでしまい手が止まることが多かったからだ。自分が悩んでいる時は、まずは考えるに移行できるように、自分の責任について明確に期待値を合わせていく必要があると強く感じた。

また、メンタリングについても自分がメンタリングする側/される側どちらの時にも意識しておけることだと感じ、これからに生かしていきたいと思った。

本全体を通じて、

  • エンジニアリングとは不確実性を減らしていくことだ。
  • 不条理は組織の構造や不確実性に対する向き合い方によって生じているのだ。

と言う一貫したメッセージを受け取ることができ、実践するために始業前とかに見返してどんなことができるか考えてみようかと思う。

『Akka実践バイブル』を読んだ(前半)

後半はこちらです

hiramekun.hatenablog.com

目的

とても分厚いAkka実践バイブルを読んでいるので、自分の印象に残ったところをメモ程度にまとめます。全部で17章あるので、前後半で半分に分けて行きます。

第1章 Akkaの紹介

アプリケーションをスケールさせる時は、次の2つが目標となる。

  • 複雑性は可能な限り低く留めておくこと
  • リソースを効率的に使用するいこと

Akkaとは何か?

Lightbendによって開発されたオープンソースプロジェクト。Akkaは並行・分散アプリケーションをよりシンプルに単一のモデルで実装するプログラミングモデルを提供する。そのモデルは アクターモデル と呼ばれている。

Akkaの特徴としては、アプリケーションのスケールアップとスケールアウトをアクターによって提供している。

アクターの概要

  • Akkaはアクターが中心となる。アクターはメッセージをやりとりし、そのメッセージによって実行する。メッセージはイミュータブル。
  • アクターは全て非同期で実行する。
  • メッセージの送受信でアプリケーションを構築する。

スケーリングのための2つのアプローチ

双方向での情報のやり取りは、情報をポーリングして取得するのではなく、イベント駆動でイベントが発生したときに通知するアプローチ。 さらにネットワーク上では非同期なメッセージ送受信、ノンブロッキングI/Oをメインにする。

Akkaのアクター

  • Akkaのアプリケーションで最初にアクターシステムを作成する。アクターシステムは、トップレベルのアクターを作成できる。アプリケーション内の全てのアクターに対してトップレベルアクターを一つだけ作成するのが一般的なパターン。
  • アクターシステムは、作成したトップレベルアクターにアクターのアドレスを返す。これを ActorRef という。ActorRefはアクターにメッセージを送信するために使用できる。
  • アクターシステム内でアクターを探したい時は、アクターパス を用いる。URLパス構造とアクターの階層をマッピングできる。
  • メッセージはアクターのActorRefに送信される。全てのアクターはメールボックスを持っており、メッセージをそこに保存する。そして、そこから到着順に1つずつ処理していく。
  • ディスパッチャーはアクターを呼び出す。このディスパッチャーの種類によって、どのスレッドモデルを使用してメッセージをプッシュするかが決まる。

第2章 最小のAkkaアプリケーション

アプリケーションを構築して、実際に動くところを確認する。個人的にはAkka HTTPのルーティングDSL周りが読めなくてちょっと混乱していた。

また、この中に出てくる各種の要素はあとの章で詳細に説明されるとのことなので、いったんは動かすことと、中身コードの概略を理解するだけに留めた。

第3章 アクターによるテスト駆動開発

Akkaによるアクターモデルは、TDDと相性が良い。

  • アクターは振る舞いを持つため、より直接的にテストの対象になる。
  • アクターはメッセージングを用いて構築されており、メッセージを送信することで振る舞いを容易にシミュレートできる。これはテストの実行しやすさにつながる。

アクターのテスト

アクターのテストは、通常のオブジェクトのテストよりも困難。

  • メッセージ送信は非同期なため、単体テストで期待値をアサートするタイミングが難しい。
  • アクターは内部状態を隠蔽し、この状態へのアクセスを許可していない。(ステートレス)などなど...

これを解決するためにも、Akkaのakka-testkitモジュールが役に立つ。

  • シングルスレッドでの単体テストTestActorRef を使用すると、直接アクターインスタンスをテストしたり、通常のオブジェクトをテストするようにシングルスレッド環境でテストができる。
  • マルチスレッドの単体テストTestKitTestProbe クラスが提供されている。メッセージの期待値をアサートするためのメソッドも用意されている。
  • マルチJVMのテスト(第6章で)。

テスト専用の CallingThreadDispatcherにディスパッチャーを指定すると、呼び出し元と同じスレッドでアクターを動作させることができる。本番環境に近いのは、TestKit を用いてマルチスレッドに焦点を当てたテスト。

SilenceActorの例

アクターの振る舞いは外部から直接確認ができない場合。テストしたいことは、アクターがメッセージを処理する際に例外をスローせず、アクターが終了し、内部状態の変化が意図してものであるかどうか。

  • シングルスレッドテスト用のTestActorRefを作成する。
  • 内部アクターを underlyingActorで取得し、状態をアサートする。
        val silentActor = TestActorRef[SilentActor]
        silentActor ! SilentMessage("whisper")
        silentActor.underlyingActor.state must (contain("whisper"))

これをマルチスレッド版にすると以下のようになる。

  • マルチスレッドでは、ActorSystemを用いる。アクターは常にその生成する方法を指示するためのオブジェクトである Props オブジェクトから作られる。
  • 状態変化を確認するには、作成したアクターの内部状態をゲットするメッセージを追加し、それを用いて内部状態をテストする。
        val silentActor = system.actorOf(Props[SilentActor], "s3")
        silentActor ! SilentMessage("whisper1")
        silentActor ! SilentMessage("whisper2")
        silentActor ! GetState(testActor)
        expectMsg(Vector("whisper1", "whisper2"))

アクターが呼び出し元のActorRefを受け取ることで、アクターからメッセージを受け取ることができる。これは一般的で、以下のようにしてコンストラクタにActorRefを渡す実装が一般的である。コンパニオンオブジェクトでPropsの生成を行うのは、他のアクターからこのアクターの生成時に内部状態を隠蔽できる利点がある。

object SilentActor {
    def props(receiver: ActorRef) = Props(new SilentActor(receiver))
    // SilentActor.props でPropsをgetし、そこから呼び出しもとでインスタンス化する。
    // val silentActor = system.actorOf(SilentActor.props, "s3")

    //...
}

SideEffectingActorの例

受信した挨拶文をコンソールに出力するアクターの振る舞いをテストすることを考える。

  • TestEventListener: 設定すると、ログ出力される全てのイベントを制御できる。
  • EventFilter: ログメッセージを確認するために使う。イメージ下記。
EventFilter.info(...).intercept { actor ! message }

しかし、このテストは複雑さが増してしまう。テスト対象のクラスにlistenerとしてActorRefを渡してやり、ログ出力されるたびにlistenerに同じ文章をメッセージ送信してやるとより簡単にテストができる。

双方向のメッセージ

ImplicitSenderトレイトを使用することで、テストで用いる暗黙的なsenderをTestKitのActorRefに変更する。

その他tips

  • ミュータブルなデータ構造と組み合わせたvalよりも、イミュータブルなデータ構造を組み合わせたvarを優先する。別のアクターに内部状態を送信し、ミュータブルな状態を共有してしまうのを防ぐ。
  • receiveWhile(): 引数に与えた部分関数にマッチしなくなるまで繰り返す。
  • ignoreMsg: パターンにマッチしたメッセージを無視する。
  • expectNoMsg: ある一定時間の間にtestActorにもうこれ以上メッセージがこないということをアサートする。

第4章 耐障害性

耐障害性とはなんなのか

実際にはどんなシステムでも可用性の高いシステムを分散環境で構築するのは難しい。そこで let it crashという概念が生まれた。全ての障害の発生を防ぐことはできないため、以下のような点に留意して戦略を組み立てる。

  • システムが壊れることは起こり得るので、実行し続けるために耐障害性を持つ必要がある。これは、回復可能な障害によって壊滅的な障害に陥らないことを表す。
  • 障害の発生した部分を停止してシステムから切り離してでも、システムの最も重要な機能をできるだけ長く利用可能な状態で維持する。
  • 重要なコンポーネントに対して、バックアップを取っておく。
  • システムの特定箇所における障害によって、システム全体がクラッシュするようなことがあってはならない。

let it crash

Akkaのアクターは、通常ロジック用と障害回復ロジック用の2つのフローを提供している。障害回復フローは、通常フローのアクターを監視するアクター(=アクターを生成するアクター、スーパーバイザー)で構成される。アクターで例外をキャッチするのではなく、アクターをクラッシュさせるようにする。クラッシュしたアクターのメールボックスは、障害回復フロー内のスーパーバイザーが例外の処理方法を決定するまで保留される。

スーパーバイザーは例外をキャッチするのではなく、クラッシュしたアクターをどうするべきかその原因に基づいて決定する。以下の4つの選択肢がある。

  • 再起動:Propsから再生成される。再起動した後に、アクターは引き続きメッセージを処理する。
  • 再開:同じアクターインスタンスがメッセージを処理し続け、クラッシュは無視される。
  • 停止:メッセージの処理を行わない。
  • エスカレート:スーパーバイザーは問題を自身の親アクターに対してエスカレーションする。

障害は隔離され、関心は分離されている。具体的には、通常のアクターのメッセージ処理と、スーパーバイザーの障害からのリカバリーフローは無関係であり、互いに完全に独立して定義し、開発できる。

アクターのライフサイクル

以下の3つのイベントがあり、それぞれライフサイクルの変更イベントが発生した時に呼び出される関数がいくつかある。

  • 開始イベント(start)
  • 再起動イベント(restart)
  • 停止イベント(stop)

それぞれメソッドが定義されており、overrideすることでフックに応じて処理を行うことができる。

ライフサイクルの監視

  • アクターのライフサイクルは監視ができるActorContextにはアクターの停止を監視するためのwatchメソッドと、監視を解除するためのunwatchメソッドが存在している。
  • アクターがActorRefのwatchメソッドを呼び出すと、そのActorRefを監視することになる。
  • 監視対象のアクターが終了すると、Terminatedメッセージがアクターに送信される。

監督は親アクターから子アクターに対してのみできるが、監視はどのアクターに対してもできる

監督(supervision)

スーパーバイザーヒエラルキー

アクターがお互いを生成する機能。アクターを生成する全てのアクターは、生成された子アクターのスーパーバイザーになる。この親子関係はずっと変わることはない。子アクターが終了した時のみ、その親子関係が終了する。

最もクラッシュする可能性の高いアクターは、できるだけ低い階層に配置する。これは、より多くのスーパーバイザーが処理できる可能性などがあるためである。

戦略(strategy)

スーパーバイザーは、障害に対してどのように対応するか、戦略を持っている。Default strategyは、アクターが停止した時を除いて、あらゆるExceptionで子アクターを再起動する。さらにAkkaでは、子アクターへの戦略をどのように適用するか2つの選択肢を持っている。

  • 1つの子アクターがクラッシュした時、全ての子アクターに対して、一斉に同じ障害回復方法を適用する。
  • クラッシュしたアクターのみを対象として障害回復方法を適用する。

デフォルトで定義されている戦略をoverrideすることで、独自の戦略を適用することができる。

第5章 Future

Futureのユースケース

ActorよりもFutureの方がふさわしいケースがあり、それを取り上げていく。Futureのユースケースは以下のようになる。

  • 関数の結果を処理するためにブロッキングしない。
  • 関数を一度だけ呼び出し、その結果をどこかで使う。
  • 多くの関数を合成する。
  • 結果の一部のみ使用する。
  • 関数をパイプライン化する。

AkkaではaskメソッドのレスポンスとしてFutureを受け取ることになる。Futureの結果はsenderにpipeすると、senderを参照しなくて良くなる。

具体的にAkkaの公式ドキュメントで見てみた。

このドキュメントの例を見ればわかるように、Actorが Future の実行結果を pipeTo sender を用いることで、senderに対してFutureの結果を送信することができる。

class UserProxyActor(userData: ActorRef, userActivities: ActorRef) extends Actor {
  import UserProxyActor._
  import akka.pattern.{ ask, pipe }
  implicit val ec: ExecutionContext = context.dispatcher

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case GetUserData =>
      (userData ? UserDataActor.Get).pipeTo(sender())
    case GetUserActivities =>
      (userActivities ? UserActivityActor.Get).pipeTo(sender())
  }
}

pipeTo? と一緒に使うのがよくある使い方。Futureの結果を受け取るために?でリクエストするのがよくあるからだと思う。

doc.akka.io

その他tips

  • recoverメソッドを使うことで、 Futureのなかで例外が発生した時にデフォルト値を返したり、別の例外にして投げたりすることができる。
  • firstCompleteOfメソッドを使うことで、最初に完了したFutureを取得できる。
  • findを使うと、条件にマッチするFutureを取得することができる。

第6章 Akkaによるはじめての分散アプリケーション

Akkaはネットワークを介してアプリケーションをスケールアウトする際に、RPCとは異なるアプローチを採用している。表面上はローカル呼び出しと同じように見えるようになっている。
リモートノードのアクターへの参照は、1) パスによってアクターを検索する方法、2) リモートにデプロイする方法。の2種類がある。

パスによってアクターを検索する方法

まずリモート処理を有効化する設定をconfigファイルに記載する。すると、これを読み込んだアクターのActorRefはリモートアクターのものになり、指定したホスト、ポートをリッスンするようになる。

akka {
  中略...

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "0.0.0.0"
      port = 2552
    }
  }

  中略...
}

リモートアクターへの参照パスは、以下のようにして定まる。ガーディアンは常にuserである。

$プロトコル//$アクターシステム@$サーバーホスト:$ポート/$ガーディアン/$トップレベルアクター

このパスを文字列としてactionSelectionメソッドに渡すと、リモートに存在するアクターを検索できる。

フロントエンドとバックエンドに分割

フロントエンドの設定には、バックエンドのアクターを参照するための設定セクションを追加する。そうすることで、フロントエンドからバックエンドのアクターへの参照パスを組み立てて参照することができる。

RemoteLookupProxyの導入

actorForメソッドで直接ActorRefを参照するのは、deprecated。ローカルとリモートどちらのActorRefも取得できるが、それぞれActorRefがリモートのものかローカルのものかによって挙動が異なることが大きな理由の一つ。

そこで、RemoteLookupProxyは、フロントエンドノードからバックエンドノードへの橋渡し役として存在し、

  • リモートに存在するアクターシステム自体の障害による停止などをハンドリングする
  • リモートに存在するアクター自体の障害による停止などをハンドリングする

などの役割を持つ。そのためにリモートのアクターを監視し、それに応じて再接続を試したり、メッセージを転送したり挙動を変える。

リモートにデプロイする方法

設定ファイルを通してデプロイする方法が推奨されている。これは、アプリケーションを再ビルドすることなくクラスター設定を変更できるため。

アクターへのローカルパスを指定し、デプロイ先のリモートアドレスを追記する必要がある。

actor {
  provider = "akka.remote.RemoteActorRefProvider"
  deployment {
    /boxOffice {
      remote = "akka.tcp://backend@0.0.0.0:2552"
    }
  }
}

一応

デプロイとは | 「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典

第7章 設定とロギングとデプロイ

簡単にまとめるに留める

  • 設定ファイルにはデフォルト値を設定でき、それを上書きすることで細かい設定の変更が可能になる。
  • 設定ファイルをimportして設定を上書きすることもできる。
  • ロギングはAkkaが非同期に出力するため、アプリケーションのパフォーマンスに支障をきたすことは避けられている。
  • sbt-native-packagerを使うと非常に簡単にアプリケーションのディストリビューションの作成ができる。

第8章 アクターの構造パターン

アクターベースプログラミングで直面する課題の一つとして、複数の登場人物がそれぞれの作業をどのように並列実行して、それをまとめるか。そしてそれをどのようにコードに落とし込むかということ。Enterprise Integration Patterns: EIP をいくつか実装し、Akkaの特徴を生かしつつ設計することを目指す。

パイプ&フィルターパターン

パイプとは、あるプロセスまたはスレッドがその結果を別のプロセッサーに受け渡して追加の処理を行うことを指す。そのパイプで送られる途中にフィルターが存在し、受け取ったメッセージを次に送るかどうかチェックがなされる。どのパイプも、どのフィルターも、プロセスの追加や順序変更などに耐えられるように、同じインターフェースを持つ必要がある。

Akkaでは、アクターでフィルターを実装し、メッセージングでパイプを実装することになる。

スキャッタギャザーパターン

タスクの並列実行を可能にするパターン。Scatter部分でメッセージを並列に送信し、Gather部分で並列に分散したメッセージを集約する。具体的には、競争パターンと並列協調処理パターンの二つの場合がある。

特に並列協調処理パターンにおいては、片方からメッセージが届いているのにもう片方から届いていない時がある。その状態でActorが再起動するとメッセージが失われてしまうので、preRestartメソッドで自分自身にメッセージを送信し直すことで解決する。

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  super.preRestart(reason, message)
  messages.foreach(self ! _)
  messages.clear()
}

ルーティングスリップ(回覧表)パターン

パイプ&フィルターパターンの動的なバージョンと捉えることができる。パイプ&フィルターのルートは静的に決まっていたが、それが受け取ったメッセージによってルートが動的に決定される。ここでも、各ステップのインターフェースが同じでそれぞれのタスクが独立していることが重要。

具体的には、全てのアクターは以下のtraitをミックスインし、sendMessageToNextTaskによって次のアクターへメッセージパッシングする。全てのルートが配列として入っていて、それのheadをタスクとして扱い、tailを次の配列として渡していくところがポイント。

def sendMessageToNextTask(routeSlip: Seq[ActorRef], message: AnyRef) {
  val nextTask = routeSlip.head
  val newSlip = routeSlip.tail
  // 省略...
}

2021年のインプット目標

2020年振り返り

入社前にどんな本を読んでいたのか忘れたので、画像認識だけ追加しています(他に何か読んだっけ...)
覚えてる限り読了した本は、新書合わせて18冊でした。24冊ペース行きたいですね。統計のための行列代数をちゃんとノート取ってたら莫大な時間がかかってしまいました。焦らずコツコツやっていきます。

数学/統計

Scala周り

  • Scalaスケーラブルプログラミング
  • Scala関数型デザイン&プログラミング(35%)
  • Akka実践バイブル(50%)

エンジニアリング全般

その他

  • 理論と事例でわかる自己肯定感
  • 進化する勉強法
  • 脳が認める勉強法
  • 東大卒プロゲーマー 論理は結局、情熱にかなわない
  • スタンフォードの自分を変える教室
  • コンサル一年目が学ぶこと
  • アメリカ海軍が実践している「無敵の心」の作り方
  • 図解 人材マネジメント入門
  • 原因と結果の経済学

2021年の目標

24冊読む を目標にやります。現時点で読もうと思っている本を上げておき、随時見直すようにしたいと思います。

数学/統計

  • 統計のための行列代数 上(25%)
  • 統計のための行列代数 下
  • プログラミングのための線形代数
  • ベーシック圏論
  • グラフィカルモデル(15%)
  • モンテカルロ統計計算

Scala周り

  • Akka実践バイブル(50%)
  • sbt in Action
  • Scala関数型デザイン&プログラミング(35%)

エンジニアリング全般

その他

  • WHYから始めよ! インスパイア型リーダーはここが違う
  • 教師崩壊(50%)
  • 子供がつまずかない教師の教え方10の「原理・原則」

『スタンフォードの自分を変える教室』を読んだ

目的

読むのは2回目なのですが、1回目に読んだことを何も覚えてなくて絶望しました。
これを繰り返したくはないので、読んで印象に残った点をまとめておきます。

まとめ&感想

この本を読んで早速、付箋に「なんで自分が勉強を頑張っているのか」を書いて貼っておきました。勉強する前や集中力が切れそうなときに目を通すと、再び机に向かおうと思えます。

「自分がなぜこの行動をしているのか」を思い出すことは、読書の際に本を読み切る際も大事なようで、Daigoさんのyoutubeでも同じようなことが言われていました。この中では、メンタルマップという名前がついていましたが、「なぜこの本を読んでいるのか?」「この本の何を知りたいか?」をメモしていつでも見れるようにしておくと、意志力が高まって読み切ることができるらしいです。


難しい本でも最後まで読み切るノート術

特に自分に刺さったのは、自分をゆるすという観点です。勉強をサボって昼寝してしまった時など、自分を責めて自暴自棄になってたことが多かったことに気づきました。読んだ後は失敗したときに自分を許すように意識して生活するようにしています。確かに失敗した時のメンタルは安定したと感じますが、意志力自体が強くなったかと言われるとあまり実感はないですね...

メモしたこと全部意識するのは大変なので、1日に2つくらい意識して、それを日々変えて実践していくなどすると良いかと思いました。

印象に残ったところ

第1章 やる力、やらない力、望む力

  • 自己コントロールを強化するための最もよい方法は、自分がどのように、そしてなぜ自制心を失ってしまうかを理解すること。
    • この自己コントロールを実現するためには、意志力が必要になる。意志力は「やる力」「やらない力」「望む力」の3つの力が必要。「望む力」とは、やるやらないの意思決定の際に自分が本当に望んでいることを思い出す力のこと。
  • その日に行った選択を振り返ってみて、「自分がいつ目標を達成するための選択、あるいは妨げてしまう選択をしたのか」を分析してみると良い。自分の選択を振り返って意識することで、いい加減な選択の数が減っていく。
  • 瞑想を行うようになると、注意力、集中力、ストレス管理、衝動の抑制、自己認識といった自己コントロールの様々なスキルが向上する。自己コントロールは瞑想と同じで、目標から離れかけている自分に気づき、再び自分を目標へ向かって軌道修正するプロセス

第2章 意志力の本能

  • 人間はよくやりたくないはずのことをやってしまう。一体どんな考えや感情のせいで、本当ならやりたくないはずのことをやりたくなってしまうのか?自分を観察してみると良い。
    • 大体はストレスのせいで自制心が落ちてしまう。
  • 呼吸のペースを1分間に4~6回にすることで心拍数が上がり、自制心が高まる。
  • 運動をしても自己コントロールの機能が向上する。
  • 意志力のチャレンジがうまくいかいない場合は、自分の心に問題があるのではなく、単に脳と体が自己コントロールに適さない状態にあるだけである。
    • 外へ出る。
    • 眠る。
    • 横になって深呼吸をする。

第3章 疲れていると抵抗できない

  • 意志力が弱くなるタイミングを自覚することで、誘惑に負けそうになるのを未然に防ぐことができる。
  • 日々より実行するのが困難な方を選択することで意志力を鍛える。
  • 望む力を作り出すために考えること
    • このチャレンジに成功したらどんないいことがあるだろう?
    • このチャレンジに成功したら、自分の他に誰の利益になるだろう?
    • 頑張っていくうちにだんだんと楽になっていく想像する。

第4章 罪のライセンス

  • モラル・ライセンシング効果:良いことをしたせいでいい気分になり、次に自分が下した決断の悪い点が目に入らなくなる。
  • 自分の言い訳を知ると良い。
    • 自分自身や周りにどんな言い訳や説明をしているか?
    • 成功すると自分を褒め、失敗するとダメだったと思っているか?
  • 自分の進歩ではなく、努力する姿勢に注目する
    • 甘い考えをしている自分に気づいたら、なぜ自分は今頑張っているのか思い出すとよい。
  • 人は目標にふさわしい行動をとる機会が与えられただけでいい気分になり、実際に目標を達成したような満足感を覚えてしまう。
  • 人間は「明日は取り返せる」と考えがちだが、「明日も同じ行動をする」と考えることが大事。
    • ある行動を変えたい場合、その行動自体を変えるのではなく、日によってばらつきが出ないように注意する。

第5章 脳が大きな嘘をつく

  • 欲望のストレスを観察する。
    • 何かを欲しいと思う気持ちのせいで、ストレスや焦りを感じていることに気づく。
  • 「やる力」とドーパミンを結びづける。

第6章 どうにでもなれ

  • 自分に対する慰めの言葉で、自暴自棄になることが緩和される。
  • 自分に厳しくしても意志力は強くならない
    • 自己批判は常にモチベーションの低下や自己コントロールの低下を招く。
    • 自分への思いやりは、やる気の向上や自制心の強化につながる。
  • 「決心するだけ」を楽しんでいないか?
    • "偽りの希望シンドローム"。簡単に目標を諦めてはまた決心する。

第7章 将来を売り飛ばす

  • 将来の価値を低く見積もっていないか?
    • 長期的な利益に反する行動をとりたくなった場合は、目先の快楽に負けてしまったら、あとで手に入るはずの報酬を諦めることになるのだと言い聞かせる。
    • 長期的な報酬が手に入ったところを想像する。
  • 自分の欲求を刺激するものは目の前から隠すことで誘惑を断ち切ることができる。
  • どんな誘惑に対しても必ず10分ほど辛抱してみる
    • 10分でも待つとなると、脳はそれを先の報酬として解釈し、報酬への期待がそれほど起こらないため、目先の快楽に飛びつく生物学的反応も起きない。
    • 実際に自制心が上がった研究もある。
  • 将来のことをリアルに感じるほど、将来の自分が後悔しないような意思決定ができる。

第8章 感染した!ー意志力はうつる

  • 個人の選択は、他人が考えていることや欲しがっているもの、やっていること、期待していることなどに強い影響を受けている。
  • 自分が誰かの行動を真似しているかどうか注意する。
    • 自分より下の人を見て安心するのではなく、意識たかい環境に身を置いて、努力するのを普通にしよう。
  • 他人の欲求に対する免疫反応を強化するには、1日の初めに数分間、自分自身の目標について改めて考えておく。
  • 尊敬する人や好きな人から影響を受ける。
    • お手本にしたい人のことを心に思い浮かべる。
  • プライドや恥などの感情は、理性的な議論よりも自分たちの選択に対して速やかで直接的な影響を及ぼす。
    • 他の人たちが見ているつもりになるか、誘惑に打ち勝ったことを他人に自慢しようと考えると良い。
    • 自分の意志力のチャレンジをみんなに宣言すること。 そうすればモチベーションがますます向上する。

Scalaで累積和

scanLeftはfoldLeftの途中経過の値を保持するような関数。 これを使えば、累積和の計算が簡単にできる。

初期値が一番最初に挿入されるので、元の配列より大きさが+1されていることに注意する。

val a = Array(1, 2, 3)
val d = a.scanLeft(0L)(_ + _)

println(d.mkString("Array(", ", ", ")"))
// Array(0, 1, 3, 6)

Visual Scala Reference はコレクションメソッドを図でわかりやすく解説していておすすめ。 superruzafa.github.io

Scalaで競技プログラミング ABC114-C 755

始めに

今までC++競技プログラミングをしてきたのですが、業務でScalaを書いていることもあり、ScalaAtCoderを解いてみようと思います。大半が解いたことある問題かつ、目的はScalaに慣れることなので、解説は省いています。
勉強途中であることもあり、より良い書き方とかあれば是非是非コメントにアドバイスお願いします!

問題はこちら

atcoder.jp

解答

最初はdfsの外に var ans = 0 を持って、dfsのなかでインクリメントする処理を書いていたが、なるべくvalを使いたいのでdfsが答えを返すように書き直した。割と綺麗に書けて満足。

import scala.io.StdIn

// https://atcoder.jp/contests/abc114/submissions/17722890
object Main extends App {
  val n = StdIn.readLong

  def dfs(now: Long, use7: Boolean, use5: Boolean, use3: Boolean): Int = {
    if (now > n) 0
    else {
      (if (use3 && use5 && use7) 1 else 0) +
        dfs(now * 10 + 3, use7, use5, use3 = true) +
        dfs(now * 10 + 5, use7, use5 = true, use3) +
        dfs(now * 10 + 7, use7 = true, use5, use3)
    }
  }

  println(dfs(0, use7 = false, use5 = false, use3 = false))
}

Submission #17722890 - AtCoder Beginner Contest 114