IntPipeline.java revision c3d875180189cdb59ff48a3dc7cd4b8bf1efcea4
1/* 2 * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package java.util.stream; 26 27import java.util.IntSummaryStatistics; 28import java.util.Objects; 29import java.util.OptionalDouble; 30import java.util.OptionalInt; 31import java.util.PrimitiveIterator; 32import java.util.Spliterator; 33import java.util.Spliterators; 34import java.util.function.BiConsumer; 35import java.util.function.BinaryOperator; 36import java.util.function.IntBinaryOperator; 37import java.util.function.IntConsumer; 38import java.util.function.IntFunction; 39import java.util.function.IntPredicate; 40import java.util.function.IntToDoubleFunction; 41import java.util.function.IntToLongFunction; 42import java.util.function.IntUnaryOperator; 43import java.util.function.ObjIntConsumer; 44import java.util.function.Supplier; 45 46/** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @paramtype of elements in the upstream source 51 * @since 1.8 52 */ 53abstract class IntPipeline<E_IN> 54 extends AbstractPipeline<E_IN, Integer, IntStream> 55 implements IntStream { 56 57 /** 58 * Constructor for the head of a stream pipeline. 59 * 60 * @param source {@code Supplier<Spliterator>} describing the stream source 61 * @param sourceFlags The source flags for the stream source, described in 62 * {@link StreamOpFlag} 63 * @param parallel {@code true} if the pipeline is parallel 64 */ 65 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 66 int sourceFlags, boolean parallel) { 67 super(source, sourceFlags, parallel); 68 } 69 70 /** 71 * Constructor for the head of a stream pipeline. 72 * 73 * @param source {@code Spliterator} describing the stream source 74 * @param sourceFlags The source flags for the stream source, described in 75 * {@link StreamOpFlag} 76 * @param parallel {@code true} if the pipeline is parallel 77 */ 78 IntPipeline(Spliterator<Integer> source, 79 int sourceFlags, boolean parallel) { 80 super(source, sourceFlags, parallel); 81 } 82 83 /** 84 * Constructor for appending an intermediate operation onto an existing 85 * pipeline. 86 * 87 * @param upstream the upstream element source 88 * @param opFlags the operation flags for the new operation 89 */ 90 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 91 super(upstream, opFlags); 92 } 93 94 /** 95 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 96 * by casting. 97 */ 98 private static IntConsumer adapt(Sink<Integer> sink) { 99 if (sink instanceof IntConsumer) { 100 return (IntConsumer) sink; 101 } 102 else { 103 if (Tripwire.ENABLED) 104 Tripwire.trip(AbstractPipeline.class, 105 "using IntStream.adapt(Sink<Integer> s)"); 106 return sink::accept; 107 } 108 } 109 110 /** 111 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 112 * 113 * @implNote 114 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 115 * exception if this cast is not possible. 116 */ 117 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 118 if (s instanceof Spliterator.OfInt) { 119 return (Spliterator.OfInt) s; 120 } 121 else { 122 if (Tripwire.ENABLED) 123 Tripwire.trip(AbstractPipeline.class, 124 "using IntStream.adapt(Spliterator<Integer> s)"); 125 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 126 } 127 } 128 129 130 // Shape-specific methods 131 132 @Override 133 final StreamShape getOutputShape() { 134 return StreamShape.INT_VALUE; 135 } 136 137 @Override 138 final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 139 Spliterator<P_IN> spliterator, 140 boolean flattenTree, 141 IntFunction<Integer[]> generator) { 142 return Nodes.collectInt(helper, spliterator, flattenTree); 143 } 144 145 @Override 146 final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 147 Supplier<Spliterator<P_IN>> supplier, 148 boolean isParallel) { 149 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 150 } 151 152 @Override 153 @SuppressWarnings("unchecked") 154 final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 155 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 156 } 157 158 @Override 159 final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 160 Spliterator.OfInt spl = adapt(spliterator); 161 IntConsumer adaptedSink = adapt(sink); 162 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 163 } 164 165 @Override 166 final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 167 IntFunction<Integer[]> generator) { 168 return Nodes.intBuilder(exactSizeIfKnown); 169 } 170 171 172 // IntStream 173 174 @Override 175 public final PrimitiveIterator.OfInt iterator() { 176 return Spliterators.iterator(spliterator()); 177 } 178 179 @Override 180 public final Spliterator.OfInt spliterator() { 181 return adapt(super.spliterator()); 182 } 183 184 // Stateless intermediate ops from IntStream 185 186 @Override 187 public final LongStream asLongStream() { 188 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 189 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 190 @Override 191 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 192 return new Sink.ChainedInt<Long>(sink) { 193 @Override 194 public void accept(int t) { 195 downstream.accept((long) t); 196 } 197 }; 198 } 199 }; 200 } 201 202 @Override 203 public final DoubleStream asDoubleStream() { 204 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 205 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 206 @Override 207 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 208 return new Sink.ChainedInt<Double>(sink) { 209 @Override 210 public void accept(int t) { 211 downstream.accept((double) t); 212 } 213 }; 214 } 215 }; 216 } 217 218 @Override 219 public final Stream<Integer> boxed() { 220 return mapToObj(Integer::valueOf); 221 } 222 223 @Override 224 public final IntStream map(IntUnaryOperator mapper) { 225 Objects.requireNonNull(mapper); 226 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 227 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 228 @Override 229 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 230 return new Sink.ChainedInt<Integer>(sink) { 231 @Override 232 public void accept(int t) { 233 downstream.accept(mapper.applyAsInt(t)); 234 } 235 }; 236 } 237 }; 238 } 239 240 @Override 241 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 242 Objects.requireNonNull(mapper); 243 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 244 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 245 @Override 246 Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 247 return new Sink.ChainedInt<U>(sink) { 248 @Override 249 public void accept(int t) { 250 downstream.accept(mapper.apply(t)); 251 } 252 }; 253 } 254 }; 255 } 256 257 @Override 258 public final LongStream mapToLong(IntToLongFunction mapper) { 259 Objects.requireNonNull(mapper); 260 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 261 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 262 @Override 263 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 264 return new Sink.ChainedInt<Long>(sink) { 265 @Override 266 public void accept(int t) { 267 downstream.accept(mapper.applyAsLong(t)); 268 } 269 }; 270 } 271 }; 272 } 273 274 @Override 275 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 276 Objects.requireNonNull(mapper); 277 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 278 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 279 @Override 280 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 281 return new Sink.ChainedInt<Double>(sink) { 282 @Override 283 public void accept(int t) { 284 downstream.accept(mapper.applyAsDouble(t)); 285 } 286 }; 287 } 288 }; 289 } 290 291 @Override 292 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 293 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 294 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 295 @Override 296 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 297 return new Sink.ChainedInt<Integer>(sink) { 298 @Override 299 public void begin(long size) { 300 downstream.begin(-1); 301 } 302 303 @Override 304 public void accept(int t) { 305 try (IntStream result = mapper.apply(t)) { 306 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 307 if (result != null) 308 result.sequential().forEach(i -> downstream.accept(i)); 309 } 310 } 311 }; 312 } 313 }; 314 } 315 316 @Override 317 public IntStream unordered() { 318 if (!isOrdered()) 319 return this; 320 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 321 @Override 322 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 323 return sink; 324 } 325 }; 326 } 327 328 @Override 329 public final IntStream filter(IntPredicate predicate) { 330 Objects.requireNonNull(predicate); 331 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 332 StreamOpFlag.NOT_SIZED) { 333 @Override 334 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 335 return new Sink.ChainedInt<Integer>(sink) { 336 @Override 337 public void begin(long size) { 338 downstream.begin(-1); 339 } 340 341 @Override 342 public void accept(int t) { 343 if (predicate.test(t)) 344 downstream.accept(t); 345 } 346 }; 347 } 348 }; 349 } 350 351 @Override 352 public final IntStream peek(IntConsumer action) { 353 Objects.requireNonNull(action); 354 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 355 0) { 356 @Override 357 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 358 return new Sink.ChainedInt<Integer>(sink) { 359 @Override 360 public void accept(int t) { 361 action.accept(t); 362 downstream.accept(t); 363 } 364 }; 365 } 366 }; 367 } 368 369 // Stateful intermediate ops from IntStream 370 371 @Override 372 public final IntStream limit(long maxSize) { 373 if (maxSize < 0) 374 throw new IllegalArgumentException(Long.toString(maxSize)); 375 return SliceOps.makeInt(this, 0, maxSize); 376 } 377 378 @Override 379 public final IntStream skip(long n) { 380 if (n < 0) 381 throw new IllegalArgumentException(Long.toString(n)); 382 if (n == 0) 383 return this; 384 else 385 return SliceOps.makeInt(this, n, -1); 386 } 387 388 @Override 389 public final IntStream sorted() { 390 return SortedOps.makeInt(this); 391 } 392 393 @Override 394 public final IntStream distinct() { 395 // While functional and quick to implement, this approach is not very efficient. 396 // An efficient version requires an int-specific map/set implementation. 397 return boxed().distinct().mapToInt(i -> i); 398 } 399 400 // Terminal ops from IntStream 401 402 @Override 403 public void forEach(IntConsumer action) { 404 evaluate(ForEachOps.makeInt(action, false)); 405 } 406 407 @Override 408 public void forEachOrdered(IntConsumer action) { 409 evaluate(ForEachOps.makeInt(action, true)); 410 } 411 412 @Override 413 public final int sum() { 414 return reduce(0, Integer::sum); 415 } 416 417 @Override 418 public final OptionalInt min() { 419 return reduce(Math::min); 420 } 421 422 @Override 423 public final OptionalInt max() { 424 return reduce(Math::max); 425 } 426 427 @Override 428 public final long count() { 429 return mapToLong(e -> 1L).sum(); 430 } 431 432 @Override 433 public final OptionalDouble average() { 434 long[] avg = collect(() -> new long[2], 435 (ll, i) -> { 436 ll[0]++; 437 ll[1] += i; 438 }, 439 (ll, rr) -> { 440 ll[0] += rr[0]; 441 ll[1] += rr[1]; 442 }); 443 return avg[0] > 0 444 ? OptionalDouble.of((double) avg[1] / avg[0]) 445 : OptionalDouble.empty(); 446 } 447 448 @Override 449 public final IntSummaryStatistics summaryStatistics() { 450 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 451 IntSummaryStatistics::combine); 452 } 453 454 @Override 455 public final int reduce(int identity, IntBinaryOperator op) { 456 return evaluate(ReduceOps.makeInt(identity, op)); 457 } 458 459 @Override 460 public final OptionalInt reduce(IntBinaryOperator op) { 461 return evaluate(ReduceOps.makeInt(op)); 462 } 463 464 @Override 465 public final <R> R collect(Supplier<R> supplier, 466 ObjIntConsumer<R> accumulator, 467 BiConsumer<R, R> combiner) { 468 BinaryOperator<R> operator = (left, right) -> { 469 combiner.accept(left, right); 470 return left; 471 }; 472 return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); 473 } 474 475 @Override 476 public final boolean anyMatch(IntPredicate predicate) { 477 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 478 } 479 480 @Override 481 public final boolean allMatch(IntPredicate predicate) { 482 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 483 } 484 485 @Override 486 public final boolean noneMatch(IntPredicate predicate) { 487 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 488 } 489 490 @Override 491 public final OptionalInt findFirst() { 492 return evaluate(FindOps.makeInt(true)); 493 } 494 495 @Override 496 public final OptionalInt findAny() { 497 return evaluate(FindOps.makeInt(false)); 498 } 499 500 @Override 501 public final int[] toArray() { 502 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 503 .asPrimitiveArray(); 504 } 505 506 // 507 508 /** 509 * Source stage of an IntStream. 510 * 511 * @param type of elements in the upstream source 512 * @since 1.8 513 */ 514 static class Head<E_IN> extends IntPipeline<E_IN> { 515 /** 516 * Constructor for the source stage of an IntStream. 517 * 518 * @param source {@code Supplier<Spliterator>} describing the stream 519 * source 520 * @param sourceFlags the source flags for the stream source, described 521 * in {@link StreamOpFlag} 522 * @param parallel {@code true} if the pipeline is parallel 523 */ 524 Head(Supplier<? extends Spliterator<Integer>> source, 525 int sourceFlags, boolean parallel) { 526 super(source, sourceFlags, parallel); 527 } 528 529 /** 530 * Constructor for the source stage of an IntStream. 531 * 532 * @param source {@code Spliterator} describing the stream source 533 * @param sourceFlags the source flags for the stream source, described 534 * in {@link StreamOpFlag} 535 * @param parallel {@code true} if the pipeline is parallel 536 */ 537 Head(Spliterator<Integer> source, 538 int sourceFlags, boolean parallel) { 539 super(source, sourceFlags, parallel); 540 } 541 542 @Override 543 final boolean opIsStateful() { 544 throw new UnsupportedOperationException(); 545 } 546 547 @Override 548 final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 549 throw new UnsupportedOperationException(); 550 } 551 552 // Optimized sequential terminal operations for the head of the pipeline 553 554 @Override 555 public void forEach(IntConsumer action) { 556 if (!isParallel()) { 557 adapt(sourceStageSpliterator()).forEachRemaining(action); 558 } 559 else { 560 super.forEach(action); 561 } 562 } 563 564 @Override 565 public void forEachOrdered(IntConsumer action) { 566 if (!isParallel()) { 567 adapt(sourceStageSpliterator()).forEachRemaining(action); 568 } 569 else { 570 super.forEachOrdered(action); 571 } 572 } 573 } 574 575 /** 576 * Base class for a stateless intermediate stage of an IntStream 577 * 578 * @param type of elements in the upstream source 579 * @since 1.8 580 */ 581 abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 582 /** 583 * Construct a new IntStream by appending a stateless intermediate 584 * operation to an existing stream. 585 * @param upstream The upstream pipeline stage 586 * @param inputShape The stream shape for the upstream pipeline stage 587 * @param opFlags Operation flags for the new stage 588 */ 589 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 590 StreamShape inputShape, 591 int opFlags) { 592 super(upstream, opFlags); 593 assert upstream.getOutputShape() == inputShape; 594 } 595 596 @Override 597 final boolean opIsStateful() { 598 return false; 599 } 600 } 601 602 /** 603 * Base class for a stateful intermediate stage of an IntStream. 604 * 605 * @param type of elements in the upstream source 606 * @since 1.8 607 */ 608 abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 609 /** 610 * Construct a new IntStream by appending a stateful intermediate 611 * operation to an existing stream. 612 * @param upstream The upstream pipeline stage 613 * @param inputShape The stream shape for the upstream pipeline stage 614 * @param opFlags Operation flags for the new stage 615 */ 616 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 617 StreamShape inputShape, 618 int opFlags) { 619 super(upstream, opFlags); 620 assert upstream.getOutputShape() == inputShape; 621 } 622 623 @Override 624 final boolean opIsStateful() { 625 return true; 626 } 627 628 @Override 629 abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 630 Spliterator<P_IN> spliterator, 631 IntFunction<Integer[]> generator); 632 } 633} 634