1/*
2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23package org.openjdk.testlib.java.util.stream;
24
25import java.util.Collections;
26import java.util.EnumSet;
27import java.util.Iterator;
28import java.util.Set;
29import java.util.Spliterator;
30import java.util.function.Consumer;
31import java.util.function.Function;
32import java.util.stream.*;
33
34import org.openjdk.testlib.java.util.stream.FlagDeclaringOp;
35
36/**
37 * Test scenarios for reference streams.
38 *
39 * Each scenario is provided with a data source, a function that maps a fresh
40 * stream (as provided by the data source) to a new stream, and a sink to
41 * receive results.  Each scenario describes a different way of computing the
42 * stream contents.  The test driver will ensure that all scenarios produce
43 * the same output (modulo allowable differences in ordering).
44 */
45@SuppressWarnings({"rawtypes", "unchecked"})
46public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
47
48    STREAM_FOR_EACH_WITH_CLOSE(false) {
49        <T, U, S_IN extends BaseStream<T, S_IN>>
50        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
51            Stream<U> s = m.apply(data.stream());
52            if (s.isParallel()) {
53                s = s.sequential();
54            }
55            s.forEach(b);
56            s.close();
57        }
58    },
59
60    // Collec to list
61    STREAM_COLLECT(false) {
62        <T, U, S_IN extends BaseStream<T, S_IN>>
63        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
64            for (U t : m.apply(data.stream()).collect(Collectors.toList())) {
65                b.accept(t);
66            }
67        }
68    },
69
70    // To array
71    STREAM_TO_ARRAY(false) {
72        <T, U, S_IN extends BaseStream<T, S_IN>>
73        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
74            for (Object t : m.apply(data.stream()).toArray()) {
75                b.accept((U) t);
76            }
77        }
78    },
79
80    // Wrap as stream, and iterate in pull mode
81    STREAM_ITERATOR(false) {
82        <T, U, S_IN extends BaseStream<T, S_IN>>
83        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
84            for (Iterator<U> seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
85                b.accept(seqIter.next());
86        }
87    },
88
89    // Wrap as stream, and spliterate then iterate in pull mode
90    STREAM_SPLITERATOR(false) {
91        <T, U, S_IN extends BaseStream<T, S_IN>>
92        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
93            for (Spliterator<U> spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) { }
94        }
95    },
96
97    // Wrap as stream, spliterate, then split a few times mixing advances with forEach
98    STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
99        <T, U, S_IN extends BaseStream<T, S_IN>>
100        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
101            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
102        }
103    },
104
105    // Wrap as stream, and spliterate then iterate in pull mode
106    STREAM_SPLITERATOR_FOREACH(false) {
107        <T, U, S_IN extends BaseStream<T, S_IN>>
108        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
109            m.apply(data.stream()).spliterator().forEachRemaining(b);
110        }
111    },
112
113    // Wrap as parallel stream + sequential
114    PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
115        <T, U, S_IN extends BaseStream<T, S_IN>>
116        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
117            m.apply(data.parallelStream()).sequential().forEach(b);
118        }
119    },
120
121    // Wrap as parallel stream + forEachOrdered
122    PAR_STREAM_FOR_EACH_ORDERED(true) {
123        <T, U, S_IN extends BaseStream<T, S_IN>>
124        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
125            // @@@ Want to explicitly select ordered equalator
126            m.apply(data.parallelStream()).forEachOrdered(b);
127        }
128    },
129
130    // Wrap as stream, and spliterate then iterate sequentially
131    PAR_STREAM_SPLITERATOR(true) {
132        <T, U, S_IN extends BaseStream<T, S_IN>>
133        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
134            for (Spliterator<U> spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) { }
135        }
136    },
137
138    // Wrap as stream, and spliterate then iterate sequentially
139    PAR_STREAM_SPLITERATOR_FOREACH(true) {
140        <T, U, S_IN extends BaseStream<T, S_IN>>
141        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
142            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
143        }
144    },
145
146    // Wrap as parallel stream + toArray
147    PAR_STREAM_TO_ARRAY(true) {
148        <T, U, S_IN extends BaseStream<T, S_IN>>
149        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
150            for (Object t : m.apply(data.parallelStream()).toArray())
151                b.accept((U) t);
152        }
153    },
154
155    // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
156    PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
157        <T, U, S_IN extends BaseStream<T, S_IN>>
158        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
159            Stream<U> s = m.apply(data.parallelStream());
160            Spliterator<U> sp = s.spliterator();
161            Stream<U> ss = StreamSupport.stream(() -> sp,
162                                                StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
163                                                | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
164            for (Object t : ss.toArray())
165                b.accept((U) t);
166        }
167    },
168
169    // Wrap as parallel stream + toArray and clear SIZED flag
170    PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
171        <T, U, S_IN extends BaseStream<T, S_IN>>
172        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
173            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
174                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
175            Stream<U> pipe2 = m.apply(pipe1);
176
177            for (Object t : pipe2.toArray())
178                b.accept((U) t);
179        }
180    },
181
182    // Wrap as parallel + collect to list
183    PAR_STREAM_COLLECT_TO_LIST(true) {
184        <T, U, S_IN extends BaseStream<T, S_IN>>
185        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
186            for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
187                b.accept(u);
188        }
189    },
190
191    // Wrap sequential as parallel, + collect to list
192    STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
193        <T, U, S_IN extends BaseStream<T, S_IN>>
194        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
195            for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
196                b.accept(u);
197        }
198    },
199
200    // Wrap parallel as sequential,, + collect
201    PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
202        <T, U, S_IN extends BaseStream<T, S_IN>>
203        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
204            for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
205                b.accept(u);
206        }
207    },
208
209    // Wrap as parallel stream + forEach synchronizing
210    PAR_STREAM_FOR_EACH(true, false) {
211        <T, U, S_IN extends BaseStream<T, S_IN>>
212        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
213            m.apply(data.parallelStream()).forEach(e -> {
214                synchronized (data) {
215                    b.accept(e);
216                }
217            });
218        }
219    },
220
221    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
222    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
223        <T, U, S_IN extends BaseStream<T, S_IN>>
224        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
225            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
226                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
227            m.apply(pipe1).forEach(e -> {
228                synchronized (data) {
229                    b.accept(e);
230                }
231            });
232        }
233    },
234    ;
235
236    // The set of scenarios that clean the SIZED flag
237    public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
238            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
239
240    private final boolean isParallel;
241
242    private final boolean isOrdered;
243
244    StreamTestScenario(boolean isParallel) {
245        this(isParallel, true);
246    }
247
248    StreamTestScenario(boolean isParallel, boolean isOrdered) {
249        this.isParallel = isParallel;
250        this.isOrdered = isOrdered;
251    }
252
253    public StreamShape getShape() {
254        return StreamShape.REFERENCE;
255    }
256
257    public boolean isParallel() {
258        return isParallel;
259    }
260
261    public boolean isOrdered() {
262        return isOrdered;
263    }
264
265    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
266    void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
267        _run(data, b, (Function<S_IN, Stream<U>>) m);
268    }
269
270    abstract <T, U, S_IN extends BaseStream<T, S_IN>>
271    void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m);
272
273}
274