1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
30
31/**
32 * Abstract base class for most fork-join tasks used to implement stream ops.
33 * Manages splitting logic, tracking of child tasks, and intermediate results.
34 * Each task is associated with a {@link Spliterator} that describes the portion
35 * of the input associated with the subtree rooted at this task.
36 * Tasks may be leaf nodes (which will traverse the elements of
37 * the {@code Spliterator}) or internal nodes (which split the
38 * {@code Spliterator} into multiple child tasks).
39 *
40 * @implNote
41 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
42 * where each task has a semaphore-like count of uncompleted children, and the
43 * task is implicitly completed and notified when its last child completes.
44 * Internal node tasks will likely override the {@code onCompletion} method from
45 * {@code CountedCompleter} to merge the results from child tasks into the
46 * current task's result.
47 *
48 * <p>Splitting and setting up the child task links is done by {@code compute()}
49 * for internal nodes.  At {@code compute()} time for leaf nodes, it is
50 * guaranteed that the parent's child-related fields (including sibling links
51 * for the parent's children) will be set up for all children.
52 *
53 * <p>For example, a task that performs a reduce would override {@code doLeaf()}
54 * to perform a reduction on that leaf node's chunk using the
55 * {@code Spliterator}, and override {@code onCompletion()} to merge the results
56 * of the child tasks for internal nodes:
57 *
58 * <pre>{@code
59 *     protected S doLeaf() {
60 *         spliterator.forEach(...);
61 *         return localReductionResult;
62 *     }
63 *
64 *     public void onCompletion(CountedCompleter caller) {
65 *         if (!isLeaf()) {
66 *             ReduceTask<P_IN, P_OUT, T, R> child = children;
67 *             R result = child.getLocalResult();
68 *             child = child.nextSibling;
69 *             for (; child != null; child = child.nextSibling)
70 *                 result = combine(result, child.getLocalResult());
71 *             setLocalResult(result);
72 *         }
73 *     }
74 * }</pre>
75 *
76 * <p>Serialization is not supported as there is no intention to serialize
77 * tasks managed by stream ops.
78 *
79 * @param  Type of elements input to the pipeline
80 * @param  Type of elements output from the pipeline
81 * @param <R> Type of intermediate result, which may be different from operation
82 *        result type
83 * @param <K> Type of parent, child and sibling tasks
84 * @since 1.8
85 */
86@SuppressWarnings("serial")
87abstract class AbstractTask<P_IN, P_OUT, R,
88                            K extends AbstractTask<P_IN, P_OUT, R, K>>
89        extends CountedCompleter<R> {
90
91    /**
92     * Default target factor of leaf tasks for parallel decomposition.
93     * To allow load balancing, we over-partition, currently to approximately
94     * four tasks per processor, which enables others to help out
95     * if leaf tasks are uneven or some processors are otherwise busy.
96     */
97    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
98
99    /** The pipeline helper, common to all tasks in a computation */
100    protected final PipelineHelper<P_OUT> helper;
101
102    /**
103     * The spliterator for the portion of the input associated with the subtree
104     * rooted at this task
105     */
106    protected Spliterator<P_IN> spliterator;
107
108    /** Target leaf size, common to all tasks in a computation */
109    protected long targetSize; // may be laziliy initialized
110
111    /**
112     * The left child.
113     * null if no children
114     * if non-null rightChild is non-null
115     */
116    protected K leftChild;
117
118    /**
119     * The right child.
120     * null if no children
121     * if non-null leftChild is non-null
122     */
123    protected K rightChild;
124
125    /** The result of this node, if completed */
126    private R localResult;
127
128    /**
129     * Constructor for root nodes.
130     *
131     * @param helper The {@code PipelineHelper} describing the stream pipeline
132     *               up to this operation
133     * @param spliterator The {@code Spliterator} describing the source for this
134     *                    pipeline
135     */
136    protected AbstractTask(PipelineHelper<P_OUT> helper,
137                           Spliterator<P_IN> spliterator) {
138        super(null);
139        this.helper = helper;
140        this.spliterator = spliterator;
141        this.targetSize = 0L;
142    }
143
144    /**
145     * Constructor for non-root nodes.
146     *
147     * @param parent this node's parent task
148     * @param spliterator {@code Spliterator} describing the subtree rooted at
149     *        this node, obtained by splitting the parent {@code Spliterator}
150     */
151    protected AbstractTask(K parent,
152                           Spliterator<P_IN> spliterator) {
153        super(parent);
154        this.spliterator = spliterator;
155        this.helper = parent.helper;
156        this.targetSize = parent.targetSize;
157    }
158
159    /**
160     * Constructs a new node of type T whose parent is the receiver; must call
161     * the AbstractTask(T, Spliterator) constructor with the receiver and the
162     * provided Spliterator.
163     *
164     * @param spliterator {@code Spliterator} describing the subtree rooted at
165     *        this node, obtained by splitting the parent {@code Spliterator}
166     * @return newly constructed child node
167     */
168    protected abstract K makeChild(Spliterator<P_IN> spliterator);
169
170    /**
171     * Computes the result associated with a leaf node.  Will be called by
172     * {@code compute()} and the result passed to @{code setLocalResult()}
173     *
174     * @return the computed result of a leaf node
175     */
176    protected abstract R doLeaf();
177
178    /**
179     * Returns a suggested target leaf size based on the initial size estimate.
180     *
181     * @return suggested target leaf size
182     */
183    public static long suggestTargetSize(long sizeEstimate) {
184        long est = sizeEstimate / LEAF_TARGET;
185        return est > 0L ? est : 1L;
186    }
187
188    /**
189     * Returns the targetSize, initializing it via the supplied
190     * size estimate if not already initialized.
191     */
192    protected final long getTargetSize(long sizeEstimate) {
193        long s;
194        return ((s = targetSize) != 0 ? s :
195                (targetSize = suggestTargetSize(sizeEstimate)));
196    }
197
198    /**
199     * Returns the local result, if any. Subclasses should use
200     * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
201     * results.  This returns the local result so that calls from within the
202     * fork-join framework will return the correct result.
203     *
204     * @return local result for this node previously stored with
205     * {@link #setLocalResult}
206     */
207    @Override
208    public R getRawResult() {
209        return localResult;
210    }
211
212    /**
213     * Does nothing; instead, subclasses should use
214     * {@link #setLocalResult(Object)}} to manage results.
215     *
216     * @param result must be null, or an exception is thrown (this is a safety
217     *        tripwire to detect when {@code setRawResult()} is being used
218     *        instead of {@code setLocalResult()}
219     */
220    @Override
221    protected void setRawResult(R result) {
222        if (result != null)
223            throw new IllegalStateException();
224    }
225
226    /**
227     * Retrieves a result previously stored with {@link #setLocalResult}
228     *
229     * @return local result for this node previously stored with
230     * {@link #setLocalResult}
231     */
232    protected R getLocalResult() {
233        return localResult;
234    }
235
236    /**
237     * Associates the result with the task, can be retrieved with
238     * {@link #getLocalResult}
239     *
240     * @param localResult local result for this node
241     */
242    protected void setLocalResult(R localResult) {
243        this.localResult = localResult;
244    }
245
246    /**
247     * Indicates whether this task is a leaf node.  (Only valid after
248     * {@link #compute} has been called on this node).  If the node is not a
249     * leaf node, then children will be non-null and numChildren will be
250     * positive.
251     *
252     * @return {@code true} if this task is a leaf node
253     */
254    protected boolean isLeaf() {
255        return leftChild == null;
256    }
257
258    /**
259     * Indicates whether this task is the root node
260     *
261     * @return {@code true} if this task is the root node.
262     */
263    protected boolean isRoot() {
264        return getParent() == null;
265    }
266
267    /**
268     * Returns the parent of this task, or null if this task is the root
269     *
270     * @return the parent of this task, or null if this task is the root
271     */
272    @SuppressWarnings("unchecked")
273    protected K getParent() {
274        return (K) getCompleter();
275    }
276
277    /**
278     * Decides whether or not to split a task further or compute it
279     * directly. If computing directly, calls {@code doLeaf} and pass
280     * the result to {@code setRawResult}. Otherwise splits off
281     * subtasks, forking one and continuing as the other.
282     *
283     * <p> The method is structured to conserve resources across a
284     * range of uses.  The loop continues with one of the child tasks
285     * when split, to avoid deep recursion. To cope with spliterators
286     * that may be systematically biased toward left-heavy or
287     * right-heavy splits, we alternate which child is forked versus
288     * continued in the loop.
289     */
290    @Override
291    public void compute() {
292        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
293        long sizeEstimate = rs.estimateSize();
294        long sizeThreshold = getTargetSize(sizeEstimate);
295        boolean forkRight = false;
296        @SuppressWarnings("unchecked") K task = (K) this;
297        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
298            K leftChild, rightChild, taskToFork;
299            task.leftChild  = leftChild = task.makeChild(ls);
300            task.rightChild = rightChild = task.makeChild(rs);
301            task.setPendingCount(1);
302            if (forkRight) {
303                forkRight = false;
304                rs = ls;
305                task = leftChild;
306                taskToFork = rightChild;
307            }
308            else {
309                forkRight = true;
310                task = rightChild;
311                taskToFork = leftChild;
312            }
313            taskToFork.fork();
314            sizeEstimate = rs.estimateSize();
315        }
316        task.setLocalResult(task.doLeaf());
317        task.tryComplete();
318    }
319
320    /**
321     * {@inheritDoc}
322     *
323     * @implNote
324     * Clears spliterator and children fields.  Overriders MUST call
325     * {@code super.onCompletion} as the last thing they do if they want these
326     * cleared.
327     */
328    @Override
329    public void onCompletion(CountedCompleter<?> caller) {
330        spliterator = null;
331        leftChild = rightChild = null;
332    }
333
334    /**
335     * Returns whether this node is a "leftmost" node -- whether the path from
336     * the root to this node involves only traversing leftmost child links.  For
337     * a leaf node, this means it is the first leaf node in the encounter order.
338     *
339     * @return {@code true} if this node is a "leftmost" node
340     */
341    protected boolean isLeftmostNode() {
342        @SuppressWarnings("unchecked")
343        K node = (K) this;
344        while (node != null) {
345            K parent = node.getParent();
346            if (parent != null && parent.leftChild != node)
347                return false;
348            node = parent;
349        }
350        return true;
351    }
352}
353