このエントリは Java Advent Calendar 2012 の第 1 日目です。
去年の Java Advent Calendar も第 1 日目を書いて、しかも Project Lambda についてでした。
今年も Project Lambda について書くわけですが、去年とはちょっと観点を変えようと思います。
今年の JavaOne で Java SE の一番のトピックといえば、やっぱり Project Lambda だったと思います。実際、Keynote でも Brian Goetz が登壇して Lambda の説明をしていますし、セッションも Lambda だけで 5 つもあったほどです。
実際に Brian Goetz の Lambda のセッションに出て思ったのは、Lambda の言語仕様はほぼ固まったということです。そして、開発の中心は Lambda の実装方法や、Lambda を使った API の方に移ってきています。
今年は櫻庭も JJUG で Project Lambda のハンズオンを行なったのですが、Project Lambda を使いこなすのは、やはり Lambda に対応した API を使いこなすことにあると感じました。
でも、その API についてはほとんど触れません。いちおう、ハンズオンから変化したことをあげておきます。
- インタフェースのデフォルトメソッドの記述が変更
- Stream インタフェースの導入
- パラレル処理の実装
現在公開されている build 64 ではデフォルトメソッドの記述はまだ変更されていないのですが、Lambda の Mercurial ではすでに記述法が変更されています。
具体的には default の書く位置が変更になりました。
public interface Foo {
void foo() default { .... }
}
| 変更
V
public interface Foo {
default void foo() { .... }
}
もう一方の Stream インタフェースが Lambda の API の中心になるインタフェースです。今までは filter メソッドや map メソッドなどは Iterable インタフェースで定義されていたのですが、これらはみな Stream インタフェースで定義されるようになりました。
Iterable インタフェースに残っているのは forEach メソッドぐらいです。
Stream オブジェクトを取得するには Streamable インタフェースの stream メソッドを使用します。てっきり、Iterable インタフェースに定義されるのかと思ったら、新たにインタフェースが導入されていたので、ちょっと意外でした。
Streamable インタフェースのサブインタフェースとして Collection インタフェースや Queue インタフェースなどがあります。
ハンズオンの時は、パラレル処理は Array に対してしか実装されていなかったのですが、本格的に実装が行なわれています。
さて、今日の本題は Stream インタフェースです。
といっても Stream インタフェースの使い方ではありません。ではなくて、Stream インタフェースの実装についてです。
というのも、Stream インタフェースで提供されているメソッドは遅延評価されるからなのです。これは Scala の Stream と同じですね。
Java で遅延評価というのはあまり例がないと思うので、どうやって実装されているか調べて見たよというのが、この blog エントリなわけです。
だから、Lambda 式についてはまったく解説しませんww Lambda 式については、ハンズオンの資料を見てください。
遅延評価
注) コメントで id:bleis-tift さんに指摘いただきましたが、ここでいっている遅延評価は正しくは遅延計算と呼ぶべきものです。この後の遅延評価と書かれている部分は遅延計算と読み直してください。とりえあず、このエントリではそのままにしておきますが、今後は安易に遅延評価という言葉を使用しないようにしようと思います。
例題として、整数のリストから偶数を取り出して標準出力に出力することを考えてみましょう。
Lambda 式と Stream インタフェースを使うと次のように書けます。
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);
// Stream オブジェクトの取得
Stream<Integer> stream = nums.stream();
// 偶数をフィルタリングしたStreamオブジェクトを生成
Stream<Integer> stream2 = stream.filter(s -> s%2 == 0);
// 標準出力に出力
stream2.forEach(s -> System.out.println(s));
これは内部イテレータで書いているわけですが、あえて外部イテレータで書き直してみました。
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);
List<Integer> nums2 = new ArrayList<>();
for (Integer s: nums) {
if (s%2 == 0) {
nums2.add(s);
}
}
for (Integer s: nums2) {
System.out.println(s);
}
するとループが 2 回になってしまうわけです。こんなことをするぐらいだったら、下のように 1 回のループで書けばいいわけです。
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);
for (Integer s: nums) {
if (s%2 == 0) {
System.out.println(s);
}
}
でも、内部イテレータで書くのは、処理が独立に記述できることや、処理順序をイテレータに任せられるとか、パラレル処理にしやすいとか、いろいろと利点があるわけです。
とはいうものの、filter メソッドや map メソッドのようなバルクオペレーションを連ねてしまうと、どんどんループが増えていってしまいます。ループが増えるということはパフォーマンスが落ちるということです。
これじゃ、せっかくバルクオペレーションで書けたとしても、魅力半減です。
そこで登場するのが、遅延評価です。
遅延評価は変数を実際に使用する時に、変数を評価することですが、いまいち意味が分かりません。
先ほどのコードで考えてみましょう。遅延評価を行うことで filter メソッドでは、実際にはループを行っていません。
ループを行うのは forEach メソッドだけになります。
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);
// Stream オブジェクトの取得
Stream<Integer> stream = nums.stream();
// ループしない。遅延評価の対象
Stream<Integer> stream2 = stream.filter(s -> s%2 == 0);
// ループする。
// s を出力するときにフィルタリングされるかどうかを評価する
// s がフィルタリングされるのであれば、出力しない
stream2.forEach(s -> System.out.println(s));
変数を使うときというのは、この場合 forEach メソッドで変数 s を出力するときになります。そこで forEach のループの時に、遡ってフィルタリングの処理も行われるのです。
この結果、遅延評価を行うことで、ループが 1 回で処理が完了することになります。
また、このことから、Stream インタフェースで定義しているすべてのメソッドが遅延評価を行うわけではないことも分かります。
では、どのメソッドが遅延評価され、どのメソッドが即時評価されるのでしょうか。
残念ながら、Stream インタフェースの Javadoc を見ただけでは、どれが遅延評価されるのか分かりません。なので、ソースを読んでチェックしてみました。遅延評価をされる主なメソッドを以下に示します。
- concat
- cumulate
- filter
- flatMap
- limit
- map
- skip
- sorted
- sequential
- tee
- uniqueElement
- unordered
逆に遅延評価されずに即時評価される主なメソッドも列挙しておきます。
- allMatch
- anyMatch
- findAny
- findFirst
- fold
- forEach
- groupBy
- noneMatch
- reduce
- reduceBy
- toArray
基本的には戻り値が Stream インタフェースのメソッドが遅延評価されると覚えておけば大丈夫です。
遅延評価の実装
では遅延評価をどのように実装するのでしょうか。
とりあえず、Project Lambda の API のクラス構成から見ていきましょう。なお、Project Lambda で追加される API は、バルクオペレーション API (Bulk Operation API)と呼ばれているので、ここでもそう呼ぶことにします。
まずは、filter メソッドや map メソッドの引数として Lambda 式で記述されるインタフェースです。これらのインタフェースは、java.uitl.funtions パッケージに定義されています。
主なものをあげると
- Block
- Combiner
- BinaryOperator
- Mapper
- Predicate
などがあります。たとえば、Block インタフェースは forEach メソッドの引数として使われます。
これに対し、これらのインタフェースで記述した処理を実行するクラス群が java.util.streams パッケージで定義されています。
Stream インタフェースも java.util.streams パッケージで提供されています。
Stream インタフェースを実装しているコンクリートが ValuePipeline クラスです。ValuePipeline クラスのスーパークラスは AbstractPipeline クラスなのですが、こちらは Stream インタフェースを実装していません。
また、Stream オブジェクトを生成するなどのユーティリティメソッドを定義しているのが Streams クラスです。
Streamable インタフェースのサブインタフェースの Collection インタフェースや Queue インタフェースが Streams クラスを使用して Stream オブジェクトを取得しています。
他のクラスは追々説明していきましょう。
さて、コードを追っていくとしても、やみくもに追っていってもしかたありません。そこで、次のコードの処理を見ていこうと思います。
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6);
Stream<Integer> stream1 = nums.stream();
Stream<Integer> stream2 = stream1.filter(s -> s%2 == 0);
Stream<Double> stream3 = stream2.map(s -> s/2.0);
stream3.forEach(s -> System.out.println(s));
filter メソッドと map メソッドという 2 つの遅延評価が行われるメソッドがあり、最後に即時評価が行われる forEach メソッドがあるというコードです。
Streamable#stream メソッド
まずは stream メソッドです。stream メソッドは Streamable インタフェースで定義されています。
Streamable インタフェースのサブインタフェースとして、Collection インタフェースなどがあります。Collection インタフェースのサブインタフェースにはここでも使用している List インタフェースや Set インタフェース、Queue インタフェースなどがあるので、これらのクラスはバルクオペレーションが可能です。
stream メソッドの実装はコンクリートクラスではなく、デフォルトメソッドを使用して Collection インタフェースに記述されています。
Stream<E> stream() default {
return Streams.stream(this, StreamOpFlags.IS_SIZED);
}
前述したように、今公開されているビルドは default を書く位置が違うので注意してください。
stream メソッドは単に Streams.stream メソッドをコールしているだけでした。Streams.stream メソッドはオーバーロードされているのですが、第 1 引数が T で、T が T extends Sized & Iterable<U> の stream メソッドがコールされます。
Sized インタフェースも Project Lambda で導入されたインタフェースで、サイズが決まっているコレクションをあらわしています。
public static<U, T extends Sized & Iterable<U>> Stream<U> stream(T entity, int flags) {
return new ValuePipeline<>(new Spliterator.Sequential<U>() {
@Override
public Iterator<U> iterator() {
return entity.iterator();
}
@Override
public void forEach(Block<? super U> block) {
entity.forEach(block);
}
@Override
public int getSizeIfKnown() {
return entity.size();
}
}, StreamOpFlags.IS_SIZED | flags);
}
Streams クラスの stream メソッドでは ValuePipeline オブジェクトを生成して、返しています。前述したように、ValuePipeline クラスは Stream インタフェースの実装クラスになっています。
そして、ValuePipeline クラスのコンストラクタはスーパークラスの AbstractPipeline クラスのコンストラクタをコールしています。
protected final AbstractPipeline<?, E_IN> upstream;
protected final IntermediateOp<E_IN, E_OUT> op;
protected final Spliterator<?> spliterator;
protected final int depth;
protected final int sourceFlags;
protected final StreamShape shape;
private Iterator<E_OUT> iterator;
protected AbstractPipeline(Spliterator<?> spliterator, int sourceFlags, StreamShape shape) {
this.upstream = null;
this.op = null;
this.spliterator = Objects.requireNonNull(spliterator);
this.depth = 0;
this.sourceFlags = sourceFlags;
this.shape = shape;
}
ここで注目しておいて欲しいのが、upstream というフィールドがあることです。upstream フィールドの型は AbstractPipeline クラスです。つまり、リスト構造的に AbstractPipeline オブジェクトが連なっていくことができるということです。
Stream#filter メソッド, Stream#map メソッド
では、次に filter メソッドと map メソッドです。この 2 つのメソッドは実質的にはほとんど変わりません。
filter メソッドと map メソッドを実装しているのは ValuePipeline クラスです。
@Override
public Stream<U> filter(Predicate<? super U> predicate) {
return chainValue(new FilterOp<>(predicate));
}
@Override
public <R> Stream<R> map(Mapper<? extends R, ? super U> mapper) {
return chainValue(new MapOp<>(mapper));
}
この chainValue メソッドをコールしているメソッドが遅延評価の対象になります。chainValue メソッドは AbstractPipeline クラスで実装されています。
protected<E_NEXT> Stream<E_NEXT> chainValue(IntermediateOp<E_OUT, E_NEXT> op) {
return new ValuePipeline<>(this, op);
}
なんと、ValuePipeline オブジェクトを生成しているだけです。ただし、先ほどの ValuePipeline オブジェクトの生成とはコンストラクタの引数が違います。
public ValuePipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T, U> op) {
super(upstream, op);
}
第 1 引数が upstream という変数名になっています。ということは...
protected AbstractPipeline(AbstractPipeline<?, E_IN> upstream, IntermediateOp<E_IN, E_OUT> op) {
this.upstream = Objects.requireNonNull(upstream);
this.op = Objects.requireNonNull(op);
this.spliterator = upstream.spliterator;
this.depth = upstream.depth + 1;
this.sourceFlags = upstream.sourceFlags;
this.shape = upstream.shape;
assert upstream.getShape() == op.inputShape();
assert (upstream.depth == 0) ^ (upstream.op != null);
}
やはり AbstractPipeline オブジェクトの upstream フィールドに代入されています。先ほどの指摘したように、Stream オブジェクトがこのようにして、リスト構造的に津ならなっていくわけです。
もう 1 つ、spliterator フィールドに upstream フィールドの spliterator フィールドが代入されています。
とすると、遅延評価のメソッドをいくつ連ねても、spliterator フィールドは一番始めの stream オブジェクトを作った時の spliterator フィールドが参照されるということになります。
Stream#forEach メソッド
最後に forEach メソッドです。ValuePipeline クラスの forEach メソッドを次に示します。
public void forEach(Block<? super U> block) {
pipeline(ForEachOp.make(block));
}
filter メソッドや、map メソッドは、実際に処理を行なうオブジェクトをラップするクラスのオブジェクトを生成していましたが、 forEach メソッドでは ForEachOp オブジェクトの生成ではなく、make メソッドをコールしています。
最後に Op がつくクラスは java.util.streams.op パッケージで定義されています。
private final TerminalSink<T, Void> sink;
private final StreamShape shape;
protected ForEachOp(TerminalSink<T, Void> sink, StreamShape shape) {
this.shape = Objects.requireNonNull(shape);
this.sink = Objects.requireNonNull(sink);
}
public static<T> ForEachOp<T> make(final Block<? super T> block) {
Objects.requireNonNull(block);
return new ForEachOp<>(new TerminalSink<T, Void>() {
@Override
public void accept(T t) {
block.apply(t);
}
@Override
public Void getAndClearState() {
return null;
}
}, StreamShape.VALUE);
}
ForEachOp クラスの make メソッドを見てみると、結果的には ForEachOp オブジェクトを生成していることが分かります。この時に TerminalSink オブジェクトもいっしょに生成していることが分かります。
そして、先ほどの forEach メソッドに戻って、pipeline メソッドをコールしていることが分かります。
public<R> R pipeline(TerminalOp<E_OUT, R> terminal) {
assert getShape() == terminal.inputShape();
return evaluate(terminal);
}
protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
// @@@ NYI If the source size estimate is small, don't bother going parallel
if (StreamOpFlags.PARALLEL.isKnown(sourceFlags)) {
return evaluateParallel(terminal);
}
else
return evaluateSequential(terminal);
}
はじめの assert 文はともかく、evaluate メソッドをコールしていることが分かります。なんとなく、それっぽいですね。
そして、evaluate メソッドではパラレルに処理するかどうかで処理を切り替えています。ここではシリアルに処理する方を見ていきましょう。
protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal) {
return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource());
}
SequentialImplePipelineHelperSource クラスがなんなのかはとりあえずおいておいて、terminal の evaluateSequential メソッドをコールしていることが分かります。
terminal の型は TerminalOp インタフェースですが、ここでの実装クラスは ForEachOp クラスです。
public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
return helper.into(sink).getAndClearState();
}
先ほどおいておいた SequentialImplePipelineHelperSource クラスの into メソッドをコールしているのでした。into メソッドの引数の sink は、先ほど ForEachOp クラスの make メソッドの中で作成した無名クラスのオブジェクトです。
SequentialImplePipelineHelperSource クラスは AbstractPipeline クラスの内部クラスになり、into メソッドを定義しているのはスーパークラスの SequentialImplePipelineHelper クラスです。で、それを見てみると、今度はさらにスーパークラスの AbstractPipelineHelper クラスの into メソッドがコールされていることが分かります。
public <S extends Sink<P_OUT>> S into(S sink) {
Sink<P_IN> wrappedSink = wrapSink(sink);
wrappedSink.begin(spliterator.getSizeIfKnown());
spliterator.forEach(wrappedSink);
wrappedSink.end();
return sink;
}
@Override
public Sink<P_IN> wrapSink(Sink sink) {
Objects.requireNonNull(sink);
for (int i = upTo - 1; i >= from; i--) {
sink = ops[i].wrapSink(
StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
sink);
}
return sink;
}
いろいろと処理は行なわれていますが、重要なのは赤字の部分。つまり、spliterator の forEach メソッドをコールしていることが分かります。
この forEach メソッドでやっとループになります。
spliterator が、連なっている Stream オブジェクトのはじめの Stream オブジェクトに定義されているものだということを思いだしてください。
forEach メソッドでは Block インタフェースの apply メソッドをコールします。しかし、ここでは Sink クラスでラップしてあり、コールされるのは spliterator を定義した Stream オブジェクトの op フィールドの accept メソッドがコールされます。ここではそれがなんだったかというと、FilterOp クラスになるわけです。
FilterOp クラスの wrapSink メソッドを次に示します。
public Sink wrapSink(int flags, final Sink sink) {
Objects.requireNonNull(sink);
return new Sink.ChainedValue(sink) {
@Override
public void accept(T t) {
if (predicate.test(t))
downstream.accept(t);
}
};
}
ようやく、filter メソッドの処理が出てきました。filter メソッドの引数は Predicate インタフェースのオブジェクトになり、Predicate インタフェースは test メソッドが定義されています。
この test メソッドがコールされるのが赤字の部分です。test メソッドの戻り値が true の場合、言いかえればフィルターされた結果で残る場合は downstream 変数が示している次の処理の accept メソッドをコールします。
こうすることで連なっている処理を行なっていくわけです。
同様に MapOp クラスの wrapSink メソッドを見てみましょう。
public Sink<t> wrapSink(int flags, Sink sink) {
return new Sink.ChainedValue<t>(sink) {
@Override
public void accept(T t) {
downstream.accept(mapper.map(t));
}
};
}
Stream インタフェースの map メソッドの引数の型は Mapper インタフェースになります。Mapper インタフェースも map メソッドを定義しており、この部分が Lambda 式で評価されるわけです。それが赤字で書いた map メソッドのコールになります。
map メソッドで 1 つの要素を他の型に変換するので、その後、再び連なる処理の accept メソッドをコールするというわけです。
このように Stream オブジェクトの連なりから、処理をつなげていくことで、ループが 1 回だけで処理が可能になっているわけです。
最後はちょっとはしょってしまいましたが、遅延評価の手法についてコードをおっていきました。遅延評価というだけであれば簡単ですけど、実装はなかなか大変そうです。
明日は @btnrouge さんです!