raw_channel_posix.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <errno.h>
8#include <sys/uio.h>
9#include <unistd.h>
10
11#include <algorithm>
12
13#include "base/basictypes.h"
14#include "base/bind.h"
15#include "base/compiler_specific.h"
16#include "base/location.h"
17#include "base/logging.h"
18#include "base/memory/scoped_ptr.h"
19#include "base/memory/weak_ptr.h"
20#include "base/message_loop/message_loop.h"
21#include "base/posix/eintr_wrapper.h"
22#include "base/synchronization/lock.h"
23#include "mojo/system/embedder/platform_handle.h"
24
25namespace mojo {
26namespace system {
27
28namespace {
29
30class RawChannelPosix : public RawChannel,
31                        public base::MessageLoopForIO::Watcher {
32 public:
33  RawChannelPosix(embedder::ScopedPlatformHandle handle,
34                  Delegate* delegate,
35                  base::MessageLoopForIO* message_loop_for_io);
36  virtual ~RawChannelPosix();
37
38 private:
39  // |RawChannel| implementation:
40  virtual IOResult Read(size_t* bytes_read) OVERRIDE;
41  virtual IOResult ScheduleRead() OVERRIDE;
42  virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
43  virtual IOResult ScheduleWriteNoLock() OVERRIDE;
44  virtual bool OnInit() OVERRIDE;
45  virtual void OnShutdownNoLock(
46      scoped_ptr<ReadBuffer> read_buffer,
47      scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
48
49  // |base::MessageLoopForIO::Watcher| implementation:
50  virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
51  virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
52
53  // Watches for |fd_| to become writable. Must be called on the I/O thread.
54  void WaitToWrite();
55
56  embedder::ScopedPlatformHandle fd_;
57
58  // The following members are only used on the I/O thread:
59  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
60  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
61
62  bool pending_read_;
63
64  // The following members are used on multiple threads and protected by
65  // |write_lock()|:
66  bool pending_write_;
67
68  // This is used for posting tasks from write threads to the I/O thread. It
69  // must only be accessed under |write_lock_|. The weak pointers it produces
70  // are only used/invalidated on the I/O thread.
71  base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
72
73  DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
74};
75
76RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
77                                 Delegate* delegate,
78                                 base::MessageLoopForIO* message_loop_for_io)
79    : RawChannel(delegate, message_loop_for_io),
80      fd_(handle.Pass()),
81      pending_read_(false),
82      pending_write_(false),
83      weak_ptr_factory_(this) {
84  DCHECK(fd_.is_valid());
85}
86
87RawChannelPosix::~RawChannelPosix() {
88  DCHECK(!pending_read_);
89  DCHECK(!pending_write_);
90
91  // No need to take the |write_lock()| here -- if there are still weak pointers
92  // outstanding, then we're hosed anyway (since we wouldn't be able to
93  // invalidate them cleanly, since we might not be on the I/O thread).
94  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
95
96  // These must have been shut down/destroyed on the I/O thread.
97  DCHECK(!read_watcher_.get());
98  DCHECK(!write_watcher_.get());
99}
100
101RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
102  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
103  DCHECK(!pending_read_);
104
105  char* buffer = NULL;
106  size_t bytes_to_read = 0;
107  read_buffer()->GetBuffer(&buffer, &bytes_to_read);
108
109  ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
110
111  if (read_result > 0) {
112    *bytes_read = static_cast<size_t>(read_result);
113    return IO_SUCCEEDED;
114  }
115
116  // |read_result == 0| means "end of file".
117  if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
118    if (read_result != 0)
119      PLOG(ERROR) << "read";
120
121    // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
122    read_watcher_.reset();
123
124    return IO_FAILED;
125  }
126
127  return ScheduleRead();
128}
129
130RawChannel::IOResult RawChannelPosix::ScheduleRead() {
131  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
132  DCHECK(!pending_read_);
133
134  pending_read_ = true;
135
136  return IO_PENDING;
137}
138
139RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
140  write_lock().AssertAcquired();
141
142  DCHECK(!pending_write_);
143
144  std::vector<WriteBuffer::Buffer> buffers;
145  write_buffer_no_lock()->GetBuffers(&buffers);
146  DCHECK(!buffers.empty());
147
148  ssize_t write_result = -1;
149  if (buffers.size() == 1) {
150    write_result = HANDLE_EINTR(
151        write(fd_.get().fd, buffers[0].addr, buffers[0].size));
152  } else {
153    // Note that using |writev()| is measurably slower than using |write()| --
154    // at least in a microbenchmark -- but much faster than using multiple
155    // |write()|s.
156    const size_t kMaxBufferCount = 10;
157    iovec iov[kMaxBufferCount];
158    size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
159
160    for (size_t i = 0; i < buffer_count; ++i) {
161      iov[i].iov_base = const_cast<char*>(buffers[i].addr);
162      iov[i].iov_len = buffers[i].size;
163    }
164
165    write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count));
166  }
167
168  if (write_result >= 0) {
169    *bytes_written = static_cast<size_t>(write_result);
170    return IO_SUCCEEDED;
171  }
172
173  if (errno != EAGAIN && errno != EWOULDBLOCK) {
174    PLOG(ERROR) << "write of size "
175                << write_buffer_no_lock()->GetTotalBytesToWrite();
176    return IO_FAILED;
177  }
178
179  return ScheduleWriteNoLock();
180}
181
182RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
183  write_lock().AssertAcquired();
184
185  DCHECK(!pending_write_);
186
187  // Set up to wait for the FD to become writable.
188  // If we're not on the I/O thread, we have to post a task to do this.
189  if (base::MessageLoop::current() != message_loop_for_io()) {
190    message_loop_for_io()->PostTask(
191        FROM_HERE,
192        base::Bind(&RawChannelPosix::WaitToWrite,
193                   weak_ptr_factory_.GetWeakPtr()));
194    pending_write_ = true;
195    return IO_PENDING;
196  }
197
198  if (message_loop_for_io()->WatchFileDescriptor(
199      fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
200      write_watcher_.get(), this)) {
201    pending_write_ = true;
202    return IO_PENDING;
203  }
204
205  return IO_FAILED;
206}
207
208bool RawChannelPosix::OnInit() {
209  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
210
211  DCHECK(!read_watcher_.get());
212  read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
213  DCHECK(!write_watcher_.get());
214  write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
215
216  if (!message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, true,
217          base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
218    // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
219    // (in the sense of returning the message loop's state to what it was before
220    // it was called).
221    read_watcher_.reset();
222    write_watcher_.reset();
223    return false;
224  }
225
226  return true;
227}
228
229void RawChannelPosix::OnShutdownNoLock(
230    scoped_ptr<ReadBuffer> /*read_buffer*/,
231    scoped_ptr<WriteBuffer> /*write_buffer*/) {
232  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
233  write_lock().AssertAcquired();
234
235  read_watcher_.reset();  // This will stop watching (if necessary).
236  write_watcher_.reset();  // This will stop watching (if necessary).
237
238  pending_read_ = false;
239  pending_write_ = false;
240
241  DCHECK(fd_.is_valid());
242  fd_.reset();
243
244  weak_ptr_factory_.InvalidateWeakPtrs();
245}
246
247void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
248  DCHECK_EQ(fd, fd_.get().fd);
249  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
250
251  if (!pending_read_) {
252    NOTREACHED();
253    return;
254  }
255
256  pending_read_ = false;
257  size_t bytes_read = 0;
258  IOResult result = Read(&bytes_read);
259  if (result != IO_PENDING)
260    OnReadCompleted(result == IO_SUCCEEDED, bytes_read);
261
262  // On failure, |read_watcher_| must have been reset; on success,
263  // we assume that |OnReadCompleted()| always schedules another read.
264  // Otherwise, we could end up spinning -- getting
265  // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
266  // read.
267  // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
268  // schedule a new read. But that code won't be reached under the current
269  // RawChannel implementation.
270  DCHECK(!read_watcher_.get() || pending_read_);
271}
272
273void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
274  DCHECK_EQ(fd, fd_.get().fd);
275  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
276
277  IOResult result = IO_FAILED;
278  size_t bytes_written = 0;
279  {
280    base::AutoLock locker(write_lock());
281
282    DCHECK(pending_write_);
283
284    pending_write_ = false;
285    result = WriteNoLock(&bytes_written);
286  }
287
288  if (result != IO_PENDING)
289    OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
290}
291
292void RawChannelPosix::WaitToWrite() {
293  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
294
295  DCHECK(write_watcher_.get());
296
297  if (!message_loop_for_io()->WatchFileDescriptor(
298          fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
299          write_watcher_.get(), this)) {
300    {
301      base::AutoLock locker(write_lock());
302
303      DCHECK(pending_write_);
304      pending_write_ = false;
305    }
306    OnWriteCompleted(false, 0);
307  }
308}
309
310}  // namespace
311
312// -----------------------------------------------------------------------------
313
314// Static factory method declared in raw_channel.h.
315// static
316RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
317                               Delegate* delegate,
318                               base::MessageLoopForIO* message_loop_for_io) {
319  return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io);
320}
321
322}  // namespace system
323}  // namespace mojo
324