18bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)// Copyright 2013 The Chromium Authors. All rights reserved.
28bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
38bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)// found in the LICENSE file.
48bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
58bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)#include "google_apis/gcm/base/socket_stream.h"
68bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
78bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)#include "base/bind.h"
88bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)#include "base/callback.h"
98bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)#include "net/base/io_buffer.h"
108bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)#include "net/socket/stream_socket.h"
118bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
128bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)namespace gcm {
138bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
148bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)namespace {
158bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
168bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)// TODO(zea): consider having dynamically-sized buffers if this becomes too
178bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)// expensive.
188bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)const uint32 kDefaultBufferSize = 8*1024;
198bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
208bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}  // namespace
218bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
228bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketInputStream::SocketInputStream(net::StreamSocket* socket)
238bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    : socket_(socket),
248bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
258bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
268bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                                              kDefaultBufferSize)),
278bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      next_pos_(0),
288bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      last_error_(net::OK),
298bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      weak_ptr_factory_(this) {
308bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK(socket->IsConnected());
318bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
328bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
338bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketInputStream::~SocketInputStream() {
348bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
358bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
368bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)bool SocketInputStream::Next(const void** data, int* size) {
378bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (GetState() != EMPTY && GetState() != READY) {
388bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    NOTREACHED() << "Invalid input stream read attempt.";
398bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return false;
408bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
418bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
428bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (GetState() == EMPTY) {
438bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    DVLOG(1) << "No unread data remaining, ending read.";
448bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return false;
458bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
468bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
478bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_EQ(GetState(), READY)
488bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      << " Input stream must have pending data before reading.";
498bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
508bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  *data = io_buffer_->data() + next_pos_;
518bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  *size = UnreadByteCount();
528bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ = read_buffer_->BytesConsumed();
538bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
548bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return true;
558bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
568bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
578bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketInputStream::BackUp(int count) {
588bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK(GetState() == READY || GetState() == EMPTY);
598bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_GT(count, 0);
608bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_LE(count, next_pos_);
618bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
628bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ -= count;
638bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
648bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << "Current position now at " << next_pos_
658bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << " of " << read_buffer_->BytesConsumed();
668bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
678bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
688bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)bool SocketInputStream::Skip(int count) {
698bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  NOTIMPLEMENTED();
708bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return false;
718bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
728bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
738bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)int64 SocketInputStream::ByteCount() const {
748bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
758bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), READING);
768bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return next_pos_;
778bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
788bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
798bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)size_t SocketInputStream::UnreadByteCount() const {
808bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
818bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), READING);
828bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return read_buffer_->BytesConsumed() - next_pos_;
838bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
848bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
858bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)net::Error SocketInputStream::Refresh(const base::Closure& callback,
868bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                                      int byte_limit) {
878bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
888bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), READING);
898bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_GT(byte_limit, 0);
908bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
918bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (byte_limit > read_buffer_->BytesRemaining()) {
928bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    NOTREACHED() << "Out of buffer space, closing input stream.";
938bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    CloseStream(net::ERR_UNEXPECTED, base::Closure());
948bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return net::OK;
958bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
968bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
978bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (!socket_->IsConnected()) {
988bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    LOG(ERROR) << "Socket was disconnected, closing input stream";
998bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
1008bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return net::OK;
1018bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
1028bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1038bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
1048bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  int result = socket_->Read(
1058bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      read_buffer_,
1068bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      byte_limit,
1078bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      base::Bind(&SocketInputStream::RefreshCompletionCallback,
1088bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                 weak_ptr_factory_.GetWeakPtr(),
1098bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                 callback));
1108bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Read returned " << result;
1118bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result == net::ERR_IO_PENDING) {
1128bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    last_error_ = net::ERR_IO_PENDING;
1138bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return net::ERR_IO_PENDING;
1148bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
1158bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1168bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  RefreshCompletionCallback(base::Closure(), result);
1178bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return net::OK;
1188bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1198bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1208bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketInputStream::RebuildBuffer() {
1218bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Rebuilding input stream, consumed "
1228bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << next_pos_ << " bytes.";
1238bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), READING);
1248bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
1258bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1268bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  int unread_data_size = 0;
1278bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  const void* unread_data_ptr = NULL;
1288bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  Next(&unread_data_ptr, &unread_data_size);
1298bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  ResetInternal();
1308bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1318bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (unread_data_ptr != io_buffer_->data()) {
1328bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    DVLOG(1) << "Have " << unread_data_size
1338bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)             << " unread bytes remaining, shifting.";
1348bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    // Move any remaining unread data to the start of the buffer;
1358bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
1368bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  } else {
1378bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
1388bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
1398bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  read_buffer_->DidConsume(unread_data_size);
1408bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1418bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1428bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)net::Error SocketInputStream::last_error() const {
1438bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return last_error_;
1448bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1458bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1468bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketInputStream::State SocketInputStream::GetState() const {
1478bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (last_error_ < net::ERR_IO_PENDING)
1488bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return CLOSED;
1498bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1508bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (last_error_ == net::ERR_IO_PENDING)
1518bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return READING;
1528bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1538bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_EQ(last_error_, net::OK);
1548bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (read_buffer_->BytesConsumed() == next_pos_)
1558bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return EMPTY;
1568bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1578bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return READY;
1588bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1598bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1608bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketInputStream::RefreshCompletionCallback(
1618bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    const base::Closure& callback, int result) {
1628bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // If an error occurred before the completion callback could complete, ignore
1638bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // the result.
1648bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (GetState() == CLOSED)
1658bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return;
1668bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1678bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // Result == 0 implies EOF, which is treated as an error.
1688bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result == 0)
1698bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    result = net::ERR_CONNECTION_CLOSED;
1708bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1718bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(result, net::ERR_IO_PENDING);
1728bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1738bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result < net::OK) {
1748bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    DVLOG(1) << "Failed to refresh socket: " << result;
1758bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    CloseStream(static_cast<net::Error>(result), callback);
1768bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return;
1778bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
1788bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1798bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_GT(result, 0);
1808bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  last_error_ = net::OK;
1818bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  read_buffer_->DidConsume(result);
1828bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1838bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Refresh complete with " << result << " new bytes. "
1848bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << "Current position " << next_pos_
1858bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << " of " << read_buffer_->BytesConsumed() << ".";
1868bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1878bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (!callback.is_null())
1888bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    callback.Run();
1898bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1908bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1918bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketInputStream::ResetInternal() {
1928bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  read_buffer_->SetOffset(0);
1938bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ = 0;
1948bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  last_error_ = net::OK;
1958bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  weak_ptr_factory_.InvalidateWeakPtrs();  // Invalidate any callbacks.
1968bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
1978bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
1988bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketInputStream::CloseStream(net::Error error,
1998bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                                    const base::Closure& callback) {
2008bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_LT(error, net::ERR_IO_PENDING);
2018bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  ResetInternal();
2028bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  last_error_ = error;
2038bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  LOG(ERROR) << "Closing stream with result " << error;
2048bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (!callback.is_null())
2058bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    callback.Run();
2068bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2078bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2088bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
2098bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    : socket_(socket),
2108bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
2118bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
2128bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                                               kDefaultBufferSize)),
2138bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      next_pos_(0),
2148bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      last_error_(net::OK),
2158bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      weak_ptr_factory_(this) {
2168bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK(socket->IsConnected());
2178bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2188bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2198bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketOutputStream::~SocketOutputStream() {
2208bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2218bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2228bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)bool SocketOutputStream::Next(void** data, int* size) {
2238bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
2248bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), FLUSHING);
2258bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (next_pos_ == write_buffer_->size())
2268bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return false;
2278bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2288bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  *data = write_buffer_->data() + next_pos_;
2298bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  *size = write_buffer_->size() - next_pos_;
2308bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ = write_buffer_->size();
2318bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return true;
2328bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2338bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2348bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketOutputStream::BackUp(int count) {
2358bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_GE(count, 0);
2368bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (count > next_pos_)
2378bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    next_pos_ = 0;
2388bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ -= count;
2398bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
2408bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)           << next_pos_ << " bytes used.";
2418bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2428bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2438bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)int64 SocketOutputStream::ByteCount() const {
2448bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), CLOSED);
2458bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(GetState(), FLUSHING);
2468bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return next_pos_;
2478bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2488bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2498bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)net::Error SocketOutputStream::Flush(const base::Closure& callback) {
2508bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_EQ(GetState(), READY);
2518bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2528bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (!socket_->IsConnected()) {
2538bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    LOG(ERROR) << "Socket was disconnected, closing output stream";
2548bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    last_error_ = net::ERR_CONNECTION_CLOSED;
2558bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return net::OK;
2568bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
2578bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2588bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
2598bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  int result = socket_->Write(
2608bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      write_buffer_,
2618bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      next_pos_,
2628bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      base::Bind(&SocketOutputStream::FlushCompletionCallback,
2638bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                 weak_ptr_factory_.GetWeakPtr(),
2648bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)                 callback));
2658bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Write returned " << result;
2668bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result == net::ERR_IO_PENDING) {
2678bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    last_error_ = net::ERR_IO_PENDING;
2688bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return net::ERR_IO_PENDING;
2698bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
2708bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2718bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  FlushCompletionCallback(base::Closure(), result);
2728bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return net::OK;
2738bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2748bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2758bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)SocketOutputStream::State SocketOutputStream::GetState() const{
2768bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (last_error_ < net::ERR_IO_PENDING)
2778bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return CLOSED;
2788bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2798bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (last_error_ == net::ERR_IO_PENDING)
2808bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return FLUSHING;
2818bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2828bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_EQ(last_error_, net::OK);
2838bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (next_pos_ == 0)
2848bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return EMPTY;
2858bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2868bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return READY;
2878bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2888bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2898bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)net::Error SocketOutputStream::last_error() const {
2908bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  return last_error_;
2918bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
2928bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
2938bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)void SocketOutputStream::FlushCompletionCallback(
2948bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    const base::Closure& callback, int result) {
2958bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // If an error occurred before the completion callback could complete, ignore
2968bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // the result.
2978bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (GetState() == CLOSED)
2988bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return;
2998bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3008bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  // Result == 0 implies EOF, which is treated as an error.
3018bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result == 0)
3028bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    result = net::ERR_CONNECTION_CLOSED;
3038bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3048bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_NE(result, net::ERR_IO_PENDING);
3058bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3068bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (result < net::OK) {
3078bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    LOG(ERROR) << "Failed to flush socket.";
3088bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    last_error_ = static_cast<net::Error>(result);
3098bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    if (!callback.is_null())
3108bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)      callback.Run();
3118bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return;
3128bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
3138bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3148bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DCHECK_GT(result, net::OK);
3158bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  last_error_ = net::OK;
3168bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3178bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (write_buffer_->BytesConsumed() + result < next_pos_) {
3188bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    DVLOG(1) << "Partial flush complete. Retrying.";
3198bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)     // Only a partial write was completed. Flush again to finish the write.
3208bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    write_buffer_->DidConsume(result);
3218bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    Flush(callback);
3228bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    return;
3238bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  }
3248bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3258bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  DVLOG(1) << "Socket flush complete.";
3268bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  write_buffer_->SetOffset(0);
3278bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  next_pos_ = 0;
3288bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)  if (!callback.is_null())
3298bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)    callback.Run();
3308bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}
3318bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)
3328bcbed890bc3ce4d7a057a8f32cab53fa534672eTorne (Richard Coles)}  // namespace gcm
333