messagequeue.cc revision f74420b3285b9fe04a7e00aa3b8c0ab07ea344bc
1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(_MSC_VER) && _MSC_VER < 1300
29#pragma warning(disable:4786)
30#endif
31
32#ifdef POSIX
33#include <sys/time.h>
34#endif
35
36#include "talk/base/common.h"
37#include "talk/base/logging.h"
38#include "talk/base/messagequeue.h"
39#include "talk/base/physicalsocketserver.h"
40
41
42namespace talk_base {
43
44const uint32 kMaxMsgLatency = 150;  // 150 ms
45
46//------------------------------------------------------------------
47// MessageQueueManager
48
49MessageQueueManager* MessageQueueManager::instance_;
50
51MessageQueueManager* MessageQueueManager::Instance() {
52  // Note: This is not thread safe, but it is first called before threads are
53  // spawned.
54  if (!instance_)
55    instance_ = new MessageQueueManager;
56  return instance_;
57}
58
59MessageQueueManager::MessageQueueManager() {
60}
61
62MessageQueueManager::~MessageQueueManager() {
63}
64
65void MessageQueueManager::Add(MessageQueue *message_queue) {
66  // MessageQueueManager methods should be non-reentrant, so we
67  // ASSERT that is the case.  If any of these ASSERT, please
68  // contact bpm or jbeda.
69  ASSERT(!crit_.CurrentThreadIsOwner());
70  CritScope cs(&crit_);
71  message_queues_.push_back(message_queue);
72}
73
74void MessageQueueManager::Remove(MessageQueue *message_queue) {
75  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
76  CritScope cs(&crit_);
77  std::vector<MessageQueue *>::iterator iter;
78  iter = std::find(message_queues_.begin(), message_queues_.end(),
79                   message_queue);
80  if (iter != message_queues_.end())
81    message_queues_.erase(iter);
82}
83
84void MessageQueueManager::Clear(MessageHandler *handler) {
85  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
86  CritScope cs(&crit_);
87  std::vector<MessageQueue *>::iterator iter;
88  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
89    (*iter)->Clear(handler);
90}
91
92//------------------------------------------------------------------
93// MessageQueue
94
95MessageQueue::MessageQueue(SocketServer* ss)
96    : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
97      dmsgq_next_num_(0) {
98  if (!ss_) {
99    // Currently, MessageQueue holds a socket server, and is the base class for
100    // Thread.  It seems like it makes more sense for Thread to hold the socket
101    // server, and provide it to the MessageQueue, since the Thread controls
102    // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
103    // messagequeue_unittest to depend on network libraries... yuck.
104    default_ss_.reset(new PhysicalSocketServer());
105    ss_ = default_ss_.get();
106  }
107  ss_->SetMessageQueue(this);
108}
109
110MessageQueue::~MessageQueue() {
111  // The signal is done from here to ensure
112  // that it always gets called when the queue
113  // is going away.
114  SignalQueueDestroyed();
115  if (active_) {
116    MessageQueueManager::Instance()->Remove(this);
117    Clear(NULL);
118  }
119  if (ss_) {
120    ss_->SetMessageQueue(NULL);
121  }
122}
123
124void MessageQueue::set_socketserver(SocketServer* ss) {
125  ss_ = ss ? ss : default_ss_.get();
126  ss_->SetMessageQueue(this);
127}
128
129void MessageQueue::Quit() {
130  fStop_ = true;
131  ss_->WakeUp();
132}
133
134bool MessageQueue::IsQuitting() {
135  return fStop_;
136}
137
138void MessageQueue::Restart() {
139  fStop_ = false;
140}
141
142bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
143  if (fPeekKeep_) {
144    *pmsg = msgPeek_;
145    return true;
146  }
147  if (!Get(pmsg, cmsWait))
148    return false;
149  msgPeek_ = *pmsg;
150  fPeekKeep_ = true;
151  return true;
152}
153
154bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
155  // Return and clear peek if present
156  // Always return the peek if it exists so there is Peek/Get symmetry
157
158  if (fPeekKeep_) {
159    *pmsg = msgPeek_;
160    fPeekKeep_ = false;
161    return true;
162  }
163
164  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
165
166  int cmsTotal = cmsWait;
167  int cmsElapsed = 0;
168  uint32 msStart = Time();
169  uint32 msCurrent = msStart;
170  while (true) {
171    // Check for sent messages
172
173    ReceiveSends();
174
175    // Check queues
176
177    int cmsDelayNext = kForever;
178    {
179      CritScope cs(&crit_);
180
181      // Check for delayed messages that have been triggered
182      // Calc the next trigger too
183
184      while (!dmsgq_.empty()) {
185        if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
186          cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
187          break;
188        }
189        msgq_.push_back(dmsgq_.top().msg_);
190        dmsgq_.pop();
191      }
192
193      // Check for posted events
194
195      while (!msgq_.empty()) {
196        *pmsg = msgq_.front();
197        if (pmsg->ts_sensitive) {
198          long delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
199          if (delay > 0) {
200            LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
201                              << (delay + kMaxMsgLatency) << "ms";
202          }
203        }
204        msgq_.pop_front();
205        if (MQID_DISPOSE == pmsg->message_id) {
206          ASSERT(NULL == pmsg->phandler);
207          delete pmsg->pdata;
208          continue;
209        }
210        return true;
211      }
212    }
213
214    if (fStop_)
215      break;
216
217    // Which is shorter, the delay wait or the asked wait?
218
219    int cmsNext;
220    if (cmsWait == kForever) {
221      cmsNext = cmsDelayNext;
222    } else {
223      cmsNext = _max(0, cmsTotal - cmsElapsed);
224      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
225        cmsNext = cmsDelayNext;
226    }
227
228    // Wait and multiplex in the meantime
229    if (!ss_->Wait(cmsNext, process_io))
230      return false;
231
232    // If the specified timeout expired, return
233
234    msCurrent = Time();
235    cmsElapsed = TimeDiff(msCurrent, msStart);
236    if (cmsWait != kForever) {
237      if (cmsElapsed >= cmsWait)
238        return false;
239    }
240  }
241  return false;
242}
243
244void MessageQueue::ReceiveSends() {
245}
246
247void MessageQueue::Post(MessageHandler *phandler, uint32 id,
248    MessageData *pdata, bool time_sensitive) {
249  if (fStop_)
250    return;
251
252  // Keep thread safe
253  // Add the message to the end of the queue
254  // Signal for the multiplexer to return
255
256  CritScope cs(&crit_);
257  EnsureActive();
258  Message msg;
259  msg.phandler = phandler;
260  msg.message_id = id;
261  msg.pdata = pdata;
262  if (time_sensitive) {
263    msg.ts_sensitive = Time() + kMaxMsgLatency;
264  }
265  msgq_.push_back(msg);
266  ss_->WakeUp();
267}
268
269void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
270    MessageHandler *phandler, uint32 id, MessageData* pdata) {
271  if (fStop_)
272    return;
273
274  // Keep thread safe
275  // Add to the priority queue. Gets sorted soonest first.
276  // Signal for the multiplexer to return.
277
278  CritScope cs(&crit_);
279  EnsureActive();
280  Message msg;
281  msg.phandler = phandler;
282  msg.message_id = id;
283  msg.pdata = pdata;
284  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
285  dmsgq_.push(dmsg);
286  // If this message queue processes 1 message every millisecond for 50 days,
287  // we will wrap this number.  Even then, only messages with identical times
288  // will be misordered, and then only briefly.  This is probably ok.
289  VERIFY(0 != ++dmsgq_next_num_);
290  ss_->WakeUp();
291}
292
293int MessageQueue::GetDelay() {
294  CritScope cs(&crit_);
295
296  if (!msgq_.empty())
297    return 0;
298
299  if (!dmsgq_.empty()) {
300    int delay = TimeUntil(dmsgq_.top().msTrigger_);
301    if (delay < 0)
302      delay = 0;
303    return delay;
304  }
305
306  return kForever;
307}
308
309void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
310                         MessageList* removed) {
311  CritScope cs(&crit_);
312
313  // Remove messages with phandler
314
315  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
316    if (removed) {
317      removed->push_back(msgPeek_);
318    } else {
319      delete msgPeek_.pdata;
320    }
321    fPeekKeep_ = false;
322  }
323
324  // Remove from ordered message queue
325
326  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
327    if (it->Match(phandler, id)) {
328      if (removed) {
329        removed->push_back(*it);
330      } else {
331        delete it->pdata;
332      }
333      it = msgq_.erase(it);
334    } else {
335      ++it;
336    }
337  }
338
339  // Remove from priority queue. Not directly iterable, so use this approach
340
341  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
342  for (PriorityQueue::container_type::iterator it = new_end;
343       it != dmsgq_.container().end(); ++it) {
344    if (it->msg_.Match(phandler, id)) {
345      if (removed) {
346        removed->push_back(it->msg_);
347      } else {
348        delete it->msg_.pdata;
349      }
350    } else {
351      *new_end++ = *it;
352    }
353  }
354  dmsgq_.container().erase(new_end, dmsgq_.container().end());
355  dmsgq_.reheap();
356}
357
358void MessageQueue::Dispatch(Message *pmsg) {
359  pmsg->phandler->OnMessage(pmsg);
360}
361
362void MessageQueue::EnsureActive() {
363  ASSERT(crit_.CurrentThreadIsOwner());
364  if (!active_) {
365    active_ = true;
366    MessageQueueManager::Instance()->Add(this);
367  }
368}
369
370}  // namespace talk_base
371