ReferencePipeline.java revision ce0115e08189fcb96f990f0b93a6c2aeae59250e
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Comparator;
28import java.util.Iterator;
29import java.util.Objects;
30import java.util.Optional;
31import java.util.Spliterator;
32import java.util.Spliterators;
33import java.util.function.BiConsumer;
34import java.util.function.BiFunction;
35import java.util.function.BinaryOperator;
36import java.util.function.Consumer;
37import java.util.function.DoubleConsumer;
38import java.util.function.Function;
39import java.util.function.IntConsumer;
40import java.util.function.IntFunction;
41import java.util.function.LongConsumer;
42import java.util.function.Predicate;
43import java.util.function.Supplier;
44import java.util.function.ToDoubleFunction;
45import java.util.function.ToIntFunction;
46import java.util.function.ToLongFunction;
47
48/**
49 * Abstract base class for an intermediate pipeline stage or pipeline source
50 * stage implementing whose elements are of type {@code U}.
51 *
52 * @param  type of elements in the upstream source
53 * @param  type of elements in produced by this stage
54 *
55 * @since 1.8
56 * @hide Visible for CTS testing only (OpenJDK8 tests).
57 */
58public abstract class ReferencePipeline<P_IN, P_OUT>
59        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
60        implements Stream<P_OUT>  {
61
62    /**
63     * Constructor for the head of a stream pipeline.
64     *
65     * @param source {@code Supplier<Spliterator>} describing the stream source
66     * @param sourceFlags the source flags for the stream source, described in
67     *        {@link StreamOpFlag}
68     * @param parallel {@code true} if the pipeline is parallel
69     */
70    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
71                      int sourceFlags, boolean parallel) {
72        super(source, sourceFlags, parallel);
73    }
74
75    /**
76     * Constructor for the head of a stream pipeline.
77     *
78     * @param source {@code Spliterator} describing the stream source
79     * @param sourceFlags The source flags for the stream source, described in
80     *        {@link StreamOpFlag}
81     * @param parallel {@code true} if the pipeline is parallel
82     */
83    ReferencePipeline(Spliterator<?> source,
84                      int sourceFlags, boolean parallel) {
85        super(source, sourceFlags, parallel);
86    }
87
88    /**
89     * Constructor for appending an intermediate operation onto an existing
90     * pipeline.
91     *
92     * @param upstream the upstream element source.
93     */
94    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
95        super(upstream, opFlags);
96    }
97
98    // Shape-specific methods
99
100    @Override
101    public final StreamShape getOutputShape() {
102        return StreamShape.REFERENCE;
103    }
104
105    @Override
106    public final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
107                                        Spliterator<P_IN> spliterator,
108                                        boolean flattenTree,
109                                        IntFunction<P_OUT[]> generator) {
110        return Nodes.collect(helper, spliterator, flattenTree, generator);
111    }
112
113    @Override
114    public final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
115                                     Supplier<Spliterator<P_IN>> supplier,
116                                     boolean isParallel) {
117        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
118    }
119
120    @Override
121    public final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
122        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
123    }
124
125    @Override
126    public final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
127        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
128    }
129
130    @Override
131    public final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
132        return Nodes.builder(exactSizeIfKnown, generator);
133    }
134
135
136    // BaseStream
137
138    @Override
139    public final Iterator<P_OUT> iterator() {
140        return Spliterators.iterator(spliterator());
141    }
142
143
144    // Stream
145
146    // Stateless intermediate operations from Stream
147
148    @Override
149    public Stream<P_OUT> unordered() {
150        if (!isOrdered())
151            return this;
152        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
153            @Override
154            public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
155                return sink;
156            }
157        };
158    }
159
160    @Override
161    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
162        Objects.requireNonNull(predicate);
163        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
164                                     StreamOpFlag.NOT_SIZED) {
165            @Override
166            public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
167                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
168                    @Override
169                    public void begin(long size) {
170                        downstream.begin(-1);
171                    }
172
173                    @Override
174                    public void accept(P_OUT u) {
175                        if (predicate.test(u))
176                            downstream.accept(u);
177                    }
178                };
179            }
180        };
181    }
182
183    @Override
184    @SuppressWarnings("unchecked")
185    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
186        Objects.requireNonNull(mapper);
187        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
188                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
189            @Override
190            public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
191                return new Sink.ChainedReference<P_OUT, R>(sink) {
192                    @Override
193                    public void accept(P_OUT u) {
194                        downstream.accept(mapper.apply(u));
195                    }
196                };
197            }
198        };
199    }
200
201    @Override
202    public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
203        Objects.requireNonNull(mapper);
204        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
205                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
206            @Override
207            public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
208                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
209                    @Override
210                    public void accept(P_OUT u) {
211                        downstream.accept(mapper.applyAsInt(u));
212                    }
213                };
214            }
215        };
216    }
217
218    @Override
219    public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
220        Objects.requireNonNull(mapper);
221        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
222                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
223            @Override
224            public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
225                return new Sink.ChainedReference<P_OUT, Long>(sink) {
226                    @Override
227                    public void accept(P_OUT u) {
228                        downstream.accept(mapper.applyAsLong(u));
229                    }
230                };
231            }
232        };
233    }
234
235    @Override
236    public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
237        Objects.requireNonNull(mapper);
238        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
239                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
240            @Override
241            public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
242                return new Sink.ChainedReference<P_OUT, Double>(sink) {
243                    @Override
244                    public void accept(P_OUT u) {
245                        downstream.accept(mapper.applyAsDouble(u));
246                    }
247                };
248            }
249        };
250    }
251
252    @Override
253    public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
254        Objects.requireNonNull(mapper);
255        // We can do better than this, by polling cancellationRequested when stream is infinite
256        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
257                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
258            @Override
259            public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
260                return new Sink.ChainedReference<P_OUT, R>(sink) {
261                    @Override
262                    public void begin(long size) {
263                        downstream.begin(-1);
264                    }
265
266                    @Override
267                    public void accept(P_OUT u) {
268                        try (Stream<? extends R> result = mapper.apply(u)) {
269                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
270                            if (result != null)
271                                result.sequential().forEach(downstream);
272                        }
273                    }
274                };
275            }
276        };
277    }
278
279    @Override
280    public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
281        Objects.requireNonNull(mapper);
282        // We can do better than this, by polling cancellationRequested when stream is infinite
283        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
284                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
285            @Override
286            public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
287                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
288                    IntConsumer downstreamAsInt = downstream::accept;
289                    @Override
290                    public void begin(long size) {
291                        downstream.begin(-1);
292                    }
293
294                    @Override
295                    public void accept(P_OUT u) {
296                        try (IntStream result = mapper.apply(u)) {
297                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
298                            if (result != null)
299                                result.sequential().forEach(downstreamAsInt);
300                        }
301                    }
302                };
303            }
304        };
305    }
306
307    @Override
308    public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
309        Objects.requireNonNull(mapper);
310        // We can do better than this, by polling cancellationRequested when stream is infinite
311        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
312                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
313            @Override
314            public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
315                return new Sink.ChainedReference<P_OUT, Double>(sink) {
316                    DoubleConsumer downstreamAsDouble = downstream::accept;
317                    @Override
318                    public void begin(long size) {
319                        downstream.begin(-1);
320                    }
321
322                    @Override
323                    public void accept(P_OUT u) {
324                        try (DoubleStream result = mapper.apply(u)) {
325                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
326                            if (result != null)
327                                result.sequential().forEach(downstreamAsDouble);
328                        }
329                    }
330                };
331            }
332        };
333    }
334
335    @Override
336    public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
337        Objects.requireNonNull(mapper);
338        // We can do better than this, by polling cancellationRequested when stream is infinite
339        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
340                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
341            @Override
342            public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
343                return new Sink.ChainedReference<P_OUT, Long>(sink) {
344                    LongConsumer downstreamAsLong = downstream::accept;
345                    @Override
346                    public void begin(long size) {
347                        downstream.begin(-1);
348                    }
349
350                    @Override
351                    public void accept(P_OUT u) {
352                        try (LongStream result = mapper.apply(u)) {
353                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
354                            if (result != null)
355                                result.sequential().forEach(downstreamAsLong);
356                        }
357                    }
358                };
359            }
360        };
361    }
362
363    @Override
364    public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
365        Objects.requireNonNull(action);
366        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
367                                     0) {
368            @Override
369            public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
370                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
371                    @Override
372                    public void accept(P_OUT u) {
373                        action.accept(u);
374                        downstream.accept(u);
375                    }
376                };
377            }
378        };
379    }
380
381    // Stateful intermediate operations from Stream
382
383    @Override
384    public final Stream<P_OUT> distinct() {
385        return DistinctOps.makeRef(this);
386    }
387
388    @Override
389    public final Stream<P_OUT> sorted() {
390        return SortedOps.makeRef(this);
391    }
392
393    @Override
394    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
395        return SortedOps.makeRef(this, comparator);
396    }
397
398    @Override
399    public final Stream<P_OUT> limit(long maxSize) {
400        if (maxSize < 0)
401            throw new IllegalArgumentException(Long.toString(maxSize));
402        return SliceOps.makeRef(this, 0, maxSize);
403    }
404
405    @Override
406    public final Stream<P_OUT> skip(long n) {
407        if (n < 0)
408            throw new IllegalArgumentException(Long.toString(n));
409        if (n == 0)
410            return this;
411        else
412            return SliceOps.makeRef(this, n, -1);
413    }
414
415    // Terminal operations from Stream
416
417    @Override
418    public void forEach(Consumer<? super P_OUT> action) {
419        evaluate(ForEachOps.makeRef(action, false));
420    }
421
422    @Override
423    public void forEachOrdered(Consumer<? super P_OUT> action) {
424        evaluate(ForEachOps.makeRef(action, true));
425    }
426
427    @Override
428    @SuppressWarnings("unchecked")
429    public final <A> A[] toArray(IntFunction<A[]> generator) {
430        // Since A has no relation to U (not possible to declare that A is an upper bound of U)
431        // there will be no static type checking.
432        // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
433        // throughout the code-base.
434        // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
435        // Runtime checking will be performed when an element is stored in A[], thus if A is not a
436        // super type of U an ArrayStoreException will be thrown.
437        @SuppressWarnings("rawtypes")
438        IntFunction rawGenerator = (IntFunction) generator;
439        return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
440                              .asArray(rawGenerator);
441    }
442
443    @Override
444    public final Object[] toArray() {
445        return toArray(Object[]::new);
446    }
447
448    @Override
449    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
450        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
451    }
452
453    @Override
454    public final boolean allMatch(Predicate<? super P_OUT> predicate) {
455        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
456    }
457
458    @Override
459    public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
460        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
461    }
462
463    @Override
464    public final Optional<P_OUT> findFirst() {
465        return evaluate(FindOps.makeRef(true));
466    }
467
468    @Override
469    public final Optional<P_OUT> findAny() {
470        return evaluate(FindOps.makeRef(false));
471    }
472
473    @Override
474    public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
475        return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
476    }
477
478    @Override
479    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
480        return evaluate(ReduceOps.makeRef(accumulator));
481    }
482
483    @Override
484    public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
485        return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
486    }
487
488    @Override
489    @SuppressWarnings("unchecked")
490    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
491        A container;
492        if (isParallel()
493                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
494                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
495            container = collector.supplier().get();
496            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
497            forEach(u -> accumulator.accept(container, u));
498        }
499        else {
500            container = evaluate(ReduceOps.makeRef(collector));
501        }
502        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
503               ? (R) container
504               : collector.finisher().apply(container);
505    }
506
507    @Override
508    public final <R> R collect(Supplier<R> supplier,
509                               BiConsumer<R, ? super P_OUT> accumulator,
510                               BiConsumer<R, R> combiner) {
511        return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
512    }
513
514    @Override
515    public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
516        return reduce(BinaryOperator.maxBy(comparator));
517    }
518
519    @Override
520    public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
521        return reduce(BinaryOperator.minBy(comparator));
522
523    }
524
525    @Override
526    public final long count() {
527        return mapToLong(e -> 1L).sum();
528    }
529
530
531    //
532
533    /**
534     * Source stage of a ReferencePipeline.
535     *
536     * @param  type of elements in the upstream source
537     * @param  type of elements in produced by this stage
538     * @since 1.8
539     * @hide Visible for CTS testing only (OpenJDK8 tests).
540     */
541    public static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
542        /**
543         * Constructor for the source stage of a Stream.
544         *
545         * @param source {@code Supplier<Spliterator>} describing the stream
546         *               source
547         * @param sourceFlags the source flags for the stream source, described
548         *                    in {@link StreamOpFlag}
549         */
550        public Head(Supplier<? extends Spliterator<?>> source,
551             int sourceFlags, boolean parallel) {
552            super(source, sourceFlags, parallel);
553        }
554
555        /**
556         * Constructor for the source stage of a Stream.
557         *
558         * @param source {@code Spliterator} describing the stream source
559         * @param sourceFlags the source flags for the stream source, described
560         *                    in {@link StreamOpFlag}
561         */
562        public Head(Spliterator<?> source,
563             int sourceFlags, boolean parallel) {
564            super(source, sourceFlags, parallel);
565        }
566
567        @Override
568        public final boolean opIsStateful() {
569            throw new UnsupportedOperationException();
570        }
571
572        @Override
573        public final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
574            throw new UnsupportedOperationException();
575        }
576
577        // Optimized sequential terminal operations for the head of the pipeline
578
579        @Override
580        public void forEach(Consumer<? super E_OUT> action) {
581            if (!isParallel()) {
582                sourceStageSpliterator().forEachRemaining(action);
583            }
584            else {
585                super.forEach(action);
586            }
587        }
588
589        @Override
590        public void forEachOrdered(Consumer<? super E_OUT> action) {
591            if (!isParallel()) {
592                sourceStageSpliterator().forEachRemaining(action);
593            }
594            else {
595                super.forEachOrdered(action);
596            }
597        }
598    }
599
600    /**
601     * Base class for a stateless intermediate stage of a Stream.
602     *
603     * @param  type of elements in the upstream source
604     * @param  type of elements in produced by this stage
605     * @since 1.8
606     * @hide Visible for CTS testing only (OpenJDK8 tests).
607     */
608    public abstract static class StatelessOp<E_IN, E_OUT>
609            extends ReferencePipeline<E_IN, E_OUT> {
610        /**
611         * Construct a new Stream by appending a stateless intermediate
612         * operation to an existing stream.
613         *
614         * @param upstream The upstream pipeline stage
615         * @param inputShape The stream shape for the upstream pipeline stage
616         * @param opFlags Operation flags for the new stage
617         */
618        public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
619                    StreamShape inputShape,
620                    int opFlags) {
621            super(upstream, opFlags);
622            assert upstream.getOutputShape() == inputShape;
623        }
624
625        @Override
626        public final boolean opIsStateful() {
627            return false;
628        }
629    }
630
631    /**
632     * Base class for a stateful intermediate stage of a Stream.
633     *
634     * @param  type of elements in the upstream source
635     * @param  type of elements in produced by this stage
636     * @since 1.8
637     * @hide Visible for CTS testing only (OpenJDK8 tests).
638     */
639    public abstract static class StatefulOp<E_IN, E_OUT>
640            extends ReferencePipeline<E_IN, E_OUT> {
641        /**
642         * Construct a new Stream by appending a stateful intermediate operation
643         * to an existing stream.
644         * @param upstream The upstream pipeline stage
645         * @param inputShape The stream shape for the upstream pipeline stage
646         * @param opFlags Operation flags for the new stage
647         */
648        public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
649                   StreamShape inputShape,
650                   int opFlags) {
651            super(upstream, opFlags);
652            assert upstream.getOutputShape() == inputShape;
653        }
654
655        @Override
656        public final boolean opIsStateful() {
657            return true;
658        }
659
660        @Override
661        public abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
662                                                       Spliterator<P_IN> spliterator,
663                                                       IntFunction<E_OUT[]> generator);
664    }
665}
666