ARTPConnection.cpp revision b4a7a2df4c28c3f32b5d877b54831d2cc5d78f81
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTPConnection"
19#include <utils/Log.h>
20
21#include "ARTPAssembler.h"
22#include "ARTPConnection.h"
23
24#include "ARTPSource.h"
25#include "ASessionDescription.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/foundation/AString.h>
31#include <media/stagefright/foundation/hexdump.h>
32
33#include <arpa/inet.h>
34#include <sys/socket.h>
35
36namespace android {
37
38static const size_t kMaxUDPSize = 1500;
39
40static uint16_t u16at(const uint8_t *data) {
41    return data[0] << 8 | data[1];
42}
43
44static uint32_t u32at(const uint8_t *data) {
45    return u16at(data) << 16 | u16at(&data[2]);
46}
47
48static uint64_t u64at(const uint8_t *data) {
49    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50}
51
52// static
53const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54
55struct ARTPConnection::StreamInfo {
56    int mRTPSocket;
57    int mRTCPSocket;
58    sp<ASessionDescription> mSessionDesc;
59    size_t mIndex;
60    sp<AMessage> mNotifyMsg;
61    KeyedVector<uint32_t, sp<ARTPSource> > mSources;
62
63    int64_t mNumRTCPPacketsReceived;
64    int64_t mNumRTPPacketsReceived;
65    struct sockaddr_in mRemoteRTCPAddr;
66
67    bool mIsInjected;
68};
69
70ARTPConnection::ARTPConnection(uint32_t flags)
71    : mFlags(flags),
72      mPollEventPending(false),
73      mLastReceiverReportTimeUs(-1) {
74}
75
76ARTPConnection::~ARTPConnection() {
77}
78
79void ARTPConnection::addStream(
80        int rtpSocket, int rtcpSocket,
81        const sp<ASessionDescription> &sessionDesc,
82        size_t index,
83        const sp<AMessage> &notify,
84        bool injected) {
85    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
86    msg->setInt32("rtp-socket", rtpSocket);
87    msg->setInt32("rtcp-socket", rtcpSocket);
88    msg->setObject("session-desc", sessionDesc);
89    msg->setSize("index", index);
90    msg->setMessage("notify", notify);
91    msg->setInt32("injected", injected);
92    msg->post();
93}
94
95void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
97    msg->setInt32("rtp-socket", rtpSocket);
98    msg->setInt32("rtcp-socket", rtcpSocket);
99    msg->post();
100}
101
102static void bumpSocketBufferSize(int s) {
103    int size = 256 * 1024;
104    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
105}
106
107// static
108void ARTPConnection::MakePortPair(
109        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111    CHECK_GE(*rtpSocket, 0);
112
113    bumpSocketBufferSize(*rtpSocket);
114
115    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116    CHECK_GE(*rtcpSocket, 0);
117
118    bumpSocketBufferSize(*rtcpSocket);
119
120    /* rand() * 1000 may overflow int type, use long long */
121    unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
122    start &= ~1;
123
124    for (unsigned port = start; port < 65536; port += 2) {
125        struct sockaddr_in addr;
126        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
127        addr.sin_family = AF_INET;
128        addr.sin_addr.s_addr = htonl(INADDR_ANY);
129        addr.sin_port = htons(port);
130
131        if (bind(*rtpSocket,
132                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
133            continue;
134        }
135
136        addr.sin_port = htons(port + 1);
137
138        if (bind(*rtcpSocket,
139                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
140            *rtpPort = port;
141            return;
142        }
143    }
144
145    TRESPASS();
146}
147
148void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
149    switch (msg->what()) {
150        case kWhatAddStream:
151        {
152            onAddStream(msg);
153            break;
154        }
155
156        case kWhatRemoveStream:
157        {
158            onRemoveStream(msg);
159            break;
160        }
161
162        case kWhatPollStreams:
163        {
164            onPollStreams();
165            break;
166        }
167
168        case kWhatInjectPacket:
169        {
170            onInjectPacket(msg);
171            break;
172        }
173
174        default:
175        {
176            TRESPASS();
177            break;
178        }
179    }
180}
181
182void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
183    mStreams.push_back(StreamInfo());
184    StreamInfo *info = &*--mStreams.end();
185
186    int32_t s;
187    CHECK(msg->findInt32("rtp-socket", &s));
188    info->mRTPSocket = s;
189    CHECK(msg->findInt32("rtcp-socket", &s));
190    info->mRTCPSocket = s;
191
192    int32_t injected;
193    CHECK(msg->findInt32("injected", &injected));
194
195    info->mIsInjected = injected;
196
197    sp<RefBase> obj;
198    CHECK(msg->findObject("session-desc", &obj));
199    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
200
201    CHECK(msg->findSize("index", &info->mIndex));
202    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
203
204    info->mNumRTCPPacketsReceived = 0;
205    info->mNumRTPPacketsReceived = 0;
206    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
207
208    if (!injected) {
209        postPollEvent();
210    }
211}
212
213void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
214    int32_t rtpSocket, rtcpSocket;
215    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
216    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
217
218    List<StreamInfo>::iterator it = mStreams.begin();
219    while (it != mStreams.end()
220           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
221        ++it;
222    }
223
224    if (it == mStreams.end()) {
225        return;
226    }
227
228    mStreams.erase(it);
229}
230
231void ARTPConnection::postPollEvent() {
232    if (mPollEventPending) {
233        return;
234    }
235
236    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
237    msg->post();
238
239    mPollEventPending = true;
240}
241
242void ARTPConnection::onPollStreams() {
243    mPollEventPending = false;
244
245    if (mStreams.empty()) {
246        return;
247    }
248
249    struct timeval tv;
250    tv.tv_sec = 0;
251    tv.tv_usec = kSelectTimeoutUs;
252
253    fd_set rs;
254    FD_ZERO(&rs);
255
256    int maxSocket = -1;
257    for (List<StreamInfo>::iterator it = mStreams.begin();
258         it != mStreams.end(); ++it) {
259        if ((*it).mIsInjected) {
260            continue;
261        }
262
263        FD_SET(it->mRTPSocket, &rs);
264        FD_SET(it->mRTCPSocket, &rs);
265
266        if (it->mRTPSocket > maxSocket) {
267            maxSocket = it->mRTPSocket;
268        }
269        if (it->mRTCPSocket > maxSocket) {
270            maxSocket = it->mRTCPSocket;
271        }
272    }
273
274    if (maxSocket == -1) {
275        return;
276    }
277
278    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
279
280    if (res > 0) {
281        List<StreamInfo>::iterator it = mStreams.begin();
282        while (it != mStreams.end()) {
283            if ((*it).mIsInjected) {
284                ++it;
285                continue;
286            }
287
288            status_t err = OK;
289            if (FD_ISSET(it->mRTPSocket, &rs)) {
290                err = receive(&*it, true);
291            }
292            if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
293                err = receive(&*it, false);
294            }
295
296            if (err == -ECONNRESET) {
297                // socket failure, this stream is dead, Jim.
298
299                ALOGW("failed to receive RTP/RTCP datagram.");
300                it = mStreams.erase(it);
301                continue;
302            }
303
304            ++it;
305        }
306    }
307
308    int64_t nowUs = ALooper::GetNowUs();
309    if (mLastReceiverReportTimeUs <= 0
310            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
311        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
312        List<StreamInfo>::iterator it = mStreams.begin();
313        while (it != mStreams.end()) {
314            StreamInfo *s = &*it;
315
316            if (s->mIsInjected) {
317                ++it;
318                continue;
319            }
320
321            if (s->mNumRTCPPacketsReceived == 0) {
322                // We have never received any RTCP packets on this stream,
323                // we don't even know where to send a report.
324                ++it;
325                continue;
326            }
327
328            buffer->setRange(0, 0);
329
330            for (size_t i = 0; i < s->mSources.size(); ++i) {
331                sp<ARTPSource> source = s->mSources.valueAt(i);
332
333                source->addReceiverReport(buffer);
334
335                if (mFlags & kRegularlyRequestFIR) {
336                    source->addFIR(buffer);
337                }
338            }
339
340            if (buffer->size() > 0) {
341                ALOGV("Sending RR...");
342
343                ssize_t n;
344                do {
345                    n = sendto(
346                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
347                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
348                        sizeof(s->mRemoteRTCPAddr));
349                } while (n < 0 && errno == EINTR);
350
351                if (n <= 0) {
352                    ALOGW("failed to send RTCP receiver report (%s).",
353                         n == 0 ? "connection gone" : strerror(errno));
354
355                    it = mStreams.erase(it);
356                    continue;
357                }
358
359                CHECK_EQ(n, (ssize_t)buffer->size());
360
361                mLastReceiverReportTimeUs = nowUs;
362            }
363
364            ++it;
365        }
366    }
367
368    if (!mStreams.empty()) {
369        postPollEvent();
370    }
371}
372
373status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
374    ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
375
376    CHECK(!s->mIsInjected);
377
378    sp<ABuffer> buffer = new ABuffer(65536);
379
380    socklen_t remoteAddrLen =
381        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
382            ? sizeof(s->mRemoteRTCPAddr) : 0;
383
384    ssize_t nbytes;
385    do {
386        nbytes = recvfrom(
387            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
388            buffer->data(),
389            buffer->capacity(),
390            0,
391            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
392            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
393    } while (nbytes < 0 && errno == EINTR);
394
395    if (nbytes <= 0) {
396        return -ECONNRESET;
397    }
398
399    buffer->setRange(0, nbytes);
400
401    // ALOGI("received %d bytes.", buffer->size());
402
403    status_t err;
404    if (receiveRTP) {
405        err = parseRTP(s, buffer);
406    } else {
407        err = parseRTCP(s, buffer);
408    }
409
410    return err;
411}
412
413status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
414    if (s->mNumRTPPacketsReceived++ == 0) {
415        sp<AMessage> notify = s->mNotifyMsg->dup();
416        notify->setInt32("first-rtp", true);
417        notify->post();
418    }
419
420    size_t size = buffer->size();
421
422    if (size < 12) {
423        // Too short to be a valid RTP header.
424        return -1;
425    }
426
427    const uint8_t *data = buffer->data();
428
429    if ((data[0] >> 6) != 2) {
430        // Unsupported version.
431        return -1;
432    }
433
434    if (data[0] & 0x20) {
435        // Padding present.
436
437        size_t paddingLength = data[size - 1];
438
439        if (paddingLength + 12 > size) {
440            // If we removed this much padding we'd end up with something
441            // that's too short to be a valid RTP header.
442            return -1;
443        }
444
445        size -= paddingLength;
446    }
447
448    int numCSRCs = data[0] & 0x0f;
449
450    size_t payloadOffset = 12 + 4 * numCSRCs;
451
452    if (size < payloadOffset) {
453        // Not enough data to fit the basic header and all the CSRC entries.
454        return -1;
455    }
456
457    if (data[0] & 0x10) {
458        // Header eXtension present.
459
460        if (size < payloadOffset + 4) {
461            // Not enough data to fit the basic header, all CSRC entries
462            // and the first 4 bytes of the extension header.
463
464            return -1;
465        }
466
467        const uint8_t *extensionData = &data[payloadOffset];
468
469        size_t extensionLength =
470            4 * (extensionData[2] << 8 | extensionData[3]);
471
472        if (size < payloadOffset + 4 + extensionLength) {
473            return -1;
474        }
475
476        payloadOffset += 4 + extensionLength;
477    }
478
479    uint32_t srcId = u32at(&data[8]);
480
481    sp<ARTPSource> source = findSource(s, srcId);
482
483    uint32_t rtpTime = u32at(&data[4]);
484
485    sp<AMessage> meta = buffer->meta();
486    meta->setInt32("ssrc", srcId);
487    meta->setInt32("rtp-time", rtpTime);
488    meta->setInt32("PT", data[1] & 0x7f);
489    meta->setInt32("M", data[1] >> 7);
490
491    buffer->setInt32Data(u16at(&data[2]));
492    buffer->setRange(payloadOffset, size - payloadOffset);
493
494    source->processRTPPacket(buffer);
495
496    return OK;
497}
498
499status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
500    if (s->mNumRTCPPacketsReceived++ == 0) {
501        sp<AMessage> notify = s->mNotifyMsg->dup();
502        notify->setInt32("first-rtcp", true);
503        notify->post();
504    }
505
506    const uint8_t *data = buffer->data();
507    size_t size = buffer->size();
508
509    while (size > 0) {
510        if (size < 8) {
511            // Too short to be a valid RTCP header
512            return -1;
513        }
514
515        if ((data[0] >> 6) != 2) {
516            // Unsupported version.
517            return -1;
518        }
519
520        if (data[0] & 0x20) {
521            // Padding present.
522
523            size_t paddingLength = data[size - 1];
524
525            if (paddingLength + 12 > size) {
526                // If we removed this much padding we'd end up with something
527                // that's too short to be a valid RTP header.
528                return -1;
529            }
530
531            size -= paddingLength;
532        }
533
534        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
535
536        if (size < headerLength) {
537            // Only received a partial packet?
538            return -1;
539        }
540
541        switch (data[1]) {
542            case 200:
543            {
544                parseSR(s, data, headerLength);
545                break;
546            }
547
548            case 201:  // RR
549            case 202:  // SDES
550            case 204:  // APP
551                break;
552
553            case 205:  // TSFB (transport layer specific feedback)
554            case 206:  // PSFB (payload specific feedback)
555                // hexdump(data, headerLength);
556                break;
557
558            case 203:
559            {
560                parseBYE(s, data, headerLength);
561                break;
562            }
563
564            default:
565            {
566                ALOGW("Unknown RTCP packet type %u of size %zu",
567                     (unsigned)data[1], headerLength);
568                break;
569            }
570        }
571
572        data += headerLength;
573        size -= headerLength;
574    }
575
576    return OK;
577}
578
579status_t ARTPConnection::parseBYE(
580        StreamInfo *s, const uint8_t *data, size_t size) {
581    size_t SC = data[0] & 0x3f;
582
583    if (SC == 0 || size < (4 + SC * 4)) {
584        // Packet too short for the minimal BYE header.
585        return -1;
586    }
587
588    uint32_t id = u32at(&data[4]);
589
590    sp<ARTPSource> source = findSource(s, id);
591
592    source->byeReceived();
593
594    return OK;
595}
596
597status_t ARTPConnection::parseSR(
598        StreamInfo *s, const uint8_t *data, size_t size) {
599    size_t RC = data[0] & 0x1f;
600
601    if (size < (7 + RC * 6) * 4) {
602        // Packet too short for the minimal SR header.
603        return -1;
604    }
605
606    uint32_t id = u32at(&data[4]);
607    uint64_t ntpTime = u64at(&data[8]);
608    uint32_t rtpTime = u32at(&data[16]);
609
610#if 0
611    ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
612         id,
613         rtpTime,
614         (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
615#endif
616
617    sp<ARTPSource> source = findSource(s, id);
618
619    source->timeUpdate(rtpTime, ntpTime);
620
621    return 0;
622}
623
624sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
625    sp<ARTPSource> source;
626    ssize_t index = info->mSources.indexOfKey(srcId);
627    if (index < 0) {
628        index = info->mSources.size();
629
630        source = new ARTPSource(
631                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
632
633        info->mSources.add(srcId, source);
634    } else {
635        source = info->mSources.valueAt(index);
636    }
637
638    return source;
639}
640
641void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
642    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
643    msg->setInt32("index", index);
644    msg->setBuffer("buffer", buffer);
645    msg->post();
646}
647
648void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
649    int32_t index;
650    CHECK(msg->findInt32("index", &index));
651
652    sp<ABuffer> buffer;
653    CHECK(msg->findBuffer("buffer", &buffer));
654
655    List<StreamInfo>::iterator it = mStreams.begin();
656    while (it != mStreams.end()
657           && it->mRTPSocket != index && it->mRTCPSocket != index) {
658        ++it;
659    }
660
661    if (it == mStreams.end()) {
662        TRESPASS();
663    }
664
665    StreamInfo *s = &*it;
666
667    status_t err;
668    if (it->mRTPSocket == index) {
669        err = parseRTP(s, buffer);
670    } else {
671        err = parseRTCP(s, buffer);
672    }
673}
674
675}  // namespace android
676
677