// raw_channel.cc revision a1401311d1ab56c4ed0a474bd38c108f75cb0cd9
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/system/raw_channel.h"
6
7#include <string.h>
8
9#include <algorithm>
10
11#include "base/bind.h"
12#include "base/location.h"
13#include "base/logging.h"
14#include "base/message_loop/message_loop.h"
15#include "base/stl_util.h"
16#include "mojo/system/message_in_transit.h"
17
18namespace mojo {
19namespace system {
20
// Size of each chunk requested from the platform per read. The read buffer is
// always kept with at least this much free space (see |ReadBuffer::GetBuffer()|
// and the resize logic in |OnReadCompleted()|).
const size_t kReadSize = 4096;
22
23RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
24}
25
// |buffer_| is a |std::vector|, so no manual cleanup is needed.
RawChannel::ReadBuffer::~ReadBuffer() {}
27
28void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
29  DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
30  *addr = &buffer_[0] + num_valid_bytes_;
31  *size = kReadSize;
32}
33
// A fresh write buffer has no queued messages and no partially-written one.
RawChannel::WriteBuffer::WriteBuffer() : offset_(0) {}
35
36RawChannel::WriteBuffer::~WriteBuffer() {
37  STLDeleteElements(&message_queue_);
38}
39
40void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
41  buffers->clear();
42
43  size_t bytes_to_write = GetTotalBytesToWrite();
44  if (bytes_to_write == 0)
45    return;
46
47  MessageInTransit* message = message_queue_.front();
48  if (!message->secondary_buffer_size()) {
49    // Only write from the main buffer.
50    DCHECK_LT(offset_, message->main_buffer_size());
51    DCHECK_LE(bytes_to_write, message->main_buffer_size());
52    Buffer buffer = {
53        static_cast<const char*>(message->main_buffer()) + offset_,
54        bytes_to_write};
55    buffers->push_back(buffer);
56    return;
57  }
58
59  if (offset_ >= message->main_buffer_size()) {
60    // Only write from the secondary buffer.
61    DCHECK_LT(offset_ - message->main_buffer_size(),
62              message->secondary_buffer_size());
63    DCHECK_LE(bytes_to_write, message->secondary_buffer_size());
64    Buffer buffer = {
65        static_cast<const char*>(message->secondary_buffer()) +
66            (offset_ - message->main_buffer_size()),
67        bytes_to_write};
68    buffers->push_back(buffer);
69    return;
70  }
71
72  // Write from both buffers.
73  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - offset_ +
74                                message->secondary_buffer_size());
75  Buffer buffer1 = {
76      static_cast<const char*>(message->main_buffer()) + offset_,
77      message->main_buffer_size() - offset_};
78  buffers->push_back(buffer1);
79  Buffer buffer2 = {
80      static_cast<const char*>(message->secondary_buffer()),
81      message->secondary_buffer_size()};
82  buffers->push_back(buffer2);
83}
84
85size_t RawChannel::WriteBuffer::GetTotalBytesToWrite() const {
86  if (message_queue_.empty())
87    return 0;
88
89  MessageInTransit* message = message_queue_.front();
90  DCHECK_LT(offset_, message->total_size());
91  return message->total_size() - offset_;
92}
93
// Constructs a |RawChannel| that does its I/O on |message_loop_for_io| and
// reports incoming messages and fatal errors to |delegate|. Both pointers must
// outlive this object.
RawChannel::RawChannel(Delegate* delegate,
                       base::MessageLoopForIO* message_loop_for_io)
    : delegate_(delegate),
      message_loop_for_io_(message_loop_for_io),
      read_stopped_(false),
      write_stopped_(false),
      weak_ptr_factory_(this) {
}
102
RawChannel::~RawChannel() {
  // |Shutdown()| must have run already: it passes ownership of both buffers
  // away via |OnShutdownNoLock()|, leaving these null.
  DCHECK(!read_buffer_);
  DCHECK(!write_buffer_);

  // No need to take the |write_lock_| here -- if there are still weak pointers
  // outstanding, then we're hosed anyway (since we wouldn't be able to
  // invalidate them cleanly, since we might not be on the I/O thread).
  DCHECK(!weak_ptr_factory_.HasWeakPtrs());
}
112
113bool RawChannel::Init() {
114  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
115
116  // No need to take the lock. No one should be using us yet.
117  DCHECK(!read_buffer_);
118  read_buffer_.reset(new ReadBuffer);
119  DCHECK(!write_buffer_);
120  write_buffer_.reset(new WriteBuffer);
121
122  if (!OnInit())
123    return false;
124
125  return ScheduleRead() == IO_PENDING;
126}
127
void RawChannel::Shutdown() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);

  base::AutoLock locker(write_lock_);

  // Invalidate weak pointers under the lock, so no already-posted
  // |CallOnFatalError()| task (see |WriteMessage()|) can run after this.
  weak_ptr_factory_.InvalidateWeakPtrs();

  // Stop both directions before handing the buffers off.
  read_stopped_ = true;
  write_stopped_ = true;

  // Relinquish ownership of both buffers to the subclass.
  OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
}
140
141// Reminder: This must be thread-safe.
142bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
143  base::AutoLock locker(write_lock_);
144  if (write_stopped_)
145    return false;
146
147  if (!write_buffer_->message_queue_.empty()) {
148    write_buffer_->message_queue_.push_back(message.release());
149    return true;
150  }
151
152  write_buffer_->message_queue_.push_front(message.release());
153  DCHECK_EQ(write_buffer_->offset_, 0u);
154
155  size_t bytes_written = 0;
156  IOResult io_result = WriteNoLock(&bytes_written);
157  if (io_result == IO_PENDING)
158    return true;
159
160  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
161                                       bytes_written);
162  if (!result) {
163    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
164    // nested context.
165    message_loop_for_io_->PostTask(
166        FROM_HERE,
167        base::Bind(&RawChannel::CallOnFatalError,
168                   weak_ptr_factory_.GetWeakPtr(),
169                   Delegate::FATAL_ERROR_FAILED_WRITE));
170  }
171
172  return result;
173}
174
// Returns the read buffer; may only be called on the I/O thread.
RawChannel::ReadBuffer* RawChannel::read_buffer() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  return read_buffer_.get();
}
179
// Returns the write buffer; the caller must already hold |write_lock_|.
RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
  write_lock_.AssertAcquired();
  return write_buffer_.get();
}
184
185void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
186  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
187
188  if (read_stopped_) {
189    NOTREACHED();
190    return;
191  }
192
193  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
194
195  // Keep reading data in a loop, and dispatch messages if enough data is
196  // received. Exit the loop if any of the following happens:
197  //   - one or more messages were dispatched;
198  //   - the last read failed, was a partial read or would block;
199  //   - |Shutdown()| was called.
200  do {
201    if (io_result != IO_SUCCEEDED) {
202      read_stopped_ = true;
203      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
204      return;
205    }
206
207    read_buffer_->num_valid_bytes_ += bytes_read;
208
209    // Dispatch all the messages that we can.
210    bool did_dispatch_message = false;
211    // Tracks the offset of the first undispatched message in |read_buffer_|.
212    // Currently, we copy data to ensure that this is zero at the beginning.
213    size_t read_buffer_start = 0;
214    size_t remaining_bytes = read_buffer_->num_valid_bytes_;
215    size_t message_size;
216    // Note that we rely on short-circuit evaluation here:
217    //   - |read_buffer_start| may be an invalid index into
218    //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
219    //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
220    // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
221    // next read).
222    // TODO(vtl): Validate that |message_size| is sane.
223    while (remaining_bytes > 0 &&
224           MessageInTransit::GetNextMessageSize(
225               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
226               &message_size) &&
227           remaining_bytes >= message_size) {
228      MessageInTransit::View
229          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
230      DCHECK_EQ(message_view.total_size(), message_size);
231
232      // Dispatch the message.
233      delegate_->OnReadMessage(message_view);
234      if (read_stopped_) {
235        // |Shutdown()| was called in |OnReadMessage()|.
236        // TODO(vtl): Add test for this case.
237        return;
238      }
239      did_dispatch_message = true;
240
241      // Update our state.
242      read_buffer_start += message_size;
243      remaining_bytes -= message_size;
244    }
245
246    if (read_buffer_start > 0) {
247      // Move data back to start.
248      read_buffer_->num_valid_bytes_ = remaining_bytes;
249      if (read_buffer_->num_valid_bytes_ > 0) {
250        memmove(&read_buffer_->buffer_[0],
251                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
252      }
253      read_buffer_start = 0;
254    }
255
256    if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
257            kReadSize) {
258      // Use power-of-2 buffer sizes.
259      // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
260      // maximum message size to whatever extent necessary).
261      // TODO(vtl): We may often be able to peek at the header and get the real
262      // required extra space (which may be much bigger than |kReadSize|).
263      size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
264      while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
265        new_size *= 2;
266
267      // TODO(vtl): It's suboptimal to zero out the fresh memory.
268      read_buffer_->buffer_.resize(new_size, 0);
269    }
270
271    // (1) If we dispatched any messages, stop reading for now (and let the
272    // message loop do its thing for another round).
273    // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
274    // a single message. Risks: slower, more complex if we want to avoid lots of
275    // copying. ii. Keep reading until there's no more data and dispatch all the
276    // messages we can. Risks: starvation of other users of the message loop.)
277    // (2) If we didn't max out |kReadSize|, stop reading for now.
278    bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
279    bytes_read = 0;
280    io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
281  } while (io_result != IO_PENDING);
282}
283
284void RawChannel::OnWriteCompleted(bool result, size_t bytes_written) {
285  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
286
287  bool did_fail = false;
288  {
289    base::AutoLock locker(write_lock_);
290    DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
291
292    if (write_stopped_) {
293      NOTREACHED();
294      return;
295    }
296
297    did_fail = !OnWriteCompletedNoLock(result, bytes_written);
298  }
299
300  if (did_fail)
301    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
302}
303
// Informs the delegate of a fatal error. Must be called on the I/O thread and,
// per the TODO below, is expected to run without |write_lock_| held.
void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
  // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
  delegate_->OnFatalError(fatal_error);
}
309
310bool RawChannel::OnWriteCompletedNoLock(bool result, size_t bytes_written) {
311  write_lock_.AssertAcquired();
312
313  DCHECK(!write_stopped_);
314  DCHECK(!write_buffer_->message_queue_.empty());
315
316  if (result) {
317    if (bytes_written < write_buffer_->GetTotalBytesToWrite()) {
318      // Partial (or no) write.
319      write_buffer_->offset_ += bytes_written;
320    } else {
321      // Complete write.
322      DCHECK_EQ(bytes_written, write_buffer_->GetTotalBytesToWrite());
323      delete write_buffer_->message_queue_.front();
324      write_buffer_->message_queue_.pop_front();
325      write_buffer_->offset_ = 0;
326    }
327
328    if (write_buffer_->message_queue_.empty())
329      return true;
330
331    // Schedule the next write.
332    if (ScheduleWriteNoLock() == IO_PENDING)
333      return true;
334  }
335
336  write_stopped_ = true;
337  STLDeleteElements(&write_buffer_->message_queue_);
338  write_buffer_->offset_ = 0;
339  return false;
340}
341
342}  // namespace system
343}  // namespace mojo
344