1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/proxy/multi_threaded_proxy_resolver.h"
6
7#include "base/message_loop.h"
8#include "base/string_util.h"
9#include "base/stringprintf.h"
10#include "base/threading/thread.h"
11#include "base/threading/thread_restrictions.h"
12#include "net/base/net_errors.h"
13#include "net/base/net_log.h"
14#include "net/proxy/proxy_info.h"
15
16// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
17//               data when SetPacScript fails. That will reclaim memory when
18//               testing bogus scripts.
19
20namespace net {
21
22namespace {
23
24class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
25 public:
26  explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
27  void PurgeMemory() { resolver_->PurgeMemory(); }
28 private:
29  friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
30  ~PurgeMemoryTask() {}
31  ProxyResolver* resolver_;
32};
33
34}  // namespace
35
36// An "executor" is a job-runner for PAC requests. It encapsulates a worker
37// thread and a synchronous ProxyResolver (which will be operated on said
38// thread.)
39class MultiThreadedProxyResolver::Executor
40    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
41 public:
42  // |coordinator| must remain valid throughout our lifetime. It is used to
43  // signal when the executor is ready to receive work by calling
44  // |coordinator->OnExecutorReady()|.
45  // The constructor takes ownership of |resolver|.
46  // |thread_number| is an identifier used when naming the worker thread.
47  Executor(MultiThreadedProxyResolver* coordinator,
48           ProxyResolver* resolver,
49           int thread_number);
50
51  // Submit a job to this executor.
52  void StartJob(Job* job);
53
54  // Callback for when a job has completed running on the executor's thread.
55  void OnJobCompleted(Job* job);
56
57  // Cleanup the executor. Cancels all outstanding work, and frees the thread
58  // and resolver.
59  void Destroy();
60
61  void PurgeMemory();
62
63  // Returns the outstanding job, or NULL.
64  Job* outstanding_job() const { return outstanding_job_.get(); }
65
66  ProxyResolver* resolver() { return resolver_.get(); }
67
68  int thread_number() const { return thread_number_; }
69
70 private:
71  friend class base::RefCountedThreadSafe<Executor>;
72  ~Executor();
73
74  MultiThreadedProxyResolver* coordinator_;
75  const int thread_number_;
76
77  // The currently active job for this executor (either a SetPacScript or
78  // GetProxyForURL task).
79  scoped_refptr<Job> outstanding_job_;
80
81  // The synchronous resolver implementation.
82  scoped_ptr<ProxyResolver> resolver_;
83
84  // The thread where |resolver_| is run on.
85  // Note that declaration ordering is important here. |thread_| needs to be
86  // destroyed *before* |resolver_|, in case |resolver_| is currently
87  // executing on |thread_|.
88  scoped_ptr<base::Thread> thread_;
89};
90
91// MultiThreadedProxyResolver::Job ---------------------------------------------
92
93class MultiThreadedProxyResolver::Job
94    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
95 public:
96  // Identifies the subclass of Job (only being used for debugging purposes).
97  enum Type {
98    TYPE_GET_PROXY_FOR_URL,
99    TYPE_SET_PAC_SCRIPT,
100    TYPE_SET_PAC_SCRIPT_INTERNAL,
101  };
102
103  Job(Type type, CompletionCallback* user_callback)
104      : type_(type),
105        user_callback_(user_callback),
106        executor_(NULL),
107        was_cancelled_(false) {
108  }
109
110  void set_executor(Executor* executor) {
111    executor_ = executor;
112  }
113
114  // The "executor" is the job runner that is scheduling this job. If
115  // this job has not been submitted to an executor yet, this will be
116  // NULL (and we know it hasn't started yet).
117  Executor* executor() {
118    return executor_;
119  }
120
121  // Mark the job as having been cancelled.
122  void Cancel() {
123    was_cancelled_ = true;
124  }
125
126  // Returns true if Cancel() has been called.
127  bool was_cancelled() const { return was_cancelled_; }
128
129  Type type() const { return type_; }
130
131  // Returns true if this job still has a user callback. Some jobs
132  // do not have a user callback, because they were helper jobs
133  // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
134  //
135  // Otherwise jobs that correspond with user-initiated work will
136  // have a non-NULL callback up until the callback is run.
137  bool has_user_callback() const { return user_callback_ != NULL; }
138
139  // This method is called when the job is inserted into a wait queue
140  // because no executors were ready to accept it.
141  virtual void WaitingForThread() {}
142
143  // This method is called just before the job is posted to the work thread.
144  virtual void FinishedWaitingForThread() {}
145
146  // This method is called on the worker thread to do the job's work. On
147  // completion, implementors are expected to call OnJobCompleted() on
148  // |origin_loop|.
149  virtual void Run(MessageLoop* origin_loop) = 0;
150
151 protected:
152  void OnJobCompleted() {
153    // |executor_| will be NULL if the executor has already been deleted.
154    if (executor_)
155      executor_->OnJobCompleted(this);
156  }
157
158  void RunUserCallback(int result) {
159    DCHECK(has_user_callback());
160    CompletionCallback* callback = user_callback_;
161    // Null the callback so has_user_callback() will now return false.
162    user_callback_ = NULL;
163    callback->Run(result);
164  }
165
166  friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
167
168  virtual ~Job() {}
169
170 private:
171  const Type type_;
172  CompletionCallback* user_callback_;
173  Executor* executor_;
174  bool was_cancelled_;
175};
176
177// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
178
179// Runs on the worker thread to call ProxyResolver::SetPacScript.
180class MultiThreadedProxyResolver::SetPacScriptJob
181    : public MultiThreadedProxyResolver::Job {
182 public:
183  SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
184                  CompletionCallback* callback)
185    : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
186          callback),
187      script_data_(script_data) {
188  }
189
190  // Runs on the worker thread.
191  virtual void Run(MessageLoop* origin_loop) {
192    ProxyResolver* resolver = executor()->resolver();
193    int rv = resolver->SetPacScript(script_data_, NULL);
194
195    DCHECK_NE(rv, ERR_IO_PENDING);
196    origin_loop->PostTask(
197        FROM_HERE,
198        NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
199  }
200
201 private:
202  // Runs the completion callback on the origin thread.
203  void RequestComplete(int result_code) {
204    // The task may have been cancelled after it was started.
205    if (!was_cancelled() && has_user_callback()) {
206      RunUserCallback(result_code);
207    }
208    OnJobCompleted();
209  }
210
211  const scoped_refptr<ProxyResolverScriptData> script_data_;
212};
213
214// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
215
216class MultiThreadedProxyResolver::GetProxyForURLJob
217    : public MultiThreadedProxyResolver::Job {
218 public:
219  // |url|         -- the URL of the query.
220  // |results|     -- the structure to fill with proxy resolve results.
221  GetProxyForURLJob(const GURL& url,
222                    ProxyInfo* results,
223                    CompletionCallback* callback,
224                    const BoundNetLog& net_log)
225      : Job(TYPE_GET_PROXY_FOR_URL, callback),
226        results_(results),
227        net_log_(net_log),
228        url_(url),
229        was_waiting_for_thread_(false) {
230    DCHECK(callback);
231  }
232
233  BoundNetLog* net_log() { return &net_log_; }
234
235  virtual void WaitingForThread() {
236    was_waiting_for_thread_ = true;
237    net_log_.BeginEvent(
238        NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
239  }
240
241  virtual void FinishedWaitingForThread() {
242    DCHECK(executor());
243
244    if (was_waiting_for_thread_) {
245      net_log_.EndEvent(
246          NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
247    }
248
249    net_log_.AddEvent(
250        NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
251        make_scoped_refptr(new NetLogIntegerParameter(
252            "thread_number", executor()->thread_number())));
253  }
254
255  // Runs on the worker thread.
256  virtual void Run(MessageLoop* origin_loop) {
257    ProxyResolver* resolver = executor()->resolver();
258    int rv = resolver->GetProxyForURL(
259        url_, &results_buf_, NULL, NULL, net_log_);
260    DCHECK_NE(rv, ERR_IO_PENDING);
261
262    origin_loop->PostTask(
263        FROM_HERE,
264        NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
265  }
266
267 private:
268  // Runs the completion callback on the origin thread.
269  void QueryComplete(int result_code) {
270    // The Job may have been cancelled after it was started.
271    if (!was_cancelled()) {
272      if (result_code >= OK) {  // Note: unit-tests use values > 0.
273        results_->Use(results_buf_);
274      }
275      RunUserCallback(result_code);
276    }
277    OnJobCompleted();
278  }
279
280  // Must only be used on the "origin" thread.
281  ProxyInfo* results_;
282
283  // Can be used on either "origin" or worker thread.
284  BoundNetLog net_log_;
285  const GURL url_;
286
287  // Usable from within DoQuery on the worker thread.
288  ProxyInfo results_buf_;
289
290  bool was_waiting_for_thread_;
291};
292
293// MultiThreadedProxyResolver::Executor ----------------------------------------
294
295MultiThreadedProxyResolver::Executor::Executor(
296    MultiThreadedProxyResolver* coordinator,
297    ProxyResolver* resolver,
298    int thread_number)
299    : coordinator_(coordinator),
300      thread_number_(thread_number),
301      resolver_(resolver) {
302  DCHECK(coordinator);
303  DCHECK(resolver);
304  // Start up the thread.
305  // Note that it is safe to pass a temporary C-String to Thread(), as it will
306  // make a copy.
307  std::string thread_name =
308      base::StringPrintf("PAC thread #%d", thread_number);
309  thread_.reset(new base::Thread(thread_name.c_str()));
310  CHECK(thread_->Start());
311}
312
313void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
314  DCHECK(!outstanding_job_);
315  outstanding_job_ = job;
316
317  // Run the job. Once it has completed (regardless of whether it was
318  // cancelled), it will invoke OnJobCompleted() on this thread.
319  job->set_executor(this);
320  job->FinishedWaitingForThread();
321  thread_->message_loop()->PostTask(
322      FROM_HERE,
323      NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
324}
325
326void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
327  DCHECK_EQ(job, outstanding_job_.get());
328  outstanding_job_ = NULL;
329  coordinator_->OnExecutorReady(this);
330}
331
332void MultiThreadedProxyResolver::Executor::Destroy() {
333  DCHECK(coordinator_);
334
335  // Give the resolver an opportunity to shutdown from THIS THREAD before
336  // joining on the resolver thread. This allows certain implementations
337  // to avoid deadlocks.
338  resolver_->Shutdown();
339
340  {
341    // See http://crbug.com/69710.
342    base::ThreadRestrictions::ScopedAllowIO allow_io;
343
344    // Join the worker thread.
345    thread_.reset();
346  }
347
348  // Cancel any outstanding job.
349  if (outstanding_job_) {
350    outstanding_job_->Cancel();
351    // Orphan the job (since this executor may be deleted soon).
352    outstanding_job_->set_executor(NULL);
353  }
354
355  // It is now safe to free the ProxyResolver, since all the tasks that
356  // were using it on the resolver thread have completed.
357  resolver_.reset();
358
359  // Null some stuff as a precaution.
360  coordinator_ = NULL;
361  outstanding_job_ = NULL;
362}
363
364void MultiThreadedProxyResolver::Executor::PurgeMemory() {
365  scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
366  thread_->message_loop()->PostTask(
367      FROM_HERE,
368      NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
369}
370
371MultiThreadedProxyResolver::Executor::~Executor() {
372  // The important cleanup happens as part of Destroy(), which should always be
373  // called first.
374  DCHECK(!coordinator_) << "Destroy() was not called";
375  DCHECK(!thread_.get());
376  DCHECK(!resolver_.get());
377  DCHECK(!outstanding_job_);
378}
379
380// MultiThreadedProxyResolver --------------------------------------------------
381
382MultiThreadedProxyResolver::MultiThreadedProxyResolver(
383    ProxyResolverFactory* resolver_factory,
384    size_t max_num_threads)
385    : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
386      resolver_factory_(resolver_factory),
387      max_num_threads_(max_num_threads) {
388  DCHECK_GE(max_num_threads, 1u);
389}
390
391MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
392  // We will cancel all outstanding requests.
393  pending_jobs_.clear();
394  ReleaseAllExecutors();
395}
396
397int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
398                                               ProxyInfo* results,
399                                               CompletionCallback* callback,
400                                               RequestHandle* request,
401                                               const BoundNetLog& net_log) {
402  DCHECK(CalledOnValidThread());
403  DCHECK(callback);
404  DCHECK(current_script_data_.get())
405      << "Resolver is un-initialized. Must call SetPacScript() first!";
406
407  scoped_refptr<GetProxyForURLJob> job(
408      new GetProxyForURLJob(url, results, callback, net_log));
409
410  // Completion will be notified through |callback|, unless the caller cancels
411  // the request using |request|.
412  if (request)
413    *request = reinterpret_cast<RequestHandle>(job.get());
414
415  // If there is an executor that is ready to run this request, submit it!
416  Executor* executor = FindIdleExecutor();
417  if (executor) {
418    DCHECK_EQ(0u, pending_jobs_.size());
419    executor->StartJob(job);
420    return ERR_IO_PENDING;
421  }
422
423  // Otherwise queue this request. (We will schedule it to a thread once one
424  // becomes available).
425  job->WaitingForThread();
426  pending_jobs_.push_back(job);
427
428  // If we haven't already reached the thread limit, provision a new thread to
429  // drain the requests more quickly.
430  if (executors_.size() < max_num_threads_) {
431    executor = AddNewExecutor();
432    executor->StartJob(
433        new SetPacScriptJob(current_script_data_, NULL));
434  }
435
436  return ERR_IO_PENDING;
437}
438
439void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
440  DCHECK(CalledOnValidThread());
441  DCHECK(req);
442
443  Job* job = reinterpret_cast<Job*>(req);
444  DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
445
446  if (job->executor()) {
447    // If the job was already submitted to the executor, just mark it
448    // as cancelled so the user callback isn't run on completion.
449    job->Cancel();
450  } else {
451    // Otherwise the job is just sitting in a queue.
452    PendingJobsQueue::iterator it =
453        std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
454    DCHECK(it != pending_jobs_.end());
455    pending_jobs_.erase(it);
456  }
457}
458
459void MultiThreadedProxyResolver::CancelSetPacScript() {
460  DCHECK(CalledOnValidThread());
461  DCHECK_EQ(0u, pending_jobs_.size());
462  DCHECK_EQ(1u, executors_.size());
463  DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
464            executors_[0]->outstanding_job()->type());
465
466  // Defensively clear some data which shouldn't be getting used
467  // anymore.
468  current_script_data_ = NULL;
469
470  ReleaseAllExecutors();
471}
472
473void MultiThreadedProxyResolver::PurgeMemory() {
474  DCHECK(CalledOnValidThread());
475  for (ExecutorList::iterator it = executors_.begin();
476       it != executors_.end(); ++it) {
477    Executor* executor = *it;
478    executor->PurgeMemory();
479  }
480}
481
482int MultiThreadedProxyResolver::SetPacScript(
483    const scoped_refptr<ProxyResolverScriptData>& script_data,
484    CompletionCallback* callback) {
485  DCHECK(CalledOnValidThread());
486  DCHECK(callback);
487
488  // Save the script details, so we can provision new executors later.
489  current_script_data_ = script_data;
490
491  // The user should not have any outstanding requests when they call
492  // SetPacScript().
493  CheckNoOutstandingUserRequests();
494
495  // Destroy all of the current threads and their proxy resolvers.
496  ReleaseAllExecutors();
497
498  // Provision a new executor, and run the SetPacScript request. On completion
499  // notification will be sent through |callback|.
500  Executor* executor = AddNewExecutor();
501  executor->StartJob(new SetPacScriptJob(script_data, callback));
502  return ERR_IO_PENDING;
503}
504
505void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
506  DCHECK(CalledOnValidThread());
507  CHECK_EQ(0u, pending_jobs_.size());
508
509  for (ExecutorList::const_iterator it = executors_.begin();
510       it != executors_.end(); ++it) {
511    const Executor* executor = *it;
512    Job* job = executor->outstanding_job();
513    // The "has_user_callback()" is to exclude jobs for which the callback
514    // has already been invoked, or was not user-initiated (as in the case of
515    // lazy thread provisions). User-initiated jobs may !has_user_callback()
516    // when the callback has already been run. (Since we only clear the
517    // outstanding job AFTER the callback has been invoked, it is possible
518    // for a new request to be started from within the callback).
519    CHECK(!job || job->was_cancelled() || !job->has_user_callback());
520  }
521}
522
523void MultiThreadedProxyResolver::ReleaseAllExecutors() {
524  DCHECK(CalledOnValidThread());
525  for (ExecutorList::iterator it = executors_.begin();
526       it != executors_.end(); ++it) {
527    Executor* executor = *it;
528    executor->Destroy();
529  }
530  executors_.clear();
531}
532
533MultiThreadedProxyResolver::Executor*
534MultiThreadedProxyResolver::FindIdleExecutor() {
535  DCHECK(CalledOnValidThread());
536  for (ExecutorList::iterator it = executors_.begin();
537       it != executors_.end(); ++it) {
538    Executor* executor = *it;
539    if (!executor->outstanding_job())
540      return executor;
541  }
542  return NULL;
543}
544
545MultiThreadedProxyResolver::Executor*
546MultiThreadedProxyResolver::AddNewExecutor() {
547  DCHECK(CalledOnValidThread());
548  DCHECK_LT(executors_.size(), max_num_threads_);
549  // The "thread number" is used to give the thread a unique name.
550  int thread_number = executors_.size();
551  ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
552  Executor* executor = new Executor(
553      this, resolver, thread_number);
554  executors_.push_back(make_scoped_refptr(executor));
555  return executor;
556}
557
558void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
559  DCHECK(CalledOnValidThread());
560  if (pending_jobs_.empty())
561    return;
562
563  // Get the next job to process (FIFO). Transfer it from the pending queue
564  // to the executor.
565  scoped_refptr<Job> job = pending_jobs_.front();
566  pending_jobs_.pop_front();
567  executor->StartJob(job);
568}
569
570}  // namespace net
571