multiprocess_message_pipe_unittest.cc revision c5cede9ae108bb15f6b7a8aea21c7e1fefa2834c
15f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// Copyright 2013 The Chromium Authors. All rights reserved. 25f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be 35f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)// found in the LICENSE file. 45f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) 55f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include <stdint.h> 65f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) 75f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include <string> 85f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) 95f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/basictypes.h" 105f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/bind.h" 115f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/location.h" 125f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/logging.h" 135f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "base/threading/platform_thread.h" // For |Sleep()|. 145f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/common/test/multiprocess_test_helper.h" 155f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/embedder/scoped_platform_handle.h" 165f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/channel.h" 175f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/local_message_pipe_endpoint.h" 185f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/message_pipe.h" 195f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/proxy_message_pipe_endpoint.h" 205f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/raw_channel.h" 215f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/test_utils.h" 225f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "mojo/system/waiter.h" 235f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)#include "testing/gtest/include/gtest/gtest.h" 245f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) 255f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace mojo { 265f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace system { 275f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)namespace { 285f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) 295f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles)class ChannelThread { 305f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) public: 315f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) ChannelThread() : test_io_thread_(test::TestIOThread::kManualStart) {} 325f1c94371a64b3196d4be9466099bb892df9b88eTorne (Richard Coles) ~ChannelThread() { 33 Stop(); 34 } 35 36 void Start(embedder::ScopedPlatformHandle platform_handle, 37 scoped_refptr<MessagePipe> message_pipe) { 38 test_io_thread_.Start(); 39 test_io_thread_.PostTaskAndWait( 40 FROM_HERE, 41 base::Bind(&ChannelThread::InitChannelOnIOThread, 42 base::Unretained(this), base::Passed(&platform_handle), 43 message_pipe)); 44 } 45 46 void Stop() { 47 if (channel_) { 48 // Hack to flush write buffers before quitting. 49 // TODO(vtl): Remove this once |Channel| has a 50 // |FlushWriteBufferAndShutdown()| (or whatever). 51 while (!channel_->IsWriteBufferEmpty()) 52 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); 53 54 test_io_thread_.PostTaskAndWait( 55 FROM_HERE, 56 base::Bind(&ChannelThread::ShutdownChannelOnIOThread, 57 base::Unretained(this))); 58 } 59 test_io_thread_.Stop(); 60 } 61 62 private: 63 void InitChannelOnIOThread(embedder::ScopedPlatformHandle platform_handle, 64 scoped_refptr<MessagePipe> message_pipe) { 65 CHECK_EQ(base::MessageLoop::current(), test_io_thread_.message_loop()); 66 CHECK(platform_handle.is_valid()); 67 68 // Create and initialize |Channel|. 69 channel_ = new Channel(); 70 CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass()))); 71 72 // Attach the message pipe endpoint. 73 // Note: On the "server" (parent process) side, we need not attach the 74 // message pipe endpoint immediately. However, on the "client" (child 75 // process) side, this *must* be done here -- otherwise, the |Channel| may 76 // receive/process messages (which it can do as soon as it's hooked up to 77 // the IO thread message loop, and that message loop runs) before the 78 // message pipe endpoint is attached. 79 CHECK_EQ(channel_->AttachMessagePipeEndpoint(message_pipe, 1), 80 Channel::kBootstrapEndpointId); 81 channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId, 82 Channel::kBootstrapEndpointId); 83 } 84 85 void ShutdownChannelOnIOThread() { 86 CHECK(channel_.get()); 87 channel_->Shutdown(); 88 channel_ = NULL; 89 } 90 91 test::TestIOThread test_io_thread_; 92 scoped_refptr<Channel> channel_; 93 94 DISALLOW_COPY_AND_ASSIGN(ChannelThread); 95}; 96 97class MultiprocessMessagePipeTest : public testing::Test { 98 public: 99 MultiprocessMessagePipeTest() {} 100 virtual ~MultiprocessMessagePipeTest() {} 101 102 protected: 103 void Init(scoped_refptr<MessagePipe> mp) { 104 channel_thread_.Start(helper_.server_platform_handle.Pass(), mp); 105 } 106 107 mojo::test::MultiprocessTestHelper* helper() { return &helper_; } 108 109 private: 110 ChannelThread channel_thread_; 111 mojo::test::MultiprocessTestHelper helper_; 112 113 DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest); 114}; 115 116MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) { 117 Waiter waiter; 118 waiter.Init(); 119 120 MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK); 121 if (add_result != MOJO_RESULT_OK) { 122 return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK : 123 add_result; 124 } 125 126 MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE); 127 mp->RemoveWaiter(0, &waiter); 128 return wait_result; 129} 130 131// For each message received, sends a reply message with the same contents 132// repeated twice, until the other end is closed or it receives "quitquitquit" 133// (which it doesn't reply to). It'll return the number of messages received, 134// not including any "quitquitquit" message, modulo 100. 135MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) { 136 ChannelThread channel_thread; 137 embedder::ScopedPlatformHandle client_platform_handle = 138 mojo::test::MultiprocessTestHelper::client_platform_handle.Pass(); 139 CHECK(client_platform_handle.is_valid()); 140 scoped_refptr<MessagePipe> mp(new MessagePipe( 141 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 142 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 143 channel_thread.Start(client_platform_handle.Pass(), mp); 144 145 const std::string quitquitquit("quitquitquit"); 146 int rv = 0; 147 for (;; rv = (rv + 1) % 100) { 148 // Wait for our end of the message pipe to be readable. 149 MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE); 150 if (result != MOJO_RESULT_OK) { 151 // It was closed, probably. 152 CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION); 153 break; 154 } 155 156 std::string read_buffer(1000, '\0'); 157 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 158 CHECK_EQ(mp->ReadMessage(0, 159 &read_buffer[0], &read_buffer_size, 160 NULL, NULL, 161 MOJO_READ_MESSAGE_FLAG_NONE), 162 MOJO_RESULT_OK); 163 read_buffer.resize(read_buffer_size); 164 VLOG(2) << "Child got: " << read_buffer; 165 166 if (read_buffer == quitquitquit) { 167 VLOG(2) << "Child quitting."; 168 break; 169 } 170 171 std::string write_buffer = read_buffer + read_buffer; 172 CHECK_EQ(mp->WriteMessage(0, 173 write_buffer.data(), 174 static_cast<uint32_t>(write_buffer.size()), 175 NULL, 176 MOJO_WRITE_MESSAGE_FLAG_NONE), 177 MOJO_RESULT_OK); 178 } 179 180 181 mp->Close(0); 182 return rv; 183} 184 185// Sends "hello" to child, and expects "hellohello" back. 186TEST_F(MultiprocessMessagePipeTest, Basic) { 187 helper()->StartChild("EchoEcho"); 188 189 scoped_refptr<MessagePipe> mp(new MessagePipe( 190 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 191 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 192 Init(mp); 193 194 std::string hello("hello"); 195 EXPECT_EQ(MOJO_RESULT_OK, 196 mp->WriteMessage(0, 197 hello.data(), static_cast<uint32_t>(hello.size()), 198 NULL, 199 MOJO_WRITE_MESSAGE_FLAG_NONE)); 200 201 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 202 203 std::string read_buffer(1000, '\0'); 204 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 205 CHECK_EQ(mp->ReadMessage(0, 206 &read_buffer[0], &read_buffer_size, 207 NULL, NULL, 208 MOJO_READ_MESSAGE_FLAG_NONE), 209 MOJO_RESULT_OK); 210 read_buffer.resize(read_buffer_size); 211 VLOG(2) << "Parent got: " << read_buffer; 212 EXPECT_EQ(hello + hello, read_buffer); 213 214 mp->Close(0); 215 216 // We sent one message. 217 EXPECT_EQ(1 % 100, helper()->WaitForChildShutdown()); 218} 219 220// Sends a bunch of messages to the child. Expects them "repeated" back. Waits 221// for the child to close its end before quitting. 222TEST_F(MultiprocessMessagePipeTest, QueueMessages) { 223 helper()->StartChild("EchoEcho"); 224 225 scoped_refptr<MessagePipe> mp(new MessagePipe( 226 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), 227 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); 228 Init(mp); 229 230 static const size_t kNumMessages = 1001; 231 for (size_t i = 0; i < kNumMessages; i++) { 232 std::string write_buffer(i, 'A' + (i % 26)); 233 EXPECT_EQ(MOJO_RESULT_OK, 234 mp->WriteMessage(0, 235 write_buffer.data(), 236 static_cast<uint32_t>(write_buffer.size()), 237 NULL, 238 MOJO_WRITE_MESSAGE_FLAG_NONE)); 239 } 240 241 const std::string quitquitquit("quitquitquit"); 242 EXPECT_EQ(MOJO_RESULT_OK, 243 mp->WriteMessage(0, 244 quitquitquit.data(), 245 static_cast<uint32_t>(quitquitquit.size()), 246 NULL, 247 MOJO_WRITE_MESSAGE_FLAG_NONE)); 248 249 for (size_t i = 0; i < kNumMessages; i++) { 250 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 251 252 std::string read_buffer(kNumMessages * 2, '\0'); 253 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size()); 254 CHECK_EQ(mp->ReadMessage(0, 255 &read_buffer[0], &read_buffer_size, 256 NULL, NULL, 257 MOJO_READ_MESSAGE_FLAG_NONE), 258 MOJO_RESULT_OK); 259 read_buffer.resize(read_buffer_size); 260 261 EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer); 262 } 263 264 // Wait for it to become readable, which should fail (since we sent 265 // "quitquitquit"). 266 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 267 WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE)); 268 269 mp->Close(0); 270 271 EXPECT_EQ(static_cast<int>(kNumMessages % 100), 272 helper()->WaitForChildShutdown()); 273} 274 275} // namespace 276} // namespace system 277} // namespace mojo 278