1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a 6// heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to 7// increase tolerance and reduce observed flakiness (though doing so reduces the 8// meaningfulness of the test). 9 10#include "mojo/system/message_pipe_dispatcher.h" 11 12#include <string.h> 13 14#include <limits> 15 16#include "base/memory/ref_counted.h" 17#include "base/memory/scoped_vector.h" 18#include "base/rand_util.h" 19#include "base/threading/platform_thread.h" // For |Sleep()|. 20#include "base/threading/simple_thread.h" 21#include "base/time/time.h" 22#include "mojo/system/message_pipe.h" 23#include "mojo/system/test_utils.h" 24#include "mojo/system/waiter.h" 25#include "mojo/system/waiter_test_utils.h" 26#include "testing/gtest/include/gtest/gtest.h" 27 28namespace mojo { 29namespace system { 30namespace { 31 32TEST(MessagePipeDispatcherTest, Basic) { 33 test::Stopwatch stopwatch; 34 int32_t buffer[1]; 35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 36 uint32_t buffer_size; 37 38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 39 for (unsigned i = 0; i < 2; i++) { 40 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 41 MessagePipeDispatcher::kDefaultCreateOptions)); 42 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); 43 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 44 MessagePipeDispatcher::kDefaultCreateOptions)); 45 { 46 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 47 d0->Init(mp, i); // 0, 1. 48 d1->Init(mp, i ^ 1); // 1, 0. 49 } 50 Waiter w; 51 uint32_t context = 0; 52 HandleSignalsState hss; 53 54 // Try adding a writable waiter when already writable. 55 w.Init(); 56 hss = HandleSignalsState(); 57 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 58 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); 59 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 60 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 61 hss.satisfiable_signals); 62 // Shouldn't need to remove the waiter (it was not added). 63 64 // Add a readable waiter to |d0|, then make it readable (by writing to 65 // |d1|), then wait. 66 w.Init(); 67 ASSERT_EQ(MOJO_RESULT_OK, 68 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); 69 buffer[0] = 123456789; 70 EXPECT_EQ(MOJO_RESULT_OK, 71 d1->WriteMessage(UserPointer<const void>(buffer), 72 kBufferSize, 73 nullptr, 74 MOJO_WRITE_MESSAGE_FLAG_NONE)); 75 stopwatch.Start(); 76 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); 77 EXPECT_EQ(1u, context); 78 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 79 hss = HandleSignalsState(); 80 d0->RemoveWaiter(&w, &hss); 81 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 82 hss.satisfied_signals); 83 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 84 hss.satisfiable_signals); 85 86 // Try adding a readable waiter when already readable (from above). 87 w.Init(); 88 hss = HandleSignalsState(); 89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 90 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); 91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 92 hss.satisfied_signals); 93 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 94 hss.satisfiable_signals); 95 // Shouldn't need to remove the waiter (it was not added). 96 97 // Make |d0| no longer readable (by reading from it). 98 buffer[0] = 0; 99 buffer_size = kBufferSize; 100 EXPECT_EQ(MOJO_RESULT_OK, 101 d0->ReadMessage(UserPointer<void>(buffer), 102 MakeUserPointer(&buffer_size), 103 0, 104 nullptr, 105 MOJO_READ_MESSAGE_FLAG_NONE)); 106 EXPECT_EQ(kBufferSize, buffer_size); 107 EXPECT_EQ(123456789, buffer[0]); 108 109 // Wait for zero time for readability on |d0| (will time out). 110 w.Init(); 111 ASSERT_EQ(MOJO_RESULT_OK, 112 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); 113 stopwatch.Start(); 114 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); 115 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 116 hss = HandleSignalsState(); 117 d0->RemoveWaiter(&w, &hss); 118 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 120 hss.satisfiable_signals); 121 122 // Wait for non-zero, finite time for readability on |d0| (will time out). 123 w.Init(); 124 ASSERT_EQ(MOJO_RESULT_OK, 125 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); 126 stopwatch.Start(); 127 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, 128 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr)); 129 base::TimeDelta elapsed = stopwatch.Elapsed(); 130 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); 131 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); 132 hss = HandleSignalsState(); 133 d0->RemoveWaiter(&w, &hss); 134 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); 135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 136 hss.satisfiable_signals); 137 138 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 140 } 141} 142 143TEST(MessagePipeDispatcherTest, InvalidParams) { 144 char buffer[1]; 145 146 scoped_refptr<MessagePipeDispatcher> d0( 147 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 148 scoped_refptr<MessagePipeDispatcher> d1( 149 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 150 { 151 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 152 d0->Init(mp, 0); 153 d1->Init(mp, 1); 154 } 155 156 // |WriteMessage|: 157 // Huge buffer size. 158 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, 159 d0->WriteMessage(UserPointer<const void>(buffer), 160 std::numeric_limits<uint32_t>::max(), 161 nullptr, 162 MOJO_WRITE_MESSAGE_FLAG_NONE)); 163 164 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 165 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 166} 167 168// These test invalid arguments that should cause death if we're being paranoid 169// about checking arguments (which we would want to do if, e.g., we were in a 170// true "kernel" situation, but we might not want to do otherwise for 171// performance reasons). Probably blatant errors like passing in null pointers 172// (for required pointer arguments) will still cause death, but perhaps not 173// predictably. 174TEST(MessagePipeDispatcherTest, InvalidParamsDeath) { 175 const char kMemoryCheckFailedRegex[] = "Check failed"; 176 177 scoped_refptr<MessagePipeDispatcher> d0( 178 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 179 scoped_refptr<MessagePipeDispatcher> d1( 180 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 181 { 182 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 183 d0->Init(mp, 0); 184 d1->Init(mp, 1); 185 } 186 187 // |WriteMessage|: 188 // Null buffer with nonzero buffer size. 189 EXPECT_DEATH_IF_SUPPORTED( 190 d0->WriteMessage( 191 NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE), 192 kMemoryCheckFailedRegex); 193 194 // |ReadMessage|: 195 // Null buffer with nonzero buffer size. 196 // First write something so that we actually have something to read. 197 EXPECT_EQ(MOJO_RESULT_OK, 198 d1->WriteMessage(UserPointer<const void>("x"), 199 1, 200 nullptr, 201 MOJO_WRITE_MESSAGE_FLAG_NONE)); 202 uint32_t buffer_size = 1; 203 EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(), 204 MakeUserPointer(&buffer_size), 205 0, 206 nullptr, 207 MOJO_READ_MESSAGE_FLAG_NONE), 208 kMemoryCheckFailedRegex); 209 210 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 211 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 212} 213 214// Test what happens when one end is closed (single-threaded test). 215TEST(MessagePipeDispatcherTest, BasicClosed) { 216 int32_t buffer[1]; 217 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 218 uint32_t buffer_size; 219 220 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 221 for (unsigned i = 0; i < 2; i++) { 222 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 223 MessagePipeDispatcher::kDefaultCreateOptions)); 224 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 225 MessagePipeDispatcher::kDefaultCreateOptions)); 226 { 227 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 228 d0->Init(mp, i); // 0, 1. 229 d1->Init(mp, i ^ 1); // 1, 0. 230 } 231 Waiter w; 232 HandleSignalsState hss; 233 234 // Write (twice) to |d1|. 235 buffer[0] = 123456789; 236 EXPECT_EQ(MOJO_RESULT_OK, 237 d1->WriteMessage(UserPointer<const void>(buffer), 238 kBufferSize, 239 nullptr, 240 MOJO_WRITE_MESSAGE_FLAG_NONE)); 241 buffer[0] = 234567890; 242 EXPECT_EQ(MOJO_RESULT_OK, 243 d1->WriteMessage(UserPointer<const void>(buffer), 244 kBufferSize, 245 nullptr, 246 MOJO_WRITE_MESSAGE_FLAG_NONE)); 247 248 // Try waiting for readable on |d0|; should fail (already satisfied). 249 w.Init(); 250 hss = HandleSignalsState(); 251 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 252 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); 253 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 254 hss.satisfied_signals); 255 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 256 hss.satisfiable_signals); 257 258 // Try reading from |d1|; should fail (nothing to read). 259 buffer[0] = 0; 260 buffer_size = kBufferSize; 261 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, 262 d1->ReadMessage(UserPointer<void>(buffer), 263 MakeUserPointer(&buffer_size), 264 0, 265 nullptr, 266 MOJO_READ_MESSAGE_FLAG_NONE)); 267 268 // Close |d1|. 269 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 270 271 // Try waiting for readable on |d0|; should fail (already satisfied). 272 w.Init(); 273 hss = HandleSignalsState(); 274 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 275 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss)); 276 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 277 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); 278 279 // Read from |d0|. 280 buffer[0] = 0; 281 buffer_size = kBufferSize; 282 EXPECT_EQ(MOJO_RESULT_OK, 283 d0->ReadMessage(UserPointer<void>(buffer), 284 MakeUserPointer(&buffer_size), 285 0, 286 nullptr, 287 MOJO_READ_MESSAGE_FLAG_NONE)); 288 EXPECT_EQ(kBufferSize, buffer_size); 289 EXPECT_EQ(123456789, buffer[0]); 290 291 // Try waiting for readable on |d0|; should fail (already satisfied). 292 w.Init(); 293 hss = HandleSignalsState(); 294 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, 295 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); 296 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); 297 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); 298 299 // Read again from |d0|. 300 buffer[0] = 0; 301 buffer_size = kBufferSize; 302 EXPECT_EQ(MOJO_RESULT_OK, 303 d0->ReadMessage(UserPointer<void>(buffer), 304 MakeUserPointer(&buffer_size), 305 0, 306 nullptr, 307 MOJO_READ_MESSAGE_FLAG_NONE)); 308 EXPECT_EQ(kBufferSize, buffer_size); 309 EXPECT_EQ(234567890, buffer[0]); 310 311 // Try waiting for readable on |d0|; should fail (unsatisfiable). 312 w.Init(); 313 hss = HandleSignalsState(); 314 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 315 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss)); 316 EXPECT_EQ(0u, hss.satisfied_signals); 317 EXPECT_EQ(0u, hss.satisfiable_signals); 318 319 // Try waiting for writable on |d0|; should fail (unsatisfiable). 320 w.Init(); 321 hss = HandleSignalsState(); 322 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 323 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss)); 324 EXPECT_EQ(0u, hss.satisfied_signals); 325 EXPECT_EQ(0u, hss.satisfiable_signals); 326 327 // Try reading from |d0|; should fail (nothing to read and other end 328 // closed). 329 buffer[0] = 0; 330 buffer_size = kBufferSize; 331 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 332 d0->ReadMessage(UserPointer<void>(buffer), 333 MakeUserPointer(&buffer_size), 334 0, 335 nullptr, 336 MOJO_READ_MESSAGE_FLAG_NONE)); 337 338 // Try writing to |d0|; should fail (other end closed). 339 buffer[0] = 345678901; 340 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, 341 d0->WriteMessage(UserPointer<const void>(buffer), 342 kBufferSize, 343 nullptr, 344 MOJO_WRITE_MESSAGE_FLAG_NONE)); 345 346 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 347 } 348} 349 350#if defined(OS_WIN) 351// http://crbug.com/396386 352#define MAYBE_BasicThreaded DISABLED_BasicThreaded 353#else 354#define MAYBE_BasicThreaded BasicThreaded 355#endif 356TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) { 357 test::Stopwatch stopwatch; 358 int32_t buffer[1]; 359 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); 360 uint32_t buffer_size; 361 base::TimeDelta elapsed; 362 bool did_wait; 363 MojoResult result; 364 uint32_t context; 365 HandleSignalsState hss; 366 367 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. 368 for (unsigned i = 0; i < 2; i++) { 369 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 370 MessagePipeDispatcher::kDefaultCreateOptions)); 371 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 372 MessagePipeDispatcher::kDefaultCreateOptions)); 373 { 374 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 375 d0->Init(mp, i); // 0, 1. 376 d1->Init(mp, i ^ 1); // 1, 0. 377 } 378 379 // Wait for readable on |d1|, which will become readable after some time. 380 { 381 test::WaiterThread thread(d1, 382 MOJO_HANDLE_SIGNAL_READABLE, 383 MOJO_DEADLINE_INDEFINITE, 384 1, 385 &did_wait, 386 &result, 387 &context, 388 &hss); 389 stopwatch.Start(); 390 thread.Start(); 391 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 392 // Wake it up by writing to |d0|. 393 buffer[0] = 123456789; 394 EXPECT_EQ(MOJO_RESULT_OK, 395 d0->WriteMessage(UserPointer<const void>(buffer), 396 kBufferSize, 397 nullptr, 398 MOJO_WRITE_MESSAGE_FLAG_NONE)); 399 } // Joins the thread. 400 elapsed = stopwatch.Elapsed(); 401 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); 402 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); 403 EXPECT_TRUE(did_wait); 404 EXPECT_EQ(MOJO_RESULT_OK, result); 405 EXPECT_EQ(1u, context); 406 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 407 hss.satisfied_signals); 408 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 409 hss.satisfiable_signals); 410 411 // Now |d1| is already readable. Try waiting for it again. 412 { 413 test::WaiterThread thread(d1, 414 MOJO_HANDLE_SIGNAL_READABLE, 415 MOJO_DEADLINE_INDEFINITE, 416 2, 417 &did_wait, 418 &result, 419 &context, 420 &hss); 421 stopwatch.Start(); 422 thread.Start(); 423 } // Joins the thread. 424 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); 425 EXPECT_FALSE(did_wait); 426 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); 427 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 428 hss.satisfied_signals); 429 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, 430 hss.satisfiable_signals); 431 432 // Consume what we wrote to |d0|. 433 buffer[0] = 0; 434 buffer_size = kBufferSize; 435 EXPECT_EQ(MOJO_RESULT_OK, 436 d1->ReadMessage(UserPointer<void>(buffer), 437 MakeUserPointer(&buffer_size), 438 0, 439 nullptr, 440 MOJO_READ_MESSAGE_FLAG_NONE)); 441 EXPECT_EQ(kBufferSize, buffer_size); 442 EXPECT_EQ(123456789, buffer[0]); 443 444 // Wait for readable on |d1| and close |d0| after some time, which should 445 // cancel that wait. 446 { 447 test::WaiterThread thread(d1, 448 MOJO_HANDLE_SIGNAL_READABLE, 449 MOJO_DEADLINE_INDEFINITE, 450 3, 451 &did_wait, 452 &result, 453 &context, 454 &hss); 455 stopwatch.Start(); 456 thread.Start(); 457 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 458 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 459 } // Joins the thread. 460 elapsed = stopwatch.Elapsed(); 461 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); 462 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); 463 EXPECT_TRUE(did_wait); 464 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); 465 EXPECT_EQ(3u, context); 466 EXPECT_EQ(0u, hss.satisfied_signals); 467 EXPECT_EQ(0u, hss.satisfiable_signals); 468 469 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 470 } 471 472 for (unsigned i = 0; i < 2; i++) { 473 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( 474 MessagePipeDispatcher::kDefaultCreateOptions)); 475 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( 476 MessagePipeDispatcher::kDefaultCreateOptions)); 477 { 478 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 479 d0->Init(mp, i); // 0, 1. 480 d1->Init(mp, i ^ 1); // 1, 0. 481 } 482 483 // Wait for readable on |d1| and close |d1| after some time, which should 484 // cancel that wait. 485 { 486 test::WaiterThread thread(d1, 487 MOJO_HANDLE_SIGNAL_READABLE, 488 MOJO_DEADLINE_INDEFINITE, 489 4, 490 &did_wait, 491 &result, 492 &context, 493 &hss); 494 stopwatch.Start(); 495 thread.Start(); 496 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); 497 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); 498 } // Joins the thread. 499 elapsed = stopwatch.Elapsed(); 500 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); 501 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); 502 EXPECT_TRUE(did_wait); 503 EXPECT_EQ(MOJO_RESULT_CANCELLED, result); 504 EXPECT_EQ(4u, context); 505 EXPECT_EQ(0u, hss.satisfied_signals); 506 EXPECT_EQ(0u, hss.satisfiable_signals); 507 508 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); 509 } 510} 511 512// Stress test ----------------------------------------------------------------- 513 514const size_t kMaxMessageSize = 2000; 515 516class WriterThread : public base::SimpleThread { 517 public: 518 // |*messages_written| and |*bytes_written| belong to the thread while it's 519 // alive. 520 WriterThread(scoped_refptr<Dispatcher> write_dispatcher, 521 size_t* messages_written, 522 size_t* bytes_written) 523 : base::SimpleThread("writer_thread"), 524 write_dispatcher_(write_dispatcher), 525 messages_written_(messages_written), 526 bytes_written_(bytes_written) { 527 *messages_written_ = 0; 528 *bytes_written_ = 0; 529 } 530 531 virtual ~WriterThread() { Join(); } 532 533 private: 534 virtual void Run() OVERRIDE { 535 // Make some data to write. 536 unsigned char buffer[kMaxMessageSize]; 537 for (size_t i = 0; i < kMaxMessageSize; i++) 538 buffer[i] = static_cast<unsigned char>(i); 539 540 // Number of messages to write. 541 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); 542 543 // Write messages. 544 for (size_t i = 0; i < *messages_written_; i++) { 545 uint32_t bytes_to_write = static_cast<uint32_t>( 546 base::RandInt(1, static_cast<int>(kMaxMessageSize))); 547 EXPECT_EQ(MOJO_RESULT_OK, 548 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer), 549 bytes_to_write, 550 nullptr, 551 MOJO_WRITE_MESSAGE_FLAG_NONE)); 552 *bytes_written_ += bytes_to_write; 553 } 554 555 // Write one last "quit" message. 556 EXPECT_EQ(MOJO_RESULT_OK, 557 write_dispatcher_->WriteMessage(UserPointer<const void>("quit"), 558 4, 559 nullptr, 560 MOJO_WRITE_MESSAGE_FLAG_NONE)); 561 } 562 563 const scoped_refptr<Dispatcher> write_dispatcher_; 564 size_t* const messages_written_; 565 size_t* const bytes_written_; 566 567 DISALLOW_COPY_AND_ASSIGN(WriterThread); 568}; 569 570class ReaderThread : public base::SimpleThread { 571 public: 572 // |*messages_read| and |*bytes_read| belong to the thread while it's alive. 573 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, 574 size_t* messages_read, 575 size_t* bytes_read) 576 : base::SimpleThread("reader_thread"), 577 read_dispatcher_(read_dispatcher), 578 messages_read_(messages_read), 579 bytes_read_(bytes_read) { 580 *messages_read_ = 0; 581 *bytes_read_ = 0; 582 } 583 584 virtual ~ReaderThread() { Join(); } 585 586 private: 587 virtual void Run() OVERRIDE { 588 unsigned char buffer[kMaxMessageSize]; 589 Waiter w; 590 HandleSignalsState hss; 591 MojoResult result; 592 593 // Read messages. 594 for (;;) { 595 // Wait for it to be readable. 596 w.Init(); 597 hss = HandleSignalsState(); 598 result = 599 read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss); 600 EXPECT_TRUE(result == MOJO_RESULT_OK || 601 result == MOJO_RESULT_ALREADY_EXISTS) 602 << "result: " << result; 603 if (result == MOJO_RESULT_OK) { 604 // Actually need to wait. 605 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); 606 read_dispatcher_->RemoveWaiter(&w, &hss); 607 } 608 // We may not actually be readable, since we're racing with other threads. 609 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); 610 611 // Now, try to do the read. 612 // Clear the buffer so that we can check the result. 613 memset(buffer, 0, sizeof(buffer)); 614 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); 615 result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer), 616 MakeUserPointer(&buffer_size), 617 0, 618 nullptr, 619 MOJO_READ_MESSAGE_FLAG_NONE); 620 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT) 621 << "result: " << result; 622 // We're racing with others to read, so maybe we failed. 623 if (result == MOJO_RESULT_SHOULD_WAIT) 624 continue; // In which case, try again. 625 // Check for quit. 626 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) 627 return; 628 EXPECT_GE(buffer_size, 1u); 629 EXPECT_LE(buffer_size, kMaxMessageSize); 630 EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); 631 632 (*messages_read_)++; 633 *bytes_read_ += buffer_size; 634 } 635 } 636 637 static bool IsValidMessage(const unsigned char* buffer, 638 uint32_t message_size) { 639 size_t i; 640 for (i = 0; i < message_size; i++) { 641 if (buffer[i] != static_cast<unsigned char>(i)) 642 return false; 643 } 644 // Check that the remaining bytes weren't stomped on. 645 for (; i < kMaxMessageSize; i++) { 646 if (buffer[i] != 0) 647 return false; 648 } 649 return true; 650 } 651 652 const scoped_refptr<Dispatcher> read_dispatcher_; 653 size_t* const messages_read_; 654 size_t* const bytes_read_; 655 656 DISALLOW_COPY_AND_ASSIGN(ReaderThread); 657}; 658 659TEST(MessagePipeDispatcherTest, Stress) { 660 static const size_t kNumWriters = 30; 661 static const size_t kNumReaders = kNumWriters; 662 663 scoped_refptr<MessagePipeDispatcher> d_write( 664 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 665 scoped_refptr<MessagePipeDispatcher> d_read( 666 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); 667 { 668 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); 669 d_write->Init(mp, 0); 670 d_read->Init(mp, 1); 671 } 672 673 size_t messages_written[kNumWriters]; 674 size_t bytes_written[kNumWriters]; 675 size_t messages_read[kNumReaders]; 676 size_t bytes_read[kNumReaders]; 677 { 678 // Make writers. 679 ScopedVector<WriterThread> writers; 680 for (size_t i = 0; i < kNumWriters; i++) { 681 writers.push_back( 682 new WriterThread(d_write, &messages_written[i], &bytes_written[i])); 683 } 684 685 // Make readers. 686 ScopedVector<ReaderThread> readers; 687 for (size_t i = 0; i < kNumReaders; i++) { 688 readers.push_back( 689 new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); 690 } 691 692 // Start writers. 693 for (size_t i = 0; i < kNumWriters; i++) 694 writers[i]->Start(); 695 696 // Start readers. 697 for (size_t i = 0; i < kNumReaders; i++) 698 readers[i]->Start(); 699 700 // TODO(vtl): Maybe I should have an event that triggers all the threads to 701 // start doing stuff for real (so that the first ones created/started aren't 702 // advantaged). 703 } // Joins all the threads. 704 705 size_t total_messages_written = 0; 706 size_t total_bytes_written = 0; 707 for (size_t i = 0; i < kNumWriters; i++) { 708 total_messages_written += messages_written[i]; 709 total_bytes_written += bytes_written[i]; 710 } 711 size_t total_messages_read = 0; 712 size_t total_bytes_read = 0; 713 for (size_t i = 0; i < kNumReaders; i++) { 714 total_messages_read += messages_read[i]; 715 total_bytes_read += bytes_read[i]; 716 // We'd have to be really unlucky to have read no messages on a thread. 717 EXPECT_GT(messages_read[i], 0u) << "reader: " << i; 718 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; 719 } 720 EXPECT_EQ(total_messages_written, total_messages_read); 721 EXPECT_EQ(total_bytes_written, total_bytes_read); 722 723 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); 724 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); 725} 726 727} // namespace 728} // namespace system 729} // namespace mojo 730