/* * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.Spliterator; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import org.testng.annotations.Test; /** * Base class for streams test cases. Provides 'exercise' methods for taking * lambdas that construct and modify streams, and evaluates them in different * ways and asserts that they produce equivalent results. */ @Test public abstract class OpTestCase extends LoggingTestCase { private final Map> testScenarios; protected OpTestCase() { testScenarios = new EnumMap<>(StreamShape.class); testScenarios.put(StreamShape.REFERENCE, Collections.unmodifiableSet(EnumSet.allOf(StreamTestScenario.class))); testScenarios.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(EnumSet.allOf(IntStreamTestScenario.class))); testScenarios.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(EnumSet.allOf(LongStreamTestScenario.class))); testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class))); } @SuppressWarnings("rawtypes") public static int getStreamFlags(BaseStream s) { return ((AbstractPipeline) s).getStreamFlags(); } /** * An asserter for results produced when exercising of stream or terminal * tests. * * @param the type of result to assert on */ public interface ResultAsserter { /** * Assert a result produced when exercising of stream or terminal * test. * * @param actual the actual result * @param expected the expected result * @param isOrdered true if the pipeline is ordered * @param isParallel true if the pipeline is parallel */ void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel); } // Exercise stream operations public interface BaseStreamTestScenario { StreamShape getShape(); boolean isParallel(); boolean isOrdered(); , S_OUT extends BaseStream> void run(TestData data, Consumer b, Function m); } protected , S_OUT extends BaseStream> Collection exerciseOps(TestData data, Function m) { return withData(data).stream(m).exercise(); } // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants @SafeVarargs protected final, S_OUT extends BaseStream> Collection exerciseOpsMulti(TestData data, Function... ms) { Collection result = null; for (Function m : ms) { if (result == null) result = withData(data).stream(m).exercise(); else { Collection r2 = withData(data).stream(m).exercise(); assertEquals(result, r2); } } return result; } // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result // Automates the conversion between Stream and {Int,Long,Double}Stream and back, so client sites look like you are passing the same // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams protected final Collection exerciseOpsInt(TestData.OfRef data, Function, Stream> mRef, Function mInt, Function mLong, Function mDouble) { @SuppressWarnings({ "rawtypes", "unchecked" }) Function, Stream>[] ms = new Function[4]; ms[0] = mRef; ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e); ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e); ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e); return exerciseOpsMulti(data, ms); } // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants protected final, S_OUT extends BaseStream> void exerciseTerminalOpsMulti(TestData data, R expected, Map> streams, Map> terminals) { for (Map.Entry> se : streams.entrySet()) { setContext("Intermediate stream", se.getKey()); for (Map.Entry> te : terminals.entrySet()) { setContext("Terminal stream", te.getKey()); withData(data) .terminal(se.getValue(), te.getValue()) .expectedResult(expected) .exercise(); } } } // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result // Automates the conversion between Stream and {Int,Long,Double}Stream and back, so client sites look like you are passing the same // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams protected final void exerciseTerminalOpsInt(TestData> data, Collection expected, String desc, Function, Stream> mRef, Function mInt, Function mLong, Function mDouble, Map, Collection>> terminals) { Map, Stream>> m = new HashMap<>(); m.put("Ref " + desc, mRef); m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e)); m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e)); m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e)); exerciseTerminalOpsMulti(data, expected, m, terminals); } protected > Collection exerciseOps(Collection data, Function, S_OUT> m) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).exercise(); } protected , I extends Iterable> Collection exerciseOps(Collection data, Function, S_OUT> m, I expected) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).stream(m).expectedResult(expected).exercise(); } @SuppressWarnings("unchecked") protected > Collection exerciseOps(int[] data, Function m) { return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise(); } protected Collection exerciseOps(int[] data, Function m, int[] expected) { TestData.OfInt data1 = TestData.Factory.ofArray("int array", data); return withData(data1).stream(m).expectedResult(expected).exercise(); } protected > DataStreamBuilder withData(TestData data) { Objects.requireNonNull(data); return new DataStreamBuilder<>(data); } @SuppressWarnings({"rawtypes", "unchecked"}) public class DataStreamBuilder> { final TestData data; private DataStreamBuilder(TestData data) { this.data = Objects.requireNonNull(data); } public > ExerciseDataStreamBuilder ops(IntermediateTestOp... ops) { return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops)); } public > ExerciseDataStreamBuilder stream(Function m) { return new ExerciseDataStreamBuilder<>(data, m); } public > ExerciseDataStreamBuilder stream(Function m, IntermediateTestOp additionalOp) { return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp)); } public ExerciseDataTerminalBuilder terminal(Function terminalF) { return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF); } public > ExerciseDataTerminalBuilder terminal(Function streamF, Function terminalF) { return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF); } } @SuppressWarnings({"rawtypes", "unchecked"}) public class ExerciseDataStreamBuilder, S_OUT extends BaseStream> { final TestData data; final Function m; final StreamShape shape; Set testSet = new HashSet<>(); Collection refResult; Consumer> before = LambdaTestHelpers.bEmpty; Consumer> after = LambdaTestHelpers.bEmpty; ResultAsserter> resultAsserter = (act, exp, ord, par) -> { if (par & !ord) { LambdaTestHelpers.assertContentsUnordered(act, exp); } else { LambdaTestHelpers.assertContentsEqual(act, exp); } }; private ExerciseDataStreamBuilder(TestData data, Function m) { this.data = data; this.m = Objects.requireNonNull(m); this.shape = ((AbstractPipeline) m.apply(data.stream())).getOutputShape(); // Have to initiate from the output shape of the last stream // This means the stream mapper is required first rather than last testSet.addAll(testScenarios.get(shape)); } // public > ExerciseDataStreamBuilder expectedResult(I expectedResult) { List l = new ArrayList<>(); expectedResult.forEach(l::add); refResult = l; return this; } public ExerciseDataStreamBuilder expectedResult(int[] expectedResult) { List l = new ArrayList(); for (int anExpectedResult : expectedResult) { l.add(anExpectedResult); } refResult = l; return this; } public ExerciseDataStreamBuilder expectedResult(long[] expectedResult) { List l = new ArrayList(); for (long anExpectedResult : expectedResult) { l.add(anExpectedResult); } refResult = l; return this; } public ExerciseDataStreamBuilder expectedResult(double[] expectedResult) { List l = new ArrayList(); for (double anExpectedResult : expectedResult) { l.add(anExpectedResult); } refResult = l; return this; } public ExerciseDataStreamBuilder before(Consumer> before) { this.before = Objects.requireNonNull(before); return this; } public ExerciseDataStreamBuilder after(Consumer> after) { this.after = Objects.requireNonNull(after); return this; } public ExerciseDataStreamBuilder without(BaseStreamTestScenario... tests) { return without(Arrays.asList(tests)); } public ExerciseDataStreamBuilder without(Collection tests) { for (BaseStreamTestScenario ts : tests) { if (ts.getShape() == shape) { testSet.remove(ts); } } if (testSet.isEmpty()) { throw new IllegalStateException("Test scenario set is empty"); } return this; } public ExerciseDataStreamBuilder with(BaseStreamTestScenario... tests) { return with(Arrays.asList(tests)); } public ExerciseDataStreamBuilder with(Collection tests) { testSet = new HashSet<>(); for (BaseStreamTestScenario ts : tests) { if (ts.getShape() == shape) { testSet.add(ts); } } if (testSet.isEmpty()) { throw new IllegalStateException("Test scenario set is empty"); } return this; } public ExerciseDataStreamBuilder resultAsserter(ResultAsserter> resultAsserter) { this.resultAsserter = resultAsserter; return this; } // Build method public Collection exercise() { final boolean isStreamOrdered; if (refResult == null) { // Induce the reference result before.accept(data); S_OUT sOut = m.apply(data.stream()); isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); Node refNodeResult = ((AbstractPipeline) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]); refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator()); after.accept(data); } else { S_OUT sOut = m.apply(data.stream()); isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags()); } List errors = new ArrayList<>(); for (BaseStreamTestScenario test : testSet) { try { before.accept(data); List result = new ArrayList<>(); test.run(data, LambdaTestHelpers.toBoxingConsumer(result::add), m); Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel()); if (refResult.size() > 1000) { LambdaTestHelpers.launderAssertion( asserter, () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size())); } else { LambdaTestHelpers.launderAssertion( asserter, () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult)); } after.accept(data); } catch (Throwable t) { errors.add(new Error(String.format("%s: %s", test, t), t)); } } if (!errors.isEmpty()) { StringBuilder sb = new StringBuilder(); int i = 1; for (Error t : errors) { sb.append(i++).append(": "); if (t instanceof AssertionError) { sb.append(t).append("\n"); } else { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); t.getCause().printStackTrace(pw); pw.flush(); sb.append(t).append("\n").append(sw); } } sb.append("--"); fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb)); } return refResult; } } // Exercise terminal operations interface BaseTerminalTestScenario> { boolean requiresSingleStageSource(); boolean requiresParallelSource(); default R run(Function terminalF, S_OUT source, StreamShape shape) { return terminalF.apply(source); } } @SuppressWarnings({"rawtypes", "unchecked"}) enum TerminalTestScenario implements BaseTerminalTestScenario { SINGLE_SEQUENTIAL(true, false), SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) { @Override public Object run(Function terminalF, BaseStream source, StreamShape shape) { source = (BaseStream) chain(source, new ShortCircuitOp(shape)); return terminalF.apply(source); } }, SINGLE_PARALLEL(true, true), ALL_SEQUENTIAL(false, false), ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) { @Override public Object run(Function terminalF, BaseStream source, StreamShape shape) { source = (BaseStream) chain(source, new ShortCircuitOp(shape)); return terminalF.apply(source); } }, ALL_PARALLEL(false, true), ALL_PARALLEL_SEQUENTIAL(false, false) { @Override public Object run(Function terminalF, BaseStream source, StreamShape shape) { return terminalF.apply(source.sequential()); } }, ; private final boolean requiresSingleStageSource; private final boolean isParallel; TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) { this.requiresSingleStageSource = requiresSingleStageSource; this.isParallel = isParallel; } @Override public boolean requiresSingleStageSource() { return requiresSingleStageSource; } @Override public boolean requiresParallelSource() { return isParallel; } } @SuppressWarnings({"rawtypes", "unchecked"}) public class ExerciseDataTerminalBuilder, S_OUT extends BaseStream> { final TestData data; final Function streamF; final Function terminalF; R refResult; ResultAsserter resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp); private ExerciseDataTerminalBuilder(TestData data, Function streamF, Function terminalF) { this.data = data; this.streamF = Objects.requireNonNull(streamF); this.terminalF = Objects.requireNonNull(terminalF); } // public ExerciseDataTerminalBuilder expectedResult(R expectedResult) { this.refResult = expectedResult; return this; } public ExerciseDataTerminalBuilder equalator(BiConsumer equalityAsserter) { resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp); return this; } public ExerciseDataTerminalBuilder resultAsserter(ResultAsserter resultAsserter) { this.resultAsserter = resultAsserter; return this; } // Build method public R exercise() { S_OUT out = streamF.apply(data.stream()).sequential(); AbstractPipeline ap = (AbstractPipeline) out; boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags()); StreamShape shape = ap.getOutputShape(); EnumSet tests = EnumSet.allOf(TerminalTestScenario.class); // Sequentially collect the output that will be input to the terminal op Node node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]); if (refResult == null) { // Induce the reference result S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(), StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, false); refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape); tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL); } for (BaseTerminalTestScenario test : tests) { S_OUT source; if (test.requiresSingleStageSource()) { source = (S_OUT) createPipeline(shape, node.spliterator(), StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED, test.requiresParallelSource()); } else { source = streamF.apply(test.requiresParallelSource() ? data.parallelStream() : data.stream()); } R result = (R) test.run(terminalF, source, shape); LambdaTestHelpers.launderAssertion( () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()), () -> String.format("%s: %s != %s", test, refResult, result)); } return refResult; } AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) { switch (shape) { case REFERENCE: return new ReferencePipeline.Head<>(s, flags, parallel); case INT_VALUE: return new IntPipeline.Head(s, flags, parallel); case LONG_VALUE: return new LongPipeline.Head(s, flags, parallel); case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel); default: throw new IllegalStateException("Unknown shape: " + shape); } } } protected R exerciseTerminalOps(Collection data, Function, R> m, R expected) { TestData.OfRef data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data); return withData(data1).terminal(m).expectedResult(expected).exercise(); } protected > R exerciseTerminalOps(TestData data, Function terminalF) { return withData(data).terminal(terminalF).exercise(); } protected , S_OUT extends BaseStream> R exerciseTerminalOps(TestData data, Function streamF, Function terminalF) { return withData(data).terminal(streamF, terminalF).exercise(); } // @SuppressWarnings({"rawtypes", "unchecked"}) private static AbstractPipeline chain(AbstractPipeline upstream, IntermediateTestOp op) { return (AbstractPipeline) IntermediateTestOp.chain(upstream, op); } @SuppressWarnings({"rawtypes", "unchecked"}) private static AbstractPipeline chain(AbstractPipeline pipe, IntermediateTestOp... ops) { for (IntermediateTestOp op : ops) pipe = chain(pipe, op); return pipe; } @SuppressWarnings("rawtypes") private static AbstractPipeline chain(BaseStream pipe, IntermediateTestOp op) { return chain((AbstractPipeline) pipe, op); } @SuppressWarnings("rawtypes") public static AbstractPipeline chain(BaseStream pipe, IntermediateTestOp... ops) { return chain((AbstractPipeline) pipe, ops); } // Test data static class ShortCircuitOp implements StatelessTestOp { private final StreamShape shape; ShortCircuitOp(StreamShape shape) { this.shape = shape; } @Override public Sink opWrapSink(int flags, boolean parallel, Sink sink) { return sink; } @Override public int opGetFlags() { return StreamOpFlag.IS_SHORT_CIRCUIT; } @Override public StreamShape outputShape() { return shape; } @Override public StreamShape inputShape() { return shape; } } }