1// Copyright (c) 2011 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/child/webmessageportchannel_impl.h" 6 7#include "base/bind.h" 8#include "base/message_loop/message_loop_proxy.h" 9#include "content/child/child_process.h" 10#include "content/child/child_thread.h" 11#include "content/common/message_port_messages.h" 12#include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h" 13#include "third_party/WebKit/public/platform/WebString.h" 14 15using blink::WebMessagePortChannel; 16using blink::WebMessagePortChannelArray; 17using blink::WebMessagePortChannelClient; 18using blink::WebString; 19 20namespace content { 21 22WebMessagePortChannelImpl::WebMessagePortChannelImpl( 23 base::MessageLoopProxy* child_thread_loop) 24 : client_(NULL), 25 route_id_(MSG_ROUTING_NONE), 26 message_port_id_(MSG_ROUTING_NONE), 27 child_thread_loop_(child_thread_loop) { 28 AddRef(); 29 Init(); 30} 31 32WebMessagePortChannelImpl::WebMessagePortChannelImpl( 33 int route_id, 34 int message_port_id, 35 base::MessageLoopProxy* child_thread_loop) 36 : client_(NULL), 37 route_id_(route_id), 38 message_port_id_(message_port_id), 39 child_thread_loop_(child_thread_loop) { 40 AddRef(); 41 Init(); 42} 43 44WebMessagePortChannelImpl::~WebMessagePortChannelImpl() { 45 // If we have any queued messages with attached ports, manually destroy them. 46 while (!message_queue_.empty()) { 47 const std::vector<WebMessagePortChannelImpl*>& channel_array = 48 message_queue_.front().ports; 49 for (size_t i = 0; i < channel_array.size(); i++) { 50 channel_array[i]->destroy(); 51 } 52 message_queue_.pop(); 53 } 54 55 if (message_port_id_ != MSG_ROUTING_NONE) 56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_)); 57 58 if (route_id_ != MSG_ROUTING_NONE) 59 ChildThread::current()->GetRouter()->RemoveRoute(route_id_); 60} 61 62// static 63void WebMessagePortChannelImpl::CreatePair( 64 base::MessageLoopProxy* child_thread_loop, 65 blink::WebMessagePortChannel** channel1, 66 blink::WebMessagePortChannel** channel2) { 67 WebMessagePortChannelImpl* impl1 = 68 new WebMessagePortChannelImpl(child_thread_loop); 69 WebMessagePortChannelImpl* impl2 = 70 new WebMessagePortChannelImpl(child_thread_loop); 71 72 impl1->Entangle(impl2); 73 impl2->Entangle(impl1); 74 75 *channel1 = impl1; 76 *channel2 = impl2; 77} 78 79// static 80std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs( 81 WebMessagePortChannelArray* channels) { 82 std::vector<int> message_port_ids; 83 if (channels) { 84 message_port_ids.resize(channels->size()); 85 // Extract the port IDs from the source array, then free it. 86 for (size_t i = 0; i < channels->size(); ++i) { 87 WebMessagePortChannelImpl* webchannel = 88 static_cast<WebMessagePortChannelImpl*>((*channels)[i]); 89 // The message port ids might not be set up yet if this channel 90 // wasn't created on the main thread. 91 DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread()); 92 message_port_ids[i] = webchannel->message_port_id(); 93 webchannel->QueueMessages(); 94 DCHECK(message_port_ids[i] != MSG_ROUTING_NONE); 95 } 96 delete channels; 97 } 98 return message_port_ids; 99} 100 101void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) { 102 // Must lock here since client_ is called on the main thread. 103 base::AutoLock auto_lock(lock_); 104 client_ = client; 105} 106 107void WebMessagePortChannelImpl::destroy() { 108 setClient(NULL); 109 110 // Release the object on the main thread, since the destructor might want to 111 // send an IPC, and that has to happen on the main thread. 112 child_thread_loop_->ReleaseSoon(FROM_HERE, this); 113} 114 115void WebMessagePortChannelImpl::postMessage( 116 const WebString& message, 117 WebMessagePortChannelArray* channels) { 118 if (!child_thread_loop_->BelongsToCurrentThread()) { 119 child_thread_loop_->PostTask( 120 FROM_HERE, 121 base::Bind( 122 &WebMessagePortChannelImpl::PostMessage, this, message, channels)); 123 } else { 124 PostMessage(message, channels); 125 } 126} 127 128void WebMessagePortChannelImpl::PostMessage( 129 const base::string16& message, 130 WebMessagePortChannelArray* channels) { 131 IPC::Message* msg = new MessagePortHostMsg_PostMessage( 132 message_port_id_, message, ExtractMessagePortIDs(channels)); 133 Send(msg); 134} 135 136bool WebMessagePortChannelImpl::tryGetMessage( 137 WebString* message, 138 WebMessagePortChannelArray& channels) { 139 base::AutoLock auto_lock(lock_); 140 if (message_queue_.empty()) 141 return false; 142 143 *message = message_queue_.front().message; 144 const std::vector<WebMessagePortChannelImpl*>& channel_array = 145 message_queue_.front().ports; 146 WebMessagePortChannelArray result_ports(channel_array.size()); 147 for (size_t i = 0; i < channel_array.size(); i++) { 148 result_ports[i] = channel_array[i]; 149 } 150 151 channels.swap(result_ports); 152 message_queue_.pop(); 153 return true; 154} 155 156void WebMessagePortChannelImpl::Init() { 157 if (!child_thread_loop_->BelongsToCurrentThread()) { 158 child_thread_loop_->PostTask( 159 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this)); 160 return; 161 } 162 163 if (route_id_ == MSG_ROUTING_NONE) { 164 DCHECK(message_port_id_ == MSG_ROUTING_NONE); 165 Send(new MessagePortHostMsg_CreateMessagePort( 166 &route_id_, &message_port_id_)); 167 } 168 169 ChildThread::current()->GetRouter()->AddRoute(route_id_, this); 170} 171 172void WebMessagePortChannelImpl::Entangle( 173 scoped_refptr<WebMessagePortChannelImpl> channel) { 174 // The message port ids might not be set up yet, if this channel wasn't 175 // created on the main thread. So need to wait until we're on the main thread 176 // before getting the other message port id. 177 if (!child_thread_loop_->BelongsToCurrentThread()) { 178 child_thread_loop_->PostTask( 179 FROM_HERE, 180 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel)); 181 return; 182 } 183 184 Send(new MessagePortHostMsg_Entangle( 185 message_port_id_, channel->message_port_id())); 186} 187 188void WebMessagePortChannelImpl::QueueMessages() { 189 if (!child_thread_loop_->BelongsToCurrentThread()) { 190 child_thread_loop_->PostTask( 191 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this)); 192 return; 193 } 194 // This message port is being sent elsewhere (perhaps to another process). 195 // The new endpoint needs to receive the queued messages, including ones that 196 // could still be in-flight. So we tell the browser to queue messages, and it 197 // sends us an ack, whose receipt we know means that no more messages are 198 // in-flight. We then send the queued messages to the browser, which prepends 199 // them to the ones it queued and it sends them to the new endpoint. 200 Send(new MessagePortHostMsg_QueueMessages(message_port_id_)); 201 202 // The process could potentially go away while we're still waiting for 203 // in-flight messages. Ensure it stays alive. 204 ChildProcess::current()->AddRefProcess(); 205} 206 207void WebMessagePortChannelImpl::Send(IPC::Message* message) { 208 if (!child_thread_loop_->BelongsToCurrentThread()) { 209 DCHECK(!message->is_sync()); 210 child_thread_loop_->PostTask( 211 FROM_HERE, 212 base::Bind(&WebMessagePortChannelImpl::Send, this, message)); 213 return; 214 } 215 216 ChildThread::current()->GetRouter()->Send(message); 217} 218 219bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) { 220 bool handled = true; 221 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message) 222 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage) 223 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued) 224 IPC_MESSAGE_UNHANDLED(handled = false) 225 IPC_END_MESSAGE_MAP() 226 return handled; 227} 228 229void WebMessagePortChannelImpl::OnMessage( 230 const base::string16& message, 231 const std::vector<int>& sent_message_port_ids, 232 const std::vector<int>& new_routing_ids) { 233 base::AutoLock auto_lock(lock_); 234 Message msg; 235 msg.message = message; 236 if (!sent_message_port_ids.empty()) { 237 msg.ports.resize(sent_message_port_ids.size()); 238 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 239 msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i], 240 sent_message_port_ids[i], 241 child_thread_loop_.get()); 242 } 243 } 244 245 bool was_empty = message_queue_.empty(); 246 message_queue_.push(msg); 247 if (client_ && was_empty) 248 client_->messageAvailable(); 249} 250 251void WebMessagePortChannelImpl::OnMessagesQueued() { 252 std::vector<QueuedMessage> queued_messages; 253 254 { 255 base::AutoLock auto_lock(lock_); 256 queued_messages.reserve(message_queue_.size()); 257 while (!message_queue_.empty()) { 258 base::string16 message = message_queue_.front().message; 259 const std::vector<WebMessagePortChannelImpl*>& channel_array = 260 message_queue_.front().ports; 261 std::vector<int> port_ids(channel_array.size()); 262 for (size_t i = 0; i < channel_array.size(); ++i) { 263 port_ids[i] = channel_array[i]->message_port_id(); 264 channel_array[i]->QueueMessages(); 265 } 266 queued_messages.push_back(std::make_pair(message, port_ids)); 267 message_queue_.pop(); 268 } 269 } 270 271 Send(new MessagePortHostMsg_SendQueuedMessages( 272 message_port_id_, queued_messages)); 273 274 message_port_id_ = MSG_ROUTING_NONE; 275 276 Release(); 277 ChildProcess::current()->ReleaseProcess(); 278} 279 280WebMessagePortChannelImpl::Message::Message() {} 281 282WebMessagePortChannelImpl::Message::~Message() {} 283 284} // namespace content 285