1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.volley;
18
19import android.os.Handler;
20import android.os.Looper;
21
22import java.util.ArrayList;
23import java.util.HashMap;
24import java.util.HashSet;
25import java.util.LinkedList;
26import java.util.List;
27import java.util.Map;
28import java.util.Queue;
29import java.util.Set;
30import java.util.concurrent.PriorityBlockingQueue;
31import java.util.concurrent.atomic.AtomicInteger;
32
33/**
34 * A request dispatch queue with a thread pool of dispatchers.
35 *
36 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
37 * resolving from either cache or network on a worker thread, and then delivering
38 * a parsed response on the main thread.
39 */
40public class RequestQueue {
41
42    /** Callback interface for completed requests. */
43    public static interface RequestFinishedListener<T> {
44        /** Called when a request has finished processing. */
45        public void onRequestFinished(Request<T> request);
46    }
47
48    /** Used for generating monotonically-increasing sequence numbers for requests. */
49    private AtomicInteger mSequenceGenerator = new AtomicInteger();
50
51    /**
52     * Staging area for requests that already have a duplicate request in flight.
53     *
54     * <ul>
55     *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
56     *          key.</li>
57     *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
58     *          is <em>not</em> contained in that list. Is null if no requests are staged.</li>
59     * </ul>
60     */
61    private final Map<String, Queue<Request<?>>> mWaitingRequests =
62            new HashMap<String, Queue<Request<?>>>();
63
64    /**
65     * The set of all requests currently being processed by this RequestQueue. A Request
66     * will be in this set if it is waiting in any queue or currently being processed by
67     * any dispatcher.
68     */
69    private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();
70
71    /** The cache triage queue. */
72    private final PriorityBlockingQueue<Request<?>> mCacheQueue =
73        new PriorityBlockingQueue<Request<?>>();
74
75    /** The queue of requests that are actually going out to the network. */
76    private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
77        new PriorityBlockingQueue<Request<?>>();
78
79    /** Number of network request dispatcher threads to start. */
80    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
81
82    /** Cache interface for retrieving and storing responses. */
83    private final Cache mCache;
84
85    /** Network interface for performing requests. */
86    private final Network mNetwork;
87
88    /** Response delivery mechanism. */
89    private final ResponseDelivery mDelivery;
90
91    /** The network dispatchers. */
92    private NetworkDispatcher[] mDispatchers;
93
94    /** The cache dispatcher. */
95    private CacheDispatcher mCacheDispatcher;
96
97    private List<RequestFinishedListener> mFinishedListeners =
98            new ArrayList<RequestFinishedListener>();
99
100    /**
101     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
102     *
103     * @param cache A Cache to use for persisting responses to disk
104     * @param network A Network interface for performing HTTP requests
105     * @param threadPoolSize Number of network dispatcher threads to create
106     * @param delivery A ResponseDelivery interface for posting responses and errors
107     */
108    public RequestQueue(Cache cache, Network network, int threadPoolSize,
109            ResponseDelivery delivery) {
110        mCache = cache;
111        mNetwork = network;
112        mDispatchers = new NetworkDispatcher[threadPoolSize];
113        mDelivery = delivery;
114    }
115
116    /**
117     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
118     *
119     * @param cache A Cache to use for persisting responses to disk
120     * @param network A Network interface for performing HTTP requests
121     * @param threadPoolSize Number of network dispatcher threads to create
122     */
123    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
124        this(cache, network, threadPoolSize,
125                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
126    }
127
128    /**
129     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
130     *
131     * @param cache A Cache to use for persisting responses to disk
132     * @param network A Network interface for performing HTTP requests
133     */
134    public RequestQueue(Cache cache, Network network) {
135        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
136    }
137
138    /**
139     * Starts the dispatchers in this queue.
140     */
141    public void start() {
142        stop();  // Make sure any currently running dispatchers are stopped.
143        // Create the cache dispatcher and start it.
144        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
145        mCacheDispatcher.start();
146
147        // Create network dispatchers (and corresponding threads) up to the pool size.
148        for (int i = 0; i < mDispatchers.length; i++) {
149            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
150                    mCache, mDelivery);
151            mDispatchers[i] = networkDispatcher;
152            networkDispatcher.start();
153        }
154    }
155
156    /**
157     * Stops the cache and network dispatchers.
158     */
159    public void stop() {
160        if (mCacheDispatcher != null) {
161            mCacheDispatcher.quit();
162        }
163        for (int i = 0; i < mDispatchers.length; i++) {
164            if (mDispatchers[i] != null) {
165                mDispatchers[i].quit();
166            }
167        }
168    }
169
170    /**
171     * Gets a sequence number.
172     */
173    public int getSequenceNumber() {
174        return mSequenceGenerator.incrementAndGet();
175    }
176
177    /**
178     * Gets the {@link Cache} instance being used.
179     */
180    public Cache getCache() {
181        return mCache;
182    }
183
184    /**
185     * A simple predicate or filter interface for Requests, for use by
186     * {@link RequestQueue#cancelAll(RequestFilter)}.
187     */
188    public interface RequestFilter {
189        public boolean apply(Request<?> request);
190    }
191
192    /**
193     * Cancels all requests in this queue for which the given filter applies.
194     * @param filter The filtering function to use
195     */
196    public void cancelAll(RequestFilter filter) {
197        synchronized (mCurrentRequests) {
198            for (Request<?> request : mCurrentRequests) {
199                if (filter.apply(request)) {
200                    request.cancel();
201                }
202            }
203        }
204    }
205
206    /**
207     * Cancels all requests in this queue with the given tag. Tag must be non-null
208     * and equality is by identity.
209     */
210    public void cancelAll(final Object tag) {
211        if (tag == null) {
212            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
213        }
214        cancelAll(new RequestFilter() {
215            @Override
216            public boolean apply(Request<?> request) {
217                return request.getTag() == tag;
218            }
219        });
220    }
221
222    /**
223     * Adds a Request to the dispatch queue.
224     * @param request The request to service
225     * @return The passed-in request
226     */
227    public <T> Request<T> add(Request<T> request) {
228        // Tag the request as belonging to this queue and add it to the set of current requests.
229        request.setRequestQueue(this);
230        synchronized (mCurrentRequests) {
231            mCurrentRequests.add(request);
232        }
233
234        // Process requests in the order they are added.
235        request.setSequence(getSequenceNumber());
236        request.addMarker("add-to-queue");
237
238        // If the request is uncacheable, skip the cache queue and go straight to the network.
239        if (!request.shouldCache()) {
240            mNetworkQueue.add(request);
241            return request;
242        }
243
244        // Insert request into stage if there's already a request with the same cache key in flight.
245        synchronized (mWaitingRequests) {
246            String cacheKey = request.getCacheKey();
247            if (mWaitingRequests.containsKey(cacheKey)) {
248                // There is already a request in flight. Queue up.
249                Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
250                if (stagedRequests == null) {
251                    stagedRequests = new LinkedList<Request<?>>();
252                }
253                stagedRequests.add(request);
254                mWaitingRequests.put(cacheKey, stagedRequests);
255                if (VolleyLog.DEBUG) {
256                    VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
257                }
258            } else {
259                // Insert 'null' queue for this cacheKey, indicating there is now a request in
260                // flight.
261                mWaitingRequests.put(cacheKey, null);
262                mCacheQueue.add(request);
263            }
264            return request;
265        }
266    }
267
268    /**
269     * Called from {@link Request#finish(String)}, indicating that processing of the given request
270     * has finished.
271     *
272     * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
273     *      <code>request.shouldCache()</code>.</p>
274     */
275    <T> void finish(Request<T> request) {
276        // Remove from the set of requests currently being processed.
277        synchronized (mCurrentRequests) {
278            mCurrentRequests.remove(request);
279        }
280        synchronized (mFinishedListeners) {
281          for (RequestFinishedListener<T> listener : mFinishedListeners) {
282            listener.onRequestFinished(request);
283          }
284        }
285
286        if (request.shouldCache()) {
287            synchronized (mWaitingRequests) {
288                String cacheKey = request.getCacheKey();
289                Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
290                if (waitingRequests != null) {
291                    if (VolleyLog.DEBUG) {
292                        VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
293                                waitingRequests.size(), cacheKey);
294                    }
295                    // Process all queued up requests. They won't be considered as in flight, but
296                    // that's not a problem as the cache has been primed by 'request'.
297                    mCacheQueue.addAll(waitingRequests);
298                }
299            }
300        }
301    }
302
303    public  <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
304      synchronized (mFinishedListeners) {
305        mFinishedListeners.add(listener);
306      }
307    }
308
309    /**
310     * Remove a RequestFinishedListener. Has no effect if listener was not previously added.
311     */
312    public  <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
313      synchronized (mFinishedListeners) {
314        mFinishedListeners.remove(listener);
315      }
316    }
317}
318