1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <stdlib.h> 6#include <string.h> 7 8#include "mojo/public/cpp/bindings/lib/connector.h" 9#include "mojo/public/cpp/bindings/lib/message_builder.h" 10#include "mojo/public/cpp/bindings/lib/message_queue.h" 11#include "mojo/public/cpp/environment/environment.h" 12#include "mojo/public/cpp/system/macros.h" 13#include "mojo/public/cpp/utility/run_loop.h" 14#include "testing/gtest/include/gtest/gtest.h" 15 16namespace mojo { 17namespace test { 18namespace { 19 20class MessageAccumulator : public MessageReceiver { 21 public: 22 MessageAccumulator() { 23 } 24 25 virtual bool Accept(Message* message) MOJO_OVERRIDE { 26 queue_.Push(message); 27 return true; 28 } 29 30 bool IsEmpty() const { 31 return queue_.IsEmpty(); 32 } 33 34 void Pop(Message* message) { 35 queue_.Pop(message); 36 } 37 38 private: 39 internal::MessageQueue queue_; 40}; 41 42class ConnectorDeletingMessageAccumulator : public MessageAccumulator { 43 public: 44 ConnectorDeletingMessageAccumulator(internal::Connector** connector) 45 : connector_(connector) {} 46 47 virtual bool Accept(Message* message) MOJO_OVERRIDE { 48 delete *connector_; 49 *connector_ = 0; 50 return MessageAccumulator::Accept(message); 51 } 52 53 private: 54 internal::Connector** connector_; 55}; 56 57class ReentrantMessageAccumulator : public MessageAccumulator { 58 public: 59 ReentrantMessageAccumulator(internal::Connector* connector) 60 : connector_(connector), number_of_calls_(0) {} 61 62 virtual bool Accept(Message* message) MOJO_OVERRIDE { 63 if (!MessageAccumulator::Accept(message)) 64 return false; 65 number_of_calls_++; 66 if (number_of_calls_ == 1) { 67 return connector_->WaitForIncomingMessage(); 68 } 69 return true; 70 } 71 72 int number_of_calls() { return number_of_calls_; } 73 74 private: 75 internal::Connector* connector_; 76 int number_of_calls_; 77}; 78 79class ConnectorTest : public testing::Test { 80 public: 81 ConnectorTest() { 82 } 83 84 virtual void SetUp() MOJO_OVERRIDE { 85 CreateMessagePipe(NULL, &handle0_, &handle1_); 86 } 87 88 virtual void TearDown() MOJO_OVERRIDE { 89 } 90 91 void AllocMessage(const char* text, Message* message) { 92 size_t payload_size = strlen(text) + 1; // Plus null terminator. 93 internal::MessageBuilder builder(1, payload_size); 94 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size); 95 builder.Finish(message); 96 } 97 98 void PumpMessages() { 99 loop_.RunUntilIdle(); 100 } 101 102 protected: 103 ScopedMessagePipeHandle handle0_; 104 ScopedMessagePipeHandle handle1_; 105 106 private: 107 Environment env_; 108 RunLoop loop_; 109}; 110 111TEST_F(ConnectorTest, Basic) { 112 internal::Connector connector0(handle0_.Pass()); 113 internal::Connector connector1(handle1_.Pass()); 114 115 const char kText[] = "hello world"; 116 117 Message message; 118 AllocMessage(kText, &message); 119 120 connector0.Accept(&message); 121 122 MessageAccumulator accumulator; 123 connector1.set_incoming_receiver(&accumulator); 124 125 PumpMessages(); 126 127 ASSERT_FALSE(accumulator.IsEmpty()); 128 129 Message message_received; 130 accumulator.Pop(&message_received); 131 132 EXPECT_EQ( 133 std::string(kText), 134 std::string(reinterpret_cast<const char*>(message_received.payload()))); 135} 136 137TEST_F(ConnectorTest, Basic_Synchronous) { 138 internal::Connector connector0(handle0_.Pass()); 139 internal::Connector connector1(handle1_.Pass()); 140 141 const char kText[] = "hello world"; 142 143 Message message; 144 AllocMessage(kText, &message); 145 146 connector0.Accept(&message); 147 148 MessageAccumulator accumulator; 149 connector1.set_incoming_receiver(&accumulator); 150 151 connector1.WaitForIncomingMessage(); 152 153 ASSERT_FALSE(accumulator.IsEmpty()); 154 155 Message message_received; 156 accumulator.Pop(&message_received); 157 158 EXPECT_EQ( 159 std::string(kText), 160 std::string(reinterpret_cast<const char*>(message_received.payload()))); 161} 162 163TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) { 164 internal::Connector connector0(handle0_.Pass()); 165 internal::Connector connector1(handle1_.Pass()); 166 167 MessageAccumulator accumulator; 168 connector1.set_incoming_receiver(&accumulator); 169 170 const char kText[] = "hello world"; 171 172 Message message; 173 AllocMessage(kText, &message); 174 175 connector0.Accept(&message); 176 177 PumpMessages(); 178 179 ASSERT_FALSE(accumulator.IsEmpty()); 180 181 Message message_received; 182 accumulator.Pop(&message_received); 183 184 EXPECT_EQ( 185 std::string(kText), 186 std::string(reinterpret_cast<const char*>(message_received.payload()))); 187} 188 189TEST_F(ConnectorTest, Basic_TwoMessages) { 190 internal::Connector connector0(handle0_.Pass()); 191 internal::Connector connector1(handle1_.Pass()); 192 193 const char* kText[] = { "hello", "world" }; 194 195 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 196 Message message; 197 AllocMessage(kText[i], &message); 198 199 connector0.Accept(&message); 200 } 201 202 MessageAccumulator accumulator; 203 connector1.set_incoming_receiver(&accumulator); 204 205 PumpMessages(); 206 207 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 208 ASSERT_FALSE(accumulator.IsEmpty()); 209 210 Message message_received; 211 accumulator.Pop(&message_received); 212 213 EXPECT_EQ( 214 std::string(kText[i]), 215 std::string(reinterpret_cast<const char*>(message_received.payload()))); 216 } 217} 218 219TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) { 220 internal::Connector connector0(handle0_.Pass()); 221 internal::Connector connector1(handle1_.Pass()); 222 223 const char* kText[] = { "hello", "world" }; 224 225 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 226 Message message; 227 AllocMessage(kText[i], &message); 228 229 connector0.Accept(&message); 230 } 231 232 MessageAccumulator accumulator; 233 connector1.set_incoming_receiver(&accumulator); 234 235 connector1.WaitForIncomingMessage(); 236 237 ASSERT_FALSE(accumulator.IsEmpty()); 238 239 Message message_received; 240 accumulator.Pop(&message_received); 241 242 EXPECT_EQ( 243 std::string(kText[0]), 244 std::string(reinterpret_cast<const char*>(message_received.payload()))); 245 246 ASSERT_TRUE(accumulator.IsEmpty()); 247} 248 249TEST_F(ConnectorTest, WriteToClosedPipe) { 250 internal::Connector connector0(handle0_.Pass()); 251 252 const char kText[] = "hello world"; 253 254 Message message; 255 AllocMessage(kText, &message); 256 257 // Close the other end of the pipe. 258 handle1_.reset(); 259 260 // Not observed yet because we haven't spun the RunLoop yet. 261 EXPECT_FALSE(connector0.encountered_error()); 262 263 // Write failures are not reported. 264 bool ok = connector0.Accept(&message); 265 EXPECT_TRUE(ok); 266 267 // Still not observed. 268 EXPECT_FALSE(connector0.encountered_error()); 269 270 // Spin the RunLoop, and then we should start observing the closed pipe. 271 PumpMessages(); 272 273 EXPECT_TRUE(connector0.encountered_error()); 274} 275 276TEST_F(ConnectorTest, MessageWithHandles) { 277 internal::Connector connector0(handle0_.Pass()); 278 internal::Connector connector1(handle1_.Pass()); 279 280 const char kText[] = "hello world"; 281 282 Message message1; 283 AllocMessage(kText, &message1); 284 285 MessagePipe pipe; 286 message1.mutable_handles()->push_back(pipe.handle0.release()); 287 288 connector0.Accept(&message1); 289 290 // The message should have been transferred, releasing the handles. 291 EXPECT_TRUE(message1.handles()->empty()); 292 293 MessageAccumulator accumulator; 294 connector1.set_incoming_receiver(&accumulator); 295 296 PumpMessages(); 297 298 ASSERT_FALSE(accumulator.IsEmpty()); 299 300 Message message_received; 301 accumulator.Pop(&message_received); 302 303 EXPECT_EQ( 304 std::string(kText), 305 std::string(reinterpret_cast<const char*>(message_received.payload()))); 306 ASSERT_EQ(1U, message_received.handles()->size()); 307 308 // Now send a message to the transferred handle and confirm it's sent through 309 // to the orginal pipe. 310 // TODO(vtl): Do we need a better way of "downcasting" the handle types? 311 ScopedMessagePipeHandle smph; 312 smph.reset(MessagePipeHandle(message_received.handles()->front().value())); 313 message_received.mutable_handles()->front() = Handle(); 314 // |smph| now owns this handle. 315 316 internal::Connector connector_received(smph.Pass()); 317 internal::Connector connector_original(pipe.handle1.Pass()); 318 319 Message message2; 320 AllocMessage(kText, &message2); 321 322 connector_received.Accept(&message2); 323 connector_original.set_incoming_receiver(&accumulator); 324 PumpMessages(); 325 326 ASSERT_FALSE(accumulator.IsEmpty()); 327 328 accumulator.Pop(&message_received); 329 330 EXPECT_EQ( 331 std::string(kText), 332 std::string(reinterpret_cast<const char*>(message_received.payload()))); 333} 334 335TEST_F(ConnectorTest, WaitForIncomingMessageWithError) { 336 internal::Connector connector0(handle0_.Pass()); 337 // Close the other end of the pipe. 338 handle1_.reset(); 339 ASSERT_FALSE(connector0.WaitForIncomingMessage()); 340} 341 342TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) { 343 internal::Connector connector0(handle0_.Pass()); 344 internal::Connector* connector1 = new internal::Connector(handle1_.Pass()); 345 346 const char kText[] = "hello world"; 347 348 Message message; 349 AllocMessage(kText, &message); 350 351 connector0.Accept(&message); 352 353 ConnectorDeletingMessageAccumulator accumulator(&connector1); 354 connector1->set_incoming_receiver(&accumulator); 355 356 connector1->WaitForIncomingMessage(); 357 358 ASSERT_FALSE(connector1); 359 ASSERT_FALSE(accumulator.IsEmpty()); 360 361 Message message_received; 362 accumulator.Pop(&message_received); 363 364 EXPECT_EQ( 365 std::string(kText), 366 std::string(reinterpret_cast<const char*>(message_received.payload()))); 367} 368 369TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) { 370 internal::Connector connector0(handle0_.Pass()); 371 internal::Connector connector1(handle1_.Pass()); 372 373 const char* kText[] = { "hello", "world" }; 374 375 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 376 Message message; 377 AllocMessage(kText[i], &message); 378 379 connector0.Accept(&message); 380 } 381 382 ReentrantMessageAccumulator accumulator(&connector1); 383 connector1.set_incoming_receiver(&accumulator); 384 385 PumpMessages(); 386 387 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 388 ASSERT_FALSE(accumulator.IsEmpty()); 389 390 Message message_received; 391 accumulator.Pop(&message_received); 392 393 EXPECT_EQ( 394 std::string(kText[i]), 395 std::string(reinterpret_cast<const char*>(message_received.payload()))); 396 } 397 398 ASSERT_EQ(2, accumulator.number_of_calls()); 399} 400 401} // namespace 402} // namespace test 403} // namespace mojo 404