1/*
2 * libjingle
3 * Copyright 2004 Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifndef TALK_BASE_THREAD_H_
29#define TALK_BASE_THREAD_H_
30
31#include <algorithm>
32#include <list>
33#include <string>
34#include <vector>
35
36#ifdef POSIX
37#include <pthread.h>
38#endif
39#include "talk/base/constructormagic.h"
40#include "talk/base/event.h"
41#include "talk/base/messagequeue.h"
42
43#ifdef WIN32
44#include "talk/base/win32.h"
45#endif
46
47namespace talk_base {
48
49class Thread;
50
51class ThreadManager {
52 public:
53  ThreadManager();
54  ~ThreadManager();
55
56  static ThreadManager* Instance();
57
58  Thread* CurrentThread();
59  void SetCurrentThread(Thread* thread);
60
61  // Returns a thread object with its thread_ ivar set
62  // to whatever the OS uses to represent the thread.
63  // If there already *is* a Thread object corresponding to this thread,
64  // this method will return that.  Otherwise it creates a new Thread
65  // object whose wrapped() method will return true, and whose
66  // handle will, on Win32, be opened with only synchronization privileges -
67  // if you need more privilegs, rather than changing this method, please
68  // write additional code to adjust the privileges, or call a different
69  // factory method of your own devising, because this one gets used in
70  // unexpected contexts (like inside browser plugins) and it would be a
71  // shame to break it.  It is also conceivable on Win32 that we won't even
72  // be able to get synchronization privileges, in which case the result
73  // will have a NULL handle.
74  Thread *WrapCurrentThread();
75  void UnwrapCurrentThread();
76
77 private:
78#ifdef POSIX
79  pthread_key_t key_;
80#endif
81
82#ifdef WIN32
83  DWORD key_;
84#endif
85
86  DISALLOW_COPY_AND_ASSIGN(ThreadManager);
87};
88
89struct _SendMessage {
90  _SendMessage() {}
91  Thread *thread;
92  Message msg;
93  bool *ready;
94};
95
// Priority levels accepted by Thread::SetPriority().  The numeric values are
// spelled out explicitly; higher values denote higher priority.
enum ThreadPriority {
  PRIORITY_IDLE = -1,         // Lowest level.
  PRIORITY_NORMAL = 0,        // Default level.
  PRIORITY_ABOVE_NORMAL = 1,  // One step above normal.
  PRIORITY_HIGH = 2           // Highest level.
};
102
103class Runnable {
104 public:
105  virtual ~Runnable() {}
106  virtual void Run(Thread* thread) = 0;
107
108 protected:
109  Runnable() {}
110
111 private:
112  DISALLOW_COPY_AND_ASSIGN(Runnable);
113};
114
115// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
116
117class Thread : public MessageQueue {
118 public:
119  explicit Thread(SocketServer* ss = NULL);
120  // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
121  // guarantee Stop() is explicitly called before the subclass is destroyed).
122  // This is required to avoid a data race between the destructor modifying the
123  // vtable, and the Thread::PreRun calling the virtual method Run().
124  virtual ~Thread();
125
126  static Thread* Current();
127
128  bool IsCurrent() const {
129    return Current() == this;
130  }
131
132  // Sleeps the calling thread for the specified number of milliseconds, during
133  // which time no processing is performed. Returns false if sleeping was
134  // interrupted by a signal (POSIX only).
135  static bool SleepMs(int millis);
136
137  // Sets the thread's name, for debugging. Must be called before Start().
138  // If |obj| is non-NULL, its value is appended to |name|.
139  const std::string& name() const { return name_; }
140  bool SetName(const std::string& name, const void* obj);
141
142  // Sets the thread's priority. Must be called before Start().
143  ThreadPriority priority() const { return priority_; }
144  bool SetPriority(ThreadPriority priority);
145
146  // Starts the execution of the thread.
147  bool Start(Runnable* runnable = NULL);
148
149  // Tells the thread to stop and waits until it is joined.
150  // Never call Stop on the current thread.  Instead use the inherited Quit
151  // function which will exit the base MessageQueue without terminating the
152  // underlying OS thread.
153  virtual void Stop();
154
155  // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
156  // work, override Run().  To receive and dispatch messages, call
157  // ProcessMessages occasionally.
158  virtual void Run();
159
160  virtual void Send(MessageHandler *phandler, uint32 id = 0,
161      MessageData *pdata = NULL);
162
163  // Convenience method to invoke a functor on another thread.  Caller must
164  // provide the |ReturnT| template argument, which cannot (easily) be deduced.
165  // Uses Send() internally, which blocks the current thread until execution
166  // is complete.
167  // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool);
168  template <class ReturnT, class FunctorT>
169  ReturnT Invoke(const FunctorT& functor) {
170    FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
171    Send(&handler);
172    return handler.result();
173  }
174
175  // From MessageQueue
176  virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY,
177                     MessageList* removed = NULL);
178  virtual void ReceiveSends();
179
180  // ProcessMessages will process I/O and dispatch messages until:
181  //  1) cms milliseconds have elapsed (returns true)
182  //  2) Stop() is called (returns false)
183  bool ProcessMessages(int cms);
184
185  // Returns true if this is a thread that we created using the standard
186  // constructor, false if it was created by a call to
187  // ThreadManager::WrapCurrentThread().  The main thread of an application
188  // is generally not owned, since the OS representation of the thread
189  // obviously exists before we can get to it.
190  // You cannot call Start on non-owned threads.
191  bool IsOwned();
192
193#ifdef WIN32
194  HANDLE GetHandle() const {
195    return thread_;
196  }
197  DWORD GetId() const {
198    return thread_id_;
199  }
200#elif POSIX
201  pthread_t GetPThread() {
202    return thread_;
203  }
204#endif
205
206  // This method should be called when thread is created using non standard
207  // method, like derived implementation of talk_base::Thread and it can not be
208  // started by calling Start(). This will set started flag to true and
209  // owned to false. This must be called from the current thread.
210  // NOTE: These methods should be used by the derived classes only, added here
211  // only for testing.
212  bool WrapCurrent();
213  void UnwrapCurrent();
214
215  // Expose private method running() for tests.
216  //
217  // DANGER: this is a terrible public API.  Most callers that might want to
218  // call this likely do not have enough control/knowledge of the Thread in
219  // question to guarantee that the returned value remains true for the duration
220  // of whatever code is conditionally executing because of the return value!
221  bool RunningForTest() { return running(); }
222  // This is a legacy call-site that probably doesn't need to exist in the first
223  // place.
224  // TODO(fischman): delete once the ASSERT added in channelmanager.cc sticks
225  // for a month (ETA 2014/06/22).
226  bool RunningForChannelManager() { return running(); }
227
228 protected:
229  // Blocks the calling thread until this thread has terminated.
230  void Join();
231
232 private:
233  static void *PreRun(void *pv);
234
235  // ThreadManager calls this instead WrapCurrent() because
236  // ThreadManager::Instance() cannot be used while ThreadManager is
237  // being created.
238  bool WrapCurrentWithThreadManager(ThreadManager* thread_manager);
239
240  // Return true if the thread was started and hasn't yet stopped.
241  bool running() { return running_.Wait(0); }
242
243  std::list<_SendMessage> sendlist_;
244  std::string name_;
245  ThreadPriority priority_;
246  Event running_;  // Signalled means running.
247
248#ifdef POSIX
249  pthread_t thread_;
250#endif
251
252#ifdef WIN32
253  HANDLE thread_;
254  DWORD thread_id_;
255#endif
256
257  bool owned_;
258
259  friend class ThreadManager;
260
261  DISALLOW_COPY_AND_ASSIGN(Thread);
262};
263
// AutoThread automatically installs itself at construction and
// uninstalls itself at destruction, if a Thread object is
// _not already_ associated with the current OS thread.
267
268class AutoThread : public Thread {
269 public:
270  explicit AutoThread(SocketServer* ss = 0);
271  virtual ~AutoThread();
272
273 private:
274  DISALLOW_COPY_AND_ASSIGN(AutoThread);
275};
276
277// Win32 extension for threads that need to use COM
278#ifdef WIN32
279class ComThread : public Thread {
280 public:
281  ComThread() {}
282  virtual ~ComThread() { Stop(); }
283
284 protected:
285  virtual void Run();
286
287 private:
288  DISALLOW_COPY_AND_ASSIGN(ComThread);
289};
290#endif
291
292// Provides an easy way to install/uninstall a socketserver on a thread.
293class SocketServerScope {
294 public:
295  explicit SocketServerScope(SocketServer* ss) {
296    old_ss_ = Thread::Current()->socketserver();
297    Thread::Current()->set_socketserver(ss);
298  }
299  ~SocketServerScope() {
300    Thread::Current()->set_socketserver(old_ss_);
301  }
302
303 private:
304  SocketServer* old_ss_;
305
306  DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope);
307};
308
309}  // namespace talk_base
310
311#endif  // TALK_BASE_THREAD_H_
312