このエントリーは Java Advent Calendar の20日目のエントリーです。
qiita.com
Java 19のJEP 425: Virtual Threadsの裏で、ひっそりと登場していたのが、JEP 428 Structured Concurrency です。Virtual ThreadはPreview JEPでしたが、Structured ConcurrencyはIncubator JEPです。
そして、Java 20では、Virtual ThreadsはSecond Previewとして JEP 436 になっています。同じようにStructured ConcurrencyもSecond Incubatorとして JEP 437 になりました。
JEP 428とJEP 437の違いはほとんどないので、このままいけば次のLTSであるJava 21で正式に導入されると思います。
動機
Structured Concurrencyは名前にConcurrencyとついていることから分かるように、並列/並行処理に関するAPIです。構造化した並列とはどういうことなのでしょう。
それを考えるために、ちょっとしたサンプルで考えてみます。
最近は、複数の外部のサービスを組み合わせて使うシステムが多くありますね。外部にあるということは、何らかのI/Oが発生して待ちがあるということです。
たとえば、HTTPで通信するWebサービスを2つ使用し、クエリ結果を組み合わせるようなサンプルを考えてみます。
HTTP通信が発生するので、逐次的に処理をしてしまうととても時間がかかってしまいます。そこで、クエリを非同期に行います。
すごい単純ですが、こんな感じ。
String query(final String queryText) throws InterruptedException, IOException { var client = HttpClient.newHttpClient(); var request = HttpRequest.newBuilder() .uri(URI.create(queryText)) .build(); var response = client.send(request, BodyHandlers.ofString()); var status = response.statusCode(); if (status / 100 != 2) { throw new IOException("HTTP Error: " + status); } return response.body(); } record Result(String result1, String result2) {} Response complexQuery(final String queryText1, final String queryText2) throws InterruptedException, ExecutionException { try (var pool = Executors.newFixedThreadPool(2)) { Future<String> future1 = pool.submit(() -> query(queryText1)); Future<String> future2 = pool.submit(() -> query(queryText2)); String result1 = future1.get(); String result2 = future2.get(); return new Result(result1, result2); } }
TwoTaskクラスではHTTPのGETでクエリーを行いますが、それぞれを非同期に行っています。クエリーのURLが引数のqueryText1とqueryText2です。
これを実行すると、特に問題がなければ普通に動くはずです。
しかし、このコードには複数の問題があります。
- 1番目のクエリーで例外が発生した場合、complexQueryメソッドを抜けてしまうが、2番目のクエリーは実行したままになってしまう
- complexQueryメソッド実行中に割り込み(Thread.interrupt()メソッドがコール)されるとメソッドを抜けるが、クエリーは実行したままになってしまう
- 1番目のクエリーを実行中に、2番目のクエリーで例外が発生しても、1番目のクエリーが完了しない限り(future1.get()が戻らない限り)、2番目のクエリーの例外を扱うことができない
正常に動作していればいいのですが、例外や割り込みが発生した時に、それらが正しくサブタスクに伝播されないことに問題があります。
もちろん、割り込みが発生したらサブタスクにも割り込みしたり、あるサブタスクで例外が発生したら他のサブタスクをキャンセルするという処理を書くこともできます。とはいうものの、こういうボイラープレート的な記述が増えてくるのは避けたいところです。
そこで、サブタスクを構造的に扱い、例外や割り込みなどを容易に扱えるようにしたのが、JEP 437 Structured Concurrencyなのです。
Structured Concurrency
Structured Concurrencyで提供されるメインのクラスはStructuredTaskScopeクラスです。
JEP 437はIncubator APIなので、今のところパッケージはjdk.incubator.concurrentですが、正式なAPIになった時にはパッケージは変更になります。たぶん、java.util.concurrentに含まれると思いますが、あくまでも予想です。
StructuredTaskScopeクラスは、名前の通りスコープです。そのスコープの中で作られたサブタスクを構造化するという使い方をします。
たとえば、先ほどのサンプルのcomplexQueryメソッドをStructuredTaskScopeクラスで書き直したものが以下になります。
Result complexQuery(final String queryText1, final String queryText2) throws InterruptedException, ExecutionException { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Future<String> future1 = scope.fork(() -> query(queryText1)); Future<String> future2 = scope.fork(() -> query(queryText2)); scope.join(); scope.throwIfFailed(); String result1 = future1.resultNow(); String result2 = future2.resultNow(); return new Result(result1, result2); } }
ここでは、StructuredTaskScopeクラスのサブクラスで内部クラスのShutdownOnFailureクラスを使用しています。
try-with-resources構文で記述されたブロックがStructuredTaskScopeクラスのスコープになるわけです。
このスコープの内部でサブタスクをforkし、サブタスクの完了を待つためにjoinしています。
その次のthrowIfFailedメソッドはShutdownOnFailureクラスのメソッドで、サブタスクで例外が発生した場合、他のサブクラスに伝播させタスクをキャンセルさせます。
ShutdownOnFailureというのはサブタスクで例外が発生した時に、サブタスクをキャンセルし、スコープをシャットダウンするためのクラスになります。
その後、サブタスクの結果を取得するために、getメソッドではなくresultNowメソッドを使用しています。Future.resultNowメソッドも新たに定義されたメソッドです。タスクが完了していれば結果を返し、キャンセルされた場合はキャンセルされた時点での結果を返します(結果がないこともあります)。
StructureTaskScopeクラスはVirtual Threadでの使用が前提になっており、デフォルトでVirtual Threadを使うようになっています。
なお、ここではforkしているのは2つのタスクだけですが、さらに多くのタスクをforkすることも可能です。
ところで、forkしてjoinするというAPIは他にもありましたね。
そう、Fork/Join Frameworkです。
前回のエントリーでも書きましたが、Fork/Join Frameworkは計算処理などI/O処理を含まないタスクがターゲットでした。一方で、Structured ConcurrencyはVirtual Threadを使うことからも分かるように、I/O処理を含むタスクにフォーカスしています。
この2つのAPIは用途に応じて使い分けるようにしましょう。
さて、このサンプルをコンパイルし、実行してみましょう。
Structured ConcurrencyはIncubator JEPなので、使用するにはオプションとして--enable-previewが必要です。コンパイル時に--enable-previewを使用するには--releaseも必要なので、こちらも指定します。
また、Incubatorのため、jdk.incubator.concurrentモジュールがデフォルトでは読み込まれません。そのために--add-modulesでモジュールを指定する必要があります。
このサンプルがTwoTaskクラスだったとした場合、次のようにコンパイル、実行を行います。
> javac --release 20 --enable-preview --add-modules jdk.incubator.concurrent TwoTask.java 警告:実験的なモジュールを使用しています: jdk.incubator.concurrent 警告1個 > java --enable-preview --add-modules jdk.incubator.concurrent TwoTask
せっかくなので、例外も出してみましょう。一方のクエリーだけ失敗するようなURLを使って実行してみると、単にExecutorServiceを使っていた場合、java.util.concurrent.ExecutionException例外がスローされるのに時間がかかります。
ところが、StructuredTaskScopeクラスを使った場合、すぐに例外がスローされます。これが例外が発生させた時に、他のサブタスクをキャンセルさせて、スコープをシャットダウンさせているためです。
ところで、StructuredTaskScopeクラスには、もう1つサブクラス兼内部クラスのStructuredTaskScope.ShutdownOnSuccessクラスがあります。
先ほどのShutdownOnFailureクラスは例外が出た場合はシャットダウンします。逆にいうと、例外が出ない場合は複数のサブタスクがすべて完了するまでjoinメソッドはブロックします。
これに対し、ShutdownOnSuccessクラスはサブタスクのうち、どれか1つが完了すれば、他のサブタスクはキャンセルして、シャットダウンします。
ShutdownOnFailureクラスはサブタスクのANDであるのに対し、ShutdownOnSuccessクラスはORになるとも言えますね。
たとえば、先ほどのcomplexQueryメソッドを早く完了したタスクの結果を返すように書きかえてみましょう。
String complexQuery(final String queryText1, final String queryText2) throws InterruptedException, ExecutionException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) { Future<String> future1 = scope.fork(() -> query(queryText1)); Future<String> future2 = scope.fork(() -> query(queryText2)); scope.join(); return scope.result(); } }
ShutdownOnFailureクラスと異なり、ShutdownOnSuccessクラスはサブタスクの結果を扱うため、ジェネリクスでサブタスクの戻り値の型を指定します。
最も早く完了したサブタスクの処理結果を取得するのがresultメソッドです。
他はShutdownOnFailureクラスと同じです。
Structured Concurrencyでは他に、サブタスク間で共有できる値を扱うScopedValueクラスがあるのですが、これはまた別の機会に紹介します。
0 件のコメント:
コメントを投稿