ARTPConnection.cpp revision cf7b9c7aae758ac0b99833915053c63c2ac46e09
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "ARTPConnection.h"
18
19#include "ARTPSource.h"
20#include "ASessionDescription.h"
21
22#include <media/stagefright/foundation/ABuffer.h>
23#include <media/stagefright/foundation/ADebug.h>
24#include <media/stagefright/foundation/AMessage.h>
25#include <media/stagefright/foundation/AString.h>
26
27#include <arpa/inet.h>
28#include <sys/socket.h>
29
30#define VERBOSE         0
31
32#if VERBOSE
33#include "hexdump.h"
34#endif
35
36namespace android {
37
38static uint16_t u16at(const uint8_t *data) {
39    return data[0] << 8 | data[1];
40}
41
42static uint32_t u32at(const uint8_t *data) {
43    return u16at(data) << 16 | u16at(&data[2]);
44}
45
46static uint64_t u64at(const uint8_t *data) {
47    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
48}
49
50// static
51const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
52
53struct ARTPConnection::StreamInfo {
54    int mRTPSocket;
55    int mRTCPSocket;
56    sp<ASessionDescription> mSessionDesc;
57    size_t mIndex;
58    sp<AMessage> mNotifyMsg;
59};
60
61ARTPConnection::ARTPConnection()
62    : mPollEventPending(false) {
63}
64
65ARTPConnection::~ARTPConnection() {
66}
67
68void ARTPConnection::addStream(
69        int rtpSocket, int rtcpSocket,
70        const sp<ASessionDescription> &sessionDesc,
71        size_t index,
72        const sp<AMessage> &notify) {
73    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
74    msg->setInt32("rtp-socket", rtpSocket);
75    msg->setInt32("rtcp-socket", rtcpSocket);
76    msg->setObject("session-desc", sessionDesc);
77    msg->setSize("index", index);
78    msg->setMessage("notify", notify);
79    msg->post();
80}
81
82void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
83    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
84    msg->setInt32("rtp-socket", rtpSocket);
85    msg->setInt32("rtcp-socket", rtcpSocket);
86    msg->post();
87}
88
89static void bumpSocketBufferSize(int s) {
90    int size = 256 * 1024;
91    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
92}
93
94// static
95void ARTPConnection::MakePortPair(
96        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
97    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
98    CHECK_GE(*rtpSocket, 0);
99
100    bumpSocketBufferSize(*rtpSocket);
101
102    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
103    CHECK_GE(*rtcpSocket, 0);
104
105    bumpSocketBufferSize(*rtcpSocket);
106
107    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
108    start &= ~1;
109
110    for (unsigned port = start; port < 65536; port += 2) {
111        struct sockaddr_in addr;
112        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
113        addr.sin_family = AF_INET;
114        addr.sin_addr.s_addr = INADDR_ANY;
115        addr.sin_port = htons(port);
116
117        if (bind(*rtpSocket,
118                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
119            continue;
120        }
121
122        addr.sin_port = htons(port + 1);
123
124        if (bind(*rtcpSocket,
125                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
126            *rtpPort = port;
127            return;
128        }
129    }
130
131    TRESPASS();
132}
133
134void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
135    switch (msg->what()) {
136        case kWhatAddStream:
137        {
138            onAddStream(msg);
139            break;
140        }
141
142        case kWhatRemoveStream:
143        {
144            onRemoveStream(msg);
145            break;
146        }
147
148        case kWhatPollStreams:
149        {
150            onPollStreams();
151            break;
152        }
153
154        default:
155        {
156            TRESPASS();
157            break;
158        }
159    }
160}
161
162void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
163    mStreams.push_back(StreamInfo());
164    StreamInfo *info = &*--mStreams.end();
165
166    int32_t s;
167    CHECK(msg->findInt32("rtp-socket", &s));
168    info->mRTPSocket = s;
169    CHECK(msg->findInt32("rtcp-socket", &s));
170    info->mRTCPSocket = s;
171
172    sp<RefBase> obj;
173    CHECK(msg->findObject("session-desc", &obj));
174    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
175
176    CHECK(msg->findSize("index", &info->mIndex));
177    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
178
179    postPollEvent();
180}
181
182void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
183    int32_t rtpSocket, rtcpSocket;
184    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
185    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
186
187    List<StreamInfo>::iterator it = mStreams.begin();
188    while (it != mStreams.end()
189           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
190        ++it;
191    }
192
193    if (it == mStreams.end()) {
194        TRESPASS();
195    }
196
197    mStreams.erase(it);
198}
199
200void ARTPConnection::postPollEvent() {
201    if (mPollEventPending) {
202        return;
203    }
204
205    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
206    msg->post();
207
208    mPollEventPending = true;
209}
210
211void ARTPConnection::onPollStreams() {
212    mPollEventPending = false;
213
214    if (mStreams.empty()) {
215        return;
216    }
217
218    struct timeval tv;
219    tv.tv_sec = 0;
220    tv.tv_usec = kSelectTimeoutUs;
221
222    fd_set rs;
223    FD_ZERO(&rs);
224
225    int maxSocket = -1;
226    for (List<StreamInfo>::iterator it = mStreams.begin();
227         it != mStreams.end(); ++it) {
228        FD_SET(it->mRTPSocket, &rs);
229        FD_SET(it->mRTCPSocket, &rs);
230
231        if (it->mRTPSocket > maxSocket) {
232            maxSocket = it->mRTPSocket;
233        }
234        if (it->mRTCPSocket > maxSocket) {
235            maxSocket = it->mRTCPSocket;
236        }
237    }
238
239    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
240    CHECK_GE(res, 0);
241
242    if (res > 0) {
243        for (List<StreamInfo>::iterator it = mStreams.begin();
244             it != mStreams.end(); ++it) {
245            if (FD_ISSET(it->mRTPSocket, &rs)) {
246                receive(&*it, true);
247            }
248            if (FD_ISSET(it->mRTCPSocket, &rs)) {
249                receive(&*it, false);
250            }
251        }
252    }
253
254    postPollEvent();
255}
256
257status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
258    sp<ABuffer> buffer = new ABuffer(65536);
259
260    struct sockaddr_in from;
261    socklen_t fromSize = sizeof(from);
262
263    ssize_t nbytes = recvfrom(
264            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
265            buffer->data(),
266            buffer->capacity(),
267            0,
268            (struct sockaddr *)&from,
269            &fromSize);
270
271    if (nbytes < 0) {
272        return -1;
273    }
274
275    buffer->setRange(0, nbytes);
276
277    status_t err;
278    if (receiveRTP) {
279        err = parseRTP(s, buffer);
280    } else {
281        err = parseRTCP(s, buffer);
282    }
283
284    return err;
285}
286
287status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
288    size_t size = buffer->size();
289
290    if (size < 12) {
291        // Too short to be a valid RTP header.
292        return -1;
293    }
294
295    const uint8_t *data = buffer->data();
296
297    if ((data[0] >> 6) != 2) {
298        // Unsupported version.
299        return -1;
300    }
301
302    if (data[0] & 0x20) {
303        // Padding present.
304
305        size_t paddingLength = data[size - 1];
306
307        if (paddingLength + 12 > size) {
308            // If we removed this much padding we'd end up with something
309            // that's too short to be a valid RTP header.
310            return -1;
311        }
312
313        size -= paddingLength;
314    }
315
316    int numCSRCs = data[0] & 0x0f;
317
318    size_t payloadOffset = 12 + 4 * numCSRCs;
319
320    if (size < payloadOffset) {
321        // Not enough data to fit the basic header and all the CSRC entries.
322        return -1;
323    }
324
325    if (data[0] & 0x10) {
326        // Header eXtension present.
327
328        if (size < payloadOffset + 4) {
329            // Not enough data to fit the basic header, all CSRC entries
330            // and the first 4 bytes of the extension header.
331
332            return -1;
333        }
334
335        const uint8_t *extensionData = &data[payloadOffset];
336
337        size_t extensionLength =
338            4 * (extensionData[2] << 8 | extensionData[3]);
339
340        if (size < payloadOffset + 4 + extensionLength) {
341            return -1;
342        }
343
344        payloadOffset += 4 + extensionLength;
345    }
346
347    uint32_t srcId = u32at(&data[8]);
348
349    sp<ARTPSource> source;
350    ssize_t index = mSources.indexOfKey(srcId);
351    if (index < 0) {
352        index = mSources.size();
353
354        source = new ARTPSource(
355                srcId, s->mSessionDesc, s->mIndex, s->mNotifyMsg);
356
357        mSources.add(srcId, source);
358    } else {
359        source = mSources.valueAt(index);
360    }
361
362    uint32_t rtpTime = u32at(&data[4]);
363
364    sp<AMessage> meta = buffer->meta();
365    meta->setInt32("ssrc", srcId);
366    meta->setInt32("rtp-time", rtpTime);
367    meta->setInt32("PT", data[1] & 0x7f);
368    meta->setInt32("M", data[1] >> 7);
369
370    buffer->setInt32Data(u16at(&data[2]));
371
372#if VERBOSE
373    printf("RTP = {\n"
374           "  PT: %d\n"
375           "  sequence number: %d\n"
376           "  RTP-time: 0x%08x\n"
377           "  M: %d\n"
378           "  SSRC: 0x%08x\n"
379           "}\n",
380           data[1] & 0x7f,
381           u16at(&data[2]),
382           rtpTime,
383           data[1] >> 7,
384           srcId);
385
386    // hexdump(&data[payloadOffset], size - payloadOffset);
387#endif
388
389    buffer->setRange(payloadOffset, size - payloadOffset);
390
391    source->processRTPPacket(buffer);
392
393    return OK;
394}
395
396status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
397    const uint8_t *data = buffer->data();
398    size_t size = buffer->size();
399
400    while (size > 0) {
401        if (size < 8) {
402            // Too short to be a valid RTCP header
403            return -1;
404        }
405
406        if ((data[0] >> 6) != 2) {
407            // Unsupported version.
408            return -1;
409        }
410
411        if (data[0] & 0x20) {
412            // Padding present.
413
414            size_t paddingLength = data[size - 1];
415
416            if (paddingLength + 12 > size) {
417                // If we removed this much padding we'd end up with something
418                // that's too short to be a valid RTP header.
419                return -1;
420            }
421
422            size -= paddingLength;
423        }
424
425        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
426
427        if (size < headerLength) {
428            // Only received a partial packet?
429            return -1;
430        }
431
432        switch (data[1]) {
433            case 200:
434            {
435                parseSR(s, data, headerLength);
436                break;
437            }
438
439            default:
440            {
441#if VERBOSE
442                printf("Unknown RTCP packet type %d of size %ld\n",
443                       data[1], headerLength);
444
445                hexdump(data, headerLength);
446#endif
447                break;
448            }
449        }
450
451        data += headerLength;
452        size -= headerLength;
453    }
454
455    return OK;
456}
457
458status_t ARTPConnection::parseSR(
459        StreamInfo *s, const uint8_t *data, size_t size) {
460    size_t RC = data[0] & 0x1f;
461
462    if (size < (7 + RC * 6) * 4) {
463        // Packet too short for the minimal SR header.
464        return -1;
465    }
466
467    uint32_t id = u32at(&data[4]);
468    uint64_t ntpTime = u64at(&data[8]);
469    uint32_t rtpTime = u32at(&data[16]);
470
471#if VERBOSE
472    printf("SR = {\n"
473           "  SSRC:      0x%08x\n"
474           "  NTP-time:  0x%016llx\n"
475           "  RTP-time:  0x%08x\n"
476           "}\n",
477           id, ntpTime, rtpTime);
478#endif
479
480    sp<ARTPSource> source;
481    ssize_t index = mSources.indexOfKey(id);
482    if (index < 0) {
483        index = mSources.size();
484
485        source = new ARTPSource(
486                id, s->mSessionDesc, s->mIndex, s->mNotifyMsg);
487
488        mSources.add(id, source);
489    } else {
490        source = mSources.valueAt(index);
491    }
492
493    source->timeUpdate(rtpTime, ntpTime);
494
495    return 0;
496}
497
498}  // namespace android
499
500