1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "jingle/glue/pseudotcp_adapter.h"
6
7#include "base/compiler_specific.h"
8#include "base/logging.h"
9#include "base/time/time.h"
10#include "net/base/address_list.h"
11#include "net/base/completion_callback.h"
12#include "net/base/io_buffer.h"
13#include "net/base/net_errors.h"
14#include "net/base/net_util.h"
15
16using cricket::PseudoTcp;
17
18namespace {
19const int kReadBufferSize = 65536;  // Maximum size of a packet.
20const uint16 kDefaultMtu = 1280;
21}  // namespace
22
23namespace jingle_glue {
24
25class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
26                               public base::RefCounted<Core> {
27 public:
28  Core(net::Socket* socket);
29
30  // Functions used to implement net::StreamSocket.
31  int Read(net::IOBuffer* buffer, int buffer_size,
32           const net::CompletionCallback& callback);
33  int Write(net::IOBuffer* buffer, int buffer_size,
34            const net::CompletionCallback& callback);
35  int Connect(const net::CompletionCallback& callback);
36  void Disconnect();
37  bool IsConnected() const;
38
39  // cricket::IPseudoTcpNotify interface.
40  // These notifications are triggered from NotifyPacket.
41  virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
42  virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
43  virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
44  // This is triggered by NotifyClock or NotifyPacket.
45  virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
46  // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
47  virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
48                                     const char* buffer, size_t len) OVERRIDE;
49
50  void SetAckDelay(int delay_ms);
51  void SetNoDelay(bool no_delay);
52  void SetReceiveBufferSize(int32 size);
53  void SetSendBufferSize(int32 size);
54  void SetWriteWaitsForSend(bool write_waits_for_send);
55
56  void DeleteSocket();
57
58 private:
59  friend class base::RefCounted<Core>;
60  virtual ~Core();
61
62  // These are invoked by the underlying Socket, and may trigger callbacks.
63  // They hold a reference to |this| while running, to protect from deletion.
64  void OnRead(int result);
65  void OnWritten(int result);
66
67  // These may trigger callbacks, so the holder must hold a reference on
68  // the stack while calling them.
69  void DoReadFromSocket();
70  void HandleReadResults(int result);
71  void HandleTcpClock();
72
73  // Checks if current write has completed in the write-waits-for-send
74  // mode.
75  void CheckWriteComplete();
76
77  // This re-sets |timer| without triggering callbacks.
78  void AdjustClock();
79
80  net::CompletionCallback connect_callback_;
81  net::CompletionCallback read_callback_;
82  net::CompletionCallback write_callback_;
83
84  cricket::PseudoTcp pseudo_tcp_;
85  scoped_ptr<net::Socket> socket_;
86
87  scoped_refptr<net::IOBuffer> read_buffer_;
88  int read_buffer_size_;
89  scoped_refptr<net::IOBuffer> write_buffer_;
90  int write_buffer_size_;
91
92  // Whether we need to wait for data to be sent before completing write.
93  bool write_waits_for_send_;
94
95  // Set to true in the write-waits-for-send mode when we've
96  // successfully writtend data to the send buffer and waiting for the
97  // data to be sent to the remote end.
98  bool waiting_write_position_;
99
100  // Number of the bytes written by the last write stored while we wait
101  // for the data to be sent (i.e. when waiting_write_position_ = true).
102  int last_write_result_;
103
104  bool socket_write_pending_;
105  scoped_refptr<net::IOBuffer> socket_read_buffer_;
106
107  base::OneShotTimer<Core> timer_;
108
109  DISALLOW_COPY_AND_ASSIGN(Core);
110};
111
112
113PseudoTcpAdapter::Core::Core(net::Socket* socket)
114    : pseudo_tcp_(this, 0),
115      socket_(socket),
116      write_waits_for_send_(false),
117      waiting_write_position_(false),
118      socket_write_pending_(false) {
119  // Doesn't trigger callbacks.
120  pseudo_tcp_.NotifyMTU(kDefaultMtu);
121}
122
123PseudoTcpAdapter::Core::~Core() {
124}
125
126int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
127                                 const net::CompletionCallback& callback) {
128  DCHECK(read_callback_.is_null());
129
130  // Reference the Core in case a callback deletes the adapter.
131  scoped_refptr<Core> core(this);
132
133  int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
134  if (result < 0) {
135    result = net::MapSystemError(pseudo_tcp_.GetError());
136    DCHECK(result < 0);
137  }
138
139  if (result == net::ERR_IO_PENDING) {
140    read_buffer_ = buffer;
141    read_buffer_size_ = buffer_size;
142    read_callback_ = callback;
143  }
144
145  AdjustClock();
146
147  return result;
148}
149
150int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
151                                  const net::CompletionCallback& callback) {
152  DCHECK(write_callback_.is_null());
153
154  // Reference the Core in case a callback deletes the adapter.
155  scoped_refptr<Core> core(this);
156
157  int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
158  if (result < 0) {
159    result = net::MapSystemError(pseudo_tcp_.GetError());
160    DCHECK(result < 0);
161  }
162
163  AdjustClock();
164
165  if (result == net::ERR_IO_PENDING) {
166    write_buffer_ = buffer;
167    write_buffer_size_ = buffer_size;
168    write_callback_ = callback;
169    return result;
170  }
171
172  if (result < 0)
173    return result;
174
175  // Need to wait until the data is sent to the peer when
176  // send-confirmation mode is enabled.
177  if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
178    DCHECK(!waiting_write_position_);
179    waiting_write_position_ = true;
180    last_write_result_ = result;
181    write_buffer_ = buffer;
182    write_buffer_size_ = buffer_size;
183    write_callback_ = callback;
184    return net::ERR_IO_PENDING;
185  }
186
187  return result;
188}
189
190int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
191  DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
192
193  // Reference the Core in case a callback deletes the adapter.
194  scoped_refptr<Core> core(this);
195
196  // Start the connection attempt.
197  int result = pseudo_tcp_.Connect();
198  if (result < 0)
199    return net::ERR_FAILED;
200
201  AdjustClock();
202
203  connect_callback_ = callback;
204  DoReadFromSocket();
205
206  return net::ERR_IO_PENDING;
207}
208
209void PseudoTcpAdapter::Core::Disconnect() {
210  // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
211  read_callback_.Reset();
212  read_buffer_ = NULL;
213  write_callback_.Reset();
214  write_buffer_ = NULL;
215  connect_callback_.Reset();
216
217  // TODO(wez): Connect should succeed if called after Disconnect, which
218  // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
219  // and create a new one in Connect.
220  // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
221  // effect.  This should be addressed in PseudoTcp, really.
222  // In the meantime we can fake OnTcpClosed notification and tear down the
223  // PseudoTcp.
224  pseudo_tcp_.Close(true);
225}
226
227bool PseudoTcpAdapter::Core::IsConnected() const {
228  return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
229}
230
231void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
232  DCHECK(tcp == &pseudo_tcp_);
233
234  if (!connect_callback_.is_null()) {
235    net::CompletionCallback callback = connect_callback_;
236    connect_callback_.Reset();
237    callback.Run(net::OK);
238  }
239
240  OnTcpReadable(tcp);
241  OnTcpWriteable(tcp);
242}
243
244void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
245  DCHECK_EQ(tcp, &pseudo_tcp_);
246  if (read_callback_.is_null())
247    return;
248
249  int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
250  if (result < 0) {
251    result = net::MapSystemError(pseudo_tcp_.GetError());
252    DCHECK(result < 0);
253    if (result == net::ERR_IO_PENDING)
254      return;
255  }
256
257  AdjustClock();
258
259  net::CompletionCallback callback = read_callback_;
260  read_callback_.Reset();
261  read_buffer_ = NULL;
262  callback.Run(result);
263}
264
265void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
266  DCHECK_EQ(tcp, &pseudo_tcp_);
267  if (write_callback_.is_null())
268    return;
269
270  if (waiting_write_position_) {
271    CheckWriteComplete();
272    return;
273  }
274
275  int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
276  if (result < 0) {
277    result = net::MapSystemError(pseudo_tcp_.GetError());
278    DCHECK(result < 0);
279    if (result == net::ERR_IO_PENDING)
280      return;
281  }
282
283  AdjustClock();
284
285  if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
286    DCHECK(!waiting_write_position_);
287    waiting_write_position_ = true;
288    last_write_result_ = result;
289    return;
290  }
291
292  net::CompletionCallback callback = write_callback_;
293  write_callback_.Reset();
294  write_buffer_ = NULL;
295  callback.Run(result);
296}
297
298void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
299  DCHECK_EQ(tcp, &pseudo_tcp_);
300
301  if (!connect_callback_.is_null()) {
302    net::CompletionCallback callback = connect_callback_;
303    connect_callback_.Reset();
304    callback.Run(net::MapSystemError(error));
305  }
306
307  if (!read_callback_.is_null()) {
308    net::CompletionCallback callback = read_callback_;
309    read_callback_.Reset();
310    callback.Run(net::MapSystemError(error));
311  }
312
313  if (!write_callback_.is_null()) {
314    net::CompletionCallback callback = write_callback_;
315    write_callback_.Reset();
316    callback.Run(net::MapSystemError(error));
317  }
318}
319
320void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
321  pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
322}
323
324void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
325  pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
326}
327
328void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
329  pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
330}
331
332void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
333  pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
334}
335
336void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
337  write_waits_for_send_ = write_waits_for_send;
338}
339
340void PseudoTcpAdapter::Core::DeleteSocket() {
341  socket_.reset();
342}
343
344cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
345    PseudoTcp* tcp,
346    const char* buffer,
347    size_t len) {
348  DCHECK_EQ(tcp, &pseudo_tcp_);
349
350  // If we already have a write pending, we behave like a congested network,
351  // returning success for the write, but dropping the packet.  PseudoTcp will
352  // back-off and retransmit, adjusting for the perceived congestion.
353  if (socket_write_pending_)
354    return IPseudoTcpNotify::WR_SUCCESS;
355
356  scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
357  memcpy(write_buffer->data(), buffer, len);
358
359  // Our underlying socket is datagram-oriented, which means it should either
360  // send exactly as many bytes as we requested, or fail.
361  int result;
362  if (socket_.get()) {
363    result = socket_->Write(
364        write_buffer.get(),
365        len,
366        base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
367  } else {
368    result = net::ERR_CONNECTION_CLOSED;
369  }
370  if (result == net::ERR_IO_PENDING) {
371    socket_write_pending_ = true;
372    return IPseudoTcpNotify::WR_SUCCESS;
373  } else if (result == net::ERR_MSG_TOO_BIG) {
374    return IPseudoTcpNotify::WR_TOO_LARGE;
375  } else if (result < 0) {
376    return IPseudoTcpNotify::WR_FAIL;
377  } else {
378    return IPseudoTcpNotify::WR_SUCCESS;
379  }
380}
381
382void PseudoTcpAdapter::Core::DoReadFromSocket() {
383  if (!socket_read_buffer_.get())
384    socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
385
386  int result = 1;
387  while (socket_.get() && result > 0) {
388    result = socket_->Read(
389        socket_read_buffer_.get(),
390        kReadBufferSize,
391        base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
392    if (result != net::ERR_IO_PENDING)
393      HandleReadResults(result);
394  }
395}
396
397void PseudoTcpAdapter::Core::HandleReadResults(int result) {
398  if (result <= 0) {
399    LOG(ERROR) << "Read returned " << result;
400    return;
401  }
402
403  // TODO(wez): Disconnect on failure of NotifyPacket?
404  pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
405  AdjustClock();
406
407  CheckWriteComplete();
408}
409
410void PseudoTcpAdapter::Core::OnRead(int result) {
411  // Reference the Core in case a callback deletes the adapter.
412  scoped_refptr<Core> core(this);
413
414  HandleReadResults(result);
415  if (result >= 0)
416    DoReadFromSocket();
417}
418
419void PseudoTcpAdapter::Core::OnWritten(int result) {
420  // Reference the Core in case a callback deletes the adapter.
421  scoped_refptr<Core> core(this);
422
423  socket_write_pending_ = false;
424  if (result < 0) {
425    LOG(WARNING) << "Write failed. Error code: " << result;
426  }
427}
428
429void PseudoTcpAdapter::Core::AdjustClock() {
430  long timeout = 0;
431  if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
432    timer_.Stop();
433    timer_.Start(FROM_HERE,
434                 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
435                 &PseudoTcpAdapter::Core::HandleTcpClock);
436  }
437}
438
439void PseudoTcpAdapter::Core::HandleTcpClock() {
440  // Reference the Core in case a callback deletes the adapter.
441  scoped_refptr<Core> core(this);
442
443  pseudo_tcp_.NotifyClock(PseudoTcp::Now());
444  AdjustClock();
445
446  CheckWriteComplete();
447}
448
449void PseudoTcpAdapter::Core::CheckWriteComplete() {
450  if (!write_callback_.is_null() && waiting_write_position_) {
451    if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
452      waiting_write_position_ = false;
453
454      net::CompletionCallback callback = write_callback_;
455      write_callback_.Reset();
456      write_buffer_ = NULL;
457      callback.Run(last_write_result_);
458    }
459  }
460}
461
462// Public interface implemention.
463
464PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
465    : core_(new Core(socket)) {
466}
467
468PseudoTcpAdapter::~PseudoTcpAdapter() {
469  Disconnect();
470
471  // Make sure that the underlying socket is destroyed before PseudoTcp.
472  core_->DeleteSocket();
473}
474
475int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
476                           const net::CompletionCallback& callback) {
477  DCHECK(CalledOnValidThread());
478  return core_->Read(buffer, buffer_size, callback);
479}
480
481int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
482                            const net::CompletionCallback& callback) {
483  DCHECK(CalledOnValidThread());
484  return core_->Write(buffer, buffer_size, callback);
485}
486
487int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
488  DCHECK(CalledOnValidThread());
489
490  core_->SetReceiveBufferSize(size);
491  return net::OK;
492}
493
494int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
495  DCHECK(CalledOnValidThread());
496
497  core_->SetSendBufferSize(size);
498  return net::OK;
499}
500
501int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
502  DCHECK(CalledOnValidThread());
503
504  // net::StreamSocket requires that Connect return OK if already connected.
505  if (IsConnected())
506    return net::OK;
507
508  return core_->Connect(callback);
509}
510
511void PseudoTcpAdapter::Disconnect() {
512  DCHECK(CalledOnValidThread());
513  core_->Disconnect();
514}
515
516bool PseudoTcpAdapter::IsConnected() const {
517  return core_->IsConnected();
518}
519
520bool PseudoTcpAdapter::IsConnectedAndIdle() const {
521  DCHECK(CalledOnValidThread());
522  NOTIMPLEMENTED();
523  return false;
524}
525
526int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
527  DCHECK(CalledOnValidThread());
528
529  // We don't have a meaningful peer address, but we can't return an
530  // error, so we return a INADDR_ANY instead.
531  net::IPAddressNumber ip_address(net::kIPv4AddressSize);
532  *address = net::IPEndPoint(ip_address, 0);
533  return net::OK;
534}
535
536int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
537  DCHECK(CalledOnValidThread());
538  NOTIMPLEMENTED();
539  return net::ERR_FAILED;
540}
541
542const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
543  DCHECK(CalledOnValidThread());
544  return net_log_;
545}
546
547void PseudoTcpAdapter::SetSubresourceSpeculation() {
548  DCHECK(CalledOnValidThread());
549  NOTIMPLEMENTED();
550}
551
552void PseudoTcpAdapter::SetOmniboxSpeculation() {
553  DCHECK(CalledOnValidThread());
554  NOTIMPLEMENTED();
555}
556
557bool PseudoTcpAdapter::WasEverUsed() const {
558  DCHECK(CalledOnValidThread());
559  NOTIMPLEMENTED();
560  return true;
561}
562
563bool PseudoTcpAdapter::UsingTCPFastOpen() const {
564  DCHECK(CalledOnValidThread());
565  return false;
566}
567
568bool PseudoTcpAdapter::WasNpnNegotiated() const {
569  DCHECK(CalledOnValidThread());
570  return false;
571}
572
573net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
574  DCHECK(CalledOnValidThread());
575  return net::kProtoUnknown;
576}
577
578bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
579  DCHECK(CalledOnValidThread());
580  return false;
581}
582
583void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
584  DCHECK(CalledOnValidThread());
585  core_->SetAckDelay(delay_ms);
586}
587
588void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
589  DCHECK(CalledOnValidThread());
590  core_->SetNoDelay(no_delay);
591}
592
593void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
594  DCHECK(CalledOnValidThread());
595  core_->SetWriteWaitsForSend(write_waits_for_send);
596}
597
598}  // namespace jingle_glue
599