multiprocess_message_pipe_unittest.cc revision c5cede9ae108bb15f6b7a8aea21c7e1fefa2834c
15f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// Copyright 2013 The Chromium Authors. All rights reserved.
25f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
35f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// found in the LICENSE file.
45f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)
55f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include <stdint.h>
65f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)
75f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include <string>
85f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)
95f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/basictypes.h"
105f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/bind.h"
115f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/location.h"
125f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/logging.h"
135f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/threading/platform_thread.h"  // For |Sleep()|.
145f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/common/test/multiprocess_test_helper.h"
155f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/embedder/scoped_platform_handle.h"
165f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/channel.h"
175f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/local_message_pipe_endpoint.h"
185f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/message_pipe.h"
195f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/proxy_message_pipe_endpoint.h"
205f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/raw_channel.h"
215f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/test_utils.h"
225f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/waiter.h"
235f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "testing/gtest/include/gtest/gtest.h"
245f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)
255f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace mojo {
265f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace system {
275f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace {
285f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)
295f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)class ChannelThread {
305f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) public:
315f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)  ChannelThread() : test_io_thread_(test::TestIOThread::kManualStart) {}
325f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)  ~ChannelThread() {
33    Stop();
34  }
35
36  void Start(embedder::ScopedPlatformHandle platform_handle,
37             scoped_refptr<MessagePipe> message_pipe) {
38    test_io_thread_.Start();
39    test_io_thread_.PostTaskAndWait(
40        FROM_HERE,
41        base::Bind(&ChannelThread::InitChannelOnIOThread,
42                   base::Unretained(this), base::Passed(&platform_handle),
43                   message_pipe));
44  }
45
46  void Stop() {
47    if (channel_) {
48      // Hack to flush write buffers before quitting.
49      // TODO(vtl): Remove this once |Channel| has a
50      // |FlushWriteBufferAndShutdown()| (or whatever).
51      while (!channel_->IsWriteBufferEmpty())
52        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
53
54      test_io_thread_.PostTaskAndWait(
55          FROM_HERE,
56          base::Bind(&ChannelThread::ShutdownChannelOnIOThread,
57                     base::Unretained(this)));
58    }
59    test_io_thread_.Stop();
60  }
61
62 private:
63  void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle,
64                             scoped_refptr<MessagePipe> message_pipe) {
65    CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop());
66    CHECK(platform_handle.is_valid());
67
68    // Create and initialize |Channel|.
69    channel_ = new Channel();
70    CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass())));
71
72    // Attach the message pipe endpoint.
73    // Note: On the "server" (parent process) side, we need not attach the
74    // message pipe endpoint immediately. However, on the "client" (child
75    // process) side, this *must* be done here -- otherwise, the |Channel| may
76    // receive/process messages (which it can do as soon as it's hooked up to
77    // the IO thread message loop, and that message loop runs) before the
78    // message pipe endpoint is attached.
79    CHECK_EQ(channel_->AttachMessagePipeEndpoint(message_pipe, 1),
80             Channel::kBootstrapEndpointId);
81    channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
82                                     Channel::kBootstrapEndpointId);
83  }
84
85  void ShutdownChannelOnIOThread() {
86    CHECK(channel_.get());
87    channel_->Shutdown();
88    channel_ = NULL;
89  }
90
91  test::TestIOThread test_io_thread_;
92  scoped_refptr<Channel> channel_;
93
94  DISALLOW_COPY_AND_ASSIGN(ChannelThread);
95};
96
97class MultiprocessMessagePipeTest : public testing::Test {
98 public:
99  MultiprocessMessagePipeTest() {}
100  virtual ~MultiprocessMessagePipeTest() {}
101
102 protected:
103  void Init(scoped_refptr<MessagePipe> mp) {
104    channel_thread_.Start(helper_.server_platform_handle.Pass(), mp);
105  }
106
107  mojo::test::MultiprocessTestHelper* helper() { return &helper_; }
108
109 private:
110  ChannelThread channel_thread_;
111  mojo::test::MultiprocessTestHelper helper_;
112
113  DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
114};
115
116MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) {
117  Waiter waiter;
118  waiter.Init();
119
120  MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK);
121  if (add_result != MOJO_RESULT_OK) {
122    return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
123                                                        add_result;
124  }
125
126  MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE);
127  mp->RemoveWaiter(0, &waiter);
128  return wait_result;
129}
130
131// For each message received, sends a reply message with the same contents
132// repeated twice, until the other end is closed or it receives "quitquitquit"
133// (which it doesn't reply to). It'll return the number of messages received,
134// not including any "quitquitquit" message, modulo 100.
135MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
136  ChannelThread channel_thread;
137  embedder::ScopedPlatformHandle client_platform_handle =
138      mojo::test::MultiprocessTestHelper::client_platform_handle.Pass();
139  CHECK(client_platform_handle.is_valid());
140  scoped_refptr<MessagePipe> mp(new MessagePipe(
141      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
142      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
143  channel_thread.Start(client_platform_handle.Pass(), mp);
144
145  const std::string quitquitquit("quitquitquit");
146  int rv = 0;
147  for (;; rv = (rv + 1) % 100) {
148    // Wait for our end of the message pipe to be readable.
149    MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE);
150    if (result != MOJO_RESULT_OK) {
151      // It was closed, probably.
152      CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
153      break;
154    }
155
156    std::string read_buffer(1000, '\0');
157    uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
158    CHECK_EQ(mp->ReadMessage(0,
159                             &read_buffer[0], &read_buffer_size,
160                             NULL, NULL,
161                             MOJO_READ_MESSAGE_FLAG_NONE),
162             MOJO_RESULT_OK);
163    read_buffer.resize(read_buffer_size);
164    VLOG(2) << "Child got: " << read_buffer;
165
166    if (read_buffer == quitquitquit) {
167      VLOG(2) << "Child quitting.";
168      break;
169    }
170
171    std::string write_buffer = read_buffer + read_buffer;
172    CHECK_EQ(mp->WriteMessage(0,
173                              write_buffer.data(),
174                              static_cast<uint32_t>(write_buffer.size()),
175                              NULL,
176                              MOJO_WRITE_MESSAGE_FLAG_NONE),
177             MOJO_RESULT_OK);
178  }
179
180
181  mp->Close(0);
182  return rv;
183}
184
185// Sends "hello" to child, and expects "hellohello" back.
186TEST_F(MultiprocessMessagePipeTest, Basic) {
187  helper()->StartChild("EchoEcho");
188
189  scoped_refptr<MessagePipe> mp(new MessagePipe(
190      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
191      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
192  Init(mp);
193
194  std::string hello("hello");
195  EXPECT_EQ(MOJO_RESULT_OK,
196            mp->WriteMessage(0,
197                             hello.data(), static_cast<uint32_t>(hello.size()),
198                             NULL,
199                             MOJO_WRITE_MESSAGE_FLAG_NONE));
200
201  EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
202
203  std::string read_buffer(1000, '\0');
204  uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
205  CHECK_EQ(mp->ReadMessage(0,
206                           &read_buffer[0], &read_buffer_size,
207                           NULL, NULL,
208                           MOJO_READ_MESSAGE_FLAG_NONE),
209           MOJO_RESULT_OK);
210  read_buffer.resize(read_buffer_size);
211  VLOG(2) << "Parent got: " << read_buffer;
212  EXPECT_EQ(hello + hello, read_buffer);
213
214  mp->Close(0);
215
216  // We sent one message.
217  EXPECT_EQ(1 % 100, helper()->WaitForChildShutdown());
218}
219
220// Sends a bunch of messages to the child. Expects them "repeated" back. Waits
221// for the child to close its end before quitting.
222TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
223  helper()->StartChild("EchoEcho");
224
225  scoped_refptr<MessagePipe> mp(new MessagePipe(
226      scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
227      scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
228  Init(mp);
229
230  static const size_t kNumMessages = 1001;
231  for (size_t i = 0; i < kNumMessages; i++) {
232    std::string write_buffer(i, 'A' + (i % 26));
233    EXPECT_EQ(MOJO_RESULT_OK,
234              mp->WriteMessage(0,
235                               write_buffer.data(),
236                               static_cast<uint32_t>(write_buffer.size()),
237                               NULL,
238                               MOJO_WRITE_MESSAGE_FLAG_NONE));
239  }
240
241  const std::string quitquitquit("quitquitquit");
242  EXPECT_EQ(MOJO_RESULT_OK,
243            mp->WriteMessage(0,
244                             quitquitquit.data(),
245                             static_cast<uint32_t>(quitquitquit.size()),
246                             NULL,
247                             MOJO_WRITE_MESSAGE_FLAG_NONE));
248
249  for (size_t i = 0; i < kNumMessages; i++) {
250    EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
251
252    std::string read_buffer(kNumMessages * 2, '\0');
253    uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
254    CHECK_EQ(mp->ReadMessage(0,
255                             &read_buffer[0], &read_buffer_size,
256                             NULL, NULL,
257                             MOJO_READ_MESSAGE_FLAG_NONE),
258             MOJO_RESULT_OK);
259    read_buffer.resize(read_buffer_size);
260
261    EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
262  }
263
264  // Wait for it to become readable, which should fail (since we sent
265  // "quitquitquit").
266  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
267            WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
268
269  mp->Close(0);
270
271  EXPECT_EQ(static_cast<int>(kNumMessages % 100),
272            helper()->WaitForChildShutdown());
273}
274
275}  // namespace
276}  // namespace system
277}  // namespace mojo
278