1/*
2 * Copyright (c) 2013, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.DoubleSummaryStatistics;
28import java.util.Objects;
29import java.util.OptionalDouble;
30import java.util.PrimitiveIterator;
31import java.util.Spliterator;
32import java.util.Spliterators;
33import java.util.function.BiConsumer;
34import java.util.function.BinaryOperator;
35import java.util.function.DoubleBinaryOperator;
36import java.util.function.DoubleConsumer;
37import java.util.function.DoubleFunction;
38import java.util.function.DoublePredicate;
39import java.util.function.DoubleToIntFunction;
40import java.util.function.DoubleToLongFunction;
41import java.util.function.DoubleUnaryOperator;
42import java.util.function.IntFunction;
43import java.util.function.ObjDoubleConsumer;
44import java.util.function.Supplier;
45
46/**
47 * Abstract base class for an intermediate pipeline stage or pipeline source
48 * stage implementing whose elements are of type {@code double}.
49 *
50 * @param  type of elements in the upstream source
51 *
52 * @since 1.8
53 * @hide Visible for CTS testing only (OpenJDK8 tests).
54 */
55public abstract class DoublePipeline<E_IN>
56        extends AbstractPipeline<E_IN, Double, DoubleStream>
57        implements DoubleStream {
58
59    /**
60     * Constructor for the head of a stream pipeline.
61     *
62     * @param source {@code Supplier<Spliterator>} describing the stream source
63     * @param sourceFlags the source flags for the stream source, described in
64     * {@link StreamOpFlag}
65     */
66    DoublePipeline(Supplier<? extends Spliterator<Double>> source,
67                   int sourceFlags, boolean parallel) {
68        super(source, sourceFlags, parallel);
69    }
70
71    /**
72     * Constructor for the head of a stream pipeline.
73     *
74     * @param source {@code Spliterator} describing the stream source
75     * @param sourceFlags the source flags for the stream source, described in
76     * {@link StreamOpFlag}
77     */
78    DoublePipeline(Spliterator<Double> source,
79                   int sourceFlags, boolean parallel) {
80        super(source, sourceFlags, parallel);
81    }
82
83    /**
84     * Constructor for appending an intermediate operation onto an existing
85     * pipeline.
86     *
87     * @param upstream the upstream element source.
88     * @param opFlags the operation flags
89     */
90    DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
91        super(upstream, opFlags);
92    }
93
94    /**
95     * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
96     * by casting.
97     */
98    private static DoubleConsumer adapt(Sink<Double> sink) {
99        if (sink instanceof DoubleConsumer) {
100            return (DoubleConsumer) sink;
101        } else {
102            if (Tripwire.ENABLED)
103                Tripwire.trip(AbstractPipeline.class,
104                              "using DoubleStream.adapt(Sink<Double> s)");
105            return sink::accept;
106        }
107    }
108
109    /**
110     * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
111     *
112     * @implNote
113     * The implementation attempts to cast to a Spliterator.OfDouble, and throws
114     * an exception if this cast is not possible.
115     */
116    private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
117        if (s instanceof Spliterator.OfDouble) {
118            return (Spliterator.OfDouble) s;
119        } else {
120            if (Tripwire.ENABLED)
121                Tripwire.trip(AbstractPipeline.class,
122                              "using DoubleStream.adapt(Spliterator<Double> s)");
123            throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
124        }
125    }
126
127
128    // Shape-specific methods
129
130    @Override
131    public final StreamShape getOutputShape() {
132        return StreamShape.DOUBLE_VALUE;
133    }
134
135    @Override
136    public final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
137                                             Spliterator<P_IN> spliterator,
138                                             boolean flattenTree,
139                                             IntFunction<Double[]> generator) {
140        return Nodes.collectDouble(helper, spliterator, flattenTree);
141    }
142
143    @Override
144    public final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
145                                          Supplier<Spliterator<P_IN>> supplier,
146                                          boolean isParallel) {
147        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
148    }
149
150    @Override
151    @SuppressWarnings("unchecked")
152    public final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
153        return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
154    }
155
156    @Override
157    public final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
158        Spliterator.OfDouble spl = adapt(spliterator);
159        DoubleConsumer adaptedSink = adapt(sink);
160        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
161    }
162
163    @Override
164    public final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
165        return Nodes.doubleBuilder(exactSizeIfKnown);
166    }
167
168
169    // DoubleStream
170
171    @Override
172    public final PrimitiveIterator.OfDouble iterator() {
173        return Spliterators.iterator(spliterator());
174    }
175
176    @Override
177    public final Spliterator.OfDouble spliterator() {
178        return adapt(super.spliterator());
179    }
180
181    // Stateless intermediate ops from DoubleStream
182
183    @Override
184    public final Stream<Double> boxed() {
185        return mapToObj(Double::valueOf);
186    }
187
188    @Override
189    public final DoubleStream map(DoubleUnaryOperator mapper) {
190        Objects.requireNonNull(mapper);
191        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
192                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
193            @Override
194            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
195                return new Sink.ChainedDouble<Double>(sink) {
196                    @Override
197                    public void accept(double t) {
198                        downstream.accept(mapper.applyAsDouble(t));
199                    }
200                };
201            }
202        };
203    }
204
205    @Override
206    public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
207        Objects.requireNonNull(mapper);
208        return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
209                                                            StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
210            @Override
211            public Sink<Double> opWrapSink(int flags, Sink<U> sink) {
212                return new Sink.ChainedDouble<U>(sink) {
213                    @Override
214                    public void accept(double t) {
215                        downstream.accept(mapper.apply(t));
216                    }
217                };
218            }
219        };
220    }
221
222    @Override
223    public final IntStream mapToInt(DoubleToIntFunction mapper) {
224        Objects.requireNonNull(mapper);
225        return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
226                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
227            @Override
228            public Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
229                return new Sink.ChainedDouble<Integer>(sink) {
230                    @Override
231                    public void accept(double t) {
232                        downstream.accept(mapper.applyAsInt(t));
233                    }
234                };
235            }
236        };
237    }
238
239    @Override
240    public final LongStream mapToLong(DoubleToLongFunction mapper) {
241        Objects.requireNonNull(mapper);
242        return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
243                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
244            @Override
245            public Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
246                return new Sink.ChainedDouble<Long>(sink) {
247                    @Override
248                    public void accept(double t) {
249                        downstream.accept(mapper.applyAsLong(t));
250                    }
251                };
252            }
253        };
254    }
255
256    @Override
257    public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
258        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
259                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
260            @Override
261            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
262                return new Sink.ChainedDouble<Double>(sink) {
263                    @Override
264                    public void begin(long size) {
265                        downstream.begin(-1);
266                    }
267
268                    @Override
269                    public void accept(double t) {
270                        try (DoubleStream result = mapper.apply(t)) {
271                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
272                            if (result != null)
273                                result.sequential().forEach(i -> downstream.accept(i));
274                        }
275                    }
276                };
277            }
278        };
279    }
280
281    @Override
282    public DoubleStream unordered() {
283        if (!isOrdered())
284            return this;
285        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
286            @Override
287            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
288                return sink;
289            }
290        };
291    }
292
293    @Override
294    public final DoubleStream filter(DoublePredicate predicate) {
295        Objects.requireNonNull(predicate);
296        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
297                                       StreamOpFlag.NOT_SIZED) {
298            @Override
299            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
300                return new Sink.ChainedDouble<Double>(sink) {
301                    @Override
302                    public void begin(long size) {
303                        downstream.begin(-1);
304                    }
305
306                    @Override
307                    public void accept(double t) {
308                        if (predicate.test(t))
309                            downstream.accept(t);
310                    }
311                };
312            }
313        };
314    }
315
316    @Override
317    public final DoubleStream peek(DoubleConsumer action) {
318        Objects.requireNonNull(action);
319        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
320                                       0) {
321            @Override
322            public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
323                return new Sink.ChainedDouble<Double>(sink) {
324                    @Override
325                    public void accept(double t) {
326                        action.accept(t);
327                        downstream.accept(t);
328                    }
329                };
330            }
331        };
332    }
333
334    // Stateful intermediate ops from DoubleStream
335
336    @Override
337    public final DoubleStream limit(long maxSize) {
338        if (maxSize < 0)
339            throw new IllegalArgumentException(Long.toString(maxSize));
340        return SliceOps.makeDouble(this, (long) 0, maxSize);
341    }
342
343    @Override
344    public final DoubleStream skip(long n) {
345        if (n < 0)
346            throw new IllegalArgumentException(Long.toString(n));
347        if (n == 0)
348            return this;
349        else {
350            long limit = -1;
351            return SliceOps.makeDouble(this, n, limit);
352        }
353    }
354
355    @Override
356    public final DoubleStream sorted() {
357        return SortedOps.makeDouble(this);
358    }
359
360    @Override
361    public final DoubleStream distinct() {
362        // While functional and quick to implement, this approach is not very efficient.
363        // An efficient version requires a double-specific map/set implementation.
364        return boxed().distinct().mapToDouble(i -> (double) i);
365    }
366
367    // Terminal ops from DoubleStream
368
369    @Override
370    public void forEach(DoubleConsumer consumer) {
371        evaluate(ForEachOps.makeDouble(consumer, false));
372    }
373
374    @Override
375    public void forEachOrdered(DoubleConsumer consumer) {
376        evaluate(ForEachOps.makeDouble(consumer, true));
377    }
378
379    @Override
380    public final double sum() {
381        /*
382         * In the arrays allocated for the collect operation, index 0
383         * holds the high-order bits of the running sum, index 1 holds
384         * the low-order bits of the sum computed via compensated
385         * summation, and index 2 holds the simple sum used to compute
386         * the proper result if the stream contains infinite values of
387         * the same sign.
388         */
389        double[] summation = collect(() -> new double[3],
390                               (ll, d) -> {
391                                   Collectors.sumWithCompensation(ll, d);
392                                   ll[2] += d;
393                               },
394                               (ll, rr) -> {
395                                   Collectors.sumWithCompensation(ll, rr[0]);
396                                   Collectors.sumWithCompensation(ll, rr[1]);
397                                   ll[2] += rr[2];
398                               });
399
400        return Collectors.computeFinalSum(summation);
401    }
402
403    @Override
404    public final OptionalDouble min() {
405        return reduce(Math::min);
406    }
407
408    @Override
409    public final OptionalDouble max() {
410        return reduce(Math::max);
411    }
412
413    /**
414     * {@inheritDoc}
415     *
416     * @implNote The {@code double} format can represent all
417     * consecutive integers in the range -2<sup>53</sup> to
418     * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
419     * values, the divisor in the average computation will saturate at
420     * 2<sup>53</sup>, leading to additional numerical errors.
421     */
422    @Override
423    public final OptionalDouble average() {
424        /*
425         * In the arrays allocated for the collect operation, index 0
426         * holds the high-order bits of the running sum, index 1 holds
427         * the low-order bits of the sum computed via compensated
428         * summation, index 2 holds the number of values seen, index 3
429         * holds the simple sum.
430         */
431        double[] avg = collect(() -> new double[4],
432                               (ll, d) -> {
433                                   ll[2]++;
434                                   Collectors.sumWithCompensation(ll, d);
435                                   ll[3] += d;
436                               },
437                               (ll, rr) -> {
438                                   Collectors.sumWithCompensation(ll, rr[0]);
439                                   Collectors.sumWithCompensation(ll, rr[1]);
440                                   ll[2] += rr[2];
441                                   ll[3] += rr[3];
442                               });
443        return avg[2] > 0
444            ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2])
445            : OptionalDouble.empty();
446    }
447
448    @Override
449    public final long count() {
450        return mapToLong(e -> 1L).sum();
451    }
452
453    @Override
454    public final DoubleSummaryStatistics summaryStatistics() {
455        return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
456                       DoubleSummaryStatistics::combine);
457    }
458
459    @Override
460    public final double reduce(double identity, DoubleBinaryOperator op) {
461        return evaluate(ReduceOps.makeDouble(identity, op));
462    }
463
464    @Override
465    public final OptionalDouble reduce(DoubleBinaryOperator op) {
466        return evaluate(ReduceOps.makeDouble(op));
467    }
468
469    @Override
470    public final <R> R collect(Supplier<R> supplier,
471                               ObjDoubleConsumer<R> accumulator,
472                               BiConsumer<R, R> combiner) {
473        BinaryOperator<R> operator = (left, right) -> {
474            combiner.accept(left, right);
475            return left;
476        };
477        return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
478    }
479
480    @Override
481    public final boolean anyMatch(DoublePredicate predicate) {
482        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
483    }
484
485    @Override
486    public final boolean allMatch(DoublePredicate predicate) {
487        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
488    }
489
490    @Override
491    public final boolean noneMatch(DoublePredicate predicate) {
492        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
493    }
494
495    @Override
496    public final OptionalDouble findFirst() {
497        return evaluate(FindOps.makeDouble(true));
498    }
499
500    @Override
501    public final OptionalDouble findAny() {
502        return evaluate(FindOps.makeDouble(false));
503    }
504
505    @Override
506    public final double[] toArray() {
507        return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
508                        .asPrimitiveArray();
509    }
510
511    //
512
513    /**
514     * Source stage of a DoubleStream
515     *
516     * @param  type of elements in the upstream source
517     * @hide Visibility for CTS only (OpenJDK 8 streams tests).
518     */
519    public static class Head<E_IN> extends DoublePipeline<E_IN> {
520        /**
521         * Constructor for the source stage of a DoubleStream.
522         *
523         * @param source {@code Supplier<Spliterator>} describing the stream
524         *               source
525         * @param sourceFlags the source flags for the stream source, described
526         *                    in {@link StreamOpFlag}
527         * @param parallel {@code true} if the pipeline is parallel
528         */
529        public Head(Supplier<? extends Spliterator<Double>> source,
530             int sourceFlags, boolean parallel) {
531            super(source, sourceFlags, parallel);
532        }
533
534        /**
535         * Constructor for the source stage of a DoubleStream.
536         *
537         * @param source {@code Spliterator} describing the stream source
538         * @param sourceFlags the source flags for the stream source, described
539         *                    in {@link StreamOpFlag}
540         * @param parallel {@code true} if the pipeline is parallel
541         */
542        public Head(Spliterator<Double> source,
543             int sourceFlags, boolean parallel) {
544            super(source, sourceFlags, parallel);
545        }
546
547        @Override
548        public final boolean opIsStateful() {
549            throw new UnsupportedOperationException();
550        }
551
552        @Override
553        public final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
554            throw new UnsupportedOperationException();
555        }
556
557        // Optimized sequential terminal operations for the head of the pipeline
558
559        @Override
560        public void forEach(DoubleConsumer consumer) {
561            if (!isParallel()) {
562                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
563            }
564            else {
565                super.forEach(consumer);
566            }
567        }
568
569        @Override
570        public void forEachOrdered(DoubleConsumer consumer) {
571            if (!isParallel()) {
572                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
573            }
574            else {
575                super.forEachOrdered(consumer);
576            }
577        }
578
579    }
580
581    /**
582     * Base class for a stateless intermediate stage of a DoubleStream.
583     *
584     * @param  type of elements in the upstream source
585     * @since 1.8
586     * @hide Visible for CTS testing only (OpenJDK8 tests).
587     */
588    public abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
589        /**
590         * Construct a new DoubleStream by appending a stateless intermediate
591         * operation to an existing stream.
592         *
593         * @param upstream the upstream pipeline stage
594         * @param inputShape the stream shape for the upstream pipeline stage
595         * @param opFlags operation flags for the new stage
596         */
597        public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
598                    StreamShape inputShape,
599                    int opFlags) {
600            super(upstream, opFlags);
601            assert upstream.getOutputShape() == inputShape;
602        }
603
604        @Override
605        public final boolean opIsStateful() {
606            return false;
607        }
608    }
609
610    /**
611     * Base class for a stateful intermediate stage of a DoubleStream.
612     *
613     * @param  type of elements in the upstream source
614     * @since 1.8
615     * @hide Visible for CTS testing only (OpenJDK8 tests).
616     */
617    public abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
618        /**
619         * Construct a new DoubleStream by appending a stateful intermediate
620         * operation to an existing stream.
621         *
622         * @param upstream the upstream pipeline stage
623         * @param inputShape the stream shape for the upstream pipeline stage
624         * @param opFlags operation flags for the new stage
625         */
626        public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
627                   StreamShape inputShape,
628                   int opFlags) {
629            super(upstream, opFlags);
630            assert upstream.getOutputShape() == inputShape;
631        }
632
633        @Override
634        public final boolean opIsStateful() {
635            return true;
636        }
637
638        @Override
639        public abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
640                                                        Spliterator<P_IN> spliterator,
641                                                        IntFunction<Double[]> generator);
642    }
643}
644