1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/public/cpp/bindings/connector.h" 6 7#include <stddef.h> 8#include <stdlib.h> 9#include <string.h> 10#include <utility> 11 12#include "base/bind.h" 13#include "base/callback.h" 14#include "base/callback_helpers.h" 15#include "base/message_loop/message_loop.h" 16#include "base/run_loop.h" 17#include "base/threading/thread_task_runner_handle.h" 18#include "mojo/public/cpp/bindings/lib/message_builder.h" 19#include "mojo/public/cpp/bindings/tests/message_queue.h" 20#include "testing/gtest/include/gtest/gtest.h" 21 22namespace mojo { 23namespace test { 24namespace { 25 26class MessageAccumulator : public MessageReceiver { 27 public: 28 MessageAccumulator() {} 29 explicit MessageAccumulator(const base::Closure& closure) 30 : closure_(closure) {} 31 32 bool Accept(Message* message) override { 33 queue_.Push(message); 34 if (!closure_.is_null()) 35 base::ResetAndReturn(&closure_).Run(); 36 return true; 37 } 38 39 bool IsEmpty() const { return queue_.IsEmpty(); } 40 41 void Pop(Message* message) { queue_.Pop(message); } 42 43 void set_closure(const base::Closure& closure) { closure_ = closure; } 44 45 size_t size() const { return queue_.size(); } 46 47 private: 48 MessageQueue queue_; 49 base::Closure closure_; 50}; 51 52class ConnectorDeletingMessageAccumulator : public MessageAccumulator { 53 public: 54 ConnectorDeletingMessageAccumulator(Connector** connector) 55 : connector_(connector) {} 56 57 bool Accept(Message* message) override { 58 delete *connector_; 59 *connector_ = nullptr; 60 return MessageAccumulator::Accept(message); 61 } 62 63 private: 64 Connector** connector_; 65}; 66 67class ReentrantMessageAccumulator : public MessageAccumulator { 68 public: 69 ReentrantMessageAccumulator(Connector* connector) 70 : connector_(connector), number_of_calls_(0) {} 71 72 bool Accept(Message* message) override { 73 if (!MessageAccumulator::Accept(message)) 74 return false; 75 number_of_calls_++; 76 if (number_of_calls_ == 1) { 77 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 78 } 79 return true; 80 } 81 82 int number_of_calls() { return number_of_calls_; } 83 84 private: 85 Connector* connector_; 86 int number_of_calls_; 87}; 88 89class ConnectorTest : public testing::Test { 90 public: 91 ConnectorTest() {} 92 93 void SetUp() override { 94 CreateMessagePipe(nullptr, &handle0_, &handle1_); 95 } 96 97 void TearDown() override {} 98 99 void AllocMessage(const char* text, Message* message) { 100 size_t payload_size = strlen(text) + 1; // Plus null terminator. 101 internal::MessageBuilder builder(1, payload_size); 102 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size); 103 104 builder.message()->MoveTo(message); 105 } 106 107 protected: 108 ScopedMessagePipeHandle handle0_; 109 ScopedMessagePipeHandle handle1_; 110 111 private: 112 base::MessageLoop loop_; 113}; 114 115TEST_F(ConnectorTest, Basic) { 116 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 117 base::ThreadTaskRunnerHandle::Get()); 118 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 119 base::ThreadTaskRunnerHandle::Get()); 120 121 const char kText[] = "hello world"; 122 123 Message message; 124 AllocMessage(kText, &message); 125 126 connector0.Accept(&message); 127 128 base::RunLoop run_loop; 129 MessageAccumulator accumulator(run_loop.QuitClosure()); 130 connector1.set_incoming_receiver(&accumulator); 131 132 run_loop.Run(); 133 134 ASSERT_FALSE(accumulator.IsEmpty()); 135 136 Message message_received; 137 accumulator.Pop(&message_received); 138 139 EXPECT_EQ( 140 std::string(kText), 141 std::string(reinterpret_cast<const char*>(message_received.payload()))); 142} 143 144TEST_F(ConnectorTest, Basic_Synchronous) { 145 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 146 base::ThreadTaskRunnerHandle::Get()); 147 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 148 base::ThreadTaskRunnerHandle::Get()); 149 150 const char kText[] = "hello world"; 151 152 Message message; 153 AllocMessage(kText, &message); 154 155 connector0.Accept(&message); 156 157 MessageAccumulator accumulator; 158 connector1.set_incoming_receiver(&accumulator); 159 160 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 161 162 ASSERT_FALSE(accumulator.IsEmpty()); 163 164 Message message_received; 165 accumulator.Pop(&message_received); 166 167 EXPECT_EQ( 168 std::string(kText), 169 std::string(reinterpret_cast<const char*>(message_received.payload()))); 170} 171 172TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) { 173 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 174 base::ThreadTaskRunnerHandle::Get()); 175 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 176 base::ThreadTaskRunnerHandle::Get()); 177 178 base::RunLoop run_loop; 179 MessageAccumulator accumulator(run_loop.QuitClosure()); 180 connector1.set_incoming_receiver(&accumulator); 181 182 const char kText[] = "hello world"; 183 184 Message message; 185 AllocMessage(kText, &message); 186 187 connector0.Accept(&message); 188 189 run_loop.Run(); 190 191 ASSERT_FALSE(accumulator.IsEmpty()); 192 193 Message message_received; 194 accumulator.Pop(&message_received); 195 196 EXPECT_EQ( 197 std::string(kText), 198 std::string(reinterpret_cast<const char*>(message_received.payload()))); 199} 200 201TEST_F(ConnectorTest, Basic_TwoMessages) { 202 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 203 base::ThreadTaskRunnerHandle::Get()); 204 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 205 base::ThreadTaskRunnerHandle::Get()); 206 207 const char* kText[] = {"hello", "world"}; 208 209 for (size_t i = 0; i < arraysize(kText); ++i) { 210 Message message; 211 AllocMessage(kText[i], &message); 212 213 connector0.Accept(&message); 214 } 215 216 MessageAccumulator accumulator; 217 connector1.set_incoming_receiver(&accumulator); 218 219 for (size_t i = 0; i < arraysize(kText); ++i) { 220 if (accumulator.IsEmpty()) { 221 base::RunLoop run_loop; 222 accumulator.set_closure(run_loop.QuitClosure()); 223 run_loop.Run(); 224 } 225 ASSERT_FALSE(accumulator.IsEmpty()); 226 227 Message message_received; 228 accumulator.Pop(&message_received); 229 230 EXPECT_EQ( 231 std::string(kText[i]), 232 std::string(reinterpret_cast<const char*>(message_received.payload()))); 233 } 234} 235 236TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) { 237 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 238 base::ThreadTaskRunnerHandle::Get()); 239 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 240 base::ThreadTaskRunnerHandle::Get()); 241 242 const char* kText[] = {"hello", "world"}; 243 244 for (size_t i = 0; i < arraysize(kText); ++i) { 245 Message message; 246 AllocMessage(kText[i], &message); 247 248 connector0.Accept(&message); 249 } 250 251 MessageAccumulator accumulator; 252 connector1.set_incoming_receiver(&accumulator); 253 254 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 255 256 ASSERT_FALSE(accumulator.IsEmpty()); 257 258 Message message_received; 259 accumulator.Pop(&message_received); 260 261 EXPECT_EQ( 262 std::string(kText[0]), 263 std::string(reinterpret_cast<const char*>(message_received.payload()))); 264 265 ASSERT_TRUE(accumulator.IsEmpty()); 266} 267 268TEST_F(ConnectorTest, WriteToClosedPipe) { 269 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 270 base::ThreadTaskRunnerHandle::Get()); 271 272 const char kText[] = "hello world"; 273 274 Message message; 275 AllocMessage(kText, &message); 276 277 // Close the other end of the pipe. 278 handle1_.reset(); 279 280 // Not observed yet because we haven't spun the message loop yet. 281 EXPECT_FALSE(connector0.encountered_error()); 282 283 // Write failures are not reported. 284 bool ok = connector0.Accept(&message); 285 EXPECT_TRUE(ok); 286 287 // Still not observed. 288 EXPECT_FALSE(connector0.encountered_error()); 289 290 // Spin the message loop, and then we should start observing the closed pipe. 291 base::RunLoop run_loop; 292 connector0.set_connection_error_handler(run_loop.QuitClosure()); 293 run_loop.Run(); 294 295 EXPECT_TRUE(connector0.encountered_error()); 296} 297 298TEST_F(ConnectorTest, MessageWithHandles) { 299 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 300 base::ThreadTaskRunnerHandle::Get()); 301 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 302 base::ThreadTaskRunnerHandle::Get()); 303 304 const char kText[] = "hello world"; 305 306 Message message1; 307 AllocMessage(kText, &message1); 308 309 MessagePipe pipe; 310 message1.mutable_handles()->push_back(pipe.handle0.release()); 311 312 connector0.Accept(&message1); 313 314 // The message should have been transferred, releasing the handles. 315 EXPECT_TRUE(message1.handles()->empty()); 316 317 base::RunLoop run_loop; 318 MessageAccumulator accumulator(run_loop.QuitClosure()); 319 connector1.set_incoming_receiver(&accumulator); 320 321 run_loop.Run(); 322 323 ASSERT_FALSE(accumulator.IsEmpty()); 324 325 Message message_received; 326 accumulator.Pop(&message_received); 327 328 EXPECT_EQ( 329 std::string(kText), 330 std::string(reinterpret_cast<const char*>(message_received.payload()))); 331 ASSERT_EQ(1U, message_received.handles()->size()); 332 333 // Now send a message to the transferred handle and confirm it's sent through 334 // to the orginal pipe. 335 // TODO(vtl): Do we need a better way of "downcasting" the handle types? 336 ScopedMessagePipeHandle smph; 337 smph.reset(MessagePipeHandle(message_received.handles()->front().value())); 338 message_received.mutable_handles()->front() = Handle(); 339 // |smph| now owns this handle. 340 341 Connector connector_received(std::move(smph), Connector::SINGLE_THREADED_SEND, 342 base::ThreadTaskRunnerHandle::Get()); 343 Connector connector_original(std::move(pipe.handle1), 344 Connector::SINGLE_THREADED_SEND, 345 base::ThreadTaskRunnerHandle::Get()); 346 347 Message message2; 348 AllocMessage(kText, &message2); 349 350 connector_received.Accept(&message2); 351 base::RunLoop run_loop2; 352 MessageAccumulator accumulator2(run_loop2.QuitClosure()); 353 connector_original.set_incoming_receiver(&accumulator2); 354 run_loop2.Run(); 355 356 ASSERT_FALSE(accumulator2.IsEmpty()); 357 358 accumulator2.Pop(&message_received); 359 360 EXPECT_EQ( 361 std::string(kText), 362 std::string(reinterpret_cast<const char*>(message_received.payload()))); 363} 364 365TEST_F(ConnectorTest, WaitForIncomingMessageWithError) { 366 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 367 base::ThreadTaskRunnerHandle::Get()); 368 // Close the other end of the pipe. 369 handle1_.reset(); 370 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE)); 371} 372 373TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) { 374 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 375 base::ThreadTaskRunnerHandle::Get()); 376 Connector* connector1 = 377 new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 378 base::ThreadTaskRunnerHandle::Get()); 379 380 const char kText[] = "hello world"; 381 382 Message message; 383 AllocMessage(kText, &message); 384 385 connector0.Accept(&message); 386 387 ConnectorDeletingMessageAccumulator accumulator(&connector1); 388 connector1->set_incoming_receiver(&accumulator); 389 390 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 391 392 ASSERT_FALSE(connector1); 393 ASSERT_FALSE(accumulator.IsEmpty()); 394 395 Message message_received; 396 accumulator.Pop(&message_received); 397 398 EXPECT_EQ( 399 std::string(kText), 400 std::string(reinterpret_cast<const char*>(message_received.payload()))); 401} 402 403TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) { 404 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 405 base::ThreadTaskRunnerHandle::Get()); 406 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 407 base::ThreadTaskRunnerHandle::Get()); 408 409 const char* kText[] = {"hello", "world"}; 410 411 for (size_t i = 0; i < arraysize(kText); ++i) { 412 Message message; 413 AllocMessage(kText[i], &message); 414 415 connector0.Accept(&message); 416 } 417 418 ReentrantMessageAccumulator accumulator(&connector1); 419 connector1.set_incoming_receiver(&accumulator); 420 421 for (size_t i = 0; i < arraysize(kText); ++i) { 422 if (accumulator.IsEmpty()) { 423 base::RunLoop run_loop; 424 accumulator.set_closure(run_loop.QuitClosure()); 425 run_loop.Run(); 426 } 427 ASSERT_FALSE(accumulator.IsEmpty()); 428 429 Message message_received; 430 accumulator.Pop(&message_received); 431 432 EXPECT_EQ( 433 std::string(kText[i]), 434 std::string(reinterpret_cast<const char*>(message_received.payload()))); 435 } 436 437 ASSERT_EQ(2, accumulator.number_of_calls()); 438} 439 440void ForwardErrorHandler(bool* called, const base::Closure& callback) { 441 *called = true; 442 callback.Run(); 443} 444 445TEST_F(ConnectorTest, RaiseError) { 446 base::RunLoop run_loop, run_loop2; 447 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 448 base::ThreadTaskRunnerHandle::Get()); 449 bool error_handler_called0 = false; 450 connector0.set_connection_error_handler( 451 base::Bind(&ForwardErrorHandler, &error_handler_called0, 452 run_loop.QuitClosure())); 453 454 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 455 base::ThreadTaskRunnerHandle::Get()); 456 bool error_handler_called1 = false; 457 connector1.set_connection_error_handler( 458 base::Bind(&ForwardErrorHandler, &error_handler_called1, 459 run_loop2.QuitClosure())); 460 461 const char kText[] = "hello world"; 462 463 Message message; 464 AllocMessage(kText, &message); 465 466 connector0.Accept(&message); 467 connector0.RaiseError(); 468 469 base::RunLoop run_loop3; 470 MessageAccumulator accumulator(run_loop3.QuitClosure()); 471 connector1.set_incoming_receiver(&accumulator); 472 473 run_loop3.Run(); 474 475 // Messages sent prior to RaiseError() still arrive at the other end. 476 ASSERT_FALSE(accumulator.IsEmpty()); 477 478 Message message_received; 479 accumulator.Pop(&message_received); 480 481 EXPECT_EQ( 482 std::string(kText), 483 std::string(reinterpret_cast<const char*>(message_received.payload()))); 484 485 run_loop.Run(); 486 run_loop2.Run(); 487 488 // Connection error handler is called at both sides. 489 EXPECT_TRUE(error_handler_called0); 490 EXPECT_TRUE(error_handler_called1); 491 492 // The error flag is set at both sides. 493 EXPECT_TRUE(connector0.encountered_error()); 494 EXPECT_TRUE(connector1.encountered_error()); 495 496 // The message pipe handle is valid at both sides. 497 EXPECT_TRUE(connector0.is_valid()); 498 EXPECT_TRUE(connector1.is_valid()); 499} 500 501void PauseConnectorAndRunClosure(Connector* connector, 502 const base::Closure& closure) { 503 connector->PauseIncomingMethodCallProcessing(); 504 closure.Run(); 505} 506 507TEST_F(ConnectorTest, PauseWithQueuedMessages) { 508 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 509 base::ThreadTaskRunnerHandle::Get()); 510 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 511 base::ThreadTaskRunnerHandle::Get()); 512 513 const char kText[] = "hello world"; 514 515 // Queue up two messages. 516 Message message; 517 AllocMessage(kText, &message); 518 connector0.Accept(&message); 519 AllocMessage(kText, &message); 520 connector0.Accept(&message); 521 522 base::RunLoop run_loop; 523 // Configure the accumulator such that it pauses after the first message is 524 // received. 525 MessageAccumulator accumulator( 526 base::Bind(&PauseConnectorAndRunClosure, &connector1, 527 run_loop.QuitClosure())); 528 connector1.set_incoming_receiver(&accumulator); 529 530 run_loop.Run(); 531 532 // As we paused after the first message we should only have gotten one 533 // message. 534 ASSERT_EQ(1u, accumulator.size()); 535} 536 537void AccumulateWithNestedLoop(MessageAccumulator* accumulator, 538 const base::Closure& closure) { 539 base::RunLoop nested_run_loop; 540 base::MessageLoop::ScopedNestableTaskAllower allow( 541 base::MessageLoop::current()); 542 accumulator->set_closure(nested_run_loop.QuitClosure()); 543 nested_run_loop.Run(); 544 closure.Run(); 545} 546 547TEST_F(ConnectorTest, ProcessWhenNested) { 548 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND, 549 base::ThreadTaskRunnerHandle::Get()); 550 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND, 551 base::ThreadTaskRunnerHandle::Get()); 552 553 const char kText[] = "hello world"; 554 555 // Queue up two messages. 556 Message message; 557 AllocMessage(kText, &message); 558 connector0.Accept(&message); 559 AllocMessage(kText, &message); 560 connector0.Accept(&message); 561 562 base::RunLoop run_loop; 563 MessageAccumulator accumulator; 564 // When the accumulator gets the first message it spins a nested message 565 // loop. The loop is quit when another message is received. 566 accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator, 567 run_loop.QuitClosure())); 568 connector1.set_incoming_receiver(&accumulator); 569 570 run_loop.Run(); 571 572 ASSERT_EQ(2u, accumulator.size()); 573} 574 575} // namespace 576} // namespace test 577} // namespace mojo 578