1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/data_pipe.h"
6
7#include <string.h>
8
9#include <algorithm>
10#include <limits>
11
12#include "base/compiler_specific.h"
13#include "base/logging.h"
14#include "mojo/system/constants.h"
15#include "mojo/system/memory.h"
16#include "mojo/system/options_validation.h"
17#include "mojo/system/waiter_list.h"
18
19namespace mojo {
20namespace system {
21
22// static
23const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24    static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25    MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
26    static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};
27
28// static
29MojoResult DataPipe::ValidateCreateOptions(
30    UserPointer<const MojoCreateDataPipeOptions> in_options,
31    MojoCreateDataPipeOptions* out_options) {
32  const MojoCreateDataPipeOptionsFlags kKnownFlags =
33      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
34
35  *out_options = kDefaultCreateOptions;
36  if (in_options.IsNull())
37    return MOJO_RESULT_OK;
38
39  UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
40  if (!reader.is_valid())
41    return MOJO_RESULT_INVALID_ARGUMENT;
42
43  if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
44    return MOJO_RESULT_OK;
45  if ((reader.options().flags & ~kKnownFlags))
46    return MOJO_RESULT_UNIMPLEMENTED;
47  out_options->flags = reader.options().flags;
48
49  // Checks for fields beyond |flags|:
50
51  if (!OPTIONS_STRUCT_HAS_MEMBER(
52          MojoCreateDataPipeOptions, element_num_bytes, reader))
53    return MOJO_RESULT_OK;
54  if (reader.options().element_num_bytes == 0)
55    return MOJO_RESULT_INVALID_ARGUMENT;
56  out_options->element_num_bytes = reader.options().element_num_bytes;
57
58  if (!OPTIONS_STRUCT_HAS_MEMBER(
59          MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
60      reader.options().capacity_num_bytes == 0) {
61    // Round the default capacity down to a multiple of the element size (but at
62    // least one element).
63    out_options->capacity_num_bytes =
64        std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
65                                       (kDefaultDataPipeCapacityBytes %
66                                        out_options->element_num_bytes)),
67                 out_options->element_num_bytes);
68    return MOJO_RESULT_OK;
69  }
70  if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
71    return MOJO_RESULT_INVALID_ARGUMENT;
72  if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
73    return MOJO_RESULT_RESOURCE_EXHAUSTED;
74  out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
75
76  return MOJO_RESULT_OK;
77}
78
79void DataPipe::ProducerCancelAllWaiters() {
80  base::AutoLock locker(lock_);
81  DCHECK(has_local_producer_no_lock());
82  producer_waiter_list_->CancelAllWaiters();
83}
84
85void DataPipe::ProducerClose() {
86  base::AutoLock locker(lock_);
87  DCHECK(producer_open_);
88  producer_open_ = false;
89  DCHECK(has_local_producer_no_lock());
90  producer_waiter_list_.reset();
91  // Not a bug, except possibly in "user" code.
92  DVLOG_IF(2, producer_in_two_phase_write_no_lock())
93      << "Producer closed with active two-phase write";
94  producer_two_phase_max_num_bytes_written_ = 0;
95  ProducerCloseImplNoLock();
96  AwakeConsumerWaitersForStateChangeNoLock(
97      ConsumerGetHandleSignalsStateImplNoLock());
98}
99
100MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
101                                       UserPointer<uint32_t> num_bytes,
102                                       bool all_or_none) {
103  base::AutoLock locker(lock_);
104  DCHECK(has_local_producer_no_lock());
105
106  if (producer_in_two_phase_write_no_lock())
107    return MOJO_RESULT_BUSY;
108  if (!consumer_open_no_lock())
109    return MOJO_RESULT_FAILED_PRECONDITION;
110
111  // Returning "busy" takes priority over "invalid argument".
112  uint32_t max_num_bytes_to_write = num_bytes.Get();
113  if (max_num_bytes_to_write % element_num_bytes_ != 0)
114    return MOJO_RESULT_INVALID_ARGUMENT;
115
116  if (max_num_bytes_to_write == 0)
117    return MOJO_RESULT_OK;  // Nothing to do.
118
119  uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
120
121  HandleSignalsState old_consumer_state =
122      ConsumerGetHandleSignalsStateImplNoLock();
123  MojoResult rv = ProducerWriteDataImplNoLock(
124      elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
125  HandleSignalsState new_consumer_state =
126      ConsumerGetHandleSignalsStateImplNoLock();
127  if (!new_consumer_state.equals(old_consumer_state))
128    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
129  return rv;
130}
131
132MojoResult DataPipe::ProducerBeginWriteData(
133    UserPointer<void*> buffer,
134    UserPointer<uint32_t> buffer_num_bytes,
135    bool all_or_none) {
136  base::AutoLock locker(lock_);
137  DCHECK(has_local_producer_no_lock());
138
139  if (producer_in_two_phase_write_no_lock())
140    return MOJO_RESULT_BUSY;
141  if (!consumer_open_no_lock())
142    return MOJO_RESULT_FAILED_PRECONDITION;
143
144  uint32_t min_num_bytes_to_write = 0;
145  if (all_or_none) {
146    min_num_bytes_to_write = buffer_num_bytes.Get();
147    if (min_num_bytes_to_write % element_num_bytes_ != 0)
148      return MOJO_RESULT_INVALID_ARGUMENT;
149  }
150
151  MojoResult rv = ProducerBeginWriteDataImplNoLock(
152      buffer, buffer_num_bytes, min_num_bytes_to_write);
153  if (rv != MOJO_RESULT_OK)
154    return rv;
155  // Note: No need to awake producer waiters, even though we're going from
156  // writable to non-writable (since you can't wait on non-writability).
157  // Similarly, though this may have discarded data (in "may discard" mode),
158  // making it non-readable, there's still no need to awake consumer waiters.
159  DCHECK(producer_in_two_phase_write_no_lock());
160  return MOJO_RESULT_OK;
161}
162
163MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
164  base::AutoLock locker(lock_);
165  DCHECK(has_local_producer_no_lock());
166
167  if (!producer_in_two_phase_write_no_lock())
168    return MOJO_RESULT_FAILED_PRECONDITION;
169  // Note: Allow successful completion of the two-phase write even if the
170  // consumer has been closed.
171
172  HandleSignalsState old_consumer_state =
173      ConsumerGetHandleSignalsStateImplNoLock();
174  MojoResult rv;
175  if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
176      num_bytes_written % element_num_bytes_ != 0) {
177    rv = MOJO_RESULT_INVALID_ARGUMENT;
178    producer_two_phase_max_num_bytes_written_ = 0;
179  } else {
180    rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
181  }
182  // Two-phase write ended even on failure.
183  DCHECK(!producer_in_two_phase_write_no_lock());
184  // If we're now writable, we *became* writable (since we weren't writable
185  // during the two-phase write), so awake producer waiters.
186  HandleSignalsState new_producer_state =
187      ProducerGetHandleSignalsStateImplNoLock();
188  if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
189    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
190  HandleSignalsState new_consumer_state =
191      ConsumerGetHandleSignalsStateImplNoLock();
192  if (!new_consumer_state.equals(old_consumer_state))
193    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
194  return rv;
195}
196
197HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
198  base::AutoLock locker(lock_);
199  DCHECK(has_local_producer_no_lock());
200  return ProducerGetHandleSignalsStateImplNoLock();
201}
202
203MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
204                                       MojoHandleSignals signals,
205                                       uint32_t context,
206                                       HandleSignalsState* signals_state) {
207  base::AutoLock locker(lock_);
208  DCHECK(has_local_producer_no_lock());
209
210  HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
211  if (producer_state.satisfies(signals)) {
212    if (signals_state)
213      *signals_state = producer_state;
214    return MOJO_RESULT_ALREADY_EXISTS;
215  }
216  if (!producer_state.can_satisfy(signals)) {
217    if (signals_state)
218      *signals_state = producer_state;
219    return MOJO_RESULT_FAILED_PRECONDITION;
220  }
221
222  producer_waiter_list_->AddWaiter(waiter, signals, context);
223  return MOJO_RESULT_OK;
224}
225
226void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
227                                    HandleSignalsState* signals_state) {
228  base::AutoLock locker(lock_);
229  DCHECK(has_local_producer_no_lock());
230  producer_waiter_list_->RemoveWaiter(waiter);
231  if (signals_state)
232    *signals_state = ProducerGetHandleSignalsStateImplNoLock();
233}
234
235bool DataPipe::ProducerIsBusy() const {
236  base::AutoLock locker(lock_);
237  return producer_in_two_phase_write_no_lock();
238}
239
240void DataPipe::ConsumerCancelAllWaiters() {
241  base::AutoLock locker(lock_);
242  DCHECK(has_local_consumer_no_lock());
243  consumer_waiter_list_->CancelAllWaiters();
244}
245
246void DataPipe::ConsumerClose() {
247  base::AutoLock locker(lock_);
248  DCHECK(consumer_open_);
249  consumer_open_ = false;
250  DCHECK(has_local_consumer_no_lock());
251  consumer_waiter_list_.reset();
252  // Not a bug, except possibly in "user" code.
253  DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
254      << "Consumer closed with active two-phase read";
255  consumer_two_phase_max_num_bytes_read_ = 0;
256  ConsumerCloseImplNoLock();
257  AwakeProducerWaitersForStateChangeNoLock(
258      ProducerGetHandleSignalsStateImplNoLock());
259}
260
261MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
262                                      UserPointer<uint32_t> num_bytes,
263                                      bool all_or_none) {
264  base::AutoLock locker(lock_);
265  DCHECK(has_local_consumer_no_lock());
266
267  if (consumer_in_two_phase_read_no_lock())
268    return MOJO_RESULT_BUSY;
269
270  uint32_t max_num_bytes_to_read = num_bytes.Get();
271  if (max_num_bytes_to_read % element_num_bytes_ != 0)
272    return MOJO_RESULT_INVALID_ARGUMENT;
273
274  if (max_num_bytes_to_read == 0)
275    return MOJO_RESULT_OK;  // Nothing to do.
276
277  uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
278
279  HandleSignalsState old_producer_state =
280      ProducerGetHandleSignalsStateImplNoLock();
281  MojoResult rv = ConsumerReadDataImplNoLock(
282      elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
283  HandleSignalsState new_producer_state =
284      ProducerGetHandleSignalsStateImplNoLock();
285  if (!new_producer_state.equals(old_producer_state))
286    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
287  return rv;
288}
289
290MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
291                                         bool all_or_none) {
292  base::AutoLock locker(lock_);
293  DCHECK(has_local_consumer_no_lock());
294
295  if (consumer_in_two_phase_read_no_lock())
296    return MOJO_RESULT_BUSY;
297
298  uint32_t max_num_bytes_to_discard = num_bytes.Get();
299  if (max_num_bytes_to_discard % element_num_bytes_ != 0)
300    return MOJO_RESULT_INVALID_ARGUMENT;
301
302  if (max_num_bytes_to_discard == 0)
303    return MOJO_RESULT_OK;  // Nothing to do.
304
305  uint32_t min_num_bytes_to_discard =
306      all_or_none ? max_num_bytes_to_discard : 0;
307
308  HandleSignalsState old_producer_state =
309      ProducerGetHandleSignalsStateImplNoLock();
310  MojoResult rv = ConsumerDiscardDataImplNoLock(
311      num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
312  HandleSignalsState new_producer_state =
313      ProducerGetHandleSignalsStateImplNoLock();
314  if (!new_producer_state.equals(old_producer_state))
315    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
316  return rv;
317}
318
319MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
320  base::AutoLock locker(lock_);
321  DCHECK(has_local_consumer_no_lock());
322
323  if (consumer_in_two_phase_read_no_lock())
324    return MOJO_RESULT_BUSY;
325
326  // Note: Don't need to validate |*num_bytes| for query.
327  return ConsumerQueryDataImplNoLock(num_bytes);
328}
329
330MojoResult DataPipe::ConsumerBeginReadData(
331    UserPointer<const void*> buffer,
332    UserPointer<uint32_t> buffer_num_bytes,
333    bool all_or_none) {
334  base::AutoLock locker(lock_);
335  DCHECK(has_local_consumer_no_lock());
336
337  if (consumer_in_two_phase_read_no_lock())
338    return MOJO_RESULT_BUSY;
339
340  uint32_t min_num_bytes_to_read = 0;
341  if (all_or_none) {
342    min_num_bytes_to_read = buffer_num_bytes.Get();
343    if (min_num_bytes_to_read % element_num_bytes_ != 0)
344      return MOJO_RESULT_INVALID_ARGUMENT;
345  }
346
347  MojoResult rv = ConsumerBeginReadDataImplNoLock(
348      buffer, buffer_num_bytes, min_num_bytes_to_read);
349  if (rv != MOJO_RESULT_OK)
350    return rv;
351  DCHECK(consumer_in_two_phase_read_no_lock());
352  return MOJO_RESULT_OK;
353}
354
355MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
356  base::AutoLock locker(lock_);
357  DCHECK(has_local_consumer_no_lock());
358
359  if (!consumer_in_two_phase_read_no_lock())
360    return MOJO_RESULT_FAILED_PRECONDITION;
361
362  HandleSignalsState old_producer_state =
363      ProducerGetHandleSignalsStateImplNoLock();
364  MojoResult rv;
365  if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
366      num_bytes_read % element_num_bytes_ != 0) {
367    rv = MOJO_RESULT_INVALID_ARGUMENT;
368    consumer_two_phase_max_num_bytes_read_ = 0;
369  } else {
370    rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
371  }
372  // Two-phase read ended even on failure.
373  DCHECK(!consumer_in_two_phase_read_no_lock());
374  // If we're now readable, we *became* readable (since we weren't readable
375  // during the two-phase read), so awake consumer waiters.
376  HandleSignalsState new_consumer_state =
377      ConsumerGetHandleSignalsStateImplNoLock();
378  if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
379    AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
380  HandleSignalsState new_producer_state =
381      ProducerGetHandleSignalsStateImplNoLock();
382  if (!new_producer_state.equals(old_producer_state))
383    AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
384  return rv;
385}
386
387HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
388  base::AutoLock locker(lock_);
389  DCHECK(has_local_consumer_no_lock());
390  return ConsumerGetHandleSignalsStateImplNoLock();
391}
392
393MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
394                                       MojoHandleSignals signals,
395                                       uint32_t context,
396                                       HandleSignalsState* signals_state) {
397  base::AutoLock locker(lock_);
398  DCHECK(has_local_consumer_no_lock());
399
400  HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
401  if (consumer_state.satisfies(signals)) {
402    if (signals_state)
403      *signals_state = consumer_state;
404    return MOJO_RESULT_ALREADY_EXISTS;
405  }
406  if (!consumer_state.can_satisfy(signals)) {
407    if (signals_state)
408      *signals_state = consumer_state;
409    return MOJO_RESULT_FAILED_PRECONDITION;
410  }
411
412  consumer_waiter_list_->AddWaiter(waiter, signals, context);
413  return MOJO_RESULT_OK;
414}
415
416void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
417                                    HandleSignalsState* signals_state) {
418  base::AutoLock locker(lock_);
419  DCHECK(has_local_consumer_no_lock());
420  consumer_waiter_list_->RemoveWaiter(waiter);
421  if (signals_state)
422    *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
423}
424
425bool DataPipe::ConsumerIsBusy() const {
426  base::AutoLock locker(lock_);
427  return consumer_in_two_phase_read_no_lock();
428}
429
430DataPipe::DataPipe(bool has_local_producer,
431                   bool has_local_consumer,
432                   const MojoCreateDataPipeOptions& validated_options)
433    : may_discard_((validated_options.flags &
434                    MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
435      element_num_bytes_(validated_options.element_num_bytes),
436      capacity_num_bytes_(validated_options.capacity_num_bytes),
437      producer_open_(true),
438      consumer_open_(true),
439      producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr),
440      consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr),
441      producer_two_phase_max_num_bytes_written_(0),
442      consumer_two_phase_max_num_bytes_read_(0) {
443  // Check that the passed in options actually are validated.
444  MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
445  DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
446            MOJO_RESULT_OK);
447}
448
449DataPipe::~DataPipe() {
450  DCHECK(!producer_open_);
451  DCHECK(!consumer_open_);
452  DCHECK(!producer_waiter_list_);
453  DCHECK(!consumer_waiter_list_);
454}
455
456void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
457    const HandleSignalsState& new_producer_state) {
458  lock_.AssertAcquired();
459  if (!has_local_producer_no_lock())
460    return;
461  producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
462}
463
464void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
465    const HandleSignalsState& new_consumer_state) {
466  lock_.AssertAcquired();
467  if (!has_local_consumer_no_lock())
468    return;
469  consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
470}
471
472}  // namespace system
473}  // namespace mojo
474