websocket_job.cc revision 7dbb3d5cf0c15f500944d211057644d6a2f37371
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/websockets/websocket_job.h" 6 7#include <algorithm> 8 9#include "base/bind.h" 10#include "base/lazy_instance.h" 11#include "net/base/io_buffer.h" 12#include "net/base/net_errors.h" 13#include "net/base/net_log.h" 14#include "net/cookies/cookie_store.h" 15#include "net/http/http_network_session.h" 16#include "net/http/http_transaction_factory.h" 17#include "net/http/http_util.h" 18#include "net/spdy/spdy_session.h" 19#include "net/spdy/spdy_session_pool.h" 20#include "net/url_request/url_request_context.h" 21#include "net/websockets/websocket_handshake_handler.h" 22#include "net/websockets/websocket_net_log_params.h" 23#include "net/websockets/websocket_throttle.h" 24#include "url/gurl.h" 25 26static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. 27 28namespace { 29 30// lower-case header names. 31const char* const kCookieHeaders[] = { 32 "cookie", "cookie2" 33}; 34const char* const kSetCookieHeaders[] = { 35 "set-cookie", "set-cookie2" 36}; 37 38net::SocketStreamJob* WebSocketJobFactory( 39 const GURL& url, net::SocketStream::Delegate* delegate) { 40 net::WebSocketJob* job = new net::WebSocketJob(delegate); 41 job->InitSocketStream(new net::SocketStream(url, job)); 42 return job; 43} 44 45class WebSocketJobInitSingleton { 46 private: 47 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>; 48 WebSocketJobInitSingleton() { 49 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory); 50 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory); 51 } 52}; 53 54static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init = 55 LAZY_INSTANCE_INITIALIZER; 56 57} // anonymous namespace 58 59namespace net { 60 61bool WebSocketJob::websocket_over_spdy_enabled_ = false; 62 63// static 64void WebSocketJob::EnsureInit() { 65 g_websocket_job_init.Get(); 66} 67 68// static 69void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) { 70 websocket_over_spdy_enabled_ = enabled; 71} 72 73WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) 74 : delegate_(delegate), 75 state_(INITIALIZED), 76 waiting_(false), 77 handshake_request_(new WebSocketHandshakeRequestHandler), 78 handshake_response_(new WebSocketHandshakeResponseHandler), 79 started_to_send_handshake_request_(false), 80 handshake_request_sent_(0), 81 response_cookies_save_index_(0), 82 spdy_protocol_version_(0), 83 save_next_cookie_running_(false), 84 callback_pending_(false), 85 weak_ptr_factory_(this), 86 weak_ptr_factory_for_send_pending_(this) { 87} 88 89WebSocketJob::~WebSocketJob() { 90 DCHECK_EQ(CLOSED, state_); 91 DCHECK(!delegate_); 92 DCHECK(!socket_.get()); 93} 94 95void WebSocketJob::Connect() { 96 DCHECK(socket_.get()); 97 DCHECK_EQ(state_, INITIALIZED); 98 state_ = CONNECTING; 99 socket_->Connect(); 100} 101 102bool WebSocketJob::SendData(const char* data, int len) { 103 switch (state_) { 104 case INITIALIZED: 105 return false; 106 107 case CONNECTING: 108 return SendHandshakeRequest(data, len); 109 110 case OPEN: 111 { 112 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len); 113 memcpy(buffer->data(), data, len); 114 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) { 115 send_buffer_queue_.push_back(buffer); 116 return true; 117 } 118 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len); 119 return SendDataInternal(current_send_buffer_->data(), 120 current_send_buffer_->BytesRemaining()); 121 } 122 123 case CLOSING: 124 case CLOSED: 125 return false; 126 } 127 return false; 128} 129 130void WebSocketJob::Close() { 131 if (state_ == CLOSED) 132 return; 133 134 state_ = CLOSING; 135 if (current_send_buffer_.get()) { 136 // Will close in SendPending. 137 return; 138 } 139 state_ = CLOSED; 140 CloseInternal(); 141} 142 143void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) { 144 state_ = CONNECTING; 145 socket_->RestartWithAuth(credentials); 146} 147 148void WebSocketJob::DetachDelegate() { 149 state_ = CLOSED; 150 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 151 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 152 153 scoped_refptr<WebSocketJob> protect(this); 154 weak_ptr_factory_.InvalidateWeakPtrs(); 155 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs(); 156 157 delegate_ = NULL; 158 if (socket_.get()) 159 socket_->DetachDelegate(); 160 socket_ = NULL; 161 if (!callback_.is_null()) { 162 waiting_ = false; 163 callback_.Reset(); 164 Release(); // Balanced with OnStartOpenConnection(). 165 } 166} 167 168int WebSocketJob::OnStartOpenConnection( 169 SocketStream* socket, const CompletionCallback& callback) { 170 DCHECK(callback_.is_null()); 171 state_ = CONNECTING; 172 addresses_ = socket->address_list(); 173 WebSocketThrottle::GetInstance()->PutInQueue(this); 174 if (delegate_) { 175 int result = delegate_->OnStartOpenConnection(socket, callback); 176 DCHECK_EQ(OK, result); 177 } 178 if (waiting_) { 179 // PutInQueue() may set |waiting_| true for throttling. In this case, 180 // Wakeup() will be called later. 181 callback_ = callback; 182 AddRef(); // Balanced when callback_ is cleared. 183 return ERR_IO_PENDING; 184 } 185 return TrySpdyStream(); 186} 187 188void WebSocketJob::OnConnected( 189 SocketStream* socket, int max_pending_send_allowed) { 190 if (state_ == CLOSED) 191 return; 192 DCHECK_EQ(CONNECTING, state_); 193 if (delegate_) 194 delegate_->OnConnected(socket, max_pending_send_allowed); 195} 196 197void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) { 198 DCHECK_NE(INITIALIZED, state_); 199 DCHECK_GT(amount_sent, 0); 200 if (state_ == CLOSED) 201 return; 202 if (state_ == CONNECTING) { 203 OnSentHandshakeRequest(socket, amount_sent); 204 return; 205 } 206 if (delegate_) { 207 DCHECK(state_ == OPEN || state_ == CLOSING); 208 if (!current_send_buffer_.get()) { 209 VLOG(1) 210 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent; 211 return; 212 } 213 current_send_buffer_->DidConsume(amount_sent); 214 if (current_send_buffer_->BytesRemaining() > 0) 215 return; 216 217 // We need to report amount_sent of original buffer size, instead of 218 // amount sent to |socket|. 219 amount_sent = current_send_buffer_->size(); 220 DCHECK_GT(amount_sent, 0); 221 current_send_buffer_ = NULL; 222 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) { 223 base::MessageLoopForIO::current()->PostTask( 224 FROM_HERE, 225 base::Bind(&WebSocketJob::SendPending, 226 weak_ptr_factory_for_send_pending_.GetWeakPtr())); 227 } 228 delegate_->OnSentData(socket, amount_sent); 229 } 230} 231 232void WebSocketJob::OnReceivedData( 233 SocketStream* socket, const char* data, int len) { 234 DCHECK_NE(INITIALIZED, state_); 235 if (state_ == CLOSED) 236 return; 237 if (state_ == CONNECTING) { 238 OnReceivedHandshakeResponse(socket, data, len); 239 return; 240 } 241 DCHECK(state_ == OPEN || state_ == CLOSING); 242 if (delegate_ && len > 0) 243 delegate_->OnReceivedData(socket, data, len); 244} 245 246void WebSocketJob::OnClose(SocketStream* socket) { 247 state_ = CLOSED; 248 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 249 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 250 251 scoped_refptr<WebSocketJob> protect(this); 252 weak_ptr_factory_.InvalidateWeakPtrs(); 253 254 SocketStream::Delegate* delegate = delegate_; 255 delegate_ = NULL; 256 socket_ = NULL; 257 if (!callback_.is_null()) { 258 waiting_ = false; 259 callback_.Reset(); 260 Release(); // Balanced with OnStartOpenConnection(). 261 } 262 if (delegate) 263 delegate->OnClose(socket); 264} 265 266void WebSocketJob::OnAuthRequired( 267 SocketStream* socket, AuthChallengeInfo* auth_info) { 268 if (delegate_) 269 delegate_->OnAuthRequired(socket, auth_info); 270} 271 272void WebSocketJob::OnSSLCertificateError( 273 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) { 274 if (delegate_) 275 delegate_->OnSSLCertificateError(socket, ssl_info, fatal); 276} 277 278void WebSocketJob::OnError(const SocketStream* socket, int error) { 279 if (delegate_ && error != ERR_PROTOCOL_SWITCHED) 280 delegate_->OnError(socket, error); 281} 282 283void WebSocketJob::OnCreatedSpdyStream(int result) { 284 DCHECK(spdy_websocket_stream_.get()); 285 DCHECK(socket_.get()); 286 DCHECK_NE(ERR_IO_PENDING, result); 287 288 if (state_ == CLOSED) { 289 result = ERR_ABORTED; 290 } else if (result == OK) { 291 state_ = CONNECTING; 292 result = ERR_PROTOCOL_SWITCHED; 293 } else { 294 spdy_websocket_stream_.reset(); 295 } 296 297 CompleteIO(result); 298} 299 300void WebSocketJob::OnSentSpdyHeaders() { 301 DCHECK_NE(INITIALIZED, state_); 302 if (state_ != CONNECTING) 303 return; 304 if (delegate_) 305 delegate_->OnSentData(socket_.get(), handshake_request_->original_length()); 306 handshake_request_.reset(); 307} 308 309void WebSocketJob::OnSpdyResponseHeadersUpdated( 310 const SpdyHeaderBlock& response_headers) { 311 DCHECK_NE(INITIALIZED, state_); 312 if (state_ != CONNECTING) 313 return; 314 // TODO(toyoshim): Fallback to non-spdy connection? 315 handshake_response_->ParseResponseHeaderBlock(response_headers, 316 challenge_, 317 spdy_protocol_version_); 318 319 SaveCookiesAndNotifyHeadersComplete(); 320} 321 322void WebSocketJob::OnSentSpdyData(size_t bytes_sent) { 323 DCHECK_NE(INITIALIZED, state_); 324 DCHECK_NE(CONNECTING, state_); 325 if (state_ == CLOSED) 326 return; 327 if (!spdy_websocket_stream_.get()) 328 return; 329 OnSentData(socket_.get(), static_cast<int>(bytes_sent)); 330} 331 332void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) { 333 DCHECK_NE(INITIALIZED, state_); 334 DCHECK_NE(CONNECTING, state_); 335 if (state_ == CLOSED) 336 return; 337 if (!spdy_websocket_stream_.get()) 338 return; 339 if (buffer) { 340 OnReceivedData( 341 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize()); 342 } else { 343 OnReceivedData(socket_.get(), NULL, 0); 344 } 345} 346 347void WebSocketJob::OnCloseSpdyStream() { 348 spdy_websocket_stream_.reset(); 349 OnClose(socket_.get()); 350} 351 352bool WebSocketJob::SendHandshakeRequest(const char* data, int len) { 353 DCHECK_EQ(state_, CONNECTING); 354 if (started_to_send_handshake_request_) 355 return false; 356 if (!handshake_request_->ParseRequest(data, len)) 357 return false; 358 359 // handshake message is completed. 360 handshake_response_->set_protocol_version( 361 handshake_request_->protocol_version()); 362 AddCookieHeaderAndSend(); 363 return true; 364} 365 366void WebSocketJob::AddCookieHeaderAndSend() { 367 bool allow = true; 368 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies())) 369 allow = false; 370 371 if (socket_.get() && delegate_ && state_ == CONNECTING) { 372 handshake_request_->RemoveHeaders(kCookieHeaders, 373 arraysize(kCookieHeaders)); 374 if (allow && socket_->context()->cookie_store()) { 375 // Add cookies, including HttpOnly cookies. 376 CookieOptions cookie_options; 377 cookie_options.set_include_httponly(); 378 socket_->context()->cookie_store()->GetCookiesWithOptionsAsync( 379 GetURLForCookies(), cookie_options, 380 base::Bind(&WebSocketJob::LoadCookieCallback, 381 weak_ptr_factory_.GetWeakPtr())); 382 } else { 383 DoSendData(); 384 } 385 } 386} 387 388void WebSocketJob::LoadCookieCallback(const std::string& cookie) { 389 if (!cookie.empty()) 390 // TODO(tyoshino): Sending cookie means that connection doesn't need 391 // kPrivacyModeEnabled as cookies may be server-bound and channel id 392 // wouldn't negatively affect privacy anyway. Need to restart connection 393 // or refactor to determine cookie status prior to connecting. 394 handshake_request_->AppendHeaderIfMissing("Cookie", cookie); 395 DoSendData(); 396} 397 398void WebSocketJob::DoSendData() { 399 if (spdy_websocket_stream_.get()) { 400 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); 401 handshake_request_->GetRequestHeaderBlock( 402 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_); 403 spdy_websocket_stream_->SendRequest(headers.Pass()); 404 } else { 405 const std::string& handshake_request = 406 handshake_request_->GetRawRequest(); 407 handshake_request_sent_ = 0; 408 socket_->net_log()->AddEvent( 409 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS, 410 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request)); 411 socket_->SendData(handshake_request.data(), 412 handshake_request.size()); 413 } 414 // Just buffered in |handshake_request_|. 415 started_to_send_handshake_request_ = true; 416} 417 418void WebSocketJob::OnSentHandshakeRequest( 419 SocketStream* socket, int amount_sent) { 420 DCHECK_EQ(state_, CONNECTING); 421 handshake_request_sent_ += amount_sent; 422 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length()); 423 if (handshake_request_sent_ >= handshake_request_->raw_length()) { 424 // handshake request has been sent. 425 // notify original size of handshake request to delegate. 426 if (delegate_) 427 delegate_->OnSentData( 428 socket, 429 handshake_request_->original_length()); 430 handshake_request_.reset(); 431 } 432} 433 434void WebSocketJob::OnReceivedHandshakeResponse( 435 SocketStream* socket, const char* data, int len) { 436 DCHECK_EQ(state_, CONNECTING); 437 if (handshake_response_->HasResponse()) { 438 // If we already has handshake response, received data should be frame 439 // data, not handshake message. 440 received_data_after_handshake_.insert( 441 received_data_after_handshake_.end(), data, data + len); 442 return; 443 } 444 445 size_t response_length = handshake_response_->ParseRawResponse(data, len); 446 if (!handshake_response_->HasResponse()) { 447 // not yet. we need more data. 448 return; 449 } 450 // handshake message is completed. 451 std::string raw_response = handshake_response_->GetRawResponse(); 452 socket_->net_log()->AddEvent( 453 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS, 454 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response)); 455 if (len - response_length > 0) { 456 // If we received extra data, it should be frame data. 457 DCHECK(received_data_after_handshake_.empty()); 458 received_data_after_handshake_.assign(data + response_length, data + len); 459 } 460 SaveCookiesAndNotifyHeadersComplete(); 461} 462 463void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() { 464 // handshake message is completed. 465 DCHECK(handshake_response_->HasResponse()); 466 467 // Extract cookies from the handshake response into a temporary vector. 468 response_cookies_.clear(); 469 response_cookies_save_index_ = 0; 470 471 handshake_response_->GetHeaders( 472 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_); 473 474 // Now, loop over the response cookies, and attempt to persist each. 475 SaveNextCookie(); 476} 477 478void WebSocketJob::NotifyHeadersComplete() { 479 // Remove cookie headers, with malformed headers preserved. 480 // Actual handshake should be done in Blink. 481 handshake_response_->RemoveHeaders( 482 kSetCookieHeaders, arraysize(kSetCookieHeaders)); 483 std::string handshake_response = handshake_response_->GetResponse(); 484 handshake_response_.reset(); 485 std::vector<char> received_data(handshake_response.begin(), 486 handshake_response.end()); 487 received_data.insert(received_data.end(), 488 received_data_after_handshake_.begin(), 489 received_data_after_handshake_.end()); 490 received_data_after_handshake_.clear(); 491 492 state_ = OPEN; 493 494 DCHECK(!received_data.empty()); 495 if (delegate_) 496 delegate_->OnReceivedData( 497 socket_.get(), &received_data.front(), received_data.size()); 498 499 WebSocketThrottle::GetInstance()->RemoveFromQueue(this); 500 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary(); 501} 502 503void WebSocketJob::SaveNextCookie() { 504 if (!socket_.get() || !delegate_ || state_ != CONNECTING) 505 return; 506 507 callback_pending_ = false; 508 save_next_cookie_running_ = true; 509 510 if (socket_->context()->cookie_store()) { 511 GURL url_for_cookies = GetURLForCookies(); 512 513 CookieOptions options; 514 options.set_include_httponly(); 515 516 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since 517 // CookieMonster's asynchronous operation APIs queue the callback to run it 518 // on the thread where the API was called, there won't be race. I.e. unless 519 // the callback is run synchronously, it won't be run in parallel with this 520 // method. 521 while (!callback_pending_ && 522 response_cookies_save_index_ < response_cookies_.size()) { 523 std::string cookie = response_cookies_[response_cookies_save_index_]; 524 response_cookies_save_index_++; 525 526 if (!delegate_->CanSetCookie( 527 socket_.get(), url_for_cookies, cookie, &options)) 528 continue; 529 530 callback_pending_ = true; 531 socket_->context()->cookie_store()->SetCookieWithOptionsAsync( 532 url_for_cookies, cookie, options, 533 base::Bind(&WebSocketJob::OnCookieSaved, 534 weak_ptr_factory_.GetWeakPtr())); 535 } 536 } 537 538 save_next_cookie_running_ = false; 539 540 if (callback_pending_) 541 return; 542 543 response_cookies_.clear(); 544 response_cookies_save_index_ = 0; 545 546 NotifyHeadersComplete(); 547} 548 549void WebSocketJob::OnCookieSaved(bool cookie_status) { 550 // Tell the caller of SetCookieWithOptionsAsync() that this completion 551 // callback is invoked. 552 // - If the caller checks callback_pending earlier than this callback, the 553 // caller exits to let this method continue iteration. 554 // - Otherwise, the caller continues iteration. 555 callback_pending_ = false; 556 557 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited 558 // the loop. Otherwise, return. 559 if (save_next_cookie_running_) 560 return; 561 562 SaveNextCookie(); 563} 564 565GURL WebSocketJob::GetURLForCookies() const { 566 GURL url = socket_->url(); 567 std::string scheme = socket_->is_secure() ? "https" : "http"; 568 url_canon::Replacements<char> replacements; 569 replacements.SetScheme(scheme.c_str(), 570 url_parse::Component(0, scheme.length())); 571 return url.ReplaceComponents(replacements); 572} 573 574const AddressList& WebSocketJob::address_list() const { 575 return addresses_; 576} 577 578int WebSocketJob::TrySpdyStream() { 579 if (!socket_.get()) 580 return ERR_FAILED; 581 582 if (!websocket_over_spdy_enabled_) 583 return OK; 584 585 // Check if we have a SPDY session available. 586 HttpTransactionFactory* factory = 587 socket_->context()->http_transaction_factory(); 588 if (!factory) 589 return OK; 590 scoped_refptr<HttpNetworkSession> session = factory->GetSession(); 591 if (!session.get()) 592 return OK; 593 SpdySessionPool* spdy_pool = session->spdy_session_pool(); 594 PrivacyMode privacy_mode = socket_->privacy_mode(); 595 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()), 596 socket_->proxy_server(), privacy_mode); 597 // Forbid wss downgrade to SPDY without SSL. 598 // TODO(toyoshim): Does it realize the same policy with HTTP? 599 scoped_refptr<SpdySession> spdy_session = 600 spdy_pool->FindAvailableSession(key, *socket_->net_log()); 601 if (!spdy_session) 602 return OK; 603 604 SSLInfo ssl_info; 605 bool was_npn_negotiated; 606 NextProto protocol_negotiated = kProtoUnknown; 607 bool use_ssl = spdy_session->GetSSLInfo( 608 &ssl_info, &was_npn_negotiated, &protocol_negotiated); 609 if (socket_->is_secure() && !use_ssl) 610 return OK; 611 612 // Create SpdyWebSocketStream. 613 spdy_protocol_version_ = spdy_session->GetProtocolVersion(); 614 spdy_websocket_stream_.reset( 615 new SpdyWebSocketStream(spdy_session.get(), this)); 616 617 int result = spdy_websocket_stream_->InitializeStream( 618 socket_->url(), MEDIUM, *socket_->net_log()); 619 if (result == OK) { 620 OnConnected(socket_.get(), kMaxPendingSendAllowed); 621 return ERR_PROTOCOL_SWITCHED; 622 } 623 if (result != ERR_IO_PENDING) { 624 spdy_websocket_stream_.reset(); 625 return OK; 626 } 627 628 return ERR_IO_PENDING; 629} 630 631void WebSocketJob::SetWaiting() { 632 waiting_ = true; 633} 634 635bool WebSocketJob::IsWaiting() const { 636 return waiting_; 637} 638 639void WebSocketJob::Wakeup() { 640 if (!waiting_) 641 return; 642 waiting_ = false; 643 DCHECK(!callback_.is_null()); 644 base::MessageLoopForIO::current()->PostTask( 645 FROM_HERE, 646 base::Bind(&WebSocketJob::RetryPendingIO, 647 weak_ptr_factory_.GetWeakPtr())); 648} 649 650void WebSocketJob::RetryPendingIO() { 651 int result = TrySpdyStream(); 652 653 // In the case of ERR_IO_PENDING, CompleteIO() will be called from 654 // OnCreatedSpdyStream(). 655 if (result != ERR_IO_PENDING) 656 CompleteIO(result); 657} 658 659void WebSocketJob::CompleteIO(int result) { 660 // |callback_| may be null if OnClose() or DetachDelegate() was called. 661 if (!callback_.is_null()) { 662 CompletionCallback callback = callback_; 663 callback_.Reset(); 664 callback.Run(result); 665 Release(); // Balanced with OnStartOpenConnection(). 666 } 667} 668 669bool WebSocketJob::SendDataInternal(const char* data, int length) { 670 if (spdy_websocket_stream_.get()) 671 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length); 672 if (socket_.get()) 673 return socket_->SendData(data, length); 674 return false; 675} 676 677void WebSocketJob::CloseInternal() { 678 if (spdy_websocket_stream_.get()) 679 spdy_websocket_stream_->Close(); 680 if (socket_.get()) 681 socket_->Close(); 682} 683 684void WebSocketJob::SendPending() { 685 if (current_send_buffer_.get()) 686 return; 687 688 // Current buffer has been sent. Try next if any. 689 if (send_buffer_queue_.empty()) { 690 // No more data to send. 691 if (state_ == CLOSING) 692 CloseInternal(); 693 return; 694 } 695 696 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front(); 697 send_buffer_queue_.pop_front(); 698 current_send_buffer_ = 699 new DrainableIOBuffer(next_buffer.get(), next_buffer->size()); 700 SendDataInternal(current_send_buffer_->data(), 701 current_send_buffer_->BytesRemaining()); 702} 703 704} // namespace net 705