raw_channel_posix.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/raw_channel.h" 6 7#include <errno.h> 8#include <sys/uio.h> 9#include <unistd.h> 10 11#include <algorithm> 12 13#include "base/basictypes.h" 14#include "base/bind.h" 15#include "base/compiler_specific.h" 16#include "base/location.h" 17#include "base/logging.h" 18#include "base/memory/scoped_ptr.h" 19#include "base/memory/weak_ptr.h" 20#include "base/message_loop/message_loop.h" 21#include "base/posix/eintr_wrapper.h" 22#include "base/synchronization/lock.h" 23#include "mojo/system/embedder/platform_handle.h" 24 25namespace mojo { 26namespace system { 27 28namespace { 29 30class RawChannelPosix : public RawChannel, 31 public base::MessageLoopForIO::Watcher { 32 public: 33 RawChannelPosix(embedder::ScopedPlatformHandle handle, 34 Delegate* delegate, 35 base::MessageLoopForIO* message_loop_for_io); 36 virtual ~RawChannelPosix(); 37 38 private: 39 // |RawChannel| implementation: 40 virtual IOResult Read(size_t* bytes_read) OVERRIDE; 41 virtual IOResult ScheduleRead() OVERRIDE; 42 virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE; 43 virtual IOResult ScheduleWriteNoLock() OVERRIDE; 44 virtual bool OnInit() OVERRIDE; 45 virtual void OnShutdownNoLock( 46 scoped_ptr<ReadBuffer> read_buffer, 47 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE; 48 49 // |base::MessageLoopForIO::Watcher| implementation: 50 virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; 51 virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; 52 53 // Watches for |fd_| to become writable. Must be called on the I/O thread. 54 void WaitToWrite(); 55 56 embedder::ScopedPlatformHandle fd_; 57 58 // The following members are only used on the I/O thread: 59 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 60 scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 61 62 bool pending_read_; 63 64 // The following members are used on multiple threads and protected by 65 // |write_lock()|: 66 bool pending_write_; 67 68 // This is used for posting tasks from write threads to the I/O thread. It 69 // must only be accessed under |write_lock_|. The weak pointers it produces 70 // are only used/invalidated on the I/O thread. 71 base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_; 72 73 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix); 74}; 75 76RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle, 77 Delegate* delegate, 78 base::MessageLoopForIO* message_loop_for_io) 79 : RawChannel(delegate, message_loop_for_io), 80 fd_(handle.Pass()), 81 pending_read_(false), 82 pending_write_(false), 83 weak_ptr_factory_(this) { 84 DCHECK(fd_.is_valid()); 85} 86 87RawChannelPosix::~RawChannelPosix() { 88 DCHECK(!pending_read_); 89 DCHECK(!pending_write_); 90 91 // No need to take the |write_lock()| here -- if there are still weak pointers 92 // outstanding, then we're hosed anyway (since we wouldn't be able to 93 // invalidate them cleanly, since we might not be on the I/O thread). 94 DCHECK(!weak_ptr_factory_.HasWeakPtrs()); 95 96 // These must have been shut down/destroyed on the I/O thread. 97 DCHECK(!read_watcher_.get()); 98 DCHECK(!write_watcher_.get()); 99} 100 101RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) { 102 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 103 DCHECK(!pending_read_); 104 105 char* buffer = NULL; 106 size_t bytes_to_read = 0; 107 read_buffer()->GetBuffer(&buffer, &bytes_to_read); 108 109 ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read)); 110 111 if (read_result > 0) { 112 *bytes_read = static_cast<size_t>(read_result); 113 return IO_SUCCEEDED; 114 } 115 116 // |read_result == 0| means "end of file". 117 if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { 118 if (read_result != 0) 119 PLOG(ERROR) << "read"; 120 121 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again. 122 read_watcher_.reset(); 123 124 return IO_FAILED; 125 } 126 127 return ScheduleRead(); 128} 129 130RawChannel::IOResult RawChannelPosix::ScheduleRead() { 131 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 132 DCHECK(!pending_read_); 133 134 pending_read_ = true; 135 136 return IO_PENDING; 137} 138 139RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) { 140 write_lock().AssertAcquired(); 141 142 DCHECK(!pending_write_); 143 144 std::vector<WriteBuffer::Buffer> buffers; 145 write_buffer_no_lock()->GetBuffers(&buffers); 146 DCHECK(!buffers.empty()); 147 148 ssize_t write_result = -1; 149 if (buffers.size() == 1) { 150 write_result = HANDLE_EINTR( 151 write(fd_.get().fd, buffers[0].addr, buffers[0].size)); 152 } else { 153 // Note that using |writev()| is measurably slower than using |write()| -- 154 // at least in a microbenchmark -- but much faster than using multiple 155 // |write()|s. 156 const size_t kMaxBufferCount = 10; 157 iovec iov[kMaxBufferCount]; 158 size_t buffer_count = std::min(buffers.size(), kMaxBufferCount); 159 160 for (size_t i = 0; i < buffer_count; ++i) { 161 iov[i].iov_base = const_cast<char*>(buffers[i].addr); 162 iov[i].iov_len = buffers[i].size; 163 } 164 165 write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count)); 166 } 167 168 if (write_result >= 0) { 169 *bytes_written = static_cast<size_t>(write_result); 170 return IO_SUCCEEDED; 171 } 172 173 if (errno != EAGAIN && errno != EWOULDBLOCK) { 174 PLOG(ERROR) << "write of size " 175 << write_buffer_no_lock()->GetTotalBytesToWrite(); 176 return IO_FAILED; 177 } 178 179 return ScheduleWriteNoLock(); 180} 181 182RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() { 183 write_lock().AssertAcquired(); 184 185 DCHECK(!pending_write_); 186 187 // Set up to wait for the FD to become writable. 188 // If we're not on the I/O thread, we have to post a task to do this. 189 if (base::MessageLoop::current() != message_loop_for_io()) { 190 message_loop_for_io()->PostTask( 191 FROM_HERE, 192 base::Bind(&RawChannelPosix::WaitToWrite, 193 weak_ptr_factory_.GetWeakPtr())); 194 pending_write_ = true; 195 return IO_PENDING; 196 } 197 198 if (message_loop_for_io()->WatchFileDescriptor( 199 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, 200 write_watcher_.get(), this)) { 201 pending_write_ = true; 202 return IO_PENDING; 203 } 204 205 return IO_FAILED; 206} 207 208bool RawChannelPosix::OnInit() { 209 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 210 211 DCHECK(!read_watcher_.get()); 212 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 213 DCHECK(!write_watcher_.get()); 214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher()); 215 216 if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true, 217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) { 218 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly 219 // (in the sense of returning the message loop's state to what it was before 220 // it was called). 221 read_watcher_.reset(); 222 write_watcher_.reset(); 223 return false; 224 } 225 226 return true; 227} 228 229void RawChannelPosix::OnShutdownNoLock( 230 scoped_ptr<ReadBuffer> /*read_buffer*/, 231 scoped_ptr<WriteBuffer> /*write_buffer*/) { 232 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 233 write_lock().AssertAcquired(); 234 235 read_watcher_.reset(); // This will stop watching (if necessary). 236 write_watcher_.reset(); // This will stop watching (if necessary). 237 238 pending_read_ = false; 239 pending_write_ = false; 240 241 DCHECK(fd_.is_valid()); 242 fd_.reset(); 243 244 weak_ptr_factory_.InvalidateWeakPtrs(); 245} 246 247void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) { 248 DCHECK_EQ(fd, fd_.get().fd); 249 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 250 251 if (!pending_read_) { 252 NOTREACHED(); 253 return; 254 } 255 256 pending_read_ = false; 257 size_t bytes_read = 0; 258 IOResult result = Read(&bytes_read); 259 if (result != IO_PENDING) 260 OnReadCompleted(result == IO_SUCCEEDED, bytes_read); 261 262 // On failure, |read_watcher_| must have been reset; on success, 263 // we assume that |OnReadCompleted()| always schedules another read. 264 // Otherwise, we could end up spinning -- getting 265 // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual 266 // read. 267 // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't 268 // schedule a new read. But that code won't be reached under the current 269 // RawChannel implementation. 270 DCHECK(!read_watcher_.get() || pending_read_); 271} 272 273void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { 274 DCHECK_EQ(fd, fd_.get().fd); 275 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 276 277 IOResult result = IO_FAILED; 278 size_t bytes_written = 0; 279 { 280 base::AutoLock locker(write_lock()); 281 282 DCHECK(pending_write_); 283 284 pending_write_ = false; 285 result = WriteNoLock(&bytes_written); 286 } 287 288 if (result != IO_PENDING) 289 OnWriteCompleted(result == IO_SUCCEEDED, bytes_written); 290} 291 292void RawChannelPosix::WaitToWrite() { 293 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 294 295 DCHECK(write_watcher_.get()); 296 297 if (!message_loop_for_io()->WatchFileDescriptor( 298 fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE, 299 write_watcher_.get(), this)) { 300 { 301 base::AutoLock locker(write_lock()); 302 303 DCHECK(pending_write_); 304 pending_write_ = false; 305 } 306 OnWriteCompleted(false, 0); 307 } 308} 309 310} // namespace 311 312// ----------------------------------------------------------------------------- 313 314// Static factory method declared in raw_channel.h. 315// static 316RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle, 317 Delegate* delegate, 318 base::MessageLoopForIO* message_loop_for_io) { 319 return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io); 320} 321 322} // namespace system 323} // namespace mojo 324