raw_channel_posix.cc revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <errno.h>
8#include <string.h>
9#include <unistd.h>
10
11#include <algorithm>
12#include <deque>
13#include <vector>
14
15#include "base/basictypes.h"
16#include "base/bind.h"
17#include "base/compiler_specific.h"
18#include "base/location.h"
19#include "base/logging.h"
20#include "base/memory/scoped_ptr.h"
21#include "base/memory/weak_ptr.h"
22#include "base/message_loop/message_loop.h"
23#include "base/posix/eintr_wrapper.h"
24#include "base/synchronization/lock.h"
25#include "mojo/system/message_in_transit.h"
26#include "mojo/system/platform_channel_handle.h"
27
28namespace mojo {
29namespace system {
30
31const size_t kReadSize = 4096;
32
33class RawChannelPosix : public RawChannel,
34                        public base::MessageLoopForIO::Watcher {
35 public:
36  RawChannelPosix(const PlatformChannelHandle& handle,
37                  Delegate* delegate,
38                  base::MessageLoop* message_loop);
39  virtual ~RawChannelPosix();
40
41  // |RawChannel| implementation:
42  virtual void Init() OVERRIDE;
43  virtual void Shutdown() OVERRIDE;
44  virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
45
46 private:
47  // |base::MessageLoopForIO::Watcher| implementation:
48  virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
49  virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
50
51  // Watches for |fd_| to become writable. Must be called on the I/O thread.
52  void WaitToWrite();
53
54  // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
55  // thread WITHOUT |write_lock_| held.
56  void CallOnFatalError(Delegate::FatalError fatal_error);
57
58  // Writes the message at the front of |write_message_queue_|, starting at
59  // |write_message_offset_|. It removes and destroys if the write completes and
60  // otherwise updates |write_message_offset_|. Returns true on success. Must be
61  // called under |write_lock_|.
62  bool WriteFrontMessageNoLock();
63
64  // Cancels all pending writes and destroys the contents of
65  // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
66  // |is_dead_| to true. Must be called under |write_lock_|.
67  void CancelPendingWritesNoLock();
68
69  base::MessageLoopForIO* message_loop_for_io() {
70    return static_cast<base::MessageLoopForIO*>(message_loop());
71  }
72
73  int fd_;
74
75  // Only used on the I/O thread:
76  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
77  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
78
79  // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
80  // is always aligned with a message boundary (we will copy memory to ensure
81  // this), but |read_buffer_| may be larger than the actual number of bytes we
82  // have.
83  std::vector<char> read_buffer_;
84  size_t read_buffer_num_valid_bytes_;
85
86  base::Lock write_lock_;  // Protects the following members.
87  bool is_dead_;
88  std::deque<MessageInTransit*> write_message_queue_;
89  size_t write_message_offset_;
90  // This is used for posting tasks from write threads to the I/O thread. It
91  // must only be accessed under |write_lock_|. The weak pointers it produces
92  // are only used/invalidated on the I/O thread.
93  base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
94
95  DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
96};
97
98RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
99                                 Delegate* delegate,
100                                 base::MessageLoop* message_loop)
101    : RawChannel(delegate, message_loop),
102      fd_(handle.fd),
103      read_buffer_num_valid_bytes_(0),
104      is_dead_(false),
105      write_message_offset_(0),
106      weak_ptr_factory_(this) {
107  CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
108  DCHECK_NE(fd_, -1);
109}
110
111RawChannelPosix::~RawChannelPosix() {
112  DCHECK(is_dead_);
113  DCHECK_EQ(fd_, -1);
114
115  // No need to take the |write_lock_| here -- if there are still weak pointers
116  // outstanding, then we're hosed anyway (since we wouldn't be able to
117  // invalidate them cleanly, since we might not be on the I/O thread).
118  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
119
120  // These must have been shut down/destroyed on the I/O thread.
121  DCHECK(!read_watcher_.get());
122  DCHECK(!write_watcher_.get());
123}
124
125void RawChannelPosix::Init() {
126  DCHECK_EQ(base::MessageLoop::current(), message_loop());
127
128  DCHECK(!read_watcher_.get());
129  read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
130  DCHECK(!write_watcher_.get());
131  write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
132
133  // No need to take the lock. No one should be using us yet.
134  DCHECK(write_message_queue_.empty());
135
136  bool result = message_loop_for_io()->WatchFileDescriptor(
137      fd_, true, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
138  DCHECK(result);
139}
140
141void RawChannelPosix::Shutdown() {
142  DCHECK_EQ(base::MessageLoop::current(), message_loop());
143
144  base::AutoLock locker(write_lock_);
145  if (!is_dead_)
146    CancelPendingWritesNoLock();
147
148  DCHECK_NE(fd_, -1);
149  if (close(fd_) != 0)
150    PLOG(ERROR) << "close";
151  fd_ = -1;
152
153  weak_ptr_factory_.InvalidateWeakPtrs();
154
155  read_watcher_.reset();  // This will stop watching (if necessary).
156  write_watcher_.reset();  // This will stop watching (if necessary).
157}
158
159// Reminder: This must be thread-safe, and takes ownership of |message| on
160// success.
161bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
162  base::AutoLock locker(write_lock_);
163  if (is_dead_) {
164    message->Destroy();
165    return false;
166  }
167
168  if (!write_message_queue_.empty()) {
169    write_message_queue_.push_back(message);
170    return true;
171  }
172
173  write_message_queue_.push_front(message);
174  DCHECK_EQ(write_message_offset_, 0u);
175  bool result = WriteFrontMessageNoLock();
176  DCHECK(result || write_message_queue_.empty());
177
178  if (!result) {
179    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
180    // nested context.
181    message_loop()->PostTask(FROM_HERE,
182                             base::Bind(&RawChannelPosix::CallOnFatalError,
183                                        weak_ptr_factory_.GetWeakPtr(),
184                                        Delegate::FATAL_ERROR_FAILED_WRITE));
185  } else if (!write_message_queue_.empty()) {
186    // Set up to wait for the FD to become writable. If we're not on the I/O
187    // thread, we have to post a task to do this.
188    if (base::MessageLoop::current() == message_loop()) {
189      WaitToWrite();
190    } else {
191      message_loop()->PostTask(FROM_HERE,
192                               base::Bind(&RawChannelPosix::WaitToWrite,
193                                          weak_ptr_factory_.GetWeakPtr()));
194    }
195  }
196
197  return result;
198}
199
200void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
201  DCHECK_EQ(fd, fd_);
202  DCHECK_EQ(base::MessageLoop::current(), message_loop());
203
204  bool did_dispatch_message = false;
205  // Tracks the offset of the first undispatched message in |read_buffer_|.
206  // Currently, we copy data to ensure that this is zero at the beginning.
207  size_t read_buffer_start = 0;
208  for (;;) {
209    if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
210            < kReadSize) {
211      // Use power-of-2 buffer sizes.
212      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
213      // maximum message size to whatever extent necessary).
214      // TODO(vtl): We may often be able to peek at the header and get the real
215      // required extra space (which may be much bigger than |kReadSize|).
216      size_t new_size = std::max(read_buffer_.size(), kReadSize);
217      while (new_size <
218                 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
219        new_size *= 2;
220
221      // TODO(vtl): It's suboptimal to zero out the fresh memory.
222      read_buffer_.resize(new_size, 0);
223    }
224
225    ssize_t bytes_read = HANDLE_EINTR(
226        read(fd_,
227             &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
228             kReadSize));
229    if (bytes_read < 0) {
230      if (errno != EAGAIN && errno != EWOULDBLOCK) {
231        PLOG(ERROR) << "read";
232        {
233          base::AutoLock locker(write_lock_);
234          CancelPendingWritesNoLock();
235        }
236        CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
237        return;
238      }
239
240      break;
241    }
242
243    read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
244
245    // Dispatch all the messages that we can.
246    while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
247      const MessageInTransit* message =
248          reinterpret_cast<const MessageInTransit*>(
249              &read_buffer_[read_buffer_start]);
250      DCHECK_EQ(reinterpret_cast<size_t>(message) %
251                    MessageInTransit::kMessageAlignment, 0u);
252      // If we have the header, not the whole message....
253      if (read_buffer_num_valid_bytes_ <
254              message->size_with_header_and_padding())
255        break;
256
257      // Dispatch the message.
258      delegate()->OnReadMessage(*message);
259      if (!read_watcher_.get()) {
260        // |Shutdown()| was called in |OnReadMessage()|.
261        // TODO(vtl): Add test for this case.
262        return;
263      }
264      did_dispatch_message = true;
265
266      // Update our state.
267      read_buffer_start += message->size_with_header_and_padding();
268      read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
269    }
270
271    // If we dispatched any messages, stop reading for now (and let the message
272    // loop do its thing for another round).
273    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
274    // a single message. Risks: slower, more complex if we want to avoid lots of
275    // copying. ii. Keep reading until there's no more data and dispatch all the
276    // messages we can. Risks: starvation of other users of the message loop.)
277    if (did_dispatch_message)
278      break;
279
280    // If we didn't max out |kReadSize|, stop reading for now.
281    if (static_cast<size_t>(bytes_read) < kReadSize)
282      break;
283
284    // Else try to read some more....
285  }
286
287  // Move data back to start.
288  if (read_buffer_start > 0) {
289    memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
290            read_buffer_num_valid_bytes_);
291    read_buffer_start = 0;
292  }
293}
294
295void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
296  DCHECK_EQ(fd, fd_);
297  DCHECK_EQ(base::MessageLoop::current(), message_loop());
298
299  bool did_fail = false;
300  {
301    base::AutoLock locker(write_lock_);
302    DCHECK(!is_dead_);
303    DCHECK(!write_message_queue_.empty());
304
305    bool result = WriteFrontMessageNoLock();
306    DCHECK(result || write_message_queue_.empty());
307
308    if (!result)
309      did_fail = true;
310    else if (!write_message_queue_.empty())
311      WaitToWrite();
312  }
313  if (did_fail)
314    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
315}
316
317void RawChannelPosix::WaitToWrite() {
318  DCHECK_EQ(base::MessageLoop::current(), message_loop());
319
320  DCHECK(write_watcher_.get());
321  bool result = message_loop_for_io()->WatchFileDescriptor(
322      fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
323      this);
324  DCHECK(result);
325}
326
327void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
328  DCHECK_EQ(base::MessageLoop::current(), message_loop());
329  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
330  delegate()->OnFatalError(fatal_error);
331}
332
333bool RawChannelPosix::WriteFrontMessageNoLock() {
334  write_lock_.AssertAcquired();
335
336  DCHECK(!is_dead_);
337  DCHECK(!write_message_queue_.empty());
338
339  MessageInTransit* message = write_message_queue_.front();
340  DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
341  size_t bytes_to_write =
342      message->size_with_header_and_padding() - write_message_offset_;
343  ssize_t bytes_written = HANDLE_EINTR(
344      write(fd_,
345            reinterpret_cast<char*>(message) + write_message_offset_,
346            bytes_to_write));
347  if (bytes_written < 0) {
348    if (errno != EAGAIN && errno != EWOULDBLOCK) {
349      PLOG(ERROR) << "write of size " << bytes_to_write;
350      CancelPendingWritesNoLock();
351      return false;
352    }
353
354    // We simply failed to write since we'd block. The logic is the same as if
355    // we got a partial write.
356    bytes_written = 0;
357  }
358
359  DCHECK_GE(bytes_written, 0);
360  if (static_cast<size_t>(bytes_written) < bytes_to_write) {
361    // Partial (or no) write.
362    write_message_offset_ += static_cast<size_t>(bytes_written);
363  } else {
364    // Complete write.
365    DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
366    write_message_queue_.pop_front();
367    write_message_offset_ = 0;
368    message->Destroy();
369  }
370
371  return true;
372}
373
374void RawChannelPosix::CancelPendingWritesNoLock() {
375  write_lock_.AssertAcquired();
376  DCHECK(!is_dead_);
377
378  is_dead_ = true;
379  for (std::deque<MessageInTransit*>::iterator it =
380           write_message_queue_.begin(); it != write_message_queue_.end();
381       ++it) {
382    (*it)->Destroy();
383  }
384  write_message_queue_.clear();
385}
386
387// -----------------------------------------------------------------------------
388
389// Static factory method declared in raw_channel.h.
390// static
391RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
392                               Delegate* delegate,
393                               base::MessageLoop* message_loop) {
394  return new RawChannelPosix(handle, delegate, message_loop);
395}
396
397}  // namespace system
398}  // namespace mojo
399