ARTPConnection.cpp revision 3a48d4d7269a37308eee4affd021adfcab7629a1
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "ARTPConnection.h"

#include "ARTPSource.h"
#include "ASessionDescription.h"

#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/AString.h>
#include <media/stagefright/foundation/hexdump.h>

#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
30
31namespace android {
32
// Capacity, in bytes, of the scratch buffer in which outgoing RTCP
// reports are assembled (see onPollStreams()).
static const size_t kMaxUDPSize = 1500;
34
// Reads a 16-bit big-endian (network byte order) value from data[0..1].
static uint16_t u16at(const uint8_t *data) {
    const int highByte = data[0] << 8;
    const int lowByte = data[1];

    return highByte | lowByte;
}
38
// Reads a 32-bit big-endian (network byte order) value from data[0..3].
//
// Every byte is widened to uint32_t before it is shifted so that a set top
// bit in data[0] never shifts a (promoted) signed int into its sign bit.
static uint32_t u32at(const uint8_t *data) {
    return (static_cast<uint32_t>(data[0]) << 24)
            | (static_cast<uint32_t>(data[1]) << 16)
            | (static_cast<uint32_t>(data[2]) << 8)
            | static_cast<uint32_t>(data[3]);
}
42
43static uint64_t u64at(const uint8_t *data) {
44    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
45}
46
// static
// Timeout, in microseconds, handed to select() each time the stream sockets
// are polled in onPollStreams().
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
49
// Per-stream bookkeeping; one entry is appended to mStreams for every
// addStream() request.
struct ARTPConnection::StreamInfo {
    // Values supplied to addStream(). For non-injected streams these are the
    // descriptors passed to select()/recvfrom()/sendto(); for injected
    // streams they are only compared against the index given to
    // injectPacket().
    int mRTPSocket;
    int mRTCPSocket;

    // Session description and index handed to every ARTPSource created for
    // this stream.
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;

    // Handed to each ARTPSource; a copy with "first-rtcp" set is posted when
    // the first RTCP buffer of this stream is parsed.
    sp<AMessage> mNotifyMsg;

    // Sources seen on this stream, keyed by the source id read from the
    // packets.
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Number of RTCP buffers handed to parseRTCP() so far.
    int32_t mNumRTCPPacketsReceived;

    // Sender address captured by recvfrom() for the first RTCP datagram;
    // receiver reports are sent to this address.
    struct sockaddr_in mRemoteRTCPAddr;

    // True if packets arrive through injectPacket() instead of the sockets;
    // such streams are skipped when polling and sending reports.
    bool mIsInjected;
};
63
64ARTPConnection::ARTPConnection(uint32_t flags)
65    : mFlags(flags),
66      mPollEventPending(false),
67      mLastReceiverReportTimeUs(-1) {
68}
69
70ARTPConnection::~ARTPConnection() {
71}
72
73void ARTPConnection::addStream(
74        int rtpSocket, int rtcpSocket,
75        const sp<ASessionDescription> &sessionDesc,
76        size_t index,
77        const sp<AMessage> &notify,
78        bool injected) {
79    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
80    msg->setInt32("rtp-socket", rtpSocket);
81    msg->setInt32("rtcp-socket", rtcpSocket);
82    msg->setObject("session-desc", sessionDesc);
83    msg->setSize("index", index);
84    msg->setMessage("notify", notify);
85    msg->setInt32("injected", injected);
86    msg->post();
87}
88
89void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
90    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
91    msg->setInt32("rtp-socket", rtpSocket);
92    msg->setInt32("rtcp-socket", rtcpSocket);
93    msg->post();
94}
95
96static void bumpSocketBufferSize(int s) {
97    int size = 256 * 1024;
98    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
99}
100
101// static
102void ARTPConnection::MakePortPair(
103        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
104    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
105    CHECK_GE(*rtpSocket, 0);
106
107    bumpSocketBufferSize(*rtpSocket);
108
109    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
110    CHECK_GE(*rtcpSocket, 0);
111
112    bumpSocketBufferSize(*rtcpSocket);
113
114    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
115    start &= ~1;
116
117    for (unsigned port = start; port < 65536; port += 2) {
118        struct sockaddr_in addr;
119        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
120        addr.sin_family = AF_INET;
121        addr.sin_addr.s_addr = INADDR_ANY;
122        addr.sin_port = htons(port);
123
124        if (bind(*rtpSocket,
125                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
126            continue;
127        }
128
129        addr.sin_port = htons(port + 1);
130
131        if (bind(*rtcpSocket,
132                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
133            *rtpPort = port;
134            return;
135        }
136    }
137
138    TRESPASS();
139}
140
141void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
142    switch (msg->what()) {
143        case kWhatAddStream:
144        {
145            onAddStream(msg);
146            break;
147        }
148
149        case kWhatRemoveStream:
150        {
151            onRemoveStream(msg);
152            break;
153        }
154
155        case kWhatPollStreams:
156        {
157            onPollStreams();
158            break;
159        }
160
161        case kWhatInjectPacket:
162        {
163            onInjectPacket(msg);
164            break;
165        }
166
167        default:
168        {
169            TRESPASS();
170            break;
171        }
172    }
173}
174
175void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
176    mStreams.push_back(StreamInfo());
177    StreamInfo *info = &*--mStreams.end();
178
179    int32_t s;
180    CHECK(msg->findInt32("rtp-socket", &s));
181    info->mRTPSocket = s;
182    CHECK(msg->findInt32("rtcp-socket", &s));
183    info->mRTCPSocket = s;
184
185    int32_t injected;
186    CHECK(msg->findInt32("injected", &injected));
187
188    info->mIsInjected = injected;
189
190    sp<RefBase> obj;
191    CHECK(msg->findObject("session-desc", &obj));
192    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
193
194    CHECK(msg->findSize("index", &info->mIndex));
195    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
196
197    info->mNumRTCPPacketsReceived = 0;
198    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
199
200    if (!injected) {
201        postPollEvent();
202    }
203}
204
205void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
206    int32_t rtpSocket, rtcpSocket;
207    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
208    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
209
210    List<StreamInfo>::iterator it = mStreams.begin();
211    while (it != mStreams.end()
212           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
213        ++it;
214    }
215
216    if (it == mStreams.end()) {
217        TRESPASS();
218    }
219
220    mStreams.erase(it);
221}
222
223void ARTPConnection::postPollEvent() {
224    if (mPollEventPending) {
225        return;
226    }
227
228    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
229    msg->post();
230
231    mPollEventPending = true;
232}
233
234void ARTPConnection::onPollStreams() {
235    mPollEventPending = false;
236
237    if (mStreams.empty()) {
238        return;
239    }
240
241    struct timeval tv;
242    tv.tv_sec = 0;
243    tv.tv_usec = kSelectTimeoutUs;
244
245    fd_set rs;
246    FD_ZERO(&rs);
247
248    int maxSocket = -1;
249    for (List<StreamInfo>::iterator it = mStreams.begin();
250         it != mStreams.end(); ++it) {
251        if ((*it).mIsInjected) {
252            continue;
253        }
254
255        FD_SET(it->mRTPSocket, &rs);
256        FD_SET(it->mRTCPSocket, &rs);
257
258        if (it->mRTPSocket > maxSocket) {
259            maxSocket = it->mRTPSocket;
260        }
261        if (it->mRTCPSocket > maxSocket) {
262            maxSocket = it->mRTCPSocket;
263        }
264    }
265
266    if (maxSocket == -1) {
267        return;
268    }
269
270    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
271    CHECK_GE(res, 0);
272
273    if (res > 0) {
274        for (List<StreamInfo>::iterator it = mStreams.begin();
275             it != mStreams.end(); ++it) {
276            if ((*it).mIsInjected) {
277                continue;
278            }
279
280            if (FD_ISSET(it->mRTPSocket, &rs)) {
281                receive(&*it, true);
282            }
283            if (FD_ISSET(it->mRTCPSocket, &rs)) {
284                receive(&*it, false);
285            }
286        }
287    }
288
289    postPollEvent();
290
291    int64_t nowUs = ALooper::GetNowUs();
292    if (mLastReceiverReportTimeUs <= 0
293            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
294        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
295        for (List<StreamInfo>::iterator it = mStreams.begin();
296             it != mStreams.end(); ++it) {
297            StreamInfo *s = &*it;
298
299            if (s->mIsInjected) {
300                continue;
301            }
302
303            if (s->mNumRTCPPacketsReceived == 0) {
304                // We have never received any RTCP packets on this stream,
305                // we don't even know where to send a report.
306                continue;
307            }
308
309            buffer->setRange(0, 0);
310
311            for (size_t i = 0; i < s->mSources.size(); ++i) {
312                sp<ARTPSource> source = s->mSources.valueAt(i);
313
314                source->addReceiverReport(buffer);
315
316                if (mFlags & kRegularlyRequestFIR) {
317                    source->addFIR(buffer);
318                }
319            }
320
321            if (buffer->size() > 0) {
322                LOG(VERBOSE) << "Sending RR...";
323
324                ssize_t n = sendto(
325                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
326                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
327                        sizeof(s->mRemoteRTCPAddr));
328                CHECK_EQ(n, (ssize_t)buffer->size());
329
330                mLastReceiverReportTimeUs = nowUs;
331            }
332        }
333    }
334}
335
336status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
337    CHECK(!s->mIsInjected);
338
339    sp<ABuffer> buffer = new ABuffer(65536);
340
341    socklen_t remoteAddrLen =
342        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
343            ? sizeof(s->mRemoteRTCPAddr) : 0;
344
345    ssize_t nbytes = recvfrom(
346            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
347            buffer->data(),
348            buffer->capacity(),
349            0,
350            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
351            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
352
353    if (nbytes < 0) {
354        return -1;
355    }
356
357    buffer->setRange(0, nbytes);
358
359    // LOG(INFO) << "received " << buffer->size() << " bytes.";
360
361    status_t err;
362    if (receiveRTP) {
363        err = parseRTP(s, buffer);
364    } else {
365        err = parseRTCP(s, buffer);
366    }
367
368    return err;
369}
370
371status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
372    size_t size = buffer->size();
373
374    if (size < 12) {
375        // Too short to be a valid RTP header.
376        return -1;
377    }
378
379    const uint8_t *data = buffer->data();
380
381    if ((data[0] >> 6) != 2) {
382        // Unsupported version.
383        return -1;
384    }
385
386    if (data[0] & 0x20) {
387        // Padding present.
388
389        size_t paddingLength = data[size - 1];
390
391        if (paddingLength + 12 > size) {
392            // If we removed this much padding we'd end up with something
393            // that's too short to be a valid RTP header.
394            return -1;
395        }
396
397        size -= paddingLength;
398    }
399
400    int numCSRCs = data[0] & 0x0f;
401
402    size_t payloadOffset = 12 + 4 * numCSRCs;
403
404    if (size < payloadOffset) {
405        // Not enough data to fit the basic header and all the CSRC entries.
406        return -1;
407    }
408
409    if (data[0] & 0x10) {
410        // Header eXtension present.
411
412        if (size < payloadOffset + 4) {
413            // Not enough data to fit the basic header, all CSRC entries
414            // and the first 4 bytes of the extension header.
415
416            return -1;
417        }
418
419        const uint8_t *extensionData = &data[payloadOffset];
420
421        size_t extensionLength =
422            4 * (extensionData[2] << 8 | extensionData[3]);
423
424        if (size < payloadOffset + 4 + extensionLength) {
425            return -1;
426        }
427
428        payloadOffset += 4 + extensionLength;
429    }
430
431    uint32_t srcId = u32at(&data[8]);
432
433    sp<ARTPSource> source = findSource(s, srcId);
434
435    uint32_t rtpTime = u32at(&data[4]);
436
437    sp<AMessage> meta = buffer->meta();
438    meta->setInt32("ssrc", srcId);
439    meta->setInt32("rtp-time", rtpTime);
440    meta->setInt32("PT", data[1] & 0x7f);
441    meta->setInt32("M", data[1] >> 7);
442
443    buffer->setInt32Data(u16at(&data[2]));
444    buffer->setRange(payloadOffset, size - payloadOffset);
445
446    if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) {
447        source->timeUpdate(rtpTime, 0);
448        source->timeUpdate(rtpTime + 90000, 0x100000000ll);
449        CHECK(source->timeEstablished());
450    }
451
452    source->processRTPPacket(buffer);
453
454    return OK;
455}
456
457status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
458    if (s->mNumRTCPPacketsReceived++ == 0) {
459        sp<AMessage> notify = s->mNotifyMsg->dup();
460        notify->setInt32("first-rtcp", true);
461        notify->post();
462    }
463
464    const uint8_t *data = buffer->data();
465    size_t size = buffer->size();
466
467    while (size > 0) {
468        if (size < 8) {
469            // Too short to be a valid RTCP header
470            return -1;
471        }
472
473        if ((data[0] >> 6) != 2) {
474            // Unsupported version.
475            return -1;
476        }
477
478        if (data[0] & 0x20) {
479            // Padding present.
480
481            size_t paddingLength = data[size - 1];
482
483            if (paddingLength + 12 > size) {
484                // If we removed this much padding we'd end up with something
485                // that's too short to be a valid RTP header.
486                return -1;
487            }
488
489            size -= paddingLength;
490        }
491
492        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
493
494        if (size < headerLength) {
495            // Only received a partial packet?
496            return -1;
497        }
498
499        switch (data[1]) {
500            case 200:
501            {
502                parseSR(s, data, headerLength);
503                break;
504            }
505
506            case 201:  // RR
507            case 202:  // SDES
508            case 204:  // APP
509                break;
510
511            case 205:  // TSFB (transport layer specific feedback)
512            case 206:  // PSFB (payload specific feedback)
513                // hexdump(data, headerLength);
514                break;
515
516            case 203:
517            {
518                parseBYE(s, data, headerLength);
519                break;
520            }
521
522            default:
523            {
524                LOG(WARNING) << "Unknown RTCP packet type "
525                             << (unsigned)data[1]
526                             << " of size " << headerLength;
527                break;
528            }
529        }
530
531        data += headerLength;
532        size -= headerLength;
533    }
534
535    return OK;
536}
537
538status_t ARTPConnection::parseBYE(
539        StreamInfo *s, const uint8_t *data, size_t size) {
540    size_t SC = data[0] & 0x3f;
541
542    if (SC == 0 || size < (4 + SC * 4)) {
543        // Packet too short for the minimal BYE header.
544        return -1;
545    }
546
547    uint32_t id = u32at(&data[4]);
548
549    sp<ARTPSource> source = findSource(s, id);
550
551    source->byeReceived();
552
553    return OK;
554}
555
556status_t ARTPConnection::parseSR(
557        StreamInfo *s, const uint8_t *data, size_t size) {
558    size_t RC = data[0] & 0x1f;
559
560    if (size < (7 + RC * 6) * 4) {
561        // Packet too short for the minimal SR header.
562        return -1;
563    }
564
565    uint32_t id = u32at(&data[4]);
566    uint64_t ntpTime = u64at(&data[8]);
567    uint32_t rtpTime = u32at(&data[16]);
568
569#if 0
570    LOG(INFO) << StringPrintf(
571            "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
572            id,
573            rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
574#endif
575
576    sp<ARTPSource> source = findSource(s, id);
577
578    if ((mFlags & kFakeTimestamps) == 0) {
579        source->timeUpdate(rtpTime, ntpTime);
580    }
581
582    return 0;
583}
584
585sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
586    sp<ARTPSource> source;
587    ssize_t index = info->mSources.indexOfKey(srcId);
588    if (index < 0) {
589        index = info->mSources.size();
590
591        source = new ARTPSource(
592                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
593
594        info->mSources.add(srcId, source);
595    } else {
596        source = info->mSources.valueAt(index);
597    }
598
599    return source;
600}
601
602void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
603    sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
604    msg->setInt32("index", index);
605    msg->setObject("buffer", buffer);
606    msg->post();
607}
608
609void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
610    int32_t index;
611    CHECK(msg->findInt32("index", &index));
612
613    sp<RefBase> obj;
614    CHECK(msg->findObject("buffer", &obj));
615
616    sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
617
618    List<StreamInfo>::iterator it = mStreams.begin();
619    while (it != mStreams.end()
620           && it->mRTPSocket != index && it->mRTCPSocket != index) {
621        ++it;
622    }
623
624    if (it == mStreams.end()) {
625        TRESPASS();
626    }
627
628    StreamInfo *s = &*it;
629
630    status_t err;
631    if (it->mRTPSocket == index) {
632        err = parseRTP(s, buffer);
633    } else {
634        err = parseRTCP(s, buffer);
635    }
636}
637
638}  // namespace android
639
640