// raw_channel.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Platform-independent part of |RawChannel|: read/write buffer bookkeeping,
// message framing on the read side, and the write queue. The actual I/O is
// delegated to |OnInit()|, |Read()|, |ScheduleRead()|, |WriteNoLock()|,
// |ScheduleWriteNoLock()| and |OnShutdownNoLock()|, which are not defined in
// this file.

#include "mojo/system/raw_channel.h"

#include <string.h>

#include <algorithm>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/stl_util.h"
#include "mojo/system/message_in_transit.h"

namespace mojo {
namespace system {

// Size, in bytes, of the window offered for each read (|GetBuffer()| always
// reports this size). It is also the initial size of the read buffer and the
// minimum amount of free space |OnReadCompleted()| keeps at its end.
const size_t kReadSize = 4096;

// Starts with |kReadSize| bytes of storage and no valid (received) bytes.
RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}

RawChannel::ReadBuffer::~ReadBuffer() {}

// Reports where the next read should store its data: |*addr| is set to the
// first byte after the currently valid data and |*size| to |kReadSize|. The
// buffer must already have at least |kReadSize| bytes free past the valid
// data (only DCHECKed here; |OnReadCompleted()| grows the buffer as needed).
void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
  *addr = &buffer_[0] + num_valid_bytes_;
  *size = kReadSize;
}

// Starts with an empty queue; |offset_| counts the bytes of the front message
// that have already been written.
RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}

// The queue stores raw pointers (see |message.release()| in
// |WriteMessage()|), so any messages still queued are deleted here.
RawChannel::WriteBuffer::~WriteBuffer() {
  STLDeleteElements(&message_queue_);
}

// Fills |*buffers| with the (address, size) ranges that cover the not yet
// written part of the message at the front of the queue. The result is:
//   - empty, if there is nothing to write;
//   - one range, if the remaining data lies entirely in the main buffer or
//     entirely in the secondary buffer;
//   - two ranges (rest of main buffer, then the whole secondary buffer)
//     otherwise.
// Any previous contents of |*buffers| are discarded.
void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
  buffers->clear();

  size_t bytes_to_write = GetTotalBytesToWrite();
  if (bytes_to_write == 0)
    return;

  MessageInTransit* message = message_queue_.front();
  if (!message->secondary_buffer_size()) {
    // Only write from the main buffer.
    DCHECK_LT(offset_, message->main_buffer_size());
    DCHECK_LE(bytes_to_write, message->main_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->main_buffer()) + offset_,
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  if (offset_ >= message->main_buffer_size()) {
    // Only write from the secondary buffer. |offset_| is measured from the
    // start of the main buffer, so rebase it onto the secondary buffer.
    DCHECK_LT(offset_ - message->main_buffer_size(),
              message->secondary_buffer_size());
    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->secondary_buffer()) +
            (offset_ - message->main_buffer_size()),
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  // Write from both buffers: the unwritten tail of the main buffer followed by
  // the entire secondary buffer.
  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
                                message->secondary_buffer_size());
  Buffer buffer1 = {
      static_cast<const char*>(message->main_buffer()) + offset_,
      message->main_buffer_size() - offset_};
  buffers->push_back(buffer1);
  Buffer buffer2 = {
      static_cast<const char*>(message->secondary_buffer()),
      message->secondary_buffer_size()};
  buffers->push_back(buffer2);
}

// Returns the number of bytes of the front message that remain to be written
// (its total size minus |offset_|), or 0 if the queue is empty. Only the front
// message is counted, not the rest of the queue.
size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
  if (message_queue_.empty())
    return 0;

  MessageInTransit* message = message_queue_.front();
  DCHECK_LT(offset_, message->total_size());
  return message->total_size() - offset_;
}

// Records |delegate| and |message_loop_for_io|; neither is used until later.
// The buffers are not allocated here -- that happens in |Init()|.
RawChannel::RawChannel(Delegate* delegate,
                       base::MessageLoopForIO* message_loop_for_io)
    : delegate_(delegate),
      message_loop_for_io_(message_loop_for_io),
      read_stopped_(false),
      write_stopped_(false),
      weak_ptr_factory_(this) {
}

// Expects both buffers to have been released already; within this file that
// only happens in |Shutdown()|.
RawChannel::~RawChannel() {
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);

  // No need to take the |write_lock_| here -- if there are still weak pointers
  // outstanding, then we're hosed anyway (since we wouldn't be able to
  // invalidate them cleanly, since we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}

// Allocates the read and write buffers, runs |OnInit()| and then schedules the
// first read. Must be called on |message_loop_for_io_|'s thread (DCHECKed).
// Returns true only if |OnInit()| succeeded and |ScheduleRead()| returned
// |IO_PENDING|.
//
// NOTE(review): on both failure paths |read_buffer_| and |write_buffer_| stay
// allocated, while the destructor DCHECKs that they are null and only
// |Shutdown()| releases them in this file. Confirm what callers are expected
// to do after a failed |Init()|.
bool RawChannel::Init() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  // No need to take the lock. No one should be using us yet.
  DCHECK(!read_buffer_);
  read_buffer_.reset(new ReadBuffer);
  DCHECK(!write_buffer_);
  write_buffer_.reset(new WriteBuffer);

  if (!OnInit())
    return false;

  return ScheduleRead() == IO_PENDING;
}

// Stops both directions and hands the buffers over to |OnShutdownNoLock()|.
// Must be called on |message_loop_for_io_|'s thread (DCHECKed). After this,
// |read_buffer_| and |write_buffer_| are null and |WriteMessage()| returns
// false.
void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  // Invalidate weak pointers handed out earlier (e.g., the one bound into the
  // task posted by |WriteMessage()|).
  weak_ptr_factory_.InvalidateWeakPtrs();

  read_stopped_ = true;
  write_stopped_ = true;

  // Ownership of both buffers is transferred; what happens to them is up to
  // |OnShutdownNoLock()|, which is not defined in this file.
  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}

// Queues |message| for writing, taking ownership of it, and starts a write if
// none is queued ahead of it. Returns false if writing has been stopped (in
// which case |message| is simply destroyed with the |scoped_ptr|) or if the
// write attempted here failed; returns true otherwise.
//
// Reminder: This must be thread-safe.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
  base::AutoLock locker(write_lock_);
  if (write_stopped_)
    return false;

  // If other messages are already queued, just append. Presumably a write for
  // the front message is already in flight and its completion will pick this
  // one up -- confirm against the platform implementation.
  if (!write_buffer_->message_queue_.empty()) {
    write_buffer_->message_queue_.push_back(message.release());
    return true;
  }

  // The queue was empty, so this message becomes the front one and we try to
  // write it right away.
  write_buffer_->message_queue_.push_front(message.release());
  DCHECK_EQ(write_buffer_->offset_, 0u);

  size_t bytes_written = 0;
  IOResult io_result = WriteNoLock(&bytes_written);
  if (io_result == IO_PENDING)
    return true;

  // The write finished (or failed) synchronously; do the same bookkeeping that
  // |OnWriteCompleted()| would do.
  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
                                       bytes_written);
  if (!result) {
    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
    // nested context. The task is bound to a weak pointer, which |Shutdown()|
    // invalidates.
    message_loop_for_io_->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::CallOnFatalError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::FATAL_ERROR_FAILED_WRITE));
  }

  return result;
}

// Accessor for the read buffer; only to be used on |message_loop_for_io_|'s
// thread (DCHECKed). Returns null once |Shutdown()| has taken the buffer.
RawChannel::ReadBuffer* RawChannel::read_buffer() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  return read_buffer_.get();
}

// Accessor for the write buffer; |write_lock_| must be held (asserted).
// Returns null once |Shutdown()| has taken the buffer.
RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
  write_lock_.AssertAcquired();
  return write_buffer_.get();
}

// Handles the completion of a read: |result| says whether the read succeeded
// and |bytes_read| how many bytes were appended after the valid data in
// |read_buffer_|. Dispatches every complete message to the delegate, compacts
// and (if needed) grows the buffer, then either schedules the next read or
// reads again immediately. Must run on |message_loop_for_io_|'s thread
// (DCHECKed) and must not be called once reading has stopped.
void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  if (read_stopped_) {
    NOTREACHED();
    return;
  }

  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;

  // Keep reading data in a loop, and dispatch messages if enough data is
  // received. Exit the loop if any of the following happens:
  //   - one or more messages were dispatched;
  //   - the last read failed, was a partial read or would block;
  //   - |Shutdown()| was called.
  do {
    if (io_result != IO_SUCCEEDED) {
      // A failed read is fatal for the read side: stop reading and tell the
      // delegate.
      read_stopped_ = true;
      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
      return;
    }

    read_buffer_->num_valid_bytes_ += bytes_read;

    // Dispatch all the messages that we can.
    bool did_dispatch_message = false;
    // Tracks the offset of the first undispatched message in |read_buffer_|.
    // Currently, we copy data to ensure that this is zero at the beginning.
    size_t read_buffer_start = 0;
    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
    size_t message_size;
    // Note that we rely on short-circuit evaluation here:
    //   - |read_buffer_start| may be an invalid index into
    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
    // next read).
    // TODO(vtl): Validate that |message_size| is sane.
    while (remaining_bytes > 0 &&
           MessageInTransit::GetNextMessageSize(
               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
               &message_size) &&
           remaining_bytes >= message_size) {
      // The view refers to the bytes in |read_buffer_|; no copy is made here.
      MessageInTransit::View
          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
      DCHECK_EQ(message_view.total_size(), message_size);

      // Dispatch the message.
      delegate_->OnReadMessage(message_view);
      if (read_stopped_) {
        // |Shutdown()| was called in |OnReadMessage()|.
        // TODO(vtl): Add test for this case.
        return;
      }
      did_dispatch_message = true;

      // Update our state.
      read_buffer_start += message_size;
      remaining_bytes -= message_size;
    }

    if (read_buffer_start > 0) {
      // Move data back to start. |memmove()| is used (rather than |memcpy()|)
      // since the source and destination ranges may overlap.
      read_buffer_->num_valid_bytes_ = remaining_bytes;
      if (read_buffer_->num_valid_bytes_ > 0) {
        memmove(&read_buffer_->buffer_[0],
                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
      }
      read_buffer_start = 0;
    }

    // Make sure there is room for another |kReadSize| bytes after the valid
    // data, as |ReadBuffer::GetBuffer()| requires.
    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
            kReadSize) {
      // Use power-of-2 buffer sizes.
      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
      // maximum message size to whatever extent necessary).
      // TODO(vtl): We may often be able to peek at the header and get the real
      // required extra space (which may be much bigger than |kReadSize|).
      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
        new_size *= 2;

      // TODO(vtl): It's suboptimal to zero out the fresh memory.
      read_buffer_->buffer_.resize(new_size, 0);
    }

    // (1) If we dispatched any messages, stop reading for now (and let the
    // message loop do its thing for another round).
    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
    // a single message. Risks: slower, more complex if we want to avoid lots of
    // copying. ii. Keep reading until there's no more data and dispatch all the
    // messages we can. Risks: starvation of other users of the message loop.)
    // (2) If we didn't max out |kReadSize|, stop reading for now.
    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
    // Reset before the next iteration: |ScheduleRead()| does not report a byte
    // count, and |Read()| fills in a fresh one.
    bytes_read = 0;
    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
  } while (io_result != IO_PENDING);
}

// Handles the completion of a write: |result| says whether the write succeeded
// and |bytes_written| how many bytes went out. The bookkeeping is done under
// |write_lock_|; on failure the delegate is notified after the lock has been
// released. Must run on |message_loop_for_io_|'s thread (DCHECKed) and must
// not be called once writing has stopped.
void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  bool did_fail = false;
  {
    base::AutoLock locker(write_lock_);
    // A completion is only expected while a message is queued, and the queue
    // is emptied whenever writing is stopped in |OnWriteCompletedNoLock()|.
    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());

    if (write_stopped_) {
      NOTREACHED();
      return;
    }

    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
  }

  // Called outside the scope above so that |write_lock_| is not held while the
  // delegate runs.
  if (did_fail)
    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}

// Forwards |fatal_error| to the delegate's |OnFatalError()|. Must run on
// |message_loop_for_io_|'s thread (DCHECKed).
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  delegate_->OnFatalError(fatal_error);
}

// Updates the write queue after a write attempt; |write_lock_| must be held
// (asserted). |result| says whether the write succeeded and |bytes_written|
// how many bytes of the front message went out.
//
// Returns true if the write side is still healthy: either the queue is now
// empty or the next write was scheduled (|ScheduleWriteNoLock()| returned
// |IO_PENDING|). Returns false if the write failed or the next write could
// not be scheduled; in that case writing is stopped and all queued messages
// are deleted. The caller is responsible for reporting the error.
bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_stopped_);
  DCHECK(!write_buffer_->message_queue_.empty());

  if (result) {
    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
      // Partial (or no) write. Keep the message at the front and remember how
      // far we got.
      write_buffer_->offset_ += bytes_written;
    } else {
      // Complete write. The queue owns its messages through raw pointers, so
      // delete the message explicitly before removing it.
      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
      delete write_buffer_->message_queue_.front();
      write_buffer_->message_queue_.pop_front();
      write_buffer_->offset_ = 0;
    }

    if (write_buffer_->message_queue_.empty())
      return true;

    // Schedule the next write.
    if (ScheduleWriteNoLock() == IO_PENDING)
      return true;
  }

  // Either the write failed or the next write could not be scheduled: stop
  // writing and drop everything that is still queued.
  write_stopped_ = true;
  STLDeleteElements(&write_buffer_->message_queue_);
  write_buffer_->offset_ = 0;
  return false;
}

}  // namespace system
}  // namespace mojo