1/* 2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#include "webrtc/test/channel_transport/udp_socket_manager_posix.h" 12 13#include <stdio.h> 14#include <strings.h> 15#include <sys/time.h> 16#include <sys/types.h> 17#include <time.h> 18#include <unistd.h> 19 20#include "webrtc/system_wrappers/interface/sleep.h" 21#include "webrtc/system_wrappers/interface/trace.h" 22#include "webrtc/test/channel_transport/udp_socket_posix.h" 23 24namespace webrtc { 25namespace test { 26 27UdpSocketManagerPosix::UdpSocketManagerPosix() 28 : UdpSocketManager(), 29 _id(-1), 30 _critSect(CriticalSectionWrapper::CreateCriticalSection()), 31 _numberOfSocketMgr(-1), 32 _incSocketMgrNextTime(0), 33 _nextSocketMgrToAssign(0), 34 _socketMgr() 35{ 36} 37 38bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) { 39 CriticalSectionScoped cs(_critSect); 40 if ((_id != -1) || (_numOfWorkThreads != 0)) { 41 assert(_id != -1); 42 assert(_numOfWorkThreads != 0); 43 return false; 44 } 45 46 _id = id; 47 _numberOfSocketMgr = numOfWorkThreads; 48 _numOfWorkThreads = numOfWorkThreads; 49 50 if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr) 51 { 52 _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX; 53 } 54 for(int i = 0;i < _numberOfSocketMgr; i++) 55 { 56 _socketMgr[i] = new UdpSocketManagerPosixImpl(); 57 } 58 return true; 59} 60 61 62UdpSocketManagerPosix::~UdpSocketManagerPosix() 63{ 64 Stop(); 65 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 66 "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()", 67 _numberOfSocketMgr); 68 69 for(int i = 0;i < _numberOfSocketMgr; i++) 70 { 71 delete _socketMgr[i]; 72 } 73 delete _critSect; 74} 75 76int32_t UdpSocketManagerPosix::ChangeUniqueId(const int32_t id) 77{ 78 _id = id; 79 return 0; 80} 81 82bool UdpSocketManagerPosix::Start() 83{ 84 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 85 "UdpSocketManagerPosix(%d)::Start()", 86 _numberOfSocketMgr); 87 88 _critSect->Enter(); 89 bool retVal = true; 90 for(int i = 0;i < _numberOfSocketMgr && retVal; i++) 91 { 92 retVal = _socketMgr[i]->Start(); 93 } 94 if(!retVal) 95 { 96 WEBRTC_TRACE( 97 kTraceError, 98 kTraceTransport, 99 _id, 100 "UdpSocketManagerPosix(%d)::Start() error starting socket managers", 101 _numberOfSocketMgr); 102 } 103 _critSect->Leave(); 104 return retVal; 105} 106 107bool UdpSocketManagerPosix::Stop() 108{ 109 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 110 "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr); 111 112 _critSect->Enter(); 113 bool retVal = true; 114 for(int i = 0; i < _numberOfSocketMgr && retVal; i++) 115 { 116 retVal = _socketMgr[i]->Stop(); 117 } 118 if(!retVal) 119 { 120 WEBRTC_TRACE( 121 kTraceError, 122 kTraceTransport, 123 _id, 124 "UdpSocketManagerPosix(%d)::Stop() there are still active socket " 125 "managers", 126 _numberOfSocketMgr); 127 } 128 _critSect->Leave(); 129 return retVal; 130} 131 132bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s) 133{ 134 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 135 "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr); 136 137 _critSect->Enter(); 138 bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s); 139 if(!retVal) 140 { 141 WEBRTC_TRACE( 142 kTraceError, 143 kTraceTransport, 144 _id, 145 "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\ 146 manager", 147 _numberOfSocketMgr); 148 } 149 150 // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin 151 // fashion. 152 if(_incSocketMgrNextTime == 0) 153 { 154 _incSocketMgrNextTime++; 155 } else { 156 _incSocketMgrNextTime = 0; 157 _nextSocketMgrToAssign++; 158 if(_nextSocketMgrToAssign >= _numberOfSocketMgr) 159 { 160 _nextSocketMgrToAssign = 0; 161 } 162 } 163 _critSect->Leave(); 164 return retVal; 165} 166 167bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s) 168{ 169 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 170 "UdpSocketManagerPosix(%d)::RemoveSocket()", 171 _numberOfSocketMgr); 172 173 _critSect->Enter(); 174 bool retVal = false; 175 for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++) 176 { 177 retVal = _socketMgr[i]->RemoveSocket(s); 178 } 179 if(!retVal) 180 { 181 WEBRTC_TRACE( 182 kTraceError, 183 kTraceTransport, 184 _id, 185 "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\ 186 from manager", 187 _numberOfSocketMgr); 188 } 189 _critSect->Leave(); 190 return retVal; 191} 192 193 194UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl() 195{ 196 _critSectList = CriticalSectionWrapper::CreateCriticalSection(); 197 _thread = ThreadWrapper::CreateThread(UdpSocketManagerPosixImpl::Run, this, 198 kRealtimePriority, 199 "UdpSocketManagerPosixImplThread"); 200 FD_ZERO(&_readFds); 201 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, 202 "UdpSocketManagerPosix created"); 203} 204 205UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl() 206{ 207 if(_thread != NULL) 208 { 209 delete _thread; 210 } 211 212 if (_critSectList != NULL) 213 { 214 UpdateSocketMap(); 215 216 _critSectList->Enter(); 217 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = 218 _socketMap.begin(); 219 it != _socketMap.end(); 220 ++it) { 221 delete it->second; 222 } 223 _socketMap.clear(); 224 _critSectList->Leave(); 225 226 delete _critSectList; 227 } 228 229 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, 230 "UdpSocketManagerPosix deleted"); 231} 232 233bool UdpSocketManagerPosixImpl::Start() 234{ 235 unsigned int id = 0; 236 if (_thread == NULL) 237 { 238 return false; 239 } 240 241 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, 242 "Start UdpSocketManagerPosix"); 243 return _thread->Start(id); 244} 245 246bool UdpSocketManagerPosixImpl::Stop() 247{ 248 if (_thread == NULL) 249 { 250 return true; 251 } 252 253 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, 254 "Stop UdpSocketManagerPosix"); 255 return _thread->Stop(); 256} 257 258bool UdpSocketManagerPosixImpl::Process() 259{ 260 bool doSelect = false; 261 // Timeout = 1 second. 262 struct timeval timeout; 263 timeout.tv_sec = 0; 264 timeout.tv_usec = 10000; 265 266 FD_ZERO(&_readFds); 267 268 UpdateSocketMap(); 269 270 SOCKET maxFd = 0; 271 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin(); 272 it != _socketMap.end(); 273 ++it) { 274 doSelect = true; 275 if (it->first > maxFd) 276 maxFd = it->first; 277 FD_SET(it->first, &_readFds); 278 } 279 280 int num = 0; 281 if (doSelect) 282 { 283 num = select(maxFd+1, &_readFds, NULL, NULL, &timeout); 284 285 if (num == SOCKET_ERROR) 286 { 287 // Timeout = 10 ms. 288 SleepMs(10); 289 return true; 290 } 291 }else 292 { 293 // Timeout = 10 ms. 294 SleepMs(10); 295 return true; 296 } 297 298 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin(); 299 it != _socketMap.end(); 300 ++it) { 301 if (FD_ISSET(it->first, &_readFds)) { 302 it->second->HasIncoming(); 303 --num; 304 } 305 } 306 307 return true; 308} 309 310bool UdpSocketManagerPosixImpl::Run(ThreadObj obj) 311{ 312 UdpSocketManagerPosixImpl* mgr = 313 static_cast<UdpSocketManagerPosixImpl*>(obj); 314 return mgr->Process(); 315} 316 317bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s) 318{ 319 UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s); 320 if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE)) 321 { 322 return false; 323 } 324 _critSectList->Enter(); 325 _addList.push_back(s); 326 _critSectList->Leave(); 327 return true; 328} 329 330bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s) 331{ 332 // Put in remove list if this is the correct UdpSocketManagerPosixImpl. 333 _critSectList->Enter(); 334 335 // If the socket is in the add list it's safe to remove and delete it. 336 for (SocketList::iterator iter = _addList.begin(); 337 iter != _addList.end(); ++iter) { 338 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter); 339 unsigned int addFD = addSocket->GetFd(); 340 unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd(); 341 if(removeFD == addFD) 342 { 343 _removeList.push_back(removeFD); 344 _critSectList->Leave(); 345 return true; 346 } 347 } 348 349 // Checking the socket map is safe since all Erase and Insert calls to this 350 // map are also protected by _critSectList. 351 if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) != 352 _socketMap.end()) { 353 _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd()); 354 _critSectList->Leave(); 355 return true; 356 } 357 _critSectList->Leave(); 358 return false; 359} 360 361void UdpSocketManagerPosixImpl::UpdateSocketMap() 362{ 363 // Remove items in remove list. 364 _critSectList->Enter(); 365 for (FdList::iterator iter = _removeList.begin(); 366 iter != _removeList.end(); ++iter) { 367 UdpSocketPosix* deleteSocket = NULL; 368 SOCKET removeFD = *iter; 369 370 // If the socket is in the add list it hasn't been added to the socket 371 // map yet. Just remove the socket from the add list. 372 for (SocketList::iterator iter = _addList.begin(); 373 iter != _addList.end(); ++iter) { 374 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter); 375 SOCKET addFD = addSocket->GetFd(); 376 if(removeFD == addFD) 377 { 378 deleteSocket = addSocket; 379 _addList.erase(iter); 380 break; 381 } 382 } 383 384 // Find and remove socket from _socketMap. 385 std::map<SOCKET, UdpSocketPosix*>::iterator it = 386 _socketMap.find(removeFD); 387 if(it != _socketMap.end()) 388 { 389 deleteSocket = it->second; 390 _socketMap.erase(it); 391 } 392 if(deleteSocket) 393 { 394 deleteSocket->ReadyForDeletion(); 395 delete deleteSocket; 396 } 397 } 398 _removeList.clear(); 399 400 // Add sockets from add list. 401 for (SocketList::iterator iter = _addList.begin(); 402 iter != _addList.end(); ++iter) { 403 UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter); 404 if(s) { 405 _socketMap[s->GetFd()] = s; 406 } 407 } 408 _addList.clear(); 409 _critSectList->Leave(); 410} 411 412} // namespace test 413} // namespace webrtc 414