raw_channel.cc revision effb81e5f8246d0db0270817048dc992db66e9fb
1c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)// Copyright 2014 The Chromium Authors. All rights reserved.
2c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
3c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)// found in the LICENSE file.
4c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
5c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "mojo/system/raw_channel.h"
6c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
70529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch#include <string.h>
8a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
95d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)#include <algorithm>
10c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
11a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)#include "base/bind.h"
12c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "base/location.h"
13c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "base/logging.h"
14c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "base/message_loop/message_loop.h"
15c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "base/stl_util.h"
16c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)#include "mojo/system/message_in_transit.h"
17c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
185d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)namespace mojo {
191320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tuccinamespace system {
20bb1529ce867d8845a77ec7cdf3e3003ef1771a40Ben Murdoch
// Number of bytes requested from each read. It is also the initial size of a
// |ReadBuffer| and the amount of free space kept available past the valid data.
const size_t kReadSize = 4 * 1024;
22c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
237d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
24c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)}
25c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
26c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)RawChannel::ReadBuffer::~ReadBuffer() {}
27c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
28c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
29c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
30c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  *addr = &buffer_[0] + num_valid_bytes_;
31c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  *size = kReadSize;
32c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)}
33c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
34c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
35c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
36c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)RawChannel::WriteBuffer::~WriteBuffer() {
37c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  STLDeleteElements(&message_queue_);
38c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)}
390529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch
40f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
41c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  buffers->clear();
4290dce4d38c5ff5333bea97d859d4e484e27edf0cTorne (Richard Coles)
43c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  size_t bytes_to_write = GetTotalBytesToWrite();
440529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  if (bytes_to_write == 0)
45f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)    return;
46c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
47c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  MessageInTransit* message = message_queue_.front();
4890dce4d38c5ff5333bea97d859d4e484e27edf0cTorne (Richard Coles)  if (!message->secondary_buffer_size()) {
49c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // Only write from the main buffer.
50c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    DCHECK_LT(offset_, message->main_buffer_size());
51c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    DCHECK_LE(bytes_to_write, message->main_buffer_size());
52c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    Buffer buffer = {
53c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        static_cast<const char*>(message->main_buffer()) + offset_,
54c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        bytes_to_write};
55c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    buffers->push_back(buffer);
56c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    return;
57868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  }
580529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch
59c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  if (offset_ >= message->main_buffer_size()) {
603551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    // Only write from the secondary buffer.
61c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    DCHECK_LT(offset_ - message->main_buffer_size(),
62c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)              message->secondary_buffer_size());
63c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
64c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    Buffer buffer = {
65c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        static_cast<const char*>(message->secondary_buffer()) +
66c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)            (offset_ - message->main_buffer_size()),
67c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        bytes_to_write};
68c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    buffers->push_back(buffer);
69c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    return;
7090dce4d38c5ff5333bea97d859d4e484e27edf0cTorne (Richard Coles)  }
71c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
72c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  // Write from both buffers.
73c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
74c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)                                message->secondary_buffer_size());
75c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  Buffer buffer1 = {
7690dce4d38c5ff5333bea97d859d4e484e27edf0cTorne (Richard Coles)      static_cast<const char*>(message->main_buffer()) + offset_,
77c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      message->main_buffer_size() - offset_};
78c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  buffers->push_back(buffer1);
79c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  Buffer buffer2 = {
80c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      static_cast<const char*>(message->secondary_buffer()),
810529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch      message->secondary_buffer_size()};
82c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  buffers->push_back(buffer2);
83c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)}
84c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
85c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
86c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  if (message_queue_.empty())
87c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    return 0;
88c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
89f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  MessageInTransit* message = message_queue_.front();
90f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK_LT(offset_, message->total_size());
91f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  return message->total_size() - offset_;
92f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)}
93f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
94f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)RawChannel::RawChannel(Delegate* delegate,
95f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)                       base::MessageLoopForIO* message_loop_for_io)
96c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    : delegate_(delegate),
97c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      message_loop_for_io_(message_loop_for_io),
98c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      read_stopped_(false),
99f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      write_stopped_(false),
100f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      weak_ptr_factory_(this) {
101f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)}
102f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
103f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)RawChannel::~RawChannel() {
104f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK(!read_buffer_);
105f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK(!write_buffer_);
106f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
107f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  // No need to take the |write_lock_| here -- if there are still weak pointers
108f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  // outstanding, then we're hosed anyway (since we wouldn't be able to
109f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  // invalidate them cleanly, since we might not be on the I/O thread).
110f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
111f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)}
112f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
113f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)bool RawChannel::Init() {
114f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
115f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
116c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  // No need to take the lock. No one should be using us yet.
117c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  DCHECK(!read_buffer_);
118f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  read_buffer_.reset(new ReadBuffer);
119f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  DCHECK(!write_buffer_);
120f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  write_buffer_.reset(new WriteBuffer);
121f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)
122f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  if (!OnInit())
123c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    return false;
124c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
125a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  return ScheduleRead() == IO_PENDING;
126a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)}
127c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
128a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)void RawChannel::Shutdown() {
129a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
130a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)
131a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  base::AutoLock locker(write_lock_);
132a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)
133a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
134a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)      << "Shutting down RawChannel with write buffer nonempty";
135a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)
136a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  weak_ptr_factory_.InvalidateWeakPtrs();
137a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)
138a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  read_stopped_ = true;
139a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_stopped_ = true;
140a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
141a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
1423551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)}
1433551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)
1443551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)// Reminder: This must be thread-safe.
1453551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
1463551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)  base::AutoLock locker(write_lock_);
147bb1529ce867d8845a77ec7cdf3e3003ef1771a40Ben Murdoch  if (write_stopped_)
148bb1529ce867d8845a77ec7cdf3e3003ef1771a40Ben Murdoch    return false;
149a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
1503551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)  if (!write_buffer_->message_queue_.empty()) {
1513551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    write_buffer_->message_queue_.push_back(message.release());
1523551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    return true;
1533551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)  }
1543551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)
155a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  write_buffer_->message_queue_.push_front(message.release());
156a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  DCHECK_EQ(write_buffer_->offset_, 0u);
157a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
158a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  size_t bytes_written = 0;
159a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  IOResult io_result = WriteNoLock(&bytes_written);
160a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  if (io_result == IO_PENDING)
161a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    return true;
162a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
163a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
164a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)                                       bytes_written);
165c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  if (!result) {
166c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
167c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // nested context.
168c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    message_loop_for_io_->PostTask(
169c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        FROM_HERE,
170c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        base::Bind(&RawChannel::CallOnFatalError,
171c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)                   weak_ptr_factory_.GetWeakPtr(),
172d0247b1b59f9c528cb6df88b4f2b9afaf80d181eTorne (Richard Coles)                   Delegate::FATAL_ERROR_FAILED_WRITE));
1731320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci  }
174cedac228d2dd51db4b79ea1e72c7f249408ee061Torne (Richard Coles)
1750529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  return result;
1760529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch}
1770529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch
1780529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch// Reminder: This must be thread-safe.
1790529e5d033099cbfc42635f6f6183833b09dff6eBen Murdochbool RawChannel::IsWriteBufferEmpty() {
180a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  base::AutoLock locker(write_lock_);
181a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  return write_buffer_->message_queue_.empty();
182cedac228d2dd51db4b79ea1e72c7f249408ee061Torne (Richard Coles)}
183cedac228d2dd51db4b79ea1e72c7f249408ee061Torne (Richard Coles)
184cedac228d2dd51db4b79ea1e72c7f249408ee061Torne (Richard Coles)RawChannel::ReadBuffer* RawChannel::read_buffer() {
1851320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
186f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  return read_buffer_.get();
1875d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)}
188c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
1890529e5d033099cbfc42635f6f6183833b09dff6eBen MurdochRawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
1900529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  write_lock_.AssertAcquired();
1915d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)  return write_buffer_.get();
192c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)}
193c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
1941320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tuccivoid RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
1951320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
1961320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci
1971320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci  if (read_stopped_) {
1981320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci    NOTREACHED();
199d0247b1b59f9c528cb6df88b4f2b9afaf80d181eTorne (Richard Coles)    return;
2000529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  }
201c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
202c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
203c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
204c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  // Keep reading data in a loop, and dispatch messages if enough data is
205c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  // received. Exit the loop if any of the following happens:
206c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  //   - one or more messages were dispatched;
207c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  //   - the last read failed, was a partial read or would block;
208c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  //   - |Shutdown()| was called.
209c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  do {
2105d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    if (io_result != IO_SUCCEEDED) {
2115d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)      read_stopped_ = true;
2125d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
2131320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci      return;
2147d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    }
215a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
216bb1529ce867d8845a77ec7cdf3e3003ef1771a40Ben Murdoch    read_buffer_->num_valid_bytes_ += bytes_read;
217a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)
218c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // Dispatch all the messages that we can.
219a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool did_dispatch_message = false;
2203551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    // Tracks the offset of the first undispatched message in |read_buffer_|.
221f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)    // Currently, we copy data to ensure that this is zero at the beginning.
22258537e28ecd584eab876aee8be7156509866d23aTorne (Richard Coles)    size_t read_buffer_start = 0;
22358537e28ecd584eab876aee8be7156509866d23aTorne (Richard Coles)    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
224a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    size_t message_size;
22558537e28ecd584eab876aee8be7156509866d23aTorne (Richard Coles)    // Note that we rely on short-circuit evaluation here:
2263551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    //   - |read_buffer_start| may be an invalid index into
227c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
228c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
229c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
2307d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    // next read).
231f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)    // TODO(vtl): Validate that |message_size| is sane.
232f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)    while (remaining_bytes > 0 &&
233c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)           MessageInTransit::GetNextMessageSize(
234c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
235c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)               &message_size) &&
236c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)           remaining_bytes >= message_size) {
237c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      MessageInTransit::View
238c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
239c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      DCHECK_EQ(message_view.total_size(), message_size);
240c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
241f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      // Dispatch the message.
242f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      delegate_->OnReadMessage(message_view);
243f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      if (read_stopped_) {
2445d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)        // |Shutdown()| was called in |OnReadMessage()|.
2455d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)        // TODO(vtl): Add test for this case.
2465d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)        return;
2471320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci      }
2487d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      did_dispatch_message = true;
249a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
250a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      // Update our state.
251a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)      read_buffer_start += message_size;
2525d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)      remaining_bytes -= message_size;
253c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    }
254a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
2553551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    if (read_buffer_start > 0) {
256f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)      // Move data back to start.
257a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      read_buffer_->num_valid_bytes_ = remaining_bytes;
258a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)      if (read_buffer_->num_valid_bytes_ > 0) {
259c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        memmove(&read_buffer_->buffer_[0],
260c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
261c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      }
262c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      read_buffer_start = 0;
263f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)    }
2643551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)
265a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
266c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)            kReadSize) {
267c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      // Use power-of-2 buffer sizes.
2687d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
269f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      // maximum message size to whatever extent necessary).
270f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)      // TODO(vtl): We may often be able to peek at the header and get the real
2715d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)      // required extra space (which may be much bigger than |kReadSize|).
272c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
273c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
274c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)        new_size *= 2;
275c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
27668043e1e95eeb07d5cae7aca370b26518b0867d6Torne (Richard Coles)      // TODO(vtl): It's suboptimal to zero out the fresh memory.
27768043e1e95eeb07d5cae7aca370b26518b0867d6Torne (Richard Coles)      read_buffer_->buffer_.resize(new_size, 0);
2785d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    }
2795d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)
2805d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    // (1) If we dispatched any messages, stop reading for now (and let the
2811320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci    // message loop do its thing for another round).
2827d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
283a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    // a single message. Risks: slower, more complex if we want to avoid lots of
284a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)    // copying. ii. Keep reading until there's no more data and dispatch all the
2855d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    // messages we can. Risks: starvation of other users of the message loop.)
286c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    // (2) If we didn't max out |kReadSize|, stop reading for now.
287a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
2883551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    bytes_read = 0;
289f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
2903551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)  } while (io_result != IO_PENDING);
291a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
292a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
293c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
2947d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
295c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
2965d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)  bool did_fail = false;
2975d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)  {
2985d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    base::AutoLock locker(write_lock_);
2991320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
3007d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)
301a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)    if (write_stopped_) {
302c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)      NOTREACHED();
3037d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      return;
304c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    }
305c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
3067d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
307a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7Torne (Richard Coles)  }
308a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
309f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)  if (did_fail)
3103551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
311a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)}
312a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)
313f8ee788a64d60abd8f2d742a5fdedde054ecd910Torne (Richard Coles)void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
3143551c9c881056c480085172ff9840cab31610854Torne (Richard Coles)  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
315a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
316a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)  delegate_->OnFatalError(fatal_error);
3177d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)}
318c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
319c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
320c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)  write_lock_.AssertAcquired();
321c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)
3227d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)  DCHECK(!write_stopped_);
3235d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)  DCHECK(!write_buffer_->message_queue_.empty());
3247d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)
325f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  if (result) {
326c2e0dbddbe15c98d52c4786dac06cb8952a8ae6dTorne (Richard Coles)    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
3277d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      // Partial (or no) write.
3285d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)      write_buffer_->offset_ += bytes_written;
3297d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    } else {
3307d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      // Complete write.
3317d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
3327d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      delete write_buffer_->message_queue_.front();
3337d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      write_buffer_->message_queue_.pop_front();
3347d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      write_buffer_->offset_ = 0;
3355d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    }
3365d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)
3375d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)    if (write_buffer_->message_queue_.empty())
3381320f92c476a1ad9d19dba2a48c72b75566198e9Primiano Tucci      return true;
3397d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)
3407d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    // Schedule the next write.
3417d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)    if (ScheduleWriteNoLock() == IO_PENDING)
3427d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)      return true;
343f2477e01787aa58f445919b809d89e252beef54fTorne (Richard Coles)  }
3445d1f7b1de12d16ceb2c938c56701a3e8bfa558f7Torne (Richard Coles)
3457d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)  write_stopped_ = true;
3467d4cd473f85ac64c3747c96c277f9e506a0d2246Torne (Richard Coles)  STLDeleteElements(&write_buffer_->message_queue_);
3470529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  write_buffer_->offset_ = 0;
3480529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch  return false;
3490529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch}
3500529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch
3510529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch}  // namespace system
3520529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch}  // namespace mojo
3530529e5d033099cbfc42635f6f6183833b09dff6eBen Murdoch