raw_channel.cc revision c5cede9ae108bb15f6b7a8aea21c7e1fefa2834c
1a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Copyright 2014 The Chromium Authors. All rights reserved.
2a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
3a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// found in the LICENSE file.
4a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
5a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/raw_channel.h"
6a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
7a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <string.h>
8a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
9a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <algorithm>
10a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
11a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/bind.h"
12a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/location.h"
13a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/logging.h"
14a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/message_loop/message_loop.h"
15a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/stl_util.h"
16a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/message_in_transit.h"
17a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
18a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace mojo {
19a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace system {
20a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
// Number of bytes offered to each read: it is the initial size of a
// |ReadBuffer|'s storage and the size reported by |ReadBuffer::GetBuffer()|.
const size_t kReadSize = 4096;
22a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
23a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
24a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
25a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
26a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::~ReadBuffer() {}
27a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
28a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
29a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
30a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  *addr = &buffer_[0] + num_valid_bytes_;
31a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  *size = kReadSize;
32a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
33a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
34a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
35a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
36a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::~WriteBuffer() {
37a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  STLDeleteElements(&message_queue_);
38a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
39a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
40a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
41a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->clear();
42a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
43a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  size_t bytes_to_write = GetTotalBytesToWrite();
44a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (bytes_to_write == 0)
45a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
46a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
47a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  MessageInTransit* message = message_queue_.front();
48a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!message->secondary_buffer_size()) {
49a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Only write from the main buffer.
50a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LT(offset_, message->main_buffer_size());
51a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LE(bytes_to_write, message->main_buffer_size());
52a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    Buffer buffer = {
53a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        static_cast<const char*>(message->main_buffer()) + offset_,
54a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        bytes_to_write};
55a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    buffers->push_back(buffer);
56a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
57a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
58a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
59a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (offset_ >= message->main_buffer_size()) {
60a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Only write from the secondary buffer.
61a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LT(offset_ - message->main_buffer_size(),
62a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)              message->secondary_buffer_size());
63a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
64a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    Buffer buffer = {
65a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        static_cast<const char*>(message->secondary_buffer()) +
66a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)            (offset_ - message->main_buffer_size()),
67a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        bytes_to_write};
68a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    buffers->push_back(buffer);
69a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
70a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
71a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
72a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // Write from both buffers.
73a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
74a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                                message->secondary_buffer_size());
75a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  Buffer buffer1 = {
76a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      static_cast<const char*>(message->main_buffer()) + offset_,
77a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      message->main_buffer_size() - offset_};
78a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->push_back(buffer1);
79a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  Buffer buffer2 = {
80a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      static_cast<const char*>(message->secondary_buffer()),
81a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      message->secondary_buffer_size()};
82a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  buffers->push_back(buffer2);
83a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
84a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
85a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
86a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (message_queue_.empty())
87a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return 0;
88a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
89a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  MessageInTransit* message = message_queue_.front();
90a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_LT(offset_, message->total_size());
91a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return message->total_size() - offset_;
92a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
93a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
94c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen MurdochRawChannel::RawChannel()
95c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch    : delegate_(NULL),
96c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch      message_loop_for_io_(NULL),
97a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_stopped_(false),
98a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_stopped_(false),
99a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      weak_ptr_factory_(this) {
100a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
101a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
102a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::~RawChannel() {
103a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!read_buffer_);
104a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_);
105a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
106a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // No need to take the |write_lock_| here -- if there are still weak pointers
107a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // outstanding, then we're hosed anyway (since we wouldn't be able to
108a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // invalidate them cleanly, since we might not be on the I/O thread).
109a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
110a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
111a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
112c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdochbool RawChannel::Init(Delegate* delegate) {
113c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  DCHECK(delegate);
114c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch
115c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  DCHECK(!delegate_);
116c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  delegate_ = delegate;
117c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch
118c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
119c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  DCHECK(!message_loop_for_io_);
120c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  message_loop_for_io_ =
121c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch      static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
122a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
123a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // No need to take the lock. No one should be using us yet.
124a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!read_buffer_);
125a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  read_buffer_.reset(new ReadBuffer);
126a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_);
127a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_.reset(new WriteBuffer);
128a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
129a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!OnInit())
130a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return false;
131a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
132a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return ScheduleRead() == IO_PENDING;
133a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
134a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
135a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::Shutdown() {
136a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
137a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
138a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  base::AutoLock locker(write_lock_);
139a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
140effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
141effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch      << "Shutting down RawChannel with write buffer nonempty";
142effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch
143a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  weak_ptr_factory_.InvalidateWeakPtrs();
144a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
145a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  read_stopped_ = true;
146a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_stopped_ = true;
147a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
148a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
149a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
150a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
151a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Reminder: This must be thread-safe.
152a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
153c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  DCHECK(message);
154c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch
155c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  // TODO(vtl)
156c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  if (message->has_platform_handles()) {
157c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch    NOTIMPLEMENTED();
158c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch    return false;
159c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch  }
160c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch
161a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  base::AutoLock locker(write_lock_);
162a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (write_stopped_)
163a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return false;
164a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
165a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!write_buffer_->message_queue_.empty()) {
166a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    write_buffer_->message_queue_.push_back(message.release());
167a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return true;
168a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
169a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
170a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_->message_queue_.push_front(message.release());
171a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(write_buffer_->offset_, 0u);
172a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
173a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  size_t bytes_written = 0;
174a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  IOResult io_result = WriteNoLock(&bytes_written);
175a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (io_result == IO_PENDING)
176a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return true;
177a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
178a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
179a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                                       bytes_written);
180a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (!result) {
181a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
182a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // nested context.
183a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    message_loop_for_io_->PostTask(
184a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        FROM_HERE,
185a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        base::Bind(&RawChannel::CallOnFatalError,
186a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                   weak_ptr_factory_.GetWeakPtr(),
187a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                   Delegate::FATAL_ERROR_FAILED_WRITE));
188a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
189a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
190a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return result;
191a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
192a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
193effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch// Reminder: This must be thread-safe.
194effb81e5f8246d0db0270817048dc992db66e9fbBen Murdochbool RawChannel::IsWriteBufferEmpty() {
195effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  base::AutoLock locker(write_lock_);
196effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch  return write_buffer_->message_queue_.empty();
197effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch}
198effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch
199a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer* RawChannel::read_buffer() {
200a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
201a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return read_buffer_.get();
202a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
203a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
204a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
205a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_lock_.AssertAcquired();
206a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return write_buffer_.get();
207a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
208a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
209a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
210a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
211a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
212a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (read_stopped_) {
213a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    NOTREACHED();
214a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return;
215a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
216a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
217a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
218a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
219a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // Keep reading data in a loop, and dispatch messages if enough data is
220a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // received. Exit the loop if any of the following happens:
221a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - one or more messages were dispatched;
222a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - the last read failed, was a partial read or would block;
223a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  //   - |Shutdown()| was called.
224a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  do {
225a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (io_result != IO_SUCCEEDED) {
226a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_stopped_ = true;
227a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
228a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return;
229a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
230a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
231a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    read_buffer_->num_valid_bytes_ += bytes_read;
232a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
233a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Dispatch all the messages that we can.
234a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool did_dispatch_message = false;
235a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Tracks the offset of the first undispatched message in |read_buffer_|.
236a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Currently, we copy data to ensure that this is zero at the beginning.
237a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t read_buffer_start = 0;
238a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
239a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t message_size;
240a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Note that we rely on short-circuit evaluation here:
241a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //   - |read_buffer_start| may be an invalid index into
242a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
243a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
244a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
245a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // next read).
246a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Validate that |message_size| is sane.
247a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    while (remaining_bytes > 0 &&
248a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)           MessageInTransit::GetNextMessageSize(
249a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
250a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)               &message_size) &&
251a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)           remaining_bytes >= message_size) {
252a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      MessageInTransit::View
253a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
254a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      DCHECK_EQ(message_view.total_size(), message_size);
255a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
256a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Dispatch the message.
257a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      delegate_->OnReadMessage(message_view);
258a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      if (read_stopped_) {
259a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        // |Shutdown()| was called in |OnReadMessage()|.
260a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        // TODO(vtl): Add test for this case.
261a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        return;
262a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      }
263a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      did_dispatch_message = true;
264a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
265a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Update our state.
266a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_start += message_size;
267a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      remaining_bytes -= message_size;
268a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
269a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
270a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (read_buffer_start > 0) {
271a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Move data back to start.
272a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_->num_valid_bytes_ = remaining_bytes;
273a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      if (read_buffer_->num_valid_bytes_ > 0) {
274a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        memmove(&read_buffer_->buffer_[0],
275a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
276a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      }
277a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_start = 0;
278a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
279a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
280a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
281a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)            kReadSize) {
282a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Use power-of-2 buffer sizes.
283a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
284a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // maximum message size to whatever extent necessary).
285a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): We may often be able to peek at the header and get the real
286a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // required extra space (which may be much bigger than |kReadSize|).
287a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
288a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
289a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)        new_size *= 2;
290a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
291a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // TODO(vtl): It's suboptimal to zero out the fresh memory.
292a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_->buffer_.resize(new_size, 0);
293a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
294a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
295a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // (1) If we dispatched any messages, stop reading for now (and let the
296a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // message loop do its thing for another round).
297a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
298a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // a single message. Risks: slower, more complex if we want to avoid lots of
299a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // copying. ii. Keep reading until there's no more data and dispatch all the
300a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // messages we can. Risks: starvation of other users of the message loop.)
301a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // (2) If we didn't max out |kReadSize|, stop reading for now.
302a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
303a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bytes_read = 0;
304a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
305a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  } while (io_result != IO_PENDING);
306a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
307a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
308a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
309a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
310a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
311a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  bool did_fail = false;
312a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  {
313a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    base::AutoLock locker(write_lock_);
314a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
315a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
316a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (write_stopped_) {
317a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      NOTREACHED();
318a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return;
319a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
320a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
321a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
322a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
323a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
324a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (did_fail)
325a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
326a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
327a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
328a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
329a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
330a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
331a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  delegate_->OnFatalError(fatal_error);
332a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
333a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
334a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
335a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_lock_.AssertAcquired();
336a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
337a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_stopped_);
338a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK(!write_buffer_->message_queue_.empty());
339a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
340a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (result) {
341a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
342a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Partial (or no) write.
343a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->offset_ += bytes_written;
344a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    } else {
345a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Complete write.
346a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
347a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      delete write_buffer_->message_queue_.front();
348a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->message_queue_.pop_front();
349a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      write_buffer_->offset_ = 0;
350a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    }
351a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
352a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (write_buffer_->message_queue_.empty())
353a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return true;
354a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
355a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // Schedule the next write.
356a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (ScheduleWriteNoLock() == IO_PENDING)
357a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      return true;
358a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  }
359a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
360a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_stopped_ = true;
361a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  STLDeleteElements(&write_buffer_->message_queue_);
362a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_->offset_ = 0;
363a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return false;
364a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
365a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
366a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}  // namespace system
367a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}  // namespace mojo
368