1/* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package java.util.stream; 26 27import java.util.Comparator; 28import java.util.Iterator; 29import java.util.Objects; 30import java.util.Optional; 31import java.util.Spliterator; 32import java.util.Spliterators; 33import java.util.function.BiConsumer; 34import java.util.function.BiFunction; 35import java.util.function.BinaryOperator; 36import java.util.function.Consumer; 37import java.util.function.DoubleConsumer; 38import java.util.function.Function; 39import java.util.function.IntConsumer; 40import java.util.function.IntFunction; 41import java.util.function.LongConsumer; 42import java.util.function.Predicate; 43import java.util.function.Supplier; 44import java.util.function.ToDoubleFunction; 45import java.util.function.ToIntFunction; 46import java.util.function.ToLongFunction; 47 48/** 49 * Abstract base class for an intermediate pipeline stage or pipeline source 50 * stage implementing whose elements are of type {@code U}. 51 * 52 * @paramtype of elements in the upstream source 53 * @param type of elements in produced by this stage 54 * 55 * @since 1.8 56 * @hide Visible for CTS testing only (OpenJDK8 tests). 57 */ 58public abstract class ReferencePipeline<P_IN, P_OUT> 59 extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> 60 implements Stream<P_OUT> { 61 62 /** 63 * Constructor for the head of a stream pipeline. 64 * 65 * @param source {@code Supplier<Spliterator>} describing the stream source 66 * @param sourceFlags the source flags for the stream source, described in 67 * {@link StreamOpFlag} 68 * @param parallel {@code true} if the pipeline is parallel 69 */ 70 ReferencePipeline(Supplier<? extends Spliterator<?>> source, 71 int sourceFlags, boolean parallel) { 72 super(source, sourceFlags, parallel); 73 } 74 75 /** 76 * Constructor for the head of a stream pipeline. 77 * 78 * @param source {@code Spliterator} describing the stream source 79 * @param sourceFlags The source flags for the stream source, described in 80 * {@link StreamOpFlag} 81 * @param parallel {@code true} if the pipeline is parallel 82 */ 83 ReferencePipeline(Spliterator<?> source, 84 int sourceFlags, boolean parallel) { 85 super(source, sourceFlags, parallel); 86 } 87 88 /** 89 * Constructor for appending an intermediate operation onto an existing 90 * pipeline. 91 * 92 * @param upstream the upstream element source. 93 */ 94 ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) { 95 super(upstream, opFlags); 96 } 97 98 // Shape-specific methods 99 100 @Override 101 public final StreamShape getOutputShape() { 102 return StreamShape.REFERENCE; 103 } 104 105 @Override 106 public final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, 107 Spliterator<P_IN> spliterator, 108 boolean flattenTree, 109 IntFunction<P_OUT[]> generator) { 110 return Nodes.collect(helper, spliterator, flattenTree, generator); 111 } 112 113 @Override 114 public final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, 115 Supplier<Spliterator<P_IN>> supplier, 116 boolean isParallel) { 117 return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel); 118 } 119 120 @Override 121 public final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) { 122 return new StreamSpliterators.DelegatingSpliterator<>(supplier); 123 } 124 125 @Override 126 public final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { 127 do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); 128 } 129 130 @Override 131 public final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) { 132 return Nodes.builder(exactSizeIfKnown, generator); 133 } 134 135 136 // BaseStream 137 138 @Override 139 public final Iterator<P_OUT> iterator() { 140 return Spliterators.iterator(spliterator()); 141 } 142 143 144 // Stream 145 146 // Stateless intermediate operations from Stream 147 148 @Override 149 public Stream<P_OUT> unordered() { 150 if (!isOrdered()) 151 return this; 152 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) { 153 @Override 154 public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 155 return sink; 156 } 157 }; 158 } 159 160 @Override 161 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { 162 Objects.requireNonNull(predicate); 163 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 164 StreamOpFlag.NOT_SIZED) { 165 @Override 166 public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 167 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { 168 @Override 169 public void begin(long size) { 170 downstream.begin(-1); 171 } 172 173 @Override 174 public void accept(P_OUT u) { 175 if (predicate.test(u)) 176 downstream.accept(u); 177 } 178 }; 179 } 180 }; 181 } 182 183 @Override 184 @SuppressWarnings("unchecked") 185 public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { 186 Objects.requireNonNull(mapper); 187 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 188 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 189 @Override 190 public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 191 return new Sink.ChainedReference<P_OUT, R>(sink) { 192 @Override 193 public void accept(P_OUT u) { 194 downstream.accept(mapper.apply(u)); 195 } 196 }; 197 } 198 }; 199 } 200 201 @Override 202 public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) { 203 Objects.requireNonNull(mapper); 204 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 205 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 206 @Override 207 public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 208 return new Sink.ChainedReference<P_OUT, Integer>(sink) { 209 @Override 210 public void accept(P_OUT u) { 211 downstream.accept(mapper.applyAsInt(u)); 212 } 213 }; 214 } 215 }; 216 } 217 218 @Override 219 public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) { 220 Objects.requireNonNull(mapper); 221 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 222 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 223 @Override 224 public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 225 return new Sink.ChainedReference<P_OUT, Long>(sink) { 226 @Override 227 public void accept(P_OUT u) { 228 downstream.accept(mapper.applyAsLong(u)); 229 } 230 }; 231 } 232 }; 233 } 234 235 @Override 236 public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) { 237 Objects.requireNonNull(mapper); 238 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 239 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 240 @Override 241 public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 242 return new Sink.ChainedReference<P_OUT, Double>(sink) { 243 @Override 244 public void accept(P_OUT u) { 245 downstream.accept(mapper.applyAsDouble(u)); 246 } 247 }; 248 } 249 }; 250 } 251 252 @Override 253 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { 254 Objects.requireNonNull(mapper); 255 // We can do better than this, by polling cancellationRequested when stream is infinite 256 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 257 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 258 @Override 259 public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 260 return new Sink.ChainedReference<P_OUT, R>(sink) { 261 @Override 262 public void begin(long size) { 263 downstream.begin(-1); 264 } 265 266 @Override 267 public void accept(P_OUT u) { 268 try (Stream<? extends R> result = mapper.apply(u)) { 269 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 270 if (result != null) 271 result.sequential().forEach(downstream); 272 } 273 } 274 }; 275 } 276 }; 277 } 278 279 @Override 280 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { 281 Objects.requireNonNull(mapper); 282 // We can do better than this, by polling cancellationRequested when stream is infinite 283 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 284 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 285 @Override 286 public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 287 return new Sink.ChainedReference<P_OUT, Integer>(sink) { 288 IntConsumer downstreamAsInt = downstream::accept; 289 @Override 290 public void begin(long size) { 291 downstream.begin(-1); 292 } 293 294 @Override 295 public void accept(P_OUT u) { 296 try (IntStream result = mapper.apply(u)) { 297 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 298 if (result != null) 299 result.sequential().forEach(downstreamAsInt); 300 } 301 } 302 }; 303 } 304 }; 305 } 306 307 @Override 308 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { 309 Objects.requireNonNull(mapper); 310 // We can do better than this, by polling cancellationRequested when stream is infinite 311 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 312 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 313 @Override 314 public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 315 return new Sink.ChainedReference<P_OUT, Double>(sink) { 316 DoubleConsumer downstreamAsDouble = downstream::accept; 317 @Override 318 public void begin(long size) { 319 downstream.begin(-1); 320 } 321 322 @Override 323 public void accept(P_OUT u) { 324 try (DoubleStream result = mapper.apply(u)) { 325 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 326 if (result != null) 327 result.sequential().forEach(downstreamAsDouble); 328 } 329 } 330 }; 331 } 332 }; 333 } 334 335 @Override 336 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) { 337 Objects.requireNonNull(mapper); 338 // We can do better than this, by polling cancellationRequested when stream is infinite 339 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 340 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 341 @Override 342 public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 343 return new Sink.ChainedReference<P_OUT, Long>(sink) { 344 LongConsumer downstreamAsLong = downstream::accept; 345 @Override 346 public void begin(long size) { 347 downstream.begin(-1); 348 } 349 350 @Override 351 public void accept(P_OUT u) { 352 try (LongStream result = mapper.apply(u)) { 353 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 354 if (result != null) 355 result.sequential().forEach(downstreamAsLong); 356 } 357 } 358 }; 359 } 360 }; 361 } 362 363 @Override 364 public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) { 365 Objects.requireNonNull(action); 366 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 367 0) { 368 @Override 369 public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 370 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { 371 @Override 372 public void accept(P_OUT u) { 373 action.accept(u); 374 downstream.accept(u); 375 } 376 }; 377 } 378 }; 379 } 380 381 // Stateful intermediate operations from Stream 382 383 @Override 384 public final Stream<P_OUT> distinct() { 385 return DistinctOps.makeRef(this); 386 } 387 388 @Override 389 public final Stream<P_OUT> sorted() { 390 return SortedOps.makeRef(this); 391 } 392 393 @Override 394 public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) { 395 return SortedOps.makeRef(this, comparator); 396 } 397 398 @Override 399 public final Stream<P_OUT> limit(long maxSize) { 400 if (maxSize < 0) 401 throw new IllegalArgumentException(Long.toString(maxSize)); 402 return SliceOps.makeRef(this, 0, maxSize); 403 } 404 405 @Override 406 public final Stream<P_OUT> skip(long n) { 407 if (n < 0) 408 throw new IllegalArgumentException(Long.toString(n)); 409 if (n == 0) 410 return this; 411 else 412 return SliceOps.makeRef(this, n, -1); 413 } 414 415 // Terminal operations from Stream 416 417 @Override 418 public void forEach(Consumer<? super P_OUT> action) { 419 evaluate(ForEachOps.makeRef(action, false)); 420 } 421 422 @Override 423 public void forEachOrdered(Consumer<? super P_OUT> action) { 424 evaluate(ForEachOps.makeRef(action, true)); 425 } 426 427 @Override 428 @SuppressWarnings("unchecked") 429 public final <A> A[] toArray(IntFunction<A[]> generator) { 430 // Since A has no relation to U (not possible to declare that A is an upper bound of U) 431 // there will be no static type checking. 432 // Therefore use a raw type and assume A == U rather than propagating the separation of A and U 433 // throughout the code-base. 434 // The runtime type of U is never checked for equality with the component type of the runtime type of A[]. 435 // Runtime checking will be performed when an element is stored in A[], thus if A is not a 436 // super type of U an ArrayStoreException will be thrown. 437 @SuppressWarnings("rawtypes") 438 IntFunction rawGenerator = (IntFunction) generator; 439 // TODO(b/29399275): Eclipse compiler requires explicit (Node<A[]>) cast below. 440 return (A[]) Nodes.flatten((Node<A[]>) evaluateToArrayNode(rawGenerator), rawGenerator) 441 .asArray(rawGenerator); 442 } 443 444 @Override 445 public final Object[] toArray() { 446 return toArray(Object[]::new); 447 } 448 449 @Override 450 public final boolean anyMatch(Predicate<? super P_OUT> predicate) { 451 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)); 452 } 453 454 @Override 455 public final boolean allMatch(Predicate<? super P_OUT> predicate) { 456 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL)); 457 } 458 459 @Override 460 public final boolean noneMatch(Predicate<? super P_OUT> predicate) { 461 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)); 462 } 463 464 @Override 465 public final Optional<P_OUT> findFirst() { 466 return evaluate(FindOps.makeRef(true)); 467 } 468 469 @Override 470 public final Optional<P_OUT> findAny() { 471 return evaluate(FindOps.makeRef(false)); 472 } 473 474 @Override 475 public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) { 476 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator)); 477 } 478 479 @Override 480 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) { 481 return evaluate(ReduceOps.makeRef(accumulator)); 482 } 483 484 @Override 485 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) { 486 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner)); 487 } 488 489 @Override 490 @SuppressWarnings("unchecked") 491 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { 492 A container; 493 if (isParallel() 494 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) 495 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { 496 container = collector.supplier().get(); 497 BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); 498 forEach(u -> accumulator.accept(container, u)); 499 } 500 else { 501 container = evaluate(ReduceOps.makeRef(collector)); 502 } 503 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) 504 ? (R) container 505 : collector.finisher().apply(container); 506 } 507 508 @Override 509 public final <R> R collect(Supplier<R> supplier, 510 BiConsumer<R, ? super P_OUT> accumulator, 511 BiConsumer<R, R> combiner) { 512 return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner)); 513 } 514 515 @Override 516 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { 517 return reduce(BinaryOperator.maxBy(comparator)); 518 } 519 520 @Override 521 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) { 522 return reduce(BinaryOperator.minBy(comparator)); 523 524 } 525 526 @Override 527 public final long count() { 528 return mapToLong(e -> 1L).sum(); 529 } 530 531 532 // 533 534 /** 535 * Source stage of a ReferencePipeline. 536 * 537 * @param type of elements in the upstream source 538 * @param type of elements in produced by this stage 539 * @since 1.8 540 * @hide Visible for CTS testing only (OpenJDK8 tests). 541 */ 542 public static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { 543 /** 544 * Constructor for the source stage of a Stream. 545 * 546 * @param source {@code Supplier<Spliterator>} describing the stream 547 * source 548 * @param sourceFlags the source flags for the stream source, described 549 * in {@link StreamOpFlag} 550 */ 551 public Head(Supplier<? extends Spliterator<?>> source, 552 int sourceFlags, boolean parallel) { 553 super(source, sourceFlags, parallel); 554 } 555 556 /** 557 * Constructor for the source stage of a Stream. 558 * 559 * @param source {@code Spliterator} describing the stream source 560 * @param sourceFlags the source flags for the stream source, described 561 * in {@link StreamOpFlag} 562 */ 563 public Head(Spliterator<?> source, 564 int sourceFlags, boolean parallel) { 565 super(source, sourceFlags, parallel); 566 } 567 568 @Override 569 public final boolean opIsStateful() { 570 throw new UnsupportedOperationException(); 571 } 572 573 @Override 574 public final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) { 575 throw new UnsupportedOperationException(); 576 } 577 578 // Optimized sequential terminal operations for the head of the pipeline 579 580 @Override 581 public void forEach(Consumer<? super E_OUT> action) { 582 if (!isParallel()) { 583 sourceStageSpliterator().forEachRemaining(action); 584 } 585 else { 586 super.forEach(action); 587 } 588 } 589 590 @Override 591 public void forEachOrdered(Consumer<? super E_OUT> action) { 592 if (!isParallel()) { 593 sourceStageSpliterator().forEachRemaining(action); 594 } 595 else { 596 super.forEachOrdered(action); 597 } 598 } 599 } 600 601 /** 602 * Base class for a stateless intermediate stage of a Stream. 603 * 604 * @param type of elements in the upstream source 605 * @param type of elements in produced by this stage 606 * @since 1.8 607 * @hide Visible for CTS testing only (OpenJDK8 tests). 608 */ 609 public abstract static class StatelessOp<E_IN, E_OUT> 610 extends ReferencePipeline<E_IN, E_OUT> { 611 /** 612 * Construct a new Stream by appending a stateless intermediate 613 * operation to an existing stream. 614 * 615 * @param upstream The upstream pipeline stage 616 * @param inputShape The stream shape for the upstream pipeline stage 617 * @param opFlags Operation flags for the new stage 618 */ 619 public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 620 StreamShape inputShape, 621 int opFlags) { 622 super(upstream, opFlags); 623 assert upstream.getOutputShape() == inputShape; 624 } 625 626 @Override 627 public final boolean opIsStateful() { 628 return false; 629 } 630 } 631 632 /** 633 * Base class for a stateful intermediate stage of a Stream. 634 * 635 * @param type of elements in the upstream source 636 * @param type of elements in produced by this stage 637 * @since 1.8 638 * @hide Visible for CTS testing only (OpenJDK8 tests). 639 */ 640 public abstract static class StatefulOp<E_IN, E_OUT> 641 extends ReferencePipeline<E_IN, E_OUT> { 642 /** 643 * Construct a new Stream by appending a stateful intermediate operation 644 * to an existing stream. 645 * @param upstream The upstream pipeline stage 646 * @param inputShape The stream shape for the upstream pipeline stage 647 * @param opFlags Operation flags for the new stage 648 */ 649 public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 650 StreamShape inputShape, 651 int opFlags) { 652 super(upstream, opFlags); 653 assert upstream.getOutputShape() == inputShape; 654 } 655 656 @Override 657 public final boolean opIsStateful() { 658 return true; 659 } 660 661 @Override 662 public abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, 663 Spliterator<P_IN> spliterator, 664 IntFunction<E_OUT[]> generator); 665 } 666} 667