1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
#include "tools/android/forwarder2/forwarder.h"

#include <algorithm>

#include "base/basictypes.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "tools/android/forwarder2/socket.h"
11
12namespace forwarder2 {
namespace {

// Capacity, in bytes, of the |buffer_| array held by each
// Forwarder::BufferedCopier. It is also the maximum size passed to a single
// NonBlockingRead() call on the input socket.
const int kBufferSize = 32 * 1024;

}  // namespace
18
19
20// Helper class to buffer reads and writes from one socket to another.
// Each implements a small buffer connected to one input socket, and
22// one output socket.
23//
24//   socket_from_ ---> [BufferedCopier] ---> socket_to_
25//
26// These objects are used in a pair to handle duplex traffic, as in:
27//
28//                    ------> [BufferedCopier_1] --->
29//                  /                                \
30//      socket_1   *                                  * socket_2
31//                  \                                /
32//                   <------ [BufferedCopier_2] <----
33//
34// When a BufferedCopier is in the READING state (see below), it only listens
35// to events on its input socket, and won't detect when its output socket
36// disconnects. To work around this, its peer will call its Close() method
37// when that happens.
38
39class Forwarder::BufferedCopier {
40 public:
41  // Possible states:
42  //    READING - Empty buffer and Waiting for input.
43  //    WRITING - Data in buffer, and waiting for output.
44  //    CLOSING - Like WRITING, but do not try to read after that.
45  //    CLOSED  - Completely closed.
46  //
47  // State transitions are:
48  //
49  //   T01:  READING ---[receive data]---> WRITING
50  //   T02:  READING ---[error on input socket]---> CLOSED
51  //   T03:  READING ---[Close() call]---> CLOSED
52  //
53  //   T04:  WRITING ---[write partial data]---> WRITING
54  //   T05:  WRITING ---[write all data]----> READING
55  //   T06:  WRITING ---[error on output socket]----> CLOSED
56  //   T07:  WRITING ---[Close() call]---> CLOSING
57  //
58  //   T08:  CLOSING ---[write partial data]---> CLOSING
59  //   T09:  CLOSING ---[write all data]----> CLOSED
60  //   T10:  CLOSING ---[Close() call]---> CLOSING
61  //   T11:  CLOSING ---[error on output socket] ---> CLOSED
62  //
63  enum State {
64    STATE_READING = 0,
65    STATE_WRITING = 1,
66    STATE_CLOSING = 2,
67    STATE_CLOSED = 3,
68  };
69
70  // Does NOT own the pointers.
71  BufferedCopier(Socket* socket_from, Socket* socket_to)
72      : socket_from_(socket_from),
73        socket_to_(socket_to),
74        bytes_read_(0),
75        write_offset_(0),
76        peer_(NULL),
77        state_(STATE_READING) {}
78
79  // Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
80  void SetPeer(BufferedCopier* peer) {
81    DCHECK(!peer_);
82    peer_ = peer;
83  }
84
85  bool is_closed() const { return state_ == STATE_CLOSED; }
86
87  // Gently asks to close a buffer. Called either by the peer or the forwarder.
88  void Close() {
89    switch (state_) {
90      case STATE_READING:
91        state_ = STATE_CLOSED;  // T03
92        break;
93      case STATE_WRITING:
94        state_ = STATE_CLOSING;  // T07
95        break;
96      case STATE_CLOSING:
97        break;  // T10
98      case STATE_CLOSED:
99        ;
100    }
101  }
102
103  // Call this before select(). This updates |read_fds|,
104  // |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
105  void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
106    int fd;
107    switch (state_) {
108      case STATE_READING:
109        DCHECK(bytes_read_ == 0);
110        DCHECK(write_offset_ == 0);
111        fd = socket_from_->fd();
112        if (fd < 0) {
113          ForceClose();  // T02
114          return;
115        }
116        FD_SET(fd, read_fds);
117        break;
118
119      case STATE_WRITING:
120      case STATE_CLOSING:
121        DCHECK(bytes_read_ > 0);
122        DCHECK(write_offset_ < bytes_read_);
123        fd = socket_to_->fd();
124        if (fd < 0) {
125          ForceClose();  // T06
126          return;
127        }
128        FD_SET(fd, write_fds);
129        break;
130
131      case STATE_CLOSED:
132        return;
133    }
134    *max_fd = std::max(*max_fd, fd);
135  }
136
137  // Call this after a select() call to operate over the buffer.
138  void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
139    int fd, ret;
140    switch (state_) {
141      case STATE_READING:
142        fd = socket_from_->fd();
143        if (fd < 0) {
144          state_ = STATE_CLOSED;  // T02
145          return;
146        }
147        if (!FD_ISSET(fd, &read_fds))
148          return;
149
150        ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
151        if (ret <= 0) {
152          ForceClose();  // T02
153          return;
154        }
155        bytes_read_ = ret;
156        write_offset_ = 0;
157        state_ = STATE_WRITING;  // T01
158        break;
159
160      case STATE_WRITING:
161      case STATE_CLOSING:
162        fd = socket_to_->fd();
163        if (fd < 0) {
164          ForceClose();  // T06 + T11
165          return;
166        }
167        if (!FD_ISSET(fd, &write_fds))
168          return;
169
170        ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
171                                           bytes_read_ - write_offset_);
172        if (ret <= 0) {
173          ForceClose();  // T06 + T11
174          return;
175        }
176
177        write_offset_ += ret;
178        if (write_offset_ < bytes_read_)
179          return;  // T08 + T04
180
181        write_offset_ = 0;
182        bytes_read_ = 0;
183        if (state_ == STATE_CLOSING) {
184          ForceClose();  // T09
185          return;
186        }
187        state_ = STATE_READING;  // T05
188        break;
189
190      case STATE_CLOSED:
191        ;
192    }
193  }
194
195 private:
196  // Internal method used to close the buffer and notify the peer, if any.
197  void ForceClose() {
198    if (peer_) {
199      peer_->Close();
200      peer_ = NULL;
201    }
202    state_ = STATE_CLOSED;
203  }
204
205  // Not owned.
206  Socket* socket_from_;
207  Socket* socket_to_;
208
209  int bytes_read_;
210  int write_offset_;
211  BufferedCopier* peer_;
212  State state_;
213  char buffer_[kBufferSize];
214
215  DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
216};
217
218Forwarder::Forwarder(scoped_ptr<Socket> socket1,
219                     scoped_ptr<Socket> socket2)
220    : socket1_(socket1.Pass()),
221      socket2_(socket2.Pass()),
222      buffer1_(new BufferedCopier(socket1_.get(), socket2_.get())),
223      buffer2_(new BufferedCopier(socket2_.get(), socket1_.get())) {
224  buffer1_->SetPeer(buffer2_.get());
225  buffer2_->SetPeer(buffer1_.get());
226}
227
// Performs no explicit cleanup; it only checks (via DCHECK) that destruction
// happens on the thread accepted by |thread_checker_|.
Forwarder::~Forwarder() {
  DCHECK(thread_checker_.CalledOnValidThread());
}
231
232void Forwarder::RegisterFDs(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
233  DCHECK(thread_checker_.CalledOnValidThread());
234  buffer1_->PrepareSelect(read_fds, write_fds, max_fd);
235  buffer2_->PrepareSelect(read_fds, write_fds, max_fd);
236}
237
238void Forwarder::ProcessEvents(const fd_set& read_fds, const fd_set& write_fds) {
239  DCHECK(thread_checker_.CalledOnValidThread());
240  buffer1_->ProcessSelect(read_fds, write_fds);
241  buffer2_->ProcessSelect(read_fds, write_fds);
242}
243
244bool Forwarder::IsClosed() const {
245  DCHECK(thread_checker_.CalledOnValidThread());
246  return buffer1_->is_closed() && buffer2_->is_closed();
247}
248
249void Forwarder::Shutdown() {
250  DCHECK(thread_checker_.CalledOnValidThread());
251  buffer1_->Close();
252  buffer2_->Close();
253}
254
255}  // namespace forwarder2
256