ipc_sync_channel_unittest.cc revision 5821806d5e7f356e8fa4b058a389a808ea183019
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4// 5// Unit test for SyncChannel. 6 7#include "ipc/ipc_sync_channel.h" 8 9#include <string> 10#include <vector> 11 12#include "base/basictypes.h" 13#include "base/bind.h" 14#include "base/logging.h" 15#include "base/memory/scoped_ptr.h" 16#include "base/message_loop.h" 17#include "base/process_util.h" 18#include "base/stl_util.h" 19#include "base/string_util.h" 20#include "base/third_party/dynamic_annotations/dynamic_annotations.h" 21#include "base/threading/platform_thread.h" 22#include "base/threading/thread.h" 23#include "base/synchronization/waitable_event.h" 24#include "ipc/ipc_listener.h" 25#include "ipc/ipc_message.h" 26#include "ipc/ipc_sender.h" 27#include "ipc/ipc_sync_message_filter.h" 28#include "ipc/ipc_sync_message_unittest.h" 29#include "testing/gtest/include/gtest/gtest.h" 30 31using base::WaitableEvent; 32 33namespace IPC { 34 35namespace { 36 37// Base class for a "process" with listener and IPC threads. 38class Worker : public Listener, public Sender { 39 public: 40 // Will create a channel without a name. 41 Worker(Channel::Mode mode, const std::string& thread_name) 42 : done_(new WaitableEvent(false, false)), 43 channel_created_(new WaitableEvent(false, false)), 44 mode_(mode), 45 ipc_thread_((thread_name + "_ipc").c_str()), 46 listener_thread_((thread_name + "_listener").c_str()), 47 overrided_thread_(NULL), 48 shutdown_event_(true, false) { 49 // The data race on vfptr is real but is very hard 50 // to suppress using standard Valgrind mechanism (suppressions). 51 // We have to use ANNOTATE_BENIGN_RACE to hide the reports and 52 // make ThreadSanitizer bots green. 53 ANNOTATE_BENIGN_RACE(this, "Race on vfptr, http://crbug.com/25841"); 54 } 55 56 // Will create a named channel and use this name for the threads' name. 57 Worker(const std::string& channel_name, Channel::Mode mode) 58 : done_(new WaitableEvent(false, false)), 59 channel_created_(new WaitableEvent(false, false)), 60 channel_name_(channel_name), 61 mode_(mode), 62 ipc_thread_((channel_name + "_ipc").c_str()), 63 listener_thread_((channel_name + "_listener").c_str()), 64 overrided_thread_(NULL), 65 shutdown_event_(true, false) { 66 // The data race on vfptr is real but is very hard 67 // to suppress using standard Valgrind mechanism (suppressions). 68 // We have to use ANNOTATE_BENIGN_RACE to hide the reports and 69 // make ThreadSanitizer bots green. 70 ANNOTATE_BENIGN_RACE(this, "Race on vfptr, http://crbug.com/25841"); 71 } 72 73 // The IPC thread needs to outlive SyncChannel, so force the correct order of 74 // destruction. 75 virtual ~Worker() { 76 WaitableEvent listener_done(false, false), ipc_done(false, false); 77 ListenerThread()->message_loop()->PostTask( 78 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this, 79 &listener_done, &ipc_done)); 80 listener_done.Wait(); 81 ipc_done.Wait(); 82 ipc_thread_.Stop(); 83 listener_thread_.Stop(); 84 } 85 void AddRef() { } 86 void Release() { } 87 bool Send(Message* msg) { return channel_->Send(msg); } 88 bool SendWithTimeout(Message* msg, int timeout_ms) { 89 return channel_->SendWithTimeout(msg, timeout_ms); 90 } 91 void WaitForChannelCreation() { channel_created_->Wait(); } 92 void CloseChannel() { 93 DCHECK(MessageLoop::current() == ListenerThread()->message_loop()); 94 channel_->Close(); 95 } 96 void Start() { 97 StartThread(&listener_thread_, MessageLoop::TYPE_DEFAULT); 98 ListenerThread()->message_loop()->PostTask( 99 FROM_HERE, base::Bind(&Worker::OnStart, this)); 100 } 101 void OverrideThread(base::Thread* overrided_thread) { 102 DCHECK(overrided_thread_ == NULL); 103 overrided_thread_ = overrided_thread; 104 } 105 bool SendAnswerToLife(bool pump, int timeout, bool succeed) { 106 int answer = 0; 107 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 108 if (pump) 109 msg->EnableMessagePumping(); 110 bool result = SendWithTimeout(msg, timeout); 111 DCHECK_EQ(result, succeed); 112 DCHECK_EQ(answer, (succeed ? 42 : 0)); 113 return result; 114 } 115 bool SendDouble(bool pump, bool succeed) { 116 int answer = 0; 117 SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); 118 if (pump) 119 msg->EnableMessagePumping(); 120 bool result = Send(msg); 121 DCHECK_EQ(result, succeed); 122 DCHECK_EQ(answer, (succeed ? 10 : 0)); 123 return result; 124 } 125 const std::string& channel_name() { return channel_name_; } 126 Channel::Mode mode() { return mode_; } 127 WaitableEvent* done_event() { return done_.get(); } 128 WaitableEvent* shutdown_event() { return &shutdown_event_; } 129 void ResetChannel() { channel_.reset(); } 130 // Derived classes need to call this when they've completed their part of 131 // the test. 132 void Done() { done_->Signal(); } 133 134 protected: 135 SyncChannel* channel() { return channel_.get(); } 136 // Functions for dervied classes to implement if they wish. 137 virtual void Run() { } 138 virtual void OnAnswer(int* answer) { NOTREACHED(); } 139 virtual void OnAnswerDelay(Message* reply_msg) { 140 // The message handler map below can only take one entry for 141 // SyncChannelTestMsg_AnswerToLife, so since some classes want 142 // the normal version while other want the delayed reply, we 143 // call the normal version if the derived class didn't override 144 // this function. 145 int answer; 146 OnAnswer(&answer); 147 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer); 148 Send(reply_msg); 149 } 150 virtual void OnDouble(int in, int* out) { NOTREACHED(); } 151 virtual void OnDoubleDelay(int in, Message* reply_msg) { 152 int result; 153 OnDouble(in, &result); 154 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result); 155 Send(reply_msg); 156 } 157 158 virtual void OnNestedTestMsg(Message* reply_msg) { 159 NOTREACHED(); 160 } 161 162 virtual SyncChannel* CreateChannel() { 163 return new SyncChannel( 164 channel_name_, mode_, this, ipc_thread_.message_loop_proxy(), true, 165 &shutdown_event_); 166 } 167 168 base::Thread* ListenerThread() { 169 return overrided_thread_ ? overrided_thread_ : &listener_thread_; 170 } 171 172 const base::Thread& ipc_thread() const { return ipc_thread_; } 173 174 private: 175 // Called on the listener thread to create the sync channel. 176 void OnStart() { 177 // Link ipc_thread_, listener_thread_ and channel_ altogether. 178 StartThread(&ipc_thread_, MessageLoop::TYPE_IO); 179 channel_.reset(CreateChannel()); 180 channel_created_->Signal(); 181 Run(); 182 } 183 184 void OnListenerThreadShutdown1(WaitableEvent* listener_event, 185 WaitableEvent* ipc_event) { 186 // SyncChannel needs to be destructed on the thread that it was created on. 187 channel_.reset(); 188 189 MessageLoop::current()->RunAllPending(); 190 191 ipc_thread_.message_loop()->PostTask( 192 FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this, 193 listener_event, ipc_event)); 194 } 195 196 void OnIPCThreadShutdown(WaitableEvent* listener_event, 197 WaitableEvent* ipc_event) { 198 MessageLoop::current()->RunAllPending(); 199 ipc_event->Signal(); 200 201 listener_thread_.message_loop()->PostTask( 202 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this, 203 listener_event)); 204 } 205 206 void OnListenerThreadShutdown2(WaitableEvent* listener_event) { 207 MessageLoop::current()->RunAllPending(); 208 listener_event->Signal(); 209 } 210 211 bool OnMessageReceived(const Message& message) { 212 IPC_BEGIN_MESSAGE_MAP(Worker, message) 213 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay) 214 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife, 215 OnAnswerDelay) 216 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String, 217 OnNestedTestMsg) 218 IPC_END_MESSAGE_MAP() 219 return true; 220 } 221 222 void StartThread(base::Thread* thread, MessageLoop::Type type) { 223 base::Thread::Options options; 224 options.message_loop_type = type; 225 thread->StartWithOptions(options); 226 } 227 228 scoped_ptr<WaitableEvent> done_; 229 scoped_ptr<WaitableEvent> channel_created_; 230 std::string channel_name_; 231 Channel::Mode mode_; 232 scoped_ptr<SyncChannel> channel_; 233 base::Thread ipc_thread_; 234 base::Thread listener_thread_; 235 base::Thread* overrided_thread_; 236 237 base::WaitableEvent shutdown_event_; 238 239 DISALLOW_COPY_AND_ASSIGN(Worker); 240}; 241 242 243// Starts the test with the given workers. This function deletes the workers 244// when it's done. 245void RunTest(std::vector<Worker*> workers) { 246 // First we create the workers that are channel servers, or else the other 247 // workers' channel initialization might fail because the pipe isn't created.. 248 for (size_t i = 0; i < workers.size(); ++i) { 249 if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) { 250 workers[i]->Start(); 251 workers[i]->WaitForChannelCreation(); 252 } 253 } 254 255 // now create the clients 256 for (size_t i = 0; i < workers.size(); ++i) { 257 if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG) 258 workers[i]->Start(); 259 } 260 261 // wait for all the workers to finish 262 for (size_t i = 0; i < workers.size(); ++i) 263 workers[i]->done_event()->Wait(); 264 265 STLDeleteContainerPointers(workers.begin(), workers.end()); 266} 267 268} // namespace 269 270class IPCSyncChannelTest : public testing::Test { 271 private: 272 MessageLoop message_loop_; 273}; 274 275//----------------------------------------------------------------------------- 276 277namespace { 278 279class SimpleServer : public Worker { 280 public: 281 explicit SimpleServer(bool pump_during_send) 282 : Worker(Channel::MODE_SERVER, "simpler_server"), 283 pump_during_send_(pump_during_send) { } 284 void Run() { 285 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true); 286 Done(); 287 } 288 289 bool pump_during_send_; 290}; 291 292class SimpleClient : public Worker { 293 public: 294 SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { } 295 296 void OnAnswer(int* answer) { 297 *answer = 42; 298 Done(); 299 } 300}; 301 302void Simple(bool pump_during_send) { 303 std::vector<Worker*> workers; 304 workers.push_back(new SimpleServer(pump_during_send)); 305 workers.push_back(new SimpleClient()); 306 RunTest(workers); 307} 308 309} // namespace 310 311// Tests basic synchronous call 312TEST_F(IPCSyncChannelTest, Simple) { 313 Simple(false); 314 Simple(true); 315} 316 317//----------------------------------------------------------------------------- 318 319namespace { 320 321// Worker classes which override how the sync channel is created to use the 322// two-step initialization (calling the lightweight constructor and then 323// ChannelProxy::Init separately) process. 324class TwoStepServer : public Worker { 325 public: 326 explicit TwoStepServer(bool create_pipe_now) 327 : Worker(Channel::MODE_SERVER, "simpler_server"), 328 create_pipe_now_(create_pipe_now) { } 329 330 void Run() { 331 SendAnswerToLife(false, base::kNoTimeout, true); 332 Done(); 333 } 334 335 virtual SyncChannel* CreateChannel() { 336 SyncChannel* channel = new SyncChannel( 337 this, ipc_thread().message_loop_proxy(), shutdown_event()); 338 channel->Init(channel_name(), mode(), create_pipe_now_); 339 return channel; 340 } 341 342 bool create_pipe_now_; 343}; 344 345class TwoStepClient : public Worker { 346 public: 347 TwoStepClient(bool create_pipe_now) 348 : Worker(Channel::MODE_CLIENT, "simple_client"), 349 create_pipe_now_(create_pipe_now) { } 350 351 void OnAnswer(int* answer) { 352 *answer = 42; 353 Done(); 354 } 355 356 virtual SyncChannel* CreateChannel() { 357 SyncChannel* channel = new SyncChannel( 358 this, ipc_thread().message_loop_proxy(), shutdown_event()); 359 channel->Init(channel_name(), mode(), create_pipe_now_); 360 return channel; 361 } 362 363 bool create_pipe_now_; 364}; 365 366void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) { 367 std::vector<Worker*> workers; 368 workers.push_back(new TwoStepServer(create_server_pipe_now)); 369 workers.push_back(new TwoStepClient(create_client_pipe_now)); 370 RunTest(workers); 371} 372 373} // namespace 374 375// Tests basic two-step initialization, where you call the lightweight 376// constructor then Init. 377TEST_F(IPCSyncChannelTest, TwoStepInitialization) { 378 TwoStep(false, false); 379 TwoStep(false, true); 380 TwoStep(true, false); 381 TwoStep(true, true); 382} 383 384 385//----------------------------------------------------------------------------- 386 387namespace { 388 389class DelayClient : public Worker { 390 public: 391 DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { } 392 393 void OnAnswerDelay(Message* reply_msg) { 394 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 395 Send(reply_msg); 396 Done(); 397 } 398}; 399 400void DelayReply(bool pump_during_send) { 401 std::vector<Worker*> workers; 402 workers.push_back(new SimpleServer(pump_during_send)); 403 workers.push_back(new DelayClient()); 404 RunTest(workers); 405} 406 407} // namespace 408 409// Tests that asynchronous replies work 410TEST_F(IPCSyncChannelTest, DelayReply) { 411 DelayReply(false); 412 DelayReply(true); 413} 414 415//----------------------------------------------------------------------------- 416 417namespace { 418 419class NoHangServer : public Worker { 420 public: 421 NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send) 422 : Worker(Channel::MODE_SERVER, "no_hang_server"), 423 got_first_reply_(got_first_reply), 424 pump_during_send_(pump_during_send) { } 425 void Run() { 426 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true); 427 got_first_reply_->Signal(); 428 429 SendAnswerToLife(pump_during_send_, base::kNoTimeout, false); 430 Done(); 431 } 432 433 WaitableEvent* got_first_reply_; 434 bool pump_during_send_; 435}; 436 437class NoHangClient : public Worker { 438 public: 439 explicit NoHangClient(WaitableEvent* got_first_reply) 440 : Worker(Channel::MODE_CLIENT, "no_hang_client"), 441 got_first_reply_(got_first_reply) { } 442 443 virtual void OnAnswerDelay(Message* reply_msg) { 444 // Use the DELAY_REPLY macro so that we can force the reply to be sent 445 // before this function returns (when the channel will be reset). 446 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 447 Send(reply_msg); 448 got_first_reply_->Wait(); 449 CloseChannel(); 450 Done(); 451 } 452 453 WaitableEvent* got_first_reply_; 454}; 455 456void NoHang(bool pump_during_send) { 457 WaitableEvent got_first_reply(false, false); 458 std::vector<Worker*> workers; 459 workers.push_back(new NoHangServer(&got_first_reply, pump_during_send)); 460 workers.push_back(new NoHangClient(&got_first_reply)); 461 RunTest(workers); 462} 463 464} // namespace 465 466// Tests that caller doesn't hang if receiver dies 467TEST_F(IPCSyncChannelTest, NoHang) { 468 NoHang(false); 469 NoHang(true); 470} 471 472//----------------------------------------------------------------------------- 473 474namespace { 475 476class UnblockServer : public Worker { 477 public: 478 UnblockServer(bool pump_during_send, bool delete_during_send) 479 : Worker(Channel::MODE_SERVER, "unblock_server"), 480 pump_during_send_(pump_during_send), 481 delete_during_send_(delete_during_send) { } 482 void Run() { 483 if (delete_during_send_) { 484 // Use custom code since race conditions mean the answer may or may not be 485 // available. 486 int answer = 0; 487 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 488 if (pump_during_send_) 489 msg->EnableMessagePumping(); 490 Send(msg); 491 } else { 492 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true); 493 } 494 Done(); 495 } 496 497 void OnDoubleDelay(int in, Message* reply_msg) { 498 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 499 Send(reply_msg); 500 if (delete_during_send_) 501 ResetChannel(); 502 } 503 504 bool pump_during_send_; 505 bool delete_during_send_; 506}; 507 508class UnblockClient : public Worker { 509 public: 510 explicit UnblockClient(bool pump_during_send) 511 : Worker(Channel::MODE_CLIENT, "unblock_client"), 512 pump_during_send_(pump_during_send) { } 513 514 void OnAnswer(int* answer) { 515 SendDouble(pump_during_send_, true); 516 *answer = 42; 517 Done(); 518 } 519 520 bool pump_during_send_; 521}; 522 523void Unblock(bool server_pump, bool client_pump, bool delete_during_send) { 524 std::vector<Worker*> workers; 525 workers.push_back(new UnblockServer(server_pump, delete_during_send)); 526 workers.push_back(new UnblockClient(client_pump)); 527 RunTest(workers); 528} 529 530} // namespace 531 532// Tests that the caller unblocks to answer a sync message from the receiver. 533TEST_F(IPCSyncChannelTest, Unblock) { 534 Unblock(false, false, false); 535 Unblock(false, true, false); 536 Unblock(true, false, false); 537 Unblock(true, true, false); 538} 539 540//----------------------------------------------------------------------------- 541 542// Tests that the the SyncChannel object can be deleted during a Send. 543TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) { 544 Unblock(false, false, true); 545 Unblock(false, true, true); 546 Unblock(true, false, true); 547 Unblock(true, true, true); 548} 549 550//----------------------------------------------------------------------------- 551 552namespace { 553 554class RecursiveServer : public Worker { 555 public: 556 RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second) 557 : Worker(Channel::MODE_SERVER, "recursive_server"), 558 expected_send_result_(expected_send_result), 559 pump_first_(pump_first), pump_second_(pump_second) {} 560 void Run() { 561 SendDouble(pump_first_, expected_send_result_); 562 Done(); 563 } 564 565 void OnDouble(int in, int* out) { 566 *out = in * 2; 567 SendAnswerToLife(pump_second_, base::kNoTimeout, expected_send_result_); 568 } 569 570 bool expected_send_result_, pump_first_, pump_second_; 571}; 572 573class RecursiveClient : public Worker { 574 public: 575 RecursiveClient(bool pump_during_send, bool close_channel) 576 : Worker(Channel::MODE_CLIENT, "recursive_client"), 577 pump_during_send_(pump_during_send), close_channel_(close_channel) {} 578 579 void OnDoubleDelay(int in, Message* reply_msg) { 580 SendDouble(pump_during_send_, !close_channel_); 581 if (close_channel_) { 582 delete reply_msg; 583 } else { 584 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 585 Send(reply_msg); 586 } 587 Done(); 588 } 589 590 void OnAnswerDelay(Message* reply_msg) { 591 if (close_channel_) { 592 delete reply_msg; 593 CloseChannel(); 594 } else { 595 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 596 Send(reply_msg); 597 } 598 } 599 600 bool pump_during_send_, close_channel_; 601}; 602 603void Recursive( 604 bool server_pump_first, bool server_pump_second, bool client_pump) { 605 std::vector<Worker*> workers; 606 workers.push_back( 607 new RecursiveServer(true, server_pump_first, server_pump_second)); 608 workers.push_back(new RecursiveClient(client_pump, false)); 609 RunTest(workers); 610} 611 612} // namespace 613 614// Tests a server calling Send while another Send is pending. 615TEST_F(IPCSyncChannelTest, Recursive) { 616 Recursive(false, false, false); 617 Recursive(false, false, true); 618 Recursive(false, true, false); 619 Recursive(false, true, true); 620 Recursive(true, false, false); 621 Recursive(true, false, true); 622 Recursive(true, true, false); 623 Recursive(true, true, true); 624} 625 626//----------------------------------------------------------------------------- 627 628namespace { 629 630void RecursiveNoHang( 631 bool server_pump_first, bool server_pump_second, bool client_pump) { 632 std::vector<Worker*> workers; 633 workers.push_back( 634 new RecursiveServer(false, server_pump_first, server_pump_second)); 635 workers.push_back(new RecursiveClient(client_pump, true)); 636 RunTest(workers); 637} 638 639} // namespace 640 641// Tests that if a caller makes a sync call during an existing sync call and 642// the receiver dies, neither of the Send() calls hang. 643TEST_F(IPCSyncChannelTest, RecursiveNoHang) { 644 RecursiveNoHang(false, false, false); 645 RecursiveNoHang(false, false, true); 646 RecursiveNoHang(false, true, false); 647 RecursiveNoHang(false, true, true); 648 RecursiveNoHang(true, false, false); 649 RecursiveNoHang(true, false, true); 650 RecursiveNoHang(true, true, false); 651 RecursiveNoHang(true, true, true); 652} 653 654//----------------------------------------------------------------------------- 655 656namespace { 657 658class MultipleServer1 : public Worker { 659 public: 660 explicit MultipleServer1(bool pump_during_send) 661 : Worker("test_channel1", Channel::MODE_SERVER), 662 pump_during_send_(pump_during_send) { } 663 664 void Run() { 665 SendDouble(pump_during_send_, true); 666 Done(); 667 } 668 669 bool pump_during_send_; 670}; 671 672class MultipleClient1 : public Worker { 673 public: 674 MultipleClient1(WaitableEvent* client1_msg_received, 675 WaitableEvent* client1_can_reply) : 676 Worker("test_channel1", Channel::MODE_CLIENT), 677 client1_msg_received_(client1_msg_received), 678 client1_can_reply_(client1_can_reply) { } 679 680 void OnDouble(int in, int* out) { 681 client1_msg_received_->Signal(); 682 *out = in * 2; 683 client1_can_reply_->Wait(); 684 Done(); 685 } 686 687 private: 688 WaitableEvent *client1_msg_received_, *client1_can_reply_; 689}; 690 691class MultipleServer2 : public Worker { 692 public: 693 MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { } 694 695 void OnAnswer(int* result) { 696 *result = 42; 697 Done(); 698 } 699}; 700 701class MultipleClient2 : public Worker { 702 public: 703 MultipleClient2( 704 WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply, 705 bool pump_during_send) 706 : Worker("test_channel2", Channel::MODE_CLIENT), 707 client1_msg_received_(client1_msg_received), 708 client1_can_reply_(client1_can_reply), 709 pump_during_send_(pump_during_send) { } 710 711 void Run() { 712 client1_msg_received_->Wait(); 713 SendAnswerToLife(pump_during_send_, base::kNoTimeout, true); 714 client1_can_reply_->Signal(); 715 Done(); 716 } 717 718 private: 719 WaitableEvent *client1_msg_received_, *client1_can_reply_; 720 bool pump_during_send_; 721}; 722 723void Multiple(bool server_pump, bool client_pump) { 724 std::vector<Worker*> workers; 725 726 // A shared worker thread so that server1 and server2 run on one thread. 727 base::Thread worker_thread("Multiple"); 728 ASSERT_TRUE(worker_thread.Start()); 729 730 // Server1 sends a sync msg to client1, which blocks the reply until 731 // server2 (which runs on the same worker thread as server1) responds 732 // to a sync msg from client2. 733 WaitableEvent client1_msg_received(false, false); 734 WaitableEvent client1_can_reply(false, false); 735 736 Worker* worker; 737 738 worker = new MultipleServer2(); 739 worker->OverrideThread(&worker_thread); 740 workers.push_back(worker); 741 742 worker = new MultipleClient2( 743 &client1_msg_received, &client1_can_reply, client_pump); 744 workers.push_back(worker); 745 746 worker = new MultipleServer1(server_pump); 747 worker->OverrideThread(&worker_thread); 748 workers.push_back(worker); 749 750 worker = new MultipleClient1( 751 &client1_msg_received, &client1_can_reply); 752 workers.push_back(worker); 753 754 RunTest(workers); 755} 756 757} // namespace 758 759// Tests that multiple SyncObjects on the same listener thread can unblock each 760// other. 761TEST_F(IPCSyncChannelTest, Multiple) { 762 Multiple(false, false); 763 Multiple(false, true); 764 Multiple(true, false); 765 Multiple(true, true); 766} 767 768//----------------------------------------------------------------------------- 769 770namespace { 771 772// This class provides server side functionality to test the case where 773// multiple sync channels are in use on the same thread on the client and 774// nested calls are issued. 775class QueuedReplyServer : public Worker { 776 public: 777 QueuedReplyServer(base::Thread* listener_thread, 778 const std::string& channel_name, 779 const std::string& reply_text) 780 : Worker(channel_name, Channel::MODE_SERVER), 781 reply_text_(reply_text) { 782 Worker::OverrideThread(listener_thread); 783 } 784 785 virtual void OnNestedTestMsg(Message* reply_msg) { 786 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 787 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 788 Send(reply_msg); 789 Done(); 790 } 791 792 private: 793 std::string reply_text_; 794}; 795 796// The QueuedReplyClient class provides functionality to test the case where 797// multiple sync channels are in use on the same thread and they make nested 798// sync calls, i.e. while the first channel waits for a response it makes a 799// sync call on another channel. 800// The callstack should unwind correctly, i.e. the outermost call should 801// complete first, and so on. 802class QueuedReplyClient : public Worker { 803 public: 804 QueuedReplyClient(base::Thread* listener_thread, 805 const std::string& channel_name, 806 const std::string& expected_text, 807 bool pump_during_send) 808 : Worker(channel_name, Channel::MODE_CLIENT), 809 pump_during_send_(pump_during_send), 810 expected_text_(expected_text) { 811 Worker::OverrideThread(listener_thread); 812 } 813 814 virtual void Run() { 815 std::string response; 816 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 817 if (pump_during_send_) 818 msg->EnableMessagePumping(); 819 bool result = Send(msg); 820 DCHECK(result); 821 DCHECK_EQ(response, expected_text_); 822 823 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 824 Done(); 825 } 826 827 private: 828 bool pump_during_send_; 829 std::string expected_text_; 830}; 831 832void QueuedReply(bool client_pump) { 833 std::vector<Worker*> workers; 834 835 // A shared worker thread for servers 836 base::Thread server_worker_thread("QueuedReply_ServerListener"); 837 ASSERT_TRUE(server_worker_thread.Start()); 838 839 base::Thread client_worker_thread("QueuedReply_ClientListener"); 840 ASSERT_TRUE(client_worker_thread.Start()); 841 842 Worker* worker; 843 844 worker = new QueuedReplyServer(&server_worker_thread, 845 "QueuedReply_Server1", 846 "Got first message"); 847 workers.push_back(worker); 848 849 worker = new QueuedReplyServer(&server_worker_thread, 850 "QueuedReply_Server2", 851 "Got second message"); 852 workers.push_back(worker); 853 854 worker = new QueuedReplyClient(&client_worker_thread, 855 "QueuedReply_Server1", 856 "Got first message", 857 client_pump); 858 workers.push_back(worker); 859 860 worker = new QueuedReplyClient(&client_worker_thread, 861 "QueuedReply_Server2", 862 "Got second message", 863 client_pump); 864 workers.push_back(worker); 865 866 RunTest(workers); 867} 868 869} // namespace 870 871// While a blocking send is in progress, the listener thread might answer other 872// synchronous messages. This tests that if during the response to another 873// message the reply to the original messages comes, it is queued up correctly 874// and the original Send is unblocked later. 875// We also test that the send call stacks unwind correctly when the channel 876// pumps messages while waiting for a response. 877TEST_F(IPCSyncChannelTest, QueuedReply) { 878 QueuedReply(false); 879 QueuedReply(true); 880} 881 882//----------------------------------------------------------------------------- 883 884namespace { 885 886class ChattyClient : public Worker { 887 public: 888 ChattyClient() : 889 Worker(Channel::MODE_CLIENT, "chatty_client") { } 890 891 void OnAnswer(int* answer) { 892 // The PostMessage limit is 10k. Send 20% more than that. 893 const int kMessageLimit = 10000; 894 const int kMessagesToSend = kMessageLimit * 120 / 100; 895 for (int i = 0; i < kMessagesToSend; ++i) { 896 if (!SendDouble(false, true)) 897 break; 898 } 899 *answer = 42; 900 Done(); 901 } 902}; 903 904void ChattyServer(bool pump_during_send) { 905 std::vector<Worker*> workers; 906 workers.push_back(new UnblockServer(pump_during_send, false)); 907 workers.push_back(new ChattyClient()); 908 RunTest(workers); 909} 910 911} // namespace 912 913// Tests http://b/1093251 - that sending lots of sync messages while 914// the receiver is waiting for a sync reply does not overflow the PostMessage 915// queue. 916TEST_F(IPCSyncChannelTest, ChattyServer) { 917 ChattyServer(false); 918 ChattyServer(true); 919} 920 921//------------------------------------------------------------------------------ 922 923namespace { 924 925class TimeoutServer : public Worker { 926 public: 927 TimeoutServer(int timeout_ms, 928 std::vector<bool> timeout_seq, 929 bool pump_during_send) 930 : Worker(Channel::MODE_SERVER, "timeout_server"), 931 timeout_ms_(timeout_ms), 932 timeout_seq_(timeout_seq), 933 pump_during_send_(pump_during_send) { 934 } 935 936 void Run() { 937 for (std::vector<bool>::const_iterator iter = timeout_seq_.begin(); 938 iter != timeout_seq_.end(); ++iter) { 939 SendAnswerToLife(pump_during_send_, timeout_ms_, !*iter); 940 } 941 Done(); 942 } 943 944 private: 945 int timeout_ms_; 946 std::vector<bool> timeout_seq_; 947 bool pump_during_send_; 948}; 949 950class UnresponsiveClient : public Worker { 951 public: 952 explicit UnresponsiveClient(std::vector<bool> timeout_seq) 953 : Worker(Channel::MODE_CLIENT, "unresponsive_client"), 954 timeout_seq_(timeout_seq) { 955 } 956 957 void OnAnswerDelay(Message* reply_msg) { 958 DCHECK(!timeout_seq_.empty()); 959 if (!timeout_seq_[0]) { 960 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 961 Send(reply_msg); 962 } else { 963 // Don't reply. 964 delete reply_msg; 965 } 966 timeout_seq_.erase(timeout_seq_.begin()); 967 if (timeout_seq_.empty()) 968 Done(); 969 } 970 971 private: 972 // Whether we should time-out or respond to the various messages we receive. 973 std::vector<bool> timeout_seq_; 974}; 975 976void SendWithTimeoutOK(bool pump_during_send) { 977 std::vector<Worker*> workers; 978 std::vector<bool> timeout_seq; 979 timeout_seq.push_back(false); 980 timeout_seq.push_back(false); 981 timeout_seq.push_back(false); 982 workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send)); 983 workers.push_back(new SimpleClient()); 984 RunTest(workers); 985} 986 987void SendWithTimeoutTimeout(bool pump_during_send) { 988 std::vector<Worker*> workers; 989 std::vector<bool> timeout_seq; 990 timeout_seq.push_back(true); 991 timeout_seq.push_back(false); 992 timeout_seq.push_back(false); 993 workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send)); 994 workers.push_back(new UnresponsiveClient(timeout_seq)); 995 RunTest(workers); 996} 997 998void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) { 999 std::vector<Worker*> workers; 1000 std::vector<bool> timeout_seq; 1001 timeout_seq.push_back(true); 1002 timeout_seq.push_back(false); 1003 timeout_seq.push_back(false); 1004 timeout_seq.push_back(true); 1005 timeout_seq.push_back(false); 1006 workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send)); 1007 workers.push_back(new UnresponsiveClient(timeout_seq)); 1008 RunTest(workers); 1009} 1010 1011} // namespace 1012 1013// Tests that SendWithTimeout does not time-out if the response comes back fast 1014// enough. 1015TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) { 1016 SendWithTimeoutOK(false); 1017 SendWithTimeoutOK(true); 1018} 1019 1020// Tests that SendWithTimeout does time-out. 1021TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) { 1022 SendWithTimeoutTimeout(false); 1023 SendWithTimeoutTimeout(true); 1024} 1025 1026// Sends some message that time-out and some that succeed. 1027// Crashes flakily, http://crbug.com/70075. 1028TEST_F(IPCSyncChannelTest, DISABLED_SendWithTimeoutMixedOKAndTimeout) { 1029 SendWithTimeoutMixedOKAndTimeout(false); 1030 SendWithTimeoutMixedOKAndTimeout(true); 1031} 1032 1033//------------------------------------------------------------------------------ 1034 1035namespace { 1036 1037void NestedCallback(Worker* server) { 1038 // Sleep a bit so that we wake up after the reply has been received. 1039 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250)); 1040 server->SendAnswerToLife(true, base::kNoTimeout, true); 1041} 1042 1043bool timeout_occurred = false; 1044 1045void TimeoutCallback() { 1046 timeout_occurred = true; 1047} 1048 1049class DoneEventRaceServer : public Worker { 1050 public: 1051 DoneEventRaceServer() 1052 : Worker(Channel::MODE_SERVER, "done_event_race_server") { } 1053 1054 void Run() { 1055 MessageLoop::current()->PostTask(FROM_HERE, 1056 base::Bind(&NestedCallback, this)); 1057 MessageLoop::current()->PostDelayedTask( 1058 FROM_HERE, 1059 base::Bind(&TimeoutCallback), 1060 base::TimeDelta::FromSeconds(9)); 1061 // Even though we have a timeout on the Send, it will succeed since for this 1062 // bug, the reply message comes back and is deserialized, however the done 1063 // event wasn't set. So we indirectly use the timeout task to notice if a 1064 // timeout occurred. 1065 SendAnswerToLife(true, 10000, true); 1066 DCHECK(!timeout_occurred); 1067 Done(); 1068 } 1069}; 1070 1071} // namespace 1072 1073// Tests http://b/1474092 - that if after the done_event is set but before 1074// OnObjectSignaled is called another message is sent out, then after its 1075// reply comes back OnObjectSignaled will be called for the first message. 1076TEST_F(IPCSyncChannelTest, DoneEventRace) { 1077 std::vector<Worker*> workers; 1078 workers.push_back(new DoneEventRaceServer()); 1079 workers.push_back(new SimpleClient()); 1080 RunTest(workers); 1081} 1082 1083//----------------------------------------------------------------------------- 1084 1085namespace { 1086 1087class TestSyncMessageFilter : public SyncMessageFilter { 1088 public: 1089 TestSyncMessageFilter(base::WaitableEvent* shutdown_event, 1090 Worker* worker, 1091 scoped_refptr<base::MessageLoopProxy> message_loop) 1092 : SyncMessageFilter(shutdown_event), 1093 worker_(worker), 1094 message_loop_(message_loop) { 1095 } 1096 1097 virtual void OnFilterAdded(Channel* channel) { 1098 SyncMessageFilter::OnFilterAdded(channel); 1099 message_loop_->PostTask( 1100 FROM_HERE, 1101 base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this)); 1102 } 1103 1104 void SendMessageOnHelperThread() { 1105 int answer = 0; 1106 bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); 1107 DCHECK(result); 1108 DCHECK_EQ(answer, 42); 1109 1110 worker_->Done(); 1111 } 1112 1113 private: 1114 virtual ~TestSyncMessageFilter() {} 1115 1116 Worker* worker_; 1117 scoped_refptr<base::MessageLoopProxy> message_loop_; 1118}; 1119 1120class SyncMessageFilterServer : public Worker { 1121 public: 1122 SyncMessageFilterServer() 1123 : Worker(Channel::MODE_SERVER, "sync_message_filter_server"), 1124 thread_("helper_thread") { 1125 base::Thread::Options options; 1126 options.message_loop_type = MessageLoop::TYPE_DEFAULT; 1127 thread_.StartWithOptions(options); 1128 filter_ = new TestSyncMessageFilter(shutdown_event(), this, 1129 thread_.message_loop_proxy()); 1130 } 1131 1132 void Run() { 1133 channel()->AddFilter(filter_.get()); 1134 } 1135 1136 base::Thread thread_; 1137 scoped_refptr<TestSyncMessageFilter> filter_; 1138}; 1139 1140// This class provides functionality to test the case that a Send on the sync 1141// channel does not crash after the channel has been closed. 1142class ServerSendAfterClose : public Worker { 1143 public: 1144 ServerSendAfterClose() 1145 : Worker(Channel::MODE_SERVER, "simpler_server"), 1146 send_result_(true) { 1147 } 1148 1149 bool SendDummy() { 1150 ListenerThread()->message_loop()->PostTask( 1151 FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send), 1152 this, new SyncChannelTestMsg_NoArgs)); 1153 return true; 1154 } 1155 1156 bool send_result() const { 1157 return send_result_; 1158 } 1159 1160 private: 1161 virtual void Run() { 1162 CloseChannel(); 1163 Done(); 1164 } 1165 1166 bool Send(Message* msg) { 1167 send_result_ = Worker::Send(msg); 1168 Done(); 1169 return send_result_; 1170 } 1171 1172 bool send_result_; 1173}; 1174 1175} // namespace 1176 1177// Tests basic synchronous call 1178TEST_F(IPCSyncChannelTest, SyncMessageFilter) { 1179 std::vector<Worker*> workers; 1180 workers.push_back(new SyncMessageFilterServer()); 1181 workers.push_back(new SimpleClient()); 1182 RunTest(workers); 1183} 1184 1185// Test the case when the channel is closed and a Send is attempted after that. 1186TEST_F(IPCSyncChannelTest, SendAfterClose) { 1187 ServerSendAfterClose server; 1188 server.Start(); 1189 1190 server.done_event()->Wait(); 1191 server.done_event()->Reset(); 1192 1193 server.SendDummy(); 1194 server.done_event()->Wait(); 1195 1196 EXPECT_FALSE(server.send_result()); 1197} 1198 1199//----------------------------------------------------------------------------- 1200 1201namespace { 1202 1203class RestrictedDispatchServer : public Worker { 1204 public: 1205 RestrictedDispatchServer(WaitableEvent* sent_ping_event, 1206 WaitableEvent* wait_event) 1207 : Worker("restricted_channel", Channel::MODE_SERVER), 1208 sent_ping_event_(sent_ping_event), 1209 wait_event_(wait_event) { } 1210 1211 void OnDoPing(int ping) { 1212 // Send an asynchronous message that unblocks the caller. 1213 Message* msg = new SyncChannelTestMsg_Ping(ping); 1214 msg->set_unblock(true); 1215 Send(msg); 1216 // Signal the event after the message has been sent on the channel, on the 1217 // IPC thread. 1218 ipc_thread().message_loop()->PostTask( 1219 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this)); 1220 } 1221 1222 void OnPingTTL(int ping, int* out) { 1223 *out = ping; 1224 wait_event_->Wait(); 1225 } 1226 1227 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1228 1229 private: 1230 bool OnMessageReceived(const Message& message) { 1231 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message) 1232 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1233 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1234 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1235 IPC_END_MESSAGE_MAP() 1236 return true; 1237 } 1238 1239 void OnPingSent() { 1240 sent_ping_event_->Signal(); 1241 } 1242 1243 void OnNoArgs() { } 1244 WaitableEvent* sent_ping_event_; 1245 WaitableEvent* wait_event_; 1246}; 1247 1248class NonRestrictedDispatchServer : public Worker { 1249 public: 1250 NonRestrictedDispatchServer(WaitableEvent* signal_event) 1251 : Worker("non_restricted_channel", Channel::MODE_SERVER), 1252 signal_event_(signal_event) {} 1253 1254 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1255 1256 void OnDoPingTTL(int ping) { 1257 int value = 0; 1258 Send(new SyncChannelTestMsg_PingTTL(ping, &value)); 1259 signal_event_->Signal(); 1260 } 1261 1262 private: 1263 bool OnMessageReceived(const Message& message) { 1264 IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message) 1265 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1266 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1267 IPC_END_MESSAGE_MAP() 1268 return true; 1269 } 1270 1271 void OnNoArgs() { } 1272 WaitableEvent* signal_event_; 1273}; 1274 1275class RestrictedDispatchClient : public Worker { 1276 public: 1277 RestrictedDispatchClient(WaitableEvent* sent_ping_event, 1278 RestrictedDispatchServer* server, 1279 NonRestrictedDispatchServer* server2, 1280 int* success) 1281 : Worker("restricted_channel", Channel::MODE_CLIENT), 1282 ping_(0), 1283 server_(server), 1284 server2_(server2), 1285 success_(success), 1286 sent_ping_event_(sent_ping_event) {} 1287 1288 void Run() { 1289 // Incoming messages from our channel should only be dispatched when we 1290 // send a message on that same channel. 1291 channel()->SetRestrictDispatchChannelGroup(1); 1292 1293 server_->ListenerThread()->message_loop()->PostTask( 1294 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1)); 1295 sent_ping_event_->Wait(); 1296 Send(new SyncChannelTestMsg_NoArgs); 1297 if (ping_ == 1) 1298 ++*success_; 1299 else 1300 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1301 1302 non_restricted_channel_.reset(new SyncChannel( 1303 "non_restricted_channel", Channel::MODE_CLIENT, this, 1304 ipc_thread().message_loop_proxy(), true, shutdown_event())); 1305 1306 server_->ListenerThread()->message_loop()->PostTask( 1307 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2)); 1308 sent_ping_event_->Wait(); 1309 // Check that the incoming message is *not* dispatched when sending on the 1310 // non restricted channel. 1311 // TODO(piman): there is a possibility of a false positive race condition 1312 // here, if the message that was posted on the server-side end of the pipe 1313 // is not visible yet on the client side, but I don't know how to solve this 1314 // without hooking into the internals of SyncChannel. I haven't seen it in 1315 // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause 1316 // the following to fail). 1317 non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs); 1318 if (ping_ == 1) 1319 ++*success_; 1320 else 1321 LOG(ERROR) << "Send dispatched message from restricted channel"; 1322 1323 Send(new SyncChannelTestMsg_NoArgs); 1324 if (ping_ == 2) 1325 ++*success_; 1326 else 1327 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1328 1329 // Check that the incoming message on the non-restricted channel is 1330 // dispatched when sending on the restricted channel. 1331 server2_->ListenerThread()->message_loop()->PostTask( 1332 FROM_HERE, 1333 base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3)); 1334 int value = 0; 1335 Send(new SyncChannelTestMsg_PingTTL(4, &value)); 1336 if (ping_ == 3 && value == 4) 1337 ++*success_; 1338 else 1339 LOG(ERROR) << "Send failed to dispatch message from unrestricted channel"; 1340 1341 non_restricted_channel_->Send(new SyncChannelTestMsg_Done); 1342 non_restricted_channel_.reset(); 1343 Send(new SyncChannelTestMsg_Done); 1344 Done(); 1345 } 1346 1347 private: 1348 bool OnMessageReceived(const Message& message) { 1349 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message) 1350 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing) 1351 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL) 1352 IPC_END_MESSAGE_MAP() 1353 return true; 1354 } 1355 1356 void OnPing(int ping) { 1357 ping_ = ping; 1358 } 1359 1360 void OnPingTTL(int ping, IPC::Message* reply) { 1361 ping_ = ping; 1362 // This message comes from the NonRestrictedDispatchServer, we have to send 1363 // the reply back manually. 1364 SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping); 1365 non_restricted_channel_->Send(reply); 1366 } 1367 1368 int ping_; 1369 RestrictedDispatchServer* server_; 1370 NonRestrictedDispatchServer* server2_; 1371 int* success_; 1372 WaitableEvent* sent_ping_event_; 1373 scoped_ptr<SyncChannel> non_restricted_channel_; 1374}; 1375 1376} // namespace 1377 1378TEST_F(IPCSyncChannelTest, RestrictedDispatch) { 1379 WaitableEvent sent_ping_event(false, false); 1380 WaitableEvent wait_event(false, false); 1381 RestrictedDispatchServer* server = 1382 new RestrictedDispatchServer(&sent_ping_event, &wait_event); 1383 NonRestrictedDispatchServer* server2 = 1384 new NonRestrictedDispatchServer(&wait_event); 1385 1386 int success = 0; 1387 std::vector<Worker*> workers; 1388 workers.push_back(server); 1389 workers.push_back(server2); 1390 workers.push_back(new RestrictedDispatchClient( 1391 &sent_ping_event, server, server2, &success)); 1392 RunTest(workers); 1393 EXPECT_EQ(4, success); 1394} 1395 1396//----------------------------------------------------------------------------- 1397 1398// This test case inspired by crbug.com/108491 1399// We create two servers that use the same ListenerThread but have 1400// SetRestrictDispatchToSameChannel set to true. 1401// We create clients, then use some specific WaitableEvent wait/signalling to 1402// ensure that messages get dispatched in a way that causes a deadlock due to 1403// a nested dispatch and an eligible message in a higher-level dispatch's 1404// delayed_queue. Specifically, we start with client1 about so send an 1405// unblocking message to server1, while the shared listener thread for the 1406// servers server1 and server2 is about to send a non-unblocking message to 1407// client1. At the same time, client2 will be about to send an unblocking 1408// message to server2. Server1 will handle the client1->server1 message by 1409// telling server2 to send a non-unblocking message to client2. 1410// What should happen is that the send to server2 should find the pending, 1411// same-context client2->server2 message to dispatch, causing client2 to 1412// unblock then handle the server2->client2 message, so that the shared 1413// servers' listener thread can then respond to the client1->server1 message. 1414// Then client1 can handle the non-unblocking server1->client1 message. 1415// The old code would end up in a state where the server2->client2 message is 1416// sent, but the client2->server2 message (which is eligible for dispatch, and 1417// which is what client2 is waiting for) is stashed in a local delayed_queue 1418// that has server1's channel context, causing a deadlock. 1419// WaitableEvents in the events array are used to: 1420// event 0: indicate to client1 that server listener is in OnDoServerTask 1421// event 1: indicate to client1 that client2 listener is in OnDoClient2Task 1422// event 2: indicate to server1 that client2 listener is in OnDoClient2Task 1423// event 3: indicate to client2 that server listener is in OnDoServerTask 1424 1425namespace { 1426 1427class RestrictedDispatchDeadlockServer : public Worker { 1428 public: 1429 RestrictedDispatchDeadlockServer(int server_num, 1430 WaitableEvent* server_ready_event, 1431 WaitableEvent** events, 1432 RestrictedDispatchDeadlockServer* peer) 1433 : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER), 1434 server_num_(server_num), 1435 server_ready_event_(server_ready_event), 1436 events_(events), 1437 peer_(peer) { } 1438 1439 void OnDoServerTask() { 1440 events_[3]->Signal(); 1441 events_[2]->Wait(); 1442 events_[0]->Signal(); 1443 SendMessageToClient(); 1444 } 1445 1446 void Run() { 1447 channel()->SetRestrictDispatchChannelGroup(1); 1448 server_ready_event_->Signal(); 1449 } 1450 1451 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1452 1453 private: 1454 bool OnMessageReceived(const Message& message) { 1455 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message) 1456 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1457 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1458 IPC_END_MESSAGE_MAP() 1459 return true; 1460 } 1461 1462 void OnNoArgs() { 1463 if (server_num_ == 1) { 1464 DCHECK(peer_ != NULL); 1465 peer_->SendMessageToClient(); 1466 } 1467 } 1468 1469 void SendMessageToClient() { 1470 Message* msg = new SyncChannelTestMsg_NoArgs; 1471 msg->set_unblock(false); 1472 DCHECK(!msg->should_unblock()); 1473 Send(msg); 1474 } 1475 1476 int server_num_; 1477 WaitableEvent* server_ready_event_; 1478 WaitableEvent** events_; 1479 RestrictedDispatchDeadlockServer* peer_; 1480}; 1481 1482class RestrictedDispatchDeadlockClient2 : public Worker { 1483 public: 1484 RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server, 1485 WaitableEvent* server_ready_event, 1486 WaitableEvent** events) 1487 : Worker("channel2", Channel::MODE_CLIENT), 1488 server_ready_event_(server_ready_event), 1489 events_(events), 1490 received_msg_(false), 1491 received_noarg_reply_(false), 1492 done_issued_(false) {} 1493 1494 void Run() { 1495 server_ready_event_->Wait(); 1496 } 1497 1498 void OnDoClient2Task() { 1499 events_[3]->Wait(); 1500 events_[1]->Signal(); 1501 events_[2]->Signal(); 1502 DCHECK(received_msg_ == false); 1503 1504 Message* message = new SyncChannelTestMsg_NoArgs; 1505 message->set_unblock(true); 1506 Send(message); 1507 received_noarg_reply_ = true; 1508 } 1509 1510 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1511 private: 1512 bool OnMessageReceived(const Message& message) { 1513 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message) 1514 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1515 IPC_END_MESSAGE_MAP() 1516 return true; 1517 } 1518 1519 void OnNoArgs() { 1520 received_msg_ = true; 1521 PossiblyDone(); 1522 } 1523 1524 void PossiblyDone() { 1525 if (received_noarg_reply_ && received_msg_) { 1526 DCHECK(done_issued_ == false); 1527 done_issued_ = true; 1528 Send(new SyncChannelTestMsg_Done); 1529 Done(); 1530 } 1531 } 1532 1533 WaitableEvent* server_ready_event_; 1534 WaitableEvent** events_; 1535 bool received_msg_; 1536 bool received_noarg_reply_; 1537 bool done_issued_; 1538}; 1539 1540class RestrictedDispatchDeadlockClient1 : public Worker { 1541 public: 1542 RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server, 1543 RestrictedDispatchDeadlockClient2* peer, 1544 WaitableEvent* server_ready_event, 1545 WaitableEvent** events) 1546 : Worker("channel1", Channel::MODE_CLIENT), 1547 server_(server), 1548 peer_(peer), 1549 server_ready_event_(server_ready_event), 1550 events_(events), 1551 received_msg_(false), 1552 received_noarg_reply_(false), 1553 done_issued_(false) {} 1554 1555 void Run() { 1556 server_ready_event_->Wait(); 1557 server_->ListenerThread()->message_loop()->PostTask( 1558 FROM_HERE, 1559 base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_)); 1560 peer_->ListenerThread()->message_loop()->PostTask( 1561 FROM_HERE, 1562 base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_)); 1563 events_[0]->Wait(); 1564 events_[1]->Wait(); 1565 DCHECK(received_msg_ == false); 1566 1567 Message* message = new SyncChannelTestMsg_NoArgs; 1568 message->set_unblock(true); 1569 Send(message); 1570 received_noarg_reply_ = true; 1571 PossiblyDone(); 1572 } 1573 1574 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1575 private: 1576 bool OnMessageReceived(const Message& message) { 1577 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message) 1578 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1579 IPC_END_MESSAGE_MAP() 1580 return true; 1581 } 1582 1583 void OnNoArgs() { 1584 received_msg_ = true; 1585 PossiblyDone(); 1586 } 1587 1588 void PossiblyDone() { 1589 if (received_noarg_reply_ && received_msg_) { 1590 DCHECK(done_issued_ == false); 1591 done_issued_ = true; 1592 Send(new SyncChannelTestMsg_Done); 1593 Done(); 1594 } 1595 } 1596 1597 RestrictedDispatchDeadlockServer* server_; 1598 RestrictedDispatchDeadlockClient2* peer_; 1599 WaitableEvent* server_ready_event_; 1600 WaitableEvent** events_; 1601 bool received_msg_; 1602 bool received_noarg_reply_; 1603 bool done_issued_; 1604}; 1605 1606} // namespace 1607 1608TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) { 1609 std::vector<Worker*> workers; 1610 1611 // A shared worker thread so that server1 and server2 run on one thread. 1612 base::Thread worker_thread("RestrictedDispatchDeadlock"); 1613 ASSERT_TRUE(worker_thread.Start()); 1614 1615 WaitableEvent server1_ready(false, false); 1616 WaitableEvent server2_ready(false, false); 1617 1618 WaitableEvent event0(false, false); 1619 WaitableEvent event1(false, false); 1620 WaitableEvent event2(false, false); 1621 WaitableEvent event3(false, false); 1622 WaitableEvent* events[4] = {&event0, &event1, &event2, &event3}; 1623 1624 RestrictedDispatchDeadlockServer* server1; 1625 RestrictedDispatchDeadlockServer* server2; 1626 RestrictedDispatchDeadlockClient1* client1; 1627 RestrictedDispatchDeadlockClient2* client2; 1628 1629 server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events, 1630 NULL); 1631 server2->OverrideThread(&worker_thread); 1632 workers.push_back(server2); 1633 1634 client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready, 1635 events); 1636 workers.push_back(client2); 1637 1638 server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events, 1639 server2); 1640 server1->OverrideThread(&worker_thread); 1641 workers.push_back(server1); 1642 1643 client1 = new RestrictedDispatchDeadlockClient1(server1, client2, 1644 &server1_ready, events); 1645 workers.push_back(client1); 1646 1647 RunTest(workers); 1648} 1649 1650//----------------------------------------------------------------------------- 1651 1652// This test case inspired by crbug.com/120530 1653// We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a 1654// message that recurses through 3, 4 or 5 steps to make sure, say, W1 can 1655// re-enter when called from W4 while it's sending a message to W2. 1656// The first worker drives the whole test so it must be treated specially. 1657namespace { 1658 1659class RestrictedDispatchPipeWorker : public Worker { 1660 public: 1661 RestrictedDispatchPipeWorker( 1662 const std::string &channel1, 1663 WaitableEvent* event1, 1664 const std::string &channel2, 1665 WaitableEvent* event2, 1666 int group, 1667 int* success) 1668 : Worker(channel1, Channel::MODE_SERVER), 1669 event1_(event1), 1670 event2_(event2), 1671 other_channel_name_(channel2), 1672 group_(group), 1673 success_(success) { 1674 } 1675 1676 void OnPingTTL(int ping, int* ret) { 1677 *ret = 0; 1678 if (!ping) 1679 return; 1680 other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret)); 1681 ++*ret; 1682 } 1683 1684 void OnDone() { 1685 if (is_first()) 1686 return; 1687 other_channel_->Send(new SyncChannelTestMsg_Done); 1688 other_channel_.reset(); 1689 Done(); 1690 } 1691 1692 void Run() { 1693 channel()->SetRestrictDispatchChannelGroup(group_); 1694 if (is_first()) 1695 event1_->Signal(); 1696 event2_->Wait(); 1697 other_channel_.reset(new SyncChannel( 1698 other_channel_name_, Channel::MODE_CLIENT, this, 1699 ipc_thread().message_loop_proxy(), true, shutdown_event())); 1700 other_channel_->SetRestrictDispatchChannelGroup(group_); 1701 if (!is_first()) { 1702 event1_->Signal(); 1703 return; 1704 } 1705 *success_ = 0; 1706 int value = 0; 1707 OnPingTTL(3, &value); 1708 *success_ += (value == 3); 1709 OnPingTTL(4, &value); 1710 *success_ += (value == 4); 1711 OnPingTTL(5, &value); 1712 *success_ += (value == 5); 1713 other_channel_->Send(new SyncChannelTestMsg_Done); 1714 other_channel_.reset(); 1715 Done(); 1716 } 1717 1718 bool is_first() { return !!success_; } 1719 1720 private: 1721 bool OnMessageReceived(const Message& message) { 1722 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message) 1723 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1724 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone) 1725 IPC_END_MESSAGE_MAP() 1726 return true; 1727 } 1728 1729 scoped_ptr<SyncChannel> other_channel_; 1730 WaitableEvent* event1_; 1731 WaitableEvent* event2_; 1732 std::string other_channel_name_; 1733 int group_; 1734 int* success_; 1735}; 1736 1737} // namespace 1738 1739TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) { 1740 int success = 0; 1741 std::vector<Worker*> workers; 1742 WaitableEvent event0(true, false); 1743 WaitableEvent event1(true, false); 1744 WaitableEvent event2(true, false); 1745 WaitableEvent event3(true, false); 1746 workers.push_back(new RestrictedDispatchPipeWorker( 1747 "channel0", &event0, "channel1", &event1, 1, &success)); 1748 workers.push_back(new RestrictedDispatchPipeWorker( 1749 "channel1", &event1, "channel2", &event2, 2, NULL)); 1750 workers.push_back(new RestrictedDispatchPipeWorker( 1751 "channel2", &event2, "channel3", &event3, 3, NULL)); 1752 workers.push_back(new RestrictedDispatchPipeWorker( 1753 "channel3", &event3, "channel0", &event0, 4, NULL)); 1754 RunTest(workers); 1755 EXPECT_EQ(3, success); 1756} 1757 1758 1759//----------------------------------------------------------------------------- 1760// 1761// This test case inspired by crbug.com/122443 1762// We want to make sure a reply message with the unblock flag set correctly 1763// behaves as a reply, not a regular message. 1764// We have 3 workers. Server1 will send a message to Server2 (which will block), 1765// during which it will dispatch a message comming from Client, at which point 1766// it will send another message to Server2. While sending that second message it 1767// will receive a reply from Server1 with the unblock flag. 1768 1769namespace { 1770 1771class ReentrantReplyServer1 : public Worker { 1772 public: 1773 ReentrantReplyServer1(WaitableEvent* server_ready) 1774 : Worker("reentrant_reply1", Channel::MODE_SERVER), 1775 server_ready_(server_ready) { } 1776 1777 void Run() { 1778 server2_channel_.reset(new SyncChannel( 1779 "reentrant_reply2", Channel::MODE_CLIENT, this, 1780 ipc_thread().message_loop_proxy(), true, shutdown_event())); 1781 server_ready_->Signal(); 1782 Message* msg = new SyncChannelTestMsg_Reentrant1(); 1783 server2_channel_->Send(msg); 1784 server2_channel_.reset(); 1785 Done(); 1786 } 1787 1788 private: 1789 bool OnMessageReceived(const Message& message) { 1790 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message) 1791 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2) 1792 IPC_REPLY_HANDLER(OnReply) 1793 IPC_END_MESSAGE_MAP() 1794 return true; 1795 } 1796 1797 void OnReentrant2() { 1798 Message* msg = new SyncChannelTestMsg_Reentrant3(); 1799 server2_channel_->Send(msg); 1800 } 1801 1802 void OnReply(const Message& message) { 1803 // If we get here, the Send() will never receive the reply (thus would 1804 // hang), so abort instead. 1805 LOG(FATAL) << "Reply message was dispatched"; 1806 } 1807 1808 WaitableEvent* server_ready_; 1809 scoped_ptr<SyncChannel> server2_channel_; 1810}; 1811 1812class ReentrantReplyServer2 : public Worker { 1813 public: 1814 ReentrantReplyServer2() 1815 : Worker("reentrant_reply2", Channel::MODE_SERVER), 1816 reply_(NULL) { } 1817 1818 private: 1819 bool OnMessageReceived(const Message& message) { 1820 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message) 1821 IPC_MESSAGE_HANDLER_DELAY_REPLY( 1822 SyncChannelTestMsg_Reentrant1, OnReentrant1) 1823 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3) 1824 IPC_END_MESSAGE_MAP() 1825 return true; 1826 } 1827 1828 void OnReentrant1(Message* reply) { 1829 DCHECK(!reply_); 1830 reply_ = reply; 1831 } 1832 1833 void OnReentrant3() { 1834 DCHECK(reply_); 1835 Message* reply = reply_; 1836 reply_ = NULL; 1837 reply->set_unblock(true); 1838 Send(reply); 1839 Done(); 1840 } 1841 1842 Message* reply_; 1843}; 1844 1845class ReentrantReplyClient : public Worker { 1846 public: 1847 ReentrantReplyClient(WaitableEvent* server_ready) 1848 : Worker("reentrant_reply1", Channel::MODE_CLIENT), 1849 server_ready_(server_ready) { } 1850 1851 void Run() { 1852 server_ready_->Wait(); 1853 Send(new SyncChannelTestMsg_Reentrant2()); 1854 Done(); 1855 } 1856 1857 private: 1858 WaitableEvent* server_ready_; 1859}; 1860 1861} // namespace 1862 1863TEST_F(IPCSyncChannelTest, ReentrantReply) { 1864 std::vector<Worker*> workers; 1865 WaitableEvent server_ready(false, false); 1866 workers.push_back(new ReentrantReplyServer2()); 1867 workers.push_back(new ReentrantReplyServer1(&server_ready)); 1868 workers.push_back(new ReentrantReplyClient(&server_ready)); 1869 RunTest(workers); 1870} 1871 1872//----------------------------------------------------------------------------- 1873 1874// Generate a validated channel ID using Channel::GenerateVerifiedChannelID(). 1875namespace { 1876 1877class VerifiedServer : public Worker { 1878 public: 1879 VerifiedServer(base::Thread* listener_thread, 1880 const std::string& channel_name, 1881 const std::string& reply_text) 1882 : Worker(channel_name, Channel::MODE_SERVER), 1883 reply_text_(reply_text) { 1884 Worker::OverrideThread(listener_thread); 1885 } 1886 1887 virtual void OnNestedTestMsg(Message* reply_msg) { 1888 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 1889 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 1890 Send(reply_msg); 1891 ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId()); 1892 Done(); 1893 } 1894 1895 private: 1896 std::string reply_text_; 1897}; 1898 1899class VerifiedClient : public Worker { 1900 public: 1901 VerifiedClient(base::Thread* listener_thread, 1902 const std::string& channel_name, 1903 const std::string& expected_text) 1904 : Worker(channel_name, Channel::MODE_CLIENT), 1905 expected_text_(expected_text) { 1906 Worker::OverrideThread(listener_thread); 1907 } 1908 1909 virtual void Run() { 1910 std::string response; 1911 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 1912 bool result = Send(msg); 1913 DCHECK(result); 1914 DCHECK_EQ(response, expected_text_); 1915 // expected_text_ is only used in the above DCHECK. This line suppresses the 1916 // "unused private field" warning in release builds. 1917 (void)expected_text_; 1918 1919 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 1920 ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId()); 1921 Done(); 1922 } 1923 1924 private: 1925 std::string expected_text_; 1926}; 1927 1928void Verified() { 1929 std::vector<Worker*> workers; 1930 1931 // A shared worker thread for servers 1932 base::Thread server_worker_thread("Verified_ServerListener"); 1933 ASSERT_TRUE(server_worker_thread.Start()); 1934 1935 base::Thread client_worker_thread("Verified_ClientListener"); 1936 ASSERT_TRUE(client_worker_thread.Start()); 1937 1938 std::string channel_id = Channel::GenerateVerifiedChannelID("Verified"); 1939 Worker* worker; 1940 1941 worker = new VerifiedServer(&server_worker_thread, 1942 channel_id, 1943 "Got first message"); 1944 workers.push_back(worker); 1945 1946 worker = new VerifiedClient(&client_worker_thread, 1947 channel_id, 1948 "Got first message"); 1949 workers.push_back(worker); 1950 1951 RunTest(workers); 1952 1953#if defined(OS_WIN) 1954#endif 1955} 1956 1957} // namespace 1958 1959// Windows needs to send an out-of-band secret to verify the client end of the 1960// channel. Test that we still connect correctly in that case. 1961TEST_F(IPCSyncChannelTest, Verified) { 1962 Verified(); 1963} 1964 1965} // namespace IPC 1966