1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/services/public/cpp/network/web_socket_read_queue.h" 6 7#include "base/bind.h" 8 9namespace mojo { 10 11struct WebSocketReadQueue::Operation { 12 uint32_t num_bytes_; 13 base::Callback<void(const char*)> callback_; 14}; 15 16WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle) 17 : handle_(handle), is_waiting_(false) { 18} 19 20WebSocketReadQueue::~WebSocketReadQueue() { 21} 22 23void WebSocketReadQueue::Read(uint32_t num_bytes, 24 base::Callback<void(const char*)> callback) { 25 Operation* op = new Operation; 26 op->num_bytes_ = num_bytes; 27 op->callback_ = callback; 28 queue_.push_back(op); 29 30 if (!is_waiting_) 31 TryToRead(); 32} 33 34void WebSocketReadQueue::TryToRead() { 35 Operation* op = queue_[0]; 36 const void* buffer = NULL; 37 uint32_t bytes_read = op->num_bytes_; 38 MojoResult result = BeginReadDataRaw( 39 handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE); 40 if (result == MOJO_RESULT_SHOULD_WAIT) { 41 EndReadDataRaw(handle_, bytes_read); 42 Wait(); 43 return; 44 } 45 46 // Ensure |op| is deleted, whether or not |this| goes away. 47 scoped_ptr<Operation> op_deleter(op); 48 queue_.weak_erase(queue_.begin()); 49 if (result != MOJO_RESULT_OK) 50 return; 51 DataPipeConsumerHandle handle = handle_; 52 op->callback_.Run(static_cast<const char*>(buffer)); // may delete |this| 53 EndReadDataRaw(handle, bytes_read); 54} 55 56void WebSocketReadQueue::Wait() { 57 is_waiting_ = true; 58 handle_watcher_.Start( 59 handle_, 60 MOJO_HANDLE_SIGNAL_READABLE, 61 MOJO_DEADLINE_INDEFINITE, 62 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this))); 63} 64 65void WebSocketReadQueue::OnHandleReady(MojoResult result) { 66 is_waiting_ = false; 67 TryToRead(); 68} 69 70} // namespace mojo 71