1/*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7package java.util.concurrent;
8
9import java.security.AccessControlContext;
10import java.security.ProtectionDomain;
11
12/**
13 * A thread managed by a {@link ForkJoinPool}, which executes
14 * {@link ForkJoinTask}s.
15 * This class is subclassable solely for the sake of adding
16 * functionality -- there are no overridable methods dealing with
17 * scheduling or execution.  However, you can override initialization
18 * and termination methods surrounding the main task processing loop.
19 * If you do create such a subclass, you will also need to supply a
20 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
21 * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
22 *
23 * @since 1.7
24 * @author Doug Lea
25 */
26public class ForkJoinWorkerThread extends Thread {
27    /*
28     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
29     * ForkJoinTasks. For explanation, see the internal documentation
30     * of class ForkJoinPool.
31     *
32     * This class just maintains links to its pool and WorkQueue.  The
33     * pool field is set immediately upon construction, but the
34     * workQueue field is not set until a call to registerWorker
35     * completes. This leads to a visibility race, that is tolerated
36     * by requiring that the workQueue field is only accessed by the
37     * owning thread.
38     *
39     * Support for (non-public) subclass InnocuousForkJoinWorkerThread
40     * requires that we break quite a lot of encapsulation (via Unsafe)
41     * both here and in the subclass to access and set Thread fields.
42     */
43
44    final ForkJoinPool pool;                // the pool this thread works in
45    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
46
47    /**
48     * Creates a ForkJoinWorkerThread operating in the given pool.
49     *
50     * @param pool the pool this thread works in
51     * @throws NullPointerException if pool is null
52     */
53    protected ForkJoinWorkerThread(ForkJoinPool pool) {
54        // Use a placeholder until a useful name can be set in registerWorker
55        super("aForkJoinWorkerThread");
56        this.pool = pool;
57        this.workQueue = pool.registerWorker(this);
58    }
59
60    /**
61     * Version for InnocuousForkJoinWorkerThread.
62     */
63    ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
64                         AccessControlContext acc) {
65        super(threadGroup, null, "aForkJoinWorkerThread");
66        U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
67        eraseThreadLocals(); // clear before registering
68        this.pool = pool;
69        this.workQueue = pool.registerWorker(this);
70    }
71
72    /**
73     * Returns the pool hosting this thread.
74     *
75     * @return the pool
76     */
77    public ForkJoinPool getPool() {
78        return pool;
79    }
80
81    /**
82     * Returns the unique index number of this thread in its pool.
83     * The returned value ranges from zero to the maximum number of
84     * threads (minus one) that may exist in the pool, and does not
85     * change during the lifetime of the thread.  This method may be
86     * useful for applications that track status or collect results
87     * per-worker-thread rather than per-task.
88     *
89     * @return the index number
90     */
91    public int getPoolIndex() {
92        return workQueue.getPoolIndex();
93    }
94
95    /**
96     * Initializes internal state after construction but before
97     * processing any tasks. If you override this method, you must
98     * invoke {@code super.onStart()} at the beginning of the method.
99     * Initialization requires care: Most fields must have legal
100     * default values, to ensure that attempted accesses from other
101     * threads work correctly even before this thread starts
102     * processing tasks.
103     */
104    protected void onStart() {
105    }
106
107    /**
108     * Performs cleanup associated with termination of this worker
109     * thread.  If you override this method, you must invoke
110     * {@code super.onTermination} at the end of the overridden method.
111     *
112     * @param exception the exception causing this thread to abort due
113     * to an unrecoverable error, or {@code null} if completed normally
114     */
115    protected void onTermination(Throwable exception) {
116    }
117
118    /**
119     * This method is required to be public, but should never be
120     * called explicitly. It performs the main run loop to execute
121     * {@link ForkJoinTask}s.
122     */
123    public void run() {
124        if (workQueue.array == null) { // only run once
125            Throwable exception = null;
126            try {
127                onStart();
128                pool.runWorker(workQueue);
129            } catch (Throwable ex) {
130                exception = ex;
131            } finally {
132                try {
133                    onTermination(exception);
134                } catch (Throwable ex) {
135                    if (exception == null)
136                        exception = ex;
137                } finally {
138                    pool.deregisterWorker(this, exception);
139                }
140            }
141        }
142    }
143
144    /**
145     * Erases ThreadLocals by nulling out Thread maps.
146     */
147    final void eraseThreadLocals() {
148        U.putObject(this, THREADLOCALS, null);
149        U.putObject(this, INHERITABLETHREADLOCALS, null);
150    }
151
152    /**
153     * Non-public hook method for InnocuousForkJoinWorkerThread.
154     */
155    void afterTopLevelExec() {
156    }
157
158    // Set up to allow setting thread fields in constructor
159    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
160    private static final long THREADLOCALS;
161    private static final long INHERITABLETHREADLOCALS;
162    private static final long INHERITEDACCESSCONTROLCONTEXT;
163    static {
164        try {
165            THREADLOCALS = U.objectFieldOffset
166                (Thread.class.getDeclaredField("threadLocals"));
167            INHERITABLETHREADLOCALS = U.objectFieldOffset
168                (Thread.class.getDeclaredField("inheritableThreadLocals"));
169            INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
170                (Thread.class.getDeclaredField("inheritedAccessControlContext"));
171        } catch (ReflectiveOperationException e) {
172            throw new Error(e);
173        }
174    }
175
176    /**
177     * A worker thread that has no permissions, is not a member of any
178     * user-defined ThreadGroup, and erases all ThreadLocals after
179     * running each top-level task.
180     */
181    static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
182        /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
183        private static final ThreadGroup innocuousThreadGroup =
184            createThreadGroup();
185
186        /** An AccessControlContext supporting no privileges */
187        private static final AccessControlContext INNOCUOUS_ACC =
188            new AccessControlContext(
189                new ProtectionDomain[] {
190                    new ProtectionDomain(null, null)
191                });
192
193        InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
194            super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
195        }
196
197        @Override // to erase ThreadLocals
198        void afterTopLevelExec() {
199            eraseThreadLocals();
200        }
201
202        @Override // to always report system loader
203        public ClassLoader getContextClassLoader() {
204            return ClassLoader.getSystemClassLoader();
205        }
206
207        @Override // to silently fail
208        public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
209
210        @Override // paranoically
211        public void setContextClassLoader(ClassLoader cl) {
212            throw new SecurityException("setContextClassLoader");
213        }
214
215        /**
216         * Returns a new group with the system ThreadGroup (the
217         * topmost, parent-less group) as parent.  Uses Unsafe to
218         * traverse Thread.group and ThreadGroup.parent fields.
219         */
220        private static ThreadGroup createThreadGroup() {
221            try {
222                sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
223                long tg = u.objectFieldOffset
224                    (Thread.class.getDeclaredField("group"));
225                long gp = u.objectFieldOffset
226                    (ThreadGroup.class.getDeclaredField("parent"));
227                ThreadGroup group = (ThreadGroup)
228                    u.getObject(Thread.currentThread(), tg);
229                while (group != null) {
230                    ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
231                    if (parent == null)
232                        return new ThreadGroup(group,
233                                               "InnocuousForkJoinWorkerThreadGroup");
234                    group = parent;
235                }
236            } catch (ReflectiveOperationException e) {
237                throw new Error(e);
238            }
239            // fall through if null as cannot-happen safeguard
240            throw new Error("Cannot create ThreadGroup");
241        }
242    }
243
244}
245