ipc_sync_channel_unittest.cc revision c2e0dbddbe15c98d52c4786dac06cb8952a8ae6d
1c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
3c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)// found in the LICENSE file.
4c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
5c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_sync_channel.h"
6c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
7c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include <string>
8c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include <vector>
9c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
10c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/basictypes.h"
11c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/bind.h"
12c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/logging.h"
13c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/memory/scoped_ptr.h"
14c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/message_loop.h"
15c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/process_util.h"
16c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/run_loop.h"
17c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/string_util.h"
18c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/threading/platform_thread.h"
19c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/threading/thread.h"
20c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "base/synchronization/waitable_event.h"
21c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_listener.h"
22c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_message.h"
23c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_sender.h"
24c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_sync_message_filter.h"
25c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "ipc/ipc_sync_message_unittest.h"
26c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)#include "testing/gtest/include/gtest/gtest.h"
27c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
28c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)using base::WaitableEvent;
29c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
30c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)namespace IPC {
3151b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)namespace {
32c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
33c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)// Base class for a "process" with listener and IPC threads.
34c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)class Worker : public Listener, public Sender {
35c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles) public:
36c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  // Will create a channel without a name.
37c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  Worker(Channel::Mode mode, const std::string& thread_name)
38323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)      : done_(new WaitableEvent(false, false)),
398abfc5808a4e34d6e03867af8bc440dee641886fTorne (Richard Coles)        channel_created_(new WaitableEvent(false, false)),
40323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)        mode_(mode),
418abfc5808a4e34d6e03867af8bc440dee641886fTorne (Richard Coles)        ipc_thread_((thread_name + "_ipc").c_str()),
428abfc5808a4e34d6e03867af8bc440dee641886fTorne (Richard Coles)        listener_thread_((thread_name + "_listener").c_str()),
43c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        overrided_thread_(NULL),
44c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        shutdown_event_(true, false),
45c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        is_shutdown_(false) {
46c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
47323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)
48323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)  // Will create a named channel and use this name for the threads' name.
49323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)  Worker(const std::string& channel_name, Channel::Mode mode)
50323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)      : done_(new WaitableEvent(false, false)),
51323480423219ecd77329f8326dc5e0e3b50926d4Torne (Richard Coles)        channel_created_(new WaitableEvent(false, false)),
52c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        channel_name_(channel_name),
53c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        mode_(mode),
54bfe3590b1806e3ff18f46ee3af5d4b83078f305aTorne (Richard Coles)        ipc_thread_((channel_name + "_ipc").c_str()),
55c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        listener_thread_((channel_name + "_listener").c_str()),
56c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        overrided_thread_(NULL),
57c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        shutdown_event_(true, false),
58c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        is_shutdown_(false) {
59c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
60c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
61c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual ~Worker() {
62c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    // Shutdown() must be called before destruction.
63c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    CHECK(is_shutdown_);
64c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
65c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void AddRef() { }
66c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void Release() { }
67c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual bool Send(Message* msg) OVERRIDE { return channel_->Send(msg); }
68c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  bool SendWithTimeout(Message* msg, int timeout_ms) {
69c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return channel_->SendWithTimeout(msg, timeout_ms);
70c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
71c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void WaitForChannelCreation() { channel_created_->Wait(); }
72c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void CloseChannel() {
73c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK(base::MessageLoop::current() == ListenerThread()->message_loop());
74c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    channel_->Close();
75c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
76c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void Start() {
77c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    StartThread(&listener_thread_, base::MessageLoop::TYPE_DEFAULT);
78c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ListenerThread()->message_loop()->PostTask(
79c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        FROM_HERE, base::Bind(&Worker::OnStart, this));
80c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
81c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void Shutdown() {
82c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    // The IPC thread needs to outlive SyncChannel. We can't do this in
83c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
84c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    // may result in a race conditions. See http://crbug.com/25841.
85c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    WaitableEvent listener_done(false, false), ipc_done(false, false);
86c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ListenerThread()->message_loop()->PostTask(
87c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown1, this,
88c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                              &listener_done, &ipc_done));
89c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    listener_done.Wait();
90c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ipc_done.Wait();
91c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ipc_thread_.Stop();
92c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    listener_thread_.Stop();
93c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    is_shutdown_ = true;
94c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
95f523d2789ac2f83c4eca0ee4d5161bfdb5f2d052Torne (Richard Coles)  void OverrideThread(base::Thread* overrided_thread) {
96c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK(overrided_thread_ == NULL);
97bfe3590b1806e3ff18f46ee3af5d4b83078f305aTorne (Richard Coles)    overrided_thread_ = overrided_thread;
98c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
99c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  bool SendAnswerToLife(bool pump, int timeout, bool succeed) {
100c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    int answer = 0;
101c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
102bfe3590b1806e3ff18f46ee3af5d4b83078f305aTorne (Richard Coles)    if (pump)
103c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)      msg->EnableMessagePumping();
104c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    bool result = SendWithTimeout(msg, timeout);
105c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK_EQ(result, succeed);
106c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK_EQ(answer, (succeed ? 42 : 0));
107c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return result;
108c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
109c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  bool SendDouble(bool pump, bool succeed) {
110c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    int answer = 0;
111c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
112bfe3590b1806e3ff18f46ee3af5d4b83078f305aTorne (Richard Coles)    if (pump)
113c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)      msg->EnableMessagePumping();
114c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    bool result = Send(msg);
115c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK_EQ(result, succeed);
116c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    DCHECK_EQ(answer, (succeed ? 10 : 0));
117c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return result;
118c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
119c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  const std::string& channel_name() { return channel_name_; }
12009380295ba73501a205346becac22c6978e4671dTorne (Richard Coles)  Channel::Mode mode() { return mode_; }
121c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  WaitableEvent* done_event() { return done_.get(); }
122c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  WaitableEvent* shutdown_event() { return &shutdown_event_; }
123c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void ResetChannel() { channel_.reset(); }
124c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  // Derived classes need to call this when they've completed their part of
125c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  // the test.
126c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void Done() { done_->Signal(); }
127c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
12851b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles) protected:
129c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  SyncChannel* channel() { return channel_.get(); }
130c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  // Functions for dervied classes to implement if they wish.
131c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual void Run() { }
13251b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)  virtual void OnAnswer(int* answer) { NOTREACHED(); }
13351b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)  virtual void OnAnswerDelay(Message* reply_msg) {
13451b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // The message handler map below can only take one entry for
13551b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // SyncChannelTestMsg_AnswerToLife, so since some classes want
13651b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // the normal version while other want the delayed reply, we
13751b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // call the normal version if the derived class didn't override
13851b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // this function.
13951b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    int answer;
14051b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    OnAnswer(&answer);
14151b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
14251b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    Send(reply_msg);
14351b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)  }
14451b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)  virtual void OnDouble(int in, int* out) { NOTREACHED(); }
14551b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)  virtual void OnDoubleDelay(int in, Message* reply_msg) {
146c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    int result;
147c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    OnDouble(in, &result);
148c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
149c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    Send(reply_msg);
150c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
151c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
152c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual void OnNestedTestMsg(Message* reply_msg) {
153c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    NOTREACHED();
154c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
155c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
156c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual SyncChannel* CreateChannel() {
157c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return new SyncChannel(
158c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        channel_name_, mode_, this, ipc_thread_.message_loop_proxy(), true,
159c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        &shutdown_event_);
160c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
161c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
162c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  base::Thread* ListenerThread() {
163c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return overrided_thread_ ? overrided_thread_ : &listener_thread_;
164c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
165c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
166c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  const base::Thread& ipc_thread() const { return ipc_thread_; }
167c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
168c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles) private:
169c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  // Called on the listener thread to create the sync channel.
170c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void OnStart() {
17151b2906e11752df6c18351cf520e30522d3b53a1Torne (Richard Coles)    // Link ipc_thread_, listener_thread_ and channel_ altogether.
172c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    StartThread(&ipc_thread_, base::MessageLoop::TYPE_IO);
173c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    channel_.reset(CreateChannel());
174c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    channel_created_->Signal();
175c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    Run();
176c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
177c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
178c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void OnListenerThreadShutdown1(WaitableEvent* listener_event,
179c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                                 WaitableEvent* ipc_event) {
180c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    // SyncChannel needs to be destructed on the thread that it was created on.
181c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    channel_.reset();
182c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
183c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    base::RunLoop().RunUntilIdle();
184c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
185c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ipc_thread_.message_loop()->PostTask(
186c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        FROM_HERE, base::Bind(&Worker::OnIPCThreadShutdown, this,
187c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                              listener_event, ipc_event));
188c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
189c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
190c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void OnIPCThreadShutdown(WaitableEvent* listener_event,
191c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                           WaitableEvent* ipc_event) {
192c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    base::RunLoop().RunUntilIdle();
193c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    ipc_event->Signal();
194c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
195c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    listener_thread_.message_loop()->PostTask(
196c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)        FROM_HERE, base::Bind(&Worker::OnListenerThreadShutdown2, this,
197c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                              listener_event));
198c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
199c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
200c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
201c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    base::RunLoop().RunUntilIdle();
202c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    listener_event->Signal();
203c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
204c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
205c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
206c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    IPC_BEGIN_MESSAGE_MAP(Worker, message)
207c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
208c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
209c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                                     OnAnswerDelay)
210c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
211c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)                                     OnNestedTestMsg)
212c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    IPC_END_MESSAGE_MAP()
213c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)    return true;
214c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  }
215c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
216c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)  void StartThread(base::Thread* thread, base::MessageLoop::Type type) {
217d5428f32f5d1719f774f62e19147104ca245a3abTorne (Richard Coles)    base::Thread::Options options;
218d5428f32f5d1719f774f62e19147104ca245a3abTorne (Richard Coles)    options.message_loop_type = type;
219d5428f32f5d1719f774f62e19147104ca245a3abTorne (Richard Coles)    thread->StartWithOptions(options);
220d5428f32f5d1719f774f62e19147104ca245a3abTorne (Richard Coles)  }
221c0e19a689c8ac22cdc96b291a8d33a5d3b0b34a4Torne (Richard Coles)
222  scoped_ptr<WaitableEvent> done_;
223  scoped_ptr<WaitableEvent> channel_created_;
224  std::string channel_name_;
225  Channel::Mode mode_;
226  scoped_ptr<SyncChannel> channel_;
227  base::Thread ipc_thread_;
228  base::Thread listener_thread_;
229  base::Thread* overrided_thread_;
230
231  base::WaitableEvent shutdown_event_;
232
233  bool is_shutdown_;
234
235  DISALLOW_COPY_AND_ASSIGN(Worker);
236};
237
238
239// Starts the test with the given workers.  This function deletes the workers
240// when it's done.
241void RunTest(std::vector<Worker*> workers) {
242  // First we create the workers that are channel servers, or else the other
243  // workers' channel initialization might fail because the pipe isn't created..
244  for (size_t i = 0; i < workers.size(); ++i) {
245    if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
246      workers[i]->Start();
247      workers[i]->WaitForChannelCreation();
248    }
249  }
250
251  // now create the clients
252  for (size_t i = 0; i < workers.size(); ++i) {
253    if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
254      workers[i]->Start();
255  }
256
257  // wait for all the workers to finish
258  for (size_t i = 0; i < workers.size(); ++i)
259    workers[i]->done_event()->Wait();
260
261  for (size_t i = 0; i < workers.size(); ++i) {
262    workers[i]->Shutdown();
263    delete workers[i];
264  }
265}
266
267class IPCSyncChannelTest : public testing::Test {
268 private:
269  base::MessageLoop message_loop_;
270};
271
272//------------------------------------------------------------------------------
273
274class SimpleServer : public Worker {
275 public:
276  explicit SimpleServer(bool pump_during_send)
277      : Worker(Channel::MODE_SERVER, "simpler_server"),
278        pump_during_send_(pump_during_send) { }
279  virtual void Run() OVERRIDE {
280    SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
281    Done();
282  }
283
284  bool pump_during_send_;
285};
286
287class SimpleClient : public Worker {
288 public:
289  SimpleClient() : Worker(Channel::MODE_CLIENT, "simple_client") { }
290
291  virtual void OnAnswer(int* answer) OVERRIDE {
292    *answer = 42;
293    Done();
294  }
295};
296
297void Simple(bool pump_during_send) {
298  std::vector<Worker*> workers;
299  workers.push_back(new SimpleServer(pump_during_send));
300  workers.push_back(new SimpleClient());
301  RunTest(workers);
302}
303
304// Tests basic synchronous call
305TEST_F(IPCSyncChannelTest, Simple) {
306  Simple(false);
307  Simple(true);
308}
309
310//------------------------------------------------------------------------------
311
312// Worker classes which override how the sync channel is created to use the
313// two-step initialization (calling the lightweight constructor and then
314// ChannelProxy::Init separately) process.
315class TwoStepServer : public Worker {
316 public:
317  explicit TwoStepServer(bool create_pipe_now)
318      : Worker(Channel::MODE_SERVER, "simpler_server"),
319        create_pipe_now_(create_pipe_now) { }
320
321  virtual void Run() OVERRIDE {
322    SendAnswerToLife(false, base::kNoTimeout, true);
323    Done();
324  }
325
326  virtual SyncChannel* CreateChannel() OVERRIDE {
327    SyncChannel* channel = new SyncChannel(
328        this, ipc_thread().message_loop_proxy(), shutdown_event());
329    channel->Init(channel_name(), mode(), create_pipe_now_);
330    return channel;
331  }
332
333  bool create_pipe_now_;
334};
335
336class TwoStepClient : public Worker {
337 public:
338  TwoStepClient(bool create_pipe_now)
339      : Worker(Channel::MODE_CLIENT, "simple_client"),
340        create_pipe_now_(create_pipe_now) { }
341
342  virtual void OnAnswer(int* answer) OVERRIDE {
343    *answer = 42;
344    Done();
345  }
346
347  virtual SyncChannel* CreateChannel() OVERRIDE {
348    SyncChannel* channel = new SyncChannel(
349        this, ipc_thread().message_loop_proxy(), shutdown_event());
350    channel->Init(channel_name(), mode(), create_pipe_now_);
351    return channel;
352  }
353
354  bool create_pipe_now_;
355};
356
357void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
358  std::vector<Worker*> workers;
359  workers.push_back(new TwoStepServer(create_server_pipe_now));
360  workers.push_back(new TwoStepClient(create_client_pipe_now));
361  RunTest(workers);
362}
363
364// Tests basic two-step initialization, where you call the lightweight
365// constructor then Init.
366TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
367  TwoStep(false, false);
368  TwoStep(false, true);
369  TwoStep(true, false);
370  TwoStep(true, true);
371}
372
373//------------------------------------------------------------------------------
374
375class DelayClient : public Worker {
376 public:
377  DelayClient() : Worker(Channel::MODE_CLIENT, "delay_client") { }
378
379  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
380    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
381    Send(reply_msg);
382    Done();
383  }
384};
385
386void DelayReply(bool pump_during_send) {
387  std::vector<Worker*> workers;
388  workers.push_back(new SimpleServer(pump_during_send));
389  workers.push_back(new DelayClient());
390  RunTest(workers);
391}
392
393// Tests that asynchronous replies work
394TEST_F(IPCSyncChannelTest, DelayReply) {
395  DelayReply(false);
396  DelayReply(true);
397}
398
399//------------------------------------------------------------------------------
400
401class NoHangServer : public Worker {
402 public:
403  NoHangServer(WaitableEvent* got_first_reply, bool pump_during_send)
404      : Worker(Channel::MODE_SERVER, "no_hang_server"),
405        got_first_reply_(got_first_reply),
406        pump_during_send_(pump_during_send) { }
407  virtual void Run() OVERRIDE {
408    SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
409    got_first_reply_->Signal();
410
411    SendAnswerToLife(pump_during_send_, base::kNoTimeout, false);
412    Done();
413  }
414
415  WaitableEvent* got_first_reply_;
416  bool pump_during_send_;
417};
418
419class NoHangClient : public Worker {
420 public:
421  explicit NoHangClient(WaitableEvent* got_first_reply)
422    : Worker(Channel::MODE_CLIENT, "no_hang_client"),
423      got_first_reply_(got_first_reply) { }
424
425  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
426    // Use the DELAY_REPLY macro so that we can force the reply to be sent
427    // before this function returns (when the channel will be reset).
428    SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
429    Send(reply_msg);
430    got_first_reply_->Wait();
431    CloseChannel();
432    Done();
433  }
434
435  WaitableEvent* got_first_reply_;
436};
437
438void NoHang(bool pump_during_send) {
439  WaitableEvent got_first_reply(false, false);
440  std::vector<Worker*> workers;
441  workers.push_back(new NoHangServer(&got_first_reply, pump_during_send));
442  workers.push_back(new NoHangClient(&got_first_reply));
443  RunTest(workers);
444}
445
446// Tests that caller doesn't hang if receiver dies
447TEST_F(IPCSyncChannelTest, NoHang) {
448  NoHang(false);
449  NoHang(true);
450}
451
452//------------------------------------------------------------------------------
453
454class UnblockServer : public Worker {
455 public:
456  UnblockServer(bool pump_during_send, bool delete_during_send)
457    : Worker(Channel::MODE_SERVER, "unblock_server"),
458      pump_during_send_(pump_during_send),
459      delete_during_send_(delete_during_send) { }
460  virtual void Run() OVERRIDE {
461    if (delete_during_send_) {
462      // Use custom code since race conditions mean the answer may or may not be
463      // available.
464      int answer = 0;
465      SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
466      if (pump_during_send_)
467        msg->EnableMessagePumping();
468      Send(msg);
469    } else {
470      SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
471    }
472    Done();
473  }
474
475  virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
476    SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
477    Send(reply_msg);
478    if (delete_during_send_)
479      ResetChannel();
480  }
481
482  bool pump_during_send_;
483  bool delete_during_send_;
484};
485
486class UnblockClient : public Worker {
487 public:
488  explicit UnblockClient(bool pump_during_send)
489    : Worker(Channel::MODE_CLIENT, "unblock_client"),
490      pump_during_send_(pump_during_send) { }
491
492  virtual void OnAnswer(int* answer) OVERRIDE {
493    SendDouble(pump_during_send_, true);
494    *answer = 42;
495    Done();
496  }
497
498  bool pump_during_send_;
499};
500
501void Unblock(bool server_pump, bool client_pump, bool delete_during_send) {
502  std::vector<Worker*> workers;
503  workers.push_back(new UnblockServer(server_pump, delete_during_send));
504  workers.push_back(new UnblockClient(client_pump));
505  RunTest(workers);
506}
507
508// Tests that the caller unblocks to answer a sync message from the receiver.
509TEST_F(IPCSyncChannelTest, Unblock) {
510  Unblock(false, false, false);
511  Unblock(false, true, false);
512  Unblock(true, false, false);
513  Unblock(true, true, false);
514}
515
516//------------------------------------------------------------------------------
517
518// Tests that the the SyncChannel object can be deleted during a Send.
519TEST_F(IPCSyncChannelTest, ChannelDeleteDuringSend) {
520  Unblock(false, false, true);
521  Unblock(false, true, true);
522  Unblock(true, false, true);
523  Unblock(true, true, true);
524}
525
526//------------------------------------------------------------------------------
527
528class RecursiveServer : public Worker {
529 public:
530  RecursiveServer(bool expected_send_result, bool pump_first, bool pump_second)
531      : Worker(Channel::MODE_SERVER, "recursive_server"),
532        expected_send_result_(expected_send_result),
533        pump_first_(pump_first), pump_second_(pump_second) {}
534  virtual void Run() OVERRIDE {
535    SendDouble(pump_first_, expected_send_result_);
536    Done();
537  }
538
539  virtual void OnDouble(int in, int* out) OVERRIDE {
540    *out = in * 2;
541    SendAnswerToLife(pump_second_, base::kNoTimeout, expected_send_result_);
542  }
543
544  bool expected_send_result_, pump_first_, pump_second_;
545};
546
547class RecursiveClient : public Worker {
548 public:
549  RecursiveClient(bool pump_during_send, bool close_channel)
550      : Worker(Channel::MODE_CLIENT, "recursive_client"),
551        pump_during_send_(pump_during_send), close_channel_(close_channel) {}
552
553  virtual void OnDoubleDelay(int in, Message* reply_msg) OVERRIDE {
554    SendDouble(pump_during_send_, !close_channel_);
555    if (close_channel_) {
556      delete reply_msg;
557    } else {
558      SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
559      Send(reply_msg);
560    }
561    Done();
562  }
563
564  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
565    if (close_channel_) {
566      delete reply_msg;
567      CloseChannel();
568    } else {
569      SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
570      Send(reply_msg);
571    }
572  }
573
574  bool pump_during_send_, close_channel_;
575};
576
577void Recursive(
578    bool server_pump_first, bool server_pump_second, bool client_pump) {
579  std::vector<Worker*> workers;
580  workers.push_back(
581      new RecursiveServer(true, server_pump_first, server_pump_second));
582  workers.push_back(new RecursiveClient(client_pump, false));
583  RunTest(workers);
584}
585
586// Tests a server calling Send while another Send is pending.
587TEST_F(IPCSyncChannelTest, Recursive) {
588  Recursive(false, false, false);
589  Recursive(false, false, true);
590  Recursive(false, true, false);
591  Recursive(false, true, true);
592  Recursive(true, false, false);
593  Recursive(true, false, true);
594  Recursive(true, true, false);
595  Recursive(true, true, true);
596}
597
598//------------------------------------------------------------------------------
599
600void RecursiveNoHang(
601    bool server_pump_first, bool server_pump_second, bool client_pump) {
602  std::vector<Worker*> workers;
603  workers.push_back(
604      new RecursiveServer(false, server_pump_first, server_pump_second));
605  workers.push_back(new RecursiveClient(client_pump, true));
606  RunTest(workers);
607}
608
609// Tests that if a caller makes a sync call during an existing sync call and
610// the receiver dies, neither of the Send() calls hang.
611TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
612  RecursiveNoHang(false, false, false);
613  RecursiveNoHang(false, false, true);
614  RecursiveNoHang(false, true, false);
615  RecursiveNoHang(false, true, true);
616  RecursiveNoHang(true, false, false);
617  RecursiveNoHang(true, false, true);
618  RecursiveNoHang(true, true, false);
619  RecursiveNoHang(true, true, true);
620}
621
622//------------------------------------------------------------------------------
623
624class MultipleServer1 : public Worker {
625 public:
626  explicit MultipleServer1(bool pump_during_send)
627    : Worker("test_channel1", Channel::MODE_SERVER),
628      pump_during_send_(pump_during_send) { }
629
630  virtual void Run() OVERRIDE {
631    SendDouble(pump_during_send_, true);
632    Done();
633  }
634
635  bool pump_during_send_;
636};
637
638class MultipleClient1 : public Worker {
639 public:
640  MultipleClient1(WaitableEvent* client1_msg_received,
641                  WaitableEvent* client1_can_reply) :
642      Worker("test_channel1", Channel::MODE_CLIENT),
643      client1_msg_received_(client1_msg_received),
644      client1_can_reply_(client1_can_reply) { }
645
646  virtual void OnDouble(int in, int* out) OVERRIDE {
647    client1_msg_received_->Signal();
648    *out = in * 2;
649    client1_can_reply_->Wait();
650    Done();
651  }
652
653 private:
654  WaitableEvent *client1_msg_received_, *client1_can_reply_;
655};
656
657class MultipleServer2 : public Worker {
658 public:
659  MultipleServer2() : Worker("test_channel2", Channel::MODE_SERVER) { }
660
661  virtual void OnAnswer(int* result) OVERRIDE {
662    *result = 42;
663    Done();
664  }
665};
666
667class MultipleClient2 : public Worker {
668 public:
669  MultipleClient2(
670    WaitableEvent* client1_msg_received, WaitableEvent* client1_can_reply,
671    bool pump_during_send)
672    : Worker("test_channel2", Channel::MODE_CLIENT),
673      client1_msg_received_(client1_msg_received),
674      client1_can_reply_(client1_can_reply),
675      pump_during_send_(pump_during_send) { }
676
677  virtual void Run() OVERRIDE {
678    client1_msg_received_->Wait();
679    SendAnswerToLife(pump_during_send_, base::kNoTimeout, true);
680    client1_can_reply_->Signal();
681    Done();
682  }
683
684 private:
685  WaitableEvent *client1_msg_received_, *client1_can_reply_;
686  bool pump_during_send_;
687};
688
689void Multiple(bool server_pump, bool client_pump) {
690  std::vector<Worker*> workers;
691
692  // A shared worker thread so that server1 and server2 run on one thread.
693  base::Thread worker_thread("Multiple");
694  ASSERT_TRUE(worker_thread.Start());
695
696  // Server1 sends a sync msg to client1, which blocks the reply until
697  // server2 (which runs on the same worker thread as server1) responds
698  // to a sync msg from client2.
699  WaitableEvent client1_msg_received(false, false);
700  WaitableEvent client1_can_reply(false, false);
701
702  Worker* worker;
703
704  worker = new MultipleServer2();
705  worker->OverrideThread(&worker_thread);
706  workers.push_back(worker);
707
708  worker = new MultipleClient2(
709      &client1_msg_received, &client1_can_reply, client_pump);
710  workers.push_back(worker);
711
712  worker = new MultipleServer1(server_pump);
713  worker->OverrideThread(&worker_thread);
714  workers.push_back(worker);
715
716  worker = new MultipleClient1(
717      &client1_msg_received, &client1_can_reply);
718  workers.push_back(worker);
719
720  RunTest(workers);
721}
722
723// Tests that multiple SyncObjects on the same listener thread can unblock each
724// other.
725TEST_F(IPCSyncChannelTest, Multiple) {
726  Multiple(false, false);
727  Multiple(false, true);
728  Multiple(true, false);
729  Multiple(true, true);
730}
731
732//------------------------------------------------------------------------------
733
734// This class provides server side functionality to test the case where
735// multiple sync channels are in use on the same thread on the client and
736// nested calls are issued.
737class QueuedReplyServer : public Worker {
738 public:
739  QueuedReplyServer(base::Thread* listener_thread,
740                    const std::string& channel_name,
741                    const std::string& reply_text)
742      : Worker(channel_name, Channel::MODE_SERVER),
743        reply_text_(reply_text) {
744    Worker::OverrideThread(listener_thread);
745  }
746
747  virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
748    VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
749    SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
750    Send(reply_msg);
751    Done();
752  }
753
754 private:
755  std::string reply_text_;
756};
757
758// The QueuedReplyClient class provides functionality to test the case where
759// multiple sync channels are in use on the same thread and they make nested
760// sync calls, i.e. while the first channel waits for a response it makes a
761// sync call on another channel.
762// The callstack should unwind correctly, i.e. the outermost call should
763// complete first, and so on.
764class QueuedReplyClient : public Worker {
765 public:
766  QueuedReplyClient(base::Thread* listener_thread,
767                    const std::string& channel_name,
768                    const std::string& expected_text,
769                    bool pump_during_send)
770      : Worker(channel_name, Channel::MODE_CLIENT),
771        pump_during_send_(pump_during_send),
772        expected_text_(expected_text) {
773    Worker::OverrideThread(listener_thread);
774  }
775
776  virtual void Run() OVERRIDE {
777    std::string response;
778    SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
779    if (pump_during_send_)
780      msg->EnableMessagePumping();
781    bool result = Send(msg);
782    DCHECK(result);
783    DCHECK_EQ(response, expected_text_);
784
785    VLOG(1) << __FUNCTION__ << " Received reply: " << response;
786    Done();
787  }
788
789 private:
790  bool pump_during_send_;
791  std::string expected_text_;
792};
793
794void QueuedReply(bool client_pump) {
795  std::vector<Worker*> workers;
796
797  // A shared worker thread for servers
798  base::Thread server_worker_thread("QueuedReply_ServerListener");
799  ASSERT_TRUE(server_worker_thread.Start());
800
801  base::Thread client_worker_thread("QueuedReply_ClientListener");
802  ASSERT_TRUE(client_worker_thread.Start());
803
804  Worker* worker;
805
806  worker = new QueuedReplyServer(&server_worker_thread,
807                                 "QueuedReply_Server1",
808                                 "Got first message");
809  workers.push_back(worker);
810
811  worker = new QueuedReplyServer(&server_worker_thread,
812                                 "QueuedReply_Server2",
813                                 "Got second message");
814  workers.push_back(worker);
815
816  worker = new QueuedReplyClient(&client_worker_thread,
817                                 "QueuedReply_Server1",
818                                 "Got first message",
819                                 client_pump);
820  workers.push_back(worker);
821
822  worker = new QueuedReplyClient(&client_worker_thread,
823                                 "QueuedReply_Server2",
824                                 "Got second message",
825                                 client_pump);
826  workers.push_back(worker);
827
828  RunTest(workers);
829}
830
831// While a blocking send is in progress, the listener thread might answer other
832// synchronous messages.  This tests that if during the response to another
833// message the reply to the original messages comes, it is queued up correctly
834// and the original Send is unblocked later.
835// We also test that the send call stacks unwind correctly when the channel
836// pumps messages while waiting for a response.
837TEST_F(IPCSyncChannelTest, QueuedReply) {
838  QueuedReply(false);
839  QueuedReply(true);
840}
841
842//------------------------------------------------------------------------------
843
844class ChattyClient : public Worker {
845 public:
846  ChattyClient() :
847      Worker(Channel::MODE_CLIENT, "chatty_client") { }
848
849  virtual void OnAnswer(int* answer) OVERRIDE {
850    // The PostMessage limit is 10k.  Send 20% more than that.
851    const int kMessageLimit = 10000;
852    const int kMessagesToSend = kMessageLimit * 120 / 100;
853    for (int i = 0; i < kMessagesToSend; ++i) {
854      if (!SendDouble(false, true))
855        break;
856    }
857    *answer = 42;
858    Done();
859  }
860};
861
862void ChattyServer(bool pump_during_send) {
863  std::vector<Worker*> workers;
864  workers.push_back(new UnblockServer(pump_during_send, false));
865  workers.push_back(new ChattyClient());
866  RunTest(workers);
867}
868
869// Tests http://b/1093251 - that sending lots of sync messages while
870// the receiver is waiting for a sync reply does not overflow the PostMessage
871// queue.
872TEST_F(IPCSyncChannelTest, ChattyServer) {
873  ChattyServer(false);
874  ChattyServer(true);
875}
876
877//------------------------------------------------------------------------------
878
879class TimeoutServer : public Worker {
880 public:
881  TimeoutServer(int timeout_ms,
882                std::vector<bool> timeout_seq,
883                bool pump_during_send)
884      : Worker(Channel::MODE_SERVER, "timeout_server"),
885        timeout_ms_(timeout_ms),
886        timeout_seq_(timeout_seq),
887        pump_during_send_(pump_during_send) {
888  }
889
890  virtual void Run() OVERRIDE {
891    for (std::vector<bool>::const_iterator iter = timeout_seq_.begin();
892         iter != timeout_seq_.end(); ++iter) {
893      SendAnswerToLife(pump_during_send_, timeout_ms_, !*iter);
894    }
895    Done();
896  }
897
898 private:
899  int timeout_ms_;
900  std::vector<bool> timeout_seq_;
901  bool pump_during_send_;
902};
903
904class UnresponsiveClient : public Worker {
905 public:
906  explicit UnresponsiveClient(std::vector<bool> timeout_seq)
907      : Worker(Channel::MODE_CLIENT, "unresponsive_client"),
908        timeout_seq_(timeout_seq) {
909  }
910
911  virtual void OnAnswerDelay(Message* reply_msg) OVERRIDE {
912    DCHECK(!timeout_seq_.empty());
913    if (!timeout_seq_[0]) {
914      SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
915      Send(reply_msg);
916    } else {
917      // Don't reply.
918      delete reply_msg;
919    }
920    timeout_seq_.erase(timeout_seq_.begin());
921    if (timeout_seq_.empty())
922      Done();
923  }
924
925 private:
926  // Whether we should time-out or respond to the various messages we receive.
927  std::vector<bool> timeout_seq_;
928};
929
930void SendWithTimeoutOK(bool pump_during_send) {
931  std::vector<Worker*> workers;
932  std::vector<bool> timeout_seq;
933  timeout_seq.push_back(false);
934  timeout_seq.push_back(false);
935  timeout_seq.push_back(false);
936  workers.push_back(new TimeoutServer(5000, timeout_seq, pump_during_send));
937  workers.push_back(new SimpleClient());
938  RunTest(workers);
939}
940
941void SendWithTimeoutTimeout(bool pump_during_send) {
942  std::vector<Worker*> workers;
943  std::vector<bool> timeout_seq;
944  timeout_seq.push_back(true);
945  timeout_seq.push_back(false);
946  timeout_seq.push_back(false);
947  workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
948  workers.push_back(new UnresponsiveClient(timeout_seq));
949  RunTest(workers);
950}
951
952void SendWithTimeoutMixedOKAndTimeout(bool pump_during_send) {
953  std::vector<Worker*> workers;
954  std::vector<bool> timeout_seq;
955  timeout_seq.push_back(true);
956  timeout_seq.push_back(false);
957  timeout_seq.push_back(false);
958  timeout_seq.push_back(true);
959  timeout_seq.push_back(false);
960  workers.push_back(new TimeoutServer(100, timeout_seq, pump_during_send));
961  workers.push_back(new UnresponsiveClient(timeout_seq));
962  RunTest(workers);
963}
964
965// Tests that SendWithTimeout does not time-out if the response comes back fast
966// enough.
967TEST_F(IPCSyncChannelTest, SendWithTimeoutOK) {
968  SendWithTimeoutOK(false);
969  SendWithTimeoutOK(true);
970}
971
972// Tests that SendWithTimeout does time-out.
973TEST_F(IPCSyncChannelTest, SendWithTimeoutTimeout) {
974  SendWithTimeoutTimeout(false);
975  SendWithTimeoutTimeout(true);
976}
977
978// Sends some message that time-out and some that succeed.
979TEST_F(IPCSyncChannelTest, SendWithTimeoutMixedOKAndTimeout) {
980  SendWithTimeoutMixedOKAndTimeout(false);
981  SendWithTimeoutMixedOKAndTimeout(true);
982}
983
984//------------------------------------------------------------------------------
985
986void NestedCallback(Worker* server) {
987  // Sleep a bit so that we wake up after the reply has been received.
988  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(250));
989  server->SendAnswerToLife(true, base::kNoTimeout, true);
990}
991
992bool timeout_occurred = false;
993
994void TimeoutCallback() {
995  timeout_occurred = true;
996}
997
998class DoneEventRaceServer : public Worker {
999 public:
1000  DoneEventRaceServer()
1001      : Worker(Channel::MODE_SERVER, "done_event_race_server") { }
1002
1003  virtual void Run() OVERRIDE {
1004    base::MessageLoop::current()->PostTask(FROM_HERE,
1005                                           base::Bind(&NestedCallback, this));
1006    base::MessageLoop::current()->PostDelayedTask(
1007        FROM_HERE,
1008        base::Bind(&TimeoutCallback),
1009        base::TimeDelta::FromSeconds(9));
1010    // Even though we have a timeout on the Send, it will succeed since for this
1011    // bug, the reply message comes back and is deserialized, however the done
1012    // event wasn't set.  So we indirectly use the timeout task to notice if a
1013    // timeout occurred.
1014    SendAnswerToLife(true, 10000, true);
1015    DCHECK(!timeout_occurred);
1016    Done();
1017  }
1018};
1019
1020// Tests http://b/1474092 - that if after the done_event is set but before
1021// OnObjectSignaled is called another message is sent out, then after its
1022// reply comes back OnObjectSignaled will be called for the first message.
1023TEST_F(IPCSyncChannelTest, DoneEventRace) {
1024  std::vector<Worker*> workers;
1025  workers.push_back(new DoneEventRaceServer());
1026  workers.push_back(new SimpleClient());
1027  RunTest(workers);
1028}
1029
1030//------------------------------------------------------------------------------
1031
1032class TestSyncMessageFilter : public SyncMessageFilter {
1033 public:
1034  TestSyncMessageFilter(base::WaitableEvent* shutdown_event,
1035                        Worker* worker,
1036                        scoped_refptr<base::MessageLoopProxy> message_loop)
1037      : SyncMessageFilter(shutdown_event),
1038        worker_(worker),
1039        message_loop_(message_loop) {
1040  }
1041
1042  virtual void OnFilterAdded(Channel* channel) OVERRIDE {
1043    SyncMessageFilter::OnFilterAdded(channel);
1044    message_loop_->PostTask(
1045        FROM_HERE,
1046        base::Bind(&TestSyncMessageFilter::SendMessageOnHelperThread, this));
1047  }
1048
1049  void SendMessageOnHelperThread() {
1050    int answer = 0;
1051    bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
1052    DCHECK(result);
1053    DCHECK_EQ(answer, 42);
1054
1055    worker_->Done();
1056  }
1057
1058 private:
1059  virtual ~TestSyncMessageFilter() {}
1060
1061  Worker* worker_;
1062  scoped_refptr<base::MessageLoopProxy> message_loop_;
1063};
1064
1065class SyncMessageFilterServer : public Worker {
1066 public:
1067  SyncMessageFilterServer()
1068      : Worker(Channel::MODE_SERVER, "sync_message_filter_server"),
1069        thread_("helper_thread") {
1070    base::Thread::Options options;
1071    options.message_loop_type = base::MessageLoop::TYPE_DEFAULT;
1072    thread_.StartWithOptions(options);
1073    filter_ = new TestSyncMessageFilter(shutdown_event(), this,
1074                                        thread_.message_loop_proxy());
1075  }
1076
1077  virtual void Run() OVERRIDE {
1078    channel()->AddFilter(filter_.get());
1079  }
1080
1081  base::Thread thread_;
1082  scoped_refptr<TestSyncMessageFilter> filter_;
1083};
1084
1085// This class provides functionality to test the case that a Send on the sync
1086// channel does not crash after the channel has been closed.
1087class ServerSendAfterClose : public Worker {
1088 public:
1089  ServerSendAfterClose()
1090     : Worker(Channel::MODE_SERVER, "simpler_server"),
1091       send_result_(true) {
1092  }
1093
1094  bool SendDummy() {
1095    ListenerThread()->message_loop()->PostTask(
1096        FROM_HERE, base::Bind(base::IgnoreResult(&ServerSendAfterClose::Send),
1097                              this, new SyncChannelTestMsg_NoArgs));
1098    return true;
1099  }
1100
1101  bool send_result() const {
1102    return send_result_;
1103  }
1104
1105 private:
1106  virtual void Run() OVERRIDE {
1107    CloseChannel();
1108    Done();
1109  }
1110
1111  virtual bool Send(Message* msg) OVERRIDE {
1112    send_result_ = Worker::Send(msg);
1113    Done();
1114    return send_result_;
1115  }
1116
1117  bool send_result_;
1118};
1119
1120// Tests basic synchronous call
1121TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
1122  std::vector<Worker*> workers;
1123  workers.push_back(new SyncMessageFilterServer());
1124  workers.push_back(new SimpleClient());
1125  RunTest(workers);
1126}
1127
1128// Test the case when the channel is closed and a Send is attempted after that.
1129TEST_F(IPCSyncChannelTest, SendAfterClose) {
1130  ServerSendAfterClose server;
1131  server.Start();
1132
1133  server.done_event()->Wait();
1134  server.done_event()->Reset();
1135
1136  server.SendDummy();
1137  server.done_event()->Wait();
1138
1139  EXPECT_FALSE(server.send_result());
1140
1141  server.Shutdown();
1142}
1143
1144//------------------------------------------------------------------------------
1145
1146class RestrictedDispatchServer : public Worker {
1147 public:
1148  RestrictedDispatchServer(WaitableEvent* sent_ping_event,
1149                           WaitableEvent* wait_event)
1150      : Worker("restricted_channel", Channel::MODE_SERVER),
1151        sent_ping_event_(sent_ping_event),
1152        wait_event_(wait_event) { }
1153
1154  void OnDoPing(int ping) {
1155    // Send an asynchronous message that unblocks the caller.
1156    Message* msg = new SyncChannelTestMsg_Ping(ping);
1157    msg->set_unblock(true);
1158    Send(msg);
1159    // Signal the event after the message has been sent on the channel, on the
1160    // IPC thread.
1161    ipc_thread().message_loop()->PostTask(
1162        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnPingSent, this));
1163  }
1164
1165  void OnPingTTL(int ping, int* out) {
1166    *out = ping;
1167    wait_event_->Wait();
1168  }
1169
1170  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1171
1172 private:
1173  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1174    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
1175     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1176     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1177     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1178    IPC_END_MESSAGE_MAP()
1179    return true;
1180  }
1181
1182  void OnPingSent() {
1183    sent_ping_event_->Signal();
1184  }
1185
1186  void OnNoArgs() { }
1187  WaitableEvent* sent_ping_event_;
1188  WaitableEvent* wait_event_;
1189};
1190
1191class NonRestrictedDispatchServer : public Worker {
1192 public:
1193  NonRestrictedDispatchServer(WaitableEvent* signal_event)
1194      : Worker("non_restricted_channel", Channel::MODE_SERVER),
1195        signal_event_(signal_event) {}
1196
1197  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1198
1199  void OnDoPingTTL(int ping) {
1200    int value = 0;
1201    Send(new SyncChannelTestMsg_PingTTL(ping, &value));
1202    signal_event_->Signal();
1203  }
1204
1205 private:
1206  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1207    IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
1208     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1209     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1210    IPC_END_MESSAGE_MAP()
1211    return true;
1212  }
1213
1214  void OnNoArgs() { }
1215  WaitableEvent* signal_event_;
1216};
1217
1218class RestrictedDispatchClient : public Worker {
1219 public:
1220  RestrictedDispatchClient(WaitableEvent* sent_ping_event,
1221                           RestrictedDispatchServer* server,
1222                           NonRestrictedDispatchServer* server2,
1223                           int* success)
1224      : Worker("restricted_channel", Channel::MODE_CLIENT),
1225        ping_(0),
1226        server_(server),
1227        server2_(server2),
1228        success_(success),
1229        sent_ping_event_(sent_ping_event) {}
1230
1231  virtual void Run() OVERRIDE {
1232    // Incoming messages from our channel should only be dispatched when we
1233    // send a message on that same channel.
1234    channel()->SetRestrictDispatchChannelGroup(1);
1235
1236    server_->ListenerThread()->message_loop()->PostTask(
1237        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 1));
1238    sent_ping_event_->Wait();
1239    Send(new SyncChannelTestMsg_NoArgs);
1240    if (ping_ == 1)
1241      ++*success_;
1242    else
1243      LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1244
1245    non_restricted_channel_.reset(new SyncChannel(
1246        "non_restricted_channel", Channel::MODE_CLIENT, this,
1247        ipc_thread().message_loop_proxy(), true, shutdown_event()));
1248
1249    server_->ListenerThread()->message_loop()->PostTask(
1250        FROM_HERE, base::Bind(&RestrictedDispatchServer::OnDoPing, server_, 2));
1251    sent_ping_event_->Wait();
1252    // Check that the incoming message is *not* dispatched when sending on the
1253    // non restricted channel.
1254    // TODO(piman): there is a possibility of a false positive race condition
1255    // here, if the message that was posted on the server-side end of the pipe
1256    // is not visible yet on the client side, but I don't know how to solve this
1257    // without hooking into the internals of SyncChannel. I haven't seen it in
1258    // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
1259    // the following to fail).
1260    non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
1261    if (ping_ == 1)
1262      ++*success_;
1263    else
1264      LOG(ERROR) << "Send dispatched message from restricted channel";
1265
1266    Send(new SyncChannelTestMsg_NoArgs);
1267    if (ping_ == 2)
1268      ++*success_;
1269    else
1270      LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1271
1272    // Check that the incoming message on the non-restricted channel is
1273    // dispatched when sending on the restricted channel.
1274    server2_->ListenerThread()->message_loop()->PostTask(
1275        FROM_HERE,
1276        base::Bind(&NonRestrictedDispatchServer::OnDoPingTTL, server2_, 3));
1277    int value = 0;
1278    Send(new SyncChannelTestMsg_PingTTL(4, &value));
1279    if (ping_ == 3 && value == 4)
1280      ++*success_;
1281    else
1282      LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
1283
1284    non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
1285    non_restricted_channel_.reset();
1286    Send(new SyncChannelTestMsg_Done);
1287    Done();
1288  }
1289
1290 private:
1291  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1292    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
1293     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
1294     IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
1295    IPC_END_MESSAGE_MAP()
1296    return true;
1297  }
1298
1299  void OnPing(int ping) {
1300    ping_ = ping;
1301  }
1302
1303  void OnPingTTL(int ping, IPC::Message* reply) {
1304    ping_ = ping;
1305    // This message comes from the NonRestrictedDispatchServer, we have to send
1306    // the reply back manually.
1307    SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
1308    non_restricted_channel_->Send(reply);
1309  }
1310
1311  int ping_;
1312  RestrictedDispatchServer* server_;
1313  NonRestrictedDispatchServer* server2_;
1314  int* success_;
1315  WaitableEvent* sent_ping_event_;
1316  scoped_ptr<SyncChannel> non_restricted_channel_;
1317};
1318
1319TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
1320  WaitableEvent sent_ping_event(false, false);
1321  WaitableEvent wait_event(false, false);
1322  RestrictedDispatchServer* server =
1323      new RestrictedDispatchServer(&sent_ping_event, &wait_event);
1324  NonRestrictedDispatchServer* server2 =
1325      new NonRestrictedDispatchServer(&wait_event);
1326
1327  int success = 0;
1328  std::vector<Worker*> workers;
1329  workers.push_back(server);
1330  workers.push_back(server2);
1331  workers.push_back(new RestrictedDispatchClient(
1332      &sent_ping_event, server, server2, &success));
1333  RunTest(workers);
1334  EXPECT_EQ(4, success);
1335}
1336
1337//------------------------------------------------------------------------------
1338
1339// This test case inspired by crbug.com/108491
1340// We create two servers that use the same ListenerThread but have
1341// SetRestrictDispatchToSameChannel set to true.
1342// We create clients, then use some specific WaitableEvent wait/signalling to
1343// ensure that messages get dispatched in a way that causes a deadlock due to
1344// a nested dispatch and an eligible message in a higher-level dispatch's
1345// delayed_queue. Specifically, we start with client1 about so send an
1346// unblocking message to server1, while the shared listener thread for the
1347// servers server1 and server2 is about to send a non-unblocking message to
1348// client1. At the same time, client2 will be about to send an unblocking
1349// message to server2. Server1 will handle the client1->server1 message by
1350// telling server2 to send a non-unblocking message to client2.
1351// What should happen is that the send to server2 should find the pending,
1352// same-context client2->server2 message to dispatch, causing client2 to
1353// unblock then handle the server2->client2 message, so that the shared
1354// servers' listener thread can then respond to the client1->server1 message.
1355// Then client1 can handle the non-unblocking server1->client1 message.
1356// The old code would end up in a state where the server2->client2 message is
1357// sent, but the client2->server2 message (which is eligible for dispatch, and
1358// which is what client2 is waiting for) is stashed in a local delayed_queue
1359// that has server1's channel context, causing a deadlock.
1360// WaitableEvents in the events array are used to:
1361//   event 0: indicate to client1 that server listener is in OnDoServerTask
1362//   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
1363//   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
1364//   event 3: indicate to client2 that server listener is in OnDoServerTask
1365
1366class RestrictedDispatchDeadlockServer : public Worker {
1367 public:
1368  RestrictedDispatchDeadlockServer(int server_num,
1369                                   WaitableEvent* server_ready_event,
1370                                   WaitableEvent** events,
1371                                   RestrictedDispatchDeadlockServer* peer)
1372      : Worker(server_num == 1 ? "channel1" : "channel2", Channel::MODE_SERVER),
1373        server_num_(server_num),
1374        server_ready_event_(server_ready_event),
1375        events_(events),
1376        peer_(peer) { }
1377
1378  void OnDoServerTask() {
1379    events_[3]->Signal();
1380    events_[2]->Wait();
1381    events_[0]->Signal();
1382    SendMessageToClient();
1383  }
1384
1385  virtual void Run() OVERRIDE {
1386    channel()->SetRestrictDispatchChannelGroup(1);
1387    server_ready_event_->Signal();
1388  }
1389
1390  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1391
1392 private:
1393  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1394    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
1395     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1396     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1397    IPC_END_MESSAGE_MAP()
1398    return true;
1399  }
1400
1401  void OnNoArgs() {
1402    if (server_num_ == 1) {
1403      DCHECK(peer_ != NULL);
1404      peer_->SendMessageToClient();
1405    }
1406  }
1407
1408  void SendMessageToClient() {
1409    Message* msg = new SyncChannelTestMsg_NoArgs;
1410    msg->set_unblock(false);
1411    DCHECK(!msg->should_unblock());
1412    Send(msg);
1413  }
1414
1415  int server_num_;
1416  WaitableEvent* server_ready_event_;
1417  WaitableEvent** events_;
1418  RestrictedDispatchDeadlockServer* peer_;
1419};
1420
1421class RestrictedDispatchDeadlockClient2 : public Worker {
1422 public:
1423  RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer* server,
1424                                    WaitableEvent* server_ready_event,
1425                                    WaitableEvent** events)
1426      : Worker("channel2", Channel::MODE_CLIENT),
1427        server_ready_event_(server_ready_event),
1428        events_(events),
1429        received_msg_(false),
1430        received_noarg_reply_(false),
1431        done_issued_(false) {}
1432
1433  virtual void Run() OVERRIDE {
1434    server_ready_event_->Wait();
1435  }
1436
1437  void OnDoClient2Task() {
1438    events_[3]->Wait();
1439    events_[1]->Signal();
1440    events_[2]->Signal();
1441    DCHECK(received_msg_ == false);
1442
1443    Message* message = new SyncChannelTestMsg_NoArgs;
1444    message->set_unblock(true);
1445    Send(message);
1446    received_noarg_reply_ = true;
1447  }
1448
1449  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1450 private:
1451  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1452    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
1453     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1454    IPC_END_MESSAGE_MAP()
1455    return true;
1456  }
1457
1458  void OnNoArgs() {
1459    received_msg_ = true;
1460    PossiblyDone();
1461  }
1462
1463  void PossiblyDone() {
1464    if (received_noarg_reply_ && received_msg_) {
1465      DCHECK(done_issued_ == false);
1466      done_issued_ = true;
1467      Send(new SyncChannelTestMsg_Done);
1468      Done();
1469    }
1470  }
1471
1472  WaitableEvent* server_ready_event_;
1473  WaitableEvent** events_;
1474  bool received_msg_;
1475  bool received_noarg_reply_;
1476  bool done_issued_;
1477};
1478
1479class RestrictedDispatchDeadlockClient1 : public Worker {
1480 public:
1481  RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer* server,
1482                                    RestrictedDispatchDeadlockClient2* peer,
1483                                    WaitableEvent* server_ready_event,
1484                                    WaitableEvent** events)
1485      : Worker("channel1", Channel::MODE_CLIENT),
1486        server_(server),
1487        peer_(peer),
1488        server_ready_event_(server_ready_event),
1489        events_(events),
1490        received_msg_(false),
1491        received_noarg_reply_(false),
1492        done_issued_(false) {}
1493
1494  virtual void Run() OVERRIDE {
1495    server_ready_event_->Wait();
1496    server_->ListenerThread()->message_loop()->PostTask(
1497        FROM_HERE,
1498        base::Bind(&RestrictedDispatchDeadlockServer::OnDoServerTask, server_));
1499    peer_->ListenerThread()->message_loop()->PostTask(
1500        FROM_HERE,
1501        base::Bind(&RestrictedDispatchDeadlockClient2::OnDoClient2Task, peer_));
1502    events_[0]->Wait();
1503    events_[1]->Wait();
1504    DCHECK(received_msg_ == false);
1505
1506    Message* message = new SyncChannelTestMsg_NoArgs;
1507    message->set_unblock(true);
1508    Send(message);
1509    received_noarg_reply_ = true;
1510    PossiblyDone();
1511  }
1512
1513  base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1514 private:
1515  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1516    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
1517     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1518    IPC_END_MESSAGE_MAP()
1519    return true;
1520  }
1521
1522  void OnNoArgs() {
1523    received_msg_ = true;
1524    PossiblyDone();
1525  }
1526
1527  void PossiblyDone() {
1528    if (received_noarg_reply_ && received_msg_) {
1529      DCHECK(done_issued_ == false);
1530      done_issued_ = true;
1531      Send(new SyncChannelTestMsg_Done);
1532      Done();
1533    }
1534  }
1535
1536  RestrictedDispatchDeadlockServer* server_;
1537  RestrictedDispatchDeadlockClient2* peer_;
1538  WaitableEvent* server_ready_event_;
1539  WaitableEvent** events_;
1540  bool received_msg_;
1541  bool received_noarg_reply_;
1542  bool done_issued_;
1543};
1544
1545TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
1546  std::vector<Worker*> workers;
1547
1548  // A shared worker thread so that server1 and server2 run on one thread.
1549  base::Thread worker_thread("RestrictedDispatchDeadlock");
1550  ASSERT_TRUE(worker_thread.Start());
1551
1552  WaitableEvent server1_ready(false, false);
1553  WaitableEvent server2_ready(false, false);
1554
1555  WaitableEvent event0(false, false);
1556  WaitableEvent event1(false, false);
1557  WaitableEvent event2(false, false);
1558  WaitableEvent event3(false, false);
1559  WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
1560
1561  RestrictedDispatchDeadlockServer* server1;
1562  RestrictedDispatchDeadlockServer* server2;
1563  RestrictedDispatchDeadlockClient1* client1;
1564  RestrictedDispatchDeadlockClient2* client2;
1565
1566  server2 = new RestrictedDispatchDeadlockServer(2, &server2_ready, events,
1567                                                 NULL);
1568  server2->OverrideThread(&worker_thread);
1569  workers.push_back(server2);
1570
1571  client2 = new RestrictedDispatchDeadlockClient2(server2, &server2_ready,
1572                                                  events);
1573  workers.push_back(client2);
1574
1575  server1 = new RestrictedDispatchDeadlockServer(1, &server1_ready, events,
1576                                                 server2);
1577  server1->OverrideThread(&worker_thread);
1578  workers.push_back(server1);
1579
1580  client1 = new RestrictedDispatchDeadlockClient1(server1, client2,
1581                                                  &server1_ready, events);
1582  workers.push_back(client1);
1583
1584  RunTest(workers);
1585}
1586
1587//------------------------------------------------------------------------------
1588
1589// This test case inspired by crbug.com/120530
1590// We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a
1591// message that recurses through 3, 4 or 5 steps to make sure, say, W1 can
1592// re-enter when called from W4 while it's sending a message to W2.
1593// The first worker drives the whole test so it must be treated specially.
1594
1595class RestrictedDispatchPipeWorker : public Worker {
1596 public:
1597  RestrictedDispatchPipeWorker(
1598      const std::string &channel1,
1599      WaitableEvent* event1,
1600      const std::string &channel2,
1601      WaitableEvent* event2,
1602      int group,
1603      int* success)
1604      : Worker(channel1, Channel::MODE_SERVER),
1605        event1_(event1),
1606        event2_(event2),
1607        other_channel_name_(channel2),
1608        group_(group),
1609        success_(success) {
1610  }
1611
1612  void OnPingTTL(int ping, int* ret) {
1613    *ret = 0;
1614    if (!ping)
1615      return;
1616    other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
1617    ++*ret;
1618  }
1619
1620  void OnDone() {
1621    if (is_first())
1622      return;
1623    other_channel_->Send(new SyncChannelTestMsg_Done);
1624    other_channel_.reset();
1625    Done();
1626  }
1627
1628  virtual void Run() OVERRIDE {
1629    channel()->SetRestrictDispatchChannelGroup(group_);
1630    if (is_first())
1631      event1_->Signal();
1632    event2_->Wait();
1633    other_channel_.reset(new SyncChannel(
1634        other_channel_name_, Channel::MODE_CLIENT, this,
1635        ipc_thread().message_loop_proxy(), true, shutdown_event()));
1636    other_channel_->SetRestrictDispatchChannelGroup(group_);
1637    if (!is_first()) {
1638      event1_->Signal();
1639      return;
1640    }
1641    *success_ = 0;
1642    int value = 0;
1643    OnPingTTL(3, &value);
1644    *success_ += (value == 3);
1645    OnPingTTL(4, &value);
1646    *success_ += (value == 4);
1647    OnPingTTL(5, &value);
1648    *success_ += (value == 5);
1649    other_channel_->Send(new SyncChannelTestMsg_Done);
1650    other_channel_.reset();
1651    Done();
1652  }
1653
1654  bool is_first() { return !!success_; }
1655
1656 private:
1657  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1658    IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
1659     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1660     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
1661    IPC_END_MESSAGE_MAP()
1662    return true;
1663  }
1664
1665  scoped_ptr<SyncChannel> other_channel_;
1666  WaitableEvent* event1_;
1667  WaitableEvent* event2_;
1668  std::string other_channel_name_;
1669  int group_;
1670  int* success_;
1671};
1672
1673TEST_F(IPCSyncChannelTest, RestrictedDispatch4WayDeadlock) {
1674  int success = 0;
1675  std::vector<Worker*> workers;
1676  WaitableEvent event0(true, false);
1677  WaitableEvent event1(true, false);
1678  WaitableEvent event2(true, false);
1679  WaitableEvent event3(true, false);
1680  workers.push_back(new RestrictedDispatchPipeWorker(
1681        "channel0", &event0, "channel1", &event1, 1, &success));
1682  workers.push_back(new RestrictedDispatchPipeWorker(
1683        "channel1", &event1, "channel2", &event2, 2, NULL));
1684  workers.push_back(new RestrictedDispatchPipeWorker(
1685        "channel2", &event2, "channel3", &event3, 3, NULL));
1686  workers.push_back(new RestrictedDispatchPipeWorker(
1687        "channel3", &event3, "channel0", &event0, 4, NULL));
1688  RunTest(workers);
1689  EXPECT_EQ(3, success);
1690}
1691
1692//------------------------------------------------------------------------------
1693
1694// This test case inspired by crbug.com/122443
1695// We want to make sure a reply message with the unblock flag set correctly
1696// behaves as a reply, not a regular message.
1697// We have 3 workers. Server1 will send a message to Server2 (which will block),
1698// during which it will dispatch a message comming from Client, at which point
1699// it will send another message to Server2. While sending that second message it
1700// will receive a reply from Server1 with the unblock flag.
1701
1702class ReentrantReplyServer1 : public Worker {
1703 public:
1704  ReentrantReplyServer1(WaitableEvent* server_ready)
1705      : Worker("reentrant_reply1", Channel::MODE_SERVER),
1706        server_ready_(server_ready) { }
1707
1708  virtual void Run() OVERRIDE {
1709    server2_channel_.reset(new SyncChannel(
1710        "reentrant_reply2", Channel::MODE_CLIENT, this,
1711        ipc_thread().message_loop_proxy(), true, shutdown_event()));
1712    server_ready_->Signal();
1713    Message* msg = new SyncChannelTestMsg_Reentrant1();
1714    server2_channel_->Send(msg);
1715    server2_channel_.reset();
1716    Done();
1717  }
1718
1719 private:
1720  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1721    IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
1722     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
1723     IPC_REPLY_HANDLER(OnReply)
1724    IPC_END_MESSAGE_MAP()
1725    return true;
1726  }
1727
1728  void OnReentrant2() {
1729    Message* msg = new SyncChannelTestMsg_Reentrant3();
1730    server2_channel_->Send(msg);
1731  }
1732
1733  void OnReply(const Message& message) {
1734    // If we get here, the Send() will never receive the reply (thus would
1735    // hang), so abort instead.
1736    LOG(FATAL) << "Reply message was dispatched";
1737  }
1738
1739  WaitableEvent* server_ready_;
1740  scoped_ptr<SyncChannel> server2_channel_;
1741};
1742
1743class ReentrantReplyServer2 : public Worker {
1744 public:
1745  ReentrantReplyServer2()
1746      : Worker("reentrant_reply2", Channel::MODE_SERVER),
1747        reply_(NULL) { }
1748
1749 private:
1750  virtual bool OnMessageReceived(const Message& message) OVERRIDE {
1751    IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
1752     IPC_MESSAGE_HANDLER_DELAY_REPLY(
1753         SyncChannelTestMsg_Reentrant1, OnReentrant1)
1754     IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
1755    IPC_END_MESSAGE_MAP()
1756    return true;
1757  }
1758
1759  void OnReentrant1(Message* reply) {
1760    DCHECK(!reply_);
1761    reply_ = reply;
1762  }
1763
1764  void OnReentrant3() {
1765    DCHECK(reply_);
1766    Message* reply = reply_;
1767    reply_ = NULL;
1768    reply->set_unblock(true);
1769    Send(reply);
1770    Done();
1771  }
1772
1773  Message* reply_;
1774};
1775
1776class ReentrantReplyClient : public Worker {
1777 public:
1778  ReentrantReplyClient(WaitableEvent* server_ready)
1779      : Worker("reentrant_reply1", Channel::MODE_CLIENT),
1780        server_ready_(server_ready) { }
1781
1782  virtual void Run() OVERRIDE {
1783    server_ready_->Wait();
1784    Send(new SyncChannelTestMsg_Reentrant2());
1785    Done();
1786  }
1787
1788 private:
1789  WaitableEvent* server_ready_;
1790};
1791
1792TEST_F(IPCSyncChannelTest, ReentrantReply) {
1793  std::vector<Worker*> workers;
1794  WaitableEvent server_ready(false, false);
1795  workers.push_back(new ReentrantReplyServer2());
1796  workers.push_back(new ReentrantReplyServer1(&server_ready));
1797  workers.push_back(new ReentrantReplyClient(&server_ready));
1798  RunTest(workers);
1799}
1800
1801//------------------------------------------------------------------------------
1802
1803// Generate a validated channel ID using Channel::GenerateVerifiedChannelID().
1804
1805class VerifiedServer : public Worker {
1806 public:
1807  VerifiedServer(base::Thread* listener_thread,
1808                 const std::string& channel_name,
1809                 const std::string& reply_text)
1810      : Worker(channel_name, Channel::MODE_SERVER),
1811        reply_text_(reply_text) {
1812    Worker::OverrideThread(listener_thread);
1813  }
1814
1815  virtual void OnNestedTestMsg(Message* reply_msg) OVERRIDE {
1816    VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
1817    SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
1818    Send(reply_msg);
1819    ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId());
1820    Done();
1821  }
1822
1823 private:
1824  std::string reply_text_;
1825};
1826
1827class VerifiedClient : public Worker {
1828 public:
1829  VerifiedClient(base::Thread* listener_thread,
1830                 const std::string& channel_name,
1831                 const std::string& expected_text)
1832      : Worker(channel_name, Channel::MODE_CLIENT),
1833        expected_text_(expected_text) {
1834    Worker::OverrideThread(listener_thread);
1835  }
1836
1837  virtual void Run() OVERRIDE {
1838    std::string response;
1839    SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
1840    bool result = Send(msg);
1841    DCHECK(result);
1842    DCHECK_EQ(response, expected_text_);
1843    // expected_text_ is only used in the above DCHECK. This line suppresses the
1844    // "unused private field" warning in release builds.
1845    (void)expected_text_;
1846
1847    VLOG(1) << __FUNCTION__ << " Received reply: " << response;
1848    ASSERT_EQ(channel()->peer_pid(), base::GetCurrentProcId());
1849    Done();
1850  }
1851
1852 private:
1853  std::string expected_text_;
1854};
1855
1856void Verified() {
1857  std::vector<Worker*> workers;
1858
1859  // A shared worker thread for servers
1860  base::Thread server_worker_thread("Verified_ServerListener");
1861  ASSERT_TRUE(server_worker_thread.Start());
1862
1863  base::Thread client_worker_thread("Verified_ClientListener");
1864  ASSERT_TRUE(client_worker_thread.Start());
1865
1866  std::string channel_id = Channel::GenerateVerifiedChannelID("Verified");
1867  Worker* worker;
1868
1869  worker = new VerifiedServer(&server_worker_thread,
1870                              channel_id,
1871                              "Got first message");
1872  workers.push_back(worker);
1873
1874  worker = new VerifiedClient(&client_worker_thread,
1875                              channel_id,
1876                              "Got first message");
1877  workers.push_back(worker);
1878
1879  RunTest(workers);
1880}
1881
1882// Windows needs to send an out-of-band secret to verify the client end of the
1883// channel. Test that we still connect correctly in that case.
1884TEST_F(IPCSyncChannelTest, Verified) {
1885  Verified();
1886}
1887
1888}  // namespace
1889}  // namespace IPC
1890