1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_job.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/lazy_instance.h"
11#include "net/base/io_buffer.h"
12#include "net/base/net_errors.h"
13#include "net/base/net_log.h"
14#include "net/cookies/cookie_store.h"
15#include "net/http/http_network_session.h"
16#include "net/http/http_transaction_factory.h"
17#include "net/http/http_util.h"
18#include "net/spdy/spdy_session.h"
19#include "net/spdy/spdy_session_pool.h"
20#include "net/url_request/url_request_context.h"
21#include "net/websockets/websocket_handshake_handler.h"
22#include "net/websockets/websocket_net_log_params.h"
23#include "net/websockets/websocket_throttle.h"
24#include "url/gurl.h"
25
26static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
27
28namespace {
29
30// lower-case header names.
31const char* const kCookieHeaders[] = {
32  "cookie", "cookie2"
33};
34const char* const kSetCookieHeaders[] = {
35  "set-cookie", "set-cookie2"
36};
37
38net::SocketStreamJob* WebSocketJobFactory(
39    const GURL& url, net::SocketStream::Delegate* delegate) {
40  net::WebSocketJob* job = new net::WebSocketJob(delegate);
41  job->InitSocketStream(new net::SocketStream(url, job));
42  return job;
43}
44
45class WebSocketJobInitSingleton {
46 private:
47  friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
48  WebSocketJobInitSingleton() {
49    net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
50    net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
51  }
52};
53
54static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
55    LAZY_INSTANCE_INITIALIZER;
56
57}  // anonymous namespace
58
59namespace net {
60
61bool WebSocketJob::websocket_over_spdy_enabled_ = false;
62
63// static
64void WebSocketJob::EnsureInit() {
65  g_websocket_job_init.Get();
66}
67
68// static
69void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
70  websocket_over_spdy_enabled_ = enabled;
71}
72
73WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
74    : delegate_(delegate),
75      state_(INITIALIZED),
76      waiting_(false),
77      handshake_request_(new WebSocketHandshakeRequestHandler),
78      handshake_response_(new WebSocketHandshakeResponseHandler),
79      started_to_send_handshake_request_(false),
80      handshake_request_sent_(0),
81      response_cookies_save_index_(0),
82      spdy_protocol_version_(0),
83      save_next_cookie_running_(false),
84      callback_pending_(false),
85      weak_ptr_factory_(this),
86      weak_ptr_factory_for_send_pending_(this) {
87}
88
89WebSocketJob::~WebSocketJob() {
90  DCHECK_EQ(CLOSED, state_);
91  DCHECK(!delegate_);
92  DCHECK(!socket_.get());
93}
94
95void WebSocketJob::Connect() {
96  DCHECK(socket_.get());
97  DCHECK_EQ(state_, INITIALIZED);
98  state_ = CONNECTING;
99  socket_->Connect();
100}
101
102bool WebSocketJob::SendData(const char* data, int len) {
103  switch (state_) {
104    case INITIALIZED:
105      return false;
106
107    case CONNECTING:
108      return SendHandshakeRequest(data, len);
109
110    case OPEN:
111      {
112        scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
113        memcpy(buffer->data(), data, len);
114        if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
115          send_buffer_queue_.push_back(buffer);
116          return true;
117        }
118        current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
119        return SendDataInternal(current_send_buffer_->data(),
120                                current_send_buffer_->BytesRemaining());
121      }
122
123    case CLOSING:
124    case CLOSED:
125      return false;
126  }
127  return false;
128}
129
130void WebSocketJob::Close() {
131  if (state_ == CLOSED)
132    return;
133
134  state_ = CLOSING;
135  if (current_send_buffer_.get()) {
136    // Will close in SendPending.
137    return;
138  }
139  state_ = CLOSED;
140  CloseInternal();
141}
142
143void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
144  state_ = CONNECTING;
145  socket_->RestartWithAuth(credentials);
146}
147
148void WebSocketJob::DetachDelegate() {
149  state_ = CLOSED;
150  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
151
152  scoped_refptr<WebSocketJob> protect(this);
153  weak_ptr_factory_.InvalidateWeakPtrs();
154  weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
155
156  delegate_ = NULL;
157  if (socket_.get())
158    socket_->DetachDelegate();
159  socket_ = NULL;
160  if (!callback_.is_null()) {
161    waiting_ = false;
162    callback_.Reset();
163    Release();  // Balanced with OnStartOpenConnection().
164  }
165}
166
167int WebSocketJob::OnStartOpenConnection(
168    SocketStream* socket, const CompletionCallback& callback) {
169  DCHECK(callback_.is_null());
170  state_ = CONNECTING;
171
172  addresses_ = socket->address_list();
173  if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
174    return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
175  }
176
177  if (delegate_) {
178    int result = delegate_->OnStartOpenConnection(socket, callback);
179    DCHECK_EQ(OK, result);
180  }
181  if (waiting_) {
182    // PutInQueue() may set |waiting_| true for throttling. In this case,
183    // Wakeup() will be called later.
184    callback_ = callback;
185    AddRef();  // Balanced when callback_ is cleared.
186    return ERR_IO_PENDING;
187  }
188  return TrySpdyStream();
189}
190
191void WebSocketJob::OnConnected(
192    SocketStream* socket, int max_pending_send_allowed) {
193  if (state_ == CLOSED)
194    return;
195  DCHECK_EQ(CONNECTING, state_);
196  if (delegate_)
197    delegate_->OnConnected(socket, max_pending_send_allowed);
198}
199
200void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
201  DCHECK_NE(INITIALIZED, state_);
202  DCHECK_GT(amount_sent, 0);
203  if (state_ == CLOSED)
204    return;
205  if (state_ == CONNECTING) {
206    OnSentHandshakeRequest(socket, amount_sent);
207    return;
208  }
209  if (delegate_) {
210    DCHECK(state_ == OPEN || state_ == CLOSING);
211    if (!current_send_buffer_.get()) {
212      VLOG(1)
213          << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
214      return;
215    }
216    current_send_buffer_->DidConsume(amount_sent);
217    if (current_send_buffer_->BytesRemaining() > 0)
218      return;
219
220    // We need to report amount_sent of original buffer size, instead of
221    // amount sent to |socket|.
222    amount_sent = current_send_buffer_->size();
223    DCHECK_GT(amount_sent, 0);
224    current_send_buffer_ = NULL;
225    if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
226      base::MessageLoopForIO::current()->PostTask(
227          FROM_HERE,
228          base::Bind(&WebSocketJob::SendPending,
229                     weak_ptr_factory_for_send_pending_.GetWeakPtr()));
230    }
231    delegate_->OnSentData(socket, amount_sent);
232  }
233}
234
235void WebSocketJob::OnReceivedData(
236    SocketStream* socket, const char* data, int len) {
237  DCHECK_NE(INITIALIZED, state_);
238  if (state_ == CLOSED)
239    return;
240  if (state_ == CONNECTING) {
241    OnReceivedHandshakeResponse(socket, data, len);
242    return;
243  }
244  DCHECK(state_ == OPEN || state_ == CLOSING);
245  if (delegate_ && len > 0)
246    delegate_->OnReceivedData(socket, data, len);
247}
248
249void WebSocketJob::OnClose(SocketStream* socket) {
250  state_ = CLOSED;
251  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
252
253  scoped_refptr<WebSocketJob> protect(this);
254  weak_ptr_factory_.InvalidateWeakPtrs();
255
256  SocketStream::Delegate* delegate = delegate_;
257  delegate_ = NULL;
258  socket_ = NULL;
259  if (!callback_.is_null()) {
260    waiting_ = false;
261    callback_.Reset();
262    Release();  // Balanced with OnStartOpenConnection().
263  }
264  if (delegate)
265    delegate->OnClose(socket);
266}
267
268void WebSocketJob::OnAuthRequired(
269    SocketStream* socket, AuthChallengeInfo* auth_info) {
270  if (delegate_)
271    delegate_->OnAuthRequired(socket, auth_info);
272}
273
274void WebSocketJob::OnSSLCertificateError(
275    SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
276  if (delegate_)
277    delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
278}
279
280void WebSocketJob::OnError(const SocketStream* socket, int error) {
281  if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
282    delegate_->OnError(socket, error);
283}
284
285void WebSocketJob::OnCreatedSpdyStream(int result) {
286  DCHECK(spdy_websocket_stream_.get());
287  DCHECK(socket_.get());
288  DCHECK_NE(ERR_IO_PENDING, result);
289
290  if (state_ == CLOSED) {
291    result = ERR_ABORTED;
292  } else if (result == OK) {
293    state_ = CONNECTING;
294    result = ERR_PROTOCOL_SWITCHED;
295  } else {
296    spdy_websocket_stream_.reset();
297  }
298
299  CompleteIO(result);
300}
301
302void WebSocketJob::OnSentSpdyHeaders() {
303  DCHECK_NE(INITIALIZED, state_);
304  if (state_ != CONNECTING)
305    return;
306  if (delegate_)
307    delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
308  handshake_request_.reset();
309}
310
311void WebSocketJob::OnSpdyResponseHeadersUpdated(
312    const SpdyHeaderBlock& response_headers) {
313  DCHECK_NE(INITIALIZED, state_);
314  if (state_ != CONNECTING)
315    return;
316  // TODO(toyoshim): Fallback to non-spdy connection?
317  handshake_response_->ParseResponseHeaderBlock(response_headers,
318                                                challenge_,
319                                                spdy_protocol_version_);
320
321  SaveCookiesAndNotifyHeadersComplete();
322}
323
324void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
325  DCHECK_NE(INITIALIZED, state_);
326  DCHECK_NE(CONNECTING, state_);
327  if (state_ == CLOSED)
328    return;
329  if (!spdy_websocket_stream_.get())
330    return;
331  OnSentData(socket_.get(), static_cast<int>(bytes_sent));
332}
333
334void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
335  DCHECK_NE(INITIALIZED, state_);
336  DCHECK_NE(CONNECTING, state_);
337  if (state_ == CLOSED)
338    return;
339  if (!spdy_websocket_stream_.get())
340    return;
341  if (buffer) {
342    OnReceivedData(
343        socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
344  } else {
345    OnReceivedData(socket_.get(), NULL, 0);
346  }
347}
348
349void WebSocketJob::OnCloseSpdyStream() {
350  spdy_websocket_stream_.reset();
351  OnClose(socket_.get());
352}
353
354bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
355  DCHECK_EQ(state_, CONNECTING);
356  if (started_to_send_handshake_request_)
357    return false;
358  if (!handshake_request_->ParseRequest(data, len))
359    return false;
360
361  AddCookieHeaderAndSend();
362  return true;
363}
364
365void WebSocketJob::AddCookieHeaderAndSend() {
366  bool allow = true;
367  if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
368    allow = false;
369
370  if (socket_.get() && delegate_ && state_ == CONNECTING) {
371    handshake_request_->RemoveHeaders(kCookieHeaders,
372                                      arraysize(kCookieHeaders));
373    if (allow && socket_->context()->cookie_store()) {
374      // Add cookies, including HttpOnly cookies.
375      CookieOptions cookie_options;
376      cookie_options.set_include_httponly();
377      socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
378          GetURLForCookies(), cookie_options,
379          base::Bind(&WebSocketJob::LoadCookieCallback,
380                     weak_ptr_factory_.GetWeakPtr()));
381    } else {
382      DoSendData();
383    }
384  }
385}
386
387void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
388  if (!cookie.empty())
389    // TODO(tyoshino): Sending cookie means that connection doesn't need
390    // kPrivacyModeEnabled as cookies may be server-bound and channel id
391    // wouldn't negatively affect privacy anyway. Need to restart connection
392    // or refactor to determine cookie status prior to connecting.
393    handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
394  DoSendData();
395}
396
397void WebSocketJob::DoSendData() {
398  if (spdy_websocket_stream_.get()) {
399    scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
400    handshake_request_->GetRequestHeaderBlock(
401        socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
402    spdy_websocket_stream_->SendRequest(headers.Pass());
403  } else {
404    const std::string& handshake_request =
405        handshake_request_->GetRawRequest();
406    handshake_request_sent_ = 0;
407    socket_->net_log()->AddEvent(
408        NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
409        base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
410    socket_->SendData(handshake_request.data(),
411                      handshake_request.size());
412  }
413  // Just buffered in |handshake_request_|.
414  started_to_send_handshake_request_ = true;
415}
416
417void WebSocketJob::OnSentHandshakeRequest(
418    SocketStream* socket, int amount_sent) {
419  DCHECK_EQ(state_, CONNECTING);
420  handshake_request_sent_ += amount_sent;
421  DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
422  if (handshake_request_sent_ >= handshake_request_->raw_length()) {
423    // handshake request has been sent.
424    // notify original size of handshake request to delegate.
425    if (delegate_)
426      delegate_->OnSentData(
427          socket,
428          handshake_request_->original_length());
429    handshake_request_.reset();
430  }
431}
432
433void WebSocketJob::OnReceivedHandshakeResponse(
434    SocketStream* socket, const char* data, int len) {
435  DCHECK_EQ(state_, CONNECTING);
436  if (handshake_response_->HasResponse()) {
437    // If we already has handshake response, received data should be frame
438    // data, not handshake message.
439    received_data_after_handshake_.insert(
440        received_data_after_handshake_.end(), data, data + len);
441    return;
442  }
443
444  size_t response_length = handshake_response_->ParseRawResponse(data, len);
445  if (!handshake_response_->HasResponse()) {
446    // not yet. we need more data.
447    return;
448  }
449  // handshake message is completed.
450  std::string raw_response = handshake_response_->GetRawResponse();
451  socket_->net_log()->AddEvent(
452      NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
453      base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
454  if (len - response_length > 0) {
455    // If we received extra data, it should be frame data.
456    DCHECK(received_data_after_handshake_.empty());
457    received_data_after_handshake_.assign(data + response_length, data + len);
458  }
459  SaveCookiesAndNotifyHeadersComplete();
460}
461
462void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
463  // handshake message is completed.
464  DCHECK(handshake_response_->HasResponse());
465
466  // Extract cookies from the handshake response into a temporary vector.
467  response_cookies_.clear();
468  response_cookies_save_index_ = 0;
469
470  handshake_response_->GetHeaders(
471      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
472
473  // Now, loop over the response cookies, and attempt to persist each.
474  SaveNextCookie();
475}
476
477void WebSocketJob::NotifyHeadersComplete() {
478  // Remove cookie headers, with malformed headers preserved.
479  // Actual handshake should be done in Blink.
480  handshake_response_->RemoveHeaders(
481      kSetCookieHeaders, arraysize(kSetCookieHeaders));
482  std::string handshake_response = handshake_response_->GetResponse();
483  handshake_response_.reset();
484  std::vector<char> received_data(handshake_response.begin(),
485                                  handshake_response.end());
486  received_data.insert(received_data.end(),
487                       received_data_after_handshake_.begin(),
488                       received_data_after_handshake_.end());
489  received_data_after_handshake_.clear();
490
491  state_ = OPEN;
492
493  DCHECK(!received_data.empty());
494  if (delegate_)
495    delegate_->OnReceivedData(
496        socket_.get(), &received_data.front(), received_data.size());
497
498  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
499}
500
501void WebSocketJob::SaveNextCookie() {
502  if (!socket_.get() || !delegate_ || state_ != CONNECTING)
503    return;
504
505  callback_pending_ = false;
506  save_next_cookie_running_ = true;
507
508  if (socket_->context()->cookie_store()) {
509    GURL url_for_cookies = GetURLForCookies();
510
511    CookieOptions options;
512    options.set_include_httponly();
513
514    // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
515    // CookieMonster's asynchronous operation APIs queue the callback to run it
516    // on the thread where the API was called, there won't be race. I.e. unless
517    // the callback is run synchronously, it won't be run in parallel with this
518    // method.
519    while (!callback_pending_ &&
520           response_cookies_save_index_ < response_cookies_.size()) {
521      std::string cookie = response_cookies_[response_cookies_save_index_];
522      response_cookies_save_index_++;
523
524      if (!delegate_->CanSetCookie(
525              socket_.get(), url_for_cookies, cookie, &options))
526        continue;
527
528      callback_pending_ = true;
529      socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
530          url_for_cookies, cookie, options,
531          base::Bind(&WebSocketJob::OnCookieSaved,
532                     weak_ptr_factory_.GetWeakPtr()));
533    }
534  }
535
536  save_next_cookie_running_ = false;
537
538  if (callback_pending_)
539    return;
540
541  response_cookies_.clear();
542  response_cookies_save_index_ = 0;
543
544  NotifyHeadersComplete();
545}
546
547void WebSocketJob::OnCookieSaved(bool cookie_status) {
548  // Tell the caller of SetCookieWithOptionsAsync() that this completion
549  // callback is invoked.
550  // - If the caller checks callback_pending earlier than this callback, the
551  //   caller exits to let this method continue iteration.
552  // - Otherwise, the caller continues iteration.
553  callback_pending_ = false;
554
555  // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
556  // the loop. Otherwise, return.
557  if (save_next_cookie_running_)
558    return;
559
560  SaveNextCookie();
561}
562
563GURL WebSocketJob::GetURLForCookies() const {
564  GURL url = socket_->url();
565  std::string scheme = socket_->is_secure() ? "https" : "http";
566  url_canon::Replacements<char> replacements;
567  replacements.SetScheme(scheme.c_str(),
568                         url_parse::Component(0, scheme.length()));
569  return url.ReplaceComponents(replacements);
570}
571
572const AddressList& WebSocketJob::address_list() const {
573  return addresses_;
574}
575
576int WebSocketJob::TrySpdyStream() {
577  if (!socket_.get())
578    return ERR_FAILED;
579
580  if (!websocket_over_spdy_enabled_)
581    return OK;
582
583  // Check if we have a SPDY session available.
584  HttpTransactionFactory* factory =
585      socket_->context()->http_transaction_factory();
586  if (!factory)
587    return OK;
588  scoped_refptr<HttpNetworkSession> session = factory->GetSession();
589  if (!session.get())
590    return OK;
591  SpdySessionPool* spdy_pool = session->spdy_session_pool();
592  PrivacyMode privacy_mode = socket_->privacy_mode();
593  const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
594                           socket_->proxy_server(), privacy_mode);
595  // Forbid wss downgrade to SPDY without SSL.
596  // TODO(toyoshim): Does it realize the same policy with HTTP?
597  base::WeakPtr<SpdySession> spdy_session =
598      spdy_pool->FindAvailableSession(key, *socket_->net_log());
599  if (!spdy_session)
600    return OK;
601
602  SSLInfo ssl_info;
603  bool was_npn_negotiated;
604  NextProto protocol_negotiated = kProtoUnknown;
605  bool use_ssl = spdy_session->GetSSLInfo(
606      &ssl_info, &was_npn_negotiated, &protocol_negotiated);
607  if (socket_->is_secure() && !use_ssl)
608    return OK;
609
610  // Create SpdyWebSocketStream.
611  spdy_protocol_version_ = spdy_session->GetProtocolVersion();
612  spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
613
614  int result = spdy_websocket_stream_->InitializeStream(
615      socket_->url(), MEDIUM, *socket_->net_log());
616  if (result == OK) {
617    OnConnected(socket_.get(), kMaxPendingSendAllowed);
618    return ERR_PROTOCOL_SWITCHED;
619  }
620  if (result != ERR_IO_PENDING) {
621    spdy_websocket_stream_.reset();
622    return OK;
623  }
624
625  return ERR_IO_PENDING;
626}
627
628void WebSocketJob::SetWaiting() {
629  waiting_ = true;
630}
631
632bool WebSocketJob::IsWaiting() const {
633  return waiting_;
634}
635
636void WebSocketJob::Wakeup() {
637  if (!waiting_)
638    return;
639  waiting_ = false;
640  DCHECK(!callback_.is_null());
641  base::MessageLoopForIO::current()->PostTask(
642      FROM_HERE,
643      base::Bind(&WebSocketJob::RetryPendingIO,
644                 weak_ptr_factory_.GetWeakPtr()));
645}
646
647void WebSocketJob::RetryPendingIO() {
648  int result = TrySpdyStream();
649
650  // In the case of ERR_IO_PENDING, CompleteIO() will be called from
651  // OnCreatedSpdyStream().
652  if (result != ERR_IO_PENDING)
653    CompleteIO(result);
654}
655
656void WebSocketJob::CompleteIO(int result) {
657  // |callback_| may be null if OnClose() or DetachDelegate() was called.
658  if (!callback_.is_null()) {
659    CompletionCallback callback = callback_;
660    callback_.Reset();
661    callback.Run(result);
662    Release();  // Balanced with OnStartOpenConnection().
663  }
664}
665
666bool WebSocketJob::SendDataInternal(const char* data, int length) {
667  if (spdy_websocket_stream_.get())
668    return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
669  if (socket_.get())
670    return socket_->SendData(data, length);
671  return false;
672}
673
674void WebSocketJob::CloseInternal() {
675  if (spdy_websocket_stream_.get())
676    spdy_websocket_stream_->Close();
677  if (socket_.get())
678    socket_->Close();
679}
680
681void WebSocketJob::SendPending() {
682  if (current_send_buffer_.get())
683    return;
684
685  // Current buffer has been sent. Try next if any.
686  if (send_buffer_queue_.empty()) {
687    // No more data to send.
688    if (state_ == CLOSING)
689      CloseInternal();
690    return;
691  }
692
693  scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
694  send_buffer_queue_.pop_front();
695  current_send_buffer_ =
696      new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
697  SendDataInternal(current_send_buffer_->data(),
698                   current_send_buffer_->BytesRemaining());
699}
700
701}  // namespace net
702