1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/child/webmessageportchannel_impl.h"
6
7#include "base/bind.h"
8#include "base/message_loop/message_loop_proxy.h"
9#include "content/child/child_process.h"
10#include "content/child/child_thread.h"
11#include "content/common/message_port_messages.h"
12#include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13#include "third_party/WebKit/public/platform/WebString.h"
14
15using blink::WebMessagePortChannel;
16using blink::WebMessagePortChannelArray;
17using blink::WebMessagePortChannelClient;
18using blink::WebString;
19
20namespace content {
21
22WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23    base::MessageLoopProxy* child_thread_loop)
24    : client_(NULL),
25      route_id_(MSG_ROUTING_NONE),
26      message_port_id_(MSG_ROUTING_NONE),
27      child_thread_loop_(child_thread_loop) {
28  AddRef();
29  Init();
30}
31
32WebMessagePortChannelImpl::WebMessagePortChannelImpl(
33    int route_id,
34    int message_port_id,
35    base::MessageLoopProxy* child_thread_loop)
36    : client_(NULL),
37      route_id_(route_id),
38      message_port_id_(message_port_id),
39      child_thread_loop_(child_thread_loop) {
40  AddRef();
41  Init();
42}
43
44WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45  // If we have any queued messages with attached ports, manually destroy them.
46  while (!message_queue_.empty()) {
47    const std::vector<WebMessagePortChannelImpl*>& channel_array =
48        message_queue_.front().ports;
49    for (size_t i = 0; i < channel_array.size(); i++) {
50      channel_array[i]->destroy();
51    }
52    message_queue_.pop();
53  }
54
55  if (message_port_id_ != MSG_ROUTING_NONE)
56    Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
57
58  if (route_id_ != MSG_ROUTING_NONE)
59    ChildThread::current()->GetRouter()->RemoveRoute(route_id_);
60}
61
62// static
63void WebMessagePortChannelImpl::CreatePair(
64    base::MessageLoopProxy* child_thread_loop,
65    blink::WebMessagePortChannel** channel1,
66    blink::WebMessagePortChannel** channel2) {
67  WebMessagePortChannelImpl* impl1 =
68      new WebMessagePortChannelImpl(child_thread_loop);
69  WebMessagePortChannelImpl* impl2 =
70      new WebMessagePortChannelImpl(child_thread_loop);
71
72  impl1->Entangle(impl2);
73  impl2->Entangle(impl1);
74
75  *channel1 = impl1;
76  *channel2 = impl2;
77}
78
79// static
80std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81    WebMessagePortChannelArray* channels) {
82  std::vector<int> message_port_ids;
83  if (channels) {
84    message_port_ids.resize(channels->size());
85    // Extract the port IDs from the source array, then free it.
86    for (size_t i = 0; i < channels->size(); ++i) {
87      WebMessagePortChannelImpl* webchannel =
88          static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
89      // The message port ids might not be set up yet if this channel
90      // wasn't created on the main thread.
91      DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread());
92      message_port_ids[i] = webchannel->message_port_id();
93      webchannel->QueueMessages();
94      DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
95    }
96    delete channels;
97  }
98  return message_port_ids;
99}
100
101void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
102  // Must lock here since client_ is called on the main thread.
103  base::AutoLock auto_lock(lock_);
104  client_ = client;
105}
106
107void WebMessagePortChannelImpl::destroy() {
108  setClient(NULL);
109
110  // Release the object on the main thread, since the destructor might want to
111  // send an IPC, and that has to happen on the main thread.
112  child_thread_loop_->ReleaseSoon(FROM_HERE, this);
113}
114
115void WebMessagePortChannelImpl::postMessage(
116    const WebString& message,
117    WebMessagePortChannelArray* channels) {
118  if (!child_thread_loop_->BelongsToCurrentThread()) {
119    child_thread_loop_->PostTask(
120        FROM_HERE,
121        base::Bind(
122            &WebMessagePortChannelImpl::PostMessage, this, message, channels));
123  } else {
124    PostMessage(message, channels);
125  }
126}
127
128void WebMessagePortChannelImpl::PostMessage(
129    const base::string16& message,
130    WebMessagePortChannelArray* channels) {
131  IPC::Message* msg = new MessagePortHostMsg_PostMessage(
132      message_port_id_, message, ExtractMessagePortIDs(channels));
133  Send(msg);
134}
135
136bool WebMessagePortChannelImpl::tryGetMessage(
137    WebString* message,
138    WebMessagePortChannelArray& channels) {
139  base::AutoLock auto_lock(lock_);
140  if (message_queue_.empty())
141    return false;
142
143  *message = message_queue_.front().message;
144  const std::vector<WebMessagePortChannelImpl*>& channel_array =
145      message_queue_.front().ports;
146  WebMessagePortChannelArray result_ports(channel_array.size());
147  for (size_t i = 0; i < channel_array.size(); i++) {
148    result_ports[i] = channel_array[i];
149  }
150
151  channels.swap(result_ports);
152  message_queue_.pop();
153  return true;
154}
155
156void WebMessagePortChannelImpl::Init() {
157  if (!child_thread_loop_->BelongsToCurrentThread()) {
158    child_thread_loop_->PostTask(
159        FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
160    return;
161  }
162
163  if (route_id_ == MSG_ROUTING_NONE) {
164    DCHECK(message_port_id_ == MSG_ROUTING_NONE);
165    Send(new MessagePortHostMsg_CreateMessagePort(
166        &route_id_, &message_port_id_));
167  }
168
169  ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
170}
171
172void WebMessagePortChannelImpl::Entangle(
173    scoped_refptr<WebMessagePortChannelImpl> channel) {
174  // The message port ids might not be set up yet, if this channel wasn't
175  // created on the main thread.  So need to wait until we're on the main thread
176  // before getting the other message port id.
177  if (!child_thread_loop_->BelongsToCurrentThread()) {
178    child_thread_loop_->PostTask(
179        FROM_HERE,
180        base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
181    return;
182  }
183
184  Send(new MessagePortHostMsg_Entangle(
185      message_port_id_, channel->message_port_id()));
186}
187
188void WebMessagePortChannelImpl::QueueMessages() {
189  if (!child_thread_loop_->BelongsToCurrentThread()) {
190    child_thread_loop_->PostTask(
191        FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
192    return;
193  }
194  // This message port is being sent elsewhere (perhaps to another process).
195  // The new endpoint needs to receive the queued messages, including ones that
196  // could still be in-flight.  So we tell the browser to queue messages, and it
197  // sends us an ack, whose receipt we know means that no more messages are
198  // in-flight.  We then send the queued messages to the browser, which prepends
199  // them to the ones it queued and it sends them to the new endpoint.
200  Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
201
202  // The process could potentially go away while we're still waiting for
203  // in-flight messages.  Ensure it stays alive.
204  ChildProcess::current()->AddRefProcess();
205}
206
207void WebMessagePortChannelImpl::Send(IPC::Message* message) {
208  if (!child_thread_loop_->BelongsToCurrentThread()) {
209    DCHECK(!message->is_sync());
210    child_thread_loop_->PostTask(
211        FROM_HERE,
212        base::Bind(&WebMessagePortChannelImpl::Send, this, message));
213    return;
214  }
215
216  ChildThread::current()->GetRouter()->Send(message);
217}
218
219bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
220  bool handled = true;
221  IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
222    IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
223    IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
224    IPC_MESSAGE_UNHANDLED(handled = false)
225  IPC_END_MESSAGE_MAP()
226  return handled;
227}
228
229void WebMessagePortChannelImpl::OnMessage(
230    const base::string16& message,
231    const std::vector<int>& sent_message_port_ids,
232    const std::vector<int>& new_routing_ids) {
233  base::AutoLock auto_lock(lock_);
234  Message msg;
235  msg.message = message;
236  if (!sent_message_port_ids.empty()) {
237    msg.ports.resize(sent_message_port_ids.size());
238    for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
239      msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
240                                                   sent_message_port_ids[i],
241                                                   child_thread_loop_.get());
242    }
243  }
244
245  bool was_empty = message_queue_.empty();
246  message_queue_.push(msg);
247  if (client_ && was_empty)
248    client_->messageAvailable();
249}
250
251void WebMessagePortChannelImpl::OnMessagesQueued() {
252  std::vector<QueuedMessage> queued_messages;
253
254  {
255    base::AutoLock auto_lock(lock_);
256    queued_messages.reserve(message_queue_.size());
257    while (!message_queue_.empty()) {
258      base::string16 message = message_queue_.front().message;
259      const std::vector<WebMessagePortChannelImpl*>& channel_array =
260          message_queue_.front().ports;
261      std::vector<int> port_ids(channel_array.size());
262      for (size_t i = 0; i < channel_array.size(); ++i) {
263        port_ids[i] = channel_array[i]->message_port_id();
264        channel_array[i]->QueueMessages();
265      }
266      queued_messages.push_back(std::make_pair(message, port_ids));
267      message_queue_.pop();
268    }
269  }
270
271  Send(new MessagePortHostMsg_SendQueuedMessages(
272      message_port_id_, queued_messages));
273
274  message_port_id_ = MSG_ROUTING_NONE;
275
276  Release();
277  ChildProcess::current()->ReleaseProcess();
278}
279
280WebMessagePortChannelImpl::Message::Message() {}
281
282WebMessagePortChannelImpl::Message::~Message() {}
283
284}  // namespace content
285