IntPipeline.java revision d0a2645e29a9b84d7e5ec822eb9904e93bd6c013
1/*
2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.IntSummaryStatistics;
28import java.util.Objects;
29import java.util.OptionalDouble;
30import java.util.OptionalInt;
31import java.util.PrimitiveIterator;
32import java.util.Spliterator;
33import java.util.Spliterators;
34import java.util.function.BiConsumer;
35import java.util.function.BinaryOperator;
36import java.util.function.IntBinaryOperator;
37import java.util.function.IntConsumer;
38import java.util.function.IntFunction;
39import java.util.function.IntPredicate;
40import java.util.function.IntToDoubleFunction;
41import java.util.function.IntToLongFunction;
42import java.util.function.IntUnaryOperator;
43import java.util.function.ObjIntConsumer;
44import java.util.function.Supplier;
45
/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code int}.
 *
 * @param <E_IN> type of elements in the upstream source
 * @since 1.8
 */
53abstract class IntPipeline<E_IN>
54        extends AbstractPipeline<E_IN, Integer, IntStream>
55        implements IntStream {
56
57    /**
58     * Constructor for the head of a stream pipeline.
59     *
60     * @param source {@code Supplier<Spliterator>} describing the stream source
61     * @param sourceFlags The source flags for the stream source, described in
62     *        {@link StreamOpFlag}
63     * @param parallel {@code true} if the pipeline is parallel
64     */
65    IntPipeline(Supplier<? extends Spliterator<Integer>> source,
66                int sourceFlags, boolean parallel) {
67        super(source, sourceFlags, parallel);
68    }
69
70    /**
71     * Constructor for the head of a stream pipeline.
72     *
73     * @param source {@code Spliterator} describing the stream source
74     * @param sourceFlags The source flags for the stream source, described in
75     *        {@link StreamOpFlag}
76     * @param parallel {@code true} if the pipeline is parallel
77     */
78    IntPipeline(Spliterator<Integer> source,
79                int sourceFlags, boolean parallel) {
80        super(source, sourceFlags, parallel);
81    }
82
83    /**
84     * Constructor for appending an intermediate operation onto an existing
85     * pipeline.
86     *
87     * @param upstream the upstream element source
88     * @param opFlags the operation flags for the new operation
89     */
90    IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
91        super(upstream, opFlags);
92    }
93
94    /**
95     * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
96     * by casting.
97     */
98    private static IntConsumer adapt(Sink<Integer> sink) {
99        if (sink instanceof IntConsumer) {
100            return (IntConsumer) sink;
101        }
102        else {
103            if (Tripwire.ENABLED)
104                Tripwire.trip(AbstractPipeline.class,
105                              "using IntStream.adapt(Sink<Integer> s)");
106            return sink::accept;
107        }
108    }
109
110    /**
111     * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
112     *
113     * @implNote
114     * The implementation attempts to cast to a Spliterator.OfInt, and throws an
115     * exception if this cast is not possible.
116     */
117    private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
118        if (s instanceof Spliterator.OfInt) {
119            return (Spliterator.OfInt) s;
120        }
121        else {
122            if (Tripwire.ENABLED)
123                Tripwire.trip(AbstractPipeline.class,
124                              "using IntStream.adapt(Spliterator<Integer> s)");
125            throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
126        }
127    }
128
129
130    // Shape-specific methods
131
132    @Override
133    final StreamShape getOutputShape() {
134        return StreamShape.INT_VALUE;
135    }
136
137    @Override
138    final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
139                                              Spliterator<P_IN> spliterator,
140                                              boolean flattenTree,
141                                              IntFunction<Integer[]> generator) {
142        return Nodes.collectInt(helper, spliterator, flattenTree);
143    }
144
145    @Override
146    final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
147                                           Supplier<Spliterator<P_IN>> supplier,
148                                           boolean isParallel) {
149        return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
150    }
151
152    @Override
153    @SuppressWarnings("unchecked")
154    final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
155        return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
156    }
157
158    @Override
159    final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
160        Spliterator.OfInt spl = adapt(spliterator);
161        IntConsumer adaptedSink = adapt(sink);
162        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
163    }
164
165    @Override
166    final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
167                                                IntFunction<Integer[]> generator) {
168        return Nodes.intBuilder(exactSizeIfKnown);
169    }
170
171
172    // IntStream
173
174    @Override
175    public final PrimitiveIterator.OfInt iterator() {
176        return Spliterators.iterator(spliterator());
177    }
178
179    @Override
180    public final Spliterator.OfInt spliterator() {
181        return adapt(super.spliterator());
182    }
183
184    // Stateless intermediate ops from IntStream
185
186    @Override
187    public final LongStream asLongStream() {
188        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
189                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
190            @Override
191            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
192                return new Sink.ChainedInt<Long>(sink) {
193                    @Override
194                    public void accept(int t) {
195                        downstream.accept((long) t);
196                    }
197                };
198            }
199        };
200    }
201
202    @Override
203    public final DoubleStream asDoubleStream() {
204        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
205                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
206            @Override
207            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
208                return new Sink.ChainedInt<Double>(sink) {
209                    @Override
210                    public void accept(int t) {
211                        downstream.accept((double) t);
212                    }
213                };
214            }
215        };
216    }
217
218    @Override
219    public final Stream<Integer> boxed() {
220        return mapToObj(Integer::valueOf);
221    }
222
223    @Override
224    public final IntStream map(IntUnaryOperator mapper) {
225        Objects.requireNonNull(mapper);
226        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
227                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
228            @Override
229            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
230                return new Sink.ChainedInt<Integer>(sink) {
231                    @Override
232                    public void accept(int t) {
233                        downstream.accept(mapper.applyAsInt(t));
234                    }
235                };
236            }
237        };
238    }
239
240    @Override
241    public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
242        Objects.requireNonNull(mapper);
243        return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
244                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
245            @Override
246            Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
247                return new Sink.ChainedInt<U>(sink) {
248                    @Override
249                    public void accept(int t) {
250                        downstream.accept(mapper.apply(t));
251                    }
252                };
253            }
254        };
255    }
256
257    @Override
258    public final LongStream mapToLong(IntToLongFunction mapper) {
259        Objects.requireNonNull(mapper);
260        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
261                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
262            @Override
263            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
264                return new Sink.ChainedInt<Long>(sink) {
265                    @Override
266                    public void accept(int t) {
267                        downstream.accept(mapper.applyAsLong(t));
268                    }
269                };
270            }
271        };
272    }
273
274    @Override
275    public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
276        Objects.requireNonNull(mapper);
277        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
278                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
279            @Override
280            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
281                return new Sink.ChainedInt<Double>(sink) {
282                    @Override
283                    public void accept(int t) {
284                        downstream.accept(mapper.applyAsDouble(t));
285                    }
286                };
287            }
288        };
289    }
290
291    @Override
292    public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
293        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
294                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
295            @Override
296            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
297                return new Sink.ChainedInt<Integer>(sink) {
298                    @Override
299                    public void begin(long size) {
300                        downstream.begin(-1);
301                    }
302
303                    @Override
304                    public void accept(int t) {
305                        try (IntStream result = mapper.apply(t)) {
306                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
307                            if (result != null)
308                                result.sequential().forEach(i -> downstream.accept(i));
309                        }
310                    }
311                };
312            }
313        };
314    }
315
316    @Override
317    public IntStream unordered() {
318        if (!isOrdered())
319            return this;
320        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
321            @Override
322            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
323                return sink;
324            }
325        };
326    }
327
328    @Override
329    public final IntStream filter(IntPredicate predicate) {
330        Objects.requireNonNull(predicate);
331        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
332                                        StreamOpFlag.NOT_SIZED) {
333            @Override
334            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
335                return new Sink.ChainedInt<Integer>(sink) {
336                    @Override
337                    public void begin(long size) {
338                        downstream.begin(-1);
339                    }
340
341                    @Override
342                    public void accept(int t) {
343                        if (predicate.test(t))
344                            downstream.accept(t);
345                    }
346                };
347            }
348        };
349    }
350
351    @Override
352    public final IntStream peek(IntConsumer action) {
353        Objects.requireNonNull(action);
354        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
355                                        0) {
356            @Override
357            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
358                return new Sink.ChainedInt<Integer>(sink) {
359                    @Override
360                    public void accept(int t) {
361                        action.accept(t);
362                        downstream.accept(t);
363                    }
364                };
365            }
366        };
367    }
368
369    // Stateful intermediate ops from IntStream
370
371    @Override
372    public final IntStream limit(long maxSize) {
373        if (maxSize < 0)
374            throw new IllegalArgumentException(Long.toString(maxSize));
375        return SliceOps.makeInt(this, 0, maxSize);
376    }
377
378    @Override
379    public final IntStream skip(long n) {
380        if (n < 0)
381            throw new IllegalArgumentException(Long.toString(n));
382        if (n == 0)
383            return this;
384        else
385            return SliceOps.makeInt(this, n, -1);
386    }
387
388    @Override
389    public final IntStream sorted() {
390        return SortedOps.makeInt(this);
391    }
392
393    @Override
394    public final IntStream distinct() {
395        // While functional and quick to implement, this approach is not very efficient.
396        // An efficient version requires an int-specific map/set implementation.
397        return boxed().distinct().mapToInt(i -> i);
398    }
399
400    // Terminal ops from IntStream
401
402    @Override
403    public void forEach(IntConsumer action) {
404        evaluate(ForEachOps.makeInt(action, false));
405    }
406
407    @Override
408    public void forEachOrdered(IntConsumer action) {
409        evaluate(ForEachOps.makeInt(action, true));
410    }
411
412    @Override
413    public final int sum() {
414        return reduce(0, Integer::sum);
415    }
416
417    @Override
418    public final OptionalInt min() {
419        return reduce(Math::min);
420    }
421
422    @Override
423    public final OptionalInt max() {
424        return reduce(Math::max);
425    }
426
427    @Override
428    public final long count() {
429        return mapToLong(e -> 1L).sum();
430    }
431
432    @Override
433    public final OptionalDouble average() {
434        long[] avg = collect(() -> new long[2],
435                             (ll, i) -> {
436                                 ll[0]++;
437                                 ll[1] += i;
438                             },
439                             (ll, rr) -> {
440                                 ll[0] += rr[0];
441                                 ll[1] += rr[1];
442                             });
443        return avg[0] > 0
444               ? OptionalDouble.of((double) avg[1] / avg[0])
445               : OptionalDouble.empty();
446    }
447
448    @Override
449    public final IntSummaryStatistics summaryStatistics() {
450        return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
451                       IntSummaryStatistics::combine);
452    }
453
454    @Override
455    public final int reduce(int identity, IntBinaryOperator op) {
456        return evaluate(ReduceOps.makeInt(identity, op));
457    }
458
459    @Override
460    public final OptionalInt reduce(IntBinaryOperator op) {
461        return evaluate(ReduceOps.makeInt(op));
462    }
463
464    @Override
465    public final <R> R collect(Supplier<R> supplier,
466                               ObjIntConsumer<R> accumulator,
467                               BiConsumer<R, R> combiner) {
468        BinaryOperator<R> operator = (left, right) -> {
469            combiner.accept(left, right);
470            return left;
471        };
472        return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
473    }
474
475    @Override
476    public final boolean anyMatch(IntPredicate predicate) {
477        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
478    }
479
480    @Override
481    public final boolean allMatch(IntPredicate predicate) {
482        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
483    }
484
485    @Override
486    public final boolean noneMatch(IntPredicate predicate) {
487        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
488    }
489
490    @Override
491    public final OptionalInt findFirst() {
492        return evaluate(FindOps.makeInt(true));
493    }
494
495    @Override
496    public final OptionalInt findAny() {
497        return evaluate(FindOps.makeInt(false));
498    }
499
500    @Override
501    public final int[] toArray() {
502        return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
503                        .asPrimitiveArray();
504    }
505
506    //
507
508    /**
509     * Source stage of an IntStream.
510     *
511     * @param  type of elements in the upstream source
512     * @since 1.8
513     */
514    static class Head<E_IN> extends IntPipeline<E_IN> {
515        /**
516         * Constructor for the source stage of an IntStream.
517         *
518         * @param source {@code Supplier<Spliterator>} describing the stream
519         *               source
520         * @param sourceFlags the source flags for the stream source, described
521         *                    in {@link StreamOpFlag}
522         * @param parallel {@code true} if the pipeline is parallel
523         */
524        Head(Supplier<? extends Spliterator<Integer>> source,
525             int sourceFlags, boolean parallel) {
526            super(source, sourceFlags, parallel);
527        }
528
529        /**
530         * Constructor for the source stage of an IntStream.
531         *
532         * @param source {@code Spliterator} describing the stream source
533         * @param sourceFlags the source flags for the stream source, described
534         *                    in {@link StreamOpFlag}
535         * @param parallel {@code true} if the pipeline is parallel
536         */
537        Head(Spliterator<Integer> source,
538             int sourceFlags, boolean parallel) {
539            super(source, sourceFlags, parallel);
540        }
541
542        @Override
543        final boolean opIsStateful() {
544            throw new UnsupportedOperationException();
545        }
546
547        @Override
548        final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
549            throw new UnsupportedOperationException();
550        }
551
552        // Optimized sequential terminal operations for the head of the pipeline
553
554        @Override
555        public void forEach(IntConsumer action) {
556            if (!isParallel()) {
557                adapt(sourceStageSpliterator()).forEachRemaining(action);
558            }
559            else {
560                super.forEach(action);
561            }
562        }
563
564        @Override
565        public void forEachOrdered(IntConsumer action) {
566            if (!isParallel()) {
567                adapt(sourceStageSpliterator()).forEachRemaining(action);
568            }
569            else {
570                super.forEachOrdered(action);
571            }
572        }
573    }
574
575    /**
576     * Base class for a stateless intermediate stage of an IntStream
577     *
578     * @param  type of elements in the upstream source
579     * @since 1.8
580     */
581    abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
582        /**
583         * Construct a new IntStream by appending a stateless intermediate
584         * operation to an existing stream.
585         * @param upstream The upstream pipeline stage
586         * @param inputShape The stream shape for the upstream pipeline stage
587         * @param opFlags Operation flags for the new stage
588         */
589        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
590                    StreamShape inputShape,
591                    int opFlags) {
592            super(upstream, opFlags);
593            assert upstream.getOutputShape() == inputShape;
594        }
595
596        @Override
597        final boolean opIsStateful() {
598            return false;
599        }
600    }
601
602    /**
603     * Base class for a stateful intermediate stage of an IntStream.
604     *
605     * @param  type of elements in the upstream source
606     * @since 1.8
607     */
608    abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
609        /**
610         * Construct a new IntStream by appending a stateful intermediate
611         * operation to an existing stream.
612         * @param upstream The upstream pipeline stage
613         * @param inputShape The stream shape for the upstream pipeline stage
614         * @param opFlags Operation flags for the new stage
615         */
616        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
617                   StreamShape inputShape,
618                   int opFlags) {
619            super(upstream, opFlags);
620            assert upstream.getOutputShape() == inputShape;
621        }
622
623        @Override
624        final boolean opIsStateful() {
625            return true;
626        }
627
628        @Override
629        abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
630                                                         Spliterator<P_IN> spliterator,
631                                                         IntFunction<Integer[]> generator);
632    }
633}
634