1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.gallery3d.util;
18
19import android.util.Log;
20
21import java.util.concurrent.Executor;
22import java.util.concurrent.LinkedBlockingQueue;
23import java.util.concurrent.ThreadPoolExecutor;
24import java.util.concurrent.TimeUnit;
25
26public class ThreadPool {
27    @SuppressWarnings("unused")
28    private static final String TAG = "ThreadPool";
29    private static final int CORE_POOL_SIZE = 4;
30    private static final int MAX_POOL_SIZE = 8;
31    private static final int KEEP_ALIVE_TIME = 10; // 10 seconds
32
33    // Resource type
34    public static final int MODE_NONE = 0;
35    public static final int MODE_CPU = 1;
36    public static final int MODE_NETWORK = 2;
37
38    public static final JobContext JOB_CONTEXT_STUB = new JobContextStub();
39
40    ResourceCounter mCpuCounter = new ResourceCounter(2);
41    ResourceCounter mNetworkCounter = new ResourceCounter(2);
42
43    // A Job is like a Callable, but it has an addition JobContext parameter.
44    public interface Job<T> {
45        public T run(JobContext jc);
46    }
47
48    public interface JobContext {
49        boolean isCancelled();
50        void setCancelListener(CancelListener listener);
51        boolean setMode(int mode);
52    }
53
54    private static class JobContextStub implements JobContext {
55        @Override
56        public boolean isCancelled() {
57            return false;
58        }
59
60        @Override
61        public void setCancelListener(CancelListener listener) {
62        }
63
64        @Override
65        public boolean setMode(int mode) {
66            return true;
67        }
68    }
69
70    public interface CancelListener {
71        public void onCancel();
72    }
73
74    private static class ResourceCounter {
75        public int value;
76        public ResourceCounter(int v) {
77            value = v;
78        }
79    }
80
81    private final Executor mExecutor;
82
83    public ThreadPool() {
84        this(CORE_POOL_SIZE, MAX_POOL_SIZE);
85    }
86
87    public ThreadPool(int initPoolSize, int maxPoolSize) {
88        mExecutor = new ThreadPoolExecutor(
89                initPoolSize, maxPoolSize, KEEP_ALIVE_TIME,
90                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
91                new PriorityThreadFactory("thread-pool",
92                android.os.Process.THREAD_PRIORITY_BACKGROUND));
93    }
94
95    // Submit a job to the thread pool. The listener will be called when the
96    // job is finished (or cancelled).
97    public <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
98        Worker<T> w = new Worker<T>(job, listener);
99        mExecutor.execute(w);
100        return w;
101    }
102
103    public <T> Future<T> submit(Job<T> job) {
104        return submit(job, null);
105    }
106
107    private class Worker<T> implements Runnable, Future<T>, JobContext {
108        @SuppressWarnings("hiding")
109        private static final String TAG = "Worker";
110        private Job<T> mJob;
111        private FutureListener<T> mListener;
112        private CancelListener mCancelListener;
113        private ResourceCounter mWaitOnResource;
114        private volatile boolean mIsCancelled;
115        private boolean mIsDone;
116        private T mResult;
117        private int mMode;
118
119        public Worker(Job<T> job, FutureListener<T> listener) {
120            mJob = job;
121            mListener = listener;
122        }
123
124        // This is called by a thread in the thread pool.
125        @Override
126        public void run() {
127            T result = null;
128
129            // A job is in CPU mode by default. setMode returns false
130            // if the job is cancelled.
131            if (setMode(MODE_CPU)) {
132                try {
133                    result = mJob.run(this);
134                } catch (Throwable ex) {
135                    Log.w(TAG, "Exception in running a job", ex);
136                }
137            }
138
139            synchronized(this) {
140                setMode(MODE_NONE);
141                mResult = result;
142                mIsDone = true;
143                notifyAll();
144            }
145            if (mListener != null) mListener.onFutureDone(this);
146        }
147
148        // Below are the methods for Future.
149        @Override
150        public synchronized void cancel() {
151            if (mIsCancelled) return;
152            mIsCancelled = true;
153            if (mWaitOnResource != null) {
154                synchronized (mWaitOnResource) {
155                    mWaitOnResource.notifyAll();
156                }
157            }
158            if (mCancelListener != null) {
159                mCancelListener.onCancel();
160            }
161        }
162
163        @Override
164        public boolean isCancelled() {
165            return mIsCancelled;
166        }
167
168        @Override
169        public synchronized boolean isDone() {
170            return mIsDone;
171        }
172
173        @Override
174        public synchronized T get() {
175            while (!mIsDone) {
176                try {
177                    wait();
178                } catch (Exception ex) {
179                    Log.w(TAG, "ingore exception", ex);
180                    // ignore.
181                }
182            }
183            return mResult;
184        }
185
186        @Override
187        public void waitDone() {
188            get();
189        }
190
191        // Below are the methods for JobContext (only called from the
192        // thread running the job)
193        @Override
194        public synchronized void setCancelListener(CancelListener listener) {
195            mCancelListener = listener;
196            if (mIsCancelled && mCancelListener != null) {
197                mCancelListener.onCancel();
198            }
199        }
200
201        @Override
202        public boolean setMode(int mode) {
203            // Release old resource
204            ResourceCounter rc = modeToCounter(mMode);
205            if (rc != null) releaseResource(rc);
206            mMode = MODE_NONE;
207
208            // Acquire new resource
209            rc = modeToCounter(mode);
210            if (rc != null) {
211                if (!acquireResource(rc)) {
212                    return false;
213                }
214                mMode = mode;
215            }
216
217            return true;
218        }
219
220        private ResourceCounter modeToCounter(int mode) {
221            if (mode == MODE_CPU) {
222                return mCpuCounter;
223            } else if (mode == MODE_NETWORK) {
224                return mNetworkCounter;
225            } else {
226                return null;
227            }
228        }
229
230        private boolean acquireResource(ResourceCounter counter) {
231            while (true) {
232                synchronized (this) {
233                    if (mIsCancelled) {
234                        mWaitOnResource = null;
235                        return false;
236                    }
237                    mWaitOnResource = counter;
238                }
239
240                synchronized (counter) {
241                    if (counter.value > 0) {
242                        counter.value--;
243                        break;
244                    } else {
245                        try {
246                            counter.wait();
247                        } catch (InterruptedException ex) {
248                            // ignore.
249                        }
250                    }
251                }
252            }
253
254            synchronized (this) {
255                mWaitOnResource = null;
256            }
257
258            return true;
259        }
260
261        private void releaseResource(ResourceCounter counter) {
262            synchronized (counter) {
263                counter.value++;
264                counter.notifyAll();
265            }
266        }
267    }
268}
269