raw_channel.cc revision effb81e5f8246d0db0270817048dc992db66e9fb
1a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Copyright 2014 The Chromium Authors. All rights reserved.
2a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
3a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// found in the LICENSE file.
4a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
5a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/raw_channel.h"
6a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
7a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <string.h>
8a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
9a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <algorithm>
10a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
11a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/bind.h"
12a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/location.h"
13a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/logging.h"
14a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/message_loop/message_loop.h"
15a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/stl_util.h"
16a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/message_in_transit.h"
17a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
18a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace mojo {
19a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace system {
20a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
21a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)const size_t kReadSize = 4096;
22a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
23a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
24a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
25a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
26a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::~ReadBuffer() {}
27a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
28a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
29a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
30a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  *addr = &buffer_[0] + num_valid_bytes_;
31a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  *size = kReadSize;
32a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
33a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
34a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
35a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
36a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::~WriteBuffer() {
37a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  STLDeleteElements(&message_queue_);
38a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
39a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
40a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
41a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->clear();
42a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
43a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  size_t bytes_to_write = GetTotalBytesToWrite();
44a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (bytes_to_write == 0)
45a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
46a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
47a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  MessageInTransit* message = message_queue_.front();
48a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!message->secondary_buffer_size()) {
49a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Only write from the main buffer.
50a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LT(offset_, message->main_buffer_size());
51a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LE(bytes_to_write, message->main_buffer_size());
52a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    Buffer buffer = {
53a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        static_cast<const char*>(message->main_buffer()) + offset_,
54a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        bytes_to_write};
55a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    buffers->push_back(buffer);
56a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
57a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
58a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
59a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (offset_ >= message->main_buffer_size()) {
60a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Only write from the secondary buffer.
61a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LT(offset_ - message->main_buffer_size(),
62a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)              message->secondary_buffer_size());
63a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
64a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    Buffer buffer = {
65a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        static_cast<const char*>(message->secondary_buffer()) +
66a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)            (offset_ - message->main_buffer_size()),
67a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        bytes_to_write};
68a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    buffers->push_back(buffer);
69a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
70a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
71a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
72a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // Write from both buffers.
73a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
74a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                                message->secondary_buffer_size());
75a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  Buffer buffer1 = {
76a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      static_cast<const char*>(message->main_buffer()) + offset_,
77a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      message->main_buffer_size() - offset_};
78a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->push_back(buffer1);
79a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  Buffer buffer2 = {
80a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      static_cast<const char*>(message->secondary_buffer()),
81a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      message->secondary_buffer_size()};
82a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->push_back(buffer2);
83a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
84a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
85a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
86a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (message_queue_.empty())
87a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return 0;
88a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
89a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  MessageInTransit* message = message_queue_.front();
90a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_LT(offset_, message->total_size());
91a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return message->total_size() - offset_;
92a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
93a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
94a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::RawChannel(Delegate* delegate,
95a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                       base::MessageLoopForIO* message_loop_for_io)
96a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    : delegate_(delegate),
97a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      message_loop_for_io_(message_loop_for_io),
98a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_stopped_(false),
99a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_stopped_(false),
100a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      weak_ptr_factory_(this) {
101a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
102a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
103a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::~RawChannel() {
104a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!read_buffer_);
105a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_);
106a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
107a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // No need to take the |write_lock_| here -- if there are still weak pointers
108a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // outstanding, then we're hosed anyway (since we wouldn't be able to
109a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // invalidate them cleanly, since we might not be on the I/O thread).
110a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
111a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
112a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
113a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::Init() {
114a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
115a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
116a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // No need to take the lock. No one should be using us yet.
117a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!read_buffer_);
118a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  read_buffer_.reset(new ReadBuffer);
119a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_);
120a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_.reset(new WriteBuffer);
121a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
122a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!OnInit())
123a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return false;
124a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
125a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return ScheduleRead() == IO_PENDING;
126a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
127a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
128a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::Shutdown() {
129a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
130a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
131a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  base::AutoLock locker(write_lock_);
132a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
133effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
134effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch      << "Shutting down RawChannel with write buffer nonempty";
135effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch
136a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  weak_ptr_factory_.InvalidateWeakPtrs();
137a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
138a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  read_stopped_ = true;
139a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_stopped_ = true;
140a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
141a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
142a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
143a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
144a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Reminder: This must be thread-safe.
145a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
146a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  base::AutoLock locker(write_lock_);
147a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (write_stopped_)
148a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return false;
149a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
150a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!write_buffer_->message_queue_.empty()) {
151a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    write_buffer_->message_queue_.push_back(message.release());
152a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return true;
153a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
154a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
155a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_->message_queue_.push_front(message.release());
156a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(write_buffer_->offset_, 0u);
157a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
158a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  size_t bytes_written = 0;
159a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  IOResult io_result = WriteNoLock(&bytes_written);
160a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (io_result == IO_PENDING)
161a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return true;
162a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
163a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
164a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                                       bytes_written);
165a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!result) {
166a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
167a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // nested context.
168a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    message_loop_for_io_->PostTask(
169a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        FROM_HERE,
170a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        base::Bind(&RawChannel::CallOnFatalError,
171a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                   weak_ptr_factory_.GetWeakPtr(),
172a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                   Delegate::FATAL_ERROR_FAILED_WRITE));
173a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
174a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
175a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return result;
176a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
177a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
178effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch// Reminder: This must be thread-safe.
179effb81e5f8246d0db0270817048dc992db66e9fbBen Murdochbool RawChannel::IsWriteBufferEmpty() {
180effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  base::AutoLock locker(write_lock_);
181effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  return write_buffer_->message_queue_.empty();
182effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch}
183effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch
184a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer* RawChannel::read_buffer() {
185a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
186a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return read_buffer_.get();
187a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
188a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
189a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
190a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_lock_.AssertAcquired();
191a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return write_buffer_.get();
192a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
193a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
194a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
195a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
196a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
197a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (read_stopped_) {
198a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    NOTREACHED();
199a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
200a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
201a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
202a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
203a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
204a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // Keep reading data in a loop, and dispatch messages if enough data is
205a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // received. Exit the loop if any of the following happens:
206a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - one or more messages were dispatched;
207a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - the last read failed, was a partial read or would block;
208a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - |Shutdown()| was called.
209a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  do {
210a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (io_result != IO_SUCCEEDED) {
211a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_stopped_ = true;
212a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
213a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return;
214a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
215a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
216a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    read_buffer_->num_valid_bytes_ += bytes_read;
217a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
218a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Dispatch all the messages that we can.
219a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool did_dispatch_message = false;
220a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Tracks the offset of the first undispatched message in |read_buffer_|.
221a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Currently, we copy data to ensure that this is zero at the beginning.
222a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t read_buffer_start = 0;
223a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
224a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t message_size;
225a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Note that we rely on short-circuit evaluation here:
226a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //   - |read_buffer_start| may be an invalid index into
227a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
228a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
229a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
230a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // next read).
231a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Validate that |message_size| is sane.
232a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    while (remaining_bytes > 0 &&
233a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)           MessageInTransit::GetNextMessageSize(
234a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
235a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)               &message_size) &&
236a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)           remaining_bytes >= message_size) {
237a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      MessageInTransit::View
238a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
239a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      DCHECK_EQ(message_view.total_size(), message_size);
240a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
241a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Dispatch the message.
242a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      delegate_->OnReadMessage(message_view);
243a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      if (read_stopped_) {
244a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        // |Shutdown()| was called in |OnReadMessage()|.
245a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        // TODO(vtl): Add test for this case.
246a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        return;
247a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      }
248a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      did_dispatch_message = true;
249a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
250a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Update our state.
251a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_start += message_size;
252a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      remaining_bytes -= message_size;
253a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
254a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
255a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (read_buffer_start > 0) {
256a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Move data back to start.
257a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_->num_valid_bytes_ = remaining_bytes;
258a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      if (read_buffer_->num_valid_bytes_ > 0) {
259a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        memmove(&read_buffer_->buffer_[0],
260a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
261a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      }
262a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_start = 0;
263a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
264a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
265a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
266a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)            kReadSize) {
267a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Use power-of-2 buffer sizes.
268a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
269a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // maximum message size to whatever extent necessary).
270a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): We may often be able to peek at the header and get the real
271a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // required extra space (which may be much bigger than |kReadSize|).
272a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
273a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
274a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        new_size *= 2;
275a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
276a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): It's suboptimal to zero out the fresh memory.
277a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_->buffer_.resize(new_size, 0);
278a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
279a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
280a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // (1) If we dispatched any messages, stop reading for now (and let the
281a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // message loop do its thing for another round).
282a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
283a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // a single message. Risks: slower, more complex if we want to avoid lots of
284a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // copying. ii. Keep reading until there's no more data and dispatch all the
285a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // messages we can. Risks: starvation of other users of the message loop.)
286a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // (2) If we didn't max out |kReadSize|, stop reading for now.
287a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
288a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bytes_read = 0;
289a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
290a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  } while (io_result != IO_PENDING);
291a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
292a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
293a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
294a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
295a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
296a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  bool did_fail = false;
297a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  {
298a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    base::AutoLock locker(write_lock_);
299a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
300a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
301a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (write_stopped_) {
302a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      NOTREACHED();
303a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return;
304a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
305a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
306a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
307a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
308a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
309a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (did_fail)
310a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
311a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
312a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
313a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
314a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
315a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
316a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  delegate_->OnFatalError(fatal_error);
317a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
318a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
319a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
320a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_lock_.AssertAcquired();
321a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
322a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_stopped_);
323a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_->message_queue_.empty());
324a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
325a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (result) {
326a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
327a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Partial (or no) write.
328a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->offset_ += bytes_written;
329a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    } else {
330a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Complete write.
331a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
332a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      delete write_buffer_->message_queue_.front();
333a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->message_queue_.pop_front();
334a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->offset_ = 0;
335a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
336a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
337a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (write_buffer_->message_queue_.empty())
338a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return true;
339a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
340a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Schedule the next write.
341a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (ScheduleWriteNoLock() == IO_PENDING)
342a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return true;
343a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
344a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
345a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_stopped_ = true;
346a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  STLDeleteElements(&write_buffer_->message_queue_);
347a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_->offset_ = 0;
348a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return false;
349a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
350a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
351a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}  // namespace system
352a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}  // namespace mojo
353