1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/proxy/multi_threaded_proxy_resolver.h"
6
7#include "base/bind.h"
8#include "base/bind_helpers.h"
9#include "base/message_loop/message_loop_proxy.h"
10#include "base/strings/string_util.h"
11#include "base/strings/stringprintf.h"
12#include "base/threading/thread.h"
13#include "base/threading/thread_restrictions.h"
14#include "net/base/net_errors.h"
15#include "net/base/net_log.h"
16#include "net/proxy/proxy_info.h"
17
18// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
19//               data when SetPacScript fails. That will reclaim memory when
20//               testing bogus scripts.
21
22namespace net {
23
// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.)
//
// An executor runs at most one job at a time (StartJob() DCHECKs that there is
// no outstanding job). Destroy() must be called before the last reference is
// released; the destructor DCHECKs that this happened.
class MultiThreadedProxyResolver::Executor
    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
 public:
  // |coordinator| must remain valid throughout our lifetime. It is used to
  // signal when the executor is ready to receive work by calling
  // |coordinator->OnExecutorReady()|.
  // The constructor takes ownership of |resolver|.
  // |thread_number| is an identifier used when naming the worker thread.
  // The worker thread is started by the constructor.
  Executor(MultiThreadedProxyResolver* coordinator,
           ProxyResolver* resolver,
           int thread_number);

  // Submit a job to this executor. The executor must not already have an
  // outstanding job. |job| is retained as the outstanding job and its Run()
  // method is posted to the worker thread.
  void StartJob(Job* job);

  // Callback for when a job has completed running on the executor's thread.
  // Clears the outstanding job and tells the coordinator that this executor is
  // ready for more work. |job| must be the current outstanding job.
  void OnJobCompleted(Job* job);

  // Cleanup the executor. Cancels all outstanding work, and frees the thread
  // and resolver. Also detaches the executor from its coordinator, so this
  // may only be called once.
  void Destroy();

  // Returns the outstanding job, or NULL.
  Job* outstanding_job() const { return outstanding_job_.get(); }

  // Returns the synchronous resolver owned by this executor. This is NULL
  // once Destroy() has been called.
  ProxyResolver* resolver() { return resolver_.get(); }

  // Returns the identifier that was passed to the constructor.
  int thread_number() const { return thread_number_; }

 private:
  // The destructor is private because instances are reference counted;
  // RefCountedThreadSafe is befriended so that it can delete us.
  friend class base::RefCountedThreadSafe<Executor>;
  ~Executor();

  // The object notified when this executor becomes idle. Reset to NULL by
  // Destroy().
  MultiThreadedProxyResolver* coordinator_;
  const int thread_number_;

  // The currently active job for this executor (either a SetPacScript or
  // GetProxyForURL task).
  scoped_refptr<Job> outstanding_job_;

  // The synchronous resolver implementation.
  scoped_ptr<ProxyResolver> resolver_;

  // The thread where |resolver_| is run on.
  // Note that declaration ordering is important here. |thread_| needs to be
  // destroyed *before* |resolver_|, in case |resolver_| is currently
  // executing on |thread_|. (Members are destroyed in reverse declaration
  // order, which is why |thread_| is declared last.)
  scoped_ptr<base::Thread> thread_;
};
76
77// MultiThreadedProxyResolver::Job ---------------------------------------------
78
79class MultiThreadedProxyResolver::Job
80    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
81 public:
82  // Identifies the subclass of Job (only being used for debugging purposes).
83  enum Type {
84    TYPE_GET_PROXY_FOR_URL,
85    TYPE_SET_PAC_SCRIPT,
86    TYPE_SET_PAC_SCRIPT_INTERNAL,
87  };
88
89  Job(Type type, const CompletionCallback& callback)
90      : type_(type),
91        callback_(callback),
92        executor_(NULL),
93        was_cancelled_(false) {
94  }
95
96  void set_executor(Executor* executor) {
97    executor_ = executor;
98  }
99
100  // The "executor" is the job runner that is scheduling this job. If
101  // this job has not been submitted to an executor yet, this will be
102  // NULL (and we know it hasn't started yet).
103  Executor* executor() {
104    return executor_;
105  }
106
107  // Mark the job as having been cancelled.
108  void Cancel() {
109    was_cancelled_ = true;
110  }
111
112  // Returns true if Cancel() has been called.
113  bool was_cancelled() const { return was_cancelled_; }
114
115  Type type() const { return type_; }
116
117  // Returns true if this job still has a user callback. Some jobs
118  // do not have a user callback, because they were helper jobs
119  // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
120  //
121  // Otherwise jobs that correspond with user-initiated work will
122  // have a non-null callback up until the callback is run.
123  bool has_user_callback() const { return !callback_.is_null(); }
124
125  // This method is called when the job is inserted into a wait queue
126  // because no executors were ready to accept it.
127  virtual void WaitingForThread() {}
128
129  // This method is called just before the job is posted to the work thread.
130  virtual void FinishedWaitingForThread() {}
131
132  // This method is called on the worker thread to do the job's work. On
133  // completion, implementors are expected to call OnJobCompleted() on
134  // |origin_loop|.
135  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
136
137 protected:
138  void OnJobCompleted() {
139    // |executor_| will be NULL if the executor has already been deleted.
140    if (executor_)
141      executor_->OnJobCompleted(this);
142  }
143
144  void RunUserCallback(int result) {
145    DCHECK(has_user_callback());
146    CompletionCallback callback = callback_;
147    // Reset the callback so has_user_callback() will now return false.
148    callback_.Reset();
149    callback.Run(result);
150  }
151
152  friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
153
154  virtual ~Job() {}
155
156 private:
157  const Type type_;
158  CompletionCallback callback_;
159  Executor* executor_;
160  bool was_cancelled_;
161};
162
163// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
164
165// Runs on the worker thread to call ProxyResolver::SetPacScript.
166class MultiThreadedProxyResolver::SetPacScriptJob
167    : public MultiThreadedProxyResolver::Job {
168 public:
169  SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
170                  const CompletionCallback& callback)
171    : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
172                                TYPE_SET_PAC_SCRIPT_INTERNAL,
173          callback),
174      script_data_(script_data) {
175  }
176
177  // Runs on the worker thread.
178  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
179    ProxyResolver* resolver = executor()->resolver();
180    int rv = resolver->SetPacScript(script_data_, CompletionCallback());
181
182    DCHECK_NE(rv, ERR_IO_PENDING);
183    origin_loop->PostTask(
184        FROM_HERE,
185        base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
186  }
187
188 protected:
189  virtual ~SetPacScriptJob() {}
190
191 private:
192  // Runs the completion callback on the origin thread.
193  void RequestComplete(int result_code) {
194    // The task may have been cancelled after it was started.
195    if (!was_cancelled() && has_user_callback()) {
196      RunUserCallback(result_code);
197    }
198    OnJobCompleted();
199  }
200
201  const scoped_refptr<ProxyResolverScriptData> script_data_;
202};
203
204// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
205
206class MultiThreadedProxyResolver::GetProxyForURLJob
207    : public MultiThreadedProxyResolver::Job {
208 public:
209  // |url|         -- the URL of the query.
210  // |results|     -- the structure to fill with proxy resolve results.
211  GetProxyForURLJob(const GURL& url,
212                    ProxyInfo* results,
213                    const CompletionCallback& callback,
214                    const BoundNetLog& net_log)
215      : Job(TYPE_GET_PROXY_FOR_URL, callback),
216        results_(results),
217        net_log_(net_log),
218        url_(url),
219        was_waiting_for_thread_(false) {
220    DCHECK(!callback.is_null());
221  }
222
223  BoundNetLog* net_log() { return &net_log_; }
224
225  virtual void WaitingForThread() OVERRIDE {
226    was_waiting_for_thread_ = true;
227    net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
228  }
229
230  virtual void FinishedWaitingForThread() OVERRIDE {
231    DCHECK(executor());
232
233    if (was_waiting_for_thread_) {
234      net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
235    }
236
237    net_log_.AddEvent(
238        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
239        NetLog::IntegerCallback("thread_number", executor()->thread_number()));
240  }
241
242  // Runs on the worker thread.
243  virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
244    ProxyResolver* resolver = executor()->resolver();
245    int rv = resolver->GetProxyForURL(
246        url_, &results_buf_, CompletionCallback(), NULL, net_log_);
247    DCHECK_NE(rv, ERR_IO_PENDING);
248
249    origin_loop->PostTask(
250        FROM_HERE,
251        base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
252  }
253
254 protected:
255  virtual ~GetProxyForURLJob() {}
256
257 private:
258  // Runs the completion callback on the origin thread.
259  void QueryComplete(int result_code) {
260    // The Job may have been cancelled after it was started.
261    if (!was_cancelled()) {
262      if (result_code >= OK) {  // Note: unit-tests use values > 0.
263        results_->Use(results_buf_);
264      }
265      RunUserCallback(result_code);
266    }
267    OnJobCompleted();
268  }
269
270  // Must only be used on the "origin" thread.
271  ProxyInfo* results_;
272
273  // Can be used on either "origin" or worker thread.
274  BoundNetLog net_log_;
275  const GURL url_;
276
277  // Usable from within DoQuery on the worker thread.
278  ProxyInfo results_buf_;
279
280  bool was_waiting_for_thread_;
281};
282
283// MultiThreadedProxyResolver::Executor ----------------------------------------
284
285MultiThreadedProxyResolver::Executor::Executor(
286    MultiThreadedProxyResolver* coordinator,
287    ProxyResolver* resolver,
288    int thread_number)
289    : coordinator_(coordinator),
290      thread_number_(thread_number),
291      resolver_(resolver) {
292  DCHECK(coordinator);
293  DCHECK(resolver);
294  // Start up the thread.
295  thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
296                                                    thread_number)));
297  CHECK(thread_->Start());
298}
299
300void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
301  DCHECK(!outstanding_job_.get());
302  outstanding_job_ = job;
303
304  // Run the job. Once it has completed (regardless of whether it was
305  // cancelled), it will invoke OnJobCompleted() on this thread.
306  job->set_executor(this);
307  job->FinishedWaitingForThread();
308  thread_->message_loop()->PostTask(
309      FROM_HERE,
310      base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
311}
312
// Called when |job|, which must be the outstanding job, has finished. Frees
// up this executor and offers it back to the coordinator.
void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
  DCHECK_EQ(job, outstanding_job_.get());
  // The outstanding job has to be cleared before notifying the coordinator,
  // because OnExecutorReady() may immediately call StartJob() on this
  // executor, and StartJob() requires that there be no outstanding job.
  outstanding_job_ = NULL;
  coordinator_->OnExecutorReady(this);
}
318
319void MultiThreadedProxyResolver::Executor::Destroy() {
320  DCHECK(coordinator_);
321
322  {
323    // See http://crbug.com/69710.
324    base::ThreadRestrictions::ScopedAllowIO allow_io;
325
326    // Join the worker thread.
327    thread_.reset();
328  }
329
330  // Cancel any outstanding job.
331  if (outstanding_job_.get()) {
332    outstanding_job_->Cancel();
333    // Orphan the job (since this executor may be deleted soon).
334    outstanding_job_->set_executor(NULL);
335  }
336
337  // It is now safe to free the ProxyResolver, since all the tasks that
338  // were using it on the resolver thread have completed.
339  resolver_.reset();
340
341  // Null some stuff as a precaution.
342  coordinator_ = NULL;
343  outstanding_job_ = NULL;
344}
345
// Does no cleanup of its own; it only DCHECKs that Destroy() already released
// everything.
MultiThreadedProxyResolver::Executor::~Executor() {
  // The important cleanup happens as part of Destroy(), which should always be
  // called first.
  // Destroy() nulls |coordinator_|, so a non-NULL value here means it was
  // skipped.
  DCHECK(!coordinator_) << "Destroy() was not called";
  DCHECK(!thread_.get());
  DCHECK(!resolver_.get());
  DCHECK(!outstanding_job_.get());
}
354
355// MultiThreadedProxyResolver --------------------------------------------------
356
// Constructs a resolver that creates its per-executor ProxyResolvers through
// |resolver_factory| (see AddNewExecutor()) and uses at most
// |max_num_threads| executors. |max_num_threads| must be at least 1.
// No executor is created here; they are provisioned by SetPacScript() and
// GetProxyForURL().
MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    ProxyResolverFactory* resolver_factory,
    size_t max_num_threads)
    // The base class is told whether resolvers made by the factory expect the
    // PAC script as bytes.
    : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
      resolver_factory_(resolver_factory),
      max_num_threads_(max_num_threads) {
  DCHECK_GE(max_num_threads, 1u);
}
365
// Abandons all work: queued jobs are dropped without running their callbacks,
// and every executor is destroyed (which cancels the job it is running, if
// any).
MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
  // We will cancel all outstanding requests.
  // Jobs still sitting in the queue were never started; releasing them is
  // enough.
  pending_jobs_.clear();
  ReleaseAllExecutors();
}
371
372int MultiThreadedProxyResolver::GetProxyForURL(
373    const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
374    RequestHandle* request, const BoundNetLog& net_log) {
375  DCHECK(CalledOnValidThread());
376  DCHECK(!callback.is_null());
377  DCHECK(current_script_data_.get())
378      << "Resolver is un-initialized. Must call SetPacScript() first!";
379
380  scoped_refptr<GetProxyForURLJob> job(
381      new GetProxyForURLJob(url, results, callback, net_log));
382
383  // Completion will be notified through |callback|, unless the caller cancels
384  // the request using |request|.
385  if (request)
386    *request = reinterpret_cast<RequestHandle>(job.get());
387
388  // If there is an executor that is ready to run this request, submit it!
389  Executor* executor = FindIdleExecutor();
390  if (executor) {
391    DCHECK_EQ(0u, pending_jobs_.size());
392    executor->StartJob(job.get());
393    return ERR_IO_PENDING;
394  }
395
396  // Otherwise queue this request. (We will schedule it to a thread once one
397  // becomes available).
398  job->WaitingForThread();
399  pending_jobs_.push_back(job);
400
401  // If we haven't already reached the thread limit, provision a new thread to
402  // drain the requests more quickly.
403  if (executors_.size() < max_num_threads_) {
404    executor = AddNewExecutor();
405    executor->StartJob(
406        new SetPacScriptJob(current_script_data_, CompletionCallback()));
407  }
408
409  return ERR_IO_PENDING;
410}
411
412void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
413  DCHECK(CalledOnValidThread());
414  DCHECK(req);
415
416  Job* job = reinterpret_cast<Job*>(req);
417  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
418
419  if (job->executor()) {
420    // If the job was already submitted to the executor, just mark it
421    // as cancelled so the user callback isn't run on completion.
422    job->Cancel();
423  } else {
424    // Otherwise the job is just sitting in a queue.
425    PendingJobsQueue::iterator it =
426        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
427    DCHECK(it != pending_jobs_.end());
428    pending_jobs_.erase(it);
429  }
430}
431
// Reports the load state of the request identified by |req|. The answer is
// always LOAD_STATE_RESOLVING_PROXY_FOR_URL; |req| is only checked for being
// non-NULL, so a queued request and a running one are not distinguished.
LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
  DCHECK(CalledOnValidThread());
  DCHECK(req);
  return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
}
437
// Cancels an in-progress SetPacScript() request. This expects the
// user-initiated SetPacScript job to be the only work in the system: nothing
// queued, exactly one executor, and that executor running a
// TYPE_SET_PAC_SCRIPT job.
void MultiThreadedProxyResolver::CancelSetPacScript() {
  DCHECK(CalledOnValidThread());
  DCHECK_EQ(0u, pending_jobs_.size());
  DCHECK_EQ(1u, executors_.size());
  DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
            executors_[0]->outstanding_job()->type());

  // Defensively clear some data which shouldn't be getting used
  // anymore.
  current_script_data_ = NULL;

  // Destroying the executor cancels its outstanding job, so the SetPacScript
  // callback will not be run.
  ReleaseAllExecutors();
}
451
452int MultiThreadedProxyResolver::SetPacScript(
453    const scoped_refptr<ProxyResolverScriptData>& script_data,
454    const CompletionCallback&callback) {
455  DCHECK(CalledOnValidThread());
456  DCHECK(!callback.is_null());
457
458  // Save the script details, so we can provision new executors later.
459  current_script_data_ = script_data;
460
461  // The user should not have any outstanding requests when they call
462  // SetPacScript().
463  CheckNoOutstandingUserRequests();
464
465  // Destroy all of the current threads and their proxy resolvers.
466  ReleaseAllExecutors();
467
468  // Provision a new executor, and run the SetPacScript request. On completion
469  // notification will be sent through |callback|.
470  Executor* executor = AddNewExecutor();
471  executor->StartJob(new SetPacScriptJob(script_data, callback));
472  return ERR_IO_PENDING;
473}
474
475void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
476  DCHECK(CalledOnValidThread());
477  CHECK_EQ(0u, pending_jobs_.size());
478
479  for (ExecutorList::const_iterator it = executors_.begin();
480       it != executors_.end(); ++it) {
481    const Executor* executor = it->get();
482    Job* job = executor->outstanding_job();
483    // The "has_user_callback()" is to exclude jobs for which the callback
484    // has already been invoked, or was not user-initiated (as in the case of
485    // lazy thread provisions). User-initiated jobs may !has_user_callback()
486    // when the callback has already been run. (Since we only clear the
487    // outstanding job AFTER the callback has been invoked, it is possible
488    // for a new request to be started from within the callback).
489    CHECK(!job || job->was_cancelled() || !job->has_user_callback());
490  }
491}
492
493void MultiThreadedProxyResolver::ReleaseAllExecutors() {
494  DCHECK(CalledOnValidThread());
495  for (ExecutorList::iterator it = executors_.begin();
496       it != executors_.end(); ++it) {
497    Executor* executor = it->get();
498    executor->Destroy();
499  }
500  executors_.clear();
501}
502
503MultiThreadedProxyResolver::Executor*
504MultiThreadedProxyResolver::FindIdleExecutor() {
505  DCHECK(CalledOnValidThread());
506  for (ExecutorList::iterator it = executors_.begin();
507       it != executors_.end(); ++it) {
508    Executor* executor = it->get();
509    if (!executor->outstanding_job())
510      return executor;
511  }
512  return NULL;
513}
514
515MultiThreadedProxyResolver::Executor*
516MultiThreadedProxyResolver::AddNewExecutor() {
517  DCHECK(CalledOnValidThread());
518  DCHECK_LT(executors_.size(), max_num_threads_);
519  // The "thread number" is used to give the thread a unique name.
520  int thread_number = executors_.size();
521  ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
522  Executor* executor = new Executor(
523      this, resolver, thread_number);
524  executors_.push_back(make_scoped_refptr(executor));
525  return executor;
526}
527
528void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
529  DCHECK(CalledOnValidThread());
530  if (pending_jobs_.empty())
531    return;
532
533  // Get the next job to process (FIFO). Transfer it from the pending queue
534  // to the executor.
535  scoped_refptr<Job> job = pending_jobs_.front();
536  pending_jobs_.pop_front();
537  executor->StartJob(job.get());
538}
539
540}  // namespace net
541