1// Copyright (c) 2009 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <algorithm> 6#include <limits> 7 8#include "net/websockets/websocket.h" 9 10#include "base/message_loop.h" 11#include "net/base/host_resolver.h" 12#include "net/websockets/websocket_handshake.h" 13#include "net/websockets/websocket_handshake_draft75.h" 14 15namespace net { 16 17static const char kClosingFrame[2] = {'\xff', '\x00'}; 18static int64 kClosingHandshakeTimeout = 1000; // msec. 19 20WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate) 21 : ready_state_(INITIALIZED), 22 request_(request), 23 handshake_(NULL), 24 delegate_(delegate), 25 origin_loop_(MessageLoop::current()), 26 socket_stream_(NULL), 27 max_pending_send_allowed_(0), 28 current_read_buf_(NULL), 29 read_consumed_len_(0), 30 current_write_buf_(NULL), 31 server_closing_handshake_(false), 32 client_closing_handshake_(false), 33 closing_handshake_started_(false), 34 force_close_task_(NULL), 35 closing_handshake_timeout_(kClosingHandshakeTimeout) { 36 DCHECK(request_.get()); 37 DCHECK(delegate_); 38 DCHECK(origin_loop_); 39} 40 41WebSocket::~WebSocket() { 42 DCHECK(ready_state_ == INITIALIZED || !delegate_); 43 DCHECK(!socket_stream_); 44 DCHECK(!delegate_); 45} 46 47void WebSocket::Connect() { 48 DCHECK(ready_state_ == INITIALIZED); 49 DCHECK(request_.get()); 50 DCHECK(delegate_); 51 DCHECK(!socket_stream_); 52 DCHECK(MessageLoop::current() == origin_loop_); 53 54 socket_stream_ = new SocketStream(request_->url(), this); 55 socket_stream_->set_context(request_->context()); 56 57 if (request_->host_resolver()) 58 socket_stream_->SetHostResolver(request_->host_resolver()); 59 if (request_->client_socket_factory()) 60 socket_stream_->SetClientSocketFactory(request_->client_socket_factory()); 61 62 AddRef(); // Release in DoClose(). 63 ready_state_ = CONNECTING; 64 socket_stream_->Connect(); 65} 66 67void WebSocket::Send(const std::string& msg) { 68 if (ready_state_ == CLOSING || ready_state_ == CLOSED) { 69 return; 70 } 71 if (client_closing_handshake_) { 72 // We must not send any data after we start the WebSocket closing handshake. 73 return; 74 } 75 DCHECK(ready_state_ == OPEN); 76 DCHECK(MessageLoop::current() == origin_loop_); 77 78 IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2); 79 char* p = buf->data(); 80 *p = '\0'; 81 memcpy(p + 1, msg.data(), msg.size()); 82 *(p + 1 + msg.size()) = '\xff'; 83 pending_write_bufs_.push_back(make_scoped_refptr(buf)); 84 SendPending(); 85} 86 87void WebSocket::Close() { 88 DCHECK(MessageLoop::current() == origin_loop_); 89 90 // If connection has not yet started, do nothing. 91 if (ready_state_ == INITIALIZED) { 92 DCHECK(!socket_stream_); 93 ready_state_ = CLOSED; 94 return; 95 } 96 97 // If the readyState attribute is in the CLOSING or CLOSED state, do nothing 98 if (ready_state_ == CLOSING || ready_state_ == CLOSED) 99 return; 100 101 if (request_->version() == DRAFT75) { 102 DCHECK(socket_stream_); 103 socket_stream_->Close(); 104 return; 105 } 106 107 // If the WebSocket connection is not yet established, fail the WebSocket 108 // connection and set the readyState attribute's value to CLOSING. 109 if (ready_state_ == CONNECTING) { 110 ready_state_ = CLOSING; 111 origin_loop_->PostTask( 112 FROM_HERE, 113 NewRunnableMethod(this, &WebSocket::FailConnection)); 114 } 115 116 // If the WebSocket closing handshake has not yet been started, start 117 // the WebSocket closing handshake and set the readyState attribute's value 118 // to CLOSING. 119 if (!closing_handshake_started_) { 120 ready_state_ = CLOSING; 121 origin_loop_->PostTask( 122 FROM_HERE, 123 NewRunnableMethod(this, &WebSocket::StartClosingHandshake)); 124 } 125 126 // Otherwise, set the readyState attribute's value to CLOSING. 127 ready_state_ = CLOSING; 128} 129 130void WebSocket::DetachDelegate() { 131 if (!delegate_) 132 return; 133 delegate_ = NULL; 134 if (ready_state_ == INITIALIZED) { 135 DCHECK(!socket_stream_); 136 ready_state_ = CLOSED; 137 return; 138 } 139 if (ready_state_ != CLOSED) { 140 DCHECK(socket_stream_); 141 socket_stream_->Close(); 142 } 143} 144 145void WebSocket::OnConnected(SocketStream* socket_stream, 146 int max_pending_send_allowed) { 147 DCHECK(socket_stream == socket_stream_); 148 max_pending_send_allowed_ = max_pending_send_allowed; 149 150 // Use |max_pending_send_allowed| as hint for initial size of read buffer. 151 current_read_buf_ = new GrowableIOBuffer(); 152 current_read_buf_->SetCapacity(max_pending_send_allowed_); 153 read_consumed_len_ = 0; 154 155 DCHECK(!current_write_buf_); 156 DCHECK(!handshake_.get()); 157 switch (request_->version()) { 158 case DEFAULT_VERSION: 159 handshake_.reset(new WebSocketHandshake( 160 request_->url(), request_->origin(), request_->location(), 161 request_->protocol())); 162 break; 163 case DRAFT75: 164 handshake_.reset(new WebSocketHandshakeDraft75( 165 request_->url(), request_->origin(), request_->location(), 166 request_->protocol())); 167 break; 168 default: 169 NOTREACHED() << "Unexpected protocol version:" << request_->version(); 170 } 171 172 const std::string msg = handshake_->CreateClientHandshakeMessage(); 173 IOBufferWithSize* buf = new IOBufferWithSize(msg.size()); 174 memcpy(buf->data(), msg.data(), msg.size()); 175 pending_write_bufs_.push_back(make_scoped_refptr(buf)); 176 origin_loop_->PostTask(FROM_HERE, 177 NewRunnableMethod(this, &WebSocket::SendPending)); 178} 179 180void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) { 181 DCHECK(socket_stream == socket_stream_); 182 DCHECK(current_write_buf_); 183 current_write_buf_->DidConsume(amount_sent); 184 DCHECK_GE(current_write_buf_->BytesRemaining(), 0); 185 if (current_write_buf_->BytesRemaining() == 0) { 186 current_write_buf_ = NULL; 187 pending_write_bufs_.pop_front(); 188 } 189 origin_loop_->PostTask(FROM_HERE, 190 NewRunnableMethod(this, &WebSocket::SendPending)); 191} 192 193void WebSocket::OnReceivedData(SocketStream* socket_stream, 194 const char* data, int len) { 195 DCHECK(socket_stream == socket_stream_); 196 AddToReadBuffer(data, len); 197 origin_loop_->PostTask(FROM_HERE, 198 NewRunnableMethod(this, &WebSocket::DoReceivedData)); 199} 200 201void WebSocket::OnClose(SocketStream* socket_stream) { 202 origin_loop_->PostTask(FROM_HERE, 203 NewRunnableMethod(this, &WebSocket::DoClose)); 204} 205 206void WebSocket::OnError(const SocketStream* socket_stream, int error) { 207 origin_loop_->PostTask( 208 FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error)); 209} 210 211void WebSocket::SendPending() { 212 DCHECK(MessageLoop::current() == origin_loop_); 213 if (!socket_stream_) { 214 DCHECK_EQ(CLOSED, ready_state_); 215 return; 216 } 217 if (!current_write_buf_) { 218 if (pending_write_bufs_.empty()) { 219 if (client_closing_handshake_) { 220 // Already sent 0xFF and 0x00 bytes. 221 // *The WebSocket closing handshake has started.* 222 closing_handshake_started_ = true; 223 if (server_closing_handshake_) { 224 // 4.2 3-8-3 If the WebSocket connection is not already closed, 225 // then close the WebSocket connection. 226 // *The WebSocket closing handshake has finished* 227 socket_stream_->Close(); 228 } else { 229 // 5. Wait a user-agent-determined length of time, or until the 230 // WebSocket connection is closed. 231 force_close_task_ = 232 NewRunnableMethod(this, &WebSocket::DoForceCloseConnection); 233 origin_loop_->PostDelayedTask( 234 FROM_HERE, force_close_task_, closing_handshake_timeout_); 235 } 236 } 237 return; 238 } 239 current_write_buf_ = new DrainableIOBuffer( 240 pending_write_bufs_.front(), pending_write_bufs_.front()->size()); 241 } 242 DCHECK_GT(current_write_buf_->BytesRemaining(), 0); 243 bool sent = socket_stream_->SendData( 244 current_write_buf_->data(), 245 std::min(current_write_buf_->BytesRemaining(), 246 max_pending_send_allowed_)); 247 DCHECK(sent); 248} 249 250void WebSocket::DoReceivedData() { 251 DCHECK(MessageLoop::current() == origin_loop_); 252 scoped_refptr<WebSocket> protect(this); 253 switch (ready_state_) { 254 case CONNECTING: 255 { 256 DCHECK(handshake_.get()); 257 DCHECK(current_read_buf_); 258 const char* data = 259 current_read_buf_->StartOfBuffer() + read_consumed_len_; 260 size_t len = current_read_buf_->offset() - read_consumed_len_; 261 int eoh = handshake_->ReadServerHandshake(data, len); 262 if (eoh < 0) { 263 // Not enough data, Retry when more data is available. 264 return; 265 } 266 SkipReadBuffer(eoh); 267 } 268 if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) { 269 // Handshake failed. 270 socket_stream_->Close(); 271 return; 272 } 273 ready_state_ = OPEN; 274 if (delegate_) 275 delegate_->OnOpen(this); 276 if (current_read_buf_->offset() == read_consumed_len_) { 277 // No remaining data after handshake message. 278 break; 279 } 280 // FALL THROUGH 281 case OPEN: 282 case CLOSING: // need to process closing-frame from server. 283 ProcessFrameData(); 284 break; 285 286 case CLOSED: 287 // Closed just after DoReceivedData is queued on |origin_loop_|. 288 break; 289 default: 290 NOTREACHED(); 291 break; 292 } 293} 294 295void WebSocket::ProcessFrameData() { 296 DCHECK(current_read_buf_); 297 if (server_closing_handshake_) { 298 // Any data on the connection after the 0xFF frame is discarded. 299 return; 300 } 301 scoped_refptr<WebSocket> protect(this); 302 const char* start_frame = 303 current_read_buf_->StartOfBuffer() + read_consumed_len_; 304 const char* next_frame = start_frame; 305 const char* p = next_frame; 306 const char* end = 307 current_read_buf_->StartOfBuffer() + current_read_buf_->offset(); 308 while (p < end) { 309 // Let /error/ be false. 310 bool error = false; 311 312 // Handle the /frame type/ byte as follows. 313 unsigned char frame_byte = static_cast<unsigned char>(*p++); 314 if ((frame_byte & 0x80) == 0x80) { 315 int length = 0; 316 while (p < end) { 317 if (length > std::numeric_limits<int>::max() / 128) { 318 // frame length overflow. 319 socket_stream_->Close(); 320 return; 321 } 322 unsigned char c = static_cast<unsigned char>(*p); 323 length = length * 128 + (c & 0x7f); 324 ++p; 325 if ((c & 0x80) != 0x80) 326 break; 327 } 328 // Checks if the frame body hasn't been completely received yet. 329 // It also checks the case the frame length bytes haven't been completely 330 // received yet, because p == end and length > 0 in such case. 331 if (p + length < end) { 332 p += length; 333 next_frame = p; 334 if (request_->version() != DRAFT75 && 335 frame_byte == 0xFF && length == 0) { 336 // 4.2 Data framing 3. Handle the /frame type/ byte. 337 // 8. If the /frame type/ is 0xFF and the /length/ was 0, then 338 // run the following substeps: 339 // 1. If the WebSocket closing handshake has not yet started, then 340 // start the WebSocket closing handshake. 341 server_closing_handshake_ = true; 342 if (!closing_handshake_started_) { 343 origin_loop_->PostTask( 344 FROM_HERE, 345 NewRunnableMethod(this, &WebSocket::StartClosingHandshake)); 346 } else { 347 // If the WebSocket closing handshake has been started and 348 // the WebSocket connection is not already closed, then close 349 // the WebSocket connection. 350 socket_stream_->Close(); 351 } 352 return; 353 } 354 // 4.2 3-8 Otherwise, let /error/ be true. 355 error = true; 356 } else { 357 // Not enough data in buffer. 358 break; 359 } 360 } else { 361 const char* msg_start = p; 362 while (p < end && *p != '\xff') 363 ++p; 364 if (p < end && *p == '\xff') { 365 if (frame_byte == 0x00) { 366 if (delegate_) { 367 delegate_->OnMessage(this, std::string(msg_start, p - msg_start)); 368 } 369 } else { 370 // Otherwise, discard the data and let /error/ to be true. 371 error = true; 372 } 373 ++p; 374 next_frame = p; 375 } 376 } 377 // If /error/ is true, then *a WebSocket error has been detected.* 378 if (error && delegate_) 379 delegate_->OnError(this); 380 } 381 SkipReadBuffer(next_frame - start_frame); 382} 383 384void WebSocket::AddToReadBuffer(const char* data, int len) { 385 DCHECK(current_read_buf_); 386 // Check if |current_read_buf_| has enough space to store |len| of |data|. 387 if (len >= current_read_buf_->RemainingCapacity()) { 388 current_read_buf_->SetCapacity( 389 current_read_buf_->offset() + len); 390 } 391 392 DCHECK(current_read_buf_->RemainingCapacity() >= len); 393 memcpy(current_read_buf_->data(), data, len); 394 current_read_buf_->set_offset(current_read_buf_->offset() + len); 395} 396 397void WebSocket::SkipReadBuffer(int len) { 398 if (len == 0) 399 return; 400 DCHECK_GT(len, 0); 401 read_consumed_len_ += len; 402 int remaining = current_read_buf_->offset() - read_consumed_len_; 403 DCHECK_GE(remaining, 0); 404 if (remaining < read_consumed_len_ && 405 current_read_buf_->RemainingCapacity() < read_consumed_len_) { 406 // Pre compaction: 407 // 0 v-read_consumed_len_ v-offset v- capacity 408 // |..processed..| .. remaining .. | .. RemainingCapacity | 409 // 410 memmove(current_read_buf_->StartOfBuffer(), 411 current_read_buf_->StartOfBuffer() + read_consumed_len_, 412 remaining); 413 read_consumed_len_ = 0; 414 current_read_buf_->set_offset(remaining); 415 // Post compaction: 416 // 0read_consumed_len_ v- offset v- capacity 417 // |.. remaining .. | .. RemainingCapacity ... | 418 // 419 } 420} 421 422void WebSocket::StartClosingHandshake() { 423 // 4.2 *start the WebSocket closing handshake*. 424 if (closing_handshake_started_ || client_closing_handshake_) { 425 // 1. If the WebSocket closing handshake has started, then abort these 426 // steps. 427 return; 428 } 429 // 2.,3. Send a 0xFF and 0x00 byte to the server. 430 client_closing_handshake_ = true; 431 IOBufferWithSize* buf = new IOBufferWithSize(2); 432 memcpy(buf->data(), kClosingFrame, 2); 433 pending_write_bufs_.push_back(make_scoped_refptr(buf)); 434 SendPending(); 435} 436 437void WebSocket::DoForceCloseConnection() { 438 // 4.2 *start the WebSocket closing handshake* 439 // 6. If the WebSocket connection is not already closed, then close the 440 // WebSocket connection. (If this happens, then the closing handshake 441 // doesn't finish.) 442 DCHECK(MessageLoop::current() == origin_loop_); 443 force_close_task_ = NULL; 444 FailConnection(); 445} 446 447void WebSocket::FailConnection() { 448 DCHECK(MessageLoop::current() == origin_loop_); 449 // 6.1 Client-initiated closure. 450 // *fail the WebSocket connection*. 451 // the user agent must close the WebSocket connection, and may report the 452 // problem to the user. 453 if (!socket_stream_) 454 return; 455 socket_stream_->Close(); 456} 457 458void WebSocket::DoClose() { 459 DCHECK(MessageLoop::current() == origin_loop_); 460 if (force_close_task_) { 461 // WebSocket connection is closed while waiting a user-agent-determined 462 // length of time after *The WebSocket closing handshake has started*. 463 force_close_task_->Cancel(); 464 force_close_task_ = NULL; 465 } 466 WebSocketDelegate* delegate = delegate_; 467 delegate_ = NULL; 468 ready_state_ = CLOSED; 469 if (!socket_stream_) 470 return; 471 socket_stream_ = NULL; 472 if (delegate) 473 delegate->OnClose(this, 474 server_closing_handshake_ && closing_handshake_started_); 475 Release(); 476} 477 478void WebSocket::DoSocketError(int error) { 479 DCHECK(MessageLoop::current() == origin_loop_); 480 if (delegate_) 481 delegate_->OnSocketError(this, error); 482} 483 484} // namespace net 485