// raw_channel_posix.cc revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/raw_channel.h" 6 7#include <errno.h> 8#include <string.h> 9#include <unistd.h> 10 11#include <algorithm> 12#include <deque> 13#include <vector> 14 15#include "base/basictypes.h" 16#include "base/bind.h" 17#include "base/compiler_specific.h" 18#include "base/location.h" 19#include "base/logging.h" 20#include "base/memory/scoped_ptr.h" 21#include "base/memory/weak_ptr.h" 22#include "base/message_loop/message_loop.h" 23#include "base/posix/eintr_wrapper.h" 24#include "base/synchronization/lock.h" 25#include "mojo/system/message_in_transit.h" 26#include "mojo/system/platform_channel_handle.h" 27 28namespace mojo { 29namespace system { 30 31const size_t kReadSize = 4096; 32 33class RawChannelPosix : public RawChannel, 34 public base::MessageLoopForIO::Watcher { 35 public: 36 RawChannelPosix(const PlatformChannelHandle& handle, 37 Delegate* delegate, 38 base::MessageLoop* message_loop); 39 virtual ~RawChannelPosix(); 40 41 // |RawChannel| implementation: 42 virtual void Init() OVERRIDE; 43 virtual void Shutdown() OVERRIDE; 44 virtual bool WriteMessage(MessageInTransit* message) OVERRIDE; 45 46 private: 47 // |base::MessageLoopForIO::Watcher| implementation: 48 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 49 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 50 51 // Watches for |fd_| to become writable. Must be called on the I/O thread. 52 void WaitToWrite(); 53 54 // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O 55 // thread WITHOUT |write_lock_| held. 56 void CallOnFatalError(Delegate::FatalError fatal_error); 57 58 // Writes the message at the front of |write_message_queue_|, starting at 59 // |write_message_offset_|. It removes and destroys if the write completes and 60 // otherwise updates |write_message_offset_|. 
Returns true on success. Must be 61 // called under |write_lock_|. 62 bool WriteFrontMessageNoLock(); 63 64 // Cancels all pending writes and destroys the contents of 65 // |write_message_queue_|. Should only be called if |is_dead_| is false; sets 66 // |is_dead_| to true. Must be called under |write_lock_|. 67 void CancelPendingWritesNoLock(); 68 69 base::MessageLoopForIO* message_loop_for_io() { 70 return static_cast<base::MessageLoopForIO*>(message_loop()); 71 } 72 73 int fd_; 74 75 // Only used on the I/O thread: 76 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 77 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 78 79 // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_| 80 // is always aligned with a message boundary (we will copy memory to ensure 81 // this), but |read_buffer_| may be larger than the actual number of bytes we 82 // have. 83 std::vector<char> read_buffer_; 84 size_t read_buffer_num_valid_bytes_; 85 86 base::Lock write_lock_; // Protects the following members. 87 bool is_dead_; 88 std::deque<MessageInTransit*> write_message_queue_; 89 size_t write_message_offset_; 90 // This is used for posting tasks from write threads to the I/O thread. It 91 // must only be accessed under |write_lock_|. The weak pointers it produces 92 // are only used/invalidated on the I/O thread. 
93 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 94 95 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 96}; 97 98RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle, 99 Delegate* delegate, 100 base::MessageLoop* message_loop) 101 : RawChannel(delegate, message_loop), 102 fd_(handle.fd), 103 read_buffer_num_valid_bytes_(0), 104 is_dead_(false), 105 write_message_offset_(0), 106 weak_ptr_factory_(this) { 107 CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO); 108 DCHECK_NE(fd_, -1); 109} 110 111RawChannelPosix::~RawChannelPosix() { 112 DCHECK(is_dead_); 113 DCHECK_EQ(fd_, -1); 114 115 // No need to take the |write_lock_| here -- if there are still weak pointers 116 // outstanding, then we're hosed anyway (since we wouldn't be able to 117 // invalidate them cleanly, since we might not be on the I/O thread). 118 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 119 120 // These must have been shut down/destroyed on the I/O thread. 121 DCHECK(!read_watcher_.get()); 122 DCHECK(!write_watcher_.get()); 123} 124 125void RawChannelPosix::Init() { 126 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 127 128 DCHECK(!read_watcher_.get()); 129 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 130 DCHECK(!write_watcher_.get()); 131 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 132 133 // No need to take the lock. No one should be using us yet. 
134 DCHECK(write_message_queue_.empty()); 135 136 bool result = message_loop_for_io()->WatchFileDescriptor( 137 fd_, true, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); 138 DCHECK(result); 139} 140 141void RawChannelPosix::Shutdown() { 142 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 143 144 base::AutoLock locker(write_lock_); 145 if (!is_dead_) 146 CancelPendingWritesNoLock(); 147 148 DCHECK_NE(fd_, -1); 149 if (close(fd_) != 0) 150 PLOG(ERROR) << "close"; 151 fd_ = -1; 152 153 weak_ptr_factory_.InvalidateWeakPtrs(); 154 155 read_watcher_.reset(); // This will stop watching (if necessary). 156 write_watcher_.reset(); // This will stop watching (if necessary). 157} 158 159// Reminder: This must be thread-safe, and takes ownership of |message| on 160// success. 161bool RawChannelPosix::WriteMessage(MessageInTransit* message) { 162 base::AutoLock locker(write_lock_); 163 if (is_dead_) { 164 message->Destroy(); 165 return false; 166 } 167 168 if (!write_message_queue_.empty()) { 169 write_message_queue_.push_back(message); 170 return true; 171 } 172 173 write_message_queue_.push_front(message); 174 DCHECK_EQ(write_message_offset_, 0u); 175 bool result = WriteFrontMessageNoLock(); 176 DCHECK(result || write_message_queue_.empty()); 177 178 if (!result) { 179 // Even if we're on the I/O thread, don't call |OnFatalError()| in the 180 // nested context. 181 message_loop()->PostTask(FROM_HERE, 182 base::Bind(&RawChannelPosix::CallOnFatalError, 183 weak_ptr_factory_.GetWeakPtr(), 184 Delegate::FATAL_ERROR_FAILED_WRITE)); 185 } else if (!write_message_queue_.empty()) { 186 // Set up to wait for the FD to become writable. If we're not on the I/O 187 // thread, we have to post a task to do this. 
188 if (base::MessageLoop::current() == message_loop()) { 189 WaitToWrite(); 190 } else { 191 message_loop()->PostTask(FROM_HERE, 192 base::Bind(&RawChannelPosix::WaitToWrite, 193 weak_ptr_factory_.GetWeakPtr())); 194 } 195 } 196 197 return result; 198} 199 200void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 201 DCHECK_EQ(fd, fd_); 202 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 203 204 bool did_dispatch_message = false; 205 // Tracks the offset of the first undispatched message in |read_buffer_|. 206 // Currently, we copy data to ensure that this is zero at the beginning. 207 size_t read_buffer_start = 0; 208 for (;;) { 209 if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_) 210 < kReadSize) { 211 // Use power-of-2 buffer sizes. 212 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the 213 // maximum message size to whatever extent necessary). 214 // TODO(vtl): We may often be able to peek at the header and get the real 215 // required extra space (which may be much bigger than |kReadSize|). 216 size_t new_size = std::max(read_buffer_.size(), kReadSize); 217 while (new_size < 218 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize) 219 new_size *= 2; 220 221 // TODO(vtl): It's suboptimal to zero out the fresh memory. 222 read_buffer_.resize(new_size, 0); 223 } 224 225 ssize_t bytes_read = HANDLE_EINTR( 226 read(fd_, 227 &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_], 228 kReadSize)); 229 if (bytes_read < 0) { 230 if (errno != EAGAIN && errno != EWOULDBLOCK) { 231 PLOG(ERROR) << "read"; 232 { 233 base::AutoLock locker(write_lock_); 234 CancelPendingWritesNoLock(); 235 } 236 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ); 237 return; 238 } 239 240 break; 241 } 242 243 read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read); 244 245 // Dispatch all the messages that we can. 
246 while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) { 247 const MessageInTransit* message = 248 reinterpret_cast<const MessageInTransit*>( 249 &read_buffer_[read_buffer_start]); 250 DCHECK_EQ(reinterpret_cast<size_t>(message) % 251 MessageInTransit::kMessageAlignment, 0u); 252 // If we have the header, not the whole message.... 253 if (read_buffer_num_valid_bytes_ < 254 message->size_with_header_and_padding()) 255 break; 256 257 // Dispatch the message. 258 delegate()->OnReadMessage(*message); 259 if (!read_watcher_.get()) { 260 // |Shutdown()| was called in |OnReadMessage()|. 261 // TODO(vtl): Add test for this case. 262 return; 263 } 264 did_dispatch_message = true; 265 266 // Update our state. 267 read_buffer_start += message->size_with_header_and_padding(); 268 read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding(); 269 } 270 271 // If we dispatched any messages, stop reading for now (and let the message 272 // loop do its thing for another round). 273 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only 274 // a single message. Risks: slower, more complex if we want to avoid lots of 275 // copying. ii. Keep reading until there's no more data and dispatch all the 276 // messages we can. Risks: starvation of other users of the message loop.) 277 if (did_dispatch_message) 278 break; 279 280 // If we didn't max out |kReadSize|, stop reading for now. 281 if (static_cast<size_t>(bytes_read) < kReadSize) 282 break; 283 284 // Else try to read some more.... 285 } 286 287 // Move data back to start. 
288 if (read_buffer_start > 0) { 289 memmove(&read_buffer_[0], &read_buffer_[read_buffer_start], 290 read_buffer_num_valid_bytes_); 291 read_buffer_start = 0; 292 } 293} 294 295void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 296 DCHECK_EQ(fd, fd_); 297 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 298 299 bool did_fail = false; 300 { 301 base::AutoLock locker(write_lock_); 302 DCHECK(!is_dead_); 303 DCHECK(!write_message_queue_.empty()); 304 305 bool result = WriteFrontMessageNoLock(); 306 DCHECK(result || write_message_queue_.empty()); 307 308 if (!result) 309 did_fail = true; 310 else if (!write_message_queue_.empty()) 311 WaitToWrite(); 312 } 313 if (did_fail) 314 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE); 315} 316 317void RawChannelPosix::WaitToWrite() { 318 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 319 320 DCHECK(write_watcher_.get()); 321 bool result = message_loop_for_io()->WatchFileDescriptor( 322 fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), 323 this); 324 DCHECK(result); 325} 326 327void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) { 328 DCHECK_EQ(base::MessageLoop::current(), message_loop()); 329 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"? 
330 delegate()->OnFatalError(fatal_error); 331} 332 333bool RawChannelPosix::WriteFrontMessageNoLock() { 334 write_lock_.AssertAcquired(); 335 336 DCHECK(!is_dead_); 337 DCHECK(!write_message_queue_.empty()); 338 339 MessageInTransit* message = write_message_queue_.front(); 340 DCHECK_LT(write_message_offset_, message->size_with_header_and_padding()); 341 size_t bytes_to_write = 342 message->size_with_header_and_padding() - write_message_offset_; 343 ssize_t bytes_written = HANDLE_EINTR( 344 write(fd_, 345 reinterpret_cast<char*>(message) + write_message_offset_, 346 bytes_to_write)); 347 if (bytes_written < 0) { 348 if (errno != EAGAIN && errno != EWOULDBLOCK) { 349 PLOG(ERROR) << "write of size " << bytes_to_write; 350 CancelPendingWritesNoLock(); 351 return false; 352 } 353 354 // We simply failed to write since we'd block. The logic is the same as if 355 // we got a partial write. 356 bytes_written = 0; 357 } 358 359 DCHECK_GE(bytes_written, 0); 360 if (static_cast<size_t>(bytes_written) < bytes_to_write) { 361 // Partial (or no) write. 362 write_message_offset_ += static_cast<size_t>(bytes_written); 363 } else { 364 // Complete write. 365 DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write); 366 write_message_queue_.pop_front(); 367 write_message_offset_ = 0; 368 message->Destroy(); 369 } 370 371 return true; 372} 373 374void RawChannelPosix::CancelPendingWritesNoLock() { 375 write_lock_.AssertAcquired(); 376 DCHECK(!is_dead_); 377 378 is_dead_ = true; 379 for (std::deque<MessageInTransit*>::iterator it = 380 write_message_queue_.begin(); it != write_message_queue_.end(); 381 ++it) { 382 (*it)->Destroy(); 383 } 384 write_message_queue_.clear(); 385} 386 387// ----------------------------------------------------------------------------- 388 389// Static factory method declared in raw_channel.h. 
390// static 391RawChannel* RawChannel::Create(const PlatformChannelHandle& handle, 392 Delegate* delegate, 393 base::MessageLoop* message_loop) { 394 return new RawChannelPosix(handle, delegate, message_loop); 395} 396 397} // namespace system 398} // namespace mojo 399