Sink.java revision 289e51c2258b001f2aa6d6e67b21be7bb65d5102
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.function.Consumer;
29import java.util.function.DoubleConsumer;
30import java.util.function.IntConsumer;
31import java.util.function.LongConsumer;
32
33/**
34 * An extension of {@link Consumer} used to conduct values through the stages of
35 * a stream pipeline, with additional methods to manage size information,
36 * control flow, etc.  Before calling the {@code accept()} method on a
37 * {@code Sink} for the first time, you must first call the {@code begin()}
38 * method to inform it that data is coming (optionally informing the sink how
39 * much data is coming), and after all data has been sent, you must call the
40 * {@code end()} method.  After calling {@code end()}, you should not call
41 * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42 * offers a mechanism by which the sink can cooperatively signal that it does
43 * not wish to receive any more data (the {@code cancellationRequested()}
44 * method), which a source can poll before sending more data to the
45 * {@code Sink}.
46 *
47 * <p>A sink may be in one of two states: an initial state and an active state.
48 * It starts out in the initial state; the {@code begin()} method transitions
49 * it to the active state, and the {@code end()} method transitions it back into
50 * the initial state, where it can be re-used.  Data-accepting methods (such as
51 * {@code accept()} are only valid in the active state.
52 *
53 * @apiNote
54 * A stream pipeline consists of a source, zero or more intermediate stages
55 * (such as filtering or mapping), and a terminal stage, such as reduction or
56 * for-each.  For concreteness, consider the pipeline:
57 *
58 * <pre>{@code
59 *     int longestStringLengthStartingWithA
60 *         = strings.stream()
61 *                  .filter(s -> s.startsWith("A"))
62 *                  .mapToInt(String::length)
63 *                  .max();
64 * }</pre>
65 *
66 * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67 * filtering stage consumes strings and emits a subset of those strings; the
68 * mapping stage consumes strings and emits ints; the reduction stage consumes
69 * those ints and computes the maximal value.
70 *
71 * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72 * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73 * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74 * not need a specialized interface for each primitive specialization.  (It
75 * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76 * point to the pipeline is the {@code Sink} for the filtering stage, which
77 * sends some elements "downstream" -- into the {@code Sink} for the mapping
78 * stage, which in turn sends integral values downstream into the {@code Sink}
79 * for the reduction stage. The {@code Sink} implementations associated with a
80 * given stage is expected to know the data type for the next stage, and call
81 * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82 * each stage must implement the correct {@code accept} method corresponding to
83 * the data type it accepts.
84 *
85 * <p>The specialized subtypes such as {@link Sink.OfInt} override
86 * {@code accept(Object)} to call the appropriate primitive specialization of
87 * {@code accept}, implement the appropriate primitive specialization of
88 * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89 * {@code accept}.
90 *
91 * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92 * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93 * represents the downstream {@code Sink}, and implement the methods
94 * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95 * delegate to the downstream {@code Sink}.  Most implementations of
96 * intermediate operations will use these chaining wrappers.  For example, the
97 * mapping stage in the above example would look like:
98 *
99 * <pre>{@code
100 *     IntSink is = new Sink.ChainedReference<U>(sink) {
101 *         public void accept(U u) {
102 *             downstream.accept(mapper.applyAsInt(u));
103 *         }
104 *     };
105 * }</pre>
106 *
107 * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108 * to receive elements of type {@code U} as input, and pass the downstream sink
109 * to the constructor.  Because the next stage expects to receive integers, we
110 * must call the {@code accept(int)} method when emitting values to the downstream.
111 * The {@code accept()} method applies the mapping function from {@code U} to
112 * {@code int} and passes the resulting value to the downstream {@code Sink}.
113 *
114 * @param <T> type of elements for value streams
115 * @since 1.8
116 * @hide Visible for CTS testing only (OpenJDK8 tests).
117 */
118public interface Sink<T> extends Consumer<T> {
119    /**
120     * Resets the sink state to receive a fresh data set.  This must be called
121     * before sending any data to the sink.  After calling {@link #end()},
122     * you may call this method to reset the sink for another calculation.
123     * @param size The exact size of the data to be pushed downstream, if
124     * known or {@code -1} if unknown or infinite.
125     *
126     * <p>Prior to this call, the sink must be in the initial state, and after
127     * this call it is in the active state.
128     */
129    default void begin(long size) {}
130
131    /**
132     * Indicates that all elements have been pushed.  If the {@code Sink} is
133     * stateful, it should send any stored state downstream at this time, and
134     * should clear any accumulated state (and associated resources).
135     *
136     * <p>Prior to this call, the sink must be in the active state, and after
137     * this call it is returned to the initial state.
138     */
139    default void end() {}
140
141    /**
142     * Indicates that this {@code Sink} does not wish to receive any more data.
143     *
144     * @implSpec The default implementation always returns false.
145     *
146     * @return true if cancellation is requested
147     */
148    default boolean cancellationRequested() {
149        return false;
150    }
151
152    /**
153     * Accepts an int value.
154     *
155     * @implSpec The default implementation throws IllegalStateException.
156     *
157     * @throws IllegalStateException if this sink does not accept int values
158     */
159    default void accept(int value) {
160        throw new IllegalStateException("called wrong accept method");
161    }
162
163    /**
164     * Accepts a long value.
165     *
166     * @implSpec The default implementation throws IllegalStateException.
167     *
168     * @throws IllegalStateException if this sink does not accept long values
169     */
170    default void accept(long value) {
171        throw new IllegalStateException("called wrong accept method");
172    }
173
174    /**
175     * Accepts a double value.
176     *
177     * @implSpec The default implementation throws IllegalStateException.
178     *
179     * @throws IllegalStateException if this sink does not accept double values
180     */
181    default void accept(double value) {
182        throw new IllegalStateException("called wrong accept method");
183    }
184
185    /**
186     * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
187     * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
188     * {@code accept(int)}.
189     */
190    interface OfInt extends Sink<Integer>, IntConsumer {
191        @Override
192        void accept(int value);
193
194        @Override
195        default void accept(Integer i) {
196            if (Tripwire.ENABLED)
197                Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
198            accept(i.intValue());
199        }
200    }
201
202    /**
203     * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
204     * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
205     * {@code accept(long)}.
206     */
207    interface OfLong extends Sink<Long>, LongConsumer {
208        @Override
209        void accept(long value);
210
211        @Override
212        default void accept(Long i) {
213            if (Tripwire.ENABLED)
214                Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
215            accept(i.longValue());
216        }
217    }
218
219    /**
220     * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
221     * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
222     * {@code accept(double)}.
223     */
224    interface OfDouble extends Sink<Double>, DoubleConsumer {
225        @Override
226        void accept(double value);
227
228        @Override
229        default void accept(Double i) {
230            if (Tripwire.ENABLED)
231                Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
232            accept(i.doubleValue());
233        }
234    }
235
236    /**
237     * Abstract {@code Sink} implementation for creating chains of
238     * sinks.  The {@code begin}, {@code end}, and
239     * {@code cancellationRequested} methods are wired to chain to the
240     * downstream {@code Sink}.  This implementation takes a downstream
241     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
242     * implementation of the {@code accept()} method must call the correct
243     * {@code accept()} method on the downstream {@code Sink}.
244     */
245    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
246        protected final Sink<? super E_OUT> downstream;
247
248        public ChainedReference(Sink<? super E_OUT> downstream) {
249            this.downstream = Objects.requireNonNull(downstream);
250        }
251
252        @Override
253        public void begin(long size) {
254            downstream.begin(size);
255        }
256
257        @Override
258        public void end() {
259            downstream.end();
260        }
261
262        @Override
263        public boolean cancellationRequested() {
264            return downstream.cancellationRequested();
265        }
266    }
267
268    /**
269     * Abstract {@code Sink} implementation designed for creating chains of
270     * sinks.  The {@code begin}, {@code end}, and
271     * {@code cancellationRequested} methods are wired to chain to the
272     * downstream {@code Sink}.  This implementation takes a downstream
273     * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
274     * The implementation of the {@code accept()} method must call the correct
275     * {@code accept()} method on the downstream {@code Sink}.
276     */
277    static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
278        protected final Sink<? super E_OUT> downstream;
279
280        public ChainedInt(Sink<? super E_OUT> downstream) {
281            this.downstream = Objects.requireNonNull(downstream);
282        }
283
284        @Override
285        public void begin(long size) {
286            downstream.begin(size);
287        }
288
289        @Override
290        public void end() {
291            downstream.end();
292        }
293
294        @Override
295        public boolean cancellationRequested() {
296            return downstream.cancellationRequested();
297        }
298    }
299
300    /**
301     * Abstract {@code Sink} implementation designed for creating chains of
302     * sinks.  The {@code begin}, {@code end}, and
303     * {@code cancellationRequested} methods are wired to chain to the
304     * downstream {@code Sink}.  This implementation takes a downstream
305     * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
306     * The implementation of the {@code accept()} method must call the correct
307     * {@code accept()} method on the downstream {@code Sink}.
308     */
309    static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
310        protected final Sink<? super E_OUT> downstream;
311
312        public ChainedLong(Sink<? super E_OUT> downstream) {
313            this.downstream = Objects.requireNonNull(downstream);
314        }
315
316        @Override
317        public void begin(long size) {
318            downstream.begin(size);
319        }
320
321        @Override
322        public void end() {
323            downstream.end();
324        }
325
326        @Override
327        public boolean cancellationRequested() {
328            return downstream.cancellationRequested();
329        }
330    }
331
332    /**
333     * Abstract {@code Sink} implementation designed for creating chains of
334     * sinks.  The {@code begin}, {@code end}, and
335     * {@code cancellationRequested} methods are wired to chain to the
336     * downstream {@code Sink}.  This implementation takes a downstream
337     * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
338     * The implementation of the {@code accept()} method must call the correct
339     * {@code accept()} method on the downstream {@code Sink}.
340     */
341    static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
342        protected final Sink<? super E_OUT> downstream;
343
344        public ChainedDouble(Sink<? super E_OUT> downstream) {
345            this.downstream = Objects.requireNonNull(downstream);
346        }
347
348        @Override
349        public void begin(long size) {
350            downstream.begin(size);
351        }
352
353        @Override
354        public void end() {
355            downstream.end();
356        }
357
358        @Override
359        public boolean cancellationRequested() {
360            return downstream.cancellationRequested();
361        }
362    }
363}
364