ARTPConnection.cpp revision 6e4c5c499999c04c2477b987f9e64f3ff2bf1a06
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTPConnection"
19#include <utils/Log.h>
20
21#include "ARTPConnection.h"
22
23#include "ARTPSource.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/AMessage.h>
29#include <media/stagefright/foundation/AString.h>
30#include <media/stagefright/foundation/hexdump.h>
31
#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
34
35namespace android {
36
// Capacity, in bytes, of the buffer in which outgoing RTCP reports are
// assembled before being sent.
static const size_t kMaxUDPSize = 1500;
38
// Reads the big-endian 16-bit value stored in data[0..1].
static uint16_t u16at(const uint8_t *data) {
    const unsigned high = data[0];
    const unsigned low = data[1];
    return (high << 8) | low;
}
42
// Reads the big-endian 32-bit value stored in data[0..3].
//
// Every byte is widened to uint32_t before it is shifted: shifting a value
// that has been promoted to (signed) int into the sign bit is undefined
// behaviour, which is what happens for any input whose first byte is >= 0x80
// if the shift is done on the promoted type.
static uint32_t u32at(const uint8_t *data) {
    return (uint32_t)data[0] << 24
        | (uint32_t)data[1] << 16
        | (uint32_t)data[2] << 8
        | (uint32_t)data[3];
}
46
// Reads the big-endian 64-bit value stored in data[0..7].
static uint64_t u64at(const uint8_t *data) {
    uint64_t value = 0;
    for (size_t i = 0; i < 8; ++i) {
        // Most significant byte first.
        value = (value << 8) | data[i];
    }
    return value;
}
50
51// static
52const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
53
54struct ARTPConnection::StreamInfo {
55    int mRTPSocket;
56    int mRTCPSocket;
57    sp<ASessionDescription> mSessionDesc;
58    size_t mIndex;
59    sp<AMessage> mNotifyMsg;
60    KeyedVector<uint32_t, sp<ARTPSource> > mSources;
61
62    int32_t mNumRTCPPacketsReceived;
63    struct sockaddr_in mRemoteRTCPAddr;
64
65    bool mIsInjected;
66};
67
68ARTPConnection::ARTPConnection(uint32_t flags)
69    : mFlags(flags),
70      mPollEventPending(false),
71      mLastReceiverReportTimeUs(-1) {
72}
73
74ARTPConnection::~ARTPConnection() {
75}
76
77void ARTPConnection::addStream(
78        int rtpSocket, int rtcpSocket,
79        const sp<ASessionDescription> &sessionDesc,
80        size_t index,
81        const sp<AMessage> &notify,
82        bool injected) {
83    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
84    msg->setInt32("rtp-socket", rtpSocket);
85    msg->setInt32("rtcp-socket", rtcpSocket);
86    msg->setObject("session-desc", sessionDesc);
87    msg->setSize("index", index);
88    msg->setMessage("notify", notify);
89    msg->setInt32("injected", injected);
90    msg->post();
91}
92
93void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
94    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
95    msg->setInt32("rtp-socket", rtpSocket);
96    msg->setInt32("rtcp-socket", rtcpSocket);
97    msg->post();
98}
99
100static void bumpSocketBufferSize(int s) {
101    int size = 256 * 1024;
102    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
103}
104
105// static
106void ARTPConnection::MakePortPair(
107        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
108    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
109    CHECK_GE(*rtpSocket, 0);
110
111    bumpSocketBufferSize(*rtpSocket);
112
113    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
114    CHECK_GE(*rtcpSocket, 0);
115
116    bumpSocketBufferSize(*rtcpSocket);
117
118    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
119    start &= ~1;
120
121    for (unsigned port = start; port < 65536; port += 2) {
122        struct sockaddr_in addr;
123        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
124        addr.sin_family = AF_INET;
125        addr.sin_addr.s_addr = INADDR_ANY;
126        addr.sin_port = htons(port);
127
128        if (bind(*rtpSocket,
129                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
130            continue;
131        }
132
133        addr.sin_port = htons(port + 1);
134
135        if (bind(*rtcpSocket,
136                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
137            *rtpPort = port;
138            return;
139        }
140    }
141
142    TRESPASS();
143}
144
145void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
146    switch (msg->what()) {
147        case kWhatAddStream:
148        {
149            onAddStream(msg);
150            break;
151        }
152
153        case kWhatRemoveStream:
154        {
155            onRemoveStream(msg);
156            break;
157        }
158
159        case kWhatPollStreams:
160        {
161            onPollStreams();
162            break;
163        }
164
165        case kWhatInjectPacket:
166        {
167            onInjectPacket(msg);
168            break;
169        }
170
171        default:
172        {
173            TRESPASS();
174            break;
175        }
176    }
177}
178
179void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
180    mStreams.push_back(StreamInfo());
181    StreamInfo *info = &*--mStreams.end();
182
183    int32_t s;
184    CHECK(msg->findInt32("rtp-socket", &s));
185    info->mRTPSocket = s;
186    CHECK(msg->findInt32("rtcp-socket", &s));
187    info->mRTCPSocket = s;
188
189    int32_t injected;
190    CHECK(msg->findInt32("injected", &injected));
191
192    info->mIsInjected = injected;
193
194    sp<RefBase> obj;
195    CHECK(msg->findObject("session-desc", &obj));
196    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
197
198    CHECK(msg->findSize("index", &info->mIndex));
199    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
200
201    info->mNumRTCPPacketsReceived = 0;
202    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
203
204    if (!injected) {
205        postPollEvent();
206    }
207}
208
209void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
210    int32_t rtpSocket, rtcpSocket;
211    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
212    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
213
214    List<StreamInfo>::iterator it = mStreams.begin();
215    while (it != mStreams.end()
216           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
217        ++it;
218    }
219
220    if (it == mStreams.end()) {
221        TRESPASS();
222    }
223
224    mStreams.erase(it);
225}
226
227void ARTPConnection::postPollEvent() {
228    if (mPollEventPending) {
229        return;
230    }
231
232    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
233    msg->post();
234
235    mPollEventPending = true;
236}
237
238void ARTPConnection::onPollStreams() {
239    mPollEventPending = false;
240
241    if (mStreams.empty()) {
242        return;
243    }
244
245    struct timeval tv;
246    tv.tv_sec = 0;
247    tv.tv_usec = kSelectTimeoutUs;
248
249    fd_set rs;
250    FD_ZERO(&rs);
251
252    int maxSocket = -1;
253    for (List<StreamInfo>::iterator it = mStreams.begin();
254         it != mStreams.end(); ++it) {
255        if ((*it).mIsInjected) {
256            continue;
257        }
258
259        FD_SET(it->mRTPSocket, &rs);
260        FD_SET(it->mRTCPSocket, &rs);
261
262        if (it->mRTPSocket > maxSocket) {
263            maxSocket = it->mRTPSocket;
264        }
265        if (it->mRTCPSocket > maxSocket) {
266            maxSocket = it->mRTCPSocket;
267        }
268    }
269
270    if (maxSocket == -1) {
271        return;
272    }
273
274    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
275    CHECK_GE(res, 0);
276
277    if (res > 0) {
278        for (List<StreamInfo>::iterator it = mStreams.begin();
279             it != mStreams.end(); ++it) {
280            if ((*it).mIsInjected) {
281                continue;
282            }
283
284            if (FD_ISSET(it->mRTPSocket, &rs)) {
285                receive(&*it, true);
286            }
287            if (FD_ISSET(it->mRTCPSocket, &rs)) {
288                receive(&*it, false);
289            }
290        }
291    }
292
293    postPollEvent();
294
295    int64_t nowUs = ALooper::GetNowUs();
296    if (mLastReceiverReportTimeUs <= 0
297            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
298        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
299        for (List<StreamInfo>::iterator it = mStreams.begin();
300             it != mStreams.end(); ++it) {
301            StreamInfo *s = &*it;
302
303            if (s->mIsInjected) {
304                continue;
305            }
306
307            if (s->mNumRTCPPacketsReceived == 0) {
308                // We have never received any RTCP packets on this stream,
309                // we don't even know where to send a report.
310                continue;
311            }
312
313            buffer->setRange(0, 0);
314
315            for (size_t i = 0; i < s->mSources.size(); ++i) {
316                sp<ARTPSource> source = s->mSources.valueAt(i);
317
318                source->addReceiverReport(buffer);
319
320                if (mFlags & kRegularlyRequestFIR) {
321                    source->addFIR(buffer);
322                }
323            }
324
325            if (buffer->size() > 0) {
326                LOGV("Sending RR...");
327
328                ssize_t n = sendto(
329                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
330                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
331                        sizeof(s->mRemoteRTCPAddr));
332                CHECK_EQ(n, (ssize_t)buffer->size());
333
334                mLastReceiverReportTimeUs = nowUs;
335            }
336        }
337    }
338}
339
340status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
341    CHECK(!s->mIsInjected);
342
343    sp<ABuffer> buffer = new ABuffer(65536);
344
345    socklen_t remoteAddrLen =
346        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
347            ? sizeof(s->mRemoteRTCPAddr) : 0;
348
349    ssize_t nbytes = recvfrom(
350            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
351            buffer->data(),
352            buffer->capacity(),
353            0,
354            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
355            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
356
357    if (nbytes < 0) {
358        return -1;
359    }
360
361    buffer->setRange(0, nbytes);
362
363    // LOGI("received %d bytes.", buffer->size());
364
365    status_t err;
366    if (receiveRTP) {
367        err = parseRTP(s, buffer);
368    } else {
369        err = parseRTCP(s, buffer);
370    }
371
372    return err;
373}
374
375status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
376    size_t size = buffer->size();
377
378    if (size < 12) {
379        // Too short to be a valid RTP header.
380        return -1;
381    }
382
383    const uint8_t *data = buffer->data();
384
385    if ((data[0] >> 6) != 2) {
386        // Unsupported version.
387        return -1;
388    }
389
390    if (data[0] & 0x20) {
391        // Padding present.
392
393        size_t paddingLength = data[size - 1];
394
395        if (paddingLength + 12 > size) {
396            // If we removed this much padding we'd end up with something
397            // that's too short to be a valid RTP header.
398            return -1;
399        }
400
401        size -= paddingLength;
402    }
403
404    int numCSRCs = data[0] & 0x0f;
405
406    size_t payloadOffset = 12 + 4 * numCSRCs;
407
408    if (size < payloadOffset) {
409        // Not enough data to fit the basic header and all the CSRC entries.
410        return -1;
411    }
412
413    if (data[0] & 0x10) {
414        // Header eXtension present.
415
416        if (size < payloadOffset + 4) {
417            // Not enough data to fit the basic header, all CSRC entries
418            // and the first 4 bytes of the extension header.
419
420            return -1;
421        }
422
423        const uint8_t *extensionData = &data[payloadOffset];
424
425        size_t extensionLength =
426            4 * (extensionData[2] << 8 | extensionData[3]);
427
428        if (size < payloadOffset + 4 + extensionLength) {
429            return -1;
430        }
431
432        payloadOffset += 4 + extensionLength;
433    }
434
435    uint32_t srcId = u32at(&data[8]);
436
437    sp<ARTPSource> source = findSource(s, srcId);
438
439    uint32_t rtpTime = u32at(&data[4]);
440
441    sp<AMessage> meta = buffer->meta();
442    meta->setInt32("ssrc", srcId);
443    meta->setInt32("rtp-time", rtpTime);
444    meta->setInt32("PT", data[1] & 0x7f);
445    meta->setInt32("M", data[1] >> 7);
446
447    buffer->setInt32Data(u16at(&data[2]));
448    buffer->setRange(payloadOffset, size - payloadOffset);
449
450    if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) {
451        source->timeUpdate(rtpTime, 0);
452        source->timeUpdate(rtpTime + 90000, 0x100000000ll);
453        CHECK(source->timeEstablished());
454    }
455
456    source->processRTPPacket(buffer);
457
458    return OK;
459}
460
461status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
462    if (s->mNumRTCPPacketsReceived++ == 0) {
463        sp<AMessage> notify = s->mNotifyMsg->dup();
464        notify->setInt32("first-rtcp", true);
465        notify->post();
466    }
467
468    const uint8_t *data = buffer->data();
469    size_t size = buffer->size();
470
471    while (size > 0) {
472        if (size < 8) {
473            // Too short to be a valid RTCP header
474            return -1;
475        }
476
477        if ((data[0] >> 6) != 2) {
478            // Unsupported version.
479            return -1;
480        }
481
482        if (data[0] & 0x20) {
483            // Padding present.
484
485            size_t paddingLength = data[size - 1];
486
487            if (paddingLength + 12 > size) {
488                // If we removed this much padding we'd end up with something
489                // that's too short to be a valid RTP header.
490                return -1;
491            }
492
493            size -= paddingLength;
494        }
495
496        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
497
498        if (size < headerLength) {
499            // Only received a partial packet?
500            return -1;
501        }
502
503        switch (data[1]) {
504            case 200:
505            {
506                parseSR(s, data, headerLength);
507                break;
508            }
509
510            case 201:  // RR
511            case 202:  // SDES
512            case 204:  // APP
513                break;
514
515            case 205:  // TSFB (transport layer specific feedback)
516            case 206:  // PSFB (payload specific feedback)
517                // hexdump(data, headerLength);
518                break;
519
520            case 203:
521            {
522                parseBYE(s, data, headerLength);
523                break;
524            }
525
526            default:
527            {
528                LOGW("Unknown RTCP packet type %u of size %d",
529                     (unsigned)data[1], headerLength);
530                break;
531            }
532        }
533
534        data += headerLength;
535        size -= headerLength;
536    }
537
538    return OK;
539}
540
541status_t ARTPConnection::parseBYE(
542        StreamInfo *s, const uint8_t *data, size_t size) {
543    size_t SC = data[0] & 0x3f;
544
545    if (SC == 0 || size < (4 + SC * 4)) {
546        // Packet too short for the minimal BYE header.
547        return -1;
548    }
549
550    uint32_t id = u32at(&data[4]);
551
552    sp<ARTPSource> source = findSource(s, id);
553
554    source->byeReceived();
555
556    return OK;
557}
558
559status_t ARTPConnection::parseSR(
560        StreamInfo *s, const uint8_t *data, size_t size) {
561    size_t RC = data[0] & 0x1f;
562
563    if (size < (7 + RC * 6) * 4) {
564        // Packet too short for the minimal SR header.
565        return -1;
566    }
567
568    uint32_t id = u32at(&data[4]);
569    uint64_t ntpTime = u64at(&data[8]);
570    uint32_t rtpTime = u32at(&data[16]);
571
572#if 0
573    LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
574         id,
575         rtpTime,
576         (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
577#endif
578
579    sp<ARTPSource> source = findSource(s, id);
580
581    if ((mFlags & kFakeTimestamps) == 0) {
582        source->timeUpdate(rtpTime, ntpTime);
583    }
584
585    return 0;
586}
587
588sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
589    sp<ARTPSource> source;
590    ssize_t index = info->mSources.indexOfKey(srcId);
591    if (index < 0) {
592        index = info->mSources.size();
593
594        source = new ARTPSource(
595                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
596
597        info->mSources.add(srcId, source);
598    } else {
599        source = info->mSources.valueAt(index);
600    }
601
602    return source;
603}
604
605void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
606    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
607    msg->setInt32("index", index);
608    msg->setObject("buffer", buffer);
609    msg->post();
610}
611
612void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
613    int32_t index;
614    CHECK(msg->findInt32("index", &index));
615
616    sp<RefBase> obj;
617    CHECK(msg->findObject("buffer", &obj));
618
619    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
620
621    List<StreamInfo>::iterator it = mStreams.begin();
622    while (it != mStreams.end()
623           && it->mRTPSocket != index && it->mRTCPSocket != index) {
624        ++it;
625    }
626
627    if (it == mStreams.end()) {
628        TRESPASS();
629    }
630
631    StreamInfo *s = &*it;
632
633    status_t err;
634    if (it->mRTPSocket == index) {
635        err = parseRTP(s, buffer);
636    } else {
637        err = parseRTCP(s, buffer);
638    }
639}
640
641}  // namespace android
642
643