1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6// heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7// increase tolerance and reduce observed flakiness (though doing so reduces the
8// meaningfulness of the test).
9
10#include "mojo/system/message_pipe_dispatcher.h"
11
12#include <string.h>
13
14#include <limits>
15
16#include "base/memory/ref_counted.h"
17#include "base/memory/scoped_vector.h"
18#include "base/rand_util.h"
19#include "base/threading/platform_thread.h"  // For |Sleep()|.
20#include "base/threading/simple_thread.h"
21#include "base/time/time.h"
22#include "mojo/system/message_pipe.h"
23#include "mojo/system/test_utils.h"
24#include "mojo/system/waiter.h"
25#include "mojo/system/waiter_test_utils.h"
26#include "testing/gtest/include/gtest/gtest.h"
27
28namespace mojo {
29namespace system {
30namespace {
31
32TEST(MessagePipeDispatcherTest, Basic) {
33  test::Stopwatch stopwatch;
34  int32_t buffer[1];
35  const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36  uint32_t buffer_size;
37
38  // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39  for (unsigned i = 0; i < 2; i++) {
40    scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
41        MessagePipeDispatcher::kDefaultCreateOptions));
42    EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
43    scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
44        MessagePipeDispatcher::kDefaultCreateOptions));
45    {
46      scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
47      d0->Init(mp, i);      // 0, 1.
48      d1->Init(mp, i ^ 1);  // 1, 0.
49    }
50    Waiter w;
51    uint32_t context = 0;
52    HandleSignalsState hss;
53
54    // Try adding a writable waiter when already writable.
55    w.Init();
56    hss = HandleSignalsState();
57    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
58              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
59    EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
60    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
61              hss.satisfiable_signals);
62    // Shouldn't need to remove the waiter (it was not added).
63
64    // Add a readable waiter to |d0|, then make it readable (by writing to
65    // |d1|), then wait.
66    w.Init();
67    ASSERT_EQ(MOJO_RESULT_OK,
68              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
69    buffer[0] = 123456789;
70    EXPECT_EQ(MOJO_RESULT_OK,
71              d1->WriteMessage(UserPointer<const void>(buffer),
72                               kBufferSize,
73                               nullptr,
74                               MOJO_WRITE_MESSAGE_FLAG_NONE));
75    stopwatch.Start();
76    EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
77    EXPECT_EQ(1u, context);
78    EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
79    hss = HandleSignalsState();
80    d0->RemoveWaiter(&w, &hss);
81    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
82              hss.satisfied_signals);
83    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
84              hss.satisfiable_signals);
85
86    // Try adding a readable waiter when already readable (from above).
87    w.Init();
88    hss = HandleSignalsState();
89    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
90              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
91    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
92              hss.satisfied_signals);
93    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
94              hss.satisfiable_signals);
95    // Shouldn't need to remove the waiter (it was not added).
96
97    // Make |d0| no longer readable (by reading from it).
98    buffer[0] = 0;
99    buffer_size = kBufferSize;
100    EXPECT_EQ(MOJO_RESULT_OK,
101              d0->ReadMessage(UserPointer<void>(buffer),
102                              MakeUserPointer(&buffer_size),
103                              0,
104                              nullptr,
105                              MOJO_READ_MESSAGE_FLAG_NONE));
106    EXPECT_EQ(kBufferSize, buffer_size);
107    EXPECT_EQ(123456789, buffer[0]);
108
109    // Wait for zero time for readability on |d0| (will time out).
110    w.Init();
111    ASSERT_EQ(MOJO_RESULT_OK,
112              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
113    stopwatch.Start();
114    EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
115    EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
116    hss = HandleSignalsState();
117    d0->RemoveWaiter(&w, &hss);
118    EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
119    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
120              hss.satisfiable_signals);
121
122    // Wait for non-zero, finite time for readability on |d0| (will time out).
123    w.Init();
124    ASSERT_EQ(MOJO_RESULT_OK,
125              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
126    stopwatch.Start();
127    EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
128              w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr));
129    base::TimeDelta elapsed = stopwatch.Elapsed();
130    EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
131    EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
132    hss = HandleSignalsState();
133    d0->RemoveWaiter(&w, &hss);
134    EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
135    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
136              hss.satisfiable_signals);
137
138    EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
139    EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
140  }
141}
142
143TEST(MessagePipeDispatcherTest, InvalidParams) {
144  char buffer[1];
145
146  scoped_refptr<MessagePipeDispatcher> d0(
147      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
148  scoped_refptr<MessagePipeDispatcher> d1(
149      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
150  {
151    scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
152    d0->Init(mp, 0);
153    d1->Init(mp, 1);
154  }
155
156  // |WriteMessage|:
157  // Huge buffer size.
158  EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
159            d0->WriteMessage(UserPointer<const void>(buffer),
160                             std::numeric_limits<uint32_t>::max(),
161                             nullptr,
162                             MOJO_WRITE_MESSAGE_FLAG_NONE));
163
164  EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
165  EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
166}
167
168// These test invalid arguments that should cause death if we're being paranoid
169// about checking arguments (which we would want to do if, e.g., we were in a
170// true "kernel" situation, but we might not want to do otherwise for
171// performance reasons). Probably blatant errors like passing in null pointers
172// (for required pointer arguments) will still cause death, but perhaps not
173// predictably.
174TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
175  const char kMemoryCheckFailedRegex[] = "Check failed";
176
177  scoped_refptr<MessagePipeDispatcher> d0(
178      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
179  scoped_refptr<MessagePipeDispatcher> d1(
180      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
181  {
182    scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
183    d0->Init(mp, 0);
184    d1->Init(mp, 1);
185  }
186
187  // |WriteMessage|:
188  // Null buffer with nonzero buffer size.
189  EXPECT_DEATH_IF_SUPPORTED(
190      d0->WriteMessage(
191          NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE),
192      kMemoryCheckFailedRegex);
193
194  // |ReadMessage|:
195  // Null buffer with nonzero buffer size.
196  // First write something so that we actually have something to read.
197  EXPECT_EQ(MOJO_RESULT_OK,
198            d1->WriteMessage(UserPointer<const void>("x"),
199                             1,
200                             nullptr,
201                             MOJO_WRITE_MESSAGE_FLAG_NONE));
202  uint32_t buffer_size = 1;
203  EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(),
204                                            MakeUserPointer(&buffer_size),
205                                            0,
206                                            nullptr,
207                                            MOJO_READ_MESSAGE_FLAG_NONE),
208                            kMemoryCheckFailedRegex);
209
210  EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
211  EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
212}
213
214// Test what happens when one end is closed (single-threaded test).
215TEST(MessagePipeDispatcherTest, BasicClosed) {
216  int32_t buffer[1];
217  const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
218  uint32_t buffer_size;
219
220  // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
221  for (unsigned i = 0; i < 2; i++) {
222    scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
223        MessagePipeDispatcher::kDefaultCreateOptions));
224    scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
225        MessagePipeDispatcher::kDefaultCreateOptions));
226    {
227      scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
228      d0->Init(mp, i);      // 0, 1.
229      d1->Init(mp, i ^ 1);  // 1, 0.
230    }
231    Waiter w;
232    HandleSignalsState hss;
233
234    // Write (twice) to |d1|.
235    buffer[0] = 123456789;
236    EXPECT_EQ(MOJO_RESULT_OK,
237              d1->WriteMessage(UserPointer<const void>(buffer),
238                               kBufferSize,
239                               nullptr,
240                               MOJO_WRITE_MESSAGE_FLAG_NONE));
241    buffer[0] = 234567890;
242    EXPECT_EQ(MOJO_RESULT_OK,
243              d1->WriteMessage(UserPointer<const void>(buffer),
244                               kBufferSize,
245                               nullptr,
246                               MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248    // Try waiting for readable on |d0|; should fail (already satisfied).
249    w.Init();
250    hss = HandleSignalsState();
251    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
252              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
253    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
254              hss.satisfied_signals);
255    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
256              hss.satisfiable_signals);
257
258    // Try reading from |d1|; should fail (nothing to read).
259    buffer[0] = 0;
260    buffer_size = kBufferSize;
261    EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
262              d1->ReadMessage(UserPointer<void>(buffer),
263                              MakeUserPointer(&buffer_size),
264                              0,
265                              nullptr,
266                              MOJO_READ_MESSAGE_FLAG_NONE));
267
268    // Close |d1|.
269    EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
270
271    // Try waiting for readable on |d0|; should fail (already satisfied).
272    w.Init();
273    hss = HandleSignalsState();
274    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
275              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
276    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
277    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
278
279    // Read from |d0|.
280    buffer[0] = 0;
281    buffer_size = kBufferSize;
282    EXPECT_EQ(MOJO_RESULT_OK,
283              d0->ReadMessage(UserPointer<void>(buffer),
284                              MakeUserPointer(&buffer_size),
285                              0,
286                              nullptr,
287                              MOJO_READ_MESSAGE_FLAG_NONE));
288    EXPECT_EQ(kBufferSize, buffer_size);
289    EXPECT_EQ(123456789, buffer[0]);
290
291    // Try waiting for readable on |d0|; should fail (already satisfied).
292    w.Init();
293    hss = HandleSignalsState();
294    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
295              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
296    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
297    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
298
299    // Read again from |d0|.
300    buffer[0] = 0;
301    buffer_size = kBufferSize;
302    EXPECT_EQ(MOJO_RESULT_OK,
303              d0->ReadMessage(UserPointer<void>(buffer),
304                              MakeUserPointer(&buffer_size),
305                              0,
306                              nullptr,
307                              MOJO_READ_MESSAGE_FLAG_NONE));
308    EXPECT_EQ(kBufferSize, buffer_size);
309    EXPECT_EQ(234567890, buffer[0]);
310
311    // Try waiting for readable on |d0|; should fail (unsatisfiable).
312    w.Init();
313    hss = HandleSignalsState();
314    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
315              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
316    EXPECT_EQ(0u, hss.satisfied_signals);
317    EXPECT_EQ(0u, hss.satisfiable_signals);
318
319    // Try waiting for writable on |d0|; should fail (unsatisfiable).
320    w.Init();
321    hss = HandleSignalsState();
322    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
323              d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
324    EXPECT_EQ(0u, hss.satisfied_signals);
325    EXPECT_EQ(0u, hss.satisfiable_signals);
326
327    // Try reading from |d0|; should fail (nothing to read and other end
328    // closed).
329    buffer[0] = 0;
330    buffer_size = kBufferSize;
331    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
332              d0->ReadMessage(UserPointer<void>(buffer),
333                              MakeUserPointer(&buffer_size),
334                              0,
335                              nullptr,
336                              MOJO_READ_MESSAGE_FLAG_NONE));
337
338    // Try writing to |d0|; should fail (other end closed).
339    buffer[0] = 345678901;
340    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
341              d0->WriteMessage(UserPointer<const void>(buffer),
342                               kBufferSize,
343                               nullptr,
344                               MOJO_WRITE_MESSAGE_FLAG_NONE));
345
346    EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
347  }
348}
349
350#if defined(OS_WIN)
351// http://crbug.com/396386
352#define MAYBE_BasicThreaded DISABLED_BasicThreaded
353#else
354#define MAYBE_BasicThreaded BasicThreaded
355#endif
356TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
357  test::Stopwatch stopwatch;
358  int32_t buffer[1];
359  const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
360  uint32_t buffer_size;
361  base::TimeDelta elapsed;
362  bool did_wait;
363  MojoResult result;
364  uint32_t context;
365  HandleSignalsState hss;
366
367  // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
368  for (unsigned i = 0; i < 2; i++) {
369    scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
370        MessagePipeDispatcher::kDefaultCreateOptions));
371    scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
372        MessagePipeDispatcher::kDefaultCreateOptions));
373    {
374      scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
375      d0->Init(mp, i);      // 0, 1.
376      d1->Init(mp, i ^ 1);  // 1, 0.
377    }
378
379    // Wait for readable on |d1|, which will become readable after some time.
380    {
381      test::WaiterThread thread(d1,
382                                MOJO_HANDLE_SIGNAL_READABLE,
383                                MOJO_DEADLINE_INDEFINITE,
384                                1,
385                                &did_wait,
386                                &result,
387                                &context,
388                                &hss);
389      stopwatch.Start();
390      thread.Start();
391      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
392      // Wake it up by writing to |d0|.
393      buffer[0] = 123456789;
394      EXPECT_EQ(MOJO_RESULT_OK,
395                d0->WriteMessage(UserPointer<const void>(buffer),
396                                 kBufferSize,
397                                 nullptr,
398                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
399    }  // Joins the thread.
400    elapsed = stopwatch.Elapsed();
401    EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
402    EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
403    EXPECT_TRUE(did_wait);
404    EXPECT_EQ(MOJO_RESULT_OK, result);
405    EXPECT_EQ(1u, context);
406    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
407              hss.satisfied_signals);
408    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
409              hss.satisfiable_signals);
410
411    // Now |d1| is already readable. Try waiting for it again.
412    {
413      test::WaiterThread thread(d1,
414                                MOJO_HANDLE_SIGNAL_READABLE,
415                                MOJO_DEADLINE_INDEFINITE,
416                                2,
417                                &did_wait,
418                                &result,
419                                &context,
420                                &hss);
421      stopwatch.Start();
422      thread.Start();
423    }  // Joins the thread.
424    EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
425    EXPECT_FALSE(did_wait);
426    EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
427    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
428              hss.satisfied_signals);
429    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
430              hss.satisfiable_signals);
431
432    // Consume what we wrote to |d0|.
433    buffer[0] = 0;
434    buffer_size = kBufferSize;
435    EXPECT_EQ(MOJO_RESULT_OK,
436              d1->ReadMessage(UserPointer<void>(buffer),
437                              MakeUserPointer(&buffer_size),
438                              0,
439                              nullptr,
440                              MOJO_READ_MESSAGE_FLAG_NONE));
441    EXPECT_EQ(kBufferSize, buffer_size);
442    EXPECT_EQ(123456789, buffer[0]);
443
444    // Wait for readable on |d1| and close |d0| after some time, which should
445    // cancel that wait.
446    {
447      test::WaiterThread thread(d1,
448                                MOJO_HANDLE_SIGNAL_READABLE,
449                                MOJO_DEADLINE_INDEFINITE,
450                                3,
451                                &did_wait,
452                                &result,
453                                &context,
454                                &hss);
455      stopwatch.Start();
456      thread.Start();
457      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
458      EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
459    }  // Joins the thread.
460    elapsed = stopwatch.Elapsed();
461    EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
462    EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
463    EXPECT_TRUE(did_wait);
464    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
465    EXPECT_EQ(3u, context);
466    EXPECT_EQ(0u, hss.satisfied_signals);
467    EXPECT_EQ(0u, hss.satisfiable_signals);
468
469    EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
470  }
471
472  for (unsigned i = 0; i < 2; i++) {
473    scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
474        MessagePipeDispatcher::kDefaultCreateOptions));
475    scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
476        MessagePipeDispatcher::kDefaultCreateOptions));
477    {
478      scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
479      d0->Init(mp, i);      // 0, 1.
480      d1->Init(mp, i ^ 1);  // 1, 0.
481    }
482
483    // Wait for readable on |d1| and close |d1| after some time, which should
484    // cancel that wait.
485    {
486      test::WaiterThread thread(d1,
487                                MOJO_HANDLE_SIGNAL_READABLE,
488                                MOJO_DEADLINE_INDEFINITE,
489                                4,
490                                &did_wait,
491                                &result,
492                                &context,
493                                &hss);
494      stopwatch.Start();
495      thread.Start();
496      base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
497      EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
498    }  // Joins the thread.
499    elapsed = stopwatch.Elapsed();
500    EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
501    EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
502    EXPECT_TRUE(did_wait);
503    EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
504    EXPECT_EQ(4u, context);
505    EXPECT_EQ(0u, hss.satisfied_signals);
506    EXPECT_EQ(0u, hss.satisfiable_signals);
507
508    EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
509  }
510}
511
512// Stress test -----------------------------------------------------------------
513
514const size_t kMaxMessageSize = 2000;
515
516class WriterThread : public base::SimpleThread {
517 public:
518  // |*messages_written| and |*bytes_written| belong to the thread while it's
519  // alive.
520  WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
521               size_t* messages_written,
522               size_t* bytes_written)
523      : base::SimpleThread("writer_thread"),
524        write_dispatcher_(write_dispatcher),
525        messages_written_(messages_written),
526        bytes_written_(bytes_written) {
527    *messages_written_ = 0;
528    *bytes_written_ = 0;
529  }
530
531  virtual ~WriterThread() { Join(); }
532
533 private:
534  virtual void Run() OVERRIDE {
535    // Make some data to write.
536    unsigned char buffer[kMaxMessageSize];
537    for (size_t i = 0; i < kMaxMessageSize; i++)
538      buffer[i] = static_cast<unsigned char>(i);
539
540    // Number of messages to write.
541    *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
542
543    // Write messages.
544    for (size_t i = 0; i < *messages_written_; i++) {
545      uint32_t bytes_to_write = static_cast<uint32_t>(
546          base::RandInt(1, static_cast<int>(kMaxMessageSize)));
547      EXPECT_EQ(MOJO_RESULT_OK,
548                write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
549                                                bytes_to_write,
550                                                nullptr,
551                                                MOJO_WRITE_MESSAGE_FLAG_NONE));
552      *bytes_written_ += bytes_to_write;
553    }
554
555    // Write one last "quit" message.
556    EXPECT_EQ(MOJO_RESULT_OK,
557              write_dispatcher_->WriteMessage(UserPointer<const void>("quit"),
558                                              4,
559                                              nullptr,
560                                              MOJO_WRITE_MESSAGE_FLAG_NONE));
561  }
562
563  const scoped_refptr<Dispatcher> write_dispatcher_;
564  size_t* const messages_written_;
565  size_t* const bytes_written_;
566
567  DISALLOW_COPY_AND_ASSIGN(WriterThread);
568};
569
570class ReaderThread : public base::SimpleThread {
571 public:
572  // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
573  ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
574               size_t* messages_read,
575               size_t* bytes_read)
576      : base::SimpleThread("reader_thread"),
577        read_dispatcher_(read_dispatcher),
578        messages_read_(messages_read),
579        bytes_read_(bytes_read) {
580    *messages_read_ = 0;
581    *bytes_read_ = 0;
582  }
583
584  virtual ~ReaderThread() { Join(); }
585
586 private:
587  virtual void Run() OVERRIDE {
588    unsigned char buffer[kMaxMessageSize];
589    Waiter w;
590    HandleSignalsState hss;
591    MojoResult result;
592
593    // Read messages.
594    for (;;) {
595      // Wait for it to be readable.
596      w.Init();
597      hss = HandleSignalsState();
598      result =
599          read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss);
600      EXPECT_TRUE(result == MOJO_RESULT_OK ||
601                  result == MOJO_RESULT_ALREADY_EXISTS)
602          << "result: " << result;
603      if (result == MOJO_RESULT_OK) {
604        // Actually need to wait.
605        EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr));
606        read_dispatcher_->RemoveWaiter(&w, &hss);
607      }
608      // We may not actually be readable, since we're racing with other threads.
609      EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
610
611      // Now, try to do the read.
612      // Clear the buffer so that we can check the result.
613      memset(buffer, 0, sizeof(buffer));
614      uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
615      result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer),
616                                             MakeUserPointer(&buffer_size),
617                                             0,
618                                             nullptr,
619                                             MOJO_READ_MESSAGE_FLAG_NONE);
620      EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
621          << "result: " << result;
622      // We're racing with others to read, so maybe we failed.
623      if (result == MOJO_RESULT_SHOULD_WAIT)
624        continue;  // In which case, try again.
625      // Check for quit.
626      if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
627        return;
628      EXPECT_GE(buffer_size, 1u);
629      EXPECT_LE(buffer_size, kMaxMessageSize);
630      EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
631
632      (*messages_read_)++;
633      *bytes_read_ += buffer_size;
634    }
635  }
636
637  static bool IsValidMessage(const unsigned char* buffer,
638                             uint32_t message_size) {
639    size_t i;
640    for (i = 0; i < message_size; i++) {
641      if (buffer[i] != static_cast<unsigned char>(i))
642        return false;
643    }
644    // Check that the remaining bytes weren't stomped on.
645    for (; i < kMaxMessageSize; i++) {
646      if (buffer[i] != 0)
647        return false;
648    }
649    return true;
650  }
651
652  const scoped_refptr<Dispatcher> read_dispatcher_;
653  size_t* const messages_read_;
654  size_t* const bytes_read_;
655
656  DISALLOW_COPY_AND_ASSIGN(ReaderThread);
657};
658
659TEST(MessagePipeDispatcherTest, Stress) {
660  static const size_t kNumWriters = 30;
661  static const size_t kNumReaders = kNumWriters;
662
663  scoped_refptr<MessagePipeDispatcher> d_write(
664      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
665  scoped_refptr<MessagePipeDispatcher> d_read(
666      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
667  {
668    scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
669    d_write->Init(mp, 0);
670    d_read->Init(mp, 1);
671  }
672
673  size_t messages_written[kNumWriters];
674  size_t bytes_written[kNumWriters];
675  size_t messages_read[kNumReaders];
676  size_t bytes_read[kNumReaders];
677  {
678    // Make writers.
679    ScopedVector<WriterThread> writers;
680    for (size_t i = 0; i < kNumWriters; i++) {
681      writers.push_back(
682          new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
683    }
684
685    // Make readers.
686    ScopedVector<ReaderThread> readers;
687    for (size_t i = 0; i < kNumReaders; i++) {
688      readers.push_back(
689          new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
690    }
691
692    // Start writers.
693    for (size_t i = 0; i < kNumWriters; i++)
694      writers[i]->Start();
695
696    // Start readers.
697    for (size_t i = 0; i < kNumReaders; i++)
698      readers[i]->Start();
699
700    // TODO(vtl): Maybe I should have an event that triggers all the threads to
701    // start doing stuff for real (so that the first ones created/started aren't
702    // advantaged).
703  }  // Joins all the threads.
704
705  size_t total_messages_written = 0;
706  size_t total_bytes_written = 0;
707  for (size_t i = 0; i < kNumWriters; i++) {
708    total_messages_written += messages_written[i];
709    total_bytes_written += bytes_written[i];
710  }
711  size_t total_messages_read = 0;
712  size_t total_bytes_read = 0;
713  for (size_t i = 0; i < kNumReaders; i++) {
714    total_messages_read += messages_read[i];
715    total_bytes_read += bytes_read[i];
716    // We'd have to be really unlucky to have read no messages on a thread.
717    EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
718    EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
719  }
720  EXPECT_EQ(total_messages_written, total_messages_read);
721  EXPECT_EQ(total_bytes_written, total_bytes_read);
722
723  EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
724  EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
725}
726
727}  // namespace
728}  // namespace system
729}  // namespace mojo
730