ipc_sync_channel_unittest.cc revision 0529e5d033099cbfc42635f6f6183833b09dff6e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "ipc/ipc_sync_channel.h" 6 7#include <string> 8#include <vector> 9 10#include "base/basictypes.h" 11#include "base/bind.h" 12#include "base/logging.h" 13#include "base/memory/scoped_ptr.h" 14#include "base/message_loop/message_loop.h" 15#include "base/process/process_handle.h" 16#include "base/run_loop.h" 17#include "base/strings/string_util.h" 18#include "base/synchronization/waitable_event.h" 19#include "base/threading/platform_thread.h" 20#include "base/threading/thread.h" 21#include "ipc/ipc_listener.h" 22#include "ipc/ipc_message.h" 23#include "ipc/ipc_sender.h" 24#include "ipc/ipc_sync_message_filter.h" 25#include "ipc/ipc_sync_message_unittest.h" 26#include "testing/gtest/include/gtest/gtest.h" 27 28using base::WaitableEvent; 29 30namespace IPC { 31namespace { 32 33// Base class for a "process" with listener and IPC threads. 34class Worker : public Listener, public Sender { 35 public: 36 // Will create a channel without a name. 37 Worker(Channel::Mode mode, const std::string& thread_name) 38 : done_(new WaitableEvent(false, false)), 39 channel_created_(new WaitableEvent(false, false)), 40 mode_(mode), 41 ipc_thread_((thread_name + "_ipc").c_str()), 42 listener_thread_((thread_name + "_listener").c_str()), 43 overrided_thread_(NULL), 44 shutdown_event_(true, false), 45 is_shutdown_(false) { 46 } 47 48 // Will create a named channel and use this name for the threads' name. 49 Worker(const std::string& channel_name, Channel::Mode mode) 50 : done_(new WaitableEvent(false, false)), 51 channel_created_(new WaitableEvent(false, false)), 52 channel_name_(channel_name), 53 mode_(mode), 54 ipc_thread_((channel_name + "_ipc").c_str()), 55 listener_thread_((channel_name + "_listener").c_str()), 56 overrided_thread_(NULL), 57 shutdown_event_(true, false), 58 is_shutdown_(false) { 59 } 60 61 virtual ~Worker() { 62 // Shutdown() must be called before destruction. 63 CHECK(is_shutdown_); 64 } 65 void AddRef() { } 66 void Release() { } 67 virtual bool Send(Message* msg) OVERRIDE { return channel_->Send(msg); } 68 void WaitForChannelCreation() { channel_created_->Wait(); } 69 void CloseChannel() { 70 DCHECK(base::MessageLoop::current() == ListenerThread()->message_loop()); 71 channel_->Close(); 72 } 73 void Start() { 74 StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT); 75 ListenerThread()->message_loop()->PostTask( 76 FROM_HERE, base::Bind(&Worker::OnStart, this)); 77 } 78 void Shutdown() { 79 // The IPC thread needs to outlive SyncChannel. We can't do this in 80 // ~Worker(), since that'll reset the vtable pointer (to Worker's), which 81 // may result in a race conditions. See http://crbug.com/25841. 82 WaitableEvent listener_done(false, false), ipc_done(false, false); 83 ListenerThread()->message_loop()->PostTask( 84 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this, 85 &listener_done, &ipc_done)); 86 listener_done.Wait(); 87 ipc_done.Wait(); 88 ipc_thread_.Stop(); 89 listener_thread_.Stop(); 90 is_shutdown_ = true; 91 } 92 void OverrideThread(base::Thread* overrided_thread) { 93 DCHECK(overrided_thread_ == NULL); 94 overrided_thread_ = overrided_thread; 95 } 96 bool SendAnswerToLife(bool pump, bool succeed) { 97 int answer = 0; 98 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 99 if (pump) 100 msg->EnableMessagePumping(); 101 bool result = Send(msg); 102 DCHECK_EQ(result, succeed); 103 DCHECK_EQ(answer, (succeed ? 42 : 0)); 104 return result; 105 } 106 bool SendDouble(bool pump, bool succeed) { 107 int answer = 0; 108 SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer); 109 if (pump) 110 msg->EnableMessagePumping(); 111 bool result = Send(msg); 112 DCHECK_EQ(result, succeed); 113 DCHECK_EQ(answer, (succeed ? 10 : 0)); 114 return result; 115 } 116 const std::string& channel_name() { return channel_name_; } 117 Channel::Mode mode() { return mode_; } 118 WaitableEvent* done_event() { return done_.get(); } 119 WaitableEvent* shutdown_event() { return &shutdown_event_; } 120 void ResetChannel() { channel_.reset(); } 121 // Derived classes need to call this when they've completed their part of 122 // the test. 123 void Done() { done_->Signal(); } 124 125 protected: 126 SyncChannel* channel() { return channel_.get(); } 127 // Functions for dervied classes to implement if they wish. 128 virtual void Run() { } 129 virtual void OnAnswer(int* answer) { NOTREACHED(); } 130 virtual void OnAnswerDelay(Message* reply_msg) { 131 // The message handler map below can only take one entry for 132 // SyncChannelTestMsg_AnswerToLife, so since some classes want 133 // the normal version while other want the delayed reply, we 134 // call the normal version if the derived class didn't override 135 // this function. 136 int answer; 137 OnAnswer(&answer); 138 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer); 139 Send(reply_msg); 140 } 141 virtual void OnDouble(int in, int* out) { NOTREACHED(); } 142 virtual void OnDoubleDelay(int in, Message* reply_msg) { 143 int result; 144 OnDouble(in, &result); 145 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result); 146 Send(reply_msg); 147 } 148 149 virtual void OnNestedTestMsg(Message* reply_msg) { 150 NOTREACHED(); 151 } 152 153 virtual SyncChannel* CreateChannel() { 154 return new SyncChannel(channel_name_, 155 mode_, 156 this, 157 ipc_thread_.message_loop_proxy().get(), 158 true, 159 &shutdown_event_); 160 } 161 162 base::Thread* ListenerThread() { 163 return overrided_thread_ ? overrided_thread_ : &listener_thread_; 164 } 165 166 const base::Thread& ipc_thread() const { return ipc_thread_; } 167 168 private: 169 // Called on the listener thread to create the sync channel. 170 void OnStart() { 171 // Link ipc_thread_, listener_thread_ and channel_ altogether. 172 StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO); 173 channel_.reset(CreateChannel()); 174 channel_created_->Signal(); 175 Run(); 176 } 177 178 void OnListenerThreadShutdown1(WaitableEvent* listener_event, 179 WaitableEvent* ipc_event) { 180 // SyncChannel needs to be destructed on the thread that it was created on. 181 channel_.reset(); 182 183 base::RunLoop().RunUntilIdle(); 184 185 ipc_thread_.message_loop()->PostTask( 186 FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this, 187 listener_event, ipc_event)); 188 } 189 190 void OnIPCThreadShutdown(WaitableEvent* listener_event, 191 WaitableEvent* ipc_event) { 192 base::RunLoop().RunUntilIdle(); 193 ipc_event->Signal(); 194 195 listener_thread_.message_loop()->PostTask( 196 FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this, 197 listener_event)); 198 } 199 200 void OnListenerThreadShutdown2(WaitableEvent* listener_event) { 201 base::RunLoop().RunUntilIdle(); 202 listener_event->Signal(); 203 } 204 205 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 206 IPC_BEGIN_MESSAGE_MAP(Worker, message) 207 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay) 208 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife, 209 OnAnswerDelay) 210 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String, 211 OnNestedTestMsg) 212 IPC_END_MESSAGE_MAP() 213 return true; 214 } 215 216 void StartThread(base::Thread* thread, base::MessageLoop::Type type) { 217 base::Thread::Options options; 218 options.message_loop_type = type; 219 thread->StartWithOptions(options); 220 } 221 222 scoped_ptr<WaitableEvent> done_; 223 scoped_ptr<WaitableEvent> channel_created_; 224 std::string channel_name_; 225 Channel::Mode mode_; 226 scoped_ptr<SyncChannel> channel_; 227 base::Thread ipc_thread_; 228 base::Thread listener_thread_; 229 base::Thread* overrided_thread_; 230 231 base::WaitableEvent shutdown_event_; 232 233 bool is_shutdown_; 234 235 DISALLOW_COPY_AND_ASSIGN(Worker); 236}; 237 238 239// Starts the test with the given workers. This function deletes the workers 240// when it's done. 241void RunTest(std::vector<Worker*> workers) { 242 // First we create the workers that are channel servers, or else the other 243 // workers' channel initialization might fail because the pipe isn't created.. 244 for (size_t i = 0; i < workers.size(); ++i) { 245 if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) { 246 workers[i]->Start(); 247 workers[i]->WaitForChannelCreation(); 248 } 249 } 250 251 // now create the clients 252 for (size_t i = 0; i < workers.size(); ++i) { 253 if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG) 254 workers[i]->Start(); 255 } 256 257 // wait for all the workers to finish 258 for (size_t i = 0; i < workers.size(); ++i) 259 workers[i]->done_event()->Wait(); 260 261 for (size_t i = 0; i < workers.size(); ++i) { 262 workers[i]->Shutdown(); 263 delete workers[i]; 264 } 265} 266 267class IPCSyncChannelTest : public testing::Test { 268 private: 269 base::MessageLoop message_loop_; 270}; 271 272//------------------------------------------------------------------------------ 273 274class SimpleServer : public Worker { 275 public: 276 explicit SimpleServer(bool pump_during_send) 277 : Worker(Channel::MODE_SERVER, "simpler_server"), 278 pump_during_send_(pump_during_send) { } 279 virtual void Run() OVERRIDE { 280 SendAnswerToLife(pump_during_send_, true); 281 Done(); 282 } 283 284 bool pump_during_send_; 285}; 286 287class SimpleClient : public Worker { 288 public: 289 SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { } 290 291 virtual void OnAnswer(int* answer) OVERRIDE { 292 *answer = 42; 293 Done(); 294 } 295}; 296 297void Simple(bool pump_during_send) { 298 std::vector<Worker*> workers; 299 workers.push_back(new SimpleServer(pump_during_send)); 300 workers.push_back(new SimpleClient()); 301 RunTest(workers); 302} 303 304// Tests basic synchronous call 305TEST_F(IPCSyncChannelTest, Simple) { 306 Simple(false); 307 Simple(true); 308} 309 310//------------------------------------------------------------------------------ 311 312// Worker classes which override how the sync channel is created to use the 313// two-step initialization (calling the lightweight constructor and then 314// ChannelProxy::Init separately) process. 315class TwoStepServer : public Worker { 316 public: 317 explicit TwoStepServer(bool create_pipe_now) 318 : Worker(Channel::MODE_SERVER, "simpler_server"), 319 create_pipe_now_(create_pipe_now) { } 320 321 virtual void Run() OVERRIDE { 322 SendAnswerToLife(false, true); 323 Done(); 324 } 325 326 virtual SyncChannel* CreateChannel() OVERRIDE { 327 SyncChannel* channel = new SyncChannel( 328 this, ipc_thread().message_loop_proxy().get(), shutdown_event()); 329 channel->Init(channel_name(), mode(), create_pipe_now_); 330 return channel; 331 } 332 333 bool create_pipe_now_; 334}; 335 336class TwoStepClient : public Worker { 337 public: 338 TwoStepClient(bool create_pipe_now) 339 : Worker(Channel::MODE_CLIENT, "simple_client"), 340 create_pipe_now_(create_pipe_now) { } 341 342 virtual void OnAnswer(int* answer) OVERRIDE { 343 *answer = 42; 344 Done(); 345 } 346 347 virtual SyncChannel* CreateChannel() OVERRIDE { 348 SyncChannel* channel = new SyncChannel( 349 this, ipc_thread().message_loop_proxy().get(), shutdown_event()); 350 channel->Init(channel_name(), mode(), create_pipe_now_); 351 return channel; 352 } 353 354 bool create_pipe_now_; 355}; 356 357void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) { 358 std::vector<Worker*> workers; 359 workers.push_back(new TwoStepServer(create_server_pipe_now)); 360 workers.push_back(new TwoStepClient(create_client_pipe_now)); 361 RunTest(workers); 362} 363 364// Tests basic two-step initialization, where you call the lightweight 365// constructor then Init. 366TEST_F(IPCSyncChannelTest, TwoStepInitialization) { 367 TwoStep(false, false); 368 TwoStep(false, true); 369 TwoStep(true, false); 370 TwoStep(true, true); 371} 372 373//------------------------------------------------------------------------------ 374 375class DelayClient : public Worker { 376 public: 377 DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { } 378 379 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 380 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 381 Send(reply_msg); 382 Done(); 383 } 384}; 385 386void DelayReply(bool pump_during_send) { 387 std::vector<Worker*> workers; 388 workers.push_back(new SimpleServer(pump_during_send)); 389 workers.push_back(new DelayClient()); 390 RunTest(workers); 391} 392 393// Tests that asynchronous replies work 394TEST_F(IPCSyncChannelTest, DelayReply) { 395 DelayReply(false); 396 DelayReply(true); 397} 398 399//------------------------------------------------------------------------------ 400 401class NoHangServer : public Worker { 402 public: 403 NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send) 404 : Worker(Channel::MODE_SERVER, "no_hang_server"), 405 got_first_reply_(got_first_reply), 406 pump_during_send_(pump_during_send) { } 407 virtual void Run() OVERRIDE { 408 SendAnswerToLife(pump_during_send_, true); 409 got_first_reply_->Signal(); 410 411 SendAnswerToLife(pump_during_send_, false); 412 Done(); 413 } 414 415 WaitableEvent* got_first_reply_; 416 bool pump_during_send_; 417}; 418 419class NoHangClient : public Worker { 420 public: 421 explicit NoHangClient(WaitableEvent* got_first_reply) 422 : Worker(Channel::MODE_CLIENT, "no_hang_client"), 423 got_first_reply_(got_first_reply) { } 424 425 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 426 // Use the DELAY_REPLY macro so that we can force the reply to be sent 427 // before this function returns (when the channel will be reset). 428 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 429 Send(reply_msg); 430 got_first_reply_->Wait(); 431 CloseChannel(); 432 Done(); 433 } 434 435 WaitableEvent* got_first_reply_; 436}; 437 438void NoHang(bool pump_during_send) { 439 WaitableEvent got_first_reply(false, false); 440 std::vector<Worker*> workers; 441 workers.push_back(new NoHangServer(&got_first_reply, pump_during_send)); 442 workers.push_back(new NoHangClient(&got_first_reply)); 443 RunTest(workers); 444} 445 446// Tests that caller doesn't hang if receiver dies 447TEST_F(IPCSyncChannelTest, NoHang) { 448 NoHang(false); 449 NoHang(true); 450} 451 452//------------------------------------------------------------------------------ 453 454class UnblockServer : public Worker { 455 public: 456 UnblockServer(bool pump_during_send, bool delete_during_send) 457 : Worker(Channel::MODE_SERVER, "unblock_server"), 458 pump_during_send_(pump_during_send), 459 delete_during_send_(delete_during_send) { } 460 virtual void Run() OVERRIDE { 461 if (delete_during_send_) { 462 // Use custom code since race conditions mean the answer may or may not be 463 // available. 464 int answer = 0; 465 SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer); 466 if (pump_during_send_) 467 msg->EnableMessagePumping(); 468 Send(msg); 469 } else { 470 SendAnswerToLife(pump_during_send_, true); 471 } 472 Done(); 473 } 474 475 virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE { 476 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 477 Send(reply_msg); 478 if (delete_during_send_) 479 ResetChannel(); 480 } 481 482 bool pump_during_send_; 483 bool delete_during_send_; 484}; 485 486class UnblockClient : public Worker { 487 public: 488 explicit UnblockClient(bool pump_during_send) 489 : Worker(Channel::MODE_CLIENT, "unblock_client"), 490 pump_during_send_(pump_during_send) { } 491 492 virtual void OnAnswer(int* answer) OVERRIDE { 493 SendDouble(pump_during_send_, true); 494 *answer = 42; 495 Done(); 496 } 497 498 bool pump_during_send_; 499}; 500 501void Unblock(bool server_pump, bool client_pump, bool delete_during_send) { 502 std::vector<Worker*> workers; 503 workers.push_back(new UnblockServer(server_pump, delete_during_send)); 504 workers.push_back(new UnblockClient(client_pump)); 505 RunTest(workers); 506} 507 508// Tests that the caller unblocks to answer a sync message from the receiver. 509TEST_F(IPCSyncChannelTest, Unblock) { 510 Unblock(false, false, false); 511 Unblock(false, true, false); 512 Unblock(true, false, false); 513 Unblock(true, true, false); 514} 515 516//------------------------------------------------------------------------------ 517 518// Tests that the the SyncChannel object can be deleted during a Send. 519TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) { 520 Unblock(false, false, true); 521 Unblock(false, true, true); 522 Unblock(true, false, true); 523 Unblock(true, true, true); 524} 525 526//------------------------------------------------------------------------------ 527 528class RecursiveServer : public Worker { 529 public: 530 RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second) 531 : Worker(Channel::MODE_SERVER, "recursive_server"), 532 expected_send_result_(expected_send_result), 533 pump_first_(pump_first), pump_second_(pump_second) {} 534 virtual void Run() OVERRIDE { 535 SendDouble(pump_first_, expected_send_result_); 536 Done(); 537 } 538 539 virtual void OnDouble(int in, int* out) OVERRIDE { 540 *out = in * 2; 541 SendAnswerToLife(pump_second_, expected_send_result_); 542 } 543 544 bool expected_send_result_, pump_first_, pump_second_; 545}; 546 547class RecursiveClient : public Worker { 548 public: 549 RecursiveClient(bool pump_during_send, bool close_channel) 550 : Worker(Channel::MODE_CLIENT, "recursive_client"), 551 pump_during_send_(pump_during_send), close_channel_(close_channel) {} 552 553 virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE { 554 SendDouble(pump_during_send_, !close_channel_); 555 if (close_channel_) { 556 delete reply_msg; 557 } else { 558 SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2); 559 Send(reply_msg); 560 } 561 Done(); 562 } 563 564 virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE { 565 if (close_channel_) { 566 delete reply_msg; 567 CloseChannel(); 568 } else { 569 SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42); 570 Send(reply_msg); 571 } 572 } 573 574 bool pump_during_send_, close_channel_; 575}; 576 577void Recursive( 578 bool server_pump_first, bool server_pump_second, bool client_pump) { 579 std::vector<Worker*> workers; 580 workers.push_back( 581 new RecursiveServer(true, server_pump_first, server_pump_second)); 582 workers.push_back(new RecursiveClient(client_pump, false)); 583 RunTest(workers); 584} 585 586// Tests a server calling Send while another Send is pending. 587TEST_F(IPCSyncChannelTest, Recursive) { 588 Recursive(false, false, false); 589 Recursive(false, false, true); 590 Recursive(false, true, false); 591 Recursive(false, true, true); 592 Recursive(true, false, false); 593 Recursive(true, false, true); 594 Recursive(true, true, false); 595 Recursive(true, true, true); 596} 597 598//------------------------------------------------------------------------------ 599 600void RecursiveNoHang( 601 bool server_pump_first, bool server_pump_second, bool client_pump) { 602 std::vector<Worker*> workers; 603 workers.push_back( 604 new RecursiveServer(false, server_pump_first, server_pump_second)); 605 workers.push_back(new RecursiveClient(client_pump, true)); 606 RunTest(workers); 607} 608 609// Tests that if a caller makes a sync call during an existing sync call and 610// the receiver dies, neither of the Send() calls hang. 611TEST_F(IPCSyncChannelTest, RecursiveNoHang) { 612 RecursiveNoHang(false, false, false); 613 RecursiveNoHang(false, false, true); 614 RecursiveNoHang(false, true, false); 615 RecursiveNoHang(false, true, true); 616 RecursiveNoHang(true, false, false); 617 RecursiveNoHang(true, false, true); 618 RecursiveNoHang(true, true, false); 619 RecursiveNoHang(true, true, true); 620} 621 622//------------------------------------------------------------------------------ 623 624class MultipleServer1 : public Worker { 625 public: 626 explicit MultipleServer1(bool pump_during_send) 627 : Worker("test_channel1", Channel::MODE_SERVER), 628 pump_during_send_(pump_during_send) { } 629 630 virtual void Run() OVERRIDE { 631 SendDouble(pump_during_send_, true); 632 Done(); 633 } 634 635 bool pump_during_send_; 636}; 637 638class MultipleClient1 : public Worker { 639 public: 640 MultipleClient1(WaitableEvent* client1_msg_received, 641 WaitableEvent* client1_can_reply) : 642 Worker("test_channel1", Channel::MODE_CLIENT), 643 client1_msg_received_(client1_msg_received), 644 client1_can_reply_(client1_can_reply) { } 645 646 virtual void OnDouble(int in, int* out) OVERRIDE { 647 client1_msg_received_->Signal(); 648 *out = in * 2; 649 client1_can_reply_->Wait(); 650 Done(); 651 } 652 653 private: 654 WaitableEvent *client1_msg_received_, *client1_can_reply_; 655}; 656 657class MultipleServer2 : public Worker { 658 public: 659 MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { } 660 661 virtual void OnAnswer(int* result) OVERRIDE { 662 *result = 42; 663 Done(); 664 } 665}; 666 667class MultipleClient2 : public Worker { 668 public: 669 MultipleClient2( 670 WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply, 671 bool pump_during_send) 672 : Worker("test_channel2", Channel::MODE_CLIENT), 673 client1_msg_received_(client1_msg_received), 674 client1_can_reply_(client1_can_reply), 675 pump_during_send_(pump_during_send) { } 676 677 virtual void Run() OVERRIDE { 678 client1_msg_received_->Wait(); 679 SendAnswerToLife(pump_during_send_, true); 680 client1_can_reply_->Signal(); 681 Done(); 682 } 683 684 private: 685 WaitableEvent *client1_msg_received_, *client1_can_reply_; 686 bool pump_during_send_; 687}; 688 689void Multiple(bool server_pump, bool client_pump) { 690 std::vector<Worker*> workers; 691 692 // A shared worker thread so that server1 and server2 run on one thread. 693 base::Thread worker_thread("Multiple"); 694 ASSERT_TRUE(worker_thread.Start()); 695 696 // Server1 sends a sync msg to client1, which blocks the reply until 697 // server2 (which runs on the same worker thread as server1) responds 698 // to a sync msg from client2. 699 WaitableEvent client1_msg_received(false, false); 700 WaitableEvent client1_can_reply(false, false); 701 702 Worker* worker; 703 704 worker = new MultipleServer2(); 705 worker->OverrideThread(&worker_thread); 706 workers.push_back(worker); 707 708 worker = new MultipleClient2( 709 &client1_msg_received, &client1_can_reply, client_pump); 710 workers.push_back(worker); 711 712 worker = new MultipleServer1(server_pump); 713 worker->OverrideThread(&worker_thread); 714 workers.push_back(worker); 715 716 worker = new MultipleClient1( 717 &client1_msg_received, &client1_can_reply); 718 workers.push_back(worker); 719 720 RunTest(workers); 721} 722 723// Tests that multiple SyncObjects on the same listener thread can unblock each 724// other. 725TEST_F(IPCSyncChannelTest, Multiple) { 726 Multiple(false, false); 727 Multiple(false, true); 728 Multiple(true, false); 729 Multiple(true, true); 730} 731 732//------------------------------------------------------------------------------ 733 734// This class provides server side functionality to test the case where 735// multiple sync channels are in use on the same thread on the client and 736// nested calls are issued. 737class QueuedReplyServer : public Worker { 738 public: 739 QueuedReplyServer(base::Thread* listener_thread, 740 const std::string& channel_name, 741 const std::string& reply_text) 742 : Worker(channel_name, Channel::MODE_SERVER), 743 reply_text_(reply_text) { 744 Worker::OverrideThread(listener_thread); 745 } 746 747 virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE { 748 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 749 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 750 Send(reply_msg); 751 Done(); 752 } 753 754 private: 755 std::string reply_text_; 756}; 757 758// The QueuedReplyClient class provides functionality to test the case where 759// multiple sync channels are in use on the same thread and they make nested 760// sync calls, i.e. while the first channel waits for a response it makes a 761// sync call on another channel. 762// The callstack should unwind correctly, i.e. the outermost call should 763// complete first, and so on. 764class QueuedReplyClient : public Worker { 765 public: 766 QueuedReplyClient(base::Thread* listener_thread, 767 const std::string& channel_name, 768 const std::string& expected_text, 769 bool pump_during_send) 770 : Worker(channel_name, Channel::MODE_CLIENT), 771 pump_during_send_(pump_during_send), 772 expected_text_(expected_text) { 773 Worker::OverrideThread(listener_thread); 774 } 775 776 virtual void Run() OVERRIDE { 777 std::string response; 778 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 779 if (pump_during_send_) 780 msg->EnableMessagePumping(); 781 bool result = Send(msg); 782 DCHECK(result); 783 DCHECK_EQ(response, expected_text_); 784 785 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 786 Done(); 787 } 788 789 private: 790 bool pump_during_send_; 791 std::string expected_text_; 792}; 793 794void QueuedReply(bool client_pump) { 795 std::vector<Worker*> workers; 796 797 // A shared worker thread for servers 798 base::Thread server_worker_thread("QueuedReply_ServerListener"); 799 ASSERT_TRUE(server_worker_thread.Start()); 800 801 base::Thread client_worker_thread("QueuedReply_ClientListener"); 802 ASSERT_TRUE(client_worker_thread.Start()); 803 804 Worker* worker; 805 806 worker = new QueuedReplyServer(&server_worker_thread, 807 "QueuedReply_Server1", 808 "Got first message"); 809 workers.push_back(worker); 810 811 worker = new QueuedReplyServer(&server_worker_thread, 812 "QueuedReply_Server2", 813 "Got second message"); 814 workers.push_back(worker); 815 816 worker = new QueuedReplyClient(&client_worker_thread, 817 "QueuedReply_Server1", 818 "Got first message", 819 client_pump); 820 workers.push_back(worker); 821 822 worker = new QueuedReplyClient(&client_worker_thread, 823 "QueuedReply_Server2", 824 "Got second message", 825 client_pump); 826 workers.push_back(worker); 827 828 RunTest(workers); 829} 830 831// While a blocking send is in progress, the listener thread might answer other 832// synchronous messages. This tests that if during the response to another 833// message the reply to the original messages comes, it is queued up correctly 834// and the original Send is unblocked later. 835// We also test that the send call stacks unwind correctly when the channel 836// pumps messages while waiting for a response. 837TEST_F(IPCSyncChannelTest, QueuedReply) { 838 QueuedReply(false); 839 QueuedReply(true); 840} 841 842//------------------------------------------------------------------------------ 843 844class ChattyClient : public Worker { 845 public: 846 ChattyClient() : 847 Worker(Channel::MODE_CLIENT, "chatty_client") { } 848 849 virtual void OnAnswer(int* answer) OVERRIDE { 850 // The PostMessage limit is 10k. Send 20% more than that. 851 const int kMessageLimit = 10000; 852 const int kMessagesToSend = kMessageLimit * 120 / 100; 853 for (int i = 0; i < kMessagesToSend; ++i) { 854 if (!SendDouble(false, true)) 855 break; 856 } 857 *answer = 42; 858 Done(); 859 } 860}; 861 862void ChattyServer(bool pump_during_send) { 863 std::vector<Worker*> workers; 864 workers.push_back(new UnblockServer(pump_during_send, false)); 865 workers.push_back(new ChattyClient()); 866 RunTest(workers); 867} 868 869// Tests http://b/1093251 - that sending lots of sync messages while 870// the receiver is waiting for a sync reply does not overflow the PostMessage 871// queue. 872TEST_F(IPCSyncChannelTest, ChattyServer) { 873 ChattyServer(false); 874 ChattyServer(true); 875} 876 877//------------------------------------------------------------------------------ 878 879void NestedCallback(Worker* server) { 880 // Sleep a bit so that we wake up after the reply has been received. 881 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250)); 882 server->SendAnswerToLife(true, true); 883} 884 885bool timeout_occurred = false; 886 887void TimeoutCallback() { 888 timeout_occurred = true; 889} 890 891class DoneEventRaceServer : public Worker { 892 public: 893 DoneEventRaceServer() 894 : Worker(Channel::MODE_SERVER, "done_event_race_server") { } 895 896 virtual void Run() OVERRIDE { 897 base::MessageLoop::current()->PostTask(FROM_HERE, 898 base::Bind(&NestedCallback, this)); 899 base::MessageLoop::current()->PostDelayedTask( 900 FROM_HERE, 901 base::Bind(&TimeoutCallback), 902 base::TimeDelta::FromSeconds(9)); 903 // Even though we have a timeout on the Send, it will succeed since for this 904 // bug, the reply message comes back and is deserialized, however the done 905 // event wasn't set. So we indirectly use the timeout task to notice if a 906 // timeout occurred. 907 SendAnswerToLife(true, true); 908 DCHECK(!timeout_occurred); 909 Done(); 910 } 911}; 912 913// Tests http://b/1474092 - that if after the done_event is set but before 914// OnObjectSignaled is called another message is sent out, then after its 915// reply comes back OnObjectSignaled will be called for the first message. 916TEST_F(IPCSyncChannelTest, DoneEventRace) { 917 std::vector<Worker*> workers; 918 workers.push_back(new DoneEventRaceServer()); 919 workers.push_back(new SimpleClient()); 920 RunTest(workers); 921} 922 923//------------------------------------------------------------------------------ 924 925class TestSyncMessageFilter : public SyncMessageFilter { 926 public: 927 TestSyncMessageFilter(base::WaitableEvent* shutdown_event, 928 Worker* worker, 929 scoped_refptr<base::MessageLoopProxy> message_loop) 930 : SyncMessageFilter(shutdown_event), 931 worker_(worker), 932 message_loop_(message_loop) { 933 } 934 935 virtual void OnFilterAdded(Channel* channel) OVERRIDE { 936 SyncMessageFilter::OnFilterAdded(channel); 937 message_loop_->PostTask( 938 FROM_HERE, 939 base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this)); 940 } 941 942 void SendMessageOnHelperThread() { 943 int answer = 0; 944 bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); 945 DCHECK(result); 946 DCHECK_EQ(answer, 42); 947 948 worker_->Done(); 949 } 950 951 private: 952 virtual ~TestSyncMessageFilter() {} 953 954 Worker* worker_; 955 scoped_refptr<base::MessageLoopProxy> message_loop_; 956}; 957 958class SyncMessageFilterServer : public Worker { 959 public: 960 SyncMessageFilterServer() 961 : Worker(Channel::MODE_SERVER, "sync_message_filter_server"), 962 thread_("helper_thread") { 963 base::Thread::Options options; 964 options.message_loop_type = base::MessageLoop::TYPE_DEFAULT; 965 thread_.StartWithOptions(options); 966 filter_ = new TestSyncMessageFilter(shutdown_event(), this, 967 thread_.message_loop_proxy()); 968 } 969 970 virtual void Run() OVERRIDE { 971 channel()->AddFilter(filter_.get()); 972 } 973 974 base::Thread thread_; 975 scoped_refptr<TestSyncMessageFilter> filter_; 976}; 977 978// This class provides functionality to test the case that a Send on the sync 979// channel does not crash after the channel has been closed. 980class ServerSendAfterClose : public Worker { 981 public: 982 ServerSendAfterClose() 983 : Worker(Channel::MODE_SERVER, "simpler_server"), 984 send_result_(true) { 985 } 986 987 bool SendDummy() { 988 ListenerThread()->message_loop()->PostTask( 989 FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send), 990 this, new SyncChannelTestMsg_NoArgs)); 991 return true; 992 } 993 994 bool send_result() const { 995 return send_result_; 996 } 997 998 private: 999 virtual void Run() OVERRIDE { 1000 CloseChannel(); 1001 Done(); 1002 } 1003 1004 virtual bool Send(Message* msg) OVERRIDE { 1005 send_result_ = Worker::Send(msg); 1006 Done(); 1007 return send_result_; 1008 } 1009 1010 bool send_result_; 1011}; 1012 1013// Tests basic synchronous call 1014TEST_F(IPCSyncChannelTest, SyncMessageFilter) { 1015 std::vector<Worker*> workers; 1016 workers.push_back(new SyncMessageFilterServer()); 1017 workers.push_back(new SimpleClient()); 1018 RunTest(workers); 1019} 1020 1021// Test the case when the channel is closed and a Send is attempted after that. 1022TEST_F(IPCSyncChannelTest, SendAfterClose) { 1023 ServerSendAfterClose server; 1024 server.Start(); 1025 1026 server.done_event()->Wait(); 1027 server.done_event()->Reset(); 1028 1029 server.SendDummy(); 1030 server.done_event()->Wait(); 1031 1032 EXPECT_FALSE(server.send_result()); 1033 1034 server.Shutdown(); 1035} 1036 1037//------------------------------------------------------------------------------ 1038 1039class RestrictedDispatchServer : public Worker { 1040 public: 1041 RestrictedDispatchServer(WaitableEvent* sent_ping_event, 1042 WaitableEvent* wait_event) 1043 : Worker("restricted_channel", Channel::MODE_SERVER), 1044 sent_ping_event_(sent_ping_event), 1045 wait_event_(wait_event) { } 1046 1047 void OnDoPing(int ping) { 1048 // Send an asynchronous message that unblocks the caller. 1049 Message* msg = new SyncChannelTestMsg_Ping(ping); 1050 msg->set_unblock(true); 1051 Send(msg); 1052 // Signal the event after the message has been sent on the channel, on the 1053 // IPC thread. 1054 ipc_thread().message_loop()->PostTask( 1055 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this)); 1056 } 1057 1058 void OnPingTTL(int ping, int* out) { 1059 *out = ping; 1060 wait_event_->Wait(); 1061 } 1062 1063 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1064 1065 private: 1066 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1067 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message) 1068 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1069 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1070 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1071 IPC_END_MESSAGE_MAP() 1072 return true; 1073 } 1074 1075 void OnPingSent() { 1076 sent_ping_event_->Signal(); 1077 } 1078 1079 void OnNoArgs() { } 1080 WaitableEvent* sent_ping_event_; 1081 WaitableEvent* wait_event_; 1082}; 1083 1084class NonRestrictedDispatchServer : public Worker { 1085 public: 1086 NonRestrictedDispatchServer(WaitableEvent* signal_event) 1087 : Worker("non_restricted_channel", Channel::MODE_SERVER), 1088 signal_event_(signal_event) {} 1089 1090 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1091 1092 void OnDoPingTTL(int ping) { 1093 int value = 0; 1094 Send(new SyncChannelTestMsg_PingTTL(ping, &value)); 1095 signal_event_->Signal(); 1096 } 1097 1098 private: 1099 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1100 IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message) 1101 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1102 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1103 IPC_END_MESSAGE_MAP() 1104 return true; 1105 } 1106 1107 void OnNoArgs() { } 1108 WaitableEvent* signal_event_; 1109}; 1110 1111class RestrictedDispatchClient : public Worker { 1112 public: 1113 RestrictedDispatchClient(WaitableEvent* sent_ping_event, 1114 RestrictedDispatchServer* server, 1115 NonRestrictedDispatchServer* server2, 1116 int* success) 1117 : Worker("restricted_channel", Channel::MODE_CLIENT), 1118 ping_(0), 1119 server_(server), 1120 server2_(server2), 1121 success_(success), 1122 sent_ping_event_(sent_ping_event) {} 1123 1124 virtual void Run() OVERRIDE { 1125 // Incoming messages from our channel should only be dispatched when we 1126 // send a message on that same channel. 1127 channel()->SetRestrictDispatchChannelGroup(1); 1128 1129 server_->ListenerThread()->message_loop()->PostTask( 1130 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1)); 1131 sent_ping_event_->Wait(); 1132 Send(new SyncChannelTestMsg_NoArgs); 1133 if (ping_ == 1) 1134 ++*success_; 1135 else 1136 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1137 1138 non_restricted_channel_.reset( 1139 new SyncChannel("non_restricted_channel", 1140 Channel::MODE_CLIENT, 1141 this, 1142 ipc_thread().message_loop_proxy().get(), 1143 true, 1144 shutdown_event())); 1145 1146 server_->ListenerThread()->message_loop()->PostTask( 1147 FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2)); 1148 sent_ping_event_->Wait(); 1149 // Check that the incoming message is *not* dispatched when sending on the 1150 // non restricted channel. 1151 // TODO(piman): there is a possibility of a false positive race condition 1152 // here, if the message that was posted on the server-side end of the pipe 1153 // is not visible yet on the client side, but I don't know how to solve this 1154 // without hooking into the internals of SyncChannel. I haven't seen it in 1155 // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause 1156 // the following to fail). 1157 non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs); 1158 if (ping_ == 1) 1159 ++*success_; 1160 else 1161 LOG(ERROR) << "Send dispatched message from restricted channel"; 1162 1163 Send(new SyncChannelTestMsg_NoArgs); 1164 if (ping_ == 2) 1165 ++*success_; 1166 else 1167 LOG(ERROR) << "Send failed to dispatch incoming message on same channel"; 1168 1169 // Check that the incoming message on the non-restricted channel is 1170 // dispatched when sending on the restricted channel. 1171 server2_->ListenerThread()->message_loop()->PostTask( 1172 FROM_HERE, 1173 base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3)); 1174 int value = 0; 1175 Send(new SyncChannelTestMsg_PingTTL(4, &value)); 1176 if (ping_ == 3 && value == 4) 1177 ++*success_; 1178 else 1179 LOG(ERROR) << "Send failed to dispatch message from unrestricted channel"; 1180 1181 non_restricted_channel_->Send(new SyncChannelTestMsg_Done); 1182 non_restricted_channel_.reset(); 1183 Send(new SyncChannelTestMsg_Done); 1184 Done(); 1185 } 1186 1187 private: 1188 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1189 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message) 1190 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing) 1191 IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL) 1192 IPC_END_MESSAGE_MAP() 1193 return true; 1194 } 1195 1196 void OnPing(int ping) { 1197 ping_ = ping; 1198 } 1199 1200 void OnPingTTL(int ping, IPC::Message* reply) { 1201 ping_ = ping; 1202 // This message comes from the NonRestrictedDispatchServer, we have to send 1203 // the reply back manually. 1204 SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping); 1205 non_restricted_channel_->Send(reply); 1206 } 1207 1208 int ping_; 1209 RestrictedDispatchServer* server_; 1210 NonRestrictedDispatchServer* server2_; 1211 int* success_; 1212 WaitableEvent* sent_ping_event_; 1213 scoped_ptr<SyncChannel> non_restricted_channel_; 1214}; 1215 1216TEST_F(IPCSyncChannelTest, RestrictedDispatch) { 1217 WaitableEvent sent_ping_event(false, false); 1218 WaitableEvent wait_event(false, false); 1219 RestrictedDispatchServer* server = 1220 new RestrictedDispatchServer(&sent_ping_event, &wait_event); 1221 NonRestrictedDispatchServer* server2 = 1222 new NonRestrictedDispatchServer(&wait_event); 1223 1224 int success = 0; 1225 std::vector<Worker*> workers; 1226 workers.push_back(server); 1227 workers.push_back(server2); 1228 workers.push_back(new RestrictedDispatchClient( 1229 &sent_ping_event, server, server2, &success)); 1230 RunTest(workers); 1231 EXPECT_EQ(4, success); 1232} 1233 1234//------------------------------------------------------------------------------ 1235 1236// This test case inspired by crbug.com/108491 1237// We create two servers that use the same ListenerThread but have 1238// SetRestrictDispatchToSameChannel set to true. 1239// We create clients, then use some specific WaitableEvent wait/signalling to 1240// ensure that messages get dispatched in a way that causes a deadlock due to 1241// a nested dispatch and an eligible message in a higher-level dispatch's 1242// delayed_queue. Specifically, we start with client1 about so send an 1243// unblocking message to server1, while the shared listener thread for the 1244// servers server1 and server2 is about to send a non-unblocking message to 1245// client1. At the same time, client2 will be about to send an unblocking 1246// message to server2. Server1 will handle the client1->server1 message by 1247// telling server2 to send a non-unblocking message to client2. 1248// What should happen is that the send to server2 should find the pending, 1249// same-context client2->server2 message to dispatch, causing client2 to 1250// unblock then handle the server2->client2 message, so that the shared 1251// servers' listener thread can then respond to the client1->server1 message. 1252// Then client1 can handle the non-unblocking server1->client1 message. 1253// The old code would end up in a state where the server2->client2 message is 1254// sent, but the client2->server2 message (which is eligible for dispatch, and 1255// which is what client2 is waiting for) is stashed in a local delayed_queue 1256// that has server1's channel context, causing a deadlock. 1257// WaitableEvents in the events array are used to: 1258// event 0: indicate to client1 that server listener is in OnDoServerTask 1259// event 1: indicate to client1 that client2 listener is in OnDoClient2Task 1260// event 2: indicate to server1 that client2 listener is in OnDoClient2Task 1261// event 3: indicate to client2 that server listener is in OnDoServerTask 1262 1263class RestrictedDispatchDeadlockServer : public Worker { 1264 public: 1265 RestrictedDispatchDeadlockServer(int server_num, 1266 WaitableEvent* server_ready_event, 1267 WaitableEvent** events, 1268 RestrictedDispatchDeadlockServer* peer) 1269 : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER), 1270 server_num_(server_num), 1271 server_ready_event_(server_ready_event), 1272 events_(events), 1273 peer_(peer) { } 1274 1275 void OnDoServerTask() { 1276 events_[3]->Signal(); 1277 events_[2]->Wait(); 1278 events_[0]->Signal(); 1279 SendMessageToClient(); 1280 } 1281 1282 virtual void Run() OVERRIDE { 1283 channel()->SetRestrictDispatchChannelGroup(1); 1284 server_ready_event_->Signal(); 1285 } 1286 1287 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1288 1289 private: 1290 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1291 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message) 1292 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1293 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done) 1294 IPC_END_MESSAGE_MAP() 1295 return true; 1296 } 1297 1298 void OnNoArgs() { 1299 if (server_num_ == 1) { 1300 DCHECK(peer_ != NULL); 1301 peer_->SendMessageToClient(); 1302 } 1303 } 1304 1305 void SendMessageToClient() { 1306 Message* msg = new SyncChannelTestMsg_NoArgs; 1307 msg->set_unblock(false); 1308 DCHECK(!msg->should_unblock()); 1309 Send(msg); 1310 } 1311 1312 int server_num_; 1313 WaitableEvent* server_ready_event_; 1314 WaitableEvent** events_; 1315 RestrictedDispatchDeadlockServer* peer_; 1316}; 1317 1318class RestrictedDispatchDeadlockClient2 : public Worker { 1319 public: 1320 RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server, 1321 WaitableEvent* server_ready_event, 1322 WaitableEvent** events) 1323 : Worker("channel2", Channel::MODE_CLIENT), 1324 server_ready_event_(server_ready_event), 1325 events_(events), 1326 received_msg_(false), 1327 received_noarg_reply_(false), 1328 done_issued_(false) {} 1329 1330 virtual void Run() OVERRIDE { 1331 server_ready_event_->Wait(); 1332 } 1333 1334 void OnDoClient2Task() { 1335 events_[3]->Wait(); 1336 events_[1]->Signal(); 1337 events_[2]->Signal(); 1338 DCHECK(received_msg_ == false); 1339 1340 Message* message = new SyncChannelTestMsg_NoArgs; 1341 message->set_unblock(true); 1342 Send(message); 1343 received_noarg_reply_ = true; 1344 } 1345 1346 base::Thread* ListenerThread() { return Worker::ListenerThread(); } 1347 private: 1348 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1349 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message) 1350 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1351 IPC_END_MESSAGE_MAP() 1352 return true; 1353 } 1354 1355 void OnNoArgs() { 1356 received_msg_ = true; 1357 PossiblyDone(); 1358 } 1359 1360 void PossiblyDone() { 1361 if (received_noarg_reply_ && received_msg_) { 1362 DCHECK(done_issued_ == false); 1363 done_issued_ = true; 1364 Send(new SyncChannelTestMsg_Done); 1365 Done(); 1366 } 1367 } 1368 1369 WaitableEvent* server_ready_event_; 1370 WaitableEvent** events_; 1371 bool received_msg_; 1372 bool received_noarg_reply_; 1373 bool done_issued_; 1374}; 1375 1376class RestrictedDispatchDeadlockClient1 : public Worker { 1377 public: 1378 RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server, 1379 RestrictedDispatchDeadlockClient2* peer, 1380 WaitableEvent* server_ready_event, 1381 WaitableEvent** events) 1382 : Worker("channel1", Channel::MODE_CLIENT), 1383 server_(server), 1384 peer_(peer), 1385 server_ready_event_(server_ready_event), 1386 events_(events), 1387 received_msg_(false), 1388 received_noarg_reply_(false), 1389 done_issued_(false) {} 1390 1391 virtual void Run() OVERRIDE { 1392 server_ready_event_->Wait(); 1393 server_->ListenerThread()->message_loop()->PostTask( 1394 FROM_HERE, 1395 base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_)); 1396 peer_->ListenerThread()->message_loop()->PostTask( 1397 FROM_HERE, 1398 base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_)); 1399 events_[0]->Wait(); 1400 events_[1]->Wait(); 1401 DCHECK(received_msg_ == false); 1402 1403 Message* message = new SyncChannelTestMsg_NoArgs; 1404 message->set_unblock(true); 1405 Send(message); 1406 received_noarg_reply_ = true; 1407 PossiblyDone(); 1408 } 1409 1410 private: 1411 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1412 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message) 1413 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs) 1414 IPC_END_MESSAGE_MAP() 1415 return true; 1416 } 1417 1418 void OnNoArgs() { 1419 received_msg_ = true; 1420 PossiblyDone(); 1421 } 1422 1423 void PossiblyDone() { 1424 if (received_noarg_reply_ && received_msg_) { 1425 DCHECK(done_issued_ == false); 1426 done_issued_ = true; 1427 Send(new SyncChannelTestMsg_Done); 1428 Done(); 1429 } 1430 } 1431 1432 RestrictedDispatchDeadlockServer* server_; 1433 RestrictedDispatchDeadlockClient2* peer_; 1434 WaitableEvent* server_ready_event_; 1435 WaitableEvent** events_; 1436 bool received_msg_; 1437 bool received_noarg_reply_; 1438 bool done_issued_; 1439}; 1440 1441TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) { 1442 std::vector<Worker*> workers; 1443 1444 // A shared worker thread so that server1 and server2 run on one thread. 1445 base::Thread worker_thread("RestrictedDispatchDeadlock"); 1446 ASSERT_TRUE(worker_thread.Start()); 1447 1448 WaitableEvent server1_ready(false, false); 1449 WaitableEvent server2_ready(false, false); 1450 1451 WaitableEvent event0(false, false); 1452 WaitableEvent event1(false, false); 1453 WaitableEvent event2(false, false); 1454 WaitableEvent event3(false, false); 1455 WaitableEvent* events[4] = {&event0, &event1, &event2, &event3}; 1456 1457 RestrictedDispatchDeadlockServer* server1; 1458 RestrictedDispatchDeadlockServer* server2; 1459 RestrictedDispatchDeadlockClient1* client1; 1460 RestrictedDispatchDeadlockClient2* client2; 1461 1462 server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events, 1463 NULL); 1464 server2->OverrideThread(&worker_thread); 1465 workers.push_back(server2); 1466 1467 client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready, 1468 events); 1469 workers.push_back(client2); 1470 1471 server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events, 1472 server2); 1473 server1->OverrideThread(&worker_thread); 1474 workers.push_back(server1); 1475 1476 client1 = new RestrictedDispatchDeadlockClient1(server1, client2, 1477 &server1_ready, events); 1478 workers.push_back(client1); 1479 1480 RunTest(workers); 1481} 1482 1483//------------------------------------------------------------------------------ 1484 1485// This test case inspired by crbug.com/120530 1486// We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a 1487// message that recurses through 3, 4 or 5 steps to make sure, say, W1 can 1488// re-enter when called from W4 while it's sending a message to W2. 1489// The first worker drives the whole test so it must be treated specially. 1490 1491class RestrictedDispatchPipeWorker : public Worker { 1492 public: 1493 RestrictedDispatchPipeWorker( 1494 const std::string &channel1, 1495 WaitableEvent* event1, 1496 const std::string &channel2, 1497 WaitableEvent* event2, 1498 int group, 1499 int* success) 1500 : Worker(channel1, Channel::MODE_SERVER), 1501 event1_(event1), 1502 event2_(event2), 1503 other_channel_name_(channel2), 1504 group_(group), 1505 success_(success) { 1506 } 1507 1508 void OnPingTTL(int ping, int* ret) { 1509 *ret = 0; 1510 if (!ping) 1511 return; 1512 other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret)); 1513 ++*ret; 1514 } 1515 1516 void OnDone() { 1517 if (is_first()) 1518 return; 1519 other_channel_->Send(new SyncChannelTestMsg_Done); 1520 other_channel_.reset(); 1521 Done(); 1522 } 1523 1524 virtual void Run() OVERRIDE { 1525 channel()->SetRestrictDispatchChannelGroup(group_); 1526 if (is_first()) 1527 event1_->Signal(); 1528 event2_->Wait(); 1529 other_channel_.reset( 1530 new SyncChannel(other_channel_name_, 1531 Channel::MODE_CLIENT, 1532 this, 1533 ipc_thread().message_loop_proxy().get(), 1534 true, 1535 shutdown_event())); 1536 other_channel_->SetRestrictDispatchChannelGroup(group_); 1537 if (!is_first()) { 1538 event1_->Signal(); 1539 return; 1540 } 1541 *success_ = 0; 1542 int value = 0; 1543 OnPingTTL(3, &value); 1544 *success_ += (value == 3); 1545 OnPingTTL(4, &value); 1546 *success_ += (value == 4); 1547 OnPingTTL(5, &value); 1548 *success_ += (value == 5); 1549 other_channel_->Send(new SyncChannelTestMsg_Done); 1550 other_channel_.reset(); 1551 Done(); 1552 } 1553 1554 bool is_first() { return !!success_; } 1555 1556 private: 1557 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1558 IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message) 1559 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL) 1560 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone) 1561 IPC_END_MESSAGE_MAP() 1562 return true; 1563 } 1564 1565 scoped_ptr<SyncChannel> other_channel_; 1566 WaitableEvent* event1_; 1567 WaitableEvent* event2_; 1568 std::string other_channel_name_; 1569 int group_; 1570 int* success_; 1571}; 1572 1573TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) { 1574 int success = 0; 1575 std::vector<Worker*> workers; 1576 WaitableEvent event0(true, false); 1577 WaitableEvent event1(true, false); 1578 WaitableEvent event2(true, false); 1579 WaitableEvent event3(true, false); 1580 workers.push_back(new RestrictedDispatchPipeWorker( 1581 "channel0", &event0, "channel1", &event1, 1, &success)); 1582 workers.push_back(new RestrictedDispatchPipeWorker( 1583 "channel1", &event1, "channel2", &event2, 2, NULL)); 1584 workers.push_back(new RestrictedDispatchPipeWorker( 1585 "channel2", &event2, "channel3", &event3, 3, NULL)); 1586 workers.push_back(new RestrictedDispatchPipeWorker( 1587 "channel3", &event3, "channel0", &event0, 4, NULL)); 1588 RunTest(workers); 1589 EXPECT_EQ(3, success); 1590} 1591 1592//------------------------------------------------------------------------------ 1593 1594// This test case inspired by crbug.com/122443 1595// We want to make sure a reply message with the unblock flag set correctly 1596// behaves as a reply, not a regular message. 1597// We have 3 workers. Server1 will send a message to Server2 (which will block), 1598// during which it will dispatch a message comming from Client, at which point 1599// it will send another message to Server2. While sending that second message it 1600// will receive a reply from Server1 with the unblock flag. 1601 1602class ReentrantReplyServer1 : public Worker { 1603 public: 1604 ReentrantReplyServer1(WaitableEvent* server_ready) 1605 : Worker("reentrant_reply1", Channel::MODE_SERVER), 1606 server_ready_(server_ready) { } 1607 1608 virtual void Run() OVERRIDE { 1609 server2_channel_.reset( 1610 new SyncChannel("reentrant_reply2", 1611 Channel::MODE_CLIENT, 1612 this, 1613 ipc_thread().message_loop_proxy().get(), 1614 true, 1615 shutdown_event())); 1616 server_ready_->Signal(); 1617 Message* msg = new SyncChannelTestMsg_Reentrant1(); 1618 server2_channel_->Send(msg); 1619 server2_channel_.reset(); 1620 Done(); 1621 } 1622 1623 private: 1624 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1625 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message) 1626 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2) 1627 IPC_REPLY_HANDLER(OnReply) 1628 IPC_END_MESSAGE_MAP() 1629 return true; 1630 } 1631 1632 void OnReentrant2() { 1633 Message* msg = new SyncChannelTestMsg_Reentrant3(); 1634 server2_channel_->Send(msg); 1635 } 1636 1637 void OnReply(const Message& message) { 1638 // If we get here, the Send() will never receive the reply (thus would 1639 // hang), so abort instead. 1640 LOG(FATAL) << "Reply message was dispatched"; 1641 } 1642 1643 WaitableEvent* server_ready_; 1644 scoped_ptr<SyncChannel> server2_channel_; 1645}; 1646 1647class ReentrantReplyServer2 : public Worker { 1648 public: 1649 ReentrantReplyServer2() 1650 : Worker("reentrant_reply2", Channel::MODE_SERVER), 1651 reply_(NULL) { } 1652 1653 private: 1654 virtual bool OnMessageReceived(const Message& message) OVERRIDE { 1655 IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message) 1656 IPC_MESSAGE_HANDLER_DELAY_REPLY( 1657 SyncChannelTestMsg_Reentrant1, OnReentrant1) 1658 IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3) 1659 IPC_END_MESSAGE_MAP() 1660 return true; 1661 } 1662 1663 void OnReentrant1(Message* reply) { 1664 DCHECK(!reply_); 1665 reply_ = reply; 1666 } 1667 1668 void OnReentrant3() { 1669 DCHECK(reply_); 1670 Message* reply = reply_; 1671 reply_ = NULL; 1672 reply->set_unblock(true); 1673 Send(reply); 1674 Done(); 1675 } 1676 1677 Message* reply_; 1678}; 1679 1680class ReentrantReplyClient : public Worker { 1681 public: 1682 ReentrantReplyClient(WaitableEvent* server_ready) 1683 : Worker("reentrant_reply1", Channel::MODE_CLIENT), 1684 server_ready_(server_ready) { } 1685 1686 virtual void Run() OVERRIDE { 1687 server_ready_->Wait(); 1688 Send(new SyncChannelTestMsg_Reentrant2()); 1689 Done(); 1690 } 1691 1692 private: 1693 WaitableEvent* server_ready_; 1694}; 1695 1696TEST_F(IPCSyncChannelTest, ReentrantReply) { 1697 std::vector<Worker*> workers; 1698 WaitableEvent server_ready(false, false); 1699 workers.push_back(new ReentrantReplyServer2()); 1700 workers.push_back(new ReentrantReplyServer1(&server_ready)); 1701 workers.push_back(new ReentrantReplyClient(&server_ready)); 1702 RunTest(workers); 1703} 1704 1705//------------------------------------------------------------------------------ 1706 1707// Generate a validated channel ID using Channel::GenerateVerifiedChannelID(). 1708 1709class VerifiedServer : public Worker { 1710 public: 1711 VerifiedServer(base::Thread* listener_thread, 1712 const std::string& channel_name, 1713 const std::string& reply_text) 1714 : Worker(channel_name, Channel::MODE_SERVER), 1715 reply_text_(reply_text) { 1716 Worker::OverrideThread(listener_thread); 1717 } 1718 1719 virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE { 1720 VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_; 1721 SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_); 1722 Send(reply_msg); 1723 ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId()); 1724 Done(); 1725 } 1726 1727 private: 1728 std::string reply_text_; 1729}; 1730 1731class VerifiedClient : public Worker { 1732 public: 1733 VerifiedClient(base::Thread* listener_thread, 1734 const std::string& channel_name, 1735 const std::string& expected_text) 1736 : Worker(channel_name, Channel::MODE_CLIENT), 1737 expected_text_(expected_text) { 1738 Worker::OverrideThread(listener_thread); 1739 } 1740 1741 virtual void Run() OVERRIDE { 1742 std::string response; 1743 SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response); 1744 bool result = Send(msg); 1745 DCHECK(result); 1746 DCHECK_EQ(response, expected_text_); 1747 // expected_text_ is only used in the above DCHECK. This line suppresses the 1748 // "unused private field" warning in release builds. 1749 (void)expected_text_; 1750 1751 VLOG(1) << __FUNCTION__ << " Received reply: " << response; 1752 ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId()); 1753 Done(); 1754 } 1755 1756 private: 1757 std::string expected_text_; 1758}; 1759 1760void Verified() { 1761 std::vector<Worker*> workers; 1762 1763 // A shared worker thread for servers 1764 base::Thread server_worker_thread("Verified_ServerListener"); 1765 ASSERT_TRUE(server_worker_thread.Start()); 1766 1767 base::Thread client_worker_thread("Verified_ClientListener"); 1768 ASSERT_TRUE(client_worker_thread.Start()); 1769 1770 std::string channel_id = Channel::GenerateVerifiedChannelID("Verified"); 1771 Worker* worker; 1772 1773 worker = new VerifiedServer(&server_worker_thread, 1774 channel_id, 1775 "Got first message"); 1776 workers.push_back(worker); 1777 1778 worker = new VerifiedClient(&client_worker_thread, 1779 channel_id, 1780 "Got first message"); 1781 workers.push_back(worker); 1782 1783 RunTest(workers); 1784} 1785 1786// Windows needs to send an out-of-band secret to verify the client end of the 1787// channel. Test that we still connect correctly in that case. 1788TEST_F(IPCSyncChannelTest, Verified) { 1789 Verified(); 1790} 1791 1792} // namespace 1793} // namespace IPC 1794