1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.volley;
18
19import android.os.Handler;
20import android.os.Looper;
21
22import java.util.HashMap;
23import java.util.HashSet;
24import java.util.LinkedList;
25import java.util.Map;
26import java.util.Queue;
27import java.util.Set;
28import java.util.concurrent.PriorityBlockingQueue;
29import java.util.concurrent.atomic.AtomicInteger;
30
31/**
32 * A request dispatch queue with a thread pool of dispatchers.
33 *
34 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
35 * resolving from either cache or network on a worker thread, and then delivering
36 * a parsed response on the main thread.
37 */
38@SuppressWarnings("rawtypes")
39public class RequestQueue {
40
41    /** Used for generating monotonically-increasing sequence numbers for requests. */
42    private AtomicInteger mSequenceGenerator = new AtomicInteger();
43
44    /**
45     * Staging area for requests that already have a duplicate request in flight.
46     *
47     * <ul>
48     *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
49     *          key.</li>
50     *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
51     *          is <em>not</em> contained in that list. Is null if no requests are staged.</li>
52     * </ul>
53     */
54    private final Map<String, Queue<Request>> mWaitingRequests =
55            new HashMap<String, Queue<Request>>();
56
57    /**
58     * The set of all requests currently being processed by this RequestQueue. A Request
59     * will be in this set if it is waiting in any queue or currently being processed by
60     * any dispatcher.
61     */
62    private final Set<Request> mCurrentRequests = new HashSet<Request>();
63
64    /** The cache triage queue. */
65    private final PriorityBlockingQueue<Request> mCacheQueue =
66        new PriorityBlockingQueue<Request>();
67
68    /** The queue of requests that are actually going out to the network. */
69    private final PriorityBlockingQueue<Request> mNetworkQueue =
70        new PriorityBlockingQueue<Request>();
71
72    /** Number of network request dispatcher threads to start. */
73    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
74
75    /** Cache interface for retrieving and storing respones. */
76    private final Cache mCache;
77
78    /** Network interface for performing requests. */
79    private final Network mNetwork;
80
81    /** Response delivery mechanism. */
82    private final ResponseDelivery mDelivery;
83
84    /** The network dispatchers. */
85    private NetworkDispatcher[] mDispatchers;
86
87    /** The cache dispatcher. */
88    private CacheDispatcher mCacheDispatcher;
89
90    /**
91     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
92     *
93     * @param cache A Cache to use for persisting responses to disk
94     * @param network A Network interface for performing HTTP requests
95     * @param threadPoolSize Number of network dispatcher threads to create
96     * @param delivery A ResponseDelivery interface for posting responses and errors
97     */
98    public RequestQueue(Cache cache, Network network, int threadPoolSize,
99            ResponseDelivery delivery) {
100        mCache = cache;
101        mNetwork = network;
102        mDispatchers = new NetworkDispatcher[threadPoolSize];
103        mDelivery = delivery;
104    }
105
106    /**
107     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
108     *
109     * @param cache A Cache to use for persisting responses to disk
110     * @param network A Network interface for performing HTTP requests
111     * @param threadPoolSize Number of network dispatcher threads to create
112     */
113    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
114        this(cache, network, threadPoolSize,
115                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
116    }
117
118    /**
119     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
120     *
121     * @param cache A Cache to use for persisting responses to disk
122     * @param network A Network interface for performing HTTP requests
123     */
124    public RequestQueue(Cache cache, Network network) {
125        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
126    }
127
128    /**
129     * Starts the dispatchers in this queue.
130     */
131    public void start() {
132        stop();  // Make sure any currently running dispatchers are stopped.
133        // Create the cache dispatcher and start it.
134        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
135        mCacheDispatcher.start();
136
137        // Create network dispatchers (and corresponding threads) up to the pool size.
138        for (int i = 0; i < mDispatchers.length; i++) {
139            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
140                    mCache, mDelivery);
141            mDispatchers[i] = networkDispatcher;
142            networkDispatcher.start();
143        }
144    }
145
146    /**
147     * Stops the cache and network dispatchers.
148     */
149    public void stop() {
150        if (mCacheDispatcher != null) {
151            mCacheDispatcher.quit();
152        }
153        for (int i = 0; i < mDispatchers.length; i++) {
154            if (mDispatchers[i] != null) {
155                mDispatchers[i].quit();
156            }
157        }
158    }
159
160    /**
161     * Gets a sequence number.
162     */
163    public int getSequenceNumber() {
164        return mSequenceGenerator.incrementAndGet();
165    }
166
167    /**
168     * Gets the {@link Cache} instance being used.
169     */
170    public Cache getCache() {
171        return mCache;
172    }
173
174    /**
175     * A simple predicate or filter interface for Requests, for use by
176     * {@link RequestQueue#cancelAll(RequestFilter)}.
177     */
178    public interface RequestFilter {
179        public boolean apply(Request<?> request);
180    }
181
182    /**
183     * Cancels all requests in this queue for which the given filter applies.
184     * @param filter The filtering function to use
185     */
186    public void cancelAll(RequestFilter filter) {
187        synchronized (mCurrentRequests) {
188            for (Request<?> request : mCurrentRequests) {
189                if (filter.apply(request)) {
190                    request.cancel();
191                }
192            }
193        }
194    }
195
196    /**
197     * Cancels all requests in this queue with the given tag. Tag must be non-null
198     * and equality is by identity.
199     */
200    public void cancelAll(final Object tag) {
201        if (tag == null) {
202            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
203        }
204        cancelAll(new RequestFilter() {
205            @Override
206            public boolean apply(Request<?> request) {
207                return request.getTag() == tag;
208            }
209        });
210    }
211
212    /**
213     * Adds a Request to the dispatch queue.
214     * @param request The request to service
215     * @return The passed-in request
216     */
217    public Request add(Request request) {
218        // Tag the request as belonging to this queue and add it to the set of current requests.
219        request.setRequestQueue(this);
220        synchronized (mCurrentRequests) {
221            mCurrentRequests.add(request);
222        }
223
224        // Process requests in the order they are added.
225        request.setSequence(getSequenceNumber());
226        request.addMarker("add-to-queue");
227
228        // If the request is uncacheable, skip the cache queue and go straight to the network.
229        if (!request.shouldCache()) {
230            mNetworkQueue.add(request);
231            return request;
232        }
233
234        // Insert request into stage if there's already a request with the same cache key in flight.
235        synchronized (mWaitingRequests) {
236            String cacheKey = request.getCacheKey();
237            if (mWaitingRequests.containsKey(cacheKey)) {
238                // There is already a request in flight. Queue up.
239                Queue<Request> stagedRequests = mWaitingRequests.get(cacheKey);
240                if (stagedRequests == null) {
241                    stagedRequests = new LinkedList<Request>();
242                }
243                stagedRequests.add(request);
244                mWaitingRequests.put(cacheKey, stagedRequests);
245                if (VolleyLog.DEBUG) {
246                    VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
247                }
248            } else {
249                // Insert 'null' queue for this cacheKey, indicating there is now a request in
250                // flight.
251                mWaitingRequests.put(cacheKey, null);
252                mCacheQueue.add(request);
253            }
254            return request;
255        }
256    }
257
258    /**
259     * Called from {@link Request#finish(String)}, indicating that processing of the given request
260     * has finished.
261     *
262     * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
263     *      <code>request.shouldCache()</code>.</p>
264     */
265    void finish(Request request) {
266        // Remove from the set of requests currently being processed.
267        synchronized (mCurrentRequests) {
268            mCurrentRequests.remove(request);
269        }
270
271        if (request.shouldCache()) {
272            synchronized (mWaitingRequests) {
273                String cacheKey = request.getCacheKey();
274                Queue<Request> waitingRequests = mWaitingRequests.remove(cacheKey);
275                if (waitingRequests != null) {
276                    if (VolleyLog.DEBUG) {
277                        VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
278                                waitingRequests.size(), cacheKey);
279                    }
280                    // Process all queued up requests. They won't be considered as in flight, but
281                    // that's not a problem as the cache has been primed by 'request'.
282                    mCacheQueue.addAll(waitingRequests);
283                }
284            }
285        }
286    }
287}
288