IntPipeline.java revision 603700058363f21b7b2ecae9c44e9617183ab3f9
1/*
2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.IntSummaryStatistics;
28import java.util.Objects;
29import java.util.OptionalDouble;
30import java.util.OptionalInt;
31import java.util.PrimitiveIterator;
32import java.util.Spliterator;
33import java.util.Spliterators;
34import java.util.function.BiConsumer;
35import java.util.function.BinaryOperator;
36import java.util.function.IntBinaryOperator;
37import java.util.function.IntConsumer;
38import java.util.function.IntFunction;
39import java.util.function.IntPredicate;
40import java.util.function.IntToDoubleFunction;
41import java.util.function.IntToLongFunction;
42import java.util.function.IntUnaryOperator;
43import java.util.function.ObjIntConsumer;
44import java.util.function.Supplier;
45
46/**
47 * Abstract base class for an intermediate pipeline stage or pipeline source
48 * stage implementing whose elements are of type {@code int}.
49 *
50 * @param  type of elements in the upstream source
51 * @since 1.8
52 * @hide Visible for CTS testing only (OpenJDK8 tests).
53 */
54public abstract class IntPipeline<E_IN>
55        extends AbstractPipeline<E_IN, Integer, IntStream>
56        implements IntStream {
57
58    /**
59     * Constructor for the head of a stream pipeline.
60     *
61     * @param source {@code Supplier<Spliterator>} describing the stream source
62     * @param sourceFlags The source flags for the stream source, described in
63     *        {@link StreamOpFlag}
64     * @param parallel {@code true} if the pipeline is parallel
65     */
66    IntPipeline(Supplier<? extends Spliterator<Integer>> source,
67                int sourceFlags, boolean parallel) {
68        super(source, sourceFlags, parallel);
69    }
70
71    /**
72     * Constructor for the head of a stream pipeline.
73     *
74     * @param source {@code Spliterator} describing the stream source
75     * @param sourceFlags The source flags for the stream source, described in
76     *        {@link StreamOpFlag}
77     * @param parallel {@code true} if the pipeline is parallel
78     */
79    IntPipeline(Spliterator<Integer> source,
80                int sourceFlags, boolean parallel) {
81        super(source, sourceFlags, parallel);
82    }
83
84    /**
85     * Constructor for appending an intermediate operation onto an existing
86     * pipeline.
87     *
88     * @param upstream the upstream element source
89     * @param opFlags the operation flags for the new operation
90     */
91    IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
92        super(upstream, opFlags);
93    }
94
95    /**
96     * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
97     * by casting.
98     */
99    private static IntConsumer adapt(Sink<Integer> sink) {
100        if (sink instanceof IntConsumer) {
101            return (IntConsumer) sink;
102        }
103        else {
104            if (Tripwire.ENABLED)
105                Tripwire.trip(AbstractPipeline.class,
106                              "using IntStream.adapt(Sink<Integer> s)");
107            return sink::accept;
108        }
109    }
110
111    /**
112     * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
113     *
114     * @implNote
115     * The implementation attempts to cast to a Spliterator.OfInt, and throws an
116     * exception if this cast is not possible.
117     */
118    private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
119        if (s instanceof Spliterator.OfInt) {
120            return (Spliterator.OfInt) s;
121        }
122        else {
123            if (Tripwire.ENABLED)
124                Tripwire.trip(AbstractPipeline.class,
125                              "using IntStream.adapt(Spliterator<Integer> s)");
126            throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
127        }
128    }
129
130
131    // Shape-specific methods
132
133    @Override
134    public final StreamShape getOutputShape() {
135        return StreamShape.INT_VALUE;
136    }
137
138    @Override
139    public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
140                                                     Spliterator<P_IN> spliterator,
141                                                     boolean flattenTree,
142                                                     IntFunction<Integer[]> generator) {
143        return Nodes.collectInt(helper, spliterator, flattenTree);
144    }
145
146    @Override
147    public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
148                                                  Supplier<Spliterator<P_IN>> supplier,
149                                                  boolean isParallel) {
150        return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
151    }
152
153    @Override
154    @SuppressWarnings("unchecked")
155    public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
156        return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
157    }
158
159    @Override
160    public final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
161        Spliterator.OfInt spl = adapt(spliterator);
162        IntConsumer adaptedSink = adapt(sink);
163        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
164    }
165
166    @Override
167    public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
168                                                       IntFunction<Integer[]> generator) {
169        return Nodes.intBuilder(exactSizeIfKnown);
170    }
171
172
173    // IntStream
174
175    @Override
176    public final PrimitiveIterator.OfInt iterator() {
177        return Spliterators.iterator(spliterator());
178    }
179
180    @Override
181    public final Spliterator.OfInt spliterator() {
182        return adapt(super.spliterator());
183    }
184
185    // Stateless intermediate ops from IntStream
186
187    @Override
188    public final LongStream asLongStream() {
189        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
190                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
191            @Override
192            public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
193                return new Sink.ChainedInt<Long>(sink) {
194                    @Override
195                    public void accept(int t) {
196                        downstream.accept((long) t);
197                    }
198                };
199            }
200        };
201    }
202
203    @Override
204    public final DoubleStream asDoubleStream() {
205        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
206                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
207            @Override
208            public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
209                return new Sink.ChainedInt<Double>(sink) {
210                    @Override
211                    public void accept(int t) {
212                        downstream.accept((double) t);
213                    }
214                };
215            }
216        };
217    }
218
219    @Override
220    public final Stream<Integer> boxed() {
221        return mapToObj(Integer::valueOf);
222    }
223
224    @Override
225    public final IntStream map(IntUnaryOperator mapper) {
226        Objects.requireNonNull(mapper);
227        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
228                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
229            @Override
230            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
231                return new Sink.ChainedInt<Integer>(sink) {
232                    @Override
233                    public void accept(int t) {
234                        downstream.accept(mapper.applyAsInt(t));
235                    }
236                };
237            }
238        };
239    }
240
241    @Override
242    public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
243        Objects.requireNonNull(mapper);
244        return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
245                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
246            @Override
247            public Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
248                return new Sink.ChainedInt<U>(sink) {
249                    @Override
250                    public void accept(int t) {
251                        downstream.accept(mapper.apply(t));
252                    }
253                };
254            }
255        };
256    }
257
258    @Override
259    public final LongStream mapToLong(IntToLongFunction mapper) {
260        Objects.requireNonNull(mapper);
261        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
262                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
263            @Override
264            public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
265                return new Sink.ChainedInt<Long>(sink) {
266                    @Override
267                    public void accept(int t) {
268                        downstream.accept(mapper.applyAsLong(t));
269                    }
270                };
271            }
272        };
273    }
274
275    @Override
276    public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
277        Objects.requireNonNull(mapper);
278        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
279                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
280            @Override
281            public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
282                return new Sink.ChainedInt<Double>(sink) {
283                    @Override
284                    public void accept(int t) {
285                        downstream.accept(mapper.applyAsDouble(t));
286                    }
287                };
288            }
289        };
290    }
291
292    @Override
293    public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
294        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
295                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
296            @Override
297            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
298                return new Sink.ChainedInt<Integer>(sink) {
299                    @Override
300                    public void begin(long size) {
301                        downstream.begin(-1);
302                    }
303
304                    @Override
305                    public void accept(int t) {
306                        try (IntStream result = mapper.apply(t)) {
307                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
308                            if (result != null)
309                                result.sequential().forEach(i -> downstream.accept(i));
310                        }
311                    }
312                };
313            }
314        };
315    }
316
317    @Override
318    public IntStream unordered() {
319        if (!isOrdered())
320            return this;
321        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
322            @Override
323            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
324                return sink;
325            }
326        };
327    }
328
329    @Override
330    public final IntStream filter(IntPredicate predicate) {
331        Objects.requireNonNull(predicate);
332        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
333                                        StreamOpFlag.NOT_SIZED) {
334            @Override
335            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
336                return new Sink.ChainedInt<Integer>(sink) {
337                    @Override
338                    public void begin(long size) {
339                        downstream.begin(-1);
340                    }
341
342                    @Override
343                    public void accept(int t) {
344                        if (predicate.test(t))
345                            downstream.accept(t);
346                    }
347                };
348            }
349        };
350    }
351
352    @Override
353    public final IntStream peek(IntConsumer action) {
354        Objects.requireNonNull(action);
355        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
356                                        0) {
357            @Override
358            public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
359                return new Sink.ChainedInt<Integer>(sink) {
360                    @Override
361                    public void accept(int t) {
362                        action.accept(t);
363                        downstream.accept(t);
364                    }
365                };
366            }
367        };
368    }
369
370    // Stateful intermediate ops from IntStream
371
372    @Override
373    public final IntStream limit(long maxSize) {
374        if (maxSize < 0)
375            throw new IllegalArgumentException(Long.toString(maxSize));
376        return SliceOps.makeInt(this, 0, maxSize);
377    }
378
379    @Override
380    public final IntStream skip(long n) {
381        if (n < 0)
382            throw new IllegalArgumentException(Long.toString(n));
383        if (n == 0)
384            return this;
385        else
386            return SliceOps.makeInt(this, n, -1);
387    }
388
389    @Override
390    public final IntStream sorted() {
391        return SortedOps.makeInt(this);
392    }
393
394    @Override
395    public final IntStream distinct() {
396        // While functional and quick to implement, this approach is not very efficient.
397        // An efficient version requires an int-specific map/set implementation.
398        return boxed().distinct().mapToInt(i -> i);
399    }
400
401    // Terminal ops from IntStream
402
403    @Override
404    public void forEach(IntConsumer action) {
405        evaluate(ForEachOps.makeInt(action, false));
406    }
407
408    @Override
409    public void forEachOrdered(IntConsumer action) {
410        evaluate(ForEachOps.makeInt(action, true));
411    }
412
413    @Override
414    public final int sum() {
415        return reduce(0, Integer::sum);
416    }
417
418    @Override
419    public final OptionalInt min() {
420        return reduce(Math::min);
421    }
422
423    @Override
424    public final OptionalInt max() {
425        return reduce(Math::max);
426    }
427
428    @Override
429    public final long count() {
430        return mapToLong(e -> 1L).sum();
431    }
432
433    @Override
434    public final OptionalDouble average() {
435        long[] avg = collect(() -> new long[2],
436                             (ll, i) -> {
437                                 ll[0]++;
438                                 ll[1] += i;
439                             },
440                             (ll, rr) -> {
441                                 ll[0] += rr[0];
442                                 ll[1] += rr[1];
443                             });
444        return avg[0] > 0
445               ? OptionalDouble.of((double) avg[1] / avg[0])
446               : OptionalDouble.empty();
447    }
448
449    @Override
450    public final IntSummaryStatistics summaryStatistics() {
451        return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
452                       IntSummaryStatistics::combine);
453    }
454
455    @Override
456    public final int reduce(int identity, IntBinaryOperator op) {
457        return evaluate(ReduceOps.makeInt(identity, op));
458    }
459
460    @Override
461    public final OptionalInt reduce(IntBinaryOperator op) {
462        return evaluate(ReduceOps.makeInt(op));
463    }
464
465    @Override
466    public final <R> R collect(Supplier<R> supplier,
467                               ObjIntConsumer<R> accumulator,
468                               BiConsumer<R, R> combiner) {
469        BinaryOperator<R> operator = (left, right) -> {
470            combiner.accept(left, right);
471            return left;
472        };
473        return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
474    }
475
476    @Override
477    public final boolean anyMatch(IntPredicate predicate) {
478        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
479    }
480
481    @Override
482    public final boolean allMatch(IntPredicate predicate) {
483        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
484    }
485
486    @Override
487    public final boolean noneMatch(IntPredicate predicate) {
488        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
489    }
490
491    @Override
492    public final OptionalInt findFirst() {
493        return evaluate(FindOps.makeInt(true));
494    }
495
496    @Override
497    public final OptionalInt findAny() {
498        return evaluate(FindOps.makeInt(false));
499    }
500
501    @Override
502    public final int[] toArray() {
503        return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
504                        .asPrimitiveArray();
505    }
506
507    //
508
509    /**
510     * Source stage of an IntStream.
511     *
512     * @param  type of elements in the upstream source
513     * @since 1.8
514     * @hide Visible for CTS testing only (OpenJDK8 tests).
515     */
516    public static class Head<E_IN> extends IntPipeline<E_IN> {
517        /**
518         * Constructor for the source stage of an IntStream.
519         *
520         * @param source {@code Supplier<Spliterator>} describing the stream
521         *               source
522         * @param sourceFlags the source flags for the stream source, described
523         *                    in {@link StreamOpFlag}
524         * @param parallel {@code true} if the pipeline is parallel
525         */
526        public Head(Supplier<? extends Spliterator<Integer>> source,
527             int sourceFlags, boolean parallel) {
528            super(source, sourceFlags, parallel);
529        }
530
531        /**
532         * Constructor for the source stage of an IntStream.
533         *
534         * @param source {@code Spliterator} describing the stream source
535         * @param sourceFlags the source flags for the stream source, described
536         *                    in {@link StreamOpFlag}
537         * @param parallel {@code true} if the pipeline is parallel
538         */
539        public Head(Spliterator<Integer> source,
540             int sourceFlags, boolean parallel) {
541            super(source, sourceFlags, parallel);
542        }
543
544        @Override
545        public final boolean opIsStateful() {
546            throw new UnsupportedOperationException();
547        }
548
549        @Override
550        public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
551            throw new UnsupportedOperationException();
552        }
553
554        // Optimized sequential terminal operations for the head of the pipeline
555
556        @Override
557        public void forEach(IntConsumer action) {
558            if (!isParallel()) {
559                adapt(sourceStageSpliterator()).forEachRemaining(action);
560            }
561            else {
562                super.forEach(action);
563            }
564        }
565
566        @Override
567        public void forEachOrdered(IntConsumer action) {
568            if (!isParallel()) {
569                adapt(sourceStageSpliterator()).forEachRemaining(action);
570            }
571            else {
572                super.forEachOrdered(action);
573            }
574        }
575    }
576
577    /**
578     * Base class for a stateless intermediate stage of an IntStream
579     *
580     * @param  type of elements in the upstream source
581     * @since 1.8
582     * @hide Visible for CTS testing only (OpenJDK8 tests).
583     */
584    public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
585        /**
586         * Construct a new IntStream by appending a stateless intermediate
587         * operation to an existing stream.
588         * @param upstream The upstream pipeline stage
589         * @param inputShape The stream shape for the upstream pipeline stage
590         * @param opFlags Operation flags for the new stage
591         */
592        public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
593                    StreamShape inputShape,
594                    int opFlags) {
595            super(upstream, opFlags);
596            assert upstream.getOutputShape() == inputShape;
597        }
598
599        @Override
600        public final boolean opIsStateful() {
601            return false;
602        }
603    }
604
605    /**
606     * Base class for a stateful intermediate stage of an IntStream.
607     *
608     * @param  type of elements in the upstream source
609     * @since 1.8
610     * @hide Visible for CTS testing only (OpenJDK8 tests).
611     */
612    public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
613        /**
614         * Construct a new IntStream by appending a stateful intermediate
615         * operation to an existing stream.
616         * @param upstream The upstream pipeline stage
617         * @param inputShape The stream shape for the upstream pipeline stage
618         * @param opFlags Operation flags for the new stage
619         */
620        public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
621                   StreamShape inputShape,
622                   int opFlags) {
623            super(upstream, opFlags);
624            assert upstream.getOutputShape() == inputShape;
625        }
626
627        @Override
628        public final boolean opIsStateful() {
629            return true;
630        }
631
632        @Override
633        public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
634                                                         Spliterator<P_IN> spliterator,
635                                                         IntFunction<Integer[]> generator);
636    }
637}
638