messagequeue.cc revision f74420b3285b9fe04a7e00aa3b8c0ab07ea344bc
1/* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#if defined(_MSC_VER) && _MSC_VER < 1300 29#pragma warning(disable:4786) 30#endif 31 32#ifdef POSIX 33#include <sys/time.h> 34#endif 35 36#include "talk/base/common.h" 37#include "talk/base/logging.h" 38#include "talk/base/messagequeue.h" 39#include "talk/base/physicalsocketserver.h" 40 41 42namespace talk_base { 43 44const uint32 kMaxMsgLatency = 150; // 150 ms 45 46//------------------------------------------------------------------ 47// MessageQueueManager 48 49MessageQueueManager* MessageQueueManager::instance_; 50 51MessageQueueManager* MessageQueueManager::Instance() { 52 // Note: This is not thread safe, but it is first called before threads are 53 // spawned. 54 if (!instance_) 55 instance_ = new MessageQueueManager; 56 return instance_; 57} 58 59MessageQueueManager::MessageQueueManager() { 60} 61 62MessageQueueManager::~MessageQueueManager() { 63} 64 65void MessageQueueManager::Add(MessageQueue *message_queue) { 66 // MessageQueueManager methods should be non-reentrant, so we 67 // ASSERT that is the case. If any of these ASSERT, please 68 // contact bpm or jbeda. 69 ASSERT(!crit_.CurrentThreadIsOwner()); 70 CritScope cs(&crit_); 71 message_queues_.push_back(message_queue); 72} 73 74void MessageQueueManager::Remove(MessageQueue *message_queue) { 75 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 76 CritScope cs(&crit_); 77 std::vector<MessageQueue *>::iterator iter; 78 iter = std::find(message_queues_.begin(), message_queues_.end(), 79 message_queue); 80 if (iter != message_queues_.end()) 81 message_queues_.erase(iter); 82} 83 84void MessageQueueManager::Clear(MessageHandler *handler) { 85 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 86 CritScope cs(&crit_); 87 std::vector<MessageQueue *>::iterator iter; 88 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 89 (*iter)->Clear(handler); 90} 91 92//------------------------------------------------------------------ 93// MessageQueue 94 95MessageQueue::MessageQueue(SocketServer* ss) 96 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), 97 dmsgq_next_num_(0) { 98 if (!ss_) { 99 // Currently, MessageQueue holds a socket server, and is the base class for 100 // Thread. It seems like it makes more sense for Thread to hold the socket 101 // server, and provide it to the MessageQueue, since the Thread controls 102 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 103 // messagequeue_unittest to depend on network libraries... yuck. 104 default_ss_.reset(new PhysicalSocketServer()); 105 ss_ = default_ss_.get(); 106 } 107 ss_->SetMessageQueue(this); 108} 109 110MessageQueue::~MessageQueue() { 111 // The signal is done from here to ensure 112 // that it always gets called when the queue 113 // is going away. 114 SignalQueueDestroyed(); 115 if (active_) { 116 MessageQueueManager::Instance()->Remove(this); 117 Clear(NULL); 118 } 119 if (ss_) { 120 ss_->SetMessageQueue(NULL); 121 } 122} 123 124void MessageQueue::set_socketserver(SocketServer* ss) { 125 ss_ = ss ? ss : default_ss_.get(); 126 ss_->SetMessageQueue(this); 127} 128 129void MessageQueue::Quit() { 130 fStop_ = true; 131 ss_->WakeUp(); 132} 133 134bool MessageQueue::IsQuitting() { 135 return fStop_; 136} 137 138void MessageQueue::Restart() { 139 fStop_ = false; 140} 141 142bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 143 if (fPeekKeep_) { 144 *pmsg = msgPeek_; 145 return true; 146 } 147 if (!Get(pmsg, cmsWait)) 148 return false; 149 msgPeek_ = *pmsg; 150 fPeekKeep_ = true; 151 return true; 152} 153 154bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { 155 // Return and clear peek if present 156 // Always return the peek if it exists so there is Peek/Get symmetry 157 158 if (fPeekKeep_) { 159 *pmsg = msgPeek_; 160 fPeekKeep_ = false; 161 return true; 162 } 163 164 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 165 166 int cmsTotal = cmsWait; 167 int cmsElapsed = 0; 168 uint32 msStart = Time(); 169 uint32 msCurrent = msStart; 170 while (true) { 171 // Check for sent messages 172 173 ReceiveSends(); 174 175 // Check queues 176 177 int cmsDelayNext = kForever; 178 { 179 CritScope cs(&crit_); 180 181 // Check for delayed messages that have been triggered 182 // Calc the next trigger too 183 184 while (!dmsgq_.empty()) { 185 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 186 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 187 break; 188 } 189 msgq_.push_back(dmsgq_.top().msg_); 190 dmsgq_.pop(); 191 } 192 193 // Check for posted events 194 195 while (!msgq_.empty()) { 196 *pmsg = msgq_.front(); 197 if (pmsg->ts_sensitive) { 198 long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 199 if (delay > 0) { 200 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 201 << (delay + kMaxMsgLatency) << "ms"; 202 } 203 } 204 msgq_.pop_front(); 205 if (MQID_DISPOSE == pmsg->message_id) { 206 ASSERT(NULL == pmsg->phandler); 207 delete pmsg->pdata; 208 continue; 209 } 210 return true; 211 } 212 } 213 214 if (fStop_) 215 break; 216 217 // Which is shorter, the delay wait or the asked wait? 218 219 int cmsNext; 220 if (cmsWait == kForever) { 221 cmsNext = cmsDelayNext; 222 } else { 223 cmsNext = _max(0, cmsTotal - cmsElapsed); 224 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 225 cmsNext = cmsDelayNext; 226 } 227 228 // Wait and multiplex in the meantime 229 if (!ss_->Wait(cmsNext, process_io)) 230 return false; 231 232 // If the specified timeout expired, return 233 234 msCurrent = Time(); 235 cmsElapsed = TimeDiff(msCurrent, msStart); 236 if (cmsWait != kForever) { 237 if (cmsElapsed >= cmsWait) 238 return false; 239 } 240 } 241 return false; 242} 243 244void MessageQueue::ReceiveSends() { 245} 246 247void MessageQueue::Post(MessageHandler *phandler, uint32 id, 248 MessageData *pdata, bool time_sensitive) { 249 if (fStop_) 250 return; 251 252 // Keep thread safe 253 // Add the message to the end of the queue 254 // Signal for the multiplexer to return 255 256 CritScope cs(&crit_); 257 EnsureActive(); 258 Message msg; 259 msg.phandler = phandler; 260 msg.message_id = id; 261 msg.pdata = pdata; 262 if (time_sensitive) { 263 msg.ts_sensitive = Time() + kMaxMsgLatency; 264 } 265 msgq_.push_back(msg); 266 ss_->WakeUp(); 267} 268 269void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, 270 MessageHandler *phandler, uint32 id, MessageData* pdata) { 271 if (fStop_) 272 return; 273 274 // Keep thread safe 275 // Add to the priority queue. Gets sorted soonest first. 276 // Signal for the multiplexer to return. 277 278 CritScope cs(&crit_); 279 EnsureActive(); 280 Message msg; 281 msg.phandler = phandler; 282 msg.message_id = id; 283 msg.pdata = pdata; 284 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 285 dmsgq_.push(dmsg); 286 // If this message queue processes 1 message every millisecond for 50 days, 287 // we will wrap this number. Even then, only messages with identical times 288 // will be misordered, and then only briefly. This is probably ok. 289 VERIFY(0 != ++dmsgq_next_num_); 290 ss_->WakeUp(); 291} 292 293int MessageQueue::GetDelay() { 294 CritScope cs(&crit_); 295 296 if (!msgq_.empty()) 297 return 0; 298 299 if (!dmsgq_.empty()) { 300 int delay = TimeUntil(dmsgq_.top().msTrigger_); 301 if (delay < 0) 302 delay = 0; 303 return delay; 304 } 305 306 return kForever; 307} 308 309void MessageQueue::Clear(MessageHandler *phandler, uint32 id, 310 MessageList* removed) { 311 CritScope cs(&crit_); 312 313 // Remove messages with phandler 314 315 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { 316 if (removed) { 317 removed->push_back(msgPeek_); 318 } else { 319 delete msgPeek_.pdata; 320 } 321 fPeekKeep_ = false; 322 } 323 324 // Remove from ordered message queue 325 326 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { 327 if (it->Match(phandler, id)) { 328 if (removed) { 329 removed->push_back(*it); 330 } else { 331 delete it->pdata; 332 } 333 it = msgq_.erase(it); 334 } else { 335 ++it; 336 } 337 } 338 339 // Remove from priority queue. Not directly iterable, so use this approach 340 341 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); 342 for (PriorityQueue::container_type::iterator it = new_end; 343 it != dmsgq_.container().end(); ++it) { 344 if (it->msg_.Match(phandler, id)) { 345 if (removed) { 346 removed->push_back(it->msg_); 347 } else { 348 delete it->msg_.pdata; 349 } 350 } else { 351 *new_end++ = *it; 352 } 353 } 354 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 355 dmsgq_.reheap(); 356} 357 358void MessageQueue::Dispatch(Message *pmsg) { 359 pmsg->phandler->OnMessage(pmsg); 360} 361 362void MessageQueue::EnsureActive() { 363 ASSERT(crit_.CurrentThreadIsOwner()); 364 if (!active_) { 365 active_ = true; 366 MessageQueueManager::Instance()->Add(this); 367 } 368} 369 370} // namespace talk_base 371