1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/base/tcpport.h"
29
30#include "talk/base/common.h"
31#include "talk/base/logging.h"
32#include "talk/p2p/base/common.h"
33
34namespace cricket {
35
36TCPPort::TCPPort(talk_base::Thread* thread,
37                 talk_base::PacketSocketFactory* factory,
38                 talk_base::Network* network, uint32 ip,
39                 int min_port, int max_port, bool allow_listen)
40    : Port(thread, LOCAL_PORT_TYPE, factory, network, ip, min_port, max_port),
41      incoming_only_(false),
42      allow_listen_(allow_listen),
43      socket_(NULL),
44      error_(0) {
45}
46
47bool TCPPort::Init() {
48  // Treat failure to create or bind a TCP socket as fatal.  This
49  // should never happen.
50  socket_ = factory_->CreateServerTcpSocket(
51      talk_base::SocketAddress(ip_, 0), min_port_, max_port_, allow_listen_,
52      false /* ssl */);
53  if (!socket_) {
54    LOG_J(LS_ERROR, this) << "TCP socket creation failed.";
55    return false;
56  }
57  socket_->SignalNewConnection.connect(this, &TCPPort::OnNewConnection);
58  return true;
59}
60
61TCPPort::~TCPPort() {
62  delete socket_;
63}
64
65Connection* TCPPort::CreateConnection(const Candidate& address,
66                                      CandidateOrigin origin) {
67  // We only support TCP protocols
68  if ((address.protocol() != "tcp") && (address.protocol() != "ssltcp"))
69    return NULL;
70
71  // We can't accept TCP connections incoming on other ports
72  if (origin == ORIGIN_OTHER_PORT)
73    return NULL;
74
75  // Check if we are allowed to make outgoing TCP connections
76  if (incoming_only_ && (origin == ORIGIN_MESSAGE))
77    return NULL;
78
79  // We don't know how to act as an ssl server yet
80  if ((address.protocol() == "ssltcp") && (origin == ORIGIN_THIS_PORT))
81    return NULL;
82
83  TCPConnection* conn = NULL;
84  if (talk_base::AsyncPacketSocket* socket =
85      GetIncoming(address.address(), true)) {
86    socket->SignalReadPacket.disconnect(this);
87    conn = new TCPConnection(this, address, socket);
88  } else {
89    conn = new TCPConnection(this, address);
90  }
91  AddConnection(conn);
92  return conn;
93}
94
95void TCPPort::PrepareAddress() {
96  if (!allow_listen_) {
97    LOG_J(LS_INFO, this) << "Not listening due to firewall restrictions.";
98  }
99  // Note: We still add the address, since otherwise the remote side won't
100  // recognize our incoming TCP connections.
101  bool allocated;
102  talk_base::SocketAddress address = socket_->GetLocalAddress(&allocated);
103  if (allocated) {
104    AddAddress(address, "tcp", true);
105  } else {
106    socket_->SignalAddressReady.connect(this, &TCPPort::OnAddresReady);
107  }
108}
109
110int TCPPort::SendTo(const void* data, size_t size,
111                    const talk_base::SocketAddress& addr, bool payload) {
112  talk_base::AsyncPacketSocket * socket = NULL;
113  if (TCPConnection * conn = static_cast<TCPConnection*>(GetConnection(addr))) {
114    socket = conn->socket();
115  } else {
116    socket = GetIncoming(addr);
117  }
118  if (!socket) {
119    LOG_J(LS_ERROR, this) << "Attempted to send to an unknown destination, "
120                          << addr.ToString();
121    return -1;  // TODO: Set error_
122  }
123
124  int sent = socket->Send(data, size);
125  if (sent < 0) {
126    error_ = socket->GetError();
127    LOG_J(LS_ERROR, this) << "TCP send of " << size
128                          << " bytes failed with error " << error_;
129  }
130  return sent;
131}
132
133int TCPPort::SetOption(talk_base::Socket::Option opt, int value) {
134  return socket_->SetOption(opt, value);
135}
136
137int TCPPort::GetError() {
138  return error_;
139}
140
141void TCPPort::OnNewConnection(talk_base::AsyncPacketSocket* socket,
142                              talk_base::AsyncPacketSocket* new_socket) {
143  ASSERT(socket == socket_);
144
145  Incoming incoming;
146  incoming.addr = new_socket->GetRemoteAddress();
147  incoming.socket = new_socket;
148  incoming.socket->SignalReadPacket.connect(this, &TCPPort::OnReadPacket);
149
150  LOG_J(LS_VERBOSE, this) << "Accepted connection from "
151                          << incoming.addr.ToString();
152  incoming_.push_back(incoming);
153}
154
155talk_base::AsyncPacketSocket* TCPPort::GetIncoming(
156    const talk_base::SocketAddress& addr, bool remove) {
157  talk_base::AsyncPacketSocket* socket = NULL;
158  for (std::list<Incoming>::iterator it = incoming_.begin();
159       it != incoming_.end(); ++it) {
160    if (it->addr == addr) {
161      socket = it->socket;
162      if (remove)
163        incoming_.erase(it);
164      break;
165    }
166  }
167  return socket;
168}
169
170void TCPPort::OnReadPacket(talk_base::AsyncPacketSocket* socket,
171                           const char* data, size_t size,
172                           const talk_base::SocketAddress& remote_addr) {
173  Port::OnReadPacket(data, size, remote_addr);
174}
175
176void TCPPort::OnAddresReady(talk_base::AsyncPacketSocket* socket,
177                            const talk_base::SocketAddress& address) {
178  AddAddress(address, "tcp", true);
179}
180
181TCPConnection::TCPConnection(TCPPort* port, const Candidate& candidate,
182                             talk_base::AsyncPacketSocket* socket)
183    : Connection(port, 0, candidate), socket_(socket), error_(0) {
184  bool outgoing = (socket_ == NULL);
185  if (outgoing) {
186    // TODO: Handle failures here (unlikely since TCP).
187
188    socket_ = port->socket_factory()->CreateClientTcpSocket(
189        talk_base::SocketAddress(port_->network()->ip(), 0),
190        candidate.address(), port->proxy(), port->user_agent(),
191        candidate.protocol() == "ssltcp");
192    if (socket_) {
193      LOG_J(LS_VERBOSE, this) << "Connecting from "
194                              << socket_->GetLocalAddress(NULL).ToString()
195                              << " to " << candidate.address().ToString();
196      set_connected(false);
197      socket_->SignalConnect.connect(this, &TCPConnection::OnConnect);
198    } else {
199      LOG_J(LS_WARNING, this) << "Failed to create connection to "
200                              << candidate.address().ToString();
201    }
202  } else {
203    // Incoming connections should match the network address.
204    ASSERT(socket_->GetLocalAddress(NULL).ip() == port->ip_);
205  }
206
207  if (socket_) {
208    socket_->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
209    socket_->SignalClose.connect(this, &TCPConnection::OnClose);
210  }
211}
212
213TCPConnection::~TCPConnection() {
214  delete socket_;
215}
216
217int TCPConnection::Send(const void* data, size_t size) {
218  if (!socket_) {
219    error_ = ENOTCONN;
220    return SOCKET_ERROR;
221  }
222
223  if (write_state() != STATE_WRITABLE) {
224    // TODO: Should STATE_WRITE_TIMEOUT return a non-blocking error?
225    error_ = EWOULDBLOCK;
226    return SOCKET_ERROR;
227  }
228  int sent = socket_->Send(data, size);
229  if (sent < 0) {
230    error_ = socket_->GetError();
231  } else {
232    send_rate_tracker_.Update(sent);
233  }
234  return sent;
235}
236
237int TCPConnection::GetError() {
238  return error_;
239}
240
241void TCPConnection::OnConnect(talk_base::AsyncPacketSocket* socket) {
242  ASSERT(socket == socket_);
243  LOG_J(LS_VERBOSE, this) << "Connection established to "
244                          << socket->GetRemoteAddress().ToString();
245  set_connected(true);
246}
247
248void TCPConnection::OnClose(talk_base::AsyncPacketSocket* socket, int error) {
249  ASSERT(socket == socket_);
250  LOG_J(LS_VERBOSE, this) << "Connection closed with error " << error;
251  set_connected(false);
252  set_write_state(STATE_WRITE_TIMEOUT);
253}
254
255void TCPConnection::OnReadPacket(talk_base::AsyncPacketSocket* socket,
256                                 const char* data, size_t size,
257                                 const talk_base::SocketAddress& remote_addr) {
258  ASSERT(socket == socket_);
259  Connection::OnReadPacket(data, size);
260}
261
262}  // namespace cricket
263