1/*
2 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/utility/source/process_thread_impl.h"
12
13#include "webrtc/base/checks.h"
14#include "webrtc/modules/include/module.h"
15#include "webrtc/system_wrappers/include/logging.h"
16#include "webrtc/system_wrappers/include/tick_util.h"
17
18namespace webrtc {
19namespace {
20
21// We use this constant internally to signal that a module has requested
22// a callback right away.  When this is set, no call to TimeUntilNextProcess
23// should be made, but Process() should be called directly.
24const int64_t kCallProcessImmediately = -1;
25
26int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
27  int64_t interval = module->TimeUntilNextProcess();
28  if (interval < 0) {
29    // Falling behind, we should call the callback now.
30    return time_now;
31  }
32  return time_now + interval;
33}
34}
35
36ProcessThread::~ProcessThread() {}
37
38// static
39rtc::scoped_ptr<ProcessThread> ProcessThread::Create(
40    const char* thread_name) {
41  return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
42}
43
44ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
45    : wake_up_(EventWrapper::Create()),
46      stop_(false),
47      thread_name_(thread_name) {}
48
49ProcessThreadImpl::~ProcessThreadImpl() {
50  RTC_DCHECK(thread_checker_.CalledOnValidThread());
51  RTC_DCHECK(!thread_.get());
52  RTC_DCHECK(!stop_);
53
54  while (!queue_.empty()) {
55    delete queue_.front();
56    queue_.pop();
57  }
58}
59
60void ProcessThreadImpl::Start() {
61  RTC_DCHECK(thread_checker_.CalledOnValidThread());
62  RTC_DCHECK(!thread_.get());
63  if (thread_.get())
64    return;
65
66  RTC_DCHECK(!stop_);
67
68  {
69    // TODO(tommi): Since DeRegisterModule is currently being called from
70    // different threads in some cases (ChannelOwner), we need to lock access to
71    // the modules_ collection even on the controller thread.
72    // Once we've cleaned up those places, we can remove this lock.
73    rtc::CritScope lock(&lock_);
74    for (ModuleCallback& m : modules_)
75      m.module->ProcessThreadAttached(this);
76  }
77
78  thread_.reset(
79      new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
80  thread_->Start();
81}
82
83void ProcessThreadImpl::Stop() {
84  RTC_DCHECK(thread_checker_.CalledOnValidThread());
85  if(!thread_.get())
86    return;
87
88  {
89    rtc::CritScope lock(&lock_);
90    stop_ = true;
91  }
92
93  wake_up_->Set();
94
95  thread_->Stop();
96  stop_ = false;
97
98  // TODO(tommi): Since DeRegisterModule is currently being called from
99  // different threads in some cases (ChannelOwner), we need to lock access to
100  // the modules_ collection even on the controller thread.
101  // Since DeRegisterModule also checks thread_, we also need to hold the
102  // lock for the .reset() operation.
103  // Once we've cleaned up those places, we can remove this lock.
104  rtc::CritScope lock(&lock_);
105  thread_.reset();
106  for (ModuleCallback& m : modules_)
107    m.module->ProcessThreadAttached(nullptr);
108}
109
110void ProcessThreadImpl::WakeUp(Module* module) {
111  // Allowed to be called on any thread.
112  {
113    rtc::CritScope lock(&lock_);
114    for (ModuleCallback& m : modules_) {
115      if (m.module == module)
116        m.next_callback = kCallProcessImmediately;
117    }
118  }
119  wake_up_->Set();
120}
121
122void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
123  // Allowed to be called on any thread.
124  {
125    rtc::CritScope lock(&lock_);
126    queue_.push(task.release());
127  }
128  wake_up_->Set();
129}
130
131void ProcessThreadImpl::RegisterModule(Module* module) {
132  RTC_DCHECK(thread_checker_.CalledOnValidThread());
133  RTC_DCHECK(module);
134
135#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
136  {
137    // Catch programmer error.
138    rtc::CritScope lock(&lock_);
139    for (const ModuleCallback& mc : modules_)
140      RTC_DCHECK(mc.module != module);
141  }
142#endif
143
144  // Now that we know the module isn't in the list, we'll call out to notify
145  // the module that it's attached to the worker thread.  We don't hold
146  // the lock while we make this call.
147  if (thread_.get())
148    module->ProcessThreadAttached(this);
149
150  {
151    rtc::CritScope lock(&lock_);
152    modules_.push_back(ModuleCallback(module));
153  }
154
155  // Wake the thread calling ProcessThreadImpl::Process() to update the
156  // waiting time. The waiting time for the just registered module may be
157  // shorter than all other registered modules.
158  wake_up_->Set();
159}
160
161void ProcessThreadImpl::DeRegisterModule(Module* module) {
162  // Allowed to be called on any thread.
163  // TODO(tommi): Disallow this ^^^
164  RTC_DCHECK(module);
165
166  {
167    rtc::CritScope lock(&lock_);
168    modules_.remove_if([&module](const ModuleCallback& m) {
169        return m.module == module;
170      });
171
172    // TODO(tommi): we currently need to hold the lock while calling out to
173    // ProcessThreadAttached.  This is to make sure that the thread hasn't been
174    // destroyed while we attach the module.  Once we can make sure
175    // DeRegisterModule isn't being called on arbitrary threads, we can move the
176    // |if (thread_.get())| check and ProcessThreadAttached() call outside the
177    // lock scope.
178
179    // Notify the module that it's been detached.
180    if (thread_.get())
181      module->ProcessThreadAttached(nullptr);
182  }
183}
184
185// static
186bool ProcessThreadImpl::Run(void* obj) {
187  return static_cast<ProcessThreadImpl*>(obj)->Process();
188}
189
190bool ProcessThreadImpl::Process() {
191  int64_t now = TickTime::MillisecondTimestamp();
192  int64_t next_checkpoint = now + (1000 * 60);
193
194  {
195    rtc::CritScope lock(&lock_);
196    if (stop_)
197      return false;
198    for (ModuleCallback& m : modules_) {
199      // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
200      // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
201      // operation should not require taking a lock, so querying all modules
202      // should run in a matter of nanoseconds.
203      if (m.next_callback == 0)
204        m.next_callback = GetNextCallbackTime(m.module, now);
205
206      if (m.next_callback <= now ||
207          m.next_callback == kCallProcessImmediately) {
208        m.module->Process();
209        // Use a new 'now' reference to calculate when the next callback
210        // should occur.  We'll continue to use 'now' above for the baseline
211        // of calculating how long we should wait, to reduce variance.
212        int64_t new_now = TickTime::MillisecondTimestamp();
213        m.next_callback = GetNextCallbackTime(m.module, new_now);
214      }
215
216      if (m.next_callback < next_checkpoint)
217        next_checkpoint = m.next_callback;
218    }
219
220    while (!queue_.empty()) {
221      ProcessTask* task = queue_.front();
222      queue_.pop();
223      lock_.Leave();
224      task->Run();
225      delete task;
226      lock_.Enter();
227    }
228  }
229
230  int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
231  if (time_to_wait > 0)
232    wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
233
234  return true;
235}
236}  // namespace webrtc
237