// ReferencePipeline.java revision d0a2645e29a9b84d7e5ec822eb9904e93bd6c013
/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;

/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage implementing a {@link Stream} whose elements are of type
 * {@code P_OUT}.
 *
 * @param <P_IN> type of elements in the upstream source
 * @param <P_OUT> type of elements produced by this stage
 *
 * @since 1.8
 */
57abstract class ReferencePipeline<P_IN, P_OUT>
58        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
59        implements Stream<P_OUT>  {
60
61    /**
62     * Constructor for the head of a stream pipeline.
63     *
64     * @param source {@code Supplier<Spliterator>} describing the stream source
65     * @param sourceFlags the source flags for the stream source, described in
66     *        {@link StreamOpFlag}
67     * @param parallel {@code true} if the pipeline is parallel
68     */
69    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
70                      int sourceFlags, boolean parallel) {
71        super(source, sourceFlags, parallel);
72    }
73
74    /**
75     * Constructor for the head of a stream pipeline.
76     *
77     * @param source {@code Spliterator} describing the stream source
78     * @param sourceFlags The source flags for the stream source, described in
79     *        {@link StreamOpFlag}
80     * @param parallel {@code true} if the pipeline is parallel
81     */
82    ReferencePipeline(Spliterator<?> source,
83                      int sourceFlags, boolean parallel) {
84        super(source, sourceFlags, parallel);
85    }
86
87    /**
88     * Constructor for appending an intermediate operation onto an existing
89     * pipeline.
90     *
91     * @param upstream the upstream element source.
92     */
93    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
94        super(upstream, opFlags);
95    }
96
97    // Shape-specific methods
98
99    @Override
100    final StreamShape getOutputShape() {
101        return StreamShape.REFERENCE;
102    }
103
104    @Override
105    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
106                                        Spliterator<P_IN> spliterator,
107                                        boolean flattenTree,
108                                        IntFunction<P_OUT[]> generator) {
109        return Nodes.collect(helper, spliterator, flattenTree, generator);
110    }
111
112    @Override
113    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
114                                     Supplier<Spliterator<P_IN>> supplier,
115                                     boolean isParallel) {
116        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
117    }
118
119    @Override
120    final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
121        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
122    }
123
124    @Override
125    final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
126        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
127    }
128
129    @Override
130    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
131        return Nodes.builder(exactSizeIfKnown, generator);
132    }
133
134
135    // BaseStream
136
137    @Override
138    public final Iterator<P_OUT> iterator() {
139        return Spliterators.iterator(spliterator());
140    }
141
142
143    // Stream
144
145    // Stateless intermediate operations from Stream
146
147    @Override
148    public Stream<P_OUT> unordered() {
149        if (!isOrdered())
150            return this;
151        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
152            @Override
153            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
154                return sink;
155            }
156        };
157    }
158
159    @Override
160    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
161        Objects.requireNonNull(predicate);
162        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
163                                     StreamOpFlag.NOT_SIZED) {
164            @Override
165            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
166                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
167                    @Override
168                    public void begin(long size) {
169                        downstream.begin(-1);
170                    }
171
172                    @Override
173                    public void accept(P_OUT u) {
174                        if (predicate.test(u))
175                            downstream.accept(u);
176                    }
177                };
178            }
179        };
180    }
181
182    @Override
183    @SuppressWarnings("unchecked")
184    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
185        Objects.requireNonNull(mapper);
186        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
187                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
188            @Override
189            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
190                return new Sink.ChainedReference<P_OUT, R>(sink) {
191                    @Override
192                    public void accept(P_OUT u) {
193                        downstream.accept(mapper.apply(u));
194                    }
195                };
196            }
197        };
198    }
199
200    @Override
201    public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
202        Objects.requireNonNull(mapper);
203        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
204                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
205            @Override
206            Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
207                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
208                    @Override
209                    public void accept(P_OUT u) {
210                        downstream.accept(mapper.applyAsInt(u));
211                    }
212                };
213            }
214        };
215    }
216
217    @Override
218    public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
219        Objects.requireNonNull(mapper);
220        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
221                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
222            @Override
223            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
224                return new Sink.ChainedReference<P_OUT, Long>(sink) {
225                    @Override
226                    public void accept(P_OUT u) {
227                        downstream.accept(mapper.applyAsLong(u));
228                    }
229                };
230            }
231        };
232    }
233
234    @Override
235    public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
236        Objects.requireNonNull(mapper);
237        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
238                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
239            @Override
240            Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
241                return new Sink.ChainedReference<P_OUT, Double>(sink) {
242                    @Override
243                    public void accept(P_OUT u) {
244                        downstream.accept(mapper.applyAsDouble(u));
245                    }
246                };
247            }
248        };
249    }
250
251    @Override
252    public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
253        Objects.requireNonNull(mapper);
254        // We can do better than this, by polling cancellationRequested when stream is infinite
255        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
256                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
257            @Override
258            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
259                return new Sink.ChainedReference<P_OUT, R>(sink) {
260                    @Override
261                    public void begin(long size) {
262                        downstream.begin(-1);
263                    }
264
265                    @Override
266                    public void accept(P_OUT u) {
267                        try (Stream<? extends R> result = mapper.apply(u)) {
268                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
269                            if (result != null)
270                                result.sequential().forEach(downstream);
271                        }
272                    }
273                };
274            }
275        };
276    }
277
278    @Override
279    public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
280        Objects.requireNonNull(mapper);
281        // We can do better than this, by polling cancellationRequested when stream is infinite
282        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
283                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
284            @Override
285            Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
286                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
287                    IntConsumer downstreamAsInt = downstream::accept;
288                    @Override
289                    public void begin(long size) {
290                        downstream.begin(-1);
291                    }
292
293                    @Override
294                    public void accept(P_OUT u) {
295                        try (IntStream result = mapper.apply(u)) {
296                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
297                            if (result != null)
298                                result.sequential().forEach(downstreamAsInt);
299                        }
300                    }
301                };
302            }
303        };
304    }
305
306    @Override
307    public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
308        Objects.requireNonNull(mapper);
309        // We can do better than this, by polling cancellationRequested when stream is infinite
310        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
311                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
312            @Override
313            Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
314                return new Sink.ChainedReference<P_OUT, Double>(sink) {
315                    DoubleConsumer downstreamAsDouble = downstream::accept;
316                    @Override
317                    public void begin(long size) {
318                        downstream.begin(-1);
319                    }
320
321                    @Override
322                    public void accept(P_OUT u) {
323                        try (DoubleStream result = mapper.apply(u)) {
324                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
325                            if (result != null)
326                                result.sequential().forEach(downstreamAsDouble);
327                        }
328                    }
329                };
330            }
331        };
332    }
333
334    @Override
335    public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
336        Objects.requireNonNull(mapper);
337        // We can do better than this, by polling cancellationRequested when stream is infinite
338        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
339                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
340            @Override
341            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
342                return new Sink.ChainedReference<P_OUT, Long>(sink) {
343                    LongConsumer downstreamAsLong = downstream::accept;
344                    @Override
345                    public void begin(long size) {
346                        downstream.begin(-1);
347                    }
348
349                    @Override
350                    public void accept(P_OUT u) {
351                        try (LongStream result = mapper.apply(u)) {
352                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
353                            if (result != null)
354                                result.sequential().forEach(downstreamAsLong);
355                        }
356                    }
357                };
358            }
359        };
360    }
361
362    @Override
363    public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
364        Objects.requireNonNull(action);
365        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
366                                     0) {
367            @Override
368            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
369                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
370                    @Override
371                    public void accept(P_OUT u) {
372                        action.accept(u);
373                        downstream.accept(u);
374                    }
375                };
376            }
377        };
378    }
379
380    // Stateful intermediate operations from Stream
381
382    @Override
383    public final Stream<P_OUT> distinct() {
384        return DistinctOps.makeRef(this);
385    }
386
387    @Override
388    public final Stream<P_OUT> sorted() {
389        return SortedOps.makeRef(this);
390    }
391
392    @Override
393    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
394        return SortedOps.makeRef(this, comparator);
395    }
396
397    @Override
398    public final Stream<P_OUT> limit(long maxSize) {
399        if (maxSize < 0)
400            throw new IllegalArgumentException(Long.toString(maxSize));
401        return SliceOps.makeRef(this, 0, maxSize);
402    }
403
404    @Override
405    public final Stream<P_OUT> skip(long n) {
406        if (n < 0)
407            throw new IllegalArgumentException(Long.toString(n));
408        if (n == 0)
409            return this;
410        else
411            return SliceOps.makeRef(this, n, -1);
412    }
413
414    // Terminal operations from Stream
415
416    @Override
417    public void forEach(Consumer<? super P_OUT> action) {
418        evaluate(ForEachOps.makeRef(action, false));
419    }
420
421    @Override
422    public void forEachOrdered(Consumer<? super P_OUT> action) {
423        evaluate(ForEachOps.makeRef(action, true));
424    }
425
426    @Override
427    @SuppressWarnings("unchecked")
428    public final <A> A[] toArray(IntFunction<A[]> generator) {
429        // Since A has no relation to U (not possible to declare that A is an upper bound of U)
430        // there will be no static type checking.
431        // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
432        // throughout the code-base.
433        // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
434        // Runtime checking will be performed when an element is stored in A[], thus if A is not a
435        // super type of U an ArrayStoreException will be thrown.
436        @SuppressWarnings("rawtypes")
437        IntFunction rawGenerator = (IntFunction) generator;
438        return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
439                              .asArray(rawGenerator);
440    }
441
442    @Override
443    public final Object[] toArray() {
444        return toArray(Object[]::new);
445    }
446
447    @Override
448    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
449        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
450    }
451
452    @Override
453    public final boolean allMatch(Predicate<? super P_OUT> predicate) {
454        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
455    }
456
457    @Override
458    public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
459        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
460    }
461
462    @Override
463    public final Optional<P_OUT> findFirst() {
464        return evaluate(FindOps.makeRef(true));
465    }
466
467    @Override
468    public final Optional<P_OUT> findAny() {
469        return evaluate(FindOps.makeRef(false));
470    }
471
472    @Override
473    public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
474        return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
475    }
476
477    @Override
478    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
479        return evaluate(ReduceOps.makeRef(accumulator));
480    }
481
482    @Override
483    public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
484        return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
485    }
486
487    @Override
488    @SuppressWarnings("unchecked")
489    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
490        A container;
491        if (isParallel()
492                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
493                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
494            container = collector.supplier().get();
495            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
496            forEach(u -> accumulator.accept(container, u));
497        }
498        else {
499            container = evaluate(ReduceOps.makeRef(collector));
500        }
501        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
502               ? (R) container
503               : collector.finisher().apply(container);
504    }
505
506    @Override
507    public final <R> R collect(Supplier<R> supplier,
508                               BiConsumer<R, ? super P_OUT> accumulator,
509                               BiConsumer<R, R> combiner) {
510        return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
511    }
512
513    @Override
514    public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
515        return reduce(BinaryOperator.maxBy(comparator));
516    }
517
518    @Override
519    public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
520        return reduce(BinaryOperator.minBy(comparator));
521
522    }
523
524    @Override
525    public final long count() {
526        return mapToLong(e -> 1L).sum();
527    }
528
529
530    //
531
532    /**
533     * Source stage of a ReferencePipeline.
534     *
535     * @param  type of elements in the upstream source
536     * @param  type of elements in produced by this stage
537     * @since 1.8
538     */
539    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
540        /**
541         * Constructor for the source stage of a Stream.
542         *
543         * @param source {@code Supplier<Spliterator>} describing the stream
544         *               source
545         * @param sourceFlags the source flags for the stream source, described
546         *                    in {@link StreamOpFlag}
547         */
548        Head(Supplier<? extends Spliterator<?>> source,
549             int sourceFlags, boolean parallel) {
550            super(source, sourceFlags, parallel);
551        }
552
553        /**
554         * Constructor for the source stage of a Stream.
555         *
556         * @param source {@code Spliterator} describing the stream source
557         * @param sourceFlags the source flags for the stream source, described
558         *                    in {@link StreamOpFlag}
559         */
560        Head(Spliterator<?> source,
561             int sourceFlags, boolean parallel) {
562            super(source, sourceFlags, parallel);
563        }
564
565        @Override
566        final boolean opIsStateful() {
567            throw new UnsupportedOperationException();
568        }
569
570        @Override
571        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
572            throw new UnsupportedOperationException();
573        }
574
575        // Optimized sequential terminal operations for the head of the pipeline
576
577        @Override
578        public void forEach(Consumer<? super E_OUT> action) {
579            if (!isParallel()) {
580                sourceStageSpliterator().forEachRemaining(action);
581            }
582            else {
583                super.forEach(action);
584            }
585        }
586
587        @Override
588        public void forEachOrdered(Consumer<? super E_OUT> action) {
589            if (!isParallel()) {
590                sourceStageSpliterator().forEachRemaining(action);
591            }
592            else {
593                super.forEachOrdered(action);
594            }
595        }
596    }
597
598    /**
599     * Base class for a stateless intermediate stage of a Stream.
600     *
601     * @param  type of elements in the upstream source
602     * @param  type of elements in produced by this stage
603     * @since 1.8
604     */
605    abstract static class StatelessOp<E_IN, E_OUT>
606            extends ReferencePipeline<E_IN, E_OUT> {
607        /**
608         * Construct a new Stream by appending a stateless intermediate
609         * operation to an existing stream.
610         *
611         * @param upstream The upstream pipeline stage
612         * @param inputShape The stream shape for the upstream pipeline stage
613         * @param opFlags Operation flags for the new stage
614         */
615        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
616                    StreamShape inputShape,
617                    int opFlags) {
618            super(upstream, opFlags);
619            assert upstream.getOutputShape() == inputShape;
620        }
621
622        @Override
623        final boolean opIsStateful() {
624            return false;
625        }
626    }
627
628    /**
629     * Base class for a stateful intermediate stage of a Stream.
630     *
631     * @param  type of elements in the upstream source
632     * @param  type of elements in produced by this stage
633     * @since 1.8
634     */
635    abstract static class StatefulOp<E_IN, E_OUT>
636            extends ReferencePipeline<E_IN, E_OUT> {
637        /**
638         * Construct a new Stream by appending a stateful intermediate operation
639         * to an existing stream.
640         * @param upstream The upstream pipeline stage
641         * @param inputShape The stream shape for the upstream pipeline stage
642         * @param opFlags Operation flags for the new stage
643         */
644        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
645                   StreamShape inputShape,
646                   int opFlags) {
647            super(upstream, opFlags);
648            assert upstream.getOutputShape() == inputShape;
649        }
650
651        @Override
652        final boolean opIsStateful() {
653            return true;
654        }
655
656        @Override
657        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
658                                                       Spliterator<P_IN> spliterator,
659                                                       IntFunction<E_OUT[]> generator);
660    }
661}
662