RTPSender.cpp revision 2aea9552aeba92bbaf9e56c666049ea2d14057b5
1/*
2 * Copyright 2013, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "RTPSender"
19#include <utils/Log.h>
20
21#include "RTPSender.h"
22
23#include "ANetworkSession.h"
24
25#include <media/stagefright/foundation/ABuffer.h>
26#include <media/stagefright/foundation/ADebug.h>
27#include <media/stagefright/foundation/AMessage.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaErrors.h>
30#include <media/stagefright/Utils.h>
31
32#include "include/avc_utils.h"
33
34namespace android {
35
36RTPSender::RTPSender(
37        const sp<ANetworkSession> &netSession,
38        const sp<AMessage> &notify)
39    : mNetSession(netSession),
40      mNotify(notify),
41      mRTPMode(TRANSPORT_UNDEFINED),
42      mRTCPMode(TRANSPORT_UNDEFINED),
43      mRTPSessionID(0),
44      mRTCPSessionID(0),
45      mRTPConnected(false),
46      mRTCPConnected(false),
47      mLastNTPTime(0),
48      mLastRTPTime(0),
49      mNumRTPSent(0),
50      mNumRTPOctetsSent(0),
51      mNumSRsSent(0),
52      mRTPSeqNo(0),
53      mHistorySize(0) {
54}
55
56RTPSender::~RTPSender() {
57    if (mRTCPSessionID != 0) {
58        mNetSession->destroySession(mRTCPSessionID);
59        mRTCPSessionID = 0;
60    }
61
62    if (mRTPSessionID != 0) {
63        mNetSession->destroySession(mRTPSessionID);
64        mRTPSessionID = 0;
65    }
66}
67
68// static
69int32_t RTPBase::PickRandomRTPPort() {
70    // Pick an even integer in range [1024, 65534)
71
72    static const size_t kRange = (65534 - 1024) / 2;
73
74    return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
75}
76
77status_t RTPSender::initAsync(
78        const char *remoteHost,
79        int32_t remoteRTPPort,
80        TransportMode rtpMode,
81        int32_t remoteRTCPPort,
82        TransportMode rtcpMode,
83        int32_t *outLocalRTPPort) {
84    if (mRTPMode != TRANSPORT_UNDEFINED
85            || rtpMode == TRANSPORT_UNDEFINED
86            || rtpMode == TRANSPORT_NONE
87            || rtcpMode == TRANSPORT_UNDEFINED) {
88        return INVALID_OPERATION;
89    }
90
91    CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
92    CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
93
94    if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
95            || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
96        return INVALID_OPERATION;
97    }
98
99    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
100
101    sp<AMessage> rtcpNotify;
102    if (remoteRTCPPort >= 0) {
103        rtcpNotify = new AMessage(kWhatRTCPNotify, id());
104    }
105
106    CHECK_EQ(mRTPSessionID, 0);
107    CHECK_EQ(mRTCPSessionID, 0);
108
109    int32_t localRTPPort;
110
111    for (;;) {
112        localRTPPort = PickRandomRTPPort();
113
114        status_t err;
115        if (rtpMode == TRANSPORT_UDP) {
116            err = mNetSession->createUDPSession(
117                    localRTPPort,
118                    remoteHost,
119                    remoteRTPPort,
120                    rtpNotify,
121                    &mRTPSessionID);
122        } else {
123            CHECK_EQ(rtpMode, TRANSPORT_TCP);
124            err = mNetSession->createTCPDatagramSession(
125                    localRTPPort,
126                    remoteHost,
127                    remoteRTPPort,
128                    rtpNotify,
129                    &mRTPSessionID);
130        }
131
132        if (err != OK) {
133            continue;
134        }
135
136        if (remoteRTCPPort < 0) {
137            break;
138        }
139
140        if (rtcpMode == TRANSPORT_UDP) {
141            err = mNetSession->createUDPSession(
142                    localRTPPort + 1,
143                    remoteHost,
144                    remoteRTCPPort,
145                    rtcpNotify,
146                    &mRTCPSessionID);
147        } else {
148            CHECK_EQ(rtcpMode, TRANSPORT_TCP);
149            err = mNetSession->createTCPDatagramSession(
150                    localRTPPort + 1,
151                    remoteHost,
152                    remoteRTCPPort,
153                    rtcpNotify,
154                    &mRTCPSessionID);
155        }
156
157        if (err == OK) {
158            break;
159        }
160
161        mNetSession->destroySession(mRTPSessionID);
162        mRTPSessionID = 0;
163    }
164
165    if (rtpMode == TRANSPORT_UDP) {
166        mRTPConnected = true;
167    }
168
169    if (rtcpMode == TRANSPORT_UDP) {
170        mRTCPConnected = true;
171    }
172
173    mRTPMode = rtpMode;
174    mRTCPMode = rtcpMode;
175    *outLocalRTPPort = localRTPPort;
176
177    if (mRTPMode == TRANSPORT_UDP
178            && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
179        notifyInitDone(OK);
180    }
181
182    return OK;
183}
184
185status_t RTPSender::queueBuffer(
186        const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
187    status_t err;
188
189    switch (mode) {
190        case PACKETIZATION_TRANSPORT_STREAM:
191            err = queueTSPackets(buffer, packetType);
192            break;
193
194        case PACKETIZATION_H264:
195            err  = queueAVCBuffer(buffer, packetType);
196            break;
197
198        default:
199            TRESPASS();
200    }
201
202    return err;
203}
204
205status_t RTPSender::queueTSPackets(
206        const sp<ABuffer> &tsPackets, uint8_t packetType) {
207    CHECK_EQ(0, tsPackets->size() % 188);
208
209    int64_t timeUs;
210    CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
211
212    const size_t numTSPackets = tsPackets->size() / 188;
213
214    size_t srcOffset = 0;
215    while (srcOffset < tsPackets->size()) {
216        sp<ABuffer> udpPacket =
217            new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
218
219        udpPacket->setInt32Data(mRTPSeqNo);
220
221        uint8_t *rtp = udpPacket->data();
222        rtp[0] = 0x80;
223        rtp[1] = packetType;
224
225        rtp[2] = (mRTPSeqNo >> 8) & 0xff;
226        rtp[3] = mRTPSeqNo & 0xff;
227        ++mRTPSeqNo;
228
229        int64_t nowUs = ALooper::GetNowUs();
230        uint32_t rtpTime = (nowUs * 9) / 100ll;
231
232        rtp[4] = rtpTime >> 24;
233        rtp[5] = (rtpTime >> 16) & 0xff;
234        rtp[6] = (rtpTime >> 8) & 0xff;
235        rtp[7] = rtpTime & 0xff;
236
237        rtp[8] = kSourceID >> 24;
238        rtp[9] = (kSourceID >> 16) & 0xff;
239        rtp[10] = (kSourceID >> 8) & 0xff;
240        rtp[11] = kSourceID & 0xff;
241
242        size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
243        if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
244            numTSPackets = kMaxNumTSPacketsPerRTPPacket;
245        }
246
247        memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
248
249        udpPacket->setRange(0, 12 + numTSPackets * 188);
250
251        srcOffset += numTSPackets * 188;
252        bool isLastPacket = (srcOffset == tsPackets->size());
253
254        status_t err = sendRTPPacket(
255                udpPacket,
256                true /* storeInHistory */,
257                isLastPacket /* timeValid */,
258                timeUs);
259
260        if (err != OK) {
261            return err;
262        }
263    }
264
265    return OK;
266}
267
268status_t RTPSender::queueAVCBuffer(
269        const sp<ABuffer> &accessUnit, uint8_t packetType) {
270    int64_t timeUs;
271    CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
272
273    uint32_t rtpTime = (timeUs * 9 / 100ll);
274
275    List<sp<ABuffer> > packets;
276
277    sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
278    size_t outBytesUsed = 12;  // Placeholder for RTP header.
279
280    const uint8_t *data = accessUnit->data();
281    size_t size = accessUnit->size();
282    const uint8_t *nalStart;
283    size_t nalSize;
284    while (getNextNALUnit(
285                &data, &size, &nalStart, &nalSize,
286                true /* startCodeFollows */) == OK) {
287        size_t bytesNeeded = nalSize + 2;
288        if (outBytesUsed == 12) {
289            ++bytesNeeded;
290        }
291
292        if (outBytesUsed + bytesNeeded > out->capacity()) {
293            bool emitSingleNALPacket = false;
294
295            if (outBytesUsed == 12
296                    && outBytesUsed + nalSize <= out->capacity()) {
297                // We haven't emitted anything into the current packet yet and
298                // this NAL unit fits into a single-NAL-unit-packet while
299                // it wouldn't have fit as part of a STAP-A packet.
300
301                memcpy(out->data() + outBytesUsed, nalStart, nalSize);
302                outBytesUsed += nalSize;
303
304                emitSingleNALPacket = true;
305            }
306
307            if (outBytesUsed > 12) {
308                out->setRange(0, outBytesUsed);
309                packets.push_back(out);
310                out = new ABuffer(kMaxUDPPacketSize);
311                outBytesUsed = 12;  // Placeholder for RTP header
312            }
313
314            if (emitSingleNALPacket) {
315                continue;
316            }
317        }
318
319        if (outBytesUsed + bytesNeeded <= out->capacity()) {
320            uint8_t *dst = out->data() + outBytesUsed;
321
322            if (outBytesUsed == 12) {
323                *dst++ = 24;  // STAP-A header
324            }
325
326            *dst++ = (nalSize >> 8) & 0xff;
327            *dst++ = nalSize & 0xff;
328            memcpy(dst, nalStart, nalSize);
329
330            outBytesUsed += bytesNeeded;
331            continue;
332        }
333
334        // This single NAL unit does not fit into a single RTP packet,
335        // we need to emit an FU-A.
336
337        CHECK_EQ(outBytesUsed, 12u);
338
339        uint8_t nalType = nalStart[0] & 0x1f;
340        uint8_t nri = (nalStart[0] >> 5) & 3;
341
342        size_t srcOffset = 1;
343        while (srcOffset < nalSize) {
344            size_t copy = out->capacity() - outBytesUsed - 2;
345            if (copy > nalSize - srcOffset) {
346                copy = nalSize - srcOffset;
347            }
348
349            uint8_t *dst = out->data() + outBytesUsed;
350            dst[0] = (nri << 5) | 28;
351
352            dst[1] = nalType;
353
354            if (srcOffset == 1) {
355                dst[1] |= 0x80;
356            }
357
358            if (srcOffset + copy == nalSize) {
359                dst[1] |= 0x40;
360            }
361
362            memcpy(&dst[2], nalStart + srcOffset, copy);
363            srcOffset += copy;
364
365            out->setRange(0, outBytesUsed + copy + 2);
366
367            packets.push_back(out);
368            out = new ABuffer(kMaxUDPPacketSize);
369            outBytesUsed = 12;  // Placeholder for RTP header
370        }
371    }
372
373    if (outBytesUsed > 12) {
374        out->setRange(0, outBytesUsed);
375        packets.push_back(out);
376    }
377
378    while (!packets.empty()) {
379        sp<ABuffer> out = *packets.begin();
380        packets.erase(packets.begin());
381
382        out->setInt32Data(mRTPSeqNo);
383
384        bool last = packets.empty();
385
386        uint8_t *dst = out->data();
387
388        dst[0] = 0x80;
389
390        dst[1] = packetType;
391        if (last) {
392            dst[1] |= 1 << 7;  // M-bit
393        }
394
395        dst[2] = (mRTPSeqNo >> 8) & 0xff;
396        dst[3] = mRTPSeqNo & 0xff;
397        ++mRTPSeqNo;
398
399        dst[4] = rtpTime >> 24;
400        dst[5] = (rtpTime >> 16) & 0xff;
401        dst[6] = (rtpTime >> 8) & 0xff;
402        dst[7] = rtpTime & 0xff;
403        dst[8] = kSourceID >> 24;
404        dst[9] = (kSourceID >> 16) & 0xff;
405        dst[10] = (kSourceID >> 8) & 0xff;
406        dst[11] = kSourceID & 0xff;
407
408        status_t err = sendRTPPacket(out, true /* storeInHistory */);
409
410        if (err != OK) {
411            return err;
412        }
413    }
414
415    return OK;
416}
417
418status_t RTPSender::sendRTPPacket(
419        const sp<ABuffer> &buffer, bool storeInHistory,
420        bool timeValid, int64_t timeUs) {
421    CHECK(mRTPConnected);
422
423    status_t err = mNetSession->sendRequest(
424            mRTPSessionID, buffer->data(), buffer->size(),
425            timeValid, timeUs);
426
427    if (err != OK) {
428        return err;
429    }
430
431    mLastNTPTime = GetNowNTP();
432    mLastRTPTime = U32_AT(buffer->data() + 4);
433
434    ++mNumRTPSent;
435    mNumRTPOctetsSent += buffer->size() - 12;
436
437    if (storeInHistory) {
438        if (mHistorySize == kMaxHistorySize) {
439            mHistory.erase(mHistory.begin());
440        } else {
441            ++mHistorySize;
442        }
443        mHistory.push_back(buffer);
444    }
445
446    return OK;
447}
448
449// static
450uint64_t RTPSender::GetNowNTP() {
451    struct timeval tv;
452    gettimeofday(&tv, NULL /* timezone */);
453
454    uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
455
456    nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
457
458    uint64_t hi = nowUs / 1000000ll;
459    uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
460
461    return (hi << 32) | lo;
462}
463
464void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
465    switch (msg->what()) {
466        case kWhatRTPNotify:
467        case kWhatRTCPNotify:
468            onNetNotify(msg->what() == kWhatRTPNotify, msg);
469            break;
470
471        default:
472            TRESPASS();
473    }
474}
475
476void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
477    int32_t reason;
478    CHECK(msg->findInt32("reason", &reason));
479
480    switch (reason) {
481        case ANetworkSession::kWhatError:
482        {
483            int32_t sessionID;
484            CHECK(msg->findInt32("sessionID", &sessionID));
485
486            int32_t err;
487            CHECK(msg->findInt32("err", &err));
488
489            int32_t errorOccuredDuringSend;
490            CHECK(msg->findInt32("send", &errorOccuredDuringSend));
491
492            AString detail;
493            CHECK(msg->findString("detail", &detail));
494
495            ALOGE("An error occurred during %s in session %d "
496                  "(%d, '%s' (%s)).",
497                  errorOccuredDuringSend ? "send" : "receive",
498                  sessionID,
499                  err,
500                  detail.c_str(),
501                  strerror(-err));
502
503            mNetSession->destroySession(sessionID);
504
505            if (sessionID == mRTPSessionID) {
506                mRTPSessionID = 0;
507            } else if (sessionID == mRTCPSessionID) {
508                mRTCPSessionID = 0;
509            }
510
511            if (!mRTPConnected
512                    || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
513                // We haven't completed initialization, attach the error
514                // to the notification instead.
515                notifyInitDone(err);
516                break;
517            }
518
519            notifyError(err);
520            break;
521        }
522
523        case ANetworkSession::kWhatDatagram:
524        {
525            sp<ABuffer> data;
526            CHECK(msg->findBuffer("data", &data));
527
528            if (isRTP) {
529                ALOGW("Huh? Received data on RTP connection...");
530            } else {
531                onRTCPData(data);
532            }
533            break;
534        }
535
536        case ANetworkSession::kWhatConnected:
537        {
538            int32_t sessionID;
539            CHECK(msg->findInt32("sessionID", &sessionID));
540
541            if  (isRTP) {
542                CHECK_EQ(mRTPMode, TRANSPORT_TCP);
543                CHECK_EQ(sessionID, mRTPSessionID);
544                mRTPConnected = true;
545            } else {
546                CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
547                CHECK_EQ(sessionID, mRTCPSessionID);
548                mRTCPConnected = true;
549            }
550
551            if (mRTPConnected
552                    && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
553                notifyInitDone(OK);
554            }
555            break;
556        }
557
558        case ANetworkSession::kWhatNetworkStall:
559        {
560            size_t numBytesQueued;
561            CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
562
563            notifyNetworkStall(numBytesQueued);
564            break;
565        }
566
567        default:
568            TRESPASS();
569    }
570}
571
572status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
573    const uint8_t *data = buffer->data();
574    size_t size = buffer->size();
575
576    while (size > 0) {
577        if (size < 8) {
578            // Too short to be a valid RTCP header
579            return ERROR_MALFORMED;
580        }
581
582        if ((data[0] >> 6) != 2) {
583            // Unsupported version.
584            return ERROR_UNSUPPORTED;
585        }
586
587        if (data[0] & 0x20) {
588            // Padding present.
589
590            size_t paddingLength = data[size - 1];
591
592            if (paddingLength + 12 > size) {
593                // If we removed this much padding we'd end up with something
594                // that's too short to be a valid RTP header.
595                return ERROR_MALFORMED;
596            }
597
598            size -= paddingLength;
599        }
600
601        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
602
603        if (size < headerLength) {
604            // Only received a partial packet?
605            return ERROR_MALFORMED;
606        }
607
608        switch (data[1]) {
609            case 200:
610            case 201:  // RR
611                parseReceiverReport(data, headerLength);
612                break;
613
614            case 202:  // SDES
615            case 203:
616                break;
617
618            case 204:  // APP
619                parseAPP(data, headerLength);
620                break;
621
622            case 205:  // TSFB (transport layer specific feedback)
623                parseTSFB(data, headerLength);
624                break;
625
626            case 206:  // PSFB (payload specific feedback)
627                // hexdump(data, headerLength);
628                break;
629
630            default:
631            {
632                ALOGW("Unknown RTCP packet type %u of size %d",
633                     (unsigned)data[1], headerLength);
634                break;
635            }
636        }
637
638        data += headerLength;
639        size -= headerLength;
640    }
641
642    return OK;
643}
644
645status_t RTPSender::parseReceiverReport(const uint8_t *data, size_t size) {
646    // hexdump(data, size);
647
648    float fractionLost = data[12] / 256.0f;
649
650    ALOGI("lost %.2f %% of packets during report interval.",
651          100.0f * fractionLost);
652
653    return OK;
654}
655
656status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
657    if ((data[0] & 0x1f) != 1) {
658        return ERROR_UNSUPPORTED;  // We only support NACK for now.
659    }
660
661    uint32_t srcId = U32_AT(&data[8]);
662    if (srcId != kSourceID) {
663        return ERROR_MALFORMED;
664    }
665
666    for (size_t i = 12; i < size; i += 4) {
667        uint16_t seqNo = U16_AT(&data[i]);
668        uint16_t blp = U16_AT(&data[i + 2]);
669
670        List<sp<ABuffer> >::iterator it = mHistory.begin();
671        bool foundSeqNo = false;
672        while (it != mHistory.end()) {
673            const sp<ABuffer> &buffer = *it;
674
675            uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
676
677            bool retransmit = false;
678            if (bufferSeqNo == seqNo) {
679                retransmit = true;
680            } else if (blp != 0) {
681                for (size_t i = 0; i < 16; ++i) {
682                    if ((blp & (1 << i))
683                        && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
684                        blp &= ~(1 << i);
685                        retransmit = true;
686                    }
687                }
688            }
689
690            if (retransmit) {
691                ALOGV("retransmitting seqNo %d", bufferSeqNo);
692
693                CHECK_EQ((status_t)OK,
694                         sendRTPPacket(buffer, false /* storeInHistory */));
695
696                if (bufferSeqNo == seqNo) {
697                    foundSeqNo = true;
698                }
699
700                if (foundSeqNo && blp == 0) {
701                    break;
702                }
703            }
704
705            ++it;
706        }
707
708        if (!foundSeqNo || blp != 0) {
709            ALOGI("Some sequence numbers were no longer available for "
710                  "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
711                  seqNo, foundSeqNo, blp);
712
713            if (!mHistory.empty()) {
714                int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
715                int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
716
717                ALOGI("have seq numbers from %d - %d", earliest, latest);
718            }
719        }
720    }
721
722    return OK;
723}
724
725status_t RTPSender::parseAPP(const uint8_t *data, size_t size) {
726    if (!memcmp("late", &data[8], 4)) {
727        int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]);
728        int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]);
729
730        sp<AMessage> notify = mNotify->dup();
731        notify->setInt32("what", kWhatInformSender);
732        notify->setInt64("avgLatencyUs", avgLatencyUs);
733        notify->setInt64("maxLatencyUs", maxLatencyUs);
734        notify->post();
735    }
736
737    return OK;
738}
739
740void RTPSender::notifyInitDone(status_t err) {
741    sp<AMessage> notify = mNotify->dup();
742    notify->setInt32("what", kWhatInitDone);
743    notify->setInt32("err", err);
744    notify->post();
745}
746
747void RTPSender::notifyError(status_t err) {
748    sp<AMessage> notify = mNotify->dup();
749    notify->setInt32("what", kWhatError);
750    notify->setInt32("err", err);
751    notify->post();
752}
753
754void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
755    sp<AMessage> notify = mNotify->dup();
756    notify->setInt32("what", kWhatNetworkStall);
757    notify->setSize("numBytesQueued", numBytesQueued);
758    notify->post();
759}
760
761}  // namespace android
762
763