1/*
2 *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/test/channel_transport/udp_socket_manager_posix.h"
12
13#include <stdio.h>
14#include <strings.h>
15#include <sys/time.h>
16#include <sys/types.h>
17#include <time.h>
18#include <unistd.h>
19
20#include "webrtc/system_wrappers/interface/sleep.h"
21#include "webrtc/system_wrappers/interface/trace.h"
22#include "webrtc/test/channel_transport/udp_socket_posix.h"
23
24namespace webrtc {
25namespace test {
26
27UdpSocketManagerPosix::UdpSocketManagerPosix()
28    : UdpSocketManager(),
29      _id(-1),
30      _critSect(CriticalSectionWrapper::CreateCriticalSection()),
31      _numberOfSocketMgr(-1),
32      _incSocketMgrNextTime(0),
33      _nextSocketMgrToAssign(0),
34      _socketMgr()
35{
36}
37
38bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) {
39    CriticalSectionScoped cs(_critSect);
40    if ((_id != -1) || (_numOfWorkThreads != 0)) {
41        assert(_id != -1);
42        assert(_numOfWorkThreads != 0);
43        return false;
44    }
45
46    _id = id;
47    _numberOfSocketMgr = numOfWorkThreads;
48    _numOfWorkThreads = numOfWorkThreads;
49
50    if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr)
51    {
52        _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX;
53    }
54    for(int i = 0;i < _numberOfSocketMgr; i++)
55    {
56        _socketMgr[i] = new UdpSocketManagerPosixImpl();
57    }
58    return true;
59}
60
61
62UdpSocketManagerPosix::~UdpSocketManagerPosix()
63{
64    Stop();
65    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
66                 "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()",
67                 _numberOfSocketMgr);
68
69    for(int i = 0;i < _numberOfSocketMgr; i++)
70    {
71        delete _socketMgr[i];
72    }
73    delete _critSect;
74}
75
76int32_t UdpSocketManagerPosix::ChangeUniqueId(const int32_t id)
77{
78    _id = id;
79    return 0;
80}
81
82bool UdpSocketManagerPosix::Start()
83{
84    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
85                 "UdpSocketManagerPosix(%d)::Start()",
86                 _numberOfSocketMgr);
87
88    _critSect->Enter();
89    bool retVal = true;
90    for(int i = 0;i < _numberOfSocketMgr && retVal; i++)
91    {
92        retVal = _socketMgr[i]->Start();
93    }
94    if(!retVal)
95    {
96        WEBRTC_TRACE(
97            kTraceError,
98            kTraceTransport,
99            _id,
100            "UdpSocketManagerPosix(%d)::Start() error starting socket managers",
101            _numberOfSocketMgr);
102    }
103    _critSect->Leave();
104    return retVal;
105}
106
107bool UdpSocketManagerPosix::Stop()
108{
109    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
110                 "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr);
111
112    _critSect->Enter();
113    bool retVal = true;
114    for(int i = 0; i < _numberOfSocketMgr && retVal; i++)
115    {
116        retVal = _socketMgr[i]->Stop();
117    }
118    if(!retVal)
119    {
120        WEBRTC_TRACE(
121            kTraceError,
122            kTraceTransport,
123            _id,
124            "UdpSocketManagerPosix(%d)::Stop() there are still active socket "
125            "managers",
126            _numberOfSocketMgr);
127    }
128    _critSect->Leave();
129    return retVal;
130}
131
132bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s)
133{
134    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
135                 "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr);
136
137    _critSect->Enter();
138    bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s);
139    if(!retVal)
140    {
141        WEBRTC_TRACE(
142            kTraceError,
143            kTraceTransport,
144            _id,
145            "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\
146 manager",
147            _numberOfSocketMgr);
148    }
149
150    // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin
151    // fashion.
152    if(_incSocketMgrNextTime == 0)
153    {
154        _incSocketMgrNextTime++;
155    } else {
156        _incSocketMgrNextTime = 0;
157        _nextSocketMgrToAssign++;
158        if(_nextSocketMgrToAssign >= _numberOfSocketMgr)
159        {
160            _nextSocketMgrToAssign = 0;
161        }
162    }
163    _critSect->Leave();
164    return retVal;
165}
166
167bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s)
168{
169    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
170                 "UdpSocketManagerPosix(%d)::RemoveSocket()",
171                 _numberOfSocketMgr);
172
173    _critSect->Enter();
174    bool retVal = false;
175    for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++)
176    {
177        retVal = _socketMgr[i]->RemoveSocket(s);
178    }
179    if(!retVal)
180    {
181        WEBRTC_TRACE(
182            kTraceError,
183            kTraceTransport,
184            _id,
185            "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\
186 from manager",
187            _numberOfSocketMgr);
188    }
189    _critSect->Leave();
190    return retVal;
191}
192
193
194UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl()
195{
196    _critSectList = CriticalSectionWrapper::CreateCriticalSection();
197    _thread = ThreadWrapper::CreateThread(UdpSocketManagerPosixImpl::Run, this,
198                                          kRealtimePriority,
199                                          "UdpSocketManagerPosixImplThread");
200    FD_ZERO(&_readFds);
201    WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
202                 "UdpSocketManagerPosix created");
203}
204
205UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl()
206{
207    if(_thread != NULL)
208    {
209        delete _thread;
210    }
211
212    if (_critSectList != NULL)
213    {
214        UpdateSocketMap();
215
216        _critSectList->Enter();
217        for (std::map<SOCKET, UdpSocketPosix*>::iterator it =
218                 _socketMap.begin();
219             it != _socketMap.end();
220             ++it) {
221          delete it->second;
222        }
223        _socketMap.clear();
224        _critSectList->Leave();
225
226        delete _critSectList;
227    }
228
229    WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
230                 "UdpSocketManagerPosix deleted");
231}
232
233bool UdpSocketManagerPosixImpl::Start()
234{
235    unsigned int id = 0;
236    if (_thread == NULL)
237    {
238        return false;
239    }
240
241    WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
242                 "Start UdpSocketManagerPosix");
243    return _thread->Start(id);
244}
245
246bool UdpSocketManagerPosixImpl::Stop()
247{
248    if (_thread == NULL)
249    {
250        return true;
251    }
252
253    WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
254                 "Stop UdpSocketManagerPosix");
255    return _thread->Stop();
256}
257
258bool UdpSocketManagerPosixImpl::Process()
259{
260    bool doSelect = false;
261    // Timeout = 1 second.
262    struct timeval timeout;
263    timeout.tv_sec = 0;
264    timeout.tv_usec = 10000;
265
266    FD_ZERO(&_readFds);
267
268    UpdateSocketMap();
269
270    SOCKET maxFd = 0;
271    for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
272         it != _socketMap.end();
273         ++it) {
274      doSelect = true;
275      if (it->first > maxFd)
276        maxFd = it->first;
277      FD_SET(it->first, &_readFds);
278    }
279
280    int num = 0;
281    if (doSelect)
282    {
283        num = select(maxFd+1, &_readFds, NULL, NULL, &timeout);
284
285        if (num == SOCKET_ERROR)
286        {
287            // Timeout = 10 ms.
288            SleepMs(10);
289            return true;
290        }
291    }else
292    {
293        // Timeout = 10 ms.
294        SleepMs(10);
295        return true;
296    }
297
298    for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
299         it != _socketMap.end();
300         ++it) {
301      if (FD_ISSET(it->first, &_readFds)) {
302        it->second->HasIncoming();
303        --num;
304      }
305    }
306
307    return true;
308}
309
310bool UdpSocketManagerPosixImpl::Run(ThreadObj obj)
311{
312    UdpSocketManagerPosixImpl* mgr =
313        static_cast<UdpSocketManagerPosixImpl*>(obj);
314    return mgr->Process();
315}
316
317bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s)
318{
319    UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s);
320    if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE))
321    {
322        return false;
323    }
324    _critSectList->Enter();
325    _addList.push_back(s);
326    _critSectList->Leave();
327    return true;
328}
329
330bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s)
331{
332    // Put in remove list if this is the correct UdpSocketManagerPosixImpl.
333    _critSectList->Enter();
334
335    // If the socket is in the add list it's safe to remove and delete it.
336    for (SocketList::iterator iter = _addList.begin();
337         iter != _addList.end(); ++iter) {
338        UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
339        unsigned int addFD = addSocket->GetFd();
340        unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd();
341        if(removeFD == addFD)
342        {
343            _removeList.push_back(removeFD);
344            _critSectList->Leave();
345            return true;
346        }
347    }
348
349    // Checking the socket map is safe since all Erase and Insert calls to this
350    // map are also protected by _critSectList.
351    if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) !=
352        _socketMap.end()) {
353      _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd());
354      _critSectList->Leave();
355      return true;
356    }
357    _critSectList->Leave();
358    return false;
359}
360
361void UdpSocketManagerPosixImpl::UpdateSocketMap()
362{
363    // Remove items in remove list.
364    _critSectList->Enter();
365    for (FdList::iterator iter = _removeList.begin();
366         iter != _removeList.end(); ++iter) {
367        UdpSocketPosix* deleteSocket = NULL;
368        SOCKET removeFD = *iter;
369
370        // If the socket is in the add list it hasn't been added to the socket
371        // map yet. Just remove the socket from the add list.
372        for (SocketList::iterator iter = _addList.begin();
373             iter != _addList.end(); ++iter) {
374            UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
375            SOCKET addFD = addSocket->GetFd();
376            if(removeFD == addFD)
377            {
378                deleteSocket = addSocket;
379                _addList.erase(iter);
380                break;
381            }
382        }
383
384        // Find and remove socket from _socketMap.
385        std::map<SOCKET, UdpSocketPosix*>::iterator it =
386            _socketMap.find(removeFD);
387        if(it != _socketMap.end())
388        {
389          deleteSocket = it->second;
390          _socketMap.erase(it);
391        }
392        if(deleteSocket)
393        {
394            deleteSocket->ReadyForDeletion();
395            delete deleteSocket;
396        }
397    }
398    _removeList.clear();
399
400    // Add sockets from add list.
401    for (SocketList::iterator iter = _addList.begin();
402         iter != _addList.end(); ++iter) {
403        UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter);
404        if(s) {
405          _socketMap[s->GetFd()] = s;
406        }
407    }
408    _addList.clear();
409    _critSectList->Leave();
410}
411
412}  // namespace test
413}  // namespace webrtc
414