1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_job.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/lazy_instance.h"
11#include "net/base/io_buffer.h"
12#include "net/base/net_errors.h"
13#include "net/base/net_log.h"
14#include "net/cookies/cookie_store.h"
15#include "net/http/http_network_session.h"
16#include "net/http/http_transaction_factory.h"
17#include "net/http/http_util.h"
18#include "net/spdy/spdy_session.h"
19#include "net/spdy/spdy_session_pool.h"
20#include "net/url_request/url_request_context.h"
21#include "net/websockets/websocket_handshake_handler.h"
22#include "net/websockets/websocket_net_log_params.h"
23#include "net/websockets/websocket_throttle.h"
24#include "url/gurl.h"
25
// Value passed as |max_pending_send_allowed| to OnConnected() when the
// connection is carried over a SPDY stream: 32 KiB.
static const int kMaxPendingSendAllowed = 32 * 1024;
27
28namespace {
29
// Lower-case names of the cookie headers stripped from the handshake request.
const char* const kCookieHeaders[] = {"cookie", "cookie2"};

// Lower-case names of the cookie headers extracted from, and then stripped
// from, the handshake response.
const char* const kSetCookieHeaders[] = {"set-cookie", "set-cookie2"};
37
38net::SocketStreamJob* WebSocketJobFactory(
39    const GURL& url, net::SocketStream::Delegate* delegate,
40    net::URLRequestContext* context, net::CookieStore* cookie_store) {
41  net::WebSocketJob* job = new net::WebSocketJob(delegate);
42  job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
43  return job;
44}
45
46class WebSocketJobInitSingleton {
47 private:
48  friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
49  WebSocketJobInitSingleton() {
50    net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51    net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
52  }
53};
54
55static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56    LAZY_INSTANCE_INITIALIZER;
57
58}  // anonymous namespace
59
60namespace net {
61
62// static
63void WebSocketJob::EnsureInit() {
64  g_websocket_job_init.Get();
65}
66
67WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
68    : delegate_(delegate),
69      state_(INITIALIZED),
70      waiting_(false),
71      handshake_request_(new WebSocketHandshakeRequestHandler),
72      handshake_response_(new WebSocketHandshakeResponseHandler),
73      started_to_send_handshake_request_(false),
74      handshake_request_sent_(0),
75      response_cookies_save_index_(0),
76      spdy_protocol_version_(0),
77      save_next_cookie_running_(false),
78      callback_pending_(false),
79      weak_ptr_factory_(this),
80      weak_ptr_factory_for_send_pending_(this) {
81}
82
83WebSocketJob::~WebSocketJob() {
84  DCHECK_EQ(CLOSED, state_);
85  DCHECK(!delegate_);
86  DCHECK(!socket_.get());
87}
88
89void WebSocketJob::Connect() {
90  DCHECK(socket_.get());
91  DCHECK_EQ(state_, INITIALIZED);
92  state_ = CONNECTING;
93  socket_->Connect();
94}
95
96bool WebSocketJob::SendData(const char* data, int len) {
97  switch (state_) {
98    case INITIALIZED:
99      return false;
100
101    case CONNECTING:
102      return SendHandshakeRequest(data, len);
103
104    case OPEN:
105      {
106        scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
107        memcpy(buffer->data(), data, len);
108        if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
109          send_buffer_queue_.push_back(buffer);
110          return true;
111        }
112        current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
113        return SendDataInternal(current_send_buffer_->data(),
114                                current_send_buffer_->BytesRemaining());
115      }
116
117    case CLOSING:
118    case CLOSED:
119      return false;
120  }
121  return false;
122}
123
124void WebSocketJob::Close() {
125  if (state_ == CLOSED)
126    return;
127
128  state_ = CLOSING;
129  if (current_send_buffer_.get()) {
130    // Will close in SendPending.
131    return;
132  }
133  state_ = CLOSED;
134  CloseInternal();
135}
136
137void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
138  state_ = CONNECTING;
139  socket_->RestartWithAuth(credentials);
140}
141
142void WebSocketJob::DetachDelegate() {
143  state_ = CLOSED;
144  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
145
146  scoped_refptr<WebSocketJob> protect(this);
147  weak_ptr_factory_.InvalidateWeakPtrs();
148  weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
149
150  delegate_ = NULL;
151  if (socket_.get())
152    socket_->DetachDelegate();
153  socket_ = NULL;
154  if (!callback_.is_null()) {
155    waiting_ = false;
156    callback_.Reset();
157    Release();  // Balanced with OnStartOpenConnection().
158  }
159}
160
161int WebSocketJob::OnStartOpenConnection(
162    SocketStream* socket, const CompletionCallback& callback) {
163  DCHECK(callback_.is_null());
164  state_ = CONNECTING;
165
166  addresses_ = socket->address_list();
167  if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
168    return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
169  }
170
171  if (delegate_) {
172    int result = delegate_->OnStartOpenConnection(socket, callback);
173    DCHECK_EQ(OK, result);
174  }
175  if (waiting_) {
176    // PutInQueue() may set |waiting_| true for throttling. In this case,
177    // Wakeup() will be called later.
178    callback_ = callback;
179    AddRef();  // Balanced when callback_ is cleared.
180    return ERR_IO_PENDING;
181  }
182  return TrySpdyStream();
183}
184
185void WebSocketJob::OnConnected(
186    SocketStream* socket, int max_pending_send_allowed) {
187  if (state_ == CLOSED)
188    return;
189  DCHECK_EQ(CONNECTING, state_);
190  if (delegate_)
191    delegate_->OnConnected(socket, max_pending_send_allowed);
192}
193
194void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
195  DCHECK_NE(INITIALIZED, state_);
196  DCHECK_GT(amount_sent, 0);
197  if (state_ == CLOSED)
198    return;
199  if (state_ == CONNECTING) {
200    OnSentHandshakeRequest(socket, amount_sent);
201    return;
202  }
203  if (delegate_) {
204    DCHECK(state_ == OPEN || state_ == CLOSING);
205    if (!current_send_buffer_.get()) {
206      VLOG(1)
207          << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
208      return;
209    }
210    current_send_buffer_->DidConsume(amount_sent);
211    if (current_send_buffer_->BytesRemaining() > 0)
212      return;
213
214    // We need to report amount_sent of original buffer size, instead of
215    // amount sent to |socket|.
216    amount_sent = current_send_buffer_->size();
217    DCHECK_GT(amount_sent, 0);
218    current_send_buffer_ = NULL;
219    if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
220      base::MessageLoopForIO::current()->PostTask(
221          FROM_HERE,
222          base::Bind(&WebSocketJob::SendPending,
223                     weak_ptr_factory_for_send_pending_.GetWeakPtr()));
224    }
225    delegate_->OnSentData(socket, amount_sent);
226  }
227}
228
229void WebSocketJob::OnReceivedData(
230    SocketStream* socket, const char* data, int len) {
231  DCHECK_NE(INITIALIZED, state_);
232  if (state_ == CLOSED)
233    return;
234  if (state_ == CONNECTING) {
235    OnReceivedHandshakeResponse(socket, data, len);
236    return;
237  }
238  DCHECK(state_ == OPEN || state_ == CLOSING);
239  if (delegate_ && len > 0)
240    delegate_->OnReceivedData(socket, data, len);
241}
242
243void WebSocketJob::OnClose(SocketStream* socket) {
244  state_ = CLOSED;
245  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
246
247  scoped_refptr<WebSocketJob> protect(this);
248  weak_ptr_factory_.InvalidateWeakPtrs();
249
250  SocketStream::Delegate* delegate = delegate_;
251  delegate_ = NULL;
252  socket_ = NULL;
253  if (!callback_.is_null()) {
254    waiting_ = false;
255    callback_.Reset();
256    Release();  // Balanced with OnStartOpenConnection().
257  }
258  if (delegate)
259    delegate->OnClose(socket);
260}
261
262void WebSocketJob::OnAuthRequired(
263    SocketStream* socket, AuthChallengeInfo* auth_info) {
264  if (delegate_)
265    delegate_->OnAuthRequired(socket, auth_info);
266}
267
268void WebSocketJob::OnSSLCertificateError(
269    SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
270  if (delegate_)
271    delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
272}
273
274void WebSocketJob::OnError(const SocketStream* socket, int error) {
275  if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
276    delegate_->OnError(socket, error);
277}
278
279void WebSocketJob::OnCreatedSpdyStream(int result) {
280  DCHECK(spdy_websocket_stream_.get());
281  DCHECK(socket_.get());
282  DCHECK_NE(ERR_IO_PENDING, result);
283
284  if (state_ == CLOSED) {
285    result = ERR_ABORTED;
286  } else if (result == OK) {
287    state_ = CONNECTING;
288    result = ERR_PROTOCOL_SWITCHED;
289  } else {
290    spdy_websocket_stream_.reset();
291  }
292
293  CompleteIO(result);
294}
295
296void WebSocketJob::OnSentSpdyHeaders() {
297  DCHECK_NE(INITIALIZED, state_);
298  if (state_ != CONNECTING)
299    return;
300  size_t original_length = handshake_request_->original_length();
301  handshake_request_.reset();
302  if (delegate_)
303    delegate_->OnSentData(socket_.get(), original_length);
304}
305
306void WebSocketJob::OnSpdyResponseHeadersUpdated(
307    const SpdyHeaderBlock& response_headers) {
308  DCHECK_NE(INITIALIZED, state_);
309  if (state_ != CONNECTING)
310    return;
311  // TODO(toyoshim): Fallback to non-spdy connection?
312  handshake_response_->ParseResponseHeaderBlock(response_headers,
313                                                challenge_,
314                                                spdy_protocol_version_);
315
316  SaveCookiesAndNotifyHeadersComplete();
317}
318
319void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
320  DCHECK_NE(INITIALIZED, state_);
321  DCHECK_NE(CONNECTING, state_);
322  if (state_ == CLOSED)
323    return;
324  if (!spdy_websocket_stream_.get())
325    return;
326  OnSentData(socket_.get(), static_cast<int>(bytes_sent));
327}
328
329void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
330  DCHECK_NE(INITIALIZED, state_);
331  DCHECK_NE(CONNECTING, state_);
332  if (state_ == CLOSED)
333    return;
334  if (!spdy_websocket_stream_.get())
335    return;
336  if (buffer) {
337    OnReceivedData(
338        socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
339  } else {
340    OnReceivedData(socket_.get(), NULL, 0);
341  }
342}
343
344void WebSocketJob::OnCloseSpdyStream() {
345  spdy_websocket_stream_.reset();
346  OnClose(socket_.get());
347}
348
349bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
350  DCHECK_EQ(state_, CONNECTING);
351  if (started_to_send_handshake_request_)
352    return false;
353  if (!handshake_request_->ParseRequest(data, len))
354    return false;
355
356  AddCookieHeaderAndSend();
357  return true;
358}
359
360void WebSocketJob::AddCookieHeaderAndSend() {
361  bool allow = true;
362  if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
363    allow = false;
364
365  if (socket_.get() && delegate_ && state_ == CONNECTING) {
366    handshake_request_->RemoveHeaders(kCookieHeaders,
367                                      arraysize(kCookieHeaders));
368    if (allow && socket_->cookie_store()) {
369      // Add cookies, including HttpOnly cookies.
370      CookieOptions cookie_options;
371      cookie_options.set_include_httponly();
372      socket_->cookie_store()->GetCookiesWithOptionsAsync(
373          GetURLForCookies(), cookie_options,
374          base::Bind(&WebSocketJob::LoadCookieCallback,
375                     weak_ptr_factory_.GetWeakPtr()));
376    } else {
377      DoSendData();
378    }
379  }
380}
381
382void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
383  if (!cookie.empty())
384    // TODO(tyoshino): Sending cookie means that connection doesn't need
385    // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
386    // wouldn't negatively affect privacy anyway. Need to restart connection
387    // or refactor to determine cookie status prior to connecting.
388    handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
389  DoSendData();
390}
391
392void WebSocketJob::DoSendData() {
393  if (spdy_websocket_stream_.get()) {
394    scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
395    handshake_request_->GetRequestHeaderBlock(
396        socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
397    spdy_websocket_stream_->SendRequest(headers.Pass());
398  } else {
399    const std::string& handshake_request =
400        handshake_request_->GetRawRequest();
401    handshake_request_sent_ = 0;
402    socket_->net_log()->AddEvent(
403        NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
404        base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
405    socket_->SendData(handshake_request.data(),
406                      handshake_request.size());
407  }
408  // Just buffered in |handshake_request_|.
409  started_to_send_handshake_request_ = true;
410}
411
412void WebSocketJob::OnSentHandshakeRequest(
413    SocketStream* socket, int amount_sent) {
414  DCHECK_EQ(state_, CONNECTING);
415  handshake_request_sent_ += amount_sent;
416  DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
417  if (handshake_request_sent_ >= handshake_request_->raw_length()) {
418    // handshake request has been sent.
419    // notify original size of handshake request to delegate.
420    // Reset the handshake_request_ first in case this object is deleted by the
421    // delegate.
422    size_t original_length = handshake_request_->original_length();
423    handshake_request_.reset();
424    if (delegate_)
425      delegate_->OnSentData(socket, original_length);
426  }
427}
428
429void WebSocketJob::OnReceivedHandshakeResponse(
430    SocketStream* socket, const char* data, int len) {
431  DCHECK_EQ(state_, CONNECTING);
432  if (handshake_response_->HasResponse()) {
433    // If we already has handshake response, received data should be frame
434    // data, not handshake message.
435    received_data_after_handshake_.insert(
436        received_data_after_handshake_.end(), data, data + len);
437    return;
438  }
439
440  size_t response_length = handshake_response_->ParseRawResponse(data, len);
441  if (!handshake_response_->HasResponse()) {
442    // not yet. we need more data.
443    return;
444  }
445  // handshake message is completed.
446  std::string raw_response = handshake_response_->GetRawResponse();
447  socket_->net_log()->AddEvent(
448      NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
449      base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
450  if (len - response_length > 0) {
451    // If we received extra data, it should be frame data.
452    DCHECK(received_data_after_handshake_.empty());
453    received_data_after_handshake_.assign(data + response_length, data + len);
454  }
455  SaveCookiesAndNotifyHeadersComplete();
456}
457
458void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
459  // handshake message is completed.
460  DCHECK(handshake_response_->HasResponse());
461
462  // Extract cookies from the handshake response into a temporary vector.
463  response_cookies_.clear();
464  response_cookies_save_index_ = 0;
465
466  handshake_response_->GetHeaders(
467      kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
468
469  // Now, loop over the response cookies, and attempt to persist each.
470  SaveNextCookie();
471}
472
473void WebSocketJob::NotifyHeadersComplete() {
474  // Remove cookie headers, with malformed headers preserved.
475  // Actual handshake should be done in Blink.
476  handshake_response_->RemoveHeaders(
477      kSetCookieHeaders, arraysize(kSetCookieHeaders));
478  std::string handshake_response = handshake_response_->GetResponse();
479  handshake_response_.reset();
480  std::vector<char> received_data(handshake_response.begin(),
481                                  handshake_response.end());
482  received_data.insert(received_data.end(),
483                       received_data_after_handshake_.begin(),
484                       received_data_after_handshake_.end());
485  received_data_after_handshake_.clear();
486
487  state_ = OPEN;
488
489  DCHECK(!received_data.empty());
490  if (delegate_)
491    delegate_->OnReceivedData(
492        socket_.get(), &received_data.front(), received_data.size());
493
494  WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
495}
496
497void WebSocketJob::SaveNextCookie() {
498  if (!socket_.get() || !delegate_ || state_ != CONNECTING)
499    return;
500
501  callback_pending_ = false;
502  save_next_cookie_running_ = true;
503
504  if (socket_->cookie_store()) {
505    GURL url_for_cookies = GetURLForCookies();
506
507    CookieOptions options;
508    options.set_include_httponly();
509
510    // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
511    // CookieMonster's asynchronous operation APIs queue the callback to run it
512    // on the thread where the API was called, there won't be race. I.e. unless
513    // the callback is run synchronously, it won't be run in parallel with this
514    // method.
515    while (!callback_pending_ &&
516           response_cookies_save_index_ < response_cookies_.size()) {
517      std::string cookie = response_cookies_[response_cookies_save_index_];
518      response_cookies_save_index_++;
519
520      if (!delegate_->CanSetCookie(
521              socket_.get(), url_for_cookies, cookie, &options))
522        continue;
523
524      callback_pending_ = true;
525      socket_->cookie_store()->SetCookieWithOptionsAsync(
526          url_for_cookies, cookie, options,
527          base::Bind(&WebSocketJob::OnCookieSaved,
528                     weak_ptr_factory_.GetWeakPtr()));
529    }
530  }
531
532  save_next_cookie_running_ = false;
533
534  if (callback_pending_)
535    return;
536
537  response_cookies_.clear();
538  response_cookies_save_index_ = 0;
539
540  NotifyHeadersComplete();
541}
542
543void WebSocketJob::OnCookieSaved(bool cookie_status) {
544  // Tell the caller of SetCookieWithOptionsAsync() that this completion
545  // callback is invoked.
546  // - If the caller checks callback_pending earlier than this callback, the
547  //   caller exits to let this method continue iteration.
548  // - Otherwise, the caller continues iteration.
549  callback_pending_ = false;
550
551  // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
552  // the loop. Otherwise, return.
553  if (save_next_cookie_running_)
554    return;
555
556  SaveNextCookie();
557}
558
559GURL WebSocketJob::GetURLForCookies() const {
560  GURL url = socket_->url();
561  std::string scheme = socket_->is_secure() ? "https" : "http";
562  url::Replacements<char> replacements;
563  replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
564  return url.ReplaceComponents(replacements);
565}
566
567const AddressList& WebSocketJob::address_list() const {
568  return addresses_;
569}
570
571int WebSocketJob::TrySpdyStream() {
572  if (!socket_.get())
573    return ERR_FAILED;
574
575  // Check if we have a SPDY session available.
576  HttpTransactionFactory* factory =
577      socket_->context()->http_transaction_factory();
578  if (!factory)
579    return OK;
580  scoped_refptr<HttpNetworkSession> session = factory->GetSession();
581  if (!session.get() || !session->params().enable_websocket_over_spdy)
582    return OK;
583  SpdySessionPool* spdy_pool = session->spdy_session_pool();
584  PrivacyMode privacy_mode = socket_->privacy_mode();
585  const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
586                           socket_->proxy_server(), privacy_mode);
587  // Forbid wss downgrade to SPDY without SSL.
588  // TODO(toyoshim): Does it realize the same policy with HTTP?
589  base::WeakPtr<SpdySession> spdy_session =
590      spdy_pool->FindAvailableSession(key, *socket_->net_log());
591  if (!spdy_session)
592    return OK;
593
594  SSLInfo ssl_info;
595  bool was_npn_negotiated;
596  NextProto protocol_negotiated = kProtoUnknown;
597  bool use_ssl = spdy_session->GetSSLInfo(
598      &ssl_info, &was_npn_negotiated, &protocol_negotiated);
599  if (socket_->is_secure() && !use_ssl)
600    return OK;
601
602  // Create SpdyWebSocketStream.
603  spdy_protocol_version_ = spdy_session->GetProtocolVersion();
604  spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
605
606  int result = spdy_websocket_stream_->InitializeStream(
607      socket_->url(), MEDIUM, *socket_->net_log());
608  if (result == OK) {
609    OnConnected(socket_.get(), kMaxPendingSendAllowed);
610    return ERR_PROTOCOL_SWITCHED;
611  }
612  if (result != ERR_IO_PENDING) {
613    spdy_websocket_stream_.reset();
614    return OK;
615  }
616
617  return ERR_IO_PENDING;
618}
619
620void WebSocketJob::SetWaiting() {
621  waiting_ = true;
622}
623
624bool WebSocketJob::IsWaiting() const {
625  return waiting_;
626}
627
628void WebSocketJob::Wakeup() {
629  if (!waiting_)
630    return;
631  waiting_ = false;
632  DCHECK(!callback_.is_null());
633  base::MessageLoopForIO::current()->PostTask(
634      FROM_HERE,
635      base::Bind(&WebSocketJob::RetryPendingIO,
636                 weak_ptr_factory_.GetWeakPtr()));
637}
638
639void WebSocketJob::RetryPendingIO() {
640  int result = TrySpdyStream();
641
642  // In the case of ERR_IO_PENDING, CompleteIO() will be called from
643  // OnCreatedSpdyStream().
644  if (result != ERR_IO_PENDING)
645    CompleteIO(result);
646}
647
648void WebSocketJob::CompleteIO(int result) {
649  // |callback_| may be null if OnClose() or DetachDelegate() was called.
650  if (!callback_.is_null()) {
651    CompletionCallback callback = callback_;
652    callback_.Reset();
653    callback.Run(result);
654    Release();  // Balanced with OnStartOpenConnection().
655  }
656}
657
658bool WebSocketJob::SendDataInternal(const char* data, int length) {
659  if (spdy_websocket_stream_.get())
660    return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
661  if (socket_.get())
662    return socket_->SendData(data, length);
663  return false;
664}
665
666void WebSocketJob::CloseInternal() {
667  if (spdy_websocket_stream_.get())
668    spdy_websocket_stream_->Close();
669  if (socket_.get())
670    socket_->Close();
671}
672
673void WebSocketJob::SendPending() {
674  if (current_send_buffer_.get())
675    return;
676
677  // Current buffer has been sent. Try next if any.
678  if (send_buffer_queue_.empty()) {
679    // No more data to send.
680    if (state_ == CLOSING)
681      CloseInternal();
682    return;
683  }
684
685  scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
686  send_buffer_queue_.pop_front();
687  current_send_buffer_ =
688      new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
689  SendDataInternal(current_send_buffer_->data(),
690                   current_send_buffer_->BytesRemaining());
691}
692
693}  // namespace net
694