AbstractPipeline.java revision c3d875180189cdb59ff48a3dc7cd4b8bf1efcea4
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.Spliterator;
29import java.util.function.IntFunction;
30import java.util.function.Supplier;
31
32/**
33 * Abstract base class for "pipeline" classes, which are the core
34 * implementations of the Stream interface and its primitive specializations.
35 * Manages construction and evaluation of stream pipelines.
36 *
37 * <p>An {@code AbstractPipeline} represents an initial portion of a stream
38 * pipeline, encapsulating a stream source and zero or more intermediate
39 * operations.  The individual {@code AbstractPipeline} objects are often
40 * referred to as <em>stages</em>, where each stage describes either the stream
41 * source or an intermediate operation.
42 *
43 * <p>A concrete intermediate stage is generally built from an
44 * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
45 * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
46 * concrete class which extends that.  {@code AbstractPipeline} contains most of
47 * the mechanics of evaluating the pipeline, and implements methods that will be
48 * used by the operation; the shape-specific classes add helper methods for
49 * dealing with collection of results into the appropriate shape-specific
50 * containers.
51 *
52 * <p>After chaining a new intermediate operation, or executing a terminal
53 * operation, the stream is considered to be consumed, and no more intermediate
54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, parallel streams, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together.  For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operations marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment.  In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param   type of input elements
68 * @param  type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
75    private static final String MSG_CONSUMED = "source already consumed or closed";
76
77    /**
78     * Backlink to the head of the pipeline chain (self if this is the source
79     * stage).
80     */
81    @SuppressWarnings("rawtypes")
82    private final AbstractPipeline sourceStage;
83
84    /**
85     * The "upstream" pipeline, or null if this is the source stage.
86     */
87    @SuppressWarnings("rawtypes")
88    private final AbstractPipeline previousStage;
89
90    /**
91     * The operation flags for the intermediate operation represented by this
92     * pipeline object.
93     */
94    protected final int sourceOrOpFlags;
95
96    /**
97     * The next stage in the pipeline, or null if this is the last stage.
98     * Effectively final at the point of linking to the next pipeline.
99     */
100    @SuppressWarnings("rawtypes")
101    private AbstractPipeline nextStage;
102
103    /**
104     * The number of intermediate operations between this pipeline object
105     * and the stream source if sequential, or the previous stateful if parallel.
106     * Valid at the point of pipeline preparation for evaluation.
107     */
108    private int depth;
109
110    /**
111     * The combined source and operation flags for the source and all operations
112     * up to and including the operation represented by this pipeline object.
113     * Valid at the point of pipeline preparation for evaluation.
114     */
115    private int combinedFlags;
116
117    /**
118     * The source spliterator. Only valid for the head pipeline.
119     * Before the pipeline is consumed if non-null then {@code sourceSupplier}
120     * must be null. After the pipeline is consumed if non-null then is set to
121     * null.
122     */
123    private Spliterator<?> sourceSpliterator;
124
125    /**
126     * The source supplier. Only valid for the head pipeline. Before the
127     * pipeline is consumed if non-null then {@code sourceSpliterator} must be
128     * null. After the pipeline is consumed if non-null then is set to null.
129     */
130    private Supplier<? extends Spliterator<?>> sourceSupplier;
131
132    /**
133     * True if this pipeline has been linked or consumed
134     */
135    private boolean linkedOrConsumed;
136
137    /**
138     * True if there are any stateful ops in the pipeline; only valid for the
139     * source stage.
140     */
141    private boolean sourceAnyStateful;
142
143    private Runnable sourceCloseAction;
144
145    /**
146     * True if pipeline is parallel, otherwise the pipeline is sequential; only
147     * valid for the source stage.
148     */
149    private boolean parallel;
150
151    /**
152     * Constructor for the head of a stream pipeline.
153     *
154     * @param source {@code Supplier<Spliterator>} describing the stream source
155     * @param sourceFlags The source flags for the stream source, described in
156     * {@link StreamOpFlag}
157     * @param parallel True if the pipeline is parallel
158     */
159    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
160                     int sourceFlags, boolean parallel) {
161        this.previousStage = null;
162        this.sourceSupplier = source;
163        this.sourceStage = this;
164        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
165        // The following is an optimization of:
166        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
167        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
168        this.depth = 0;
169        this.parallel = parallel;
170    }
171
172    /**
173     * Constructor for the head of a stream pipeline.
174     *
175     * @param source {@code Spliterator} describing the stream source
176     * @param sourceFlags the source flags for the stream source, described in
177     * {@link StreamOpFlag}
178     * @param parallel {@code true} if the pipeline is parallel
179     */
180    AbstractPipeline(Spliterator<?> source,
181                     int sourceFlags, boolean parallel) {
182        this.previousStage = null;
183        this.sourceSpliterator = source;
184        this.sourceStage = this;
185        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
186        // The following is an optimization of:
187        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
188        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
189        this.depth = 0;
190        this.parallel = parallel;
191    }
192
193    /**
194     * Constructor for appending an intermediate operation stage onto an
195     * existing pipeline.
196     *
197     * @param previousStage the upstream pipeline stage
198     * @param opFlags the operation flags for the new stage, described in
199     * {@link StreamOpFlag}
200     */
201    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
202        if (previousStage.linkedOrConsumed)
203            throw new IllegalStateException(MSG_STREAM_LINKED);
204        previousStage.linkedOrConsumed = true;
205        previousStage.nextStage = this;
206
207        this.previousStage = previousStage;
208        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
209        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
210        this.sourceStage = previousStage.sourceStage;
211        if (opIsStateful())
212            sourceStage.sourceAnyStateful = true;
213        this.depth = previousStage.depth + 1;
214    }
215
216
217    // Terminal evaluation methods
218
219    /**
220     * Evaluate the pipeline with a terminal operation to produce a result.
221     *
222     * @param <R> the type of result
223     * @param terminalOp the terminal operation to be applied to the pipeline.
224     * @return the result
225     */
226    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
227        assert getOutputShape() == terminalOp.inputShape();
228        if (linkedOrConsumed)
229            throw new IllegalStateException(MSG_STREAM_LINKED);
230        linkedOrConsumed = true;
231
232        return isParallel()
233               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
234               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
235    }
236
237    /**
238     * Collect the elements output from the pipeline stage.
239     *
240     * @param generator the array generator to be used to create array instances
241     * @return a flat array-backed Node that holds the collected output elements
242     */
243    @SuppressWarnings("unchecked")
244    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
245        if (linkedOrConsumed)
246            throw new IllegalStateException(MSG_STREAM_LINKED);
247        linkedOrConsumed = true;
248
249        // If the last intermediate operation is stateful then
250        // evaluate directly to avoid an extra collection step
251        if (isParallel() && previousStage != null && opIsStateful()) {
252            // Set the depth of this, last, pipeline stage to zero to slice the
253            // pipeline such that this operation will not be included in the
254            // upstream slice and upstream operations will not be included
255            // in this slice
256            depth = 0;
257            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
258        }
259        else {
260            return evaluate(sourceSpliterator(0), true, generator);
261        }
262    }
263
264    /**
265     * Gets the source stage spliterator if this pipeline stage is the source
266     * stage.  The pipeline is consumed after this method is called and
267     * returns successfully.
268     *
269     * @return the source stage spliterator
270     * @throws IllegalStateException if this pipeline stage is not the source
271     *         stage.
272     */
273    @SuppressWarnings("unchecked")
274    final Spliterator<E_OUT> sourceStageSpliterator() {
275        if (this != sourceStage)
276            throw new IllegalStateException();
277
278        if (linkedOrConsumed)
279            throw new IllegalStateException(MSG_STREAM_LINKED);
280        linkedOrConsumed = true;
281
282        if (sourceStage.sourceSpliterator != null) {
283            @SuppressWarnings("unchecked")
284            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
285            sourceStage.sourceSpliterator = null;
286            return s;
287        }
288        else if (sourceStage.sourceSupplier != null) {
289            @SuppressWarnings("unchecked")
290            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
291            sourceStage.sourceSupplier = null;
292            return s;
293        }
294        else {
295            throw new IllegalStateException(MSG_CONSUMED);
296        }
297    }
298
299    // BaseStream
300
301    @Override
302    @SuppressWarnings("unchecked")
303    public final S sequential() {
304        sourceStage.parallel = false;
305        return (S) this;
306    }
307
308    @Override
309    @SuppressWarnings("unchecked")
310    public final S parallel() {
311        sourceStage.parallel = true;
312        return (S) this;
313    }
314
315    @Override
316    public void close() {
317        linkedOrConsumed = true;
318        sourceSupplier = null;
319        sourceSpliterator = null;
320        if (sourceStage.sourceCloseAction != null) {
321            Runnable closeAction = sourceStage.sourceCloseAction;
322            sourceStage.sourceCloseAction = null;
323            closeAction.run();
324        }
325    }
326
327    @Override
328    @SuppressWarnings("unchecked")
329    public S onClose(Runnable closeHandler) {
330        Runnable existingHandler = sourceStage.sourceCloseAction;
331        sourceStage.sourceCloseAction =
332                (existingHandler == null)
333                ? closeHandler
334                : Streams.composeWithExceptions(existingHandler, closeHandler);
335        return (S) this;
336    }
337
338    // Primitive specialization use co-variant overrides, hence is not final
339    @Override
340    @SuppressWarnings("unchecked")
341    public Spliterator<E_OUT> spliterator() {
342        if (linkedOrConsumed)
343            throw new IllegalStateException(MSG_STREAM_LINKED);
344        linkedOrConsumed = true;
345
346        if (this == sourceStage) {
347            if (sourceStage.sourceSpliterator != null) {
348                @SuppressWarnings("unchecked")
349                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
350                sourceStage.sourceSpliterator = null;
351                return s;
352            }
353            else if (sourceStage.sourceSupplier != null) {
354                @SuppressWarnings("unchecked")
355                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
356                sourceStage.sourceSupplier = null;
357                return lazySpliterator(s);
358            }
359            else {
360                throw new IllegalStateException(MSG_CONSUMED);
361            }
362        }
363        else {
364            return wrap(this, () -> sourceSpliterator(0), isParallel());
365        }
366    }
367
368    @Override
369    public final boolean isParallel() {
370        return sourceStage.parallel;
371    }
372
373
374    /**
375     * Returns the composition of stream flags of the stream source and all
376     * intermediate operations.
377     *
378     * @return the composition of stream flags of the stream source and all
379     *         intermediate operations
380     * @see StreamOpFlag
381     */
382    final int getStreamFlags() {
383        return StreamOpFlag.toStreamFlags(combinedFlags);
384    }
385
386    /**
387     * Get the source spliterator for this pipeline stage.  For a sequential or
388     * stateless parallel pipeline, this is the source spliterator.  For a
389     * stateful parallel pipeline, this is a spliterator describing the results
390     * of all computations up to and including the most recent stateful
391     * operation.
392     */
393    @SuppressWarnings("unchecked")
394    private Spliterator<?> sourceSpliterator(int terminalFlags) {
395        // Get the source spliterator of the pipeline
396        Spliterator<?> spliterator = null;
397        if (sourceStage.sourceSpliterator != null) {
398            spliterator = sourceStage.sourceSpliterator;
399            sourceStage.sourceSpliterator = null;
400        }
401        else if (sourceStage.sourceSupplier != null) {
402            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
403            sourceStage.sourceSupplier = null;
404        }
405        else {
406            throw new IllegalStateException(MSG_CONSUMED);
407        }
408
409        if (isParallel() && sourceStage.sourceAnyStateful) {
410            // Adapt the source spliterator, evaluating each stateful op
411            // in the pipeline up to and including this pipeline stage.
412            // The depth and flags of each pipeline stage are adjusted accordingly.
413            int depth = 1;
414            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
415                 u != e;
416                 u = p, p = p.nextStage) {
417
418                int thisOpFlags = p.sourceOrOpFlags;
419                if (p.opIsStateful()) {
420                    depth = 0;
421
422                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
423                        // Clear the short circuit flag for next pipeline stage
424                        // This stage encapsulates short-circuiting, the next
425                        // stage may not have any short-circuit operations, and
426                        // if so spliterator.forEachRemaining should be used
427                        // for traversal
428                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
429                    }
430
431                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
432
433                    // Inject or clear SIZED on the source pipeline stage
434                    // based on the stage's spliterator
435                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
436                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
437                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
438                }
439                p.depth = depth++;
440                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
441            }
442        }
443
444        if (terminalFlags != 0)  {
445            // Apply flags from the terminal operation to last pipeline stage
446            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
447        }
448
449        return spliterator;
450    }
451
452    // PipelineHelper
453
454    @Override
455    final StreamShape getSourceShape() {
456        @SuppressWarnings("rawtypes")
457        AbstractPipeline p = AbstractPipeline.this;
458        while (p.depth > 0) {
459            p = p.previousStage;
460        }
461        return p.getOutputShape();
462    }
463
464    @Override
465    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
466        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
467    }
468
469    @Override
470    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
471        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
472        return sink;
473    }
474
475    @Override
476    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
477        Objects.requireNonNull(wrappedSink);
478
479        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
480            wrappedSink.begin(spliterator.getExactSizeIfKnown());
481            spliterator.forEachRemaining(wrappedSink);
482            wrappedSink.end();
483        }
484        else {
485            copyIntoWithCancel(wrappedSink, spliterator);
486        }
487    }
488
489    @Override
490    @SuppressWarnings("unchecked")
491    final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
492        @SuppressWarnings({"rawtypes","unchecked"})
493        AbstractPipeline p = AbstractPipeline.this;
494        while (p.depth > 0) {
495            p = p.previousStage;
496        }
497        wrappedSink.begin(spliterator.getExactSizeIfKnown());
498        p.forEachWithCancel(spliterator, wrappedSink);
499        wrappedSink.end();
500    }
501
502    @Override
503    final int getStreamAndOpFlags() {
504        return combinedFlags;
505    }
506
507    final boolean isOrdered() {
508        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
509    }
510
511    @Override
512    @SuppressWarnings("unchecked")
513    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
514        Objects.requireNonNull(sink);
515
516        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
517            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
518        }
519        return (Sink<P_IN>) sink;
520    }
521
522    @Override
523    @SuppressWarnings("unchecked")
524    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
525        if (depth == 0) {
526            return (Spliterator<E_OUT>) sourceSpliterator;
527        }
528        else {
529            return wrap(this, () -> sourceSpliterator, isParallel());
530        }
531    }
532
533    @Override
534    @SuppressWarnings("unchecked")
535    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
536                                      boolean flatten,
537                                      IntFunction<E_OUT[]> generator) {
538        if (isParallel()) {
539            // @@@ Optimize if op of this pipeline stage is a stateful op
540            return evaluateToNode(this, spliterator, flatten, generator);
541        }
542        else {
543            Node.Builder<E_OUT> nb = makeNodeBuilder(
544                    exactOutputSizeIfKnown(spliterator), generator);
545            return wrapAndCopyInto(nb, spliterator).build();
546        }
547    }
548
549
550    // Shape-specific abstract methods, implemented by XxxPipeline classes
551
552    /**
553     * Get the output shape of the pipeline.  If the pipeline is the head,
554     * then it's output shape corresponds to the shape of the source.
555     * Otherwise, it's output shape corresponds to the output shape of the
556     * associated operation.
557     *
558     * @return the output shape
559     */
560    abstract StreamShape getOutputShape();
561
562    /**
563     * Collect elements output from a pipeline into a Node that holds elements
564     * of this shape.
565     *
566     * @param helper the pipeline helper describing the pipeline stages
567     * @param spliterator the source spliterator
568     * @param flattenTree true if the returned node should be flattened
569     * @param generator the array generator
570     * @return a Node holding the output of the pipeline
571     */
572    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
573                                               Spliterator<P_IN> spliterator,
574                                               boolean flattenTree,
575                                               IntFunction<E_OUT[]> generator);
576
577    /**
578     * Create a spliterator that wraps a source spliterator, compatible with
579     * this stream shape, and operations associated with a {@link
580     * PipelineHelper}.
581     *
582     * @param ph the pipeline helper describing the pipeline stages
583     * @param supplier the supplier of a spliterator
584     * @return a wrapping spliterator compatible with this shape
585     */
586    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
587                                            Supplier<Spliterator<P_IN>> supplier,
588                                            boolean isParallel);
589
590    /**
591     * Create a lazy spliterator that wraps and obtains the supplied the
592     * spliterator when a method is invoked on the lazy spliterator.
593     * @param supplier the supplier of a spliterator
594     */
595    abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
596
597    /**
598     * Traverse the elements of a spliterator compatible with this stream shape,
599     * pushing those elements into a sink.   If the sink requests cancellation,
600     * no further elements will be pulled or pushed.
601     *
602     * @param spliterator the spliterator to pull elements from
603     * @param sink the sink to push elements to
604     */
605    abstract void forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
606
607    /**
608     * Make a node builder compatible with this stream shape.
609     *
610     * @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
611     * created that has a fixed capacity of at most sizeIfKnown elements. If
612     * {@literal < 0}, then the node builder has an unfixed capacity. A fixed
613     * capacity node builder will throw exceptions if an element is added after
614     * builder has reached capacity, or is built before the builder has reached
615     * capacity.
616     *
617     * @param generator the array generator to be used to create instances of a
618     * T[] array. For implementations supporting primitive nodes, this parameter
619     * may be ignored.
620     * @return a node builder
621     */
622    @Override
623    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
624                                                 IntFunction<E_OUT[]> generator);
625
626
627    // Op-specific abstract methods, implemented by the operation class
628
629    /**
630     * Returns whether this operation is stateful or not.  If it is stateful,
631     * then the method
632     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
633     * must be overridden.
634     *
635     * @return {@code true} if this operation is stateful
636     */
637    abstract boolean opIsStateful();
638
639    /**
640     * Accepts a {@code Sink} which will receive the results of this operation,
641     * and return a {@code Sink} which accepts elements of the input type of
642     * this operation and which performs the operation, passing the results to
643     * the provided {@code Sink}.
644     *
645     * @apiNote
646     * The implementation may use the {@code flags} parameter to optimize the
647     * sink wrapping.  For example, if the input is already {@code DISTINCT},
648     * the implementation for the {@code Stream#distinct()} method could just
649     * return the sink it was passed.
650     *
651     * @param flags The combined stream and operation flags up to, but not
652     *        including, this operation
653     * @param sink sink to which elements should be sent after processing
654     * @return a sink which accepts elements, perform the operation upon
655     *         each element, and passes the results (if any) to the provided
656     *         {@code Sink}.
657     */
658    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
659
660    /**
661     * Performs a parallel evaluation of the operation using the specified
662     * {@code PipelineHelper} which describes the upstream intermediate
663     * operations.  Only called on stateful operations.  If {@link
664     * #opIsStateful()} returns true then implementations must override the
665     * default implementation.
666     *
667     * @implSpec The default implementation always throw
668     * {@code UnsupportedOperationException}.
669     *
670     * @param helper the pipeline helper describing the pipeline stages
671     * @param spliterator the source {@code Spliterator}
672     * @param generator the array generator
673     * @return a {@code Node} describing the result of the evaluation
674     */
675    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
676                                          Spliterator<P_IN> spliterator,
677                                          IntFunction<E_OUT[]> generator) {
678        throw new UnsupportedOperationException("Parallel evaluation is not supported");
679    }
680
681    /**
682     * Returns a {@code Spliterator} describing a parallel evaluation of the
683     * operation, using the specified {@code PipelineHelper} which describes the
684     * upstream intermediate operations.  Only called on stateful operations.
685     * It is not necessary (though acceptable) to do a full computation of the
686     * result here; it is preferable, if possible, to describe the result via a
687     * lazily evaluated spliterator.
688     *
689     * @implSpec The default implementation behaves as if:
690     * <pre>{@code
691     *     return evaluateParallel(helper, i -> (E_OUT[]) new
692     * Object[i]).spliterator();
693     * }</pre>
694     * and is suitable for implementations that cannot do better than a full
695     * synchronous evaluation.
696     *
697     * @param helper the pipeline helper
698     * @param spliterator the source {@code Spliterator}
699     * @return a {@code Spliterator} describing the result of the evaluation
700     */
701    @SuppressWarnings("unchecked")
702    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
703                                                     Spliterator<P_IN> spliterator) {
704        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
705    }
706}
707