quic_http_stream.cc revision d0247b1b59f9c528cb6df88b4f2b9afaf80d181e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/quic/quic_http_stream.h" 6 7#include "base/callback_helpers.h" 8#include "base/strings/stringprintf.h" 9#include "net/base/io_buffer.h" 10#include "net/base/net_errors.h" 11#include "net/http/http_response_headers.h" 12#include "net/http/http_util.h" 13#include "net/quic/quic_client_session.h" 14#include "net/quic/quic_http_utils.h" 15#include "net/quic/quic_reliable_client_stream.h" 16#include "net/quic/quic_utils.h" 17#include "net/socket/next_proto.h" 18#include "net/spdy/spdy_frame_builder.h" 19#include "net/spdy/spdy_framer.h" 20#include "net/spdy/spdy_http_utils.h" 21#include "net/ssl/ssl_info.h" 22 23namespace net { 24 25static const size_t kHeaderBufInitialSize = 4096; 26 27QuicHttpStream::QuicHttpStream(const base::WeakPtr<QuicClientSession> session) 28 : next_state_(STATE_NONE), 29 session_(session), 30 stream_(NULL), 31 request_info_(NULL), 32 request_body_stream_(NULL), 33 priority_(MINIMUM_PRIORITY), 34 response_info_(NULL), 35 response_status_(OK), 36 response_headers_received_(false), 37 read_buf_(new GrowableIOBuffer()), 38 user_buffer_len_(0), 39 weak_factory_(this) { 40 DCHECK(session_); 41} 42 43QuicHttpStream::~QuicHttpStream() { 44 Close(false); 45} 46 47int QuicHttpStream::InitializeStream(const HttpRequestInfo* request_info, 48 RequestPriority priority, 49 const BoundNetLog& stream_net_log, 50 const CompletionCallback& callback) { 51 DCHECK(!stream_); 52 if (!session_) 53 return ERR_CONNECTION_CLOSED; 54 55 stream_net_log_ = stream_net_log; 56 request_info_ = request_info; 57 priority_ = priority; 58 59 int rv = stream_request_.StartRequest( 60 session_, &stream_, base::Bind(&QuicHttpStream::OnStreamReady, 61 weak_factory_.GetWeakPtr())); 62 if (rv == ERR_IO_PENDING) 63 callback_ = callback; 64 65 if (rv == OK) 66 stream_->SetDelegate(this); 67 68 return rv; 69} 70 71void QuicHttpStream::OnStreamReady(int rv) { 72 DCHECK(rv == OK || !stream_); 73 if (rv == OK) 74 stream_->SetDelegate(this); 75 76 ResetAndReturn(&callback_).Run(rv); 77} 78 79int QuicHttpStream::SendRequest(const HttpRequestHeaders& request_headers, 80 HttpResponseInfo* response, 81 const CompletionCallback& callback) { 82 CHECK(stream_); 83 CHECK(!request_body_stream_); 84 CHECK(!response_info_); 85 CHECK(!callback.is_null()); 86 CHECK(response); 87 88 QuicPriority priority = ConvertRequestPriorityToQuicPriority(priority_); 89 stream_->set_priority(priority); 90 // Store the serialized request headers. 91 SpdyHeaderBlock headers; 92 CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers, 93 &headers, 3, /*direct=*/true); 94 if (session_->connection()->version() < QUIC_VERSION_9) { 95 request_ = stream_->compressor()->CompressHeaders(headers); 96 } else { 97 request_ = stream_->compressor()->CompressHeadersWithPriority(priority, 98 headers); 99 } 100 // Log the actual request with the URL Request's net log. 101 stream_net_log_.AddEvent( 102 NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS, 103 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); 104 // Also log to the QuicSession's net log. 105 stream_->net_log().AddEvent( 106 NetLog::TYPE_QUIC_HTTP_STREAM_SEND_REQUEST_HEADERS, 107 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); 108 109 // Store the request body. 110 request_body_stream_ = request_info_->upload_data_stream; 111 if (request_body_stream_) { 112 // TODO(rch): Can we be more precise about when to allocate 113 // raw_request_body_buf_. Removed the following check. DoReadRequestBody() 114 // was being called even if we didn't yet allocate raw_request_body_buf_. 115 // && (request_body_stream_->size() || 116 // request_body_stream_->is_chunked())) 117 // 118 // Use kMaxPacketSize as the buffer size, since the request 119 // body data is written with this size at a time. 120 // TODO(rch): use a smarter value since we can't write an entire 121 // packet due to overhead. 122 raw_request_body_buf_ = new IOBufferWithSize(kMaxPacketSize); 123 // The request body buffer is empty at first. 124 request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), 0); 125 } 126 127 // Store the response info. 128 response_info_ = response; 129 130 next_state_ = STATE_SEND_HEADERS; 131 int rv = DoLoop(OK); 132 if (rv == ERR_IO_PENDING) 133 callback_ = callback; 134 135 return rv > 0 ? OK : rv; 136} 137 138UploadProgress QuicHttpStream::GetUploadProgress() const { 139 if (!request_body_stream_) 140 return UploadProgress(); 141 142 return UploadProgress(request_body_stream_->position(), 143 request_body_stream_->size()); 144} 145 146int QuicHttpStream::ReadResponseHeaders(const CompletionCallback& callback) { 147 CHECK(!callback.is_null()); 148 149 if (stream_ == NULL) 150 return response_status_; 151 152 // Check if we already have the response headers. If so, return synchronously. 153 if (response_headers_received_) 154 return OK; 155 156 // Still waiting for the response, return IO_PENDING. 157 CHECK(callback_.is_null()); 158 callback_ = callback; 159 return ERR_IO_PENDING; 160} 161 162const HttpResponseInfo* QuicHttpStream::GetResponseInfo() const { 163 return response_info_; 164} 165 166int QuicHttpStream::ReadResponseBody( 167 IOBuffer* buf, int buf_len, const CompletionCallback& callback) { 168 CHECK(buf); 169 CHECK(buf_len); 170 CHECK(!callback.is_null()); 171 172 // If we have data buffered, complete the IO immediately. 173 if (!response_body_.empty()) { 174 int bytes_read = 0; 175 while (!response_body_.empty() && buf_len > 0) { 176 scoped_refptr<IOBufferWithSize> data = response_body_.front(); 177 const int bytes_to_copy = std::min(buf_len, data->size()); 178 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); 179 buf_len -= bytes_to_copy; 180 if (bytes_to_copy == data->size()) { 181 response_body_.pop_front(); 182 } else { 183 const int bytes_remaining = data->size() - bytes_to_copy; 184 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); 185 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), 186 bytes_remaining); 187 response_body_.pop_front(); 188 response_body_.push_front(make_scoped_refptr(new_buffer)); 189 } 190 bytes_read += bytes_to_copy; 191 } 192 return bytes_read; 193 } 194 195 if (!stream_) { 196 // If the stream is already closed, there is no body to read. 197 return response_status_; 198 } 199 200 CHECK(callback_.is_null()); 201 CHECK(!user_buffer_.get()); 202 CHECK_EQ(0, user_buffer_len_); 203 204 callback_ = callback; 205 user_buffer_ = buf; 206 user_buffer_len_ = buf_len; 207 return ERR_IO_PENDING; 208} 209 210void QuicHttpStream::Close(bool not_reusable) { 211 // Note: the not_reusable flag has no meaning for SPDY streams. 212 if (stream_) { 213 stream_->SetDelegate(NULL); 214 // TODO(rch): use new CANCELLED error code here once quic 11 215 // is everywhere. 216 stream_->Close(QUIC_ERROR_PROCESSING_STREAM); 217 stream_ = NULL; 218 } 219} 220 221HttpStream* QuicHttpStream::RenewStreamForAuth() { 222 return NULL; 223} 224 225bool QuicHttpStream::IsResponseBodyComplete() const { 226 return next_state_ == STATE_OPEN && !stream_; 227} 228 229bool QuicHttpStream::CanFindEndOfResponse() const { 230 return true; 231} 232 233bool QuicHttpStream::IsConnectionReused() const { 234 // TODO(rch): do something smarter here. 235 return stream_ && stream_->id() > 1; 236} 237 238void QuicHttpStream::SetConnectionReused() { 239 // QUIC doesn't need an indicator here. 240} 241 242bool QuicHttpStream::IsConnectionReusable() const { 243 // QUIC streams aren't considered reusable. 244 return false; 245} 246 247bool QuicHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const { 248 // TODO(mmenke): Figure out what to do here. 249 return true; 250} 251 252void QuicHttpStream::GetSSLInfo(SSLInfo* ssl_info) { 253 DCHECK(stream_); 254 stream_->GetSSLInfo(ssl_info); 255} 256 257void QuicHttpStream::GetSSLCertRequestInfo( 258 SSLCertRequestInfo* cert_request_info) { 259 DCHECK(stream_); 260 NOTIMPLEMENTED(); 261} 262 263bool QuicHttpStream::IsSpdyHttpStream() const { 264 return false; 265} 266 267void QuicHttpStream::Drain(HttpNetworkSession* session) { 268 Close(false); 269 delete this; 270} 271 272void QuicHttpStream::SetPriority(RequestPriority priority) { 273 priority_ = priority; 274} 275 276int QuicHttpStream::OnSendData() { 277 // TODO(rch): Change QUIC IO to provide notifications to the streams. 278 NOTREACHED(); 279 return OK; 280} 281 282int QuicHttpStream::OnSendDataComplete(int status, bool* eof) { 283 // TODO(rch): Change QUIC IO to provide notifications to the streams. 284 NOTREACHED(); 285 return OK; 286} 287 288int QuicHttpStream::OnDataReceived(const char* data, int length) { 289 DCHECK_NE(0, length); 290 // Are we still reading the response headers. 291 if (!response_headers_received_) { 292 // Grow the read buffer if necessary. 293 if (read_buf_->RemainingCapacity() < length) { 294 size_t additional_capacity = length - read_buf_->RemainingCapacity(); 295 if (additional_capacity < kHeaderBufInitialSize) 296 additional_capacity = kHeaderBufInitialSize; 297 read_buf_->SetCapacity(read_buf_->capacity() + additional_capacity); 298 } 299 memcpy(read_buf_->data(), data, length); 300 read_buf_->set_offset(read_buf_->offset() + length); 301 int rv = ParseResponseHeaders(); 302 if (rv != ERR_IO_PENDING && !callback_.is_null()) { 303 DoCallback(rv); 304 } 305 return OK; 306 } 307 308 if (callback_.is_null()) { 309 BufferResponseBody(data, length); 310 return OK; 311 } 312 313 if (length <= user_buffer_len_) { 314 memcpy(user_buffer_->data(), data, length); 315 } else { 316 memcpy(user_buffer_->data(), data, user_buffer_len_); 317 int delta = length - user_buffer_len_; 318 BufferResponseBody(data + user_buffer_len_, delta); 319 } 320 user_buffer_ = NULL; 321 user_buffer_len_ = 0; 322 DoCallback(length); 323 return OK; 324} 325 326void QuicHttpStream::OnClose(QuicErrorCode error) { 327 if (error != QUIC_NO_ERROR) { 328 response_status_ = ERR_QUIC_PROTOCOL_ERROR; 329 } else if (!response_headers_received_) { 330 response_status_ = ERR_ABORTED; 331 } 332 333 stream_ = NULL; 334 if (!callback_.is_null()) 335 DoCallback(response_status_); 336} 337 338void QuicHttpStream::OnError(int error) { 339 stream_ = NULL; 340 response_status_ = error; 341 if (!callback_.is_null()) 342 DoCallback(response_status_); 343} 344 345bool QuicHttpStream::HasSendHeadersComplete() { 346 return next_state_ > STATE_SEND_HEADERS_COMPLETE; 347} 348 349void QuicHttpStream::OnIOComplete(int rv) { 350 rv = DoLoop(rv); 351 352 if (rv != ERR_IO_PENDING && !callback_.is_null()) { 353 DoCallback(rv); 354 } 355} 356 357void QuicHttpStream::DoCallback(int rv) { 358 CHECK_NE(rv, ERR_IO_PENDING); 359 CHECK(!callback_.is_null()); 360 361 // The client callback can do anything, including destroying this class, 362 // so any pending callback must be issued after everything else is done. 363 base::ResetAndReturn(&callback_).Run(rv); 364} 365 366int QuicHttpStream::DoLoop(int rv) { 367 do { 368 State state = next_state_; 369 next_state_ = STATE_NONE; 370 switch (state) { 371 case STATE_SEND_HEADERS: 372 CHECK_EQ(OK, rv); 373 rv = DoSendHeaders(); 374 break; 375 case STATE_SEND_HEADERS_COMPLETE: 376 rv = DoSendHeadersComplete(rv); 377 break; 378 case STATE_READ_REQUEST_BODY: 379 CHECK_EQ(OK, rv); 380 rv = DoReadRequestBody(); 381 break; 382 case STATE_READ_REQUEST_BODY_COMPLETE: 383 rv = DoReadRequestBodyComplete(rv); 384 break; 385 case STATE_SEND_BODY: 386 CHECK_EQ(OK, rv); 387 rv = DoSendBody(); 388 break; 389 case STATE_SEND_BODY_COMPLETE: 390 rv = DoSendBodyComplete(rv); 391 break; 392 case STATE_OPEN: 393 CHECK_EQ(OK, rv); 394 break; 395 default: 396 NOTREACHED() << "next_state_: " << next_state_; 397 break; 398 } 399 } while (next_state_ != STATE_NONE && next_state_ != STATE_OPEN && 400 rv != ERR_IO_PENDING); 401 402 return rv; 403} 404 405int QuicHttpStream::DoSendHeaders() { 406 if (!stream_) 407 return ERR_UNEXPECTED; 408 409 bool has_upload_data = request_body_stream_ != NULL; 410 411 next_state_ = STATE_SEND_HEADERS_COMPLETE; 412 return stream_->WriteStreamData( 413 request_, !has_upload_data, 414 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 415} 416 417int QuicHttpStream::DoSendHeadersComplete(int rv) { 418 if (rv < 0) 419 return rv; 420 421 next_state_ = request_body_stream_ ? 422 STATE_READ_REQUEST_BODY : STATE_OPEN; 423 424 return OK; 425} 426 427int QuicHttpStream::DoReadRequestBody() { 428 next_state_ = STATE_READ_REQUEST_BODY_COMPLETE; 429 return request_body_stream_->Read( 430 raw_request_body_buf_.get(), 431 raw_request_body_buf_->size(), 432 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 433} 434 435int QuicHttpStream::DoReadRequestBodyComplete(int rv) { 436 // |rv| is the result of read from the request body from the last call to 437 // DoSendBody(). 438 if (rv < 0) 439 return rv; 440 441 request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_.get(), rv); 442 if (rv == 0) { // Reached the end. 443 DCHECK(request_body_stream_->IsEOF()); 444 } 445 446 next_state_ = STATE_SEND_BODY; 447 return OK; 448} 449 450int QuicHttpStream::DoSendBody() { 451 if (!stream_) 452 return ERR_UNEXPECTED; 453 454 CHECK(request_body_stream_); 455 CHECK(request_body_buf_.get()); 456 const bool eof = request_body_stream_->IsEOF(); 457 int len = request_body_buf_->BytesRemaining(); 458 if (len > 0 || eof) { 459 next_state_ = STATE_SEND_BODY_COMPLETE; 460 base::StringPiece data(request_body_buf_->data(), len); 461 return stream_->WriteStreamData( 462 data, eof, 463 base::Bind(&QuicHttpStream::OnIOComplete, weak_factory_.GetWeakPtr())); 464 } 465 466 next_state_ = STATE_OPEN; 467 return OK; 468} 469 470int QuicHttpStream::DoSendBodyComplete(int rv) { 471 if (rv < 0) 472 return rv; 473 474 request_body_buf_->DidConsume(request_body_buf_->BytesRemaining()); 475 476 if (!request_body_stream_->IsEOF()) { 477 next_state_ = STATE_READ_REQUEST_BODY; 478 return OK; 479 } 480 481 next_state_ = STATE_OPEN; 482 return OK; 483} 484 485int QuicHttpStream::ParseResponseHeaders() { 486 size_t read_buf_len = static_cast<size_t>(read_buf_->offset()); 487 SpdyFramer framer(SPDY3); 488 SpdyHeaderBlock headers; 489 char* data = read_buf_->StartOfBuffer(); 490 size_t len = framer.ParseHeaderBlockInBuffer(data, read_buf_->offset(), 491 &headers); 492 493 if (len == 0) { 494 return ERR_IO_PENDING; 495 } 496 497 // Save the remaining received data. 498 size_t delta = read_buf_len - len; 499 if (delta > 0) { 500 BufferResponseBody(data + len, delta); 501 } 502 503 // The URLRequest logs these headers, so only log to the QuicSession's 504 // net log. 505 stream_->net_log().AddEvent( 506 NetLog::TYPE_QUIC_HTTP_STREAM_READ_RESPONSE_HEADERS, 507 base::Bind(&SpdyHeaderBlockNetLogCallback, &headers)); 508 509 if (!SpdyHeadersToHttpResponse(headers, 3, response_info_)) { 510 DLOG(WARNING) << "Invalid headers"; 511 return ERR_QUIC_PROTOCOL_ERROR; 512 } 513 // Put the peer's IP address and port into the response. 514 IPEndPoint address = stream_->GetPeerAddress(); 515 response_info_->socket_address = HostPortPair::FromIPEndPoint(address); 516 response_info_->connection_info = 517 HttpResponseInfo::CONNECTION_INFO_QUIC1_SPDY3; 518 response_info_->vary_data 519 .Init(*request_info_, *response_info_->headers.get()); 520 response_info_->was_npn_negotiated = true; 521 response_info_->npn_negotiated_protocol = "quic/1+spdy/3"; 522 response_headers_received_ = true; 523 524 return OK; 525} 526 527void QuicHttpStream::BufferResponseBody(const char* data, int length) { 528 if (length == 0) 529 return; 530 IOBufferWithSize* io_buffer = new IOBufferWithSize(length); 531 memcpy(io_buffer->data(), data, length); 532 response_body_.push_back(make_scoped_refptr(io_buffer)); 533} 534 535} // namespace net 536