LongPipeline.java revision ff18b5f136f92154f2e05217e3953d10f459e561
1/*
2 * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.LongSummaryStatistics;
28import java.util.Objects;
29import java.util.OptionalDouble;
30import java.util.OptionalLong;
31import java.util.PrimitiveIterator;
32import java.util.Spliterator;
33import java.util.Spliterators;
34import java.util.function.BiConsumer;
35import java.util.function.BinaryOperator;
36import java.util.function.IntFunction;
37import java.util.function.LongBinaryOperator;
38import java.util.function.LongConsumer;
39import java.util.function.LongFunction;
40import java.util.function.LongPredicate;
41import java.util.function.LongToDoubleFunction;
42import java.util.function.LongToIntFunction;
43import java.util.function.LongUnaryOperator;
44import java.util.function.ObjLongConsumer;
45import java.util.function.Supplier;
46
47/**
48 * Abstract base class for an intermediate pipeline stage or pipeline source
49 * stage implementing whose elements are of type {@code long}.
50 *
51 * @param  type of elements in the upstream source
52 * @since 1.8
53 */
54abstract class LongPipeline<E_IN>
55        extends AbstractPipeline<E_IN, Long, LongStream>
56        implements LongStream {
57
58    /**
59     * Constructor for the head of a stream pipeline.
60     *
61     * @param source {@code Supplier<Spliterator>} describing the stream source
62     * @param sourceFlags the source flags for the stream source, described in
63     *        {@link StreamOpFlag}
64     * @param parallel {@code true} if the pipeline is parallel
65     */
66    LongPipeline(Supplier<? extends Spliterator<Long>> source,
67                 int sourceFlags, boolean parallel) {
68        super(source, sourceFlags, parallel);
69    }
70
71    /**
72     * Constructor for the head of a stream pipeline.
73     *
74     * @param source {@code Spliterator} describing the stream source
75     * @param sourceFlags the source flags for the stream source, described in
76     *        {@link StreamOpFlag}
77     * @param parallel {@code true} if the pipeline is parallel
78     */
79    LongPipeline(Spliterator<Long> source,
80                 int sourceFlags, boolean parallel) {
81        super(source, sourceFlags, parallel);
82    }
83
84    /**
85     * Constructor for appending an intermediate operation onto an existing pipeline.
86     *
87     * @param upstream the upstream element source.
88     * @param opFlags the operation flags
89     */
90    LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
91        super(upstream, opFlags);
92    }
93
94    /**
95     * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
96     * by casting.
97     */
98    private static LongConsumer adapt(Sink<Long> sink) {
99        if (sink instanceof LongConsumer) {
100            return (LongConsumer) sink;
101        } else {
102            if (Tripwire.ENABLED)
103                Tripwire.trip(AbstractPipeline.class,
104                              "using LongStream.adapt(Sink<Long> s)");
105            return sink::accept;
106        }
107    }
108
109    /**
110     * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
111     *
112     * @implNote
113     * The implementation attempts to cast to a Spliterator.OfLong, and throws
114     * an exception if this cast is not possible.
115     */
116    private static Spliterator.OfLong adapt(Spliterator<Long> s) {
117        if (s instanceof Spliterator.OfLong) {
118            return (Spliterator.OfLong) s;
119        } else {
120            if (Tripwire.ENABLED)
121                Tripwire.trip(AbstractPipeline.class,
122                              "using LongStream.adapt(Spliterator<Long> s)");
123            throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
124        }
125    }
126
127
128    // Shape-specific methods
129
130    @Override
131    final StreamShape getOutputShape() {
132        return StreamShape.LONG_VALUE;
133    }
134
135    @Override
136    final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
137                                           Spliterator<P_IN> spliterator,
138                                           boolean flattenTree,
139                                           IntFunction<Long[]> generator) {
140        return Nodes.collectLong(helper, spliterator, flattenTree);
141    }
142
143    @Override
144    final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
145                                        Supplier<Spliterator<P_IN>> supplier,
146                                        boolean isParallel) {
147        return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
148    }
149
150    @Override
151    @SuppressWarnings("unchecked")
152    final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
153        return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
154    }
155
156    @Override
157    final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
158        Spliterator.OfLong spl = adapt(spliterator);
159        LongConsumer adaptedSink =  adapt(sink);
160        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
161    }
162
163    @Override
164    final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
165        return Nodes.longBuilder(exactSizeIfKnown);
166    }
167
168
169    // LongStream
170
171    @Override
172    public final PrimitiveIterator.OfLong iterator() {
173        return Spliterators.iterator(spliterator());
174    }
175
176    @Override
177    public final Spliterator.OfLong spliterator() {
178        return adapt(super.spliterator());
179    }
180
181    // Stateless intermediate ops from LongStream
182
183    @Override
184    public final DoubleStream asDoubleStream() {
185        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
186                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
187            @Override
188            Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
189                return new Sink.ChainedLong<Double>(sink) {
190                    @Override
191                    public void accept(long t) {
192                        downstream.accept((double) t);
193                    }
194                };
195            }
196        };
197    }
198
199    @Override
200    public final Stream<Long> boxed() {
201        return mapToObj(Long::valueOf);
202    }
203
204    @Override
205    public final LongStream map(LongUnaryOperator mapper) {
206        Objects.requireNonNull(mapper);
207        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
208                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
209            @Override
210            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
211                return new Sink.ChainedLong<Long>(sink) {
212                    @Override
213                    public void accept(long t) {
214                        downstream.accept(mapper.applyAsLong(t));
215                    }
216                };
217            }
218        };
219    }
220
221    @Override
222    public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
223        Objects.requireNonNull(mapper);
224        return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
225                                                          StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
226            @Override
227            Sink<Long> opWrapSink(int flags, Sink<U> sink) {
228                return new Sink.ChainedLong<U>(sink) {
229                    @Override
230                    public void accept(long t) {
231                        downstream.accept(mapper.apply(t));
232                    }
233                };
234            }
235        };
236    }
237
238    @Override
239    public final IntStream mapToInt(LongToIntFunction mapper) {
240        Objects.requireNonNull(mapper);
241        return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
242                                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
243            @Override
244            Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
245                return new Sink.ChainedLong<Integer>(sink) {
246                    @Override
247                    public void accept(long t) {
248                        downstream.accept(mapper.applyAsInt(t));
249                    }
250                };
251            }
252        };
253    }
254
255    @Override
256    public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
257        Objects.requireNonNull(mapper);
258        return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
259                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
260            @Override
261            Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
262                return new Sink.ChainedLong<Double>(sink) {
263                    @Override
264                    public void accept(long t) {
265                        downstream.accept(mapper.applyAsDouble(t));
266                    }
267                };
268            }
269        };
270    }
271
272    @Override
273    public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
274        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
275                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
276            @Override
277            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
278                return new Sink.ChainedLong<Long>(sink) {
279                    @Override
280                    public void begin(long size) {
281                        downstream.begin(-1);
282                    }
283
284                    @Override
285                    public void accept(long t) {
286                        try (LongStream result = mapper.apply(t)) {
287                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
288                            if (result != null)
289                                result.sequential().forEach(i -> downstream.accept(i));
290                        }
291                    }
292                };
293            }
294        };
295    }
296
297    @Override
298    public LongStream unordered() {
299        if (!isOrdered())
300            return this;
301        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
302            @Override
303            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
304                return sink;
305            }
306        };
307    }
308
309    @Override
310    public final LongStream filter(LongPredicate predicate) {
311        Objects.requireNonNull(predicate);
312        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
313                                     StreamOpFlag.NOT_SIZED) {
314            @Override
315            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
316                return new Sink.ChainedLong<Long>(sink) {
317                    @Override
318                    public void begin(long size) {
319                        downstream.begin(-1);
320                    }
321
322                    @Override
323                    public void accept(long t) {
324                        if (predicate.test(t))
325                            downstream.accept(t);
326                    }
327                };
328            }
329        };
330    }
331
332    @Override
333    public final LongStream peek(LongConsumer action) {
334        Objects.requireNonNull(action);
335        return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
336                                     0) {
337            @Override
338            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
339                return new Sink.ChainedLong<Long>(sink) {
340                    @Override
341                    public void accept(long t) {
342                        action.accept(t);
343                        downstream.accept(t);
344                    }
345                };
346            }
347        };
348    }
349
350    // Stateful intermediate ops from LongStream
351
352    @Override
353    public final LongStream limit(long maxSize) {
354        if (maxSize < 0)
355            throw new IllegalArgumentException(Long.toString(maxSize));
356        return SliceOps.makeLong(this, 0, maxSize);
357    }
358
359    @Override
360    public final LongStream skip(long n) {
361        if (n < 0)
362            throw new IllegalArgumentException(Long.toString(n));
363        if (n == 0)
364            return this;
365        else
366            return SliceOps.makeLong(this, n, -1);
367    }
368
369    @Override
370    public final LongStream sorted() {
371        return SortedOps.makeLong(this);
372    }
373
374    @Override
375    public final LongStream distinct() {
376        // While functional and quick to implement, this approach is not very efficient.
377        // An efficient version requires a long-specific map/set implementation.
378        return boxed().distinct().mapToLong(i -> (long) i);
379    }
380
381    // Terminal ops from LongStream
382
383    @Override
384    public void forEach(LongConsumer action) {
385        evaluate(ForEachOps.makeLong(action, false));
386    }
387
388    @Override
389    public void forEachOrdered(LongConsumer action) {
390        evaluate(ForEachOps.makeLong(action, true));
391    }
392
393    @Override
394    public final long sum() {
395        // use better algorithm to compensate for intermediate overflow?
396        return reduce(0, Long::sum);
397    }
398
399    @Override
400    public final OptionalLong min() {
401        return reduce(Math::min);
402    }
403
404    @Override
405    public final OptionalLong max() {
406        return reduce(Math::max);
407    }
408
409    @Override
410    public final OptionalDouble average() {
411        long[] avg = collect(() -> new long[2],
412                             (ll, i) -> {
413                                 ll[0]++;
414                                 ll[1] += i;
415                             },
416                             (ll, rr) -> {
417                                 ll[0] += rr[0];
418                                 ll[1] += rr[1];
419                             });
420        return avg[0] > 0
421               ? OptionalDouble.of((double) avg[1] / avg[0])
422               : OptionalDouble.empty();
423    }
424
425    @Override
426    public final long count() {
427        return map(e -> 1L).sum();
428    }
429
430    @Override
431    public final LongSummaryStatistics summaryStatistics() {
432        return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
433                       LongSummaryStatistics::combine);
434    }
435
436    @Override
437    public final long reduce(long identity, LongBinaryOperator op) {
438        return evaluate(ReduceOps.makeLong(identity, op));
439    }
440
441    @Override
442    public final OptionalLong reduce(LongBinaryOperator op) {
443        return evaluate(ReduceOps.makeLong(op));
444    }
445
446    @Override
447    public final <R> R collect(Supplier<R> supplier,
448                               ObjLongConsumer<R> accumulator,
449                               BiConsumer<R, R> combiner) {
450        BinaryOperator<R> operator = (left, right) -> {
451            combiner.accept(left, right);
452            return left;
453        };
454        return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
455    }
456
457    @Override
458    public final boolean anyMatch(LongPredicate predicate) {
459        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
460    }
461
462    @Override
463    public final boolean allMatch(LongPredicate predicate) {
464        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
465    }
466
467    @Override
468    public final boolean noneMatch(LongPredicate predicate) {
469        return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
470    }
471
472    @Override
473    public final OptionalLong findFirst() {
474        return evaluate(FindOps.makeLong(true));
475    }
476
477    @Override
478    public final OptionalLong findAny() {
479        return evaluate(FindOps.makeLong(false));
480    }
481
482    @Override
483    public final long[] toArray() {
484        return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
485                .asPrimitiveArray();
486    }
487
488
489    //
490
491    /**
492     * Source stage of a LongPipeline.
493     *
494     * @param  type of elements in the upstream source
495     * @since 1.8
496     */
497    static class Head<E_IN> extends LongPipeline<E_IN> {
498        /**
499         * Constructor for the source stage of a LongStream.
500         *
501         * @param source {@code Supplier<Spliterator>} describing the stream
502         *               source
503         * @param sourceFlags the source flags for the stream source, described
504         *                    in {@link StreamOpFlag}
505         * @param parallel {@code true} if the pipeline is parallel
506         */
507        Head(Supplier<? extends Spliterator<Long>> source,
508             int sourceFlags, boolean parallel) {
509            super(source, sourceFlags, parallel);
510        }
511
512        /**
513         * Constructor for the source stage of a LongStream.
514         *
515         * @param source {@code Spliterator} describing the stream source
516         * @param sourceFlags the source flags for the stream source, described
517         *                    in {@link StreamOpFlag}
518         * @param parallel {@code true} if the pipeline is parallel
519         */
520        Head(Spliterator<Long> source,
521             int sourceFlags, boolean parallel) {
522            super(source, sourceFlags, parallel);
523        }
524
525        @Override
526        final boolean opIsStateful() {
527            throw new UnsupportedOperationException();
528        }
529
530        @Override
531        final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
532            throw new UnsupportedOperationException();
533        }
534
535        // Optimized sequential terminal operations for the head of the pipeline
536
537        @Override
538        public void forEach(LongConsumer action) {
539            if (!isParallel()) {
540                adapt(sourceStageSpliterator()).forEachRemaining(action);
541            } else {
542                super.forEach(action);
543            }
544        }
545
546        @Override
547        public void forEachOrdered(LongConsumer action) {
548            if (!isParallel()) {
549                adapt(sourceStageSpliterator()).forEachRemaining(action);
550            } else {
551                super.forEachOrdered(action);
552            }
553        }
554    }
555
556    /** Base class for a stateless intermediate stage of a LongStream.
557     *
558     * @param  type of elements in the upstream source
559     * @since 1.8
560     */
561    abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
562        /**
563         * Construct a new LongStream by appending a stateless intermediate
564         * operation to an existing stream.
565         * @param upstream The upstream pipeline stage
566         * @param inputShape The stream shape for the upstream pipeline stage
567         * @param opFlags Operation flags for the new stage
568         */
569        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
570                    StreamShape inputShape,
571                    int opFlags) {
572            super(upstream, opFlags);
573            assert upstream.getOutputShape() == inputShape;
574        }
575
576        @Override
577        final boolean opIsStateful() {
578            return false;
579        }
580    }
581
582    /**
583     * Base class for a stateful intermediate stage of a LongStream.
584     *
585     * @param  type of elements in the upstream source
586     * @since 1.8
587     */
588    abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
589        /**
590         * Construct a new LongStream by appending a stateful intermediate
591         * operation to an existing stream.
592         * @param upstream The upstream pipeline stage
593         * @param inputShape The stream shape for the upstream pipeline stage
594         * @param opFlags Operation flags for the new stage
595         */
596        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
597                   StreamShape inputShape,
598                   int opFlags) {
599            super(upstream, opFlags);
600            assert upstream.getOutputShape() == inputShape;
601        }
602
603        @Override
604        final boolean opIsStateful() {
605            return true;
606        }
607
608        @Override
609        abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
610                                                      Spliterator<P_IN> spliterator,
611                                                      IntFunction<Long[]> generator);
612    }
613}
614