1/*
2 * libjingle
3 * Copyright 2004--2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <string>
29#include "talk/base/basictypes.h"
30#include "talk/base/common.h"
31#include "talk/base/logging.h"
32#include "talk/base/scoped_ptr.h"
33#include "talk/base/stringutils.h"
34#include "talk/p2p/base/transportchannel.h"
35#include "pseudotcpchannel.h"
36
37using namespace talk_base;
38
39namespace cricket {
40
41extern const talk_base::ConstantLabel SESSION_STATES[];
42
43// MSG_WK_* - worker thread messages
44// MSG_ST_* - stream thread messages
45// MSG_SI_* - signal thread messages
46
47enum {
48  MSG_WK_CLOCK = 1,
49  MSG_WK_PURGE,
50  MSG_ST_EVENT,
51  MSG_SI_DESTROYCHANNEL,
52  MSG_SI_DESTROY,
53};
54
55struct EventData : public MessageData {
56  int event, error;
57  EventData(int ev, int err = 0) : event(ev), error(err) { }
58};
59
60///////////////////////////////////////////////////////////////////////////////
61// PseudoTcpChannel::InternalStream
62///////////////////////////////////////////////////////////////////////////////
63
64class PseudoTcpChannel::InternalStream : public StreamInterface {
65public:
66  InternalStream(PseudoTcpChannel* parent);
67  virtual ~InternalStream();
68
69  virtual StreamState GetState() const;
70  virtual StreamResult Read(void* buffer, size_t buffer_len,
71                                       size_t* read, int* error);
72  virtual StreamResult Write(const void* data, size_t data_len,
73                                        size_t* written, int* error);
74  virtual void Close();
75
76private:
77  // parent_ is accessed and modified exclusively on the event thread, to
78  // avoid thread contention.  This means that the PseudoTcpChannel cannot go
79  // away until after it receives a Close() from TunnelStream.
80  PseudoTcpChannel* parent_;
81};
82
83///////////////////////////////////////////////////////////////////////////////
84// PseudoTcpChannel
85// Member object lifetime summaries:
86//   session_ - passed in constructor, cleared when channel_ goes away.
87//   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
88//   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
89//     closes.
90//   worker_thread_ - created when channel_ is created, purged when channel_ is
91//     destroyed.
92//   stream_ - created in GetStream, destroyed by owner at arbitrary time.
93//   this - created in constructor, destroyed when worker_thread_ and stream_
94//     are both gone.
95///////////////////////////////////////////////////////////////////////////////
96
97//
98// Signal thread methods
99//
100
101PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
102  : signal_thread_(session->session_manager()->signaling_thread()),
103    worker_thread_(NULL),
104    stream_thread_(stream_thread),
105    session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
106    stream_readable_(false), pending_read_event_(false),
107    ready_to_connect_(false) {
108  ASSERT(signal_thread_->IsCurrent());
109  ASSERT(NULL != session_);
110}
111
112PseudoTcpChannel::~PseudoTcpChannel() {
113  ASSERT(signal_thread_->IsCurrent());
114  ASSERT(worker_thread_ == NULL);
115  ASSERT(session_ == NULL);
116  ASSERT(channel_ == NULL);
117  ASSERT(stream_ == NULL);
118  ASSERT(tcp_ == NULL);
119}
120
121bool PseudoTcpChannel::Connect(const std::string& content_name,
122                               const std::string& channel_name) {
123  ASSERT(signal_thread_->IsCurrent());
124  CritScope lock(&cs_);
125
126  if (channel_)
127    return false;
128
129  ASSERT(session_ != NULL);
130  worker_thread_ = session_->session_manager()->worker_thread();
131  content_name_ = content_name;
132  channel_ = session_->CreateChannel(content_name, channel_name);
133  channel_name_ = channel_name;
134  channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
135
136  channel_->SignalDestroyed.connect(this,
137    &PseudoTcpChannel::OnChannelDestroyed);
138  channel_->SignalWritableState.connect(this,
139    &PseudoTcpChannel::OnChannelWritableState);
140  channel_->SignalReadPacket.connect(this,
141    &PseudoTcpChannel::OnChannelRead);
142  channel_->SignalRouteChange.connect(this,
143    &PseudoTcpChannel::OnChannelConnectionChanged);
144
145  ASSERT(tcp_ == NULL);
146  tcp_ = new PseudoTcp(this, 0);
147  if (session_->initiator()) {
148    // Since we may try several protocols and network adapters that won't work,
149    // waiting until we get our first writable notification before initiating
150    // TCP negotiation.
151    ready_to_connect_ = true;
152  }
153
154  return true;
155}
156
157StreamInterface* PseudoTcpChannel::GetStream() {
158  ASSERT(signal_thread_->IsCurrent());
159  CritScope lock(&cs_);
160  ASSERT(NULL != session_);
161  if (!stream_)
162    stream_ = new PseudoTcpChannel::InternalStream(this);
163  //TODO("should we disallow creation of new stream at some point?");
164  return stream_;
165}
166
167void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
168  LOG_F(LS_INFO) << "(" << channel->name() << ")";
169  ASSERT(signal_thread_->IsCurrent());
170  CritScope lock(&cs_);
171  ASSERT(channel == channel_);
172  signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
173  // When MSG_WK_PURGE is received, we know there will be no more messages from
174  // the worker thread.
175  worker_thread_->Clear(this, MSG_WK_CLOCK);
176  worker_thread_->Post(this, MSG_WK_PURGE);
177  session_ = NULL;
178  channel_ = NULL;
179  if ((stream_ != NULL)
180      && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
181    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
182  if (tcp_) {
183    tcp_->Close(true);
184    AdjustClock();
185  }
186  SignalChannelClosed(this);
187}
188
189void PseudoTcpChannel::OnSessionTerminate(Session* session) {
190  // When the session terminates before we even connected
191  CritScope lock(&cs_);
192  if (session_ != NULL && channel_ == NULL) {
193    ASSERT(session == session_);
194    ASSERT(worker_thread_ == NULL);
195    ASSERT(tcp_ == NULL);
196    LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
197    session_ = NULL;
198    if (stream_ != NULL)
199      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
200  }
201
202  // Even though session_ is being destroyed, we mustn't clear the pointer,
203  // since we'll need it to tear down channel_.
204  //
205  // TODO(wez): Is it always the case that if channel_ != NULL then we'll get
206  // a channel-destroyed notification?
207}
208
209void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
210  ASSERT(signal_thread_->IsCurrent());
211  CritScope lock(&cs_);
212  ASSERT(tcp_ != NULL);
213  tcp_->GetOption(opt, value);
214}
215
216void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
217  ASSERT(signal_thread_->IsCurrent());
218  CritScope lock(&cs_);
219  ASSERT(tcp_ != NULL);
220  tcp_->SetOption(opt, value);
221}
222
223//
224// Stream thread methods
225//
226
227StreamState PseudoTcpChannel::GetState() const {
228  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
229  CritScope lock(&cs_);
230  if (!session_)
231    return SS_CLOSED;
232  if (!tcp_)
233    return SS_OPENING;
234  switch (tcp_->State()) {
235    case PseudoTcp::TCP_LISTEN:
236    case PseudoTcp::TCP_SYN_SENT:
237    case PseudoTcp::TCP_SYN_RECEIVED:
238      return SS_OPENING;
239    case PseudoTcp::TCP_ESTABLISHED:
240      return SS_OPEN;
241    case PseudoTcp::TCP_CLOSED:
242    default:
243      return SS_CLOSED;
244  }
245}
246
247StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
248                                    size_t* read, int* error) {
249  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
250  CritScope lock(&cs_);
251  if (!tcp_)
252    return SR_BLOCK;
253
254  stream_readable_ = false;
255  int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
256  //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
257  if (result > 0) {
258    if (read)
259      *read = result;
260    // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
261    // them here.
262    stream_readable_ = true;
263    if (!pending_read_event_) {
264      pending_read_event_ = true;
265      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
266    }
267    return SR_SUCCESS;
268  } else if (IsBlockingError(tcp_->GetError())) {
269    return SR_BLOCK;
270  } else {
271    if (error)
272      *error = tcp_->GetError();
273    return SR_ERROR;
274  }
275  // This spot is never reached.
276}
277
278StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
279                                     size_t* written, int* error) {
280  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
281  CritScope lock(&cs_);
282  if (!tcp_)
283    return SR_BLOCK;
284  int result = tcp_->Send(static_cast<const char*>(data), data_len);
285  //LOG_F(LS_VERBOSE) << "Send returned: " << result;
286  if (result > 0) {
287    if (written)
288      *written = result;
289    return SR_SUCCESS;
290  } else if (IsBlockingError(tcp_->GetError())) {
291    return SR_BLOCK;
292  } else {
293    if (error)
294      *error = tcp_->GetError();
295    return SR_ERROR;
296  }
297  // This spot is never reached.
298}
299
300void PseudoTcpChannel::Close() {
301  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
302  CritScope lock(&cs_);
303  stream_ = NULL;
304  // Clear out any pending event notifications
305  stream_thread_->Clear(this, MSG_ST_EVENT);
306  if (tcp_) {
307    tcp_->Close(false);
308    AdjustClock();
309  } else {
310    CheckDestroy();
311  }
312}
313
314//
315// Worker thread methods
316//
317
318void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
319  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
320  ASSERT(worker_thread_->IsCurrent());
321  CritScope lock(&cs_);
322  if (!channel_) {
323    LOG_F(LS_WARNING) << "NULL channel";
324    return;
325  }
326  ASSERT(channel == channel_);
327  if (!tcp_) {
328    LOG_F(LS_WARNING) << "NULL tcp";
329    return;
330  }
331  if (!ready_to_connect_ || !channel->writable())
332    return;
333
334  ready_to_connect_ = false;
335  tcp_->Connect();
336  AdjustClock();
337}
338
339void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
340                                     const char* data, size_t size) {
341  //LOG_F(LS_VERBOSE) << "(" << size << ")";
342  ASSERT(worker_thread_->IsCurrent());
343  CritScope lock(&cs_);
344  if (!channel_) {
345    LOG_F(LS_WARNING) << "NULL channel";
346    return;
347  }
348  ASSERT(channel == channel_);
349  if (!tcp_) {
350    LOG_F(LS_WARNING) << "NULL tcp";
351    return;
352  }
353  tcp_->NotifyPacket(data, size);
354  AdjustClock();
355}
356
357void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
358                                                  const SocketAddress& addr) {
359  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
360  ASSERT(worker_thread_->IsCurrent());
361  CritScope lock(&cs_);
362  if (!channel_) {
363    LOG_F(LS_WARNING) << "NULL channel";
364    return;
365  }
366  ASSERT(channel == channel_);
367  if (!tcp_) {
368    LOG_F(LS_WARNING) << "NULL tcp";
369    return;
370  }
371
372  uint16 mtu = 1280;  // safe default
373  talk_base::scoped_ptr<Socket> mtu_socket(
374      worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM));
375  if (mtu_socket->Connect(addr) < 0 ||
376      mtu_socket->EstimateMTU(&mtu) < 0) {
377    LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
378                      << mtu_socket->GetError();
379  }
380
381  LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
382  tcp_->NotifyMTU(mtu);
383  AdjustClock();
384}
385
386void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
387  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
388  ASSERT(cs_.CurrentThreadIsOwner());
389  ASSERT(worker_thread_->IsCurrent());
390  ASSERT(tcp == tcp_);
391  if (stream_) {
392    stream_readable_ = true;
393    pending_read_event_ = true;
394    stream_thread_->Post(this, MSG_ST_EVENT,
395                         new EventData(SE_OPEN | SE_READ | SE_WRITE));
396  }
397}
398
399void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
400  //LOG_F(LS_VERBOSE);
401  ASSERT(cs_.CurrentThreadIsOwner());
402  ASSERT(worker_thread_->IsCurrent());
403  ASSERT(tcp == tcp_);
404  if (stream_) {
405    stream_readable_ = true;
406    if (!pending_read_event_) {
407      pending_read_event_ = true;
408      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
409    }
410  }
411}
412
413void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
414  //LOG_F(LS_VERBOSE);
415  ASSERT(cs_.CurrentThreadIsOwner());
416  ASSERT(worker_thread_->IsCurrent());
417  ASSERT(tcp == tcp_);
418  if (stream_)
419    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
420}
421
422void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
423  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
424  ASSERT(cs_.CurrentThreadIsOwner());
425  ASSERT(worker_thread_->IsCurrent());
426  ASSERT(tcp == tcp_);
427  if (stream_)
428    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
429}
430
431//
432// Multi-thread methods
433//
434
435void PseudoTcpChannel::OnMessage(Message* pmsg) {
436  if (pmsg->message_id == MSG_WK_CLOCK) {
437
438    ASSERT(worker_thread_->IsCurrent());
439    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
440    CritScope lock(&cs_);
441    if (tcp_) {
442      tcp_->NotifyClock(PseudoTcp::Now());
443      AdjustClock(false);
444    }
445
446  } else if (pmsg->message_id == MSG_WK_PURGE) {
447
448    ASSERT(worker_thread_->IsCurrent());
449    LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
450    // At this point, we know there are no additional worker thread messages.
451    CritScope lock(&cs_);
452    ASSERT(NULL == session_);
453    ASSERT(NULL == channel_);
454    worker_thread_ = NULL;
455    CheckDestroy();
456
457  } else if (pmsg->message_id == MSG_ST_EVENT) {
458
459    ASSERT(stream_thread_->IsCurrent());
460    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
461    //             << data->event << ", " << data->error << ")";
462    ASSERT(stream_ != NULL);
463    EventData* data = static_cast<EventData*>(pmsg->pdata);
464    if (data->event & SE_READ) {
465      CritScope lock(&cs_);
466      pending_read_event_ = false;
467    }
468    stream_->SignalEvent(stream_, data->event, data->error);
469    delete data;
470
471  } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
472
473    ASSERT(signal_thread_->IsCurrent());
474    LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
475    ASSERT(session_ != NULL);
476    ASSERT(channel_ != NULL);
477    session_->DestroyChannel(content_name_, channel_->name());
478
479  } else if (pmsg->message_id == MSG_SI_DESTROY) {
480
481    ASSERT(signal_thread_->IsCurrent());
482    LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
483    // The message queue is empty, so it is safe to destroy ourselves.
484    delete this;
485
486  } else {
487    ASSERT(false);
488  }
489}
490
491IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
492    PseudoTcp* tcp, const char* buffer, size_t len) {
493  ASSERT(cs_.CurrentThreadIsOwner());
494  ASSERT(tcp == tcp_);
495  ASSERT(NULL != channel_);
496  int sent = channel_->SendPacket(buffer, len);
497  if (sent > 0) {
498    //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
499    return IPseudoTcpNotify::WR_SUCCESS;
500  } else if (IsBlockingError(channel_->GetError())) {
501    LOG_F(LS_VERBOSE) << "Blocking";
502    return IPseudoTcpNotify::WR_SUCCESS;
503  } else if (channel_->GetError() == EMSGSIZE) {
504    LOG_F(LS_ERROR) << "EMSGSIZE";
505    return IPseudoTcpNotify::WR_TOO_LARGE;
506  } else {
507    PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
508    ASSERT(false);
509    return IPseudoTcpNotify::WR_FAIL;
510  }
511}
512
513void PseudoTcpChannel::AdjustClock(bool clear) {
514  ASSERT(cs_.CurrentThreadIsOwner());
515  ASSERT(NULL != tcp_);
516
517  long timeout = 0;
518  if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
519    ASSERT(NULL != channel_);
520    // Reset the next clock, by clearing the old and setting a new one.
521    if (clear)
522      worker_thread_->Clear(this, MSG_WK_CLOCK);
523    worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
524    return;
525  }
526
527  delete tcp_;
528  tcp_ = NULL;
529  ready_to_connect_ = false;
530
531  if (channel_) {
532    // If TCP has failed, no need for channel_ anymore
533    signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
534  }
535}
536
537void PseudoTcpChannel::CheckDestroy() {
538  ASSERT(cs_.CurrentThreadIsOwner());
539  if ((worker_thread_ != NULL) || (stream_ != NULL))
540    return;
541  signal_thread_->Post(this, MSG_SI_DESTROY);
542}
543
544///////////////////////////////////////////////////////////////////////////////
545// PseudoTcpChannel::InternalStream
546///////////////////////////////////////////////////////////////////////////////
547
548PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
549  : parent_(parent) {
550}
551
552PseudoTcpChannel::InternalStream::~InternalStream() {
553  Close();
554}
555
556StreamState PseudoTcpChannel::InternalStream::GetState() const {
557  if (!parent_)
558    return SS_CLOSED;
559  return parent_->GetState();
560}
561
562StreamResult PseudoTcpChannel::InternalStream::Read(
563    void* buffer, size_t buffer_len, size_t* read, int* error) {
564  if (!parent_) {
565    if (error)
566      *error = ENOTCONN;
567    return SR_ERROR;
568  }
569  return parent_->Read(buffer, buffer_len, read, error);
570}
571
572StreamResult PseudoTcpChannel::InternalStream::Write(
573    const void* data, size_t data_len,  size_t* written, int* error) {
574  if (!parent_) {
575    if (error)
576      *error = ENOTCONN;
577    return SR_ERROR;
578  }
579  return parent_->Write(data, data_len, written, error);
580}
581
582void PseudoTcpChannel::InternalStream::Close() {
583  if (!parent_)
584    return;
585  parent_->Close();
586  parent_ = NULL;
587}
588
589///////////////////////////////////////////////////////////////////////////////
590
591} // namespace cricket
592