1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Spliterator;
28import java.util.concurrent.atomic.AtomicReference;
29
30/**
31 * Abstract class for fork-join tasks used to implement short-circuiting
32 * stream ops, which can produce a result without processing all elements of the
33 * stream.
34 *
35 * @param  type of input elements to the pipeline
36 * @param  type of output elements from the pipeline
37 * @param <R> type of intermediate result, may be different from operation
38 *        result type
39 * @param <K> type of child and sibling tasks
40 * @since 1.8
41 */
42@SuppressWarnings("serial")
43abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
44                                        K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
45        extends AbstractTask<P_IN, P_OUT, R, K> {
46    /**
47     * The result for this computation; this is shared among all tasks and set
48     * exactly once
49     */
50    protected final AtomicReference<R> sharedResult;
51
52    /**
53     * Indicates whether this task has been canceled.  Tasks may cancel other
54     * tasks in the computation under various conditions, such as in a
55     * find-first operation, a task that finds a value will cancel all tasks
56     * that are later in the encounter order.
57     */
58    protected volatile boolean canceled;
59
60    /**
61     * Constructor for root tasks.
62     *
63     * @param helper the {@code PipelineHelper} describing the stream pipeline
64     *               up to this operation
65     * @param spliterator the {@code Spliterator} describing the source for this
66     *                    pipeline
67     */
68    protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
69                                       Spliterator<P_IN> spliterator) {
70        super(helper, spliterator);
71        sharedResult = new AtomicReference<>(null);
72    }
73
74    /**
75     * Constructor for non-root nodes.
76     *
77     * @param parent parent task in the computation tree
78     * @param spliterator the {@code Spliterator} for the portion of the
79     *                    computation tree described by this task
80     */
81    protected AbstractShortCircuitTask(K parent,
82                                       Spliterator<P_IN> spliterator) {
83        super(parent, spliterator);
84        sharedResult = parent.sharedResult;
85    }
86
87    /**
88     * Returns the value indicating the computation completed with no task
89     * finding a short-circuitable result.  For example, for a "find" operation,
90     * this might be null or an empty {@code Optional}.
91     *
92     * @return the result to return when no task finds a result
93     */
94    protected abstract R getEmptyResult();
95
96    /**
97     * Overrides AbstractTask version to include checks for early
98     * exits while splitting or computing.
99     */
100    @Override
101    public void compute() {
102        Spliterator<P_IN> rs = spliterator, ls;
103        long sizeEstimate = rs.estimateSize();
104        long sizeThreshold = getTargetSize(sizeEstimate);
105        boolean forkRight = false;
106        @SuppressWarnings("unchecked") K task = (K) this;
107        AtomicReference<R> sr = sharedResult;
108        R result;
109        while ((result = sr.get()) == null) {
110            if (task.taskCanceled()) {
111                result = task.getEmptyResult();
112                break;
113            }
114            if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
115                result = task.doLeaf();
116                break;
117            }
118            K leftChild, rightChild, taskToFork;
119            task.leftChild  = leftChild = task.makeChild(ls);
120            task.rightChild = rightChild = task.makeChild(rs);
121            task.setPendingCount(1);
122            if (forkRight) {
123                forkRight = false;
124                rs = ls;
125                task = leftChild;
126                taskToFork = rightChild;
127            }
128            else {
129                forkRight = true;
130                task = rightChild;
131                taskToFork = leftChild;
132            }
133            taskToFork.fork();
134            sizeEstimate = rs.estimateSize();
135        }
136        task.setLocalResult(result);
137        task.tryComplete();
138    }
139
140
141    /**
142     * Declares that a globally valid result has been found.  If another task has
143     * not already found the answer, the result is installed in
144     * {@code sharedResult}.  The {@code compute()} method will check
145     * {@code sharedResult} before proceeding with computation, so this causes
146     * the computation to terminate early.
147     *
148     * @param result the result found
149     */
150    protected void shortCircuit(R result) {
151        if (result != null)
152            sharedResult.compareAndSet(null, result);
153    }
154
155    /**
156     * Sets a local result for this task.  If this task is the root, set the
157     * shared result instead (if not already set).
158     *
159     * @param localResult The result to set for this task
160     */
161    @Override
162    protected void setLocalResult(R localResult) {
163        if (isRoot()) {
164            if (localResult != null)
165                sharedResult.compareAndSet(null, localResult);
166        }
167        else
168            super.setLocalResult(localResult);
169    }
170
171    /**
172     * Retrieves the local result for this task
173     */
174    @Override
175    public R getRawResult() {
176        return getLocalResult();
177    }
178
179    /**
180     * Retrieves the local result for this task.  If this task is the root,
181     * retrieves the shared result instead.
182     */
183    @Override
184    public R getLocalResult() {
185        if (isRoot()) {
186            R answer = sharedResult.get();
187            return (answer == null) ? getEmptyResult() : answer;
188        }
189        else
190            return super.getLocalResult();
191    }
192
193    /**
194     * Mark this task as canceled
195     */
196    protected void cancel() {
197        canceled = true;
198    }
199
200    /**
201     * Queries whether this task is canceled.  A task is considered canceled if
202     * it or any of its parents have been canceled.
203     *
204     * @return {@code true} if this task or any parent is canceled.
205     */
206    protected boolean taskCanceled() {
207        boolean cancel = canceled;
208        if (!cancel) {
209            for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
210                cancel = parent.canceled;
211        }
212
213        return cancel;
214    }
215
216    /**
217     * Cancels all tasks which succeed this one in the encounter order.  This
218     * includes canceling all the current task's right sibling, as well as the
219     * later right siblings of all its parents.
220     */
221    protected void cancelLaterNodes() {
222        // Go up the tree, cancel right siblings of this node and all parents
223        for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this;
224             parent != null;
225             node = parent, parent = parent.getParent()) {
226            // If node is a left child of parent, then has a right sibling
227            if (parent.leftChild == node) {
228                K rightSibling = parent.rightChild;
229                if (!rightSibling.canceled)
230                    rightSibling.cancel();
231            }
232        }
233    }
234}
235