1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTPConnection"
19#include <utils/Log.h>
20
21#include "ARTPConnection.h"
22
23#include "ARTPSource.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/AMessage.h>
29#include <media/stagefright/foundation/AString.h>
30#include <media/stagefright/foundation/hexdump.h>
31
32#include <arpa/inet.h>
33#include <sys/socket.h>
34
35namespace android {
36
37static const size_t kMaxUDPSize = 1500;
38
39static uint16_t u16at(const uint8_t *data) {
40    return data[0] << 8 | data[1];
41}
42
43static uint32_t u32at(const uint8_t *data) {
44    return u16at(data) << 16 | u16at(&data[2]);
45}
46
47static uint64_t u64at(const uint8_t *data) {
48    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
49}
50
51// static
52const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
53
54struct ARTPConnection::StreamInfo {
55    int mRTPSocket;
56    int mRTCPSocket;
57    sp<ASessionDescription> mSessionDesc;
58    size_t mIndex;
59    sp<AMessage> mNotifyMsg;
60    KeyedVector<uint32_t, sp<ARTPSource> > mSources;
61
62    int64_t mNumRTCPPacketsReceived;
63    int64_t mNumRTPPacketsReceived;
64    struct sockaddr_in mRemoteRTCPAddr;
65
66    bool mIsInjected;
67};
68
69ARTPConnection::ARTPConnection(uint32_t flags)
70    : mFlags(flags),
71      mPollEventPending(false),
72      mLastReceiverReportTimeUs(-1) {
73}
74
75ARTPConnection::~ARTPConnection() {
76}
77
78void ARTPConnection::addStream(
79        int rtpSocket, int rtcpSocket,
80        const sp<ASessionDescription> &sessionDesc,
81        size_t index,
82        const sp<AMessage> &notify,
83        bool injected) {
84    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
85    msg->setInt32("rtp-socket", rtpSocket);
86    msg->setInt32("rtcp-socket", rtcpSocket);
87    msg->setObject("session-desc", sessionDesc);
88    msg->setSize("index", index);
89    msg->setMessage("notify", notify);
90    msg->setInt32("injected", injected);
91    msg->post();
92}
93
94void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
95    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
96    msg->setInt32("rtp-socket", rtpSocket);
97    msg->setInt32("rtcp-socket", rtcpSocket);
98    msg->post();
99}
100
101static void bumpSocketBufferSize(int s) {
102    int size = 256 * 1024;
103    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
104}
105
106// static
107void ARTPConnection::MakePortPair(
108        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
109    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
110    CHECK_GE(*rtpSocket, 0);
111
112    bumpSocketBufferSize(*rtpSocket);
113
114    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
115    CHECK_GE(*rtcpSocket, 0);
116
117    bumpSocketBufferSize(*rtcpSocket);
118
119    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
120    start &= ~1;
121
122    for (unsigned port = start; port < 65536; port += 2) {
123        struct sockaddr_in addr;
124        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
125        addr.sin_family = AF_INET;
126        addr.sin_addr.s_addr = htonl(INADDR_ANY);
127        addr.sin_port = htons(port);
128
129        if (bind(*rtpSocket,
130                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
131            continue;
132        }
133
134        addr.sin_port = htons(port + 1);
135
136        if (bind(*rtcpSocket,
137                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
138            *rtpPort = port;
139            return;
140        }
141    }
142
143    TRESPASS();
144}
145
146void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
147    switch (msg->what()) {
148        case kWhatAddStream:
149        {
150            onAddStream(msg);
151            break;
152        }
153
154        case kWhatRemoveStream:
155        {
156            onRemoveStream(msg);
157            break;
158        }
159
160        case kWhatPollStreams:
161        {
162            onPollStreams();
163            break;
164        }
165
166        case kWhatInjectPacket:
167        {
168            onInjectPacket(msg);
169            break;
170        }
171
172        default:
173        {
174            TRESPASS();
175            break;
176        }
177    }
178}
179
180void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
181    mStreams.push_back(StreamInfo());
182    StreamInfo *info = &*--mStreams.end();
183
184    int32_t s;
185    CHECK(msg->findInt32("rtp-socket", &s));
186    info->mRTPSocket = s;
187    CHECK(msg->findInt32("rtcp-socket", &s));
188    info->mRTCPSocket = s;
189
190    int32_t injected;
191    CHECK(msg->findInt32("injected", &injected));
192
193    info->mIsInjected = injected;
194
195    sp<RefBase> obj;
196    CHECK(msg->findObject("session-desc", &obj));
197    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
198
199    CHECK(msg->findSize("index", &info->mIndex));
200    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
201
202    info->mNumRTCPPacketsReceived = 0;
203    info->mNumRTPPacketsReceived = 0;
204    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
205
206    if (!injected) {
207        postPollEvent();
208    }
209}
210
211void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
212    int32_t rtpSocket, rtcpSocket;
213    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
214    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
215
216    List<StreamInfo>::iterator it = mStreams.begin();
217    while (it != mStreams.end()
218           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
219        ++it;
220    }
221
222    if (it == mStreams.end()) {
223        return;
224    }
225
226    mStreams.erase(it);
227}
228
229void ARTPConnection::postPollEvent() {
230    if (mPollEventPending) {
231        return;
232    }
233
234    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
235    msg->post();
236
237    mPollEventPending = true;
238}
239
240void ARTPConnection::onPollStreams() {
241    mPollEventPending = false;
242
243    if (mStreams.empty()) {
244        return;
245    }
246
247    struct timeval tv;
248    tv.tv_sec = 0;
249    tv.tv_usec = kSelectTimeoutUs;
250
251    fd_set rs;
252    FD_ZERO(&rs);
253
254    int maxSocket = -1;
255    for (List<StreamInfo>::iterator it = mStreams.begin();
256         it != mStreams.end(); ++it) {
257        if ((*it).mIsInjected) {
258            continue;
259        }
260
261        FD_SET(it->mRTPSocket, &rs);
262        FD_SET(it->mRTCPSocket, &rs);
263
264        if (it->mRTPSocket > maxSocket) {
265            maxSocket = it->mRTPSocket;
266        }
267        if (it->mRTCPSocket > maxSocket) {
268            maxSocket = it->mRTCPSocket;
269        }
270    }
271
272    if (maxSocket == -1) {
273        return;
274    }
275
276    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
277
278    if (res > 0) {
279        List<StreamInfo>::iterator it = mStreams.begin();
280        while (it != mStreams.end()) {
281            if ((*it).mIsInjected) {
282                ++it;
283                continue;
284            }
285
286            status_t err = OK;
287            if (FD_ISSET(it->mRTPSocket, &rs)) {
288                err = receive(&*it, true);
289            }
290            if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
291                err = receive(&*it, false);
292            }
293
294            if (err == -ECONNRESET) {
295                // socket failure, this stream is dead, Jim.
296
297                LOGW("failed to receive RTP/RTCP datagram.");
298                it = mStreams.erase(it);
299                continue;
300            }
301
302            ++it;
303        }
304    }
305
306    int64_t nowUs = ALooper::GetNowUs();
307    if (mLastReceiverReportTimeUs <= 0
308            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
309        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
310        List<StreamInfo>::iterator it = mStreams.begin();
311        while (it != mStreams.end()) {
312            StreamInfo *s = &*it;
313
314            if (s->mIsInjected) {
315                ++it;
316                continue;
317            }
318
319            if (s->mNumRTCPPacketsReceived == 0) {
320                // We have never received any RTCP packets on this stream,
321                // we don't even know where to send a report.
322                ++it;
323                continue;
324            }
325
326            buffer->setRange(0, 0);
327
328            for (size_t i = 0; i < s->mSources.size(); ++i) {
329                sp<ARTPSource> source = s->mSources.valueAt(i);
330
331                source->addReceiverReport(buffer);
332
333                if (mFlags & kRegularlyRequestFIR) {
334                    source->addFIR(buffer);
335                }
336            }
337
338            if (buffer->size() > 0) {
339                LOGV("Sending RR...");
340
341                ssize_t n;
342                do {
343                    n = sendto(
344                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
345                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
346                        sizeof(s->mRemoteRTCPAddr));
347                } while (n < 0 && errno == EINTR);
348
349                if (n <= 0) {
350                    LOGW("failed to send RTCP receiver report (%s).",
351                         n == 0 ? "connection gone" : strerror(errno));
352
353                    it = mStreams.erase(it);
354                    continue;
355                }
356
357                CHECK_EQ(n, (ssize_t)buffer->size());
358
359                mLastReceiverReportTimeUs = nowUs;
360            }
361
362            ++it;
363        }
364    }
365
366    if (!mStreams.empty()) {
367        postPollEvent();
368    }
369}
370
371status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
372    LOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
373
374    CHECK(!s->mIsInjected);
375
376    sp<ABuffer> buffer = new ABuffer(65536);
377
378    socklen_t remoteAddrLen =
379        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
380            ? sizeof(s->mRemoteRTCPAddr) : 0;
381
382    ssize_t nbytes;
383    do {
384        nbytes = recvfrom(
385            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
386            buffer->data(),
387            buffer->capacity(),
388            0,
389            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
390            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
391    } while (nbytes < 0 && errno == EINTR);
392
393    if (nbytes <= 0) {
394        return -ECONNRESET;
395    }
396
397    buffer->setRange(0, nbytes);
398
399    // LOGI("received %d bytes.", buffer->size());
400
401    status_t err;
402    if (receiveRTP) {
403        err = parseRTP(s, buffer);
404    } else {
405        err = parseRTCP(s, buffer);
406    }
407
408    return err;
409}
410
411status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
412    if (s->mNumRTPPacketsReceived++ == 0) {
413        sp<AMessage> notify = s->mNotifyMsg->dup();
414        notify->setInt32("first-rtp", true);
415        notify->post();
416    }
417
418    size_t size = buffer->size();
419
420    if (size < 12) {
421        // Too short to be a valid RTP header.
422        return -1;
423    }
424
425    const uint8_t *data = buffer->data();
426
427    if ((data[0] >> 6) != 2) {
428        // Unsupported version.
429        return -1;
430    }
431
432    if (data[0] & 0x20) {
433        // Padding present.
434
435        size_t paddingLength = data[size - 1];
436
437        if (paddingLength + 12 > size) {
438            // If we removed this much padding we'd end up with something
439            // that's too short to be a valid RTP header.
440            return -1;
441        }
442
443        size -= paddingLength;
444    }
445
446    int numCSRCs = data[0] & 0x0f;
447
448    size_t payloadOffset = 12 + 4 * numCSRCs;
449
450    if (size < payloadOffset) {
451        // Not enough data to fit the basic header and all the CSRC entries.
452        return -1;
453    }
454
455    if (data[0] & 0x10) {
456        // Header eXtension present.
457
458        if (size < payloadOffset + 4) {
459            // Not enough data to fit the basic header, all CSRC entries
460            // and the first 4 bytes of the extension header.
461
462            return -1;
463        }
464
465        const uint8_t *extensionData = &data[payloadOffset];
466
467        size_t extensionLength =
468            4 * (extensionData[2] << 8 | extensionData[3]);
469
470        if (size < payloadOffset + 4 + extensionLength) {
471            return -1;
472        }
473
474        payloadOffset += 4 + extensionLength;
475    }
476
477    uint32_t srcId = u32at(&data[8]);
478
479    sp<ARTPSource> source = findSource(s, srcId);
480
481    uint32_t rtpTime = u32at(&data[4]);
482
483    sp<AMessage> meta = buffer->meta();
484    meta->setInt32("ssrc", srcId);
485    meta->setInt32("rtp-time", rtpTime);
486    meta->setInt32("PT", data[1] & 0x7f);
487    meta->setInt32("M", data[1] >> 7);
488
489    buffer->setInt32Data(u16at(&data[2]));
490    buffer->setRange(payloadOffset, size - payloadOffset);
491
492    source->processRTPPacket(buffer);
493
494    return OK;
495}
496
497status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
498    if (s->mNumRTCPPacketsReceived++ == 0) {
499        sp<AMessage> notify = s->mNotifyMsg->dup();
500        notify->setInt32("first-rtcp", true);
501        notify->post();
502    }
503
504    const uint8_t *data = buffer->data();
505    size_t size = buffer->size();
506
507    while (size > 0) {
508        if (size < 8) {
509            // Too short to be a valid RTCP header
510            return -1;
511        }
512
513        if ((data[0] >> 6) != 2) {
514            // Unsupported version.
515            return -1;
516        }
517
518        if (data[0] & 0x20) {
519            // Padding present.
520
521            size_t paddingLength = data[size - 1];
522
523            if (paddingLength + 12 > size) {
524                // If we removed this much padding we'd end up with something
525                // that's too short to be a valid RTP header.
526                return -1;
527            }
528
529            size -= paddingLength;
530        }
531
532        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
533
534        if (size < headerLength) {
535            // Only received a partial packet?
536            return -1;
537        }
538
539        switch (data[1]) {
540            case 200:
541            {
542                parseSR(s, data, headerLength);
543                break;
544            }
545
546            case 201:  // RR
547            case 202:  // SDES
548            case 204:  // APP
549                break;
550
551            case 205:  // TSFB (transport layer specific feedback)
552            case 206:  // PSFB (payload specific feedback)
553                // hexdump(data, headerLength);
554                break;
555
556            case 203:
557            {
558                parseBYE(s, data, headerLength);
559                break;
560            }
561
562            default:
563            {
564                LOGW("Unknown RTCP packet type %u of size %d",
565                     (unsigned)data[1], headerLength);
566                break;
567            }
568        }
569
570        data += headerLength;
571        size -= headerLength;
572    }
573
574    return OK;
575}
576
577status_t ARTPConnection::parseBYE(
578        StreamInfo *s, const uint8_t *data, size_t size) {
579    size_t SC = data[0] & 0x3f;
580
581    if (SC == 0 || size < (4 + SC * 4)) {
582        // Packet too short for the minimal BYE header.
583        return -1;
584    }
585
586    uint32_t id = u32at(&data[4]);
587
588    sp<ARTPSource> source = findSource(s, id);
589
590    source->byeReceived();
591
592    return OK;
593}
594
595status_t ARTPConnection::parseSR(
596        StreamInfo *s, const uint8_t *data, size_t size) {
597    size_t RC = data[0] & 0x1f;
598
599    if (size < (7 + RC * 6) * 4) {
600        // Packet too short for the minimal SR header.
601        return -1;
602    }
603
604    uint32_t id = u32at(&data[4]);
605    uint64_t ntpTime = u64at(&data[8]);
606    uint32_t rtpTime = u32at(&data[16]);
607
608#if 0
609    LOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
610         id,
611         rtpTime,
612         (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
613#endif
614
615    sp<ARTPSource> source = findSource(s, id);
616
617    source->timeUpdate(rtpTime, ntpTime);
618
619    return 0;
620}
621
622sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
623    sp<ARTPSource> source;
624    ssize_t index = info->mSources.indexOfKey(srcId);
625    if (index < 0) {
626        index = info->mSources.size();
627
628        source = new ARTPSource(
629                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
630
631        info->mSources.add(srcId, source);
632    } else {
633        source = info->mSources.valueAt(index);
634    }
635
636    return source;
637}
638
639void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
640    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
641    msg->setInt32("index", index);
642    msg->setObject("buffer", buffer);
643    msg->post();
644}
645
646void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
647    int32_t index;
648    CHECK(msg->findInt32("index", &index));
649
650    sp<RefBase> obj;
651    CHECK(msg->findObject("buffer", &obj));
652
653    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
654
655    List<StreamInfo>::iterator it = mStreams.begin();
656    while (it != mStreams.end()
657           && it->mRTPSocket != index && it->mRTCPSocket != index) {
658        ++it;
659    }
660
661    if (it == mStreams.end()) {
662        TRESPASS();
663    }
664
665    StreamInfo *s = &*it;
666
667    status_t err;
668    if (it->mRTPSocket == index) {
669        err = parseRTP(s, buffer);
670    } else {
671        err = parseRTCP(s, buffer);
672    }
673}
674
675}  // namespace android
676
677