1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4//
5// Protobuf ZeroCopy[Input/Output]Stream implementations capable of using a
6// net::StreamSocket. Built to work with Protobuf CodedStreams.
7
8#ifndef GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
9#define GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
10
11#include "base/basictypes.h"
12#include "base/callback_forward.h"
13#include "base/compiler_specific.h"
14#include "base/memory/ref_counted.h"
15#include "base/memory/weak_ptr.h"
16#include "google/protobuf/io/zero_copy_stream.h"
17#include "google_apis/gcm/base/gcm_export.h"
18#include "net/base/net_errors.h"
19
20namespace net {
21class DrainableIOBuffer;
22class IOBuffer;
23class StreamSocket;
24}  // namespace net
25
26namespace gcm {
27
28// A helper class for interacting with a net::StreamSocket that is receiving
29// protobuf encoded messages. A SocketInputStream does not take ownership of
30// the socket itself, and it is expected that the life of the input stream
31// should match the life of the socket itself (while the socket remains
32// connected). If an error is encounters, the input stream will store the error
33// in |last_error_|, and GetState() will be set to CLOSED.
34// Typical usage:
35// 1. Check the GetState() of the input stream before using it. If CLOSED, the
36//    input stream must be rebuilt (and the socket likely needs to be
37//    reconnected as an error was encountered).
38// 2. If GetState() is EMPTY, call Refresh(..), passing the maximum byte size
39//    for a message, and wait until completion. It is invalid to attempt to
40//    Refresh an input stream or read data from the stream while a Refresh is
41//    pending.
42// 3. Check GetState() again to ensure the Refresh was successful.
43// 4. Use a CodedInputStream to read from the ZeroCopyInputStream interface of
44//    the SocketInputStream. Next(..) will return true until there is no data
45//    remaining.
46// 5. Call RebuildBuffer when done reading, to shift any unread data to the
47//    start of the buffer.
48// 6. Repeat as necessary.
49class GCM_EXPORT SocketInputStream
50    : public google::protobuf::io::ZeroCopyInputStream {
51 public:
52  enum State {
53    // No valid data to read. This means the buffer is either empty or all data
54    // in the buffer has already been consumed.
55    EMPTY,
56    // Valid data to read.
57    READY,
58    // In the process of reading new data from the socket.
59    READING,
60    // An permanent error occurred and the stream is now closed.
61    CLOSED,
62  };
63
64  // |socket| should already be connected.
65  explicit SocketInputStream(net::StreamSocket* socket);
66  virtual ~SocketInputStream();
67
68  // ZeroCopyInputStream implementation.
69  virtual bool Next(const void** data, int* size) OVERRIDE;
70  virtual void BackUp(int count) OVERRIDE;
71  virtual bool Skip(int count) OVERRIDE;  // Not implemented.
72  virtual int64 ByteCount() const OVERRIDE;
73
74  // The remaining amount of valid data available to be read.
75  int UnreadByteCount() const;
76
77  // Reads from the socket, appending a max of |byte_limit| bytes onto the read
78  // buffer. net::ERR_IO_PENDING is returned if the refresh can't complete
79  // synchronously, in which case the callback is invoked upon completion. If
80  // the refresh can complete synchronously, even in case of an error, returns
81  // net::OK without invoking callback.
82  // Note: GetState() (and possibly last_error()) should be checked upon
83  // completion to determine whether the Refresh encountered an error.
84  net::Error Refresh(const base::Closure& callback, int byte_limit);
85
86  // Rebuilds the buffer state by copying over any unread data to the beginning
87  // of the buffer and resetting the buffer read/write positions.
88  // Note: it is not valid to call Rebuild() if GetState() == CLOSED. The stream
89  // must be recreated from scratch in such a scenario.
90  void RebuildBuffer();
91
92  // Returns the last fatal error encountered. Only valid if GetState() ==
93  // CLOSED.
94  net::Error last_error() const;
95
96  // Returns the current state.
97  State GetState() const;
98
99 private:
100  // Clears the local state.
101  void ResetInternal();
102
103  // Callback for Socket::Read calls.
104  void RefreshCompletionCallback(const base::Closure& callback, int result);
105
106  // Permanently closes the stream.
107  void CloseStream(net::Error error, const base::Closure& callback);
108
109  // Internal net components.
110  net::StreamSocket* const socket_;
111  const scoped_refptr<net::IOBuffer> io_buffer_;
112  // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
113  // been written to yet by Socket::Read calls.
114  const scoped_refptr<net::DrainableIOBuffer> read_buffer_;
115
116  // Starting position of the data within |io_buffer_| to consume on subsequent
117  // Next(..) call.  0 <= next_pos_ <= read_buffer_.BytesConsumed()
118  // Note: next_pos == read_buffer_.BytesConsumed() implies GetState() == EMPTY.
119  int next_pos_;
120
121  // If < net::ERR_IO_PENDING, the last net error received.
122  // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == READING.
123  net::Error last_error_;
124
125  base::WeakPtrFactory<SocketInputStream> weak_ptr_factory_;
126
127  DISALLOW_COPY_AND_ASSIGN(SocketInputStream);
128};
129
130// A helper class for writing to a SocketStream with protobuf encoded data.
131// A SocketOutputStream does not take ownership of the socket itself, and it is
132// expected that the life of the output stream should match the life of the
133// socket itself (while the socket remains connected).
134// Typical usage:
135// 1. Check the GetState() of the output stream before using it. If CLOSED, the
136//    output stream must be rebuilt (and the socket likely needs to be
137//    reconnected, as an error was encountered).
138// 2. If EMPTY, the output stream can be written via a CodedOutputStream using
139//    the ZeroCopyOutputStream interface.
140// 3. Once done writing, GetState() should be READY, so call Flush(..) to write
141//    the buffer into the StreamSocket. Wait for the callback to be invoked
142//    (it's invalid to write to an output stream while it's flushing).
143// 4. Check the GetState() again to ensure the Flush was successful. GetState()
144//    should be EMPTY again.
145// 5. Repeat.
146class GCM_EXPORT SocketOutputStream
147    : public google::protobuf::io::ZeroCopyOutputStream {
148 public:
149  enum State {
150    // No valid data yet.
151    EMPTY,
152    // Ready for flushing (some data is present).
153    READY,
154    // In the process of flushing into the socket.
155    FLUSHING,
156    // A permanent error occurred, and the stream is now closed.
157    CLOSED,
158  };
159
160  // |socket| should already be connected.
161  explicit SocketOutputStream(net::StreamSocket* socket);
162  virtual ~SocketOutputStream();
163
164  // ZeroCopyOutputStream implementation.
165  virtual bool Next(void** data, int* size) OVERRIDE;
166  virtual void BackUp(int count) OVERRIDE;
167  virtual int64 ByteCount() const OVERRIDE;
168
169  // Writes the buffer into the Socket.
170  net::Error Flush(const base::Closure& callback);
171
172  // Returns the last fatal error encountered. Only valid if GetState() ==
173  // CLOSED.
174  net::Error last_error() const;
175
176  // Returns the current state.
177  State GetState() const;
178
179 private:
180  void FlushCompletionCallback(const base::Closure& callback, int result);
181
182  // Internal net components.
183  net::StreamSocket* const socket_;
184  const scoped_refptr<net::IOBuffer> io_buffer_;
185  // IOBuffer implementation that wraps the data within |io_buffer_| that hasn't
186  // been written to the socket yet.
187  const scoped_refptr<net::DrainableIOBuffer> write_buffer_;
188
189  // Starting position of the data within |io_buffer_| to consume on subsequent
190  // Next(..) call.  0 <= write_buffer_.BytesConsumed() <= next_pos_
191  // Note: next_pos == 0 implies GetState() == EMPTY.
192  int next_pos_;
193
194  // If < net::ERR_IO_PENDING, the last net error received.
195  // Note: last_error_ == net::ERR_IO_PENDING implies GetState() == FLUSHING.
196  net::Error last_error_;
197
198  base::WeakPtrFactory<SocketOutputStream> weak_ptr_factory_;
199
200  DISALLOW_COPY_AND_ASSIGN(SocketOutputStream);
201};
202
203}  // namespace gcm
204
205#endif  // GOOGLE_APIS_GCM_BASE_SOCKET_STREAM_H_
206