// raw_channel.cc revision 5c02ac1a9c1b504631c0a3d2b6e737b5d738bae1
1a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Copyright 2014 The Chromium Authors. All rights reserved. 2a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be 3a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// found in the LICENSE file. 4a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 5a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/raw_channel.h" 6a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 7a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <string.h> 8a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 9a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include <algorithm> 10a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 11a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/bind.h" 12a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/location.h" 13a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/logging.h" 14a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/message_loop/message_loop.h" 15a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "base/stl_util.h" 16a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)#include "mojo/system/message_in_transit.h" 17a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 18a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace mojo { 19a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)namespace system { 20a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 21a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)const size_t kReadSize = 4096; 22a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 23a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::ReadBuffer() : 
buffer_(kReadSize), num_valid_bytes_(0) { 24a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 25a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 26a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer::~ReadBuffer() {} 27a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 28a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 29a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 30a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) *addr = &buffer_[0] + num_valid_bytes_; 31a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) *size = kReadSize; 32a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 33a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 34a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} 35a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 36a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer::~WriteBuffer() { 37a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) STLDeleteElements(&message_queue_); 38a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 39a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 40a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { 41a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) buffers->clear(); 42a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 43a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t bytes_to_write = GetTotalBytesToWrite(); 44a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (bytes_to_write == 0) 45a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 
46a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 47a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) MessageInTransit* message = message_queue_.front(); 48a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (!message->secondary_buffer_size()) { 49a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Only write from the main buffer. 50a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_LT(offset_, message->main_buffer_size()); 51a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_LE(bytes_to_write, message->main_buffer_size()); 52a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) Buffer buffer = { 53a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) static_cast<const char*>(message->main_buffer()) + offset_, 54a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bytes_to_write}; 55a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) buffers->push_back(buffer); 56a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 57a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 58a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 59a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (offset_ >= message->main_buffer_size()) { 60a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Only write from the secondary buffer. 
61a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_LT(offset_ - message->main_buffer_size(), 62a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message->secondary_buffer_size()); 63a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_LE(bytes_to_write, message->secondary_buffer_size()); 64a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) Buffer buffer = { 65a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) static_cast<const char*>(message->secondary_buffer()) + 66a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) (offset_ - message->main_buffer_size()), 67a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bytes_to_write}; 68a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) buffers->push_back(buffer); 69a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 70a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 71a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 72a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Write from both buffers. 
73a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ + 74a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message->secondary_buffer_size()); 75a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) Buffer buffer1 = { 76a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) static_cast<const char*>(message->main_buffer()) + offset_, 77a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message->main_buffer_size() - offset_}; 78a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) buffers->push_back(buffer1); 79a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) Buffer buffer2 = { 80a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) static_cast<const char*>(message->secondary_buffer()), 81a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message->secondary_buffer_size()}; 82a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) buffers->push_back(buffer2); 83a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 84a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 85a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const { 86a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (message_queue_.empty()) 87a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return 0; 88a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 89a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) MessageInTransit* message = message_queue_.front(); 90a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_LT(offset_, message->total_size()); 91a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return message->total_size() - offset_; 92a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 93a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 
94c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen MurdochRawChannel::RawChannel() 955c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu : message_loop_for_io_(NULL), 965c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu delegate_(NULL), 97a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_stopped_(false), 98a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_stopped_(false), 99a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) weak_ptr_factory_(this) { 100a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 101a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 102a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::~RawChannel() { 103a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!read_buffer_); 104a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!write_buffer_); 105a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 106a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // No need to take the |write_lock_| here -- if there are still weak pointers 107a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // outstanding, then we're hosed anyway (since we wouldn't be able to 108a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // invalidate them cleanly, since we might not be on the I/O thread). 
109a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 110a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 111a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 112c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdochbool RawChannel::Init(Delegate* delegate) { 113c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch DCHECK(delegate); 114c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch 115c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch DCHECK(!delegate_); 116c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch delegate_ = delegate; 117c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch 118c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 119c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch DCHECK(!message_loop_for_io_); 120c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch message_loop_for_io_ = 121c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 122a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 123a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // No need to take the lock. No one should be using us yet. 
124a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!read_buffer_); 125a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_.reset(new ReadBuffer); 126a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!write_buffer_); 127a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_.reset(new WriteBuffer); 128a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 1295c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu if (!OnInit()) { 1305c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu delegate_ = NULL; 1315c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu message_loop_for_io_ = NULL; 1325c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu read_buffer_.reset(); 1335c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu write_buffer_.reset(); 134a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return false; 1355c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu } 136a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 137a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return ScheduleRead() == IO_PENDING; 138a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 139a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 140a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::Shutdown() { 141a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 142a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 143a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) base::AutoLock locker(write_lock_); 144a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 145effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) 146effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch << "Shutting down RawChannel with write buffer nonempty"; 147effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch 
1485c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu // Reset the delegate so that it won't receive further calls. 1495c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu delegate_ = NULL; 150a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_stopped_ = true; 151a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_stopped_ = true; 1525c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu weak_ptr_factory_.InvalidateWeakPtrs(); 153a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 154a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 155a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 156a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 157a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)// Reminder: This must be thread-safe. 158a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 159c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch DCHECK(message); 160c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch 161c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch // TODO(vtl) 162c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch if (message->has_platform_handles()) { 163c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch NOTIMPLEMENTED(); 164c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch return false; 165c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch } 166c5cede9ae108bb15f6b7a8aea21c7e1fefa2834cBen Murdoch 167a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) base::AutoLock locker(write_lock_); 168a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (write_stopped_) 169a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return false; 170a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 171a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (!write_buffer_->message_queue_.empty()) { 
172a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->message_queue_.push_back(message.release()); 173a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return true; 174a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 175a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 176a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->message_queue_.push_front(message.release()); 177a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(write_buffer_->offset_, 0u); 178a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 179a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t bytes_written = 0; 180a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) IOResult io_result = WriteNoLock(&bytes_written); 181a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (io_result == IO_PENDING) 182a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return true; 183a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 184a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, 185a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bytes_written); 186a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (!result) { 187a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Even if we're on the I/O thread, don't call |OnFatalError()| in the 188a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // nested context. 
189a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message_loop_for_io_->PostTask( 190a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) FROM_HERE, 191a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) base::Bind(&RawChannel::CallOnFatalError, 192a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) weak_ptr_factory_.GetWeakPtr(), 193a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) Delegate::FATAL_ERROR_FAILED_WRITE)); 194a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 195a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 196a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return result; 197a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 198a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 199effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch// Reminder: This must be thread-safe. 200effb81e5f8246d0db0270817048dc992db66e9fbBen Murdochbool RawChannel::IsWriteBufferEmpty() { 201effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch base::AutoLock locker(write_lock_); 202effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch return write_buffer_->message_queue_.empty(); 203effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch} 204effb81e5f8246d0db0270817048dc992db66e9fbBen Murdoch 205a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::ReadBuffer* RawChannel::read_buffer() { 206a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 207a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return read_buffer_.get(); 208a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 209a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 210a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() { 211a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 
write_lock_.AssertAcquired(); 212a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return write_buffer_.get(); 213a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 214a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 215a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { 216a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 217a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 218a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (read_stopped_) { 219a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) NOTREACHED(); 220a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 221a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 222a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 223a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; 224a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 225a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Keep reading data in a loop, and dispatch messages if enough data is 226a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // received. Exit the loop if any of the following happens: 227a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // - one or more messages were dispatched; 228a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // - the last read failed, was a partial read or would block; 229a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // - |Shutdown()| was called. 
230a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) do { 231a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (io_result != IO_SUCCEEDED) { 232a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_stopped_ = true; 233a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 234a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 235a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 236a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 237a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_->num_valid_bytes_ += bytes_read; 238a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 239a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Dispatch all the messages that we can. 240a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bool did_dispatch_message = false; 241a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Tracks the offset of the first undispatched message in |read_buffer_|. 242a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Currently, we copy data to ensure that this is zero at the beginning. 243a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t read_buffer_start = 0; 244a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t remaining_bytes = read_buffer_->num_valid_bytes_; 245a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t message_size; 246a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Note that we rely on short-circuit evaluation here: 247a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // - |read_buffer_start| may be an invalid index into 248a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // |read_buffer_->buffer_| if |remaining_bytes| is zero. 
249a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // - |message_size| is only valid if |GetNextMessageSize()| returns true. 250a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Use |message_size| more intelligently (e.g., to request the 251a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // next read). 252a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Validate that |message_size| is sane. 253a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) while (remaining_bytes > 0 && 254a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) MessageInTransit::GetNextMessageSize( 255a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) &read_buffer_->buffer_[read_buffer_start], remaining_bytes, 256a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) &message_size) && 257a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) remaining_bytes >= message_size) { 258a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) MessageInTransit::View 259a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); 260a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(message_view.total_size(), message_size); 261a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 262a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Dispatch the message. 2635c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu DCHECK(delegate_); 264a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) delegate_->OnReadMessage(message_view); 265a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (read_stopped_) { 266a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // |Shutdown()| was called in |OnReadMessage()|. 267a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Add test for this case. 
268a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 269a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 270a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) did_dispatch_message = true; 271a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 272a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Update our state. 273a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_start += message_size; 274a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) remaining_bytes -= message_size; 275a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 276a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 277a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (read_buffer_start > 0) { 278a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Move data back to start. 279a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_->num_valid_bytes_ = remaining_bytes; 280a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (read_buffer_->num_valid_bytes_ > 0) { 281a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) memmove(&read_buffer_->buffer_[0], 282a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) &read_buffer_->buffer_[read_buffer_start], remaining_bytes); 283a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 284a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_start = 0; 285a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 286a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 287a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 288a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) kReadSize) { 289a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Use power-of-2 buffer sizes. 
290a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 291a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // maximum message size to whatever extent necessary). 292a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): We may often be able to peek at the header and get the real 293a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // required extra space (which may be much bigger than |kReadSize|). 294a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 295a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 296a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) new_size *= 2; 297a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 298a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): It's suboptimal to zero out the fresh memory. 299a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) read_buffer_->buffer_.resize(new_size, 0); 300a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 301a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 302a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // (1) If we dispatched any messages, stop reading for now (and let the 303a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // message loop do its thing for another round). 304a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 305a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // a single message. Risks: slower, more complex if we want to avoid lots of 306a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // copying. ii. 
Keep reading until there's no more data and dispatch all the 307a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // messages we can. Risks: starvation of other users of the message loop.) 308a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // (2) If we didn't max out |kReadSize|, stop reading for now. 309a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 310a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bytes_read = 0; 311a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 312a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } while (io_result != IO_PENDING); 313a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 314a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 315a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { 316a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 317a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 318a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) bool did_fail = false; 319a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) { 320a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) base::AutoLock locker(write_lock_); 321a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); 322a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 323a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (write_stopped_) { 324a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) NOTREACHED(); 325a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return; 326a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 
327a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 328a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) did_fail = !OnWriteCompletedNoLock(result, bytes_written); 329a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 330a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 331a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (did_fail) 332a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 333a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 334a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 335a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { 336a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 337a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 
3385c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu if (delegate_) 3395c02ac1a9c1b504631c0a3d2b6e737b5d738bae1Bo Liu delegate_->OnFatalError(fatal_error); 340a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 341a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 342a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { 343a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_lock_.AssertAcquired(); 344a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 345a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!write_stopped_); 346a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK(!write_buffer_->message_queue_.empty()); 347a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 348a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (result) { 349a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (bytes_written < write_buffer_->GetTotalBytesToWrite()) { 350a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Partial (or no) write. 351a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->offset_ += bytes_written; 352a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } else { 353a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Complete write. 
354a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite()); 355a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) delete write_buffer_->message_queue_.front(); 356a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->message_queue_.pop_front(); 357a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->offset_ = 0; 358a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 359a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 360a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (write_buffer_->message_queue_.empty()) 361a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return true; 362a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 363a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) // Schedule the next write. 364a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) if (ScheduleWriteNoLock() == IO_PENDING) 365a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return true; 366a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) } 367a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 368a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_stopped_ = true; 369a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) STLDeleteElements(&write_buffer_->message_queue_); 370a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) write_buffer_->offset_ = 0; 371a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) return false; 372a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} 373a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles) 374a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} // namespace system 375a1401311d1ab56c4ed0a474bd38c108f75cb0cd9Torne (Richard Coles)} // namespace mojo 376