2013/12/11

大都会岡山 Advent Calendar 11 日目 - 岡山といえばパフェだよね #2

このエントリーをはてなブックマークに追加
_DSC6753

このエントリは 大都会岡山 Advent Calendar 2013 の第 11 日目のエントリーです。

昨日は @ne_sachirou さんの 岡山に帰ろう で、明日は Tama Eguchi さんです。

 

去年も、フルーツパフェの街 岡山の紹介をしたのですが、こりもせず今年も。ということで、今年、櫻庭が岡山で食べたパフェを紹介しようと思います。

カフェレストランオリビエ

今年、櫻庭が岡山にいったのは岡山 Java ユーザ会の Java Day Tokyo 2013 報告会@岡山 に参加するためです。報告会は 6/15 だったのですが、櫻庭は前泊して 6/14 に岡山入りしました。

岡山に着いたのは、夜の 9 時ぐらい。その時間でまだパフェを出してくれていて、駅のそばといえば、JR 西日本が運営しているホテルグランヴィアのカフェのオリビエです。

去年もここで食べましたけど、やっぱり一番安定していていいですね。

今年、食べたのは岡山県産足守メロンのパフェ。メロンがふんだんに使われていて、豪華ですね。

ただ、グラスの一番下にコーンフレークを使っているのがダメ。パフェの最後にコーンフレークなどを食べさせられる不幸を考えてみてください!!

_DSC6779

_DSC6757

_DSC6762

関連ランキング:カフェ | 岡山駅前駅岡山駅西川緑道公園駅

Antenna

続いて、城下にあるカフェの Antenna でいづし大正浪漫パフェ。

報告会の前のランチに来たわけですが、ランチで食べたスリランカのフィッシュカレーはおいしかったのですが....

パフェの要素ってだいたい決まっていて、フルーツ、アイスクリーム、生クリームが 3 大要素だと思います。この中で一番おいしさがはっきり分かれるのが生クリーム。

フルーツは新鮮であれば、それほど外すことはないですし、アイスクリームもどうせ市販のものが多いはずなのでそれほど差は表れません。

ところが、生クリームはすごいよく分かるんですよね。

ここで使われているのは、たぶん業務用の冷凍生クリーム。これがまたおいしくないんですよ。他にもエスプーマで作った生クリームもおいしくありません。エスプーマはスタバで使っているやつです。

なるべく手をかけたくないというのは分かるんですけど、ここまで手を抜かなくてもいいんじゃないかなぁ。少なくとも看板メニューにしているわけなんだし。

ということで、残念なパフェでした。

_DSC6968

_DSC6964

_DSC6979

_DSC6960

_DSC6943

スリランカフィッシュカレー

_DSC6944

_DSC6950

パパドもついてました

関連ランキング:カフェ | 城下駅柳川駅県庁通り駅


コンテンツカフェ

報告会の後、打ち上げから帰ってきても、まだ日が沈んでません! ということで、もう 1 つパフェです。

天満屋のそばのコンテンツカフェでピオーネのパフェ。

ところが、ここも冷凍生クリーム。この日食べた 2 つのパフェはどちらも外してしまいました。残念。

_DSC7157

_DSC7150

_DSC7169

関連ランキング:カフェ | 城下駅県庁通り駅郵便局前駅



クラブラティエ

クラブラティエは岡山珈琲館が経営している、パン屋を併設しているカフェです。

昨日食べたパフェが 2 つとも残念だったので、最後にもう 1 つ食べていこうと思い、岡山駅のそばのここに来てみたのでした。

食べたのは夏のフルーツパフェ。でも、パフェには全然期待していませんでした。だって、珈琲館ですよww

ところが、これがおいしかった!!

生クリームもちゃんとしたのつかっていたし、フルーツもおいしかったのです。マンゴーが冷凍物なのがちょっと残念ですけど、まぁ許容範囲。

しかも、一番下はコーンフレークではなくて、ジュレなのもポイントが高いです。

何ごともあなどってはいけないですね ^ ^;;

_DSC8103

_DSC8096

_DSC8115

関連ランキング:喫茶店 | 岡山駅前駅西川緑道公園駅岡山駅





番外編 #1 フリュティエ

岡山のフルーツパフェの Web にパフェの一覧があるのですが、そこに出ていたのでフリュティエ。ここではさんすての中にあり、テイクアウトのみのケーキ屋さんです。

Web に出ていたパフェはなかったので、普通のケーキ買ってきました。

_DSC6796

_DSC6788

_DSC6801
_DSC6839

_DSC6812

_DSC6865
_DSC6881

関連ランキング:洋菓子(その他) | 岡山駅岡山駅前駅西川緑道公園駅




番外編 #2 白十字 一番街店

こちらも Web に出ているので来てみたのですが、パフェがない! 完全に自分が勘違いしていたのですが、パフェがあるのは白十字 今店で一番街店ではなかったのです ><

それでも、ケーキは食べていくのです。

_DSC6926

_DSC6936

_DSC6921

関連ランキング:ケーキ | 岡山駅前駅岡山駅西川緑道公園駅





番外編 #3 さくら Cafe

もうパフェも岡山も関係ないのですが、尾道で食べた和菓子がおいしかったので、いっしょに貼っておきます ^ ^;;

_DSC7918

_DSC7940

_DSC7970

関連ランキング:喫茶店 | 尾道駅


おまけ

ほんとについでなのですが、尾道でみつけた子猫が超絶かわいかったので、貼っておきます ^ ^;;;;

_DSC7352

来年は第 3 弾やりたいので、ぜひ岡山によんでください!!

2013/12/09

Java Advent Calendar 9 日目 - Stream のパラレル処理

このエントリーをはてなブックマークに追加
このエントリは Java Advent Calendar 2013 の第 9 日目です。

昨日は、@nabedge さんの Mixer2のSpringMVC連携機能がver 1.2.17でさらに進化!
明日は Satoyuki Tsukano さんです。

去年も一昨年も Project Lambda について書いてきました。一昨年は Project Lambda の基本的なところ (まだこの頃は Stream がありませんでした)、去年は Stream の遅延処理についてです。

今年はこの Advent Calendar でも Project Lambda や Stream を扱っている人がいるぐらい、興味を持っている人が増えてきたと思います。

普通の Stream の使い方とかは、当たり前なので、ちょっと違う視点から書いてみようと思います (普通の Stream の使い方は ITpro の Java 技術最前線にいつか書きます)。
で、何を取り上げるかというと、パラレル処理です。去年はシリアルの方をやったので、残っていたほうです。

Stream のパラレル処理は Java SE 7 で導入された Fork/Join Framework がベースになっています。でも、どうやって Fork/Join Framework を使っているのかがよく分からないので、そこら辺を調べてみるということです。

ここでは、Fork/Join Framework がどうやってパラレル処理をするかまでは立ち入らずに、Stream がどのように Fork/Join Framework を使っているかだけにとどめておきます。
で、解析するコードはこちらです。

        IntStream.range(0, 11)
                 .parallel()
                 .map(x -> x*2)
                 .reduce(0, (x, y) -> x+y);

0 から 10 までの整数の 2 乗和を求める処理です。
では、順番に処理を追っていきましょう。
  1. IntStream.range
  2. parallel
  3. map
  4. reduce
なお、解析には JDK 8b118 を使用しています。今後、コードが変化する可能性もあるので、ご了承ください。

というのも、去年使用した b64 と比較すると、実装がかなり変わっているからです。ここが違うということはいいませんけど、かなり変わっていました。これから変更される可能性はすくないですが、テストの過程で若干変化する可能性はあります。

IntStream.range メソッド

普通の Stream は Iterator インタフェースと同じように、ソースがあってそこから生成するのですが、いくつかそれとは違う生成法もあります。IntStream.range メソッドもそんなメソッドのうちの 1 つです。

そういえば、Stream と何も書いていない時は、Stream インタフェースと IntStream インタフェースなどのプリミティブ用 Stream を合わせた言い方にしています。ここのインタフェースを指す時は Stream インタフェースと記述し、オブジェクトであれば Stream オブジェクトと書きます。

ということで、Stream にはオブジェクト用の Stream インタフェースと、プリミティブ用の IntStream/LongStream/DoubleStream インタフェースがあるわけです。この中で IntStream インタフェースと LongStream インタフェースだけ range メソッドが定義されています。

range メソッドは引数が 2 つで、1 つ目が Stream がはじまるはじめの数、2 つ目が最後の数の 1 つ大きい数です。上のように range(0, 11) と書くと、0 から 10 までの IntStream オブジェクトが生成されます。

2 つ目の引数を含めたい時は rangeClosed を使います。

この range メソッドを使えば、for (int i = 0; i < 11; i++) { ... } のようなループを Stream で書き換えることができるはずです。

まぁ、それはいいとして、range メソッドの実装です。

    public static IntStream range(int startInclusive, int endExclusive) {
        if (startInclusive >= endExclusive) {
            return empty();
        } else {
            return StreamSupport.intStream(
                    new Streams.RangeIntSpliterator(startInclusive, 
                                                    endExclusive,
                                                    false),
                                                    false);
        }
    }

RangeIntSpliterator オブジェクトを作成して、StreamSupport.intStream メソッドをコールしています。

そういえば、Project Lambda ではインタフェースにデフォルトメソッドを書けるようになりましたが、static メソッドも書けるようになったのでした。ここでも、それが使われています。

ここで重要なのが RangeIntSpliterator クラスです。RangeIntSpliterator クラスは java.util.Spliterator インタフェースの実装クラスです。

Spliterator インタフェースは分割のためのインタフェースで、Stream を使う時には必ず出てきます。とはいっても、表にはあまり出てこないインタフェースなので、意識はしないと思います。

ここでは、そういう Spliterator インタフェースの実装クラスを使っていたとところにとどめておきましょう。

StreamSupport.intStream メソッドに移りましょう。

    public static IntStream intStream(Spliterator.OfInt spliterator, 
                                      boolean parallel) {
        return new IntPipeline.Head<>(
            spliterator,
            StreamOpFlag.fromCharacteristics(spliterator),
            parallel);
    }

intStream メソッドでは、IntPipeline.Head オブジェクトを生成しています。IntPipeline.Head クラスのスーパークラスが IntPipeline クラスです。そして、この IntPipeline クラスが IntStream インタフェースの実体とでもいうクラスです。

そして、IntPipeline クラスには 3 種類の内部クラスがあります。その 1 つが Head クラスです。いうなれば Stream のパイプラインの先頭ということだと思います。

最後に Head クラスのコンストラクタの最後に parallel を指定できるようですが、ここでは false が設定されています。

ということで、range メソッドで IntPipeline.Head オブジェクトが作成されることが分かりました。

parallel メソッド

先ほど生成した IntPipeline.Head オブジェクトは parallel の指定がされていなかったので、paralle メソッドでパラレル対応にします。とはいっても、パラレル処理用のクラスがあるわけではありません。

parallel メソッドは IntPipeline クラスではなく、そのスーパークラスの AbstractPipeline クラスで定義されています。

    public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
    }

単純に parallel 変数に true を代入しているだけでした。

map メソッド

続いて、map メソッドです。この map メソッドに関しては、去年の blog で紹介したのとはずいぶん変化しています。

    public final IntStream map(IntUnaryOperator mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<Integer>(
                this, StreamShape.INT_VALUE,
                StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void accept(int t) {
                        downstream.accept(mapper.applyAsInt(t));
                    }
                };
            }
        };
    }

去年の実装では ValuePipeline クラスを使用していたのですが、今は StatelessOp オブジェクトを生成して返しています。この StatelessOp クラスは、先ほどの Head クラスと同様に IntPipeline クラスの内部クラスで、サブクラスになっています。

Stream の中間のパイプラインのうち、ステートを持たないものが StatelessOp クラスになるようです。map メソッドは順番は特に保持しなくてもかまわないので、StatelessOp クラスが選択されているのだと思います。

クラス名は変化していますが、処理自体は Sink インタフェースの匿名クラスで記述されていおり、処理を遅延させられるようになっています。

ちなみに Sink インタフェースは、java.util.functions.Consumer インタフェースのサブインタフェースで、連なった処理を記述するためのインタフェースのようです。

accept メソッドで downstream の accept メソッドをコールしているのは去年と同じですね。

reduce メソッド

ここまではパラレルに関する実質的な処理は表れませんでした。パラレル処理があるのは、パイプラインの途中過程ではなく、最後の処理になります。

去年も forEach メソッドの中でパラレルかどうかのフラグがあったところまでは見ていましたが、reduce でも同じような処理でパラレル化どうかを切り分けています。

では、reduce メソッドの実装を見ていきましょう。

    public final int reduce(int identity, IntBinaryOperator op) {
        return evaluate(ReduceOps.makeInt(identity, op));
    }

ReduceOps クラスは ReduceOp オブジェクトを生成するためのファクトリクラスです。ReduceOp クラスは TerminalOp インタフェースの実装クラスで、パイプラインの最後の処理を表しています。
とりあえず、ReduceOps.makeInt メソッドを見てみましょう。

    public static TerminalOp<Integer, Integer>
    makeInt(int identity, IntBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Integer,
                           Integer, ReducingSink>, 
                           Sink.OfInt {
            private int state;

            @Override
            public void begin(long size) {
                state = identity;
            }

            @Override
            public void accept(int t) {
                state = operator.applyAsInt(state, t);
            }

            @Override
            public Integer get() {
                return state;
            }

            @Override
            public void combine(ReducingSink other) {
                accept(other.state);
            }
        }
        return new ReduceOp<Integer, Integer, 
                             ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

makeInt メソッドの中で ReducingSink クラスを定義しています。このクラスも Sink インタフェースの実装クラスです。とりあえず、combine メソッドと accept メソッドがあったということだけ覚えておきましょう。

そして、この ReducingSink オブジェクトを生成する makeSink メソッドを定義した ReduceOp オブジェクトを返しています。

では、再び IntPipeline クラスの reduce メソッドに戻って、evaluate メソッドを追ってみましょう。このメソッドは IntPipeline クラスではなく、スーパークラスの AbstractPipeline クラスで定義されています。

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(
                     this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(
                     this, sourceSpliterator(terminalOp.getOpFlags()));
    }

ここでやっとパラレルかどうかで振り分けを行っています。

evaluateParallel メソッドの引数で sourceSpliterator メソッドがコールされていますが、この戻り値は IntStream.range メソッドで使用した RangeIntSpliterator オブジェクトになります。ようするに、自分自身と Spliterator オブジェクトを引数にして、evaluateParallel メソッドをコールしているわけです。

この terminalOp 変数は、さきほど作成した ReduceOp オブジェクトを指しています。

        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }

evaluateParallel メソッドでは、ReduceTask オブジェクトを生成しています。コンストラクタの引数の this が ReduceOp オブジェクトで、helper 変数が IntPipeline オブジェクト、spliterator 変数が RangeIntSpliterator オブジェクトです。

なんたら Task というクラスが出てくると、なんとなくパラレルっぽいですね。

それもそのはず、ForkJoinTask <- CountedCompleter <- AbstractTask <- ReduceTask というクラス構成になっていて、Fork/Join Framework がようやく出てきました。

ちなみに、ReduceTask クラスは ReduceOps クラスの内部クラスになっていて、ほとんどの処理は AbstractTask クラスで行っているようです。

ReduceTask オブジェクトが生成できたら、invoke メソッドをコールします。invoke メソッドを定義しているのは ForkJoinTask クラスです。

    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

if 文の中に入ってしまってますけど、重要なのは doInvoke メソッドです。doInvoke メソッドも ForkJoinTask クラスで定義されています。

    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
            externalAwaitDone();
    }

処理を行っているのは、doExec メソッドです。doExec メソッドも ForkJoinTask に定義されています。

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

そして、処理は exec メソッドに飛ばされます。exec メソッドは CountedCompleter クラスで定義されています。

    protected final boolean exec() {
        compute();
        return false;
    }

で、compute メソッドに委譲されています。ついでに、戻り値は false なので、先ほどの doExec メソッドの if (completed) ... は実行されないことがわかります。

そして、compute メソッドを定義しているのは AbstractTask クラスです。

    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold
               && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
    }

やっと、Fork/Join Framework っぽい書き方が出てきました。

Fork/Join Framework は分割統治法で、タスクを分割し、分割したタスクを fork していくという処理になります。このタスクを分割するという処に Spliterator が使われているわけです。

RangeIntSpliterator クラスの場合、単純にはじめと終わりの半分のところで分割しています。そして、makeChild メソッドでタスクを分割していますが、ReduceTask クラスの場合、単純に新しい ReduceTask オブジェクトを生成しているだけです。

ReduceTask オブジェクトが対象とする範囲は Spliterator オブジェクトが持っているので、これで大丈夫なわけです。

そして、左、右の順番で fork されています。fork するということは、パラレルに処理されることになります。fork すると、そのタスクの compute メソッドがコールされるので、再帰的にタスクを分割して処理できます。

タスクが十分に小さくなった段階で、doLeaf メソッドがコールされます。ReduceTask クラスでの doLeaf メソッドは次のようになります。

        protected S doLeaf() {
            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
        }

先ほど ReduceOps クラスの makeInt メソッドで定義した ReduceOp クラスの makeSink メソッドがコールされているので、ReduceSink オブジェクトが生成されて、引数になります。

wrapAndCopyInto メソッドは AbstractPipeline クラスで定義されています。ちょっとはしょりますが、wrapAndCopyInto メソッドは内部的に copyInto メソッドをコールしています。

    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, 
                                     Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

この中の Spliterator オブジェクトの forEachRemaining メソッドがパイプラインをさかのぼって、処理を行う部分です。この場合だと、map メソッドを行ってから、reduce メソッドを行います。

では、RangeIntSpliterator クラスの forEachRemaining メソッドを見てみましょう。

       public void forEachRemaining(IntConsumer consumer) {
            Objects.requireNonNull(consumer);

            int i = from;
            final int hUpTo = upTo;
            int hLast = last;
            from = upTo;
            last = 0;
            while (i < hUpTo) {
                consumer.accept(i++);
            }
            if (hLast > 0) {
                // Last element of closed range
                consumer.accept(i);
            }
        }

やっと、accept メソッドが出てきました。ここで、まず map メソッドで作成した StatelessOp オブジェクトの accept メソッドがコールされ、そしてその後 ReducingSink クラスの accept メソッドがコールされるわけです。

ところで、map メソッドはパラレル処理でしやすいですけど、reduce メソッドはどうするのか思いますよね。

タスクを分割した最後のところは 0 と IntStream オブジェクトの個々の要素の map メソッドの戻り値を足し合わせます。そして、forEachRemaining メソッドを抜けて、compute メソッドまで戻ります。
そして、compute メソッドの最後の task.tryComplete() が実行されます。

ReduceTask クラスの onCompletion メソッドは次のようになっています。

        public void onCompletion(CountedCompleter<?> caller) {
            if (!isLeaf()) {
                S leftResult = leftChild.getLocalResult();
                leftResult.combine(rightChild.getLocalResult());
                setLocalResult(leftResult);
            }
            // GC spliterator, left and right child
            super.onCompletion(caller);
        }

leftResult 変数の結果と rightChild 変数の結果を combine メソッドで統合しています。この combine メソッドは ReducingSink の combine メソッドです。先ほど ReducingSink クラスは示しましたけど、もう一回出します。

            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }

combiner 変数の型は BinaryOperator インタフェースです。つまり、Lambda 式で記述している部分は、BinaryOperator インタフェースの apply メソッドなわけです。つまりここでは、単純に足し算をしています。

ということで、0 と map メソッドの戻り値を足した右と左のタスクが、その上の段階で足し合わされるわけです。これがさらに統合されて、最終的にすべての足し算が行われるというわけです。

というわけで、分割統治でパラレルに reduce メソッドを行っていく処理を追ってみました。

今回は reduce メソッドで見てきましたが、他のメソッドではまた違う方法でパラレルに処理しています。特に順番が決まっている処理の場合、パラレル処理をするための工夫があるので、もし興味があるようでしたらぜひ見てみてください。

ふー、長かった (笑)。