1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "jingle/glue/channel_socket_adapter.h" 6 7#include <limits> 8 9#include "base/callback.h" 10#include "base/logging.h" 11#include "base/message_loop/message_loop.h" 12#include "net/base/io_buffer.h" 13#include "net/base/net_errors.h" 14#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" 15 16namespace jingle_glue { 17 18TransportChannelSocketAdapter::TransportChannelSocketAdapter( 19 cricket::TransportChannel* channel) 20 : message_loop_(base::MessageLoop::current()), 21 channel_(channel), 22 closed_error_code_(net::OK) { 23 DCHECK(channel_); 24 25 channel_->SignalReadPacket.connect( 26 this, &TransportChannelSocketAdapter::OnNewPacket); 27 channel_->SignalWritableState.connect( 28 this, &TransportChannelSocketAdapter::OnWritableState); 29 channel_->SignalDestroyed.connect( 30 this, &TransportChannelSocketAdapter::OnChannelDestroyed); 31} 32 33TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { 34 if (!destruction_callback_.is_null()) 35 destruction_callback_.Run(); 36} 37 38void TransportChannelSocketAdapter::SetOnDestroyedCallback( 39 const base::Closure& callback) { 40 destruction_callback_ = callback; 41} 42 43int TransportChannelSocketAdapter::Read( 44 net::IOBuffer* buf, 45 int buffer_size, 46 const net::CompletionCallback& callback) { 47 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 48 DCHECK(buf); 49 DCHECK(!callback.is_null()); 50 CHECK(read_callback_.is_null()); 51 52 if (!channel_) { 53 DCHECK(closed_error_code_ != net::OK); 54 return closed_error_code_; 55 } 56 57 read_callback_ = callback; 58 read_buffer_ = buf; 59 read_buffer_size_ = buffer_size; 60 61 return net::ERR_IO_PENDING; 62} 63 64int TransportChannelSocketAdapter::Write( 65 net::IOBuffer* buffer, 66 int buffer_size, 67 const net::CompletionCallback& callback) { 68 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 69 DCHECK(buffer); 70 DCHECK(!callback.is_null()); 71 CHECK(write_callback_.is_null()); 72 73 if (!channel_) { 74 DCHECK(closed_error_code_ != net::OK); 75 return closed_error_code_; 76 } 77 78 int result; 79 rtc::PacketOptions options; 80 if (channel_->writable()) { 81 result = channel_->SendPacket(buffer->data(), buffer_size, options); 82 if (result < 0) { 83 result = net::MapSystemError(channel_->GetError()); 84 85 // If the underlying socket returns IO pending where it shouldn't we 86 // pretend the packet is dropped and return as succeeded because no 87 // writeable callback will happen. 88 if (result == net::ERR_IO_PENDING) 89 result = net::OK; 90 } 91 } else { 92 // Channel is not writable yet. 93 result = net::ERR_IO_PENDING; 94 write_callback_ = callback; 95 write_buffer_ = buffer; 96 write_buffer_size_ = buffer_size; 97 } 98 99 return result; 100} 101 102int TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { 103 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 104 return (channel_->SetOption(rtc::Socket::OPT_RCVBUF, size) == 0) ? 105 net::OK : net::ERR_SOCKET_SET_RECEIVE_BUFFER_SIZE_ERROR; 106} 107 108int TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { 109 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 110 return (channel_->SetOption(rtc::Socket::OPT_SNDBUF, size) == 0) ? 111 net::OK : net::ERR_SOCKET_SET_SEND_BUFFER_SIZE_ERROR; 112} 113 114void TransportChannelSocketAdapter::Close(int error_code) { 115 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 116 117 if (!channel_) // Already closed. 118 return; 119 120 DCHECK(error_code != net::OK); 121 closed_error_code_ = error_code; 122 channel_->SignalReadPacket.disconnect(this); 123 channel_->SignalDestroyed.disconnect(this); 124 channel_ = NULL; 125 126 if (!read_callback_.is_null()) { 127 net::CompletionCallback callback = read_callback_; 128 read_callback_.Reset(); 129 read_buffer_ = NULL; 130 callback.Run(error_code); 131 } 132 133 if (!write_callback_.is_null()) { 134 net::CompletionCallback callback = write_callback_; 135 write_callback_.Reset(); 136 write_buffer_ = NULL; 137 callback.Run(error_code); 138 } 139} 140 141void TransportChannelSocketAdapter::OnNewPacket( 142 cricket::TransportChannel* channel, 143 const char* data, 144 size_t data_size, 145 const rtc::PacketTime& packet_time, 146 int flags) { 147 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 148 DCHECK_EQ(channel, channel_); 149 if (!read_callback_.is_null()) { 150 DCHECK(read_buffer_.get()); 151 CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); 152 153 if (read_buffer_size_ < static_cast<int>(data_size)) { 154 LOG(WARNING) << "Data buffer is smaller than the received packet. " 155 << "Dropping the data that doesn't fit."; 156 data_size = read_buffer_size_; 157 } 158 159 memcpy(read_buffer_->data(), data, data_size); 160 161 net::CompletionCallback callback = read_callback_; 162 read_callback_.Reset(); 163 read_buffer_ = NULL; 164 165 callback.Run(data_size); 166 } else { 167 LOG(WARNING) 168 << "Data was received without a callback. Dropping the packet."; 169 } 170} 171 172void TransportChannelSocketAdapter::OnWritableState( 173 cricket::TransportChannel* channel) { 174 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 175 // Try to send the packet if there is a pending write. 176 if (!write_callback_.is_null()) { 177 rtc::PacketOptions options; 178 int result = channel_->SendPacket(write_buffer_->data(), 179 write_buffer_size_, 180 options); 181 if (result < 0) 182 result = net::MapSystemError(channel_->GetError()); 183 184 if (result != net::ERR_IO_PENDING) { 185 net::CompletionCallback callback = write_callback_; 186 write_callback_.Reset(); 187 write_buffer_ = NULL; 188 callback.Run(result); 189 } 190 } 191} 192 193void TransportChannelSocketAdapter::OnChannelDestroyed( 194 cricket::TransportChannel* channel) { 195 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 196 DCHECK_EQ(channel, channel_); 197 Close(net::ERR_CONNECTION_ABORTED); 198} 199 200} // namespace jingle_glue 201