asynctcpsocket.cc revision 3345a6884c488ff3a535c2c9acdd33d74b37e311
1/*
2 * libjingle
3 * Copyright 2004--2010, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/base/asynctcpsocket.h"
29
30#include <cstring>
31
32#include "talk/base/byteorder.h"
33#include "talk/base/common.h"
34#include "talk/base/logging.h"
35
36#ifdef POSIX
37#include <errno.h>
38#endif  // POSIX
39
40namespace talk_base {
41
42const size_t MAX_PACKET_SIZE = 64 * 1024;
43
44typedef uint16 PacketLength;
45const size_t PKT_LEN_SIZE = sizeof(PacketLength);
46
47const size_t BUF_SIZE = MAX_PACKET_SIZE + PKT_LEN_SIZE;
48
49AsyncTCPSocket* AsyncTCPSocket::Create(SocketFactory* factory) {
50  AsyncSocket* sock = factory->CreateAsyncSocket(SOCK_STREAM);
51  return (sock) ? new AsyncTCPSocket(sock) : NULL;
52}
53
54AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket)
55    : AsyncPacketSocket(socket),
56      insize_(BUF_SIZE),
57      inpos_(0),
58      outsize_(BUF_SIZE),
59      outpos_(0) {
60  inbuf_ = new char[insize_];
61  outbuf_ = new char[outsize_];
62
63  ASSERT(socket_ != NULL);
64  socket_->SignalConnectEvent.connect(this, &AsyncTCPSocket::OnConnectEvent);
65  socket_->SignalReadEvent.connect(this, &AsyncTCPSocket::OnReadEvent);
66  socket_->SignalWriteEvent.connect(this, &AsyncTCPSocket::OnWriteEvent);
67  socket_->SignalCloseEvent.connect(this, &AsyncTCPSocket::OnCloseEvent);
68}
69
70AsyncTCPSocket::~AsyncTCPSocket() {
71  delete [] inbuf_;
72  delete [] outbuf_;
73}
74
75int AsyncTCPSocket::Send(const void *pv, size_t cb) {
76  if (cb > MAX_PACKET_SIZE) {
77    socket_->SetError(EMSGSIZE);
78    return -1;
79  }
80
81  // If we are blocking on send, then silently drop this packet
82  if (outpos_)
83    return static_cast<int>(cb);
84
85  PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb));
86  memcpy(outbuf_, &pkt_len, PKT_LEN_SIZE);
87  memcpy(outbuf_ + PKT_LEN_SIZE, pv, cb);
88  outpos_ = PKT_LEN_SIZE + cb;
89
90  int res = Flush();
91  if (res <= 0) {
92    // drop packet if we made no progress
93    outpos_ = 0;
94    return res;
95  }
96
97  // We claim to have sent the whole thing, even if we only sent partial
98  return static_cast<int>(cb);
99}
100
101int AsyncTCPSocket::SendTo(const void *pv, size_t cb,
102                           const SocketAddress& addr) {
103  if (addr == GetRemoteAddress())
104    return Send(pv, cb);
105
106  ASSERT(false);
107  socket_->SetError(ENOTCONN);
108  return -1;
109}
110
111int AsyncTCPSocket::SendRaw(const void * pv, size_t cb) {
112  if (outpos_ + cb > outsize_) {
113    socket_->SetError(EMSGSIZE);
114    return -1;
115  }
116
117  memcpy(outbuf_ + outpos_, pv, cb);
118  outpos_ += cb;
119
120  return Flush();
121}
122
123void AsyncTCPSocket::ProcessInput(char * data, size_t& len) {
124  SocketAddress remote_addr(GetRemoteAddress());
125
126  while (true) {
127    if (len < PKT_LEN_SIZE)
128      return;
129
130    PacketLength pkt_len;
131    memcpy(&pkt_len, data, PKT_LEN_SIZE);
132    pkt_len = NetworkToHost16(pkt_len);
133
134    if (len < PKT_LEN_SIZE + pkt_len)
135      return;
136
137    SignalReadPacket(data + PKT_LEN_SIZE, pkt_len, remote_addr, this);
138
139    len -= PKT_LEN_SIZE + pkt_len;
140    if (len > 0) {
141      memmove(data, data + PKT_LEN_SIZE + pkt_len, len);
142    }
143  }
144}
145
146int AsyncTCPSocket::Flush() {
147  int res = socket_->Send(outbuf_, outpos_);
148  if (res <= 0) {
149    return res;
150  }
151  if (static_cast<size_t>(res) <= outpos_) {
152    outpos_ -= res;
153  } else {
154    ASSERT(false);
155    return -1;
156  }
157  if (outpos_ > 0) {
158    memmove(outbuf_, outbuf_ + res, outpos_);
159  }
160  return res;
161}
162
163void AsyncTCPSocket::OnConnectEvent(AsyncSocket* socket) {
164  SignalConnect(this);
165}
166
167void AsyncTCPSocket::OnReadEvent(AsyncSocket* socket) {
168  ASSERT(socket == socket_);
169
170  int len = socket_->Recv(inbuf_ + inpos_, insize_ - inpos_);
171  if (len < 0) {
172    // TODO: Do something better like forwarding the error to the user.
173    if (!socket_->IsBlocking()) {
174      LOG_ERR(LS_ERROR) << "recvfrom";
175    }
176    return;
177  }
178
179  inpos_ += len;
180
181  ProcessInput(inbuf_, inpos_);
182
183  if (inpos_ >= insize_) {
184    LOG(INFO) << "input buffer overflow";
185    ASSERT(false);
186    inpos_ = 0;
187  }
188}
189
190void AsyncTCPSocket::OnWriteEvent(AsyncSocket* socket) {
191  ASSERT(socket == socket_);
192
193  if (outpos_ > 0) {
194    Flush();
195  }
196}
197
198void AsyncTCPSocket::OnCloseEvent(AsyncSocket* socket, int error) {
199  SignalClose(this, error);
200}
201
202}  // namespace talk_base
203