// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "mojo/services/public/cpp/network/web_socket_write_queue.h"
6
7#include "base/bind.h"
8
9namespace mojo {
10
11struct WebSocketWriteQueue::Operation {
12  uint32_t num_bytes_;
13  base::Callback<void(const char*)> callback_;
14
15  const char* data_;
16  // Only initialized if the initial Write fails. This saves a copy in
17  // the common case.
18  std::vector<char> data_copy_;
19};
20
21WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle)
22    : handle_(handle), is_waiting_(false) {
23}
24
25WebSocketWriteQueue::~WebSocketWriteQueue() {
26}
27
28void WebSocketWriteQueue::Write(const char* data,
29                                uint32_t num_bytes,
30                                base::Callback<void(const char*)> callback) {
31  Operation* op = new Operation;
32  op->num_bytes_ = num_bytes;
33  op->callback_ = callback;
34  op->data_ = data;
35  queue_.push_back(op);
36
37  MojoResult result = MOJO_RESULT_SHOULD_WAIT;
38  if (!is_waiting_)
39    result = TryToWrite();
40
41  // If we have to wait, make a local copy of the data so we know it will
42  // live until we need it.
43  if (result == MOJO_RESULT_SHOULD_WAIT) {
44    op->data_copy_.resize(num_bytes);
45    memcpy(&op->data_copy_[0], data, num_bytes);
46    op->data_ = &op->data_copy_[0];
47  }
48}
49
50MojoResult WebSocketWriteQueue::TryToWrite() {
51  Operation* op = queue_[0];
52  uint32_t bytes_written = op->num_bytes_;
53  MojoResult result = WriteDataRaw(
54      handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
55  if (result == MOJO_RESULT_SHOULD_WAIT) {
56    Wait();
57    return result;
58  }
59
60  // Ensure |op| is deleted, whether or not |this| goes away.
61  scoped_ptr<Operation> op_deleter(op);
62  queue_.weak_erase(queue_.begin());
63  if (result != MOJO_RESULT_OK)
64    return result;
65
66  op->callback_.Run(op->data_);  // may delete |this|
67  return result;
68}
69
70void WebSocketWriteQueue::Wait() {
71  is_waiting_ = true;
72  handle_watcher_.Start(handle_,
73                        MOJO_HANDLE_SIGNAL_WRITABLE,
74                        MOJO_DEADLINE_INDEFINITE,
75                        base::Bind(&WebSocketWriteQueue::OnHandleReady,
76                                   base::Unretained(this)));
77}
78
79void WebSocketWriteQueue::OnHandleReady(MojoResult result) {
80  is_waiting_ = false;
81  TryToWrite();
82}
83
84}  // namespace mojo
85