1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "jingle/glue/thread_wrapper.h"
6
7#include "base/bind.h"
8#include "base/bind_helpers.h"
9#include "base/lazy_instance.h"
10#include "base/threading/thread_local.h"
11#include "third_party/webrtc/base/nullsocketserver.h"
12
13namespace jingle_glue {
14
15struct JingleThreadWrapper::PendingSend {
16  PendingSend(const rtc::Message& message_value)
17      : sending_thread(JingleThreadWrapper::current()),
18        message(message_value),
19        done_event(true, false) {
20    DCHECK(sending_thread);
21  }
22
23  JingleThreadWrapper* sending_thread;
24  rtc::Message message;
25  base::WaitableEvent done_event;
26};
27
28base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29    g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30
31// static
32void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33  if (JingleThreadWrapper::current() == NULL) {
34    base::MessageLoop* message_loop = base::MessageLoop::current();
35    g_jingle_thread_wrapper.Get()
36        .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
37    message_loop->AddDestructionObserver(current());
38  }
39
40  DCHECK_EQ(rtc::Thread::Current(), current());
41}
42
43// static
44JingleThreadWrapper* JingleThreadWrapper::current() {
45  return g_jingle_thread_wrapper.Get().Get();
46}
47
48JingleThreadWrapper::JingleThreadWrapper(
49    scoped_refptr<base::SingleThreadTaskRunner> task_runner)
50    : rtc::Thread(new rtc::NullSocketServer()),
51      task_runner_(task_runner),
52      send_allowed_(false),
53      last_task_id_(0),
54      pending_send_event_(true, false),
55      weak_ptr_factory_(this) {
56  DCHECK(task_runner->BelongsToCurrentThread());
57  DCHECK(!rtc::Thread::Current());
58  weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
59  rtc::MessageQueueManager::Add(this);
60  SafeWrapCurrent();
61}
62
63JingleThreadWrapper::~JingleThreadWrapper() {
64  Clear(NULL, rtc::MQID_ANY, NULL);
65}
66
67void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68  DCHECK_EQ(rtc::Thread::Current(), current());
69  UnwrapCurrent();
70  g_jingle_thread_wrapper.Get().Set(NULL);
71  rtc::ThreadManager::Instance()->SetCurrentThread(NULL);
72  rtc::MessageQueueManager::Remove(this);
73  rtc::SocketServer* ss = socketserver();
74  delete this;
75  delete ss;
76}
77
78void JingleThreadWrapper::Post(
79    rtc::MessageHandler* handler, uint32 message_id,
80    rtc::MessageData* data, bool time_sensitive) {
81  PostTaskInternal(0, handler, message_id, data);
82}
83
84void JingleThreadWrapper::PostDelayed(
85    int delay_ms, rtc::MessageHandler* handler,
86    uint32 message_id, rtc::MessageData* data) {
87  PostTaskInternal(delay_ms, handler, message_id, data);
88}
89
90void JingleThreadWrapper::Clear(rtc::MessageHandler* handler, uint32 id,
91                                rtc::MessageList* removed) {
92  base::AutoLock auto_lock(lock_);
93
94  for (MessagesQueue::iterator it = messages_.begin();
95       it != messages_.end();) {
96    MessagesQueue::iterator next = it;
97    ++next;
98
99    if (it->second.Match(handler, id)) {
100      if (removed) {
101        removed->push_back(it->second);
102      } else {
103        delete it->second.pdata;
104      }
105      messages_.erase(it);
106    }
107
108    it = next;
109  }
110
111  for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
112       it != pending_send_messages_.end();) {
113    std::list<PendingSend*>::iterator next = it;
114    ++next;
115
116    if ((*it)->message.Match(handler, id)) {
117      if (removed) {
118        removed ->push_back((*it)->message);
119      } else {
120        delete (*it)->message.pdata;
121      }
122      (*it)->done_event.Signal();
123      pending_send_messages_.erase(it);
124    }
125
126    it = next;
127  }
128}
129
130void JingleThreadWrapper::Send(rtc::MessageHandler *handler, uint32 id,
131                               rtc::MessageData *data) {
132  if (fStop_)
133    return;
134
135  JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
136  DCHECK(current_thread != NULL) << "Send() can be called only from a "
137      "thread that has JingleThreadWrapper.";
138
139  rtc::Message message;
140  message.phandler = handler;
141  message.message_id = id;
142  message.pdata = data;
143
144  if (current_thread == this) {
145    handler->OnMessage(&message);
146    return;
147  }
148
149  // Send message from a thread different than |this|.
150
151  // Allow inter-thread send only from threads that have
152  // |send_allowed_| flag set.
153  DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
154      "messages is not allowed from the current thread.";
155
156  PendingSend pending_send(message);
157  {
158    base::AutoLock auto_lock(lock_);
159    pending_send_messages_.push_back(&pending_send);
160  }
161
162  // Need to signal |pending_send_event_| here in case the thread is
163  // sending message to another thread.
164  pending_send_event_.Signal();
165  task_runner_->PostTask(FROM_HERE,
166                         base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167                                    weak_ptr_));
168
169
170  while (!pending_send.done_event.IsSignaled()) {
171    base::WaitableEvent* events[] = {&pending_send.done_event,
172                                     &current_thread->pending_send_event_};
173    size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174    DCHECK(event == 0 || event == 1);
175
176    if (event == 1)
177      current_thread->ProcessPendingSends();
178  }
179}
180
181void JingleThreadWrapper::ProcessPendingSends() {
182  while (true) {
183    PendingSend* pending_send = NULL;
184    {
185      base::AutoLock auto_lock(lock_);
186      if (!pending_send_messages_.empty()) {
187        pending_send = pending_send_messages_.front();
188        pending_send_messages_.pop_front();
189      } else {
190        // Reset the event while |lock_| is still locked.
191        pending_send_event_.Reset();
192        break;
193      }
194    }
195    if (pending_send) {
196      pending_send->message.phandler->OnMessage(&pending_send->message);
197      pending_send->done_event.Signal();
198    }
199  }
200}
201
202void JingleThreadWrapper::PostTaskInternal(
203    int delay_ms, rtc::MessageHandler* handler,
204    uint32 message_id, rtc::MessageData* data) {
205  int task_id;
206  rtc::Message message;
207  message.phandler = handler;
208  message.message_id = message_id;
209  message.pdata = data;
210  {
211    base::AutoLock auto_lock(lock_);
212    task_id = ++last_task_id_;
213    messages_.insert(std::pair<int, rtc::Message>(task_id, message));
214  }
215
216  if (delay_ms <= 0) {
217    task_runner_->PostTask(FROM_HERE,
218                           base::Bind(&JingleThreadWrapper::RunTask,
219                                      weak_ptr_, task_id));
220  } else {
221    task_runner_->PostDelayedTask(FROM_HERE,
222                                  base::Bind(&JingleThreadWrapper::RunTask,
223                                             weak_ptr_, task_id),
224                                  base::TimeDelta::FromMilliseconds(delay_ms));
225  }
226}
227
228void JingleThreadWrapper::RunTask(int task_id) {
229  bool have_message = false;
230  rtc::Message message;
231  {
232    base::AutoLock auto_lock(lock_);
233    MessagesQueue::iterator it = messages_.find(task_id);
234    if (it != messages_.end()) {
235      have_message = true;
236      message = it->second;
237      messages_.erase(it);
238    }
239  }
240
241  if (have_message) {
242    if (message.message_id == rtc::MQID_DISPOSE) {
243      DCHECK(message.phandler == NULL);
244      delete message.pdata;
245    } else {
246      message.phandler->OnMessage(&message);
247    }
248  }
249}
250
251// All methods below are marked as not reached. See comments in the
252// header for more details.
253void JingleThreadWrapper::Quit() {
254  NOTREACHED();
255}
256
257bool JingleThreadWrapper::IsQuitting() {
258  NOTREACHED();
259  return false;
260}
261
262void JingleThreadWrapper::Restart() {
263  NOTREACHED();
264}
265
266bool JingleThreadWrapper::Get(rtc::Message*, int, bool) {
267  NOTREACHED();
268  return false;
269}
270
271bool JingleThreadWrapper::Peek(rtc::Message*, int) {
272  NOTREACHED();
273  return false;
274}
275
276void JingleThreadWrapper::PostAt(uint32, rtc::MessageHandler*,
277                                 uint32, rtc::MessageData*) {
278  NOTREACHED();
279}
280
281void JingleThreadWrapper::Dispatch(rtc::Message* message) {
282  NOTREACHED();
283}
284
285void JingleThreadWrapper::ReceiveSends() {
286  NOTREACHED();
287}
288
289int JingleThreadWrapper::GetDelay() {
290  NOTREACHED();
291  return 0;
292}
293
294void JingleThreadWrapper::Stop() {
295  NOTREACHED();
296}
297
298void JingleThreadWrapper::Run() {
299  NOTREACHED();
300}
301
302}  // namespace jingle_glue
303