1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4// 5// Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a 6// net::StreamSocket. Built to work with Protobuf CodedStreams. 7 8#ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 9#define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 10 11#include "base/basictypes.h" 12#include "base/callback_forward.h" 13#include "base/compiler_specific.h" 14#include "base/memory/ref_counted.h" 15#include "base/memory/weak_ptr.h" 16#include "google/protobuf/io/zero_copy_stream.h" 17#include "google_apis/gcm/base/gcm_export.h" 18#include "net/base/net_errors.h" 19 20namespace net { 21class DrainableIOBuffer; 22class IOBuffer; 23class StreamSocket; 24} // namespace net 25 26namespace gcm { 27 28// A helper class for interacting with a net::StreamSocket that is receiving 29// protobuf encoded messages. A SocketInputStream does not take ownership of 30// the socket itself, and it is expected that the life of the input stream 31// should match the life of the socket itself (while the socket remains 32// connected). If an error is encounters, the input stream will store the error 33// in |last_error_|, and GetState() will be set to CLOSED. 34// Typical usage: 35// 1. Check the GetState() of the input stream before using it. If CLOSED, the 36// input stream must be rebuilt (and the socket likely needs to be 37// reconnected as an error was encountered). 38// 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size 39// for a message, and wait until completion. It is invalid to attempt to 40// Refresh an input stream or read data from the stream while a Refresh is 41// pending. 42// 3. Check GetState() again to ensure the Refresh was successful. 43// 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of 44// the SocketInputStream. Next(..) will return true until there is no data 45// remaining. 46// 5. Call RebuildBuffer when done reading, to shift any unread data to the 47// start of the buffer. 48// 6. Repeat as necessary. 49class GCM_EXPORT SocketInputStream 50 : public google::protobuf::io::ZeroCopyInputStream { 51 public: 52 enum State { 53 // No valid data to read. This means the buffer is either empty or all data 54 // in the buffer has already been consumed. 55 EMPTY, 56 // Valid data to read. 57 READY, 58 // In the process of reading new data from the socket. 59 READING, 60 // An permanent error occurred and the stream is now closed. 61 CLOSED, 62 }; 63 64 // |socket| should already be connected. 65 explicit SocketInputStream(net::StreamSocket* socket); 66 virtual ~SocketInputStream(); 67 68 // ZeroCopyInputStream implementation. 69 virtual bool Next(const void** data, int* size) OVERRIDE; 70 virtual void BackUp(int count) OVERRIDE; 71 virtual bool Skip(int count) OVERRIDE; // Not implemented. 72 virtual int64 ByteCount() const OVERRIDE; 73 74 // The remaining amount of valid data available to be read. 75 int UnreadByteCount() const; 76 77 // Reads from the socket, appending a max of |byte_limit| bytes onto the read 78 // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete 79 // synchronously, in which case the callback is invoked upon completion. If 80 // the refresh can complete synchronously, even in case of an error, returns 81 // net::OK without invoking callback. 82 // Note: GetState() (and possibly last_error()) should be checked upon 83 // completion to determine whether the Refresh encountered an error. 84 net::Error Refresh(const base::Closure& callback, int byte_limit); 85 86 // Rebuilds the buffer state by copying over any unread data to the beginning 87 // of the buffer and resetting the buffer read/write positions. 88 // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream 89 // must be recreated from scratch in such a scenario. 90 void RebuildBuffer(); 91 92 // Returns the last fatal error encountered. Only valid if GetState() == 93 // CLOSED. 94 net::Error last_error() const; 95 96 // Returns the current state. 97 State GetState() const; 98 99 private: 100 // Clears the local state. 101 void ResetInternal(); 102 103 // Callback for Socket::Read calls. 104 void RefreshCompletionCallback(const base::Closure& callback, int result); 105 106 // Permanently closes the stream. 107 void CloseStream(net::Error error, const base::Closure& callback); 108 109 // Internal net components. 110 net::StreamSocket* const socket_; 111 const scoped_refptr<net::IOBuffer> io_buffer_; 112 // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't 113 // been written to yet by Socket::Read calls. 114 const scoped_refptr<net::DrainableIOBuffer> read_buffer_; 115 116 // Starting position of the data within |io_buffer_| to consume on subsequent 117 // Next(..) call. 0 <= next_pos_ <= read_buffer_.BytesConsumed() 118 // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY. 119 int next_pos_; 120 121 // If < net::ERR_IO_PENDING, the last net error received. 122 // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING. 123 net::Error last_error_; 124 125 base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_; 126 127 DISALLOW_COPY_AND_ASSIGN(SocketInputStream); 128}; 129 130// A helper class for writing to a SocketStream with protobuf encoded data. 131// A SocketOutputStream does not take ownership of the socket itself, and it is 132// expected that the life of the output stream should match the life of the 133// socket itself (while the socket remains connected). 134// Typical usage: 135// 1. Check the GetState() of the output stream before using it. If CLOSED, the 136// output stream must be rebuilt (and the socket likely needs to be 137// reconnected, as an error was encountered). 138// 2. If EMPTY, the output stream can be written via a CodedOutputStream using 139// the ZeroCopyOutputStream interface. 140// 3. Once done writing, GetState() should be READY, so call Flush(..) to write 141// the buffer into the StreamSocket. Wait for the callback to be invoked 142// (it's invalid to write to an output stream while it's flushing). 143// 4. Check the GetState() again to ensure the Flush was successful. GetState() 144// should be EMPTY again. 145// 5. Repeat. 146class GCM_EXPORT SocketOutputStream 147 : public google::protobuf::io::ZeroCopyOutputStream { 148 public: 149 enum State { 150 // No valid data yet. 151 EMPTY, 152 // Ready for flushing (some data is present). 153 READY, 154 // In the process of flushing into the socket. 155 FLUSHING, 156 // A permanent error occurred, and the stream is now closed. 157 CLOSED, 158 }; 159 160 // |socket| should already be connected. 161 explicit SocketOutputStream(net::StreamSocket* socket); 162 virtual ~SocketOutputStream(); 163 164 // ZeroCopyOutputStream implementation. 165 virtual bool Next(void** data, int* size) OVERRIDE; 166 virtual void BackUp(int count) OVERRIDE; 167 virtual int64 ByteCount() const OVERRIDE; 168 169 // Writes the buffer into the Socket. 170 net::Error Flush(const base::Closure& callback); 171 172 // Returns the last fatal error encountered. Only valid if GetState() == 173 // CLOSED. 174 net::Error last_error() const; 175 176 // Returns the current state. 177 State GetState() const; 178 179 private: 180 void FlushCompletionCallback(const base::Closure& callback, int result); 181 182 // Internal net components. 183 net::StreamSocket* const socket_; 184 const scoped_refptr<net::IOBuffer> io_buffer_; 185 // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't 186 // been written to the socket yet. 187 const scoped_refptr<net::DrainableIOBuffer> write_buffer_; 188 189 // Starting position of the data within |io_buffer_| to consume on subsequent 190 // Next(..) call. 0 <= write_buffer_.BytesConsumed() <= next_pos_ 191 // Note: next_pos == 0 implies GetState() == EMPTY. 192 int next_pos_; 193 194 // If < net::ERR_IO_PENDING, the last net error received. 195 // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING. 196 net::Error last_error_; 197 198 base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_; 199 200 DISALLOW_COPY_AND_ASSIGN(SocketOutputStream); 201}; 202 203} // namespace gcm 204 205#endif // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_ 206