1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/services/public/cpp/network/web_socket_read_queue.h"
6
7#include "base/bind.h"
8
9namespace mojo {
10
11struct WebSocketReadQueue::Operation {
12  uint32_t num_bytes_;
13  base::Callback<void(const char*)> callback_;
14};
15
16WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
17    : handle_(handle), is_waiting_(false) {
18}
19
20WebSocketReadQueue::~WebSocketReadQueue() {
21}
22
23void WebSocketReadQueue::Read(uint32_t num_bytes,
24                              base::Callback<void(const char*)> callback) {
25  Operation* op = new Operation;
26  op->num_bytes_ = num_bytes;
27  op->callback_ = callback;
28  queue_.push_back(op);
29
30  if (!is_waiting_)
31    TryToRead();
32}
33
34void WebSocketReadQueue::TryToRead() {
35  Operation* op = queue_[0];
36  const void* buffer = NULL;
37  uint32_t bytes_read = op->num_bytes_;
38  MojoResult result = BeginReadDataRaw(
39      handle_, &buffer, &bytes_read, MOJO_READ_DATA_FLAG_ALL_OR_NONE);
40  if (result == MOJO_RESULT_SHOULD_WAIT) {
41    EndReadDataRaw(handle_, bytes_read);
42    Wait();
43    return;
44  }
45
46  // Ensure |op| is deleted, whether or not |this| goes away.
47  scoped_ptr<Operation> op_deleter(op);
48  queue_.weak_erase(queue_.begin());
49  if (result != MOJO_RESULT_OK)
50    return;
51  DataPipeConsumerHandle handle = handle_;
52  op->callback_.Run(static_cast<const char*>(buffer));  // may delete |this|
53  EndReadDataRaw(handle, bytes_read);
54}
55
56void WebSocketReadQueue::Wait() {
57  is_waiting_ = true;
58  handle_watcher_.Start(
59      handle_,
60      MOJO_HANDLE_SIGNAL_READABLE,
61      MOJO_DEADLINE_INDEFINITE,
62      base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
63}
64
65void WebSocketReadQueue::OnHandleReady(MojoResult result) {
66  is_waiting_ = false;
67  TryToRead();
68}
69
70}  // namespace mojo
71