1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/thread.h"
12
13#ifndef __has_feature
14#define __has_feature(x) 0  // Compatibility with non-clang or LLVM compilers.
15#endif  // __has_feature
16
17#if defined(WEBRTC_WIN)
18#include <comdef.h>
19#elif defined(WEBRTC_POSIX)
20#include <time.h>
21#endif
22
23#include "webrtc/base/common.h"
24#include "webrtc/base/logging.h"
25#include "webrtc/base/platform_thread.h"
26#include "webrtc/base/stringutils.h"
27#include "webrtc/base/timeutils.h"
28
29#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
30#include "webrtc/base/maccocoathreadhelper.h"
31#include "webrtc/base/scoped_autorelease_pool.h"
32#endif
33
34#include "webrtc/base/trace_event.h"
35
36namespace rtc {
37
38ThreadManager* ThreadManager::Instance() {
39  RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ());
40  return &thread_manager;
41}
42
43// static
44Thread* Thread::Current() {
45  return ThreadManager::Instance()->CurrentThread();
46}
47
48#if defined(WEBRTC_POSIX)
49ThreadManager::ThreadManager() {
50  pthread_key_create(&key_, NULL);
51#ifndef NO_MAIN_THREAD_WRAPPING
52  WrapCurrentThread();
53#endif
54#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
55  // Under Automatic Reference Counting (ARC), you cannot use autorelease pools
56  // directly. Instead, you use @autoreleasepool blocks instead.  Also, we are
57  // maintaining thread safety using immutability within context of GCD dispatch
58  // queues in this case.
59  InitCocoaMultiThreading();
60#endif
61}
62
63ThreadManager::~ThreadManager() {
64#if __has_feature(objc_arc)
65  @autoreleasepool
66#elif defined(WEBRTC_MAC)
67  // This is called during exit, at which point apparently no NSAutoreleasePools
68  // are available; but we might still need them to do cleanup (or we get the
69  // "no autoreleasepool in place, just leaking" warning when exiting).
70  ScopedAutoreleasePool pool;
71#endif
72  {
73    UnwrapCurrentThread();
74    pthread_key_delete(key_);
75  }
76}
77
78Thread *ThreadManager::CurrentThread() {
79  return static_cast<Thread *>(pthread_getspecific(key_));
80}
81
82void ThreadManager::SetCurrentThread(Thread *thread) {
83  pthread_setspecific(key_, thread);
84}
85#endif
86
87#if defined(WEBRTC_WIN)
88ThreadManager::ThreadManager() {
89  key_ = TlsAlloc();
90#ifndef NO_MAIN_THREAD_WRAPPING
91  WrapCurrentThread();
92#endif
93}
94
95ThreadManager::~ThreadManager() {
96  UnwrapCurrentThread();
97  TlsFree(key_);
98}
99
100Thread *ThreadManager::CurrentThread() {
101  return static_cast<Thread *>(TlsGetValue(key_));
102}
103
104void ThreadManager::SetCurrentThread(Thread *thread) {
105  TlsSetValue(key_, thread);
106}
107#endif
108
109Thread *ThreadManager::WrapCurrentThread() {
110  Thread* result = CurrentThread();
111  if (NULL == result) {
112    result = new Thread();
113    result->WrapCurrentWithThreadManager(this, true);
114  }
115  return result;
116}
117
118void ThreadManager::UnwrapCurrentThread() {
119  Thread* t = CurrentThread();
120  if (t && !(t->IsOwned())) {
121    t->UnwrapCurrent();
122    delete t;
123  }
124}
125
126struct ThreadInit {
127  Thread* thread;
128  Runnable* runnable;
129};
130
131Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
132  : thread_(Thread::Current()),
133    previous_state_(thread_->SetAllowBlockingCalls(false)) {
134}
135
136Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
137  ASSERT(thread_->IsCurrent());
138  thread_->SetAllowBlockingCalls(previous_state_);
139}
140
141Thread::Thread(SocketServer* ss)
142    : MessageQueue(ss),
143      running_(true, false),
144#if defined(WEBRTC_WIN)
145      thread_(NULL),
146      thread_id_(0),
147#endif
148      owned_(true),
149      blocking_calls_allowed_(true) {
150  SetName("Thread", this);  // default name
151}
152
153Thread::~Thread() {
154  Stop();
155  Clear(NULL);
156}
157
158bool Thread::SleepMs(int milliseconds) {
159  AssertBlockingIsAllowedOnCurrentThread();
160
161#if defined(WEBRTC_WIN)
162  ::Sleep(milliseconds);
163  return true;
164#else
165  // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
166  // so we use nanosleep() even though it has greater precision than necessary.
167  struct timespec ts;
168  ts.tv_sec = milliseconds / 1000;
169  ts.tv_nsec = (milliseconds % 1000) * 1000000;
170  int ret = nanosleep(&ts, NULL);
171  if (ret != 0) {
172    LOG_ERR(LS_WARNING) << "nanosleep() returning early";
173    return false;
174  }
175  return true;
176#endif
177}
178
179bool Thread::SetName(const std::string& name, const void* obj) {
180  if (running()) return false;
181  name_ = name;
182  if (obj) {
183    char buf[16];
184    sprintfn(buf, sizeof(buf), " 0x%p", obj);
185    name_ += buf;
186  }
187  return true;
188}
189
190bool Thread::Start(Runnable* runnable) {
191  ASSERT(owned_);
192  if (!owned_) return false;
193  ASSERT(!running());
194  if (running()) return false;
195
196  Restart();  // reset fStop_ if the thread is being restarted
197
198  // Make sure that ThreadManager is created on the main thread before
199  // we start a new thread.
200  ThreadManager::Instance();
201
202  ThreadInit* init = new ThreadInit;
203  init->thread = this;
204  init->runnable = runnable;
205#if defined(WEBRTC_WIN)
206  thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, 0,
207                         &thread_id_);
208  if (thread_) {
209    running_.Set();
210  } else {
211    return false;
212  }
213#elif defined(WEBRTC_POSIX)
214  pthread_attr_t attr;
215  pthread_attr_init(&attr);
216
217  int error_code = pthread_create(&thread_, &attr, PreRun, init);
218  if (0 != error_code) {
219    LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
220    return false;
221  }
222  running_.Set();
223#endif
224  return true;
225}
226
227bool Thread::WrapCurrent() {
228  return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
229}
230
231void Thread::UnwrapCurrent() {
232  // Clears the platform-specific thread-specific storage.
233  ThreadManager::Instance()->SetCurrentThread(NULL);
234#if defined(WEBRTC_WIN)
235  if (thread_ != NULL) {
236    if (!CloseHandle(thread_)) {
237      LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
238    }
239    thread_ = NULL;
240  }
241#endif
242  running_.Reset();
243}
244
245void Thread::SafeWrapCurrent() {
246  WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
247}
248
249void Thread::Join() {
250  if (running()) {
251    ASSERT(!IsCurrent());
252    if (Current() && !Current()->blocking_calls_allowed_) {
253      LOG(LS_WARNING) << "Waiting for the thread to join, "
254                      << "but blocking calls have been disallowed";
255    }
256
257#if defined(WEBRTC_WIN)
258    ASSERT(thread_ != NULL);
259    WaitForSingleObject(thread_, INFINITE);
260    CloseHandle(thread_);
261    thread_ = NULL;
262    thread_id_ = 0;
263#elif defined(WEBRTC_POSIX)
264    void *pv;
265    pthread_join(thread_, &pv);
266#endif
267    running_.Reset();
268  }
269}
270
271bool Thread::SetAllowBlockingCalls(bool allow) {
272  ASSERT(IsCurrent());
273  bool previous = blocking_calls_allowed_;
274  blocking_calls_allowed_ = allow;
275  return previous;
276}
277
278// static
279void Thread::AssertBlockingIsAllowedOnCurrentThread() {
280#if !defined(NDEBUG)
281  Thread* current = Thread::Current();
282  ASSERT(!current || current->blocking_calls_allowed_);
283#endif
284}
285
286void* Thread::PreRun(void* pv) {
287  ThreadInit* init = static_cast<ThreadInit*>(pv);
288  ThreadManager::Instance()->SetCurrentThread(init->thread);
289  rtc::SetCurrentThreadName(init->thread->name_.c_str());
290#if __has_feature(objc_arc)
291  @autoreleasepool
292#elif defined(WEBRTC_MAC)
293  // Make sure the new thread has an autoreleasepool
294  ScopedAutoreleasePool pool;
295#endif
296  {
297    if (init->runnable) {
298      init->runnable->Run(init->thread);
299    } else {
300      init->thread->Run();
301    }
302    delete init;
303    return NULL;
304  }
305}
306
307void Thread::Run() {
308  ProcessMessages(kForever);
309}
310
311bool Thread::IsOwned() {
312  return owned_;
313}
314
315void Thread::Stop() {
316  MessageQueue::Quit();
317  Join();
318}
319
320void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) {
321  if (fStop_)
322    return;
323
324  // Sent messages are sent to the MessageHandler directly, in the context
325  // of "thread", like Win32 SendMessage. If in the right context,
326  // call the handler directly.
327  Message msg;
328  msg.phandler = phandler;
329  msg.message_id = id;
330  msg.pdata = pdata;
331  if (IsCurrent()) {
332    phandler->OnMessage(&msg);
333    return;
334  }
335
336  AssertBlockingIsAllowedOnCurrentThread();
337
338  AutoThread thread;
339  Thread *current_thread = Thread::Current();
340  ASSERT(current_thread != NULL);  // AutoThread ensures this
341
342  bool ready = false;
343  {
344    CritScope cs(&crit_);
345    _SendMessage smsg;
346    smsg.thread = current_thread;
347    smsg.msg = msg;
348    smsg.ready = &ready;
349    sendlist_.push_back(smsg);
350  }
351
352  // Wait for a reply
353
354  ss_->WakeUp();
355
356  bool waited = false;
357  crit_.Enter();
358  while (!ready) {
359    crit_.Leave();
360    // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
361    // thread invoking calls on the current thread.
362    current_thread->ReceiveSendsFromThread(this);
363    current_thread->socketserver()->Wait(kForever, false);
364    waited = true;
365    crit_.Enter();
366  }
367  crit_.Leave();
368
369  // Our Wait loop above may have consumed some WakeUp events for this
370  // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
371  // cause problems for some SocketServers.
372  //
373  // Concrete example:
374  // Win32SocketServer on thread A calls Send on thread B.  While processing the
375  // message, thread B Posts a message to A.  We consume the wakeup for that
376  // Post while waiting for the Send to complete, which means that when we exit
377  // this loop, we need to issue another WakeUp, or else the Posted message
378  // won't be processed in a timely manner.
379
380  if (waited) {
381    current_thread->socketserver()->WakeUp();
382  }
383}
384
385void Thread::ReceiveSends() {
386  ReceiveSendsFromThread(NULL);
387}
388
389void Thread::ReceiveSendsFromThread(const Thread* source) {
390  // Receive a sent message. Cleanup scenarios:
391  // - thread sending exits: We don't allow this, since thread can exit
392  //   only via Join, so Send must complete.
393  // - thread receiving exits: Wakeup/set ready in Thread::Clear()
394  // - object target cleared: Wakeup/set ready in Thread::Clear()
395  _SendMessage smsg;
396
397  crit_.Enter();
398  while (PopSendMessageFromThread(source, &smsg)) {
399    crit_.Leave();
400
401    smsg.msg.phandler->OnMessage(&smsg.msg);
402
403    crit_.Enter();
404    *smsg.ready = true;
405    smsg.thread->socketserver()->WakeUp();
406  }
407  crit_.Leave();
408}
409
410bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
411  for (std::list<_SendMessage>::iterator it = sendlist_.begin();
412       it != sendlist_.end(); ++it) {
413    if (it->thread == source || source == NULL) {
414      *msg = *it;
415      sendlist_.erase(it);
416      return true;
417    }
418  }
419  return false;
420}
421
422void Thread::InvokeBegin() {
423  TRACE_EVENT_BEGIN0("webrtc", "Thread::Invoke");
424}
425
426void Thread::InvokeEnd() {
427  TRACE_EVENT_END0("webrtc", "Thread::Invoke");
428}
429
430void Thread::Clear(MessageHandler* phandler,
431                   uint32_t id,
432                   MessageList* removed) {
433  CritScope cs(&crit_);
434
435  // Remove messages on sendlist_ with phandler
436  // Object target cleared: remove from send list, wakeup/set ready
437  // if sender not NULL.
438
439  std::list<_SendMessage>::iterator iter = sendlist_.begin();
440  while (iter != sendlist_.end()) {
441    _SendMessage smsg = *iter;
442    if (smsg.msg.Match(phandler, id)) {
443      if (removed) {
444        removed->push_back(smsg.msg);
445      } else {
446        delete smsg.msg.pdata;
447      }
448      iter = sendlist_.erase(iter);
449      *smsg.ready = true;
450      smsg.thread->socketserver()->WakeUp();
451      continue;
452    }
453    ++iter;
454  }
455
456  MessageQueue::Clear(phandler, id, removed);
457}
458
459bool Thread::ProcessMessages(int cmsLoop) {
460  uint32_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
461  int cmsNext = cmsLoop;
462
463  while (true) {
464#if __has_feature(objc_arc)
465    @autoreleasepool
466#elif defined(WEBRTC_MAC)
467    // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html
468    // Each thread is supposed to have an autorelease pool. Also for event loops
469    // like this, autorelease pool needs to be created and drained/released
470    // for each cycle.
471    ScopedAutoreleasePool pool;
472#endif
473    {
474      Message msg;
475      if (!Get(&msg, cmsNext))
476        return !IsQuitting();
477      Dispatch(&msg);
478
479      if (cmsLoop != kForever) {
480        cmsNext = TimeUntil(msEnd);
481        if (cmsNext < 0)
482          return true;
483      }
484    }
485  }
486}
487
488bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
489                                          bool need_synchronize_access) {
490  if (running())
491    return false;
492
493#if defined(WEBRTC_WIN)
494  if (need_synchronize_access) {
495    // We explicitly ask for no rights other than synchronization.
496    // This gives us the best chance of succeeding.
497    thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
498    if (!thread_) {
499      LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
500      return false;
501    }
502    thread_id_ = GetCurrentThreadId();
503  }
504#elif defined(WEBRTC_POSIX)
505  thread_ = pthread_self();
506#endif
507
508  owned_ = false;
509  running_.Set();
510  thread_manager->SetCurrentThread(this);
511  return true;
512}
513
514AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
515  if (!ThreadManager::Instance()->CurrentThread()) {
516    ThreadManager::Instance()->SetCurrentThread(this);
517  }
518}
519
520AutoThread::~AutoThread() {
521  Stop();
522  if (ThreadManager::Instance()->CurrentThread() == this) {
523    ThreadManager::Instance()->SetCurrentThread(NULL);
524  }
525}
526
527#if defined(WEBRTC_WIN)
528void ComThread::Run() {
529  HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
530  ASSERT(SUCCEEDED(hr));
531  if (SUCCEEDED(hr)) {
532    Thread::Run();
533    CoUninitialize();
534  } else {
535    LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
536  }
537}
538#endif
539
540}  // namespace rtc
541