1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/thread.h"
12
13#ifndef __has_feature
14#define __has_feature(x) 0  // Compatibility with non-clang or LLVM compilers.
15#endif  // __has_feature
16
17#if defined(WEBRTC_WIN)
18#include <comdef.h>
19#elif defined(WEBRTC_POSIX)
20#include <time.h>
21#endif
22
23#include "webrtc/base/common.h"
24#include "webrtc/base/logging.h"
25#include "webrtc/base/stringutils.h"
26#include "webrtc/base/timeutils.h"
27
28#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
29#include "webrtc/base/maccocoathreadhelper.h"
30#include "webrtc/base/scoped_autorelease_pool.h"
31#endif
32
33namespace rtc {
34
35ThreadManager* ThreadManager::Instance() {
36  LIBJINGLE_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ());
37  return &thread_manager;
38}
39
40// static
41Thread* Thread::Current() {
42  return ThreadManager::Instance()->CurrentThread();
43}
44
45#if defined(WEBRTC_POSIX)
46ThreadManager::ThreadManager() {
47  pthread_key_create(&key_, NULL);
48#ifndef NO_MAIN_THREAD_WRAPPING
49  WrapCurrentThread();
50#endif
51#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
52  // Under Automatic Reference Counting (ARC), you cannot use autorelease pools
53  // directly. Instead, you use @autoreleasepool blocks instead.  Also, we are
54  // maintaining thread safety using immutability within context of GCD dispatch
55  // queues in this case.
56  InitCocoaMultiThreading();
57#endif
58}
59
60ThreadManager::~ThreadManager() {
61#if __has_feature(objc_arc)
62  @autoreleasepool
63#elif defined(WEBRTC_MAC)
64  // This is called during exit, at which point apparently no NSAutoreleasePools
65  // are available; but we might still need them to do cleanup (or we get the
66  // "no autoreleasepool in place, just leaking" warning when exiting).
67  ScopedAutoreleasePool pool;
68#endif
69  {
70    UnwrapCurrentThread();
71    pthread_key_delete(key_);
72  }
73}
74
75Thread *ThreadManager::CurrentThread() {
76  return static_cast<Thread *>(pthread_getspecific(key_));
77}
78
79void ThreadManager::SetCurrentThread(Thread *thread) {
80  pthread_setspecific(key_, thread);
81}
82#endif
83
84#if defined(WEBRTC_WIN)
85ThreadManager::ThreadManager() {
86  key_ = TlsAlloc();
87#ifndef NO_MAIN_THREAD_WRAPPING
88  WrapCurrentThread();
89#endif
90}
91
92ThreadManager::~ThreadManager() {
93  UnwrapCurrentThread();
94  TlsFree(key_);
95}
96
97Thread *ThreadManager::CurrentThread() {
98  return static_cast<Thread *>(TlsGetValue(key_));
99}
100
101void ThreadManager::SetCurrentThread(Thread *thread) {
102  TlsSetValue(key_, thread);
103}
104#endif
105
106Thread *ThreadManager::WrapCurrentThread() {
107  Thread* result = CurrentThread();
108  if (NULL == result) {
109    result = new Thread();
110    result->WrapCurrentWithThreadManager(this);
111  }
112  return result;
113}
114
115void ThreadManager::UnwrapCurrentThread() {
116  Thread* t = CurrentThread();
117  if (t && !(t->IsOwned())) {
118    t->UnwrapCurrent();
119    delete t;
120  }
121}
122
123struct ThreadInit {
124  Thread* thread;
125  Runnable* runnable;
126};
127
128Thread::Thread(SocketServer* ss)
129    : MessageQueue(ss),
130      priority_(PRIORITY_NORMAL),
131      running_(true, false),
132#if defined(WEBRTC_WIN)
133      thread_(NULL),
134      thread_id_(0),
135#endif
136      owned_(true) {
137  SetName("Thread", this);  // default name
138}
139
140Thread::~Thread() {
141  Stop();
142  Clear(NULL);
143}
144
145bool Thread::SleepMs(int milliseconds) {
146#if defined(WEBRTC_WIN)
147  ::Sleep(milliseconds);
148  return true;
149#else
150  // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
151  // so we use nanosleep() even though it has greater precision than necessary.
152  struct timespec ts;
153  ts.tv_sec = milliseconds / 1000;
154  ts.tv_nsec = (milliseconds % 1000) * 1000000;
155  int ret = nanosleep(&ts, NULL);
156  if (ret != 0) {
157    LOG_ERR(LS_WARNING) << "nanosleep() returning early";
158    return false;
159  }
160  return true;
161#endif
162}
163
164bool Thread::SetName(const std::string& name, const void* obj) {
165  if (running()) return false;
166  name_ = name;
167  if (obj) {
168    char buf[16];
169    sprintfn(buf, sizeof(buf), " 0x%p", obj);
170    name_ += buf;
171  }
172  return true;
173}
174
175bool Thread::SetPriority(ThreadPriority priority) {
176#if defined(WEBRTC_WIN)
177  if (running()) {
178    BOOL ret = FALSE;
179    if (priority == PRIORITY_NORMAL) {
180      ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL);
181    } else if (priority == PRIORITY_HIGH) {
182      ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST);
183    } else if (priority == PRIORITY_ABOVE_NORMAL) {
184      ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL);
185    } else if (priority == PRIORITY_IDLE) {
186      ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE);
187    }
188    if (!ret) {
189      return false;
190    }
191  }
192  priority_ = priority;
193  return true;
194#else
195  // TODO: Implement for Linux/Mac if possible.
196  if (running()) return false;
197  priority_ = priority;
198  return true;
199#endif
200}
201
202bool Thread::Start(Runnable* runnable) {
203  ASSERT(owned_);
204  if (!owned_) return false;
205  ASSERT(!running());
206  if (running()) return false;
207
208  Restart();  // reset fStop_ if the thread is being restarted
209
210  // Make sure that ThreadManager is created on the main thread before
211  // we start a new thread.
212  ThreadManager::Instance();
213
214  ThreadInit* init = new ThreadInit;
215  init->thread = this;
216  init->runnable = runnable;
217#if defined(WEBRTC_WIN)
218  DWORD flags = 0;
219  if (priority_ != PRIORITY_NORMAL) {
220    flags = CREATE_SUSPENDED;
221  }
222  thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags,
223                         &thread_id_);
224  if (thread_) {
225    running_.Set();
226    if (priority_ != PRIORITY_NORMAL) {
227      SetPriority(priority_);
228      ::ResumeThread(thread_);
229    }
230  } else {
231    return false;
232  }
233#elif defined(WEBRTC_POSIX)
234  pthread_attr_t attr;
235  pthread_attr_init(&attr);
236
237  // Thread priorities are not supported in NaCl.
238#if !defined(__native_client__)
239  if (priority_ != PRIORITY_NORMAL) {
240    if (priority_ == PRIORITY_IDLE) {
241      // There is no POSIX-standard way to set a below-normal priority for an
242      // individual thread (only whole process), so let's not support it.
243      LOG(LS_WARNING) << "PRIORITY_IDLE not supported";
244    } else {
245      // Set real-time round-robin policy.
246      if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
247        LOG(LS_ERROR) << "pthread_attr_setschedpolicy";
248      }
249      struct sched_param param;
250      if (pthread_attr_getschedparam(&attr, &param) != 0) {
251        LOG(LS_ERROR) << "pthread_attr_getschedparam";
252      } else {
253        // The numbers here are arbitrary.
254        if (priority_ == PRIORITY_HIGH) {
255          param.sched_priority = 6;           // 6 = HIGH
256        } else {
257          ASSERT(priority_ == PRIORITY_ABOVE_NORMAL);
258          param.sched_priority = 4;           // 4 = ABOVE_NORMAL
259        }
260        if (pthread_attr_setschedparam(&attr, &param) != 0) {
261          LOG(LS_ERROR) << "pthread_attr_setschedparam";
262        }
263      }
264    }
265  }
266#endif  // !defined(__native_client__)
267
268  int error_code = pthread_create(&thread_, &attr, PreRun, init);
269  if (0 != error_code) {
270    LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
271    return false;
272  }
273  running_.Set();
274#endif
275  return true;
276}
277
278void Thread::Join() {
279  if (running()) {
280    ASSERT(!IsCurrent());
281#if defined(WEBRTC_WIN)
282    WaitForSingleObject(thread_, INFINITE);
283    CloseHandle(thread_);
284    thread_ = NULL;
285    thread_id_ = 0;
286#elif defined(WEBRTC_POSIX)
287    void *pv;
288    pthread_join(thread_, &pv);
289#endif
290    running_.Reset();
291  }
292}
293
294#if defined(WEBRTC_WIN)
295// As seen on MSDN.
296// http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx
297#define MSDEV_SET_THREAD_NAME  0x406D1388
298typedef struct tagTHREADNAME_INFO {
299  DWORD dwType;
300  LPCSTR szName;
301  DWORD dwThreadID;
302  DWORD dwFlags;
303} THREADNAME_INFO;
304
305void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) {
306  THREADNAME_INFO info;
307  info.dwType = 0x1000;
308  info.szName = szThreadName;
309  info.dwThreadID = dwThreadID;
310  info.dwFlags = 0;
311
312  __try {
313    RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD),
314                   reinterpret_cast<ULONG_PTR*>(&info));
315  }
316  __except(EXCEPTION_CONTINUE_EXECUTION) {
317  }
318}
319#endif  // WEBRTC_WIN
320
321void* Thread::PreRun(void* pv) {
322  ThreadInit* init = static_cast<ThreadInit*>(pv);
323  ThreadManager::Instance()->SetCurrentThread(init->thread);
324#if defined(WEBRTC_WIN)
325  SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str());
326#elif defined(WEBRTC_POSIX)
327  // TODO: See if naming exists for pthreads.
328#endif
329#if __has_feature(objc_arc)
330  @autoreleasepool
331#elif defined(WEBRTC_MAC)
332  // Make sure the new thread has an autoreleasepool
333  ScopedAutoreleasePool pool;
334#endif
335  {
336    if (init->runnable) {
337      init->runnable->Run(init->thread);
338    } else {
339      init->thread->Run();
340    }
341    delete init;
342    return NULL;
343  }
344}
345
346void Thread::Run() {
347  ProcessMessages(kForever);
348}
349
350bool Thread::IsOwned() {
351  return owned_;
352}
353
354void Thread::Stop() {
355  MessageQueue::Quit();
356  Join();
357}
358
359void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
360  if (fStop_)
361    return;
362
363  // Sent messages are sent to the MessageHandler directly, in the context
364  // of "thread", like Win32 SendMessage. If in the right context,
365  // call the handler directly.
366
367  Message msg;
368  msg.phandler = phandler;
369  msg.message_id = id;
370  msg.pdata = pdata;
371  if (IsCurrent()) {
372    phandler->OnMessage(&msg);
373    return;
374  }
375
376  AutoThread thread;
377  Thread *current_thread = Thread::Current();
378  ASSERT(current_thread != NULL);  // AutoThread ensures this
379
380  bool ready = false;
381  {
382    CritScope cs(&crit_);
383    _SendMessage smsg;
384    smsg.thread = current_thread;
385    smsg.msg = msg;
386    smsg.ready = &ready;
387    sendlist_.push_back(smsg);
388  }
389
390  // Wait for a reply
391
392  ss_->WakeUp();
393
394  bool waited = false;
395  crit_.Enter();
396  while (!ready) {
397    crit_.Leave();
398    current_thread->ReceiveSends();
399    current_thread->socketserver()->Wait(kForever, false);
400    waited = true;
401    crit_.Enter();
402  }
403  crit_.Leave();
404
405  // Our Wait loop above may have consumed some WakeUp events for this
406  // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
407  // cause problems for some SocketServers.
408  //
409  // Concrete example:
410  // Win32SocketServer on thread A calls Send on thread B.  While processing the
411  // message, thread B Posts a message to A.  We consume the wakeup for that
412  // Post while waiting for the Send to complete, which means that when we exit
413  // this loop, we need to issue another WakeUp, or else the Posted message
414  // won't be processed in a timely manner.
415
416  if (waited) {
417    current_thread->socketserver()->WakeUp();
418  }
419}
420
421void Thread::ReceiveSends() {
422  // Receive a sent message. Cleanup scenarios:
423  // - thread sending exits: We don't allow this, since thread can exit
424  //   only via Join, so Send must complete.
425  // - thread receiving exits: Wakeup/set ready in Thread::Clear()
426  // - object target cleared: Wakeup/set ready in Thread::Clear()
427  crit_.Enter();
428  while (!sendlist_.empty()) {
429    _SendMessage smsg = sendlist_.front();
430    sendlist_.pop_front();
431    crit_.Leave();
432    smsg.msg.phandler->OnMessage(&smsg.msg);
433    crit_.Enter();
434    *smsg.ready = true;
435    smsg.thread->socketserver()->WakeUp();
436  }
437  crit_.Leave();
438}
439
440void Thread::Clear(MessageHandler *phandler, uint32 id,
441                   MessageList* removed) {
442  CritScope cs(&crit_);
443
444  // Remove messages on sendlist_ with phandler
445  // Object target cleared: remove from send list, wakeup/set ready
446  // if sender not NULL.
447
448  std::list<_SendMessage>::iterator iter = sendlist_.begin();
449  while (iter != sendlist_.end()) {
450    _SendMessage smsg = *iter;
451    if (smsg.msg.Match(phandler, id)) {
452      if (removed) {
453        removed->push_back(smsg.msg);
454      } else {
455        delete smsg.msg.pdata;
456      }
457      iter = sendlist_.erase(iter);
458      *smsg.ready = true;
459      smsg.thread->socketserver()->WakeUp();
460      continue;
461    }
462    ++iter;
463  }
464
465  MessageQueue::Clear(phandler, id, removed);
466}
467
468bool Thread::ProcessMessages(int cmsLoop) {
469  uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
470  int cmsNext = cmsLoop;
471
472  while (true) {
473#if __has_feature(objc_arc)
474    @autoreleasepool
475#elif defined(WEBRTC_MAC)
476    // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html
477    // Each thread is supposed to have an autorelease pool. Also for event loops
478    // like this, autorelease pool needs to be created and drained/released
479    // for each cycle.
480    ScopedAutoreleasePool pool;
481#endif
482    {
483      Message msg;
484      if (!Get(&msg, cmsNext))
485        return !IsQuitting();
486      Dispatch(&msg);
487
488      if (cmsLoop != kForever) {
489        cmsNext = TimeUntil(msEnd);
490        if (cmsNext < 0)
491          return true;
492      }
493    }
494  }
495}
496
497bool Thread::WrapCurrent() {
498  return WrapCurrentWithThreadManager(ThreadManager::Instance());
499}
500
501bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
502  if (running())
503    return false;
504#if defined(WEBRTC_WIN)
505  // We explicitly ask for no rights other than synchronization.
506  // This gives us the best chance of succeeding.
507  thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
508  if (!thread_) {
509    LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
510    return false;
511  }
512  thread_id_ = GetCurrentThreadId();
513#elif defined(WEBRTC_POSIX)
514  thread_ = pthread_self();
515#endif
516  owned_ = false;
517  running_.Set();
518  thread_manager->SetCurrentThread(this);
519  return true;
520}
521
522void Thread::UnwrapCurrent() {
523  // Clears the platform-specific thread-specific storage.
524  ThreadManager::Instance()->SetCurrentThread(NULL);
525#if defined(WEBRTC_WIN)
526  if (!CloseHandle(thread_)) {
527    LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
528  }
529#endif
530  running_.Reset();
531}
532
533
534AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
535  if (!ThreadManager::Instance()->CurrentThread()) {
536    ThreadManager::Instance()->SetCurrentThread(this);
537  }
538}
539
540AutoThread::~AutoThread() {
541  Stop();
542  if (ThreadManager::Instance()->CurrentThread() == this) {
543    ThreadManager::Instance()->SetCurrentThread(NULL);
544  }
545}
546
547#if defined(WEBRTC_WIN)
548void ComThread::Run() {
549  HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
550  ASSERT(SUCCEEDED(hr));
551  if (SUCCEEDED(hr)) {
552    Thread::Run();
553    CoUninitialize();
554  } else {
555    LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
556  }
557}
558#endif
559
560}  // namespace rtc
561