1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <stdlib.h>
6#include <string.h>
7
8#include "mojo/public/cpp/bindings/lib/connector.h"
9#include "mojo/public/cpp/bindings/lib/message_builder.h"
10#include "mojo/public/cpp/bindings/lib/message_queue.h"
11#include "mojo/public/cpp/environment/environment.h"
12#include "mojo/public/cpp/system/macros.h"
13#include "mojo/public/cpp/utility/run_loop.h"
14#include "testing/gtest/include/gtest/gtest.h"
15
16namespace mojo {
17namespace test {
18namespace {
19
20class MessageAccumulator : public MessageReceiver {
21 public:
22  MessageAccumulator() {
23  }
24
25  virtual bool Accept(Message* message) MOJO_OVERRIDE {
26    queue_.Push(message);
27    return true;
28  }
29
30  bool IsEmpty() const {
31    return queue_.IsEmpty();
32  }
33
34  void Pop(Message* message) {
35    queue_.Pop(message);
36  }
37
38 private:
39  internal::MessageQueue queue_;
40};
41
42class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
43 public:
44  ConnectorDeletingMessageAccumulator(internal::Connector** connector)
45      : connector_(connector) {}
46
47  virtual bool Accept(Message* message) MOJO_OVERRIDE {
48    delete *connector_;
49    *connector_ = 0;
50    return MessageAccumulator::Accept(message);
51  }
52
53 private:
54  internal::Connector** connector_;
55};
56
57class ReentrantMessageAccumulator : public MessageAccumulator {
58 public:
59  ReentrantMessageAccumulator(internal::Connector* connector)
60      : connector_(connector), number_of_calls_(0) {}
61
62  virtual bool Accept(Message* message) MOJO_OVERRIDE {
63    if (!MessageAccumulator::Accept(message))
64      return false;
65    number_of_calls_++;
66    if (number_of_calls_ == 1) {
67      return connector_->WaitForIncomingMessage();
68    }
69    return true;
70  }
71
72  int number_of_calls() { return number_of_calls_; }
73
74 private:
75  internal::Connector* connector_;
76  int number_of_calls_;
77};
78
79class ConnectorTest : public testing::Test {
80 public:
81  ConnectorTest() {
82  }
83
84  virtual void SetUp() MOJO_OVERRIDE {
85    CreateMessagePipe(NULL, &handle0_, &handle1_);
86  }
87
88  virtual void TearDown() MOJO_OVERRIDE {
89  }
90
91  void AllocMessage(const char* text, Message* message) {
92    size_t payload_size = strlen(text) + 1;  // Plus null terminator.
93    internal::MessageBuilder builder(1, payload_size);
94    memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
95    builder.Finish(message);
96  }
97
98  void PumpMessages() {
99    loop_.RunUntilIdle();
100  }
101
102 protected:
103  ScopedMessagePipeHandle handle0_;
104  ScopedMessagePipeHandle handle1_;
105
106 private:
107  Environment env_;
108  RunLoop loop_;
109};
110
111TEST_F(ConnectorTest, Basic) {
112  internal::Connector connector0(handle0_.Pass());
113  internal::Connector connector1(handle1_.Pass());
114
115  const char kText[] = "hello world";
116
117  Message message;
118  AllocMessage(kText, &message);
119
120  connector0.Accept(&message);
121
122  MessageAccumulator accumulator;
123  connector1.set_incoming_receiver(&accumulator);
124
125  PumpMessages();
126
127  ASSERT_FALSE(accumulator.IsEmpty());
128
129  Message message_received;
130  accumulator.Pop(&message_received);
131
132  EXPECT_EQ(
133      std::string(kText),
134      std::string(reinterpret_cast<const char*>(message_received.payload())));
135}
136
137TEST_F(ConnectorTest, Basic_Synchronous) {
138  internal::Connector connector0(handle0_.Pass());
139  internal::Connector connector1(handle1_.Pass());
140
141  const char kText[] = "hello world";
142
143  Message message;
144  AllocMessage(kText, &message);
145
146  connector0.Accept(&message);
147
148  MessageAccumulator accumulator;
149  connector1.set_incoming_receiver(&accumulator);
150
151  connector1.WaitForIncomingMessage();
152
153  ASSERT_FALSE(accumulator.IsEmpty());
154
155  Message message_received;
156  accumulator.Pop(&message_received);
157
158  EXPECT_EQ(
159      std::string(kText),
160      std::string(reinterpret_cast<const char*>(message_received.payload())));
161}
162
163TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
164  internal::Connector connector0(handle0_.Pass());
165  internal::Connector connector1(handle1_.Pass());
166
167  MessageAccumulator accumulator;
168  connector1.set_incoming_receiver(&accumulator);
169
170  const char kText[] = "hello world";
171
172  Message message;
173  AllocMessage(kText, &message);
174
175  connector0.Accept(&message);
176
177  PumpMessages();
178
179  ASSERT_FALSE(accumulator.IsEmpty());
180
181  Message message_received;
182  accumulator.Pop(&message_received);
183
184  EXPECT_EQ(
185      std::string(kText),
186      std::string(reinterpret_cast<const char*>(message_received.payload())));
187}
188
189TEST_F(ConnectorTest, Basic_TwoMessages) {
190  internal::Connector connector0(handle0_.Pass());
191  internal::Connector connector1(handle1_.Pass());
192
193  const char* kText[] = { "hello", "world" };
194
195  for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
196    Message message;
197    AllocMessage(kText[i], &message);
198
199    connector0.Accept(&message);
200  }
201
202  MessageAccumulator accumulator;
203  connector1.set_incoming_receiver(&accumulator);
204
205  PumpMessages();
206
207  for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
208    ASSERT_FALSE(accumulator.IsEmpty());
209
210    Message message_received;
211    accumulator.Pop(&message_received);
212
213    EXPECT_EQ(
214        std::string(kText[i]),
215        std::string(reinterpret_cast<const char*>(message_received.payload())));
216  }
217}
218
219TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
220  internal::Connector connector0(handle0_.Pass());
221  internal::Connector connector1(handle1_.Pass());
222
223  const char* kText[] = { "hello", "world" };
224
225  for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
226    Message message;
227    AllocMessage(kText[i], &message);
228
229    connector0.Accept(&message);
230  }
231
232  MessageAccumulator accumulator;
233  connector1.set_incoming_receiver(&accumulator);
234
235  connector1.WaitForIncomingMessage();
236
237  ASSERT_FALSE(accumulator.IsEmpty());
238
239  Message message_received;
240  accumulator.Pop(&message_received);
241
242  EXPECT_EQ(
243      std::string(kText[0]),
244      std::string(reinterpret_cast<const char*>(message_received.payload())));
245
246  ASSERT_TRUE(accumulator.IsEmpty());
247}
248
249TEST_F(ConnectorTest, WriteToClosedPipe) {
250  internal::Connector connector0(handle0_.Pass());
251
252  const char kText[] = "hello world";
253
254  Message message;
255  AllocMessage(kText, &message);
256
257  // Close the other end of the pipe.
258  handle1_.reset();
259
260  // Not observed yet because we haven't spun the RunLoop yet.
261  EXPECT_FALSE(connector0.encountered_error());
262
263  // Write failures are not reported.
264  bool ok = connector0.Accept(&message);
265  EXPECT_TRUE(ok);
266
267  // Still not observed.
268  EXPECT_FALSE(connector0.encountered_error());
269
270  // Spin the RunLoop, and then we should start observing the closed pipe.
271  PumpMessages();
272
273  EXPECT_TRUE(connector0.encountered_error());
274}
275
276TEST_F(ConnectorTest, MessageWithHandles) {
277  internal::Connector connector0(handle0_.Pass());
278  internal::Connector connector1(handle1_.Pass());
279
280  const char kText[] = "hello world";
281
282  Message message1;
283  AllocMessage(kText, &message1);
284
285  MessagePipe pipe;
286  message1.mutable_handles()->push_back(pipe.handle0.release());
287
288  connector0.Accept(&message1);
289
290  // The message should have been transferred, releasing the handles.
291  EXPECT_TRUE(message1.handles()->empty());
292
293  MessageAccumulator accumulator;
294  connector1.set_incoming_receiver(&accumulator);
295
296  PumpMessages();
297
298  ASSERT_FALSE(accumulator.IsEmpty());
299
300  Message message_received;
301  accumulator.Pop(&message_received);
302
303  EXPECT_EQ(
304      std::string(kText),
305      std::string(reinterpret_cast<const char*>(message_received.payload())));
306  ASSERT_EQ(1U, message_received.handles()->size());
307
308  // Now send a message to the transferred handle and confirm it's sent through
309  // to the orginal pipe.
310  // TODO(vtl): Do we need a better way of "downcasting" the handle types?
311  ScopedMessagePipeHandle smph;
312  smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
313  message_received.mutable_handles()->front() = Handle();
314  // |smph| now owns this handle.
315
316  internal::Connector connector_received(smph.Pass());
317  internal::Connector connector_original(pipe.handle1.Pass());
318
319  Message message2;
320  AllocMessage(kText, &message2);
321
322  connector_received.Accept(&message2);
323  connector_original.set_incoming_receiver(&accumulator);
324  PumpMessages();
325
326  ASSERT_FALSE(accumulator.IsEmpty());
327
328  accumulator.Pop(&message_received);
329
330  EXPECT_EQ(
331      std::string(kText),
332      std::string(reinterpret_cast<const char*>(message_received.payload())));
333}
334
335TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
336  internal::Connector connector0(handle0_.Pass());
337  // Close the other end of the pipe.
338  handle1_.reset();
339  ASSERT_FALSE(connector0.WaitForIncomingMessage());
340}
341
342TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
343  internal::Connector connector0(handle0_.Pass());
344  internal::Connector* connector1 = new internal::Connector(handle1_.Pass());
345
346  const char kText[] = "hello world";
347
348  Message message;
349  AllocMessage(kText, &message);
350
351  connector0.Accept(&message);
352
353  ConnectorDeletingMessageAccumulator accumulator(&connector1);
354  connector1->set_incoming_receiver(&accumulator);
355
356  connector1->WaitForIncomingMessage();
357
358  ASSERT_FALSE(connector1);
359  ASSERT_FALSE(accumulator.IsEmpty());
360
361  Message message_received;
362  accumulator.Pop(&message_received);
363
364  EXPECT_EQ(
365      std::string(kText),
366      std::string(reinterpret_cast<const char*>(message_received.payload())));
367}
368
369TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
370  internal::Connector connector0(handle0_.Pass());
371  internal::Connector connector1(handle1_.Pass());
372
373  const char* kText[] = { "hello", "world" };
374
375  for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
376    Message message;
377    AllocMessage(kText[i], &message);
378
379    connector0.Accept(&message);
380  }
381
382  ReentrantMessageAccumulator accumulator(&connector1);
383  connector1.set_incoming_receiver(&accumulator);
384
385  PumpMessages();
386
387  for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
388    ASSERT_FALSE(accumulator.IsEmpty());
389
390    Message message_received;
391    accumulator.Pop(&message_received);
392
393    EXPECT_EQ(
394        std::string(kText[i]),
395        std::string(reinterpret_cast<const char*>(message_received.payload())));
396  }
397
398  ASSERT_EQ(2, accumulator.number_of_calls());
399}
400
401}  // namespace
402}  // namespace test
403}  // namespace mojo
404