websocket_channel.cc revision 23730a6e56a168d1879203e4b3819bb36e3d8f1f
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_channel.h"
6
7#include <algorithm>
8
9#include "base/basictypes.h"  // for size_t
10#include "base/big_endian.h"
11#include "base/bind.h"
12#include "base/compiler_specific.h"
13#include "base/memory/weak_ptr.h"
14#include "base/message_loop/message_loop.h"
15#include "base/numerics/safe_conversions.h"
16#include "base/stl_util.h"
17#include "base/strings/stringprintf.h"
18#include "base/time/time.h"
19#include "net/base/io_buffer.h"
20#include "net/base/net_log.h"
21#include "net/http/http_request_headers.h"
22#include "net/http/http_response_headers.h"
23#include "net/http/http_util.h"
24#include "net/websockets/websocket_errors.h"
25#include "net/websockets/websocket_event_interface.h"
26#include "net/websockets/websocket_frame.h"
27#include "net/websockets/websocket_handshake_request_info.h"
28#include "net/websockets/websocket_handshake_response_info.h"
29#include "net/websockets/websocket_mux.h"
30#include "net/websockets/websocket_stream.h"
31#include "url/origin.h"
32
33namespace net {
34
35namespace {
36
37using base::StreamingUtf8Validator;
38
39const int kDefaultSendQuotaLowWaterMark = 1 << 16;
40const int kDefaultSendQuotaHighWaterMark = 1 << 17;
41const size_t kWebSocketCloseCodeLength = 2;
42// This timeout is based on TCPMaximumSegmentLifetime * 2 from
43// MainThreadWebSocketChannel.cpp in Blink.
44const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
45
46typedef WebSocketEventInterface::ChannelState ChannelState;
47const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
48const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
49
50// Maximum close reason length = max control frame payload -
51//                               status code length
52//                             = 125 - 2
53const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
54
55// Check a close status code for strict compliance with RFC6455. This is only
56// used for close codes received from a renderer that we are intending to send
57// out over the network. See ParseClose() for the restrictions on incoming close
58// codes. The |code| parameter is type int for convenience of implementation;
59// the real type is uint16. Code 1005 is treated specially; it cannot be set
60// explicitly by Javascript but the renderer uses it to indicate we should send
61// a Close frame with no payload.
62bool IsStrictlyValidCloseStatusCode(int code) {
63  static const int kInvalidRanges[] = {
64      // [BAD, OK)
65      0,    1000,   // 1000 is the first valid code
66      1006, 1007,   // 1006 MUST NOT be set.
67      1014, 3000,   // 1014 unassigned; 1015 up to 2999 are reserved.
68      5000, 65536,  // Codes above 5000 are invalid.
69  };
70  const int* const kInvalidRangesEnd =
71      kInvalidRanges + arraysize(kInvalidRanges);
72
73  DCHECK_GE(code, 0);
74  DCHECK_LT(code, 65536);
75  const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
76  DCHECK_NE(kInvalidRangesEnd, upper);
77  DCHECK_GT(upper, kInvalidRanges);
78  DCHECK_GT(*upper, code);
79  DCHECK_LE(*(upper - 1), code);
80  return ((upper - kInvalidRanges) % 2) == 0;
81}
82
83// This function avoids a bunch of boilerplate code.
84void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
85
86// Sets |name| to the name of the frame type for the given |opcode|. Note that
87// for all of Text, Binary and Continuation opcode, this method returns
88// "Data frame".
89void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
90                           std::string* name) {
91  switch (opcode) {
92    case WebSocketFrameHeader::kOpCodeText:    // fall-thru
93    case WebSocketFrameHeader::kOpCodeBinary:  // fall-thru
94    case WebSocketFrameHeader::kOpCodeContinuation:
95      *name = "Data frame";
96      break;
97
98    case WebSocketFrameHeader::kOpCodePing:
99      *name = "Ping";
100      break;
101
102    case WebSocketFrameHeader::kOpCodePong:
103      *name = "Pong";
104      break;
105
106    case WebSocketFrameHeader::kOpCodeClose:
107      *name = "Close";
108      break;
109
110    default:
111      *name = "Unknown frame type";
112      break;
113  }
114
115  return;
116}
117
118}  // namespace
119
120// A class to encapsulate a set of frames and information about the size of
121// those frames.
122class WebSocketChannel::SendBuffer {
123 public:
124  SendBuffer() : total_bytes_(0) {}
125
126  // Add a WebSocketFrame to the buffer and increase total_bytes_.
127  void AddFrame(scoped_ptr<WebSocketFrame> chunk);
128
129  // Return a pointer to the frames_ for write purposes.
130  ScopedVector<WebSocketFrame>* frames() { return &frames_; }
131
132 private:
133  // The frames_ that will be sent in the next call to WriteFrames().
134  ScopedVector<WebSocketFrame> frames_;
135
136  // The total size of the payload data in |frames_|. This will be used to
137  // measure the throughput of the link.
138  // TODO(ricea): Measure the throughput of the link.
139  size_t total_bytes_;
140};
141
142void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
143  total_bytes_ += frame->header.payload_length;
144  frames_.push_back(frame.release());
145}
146
147// Implementation of WebSocketStream::ConnectDelegate that simply forwards the
148// calls on to the WebSocketChannel that created it.
149class WebSocketChannel::ConnectDelegate
150    : public WebSocketStream::ConnectDelegate {
151 public:
152  explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
153
154  virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
155    creator_->OnConnectSuccess(stream.Pass());
156    // |this| may have been deleted.
157  }
158
159  virtual void OnFailure(const std::string& message) OVERRIDE {
160    creator_->OnConnectFailure(message);
161    // |this| has been deleted.
162  }
163
164  virtual void OnStartOpeningHandshake(
165      scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
166    creator_->OnStartOpeningHandshake(request.Pass());
167  }
168
169  virtual void OnFinishOpeningHandshake(
170      scoped_ptr<WebSocketHandshakeResponseInfo> response)
171      OVERRIDE {
172    creator_->OnFinishOpeningHandshake(response.Pass());
173  }
174
175 private:
176  // A pointer to the WebSocketChannel that created this object. There is no
177  // danger of this pointer being stale, because deleting the WebSocketChannel
178  // cancels the connect process, deleting this object and preventing its
179  // callbacks from being called.
180  WebSocketChannel* const creator_;
181
182  DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
183};
184
185class WebSocketChannel::HandshakeNotificationSender
186    : public base::SupportsWeakPtr<HandshakeNotificationSender> {
187 public:
188  explicit HandshakeNotificationSender(WebSocketChannel* channel);
189  ~HandshakeNotificationSender();
190
191  static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
192
193  ChannelState SendImmediately(WebSocketEventInterface* event_interface);
194
195  const WebSocketHandshakeRequestInfo* handshake_request_info() const {
196    return handshake_request_info_.get();
197  }
198
199  void set_handshake_request_info(
200      scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
201    handshake_request_info_ = request_info.Pass();
202  }
203
204  const WebSocketHandshakeResponseInfo* handshake_response_info() const {
205    return handshake_response_info_.get();
206  }
207
208  void set_handshake_response_info(
209      scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
210    handshake_response_info_ = response_info.Pass();
211  }
212
213 private:
214  WebSocketChannel* owner_;
215  scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
216  scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
217};
218
219WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
220    WebSocketChannel* channel) : owner_(channel) {}
221
222WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
223
224void WebSocketChannel::HandshakeNotificationSender::Send(
225    base::WeakPtr<HandshakeNotificationSender> sender) {
226  // Do nothing if |sender| is already destructed.
227  if (sender) {
228    WebSocketChannel* channel = sender->owner_;
229    AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
230  }
231}
232
233ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
234    WebSocketEventInterface* event_interface) {
235
236  if (handshake_request_info_.get()) {
237    if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
238            handshake_request_info_.Pass()))
239      return CHANNEL_DELETED;
240  }
241
242  if (handshake_response_info_.get()) {
243    if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
244            handshake_response_info_.Pass()))
245      return CHANNEL_DELETED;
246
247    // TODO(yhirano): We can release |this| to save memory because
248    // there will be no more opening handshake notification.
249  }
250
251  return CHANNEL_ALIVE;
252}
253
254WebSocketChannel::WebSocketChannel(
255    scoped_ptr<WebSocketEventInterface> event_interface,
256    URLRequestContext* url_request_context)
257    : event_interface_(event_interface.Pass()),
258      url_request_context_(url_request_context),
259      send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
260      send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
261      current_send_quota_(0),
262      timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
263      received_close_code_(0),
264      state_(FRESHLY_CONSTRUCTED),
265      notification_sender_(new HandshakeNotificationSender(this)),
266      sending_text_message_(false),
267      receiving_text_message_(false),
268      expecting_to_handle_continuation_(false),
269      initial_frame_forwarded_(false) {}
270
271WebSocketChannel::~WebSocketChannel() {
272  // The stream may hold a pointer to read_frames_, and so it needs to be
273  // destroyed first.
274  stream_.reset();
275  // The timer may have a callback pointing back to us, so stop it just in case
276  // someone decides to run the event loop from their destructor.
277  timer_.Stop();
278}
279
280void WebSocketChannel::SendAddChannelRequest(
281    const GURL& socket_url,
282    const std::vector<std::string>& requested_subprotocols,
283    const url::Origin& origin) {
284  // Delegate to the tested version.
285  SendAddChannelRequestWithSuppliedCreator(
286      socket_url,
287      requested_subprotocols,
288      origin,
289      base::Bind(&WebSocketStream::CreateAndConnectStream));
290}
291
292bool WebSocketChannel::InClosingState() const {
293  // The state RECV_CLOSED is not supported here, because it is only used in one
294  // code path and should not leak into the code in general.
295  DCHECK_NE(RECV_CLOSED, state_)
296      << "InClosingState called with state_ == RECV_CLOSED";
297  return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
298}
299
300void WebSocketChannel::SendFrame(bool fin,
301                                 WebSocketFrameHeader::OpCode op_code,
302                                 const std::vector<char>& data) {
303  if (data.size() > INT_MAX) {
304    NOTREACHED() << "Frame size sanity check failed";
305    return;
306  }
307  if (stream_ == NULL) {
308    LOG(DFATAL) << "Got SendFrame without a connection established; "
309                << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
310                << " data.size()=" << data.size();
311    return;
312  }
313  if (InClosingState()) {
314    VLOG(1) << "SendFrame called in state " << state_
315            << ". This may be a bug, or a harmless race.";
316    return;
317  }
318  if (state_ != CONNECTED) {
319    NOTREACHED() << "SendFrame() called in state " << state_;
320    return;
321  }
322  if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
323    // TODO(ricea): Kill renderer.
324    AllowUnused(
325        FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
326    // |this| has been deleted.
327    return;
328  }
329  if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
330    LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
331                << "; misbehaving renderer? fin=" << fin
332                << " data.size()=" << data.size();
333    return;
334  }
335  if (op_code == WebSocketFrameHeader::kOpCodeText ||
336      (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
337       sending_text_message_)) {
338    StreamingUtf8Validator::State state =
339        outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
340    if (state == StreamingUtf8Validator::INVALID ||
341        (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
342      // TODO(ricea): Kill renderer.
343      AllowUnused(
344          FailChannel("Browser sent a text frame containing invalid UTF-8",
345                      kWebSocketErrorGoingAway,
346                      ""));
347      // |this| has been deleted.
348      return;
349    }
350    sending_text_message_ = !fin;
351    DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
352  }
353  current_send_quota_ -= data.size();
354  // TODO(ricea): If current_send_quota_ has dropped below
355  // send_quota_low_water_mark_, it might be good to increase the "low
356  // water mark" and "high water mark", but only if the link to the WebSocket
357  // server is not saturated.
358  scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
359  std::copy(data.begin(), data.end(), buffer->data());
360  AllowUnused(SendIOBuffer(fin, op_code, buffer, data.size()));
361  // |this| may have been deleted.
362}
363
364void WebSocketChannel::SendFlowControl(int64 quota) {
365  DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
366         state_ == CLOSE_WAIT);
367  // TODO(ricea): Add interface to WebSocketStream and implement.
368  // stream_->SendFlowControl(quota);
369}
370
371void WebSocketChannel::StartClosingHandshake(uint16 code,
372                                             const std::string& reason) {
373  if (InClosingState()) {
374    VLOG(1) << "StartClosingHandshake called in state " << state_
375            << ". This may be a bug, or a harmless race.";
376    return;
377  }
378  if (state_ == CONNECTING) {
379    // Abort the in-progress handshake and drop the connection immediately.
380    stream_request_.reset();
381    state_ = CLOSED;
382    AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
383    return;
384  }
385  if (state_ != CONNECTED) {
386    NOTREACHED() << "StartClosingHandshake() called in state " << state_;
387    return;
388  }
389  // Javascript actually only permits 1000 and 3000-4999, but the implementation
390  // itself may produce different codes. The length of |reason| is also checked
391  // by Javascript.
392  if (!IsStrictlyValidCloseStatusCode(code) ||
393      reason.size() > kMaximumCloseReasonLength) {
394    // "InternalServerError" is actually used for errors from any endpoint, per
395    // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
396    // reason it must be malfunctioning in some way, and based on that we
397    // interpret this as an internal error.
398    if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED)
399      state_ = SEND_CLOSED;
400    return;
401  }
402  if (SendClose(
403          code,
404          StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
405      CHANNEL_DELETED)
406    return;
407  state_ = SEND_CLOSED;
408}
409
410void WebSocketChannel::SendAddChannelRequestForTesting(
411    const GURL& socket_url,
412    const std::vector<std::string>& requested_subprotocols,
413    const url::Origin& origin,
414    const WebSocketStreamCreator& creator) {
415  SendAddChannelRequestWithSuppliedCreator(
416      socket_url, requested_subprotocols, origin, creator);
417}
418
419void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
420    base::TimeDelta delay) {
421  timeout_ = delay;
422}
423
424void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
425    const GURL& socket_url,
426    const std::vector<std::string>& requested_subprotocols,
427    const url::Origin& origin,
428    const WebSocketStreamCreator& creator) {
429  DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
430  if (!socket_url.SchemeIsWSOrWSS()) {
431    // TODO(ricea): Kill the renderer (this error should have been caught by
432    // Javascript).
433    AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
434    // |this| is deleted here.
435    return;
436  }
437  socket_url_ = socket_url;
438  scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
439      new ConnectDelegate(this));
440  stream_request_ = creator.Run(socket_url_,
441                                requested_subprotocols,
442                                origin,
443                                url_request_context_,
444                                BoundNetLog(),
445                                connect_delegate.Pass());
446  state_ = CONNECTING;
447}
448
449void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
450  DCHECK(stream);
451  DCHECK_EQ(CONNECTING, state_);
452  stream_ = stream.Pass();
453  state_ = CONNECTED;
454  if (event_interface_->OnAddChannelResponse(
455          false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
456      CHANNEL_DELETED)
457    return;
458
459  // TODO(ricea): Get flow control information from the WebSocketStream once we
460  // have a multiplexing WebSocketStream.
461  current_send_quota_ = send_quota_high_water_mark_;
462  if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
463      CHANNEL_DELETED)
464    return;
465
466  // |stream_request_| is not used once the connection has succeeded.
467  stream_request_.reset();
468  AllowUnused(ReadFrames());
469  // |this| may have been deleted.
470}
471
472void WebSocketChannel::OnConnectFailure(const std::string& message) {
473  DCHECK_EQ(CONNECTING, state_);
474  state_ = CLOSED;
475  stream_request_.reset();
476
477  if (CHANNEL_DELETED ==
478      notification_sender_->SendImmediately(event_interface_.get())) {
479    // |this| has been deleted.
480    return;
481  }
482  AllowUnused(event_interface_->OnFailChannel(message));
483  // |this| has been deleted.
484}
485
486void WebSocketChannel::OnStartOpeningHandshake(
487    scoped_ptr<WebSocketHandshakeRequestInfo> request) {
488  DCHECK(!notification_sender_->handshake_request_info());
489
490  // Because it is hard to handle an IPC error synchronously is difficult,
491  // we asynchronously notify the information.
492  notification_sender_->set_handshake_request_info(request.Pass());
493  ScheduleOpeningHandshakeNotification();
494}
495
496void WebSocketChannel::OnFinishOpeningHandshake(
497    scoped_ptr<WebSocketHandshakeResponseInfo> response) {
498  DCHECK(!notification_sender_->handshake_response_info());
499
500  // Because it is hard to handle an IPC error synchronously is difficult,
501  // we asynchronously notify the information.
502  notification_sender_->set_handshake_response_info(response.Pass());
503  ScheduleOpeningHandshakeNotification();
504}
505
506void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
507  base::MessageLoop::current()->PostTask(
508      FROM_HERE,
509      base::Bind(HandshakeNotificationSender::Send,
510                 notification_sender_->AsWeakPtr()));
511}
512
513ChannelState WebSocketChannel::WriteFrames() {
514  int result = OK;
515  do {
516    // This use of base::Unretained is safe because this object owns the
517    // WebSocketStream and destroying it cancels all callbacks.
518    result = stream_->WriteFrames(
519        data_being_sent_->frames(),
520        base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
521                   base::Unretained(this),
522                   false));
523    if (result != ERR_IO_PENDING) {
524      if (OnWriteDone(true, result) == CHANNEL_DELETED)
525        return CHANNEL_DELETED;
526      // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
527      // guaranteed to be the same as before OnWriteDone() call.
528    }
529  } while (result == OK && data_being_sent_);
530  return CHANNEL_ALIVE;
531}
532
533ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
534  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
535  DCHECK_NE(CONNECTING, state_);
536  DCHECK_NE(ERR_IO_PENDING, result);
537  DCHECK(data_being_sent_);
538  switch (result) {
539    case OK:
540      if (data_to_send_next_) {
541        data_being_sent_ = data_to_send_next_.Pass();
542        if (!synchronous)
543          return WriteFrames();
544      } else {
545        data_being_sent_.reset();
546        if (current_send_quota_ < send_quota_low_water_mark_) {
547          // TODO(ricea): Increase low_water_mark and high_water_mark if
548          // throughput is high, reduce them if throughput is low.  Low water
549          // mark needs to be >= the bandwidth delay product *of the IPC
550          // channel*. Because factors like context-switch time, thread wake-up
551          // time, and bus speed come into play it is complex and probably needs
552          // to be determined empirically.
553          DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
554          // TODO(ricea): Truncate quota by the quota specified by the remote
555          // server, if the protocol in use supports quota.
556          int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
557          current_send_quota_ += fresh_quota;
558          return event_interface_->OnFlowControl(fresh_quota);
559        }
560      }
561      return CHANNEL_ALIVE;
562
563    // If a recoverable error condition existed, it would go here.
564
565    default:
566      DCHECK_LT(result, 0)
567          << "WriteFrames() should only return OK or ERR_ codes";
568      stream_->Close();
569      DCHECK_NE(CLOSED, state_);
570      state_ = CLOSED;
571      return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
572  }
573}
574
575ChannelState WebSocketChannel::ReadFrames() {
576  int result = OK;
577  do {
578    // This use of base::Unretained is safe because this object owns the
579    // WebSocketStream, and any pending reads will be cancelled when it is
580    // destroyed.
581    result = stream_->ReadFrames(
582        &read_frames_,
583        base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
584                   base::Unretained(this),
585                   false));
586    if (result != ERR_IO_PENDING) {
587      if (OnReadDone(true, result) == CHANNEL_DELETED)
588        return CHANNEL_DELETED;
589    }
590    DCHECK_NE(CLOSED, state_);
591  } while (result == OK);
592  return CHANNEL_ALIVE;
593}
594
595ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
596  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
597  DCHECK_NE(CONNECTING, state_);
598  DCHECK_NE(ERR_IO_PENDING, result);
599  switch (result) {
600    case OK:
601      // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
602      // with no data read, not an empty response.
603      DCHECK(!read_frames_.empty())
604          << "ReadFrames() returned OK, but nothing was read.";
605      for (size_t i = 0; i < read_frames_.size(); ++i) {
606        scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
607        read_frames_[i] = NULL;
608        if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
609          return CHANNEL_DELETED;
610      }
611      read_frames_.clear();
612      // There should always be a call to ReadFrames pending.
613      // TODO(ricea): Unless we are out of quota.
614      DCHECK_NE(CLOSED, state_);
615      if (!synchronous)
616        return ReadFrames();
617      return CHANNEL_ALIVE;
618
619    case ERR_WS_PROTOCOL_ERROR:
620      // This could be kWebSocketErrorProtocolError (specifically, non-minimal
621      // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
622      // extension-specific error.
623      return FailChannel("Invalid frame header",
624                         kWebSocketErrorProtocolError,
625                         "WebSocket Protocol Error");
626
627    default:
628      DCHECK_LT(result, 0)
629          << "ReadFrames() should only return OK or ERR_ codes";
630      stream_->Close();
631      DCHECK_NE(CLOSED, state_);
632      state_ = CLOSED;
633      uint16 code = kWebSocketErrorAbnormalClosure;
634      std::string reason = "";
635      bool was_clean = false;
636      if (received_close_code_ != 0) {
637        code = received_close_code_;
638        reason = received_close_reason_;
639        was_clean = (result == ERR_CONNECTION_CLOSED);
640      }
641      return DoDropChannel(was_clean, code, reason);
642  }
643}
644
645ChannelState WebSocketChannel::HandleFrame(
646    scoped_ptr<WebSocketFrame> frame) {
647  if (frame->header.masked) {
648    // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
649    // masked frame."
650    return FailChannel(
651        "A server must not mask any frames that it sends to the "
652        "client.",
653        kWebSocketErrorProtocolError,
654        "Masked frame from server");
655  }
656  const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
657  DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
658         frame->header.final);
659  if (frame->header.reserved1 || frame->header.reserved2 ||
660      frame->header.reserved3) {
661    return FailChannel(base::StringPrintf(
662                           "One or more reserved bits are on: reserved1 = %d, "
663                           "reserved2 = %d, reserved3 = %d",
664                           static_cast<int>(frame->header.reserved1),
665                           static_cast<int>(frame->header.reserved2),
666                           static_cast<int>(frame->header.reserved3)),
667                       kWebSocketErrorProtocolError,
668                       "Invalid reserved bit");
669  }
670
671  // Respond to the frame appropriately to its type.
672  return HandleFrameByState(
673      opcode, frame->header.final, frame->data, frame->header.payload_length);
674}
675
676ChannelState WebSocketChannel::HandleFrameByState(
677    const WebSocketFrameHeader::OpCode opcode,
678    bool final,
679    const scoped_refptr<IOBuffer>& data_buffer,
680    size_t size) {
681  DCHECK_NE(RECV_CLOSED, state_)
682      << "HandleFrame() does not support being called re-entrantly from within "
683         "SendClose()";
684  DCHECK_NE(CLOSED, state_);
685  if (state_ == CLOSE_WAIT) {
686    std::string frame_name;
687    GetFrameTypeForOpcode(opcode, &frame_name);
688
689    // FailChannel() won't send another Close frame.
690    return FailChannel(
691        frame_name + " received after close", kWebSocketErrorProtocolError, "");
692  }
693  switch (opcode) {
694    case WebSocketFrameHeader::kOpCodeText:    // fall-thru
695    case WebSocketFrameHeader::kOpCodeBinary:
696    case WebSocketFrameHeader::kOpCodeContinuation:
697      return HandleDataFrame(opcode, final, data_buffer, size);
698
699    case WebSocketFrameHeader::kOpCodePing:
700      VLOG(1) << "Got Ping of size " << size;
701      if (state_ == CONNECTED)
702        return SendIOBuffer(
703            true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
704      VLOG(3) << "Ignored ping in state " << state_;
705      return CHANNEL_ALIVE;
706
707    case WebSocketFrameHeader::kOpCodePong:
708      VLOG(1) << "Got Pong of size " << size;
709      // There is no need to do anything with pong messages.
710      return CHANNEL_ALIVE;
711
712    case WebSocketFrameHeader::kOpCodeClose: {
713      uint16 code = kWebSocketNormalClosure;
714      std::string reason;
715      std::string message;
716      if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
717        return FailChannel(message, code, reason);
718      }
719      // TODO(ricea): Find a way to safely log the message from the close
720      // message (escape control codes and so on).
721      VLOG(1) << "Got Close with code " << code;
722      switch (state_) {
723        case CONNECTED:
724          state_ = RECV_CLOSED;
725          if (SendClose(code, reason) == CHANNEL_DELETED)
726            return CHANNEL_DELETED;
727          state_ = CLOSE_WAIT;
728
729          if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
730            return CHANNEL_DELETED;
731          received_close_code_ = code;
732          received_close_reason_ = reason;
733          break;
734
735        case SEND_CLOSED:
736          state_ = CLOSE_WAIT;
737          // From RFC6455 section 7.1.5: "Each endpoint
738          // will see the status code sent by the other end as _The WebSocket
739          // Connection Close Code_."
740          received_close_code_ = code;
741          received_close_reason_ = reason;
742          break;
743
744        default:
745          LOG(DFATAL) << "Got Close in unexpected state " << state_;
746          break;
747      }
748      return CHANNEL_ALIVE;
749    }
750
751    default:
752      return FailChannel(
753          base::StringPrintf("Unrecognized frame opcode: %d", opcode),
754          kWebSocketErrorProtocolError,
755          "Unknown opcode");
756  }
757}
758
759ChannelState WebSocketChannel::HandleDataFrame(
760    WebSocketFrameHeader::OpCode opcode,
761    bool final,
762    const scoped_refptr<IOBuffer>& data_buffer,
763    size_t size) {
764  if (state_ != CONNECTED) {
765    DVLOG(3) << "Ignored data packet received in state " << state_;
766    return CHANNEL_ALIVE;
767  }
768  DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
769         opcode == WebSocketFrameHeader::kOpCodeText ||
770         opcode == WebSocketFrameHeader::kOpCodeBinary);
771  const bool got_continuation =
772      (opcode == WebSocketFrameHeader::kOpCodeContinuation);
773  if (got_continuation != expecting_to_handle_continuation_) {
774    const std::string console_log = got_continuation
775        ? "Received unexpected continuation frame."
776        : "Received start of new message but previous message is unfinished.";
777    const std::string reason = got_continuation
778        ? "Unexpected continuation"
779        : "Previous data frame unfinished";
780    return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
781  }
782  expecting_to_handle_continuation_ = !final;
783  WebSocketFrameHeader::OpCode opcode_to_send = opcode;
784  if (!initial_frame_forwarded_ &&
785      opcode == WebSocketFrameHeader::kOpCodeContinuation) {
786    opcode_to_send = receiving_text_message_
787                         ? WebSocketFrameHeader::kOpCodeText
788                         : WebSocketFrameHeader::kOpCodeBinary;
789  }
790  if (opcode == WebSocketFrameHeader::kOpCodeText ||
791      (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
792       receiving_text_message_)) {
793    // This call is not redundant when size == 0 because it tells us what
794    // the current state is.
795    StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
796        size ? data_buffer->data() : NULL, size);
797    if (state == StreamingUtf8Validator::INVALID ||
798        (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
799      return FailChannel("Could not decode a text frame as UTF-8.",
800                         kWebSocketErrorProtocolError,
801                         "Invalid UTF-8 in text frame");
802    }
803    receiving_text_message_ = !final;
804    DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
805  }
806  if (size == 0U && !final)
807    return CHANNEL_ALIVE;
808
809  initial_frame_forwarded_ = !final;
810  // TODO(ricea): Can this copy be eliminated?
811  const char* const data_begin = size ? data_buffer->data() : NULL;
812  const char* const data_end = data_begin + size;
813  const std::vector<char> data(data_begin, data_end);
814  // TODO(ricea): Handle the case when ReadFrames returns far
815  // more data at once than should be sent in a single IPC. This needs to
816  // be handled carefully, as an overloaded IO thread is one possible
817  // cause of receiving very large chunks.
818
819  // Sends the received frame to the renderer process.
820  return event_interface_->OnDataFrame(final, opcode_to_send, data);
821}
822
823ChannelState WebSocketChannel::SendIOBuffer(
824    bool fin,
825    WebSocketFrameHeader::OpCode op_code,
826    const scoped_refptr<IOBuffer>& buffer,
827    size_t size) {
828  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
829  DCHECK(stream_);
830
831  scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
832  WebSocketFrameHeader& header = frame->header;
833  header.final = fin;
834  header.masked = true;
835  header.payload_length = size;
836  frame->data = buffer;
837
838  if (data_being_sent_) {
839    // Either the link to the WebSocket server is saturated, or several messages
840    // are being sent in a batch.
841    // TODO(ricea): Keep some statistics to work out the situation and adjust
842    // quota appropriately.
843    if (!data_to_send_next_)
844      data_to_send_next_.reset(new SendBuffer);
845    data_to_send_next_->AddFrame(frame.Pass());
846    return CHANNEL_ALIVE;
847  }
848
849  data_being_sent_.reset(new SendBuffer);
850  data_being_sent_->AddFrame(frame.Pass());
851  return WriteFrames();
852}
853
854ChannelState WebSocketChannel::FailChannel(const std::string& message,
855                                           uint16 code,
856                                           const std::string& reason) {
857  DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
858  DCHECK_NE(CONNECTING, state_);
859  DCHECK_NE(CLOSED, state_);
860  // TODO(ricea): Logging.
861  if (state_ == CONNECTED) {
862    if (SendClose(code, reason) == CHANNEL_DELETED)
863      return CHANNEL_DELETED;
864  }
865  // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
866  // should close the connection itself without waiting for the closing
867  // handshake.
868  stream_->Close();
869  state_ = CLOSED;
870
871  return event_interface_->OnFailChannel(message);
872}
873
874ChannelState WebSocketChannel::SendClose(uint16 code,
875                                         const std::string& reason) {
876  DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
877  DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
878  scoped_refptr<IOBuffer> body;
879  size_t size = 0;
880  if (code == kWebSocketErrorNoStatusReceived) {
881    // Special case: translate kWebSocketErrorNoStatusReceived into a Close
882    // frame with no payload.
883    DCHECK(reason.empty());
884    body = new IOBuffer(0);
885  } else {
886    const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
887    body = new IOBuffer(payload_length);
888    size = payload_length;
889    base::WriteBigEndian(body->data(), code);
890    COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
891                   they_should_both_be_two);
892    std::copy(
893        reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
894  }
895  // This use of base::Unretained() is safe because we stop the timer in the
896  // destructor.
897  timer_.Start(
898      FROM_HERE,
899      timeout_,
900      base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
901  if (SendIOBuffer(true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
902      CHANNEL_DELETED)
903    return CHANNEL_DELETED;
904  return CHANNEL_ALIVE;
905}
906
907bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
908                                  size_t size,
909                                  uint16* code,
910                                  std::string* reason,
911                                  std::string* message) {
912  reason->clear();
913  if (size < kWebSocketCloseCodeLength) {
914    if (size == 0U) {
915      *code = kWebSocketErrorNoStatusReceived;
916      return true;
917    }
918
919    DVLOG(1) << "Close frame with payload size " << size << " received "
920             << "(the first byte is " << std::hex
921             << static_cast<int>(buffer->data()[0]) << ")";
922    *code = kWebSocketErrorProtocolError;
923    *message =
924        "Received a broken close frame containing an invalid size body.";
925    return false;
926  }
927
928  const char* data = buffer->data();
929  uint16 unchecked_code = 0;
930  base::ReadBigEndian(data, &unchecked_code);
931  COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
932                 they_should_both_be_two_bytes);
933
934  switch (unchecked_code) {
935    case kWebSocketErrorNoStatusReceived:
936    case kWebSocketErrorAbnormalClosure:
937    case kWebSocketErrorTlsHandshake:
938      *code = kWebSocketErrorProtocolError;
939      *message =
940          "Received a broken close frame containing a reserved status code.";
941      return false;
942
943    default:
944      *code = unchecked_code;
945      break;
946  }
947
948  std::string text(data + kWebSocketCloseCodeLength, data + size);
949  if (StreamingUtf8Validator::Validate(text)) {
950    reason->swap(text);
951    return true;
952  }
953
954  *code = kWebSocketErrorProtocolError;
955  *reason = "Invalid UTF-8 in Close frame";
956  *message = "Received a broken close frame containing invalid UTF-8.";
957  return false;
958}
959
960ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
961                                             uint16 code,
962                                             const std::string& reason) {
963  if (CHANNEL_DELETED ==
964      notification_sender_->SendImmediately(event_interface_.get()))
965    return CHANNEL_DELETED;
966  ChannelState result =
967      event_interface_->OnDropChannel(was_clean, code, reason);
968  DCHECK_EQ(CHANNEL_DELETED, result);
969  return result;
970}
971
972void WebSocketChannel::CloseTimeout() {
973  stream_->Close();
974  DCHECK_NE(CLOSED, state_);
975  state_ = CLOSED;
976  AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
977  // |this| has been deleted.
978}
979
980}  // namespace net
981