raw_channel_posix.cc revision f2477e01787aa58f445919b809d89e252beef54f
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <errno.h>
8#include <string.h>
9#include <unistd.h>
10
11#include <algorithm>
12#include <deque>
13#include <vector>
14
15#include "base/basictypes.h"
16#include "base/bind.h"
17#include "base/compiler_specific.h"
18#include "base/location.h"
19#include "base/logging.h"
20#include "base/memory/scoped_ptr.h"
21#include "base/memory/weak_ptr.h"
22#include "base/message_loop/message_loop.h"
23#include "base/posix/eintr_wrapper.h"
24#include "base/synchronization/lock.h"
25#include "mojo/system/message_in_transit.h"
26#include "mojo/system/platform_channel_handle.h"
27
28namespace mojo {
29namespace system {
30
31const size_t kReadSize = 4096;
32
33class RawChannelPosix : public RawChannel,
34                        public base::MessageLoopForIO::Watcher {
35 public:
36  RawChannelPosix(const PlatformChannelHandle& handle,
37                  Delegate* delegate,
38                  base::MessageLoop* message_loop);
39  virtual ~RawChannelPosix();
40
41  // |RawChannel| implementation:
42  virtual bool Init() OVERRIDE;
43  virtual void Shutdown() OVERRIDE;
44  virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
45
46 private:
47  // |base::MessageLoopForIO::Watcher| implementation:
48  virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
49  virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
50
51  // Watches for |fd_| to become writable. Must be called on the I/O thread.
52  void WaitToWrite();
53
54  // Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
55  // thread WITHOUT |write_lock_| held.
56  void CallOnFatalError(Delegate::FatalError fatal_error);
57
58  // Writes the message at the front of |write_message_queue_|, starting at
59  // |write_message_offset_|. It removes and destroys if the write completes and
60  // otherwise updates |write_message_offset_|. Returns true on success. Must be
61  // called under |write_lock_|.
62  bool WriteFrontMessageNoLock();
63
64  // Cancels all pending writes and destroys the contents of
65  // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
66  // |is_dead_| to true. Must be called under |write_lock_|.
67  void CancelPendingWritesNoLock();
68
69  base::MessageLoopForIO* message_loop_for_io() {
70    return static_cast<base::MessageLoopForIO*>(message_loop());
71  }
72
73  int fd_;
74
75  // Only used on the I/O thread:
76  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
77  scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
78
79  // We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
80  // is always aligned with a message boundary (we will copy memory to ensure
81  // this), but |read_buffer_| may be larger than the actual number of bytes we
82  // have.
83  std::vector<char> read_buffer_;
84  size_t read_buffer_num_valid_bytes_;
85
86  base::Lock write_lock_;  // Protects the following members.
87  bool is_dead_;
88  std::deque<MessageInTransit*> write_message_queue_;
89  size_t write_message_offset_;
90  // This is used for posting tasks from write threads to the I/O thread. It
91  // must only be accessed under |write_lock_|. The weak pointers it produces
92  // are only used/invalidated on the I/O thread.
93  base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
94
95  DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
96};
97
98RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
99                                 Delegate* delegate,
100                                 base::MessageLoop* message_loop)
101    : RawChannel(delegate, message_loop),
102      fd_(handle.fd),
103      read_buffer_num_valid_bytes_(0),
104      is_dead_(false),
105      write_message_offset_(0),
106      weak_ptr_factory_(this) {
107  CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
108  DCHECK_NE(fd_, -1);
109}
110
111RawChannelPosix::~RawChannelPosix() {
112  DCHECK(is_dead_);
113  DCHECK_EQ(fd_, -1);
114
115  // No need to take the |write_lock_| here -- if there are still weak pointers
116  // outstanding, then we're hosed anyway (since we wouldn't be able to
117  // invalidate them cleanly, since we might not be on the I/O thread).
118  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
119
120  // These must have been shut down/destroyed on the I/O thread.
121  DCHECK(!read_watcher_.get());
122  DCHECK(!write_watcher_.get());
123}
124
125bool RawChannelPosix::Init() {
126  DCHECK_EQ(base::MessageLoop::current(), message_loop());
127
128  DCHECK(!read_watcher_.get());
129  read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
130  DCHECK(!write_watcher_.get());
131  write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
132
133  // No need to take the lock. No one should be using us yet.
134  DCHECK(write_message_queue_.empty());
135
136  if (!message_loop_for_io()->WatchFileDescriptor(fd_, true,
137          base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
138    // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
139    // (in the sense of returning the message loop's state to what it was before
140    // it was called).
141    read_watcher_.reset();
142    write_watcher_.reset();
143    return false;
144  }
145
146  return true;
147}
148
149void RawChannelPosix::Shutdown() {
150  DCHECK_EQ(base::MessageLoop::current(), message_loop());
151
152  base::AutoLock locker(write_lock_);
153  if (!is_dead_)
154    CancelPendingWritesNoLock();
155
156  DCHECK_NE(fd_, -1);
157  if (close(fd_) != 0)
158    PLOG(ERROR) << "close";
159  fd_ = -1;
160
161  weak_ptr_factory_.InvalidateWeakPtrs();
162
163  read_watcher_.reset();  // This will stop watching (if necessary).
164  write_watcher_.reset();  // This will stop watching (if necessary).
165}
166
167// Reminder: This must be thread-safe, and takes ownership of |message| on
168// success.
169bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
170  base::AutoLock locker(write_lock_);
171  if (is_dead_) {
172    message->Destroy();
173    return false;
174  }
175
176  if (!write_message_queue_.empty()) {
177    write_message_queue_.push_back(message);
178    return true;
179  }
180
181  write_message_queue_.push_front(message);
182  DCHECK_EQ(write_message_offset_, 0u);
183  bool result = WriteFrontMessageNoLock();
184  DCHECK(result || write_message_queue_.empty());
185
186  if (!result) {
187    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
188    // nested context.
189    message_loop()->PostTask(FROM_HERE,
190                             base::Bind(&RawChannelPosix::CallOnFatalError,
191                                        weak_ptr_factory_.GetWeakPtr(),
192                                        Delegate::FATAL_ERROR_FAILED_WRITE));
193  } else if (!write_message_queue_.empty()) {
194    // Set up to wait for the FD to become writable. If we're not on the I/O
195    // thread, we have to post a task to do this.
196    if (base::MessageLoop::current() == message_loop()) {
197      WaitToWrite();
198    } else {
199      message_loop()->PostTask(FROM_HERE,
200                               base::Bind(&RawChannelPosix::WaitToWrite,
201                                          weak_ptr_factory_.GetWeakPtr()));
202    }
203  }
204
205  return result;
206}
207
208void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
209  DCHECK_EQ(fd, fd_);
210  DCHECK_EQ(base::MessageLoop::current(), message_loop());
211
212  bool did_dispatch_message = false;
213  // Tracks the offset of the first undispatched message in |read_buffer_|.
214  // Currently, we copy data to ensure that this is zero at the beginning.
215  size_t read_buffer_start = 0;
216  for (;;) {
217    if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
218            < kReadSize) {
219      // Use power-of-2 buffer sizes.
220      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
221      // maximum message size to whatever extent necessary).
222      // TODO(vtl): We may often be able to peek at the header and get the real
223      // required extra space (which may be much bigger than |kReadSize|).
224      size_t new_size = std::max(read_buffer_.size(), kReadSize);
225      while (new_size <
226                 read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
227        new_size *= 2;
228
229      // TODO(vtl): It's suboptimal to zero out the fresh memory.
230      read_buffer_.resize(new_size, 0);
231    }
232
233    ssize_t bytes_read = HANDLE_EINTR(
234        read(fd_,
235             &read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
236             kReadSize));
237    if (bytes_read < 0) {
238      if (errno != EAGAIN && errno != EWOULDBLOCK) {
239        PLOG(ERROR) << "read";
240        {
241          base::AutoLock locker(write_lock_);
242          CancelPendingWritesNoLock();
243        }
244        CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
245        return;
246      }
247
248      break;
249    }
250
251    read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
252
253    // Dispatch all the messages that we can.
254    while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
255      const MessageInTransit* message =
256          reinterpret_cast<const MessageInTransit*>(
257              &read_buffer_[read_buffer_start]);
258      DCHECK_EQ(reinterpret_cast<size_t>(message) %
259                    MessageInTransit::kMessageAlignment, 0u);
260      // If we have the header, not the whole message....
261      if (read_buffer_num_valid_bytes_ <
262              message->size_with_header_and_padding())
263        break;
264
265      // Dispatch the message.
266      delegate()->OnReadMessage(*message);
267      if (!read_watcher_.get()) {
268        // |Shutdown()| was called in |OnReadMessage()|.
269        // TODO(vtl): Add test for this case.
270        return;
271      }
272      did_dispatch_message = true;
273
274      // Update our state.
275      read_buffer_start += message->size_with_header_and_padding();
276      read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
277    }
278
279    // If we dispatched any messages, stop reading for now (and let the message
280    // loop do its thing for another round).
281    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
282    // a single message. Risks: slower, more complex if we want to avoid lots of
283    // copying. ii. Keep reading until there's no more data and dispatch all the
284    // messages we can. Risks: starvation of other users of the message loop.)
285    if (did_dispatch_message)
286      break;
287
288    // If we didn't max out |kReadSize|, stop reading for now.
289    if (static_cast<size_t>(bytes_read) < kReadSize)
290      break;
291
292    // Else try to read some more....
293  }
294
295  // Move data back to start.
296  if (read_buffer_start > 0) {
297    memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
298            read_buffer_num_valid_bytes_);
299    read_buffer_start = 0;
300  }
301}
302
303void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
304  DCHECK_EQ(fd, fd_);
305  DCHECK_EQ(base::MessageLoop::current(), message_loop());
306
307  bool did_fail = false;
308  {
309    base::AutoLock locker(write_lock_);
310    DCHECK(!is_dead_);
311    DCHECK(!write_message_queue_.empty());
312
313    bool result = WriteFrontMessageNoLock();
314    DCHECK(result || write_message_queue_.empty());
315
316    if (!result)
317      did_fail = true;
318    else if (!write_message_queue_.empty())
319      WaitToWrite();
320  }
321  if (did_fail)
322    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
323}
324
325void RawChannelPosix::WaitToWrite() {
326  DCHECK_EQ(base::MessageLoop::current(), message_loop());
327
328  DCHECK(write_watcher_.get());
329  bool result = message_loop_for_io()->WatchFileDescriptor(
330      fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
331      this);
332  DCHECK(result);
333}
334
335void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
336  DCHECK_EQ(base::MessageLoop::current(), message_loop());
337  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
338  delegate()->OnFatalError(fatal_error);
339}
340
341bool RawChannelPosix::WriteFrontMessageNoLock() {
342  write_lock_.AssertAcquired();
343
344  DCHECK(!is_dead_);
345  DCHECK(!write_message_queue_.empty());
346
347  MessageInTransit* message = write_message_queue_.front();
348  DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
349  size_t bytes_to_write =
350      message->size_with_header_and_padding() - write_message_offset_;
351  ssize_t bytes_written = HANDLE_EINTR(
352      write(fd_,
353            reinterpret_cast<char*>(message) + write_message_offset_,
354            bytes_to_write));
355  if (bytes_written < 0) {
356    if (errno != EAGAIN && errno != EWOULDBLOCK) {
357      PLOG(ERROR) << "write of size " << bytes_to_write;
358      CancelPendingWritesNoLock();
359      return false;
360    }
361
362    // We simply failed to write since we'd block. The logic is the same as if
363    // we got a partial write.
364    bytes_written = 0;
365  }
366
367  DCHECK_GE(bytes_written, 0);
368  if (static_cast<size_t>(bytes_written) < bytes_to_write) {
369    // Partial (or no) write.
370    write_message_offset_ += static_cast<size_t>(bytes_written);
371  } else {
372    // Complete write.
373    DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
374    write_message_queue_.pop_front();
375    write_message_offset_ = 0;
376    message->Destroy();
377  }
378
379  return true;
380}
381
382void RawChannelPosix::CancelPendingWritesNoLock() {
383  write_lock_.AssertAcquired();
384  DCHECK(!is_dead_);
385
386  is_dead_ = true;
387  for (std::deque<MessageInTransit*>::iterator it =
388           write_message_queue_.begin(); it != write_message_queue_.end();
389       ++it) {
390    (*it)->Destroy();
391  }
392  write_message_queue_.clear();
393}
394
395// -----------------------------------------------------------------------------
396
397// Static factory method declared in raw_channel.h.
398// static
399RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
400                               Delegate* delegate,
401                               base::MessageLoop* message_loop) {
402  return new RawChannelPosix(handle, delegate, message_loop);
403}
404
405}  // namespace system
406}  // namespace mojo
407