1/*
2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23package java.util.stream;
24
25import java.io.PrintWriter;
26import java.io.StringWriter;
27import java.util.ArrayList;
28import java.util.Arrays;
29import java.util.Collection;
30import java.util.Collections;
31import java.util.EnumMap;
32import java.util.EnumSet;
33import java.util.HashMap;
34import java.util.HashSet;
35import java.util.List;
36import java.util.Map;
37import java.util.Objects;
38import java.util.Set;
39import java.util.Spliterator;
40import java.util.function.BiConsumer;
41import java.util.function.Consumer;
42import java.util.function.Function;
43
44import org.testng.annotations.Test;
45
46/**
47 * Base class for streams test cases.  Provides 'exercise' methods for taking
48 * lambdas that construct and modify streams, and evaluates them in different
49 * ways and asserts that they produce equivalent results.
50 */
51@Test
52public abstract class OpTestCase extends LoggingTestCase {
53
54    private final Map<StreamShape, Set<? extends BaseStreamTestScenario>> testScenarios;
55
56    protected OpTestCase() {
57        testScenarios = new EnumMap<>(StreamShape.class);
58        testScenarios.put(StreamShape.REFERENCE, Collections.unmodifiableSet(EnumSet.allOf(StreamTestScenario.class)));
59        testScenarios.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(EnumSet.allOf(IntStreamTestScenario.class)));
60        testScenarios.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(EnumSet.allOf(LongStreamTestScenario.class)));
61        testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class)));
62    }
63
64    @SuppressWarnings("rawtypes")
65    public static int getStreamFlags(BaseStream s) {
66        return ((AbstractPipeline) s).getStreamFlags();
67    }
68
69    /**
70     * An asserter for results produced when exercising of stream or terminal
71     * tests.
72     *
73     * @param <R> the type of result to assert on
74     */
75    public interface ResultAsserter<R> {
76        /**
77         * Assert a result produced when exercising of stream or terminal
78         * test.
79         *
80         * @param actual the actual result
81         * @param expected the expected result
82         * @param isOrdered true if the pipeline is ordered
83         * @param isParallel true if the pipeline is parallel
84         */
85        void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel);
86    }
87
88    // Exercise stream operations
89
    /**
     * One way of evaluating a stream pipeline under test.  The stream builder
     * in this class runs every selected scenario against the same data and
     * stream function, and compares the collected output to a reference result.
     */
    public interface BaseStreamTestScenario {
        /**
         * @return the stream shape this scenario applies to; the stream builder
         *         only adds or removes scenarios whose shape equals the output
         *         shape of the pipeline under test
         */
        StreamShape getShape();

        /** @return the value passed to the result asserter as its "is parallel" argument */
        boolean isParallel();

        /**
         * @return whether this scenario is ordered; the stream builder combines
         *         this with the pipeline's own ORDERED flag before asserting
         */
        boolean isOrdered();

        /**
         * Runs the scenario.
         *
         * @param data the test data
         * @param b receives output elements; the stream builder passes a
         *          consumer that appends to the list it later asserts on
         * @param m the function building the pipeline under test from a source stream
         */
        <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
        void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
    }
100
101    protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
102    Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
103        return withData(data).stream(m).exercise();
104    }
105
106    // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
107    // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
108    @SafeVarargs
109    protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
110    Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
111                                   Function<S_IN, S_OUT>... ms) {
112        Collection<U> result = null;
113        for (Function<S_IN, S_OUT> m : ms) {
114            if (result == null)
115                result = withData(data).stream(m).exercise();
116            else {
117                Collection<U> r2 = withData(data).stream(m).exercise();
118                assertEquals(result, r2);
119            }
120        }
121        return result;
122    }
123
124    // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
125    // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
126    // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
127    protected final
128    Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
129                                       Function<Stream<Integer>, Stream<Integer>> mRef,
130                                       Function<IntStream, IntStream> mInt,
131                                       Function<LongStream, LongStream> mLong,
132                                       Function<DoubleStream, DoubleStream> mDouble) {
133        @SuppressWarnings({ "rawtypes", "unchecked" })
134        Function<Stream<Integer>, Stream<Integer>>[] ms = new Function[4];
135        ms[0] = mRef;
136        ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e);
137        ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e);
138        ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e);
139        return exerciseOpsMulti(data, ms);
140    }
141
142    // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
143    // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
144    protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
145    void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
146                                  R expected,
147                                  Map<String, Function<S_IN, S_OUT>> streams,
148                                  Map<String, Function<S_OUT, R>> terminals) {
149        for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
150            setContext("Intermediate stream", se.getKey());
151            for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
152                setContext("Terminal stream", te.getKey());
153                withData(data)
154                        .terminal(se.getValue(), te.getValue())
155                        .expectedResult(expected)
156                        .exercise();
157
158            }
159        }
160    }
161
162    // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
163    // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
164    // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
165    protected final
166    void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
167                                Collection<Integer> expected,
168                                String desc,
169                                Function<Stream<Integer>, Stream<Integer>> mRef,
170                                Function<IntStream, IntStream> mInt,
171                                Function<LongStream, LongStream> mLong,
172                                Function<DoubleStream, DoubleStream> mDouble,
173                                Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
174
175        Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
176        m.put("Ref " + desc, mRef);
177        m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
178        m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
179        m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
180
181        exerciseTerminalOpsMulti(data, expected, m, terminals);
182    }
183
184
185    protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
186    Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
187        TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
188        return withData(data1).stream(m).exercise();
189    }
190
191    protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
192    Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
193        TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
194        return withData(data1).stream(m).expectedResult(expected).exercise();
195    }
196
197    @SuppressWarnings("unchecked")
198    protected <U, S_OUT extends BaseStream<U, S_OUT>>
199    Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
200        return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
201    }
202
203    protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
204        TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
205        return withData(data1).stream(m).expectedResult(expected).exercise();
206    }
207
208    protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
209        Objects.requireNonNull(data);
210        return new DataStreamBuilder<>(data);
211    }
212
213    @SuppressWarnings({"rawtypes", "unchecked"})
214    public class DataStreamBuilder<T, S_IN extends BaseStream<T, S_IN>> {
215        final TestData<T, S_IN> data;
216
217        private DataStreamBuilder(TestData<T, S_IN> data) {
218            this.data = Objects.requireNonNull(data);
219        }
220
221        public <U, S_OUT extends BaseStream<U, S_OUT>>
222        ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) {
223            return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops));
224        }
225
226        public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
227        stream(Function<S_IN, S_OUT> m) {
228            return new ExerciseDataStreamBuilder<>(data, m);
229        }
230
231        public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
232        stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) {
233            return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp));
234        }
235
236        public <R> ExerciseDataTerminalBuilder<T, T, R, S_IN, S_IN>
237        terminal(Function<S_IN, R> terminalF) {
238            return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF);
239        }
240
241        public <U, R, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT>
242        terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
243            return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF);
244        }
245    }
246
247    @SuppressWarnings({"rawtypes", "unchecked"})
248    public class ExerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
249        final TestData<T, S_IN> data;
250        final Function<S_IN, S_OUT> m;
251        final StreamShape shape;
252
253        Set<BaseStreamTestScenario> testSet = new HashSet<>();
254
255        Collection<U> refResult;
256
257        Consumer<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty;
258
259        Consumer<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty;
260
261        ResultAsserter<Iterable<U>> resultAsserter = (act, exp, ord, par) -> {
262            if (par & !ord) {
263                LambdaTestHelpers.assertContentsUnordered(act, exp);
264            }
265            else {
266                LambdaTestHelpers.assertContentsEqual(act, exp);
267            }
268        };
269
270        private ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
271            this.data = data;
272
273            this.m = Objects.requireNonNull(m);
274
275            this.shape = ((AbstractPipeline<?, U, ?>) m.apply(data.stream())).getOutputShape();
276
277            // Have to initiate from the output shape of the last stream
278            // This means the stream mapper is required first rather than last
279            testSet.addAll(testScenarios.get(shape));
280        }
281
282        //
283
284        public <I extends Iterable<U>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) {
285            List<U> l = new ArrayList<>();
286            expectedResult.forEach(l::add);
287            refResult = l;
288            return this;
289        }
290
291        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(int[] expectedResult) {
292            List l = new ArrayList();
293            for (int anExpectedResult : expectedResult) {
294                l.add(anExpectedResult);
295            }
296            refResult = l;
297            return this;
298        }
299
300        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(long[] expectedResult) {
301            List l = new ArrayList();
302            for (long anExpectedResult : expectedResult) {
303                l.add(anExpectedResult);
304            }
305            refResult = l;
306            return this;
307        }
308
309        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(double[] expectedResult) {
310            List l = new ArrayList();
311            for (double anExpectedResult : expectedResult) {
312                l.add(anExpectedResult);
313            }
314            refResult = l;
315            return this;
316        }
317
318        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> before(Consumer<TestData<T, S_IN>> before) {
319            this.before = Objects.requireNonNull(before);
320            return this;
321        }
322
323        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> after(Consumer<TestData<T, S_IN>> after) {
324            this.after = Objects.requireNonNull(after);
325            return this;
326        }
327
328        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(BaseStreamTestScenario... tests) {
329            return without(Arrays.asList(tests));
330        }
331
332        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(Collection<? extends BaseStreamTestScenario> tests) {
333            for (BaseStreamTestScenario ts : tests) {
334                if (ts.getShape() == shape) {
335                    testSet.remove(ts);
336                }
337            }
338
339            if (testSet.isEmpty()) {
340                throw new IllegalStateException("Test scenario set is empty");
341            }
342
343            return this;
344        }
345
346        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(BaseStreamTestScenario... tests) {
347            return with(Arrays.asList(tests));
348        }
349
350        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(Collection<? extends BaseStreamTestScenario> tests) {
351            testSet = new HashSet<>();
352
353            for (BaseStreamTestScenario ts : tests) {
354                if (ts.getShape() == shape) {
355                    testSet.add(ts);
356                }
357            }
358
359            if (testSet.isEmpty()) {
360                throw new IllegalStateException("Test scenario set is empty");
361            }
362
363            return this;
364        }
365
366        public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> resultAsserter(ResultAsserter<Iterable<U>> resultAsserter) {
367            this.resultAsserter = resultAsserter;
368            return this;
369        }
370
371        // Build method
372
373        public Collection<U> exercise() {
374            final boolean isStreamOrdered;
375            if (refResult == null) {
376                // Induce the reference result
377                before.accept(data);
378                S_OUT sOut = m.apply(data.stream());
379                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
380                Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
381                refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
382                after.accept(data);
383            }
384            else {
385                S_OUT sOut = m.apply(data.stream());
386                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
387            }
388
389            List<Error> errors = new ArrayList<>();
390            for (BaseStreamTestScenario test : testSet) {
391                try {
392                    before.accept(data);
393
394                    List<U> result = new ArrayList<>();
395                    test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
396
397                    Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
398
399                    if (refResult.size() > 1000) {
400                        LambdaTestHelpers.launderAssertion(
401                                asserter,
402                                () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size()));
403                    }
404                    else {
405                        LambdaTestHelpers.launderAssertion(
406                                asserter,
407                                () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult));
408                    }
409
410                    after.accept(data);
411                } catch (Throwable t) {
412                    errors.add(new Error(String.format("%s: %s", test, t), t));
413                }
414            }
415
416            if (!errors.isEmpty()) {
417                StringBuilder sb = new StringBuilder();
418                int i = 1;
419                for (Error t : errors) {
420                    sb.append(i++).append(": ");
421                    if (t instanceof AssertionError) {
422                        sb.append(t).append("\n");
423                    }
424                    else {
425                        StringWriter sw = new StringWriter();
426                        PrintWriter pw = new PrintWriter(sw);
427
428                        t.getCause().printStackTrace(pw);
429                        pw.flush();
430                        sb.append(t).append("\n").append(sw);
431                    }
432                }
433                sb.append("--");
434
435                fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb));
436            }
437
438            return refResult;
439        }
440    }
441
442    // Exercise terminal operations
443
444    interface BaseTerminalTestScenario<U, R, S_OUT extends BaseStream<U, S_OUT>> {
445        boolean requiresSingleStageSource();
446
447        boolean requiresParallelSource();
448
449        default R run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape) {
450            return terminalF.apply(source);
451        }
452    }
453
454    @SuppressWarnings({"rawtypes", "unchecked"})
455    enum TerminalTestScenario implements BaseTerminalTestScenario {
456        SINGLE_SEQUENTIAL(true, false),
457
458        SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
459            @Override
460            public Object run(Function terminalF, BaseStream source, StreamShape shape) {
461                source = (BaseStream) chain(source, new ShortCircuitOp(shape));
462                return terminalF.apply(source);
463            }
464        },
465
466        SINGLE_PARALLEL(true, true),
467
468        ALL_SEQUENTIAL(false, false),
469
470        ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) {
471            @Override
472            public Object run(Function terminalF, BaseStream source, StreamShape shape) {
473                source = (BaseStream) chain(source, new ShortCircuitOp(shape));
474                return terminalF.apply(source);
475            }
476        },
477
478        ALL_PARALLEL(false, true),
479
480        ALL_PARALLEL_SEQUENTIAL(false, false) {
481            @Override
482            public Object run(Function terminalF, BaseStream source, StreamShape shape) {
483                return terminalF.apply(source.sequential());
484            }
485        },
486        ;
487
488        private final boolean requiresSingleStageSource;
489        private final boolean isParallel;
490
491        TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) {
492            this.requiresSingleStageSource = requiresSingleStageSource;
493            this.isParallel = isParallel;
494        }
495
496        @Override
497        public boolean requiresSingleStageSource() {
498            return requiresSingleStageSource;
499        }
500
501        @Override
502        public boolean requiresParallelSource() {
503            return isParallel;
504        }
505
506    }
507
508    @SuppressWarnings({"rawtypes", "unchecked"})
509    public class ExerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
510        final TestData<T, S_IN> data;
511        final Function<S_IN, S_OUT> streamF;
512        final Function<S_OUT, R> terminalF;
513
514        R refResult;
515
516        ResultAsserter<R> resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp);
517
518        private ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
519            this.data = data;
520            this.streamF = Objects.requireNonNull(streamF);
521            this.terminalF = Objects.requireNonNull(terminalF);
522        }
523
524        //
525
526        public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) {
527            this.refResult = expectedResult;
528            return this;
529        }
530
531        public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiConsumer<R, R> equalityAsserter) {
532            resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp);
533            return this;
534        }
535
536        public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> resultAsserter(ResultAsserter<R> resultAsserter) {
537            this.resultAsserter = resultAsserter;
538            return this;
539        }
540
541        // Build method
542
543        public R exercise() {
544            S_OUT out = streamF.apply(data.stream()).sequential();
545            AbstractPipeline ap = (AbstractPipeline) out;
546            boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
547            StreamShape shape = ap.getOutputShape();
548
549            EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
550            // Sequentially collect the output that will be input to the terminal op
551            Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
552            if (refResult == null) {
553                // Induce the reference result
554                S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
555                                                      StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
556                                                      false);
557
558                refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape);
559                tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL);
560            }
561
562            for (BaseTerminalTestScenario test : tests) {
563                S_OUT source;
564                if (test.requiresSingleStageSource()) {
565                    source = (S_OUT) createPipeline(shape, node.spliterator(),
566                                                    StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
567                                                    test.requiresParallelSource());
568                }
569                else {
570                    source = streamF.apply(test.requiresParallelSource()
571                                           ? data.parallelStream() : data.stream());
572                }
573
574                R result = (R) test.run(terminalF, source, shape);
575
576                LambdaTestHelpers.launderAssertion(
577                        () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
578                        () -> String.format("%s: %s != %s", test, refResult, result));
579            }
580
581            return refResult;
582        }
583
584        AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) {
585            switch (shape) {
586                case REFERENCE:    return new ReferencePipeline.Head<>(s, flags, parallel);
587                case INT_VALUE:    return new IntPipeline.Head(s, flags, parallel);
588                case LONG_VALUE:   return new LongPipeline.Head(s, flags, parallel);
589                case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel);
590                default: throw new IllegalStateException("Unknown shape: " + shape);
591            }
592        }
593    }
594
595    protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
596        TestData.OfRef<T> data1
597                = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
598        return withData(data1).terminal(m).expectedResult(expected).exercise();
599    }
600
601    protected <T, R, S_IN extends BaseStream<T, S_IN>> R
602    exerciseTerminalOps(TestData<T, S_IN> data,
603                        Function<S_IN, R> terminalF) {
604        return withData(data).terminal(terminalF).exercise();
605    }
606
607    protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
608    exerciseTerminalOps(TestData<T, S_IN> data,
609                        Function<S_IN, S_OUT> streamF,
610                        Function<S_OUT, R> terminalF) {
611        return withData(data).terminal(streamF, terminalF).exercise();
612    }
613
614    //
615
616    @SuppressWarnings({"rawtypes", "unchecked"})
617    private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) {
618        return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op);
619    }
620
621    @SuppressWarnings({"rawtypes", "unchecked"})
622    private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) {
623        for (IntermediateTestOp op : ops)
624            pipe = chain(pipe, op);
625        return pipe;
626    }
627
628    @SuppressWarnings("rawtypes")
629    private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) {
630        return chain((AbstractPipeline) pipe, op);
631    }
632
633    @SuppressWarnings("rawtypes")
634    public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) {
635        return chain((AbstractPipeline) pipe, ops);
636    }
637
638    // Test data
639
640    static class ShortCircuitOp<T> implements StatelessTestOp<T,T> {
641        private final StreamShape shape;
642
643        ShortCircuitOp(StreamShape shape) {
644            this.shape = shape;
645        }
646
647        @Override
648        public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
649            return sink;
650        }
651
652        @Override
653        public int opGetFlags() {
654            return StreamOpFlag.IS_SHORT_CIRCUIT;
655        }
656
657        @Override
658        public StreamShape outputShape() {
659            return shape;
660        }
661
662        @Override
663        public StreamShape inputShape() {
664            return shape;
665        }
666    }
667}
668