/*
 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package org.openjdk.testlib.java.util.stream;

import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.*;

import org.openjdk.testlib.java.util.stream.FlagDeclaringOp;

/**
 * Test scenarios for reference streams.
 *
 * Each scenario is provided with a data source, a function that maps a fresh
 * stream (as provided by the data source) to a new stream, and a sink to
 * receive results.  Each scenario describes a different way of computing the
 * stream contents.  The test driver will ensure that all scenarios produce
 * the same output (modulo allowable differences in ordering).
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {

    // Convert to sequential if parallel, traverse with forEach, then explicitly close the stream
    STREAM_FOR_EACH_WITH_CLOSE(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            Stream<U> s = m.apply(data.stream());
            if (s.isParallel()) {
                s = s.sequential();
            }
            s.forEach(b);
            s.close();
        }
    },

    // Collect to list
    STREAM_COLLECT(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U t : m.apply(data.stream()).collect(Collectors.toList())) {
                b.accept(t);
            }
        }
    },

    // To array
    STREAM_TO_ARRAY(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Object t : m.apply(data.stream()).toArray()) {
                b.accept((U) t);
            }
        }
    },

    // Wrap as stream, and iterate in pull mode
    STREAM_ITERATOR(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Iterator<U> seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
                b.accept(seqIter.next());
        }
    },

    // Wrap as stream, and spliterate then iterate in pull mode (one element per tryAdvance)
    STREAM_SPLITERATOR(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Spliterator<U> spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) { }
        }
    },

    // Wrap as stream, spliterate, then split a few times mixing advances with forEach
    STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
        }
    },

    // Wrap as stream, and spliterate then traverse all elements with forEachRemaining
    STREAM_SPLITERATOR_FOREACH(false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(data.stream()).spliterator().forEachRemaining(b);
        }
    },

    // Wrap as parallel stream + sequential
    PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(data.parallelStream()).sequential().forEach(b);
        }
    },

    // Wrap as parallel stream + forEachOrdered
    PAR_STREAM_FOR_EACH_ORDERED(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            // @@@ Want to explicitly select ordered equalator
            m.apply(data.parallelStream()).forEachOrdered(b);
        }
    },

    // Wrap as parallel stream, and spliterate then iterate sequentially (one element per tryAdvance)
    PAR_STREAM_SPLITERATOR(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Spliterator<U> spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) { }
        }
    },

    // Wrap as parallel stream, and spliterate then traverse all elements with forEachRemaining
    PAR_STREAM_SPLITERATOR_FOREACH(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
        }
    },

    // Wrap as parallel stream + toArray
    PAR_STREAM_TO_ARRAY(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (Object t : m.apply(data.parallelStream()).toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
    PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            Stream<U> s = m.apply(data.parallelStream());
            Spliterator<U> sp = s.spliterator();
            // Re-wrap the spliterator, preserving the source stream's characteristics and
            // adding SIZED only when the spliterator reports an exact size
            Stream<U> ss = StreamSupport.stream(() -> sp,
                                                StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
                                                | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
            for (Object t : ss.toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel stream + toArray and clear SIZED flag
    PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
            Stream<U> pipe2 = m.apply(pipe1);

            for (Object t : pipe2.toArray())
                b.accept((U) t);
        }
    },

    // Wrap as parallel + collect to list
    PAR_STREAM_COLLECT_TO_LIST(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap sequential as parallel, + collect to list
    STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap parallel as sequential, + collect to list
    PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
                b.accept(u);
        }
    },

    // Wrap as parallel stream + forEach synchronizing
    // (forEach on a parallel stream makes no ordering guarantee, hence isOrdered = false;
    // the sink is synchronized because it may be invoked concurrently)
    PAR_STREAM_FOR_EACH(true, false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            m.apply(data.parallelStream()).forEach(e -> {
                synchronized (data) {
                    b.accept(e);
                }
            });
        }
    },

    // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
    PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
        <T, U, S_IN extends BaseStream<T, S_IN>>
        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
                                                 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
            m.apply(pipe1).forEach(e -> {
                synchronized (data) {
                    b.accept(e);
                }
            });
        }
    },
    ;

    // The set of scenarios that clear the SIZED flag
    public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
            EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));

    // true if the scenario traverses a parallel source stream
    private final boolean isParallel;

    // true if the scenario is expected to preserve encounter order
    private final boolean isOrdered;

    StreamTestScenario(boolean isParallel) {
        this(isParallel, true);
    }

    StreamTestScenario(boolean isParallel, boolean isOrdered) {
        this.isParallel = isParallel;
        this.isOrdered = isOrdered;
    }

    public StreamShape getShape() {
        return StreamShape.REFERENCE;
    }

    public boolean isParallel() {
        return isParallel;
    }

    public boolean isOrdered() {
        return isOrdered;
    }

    // Entry point used by the test driver; narrows the output stream type to
    // Stream<U> and dispatches to the scenario-specific _run implementation
    public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
    void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
        _run(data, b, (Function<S_IN, Stream<U>>) m);
    }

    // Each scenario implements this to traverse the mapped stream in its own
    // way, feeding every element to the sink b
    abstract <T, U, S_IN extends BaseStream<T, S_IN>>
    void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m);

}