1/*
2 * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23package org.openjdk.testlib.java.util.stream;
24
25import java.util.Collections;
26import java.util.EnumSet;
27import java.util.PrimitiveIterator;
28import java.util.Set;
29import java.util.Spliterator;
30import java.util.function.Consumer;
31import java.util.function.DoubleConsumer;
32import java.util.function.Function;
33
34import org.openjdk.testlib.java.util.stream.FlagDeclaringOp;
35
36import java.util.stream.*;
37
38/**
39 * Test scenarios for double streams.
40 *
41 * Each scenario is provided with a data source, a function that maps a fresh
42 * stream (as provided by the data source) to a new stream, and a sink to
43 * receive results.  Each scenario describes a different way of computing the
44 * stream contents.  The test driver will ensure that all scenarios produce
45 * the same output (modulo allowable differences in ordering).
46 */
47@SuppressWarnings({"rawtypes", "unchecked"})
48public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
49
50    STREAM_FOR_EACH_WITH_CLOSE(false) {
51        <T, S_IN extends BaseStream<T, S_IN>>
52        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
53            DoubleStream s = m.apply(data.stream());
54            if (s.isParallel()) {
55                s = s.sequential();
56            }
57            s.forEach(b);
58            s.close();
59        }
60    },
61
62    STREAM_TO_ARRAY(false) {
63        <T, S_IN extends BaseStream<T, S_IN>>
64        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
65            for (double t : m.apply(data.stream()).toArray()) {
66                b.accept(t);
67            }
68        }
69    },
70
71    STREAM_ITERATOR(false) {
72        <T, S_IN extends BaseStream<T, S_IN>>
73        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
74            for (PrimitiveIterator.OfDouble seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
75                b.accept(seqIter.nextDouble());
76        }
77    },
78
79    // Wrap as stream, and spliterate then iterate in pull mode
80    STREAM_SPLITERATOR(false) {
81        <T, S_IN extends BaseStream<T, S_IN>>
82        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
83            for (Spliterator.OfDouble spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
84            }
85        }
86    },
87
88    // Wrap as stream, spliterate, then split a few times mixing advances with forEach
89    STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
90        <T, S_IN extends BaseStream<T, S_IN>>
91        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
92            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
93        }
94    },
95
96    // Wrap as stream, and spliterate then iterate in pull mode
97    STREAM_SPLITERATOR_FOREACH(false) {
98        <T, S_IN extends BaseStream<T, S_IN>>
99        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
100            m.apply(data.stream()).spliterator().forEachRemaining(b);
101        }
102    },
103
104    PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
105        <T, S_IN extends BaseStream<T, S_IN>>
106        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
107            m.apply(data.parallelStream()).sequential().forEach(b);
108        }
109    },
110
111    // Wrap as parallel stream + forEachOrdered
112    PAR_STREAM_FOR_EACH_ORDERED(true) {
113        <T, S_IN extends BaseStream<T, S_IN>>
114        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
115            // @@@ Want to explicitly select ordered equalator
116            m.apply(data.parallelStream()).forEachOrdered(b);
117        }
118    },
119
120    // Wrap as stream, and spliterate then iterate sequentially
121    PAR_STREAM_SPLITERATOR(true) {
122        <T, S_IN extends BaseStream<T, S_IN>>
123        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
124            for (Spliterator.OfDouble spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
125            }
126        }
127    },
128
129    // Wrap as stream, and spliterate then iterate sequentially
130    PAR_STREAM_SPLITERATOR_FOREACH(true) {
131        <T, S_IN extends BaseStream<T, S_IN>>
132        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
133            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
134        }
135    },
136
137    PAR_STREAM_TO_ARRAY(true) {
138        <T, S_IN extends BaseStream<T, S_IN>>
139        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
140            for (double t : m.apply(data.parallelStream()).toArray())
141                b.accept(t);
142        }
143    },
144
145    // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
146    PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
147        <T, S_IN extends BaseStream<T, S_IN>>
148        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
149            DoubleStream s = m.apply(data.parallelStream());
150            Spliterator.OfDouble sp = s.spliterator();
151            DoubleStream ss = StreamSupport.doubleStream(() -> sp,
152                                                         StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
153                                                         | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
154            for (double t : ss.toArray())
155                b.accept(t);
156        }
157    },
158
159    PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
160        <T, S_IN extends BaseStream<T, S_IN>>
161        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
162            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
163                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
164            DoubleStream pipe2 = m.apply(pipe1);
165
166            for (double t : pipe2.toArray())
167                b.accept(t);
168        }
169    },
170
171    // Wrap as parallel stream + forEach synchronizing
172    PAR_STREAM_FOR_EACH(true, false) {
173        <T, S_IN extends BaseStream<T, S_IN>>
174        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
175            m.apply(data.parallelStream()).forEach(e -> {
176                synchronized (data) {
177                    b.accept(e);
178                }
179            });
180        }
181    },
182
183    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
184    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
185        <T, S_IN extends BaseStream<T, S_IN>>
186        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
187            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
188                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
189            m.apply(pipe1).forEach(e -> {
190                synchronized (data) {
191                    b.accept(e);
192                }
193            });
194        }
195    },
196    ;
197
198    // The set of scenarios that clean the SIZED flag
199    public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
200            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
201
202    private boolean isParallel;
203
204    private final boolean isOrdered;
205
206    DoubleStreamTestScenario(boolean isParallel) {
207        this(isParallel, true);
208    }
209
210    DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) {
211        this.isParallel = isParallel;
212        this.isOrdered = isOrdered;
213    }
214
215    public StreamShape getShape() {
216        return StreamShape.DOUBLE_VALUE;
217    }
218
219    public boolean isParallel() {
220        return isParallel;
221    }
222
223    public boolean isOrdered() {
224        return isOrdered;
225    }
226
227    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
228    void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
229        _run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
230    }
231
232    abstract <T, S_IN extends BaseStream<T, S_IN>>
233    void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m);
234
235}
236