1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_channel.h"
6
7#include <limits.h>  // for INT_MAX
8
9#include <algorithm>
10#include <deque>
11
12#include "base/basictypes.h"  // for size_t
13#include "base/big_endian.h"
14#include "base/bind.h"
15#include "base/compiler_specific.h"
16#include "base/memory/ref_counted.h"
17#include "base/memory/weak_ptr.h"
18#include "base/message_loop/message_loop.h"
19#include "base/metrics/histogram.h"
20#include "base/numerics/safe_conversions.h"
21#include "base/stl_util.h"
22#include "base/strings/stringprintf.h"
23#include "base/time/time.h"
24#include "net/base/io_buffer.h"
25#include "net/base/net_log.h"
26#include "net/http/http_request_headers.h"
27#include "net/http/http_response_headers.h"
28#include "net/http/http_util.h"
29#include "net/websockets/websocket_errors.h"
30#include "net/websockets/websocket_event_interface.h"
31#include "net/websockets/websocket_frame.h"
32#include "net/websockets/websocket_handshake_request_info.h"
33#include "net/websockets/websocket_handshake_response_info.h"
34#include "net/websockets/websocket_mux.h"
35#include "net/websockets/websocket_stream.h"
36#include "url/origin.h"
37
38namespace net {
39
40namespace {
41
42using base::StreamingUtf8Validator;
43
44const int kDefaultSendQuotaLowWaterMark = 1 << 16;
45const int kDefaultSendQuotaHighWaterMark = 1 << 17;
46const size_t kWebSocketCloseCodeLength = 2;
47// This timeout is based on TCPMaximumSegmentLifetime * 2 from
48// MainThreadWebSocketChannel.cpp in Blink.
49const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
50
51typedef WebSocketEventInterface::ChannelState ChannelState;
52const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
53const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
54
55// Maximum close reason length = max control frame payload -
56//                               status code length
57//                             = 125 - 2
58const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
59
60// Check a close status code for strict compliance with RFC6455. This is only
61// used for close codes received from a renderer that we are intending to send
62// out over the network. See ParseClose() for the restrictions on incoming close
63// codes. The |code| parameter is type int for convenience of implementation;
64// the real type is uint16. Code 1005 is treated specially; it cannot be set
65// explicitly by Javascript but the renderer uses it to indicate we should send
66// a Close frame with no payload.
67bool IsStrictlyValidCloseStatusCode(int code) {
68  static const int kInvalidRanges[] = {
69      // [BAD, OK)
70      0,    1000,   // 1000 is the first valid code
71      1006, 1007,   // 1006 MUST NOT be set.
72      1014, 3000,   // 1014 unassigned; 1015 up to 2999 are reserved.
73      5000, 65536,  // Codes above 5000 are invalid.
74  };
75  const int* const kInvalidRangesEnd =
76      kInvalidRanges + arraysize(kInvalidRanges);
77
78  DCHECK_GE(code, 0);
79  DCHECK_LT(code, 65536);
80  const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
81  DCHECK_NE(kInvalidRangesEnd, upper);
82  DCHECK_GT(upper, kInvalidRanges);
83  DCHECK_GT(*upper, code);
84  DCHECK_LE(*(upper - 1), code);
85  return ((upper - kInvalidRanges) % 2) == 0;
86}
87
88// This function avoids a bunch of boilerplate code.
89void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
90
91// Sets |name| to the name of the frame type for the given |opcode|. Note that
92// for all of Text, Binary and Continuation opcode, this method returns
93// "Data frame".
94void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
95                           std::string* name) {
96  switch (opcode) {
97    case WebSocketFrameHeader::kOpCodeText:    // fall-thru
98    case WebSocketFrameHeader::kOpCodeBinary:  // fall-thru
99    case WebSocketFrameHeader::kOpCodeContinuation:
100      *name = "Data frame";
101      break;
102
103    case WebSocketFrameHeader::kOpCodePing:
104      *name = "Ping";
105      break;
106
107    case WebSocketFrameHeader::kOpCodePong:
108      *name = "Pong";
109      break;
110
111    case WebSocketFrameHeader::kOpCodeClose:
112      *name = "Close";
113      break;
114
115    default:
116      *name = "Unknown frame type";
117      break;
118  }
119
120  return;
121}
122
123}  // namespace
124
125// A class to encapsulate a set of frames and information about the size of
126// those frames.
127class WebSocketChannel::SendBuffer {
128 public:
129  SendBuffer() : total_bytes_(0) {}
130
131  // Add a WebSocketFrame to the buffer and increase total_bytes_.
132  void AddFrame(scoped_ptr<WebSocketFrame> chunk);
133
134  // Return a pointer to the frames_ for write purposes.
135  ScopedVector<WebSocketFrame>* frames() { return &frames_; }
136
137 private:
138  // The frames_ that will be sent in the next call to WriteFrames().
139  ScopedVector<WebSocketFrame> frames_;
140
141  // The total size of the payload data in |frames_|. This will be used to
142  // measure the throughput of the link.
143  // TODO(ricea): Measure the throughput of the link.
144  size_t total_bytes_;
145};
146
147void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
148  total_bytes_ += frame->header.payload_length;
149  frames_.push_back(frame.release());
150}
151
152// Implementation of WebSocketStream::ConnectDelegate that simply forwards the
153// calls on to the WebSocketChannel that created it.
154class WebSocketChannel::ConnectDelegate
155    : public WebSocketStream::ConnectDelegate {
156 public:
157  explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
158
159  virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
160    creator_->OnConnectSuccess(stream.Pass());
161    // |this| may have been deleted.
162  }
163
164  virtual void OnFailure(const std::string& message) OVERRIDE {
165    creator_->OnConnectFailure(message);
166    // |this| has been deleted.
167  }
168
169  virtual void OnStartOpeningHandshake(
170      scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
171    creator_->OnStartOpeningHandshake(request.Pass());
172  }
173
174  virtual void OnFinishOpeningHandshake(
175      scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
176    creator_->OnFinishOpeningHandshake(response.Pass());
177  }
178
179  virtual void OnSSLCertificateError(
180      scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
181          ssl_error_callbacks,
182      const SSLInfo& ssl_info,
183      bool fatal) OVERRIDE {
184    creator_->OnSSLCertificateError(
185        ssl_error_callbacks.Pass(), ssl_info, fatal);
186  }
187
188 private:
189  // A pointer to the WebSocketChannel that created this object. There is no
190  // danger of this pointer being stale, because deleting the WebSocketChannel
191  // cancels the connect process, deleting this object and preventing its
192  // callbacks from being called.
193  WebSocketChannel* const creator_;
194
195  DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
196};
197
198class WebSocketChannel::HandshakeNotificationSender
199    : public base::SupportsWeakPtr<HandshakeNotificationSender> {
200 public:
201  explicit HandshakeNotificationSender(WebSocketChannel* channel);
202  ~HandshakeNotificationSender();
203
204  static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
205
206  ChannelState SendImmediately(WebSocketEventInterface* event_interface);
207
208  const WebSocketHandshakeRequestInfo* handshake_request_info() const {
209    return handshake_request_info_.get();
210  }
211
212  void set_handshake_request_info(
213      scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
214    handshake_request_info_ = request_info.Pass();
215  }
216
217  const WebSocketHandshakeResponseInfo* handshake_response_info() const {
218    return handshake_response_info_.get();
219  }
220
221  void set_handshake_response_info(
222      scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
223    handshake_response_info_ = response_info.Pass();
224  }
225
226 private:
227  WebSocketChannel* owner_;
228  scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
229  scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
230};
231
232WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
233    WebSocketChannel* channel)
234    : owner_(channel) {}
235
236WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
237
238void WebSocketChannel::HandshakeNotificationSender::Send(
239    base::WeakPtr<HandshakeNotificationSender> sender) {
240  // Do nothing if |sender| is already destructed.
241  if (sender) {
242    WebSocketChannel* channel = sender->owner_;
243    AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
244  }
245}
246
247ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
248    WebSocketEventInterface* event_interface) {
249
250  if (handshake_request_info_.get()) {
251    if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
252                               handshake_request_info_.Pass()))
253      return CHANNEL_DELETED;
254  }
255
256  if (handshake_response_info_.get()) {
257    if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
258                               handshake_response_info_.Pass()))
259      return CHANNEL_DELETED;
260
261    // TODO(yhirano): We can release |this| to save memory because
262    // there will be no more opening handshake notification.
263  }
264
265  return CHANNEL_ALIVE;
266}
267
268WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
269    bool final,
270    WebSocketFrameHeader::OpCode opcode,
271    const scoped_refptr<IOBuffer>& data,
272    size_t offset,
273    size_t size)
274    : final_(final),
275      opcode_(opcode),
276      data_(data),
277      offset_(offset),
278      size_(size) {}
279
280WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
281
282void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
283  DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
284  opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
285}
286
287void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
288  DCHECK_LE(offset_, size_);
289  DCHECK_LE(bytes, size_ - offset_);
290  offset_ += bytes;
291}
292
293WebSocketChannel::WebSocketChannel(
294    scoped_ptr<WebSocketEventInterface> event_interface,
295    URLRequestContext* url_request_context)
296    : event_interface_(event_interface.Pass()),
297      url_request_context_(url_request_context),
298      send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
299      send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
300      current_send_quota_(0),
301      current_receive_quota_(0),
302      timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
303      received_close_code_(0),
304      state_(FRESHLY_CONSTRUCTED),
305      notification_sender_(new HandshakeNotificationSender(this)),
306      sending_text_message_(false),
307      receiving_text_message_(false),
308      expecting_to_handle_continuation_(false),
309      initial_frame_forwarded_(false) {}
310
311WebSocketChannel::~WebSocketChannel() {
312  // The stream may hold a pointer to read_frames_, and so it needs to be
313  // destroyed first.
314  stream_.reset();
315  // The timer may have a callback pointing back to us, so stop it just in case
316  // someone decides to run the event loop from their destructor.
317  timer_.Stop();
318}
319
320void WebSocketChannel::SendAddChannelRequest(
321    const GURL& socket_url,
322    const std::vector<std::string>& requested_subprotocols,
323    const url::Origin& origin) {
324  // Delegate to the tested version.
325  SendAddChannelRequestWithSuppliedCreator(
326      socket_url,
327      requested_subprotocols,
328      origin,
329      base::Bind(&WebSocketStream::CreateAndConnectStream));
330}
331
332void WebSocketChannel::SetState(State new_state) {
333  DCHECK_NE(state_, new_state);
334
335  if (new_state == CONNECTED)
336    established_on_ = base::TimeTicks::Now();
337  if (state_ == CONNECTED && !established_on_.is_null()) {
338    UMA_HISTOGRAM_LONG_TIMES(
339        "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
340  }
341
342  state_ = new_state;
343}
344
345bool WebSocketChannel::InClosingState() const {
346  // The state RECV_CLOSED is not supported here, because it is only used in one
347  // code path and should not leak into the code in general.
348  DCHECK_NE(RECV_CLOSED, state_)
349      << "InClosingState called with state_ == RECV_CLOSED";
350  return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
351}
352
353void WebSocketChannel::SendFrame(bool fin,
354                                 WebSocketFrameHeader::OpCode op_code,
355                                 const std::vector<char>& data) {
356  if (data.size() > INT_MAX) {
357    NOTREACHED() << "Frame size sanity check failed";
358    return;
359  }
360  if (stream_ == NULL) {
361    LOG(DFATAL) << "Got SendFrame without a connection established; "
362                << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
363                << " data.size()=" << data.size();
364    return;
365  }
366  if (InClosingState()) {
367    DVLOG(1) << "SendFrame called in state " << state_
368             << ". This may be a bug, or a harmless race.";
369    return;
370  }
371  if (state_ != CONNECTED) {
372    NOTREACHED() << "SendFrame() called in state " << state_;
373    return;
374  }
375  if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
376    // TODO(ricea): Kill renderer.
377    AllowUnused(
378        FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
379    // |this| has been deleted.
380    return;
381  }
382  if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
383    LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
384                << "; misbehaving renderer? fin=" << fin
385                << " data.size()=" << data.size();
386    return;
387  }
388  if (op_code == WebSocketFrameHeader::kOpCodeText ||
389      (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
390       sending_text_message_)) {
391    StreamingUtf8Validator::State state =
392        outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
393    if (state == StreamingUtf8Validator::INVALID ||
394        (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
395      // TODO(ricea): Kill renderer.
396      AllowUnused(
397          FailChannel("Browser sent a text frame containing invalid UTF-8",
398                      kWebSocketErrorGoingAway,
399                      ""));
400      // |this| has been deleted.
401      return;
402    }
403    sending_text_message_ = !fin;
404    DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
405  }
406  current_send_quota_ -= data.size();
407  // TODO(ricea): If current_send_quota_ has dropped below
408  // send_quota_low_water_mark_, it might be good to increase the "low
409  // water mark" and "high water mark", but only if the link to the WebSocket
410  // server is not saturated.
411  scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
412  std::copy(data.begin(), data.end(), buffer->data());
413  AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
414  // |this| may have been deleted.
415}
416
417void WebSocketChannel::SendFlowControl(int64 quota) {
418  DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
419         state_ == CLOSE_WAIT);
420  // TODO(ricea): Kill the renderer if it tries to send us a negative quota
421  // value or > INT_MAX.
422  DCHECK_GE(quota, 0);
423  DCHECK_LE(quota, INT_MAX);
424  if (!pending_received_frames_.empty()) {
425    DCHECK_EQ(0, current_receive_quota_);
426  }
427  while (!pending_received_frames_.empty() && quota > 0) {
428    PendingReceivedFrame& front = pending_received_frames_.front();
429    const size_t data_size = front.size() - front.offset();
430    const size_t bytes_to_send =
431        std::min(base::checked_cast<size_t>(quota), data_size);
432    const bool final = front.final() && data_size == bytes_to_send;
433    const char* data =
434        front.data().get() ? front.data()->data() + front.offset() : NULL;
435    DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
436    const std::vector<char> data_vector(data, data + bytes_to_send);
437    DVLOG(3) << "Sending frame previously split due to quota to the "
438             << "renderer: quota=" << quota << " data_size=" << data_size
439             << " bytes_to_send=" << bytes_to_send;
440    if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
441        CHANNEL_DELETED)
442      return;
443    if (bytes_to_send < data_size) {
444      front.DidConsume(bytes_to_send);
445      front.ResetOpcode();
446      return;
447    }
448    const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
449    DCHECK_GE(quota, signed_bytes_to_send);
450    quota -= signed_bytes_to_send;
451
452    pending_received_frames_.pop();
453  }
454  // If current_receive_quota_ == 0 then there is no pending ReadFrames()
455  // operation.
456  const bool start_read =
457      current_receive_quota_ == 0 && quota > 0 &&
458      (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
459  current_receive_quota_ += base::checked_cast<int>(quota);
460  if (start_read)
461    AllowUnused(ReadFrames());
462  // |this| may have been deleted.
463}
464
465void WebSocketChannel::StartClosingHandshake(uint16 code,
466                                             const std::string& reason) {
467  if (InClosingState()) {
468    // When the associated renderer process is killed while the channel is in
469    // CLOSING state we reach here.
470    DVLOG(1) << "StartClosingHandshake called in state " << state_
471             << ". This may be a bug, or a harmless race.";
472    return;
473  }
474  if (state_ == CONNECTING) {
475    // Abort the in-progress handshake and drop the connection immediately.
476    stream_request_.reset();
477    SetState(CLOSED);
478    AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
479    return;
480  }
481  if (state_ != CONNECTED) {
482    NOTREACHED() << "StartClosingHandshake() called in state " << state_;
483    return;
484  }
485  // Javascript actually only permits 1000 and 3000-4999, but the implementation
486  // itself may produce different codes. The length of |reason| is also checked
487  // by Javascript.
488  if (!IsStrictlyValidCloseStatusCode(code) ||
489      reason.size() > kMaximumCloseReasonLength) {
490    // "InternalServerError" is actually used for errors from any endpoint, per
491    // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
492    // reason it must be malfunctioning in some way, and based on that we
493    // interpret this as an internal error.
494    if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
495      DCHECK_EQ(CONNECTED, state_);
496      SetState(SEND_CLOSED);
497    }
498    return;
499  }
500  if (SendClose(
501          code,
502          StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
503      CHANNEL_DELETED)
504    return;
505  DCHECK_EQ(CONNECTED, state_);
506  SetState(SEND_CLOSED);
507}
508
509void WebSocketChannel::SendAddChannelRequestForTesting(
510    const GURL& socket_url,
511    const std::vector<std::string>& requested_subprotocols,
512    const url::Origin& origin,
513    const WebSocketStreamCreator& creator) {
514  SendAddChannelRequestWithSuppliedCreator(
515      socket_url, requested_subprotocols, origin, creator);
516}
517
518void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
519    base::TimeDelta delay) {
520  timeout_ = delay;
521}
522
523void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
524    const GURL& socket_url,
525    const std::vector<std::string>& requested_subprotocols,
526    const url::Origin& origin,
527    const WebSocketStreamCreator& creator) {
528  DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
529  if (!socket_url.SchemeIsWSOrWSS()) {
530    // TODO(ricea): Kill the renderer (this error should have been caught by
531    // Javascript).
532    AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
533    // |this| is deleted here.
534    return;
535  }
536  socket_url_ = socket_url;
537  scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
538      new ConnectDelegate(this));
539  stream_request_ = creator.Run(socket_url_,
540                                requested_subprotocols,
541                                origin,
542                                url_request_context_,
543                                BoundNetLog(),
544                                connect_delegate.Pass());
545  SetState(CONNECTING);
546}
547
548void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
549  DCHECK(stream);
550  DCHECK_EQ(CONNECTING, state_);
551
552  stream_ = stream.Pass();
553
554  SetState(CONNECTED);
555
556  if (event_interface_->OnAddChannelResponse(
557          false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
558      CHANNEL_DELETED)
559    return;
560
561  // TODO(ricea): Get flow control information from the WebSocketStream once we
562  // have a multiplexing WebSocketStream.
563  current_send_quota_ = send_quota_high_water_mark_;
564  if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
565      CHANNEL_DELETED)
566    return;
567
568  // |stream_request_| is not used once the connection has succeeded.
569  stream_request_.reset();
570
571  AllowUnused(ReadFrames());
572  // |this| may have been deleted.
573}
574
575void WebSocketChannel::OnConnectFailure(const std::string& message) {
576  DCHECK_EQ(CONNECTING, state_);
577
578  // Copy the message before we delete its owner.
579  std::string message_copy = message;
580
581  SetState(CLOSED);
582  stream_request_.reset();
583
584  if (CHANNEL_DELETED ==
585      notification_sender_->SendImmediately(event_interface_.get())) {
586    // |this| has been deleted.
587    return;
588  }
589  AllowUnused(event_interface_->OnFailChannel(message_copy));
590  // |this| has been deleted.
591}
592
593void WebSocketChannel::OnSSLCertificateError(
594    scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
595    const SSLInfo& ssl_info,
596    bool fatal) {
597  AllowUnused(event_interface_->OnSSLCertificateError(
598      ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
599}
600
601void WebSocketChannel::OnStartOpeningHandshake(
602    scoped_ptr<WebSocketHandshakeRequestInfo> request) {
603  DCHECK(!notification_sender_->handshake_request_info());
604
605  // Because it is hard to handle an IPC error synchronously is difficult,
606  // we asynchronously notify the information.
607  notification_sender_->set_handshake_request_info(request.Pass());
608  ScheduleOpeningHandshakeNotification();
609}
610
611void WebSocketChannel::OnFinishOpeningHandshake(
612    scoped_ptr<WebSocketHandshakeResponseInfo> response) {
613  DCHECK(!notification_sender_->handshake_response_info());
614
615  // Because it is hard to handle an IPC error synchronously is difficult,
616  // we asynchronously notify the information.
617  notification_sender_->set_handshake_response_info(response.Pass());
618  ScheduleOpeningHandshakeNotification();
619}
620
621void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
622  base::MessageLoop::current()->PostTask(
623      FROM_HERE,
624      base::Bind(HandshakeNotificationSender::Send,
625                 notification_sender_->AsWeakPtr()));
626}
627
628ChannelState WebSocketChannel::WriteFrames() {
629  int result = OK;
630  do {
631    // This use of base::Unretained is safe because this object owns the
632    // WebSocketStream and destroying it cancels all callbacks.
633    result = stream_->WriteFrames(
634        data_being_sent_->frames(),
635        base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
636                   base::Unretained(this),
637                   false));
638    if (result != ERR_IO_PENDING) {
639      if (OnWriteDone(true, result) == CHANNEL_DELETED)
640        return CHANNEL_DELETED;
641      // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
642      // guaranteed to be the same as before OnWriteDone() call.
643    }
644  } while (result == OK && data_being_sent_);
645  return CHANNEL_ALIVE;
646}
647
648ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
649  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
650  DCHECK_NE(CONNECTING, state_);
651  DCHECK_NE(ERR_IO_PENDING, result);
652  DCHECK(data_being_sent_);
653  switch (result) {
654    case OK:
655      if (data_to_send_next_) {
656        data_being_sent_ = data_to_send_next_.Pass();
657        if (!synchronous)
658          return WriteFrames();
659      } else {
660        data_being_sent_.reset();
661        if (current_send_quota_ < send_quota_low_water_mark_) {
662          // TODO(ricea): Increase low_water_mark and high_water_mark if
663          // throughput is high, reduce them if throughput is low.  Low water
664          // mark needs to be >= the bandwidth delay product *of the IPC
665          // channel*. Because factors like context-switch time, thread wake-up
666          // time, and bus speed come into play it is complex and probably needs
667          // to be determined empirically.
668          DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
669          // TODO(ricea): Truncate quota by the quota specified by the remote
670          // server, if the protocol in use supports quota.
671          int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
672          current_send_quota_ += fresh_quota;
673          return event_interface_->OnFlowControl(fresh_quota);
674        }
675      }
676      return CHANNEL_ALIVE;
677
678    // If a recoverable error condition existed, it would go here.
679
680    default:
681      DCHECK_LT(result, 0)
682          << "WriteFrames() should only return OK or ERR_ codes";
683
684      stream_->Close();
685      SetState(CLOSED);
686      return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
687  }
688}
689
690ChannelState WebSocketChannel::ReadFrames() {
691  int result = OK;
692  while (result == OK && current_receive_quota_ > 0) {
693    // This use of base::Unretained is safe because this object owns the
694    // WebSocketStream, and any pending reads will be cancelled when it is
695    // destroyed.
696    result = stream_->ReadFrames(
697        &read_frames_,
698        base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
699                   base::Unretained(this),
700                   false));
701    if (result != ERR_IO_PENDING) {
702      if (OnReadDone(true, result) == CHANNEL_DELETED)
703        return CHANNEL_DELETED;
704    }
705    DCHECK_NE(CLOSED, state_);
706  }
707  return CHANNEL_ALIVE;
708}
709
710ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
711  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
712  DCHECK_NE(CONNECTING, state_);
713  DCHECK_NE(ERR_IO_PENDING, result);
714  switch (result) {
715    case OK:
716      // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
717      // with no data read, not an empty response.
718      DCHECK(!read_frames_.empty())
719          << "ReadFrames() returned OK, but nothing was read.";
720      for (size_t i = 0; i < read_frames_.size(); ++i) {
721        scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
722        read_frames_[i] = NULL;
723        if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
724          return CHANNEL_DELETED;
725      }
726      read_frames_.clear();
727      // There should always be a call to ReadFrames pending.
728      // TODO(ricea): Unless we are out of quota.
729      DCHECK_NE(CLOSED, state_);
730      if (!synchronous)
731        return ReadFrames();
732      return CHANNEL_ALIVE;
733
734    case ERR_WS_PROTOCOL_ERROR:
735      // This could be kWebSocketErrorProtocolError (specifically, non-minimal
736      // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
737      // extension-specific error.
738      return FailChannel("Invalid frame header",
739                         kWebSocketErrorProtocolError,
740                         "WebSocket Protocol Error");
741
742    default:
743      DCHECK_LT(result, 0)
744          << "ReadFrames() should only return OK or ERR_ codes";
745
746      stream_->Close();
747      SetState(CLOSED);
748
749      uint16 code = kWebSocketErrorAbnormalClosure;
750      std::string reason = "";
751      bool was_clean = false;
752      if (received_close_code_ != 0) {
753        code = received_close_code_;
754        reason = received_close_reason_;
755        was_clean = (result == ERR_CONNECTION_CLOSED);
756      }
757
758      return DoDropChannel(was_clean, code, reason);
759  }
760}
761
762ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
763  if (frame->header.masked) {
764    // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
765    // masked frame."
766    return FailChannel(
767        "A server must not mask any frames that it sends to the "
768        "client.",
769        kWebSocketErrorProtocolError,
770        "Masked frame from server");
771  }
772  const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
773  DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
774         frame->header.final);
775  if (frame->header.reserved1 || frame->header.reserved2 ||
776      frame->header.reserved3) {
777    return FailChannel(base::StringPrintf(
778                           "One or more reserved bits are on: reserved1 = %d, "
779                           "reserved2 = %d, reserved3 = %d",
780                           static_cast<int>(frame->header.reserved1),
781                           static_cast<int>(frame->header.reserved2),
782                           static_cast<int>(frame->header.reserved3)),
783                       kWebSocketErrorProtocolError,
784                       "Invalid reserved bit");
785  }
786
787  // Respond to the frame appropriately to its type.
788  return HandleFrameByState(
789      opcode, frame->header.final, frame->data, frame->header.payload_length);
790}
791
792ChannelState WebSocketChannel::HandleFrameByState(
793    const WebSocketFrameHeader::OpCode opcode,
794    bool final,
795    const scoped_refptr<IOBuffer>& data_buffer,
796    size_t size) {
797  DCHECK_NE(RECV_CLOSED, state_)
798      << "HandleFrame() does not support being called re-entrantly from within "
799         "SendClose()";
800  DCHECK_NE(CLOSED, state_);
801  if (state_ == CLOSE_WAIT) {
802    std::string frame_name;
803    GetFrameTypeForOpcode(opcode, &frame_name);
804
805    // FailChannel() won't send another Close frame.
806    return FailChannel(
807        frame_name + " received after close", kWebSocketErrorProtocolError, "");
808  }
809  switch (opcode) {
810    case WebSocketFrameHeader::kOpCodeText:  // fall-thru
811    case WebSocketFrameHeader::kOpCodeBinary:
812    case WebSocketFrameHeader::kOpCodeContinuation:
813      return HandleDataFrame(opcode, final, data_buffer, size);
814
815    case WebSocketFrameHeader::kOpCodePing:
816      DVLOG(1) << "Got Ping of size " << size;
817      if (state_ == CONNECTED)
818        return SendFrameFromIOBuffer(
819            true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
820      DVLOG(3) << "Ignored ping in state " << state_;
821      return CHANNEL_ALIVE;
822
823    case WebSocketFrameHeader::kOpCodePong:
824      DVLOG(1) << "Got Pong of size " << size;
825      // There is no need to do anything with pong messages.
826      return CHANNEL_ALIVE;
827
828    case WebSocketFrameHeader::kOpCodeClose: {
829      // TODO(ricea): If there is a message which is queued for transmission to
830      // the renderer, then the renderer should not receive an
831      // OnClosingHandshake or OnDropChannel IPC until the queued message has
832      // been completedly transmitted.
833      uint16 code = kWebSocketNormalClosure;
834      std::string reason;
835      std::string message;
836      if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
837        return FailChannel(message, code, reason);
838      }
839      // TODO(ricea): Find a way to safely log the message from the close
840      // message (escape control codes and so on).
841      DVLOG(1) << "Got Close with code " << code;
842      switch (state_) {
843        case CONNECTED:
844          SetState(RECV_CLOSED);
845          if (SendClose(code, reason) == CHANNEL_DELETED)
846            return CHANNEL_DELETED;
847          DCHECK_EQ(RECV_CLOSED, state_);
848          SetState(CLOSE_WAIT);
849
850          if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
851            return CHANNEL_DELETED;
852          received_close_code_ = code;
853          received_close_reason_ = reason;
854          break;
855
856        case SEND_CLOSED:
857          SetState(CLOSE_WAIT);
858          // From RFC6455 section 7.1.5: "Each endpoint
859          // will see the status code sent by the other end as _The WebSocket
860          // Connection Close Code_."
861          received_close_code_ = code;
862          received_close_reason_ = reason;
863          break;
864
865        default:
866          LOG(DFATAL) << "Got Close in unexpected state " << state_;
867          break;
868      }
869      return CHANNEL_ALIVE;
870    }
871
872    default:
873      return FailChannel(
874          base::StringPrintf("Unrecognized frame opcode: %d", opcode),
875          kWebSocketErrorProtocolError,
876          "Unknown opcode");
877  }
878}
879
880ChannelState WebSocketChannel::HandleDataFrame(
881    WebSocketFrameHeader::OpCode opcode,
882    bool final,
883    const scoped_refptr<IOBuffer>& data_buffer,
884    size_t size) {
885  if (state_ != CONNECTED) {
886    DVLOG(3) << "Ignored data packet received in state " << state_;
887    return CHANNEL_ALIVE;
888  }
889  DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
890         opcode == WebSocketFrameHeader::kOpCodeText ||
891         opcode == WebSocketFrameHeader::kOpCodeBinary);
892  const bool got_continuation =
893      (opcode == WebSocketFrameHeader::kOpCodeContinuation);
894  if (got_continuation != expecting_to_handle_continuation_) {
895    const std::string console_log = got_continuation
896        ? "Received unexpected continuation frame."
897        : "Received start of new message but previous message is unfinished.";
898    const std::string reason = got_continuation
899        ? "Unexpected continuation"
900        : "Previous data frame unfinished";
901    return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
902  }
903  expecting_to_handle_continuation_ = !final;
904  WebSocketFrameHeader::OpCode opcode_to_send = opcode;
905  if (!initial_frame_forwarded_ &&
906      opcode == WebSocketFrameHeader::kOpCodeContinuation) {
907    opcode_to_send = receiving_text_message_
908                         ? WebSocketFrameHeader::kOpCodeText
909                         : WebSocketFrameHeader::kOpCodeBinary;
910  }
911  if (opcode == WebSocketFrameHeader::kOpCodeText ||
912      (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
913       receiving_text_message_)) {
914    // This call is not redundant when size == 0 because it tells us what
915    // the current state is.
916    StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
917        size ? data_buffer->data() : NULL, size);
918    if (state == StreamingUtf8Validator::INVALID ||
919        (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
920      return FailChannel("Could not decode a text frame as UTF-8.",
921                         kWebSocketErrorProtocolError,
922                         "Invalid UTF-8 in text frame");
923    }
924    receiving_text_message_ = !final;
925    DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
926  }
927  if (size == 0U && !final)
928    return CHANNEL_ALIVE;
929
930  initial_frame_forwarded_ = !final;
931  if (size > base::checked_cast<size_t>(current_receive_quota_) ||
932      !pending_received_frames_.empty()) {
933    const bool no_quota = (current_receive_quota_ == 0);
934    DCHECK(no_quota || pending_received_frames_.empty());
935    DVLOG(3) << "Queueing frame to renderer due to quota. quota="
936             << current_receive_quota_ << " size=" << size;
937    WebSocketFrameHeader::OpCode opcode_to_queue =
938        no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
939    pending_received_frames_.push(PendingReceivedFrame(
940        final, opcode_to_queue, data_buffer, current_receive_quota_, size));
941    if (no_quota)
942      return CHANNEL_ALIVE;
943    size = current_receive_quota_;
944    final = false;
945  }
946
947  // TODO(ricea): Can this copy be eliminated?
948  const char* const data_begin = size ? data_buffer->data() : NULL;
949  const char* const data_end = data_begin + size;
950  const std::vector<char> data(data_begin, data_end);
951  current_receive_quota_ -= size;
952  DCHECK_GE(current_receive_quota_, 0);
953
954  // Sends the received frame to the renderer process.
955  return event_interface_->OnDataFrame(final, opcode_to_send, data);
956}
957
958ChannelState WebSocketChannel::SendFrameFromIOBuffer(
959    bool fin,
960    WebSocketFrameHeader::OpCode op_code,
961    const scoped_refptr<IOBuffer>& buffer,
962    size_t size) {
963  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
964  DCHECK(stream_);
965
966  scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
967  WebSocketFrameHeader& header = frame->header;
968  header.final = fin;
969  header.masked = true;
970  header.payload_length = size;
971  frame->data = buffer;
972
973  if (data_being_sent_) {
974    // Either the link to the WebSocket server is saturated, or several messages
975    // are being sent in a batch.
976    // TODO(ricea): Keep some statistics to work out the situation and adjust
977    // quota appropriately.
978    if (!data_to_send_next_)
979      data_to_send_next_.reset(new SendBuffer);
980    data_to_send_next_->AddFrame(frame.Pass());
981    return CHANNEL_ALIVE;
982  }
983
984  data_being_sent_.reset(new SendBuffer);
985  data_being_sent_->AddFrame(frame.Pass());
986  return WriteFrames();
987}
988
989ChannelState WebSocketChannel::FailChannel(const std::string& message,
990                                           uint16 code,
991                                           const std::string& reason) {
992  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
993  DCHECK_NE(CONNECTING, state_);
994  DCHECK_NE(CLOSED, state_);
995
996  // TODO(ricea): Logging.
997  if (state_ == CONNECTED) {
998    if (SendClose(code, reason) == CHANNEL_DELETED)
999      return CHANNEL_DELETED;
1000  }
1001
1002  // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
1003  // should close the connection itself without waiting for the closing
1004  // handshake.
1005  stream_->Close();
1006  SetState(CLOSED);
1007  return event_interface_->OnFailChannel(message);
1008}
1009
1010ChannelState WebSocketChannel::SendClose(uint16 code,
1011                                         const std::string& reason) {
1012  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
1013  DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
1014  scoped_refptr<IOBuffer> body;
1015  size_t size = 0;
1016  if (code == kWebSocketErrorNoStatusReceived) {
1017    // Special case: translate kWebSocketErrorNoStatusReceived into a Close
1018    // frame with no payload.
1019    DCHECK(reason.empty());
1020    body = new IOBuffer(0);
1021  } else {
1022    const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
1023    body = new IOBuffer(payload_length);
1024    size = payload_length;
1025    base::WriteBigEndian(body->data(), code);
1026    COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
1027                   they_should_both_be_two);
1028    std::copy(
1029        reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
1030  }
1031  // This use of base::Unretained() is safe because we stop the timer in the
1032  // destructor.
1033  timer_.Start(
1034      FROM_HERE,
1035      timeout_,
1036      base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
1037  if (SendFrameFromIOBuffer(
1038          true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
1039      CHANNEL_DELETED)
1040    return CHANNEL_DELETED;
1041  return CHANNEL_ALIVE;
1042}
1043
1044bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
1045                                  size_t size,
1046                                  uint16* code,
1047                                  std::string* reason,
1048                                  std::string* message) {
1049  reason->clear();
1050  if (size < kWebSocketCloseCodeLength) {
1051    if (size == 0U) {
1052      *code = kWebSocketErrorNoStatusReceived;
1053      return true;
1054    }
1055
1056    DVLOG(1) << "Close frame with payload size " << size << " received "
1057             << "(the first byte is " << std::hex
1058             << static_cast<int>(buffer->data()[0]) << ")";
1059    *code = kWebSocketErrorProtocolError;
1060    *message =
1061        "Received a broken close frame containing an invalid size body.";
1062    return false;
1063  }
1064
1065  const char* data = buffer->data();
1066  uint16 unchecked_code = 0;
1067  base::ReadBigEndian(data, &unchecked_code);
1068  COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
1069                 they_should_both_be_two_bytes);
1070
1071  switch (unchecked_code) {
1072    case kWebSocketErrorNoStatusReceived:
1073    case kWebSocketErrorAbnormalClosure:
1074    case kWebSocketErrorTlsHandshake:
1075      *code = kWebSocketErrorProtocolError;
1076      *message =
1077          "Received a broken close frame containing a reserved status code.";
1078      return false;
1079
1080    default:
1081      *code = unchecked_code;
1082      break;
1083  }
1084
1085  std::string text(data + kWebSocketCloseCodeLength, data + size);
1086  if (StreamingUtf8Validator::Validate(text)) {
1087    reason->swap(text);
1088    return true;
1089  }
1090
1091  *code = kWebSocketErrorProtocolError;
1092  *reason = "Invalid UTF-8 in Close frame";
1093  *message = "Received a broken close frame containing invalid UTF-8.";
1094  return false;
1095}
1096
1097ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
1098                                             uint16 code,
1099                                             const std::string& reason) {
1100  if (CHANNEL_DELETED ==
1101      notification_sender_->SendImmediately(event_interface_.get()))
1102    return CHANNEL_DELETED;
1103  ChannelState result =
1104      event_interface_->OnDropChannel(was_clean, code, reason);
1105  DCHECK_EQ(CHANNEL_DELETED, result);
1106  return result;
1107}
1108
1109void WebSocketChannel::CloseTimeout() {
1110  stream_->Close();
1111  SetState(CLOSED);
1112  AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
1113  // |this| has been deleted.
1114}
1115
1116}  // namespace net
1117