1/* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#if defined(WEBRTC_POSIX) 12#include <sys/time.h> 13#endif 14 15#include "webrtc/base/common.h" 16#include "webrtc/base/logging.h" 17#include "webrtc/base/messagequeue.h" 18#if defined(__native_client__) 19#include "webrtc/base/nullsocketserver.h" 20typedef rtc::NullSocketServer DefaultSocketServer; 21#else 22#include "webrtc/base/physicalsocketserver.h" 23typedef rtc::PhysicalSocketServer DefaultSocketServer; 24#endif 25 26namespace rtc { 27 28const uint32 kMaxMsgLatency = 150; // 150 ms 29 30//------------------------------------------------------------------ 31// MessageQueueManager 32 33MessageQueueManager* MessageQueueManager::instance_ = NULL; 34 35MessageQueueManager* MessageQueueManager::Instance() { 36 // Note: This is not thread safe, but it is first called before threads are 37 // spawned. 38 if (!instance_) 39 instance_ = new MessageQueueManager; 40 return instance_; 41} 42 43bool MessageQueueManager::IsInitialized() { 44 return instance_ != NULL; 45} 46 47MessageQueueManager::MessageQueueManager() { 48} 49 50MessageQueueManager::~MessageQueueManager() { 51} 52 53void MessageQueueManager::Add(MessageQueue *message_queue) { 54 return Instance()->AddInternal(message_queue); 55} 56void MessageQueueManager::AddInternal(MessageQueue *message_queue) { 57 // MessageQueueManager methods should be non-reentrant, so we 58 // ASSERT that is the case. If any of these ASSERT, please 59 // contact bpm or jbeda. 60 ASSERT(!crit_.CurrentThreadIsOwner()); 61 CritScope cs(&crit_); 62 message_queues_.push_back(message_queue); 63} 64 65void MessageQueueManager::Remove(MessageQueue *message_queue) { 66 // If there isn't a message queue manager instance, then there isn't a queue 67 // to remove. 68 if (!instance_) return; 69 return Instance()->RemoveInternal(message_queue); 70} 71void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { 72 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 73 // If this is the last MessageQueue, destroy the manager as well so that 74 // we don't leak this object at program shutdown. As mentioned above, this is 75 // not thread-safe, but this should only happen at program termination (when 76 // the ThreadManager is destroyed, and threads are no longer active). 77 bool destroy = false; 78 { 79 CritScope cs(&crit_); 80 std::vector<MessageQueue *>::iterator iter; 81 iter = std::find(message_queues_.begin(), message_queues_.end(), 82 message_queue); 83 if (iter != message_queues_.end()) { 84 message_queues_.erase(iter); 85 } 86 destroy = message_queues_.empty(); 87 } 88 if (destroy) { 89 instance_ = NULL; 90 delete this; 91 } 92} 93 94void MessageQueueManager::Clear(MessageHandler *handler) { 95 // If there isn't a message queue manager instance, then there aren't any 96 // queues to remove this handler from. 97 if (!instance_) return; 98 return Instance()->ClearInternal(handler); 99} 100void MessageQueueManager::ClearInternal(MessageHandler *handler) { 101 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 102 CritScope cs(&crit_); 103 std::vector<MessageQueue *>::iterator iter; 104 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 105 (*iter)->Clear(handler); 106} 107 108//------------------------------------------------------------------ 109// MessageQueue 110 111MessageQueue::MessageQueue(SocketServer* ss) 112 : ss_(ss), fStop_(false), fPeekKeep_(false), 113 dmsgq_next_num_(0) { 114 if (!ss_) { 115 // Currently, MessageQueue holds a socket server, and is the base class for 116 // Thread. It seems like it makes more sense for Thread to hold the socket 117 // server, and provide it to the MessageQueue, since the Thread controls 118 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 119 // messagequeue_unittest to depend on network libraries... yuck. 120 default_ss_.reset(new DefaultSocketServer()); 121 ss_ = default_ss_.get(); 122 } 123 ss_->SetMessageQueue(this); 124 MessageQueueManager::Add(this); 125} 126 127MessageQueue::~MessageQueue() { 128 // The signal is done from here to ensure 129 // that it always gets called when the queue 130 // is going away. 131 SignalQueueDestroyed(); 132 MessageQueueManager::Remove(this); 133 Clear(NULL); 134 if (ss_) { 135 ss_->SetMessageQueue(NULL); 136 } 137} 138 139void MessageQueue::set_socketserver(SocketServer* ss) { 140 ss_ = ss ? ss : default_ss_.get(); 141 ss_->SetMessageQueue(this); 142} 143 144void MessageQueue::Quit() { 145 fStop_ = true; 146 ss_->WakeUp(); 147} 148 149bool MessageQueue::IsQuitting() { 150 return fStop_; 151} 152 153void MessageQueue::Restart() { 154 fStop_ = false; 155} 156 157bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 158 if (fPeekKeep_) { 159 *pmsg = msgPeek_; 160 return true; 161 } 162 if (!Get(pmsg, cmsWait)) 163 return false; 164 msgPeek_ = *pmsg; 165 fPeekKeep_ = true; 166 return true; 167} 168 169bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { 170 // Return and clear peek if present 171 // Always return the peek if it exists so there is Peek/Get symmetry 172 173 if (fPeekKeep_) { 174 *pmsg = msgPeek_; 175 fPeekKeep_ = false; 176 return true; 177 } 178 179 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 180 181 int cmsTotal = cmsWait; 182 int cmsElapsed = 0; 183 uint32 msStart = Time(); 184 uint32 msCurrent = msStart; 185 while (true) { 186 // Check for sent messages 187 ReceiveSends(); 188 189 // Check for posted events 190 int cmsDelayNext = kForever; 191 bool first_pass = true; 192 while (true) { 193 // All queue operations need to be locked, but nothing else in this loop 194 // (specifically handling disposed message) can happen inside the crit. 195 // Otherwise, disposed MessageHandlers will cause deadlocks. 196 { 197 CritScope cs(&crit_); 198 // On the first pass, check for delayed messages that have been 199 // triggered and calculate the next trigger time. 200 if (first_pass) { 201 first_pass = false; 202 while (!dmsgq_.empty()) { 203 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 204 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 205 break; 206 } 207 msgq_.push_back(dmsgq_.top().msg_); 208 dmsgq_.pop(); 209 } 210 } 211 // Pull a message off the message queue, if available. 212 if (msgq_.empty()) { 213 break; 214 } else { 215 *pmsg = msgq_.front(); 216 msgq_.pop_front(); 217 } 218 } // crit_ is released here. 219 220 // Log a warning for time-sensitive messages that we're late to deliver. 221 if (pmsg->ts_sensitive) { 222 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 223 if (delay > 0) { 224 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 225 << (delay + kMaxMsgLatency) << "ms"; 226 } 227 } 228 // If this was a dispose message, delete it and skip it. 229 if (MQID_DISPOSE == pmsg->message_id) { 230 ASSERT(NULL == pmsg->phandler); 231 delete pmsg->pdata; 232 *pmsg = Message(); 233 continue; 234 } 235 return true; 236 } 237 238 if (fStop_) 239 break; 240 241 // Which is shorter, the delay wait or the asked wait? 242 243 int cmsNext; 244 if (cmsWait == kForever) { 245 cmsNext = cmsDelayNext; 246 } else { 247 cmsNext = _max(0, cmsTotal - cmsElapsed); 248 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 249 cmsNext = cmsDelayNext; 250 } 251 252 // Wait and multiplex in the meantime 253 if (!ss_->Wait(cmsNext, process_io)) 254 return false; 255 256 // If the specified timeout expired, return 257 258 msCurrent = Time(); 259 cmsElapsed = TimeDiff(msCurrent, msStart); 260 if (cmsWait != kForever) { 261 if (cmsElapsed >= cmsWait) 262 return false; 263 } 264 } 265 return false; 266} 267 268void MessageQueue::ReceiveSends() { 269} 270 271void MessageQueue::Post(MessageHandler *phandler, uint32 id, 272 MessageData *pdata, bool time_sensitive) { 273 if (fStop_) 274 return; 275 276 // Keep thread safe 277 // Add the message to the end of the queue 278 // Signal for the multiplexer to return 279 280 CritScope cs(&crit_); 281 Message msg; 282 msg.phandler = phandler; 283 msg.message_id = id; 284 msg.pdata = pdata; 285 if (time_sensitive) { 286 msg.ts_sensitive = Time() + kMaxMsgLatency; 287 } 288 msgq_.push_back(msg); 289 ss_->WakeUp(); 290} 291 292void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, 293 MessageHandler *phandler, uint32 id, MessageData* pdata) { 294 if (fStop_) 295 return; 296 297 // Keep thread safe 298 // Add to the priority queue. Gets sorted soonest first. 299 // Signal for the multiplexer to return. 300 301 CritScope cs(&crit_); 302 Message msg; 303 msg.phandler = phandler; 304 msg.message_id = id; 305 msg.pdata = pdata; 306 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 307 dmsgq_.push(dmsg); 308 // If this message queue processes 1 message every millisecond for 50 days, 309 // we will wrap this number. Even then, only messages with identical times 310 // will be misordered, and then only briefly. This is probably ok. 311 VERIFY(0 != ++dmsgq_next_num_); 312 ss_->WakeUp(); 313} 314 315int MessageQueue::GetDelay() { 316 CritScope cs(&crit_); 317 318 if (!msgq_.empty()) 319 return 0; 320 321 if (!dmsgq_.empty()) { 322 int delay = TimeUntil(dmsgq_.top().msTrigger_); 323 if (delay < 0) 324 delay = 0; 325 return delay; 326 } 327 328 return kForever; 329} 330 331void MessageQueue::Clear(MessageHandler *phandler, uint32 id, 332 MessageList* removed) { 333 CritScope cs(&crit_); 334 335 // Remove messages with phandler 336 337 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { 338 if (removed) { 339 removed->push_back(msgPeek_); 340 } else { 341 delete msgPeek_.pdata; 342 } 343 fPeekKeep_ = false; 344 } 345 346 // Remove from ordered message queue 347 348 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { 349 if (it->Match(phandler, id)) { 350 if (removed) { 351 removed->push_back(*it); 352 } else { 353 delete it->pdata; 354 } 355 it = msgq_.erase(it); 356 } else { 357 ++it; 358 } 359 } 360 361 // Remove from priority queue. Not directly iterable, so use this approach 362 363 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); 364 for (PriorityQueue::container_type::iterator it = new_end; 365 it != dmsgq_.container().end(); ++it) { 366 if (it->msg_.Match(phandler, id)) { 367 if (removed) { 368 removed->push_back(it->msg_); 369 } else { 370 delete it->msg_.pdata; 371 } 372 } else { 373 *new_end++ = *it; 374 } 375 } 376 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 377 dmsgq_.reheap(); 378} 379 380void MessageQueue::Dispatch(Message *pmsg) { 381 pmsg->phandler->OnMessage(pmsg); 382} 383 384} // namespace rtc 385