ひらめの日常

日常のメモをつらつらと

Java/Scala: ExecutorServiceとExecutionContextについてまとめる

はじめに

ScalaExecutionContext について調べていたのですが、JVMのスレッドプール周りの知識が必要だと思ったので、一通り調べた内容をまとめます。 全体として『Javaパフォーマンス』を参考にしつつ、適宜参考資料を添付します。

ExecutionContext とスレッドプール周りの関係は簡単に以下のように表せます。

classDiagram class Executor { +execute(command: Runnable): void } class ExecutorService { +submit(task: Callable): Future +shutdown(): void } class ThreadPoolExecutor { } class ForkJoinPool { } class ExecutionContext { +execute(runnable: Runnable): void +reportFailure(t: Throwable): void } Executor <|-- ExecutorService ExecutorService <|-- ThreadPoolExecutor ExecutorService <|-- ForkJoinPool ExecutionContext --> Executor: use

ExecutionContext はインターフェースとしてExecutor を利用していますが、実際には Executor を拡張した ExecutorService を利用することが多いため、ExecutorService から調べていこうと思います。

ExecutorService

まず、JVMでスレッドやタスクの管理を担っている ExecutorService について知る必要があります。

ExecutorServiceとは、スレッドを利用してタスクを実行するために抽象化されたインターフェースです。利用者は ExecutorService を通して操作するため、スレッド・タスク・キューを直接操作する必要がありません。ExecutorService は下図のような仕組みで並列実行を管理してくれます。

  • スレッドで実行される単位をタスクと呼ぶ(JavaだとRunnableインタフェースを継承しているやつ)
  • グローバルなタスクキューがあり、タスクが追加されていく
  • 各スレッドはタスクキューからタスクを追加し、実行する
  • 実行中はそのスレッドに対して新しいタスクは追加されない
  • あるスレッドが実行中のタスクが終了した場合、タスクキューから新しいタスクが追加される

ExecutorService がどのようにしてスレッドとタスクを管理しているのかについては、次の動画を見るのがおすすめです。

www.youtube.com

ExecutorService の実装には大きく分けて二つの種類があります。 ThreadPoolExecutorForkJoinPool です。

ThreadPoolExecutor

ThreadPoolExecutor (Java Platform SE 8 )

ThreadPoolExecutor とはスレッドプールを管理する仕組みで、ExecutorServiceを実装しているクラスです。スレッドプールを用いることで、パフォーマンス向上とリソースの最適化を図ります。以下公式ドキュメントより引用

スレッド・プールでは、2つの問題に対処します。まず、タスク当たりの呼出しオーバーヘッドが減少するため、通常は大量の非同期タスクの実行時にパフォーマンスが向上します。また、タスクのコレクションを実行するときに消費されるリソース(スレッドを含む)の境界設定および管理のための方法を提供します。各ThreadPoolExecutorも基本的な統計情報(完了したタスクの数など)を保持します

タスクを保持するキューには種類があり、それぞれスレッドを追加する基準が異なります。ThreadPoolExecutor を利用する際に明示的にキューを直接指定することは少ないですが、キューの種類に応じてスレッドプールの挙動が異なります。Executors.newCachedThreadPool(), Executors.newFixedThreadPool(int), Executors.newSingleThreadExecutor() のように適切なスレッドプールの挙動を指定して ThreadPoolExecutor を初期化する必要があります。

キューとスレッドプールの挙動の種類については、次の動画を見るのがおすすめです。

www.youtube.com

ForkJoinPool

ForkJoinPool (Java Platform SE 8 )

ForkJoinPool とはJava7で追加された、スレッドプールを管理する仕組みで、ExecutorServiceを実装しているクラスです。主に ForkJoinTask を実行することを目的としています。ForkJoinTask とは、分割統治のアルゴリズムに基づくタスクのことです。タスクが再帰的に分割が可能で、大量にサブタスクが生成される点が特徴です。

例えばフィボナッチ数の計算をする関数が例として挙げられます

def fibonacci(n: Int): Int = {
  if (n <= 1) n
  else fibonacci(n - 1) + fibonacci(n - 2)
}

ForkJoinTask を実装した FibonacciTask を作成すると、こんな感じに書けます。サブタスクが再帰的に生成されているのがわかります。

// FibonacciTask(n: Int)#solve() の実装
if (n <= 1) {
  n
} else {
  val f1 = FibonacciTask(n - 1) // サブタスク1 (fork)
  val f2 = FibonacciTask(n - 2) // サブタスク2 (fork)
  f1.solve()
  f2.folve()
  f1.result + f2.result // サブタスクの結果を結合 (join)
}

ここで注意の注意点は、FibonacciTask(n) が処理を進めるためには、 FibonacciTask(n - 1)FibonacciTask(n - 2) の処理が終了するまで待つ必要があるという点です。

ThreadPoolExecutor を用いる場合、次の図のようになります。

  1. FibonacciTask(n) をスレッド1が実行する。サブタスクとして FibonacciTask(n - 1)FibonacciTask(n - 2)をグローバルにあるタスクキューに追加する
  2. スレッド2が FibonacciTask(n - 1) を実行する
  3. スレッド3が FibonacciTask(n - 2) を実行する

この状態では、スレッド1は FibonacciTask(n - 1)FibonacciTask(n - 2) が完了して結果を受け取るまで処理を進めることができず、新しいタスクを受け取ることもできません。

このような処理を効率的に行うために、ForkJoinPool を使用します。ForkJoinPool では、あるスレッドが新しいタスクを開始した後、そのタスクについて一時停止することができます。そして、そのタスクを一時停止している間、スレッドは他のタスクを実行することができます。

よって、タスクが再帰的に分割できる際は、数個の少ないスレッドによって大量のタスクを処理できます。これが ForkJoinPool の利点です。

ForkJoinPool では、グローバルに存在するタスクキューとは別に、スレッドごとにローカルのタスクキューを持っています。これにより、スレッドが生成したサブタスクをスレッド単位で管理することが可能になります。

ここまで ForkJoinPool の役割とそれに適したタスクについて説明しました。もう一点、ForkJoinPool の大きな特徴として、他のスレッドのタスクを奪うことができることが挙げられます。これを work stealing と呼んだりします。

スレッドごとにローカルのタスクキューを持っていますが、これが空になった時、他のスレッドのキューを参照して残っているタスクを実行することができます。以下公式ドキュメントより引用

ForkJoinPoolは、主にwork-stealingを使用する点で、他の種類のExecutorServiceとは異なります。プール内のすべてのスレッドが、プールに送信されたタスク、他のアクティブなタスクによって作成されたタスク、あるいはその両方を見つけて実行しようとします(1つも存在しない場合は、最終的に作業の待機がブロックされます)。これにより、(ほとんどのForkJoinTaskと同様に)ほとんどのタスクが他のサブタスクを生成する場合や、外部のクライアントからプールに小さいタスクが数多く送信される場合に、効率的な処理が可能になります。特に、コンストラクタでasyncModeをtrueに設定した場合、ForkJoinPoolは結合されないイベント形式のタスクでの使用にも適している可能性があります。

これにより、あるスレッドがタスクの一つに長時間かかっている状態でも、別のスレッドが残りのサブタスクを実行することができます。

ForkJoinPool の挙動については、次の動画を見るのがおすすめです。

www.youtube.com

ThreadPoolExecutor vs ForkJoinPool

さて、二種類の ExecutorServiceを紹介しましたが、これはどのように使い分けるのが良いのでしょうか。ForkJoinPool に適している状況を考え、その反対を ThreadPoolExecutor に当てると考えると理解しやすいと思います。

  • ForkJoinPool
    • 再帰的にタスクがサブタスクへと分解でき、ForkJoinTask として実装できるタスク
    • 各タスクのバランスがアンバランスであり、work stealing によって一連の処理を効率化できるタスク
  • ThreadPoolExecutor
    • IOブロックのような、スレッドを止めてしまうようなタスク
      • これは ForkJoinPool の利点をあまり受けられない、という理由が大きい。スレッドが止まっているので、work stealing もできないし、他のサブタスクを実行することもできない
      • 例えば、Slick がIOブロックを管理している AsyncExecutor 内部でも ThreadPoolExecutor が利用されている。Slick 3.4.0 - slick.util.AsyncExecutor:slick.util.AsyncExecutor)
    • 各タスクのバランスが均等で、どのタスクも当程度の時間がかかると見通されるタスク

この辺の使い分けはあまり言及がないのですが、この辺を参考にしました。

ExecutionContext

Scala で非同期実行の際に必要となる ExecutionContext は、これまで説明してきた ExecutorService と同じような働きを持ちます。ExecutionContext を自作する際は、どの ExecutorService の実装を利用するか指定する必要があります。

では、Future をとりあえず動かしたいときに使うExecutionContext.global はどのような ExecutorService を利用しているのでしょうか?ExecutionContext について少しドキュメントを読んでみましょう。

Scala Standard Library 2.13.3 - scala.concurrent.ExecutionContext

The default ExecutionContext implementation is backed by a work-stealing thread pool. It can be configured via the following scala.sys.SystemProperties:

scala.concurrent.context.minThreads = defaults to "1" scala.concurrent.context.numThreads = defaults to "x1" (i.e. the current number of available processors * 1)

scala.concurrent.context.maxThreads = defaults to "x1" (i.e. the current number of available processors * 1)

scala.concurrent.context.maxExtraThreads = defaults to "256"

The pool size of threads is then numThreads bounded by minThreads on the lower end and maxThreads on the high end.

書いてある通りですが簡単にまとめると、こんな感じです。

  • 内部では ForkJoinPool( = work-stealing thread pool) を使用している
  • 並列数は max(256, CPUコア数) に設定される

全ての処理がCPU負荷のみでスレッドがIO待ちなどでブロックされない場合は、並列数=コア数でもそこまで問題にはならないですが、スレッドがブロックされる処理がある場合、ブロック中に他のスレッドの処理を実行したほうが効率が良いです。その場合はCPUコア数以上を設定する必要があります(他にも ExecutionContext.global をあらゆるところで使うと良くないのですが、ここでは一旦省略...参考記事を読んでいただけると)。

Scalaにおける ExecutionContext 周りはこの辺が参考になりました。

これから知りたいこと

  • なぜ ExecutionContext のデフォルトは ForkJoinPool を使用しているのか
    • とりあえずChatGPTに聞いてみた。

In Scala 2.13, the default implementation of ExecutionContext is ForkJoinPool. There are several reasons for this choice:

  1. ForkJoinPool is optimized for parallelism and work-stealing, which makes it a good fit for Future and Promise operations that require parallel execution.
  2. ForkJoinPool is a shared resource, which means that multiple ExecutionContexts can use the same ForkJoinPool instance. This reduces resource consumption and helps prevent contention for system resources.
  3. ForkJoinPool provides a good balance between concurrency and overhead. It is designed to efficiently manage large numbers of threads while minimizing context switching and other overhead.
  4. ForkJoinPool is a standard part of the Java concurrency API, which makes it a familiar and widely-used technology for Java and Scala developers.

Overall, the choice of ForkJoinPool as the default ExecutionContext implementation in Scala 2.13 reflects a balance between efficiency, flexibility, and ease of use

  • Akka の dispatcher はデフォルトで ForkJoinPool を利用しているが、それはなぜか。アクターモデルForkJoinPool の相性の良さとはどの辺なのか