ARTPConnection.cpp revision a979ad6739d573b3823b0fe7321f554ef5544753
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "ARTPConnection.h"
18
19#include "ARTPSource.h"
20#include "ASessionDescription.h"
21
22#include <media/stagefright/foundation/ABuffer.h>
23#include <media/stagefright/foundation/ADebug.h>
24#include <media/stagefright/foundation/AMessage.h>
25#include <media/stagefright/foundation/AString.h>
26#include <media/stagefright/foundation/hexdump.h>
27
28#include <arpa/inet.h>
#include <errno.h>
29#include <sys/socket.h>
#include <unistd.h>
30
31namespace android {
32
// Capacity, in bytes, of the scratch buffer that onPollStreams() allocates
// for assembling outgoing RTCP receiver reports.
33static const size_t kMaxUDPSize = 1500;
34
// Reads a 16-bit unsigned integer stored most-significant byte first
// (network byte order) at |data|.
static uint16_t u16at(const uint8_t *data) {
    const unsigned high = data[0];
    const unsigned low = data[1];

    return static_cast<uint16_t>((high << 8) | low);
}
38
// Reads a 32-bit unsigned integer stored most-significant byte first
// (network byte order) at |data|.
//
// Every byte is widened to uint32_t before it is shifted. Combining two
// 16-bit halves with "<< 16" would shift a value that has been promoted to
// (signed) int, which overflows whenever the top bit of the result is set.
static uint32_t u32at(const uint8_t *data) {
    return (static_cast<uint32_t>(data[0]) << 24)
            | (static_cast<uint32_t>(data[1]) << 16)
            | (static_cast<uint32_t>(data[2]) << 8)
            | static_cast<uint32_t>(data[3]);
}
42
43static uint64_t u64at(const uint8_t *data) {
44    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
45}
46
47// static
// Timeout handed to select() in onPollStreams(). It is assigned to
// timeval::tv_usec, so the value is expressed in microseconds.
48const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
49
// Bookkeeping for one RTP/RTCP socket pair registered through addStream().
// One entry per stream is kept in mStreams.
50struct ARTPConnection::StreamInfo {
51    int mRTPSocket;  // Socket polled for incoming RTP datagrams.
52    int mRTCPSocket;  // Socket polled for RTCP datagrams; receiver reports are sent on it.
53    sp<ASessionDescription> mSessionDesc;  // Passed to every ARTPSource created for this stream.
54    size_t mIndex;  // Caller-supplied index, passed to every ARTPSource alongside mSessionDesc.
55    sp<AMessage> mNotifyMsg;  // Caller-supplied message, passed to every ARTPSource.
56    KeyedVector<uint32_t, sp<ARTPSource> > mSources;  // Sources seen so far, keyed by the source id read from packets.
57
58    int32_t mNumRTCPPacketsReceived;  // Bumped for every RTCP datagram read; 0 means mRemoteRTCPAddr is not known yet.
59    struct sockaddr_in mRemoteRTCPAddr;  // Sender address captured by recvfrom() for the first RTCP datagram.
60};
61
// Creates a connection with no poll event pending and no receiver report
// sent yet (mLastReceiverReportTimeUs == -1). |flags| is stored in mFlags and
// later tested against kFakeTimestamps and kRegularlyRequestFIR.
62ARTPConnection::ARTPConnection(uint32_t flags)
63    : mFlags(flags),
64      mPollEventPending(false),
65      mLastReceiverReportTimeUs(-1) {
66}
67
// The destructor body is empty: the sockets recorded in mStreams are not
// closed here.
68ARTPConnection::~ARTPConnection() {
69}
70
71void ARTPConnection::addStream(
72        int rtpSocket, int rtcpSocket,
73        const sp<ASessionDescription> &sessionDesc,
74        size_t index,
75        const sp<AMessage> &notify) {
76    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
77    msg->setInt32("rtp-socket", rtpSocket);
78    msg->setInt32("rtcp-socket", rtcpSocket);
79    msg->setObject("session-desc", sessionDesc);
80    msg->setSize("index", index);
81    msg->setMessage("notify", notify);
82    msg->post();
83}
84
85void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
86    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
87    msg->setInt32("rtp-socket", rtpSocket);
88    msg->setInt32("rtcp-socket", rtcpSocket);
89    msg->post();
90}
91
92static void bumpSocketBufferSize(int s) {
93    int size = 256 * 1024;
94    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
95}
96
97// static
98void ARTPConnection::MakePortPair(
99        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
100    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
101    CHECK_GE(*rtpSocket, 0);
102
103    bumpSocketBufferSize(*rtpSocket);
104
105    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
106    CHECK_GE(*rtcpSocket, 0);
107
108    bumpSocketBufferSize(*rtcpSocket);
109
110    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
111    start &= ~1;
112
113    for (unsigned port = start; port < 65536; port += 2) {
114        struct sockaddr_in addr;
115        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
116        addr.sin_family = AF_INET;
117        addr.sin_addr.s_addr = INADDR_ANY;
118        addr.sin_port = htons(port);
119
120        if (bind(*rtpSocket,
121                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
122            continue;
123        }
124
125        addr.sin_port = htons(port + 1);
126
127        if (bind(*rtcpSocket,
128                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
129            *rtpPort = port;
130            return;
131        }
132    }
133
134    TRESPASS();
135}
136
137void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
138    switch (msg->what()) {
139        case kWhatAddStream:
140        {
141            onAddStream(msg);
142            break;
143        }
144
145        case kWhatRemoveStream:
146        {
147            onRemoveStream(msg);
148            break;
149        }
150
151        case kWhatPollStreams:
152        {
153            onPollStreams();
154            break;
155        }
156
157        default:
158        {
159            TRESPASS();
160            break;
161        }
162    }
163}
164
165void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
166    mStreams.push_back(StreamInfo());
167    StreamInfo *info = &*--mStreams.end();
168
169    int32_t s;
170    CHECK(msg->findInt32("rtp-socket", &s));
171    info->mRTPSocket = s;
172    CHECK(msg->findInt32("rtcp-socket", &s));
173    info->mRTCPSocket = s;
174
175    sp<RefBase> obj;
176    CHECK(msg->findObject("session-desc", &obj));
177    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
178
179    CHECK(msg->findSize("index", &info->mIndex));
180    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
181
182    info->mNumRTCPPacketsReceived = 0;
183    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
184
185    postPollEvent();
186}
187
188void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
189    int32_t rtpSocket, rtcpSocket;
190    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
191    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
192
193    List<StreamInfo>::iterator it = mStreams.begin();
194    while (it != mStreams.end()
195           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
196        ++it;
197    }
198
199    if (it == mStreams.end()) {
200        TRESPASS();
201    }
202
203    mStreams.erase(it);
204}
205
206void ARTPConnection::postPollEvent() {
207    if (mPollEventPending) {
208        return;
209    }
210
211    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
212    msg->post();
213
214    mPollEventPending = true;
215}
216
217void ARTPConnection::onPollStreams() {
218    mPollEventPending = false;
219
220    if (mStreams.empty()) {
221        return;
222    }
223
224    struct timeval tv;
225    tv.tv_sec = 0;
226    tv.tv_usec = kSelectTimeoutUs;
227
228    fd_set rs;
229    FD_ZERO(&rs);
230
231    int maxSocket = -1;
232    for (List<StreamInfo>::iterator it = mStreams.begin();
233         it != mStreams.end(); ++it) {
234        FD_SET(it->mRTPSocket, &rs);
235        FD_SET(it->mRTCPSocket, &rs);
236
237        if (it->mRTPSocket > maxSocket) {
238            maxSocket = it->mRTPSocket;
239        }
240        if (it->mRTCPSocket > maxSocket) {
241            maxSocket = it->mRTCPSocket;
242        }
243    }
244
245    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
246    CHECK_GE(res, 0);
247
248    if (res > 0) {
249        for (List<StreamInfo>::iterator it = mStreams.begin();
250             it != mStreams.end(); ++it) {
251            if (FD_ISSET(it->mRTPSocket, &rs)) {
252                receive(&*it, true);
253            }
254            if (FD_ISSET(it->mRTCPSocket, &rs)) {
255                receive(&*it, false);
256            }
257        }
258    }
259
260    postPollEvent();
261
262    int64_t nowUs = ALooper::GetNowUs();
263    if (mLastReceiverReportTimeUs <= 0
264            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
265        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
266        for (List<StreamInfo>::iterator it = mStreams.begin();
267             it != mStreams.end(); ++it) {
268            StreamInfo *s = &*it;
269
270            if (s->mNumRTCPPacketsReceived == 0) {
271                // We have never received any RTCP packets on this stream,
272                // we don't even know where to send a report.
273                continue;
274            }
275
276            buffer->setRange(0, 0);
277
278            for (size_t i = 0; i < s->mSources.size(); ++i) {
279                sp<ARTPSource> source = s->mSources.valueAt(i);
280
281                source->addReceiverReport(buffer);
282
283                if (mFlags & kRegularlyRequestFIR) {
284                    source->addFIR(buffer);
285                }
286            }
287
288            if (buffer->size() > 0) {
289                LOG(VERBOSE) << "Sending RR...";
290
291                ssize_t n = sendto(
292                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
293                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
294                        sizeof(s->mRemoteRTCPAddr));
295                CHECK_EQ(n, (ssize_t)buffer->size());
296
297                mLastReceiverReportTimeUs = nowUs;
298            }
299        }
300    }
301}
302
303status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
304    sp<ABuffer> buffer = new ABuffer(65536);
305
306    socklen_t remoteAddrLen =
307        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
308            ? sizeof(s->mRemoteRTCPAddr) : 0;
309
310    ssize_t nbytes = recvfrom(
311            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
312            buffer->data(),
313            buffer->capacity(),
314            0,
315            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
316            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
317
318    if (nbytes < 0) {
319        return -1;
320    }
321
322    buffer->setRange(0, nbytes);
323
324    // LOG(INFO) << "received " << buffer->size() << " bytes.";
325
326    status_t err;
327    if (receiveRTP) {
328        err = parseRTP(s, buffer);
329    } else {
330        ++s->mNumRTCPPacketsReceived;
331        err = parseRTCP(s, buffer);
332    }
333
334    return err;
335}
336
337status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
338    size_t size = buffer->size();
339
340    if (size < 12) {
341        // Too short to be a valid RTP header.
342        return -1;
343    }
344
345    const uint8_t *data = buffer->data();
346
347    if ((data[0] >> 6) != 2) {
348        // Unsupported version.
349        return -1;
350    }
351
352    if (data[0] & 0x20) {
353        // Padding present.
354
355        size_t paddingLength = data[size - 1];
356
357        if (paddingLength + 12 > size) {
358            // If we removed this much padding we'd end up with something
359            // that's too short to be a valid RTP header.
360            return -1;
361        }
362
363        size -= paddingLength;
364    }
365
366    int numCSRCs = data[0] & 0x0f;
367
368    size_t payloadOffset = 12 + 4 * numCSRCs;
369
370    if (size < payloadOffset) {
371        // Not enough data to fit the basic header and all the CSRC entries.
372        return -1;
373    }
374
375    if (data[0] & 0x10) {
376        // Header eXtension present.
377
378        if (size < payloadOffset + 4) {
379            // Not enough data to fit the basic header, all CSRC entries
380            // and the first 4 bytes of the extension header.
381
382            return -1;
383        }
384
385        const uint8_t *extensionData = &data[payloadOffset];
386
387        size_t extensionLength =
388            4 * (extensionData[2] << 8 | extensionData[3]);
389
390        if (size < payloadOffset + 4 + extensionLength) {
391            return -1;
392        }
393
394        payloadOffset += 4 + extensionLength;
395    }
396
397    uint32_t srcId = u32at(&data[8]);
398
399    sp<ARTPSource> source = findSource(s, srcId);
400
401    uint32_t rtpTime = u32at(&data[4]);
402
403    sp<AMessage> meta = buffer->meta();
404    meta->setInt32("ssrc", srcId);
405    meta->setInt32("rtp-time", rtpTime);
406    meta->setInt32("PT", data[1] & 0x7f);
407    meta->setInt32("M", data[1] >> 7);
408
409    buffer->setInt32Data(u16at(&data[2]));
410    buffer->setRange(payloadOffset, size - payloadOffset);
411
412    if ((mFlags & kFakeTimestamps) && !source->timeEstablished()) {
413        source->timeUpdate(rtpTime, 0);
414        source->timeUpdate(rtpTime + 90000, 0x100000000ll);
415        CHECK(source->timeEstablished());
416    }
417
418    source->processRTPPacket(buffer);
419
420    return OK;
421}
422
423status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
424    const uint8_t *data = buffer->data();
425    size_t size = buffer->size();
426
427    while (size > 0) {
428        if (size < 8) {
429            // Too short to be a valid RTCP header
430            return -1;
431        }
432
433        if ((data[0] >> 6) != 2) {
434            // Unsupported version.
435            return -1;
436        }
437
438        if (data[0] & 0x20) {
439            // Padding present.
440
441            size_t paddingLength = data[size - 1];
442
443            if (paddingLength + 12 > size) {
444                // If we removed this much padding we'd end up with something
445                // that's too short to be a valid RTP header.
446                return -1;
447            }
448
449            size -= paddingLength;
450        }
451
452        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
453
454        if (size < headerLength) {
455            // Only received a partial packet?
456            return -1;
457        }
458
459        switch (data[1]) {
460            case 200:
461            {
462                parseSR(s, data, headerLength);
463                break;
464            }
465
466            case 201:  // RR
467            case 202:  // SDES
468            case 204:  // APP
469                break;
470
471            case 205:  // TSFB (transport layer specific feedback)
472            case 206:  // PSFB (payload specific feedback)
473                // hexdump(data, headerLength);
474                break;
475
476            case 203:
477            {
478                parseBYE(s, data, headerLength);
479                break;
480            }
481
482            default:
483            {
484                LOG(WARNING) << "Unknown RTCP packet type "
485                             << (unsigned)data[1]
486                             << " of size " << headerLength;
487                break;
488            }
489        }
490
491        data += headerLength;
492        size -= headerLength;
493    }
494
495    return OK;
496}
497
498status_t ARTPConnection::parseBYE(
499        StreamInfo *s, const uint8_t *data, size_t size) {
500    size_t SC = data[0] & 0x3f;
501
502    if (SC == 0 || size < (4 + SC * 4)) {
503        // Packet too short for the minimal BYE header.
504        return -1;
505    }
506
507    uint32_t id = u32at(&data[4]);
508
509    sp<ARTPSource> source = findSource(s, id);
510
511    source->byeReceived();
512
513    return OK;
514}
515
516status_t ARTPConnection::parseSR(
517        StreamInfo *s, const uint8_t *data, size_t size) {
518    size_t RC = data[0] & 0x1f;
519
520    if (size < (7 + RC * 6) * 4) {
521        // Packet too short for the minimal SR header.
522        return -1;
523    }
524
525    uint32_t id = u32at(&data[4]);
526    uint64_t ntpTime = u64at(&data[8]);
527    uint32_t rtpTime = u32at(&data[16]);
528
529#if 0
530    LOG(INFO) << StringPrintf(
531            "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
532            id,
533            rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
534#endif
535
536    sp<ARTPSource> source = findSource(s, id);
537
538    if ((mFlags & kFakeTimestamps) == 0) {
539        source->timeUpdate(rtpTime, ntpTime);
540    }
541
542    return 0;
543}
544
545sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
546    sp<ARTPSource> source;
547    ssize_t index = info->mSources.indexOfKey(srcId);
548    if (index < 0) {
549        index = info->mSources.size();
550
551        source = new ARTPSource(
552                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
553
554        info->mSources.add(srcId, source);
555    } else {
556        source = info->mSources.valueAt(index);
557    }
558
559    return source;
560}
561
562}  // namespace android
563
564