1/*
2 * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23package org.openjdk.testlib.java.util.stream;
24
25import java.util.Collections;
26import java.util.EnumSet;
27import java.util.PrimitiveIterator;
28import java.util.Set;
29import java.util.Spliterator;
30import java.util.function.Consumer;
31import java.util.function.Function;
32import java.util.function.LongConsumer;
33import java.util.stream.*;
34
35import org.openjdk.testlib.java.util.stream.FlagDeclaringOp;
36
37/**
38 * Test scenarios for long streams.
39 *
40 * Each scenario is provided with a data source, a function that maps a fresh
41 * stream (as provided by the data source) to a new stream, and a sink to
42 * receive results.  Each scenario describes a different way of computing the
43 * stream contents.  The test driver will ensure that all scenarios produce
44 * the same output (modulo allowable differences in ordering).
45 */
46@SuppressWarnings({"rawtypes", "unchecked"})
47public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
48
49    STREAM_FOR_EACH_WITH_CLOSE(false) {
50        <T, S_IN extends BaseStream<T, S_IN>>
51        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
52            LongStream s = m.apply(data.stream());
53            if (s.isParallel()) {
54                s = s.sequential();
55            }
56            s.forEach(b);
57            s.close();
58        }
59    },
60
61    STREAM_TO_ARRAY(false) {
62        <T, S_IN extends BaseStream<T, S_IN>>
63        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
64            for (long t : m.apply(data.stream()).toArray()) {
65                b.accept(t);
66            }
67        }
68    },
69
70    STREAM_ITERATOR(false) {
71        <T, S_IN extends BaseStream<T, S_IN>>
72        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
73            for (PrimitiveIterator.OfLong seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
74                b.accept(seqIter.nextLong());
75        }
76    },
77
78    // Wrap as stream, and spliterate then iterate in pull mode
79    STREAM_SPLITERATOR(false) {
80        <T, S_IN extends BaseStream<T, S_IN>>
81        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
82            for (Spliterator.OfLong spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
83            }
84        }
85    },
86
87    // Wrap as stream, spliterate, then split a few times mixing advances with forEach
88    STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
89        <T, S_IN extends BaseStream<T, S_IN>>
90        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
91            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
92        }
93    },
94
95    // Wrap as stream, and spliterate then iterate in pull mode
96    STREAM_SPLITERATOR_FOREACH(false) {
97        <T, S_IN extends BaseStream<T, S_IN>>
98        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
99            m.apply(data.stream()).spliterator().forEachRemaining(b);
100        }
101    },
102
103    PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
104        <T, S_IN extends BaseStream<T, S_IN>>
105        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
106            m.apply(data.parallelStream()).sequential().forEach(b);
107        }
108    },
109
110    // Wrap as parallel stream + forEachOrdered
111    PAR_STREAM_FOR_EACH_ORDERED(true) {
112        <T, S_IN extends BaseStream<T, S_IN>>
113        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
114            // @@@ Want to explicitly select ordered equalator
115            m.apply(data.parallelStream()).forEachOrdered(b);
116        }
117    },
118
119    // Wrap as stream, and spliterate then iterate sequentially
120    PAR_STREAM_SPLITERATOR(true) {
121        <T, S_IN extends BaseStream<T, S_IN>>
122        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
123            for (Spliterator.OfLong spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
124            }
125        }
126    },
127
128    // Wrap as stream, and spliterate then iterate sequentially
129    PAR_STREAM_SPLITERATOR_FOREACH(true) {
130        <T, S_IN extends BaseStream<T, S_IN>>
131        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
132            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
133        }
134    },
135
136    PAR_STREAM_TO_ARRAY(true) {
137        <T, S_IN extends BaseStream<T, S_IN>>
138        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
139            for (long t : m.apply(data.parallelStream()).toArray())
140                b.accept(t);
141        }
142    },
143
144    // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
145    PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
146        <T, S_IN extends BaseStream<T, S_IN>>
147        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
148            LongStream s = m.apply(data.parallelStream());
149            Spliterator.OfLong sp = s.spliterator();
150            LongStream ss = StreamSupport.longStream(() -> sp,
151                                                     StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
152                                                     | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
153            for (long t : ss.toArray())
154                b.accept(t);
155        }
156    },
157
158    PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
159        <T, S_IN extends BaseStream<T, S_IN>>
160        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
161            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
162                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
163            LongStream pipe2 = m.apply(pipe1);
164
165            for (long t : pipe2.toArray())
166                b.accept(t);
167        }
168    },
169
170    // Wrap as parallel stream + forEach synchronizing
171    PAR_STREAM_FOR_EACH(true, false) {
172        <T, S_IN extends BaseStream<T, S_IN>>
173        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
174            m.apply(data.parallelStream()).forEach(e -> {
175                synchronized (data) {
176                    b.accept(e);
177                }
178            });
179        }
180    },
181
182    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
183    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
184        <T, S_IN extends BaseStream<T, S_IN>>
185        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
186            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
187                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
188            m.apply(pipe1).forEach(e -> {
189                synchronized (data) {
190                    b.accept(e);
191                }
192            });
193        }
194    },
195    ;
196
197    // The set of scenarios that clean the SIZED flag
198    public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
199            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
200
201    private boolean isParallel;
202
203    private final boolean isOrdered;
204
205    LongStreamTestScenario(boolean isParallel) {
206        this(isParallel, true);
207    }
208
209    LongStreamTestScenario(boolean isParallel, boolean isOrdered) {
210        this.isParallel = isParallel;
211        this.isOrdered = isOrdered;
212    }
213
214    public StreamShape getShape() {
215        return StreamShape.LONG_VALUE;
216    }
217
218    public boolean isParallel() {
219        return isParallel;
220    }
221
222    public boolean isOrdered() {
223        return isOrdered;
224    }
225
226    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
227    void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
228        _run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
229    }
230
231    abstract <T, S_IN extends BaseStream<T, S_IN>>
232    void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m);
233
234}
235