2024/12/25

Stream Gatherer 基礎編

このエントリーをはてなブックマークに追加

本エントリーはJava Advent Calendarの最終日です。昨日はzoosm3さんのDBFlute Examples有志募集中でした。

今年もJava Advent Calendarが無事完了してなによりですね。

 

さて、今日、紹介するのはJava 24で正式に導入されるStream Gathererです。JEPはこちら。

Stream APIはJava 8で導入されて、はや10年。もうStream APIなしにJavaのプログラムを書けといわれても、ムリとしかいえないぐらいですね。少なくともさくらばはそうなのですが、みなさんはどうでしょう。

とはいうものの、Stream APIが万能というわけではありません。文句をつけたくなるところも多々あります。本題とは違いますが、例外が扱いにくいのはほんとどうにかしてほしい...

 

中間操作の制限

さて、どうにかしてほしかったものの1つに中間操作があります。

ごぞんじの通り、Stream APIは以下の3つの段階で処理が行われます。

  1. ソースからStreamオブジェクト生成
  2. 中間操作
  3. 終端操作

この流れをパイプラインと呼びますが、パイプの中をデータが流れてきてそれに応じて処理を加えていくイメージです。

この流れ作業の中で、中間操作は流れてきたデータに対しmapなどの処理を行い、処理後のデータを次に流していきます。重要なのが、中間操作は状態を持たない関数として表されることです。

状態を持てるのは、最後の終端操作だけです。とはいうものの、Collectorsクラスで提供されているユーティリティメソッドを使うだけであれば、状態を保持していることを意識する必要はありません。。

ここで、少しだけStreamパイプラインがどのように処理されるか考えてみましょう。

たとえば、中間操作としてAとB、終端操作としてCがあったとします。疑似的なコードで書くと、次のようになります。

    var stream1 = ソースからStreamオブジェクト生成();
    var stream2 = stream1.A();
    var stream3 = stream2.B();
    var result = stream3.C();

この時、stream2やstream3を生成した段階では、ストリームのパイプライン処理は行われません。

最後のCの段階で、A、B、Cをパックした1つの処理を作ります。

そして、ソースから生成された要素に対して、このA-B-Cを施していきます。そして必要に応じて、Cの段階で前後の要素の処理結果や、並列処理の場合であれば他スレッドとの結果の統合処理を行います。

このため、A-B-Cのうち、Cの処理の統合処理以外は、要素ごとに処理が独立しています。

逆にいうと、処理を独立させるために中間処理のAとBは状態を持たせないことが必要になってくるわけです。

 

ところが、処理によっては中間処理に状態を持たせたいことがあります。

たとえば、移動平均を考えてみましょう。

移動平均とは時系列データの平滑化のために使用される処理です。

直近のデータからさかのぼっていき、n個のデータで平均をとる処理です。

移動平均は、時系列データのノイズを取り除くことや、株価の移動平均線など様々な分野で使われる処理です。

しかし、現状のStream APIでは移動平均を実装するのが難しいのです。やるとしたら、終端操作でn個分のデータで平均をとるというCollectorインタフェースの実装クラスを作る必要があります。

もし中間操作で状態を持たせることができたら、もっと直感的に移動平均処理を実装できるはずです。

このようなニーズを解決するために導入されるのがStream Gathererなのです。

 

Gatererはgatherする、つまりストリームを流れるデータを集めるものです。このイメージは中間操作でのCollectorという感じ。名前もgatherとcollectで似ていますし。

もちろん、Collectorと同じく、単にデータを集めるだけでなく、それに対して何らかの処理を行うことも可能です。

中間操作を柔軟にするのがStream Gathererなのです。

 

Stream Gatherer

中間操作で状態を保持することができるようになるのがStream Gathererですが、その動作もCollectorと似ています。

Collectorと同様、ユーティリティクラスのGatherersクラスも提供されています。

まずは、このGatherersクラスを使って、Stream Gathererがどのようなものか理解していきましょう。

 

ウィンドウ

先ほどの移動平均を行うには時系列データをn個のデータの並びに変化させる必要があります。n個だけ見えるような処理なので、それを窓に見立てて統計では窓関数(Window Function)と呼ばれます。

Gatherersクラスではn個に区切っていく処理が提供されているので、まずそれを使ってみましょう。

n個に区切っていく手法として2種類が提供されていますが、例を見ればすぐに分かるはずです。

たとえば、0から9までの数列[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]を考えましょう。

1つ目の区切りはデータの被りがない手法です。

0から9までの数列を3個ずつ区切った場合、[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]となります。

GatherersクラスではwindowFixedメソッドがこれに相当します。

もう1つの区切りはデータの被りがあり、直近までのn個のデータで構成されるようにする方法です。

この方法で0から9までの数列を3個ずつで区切ると、[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8], [7, 8, 9]]となります。

GatherersクラスではwindowSlidingメソッドになります。

 

それぞれを実際のコードをJShellで確かめてみましょう。

まずはwindowFixedメソッドです。

jshell> List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
   ...> stream().
   ...> gather(Gatherers.windowFixed(3)).
   ...> toList()
$1 ==> [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

jshell>

JShellでストリームパイプラインを行を分けて書く時には、行の最後をピリオドにするのがコツです。ピリオドで終わらせないとJShellは行端にセミコロンを省略できるので、そこで行が終わったと解釈してしまうためです。

さて、Stream Gathererは前述したように中間操作ですが、メソッドとしてはgatherを使用します。

引数の型はGatherインタフェースです。GatherersクラスのwindowFixedメソッドの戻り値がGatherオブジェクトになります。

この使い方もcollectメソッドの引数の型がCollectorインターフェスで、Collectorsクラスのメソッド群がGollectorオブジェクトを戻すのと同じですね。

windowFixedメソッドの引数には、データをいくつで区切るかを指定します。上記のコードでは3で区切っています。

結果は先ほど示したのと同じで、[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]となります。

windowFixedメソッドで作成されるGathererオブジェクトはストリームを流れてきたデータを保持するためのリストを内部に持っています。

ストリームに0が流れてくると、Gathererオブジェクトはそれをリストに追加します。1が流れてきた時も同様にリストに追加します。

2が流れてきた時に、内部のリストに保持したデータ数が3になったので、リストをストリームに流します。そして、データ保持用のリストは初期化して、次のデータを保持できるようにします。

このようにして、Gathererオブジェクトはデータを保持、また保持したデータを処理してストリームに流す処理を行います。

 

次にwindowSlidingメソッドを使ってみましょう。

jshell> List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
   ...> stream().
   ...> gather(Gatherers.windowSliding(3)).
   ...> toList()
$2 ==> [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8], [7, 8, 9]]

jshell>

windowSlidingメソッドの引数も、windowFixedメソッドと同じく、データの区切り数です。

結果が、データの被りがあり、1つずつずれていくことが分かります。

 

このwindowSlidingメソッドを使用すれば、移動平均も簡単に記述できます。

jshell> List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
   ...> stream().
   ...> gather(Gatherers.windowSliding(3)).
   ...> map(fragments
   ...>       -> fragments.stream().
   ...>           collect(Collectors.averagingDouble(x -> x))).
   ...> toList()
$3 ==> [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

jshell>

windowSlidingメソッドで3つずつに区切られたリストがストリームに流れることになります。そこで、次のmapメソッドで3つの平均を求める処理を記述しています。ストリームの中にストリームを書いているのでちょっと分かりにくいかもしれませんが、mapメソッドの結果として要素が3つのリストの平均がストリームを流れるようになります。

これでもcollectメソッドで移動平均を記述するより、かなり分かりやすくなっているはずです。

結果は1.0から8.0までのリストになりますが、要素数が元のリストから減っていることに注意してください。

 

Gatherersクラスの他のメソッド

GatherersクラスではwindowFixed/windowSlidingメソッドの他に、以下の3種類のメソッドを提供しています。

  1. fold
  2. scan
  3. mapConcurrent

mapConcurrentメソッドだけはちょっと毛色が違います。

通常、パラレルストリームは中間操作と終端操作のまとまりをパラレルに処理します。これに対し、mapConcurrentメソッドは中間操作のmap処理をパラレルに処理できます。もちろん、パラレルストリームと併用することも可能です。

これに関しては、次回のエントリーでStream Gathererの内部動作と合わせて、もう少し詳しく説明する予定です。

他の、foldメソッドとscanメソッドは動作としては似ています。ちょうど、終端操作のreduceメソッドのような動作になります。

foldメソッドもscanメソッドも引数は同じで、第1引数の型がSupplierインタフェース、第2引数の型がBiFunctionインタフェースです。

要するに、第1引数が引数なし、戻り値ありのラムダ式です。そして、第2引数が引数が2つ、戻り値ありのラムダ式になります。

第1引数のラムダ式の戻り値の型が、第2引数のラムダ式の第1引数の型および戻り値の型と同じです(厳密にはsuperとextendsが含まれていますが)。第2引数のラムダ式の第2引数の型はストリームを流れてくるデータの型です。

文章で書くと、ちょっと分かりにくいですね。実際にコードで確かめてみましょう。

たとえば、0から9までの数値のストリームを文字列に統合していくことを考えてみましょう。

jshell> List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
   ...> stream().
   ...> gather(Gatherers.fold(
   ...>    () -> "",
   ...>    (text, value) -> text + value)).
   ...> toList()
$1 ==> [0123456789]

jshell>

foldメソッドの第1引数のラムダ式で、空の文字列を戻します。

この文字列に数値の文字表現を連ねていくのが、第2引数のラムダ式です。ラムダ式の第1引数で前回の結果が渡されるので、そこに現在のストリームの値valueを追加しています。

結果的に"0123456789"という文字列が作成できます。

これはcombinerのないreduceメソッドと同じような動きですね。

foldは畳み込むという意味なので、ストリームに流れてきたデータをまとめていくといった感じになります。

 

scanメソッドはfoldメソッドと引数は同じなので、foldメソッドをscanメソッドに置き換えて試してみましょう。JShellだと履歴が使えるので、こういう時に簡単に実行できますね。

jshell> List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).
   ...> stream().
   ...> gather(Gatherers.scan(
   ...>    () -> "",
   ...>    (text, value) -> text + value)).
   ...> toList()
$2 ==> [0, 01, 012, 0123, 01234, 012345, 0123456, 01234567, 012345678, 0123456789]
    
jshell>

foldメソッドと違って、データの統合処理のその時点での値をストリームに流しているのが分かります。

こういう英語のニュアンスが非ネイティブには分かりにくいのですが、データを1つずつ処理していくことをscanしているという意味でしょうか。

 

Gathererを自作する

Gatherersクラスのユーティリティメソッドを使用して、Stream Gathererがどういうことをやるかのイメージはつかめたでしょうか。

それでは、このエントリーの最後に、Gatherersクラスを使用せずにGathererインタフェースを実装したクラスを作ってみましょう。

本エントリーでは何度もGathererとCollectorが似ているということを言及してきましたが、インタフェースで実装するメソッドも似ています。重要なメソッドの対応関係を次表にまとめました。

なお、ジェネリクスの型パラメータは省略しています。

  Gatherer Collector
内部状態の初期化 Supplier initializer() Supplier supplier()
新しいデータの処理 Gatherer.Integrator integrator() BiConsumer accumulator()
パラレル処理時の状態統合 BinaryOperator combiner() BinaryOperator combiner()
最後の処理 BiConsumer finisher() Function finisher()

これらのメソッド群の中で、Collectorインタフェースと戻り値の型が異なるのがintegratorメソッドとfinisherメソッドです。

これらのメソッドの型が異なるのは、ストリームの上流から流れてきたデータを下流に流すという処理が必要になるためです。

特にintegratorメソッドはjava.util.functionパッケージで提供されている関数型インタフェースではないというところが着目すべきポイントです。

 

また、initializerメソッド、combinerメソッド、finisherメソッドはdefaultメソッドなので、状態を扱わないのであれば実装する必要はありません。

そこで、状態を扱わない単純なGathererインタフェースの実装クラスから作ってみましょう。

 

何もしないGatherer

まずはじめに作るのは、ストリームの上流から流れてきたデータを何もせずに下流に流すというGathererです。

状態を扱わないので、上述したようにintegrateメソッドだけを実装するクラスを作ればOKです。

Gathererインタフェースにはファクトリメソッドとして複数のofメソッドと、ofSequentialメソッドが用意されています。ofメソッドとofSequentialメソッドの違いについては次回説明するとして、ここではofメソッドを使用します。

ofメソッドの引数にはinitializeメソッドやintegrateメソッドで戻すオブジェクトをラムダ式で指定します。

ofメソッドは3種類のオーバーロードがあります。ここでは、integratorメソッドで戻すオブジェクトだけを指定するオーバーロードを使用します。

integratorメソッドで戻すのはGatherer.Integratorインターフェスを実装したオブジェクトです。Gatherer.Integratorインターフェスは関数型インタフェースで、integrateメソッドをラムダ式で記述します。

そのままラムダ式で記述できるのですが、ここではGatherer.Integratorインターフェスが手いきゅしているファクトリメソッドを使用します。

ファクトリメソッドにはofメソッドとofGreedyメソッドの2種類ありますが、ここではofGreedyメソッドを使用します。

関数型インタフェースなのにファクトリメソッドを提供していることや、ofメソッドとofGreedyメソッドの違いについては後述します。

    Gatherer.Integrator<Void, Integer, Integer> noEffectIntegrator =
            Gatherer.Integrator.ofGreedy(
                    (_, value, downstream) -> {
                        // 下流にデータを流す
                        downstream.push(value);

                        // ストリーム処理を続けるので、trueを戻す
                        return true;
                    });

    var list = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(Gatherer.of(noEffectIntegrator))
                     .toList();

ofGreedyメソッドの引数には引数が3種類、戻り値がbooleanのラムダ式を記述します。

第1引数: 状態を保持するオブジェクト
第2引数: ストリーム上流からのデータ
第3引数: 下流に流すためのオブジェクト
戻り値: ストリーム処理を継続するかどうか

第1引数の状態保持用のオブジェクトは、initializerメソッドのSupplierインタフェースで作成されるオブジェクトです。

第2引数がストリームの上流から流れてくるデータになります。

上流からのデータを処理して、下流に流すために使用するのが第3引数になります。第3引数の型はGatherer.Downstreamインタフェースです。

 

上記のコードでは状態を使用していないので、第1引数の型はVoidです。

なお、この第1引数の型は、Gatherer.Integratorインタフェースの1つ目のジェネリクスの型パラメータに相当します。

上記のコードでは状態は使用しないので、ラムダ式の引数はアンダースコアの"_"で記述しています。

本題とは外れますが、未使用の変数をアンダースコアで書けるようになったのはJava 22で導入されたJEP 456: Unnamed Variables & Pattersによるものです。

続いて第2引数です。ここでは数値(Integerクラス)のストリームを使用するので、ラムダ式の第2引数の型もIntegerクラスです。Gatherer.Integratorインタフェースの2つ目のジェネリクスの型パラメータがこれを表しています。

このGatherer.Integratorインタフェースのラムダ式では何も処理をせずに、上流から流れてきたデータを単に下流に流すだけです。そのために使用するのが、Gatherer.Downstreamインタフェースのputメソッドです。

putメソッドの引数の型は、Gatherer.Integratorインタフェースのジェネリクス型パラメータの最後の型パラメータです。上記のコードでは数値をそのまま使い続けるので、Integerクラスとなります。

もし、下流に流すデータの型を変更する場合は、型パラメータで指定するようにします。

 

さて、Gatherer.Integratorオブジェクトが生成できたので、ストリームパイプラインに適用してみましょう。GathererインタフェースのofメソッドでGatherer.Integratorオブジェクトを指定します。

実行したら、リストの[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]が得られるはずです。

 

フィルター処理を行うGatherer

次に、もうちょっと複雑な(といっても簡単ですけど)Gathereを作成してみましょう。

ストリームの中間処理にfilterメソッドがありますが、これと同じような処理を作ってみます。ここでは偶数だけをフィルターするGathererを作ることにします。

数値のストリームで、Gathererでは型変換を行わないので、型パラーメタなどは先ほどの例と同じです。

    Gatherer.Integrator<Void, Integer, Integer> oddFilterIntegrator =
        Gatherer.Integrator.ofGreedy(
            (_, value, downstream) -> {
                // 偶数であれば下流にデータを流す
                if (value % 2 == 0) {
                    downstream.push(value);
                }

                // ストリーム処理を続けるので、trueを戻す
                return true;
            });

    var list = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(Gatherer.of(oddFilterIntegrator))
                     .toList();

 

変更している点は、ラムダ式の中でif文を追加したところです。

if文で偶数かどうかを判定し、偶数であれば下流にデータを流します。

奇数の場合はデータを下流に流さなくてもいいのかと思いますよね。だいじょうぶなのです。

データを下流に流すかどうかはGathererに任されています。

たとえば、前述したGatherersクラスのwindowFixedメソッドはwindowSlidingメソッドでも結果はソースの要素数よりも減っていましたね。

どのようなデータを流すか、流さないかはGathererの内部処理で決めればよいことになっています。

さて、実行すると[0, 2, 4, 6, 8]が得られます。ぜひ試してみてください。

 

では、ストリームの処理を途中でやめたい場合はどうでしょう。

たとえば、ある特定の数値より大きい数値が上流から流れてきた場合、ストリーム処理をやめることにしましょう。

以下のコードでは8以上の数値が流れてきたら、そこでストリーム処理を停止させます。

    Gatherer.Integrator<Void, Integer, Integer> limitIntegrator =
        Gatherer.Integrator.of(
            (_, value, downstream) -> {
                if (value < 8) {
                    // 8より小さい数であれば下流にデータを流し
                    // ストリーム処理を続ける
                    downstream.push(value);
                    return true;
                } else {
                    // ストリーム処理を停止
                    return false;
                }
            });

    var list = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(Gatherer.of(limitIntegrator))
                     .toList();

 

ラムダ式の中で8と比較し、8より小さければ下流にデータを流し、trueを戻り値にしています。

8以上の場合、戻り値をfalseにします。こうすることで、これ以上のストリーム処理を停止させることができます。

パッと見、if文の部部分だけ変更したように見えますが、その他の部分で大きな違いがあります。

Gatherer.IntegratorインタフェースのファクトリメソッドとしてofGreedyメソッドではなく、ofメソッドを使用している点です。

ofメソッドとofGreedyメソッドの使い分けは以下のようになっています。

  1. ofメソッド: ストリーム処理を途中で停止することがある場合
  2. ofGreedyメソッド: ストリーム処理を必ず最後まで処理する場合

メソッドのシグネチャーをチェックするとofGreedyメソッドは引数も戻り値も型がGatherer.Integrator.Greedyインタフェースになっています。

Gatherer.Integrator.Greedyインタフェースはメソッドを定義していません。つまり、Cloneableインタフェースのような、ある特徴を持っているということを表すマーカーインタフェースになっているということです。

そして、その特徴がストリーム処理が途中で停止することがないということです。

ストリーム処理の停止することがなければ、最適化が行いやすくなります。この最適化については、次回紹介する予定です。

greedyは「貧欲な」とか「強欲な」といった意味の形容詞ですが、正規表現で最長マッチさせるときにGreedyといいますね。こういう英語のニュアンスはほんとよく分からないです。

分からないついでですが、ストリーム処理を停止させることをShort-Circuitと呼びます。電気回路のショートもしくは短絡のことですが、回路をショートさせたらやばいような気がするんですよね。英語だとニュアンスが違うんですかねぇ??

 

さて、上記のコードを実行させた結果は[0, 1, 2, 3, 4, 5, 6, 7]になります。

 

状態を扱うGatherer

ここまでは状態を保持せずに、流れてきたデータを処理するだけでした。しかし、これだと従来の中間操作と変わりません。

そこで、次に状態を保持するGathererを作ってみましょう。

まずは単純に流れてきたデータを文字列として追加していくGathererです。ようするに、Gatherers.scanメソッドのサンプルコードと同じ動作をするGathererです。

 

ここまでのGathererのサンプルコードではファクトリーメソッドのofメソッドを使用してきました。ofメソッドを使用すると、gatherメソッドの処理がパラレルに実行されることがあります。

しかし、文字列を結合していく場合はデータの順番が維持されることが期待されます。そこで、ファクトリーメソッドのofSequentialメソッドを使用して、パラレル処理されないようにします。

    var integrator =
        Gatherer.Integrator.<StringBuilder, Integer, String>ofGreedy(
            (builder, value, downstream) -> {
                builder.append(value);
                downstream.push(builder.toString());
                return true;
            });

    var list = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(Gatherer.<Integer, StringBuilder, String>ofSequential(
                             StringBuilder::new,
                             integrator))
                     .toList();

 

ここまでの例とはジェネリクスの型パラメータの指定方法を変更しました。

メソッド名の前に型パラメータを記述する方法は普段は使わないのとは思います。とはいうものの、ファクトリーメソッドで変数宣言の方に型パラメータを記述するのは、ちょっと煩雑です。

「こういう書き方もできるんだ」ぐらいに思ってください。

Gatherer.Integratorインタフェースのファクトリーメソッドと、Gathererインタフェースのファクトリーメソッドで型パラメータの順番が違うので、ちょっと分かりにくいのが難点。

 

さて、ここでは状態を保持させるためにStringBuilderクラスを使用しました。StringBuilderオブジェクトに流れてきたデータをアペンドし、下流にはStringBuilderオブジェクトから文字列を生成して流しています。

GathererインタフェースのofSequentialメソッドには4種類のオーバーロードがあります。

この例では、ストリームの最後になにか処理することはないので、finisherは必要ありません。そこで、initializerとintegratorを引数にとるofSequentialメソッドを使用しています。

ofSequentialメソッドの第1引数であるinitializerは、型がSupplier<StringBuilder>です。ここでは、StringBuilderオブジェクトの生成をメソッド参照を使用して指定しています。

integratorのラムダ式の第1引数がinitializerで生成したStringBuilderオブジェクト(変数builder)になります。

上流から流れてきたvalueをbuilderにアペンドし、下流にはbuilder.toStringメソッドで文字列を流しています。

 

このコードを実行すると、先ほどのscanメソッドのサンプルと同様に["0", "01", "012", ... , "0123456789]となるリストが生成されます。

 

次にfinisherを使用するようなGathererを作ってみましょう。

ここでは、Gatherers.windowFixedメソッドと同じ動作をするGathererを作成してみます。

前のサンプルでは状態を保持させるのにStringBuilderクラスを使用しましたが、ここではStateというクラスを作成します。Stateクラスは数値を保持するリストをフィールドとして持つようにしました。

    class State {
        // ウィンドウ用の数値を保持するリスト
        List<Integer> list = new ArrayList<>();

        static Gatherer.Integrator<State, Integer, List<Integer>> integrator
            = Gatherer.Integrator.ofGreedy(
                (state, value, downstream) -> {
                    // listにデータを追加
                    state.list.add(value);

                    // listの要素数が3になったら下流に流し
                    // 新たにリストを生成して、listに代入する
                    if (state.list.size() >= 3) {
                        downstream.push(state.list);
                        state.list = new ArrayList<>();
                    }
                    return true;
                });
	
        static void finisher(State state,
                             Gatherer.Downstream<? super List<Integer>> downstream) {
            // ストリームの最後になったら、要素数が3に満たなくても下流に流す
            downstream.push(state.list);
        }
    }

    var result = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(Gatherer.<Integer, State, List<Integer>>ofSequential(
                             State::new,
                             State.integrator,
                             State::finisher))
                     .toList();

 

Gather.ofSequentialの第1引数であるinitializerは、メソッド参照でStateオブジェクトを生成させています。

integratorでは、まずStateオブジェクトが保持するリストに、上流から流れてきた数値データを追加します。

ここではウィンドウの区切り数を3としています。Stateクラスのリストの要素数が3以上になった時、リストを下流に流します。

そして、新たにリストを生成しています。

finisherでは、Stateオブジェクトが保持するリストを下流に流します。finisherはストリームの最後でコールされ、これ以上データは流れてこないため、要素数が3に満たなくても下流にリストを流します。

finisherメソッドの第2引数のGatherer.Downstreamは型パラメータにワイルドカードが必要な点に注意してください。

後は、これらをGatherer.ofSequentialメソッドの引数にするだけです。

 

実行した結果のresultは[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]となります。

 

移動平均

最後に移動平均を求めるGathererを作ってみましょう。

ここまでやってくれば、そんなに難しくはないですね。1つ前のリストを区切るGathererと同じように作れますが、finisherは必要ありません。

ここでは、ちょっと汎用にするためにGathererオブジェクトを生成するmovingAverageメソッドを作成しました。movingAverageメソッドの引数には何個の数値を使用して移動平均を求めるかを指定します。

    Gatherer<? super Number, ?, Double> movingAverage(final int n) {
    
        Gatherer.Integrator<List<Double>, ? super Number, Double> integrator
            = Gatherer.Integrator.ofGreedy(
                (list, value, downstream) -> {
                    list.add(value.doubleValue());

                    if (list.size() >= n) {
                        // 引数で指定された個数以上の場合
                        // listを使用して平均を求め
                        // 下流に流す
                        var ave =
                            list.stream()
                                .collect(
                                    Collectors.averagingDouble(x -> x));
                        downstream.push(ave);

                        // 先頭の要素を削除
                        list.removeFirst();
                    }

                    return true;
                });

        return Gatherer.ofSequential(ArrayList<Double>::new, integrator);
    }

 

movingAverageメソッドの戻り値の1つ目の型パラメータが上流からのデータの型です。数値であれば何でもよいので、Numberクラスのワイルドカードを使用しています。

2つ目の型パラメータがワイルドカードになっているのは、戻り値のGathererオブジェクトを使う側からすると内部状態は気にしなくてもよいからです。

実際には内部状態としてDoubleクラスのリストを使用します。

最後の型パラメータが下流に流すデータの型です。移動平均値には、浮動小数点のDoubleクラスを使用します。

では、movingAverageメソッドの中を見ていきましょう。

ここでも、Gatherer.Integrator.ofGreedyメソッドを使用してGatherer.Integratorオブジェクトを生成しています。

型パラメータはmovingAverageメソッドと同じですが、内部状態を表す第1型パラメータだけはワイルドカードではなく、実際に使用するList<Double>を指定します。

上流からデータが流れてきたら、listに追加するのは先ほどの例と同じです。

listの要素数がmovingAverageメソッドの引数で指定された数より大きい場合、listの要素で平均の計算を行います。

そして、平均を求めたら、下流に流します。

if文の最後で、listの先頭要素を削除しておきます。

これでintegratorができました。後はGatherer.ofSequentialメソッドでGathererオブジェクトを生成するだけです。

ofSequentialメソッドの第1引数でリストを生成し、第2引数は作成したintegratorを指定します。

 

では、このmovingAverageメソッドを使用してみましょう。

    var list = Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                     .gather(movingAverage(3))
                     .toList();

 

結果は[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]になります。

 

Gathererの使い方のまとめ

最後にGathererの使い方をまとめておきましょう。

  • Gathererを使用することで、状態を用いた中間操作が可能
  • ユーティリティクラスとしてGatherersクラスが提供されている
  • Gahtererを自作する場合、ファクトリーメソッドを使うと便利
    • パラレル処理をしない(順番を保持): ofSequential
    • パラレル処理が可能: of
  • Gathererの核となるのは、Gatherer.Integrator
  • Gatherer.Integratorのファクトリーメソッドは2種類
    • ストリーム処理の停止がある場合: of
    • ストリーム処理の停止がない場合: ofGreedy
  • 下流にデータを流すためにはGatherer.Downstreamのpushを使用する
  • ジェネリクスの型パラメータが多いのだが、恐れる必要はない

本エントリーではGathererの使い方を紹介しました。次回は、Gathererがどのように動作するのかについて紹介する予定です。

0 件のコメント: