ARTPConnection.cpp revision 39ddf8e0f18766f7ba1e3246b774aa6ebd93eea8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "ARTPConnection.h"
18
19#include "ARTPSource.h"
20#include "ASessionDescription.h"
21
22#include <media/stagefright/foundation/ABuffer.h>
23#include <media/stagefright/foundation/ADebug.h>
24#include <media/stagefright/foundation/AMessage.h>
25#include <media/stagefright/foundation/AString.h>
26#include <media/stagefright/foundation/hexdump.h>
27
28#include <arpa/inet.h>
29#include <sys/socket.h>
30
31#define IGNORE_RTCP_TIME        0
32
33namespace android {
34
35static const size_t kMaxUDPSize = 1500;
36
37static uint16_t u16at(const uint8_t *data) {
38    return data[0] << 8 | data[1];
39}
40
41static uint32_t u32at(const uint8_t *data) {
42    return u16at(data) << 16 | u16at(&data[2]);
43}
44
45static uint64_t u64at(const uint8_t *data) {
46    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
47}
48
49// static
50const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
51
52struct ARTPConnection::StreamInfo {
53    int mRTPSocket;
54    int mRTCPSocket;
55    sp<ASessionDescription> mSessionDesc;
56    size_t mIndex;
57    sp<AMessage> mNotifyMsg;
58    KeyedVector<uint32_t, sp<ARTPSource> > mSources;
59
60    int32_t mNumRTCPPacketsReceived;
61    struct sockaddr_in mRemoteRTCPAddr;
62};
63
64ARTPConnection::ARTPConnection()
65    : mPollEventPending(false),
66      mLastReceiverReportTimeUs(-1) {
67}
68
69ARTPConnection::~ARTPConnection() {
70}
71
72void ARTPConnection::addStream(
73        int rtpSocket, int rtcpSocket,
74        const sp<ASessionDescription> &sessionDesc,
75        size_t index,
76        const sp<AMessage> &notify) {
77    sp<AMessage> msg = new AMessage(kWhatAddStream, id());
78    msg->setInt32("rtp-socket", rtpSocket);
79    msg->setInt32("rtcp-socket", rtcpSocket);
80    msg->setObject("session-desc", sessionDesc);
81    msg->setSize("index", index);
82    msg->setMessage("notify", notify);
83    msg->post();
84}
85
86void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
87    sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
88    msg->setInt32("rtp-socket", rtpSocket);
89    msg->setInt32("rtcp-socket", rtcpSocket);
90    msg->post();
91}
92
93static void bumpSocketBufferSize(int s) {
94    int size = 256 * 1024;
95    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
96}
97
98// static
99void ARTPConnection::MakePortPair(
100        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
101    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
102    CHECK_GE(*rtpSocket, 0);
103
104    bumpSocketBufferSize(*rtpSocket);
105
106    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
107    CHECK_GE(*rtcpSocket, 0);
108
109    bumpSocketBufferSize(*rtcpSocket);
110
111    unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
112    start &= ~1;
113
114    for (unsigned port = start; port < 65536; port += 2) {
115        struct sockaddr_in addr;
116        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
117        addr.sin_family = AF_INET;
118        addr.sin_addr.s_addr = INADDR_ANY;
119        addr.sin_port = htons(port);
120
121        if (bind(*rtpSocket,
122                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
123            continue;
124        }
125
126        addr.sin_port = htons(port + 1);
127
128        if (bind(*rtcpSocket,
129                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
130            *rtpPort = port;
131            return;
132        }
133    }
134
135    TRESPASS();
136}
137
138void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
139    switch (msg->what()) {
140        case kWhatAddStream:
141        {
142            onAddStream(msg);
143            break;
144        }
145
146        case kWhatRemoveStream:
147        {
148            onRemoveStream(msg);
149            break;
150        }
151
152        case kWhatPollStreams:
153        {
154            onPollStreams();
155            break;
156        }
157
158        default:
159        {
160            TRESPASS();
161            break;
162        }
163    }
164}
165
166void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
167    mStreams.push_back(StreamInfo());
168    StreamInfo *info = &*--mStreams.end();
169
170    int32_t s;
171    CHECK(msg->findInt32("rtp-socket", &s));
172    info->mRTPSocket = s;
173    CHECK(msg->findInt32("rtcp-socket", &s));
174    info->mRTCPSocket = s;
175
176    sp<RefBase> obj;
177    CHECK(msg->findObject("session-desc", &obj));
178    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
179
180    CHECK(msg->findSize("index", &info->mIndex));
181    CHECK(msg->findMessage("notify", &info->mNotifyMsg));
182
183    info->mNumRTCPPacketsReceived = 0;
184    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
185
186    postPollEvent();
187}
188
189void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
190    int32_t rtpSocket, rtcpSocket;
191    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
192    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
193
194    List<StreamInfo>::iterator it = mStreams.begin();
195    while (it != mStreams.end()
196           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
197        ++it;
198    }
199
200    if (it == mStreams.end()) {
201        TRESPASS();
202    }
203
204    mStreams.erase(it);
205}
206
207void ARTPConnection::postPollEvent() {
208    if (mPollEventPending) {
209        return;
210    }
211
212    sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
213    msg->post();
214
215    mPollEventPending = true;
216}
217
218void ARTPConnection::onPollStreams() {
219    mPollEventPending = false;
220
221    if (mStreams.empty()) {
222        return;
223    }
224
225    struct timeval tv;
226    tv.tv_sec = 0;
227    tv.tv_usec = kSelectTimeoutUs;
228
229    fd_set rs;
230    FD_ZERO(&rs);
231
232    int maxSocket = -1;
233    for (List<StreamInfo>::iterator it = mStreams.begin();
234         it != mStreams.end(); ++it) {
235        FD_SET(it->mRTPSocket, &rs);
236        FD_SET(it->mRTCPSocket, &rs);
237
238        if (it->mRTPSocket > maxSocket) {
239            maxSocket = it->mRTPSocket;
240        }
241        if (it->mRTCPSocket > maxSocket) {
242            maxSocket = it->mRTCPSocket;
243        }
244    }
245
246    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
247    CHECK_GE(res, 0);
248
249    if (res > 0) {
250        for (List<StreamInfo>::iterator it = mStreams.begin();
251             it != mStreams.end(); ++it) {
252            if (FD_ISSET(it->mRTPSocket, &rs)) {
253                receive(&*it, true);
254            }
255            if (FD_ISSET(it->mRTCPSocket, &rs)) {
256                receive(&*it, false);
257            }
258        }
259    }
260
261    postPollEvent();
262
263    int64_t nowUs = ALooper::GetNowUs();
264    if (mLastReceiverReportTimeUs <= 0
265            || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
266        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
267        for (List<StreamInfo>::iterator it = mStreams.begin();
268             it != mStreams.end(); ++it) {
269            StreamInfo *s = &*it;
270
271            if (s->mNumRTCPPacketsReceived == 0) {
272                // We have never received any RTCP packets on this stream,
273                // we don't even know where to send a report.
274                continue;
275            }
276
277            buffer->setRange(0, 0);
278
279            for (size_t i = 0; i < s->mSources.size(); ++i) {
280                sp<ARTPSource> source = s->mSources.valueAt(i);
281
282                source->addReceiverReport(buffer);
283                source->addFIR(buffer);
284            }
285
286            if (buffer->size() > 0) {
287                LOG(VERBOSE) << "Sending RR...";
288
289                ssize_t n = sendto(
290                        s->mRTCPSocket, buffer->data(), buffer->size(), 0,
291                        (const struct sockaddr *)&s->mRemoteRTCPAddr,
292                        sizeof(s->mRemoteRTCPAddr));
293                CHECK_EQ(n, (ssize_t)buffer->size());
294
295                mLastReceiverReportTimeUs = nowUs;
296            }
297        }
298    }
299}
300
301status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
302    sp<ABuffer> buffer = new ABuffer(65536);
303
304    socklen_t remoteAddrLen =
305        (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
306            ? sizeof(s->mRemoteRTCPAddr) : 0;
307
308    ssize_t nbytes = recvfrom(
309            receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
310            buffer->data(),
311            buffer->capacity(),
312            0,
313            remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
314            remoteAddrLen > 0 ? &remoteAddrLen : NULL);
315
316    if (nbytes < 0) {
317        return -1;
318    }
319
320    buffer->setRange(0, nbytes);
321
322    status_t err;
323    if (receiveRTP) {
324        err = parseRTP(s, buffer);
325    } else {
326        ++s->mNumRTCPPacketsReceived;
327        err = parseRTCP(s, buffer);
328    }
329
330    return err;
331}
332
333status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
334    size_t size = buffer->size();
335
336    if (size < 12) {
337        // Too short to be a valid RTP header.
338        return -1;
339    }
340
341    const uint8_t *data = buffer->data();
342
343    if ((data[0] >> 6) != 2) {
344        // Unsupported version.
345        return -1;
346    }
347
348    if (data[0] & 0x20) {
349        // Padding present.
350
351        size_t paddingLength = data[size - 1];
352
353        if (paddingLength + 12 > size) {
354            // If we removed this much padding we'd end up with something
355            // that's too short to be a valid RTP header.
356            return -1;
357        }
358
359        size -= paddingLength;
360    }
361
362    int numCSRCs = data[0] & 0x0f;
363
364    size_t payloadOffset = 12 + 4 * numCSRCs;
365
366    if (size < payloadOffset) {
367        // Not enough data to fit the basic header and all the CSRC entries.
368        return -1;
369    }
370
371    if (data[0] & 0x10) {
372        // Header eXtension present.
373
374        if (size < payloadOffset + 4) {
375            // Not enough data to fit the basic header, all CSRC entries
376            // and the first 4 bytes of the extension header.
377
378            return -1;
379        }
380
381        const uint8_t *extensionData = &data[payloadOffset];
382
383        size_t extensionLength =
384            4 * (extensionData[2] << 8 | extensionData[3]);
385
386        if (size < payloadOffset + 4 + extensionLength) {
387            return -1;
388        }
389
390        payloadOffset += 4 + extensionLength;
391    }
392
393    uint32_t srcId = u32at(&data[8]);
394
395    sp<ARTPSource> source = findSource(s, srcId);
396
397    uint32_t rtpTime = u32at(&data[4]);
398
399    sp<AMessage> meta = buffer->meta();
400    meta->setInt32("ssrc", srcId);
401    meta->setInt32("rtp-time", rtpTime);
402    meta->setInt32("PT", data[1] & 0x7f);
403    meta->setInt32("M", data[1] >> 7);
404
405    buffer->setInt32Data(u16at(&data[2]));
406    buffer->setRange(payloadOffset, size - payloadOffset);
407
408    source->processRTPPacket(buffer);
409
410    return OK;
411}
412
413status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
414    const uint8_t *data = buffer->data();
415    size_t size = buffer->size();
416
417    while (size > 0) {
418        if (size < 8) {
419            // Too short to be a valid RTCP header
420            return -1;
421        }
422
423        if ((data[0] >> 6) != 2) {
424            // Unsupported version.
425            return -1;
426        }
427
428        if (data[0] & 0x20) {
429            // Padding present.
430
431            size_t paddingLength = data[size - 1];
432
433            if (paddingLength + 12 > size) {
434                // If we removed this much padding we'd end up with something
435                // that's too short to be a valid RTP header.
436                return -1;
437            }
438
439            size -= paddingLength;
440        }
441
442        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
443
444        if (size < headerLength) {
445            // Only received a partial packet?
446            return -1;
447        }
448
449        switch (data[1]) {
450            case 200:
451            {
452                parseSR(s, data, headerLength);
453                break;
454            }
455
456            case 201:  // RR
457            case 202:  // SDES
458            case 204:  // APP
459                break;
460
461            case 205:  // TSFB (transport layer specific feedback)
462            case 206:  // PSFB (payload specific feedback)
463                // hexdump(data, headerLength);
464                break;
465
466            case 203:
467            {
468                parseBYE(s, data, headerLength);
469                break;
470            }
471
472            default:
473            {
474                LOG(WARNING) << "Unknown RTCP packet type "
475                             << (unsigned)data[1]
476                             << " of size " << headerLength;
477                break;
478            }
479        }
480
481        data += headerLength;
482        size -= headerLength;
483    }
484
485    return OK;
486}
487
488status_t ARTPConnection::parseBYE(
489        StreamInfo *s, const uint8_t *data, size_t size) {
490    size_t SC = data[0] & 0x3f;
491
492    if (SC == 0 || size < (4 + SC * 4)) {
493        // Packet too short for the minimal BYE header.
494        return -1;
495    }
496
497    uint32_t id = u32at(&data[4]);
498
499    sp<ARTPSource> source = findSource(s, id);
500
501    source->byeReceived();
502
503    return OK;
504}
505
506status_t ARTPConnection::parseSR(
507        StreamInfo *s, const uint8_t *data, size_t size) {
508    size_t RC = data[0] & 0x1f;
509
510    if (size < (7 + RC * 6) * 4) {
511        // Packet too short for the minimal SR header.
512        return -1;
513    }
514
515    uint32_t id = u32at(&data[4]);
516    uint64_t ntpTime = u64at(&data[8]);
517    uint32_t rtpTime = u32at(&data[16]);
518
519#if 0
520    LOG(INFO) << StringPrintf(
521            "XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
522            id,
523            rtpTime, (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
524#endif
525
526    sp<ARTPSource> source = findSource(s, id);
527
528#if !IGNORE_RTCP_TIME
529    source->timeUpdate(rtpTime, ntpTime);
530#endif
531
532    return 0;
533}
534
535sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
536    sp<ARTPSource> source;
537    ssize_t index = info->mSources.indexOfKey(srcId);
538    if (index < 0) {
539        index = info->mSources.size();
540
541        source = new ARTPSource(
542                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
543
544#if IGNORE_RTCP_TIME
545        // For H.263 gtalk to work...
546        source->timeUpdate(0, 0);
547        source->timeUpdate(30, 0x100000000ll);
548#endif
549
550        info->mSources.add(srcId, source);
551    } else {
552        source = info->mSources.valueAt(index);
553    }
554
555    return source;
556}
557
558}  // namespace android
559
560