1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <stdint.h>
6#include <stdio.h>
7#include <string.h>
8
9#include <vector>
10
11#include "base/bind.h"
12#include "base/files/file_path.h"
13#include "base/files/file_util.h"
14#include "base/files/scoped_file.h"
15#include "base/files/scoped_temp_dir.h"
16#include "base/location.h"
17#include "base/logging.h"
18#include "base/macros.h"
19#include "base/message_loop/message_loop.h"
20#include "base/test/test_io_thread.h"
21#include "base/threading/platform_thread.h"  // For |Sleep()|.
22#include "build/build_config.h"              // TODO(vtl): Remove this.
23#include "mojo/common/test/test_utils.h"
24#include "mojo/embedder/platform_channel_pair.h"
25#include "mojo/embedder/platform_shared_buffer.h"
26#include "mojo/embedder/scoped_platform_handle.h"
27#include "mojo/embedder/simple_platform_support.h"
28#include "mojo/system/channel.h"
29#include "mojo/system/channel_endpoint.h"
30#include "mojo/system/message_pipe.h"
31#include "mojo/system/message_pipe_dispatcher.h"
32#include "mojo/system/platform_handle_dispatcher.h"
33#include "mojo/system/raw_channel.h"
34#include "mojo/system/shared_buffer_dispatcher.h"
35#include "mojo/system/test_utils.h"
36#include "mojo/system/waiter.h"
37#include "testing/gtest/include/gtest/gtest.h"
38
39namespace mojo {
40namespace system {
41namespace {
42
43class RemoteMessagePipeTest : public testing::Test {
44 public:
45  RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {}
46  virtual ~RemoteMessagePipeTest() {}
47
48  virtual void SetUp() OVERRIDE {
49    io_thread_.PostTaskAndWait(
50        FROM_HERE,
51        base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
52                   base::Unretained(this)));
53  }
54
55  virtual void TearDown() OVERRIDE {
56    io_thread_.PostTaskAndWait(
57        FROM_HERE,
58        base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
59                   base::Unretained(this)));
60  }
61
62 protected:
63  // This connects the two given |ChannelEndpoint|s.
64  void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
65                               scoped_refptr<ChannelEndpoint> ep1) {
66    io_thread_.PostTaskAndWait(
67        FROM_HERE,
68        base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
69                   base::Unretained(this),
70                   ep0,
71                   ep1));
72  }
73
74  // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
75  // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
76  // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
77  // it to finish connecting.
78  void BootstrapChannelEndpointNoWait(unsigned channel_index,
79                                      scoped_refptr<ChannelEndpoint> ep) {
80    io_thread_.PostTask(
81        FROM_HERE,
82        base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
83                   base::Unretained(this),
84                   channel_index,
85                   ep));
86  }
87
88  void RestoreInitialState() {
89    io_thread_.PostTaskAndWait(
90        FROM_HERE,
91        base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
92                   base::Unretained(this)));
93  }
94
95  embedder::PlatformSupport* platform_support() { return &platform_support_; }
96  base::TestIOThread* io_thread() { return &io_thread_; }
97
98 private:
99  void SetUpOnIOThread() {
100    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
101
102    embedder::PlatformChannelPair channel_pair;
103    platform_handles_[0] = channel_pair.PassServerHandle();
104    platform_handles_[1] = channel_pair.PassClientHandle();
105  }
106
107  void TearDownOnIOThread() {
108    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
109
110    if (channels_[0].get()) {
111      channels_[0]->Shutdown();
112      channels_[0] = nullptr;
113    }
114    if (channels_[1].get()) {
115      channels_[1]->Shutdown();
116      channels_[1] = nullptr;
117    }
118  }
119
120  void CreateAndInitChannel(unsigned channel_index) {
121    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
122    CHECK(channel_index == 0 || channel_index == 1);
123    CHECK(!channels_[channel_index].get());
124
125    channels_[channel_index] = new Channel(&platform_support_);
126    CHECK(channels_[channel_index]->Init(
127        RawChannel::Create(platform_handles_[channel_index].Pass())));
128  }
129
130  void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
131                                         scoped_refptr<ChannelEndpoint> ep1) {
132    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
133
134    if (!channels_[0].get())
135      CreateAndInitChannel(0);
136    if (!channels_[1].get())
137      CreateAndInitChannel(1);
138
139    MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
140    MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
141
142    CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
143    CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
144  }
145
146  void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
147                                          scoped_refptr<ChannelEndpoint> ep) {
148    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
149    CHECK(channel_index == 0 || channel_index == 1);
150
151    CreateAndInitChannel(channel_index);
152    MessageInTransit::EndpointId endpoint_id =
153        channels_[channel_index]->AttachEndpoint(ep);
154    if (endpoint_id == MessageInTransit::kInvalidEndpointId)
155      return;
156
157    CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
158    CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
159        Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
160  }
161
162  void RestoreInitialStateOnIOThread() {
163    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
164
165    TearDownOnIOThread();
166    SetUpOnIOThread();
167  }
168
169  embedder::SimplePlatformSupport platform_support_;
170  base::TestIOThread io_thread_;
171  embedder::ScopedPlatformHandle platform_handles_[2];
172  scoped_refptr<Channel> channels_[2];
173
174  DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
175};
176
177TEST_F(RemoteMessagePipeTest, Basic) {
178  static const char kHello[] = "hello";
179  static const char kWorld[] = "world!!!1!!!1!";
180  char buffer[100] = {0};
181  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
182  Waiter waiter;
183  HandleSignalsState hss;
184  uint32_t context = 0;
185
186  // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
187  // connected to MP 1, port 0, which will be attached to channel 1. This leaves
188  // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
189
190  scoped_refptr<ChannelEndpoint> ep0;
191  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
192  scoped_refptr<ChannelEndpoint> ep1;
193  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
194  ConnectChannelEndpoints(ep0, ep1);
195
196  // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
197
198  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
199  // it later, it might already be readable.)
200  waiter.Init();
201  ASSERT_EQ(
202      MOJO_RESULT_OK,
203      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
204
205  // Write to MP 0, port 0.
206  EXPECT_EQ(MOJO_RESULT_OK,
207            mp0->WriteMessage(0,
208                              UserPointer<const void>(kHello),
209                              sizeof(kHello),
210                              nullptr,
211                              MOJO_WRITE_MESSAGE_FLAG_NONE));
212
213  // Wait.
214  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
215  EXPECT_EQ(123u, context);
216  hss = HandleSignalsState();
217  mp1->RemoveWaiter(1, &waiter, &hss);
218  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
219            hss.satisfied_signals);
220  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
221            hss.satisfiable_signals);
222
223  // Read from MP 1, port 1.
224  EXPECT_EQ(MOJO_RESULT_OK,
225            mp1->ReadMessage(1,
226                             UserPointer<void>(buffer),
227                             MakeUserPointer(&buffer_size),
228                             nullptr,
229                             nullptr,
230                             MOJO_READ_MESSAGE_FLAG_NONE));
231  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
232  EXPECT_STREQ(kHello, buffer);
233
234  // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
235
236  waiter.Init();
237  ASSERT_EQ(
238      MOJO_RESULT_OK,
239      mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
240
241  EXPECT_EQ(MOJO_RESULT_OK,
242            mp1->WriteMessage(1,
243                              UserPointer<const void>(kWorld),
244                              sizeof(kWorld),
245                              nullptr,
246                              MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
249  EXPECT_EQ(456u, context);
250  hss = HandleSignalsState();
251  mp0->RemoveWaiter(0, &waiter, &hss);
252  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
253            hss.satisfied_signals);
254  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
255            hss.satisfiable_signals);
256
257  buffer_size = static_cast<uint32_t>(sizeof(buffer));
258  EXPECT_EQ(MOJO_RESULT_OK,
259            mp0->ReadMessage(0,
260                             UserPointer<void>(buffer),
261                             MakeUserPointer(&buffer_size),
262                             nullptr,
263                             nullptr,
264                             MOJO_READ_MESSAGE_FLAG_NONE));
265  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
266  EXPECT_STREQ(kWorld, buffer);
267
268  // Close MP 0, port 0.
269  mp0->Close(0);
270
271  // Try to wait for MP 1, port 1 to become readable. This will eventually fail
272  // when it realizes that MP 0, port 0 has been closed. (It may also fail
273  // immediately.)
274  waiter.Init();
275  hss = HandleSignalsState();
276  MojoResult result =
277      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
278  if (result == MOJO_RESULT_OK) {
279    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
280              waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
281    EXPECT_EQ(789u, context);
282    hss = HandleSignalsState();
283    mp1->RemoveWaiter(1, &waiter, &hss);
284  }
285  EXPECT_EQ(0u, hss.satisfied_signals);
286  EXPECT_EQ(0u, hss.satisfiable_signals);
287
288  // And MP 1, port 1.
289  mp1->Close(1);
290}
291
292TEST_F(RemoteMessagePipeTest, Multiplex) {
293  static const char kHello[] = "hello";
294  static const char kWorld[] = "world!!!1!!!1!";
295  char buffer[100] = {0};
296  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
297  Waiter waiter;
298  HandleSignalsState hss;
299  uint32_t context = 0;
300
301  // Connect message pipes as in the |Basic| test.
302
303  scoped_refptr<ChannelEndpoint> ep0;
304  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
305  scoped_refptr<ChannelEndpoint> ep1;
306  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
307  ConnectChannelEndpoints(ep0, ep1);
308
309  // Now put another message pipe on the channel.
310
311  scoped_refptr<ChannelEndpoint> ep2;
312  scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
313  scoped_refptr<ChannelEndpoint> ep3;
314  scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
315  ConnectChannelEndpoints(ep2, ep3);
316
317  // Write: MP 2, port 0 -> MP 3, port 1.
318
319  waiter.Init();
320  ASSERT_EQ(
321      MOJO_RESULT_OK,
322      mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
323
324  EXPECT_EQ(MOJO_RESULT_OK,
325            mp2->WriteMessage(0,
326                              UserPointer<const void>(kHello),
327                              sizeof(kHello),
328                              nullptr,
329                              MOJO_WRITE_MESSAGE_FLAG_NONE));
330
331  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
332  EXPECT_EQ(789u, context);
333  hss = HandleSignalsState();
334  mp3->RemoveWaiter(1, &waiter, &hss);
335  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
336            hss.satisfied_signals);
337  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
338            hss.satisfiable_signals);
339
340  // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
341  buffer_size = static_cast<uint32_t>(sizeof(buffer));
342  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
343            mp0->ReadMessage(0,
344                             UserPointer<void>(buffer),
345                             MakeUserPointer(&buffer_size),
346                             nullptr,
347                             nullptr,
348                             MOJO_READ_MESSAGE_FLAG_NONE));
349  buffer_size = static_cast<uint32_t>(sizeof(buffer));
350  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
351            mp1->ReadMessage(1,
352                             UserPointer<void>(buffer),
353                             MakeUserPointer(&buffer_size),
354                             nullptr,
355                             nullptr,
356                             MOJO_READ_MESSAGE_FLAG_NONE));
357  buffer_size = static_cast<uint32_t>(sizeof(buffer));
358  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
359            mp2->ReadMessage(0,
360                             UserPointer<void>(buffer),
361                             MakeUserPointer(&buffer_size),
362                             nullptr,
363                             nullptr,
364                             MOJO_READ_MESSAGE_FLAG_NONE));
365
366  // Read from MP 3, port 1.
367  buffer_size = static_cast<uint32_t>(sizeof(buffer));
368  EXPECT_EQ(MOJO_RESULT_OK,
369            mp3->ReadMessage(1,
370                             UserPointer<void>(buffer),
371                             MakeUserPointer(&buffer_size),
372                             nullptr,
373                             nullptr,
374                             MOJO_READ_MESSAGE_FLAG_NONE));
375  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
376  EXPECT_STREQ(kHello, buffer);
377
378  // Write: MP 0, port 0 -> MP 1, port 1 again.
379
380  waiter.Init();
381  ASSERT_EQ(
382      MOJO_RESULT_OK,
383      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
384
385  EXPECT_EQ(MOJO_RESULT_OK,
386            mp0->WriteMessage(0,
387                              UserPointer<const void>(kWorld),
388                              sizeof(kWorld),
389                              nullptr,
390                              MOJO_WRITE_MESSAGE_FLAG_NONE));
391
392  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
393  EXPECT_EQ(123u, context);
394  hss = HandleSignalsState();
395  mp1->RemoveWaiter(1, &waiter, &hss);
396  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
397            hss.satisfied_signals);
398  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
399            hss.satisfiable_signals);
400
401  // Make sure there's nothing on the other ports.
402  buffer_size = static_cast<uint32_t>(sizeof(buffer));
403  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
404            mp0->ReadMessage(0,
405                             UserPointer<void>(buffer),
406                             MakeUserPointer(&buffer_size),
407                             nullptr,
408                             nullptr,
409                             MOJO_READ_MESSAGE_FLAG_NONE));
410  buffer_size = static_cast<uint32_t>(sizeof(buffer));
411  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
412            mp2->ReadMessage(0,
413                             UserPointer<void>(buffer),
414                             MakeUserPointer(&buffer_size),
415                             nullptr,
416                             nullptr,
417                             MOJO_READ_MESSAGE_FLAG_NONE));
418  buffer_size = static_cast<uint32_t>(sizeof(buffer));
419  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
420            mp3->ReadMessage(1,
421                             UserPointer<void>(buffer),
422                             MakeUserPointer(&buffer_size),
423                             nullptr,
424                             nullptr,
425                             MOJO_READ_MESSAGE_FLAG_NONE));
426
427  buffer_size = static_cast<uint32_t>(sizeof(buffer));
428  EXPECT_EQ(MOJO_RESULT_OK,
429            mp1->ReadMessage(1,
430                             UserPointer<void>(buffer),
431                             MakeUserPointer(&buffer_size),
432                             nullptr,
433                             nullptr,
434                             MOJO_READ_MESSAGE_FLAG_NONE));
435  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
436  EXPECT_STREQ(kWorld, buffer);
437
438  mp0->Close(0);
439  mp1->Close(1);
440  mp2->Close(0);
441  mp3->Close(1);
442}
443
444TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
445  static const char kHello[] = "hello";
446  char buffer[100] = {0};
447  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
448  Waiter waiter;
449  HandleSignalsState hss;
450  uint32_t context = 0;
451
452  // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
453  // connected to MP 1, port 0, which will be attached to channel 1. This leaves
454  // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
455
456  scoped_refptr<ChannelEndpoint> ep0;
457  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
458
459  // Write to MP 0, port 0.
460  EXPECT_EQ(MOJO_RESULT_OK,
461            mp0->WriteMessage(0,
462                              UserPointer<const void>(kHello),
463                              sizeof(kHello),
464                              nullptr,
465                              MOJO_WRITE_MESSAGE_FLAG_NONE));
466
467  BootstrapChannelEndpointNoWait(0, ep0);
468
469  // Close MP 0, port 0 before channel 1 is even connected.
470  mp0->Close(0);
471
472  scoped_refptr<ChannelEndpoint> ep1;
473  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
474
475  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
476  // it later, it might already be readable.)
477  waiter.Init();
478  ASSERT_EQ(
479      MOJO_RESULT_OK,
480      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
481
482  BootstrapChannelEndpointNoWait(1, ep1);
483
484  // Wait.
485  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
486  EXPECT_EQ(123u, context);
487  hss = HandleSignalsState();
488  // Note: MP 1, port 1 should definitely should be readable, but it may or may
489  // not appear as writable (there's a race, and it may not have noticed that
490  // the other side was closed yet -- e.g., inserting a sleep here would make it
491  // much more likely to notice that it's no longer writable).
492  mp1->RemoveWaiter(1, &waiter, &hss);
493  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
494  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
495
496  // Read from MP 1, port 1.
497  EXPECT_EQ(MOJO_RESULT_OK,
498            mp1->ReadMessage(1,
499                             UserPointer<void>(buffer),
500                             MakeUserPointer(&buffer_size),
501                             nullptr,
502                             nullptr,
503                             MOJO_READ_MESSAGE_FLAG_NONE));
504  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
505  EXPECT_STREQ(kHello, buffer);
506
507  // And MP 1, port 1.
508  mp1->Close(1);
509}
510
511TEST_F(RemoteMessagePipeTest, HandlePassing) {
512  static const char kHello[] = "hello";
513  Waiter waiter;
514  HandleSignalsState hss;
515  uint32_t context = 0;
516
517  scoped_refptr<ChannelEndpoint> ep0;
518  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
519  scoped_refptr<ChannelEndpoint> ep1;
520  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
521  ConnectChannelEndpoints(ep0, ep1);
522
523  // We'll try to pass this dispatcher.
524  scoped_refptr<MessagePipeDispatcher> dispatcher(
525      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
526  scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
527  dispatcher->Init(local_mp, 0);
528
529  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
530  // it later, it might already be readable.)
531  waiter.Init();
532  ASSERT_EQ(
533      MOJO_RESULT_OK,
534      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
535
536  // Write to MP 0, port 0.
537  {
538    DispatcherTransport transport(
539        test::DispatcherTryStartTransport(dispatcher.get()));
540    EXPECT_TRUE(transport.is_valid());
541
542    std::vector<DispatcherTransport> transports;
543    transports.push_back(transport);
544    EXPECT_EQ(MOJO_RESULT_OK,
545              mp0->WriteMessage(0,
546                                UserPointer<const void>(kHello),
547                                sizeof(kHello),
548                                &transports,
549                                MOJO_WRITE_MESSAGE_FLAG_NONE));
550    transport.End();
551
552    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
553    // |dispatcher| is destroyed.
554    EXPECT_TRUE(dispatcher->HasOneRef());
555    dispatcher = nullptr;
556  }
557
558  // Wait.
559  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
560  EXPECT_EQ(123u, context);
561  hss = HandleSignalsState();
562  mp1->RemoveWaiter(1, &waiter, &hss);
563  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
564            hss.satisfied_signals);
565  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
566            hss.satisfiable_signals);
567
568  // Read from MP 1, port 1.
569  char read_buffer[100] = {0};
570  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
571  DispatcherVector read_dispatchers;
572  uint32_t read_num_dispatchers = 10;  // Maximum to get.
573  EXPECT_EQ(MOJO_RESULT_OK,
574            mp1->ReadMessage(1,
575                             UserPointer<void>(read_buffer),
576                             MakeUserPointer(&read_buffer_size),
577                             &read_dispatchers,
578                             &read_num_dispatchers,
579                             MOJO_READ_MESSAGE_FLAG_NONE));
580  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
581  EXPECT_STREQ(kHello, read_buffer);
582  EXPECT_EQ(1u, read_dispatchers.size());
583  EXPECT_EQ(1u, read_num_dispatchers);
584  ASSERT_TRUE(read_dispatchers[0].get());
585  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
586
587  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
588  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
589
590  // Add the waiter now, before it becomes readable to avoid a race.
591  waiter.Init();
592  ASSERT_EQ(MOJO_RESULT_OK,
593            dispatcher->AddWaiter(
594                &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
595
596  // Write to "local_mp", port 1.
597  EXPECT_EQ(MOJO_RESULT_OK,
598            local_mp->WriteMessage(1,
599                                   UserPointer<const void>(kHello),
600                                   sizeof(kHello),
601                                   nullptr,
602                                   MOJO_WRITE_MESSAGE_FLAG_NONE));
603
604  // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
605  // here. (We don't crash if I sleep and then close.)
606
607  // Wait for the dispatcher to become readable.
608  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
609  EXPECT_EQ(456u, context);
610  hss = HandleSignalsState();
611  dispatcher->RemoveWaiter(&waiter, &hss);
612  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
613            hss.satisfied_signals);
614  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
615            hss.satisfiable_signals);
616
617  // Read from the dispatcher.
618  memset(read_buffer, 0, sizeof(read_buffer));
619  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
620  EXPECT_EQ(MOJO_RESULT_OK,
621            dispatcher->ReadMessage(UserPointer<void>(read_buffer),
622                                    MakeUserPointer(&read_buffer_size),
623                                    0,
624                                    nullptr,
625                                    MOJO_READ_MESSAGE_FLAG_NONE));
626  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
627  EXPECT_STREQ(kHello, read_buffer);
628
629  // Prepare to wait on "local_mp", port 1.
630  waiter.Init();
631  ASSERT_EQ(MOJO_RESULT_OK,
632            local_mp->AddWaiter(
633                1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
634
635  // Write to the dispatcher.
636  EXPECT_EQ(MOJO_RESULT_OK,
637            dispatcher->WriteMessage(UserPointer<const void>(kHello),
638                                     sizeof(kHello),
639                                     nullptr,
640                                     MOJO_WRITE_MESSAGE_FLAG_NONE));
641
642  // Wait.
643  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
644  EXPECT_EQ(789u, context);
645  hss = HandleSignalsState();
646  local_mp->RemoveWaiter(1, &waiter, &hss);
647  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
648            hss.satisfied_signals);
649  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
650            hss.satisfiable_signals);
651
652  // Read from "local_mp", port 1.
653  memset(read_buffer, 0, sizeof(read_buffer));
654  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
655  EXPECT_EQ(MOJO_RESULT_OK,
656            local_mp->ReadMessage(1,
657                                  UserPointer<void>(read_buffer),
658                                  MakeUserPointer(&read_buffer_size),
659                                  nullptr,
660                                  nullptr,
661                                  MOJO_READ_MESSAGE_FLAG_NONE));
662  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
663  EXPECT_STREQ(kHello, read_buffer);
664
665  // TODO(vtl): Also test that messages queued up before the handle was sent are
666  // delivered properly.
667
668  // Close everything that belongs to us.
669  mp0->Close(0);
670  mp1->Close(1);
671  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
672  // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
673  local_mp->Close(1);
674}
675
676#if defined(OS_POSIX)
677#define MAYBE_SharedBufferPassing SharedBufferPassing
678#else
679// Not yet implemented (on Windows).
680#define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
681#endif
682TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
683  static const char kHello[] = "hello";
684  Waiter waiter;
685  HandleSignalsState hss;
686  uint32_t context = 0;
687
688  scoped_refptr<ChannelEndpoint> ep0;
689  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
690  scoped_refptr<ChannelEndpoint> ep1;
691  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
692  ConnectChannelEndpoints(ep0, ep1);
693
694  // We'll try to pass this dispatcher.
695  scoped_refptr<SharedBufferDispatcher> dispatcher;
696  EXPECT_EQ(MOJO_RESULT_OK,
697            SharedBufferDispatcher::Create(
698                platform_support(),
699                SharedBufferDispatcher::kDefaultCreateOptions,
700                100,
701                &dispatcher));
702  ASSERT_TRUE(dispatcher.get());
703
704  // Make a mapping.
705  scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
706  EXPECT_EQ(
707      MOJO_RESULT_OK,
708      dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
709  ASSERT_TRUE(mapping0);
710  ASSERT_TRUE(mapping0->GetBase());
711  ASSERT_EQ(100u, mapping0->GetLength());
712  static_cast<char*>(mapping0->GetBase())[0] = 'A';
713  static_cast<char*>(mapping0->GetBase())[50] = 'B';
714  static_cast<char*>(mapping0->GetBase())[99] = 'C';
715
716  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
717  // it later, it might already be readable.)
718  waiter.Init();
719  ASSERT_EQ(
720      MOJO_RESULT_OK,
721      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
722
723  // Write to MP 0, port 0.
724  {
725    DispatcherTransport transport(
726        test::DispatcherTryStartTransport(dispatcher.get()));
727    EXPECT_TRUE(transport.is_valid());
728
729    std::vector<DispatcherTransport> transports;
730    transports.push_back(transport);
731    EXPECT_EQ(MOJO_RESULT_OK,
732              mp0->WriteMessage(0,
733                                UserPointer<const void>(kHello),
734                                sizeof(kHello),
735                                &transports,
736                                MOJO_WRITE_MESSAGE_FLAG_NONE));
737    transport.End();
738
739    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
740    // |dispatcher| is destroyed.
741    EXPECT_TRUE(dispatcher->HasOneRef());
742    dispatcher = nullptr;
743  }
744
745  // Wait.
746  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
747  EXPECT_EQ(123u, context);
748  hss = HandleSignalsState();
749  mp1->RemoveWaiter(1, &waiter, &hss);
750  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
751            hss.satisfied_signals);
752  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
753            hss.satisfiable_signals);
754
755  // Read from MP 1, port 1.
756  char read_buffer[100] = {0};
757  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
758  DispatcherVector read_dispatchers;
759  uint32_t read_num_dispatchers = 10;  // Maximum to get.
760  EXPECT_EQ(MOJO_RESULT_OK,
761            mp1->ReadMessage(1,
762                             UserPointer<void>(read_buffer),
763                             MakeUserPointer(&read_buffer_size),
764                             &read_dispatchers,
765                             &read_num_dispatchers,
766                             MOJO_READ_MESSAGE_FLAG_NONE));
767  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
768  EXPECT_STREQ(kHello, read_buffer);
769  EXPECT_EQ(1u, read_dispatchers.size());
770  EXPECT_EQ(1u, read_num_dispatchers);
771  ASSERT_TRUE(read_dispatchers[0].get());
772  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
773
774  EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
775  dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
776
777  // Make another mapping.
778  scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
779  EXPECT_EQ(
780      MOJO_RESULT_OK,
781      dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
782  ASSERT_TRUE(mapping1);
783  ASSERT_TRUE(mapping1->GetBase());
784  ASSERT_EQ(100u, mapping1->GetLength());
785  EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
786  EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
787  EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
788  EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
789
790  // Write stuff either way.
791  static_cast<char*>(mapping1->GetBase())[1] = 'x';
792  EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
793  static_cast<char*>(mapping0->GetBase())[2] = 'y';
794  EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
795
796  // Kill the first mapping; the second should still be valid.
797  mapping0.reset();
798  EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
799
800  // Close everything that belongs to us.
801  mp0->Close(0);
802  mp1->Close(1);
803  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
804
805  // The second mapping should still be good.
806  EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
807}
808
809#if defined(OS_POSIX)
810#define MAYBE_PlatformHandlePassing PlatformHandlePassing
811#else
812// Not yet implemented (on Windows).
813#define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
814#endif
815TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
816  base::ScopedTempDir temp_dir;
817  ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
818
819  static const char kHello[] = "hello";
820  static const char kWorld[] = "world";
821  Waiter waiter;
822  uint32_t context = 0;
823  HandleSignalsState hss;
824
825  scoped_refptr<ChannelEndpoint> ep0;
826  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
827  scoped_refptr<ChannelEndpoint> ep1;
828  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
829  ConnectChannelEndpoints(ep0, ep1);
830
831  base::FilePath unused;
832  base::ScopedFILE fp(
833      CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
834  EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
835  // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
836  // be passed.
837  scoped_refptr<PlatformHandleDispatcher> dispatcher(
838      new PlatformHandleDispatcher(
839          mojo::test::PlatformHandleFromFILE(fp.Pass())));
840
841  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
842  // it later, it might already be readable.)
843  waiter.Init();
844  ASSERT_EQ(
845      MOJO_RESULT_OK,
846      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
847
848  // Write to MP 0, port 0.
849  {
850    DispatcherTransport transport(
851        test::DispatcherTryStartTransport(dispatcher.get()));
852    EXPECT_TRUE(transport.is_valid());
853
854    std::vector<DispatcherTransport> transports;
855    transports.push_back(transport);
856    EXPECT_EQ(MOJO_RESULT_OK,
857              mp0->WriteMessage(0,
858                                UserPointer<const void>(kWorld),
859                                sizeof(kWorld),
860                                &transports,
861                                MOJO_WRITE_MESSAGE_FLAG_NONE));
862    transport.End();
863
864    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
865    // |dispatcher| is destroyed.
866    EXPECT_TRUE(dispatcher->HasOneRef());
867    dispatcher = nullptr;
868  }
869
870  // Wait.
871  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
872  EXPECT_EQ(123u, context);
873  hss = HandleSignalsState();
874  mp1->RemoveWaiter(1, &waiter, &hss);
875  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
876            hss.satisfied_signals);
877  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
878            hss.satisfiable_signals);
879
880  // Read from MP 1, port 1.
881  char read_buffer[100] = {0};
882  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
883  DispatcherVector read_dispatchers;
884  uint32_t read_num_dispatchers = 10;  // Maximum to get.
885  EXPECT_EQ(MOJO_RESULT_OK,
886            mp1->ReadMessage(1,
887                             UserPointer<void>(read_buffer),
888                             MakeUserPointer(&read_buffer_size),
889                             &read_dispatchers,
890                             &read_num_dispatchers,
891                             MOJO_READ_MESSAGE_FLAG_NONE));
892  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
893  EXPECT_STREQ(kWorld, read_buffer);
894  EXPECT_EQ(1u, read_dispatchers.size());
895  EXPECT_EQ(1u, read_num_dispatchers);
896  ASSERT_TRUE(read_dispatchers[0].get());
897  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
898
899  EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
900  dispatcher =
901      static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
902
903  embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
904  EXPECT_TRUE(h.is_valid());
905
906  fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
907  EXPECT_FALSE(h.is_valid());
908  EXPECT_TRUE(fp);
909
910  rewind(fp.get());
911  memset(read_buffer, 0, sizeof(read_buffer));
912  EXPECT_EQ(sizeof(kHello),
913            fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
914  EXPECT_STREQ(kHello, read_buffer);
915
916  // Close everything that belongs to us.
917  mp0->Close(0);
918  mp1->Close(1);
919  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
920}
921
922// Test racing closes (on each end).
923// Note: A flaky failure would almost certainly indicate a problem in the code
924// itself (not in the test). Also, any logged warnings/errors would also
925// probably be indicative of bugs.
926TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
927  base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
928
929  for (unsigned i = 0; i < 256; i++) {
930    DVLOG(2) << "---------------------------------------- " << i;
931    scoped_refptr<ChannelEndpoint> ep0;
932    scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
933    BootstrapChannelEndpointNoWait(0, ep0);
934
935    scoped_refptr<ChannelEndpoint> ep1;
936    scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
937    BootstrapChannelEndpointNoWait(1, ep1);
938
939    if (i & 1u) {
940      io_thread()->task_runner()->PostTask(
941          FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
942    }
943    if (i & 2u)
944      base::PlatformThread::Sleep(delay);
945
946    mp0->Close(0);
947
948    if (i & 4u) {
949      io_thread()->task_runner()->PostTask(
950          FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
951    }
952    if (i & 8u)
953      base::PlatformThread::Sleep(delay);
954
955    mp1->Close(1);
956
957    RestoreInitialState();
958  }
959}
960
961// Tests passing an end of a message pipe over a remote message pipe, and then
962// passing that end back.
963// TODO(vtl): Also test passing a message pipe across two remote message pipes.
964TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
965  static const char kHello[] = "hello";
966  static const char kWorld[] = "world";
967  Waiter waiter;
968  HandleSignalsState hss;
969  uint32_t context = 0;
970
971  scoped_refptr<ChannelEndpoint> ep0;
972  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
973  scoped_refptr<ChannelEndpoint> ep1;
974  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
975  ConnectChannelEndpoints(ep0, ep1);
976
977  // We'll try to pass this dispatcher.
978  scoped_refptr<MessagePipeDispatcher> dispatcher(
979      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
980  scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
981  dispatcher->Init(local_mp, 0);
982
983  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
984  // it later, it might already be readable.)
985  waiter.Init();
986  ASSERT_EQ(
987      MOJO_RESULT_OK,
988      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
989
990  // Write to MP 0, port 0.
991  {
992    DispatcherTransport transport(
993        test::DispatcherTryStartTransport(dispatcher.get()));
994    EXPECT_TRUE(transport.is_valid());
995
996    std::vector<DispatcherTransport> transports;
997    transports.push_back(transport);
998    EXPECT_EQ(MOJO_RESULT_OK,
999              mp0->WriteMessage(0,
1000                                UserPointer<const void>(kHello),
1001                                sizeof(kHello),
1002                                &transports,
1003                                MOJO_WRITE_MESSAGE_FLAG_NONE));
1004    transport.End();
1005
1006    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1007    // |dispatcher| is destroyed.
1008    EXPECT_TRUE(dispatcher->HasOneRef());
1009    dispatcher = nullptr;
1010  }
1011
1012  // Wait.
1013  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1014  EXPECT_EQ(123u, context);
1015  hss = HandleSignalsState();
1016  mp1->RemoveWaiter(1, &waiter, &hss);
1017  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1018            hss.satisfied_signals);
1019  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1020            hss.satisfiable_signals);
1021
1022  // Read from MP 1, port 1.
1023  char read_buffer[100] = {0};
1024  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1025  DispatcherVector read_dispatchers;
1026  uint32_t read_num_dispatchers = 10;  // Maximum to get.
1027  EXPECT_EQ(MOJO_RESULT_OK,
1028            mp1->ReadMessage(1,
1029                             UserPointer<void>(read_buffer),
1030                             MakeUserPointer(&read_buffer_size),
1031                             &read_dispatchers,
1032                             &read_num_dispatchers,
1033                             MOJO_READ_MESSAGE_FLAG_NONE));
1034  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1035  EXPECT_STREQ(kHello, read_buffer);
1036  EXPECT_EQ(1u, read_dispatchers.size());
1037  EXPECT_EQ(1u, read_num_dispatchers);
1038  ASSERT_TRUE(read_dispatchers[0].get());
1039  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1040
1041  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1042  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1043  read_dispatchers.clear();
1044
1045  // Now pass it back.
1046
1047  // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
1048  // it later, it might already be readable.)
1049  waiter.Init();
1050  ASSERT_EQ(
1051      MOJO_RESULT_OK,
1052      mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
1053
1054  // Write to MP 1, port 1.
1055  {
1056    DispatcherTransport transport(
1057        test::DispatcherTryStartTransport(dispatcher.get()));
1058    EXPECT_TRUE(transport.is_valid());
1059
1060    std::vector<DispatcherTransport> transports;
1061    transports.push_back(transport);
1062    EXPECT_EQ(MOJO_RESULT_OK,
1063              mp1->WriteMessage(1,
1064                                UserPointer<const void>(kWorld),
1065                                sizeof(kWorld),
1066                                &transports,
1067                                MOJO_WRITE_MESSAGE_FLAG_NONE));
1068    transport.End();
1069
1070    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1071    // |dispatcher| is destroyed.
1072    EXPECT_TRUE(dispatcher->HasOneRef());
1073    dispatcher = nullptr;
1074  }
1075
1076  // Wait.
1077  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1078  EXPECT_EQ(456u, context);
1079  hss = HandleSignalsState();
1080  mp0->RemoveWaiter(0, &waiter, &hss);
1081  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1082            hss.satisfied_signals);
1083  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1084            hss.satisfiable_signals);
1085
1086  // Read from MP 0, port 0.
1087  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1088  read_num_dispatchers = 10;  // Maximum to get.
1089  EXPECT_EQ(MOJO_RESULT_OK,
1090            mp0->ReadMessage(0,
1091                             UserPointer<void>(read_buffer),
1092                             MakeUserPointer(&read_buffer_size),
1093                             &read_dispatchers,
1094                             &read_num_dispatchers,
1095                             MOJO_READ_MESSAGE_FLAG_NONE));
1096  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
1097  EXPECT_STREQ(kWorld, read_buffer);
1098  EXPECT_EQ(1u, read_dispatchers.size());
1099  EXPECT_EQ(1u, read_num_dispatchers);
1100  ASSERT_TRUE(read_dispatchers[0].get());
1101  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1102
1103  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1104  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1105  read_dispatchers.clear();
1106
1107  // Add the waiter now, before it becomes readable to avoid a race.
1108  waiter.Init();
1109  ASSERT_EQ(MOJO_RESULT_OK,
1110            dispatcher->AddWaiter(
1111                &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1112
1113  // Write to "local_mp", port 1.
1114  EXPECT_EQ(MOJO_RESULT_OK,
1115            local_mp->WriteMessage(1,
1116                                   UserPointer<const void>(kHello),
1117                                   sizeof(kHello),
1118                                   nullptr,
1119                                   MOJO_WRITE_MESSAGE_FLAG_NONE));
1120
1121  // Wait for the dispatcher to become readable.
1122  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1123  EXPECT_EQ(789u, context);
1124  hss = HandleSignalsState();
1125  dispatcher->RemoveWaiter(&waiter, &hss);
1126  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1127            hss.satisfied_signals);
1128  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1129            hss.satisfiable_signals);
1130
1131  // Read from the dispatcher.
1132  memset(read_buffer, 0, sizeof(read_buffer));
1133  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1134  EXPECT_EQ(MOJO_RESULT_OK,
1135            dispatcher->ReadMessage(UserPointer<void>(read_buffer),
1136                                    MakeUserPointer(&read_buffer_size),
1137                                    0,
1138                                    nullptr,
1139                                    MOJO_READ_MESSAGE_FLAG_NONE));
1140  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1141  EXPECT_STREQ(kHello, read_buffer);
1142
1143  // Prepare to wait on "local_mp", port 1.
1144  waiter.Init();
1145  ASSERT_EQ(MOJO_RESULT_OK,
1146            local_mp->AddWaiter(
1147                1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1148
1149  // Write to the dispatcher.
1150  EXPECT_EQ(MOJO_RESULT_OK,
1151            dispatcher->WriteMessage(UserPointer<const void>(kHello),
1152                                     sizeof(kHello),
1153                                     nullptr,
1154                                     MOJO_WRITE_MESSAGE_FLAG_NONE));
1155
1156  // Wait.
1157  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1158  EXPECT_EQ(789u, context);
1159  hss = HandleSignalsState();
1160  local_mp->RemoveWaiter(1, &waiter, &hss);
1161  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1162            hss.satisfied_signals);
1163  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1164            hss.satisfiable_signals);
1165
1166  // Read from "local_mp", port 1.
1167  memset(read_buffer, 0, sizeof(read_buffer));
1168  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1169  EXPECT_EQ(MOJO_RESULT_OK,
1170            local_mp->ReadMessage(1,
1171                                  UserPointer<void>(read_buffer),
1172                                  MakeUserPointer(&read_buffer_size),
1173                                  nullptr,
1174                                  nullptr,
1175                                  MOJO_READ_MESSAGE_FLAG_NONE));
1176  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1177  EXPECT_STREQ(kHello, read_buffer);
1178
1179  // TODO(vtl): Also test the cases where messages are written and read (at
1180  // various points) on the message pipe being passed around.
1181
1182  // Close everything that belongs to us.
1183  mp0->Close(0);
1184  mp1->Close(1);
1185  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
1186  // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
1187  local_mp->Close(1);
1188}
1189
1190}  // namespace
1191}  // namespace system
1192}  // namespace mojo
1193