raw_channel_posix.cc revision f2477e01787aa58f445919b809d89e252beef54f
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/raw_channel.h" 6 7#include <errno.h> 8#include <string.h> 9#include <unistd.h> 10 11#include <algorithm> 12#include <deque> 13#include <vector> 14 15#include "base/basictypes.h" 16#include "base/bind.h" 17#include "base/compiler_specific.h" 18#include "base/location.h" 19#include "base/logging.h" 20#include "base/memory/scoped_ptr.h" 21#include "base/memory/weak_ptr.h" 22#include "base/message_loop/message_loop.h" 23#include "base/posix/eintr_wrapper.h" 24#include "base/synchronization/lock.h" 25#include "mojo/system/message_in_transit.h" 26#include "mojo/system/platform_channel_handle.h" 27 28namespace mojo { 29namespace system { 30 31const size_t kReadSize = 4096; 32 33class RawChannelPosix : public RawChannel, 34 public base::MessageLoopForIO::Watcher { 35 public: 36 RawChannelPosix(const PlatformChannelHandle& handle, 37 Delegate* delegate, 38 base::MessageLoop* message_loop); 39 virtual ~RawChannelPosix(); 40 41 // |RawChannel| implementation: 42 virtual bool Init() OVERRIDE; 43 virtual void Shutdown() OVERRIDE; 44 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; 45 46 private: 47 // |base::MessageLoopForIO::Watcher| implementation: 48 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 49 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 50 51 // Watches for |fd_| to become writable. Must be called on the I/O thread. 52 void WaitToWrite(); 53 54 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O 55 // thread WITHOUT |write_lock_| held. 56 void CallOnFatalError(Delegate::FatalError fatal_error); 57 58 // Writes the message at the front of |write_message_queue_|, starting at 59 // |write_message_offset_|. It removes and destroys if the write completes and 60 // otherwise updates |write_message_offset_|. Returns true on success. Must be 61 // called under |write_lock_|. 62 bool WriteFrontMessageNoLock(); 63 64 // Cancels all pending writes and destroys the contents of 65 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets 66 // |is_dead_| to true. Must be called under |write_lock_|. 67 void CancelPendingWritesNoLock(); 68 69 base::MessageLoopForIO* message_loop_for_io() { 70 return static_cast<base::MessageLoopForIO*>(message_loop()); 71 } 72 73 int fd_; 74 75 // Only used on the I/O thread: 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 77 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 78 79 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 80 // is always aligned with a message boundary (we will copy memory to ensure 81 // this), but |read_buffer_| may be larger than the actual number of bytes we 82 // have. 83 std::vector<char> read_buffer_; 84 size_t read_buffer_num_valid_bytes_; 85 86 base::Lock write_lock_; // Protects the following members. 87 bool is_dead_; 88 std::deque<MessageInTransit*> write_message_queue_; 89 size_t write_message_offset_; 90 // This is used for posting tasks from write threads to the I/O thread. It 91 // must only be accessed under |write_lock_|. The weak pointers it produces 92 // are only used/invalidated on the I/O thread. 93 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 94 95 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 96}; 97 98RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, 99 Delegate* delegate, 100 base::MessageLoop* message_loop) 101 : RawChannel(delegate, message_loop), 102 fd_(handle.fd), 103 read_buffer_num_valid_bytes_(0), 104 is_dead_(false), 105 write_message_offset_(0), 106 weak_ptr_factory_(this) { 107 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); 108 DCHECK_NE(fd_, -1); 109} 110 111RawChannelPosix::~RawChannelPosix() { 112 DCHECK(is_dead_); 113 DCHECK_EQ(fd_, -1); 114 115 // No need to take the |write_lock_| here -- if there are still weak pointers 116 // outstanding, then we're hosed anyway (since we wouldn't be able to 117 // invalidate them cleanly, since we might not be on the I/O thread). 118 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 119 120 // These must have been shut down/destroyed on the I/O thread. 121 DCHECK(!read_watcher_.get()); 122 DCHECK(!write_watcher_.get()); 123} 124 125bool RawChannelPosix::Init() { 126 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 127 128 DCHECK(!read_watcher_.get()); 129 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 130 DCHECK(!write_watcher_.get()); 131 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 132 133 // No need to take the lock. No one should be using us yet. 134 DCHECK(write_message_queue_.empty()); 135 136 if (!message_loop_for_io()->WatchFileDescriptor(fd_, true, 137 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 138 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 139 // (in the sense of returning the message loop's state to what it was before 140 // it was called). 141 read_watcher_.reset(); 142 write_watcher_.reset(); 143 return false; 144 } 145 146 return true; 147} 148 149void RawChannelPosix::Shutdown() { 150 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 151 152 base::AutoLock locker(write_lock_); 153 if (!is_dead_) 154 CancelPendingWritesNoLock(); 155 156 DCHECK_NE(fd_, -1); 157 if (close(fd_) != 0) 158 PLOG(ERROR) << "close"; 159 fd_ = -1; 160 161 weak_ptr_factory_.InvalidateWeakPtrs(); 162 163 read_watcher_.reset(); // This will stop watching (if necessary). 164 write_watcher_.reset(); // This will stop watching (if necessary). 165} 166 167// Reminder: This must be thread-safe, and takes ownership of |message| on 168// success. 169bool RawChannelPosix::WriteMessage(MessageInTransit* message) { 170 base::AutoLock locker(write_lock_); 171 if (is_dead_) { 172 message->Destroy(); 173 return false; 174 } 175 176 if (!write_message_queue_.empty()) { 177 write_message_queue_.push_back(message); 178 return true; 179 } 180 181 write_message_queue_.push_front(message); 182 DCHECK_EQ(write_message_offset_, 0u); 183 bool result = WriteFrontMessageNoLock(); 184 DCHECK(result || write_message_queue_.empty()); 185 186 if (!result) { 187 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 188 // nested context. 189 message_loop()->PostTask(FROM_HERE, 190 base::Bind(&RawChannelPosix::CallOnFatalError, 191 weak_ptr_factory_.GetWeakPtr(), 192 Delegate::FATAL_ERROR_FAILED_WRITE)); 193 } else if (!write_message_queue_.empty()) { 194 // Set up to wait for the FD to become writable. If we're not on the I/O 195 // thread, we have to post a task to do this. 196 if (base::MessageLoop::current() == message_loop()) { 197 WaitToWrite(); 198 } else { 199 message_loop()->PostTask(FROM_HERE, 200 base::Bind(&RawChannelPosix::WaitToWrite, 201 weak_ptr_factory_.GetWeakPtr())); 202 } 203 } 204 205 return result; 206} 207 208void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 209 DCHECK_EQ(fd, fd_); 210 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 211 212 bool did_dispatch_message = false; 213 // Tracks the offset of the first undispatched message in |read_buffer_|. 214 // Currently, we copy data to ensure that this is zero at the beginning. 215 size_t read_buffer_start = 0; 216 for (;;) { 217 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 218 < kReadSize) { 219 // Use power-of-2 buffer sizes. 220 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 221 // maximum message size to whatever extent necessary). 222 // TODO(vtl): We may often be able to peek at the header and get the real 223 // required extra space (which may be much bigger than |kReadSize|). 224 size_t new_size = std::max(read_buffer_.size(), kReadSize); 225 while (new_size < 226 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 227 new_size *= 2; 228 229 // TODO(vtl): It's suboptimal to zero out the fresh memory. 230 read_buffer_.resize(new_size, 0); 231 } 232 233 ssize_t bytes_read = HANDLE_EINTR( 234 read(fd_, 235 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], 236 kReadSize)); 237 if (bytes_read < 0) { 238 if (errno != EAGAIN && errno != EWOULDBLOCK) { 239 PLOG(ERROR) << "read"; 240 { 241 base::AutoLock locker(write_lock_); 242 CancelPendingWritesNoLock(); 243 } 244 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 245 return; 246 } 247 248 break; 249 } 250 251 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); 252 253 // Dispatch all the messages that we can. 254 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { 255 const MessageInTransit* message = 256 reinterpret_cast<const MessageInTransit*>( 257 &read_buffer_[read_buffer_start]); 258 DCHECK_EQ(reinterpret_cast<size_t>(message) % 259 MessageInTransit::kMessageAlignment, 0u); 260 // If we have the header, not the whole message.... 261 if (read_buffer_num_valid_bytes_ < 262 message->size_with_header_and_padding()) 263 break; 264 265 // Dispatch the message. 266 delegate()->OnReadMessage(*message); 267 if (!read_watcher_.get()) { 268 // |Shutdown()| was called in |OnReadMessage()|. 269 // TODO(vtl): Add test for this case. 270 return; 271 } 272 did_dispatch_message = true; 273 274 // Update our state. 275 read_buffer_start += message->size_with_header_and_padding(); 276 read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); 277 } 278 279 // If we dispatched any messages, stop reading for now (and let the message 280 // loop do its thing for another round). 281 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 282 // a single message. Risks: slower, more complex if we want to avoid lots of 283 // copying. ii. Keep reading until there's no more data and dispatch all the 284 // messages we can. Risks: starvation of other users of the message loop.) 285 if (did_dispatch_message) 286 break; 287 288 // If we didn't max out |kReadSize|, stop reading for now. 289 if (static_cast<size_t>(bytes_read) < kReadSize) 290 break; 291 292 // Else try to read some more.... 293 } 294 295 // Move data back to start. 296 if (read_buffer_start > 0) { 297 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], 298 read_buffer_num_valid_bytes_); 299 read_buffer_start = 0; 300 } 301} 302 303void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 304 DCHECK_EQ(fd, fd_); 305 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 306 307 bool did_fail = false; 308 { 309 base::AutoLock locker(write_lock_); 310 DCHECK(!is_dead_); 311 DCHECK(!write_message_queue_.empty()); 312 313 bool result = WriteFrontMessageNoLock(); 314 DCHECK(result || write_message_queue_.empty()); 315 316 if (!result) 317 did_fail = true; 318 else if (!write_message_queue_.empty()) 319 WaitToWrite(); 320 } 321 if (did_fail) 322 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 323} 324 325void RawChannelPosix::WaitToWrite() { 326 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 327 328 DCHECK(write_watcher_.get()); 329 bool result = message_loop_for_io()->WatchFileDescriptor( 330 fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), 331 this); 332 DCHECK(result); 333} 334 335void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { 336 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 337 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 338 delegate()->OnFatalError(fatal_error); 339} 340 341bool RawChannelPosix::WriteFrontMessageNoLock() { 342 write_lock_.AssertAcquired(); 343 344 DCHECK(!is_dead_); 345 DCHECK(!write_message_queue_.empty()); 346 347 MessageInTransit* message = write_message_queue_.front(); 348 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); 349 size_t bytes_to_write = 350 message->size_with_header_and_padding() - write_message_offset_; 351 ssize_t bytes_written = HANDLE_EINTR( 352 write(fd_, 353 reinterpret_cast<char*>(message) + write_message_offset_, 354 bytes_to_write)); 355 if (bytes_written < 0) { 356 if (errno != EAGAIN && errno != EWOULDBLOCK) { 357 PLOG(ERROR) << "write of size " << bytes_to_write; 358 CancelPendingWritesNoLock(); 359 return false; 360 } 361 362 // We simply failed to write since we'd block. The logic is the same as if 363 // we got a partial write. 364 bytes_written = 0; 365 } 366 367 DCHECK_GE(bytes_written, 0); 368 if (static_cast<size_t>(bytes_written) < bytes_to_write) { 369 // Partial (or no) write. 370 write_message_offset_ += static_cast<size_t>(bytes_written); 371 } else { 372 // Complete write. 373 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); 374 write_message_queue_.pop_front(); 375 write_message_offset_ = 0; 376 message->Destroy(); 377 } 378 379 return true; 380} 381 382void RawChannelPosix::CancelPendingWritesNoLock() { 383 write_lock_.AssertAcquired(); 384 DCHECK(!is_dead_); 385 386 is_dead_ = true; 387 for (std::deque<MessageInTransit*>::iterator it = 388 write_message_queue_.begin(); it != write_message_queue_.end(); 389 ++it) { 390 (*it)->Destroy(); 391 } 392 write_message_queue_.clear(); 393} 394 395// ----------------------------------------------------------------------------- 396 397// Static factory method declared in raw_channel.h. 398// static 399RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, 400 Delegate* delegate, 401 base::MessageLoop* message_loop) { 402 return new RawChannelPosix(handle, delegate, message_loop); 403} 404 405} // namespace system 406} // namespace mojo 407