1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "extensions/browser/api/sockets_tcp/tcp_socket_event_dispatcher.h"
6
7#include "base/lazy_instance.h"
8#include "extensions/browser/api/socket/tcp_socket.h"
9#include "extensions/browser/event_router.h"
10#include "extensions/browser/extension_system.h"
11#include "extensions/browser/extensions_browser_client.h"
12#include "net/base/net_errors.h"
13
namespace {

// Number of bytes requested per read when a socket does not report a positive
// buffer size of its own. Declared const so it cannot be modified at runtime.
const int kDefaultBufferSize = 4096;

}  // namespace
17
18namespace extensions {
19namespace core_api {
20
21using content::BrowserThread;
22
23static base::LazyInstance<
24    BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher> > g_factory =
25    LAZY_INSTANCE_INITIALIZER;
26
27// static
28BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher>*
29TCPSocketEventDispatcher::GetFactoryInstance() {
30  return g_factory.Pointer();
31}
32
33// static
34TCPSocketEventDispatcher* TCPSocketEventDispatcher::Get(
35    content::BrowserContext* context) {
36  DCHECK_CURRENTLY_ON(BrowserThread::UI);
37
38  return BrowserContextKeyedAPIFactory<TCPSocketEventDispatcher>::Get(context);
39}
40
41TCPSocketEventDispatcher::TCPSocketEventDispatcher(
42    content::BrowserContext* context)
43    : thread_id_(Socket::kThreadId), browser_context_(context) {
44  ApiResourceManager<ResumableTCPSocket>* manager =
45      ApiResourceManager<ResumableTCPSocket>::Get(browser_context_);
46  DCHECK(manager)
47      << "There is no socket manager. "
48         "If this assertion is failing during a test, then it is likely that "
49         "TestExtensionSystem is failing to provide an instance of "
50         "ApiResourceManager<ResumableTCPSocket>.";
51  sockets_ = manager->data_;
52}
53
54TCPSocketEventDispatcher::~TCPSocketEventDispatcher() {}
55
56TCPSocketEventDispatcher::ReadParams::ReadParams() {}
57
58TCPSocketEventDispatcher::ReadParams::~ReadParams() {}
59
60void TCPSocketEventDispatcher::OnSocketConnect(const std::string& extension_id,
61                                               int socket_id) {
62  DCHECK_CURRENTLY_ON(thread_id_);
63
64  StartSocketRead(extension_id, socket_id);
65}
66
67void TCPSocketEventDispatcher::OnSocketResume(const std::string& extension_id,
68                                              int socket_id) {
69  DCHECK_CURRENTLY_ON(thread_id_);
70
71  StartSocketRead(extension_id, socket_id);
72}
73
74void TCPSocketEventDispatcher::StartSocketRead(const std::string& extension_id,
75                                               int socket_id) {
76  DCHECK_CURRENTLY_ON(thread_id_);
77
78  ReadParams params;
79  params.thread_id = thread_id_;
80  params.browser_context_id = browser_context_;
81  params.extension_id = extension_id;
82  params.sockets = sockets_;
83  params.socket_id = socket_id;
84
85  StartRead(params);
86}
87
88// static
89void TCPSocketEventDispatcher::StartRead(const ReadParams& params) {
90  DCHECK_CURRENTLY_ON(params.thread_id);
91
92  ResumableTCPSocket* socket =
93      params.sockets->Get(params.extension_id, params.socket_id);
94  if (!socket) {
95    // This can happen if the socket is closed while our callback is active.
96    return;
97  }
98  DCHECK(params.extension_id == socket->owner_extension_id())
99      << "Socket has wrong owner.";
100
101  // Don't start another read if the socket has been paused.
102  if (socket->paused())
103    return;
104
105  int buffer_size = socket->buffer_size();
106  if (buffer_size <= 0)
107    buffer_size = kDefaultBufferSize;
108  socket->Read(buffer_size,
109               base::Bind(&TCPSocketEventDispatcher::ReadCallback, params));
110}
111
112// static
113void TCPSocketEventDispatcher::ReadCallback(
114    const ReadParams& params,
115    int bytes_read,
116    scoped_refptr<net::IOBuffer> io_buffer) {
117  DCHECK_CURRENTLY_ON(params.thread_id);
118
119  // If |bytes_read| == 0, the connection has been closed by the peer.
120  // If |bytes_read| < 0, there was a network error, and |bytes_read| is a value
121  // from "net::ERR_".
122
123  if (bytes_read == 0) {
124    bytes_read = net::ERR_CONNECTION_CLOSED;
125  }
126
127  if (bytes_read > 0) {
128    // Dispatch "onReceive" event.
129    sockets_tcp::ReceiveInfo receive_info;
130    receive_info.socket_id = params.socket_id;
131    receive_info.data = std::string(io_buffer->data(), bytes_read);
132    scoped_ptr<base::ListValue> args =
133        sockets_tcp::OnReceive::Create(receive_info);
134    scoped_ptr<Event> event(
135        new Event(sockets_tcp::OnReceive::kEventName, args.Pass()));
136    PostEvent(params, event.Pass());
137
138    // Post a task to delay the read until the socket is available, as
139    // calling StartReceive at this point would error with ERR_IO_PENDING.
140    BrowserThread::PostTask(
141        params.thread_id,
142        FROM_HERE,
143        base::Bind(&TCPSocketEventDispatcher::StartRead, params));
144  } else if (bytes_read == net::ERR_IO_PENDING) {
145    // This happens when resuming a socket which already had an
146    // active "read" callback.
147  } else {
148    // Dispatch "onReceiveError" event but don't start another read to avoid
149    // potential infinite reads if we have a persistent network error.
150    sockets_tcp::ReceiveErrorInfo receive_error_info;
151    receive_error_info.socket_id = params.socket_id;
152    receive_error_info.result_code = bytes_read;
153    scoped_ptr<base::ListValue> args =
154        sockets_tcp::OnReceiveError::Create(receive_error_info);
155    scoped_ptr<Event> event(
156        new Event(sockets_tcp::OnReceiveError::kEventName, args.Pass()));
157    PostEvent(params, event.Pass());
158
159    // Since we got an error, the socket is now "paused" until the application
160    // "resumes" it.
161    ResumableTCPSocket* socket =
162        params.sockets->Get(params.extension_id, params.socket_id);
163    if (socket) {
164      socket->set_paused(true);
165    }
166  }
167}
168
169// static
170void TCPSocketEventDispatcher::PostEvent(const ReadParams& params,
171                                         scoped_ptr<Event> event) {
172  DCHECK_CURRENTLY_ON(params.thread_id);
173
174  BrowserThread::PostTask(BrowserThread::UI,
175                          FROM_HERE,
176                          base::Bind(&DispatchEvent,
177                                     params.browser_context_id,
178                                     params.extension_id,
179                                     base::Passed(event.Pass())));
180}
181
182// static
183void TCPSocketEventDispatcher::DispatchEvent(void* browser_context_id,
184                                             const std::string& extension_id,
185                                             scoped_ptr<Event> event) {
186  DCHECK_CURRENTLY_ON(BrowserThread::UI);
187
188  content::BrowserContext* context =
189      reinterpret_cast<content::BrowserContext*>(browser_context_id);
190  if (!extensions::ExtensionsBrowserClient::Get()->IsValidContext(context))
191    return;
192
193  EventRouter* event_router = EventRouter::Get(context);
194  if (event_router)
195    event_router->DispatchEventToExtension(extension_id, event.Pass());
196}
197
198}  // namespace core_api
199}  // namespace extensions
200