1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.Optional;
29import java.util.OptionalDouble;
30import java.util.OptionalInt;
31import java.util.OptionalLong;
32import java.util.Spliterator;
33import java.util.concurrent.CountedCompleter;
34import java.util.function.BiConsumer;
35import java.util.function.BiFunction;
36import java.util.function.BinaryOperator;
37import java.util.function.DoubleBinaryOperator;
38import java.util.function.IntBinaryOperator;
39import java.util.function.LongBinaryOperator;
40import java.util.function.ObjDoubleConsumer;
41import java.util.function.ObjIntConsumer;
42import java.util.function.ObjLongConsumer;
43import java.util.function.Supplier;
44
45/**
46 * Factory for creating instances of {@code TerminalOp} that implement
47 * reductions.
48 *
49 * @since 1.8
50 */
51final class ReduceOps {
52
53    private ReduceOps() { }
54
55    /**
56     * Constructs a {@code TerminalOp} that implements a functional reduce on
57     * reference values.
58     *
59     * @param <T> the type of the input elements
60     * @param <U> the type of the result
61     * @param seed the identity element for the reduction
62     * @param reducer the accumulating function that incorporates an additional
63     *        input element into the result
64     * @param combiner the combining function that combines two intermediate
65     *        results
66     * @return a {@code TerminalOp} implementing the reduction
67     */
68    public static <T, U> TerminalOp<T, U>
69    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70        Objects.requireNonNull(reducer);
71        Objects.requireNonNull(combiner);
72        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73            @Override
74            public void begin(long size) {
75                state = seed;
76            }
77
78            @Override
79            public void accept(T t) {
80                state = reducer.apply(state, t);
81            }
82
83            @Override
84            public void combine(ReducingSink other) {
85                state = combiner.apply(state, other.state);
86            }
87        }
88        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89            @Override
90            public ReducingSink makeSink() {
91                return new ReducingSink();
92            }
93        };
94    }
95
96    /**
97     * Constructs a {@code TerminalOp} that implements a functional reduce on
98     * reference values producing an optional reference result.
99     *
100     * @param <T> The type of the input elements, and the type of the result
101     * @param operator The reducing function
102     * @return A {@code TerminalOp} implementing the reduction
103     */
104    public static <T> TerminalOp<T, Optional<T>>
105    makeRef(BinaryOperator<T> operator) {
106        Objects.requireNonNull(operator);
107        class ReducingSink
108                implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109            private boolean empty;
110            private T state;
111
112            public void begin(long size) {
113                empty = true;
114                state = null;
115            }
116
117            @Override
118            public void accept(T t) {
119                if (empty) {
120                    empty = false;
121                    state = t;
122                } else {
123                    state = operator.apply(state, t);
124                }
125            }
126
127            @Override
128            public Optional<T> get() {
129                return empty ? Optional.empty() : Optional.of(state);
130            }
131
132            @Override
133            public void combine(ReducingSink other) {
134                if (!other.empty)
135                    accept(other.state);
136            }
137        }
138        return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139            @Override
140            public ReducingSink makeSink() {
141                return new ReducingSink();
142            }
143        };
144    }
145
146    /**
147     * Constructs a {@code TerminalOp} that implements a mutable reduce on
148     * reference values.
149     *
150     * @param <T> the type of the input elements
151     * @param <I> the type of the intermediate reduction result
152     * @param collector a {@code Collector} defining the reduction
153     * @return a {@code ReduceOp} implementing the reduction
154     */
155    public static <T, I> TerminalOp<T, I>
156    makeRef(Collector<? super T, I, ?> collector) {
157        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158        BiConsumer<I, ? super T> accumulator = collector.accumulator();
159        BinaryOperator<I> combiner = collector.combiner();
160        class ReducingSink extends Box<I>
161                implements AccumulatingSink<T, I, ReducingSink> {
162            @Override
163            public void begin(long size) {
164                state = supplier.get();
165            }
166
167            @Override
168            public void accept(T t) {
169                accumulator.accept(state, t);
170            }
171
172            @Override
173            public void combine(ReducingSink other) {
174                state = combiner.apply(state, other.state);
175            }
176        }
177        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178            @Override
179            public ReducingSink makeSink() {
180                return new ReducingSink();
181            }
182
183            @Override
184            public int getOpFlags() {
185                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186                       ? StreamOpFlag.NOT_ORDERED
187                       : 0;
188            }
189        };
190    }
191
192    /**
193     * Constructs a {@code TerminalOp} that implements a mutable reduce on
194     * reference values.
195     *
196     * @param <T> the type of the input elements
197     * @param <R> the type of the result
198     * @param seedFactory a factory to produce a new base accumulator
199     * @param accumulator a function to incorporate an element into an
200     *        accumulator
201     * @param reducer a function to combine an accumulator into another
202     * @return a {@code TerminalOp} implementing the reduction
203     */
204    public static <T, R> TerminalOp<T, R>
205    makeRef(Supplier<R> seedFactory,
206            BiConsumer<R, ? super T> accumulator,
207            BiConsumer<R,R> reducer) {
208        Objects.requireNonNull(seedFactory);
209        Objects.requireNonNull(accumulator);
210        Objects.requireNonNull(reducer);
211        class ReducingSink extends Box<R>
212                implements AccumulatingSink<T, R, ReducingSink> {
213            @Override
214            public void begin(long size) {
215                state = seedFactory.get();
216            }
217
218            @Override
219            public void accept(T t) {
220                accumulator.accept(state, t);
221            }
222
223            @Override
224            public void combine(ReducingSink other) {
225                reducer.accept(state, other.state);
226            }
227        }
228        return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229            @Override
230            public ReducingSink makeSink() {
231                return new ReducingSink();
232            }
233        };
234    }
235
236    /**
237     * Constructs a {@code TerminalOp} that implements a functional reduce on
238     * {@code int} values.
239     *
240     * @param identity the identity for the combining function
241     * @param operator the combining function
242     * @return a {@code TerminalOp} implementing the reduction
243     */
244    public static TerminalOp<Integer, Integer>
245    makeInt(int identity, IntBinaryOperator operator) {
246        Objects.requireNonNull(operator);
247        class ReducingSink
248                implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
249            private int state;
250
251            @Override
252            public void begin(long size) {
253                state = identity;
254            }
255
256            @Override
257            public void accept(int t) {
258                state = operator.applyAsInt(state, t);
259            }
260
261            @Override
262            public Integer get() {
263                return state;
264            }
265
266            @Override
267            public void combine(ReducingSink other) {
268                accept(other.state);
269            }
270        }
271        return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
272            @Override
273            public ReducingSink makeSink() {
274                return new ReducingSink();
275            }
276        };
277    }
278
279    /**
280     * Constructs a {@code TerminalOp} that implements a functional reduce on
281     * {@code int} values, producing an optional integer result.
282     *
283     * @param operator the combining function
284     * @return a {@code TerminalOp} implementing the reduction
285     */
286    public static TerminalOp<Integer, OptionalInt>
287    makeInt(IntBinaryOperator operator) {
288        Objects.requireNonNull(operator);
289        class ReducingSink
290                implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
291            private boolean empty;
292            private int state;
293
294            public void begin(long size) {
295                empty = true;
296                state = 0;
297            }
298
299            @Override
300            public void accept(int t) {
301                if (empty) {
302                    empty = false;
303                    state = t;
304                }
305                else {
306                    state = operator.applyAsInt(state, t);
307                }
308            }
309
310            @Override
311            public OptionalInt get() {
312                return empty ? OptionalInt.empty() : OptionalInt.of(state);
313            }
314
315            @Override
316            public void combine(ReducingSink other) {
317                if (!other.empty)
318                    accept(other.state);
319            }
320        }
321        return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
322            @Override
323            public ReducingSink makeSink() {
324                return new ReducingSink();
325            }
326        };
327    }
328
329    /**
330     * Constructs a {@code TerminalOp} that implements a mutable reduce on
331     * {@code int} values.
332     *
333     * @param <R> The type of the result
334     * @param supplier a factory to produce a new accumulator of the result type
335     * @param accumulator a function to incorporate an int into an
336     *        accumulator
337     * @param combiner a function to combine an accumulator into another
338     * @return A {@code ReduceOp} implementing the reduction
339     */
340    public static <R> TerminalOp<Integer, R>
341    makeInt(Supplier<R> supplier,
342            ObjIntConsumer<R> accumulator,
343            BinaryOperator<R> combiner) {
344        Objects.requireNonNull(supplier);
345        Objects.requireNonNull(accumulator);
346        Objects.requireNonNull(combiner);
347        class ReducingSink extends Box<R>
348                implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
349            @Override
350            public void begin(long size) {
351                state = supplier.get();
352            }
353
354            @Override
355            public void accept(int t) {
356                accumulator.accept(state, t);
357            }
358
359            @Override
360            public void combine(ReducingSink other) {
361                state = combiner.apply(state, other.state);
362            }
363        }
364        return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
365            @Override
366            public ReducingSink makeSink() {
367                return new ReducingSink();
368            }
369        };
370    }
371
372    /**
373     * Constructs a {@code TerminalOp} that implements a functional reduce on
374     * {@code long} values.
375     *
376     * @param identity the identity for the combining function
377     * @param operator the combining function
378     * @return a {@code TerminalOp} implementing the reduction
379     */
380    public static TerminalOp<Long, Long>
381    makeLong(long identity, LongBinaryOperator operator) {
382        Objects.requireNonNull(operator);
383        class ReducingSink
384                implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
385            private long state;
386
387            @Override
388            public void begin(long size) {
389                state = identity;
390            }
391
392            @Override
393            public void accept(long t) {
394                state = operator.applyAsLong(state, t);
395            }
396
397            @Override
398            public Long get() {
399                return state;
400            }
401
402            @Override
403            public void combine(ReducingSink other) {
404                accept(other.state);
405            }
406        }
407        return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
408            @Override
409            public ReducingSink makeSink() {
410                return new ReducingSink();
411            }
412        };
413    }
414
415    /**
416     * Constructs a {@code TerminalOp} that implements a functional reduce on
417     * {@code long} values, producing an optional long result.
418     *
419     * @param operator the combining function
420     * @return a {@code TerminalOp} implementing the reduction
421     */
422    public static TerminalOp<Long, OptionalLong>
423    makeLong(LongBinaryOperator operator) {
424        Objects.requireNonNull(operator);
425        class ReducingSink
426                implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
427            private boolean empty;
428            private long state;
429
430            public void begin(long size) {
431                empty = true;
432                state = 0;
433            }
434
435            @Override
436            public void accept(long t) {
437                if (empty) {
438                    empty = false;
439                    state = t;
440                }
441                else {
442                    state = operator.applyAsLong(state, t);
443                }
444            }
445
446            @Override
447            public OptionalLong get() {
448                return empty ? OptionalLong.empty() : OptionalLong.of(state);
449            }
450
451            @Override
452            public void combine(ReducingSink other) {
453                if (!other.empty)
454                    accept(other.state);
455            }
456        }
457        return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
458            @Override
459            public ReducingSink makeSink() {
460                return new ReducingSink();
461            }
462        };
463    }
464
465    /**
466     * Constructs a {@code TerminalOp} that implements a mutable reduce on
467     * {@code long} values.
468     *
469     * @param <R> the type of the result
470     * @param supplier a factory to produce a new accumulator of the result type
471     * @param accumulator a function to incorporate an int into an
472     *        accumulator
473     * @param combiner a function to combine an accumulator into another
474     * @return a {@code TerminalOp} implementing the reduction
475     */
476    public static <R> TerminalOp<Long, R>
477    makeLong(Supplier<R> supplier,
478             ObjLongConsumer<R> accumulator,
479             BinaryOperator<R> combiner) {
480        Objects.requireNonNull(supplier);
481        Objects.requireNonNull(accumulator);
482        Objects.requireNonNull(combiner);
483        class ReducingSink extends Box<R>
484                implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
485            @Override
486            public void begin(long size) {
487                state = supplier.get();
488            }
489
490            @Override
491            public void accept(long t) {
492                accumulator.accept(state, t);
493            }
494
495            @Override
496            public void combine(ReducingSink other) {
497                state = combiner.apply(state, other.state);
498            }
499        }
500        return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
501            @Override
502            public ReducingSink makeSink() {
503                return new ReducingSink();
504            }
505        };
506    }
507
508    /**
509     * Constructs a {@code TerminalOp} that implements a functional reduce on
510     * {@code double} values.
511     *
512     * @param identity the identity for the combining function
513     * @param operator the combining function
514     * @return a {@code TerminalOp} implementing the reduction
515     */
516    public static TerminalOp<Double, Double>
517    makeDouble(double identity, DoubleBinaryOperator operator) {
518        Objects.requireNonNull(operator);
519        class ReducingSink
520                implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
521            private double state;
522
523            @Override
524            public void begin(long size) {
525                state = identity;
526            }
527
528            @Override
529            public void accept(double t) {
530                state = operator.applyAsDouble(state, t);
531            }
532
533            @Override
534            public Double get() {
535                return state;
536            }
537
538            @Override
539            public void combine(ReducingSink other) {
540                accept(other.state);
541            }
542        }
543        return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
544            @Override
545            public ReducingSink makeSink() {
546                return new ReducingSink();
547            }
548        };
549    }
550
551    /**
552     * Constructs a {@code TerminalOp} that implements a functional reduce on
553     * {@code double} values, producing an optional double result.
554     *
555     * @param operator the combining function
556     * @return a {@code TerminalOp} implementing the reduction
557     */
558    public static TerminalOp<Double, OptionalDouble>
559    makeDouble(DoubleBinaryOperator operator) {
560        Objects.requireNonNull(operator);
561        class ReducingSink
562                implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
563            private boolean empty;
564            private double state;
565
566            public void begin(long size) {
567                empty = true;
568                state = 0;
569            }
570
571            @Override
572            public void accept(double t) {
573                if (empty) {
574                    empty = false;
575                    state = t;
576                }
577                else {
578                    state = operator.applyAsDouble(state, t);
579                }
580            }
581
582            @Override
583            public OptionalDouble get() {
584                return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
585            }
586
587            @Override
588            public void combine(ReducingSink other) {
589                if (!other.empty)
590                    accept(other.state);
591            }
592        }
593        return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
594            @Override
595            public ReducingSink makeSink() {
596                return new ReducingSink();
597            }
598        };
599    }
600
601    /**
602     * Constructs a {@code TerminalOp} that implements a mutable reduce on
603     * {@code double} values.
604     *
605     * @param <R> the type of the result
606     * @param supplier a factory to produce a new accumulator of the result type
607     * @param accumulator a function to incorporate an int into an
608     *        accumulator
609     * @param combiner a function to combine an accumulator into another
610     * @return a {@code TerminalOp} implementing the reduction
611     */
612    public static <R> TerminalOp<Double, R>
613    makeDouble(Supplier<R> supplier,
614               ObjDoubleConsumer<R> accumulator,
615               BinaryOperator<R> combiner) {
616        Objects.requireNonNull(supplier);
617        Objects.requireNonNull(accumulator);
618        Objects.requireNonNull(combiner);
619        class ReducingSink extends Box<R>
620                implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
621            @Override
622            public void begin(long size) {
623                state = supplier.get();
624            }
625
626            @Override
627            public void accept(double t) {
628                accumulator.accept(state, t);
629            }
630
631            @Override
632            public void combine(ReducingSink other) {
633                state = combiner.apply(state, other.state);
634            }
635        }
636        return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
637            @Override
638            public ReducingSink makeSink() {
639                return new ReducingSink();
640            }
641        };
642    }
643
644    /**
645     * A type of {@code TerminalSink} that implements an associative reducing
646     * operation on elements of type {@code T} and producing a result of type
647     * {@code R}.
648     *
649     * @param <T> the type of input element to the combining operation
650     * @param <R> the result type
651     * @param <K> the type of the {@code AccumulatingSink}.
652     */
653    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
654            extends TerminalSink<T, R> {
655        public void combine(K other);
656    }
657
658    /**
659     * State box for a single state element, used as a base class for
660     * {@code AccumulatingSink} instances
661     *
662     * @param <U> The type of the state element
663     */
664    private static abstract class Box<U> {
665        U state;
666
667        Box() {} // Avoid creation of special accessor
668
669        public U get() {
670            return state;
671        }
672    }
673
674    /**
675     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
676     * output into an {@code AccumulatingSink}, which performs a reduce
677     * operation. The {@code AccumulatingSink} must represent an associative
678     * reducing operation.
679     *
680     * @param <T> the output type of the stream pipeline
681     * @param <R> the result type of the reducing operation
682     * @param <S> the type of the {@code AccumulatingSink}
683     */
684    private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
685            implements TerminalOp<T, R> {
686        private final StreamShape inputShape;
687
688        /**
689         * Create a {@code ReduceOp} of the specified stream shape which uses
690         * the specified {@code Supplier} to create accumulating sinks.
691         *
692         * @param shape The shape of the stream pipeline
693         */
694        ReduceOp(StreamShape shape) {
695            inputShape = shape;
696        }
697
698        public abstract S makeSink();
699
700        @Override
701        public StreamShape inputShape() {
702            return inputShape;
703        }
704
705        @Override
706        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
707                                           Spliterator<P_IN> spliterator) {
708            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
709        }
710
711        @Override
712        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
713                                         Spliterator<P_IN> spliterator) {
714            return new ReduceTask<>(this, helper, spliterator).invoke().get();
715        }
716    }
717
718    /**
719     * A {@code ForkJoinTask} for performing a parallel reduce operation.
720     */
721    @SuppressWarnings("serial")
722    private static final class ReduceTask<P_IN, P_OUT, R,
723                                          S extends AccumulatingSink<P_OUT, R, S>>
724            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
725        private final ReduceOp<P_OUT, R, S> op;
726
727        ReduceTask(ReduceOp<P_OUT, R, S> op,
728                   PipelineHelper<P_OUT> helper,
729                   Spliterator<P_IN> spliterator) {
730            super(helper, spliterator);
731            this.op = op;
732        }
733
734        ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
735                   Spliterator<P_IN> spliterator) {
736            super(parent, spliterator);
737            this.op = parent.op;
738        }
739
740        @Override
741        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
742            return new ReduceTask<>(this, spliterator);
743        }
744
745        @Override
746        protected S doLeaf() {
747            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
748        }
749
750        @Override
751        public void onCompletion(CountedCompleter<?> caller) {
752            if (!isLeaf()) {
753                S leftResult = leftChild.getLocalResult();
754                leftResult.combine(rightChild.getLocalResult());
755                setLocalResult(leftResult);
756            }
757            // GC spliterator, left and right child
758            super.onCompletion(caller);
759        }
760    }
761}
762