1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTPConnection"
19#include <utils/Log.h>
20
21#include "ARTPAssembler.h"
22#include "ARTPConnection.h"
23
24#include "ARTPSource.h"
25#include "ASessionDescription.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/foundation/AString.h>
31#include <media/stagefright/foundation/hexdump.h>
32
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
35
36namespace android {
37
// Capacity, in bytes, of the buffer that onPollStreams() allocates for
// outgoing RTCP receiver reports.
static const size_t kMaxUDPSize = 1500;
39
// Reads a 16-bit big-endian (network byte order) value from data[0..1].
static uint16_t u16at(const uint8_t *data) {
    const unsigned high = data[0];
    const unsigned low = data[1];

    return static_cast<uint16_t>((high << 8) | low);
}
43
// Reads a 32-bit big-endian (network byte order) value from data[0..3].
//
// The bytes are widened to uint32_t before shifting so that a top byte of
// 0x80 or above is never shifted into the sign bit of a promoted int.
static uint32_t u32at(const uint8_t *data) {
    return (static_cast<uint32_t>(data[0]) << 24)
            | (static_cast<uint32_t>(data[1]) << 16)
            | (static_cast<uint32_t>(data[2]) << 8)
            | static_cast<uint32_t>(data[3]);
}
47
48static uint64_t u64at(const uint8_t *data) {
49    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50}
51
// Timeout, in microseconds, that onPollStreams() hands to select() while
// waiting for incoming datagrams.
// static
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54
// Per-stream bookkeeping; one entry is appended to mStreams by onAddStream().
struct ARTPConnection::StreamInfo {
    // Values supplied by addStream(). For non-injected streams they are used
    // as socket descriptors (select/recvfrom/sendto); onInjectPacket() matches
    // its "index" against them to locate the stream.
    int mRTPSocket;
    int mRTCPSocket;
    // Session description and track index; forwarded to every ARTPSource
    // created for this stream by findSource().
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    // Duplicated to post "first-rtp"/"first-rtcp" notifications and also
    // passed to each ARTPSource.
    sp<AMessage> mNotifyMsg;
    // Sources seen on this stream, keyed by the source id parsed from packets.
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Packet counters; incremented at the top of parseRTCP()/parseRTP().
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Filled in by recvfrom() when the first RTCP packet is received; used as
    // the destination for receiver reports.
    struct sockaddr_in mRemoteRTCPAddr;

    // True when packets are delivered through injectPacket(); such streams
    // are skipped by the socket polling in onPollStreams().
    bool mIsInjected;
};
69
// Stores the behaviour flags (onPollStreams() tests kRegularlyRequestFIR
// against them) and starts with no poll event pending. The last receiver
// report time starts at -1; onPollStreams() treats any value <= 0 as
// "report due now".
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1) {
}
75
// Empty body: no explicit teardown is performed here.
ARTPConnection::~ARTPConnection() {
}
78
79void ARTPConnection::addStream(
80        int rtpSocket, int rtcpSocket,
81        const sp<ASessionDescription> &sessionDesc,
82        size_t index,
83        const sp<AMessage> &notify,
84        bool injected) {
85    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
86    msg->setInt32("rtp-socket", rtpSocket);
87    msg->setInt32("rtcp-socket", rtcpSocket);
88    msg->setObject("session-desc", sessionDesc);
89    msg->setSize("index", index);
90    msg->setMessage("notify", notify);
91    msg->setInt32("injected", injected);
92    msg->post();
93}
94
95void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
97    msg->setInt32("rtp-socket", rtpSocket);
98    msg->setInt32("rtcp-socket", rtcpSocket);
99    msg->post();
100}
101
// Requests a 256 KiB receive buffer (SO_RCVBUF) on socket `s`.
// CHECK_EQ requires setsockopt() to return 0.
static void bumpSocketBufferSize(int s) {
    int size = 256 * 1024;
    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}
106
107// static
108void ARTPConnection::MakePortPair(
109        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111    CHECK_GE(*rtpSocket, 0);
112
113    bumpSocketBufferSize(*rtpSocket);
114
115    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116    CHECK_GE(*rtcpSocket, 0);
117
118    bumpSocketBufferSize(*rtcpSocket);
119
120    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
121    start &= ~1;
122
123    for (unsigned port = start; port < 65536; port += 2) {
124        struct sockaddr_in addr;
125        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
126        addr.sin_family = AF_INET;
127        addr.sin_addr.s_addr = htonl(INADDR_ANY);
128        addr.sin_port = htons(port);
129
130        if (bind(*rtpSocket,
131                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
132            continue;
133        }
134
135        addr.sin_port = htons(port + 1);
136
137        if (bind(*rtcpSocket,
138                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
139            *rtpPort = port;
140            return;
141        }
142    }
143
144    TRESPASS();
145}
146
147void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
148    switch (msg->what()) {
149        case kWhatAddStream:
150        {
151            onAddStream(msg);
152            break;
153        }
154
155        case kWhatRemoveStream:
156        {
157            onRemoveStream(msg);
158            break;
159        }
160
161        case kWhatPollStreams:
162        {
163            onPollStreams();
164            break;
165        }
166
167        case kWhatInjectPacket:
168        {
169            onInjectPacket(msg);
170            break;
171        }
172
173        default:
174        {
175            TRESPASS();
176            break;
177        }
178    }
179}
180
181void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
182    mStreams.push_back(StreamInfo());
183    StreamInfo *info = &*--mStreams.end();
184
185    int32_t s;
186    CHECK(msg->findInt32("rtp-socket", &s));
187    info->mRTPSocket = s;
188    CHECK(msg->findInt32("rtcp-socket", &s));
189    info->mRTCPSocket = s;
190
191    int32_t injected;
192    CHECK(msg->findInt32("injected", &injected));
193
194    info->mIsInjected = injected;
195
196    sp<RefBase> obj;
197    CHECK(msg->findObject("session-desc", &obj));
198    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
199
200    CHECK(msg->findSize("index", &info->mIndex));
201    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
202
203    info->mNumRTCPPacketsReceived = 0;
204    info->mNumRTPPacketsReceived = 0;
205    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
206
207    if (!injected) {
208        postPollEvent();
209    }
210}
211
212void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
213    int32_t rtpSocket, rtcpSocket;
214    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
215    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
216
217    List<StreamInfo>::iterator it = mStreams.begin();
218    while (it != mStreams.end()
219           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
220        ++it;
221    }
222
223    if (it == mStreams.end()) {
224        return;
225    }
226
227    mStreams.erase(it);
228}
229
230void ARTPConnection::postPollEvent() {
231    if (mPollEventPending) {
232        return;
233    }
234
235    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
236    msg->post();
237
238    mPollEventPending = true;
239}
240
// Handles kWhatPollStreams. In order:
//   1. waits up to kSelectTimeoutUs for readable RTP/RTCP sockets on all
//      non-injected streams and hands ready ones to receive();
//   2. if a receiver report is due, builds and sends one per eligible stream;
//   3. re-posts the poll event while any stream remains.
// Streams whose socket receive or report send fails are erased from mStreams.
void ARTPConnection::onPollStreams() {
    mPollEventPending = false;

    // Nothing to poll; note that the poll event is not re-posted here.
    if (mStreams.empty()) {
        return;
    }

    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = kSelectTimeoutUs;

    fd_set rs;
    FD_ZERO(&rs);

    // Collect both sockets of every non-injected stream into the read set
    // and track the highest descriptor for select().
    int maxSocket = -1;
    for (List<StreamInfo>::iterator it = mStreams.begin();
         it != mStreams.end(); ++it) {
        if ((*it).mIsInjected) {
            continue;
        }

        FD_SET(it->mRTPSocket, &rs);
        FD_SET(it->mRTCPSocket, &rs);

        if (it->mRTPSocket > maxSocket) {
            maxSocket = it->mRTPSocket;
        }
        if (it->mRTCPSocket > maxSocket) {
            maxSocket = it->mRTCPSocket;
        }
    }

    // Only injected streams are registered: nothing to select on, and the
    // poll event is not re-posted.
    if (maxSocket == -1) {
        return;
    }

    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);

    // res <= 0 (timeout or error) simply skips the receive pass.
    if (res > 0) {
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            if ((*it).mIsInjected) {
                ++it;
                continue;
            }

            // RTP is read first; RTCP is only read if the RTP step (if any)
            // returned OK.
            status_t err = OK;
            if (FD_ISSET(it->mRTPSocket, &rs)) {
                err = receive(&*it, true);
            }
            if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
                err = receive(&*it, false);
            }

            // Only -ECONNRESET (returned by receive() when recvfrom() yields
            // no data) removes the stream; other errors are ignored here.
            if (err == -ECONNRESET) {
                // socket failure, this stream is dead, Jim.

                ALOGW("failed to receive RTP/RTCP datagram.");
                // erase() returns the next element, so do not advance again.
                it = mStreams.erase(it);
                continue;
            }

            ++it;
        }
    }

    // A receiver report is due if none has been sent yet (time <= 0) or at
    // least 5 seconds (5000000 us) have passed since the last one.
    int64_t nowUs = ALooper::GetNowUs();
    if (mLastReceiverReportTimeUs <= 0
            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
        // One scratch buffer is reused for every stream's report.
        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            StreamInfo *s = &*it;

            if (s->mIsInjected) {
                ++it;
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
                ++it;
                continue;
            }

            // Start with an empty payload for this stream.
            buffer->setRange(0, 0);

            // Let every known source append its report (and, if requested
            // via mFlags, a FIR) to the buffer.
            for (size_t i = 0; i < s->mSources.size(); ++i) {
                sp<ARTPSource> source = s->mSources.valueAt(i);

                source->addReceiverReport(buffer);

                if (mFlags & kRegularlyRequestFIR) {
                    source->addFIR(buffer);
                }
            }

            if (buffer->size() > 0) {
                ALOGV("Sending RR...");

                // Retry the send if it is interrupted by a signal.
                ssize_t n;
                do {
                    n = sendto(
                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
                        sizeof(s->mRemoteRTCPAddr));
                } while (n < 0 && errno == EINTR);

                if (n <= 0) {
                    ALOGW("failed to send RTCP receiver report (%s).",
                         n == 0 ? "connection gone" : strerror(errno));

                    // Drop the stream; erase() already yields the next one.
                    it = mStreams.erase(it);
                    continue;
                }

                // The whole report must have gone out in one datagram.
                CHECK_EQ(n, (ssize_t)buffer->size());

                // The timestamp is shared by all streams.
                mLastReceiverReportTimeUs = nowUs;
            }

            ++it;
        }
    }

    // Keep polling for as long as at least one stream is registered.
    if (!mStreams.empty()) {
        postPollEvent();
    }
}
371
372status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
373    ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
374
375    CHECK(!s->mIsInjected);
376
377    sp<ABuffer> buffer = new ABuffer(65536);
378
379    socklen_t remoteAddrLen =
380        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
381            ? sizeof(s->mRemoteRTCPAddr) : 0;
382
383    ssize_t nbytes;
384    do {
385        nbytes = recvfrom(
386            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
387            buffer->data(),
388            buffer->capacity(),
389            0,
390            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
391            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
392    } while (nbytes < 0 && errno == EINTR);
393
394    if (nbytes <= 0) {
395        return -ECONNRESET;
396    }
397
398    buffer->setRange(0, nbytes);
399
400    // ALOGI("received %d bytes.", buffer->size());
401
402    status_t err;
403    if (receiveRTP) {
404        err = parseRTP(s, buffer);
405    } else {
406        err = parseRTCP(s, buffer);
407    }
408
409    return err;
410}
411
412status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
413    if (s->mNumRTPPacketsReceived++ == 0) {
414        sp<AMessage> notify = s->mNotifyMsg->dup();
415        notify->setInt32("first-rtp", true);
416        notify->post();
417    }
418
419    size_t size = buffer->size();
420
421    if (size < 12) {
422        // Too short to be a valid RTP header.
423        return -1;
424    }
425
426    const uint8_t *data = buffer->data();
427
428    if ((data[0] >> 6) != 2) {
429        // Unsupported version.
430        return -1;
431    }
432
433    if (data[0] & 0x20) {
434        // Padding present.
435
436        size_t paddingLength = data[size - 1];
437
438        if (paddingLength + 12 > size) {
439            // If we removed this much padding we'd end up with something
440            // that's too short to be a valid RTP header.
441            return -1;
442        }
443
444        size -= paddingLength;
445    }
446
447    int numCSRCs = data[0] & 0x0f;
448
449    size_t payloadOffset = 12 + 4 * numCSRCs;
450
451    if (size < payloadOffset) {
452        // Not enough data to fit the basic header and all the CSRC entries.
453        return -1;
454    }
455
456    if (data[0] & 0x10) {
457        // Header eXtension present.
458
459        if (size < payloadOffset + 4) {
460            // Not enough data to fit the basic header, all CSRC entries
461            // and the first 4 bytes of the extension header.
462
463            return -1;
464        }
465
466        const uint8_t *extensionData = &data[payloadOffset];
467
468        size_t extensionLength =
469            4 * (extensionData[2] << 8 | extensionData[3]);
470
471        if (size < payloadOffset + 4 + extensionLength) {
472            return -1;
473        }
474
475        payloadOffset += 4 + extensionLength;
476    }
477
478    uint32_t srcId = u32at(&data[8]);
479
480    sp<ARTPSource> source = findSource(s, srcId);
481
482    uint32_t rtpTime = u32at(&data[4]);
483
484    sp<AMessage> meta = buffer->meta();
485    meta->setInt32("ssrc", srcId);
486    meta->setInt32("rtp-time", rtpTime);
487    meta->setInt32("PT", data[1] & 0x7f);
488    meta->setInt32("M", data[1] >> 7);
489
490    buffer->setInt32Data(u16at(&data[2]));
491    buffer->setRange(payloadOffset, size - payloadOffset);
492
493    source->processRTPPacket(buffer);
494
495    return OK;
496}
497
498status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
499    if (s->mNumRTCPPacketsReceived++ == 0) {
500        sp<AMessage> notify = s->mNotifyMsg->dup();
501        notify->setInt32("first-rtcp", true);
502        notify->post();
503    }
504
505    const uint8_t *data = buffer->data();
506    size_t size = buffer->size();
507
508    while (size > 0) {
509        if (size < 8) {
510            // Too short to be a valid RTCP header
511            return -1;
512        }
513
514        if ((data[0] >> 6) != 2) {
515            // Unsupported version.
516            return -1;
517        }
518
519        if (data[0] & 0x20) {
520            // Padding present.
521
522            size_t paddingLength = data[size - 1];
523
524            if (paddingLength + 12 > size) {
525                // If we removed this much padding we'd end up with something
526                // that's too short to be a valid RTP header.
527                return -1;
528            }
529
530            size -= paddingLength;
531        }
532
533        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
534
535        if (size < headerLength) {
536            // Only received a partial packet?
537            return -1;
538        }
539
540        switch (data[1]) {
541            case 200:
542            {
543                parseSR(s, data, headerLength);
544                break;
545            }
546
547            case 201:  // RR
548            case 202:  // SDES
549            case 204:  // APP
550                break;
551
552            case 205:  // TSFB (transport layer specific feedback)
553            case 206:  // PSFB (payload specific feedback)
554                // hexdump(data, headerLength);
555                break;
556
557            case 203:
558            {
559                parseBYE(s, data, headerLength);
560                break;
561            }
562
563            default:
564            {
565                ALOGW("Unknown RTCP packet type %u of size %d",
566                     (unsigned)data[1], headerLength);
567                break;
568            }
569        }
570
571        data += headerLength;
572        size -= headerLength;
573    }
574
575    return OK;
576}
577
578status_t ARTPConnection::parseBYE(
579        StreamInfo *s, const uint8_t *data, size_t size) {
580    size_t SC = data[0] & 0x3f;
581
582    if (SC == 0 || size < (4 + SC * 4)) {
583        // Packet too short for the minimal BYE header.
584        return -1;
585    }
586
587    uint32_t id = u32at(&data[4]);
588
589    sp<ARTPSource> source = findSource(s, id);
590
591    source->byeReceived();
592
593    return OK;
594}
595
596status_t ARTPConnection::parseSR(
597        StreamInfo *s, const uint8_t *data, size_t size) {
598    size_t RC = data[0] & 0x1f;
599
600    if (size < (7 + RC * 6) * 4) {
601        // Packet too short for the minimal SR header.
602        return -1;
603    }
604
605    uint32_t id = u32at(&data[4]);
606    uint64_t ntpTime = u64at(&data[8]);
607    uint32_t rtpTime = u32at(&data[16]);
608
609#if 0
610    ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
611         id,
612         rtpTime,
613         (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
614#endif
615
616    sp<ARTPSource> source = findSource(s, id);
617
618    source->timeUpdate(rtpTime, ntpTime);
619
620    return 0;
621}
622
623sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
624    sp<ARTPSource> source;
625    ssize_t index = info->mSources.indexOfKey(srcId);
626    if (index < 0) {
627        index = info->mSources.size();
628
629        source = new ARTPSource(
630                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
631
632        info->mSources.add(srcId, source);
633    } else {
634        source = info->mSources.valueAt(index);
635    }
636
637    return source;
638}
639
640void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
641    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
642    msg->setInt32("index", index);
643    msg->setBuffer("buffer", buffer);
644    msg->post();
645}
646
647void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
648    int32_t index;
649    CHECK(msg->findInt32("index", &index));
650
651    sp<ABuffer> buffer;
652    CHECK(msg->findBuffer("buffer", &buffer));
653
654    List<StreamInfo>::iterator it = mStreams.begin();
655    while (it != mStreams.end()
656           && it->mRTPSocket != index && it->mRTCPSocket != index) {
657        ++it;
658    }
659
660    if (it == mStreams.end()) {
661        TRESPASS();
662    }
663
664    StreamInfo *s = &*it;
665
666    status_t err;
667    if (it->mRTPSocket == index) {
668        err = parseRTP(s, buffer);
669    } else {
670        err = parseRTCP(s, buffer);
671    }
672}
673
674}  // namespace android
675
676