1/* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#include <iomanip> 12 13#include "webrtc/base/asyncsocket.h" 14#include "webrtc/base/logging.h" 15#include "webrtc/base/socketfactory.h" 16#include "webrtc/base/socketpool.h" 17#include "webrtc/base/socketstream.h" 18#include "webrtc/base/thread.h" 19 20namespace rtc { 21 22/////////////////////////////////////////////////////////////////////////////// 23// StreamCache - Caches a set of open streams, defers creation to a separate 24// StreamPool. 25/////////////////////////////////////////////////////////////////////////////// 26 27StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { 28} 29 30StreamCache::~StreamCache() { 31 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 32 ++it) { 33 delete it->second; 34 } 35 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 36 ++it) { 37 delete it->second; 38 } 39} 40 41StreamInterface* StreamCache::RequestConnectedStream( 42 const SocketAddress& remote, int* err) { 43 LOG_F(LS_VERBOSE) << "(" << remote << ")"; 44 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 45 ++it) { 46 if (remote == it->first) { 47 it->second->SignalEvent.disconnect(this); 48 // Move from cached_ to active_ 49 active_.push_front(*it); 50 cached_.erase(it); 51 if (err) 52 *err = 0; 53 LOG_F(LS_VERBOSE) << "Providing cached stream"; 54 return active_.front().second; 55 } 56 } 57 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 58 // We track active streams so that we can remember their address 59 active_.push_front(ConnectedStream(remote, stream)); 60 LOG_F(LS_VERBOSE) << "Providing new stream"; 61 return active_.front().second; 62 } 63 return NULL; 64} 65 66void StreamCache::ReturnConnectedStream(StreamInterface* stream) { 67 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); 68 ++it) { 69 if (stream == it->second) { 70 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 71 if (stream->GetState() == SS_CLOSED) { 72 // Return closed streams 73 LOG_F(LS_VERBOSE) << "Returning closed stream"; 74 pool_->ReturnConnectedStream(it->second); 75 } else { 76 // Monitor open streams 77 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); 78 LOG_F(LS_VERBOSE) << "Caching stream"; 79 cached_.push_front(*it); 80 } 81 active_.erase(it); 82 return; 83 } 84 } 85 ASSERT(false); 86} 87 88void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { 89 if ((events & SE_CLOSE) == 0) { 90 LOG_F(LS_WARNING) << "(" << events << ", " << err 91 << ") received non-close event"; 92 return; 93 } 94 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); 95 ++it) { 96 if (stream == it->second) { 97 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; 98 // We don't cache closed streams, so return it. 99 it->second->SignalEvent.disconnect(this); 100 LOG_F(LS_VERBOSE) << "Returning closed stream"; 101 pool_->ReturnConnectedStream(it->second); 102 cached_.erase(it); 103 return; 104 } 105 } 106 ASSERT(false); 107} 108 109////////////////////////////////////////////////////////////////////// 110// NewSocketPool 111////////////////////////////////////////////////////////////////////// 112 113NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { 114} 115 116NewSocketPool::~NewSocketPool() { 117} 118 119StreamInterface* 120NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { 121 AsyncSocket* socket = 122 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); 123 if (!socket) { 124 if (err) 125 *err = -1; 126 return NULL; 127 } 128 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { 129 if (err) 130 *err = socket->GetError(); 131 delete socket; 132 return NULL; 133 } 134 if (err) 135 *err = 0; 136 return new SocketStream(socket); 137} 138 139void 140NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { 141 Thread::Current()->Dispose(stream); 142} 143 144////////////////////////////////////////////////////////////////////// 145// ReuseSocketPool 146////////////////////////////////////////////////////////////////////// 147 148ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) 149: factory_(factory), stream_(NULL), checked_out_(false) { 150} 151 152ReuseSocketPool::~ReuseSocketPool() { 153 ASSERT(!checked_out_); 154 delete stream_; 155} 156 157StreamInterface* 158ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { 159 // Only one socket can be used from this "pool" at a time 160 ASSERT(!checked_out_); 161 if (!stream_) { 162 LOG_F(LS_VERBOSE) << "Creating new socket"; 163 int family = remote.family(); 164 // TODO: Deal with this when we/I clean up DNS resolution. 165 if (remote.IsUnresolvedIP()) { 166 family = AF_INET; 167 } 168 AsyncSocket* socket = 169 factory_->CreateAsyncSocket(family, SOCK_STREAM); 170 if (!socket) { 171 if (err) 172 *err = -1; 173 return NULL; 174 } 175 stream_ = new SocketStream(socket); 176 } 177 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { 178 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; 179 } else { 180 remote_ = remote; 181 stream_->Close(); 182 if ((stream_->GetSocket()->Connect(remote_) != 0) 183 && !stream_->GetSocket()->IsBlocking()) { 184 if (err) 185 *err = stream_->GetSocket()->GetError(); 186 return NULL; 187 } else { 188 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; 189 } 190 } 191 stream_->SignalEvent.disconnect(this); 192 checked_out_ = true; 193 if (err) 194 *err = 0; 195 return stream_; 196} 197 198void 199ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { 200 ASSERT(stream == stream_); 201 ASSERT(checked_out_); 202 checked_out_ = false; 203 // Until the socket is reused, monitor it to determine if it closes. 204 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); 205} 206 207void 208ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { 209 ASSERT(stream == stream_); 210 ASSERT(!checked_out_); 211 212 // If the stream was written to and then immediately returned to us then 213 // we may get a writable notification for it, which we should ignore. 214 if (events == SE_WRITE) { 215 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; 216 return; 217 } 218 219 // If the peer sent data, we can't process it, so drop the connection. 220 // If the socket has closed, clean it up. 221 // In either case, we'll reconnect it the next time it is used. 222 ASSERT(0 != (events & (SE_READ|SE_CLOSE))); 223 if (0 != (events & SE_CLOSE)) { 224 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; 225 } else { 226 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; 227 } 228 stream_->Close(); 229} 230 231/////////////////////////////////////////////////////////////////////////////// 232// LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached 233// LoggingAdapters. 234/////////////////////////////////////////////////////////////////////////////// 235 236LoggingPoolAdapter::LoggingPoolAdapter( 237 StreamPool* pool, LoggingSeverity level, const std::string& label, 238 bool binary_mode) 239 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { 240} 241 242LoggingPoolAdapter::~LoggingPoolAdapter() { 243 for (StreamList::iterator it = recycle_bin_.begin(); 244 it != recycle_bin_.end(); ++it) { 245 delete *it; 246 } 247} 248 249StreamInterface* LoggingPoolAdapter::RequestConnectedStream( 250 const SocketAddress& remote, int* err) { 251 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { 252 ASSERT(SS_CLOSED != stream->GetState()); 253 std::stringstream ss; 254 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) 255 << stream << ")"; 256 LOG_V(level_) << ss.str() 257 << ((SS_OPEN == stream->GetState()) ? " Connected" 258 : " Connecting") 259 << " to " << remote; 260 if (recycle_bin_.empty()) { 261 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); 262 } 263 LoggingAdapter* logging = recycle_bin_.front(); 264 recycle_bin_.pop_front(); 265 logging->set_label(ss.str()); 266 logging->Attach(stream); 267 return logging; 268 } 269 return NULL; 270} 271 272void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { 273 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); 274 pool_->ReturnConnectedStream(logging->Detach()); 275 recycle_bin_.push_back(logging); 276} 277 278/////////////////////////////////////////////////////////////////////////////// 279 280} // namespace rtc 281