- はじめに
- ExecutorService
- ThreadPoolExecutor
- ForkJoinPool
- ThreadPoolExecutor vs ForkJoinPool
- ExecutionContext
- これから知りたいこと
はじめに
Scalaの ExecutionContext
について調べていたのですが、JVMのスレッドプール周りの知識が必要だと思ったので、一通り調べた内容をまとめます。
全体として『Javaパフォーマンス』を参考にしつつ、適宜参考資料を添付します。
ExecutionContext
とスレッドプール周りの関係は簡単に以下のように表せます。
ExecutionContext
はインターフェースとしてExecutor
を利用していますが、実際には Executor
を拡張した ExecutorService
を利用することが多いため、ExecutorService
から調べていこうと思います。
ExecutorService
まず、JVMでスレッドやタスクの管理を担っている ExecutorService
について知る必要があります。
ExecutorService
とは、スレッドを利用してタスクを実行するために抽象化されたインターフェースです。利用者は ExecutorService
を通して操作するため、スレッド・タスク・キューを直接操作する必要がありません。ExecutorService
は下図のような仕組みで並列実行を管理してくれます。
- スレッドで実行される単位をタスクと呼ぶ(JavaだとRunnableインタフェースを継承しているやつ)
- グローバルなタスクキューがあり、タスクが追加されていく
- 各スレッドはタスクキューからタスクを追加し、実行する
- 実行中はそのスレッドに対して新しいタスクは追加されない
- あるスレッドが実行中のタスクが終了した場合、タスクキューから新しいタスクが追加される
ExecutorService
がどのようにしてスレッドとタスクを管理しているのかについては、次の動画を見るのがおすすめです。
ExecutorService
の実装には大きく分けて二つの種類があります。 ThreadPoolExecutor
と ForkJoinPool
です。
ThreadPoolExecutor
ThreadPoolExecutor (Java Platform SE 8 )
ThreadPoolExecutor
とはスレッドプールを管理する仕組みで、ExecutorService
を実装しているクラスです。スレッドプールを用いることで、パフォーマンス向上とリソースの最適化を図ります。以下公式ドキュメントより引用
スレッド・プールでは、2つの問題に対処します。まず、タスク当たりの呼出しオーバーヘッドが減少するため、通常は大量の非同期タスクの実行時にパフォーマンスが向上します。また、タスクのコレクションを実行するときに消費されるリソース(スレッドを含む)の境界設定および管理のための方法を提供します。各
ThreadPoolExecutor
も基本的な統計情報(完了したタスクの数など)を保持します
タスクを保持するキューには種類があり、それぞれスレッドを追加する基準が異なります。ThreadPoolExecutor
を利用する際に明示的にキューを直接指定することは少ないですが、キューの種類に応じてスレッドプールの挙動が異なります。Executors.newCachedThreadPool()
, Executors.newFixedThreadPool(int)
, Executors.newSingleThreadExecutor()
のように適切なスレッドプールの挙動を指定して ThreadPoolExecutor
を初期化する必要があります。
キューとスレッドプールの挙動の種類については、次の動画を見るのがおすすめです。
ForkJoinPool
ForkJoinPool (Java Platform SE 8 )
ForkJoinPool
とはJava7で追加された、スレッドプールを管理する仕組みで、ExecutorService
を実装しているクラスです。主に ForkJoinTask
を実行することを目的としています。ForkJoinTask
とは、分割統治のアルゴリズムに基づくタスクのことです。タスクが再帰的に分割が可能で、大量にサブタスクが生成される点が特徴です。
例えばフィボナッチ数の計算をする関数が例として挙げられます
def fibonacci(n: Int): Int = { if (n <= 1) n else fibonacci(n - 1) + fibonacci(n - 2) }
ForkJoinTask
を実装した FibonacciTask
を作成すると、こんな感じに書けます。サブタスクが再帰的に生成されているのがわかります。
// FibonacciTask(n: Int)#solve() の実装 if (n <= 1) { n } else { val f1 = FibonacciTask(n - 1) // サブタスク1 (fork) val f2 = FibonacciTask(n - 2) // サブタスク2 (fork) f1.solve() f2.folve() f1.result + f2.result // サブタスクの結果を結合 (join) }
ここで注意の注意点は、FibonacciTask(n)
が処理を進めるためには、 FibonacciTask(n - 1)
と FibonacciTask(n - 2)
の処理が終了するまで待つ必要があるという点です。
ThreadPoolExecutor
を用いる場合、次の図のようになります。
FibonacciTask(n)
をスレッド1が実行する。サブタスクとしてFibonacciTask(n - 1)
とFibonacciTask(n - 2)
をグローバルにあるタスクキューに追加する- スレッド2が
FibonacciTask(n - 1)
を実行する - スレッド3が
FibonacciTask(n - 2)
を実行する
この状態では、スレッド1は FibonacciTask(n - 1)
と FibonacciTask(n - 2)
が完了して結果を受け取るまで処理を進めることができず、新しいタスクを受け取ることもできません。
このような処理を効率的に行うために、ForkJoinPool
を使用します。ForkJoinPool
では、あるスレッドが新しいタスクを開始した後、そのタスクについて一時停止することができます。そして、そのタスクを一時停止している間、スレッドは他のタスクを実行することができます。
よって、タスクが再帰的に分割できる際は、数個の少ないスレッドによって大量のタスクを処理できます。これが ForkJoinPool
の利点です。
ForkJoinPool
では、グローバルに存在するタスクキューとは別に、スレッドごとにローカルのタスクキューを持っています。これにより、スレッドが生成したサブタスクをスレッド単位で管理することが可能になります。
ここまで ForkJoinPool
の役割とそれに適したタスクについて説明しました。もう一点、ForkJoinPool
の大きな特徴として、他のスレッドのタスクを奪うことができることが挙げられます。これを work stealing と呼んだりします。
スレッドごとにローカルのタスクキューを持っていますが、これが空になった時、他のスレッドのキューを参照して残っているタスクを実行することができます。以下公式ドキュメントより引用
ForkJoinPool
は、主にwork-stealingを使用する点で、他の種類のExecutorService
とは異なります。プール内のすべてのスレッドが、プールに送信されたタスク、他のアクティブなタスクによって作成されたタスク、あるいはその両方を見つけて実行しようとします(1つも存在しない場合は、最終的に作業の待機がブロックされます)。これにより、(ほとんどのForkJoinTask
と同様に)ほとんどのタスクが他のサブタスクを生成する場合や、外部のクライアントからプールに小さいタスクが数多く送信される場合に、効率的な処理が可能になります。特に、コンストラクタでasyncModeをtrueに設定した場合、ForkJoinPool
は結合されないイベント形式のタスクでの使用にも適している可能性があります。
これにより、あるスレッドがタスクの一つに長時間かかっている状態でも、別のスレッドが残りのサブタスクを実行することができます。
ForkJoinPool
の挙動については、次の動画を見るのがおすすめです。
ThreadPoolExecutor vs ForkJoinPool
さて、二種類の ExecutorService
を紹介しましたが、これはどのように使い分けるのが良いのでしょうか。ForkJoinPool
に適している状況を考え、その反対を ThreadPoolExecutor
に当てると考えると理解しやすいと思います。
ForkJoinPool
- 再帰的にタスクがサブタスクへと分解でき、
ForkJoinTask
として実装できるタスク - 各タスクのバランスがアンバランスであり、work stealing によって一連の処理を効率化できるタスク
- 再帰的にタスクがサブタスクへと分解でき、
ThreadPoolExecutor
- IOブロックのような、スレッドを止めてしまうようなタスク
- これは
ForkJoinPool
の利点をあまり受けられない、という理由が大きい。スレッドが止まっているので、work stealing もできないし、他のサブタスクを実行することもできない - 例えば、Slick がIOブロックを管理している
AsyncExecutor
内部でもThreadPoolExecutor
が利用されている。Slick 3.4.0 - slick.util.AsyncExecutor:slick.util.AsyncExecutor)
- これは
- 各タスクのバランスが均等で、どのタスクも当程度の時間がかかると見通されるタスク
- IOブロックのような、スレッドを止めてしまうようなタスク
この辺の使い分けはあまり言及がないのですが、この辺を参考にしました。
- ForkJoinPool vs ThreadPoolExecutor - 映画は中劇
- java - ThreadPoolExecutor vs ForkJoinPool: stealing subtasks - Stack Overflow
- Java - ForkJoinPool vs ExecutorService - GeeksforGeeks
ExecutionContext
Scala で非同期実行の際に必要となる ExecutionContext
は、これまで説明してきた ExecutorService
と同じような働きを持ちます。ExecutionContext
を自作する際は、どの ExecutorService
の実装を利用するか指定する必要があります。
では、Future
をとりあえず動かしたいときに使うExecutionContext.global
はどのような ExecutorService
を利用しているのでしょうか?ExecutionContext
について少しドキュメントを読んでみましょう。
Scala Standard Library 2.13.3 - scala.concurrent.ExecutionContext
The default
ExecutionContext
implementation is backed by a work-stealing thread pool. It can be configured via the following scala.sys.SystemProperties:
scala.concurrent.context.minThreads
= defaults to "1"scala.concurrent.context.numThreads
= defaults to "x1" (i.e. the current number of available processors * 1)
scala.concurrent.context.maxThreads
= defaults to "x1" (i.e. the current number of available processors * 1)
scala.concurrent.context.maxExtraThreads
= defaults to "256"The pool size of threads is then
numThreads
bounded byminThreads
on the lower end andmaxThreads
on the high end.
書いてある通りですが簡単にまとめると、こんな感じです。
- 内部では
ForkJoinPool
( = work-stealing thread pool) を使用している - 並列数は
max(256, CPUコア数)
に設定される
全ての処理がCPU負荷のみでスレッドがIO待ちなどでブロックされない場合は、並列数=コア数でもそこまで問題にはならないですが、スレッドがブロックされる処理がある場合、ブロック中に他のスレッドの処理を実行したほうが効率が良いです。その場合はCPUコア数以上を設定する必要があります(他にも ExecutionContext.global
をあらゆるところで使うと良くないのですが、ここでは一旦省略...参考記事を読んでいただけると)。
Scalaにおける ExecutionContext
周りはこの辺が参考になりました。
- Scala ExecutionContextって何 / Futureはスレッド立ち上げじゃないよ - ましめも
- Future内でThread.sleepはするな - ましめも
- ScalaのFutureに渡す ExecutionContext の挙動がわかりにくいので図解する - セカイノカタチ
これから知りたいこと
- なぜ
ExecutionContext
のデフォルトはForkJoinPool
を使用しているのか- とりあえずChatGPTに聞いてみた。
In Scala 2.13, the default implementation of
ExecutionContext
isForkJoinPool
. There are several reasons for this choice:
ForkJoinPool
is optimized for parallelism and work-stealing, which makes it a good fit forFuture
andPromise
operations that require parallel execution.ForkJoinPool
is a shared resource, which means that multipleExecutionContexts
can use the sameForkJoinPool
instance. This reduces resource consumption and helps prevent contention for system resources.ForkJoinPool
provides a good balance between concurrency and overhead. It is designed to efficiently manage large numbers of threads while minimizing context switching and other overhead.ForkJoinPool
is a standard part of the Java concurrency API, which makes it a familiar and widely-used technology for Java and Scala developers.Overall, the choice of
ForkJoinPool
as the defaultExecutionContext
implementation in Scala 2.13 reflects a balance between efficiency, flexibility, and ease of use
- Akka の dispatcher はデフォルトで
ForkJoinPool
を利用しているが、それはなぜか。アクターモデルとForkJoinPool
の相性の良さとはどの辺なのか