1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/memory/weak_ptr.h"
6#include "base/message_loop/message_loop.h"
7#include "base/rand_util.h"
8#include "chrome/browser/devtools/device/android_device_manager.h"
9#include "content/public/browser/browser_thread.h"
10#include "net/base/io_buffer.h"
11#include "net/base/net_errors.h"
12#include "net/server/web_socket.h"
13#include "net/socket/stream_socket.h"
14
15using content::BrowserThread;
16using net::WebSocket;
17
18namespace {
19
// Capacity, in bytes, of the net::IOBuffer that WebSocketImpl allocates for
// socket reads; the same value is passed as the maximum length of each Read().
const int kBufferSize = 16 * 1024;
21
22class WebSocketImpl {
23 public:
24  typedef AndroidDeviceManager::AndroidWebSocket::Delegate Delegate;
25
26  WebSocketImpl(Delegate* delegate,
27                scoped_ptr<net::StreamSocket> socket);
28  void StartListening();
29  void SendFrame(const std::string& message);
30
31 private:
32  void OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer, int result);
33  void SendPendingRequests(int result);
34  void Disconnect();
35
36  Delegate* delegate_;
37  scoped_ptr<net::StreamSocket> socket_;
38  std::string response_buffer_;
39  std::string request_buffer_;
40  base::ThreadChecker thread_checker_;
41  DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
42};
43
44class DelegateWrapper
45    : public AndroidDeviceManager::AndroidWebSocket::Delegate {
46 public:
47  DelegateWrapper(base::WeakPtr<Delegate> weak_delegate,
48                  scoped_refptr<base::MessageLoopProxy> message_loop)
49      : weak_delegate_(weak_delegate),
50        message_loop_(message_loop) {
51  }
52
53  virtual ~DelegateWrapper() {}
54
55  // AndroidWebSocket::Delegate implementation
56  virtual void OnSocketOpened() OVERRIDE {
57    message_loop_->PostTask(FROM_HERE,
58        base::Bind(&Delegate::OnSocketOpened, weak_delegate_));
59  }
60
61  virtual void OnFrameRead(const std::string& message) OVERRIDE {
62    message_loop_->PostTask(FROM_HERE,
63        base::Bind(&Delegate::OnFrameRead, weak_delegate_, message));
64  }
65
66  virtual void OnSocketClosed() OVERRIDE {
67    message_loop_->PostTask(FROM_HERE,
68        base::Bind(&Delegate::OnSocketClosed, weak_delegate_));
69  }
70
71 private:
72  base::WeakPtr<Delegate> weak_delegate_;
73  scoped_refptr<base::MessageLoopProxy> message_loop_;
74};
75
76class AndroidWebSocketImpl
77    : public AndroidDeviceManager::AndroidWebSocket,
78      public AndroidDeviceManager::AndroidWebSocket::Delegate {
79 public:
80  typedef AndroidDeviceManager::Device Device;
81  AndroidWebSocketImpl(
82      scoped_refptr<base::MessageLoopProxy> device_message_loop,
83      scoped_refptr<Device> device,
84      const std::string& socket_name,
85      const std::string& url,
86      AndroidWebSocket::Delegate* delegate);
87
88  virtual ~AndroidWebSocketImpl();
89
90  // AndroidWebSocket implementation
91  virtual void SendFrame(const std::string& message) OVERRIDE;
92
93  // AndroidWebSocket::Delegate implementation
94  virtual void OnSocketOpened() OVERRIDE;
95  virtual void OnFrameRead(const std::string& message) OVERRIDE;
96  virtual void OnSocketClosed() OVERRIDE;
97
98 private:
99  void Connected(int result, scoped_ptr<net::StreamSocket> socket);
100
101  scoped_refptr<base::MessageLoopProxy> device_message_loop_;
102  scoped_refptr<Device> device_;
103  std::string socket_name_;
104  std::string url_;
105  WebSocketImpl* connection_;
106  DelegateWrapper* delegate_wrapper_;
107  AndroidWebSocket::Delegate* delegate_;
108  base::WeakPtrFactory<AndroidWebSocketImpl> weak_factory_;
109  DISALLOW_COPY_AND_ASSIGN(AndroidWebSocketImpl);
110};
111
112AndroidWebSocketImpl::AndroidWebSocketImpl(
113    scoped_refptr<base::MessageLoopProxy> device_message_loop,
114    scoped_refptr<Device> device,
115    const std::string& socket_name,
116    const std::string& url,
117    AndroidWebSocket::Delegate* delegate)
118    : device_message_loop_(device_message_loop),
119      device_(device),
120      socket_name_(socket_name),
121      url_(url),
122      delegate_(delegate),
123      weak_factory_(this) {
124  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
125  DCHECK(delegate_);
126  device_->HttpUpgrade(
127      socket_name_, url_,
128      base::Bind(&AndroidWebSocketImpl::Connected, weak_factory_.GetWeakPtr()));
129}
130
131void AndroidWebSocketImpl::SendFrame(const std::string& message) {
132  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
133  device_message_loop_->PostTask(
134      FROM_HERE,
135      base::Bind(&WebSocketImpl::SendFrame,
136                 base::Unretained(connection_), message));
137}
138
139void WebSocketImpl::SendFrame(const std::string& message) {
140  DCHECK(thread_checker_.CalledOnValidThread());
141  if (!socket_)
142    return;
143  int mask = base::RandInt(0, 0x7FFFFFFF);
144  std::string encoded_frame = WebSocket::EncodeFrameHybi17(message, mask);
145  request_buffer_ += encoded_frame;
146  if (request_buffer_.length() == encoded_frame.length())
147    SendPendingRequests(0);
148}
149
150AndroidWebSocketImpl::~AndroidWebSocketImpl() {
151  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
152  device_message_loop_->DeleteSoon(FROM_HERE, connection_);
153  device_message_loop_->DeleteSoon(FROM_HERE, delegate_wrapper_);
154}
155
156WebSocketImpl::WebSocketImpl(Delegate* delegate,
157                             scoped_ptr<net::StreamSocket> socket)
158                             : delegate_(delegate),
159                               socket_(socket.Pass()) {
160  thread_checker_.DetachFromThread();
161}
162
163void AndroidWebSocketImpl::Connected(int result,
164                                     scoped_ptr<net::StreamSocket> socket) {
165  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
166  if (result != net::OK || socket == NULL) {
167    OnSocketClosed();
168    return;
169  }
170  delegate_wrapper_ = new DelegateWrapper(weak_factory_.GetWeakPtr(),
171                                          base::MessageLoopProxy::current());
172  connection_ = new WebSocketImpl(delegate_wrapper_, socket.Pass());
173  device_message_loop_->PostTask(
174      FROM_HERE,
175      base::Bind(&WebSocketImpl::StartListening,
176                 base::Unretained(connection_)));
177  OnSocketOpened();
178}
179
180void WebSocketImpl::StartListening() {
181  DCHECK(thread_checker_.CalledOnValidThread());
182  DCHECK(socket_);
183  scoped_refptr<net::IOBuffer> response_buffer =
184      new net::IOBuffer(kBufferSize);
185  int result = socket_->Read(
186      response_buffer.get(),
187      kBufferSize,
188      base::Bind(&WebSocketImpl::OnBytesRead,
189                 base::Unretained(this), response_buffer));
190  if (result != net::ERR_IO_PENDING)
191    OnBytesRead(response_buffer, result);
192}
193
194void WebSocketImpl::OnBytesRead(scoped_refptr<net::IOBuffer> response_buffer,
195                                int result) {
196  DCHECK(thread_checker_.CalledOnValidThread());
197  if (result <= 0) {
198    Disconnect();
199    return;
200  }
201
202  response_buffer_.append(response_buffer->data(), result);
203
204  int bytes_consumed;
205  std::string output;
206  WebSocket::ParseResult parse_result = WebSocket::DecodeFrameHybi17(
207      response_buffer_, false, &bytes_consumed, &output);
208
209  while (parse_result == WebSocket::FRAME_OK) {
210    response_buffer_ = response_buffer_.substr(bytes_consumed);
211    delegate_->OnFrameRead(output);
212    parse_result = WebSocket::DecodeFrameHybi17(
213        response_buffer_, false, &bytes_consumed, &output);
214  }
215
216  if (parse_result == WebSocket::FRAME_ERROR ||
217      parse_result == WebSocket::FRAME_CLOSE) {
218    Disconnect();
219    return;
220  }
221
222  result = socket_->Read(
223      response_buffer.get(),
224      kBufferSize,
225      base::Bind(&WebSocketImpl::OnBytesRead,
226                 base::Unretained(this), response_buffer));
227  if (result != net::ERR_IO_PENDING)
228    OnBytesRead(response_buffer, result);
229}
230
231void WebSocketImpl::SendPendingRequests(int result) {
232  DCHECK(thread_checker_.CalledOnValidThread());
233  if (result < 0) {
234    Disconnect();
235    return;
236  }
237  request_buffer_ = request_buffer_.substr(result);
238  if (request_buffer_.empty())
239    return;
240
241  scoped_refptr<net::StringIOBuffer> buffer =
242      new net::StringIOBuffer(request_buffer_);
243  result = socket_->Write(buffer.get(), buffer->size(),
244                          base::Bind(&WebSocketImpl::SendPendingRequests,
245                                     base::Unretained(this)));
246  if (result != net::ERR_IO_PENDING)
247    SendPendingRequests(result);
248}
249
250void WebSocketImpl::Disconnect() {
251  DCHECK(thread_checker_.CalledOnValidThread());
252  socket_.reset();
253  delegate_->OnSocketClosed();
254}
255
256void AndroidWebSocketImpl::OnSocketOpened() {
257  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
258  delegate_->OnSocketOpened();
259}
260
261void AndroidWebSocketImpl::OnFrameRead(const std::string& message) {
262  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
263  delegate_->OnFrameRead(message);
264}
265
266void AndroidWebSocketImpl::OnSocketClosed() {
267  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
268  delegate_->OnSocketClosed();
269}
270
271}  // namespace
272
273AndroidDeviceManager::AndroidWebSocket*
274AndroidDeviceManager::Device::CreateWebSocket(
275    const std::string& socket,
276    const std::string& url,
277    AndroidDeviceManager::AndroidWebSocket::Delegate* delegate) {
278  return new AndroidWebSocketImpl(
279      device_message_loop_, this, socket, url, delegate);
280}
281