SliceOps.java revision c3d875180189cdb59ff48a3dc7cd4b8bf1efcea4
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Spliterator;
28import java.util.concurrent.CountedCompleter;
29import java.util.function.IntFunction;
30
31/**
32 * Factory for instances of a short-circuiting stateful intermediate operations
33 * that produce subsequences of their input stream.
34 *
35 * @since 1.8
36 */
37final class SliceOps {
38
39    // No instances
40    private SliceOps() { }
41
42    /**
43     * Calculates the sliced size given the current size, number of elements
44     * skip, and the number of elements to limit.
45     *
46     * @param size the current size
47     * @param skip the number of elements to skip, assumed to be >= 0
48     * @param limit the number of elements to limit, assumed to be >= 0, with
49     *        a value of {@code Long.MAX_VALUE} if there is no limit
50     * @return the sliced size
51     */
52    private static long calcSize(long size, long skip, long limit) {
53        return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
54    }
55
56    /**
57     * Calculates the slice fence, which is one past the index of the slice
58     * range
59     * @param skip the number of elements to skip, assumed to be >= 0
60     * @param limit the number of elements to limit, assumed to be >= 0, with
61     *        a value of {@code Long.MAX_VALUE} if there is no limit
62     * @return the slice fence.
63     */
64    private static long calcSliceFence(long skip, long limit) {
65        long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
66        // Check for overflow
67        return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
68    }
69
70    /**
71     * Creates a slice spliterator given a stream shape governing the
72     * spliterator type.  Requires that the underlying Spliterator
73     * be SUBSIZED.
74     */
75    @SuppressWarnings("unchecked")
76    private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
77                                                             Spliterator<P_IN> s,
78                                                             long skip, long limit) {
79        assert s.hasCharacteristics(Spliterator.SUBSIZED);
80        long sliceFence = calcSliceFence(skip, limit);
81        switch (shape) {
82            case REFERENCE:
83                return new StreamSpliterators
84                        .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85            case INT_VALUE:
86                return (Spliterator<P_IN>) new StreamSpliterators
87                        .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88            case LONG_VALUE:
89                return (Spliterator<P_IN>) new StreamSpliterators
90                        .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91            case DOUBLE_VALUE:
92                return (Spliterator<P_IN>) new StreamSpliterators
93                        .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94            default:
95                throw new IllegalStateException("Unknown shape " + shape);
96        }
97    }
98
99    @SuppressWarnings("unchecked")
100    private static <T> IntFunction<T[]> castingArray() {
101        return size -> (T[]) new Object[size];
102    }
103
104    /**
105     * Appends a "slice" operation to the provided stream.  The slice operation
106     * may be may be skip-only, limit-only, or skip-and-limit.
107     *
108     * @param <T> the type of both input and output elements
109     * @param upstream a reference stream with element type T
110     * @param skip the number of elements to skip.  Must be >= 0.
111     * @param limit the maximum size of the resulting stream, or -1 if no limit
112     *        is to be imposed
113     */
114    public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
115                                        long skip, long limit) {
116        if (skip < 0)
117            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
118
119        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
120                                                      flags(limit)) {
121            Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
122                                                         long skip, long limit, long sizeIfKnown) {
123                if (skip <= sizeIfKnown) {
124                    // Use just the limit if the number of elements
125                    // to skip is <= the known pipeline size
126                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
127                    skip = 0;
128                }
129                return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
130            }
131
132            @Override
133            <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
134                long size = helper.exactOutputSizeIfKnown(spliterator);
135                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
136                    return new StreamSpliterators.SliceSpliterator.OfRef<>(
137                            helper.wrapSpliterator(spliterator),
138                            skip,
139                            calcSliceFence(skip, limit));
140                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
141                    return unorderedSkipLimitSpliterator(
142                            helper.wrapSpliterator(spliterator),
143                            skip, limit, size);
144                }
145                else {
146                    // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
147                    //     regardless of the value of n
148                    //     Need to adjust the target size of splitting for the
149                    //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
150                    //     This will limit the size of the buffers created at the leaf nodes
151                    //     cancellation will be more aggressive cancelling later tasks
152                    //     if the target slice size has been reached from a given task,
153                    //     cancellation should also clear local results if any
154                    return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
155                            invoke().spliterator();
156                }
157            }
158
159            @Override
160            <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
161                                              Spliterator<P_IN> spliterator,
162                                              IntFunction<T[]> generator) {
163                long size = helper.exactOutputSizeIfKnown(spliterator);
164                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
165                    // Because the pipeline is SIZED the slice spliterator
166                    // can be created from the source, this requires matching
167                    // to shape of the source, and is potentially more efficient
168                    // than creating the slice spliterator from the pipeline
169                    // wrapping spliterator
170                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
171                    return Nodes.collect(helper, s, true, generator);
172                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
173                    Spliterator<T> s =  unorderedSkipLimitSpliterator(
174                            helper.wrapSpliterator(spliterator),
175                            skip, limit, size);
176                    // Collect using this pipeline, which is empty and therefore
177                    // can be used with the pipeline wrapping spliterator
178                    // Note that we cannot create a slice spliterator from
179                    // the source spliterator if the pipeline is not SIZED
180                    return Nodes.collect(this, s, true, generator);
181                }
182                else {
183                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
184                            invoke();
185                }
186            }
187
188            @Override
189            Sink<T> opWrapSink(int flags, Sink<T> sink) {
190                return new Sink.ChainedReference<T, T>(sink) {
191                    long n = skip;
192                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
193
194                    @Override
195                    public void begin(long size) {
196                        downstream.begin(calcSize(size, skip, m));
197                    }
198
199                    @Override
200                    public void accept(T t) {
201                        if (n == 0) {
202                            if (m > 0) {
203                                m--;
204                                downstream.accept(t);
205                            }
206                        }
207                        else {
208                            n--;
209                        }
210                    }
211
212                    @Override
213                    public boolean cancellationRequested() {
214                        return m == 0 || downstream.cancellationRequested();
215                    }
216                };
217            }
218        };
219    }
220
221    /**
222     * Appends a "slice" operation to the provided IntStream.  The slice
223     * operation may be may be skip-only, limit-only, or skip-and-limit.
224     *
225     * @param upstream An IntStream
226     * @param skip The number of elements to skip.  Must be >= 0.
227     * @param limit The maximum size of the resulting stream, or -1 if no limit
228     *        is to be imposed
229     */
230    public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
231                                    long skip, long limit) {
232        if (skip < 0)
233            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
234
235        return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
236                                                   flags(limit)) {
237            Spliterator.OfInt unorderedSkipLimitSpliterator(
238                    Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
239                if (skip <= sizeIfKnown) {
240                    // Use just the limit if the number of elements
241                    // to skip is <= the known pipeline size
242                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
243                    skip = 0;
244                }
245                return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
246            }
247
248            @Override
249            <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
250                                                               Spliterator<P_IN> spliterator) {
251                long size = helper.exactOutputSizeIfKnown(spliterator);
252                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
253                    return new StreamSpliterators.SliceSpliterator.OfInt(
254                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
255                            skip,
256                            calcSliceFence(skip, limit));
257                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
258                    return unorderedSkipLimitSpliterator(
259                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
260                            skip, limit, size);
261                }
262                else {
263                    return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
264                            invoke().spliterator();
265                }
266            }
267
268            @Override
269            <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
270                                                    Spliterator<P_IN> spliterator,
271                                                    IntFunction<Integer[]> generator) {
272                long size = helper.exactOutputSizeIfKnown(spliterator);
273                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
274                    // Because the pipeline is SIZED the slice spliterator
275                    // can be created from the source, this requires matching
276                    // to shape of the source, and is potentially more efficient
277                    // than creating the slice spliterator from the pipeline
278                    // wrapping spliterator
279                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
280                    return Nodes.collectInt(helper, s, true);
281                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
282                    Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
283                            (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
284                            skip, limit, size);
285                    // Collect using this pipeline, which is empty and therefore
286                    // can be used with the pipeline wrapping spliterator
287                    // Note that we cannot create a slice spliterator from
288                    // the source spliterator if the pipeline is not SIZED
289                    return Nodes.collectInt(this, s, true);
290                }
291                else {
292                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
293                            invoke();
294                }
295            }
296
297            @Override
298            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
299                return new Sink.ChainedInt<Integer>(sink) {
300                    long n = skip;
301                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
302
303                    @Override
304                    public void begin(long size) {
305                        downstream.begin(calcSize(size, skip, m));
306                    }
307
308                    @Override
309                    public void accept(int t) {
310                        if (n == 0) {
311                            if (m > 0) {
312                                m--;
313                                downstream.accept(t);
314                            }
315                        }
316                        else {
317                            n--;
318                        }
319                    }
320
321                    @Override
322                    public boolean cancellationRequested() {
323                        return m == 0 || downstream.cancellationRequested();
324                    }
325                };
326            }
327        };
328    }
329
330    /**
331     * Appends a "slice" operation to the provided LongStream.  The slice
332     * operation may be may be skip-only, limit-only, or skip-and-limit.
333     *
334     * @param upstream A LongStream
335     * @param skip The number of elements to skip.  Must be >= 0.
336     * @param limit The maximum size of the resulting stream, or -1 if no limit
337     *        is to be imposed
338     */
339    public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
340                                      long skip, long limit) {
341        if (skip < 0)
342            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
343
344        return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
345                                                 flags(limit)) {
346            Spliterator.OfLong unorderedSkipLimitSpliterator(
347                    Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
348                if (skip <= sizeIfKnown) {
349                    // Use just the limit if the number of elements
350                    // to skip is <= the known pipeline size
351                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
352                    skip = 0;
353                }
354                return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
355            }
356
357            @Override
358            <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
359                                                            Spliterator<P_IN> spliterator) {
360                long size = helper.exactOutputSizeIfKnown(spliterator);
361                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
362                    return new StreamSpliterators.SliceSpliterator.OfLong(
363                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
364                            skip,
365                            calcSliceFence(skip, limit));
366                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
367                    return unorderedSkipLimitSpliterator(
368                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
369                            skip, limit, size);
370                }
371                else {
372                    return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
373                            invoke().spliterator();
374                }
375            }
376
377            @Override
378            <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
379                                                 Spliterator<P_IN> spliterator,
380                                                 IntFunction<Long[]> generator) {
381                long size = helper.exactOutputSizeIfKnown(spliterator);
382                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
383                    // Because the pipeline is SIZED the slice spliterator
384                    // can be created from the source, this requires matching
385                    // to shape of the source, and is potentially more efficient
386                    // than creating the slice spliterator from the pipeline
387                    // wrapping spliterator
388                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
389                    return Nodes.collectLong(helper, s, true);
390                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
391                    Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
392                            (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
393                            skip, limit, size);
394                    // Collect using this pipeline, which is empty and therefore
395                    // can be used with the pipeline wrapping spliterator
396                    // Note that we cannot create a slice spliterator from
397                    // the source spliterator if the pipeline is not SIZED
398                    return Nodes.collectLong(this, s, true);
399                }
400                else {
401                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
402                            invoke();
403                }
404            }
405
406            @Override
407            Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
408                return new Sink.ChainedLong<Long>(sink) {
409                    long n = skip;
410                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
411
412                    @Override
413                    public void begin(long size) {
414                        downstream.begin(calcSize(size, skip, m));
415                    }
416
417                    @Override
418                    public void accept(long t) {
419                        if (n == 0) {
420                            if (m > 0) {
421                                m--;
422                                downstream.accept(t);
423                            }
424                        }
425                        else {
426                            n--;
427                        }
428                    }
429
430                    @Override
431                    public boolean cancellationRequested() {
432                        return m == 0 || downstream.cancellationRequested();
433                    }
434                };
435            }
436        };
437    }
438
439    /**
440     * Appends a "slice" operation to the provided DoubleStream.  The slice
441     * operation may be may be skip-only, limit-only, or skip-and-limit.
442     *
443     * @param upstream A DoubleStream
444     * @param skip The number of elements to skip.  Must be >= 0.
445     * @param limit The maximum size of the resulting stream, or -1 if no limit
446     *        is to be imposed
447     */
448    public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
449                                          long skip, long limit) {
450        if (skip < 0)
451            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
452
453        return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
454                                                     flags(limit)) {
455            Spliterator.OfDouble unorderedSkipLimitSpliterator(
456                    Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
457                if (skip <= sizeIfKnown) {
458                    // Use just the limit if the number of elements
459                    // to skip is <= the known pipeline size
460                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
461                    skip = 0;
462                }
463                return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
464            }
465
466            @Override
467            <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
468                                                              Spliterator<P_IN> spliterator) {
469                long size = helper.exactOutputSizeIfKnown(spliterator);
470                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
471                    return new StreamSpliterators.SliceSpliterator.OfDouble(
472                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
473                            skip,
474                            calcSliceFence(skip, limit));
475                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
476                    return unorderedSkipLimitSpliterator(
477                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
478                            skip, limit, size);
479                }
480                else {
481                    return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
482                            invoke().spliterator();
483                }
484            }
485
486            @Override
487            <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
488                                                   Spliterator<P_IN> spliterator,
489                                                   IntFunction<Double[]> generator) {
490                long size = helper.exactOutputSizeIfKnown(spliterator);
491                if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
492                    // Because the pipeline is SIZED the slice spliterator
493                    // can be created from the source, this requires matching
494                    // to shape of the source, and is potentially more efficient
495                    // than creating the slice spliterator from the pipeline
496                    // wrapping spliterator
497                    Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
498                    return Nodes.collectDouble(helper, s, true);
499                } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
500                    Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
501                            (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
502                            skip, limit, size);
503                    // Collect using this pipeline, which is empty and therefore
504                    // can be used with the pipeline wrapping spliterator
505                    // Note that we cannot create a slice spliterator from
506                    // the source spliterator if the pipeline is not SIZED
507                    return Nodes.collectDouble(this, s, true);
508                }
509                else {
510                    return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
511                            invoke();
512                }
513            }
514
515            @Override
516            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
517                return new Sink.ChainedDouble<Double>(sink) {
518                    long n = skip;
519                    long m = limit >= 0 ? limit : Long.MAX_VALUE;
520
521                    @Override
522                    public void begin(long size) {
523                        downstream.begin(calcSize(size, skip, m));
524                    }
525
526                    @Override
527                    public void accept(double t) {
528                        if (n == 0) {
529                            if (m > 0) {
530                                m--;
531                                downstream.accept(t);
532                            }
533                        }
534                        else {
535                            n--;
536                        }
537                    }
538
539                    @Override
540                    public boolean cancellationRequested() {
541                        return m == 0 || downstream.cancellationRequested();
542                    }
543                };
544            }
545        };
546    }
547
548    private static int flags(long limit) {
549        return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
550    }
551
552    /**
553     * {@code ForkJoinTask} implementing slice computation.
554     *
555     * @param  Input element type to the stream pipeline
556     * @param  Output element type from the stream pipeline
557     */
558    @SuppressWarnings("serial")
559    private static final class SliceTask<P_IN, P_OUT>
560            extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
561        private final AbstractPipeline<P_OUT, P_OUT, ?> op;
562        private final IntFunction<P_OUT[]> generator;
563        private final long targetOffset, targetSize;
564        private long thisNodeSize;
565
566        private volatile boolean completed;
567
568        SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
569                  PipelineHelper<P_OUT> helper,
570                  Spliterator<P_IN> spliterator,
571                  IntFunction<P_OUT[]> generator,
572                  long offset, long size) {
573            super(helper, spliterator);
574            this.op = op;
575            this.generator = generator;
576            this.targetOffset = offset;
577            this.targetSize = size;
578        }
579
580        SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
581            super(parent, spliterator);
582            this.op = parent.op;
583            this.generator = parent.generator;
584            this.targetOffset = parent.targetOffset;
585            this.targetSize = parent.targetSize;
586        }
587
588        @Override
589        protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
590            return new SliceTask<>(this, spliterator);
591        }
592
593        @Override
594        protected final Node<P_OUT> getEmptyResult() {
595            return Nodes.emptyNode(op.getOutputShape());
596        }
597
598        @Override
599        protected final Node<P_OUT> doLeaf() {
600            if (isRoot()) {
601                long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
602                                   ? op.exactOutputSizeIfKnown(spliterator)
603                                   : -1;
604                final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
605                Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
606                helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
607                // There is no need to truncate since the op performs the
608                // skipping and limiting of elements
609                return nb.build();
610            }
611            else {
612                Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
613                                                          spliterator).build();
614                thisNodeSize = node.count();
615                completed = true;
616                spliterator = null;
617                return node;
618            }
619        }
620
621        @Override
622        public final void onCompletion(CountedCompleter<?> caller) {
623            if (!isLeaf()) {
624                Node<P_OUT> result;
625                thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
626                if (canceled) {
627                    thisNodeSize = 0;
628                    result = getEmptyResult();
629                }
630                else if (thisNodeSize == 0)
631                    result = getEmptyResult();
632                else if (leftChild.thisNodeSize == 0)
633                    result = rightChild.getLocalResult();
634                else {
635                    result = Nodes.conc(op.getOutputShape(),
636                                        leftChild.getLocalResult(), rightChild.getLocalResult());
637                }
638                setLocalResult(isRoot() ? doTruncate(result) : result);
639                completed = true;
640            }
641            if (targetSize >= 0
642                && !isRoot()
643                && isLeftCompleted(targetOffset + targetSize))
644                    cancelLaterNodes();
645
646            super.onCompletion(caller);
647        }
648
649        @Override
650        protected void cancel() {
651            super.cancel();
652            if (completed)
653                setLocalResult(getEmptyResult());
654        }
655
656        private Node<P_OUT> doTruncate(Node<P_OUT> input) {
657            long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
658            return input.truncate(targetOffset, to, generator);
659        }
660
661        /**
662         * Determine if the number of completed elements in this node and nodes
663         * to the left of this node is greater than or equal to the target size.
664         *
665         * @param target the target size
666         * @return true if the number of elements is greater than or equal to
667         *         the target size, otherwise false.
668         */
669        private boolean isLeftCompleted(long target) {
670            long size = completed ? thisNodeSize : completedSize(target);
671            if (size >= target)
672                return true;
673            for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
674                 parent != null;
675                 node = parent, parent = parent.getParent()) {
676                if (node == parent.rightChild) {
677                    SliceTask<P_IN, P_OUT> left = parent.leftChild;
678                    if (left != null) {
679                        size += left.completedSize(target);
680                        if (size >= target)
681                            return true;
682                    }
683                }
684            }
685            return size >= target;
686        }
687
688        /**
689         * Compute the number of completed elements in this node.
690         * <p>
691         * Computation terminates if all nodes have been processed or the
692         * number of completed elements is greater than or equal to the target
693         * size.
694         *
695         * @param target the target size
696         * @return return the number of completed elements
697         */
698        private long completedSize(long target) {
699            if (completed)
700                return thisNodeSize;
701            else {
702                SliceTask<P_IN, P_OUT> left = leftChild;
703                SliceTask<P_IN, P_OUT> right = rightChild;
704                if (left == null || right == null) {
705                    // must be completed
706                    return thisNodeSize;
707                }
708                else {
709                    long leftSize = left.completedSize(target);
710                    return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
711                }
712            }
713        }
714    }
715}
716