1// Copyright (c) 2009 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/flip/flip_session.h" 6 7#include "base/basictypes.h" 8#include "base/logging.h" 9#include "base/message_loop.h" 10#include "base/rand_util.h" 11#include "base/stats_counters.h" 12#include "base/stl_util-inl.h" 13#include "base/string_util.h" 14#include "net/base/connection_type_histograms.h" 15#include "net/base/load_flags.h" 16#include "net/base/load_log.h" 17#include "net/base/net_util.h" 18#include "net/flip/flip_frame_builder.h" 19#include "net/flip/flip_protocol.h" 20#include "net/flip/flip_stream.h" 21#include "net/http/http_network_session.h" 22#include "net/http/http_request_info.h" 23#include "net/http/http_response_headers.h" 24#include "net/http/http_response_info.h" 25#include "net/socket/client_socket.h" 26#include "net/socket/client_socket_factory.h" 27#include "net/socket/ssl_client_socket.h" 28#include "net/tools/dump_cache/url_to_filename_encoder.h" 29 30namespace { 31 32// Diagnostics function to dump the headers of a request. 33// TODO(mbelshe): Remove this function. 34void DumpFlipHeaders(const flip::FlipHeaderBlock& headers) { 35 // Because this function gets called on every request, 36 // take extra care to optimize it away if logging is turned off. 37 if (logging::LOG_INFO < logging::GetMinLogLevel()) 38 return; 39 40 flip::FlipHeaderBlock::const_iterator it = headers.begin(); 41 while (it != headers.end()) { 42 std::string val = (*it).second; 43 std::string::size_type pos = 0; 44 while ((pos = val.find('\0', pos)) != val.npos) 45 val[pos] = '\n'; 46 LOG(INFO) << (*it).first << "==" << val; 47 ++it; 48 } 49} 50 51} // namespace 52 53namespace net { 54 55namespace { 56 57#ifdef WIN32 58// We use an artificially small buffer size on windows because the async IO 59// system will artifiially delay IO completions when we use large buffers. 60const int kReadBufferSize = 2 * 1024; 61#else 62const int kReadBufferSize = 8 * 1024; 63#endif 64 65// Convert a FlipHeaderBlock into an HttpResponseInfo. 66// |headers| input parameter with the FlipHeaderBlock. 67// |info| output parameter for the HttpResponseInfo. 68// Returns true if successfully converted. False if there was a failure 69// or if the FlipHeaderBlock was invalid. 70bool FlipHeadersToHttpResponse(const flip::FlipHeaderBlock& headers, 71 HttpResponseInfo* response) { 72 std::string version; 73 std::string status; 74 75 // The "status" and "version" headers are required. 76 flip::FlipHeaderBlock::const_iterator it; 77 it = headers.find("status"); 78 if (it == headers.end()) { 79 LOG(ERROR) << "FlipHeaderBlock without status header."; 80 return false; 81 } 82 status = it->second; 83 84 // Grab the version. If not provided by the server, 85 it = headers.find("version"); 86 if (it == headers.end()) { 87 LOG(ERROR) << "FlipHeaderBlock without version header."; 88 return false; 89 } 90 version = it->second; 91 92 std::string raw_headers(version); 93 raw_headers.push_back(' '); 94 raw_headers.append(status); 95 raw_headers.push_back('\0'); 96 for (it = headers.begin(); it != headers.end(); ++it) { 97 // For each value, if the server sends a NUL-separated 98 // list of values, we separate that back out into 99 // individual headers for each value in the list. 100 // e.g. 101 // Set-Cookie "foo\0bar" 102 // becomes 103 // Set-Cookie: foo\0 104 // Set-Cookie: bar\0 105 std::string value = it->second; 106 size_t start = 0; 107 size_t end = 0; 108 do { 109 end = value.find('\0', start); 110 std::string tval; 111 if (end != value.npos) 112 tval = value.substr(start, (end - start)); 113 else 114 tval = value.substr(start); 115 raw_headers.append(it->first); 116 raw_headers.push_back(':'); 117 raw_headers.append(tval); 118 raw_headers.push_back('\0'); 119 start = end + 1; 120 } while (end != value.npos); 121 } 122 123 response->headers = new HttpResponseHeaders(raw_headers); 124 response->was_fetched_via_spdy = true; 125 return true; 126} 127 128// Create a FlipHeaderBlock for a Flip SYN_STREAM Frame from 129// a HttpRequestInfo block. 130void CreateFlipHeadersFromHttpRequest( 131 const HttpRequestInfo& info, flip::FlipHeaderBlock* headers) { 132 static const char kHttpProtocolVersion[] = "HTTP/1.1"; 133 134 HttpUtil::HeadersIterator it(info.extra_headers.begin(), 135 info.extra_headers.end(), 136 "\r\n"); 137 while (it.GetNext()) { 138 std::string name = StringToLowerASCII(it.name()); 139 if (headers->find(name) == headers->end()) { 140 (*headers)[name] = it.values(); 141 } else { 142 std::string new_value = (*headers)[name]; 143 new_value += "\0"; 144 new_value += it.values(); 145 (*headers)[name] = new_value; 146 } 147 } 148 149 // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc) 150 // TODO(mbelshe): Add authentication headers here. 151 152 (*headers)["method"] = info.method; 153 (*headers)["url"] = info.url.spec(); 154 (*headers)["version"] = kHttpProtocolVersion; 155 if (info.user_agent.length()) 156 (*headers)["user-agent"] = info.user_agent; 157 if (!info.referrer.is_empty()) 158 (*headers)["referer"] = info.referrer.spec(); 159 160 // Honor load flags that impact proxy caches. 161 if (info.load_flags & LOAD_BYPASS_CACHE) { 162 (*headers)["pragma"] = "no-cache"; 163 (*headers)["cache-control"] = "no-cache"; 164 } else if (info.load_flags & LOAD_VALIDATE_CACHE) { 165 (*headers)["cache-control"] = "max-age=0"; 166 } 167} 168 169void AdjustSocketBufferSizes(ClientSocket* socket) { 170 // Adjust socket buffer sizes. 171 // FLIP uses one socket, and we want a really big buffer. 172 // This greatly helps on links with packet loss - we can even 173 // outperform Vista's dynamic window sizing algorithm. 174 // TODO(mbelshe): more study. 175 const int kSocketBufferSize = 512 * 1024; 176 socket->SetReceiveBufferSize(kSocketBufferSize); 177 socket->SetSendBufferSize(kSocketBufferSize); 178} 179 180} // namespace 181 182// static 183bool FlipSession::use_ssl_ = true; 184 185FlipSession::FlipSession(const std::string& host, HttpNetworkSession* session) 186 : ALLOW_THIS_IN_INITIALIZER_LIST( 187 connect_callback_(this, &FlipSession::OnTCPConnect)), 188 ALLOW_THIS_IN_INITIALIZER_LIST( 189 ssl_connect_callback_(this, &FlipSession::OnSSLConnect)), 190 ALLOW_THIS_IN_INITIALIZER_LIST( 191 read_callback_(this, &FlipSession::OnReadComplete)), 192 ALLOW_THIS_IN_INITIALIZER_LIST( 193 write_callback_(this, &FlipSession::OnWriteComplete)), 194 domain_(host), 195 session_(session), 196 connection_(new ClientSocketHandle), 197 read_buffer_(new IOBuffer(kReadBufferSize)), 198 read_pending_(false), 199 stream_hi_water_mark_(1), // Always start at 1 for the first stream id. 200 write_pending_(false), 201 delayed_write_pending_(false), 202 is_secure_(false), 203 error_(OK), 204 state_(IDLE), 205 streams_initiated_count_(0), 206 streams_pushed_count_(0), 207 streams_pushed_and_claimed_count_(0), 208 streams_abandoned_count_(0) { 209 // TODO(mbelshe): consider randomization of the stream_hi_water_mark. 210 211 flip_framer_.set_visitor(this); 212 213 session_->ssl_config_service()->GetSSLConfig(&ssl_config_); 214 215 // TODO(agl): This is a temporary hack for testing reasons. In the medium 216 // term we'll want to use NPN for all HTTPS connections and use the protocol 217 // suggested. 218 // 219 // In the event that the server supports Next Protocol Negotiation, but 220 // doesn't support either of these protocols, we'll request the first 221 // protocol in the list. Because of that, HTTP is listed first because it's 222 // what we'll actually fallback to in the case that the server doesn't 223 // support SPDY. 224 ssl_config_.next_protos = "\007http1.1\004spdy"; 225} 226 227FlipSession::~FlipSession() { 228 // Cleanup all the streams. 229 CloseAllStreams(net::ERR_ABORTED); 230 231 if (connection_->is_initialized()) { 232 // With Flip we can't recycle sockets. 233 connection_->socket()->Disconnect(); 234 } 235 236 // TODO(willchan): Don't hardcode port 80 here. 237 DCHECK(!session_->flip_session_pool()->HasSession( 238 HostResolver::RequestInfo(domain_, 80))); 239 240 // Record per-session histograms here. 241 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession", 242 streams_initiated_count_, 243 0, 300, 50); 244 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession", 245 streams_pushed_count_, 246 0, 300, 50); 247 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession", 248 streams_pushed_and_claimed_count_, 249 0, 300, 50); 250 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession", 251 streams_abandoned_count_, 252 0, 300, 50); 253} 254 255void FlipSession::InitializeWithSocket(ClientSocketHandle* connection) { 256 static StatsCounter flip_sessions("flip.sessions"); 257 flip_sessions.Increment(); 258 259 AdjustSocketBufferSizes(connection->socket()); 260 261 state_ = CONNECTED; 262 connection_.reset(connection); 263 264 // This is a newly initialized session that no client should have a handle to 265 // yet, so there's no need to start writing data as in OnTCPConnect(), but we 266 // should start reading data. 267 ReadSocket(); 268} 269 270net::Error FlipSession::Connect(const std::string& group_name, 271 const HostResolver::RequestInfo& host, 272 RequestPriority priority, 273 LoadLog* load_log) { 274 DCHECK(priority >= FLIP_PRIORITY_HIGHEST && priority <= FLIP_PRIORITY_LOWEST); 275 276 // If the connect process is started, let the caller continue. 277 if (state_ > IDLE) 278 return net::OK; 279 280 state_ = CONNECTING; 281 282 static StatsCounter flip_sessions("flip.sessions"); 283 flip_sessions.Increment(); 284 285 int rv = connection_->Init(group_name, host, priority, &connect_callback_, 286 session_->tcp_socket_pool(), load_log); 287 DCHECK(rv <= 0); 288 289 // If the connect is pending, we still return ok. The APIs enqueue 290 // work until after the connect completes asynchronously later. 291 if (rv == net::ERR_IO_PENDING) 292 return net::OK; 293 return static_cast<net::Error>(rv); 294} 295 296scoped_refptr<FlipStream> FlipSession::GetOrCreateStream( 297 const HttpRequestInfo& request, 298 const UploadDataStream* upload_data, 299 LoadLog* log) { 300 const GURL& url = request.url; 301 const std::string& path = url.PathForRequest(); 302 303 scoped_refptr<FlipStream> stream; 304 305 // Check if we have a push stream for this path. 306 if (request.method == "GET") { 307 stream = GetPushStream(path); 308 if (stream) { 309 DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); 310 streams_pushed_and_claimed_count_++; 311 return stream; 312 } 313 } 314 315 // Check if we have a pending push stream for this url. 316 PendingStreamMap::iterator it; 317 it = pending_streams_.find(path); 318 if (it != pending_streams_.end()) { 319 DCHECK(!it->second); 320 // Server will assign a stream id when the push stream arrives. Use 0 for 321 // now. 322 LoadLog::AddEvent(log, LoadLog::TYPE_FLIP_STREAM_ADOPTED_PUSH_STREAM); 323 FlipStream* stream = new FlipStream(this, 0, true, log); 324 stream->set_path(path); 325 it->second = stream; 326 return it->second; 327 } 328 329 const flip::FlipStreamId stream_id = GetNewStreamId(); 330 331 // If we still don't have a stream, activate one now. 332 stream = new FlipStream(this, stream_id, false, log); 333 stream->set_priority(request.priority); 334 stream->set_path(path); 335 ActivateStream(stream); 336 337 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", 338 static_cast<int>(request.priority), 0, 10, 11); 339 340 LOG(INFO) << "FlipStream: Creating stream " << stream_id << " for " << url; 341 342 // TODO(mbelshe): Optimize memory allocations 343 DCHECK(request.priority >= FLIP_PRIORITY_HIGHEST && 344 request.priority <= FLIP_PRIORITY_LOWEST); 345 346 // Convert from HttpRequestHeaders to Flip Headers. 347 flip::FlipHeaderBlock headers; 348 CreateFlipHeadersFromHttpRequest(request, &headers); 349 350 flip::FlipControlFlags flags = flip::CONTROL_FLAG_NONE; 351 if (!request.upload_data || !upload_data->size()) 352 flags = flip::CONTROL_FLAG_FIN; 353 354 // Create a SYN_STREAM packet and add to the output queue. 355 scoped_ptr<flip::FlipSynStreamControlFrame> syn_frame( 356 flip_framer_.CreateSynStream(stream_id, request.priority, flags, false, 357 &headers)); 358 int length = flip::FlipFrame::size() + syn_frame->length(); 359 IOBuffer* buffer = new IOBuffer(length); 360 memcpy(buffer->data(), syn_frame->data(), length); 361 queue_.push(FlipIOBuffer(buffer, length, request.priority, stream)); 362 363 static StatsCounter flip_requests("flip.requests"); 364 flip_requests.Increment(); 365 366 LOG(INFO) << "FETCHING: " << request.url.spec(); 367 streams_initiated_count_++; 368 369 LOG(INFO) << "FLIP SYN_STREAM HEADERS ----------------------------------"; 370 DumpFlipHeaders(headers); 371 372 // Schedule to write to the socket after we've made it back 373 // to the message loop so that we can aggregate multiple 374 // requests. 375 // TODO(mbelshe): Should we do the "first" request immediately? 376 // maybe we should only 'do later' for subsequent 377 // requests. 378 WriteSocketLater(); 379 380 return stream; 381} 382 383int FlipSession::WriteStreamData(flip::FlipStreamId stream_id, 384 net::IOBuffer* data, int len) { 385 LOG(INFO) << "Writing Stream Data for stream " << stream_id << " (" << len 386 << " bytes)"; 387 const int kMss = 1430; // This is somewhat arbitrary and not really fixed, 388 // but it will always work reasonably with ethernet. 389 // Chop the world into 2-packet chunks. This is somewhat arbitrary, but 390 // is reasonably small and ensures that we elicit ACKs quickly from TCP 391 // (because TCP tries to only ACK every other packet). 392 const int kMaxFlipFrameChunkSize = (2 * kMss) - flip::FlipFrame::size(); 393 394 // Find our stream 395 DCHECK(IsStreamActive(stream_id)); 396 scoped_refptr<FlipStream> stream = active_streams_[stream_id]; 397 CHECK(stream->stream_id() == stream_id); 398 if (!stream) 399 return ERR_INVALID_FLIP_STREAM; 400 401 // TODO(mbelshe): Setting of the FIN is assuming that the caller will pass 402 // all data to write in a single chunk. Is this always true? 403 404 // Set the flags on the upload. 405 flip::FlipDataFlags flags = flip::DATA_FLAG_FIN; 406 if (len > kMaxFlipFrameChunkSize) { 407 len = kMaxFlipFrameChunkSize; 408 flags = flip::DATA_FLAG_NONE; 409 } 410 411 // TODO(mbelshe): reduce memory copies here. 412 scoped_ptr<flip::FlipDataFrame> frame( 413 flip_framer_.CreateDataFrame(stream_id, data->data(), len, flags)); 414 int length = flip::FlipFrame::size() + frame->length(); 415 IOBufferWithSize* buffer = new IOBufferWithSize(length); 416 memcpy(buffer->data(), frame->data(), length); 417 queue_.push(FlipIOBuffer(buffer, length, stream->priority(), stream)); 418 419 // Whenever we queue onto the socket we need to ensure that we will write to 420 // it later. 421 WriteSocketLater(); 422 423 return ERR_IO_PENDING; 424} 425 426bool FlipSession::CancelStream(flip::FlipStreamId stream_id) { 427 LOG(INFO) << "Cancelling stream " << stream_id; 428 if (!IsStreamActive(stream_id)) 429 return false; 430 431 // TODO(mbelshe): We should send a FIN_STREAM control frame here 432 // so that the server can cancel a large send. 433 434 // TODO(mbelshe): Write a method for tearing down a stream 435 // that cleans it out of the active list, the pending list, 436 // etc. 437 scoped_refptr<FlipStream> stream = active_streams_[stream_id]; 438 DeactivateStream(stream_id); 439 return true; 440} 441 442bool FlipSession::IsStreamActive(flip::FlipStreamId stream_id) const { 443 return ContainsKey(active_streams_, stream_id); 444} 445 446LoadState FlipSession::GetLoadState() const { 447 // NOTE: The application only queries the LoadState via the 448 // FlipNetworkTransaction, and details are only needed when 449 // we're in the process of connecting. 450 451 // If we're connecting, defer to the connection to give us the actual 452 // LoadState. 453 if (state_ == CONNECTING) 454 return connection_->GetLoadState(); 455 456 // Just report that we're idle since the session could be doing 457 // many things concurrently. 458 return LOAD_STATE_IDLE; 459} 460 461void FlipSession::OnTCPConnect(int result) { 462 LOG(INFO) << "Flip socket connected (result=" << result << ")"; 463 464 // We shouldn't be coming through this path if we didn't just open a fresh 465 // socket (or have an error trying to do so). 466 DCHECK(!connection_->socket() || !connection_->is_reused()); 467 468 UpdateConnectionTypeHistograms(CONNECTION_SPDY, result >= 0); 469 470 if (result != net::OK) { 471 DCHECK_LT(result, 0); 472 CloseSessionOnError(static_cast<net::Error>(result)); 473 return; 474 } 475 476 AdjustSocketBufferSizes(connection_->socket()); 477 478 if (use_ssl_) { 479 // Add a SSL socket on top of our existing transport socket. 480 ClientSocket* socket = connection_->release_socket(); 481 // TODO(mbelshe): Fix the hostname. This is BROKEN without having 482 // a real hostname. 483 socket = session_->socket_factory()->CreateSSLClientSocket( 484 socket, "" /* request_->url.HostNoBrackets() */ , ssl_config_); 485 connection_->set_socket(socket); 486 is_secure_ = true; 487 // TODO(willchan): Plumb LoadLog into FLIP code. 488 int status = connection_->socket()->Connect(&ssl_connect_callback_, NULL); 489 if (status != ERR_IO_PENDING) 490 OnSSLConnect(status); 491 } else { 492 DCHECK_EQ(state_, CONNECTING); 493 state_ = CONNECTED; 494 495 // Make sure we get any pending data sent. 496 WriteSocketLater(); 497 // Start reading 498 ReadSocket(); 499 } 500} 501 502void FlipSession::OnSSLConnect(int result) { 503 // TODO(mbelshe): We need to replicate the functionality of 504 // HttpNetworkTransaction::DoSSLConnectComplete here, where it calls 505 // HandleCertificateError() and such. 506 if (IsCertificateError(result)) 507 result = OK; // TODO(mbelshe): pretend we're happy anyway. 508 509 if (result == OK) { 510 DCHECK_EQ(state_, CONNECTING); 511 state_ = CONNECTED; 512 513 // After we've connected, send any data to the server, and then issue 514 // our read. 515 WriteSocketLater(); 516 ReadSocket(); 517 } else { 518 DCHECK_LT(result, 0); // It should be an error, not a byte count. 519 CloseSessionOnError(static_cast<net::Error>(result)); 520 } 521} 522 523void FlipSession::OnReadComplete(int bytes_read) { 524 // Parse a frame. For now this code requires that the frame fit into our 525 // buffer (32KB). 526 // TODO(mbelshe): support arbitrarily large frames! 527 528 LOG(INFO) << "Flip socket read: " << bytes_read << " bytes"; 529 530 read_pending_ = false; 531 532 if (bytes_read <= 0) { 533 // Session is tearing down. 534 net::Error error = static_cast<net::Error>(bytes_read); 535 if (error == OK) 536 error = ERR_CONNECTION_CLOSED; 537 CloseSessionOnError(error); 538 return; 539 } 540 541 // The FlipFramer will use callbacks onto |this| as it parses frames. 542 // When errors occur, those callbacks can lead to teardown of all references 543 // to |this|, so maintain a reference to self during this call for safe 544 // cleanup. 545 scoped_refptr<FlipSession> self(this); 546 547 char *data = read_buffer_->data(); 548 while (bytes_read && 549 flip_framer_.error_code() == flip::FlipFramer::FLIP_NO_ERROR) { 550 uint32 bytes_processed = flip_framer_.ProcessInput(data, bytes_read); 551 bytes_read -= bytes_processed; 552 data += bytes_processed; 553 if (flip_framer_.state() == flip::FlipFramer::FLIP_DONE) 554 flip_framer_.Reset(); 555 } 556 557 if (state_ != CLOSED) 558 ReadSocket(); 559} 560 561void FlipSession::OnWriteComplete(int result) { 562 DCHECK(write_pending_); 563 DCHECK(in_flight_write_.size()); 564 DCHECK(result != 0); // This shouldn't happen for write. 565 566 write_pending_ = false; 567 568 LOG(INFO) << "Flip write complete (result=" << result << ") for stream: " 569 << in_flight_write_.stream()->stream_id(); 570 571 if (result >= 0) { 572 // It should not be possible to have written more bytes than our 573 // in_flight_write_. 574 DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); 575 576 in_flight_write_.buffer()->DidConsume(result); 577 578 // We only notify the stream when we've fully written the pending frame. 579 if (!in_flight_write_.buffer()->BytesRemaining()) { 580 scoped_refptr<FlipStream> stream = in_flight_write_.stream(); 581 DCHECK(stream.get()); 582 583 // Report the number of bytes written to the caller, but exclude the 584 // frame size overhead. NOTE: if this frame was compressed the reported 585 // bytes written is the compressed size, not the original size. 586 if (result > 0) { 587 result = in_flight_write_.buffer()->size(); 588 DCHECK_GT(result, static_cast<int>(flip::FlipFrame::size())); 589 result -= static_cast<int>(flip::FlipFrame::size()); 590 } 591 592 // It is possible that the stream was cancelled while we were writing 593 // to the socket. 594 if (!stream->cancelled()) 595 stream->OnWriteComplete(result); 596 597 // Cleanup the write which just completed. 598 in_flight_write_.release(); 599 } 600 601 // Write more data. We're already in a continuation, so we can 602 // go ahead and write it immediately (without going back to the 603 // message loop). 604 WriteSocketLater(); 605 } else { 606 in_flight_write_.release(); 607 608 // The stream is now errored. Close it down. 609 CloseSessionOnError(static_cast<net::Error>(result)); 610 } 611} 612 613void FlipSession::ReadSocket() { 614 if (read_pending_) 615 return; 616 617 if (state_ == CLOSED) { 618 NOTREACHED(); 619 return; 620 } 621 622 CHECK(connection_.get()); 623 CHECK(connection_->socket()); 624 int bytes_read = connection_->socket()->Read(read_buffer_.get(), 625 kReadBufferSize, 626 &read_callback_); 627 switch (bytes_read) { 628 case 0: 629 // Socket is closed! 630 // TODO(mbelshe): Need to abort any active streams here. 631 DCHECK(!active_streams_.size()); 632 return; 633 case net::ERR_IO_PENDING: 634 // Waiting for data. Nothing to do now. 635 read_pending_ = true; 636 return; 637 default: 638 // Data was read, process it. 639 // Schedule the work through the message loop to avoid recursive 640 // callbacks. 641 read_pending_ = true; 642 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( 643 this, &FlipSession::OnReadComplete, bytes_read)); 644 break; 645 } 646} 647 648void FlipSession::WriteSocketLater() { 649 if (delayed_write_pending_) 650 return; 651 652 delayed_write_pending_ = true; 653 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( 654 this, &FlipSession::WriteSocket)); 655} 656 657void FlipSession::WriteSocket() { 658 // This function should only be called via WriteSocketLater. 659 DCHECK(delayed_write_pending_); 660 delayed_write_pending_ = false; 661 662 // If the socket isn't connected yet, just wait; we'll get called 663 // again when the socket connection completes. If the socket is 664 // closed, just return. 665 if (state_ < CONNECTED || state_ == CLOSED) 666 return; 667 668 if (write_pending_) // Another write is in progress still. 669 return; 670 671 // Loop sending frames until we've sent everything or until the write 672 // returns error (or ERR_IO_PENDING). 673 while (in_flight_write_.buffer() || queue_.size()) { 674 if (!in_flight_write_.buffer()) { 675 // Grab the next FlipFrame to send. 676 FlipIOBuffer next_buffer = queue_.top(); 677 queue_.pop(); 678 679 // We've deferred compression until just before we write it to the socket, 680 // which is now. At this time, we don't compress our data frames. 681 flip::FlipFrame uncompressed_frame(next_buffer.buffer()->data(), false); 682 size_t size; 683 if (uncompressed_frame.is_control_frame()) { 684 scoped_ptr<flip::FlipFrame> compressed_frame( 685 flip_framer_.CompressFrame(&uncompressed_frame)); 686 size = compressed_frame->length() + flip::FlipFrame::size(); 687 688 DCHECK(size > 0); 689 690 // TODO(mbelshe): We have too much copying of data here. 691 IOBufferWithSize* buffer = new IOBufferWithSize(size); 692 memcpy(buffer->data(), compressed_frame->data(), size); 693 694 // Attempt to send the frame. 695 in_flight_write_ = FlipIOBuffer(buffer, size, 0, next_buffer.stream()); 696 } else { 697 size = uncompressed_frame.length() + flip::FlipFrame::size(); 698 in_flight_write_ = next_buffer; 699 } 700 } else { 701 DCHECK(in_flight_write_.buffer()->BytesRemaining()); 702 } 703 704 write_pending_ = true; 705 int rv = connection_->socket()->Write(in_flight_write_.buffer(), 706 in_flight_write_.buffer()->BytesRemaining(), &write_callback_); 707 if (rv == net::ERR_IO_PENDING) 708 break; 709 710 // We sent the frame successfully. 711 OnWriteComplete(rv); 712 713 // TODO(mbelshe): Test this error case. Maybe we should mark the socket 714 // as in an error state. 715 if (rv < 0) 716 break; 717 } 718} 719 720void FlipSession::CloseAllStreams(net::Error code) { 721 LOG(INFO) << "Closing all FLIP Streams"; 722 723 static StatsCounter abandoned_streams("flip.abandoned_streams"); 724 static StatsCounter abandoned_push_streams("flip.abandoned_push_streams"); 725 726 if (active_streams_.size()) { 727 abandoned_streams.Add(active_streams_.size()); 728 729 // Create a copy of the list, since aborting streams can invalidate 730 // our list. 731 FlipStream** list = new FlipStream*[active_streams_.size()]; 732 ActiveStreamMap::const_iterator it; 733 int index = 0; 734 for (it = active_streams_.begin(); it != active_streams_.end(); ++it) 735 list[index++] = it->second; 736 737 // Issue the aborts. 738 for (--index; index >= 0; index--) { 739 LOG(ERROR) << "ABANDONED (stream_id=" << list[index]->stream_id() 740 << "): " << list[index]->path(); 741 list[index]->OnClose(code); 742 } 743 744 // Clear out anything pending. 745 active_streams_.clear(); 746 747 delete[] list; 748 } 749 750 if (pushed_streams_.size()) { 751 streams_abandoned_count_ += pushed_streams_.size(); 752 abandoned_push_streams.Add(pushed_streams_.size()); 753 pushed_streams_.clear(); 754 } 755} 756 757int FlipSession::GetNewStreamId() { 758 int id = stream_hi_water_mark_; 759 stream_hi_water_mark_ += 2; 760 if (stream_hi_water_mark_ > 0x7fff) 761 stream_hi_water_mark_ = 1; 762 return id; 763} 764 765void FlipSession::CloseSessionOnError(net::Error err) { 766 DCHECK_LT(err, OK); 767 LOG(INFO) << "Flip::CloseSessionOnError(" << err << ")"; 768 769 // Don't close twice. This can occur because we can have both 770 // a read and a write outstanding, and each can complete with 771 // an error. 772 if (state_ != CLOSED) { 773 state_ = CLOSED; 774 error_ = err; 775 CloseAllStreams(err); 776 session_->flip_session_pool()->Remove(this); 777 } 778} 779 780void FlipSession::ActivateStream(FlipStream* stream) { 781 const flip::FlipStreamId id = stream->stream_id(); 782 DCHECK(!IsStreamActive(id)); 783 784 active_streams_[id] = stream; 785} 786 787void FlipSession::DeactivateStream(flip::FlipStreamId id) { 788 DCHECK(IsStreamActive(id)); 789 790 // Verify it is not on the pushed_streams_ list. 791 ActiveStreamList::iterator it; 792 for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { 793 scoped_refptr<FlipStream> curr = *it; 794 if (id == curr->stream_id()) { 795 pushed_streams_.erase(it); 796 break; 797 } 798 } 799 800 active_streams_.erase(id); 801} 802 803scoped_refptr<FlipStream> FlipSession::GetPushStream(const std::string& path) { 804 static StatsCounter used_push_streams("flip.claimed_push_streams"); 805 806 LOG(INFO) << "Looking for push stream: " << path; 807 808 scoped_refptr<FlipStream> stream; 809 810 // We just walk a linear list here. 811 ActiveStreamList::iterator it; 812 for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { 813 stream = *it; 814 if (path == stream->path()) { 815 CHECK(stream->pushed()); 816 pushed_streams_.erase(it); 817 used_push_streams.Increment(); 818 LOG(INFO) << "Push Stream Claim for: " << path; 819 break; 820 } 821 } 822 823 return stream; 824} 825 826void FlipSession::GetSSLInfo(SSLInfo* ssl_info) { 827 if (is_secure_) { 828 SSLClientSocket* ssl_socket = 829 reinterpret_cast<SSLClientSocket*>(connection_->socket()); 830 ssl_socket->GetSSLInfo(ssl_info); 831 } 832} 833 834void FlipSession::OnError(flip::FlipFramer* framer) { 835 LOG(ERROR) << "FlipSession error: " << framer->error_code(); 836 CloseSessionOnError(net::ERR_FLIP_PROTOCOL_ERROR); 837} 838 839void FlipSession::OnStreamFrameData(flip::FlipStreamId stream_id, 840 const char* data, 841 size_t len) { 842 LOG(INFO) << "Flip data for stream " << stream_id << ", " << len << " bytes"; 843 bool valid_stream = IsStreamActive(stream_id); 844 if (!valid_stream) { 845 // NOTE: it may just be that the stream was cancelled. 846 LOG(WARNING) << "Received data frame for invalid stream " << stream_id; 847 return; 848 } 849 850 scoped_refptr<FlipStream> stream = active_streams_[stream_id]; 851 bool success = stream->OnDataReceived(data, len); 852 // |len| == 0 implies a closed stream. 853 if (!success || !len) 854 DeactivateStream(stream_id); 855} 856 857void FlipSession::OnSyn(const flip::FlipSynStreamControlFrame* frame, 858 const flip::FlipHeaderBlock* headers) { 859 flip::FlipStreamId stream_id = frame->stream_id(); 860 861 // Server-initiated streams should have even sequence numbers. 862 if ((stream_id & 0x1) != 0) { 863 LOG(ERROR) << "Received invalid OnSyn stream id " << stream_id; 864 return; 865 } 866 867 if (IsStreamActive(stream_id)) { 868 LOG(ERROR) << "Received OnSyn for active stream " << stream_id; 869 return; 870 } 871 872 streams_pushed_count_++; 873 874 LOG(INFO) << "FlipSession: Syn received for stream: " << stream_id; 875 876 LOG(INFO) << "FLIP SYN RESPONSE HEADERS -----------------------"; 877 DumpFlipHeaders(*headers); 878 879 // TODO(mbelshe): DCHECK that this is a GET method? 880 881 const std::string& path = ContainsKey(*headers, "path") ? 882 headers->find("path")->second : ""; 883 884 // Verify that the response had a URL for us. 885 DCHECK(!path.empty()); 886 if (path.empty()) { 887 LOG(WARNING) << "Pushed stream did not contain a path."; 888 return; 889 } 890 891 scoped_refptr<FlipStream> stream; 892 893 // Check if we already have a delegate awaiting this stream. 894 PendingStreamMap::iterator it; 895 it = pending_streams_.find(path); 896 if (it != pending_streams_.end()) { 897 stream = it->second; 898 pending_streams_.erase(it); 899 if (stream) 900 pushed_streams_.push_back(stream); 901 } else { 902 pushed_streams_.push_back(stream); 903 } 904 905 if (stream) { 906 CHECK(stream->pushed()); 907 CHECK(stream->stream_id() == 0); 908 stream->set_stream_id(stream_id); 909 } else { 910 // TODO(mbelshe): can we figure out how to use a LoadLog here? 911 stream = new FlipStream(this, stream_id, true, NULL); 912 } 913 914 // Activate a stream and parse the headers. 915 ActivateStream(stream); 916 917 stream->set_path(path); 918 919 // TODO(mbelshe): For now we convert from our nice hash map back 920 // to a string of headers; this is because the HttpResponseInfo 921 // is a bit rigid for its http (non-flip) design. 922 HttpResponseInfo response; 923 if (FlipHeadersToHttpResponse(*headers, &response)) { 924 GetSSLInfo(&response.ssl_info); 925 stream->OnResponseReceived(response); 926 } else { 927 stream->OnClose(ERR_INVALID_RESPONSE); 928 DeactivateStream(stream_id); 929 return; 930 } 931 932 LOG(INFO) << "Got pushed stream for " << stream->path(); 933 934 static StatsCounter push_requests("flip.pushed_streams"); 935 push_requests.Increment(); 936} 937 938void FlipSession::OnSynReply(const flip::FlipSynReplyControlFrame* frame, 939 const flip::FlipHeaderBlock* headers) { 940 DCHECK(headers); 941 flip::FlipStreamId stream_id = frame->stream_id(); 942 bool valid_stream = IsStreamActive(stream_id); 943 if (!valid_stream) { 944 // NOTE: it may just be that the stream was cancelled. 945 LOG(WARNING) << "Received SYN_REPLY for invalid stream " << stream_id; 946 return; 947 } 948 949 LOG(INFO) << "FLIP SYN_REPLY RESPONSE HEADERS for stream: " << stream_id; 950 DumpFlipHeaders(*headers); 951 952 // We record content declared as being pushed so that we don't 953 // request a duplicate stream which is already scheduled to be 954 // sent to us. 955 flip::FlipHeaderBlock::const_iterator it; 956 it = headers->find("X-Associated-Content"); 957 if (it != headers->end()) { 958 const std::string& content = it->second; 959 std::string::size_type start = 0; 960 std::string::size_type end = 0; 961 do { 962 end = content.find("||", start); 963 if (end == std::string::npos) 964 end = content.length(); 965 std::string url = content.substr(start, end - start); 966 std::string::size_type pos = url.find("??"); 967 if (pos == std::string::npos) 968 break; 969 url = url.substr(pos + 2); 970 GURL gurl(url); 971 std::string path = gurl.PathForRequest(); 972 if (path.length()) 973 pending_streams_[path] = NULL; 974 else 975 LOG(INFO) << "Invalid X-Associated-Content path: " << url; 976 start = end + 2; 977 } while (start < content.length()); 978 } 979 980 scoped_refptr<FlipStream> stream = active_streams_[stream_id]; 981 CHECK(stream->stream_id() == stream_id); 982 CHECK(!stream->cancelled()); 983 HttpResponseInfo response; 984 if (FlipHeadersToHttpResponse(*headers, &response)) { 985 GetSSLInfo(&response.ssl_info); 986 stream->OnResponseReceived(response); 987 } else { 988 stream->OnClose(ERR_INVALID_RESPONSE); 989 DeactivateStream(stream_id); 990 } 991} 992 993void FlipSession::OnControl(const flip::FlipControlFrame* frame) { 994 flip::FlipHeaderBlock headers; 995 uint32 type = frame->type(); 996 if (type == flip::SYN_STREAM || type == flip::SYN_REPLY) { 997 if (!flip_framer_.ParseHeaderBlock(frame, &headers)) { 998 LOG(WARNING) << "Could not parse Flip Control Frame Header"; 999 // TODO(mbelshe): Error the session? 1000 return; 1001 } 1002 } 1003 1004 switch (type) { 1005 case flip::SYN_STREAM: 1006 LOG(INFO) << "Flip SynStream for stream " << frame->stream_id(); 1007 OnSyn(reinterpret_cast<const flip::FlipSynStreamControlFrame*>(frame), 1008 &headers); 1009 break; 1010 case flip::SYN_REPLY: 1011 LOG(INFO) << "Flip SynReply for stream " << frame->stream_id(); 1012 OnSynReply( 1013 reinterpret_cast<const flip::FlipSynReplyControlFrame*>(frame), 1014 &headers); 1015 break; 1016 case flip::FIN_STREAM: 1017 LOG(INFO) << "Flip Fin for stream " << frame->stream_id(); 1018 OnFin(reinterpret_cast<const flip::FlipFinStreamControlFrame*>(frame)); 1019 break; 1020 default: 1021 DCHECK(false); // Error! 1022 } 1023} 1024 1025void FlipSession::OnFin(const flip::FlipFinStreamControlFrame* frame) { 1026 flip::FlipStreamId stream_id = frame->stream_id(); 1027 bool valid_stream = IsStreamActive(stream_id); 1028 if (!valid_stream) { 1029 // NOTE: it may just be that the stream was cancelled. 1030 LOG(WARNING) << "Received FIN for invalid stream" << stream_id; 1031 return; 1032 } 1033 scoped_refptr<FlipStream> stream = active_streams_[stream_id]; 1034 CHECK(stream->stream_id() == stream_id); 1035 CHECK(!stream->cancelled()); 1036 if (frame->status() == 0) { 1037 stream->OnDataReceived(NULL, 0); 1038 } else { 1039 LOG(ERROR) << "Flip stream closed: " << frame->status(); 1040 // TODO(mbelshe): Map from Flip-protocol errors to something sensical. 1041 // For now, it doesn't matter much - it is a protocol error. 1042 stream->OnClose(ERR_FAILED); 1043 } 1044 1045 DeactivateStream(stream_id); 1046} 1047 1048} // namespace net 1049