このエントリは
Java Advent Calendar 2013 の第 9 日目です。
昨日は、
@nabedge さんの
Mixer2のSpringMVC連携機能がver 1.2.17でさらに進化!
明日は Satoyuki Tsukano さんです。
去年も一昨年も
Project Lambda について書いてきました。一昨年は Project Lambda の基本的なところ (まだこの頃は Stream がありませんでした)、去年は Stream の遅延処理についてです。
今年はこの Advent Calendar でも Project Lambda や Stream を扱っている人がいるぐらい、興味を持っている人が増えてきたと思います。
普通の Stream の使い方とかは、当たり前なので、ちょっと違う視点から書いてみようと思います (普通の Stream の使い方は ITpro の Java 技術最前線にいつか書きます)。
で、何を取り上げるかというと、パラレル処理です。去年はシリアルの方をやったので、残っていたほうです。
Stream のパラレル処理は Java SE 7 で導入された Fork/Join Framework がベースになっています。でも、どうやって Fork/Join Framework を使っているのかがよく分からないので、そこら辺を調べてみるということです。
ここでは、Fork/Join Framework がどうやってパラレル処理をするかまでは立ち入らずに、Stream がどのように Fork/Join Framework を使っているかだけにとどめておきます。
で、解析するコードはこちらです。
IntStream.range(0, 11)
.parallel()
.map(x -> x*2)
.reduce(0, (x, y) -> x+y);
0 から 10 までの整数の 2 乗和を求める処理です。
では、順番に処理を追っていきましょう。
- IntStream.range
- parallel
- map
- reduce
なお、解析には JDK 8b118 を使用しています。今後、コードが変化する可能性もあるので、ご了承ください。
というのも、去年使用した b64 と比較すると、実装がかなり変わっているからです。ここが違うということはいいませんけど、かなり変わっていました。これから変更される可能性はすくないですが、テストの過程で若干変化する可能性はあります。
IntStream.range メソッド
普通の Stream は Iterator インタフェースと同じように、ソースがあってそこから生成するのですが、いくつかそれとは違う生成法もあります。IntStream.range メソッドもそんなメソッドのうちの 1 つです。
そういえば、Stream と何も書いていない時は、Stream インタフェースと IntStream インタフェースなどのプリミティブ用 Stream を合わせた言い方にしています。ここのインタフェースを指す時は Stream インタフェースと記述し、オブジェクトであれば Stream オブジェクトと書きます。
ということで、Stream にはオブジェクト用の Stream インタフェースと、プリミティブ用の IntStream/LongStream/DoubleStream インタフェースがあるわけです。この中で IntStream インタフェースと LongStream インタフェースだけ range メソッドが定義されています。
range メソッドは引数が 2 つで、1 つ目が Stream がはじまるはじめの数、2 つ目が最後の数の 1 つ大きい数です。上のように range(0, 11) と書くと、0 から 10 までの IntStream オブジェクトが生成されます。
2 つ目の引数を含めたい時は rangeClosed を使います。
この range メソッドを使えば、for (int i = 0; i < 11; i++) { ... } のようなループを Stream で書き換えることができるはずです。
まぁ、それはいいとして、range メソッドの実装です。
public static IntStream range(int startInclusive, int endExclusive) {
if (startInclusive >= endExclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive,
endExclusive,
false),
false);
}
}
RangeIntSpliterator オブジェクトを作成して、StreamSupport.intStream メソッドをコールしています。
そういえば、Project Lambda ではインタフェースにデフォルトメソッドを書けるようになりましたが、static メソッドも書けるようになったのでした。ここでも、それが使われています。
ここで重要なのが RangeIntSpliterator クラスです。RangeIntSpliterator クラスは java.util.Spliterator インタフェースの実装クラスです。
Spliterator インタフェースは分割のためのインタフェースで、Stream を使う時には必ず出てきます。とはいっても、表にはあまり出てこないインタフェースなので、意識はしないと思います。
ここでは、そういう Spliterator インタフェースの実装クラスを使っていたとところにとどめておきましょう。
StreamSupport.intStream メソッドに移りましょう。
public static IntStream intStream(Spliterator.OfInt spliterator,
boolean parallel) {
return new IntPipeline.Head<>(
spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
intStream メソッドでは、IntPipeline.Head オブジェクトを生成しています。IntPipeline.Head クラスのスーパークラスが IntPipeline クラスです。そして、この IntPipeline クラスが IntStream インタフェースの実体とでもいうクラスです。
そして、IntPipeline クラスには 3 種類の内部クラスがあります。その 1 つが Head クラスです。いうなれば Stream のパイプラインの先頭ということだと思います。
最後に Head クラスのコンストラクタの最後に parallel を指定できるようですが、ここでは false が設定されています。
ということで、range メソッドで IntPipeline.Head オブジェクトが作成されることが分かりました。
parallel メソッド
先ほど生成した IntPipeline.Head オブジェクトは parallel の指定がされていなかったので、paralle メソッドでパラレル対応にします。とはいっても、パラレル処理用のクラスがあるわけではありません。
parallel メソッドは IntPipeline クラスではなく、そのスーパークラスの AbstractPipeline クラスで定義されています。
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
単純に parallel 変数に true を代入しているだけでした。
map メソッド
続いて、map メソッドです。この map メソッドに関しては、去年の blog で紹介したのとはずいぶん変化しています。
public final IntStream map(IntUnaryOperator mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<Integer>(
this, StreamShape.INT_VALUE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<Integer> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedInt(sink) {
@Override
public void accept(int t) {
downstream.accept(mapper.applyAsInt(t));
}
};
}
};
}
去年の実装では ValuePipeline クラスを使用していたのですが、今は StatelessOp オブジェクトを生成して返しています。この StatelessOp クラスは、先ほどの Head クラスと同様に IntPipeline クラスの内部クラスで、サブクラスになっています。
Stream の中間のパイプラインのうち、ステートを持たないものが StatelessOp クラスになるようです。map メソッドは順番は特に保持しなくてもかまわないので、StatelessOp クラスが選択されているのだと思います。
クラス名は変化していますが、処理自体は Sink インタフェースの匿名クラスで記述されていおり、処理を遅延させられるようになっています。
ちなみに Sink インタフェースは、java.util.functions.Consumer インタフェースのサブインタフェースで、連なった処理を記述するためのインタフェースのようです。
accept メソッドで downstream の accept メソッドをコールしているのは去年と同じですね。
reduce メソッド
ここまではパラレルに関する実質的な処理は表れませんでした。パラレル処理があるのは、パイプラインの途中過程ではなく、最後の処理になります。
去年も forEach メソッドの中でパラレルかどうかのフラグがあったところまでは見ていましたが、reduce でも同じような処理でパラレル化どうかを切り分けています。
では、reduce メソッドの実装を見ていきましょう。
public final int reduce(int identity, IntBinaryOperator op) {
return evaluate(ReduceOps.makeInt(identity, op));
}
ReduceOps クラスは ReduceOp オブジェクトを生成するためのファクトリクラスです。ReduceOp クラスは TerminalOp インタフェースの実装クラスで、パイプラインの最後の処理を表しています。
とりあえず、ReduceOps.makeInt メソッドを見てみましょう。
public static TerminalOp<Integer, Integer>
makeInt(int identity, IntBinaryOperator operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<Integer,
Integer, ReducingSink>,
Sink.OfInt {
private int state;
@Override
public void begin(long size) {
state = identity;
}
@Override
public void accept(int t) {
state = operator.applyAsInt(state, t);
}
@Override
public Integer get() {
return state;
}
@Override
public void combine(ReducingSink other) {
accept(other.state);
}
}
return new ReduceOp<Integer, Integer,
ReducingSink>(StreamShape.INT_VALUE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
makeInt メソッドの中で ReducingSink クラスを定義しています。このクラスも Sink インタフェースの実装クラスです。とりあえず、combine メソッドと accept メソッドがあったということだけ覚えておきましょう。
そして、この ReducingSink オブジェクトを生成する makeSink メソッドを定義した ReduceOp オブジェクトを返しています。
では、再び IntPipeline クラスの reduce メソッドに戻って、evaluate メソッドを追ってみましょう。このメソッドは IntPipeline クラスではなく、スーパークラスの AbstractPipeline クラスで定義されています。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(
this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(
this, sourceSpliterator(terminalOp.getOpFlags()));
}
ここでやっとパラレルかどうかで振り分けを行っています。
evaluateParallel メソッドの引数で sourceSpliterator メソッドがコールされていますが、この戻り値は IntStream.range メソッドで使用した RangeIntSpliterator オブジェクトになります。ようするに、自分自身と Spliterator オブジェクトを引数にして、evaluateParallel メソッドをコールしているわけです。
この terminalOp 変数は、さきほど作成した ReduceOp オブジェクトを指しています。
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
evaluateParallel メソッドでは、ReduceTask オブジェクトを生成しています。コンストラクタの引数の this が ReduceOp オブジェクトで、helper 変数が IntPipeline オブジェクト、spliterator 変数が RangeIntSpliterator オブジェクトです。
なんたら Task というクラスが出てくると、なんとなくパラレルっぽいですね。
それもそのはず、ForkJoinTask <- CountedCompleter <- AbstractTask <- ReduceTask というクラス構成になっていて、Fork/Join Framework がようやく出てきました。
ちなみに、ReduceTask クラスは ReduceOps クラスの内部クラスになっていて、ほとんどの処理は AbstractTask クラスで行っているようです。
ReduceTask オブジェクトが生成できたら、invoke メソッドをコールします。invoke メソッドを定義しているのは ForkJoinTask クラスです。
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
if 文の中に入ってしまってますけど、重要なのは doInvoke メソッドです。doInvoke メソッドも ForkJoinTask クラスで定義されています。
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
externalAwaitDone();
}
処理を行っているのは、doExec メソッドです。doExec メソッドも ForkJoinTask に定義されています。
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
そして、処理は exec メソッドに飛ばされます。exec メソッドは CountedCompleter クラスで定義されています。
protected final boolean exec() {
compute();
return false;
}
で、compute メソッドに委譲されています。ついでに、戻り値は false なので、先ほどの doExec メソッドの if (completed) ... は実行されないことがわかります。
そして、compute メソッドを定義しているのは AbstractTask クラスです。
public void compute() {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
while (sizeEstimate > sizeThreshold
&& (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
task.setLocalResult(task.doLeaf());
task.tryComplete();
}
やっと、Fork/Join Framework っぽい書き方が出てきました。
Fork/Join Framework は分割統治法で、タスクを分割し、分割したタスクを fork していくという処理になります。このタスクを分割するという処に Spliterator が使われているわけです。
RangeIntSpliterator クラスの場合、単純にはじめと終わりの半分のところで分割しています。そして、makeChild メソッドでタスクを分割していますが、ReduceTask クラスの場合、単純に新しい ReduceTask オブジェクトを生成しているだけです。
ReduceTask オブジェクトが対象とする範囲は Spliterator オブジェクトが持っているので、これで大丈夫なわけです。
そして、左、右の順番で fork されています。fork するということは、パラレルに処理されることになります。fork すると、そのタスクの compute メソッドがコールされるので、再帰的にタスクを分割して処理できます。
タスクが十分に小さくなった段階で、doLeaf メソッドがコールされます。ReduceTask クラスでの doLeaf メソッドは次のようになります。
protected S doLeaf() {
return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}
先ほど ReduceOps クラスの makeInt メソッドで定義した ReduceOp クラスの makeSink メソッドがコールされているので、ReduceSink オブジェクトが生成されて、引数になります。
wrapAndCopyInto メソッドは AbstractPipeline クラスで定義されています。ちょっとはしょりますが、wrapAndCopyInto メソッドは内部的に copyInto メソッドをコールしています。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink,
Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
この中の Spliterator オブジェクトの forEachRemaining メソッドがパイプラインをさかのぼって、処理を行う部分です。この場合だと、map メソッドを行ってから、reduce メソッドを行います。
では、RangeIntSpliterator クラスの forEachRemaining メソッドを見てみましょう。
public void forEachRemaining(IntConsumer consumer) {
Objects.requireNonNull(consumer);
int i = from;
final int hUpTo = upTo;
int hLast = last;
from = upTo;
last = 0;
while (i < hUpTo) {
consumer.accept(i++);
}
if (hLast > 0) {
// Last element of closed range
consumer.accept(i);
}
}
やっと、accept メソッドが出てきました。ここで、まず map メソッドで作成した StatelessOp オブジェクトの accept メソッドがコールされ、そしてその後 ReducingSink クラスの accept メソッドがコールされるわけです。
ところで、map メソッドはパラレル処理でしやすいですけど、reduce メソッドはどうするのか思いますよね。
タスクを分割した最後のところは 0 と IntStream オブジェクトの個々の要素の map メソッドの戻り値を足し合わせます。そして、forEachRemaining メソッドを抜けて、compute メソッドまで戻ります。
そして、compute メソッドの最後の task.tryComplete() が実行されます。
ReduceTask クラスの onCompletion メソッドは次のようになっています。
public void onCompletion(CountedCompleter<?> caller) {
if (!isLeaf()) {
S leftResult = leftChild.getLocalResult();
leftResult.combine(rightChild.getLocalResult());
setLocalResult(leftResult);
}
// GC spliterator, left and right child
super.onCompletion(caller);
}
leftResult 変数の結果と rightChild 変数の結果を combine メソッドで統合しています。この combine メソッドは ReducingSink の combine メソッドです。先ほど ReducingSink クラスは示しましたけど、もう一回出します。
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
combiner 変数の型は BinaryOperator インタフェースです。つまり、Lambda 式で記述している部分は、BinaryOperator インタフェースの apply メソッドなわけです。つまりここでは、単純に足し算をしています。
ということで、0 と map メソッドの戻り値を足した右と左のタスクが、その上の段階で足し合わされるわけです。これがさらに統合されて、最終的にすべての足し算が行われるというわけです。
というわけで、分割統治でパラレルに reduce メソッドを行っていく処理を追ってみました。
今回は reduce メソッドで見てきましたが、他のメソッドではまた違う方法でパラレルに処理しています。特に順番が決まっている処理の場合、パラレル処理をするための工夫があるので、もし興味があるようでしたらぜひ見てみてください。
ふー、長かった (笑)。