1ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin/*
2ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
5ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * This code is free software; you can redistribute it and/or modify it
6ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * under the terms of the GNU General Public License version 2 only, as
7ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * published by the Free Software Foundation.  Oracle designates this
8ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * particular file as subject to the "Classpath" exception as provided
9ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * by Oracle in the LICENSE file that accompanied this code.
10ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
11ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * This code is distributed in the hope that it will be useful, but WITHOUT
12ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * version 2 for more details (a copy is included in the LICENSE file that
15ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * accompanied this code).
16ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
17ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * You should have received a copy of the GNU General Public License version
18ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * 2 along with this work; if not, write to the Free Software Foundation,
19ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
21ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * or visit www.oracle.com if you need additional information or have any
23ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * questions.
24ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin */
25ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinpackage java.util.stream;
26ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
27ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinimport java.util.Objects;
28ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinimport java.util.Spliterator;
29ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinimport java.util.function.IntFunction;
30ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinimport java.util.function.Supplier;
31ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
32ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin/**
33ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Abstract base class for "pipeline" classes, which are the core
34ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * implementations of the Stream interface and its primitive specializations.
35ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Manages construction and evaluation of stream pipelines.
36ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
37ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * <p>An {@code AbstractPipeline} represents an initial portion of a stream
38ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * pipeline, encapsulating a stream source and zero or more intermediate
39ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * operations.  The individual {@code AbstractPipeline} objects are often
40ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * referred to as <em>stages</em>, where each stage describes either the stream
41ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * source or an intermediate operation.
42ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
43ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * <p>A concrete intermediate stage is generally built from an
44ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
45ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
46ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * concrete class which extends that.  {@code AbstractPipeline} contains most of
47ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * the mechanics of evaluating the pipeline, and implements methods that will be
48ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * used by the operation; the shape-specific classes add helper methods for
49ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * dealing with collection of results into the appropriate shape-specific
50ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * containers.
51ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
52ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * <p>After chaining a new intermediate operation, or executing a terminal
53ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * operation, the stream is considered to be consumed, and no more intermediate
54ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * or terminal operations are permitted on this stream instance.
55ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
56ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @implNote
57ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * <p>For sequential streams, and parallel streams without
58ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * <a href="package-summary.html#StreamOps">stateful intermediate
59ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * operations</a>, parallel streams, pipeline evaluation is done in a single
60ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * pass that "jams" all the operations together.  For parallel streams with
61ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * stateful operations, execution is divided into segments, where each
62ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * stateful operations marks the end of a segment, and each segment is
63ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * evaluated separately and the result used as the input to the next
64ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * segment.  In all cases, the source data is not consumed until a terminal
65ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * operation begins.
66ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
67ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @param   type of input elements
68ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @param  type of output elements
69ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @param <S> type of the subclass implementing {@code BaseStream}
70ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @since 1.8
71289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin * @hide Visibility for CTS only (OpenJDK 8 streams tests).
72ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin */
73289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkinpublic abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
74ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
75ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
76ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private static final String MSG_CONSUMED = "source already consumed or closed";
77ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
78ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
79ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Backlink to the head of the pipeline chain (self if this is the source
80ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * stage).
81ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
82ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("rawtypes")
83ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private final AbstractPipeline sourceStage;
84ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
85ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
86ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The "upstream" pipeline, or null if this is the source stage.
87ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
88ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("rawtypes")
89ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private final AbstractPipeline previousStage;
90ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
91ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
92ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The operation flags for the intermediate operation represented by this
93ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * pipeline object.
94ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
95ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    protected final int sourceOrOpFlags;
96ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
97ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
98ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The next stage in the pipeline, or null if this is the last stage.
99ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Effectively final at the point of linking to the next pipeline.
100ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
101ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("rawtypes")
102ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private AbstractPipeline nextStage;
103ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
104ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
105ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The number of intermediate operations between this pipeline object
106ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * and the stream source if sequential, or the previous stateful if parallel.
107ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Valid at the point of pipeline preparation for evaluation.
108ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
109ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private int depth;
110ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
111ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
112ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The combined source and operation flags for the source and all operations
113ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * up to and including the operation represented by this pipeline object.
114ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Valid at the point of pipeline preparation for evaluation.
115ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
116ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private int combinedFlags;
117ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
118ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
119ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The source spliterator. Only valid for the head pipeline.
120ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Before the pipeline is consumed if non-null then {@code sourceSupplier}
121ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * must be null. After the pipeline is consumed if non-null then is set to
122ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * null.
123ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
124ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private Spliterator<?> sourceSpliterator;
125ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
126ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
127ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The source supplier. Only valid for the head pipeline. Before the
128ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * pipeline is consumed if non-null then {@code sourceSpliterator} must be
129ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * null. After the pipeline is consumed if non-null then is set to null.
130ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
131ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private Supplier<? extends Spliterator<?>> sourceSupplier;
132ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
133ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
134ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * True if this pipeline has been linked or consumed
135ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
136ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private boolean linkedOrConsumed;
137ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
138ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
139ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * True if there are any stateful ops in the pipeline; only valid for the
140ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * source stage.
141ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
142ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private boolean sourceAnyStateful;
143ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
144ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private Runnable sourceCloseAction;
145ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
146ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
147ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * True if pipeline is parallel, otherwise the pipeline is sequential; only
148ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * valid for the source stage.
149ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
150ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private boolean parallel;
151ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
152ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
153ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Constructor for the head of a stream pipeline.
154ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
155ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param source {@code Supplier<Spliterator>} describing the stream source
156ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param sourceFlags The source flags for the stream source, described in
157ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@link StreamOpFlag}
158ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param parallel True if the pipeline is parallel
159ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
160ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
161ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                     int sourceFlags, boolean parallel) {
162ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.previousStage = null;
163ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceSupplier = source;
164ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceStage = this;
165ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
166ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // The following is an optimization of:
167ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
168ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
169ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.depth = 0;
170ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.parallel = parallel;
171ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
172ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
173ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
174ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Constructor for the head of a stream pipeline.
175ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
176ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param source {@code Spliterator} describing the stream source
177ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param sourceFlags the source flags for the stream source, described in
178ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@link StreamOpFlag}
179ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param parallel {@code true} if the pipeline is parallel
180ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
181ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    AbstractPipeline(Spliterator<?> source,
182ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                     int sourceFlags, boolean parallel) {
183ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.previousStage = null;
184ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceSpliterator = source;
185ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceStage = this;
186ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
187ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // The following is an optimization of:
188ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
189ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
190ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.depth = 0;
191ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.parallel = parallel;
192ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
193ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
194ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
195ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Constructor for appending an intermediate operation stage onto an
196ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * existing pipeline.
197ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
198ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param previousStage the upstream pipeline stage
199ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param opFlags the operation flags for the new stage, described in
200ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@link StreamOpFlag}
201ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
202ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
203ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (previousStage.linkedOrConsumed)
204ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_STREAM_LINKED);
205ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        previousStage.linkedOrConsumed = true;
206ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        previousStage.nextStage = this;
207ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
208ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.previousStage = previousStage;
209ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
210ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
211ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.sourceStage = previousStage.sourceStage;
212ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (opIsStateful())
213ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceAnyStateful = true;
214ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        this.depth = previousStage.depth + 1;
215ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
216ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
217ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
218ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // Terminal evaluation methods
219ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
220ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
221ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Evaluate the pipeline with a terminal operation to produce a result.
222ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
223ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param <R> the type of result
224ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param terminalOp the terminal operation to be applied to the pipeline.
225ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return the result
226ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
227ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
228ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        assert getOutputShape() == terminalOp.inputShape();
229ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (linkedOrConsumed)
230ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_STREAM_LINKED);
231ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        linkedOrConsumed = true;
232ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
233ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return isParallel()
234ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
235ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
236ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
237ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
238ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
239ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Collect the elements output from the pipeline stage.
240ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
241ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param generator the array generator to be used to create array instances
242ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a flat array-backed Node that holds the collected output elements
243ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
244ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
245289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
246ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (linkedOrConsumed)
247ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_STREAM_LINKED);
248ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        linkedOrConsumed = true;
249ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
250ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // If the last intermediate operation is stateful then
251ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // evaluate directly to avoid an extra collection step
252ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (isParallel() && previousStage != null && opIsStateful()) {
253ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // Set the depth of this, last, pipeline stage to zero to slice the
254ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // pipeline such that this operation will not be included in the
255ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // upstream slice and upstream operations will not be included
256ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // in this slice
257ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            depth = 0;
258ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
259ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
260ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
261ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return evaluate(sourceSpliterator(0), true, generator);
262ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
263ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
264ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
265ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
266ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Gets the source stage spliterator if this pipeline stage is the source
267ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * stage.  The pipeline is consumed after this method is called and
268ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * returns successfully.
269ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
270ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return the source stage spliterator
271ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @throws IllegalStateException if this pipeline stage is not the source
272ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *         stage.
273ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
274ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
275ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final Spliterator<E_OUT> sourceStageSpliterator() {
276ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (this != sourceStage)
277ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException();
278ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
279ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (linkedOrConsumed)
280ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_STREAM_LINKED);
281ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        linkedOrConsumed = true;
282ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
283ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (sourceStage.sourceSpliterator != null) {
284ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            @SuppressWarnings("unchecked")
285ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
286ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceSpliterator = null;
287ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return s;
288ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
289ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else if (sourceStage.sourceSupplier != null) {
290ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            @SuppressWarnings("unchecked")
291ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
292ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceSupplier = null;
293ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return s;
294ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
295ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
296ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_CONSUMED);
297ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
298ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
299ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
300ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // BaseStream
301ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
302ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
303ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
304ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public final S sequential() {
305ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        sourceStage.parallel = false;
306ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return (S) this;
307ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
308ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
309ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
310ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
311ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public final S parallel() {
312ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        sourceStage.parallel = true;
313ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return (S) this;
314ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
315ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
316ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
317ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public void close() {
318ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        linkedOrConsumed = true;
319ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        sourceSupplier = null;
320ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        sourceSpliterator = null;
321ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (sourceStage.sourceCloseAction != null) {
322ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            Runnable closeAction = sourceStage.sourceCloseAction;
323ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceCloseAction = null;
324ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            closeAction.run();
325ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
326ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
327ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
328ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
329ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
330ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public S onClose(Runnable closeHandler) {
331ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        Runnable existingHandler = sourceStage.sourceCloseAction;
332ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        sourceStage.sourceCloseAction =
333ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                (existingHandler == null)
334ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                ? closeHandler
335ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                : Streams.composeWithExceptions(existingHandler, closeHandler);
336ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return (S) this;
337ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
338ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
339ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // Primitive specialization use co-variant overrides, hence is not final
340ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
341ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
342ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public Spliterator<E_OUT> spliterator() {
343ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (linkedOrConsumed)
344ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_STREAM_LINKED);
345ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        linkedOrConsumed = true;
346ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
347ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (this == sourceStage) {
348ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            if (sourceStage.sourceSpliterator != null) {
349ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                @SuppressWarnings("unchecked")
350ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
351ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                sourceStage.sourceSpliterator = null;
352ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                return s;
353ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
354ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            else if (sourceStage.sourceSupplier != null) {
355ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                @SuppressWarnings("unchecked")
356ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
357ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                sourceStage.sourceSupplier = null;
358ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                return lazySpliterator(s);
359ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
360ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            else {
361ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                throw new IllegalStateException(MSG_CONSUMED);
362ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
363ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
364ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
365ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return wrap(this, () -> sourceSpliterator(0), isParallel());
366ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
367ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
368ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
369ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
370ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    public final boolean isParallel() {
371ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return sourceStage.parallel;
372ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
373ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
374ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
375ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
376ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Returns the composition of stream flags of the stream source and all
377ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * intermediate operations.
378ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
379ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return the composition of stream flags of the stream source and all
380ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *         intermediate operations
381ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @see StreamOpFlag
382ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
383289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public final int getStreamFlags() {
384ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return StreamOpFlag.toStreamFlags(combinedFlags);
385ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
386ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
387ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
388ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Get the source spliterator for this pipeline stage.  For a sequential or
389ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * stateless parallel pipeline, this is the source spliterator.  For a
390ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * stateful parallel pipeline, this is a spliterator describing the results
391ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * of all computations up to and including the most recent stateful
392ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * operation.
393ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
394ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
395ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private Spliterator<?> sourceSpliterator(int terminalFlags) {
396ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        // Get the source spliterator of the pipeline
397ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        Spliterator<?> spliterator = null;
398ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (sourceStage.sourceSpliterator != null) {
399ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            spliterator = sourceStage.sourceSpliterator;
400ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceSpliterator = null;
401ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
402ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else if (sourceStage.sourceSupplier != null) {
403ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
404ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sourceStage.sourceSupplier = null;
405ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
406ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
407ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            throw new IllegalStateException(MSG_CONSUMED);
408ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
409ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
410ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (isParallel() && sourceStage.sourceAnyStateful) {
411ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // Adapt the source spliterator, evaluating each stateful op
412ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // in the pipeline up to and including this pipeline stage.
413ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // The depth and flags of each pipeline stage are adjusted accordingly.
414ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            int depth = 1;
415ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
416ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                 u != e;
417ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                 u = p, p = p.nextStage) {
418ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
419ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                int thisOpFlags = p.sourceOrOpFlags;
420ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                if (p.opIsStateful()) {
421ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    depth = 0;
422ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
423ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
424ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // Clear the short circuit flag for next pipeline stage
425ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // This stage encapsulates short-circuiting, the next
426ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // stage may not have any short-circuit operations, and
427ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // if so spliterator.forEachRemaining should be used
428ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // for traversal
429ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
430ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    }
431ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
432ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
433ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
434ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // Inject or clear SIZED on the source pipeline stage
435ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // based on the stage's spliterator
436ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
437ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
438ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
439ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
440ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                p.depth = depth++;
441ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
442ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
443ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
444ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
445ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (terminalFlags != 0)  {
446ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // Apply flags from the terminal operation to last pipeline stage
447ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
448ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
449ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
450ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return spliterator;
451ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
452ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
453ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // PipelineHelper
454ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
455ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
456ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final StreamShape getSourceShape() {
457ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        @SuppressWarnings("rawtypes")
458ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        AbstractPipeline p = AbstractPipeline.this;
459ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        while (p.depth > 0) {
460ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            p = p.previousStage;
461ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
462ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return p.getOutputShape();
463ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
464ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
465ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
466ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
467ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
468ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
469ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
470ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
471ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
472ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
473ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return sink;
474ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
475ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
476ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
477ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
478ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        Objects.requireNonNull(wrappedSink);
479ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
480ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
481ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            wrappedSink.begin(spliterator.getExactSizeIfKnown());
482ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            spliterator.forEachRemaining(wrappedSink);
483ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            wrappedSink.end();
484ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
485ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
486ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            copyIntoWithCancel(wrappedSink, spliterator);
487ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
488ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
489ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
490ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
491ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
492ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
493ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        @SuppressWarnings({"rawtypes","unchecked"})
494ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        AbstractPipeline p = AbstractPipeline.this;
495ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        while (p.depth > 0) {
496ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            p = p.previousStage;
497ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
498ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        wrappedSink.begin(spliterator.getExactSizeIfKnown());
499ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        p.forEachWithCancel(spliterator, wrappedSink);
500ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        wrappedSink.end();
501ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
502ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
503ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
504289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public final int getStreamAndOpFlags() {
505ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return combinedFlags;
506ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
507ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
508ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final boolean isOrdered() {
509ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
510ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
511ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
512ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
513ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
514289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
515ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        Objects.requireNonNull(sink);
516ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
517ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
518ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
519ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
520ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return (Sink<P_IN>) sink;
521ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
522ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
523ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
524ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
525ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
526ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (depth == 0) {
527ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return (Spliterator<E_OUT>) sourceSpliterator;
528ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
529ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
530ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return wrap(this, () -> sourceSpliterator, isParallel());
531ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
532ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
533ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
534ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
535ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
536289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
537ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                      boolean flatten,
538ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                      IntFunction<E_OUT[]> generator) {
539ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        if (isParallel()) {
540ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            // @@@ Optimize if op of this pipeline stage is a stateful op
541ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return evaluateToNode(this, spliterator, flatten, generator);
542ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
543ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        else {
544ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            Node.Builder<E_OUT> nb = makeNodeBuilder(
545ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    exactOutputSizeIfKnown(spliterator), generator);
546ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            return wrapAndCopyInto(nb, spliterator).build();
547ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        }
548ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
549ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
550ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
551ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // Shape-specific abstract methods, implemented by XxxPipeline classes
552ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
553ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
554ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Get the output shape of the pipeline.  If the pipeline is the head,
555ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * then it's output shape corresponds to the shape of the source.
556ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Otherwise, it's output shape corresponds to the output shape of the
557ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * associated operation.
558ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
559ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return the output shape
560ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
561289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract StreamShape getOutputShape();
562ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
563ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
564ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Collect elements output from a pipeline into a Node that holds elements
565ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * of this shape.
566ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
567ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param helper the pipeline helper describing the pipeline stages
568ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param spliterator the source spliterator
569ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param flattenTree true if the returned node should be flattened
570ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param generator the array generator
571ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a Node holding the output of the pipeline
572ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
573289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
574289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                      Spliterator<P_IN> spliterator,
575289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                      boolean flattenTree,
576289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                      IntFunction<E_OUT[]> generator);
577ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
578ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
579ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Create a spliterator that wraps a source spliterator, compatible with
580ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * this stream shape, and operations associated with a {@link
581ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * PipelineHelper}.
582ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
583ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param ph the pipeline helper describing the pipeline stages
584ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param supplier the supplier of a spliterator
585ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a wrapping spliterator compatible with this shape
586ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
587289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
588289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                   Supplier<Spliterator<P_IN>> supplier,
589289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                   boolean isParallel);
590ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
591ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
592ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Create a lazy spliterator that wraps and obtains the supplied the
593ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * spliterator when a method is invoked on the lazy spliterator.
594ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param supplier the supplier of a spliterator
595ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
596289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
597ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
598ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
599ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Traverse the elements of a spliterator compatible with this stream shape,
600ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * pushing those elements into a sink.   If the sink requests cancellation,
601ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * no further elements will be pulled or pushed.
602ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
603ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param spliterator the spliterator to pull elements from
604ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param sink the sink to push elements to
605ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
606289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
607ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
608ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
609ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Make a node builder compatible with this stream shape.
610ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
611ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
612ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * created that has a fixed capacity of at most sizeIfKnown elements. If
613ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@literal < 0}, then the node builder has an unfixed capacity. A fixed
614ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * capacity node builder will throw exceptions if an element is added after
615ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * builder has reached capacity, or is built before the builder has reached
616ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * capacity.
617ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
618ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param generator the array generator to be used to create instances of a
619ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * T[] array. For implementations supporting primitive nodes, this parameter
620ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * may be ignored.
621ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a node builder
622ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
623ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @Override
624289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
625289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin                                                        IntFunction<E_OUT[]> generator);
626ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
627ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
628ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    // Op-specific abstract methods, implemented by the operation class
629ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
630ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
631ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Returns whether this operation is stateful or not.  If it is stateful,
632ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * then the method
633ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
634ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * must be overridden.
635ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
636ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return {@code true} if this operation is stateful
637ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
638289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract boolean opIsStateful();
639ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
640ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
641ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Accepts a {@code Sink} which will receive the results of this operation,
642ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * and return a {@code Sink} which accepts elements of the input type of
643ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * this operation and which performs the operation, passing the results to
644ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * the provided {@code Sink}.
645ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
646ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @apiNote
647ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * The implementation may use the {@code flags} parameter to optimize the
648ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * sink wrapping.  For example, if the input is already {@code DISTINCT},
649ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * the implementation for the {@code Stream#distinct()} method could just
650ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * return the sink it was passed.
651ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
652ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param flags The combined stream and operation flags up to, but not
653ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *        including, this operation
654ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param sink sink to which elements should be sent after processing
655ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a sink which accepts elements, perform the operation upon
656ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *         each element, and passes the results (if any) to the provided
657ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *         {@code Sink}.
658ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
659289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
660ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
661ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
662ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Performs a parallel evaluation of the operation using the specified
663ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@code PipelineHelper} which describes the upstream intermediate
664ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * operations.  Only called on stateful operations.  If {@link
665ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * #opIsStateful()} returns true then implementations must override the
666ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * default implementation.
667ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
668ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @implSpec The default implementation always throw
669ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * {@code UnsupportedOperationException}.
670ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
671ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param helper the pipeline helper describing the pipeline stages
672ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param spliterator the source {@code Spliterator}
673ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param generator the array generator
674ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a {@code Node} describing the result of the evaluation
675ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
676289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
677ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                          Spliterator<P_IN> spliterator,
678ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                          IntFunction<E_OUT[]> generator) {
679ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        throw new UnsupportedOperationException("Parallel evaluation is not supported");
680ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
681ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
682ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
683ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Returns a {@code Spliterator} describing a parallel evaluation of the
684ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * operation, using the specified {@code PipelineHelper} which describes the
685ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * upstream intermediate operations.  Only called on stateful operations.
686ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * It is not necessary (though acceptable) to do a full computation of the
687ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * result here; it is preferable, if possible, to describe the result via a
688ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * lazily evaluated spliterator.
689ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
690ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @implSpec The default implementation behaves as if:
691ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * <pre>{@code
692ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *     return evaluateParallel(helper, i -> (E_OUT[]) new
693ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Object[i]).spliterator();
694ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * }</pre>
695ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * and is suitable for implementations that cannot do better than a full
696ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * synchronous evaluation.
697ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
698ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param helper the pipeline helper
699ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param spliterator the source {@code Spliterator}
700ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return a {@code Spliterator} describing the result of the evaluation
701ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
702ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    @SuppressWarnings("unchecked")
703289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin    public <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
704ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                                     Spliterator<P_IN> spliterator) {
705ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
706ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
707ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin}
708