1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/child/webmessageportchannel_impl.h"
6
7#include "base/bind.h"
8#include "base/message_loop/message_loop_proxy.h"
9#include "content/child/child_process.h"
10#include "content/child/child_thread.h"
11#include "content/common/message_port_messages.h"
12#include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13#include "third_party/WebKit/public/platform/WebString.h"
14
15using blink::WebMessagePortChannel;
16using blink::WebMessagePortChannelArray;
17using blink::WebMessagePortChannelClient;
18using blink::WebString;
19
20namespace content {
21
22WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23    const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
24    : client_(NULL),
25      route_id_(MSG_ROUTING_NONE),
26      message_port_id_(MSG_ROUTING_NONE),
27      child_thread_loop_(child_thread_loop) {
28  AddRef();
29  Init();
30}
31
32WebMessagePortChannelImpl::WebMessagePortChannelImpl(
33    int route_id,
34    int message_port_id,
35    const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
36    : client_(NULL),
37      route_id_(route_id),
38      message_port_id_(message_port_id),
39      child_thread_loop_(child_thread_loop) {
40  AddRef();
41  Init();
42}
43
44WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45  // If we have any queued messages with attached ports, manually destroy them.
46  while (!message_queue_.empty()) {
47    const std::vector<WebMessagePortChannelImpl*>& channel_array =
48        message_queue_.front().ports;
49    for (size_t i = 0; i < channel_array.size(); i++) {
50      channel_array[i]->destroy();
51    }
52    message_queue_.pop();
53  }
54
55  if (message_port_id_ != MSG_ROUTING_NONE)
56    Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
57
58  if (route_id_ != MSG_ROUTING_NONE)
59    ChildThread::current()->GetRouter()->RemoveRoute(route_id_);
60}
61
62// static
63void WebMessagePortChannelImpl::CreatePair(
64    const scoped_refptr<base::MessageLoopProxy>& child_thread_loop,
65    blink::WebMessagePortChannel** channel1,
66    blink::WebMessagePortChannel** channel2) {
67  WebMessagePortChannelImpl* impl1 =
68      new WebMessagePortChannelImpl(child_thread_loop);
69  WebMessagePortChannelImpl* impl2 =
70      new WebMessagePortChannelImpl(child_thread_loop);
71
72  impl1->Entangle(impl2);
73  impl2->Entangle(impl1);
74
75  *channel1 = impl1;
76  *channel2 = impl2;
77}
78
79// static
80std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81    WebMessagePortChannelArray* channels) {
82  std::vector<int> message_port_ids;
83  if (channels) {
84    message_port_ids.resize(channels->size());
85    // Extract the port IDs from the source array, then free it.
86    for (size_t i = 0; i < channels->size(); ++i) {
87      WebMessagePortChannelImpl* webchannel =
88          static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
89      // The message port ids might not be set up yet if this channel
90      // wasn't created on the main thread.
91      DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread());
92      message_port_ids[i] = webchannel->message_port_id();
93      webchannel->QueueMessages();
94      DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
95    }
96    delete channels;
97  }
98  return message_port_ids;
99}
100
101void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
102  // Must lock here since client_ is called on the main thread.
103  base::AutoLock auto_lock(lock_);
104  client_ = client;
105}
106
107void WebMessagePortChannelImpl::destroy() {
108  setClient(NULL);
109
110  // Release the object on the main thread, since the destructor might want to
111  // send an IPC, and that has to happen on the main thread.
112  child_thread_loop_->ReleaseSoon(FROM_HERE, this);
113}
114
115void WebMessagePortChannelImpl::postMessage(
116    const WebString& message,
117    WebMessagePortChannelArray* channels) {
118  if (!child_thread_loop_->BelongsToCurrentThread()) {
119    child_thread_loop_->PostTask(
120        FROM_HERE,
121        base::Bind(
122            &WebMessagePortChannelImpl::PostMessage, this,
123            static_cast<base::string16>(message), channels));
124  } else {
125    PostMessage(message, channels);
126  }
127}
128
129void WebMessagePortChannelImpl::PostMessage(
130    const base::string16& message,
131    WebMessagePortChannelArray* channels) {
132  IPC::Message* msg = new MessagePortHostMsg_PostMessage(
133      message_port_id_, message, ExtractMessagePortIDs(channels));
134  Send(msg);
135}
136
137bool WebMessagePortChannelImpl::tryGetMessage(
138    WebString* message,
139    WebMessagePortChannelArray& channels) {
140  base::AutoLock auto_lock(lock_);
141  if (message_queue_.empty())
142    return false;
143
144  *message = message_queue_.front().message;
145  const std::vector<WebMessagePortChannelImpl*>& channel_array =
146      message_queue_.front().ports;
147  WebMessagePortChannelArray result_ports(channel_array.size());
148  for (size_t i = 0; i < channel_array.size(); i++) {
149    result_ports[i] = channel_array[i];
150  }
151
152  channels.swap(result_ports);
153  message_queue_.pop();
154  return true;
155}
156
157void WebMessagePortChannelImpl::Init() {
158  if (!child_thread_loop_->BelongsToCurrentThread()) {
159    child_thread_loop_->PostTask(
160        FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
161    return;
162  }
163
164  if (route_id_ == MSG_ROUTING_NONE) {
165    DCHECK(message_port_id_ == MSG_ROUTING_NONE);
166    Send(new MessagePortHostMsg_CreateMessagePort(
167        &route_id_, &message_port_id_));
168  }
169
170  ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
171}
172
173void WebMessagePortChannelImpl::Entangle(
174    scoped_refptr<WebMessagePortChannelImpl> channel) {
175  // The message port ids might not be set up yet, if this channel wasn't
176  // created on the main thread.  So need to wait until we're on the main thread
177  // before getting the other message port id.
178  if (!child_thread_loop_->BelongsToCurrentThread()) {
179    child_thread_loop_->PostTask(
180        FROM_HERE,
181        base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
182    return;
183  }
184
185  Send(new MessagePortHostMsg_Entangle(
186      message_port_id_, channel->message_port_id()));
187}
188
189void WebMessagePortChannelImpl::QueueMessages() {
190  if (!child_thread_loop_->BelongsToCurrentThread()) {
191    child_thread_loop_->PostTask(
192        FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
193    return;
194  }
195  // This message port is being sent elsewhere (perhaps to another process).
196  // The new endpoint needs to receive the queued messages, including ones that
197  // could still be in-flight.  So we tell the browser to queue messages, and it
198  // sends us an ack, whose receipt we know means that no more messages are
199  // in-flight.  We then send the queued messages to the browser, which prepends
200  // them to the ones it queued and it sends them to the new endpoint.
201  Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
202
203  // The process could potentially go away while we're still waiting for
204  // in-flight messages.  Ensure it stays alive.
205  ChildProcess::current()->AddRefProcess();
206}
207
208void WebMessagePortChannelImpl::Send(IPC::Message* message) {
209  if (!child_thread_loop_->BelongsToCurrentThread()) {
210    DCHECK(!message->is_sync());
211    child_thread_loop_->PostTask(
212        FROM_HERE,
213        base::Bind(&WebMessagePortChannelImpl::Send, this, message));
214    return;
215  }
216
217  ChildThread::current()->GetRouter()->Send(message);
218}
219
220bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
221  bool handled = true;
222  IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
223    IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
224    IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
225    IPC_MESSAGE_UNHANDLED(handled = false)
226  IPC_END_MESSAGE_MAP()
227  return handled;
228}
229
230void WebMessagePortChannelImpl::OnMessage(
231    const base::string16& message,
232    const std::vector<int>& sent_message_port_ids,
233    const std::vector<int>& new_routing_ids) {
234  base::AutoLock auto_lock(lock_);
235  Message msg;
236  msg.message = message;
237  if (!sent_message_port_ids.empty()) {
238    msg.ports.resize(sent_message_port_ids.size());
239    for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
240      msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
241                                                   sent_message_port_ids[i],
242                                                   child_thread_loop_.get());
243    }
244  }
245
246  bool was_empty = message_queue_.empty();
247  message_queue_.push(msg);
248  if (client_ && was_empty)
249    client_->messageAvailable();
250}
251
252void WebMessagePortChannelImpl::OnMessagesQueued() {
253  std::vector<QueuedMessage> queued_messages;
254
255  {
256    base::AutoLock auto_lock(lock_);
257    queued_messages.reserve(message_queue_.size());
258    while (!message_queue_.empty()) {
259      base::string16 message = message_queue_.front().message;
260      const std::vector<WebMessagePortChannelImpl*>& channel_array =
261          message_queue_.front().ports;
262      std::vector<int> port_ids(channel_array.size());
263      for (size_t i = 0; i < channel_array.size(); ++i) {
264        port_ids[i] = channel_array[i]->message_port_id();
265        channel_array[i]->QueueMessages();
266      }
267      queued_messages.push_back(std::make_pair(message, port_ids));
268      message_queue_.pop();
269    }
270  }
271
272  Send(new MessagePortHostMsg_SendQueuedMessages(
273      message_port_id_, queued_messages));
274
275  message_port_id_ = MSG_ROUTING_NONE;
276
277  Release();
278  ChildProcess::current()->ReleaseProcess();
279}
280
281WebMessagePortChannelImpl::Message::Message() {}
282
283WebMessagePortChannelImpl::Message::~Message() {}
284
285}  // namespace content
286