1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
15#include "webrtc/base/common.h"
16#include "webrtc/base/logging.h"
17#include "webrtc/base/messagequeue.h"
18#if defined(__native_client__)
19#include "webrtc/base/nullsocketserver.h"
20typedef rtc::NullSocketServer DefaultSocketServer;
21#else
22#include "webrtc/base/physicalsocketserver.h"
23typedef rtc::PhysicalSocketServer DefaultSocketServer;
24#endif
25
26namespace rtc {
27
28const uint32 kMaxMsgLatency = 150;  // 150 ms
29
30//------------------------------------------------------------------
31// MessageQueueManager
32
33MessageQueueManager* MessageQueueManager::instance_ = NULL;
34
35MessageQueueManager* MessageQueueManager::Instance() {
36  // Note: This is not thread safe, but it is first called before threads are
37  // spawned.
38  if (!instance_)
39    instance_ = new MessageQueueManager;
40  return instance_;
41}
42
43bool MessageQueueManager::IsInitialized() {
44  return instance_ != NULL;
45}
46
47MessageQueueManager::MessageQueueManager() {
48}
49
50MessageQueueManager::~MessageQueueManager() {
51}
52
53void MessageQueueManager::Add(MessageQueue *message_queue) {
54  return Instance()->AddInternal(message_queue);
55}
56void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
57  // MessageQueueManager methods should be non-reentrant, so we
58  // ASSERT that is the case.  If any of these ASSERT, please
59  // contact bpm or jbeda.
60  ASSERT(!crit_.CurrentThreadIsOwner());
61  CritScope cs(&crit_);
62  message_queues_.push_back(message_queue);
63}
64
65void MessageQueueManager::Remove(MessageQueue *message_queue) {
66  // If there isn't a message queue manager instance, then there isn't a queue
67  // to remove.
68  if (!instance_) return;
69  return Instance()->RemoveInternal(message_queue);
70}
71void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
72  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
73  // If this is the last MessageQueue, destroy the manager as well so that
74  // we don't leak this object at program shutdown. As mentioned above, this is
75  // not thread-safe, but this should only happen at program termination (when
76  // the ThreadManager is destroyed, and threads are no longer active).
77  bool destroy = false;
78  {
79    CritScope cs(&crit_);
80    std::vector<MessageQueue *>::iterator iter;
81    iter = std::find(message_queues_.begin(), message_queues_.end(),
82                     message_queue);
83    if (iter != message_queues_.end()) {
84      message_queues_.erase(iter);
85    }
86    destroy = message_queues_.empty();
87  }
88  if (destroy) {
89    instance_ = NULL;
90    delete this;
91  }
92}
93
94void MessageQueueManager::Clear(MessageHandler *handler) {
95  // If there isn't a message queue manager instance, then there aren't any
96  // queues to remove this handler from.
97  if (!instance_) return;
98  return Instance()->ClearInternal(handler);
99}
100void MessageQueueManager::ClearInternal(MessageHandler *handler) {
101  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
102  CritScope cs(&crit_);
103  std::vector<MessageQueue *>::iterator iter;
104  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
105    (*iter)->Clear(handler);
106}
107
108//------------------------------------------------------------------
109// MessageQueue
110
111MessageQueue::MessageQueue(SocketServer* ss)
112    : ss_(ss), fStop_(false), fPeekKeep_(false),
113      dmsgq_next_num_(0) {
114  if (!ss_) {
115    // Currently, MessageQueue holds a socket server, and is the base class for
116    // Thread.  It seems like it makes more sense for Thread to hold the socket
117    // server, and provide it to the MessageQueue, since the Thread controls
118    // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
119    // messagequeue_unittest to depend on network libraries... yuck.
120    default_ss_.reset(new DefaultSocketServer());
121    ss_ = default_ss_.get();
122  }
123  ss_->SetMessageQueue(this);
124  MessageQueueManager::Add(this);
125}
126
127MessageQueue::~MessageQueue() {
128  // The signal is done from here to ensure
129  // that it always gets called when the queue
130  // is going away.
131  SignalQueueDestroyed();
132  MessageQueueManager::Remove(this);
133  Clear(NULL);
134  if (ss_) {
135    ss_->SetMessageQueue(NULL);
136  }
137}
138
139void MessageQueue::set_socketserver(SocketServer* ss) {
140  ss_ = ss ? ss : default_ss_.get();
141  ss_->SetMessageQueue(this);
142}
143
144void MessageQueue::Quit() {
145  fStop_ = true;
146  ss_->WakeUp();
147}
148
149bool MessageQueue::IsQuitting() {
150  return fStop_;
151}
152
153void MessageQueue::Restart() {
154  fStop_ = false;
155}
156
157bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
158  if (fPeekKeep_) {
159    *pmsg = msgPeek_;
160    return true;
161  }
162  if (!Get(pmsg, cmsWait))
163    return false;
164  msgPeek_ = *pmsg;
165  fPeekKeep_ = true;
166  return true;
167}
168
169bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
170  // Return and clear peek if present
171  // Always return the peek if it exists so there is Peek/Get symmetry
172
173  if (fPeekKeep_) {
174    *pmsg = msgPeek_;
175    fPeekKeep_ = false;
176    return true;
177  }
178
179  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
180
181  int cmsTotal = cmsWait;
182  int cmsElapsed = 0;
183  uint32 msStart = Time();
184  uint32 msCurrent = msStart;
185  while (true) {
186    // Check for sent messages
187    ReceiveSends();
188
189    // Check for posted events
190    int cmsDelayNext = kForever;
191    bool first_pass = true;
192    while (true) {
193      // All queue operations need to be locked, but nothing else in this loop
194      // (specifically handling disposed message) can happen inside the crit.
195      // Otherwise, disposed MessageHandlers will cause deadlocks.
196      {
197        CritScope cs(&crit_);
198        // On the first pass, check for delayed messages that have been
199        // triggered and calculate the next trigger time.
200        if (first_pass) {
201          first_pass = false;
202          while (!dmsgq_.empty()) {
203            if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
204              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
205              break;
206            }
207            msgq_.push_back(dmsgq_.top().msg_);
208            dmsgq_.pop();
209          }
210        }
211        // Pull a message off the message queue, if available.
212        if (msgq_.empty()) {
213          break;
214        } else {
215          *pmsg = msgq_.front();
216          msgq_.pop_front();
217        }
218      }  // crit_ is released here.
219
220      // Log a warning for time-sensitive messages that we're late to deliver.
221      if (pmsg->ts_sensitive) {
222        int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
223        if (delay > 0) {
224          LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
225                            << (delay + kMaxMsgLatency) << "ms";
226        }
227      }
228      // If this was a dispose message, delete it and skip it.
229      if (MQID_DISPOSE == pmsg->message_id) {
230        ASSERT(NULL == pmsg->phandler);
231        delete pmsg->pdata;
232        *pmsg = Message();
233        continue;
234      }
235      return true;
236    }
237
238    if (fStop_)
239      break;
240
241    // Which is shorter, the delay wait or the asked wait?
242
243    int cmsNext;
244    if (cmsWait == kForever) {
245      cmsNext = cmsDelayNext;
246    } else {
247      cmsNext = _max(0, cmsTotal - cmsElapsed);
248      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
249        cmsNext = cmsDelayNext;
250    }
251
252    // Wait and multiplex in the meantime
253    if (!ss_->Wait(cmsNext, process_io))
254      return false;
255
256    // If the specified timeout expired, return
257
258    msCurrent = Time();
259    cmsElapsed = TimeDiff(msCurrent, msStart);
260    if (cmsWait != kForever) {
261      if (cmsElapsed >= cmsWait)
262        return false;
263    }
264  }
265  return false;
266}
267
268void MessageQueue::ReceiveSends() {
269}
270
271void MessageQueue::Post(MessageHandler *phandler, uint32 id,
272    MessageData *pdata, bool time_sensitive) {
273  if (fStop_)
274    return;
275
276  // Keep thread safe
277  // Add the message to the end of the queue
278  // Signal for the multiplexer to return
279
280  CritScope cs(&crit_);
281  Message msg;
282  msg.phandler = phandler;
283  msg.message_id = id;
284  msg.pdata = pdata;
285  if (time_sensitive) {
286    msg.ts_sensitive = Time() + kMaxMsgLatency;
287  }
288  msgq_.push_back(msg);
289  ss_->WakeUp();
290}
291
292void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
293    MessageHandler *phandler, uint32 id, MessageData* pdata) {
294  if (fStop_)
295    return;
296
297  // Keep thread safe
298  // Add to the priority queue. Gets sorted soonest first.
299  // Signal for the multiplexer to return.
300
301  CritScope cs(&crit_);
302  Message msg;
303  msg.phandler = phandler;
304  msg.message_id = id;
305  msg.pdata = pdata;
306  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
307  dmsgq_.push(dmsg);
308  // If this message queue processes 1 message every millisecond for 50 days,
309  // we will wrap this number.  Even then, only messages with identical times
310  // will be misordered, and then only briefly.  This is probably ok.
311  VERIFY(0 != ++dmsgq_next_num_);
312  ss_->WakeUp();
313}
314
315int MessageQueue::GetDelay() {
316  CritScope cs(&crit_);
317
318  if (!msgq_.empty())
319    return 0;
320
321  if (!dmsgq_.empty()) {
322    int delay = TimeUntil(dmsgq_.top().msTrigger_);
323    if (delay < 0)
324      delay = 0;
325    return delay;
326  }
327
328  return kForever;
329}
330
331void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
332                         MessageList* removed) {
333  CritScope cs(&crit_);
334
335  // Remove messages with phandler
336
337  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
338    if (removed) {
339      removed->push_back(msgPeek_);
340    } else {
341      delete msgPeek_.pdata;
342    }
343    fPeekKeep_ = false;
344  }
345
346  // Remove from ordered message queue
347
348  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
349    if (it->Match(phandler, id)) {
350      if (removed) {
351        removed->push_back(*it);
352      } else {
353        delete it->pdata;
354      }
355      it = msgq_.erase(it);
356    } else {
357      ++it;
358    }
359  }
360
361  // Remove from priority queue. Not directly iterable, so use this approach
362
363  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
364  for (PriorityQueue::container_type::iterator it = new_end;
365       it != dmsgq_.container().end(); ++it) {
366    if (it->msg_.Match(phandler, id)) {
367      if (removed) {
368        removed->push_back(it->msg_);
369      } else {
370        delete it->msg_.pdata;
371      }
372    } else {
373      *new_end++ = *it;
374    }
375  }
376  dmsgq_.container().erase(new_end, dmsgq_.container().end());
377  dmsgq_.reheap();
378}
379
380void MessageQueue::Dispatch(Message *pmsg) {
381  pmsg->phandler->OnMessage(pmsg);
382}
383
384}  // namespace rtc
385