1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
15#include <algorithm>
16
17#include "webrtc/base/common.h"
18#include "webrtc/base/logging.h"
19#include "webrtc/base/messagequeue.h"
20#if defined(__native_client__)
21#include "webrtc/base/nullsocketserver.h"
22typedef rtc::NullSocketServer DefaultSocketServer;
23#else
24#include "webrtc/base/physicalsocketserver.h"
25typedef rtc::PhysicalSocketServer DefaultSocketServer;
26#endif
27
28namespace rtc {
29
// Latency budget, in milliseconds, for time-sensitive messages. Post() stamps
// such a message with Time() + kMaxMsgLatency, and Get() logs a warning when
// the message is handed out after that deadline.
const uint32_t kMaxMsgLatency = 150;
31
32//------------------------------------------------------------------
33// MessageQueueManager
34
35MessageQueueManager* MessageQueueManager::instance_ = NULL;
36
37MessageQueueManager* MessageQueueManager::Instance() {
38  // Note: This is not thread safe, but it is first called before threads are
39  // spawned.
40  if (!instance_)
41    instance_ = new MessageQueueManager;
42  return instance_;
43}
44
45bool MessageQueueManager::IsInitialized() {
46  return instance_ != NULL;
47}
48
49MessageQueueManager::MessageQueueManager() {
50}
51
52MessageQueueManager::~MessageQueueManager() {
53}
54
55void MessageQueueManager::Add(MessageQueue *message_queue) {
56  return Instance()->AddInternal(message_queue);
57}
58void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
59  // MessageQueueManager methods should be non-reentrant, so we
60  // ASSERT that is the case.  If any of these ASSERT, please
61  // contact bpm or jbeda.
62#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
63  ASSERT(!crit_.CurrentThreadIsOwner());
64#endif
65  CritScope cs(&crit_);
66  message_queues_.push_back(message_queue);
67}
68
69void MessageQueueManager::Remove(MessageQueue *message_queue) {
70  // If there isn't a message queue manager instance, then there isn't a queue
71  // to remove.
72  if (!instance_) return;
73  return Instance()->RemoveInternal(message_queue);
74}
75void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
76#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
77  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
78#endif
79  // If this is the last MessageQueue, destroy the manager as well so that
80  // we don't leak this object at program shutdown. As mentioned above, this is
81  // not thread-safe, but this should only happen at program termination (when
82  // the ThreadManager is destroyed, and threads are no longer active).
83  bool destroy = false;
84  {
85    CritScope cs(&crit_);
86    std::vector<MessageQueue *>::iterator iter;
87    iter = std::find(message_queues_.begin(), message_queues_.end(),
88                     message_queue);
89    if (iter != message_queues_.end()) {
90      message_queues_.erase(iter);
91    }
92    destroy = message_queues_.empty();
93  }
94  if (destroy) {
95    instance_ = NULL;
96    delete this;
97  }
98}
99
100void MessageQueueManager::Clear(MessageHandler *handler) {
101  // If there isn't a message queue manager instance, then there aren't any
102  // queues to remove this handler from.
103  if (!instance_) return;
104  return Instance()->ClearInternal(handler);
105}
106void MessageQueueManager::ClearInternal(MessageHandler *handler) {
107#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
108  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
109#endif
110  CritScope cs(&crit_);
111  std::vector<MessageQueue *>::iterator iter;
112  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113    (*iter)->Clear(handler);
114}
115
116//------------------------------------------------------------------
117// MessageQueue
118
119MessageQueue::MessageQueue(SocketServer* ss)
120    : ss_(ss), fStop_(false), fPeekKeep_(false),
121      dmsgq_next_num_(0) {
122  if (!ss_) {
123    // Currently, MessageQueue holds a socket server, and is the base class for
124    // Thread.  It seems like it makes more sense for Thread to hold the socket
125    // server, and provide it to the MessageQueue, since the Thread controls
126    // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
127    // messagequeue_unittest to depend on network libraries... yuck.
128    default_ss_.reset(new DefaultSocketServer());
129    ss_ = default_ss_.get();
130  }
131  ss_->SetMessageQueue(this);
132  MessageQueueManager::Add(this);
133}
134
135MessageQueue::~MessageQueue() {
136  // The signal is done from here to ensure
137  // that it always gets called when the queue
138  // is going away.
139  SignalQueueDestroyed();
140  MessageQueueManager::Remove(this);
141  Clear(NULL);
142  if (ss_) {
143    ss_->SetMessageQueue(NULL);
144  }
145}
146
147void MessageQueue::set_socketserver(SocketServer* ss) {
148  ss_ = ss ? ss : default_ss_.get();
149  ss_->SetMessageQueue(this);
150}
151
152void MessageQueue::Quit() {
153  fStop_ = true;
154  ss_->WakeUp();
155}
156
157bool MessageQueue::IsQuitting() {
158  return fStop_;
159}
160
161void MessageQueue::Restart() {
162  fStop_ = false;
163}
164
165bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
166  if (fPeekKeep_) {
167    *pmsg = msgPeek_;
168    return true;
169  }
170  if (!Get(pmsg, cmsWait))
171    return false;
172  msgPeek_ = *pmsg;
173  fPeekKeep_ = true;
174  return true;
175}
176
177bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
178  // Return and clear peek if present
179  // Always return the peek if it exists so there is Peek/Get symmetry
180
181  if (fPeekKeep_) {
182    *pmsg = msgPeek_;
183    fPeekKeep_ = false;
184    return true;
185  }
186
187  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
188
189  int cmsTotal = cmsWait;
190  int cmsElapsed = 0;
191  uint32_t msStart = Time();
192  uint32_t msCurrent = msStart;
193  while (true) {
194    // Check for sent messages
195    ReceiveSends();
196
197    // Check for posted events
198    int cmsDelayNext = kForever;
199    bool first_pass = true;
200    while (true) {
201      // All queue operations need to be locked, but nothing else in this loop
202      // (specifically handling disposed message) can happen inside the crit.
203      // Otherwise, disposed MessageHandlers will cause deadlocks.
204      {
205        CritScope cs(&crit_);
206        // On the first pass, check for delayed messages that have been
207        // triggered and calculate the next trigger time.
208        if (first_pass) {
209          first_pass = false;
210          while (!dmsgq_.empty()) {
211            if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
212              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
213              break;
214            }
215            msgq_.push_back(dmsgq_.top().msg_);
216            dmsgq_.pop();
217          }
218        }
219        // Pull a message off the message queue, if available.
220        if (msgq_.empty()) {
221          break;
222        } else {
223          *pmsg = msgq_.front();
224          msgq_.pop_front();
225        }
226      }  // crit_ is released here.
227
228      // Log a warning for time-sensitive messages that we're late to deliver.
229      if (pmsg->ts_sensitive) {
230        int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
231        if (delay > 0) {
232          LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
233                            << (delay + kMaxMsgLatency) << "ms";
234        }
235      }
236      // If this was a dispose message, delete it and skip it.
237      if (MQID_DISPOSE == pmsg->message_id) {
238        ASSERT(NULL == pmsg->phandler);
239        delete pmsg->pdata;
240        *pmsg = Message();
241        continue;
242      }
243      return true;
244    }
245
246    if (fStop_)
247      break;
248
249    // Which is shorter, the delay wait or the asked wait?
250
251    int cmsNext;
252    if (cmsWait == kForever) {
253      cmsNext = cmsDelayNext;
254    } else {
255      cmsNext = std::max(0, cmsTotal - cmsElapsed);
256      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
257        cmsNext = cmsDelayNext;
258    }
259
260    // Wait and multiplex in the meantime
261    if (!ss_->Wait(cmsNext, process_io))
262      return false;
263
264    // If the specified timeout expired, return
265
266    msCurrent = Time();
267    cmsElapsed = TimeDiff(msCurrent, msStart);
268    if (cmsWait != kForever) {
269      if (cmsElapsed >= cmsWait)
270        return false;
271    }
272  }
273  return false;
274}
275
276void MessageQueue::ReceiveSends() {
277}
278
279void MessageQueue::Post(MessageHandler* phandler,
280                        uint32_t id,
281                        MessageData* pdata,
282                        bool time_sensitive) {
283  if (fStop_)
284    return;
285
286  // Keep thread safe
287  // Add the message to the end of the queue
288  // Signal for the multiplexer to return
289
290  CritScope cs(&crit_);
291  Message msg;
292  msg.phandler = phandler;
293  msg.message_id = id;
294  msg.pdata = pdata;
295  if (time_sensitive) {
296    msg.ts_sensitive = Time() + kMaxMsgLatency;
297  }
298  msgq_.push_back(msg);
299  ss_->WakeUp();
300}
301
302void MessageQueue::PostDelayed(int cmsDelay,
303                               MessageHandler* phandler,
304                               uint32_t id,
305                               MessageData* pdata) {
306  return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
307}
308
309void MessageQueue::PostAt(uint32_t tstamp,
310                          MessageHandler* phandler,
311                          uint32_t id,
312                          MessageData* pdata) {
313  return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
314}
315
316void MessageQueue::DoDelayPost(int cmsDelay,
317                               uint32_t tstamp,
318                               MessageHandler* phandler,
319                               uint32_t id,
320                               MessageData* pdata) {
321  if (fStop_)
322    return;
323
324  // Keep thread safe
325  // Add to the priority queue. Gets sorted soonest first.
326  // Signal for the multiplexer to return.
327
328  CritScope cs(&crit_);
329  Message msg;
330  msg.phandler = phandler;
331  msg.message_id = id;
332  msg.pdata = pdata;
333  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
334  dmsgq_.push(dmsg);
335  // If this message queue processes 1 message every millisecond for 50 days,
336  // we will wrap this number.  Even then, only messages with identical times
337  // will be misordered, and then only briefly.  This is probably ok.
338  VERIFY(0 != ++dmsgq_next_num_);
339  ss_->WakeUp();
340}
341
342int MessageQueue::GetDelay() {
343  CritScope cs(&crit_);
344
345  if (!msgq_.empty())
346    return 0;
347
348  if (!dmsgq_.empty()) {
349    int delay = TimeUntil(dmsgq_.top().msTrigger_);
350    if (delay < 0)
351      delay = 0;
352    return delay;
353  }
354
355  return kForever;
356}
357
358void MessageQueue::Clear(MessageHandler* phandler,
359                         uint32_t id,
360                         MessageList* removed) {
361  CritScope cs(&crit_);
362
363  // Remove messages with phandler
364
365  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
366    if (removed) {
367      removed->push_back(msgPeek_);
368    } else {
369      delete msgPeek_.pdata;
370    }
371    fPeekKeep_ = false;
372  }
373
374  // Remove from ordered message queue
375
376  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
377    if (it->Match(phandler, id)) {
378      if (removed) {
379        removed->push_back(*it);
380      } else {
381        delete it->pdata;
382      }
383      it = msgq_.erase(it);
384    } else {
385      ++it;
386    }
387  }
388
389  // Remove from priority queue. Not directly iterable, so use this approach
390
391  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
392  for (PriorityQueue::container_type::iterator it = new_end;
393       it != dmsgq_.container().end(); ++it) {
394    if (it->msg_.Match(phandler, id)) {
395      if (removed) {
396        removed->push_back(it->msg_);
397      } else {
398        delete it->msg_.pdata;
399      }
400    } else {
401      *new_end++ = *it;
402    }
403  }
404  dmsgq_.container().erase(new_end, dmsgq_.container().end());
405  dmsgq_.reheap();
406}
407
408void MessageQueue::Dispatch(Message *pmsg) {
409  pmsg->phandler->OnMessage(pmsg);
410}
411
412}  // namespace rtc
413