1/* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23package java.util.stream; 24 25import java.io.PrintWriter; 26import java.io.StringWriter; 27import java.util.ArrayList; 28import java.util.Arrays; 29import java.util.Collection; 30import java.util.Collections; 31import java.util.EnumMap; 32import java.util.EnumSet; 33import java.util.HashMap; 34import java.util.HashSet; 35import java.util.List; 36import java.util.Map; 37import java.util.Objects; 38import java.util.Set; 39import java.util.Spliterator; 40import java.util.function.BiConsumer; 41import java.util.function.Consumer; 42import java.util.function.Function; 43 44import org.testng.annotations.Test; 45 46/** 47 * Base class for streams test cases. Provides 'exercise' methods for taking 48 * lambdas that construct and modify streams, and evaluates them in different 49 * ways and asserts that they produce equivalent results. 50 */ 51@Test 52public abstract class OpTestCase extends LoggingTestCase { 53 54 private final Map<StreamShape, Set<? extends BaseStreamTestScenario>> testScenarios; 55 56 protected OpTestCase() { 57 testScenarios = new EnumMap<>(StreamShape.class); 58 testScenarios.put(StreamShape.REFERENCE, Collections.unmodifiableSet(EnumSet.allOf(StreamTestScenario.class))); 59 testScenarios.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(EnumSet.allOf(IntStreamTestScenario.class))); 60 testScenarios.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(EnumSet.allOf(LongStreamTestScenario.class))); 61 testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class))); 62 } 63 64 @SuppressWarnings("rawtypes") 65 public static int getStreamFlags(BaseStream s) { 66 return ((AbstractPipeline) s).getStreamFlags(); 67 } 68 69 /** 70 * An asserter for results produced when exercising of stream or terminal 71 * tests. 72 * 73 * @param <R> the type of result to assert on 74 */ 75 public interface ResultAsserter<R> { 76 /** 77 * Assert a result produced when exercising of stream or terminal 78 * test. 79 * 80 * @param actual the actual result 81 * @param expected the expected result 82 * @param isOrdered true if the pipeline is ordered 83 * @param isParallel true if the pipeline is parallel 84 */ 85 void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel); 86 } 87 88 // Exercise stream operations 89 90 public interface BaseStreamTestScenario { 91 StreamShape getShape(); 92 93 boolean isParallel(); 94 95 boolean isOrdered(); 96 97 <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 98 void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m); 99 } 100 101 protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 102 Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) { 103 return withData(data).stream(m).exercise(); 104 } 105 106 // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result 107 // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants 108 @SafeVarargs 109 protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 110 Collection<U> exerciseOpsMulti(TestData<T, S_IN> data, 111 Function<S_IN, S_OUT>... ms) { 112 Collection<U> result = null; 113 for (Function<S_IN, S_OUT> m : ms) { 114 if (result == null) 115 result = withData(data).stream(m).exercise(); 116 else { 117 Collection<U> r2 = withData(data).stream(m).exercise(); 118 assertEquals(result, r2); 119 } 120 } 121 return result; 122 } 123 124 // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result 125 // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same 126 // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams 127 protected final 128 Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data, 129 Function<Stream<Integer>, Stream<Integer>> mRef, 130 Function<IntStream, IntStream> mInt, 131 Function<LongStream, LongStream> mLong, 132 Function<DoubleStream, DoubleStream> mDouble) { 133 @SuppressWarnings({ "rawtypes", "unchecked" }) 134 Function<Stream<Integer>, Stream<Integer>>[] ms = new Function[4]; 135 ms[0] = mRef; 136 ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e); 137 ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e); 138 ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e); 139 return exerciseOpsMulti(data, ms); 140 } 141 142 // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result 143 // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants 144 protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 145 void exerciseTerminalOpsMulti(TestData<T, S_IN> data, 146 R expected, 147 Map<String, Function<S_IN, S_OUT>> streams, 148 Map<String, Function<S_OUT, R>> terminals) { 149 for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) { 150 setContext("Intermediate stream", se.getKey()); 151 for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) { 152 setContext("Terminal stream", te.getKey()); 153 withData(data) 154 .terminal(se.getValue(), te.getValue()) 155 .expectedResult(expected) 156 .exercise(); 157 158 } 159 } 160 } 161 162 // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result 163 // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same 164 // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams 165 protected final 166 void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data, 167 Collection<Integer> expected, 168 String desc, 169 Function<Stream<Integer>, Stream<Integer>> mRef, 170 Function<IntStream, IntStream> mInt, 171 Function<LongStream, LongStream> mLong, 172 Function<DoubleStream, DoubleStream> mDouble, 173 Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) { 174 175 Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>(); 176 m.put("Ref " + desc, mRef); 177 m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e)); 178 m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e)); 179 m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e)); 180 181 exerciseTerminalOpsMulti(data, expected, m, terminals); 182 } 183 184 185 protected <T, U, S_OUT extends BaseStream<U, S_OUT>> 186 Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) { 187 TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); 188 return withData(data1).stream(m).exercise(); 189 } 190 191 protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>> 192 Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) { 193 TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); 194 return withData(data1).stream(m).expectedResult(expected).exercise(); 195 } 196 197 @SuppressWarnings("unchecked") 198 protected <U, S_OUT extends BaseStream<U, S_OUT>> 199 Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) { 200 return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise(); 201 } 202 203 protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) { 204 TestData.OfInt data1 = TestData.Factory.ofArray("int array", data); 205 return withData(data1).stream(m).expectedResult(expected).exercise(); 206 } 207 208 protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) { 209 Objects.requireNonNull(data); 210 return new DataStreamBuilder<>(data); 211 } 212 213 @SuppressWarnings({"rawtypes", "unchecked"}) 214 public class DataStreamBuilder<T, S_IN extends BaseStream<T, S_IN>> { 215 final TestData<T, S_IN> data; 216 217 private DataStreamBuilder(TestData<T, S_IN> data) { 218 this.data = Objects.requireNonNull(data); 219 } 220 221 public <U, S_OUT extends BaseStream<U, S_OUT>> 222 ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) { 223 return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops)); 224 } 225 226 public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> 227 stream(Function<S_IN, S_OUT> m) { 228 return new ExerciseDataStreamBuilder<>(data, m); 229 } 230 231 public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> 232 stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) { 233 return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp)); 234 } 235 236 public <R> ExerciseDataTerminalBuilder<T, T, R, S_IN, S_IN> 237 terminal(Function<S_IN, R> terminalF) { 238 return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF); 239 } 240 241 public <U, R, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> 242 terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) { 243 return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF); 244 } 245 } 246 247 @SuppressWarnings({"rawtypes", "unchecked"}) 248 public class ExerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> { 249 final TestData<T, S_IN> data; 250 final Function<S_IN, S_OUT> m; 251 final StreamShape shape; 252 253 Set<BaseStreamTestScenario> testSet = new HashSet<>(); 254 255 Collection<U> refResult; 256 257 Consumer<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty; 258 259 Consumer<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty; 260 261 ResultAsserter<Iterable<U>> resultAsserter = (act, exp, ord, par) -> { 262 if (par & !ord) { 263 LambdaTestHelpers.assertContentsUnordered(act, exp); 264 } 265 else { 266 LambdaTestHelpers.assertContentsEqual(act, exp); 267 } 268 }; 269 270 private ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) { 271 this.data = data; 272 273 this.m = Objects.requireNonNull(m); 274 275 this.shape = ((AbstractPipeline<?, U, ?>) m.apply(data.stream())).getOutputShape(); 276 277 // Have to initiate from the output shape of the last stream 278 // This means the stream mapper is required first rather than last 279 testSet.addAll(testScenarios.get(shape)); 280 } 281 282 // 283 284 public <I extends Iterable<U>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) { 285 List<U> l = new ArrayList<>(); 286 expectedResult.forEach(l::add); 287 refResult = l; 288 return this; 289 } 290 291 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(int[] expectedResult) { 292 List l = new ArrayList(); 293 for (int anExpectedResult : expectedResult) { 294 l.add(anExpectedResult); 295 } 296 refResult = l; 297 return this; 298 } 299 300 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(long[] expectedResult) { 301 List l = new ArrayList(); 302 for (long anExpectedResult : expectedResult) { 303 l.add(anExpectedResult); 304 } 305 refResult = l; 306 return this; 307 } 308 309 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(double[] expectedResult) { 310 List l = new ArrayList(); 311 for (double anExpectedResult : expectedResult) { 312 l.add(anExpectedResult); 313 } 314 refResult = l; 315 return this; 316 } 317 318 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> before(Consumer<TestData<T, S_IN>> before) { 319 this.before = Objects.requireNonNull(before); 320 return this; 321 } 322 323 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> after(Consumer<TestData<T, S_IN>> after) { 324 this.after = Objects.requireNonNull(after); 325 return this; 326 } 327 328 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(BaseStreamTestScenario... tests) { 329 return without(Arrays.asList(tests)); 330 } 331 332 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(Collection<? extends BaseStreamTestScenario> tests) { 333 for (BaseStreamTestScenario ts : tests) { 334 if (ts.getShape() == shape) { 335 testSet.remove(ts); 336 } 337 } 338 339 if (testSet.isEmpty()) { 340 throw new IllegalStateException("Test scenario set is empty"); 341 } 342 343 return this; 344 } 345 346 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(BaseStreamTestScenario... tests) { 347 return with(Arrays.asList(tests)); 348 } 349 350 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(Collection<? extends BaseStreamTestScenario> tests) { 351 testSet = new HashSet<>(); 352 353 for (BaseStreamTestScenario ts : tests) { 354 if (ts.getShape() == shape) { 355 testSet.add(ts); 356 } 357 } 358 359 if (testSet.isEmpty()) { 360 throw new IllegalStateException("Test scenario set is empty"); 361 } 362 363 return this; 364 } 365 366 public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> resultAsserter(ResultAsserter<Iterable<U>> resultAsserter) { 367 this.resultAsserter = resultAsserter; 368 return this; 369 } 370 371 // Build method 372 373 public Collection<U> exercise() { 374 final boolean isStreamOrdered; 375 if (refResult == null) { 376 // Induce the reference result 377 before.accept(data); 378 S_OUT sOut = m.apply(data.stream()); 379 isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); 380 Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]); 381 refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator()); 382 after.accept(data); 383 } 384 else { 385 S_OUT sOut = m.apply(data.stream()); 386 isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); 387 } 388 389 List<Error> errors = new ArrayList<>(); 390 for (BaseStreamTestScenario test : testSet) { 391 try { 392 before.accept(data); 393 394 List<U> result = new ArrayList<>(); 395 test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m); 396 397 Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel()); 398 399 if (refResult.size() > 1000) { 400 LambdaTestHelpers.launderAssertion( 401 asserter, 402 () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size())); 403 } 404 else { 405 LambdaTestHelpers.launderAssertion( 406 asserter, 407 () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult)); 408 } 409 410 after.accept(data); 411 } catch (Throwable t) { 412 errors.add(new Error(String.format("%s: %s", test, t), t)); 413 } 414 } 415 416 if (!errors.isEmpty()) { 417 StringBuilder sb = new StringBuilder(); 418 int i = 1; 419 for (Error t : errors) { 420 sb.append(i++).append(": "); 421 if (t instanceof AssertionError) { 422 sb.append(t).append("\n"); 423 } 424 else { 425 StringWriter sw = new StringWriter(); 426 PrintWriter pw = new PrintWriter(sw); 427 428 t.getCause().printStackTrace(pw); 429 pw.flush(); 430 sb.append(t).append("\n").append(sw); 431 } 432 } 433 sb.append("--"); 434 435 fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb)); 436 } 437 438 return refResult; 439 } 440 } 441 442 // Exercise terminal operations 443 444 interface BaseTerminalTestScenario<U, R, S_OUT extends BaseStream<U, S_OUT>> { 445 boolean requiresSingleStageSource(); 446 447 boolean requiresParallelSource(); 448 449 default R run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape) { 450 return terminalF.apply(source); 451 } 452 } 453 454 @SuppressWarnings({"rawtypes", "unchecked"}) 455 enum TerminalTestScenario implements BaseTerminalTestScenario { 456 SINGLE_SEQUENTIAL(true, false), 457 458 SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) { 459 @Override 460 public Object run(Function terminalF, BaseStream source, StreamShape shape) { 461 source = (BaseStream) chain(source, new ShortCircuitOp(shape)); 462 return terminalF.apply(source); 463 } 464 }, 465 466 SINGLE_PARALLEL(true, true), 467 468 ALL_SEQUENTIAL(false, false), 469 470 ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) { 471 @Override 472 public Object run(Function terminalF, BaseStream source, StreamShape shape) { 473 source = (BaseStream) chain(source, new ShortCircuitOp(shape)); 474 return terminalF.apply(source); 475 } 476 }, 477 478 ALL_PARALLEL(false, true), 479 480 ALL_PARALLEL_SEQUENTIAL(false, false) { 481 @Override 482 public Object run(Function terminalF, BaseStream source, StreamShape shape) { 483 return terminalF.apply(source.sequential()); 484 } 485 }, 486 ; 487 488 private final boolean requiresSingleStageSource; 489 private final boolean isParallel; 490 491 TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) { 492 this.requiresSingleStageSource = requiresSingleStageSource; 493 this.isParallel = isParallel; 494 } 495 496 @Override 497 public boolean requiresSingleStageSource() { 498 return requiresSingleStageSource; 499 } 500 501 @Override 502 public boolean requiresParallelSource() { 503 return isParallel; 504 } 505 506 } 507 508 @SuppressWarnings({"rawtypes", "unchecked"}) 509 public class ExerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> { 510 final TestData<T, S_IN> data; 511 final Function<S_IN, S_OUT> streamF; 512 final Function<S_OUT, R> terminalF; 513 514 R refResult; 515 516 ResultAsserter<R> resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp); 517 518 private ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) { 519 this.data = data; 520 this.streamF = Objects.requireNonNull(streamF); 521 this.terminalF = Objects.requireNonNull(terminalF); 522 } 523 524 // 525 526 public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) { 527 this.refResult = expectedResult; 528 return this; 529 } 530 531 public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiConsumer<R, R> equalityAsserter) { 532 resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp); 533 return this; 534 } 535 536 public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> resultAsserter(ResultAsserter<R> resultAsserter) { 537 this.resultAsserter = resultAsserter; 538 return this; 539 } 540 541 // Build method 542 543 public R exercise() { 544 S_OUT out = streamF.apply(data.stream()).sequential(); 545 AbstractPipeline ap = (AbstractPipeline) out; 546 boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags()); 547 StreamShape shape = ap.getOutputShape(); 548 549 EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class); 550 // Sequentially collect the output that will be input to the terminal op 551 Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]); 552 if (refResult == null) { 553 // Induce the reference result 554 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(), 555 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, 556 false); 557 558 refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape); 559 tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL); 560 } 561 562 for (BaseTerminalTestScenario test : tests) { 563 S_OUT source; 564 if (test.requiresSingleStageSource()) { 565 source = (S_OUT) createPipeline(shape, node.spliterator(), 566 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, 567 test.requiresParallelSource()); 568 } 569 else { 570 source = streamF.apply(test.requiresParallelSource() 571 ? data.parallelStream() : data.stream()); 572 } 573 574 R result = (R) test.run(terminalF, source, shape); 575 576 LambdaTestHelpers.launderAssertion( 577 () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()), 578 () -> String.format("%s: %s != %s", test, refResult, result)); 579 } 580 581 return refResult; 582 } 583 584 AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) { 585 switch (shape) { 586 case REFERENCE: return new ReferencePipeline.Head<>(s, flags, parallel); 587 case INT_VALUE: return new IntPipeline.Head(s, flags, parallel); 588 case LONG_VALUE: return new LongPipeline.Head(s, flags, parallel); 589 case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel); 590 default: throw new IllegalStateException("Unknown shape: " + shape); 591 } 592 } 593 } 594 595 protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) { 596 TestData.OfRef<T> data1 597 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); 598 return withData(data1).terminal(m).expectedResult(expected).exercise(); 599 } 600 601 protected <T, R, S_IN extends BaseStream<T, S_IN>> R 602 exerciseTerminalOps(TestData<T, S_IN> data, 603 Function<S_IN, R> terminalF) { 604 return withData(data).terminal(terminalF).exercise(); 605 } 606 607 protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R 608 exerciseTerminalOps(TestData<T, S_IN> data, 609 Function<S_IN, S_OUT> streamF, 610 Function<S_OUT, R> terminalF) { 611 return withData(data).terminal(streamF, terminalF).exercise(); 612 } 613 614 // 615 616 @SuppressWarnings({"rawtypes", "unchecked"}) 617 private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) { 618 return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op); 619 } 620 621 @SuppressWarnings({"rawtypes", "unchecked"}) 622 private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) { 623 for (IntermediateTestOp op : ops) 624 pipe = chain(pipe, op); 625 return pipe; 626 } 627 628 @SuppressWarnings("rawtypes") 629 private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) { 630 return chain((AbstractPipeline) pipe, op); 631 } 632 633 @SuppressWarnings("rawtypes") 634 public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) { 635 return chain((AbstractPipeline) pipe, ops); 636 } 637 638 // Test data 639 640 static class ShortCircuitOp<T> implements StatelessTestOp<T,T> { 641 private final StreamShape shape; 642 643 ShortCircuitOp(StreamShape shape) { 644 this.shape = shape; 645 } 646 647 @Override 648 public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) { 649 return sink; 650 } 651 652 @Override 653 public int opGetFlags() { 654 return StreamOpFlag.IS_SHORT_CIRCUIT; 655 } 656 657 @Override 658 public StreamShape outputShape() { 659 return shape; 660 } 661 662 @Override 663 public StreamShape inputShape() { 664 return shape; 665 } 666 } 667} 668