1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6
7#include <stddef.h>
8#include <stdint.h>
9
10#include <utility>
11
12#include "base/bind.h"
13#include "base/logging.h"
14#include "base/memory/ref_counted.h"
15#include "base/message_loop/message_loop.h"
16#include "mojo/edk/embedder/embedder_internal.h"
17#include "mojo/edk/embedder/platform_shared_buffer.h"
18#include "mojo/edk/system/configuration.h"
19#include "mojo/edk/system/core.h"
20#include "mojo/edk/system/data_pipe_control_message.h"
21#include "mojo/edk/system/node_controller.h"
22#include "mojo/edk/system/ports_message.h"
23#include "mojo/edk/system/request_context.h"
24#include "mojo/public/c/system/data_pipe.h"
25
26namespace mojo {
27namespace edk {
28
29namespace {
30
31const uint8_t kFlagPeerClosed = 0x01;
32
33#pragma pack(push, 1)
34
35struct SerializedState {
36  MojoCreateDataPipeOptions options;
37  uint64_t pipe_id;
38  uint32_t write_offset;
39  uint32_t available_capacity;
40  uint8_t flags;
41  char padding[7];
42};
43
44static_assert(sizeof(SerializedState) % 8 == 0,
45              "Invalid SerializedState size.");
46
47#pragma pack(pop)
48
49}  // namespace
50
51// A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
52// reference to the dispatcher to ensure it lives as long as the observed port.
53class DataPipeProducerDispatcher::PortObserverThunk
54    : public NodeController::PortObserver {
55 public:
56  explicit PortObserverThunk(
57      scoped_refptr<DataPipeProducerDispatcher> dispatcher)
58      : dispatcher_(dispatcher) {}
59
60 private:
61  ~PortObserverThunk() override {}
62
63  // NodeController::PortObserver:
64  void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
65
66  scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
67
68  DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
69};
70
71DataPipeProducerDispatcher::DataPipeProducerDispatcher(
72    NodeController* node_controller,
73    const ports::PortRef& control_port,
74    scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75    const MojoCreateDataPipeOptions& options,
76    bool initialized,
77    uint64_t pipe_id)
78    : options_(options),
79      node_controller_(node_controller),
80      control_port_(control_port),
81      pipe_id_(pipe_id),
82      shared_ring_buffer_(shared_ring_buffer),
83      available_capacity_(options_.capacity_num_bytes) {
84  if (initialized) {
85    base::AutoLock lock(lock_);
86    InitializeNoLock();
87  }
88}
89
90Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91  return Type::DATA_PIPE_PRODUCER;
92}
93
94MojoResult DataPipeProducerDispatcher::Close() {
95  base::AutoLock lock(lock_);
96  DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97  return CloseNoLock();
98}
99
100MojoResult DataPipeProducerDispatcher::Watch(
101    MojoHandleSignals signals,
102    const Watcher::WatchCallback& callback,
103    uintptr_t context) {
104  base::AutoLock lock(lock_);
105
106  if (is_closed_ || in_transit_)
107    return MOJO_RESULT_INVALID_ARGUMENT;
108
109  return awakable_list_.AddWatcher(
110      signals, callback, context, GetHandleSignalsStateNoLock());
111}
112
113MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114  base::AutoLock lock(lock_);
115
116  if (is_closed_ || in_transit_)
117    return MOJO_RESULT_INVALID_ARGUMENT;
118
119  return awakable_list_.RemoveWatcher(context);
120}
121
122MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123                                                 uint32_t* num_bytes,
124                                                 MojoWriteDataFlags flags) {
125  base::AutoLock lock(lock_);
126  if (!shared_ring_buffer_ || in_transit_)
127    return MOJO_RESULT_INVALID_ARGUMENT;
128
129  if (in_two_phase_write_)
130    return MOJO_RESULT_BUSY;
131
132  if (peer_closed_)
133    return MOJO_RESULT_FAILED_PRECONDITION;
134
135  if (*num_bytes % options_.element_num_bytes != 0)
136    return MOJO_RESULT_INVALID_ARGUMENT;
137  if (*num_bytes == 0)
138    return MOJO_RESULT_OK;  // Nothing to do.
139
140  bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
141  uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
142  if (min_num_bytes_to_write > options_.capacity_num_bytes) {
143    // Don't return "should wait" since you can't wait for a specified amount of
144    // data.
145    return MOJO_RESULT_OUT_OF_RANGE;
146  }
147
148  DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
149  uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
150  if (num_bytes_to_write == 0)
151    return MOJO_RESULT_SHOULD_WAIT;
152
153  HandleSignalsState old_state = GetHandleSignalsStateNoLock();
154
155  *num_bytes = num_bytes_to_write;
156
157  CHECK(ring_buffer_mapping_);
158  uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
159  CHECK(data);
160
161  const uint8_t* source = static_cast<const uint8_t*>(elements);
162  CHECK(source);
163
164  DCHECK_LE(write_offset_, options_.capacity_num_bytes);
165  uint32_t tail_bytes_to_write =
166      std::min(options_.capacity_num_bytes - write_offset_,
167               num_bytes_to_write);
168  uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
169
170  DCHECK_GT(tail_bytes_to_write, 0u);
171  memcpy(data + write_offset_, source, tail_bytes_to_write);
172  if (head_bytes_to_write > 0)
173    memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
174
175  DCHECK_LE(num_bytes_to_write, available_capacity_);
176  available_capacity_ -= num_bytes_to_write;
177  write_offset_ = (write_offset_ + num_bytes_to_write) %
178      options_.capacity_num_bytes;
179
180  HandleSignalsState new_state = GetHandleSignalsStateNoLock();
181  if (!new_state.equals(old_state))
182    awakable_list_.AwakeForStateChange(new_state);
183
184  base::AutoUnlock unlock(lock_);
185  NotifyWrite(num_bytes_to_write);
186
187  return MOJO_RESULT_OK;
188}
189
190MojoResult DataPipeProducerDispatcher::BeginWriteData(
191    void** buffer,
192    uint32_t* buffer_num_bytes,
193    MojoWriteDataFlags flags) {
194  base::AutoLock lock(lock_);
195  if (!shared_ring_buffer_ || in_transit_)
196    return MOJO_RESULT_INVALID_ARGUMENT;
197  if (in_two_phase_write_)
198    return MOJO_RESULT_BUSY;
199  if (peer_closed_)
200    return MOJO_RESULT_FAILED_PRECONDITION;
201
202  if (available_capacity_ == 0) {
203    return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
204                        : MOJO_RESULT_SHOULD_WAIT;
205  }
206
207  in_two_phase_write_ = true;
208  *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
209                               available_capacity_);
210  DCHECK_GT(*buffer_num_bytes, 0u);
211
212  CHECK(ring_buffer_mapping_);
213  uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
214  *buffer = data + write_offset_;
215
216  return MOJO_RESULT_OK;
217}
218
219MojoResult DataPipeProducerDispatcher::EndWriteData(
220    uint32_t num_bytes_written) {
221  base::AutoLock lock(lock_);
222  if (is_closed_ || in_transit_)
223    return MOJO_RESULT_INVALID_ARGUMENT;
224
225  if (!in_two_phase_write_)
226    return MOJO_RESULT_FAILED_PRECONDITION;
227
228  DCHECK(shared_ring_buffer_);
229  DCHECK(ring_buffer_mapping_);
230
231  // Note: Allow successful completion of the two-phase write even if the other
232  // side has been closed.
233  MojoResult rv = MOJO_RESULT_OK;
234  if (num_bytes_written > available_capacity_ ||
235      num_bytes_written % options_.element_num_bytes != 0 ||
236      write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
237    rv = MOJO_RESULT_INVALID_ARGUMENT;
238  } else {
239    DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
240    available_capacity_ -= num_bytes_written;
241    write_offset_ = (write_offset_ + num_bytes_written) %
242        options_.capacity_num_bytes;
243
244    base::AutoUnlock unlock(lock_);
245    NotifyWrite(num_bytes_written);
246  }
247
248  in_two_phase_write_ = false;
249
250  // If we're now writable, we *became* writable (since we weren't writable
251  // during the two-phase write), so awake producer awakables.
252  HandleSignalsState new_state = GetHandleSignalsStateNoLock();
253  if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
254    awakable_list_.AwakeForStateChange(new_state);
255
256  return rv;
257}
258
259HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
260  base::AutoLock lock(lock_);
261  return GetHandleSignalsStateNoLock();
262}
263
264MojoResult DataPipeProducerDispatcher::AddAwakable(
265    Awakable* awakable,
266    MojoHandleSignals signals,
267    uintptr_t context,
268    HandleSignalsState* signals_state) {
269  base::AutoLock lock(lock_);
270  if (!shared_ring_buffer_ || in_transit_) {
271    if (signals_state)
272      *signals_state = HandleSignalsState();
273    return MOJO_RESULT_INVALID_ARGUMENT;
274  }
275  UpdateSignalsStateNoLock();
276  HandleSignalsState state = GetHandleSignalsStateNoLock();
277  if (state.satisfies(signals)) {
278    if (signals_state)
279      *signals_state = state;
280    return MOJO_RESULT_ALREADY_EXISTS;
281  }
282  if (!state.can_satisfy(signals)) {
283    if (signals_state)
284      *signals_state = state;
285    return MOJO_RESULT_FAILED_PRECONDITION;
286  }
287
288  awakable_list_.Add(awakable, signals, context);
289  return MOJO_RESULT_OK;
290}
291
292void DataPipeProducerDispatcher::RemoveAwakable(
293    Awakable* awakable,
294    HandleSignalsState* signals_state) {
295  base::AutoLock lock(lock_);
296  if ((!shared_ring_buffer_ || in_transit_) && signals_state)
297    *signals_state = HandleSignalsState();
298  else if (signals_state)
299    *signals_state = GetHandleSignalsStateNoLock();
300  awakable_list_.Remove(awakable);
301}
302
303void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
304                                                uint32_t* num_ports,
305                                                uint32_t* num_handles) {
306  base::AutoLock lock(lock_);
307  DCHECK(in_transit_);
308  *num_bytes = sizeof(SerializedState);
309  *num_ports = 1;
310  *num_handles = 1;
311}
312
313bool DataPipeProducerDispatcher::EndSerialize(
314    void* destination,
315    ports::PortName* ports,
316    PlatformHandle* platform_handles) {
317  SerializedState* state = static_cast<SerializedState*>(destination);
318  memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
319  memset(state->padding, 0, sizeof(state->padding));
320
321  base::AutoLock lock(lock_);
322  DCHECK(in_transit_);
323  state->pipe_id = pipe_id_;
324  state->write_offset = write_offset_;
325  state->available_capacity = available_capacity_;
326  state->flags = peer_closed_ ? kFlagPeerClosed : 0;
327
328  ports[0] = control_port_.name();
329
330  buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
331  platform_handles[0] = buffer_handle_for_transit_.get();
332
333  return true;
334}
335
336bool DataPipeProducerDispatcher::BeginTransit() {
337  base::AutoLock lock(lock_);
338  if (in_transit_)
339    return false;
340  in_transit_ = !in_two_phase_write_;
341  return in_transit_;
342}
343
344void DataPipeProducerDispatcher::CompleteTransitAndClose() {
345  node_controller_->SetPortObserver(control_port_, nullptr);
346
347  base::AutoLock lock(lock_);
348  DCHECK(in_transit_);
349  transferred_ = true;
350  in_transit_ = false;
351  ignore_result(buffer_handle_for_transit_.release());
352  CloseNoLock();
353}
354
355void DataPipeProducerDispatcher::CancelTransit() {
356  base::AutoLock lock(lock_);
357  DCHECK(in_transit_);
358  in_transit_ = false;
359  buffer_handle_for_transit_.reset();
360  awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
361}
362
363// static
364scoped_refptr<DataPipeProducerDispatcher>
365DataPipeProducerDispatcher::Deserialize(const void* data,
366                                        size_t num_bytes,
367                                        const ports::PortName* ports,
368                                        size_t num_ports,
369                                        PlatformHandle* handles,
370                                        size_t num_handles) {
371  if (num_ports != 1 || num_handles != 1 ||
372      num_bytes != sizeof(SerializedState)) {
373    return nullptr;
374  }
375
376  const SerializedState* state = static_cast<const SerializedState*>(data);
377
378  NodeController* node_controller = internal::g_core->GetNodeController();
379  ports::PortRef port;
380  if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
381    return nullptr;
382
383  PlatformHandle buffer_handle;
384  std::swap(buffer_handle, handles[0]);
385  scoped_refptr<PlatformSharedBuffer> ring_buffer =
386      PlatformSharedBuffer::CreateFromPlatformHandle(
387          state->options.capacity_num_bytes,
388          false /* read_only */,
389          ScopedPlatformHandle(buffer_handle));
390  if (!ring_buffer) {
391    DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
392    return nullptr;
393  }
394
395  scoped_refptr<DataPipeProducerDispatcher> dispatcher =
396      new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
397                                     state->options, false /* initialized */,
398                                     state->pipe_id);
399
400  {
401    base::AutoLock lock(dispatcher->lock_);
402    dispatcher->write_offset_ = state->write_offset;
403    dispatcher->available_capacity_ = state->available_capacity;
404    dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
405    dispatcher->InitializeNoLock();
406  }
407
408  return dispatcher;
409}
410
411DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
412  DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
413         !ring_buffer_mapping_);
414}
415
416void DataPipeProducerDispatcher::InitializeNoLock() {
417  lock_.AssertAcquired();
418
419  if (shared_ring_buffer_) {
420    ring_buffer_mapping_ =
421        shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
422    if (!ring_buffer_mapping_) {
423      DLOG(ERROR) << "Failed to map shared buffer.";
424      shared_ring_buffer_ = nullptr;
425    }
426  }
427
428  base::AutoUnlock unlock(lock_);
429  node_controller_->SetPortObserver(
430      control_port_,
431      make_scoped_refptr(new PortObserverThunk(this)));
432}
433
434MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435  lock_.AssertAcquired();
436  if (is_closed_ || in_transit_)
437    return MOJO_RESULT_INVALID_ARGUMENT;
438  is_closed_ = true;
439  ring_buffer_mapping_.reset();
440  shared_ring_buffer_ = nullptr;
441
442  awakable_list_.CancelAll();
443  if (!transferred_) {
444    base::AutoUnlock unlock(lock_);
445    node_controller_->ClosePort(control_port_);
446  }
447
448  return MOJO_RESULT_OK;
449}
450
451HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452    const {
453  lock_.AssertAcquired();
454  HandleSignalsState rv;
455  if (!peer_closed_) {
456    if (!in_two_phase_write_ && shared_ring_buffer_ &&
457        available_capacity_ > 0)
458      rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
459    rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
460  } else {
461    rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
462  }
463  rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
464  return rv;
465}
466
467void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
468  DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
469           << num_bytes << " bytes written. [control_port="
470           << control_port_.name() << "]";
471
472  SendDataPipeControlMessage(node_controller_, control_port_,
473                             DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
474}
475
476void DataPipeProducerDispatcher::OnPortStatusChanged() {
477  DCHECK(RequestContext::current());
478
479  base::AutoLock lock(lock_);
480
481  // We stop observing the control port as soon it's transferred, but this can
482  // race with events which are raised right before that happens. This is fine
483  // to ignore.
484  if (transferred_)
485    return;
486
487  DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
488
489  UpdateSignalsStateNoLock();
490}
491
492void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
493  lock_.AssertAcquired();
494
495  bool was_peer_closed = peer_closed_;
496  size_t previous_capacity = available_capacity_;
497
498  ports::PortStatus port_status;
499  int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
500  if (rv != ports::OK || !port_status.receiving_messages) {
501    DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
502             << " [control_port=" << control_port_.name() << "]";
503    peer_closed_ = true;
504  } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
505    ports::ScopedMessage message;
506    do {
507      int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
508                                                      &message);
509      if (rv != ports::OK)
510        peer_closed_ = true;
511      if (message) {
512        if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
513          peer_closed_ = true;
514          break;
515        }
516
517        const DataPipeControlMessage* m =
518            static_cast<const DataPipeControlMessage*>(
519                message->payload_bytes());
520
521        if (m->command != DataPipeCommand::DATA_WAS_READ) {
522          DLOG(ERROR) << "Unexpected message from consumer.";
523          peer_closed_ = true;
524          break;
525        }
526
527        if (static_cast<size_t>(available_capacity_) + m->num_bytes >
528              options_.capacity_num_bytes) {
529          DLOG(ERROR) << "Consumer claims to have read too many bytes.";
530          break;
531        }
532
533        DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
534                 << m->num_bytes << " bytes were read. [control_port="
535                 << control_port_.name() << "]";
536
537        available_capacity_ += m->num_bytes;
538      }
539    } while (message);
540  }
541
542  if (peer_closed_ != was_peer_closed ||
543      available_capacity_ != previous_capacity) {
544    awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
545  }
546}
547
548}  // namespace edk
549}  // namespace mojo
550