1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/common/handle_watcher.h"
6
7#include <string>
8
9#include "base/at_exit.h"
10#include "base/auto_reset.h"
11#include "base/bind.h"
12#include "base/memory/scoped_vector.h"
13#include "base/run_loop.h"
14#include "base/test/simple_test_tick_clock.h"
15#include "base/threading/thread.h"
16#include "mojo/common/message_pump_mojo.h"
17#include "mojo/common/time_helper.h"
18#include "mojo/public/cpp/system/core.h"
19#include "mojo/public/cpp/test_support/test_utils.h"
20#include "testing/gtest/include/gtest/gtest.h"
21
22namespace mojo {
23namespace common {
24namespace test {
25
// Selects which kind of message loop a test runs on. See CreateMessageLoop()
// for how each value is turned into a base::MessageLoop.
enum MessageLoopConfig {
  // A base::MessageLoop constructed with no explicit message pump.
  MESSAGE_LOOP_CONFIG_DEFAULT = 0,

  // A base::MessageLoop constructed around MessagePumpMojo::Create().
  MESSAGE_LOOP_CONFIG_MOJO = 1
};
30
31void ObserveCallback(bool* was_signaled,
32                     MojoResult* result_observed,
33                     MojoResult result) {
34  *was_signaled = true;
35  *result_observed = result;
36}
37
38void RunUntilIdle() {
39  base::RunLoop run_loop;
40  run_loop.RunUntilIdle();
41}
42
43void DeleteWatcherAndForwardResult(
44    HandleWatcher* watcher,
45    base::Callback<void(MojoResult)> next_callback,
46    MojoResult result) {
47  delete watcher;
48  next_callback.Run(result);
49}
50
51scoped_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) {
52  scoped_ptr<base::MessageLoop> loop;
53  if (config == MESSAGE_LOOP_CONFIG_DEFAULT)
54    loop.reset(new base::MessageLoop());
55  else
56    loop.reset(new base::MessageLoop(MessagePumpMojo::Create()));
57  return loop.Pass();
58}
59
60// Helper class to manage the callback and running the message loop waiting for
61// message to be received. Typical usage is something like:
62//   Schedule callback returned from GetCallback().
63//   RunUntilGotCallback();
64//   EXPECT_TRUE(got_callback());
65//   clear_callback();
66class CallbackHelper {
67 public:
68  CallbackHelper()
69      : got_callback_(false),
70        run_loop_(NULL),
71        weak_factory_(this) {}
72  ~CallbackHelper() {}
73
74  // See description above |got_callback_|.
75  bool got_callback() const { return got_callback_; }
76  void clear_callback() { got_callback_ = false; }
77
78  // Runs the current MessageLoop until the callback returned from GetCallback()
79  // is notified.
80  void RunUntilGotCallback() {
81    ASSERT_TRUE(run_loop_ == NULL);
82    base::RunLoop run_loop;
83    base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
84    run_loop.Run();
85  }
86
87  base::Callback<void(MojoResult)> GetCallback() {
88    return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
89  }
90
91  void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) {
92    StartWithCallback(watcher, handle, GetCallback());
93  }
94
95  void StartWithCallback(HandleWatcher* watcher,
96                         const MessagePipeHandle& handle,
97                         const base::Callback<void(MojoResult)>& callback) {
98    watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE,
99                   MOJO_DEADLINE_INDEFINITE, callback);
100  }
101
102 private:
103  void OnCallback(MojoResult result) {
104    got_callback_ = true;
105    if (run_loop_)
106      run_loop_->Quit();
107  }
108
109  // Set to true when the callback is called.
110  bool got_callback_;
111
112  // If non-NULL we're in RunUntilGotCallback().
113  base::RunLoop* run_loop_;
114
115  base::WeakPtrFactory<CallbackHelper> weak_factory_;
116
117 private:
118  DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
119};
120
121class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> {
122 public:
123  HandleWatcherTest() : message_loop_(CreateMessageLoop(GetParam())) {}
124  virtual ~HandleWatcherTest() {
125    test::SetTickClockForTest(NULL);
126  }
127
128 protected:
129  void TearDownMessageLoop() {
130    message_loop_.reset();
131  }
132
133  void InstallTickClock() {
134    test::SetTickClockForTest(&tick_clock_);
135  }
136
137  base::SimpleTestTickClock tick_clock_;
138
139 private:
140  base::ShadowingAtExitManager at_exit_;
141  scoped_ptr<base::MessageLoop> message_loop_;
142
143  DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
144};
145
146INSTANTIATE_TEST_CASE_P(
147    MultipleMessageLoopConfigs, HandleWatcherTest,
148    testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO));
149
150// Trivial test case with a single handle to watch.
151TEST_P(HandleWatcherTest, SingleHandler) {
152  MessagePipe test_pipe;
153  ASSERT_TRUE(test_pipe.handle0.is_valid());
154  CallbackHelper callback_helper;
155  HandleWatcher watcher;
156  callback_helper.Start(&watcher, test_pipe.handle0.get());
157  RunUntilIdle();
158  EXPECT_FALSE(callback_helper.got_callback());
159  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
160                                           std::string()));
161  callback_helper.RunUntilGotCallback();
162  EXPECT_TRUE(callback_helper.got_callback());
163}
164
165// Creates three handles and notfies them in reverse order ensuring each one is
166// notified appropriately.
167TEST_P(HandleWatcherTest, ThreeHandles) {
168  MessagePipe test_pipe1;
169  MessagePipe test_pipe2;
170  MessagePipe test_pipe3;
171  CallbackHelper callback_helper1;
172  CallbackHelper callback_helper2;
173  CallbackHelper callback_helper3;
174  ASSERT_TRUE(test_pipe1.handle0.is_valid());
175  ASSERT_TRUE(test_pipe2.handle0.is_valid());
176  ASSERT_TRUE(test_pipe3.handle0.is_valid());
177
178  HandleWatcher watcher1;
179  callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
180  RunUntilIdle();
181  EXPECT_FALSE(callback_helper1.got_callback());
182  EXPECT_FALSE(callback_helper2.got_callback());
183  EXPECT_FALSE(callback_helper3.got_callback());
184
185  HandleWatcher watcher2;
186  callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
187  RunUntilIdle();
188  EXPECT_FALSE(callback_helper1.got_callback());
189  EXPECT_FALSE(callback_helper2.got_callback());
190  EXPECT_FALSE(callback_helper3.got_callback());
191
192  HandleWatcher watcher3;
193  callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
194  RunUntilIdle();
195  EXPECT_FALSE(callback_helper1.got_callback());
196  EXPECT_FALSE(callback_helper2.got_callback());
197  EXPECT_FALSE(callback_helper3.got_callback());
198
199  // Write to 3 and make sure it's notified.
200  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
201                                           std::string()));
202  callback_helper3.RunUntilGotCallback();
203  EXPECT_FALSE(callback_helper1.got_callback());
204  EXPECT_FALSE(callback_helper2.got_callback());
205  EXPECT_TRUE(callback_helper3.got_callback());
206  callback_helper3.clear_callback();
207
208  // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
209  // running.
210  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
211                                           std::string()));
212  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
213                                           std::string()));
214  callback_helper1.RunUntilGotCallback();
215  EXPECT_TRUE(callback_helper1.got_callback());
216  EXPECT_FALSE(callback_helper2.got_callback());
217  EXPECT_FALSE(callback_helper3.got_callback());
218  callback_helper1.clear_callback();
219
220  // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
221  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
222                                           std::string()));
223  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
224                                           std::string()));
225  callback_helper2.RunUntilGotCallback();
226  EXPECT_FALSE(callback_helper1.got_callback());
227  EXPECT_TRUE(callback_helper2.got_callback());
228  EXPECT_FALSE(callback_helper3.got_callback());
229}
230
231// Verifies Start() invoked a second time works.
232TEST_P(HandleWatcherTest, Restart) {
233  MessagePipe test_pipe1;
234  MessagePipe test_pipe2;
235  CallbackHelper callback_helper1;
236  CallbackHelper callback_helper2;
237  ASSERT_TRUE(test_pipe1.handle0.is_valid());
238  ASSERT_TRUE(test_pipe2.handle0.is_valid());
239
240  HandleWatcher watcher1;
241  callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
242  RunUntilIdle();
243  EXPECT_FALSE(callback_helper1.got_callback());
244  EXPECT_FALSE(callback_helper2.got_callback());
245
246  HandleWatcher watcher2;
247  callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
248  RunUntilIdle();
249  EXPECT_FALSE(callback_helper1.got_callback());
250  EXPECT_FALSE(callback_helper2.got_callback());
251
252  // Write to 1 and make sure it's notified.
253  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
254                                           std::string()));
255  callback_helper1.RunUntilGotCallback();
256  EXPECT_TRUE(callback_helper1.got_callback());
257  EXPECT_FALSE(callback_helper2.got_callback());
258  callback_helper1.clear_callback();
259  EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get()));
260
261  // Write to 2 and make sure it's notified.
262  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
263                                           std::string()));
264  callback_helper2.RunUntilGotCallback();
265  EXPECT_FALSE(callback_helper1.got_callback());
266  EXPECT_TRUE(callback_helper2.got_callback());
267  callback_helper2.clear_callback();
268
269  // Listen on 1 again.
270  callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
271  RunUntilIdle();
272  EXPECT_FALSE(callback_helper1.got_callback());
273  EXPECT_FALSE(callback_helper2.got_callback());
274
275  // Write to 1 and make sure it's notified.
276  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
277                                           std::string()));
278  callback_helper1.RunUntilGotCallback();
279  EXPECT_TRUE(callback_helper1.got_callback());
280  EXPECT_FALSE(callback_helper2.got_callback());
281}
282
283// Verifies deadline is honored.
284TEST_P(HandleWatcherTest, Deadline) {
285  InstallTickClock();
286
287  MessagePipe test_pipe1;
288  MessagePipe test_pipe2;
289  MessagePipe test_pipe3;
290  CallbackHelper callback_helper1;
291  CallbackHelper callback_helper2;
292  CallbackHelper callback_helper3;
293  ASSERT_TRUE(test_pipe1.handle0.is_valid());
294  ASSERT_TRUE(test_pipe2.handle0.is_valid());
295  ASSERT_TRUE(test_pipe3.handle0.is_valid());
296
297  // Add a watcher with an infinite timeout.
298  HandleWatcher watcher1;
299  callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
300  RunUntilIdle();
301  EXPECT_FALSE(callback_helper1.got_callback());
302  EXPECT_FALSE(callback_helper2.got_callback());
303  EXPECT_FALSE(callback_helper3.got_callback());
304
305  // Add another watcher wth a timeout of 500 microseconds.
306  HandleWatcher watcher2;
307  watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500,
308                 callback_helper2.GetCallback());
309  RunUntilIdle();
310  EXPECT_FALSE(callback_helper1.got_callback());
311  EXPECT_FALSE(callback_helper2.got_callback());
312  EXPECT_FALSE(callback_helper3.got_callback());
313
314  // Advance the clock passed the deadline. We also have to start another
315  // watcher to wake up the background thread.
316  tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
317
318  HandleWatcher watcher3;
319  callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
320
321  callback_helper2.RunUntilGotCallback();
322  EXPECT_FALSE(callback_helper1.got_callback());
323  EXPECT_TRUE(callback_helper2.got_callback());
324  EXPECT_FALSE(callback_helper3.got_callback());
325}
326
327TEST_P(HandleWatcherTest, DeleteInCallback) {
328  MessagePipe test_pipe;
329  CallbackHelper callback_helper;
330
331  HandleWatcher* watcher = new HandleWatcher();
332  callback_helper.StartWithCallback(watcher, test_pipe.handle1.get(),
333                                    base::Bind(&DeleteWatcherAndForwardResult,
334                                               watcher,
335                                               callback_helper.GetCallback()));
336  EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle0.get(),
337                                           std::string()));
338  callback_helper.RunUntilGotCallback();
339  EXPECT_TRUE(callback_helper.got_callback());
340}
341
342TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) {
343  bool was_signaled = false;
344  MojoResult result = MOJO_RESULT_OK;
345
346  MessagePipe pipe;
347  HandleWatcher watcher;
348  watcher.Start(pipe.handle0.get(),
349                MOJO_HANDLE_SIGNAL_READABLE,
350                MOJO_DEADLINE_INDEFINITE,
351                base::Bind(&ObserveCallback, &was_signaled, &result));
352
353  // Now, let the MessageLoop get torn down. We expect our callback to run.
354  TearDownMessageLoop();
355
356  EXPECT_TRUE(was_signaled);
357  EXPECT_EQ(MOJO_RESULT_ABORTED, result);
358}
359
360void NeverReached(MojoResult result) {
361  FAIL() << "Callback should never be invoked " << result;
362}
363
364// Called on the main thread when a thread is done. Decrements |active_count|
365// and if |active_count| is zero quits |run_loop|.
366void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
367  (*active_count)--;
368  EXPECT_GE(*active_count, 0);
369  if (*active_count == 0)
370    run_loop->Quit();
371}
372
373// See description of StressTest. This is called on the background thread.
374// |count| is the number of HandleWatchers to create. |active_count| is the
375// number of outstanding threads, |task_runner| the task runner for the main
376// thread and |run_loop| the run loop that should be quit when there are no more
377// threads running. When done StressThreadDone() is invoked on the main thread.
378// |active_count| and |run_loop| should only be used on the main thread.
379void RunStressTest(int count,
380                   scoped_refptr<base::TaskRunner> task_runner,
381                   base::RunLoop* run_loop,
382                   int* active_count) {
383  struct TestData {
384    MessagePipe pipe;
385    HandleWatcher watcher;
386  };
387  ScopedVector<TestData> data_vector;
388  for (int i = 0; i < count; ++i) {
389    if (i % 20 == 0) {
390      // Every so often we wait. This results in some level of thread balancing
391      // as well as making sure HandleWatcher has time to actually start some
392      // watches.
393      MessagePipe test_pipe;
394      ASSERT_TRUE(test_pipe.handle0.is_valid());
395      CallbackHelper callback_helper;
396      HandleWatcher watcher;
397      callback_helper.Start(&watcher, test_pipe.handle0.get());
398      RunUntilIdle();
399      EXPECT_FALSE(callback_helper.got_callback());
400      EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
401                                               std::string()));
402      base::MessageLoop::ScopedNestableTaskAllower scoper(
403          base::MessageLoop::current());
404      callback_helper.RunUntilGotCallback();
405      EXPECT_TRUE(callback_helper.got_callback());
406    } else {
407      scoped_ptr<TestData> test_data(new TestData);
408      ASSERT_TRUE(test_data->pipe.handle0.is_valid());
409      test_data->watcher.Start(test_data->pipe.handle0.get(),
410                    MOJO_HANDLE_SIGNAL_READABLE,
411                    MOJO_DEADLINE_INDEFINITE,
412                    base::Bind(&NeverReached));
413      data_vector.push_back(test_data.release());
414    }
415    if (i % 15 == 0)
416      data_vector.clear();
417  }
418  task_runner->PostTask(FROM_HERE,
419                        base::Bind(&StressThreadDone, run_loop,
420                                   active_count));
421}
422
423// This test is meant to stress HandleWatcher. It uses from various threads
424// repeatedly starting and stopping watches. It spins up kThreadCount
425// threads. Each thread creates kWatchCount watches. Every so often each thread
426// writes to a pipe and waits for the response.
427TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
428#if defined(NDEBUG)
429  const int kThreadCount = 15;
430  const int kWatchCount = 400;
431#else
432  const int kThreadCount = 10;
433  const int kWatchCount = 250;
434#endif
435
436  base::ShadowingAtExitManager at_exit;
437  base::MessageLoop message_loop;
438  base::RunLoop run_loop;
439  ScopedVector<base::Thread> threads;
440  int threads_active_counter = kThreadCount;
441  // Starts the threads first and then post the task in hopes of having more
442  // threads running at once.
443  for (int i = 0; i < kThreadCount; ++i) {
444    scoped_ptr<base::Thread> thread(new base::Thread("test thread"));
445    if (i % 2) {
446      base::Thread::Options thread_options;
447      thread_options.message_pump_factory =
448          base::Bind(&MessagePumpMojo::Create);
449      thread->StartWithOptions(thread_options);
450    } else {
451      thread->Start();
452    }
453    threads.push_back(thread.release());
454  }
455  for (int i = 0; i < kThreadCount; ++i) {
456    threads[i]->task_runner()->PostTask(
457        FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
458                              message_loop.task_runner(),
459                              &run_loop, &threads_active_counter));
460  }
461  run_loop.Run();
462  ASSERT_EQ(0, threads_active_counter);
463}
464
465}  // namespace test
466}  // namespace common
467}  // namespace mojo
468