raw_channel.cc revision 010d83a9304c5a91596085d917d248abff47903a
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <string.h>
8
9#include <algorithm>
10
11#include "base/bind.h"
12#include "base/location.h"
13#include "base/logging.h"
14#include "base/message_loop/message_loop.h"
15#include "base/stl_util.h"
16#include "mojo/system/message_in_transit.h"
17#include "mojo/system/transport_data.h"
18
19namespace mojo {
20namespace system {
21
// Number of bytes requested per transport read. |ReadBuffer| always keeps at
// least this much spare capacity past its valid data (see
// |ReadBuffer::GetBuffer()| and the resize logic in |OnReadCompleted()|).
const size_t kReadSize = 4096;
23
// Starts with |kReadSize| bytes of capacity and no valid data.
RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}
26
// |buffer_| owns its storage; nothing else to release.
RawChannel::ReadBuffer::~ReadBuffer() {}
28
29void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
30  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
31  *addr = &buffer_[0] + num_valid_bytes_;
32  *size = kReadSize;
33}
34
// |offset_| tracks how many bytes of the front queued message have already
// been written; no message is in flight initially.
RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
36
// The queue owns its |MessageInTransit*| entries (enqueued via release() in
// |WriteMessage()|); free any messages that were never written.
RawChannel::WriteBuffer::~WriteBuffer() {
  STLDeleteElements(&message_queue_);
}
40
// Produces the contiguous chunk(s) of the front message still to be written.
// A message is laid out as its main buffer followed by its (optional)
// transport data buffer, so this yields zero (empty queue), one, or two
// |Buffer| entries. |offset_| is the number of bytes of the front message
// already written (advanced by |OnWriteCompletedNoLock()|).
void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
  buffers->clear();

  // Nothing queued, nothing to write.
  if (message_queue_.empty())
    return;

  MessageInTransit* message = message_queue_.front();
  DCHECK_LT(offset_, message->total_size());
  size_t bytes_to_write = message->total_size() - offset_;

  // Zero if the message carries no transport data.
  size_t transport_data_buffer_size = message->transport_data() ?
      message->transport_data()->buffer_size() : 0;

  if (!transport_data_buffer_size) {
    // Only write from the main buffer.
    DCHECK_LT(offset_, message->main_buffer_size());
    DCHECK_LE(bytes_to_write, message->main_buffer_size());
    Buffer buffer = {
        static_cast<const char*>(message->main_buffer()) + offset_,
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  if (offset_ >= message->main_buffer_size()) {
    // The main buffer is fully written; only write from the transport data
    // buffer, starting at the part of |offset_| that lies past the main
    // buffer.
    DCHECK_LT(offset_ - message->main_buffer_size(),
              transport_data_buffer_size);
    DCHECK_LE(bytes_to_write, transport_data_buffer_size);
    Buffer buffer = {
        static_cast<const char*>(message->transport_data()->buffer()) +
            (offset_ - message->main_buffer_size()),
        bytes_to_write};
    buffers->push_back(buffer);
    return;
  }

  // Write from both buffers: the tail of the main buffer, then the whole
  // transport data buffer.
  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
                                transport_data_buffer_size);
  Buffer buffer1 = {
    static_cast<const char*>(message->main_buffer()) + offset_,
    message->main_buffer_size() - offset_
  };
  buffers->push_back(buffer1);
  Buffer buffer2 = {
    static_cast<const char*>(message->transport_data()->buffer()),
    transport_data_buffer_size
  };
  buffers->push_back(buffer2);
}
92
// All state starts "unattached"; |Init()| binds the channel to a delegate and
// to the current I/O message loop, and allocates the buffers.
RawChannel::RawChannel()
    : message_loop_for_io_(NULL),
      delegate_(NULL),
      read_stopped_(false),
      write_stopped_(false),
      weak_ptr_factory_(this) {
}
100
RawChannel::~RawChannel() {
  // |Shutdown()| must have run: it hands both buffers off to
  // |OnShutdownNoLock()|, leaving these scoped pointers null.
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);

  // No need to take the |write_lock_| here -- if there are still weak pointers
  // outstanding, then we're hosed anyway (since we wouldn't be able to
  // invalidate them cleanly, since we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}
110
// Binds the channel to |delegate| and the current I/O message loop, allocates
// the read/write buffers, runs platform-specific |OnInit()|, and schedules
// the first read. Returns true iff the initial read is pending. Must be
// called on an I/O-type message loop; the channel stays bound to it.
bool RawChannel::Init(Delegate* delegate) {
  DCHECK(delegate);

  DCHECK(!delegate_);
  delegate_ = delegate;

  CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
  DCHECK(!message_loop_for_io_);
  message_loop_for_io_ =
      static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());

  // No need to take the lock. No one should be using us yet.
  DCHECK(!read_buffer_);
  read_buffer_.reset(new ReadBuffer);
  DCHECK(!write_buffer_);
  write_buffer_.reset(new WriteBuffer);

  if (!OnInit()) {
    // Roll back to the unattached state so the destructor's DCHECKs hold.
    delegate_ = NULL;
    message_loop_for_io_ = NULL;
    read_buffer_.reset();
    write_buffer_.reset();
    return false;
  }

  // NOTE(review): If |ScheduleRead()| fails here we return false but -- unlike
  // the |OnInit()| failure path above -- leave |delegate_|, the buffers, etc.
  // set; destroying the channel afterwards without calling |Shutdown()| would
  // trip the destructor's |DCHECK(!read_buffer_)|/|DCHECK(!write_buffer_)|.
  // Confirm whether callers must call |Shutdown()| on failure, or whether this
  // path should clean up like the one above (platform teardown may be needed
  // since |OnInit()| already succeeded).
  return ScheduleRead() == IO_PENDING;
}
138
// Shuts the channel down on the I/O thread: detaches the delegate, stops
// reads and writes, invalidates outstanding weak pointers (cancelling any
// posted |CallOnFatalError()| tasks), and passes both buffers to the
// platform-specific |OnShutdownNoLock()|.
void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  // Queued-but-unwritten messages will be dropped (freed by ~WriteBuffer).
  LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
      << "Shutting down RawChannel with write buffer nonempty";

  // Reset the delegate so that it won't receive further calls.
  delegate_ = NULL;
  read_stopped_ = true;
  write_stopped_ = true;
  weak_ptr_factory_.InvalidateWeakPtrs();

  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}
155
156// Reminder: This must be thread-safe.
157bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
158  DCHECK(message);
159
160  // TODO(vtl)
161  if (message->transport_data() &&
162      message->transport_data()->has_platform_handles()) {
163    NOTIMPLEMENTED();
164    return false;
165  }
166
167  base::AutoLock locker(write_lock_);
168  if (write_stopped_)
169    return false;
170
171  if (!write_buffer_->message_queue_.empty()) {
172    write_buffer_->message_queue_.push_back(message.release());
173    return true;
174  }
175
176  write_buffer_->message_queue_.push_front(message.release());
177  DCHECK_EQ(write_buffer_->offset_, 0u);
178
179  size_t bytes_written = 0;
180  IOResult io_result = WriteNoLock(&bytes_written);
181  if (io_result == IO_PENDING)
182    return true;
183
184  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
185                                       bytes_written);
186  if (!result) {
187    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
188    // nested context.
189    message_loop_for_io_->PostTask(
190        FROM_HERE,
191        base::Bind(&RawChannel::CallOnFatalError,
192                   weak_ptr_factory_.GetWeakPtr(),
193                   Delegate::FATAL_ERROR_FAILED_WRITE));
194  }
195
196  return result;
197}
198
199// Reminder: This must be thread-safe.
200bool RawChannel::IsWriteBufferEmpty() {
201  base::AutoLock locker(write_lock_);
202  return write_buffer_->message_queue_.empty();
203}
204
// Accessor for the read buffer; I/O-thread only (reading is confined to that
// thread, so no lock is needed).
RawChannel::ReadBuffer* RawChannel::read_buffer() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  return read_buffer_.get();
}
209
// Accessor for the write buffer; the caller must already hold |write_lock_|.
RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
  write_lock_.AssertAcquired();
  return write_buffer_.get();
}
214
215void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
216  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
217
218  if (read_stopped_) {
219    NOTREACHED();
220    return;
221  }
222
223  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
224
225  // Keep reading data in a loop, and dispatch messages if enough data is
226  // received. Exit the loop if any of the following happens:
227  //   - one or more messages were dispatched;
228  //   - the last read failed, was a partial read or would block;
229  //   - |Shutdown()| was called.
230  do {
231    if (io_result != IO_SUCCEEDED) {
232      read_stopped_ = true;
233      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
234      return;
235    }
236
237    read_buffer_->num_valid_bytes_ += bytes_read;
238
239    // Dispatch all the messages that we can.
240    bool did_dispatch_message = false;
241    // Tracks the offset of the first undispatched message in |read_buffer_|.
242    // Currently, we copy data to ensure that this is zero at the beginning.
243    size_t read_buffer_start = 0;
244    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
245    size_t message_size;
246    // Note that we rely on short-circuit evaluation here:
247    //   - |read_buffer_start| may be an invalid index into
248    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
249    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
250    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
251    // next read).
252    // TODO(vtl): Validate that |message_size| is sane.
253    while (remaining_bytes > 0 &&
254           MessageInTransit::GetNextMessageSize(
255               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
256               &message_size) &&
257           remaining_bytes >= message_size) {
258      MessageInTransit::View
259          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
260      DCHECK_EQ(message_view.total_size(), message_size);
261
262      // Dispatch the message.
263      DCHECK(delegate_);
264      delegate_->OnReadMessage(message_view);
265      if (read_stopped_) {
266        // |Shutdown()| was called in |OnReadMessage()|.
267        // TODO(vtl): Add test for this case.
268        return;
269      }
270      did_dispatch_message = true;
271
272      // Update our state.
273      read_buffer_start += message_size;
274      remaining_bytes -= message_size;
275    }
276
277    if (read_buffer_start > 0) {
278      // Move data back to start.
279      read_buffer_->num_valid_bytes_ = remaining_bytes;
280      if (read_buffer_->num_valid_bytes_ > 0) {
281        memmove(&read_buffer_->buffer_[0],
282                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
283      }
284      read_buffer_start = 0;
285    }
286
287    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
288            kReadSize) {
289      // Use power-of-2 buffer sizes.
290      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
291      // maximum message size to whatever extent necessary).
292      // TODO(vtl): We may often be able to peek at the header and get the real
293      // required extra space (which may be much bigger than |kReadSize|).
294      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
295      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
296        new_size *= 2;
297
298      // TODO(vtl): It's suboptimal to zero out the fresh memory.
299      read_buffer_->buffer_.resize(new_size, 0);
300    }
301
302    // (1) If we dispatched any messages, stop reading for now (and let the
303    // message loop do its thing for another round).
304    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
305    // a single message. Risks: slower, more complex if we want to avoid lots of
306    // copying. ii. Keep reading until there's no more data and dispatch all the
307    // messages we can. Risks: starvation of other users of the message loop.)
308    // (2) If we didn't max out |kReadSize|, stop reading for now.
309    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
310    bytes_read = 0;
311    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
312  } while (io_result != IO_PENDING);
313}
314
315void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
316  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
317
318  bool did_fail = false;
319  {
320    base::AutoLock locker(write_lock_);
321    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
322
323    if (write_stopped_) {
324      NOTREACHED();
325      return;
326    }
327
328    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
329  }
330
331  if (did_fail)
332    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
333}
334
// Notifies the delegate (if still attached) of a fatal error; runs on the I/O
// thread. After |Shutdown()| clears |delegate_|, this becomes a no-op, so
// stale posted error tasks are harmless.
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  if (delegate_)
    delegate_->OnFatalError(fatal_error);
}
341
// Advances write state after |bytes_written| bytes were written (only
// meaningful when |result| is true): pops and frees the front message once
// fully written, and schedules the next write if anything remains queued.
// Returns true on success. On failure (either |result| false or the next
// write failing to schedule), stops writing and frees all queued messages.
// |write_lock_| must be held.
bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
  write_lock_.AssertAcquired();

  DCHECK(!write_stopped_);
  DCHECK(!write_buffer_->message_queue_.empty());

  if (result) {
    write_buffer_->offset_ += bytes_written;

    MessageInTransit* message = write_buffer_->message_queue_.front();
    if (write_buffer_->offset_ >= message->total_size()) {
      // Complete write.
      DCHECK_EQ(write_buffer_->offset_, message->total_size());
      write_buffer_->message_queue_.pop_front();
      delete message;
      write_buffer_->offset_ = 0;

      // Nothing left to write.
      if (write_buffer_->message_queue_.empty())
        return true;
    }

    // Schedule the next write (of the next message, or of the remainder of
    // the partially-written front message).
    IOResult io_result = ScheduleWriteNoLock();
    if (io_result == IO_PENDING)
      return true;
    DCHECK_EQ(io_result, IO_FAILED);
  }

  // Failure: stop writing and discard everything still queued.
  write_stopped_ = true;
  STLDeleteElements(&write_buffer_->message_queue_);
  write_buffer_->offset_ = 0;
  return false;
}
375
376}  // namespace system
377}  // namespace mojo
378