1/* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28#ifndef TALK_BASE_MESSAGEQUEUE_H_ 29#define TALK_BASE_MESSAGEQUEUE_H_ 30 31#include <algorithm> 32#include <cstring> 33#include <list> 34#include <queue> 35#include <vector> 36 37#include "talk/base/basictypes.h" 38#include "talk/base/criticalsection.h" 39#include "talk/base/messagehandler.h" 40#include "talk/base/scoped_ptr.h" 41#include "talk/base/sigslot.h" 42#include "talk/base/socketserver.h" 43#include "talk/base/time.h" 44 45namespace talk_base { 46 47struct Message; 48class MessageQueue; 49 50// MessageQueueManager does cleanup of of message queues 51 52class MessageQueueManager { 53 public: 54 static MessageQueueManager* Instance(); 55 56 void Add(MessageQueue *message_queue); 57 void Remove(MessageQueue *message_queue); 58 void Clear(MessageHandler *handler); 59 60 private: 61 MessageQueueManager(); 62 ~MessageQueueManager(); 63 64 static MessageQueueManager* instance_; 65 // This list contains 'active' MessageQueues. 66 std::vector<MessageQueue *> message_queues_; 67 CriticalSection crit_; 68}; 69 70// Derive from this for specialized data 71// App manages lifetime, except when messages are purged 72 73class MessageData { 74 public: 75 MessageData() {} 76 virtual ~MessageData() {} 77}; 78 79template <class T> 80class TypedMessageData : public MessageData { 81 public: 82 explicit TypedMessageData(const T& data) : data_(data) { } 83 const T& data() const { return data_; } 84 T& data() { return data_; } 85 private: 86 T data_; 87}; 88 89// Like TypedMessageData, but for pointers that require a delete. 90template <class T> 91class ScopedMessageData : public MessageData { 92 public: 93 explicit ScopedMessageData(T* data) : data_(data) { } 94 const scoped_ptr<T>& data() const { return data_; } 95 scoped_ptr<T>& data() { return data_; } 96 private: 97 scoped_ptr<T> data_; 98}; 99 100template<class T> 101inline MessageData* WrapMessageData(const T& data) { 102 return new TypedMessageData<T>(data); 103} 104 105template<class T> 106inline const T& UseMessageData(MessageData* data) { 107 return static_cast< TypedMessageData<T>* >(data)->data(); 108} 109 110template<class T> 111class DisposeData : public MessageData { 112 public: 113 explicit DisposeData(T* data) : data_(data) { } 114 virtual ~DisposeData() { delete data_; } 115 private: 116 T* data_; 117}; 118 119const uint32 MQID_ANY = static_cast<uint32>(-1); 120const uint32 MQID_DISPOSE = static_cast<uint32>(-2); 121 122// No destructor 123 124struct Message { 125 Message() { 126 memset(this, 0, sizeof(*this)); 127 } 128 inline bool Match(MessageHandler* handler, uint32 id) const { 129 return (handler == NULL || handler == phandler) 130 && (id == MQID_ANY || id == message_id); 131 } 132 MessageHandler *phandler; 133 uint32 message_id; 134 MessageData *pdata; 135 uint32 ts_sensitive; 136}; 137 138typedef std::list<Message> MessageList; 139 140// DelayedMessage goes into a priority queue, sorted by trigger time. Messages 141// with the same trigger time are processed in num_ (FIFO) order. 142 143class DelayedMessage { 144 public: 145 DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) 146 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } 147 148 bool operator< (const DelayedMessage& dmsg) const { 149 return (dmsg.msTrigger_ < msTrigger_) 150 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); 151 } 152 153 int cmsDelay_; // for debugging 154 uint32 msTrigger_; 155 uint32 num_; 156 Message msg_; 157}; 158 159class MessageQueue { 160 public: 161 explicit MessageQueue(SocketServer* ss = NULL); 162 virtual ~MessageQueue(); 163 164 SocketServer* socketserver() { return ss_; } 165 void set_socketserver(SocketServer* ss); 166 167 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, 168 // futher Posts and Sends will fail. However, any pending Sends and *ready* 169 // Posts (as opposed to unexpired delayed Posts) will be delivered before 170 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 171 // we eliminate the race condition when an MessageHandler and MessageQueue 172 // may be destroyed independently of each other. 173 virtual void Quit(); 174 virtual bool IsQuitting(); 175 virtual void Restart(); 176 177 // Get() will process I/O until: 178 // 1) A message is available (returns true) 179 // 2) cmsWait seconds have elapsed (returns false) 180 // 3) Stop() is called (returns false) 181 virtual bool Get(Message *pmsg, int cmsWait = kForever, 182 bool process_io = true); 183 virtual bool Peek(Message *pmsg, int cmsWait = 0); 184 virtual void Post(MessageHandler *phandler, uint32 id = 0, 185 MessageData *pdata = NULL, bool time_sensitive = false); 186 virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, 187 uint32 id = 0, MessageData *pdata = NULL) { 188 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 189 } 190 virtual void PostAt(uint32 tstamp, MessageHandler *phandler, 191 uint32 id = 0, MessageData *pdata = NULL) { 192 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 193 } 194 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 195 MessageList* removed = NULL); 196 virtual void Dispatch(Message *pmsg); 197 virtual void ReceiveSends(); 198 199 // Amount of time until the next message can be retrieved 200 virtual int GetDelay(); 201 202 bool empty() const { return msgq_.empty() && dmsgq_.empty() && !fPeekKeep_; } 203 size_t size() const { return msgq_.size() + dmsgq_.size() + fPeekKeep_; } 204 205 // Internally posts a message which causes the doomed object to be deleted 206 template<class T> void Dispose(T* doomed) { 207 if (doomed) { 208 Post(NULL, MQID_DISPOSE, new DisposeData<T>(doomed)); 209 } 210 } 211 212 // When this signal is sent out, any references to this queue should 213 // no longer be used. 214 sigslot::signal0<> SignalQueueDestroyed; 215 216 protected: 217 class PriorityQueue : public std::priority_queue<DelayedMessage> { 218 public: 219 container_type& container() { return c; } 220 void reheap() { make_heap(c.begin(), c.end(), comp); } 221 }; 222 223 void EnsureActive(); 224 void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, 225 uint32 id, MessageData* pdata); 226 227 // The SocketServer is not owned by MessageQueue. 228 SocketServer* ss_; 229 // If a server isn't supplied in the constructor, use this one. 230 scoped_ptr<SocketServer> default_ss_; 231 bool fStop_; 232 bool fPeekKeep_; 233 Message msgPeek_; 234 // A message queue is active if it has ever had a message posted to it. 235 // This also corresponds to being in MessageQueueManager's global list. 236 bool active_; 237 MessageList msgq_; 238 PriorityQueue dmsgq_; 239 uint32 dmsgq_next_num_; 240 CriticalSection crit_; 241}; 242 243} // namespace talk_base 244 245#endif // TALK_BASE_MESSAGEQUEUE_H_ 246