1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/public/cpp/bindings/connector.h"
6
7#include <stddef.h>
8#include <stdlib.h>
9#include <string.h>
10#include <utility>
11
12#include "base/bind.h"
13#include "base/callback.h"
14#include "base/callback_helpers.h"
15#include "base/message_loop/message_loop.h"
16#include "base/run_loop.h"
17#include "base/threading/thread_task_runner_handle.h"
18#include "mojo/public/cpp/bindings/lib/message_builder.h"
19#include "mojo/public/cpp/bindings/tests/message_queue.h"
20#include "testing/gtest/include/gtest/gtest.h"
21
22namespace mojo {
23namespace test {
24namespace {
25
26class MessageAccumulator : public MessageReceiver {
27 public:
28  MessageAccumulator() {}
29  explicit MessageAccumulator(const base::Closure& closure)
30      : closure_(closure) {}
31
32  bool Accept(Message* message) override {
33    queue_.Push(message);
34    if (!closure_.is_null())
35      base::ResetAndReturn(&closure_).Run();
36    return true;
37  }
38
39  bool IsEmpty() const { return queue_.IsEmpty(); }
40
41  void Pop(Message* message) { queue_.Pop(message); }
42
43  void set_closure(const base::Closure& closure) { closure_ = closure; }
44
45  size_t size() const { return queue_.size(); }
46
47 private:
48  MessageQueue queue_;
49  base::Closure closure_;
50};
51
52class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
53 public:
54  ConnectorDeletingMessageAccumulator(Connector** connector)
55      : connector_(connector) {}
56
57  bool Accept(Message* message) override {
58    delete *connector_;
59    *connector_ = nullptr;
60    return MessageAccumulator::Accept(message);
61  }
62
63 private:
64  Connector** connector_;
65};
66
67class ReentrantMessageAccumulator : public MessageAccumulator {
68 public:
69  ReentrantMessageAccumulator(Connector* connector)
70      : connector_(connector), number_of_calls_(0) {}
71
72  bool Accept(Message* message) override {
73    if (!MessageAccumulator::Accept(message))
74      return false;
75    number_of_calls_++;
76    if (number_of_calls_ == 1) {
77      return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
78    }
79    return true;
80  }
81
82  int number_of_calls() { return number_of_calls_; }
83
84 private:
85  Connector* connector_;
86  int number_of_calls_;
87};
88
89class ConnectorTest : public testing::Test {
90 public:
91  ConnectorTest() {}
92
93  void SetUp() override {
94    CreateMessagePipe(nullptr, &handle0_, &handle1_);
95  }
96
97  void TearDown() override {}
98
99  void AllocMessage(const char* text, Message* message) {
100    size_t payload_size = strlen(text) + 1;  // Plus null terminator.
101    internal::MessageBuilder builder(1, payload_size);
102    memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
103
104    builder.message()->MoveTo(message);
105  }
106
107 protected:
108  ScopedMessagePipeHandle handle0_;
109  ScopedMessagePipeHandle handle1_;
110
111 private:
112  base::MessageLoop loop_;
113};
114
115TEST_F(ConnectorTest, Basic) {
116  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
117                       base::ThreadTaskRunnerHandle::Get());
118  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
119                       base::ThreadTaskRunnerHandle::Get());
120
121  const char kText[] = "hello world";
122
123  Message message;
124  AllocMessage(kText, &message);
125
126  connector0.Accept(&message);
127
128  base::RunLoop run_loop;
129  MessageAccumulator accumulator(run_loop.QuitClosure());
130  connector1.set_incoming_receiver(&accumulator);
131
132  run_loop.Run();
133
134  ASSERT_FALSE(accumulator.IsEmpty());
135
136  Message message_received;
137  accumulator.Pop(&message_received);
138
139  EXPECT_EQ(
140      std::string(kText),
141      std::string(reinterpret_cast<const char*>(message_received.payload())));
142}
143
144TEST_F(ConnectorTest, Basic_Synchronous) {
145  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
146                       base::ThreadTaskRunnerHandle::Get());
147  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
148                       base::ThreadTaskRunnerHandle::Get());
149
150  const char kText[] = "hello world";
151
152  Message message;
153  AllocMessage(kText, &message);
154
155  connector0.Accept(&message);
156
157  MessageAccumulator accumulator;
158  connector1.set_incoming_receiver(&accumulator);
159
160  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
161
162  ASSERT_FALSE(accumulator.IsEmpty());
163
164  Message message_received;
165  accumulator.Pop(&message_received);
166
167  EXPECT_EQ(
168      std::string(kText),
169      std::string(reinterpret_cast<const char*>(message_received.payload())));
170}
171
172TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
173  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
174                       base::ThreadTaskRunnerHandle::Get());
175  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
176                       base::ThreadTaskRunnerHandle::Get());
177
178  base::RunLoop run_loop;
179  MessageAccumulator accumulator(run_loop.QuitClosure());
180  connector1.set_incoming_receiver(&accumulator);
181
182  const char kText[] = "hello world";
183
184  Message message;
185  AllocMessage(kText, &message);
186
187  connector0.Accept(&message);
188
189  run_loop.Run();
190
191  ASSERT_FALSE(accumulator.IsEmpty());
192
193  Message message_received;
194  accumulator.Pop(&message_received);
195
196  EXPECT_EQ(
197      std::string(kText),
198      std::string(reinterpret_cast<const char*>(message_received.payload())));
199}
200
201TEST_F(ConnectorTest, Basic_TwoMessages) {
202  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
203                       base::ThreadTaskRunnerHandle::Get());
204  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
205                       base::ThreadTaskRunnerHandle::Get());
206
207  const char* kText[] = {"hello", "world"};
208
209  for (size_t i = 0; i < arraysize(kText); ++i) {
210    Message message;
211    AllocMessage(kText[i], &message);
212
213    connector0.Accept(&message);
214  }
215
216  MessageAccumulator accumulator;
217  connector1.set_incoming_receiver(&accumulator);
218
219  for (size_t i = 0; i < arraysize(kText); ++i) {
220    if (accumulator.IsEmpty()) {
221      base::RunLoop run_loop;
222      accumulator.set_closure(run_loop.QuitClosure());
223      run_loop.Run();
224    }
225    ASSERT_FALSE(accumulator.IsEmpty());
226
227    Message message_received;
228    accumulator.Pop(&message_received);
229
230    EXPECT_EQ(
231        std::string(kText[i]),
232        std::string(reinterpret_cast<const char*>(message_received.payload())));
233  }
234}
235
236TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
237  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
238                       base::ThreadTaskRunnerHandle::Get());
239  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
240                       base::ThreadTaskRunnerHandle::Get());
241
242  const char* kText[] = {"hello", "world"};
243
244  for (size_t i = 0; i < arraysize(kText); ++i) {
245    Message message;
246    AllocMessage(kText[i], &message);
247
248    connector0.Accept(&message);
249  }
250
251  MessageAccumulator accumulator;
252  connector1.set_incoming_receiver(&accumulator);
253
254  connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
255
256  ASSERT_FALSE(accumulator.IsEmpty());
257
258  Message message_received;
259  accumulator.Pop(&message_received);
260
261  EXPECT_EQ(
262      std::string(kText[0]),
263      std::string(reinterpret_cast<const char*>(message_received.payload())));
264
265  ASSERT_TRUE(accumulator.IsEmpty());
266}
267
268TEST_F(ConnectorTest, WriteToClosedPipe) {
269  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
270                       base::ThreadTaskRunnerHandle::Get());
271
272  const char kText[] = "hello world";
273
274  Message message;
275  AllocMessage(kText, &message);
276
277  // Close the other end of the pipe.
278  handle1_.reset();
279
280  // Not observed yet because we haven't spun the message loop yet.
281  EXPECT_FALSE(connector0.encountered_error());
282
283  // Write failures are not reported.
284  bool ok = connector0.Accept(&message);
285  EXPECT_TRUE(ok);
286
287  // Still not observed.
288  EXPECT_FALSE(connector0.encountered_error());
289
290  // Spin the message loop, and then we should start observing the closed pipe.
291  base::RunLoop run_loop;
292  connector0.set_connection_error_handler(run_loop.QuitClosure());
293  run_loop.Run();
294
295  EXPECT_TRUE(connector0.encountered_error());
296}
297
298TEST_F(ConnectorTest, MessageWithHandles) {
299  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
300                       base::ThreadTaskRunnerHandle::Get());
301  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
302                       base::ThreadTaskRunnerHandle::Get());
303
304  const char kText[] = "hello world";
305
306  Message message1;
307  AllocMessage(kText, &message1);
308
309  MessagePipe pipe;
310  message1.mutable_handles()->push_back(pipe.handle0.release());
311
312  connector0.Accept(&message1);
313
314  // The message should have been transferred, releasing the handles.
315  EXPECT_TRUE(message1.handles()->empty());
316
317  base::RunLoop run_loop;
318  MessageAccumulator accumulator(run_loop.QuitClosure());
319  connector1.set_incoming_receiver(&accumulator);
320
321  run_loop.Run();
322
323  ASSERT_FALSE(accumulator.IsEmpty());
324
325  Message message_received;
326  accumulator.Pop(&message_received);
327
328  EXPECT_EQ(
329      std::string(kText),
330      std::string(reinterpret_cast<const char*>(message_received.payload())));
331  ASSERT_EQ(1U, message_received.handles()->size());
332
333  // Now send a message to the transferred handle and confirm it's sent through
334  // to the orginal pipe.
335  // TODO(vtl): Do we need a better way of "downcasting" the handle types?
336  ScopedMessagePipeHandle smph;
337  smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
338  message_received.mutable_handles()->front() = Handle();
339  // |smph| now owns this handle.
340
341  Connector connector_received(std::move(smph), Connector::SINGLE_THREADED_SEND,
342                               base::ThreadTaskRunnerHandle::Get());
343  Connector connector_original(std::move(pipe.handle1),
344                               Connector::SINGLE_THREADED_SEND,
345                               base::ThreadTaskRunnerHandle::Get());
346
347  Message message2;
348  AllocMessage(kText, &message2);
349
350  connector_received.Accept(&message2);
351  base::RunLoop run_loop2;
352  MessageAccumulator accumulator2(run_loop2.QuitClosure());
353  connector_original.set_incoming_receiver(&accumulator2);
354  run_loop2.Run();
355
356  ASSERT_FALSE(accumulator2.IsEmpty());
357
358  accumulator2.Pop(&message_received);
359
360  EXPECT_EQ(
361      std::string(kText),
362      std::string(reinterpret_cast<const char*>(message_received.payload())));
363}
364
365TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
366  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
367                       base::ThreadTaskRunnerHandle::Get());
368  // Close the other end of the pipe.
369  handle1_.reset();
370  ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
371}
372
373TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
374  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
375                       base::ThreadTaskRunnerHandle::Get());
376  Connector* connector1 =
377      new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
378                    base::ThreadTaskRunnerHandle::Get());
379
380  const char kText[] = "hello world";
381
382  Message message;
383  AllocMessage(kText, &message);
384
385  connector0.Accept(&message);
386
387  ConnectorDeletingMessageAccumulator accumulator(&connector1);
388  connector1->set_incoming_receiver(&accumulator);
389
390  connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
391
392  ASSERT_FALSE(connector1);
393  ASSERT_FALSE(accumulator.IsEmpty());
394
395  Message message_received;
396  accumulator.Pop(&message_received);
397
398  EXPECT_EQ(
399      std::string(kText),
400      std::string(reinterpret_cast<const char*>(message_received.payload())));
401}
402
403TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
404  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
405                       base::ThreadTaskRunnerHandle::Get());
406  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
407                       base::ThreadTaskRunnerHandle::Get());
408
409  const char* kText[] = {"hello", "world"};
410
411  for (size_t i = 0; i < arraysize(kText); ++i) {
412    Message message;
413    AllocMessage(kText[i], &message);
414
415    connector0.Accept(&message);
416  }
417
418  ReentrantMessageAccumulator accumulator(&connector1);
419  connector1.set_incoming_receiver(&accumulator);
420
421  for (size_t i = 0; i < arraysize(kText); ++i) {
422    if (accumulator.IsEmpty()) {
423      base::RunLoop run_loop;
424      accumulator.set_closure(run_loop.QuitClosure());
425      run_loop.Run();
426    }
427    ASSERT_FALSE(accumulator.IsEmpty());
428
429    Message message_received;
430    accumulator.Pop(&message_received);
431
432    EXPECT_EQ(
433        std::string(kText[i]),
434        std::string(reinterpret_cast<const char*>(message_received.payload())));
435  }
436
437  ASSERT_EQ(2, accumulator.number_of_calls());
438}
439
440void ForwardErrorHandler(bool* called, const base::Closure& callback) {
441  *called = true;
442  callback.Run();
443}
444
445TEST_F(ConnectorTest, RaiseError) {
446  base::RunLoop run_loop, run_loop2;
447  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
448                       base::ThreadTaskRunnerHandle::Get());
449  bool error_handler_called0 = false;
450  connector0.set_connection_error_handler(
451      base::Bind(&ForwardErrorHandler, &error_handler_called0,
452                 run_loop.QuitClosure()));
453
454  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
455                       base::ThreadTaskRunnerHandle::Get());
456  bool error_handler_called1 = false;
457  connector1.set_connection_error_handler(
458      base::Bind(&ForwardErrorHandler, &error_handler_called1,
459                 run_loop2.QuitClosure()));
460
461  const char kText[] = "hello world";
462
463  Message message;
464  AllocMessage(kText, &message);
465
466  connector0.Accept(&message);
467  connector0.RaiseError();
468
469  base::RunLoop run_loop3;
470  MessageAccumulator accumulator(run_loop3.QuitClosure());
471  connector1.set_incoming_receiver(&accumulator);
472
473  run_loop3.Run();
474
475  // Messages sent prior to RaiseError() still arrive at the other end.
476  ASSERT_FALSE(accumulator.IsEmpty());
477
478  Message message_received;
479  accumulator.Pop(&message_received);
480
481  EXPECT_EQ(
482      std::string(kText),
483      std::string(reinterpret_cast<const char*>(message_received.payload())));
484
485  run_loop.Run();
486  run_loop2.Run();
487
488  // Connection error handler is called at both sides.
489  EXPECT_TRUE(error_handler_called0);
490  EXPECT_TRUE(error_handler_called1);
491
492  // The error flag is set at both sides.
493  EXPECT_TRUE(connector0.encountered_error());
494  EXPECT_TRUE(connector1.encountered_error());
495
496  // The message pipe handle is valid at both sides.
497  EXPECT_TRUE(connector0.is_valid());
498  EXPECT_TRUE(connector1.is_valid());
499}
500
501void PauseConnectorAndRunClosure(Connector* connector,
502                                 const base::Closure& closure) {
503  connector->PauseIncomingMethodCallProcessing();
504  closure.Run();
505}
506
507TEST_F(ConnectorTest, PauseWithQueuedMessages) {
508  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
509                       base::ThreadTaskRunnerHandle::Get());
510  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
511                       base::ThreadTaskRunnerHandle::Get());
512
513  const char kText[] = "hello world";
514
515  // Queue up two messages.
516  Message message;
517  AllocMessage(kText, &message);
518  connector0.Accept(&message);
519  AllocMessage(kText, &message);
520  connector0.Accept(&message);
521
522  base::RunLoop run_loop;
523  // Configure the accumulator such that it pauses after the first message is
524  // received.
525  MessageAccumulator accumulator(
526      base::Bind(&PauseConnectorAndRunClosure, &connector1,
527                 run_loop.QuitClosure()));
528  connector1.set_incoming_receiver(&accumulator);
529
530  run_loop.Run();
531
532  // As we paused after the first message we should only have gotten one
533  // message.
534  ASSERT_EQ(1u, accumulator.size());
535}
536
537void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
538                              const base::Closure& closure) {
539  base::RunLoop nested_run_loop;
540  base::MessageLoop::ScopedNestableTaskAllower allow(
541      base::MessageLoop::current());
542  accumulator->set_closure(nested_run_loop.QuitClosure());
543  nested_run_loop.Run();
544  closure.Run();
545}
546
547TEST_F(ConnectorTest, ProcessWhenNested) {
548  Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
549                       base::ThreadTaskRunnerHandle::Get());
550  Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
551                       base::ThreadTaskRunnerHandle::Get());
552
553  const char kText[] = "hello world";
554
555  // Queue up two messages.
556  Message message;
557  AllocMessage(kText, &message);
558  connector0.Accept(&message);
559  AllocMessage(kText, &message);
560  connector0.Accept(&message);
561
562  base::RunLoop run_loop;
563  MessageAccumulator accumulator;
564  // When the accumulator gets the first message it spins a nested message
565  // loop. The loop is quit when another message is received.
566  accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
567                                     run_loop.QuitClosure()));
568  connector1.set_incoming_receiver(&accumulator);
569
570  run_loop.Run();
571
572  ASSERT_EQ(2u, accumulator.size());
573}
574
575}  // namespace
576}  // namespace test
577}  // namespace mojo
578