Sink.java revision d0a2645e29a9b84d7e5ec822eb9904e93bd6c013
1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.function.Consumer;
29import java.util.function.DoubleConsumer;
30import java.util.function.IntConsumer;
31import java.util.function.LongConsumer;
32
33/**
34 * An extension of {@link Consumer} used to conduct values through the stages of
35 * a stream pipeline, with additional methods to manage size information,
36 * control flow, etc.  Before calling the {@code accept()} method on a
37 * {@code Sink} for the first time, you must first call the {@code begin()}
38 * method to inform it that data is coming (optionally informing the sink how
39 * much data is coming), and after all data has been sent, you must call the
40 * {@code end()} method.  After calling {@code end()}, you should not call
41 * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42 * offers a mechanism by which the sink can cooperatively signal that it does
43 * not wish to receive any more data (the {@code cancellationRequested()}
44 * method), which a source can poll before sending more data to the
45 * {@code Sink}.
46 *
47 * <p>A sink may be in one of two states: an initial state and an active state.
48 * It starts out in the initial state; the {@code begin()} method transitions
49 * it to the active state, and the {@code end()} method transitions it back into
50 * the initial state, where it can be re-used.  Data-accepting methods (such as
51 * {@code accept()} are only valid in the active state.
52 *
53 * @apiNote
54 * A stream pipeline consists of a source, zero or more intermediate stages
55 * (such as filtering or mapping), and a terminal stage, such as reduction or
56 * for-each.  For concreteness, consider the pipeline:
57 *
58 * <pre>{@code
59 *     int longestStringLengthStartingWithA
60 *         = strings.stream()
61 *                  .filter(s -> s.startsWith("A"))
62 *                  .mapToInt(String::length)
63 *                  .max();
64 * }</pre>
65 *
66 * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67 * filtering stage consumes strings and emits a subset of those strings; the
68 * mapping stage consumes strings and emits ints; the reduction stage consumes
69 * those ints and computes the maximal value.
70 *
71 * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72 * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73 * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74 * not need a specialized interface for each primitive specialization.  (It
75 * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76 * point to the pipeline is the {@code Sink} for the filtering stage, which
77 * sends some elements "downstream" -- into the {@code Sink} for the mapping
78 * stage, which in turn sends integral values downstream into the {@code Sink}
79 * for the reduction stage. The {@code Sink} implementations associated with a
80 * given stage is expected to know the data type for the next stage, and call
81 * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82 * each stage must implement the correct {@code accept} method corresponding to
83 * the data type it accepts.
84 *
85 * <p>The specialized subtypes such as {@link Sink.OfInt} override
86 * {@code accept(Object)} to call the appropriate primitive specialization of
87 * {@code accept}, implement the appropriate primitive specialization of
88 * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89 * {@code accept}.
90 *
91 * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92 * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93 * represents the downstream {@code Sink}, and implement the methods
94 * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95 * delegate to the downstream {@code Sink}.  Most implementations of
96 * intermediate operations will use these chaining wrappers.  For example, the
97 * mapping stage in the above example would look like:
98 *
99 * <pre>{@code
100 *     IntSink is = new Sink.ChainedReference<U>(sink) {
101 *         public void accept(U u) {
102 *             downstream.accept(mapper.applyAsInt(u));
103 *         }
104 *     };
105 * }</pre>
106 *
107 * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108 * to receive elements of type {@code U} as input, and pass the downstream sink
109 * to the constructor.  Because the next stage expects to receive integers, we
110 * must call the {@code accept(int)} method when emitting values to the downstream.
111 * The {@code accept()} method applies the mapping function from {@code U} to
112 * {@code int} and passes the resulting value to the downstream {@code Sink}.
113 *
114 * @param <T> type of elements for value streams
115 * @since 1.8
116 */
117interface Sink<T> extends Consumer<T> {
118    /**
119     * Resets the sink state to receive a fresh data set.  This must be called
120     * before sending any data to the sink.  After calling {@link #end()},
121     * you may call this method to reset the sink for another calculation.
122     * @param size The exact size of the data to be pushed downstream, if
123     * known or {@code -1} if unknown or infinite.
124     *
125     * <p>Prior to this call, the sink must be in the initial state, and after
126     * this call it is in the active state.
127     */
128    default void begin(long size) {}
129
130    /**
131     * Indicates that all elements have been pushed.  If the {@code Sink} is
132     * stateful, it should send any stored state downstream at this time, and
133     * should clear any accumulated state (and associated resources).
134     *
135     * <p>Prior to this call, the sink must be in the active state, and after
136     * this call it is returned to the initial state.
137     */
138    default void end() {}
139
140    /**
141     * Indicates that this {@code Sink} does not wish to receive any more data.
142     *
143     * @implSpec The default implementation always returns false.
144     *
145     * @return true if cancellation is requested
146     */
147    default boolean cancellationRequested() {
148        return false;
149    }
150
151    /**
152     * Accepts an int value.
153     *
154     * @implSpec The default implementation throws IllegalStateException.
155     *
156     * @throws IllegalStateException if this sink does not accept int values
157     */
158    default void accept(int value) {
159        throw new IllegalStateException("called wrong accept method");
160    }
161
162    /**
163     * Accepts a long value.
164     *
165     * @implSpec The default implementation throws IllegalStateException.
166     *
167     * @throws IllegalStateException if this sink does not accept long values
168     */
169    default void accept(long value) {
170        throw new IllegalStateException("called wrong accept method");
171    }
172
173    /**
174     * Accepts a double value.
175     *
176     * @implSpec The default implementation throws IllegalStateException.
177     *
178     * @throws IllegalStateException if this sink does not accept double values
179     */
180    default void accept(double value) {
181        throw new IllegalStateException("called wrong accept method");
182    }
183
184    /**
185     * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
186     * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
187     * {@code accept(int)}.
188     */
189    interface OfInt extends Sink<Integer>, IntConsumer {
190        @Override
191        void accept(int value);
192
193        @Override
194        default void accept(Integer i) {
195            if (Tripwire.ENABLED)
196                Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
197            accept(i.intValue());
198        }
199    }
200
201    /**
202     * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
203     * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
204     * {@code accept(long)}.
205     */
206    interface OfLong extends Sink<Long>, LongConsumer {
207        @Override
208        void accept(long value);
209
210        @Override
211        default void accept(Long i) {
212            if (Tripwire.ENABLED)
213                Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
214            accept(i.longValue());
215        }
216    }
217
218    /**
219     * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
220     * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
221     * {@code accept(double)}.
222     */
223    interface OfDouble extends Sink<Double>, DoubleConsumer {
224        @Override
225        void accept(double value);
226
227        @Override
228        default void accept(Double i) {
229            if (Tripwire.ENABLED)
230                Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
231            accept(i.doubleValue());
232        }
233    }
234
235    /**
236     * Abstract {@code Sink} implementation for creating chains of
237     * sinks.  The {@code begin}, {@code end}, and
238     * {@code cancellationRequested} methods are wired to chain to the
239     * downstream {@code Sink}.  This implementation takes a downstream
240     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
241     * implementation of the {@code accept()} method must call the correct
242     * {@code accept()} method on the downstream {@code Sink}.
243     */
244    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
245        protected final Sink<? super E_OUT> downstream;
246
247        public ChainedReference(Sink<? super E_OUT> downstream) {
248            this.downstream = Objects.requireNonNull(downstream);
249        }
250
251        @Override
252        public void begin(long size) {
253            downstream.begin(size);
254        }
255
256        @Override
257        public void end() {
258            downstream.end();
259        }
260
261        @Override
262        public boolean cancellationRequested() {
263            return downstream.cancellationRequested();
264        }
265    }
266
267    /**
268     * Abstract {@code Sink} implementation designed for creating chains of
269     * sinks.  The {@code begin}, {@code end}, and
270     * {@code cancellationRequested} methods are wired to chain to the
271     * downstream {@code Sink}.  This implementation takes a downstream
272     * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
273     * The implementation of the {@code accept()} method must call the correct
274     * {@code accept()} method on the downstream {@code Sink}.
275     */
276    static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
277        protected final Sink<? super E_OUT> downstream;
278
279        public ChainedInt(Sink<? super E_OUT> downstream) {
280            this.downstream = Objects.requireNonNull(downstream);
281        }
282
283        @Override
284        public void begin(long size) {
285            downstream.begin(size);
286        }
287
288        @Override
289        public void end() {
290            downstream.end();
291        }
292
293        @Override
294        public boolean cancellationRequested() {
295            return downstream.cancellationRequested();
296        }
297    }
298
299    /**
300     * Abstract {@code Sink} implementation designed for creating chains of
301     * sinks.  The {@code begin}, {@code end}, and
302     * {@code cancellationRequested} methods are wired to chain to the
303     * downstream {@code Sink}.  This implementation takes a downstream
304     * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
305     * The implementation of the {@code accept()} method must call the correct
306     * {@code accept()} method on the downstream {@code Sink}.
307     */
308    static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
309        protected final Sink<? super E_OUT> downstream;
310
311        public ChainedLong(Sink<? super E_OUT> downstream) {
312            this.downstream = Objects.requireNonNull(downstream);
313        }
314
315        @Override
316        public void begin(long size) {
317            downstream.begin(size);
318        }
319
320        @Override
321        public void end() {
322            downstream.end();
323        }
324
325        @Override
326        public boolean cancellationRequested() {
327            return downstream.cancellationRequested();
328        }
329    }
330
331    /**
332     * Abstract {@code Sink} implementation designed for creating chains of
333     * sinks.  The {@code begin}, {@code end}, and
334     * {@code cancellationRequested} methods are wired to chain to the
335     * downstream {@code Sink}.  This implementation takes a downstream
336     * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
337     * The implementation of the {@code accept()} method must call the correct
338     * {@code accept()} method on the downstream {@code Sink}.
339     */
340    static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
341        protected final Sink<? super E_OUT> downstream;
342
343        public ChainedDouble(Sink<? super E_OUT> downstream) {
344            this.downstream = Objects.requireNonNull(downstream);
345        }
346
347        @Override
348        public void begin(long size) {
349            downstream.begin(size);
350        }
351
352        @Override
353        public void end() {
354            downstream.end();
355        }
356
357        @Override
358        public boolean cancellationRequested() {
359            return downstream.cancellationRequested();
360        }
361    }
362}
363