websocket_job.cc revision c407dc5cd9bdc5668497f21b26b09d988ab439de
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_job.h"
6
7#include <algorithm>
8
9#include "base/string_tokenizer.h"
10#include "googleurl/src/gurl.h"
11#include "net/base/net_errors.h"
12#include "net/base/cookie_policy.h"
13#include "net/base/cookie_store.h"
14#include "net/base/io_buffer.h"
15#include "net/http/http_util.h"
16#include "net/url_request/url_request_context.h"
17#include "net/websockets/websocket_frame_handler.h"
18#include "net/websockets/websocket_handshake_handler.h"
19#include "net/websockets/websocket_throttle.h"
20
21namespace {
22
// Request header names that carry cookies.  Spelled in lower case.
const char* const kCookieHeaders[] = {
  "cookie",
  "cookie2",
};

// Response header names that set cookies.  Spelled in lower case.
const char* const kSetCookieHeaders[] = {
  "set-cookie",
  "set-cookie2",
};
30
31net::SocketStreamJob* WebSocketJobFactory(
32    const GURL& url, net::SocketStream::Delegate* delegate) {
33  net::WebSocketJob* job = new net::WebSocketJob(delegate);
34  job->InitSocketStream(new net::SocketStream(url, job));
35  return job;
36}
37
38class WebSocketJobInitSingleton {
39 private:
40  friend struct DefaultSingletonTraits<WebSocketJobInitSingleton>;
41  WebSocketJobInitSingleton() {
42    net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
43    net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
44  }
45};
46
47}  // anonymous namespace
48
49namespace net {
50
51// static
52void WebSocketJob::EnsureInit() {
53  Singleton<WebSocketJobInitSingleton>::get();
54}
55
56WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
57    : delegate_(delegate),
58      state_(INITIALIZED),
59      waiting_(false),
60      callback_(NULL),
61      handshake_request_(new WebSocketHandshakeRequestHandler),
62      handshake_response_(new WebSocketHandshakeResponseHandler),
63      handshake_request_sent_(0),
64      response_cookies_save_index_(0),
65      ALLOW_THIS_IN_INITIALIZER_LIST(can_get_cookies_callback_(
66          this, &WebSocketJob::OnCanGetCookiesCompleted)),
67      ALLOW_THIS_IN_INITIALIZER_LIST(can_set_cookie_callback_(
68          this, &WebSocketJob::OnCanSetCookieCompleted)),
69      send_frame_handler_(new WebSocketFrameHandler),
70      receive_frame_handler_(new WebSocketFrameHandler) {
71}
72
73WebSocketJob::~WebSocketJob() {
74  DCHECK_EQ(CLOSED, state_);
75  DCHECK(!delegate_);
76  DCHECK(!socket_.get());
77}
78
79void WebSocketJob::Connect() {
80  DCHECK(socket_.get());
81  DCHECK_EQ(state_, INITIALIZED);
82  state_ = CONNECTING;
83  socket_->Connect();
84}
85
86bool WebSocketJob::SendData(const char* data, int len) {
87  switch (state_) {
88    case INITIALIZED:
89      return false;
90
91    case CONNECTING:
92      return SendHandshakeRequest(data, len);
93
94    case OPEN:
95      {
96        send_frame_handler_->AppendData(data, len);
97        // If current buffer is sending now, this data will be sent in
98        // SendPending() after current data was sent.
99        // Do not buffer sending data for now.  Since
100        // WebCore::SocketStreamHandle controls traffic to keep number of
101        // pending bytes less than max_pending_send_allowed, so when sending
102        // larger message than max_pending_send_allowed should not be buffered.
103        // If we don't call OnSentData, WebCore::SocketStreamHandle would stop
104        // sending more data when pending data reaches max_pending_send_allowed.
105        // TODO(ukai): Fix this to support compression for larger message.
106        int err = 0;
107        if (!send_frame_handler_->GetCurrentBuffer() &&
108            (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) {
109          DCHECK(!current_buffer_);
110          current_buffer_ = new DrainableIOBuffer(
111              send_frame_handler_->GetCurrentBuffer(),
112              send_frame_handler_->GetCurrentBufferSize());
113          return socket_->SendData(
114              current_buffer_->data(), current_buffer_->BytesRemaining());
115        }
116        return err >= 0;
117      }
118
119    case CLOSING:
120    case CLOSED:
121      return false;
122  }
123  return false;
124}
125
126void WebSocketJob::Close() {
127  state_ = CLOSING;
128  if (current_buffer_) {
129    // Will close in SendPending.
130    return;
131  }
132  state_ = CLOSED;
133  socket_->Close();
134}
135
136void WebSocketJob::RestartWithAuth(
137    const std::wstring& username,
138    const std::wstring& password) {
139  state_ = CONNECTING;
140  socket_->RestartWithAuth(username, password);
141}
142
143void WebSocketJob::DetachDelegate() {
144  state_ = CLOSED;
145  Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
146  Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
147
148  scoped_refptr<WebSocketJob> protect(this);
149
150  delegate_ = NULL;
151  if (socket_)
152    socket_->DetachDelegate();
153  socket_ = NULL;
154  if (callback_) {
155    waiting_ = false;
156    callback_ = NULL;
157    Release();  // Balanced with OnStartOpenConnection().
158  }
159}
160
161int WebSocketJob::OnStartOpenConnection(
162    SocketStream* socket, CompletionCallback* callback) {
163  DCHECK(!callback_);
164  state_ = CONNECTING;
165  addresses_.Copy(socket->address_list().head(), true);
166  Singleton<WebSocketThrottle>::get()->PutInQueue(this);
167  if (!waiting_)
168    return OK;
169  callback_ = callback;
170  AddRef();  // Balanced when callback_ becomes NULL.
171  return ERR_IO_PENDING;
172}
173
174void WebSocketJob::OnConnected(
175    SocketStream* socket, int max_pending_send_allowed) {
176  if (state_ == CLOSED)
177    return;
178  DCHECK_EQ(CONNECTING, state_);
179  if (delegate_)
180    delegate_->OnConnected(socket, max_pending_send_allowed);
181}
182
183void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
184  DCHECK_NE(INITIALIZED, state_);
185  if (state_ == CLOSED)
186    return;
187  if (state_ == CONNECTING) {
188    OnSentHandshakeRequest(socket, amount_sent);
189    return;
190  }
191  if (delegate_) {
192    DCHECK(state_ == OPEN || state_ == CLOSING);
193    DCHECK_GT(amount_sent, 0);
194    DCHECK(current_buffer_);
195    current_buffer_->DidConsume(amount_sent);
196    if (current_buffer_->BytesRemaining() > 0)
197      return;
198
199    // We need to report amount_sent of original buffer size, instead of
200    // amount sent to |socket|.
201    amount_sent = send_frame_handler_->GetOriginalBufferSize();
202    DCHECK_GT(amount_sent, 0);
203    current_buffer_ = NULL;
204    send_frame_handler_->ReleaseCurrentBuffer();
205    delegate_->OnSentData(socket, amount_sent);
206    MessageLoopForIO::current()->PostTask(
207        FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending));
208  }
209}
210
211void WebSocketJob::OnReceivedData(
212    SocketStream* socket, const char* data, int len) {
213  DCHECK_NE(INITIALIZED, state_);
214  if (state_ == CLOSED)
215    return;
216  if (state_ == CONNECTING) {
217    OnReceivedHandshakeResponse(socket, data, len);
218    return;
219  }
220  DCHECK(state_ == OPEN || state_ == CLOSING);
221  std::string received_data;
222  receive_frame_handler_->AppendData(data, len);
223  // Don't buffer receiving data for now.
224  // TODO(ukai): fix performance of WebSocketFrameHandler.
225  while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
226    received_data +=
227        std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
228                    receive_frame_handler_->GetCurrentBufferSize());
229    receive_frame_handler_->ReleaseCurrentBuffer();
230  }
231  if (delegate_ && received_data.size() > 0)
232      delegate_->OnReceivedData(
233          socket, received_data.data(), received_data.size());
234}
235
236void WebSocketJob::OnClose(SocketStream* socket) {
237  state_ = CLOSED;
238  Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
239  Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
240
241  scoped_refptr<WebSocketJob> protect(this);
242
243  SocketStream::Delegate* delegate = delegate_;
244  delegate_ = NULL;
245  socket_ = NULL;
246  if (callback_) {
247    waiting_ = false;
248    callback_ = NULL;
249    Release();  // Balanced with OnStartOpenConnection().
250  }
251  if (delegate)
252    delegate->OnClose(socket);
253}
254
255void WebSocketJob::OnAuthRequired(
256    SocketStream* socket, AuthChallengeInfo* auth_info) {
257  if (delegate_)
258    delegate_->OnAuthRequired(socket, auth_info);
259}
260
261void WebSocketJob::OnError(const SocketStream* socket, int error) {
262  if (delegate_)
263    delegate_->OnError(socket, error);
264}
265
266bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
267  DCHECK_EQ(state_, CONNECTING);
268  if (!handshake_request_->ParseRequest(data, len))
269    return false;
270
271  // handshake message is completed.
272  AddCookieHeaderAndSend();
273  // Just buffered in |handshake_request_|.
274  return true;
275}
276
277void WebSocketJob::AddCookieHeaderAndSend() {
278  AddRef();  // Balanced in OnCanGetCookiesCompleted
279
280  int policy = OK;
281  if (socket_->context()->cookie_policy()) {
282    GURL url_for_cookies = GetURLForCookies();
283    policy = socket_->context()->cookie_policy()->CanGetCookies(
284        url_for_cookies,
285        url_for_cookies,
286        &can_get_cookies_callback_);
287    if (policy == ERR_IO_PENDING)
288      return;  // Wait for completion callback
289  }
290  OnCanGetCookiesCompleted(policy);
291}
292
293void WebSocketJob::OnCanGetCookiesCompleted(int policy) {
294  if (socket_ && delegate_ && state_ == CONNECTING) {
295    handshake_request_->RemoveHeaders(
296        kCookieHeaders, arraysize(kCookieHeaders));
297    if (policy == OK) {
298      // Add cookies, including HttpOnly cookies.
299      if (socket_->context()->cookie_store()) {
300        CookieOptions cookie_options;
301        cookie_options.set_include_httponly();
302        std::string cookie =
303            socket_->context()->cookie_store()->GetCookiesWithOptions(
304                GetURLForCookies(), cookie_options);
305        if (!cookie.empty())
306          handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
307      }
308    }
309
310    const std::string& handshake_request = handshake_request_->GetRawRequest();
311    handshake_request_sent_ = 0;
312    socket_->SendData(handshake_request.data(),
313                      handshake_request.size());
314  }
315  Release();  // Balance AddRef taken in AddCookieHeaderAndSend
316}
317
318void WebSocketJob::OnSentHandshakeRequest(
319    SocketStream* socket, int amount_sent) {
320  DCHECK_EQ(state_, CONNECTING);
321  handshake_request_sent_ += amount_sent;
322  DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
323  if (handshake_request_sent_ >= handshake_request_->raw_length()) {
324    // handshake request has been sent.
325    // notify original size of handshake request to delegate.
326    if (delegate_)
327      delegate_->OnSentData(
328          socket,
329          handshake_request_->original_length());
330    handshake_request_.reset();
331  }
332}
333
334void WebSocketJob::OnReceivedHandshakeResponse(
335    SocketStream* socket, const char* data, int len) {
336  DCHECK_EQ(state_, CONNECTING);
337  if (handshake_response_->HasResponse()) {
338    // If we already has handshake response, received data should be frame
339    // data, not handshake message.
340    receive_frame_handler_->AppendData(data, len);
341    return;
342  }
343
344  size_t response_length = handshake_response_->ParseRawResponse(data, len);
345  if (!handshake_response_->HasResponse()) {
346    // not yet. we need more data.
347    return;
348  }
349  // handshake message is completed.
350  if (len - response_length > 0) {
351    // If we received extra data, it should be frame data.
352    receive_frame_handler_->AppendData(data + response_length,
353                                       len - response_length);
354  }
355  SaveCookiesAndNotifyHeaderComplete();
356}
357
358void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
359  // handshake message is completed.
360  DCHECK(handshake_response_->HasResponse());
361
362  response_cookies_.clear();
363  response_cookies_save_index_ = 0;
364
365  handshake_response_->GetHeaders(
366      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
367
368  // Now, loop over the response cookies, and attempt to persist each.
369  SaveNextCookie();
370}
371
372void WebSocketJob::SaveNextCookie() {
373  if (response_cookies_save_index_ == response_cookies_.size()) {
374    response_cookies_.clear();
375    response_cookies_save_index_ = 0;
376
377    // Remove cookie headers, with malformed headers preserved.
378    // Actual handshake should be done in WebKit.
379    handshake_response_->RemoveHeaders(
380        kSetCookieHeaders, arraysize(kSetCookieHeaders));
381    std::string received_data = handshake_response_->GetResponse();
382    // Don't buffer receiving data for now.
383    // TODO(ukai): fix performance of WebSocketFrameHandler.
384    while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) {
385      received_data +=
386          std::string(receive_frame_handler_->GetCurrentBuffer()->data(),
387                      receive_frame_handler_->GetCurrentBufferSize());
388      receive_frame_handler_->ReleaseCurrentBuffer();
389    }
390
391    state_ = OPEN;
392    if (delegate_)
393      delegate_->OnReceivedData(
394          socket_, received_data.data(), received_data.size());
395
396    handshake_response_.reset();
397
398    Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
399    Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
400    return;
401  }
402
403  AddRef();  // Balanced in OnCanSetCookieCompleted
404
405  int policy = OK;
406  if (socket_->context()->cookie_policy()) {
407    GURL url_for_cookies = GetURLForCookies();
408    policy = socket_->context()->cookie_policy()->CanSetCookie(
409        url_for_cookies,
410        url_for_cookies,
411        response_cookies_[response_cookies_save_index_],
412        &can_set_cookie_callback_);
413    if (policy == ERR_IO_PENDING)
414      return;  // Wait for completion callback
415  }
416
417  OnCanSetCookieCompleted(policy);
418}
419
420void WebSocketJob::OnCanSetCookieCompleted(int policy) {
421  if (socket_ && delegate_ && state_ == CONNECTING) {
422    if ((policy == OK || policy == OK_FOR_SESSION_ONLY) &&
423        socket_->context()->cookie_store()) {
424      CookieOptions options;
425      options.set_include_httponly();
426      if (policy == OK_FOR_SESSION_ONLY)
427        options.set_force_session();
428      GURL url_for_cookies = GetURLForCookies();
429      socket_->context()->cookie_store()->SetCookieWithOptions(
430          url_for_cookies, response_cookies_[response_cookies_save_index_],
431          options);
432    }
433    response_cookies_save_index_++;
434    SaveNextCookie();
435  }
436  Release();  // Balance AddRef taken in SaveNextCookie
437}
438
439GURL WebSocketJob::GetURLForCookies() const {
440  GURL url = socket_->url();
441  std::string scheme = socket_->is_secure() ? "https" : "http";
442  url_canon::Replacements<char> replacements;
443  replacements.SetScheme(scheme.c_str(),
444                         url_parse::Component(0, scheme.length()));
445  return url.ReplaceComponents(replacements);
446}
447
448const AddressList& WebSocketJob::address_list() const {
449  return addresses_;
450}
451
452void WebSocketJob::SetWaiting() {
453  waiting_ = true;
454}
455
456bool WebSocketJob::IsWaiting() const {
457  return waiting_;
458}
459
460void WebSocketJob::Wakeup() {
461  if (!waiting_)
462    return;
463  waiting_ = false;
464  DCHECK(callback_);
465  MessageLoopForIO::current()->PostTask(
466      FROM_HERE,
467      NewRunnableMethod(this,
468                        &WebSocketJob::DoCallback));
469}
470
471void WebSocketJob::DoCallback() {
472  // |callback_| may be NULL if OnClose() or DetachDelegate() was called.
473  if (callback_) {
474    net::CompletionCallback* callback = callback_;
475    callback_ = NULL;
476    callback->Run(net::OK);
477    Release();  // Balanced with OnStartOpenConnection().
478  }
479}
480
481void WebSocketJob::SendPending() {
482  if (current_buffer_)
483    return;
484  // Current buffer is done.  Try next buffer if any.
485  // Don't buffer sending data. See comment on case OPEN in SendData().
486  if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) {
487    // No more data to send.
488    if (state_ == CLOSING)
489      socket_->Close();
490    return;
491  }
492  current_buffer_ = new DrainableIOBuffer(
493      send_frame_handler_->GetCurrentBuffer(),
494      send_frame_handler_->GetCurrentBufferSize());
495  socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining());
496}
497
498}  // namespace net
499