1// Copyright (c) 2009 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <algorithm>
6#include <limits>
7
8#include "net/websockets/websocket.h"
9
10#include "base/message_loop.h"
11#include "net/base/host_resolver.h"
12#include "net/websockets/websocket_handshake.h"
13#include "net/websockets/websocket_handshake_draft75.h"
14
15namespace net {
16
17static const char kClosingFrame[2] = {'\xff', '\x00'};
18static int64 kClosingHandshakeTimeout = 1000;  // msec.
19
20WebSocket::WebSocket(Request* request, WebSocketDelegate* delegate)
21    : ready_state_(INITIALIZED),
22      request_(request),
23      handshake_(NULL),
24      delegate_(delegate),
25      origin_loop_(MessageLoop::current()),
26      socket_stream_(NULL),
27      max_pending_send_allowed_(0),
28      current_read_buf_(NULL),
29      read_consumed_len_(0),
30      current_write_buf_(NULL),
31      server_closing_handshake_(false),
32      client_closing_handshake_(false),
33      closing_handshake_started_(false),
34      force_close_task_(NULL),
35      closing_handshake_timeout_(kClosingHandshakeTimeout) {
36  DCHECK(request_.get());
37  DCHECK(delegate_);
38  DCHECK(origin_loop_);
39}
40
41WebSocket::~WebSocket() {
42  DCHECK(ready_state_ == INITIALIZED || !delegate_);
43  DCHECK(!socket_stream_);
44  DCHECK(!delegate_);
45}
46
47void WebSocket::Connect() {
48  DCHECK(ready_state_ == INITIALIZED);
49  DCHECK(request_.get());
50  DCHECK(delegate_);
51  DCHECK(!socket_stream_);
52  DCHECK(MessageLoop::current() == origin_loop_);
53
54  socket_stream_ = new SocketStream(request_->url(), this);
55  socket_stream_->set_context(request_->context());
56
57  if (request_->host_resolver())
58    socket_stream_->SetHostResolver(request_->host_resolver());
59  if (request_->client_socket_factory())
60    socket_stream_->SetClientSocketFactory(request_->client_socket_factory());
61
62  AddRef();  // Release in DoClose().
63  ready_state_ = CONNECTING;
64  socket_stream_->Connect();
65}
66
67void WebSocket::Send(const std::string& msg) {
68  if (ready_state_ == CLOSING || ready_state_ == CLOSED) {
69    return;
70  }
71  if (client_closing_handshake_) {
72    // We must not send any data after we start the WebSocket closing handshake.
73    return;
74  }
75  DCHECK(ready_state_ == OPEN);
76  DCHECK(MessageLoop::current() == origin_loop_);
77
78  IOBufferWithSize* buf = new IOBufferWithSize(msg.size() + 2);
79  char* p = buf->data();
80  *p = '\0';
81  memcpy(p + 1, msg.data(), msg.size());
82  *(p + 1 + msg.size()) = '\xff';
83  pending_write_bufs_.push_back(make_scoped_refptr(buf));
84  SendPending();
85}
86
87void WebSocket::Close() {
88  DCHECK(MessageLoop::current() == origin_loop_);
89
90  // If connection has not yet started, do nothing.
91  if (ready_state_ == INITIALIZED) {
92    DCHECK(!socket_stream_);
93    ready_state_ = CLOSED;
94    return;
95  }
96
97  // If the readyState attribute is in the CLOSING or CLOSED state, do nothing
98  if (ready_state_ == CLOSING || ready_state_ == CLOSED)
99    return;
100
101  if (request_->version() == DRAFT75) {
102    DCHECK(socket_stream_);
103    socket_stream_->Close();
104    return;
105  }
106
107  // If the WebSocket connection is not yet established, fail the WebSocket
108  // connection and set the readyState attribute's value to CLOSING.
109  if (ready_state_ == CONNECTING) {
110    ready_state_ = CLOSING;
111    origin_loop_->PostTask(
112        FROM_HERE,
113        NewRunnableMethod(this, &WebSocket::FailConnection));
114  }
115
116  // If the WebSocket closing handshake has not yet been started, start
117  // the WebSocket closing handshake and set the readyState attribute's value
118  // to CLOSING.
119  if (!closing_handshake_started_) {
120    ready_state_ = CLOSING;
121    origin_loop_->PostTask(
122        FROM_HERE,
123        NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
124  }
125
126  // Otherwise, set the readyState attribute's value to CLOSING.
127  ready_state_ = CLOSING;
128}
129
130void WebSocket::DetachDelegate() {
131  if (!delegate_)
132    return;
133  delegate_ = NULL;
134  if (ready_state_ == INITIALIZED) {
135    DCHECK(!socket_stream_);
136    ready_state_ = CLOSED;
137    return;
138  }
139  if (ready_state_ != CLOSED) {
140    DCHECK(socket_stream_);
141    socket_stream_->Close();
142  }
143}
144
145void WebSocket::OnConnected(SocketStream* socket_stream,
146                            int max_pending_send_allowed) {
147  DCHECK(socket_stream == socket_stream_);
148  max_pending_send_allowed_ = max_pending_send_allowed;
149
150  // Use |max_pending_send_allowed| as hint for initial size of read buffer.
151  current_read_buf_ = new GrowableIOBuffer();
152  current_read_buf_->SetCapacity(max_pending_send_allowed_);
153  read_consumed_len_ = 0;
154
155  DCHECK(!current_write_buf_);
156  DCHECK(!handshake_.get());
157  switch (request_->version()) {
158    case DEFAULT_VERSION:
159      handshake_.reset(new WebSocketHandshake(
160          request_->url(), request_->origin(), request_->location(),
161          request_->protocol()));
162      break;
163    case DRAFT75:
164      handshake_.reset(new WebSocketHandshakeDraft75(
165          request_->url(), request_->origin(), request_->location(),
166          request_->protocol()));
167      break;
168    default:
169      NOTREACHED() << "Unexpected protocol version:" << request_->version();
170  }
171
172  const std::string msg = handshake_->CreateClientHandshakeMessage();
173  IOBufferWithSize* buf = new IOBufferWithSize(msg.size());
174  memcpy(buf->data(), msg.data(), msg.size());
175  pending_write_bufs_.push_back(make_scoped_refptr(buf));
176  origin_loop_->PostTask(FROM_HERE,
177                         NewRunnableMethod(this, &WebSocket::SendPending));
178}
179
180void WebSocket::OnSentData(SocketStream* socket_stream, int amount_sent) {
181  DCHECK(socket_stream == socket_stream_);
182  DCHECK(current_write_buf_);
183  current_write_buf_->DidConsume(amount_sent);
184  DCHECK_GE(current_write_buf_->BytesRemaining(), 0);
185  if (current_write_buf_->BytesRemaining() == 0) {
186    current_write_buf_ = NULL;
187    pending_write_bufs_.pop_front();
188  }
189  origin_loop_->PostTask(FROM_HERE,
190                         NewRunnableMethod(this, &WebSocket::SendPending));
191}
192
193void WebSocket::OnReceivedData(SocketStream* socket_stream,
194                               const char* data, int len) {
195  DCHECK(socket_stream == socket_stream_);
196  AddToReadBuffer(data, len);
197  origin_loop_->PostTask(FROM_HERE,
198                         NewRunnableMethod(this, &WebSocket::DoReceivedData));
199}
200
201void WebSocket::OnClose(SocketStream* socket_stream) {
202  origin_loop_->PostTask(FROM_HERE,
203                         NewRunnableMethod(this, &WebSocket::DoClose));
204}
205
206void WebSocket::OnError(const SocketStream* socket_stream, int error) {
207  origin_loop_->PostTask(
208      FROM_HERE, NewRunnableMethod(this, &WebSocket::DoSocketError, error));
209}
210
211void WebSocket::SendPending() {
212  DCHECK(MessageLoop::current() == origin_loop_);
213  if (!socket_stream_) {
214    DCHECK_EQ(CLOSED, ready_state_);
215    return;
216  }
217  if (!current_write_buf_) {
218    if (pending_write_bufs_.empty()) {
219      if (client_closing_handshake_) {
220        // Already sent 0xFF and 0x00 bytes.
221        // *The WebSocket closing handshake has started.*
222        closing_handshake_started_ = true;
223        if (server_closing_handshake_) {
224          // 4.2 3-8-3 If the WebSocket connection is not already closed,
225          // then close the WebSocket connection.
226          // *The WebSocket closing handshake has finished*
227          socket_stream_->Close();
228        } else {
229          // 5. Wait a user-agent-determined length of time, or until the
230          // WebSocket connection is closed.
231          force_close_task_ =
232              NewRunnableMethod(this, &WebSocket::DoForceCloseConnection);
233          origin_loop_->PostDelayedTask(
234              FROM_HERE, force_close_task_, closing_handshake_timeout_);
235        }
236      }
237      return;
238    }
239    current_write_buf_ = new DrainableIOBuffer(
240        pending_write_bufs_.front(), pending_write_bufs_.front()->size());
241  }
242  DCHECK_GT(current_write_buf_->BytesRemaining(), 0);
243  bool sent = socket_stream_->SendData(
244      current_write_buf_->data(),
245      std::min(current_write_buf_->BytesRemaining(),
246               max_pending_send_allowed_));
247  DCHECK(sent);
248}
249
250void WebSocket::DoReceivedData() {
251  DCHECK(MessageLoop::current() == origin_loop_);
252  scoped_refptr<WebSocket> protect(this);
253  switch (ready_state_) {
254    case CONNECTING:
255      {
256        DCHECK(handshake_.get());
257        DCHECK(current_read_buf_);
258        const char* data =
259            current_read_buf_->StartOfBuffer() + read_consumed_len_;
260        size_t len = current_read_buf_->offset() - read_consumed_len_;
261        int eoh = handshake_->ReadServerHandshake(data, len);
262        if (eoh < 0) {
263          // Not enough data,  Retry when more data is available.
264          return;
265        }
266        SkipReadBuffer(eoh);
267      }
268      if (handshake_->mode() != WebSocketHandshake::MODE_CONNECTED) {
269        // Handshake failed.
270        socket_stream_->Close();
271        return;
272      }
273      ready_state_ = OPEN;
274      if (delegate_)
275        delegate_->OnOpen(this);
276      if (current_read_buf_->offset() == read_consumed_len_) {
277        // No remaining data after handshake message.
278        break;
279      }
280      // FALL THROUGH
281    case OPEN:
282    case CLOSING:  // need to process closing-frame from server.
283      ProcessFrameData();
284      break;
285
286    case CLOSED:
287      // Closed just after DoReceivedData is queued on |origin_loop_|.
288      break;
289    default:
290      NOTREACHED();
291      break;
292  }
293}
294
295void WebSocket::ProcessFrameData() {
296  DCHECK(current_read_buf_);
297  if (server_closing_handshake_) {
298    // Any data on the connection after the 0xFF frame is discarded.
299    return;
300  }
301  scoped_refptr<WebSocket> protect(this);
302  const char* start_frame =
303      current_read_buf_->StartOfBuffer() + read_consumed_len_;
304  const char* next_frame = start_frame;
305  const char* p = next_frame;
306  const char* end =
307      current_read_buf_->StartOfBuffer() + current_read_buf_->offset();
308  while (p < end) {
309    // Let /error/ be false.
310    bool error = false;
311
312    // Handle the /frame type/ byte as follows.
313    unsigned char frame_byte = static_cast<unsigned char>(*p++);
314    if ((frame_byte & 0x80) == 0x80) {
315      int length = 0;
316      while (p < end) {
317        if (length > std::numeric_limits<int>::max() / 128) {
318          // frame length overflow.
319          socket_stream_->Close();
320          return;
321        }
322        unsigned char c = static_cast<unsigned char>(*p);
323        length = length * 128 + (c & 0x7f);
324        ++p;
325        if ((c & 0x80) != 0x80)
326          break;
327      }
328      // Checks if the frame body hasn't been completely received yet.
329      // It also checks the case the frame length bytes haven't been completely
330      // received yet, because p == end and length > 0 in such case.
331      if (p + length < end) {
332        p += length;
333        next_frame = p;
334        if (request_->version() != DRAFT75 &&
335            frame_byte == 0xFF && length == 0) {
336          // 4.2 Data framing 3. Handle the /frame type/ byte.
337          // 8. If the /frame type/ is 0xFF and the /length/ was 0, then
338          // run the following substeps:
339          // 1. If the WebSocket closing handshake has not yet started, then
340          // start the WebSocket closing handshake.
341          server_closing_handshake_ = true;
342          if (!closing_handshake_started_) {
343            origin_loop_->PostTask(
344                FROM_HERE,
345                NewRunnableMethod(this, &WebSocket::StartClosingHandshake));
346          } else {
347            // If the WebSocket closing handshake has been started and
348            // the WebSocket connection is not already closed, then close
349            // the WebSocket connection.
350            socket_stream_->Close();
351          }
352          return;
353        }
354        // 4.2 3-8 Otherwise, let /error/ be true.
355        error = true;
356      } else {
357        // Not enough data in buffer.
358        break;
359      }
360    } else {
361      const char* msg_start = p;
362      while (p < end && *p != '\xff')
363        ++p;
364      if (p < end && *p == '\xff') {
365        if (frame_byte == 0x00) {
366          if (delegate_) {
367            delegate_->OnMessage(this, std::string(msg_start, p - msg_start));
368          }
369        } else {
370          // Otherwise, discard the data and let /error/ to be true.
371          error = true;
372        }
373        ++p;
374        next_frame = p;
375      }
376    }
377    // If /error/ is true, then *a WebSocket error has been detected.*
378    if (error && delegate_)
379      delegate_->OnError(this);
380  }
381  SkipReadBuffer(next_frame - start_frame);
382}
383
384void WebSocket::AddToReadBuffer(const char* data, int len) {
385  DCHECK(current_read_buf_);
386  // Check if |current_read_buf_| has enough space to store |len| of |data|.
387  if (len >= current_read_buf_->RemainingCapacity()) {
388    current_read_buf_->SetCapacity(
389        current_read_buf_->offset() + len);
390  }
391
392  DCHECK(current_read_buf_->RemainingCapacity() >= len);
393  memcpy(current_read_buf_->data(), data, len);
394  current_read_buf_->set_offset(current_read_buf_->offset() + len);
395}
396
397void WebSocket::SkipReadBuffer(int len) {
398  if (len == 0)
399    return;
400  DCHECK_GT(len, 0);
401  read_consumed_len_ += len;
402  int remaining = current_read_buf_->offset() - read_consumed_len_;
403  DCHECK_GE(remaining, 0);
404  if (remaining < read_consumed_len_ &&
405      current_read_buf_->RemainingCapacity() < read_consumed_len_) {
406    // Pre compaction:
407    // 0             v-read_consumed_len_  v-offset               v- capacity
408    // |..processed..| .. remaining ..     | .. RemainingCapacity |
409    //
410    memmove(current_read_buf_->StartOfBuffer(),
411            current_read_buf_->StartOfBuffer() + read_consumed_len_,
412            remaining);
413    read_consumed_len_ = 0;
414    current_read_buf_->set_offset(remaining);
415    // Post compaction:
416    // 0read_consumed_len_  v- offset                             v- capacity
417    // |.. remaining ..     | ..  RemainingCapacity  ...          |
418    //
419  }
420}
421
422void WebSocket::StartClosingHandshake() {
423  // 4.2 *start the WebSocket closing handshake*.
424  if (closing_handshake_started_ || client_closing_handshake_) {
425    // 1. If the WebSocket closing handshake has started, then abort these
426    // steps.
427    return;
428  }
429  // 2.,3. Send a 0xFF and 0x00 byte to the server.
430  client_closing_handshake_ = true;
431  IOBufferWithSize* buf = new IOBufferWithSize(2);
432  memcpy(buf->data(), kClosingFrame, 2);
433  pending_write_bufs_.push_back(make_scoped_refptr(buf));
434  SendPending();
435}
436
437void WebSocket::DoForceCloseConnection() {
438  // 4.2 *start the WebSocket closing handshake*
439  // 6. If the WebSocket connection is not already closed, then close the
440  // WebSocket connection.  (If this happens, then the closing handshake
441  // doesn't finish.)
442  DCHECK(MessageLoop::current() == origin_loop_);
443  force_close_task_ = NULL;
444  FailConnection();
445}
446
447void WebSocket::FailConnection() {
448  DCHECK(MessageLoop::current() == origin_loop_);
449  // 6.1 Client-initiated closure.
450  // *fail the WebSocket connection*.
451  // the user agent must close the WebSocket connection, and may report the
452  // problem to the user.
453  if (!socket_stream_)
454    return;
455  socket_stream_->Close();
456}
457
458void WebSocket::DoClose() {
459  DCHECK(MessageLoop::current() == origin_loop_);
460  if (force_close_task_) {
461    // WebSocket connection is closed while waiting a user-agent-determined
462    // length of time after *The WebSocket closing handshake has started*.
463    force_close_task_->Cancel();
464    force_close_task_ = NULL;
465  }
466  WebSocketDelegate* delegate = delegate_;
467  delegate_ = NULL;
468  ready_state_ = CLOSED;
469  if (!socket_stream_)
470    return;
471  socket_stream_ = NULL;
472  if (delegate)
473    delegate->OnClose(this,
474                      server_closing_handshake_ && closing_handshake_started_);
475  Release();
476}
477
478void WebSocket::DoSocketError(int error) {
479  DCHECK(MessageLoop::current() == origin_loop_);
480  if (delegate_)
481    delegate_->OnSocketError(this, error);
482}
483
484}  // namespace net
485