1/* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#include "webrtc/base/thread.h" 12 13#ifndef __has_feature 14#define __has_feature(x) 0 // Compatibility with non-clang or LLVM compilers. 15#endif // __has_feature 16 17#if defined(WEBRTC_WIN) 18#include <comdef.h> 19#elif defined(WEBRTC_POSIX) 20#include <time.h> 21#endif 22 23#include "webrtc/base/common.h" 24#include "webrtc/base/logging.h" 25#include "webrtc/base/stringutils.h" 26#include "webrtc/base/timeutils.h" 27 28#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 29#include "webrtc/base/maccocoathreadhelper.h" 30#include "webrtc/base/scoped_autorelease_pool.h" 31#endif 32 33namespace rtc { 34 35ThreadManager* ThreadManager::Instance() { 36 LIBJINGLE_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); 37 return &thread_manager; 38} 39 40// static 41Thread* Thread::Current() { 42 return ThreadManager::Instance()->CurrentThread(); 43} 44 45#if defined(WEBRTC_POSIX) 46ThreadManager::ThreadManager() { 47 pthread_key_create(&key_, NULL); 48#ifndef NO_MAIN_THREAD_WRAPPING 49 WrapCurrentThread(); 50#endif 51#if !__has_feature(objc_arc) && (defined(WEBRTC_MAC)) 52 // Under Automatic Reference Counting (ARC), you cannot use autorelease pools 53 // directly. Instead, you use @autoreleasepool blocks instead. Also, we are 54 // maintaining thread safety using immutability within context of GCD dispatch 55 // queues in this case. 56 InitCocoaMultiThreading(); 57#endif 58} 59 60ThreadManager::~ThreadManager() { 61#if __has_feature(objc_arc) 62 @autoreleasepool 63#elif defined(WEBRTC_MAC) 64 // This is called during exit, at which point apparently no NSAutoreleasePools 65 // are available; but we might still need them to do cleanup (or we get the 66 // "no autoreleasepool in place, just leaking" warning when exiting). 67 ScopedAutoreleasePool pool; 68#endif 69 { 70 UnwrapCurrentThread(); 71 pthread_key_delete(key_); 72 } 73} 74 75Thread *ThreadManager::CurrentThread() { 76 return static_cast<Thread *>(pthread_getspecific(key_)); 77} 78 79void ThreadManager::SetCurrentThread(Thread *thread) { 80 pthread_setspecific(key_, thread); 81} 82#endif 83 84#if defined(WEBRTC_WIN) 85ThreadManager::ThreadManager() { 86 key_ = TlsAlloc(); 87#ifndef NO_MAIN_THREAD_WRAPPING 88 WrapCurrentThread(); 89#endif 90} 91 92ThreadManager::~ThreadManager() { 93 UnwrapCurrentThread(); 94 TlsFree(key_); 95} 96 97Thread *ThreadManager::CurrentThread() { 98 return static_cast<Thread *>(TlsGetValue(key_)); 99} 100 101void ThreadManager::SetCurrentThread(Thread *thread) { 102 TlsSetValue(key_, thread); 103} 104#endif 105 106Thread *ThreadManager::WrapCurrentThread() { 107 Thread* result = CurrentThread(); 108 if (NULL == result) { 109 result = new Thread(); 110 result->WrapCurrentWithThreadManager(this); 111 } 112 return result; 113} 114 115void ThreadManager::UnwrapCurrentThread() { 116 Thread* t = CurrentThread(); 117 if (t && !(t->IsOwned())) { 118 t->UnwrapCurrent(); 119 delete t; 120 } 121} 122 123struct ThreadInit { 124 Thread* thread; 125 Runnable* runnable; 126}; 127 128Thread::Thread(SocketServer* ss) 129 : MessageQueue(ss), 130 priority_(PRIORITY_NORMAL), 131 running_(true, false), 132#if defined(WEBRTC_WIN) 133 thread_(NULL), 134 thread_id_(0), 135#endif 136 owned_(true) { 137 SetName("Thread", this); // default name 138} 139 140Thread::~Thread() { 141 Stop(); 142 Clear(NULL); 143} 144 145bool Thread::SleepMs(int milliseconds) { 146#if defined(WEBRTC_WIN) 147 ::Sleep(milliseconds); 148 return true; 149#else 150 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, 151 // so we use nanosleep() even though it has greater precision than necessary. 152 struct timespec ts; 153 ts.tv_sec = milliseconds / 1000; 154 ts.tv_nsec = (milliseconds % 1000) * 1000000; 155 int ret = nanosleep(&ts, NULL); 156 if (ret != 0) { 157 LOG_ERR(LS_WARNING) << "nanosleep() returning early"; 158 return false; 159 } 160 return true; 161#endif 162} 163 164bool Thread::SetName(const std::string& name, const void* obj) { 165 if (running()) return false; 166 name_ = name; 167 if (obj) { 168 char buf[16]; 169 sprintfn(buf, sizeof(buf), " 0x%p", obj); 170 name_ += buf; 171 } 172 return true; 173} 174 175bool Thread::SetPriority(ThreadPriority priority) { 176#if defined(WEBRTC_WIN) 177 if (running()) { 178 BOOL ret = FALSE; 179 if (priority == PRIORITY_NORMAL) { 180 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL); 181 } else if (priority == PRIORITY_HIGH) { 182 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_HIGHEST); 183 } else if (priority == PRIORITY_ABOVE_NORMAL) { 184 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_ABOVE_NORMAL); 185 } else if (priority == PRIORITY_IDLE) { 186 ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE); 187 } 188 if (!ret) { 189 return false; 190 } 191 } 192 priority_ = priority; 193 return true; 194#else 195 // TODO: Implement for Linux/Mac if possible. 196 if (running()) return false; 197 priority_ = priority; 198 return true; 199#endif 200} 201 202bool Thread::Start(Runnable* runnable) { 203 ASSERT(owned_); 204 if (!owned_) return false; 205 ASSERT(!running()); 206 if (running()) return false; 207 208 Restart(); // reset fStop_ if the thread is being restarted 209 210 // Make sure that ThreadManager is created on the main thread before 211 // we start a new thread. 212 ThreadManager::Instance(); 213 214 ThreadInit* init = new ThreadInit; 215 init->thread = this; 216 init->runnable = runnable; 217#if defined(WEBRTC_WIN) 218 DWORD flags = 0; 219 if (priority_ != PRIORITY_NORMAL) { 220 flags = CREATE_SUSPENDED; 221 } 222 thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, flags, 223 &thread_id_); 224 if (thread_) { 225 running_.Set(); 226 if (priority_ != PRIORITY_NORMAL) { 227 SetPriority(priority_); 228 ::ResumeThread(thread_); 229 } 230 } else { 231 return false; 232 } 233#elif defined(WEBRTC_POSIX) 234 pthread_attr_t attr; 235 pthread_attr_init(&attr); 236 237 // Thread priorities are not supported in NaCl. 238#if !defined(__native_client__) 239 if (priority_ != PRIORITY_NORMAL) { 240 if (priority_ == PRIORITY_IDLE) { 241 // There is no POSIX-standard way to set a below-normal priority for an 242 // individual thread (only whole process), so let's not support it. 243 LOG(LS_WARNING) << "PRIORITY_IDLE not supported"; 244 } else { 245 // Set real-time round-robin policy. 246 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) { 247 LOG(LS_ERROR) << "pthread_attr_setschedpolicy"; 248 } 249 struct sched_param param; 250 if (pthread_attr_getschedparam(&attr, ¶m) != 0) { 251 LOG(LS_ERROR) << "pthread_attr_getschedparam"; 252 } else { 253 // The numbers here are arbitrary. 254 if (priority_ == PRIORITY_HIGH) { 255 param.sched_priority = 6; // 6 = HIGH 256 } else { 257 ASSERT(priority_ == PRIORITY_ABOVE_NORMAL); 258 param.sched_priority = 4; // 4 = ABOVE_NORMAL 259 } 260 if (pthread_attr_setschedparam(&attr, ¶m) != 0) { 261 LOG(LS_ERROR) << "pthread_attr_setschedparam"; 262 } 263 } 264 } 265 } 266#endif // !defined(__native_client__) 267 268 int error_code = pthread_create(&thread_, &attr, PreRun, init); 269 if (0 != error_code) { 270 LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; 271 return false; 272 } 273 running_.Set(); 274#endif 275 return true; 276} 277 278void Thread::Join() { 279 if (running()) { 280 ASSERT(!IsCurrent()); 281#if defined(WEBRTC_WIN) 282 WaitForSingleObject(thread_, INFINITE); 283 CloseHandle(thread_); 284 thread_ = NULL; 285 thread_id_ = 0; 286#elif defined(WEBRTC_POSIX) 287 void *pv; 288 pthread_join(thread_, &pv); 289#endif 290 running_.Reset(); 291 } 292} 293 294#if defined(WEBRTC_WIN) 295// As seen on MSDN. 296// http://msdn.microsoft.com/en-us/library/xcb2z8hs(VS.71).aspx 297#define MSDEV_SET_THREAD_NAME 0x406D1388 298typedef struct tagTHREADNAME_INFO { 299 DWORD dwType; 300 LPCSTR szName; 301 DWORD dwThreadID; 302 DWORD dwFlags; 303} THREADNAME_INFO; 304 305void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName) { 306 THREADNAME_INFO info; 307 info.dwType = 0x1000; 308 info.szName = szThreadName; 309 info.dwThreadID = dwThreadID; 310 info.dwFlags = 0; 311 312 __try { 313 RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info) / sizeof(DWORD), 314 reinterpret_cast<ULONG_PTR*>(&info)); 315 } 316 __except(EXCEPTION_CONTINUE_EXECUTION) { 317 } 318} 319#endif // WEBRTC_WIN 320 321void* Thread::PreRun(void* pv) { 322 ThreadInit* init = static_cast<ThreadInit*>(pv); 323 ThreadManager::Instance()->SetCurrentThread(init->thread); 324#if defined(WEBRTC_WIN) 325 SetThreadName(GetCurrentThreadId(), init->thread->name_.c_str()); 326#elif defined(WEBRTC_POSIX) 327 // TODO: See if naming exists for pthreads. 328#endif 329#if __has_feature(objc_arc) 330 @autoreleasepool 331#elif defined(WEBRTC_MAC) 332 // Make sure the new thread has an autoreleasepool 333 ScopedAutoreleasePool pool; 334#endif 335 { 336 if (init->runnable) { 337 init->runnable->Run(init->thread); 338 } else { 339 init->thread->Run(); 340 } 341 delete init; 342 return NULL; 343 } 344} 345 346void Thread::Run() { 347 ProcessMessages(kForever); 348} 349 350bool Thread::IsOwned() { 351 return owned_; 352} 353 354void Thread::Stop() { 355 MessageQueue::Quit(); 356 Join(); 357} 358 359void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { 360 if (fStop_) 361 return; 362 363 // Sent messages are sent to the MessageHandler directly, in the context 364 // of "thread", like Win32 SendMessage. If in the right context, 365 // call the handler directly. 366 367 Message msg; 368 msg.phandler = phandler; 369 msg.message_id = id; 370 msg.pdata = pdata; 371 if (IsCurrent()) { 372 phandler->OnMessage(&msg); 373 return; 374 } 375 376 AutoThread thread; 377 Thread *current_thread = Thread::Current(); 378 ASSERT(current_thread != NULL); // AutoThread ensures this 379 380 bool ready = false; 381 { 382 CritScope cs(&crit_); 383 _SendMessage smsg; 384 smsg.thread = current_thread; 385 smsg.msg = msg; 386 smsg.ready = &ready; 387 sendlist_.push_back(smsg); 388 } 389 390 // Wait for a reply 391 392 ss_->WakeUp(); 393 394 bool waited = false; 395 crit_.Enter(); 396 while (!ready) { 397 crit_.Leave(); 398 current_thread->ReceiveSends(); 399 current_thread->socketserver()->Wait(kForever, false); 400 waited = true; 401 crit_.Enter(); 402 } 403 crit_.Leave(); 404 405 // Our Wait loop above may have consumed some WakeUp events for this 406 // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can 407 // cause problems for some SocketServers. 408 // 409 // Concrete example: 410 // Win32SocketServer on thread A calls Send on thread B. While processing the 411 // message, thread B Posts a message to A. We consume the wakeup for that 412 // Post while waiting for the Send to complete, which means that when we exit 413 // this loop, we need to issue another WakeUp, or else the Posted message 414 // won't be processed in a timely manner. 415 416 if (waited) { 417 current_thread->socketserver()->WakeUp(); 418 } 419} 420 421void Thread::ReceiveSends() { 422 // Receive a sent message. Cleanup scenarios: 423 // - thread sending exits: We don't allow this, since thread can exit 424 // only via Join, so Send must complete. 425 // - thread receiving exits: Wakeup/set ready in Thread::Clear() 426 // - object target cleared: Wakeup/set ready in Thread::Clear() 427 crit_.Enter(); 428 while (!sendlist_.empty()) { 429 _SendMessage smsg = sendlist_.front(); 430 sendlist_.pop_front(); 431 crit_.Leave(); 432 smsg.msg.phandler->OnMessage(&smsg.msg); 433 crit_.Enter(); 434 *smsg.ready = true; 435 smsg.thread->socketserver()->WakeUp(); 436 } 437 crit_.Leave(); 438} 439 440void Thread::Clear(MessageHandler *phandler, uint32 id, 441 MessageList* removed) { 442 CritScope cs(&crit_); 443 444 // Remove messages on sendlist_ with phandler 445 // Object target cleared: remove from send list, wakeup/set ready 446 // if sender not NULL. 447 448 std::list<_SendMessage>::iterator iter = sendlist_.begin(); 449 while (iter != sendlist_.end()) { 450 _SendMessage smsg = *iter; 451 if (smsg.msg.Match(phandler, id)) { 452 if (removed) { 453 removed->push_back(smsg.msg); 454 } else { 455 delete smsg.msg.pdata; 456 } 457 iter = sendlist_.erase(iter); 458 *smsg.ready = true; 459 smsg.thread->socketserver()->WakeUp(); 460 continue; 461 } 462 ++iter; 463 } 464 465 MessageQueue::Clear(phandler, id, removed); 466} 467 468bool Thread::ProcessMessages(int cmsLoop) { 469 uint32 msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); 470 int cmsNext = cmsLoop; 471 472 while (true) { 473#if __has_feature(objc_arc) 474 @autoreleasepool 475#elif defined(WEBRTC_MAC) 476 // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html 477 // Each thread is supposed to have an autorelease pool. Also for event loops 478 // like this, autorelease pool needs to be created and drained/released 479 // for each cycle. 480 ScopedAutoreleasePool pool; 481#endif 482 { 483 Message msg; 484 if (!Get(&msg, cmsNext)) 485 return !IsQuitting(); 486 Dispatch(&msg); 487 488 if (cmsLoop != kForever) { 489 cmsNext = TimeUntil(msEnd); 490 if (cmsNext < 0) 491 return true; 492 } 493 } 494 } 495} 496 497bool Thread::WrapCurrent() { 498 return WrapCurrentWithThreadManager(ThreadManager::Instance()); 499} 500 501bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) { 502 if (running()) 503 return false; 504#if defined(WEBRTC_WIN) 505 // We explicitly ask for no rights other than synchronization. 506 // This gives us the best chance of succeeding. 507 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); 508 if (!thread_) { 509 LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; 510 return false; 511 } 512 thread_id_ = GetCurrentThreadId(); 513#elif defined(WEBRTC_POSIX) 514 thread_ = pthread_self(); 515#endif 516 owned_ = false; 517 running_.Set(); 518 thread_manager->SetCurrentThread(this); 519 return true; 520} 521 522void Thread::UnwrapCurrent() { 523 // Clears the platform-specific thread-specific storage. 524 ThreadManager::Instance()->SetCurrentThread(NULL); 525#if defined(WEBRTC_WIN) 526 if (!CloseHandle(thread_)) { 527 LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; 528 } 529#endif 530 running_.Reset(); 531} 532 533 534AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { 535 if (!ThreadManager::Instance()->CurrentThread()) { 536 ThreadManager::Instance()->SetCurrentThread(this); 537 } 538} 539 540AutoThread::~AutoThread() { 541 Stop(); 542 if (ThreadManager::Instance()->CurrentThread() == this) { 543 ThreadManager::Instance()->SetCurrentThread(NULL); 544 } 545} 546 547#if defined(WEBRTC_WIN) 548void ComThread::Run() { 549 HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED); 550 ASSERT(SUCCEEDED(hr)); 551 if (SUCCEEDED(hr)) { 552 Thread::Run(); 553 CoUninitialize(); 554 } else { 555 LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr; 556 } 557} 558#endif 559 560} // namespace rtc 561