ipc_sync_channel_unittest.cc revision 46d4c2bc3267f3f028f39e7e311b0f89aba2e4fd
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "ipc/ipc_sync_channel.h"
6
7#include <string>
8#include <vector>
9
10#include "base/basictypes.h"
11#include "base/bind.h"
12#include "base/logging.h"
13#include "base/memory/scoped_ptr.h"
14#include "base/message_loop/message_loop.h"
15#include "base/process/process_handle.h"
16#include "base/run_loop.h"
17#include "base/strings/string_util.h"
18#include "base/synchronization/waitable_event.h"
19#include "base/threading/platform_thread.h"
20#include "base/threading/thread.h"
21#include "ipc/ipc_listener.h"
22#include "ipc/ipc_message.h"
23#include "ipc/ipc_sender.h"
24#include "ipc/ipc_sync_message_filter.h"
25#include "ipc/ipc_sync_message_unittest.h"
26#include "testing/gtest/include/gtest/gtest.h"
27
28using base::WaitableEvent;
29
30namespace IPC {
31namespace {
32
33// Base class for a "process" with listener and IPC threads.
34class Worker : public Listener, public Sender {
35 public:
36  // Will create a channel without a name.
37  Worker(Channel::Mode mode, const std::string& thread_name)
38      : done_(new WaitableEvent(false, false)),
39        channel_created_(new WaitableEvent(false, false)),
40        mode_(mode),
41        ipc_thread_((thread_name + "_ipc").c_str()),
42        listener_thread_((thread_name + "_listener").c_str()),
43        overrided_thread_(NULL),
44        shutdown_event_(true, false),
45        is_shutdown_(false) {
46  }
47
48  // Will create a named channel and use this name for the threads' name.
49  Worker(const std::string& channel_name, Channel::Mode mode)
50      : done_(new WaitableEvent(false, false)),
51        channel_created_(new WaitableEvent(false, false)),
52        channel_name_(channel_name),
53        mode_(mode),
54        ipc_thread_((channel_name + "_ipc").c_str()),
55        listener_thread_((channel_name + "_listener").c_str()),
56        overrided_thread_(NULL),
57        shutdown_event_(true, false),
58        is_shutdown_(false) {
59  }
60
61  virtual ~Worker() {
62    // Shutdown() must be called before destruction.
63    CHECK(is_shutdown_);
64  }
65  void AddRef() { }
66  void Release() { }
67  virtual bool Send(Message* msg) OVERRIDE { return channel_->Send(msg); }
68  void WaitForChannelCreation() { channel_created_->Wait(); }
69  void CloseChannel() {
70    DCHECK(base::MessageLoop::current() == ListenerThread()->message_loop());
71    channel_->Close();
72  }
73  void Start() {
74    StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT);
75    ListenerThread()->message_loop()->PostTask(
76        FROM_HERE, base::Bind(&Worker::OnStart, this));
77  }
78  void Shutdown() {
79    // The IPC thread needs to outlive SyncChannel. We can't do this in
80    // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
81    // may result in a race conditions. See http://crbug.com/25841.
82    WaitableEvent listener_done(false, false), ipc_done(false, false);
83    ListenerThread()->message_loop()->PostTask(
84        FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this,
85                              &listener_done, &ipc_done));
86    listener_done.Wait();
87    ipc_done.Wait();
88    ipc_thread_.Stop();
89    listener_thread_.Stop();
90    is_shutdown_ = true;
91  }
92  void OverrideThread(base::Thread* overrided_thread) {
93    DCHECK(overrided_thread_ == NULL);
94    overrided_thread_ = overrided_thread;
95  }
96  bool SendAnswerToLife(bool pump, bool succeed) {
97    int answer = 0;
98    SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
99    if (pump)
100      msg->EnableMessagePumping();
101    bool result = Send(msg);
102    DCHECK_EQ(result, succeed);
103    DCHECK_EQ(answer, (succeed ? 42 : 0));
104    return result;
105  }
106  bool SendDouble(bool pump, bool succeed) {
107    int answer = 0;
108    SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
109    if (pump)
110      msg->EnableMessagePumping();
111    bool result = Send(msg);
112    DCHECK_EQ(result, succeed);
113    DCHECK_EQ(answer, (succeed ? 10 : 0));
114    return result;
115  }
116  const std::string& channel_name() { return channel_name_; }
117  Channel::Mode mode() { return mode_; }
118  WaitableEvent* done_event() { return done_.get(); }
119  WaitableEvent* shutdown_event() { return &shutdown_event_; }
120  void ResetChannel() { channel_.reset(); }
121  // Derived classes need to call this when they've completed their part of
122  // the test.
123  void Done() { done_->Signal(); }
124
125 protected:
126  SyncChannel* channel() { return channel_.get(); }
127  // Functions for dervied classes to implement if they wish.
128  virtual void Run() { }
129  virtual void OnAnswer(int* answer) { NOTREACHED(); }
130  virtual void OnAnswerDelay(Message* reply_msg) {
131    // The message handler map below can only take one entry for
132    // SyncChannelTestMsg_AnswerToLife, so since some classes want
133    // the normal version while other want the delayed reply, we
134    // call the normal version if the derived class didn't override
135    // this function.
136    int answer;
137    OnAnswer(&answer);
138    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
139    Send(reply_msg);
140  }
141  virtual void OnDouble(int in, int* out) { NOTREACHED(); }
142  virtual void OnDoubleDelay(int in, Message* reply_msg) {
143    int result;
144    OnDouble(in, &result);
145    SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
146    Send(reply_msg);
147  }
148
149  virtual void OnNestedTestMsg(Message* reply_msg) {
150    NOTREACHED();
151  }
152
153  virtual SyncChannel* CreateChannel() {
154    scoped_ptr<SyncChannel> channel = SyncChannel::Create(
155        channel_name_, mode_, this, ipc_thread_.message_loop_proxy().get(),
156        true, &shutdown_event_);
157    return channel.release();
158  }
159
160  base::Thread* ListenerThread() {
161    return overrided_thread_ ? overrided_thread_ : &listener_thread_;
162  }
163
164  const base::Thread& ipc_thread() const { return ipc_thread_; }
165
166 private:
167  // Called on the listener thread to create the sync channel.
168  void OnStart() {
169    // Link ipc_thread_, listener_thread_ and channel_ altogether.
170    StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO);
171    channel_.reset(CreateChannel());
172    channel_created_->Signal();
173    Run();
174  }
175
176  void OnListenerThreadShutdown1(WaitableEvent* listener_event,
177                                 WaitableEvent* ipc_event) {
178    // SyncChannel needs to be destructed on the thread that it was created on.
179    channel_.reset();
180
181    base::RunLoop().RunUntilIdle();
182
183    ipc_thread_.message_loop()->PostTask(
184        FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this,
185                              listener_event, ipc_event));
186  }
187
188  void OnIPCThreadShutdown(WaitableEvent* listener_event,
189                           WaitableEvent* ipc_event) {
190    base::RunLoop().RunUntilIdle();
191    ipc_event->Signal();
192
193    listener_thread_.message_loop()->PostTask(
194        FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this,
195                              listener_event));
196  }
197
198  void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
199    base::RunLoop().RunUntilIdle();
200    listener_event->Signal();
201  }
202
203  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
204    IPC_BEGIN_MESSAGE_MAP(Worker, message)
205     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
206     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
207                                     OnAnswerDelay)
208     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
209                                     OnNestedTestMsg)
210    IPC_END_MESSAGE_MAP()
211    return true;
212  }
213
214  void StartThread(base::Thread* thread, base::MessageLoop::Type type) {
215    base::Thread::Options options;
216    options.message_loop_type = type;
217    thread->StartWithOptions(options);
218  }
219
220  scoped_ptr<WaitableEvent> done_;
221  scoped_ptr<WaitableEvent> channel_created_;
222  std::string channel_name_;
223  Channel::Mode mode_;
224  scoped_ptr<SyncChannel> channel_;
225  base::Thread ipc_thread_;
226  base::Thread listener_thread_;
227  base::Thread* overrided_thread_;
228
229  base::WaitableEvent shutdown_event_;
230
231  bool is_shutdown_;
232
233  DISALLOW_COPY_AND_ASSIGN(Worker);
234};
235
236
237// Starts the test with the given workers.  This function deletes the workers
238// when it's done.
239void RunTest(std::vector<Worker*> workers) {
240  // First we create the workers that are channel servers, or else the other
241  // workers' channel initialization might fail because the pipe isn't created..
242  for (size_t i = 0; i < workers.size(); ++i) {
243    if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
244      workers[i]->Start();
245      workers[i]->WaitForChannelCreation();
246    }
247  }
248
249  // now create the clients
250  for (size_t i = 0; i < workers.size(); ++i) {
251    if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
252      workers[i]->Start();
253  }
254
255  // wait for all the workers to finish
256  for (size_t i = 0; i < workers.size(); ++i)
257    workers[i]->done_event()->Wait();
258
259  for (size_t i = 0; i < workers.size(); ++i) {
260    workers[i]->Shutdown();
261    delete workers[i];
262  }
263}
264
265class IPCSyncChannelTest : public testing::Test {
266 private:
267  base::MessageLoop message_loop_;
268};
269
270//------------------------------------------------------------------------------
271
272class SimpleServer : public Worker {
273 public:
274  explicit SimpleServer(bool pump_during_send)
275      : Worker(Channel::MODE_SERVER, "simpler_server"),
276        pump_during_send_(pump_during_send) { }
277  virtual void Run() OVERRIDE {
278    SendAnswerToLife(pump_during_send_, true);
279    Done();
280  }
281
282  bool pump_during_send_;
283};
284
285class SimpleClient : public Worker {
286 public:
287  SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { }
288
289  virtual void OnAnswer(int* answer) OVERRIDE {
290    *answer = 42;
291    Done();
292  }
293};
294
295void Simple(bool pump_during_send) {
296  std::vector<Worker*> workers;
297  workers.push_back(new SimpleServer(pump_during_send));
298  workers.push_back(new SimpleClient());
299  RunTest(workers);
300}
301
302// Tests basic synchronous call
303TEST_F(IPCSyncChannelTest, Simple) {
304  Simple(false);
305  Simple(true);
306}
307
308//------------------------------------------------------------------------------
309
310// Worker classes which override how the sync channel is created to use the
311// two-step initialization (calling the lightweight constructor and then
312// ChannelProxy::Init separately) process.
313class TwoStepServer : public Worker {
314 public:
315  explicit TwoStepServer(bool create_pipe_now)
316      : Worker(Channel::MODE_SERVER, "simpler_server"),
317        create_pipe_now_(create_pipe_now) { }
318
319  virtual void Run() OVERRIDE {
320    SendAnswerToLife(false, true);
321    Done();
322  }
323
324  virtual SyncChannel* CreateChannel() OVERRIDE {
325    SyncChannel* channel =
326        SyncChannel::Create(channel_name(), mode(), this,
327                            ipc_thread().message_loop_proxy().get(),
328                            create_pipe_now_,
329                            shutdown_event()).release();
330    return channel;
331  }
332
333  bool create_pipe_now_;
334};
335
336class TwoStepClient : public Worker {
337 public:
338  TwoStepClient(bool create_pipe_now)
339      : Worker(Channel::MODE_CLIENT, "simple_client"),
340        create_pipe_now_(create_pipe_now) { }
341
342  virtual void OnAnswer(int* answer) OVERRIDE {
343    *answer = 42;
344    Done();
345  }
346
347  virtual SyncChannel* CreateChannel() OVERRIDE {
348    SyncChannel* channel =
349        SyncChannel::Create(channel_name(), mode(), this,
350                            ipc_thread().message_loop_proxy().get(),
351                            create_pipe_now_,
352                            shutdown_event()).release();
353    return channel;
354  }
355
356  bool create_pipe_now_;
357};
358
359void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
360  std::vector<Worker*> workers;
361  workers.push_back(new TwoStepServer(create_server_pipe_now));
362  workers.push_back(new TwoStepClient(create_client_pipe_now));
363  RunTest(workers);
364}
365
366// Tests basic two-step initialization, where you call the lightweight
367// constructor then Init.
368TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
369  TwoStep(false, false);
370  TwoStep(false, true);
371  TwoStep(true, false);
372  TwoStep(true, true);
373}
374
375//------------------------------------------------------------------------------
376
377class DelayClient : public Worker {
378 public:
379  DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { }
380
381  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
382    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
383    Send(reply_msg);
384    Done();
385  }
386};
387
388void DelayReply(bool pump_during_send) {
389  std::vector<Worker*> workers;
390  workers.push_back(new SimpleServer(pump_during_send));
391  workers.push_back(new DelayClient());
392  RunTest(workers);
393}
394
395// Tests that asynchronous replies work
396TEST_F(IPCSyncChannelTest, DelayReply) {
397  DelayReply(false);
398  DelayReply(true);
399}
400
401//------------------------------------------------------------------------------
402
403class NoHangServer : public Worker {
404 public:
405  NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send)
406      : Worker(Channel::MODE_SERVER, "no_hang_server"),
407        got_first_reply_(got_first_reply),
408        pump_during_send_(pump_during_send) { }
409  virtual void Run() OVERRIDE {
410    SendAnswerToLife(pump_during_send_, true);
411    got_first_reply_->Signal();
412
413    SendAnswerToLife(pump_during_send_, false);
414    Done();
415  }
416
417  WaitableEvent* got_first_reply_;
418  bool pump_during_send_;
419};
420
421class NoHangClient : public Worker {
422 public:
423  explicit NoHangClient(WaitableEvent* got_first_reply)
424    : Worker(Channel::MODE_CLIENT, "no_hang_client"),
425      got_first_reply_(got_first_reply) { }
426
427  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
428    // Use the DELAY_REPLY macro so that we can force the reply to be sent
429    // before this function returns (when the channel will be reset).
430    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
431    Send(reply_msg);
432    got_first_reply_->Wait();
433    CloseChannel();
434    Done();
435  }
436
437  WaitableEvent* got_first_reply_;
438};
439
440void NoHang(bool pump_during_send) {
441  WaitableEvent got_first_reply(false, false);
442  std::vector<Worker*> workers;
443  workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
444  workers.push_back(new NoHangClient(&got_first_reply));
445  RunTest(workers);
446}
447
448// Tests that caller doesn't hang if receiver dies
449TEST_F(IPCSyncChannelTest, NoHang) {
450  NoHang(false);
451  NoHang(true);
452}
453
454//------------------------------------------------------------------------------
455
456class UnblockServer : public Worker {
457 public:
458  UnblockServer(bool pump_during_send, bool delete_during_send)
459    : Worker(Channel::MODE_SERVER, "unblock_server"),
460      pump_during_send_(pump_during_send),
461      delete_during_send_(delete_during_send) { }
462  virtual void Run() OVERRIDE {
463    if (delete_during_send_) {
464      // Use custom code since race conditions mean the answer may or may not be
465      // available.
466      int answer = 0;
467      SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
468      if (pump_during_send_)
469        msg->EnableMessagePumping();
470      Send(msg);
471    } else {
472      SendAnswerToLife(pump_during_send_, true);
473    }
474    Done();
475  }
476
477  virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
478    SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
479    Send(reply_msg);
480    if (delete_during_send_)
481      ResetChannel();
482  }
483
484  bool pump_during_send_;
485  bool delete_during_send_;
486};
487
488class UnblockClient : public Worker {
489 public:
490  explicit UnblockClient(bool pump_during_send)
491    : Worker(Channel::MODE_CLIENT, "unblock_client"),
492      pump_during_send_(pump_during_send) { }
493
494  virtual void OnAnswer(int* answer) OVERRIDE {
495    SendDouble(pump_during_send_, true);
496    *answer = 42;
497    Done();
498  }
499
500  bool pump_during_send_;
501};
502
503void Unblock(bool server_pump, bool client_pump, bool delete_during_send) {
504  std::vector<Worker*> workers;
505  workers.push_back(new UnblockServer(server_pump, delete_during_send));
506  workers.push_back(new UnblockClient(client_pump));
507  RunTest(workers);
508}
509
510// Tests that the caller unblocks to answer a sync message from the receiver.
511TEST_F(IPCSyncChannelTest, Unblock) {
512  Unblock(false, false, false);
513  Unblock(false, true, false);
514  Unblock(true, false, false);
515  Unblock(true, true, false);
516}
517
518//------------------------------------------------------------------------------
519
520// Tests that the the SyncChannel object can be deleted during a Send.
521TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) {
522  Unblock(false, false, true);
523  Unblock(false, true, true);
524  Unblock(true, false, true);
525  Unblock(true, true, true);
526}
527
528//------------------------------------------------------------------------------
529
530class RecursiveServer : public Worker {
531 public:
532  RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second)
533      : Worker(Channel::MODE_SERVER, "recursive_server"),
534        expected_send_result_(expected_send_result),
535        pump_first_(pump_first), pump_second_(pump_second) {}
536  virtual void Run() OVERRIDE {
537    SendDouble(pump_first_, expected_send_result_);
538    Done();
539  }
540
541  virtual void OnDouble(int in, int* out) OVERRIDE {
542    *out = in * 2;
543    SendAnswerToLife(pump_second_, expected_send_result_);
544  }
545
546  bool expected_send_result_, pump_first_, pump_second_;
547};
548
549class RecursiveClient : public Worker {
550 public:
551  RecursiveClient(bool pump_during_send, bool close_channel)
552      : Worker(Channel::MODE_CLIENT, "recursive_client"),
553        pump_during_send_(pump_during_send), close_channel_(close_channel) {}
554
555  virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
556    SendDouble(pump_during_send_, !close_channel_);
557    if (close_channel_) {
558      delete reply_msg;
559    } else {
560      SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
561      Send(reply_msg);
562    }
563    Done();
564  }
565
566  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
567    if (close_channel_) {
568      delete reply_msg;
569      CloseChannel();
570    } else {
571      SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
572      Send(reply_msg);
573    }
574  }
575
576  bool pump_during_send_, close_channel_;
577};
578
579void Recursive(
580    bool server_pump_first, bool server_pump_second, bool client_pump) {
581  std::vector<Worker*> workers;
582  workers.push_back(
583      new RecursiveServer(true, server_pump_first, server_pump_second));
584  workers.push_back(new RecursiveClient(client_pump, false));
585  RunTest(workers);
586}
587
588// Tests a server calling Send while another Send is pending.
589TEST_F(IPCSyncChannelTest, Recursive) {
590  Recursive(false, false, false);
591  Recursive(false, false, true);
592  Recursive(false, true, false);
593  Recursive(false, true, true);
594  Recursive(true, false, false);
595  Recursive(true, false, true);
596  Recursive(true, true, false);
597  Recursive(true, true, true);
598}
599
600//------------------------------------------------------------------------------
601
602void RecursiveNoHang(
603    bool server_pump_first, bool server_pump_second, bool client_pump) {
604  std::vector<Worker*> workers;
605  workers.push_back(
606      new RecursiveServer(false, server_pump_first, server_pump_second));
607  workers.push_back(new RecursiveClient(client_pump, true));
608  RunTest(workers);
609}
610
611// Tests that if a caller makes a sync call during an existing sync call and
612// the receiver dies, neither of the Send() calls hang.
613TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
614  RecursiveNoHang(false, false, false);
615  RecursiveNoHang(false, false, true);
616  RecursiveNoHang(false, true, false);
617  RecursiveNoHang(false, true, true);
618  RecursiveNoHang(true, false, false);
619  RecursiveNoHang(true, false, true);
620  RecursiveNoHang(true, true, false);
621  RecursiveNoHang(true, true, true);
622}
623
624//------------------------------------------------------------------------------
625
626class MultipleServer1 : public Worker {
627 public:
628  explicit MultipleServer1(bool pump_during_send)
629    : Worker("test_channel1", Channel::MODE_SERVER),
630      pump_during_send_(pump_during_send) { }
631
632  virtual void Run() OVERRIDE {
633    SendDouble(pump_during_send_, true);
634    Done();
635  }
636
637  bool pump_during_send_;
638};
639
640class MultipleClient1 : public Worker {
641 public:
642  MultipleClient1(WaitableEvent* client1_msg_received,
643                  WaitableEvent* client1_can_reply) :
644      Worker("test_channel1", Channel::MODE_CLIENT),
645      client1_msg_received_(client1_msg_received),
646      client1_can_reply_(client1_can_reply) { }
647
648  virtual void OnDouble(int in, int* out) OVERRIDE {
649    client1_msg_received_->Signal();
650    *out = in * 2;
651    client1_can_reply_->Wait();
652    Done();
653  }
654
655 private:
656  WaitableEvent *client1_msg_received_, *client1_can_reply_;
657};
658
659class MultipleServer2 : public Worker {
660 public:
661  MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { }
662
663  virtual void OnAnswer(int* result) OVERRIDE {
664    *result = 42;
665    Done();
666  }
667};
668
669class MultipleClient2 : public Worker {
670 public:
671  MultipleClient2(
672    WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply,
673    bool pump_during_send)
674    : Worker("test_channel2", Channel::MODE_CLIENT),
675      client1_msg_received_(client1_msg_received),
676      client1_can_reply_(client1_can_reply),
677      pump_during_send_(pump_during_send) { }
678
679  virtual void Run() OVERRIDE {
680    client1_msg_received_->Wait();
681    SendAnswerToLife(pump_during_send_, true);
682    client1_can_reply_->Signal();
683    Done();
684  }
685
686 private:
687  WaitableEvent *client1_msg_received_, *client1_can_reply_;
688  bool pump_during_send_;
689};
690
691void Multiple(bool server_pump, bool client_pump) {
692  std::vector<Worker*> workers;
693
694  // A shared worker thread so that server1 and server2 run on one thread.
695  base::Thread worker_thread("Multiple");
696  ASSERT_TRUE(worker_thread.Start());
697
698  // Server1 sends a sync msg to client1, which blocks the reply until
699  // server2 (which runs on the same worker thread as server1) responds
700  // to a sync msg from client2.
701  WaitableEvent client1_msg_received(false, false);
702  WaitableEvent client1_can_reply(false, false);
703
704  Worker* worker;
705
706  worker = new MultipleServer2();
707  worker->OverrideThread(&worker_thread);
708  workers.push_back(worker);
709
710  worker = new MultipleClient2(
711      &client1_msg_received, &client1_can_reply, client_pump);
712  workers.push_back(worker);
713
714  worker = new MultipleServer1(server_pump);
715  worker->OverrideThread(&worker_thread);
716  workers.push_back(worker);
717
718  worker = new MultipleClient1(
719      &client1_msg_received, &client1_can_reply);
720  workers.push_back(worker);
721
722  RunTest(workers);
723}
724
725// Tests that multiple SyncObjects on the same listener thread can unblock each
726// other.
727TEST_F(IPCSyncChannelTest, Multiple) {
728  Multiple(false, false);
729  Multiple(false, true);
730  Multiple(true, false);
731  Multiple(true, true);
732}
733
734//------------------------------------------------------------------------------
735
736// This class provides server side functionality to test the case where
737// multiple sync channels are in use on the same thread on the client and
738// nested calls are issued.
739class QueuedReplyServer : public Worker {
740 public:
741  QueuedReplyServer(base::Thread* listener_thread,
742                    const std::string& channel_name,
743                    const std::string& reply_text)
744      : Worker(channel_name, Channel::MODE_SERVER),
745        reply_text_(reply_text) {
746    Worker::OverrideThread(listener_thread);
747  }
748
749  virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
750    VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
751    SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
752    Send(reply_msg);
753    Done();
754  }
755
756 private:
757  std::string reply_text_;
758};
759
760// The QueuedReplyClient class provides functionality to test the case where
761// multiple sync channels are in use on the same thread and they make nested
762// sync calls, i.e. while the first channel waits for a response it makes a
763// sync call on another channel.
764// The callstack should unwind correctly, i.e. the outermost call should
765// complete first, and so on.
766class QueuedReplyClient : public Worker {
767 public:
768  QueuedReplyClient(base::Thread* listener_thread,
769                    const std::string& channel_name,
770                    const std::string& expected_text,
771                    bool pump_during_send)
772      : Worker(channel_name, Channel::MODE_CLIENT),
773        pump_during_send_(pump_during_send),
774        expected_text_(expected_text) {
775    Worker::OverrideThread(listener_thread);
776  }
777
778  virtual void Run() OVERRIDE {
779    std::string response;
780    SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
781    if (pump_during_send_)
782      msg->EnableMessagePumping();
783    bool result = Send(msg);
784    DCHECK(result);
785    DCHECK_EQ(response, expected_text_);
786
787    VLOG(1) << __FUNCTION__ << " Received reply: " << response;
788    Done();
789  }
790
791 private:
792  bool pump_during_send_;
793  std::string expected_text_;
794};
795
796void QueuedReply(bool client_pump) {
797  std::vector<Worker*> workers;
798
799  // A shared worker thread for servers
800  base::Thread server_worker_thread("QueuedReply_ServerListener");
801  ASSERT_TRUE(server_worker_thread.Start());
802
803  base::Thread client_worker_thread("QueuedReply_ClientListener");
804  ASSERT_TRUE(client_worker_thread.Start());
805
806  Worker* worker;
807
808  worker = new QueuedReplyServer(&server_worker_thread,
809                                 "QueuedReply_Server1",
810                                 "Got first message");
811  workers.push_back(worker);
812
813  worker = new QueuedReplyServer(&server_worker_thread,
814                                 "QueuedReply_Server2",
815                                 "Got second message");
816  workers.push_back(worker);
817
818  worker = new QueuedReplyClient(&client_worker_thread,
819                                 "QueuedReply_Server1",
820                                 "Got first message",
821                                 client_pump);
822  workers.push_back(worker);
823
824  worker = new QueuedReplyClient(&client_worker_thread,
825                                 "QueuedReply_Server2",
826                                 "Got second message",
827                                 client_pump);
828  workers.push_back(worker);
829
830  RunTest(workers);
831}
832
833// While a blocking send is in progress, the listener thread might answer other
834// synchronous messages.  This tests that if during the response to another
835// message the reply to the original messages comes, it is queued up correctly
836// and the original Send is unblocked later.
837// We also test that the send call stacks unwind correctly when the channel
838// pumps messages while waiting for a response.
839TEST_F(IPCSyncChannelTest, QueuedReply) {
840  QueuedReply(false);
841  QueuedReply(true);
842}
843
844//------------------------------------------------------------------------------
845
846class ChattyClient : public Worker {
847 public:
848  ChattyClient() :
849      Worker(Channel::MODE_CLIENT, "chatty_client") { }
850
851  virtual void OnAnswer(int* answer) OVERRIDE {
852    // The PostMessage limit is 10k.  Send 20% more than that.
853    const int kMessageLimit = 10000;
854    const int kMessagesToSend = kMessageLimit * 120 / 100;
855    for (int i = 0; i < kMessagesToSend; ++i) {
856      if (!SendDouble(false, true))
857        break;
858    }
859    *answer = 42;
860    Done();
861  }
862};
863
864void ChattyServer(bool pump_during_send) {
865  std::vector<Worker*> workers;
866  workers.push_back(new UnblockServer(pump_during_send, false));
867  workers.push_back(new ChattyClient());
868  RunTest(workers);
869}
870
871// Tests http://b/1093251 - that sending lots of sync messages while
872// the receiver is waiting for a sync reply does not overflow the PostMessage
873// queue.
874TEST_F(IPCSyncChannelTest, ChattyServer) {
875  ChattyServer(false);
876  ChattyServer(true);
877}
878
879//------------------------------------------------------------------------------
880
881void NestedCallback(Worker* server) {
882  // Sleep a bit so that we wake up after the reply has been received.
883  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250));
884  server->SendAnswerToLife(true, true);
885}
886
// Set once TimeoutCallback() has run; nothing in this block ever clears it.
bool timeout_occurred = false;

// Records that the timeout task has fired.
void TimeoutCallback() {
  timeout_occurred = true;
}
892
893class DoneEventRaceServer : public Worker {
894 public:
895  DoneEventRaceServer()
896      : Worker(Channel::MODE_SERVER, "done_event_race_server") { }
897
898  virtual void Run() OVERRIDE {
899    base::MessageLoop::current()->PostTask(FROM_HERE,
900                                           base::Bind(&NestedCallback, this));
901    base::MessageLoop::current()->PostDelayedTask(
902        FROM_HERE,
903        base::Bind(&TimeoutCallback),
904        base::TimeDelta::FromSeconds(9));
905    // Even though we have a timeout on the Send, it will succeed since for this
906    // bug, the reply message comes back and is deserialized, however the done
907    // event wasn't set.  So we indirectly use the timeout task to notice if a
908    // timeout occurred.
909    SendAnswerToLife(true, true);
910    DCHECK(!timeout_occurred);
911    Done();
912  }
913};
914
915// Tests http://b/1474092 - that if after the done_event is set but before
916// OnObjectSignaled is called another message is sent out, then after its
917// reply comes back OnObjectSignaled will be called for the first message.
918TEST_F(IPCSyncChannelTest, DoneEventRace) {
919  std::vector<Worker*> workers;
920  workers.push_back(new DoneEventRaceServer());
921  workers.push_back(new SimpleClient());
922  RunTest(workers);
923}
924
925//------------------------------------------------------------------------------
926
927class TestSyncMessageFilter : public SyncMessageFilter {
928 public:
929  TestSyncMessageFilter(base::WaitableEvent* shutdown_event,
930                        Worker* worker,
931                        scoped_refptr<base::MessageLoopProxy> message_loop)
932      : SyncMessageFilter(shutdown_event),
933        worker_(worker),
934        message_loop_(message_loop) {
935  }
936
937  virtual void OnFilterAdded(Channel* channel) OVERRIDE {
938    SyncMessageFilter::OnFilterAdded(channel);
939    message_loop_->PostTask(
940        FROM_HERE,
941        base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this));
942  }
943
944  void SendMessageOnHelperThread() {
945    int answer = 0;
946    bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
947    DCHECK(result);
948    DCHECK_EQ(answer, 42);
949
950    worker_->Done();
951  }
952
953 private:
954  virtual ~TestSyncMessageFilter() {}
955
956  Worker* worker_;
957  scoped_refptr<base::MessageLoopProxy> message_loop_;
958};
959
960class SyncMessageFilterServer : public Worker {
961 public:
962  SyncMessageFilterServer()
963      : Worker(Channel::MODE_SERVER, "sync_message_filter_server"),
964        thread_("helper_thread") {
965    base::Thread::Options options;
966    options.message_loop_type = base::MessageLoop::TYPE_DEFAULT;
967    thread_.StartWithOptions(options);
968    filter_ = new TestSyncMessageFilter(shutdown_event(), this,
969                                        thread_.message_loop_proxy());
970  }
971
972  virtual void Run() OVERRIDE {
973    channel()->AddFilter(filter_.get());
974  }
975
976  base::Thread thread_;
977  scoped_refptr<TestSyncMessageFilter> filter_;
978};
979
980// This class provides functionality to test the case that a Send on the sync
981// channel does not crash after the channel has been closed.
982class ServerSendAfterClose : public Worker {
983 public:
984  ServerSendAfterClose()
985     : Worker(Channel::MODE_SERVER, "simpler_server"),
986       send_result_(true) {
987  }
988
989  bool SendDummy() {
990    ListenerThread()->message_loop()->PostTask(
991        FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send),
992                              this, new SyncChannelTestMsg_NoArgs));
993    return true;
994  }
995
996  bool send_result() const {
997    return send_result_;
998  }
999
1000 private:
1001  virtual void Run() OVERRIDE {
1002    CloseChannel();
1003    Done();
1004  }
1005
1006  virtual bool Send(Message* msg) OVERRIDE {
1007    send_result_ = Worker::Send(msg);
1008    Done();
1009    return send_result_;
1010  }
1011
1012  bool send_result_;
1013};
1014
1015// Tests basic synchronous call
1016TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
1017  std::vector<Worker*> workers;
1018  workers.push_back(new SyncMessageFilterServer());
1019  workers.push_back(new SimpleClient());
1020  RunTest(workers);
1021}
1022
1023// Test the case when the channel is closed and a Send is attempted after that.
1024TEST_F(IPCSyncChannelTest, SendAfterClose) {
1025  ServerSendAfterClose server;
1026  server.Start();
1027
1028  server.done_event()->Wait();
1029  server.done_event()->Reset();
1030
1031  server.SendDummy();
1032  server.done_event()->Wait();
1033
1034  EXPECT_FALSE(server.send_result());
1035
1036  server.Shutdown();
1037}
1038
1039//------------------------------------------------------------------------------
1040
1041class RestrictedDispatchServer : public Worker {
1042 public:
1043  RestrictedDispatchServer(WaitableEvent* sent_ping_event,
1044                           WaitableEvent* wait_event)
1045      : Worker("restricted_channel", Channel::MODE_SERVER),
1046        sent_ping_event_(sent_ping_event),
1047        wait_event_(wait_event) { }
1048
1049  void OnDoPing(int ping) {
1050    // Send an asynchronous message that unblocks the caller.
1051    Message* msg = new SyncChannelTestMsg_Ping(ping);
1052    msg->set_unblock(true);
1053    Send(msg);
1054    // Signal the event after the message has been sent on the channel, on the
1055    // IPC thread.
1056    ipc_thread().message_loop()->PostTask(
1057        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this));
1058  }
1059
1060  void OnPingTTL(int ping, int* out) {
1061    *out = ping;
1062    wait_event_->Wait();
1063  }
1064
1065  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1066
1067 private:
1068  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1069    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
1070     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1071     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1072     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1073    IPC_END_MESSAGE_MAP()
1074    return true;
1075  }
1076
1077  void OnPingSent() {
1078    sent_ping_event_->Signal();
1079  }
1080
1081  void OnNoArgs() { }
1082  WaitableEvent* sent_ping_event_;
1083  WaitableEvent* wait_event_;
1084};
1085
1086class NonRestrictedDispatchServer : public Worker {
1087 public:
1088  NonRestrictedDispatchServer(WaitableEvent* signal_event)
1089      : Worker("non_restricted_channel", Channel::MODE_SERVER),
1090        signal_event_(signal_event) {}
1091
1092  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1093
1094  void OnDoPingTTL(int ping) {
1095    int value = 0;
1096    Send(new SyncChannelTestMsg_PingTTL(ping, &value));
1097    signal_event_->Signal();
1098  }
1099
1100 private:
1101  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1102    IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
1103     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1104     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1105    IPC_END_MESSAGE_MAP()
1106    return true;
1107  }
1108
1109  void OnNoArgs() { }
1110  WaitableEvent* signal_event_;
1111};
1112
1113class RestrictedDispatchClient : public Worker {
1114 public:
1115  RestrictedDispatchClient(WaitableEvent* sent_ping_event,
1116                           RestrictedDispatchServer* server,
1117                           NonRestrictedDispatchServer* server2,
1118                           int* success)
1119      : Worker("restricted_channel", Channel::MODE_CLIENT),
1120        ping_(0),
1121        server_(server),
1122        server2_(server2),
1123        success_(success),
1124        sent_ping_event_(sent_ping_event) {}
1125
1126  virtual void Run() OVERRIDE {
1127    // Incoming messages from our channel should only be dispatched when we
1128    // send a message on that same channel.
1129    channel()->SetRestrictDispatchChannelGroup(1);
1130
1131    server_->ListenerThread()->message_loop()->PostTask(
1132        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1));
1133    sent_ping_event_->Wait();
1134    Send(new SyncChannelTestMsg_NoArgs);
1135    if (ping_ == 1)
1136      ++*success_;
1137    else
1138      LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1139
1140    non_restricted_channel_ =
1141        SyncChannel::Create("non_restricted_channel",
1142                            IPC::Channel::MODE_CLIENT,
1143                            this,
1144                            ipc_thread().message_loop_proxy().get(),
1145                            true,
1146                            shutdown_event());
1147
1148    server_->ListenerThread()->message_loop()->PostTask(
1149        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2));
1150    sent_ping_event_->Wait();
1151    // Check that the incoming message is *not* dispatched when sending on the
1152    // non restricted channel.
1153    // TODO(piman): there is a possibility of a false positive race condition
1154    // here, if the message that was posted on the server-side end of the pipe
1155    // is not visible yet on the client side, but I don't know how to solve this
1156    // without hooking into the internals of SyncChannel. I haven't seen it in
1157    // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
1158    // the following to fail).
1159    non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
1160    if (ping_ == 1)
1161      ++*success_;
1162    else
1163      LOG(ERROR) << "Send dispatched message from restricted channel";
1164
1165    Send(new SyncChannelTestMsg_NoArgs);
1166    if (ping_ == 2)
1167      ++*success_;
1168    else
1169      LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1170
1171    // Check that the incoming message on the non-restricted channel is
1172    // dispatched when sending on the restricted channel.
1173    server2_->ListenerThread()->message_loop()->PostTask(
1174        FROM_HERE,
1175        base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3));
1176    int value = 0;
1177    Send(new SyncChannelTestMsg_PingTTL(4, &value));
1178    if (ping_ == 3 && value == 4)
1179      ++*success_;
1180    else
1181      LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
1182
1183    non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
1184    non_restricted_channel_.reset();
1185    Send(new SyncChannelTestMsg_Done);
1186    Done();
1187  }
1188
1189 private:
1190  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1191    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
1192     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
1193     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
1194    IPC_END_MESSAGE_MAP()
1195    return true;
1196  }
1197
1198  void OnPing(int ping) {
1199    ping_ = ping;
1200  }
1201
1202  void OnPingTTL(int ping, IPC::Message* reply) {
1203    ping_ = ping;
1204    // This message comes from the NonRestrictedDispatchServer, we have to send
1205    // the reply back manually.
1206    SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
1207    non_restricted_channel_->Send(reply);
1208  }
1209
1210  int ping_;
1211  RestrictedDispatchServer* server_;
1212  NonRestrictedDispatchServer* server2_;
1213  int* success_;
1214  WaitableEvent* sent_ping_event_;
1215  scoped_ptr<SyncChannel> non_restricted_channel_;
1216};
1217
1218TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
1219  WaitableEvent sent_ping_event(false, false);
1220  WaitableEvent wait_event(false, false);
1221  RestrictedDispatchServer* server =
1222      new RestrictedDispatchServer(&sent_ping_event, &wait_event);
1223  NonRestrictedDispatchServer* server2 =
1224      new NonRestrictedDispatchServer(&wait_event);
1225
1226  int success = 0;
1227  std::vector<Worker*> workers;
1228  workers.push_back(server);
1229  workers.push_back(server2);
1230  workers.push_back(new RestrictedDispatchClient(
1231      &sent_ping_event, server, server2, &success));
1232  RunTest(workers);
1233  EXPECT_EQ(4, success);
1234}
1235
1236//------------------------------------------------------------------------------
1237
1238// This test case inspired by crbug.com/108491
1239// We create two servers that use the same ListenerThread but have
1240// SetRestrictDispatchToSameChannel set to true.
1241// We create clients, then use some specific WaitableEvent wait/signalling to
1242// ensure that messages get dispatched in a way that causes a deadlock due to
1243// a nested dispatch and an eligible message in a higher-level dispatch's
// delayed_queue. Specifically, we start with client1 about to send an
1245// unblocking message to server1, while the shared listener thread for the
1246// servers server1 and server2 is about to send a non-unblocking message to
1247// client1. At the same time, client2 will be about to send an unblocking
1248// message to server2. Server1 will handle the client1->server1 message by
1249// telling server2 to send a non-unblocking message to client2.
1250// What should happen is that the send to server2 should find the pending,
1251// same-context client2->server2 message to dispatch, causing client2 to
1252// unblock then handle the server2->client2 message, so that the shared
1253// servers' listener thread can then respond to the client1->server1 message.
1254// Then client1 can handle the non-unblocking server1->client1 message.
1255// The old code would end up in a state where the server2->client2 message is
1256// sent, but the client2->server2 message (which is eligible for dispatch, and
1257// which is what client2 is waiting for) is stashed in a local delayed_queue
1258// that has server1's channel context, causing a deadlock.
1259// WaitableEvents in the events array are used to:
1260//   event 0: indicate to client1 that server listener is in OnDoServerTask
1261//   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
1262//   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
1263//   event 3: indicate to client2 that server listener is in OnDoServerTask
1264
1265class RestrictedDispatchDeadlockServer : public Worker {
1266 public:
1267  RestrictedDispatchDeadlockServer(int server_num,
1268                                   WaitableEvent* server_ready_event,
1269                                   WaitableEvent** events,
1270                                   RestrictedDispatchDeadlockServer* peer)
1271      : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER),
1272        server_num_(server_num),
1273        server_ready_event_(server_ready_event),
1274        events_(events),
1275        peer_(peer) { }
1276
1277  void OnDoServerTask() {
1278    events_[3]->Signal();
1279    events_[2]->Wait();
1280    events_[0]->Signal();
1281    SendMessageToClient();
1282  }
1283
1284  virtual void Run() OVERRIDE {
1285    channel()->SetRestrictDispatchChannelGroup(1);
1286    server_ready_event_->Signal();
1287  }
1288
1289  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1290
1291 private:
1292  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1293    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
1294     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1295     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1296    IPC_END_MESSAGE_MAP()
1297    return true;
1298  }
1299
1300  void OnNoArgs() {
1301    if (server_num_ == 1) {
1302      DCHECK(peer_ != NULL);
1303      peer_->SendMessageToClient();
1304    }
1305  }
1306
1307  void SendMessageToClient() {
1308    Message* msg = new SyncChannelTestMsg_NoArgs;
1309    msg->set_unblock(false);
1310    DCHECK(!msg->should_unblock());
1311    Send(msg);
1312  }
1313
1314  int server_num_;
1315  WaitableEvent* server_ready_event_;
1316  WaitableEvent** events_;
1317  RestrictedDispatchDeadlockServer* peer_;
1318};
1319
1320class RestrictedDispatchDeadlockClient2 : public Worker {
1321 public:
1322  RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server,
1323                                    WaitableEvent* server_ready_event,
1324                                    WaitableEvent** events)
1325      : Worker("channel2", Channel::MODE_CLIENT),
1326        server_ready_event_(server_ready_event),
1327        events_(events),
1328        received_msg_(false),
1329        received_noarg_reply_(false),
1330        done_issued_(false) {}
1331
1332  virtual void Run() OVERRIDE {
1333    server_ready_event_->Wait();
1334  }
1335
1336  void OnDoClient2Task() {
1337    events_[3]->Wait();
1338    events_[1]->Signal();
1339    events_[2]->Signal();
1340    DCHECK(received_msg_ == false);
1341
1342    Message* message = new SyncChannelTestMsg_NoArgs;
1343    message->set_unblock(true);
1344    Send(message);
1345    received_noarg_reply_ = true;
1346  }
1347
1348  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1349 private:
1350  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1351    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
1352     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1353    IPC_END_MESSAGE_MAP()
1354    return true;
1355  }
1356
1357  void OnNoArgs() {
1358    received_msg_ = true;
1359    PossiblyDone();
1360  }
1361
1362  void PossiblyDone() {
1363    if (received_noarg_reply_ && received_msg_) {
1364      DCHECK(done_issued_ == false);
1365      done_issued_ = true;
1366      Send(new SyncChannelTestMsg_Done);
1367      Done();
1368    }
1369  }
1370
1371  WaitableEvent* server_ready_event_;
1372  WaitableEvent** events_;
1373  bool received_msg_;
1374  bool received_noarg_reply_;
1375  bool done_issued_;
1376};
1377
1378class RestrictedDispatchDeadlockClient1 : public Worker {
1379 public:
1380  RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server,
1381                                    RestrictedDispatchDeadlockClient2* peer,
1382                                    WaitableEvent* server_ready_event,
1383                                    WaitableEvent** events)
1384      : Worker("channel1", Channel::MODE_CLIENT),
1385        server_(server),
1386        peer_(peer),
1387        server_ready_event_(server_ready_event),
1388        events_(events),
1389        received_msg_(false),
1390        received_noarg_reply_(false),
1391        done_issued_(false) {}
1392
1393  virtual void Run() OVERRIDE {
1394    server_ready_event_->Wait();
1395    server_->ListenerThread()->message_loop()->PostTask(
1396        FROM_HERE,
1397        base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_));
1398    peer_->ListenerThread()->message_loop()->PostTask(
1399        FROM_HERE,
1400        base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_));
1401    events_[0]->Wait();
1402    events_[1]->Wait();
1403    DCHECK(received_msg_ == false);
1404
1405    Message* message = new SyncChannelTestMsg_NoArgs;
1406    message->set_unblock(true);
1407    Send(message);
1408    received_noarg_reply_ = true;
1409    PossiblyDone();
1410  }
1411
1412 private:
1413  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1414    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
1415     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1416    IPC_END_MESSAGE_MAP()
1417    return true;
1418  }
1419
1420  void OnNoArgs() {
1421    received_msg_ = true;
1422    PossiblyDone();
1423  }
1424
1425  void PossiblyDone() {
1426    if (received_noarg_reply_ && received_msg_) {
1427      DCHECK(done_issued_ == false);
1428      done_issued_ = true;
1429      Send(new SyncChannelTestMsg_Done);
1430      Done();
1431    }
1432  }
1433
1434  RestrictedDispatchDeadlockServer* server_;
1435  RestrictedDispatchDeadlockClient2* peer_;
1436  WaitableEvent* server_ready_event_;
1437  WaitableEvent** events_;
1438  bool received_msg_;
1439  bool received_noarg_reply_;
1440  bool done_issued_;
1441};
1442
1443TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
1444  std::vector<Worker*> workers;
1445
1446  // A shared worker thread so that server1 and server2 run on one thread.
1447  base::Thread worker_thread("RestrictedDispatchDeadlock");
1448  ASSERT_TRUE(worker_thread.Start());
1449
1450  WaitableEvent server1_ready(false, false);
1451  WaitableEvent server2_ready(false, false);
1452
1453  WaitableEvent event0(false, false);
1454  WaitableEvent event1(false, false);
1455  WaitableEvent event2(false, false);
1456  WaitableEvent event3(false, false);
1457  WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
1458
1459  RestrictedDispatchDeadlockServer* server1;
1460  RestrictedDispatchDeadlockServer* server2;
1461  RestrictedDispatchDeadlockClient1* client1;
1462  RestrictedDispatchDeadlockClient2* client2;
1463
1464  server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events,
1465                                                 NULL);
1466  server2->OverrideThread(&worker_thread);
1467  workers.push_back(server2);
1468
1469  client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready,
1470                                                  events);
1471  workers.push_back(client2);
1472
1473  server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events,
1474                                                 server2);
1475  server1->OverrideThread(&worker_thread);
1476  workers.push_back(server1);
1477
1478  client1 = new RestrictedDispatchDeadlockClient1(server1, client2,
1479                                                  &server1_ready, events);
1480  workers.push_back(client1);
1481
1482  RunTest(workers);
1483}
1484
1485//------------------------------------------------------------------------------
1486
1487// This test case inspired by crbug.com/120530
1488// We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a
1489// message that recurses through 3, 4 or 5 steps to make sure, say, W1 can
1490// re-enter when called from W4 while it's sending a message to W2.
1491// The first worker drives the whole test so it must be treated specially.
1492
1493class RestrictedDispatchPipeWorker : public Worker {
1494 public:
1495  RestrictedDispatchPipeWorker(
1496      const std::string &channel1,
1497      WaitableEvent* event1,
1498      const std::string &channel2,
1499      WaitableEvent* event2,
1500      int group,
1501      int* success)
1502      : Worker(channel1, Channel::MODE_SERVER),
1503        event1_(event1),
1504        event2_(event2),
1505        other_channel_name_(channel2),
1506        group_(group),
1507        success_(success) {
1508  }
1509
1510  void OnPingTTL(int ping, int* ret) {
1511    *ret = 0;
1512    if (!ping)
1513      return;
1514    other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
1515    ++*ret;
1516  }
1517
1518  void OnDone() {
1519    if (is_first())
1520      return;
1521    other_channel_->Send(new SyncChannelTestMsg_Done);
1522    other_channel_.reset();
1523    Done();
1524  }
1525
1526  virtual void Run() OVERRIDE {
1527    channel()->SetRestrictDispatchChannelGroup(group_);
1528    if (is_first())
1529      event1_->Signal();
1530    event2_->Wait();
1531    other_channel_ =
1532        SyncChannel::Create(other_channel_name_,
1533                            IPC::Channel::MODE_CLIENT,
1534                            this,
1535                            ipc_thread().message_loop_proxy().get(),
1536                            true,
1537                            shutdown_event());
1538    other_channel_->SetRestrictDispatchChannelGroup(group_);
1539    if (!is_first()) {
1540      event1_->Signal();
1541      return;
1542    }
1543    *success_ = 0;
1544    int value = 0;
1545    OnPingTTL(3, &value);
1546    *success_ += (value == 3);
1547    OnPingTTL(4, &value);
1548    *success_ += (value == 4);
1549    OnPingTTL(5, &value);
1550    *success_ += (value == 5);
1551    other_channel_->Send(new SyncChannelTestMsg_Done);
1552    other_channel_.reset();
1553    Done();
1554  }
1555
1556  bool is_first() { return !!success_; }
1557
1558 private:
1559  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1560    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
1561     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1562     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
1563    IPC_END_MESSAGE_MAP()
1564    return true;
1565  }
1566
1567  scoped_ptr<SyncChannel> other_channel_;
1568  WaitableEvent* event1_;
1569  WaitableEvent* event2_;
1570  std::string other_channel_name_;
1571  int group_;
1572  int* success_;
1573};
1574
1575TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) {
1576  int success = 0;
1577  std::vector<Worker*> workers;
1578  WaitableEvent event0(true, false);
1579  WaitableEvent event1(true, false);
1580  WaitableEvent event2(true, false);
1581  WaitableEvent event3(true, false);
1582  workers.push_back(new RestrictedDispatchPipeWorker(
1583        "channel0", &event0, "channel1", &event1, 1, &success));
1584  workers.push_back(new RestrictedDispatchPipeWorker(
1585        "channel1", &event1, "channel2", &event2, 2, NULL));
1586  workers.push_back(new RestrictedDispatchPipeWorker(
1587        "channel2", &event2, "channel3", &event3, 3, NULL));
1588  workers.push_back(new RestrictedDispatchPipeWorker(
1589        "channel3", &event3, "channel0", &event0, 4, NULL));
1590  RunTest(workers);
1591  EXPECT_EQ(3, success);
1592}
1593
1594//------------------------------------------------------------------------------
1595
1596// This test case inspired by crbug.com/122443
1597// We want to make sure a reply message with the unblock flag set correctly
1598// behaves as a reply, not a regular message.
// We have 3 workers. Server1 will send a message to Server2 (which will block),
// during which it will dispatch a message coming from Client, at which point
// it will send another message to Server2. While sending that second message it
// will receive a reply from Server2 with the unblock flag.
1603
1604class ReentrantReplyServer1 : public Worker {
1605 public:
1606  ReentrantReplyServer1(WaitableEvent* server_ready)
1607      : Worker("reentrant_reply1", Channel::MODE_SERVER),
1608        server_ready_(server_ready) { }
1609
1610  virtual void Run() OVERRIDE {
1611    server2_channel_ =
1612        SyncChannel::Create("reentrant_reply2",
1613                            IPC::Channel::MODE_CLIENT,
1614                            this,
1615                            ipc_thread().message_loop_proxy().get(),
1616                            true,
1617                            shutdown_event());
1618    server_ready_->Signal();
1619    Message* msg = new SyncChannelTestMsg_Reentrant1();
1620    server2_channel_->Send(msg);
1621    server2_channel_.reset();
1622    Done();
1623  }
1624
1625 private:
1626  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1627    IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
1628     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
1629     IPC_REPLY_HANDLER(OnReply)
1630    IPC_END_MESSAGE_MAP()
1631    return true;
1632  }
1633
1634  void OnReentrant2() {
1635    Message* msg = new SyncChannelTestMsg_Reentrant3();
1636    server2_channel_->Send(msg);
1637  }
1638
1639  void OnReply(const Message& message) {
1640    // If we get here, the Send() will never receive the reply (thus would
1641    // hang), so abort instead.
1642    LOG(FATAL) << "Reply message was dispatched";
1643  }
1644
1645  WaitableEvent* server_ready_;
1646  scoped_ptr<SyncChannel> server2_channel_;
1647};
1648
1649class ReentrantReplyServer2 : public Worker {
1650 public:
1651  ReentrantReplyServer2()
1652      : Worker("reentrant_reply2", Channel::MODE_SERVER),
1653        reply_(NULL) { }
1654
1655 private:
1656  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1657    IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
1658     IPC_MESSAGE_HANDLER_DELAY_REPLY(
1659         SyncChannelTestMsg_Reentrant1, OnReentrant1)
1660     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
1661    IPC_END_MESSAGE_MAP()
1662    return true;
1663  }
1664
1665  void OnReentrant1(Message* reply) {
1666    DCHECK(!reply_);
1667    reply_ = reply;
1668  }
1669
1670  void OnReentrant3() {
1671    DCHECK(reply_);
1672    Message* reply = reply_;
1673    reply_ = NULL;
1674    reply->set_unblock(true);
1675    Send(reply);
1676    Done();
1677  }
1678
1679  Message* reply_;
1680};
1681
1682class ReentrantReplyClient : public Worker {
1683 public:
1684  ReentrantReplyClient(WaitableEvent* server_ready)
1685      : Worker("reentrant_reply1", Channel::MODE_CLIENT),
1686        server_ready_(server_ready) { }
1687
1688  virtual void Run() OVERRIDE {
1689    server_ready_->Wait();
1690    Send(new SyncChannelTestMsg_Reentrant2());
1691    Done();
1692  }
1693
1694 private:
1695  WaitableEvent* server_ready_;
1696};
1697
1698TEST_F(IPCSyncChannelTest, ReentrantReply) {
1699  std::vector<Worker*> workers;
1700  WaitableEvent server_ready(false, false);
1701  workers.push_back(new ReentrantReplyServer2());
1702  workers.push_back(new ReentrantReplyServer1(&server_ready));
1703  workers.push_back(new ReentrantReplyClient(&server_ready));
1704  RunTest(workers);
1705}
1706
1707//------------------------------------------------------------------------------
1708
1709// Generate a validated channel ID using Channel::GenerateVerifiedChannelID().
1710
1711class VerifiedServer : public Worker {
1712 public:
1713  VerifiedServer(base::Thread* listener_thread,
1714                 const std::string& channel_name,
1715                 const std::string& reply_text)
1716      : Worker(channel_name, Channel::MODE_SERVER),
1717        reply_text_(reply_text) {
1718    Worker::OverrideThread(listener_thread);
1719  }
1720
1721  virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
1722    VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
1723    SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
1724    Send(reply_msg);
1725    ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId());
1726    Done();
1727  }
1728
1729 private:
1730  std::string reply_text_;
1731};
1732
1733class VerifiedClient : public Worker {
1734 public:
1735  VerifiedClient(base::Thread* listener_thread,
1736                 const std::string& channel_name,
1737                 const std::string& expected_text)
1738      : Worker(channel_name, Channel::MODE_CLIENT),
1739        expected_text_(expected_text) {
1740    Worker::OverrideThread(listener_thread);
1741  }
1742
1743  virtual void Run() OVERRIDE {
1744    std::string response;
1745    SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
1746    bool result = Send(msg);
1747    DCHECK(result);
1748    DCHECK_EQ(response, expected_text_);
1749    // expected_text_ is only used in the above DCHECK. This line suppresses the
1750    // "unused private field" warning in release builds.
1751    (void)expected_text_;
1752
1753    VLOG(1) << __FUNCTION__ << " Received reply: " << response;
1754    ASSERT_EQ(channel()->GetPeerPID(), base::GetCurrentProcId());
1755    Done();
1756  }
1757
1758 private:
1759  std::string expected_text_;
1760};
1761
1762void Verified() {
1763  std::vector<Worker*> workers;
1764
1765  // A shared worker thread for servers
1766  base::Thread server_worker_thread("Verified_ServerListener");
1767  ASSERT_TRUE(server_worker_thread.Start());
1768
1769  base::Thread client_worker_thread("Verified_ClientListener");
1770  ASSERT_TRUE(client_worker_thread.Start());
1771
1772  std::string channel_id = Channel::GenerateVerifiedChannelID("Verified");
1773  Worker* worker;
1774
1775  worker = new VerifiedServer(&server_worker_thread,
1776                              channel_id,
1777                              "Got first message");
1778  workers.push_back(worker);
1779
1780  worker = new VerifiedClient(&client_worker_thread,
1781                              channel_id,
1782                              "Got first message");
1783  workers.push_back(worker);
1784
1785  RunTest(workers);
1786}
1787
1788// Windows needs to send an out-of-band secret to verify the client end of the
1789// channel. Test that we still connect correctly in that case.
1790TEST_F(IPCSyncChannelTest, Verified) {
1791  Verified();
1792}
1793
1794}  // namespace
1795}  // namespace IPC
1796