1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/local_data_pipe.h"
6
7#include <string.h>
8
9#include "base/macros.h"
10#include "base/memory/ref_counted.h"
11#include "mojo/system/data_pipe.h"
12#include "mojo/system/waiter.h"
13#include "testing/gtest/include/gtest/gtest.h"
14
15namespace mojo {
16namespace system {
17namespace {
18
19const uint32_t kSizeOfOptions =
20    static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
21
22// Validate options.
23TEST(LocalDataPipeTest, Creation) {
24  // Create using default options.
25  {
26    // Get default options.
27    MojoCreateDataPipeOptions default_options = {0};
28    EXPECT_EQ(
29        MOJO_RESULT_OK,
30        DataPipe::ValidateCreateOptions(NullUserPointer(), &default_options));
31    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(default_options));
32    dp->ProducerClose();
33    dp->ConsumerClose();
34  }
35
36  // Create using non-default options.
37  {
38    const MojoCreateDataPipeOptions options = {
39        kSizeOfOptions,                           // |struct_size|.
40        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
41        1,                                        // |element_num_bytes|.
42        1000                                      // |capacity_num_bytes|.
43    };
44    MojoCreateDataPipeOptions validated_options = {0};
45    EXPECT_EQ(MOJO_RESULT_OK,
46              DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
47                                              &validated_options));
48    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
49    dp->ProducerClose();
50    dp->ConsumerClose();
51  }
52  {
53    const MojoCreateDataPipeOptions options = {
54        kSizeOfOptions,                           // |struct_size|.
55        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
56        4,                                        // |element_num_bytes|.
57        4000                                      // |capacity_num_bytes|.
58    };
59    MojoCreateDataPipeOptions validated_options = {0};
60    EXPECT_EQ(MOJO_RESULT_OK,
61              DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
62                                              &validated_options));
63    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
64    dp->ProducerClose();
65    dp->ConsumerClose();
66  }
67  {
68    const MojoCreateDataPipeOptions options = {
69        kSizeOfOptions,                                  // |struct_size|.
70        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
71        7,                                               // |element_num_bytes|.
72        7000000  // |capacity_num_bytes|.
73    };
74    MojoCreateDataPipeOptions validated_options = {0};
75    EXPECT_EQ(MOJO_RESULT_OK,
76              DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
77                                              &validated_options));
78    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
79    dp->ProducerClose();
80    dp->ConsumerClose();
81  }
82  // Default capacity.
83  {
84    const MojoCreateDataPipeOptions options = {
85        kSizeOfOptions,                                  // |struct_size|.
86        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
87        100,                                             // |element_num_bytes|.
88        0  // |capacity_num_bytes|.
89    };
90    MojoCreateDataPipeOptions validated_options = {0};
91    EXPECT_EQ(MOJO_RESULT_OK,
92              DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
93                                              &validated_options));
94    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
95    dp->ProducerClose();
96    dp->ConsumerClose();
97  }
98}
99
100TEST(LocalDataPipeTest, SimpleReadWrite) {
101  const MojoCreateDataPipeOptions options = {
102      kSizeOfOptions,                           // |struct_size|.
103      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
104      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
105      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
106  };
107  MojoCreateDataPipeOptions validated_options = {0};
108  EXPECT_EQ(MOJO_RESULT_OK,
109            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
110                                            &validated_options));
111
112  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
113
114  int32_t elements[10] = {0};
115  uint32_t num_bytes = 0;
116
117  // Try reading; nothing there yet.
118  num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
119  EXPECT_EQ(
120      MOJO_RESULT_SHOULD_WAIT,
121      dp->ConsumerReadData(
122          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
123
124  // Query; nothing there yet.
125  num_bytes = 0;
126  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
127  EXPECT_EQ(0u, num_bytes);
128
129  // Discard; nothing there yet.
130  num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
131  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
132            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
133
134  // Read with invalid |num_bytes|.
135  num_bytes = sizeof(elements[0]) + 1;
136  EXPECT_EQ(
137      MOJO_RESULT_INVALID_ARGUMENT,
138      dp->ConsumerReadData(
139          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
140
141  // Write two elements.
142  elements[0] = 123;
143  elements[1] = 456;
144  num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
145  EXPECT_EQ(MOJO_RESULT_OK,
146            dp->ProducerWriteData(UserPointer<const void>(elements),
147                                  MakeUserPointer(&num_bytes),
148                                  false));
149  // It should have written everything (even without "all or none").
150  EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
151
152  // Query.
153  num_bytes = 0;
154  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
155  EXPECT_EQ(2 * sizeof(elements[0]), num_bytes);
156
157  // Read one element.
158  elements[0] = -1;
159  elements[1] = -1;
160  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
161  EXPECT_EQ(
162      MOJO_RESULT_OK,
163      dp->ConsumerReadData(
164          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
165  EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
166  EXPECT_EQ(123, elements[0]);
167  EXPECT_EQ(-1, elements[1]);
168
169  // Query.
170  num_bytes = 0;
171  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
172  EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
173
174  // Try to read two elements, with "all or none".
175  elements[0] = -1;
176  elements[1] = -1;
177  num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
178  EXPECT_EQ(
179      MOJO_RESULT_OUT_OF_RANGE,
180      dp->ConsumerReadData(
181          UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
182  EXPECT_EQ(-1, elements[0]);
183  EXPECT_EQ(-1, elements[1]);
184
185  // Try to read two elements, without "all or none".
186  elements[0] = -1;
187  elements[1] = -1;
188  num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
189  EXPECT_EQ(
190      MOJO_RESULT_OK,
191      dp->ConsumerReadData(
192          UserPointer<void>(elements), MakeUserPointer(&num_bytes), false));
193  EXPECT_EQ(456, elements[0]);
194  EXPECT_EQ(-1, elements[1]);
195
196  // Query.
197  num_bytes = 0;
198  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
199  EXPECT_EQ(0u, num_bytes);
200
201  dp->ProducerClose();
202  dp->ConsumerClose();
203}
204
205// Note: The "basic" waiting tests test that the "wait states" are correct in
206// various situations; they don't test that waiters are properly awoken on state
207// changes. (For that, we need to use multiple threads.)
208TEST(LocalDataPipeTest, BasicProducerWaiting) {
209  // Note: We take advantage of the fact that for |LocalDataPipe|, capacities
210  // are strict maximums. This is not guaranteed by the API.
211
212  const MojoCreateDataPipeOptions options = {
213      kSizeOfOptions,                           // |struct_size|.
214      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
215      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
216      2 * sizeof(int32_t)                       // |capacity_num_bytes|.
217  };
218  MojoCreateDataPipeOptions validated_options = {0};
219  EXPECT_EQ(MOJO_RESULT_OK,
220            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
221                                            &validated_options));
222
223  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
224  Waiter waiter;
225  uint32_t context = 0;
226  HandleSignalsState hss;
227
228  // Never readable.
229  waiter.Init();
230  hss = HandleSignalsState();
231  EXPECT_EQ(
232      MOJO_RESULT_FAILED_PRECONDITION,
233      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss));
234  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
235  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
236
237  // Already writable.
238  waiter.Init();
239  hss = HandleSignalsState();
240  EXPECT_EQ(
241      MOJO_RESULT_ALREADY_EXISTS,
242      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34, &hss));
243
244  // Write two elements.
245  int32_t elements[2] = {123, 456};
246  uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
247  EXPECT_EQ(MOJO_RESULT_OK,
248            dp->ProducerWriteData(UserPointer<const void>(elements),
249                                  MakeUserPointer(&num_bytes),
250                                  true));
251  EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
252
253  // Adding a waiter should now succeed.
254  waiter.Init();
255  ASSERT_EQ(
256      MOJO_RESULT_OK,
257      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
258  // And it shouldn't be writable yet.
259  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
260  hss = HandleSignalsState();
261  dp->ProducerRemoveWaiter(&waiter, &hss);
262  EXPECT_EQ(0u, hss.satisfied_signals);
263  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
264
265  // Do it again.
266  waiter.Init();
267  ASSERT_EQ(
268      MOJO_RESULT_OK,
269      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78, nullptr));
270
271  // Read one element.
272  elements[0] = -1;
273  elements[1] = -1;
274  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
275  EXPECT_EQ(
276      MOJO_RESULT_OK,
277      dp->ConsumerReadData(
278          UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
279  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
280  EXPECT_EQ(123, elements[0]);
281  EXPECT_EQ(-1, elements[1]);
282
283  // Waiting should now succeed.
284  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(1000, &context));
285  EXPECT_EQ(78u, context);
286  hss = HandleSignalsState();
287  dp->ProducerRemoveWaiter(&waiter, &hss);
288  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
289  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
290
291  // Try writing, using a two-phase write.
292  void* buffer = nullptr;
293  num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
294  EXPECT_EQ(MOJO_RESULT_OK,
295            dp->ProducerBeginWriteData(
296                MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), false));
297  EXPECT_TRUE(buffer);
298  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
299
300  static_cast<int32_t*>(buffer)[0] = 789;
301  EXPECT_EQ(MOJO_RESULT_OK,
302            dp->ProducerEndWriteData(
303                static_cast<uint32_t>(1u * sizeof(elements[0]))));
304
305  // Add a waiter.
306  waiter.Init();
307  ASSERT_EQ(
308      MOJO_RESULT_OK,
309      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90, nullptr));
310
311  // Read one element, using a two-phase read.
312  const void* read_buffer = nullptr;
313  num_bytes = 0u;
314  EXPECT_EQ(
315      MOJO_RESULT_OK,
316      dp->ConsumerBeginReadData(
317          MakeUserPointer(&read_buffer), MakeUserPointer(&num_bytes), false));
318  EXPECT_TRUE(read_buffer);
319  // Since we only read one element (after having written three in all), the
320  // two-phase read should only allow us to read one. This checks an
321  // implementation detail!
322  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
323  EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
324  EXPECT_EQ(
325      MOJO_RESULT_OK,
326      dp->ConsumerEndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
327
328  // Waiting should succeed.
329  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(1000, &context));
330  EXPECT_EQ(90u, context);
331  hss = HandleSignalsState();
332  dp->ProducerRemoveWaiter(&waiter, &hss);
333  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
334  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
335
336  // Write one element.
337  elements[0] = 123;
338  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
339  EXPECT_EQ(MOJO_RESULT_OK,
340            dp->ProducerWriteData(UserPointer<const void>(elements),
341                                  MakeUserPointer(&num_bytes),
342                                  false));
343  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
344
345  // Add a waiter.
346  waiter.Init();
347  ASSERT_EQ(
348      MOJO_RESULT_OK,
349      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, nullptr));
350
351  // Close the consumer.
352  dp->ConsumerClose();
353
354  // It should now be never-writable.
355  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, waiter.Wait(1000, &context));
356  EXPECT_EQ(12u, context);
357  hss = HandleSignalsState();
358  dp->ProducerRemoveWaiter(&waiter, &hss);
359  EXPECT_EQ(0u, hss.satisfied_signals);
360  EXPECT_EQ(0u, hss.satisfiable_signals);
361
362  dp->ProducerClose();
363}
364
365TEST(LocalDataPipeTest, BasicConsumerWaiting) {
366  const MojoCreateDataPipeOptions options = {
367      kSizeOfOptions,                           // |struct_size|.
368      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
369      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
370      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
371  };
372  MojoCreateDataPipeOptions validated_options = {0};
373  EXPECT_EQ(MOJO_RESULT_OK,
374            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
375                                            &validated_options));
376
377  {
378    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
379    Waiter waiter;
380    uint32_t context = 0;
381    HandleSignalsState hss;
382
383    // Never writable.
384    waiter.Init();
385    hss = HandleSignalsState();
386    EXPECT_EQ(
387        MOJO_RESULT_FAILED_PRECONDITION,
388        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, &hss));
389    EXPECT_EQ(0u, hss.satisfied_signals);
390    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
391
392    // Not yet readable.
393    waiter.Init();
394    ASSERT_EQ(MOJO_RESULT_OK,
395              dp->ConsumerAddWaiter(
396                  &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, nullptr));
397    EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
398    hss = HandleSignalsState();
399    dp->ConsumerRemoveWaiter(&waiter, &hss);
400    EXPECT_EQ(0u, hss.satisfied_signals);
401    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
402
403    // Write two elements.
404    int32_t elements[2] = {123, 456};
405    uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
406    EXPECT_EQ(MOJO_RESULT_OK,
407              dp->ProducerWriteData(UserPointer<const void>(elements),
408                                    MakeUserPointer(&num_bytes),
409                                    true));
410
411    // Should already be readable.
412    waiter.Init();
413    hss = HandleSignalsState();
414    EXPECT_EQ(
415        MOJO_RESULT_ALREADY_EXISTS,
416        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, &hss));
417    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
418    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
419
420    // Discard one element.
421    num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
422    EXPECT_EQ(MOJO_RESULT_OK,
423              dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
424    EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
425
426    // Should still be readable.
427    waiter.Init();
428    hss = HandleSignalsState();
429    EXPECT_EQ(
430        MOJO_RESULT_ALREADY_EXISTS,
431        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
432    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
433    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
434
435    // Read one element.
436    elements[0] = -1;
437    elements[1] = -1;
438    num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
439    EXPECT_EQ(
440        MOJO_RESULT_OK,
441        dp->ConsumerReadData(
442            UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
443    EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
444    EXPECT_EQ(456, elements[0]);
445    EXPECT_EQ(-1, elements[1]);
446
447    // Adding a waiter should now succeed.
448    waiter.Init();
449    ASSERT_EQ(MOJO_RESULT_OK,
450              dp->ConsumerAddWaiter(
451                  &waiter, MOJO_HANDLE_SIGNAL_READABLE, 90, nullptr));
452
453    // Write one element.
454    elements[0] = 789;
455    elements[1] = -1;
456    num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
457    EXPECT_EQ(MOJO_RESULT_OK,
458              dp->ProducerWriteData(UserPointer<const void>(elements),
459                                    MakeUserPointer(&num_bytes),
460                                    true));
461
462    // Waiting should now succeed.
463    EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(1000, &context));
464    EXPECT_EQ(90u, context);
465    hss = HandleSignalsState();
466    dp->ConsumerRemoveWaiter(&waiter, &hss);
467    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
468    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
469
470    // Close the producer.
471    dp->ProducerClose();
472
473    // Should still be readable.
474    waiter.Init();
475    hss = HandleSignalsState();
476    EXPECT_EQ(
477        MOJO_RESULT_ALREADY_EXISTS,
478        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss));
479    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
480    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
481
482    // Read one element.
483    elements[0] = -1;
484    elements[1] = -1;
485    num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
486    EXPECT_EQ(
487        MOJO_RESULT_OK,
488        dp->ConsumerReadData(
489            UserPointer<void>(elements), MakeUserPointer(&num_bytes), true));
490    EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
491    EXPECT_EQ(789, elements[0]);
492    EXPECT_EQ(-1, elements[1]);
493
494    // Should be never-readable.
495    waiter.Init();
496    hss = HandleSignalsState();
497    EXPECT_EQ(
498        MOJO_RESULT_FAILED_PRECONDITION,
499        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
500    EXPECT_EQ(0u, hss.satisfied_signals);
501    EXPECT_EQ(0u, hss.satisfiable_signals);
502
503    dp->ConsumerClose();
504  }
505
506  // Test with two-phase APIs and closing the producer with an active consumer
507  // waiter.
508  {
509    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
510    Waiter waiter;
511    uint32_t context = 0;
512    HandleSignalsState hss;
513
514    // Write two elements.
515    int32_t* elements = nullptr;
516    void* buffer = nullptr;
517    // Request room for three (but we'll only write two).
518    uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
519    EXPECT_EQ(MOJO_RESULT_OK,
520              dp->ProducerBeginWriteData(
521                  MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), true));
522    EXPECT_TRUE(buffer);
523    EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
524    elements = static_cast<int32_t*>(buffer);
525    elements[0] = 123;
526    elements[1] = 456;
527    EXPECT_EQ(MOJO_RESULT_OK,
528              dp->ProducerEndWriteData(
529                  static_cast<uint32_t>(2u * sizeof(elements[0]))));
530
531    // Should already be readable.
532    waiter.Init();
533    hss = HandleSignalsState();
534    EXPECT_EQ(
535        MOJO_RESULT_ALREADY_EXISTS,
536        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss));
537    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
538    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
539
540    // Read one element.
541    // Request two in all-or-none mode, but only read one.
542    const void* read_buffer = nullptr;
543    num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
544    EXPECT_EQ(
545        MOJO_RESULT_OK,
546        dp->ConsumerBeginReadData(
547            MakeUserPointer(&read_buffer), MakeUserPointer(&num_bytes), true));
548    EXPECT_TRUE(read_buffer);
549    EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
550    const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
551    EXPECT_EQ(123, read_elements[0]);
552    EXPECT_EQ(MOJO_RESULT_OK,
553              dp->ConsumerEndReadData(
554                  static_cast<uint32_t>(1u * sizeof(elements[0]))));
555
556    // Should still be readable.
557    waiter.Init();
558    hss = HandleSignalsState();
559    EXPECT_EQ(
560        MOJO_RESULT_ALREADY_EXISTS,
561        dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
562    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
563    EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
564
565    // Read one element.
566    // Request three, but not in all-or-none mode.
567    read_buffer = nullptr;
568    num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
569    EXPECT_EQ(
570        MOJO_RESULT_OK,
571        dp->ConsumerBeginReadData(
572            MakeUserPointer(&read_buffer), MakeUserPointer(&num_bytes), false));
573    EXPECT_TRUE(read_buffer);
574    EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
575    read_elements = static_cast<const int32_t*>(read_buffer);
576    EXPECT_EQ(456, read_elements[0]);
577    EXPECT_EQ(MOJO_RESULT_OK,
578              dp->ConsumerEndReadData(
579                  static_cast<uint32_t>(1u * sizeof(elements[0]))));
580
581    // Adding a waiter should now succeed.
582    waiter.Init();
583    ASSERT_EQ(MOJO_RESULT_OK,
584              dp->ConsumerAddWaiter(
585                  &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, nullptr));
586
587    // Close the producer.
588    dp->ProducerClose();
589
590    // Should be never-readable.
591    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, waiter.Wait(1000, &context));
592    EXPECT_EQ(56u, context);
593    hss = HandleSignalsState();
594    dp->ConsumerRemoveWaiter(&waiter, &hss);
595    EXPECT_EQ(0u, hss.satisfied_signals);
596    EXPECT_EQ(0u, hss.satisfiable_signals);
597
598    dp->ConsumerClose();
599  }
600}
601
602// Tests that data pipes aren't writable/readable during two-phase writes/reads.
603TEST(LocalDataPipeTest, BasicTwoPhaseWaiting) {
604  const MojoCreateDataPipeOptions options = {
605      kSizeOfOptions,                           // |struct_size|.
606      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
607      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
608      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
609  };
610  MojoCreateDataPipeOptions validated_options = {0};
611  EXPECT_EQ(MOJO_RESULT_OK,
612            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
613                                            &validated_options));
614
615  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
616  Waiter waiter;
617  HandleSignalsState hss;
618
619  // It should be writable.
620  waiter.Init();
621  hss = HandleSignalsState();
622  EXPECT_EQ(
623      MOJO_RESULT_ALREADY_EXISTS,
624      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
625  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
626  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
627
628  uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
629  void* write_ptr = nullptr;
630  EXPECT_EQ(
631      MOJO_RESULT_OK,
632      dp->ProducerBeginWriteData(
633          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
634  EXPECT_TRUE(write_ptr);
635  EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
636
637  // At this point, it shouldn't be writable.
638  waiter.Init();
639  ASSERT_EQ(
640      MOJO_RESULT_OK,
641      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1, nullptr));
642  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
643  hss = HandleSignalsState();
644  dp->ProducerRemoveWaiter(&waiter, &hss);
645  EXPECT_EQ(0u, hss.satisfied_signals);
646  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
647
648  // It shouldn't be readable yet either.
649  waiter.Init();
650  ASSERT_EQ(
651      MOJO_RESULT_OK,
652      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 2, nullptr));
653  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
654  hss = HandleSignalsState();
655  dp->ConsumerRemoveWaiter(&waiter, &hss);
656  EXPECT_EQ(0u, hss.satisfied_signals);
657  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
658
659  static_cast<int32_t*>(write_ptr)[0] = 123;
660  EXPECT_EQ(
661      MOJO_RESULT_OK,
662      dp->ProducerEndWriteData(static_cast<uint32_t>(1u * sizeof(int32_t))));
663
664  // It should be writable again.
665  waiter.Init();
666  hss = HandleSignalsState();
667  EXPECT_EQ(
668      MOJO_RESULT_ALREADY_EXISTS,
669      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, &hss));
670  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
671  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
672
673  // And readable.
674  waiter.Init();
675  hss = HandleSignalsState();
676  EXPECT_EQ(
677      MOJO_RESULT_ALREADY_EXISTS,
678      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 4, &hss));
679  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
680  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
681
682  // Start another two-phase write and check that it's readable even in the
683  // middle of it.
684  num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
685  write_ptr = nullptr;
686  EXPECT_EQ(
687      MOJO_RESULT_OK,
688      dp->ProducerBeginWriteData(
689          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
690  EXPECT_TRUE(write_ptr);
691  EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
692
693  // It should be readable.
694  waiter.Init();
695  hss = HandleSignalsState();
696  EXPECT_EQ(
697      MOJO_RESULT_ALREADY_EXISTS,
698      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 5, &hss));
699  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
700  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
701
702  // End the two-phase write without writing anything.
703  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(0u));
704
705  // Start a two-phase read.
706  num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
707  const void* read_ptr = nullptr;
708  EXPECT_EQ(
709      MOJO_RESULT_OK,
710      dp->ConsumerBeginReadData(
711          MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), false));
712  EXPECT_TRUE(read_ptr);
713  EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
714
715  // At this point, it should still be writable.
716  waiter.Init();
717  hss = HandleSignalsState();
718  EXPECT_EQ(
719      MOJO_RESULT_ALREADY_EXISTS,
720      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, &hss));
721  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
722  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
723
724  // But not readable.
725  waiter.Init();
726  ASSERT_EQ(
727      MOJO_RESULT_OK,
728      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 7, nullptr));
729  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
730  hss = HandleSignalsState();
731  dp->ConsumerRemoveWaiter(&waiter, &hss);
732  EXPECT_EQ(0u, hss.satisfied_signals);
733  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
734
735  // End the two-phase read without reading anything.
736  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(0u));
737
738  // It should be readable again.
739  waiter.Init();
740  hss = HandleSignalsState();
741  EXPECT_EQ(
742      MOJO_RESULT_ALREADY_EXISTS,
743      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 8, &hss));
744  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
745  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
746
747  dp->ProducerClose();
748  dp->ConsumerClose();
749}
750
751// Test that a "may discard" data pipe is writable even when it's full.
752TEST(LocalDataPipeTest, BasicMayDiscardWaiting) {
753  const MojoCreateDataPipeOptions options = {
754      kSizeOfOptions,                                  // |struct_size|.
755      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
756      static_cast<uint32_t>(sizeof(int32_t)),          // |element_num_bytes|.
757      1 * sizeof(int32_t)                              // |capacity_num_bytes|.
758  };
759  MojoCreateDataPipeOptions validated_options = {0};
760  EXPECT_EQ(MOJO_RESULT_OK,
761            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
762                                            &validated_options));
763
764  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
765  Waiter waiter;
766  HandleSignalsState hss;
767
768  // Writable.
769  waiter.Init();
770  hss = HandleSignalsState();
771  EXPECT_EQ(
772      MOJO_RESULT_ALREADY_EXISTS,
773      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
774  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
775  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
776
777  // Not readable.
778  waiter.Init();
779  ASSERT_EQ(
780      MOJO_RESULT_OK,
781      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
782  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
783  hss = HandleSignalsState();
784  dp->ConsumerRemoveWaiter(&waiter, &hss);
785  EXPECT_EQ(0u, hss.satisfied_signals);
786  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
787
788  uint32_t num_bytes = static_cast<uint32_t>(sizeof(int32_t));
789  int32_t element = 123;
790  EXPECT_EQ(MOJO_RESULT_OK,
791            dp->ProducerWriteData(UserPointer<const void>(&element),
792                                  MakeUserPointer(&num_bytes),
793                                  false));
794  EXPECT_EQ(static_cast<uint32_t>(sizeof(int32_t)), num_bytes);
795
796  // Still writable (even though it's full).
797  waiter.Init();
798  hss = HandleSignalsState();
799  EXPECT_EQ(
800      MOJO_RESULT_ALREADY_EXISTS,
801      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 2, &hss));
802  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
803  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
804
805  // Now readable.
806  waiter.Init();
807  hss = HandleSignalsState();
808  EXPECT_EQ(
809      MOJO_RESULT_ALREADY_EXISTS,
810      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
811  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
812  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
813
814  // Overwrite that element.
815  num_bytes = static_cast<uint32_t>(sizeof(int32_t));
816  element = 456;
817  EXPECT_EQ(MOJO_RESULT_OK,
818            dp->ProducerWriteData(UserPointer<const void>(&element),
819                                  MakeUserPointer(&num_bytes),
820                                  false));
821  EXPECT_EQ(static_cast<uint32_t>(sizeof(int32_t)), num_bytes);
822
823  // Still writable.
824  waiter.Init();
825  hss = HandleSignalsState();
826  EXPECT_EQ(
827      MOJO_RESULT_ALREADY_EXISTS,
828      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
829  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
830  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
831
832  // And still readable.
833  waiter.Init();
834  hss = HandleSignalsState();
835  EXPECT_EQ(
836      MOJO_RESULT_ALREADY_EXISTS,
837      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 5, &hss));
838  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
839  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
840
841  // Read that element.
842  num_bytes = static_cast<uint32_t>(sizeof(int32_t));
843  element = 0;
844  EXPECT_EQ(
845      MOJO_RESULT_OK,
846      dp->ConsumerReadData(
847          UserPointer<void>(&element), MakeUserPointer(&num_bytes), false));
848  EXPECT_EQ(static_cast<uint32_t>(sizeof(int32_t)), num_bytes);
849  EXPECT_EQ(456, element);
850
851  // Still writable.
852  waiter.Init();
853  hss = HandleSignalsState();
854  EXPECT_EQ(
855      MOJO_RESULT_ALREADY_EXISTS,
856      dp->ProducerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, &hss));
857  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
858  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfiable_signals);
859
860  // No longer readable.
861  waiter.Init();
862  ASSERT_EQ(
863      MOJO_RESULT_OK,
864      dp->ConsumerAddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 7, nullptr));
865  EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
866  hss = HandleSignalsState();
867  dp->ConsumerRemoveWaiter(&waiter, &hss);
868  EXPECT_EQ(0u, hss.satisfied_signals);
869  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
870
871  dp->ProducerClose();
872  dp->ConsumerClose();
873}
874
875void Seq(int32_t start, size_t count, int32_t* out) {
876  for (size_t i = 0; i < count; i++)
877    out[i] = start + static_cast<int32_t>(i);
878}
879
880TEST(LocalDataPipeTest, MayDiscard) {
881  const MojoCreateDataPipeOptions options = {
882      kSizeOfOptions,                                  // |struct_size|.
883      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
884      static_cast<uint32_t>(sizeof(int32_t)),          // |element_num_bytes|.
885      10 * sizeof(int32_t)                             // |capacity_num_bytes|.
886  };
887  MojoCreateDataPipeOptions validated_options = {0};
888  EXPECT_EQ(MOJO_RESULT_OK,
889            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
890                                            &validated_options));
891
892  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
893
894  int32_t buffer[100] = {0};
895  uint32_t num_bytes = 0;
896
897  num_bytes = 20u * sizeof(int32_t);
898  Seq(0, arraysize(buffer), buffer);
899  // Try writing more than capacity. (This test relies on the implementation
900  // enforcing the capacity strictly.)
901  EXPECT_EQ(
902      MOJO_RESULT_OK,
903      dp->ProducerWriteData(
904          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), false));
905  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
906
907  // Read half of what we wrote.
908  num_bytes = 5u * sizeof(int32_t);
909  memset(buffer, 0xab, sizeof(buffer));
910  EXPECT_EQ(MOJO_RESULT_OK,
911            dp->ConsumerReadData(
912                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
913  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
914  int32_t expected_buffer[100];
915  memset(expected_buffer, 0xab, sizeof(expected_buffer));
916  Seq(0, 5u, expected_buffer);
917  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
918  // Internally, a circular buffer would now look like:
919  //   -, -, -, -, -, 5, 6, 7, 8, 9
920
921  // Write a bit more than the space that's available.
922  num_bytes = 8u * sizeof(int32_t);
923  Seq(100, arraysize(buffer), buffer);
924  EXPECT_EQ(
925      MOJO_RESULT_OK,
926      dp->ProducerWriteData(
927          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), false));
928  EXPECT_EQ(8u * sizeof(int32_t), num_bytes);
929  // Internally, a circular buffer would now look like:
930  //   100, 101, 102, 103, 104, 105, 106, 107, 8, 9
931
932  // Read half of what's available.
933  num_bytes = 5u * sizeof(int32_t);
934  memset(buffer, 0xab, sizeof(buffer));
935  EXPECT_EQ(MOJO_RESULT_OK,
936            dp->ConsumerReadData(
937                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
938  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
939  memset(expected_buffer, 0xab, sizeof(expected_buffer));
940  expected_buffer[0] = 8;
941  expected_buffer[1] = 9;
942  expected_buffer[2] = 100;
943  expected_buffer[3] = 101;
944  expected_buffer[4] = 102;
945  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
946  // Internally, a circular buffer would now look like:
947  //   -, -, -, 103, 104, 105, 106, 107, -, -
948
949  // Write one integer.
950  num_bytes = 1u * sizeof(int32_t);
951  Seq(200, arraysize(buffer), buffer);
952  EXPECT_EQ(
953      MOJO_RESULT_OK,
954      dp->ProducerWriteData(
955          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), false));
956  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
957  // Internally, a circular buffer would now look like:
958  //   -, -, -, 103, 104, 105, 106, 107, 200, -
959
960  // Write five more.
961  num_bytes = 5u * sizeof(int32_t);
962  Seq(300, arraysize(buffer), buffer);
963  EXPECT_EQ(
964      MOJO_RESULT_OK,
965      dp->ProducerWriteData(
966          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), false));
967  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
968  // Internally, a circular buffer would now look like:
969  //   301, 302, 303, 304, 104, 105, 106, 107, 200, 300
970
971  // Read it all.
972  num_bytes = sizeof(buffer);
973  memset(buffer, 0xab, sizeof(buffer));
974  EXPECT_EQ(MOJO_RESULT_OK,
975            dp->ConsumerReadData(
976                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
977  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
978  memset(expected_buffer, 0xab, sizeof(expected_buffer));
979  expected_buffer[0] = 104;
980  expected_buffer[1] = 105;
981  expected_buffer[2] = 106;
982  expected_buffer[3] = 107;
983  expected_buffer[4] = 200;
984  expected_buffer[5] = 300;
985  expected_buffer[6] = 301;
986  expected_buffer[7] = 302;
987  expected_buffer[8] = 303;
988  expected_buffer[9] = 304;
989  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
990
991  // Test two-phase writes, including in all-or-none mode.
992  // Note: Again, the following depends on an implementation detail -- namely
993  // that the write pointer will point at the 5th element of the buffer (and the
994  // buffer has exactly the capacity requested).
995
996  num_bytes = 0u;
997  void* write_ptr = nullptr;
998  EXPECT_EQ(
999      MOJO_RESULT_OK,
1000      dp->ProducerBeginWriteData(
1001          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
1002  EXPECT_TRUE(write_ptr);
1003  EXPECT_EQ(6u * sizeof(int32_t), num_bytes);
1004  Seq(400, 6, static_cast<int32_t*>(write_ptr));
1005  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(6u * sizeof(int32_t)));
1006  // Internally, a circular buffer would now look like:
1007  //   -, -, -, -, 400, 401, 402, 403, 404, 405
1008
1009  // |ProducerBeginWriteData()| ignores |*num_bytes| except in "all-or-none"
1010  // mode.
1011  num_bytes = 6u * sizeof(int32_t);
1012  write_ptr = nullptr;
1013  EXPECT_EQ(
1014      MOJO_RESULT_OK,
1015      dp->ProducerBeginWriteData(
1016          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
1017  EXPECT_EQ(4u * sizeof(int32_t), num_bytes);
1018  static_cast<int32_t*>(write_ptr)[0] = 500;
1019  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(1u * sizeof(int32_t)));
1020  // Internally, a circular buffer would now look like:
1021  //   500, -, -, -, 400, 401, 402, 403, 404, 405
1022
1023  // Requesting a 10-element buffer in all-or-none mode fails at this point.
1024  num_bytes = 10u * sizeof(int32_t);
1025  write_ptr = nullptr;
1026  EXPECT_EQ(
1027      MOJO_RESULT_OUT_OF_RANGE,
1028      dp->ProducerBeginWriteData(
1029          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1030
1031  // But requesting, say, a 5-element (up to 9, really) buffer should be okay.
1032  // It will discard two elements.
1033  num_bytes = 5u * sizeof(int32_t);
1034  write_ptr = nullptr;
1035  EXPECT_EQ(
1036      MOJO_RESULT_OK,
1037      dp->ProducerBeginWriteData(
1038          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1039  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1040  // Only write 4 elements though.
1041  Seq(600, 4, static_cast<int32_t*>(write_ptr));
1042  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(4u * sizeof(int32_t)));
1043  // Internally, a circular buffer would now look like:
1044  //   500, 600, 601, 602, 603, -, 402, 403, 404, 405
1045
1046  // Do this again. Make sure we can get a buffer all the way out to the end of
1047  // the internal buffer.
1048  num_bytes = 5u * sizeof(int32_t);
1049  write_ptr = nullptr;
1050  EXPECT_EQ(
1051      MOJO_RESULT_OK,
1052      dp->ProducerBeginWriteData(
1053          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1054  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1055  // Only write 3 elements though.
1056  Seq(700, 3, static_cast<int32_t*>(write_ptr));
1057  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(3u * sizeof(int32_t)));
1058  // Internally, a circular buffer would now look like:
1059  //   500, 600, 601, 602, 603, 700, 701, 702, -, -
1060
1061  // Read everything.
1062  num_bytes = sizeof(buffer);
1063  memset(buffer, 0xab, sizeof(buffer));
1064  EXPECT_EQ(MOJO_RESULT_OK,
1065            dp->ConsumerReadData(
1066                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
1067  EXPECT_EQ(8u * sizeof(int32_t), num_bytes);
1068  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1069  expected_buffer[0] = 500;
1070  expected_buffer[1] = 600;
1071  expected_buffer[2] = 601;
1072  expected_buffer[3] = 602;
1073  expected_buffer[4] = 603;
1074  expected_buffer[5] = 700;
1075  expected_buffer[6] = 701;
1076  expected_buffer[7] = 702;
1077  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1078
1079  dp->ProducerClose();
1080  dp->ConsumerClose();
1081}
1082
1083TEST(LocalDataPipeTest, AllOrNone) {
1084  const MojoCreateDataPipeOptions options = {
1085      kSizeOfOptions,                           // |struct_size|.
1086      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1087      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
1088      10 * sizeof(int32_t)                      // |capacity_num_bytes|.
1089  };
1090  MojoCreateDataPipeOptions validated_options = {0};
1091  EXPECT_EQ(MOJO_RESULT_OK,
1092            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1093                                            &validated_options));
1094
1095  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1096
1097  // Try writing way too much.
1098  uint32_t num_bytes = 20u * sizeof(int32_t);
1099  int32_t buffer[100];
1100  Seq(0, arraysize(buffer), buffer);
1101  EXPECT_EQ(
1102      MOJO_RESULT_OUT_OF_RANGE,
1103      dp->ProducerWriteData(
1104          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1105
1106  // Should still be empty.
1107  num_bytes = ~0u;
1108  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1109  EXPECT_EQ(0u, num_bytes);
1110
1111  // Write some data.
1112  num_bytes = 5u * sizeof(int32_t);
1113  Seq(100, arraysize(buffer), buffer);
1114  EXPECT_EQ(
1115      MOJO_RESULT_OK,
1116      dp->ProducerWriteData(
1117          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1118  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1119
1120  // Half full.
1121  num_bytes = 0u;
1122  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1123  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1124
1125  // Too much.
1126  num_bytes = 6u * sizeof(int32_t);
1127  Seq(200, arraysize(buffer), buffer);
1128  EXPECT_EQ(
1129      MOJO_RESULT_OUT_OF_RANGE,
1130      dp->ProducerWriteData(
1131          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1132
1133  // Try reading too much.
1134  num_bytes = 11u * sizeof(int32_t);
1135  memset(buffer, 0xab, sizeof(buffer));
1136  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1137            dp->ConsumerReadData(
1138                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1139  int32_t expected_buffer[100];
1140  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1141  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1142
1143  // Try discarding too much.
1144  num_bytes = 11u * sizeof(int32_t);
1145  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1146            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1147
1148  // Just a little.
1149  num_bytes = 2u * sizeof(int32_t);
1150  Seq(300, arraysize(buffer), buffer);
1151  EXPECT_EQ(
1152      MOJO_RESULT_OK,
1153      dp->ProducerWriteData(
1154          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1155  EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
1156
1157  // Just right.
1158  num_bytes = 3u * sizeof(int32_t);
1159  Seq(400, arraysize(buffer), buffer);
1160  EXPECT_EQ(
1161      MOJO_RESULT_OK,
1162      dp->ProducerWriteData(
1163          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1164  EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
1165
1166  // Exactly full.
1167  num_bytes = 0u;
1168  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1169  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
1170
1171  // Read half.
1172  num_bytes = 5u * sizeof(int32_t);
1173  memset(buffer, 0xab, sizeof(buffer));
1174  EXPECT_EQ(MOJO_RESULT_OK,
1175            dp->ConsumerReadData(
1176                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1177  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1178  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1179  Seq(100, 5, expected_buffer);
1180  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1181
1182  // Try reading too much again.
1183  num_bytes = 6u * sizeof(int32_t);
1184  memset(buffer, 0xab, sizeof(buffer));
1185  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1186            dp->ConsumerReadData(
1187                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1188  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1189  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1190
1191  // Try discarding too much again.
1192  num_bytes = 6u * sizeof(int32_t);
1193  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1194            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1195
1196  // Discard a little.
1197  num_bytes = 2u * sizeof(int32_t);
1198  EXPECT_EQ(MOJO_RESULT_OK,
1199            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1200  EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
1201
1202  // Three left.
1203  num_bytes = 0u;
1204  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1205  EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
1206
1207  // Close the producer, then test producer-closed cases.
1208  dp->ProducerClose();
1209
1210  // Try reading too much; "failed precondition" since the producer is closed.
1211  num_bytes = 4u * sizeof(int32_t);
1212  memset(buffer, 0xab, sizeof(buffer));
1213  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1214            dp->ConsumerReadData(
1215                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1216  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1217  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1218
1219  // Try discarding too much; "failed precondition" again.
1220  num_bytes = 4u * sizeof(int32_t);
1221  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1222            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1223
1224  // Read a little.
1225  num_bytes = 2u * sizeof(int32_t);
1226  memset(buffer, 0xab, sizeof(buffer));
1227  EXPECT_EQ(MOJO_RESULT_OK,
1228            dp->ConsumerReadData(
1229                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1230  EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
1231  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1232  Seq(400, 2, expected_buffer);
1233  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1234
1235  // Discard the remaining element.
1236  num_bytes = 1u * sizeof(int32_t);
1237  EXPECT_EQ(MOJO_RESULT_OK,
1238            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1239  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1240
1241  // Empty again.
1242  num_bytes = ~0u;
1243  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1244  EXPECT_EQ(0u, num_bytes);
1245
1246  dp->ConsumerClose();
1247}
1248
1249TEST(LocalDataPipeTest, AllOrNoneMayDiscard) {
1250  const MojoCreateDataPipeOptions options = {
1251      kSizeOfOptions,                                  // |struct_size|.
1252      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
1253      static_cast<uint32_t>(sizeof(int32_t)),          // |element_num_bytes|.
1254      10 * sizeof(int32_t)                             // |capacity_num_bytes|.
1255  };
1256  MojoCreateDataPipeOptions validated_options = {0};
1257  EXPECT_EQ(MOJO_RESULT_OK,
1258            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1259                                            &validated_options));
1260
1261  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1262
1263  // Try writing way too much.
1264  uint32_t num_bytes = 20u * sizeof(int32_t);
1265  int32_t buffer[100];
1266  Seq(0, arraysize(buffer), buffer);
1267  EXPECT_EQ(
1268      MOJO_RESULT_OUT_OF_RANGE,
1269      dp->ProducerWriteData(
1270          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1271
1272  // Write some stuff.
1273  num_bytes = 5u * sizeof(int32_t);
1274  Seq(100, arraysize(buffer), buffer);
1275  EXPECT_EQ(
1276      MOJO_RESULT_OK,
1277      dp->ProducerWriteData(
1278          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1279  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1280
1281  // Write lots of stuff (discarding all but "104").
1282  num_bytes = 9u * sizeof(int32_t);
1283  Seq(200, arraysize(buffer), buffer);
1284  EXPECT_EQ(
1285      MOJO_RESULT_OK,
1286      dp->ProducerWriteData(
1287          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1288  EXPECT_EQ(9u * sizeof(int32_t), num_bytes);
1289
1290  // Read one.
1291  num_bytes = 1u * sizeof(int32_t);
1292  memset(buffer, 0xab, sizeof(buffer));
1293  EXPECT_EQ(MOJO_RESULT_OK,
1294            dp->ConsumerReadData(
1295                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1296  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1297  int32_t expected_buffer[100];
1298  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1299  expected_buffer[0] = 104;
1300  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1301
1302  // Try reading too many.
1303  num_bytes = 10u * sizeof(int32_t);
1304  memset(buffer, 0xab, sizeof(buffer));
1305  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1306            dp->ConsumerReadData(
1307                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1308  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1309  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1310
1311  // Try discarding too many.
1312  num_bytes = 10u * sizeof(int32_t);
1313  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1314            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1315
1316  // Discard a bunch.
1317  num_bytes = 4u * sizeof(int32_t);
1318  EXPECT_EQ(MOJO_RESULT_OK,
1319            dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
1320
1321  // Half full.
1322  num_bytes = 0u;
1323  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1324  EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
1325
1326  // Write as much as possible.
1327  num_bytes = 10u * sizeof(int32_t);
1328  Seq(300, arraysize(buffer), buffer);
1329  EXPECT_EQ(
1330      MOJO_RESULT_OK,
1331      dp->ProducerWriteData(
1332          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1333  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
1334
1335  // Read everything.
1336  num_bytes = 10u * sizeof(int32_t);
1337  memset(buffer, 0xab, sizeof(buffer));
1338  EXPECT_EQ(MOJO_RESULT_OK,
1339            dp->ConsumerReadData(
1340                UserPointer<void>(buffer), MakeUserPointer(&num_bytes), true));
1341  memset(expected_buffer, 0xab, sizeof(expected_buffer));
1342  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
1343  Seq(300, 10, expected_buffer);
1344  EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
1345
1346  // Note: All-or-none two-phase writes on a "may discard" data pipe are tested
1347  // in LocalDataPipeTest.MayDiscard.
1348
1349  dp->ProducerClose();
1350  dp->ConsumerClose();
1351}
1352
1353TEST(LocalDataPipeTest, TwoPhaseAllOrNone) {
1354  const MojoCreateDataPipeOptions options = {
1355      kSizeOfOptions,                           // |struct_size|.
1356      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1357      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
1358      10 * sizeof(int32_t)                      // |capacity_num_bytes|.
1359  };
1360  MojoCreateDataPipeOptions validated_options = {0};
1361  EXPECT_EQ(MOJO_RESULT_OK,
1362            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1363                                            &validated_options));
1364
1365  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1366
1367  // Try writing way too much (two-phase).
1368  uint32_t num_bytes = 20u * sizeof(int32_t);
1369  void* write_ptr = nullptr;
1370  EXPECT_EQ(
1371      MOJO_RESULT_OUT_OF_RANGE,
1372      dp->ProducerBeginWriteData(
1373          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1374
1375  // Try writing an amount which isn't a multiple of the element size
1376  // (two-phase).
1377  static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
1378  num_bytes = 1u;
1379  write_ptr = nullptr;
1380  EXPECT_EQ(
1381      MOJO_RESULT_INVALID_ARGUMENT,
1382      dp->ProducerBeginWriteData(
1383          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1384
1385  // Try reading way too much (two-phase).
1386  num_bytes = 20u * sizeof(int32_t);
1387  const void* read_ptr = nullptr;
1388  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1389            dp->ConsumerBeginReadData(
1390                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1391
1392  // Write half (two-phase).
1393  num_bytes = 5u * sizeof(int32_t);
1394  write_ptr = nullptr;
1395  EXPECT_EQ(
1396      MOJO_RESULT_OK,
1397      dp->ProducerBeginWriteData(
1398          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1399  // May provide more space than requested.
1400  EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
1401  EXPECT_TRUE(write_ptr);
1402  Seq(0, 5, static_cast<int32_t*>(write_ptr));
1403  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(5u * sizeof(int32_t)));
1404
1405  // Try reading an amount which isn't a multiple of the element size
1406  // (two-phase).
1407  num_bytes = 1u;
1408  read_ptr = nullptr;
1409  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1410            dp->ConsumerBeginReadData(
1411                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1412
1413  // Read one (two-phase).
1414  num_bytes = 1u * sizeof(int32_t);
1415  read_ptr = nullptr;
1416  EXPECT_EQ(MOJO_RESULT_OK,
1417            dp->ConsumerBeginReadData(
1418                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1419  EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
1420  EXPECT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
1421  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(1u * sizeof(int32_t)));
1422
1423  // We should have four left, leaving room for six.
1424  num_bytes = 0u;
1425  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1426  EXPECT_EQ(4u * sizeof(int32_t), num_bytes);
1427
1428  // Assuming a tight circular buffer of the specified capacity, we can't do a
1429  // two-phase write of six now.
1430  num_bytes = 6u * sizeof(int32_t);
1431  write_ptr = nullptr;
1432  EXPECT_EQ(
1433      MOJO_RESULT_OUT_OF_RANGE,
1434      dp->ProducerBeginWriteData(
1435          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), true));
1436
1437  // Write six elements (simple), filling the buffer.
1438  num_bytes = 6u * sizeof(int32_t);
1439  int32_t buffer[100];
1440  Seq(100, 6, buffer);
1441  EXPECT_EQ(
1442      MOJO_RESULT_OK,
1443      dp->ProducerWriteData(
1444          UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true));
1445  EXPECT_EQ(6u * sizeof(int32_t), num_bytes);
1446
1447  // We have ten.
1448  num_bytes = 0u;
1449  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1450  EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
1451
1452  // But a two-phase read of ten should fail.
1453  num_bytes = 10u * sizeof(int32_t);
1454  read_ptr = nullptr;
1455  EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
1456            dp->ConsumerBeginReadData(
1457                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1458
1459  // Close the producer.
1460  dp->ProducerClose();
1461
1462  // A two-phase read of nine should work.
1463  num_bytes = 9u * sizeof(int32_t);
1464  read_ptr = nullptr;
1465  EXPECT_EQ(MOJO_RESULT_OK,
1466            dp->ConsumerBeginReadData(
1467                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1468  EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
1469  EXPECT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
1470  EXPECT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
1471  EXPECT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]);
1472  EXPECT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]);
1473  EXPECT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]);
1474  EXPECT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]);
1475  EXPECT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
1476  EXPECT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
1477  EXPECT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
1478  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(9u * sizeof(int32_t)));
1479
1480  // A two-phase read of two should fail, with "failed precondition".
1481  num_bytes = 2u * sizeof(int32_t);
1482  read_ptr = nullptr;
1483  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1484            dp->ConsumerBeginReadData(
1485                MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
1486
1487  dp->ConsumerClose();
1488}
1489
1490// Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
1491// respectively, as much as possible, even if it has to "wrap around" the
1492// internal circular buffer. (Note that the two-phase write and read do not do
1493// this.)
1494TEST(LocalDataPipeTest, WrapAround) {
1495  unsigned char test_data[1000];
1496  for (size_t i = 0; i < arraysize(test_data); i++)
1497    test_data[i] = static_cast<unsigned char>(i);
1498
1499  const MojoCreateDataPipeOptions options = {
1500      kSizeOfOptions,                           // |struct_size|.
1501      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1502      1u,                                       // |element_num_bytes|.
1503      100u                                      // |capacity_num_bytes|.
1504  };
1505  MojoCreateDataPipeOptions validated_options = {0};
1506  EXPECT_EQ(MOJO_RESULT_OK,
1507            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1508                                            &validated_options));
1509  // This test won't be valid if |ValidateCreateOptions()| decides to give the
1510  // pipe more space.
1511  ASSERT_EQ(100u, validated_options.capacity_num_bytes);
1512
1513  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1514
1515  // Write 20 bytes.
1516  uint32_t num_bytes = 20u;
1517  EXPECT_EQ(MOJO_RESULT_OK,
1518            dp->ProducerWriteData(UserPointer<const void>(&test_data[0]),
1519                                  MakeUserPointer(&num_bytes),
1520                                  false));
1521  EXPECT_EQ(20u, num_bytes);
1522
1523  // Read 10 bytes.
1524  unsigned char read_buffer[1000] = {0};
1525  num_bytes = 10u;
1526  EXPECT_EQ(
1527      MOJO_RESULT_OK,
1528      dp->ConsumerReadData(
1529          UserPointer<void>(read_buffer), MakeUserPointer(&num_bytes), false));
1530  EXPECT_EQ(10u, num_bytes);
1531  EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
1532
1533  // Check that a two-phase write can now only write (at most) 80 bytes. (This
1534  // checks an implementation detail; this behavior is not guaranteed, but we
1535  // need it for this test.)
1536  void* write_buffer_ptr = nullptr;
1537  num_bytes = 0u;
1538  EXPECT_EQ(MOJO_RESULT_OK,
1539            dp->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1540                                       MakeUserPointer(&num_bytes),
1541                                       false));
1542  EXPECT_TRUE(write_buffer_ptr);
1543  EXPECT_EQ(80u, num_bytes);
1544  EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(0u));
1545
1546  // Write as much data as we can (using |ProducerWriteData()|). We should write
1547  // 90 bytes.
1548  num_bytes = 200u;
1549  EXPECT_EQ(MOJO_RESULT_OK,
1550            dp->ProducerWriteData(UserPointer<const void>(&test_data[20]),
1551                                  MakeUserPointer(&num_bytes),
1552                                  false));
1553  EXPECT_EQ(90u, num_bytes);
1554
1555  // Check that a two-phase read can now only read (at most) 90 bytes. (This
1556  // checks an implementation detail; this behavior is not guaranteed, but we
1557  // need it for this test.)
1558  const void* read_buffer_ptr = nullptr;
1559  num_bytes = 0u;
1560  EXPECT_EQ(MOJO_RESULT_OK,
1561            dp->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1562                                      MakeUserPointer(&num_bytes),
1563                                      false));
1564  EXPECT_TRUE(read_buffer_ptr);
1565  EXPECT_EQ(90u, num_bytes);
1566  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(0u));
1567
1568  // Read as much as possible (using |ConsumerReadData()|). We should read 100
1569  // bytes.
1570  num_bytes =
1571      static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
1572  memset(read_buffer, 0, num_bytes);
1573  EXPECT_EQ(
1574      MOJO_RESULT_OK,
1575      dp->ConsumerReadData(
1576          UserPointer<void>(read_buffer), MakeUserPointer(&num_bytes), false));
1577  EXPECT_EQ(100u, num_bytes);
1578  EXPECT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1579
1580  dp->ProducerClose();
1581  dp->ConsumerClose();
1582}
1583
1584// Tests the behavior of closing the producer or consumer with respect to
1585// writes and reads (simple and two-phase).
1586TEST(LocalDataPipeTest, CloseWriteRead) {
1587  const char kTestData[] = "hello world";
1588  const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1589
1590  const MojoCreateDataPipeOptions options = {
1591      kSizeOfOptions,                           // |struct_size|.
1592      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1593      1u,                                       // |element_num_bytes|.
1594      1000u                                     // |capacity_num_bytes|.
1595  };
1596  MojoCreateDataPipeOptions validated_options = {0};
1597  EXPECT_EQ(MOJO_RESULT_OK,
1598            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1599                                            &validated_options));
1600
1601  // Close producer first, then consumer.
1602  {
1603    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1604
1605    // Write some data, so we'll have something to read.
1606    uint32_t num_bytes = kTestDataSize;
1607    EXPECT_EQ(MOJO_RESULT_OK,
1608              dp->ProducerWriteData(UserPointer<const void>(kTestData),
1609                                    MakeUserPointer(&num_bytes),
1610                                    false));
1611    EXPECT_EQ(kTestDataSize, num_bytes);
1612
1613    // Write it again, so we'll have something left over.
1614    num_bytes = kTestDataSize;
1615    EXPECT_EQ(MOJO_RESULT_OK,
1616              dp->ProducerWriteData(UserPointer<const void>(kTestData),
1617                                    MakeUserPointer(&num_bytes),
1618                                    false));
1619    EXPECT_EQ(kTestDataSize, num_bytes);
1620
1621    // Start two-phase write.
1622    void* write_buffer_ptr = nullptr;
1623    num_bytes = 0u;
1624    EXPECT_EQ(MOJO_RESULT_OK,
1625              dp->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1626                                         MakeUserPointer(&num_bytes),
1627                                         false));
1628    EXPECT_TRUE(write_buffer_ptr);
1629    EXPECT_GT(num_bytes, 0u);
1630
1631    // Start two-phase read.
1632    const void* read_buffer_ptr = nullptr;
1633    num_bytes = 0u;
1634    EXPECT_EQ(MOJO_RESULT_OK,
1635              dp->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1636                                        MakeUserPointer(&num_bytes),
1637                                        false));
1638    EXPECT_TRUE(read_buffer_ptr);
1639    EXPECT_EQ(2u * kTestDataSize, num_bytes);
1640
1641    // Close the producer.
1642    dp->ProducerClose();
1643
1644    // The consumer can finish its two-phase read.
1645    EXPECT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1646    EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(kTestDataSize));
1647
1648    // And start another.
1649    read_buffer_ptr = nullptr;
1650    num_bytes = 0u;
1651    EXPECT_EQ(MOJO_RESULT_OK,
1652              dp->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1653                                        MakeUserPointer(&num_bytes),
1654                                        false));
1655    EXPECT_TRUE(read_buffer_ptr);
1656    EXPECT_EQ(kTestDataSize, num_bytes);
1657
1658    // Close the consumer, which cancels the two-phase read.
1659    dp->ConsumerClose();
1660  }
1661
1662  // Close consumer first, then producer.
1663  {
1664    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1665
1666    // Write some data, so we'll have something to read.
1667    uint32_t num_bytes = kTestDataSize;
1668    EXPECT_EQ(MOJO_RESULT_OK,
1669              dp->ProducerWriteData(UserPointer<const void>(kTestData),
1670                                    MakeUserPointer(&num_bytes),
1671                                    false));
1672    EXPECT_EQ(kTestDataSize, num_bytes);
1673
1674    // Start two-phase write.
1675    void* write_buffer_ptr = nullptr;
1676    num_bytes = 0u;
1677    EXPECT_EQ(MOJO_RESULT_OK,
1678              dp->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1679                                         MakeUserPointer(&num_bytes),
1680                                         false));
1681    EXPECT_TRUE(write_buffer_ptr);
1682    ASSERT_GT(num_bytes, kTestDataSize);
1683
1684    // Start two-phase read.
1685    const void* read_buffer_ptr = nullptr;
1686    num_bytes = 0u;
1687    EXPECT_EQ(MOJO_RESULT_OK,
1688              dp->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1689                                        MakeUserPointer(&num_bytes),
1690                                        false));
1691    EXPECT_TRUE(read_buffer_ptr);
1692    EXPECT_EQ(kTestDataSize, num_bytes);
1693
1694    // Close the consumer.
1695    dp->ConsumerClose();
1696
1697    // Actually write some data. (Note: Premature freeing of the buffer would
1698    // probably only be detected under ASAN or similar.)
1699    memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1700    // Note: Even though the consumer has been closed, ending the two-phase
1701    // write will report success.
1702    EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData(kTestDataSize));
1703
1704    // But trying to write should result in failure.
1705    num_bytes = kTestDataSize;
1706    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1707              dp->ProducerWriteData(UserPointer<const void>(kTestData),
1708                                    MakeUserPointer(&num_bytes),
1709                                    false));
1710
1711    // As will trying to start another two-phase write.
1712    write_buffer_ptr = nullptr;
1713    num_bytes = 0u;
1714    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1715              dp->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1716                                         MakeUserPointer(&num_bytes),
1717                                         false));
1718
1719    dp->ProducerClose();
1720  }
1721
1722  // Test closing the consumer first, then the producer, with an active
1723  // two-phase write.
1724  {
1725    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1726
1727    // Start two-phase write.
1728    void* write_buffer_ptr = nullptr;
1729    uint32_t num_bytes = 0u;
1730    EXPECT_EQ(MOJO_RESULT_OK,
1731              dp->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
1732                                         MakeUserPointer(&num_bytes),
1733                                         false));
1734    EXPECT_TRUE(write_buffer_ptr);
1735    ASSERT_GT(num_bytes, kTestDataSize);
1736
1737    dp->ConsumerClose();
1738    dp->ProducerClose();
1739  }
1740
1741  // Test closing the producer and then trying to read (with no data).
1742  {
1743    scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1744
1745    // Write some data, so we'll have something to read.
1746    uint32_t num_bytes = kTestDataSize;
1747    EXPECT_EQ(MOJO_RESULT_OK,
1748              dp->ProducerWriteData(UserPointer<const void>(kTestData),
1749                                    MakeUserPointer(&num_bytes),
1750                                    false));
1751    EXPECT_EQ(kTestDataSize, num_bytes);
1752
1753    // Close the producer.
1754    dp->ProducerClose();
1755
1756    // Read that data.
1757    char buffer[1000];
1758    num_bytes = static_cast<uint32_t>(sizeof(buffer));
1759    EXPECT_EQ(
1760        MOJO_RESULT_OK,
1761        dp->ConsumerReadData(
1762            UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
1763    EXPECT_EQ(kTestDataSize, num_bytes);
1764    EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1765
1766    // A second read should fail.
1767    num_bytes = static_cast<uint32_t>(sizeof(buffer));
1768    EXPECT_EQ(
1769        MOJO_RESULT_FAILED_PRECONDITION,
1770        dp->ConsumerReadData(
1771            UserPointer<void>(buffer), MakeUserPointer(&num_bytes), false));
1772
1773    // A two-phase read should also fail.
1774    const void* read_buffer_ptr = nullptr;
1775    num_bytes = 0u;
1776    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1777              dp->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
1778                                        MakeUserPointer(&num_bytes),
1779                                        false));
1780
1781    // Ditto for discard.
1782    num_bytes = 10u;
1783    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1784              dp->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
1785
1786    dp->ConsumerClose();
1787  }
1788}
1789
1790TEST(LocalDataPipeTest, TwoPhaseMoreInvalidArguments) {
1791  const MojoCreateDataPipeOptions options = {
1792      kSizeOfOptions,                           // |struct_size|.
1793      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1794      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
1795      10 * sizeof(int32_t)                      // |capacity_num_bytes|.
1796  };
1797  MojoCreateDataPipeOptions validated_options = {0};
1798  EXPECT_EQ(MOJO_RESULT_OK,
1799            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1800                                            &validated_options));
1801
1802  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1803
1804  // No data.
1805  uint32_t num_bytes = 1000u;
1806  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1807  EXPECT_EQ(0u, num_bytes);
1808
1809  // Try "ending" a two-phase write when one isn't active.
1810  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1811            dp->ProducerEndWriteData(1u * sizeof(int32_t)));
1812
1813  // Still no data.
1814  num_bytes = 1000u;
1815  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1816  EXPECT_EQ(0u, num_bytes);
1817
1818  // Try ending a two-phase write with an invalid amount (too much).
1819  num_bytes = 0u;
1820  void* write_ptr = nullptr;
1821  EXPECT_EQ(
1822      MOJO_RESULT_OK,
1823      dp->ProducerBeginWriteData(
1824          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
1825  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1826            dp->ProducerEndWriteData(num_bytes +
1827                                     static_cast<uint32_t>(sizeof(int32_t))));
1828
1829  // But the two-phase write still ended.
1830  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerEndWriteData(0u));
1831
1832  // Still no data.
1833  num_bytes = 1000u;
1834  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1835  EXPECT_EQ(0u, num_bytes);
1836
1837  // Try ending a two-phase write with an invalid amount (not a multiple of the
1838  // element size).
1839  num_bytes = 0u;
1840  write_ptr = nullptr;
1841  EXPECT_EQ(
1842      MOJO_RESULT_OK,
1843      dp->ProducerBeginWriteData(
1844          MakeUserPointer(&write_ptr), MakeUserPointer(&num_bytes), false));
1845  EXPECT_GE(num_bytes, 1u);
1846  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ProducerEndWriteData(1u));
1847
1848  // But the two-phase write still ended.
1849  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, dp->ProducerEndWriteData(0u));
1850
1851  // Still no data.
1852  num_bytes = 1000u;
1853  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1854  EXPECT_EQ(0u, num_bytes);
1855
1856  // Now write some data, so we'll be able to try reading.
1857  int32_t element = 123;
1858  num_bytes = 1u * sizeof(int32_t);
1859  EXPECT_EQ(MOJO_RESULT_OK,
1860            dp->ProducerWriteData(UserPointer<const void>(&element),
1861                                  MakeUserPointer(&num_bytes),
1862                                  false));
1863
1864  // One element available.
1865  num_bytes = 0u;
1866  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1867  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1868
1869  // Try "ending" a two-phase read when one isn't active.
1870  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1871            dp->ConsumerEndReadData(1u * sizeof(int32_t)));
1872
1873  // Still one element available.
1874  num_bytes = 0u;
1875  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1876  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1877
1878  // Try ending a two-phase read with an invalid amount (too much).
1879  num_bytes = 0u;
1880  const void* read_ptr = nullptr;
1881  EXPECT_EQ(
1882      MOJO_RESULT_OK,
1883      dp->ConsumerBeginReadData(
1884          MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), false));
1885  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1886            dp->ConsumerEndReadData(num_bytes +
1887                                    static_cast<uint32_t>(sizeof(int32_t))));
1888
1889  // Still one element available.
1890  num_bytes = 0u;
1891  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1892  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1893
1894  // Try ending a two-phase read with an invalid amount (not a multiple of the
1895  // element size).
1896  num_bytes = 0u;
1897  read_ptr = nullptr;
1898  EXPECT_EQ(
1899      MOJO_RESULT_OK,
1900      dp->ConsumerBeginReadData(
1901          MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), false));
1902  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1903  EXPECT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1904  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, dp->ConsumerEndReadData(1u));
1905
1906  // Still one element available.
1907  num_bytes = 0u;
1908  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerQueryData(MakeUserPointer(&num_bytes)));
1909  EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
1910
1911  dp->ProducerClose();
1912  dp->ConsumerClose();
1913}
1914
1915// Tests that even with "may discard", the data won't change under a two-phase
1916// read.
1917// TODO(vtl): crbug.com/348644: We currently don't pass this. (There are two
1918// related issues: First, we don't recognize that the data given to
1919// |ConsumerBeginReadData()| isn't discardable until |ConsumerEndReadData()|,
1920// and thus we erroneously allow |ProducerWriteData()| to succeed. Second, the
1921// |ProducerWriteData()| then changes the data underneath the two-phase read.)
1922TEST(LocalDataPipeTest, DISABLED_MayDiscardTwoPhaseConsistent) {
1923  const MojoCreateDataPipeOptions options = {
1924      kSizeOfOptions,                                  // |struct_size|.
1925      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD,  // |flags|.
1926      1,                                               // |element_num_bytes|.
1927      2                                                // |capacity_num_bytes|.
1928  };
1929  MojoCreateDataPipeOptions validated_options = {0};
1930  EXPECT_EQ(MOJO_RESULT_OK,
1931            DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
1932                                            &validated_options));
1933
1934  scoped_refptr<LocalDataPipe> dp(new LocalDataPipe(validated_options));
1935
1936  // Write some elements.
1937  char elements[2] = {'a', 'b'};
1938  uint32_t num_bytes = 2u;
1939  EXPECT_EQ(MOJO_RESULT_OK,
1940            dp->ProducerWriteData(UserPointer<const void>(elements),
1941                                  MakeUserPointer(&num_bytes),
1942                                  false));
1943  EXPECT_EQ(2u, num_bytes);
1944
1945  // Begin reading.
1946  const void* read_ptr = nullptr;
1947  num_bytes = 2u;
1948  EXPECT_EQ(
1949      MOJO_RESULT_OK,
1950      dp->ConsumerBeginReadData(
1951          MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), false));
1952  EXPECT_EQ(2u, num_bytes);
1953  EXPECT_EQ('a', static_cast<const char*>(read_ptr)[0]);
1954  EXPECT_EQ('b', static_cast<const char*>(read_ptr)[1]);
1955
1956  // Try to write some more. But nothing should be discardable right now.
1957  elements[0] = 'x';
1958  elements[1] = 'y';
1959  num_bytes = 2u;
1960  // TODO(vtl): This should be:
1961  //  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
1962  //            dp->ProducerWriteData(elements, &num_bytes, false));
1963  // but we incorrectly think that the bytes being read are discardable. Letting
1964  // this through reveals the significant consequence.
1965  EXPECT_EQ(MOJO_RESULT_OK,
1966            dp->ProducerWriteData(UserPointer<const void>(elements),
1967                                  MakeUserPointer(&num_bytes),
1968                                  false));
1969
1970  // Check that our read buffer hasn't changed underneath us.
1971  EXPECT_EQ('a', static_cast<const char*>(read_ptr)[0]);
1972  EXPECT_EQ('b', static_cast<const char*>(read_ptr)[1]);
1973
1974  // End reading.
1975  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(2u));
1976
1977  // Now writing should succeed.
1978  EXPECT_EQ(MOJO_RESULT_OK,
1979            dp->ProducerWriteData(UserPointer<const void>(elements),
1980                                  MakeUserPointer(&num_bytes),
1981                                  false));
1982
1983  // And if we read, we should get the new values.
1984  read_ptr = nullptr;
1985  num_bytes = 2u;
1986  EXPECT_EQ(
1987      MOJO_RESULT_OK,
1988      dp->ConsumerBeginReadData(
1989          MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), false));
1990  EXPECT_EQ(2u, num_bytes);
1991  EXPECT_EQ('x', static_cast<const char*>(read_ptr)[0]);
1992  EXPECT_EQ('y', static_cast<const char*>(read_ptr)[1]);
1993
1994  // End reading.
1995  EXPECT_EQ(MOJO_RESULT_OK, dp->ConsumerEndReadData(2u));
1996
1997  dp->ProducerClose();
1998  dp->ConsumerClose();
1999}
2000
2001}  // namespace
2002}  // namespace system
2003}  // namespace mojo
2004