spdy_http_stream.cc revision ddb351dbec246cf1fab5ec20d2d5520909041de1
1// Copyright (c) 2011 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/spdy/spdy_http_stream.h" 6 7#include <algorithm> 8#include <list> 9#include <string> 10 11#include "base/logging.h" 12#include "base/message_loop.h" 13#include "net/base/address_list.h" 14#include "net/base/host_port_pair.h" 15#include "net/base/load_flags.h" 16#include "net/base/net_util.h" 17#include "net/http/http_request_headers.h" 18#include "net/http/http_request_info.h" 19#include "net/http/http_response_info.h" 20#include "net/http/http_util.h" 21#include "net/spdy/spdy_http_utils.h" 22#include "net/spdy/spdy_session.h" 23 24namespace net { 25 26SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session, 27 bool direct) 28 : ALLOW_THIS_IN_INITIALIZER_LIST(read_callback_factory_(this)), 29 stream_(NULL), 30 spdy_session_(spdy_session), 31 response_info_(NULL), 32 download_finished_(false), 33 response_headers_received_(false), 34 user_callback_(NULL), 35 user_buffer_len_(0), 36 buffered_read_callback_pending_(false), 37 more_read_data_pending_(false), 38 direct_(direct) { } 39 40void SpdyHttpStream::InitializeWithExistingStream(SpdyStream* spdy_stream) { 41 stream_ = spdy_stream; 42 stream_->SetDelegate(this); 43 response_headers_received_ = true; 44} 45 46SpdyHttpStream::~SpdyHttpStream() { 47 if (stream_) 48 stream_->DetachDelegate(); 49} 50 51int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info, 52 const BoundNetLog& stream_net_log, 53 CompletionCallback* callback) { 54 DCHECK(!stream_.get()); 55 if (spdy_session_->IsClosed()) 56 return ERR_CONNECTION_CLOSED; 57 58 request_info_ = request_info; 59 if (request_info_->method == "GET") { 60 int error = spdy_session_->GetPushStream(request_info_->url, &stream_, 61 stream_net_log); 62 if (error != OK) 63 return error; 64 } 65 66 if (stream_.get()) 67 return OK; 68 69 return spdy_session_->CreateStream(request_info_->url, 70 request_info_->priority, &stream_, 71 stream_net_log, callback); 72} 73 74const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const { 75 return response_info_; 76} 77 78uint64 SpdyHttpStream::GetUploadProgress() const { 79 if (!request_body_stream_.get()) 80 return 0; 81 82 return request_body_stream_->position(); 83} 84 85int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) { 86 CHECK(callback); 87 CHECK(!stream_->cancelled()); 88 89 if (stream_->closed()) 90 return stream_->response_status(); 91 92 // Check if we already have the response headers. If so, return synchronously. 93 if(stream_->response_received()) { 94 CHECK(stream_->is_idle()); 95 return OK; 96 } 97 98 // Still waiting for the response, return IO_PENDING. 99 CHECK(!user_callback_); 100 user_callback_ = callback; 101 return ERR_IO_PENDING; 102} 103 104int SpdyHttpStream::ReadResponseBody( 105 IOBuffer* buf, int buf_len, CompletionCallback* callback) { 106 CHECK(stream_->is_idle()); 107 CHECK(buf); 108 CHECK(buf_len); 109 CHECK(callback); 110 111 // If we have data buffered, complete the IO immediately. 112 if (!response_body_.empty()) { 113 int bytes_read = 0; 114 while (!response_body_.empty() && buf_len > 0) { 115 scoped_refptr<IOBufferWithSize> data = response_body_.front(); 116 const int bytes_to_copy = std::min(buf_len, data->size()); 117 memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); 118 buf_len -= bytes_to_copy; 119 if (bytes_to_copy == data->size()) { 120 response_body_.pop_front(); 121 } else { 122 const int bytes_remaining = data->size() - bytes_to_copy; 123 IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); 124 memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), 125 bytes_remaining); 126 response_body_.pop_front(); 127 response_body_.push_front(make_scoped_refptr(new_buffer)); 128 } 129 bytes_read += bytes_to_copy; 130 } 131 if (SpdySession::flow_control()) 132 stream_->IncreaseRecvWindowSize(bytes_read); 133 return bytes_read; 134 } else if (stream_->closed()) { 135 return stream_->response_status(); 136 } 137 138 CHECK(!user_callback_); 139 CHECK(!user_buffer_); 140 CHECK_EQ(0, user_buffer_len_); 141 142 user_callback_ = callback; 143 user_buffer_ = buf; 144 user_buffer_len_ = buf_len; 145 return ERR_IO_PENDING; 146} 147 148void SpdyHttpStream::Close(bool not_reusable) { 149 // Note: the not_reusable flag has no meaning for SPDY streams. 150 151 Cancel(); 152} 153 154HttpStream* SpdyHttpStream::RenewStreamForAuth() { 155 return NULL; 156} 157 158bool SpdyHttpStream::IsResponseBodyComplete() const { 159 if (!stream_) 160 return false; 161 return stream_->closed(); 162} 163 164bool SpdyHttpStream::CanFindEndOfResponse() const { 165 return true; 166} 167 168bool SpdyHttpStream::IsMoreDataBuffered() const { 169 return false; 170} 171 172bool SpdyHttpStream::IsConnectionReused() const { 173 return spdy_session_->IsReused(); 174} 175 176void SpdyHttpStream::SetConnectionReused() { 177 // SPDY doesn't need an indicator here. 178} 179 180bool SpdyHttpStream::IsConnectionReusable() const { 181 // SPDY streams aren't considered reusable. 182 return false; 183} 184 185void SpdyHttpStream::set_chunk_callback(ChunkCallback* callback) { 186 if (request_body_stream_ != NULL) 187 request_body_stream_->set_chunk_callback(callback); 188} 189 190int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers, 191 UploadDataStream* request_body, 192 HttpResponseInfo* response, 193 CompletionCallback* callback) { 194 base::Time request_time = base::Time::Now(); 195 CHECK(stream_.get()); 196 197 stream_->SetDelegate(this); 198 199 linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); 200 CreateSpdyHeadersFromHttpRequest(*request_info_, request_headers, 201 headers.get(), direct_); 202 stream_->set_spdy_headers(headers); 203 204 stream_->SetRequestTime(request_time); 205 // This should only get called in the case of a request occurring 206 // during server push that has already begun but hasn't finished, 207 // so we set the response's request time to be the actual one 208 if (response_info_) 209 response_info_->request_time = request_time; 210 211 CHECK(!request_body_stream_.get()); 212 if (request_body) { 213 if (request_body->size() || request_body->is_chunked()) 214 request_body_stream_.reset(request_body); 215 else 216 delete request_body; 217 } 218 219 CHECK(callback); 220 CHECK(!stream_->cancelled()); 221 CHECK(response); 222 223 if (!stream_->pushed() && stream_->closed()) { 224 if (stream_->response_status() == OK) 225 return ERR_FAILED; 226 else 227 return stream_->response_status(); 228 } 229 230 // SendRequest can be called in two cases. 231 // 232 // a) A client initiated request. In this case, |response_info_| should be 233 // NULL to start with. 234 // b) A client request which matches a response that the server has already 235 // pushed. 236 if (push_response_info_.get()) { 237 *response = *(push_response_info_.get()); 238 push_response_info_.reset(); 239 } 240 else 241 DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_); 242 243 response_info_ = response; 244 245 // Put the peer's IP address and port into the response. 246 AddressList address; 247 int result = stream_->GetPeerAddress(&address); 248 if (result != OK) 249 return result; 250 response_info_->socket_address = HostPortPair::FromAddrInfo(address.head()); 251 252 bool has_upload_data = request_body_stream_.get() != NULL; 253 result = stream_->SendRequest(has_upload_data); 254 if (result == ERR_IO_PENDING) { 255 CHECK(!user_callback_); 256 user_callback_ = callback; 257 } 258 return result; 259} 260 261void SpdyHttpStream::Cancel() { 262 if (spdy_session_) 263 spdy_session_->CancelPendingCreateStreams(&stream_); 264 user_callback_ = NULL; 265 if (stream_) 266 stream_->Cancel(); 267} 268 269bool SpdyHttpStream::OnSendHeadersComplete(int status) { 270 if (user_callback_) 271 DoCallback(status); 272 return request_body_stream_.get() == NULL; 273} 274 275int SpdyHttpStream::OnSendBody() { 276 CHECK(request_body_stream_.get()); 277 278 int buf_len = static_cast<int>(request_body_stream_->buf_len()); 279 if (!buf_len) 280 return OK; 281 bool is_chunked = request_body_stream_->is_chunked(); 282 // TODO(satish): For non-chunked POST data, we set DATA_FLAG_FIN for all 283 // blocks of data written out. This is wrong if the POST data was larger than 284 // UploadDataStream::kBufSize as that is the largest buffer that 285 // UploadDataStream returns at a time and we'll be setting the FIN flag for 286 // each block of data written out. 287 bool eof = !is_chunked || request_body_stream_->IsOnLastChunk(); 288 return stream_->WriteStreamData( 289 request_body_stream_->buf(), buf_len, 290 eof ? spdy::DATA_FLAG_FIN : spdy::DATA_FLAG_NONE); 291} 292 293int SpdyHttpStream::OnSendBodyComplete(int status, bool* eof) { 294 CHECK(request_body_stream_.get()); 295 296 request_body_stream_->MarkConsumedAndFillBuffer(status); 297 *eof = request_body_stream_->eof(); 298 if (!*eof && 299 request_body_stream_->is_chunked() && 300 !request_body_stream_->buf_len()) 301 return ERR_IO_PENDING; 302 303 return OK; 304} 305 306int SpdyHttpStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response, 307 base::Time response_time, 308 int status) { 309 if (!response_info_) { 310 DCHECK(stream_->pushed()); 311 push_response_info_.reset(new HttpResponseInfo); 312 response_info_ = push_response_info_.get(); 313 } 314 315 // If the response is already received, these headers are too late. 316 if (response_headers_received_) { 317 LOG(WARNING) << "SpdyHttpStream headers received after response started."; 318 return OK; 319 } 320 321 // TODO(mbelshe): This is the time of all headers received, not just time 322 // to first byte. 323 response_info_->response_time = base::Time::Now(); 324 325 if (!SpdyHeadersToHttpResponse(response, response_info_)) { 326 // We might not have complete headers yet. 327 return ERR_INCOMPLETE_SPDY_HEADERS; 328 } 329 330 response_headers_received_ = true; 331 // Don't store the SSLInfo in the response here, HttpNetworkTransaction 332 // will take care of that part. 333 SSLInfo ssl_info; 334 stream_->GetSSLInfo(&ssl_info, 335 &response_info_->was_npn_negotiated); 336 response_info_->request_time = stream_->GetRequestTime(); 337 response_info_->vary_data.Init(*request_info_, *response_info_->headers); 338 // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control 339 // frame has been received and processed. Move to framer? 340 response_info_->response_time = response_time; 341 342 if (user_callback_) 343 DoCallback(status); 344 return status; 345} 346 347void SpdyHttpStream::OnDataReceived(const char* data, int length) { 348 // SpdyStream won't call us with data if the header block didn't contain a 349 // valid set of headers. So we don't expect to not have headers received 350 // here. 351 DCHECK(response_headers_received_); 352 353 // Note that data may be received for a SpdyStream prior to the user calling 354 // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often 355 // happen for server initiated streams. 356 DCHECK(!stream_->closed() || stream_->pushed()); 357 if (length > 0) { 358 // Save the received data. 359 IOBufferWithSize* io_buffer = new IOBufferWithSize(length); 360 memcpy(io_buffer->data(), data, length); 361 response_body_.push_back(make_scoped_refptr(io_buffer)); 362 363 if (user_buffer_) { 364 // Handing small chunks of data to the caller creates measurable overhead. 365 // We buffer data in short time-spans and send a single read notification. 366 ScheduleBufferedReadCallback(); 367 } 368 } 369} 370 371void SpdyHttpStream::OnDataSent(int length) { 372 // For HTTP streams, no data is sent from the client while in the OPEN state, 373 // so it is never called. 374 NOTREACHED(); 375} 376 377void SpdyHttpStream::OnClose(int status) { 378 bool invoked_callback = false; 379 if (status == net::OK) { 380 // We need to complete any pending buffered read now. 381 invoked_callback = DoBufferedReadCallback(); 382 } 383 if (!invoked_callback && user_callback_) 384 DoCallback(status); 385} 386 387void SpdyHttpStream::ScheduleBufferedReadCallback() { 388 // If there is already a scheduled DoBufferedReadCallback, don't issue 389 // another one. Mark that we have received more data and return. 390 if (buffered_read_callback_pending_) { 391 more_read_data_pending_ = true; 392 return; 393 } 394 395 more_read_data_pending_ = false; 396 buffered_read_callback_pending_ = true; 397 const int kBufferTimeMs = 1; 398 MessageLoop::current()->PostDelayedTask(FROM_HERE, read_callback_factory_. 399 NewRunnableMethod(&SpdyHttpStream::DoBufferedReadCallback), 400 kBufferTimeMs); 401} 402 403// Checks to see if we should wait for more buffered data before notifying 404// the caller. Returns true if we should wait, false otherwise. 405bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { 406 // If the response is complete, there is no point in waiting. 407 if (stream_->closed()) 408 return false; 409 410 int bytes_buffered = 0; 411 std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it; 412 for (it = response_body_.begin(); 413 it != response_body_.end() && bytes_buffered < user_buffer_len_; 414 ++it) 415 bytes_buffered += (*it)->size(); 416 417 return bytes_buffered < user_buffer_len_; 418} 419 420bool SpdyHttpStream::DoBufferedReadCallback() { 421 read_callback_factory_.RevokeAll(); 422 buffered_read_callback_pending_ = false; 423 424 // If the transaction is cancelled or errored out, we don't need to complete 425 // the read. 426 if (!stream_ || stream_->response_status() != OK || stream_->cancelled()) 427 return false; 428 429 // When more_read_data_pending_ is true, it means that more data has 430 // arrived since we started waiting. Wait a little longer and continue 431 // to buffer. 432 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { 433 ScheduleBufferedReadCallback(); 434 return false; 435 } 436 437 int rv = 0; 438 if (user_buffer_) { 439 rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); 440 CHECK_NE(rv, ERR_IO_PENDING); 441 user_buffer_ = NULL; 442 user_buffer_len_ = 0; 443 DoCallback(rv); 444 return true; 445 } 446 return false; 447} 448 449void SpdyHttpStream::DoCallback(int rv) { 450 CHECK_NE(rv, ERR_IO_PENDING); 451 CHECK(user_callback_); 452 453 // Since Run may result in being called back, clear user_callback_ in advance. 454 CompletionCallback* c = user_callback_; 455 user_callback_ = NULL; 456 c->Run(rv); 457} 458 459void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) { 460 DCHECK(stream_); 461 bool using_npn; 462 stream_->GetSSLInfo(ssl_info, &using_npn); 463} 464 465void SpdyHttpStream::GetSSLCertRequestInfo( 466 SSLCertRequestInfo* cert_request_info) { 467 DCHECK(stream_); 468 stream_->GetSSLCertRequestInfo(cert_request_info); 469} 470 471bool SpdyHttpStream::IsSpdyHttpStream() const { 472 return true; 473} 474 475} // namespace net 476