1// Copyright (c) 2009 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/browser/worker_host/message_port_service.h"
6
7#include "content/browser/worker_host/worker_message_filter.h"
8#include "content/common/worker_messages.h"
9
10namespace content {
11
12struct MessagePortService::MessagePort {
13  // |filter| and |route_id| are what we need to send messages to the port.
14  // |filter| is just a weak pointer since we get notified when its process has
15  // gone away and remove it.
16  WorkerMessageFilter* filter;
17  int route_id;
18  // A globally unique id for this message port.
19  int message_port_id;
20  // The globally unique id of the entangled message port.
21  int entangled_message_port_id;
22  // If true, all messages to this message port are queued and not delivered.
23  bool queue_messages;
24  QueuedMessages queued_messages;
25};
26
27MessagePortService* MessagePortService::GetInstance() {
28  return Singleton<MessagePortService>::get();
29}
30
31MessagePortService::MessagePortService()
32    : next_message_port_id_(0) {
33}
34
35MessagePortService::~MessagePortService() {
36}
37
38void MessagePortService::UpdateMessagePort(
39    int message_port_id,
40    WorkerMessageFilter* filter,
41    int routing_id) {
42  if (!message_ports_.count(message_port_id)) {
43    NOTREACHED();
44    return;
45  }
46
47  MessagePort& port = message_ports_[message_port_id];
48  port.filter = filter;
49  port.route_id = routing_id;
50}
51
52void MessagePortService::OnWorkerMessageFilterClosing(
53    WorkerMessageFilter* filter) {
54  // Check if the (possibly) crashed process had any message ports.
55  for (MessagePorts::iterator iter = message_ports_.begin();
56       iter != message_ports_.end();) {
57    MessagePorts::iterator cur_item = iter++;
58    if (cur_item->second.filter == filter) {
59      Erase(cur_item->first);
60    }
61  }
62}
63
64void MessagePortService::Create(int route_id,
65                                WorkerMessageFilter* filter,
66                                int* message_port_id) {
67  *message_port_id = ++next_message_port_id_;
68
69  MessagePort port;
70  port.filter = filter;
71  port.route_id = route_id;
72  port.message_port_id = *message_port_id;
73  port.entangled_message_port_id = MSG_ROUTING_NONE;
74  port.queue_messages = false;
75  message_ports_[*message_port_id] = port;
76}
77
78void MessagePortService::Destroy(int message_port_id) {
79  if (!message_ports_.count(message_port_id)) {
80    NOTREACHED();
81    return;
82  }
83
84  DCHECK(message_ports_[message_port_id].queued_messages.empty());
85  Erase(message_port_id);
86}
87
88void MessagePortService::Entangle(int local_message_port_id,
89                                  int remote_message_port_id) {
90  if (!message_ports_.count(local_message_port_id) ||
91      !message_ports_.count(remote_message_port_id)) {
92    NOTREACHED();
93    return;
94  }
95
96  DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
97      MSG_ROUTING_NONE);
98  message_ports_[remote_message_port_id].entangled_message_port_id =
99      local_message_port_id;
100}
101
102void MessagePortService::PostMessage(
103    int sender_message_port_id,
104    const string16& message,
105    const std::vector<int>& sent_message_port_ids) {
106  if (!message_ports_.count(sender_message_port_id)) {
107    NOTREACHED();
108    return;
109  }
110
111  int entangled_message_port_id =
112      message_ports_[sender_message_port_id].entangled_message_port_id;
113  if (entangled_message_port_id == MSG_ROUTING_NONE)
114    return;  // Process could have crashed.
115
116  if (!message_ports_.count(entangled_message_port_id)) {
117    NOTREACHED();
118    return;
119  }
120
121  PostMessageTo(entangled_message_port_id, message, sent_message_port_ids);
122}
123
124void MessagePortService::PostMessageTo(
125    int message_port_id,
126    const string16& message,
127    const std::vector<int>& sent_message_port_ids) {
128  if (!message_ports_.count(message_port_id)) {
129    NOTREACHED();
130    return;
131  }
132  for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
133    if (!message_ports_.count(sent_message_port_ids[i])) {
134      NOTREACHED();
135      return;
136    }
137  }
138
139  MessagePort& entangled_port = message_ports_[message_port_id];
140
141  std::vector<MessagePort*> sent_ports(sent_message_port_ids.size());
142  for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
143    sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
144    sent_ports[i]->queue_messages = true;
145  }
146
147  if (entangled_port.queue_messages) {
148    entangled_port.queued_messages.push_back(
149        std::make_pair(message, sent_message_port_ids));
150    return;
151  }
152
153  if (!entangled_port.filter) {
154    NOTREACHED();
155    return;
156  }
157
158  // If a message port was sent around, the new location will need a routing
159  // id.  Instead of having the created port send us a sync message to get it,
160  // send along with the message.
161  std::vector<int> new_routing_ids(sent_message_port_ids.size());
162  for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
163    new_routing_ids[i] = entangled_port.filter->GetNextRoutingID();
164    sent_ports[i]->filter = entangled_port.filter;
165
166    // Update the entry for the sent port as it can be in a different process.
167    sent_ports[i]->route_id = new_routing_ids[i];
168  }
169
170  // Now send the message to the entangled port.
171  entangled_port.filter->Send(new WorkerProcessMsg_Message(
172      entangled_port.route_id, message, sent_message_port_ids,
173      new_routing_ids));
174}
175
176void MessagePortService::QueueMessages(int message_port_id) {
177  if (!message_ports_.count(message_port_id)) {
178    NOTREACHED();
179    return;
180  }
181
182  MessagePort& port = message_ports_[message_port_id];
183  if (port.filter) {
184    port.filter->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
185    port.queue_messages = true;
186    port.filter = NULL;
187  }
188}
189
190void MessagePortService::SendQueuedMessages(
191    int message_port_id,
192    const QueuedMessages& queued_messages) {
193  if (!message_ports_.count(message_port_id)) {
194    NOTREACHED();
195    return;
196  }
197
198  // Send the queued messages to the port again.  This time they'll reach the
199  // new location.
200  MessagePort& port = message_ports_[message_port_id];
201  port.queue_messages = false;
202  port.queued_messages.insert(port.queued_messages.begin(),
203                              queued_messages.begin(),
204                              queued_messages.end());
205  SendQueuedMessagesIfPossible(message_port_id);
206}
207
208void MessagePortService::SendQueuedMessagesIfPossible(int message_port_id) {
209  if (!message_ports_.count(message_port_id)) {
210    NOTREACHED();
211    return;
212  }
213
214  MessagePort& port = message_ports_[message_port_id];
215  if (port.queue_messages || !port.filter)
216    return;
217
218  for (QueuedMessages::iterator iter = port.queued_messages.begin();
219       iter != port.queued_messages.end(); ++iter) {
220    PostMessageTo(message_port_id, iter->first, iter->second);
221  }
222  port.queued_messages.clear();
223}
224
225void MessagePortService::Erase(int message_port_id) {
226  MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
227  DCHECK(erase_item != message_ports_.end());
228
229  int entangled_id = erase_item->second.entangled_message_port_id;
230  if (entangled_id != MSG_ROUTING_NONE) {
231    // Do the disentanglement (and be paranoid about the other side existing
232    // just in case something unusual happened during entanglement).
233    if (message_ports_.count(entangled_id)) {
234      message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
235    }
236  }
237  message_ports_.erase(erase_item);
238}
239
240}  // namespace content
241