raw_channel.cc revision 5c02ac1a9c1b504631c0a3d2b6e737b5d738bae1
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/system/raw_channel.h"

#include <string.h>

#include <algorithm>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/stl_util.h"
#include "mojo/system/message_in_transit.h"

namespace mojo {
namespace system {

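// The number of bytes to ask for on each read; the read buffer always keeps
// at least this much free space available (see |ReadBuffer::GetBuffer()|).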
const size_t kReadSize = 4096;

RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}

RawChannel::ReadBuffer::~ReadBuffer() {}

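// Returns a pointer to the first byte past the valid data, with |kReadSize|
// bytes of space available for the next read.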
void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
  *addr = &buffer_[0] + num_valid_bytes_;
  *size = kReadSize;
}

RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}

RawChannel::WriteBuffer::~WriteBuffer() {
  STLDeleteElements(&message_queue_);
}

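// Fills |*buffers| with the still-unwritten portion of the frontmost message,
// starting at |offset_|; this may come from the message's main buffer, its
// secondary buffer, or both.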
void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
  buffers->clear();

  size_t bytes_to_write = GetTotalBytesToWrite();
  if (bytes_to_write == 0)
    return;

  MessageInTransit* message = message_queue_.front();
  if (!message->secondary_buffer_size()) {
    // Only write from the main buffer.
    DCHECK_LT(offset_, message->main_buffer_size());
    DCHECK_LE(bytes_to_write, message->main_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->main_buffer()) + offset_,
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  if (offset_ >= message->main_buffer_size()) {
    // Only write from the secondary buffer.
    DCHECK_LT(offset_ - message->main_buffer_size(),
              message->secondary_buffer_size());
    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->secondary_buffer()) +
            (offset_ - message->main_buffer_size()),
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  // Write from both buffers.
  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
                                message->secondary_buffer_size());
  Buffer buffer1 = {
      static_cast<const char*>(message->main_buffer()) + offset_,
      message->main_buffer_size() - offset_};
  buffers->push_back(buffer1);
  Buffer buffer2 = {
      static_cast<const char*>(message->secondary_buffer()),
      message->secondary_buffer_size()};
  buffers->push_back(buffer2);
}

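// Returns how many bytes of the frontmost message remain to be written, or
// zero if the message queue is empty.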
size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
  if (message_queue_.empty())
    return 0;

  MessageInTransit* message = message_queue_.front();
  DCHECK_LT(offset_, message->total_size());
  return message->total_size() - offset_;
}

RawChannel::RawChannel()
    : message_loop_for_io_(NULL),
      delegate_(NULL),
      read_stopped_(false),
      write_stopped_(false),
      weak_ptr_factory_(this) {
}

RawChannel::~RawChannel() {
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);

  // No need to take |write_lock_| here -- if there are still weak pointers
  // outstanding, we're hosed anyway, since we couldn't invalidate them cleanly
  // (we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}

bool RawChannel::Init(Delegate* delegate) {
  DCHECK(delegate);

  DCHECK(!delegate_);
  delegate_ = delegate;

  CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
  DCHECK(!message_loop_for_io_);
  message_loop_for_io_ =
      static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());

  // No need to take the lock. No one should be using us yet.
  DCHECK(!read_buffer_);
  read_buffer_.reset(new ReadBuffer);
  DCHECK(!write_buffer_);
  write_buffer_.reset(new WriteBuffer);

  if (!OnInit()) {
    delegate_ = NULL;
    message_loop_for_io_ = NULL;
    read_buffer_.reset();
    write_buffer_.reset();
    return false;
  }

  return ScheduleRead() == IO_PENDING;
}

void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
      << "Shutting down RawChannel with write buffer nonempty";

  // Reset the delegate so that it won't receive further calls.
  delegate_ = NULL;
  read_stopped_ = true;
  write_stopped_ = true;
  weak_ptr_factory_.InvalidateWeakPtrs();

  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}

// Reminder: This must be thread-safe.
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
  DCHECK(message);

  // TODO(vtl)
  if (message->has_platform_handles()) {
    NOTIMPLEMENTED();
    return false;
  }

  base::AutoLock locker(write_lock_);
  if (write_stopped_)
    return false;

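  // A nonempty queue means a write is already in progress or scheduled; just
  // enqueue the message and let that write's completion pick it up.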
  if (!write_buffer_->message_queue_.empty()) {
    write_buffer_->message_queue_.push_back(message.release());
    return true;
  }

  write_buffer_->message_queue_.push_front(message.release());
  DCHECK_EQ(write_buffer_->offset_, 0u);

  size_t bytes_written = 0;
  IOResult io_result = WriteNoLock(&bytes_written);
  if (io_result == IO_PENDING)
    return true;

  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
                                       bytes_written);
  if (!result) {
    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
    // nested context.
    message_loop_for_io_->PostTask(
        FROM_HERE,
        base::Bind(&RawChannel::CallOnFatalError,
                   weak_ptr_factory_.GetWeakPtr(),
                   Delegate::FATAL_ERROR_FAILED_WRITE));
  }

  return result;
}

// Reminder: This must be thread-safe.
bool RawChannel::IsWriteBufferEmpty() {
  base::AutoLock locker(write_lock_);
  return write_buffer_->message_queue_.empty();
}

RawChannel::ReadBuffer* RawChannel::read_buffer() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  return read_buffer_.get();
}

RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
  write_lock_.AssertAcquired();
  return write_buffer_.get();
}

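// Called on the I/O thread when a read finishes: |result| says whether it
// succeeded and |bytes_read| how many new bytes were read into |read_buffer_|.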
void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  if (read_stopped_) {
    NOTREACHED();
    return;
  }

  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;

  // Keep reading data in a loop, and dispatch messages if enough data is
  // received. Exit the loop if any of the following happens:
  //   - one or more messages were dispatched;
  //   - the last read failed, was a partial read or would block;
  //   - |Shutdown()| was called.
  do {
    if (io_result != IO_SUCCEEDED) {
      read_stopped_ = true;
      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
      return;
    }

    read_buffer_->num_valid_bytes_ += bytes_read;

    // Dispatch all the messages that we can.
    bool did_dispatch_message = false;
    // Tracks the offset of the first undispatched message in |read_buffer_|.
    // Currently, we copy data to ensure that this is zero at the beginning.
    size_t read_buffer_start = 0;
    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
    size_t message_size;
    // Note that we rely on short-circuit evaluation here:
    //   - |read_buffer_start| may be an invalid index into
    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
    // next read).
    // TODO(vtl): Validate that |message_size| is sane.
    while (remaining_bytes > 0 &&
           MessageInTransit::GetNextMessageSize(
               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
               &message_size) &&
           remaining_bytes >= message_size) {
      MessageInTransit::View
          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
      DCHECK_EQ(message_view.total_size(), message_size);

      // Dispatch the message.
      DCHECK(delegate_);
      delegate_->OnReadMessage(message_view);
      if (read_stopped_) {
        // |Shutdown()| was called in |OnReadMessage()|.
        // TODO(vtl): Add test for this case.
        return;
      }
      did_dispatch_message = true;

      // Update our state.
      read_buffer_start += message_size;
      remaining_bytes -= message_size;
    }

    if (read_buffer_start > 0) {
      // Move data back to start.
      read_buffer_->num_valid_bytes_ = remaining_bytes;
      if (read_buffer_->num_valid_bytes_ > 0) {
        memmove(&read_buffer_->buffer_[0],
                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
      }
      read_buffer_start = 0;
    }

    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
            kReadSize) {
      // Use power-of-2 buffer sizes.
      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
      // maximum message size to whatever extent necessary).
      // TODO(vtl): We may often be able to peek at the header and get the real
      // required extra space (which may be much bigger than |kReadSize|).
      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
        new_size *= 2;

      // TODO(vtl): It's suboptimal to zero out the fresh memory.
      read_buffer_->buffer_.resize(new_size, 0);
    }

    // (1) If we dispatched any messages, stop reading for now (and let the
    // message loop do its thing for another round).
    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
    // a single message. Risks: slower, more complex if we want to avoid lots of
    // copying. ii. Keep reading until there's no more data and dispatch all the
    // messages we can. Risks: starvation of other users of the message loop.)
    // (2) If we didn't max out |kReadSize|, stop reading for now.
    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
    bytes_read = 0;
    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
  } while (io_result != IO_PENDING);
}

void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  bool did_fail = false;
  {
    base::AutoLock locker(write_lock_);
    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());

    if (write_stopped_) {
      NOTREACHED();
      return;
    }

    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
  }

  if (did_fail)
    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}

void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  if (delegate_)
    delegate_->OnFatalError(fatal_error);
}

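// Processes a write result under |write_lock_|: advances |offset_| on a
// partial write, pops the frontmost message on a complete one, and schedules
// the next write if messages remain. Returns false (and stops writing) on
// failure.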
bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_stopped_);
  DCHECK(!write_buffer_->message_queue_.empty());

  if (result) {
    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
      // Partial (or no) write.
      write_buffer_->offset_ += bytes_written;
    } else {
      // Complete write.
      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
      delete write_buffer_->message_queue_.front();
      write_buffer_->message_queue_.pop_front();
      write_buffer_->offset_ = 0;
    }

    if (write_buffer_->message_queue_.empty())
      return true;

    // Schedule the next write.
    if (ScheduleWriteNoLock() == IO_PENDING)
      return true;
  }

  write_stopped_ = true;
  STLDeleteElements(&write_buffer_->message_queue_);
  write_buffer_->offset_ = 0;
  return false;
}

}  // namespace system
}  // namespace mojo