1// Copyright (c) 2009 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/browser/worker_host/message_port_service.h" 6 7#include "content/browser/worker_host/worker_message_filter.h" 8#include "content/common/worker_messages.h" 9 10namespace content { 11 12struct MessagePortService::MessagePort { 13 // |filter| and |route_id| are what we need to send messages to the port. 14 // |filter| is just a weak pointer since we get notified when its process has 15 // gone away and remove it. 16 WorkerMessageFilter* filter; 17 int route_id; 18 // A globally unique id for this message port. 19 int message_port_id; 20 // The globally unique id of the entangled message port. 21 int entangled_message_port_id; 22 // If true, all messages to this message port are queued and not delivered. 23 bool queue_messages; 24 QueuedMessages queued_messages; 25}; 26 27MessagePortService* MessagePortService::GetInstance() { 28 return Singleton<MessagePortService>::get(); 29} 30 31MessagePortService::MessagePortService() 32 : next_message_port_id_(0) { 33} 34 35MessagePortService::~MessagePortService() { 36} 37 38void MessagePortService::UpdateMessagePort( 39 int message_port_id, 40 WorkerMessageFilter* filter, 41 int routing_id) { 42 if (!message_ports_.count(message_port_id)) { 43 NOTREACHED(); 44 return; 45 } 46 47 MessagePort& port = message_ports_[message_port_id]; 48 port.filter = filter; 49 port.route_id = routing_id; 50} 51 52void MessagePortService::OnWorkerMessageFilterClosing( 53 WorkerMessageFilter* filter) { 54 // Check if the (possibly) crashed process had any message ports. 55 for (MessagePorts::iterator iter = message_ports_.begin(); 56 iter != message_ports_.end();) { 57 MessagePorts::iterator cur_item = iter++; 58 if (cur_item->second.filter == filter) { 59 Erase(cur_item->first); 60 } 61 } 62} 63 64void MessagePortService::Create(int route_id, 65 WorkerMessageFilter* filter, 66 int* message_port_id) { 67 *message_port_id = ++next_message_port_id_; 68 69 MessagePort port; 70 port.filter = filter; 71 port.route_id = route_id; 72 port.message_port_id = *message_port_id; 73 port.entangled_message_port_id = MSG_ROUTING_NONE; 74 port.queue_messages = false; 75 message_ports_[*message_port_id] = port; 76} 77 78void MessagePortService::Destroy(int message_port_id) { 79 if (!message_ports_.count(message_port_id)) { 80 NOTREACHED(); 81 return; 82 } 83 84 DCHECK(message_ports_[message_port_id].queued_messages.empty()); 85 Erase(message_port_id); 86} 87 88void MessagePortService::Entangle(int local_message_port_id, 89 int remote_message_port_id) { 90 if (!message_ports_.count(local_message_port_id) || 91 !message_ports_.count(remote_message_port_id)) { 92 NOTREACHED(); 93 return; 94 } 95 96 DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id == 97 MSG_ROUTING_NONE); 98 message_ports_[remote_message_port_id].entangled_message_port_id = 99 local_message_port_id; 100} 101 102void MessagePortService::PostMessage( 103 int sender_message_port_id, 104 const string16& message, 105 const std::vector<int>& sent_message_port_ids) { 106 if (!message_ports_.count(sender_message_port_id)) { 107 NOTREACHED(); 108 return; 109 } 110 111 int entangled_message_port_id = 112 message_ports_[sender_message_port_id].entangled_message_port_id; 113 if (entangled_message_port_id == MSG_ROUTING_NONE) 114 return; // Process could have crashed. 115 116 if (!message_ports_.count(entangled_message_port_id)) { 117 NOTREACHED(); 118 return; 119 } 120 121 PostMessageTo(entangled_message_port_id, message, sent_message_port_ids); 122} 123 124void MessagePortService::PostMessageTo( 125 int message_port_id, 126 const string16& message, 127 const std::vector<int>& sent_message_port_ids) { 128 if (!message_ports_.count(message_port_id)) { 129 NOTREACHED(); 130 return; 131 } 132 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 133 if (!message_ports_.count(sent_message_port_ids[i])) { 134 NOTREACHED(); 135 return; 136 } 137 } 138 139 MessagePort& entangled_port = message_ports_[message_port_id]; 140 141 std::vector<MessagePort*> sent_ports(sent_message_port_ids.size()); 142 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 143 sent_ports[i] = &message_ports_[sent_message_port_ids[i]]; 144 sent_ports[i]->queue_messages = true; 145 } 146 147 if (entangled_port.queue_messages) { 148 entangled_port.queued_messages.push_back( 149 std::make_pair(message, sent_message_port_ids)); 150 return; 151 } 152 153 if (!entangled_port.filter) { 154 NOTREACHED(); 155 return; 156 } 157 158 // If a message port was sent around, the new location will need a routing 159 // id. Instead of having the created port send us a sync message to get it, 160 // send along with the message. 161 std::vector<int> new_routing_ids(sent_message_port_ids.size()); 162 for (size_t i = 0; i < sent_message_port_ids.size(); ++i) { 163 new_routing_ids[i] = entangled_port.filter->GetNextRoutingID(); 164 sent_ports[i]->filter = entangled_port.filter; 165 166 // Update the entry for the sent port as it can be in a different process. 167 sent_ports[i]->route_id = new_routing_ids[i]; 168 } 169 170 // Now send the message to the entangled port. 171 entangled_port.filter->Send(new WorkerProcessMsg_Message( 172 entangled_port.route_id, message, sent_message_port_ids, 173 new_routing_ids)); 174} 175 176void MessagePortService::QueueMessages(int message_port_id) { 177 if (!message_ports_.count(message_port_id)) { 178 NOTREACHED(); 179 return; 180 } 181 182 MessagePort& port = message_ports_[message_port_id]; 183 if (port.filter) { 184 port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id)); 185 port.queue_messages = true; 186 port.filter = NULL; 187 } 188} 189 190void MessagePortService::SendQueuedMessages( 191 int message_port_id, 192 const QueuedMessages& queued_messages) { 193 if (!message_ports_.count(message_port_id)) { 194 NOTREACHED(); 195 return; 196 } 197 198 // Send the queued messages to the port again. This time they'll reach the 199 // new location. 200 MessagePort& port = message_ports_[message_port_id]; 201 port.queue_messages = false; 202 port.queued_messages.insert(port.queued_messages.begin(), 203 queued_messages.begin(), 204 queued_messages.end()); 205 SendQueuedMessagesIfPossible(message_port_id); 206} 207 208void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) { 209 if (!message_ports_.count(message_port_id)) { 210 NOTREACHED(); 211 return; 212 } 213 214 MessagePort& port = message_ports_[message_port_id]; 215 if (port.queue_messages || !port.filter) 216 return; 217 218 for (QueuedMessages::iterator iter = port.queued_messages.begin(); 219 iter != port.queued_messages.end(); ++iter) { 220 PostMessageTo(message_port_id, iter->first, iter->second); 221 } 222 port.queued_messages.clear(); 223} 224 225void MessagePortService::Erase(int message_port_id) { 226 MessagePorts::iterator erase_item = message_ports_.find(message_port_id); 227 DCHECK(erase_item != message_ports_.end()); 228 229 int entangled_id = erase_item->second.entangled_message_port_id; 230 if (entangled_id != MSG_ROUTING_NONE) { 231 // Do the disentanglement (and be paranoid about the other side existing 232 // just in case something unusual happened during entanglement). 233 if (message_ports_.count(entangled_id)) { 234 message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE; 235 } 236 } 237 message_ports_.erase(erase_item); 238} 239 240} // namespace content 241