1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/services/public/cpp/network/web_socket_write_queue.h" 6 7#include "base/bind.h" 8 9namespace mojo { 10 11struct WebSocketWriteQueue::Operation { 12 uint32_t num_bytes_; 13 base::Callback<void(const char*)> callback_; 14 15 const char* data_; 16 // Only initialized if the initial Write fails. This saves a copy in 17 // the common case. 18 std::vector<char> data_copy_; 19}; 20 21WebSocketWriteQueue::WebSocketWriteQueue(DataPipeProducerHandle handle) 22 : handle_(handle), is_waiting_(false) { 23} 24 25WebSocketWriteQueue::~WebSocketWriteQueue() { 26} 27 28void WebSocketWriteQueue::Write(const char* data, 29 uint32_t num_bytes, 30 base::Callback<void(const char*)> callback) { 31 Operation* op = new Operation; 32 op->num_bytes_ = num_bytes; 33 op->callback_ = callback; 34 op->data_ = data; 35 queue_.push_back(op); 36 37 MojoResult result = MOJO_RESULT_SHOULD_WAIT; 38 if (!is_waiting_) 39 result = TryToWrite(); 40 41 // If we have to wait, make a local copy of the data so we know it will 42 // live until we need it. 43 if (result == MOJO_RESULT_SHOULD_WAIT) { 44 op->data_copy_.resize(num_bytes); 45 memcpy(&op->data_copy_[0], data, num_bytes); 46 op->data_ = &op->data_copy_[0]; 47 } 48} 49 50MojoResult WebSocketWriteQueue::TryToWrite() { 51 Operation* op = queue_[0]; 52 uint32_t bytes_written = op->num_bytes_; 53 MojoResult result = WriteDataRaw( 54 handle_, op->data_, &bytes_written, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE); 55 if (result == MOJO_RESULT_SHOULD_WAIT) { 56 Wait(); 57 return result; 58 } 59 60 // Ensure |op| is deleted, whether or not |this| goes away. 61 scoped_ptr<Operation> op_deleter(op); 62 queue_.weak_erase(queue_.begin()); 63 if (result != MOJO_RESULT_OK) 64 return result; 65 66 op->callback_.Run(op->data_); // may delete |this| 67 return result; 68} 69 70void WebSocketWriteQueue::Wait() { 71 is_waiting_ = true; 72 handle_watcher_.Start(handle_, 73 MOJO_HANDLE_SIGNAL_WRITABLE, 74 MOJO_DEADLINE_INDEFINITE, 75 base::Bind(&WebSocketWriteQueue::OnHandleReady, 76 base::Unretained(this))); 77} 78 79void WebSocketWriteQueue::OnHandleReady(MojoResult result) { 80 is_waiting_ = false; 81 TryToWrite(); 82} 83 84} // namespace mojo 85