このエントリーは Java Advent Calendar の9日目のエントリーです。
qiita.com
Virtual ThraedはJava 19でPreview (JEP 425)、Java 20でSecond Preview (JEP 436)となり、うまくいけば次のLTSであるJava 21で導入予定です。
パフォーマンスを考える時に、一般的にはスループットと応答性の2つがあります。スループットは単位時間あたりにどのくらいリクエストをさばけるか、応答性は処理のリクエストから結果が帰るまでの時間です。Virtual Threadのこの2者のうち、スループットを向上させるために導入されます。
では、なぜ今になってVirtual Threadが導入されるのかということを、歴史を振り返りながら考えてみるのがこのエントリーです。
いにしえの時代 - Java 1.0からJ2SE 1.4
Javaが発表されたのが1995年。その当時の代表的なCPUといえばIntel Pentiumです。Pentiumのリリースが1993年、Pentium Proが1995年、Pentium IIが1997年です。
つまり、その当時のPCはほとんどがシングルCPU、もちろんシングルコアです。
そんな時代にあって、Javaはマルチスレッドを標準の言語機能としたことは驚きです。当然ながら、その当時はコアは1つなのでパラレル処理ではなく、コンカレント処理になります。
Javaのスレッドは、OSのスレッドに対応します(初期のSolarisはちょっと違いましたが、J2SE 1.2でOSのスレッドを使うようになりました)。Javaのスレッドが複数あったとしても、CPUは1つなのでOSのスレッドは1つです。したがって、Javaのスレッドをすべて実行するにはスレッド切り替えが必要です。このスレッドの切り替えのことをコンテキストスイッチと呼びます。
このコンテキストスイッチは比較的重い処理なので、頻繁に行うと性能劣化の原因になります。
とはいうものの、複数のスレッドを実行するには何らかの実行スケジューリングが必要になります。スレッドのスケジューリングにはラウンドロビンやタイムシェアリングなどがあります。
Javaのそれは単純で、どのようにスレッドが実行されるかはJVMSには明記されていません。スレッドに優先度も設定できますが、優先度が高いからといって優先的に実行されるとは限りません。
また、やっかいだったのが、タスクを記述するためのRunnableインタフェースは返り値を返せないところです。そのため、処理の結果は複数のスレッドでアクセスできるように同期化したオブジェクトを用意する必要がありました。
同期化のために使用できたのがsynchronizedブロック(もしくはsynchronizedメソッド)です。
synchronizedブロックはモニタというロックを使用して同期化を行います。synchronizedメソッドの場合、ロックするオブジェクトは指定しませんが、thisがロック対象になります。
たとえば、スレッド間のやりとりによく使用されるブロッキングキューを作成してみましょう。
ブロッキングキューはキューがいっぱいの時に要素を追加しようとすると、要素が追加できるまで処理をブロックします。同様に要素を取り出そうとした時に要素がなければ、要素が追加されるまで処理をブロックするキューです。
Javaが標準で提供しているブロッキングキューのArrayBlockingQueueクラスなどはかなり複雑なので、ここではキューのサイズが1つのシンプルなブロッキングキューを作ってみます。
キューがいっぱいの時や、空の時に待ち状態にするにはwaitメソッドを使用します。そして、Object.wait状態にあるスレッドを起こすには、Object.notifyAllメソッドを使用します。notifyメソッドもあるのですが、notifyメソッドだと1つの目的のスレッドを起こせるとは限らないので、通常はすべてのスレッドを起こすnotifyAllメソッドを使用します。
public class SingleBlockingQueue<T> {
private T x;
public synchronized void push(T x) throws InterruptedException {
while (this.x != null) {
System.out.println("Wait - push: " + x);
wait();
System.out.println("Wake up - push");
}
System.out.println("NotifyAll - push: " + x);
this.x = x;
notifyAll();
}
public synchronized T pull() throws InterruptedException {
while (this.x == null) {
System.out.println("Wait - pull");
wait();
System.out.println("Wake up - pull");
}
T result = x;
x = null;
System.out.println("NotifyAll - pull: " + result);
notifyAll();
return result;
}
}
notifyAllメソッドで起こされるのはすべてのスレッドなので、本当に起こされるべきスレッドではないこともあります。そこで、while文でチェックを毎回行っているわけです。
では、このSingleBlockingQueueクラスを使ってみましょう。
public class QueueTest {
static SingleBlockingQueue<String> queue = new SingleBlockingQueue<>();
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
try {
queue.push("A");
queue.push("B");
queue.push("C");
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
queue.pull();
queue.pull();
queue.pull();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
これを実行すると次のようになります。
> java QueueTest
NotifyAll - push: A
Wait - push: B
NotifyAll - pull: A
Wait - pull
Wake up - push
NotifyAll - push: B
Wait - push: C
Wake up - pull
NotifyAll - pull: B
Wait - pull
Wake up - push
NotifyAll - push: C
Wake up - pull
NotifyAll - pull: C
Aをpushした後、Bをpushしようとしますが、すでに要素がキューにあるのでwaitします。すると、別スレッドがAをpullし、notifyAllメソッドをコールします。
キューに要素がなくなったので、waitしていたスレッドがnotifyAllで起き、Bをpushします。
といような流れで処理が進みます。
このように、synchronizedを使用するとアクセスできるスレッドを制限することができます。しかし、あるスレッドがsynchronizedブロックを実行している間、他のスレッドは処理待ち状態になってしまいます。
この当時は、想定しているスレッド数はたかがしれていました。しかし、同時に動作できるスレッド数が増えてくると、synchronizedブロックがボトルネックになってしまいます。
Java SE 1.0から1.4までの時代をまとめると
- スレッドを実行するには、OSのスレッドとの対応付けが必要
- スレッドの実行スケジューリングはシンプルなものしかなかった
- 必要に応じて、明示的に他のスレッドに処理を移す処理を記述する
- タスクはRunnableインタフェースで記述するため、処理の結果を直接返すことはできない
- 複数のスレッドからアクセスするにはsynchronizedが必要
マルチコア黎明期 - Java 5, Java 6
Java 5がリリースされたのは2004年、Java 6は2006年。このころ、ちょうどAMDやIntelがマルチコアのCPUをリリースし始めたころです。
AMDがAthlon64 X2で先行し、Intelが1つのパッケージに2つのダイをのせたPentium Dをリリースし、その後にちゃんとしたマルチコアのCore 2をリリースしたなんてこともありました。また、Intelはハイパースレッディングで1つのコアで2つのスレッドを処理できるようになっています。
それに合わせたわけではないと思いますが、Java 5ではご存じの通りConcurrency Utilities (java.util.concurrentパッケージ)が導入され、マルチスレッドに関するAPIが大幅に拡充されました。
主なAPIをあげてみましょう
- スレッドプール
- 非同期処理を結果を返すCallableとFuture
- 多様なロックのAPI
- スレッドセーフなコレクション
- プリミティブ型に対するアトミック処理
スレッドプールであるExecutorServiceインタフェースが導入されたことで、スレッドとタスクが切り離され、スレッドを直接生成する必要はなくなりました。
Executorsクラスが作成するスレッドプールはJava 5では4種類ありましたが、一般的には以下の2つのメソッドで作成するスレッドプールを使用します。
- newFixedThreadPoolメソッド
- newCachedThreadPoolメソッド
newFixedThreadPoolメソッドで作成するスレッドプールはメソッド名の通り、指定した数しかスレッドを作成しません。通常はマシンのトータルのCPUのコア数よりも少ないスレッド数を指定します。トータルのコア数を超えるスレッドを作成してしまうとコンテキストスイッチが発生してしまいます。
スレッドに割り当てられないタスクはキューで管理されます。タスクがスレッドに割り当てられれば、コンテキストスイッチが発生しないので応答性を高めることができます。
一方のnewChachedThreadPoolメソッドではタスクが登録された時にスレッドが余っていれば新たにスレッドを作成することはありません。しかし、余っているスレッドがない場合はスレッドを作成して、タスクに割り当てます。
このスレッドプールはWebアプリケーションなどでリクエストごとにスレッドを作成するThread per Request方式で使われることが多くあります。Thread per Requestはスループットを向上させることが可能です。しかし、応答性は低下します。また、リクエストごとにThreadオブジェクトを生成してしまうので、ヒープの消費も大きくなります。
ExecutorServiceインタフェースで扱えるタスクには、Runnableインタフェースに加え、Callableインタフェースが導入されました。Callableインタフェースを使うと非同期処理の結果を返せるようになります。これに伴い、非同期処理の結果を取得するためのFutureインタフェースが導入されています。
こんな感じで使います。
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<String> future = pool.submit(new Callable<>() {
@Override
public String call() throws Exception {
String result = // 何らかの処理
return result;
}
});
String result = future.get();
pool.shutdown();
結果が返せるということは、複数のスレッドで結果をやりとりするためにだけ使っていたsynchronizedブロック(もしくはsynchronizedメソッド)を使用しなくてもいいということです。synchronizedが減らせれば、マルチスレッドのボトルネックを減らすことができます。
ただし、Future.getメソッドは、非同期処理が完了して値が戻るまでブロックしてしまいます。この点は気を付ける必要があります。
さらに、セマフォなどのロックのAPIが導入されています。
特にReentrantLockクラスはsynchronizedと同様の動作をするので、synchrnozedを減らすことができます。ReentrantLockクラスは実行時に必要のないロックはロックを外すなどの最適化が行われるため、synchnorzedよりも効果的です。
他のスレッドセーフなコレクションやアトミック処理APIはスレッド数が少ない場合は有効に使えるのですが、多くのスレッドが同時にアクセスするような場合はボトルネックになりがちです。この当時はまだ同時に扱えるスレッドが少なかったので、よかったのですが、コア数が増えて同時に動作するスレッドが増えれば増えるほど、使い方を考えなくてはいけません。というか、多くのスレッドが同時にアクセスするような場合は、なるべく使わない方が賢明です。
さて、Java SE 5, 6時代をまとめると。
- スレッドプールは応答性重視とスループット重視の2種類
- Callable/Futureの導入で非同期処理の結果を返せる
- 非同期処理の結果の受け渡しのためにだけ使用していたsynchronizedは減らせる
- synchronizedではなく、ロックAPIを使用することでロックの最適化が可能
マルチコア実用期 - Java SE 7
Java 6がリリースされた後、ラムダ式導入でもめたり、Sun MircrosystemsがOracleに買収されたりなどあって、Java 7のリリースは大幅に遅れ、2011年にやっとリリースされました。結局、ラムダ式はJava 8に延期されたのはご存じのとおりです。
この当時、インテルは2010年に6コアのCPUをリリース、一方のAMDは2011年に8コアのCPUをリリースしています。ノートPCでもマルチコアが当たり前になってきたのもこの頃ですね。
さて、ラムダ式がJava 8にスリップしたということで、Java 7ではあまり大きな変化はないように思えますが、マルチスレッドに関しては大きな変化がありました。それがFork/Join Frameworkの導入です。
Fork/Join Frameworkは主に計算処理や再帰処理の効率化を行うために導入されました。たとえば、ソートや行列計算などです。
たとえば、Java SE 6まで標準APIで使われていたマージソートを考えてみましょう。
マージソートはソートを行う範囲を半分に分割していき、ソートする範囲を小さくしていきます。小さい範囲でまずソートを行い、それを組み合わせて徐々に大きい範囲のソートを行うアルゴリズムです。
このような処理の方法を分割統治法と呼びます。
現在のJavaの標準APIのソートはティムソートに変更されていますが、ティムソートも分割統治法を用いたソートアルゴリズムです。
分割統治法で分割したタスクを非同期に実行するようにすれば、マルチスレッドで処理することができます。しかし、Java 6までのスレッドプールはタスクスケジューリングが単純で、多くのタスクを非同期に実行させなければならない分割統治法では有効にスレッドを活用することができません。
そこで、Fork/Join Frameworkでは、Work Stealingを採用したタスクスケジューリングが導入されました。
それまではタスクを1つのキューで管理していましたが、Work Stealingではスレッドごとにタスクキューを持ちます。このキューはFIFOではなく、両端キューで構成されます。
スレッドが自身のタスクキューからタスクを取り出す場合は、キューの先頭から取り出します。
もし、あるスレッドのタスクキューが空になった場合、そのスレッドは他のスレッドのタスクキューからタスクを取り出す(他のスレッドから取り出すのでSteal、つまり盗むということです)ことができます。この時、タスクキューの先頭から取り出すのではなく、キューの最後から取り出します。
分割統治法では大きい粒度のタスクの処理中に分割したタスクを再びキューに登録していきます。このことは、タスクキューの先頭に近いほどタスクが大きく、最後になればなるほどタスクが小さくなることを示しています。
このため、他のスレッドが盗んでいくタスクは小さい粒度で短時間で処理ができるはずです。このようにすることで、複数のスレッドで効率よく分割したタスクを処理していくことができます。
Work Stealing方式のスレッドプールは、Java 8で導入されるパラレルストリームでも使用されます。
Fork/Join Frameworkは計算処理の効率化はできるものの、Webアプリケーションのような通信やDBへのアクセスがあるような用途には向いていません。しかし、Work Stealing方式のタスクスケジューリングを含むスレッドプールは、Fork/Join Framework以外の用途でも使われていくようになりました。
このスレッドプールはForkJoinPoolクラスで使用できます。デフォルトではトータルのCPUのコア数だけスレッドを生成します。このため、タスクスイッチは頻繁に行うものの、スレッドのコンテキストスイッチは抑えることができます。
また、Executors.newWorkStealingPoolメソッドでもForkJoinPoolオブジェクトを取得できます。ただし、このメソッドはJava 8以降で使用することができます。
マルチコア熟成期 - Java SE 8
Java SE 8は2014年にやっとリリースされました。Java 8で最も注目されたのがProject Lambda、つまりラムダ式とストリームです。マルチスレッド的にはパラレルストリームの導入もあります。
しかし、ここで取り上げるのはパラレルストリームではなく、CompletableFutureです。く
Java 7のFork/Join Frmeworkで計算処理は効率よく行うことが可能になりました。しかし、通信やDBアクセスなどのI/O処理を含むようなタスクには向いていません。
I/O処理は処理をブロックします。たとえば、Readerオブジェクトから読み込む処理はどうでしょう。
try (BufferedReader reader = new BufferedReader(...)) {
int c = reader.read();
}
readメソッドは文字が読み込めるまでブロックします。Readerオブジェクトがソケットから生成されていれば、通信が届かない限り文字を読み込めません。その間、ブロックします。
I/O処理はCPUの計算時間に比べると多大な時間を使います。しかし、ブロックしてしまうと、その間CPUは遊んでしまうわけです。
そこで、I/O処理を非同期に行うことが考えられます。たとえば、こんな感じです。
Future<String> future = pool.submit(() -> {
// I/O読み込み処理
return contents;
});
// 非同期処理の結果を受け取る
String contents = future.get();
お分かりかと思いますが、これでは非同期処理の利点を生かすことができません。というのも、Future.getメソッドが非同期処理が完了するまでブロックしてしまうからです。
そこで、導入されたのがCompletableFutureクラスです。
非同期で行いたいような処理は1つだけということはあまりなく、複数の処理を連ねることが多いのではないでしょうか。
たとえば、Webアプリケーションであれば
- リクエストを受けて何らかのロジック処理を行う
- DBへのクエリを投げる
- クエリ結果からレスポンスを作成し、送信する
のような感じです。もちろん、こんな単純なことで済まないことも多いとは思いますが、こんな感じで処理が進むという例のつだと思ってください。
Thread per Requestでリクエストごとにスレッドを生成して、非同期に1, 2, 3の処理を行えばスループットは向上します。しかし、2のDB処理は完了するまでブロッキングしてしまいます。そのため、ブロッキングしている間、CPUは遊んでしまいます。
その間にも、リクエストがあればスレッドがどんどん増えるばかりで、あっという間にスループットは飽和してしまいます。そこで、DB処理も非同期にして... とやっていると先ほどのFuture.getメソッドのようにまたブロッキングを増やしてしまうかもしれません。
CompletableFutureクラスを使うと、このような一連の処理をラムダ式で記述し、メソッドチェーンで記述することができます。
たとえば、リクエストを受けるメソッドがhandleメソッドだとしてみましょう。引数の型はRequestクラスとResponseクラスだとします。
void handle(final Request request, final Response response) {
record Param<S>(S s, Response response) {}
CompletableFuture
.completedFuture(new Param(request, response))
.thenApply(param -> {
// 1. ロジック処理
return new Param(logicResult, response);
})
.thenApplyAsync(param -> {
// 2. 非同期でDBへのクエリ
return new Param(queryResult, response);
})
.thenAccept(param -> {
// 3. レスポンスの作成、送信
param.response().response(responseContents);
});
}
ここではタプル的に2つのオブジェクトを扱うためにレコードクラスのParamクラスを作りましたが、これが必須というわけではないです。
そのParamオブジェクトを引数としてcompletedFutureメソッドでCompletableFuture生成しています。その後のメソッドチェーンで処理を連ねていきます。
thenが先頭につくメソッドは前段までの処理が完了した時に引数のラムダ式がコールバック的に呼ばれるメソッドです。ApplyやAcceptはラムダ式の引数や返り値によって異なります。
メソッド名の最後がAsyncのメソッドは、引数のラムダ式がさらに非同期に実行されることを示します。したがって、1.のロジック処理と2.のDBクエリが同じスレッドで実行されるわけではないことを示します。
このように、一連の処理を同期、非同期を交ぜながら記述できるのが、CompletableFutureクラスの特徴です。また、CompletableFutureクラスはデフォルトでForkJoinPoolを使用するので、スレッド数は抑えたまま、効率的に実行することができます。
このCompletableFutureクラスを使えば、Thread per Requestに比べるとスループットの飽和も抑えることができます。
しかし、残念ながら、いまだにCompletableFutureクラスはそれほど使われていません。
単にシーケンシャルに処理を記述していくのに比べると、CompletableFutureクラスで処理を記述するには考え方を変えなければいけません。処理のキャンセルや、例外の扱いなどが煩雑になるという点もあります。
さらに、CompletableFutureクラスのデバッグは難しいのです。
ストリームのデバッグをしたことがある方は分かると思うのですが、ラムダ式が含まれるとスタックトレースが読みにくくなります。
しかも、CompletableFutureクラスは処理を記述したラムダ式が同じスレッドで動作するわけでありません。スタックトレースのスタックは、スレッドが持つスタックのことです。スレッドが異なればスタックトレースは別ものになります。このため、CompletableFutureクラスにおいて、ラムダ式で記述した処理で例外が発生したとしてもスレッドが異なるため、スタックトレースも別々になり、一連の処理の流れはスタックトレースからは追うことができません。
このように、CompletableFutureクラスは使いこなすうえで難しい点があります。しかし、CompletableFutureクラスを使いこなせるのであれば、スループットの高いシステムを実現することができます。
これはSpring WebFluxのようなリアクティブ系のフレームワークでも同じです。使いこなせるのであれば、高いスループット性能が可能です。
そして、CompletableFutureクラスでも、リアクティブなフレームワークでも使いこなせるのであれば、Virtual Threadは使わなくても大丈夫なはずです。
ここまでのJavaでのマルチスレッドの流れをまとめてみると
- Thread per Requestを使えば、スループットを上げることは可能だが、飽和するのも早い
- 計算処理など処理をブロックする要因がない処理であればFork/Join Frameworkもしくはパラレルストリームで応答性を向上することができる
- I/Oなど処理をブロックする要因がある場合はブロックする処理を非同期に処理する。そのためにCompletableFutureクラスやリアクティブ系のフレームワークが使用可能
- しかし、CompletableFutureクラスやリアクティブ系のフレームワークは使いこなすのが難しく、デバッグもやりにくい
ここらへんを何とかしてくれるのがVirtual Threadです。
Concurrency Utilities以降のJavaでは、自分でスレッドを生成することはなくなっているはずです。Virtual Threadも自分で生成させることはないはずです。
フレームワークなどがVirtual Threadに対応すれば、コードを変更しなくても自動的にVirtual Threadを使えるようになっているはず。
Springが早々にVirtual Threadに対応することを発表しているように、意外にVirtual Threadを使えるようになる日も近いかもしれません。
おまけ
わざわざ、Java 1.0のころの古いスレッドん使い方を説明したのは、ここで説明したことはほとんどがVirtual Threadではバッドマナーになるからです。
Virtual Threadは、Virtualではないスレッドの上で動く仮想スレッドです。しかし、Java 1.0のころのsynchronizedやwait/notifyなどの操作は仮想スレッドではなく、仮想スレッドが動作しているスレッドに対して行われます。
スレッドがブロックしてしまったりすると、スレッドの上で動作する複数のVirtual Threadすべてが影響を受けてしまいます。
ThreadクラスのいくつかのAPIは@deprecatedになり、使うことが非推奨になっています。
Threadクラスの非推奨(@deprecated)のAPI
- checkAccess
- countStackFrames
- getId
- resume
- stop
- suspend
これらのメソッドはいつ削除されても不思議はありません。
ちなみに、Java 8では使えていたdestroyメソッドとstop(Throwable)メソッドはすでに削除されています。
また、synchronizedブロック(メソッド)はロックAPIで書きかえられます。wait/notifyを使う場面はほとんどないとは思いますが、なるべくタスクは独立に処理できるようにし、これらを使わないような設計を考えていく必要があります。