// websocket_job.cc revision c407dc5cd9bdc5668497f21b26b09d988ab439de
1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/websockets/websocket_job.h" 6 7#include <algorithm> 8 9#include "base/string_tokenizer.h" 10#include "googleurl/src/gurl.h" 11#include "net/base/net_errors.h" 12#include "net/base/cookie_policy.h" 13#include "net/base/cookie_store.h" 14#include "net/base/io_buffer.h" 15#include "net/http/http_util.h" 16#include "net/url_request/url_request_context.h" 17#include "net/websockets/websocket_frame_handler.h" 18#include "net/websockets/websocket_handshake_handler.h" 19#include "net/websockets/websocket_throttle.h" 20 21namespace { 22 23// lower-case header names. 24const char* const kCookieHeaders[] = { 25 "cookie", "cookie2" 26}; 27const char* const kSetCookieHeaders[] = { 28 "set-cookie", "set-cookie2" 29}; 30 31net::SocketStreamJob* WebSocketJobFactory( 32 const GURL& url, net::SocketStream::Delegate* delegate) { 33 net::WebSocketJob* job = new net::WebSocketJob(delegate); 34 job->InitSocketStream(new net::SocketStream(url, job)); 35 return job; 36} 37 38class WebSocketJobInitSingleton { 39 private: 40 friend struct DefaultSingletonTraits<WebSocketJobInitSingleton>; 41 WebSocketJobInitSingleton() { 42 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); 43 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); 44 } 45}; 46 47} // anonymous namespace 48 49namespace net { 50 51// static 52void WebSocketJob::EnsureInit() { 53 Singleton<WebSocketJobInitSingleton>::get(); 54} 55 56WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) 57 : delegate_(delegate), 58 state_(INITIALIZED), 59 waiting_(false), 60 callback_(NULL), 61 handshake_request_(new WebSocketHandshakeRequestHandler), 62 handshake_response_(new WebSocketHandshakeResponseHandler), 63 handshake_request_sent_(0), 64 response_cookies_save_index_(0), 65 
ALLOW_THIS_IN_INITIALIZER_LIST(can_get_cookies_callback_( 66 this, &WebSocketJob::OnCanGetCookiesCompleted)), 67 ALLOW_THIS_IN_INITIALIZER_LIST(can_set_cookie_callback_( 68 this, &WebSocketJob::OnCanSetCookieCompleted)), 69 send_frame_handler_(new WebSocketFrameHandler), 70 receive_frame_handler_(new WebSocketFrameHandler) { 71} 72 73WebSocketJob::~WebSocketJob() { 74 DCHECK_EQ(CLOSED, state_); 75 DCHECK(!delegate_); 76 DCHECK(!socket_.get()); 77} 78 79void WebSocketJob::Connect() { 80 DCHECK(socket_.get()); 81 DCHECK_EQ(state_, INITIALIZED); 82 state_ = CONNECTING; 83 socket_->Connect(); 84} 85 86bool WebSocketJob::SendData(const char* data, int len) { 87 switch (state_) { 88 case INITIALIZED: 89 return false; 90 91 case CONNECTING: 92 return SendHandshakeRequest(data, len); 93 94 case OPEN: 95 { 96 send_frame_handler_->AppendData(data, len); 97 // If current buffer is sending now, this data will be sent in 98 // SendPending() after current data was sent. 99 // Do not buffer sending data for now. Since 100 // WebCore::SocketStreamHandle controls traffic to keep number of 101 // pending bytes less than max_pending_send_allowed, so when sending 102 // larger message than max_pending_send_allowed should not be buffered. 103 // If we don't call OnSentData, WebCore::SocketStreamHandle would stop 104 // sending more data when pending data reaches max_pending_send_allowed. 105 // TODO(ukai): Fix this to support compression for larger message. 
106 int err = 0; 107 if (!send_frame_handler_->GetCurrentBuffer() && 108 (err = send_frame_handler_->UpdateCurrentBuffer(false)) > 0) { 109 DCHECK(!current_buffer_); 110 current_buffer_ = new DrainableIOBuffer( 111 send_frame_handler_->GetCurrentBuffer(), 112 send_frame_handler_->GetCurrentBufferSize()); 113 return socket_->SendData( 114 current_buffer_->data(), current_buffer_->BytesRemaining()); 115 } 116 return err >= 0; 117 } 118 119 case CLOSING: 120 case CLOSED: 121 return false; 122 } 123 return false; 124} 125 126void WebSocketJob::Close() { 127 state_ = CLOSING; 128 if (current_buffer_) { 129 // Will close in SendPending. 130 return; 131 } 132 state_ = CLOSED; 133 socket_->Close(); 134} 135 136void WebSocketJob::RestartWithAuth( 137 const std::wstring& username, 138 const std::wstring& password) { 139 state_ = CONNECTING; 140 socket_->RestartWithAuth(username, password); 141} 142 143void WebSocketJob::DetachDelegate() { 144 state_ = CLOSED; 145 Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); 146 Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); 147 148 scoped_refptr<WebSocketJob> protect(this); 149 150 delegate_ = NULL; 151 if (socket_) 152 socket_->DetachDelegate(); 153 socket_ = NULL; 154 if (callback_) { 155 waiting_ = false; 156 callback_ = NULL; 157 Release(); // Balanced with OnStartOpenConnection(). 158 } 159} 160 161int WebSocketJob::OnStartOpenConnection( 162 SocketStream* socket, CompletionCallback* callback) { 163 DCHECK(!callback_); 164 state_ = CONNECTING; 165 addresses_.Copy(socket->address_list().head(), true); 166 Singleton<WebSocketThrottle>::get()->PutInQueue(this); 167 if (!waiting_) 168 return OK; 169 callback_ = callback; 170 AddRef(); // Balanced when callback_ becomes NULL. 
171 return ERR_IO_PENDING; 172} 173 174void WebSocketJob::OnConnected( 175 SocketStream* socket, int max_pending_send_allowed) { 176 if (state_ == CLOSED) 177 return; 178 DCHECK_EQ(CONNECTING, state_); 179 if (delegate_) 180 delegate_->OnConnected(socket, max_pending_send_allowed); 181} 182 183void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { 184 DCHECK_NE(INITIALIZED, state_); 185 if (state_ == CLOSED) 186 return; 187 if (state_ == CONNECTING) { 188 OnSentHandshakeRequest(socket, amount_sent); 189 return; 190 } 191 if (delegate_) { 192 DCHECK(state_ == OPEN || state_ == CLOSING); 193 DCHECK_GT(amount_sent, 0); 194 DCHECK(current_buffer_); 195 current_buffer_->DidConsume(amount_sent); 196 if (current_buffer_->BytesRemaining() > 0) 197 return; 198 199 // We need to report amount_sent of original buffer size, instead of 200 // amount sent to |socket|. 201 amount_sent = send_frame_handler_->GetOriginalBufferSize(); 202 DCHECK_GT(amount_sent, 0); 203 current_buffer_ = NULL; 204 send_frame_handler_->ReleaseCurrentBuffer(); 205 delegate_->OnSentData(socket, amount_sent); 206 MessageLoopForIO::current()->PostTask( 207 FROM_HERE, NewRunnableMethod(this, &WebSocketJob::SendPending)); 208 } 209} 210 211void WebSocketJob::OnReceivedData( 212 SocketStream* socket, const char* data, int len) { 213 DCHECK_NE(INITIALIZED, state_); 214 if (state_ == CLOSED) 215 return; 216 if (state_ == CONNECTING) { 217 OnReceivedHandshakeResponse(socket, data, len); 218 return; 219 } 220 DCHECK(state_ == OPEN || state_ == CLOSING); 221 std::string received_data; 222 receive_frame_handler_->AppendData(data, len); 223 // Don't buffer receiving data for now. 224 // TODO(ukai): fix performance of WebSocketFrameHandler. 
225 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { 226 received_data += 227 std::string(receive_frame_handler_->GetCurrentBuffer()->data(), 228 receive_frame_handler_->GetCurrentBufferSize()); 229 receive_frame_handler_->ReleaseCurrentBuffer(); 230 } 231 if (delegate_ && received_data.size() > 0) 232 delegate_->OnReceivedData( 233 socket, received_data.data(), received_data.size()); 234} 235 236void WebSocketJob::OnClose(SocketStream* socket) { 237 state_ = CLOSED; 238 Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); 239 Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); 240 241 scoped_refptr<WebSocketJob> protect(this); 242 243 SocketStream::Delegate* delegate = delegate_; 244 delegate_ = NULL; 245 socket_ = NULL; 246 if (callback_) { 247 waiting_ = false; 248 callback_ = NULL; 249 Release(); // Balanced with OnStartOpenConnection(). 250 } 251 if (delegate) 252 delegate->OnClose(socket); 253} 254 255void WebSocketJob::OnAuthRequired( 256 SocketStream* socket, AuthChallengeInfo* auth_info) { 257 if (delegate_) 258 delegate_->OnAuthRequired(socket, auth_info); 259} 260 261void WebSocketJob::OnError(const SocketStream* socket, int error) { 262 if (delegate_) 263 delegate_->OnError(socket, error); 264} 265 266bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 267 DCHECK_EQ(state_, CONNECTING); 268 if (!handshake_request_->ParseRequest(data, len)) 269 return false; 270 271 // handshake message is completed. 272 AddCookieHeaderAndSend(); 273 // Just buffered in |handshake_request_|. 
274 return true; 275} 276 277void WebSocketJob::AddCookieHeaderAndSend() { 278 AddRef(); // Balanced in OnCanGetCookiesCompleted 279 280 int policy = OK; 281 if (socket_->context()->cookie_policy()) { 282 GURL url_for_cookies = GetURLForCookies(); 283 policy = socket_->context()->cookie_policy()->CanGetCookies( 284 url_for_cookies, 285 url_for_cookies, 286 &can_get_cookies_callback_); 287 if (policy == ERR_IO_PENDING) 288 return; // Wait for completion callback 289 } 290 OnCanGetCookiesCompleted(policy); 291} 292 293void WebSocketJob::OnCanGetCookiesCompleted(int policy) { 294 if (socket_ && delegate_ && state_ == CONNECTING) { 295 handshake_request_->RemoveHeaders( 296 kCookieHeaders, arraysize(kCookieHeaders)); 297 if (policy == OK) { 298 // Add cookies, including HttpOnly cookies. 299 if (socket_->context()->cookie_store()) { 300 CookieOptions cookie_options; 301 cookie_options.set_include_httponly(); 302 std::string cookie = 303 socket_->context()->cookie_store()->GetCookiesWithOptions( 304 GetURLForCookies(), cookie_options); 305 if (!cookie.empty()) 306 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 307 } 308 } 309 310 const std::string& handshake_request = handshake_request_->GetRawRequest(); 311 handshake_request_sent_ = 0; 312 socket_->SendData(handshake_request.data(), 313 handshake_request.size()); 314 } 315 Release(); // Balance AddRef taken in AddCookieHeaderAndSend 316} 317 318void WebSocketJob::OnSentHandshakeRequest( 319 SocketStream* socket, int amount_sent) { 320 DCHECK_EQ(state_, CONNECTING); 321 handshake_request_sent_ += amount_sent; 322 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 323 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 324 // handshake request has been sent. 325 // notify original size of handshake request to delegate. 
326 if (delegate_) 327 delegate_->OnSentData( 328 socket, 329 handshake_request_->original_length()); 330 handshake_request_.reset(); 331 } 332} 333 334void WebSocketJob::OnReceivedHandshakeResponse( 335 SocketStream* socket, const char* data, int len) { 336 DCHECK_EQ(state_, CONNECTING); 337 if (handshake_response_->HasResponse()) { 338 // If we already has handshake response, received data should be frame 339 // data, not handshake message. 340 receive_frame_handler_->AppendData(data, len); 341 return; 342 } 343 344 size_t response_length = handshake_response_->ParseRawResponse(data, len); 345 if (!handshake_response_->HasResponse()) { 346 // not yet. we need more data. 347 return; 348 } 349 // handshake message is completed. 350 if (len - response_length > 0) { 351 // If we received extra data, it should be frame data. 352 receive_frame_handler_->AppendData(data + response_length, 353 len - response_length); 354 } 355 SaveCookiesAndNotifyHeaderComplete(); 356} 357 358void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() { 359 // handshake message is completed. 360 DCHECK(handshake_response_->HasResponse()); 361 362 response_cookies_.clear(); 363 response_cookies_save_index_ = 0; 364 365 handshake_response_->GetHeaders( 366 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); 367 368 // Now, loop over the response cookies, and attempt to persist each. 369 SaveNextCookie(); 370} 371 372void WebSocketJob::SaveNextCookie() { 373 if (response_cookies_save_index_ == response_cookies_.size()) { 374 response_cookies_.clear(); 375 response_cookies_save_index_ = 0; 376 377 // Remove cookie headers, with malformed headers preserved. 378 // Actual handshake should be done in WebKit. 379 handshake_response_->RemoveHeaders( 380 kSetCookieHeaders, arraysize(kSetCookieHeaders)); 381 std::string received_data = handshake_response_->GetResponse(); 382 // Don't buffer receiving data for now. 383 // TODO(ukai): fix performance of WebSocketFrameHandler. 
384 while (receive_frame_handler_->UpdateCurrentBuffer(false) > 0) { 385 received_data += 386 std::string(receive_frame_handler_->GetCurrentBuffer()->data(), 387 receive_frame_handler_->GetCurrentBufferSize()); 388 receive_frame_handler_->ReleaseCurrentBuffer(); 389 } 390 391 state_ = OPEN; 392 if (delegate_) 393 delegate_->OnReceivedData( 394 socket_, received_data.data(), received_data.size()); 395 396 handshake_response_.reset(); 397 398 Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); 399 Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); 400 return; 401 } 402 403 AddRef(); // Balanced in OnCanSetCookieCompleted 404 405 int policy = OK; 406 if (socket_->context()->cookie_policy()) { 407 GURL url_for_cookies = GetURLForCookies(); 408 policy = socket_->context()->cookie_policy()->CanSetCookie( 409 url_for_cookies, 410 url_for_cookies, 411 response_cookies_[response_cookies_save_index_], 412 &can_set_cookie_callback_); 413 if (policy == ERR_IO_PENDING) 414 return; // Wait for completion callback 415 } 416 417 OnCanSetCookieCompleted(policy); 418} 419 420void WebSocketJob::OnCanSetCookieCompleted(int policy) { 421 if (socket_ && delegate_ && state_ == CONNECTING) { 422 if ((policy == OK || policy == OK_FOR_SESSION_ONLY) && 423 socket_->context()->cookie_store()) { 424 CookieOptions options; 425 options.set_include_httponly(); 426 if (policy == OK_FOR_SESSION_ONLY) 427 options.set_force_session(); 428 GURL url_for_cookies = GetURLForCookies(); 429 socket_->context()->cookie_store()->SetCookieWithOptions( 430 url_for_cookies, response_cookies_[response_cookies_save_index_], 431 options); 432 } 433 response_cookies_save_index_++; 434 SaveNextCookie(); 435 } 436 Release(); // Balance AddRef taken in SaveNextCookie 437} 438 439GURL WebSocketJob::GetURLForCookies() const { 440 GURL url = socket_->url(); 441 std::string scheme = socket_->is_secure() ? 
"https" : "http"; 442 url_canon::Replacements<char> replacements; 443 replacements.SetScheme(scheme.c_str(), 444 url_parse::Component(0, scheme.length())); 445 return url.ReplaceComponents(replacements); 446} 447 448const AddressList& WebSocketJob::address_list() const { 449 return addresses_; 450} 451 452void WebSocketJob::SetWaiting() { 453 waiting_ = true; 454} 455 456bool WebSocketJob::IsWaiting() const { 457 return waiting_; 458} 459 460void WebSocketJob::Wakeup() { 461 if (!waiting_) 462 return; 463 waiting_ = false; 464 DCHECK(callback_); 465 MessageLoopForIO::current()->PostTask( 466 FROM_HERE, 467 NewRunnableMethod(this, 468 &WebSocketJob::DoCallback)); 469} 470 471void WebSocketJob::DoCallback() { 472 // |callback_| may be NULL if OnClose() or DetachDelegate() was called. 473 if (callback_) { 474 net::CompletionCallback* callback = callback_; 475 callback_ = NULL; 476 callback->Run(net::OK); 477 Release(); // Balanced with OnStartOpenConnection(). 478 } 479} 480 481void WebSocketJob::SendPending() { 482 if (current_buffer_) 483 return; 484 // Current buffer is done. Try next buffer if any. 485 // Don't buffer sending data. See comment on case OPEN in SendData(). 486 if (send_frame_handler_->UpdateCurrentBuffer(false) <= 0) { 487 // No more data to send. 488 if (state_ == CLOSING) 489 socket_->Close(); 490 return; 491 } 492 current_buffer_ = new DrainableIOBuffer( 493 send_frame_handler_->GetCurrentBuffer(), 494 send_frame_handler_->GetCurrentBufferSize()); 495 socket_->SendData(current_buffer_->data(), current_buffer_->BytesRemaining()); 496} 497 498} // namespace net 499