1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(_MSC_VER) && _MSC_VER < 1300
29#pragma warning(disable:4786)
30#endif
31
32#ifdef POSIX
33#include <sys/time.h>
34#endif
35
36#include "talk/base/common.h"
37#include "talk/base/logging.h"
38#include "talk/base/messagequeue.h"
39#include "talk/base/physicalsocketserver.h"
40
41
42namespace talk_base {
43
44const uint32 kMaxMsgLatency = 150;  // 150 ms
45
46//------------------------------------------------------------------
47// MessageQueueManager
48
49MessageQueueManager* MessageQueueManager::instance_;
50
51MessageQueueManager* MessageQueueManager::Instance() {
52  // Note: This is not thread safe, but it is first called before threads are
53  // spawned.
54  if (!instance_)
55    instance_ = new MessageQueueManager;
56  return instance_;
57}
58
59MessageQueueManager::MessageQueueManager() {
60}
61
62MessageQueueManager::~MessageQueueManager() {
63}
64
65void MessageQueueManager::Add(MessageQueue *message_queue) {
66  // MessageQueueManager methods should be non-reentrant, so we
67  // ASSERT that is the case.  If any of these ASSERT, please
68  // contact bpm or jbeda.
69  ASSERT(!crit_.CurrentThreadIsOwner());
70  CritScope cs(&crit_);
71  message_queues_.push_back(message_queue);
72}
73
74void MessageQueueManager::Remove(MessageQueue *message_queue) {
75  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
76  // If this is the last MessageQueue, destroy the manager as well so that
77  // we don't leak this object at program shutdown. As mentioned above, this is
78  // not thread-safe, but this should only happen at program termination (when
79  // the ThreadManager is destroyed, and threads are no longer active).
80  bool destroy = false;
81  {
82    CritScope cs(&crit_);
83    std::vector<MessageQueue *>::iterator iter;
84    iter = std::find(message_queues_.begin(), message_queues_.end(),
85                     message_queue);
86    if (iter != message_queues_.end()) {
87      message_queues_.erase(iter);
88    }
89    destroy = message_queues_.empty();
90  }
91  if (destroy) {
92    instance_ = NULL;
93    delete this;
94  }
95}
96
97void MessageQueueManager::Clear(MessageHandler *handler) {
98  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
99  CritScope cs(&crit_);
100  std::vector<MessageQueue *>::iterator iter;
101  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
102    (*iter)->Clear(handler);
103}
104
105//------------------------------------------------------------------
106// MessageQueue
107
108MessageQueue::MessageQueue(SocketServer* ss)
109    : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
110      dmsgq_next_num_(0) {
111  if (!ss_) {
112    // Currently, MessageQueue holds a socket server, and is the base class for
113    // Thread.  It seems like it makes more sense for Thread to hold the socket
114    // server, and provide it to the MessageQueue, since the Thread controls
115    // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
116    // messagequeue_unittest to depend on network libraries... yuck.
117    default_ss_.reset(new PhysicalSocketServer());
118    ss_ = default_ss_.get();
119  }
120  ss_->SetMessageQueue(this);
121}
122
123MessageQueue::~MessageQueue() {
124  // The signal is done from here to ensure
125  // that it always gets called when the queue
126  // is going away.
127  SignalQueueDestroyed();
128  if (active_) {
129    MessageQueueManager::Instance()->Remove(this);
130    Clear(NULL);
131  }
132  if (ss_) {
133    ss_->SetMessageQueue(NULL);
134  }
135}
136
137void MessageQueue::set_socketserver(SocketServer* ss) {
138  ss_ = ss ? ss : default_ss_.get();
139  ss_->SetMessageQueue(this);
140}
141
142void MessageQueue::Quit() {
143  fStop_ = true;
144  ss_->WakeUp();
145}
146
147bool MessageQueue::IsQuitting() {
148  return fStop_;
149}
150
151void MessageQueue::Restart() {
152  fStop_ = false;
153}
154
155bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
156  if (fPeekKeep_) {
157    *pmsg = msgPeek_;
158    return true;
159  }
160  if (!Get(pmsg, cmsWait))
161    return false;
162  msgPeek_ = *pmsg;
163  fPeekKeep_ = true;
164  return true;
165}
166
167bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
168  // Return and clear peek if present
169  // Always return the peek if it exists so there is Peek/Get symmetry
170
171  if (fPeekKeep_) {
172    *pmsg = msgPeek_;
173    fPeekKeep_ = false;
174    return true;
175  }
176
177  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
178
179  int cmsTotal = cmsWait;
180  int cmsElapsed = 0;
181  uint32 msStart = Time();
182  uint32 msCurrent = msStart;
183  while (true) {
184    // Check for sent messages
185
186    ReceiveSends();
187
188    // Check queues
189
190    int cmsDelayNext = kForever;
191    {
192      CritScope cs(&crit_);
193
194      // Check for delayed messages that have been triggered
195      // Calc the next trigger too
196
197      while (!dmsgq_.empty()) {
198        if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
199          cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
200          break;
201        }
202        msgq_.push_back(dmsgq_.top().msg_);
203        dmsgq_.pop();
204      }
205
206      // Check for posted events
207
208      while (!msgq_.empty()) {
209        *pmsg = msgq_.front();
210        if (pmsg->ts_sensitive) {
211          long delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
212          if (delay > 0) {
213            LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
214                              << (delay + kMaxMsgLatency) << "ms";
215          }
216        }
217        msgq_.pop_front();
218        if (MQID_DISPOSE == pmsg->message_id) {
219          ASSERT(NULL == pmsg->phandler);
220          delete pmsg->pdata;
221          continue;
222        }
223        return true;
224      }
225    }
226
227    if (fStop_)
228      break;
229
230    // Which is shorter, the delay wait or the asked wait?
231
232    int cmsNext;
233    if (cmsWait == kForever) {
234      cmsNext = cmsDelayNext;
235    } else {
236      cmsNext = _max(0, cmsTotal - cmsElapsed);
237      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
238        cmsNext = cmsDelayNext;
239    }
240
241    // Wait and multiplex in the meantime
242    if (!ss_->Wait(cmsNext, process_io))
243      return false;
244
245    // If the specified timeout expired, return
246
247    msCurrent = Time();
248    cmsElapsed = TimeDiff(msCurrent, msStart);
249    if (cmsWait != kForever) {
250      if (cmsElapsed >= cmsWait)
251        return false;
252    }
253  }
254  return false;
255}
256
257void MessageQueue::ReceiveSends() {
258}
259
260void MessageQueue::Post(MessageHandler *phandler, uint32 id,
261    MessageData *pdata, bool time_sensitive) {
262  if (fStop_)
263    return;
264
265  // Keep thread safe
266  // Add the message to the end of the queue
267  // Signal for the multiplexer to return
268
269  CritScope cs(&crit_);
270  EnsureActive();
271  Message msg;
272  msg.phandler = phandler;
273  msg.message_id = id;
274  msg.pdata = pdata;
275  if (time_sensitive) {
276    msg.ts_sensitive = Time() + kMaxMsgLatency;
277  }
278  msgq_.push_back(msg);
279  ss_->WakeUp();
280}
281
282void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
283    MessageHandler *phandler, uint32 id, MessageData* pdata) {
284  if (fStop_)
285    return;
286
287  // Keep thread safe
288  // Add to the priority queue. Gets sorted soonest first.
289  // Signal for the multiplexer to return.
290
291  CritScope cs(&crit_);
292  EnsureActive();
293  Message msg;
294  msg.phandler = phandler;
295  msg.message_id = id;
296  msg.pdata = pdata;
297  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
298  dmsgq_.push(dmsg);
299  // If this message queue processes 1 message every millisecond for 50 days,
300  // we will wrap this number.  Even then, only messages with identical times
301  // will be misordered, and then only briefly.  This is probably ok.
302  VERIFY(0 != ++dmsgq_next_num_);
303  ss_->WakeUp();
304}
305
306int MessageQueue::GetDelay() {
307  CritScope cs(&crit_);
308
309  if (!msgq_.empty())
310    return 0;
311
312  if (!dmsgq_.empty()) {
313    int delay = TimeUntil(dmsgq_.top().msTrigger_);
314    if (delay < 0)
315      delay = 0;
316    return delay;
317  }
318
319  return kForever;
320}
321
322void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
323                         MessageList* removed) {
324  CritScope cs(&crit_);
325
326  // Remove messages with phandler
327
328  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
329    if (removed) {
330      removed->push_back(msgPeek_);
331    } else {
332      delete msgPeek_.pdata;
333    }
334    fPeekKeep_ = false;
335  }
336
337  // Remove from ordered message queue
338
339  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
340    if (it->Match(phandler, id)) {
341      if (removed) {
342        removed->push_back(*it);
343      } else {
344        delete it->pdata;
345      }
346      it = msgq_.erase(it);
347    } else {
348      ++it;
349    }
350  }
351
352  // Remove from priority queue. Not directly iterable, so use this approach
353
354  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
355  for (PriorityQueue::container_type::iterator it = new_end;
356       it != dmsgq_.container().end(); ++it) {
357    if (it->msg_.Match(phandler, id)) {
358      if (removed) {
359        removed->push_back(it->msg_);
360      } else {
361        delete it->msg_.pdata;
362      }
363    } else {
364      *new_end++ = *it;
365    }
366  }
367  dmsgq_.container().erase(new_end, dmsgq_.container().end());
368  dmsgq_.reheap();
369}
370
371void MessageQueue::Dispatch(Message *pmsg) {
372  pmsg->phandler->OnMessage(pmsg);
373}
374
375void MessageQueue::EnsureActive() {
376  ASSERT(crit_.CurrentThreadIsOwner());
377  if (!active_) {
378    active_ = true;
379    MessageQueueManager::Instance()->Add(this);
380  }
381}
382
383}  // namespace talk_base
384