raw_channel_posix.cc revision a3f6a49ab37290eeeb8db0f41ec0f1cb74a68be7
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <errno.h>
8#include <string.h>
9#include <unistd.h>
10
11#include <algorithm>
12#include <deque>
13#include <vector>
14
15#include "base/basictypes.h"
16#include "base/bind.h"
17#include "base/compiler_specific.h"
18#include "base/location.h"
19#include "base/logging.h"
20#include "base/memory/scoped_ptr.h"
21#include "base/memory/weak_ptr.h"
22#include "base/message_loop/message_loop.h"
23#include "base/posix/eintr_wrapper.h"
24#include "base/synchronization/lock.h"
25#include "mojo/system/message_in_transit.h"
26#include "mojo/system/platform_channel_handle.h"
27
28namespace mojo {
29namespace system {
30
31namespace {
32
// Number of bytes requested from each |read()| call. It is also the minimum
// amount of free space kept available past the valid data in the read buffer.
const size_t kReadSize = 4 * 1024;
34
35class RawChannelPosix : public RawChannel,
36                        public base::MessageLoopForIO::Watcher {
37 public:
38  RawChannelPosix(const PlatformChannelHandle& handle,
39                  Delegate* delegate,
40                  base::MessageLoop* message_loop);
41  virtual ~RawChannelPosix();
42
43  // |RawChannel| implementation:
44  virtual bool Init() OVERRIDE;
45  virtual void Shutdown() OVERRIDE;
46  virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
47
48 private:
49  // |base::MessageLoopForIO::Watcher| implementation:
50  virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
51  virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
52
53  // Watches for |fd_| to become writable. Must be called on the I/O thread.
54  void WaitToWrite();
55
56  // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
57  // thread WITHOUT |write_lock_| held.
58  void CallOnFatalError(Delegate::FatalError fatal_error);
59
60  // Writes the message at the front of |write_message_queue_|, starting at
61  // |write_message_offset_|. It removes and destroys if the write completes and
62  // otherwise updates |write_message_offset_|. Returns true on success. Must be
63  // called under |write_lock_|.
64  bool WriteFrontMessageNoLock();
65
66  // Cancels all pending writes and destroys the contents of
67  // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
68  // |is_dead_| to true. Must be called under |write_lock_|.
69  void CancelPendingWritesNoLock();
70
71  base::MessageLoopForIO* message_loop_for_io() {
72    return static_cast<base::MessageLoopForIO*>(message_loop());
73  }
74
75  int fd_;
76
77  // Only used on the I/O thread:
78  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
79  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
80
81  // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
82  // is always aligned with a message boundary (we will copy memory to ensure
83  // this), but |read_buffer_| may be larger than the actual number of bytes we
84  // have.
85  std::vector<char> read_buffer_;
86  size_t read_buffer_num_valid_bytes_;
87
88  base::Lock write_lock_;  // Protects the following members.
89  bool is_dead_;
90  std::deque<MessageInTransit*> write_message_queue_;
91  size_t write_message_offset_;
92  // This is used for posting tasks from write threads to the I/O thread. It
93  // must only be accessed under |write_lock_|. The weak pointers it produces
94  // are only used/invalidated on the I/O thread.
95  base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
96
97  DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
98};
99
100RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
101                                 Delegate* delegate,
102                                 base::MessageLoop* message_loop)
103    : RawChannel(delegate, message_loop),
104      fd_(handle.fd),
105      read_buffer_num_valid_bytes_(0),
106      is_dead_(false),
107      write_message_offset_(0),
108      weak_ptr_factory_(this) {
109  CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
110  DCHECK_NE(fd_, -1);
111}
112
113RawChannelPosix::~RawChannelPosix() {
114  DCHECK(is_dead_);
115  DCHECK_EQ(fd_, -1);
116
117  // No need to take the |write_lock_| here -- if there are still weak pointers
118  // outstanding, then we're hosed anyway (since we wouldn't be able to
119  // invalidate them cleanly, since we might not be on the I/O thread).
120  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
121
122  // These must have been shut down/destroyed on the I/O thread.
123  DCHECK(!read_watcher_.get());
124  DCHECK(!write_watcher_.get());
125}
126
127bool RawChannelPosix::Init() {
128  DCHECK_EQ(base::MessageLoop::current(), message_loop());
129
130  DCHECK(!read_watcher_.get());
131  read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
132  DCHECK(!write_watcher_.get());
133  write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
134
135  // No need to take the lock. No one should be using us yet.
136  DCHECK(write_message_queue_.empty());
137
138  if (!message_loop_for_io()->WatchFileDescriptor(fd_, true,
139          base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
140    // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
141    // (in the sense of returning the message loop's state to what it was before
142    // it was called).
143    read_watcher_.reset();
144    write_watcher_.reset();
145    return false;
146  }
147
148  return true;
149}
150
151void RawChannelPosix::Shutdown() {
152  DCHECK_EQ(base::MessageLoop::current(), message_loop());
153
154  base::AutoLock locker(write_lock_);
155  if (!is_dead_)
156    CancelPendingWritesNoLock();
157
158  DCHECK_NE(fd_, -1);
159  if (close(fd_) != 0)
160    PLOG(ERROR) << "close";
161  fd_ = -1;
162
163  weak_ptr_factory_.InvalidateWeakPtrs();
164
165  read_watcher_.reset();  // This will stop watching (if necessary).
166  write_watcher_.reset();  // This will stop watching (if necessary).
167}
168
169// Reminder: This must be thread-safe, and takes ownership of |message| on
170// success.
171bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
172  base::AutoLock locker(write_lock_);
173  if (is_dead_) {
174    message->Destroy();
175    return false;
176  }
177
178  if (!write_message_queue_.empty()) {
179    write_message_queue_.push_back(message);
180    return true;
181  }
182
183  write_message_queue_.push_front(message);
184  DCHECK_EQ(write_message_offset_, 0u);
185  bool result = WriteFrontMessageNoLock();
186  DCHECK(result || write_message_queue_.empty());
187
188  if (!result) {
189    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
190    // nested context.
191    message_loop()->PostTask(FROM_HERE,
192                             base::Bind(&RawChannelPosix::CallOnFatalError,
193                                        weak_ptr_factory_.GetWeakPtr(),
194                                        Delegate::FATAL_ERROR_FAILED_WRITE));
195  } else if (!write_message_queue_.empty()) {
196    // Set up to wait for the FD to become writable. If we're not on the I/O
197    // thread, we have to post a task to do this.
198    if (base::MessageLoop::current() == message_loop()) {
199      WaitToWrite();
200    } else {
201      message_loop()->PostTask(FROM_HERE,
202                               base::Bind(&RawChannelPosix::WaitToWrite,
203                                          weak_ptr_factory_.GetWeakPtr()));
204    }
205  }
206
207  return result;
208}
209
210void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
211  DCHECK_EQ(fd, fd_);
212  DCHECK_EQ(base::MessageLoop::current(), message_loop());
213
214  bool did_dispatch_message = false;
215  // Tracks the offset of the first undispatched message in |read_buffer_|.
216  // Currently, we copy data to ensure that this is zero at the beginning.
217  size_t read_buffer_start = 0;
218  for (;;) {
219    if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
220            < kReadSize) {
221      // Use power-of-2 buffer sizes.
222      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
223      // maximum message size to whatever extent necessary).
224      // TODO(vtl): We may often be able to peek at the header and get the real
225      // required extra space (which may be much bigger than |kReadSize|).
226      size_t new_size = std::max(read_buffer_.size(), kReadSize);
227      while (new_size <
228                 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
229        new_size *= 2;
230
231      // TODO(vtl): It's suboptimal to zero out the fresh memory.
232      read_buffer_.resize(new_size, 0);
233    }
234
235    ssize_t bytes_read = HANDLE_EINTR(
236        read(fd_,
237             &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
238             kReadSize));
239    if (bytes_read < 0) {
240      if (errno != EAGAIN && errno != EWOULDBLOCK) {
241        PLOG(ERROR) << "read";
242        {
243          base::AutoLock locker(write_lock_);
244          CancelPendingWritesNoLock();
245        }
246        CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
247        return;
248      }
249
250      break;
251    }
252
253    read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
254
255    // Dispatch all the messages that we can.
256    while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
257      const MessageInTransit* message =
258          reinterpret_cast<const MessageInTransit*>(
259              &read_buffer_[read_buffer_start]);
260      DCHECK_EQ(reinterpret_cast<size_t>(message) %
261                    MessageInTransit::kMessageAlignment, 0u);
262      // If we have the header, not the whole message....
263      if (read_buffer_num_valid_bytes_ <
264              message->size_with_header_and_padding())
265        break;
266
267      // Dispatch the message.
268      delegate()->OnReadMessage(*message);
269      if (!read_watcher_.get()) {
270        // |Shutdown()| was called in |OnReadMessage()|.
271        // TODO(vtl): Add test for this case.
272        return;
273      }
274      did_dispatch_message = true;
275
276      // Update our state.
277      read_buffer_start += message->size_with_header_and_padding();
278      read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
279    }
280
281    // If we dispatched any messages, stop reading for now (and let the message
282    // loop do its thing for another round).
283    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
284    // a single message. Risks: slower, more complex if we want to avoid lots of
285    // copying. ii. Keep reading until there's no more data and dispatch all the
286    // messages we can. Risks: starvation of other users of the message loop.)
287    if (did_dispatch_message)
288      break;
289
290    // If we didn't max out |kReadSize|, stop reading for now.
291    if (static_cast<size_t>(bytes_read) < kReadSize)
292      break;
293
294    // Else try to read some more....
295  }
296
297  // Move data back to start.
298  if (read_buffer_start > 0) {
299    memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
300            read_buffer_num_valid_bytes_);
301    read_buffer_start = 0;
302  }
303}
304
305void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
306  DCHECK_EQ(fd, fd_);
307  DCHECK_EQ(base::MessageLoop::current(), message_loop());
308
309  bool did_fail = false;
310  {
311    base::AutoLock locker(write_lock_);
312    DCHECK(!is_dead_);
313    DCHECK(!write_message_queue_.empty());
314
315    bool result = WriteFrontMessageNoLock();
316    DCHECK(result || write_message_queue_.empty());
317
318    if (!result)
319      did_fail = true;
320    else if (!write_message_queue_.empty())
321      WaitToWrite();
322  }
323  if (did_fail)
324    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
325}
326
327void RawChannelPosix::WaitToWrite() {
328  DCHECK_EQ(base::MessageLoop::current(), message_loop());
329
330  DCHECK(write_watcher_.get());
331  bool result = message_loop_for_io()->WatchFileDescriptor(
332      fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
333      this);
334  DCHECK(result);
335}
336
337void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
338  DCHECK_EQ(base::MessageLoop::current(), message_loop());
339  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
340  delegate()->OnFatalError(fatal_error);
341}
342
343bool RawChannelPosix::WriteFrontMessageNoLock() {
344  write_lock_.AssertAcquired();
345
346  DCHECK(!is_dead_);
347  DCHECK(!write_message_queue_.empty());
348
349  MessageInTransit* message = write_message_queue_.front();
350  DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
351  size_t bytes_to_write =
352      message->size_with_header_and_padding() - write_message_offset_;
353  ssize_t bytes_written = HANDLE_EINTR(
354      write(fd_,
355            reinterpret_cast<char*>(message) + write_message_offset_,
356            bytes_to_write));
357  if (bytes_written < 0) {
358    if (errno != EAGAIN && errno != EWOULDBLOCK) {
359      PLOG(ERROR) << "write of size " << bytes_to_write;
360      CancelPendingWritesNoLock();
361      return false;
362    }
363
364    // We simply failed to write since we'd block. The logic is the same as if
365    // we got a partial write.
366    bytes_written = 0;
367  }
368
369  DCHECK_GE(bytes_written, 0);
370  if (static_cast<size_t>(bytes_written) < bytes_to_write) {
371    // Partial (or no) write.
372    write_message_offset_ += static_cast<size_t>(bytes_written);
373  } else {
374    // Complete write.
375    DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
376    write_message_queue_.pop_front();
377    write_message_offset_ = 0;
378    message->Destroy();
379  }
380
381  return true;
382}
383
384void RawChannelPosix::CancelPendingWritesNoLock() {
385  write_lock_.AssertAcquired();
386  DCHECK(!is_dead_);
387
388  is_dead_ = true;
389  for (std::deque<MessageInTransit*>::iterator it =
390           write_message_queue_.begin(); it != write_message_queue_.end();
391       ++it) {
392    (*it)->Destroy();
393  }
394  write_message_queue_.clear();
395}
396
397}  // namespace
398
399// -----------------------------------------------------------------------------
400
401// Static factory method declared in raw_channel.h.
402// static
403RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
404                               Delegate* delegate,
405                               base::MessageLoop* message_loop) {
406  return new RawChannelPosix(handle, delegate, message_loop);
407}
408
409}  // namespace system
410}  // namespace mojo
411