1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7#include <stddef.h>
8#include <stdint.h>
9
10#include <algorithm>
11#include <limits>
12#include <utility>
13
14#include "base/bind.h"
15#include "base/logging.h"
16#include "base/memory/ref_counted.h"
17#include "base/message_loop/message_loop.h"
18#include "mojo/edk/embedder/embedder_internal.h"
19#include "mojo/edk/embedder/platform_shared_buffer.h"
20#include "mojo/edk/system/core.h"
21#include "mojo/edk/system/data_pipe_control_message.h"
22#include "mojo/edk/system/node_controller.h"
23#include "mojo/edk/system/ports_message.h"
24#include "mojo/edk/system/request_context.h"
25#include "mojo/public/c/system/data_pipe.h"
26
27namespace mojo {
28namespace edk {
29
30namespace {
31
32const uint8_t kFlagPeerClosed = 0x01;
33
34#pragma pack(push, 1)
35
36struct SerializedState {
37  MojoCreateDataPipeOptions options;
38  uint64_t pipe_id;
39  uint32_t read_offset;
40  uint32_t bytes_available;
41  uint8_t flags;
42  char padding[7];
43};
44
45static_assert(sizeof(SerializedState) % 8 == 0,
46              "Invalid SerializedState size.");
47
48#pragma pack(pop)
49
50}  // namespace
51
52// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
53// reference to the dispatcher to ensure it lives as long as the observed port.
54class DataPipeConsumerDispatcher::PortObserverThunk
55    : public NodeController::PortObserver {
56 public:
57  explicit PortObserverThunk(
58      scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
59      : dispatcher_(dispatcher) {}
60
61 private:
62  ~PortObserverThunk() override {}
63
64  // NodeController::PortObserver:
65  void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
66
67  scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
68
69  DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
70};
71
72DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
73    NodeController* node_controller,
74    const ports::PortRef& control_port,
75    scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76    const MojoCreateDataPipeOptions& options,
77    bool initialized,
78    uint64_t pipe_id)
79    : options_(options),
80      node_controller_(node_controller),
81      control_port_(control_port),
82      pipe_id_(pipe_id),
83      shared_ring_buffer_(shared_ring_buffer) {
84  if (initialized) {
85    base::AutoLock lock(lock_);
86    InitializeNoLock();
87  }
88}
89
90Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
91  return Type::DATA_PIPE_CONSUMER;
92}
93
94MojoResult DataPipeConsumerDispatcher::Close() {
95  base::AutoLock lock(lock_);
96  DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
97  return CloseNoLock();
98}
99
100
101MojoResult DataPipeConsumerDispatcher::Watch(
102    MojoHandleSignals signals,
103    const Watcher::WatchCallback& callback,
104    uintptr_t context) {
105  base::AutoLock lock(lock_);
106
107  if (is_closed_ || in_transit_)
108    return MOJO_RESULT_INVALID_ARGUMENT;
109
110  return awakable_list_.AddWatcher(
111      signals, callback, context, GetHandleSignalsStateNoLock());
112}
113
114MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115  base::AutoLock lock(lock_);
116
117  if (is_closed_ || in_transit_)
118    return MOJO_RESULT_INVALID_ARGUMENT;
119
120  return awakable_list_.RemoveWatcher(context);
121}
122
123MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124                                                uint32_t* num_bytes,
125                                                MojoReadDataFlags flags) {
126  base::AutoLock lock(lock_);
127  if (!shared_ring_buffer_ || in_transit_)
128    return MOJO_RESULT_INVALID_ARGUMENT;
129
130  if (in_two_phase_read_)
131    return MOJO_RESULT_BUSY;
132
133  if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
134    if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
135        (flags & MOJO_READ_DATA_FLAG_DISCARD))
136      return MOJO_RESULT_INVALID_ARGUMENT;
137    DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD));  // Handled above.
138    DVLOG_IF(2, elements)
139        << "Query mode: ignoring non-null |elements|";
140    *num_bytes = static_cast<uint32_t>(bytes_available_);
141    return MOJO_RESULT_OK;
142  }
143
144  bool discard = false;
145  if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
146    // These flags are mutally exclusive.
147    if (flags & MOJO_READ_DATA_FLAG_PEEK)
148      return MOJO_RESULT_INVALID_ARGUMENT;
149    DVLOG_IF(2, elements)
150        << "Discard mode: ignoring non-null |elements|";
151    discard = true;
152  }
153
154  uint32_t max_num_bytes_to_read = *num_bytes;
155  if (max_num_bytes_to_read % options_.element_num_bytes != 0)
156    return MOJO_RESULT_INVALID_ARGUMENT;
157
158  bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
159  uint32_t min_num_bytes_to_read =
160      all_or_none ? max_num_bytes_to_read : 0;
161
162  if (min_num_bytes_to_read > bytes_available_) {
163    return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
164                        : MOJO_RESULT_OUT_OF_RANGE;
165  }
166
167  uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
168  if (bytes_to_read == 0) {
169    return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
170                        : MOJO_RESULT_SHOULD_WAIT;
171  }
172
173  if (!discard) {
174    uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
175    CHECK(data);
176
177    uint8_t* destination = static_cast<uint8_t*>(elements);
178    CHECK(destination);
179
180    DCHECK_LE(read_offset_, options_.capacity_num_bytes);
181    uint32_t tail_bytes_to_copy =
182        std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
183    uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
184    if (tail_bytes_to_copy > 0)
185      memcpy(destination, data + read_offset_, tail_bytes_to_copy);
186    if (head_bytes_to_copy > 0)
187      memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
188  }
189  *num_bytes = bytes_to_read;
190
191  bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
192  if (discard || !peek) {
193    read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
194    bytes_available_ -= bytes_to_read;
195
196    base::AutoUnlock unlock(lock_);
197    NotifyRead(bytes_to_read);
198  }
199
200  return MOJO_RESULT_OK;
201}
202
203MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
204                                                     uint32_t* buffer_num_bytes,
205                                                     MojoReadDataFlags flags) {
206  base::AutoLock lock(lock_);
207  if (!shared_ring_buffer_ || in_transit_)
208    return MOJO_RESULT_INVALID_ARGUMENT;
209
210  if (in_two_phase_read_)
211    return MOJO_RESULT_BUSY;
212
213  // These flags may not be used in two-phase mode.
214  if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
215      (flags & MOJO_READ_DATA_FLAG_QUERY) ||
216      (flags & MOJO_READ_DATA_FLAG_PEEK))
217    return MOJO_RESULT_INVALID_ARGUMENT;
218
219  if (bytes_available_ == 0) {
220    return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
221                        : MOJO_RESULT_SHOULD_WAIT;
222  }
223
224  DCHECK_LT(read_offset_, options_.capacity_num_bytes);
225  uint32_t bytes_to_read = std::min(bytes_available_,
226                                    options_.capacity_num_bytes - read_offset_);
227
228  CHECK(ring_buffer_mapping_);
229  uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
230  CHECK(data);
231
232  in_two_phase_read_ = true;
233  *buffer = data + read_offset_;
234  *buffer_num_bytes = bytes_to_read;
235  two_phase_max_bytes_read_ = bytes_to_read;
236
237  return MOJO_RESULT_OK;
238}
239
240MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
241  base::AutoLock lock(lock_);
242  if (!in_two_phase_read_)
243    return MOJO_RESULT_FAILED_PRECONDITION;
244
245  if (in_transit_)
246    return MOJO_RESULT_INVALID_ARGUMENT;
247
248  CHECK(shared_ring_buffer_);
249
250  HandleSignalsState old_state = GetHandleSignalsStateNoLock();
251  MojoResult rv;
252  if (num_bytes_read > two_phase_max_bytes_read_ ||
253      num_bytes_read % options_.element_num_bytes != 0) {
254    rv = MOJO_RESULT_INVALID_ARGUMENT;
255  } else {
256    rv = MOJO_RESULT_OK;
257    read_offset_ =
258        (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
259
260    DCHECK_GE(bytes_available_, num_bytes_read);
261    bytes_available_ -= num_bytes_read;
262
263    base::AutoUnlock unlock(lock_);
264    NotifyRead(num_bytes_read);
265  }
266
267  in_two_phase_read_ = false;
268  two_phase_max_bytes_read_ = 0;
269
270  HandleSignalsState new_state = GetHandleSignalsStateNoLock();
271  if (!new_state.equals(old_state))
272    awakable_list_.AwakeForStateChange(new_state);
273
274  return rv;
275}
276
277HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
278  base::AutoLock lock(lock_);
279  return GetHandleSignalsStateNoLock();
280}
281
282MojoResult DataPipeConsumerDispatcher::AddAwakable(
283    Awakable* awakable,
284    MojoHandleSignals signals,
285    uintptr_t context,
286    HandleSignalsState* signals_state) {
287  base::AutoLock lock(lock_);
288  if (!shared_ring_buffer_ || in_transit_) {
289    if (signals_state)
290      *signals_state = HandleSignalsState();
291    return MOJO_RESULT_INVALID_ARGUMENT;
292  }
293  UpdateSignalsStateNoLock();
294  HandleSignalsState state = GetHandleSignalsStateNoLock();
295  if (state.satisfies(signals)) {
296    if (signals_state)
297      *signals_state = state;
298    return MOJO_RESULT_ALREADY_EXISTS;
299  }
300  if (!state.can_satisfy(signals)) {
301    if (signals_state)
302      *signals_state = state;
303    return MOJO_RESULT_FAILED_PRECONDITION;
304  }
305
306  awakable_list_.Add(awakable, signals, context);
307  return MOJO_RESULT_OK;
308}
309
310void DataPipeConsumerDispatcher::RemoveAwakable(
311    Awakable* awakable,
312    HandleSignalsState* signals_state) {
313  base::AutoLock lock(lock_);
314  if ((!shared_ring_buffer_ || in_transit_) && signals_state)
315    *signals_state = HandleSignalsState();
316  else if (signals_state)
317    *signals_state = GetHandleSignalsStateNoLock();
318  awakable_list_.Remove(awakable);
319}
320
321void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
322                                                uint32_t* num_ports,
323                                                uint32_t* num_handles) {
324  base::AutoLock lock(lock_);
325  DCHECK(in_transit_);
326  *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
327  *num_ports = 1;
328  *num_handles = 1;
329}
330
331bool DataPipeConsumerDispatcher::EndSerialize(
332    void* destination,
333    ports::PortName* ports,
334    PlatformHandle* platform_handles) {
335  SerializedState* state = static_cast<SerializedState*>(destination);
336  memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
337  memset(state->padding, 0, sizeof(state->padding));
338
339  base::AutoLock lock(lock_);
340  DCHECK(in_transit_);
341  state->pipe_id = pipe_id_;
342  state->read_offset = read_offset_;
343  state->bytes_available = bytes_available_;
344  state->flags = peer_closed_ ? kFlagPeerClosed : 0;
345
346  ports[0] = control_port_.name();
347
348  buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
349  platform_handles[0] = buffer_handle_for_transit_.get();
350
351  return true;
352}
353
354bool DataPipeConsumerDispatcher::BeginTransit() {
355  base::AutoLock lock(lock_);
356  if (in_transit_)
357    return false;
358  in_transit_ = !in_two_phase_read_;
359  return in_transit_;
360}
361
362void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
363  node_controller_->SetPortObserver(control_port_, nullptr);
364
365  base::AutoLock lock(lock_);
366  DCHECK(in_transit_);
367  in_transit_ = false;
368  transferred_ = true;
369  ignore_result(buffer_handle_for_transit_.release());
370  CloseNoLock();
371}
372
373void DataPipeConsumerDispatcher::CancelTransit() {
374  base::AutoLock lock(lock_);
375  DCHECK(in_transit_);
376  in_transit_ = false;
377  buffer_handle_for_transit_.reset();
378  UpdateSignalsStateNoLock();
379}
380
381// static
382scoped_refptr<DataPipeConsumerDispatcher>
383DataPipeConsumerDispatcher::Deserialize(const void* data,
384                                        size_t num_bytes,
385                                        const ports::PortName* ports,
386                                        size_t num_ports,
387                                        PlatformHandle* handles,
388                                        size_t num_handles) {
389  if (num_ports != 1 || num_handles != 1 ||
390      num_bytes != sizeof(SerializedState)) {
391    return nullptr;
392  }
393
394  const SerializedState* state = static_cast<const SerializedState*>(data);
395
396  NodeController* node_controller = internal::g_core->GetNodeController();
397  ports::PortRef port;
398  if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
399    return nullptr;
400
401  PlatformHandle buffer_handle;
402  std::swap(buffer_handle, handles[0]);
403  scoped_refptr<PlatformSharedBuffer> ring_buffer =
404      PlatformSharedBuffer::CreateFromPlatformHandle(
405          state->options.capacity_num_bytes,
406          false /* read_only */,
407          ScopedPlatformHandle(buffer_handle));
408  if (!ring_buffer) {
409    DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
410    return nullptr;
411  }
412
413  scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
414      new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
415                                     state->options, false /* initialized */,
416                                     state->pipe_id);
417
418  {
419    base::AutoLock lock(dispatcher->lock_);
420    dispatcher->read_offset_ = state->read_offset;
421    dispatcher->bytes_available_ = state->bytes_available;
422    dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
423    dispatcher->InitializeNoLock();
424  }
425
426  return dispatcher;
427}
428
429DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
430  DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
431         !in_transit_);
432}
433
434void DataPipeConsumerDispatcher::InitializeNoLock() {
435  lock_.AssertAcquired();
436
437  if (shared_ring_buffer_) {
438    DCHECK(!ring_buffer_mapping_);
439    ring_buffer_mapping_ =
440        shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
441    if (!ring_buffer_mapping_) {
442      DLOG(ERROR) << "Failed to map shared buffer.";
443      shared_ring_buffer_ = nullptr;
444    }
445  }
446
447  base::AutoUnlock unlock(lock_);
448  node_controller_->SetPortObserver(
449      control_port_,
450      make_scoped_refptr(new PortObserverThunk(this)));
451}
452
453MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
454  lock_.AssertAcquired();
455  if (is_closed_ || in_transit_)
456    return MOJO_RESULT_INVALID_ARGUMENT;
457  is_closed_ = true;
458  ring_buffer_mapping_.reset();
459  shared_ring_buffer_ = nullptr;
460
461  awakable_list_.CancelAll();
462  if (!transferred_) {
463    base::AutoUnlock unlock(lock_);
464    node_controller_->ClosePort(control_port_);
465  }
466
467  return MOJO_RESULT_OK;
468}
469
470HandleSignalsState
471DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
472  lock_.AssertAcquired();
473
474  HandleSignalsState rv;
475  if (shared_ring_buffer_ && bytes_available_) {
476    if (!in_two_phase_read_)
477      rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
478    rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
479  } else if (!peer_closed_ && shared_ring_buffer_) {
480    rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
481  }
482
483  if (peer_closed_)
484    rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
485  rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
486  return rv;
487}
488
489void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
490  DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
491           << num_bytes << " bytes read. [control_port="
492           << control_port_.name() << "]";
493
494  SendDataPipeControlMessage(node_controller_, control_port_,
495                             DataPipeCommand::DATA_WAS_READ, num_bytes);
496}
497
498void DataPipeConsumerDispatcher::OnPortStatusChanged() {
499  DCHECK(RequestContext::current());
500
501  base::AutoLock lock(lock_);
502
503  // We stop observing the control port as soon it's transferred, but this can
504  // race with events which are raised right before that happens. This is fine
505  // to ignore.
506  if (transferred_)
507    return;
508
509  DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
510
511  UpdateSignalsStateNoLock();
512}
513
514void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
515  lock_.AssertAcquired();
516
517  bool was_peer_closed = peer_closed_;
518  size_t previous_bytes_available = bytes_available_;
519
520  ports::PortStatus port_status;
521  int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
522  if (rv != ports::OK || !port_status.receiving_messages) {
523    DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
524             << " [control_port=" << control_port_.name() << "]";
525    peer_closed_ = true;
526  } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
527    ports::ScopedMessage message;
528    do {
529      int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
530                                                      &message);
531      if (rv != ports::OK)
532        peer_closed_ = true;
533      if (message) {
534        if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
535          peer_closed_ = true;
536          break;
537        }
538
539        const DataPipeControlMessage* m =
540            static_cast<const DataPipeControlMessage*>(
541                message->payload_bytes());
542
543        if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
544          DLOG(ERROR) << "Unexpected control message from producer.";
545          peer_closed_ = true;
546          break;
547        }
548
549        if (static_cast<size_t>(bytes_available_) + m->num_bytes >
550              options_.capacity_num_bytes) {
551          DLOG(ERROR) << "Producer claims to have written too many bytes.";
552          peer_closed_ = true;
553          break;
554        }
555
556        DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
557                 << m->num_bytes << " bytes were written. [control_port="
558                 << control_port_.name() << "]";
559
560        bytes_available_ += m->num_bytes;
561      }
562    } while (message);
563  }
564
565  if (peer_closed_ != was_peer_closed ||
566      bytes_available_ != previous_bytes_available) {
567    awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
568  }
569}
570
571}  // namespace edk
572}  // namespace mojo
573