AbstractPipeline.java revision 289e51c2258b001f2aa6d6e67b21be7bb65d5102
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.Spliterator;
29import java.util.function.IntFunction;
30import java.util.function.Supplier;
31
/**
 * Abstract base class for "pipeline" classes, which are the core
 * implementations of the Stream interface and its primitive specializations.
 * Manages construction and evaluation of stream pipelines.
 *
 * <p>An {@code AbstractPipeline} represents an initial portion of a stream
 * pipeline, encapsulating a stream source and zero or more intermediate
 * operations.  The individual {@code AbstractPipeline} objects are often
 * referred to as <em>stages</em>, where each stage describes either the stream
 * source or an intermediate operation.
 *
 * <p>A concrete intermediate stage is generally built from an
 * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
 * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
 * concrete class which extends that.  {@code AbstractPipeline} contains most of
 * the mechanics of evaluating the pipeline, and implements methods that will be
 * used by the operation; the shape-specific classes add helper methods for
 * dealing with collection of results into the appropriate shape-specific
 * containers.
 *
 * <p>After chaining a new intermediate operation, or executing a terminal
 * operation, the stream is considered to be consumed, and no more intermediate
 * or terminal operations are permitted on this stream instance.
 *
 * @implNote
 * <p>For sequential streams, and parallel streams without
 * <a href="package-summary.html#StreamOps">stateful intermediate
 * operations</a>, pipeline evaluation is done in a single pass that "jams"
 * all the operations together.  For parallel streams with stateful
 * operations, execution is divided into segments, where each stateful
 * operation marks the end of a segment, and each segment is evaluated
 * separately and the result used as the input to the next segment.  In all
 * cases, the source data is not consumed until a terminal operation begins.
 *
 * @param <E_IN>  type of input elements
 * @param <E_OUT> type of output elements
 * @param <S> type of the subclass implementing {@code BaseStream}
 * @since 1.8
 * @hide Visibility for CTS only (OpenJDK 8 streams tests).
 */
73public abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
74        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    /**
     * Message of the {@code IllegalStateException} thrown when an operation is
     * attempted on a stage whose {@code linkedOrConsumed} flag is already set.
     */
    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";

    /**
     * Message of the {@code IllegalStateException} thrown when the source
     * stage holds neither a source spliterator nor a source supplier.
     */
    private static final String MSG_CONSUMED = "source already consumed or closed";

    /**
     * Backlink to the head of the pipeline chain (self if this is the source
     * stage).
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    /**
     * The "upstream" pipeline, or null if this is the source stage.
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    /**
     * For the source stage, the source flags masked with
     * {@code StreamOpFlag.STREAM_MASK}; for any other stage, the operation
     * flags of the intermediate operation represented by this pipeline object,
     * masked with {@code StreamOpFlag.OP_MASK}.
     */
    protected final int sourceOrOpFlags;

    /**
     * The next stage in the pipeline, or null if this is the last stage.
     * Effectively final at the point of linking to the next pipeline.
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    /**
     * The number of intermediate operations between this pipeline object
     * and the stream source if sequential, or the previous stateful if parallel.
     * Valid at the point of pipeline preparation for evaluation.
     */
    private int depth;

    /**
     * The combined source and operation flags for the source and all operations
     * up to and including the operation represented by this pipeline object.
     * Valid at the point of pipeline preparation for evaluation.
     */
    private int combinedFlags;

    /**
     * The source spliterator. Only valid for the head pipeline.
     * Before the pipeline is consumed, a non-null value here means that
     * {@code sourceSupplier} must be null. When the source is handed out for
     * traversal this field is reset to null.
     */
    private Spliterator<?> sourceSpliterator;

    /**
     * The source supplier. Only valid for the head pipeline. Before the
     * pipeline is consumed, a non-null value here means that
     * {@code sourceSpliterator} must be null. When the source is handed out
     * for traversal this field is reset to null.
     */
    private Supplier<? extends Spliterator<?>> sourceSupplier;

    /**
     * True if this pipeline has been linked or consumed
     */
    private boolean linkedOrConsumed;

    /**
     * True if there are any stateful ops in the pipeline; only valid for the
     * source stage.
     */
    private boolean sourceAnyStateful;

    /**
     * The close action registered through {@code onClose}, or null if there is
     * none (or it has already been run by {@code close}).  Always read and
     * written through {@code sourceStage}, so only the value held by the
     * source stage is meaningful.
     */
    private Runnable sourceCloseAction;

    /**
     * True if pipeline is parallel, otherwise the pipeline is sequential; only
     * valid for the source stage.
     */
    private boolean parallel;
151
152    /**
153     * Constructor for the head of a stream pipeline.
154     *
155     * @param source {@code Supplier<Spliterator>} describing the stream source
156     * @param sourceFlags The source flags for the stream source, described in
157     * {@link StreamOpFlag}
158     * @param parallel True if the pipeline is parallel
159     */
160    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
161                     int sourceFlags, boolean parallel) {
162        this.previousStage = null;
163        this.sourceSupplier = source;
164        this.sourceStage = this;
165        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
166        // The following is an optimization of:
167        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
168        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
169        this.depth = 0;
170        this.parallel = parallel;
171    }
172
173    /**
174     * Constructor for the head of a stream pipeline.
175     *
176     * @param source {@code Spliterator} describing the stream source
177     * @param sourceFlags the source flags for the stream source, described in
178     * {@link StreamOpFlag}
179     * @param parallel {@code true} if the pipeline is parallel
180     */
181    AbstractPipeline(Spliterator<?> source,
182                     int sourceFlags, boolean parallel) {
183        this.previousStage = null;
184        this.sourceSpliterator = source;
185        this.sourceStage = this;
186        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
187        // The following is an optimization of:
188        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
189        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
190        this.depth = 0;
191        this.parallel = parallel;
192    }
193
194    /**
195     * Constructor for appending an intermediate operation stage onto an
196     * existing pipeline.
197     *
198     * @param previousStage the upstream pipeline stage
199     * @param opFlags the operation flags for the new stage, described in
200     * {@link StreamOpFlag}
201     */
202    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
203        if (previousStage.linkedOrConsumed)
204            throw new IllegalStateException(MSG_STREAM_LINKED);
205        previousStage.linkedOrConsumed = true;
206        previousStage.nextStage = this;
207
208        this.previousStage = previousStage;
209        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
210        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
211        this.sourceStage = previousStage.sourceStage;
212        if (opIsStateful())
213            sourceStage.sourceAnyStateful = true;
214        this.depth = previousStage.depth + 1;
215    }
216
217
218    // Terminal evaluation methods
219
220    /**
221     * Evaluate the pipeline with a terminal operation to produce a result.
222     *
223     * @param <R> the type of result
224     * @param terminalOp the terminal operation to be applied to the pipeline.
225     * @return the result
226     */
227    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
228        assert getOutputShape() == terminalOp.inputShape();
229        if (linkedOrConsumed)
230            throw new IllegalStateException(MSG_STREAM_LINKED);
231        linkedOrConsumed = true;
232
233        return isParallel()
234               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
235               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
236    }
237
238    /**
239     * Collect the elements output from the pipeline stage.
240     *
241     * @param generator the array generator to be used to create array instances
242     * @return a flat array-backed Node that holds the collected output elements
243     */
244    @SuppressWarnings("unchecked")
245    public final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
246        if (linkedOrConsumed)
247            throw new IllegalStateException(MSG_STREAM_LINKED);
248        linkedOrConsumed = true;
249
250        // If the last intermediate operation is stateful then
251        // evaluate directly to avoid an extra collection step
252        if (isParallel() && previousStage != null && opIsStateful()) {
253            // Set the depth of this, last, pipeline stage to zero to slice the
254            // pipeline such that this operation will not be included in the
255            // upstream slice and upstream operations will not be included
256            // in this slice
257            depth = 0;
258            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
259        }
260        else {
261            return evaluate(sourceSpliterator(0), true, generator);
262        }
263    }
264
265    /**
266     * Gets the source stage spliterator if this pipeline stage is the source
267     * stage.  The pipeline is consumed after this method is called and
268     * returns successfully.
269     *
270     * @return the source stage spliterator
271     * @throws IllegalStateException if this pipeline stage is not the source
272     *         stage.
273     */
274    @SuppressWarnings("unchecked")
275    final Spliterator<E_OUT> sourceStageSpliterator() {
276        if (this != sourceStage)
277            throw new IllegalStateException();
278
279        if (linkedOrConsumed)
280            throw new IllegalStateException(MSG_STREAM_LINKED);
281        linkedOrConsumed = true;
282
283        if (sourceStage.sourceSpliterator != null) {
284            @SuppressWarnings("unchecked")
285            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
286            sourceStage.sourceSpliterator = null;
287            return s;
288        }
289        else if (sourceStage.sourceSupplier != null) {
290            @SuppressWarnings("unchecked")
291            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
292            sourceStage.sourceSupplier = null;
293            return s;
294        }
295        else {
296            throw new IllegalStateException(MSG_CONSUMED);
297        }
298    }
299
300    // BaseStream
301
302    @Override
303    @SuppressWarnings("unchecked")
304    public final S sequential() {
305        sourceStage.parallel = false;
306        return (S) this;
307    }
308
309    @Override
310    @SuppressWarnings("unchecked")
311    public final S parallel() {
312        sourceStage.parallel = true;
313        return (S) this;
314    }
315
316    @Override
317    public void close() {
318        linkedOrConsumed = true;
319        sourceSupplier = null;
320        sourceSpliterator = null;
321        if (sourceStage.sourceCloseAction != null) {
322            Runnable closeAction = sourceStage.sourceCloseAction;
323            sourceStage.sourceCloseAction = null;
324            closeAction.run();
325        }
326    }
327
328    @Override
329    @SuppressWarnings("unchecked")
330    public S onClose(Runnable closeHandler) {
331        Runnable existingHandler = sourceStage.sourceCloseAction;
332        sourceStage.sourceCloseAction =
333                (existingHandler == null)
334                ? closeHandler
335                : Streams.composeWithExceptions(existingHandler, closeHandler);
336        return (S) this;
337    }
338
339    // Primitive specialization use co-variant overrides, hence is not final
340    @Override
341    @SuppressWarnings("unchecked")
342    public Spliterator<E_OUT> spliterator() {
343        if (linkedOrConsumed)
344            throw new IllegalStateException(MSG_STREAM_LINKED);
345        linkedOrConsumed = true;
346
347        if (this == sourceStage) {
348            if (sourceStage.sourceSpliterator != null) {
349                @SuppressWarnings("unchecked")
350                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
351                sourceStage.sourceSpliterator = null;
352                return s;
353            }
354            else if (sourceStage.sourceSupplier != null) {
355                @SuppressWarnings("unchecked")
356                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
357                sourceStage.sourceSupplier = null;
358                return lazySpliterator(s);
359            }
360            else {
361                throw new IllegalStateException(MSG_CONSUMED);
362            }
363        }
364        else {
365            return wrap(this, () -> sourceSpliterator(0), isParallel());
366        }
367    }
368
369    @Override
370    public final boolean isParallel() {
371        return sourceStage.parallel;
372    }
373
374
375    /**
376     * Returns the composition of stream flags of the stream source and all
377     * intermediate operations.
378     *
379     * @return the composition of stream flags of the stream source and all
380     *         intermediate operations
381     * @see StreamOpFlag
382     */
383    public final int getStreamFlags() {
384        return StreamOpFlag.toStreamFlags(combinedFlags);
385    }
386
    /**
     * Get the source spliterator for this pipeline stage.  For a sequential or
     * stateless parallel pipeline, this is the source spliterator.  For a
     * stateful parallel pipeline, this is a spliterator describing the results
     * of all computations up to and including the most recent stateful
     * operation.
     *
     * <p>Side effects: the source reference held by the source stage is
     * cleared; for a parallel pipeline containing stateful operations the
     * {@code depth} and {@code combinedFlags} of every stage from the first
     * intermediate stage up to and including this one are recomputed; and a
     * non-zero {@code terminalFlags} is folded into this stage's
     * {@code combinedFlags}.
     *
     * @param terminalFlags the flags of the terminal operation to apply to
     *        this (last) stage, or 0 if there are none to apply
     * @return the spliterator that evaluation of this stage should start from
     * @throws IllegalStateException if the source stage holds neither a source
     *         spliterator nor a source supplier
     */
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            // The supplier is only cleared once it has produced a spliterator
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.

            // Local counter that shadows the 'depth' field: the depth to
            // assign to the stage currently being visited.
            int depth = 1;
            // u is the upstream stage, p the stage being visited, e this
            // (last) stage; the walk stops once the upstream stage is e,
            // i.e. after e itself has been visited as p.
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    // A stateful stage starts a new segment, so its depth
                    // (and the counting for the stages after it) restarts at 0
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    // Replace the running spliterator with one describing the
                    // output of this stateful op applied to the upstream slice
                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                // Assign the current depth to this stage, then advance the
                // counter for the next stage
                p.depth = depth++;
                // Recombine from the upstream stage's (already adjusted) flags
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }
452
453    // PipelineHelper
454
455    @Override
456    final StreamShape getSourceShape() {
457        @SuppressWarnings("rawtypes")
458        AbstractPipeline p = AbstractPipeline.this;
459        while (p.depth > 0) {
460            p = p.previousStage;
461        }
462        return p.getOutputShape();
463    }
464
465    @Override
466    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
467        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
468    }
469
470    @Override
471    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
472        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
473        return sink;
474    }
475
476    @Override
477    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
478        Objects.requireNonNull(wrappedSink);
479
480        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
481            wrappedSink.begin(spliterator.getExactSizeIfKnown());
482            spliterator.forEachRemaining(wrappedSink);
483            wrappedSink.end();
484        }
485        else {
486            copyIntoWithCancel(wrappedSink, spliterator);
487        }
488    }
489
490    @Override
491    @SuppressWarnings("unchecked")
492    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
493        @SuppressWarnings({"rawtypes","unchecked"})
494        AbstractPipeline p = AbstractPipeline.this;
495        while (p.depth > 0) {
496            p = p.previousStage;
497        }
498        wrappedSink.begin(spliterator.getExactSizeIfKnown());
499        p.forEachWithCancel(spliterator, wrappedSink);
500        wrappedSink.end();
501    }
502
503    @Override
504    public final int getStreamAndOpFlags() {
505        return combinedFlags;
506    }
507
508    final boolean isOrdered() {
509        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
510    }
511
512    @Override
513    @SuppressWarnings("unchecked")
514    public final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
515        Objects.requireNonNull(sink);
516
517        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
518            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
519        }
520        return (Sink<P_IN>) sink;
521    }
522
523    @Override
524    @SuppressWarnings("unchecked")
525    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
526        if (depth == 0) {
527            return (Spliterator<E_OUT>) sourceSpliterator;
528        }
529        else {
530            return wrap(this, () -> sourceSpliterator, isParallel());
531        }
532    }
533
534    @Override
535    @SuppressWarnings("unchecked")
536    public final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
537                                      boolean flatten,
538                                      IntFunction<E_OUT[]> generator) {
539        if (isParallel()) {
540            // @@@ Optimize if op of this pipeline stage is a stateful op
541            return evaluateToNode(this, spliterator, flatten, generator);
542        }
543        else {
544            Node.Builder<E_OUT> nb = makeNodeBuilder(
545                    exactOutputSizeIfKnown(spliterator), generator);
546            return wrapAndCopyInto(nb, spliterator).build();
547        }
548    }
549
550
551    // Shape-specific abstract methods, implemented by XxxPipeline classes
552
    /**
     * Get the output shape of the pipeline.  If the pipeline is the head,
     * then its output shape corresponds to the shape of the source.
     * Otherwise, its output shape corresponds to the output shape of the
     * associated operation.
     *
     * @return the output shape
     */
    public abstract StreamShape getOutputShape();
562
    /**
     * Collect elements output from a pipeline into a Node that holds elements
     * of this shape.  Within this class it is used by
     * {@code evaluate(Spliterator, boolean, IntFunction)} when the pipeline is
     * parallel.
     *
     * @param <P_IN> the type of elements of the source spliterator
     * @param helper the pipeline helper describing the pipeline stages
     * @param spliterator the source spliterator
     * @param flattenTree true if the returned node should be flattened
     * @param generator the array generator
     * @return a Node holding the output of the pipeline
     */
    public abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
                                                      Spliterator<P_IN> spliterator,
                                                      boolean flattenTree,
                                                      IntFunction<E_OUT[]> generator);
577
    /**
     * Create a spliterator that wraps a source spliterator, compatible with
     * this stream shape, and operations associated with a {@link
     * PipelineHelper}.
     *
     * @param <P_IN> the type of elements of the supplied source spliterator
     * @param ph the pipeline helper describing the pipeline stages
     * @param supplier the supplier of a spliterator
     * @param isParallel whether the wrapping spliterator is for a parallel
     *        pipeline; callers in this class pass {@link #isParallel()}
     * @return a wrapping spliterator compatible with this shape
     */
    public abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
                                                   Supplier<Spliterator<P_IN>> supplier,
                                                   boolean isParallel);
590
    /**
     * Create a lazy spliterator that wraps the supplier and obtains the
     * supplied spliterator only when a method is invoked on the lazy
     * spliterator.
     *
     * @param supplier the supplier of a spliterator
     * @return a spliterator that defers to the supplied spliterator
     */
    public abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
597
    /**
     * Traverse the elements of a spliterator compatible with this stream shape,
     * pushing those elements into a sink.   If the sink requests cancellation,
     * no further elements will be pulled or pushed.
     *
     * <p>Within this class it is invoked by {@code copyIntoWithCancel}, on the
     * stage that starts the current pipeline segment, between the sink's
     * {@code begin} and {@code end} calls.
     *
     * @param spliterator the spliterator to pull elements from
     * @param sink the sink to push elements to
     */
    public abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
607
    /**
     * Make a node builder compatible with this stream shape.
     *
     * @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
     * created that has a fixed capacity of at most {@code exactSizeIfKnown}
     * elements. If {@literal < 0}, then the node builder has an unfixed
     * capacity. A fixed capacity node builder will throw exceptions if an
     * element is added after the builder has reached capacity, or is built
     * before the builder has reached capacity.
     *
     * @param generator the array generator to be used to create instances of a
     * T[] array. For implementations supporting primitive nodes, this parameter
     * may be ignored.
     * @return a node builder
     */
    @Override
    public abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
                                                        IntFunction<E_OUT[]> generator);
626
627
628    // Op-specific abstract methods, implemented by the operation class
629
    /**
     * Returns whether this operation is stateful or not.  If it is stateful,
     * then the method
     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
     * must be overridden.
     *
     * <p>Note that this method is invoked from the constructor that appends
     * an intermediate stage, i.e. before the constructor of the implementing
     * subclass has run, so implementations should not depend on subclass
     * instance state.
     *
     * @return {@code true} if this operation is stateful
     */
    public abstract boolean opIsStateful();
639
    /**
     * Accepts a {@code Sink} which will receive the results of this operation,
     * and returns a {@code Sink} which accepts elements of the input type of
     * this operation and which performs the operation, passing the results to
     * the provided {@code Sink}.
     *
     * @apiNote
     * The implementation may use the {@code flags} parameter to optimize the
     * sink wrapping.  For example, if the input is already {@code DISTINCT},
     * the implementation for the {@code Stream#distinct()} method could just
     * return the sink it was passed.
     *
     * @param flags The combined stream and operation flags up to, but not
     *        including, this operation
     * @param sink sink to which elements should be sent after processing
     * @return a sink which accepts elements, performs the operation upon
     *         each element, and passes the results (if any) to the provided
     *         {@code Sink}.
     */
    public abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
660
661    /**
662     * Performs a parallel evaluation of the operation using the specified
663     * {@code PipelineHelper} which describes the upstream intermediate
664     * operations.  Only called on stateful operations.  If {@link
665     * #opIsStateful()} returns true then implementations must override the
666     * default implementation.
667     *
668     * @implSpec The default implementation always throw
669     * {@code UnsupportedOperationException}.
670     *
671     * @param helper the pipeline helper describing the pipeline stages
672     * @param spliterator the source {@code Spliterator}
673     * @param generator the array generator
674     * @return a {@code Node} describing the result of the evaluation
675     */
676    public <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
677                                          Spliterator<P_IN> spliterator,
678                                          IntFunction<E_OUT[]> generator) {
679        throw new UnsupportedOperationException("Parallel evaluation is not supported");
680    }
681
682    /**
683     * Returns a {@code Spliterator} describing a parallel evaluation of the
684     * operation, using the specified {@code PipelineHelper} which describes the
685     * upstream intermediate operations.  Only called on stateful operations.
686     * It is not necessary (though acceptable) to do a full computation of the
687     * result here; it is preferable, if possible, to describe the result via a
688     * lazily evaluated spliterator.
689     *
690     * @implSpec The default implementation behaves as if:
691     * <pre>{@code
692     *     return evaluateParallel(helper, i -> (E_OUT[]) new
693     * Object[i]).spliterator();
694     * }</pre>
695     * and is suitable for implementations that cannot do better than a full
696     * synchronous evaluation.
697     *
698     * @param helper the pipeline helper
699     * @param spliterator the source {@code Spliterator}
700     * @return a {@code Spliterator} describing the result of the evaluation
701     */
702    @SuppressWarnings("unchecked")
703    public <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
704                                                     Spliterator<P_IN> spliterator) {
705        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
706    }
707}
708