raw_channel.cc revision 5c02ac1a9c1b504631c0a3d2b6e737b5d738bae1
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/raw_channel.h" 6 7#include <string.h> 8 9#include <algorithm> 10 11#include "base/bind.h" 12#include "base/location.h" 13#include "base/logging.h" 14#include "base/message_loop/message_loop.h" 15#include "base/stl_util.h" 16#include "mojo/system/message_in_transit.h" 17 18namespace mojo { 19namespace system { 20 21const size_t kReadSize = 4096; 22 23RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 24} 25 26RawChannel::ReadBuffer::~ReadBuffer() {} 27 28void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 29 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 30 *addr = &buffer_[0] + num_valid_bytes_; 31 *size = kReadSize; 32} 33 34RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} 35 36RawChannel::WriteBuffer::~WriteBuffer() { 37 STLDeleteElements(&message_queue_); 38} 39 40void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { 41 buffers->clear(); 42 43 size_t bytes_to_write = GetTotalBytesToWrite(); 44 if (bytes_to_write == 0) 45 return; 46 47 MessageInTransit* message = message_queue_.front(); 48 if (!message->secondary_buffer_size()) { 49 // Only write from the main buffer. 50 DCHECK_LT(offset_, message->main_buffer_size()); 51 DCHECK_LE(bytes_to_write, message->main_buffer_size()); 52 Buffer buffer = { 53 static_cast<const char*>(message->main_buffer()) + offset_, 54 bytes_to_write}; 55 buffers->push_back(buffer); 56 return; 57 } 58 59 if (offset_ >= message->main_buffer_size()) { 60 // Only write from the secondary buffer. 61 DCHECK_LT(offset_ - message->main_buffer_size(), 62 message->secondary_buffer_size()); 63 DCHECK_LE(bytes_to_write, message->secondary_buffer_size()); 64 Buffer buffer = { 65 static_cast<const char*>(message->secondary_buffer()) + 66 (offset_ - message->main_buffer_size()), 67 bytes_to_write}; 68 buffers->push_back(buffer); 69 return; 70 } 71 72 // Write from both buffers. 73 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ + 74 message->secondary_buffer_size()); 75 Buffer buffer1 = { 76 static_cast<const char*>(message->main_buffer()) + offset_, 77 message->main_buffer_size() - offset_}; 78 buffers->push_back(buffer1); 79 Buffer buffer2 = { 80 static_cast<const char*>(message->secondary_buffer()), 81 message->secondary_buffer_size()}; 82 buffers->push_back(buffer2); 83} 84 85size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const { 86 if (message_queue_.empty()) 87 return 0; 88 89 MessageInTransit* message = message_queue_.front(); 90 DCHECK_LT(offset_, message->total_size()); 91 return message->total_size() - offset_; 92} 93 94RawChannel::RawChannel() 95 : message_loop_for_io_(NULL), 96 delegate_(NULL), 97 read_stopped_(false), 98 write_stopped_(false), 99 weak_ptr_factory_(this) { 100} 101 102RawChannel::~RawChannel() { 103 DCHECK(!read_buffer_); 104 DCHECK(!write_buffer_); 105 106 // No need to take the |write_lock_| here -- if there are still weak pointers 107 // outstanding, then we're hosed anyway (since we wouldn't be able to 108 // invalidate them cleanly, since we might not be on the I/O thread). 109 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 110} 111 112bool RawChannel::Init(Delegate* delegate) { 113 DCHECK(delegate); 114 115 DCHECK(!delegate_); 116 delegate_ = delegate; 117 118 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 119 DCHECK(!message_loop_for_io_); 120 message_loop_for_io_ = 121 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 122 123 // No need to take the lock. No one should be using us yet. 124 DCHECK(!read_buffer_); 125 read_buffer_.reset(new ReadBuffer); 126 DCHECK(!write_buffer_); 127 write_buffer_.reset(new WriteBuffer); 128 129 if (!OnInit()) { 130 delegate_ = NULL; 131 message_loop_for_io_ = NULL; 132 read_buffer_.reset(); 133 write_buffer_.reset(); 134 return false; 135 } 136 137 return ScheduleRead() == IO_PENDING; 138} 139 140void RawChannel::Shutdown() { 141 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 142 143 base::AutoLock locker(write_lock_); 144 145 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) 146 << "Shutting down RawChannel with write buffer nonempty"; 147 148 // Reset the delegate so that it won't receive further calls. 149 delegate_ = NULL; 150 read_stopped_ = true; 151 write_stopped_ = true; 152 weak_ptr_factory_.InvalidateWeakPtrs(); 153 154 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 155} 156 157// Reminder: This must be thread-safe. 158bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 159 DCHECK(message); 160 161 // TODO(vtl) 162 if (message->has_platform_handles()) { 163 NOTIMPLEMENTED(); 164 return false; 165 } 166 167 base::AutoLock locker(write_lock_); 168 if (write_stopped_) 169 return false; 170 171 if (!write_buffer_->message_queue_.empty()) { 172 write_buffer_->message_queue_.push_back(message.release()); 173 return true; 174 } 175 176 write_buffer_->message_queue_.push_front(message.release()); 177 DCHECK_EQ(write_buffer_->offset_, 0u); 178 179 size_t bytes_written = 0; 180 IOResult io_result = WriteNoLock(&bytes_written); 181 if (io_result == IO_PENDING) 182 return true; 183 184 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, 185 bytes_written); 186 if (!result) { 187 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 188 // nested context. 189 message_loop_for_io_->PostTask( 190 FROM_HERE, 191 base::Bind(&RawChannel::CallOnFatalError, 192 weak_ptr_factory_.GetWeakPtr(), 193 Delegate::FATAL_ERROR_FAILED_WRITE)); 194 } 195 196 return result; 197} 198 199// Reminder: This must be thread-safe. 200bool RawChannel::IsWriteBufferEmpty() { 201 base::AutoLock locker(write_lock_); 202 return write_buffer_->message_queue_.empty(); 203} 204 205RawChannel::ReadBuffer* RawChannel::read_buffer() { 206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 207 return read_buffer_.get(); 208} 209 210RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() { 211 write_lock_.AssertAcquired(); 212 return write_buffer_.get(); 213} 214 215void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { 216 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 217 218 if (read_stopped_) { 219 NOTREACHED(); 220 return; 221 } 222 223 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; 224 225 // Keep reading data in a loop, and dispatch messages if enough data is 226 // received. Exit the loop if any of the following happens: 227 // - one or more messages were dispatched; 228 // - the last read failed, was a partial read or would block; 229 // - |Shutdown()| was called. 230 do { 231 if (io_result != IO_SUCCEEDED) { 232 read_stopped_ = true; 233 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 234 return; 235 } 236 237 read_buffer_->num_valid_bytes_ += bytes_read; 238 239 // Dispatch all the messages that we can. 240 bool did_dispatch_message = false; 241 // Tracks the offset of the first undispatched message in |read_buffer_|. 242 // Currently, we copy data to ensure that this is zero at the beginning. 243 size_t read_buffer_start = 0; 244 size_t remaining_bytes = read_buffer_->num_valid_bytes_; 245 size_t message_size; 246 // Note that we rely on short-circuit evaluation here: 247 // - |read_buffer_start| may be an invalid index into 248 // |read_buffer_->buffer_| if |remaining_bytes| is zero. 249 // - |message_size| is only valid if |GetNextMessageSize()| returns true. 250 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the 251 // next read). 252 // TODO(vtl): Validate that |message_size| is sane. 253 while (remaining_bytes > 0 && 254 MessageInTransit::GetNextMessageSize( 255 &read_buffer_->buffer_[read_buffer_start], remaining_bytes, 256 &message_size) && 257 remaining_bytes >= message_size) { 258 MessageInTransit::View 259 message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); 260 DCHECK_EQ(message_view.total_size(), message_size); 261 262 // Dispatch the message. 263 DCHECK(delegate_); 264 delegate_->OnReadMessage(message_view); 265 if (read_stopped_) { 266 // |Shutdown()| was called in |OnReadMessage()|. 267 // TODO(vtl): Add test for this case. 268 return; 269 } 270 did_dispatch_message = true; 271 272 // Update our state. 273 read_buffer_start += message_size; 274 remaining_bytes -= message_size; 275 } 276 277 if (read_buffer_start > 0) { 278 // Move data back to start. 279 read_buffer_->num_valid_bytes_ = remaining_bytes; 280 if (read_buffer_->num_valid_bytes_ > 0) { 281 memmove(&read_buffer_->buffer_[0], 282 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); 283 } 284 read_buffer_start = 0; 285 } 286 287 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 288 kReadSize) { 289 // Use power-of-2 buffer sizes. 290 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 291 // maximum message size to whatever extent necessary). 292 // TODO(vtl): We may often be able to peek at the header and get the real 293 // required extra space (which may be much bigger than |kReadSize|). 294 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 295 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 296 new_size *= 2; 297 298 // TODO(vtl): It's suboptimal to zero out the fresh memory. 299 read_buffer_->buffer_.resize(new_size, 0); 300 } 301 302 // (1) If we dispatched any messages, stop reading for now (and let the 303 // message loop do its thing for another round). 304 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 305 // a single message. Risks: slower, more complex if we want to avoid lots of 306 // copying. ii. Keep reading until there's no more data and dispatch all the 307 // messages we can. Risks: starvation of other users of the message loop.) 308 // (2) If we didn't max out |kReadSize|, stop reading for now. 309 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 310 bytes_read = 0; 311 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 312 } while (io_result != IO_PENDING); 313} 314 315void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { 316 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 317 318 bool did_fail = false; 319 { 320 base::AutoLock locker(write_lock_); 321 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); 322 323 if (write_stopped_) { 324 NOTREACHED(); 325 return; 326 } 327 328 did_fail = !OnWriteCompletedNoLock(result, bytes_written); 329 } 330 331 if (did_fail) 332 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 333} 334 335void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { 336 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 337 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 338 if (delegate_) 339 delegate_->OnFatalError(fatal_error); 340} 341 342bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { 343 write_lock_.AssertAcquired(); 344 345 DCHECK(!write_stopped_); 346 DCHECK(!write_buffer_->message_queue_.empty()); 347 348 if (result) { 349 if (bytes_written < write_buffer_->GetTotalBytesToWrite()) { 350 // Partial (or no) write. 351 write_buffer_->offset_ += bytes_written; 352 } else { 353 // Complete write. 354 DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite()); 355 delete write_buffer_->message_queue_.front(); 356 write_buffer_->message_queue_.pop_front(); 357 write_buffer_->offset_ = 0; 358 } 359 360 if (write_buffer_->message_queue_.empty()) 361 return true; 362 363 // Schedule the next write. 364 if (ScheduleWriteNoLock() == IO_PENDING) 365 return true; 366 } 367 368 write_stopped_ = true; 369 STLDeleteElements(&write_buffer_->message_queue_); 370 write_buffer_->offset_ = 0; 371 return false; 372} 373 374} // namespace system 375} // namespace mojo 376