raw_channel.cc revision 010d83a9304c5a91596085d917d248abff47903a
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/raw_channel.h" 6 7#include <string.h> 8 9#include <algorithm> 10 11#include "base/bind.h" 12#include "base/location.h" 13#include "base/logging.h" 14#include "base/message_loop/message_loop.h" 15#include "base/stl_util.h" 16#include "mojo/system/message_in_transit.h" 17#include "mojo/system/transport_data.h" 18 19namespace mojo { 20namespace system { 21 22const size_t kReadSize = 4096; 23 24RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) { 25} 26 27RawChannel::ReadBuffer::~ReadBuffer() {} 28 29void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) { 30 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize); 31 *addr = &buffer_[0] + num_valid_bytes_; 32 *size = kReadSize; 33} 34 35RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {} 36 37RawChannel::WriteBuffer::~WriteBuffer() { 38 STLDeleteElements(&message_queue_); 39} 40 41void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const { 42 buffers->clear(); 43 44 if (message_queue_.empty()) 45 return; 46 47 MessageInTransit* message = message_queue_.front(); 48 DCHECK_LT(offset_, message->total_size()); 49 size_t bytes_to_write = message->total_size() - offset_; 50 51 size_t transport_data_buffer_size = message->transport_data() ? 52 message->transport_data()->buffer_size() : 0; 53 54 if (!transport_data_buffer_size) { 55 // Only write from the main buffer. 56 DCHECK_LT(offset_, message->main_buffer_size()); 57 DCHECK_LE(bytes_to_write, message->main_buffer_size()); 58 Buffer buffer = { 59 static_cast<const char*>(message->main_buffer()) + offset_, 60 bytes_to_write}; 61 buffers->push_back(buffer); 62 return; 63 } 64 65 if (offset_ >= message->main_buffer_size()) { 66 // Only write from the transport data buffer. 67 DCHECK_LT(offset_ - message->main_buffer_size(), 68 transport_data_buffer_size); 69 DCHECK_LE(bytes_to_write, transport_data_buffer_size); 70 Buffer buffer = { 71 static_cast<const char*>(message->transport_data()->buffer()) + 72 (offset_ - message->main_buffer_size()), 73 bytes_to_write}; 74 buffers->push_back(buffer); 75 return; 76 } 77 78 // Write from both buffers. 79 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ + 80 transport_data_buffer_size); 81 Buffer buffer1 = { 82 static_cast<const char*>(message->main_buffer()) + offset_, 83 message->main_buffer_size() - offset_ 84 }; 85 buffers->push_back(buffer1); 86 Buffer buffer2 = { 87 static_cast<const char*>(message->transport_data()->buffer()), 88 transport_data_buffer_size 89 }; 90 buffers->push_back(buffer2); 91} 92 93RawChannel::RawChannel() 94 : message_loop_for_io_(NULL), 95 delegate_(NULL), 96 read_stopped_(false), 97 write_stopped_(false), 98 weak_ptr_factory_(this) { 99} 100 101RawChannel::~RawChannel() { 102 DCHECK(!read_buffer_); 103 DCHECK(!write_buffer_); 104 105 // No need to take the |write_lock_| here -- if there are still weak pointers 106 // outstanding, then we're hosed anyway (since we wouldn't be able to 107 // invalidate them cleanly, since we might not be on the I/O thread). 108 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 109} 110 111bool RawChannel::Init(Delegate* delegate) { 112 DCHECK(delegate); 113 114 DCHECK(!delegate_); 115 delegate_ = delegate; 116 117 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO); 118 DCHECK(!message_loop_for_io_); 119 message_loop_for_io_ = 120 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current()); 121 122 // No need to take the lock. No one should be using us yet. 123 DCHECK(!read_buffer_); 124 read_buffer_.reset(new ReadBuffer); 125 DCHECK(!write_buffer_); 126 write_buffer_.reset(new WriteBuffer); 127 128 if (!OnInit()) { 129 delegate_ = NULL; 130 message_loop_for_io_ = NULL; 131 read_buffer_.reset(); 132 write_buffer_.reset(); 133 return false; 134 } 135 136 return ScheduleRead() == IO_PENDING; 137} 138 139void RawChannel::Shutdown() { 140 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 141 142 base::AutoLock locker(write_lock_); 143 144 LOG_IF(WARNING, !write_buffer_->message_queue_.empty()) 145 << "Shutting down RawChannel with write buffer nonempty"; 146 147 // Reset the delegate so that it won't receive further calls. 148 delegate_ = NULL; 149 read_stopped_ = true; 150 write_stopped_ = true; 151 weak_ptr_factory_.InvalidateWeakPtrs(); 152 153 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass()); 154} 155 156// Reminder: This must be thread-safe. 157bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) { 158 DCHECK(message); 159 160 // TODO(vtl) 161 if (message->transport_data() && 162 message->transport_data()->has_platform_handles()) { 163 NOTIMPLEMENTED(); 164 return false; 165 } 166 167 base::AutoLock locker(write_lock_); 168 if (write_stopped_) 169 return false; 170 171 if (!write_buffer_->message_queue_.empty()) { 172 write_buffer_->message_queue_.push_back(message.release()); 173 return true; 174 } 175 176 write_buffer_->message_queue_.push_front(message.release()); 177 DCHECK_EQ(write_buffer_->offset_, 0u); 178 179 size_t bytes_written = 0; 180 IOResult io_result = WriteNoLock(&bytes_written); 181 if (io_result == IO_PENDING) 182 return true; 183 184 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED, 185 bytes_written); 186 if (!result) { 187 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 188 // nested context. 189 message_loop_for_io_->PostTask( 190 FROM_HERE, 191 base::Bind(&RawChannel::CallOnFatalError, 192 weak_ptr_factory_.GetWeakPtr(), 193 Delegate::FATAL_ERROR_FAILED_WRITE)); 194 } 195 196 return result; 197} 198 199// Reminder: This must be thread-safe. 200bool RawChannel::IsWriteBufferEmpty() { 201 base::AutoLock locker(write_lock_); 202 return write_buffer_->message_queue_.empty(); 203} 204 205RawChannel::ReadBuffer* RawChannel::read_buffer() { 206 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 207 return read_buffer_.get(); 208} 209 210RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() { 211 write_lock_.AssertAcquired(); 212 return write_buffer_.get(); 213} 214 215void RawChannel::OnReadCompleted(bool result, size_t bytes_read) { 216 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 217 218 if (read_stopped_) { 219 NOTREACHED(); 220 return; 221 } 222 223 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED; 224 225 // Keep reading data in a loop, and dispatch messages if enough data is 226 // received. Exit the loop if any of the following happens: 227 // - one or more messages were dispatched; 228 // - the last read failed, was a partial read or would block; 229 // - |Shutdown()| was called. 230 do { 231 if (io_result != IO_SUCCEEDED) { 232 read_stopped_ = true; 233 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 234 return; 235 } 236 237 read_buffer_->num_valid_bytes_ += bytes_read; 238 239 // Dispatch all the messages that we can. 240 bool did_dispatch_message = false; 241 // Tracks the offset of the first undispatched message in |read_buffer_|. 242 // Currently, we copy data to ensure that this is zero at the beginning. 243 size_t read_buffer_start = 0; 244 size_t remaining_bytes = read_buffer_->num_valid_bytes_; 245 size_t message_size; 246 // Note that we rely on short-circuit evaluation here: 247 // - |read_buffer_start| may be an invalid index into 248 // |read_buffer_->buffer_| if |remaining_bytes| is zero. 249 // - |message_size| is only valid if |GetNextMessageSize()| returns true. 250 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the 251 // next read). 252 // TODO(vtl): Validate that |message_size| is sane. 253 while (remaining_bytes > 0 && 254 MessageInTransit::GetNextMessageSize( 255 &read_buffer_->buffer_[read_buffer_start], remaining_bytes, 256 &message_size) && 257 remaining_bytes >= message_size) { 258 MessageInTransit::View 259 message_view(message_size, &read_buffer_->buffer_[read_buffer_start]); 260 DCHECK_EQ(message_view.total_size(), message_size); 261 262 // Dispatch the message. 263 DCHECK(delegate_); 264 delegate_->OnReadMessage(message_view); 265 if (read_stopped_) { 266 // |Shutdown()| was called in |OnReadMessage()|. 267 // TODO(vtl): Add test for this case. 268 return; 269 } 270 did_dispatch_message = true; 271 272 // Update our state. 273 read_buffer_start += message_size; 274 remaining_bytes -= message_size; 275 } 276 277 if (read_buffer_start > 0) { 278 // Move data back to start. 279 read_buffer_->num_valid_bytes_ = remaining_bytes; 280 if (read_buffer_->num_valid_bytes_ > 0) { 281 memmove(&read_buffer_->buffer_[0], 282 &read_buffer_->buffer_[read_buffer_start], remaining_bytes); 283 } 284 read_buffer_start = 0; 285 } 286 287 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ < 288 kReadSize) { 289 // Use power-of-2 buffer sizes. 290 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 291 // maximum message size to whatever extent necessary). 292 // TODO(vtl): We may often be able to peek at the header and get the real 293 // required extra space (which may be much bigger than |kReadSize|). 294 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize); 295 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize) 296 new_size *= 2; 297 298 // TODO(vtl): It's suboptimal to zero out the fresh memory. 299 read_buffer_->buffer_.resize(new_size, 0); 300 } 301 302 // (1) If we dispatched any messages, stop reading for now (and let the 303 // message loop do its thing for another round). 304 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 305 // a single message. Risks: slower, more complex if we want to avoid lots of 306 // copying. ii. Keep reading until there's no more data and dispatch all the 307 // messages we can. Risks: starvation of other users of the message loop.) 308 // (2) If we didn't max out |kReadSize|, stop reading for now. 309 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize; 310 bytes_read = 0; 311 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read); 312 } while (io_result != IO_PENDING); 313} 314 315void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) { 316 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 317 318 bool did_fail = false; 319 { 320 base::AutoLock locker(write_lock_); 321 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty()); 322 323 if (write_stopped_) { 324 NOTREACHED(); 325 return; 326 } 327 328 did_fail = !OnWriteCompletedNoLock(result, bytes_written); 329 } 330 331 if (did_fail) 332 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 333} 334 335void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) { 336 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_); 337 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 338 if (delegate_) 339 delegate_->OnFatalError(fatal_error); 340} 341 342bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) { 343 write_lock_.AssertAcquired(); 344 345 DCHECK(!write_stopped_); 346 DCHECK(!write_buffer_->message_queue_.empty()); 347 348 if (result) { 349 write_buffer_->offset_ += bytes_written; 350 351 MessageInTransit* message = write_buffer_->message_queue_.front(); 352 if (write_buffer_->offset_ >= message->total_size()) { 353 // Complete write. 354 DCHECK_EQ(write_buffer_->offset_, message->total_size()); 355 write_buffer_->message_queue_.pop_front(); 356 delete message; 357 write_buffer_->offset_ = 0; 358 359 if (write_buffer_->message_queue_.empty()) 360 return true; 361 } 362 363 // Schedule the next write. 364 IOResult io_result = ScheduleWriteNoLock(); 365 if (io_result == IO_PENDING) 366 return true; 367 DCHECK_EQ(io_result, IO_FAILED); 368 } 369 370 write_stopped_ = true; 371 STLDeleteElements(&write_buffer_->message_queue_); 372 write_buffer_->offset_ = 0; 373 return false; 374} 375 376} // namespace system 377} // namespace mojo 378