SortedOps.java revision ff18b5f136f92154f2e05217e3953d10f459e561
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.ArrayList;
28import java.util.Arrays;
29import java.util.Comparator;
30import java.util.Objects;
31import java.util.Spliterator;
32import java.util.function.IntFunction;
33
34
35/**
36 * Factory methods for transforming streams into sorted streams.
37 *
38 * @since 1.8
39 */
40final class SortedOps {
41
42    private SortedOps() { }
43
44    /**
45     * Appends a "sorted" operation to the provided stream.
46     *
47     * @param <T> the type of both input and output elements
48     * @param upstream a reference stream with element type T
49     */
50    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
51        return new OfRef<>(upstream);
52    }
53
54    /**
55     * Appends a "sorted" operation to the provided stream.
56     *
57     * @param <T> the type of both input and output elements
58     * @param upstream a reference stream with element type T
59     * @param comparator the comparator to order elements by
60     */
61    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
62                                Comparator<? super T> comparator) {
63        return new OfRef<>(upstream, comparator);
64    }
65
66    /**
67     * Appends a "sorted" operation to the provided stream.
68     *
69     * @param <T> the type of both input and output elements
70     * @param upstream a reference stream with element type T
71     */
72    static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
73        return new OfInt(upstream);
74    }
75
76    /**
77     * Appends a "sorted" operation to the provided stream.
78     *
79     * @param <T> the type of both input and output elements
80     * @param upstream a reference stream with element type T
81     */
82    static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
83        return new OfLong(upstream);
84    }
85
86    /**
87     * Appends a "sorted" operation to the provided stream.
88     *
89     * @param <T> the type of both input and output elements
90     * @param upstream a reference stream with element type T
91     */
92    static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
93        return new OfDouble(upstream);
94    }
95
96    /**
97     * Specialized subtype for sorting reference streams
98     */
99    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
100        /**
101         * Comparator used for sorting
102         */
103        private final boolean isNaturalSort;
104        private final Comparator<? super T> comparator;
105
106        /**
107         * Sort using natural order of {@literal <T>} which must be
108         * {@code Comparable}.
109         */
110        OfRef(AbstractPipeline<?, T, ?> upstream) {
111            super(upstream, StreamShape.REFERENCE,
112                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
113            this.isNaturalSort = true;
114            // Will throw CCE when we try to sort if T is not Comparable
115            @SuppressWarnings("unchecked")
116            Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
117            this.comparator = comp;
118        }
119
120        /**
121         * Sort using the provided comparator.
122         *
123         * @param comparator The comparator to be used to evaluate ordering.
124         */
125        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
126            super(upstream, StreamShape.REFERENCE,
127                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
128            this.isNaturalSort = false;
129            this.comparator = Objects.requireNonNull(comparator);
130        }
131
132        @Override
133        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
134            Objects.requireNonNull(sink);
135
136            // If the input is already naturally sorted and this operation
137            // also naturally sorted then this is a no-op
138            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
139                return sink;
140            else if (StreamOpFlag.SIZED.isKnown(flags))
141                return new SizedRefSortingSink<>(sink, comparator);
142            else
143                return new RefSortingSink<>(sink, comparator);
144        }
145
146        @Override
147        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
148                                                 Spliterator<P_IN> spliterator,
149                                                 IntFunction<T[]> generator) {
150            // If the input is already naturally sorted and this operation
151            // naturally sorts then collect the output
152            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
153                return helper.evaluate(spliterator, false, generator);
154            }
155            else {
156                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
157                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
158                Arrays.parallelSort(flattenedData, comparator);
159                return Nodes.node(flattenedData);
160            }
161        }
162    }
163
164    /**
165     * Specialized subtype for sorting int streams.
166     */
167    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
168        OfInt(AbstractPipeline<?, Integer, ?> upstream) {
169            super(upstream, StreamShape.INT_VALUE,
170                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
171        }
172
173        @Override
174        public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
175            Objects.requireNonNull(sink);
176
177            if (StreamOpFlag.SORTED.isKnown(flags))
178                return sink;
179            else if (StreamOpFlag.SIZED.isKnown(flags))
180                return new SizedIntSortingSink(sink);
181            else
182                return new IntSortingSink(sink);
183        }
184
185        @Override
186        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
187                                                       Spliterator<P_IN> spliterator,
188                                                       IntFunction<Integer[]> generator) {
189            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
190                return helper.evaluate(spliterator, false, generator);
191            }
192            else {
193                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
194
195                int[] content = n.asPrimitiveArray();
196                Arrays.parallelSort(content);
197
198                return Nodes.node(content);
199            }
200        }
201    }
202
203    /**
204     * Specialized subtype for sorting long streams.
205     */
206    private static final class OfLong extends LongPipeline.StatefulOp<Long> {
207        OfLong(AbstractPipeline<?, Long, ?> upstream) {
208            super(upstream, StreamShape.LONG_VALUE,
209                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
210        }
211
212        @Override
213        public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
214            Objects.requireNonNull(sink);
215
216            if (StreamOpFlag.SORTED.isKnown(flags))
217                return sink;
218            else if (StreamOpFlag.SIZED.isKnown(flags))
219                return new SizedLongSortingSink(sink);
220            else
221                return new LongSortingSink(sink);
222        }
223
224        @Override
225        public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
226                                                    Spliterator<P_IN> spliterator,
227                                                    IntFunction<Long[]> generator) {
228            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
229                return helper.evaluate(spliterator, false, generator);
230            }
231            else {
232                Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
233
234                long[] content = n.asPrimitiveArray();
235                Arrays.parallelSort(content);
236
237                return Nodes.node(content);
238            }
239        }
240    }
241
242    /**
243     * Specialized subtype for sorting double streams.
244     */
245    private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
246        OfDouble(AbstractPipeline<?, Double, ?> upstream) {
247            super(upstream, StreamShape.DOUBLE_VALUE,
248                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
249        }
250
251        @Override
252        public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
253            Objects.requireNonNull(sink);
254
255            if (StreamOpFlag.SORTED.isKnown(flags))
256                return sink;
257            else if (StreamOpFlag.SIZED.isKnown(flags))
258                return new SizedDoubleSortingSink(sink);
259            else
260                return new DoubleSortingSink(sink);
261        }
262
263        @Override
264        public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
265                                                      Spliterator<P_IN> spliterator,
266                                                      IntFunction<Double[]> generator) {
267            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
268                return helper.evaluate(spliterator, false, generator);
269            }
270            else {
271                Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
272
273                double[] content = n.asPrimitiveArray();
274                Arrays.parallelSort(content);
275
276                return Nodes.node(content);
277            }
278        }
279    }
280
281    /**
282     * Abstract {@link Sink} for implementing sort on reference streams.
283     *
284     * <p>
285     * Note: documentation below applies to reference and all primitive sinks.
286     * <p>
287     * Sorting sinks first accept all elements, buffering then into an array
288     * or a re-sizable data structure, if the size of the pipeline is known or
289     * unknown respectively.  At the end of the sink protocol those elements are
290     * sorted and then pushed downstream.
291     * This class records if {@link #cancellationRequested} is called.  If so it
292     * can be inferred that the source pushing source elements into the pipeline
293     * knows that the pipeline is short-circuiting.  In such cases sub-classes
294     * pushing elements downstream will preserve the short-circuiting protocol
295     * by calling {@code downstream.cancellationRequested()} and checking the
296     * result is {@code false} before an element is pushed.
297     * <p>
298     * Note that the above behaviour is an optimization for sorting with
299     * sequential streams.  It is not an error that more elements, than strictly
300     * required to produce a result, may flow through the pipeline.  This can
301     * occur, in general (not restricted to just sorting), for short-circuiting
302     * parallel pipelines.
303     */
304    private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
305        protected final Comparator<? super T> comparator;
306        // @@@ could be a lazy final value, if/when support is added
307        protected boolean cancellationWasRequested;
308
309        AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
310            super(downstream);
311            this.comparator = comparator;
312        }
313
314        /**
315         * Records is cancellation is requested so short-circuiting behaviour
316         * can be preserved when the sorted elements are pushed downstream.
317         *
318         * @return false, as this sink never short-circuits.
319         */
320        @Override
321        public final boolean cancellationRequested() {
322            cancellationWasRequested = true;
323            return false;
324        }
325    }
326
327    /**
328     * {@link Sink} for implementing sort on SIZED reference streams.
329     */
330    private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
331        private T[] array;
332        private int offset;
333
334        SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
335            super(sink, comparator);
336        }
337
338        @Override
339        @SuppressWarnings("unchecked")
340        public void begin(long size) {
341            if (size >= Nodes.MAX_ARRAY_SIZE)
342                throw new IllegalArgumentException(Nodes.BAD_SIZE);
343            array = (T[]) new Object[(int) size];
344        }
345
346        @Override
347        public void end() {
348            Arrays.sort(array, 0, offset, comparator);
349            downstream.begin(offset);
350            if (!cancellationWasRequested) {
351                for (int i = 0; i < offset; i++)
352                    downstream.accept(array[i]);
353            }
354            else {
355                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
356                    downstream.accept(array[i]);
357            }
358            downstream.end();
359            array = null;
360        }
361
362        @Override
363        public void accept(T t) {
364            array[offset++] = t;
365        }
366    }
367
368    /**
369     * {@link Sink} for implementing sort on reference streams.
370     */
371    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
372        private ArrayList<T> list;
373
374        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
375            super(sink, comparator);
376        }
377
378        @Override
379        public void begin(long size) {
380            if (size >= Nodes.MAX_ARRAY_SIZE)
381                throw new IllegalArgumentException(Nodes.BAD_SIZE);
382            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
383        }
384
385        @Override
386        public void end() {
387            list.sort(comparator);
388            downstream.begin(list.size());
389            if (!cancellationWasRequested) {
390                list.forEach(downstream::accept);
391            }
392            else {
393                for (T t : list) {
394                    if (downstream.cancellationRequested()) break;
395                    downstream.accept(t);
396                }
397            }
398            downstream.end();
399            list = null;
400        }
401
402        @Override
403        public void accept(T t) {
404            list.add(t);
405        }
406    }
407
408    /**
409     * Abstract {@link Sink} for implementing sort on int streams.
410     */
411    private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
412        protected boolean cancellationWasRequested;
413
414        AbstractIntSortingSink(Sink<? super Integer> downstream) {
415            super(downstream);
416        }
417
418        @Override
419        public final boolean cancellationRequested() {
420            cancellationWasRequested = true;
421            return false;
422        }
423    }
424
425    /**
426     * {@link Sink} for implementing sort on SIZED int streams.
427     */
428    private static final class SizedIntSortingSink extends AbstractIntSortingSink {
429        private int[] array;
430        private int offset;
431
432        SizedIntSortingSink(Sink<? super Integer> downstream) {
433            super(downstream);
434        }
435
436        @Override
437        public void begin(long size) {
438            if (size >= Nodes.MAX_ARRAY_SIZE)
439                throw new IllegalArgumentException(Nodes.BAD_SIZE);
440            array = new int[(int) size];
441        }
442
443        @Override
444        public void end() {
445            Arrays.sort(array, 0, offset);
446            downstream.begin(offset);
447            if (!cancellationWasRequested) {
448                for (int i = 0; i < offset; i++)
449                    downstream.accept(array[i]);
450            }
451            else {
452                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
453                    downstream.accept(array[i]);
454            }
455            downstream.end();
456            array = null;
457        }
458
459        @Override
460        public void accept(int t) {
461            array[offset++] = t;
462        }
463    }
464
465    /**
466     * {@link Sink} for implementing sort on int streams.
467     */
468    private static final class IntSortingSink extends AbstractIntSortingSink {
469        private SpinedBuffer.OfInt b;
470
471        IntSortingSink(Sink<? super Integer> sink) {
472            super(sink);
473        }
474
475        @Override
476        public void begin(long size) {
477            if (size >= Nodes.MAX_ARRAY_SIZE)
478                throw new IllegalArgumentException(Nodes.BAD_SIZE);
479            b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
480        }
481
482        @Override
483        public void end() {
484            int[] ints = b.asPrimitiveArray();
485            Arrays.sort(ints);
486            downstream.begin(ints.length);
487            if (!cancellationWasRequested) {
488                for (int anInt : ints)
489                    downstream.accept(anInt);
490            }
491            else {
492                for (int anInt : ints) {
493                    if (downstream.cancellationRequested()) break;
494                    downstream.accept(anInt);
495                }
496            }
497            downstream.end();
498        }
499
500        @Override
501        public void accept(int t) {
502            b.accept(t);
503        }
504    }
505
506    /**
507     * Abstract {@link Sink} for implementing sort on long streams.
508     */
509    private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
510        protected boolean cancellationWasRequested;
511
512        AbstractLongSortingSink(Sink<? super Long> downstream) {
513            super(downstream);
514        }
515
516        @Override
517        public final boolean cancellationRequested() {
518            cancellationWasRequested = true;
519            return false;
520        }
521    }
522
523    /**
524     * {@link Sink} for implementing sort on SIZED long streams.
525     */
526    private static final class SizedLongSortingSink extends AbstractLongSortingSink {
527        private long[] array;
528        private int offset;
529
530        SizedLongSortingSink(Sink<? super Long> downstream) {
531            super(downstream);
532        }
533
534        @Override
535        public void begin(long size) {
536            if (size >= Nodes.MAX_ARRAY_SIZE)
537                throw new IllegalArgumentException(Nodes.BAD_SIZE);
538            array = new long[(int) size];
539        }
540
541        @Override
542        public void end() {
543            Arrays.sort(array, 0, offset);
544            downstream.begin(offset);
545            if (!cancellationWasRequested) {
546                for (int i = 0; i < offset; i++)
547                    downstream.accept(array[i]);
548            }
549            else {
550                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
551                    downstream.accept(array[i]);
552            }
553            downstream.end();
554            array = null;
555        }
556
557        @Override
558        public void accept(long t) {
559            array[offset++] = t;
560        }
561    }
562
563    /**
564     * {@link Sink} for implementing sort on long streams.
565     */
566    private static final class LongSortingSink extends AbstractLongSortingSink {
567        private SpinedBuffer.OfLong b;
568
569        LongSortingSink(Sink<? super Long> sink) {
570            super(sink);
571        }
572
573        @Override
574        public void begin(long size) {
575            if (size >= Nodes.MAX_ARRAY_SIZE)
576                throw new IllegalArgumentException(Nodes.BAD_SIZE);
577            b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
578        }
579
580        @Override
581        public void end() {
582            long[] longs = b.asPrimitiveArray();
583            Arrays.sort(longs);
584            downstream.begin(longs.length);
585            if (!cancellationWasRequested) {
586                for (long aLong : longs)
587                    downstream.accept(aLong);
588            }
589            else {
590                for (long aLong : longs) {
591                    if (downstream.cancellationRequested()) break;
592                    downstream.accept(aLong);
593                }
594            }
595            downstream.end();
596        }
597
598        @Override
599        public void accept(long t) {
600            b.accept(t);
601        }
602    }
603
604    /**
605     * Abstract {@link Sink} for implementing sort on long streams.
606     */
607    private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
608        protected boolean cancellationWasRequested;
609
610        AbstractDoubleSortingSink(Sink<? super Double> downstream) {
611            super(downstream);
612        }
613
614        @Override
615        public final boolean cancellationRequested() {
616            cancellationWasRequested = true;
617            return false;
618        }
619    }
620
621    /**
622     * {@link Sink} for implementing sort on SIZED double streams.
623     */
624    private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
625        private double[] array;
626        private int offset;
627
628        SizedDoubleSortingSink(Sink<? super Double> downstream) {
629            super(downstream);
630        }
631
632        @Override
633        public void begin(long size) {
634            if (size >= Nodes.MAX_ARRAY_SIZE)
635                throw new IllegalArgumentException(Nodes.BAD_SIZE);
636            array = new double[(int) size];
637        }
638
639        @Override
640        public void end() {
641            Arrays.sort(array, 0, offset);
642            downstream.begin(offset);
643            if (!cancellationWasRequested) {
644                for (int i = 0; i < offset; i++)
645                    downstream.accept(array[i]);
646            }
647            else {
648                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
649                    downstream.accept(array[i]);
650            }
651            downstream.end();
652            array = null;
653        }
654
655        @Override
656        public void accept(double t) {
657            array[offset++] = t;
658        }
659    }
660
661    /**
662     * {@link Sink} for implementing sort on double streams.
663     */
664    private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
665        private SpinedBuffer.OfDouble b;
666
667        DoubleSortingSink(Sink<? super Double> sink) {
668            super(sink);
669        }
670
671        @Override
672        public void begin(long size) {
673            if (size >= Nodes.MAX_ARRAY_SIZE)
674                throw new IllegalArgumentException(Nodes.BAD_SIZE);
675            b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
676        }
677
678        @Override
679        public void end() {
680            double[] doubles = b.asPrimitiveArray();
681            Arrays.sort(doubles);
682            downstream.begin(doubles.length);
683            if (!cancellationWasRequested) {
684                for (double aDouble : doubles)
685                    downstream.accept(aDouble);
686            }
687            else {
688                for (double aDouble : doubles) {
689                    if (downstream.cancellationRequested()) break;
690                    downstream.accept(aDouble);
691                }
692            }
693            downstream.end();
694        }
695
696        @Override
697        public void accept(double t) {
698            b.accept(t);
699        }
700    }
701}
702