1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.gallery3d.util;
18
19import com.android.gallery3d.common.Utils;
20import com.android.gallery3d.util.ThreadPool.Job;
21import com.android.gallery3d.util.ThreadPool.JobContext;
22
23import java.util.LinkedList;
24
25// Limit the number of concurrent jobs that has been submitted into a ThreadPool
26@SuppressWarnings("rawtypes")
27public class JobLimiter implements FutureListener {
28    private static final String TAG = "JobLimiter";
29
30    // State Transition:
31    //      INIT -> DONE, CANCELLED
32    //      DONE -> CANCELLED
33    private static final int STATE_INIT = 0;
34    private static final int STATE_DONE = 1;
35    private static final int STATE_CANCELLED = 2;
36
37    private final LinkedList<JobWrapper<?>> mJobs = new LinkedList<JobWrapper<?>>();
38    private final ThreadPool mPool;
39    private int mLimit;
40
41    private static class JobWrapper<T> implements Future<T>, Job<T> {
42        private int mState = STATE_INIT;
43        private Job<T> mJob;
44        private Future<T> mDelegate;
45        private FutureListener<T> mListener;
46        private T mResult;
47
48        public JobWrapper(Job<T> job, FutureListener<T> listener) {
49            mJob = job;
50            mListener = listener;
51        }
52
53        public synchronized void setFuture(Future<T> future) {
54            if (mState != STATE_INIT) return;
55            mDelegate = future;
56        }
57
58        @Override
59        public void cancel() {
60            FutureListener<T> listener = null;
61            synchronized (this) {
62                if (mState != STATE_DONE) {
63                    listener = mListener;
64                    mJob = null;
65                    mListener = null;
66                    if (mDelegate != null) {
67                        mDelegate.cancel();
68                        mDelegate = null;
69                    }
70                }
71                mState = STATE_CANCELLED;
72                mResult = null;
73                notifyAll();
74            }
75            if (listener != null) listener.onFutureDone(this);
76        }
77
78        @Override
79        public synchronized boolean isCancelled() {
80            return mState == STATE_CANCELLED;
81        }
82
83        @Override
84        public boolean isDone() {
85            // Both CANCELLED AND DONE is considered as done
86            return mState !=  STATE_INIT;
87        }
88
89        @Override
90        public synchronized T get() {
91            while (mState == STATE_INIT) {
92                // handle the interrupted exception of wait()
93                Utils.waitWithoutInterrupt(this);
94            }
95            return mResult;
96        }
97
98        @Override
99        public void waitDone() {
100            get();
101        }
102
103        @Override
104        public T run(JobContext jc) {
105            Job<T> job = null;
106            synchronized (this) {
107                if (mState == STATE_CANCELLED) return null;
108                job = mJob;
109            }
110            T result  = null;
111            try {
112                result = job.run(jc);
113            } catch (Throwable t) {
114                Log.w(TAG, "error executing job: " + job, t);
115            }
116            FutureListener<T> listener = null;
117            synchronized (this) {
118                if (mState == STATE_CANCELLED) return null;
119                mState = STATE_DONE;
120                listener = mListener;
121                mListener = null;
122                mJob = null;
123                mResult = result;
124                notifyAll();
125            }
126            if (listener != null) listener.onFutureDone(this);
127            return result;
128        }
129    }
130
131    public JobLimiter(ThreadPool pool, int limit) {
132        mPool = Utils.checkNotNull(pool);
133        mLimit = limit;
134    }
135
136    public synchronized <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
137        JobWrapper<T> future = new JobWrapper<T>(Utils.checkNotNull(job), listener);
138        mJobs.addLast(future);
139        submitTasksIfAllowed();
140        return future;
141    }
142
143    @SuppressWarnings({"rawtypes", "unchecked"})
144    private void submitTasksIfAllowed() {
145        while (mLimit > 0 && !mJobs.isEmpty()) {
146            JobWrapper wrapper = mJobs.removeFirst();
147            if (!wrapper.isCancelled()) {
148                --mLimit;
149                wrapper.setFuture(mPool.submit(wrapper, this));
150            }
151        }
152    }
153
154    @Override
155    public synchronized void onFutureDone(Future future) {
156        ++mLimit;
157        submitTasksIfAllowed();
158    }
159}
160