2025/01/09

Stream Gatherer 動作編

このエントリーをはてなブックマークに追加

年が変わってしまいましたが、前回の続き。前回はこちら

全開でGathererの使い方を一通り説明したので、今回はGathererがどのように動作しているのかを解説していきます。

 

Stream APIの動作

Gathererがどのように動作するのかを説明する前に、もともとのStream APIがどのように動作しているのか復習しておきましょう。

といっても、詳しく説明すると長くなってしまうので、簡単に。

ここではIntStreamインタフェースなどのプリミティブ系は省略、またシーケンシャルな動作に限定して説明します。

Streamの実行に重要なインタフェース

Stream APIの動作を説明する前に、Stream APIを使っているだけであれば出てこない、でも実装では重要なインタフェースを2つだけ先に紹介しておきます。

  • java.util.Spliterator
  • java.util.stream.Sink

Spliteratorインタフェースは公開インタフェースで、Sinkインタフェースはパッケージプライベートなインタフェースです。

1つ目のSpliteratorインタフェースはSplit + Iteratorのことで、簡単にいえばストリームのイテレーションを制御するインタフェースになります。

Splitがパラレルの場合で、分割統治法により個々の要素を処理します。Iteratorがシーケンシャルの場合で、こちらは普通のイテレーターですね。

Spliteratorオブジェクトはソースによって実装クラスが異なり、ソースからStreamオブジェクトを生成する時に一緒に作られます。

もう一方のSinkは、台所にあるシンクと同じ単語です。動詞だと「沈む」もしくは「沈める」という意味です。

何が沈んでいるかというと操作です。中間操作や終端操作で行われる操作がSinkに沈められています。

SinkインタフェースはConsumerインタフェースのサブインタフェースで、acceptメソッドがコールされると、その操作が実行されます。

 

Sinkオブジェクトで操作を実行する仕組み

では、コードを使って説明していきましょう。ここでは、次のコードで説明していきます。

    var stream = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    var stream2 = stream.map(i -> Integer.toString(i));
    var result = stream2.reduce("", (prev, pres) -> prev + pres);

 

Integerのリストを文字列化して、文字列の連結を行うコードです。

さて、ここで変数stream2の型はどうなるでしょう?

もちろん、Streamインタフェースではありますが、Streamインタフェースを実装したコンクリートクラスの方です。

答えはStatelessOpクラスの匿名クラスです。

このStatelessOpクラスが、mapメソッドやfilterメソッドなど状態を持たない中間操作で使用されるクラスです。

StatelessOpクラスのスーパークラスがReferencePipelineクラスで、ReferencePipelineクラスがStreamインタフェースを実装しています。

Referenceなのはストリームを流れる要素が参照型だからで、intであればIntPipelineクラスになります。

それはそれとして、ここで使われているStatelessOpクラスの匿名クラスを生成している部分が次のコードになります。ここでは、mapメソッドの場合です。

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

 

StatelessOpクラスの匿名クラスを作る時に、opWrapSinkメソッドをオーバーライドしています。ここで、Sinkが出てきましたね。

opWrapSinkメソッドの中ではSink.ChainedReferenceクラスの匿名クラス生成しています。もちろん、Sink.ChainedReferenceクラスはSinkインタフェースの実装クラスです。

Sink.ChainedReferenceクラスはその名の通りSinkをチェーンでつなげていくインタフェースです。フィールドに下流のSinkオブジェクトであるdownstreamを保持しています。

Gathererでもdownstreamが出てきましたが、ここでは次段のSinkオブジェクトを表しています。

そして、赤字で示したacceptメソッドで、mapメソッドの引数で指定された関数(Functionインタフェースのラムダ式)を実行し、その結果を引数にして下流のSinkオブジェクトのacceptメソッドをコールしています。

これで、中間操作の操作を順々に実行する仕組みができました。

 

終端操作のSinkオブジェクトを生成する仕組み

実際に中間操作から終端操作まで操作をつなげていくのは、終端操作の時です。

まずは終端操作に対応するSinkオブジェクトを作るしくみです。これはreduceメソッドの内部でコールされるReduceOps.makeRefメソッドで行われます。

    public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
        return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
    }

 

ReduceOpsクラスは、ReduceOpクラスのユーティリティクラスですね。中間操作で使用したStatelessOpの終端操作版がReduceOpクラスになります。

おもしろいことに、ReduceOpクラスはReduceOpsクラスのインナークラスになっています。

さて、そのmakeRefメソッドは以下のようになっています。

    public static <T, U> TerminalOp<T, U>
    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
        Objects.requireNonNull(reducer);
        Objects.requireNonNull(combiner);
    
        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
            @Override
            public void begin(long size) {
                state = seed;
            }

            @Override
            public void accept(T t) {
                state = reducer.apply(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
    
        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

 

makeRefメソッドの内部で、青字で示したReducingSinkクラスを定義しています。ここでSinkが出てきました。

このReducingSinkクラスが終端操作に対応するSinkです。

ただし、ReducingSinkクラスは中間操作で使用したSinkとはちょっと異なります。

それはReducingSinkクラスがBoxクラスのサブクラスだということです。Boxクラスは値を1つだけ保持するコンテナクラスです。

Gathererと同じで、終端操作は状態を持ち、最終的に処理の結果を返します。その状態を保持するためにBoxクラスを使用しています。

そして、makeRefメソッドの戻り値としてReduceOpクラスの匿名クラスを生成し、その内部でmakeSinkメソッドをオーバーライドし、ここで定義したReducingSinkオブジェクトを返すようにしています。

makeSinkメソッドは定義しましたが、ここでコールされるわけではありません。したがって、まだSinkオブジェクトは生成されていません。

 

中間操作から終端操作までのSinkオブジェクトを連ねる

ここまでで、中間操作と終端操作のSinkオブジェクトを生成する仕組みを見てきました。後は、中間操作から終端操作にいたるSinkオブジェクトを生成し、一連の処理をつなげる必要があります。

これは、reduceメソッドの内部でコールされているevaluateメソッドで行われます。

evaluateメソッド内では処理がシーケンシャルかパラレルかによって処理が分かれますが、ここではシーケンシャルに処理するevaluateSequentialメソッドを見ていきます。

次のコードはReduceOpクラスのevaluateSequentialメソッドです。

    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }

 

ここで、赤字で示したmakeSinkメソッドが出てきました。

makeRefメソッドの中で、ReduceOpクラスの匿名クラスを定義し、makeSinkをオーバーライドしていましたが、そのmakeSinkメソッドをコールするのがevaluateSequentialメソッドの中でした。

これで、終端操作に対応するSinkオブジェクトが生成できました。

一方のSpliteratorオブジェクトも出てきました(変数spliterator)。Spliteretorオブジェクトの生成は、ソースからStreamオブジェクトを作成する時に作られます。ここでは、省略しますが、もし興味があれば、ソースからStreamオブジェクトを生成する部分を見てみるのもおもしろいと思います。

さて、evaluateSequentialメソッドの第1引数のhelperは、実はReduceOpオブジェクト自身です。

継承関係をさかのぼっていくと、ReferencePipelineクラスのスーパークラスがAbstractPipelineクラスで、さらにそのスーパークラスがPipelineHelperクラスになります。

evaluateSequentialメソッドの内部でコールされているwrapAndCopyIntoメソッドがPipelineHelperクラスで定義されているためこうなっているとは思いますが、ちょっと分かりにくいですね。

PipelineHelperクラスのwrapAndCopyIntoメソッドはabstractとして定義されており、AbstractPipelineクラスでオーバーライドされています。

    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

 

赤字で示したwrapSinkメソッドがSinkオブジェクトを連ねる処理を行いそうなことが分かります。

中間操作のmapメソッドの中でStatelessOpeクラスの匿名クラスがopWrapSinkメソッドをオーバーライドしていたのを思い出してください。

では、そのwrapSinkメソッドです。

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

 

for文で自分自身(終端操作のReduceOpオブジェクト)からパイプラインをさかのぼって、opWrapSinkメソッドをコールしています。

opWrapSinkメソッドの内部ではSinkオブジェクトを生成しているので、これで終端操作から中間操作の先頭までのSinkオブジェクトを生成して、チェーンでつなげていくことができました。

残るは、ここで生成したSinkオブジェクトに対して、登録されている処理を行う部分です。

 

Spliteratorを使用したイテレーション

やっと最後のイテレーションの部分にまで到達しました。

先ほどのwrapAndCopyIntoメソッドでwrapSinkメソッドの戻り値(Sinkオブジェクト)とSpliteratorオブジェクトを引数にしてコールされるのが、copyIntoメソッドです。

copyIntoメソッドもAbstractPipelineクラスでオーバーライドされています。

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

 

if文でShort Circuitの有無で処理を変えています。

前回、Short Circuitが出てきましたが、イテレーションの途中で停止させるのがShort Circuitです。

Short Circuitの可能性がある場合、イテレーションを継続するか停止するかチェックする必要があるため別メソッド(copyIntoWithCancelメソッド)になっています。

イテレーションの本体は赤字で示したforEachRemainingメソッドです。

Gatherer.IntegratorインタフェースのファクトリーメソッドであるofGreedyメソッドを使った場合も、Short CircuitされることはないのでforEachRemainingメソッドが使用されます。

 

forEachRemainingメソッドを定義しているSpliteratorインタフェースの実装クラスはソースによって異なります。ArrayListクラスや配列の場合、ArraySpliteratorクラスが使われます。

サンプルのコードはStream.ofメソッドでStreamオブジェクトを生成していますが、この場合もArraySpliteratorクラスが使われます。

ArraySpliteratorクラスのforEachRemainingメソッドを次に示します。

    public void forEachRemaining(Consumer<? super T> action) {
        Object[] a; int i, hi; // hoist accesses and checks from loop
        if (action == null)
            throw new NullPointerException();

        if ((a = array).length >= (hi = fence) &&
            (i = index) >= 0 && i > (index = hi)) {
            do { action.accept((T)a[i]); } while (++i < hi);
        }
    }

 

forEachRemainingメソッドの引数の型がConsumerインタフェースになっていますが、SinkインタフェースはConsumerインタフェースのサブインタフェースなので、実態はSinkインタフェースです。

メソッド内では、配列の範囲チェックの後に、do-while文でループします。

このループで、配列の要素を引数にして変数actionのacceptメソッドをコールしています。

この変数actionは最初の中間操作を保持しているSinkオブジェクトなので、中間操作から終端操作まで順々に実行されます。

 

簡単に紹介するつもりでしたが、かなり長くなってしまいました。

 

Stream Gathererの動作

やっとGathererです。

ここまでのStream APIの動作が理解できていれば、Gathererを理解するのも簡単です。

GathererのPipelineとSink

mapやfilterなど状態を持たない中間操作のパイプラインにはStatelessOpクラスが使われてきました。また、SinkにはSink.ChainedReferenceクラスが使われています。

これに対し、Gathererでは専用のパイプラインクラスであるGathererOpクラスが使用されます。また、SinkもGatherSinkクラスが使われます。

これをコードで確かめてみましょう。以下のコードはReferencePipelineクラスのgatherメソッドです。

    public final <R> Stream<R> gather(Gatherer<? super P_OUT, ?, R> gatherer) {
        return GathererOp.of(this, gatherer);
    }

 

ofメソッドがファクトリーメソッドになっており、GathererOpオブジェクトを返しています。

GathererOpクラスのopメソッドはパイプラインの前段がGathererOpクラスであれば、合成する処理が含まれていますが、基本的にはGathererOpクラスのオブジェクトを生成しているだけです。

もう一方のSinkの方ですが、mapメソッドなどではStatelessOpクラスのopWrapSinkメソッドをオーバーライドしていたのを思い出してください。

GathererOpクラスはGatherer専用のクラスなので、opWrapSinkメソッドもオーバーライドされずに、そのまま使われます。

以下にGathererOpクラスのopWrapSinkメソッドを示します。

    Sink<T> opWrapSink(int flags, Sink<R> downstream) {
        return new GatherSink<>(gatherer, downstream);
    }

 

ここでGatherSinkクラスが出てきました。

GatherSinkクラスもGatherer専用なので、匿名クラスなどを使わずに、そのまま使われます。

では、GatherSinkクラスの定義とコンストラクターを見ておきましょう。

    static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {
        private final Sink<R> sink;
        private final Gatherer<T, A, R> gatherer;
        private final Integrator<A, T, R> integrator; // Optimization: reuse
        private A state;
        private boolean proceed = true;
        private boolean downstreamProceed = true;

        GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {
            this.gatherer = gatherer;
            this.sink = sink;
            this.integrator = gatherer.integrator();
        }

 

GatherSinkクラスはSinkインタフェースを実装しているのは当然ですが、Gatherer.Downstreamインタフェースも実装しています。

Gatherer.Downstreamインタフェースは、Gatherer.Integratorインタフェースのintegratメソッドで使用するpushメソッドを定義しています。

前回は下流に対してデータを流すという説明をしていましたが、実際はGatherSinkクラスのpushメソッドがコールされるわけです。

また、Gatherer.Downstreamインタフェースを実装しているため、ごっちゃにならないように下流のSinkを表すフィールドはdownstreamではなくsinkという名前になっています。

 

GatherSinkクラスの動作

前述したように、終端操作のパイプラインにおいて、forEachRemainingメソッドでイテレーションが実行されます。その時に、Sinkオブジェクトのacceptメソッドがコールされます。

これはGathererを使った場合も同じです。GatherSinkオブジェクトのacceptメソッドがコールされます。

では、GatherSinkクラスのacceptメソッドを見てみましょう。

        public void accept(T t) {
            /* Benchmarks have indicated that doing an unconditional write to
             * `proceed` is more efficient than branching.
             * We use `&=` here to prevent flips from `false` -> `true`.
             *
             * As of writing this, taking `greedy` or `stateless` into
             * consideration at this point doesn't yield any performance gains.
             */
            proceed &= integrator.integrate(state, t, this);
        }

 

コメントがおもしろいですけど、主題とは関係ないので...

acceptメソッドの内部では、Gatherer.Integratorインタフェースのintegrateメソッドがコールされています。

通常の中間操作であれば、下流のSinkオブジェクトのacceptメソッドをコールするのですが、ここではそれがありません。

Gathererを使う時は、下流にデータを流すかどうかはGathererによるためです。

とはいえ、下流にデータを流す(つまり、下流のSinkオブジェクトのacceptメソッドをコールすることに相当します)場合もあります。これはどこで行っているのでしょうか。

ヒントはintegrateメソッドの第3引数です。

前回、integratorメソッドの説明で、下流にデータを流す時はGatherer.Downstreamインタフェースのpushメソッドをコールしますと説明しました。

そのpushメソッドはintegrateメソッドの第3引数のdownstream変数に対して行っていたことを覚えていますでしょうか。

では、第3引数が何かというと、上のコードではthisを渡しています。

GatherSinkクラスはGatherer.Downstreamインタフェースを実装していますと前述しました。ということは、結局、自分自身のpushメソッドをコールしているということになります。

では、そのpushメソッドを見てみましょう。

    public boolean push(R r) {
        var p = downstreamProceed;
        if (p)
            sink.accept(r);
        return !cancellationRequested(p);
    }

 

ここで、下流のSinkオブジェクトに対してacceptメソッドをコールしていました。

つまり、Gathererで下流にデータをpushした時に、次段から処理が行われるということです。

このようにして、二重ループではなく、効率的に状態を保持した中間操作を行うことができるのです。

 

まとめ

いちおうまとめておきましょう。

Stream APIの動作は要約すると次のようになります。

  1. Spliteratorがイテレーションを行う
  2. Sinkが処理をまとめて、一括してデータを処理する

これに対し、GathererではSinkを工夫することで、下流にデータを流した時にだけ次段から連なる処理を行うようにします。

 

今回はストリームの開始時と終了時の処理や、パラレル処理の場合を省略しましたが、もし興味があるのであればソースを見てみるとおもしろいですよ。