websocket_job.cc revision 7dbb3d5cf0c15f500944d211057644d6a2f37371
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_job.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/lazy_instance.h"
11#include "net/base/io_buffer.h"
12#include "net/base/net_errors.h"
13#include "net/base/net_log.h"
14#include "net/cookies/cookie_store.h"
15#include "net/http/http_network_session.h"
16#include "net/http/http_transaction_factory.h"
17#include "net/http/http_util.h"
18#include "net/spdy/spdy_session.h"
19#include "net/spdy/spdy_session_pool.h"
20#include "net/url_request/url_request_context.h"
21#include "net/websockets/websocket_handshake_handler.h"
22#include "net/websockets/websocket_net_log_params.h"
23#include "net/websockets/websocket_throttle.h"
24#include "url/gurl.h"
25
// Pending-send limit (32 KiB) reported through OnConnected() when the
// connection is carried over a SPDY stream.
static const int kMaxPendingSendAllowed = 32 * 1024;
27
28namespace {
29
// Cookie-related header names, spelled in lower case. The first list is
// removed from the handshake request; the second is extracted from (and then
// removed from) the handshake response.
const char* const kCookieHeaders[] = { "cookie", "cookie2" };
const char* const kSetCookieHeaders[] = { "set-cookie", "set-cookie2" };
37
38net::SocketStreamJob* WebSocketJobFactory(
39    const GURL& url, net::SocketStream::Delegate* delegate) {
40  net::WebSocketJob* job = new net::WebSocketJob(delegate);
41  job->InitSocketStream(new net::SocketStream(url, job));
42  return job;
43}
44
45class WebSocketJobInitSingleton {
46 private:
47  friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
48  WebSocketJobInitSingleton() {
49    net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
50    net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
51  }
52};
53
54static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
55    LAZY_INSTANCE_INITIALIZER;
56
57}  // anonymous namespace
58
59namespace net {
60
61bool WebSocketJob::websocket_over_spdy_enabled_ = false;
62
63// static
64void WebSocketJob::EnsureInit() {
65  g_websocket_job_init.Get();
66}
67
68// static
69void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
70  websocket_over_spdy_enabled_ = enabled;
71}
72
73WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
74    : delegate_(delegate),
75      state_(INITIALIZED),
76      waiting_(false),
77      handshake_request_(new WebSocketHandshakeRequestHandler),
78      handshake_response_(new WebSocketHandshakeResponseHandler),
79      started_to_send_handshake_request_(false),
80      handshake_request_sent_(0),
81      response_cookies_save_index_(0),
82      spdy_protocol_version_(0),
83      save_next_cookie_running_(false),
84      callback_pending_(false),
85      weak_ptr_factory_(this),
86      weak_ptr_factory_for_send_pending_(this) {
87}
88
89WebSocketJob::~WebSocketJob() {
90  DCHECK_EQ(CLOSED, state_);
91  DCHECK(!delegate_);
92  DCHECK(!socket_.get());
93}
94
95void WebSocketJob::Connect() {
96  DCHECK(socket_.get());
97  DCHECK_EQ(state_, INITIALIZED);
98  state_ = CONNECTING;
99  socket_->Connect();
100}
101
102bool WebSocketJob::SendData(const char* data, int len) {
103  switch (state_) {
104    case INITIALIZED:
105      return false;
106
107    case CONNECTING:
108      return SendHandshakeRequest(data, len);
109
110    case OPEN:
111      {
112        scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
113        memcpy(buffer->data(), data, len);
114        if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
115          send_buffer_queue_.push_back(buffer);
116          return true;
117        }
118        current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
119        return SendDataInternal(current_send_buffer_->data(),
120                                current_send_buffer_->BytesRemaining());
121      }
122
123    case CLOSING:
124    case CLOSED:
125      return false;
126  }
127  return false;
128}
129
130void WebSocketJob::Close() {
131  if (state_ == CLOSED)
132    return;
133
134  state_ = CLOSING;
135  if (current_send_buffer_.get()) {
136    // Will close in SendPending.
137    return;
138  }
139  state_ = CLOSED;
140  CloseInternal();
141}
142
143void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
144  state_ = CONNECTING;
145  socket_->RestartWithAuth(credentials);
146}
147
148void WebSocketJob::DetachDelegate() {
149  state_ = CLOSED;
150  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
151  WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
152
153  scoped_refptr<WebSocketJob> protect(this);
154  weak_ptr_factory_.InvalidateWeakPtrs();
155  weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
156
157  delegate_ = NULL;
158  if (socket_.get())
159    socket_->DetachDelegate();
160  socket_ = NULL;
161  if (!callback_.is_null()) {
162    waiting_ = false;
163    callback_.Reset();
164    Release();  // Balanced with OnStartOpenConnection().
165  }
166}
167
168int WebSocketJob::OnStartOpenConnection(
169    SocketStream* socket, const CompletionCallback& callback) {
170  DCHECK(callback_.is_null());
171  state_ = CONNECTING;
172  addresses_ = socket->address_list();
173  WebSocketThrottle::GetInstance()->PutInQueue(this);
174  if (delegate_) {
175    int result = delegate_->OnStartOpenConnection(socket, callback);
176    DCHECK_EQ(OK, result);
177  }
178  if (waiting_) {
179    // PutInQueue() may set |waiting_| true for throttling. In this case,
180    // Wakeup() will be called later.
181    callback_ = callback;
182    AddRef();  // Balanced when callback_ is cleared.
183    return ERR_IO_PENDING;
184  }
185  return TrySpdyStream();
186}
187
188void WebSocketJob::OnConnected(
189    SocketStream* socket, int max_pending_send_allowed) {
190  if (state_ == CLOSED)
191    return;
192  DCHECK_EQ(CONNECTING, state_);
193  if (delegate_)
194    delegate_->OnConnected(socket, max_pending_send_allowed);
195}
196
197void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
198  DCHECK_NE(INITIALIZED, state_);
199  DCHECK_GT(amount_sent, 0);
200  if (state_ == CLOSED)
201    return;
202  if (state_ == CONNECTING) {
203    OnSentHandshakeRequest(socket, amount_sent);
204    return;
205  }
206  if (delegate_) {
207    DCHECK(state_ == OPEN || state_ == CLOSING);
208    if (!current_send_buffer_.get()) {
209      VLOG(1)
210          << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
211      return;
212    }
213    current_send_buffer_->DidConsume(amount_sent);
214    if (current_send_buffer_->BytesRemaining() > 0)
215      return;
216
217    // We need to report amount_sent of original buffer size, instead of
218    // amount sent to |socket|.
219    amount_sent = current_send_buffer_->size();
220    DCHECK_GT(amount_sent, 0);
221    current_send_buffer_ = NULL;
222    if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
223      base::MessageLoopForIO::current()->PostTask(
224          FROM_HERE,
225          base::Bind(&WebSocketJob::SendPending,
226                     weak_ptr_factory_for_send_pending_.GetWeakPtr()));
227    }
228    delegate_->OnSentData(socket, amount_sent);
229  }
230}
231
232void WebSocketJob::OnReceivedData(
233    SocketStream* socket, const char* data, int len) {
234  DCHECK_NE(INITIALIZED, state_);
235  if (state_ == CLOSED)
236    return;
237  if (state_ == CONNECTING) {
238    OnReceivedHandshakeResponse(socket, data, len);
239    return;
240  }
241  DCHECK(state_ == OPEN || state_ == CLOSING);
242  if (delegate_ && len > 0)
243    delegate_->OnReceivedData(socket, data, len);
244}
245
246void WebSocketJob::OnClose(SocketStream* socket) {
247  state_ = CLOSED;
248  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
249  WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
250
251  scoped_refptr<WebSocketJob> protect(this);
252  weak_ptr_factory_.InvalidateWeakPtrs();
253
254  SocketStream::Delegate* delegate = delegate_;
255  delegate_ = NULL;
256  socket_ = NULL;
257  if (!callback_.is_null()) {
258    waiting_ = false;
259    callback_.Reset();
260    Release();  // Balanced with OnStartOpenConnection().
261  }
262  if (delegate)
263    delegate->OnClose(socket);
264}
265
266void WebSocketJob::OnAuthRequired(
267    SocketStream* socket, AuthChallengeInfo* auth_info) {
268  if (delegate_)
269    delegate_->OnAuthRequired(socket, auth_info);
270}
271
272void WebSocketJob::OnSSLCertificateError(
273    SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
274  if (delegate_)
275    delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
276}
277
278void WebSocketJob::OnError(const SocketStream* socket, int error) {
279  if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
280    delegate_->OnError(socket, error);
281}
282
283void WebSocketJob::OnCreatedSpdyStream(int result) {
284  DCHECK(spdy_websocket_stream_.get());
285  DCHECK(socket_.get());
286  DCHECK_NE(ERR_IO_PENDING, result);
287
288  if (state_ == CLOSED) {
289    result = ERR_ABORTED;
290  } else if (result == OK) {
291    state_ = CONNECTING;
292    result = ERR_PROTOCOL_SWITCHED;
293  } else {
294    spdy_websocket_stream_.reset();
295  }
296
297  CompleteIO(result);
298}
299
300void WebSocketJob::OnSentSpdyHeaders() {
301  DCHECK_NE(INITIALIZED, state_);
302  if (state_ != CONNECTING)
303    return;
304  if (delegate_)
305    delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
306  handshake_request_.reset();
307}
308
309void WebSocketJob::OnSpdyResponseHeadersUpdated(
310    const SpdyHeaderBlock& response_headers) {
311  DCHECK_NE(INITIALIZED, state_);
312  if (state_ != CONNECTING)
313    return;
314  // TODO(toyoshim): Fallback to non-spdy connection?
315  handshake_response_->ParseResponseHeaderBlock(response_headers,
316                                                challenge_,
317                                                spdy_protocol_version_);
318
319  SaveCookiesAndNotifyHeadersComplete();
320}
321
322void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
323  DCHECK_NE(INITIALIZED, state_);
324  DCHECK_NE(CONNECTING, state_);
325  if (state_ == CLOSED)
326    return;
327  if (!spdy_websocket_stream_.get())
328    return;
329  OnSentData(socket_.get(), static_cast<int>(bytes_sent));
330}
331
332void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
333  DCHECK_NE(INITIALIZED, state_);
334  DCHECK_NE(CONNECTING, state_);
335  if (state_ == CLOSED)
336    return;
337  if (!spdy_websocket_stream_.get())
338    return;
339  if (buffer) {
340    OnReceivedData(
341        socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
342  } else {
343    OnReceivedData(socket_.get(), NULL, 0);
344  }
345}
346
347void WebSocketJob::OnCloseSpdyStream() {
348  spdy_websocket_stream_.reset();
349  OnClose(socket_.get());
350}
351
352bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
353  DCHECK_EQ(state_, CONNECTING);
354  if (started_to_send_handshake_request_)
355    return false;
356  if (!handshake_request_->ParseRequest(data, len))
357    return false;
358
359  // handshake message is completed.
360  handshake_response_->set_protocol_version(
361      handshake_request_->protocol_version());
362  AddCookieHeaderAndSend();
363  return true;
364}
365
366void WebSocketJob::AddCookieHeaderAndSend() {
367  bool allow = true;
368  if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
369    allow = false;
370
371  if (socket_.get() && delegate_ && state_ == CONNECTING) {
372    handshake_request_->RemoveHeaders(kCookieHeaders,
373                                      arraysize(kCookieHeaders));
374    if (allow && socket_->context()->cookie_store()) {
375      // Add cookies, including HttpOnly cookies.
376      CookieOptions cookie_options;
377      cookie_options.set_include_httponly();
378      socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
379          GetURLForCookies(), cookie_options,
380          base::Bind(&WebSocketJob::LoadCookieCallback,
381                     weak_ptr_factory_.GetWeakPtr()));
382    } else {
383      DoSendData();
384    }
385  }
386}
387
388void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
389  if (!cookie.empty())
390    // TODO(tyoshino): Sending cookie means that connection doesn't need
391    // kPrivacyModeEnabled as cookies may be server-bound and channel id
392    // wouldn't negatively affect privacy anyway. Need to restart connection
393    // or refactor to determine cookie status prior to connecting.
394    handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
395  DoSendData();
396}
397
398void WebSocketJob::DoSendData() {
399  if (spdy_websocket_stream_.get()) {
400    scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
401    handshake_request_->GetRequestHeaderBlock(
402        socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
403    spdy_websocket_stream_->SendRequest(headers.Pass());
404  } else {
405    const std::string& handshake_request =
406        handshake_request_->GetRawRequest();
407    handshake_request_sent_ = 0;
408    socket_->net_log()->AddEvent(
409        NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
410        base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
411    socket_->SendData(handshake_request.data(),
412                      handshake_request.size());
413  }
414  // Just buffered in |handshake_request_|.
415  started_to_send_handshake_request_ = true;
416}
417
418void WebSocketJob::OnSentHandshakeRequest(
419    SocketStream* socket, int amount_sent) {
420  DCHECK_EQ(state_, CONNECTING);
421  handshake_request_sent_ += amount_sent;
422  DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
423  if (handshake_request_sent_ >= handshake_request_->raw_length()) {
424    // handshake request has been sent.
425    // notify original size of handshake request to delegate.
426    if (delegate_)
427      delegate_->OnSentData(
428          socket,
429          handshake_request_->original_length());
430    handshake_request_.reset();
431  }
432}
433
434void WebSocketJob::OnReceivedHandshakeResponse(
435    SocketStream* socket, const char* data, int len) {
436  DCHECK_EQ(state_, CONNECTING);
437  if (handshake_response_->HasResponse()) {
438    // If we already has handshake response, received data should be frame
439    // data, not handshake message.
440    received_data_after_handshake_.insert(
441        received_data_after_handshake_.end(), data, data + len);
442    return;
443  }
444
445  size_t response_length = handshake_response_->ParseRawResponse(data, len);
446  if (!handshake_response_->HasResponse()) {
447    // not yet. we need more data.
448    return;
449  }
450  // handshake message is completed.
451  std::string raw_response = handshake_response_->GetRawResponse();
452  socket_->net_log()->AddEvent(
453      NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
454      base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
455  if (len - response_length > 0) {
456    // If we received extra data, it should be frame data.
457    DCHECK(received_data_after_handshake_.empty());
458    received_data_after_handshake_.assign(data + response_length, data + len);
459  }
460  SaveCookiesAndNotifyHeadersComplete();
461}
462
463void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
464  // handshake message is completed.
465  DCHECK(handshake_response_->HasResponse());
466
467  // Extract cookies from the handshake response into a temporary vector.
468  response_cookies_.clear();
469  response_cookies_save_index_ = 0;
470
471  handshake_response_->GetHeaders(
472      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
473
474  // Now, loop over the response cookies, and attempt to persist each.
475  SaveNextCookie();
476}
477
478void WebSocketJob::NotifyHeadersComplete() {
479  // Remove cookie headers, with malformed headers preserved.
480  // Actual handshake should be done in Blink.
481  handshake_response_->RemoveHeaders(
482      kSetCookieHeaders, arraysize(kSetCookieHeaders));
483  std::string handshake_response = handshake_response_->GetResponse();
484  handshake_response_.reset();
485  std::vector<char> received_data(handshake_response.begin(),
486                                  handshake_response.end());
487  received_data.insert(received_data.end(),
488                       received_data_after_handshake_.begin(),
489                       received_data_after_handshake_.end());
490  received_data_after_handshake_.clear();
491
492  state_ = OPEN;
493
494  DCHECK(!received_data.empty());
495  if (delegate_)
496    delegate_->OnReceivedData(
497        socket_.get(), &received_data.front(), received_data.size());
498
499  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
500  WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
501}
502
503void WebSocketJob::SaveNextCookie() {
504  if (!socket_.get() || !delegate_ || state_ != CONNECTING)
505    return;
506
507  callback_pending_ = false;
508  save_next_cookie_running_ = true;
509
510  if (socket_->context()->cookie_store()) {
511    GURL url_for_cookies = GetURLForCookies();
512
513    CookieOptions options;
514    options.set_include_httponly();
515
516    // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
517    // CookieMonster's asynchronous operation APIs queue the callback to run it
518    // on the thread where the API was called, there won't be race. I.e. unless
519    // the callback is run synchronously, it won't be run in parallel with this
520    // method.
521    while (!callback_pending_ &&
522           response_cookies_save_index_ < response_cookies_.size()) {
523      std::string cookie = response_cookies_[response_cookies_save_index_];
524      response_cookies_save_index_++;
525
526      if (!delegate_->CanSetCookie(
527              socket_.get(), url_for_cookies, cookie, &options))
528        continue;
529
530      callback_pending_ = true;
531      socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
532          url_for_cookies, cookie, options,
533          base::Bind(&WebSocketJob::OnCookieSaved,
534                     weak_ptr_factory_.GetWeakPtr()));
535    }
536  }
537
538  save_next_cookie_running_ = false;
539
540  if (callback_pending_)
541    return;
542
543  response_cookies_.clear();
544  response_cookies_save_index_ = 0;
545
546  NotifyHeadersComplete();
547}
548
549void WebSocketJob::OnCookieSaved(bool cookie_status) {
550  // Tell the caller of SetCookieWithOptionsAsync() that this completion
551  // callback is invoked.
552  // - If the caller checks callback_pending earlier than this callback, the
553  //   caller exits to let this method continue iteration.
554  // - Otherwise, the caller continues iteration.
555  callback_pending_ = false;
556
557  // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
558  // the loop. Otherwise, return.
559  if (save_next_cookie_running_)
560    return;
561
562  SaveNextCookie();
563}
564
565GURL WebSocketJob::GetURLForCookies() const {
566  GURL url = socket_->url();
567  std::string scheme = socket_->is_secure() ? "https" : "http";
568  url_canon::Replacements<char> replacements;
569  replacements.SetScheme(scheme.c_str(),
570                         url_parse::Component(0, scheme.length()));
571  return url.ReplaceComponents(replacements);
572}
573
574const AddressList& WebSocketJob::address_list() const {
575  return addresses_;
576}
577
578int WebSocketJob::TrySpdyStream() {
579  if (!socket_.get())
580    return ERR_FAILED;
581
582  if (!websocket_over_spdy_enabled_)
583    return OK;
584
585  // Check if we have a SPDY session available.
586  HttpTransactionFactory* factory =
587      socket_->context()->http_transaction_factory();
588  if (!factory)
589    return OK;
590  scoped_refptr<HttpNetworkSession> session = factory->GetSession();
591  if (!session.get())
592    return OK;
593  SpdySessionPool* spdy_pool = session->spdy_session_pool();
594  PrivacyMode privacy_mode = socket_->privacy_mode();
595  const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
596                           socket_->proxy_server(), privacy_mode);
597  // Forbid wss downgrade to SPDY without SSL.
598  // TODO(toyoshim): Does it realize the same policy with HTTP?
599  scoped_refptr<SpdySession> spdy_session =
600      spdy_pool->FindAvailableSession(key, *socket_->net_log());
601  if (!spdy_session)
602    return OK;
603
604  SSLInfo ssl_info;
605  bool was_npn_negotiated;
606  NextProto protocol_negotiated = kProtoUnknown;
607  bool use_ssl = spdy_session->GetSSLInfo(
608      &ssl_info, &was_npn_negotiated, &protocol_negotiated);
609  if (socket_->is_secure() && !use_ssl)
610    return OK;
611
612  // Create SpdyWebSocketStream.
613  spdy_protocol_version_ = spdy_session->GetProtocolVersion();
614  spdy_websocket_stream_.reset(
615      new SpdyWebSocketStream(spdy_session.get(), this));
616
617  int result = spdy_websocket_stream_->InitializeStream(
618      socket_->url(), MEDIUM, *socket_->net_log());
619  if (result == OK) {
620    OnConnected(socket_.get(), kMaxPendingSendAllowed);
621    return ERR_PROTOCOL_SWITCHED;
622  }
623  if (result != ERR_IO_PENDING) {
624    spdy_websocket_stream_.reset();
625    return OK;
626  }
627
628  return ERR_IO_PENDING;
629}
630
631void WebSocketJob::SetWaiting() {
632  waiting_ = true;
633}
634
635bool WebSocketJob::IsWaiting() const {
636  return waiting_;
637}
638
639void WebSocketJob::Wakeup() {
640  if (!waiting_)
641    return;
642  waiting_ = false;
643  DCHECK(!callback_.is_null());
644  base::MessageLoopForIO::current()->PostTask(
645      FROM_HERE,
646      base::Bind(&WebSocketJob::RetryPendingIO,
647                 weak_ptr_factory_.GetWeakPtr()));
648}
649
650void WebSocketJob::RetryPendingIO() {
651  int result = TrySpdyStream();
652
653  // In the case of ERR_IO_PENDING, CompleteIO() will be called from
654  // OnCreatedSpdyStream().
655  if (result != ERR_IO_PENDING)
656    CompleteIO(result);
657}
658
659void WebSocketJob::CompleteIO(int result) {
660  // |callback_| may be null if OnClose() or DetachDelegate() was called.
661  if (!callback_.is_null()) {
662    CompletionCallback callback = callback_;
663    callback_.Reset();
664    callback.Run(result);
665    Release();  // Balanced with OnStartOpenConnection().
666  }
667}
668
669bool WebSocketJob::SendDataInternal(const char* data, int length) {
670  if (spdy_websocket_stream_.get())
671    return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
672  if (socket_.get())
673    return socket_->SendData(data, length);
674  return false;
675}
676
677void WebSocketJob::CloseInternal() {
678  if (spdy_websocket_stream_.get())
679    spdy_websocket_stream_->Close();
680  if (socket_.get())
681    socket_->Close();
682}
683
684void WebSocketJob::SendPending() {
685  if (current_send_buffer_.get())
686    return;
687
688  // Current buffer has been sent. Try next if any.
689  if (send_buffer_queue_.empty()) {
690    // No more data to send.
691    if (state_ == CLOSING)
692      CloseInternal();
693    return;
694  }
695
696  scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
697  send_buffer_queue_.pop_front();
698  current_send_buffer_ =
699      new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
700  SendDataInternal(current_send_buffer_->data(),
701                   current_send_buffer_->BytesRemaining());
702}
703
704}  // namespace net
705