FindOps.java revision ff18b5f136f92154f2e05217e3953d10f459e561
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Optional;
28import java.util.OptionalDouble;
29import java.util.OptionalInt;
30import java.util.OptionalLong;
31import java.util.Spliterator;
32import java.util.concurrent.CountedCompleter;
33import java.util.function.Predicate;
34import java.util.function.Supplier;
35
36/**
37 * Factory for instances of a short-circuiting {@code TerminalOp} that searches
38 * for an element in a stream pipeline, and terminates when it finds one.
39 * Supported variants include find-first (find the first element in the
40 * encounter order) and find-any (find any element, may not be the first in
41 * encounter order.)
42 *
43 * @since 1.8
44 */
45final class FindOps {
46
47    private FindOps() { }
48
49    /**
50     * Constructs a {@code TerminalOp} for streams of objects.
51     *
52     * @param <T> the type of elements of the stream
53     * @param mustFindFirst whether the {@code TerminalOp} must produce the
54     *        first element in the encounter order
55     * @return a {@code TerminalOp} implementing the find operation
56     */
57    public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
58        return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
59                            Optional::isPresent, FindSink.OfRef::new);
60    }
61
62    /**
63     * Constructs a {@code TerminalOp} for streams of ints.
64     *
65     * @param mustFindFirst whether the {@code TerminalOp} must produce the
66     *        first element in the encounter order
67     * @return a {@code TerminalOp} implementing the find operation
68     */
69    public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
70        return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
71                            OptionalInt::isPresent, FindSink.OfInt::new);
72    }
73
74    /**
75     * Constructs a {@code TerminalOp} for streams of longs.
76     *
77     * @param mustFindFirst whether the {@code TerminalOp} must produce the
78     *        first element in the encounter order
79     * @return a {@code TerminalOp} implementing the find operation
80     */
81    public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
82        return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
83                            OptionalLong::isPresent, FindSink.OfLong::new);
84    }
85
86    /**
87     * Constructs a {@code FindOp} for streams of doubles.
88     *
89     * @param mustFindFirst whether the {@code TerminalOp} must produce the
90     *        first element in the encounter order
91     * @return a {@code TerminalOp} implementing the find operation
92     */
93    public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
94        return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
95                            OptionalDouble::isPresent, FindSink.OfDouble::new);
96    }
97
98    /**
99     * A short-circuiting {@code TerminalOp} that searches for an element in a
100     * stream pipeline, and terminates when it finds one.  Implements both
101     * find-first (find the first element in the encounter order) and find-any
102     * (find any element, may not be the first in encounter order.)
103     *
104     * @param <T> the output type of the stream pipeline
105     * @param <O> the result type of the find operation, typically an optional
106     *        type
107     */
108    private static final class FindOp<T, O> implements TerminalOp<T, O> {
109        private final StreamShape shape;
110        final boolean mustFindFirst;
111        final O emptyValue;
112        final Predicate<O> presentPredicate;
113        final Supplier<TerminalSink<T, O>> sinkSupplier;
114
115        /**
116         * Constructs a {@code FindOp}.
117         *
118         * @param mustFindFirst if true, must find the first element in
119         *        encounter order, otherwise can find any element
120         * @param shape stream shape of elements to search
121         * @param emptyValue result value corresponding to "found nothing"
122         * @param presentPredicate {@code Predicate} on result value
123         *        corresponding to "found something"
124         * @param sinkSupplier supplier for a {@code TerminalSink} implementing
125         *        the matching functionality
126         */
127        FindOp(boolean mustFindFirst,
128                       StreamShape shape,
129                       O emptyValue,
130                       Predicate<O> presentPredicate,
131                       Supplier<TerminalSink<T, O>> sinkSupplier) {
132            this.mustFindFirst = mustFindFirst;
133            this.shape = shape;
134            this.emptyValue = emptyValue;
135            this.presentPredicate = presentPredicate;
136            this.sinkSupplier = sinkSupplier;
137        }
138
139        @Override
140        public int getOpFlags() {
141            return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
142        }
143
144        @Override
145        public StreamShape inputShape() {
146            return shape;
147        }
148
149        @Override
150        public <S> O evaluateSequential(PipelineHelper<T> helper,
151                                        Spliterator<S> spliterator) {
152            O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
153            return result != null ? result : emptyValue;
154        }
155
156        @Override
157        public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
158                                         Spliterator<P_IN> spliterator) {
159            return new FindTask<>(this, helper, spliterator).invoke();
160        }
161    }
162
163    /**
164     * Implementation of @{code TerminalSink} that implements the find
165     * functionality, requesting cancellation when something has been found
166     *
167     * @param <T> The type of input element
168     * @param <O> The result type, typically an optional type
169     */
170    private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
171        boolean hasValue;
172        T value;
173
174        FindSink() {} // Avoid creation of special accessor
175
176        @Override
177        public void accept(T value) {
178            if (!hasValue) {
179                hasValue = true;
180                this.value = value;
181            }
182        }
183
184        @Override
185        public boolean cancellationRequested() {
186            return hasValue;
187        }
188
189        /** Specialization of {@code FindSink} for reference streams */
190        static final class OfRef<T> extends FindSink<T, Optional<T>> {
191            @Override
192            public Optional<T> get() {
193                return hasValue ? Optional.of(value) : null;
194            }
195        }
196
197        /** Specialization of {@code FindSink} for int streams */
198        static final class OfInt extends FindSink<Integer, OptionalInt>
199                implements Sink.OfInt {
200            @Override
201            public void accept(int value) {
202                // Boxing is OK here, since few values will actually flow into the sink
203                accept((Integer) value);
204            }
205
206            @Override
207            public OptionalInt get() {
208                return hasValue ? OptionalInt.of(value) : null;
209            }
210        }
211
212        /** Specialization of {@code FindSink} for long streams */
213        static final class OfLong extends FindSink<Long, OptionalLong>
214                implements Sink.OfLong {
215            @Override
216            public void accept(long value) {
217                // Boxing is OK here, since few values will actually flow into the sink
218                accept((Long) value);
219            }
220
221            @Override
222            public OptionalLong get() {
223                return hasValue ? OptionalLong.of(value) : null;
224            }
225        }
226
227        /** Specialization of {@code FindSink} for double streams */
228        static final class OfDouble extends FindSink<Double, OptionalDouble>
229                implements Sink.OfDouble {
230            @Override
231            public void accept(double value) {
232                // Boxing is OK here, since few values will actually flow into the sink
233                accept((Double) value);
234            }
235
236            @Override
237            public OptionalDouble get() {
238                return hasValue ? OptionalDouble.of(value) : null;
239            }
240        }
241    }
242
243    /**
244     * {@code ForkJoinTask} implementing parallel short-circuiting search
245     * @param  Input element type to the stream pipeline
246     * @param  Output element type from the stream pipeline
247     * @param <O> Result type from the find operation
248     */
249    @SuppressWarnings("serial")
250    private static final class FindTask<P_IN, P_OUT, O>
251            extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
252        private final FindOp<P_OUT, O> op;
253
254        FindTask(FindOp<P_OUT, O> op,
255                 PipelineHelper<P_OUT> helper,
256                 Spliterator<P_IN> spliterator) {
257            super(helper, spliterator);
258            this.op = op;
259        }
260
261        FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
262            super(parent, spliterator);
263            this.op = parent.op;
264        }
265
266        @Override
267        protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
268            return new FindTask<>(this, spliterator);
269        }
270
271        @Override
272        protected O getEmptyResult() {
273            return op.emptyValue;
274        }
275
276        private void foundResult(O answer) {
277            if (isLeftmostNode())
278                shortCircuit(answer);
279            else
280                cancelLaterNodes();
281        }
282
283        @Override
284        protected O doLeaf() {
285            O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
286            if (!op.mustFindFirst) {
287                if (result != null)
288                    shortCircuit(result);
289                return null;
290            }
291            else {
292                if (result != null) {
293                    foundResult(result);
294                    return result;
295                }
296                else
297                    return null;
298            }
299        }
300
301        @Override
302        public void onCompletion(CountedCompleter<?> caller) {
303            if (op.mustFindFirst) {
304                    for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
305                         p = child, child = rightChild) {
306                    O result = child.getLocalResult();
307                    if (result != null && op.presentPredicate.test(result)) {
308                        setLocalResult(result);
309                        foundResult(result);
310                        break;
311                    }
312                }
313            }
314            super.onCompletion(caller);
315        }
316    }
317}
318
319