1/*
2 * Copyright 2013, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "RTPSender"
19#include <utils/Log.h>
20
21#include "RTPSender.h"
22
23#include <media/stagefright/foundation/ABuffer.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
26#include <media/stagefright/foundation/ANetworkSession.h>
27#include <media/stagefright/foundation/hexdump.h>
28#include <media/stagefright/MediaErrors.h>
29#include <media/stagefright/Utils.h>
30
31#include "include/avc_utils.h"
32
33namespace android {
34
35RTPSender::RTPSender(
36        const sp<ANetworkSession> &netSession,
37        const sp<AMessage> &notify)
38    : mNetSession(netSession),
39      mNotify(notify),
40      mRTPMode(TRANSPORT_UNDEFINED),
41      mRTCPMode(TRANSPORT_UNDEFINED),
42      mRTPSessionID(0),
43      mRTCPSessionID(0),
44      mRTPConnected(false),
45      mRTCPConnected(false),
46      mLastNTPTime(0),
47      mLastRTPTime(0),
48      mNumRTPSent(0),
49      mNumRTPOctetsSent(0),
50      mNumSRsSent(0),
51      mRTPSeqNo(0),
52      mHistorySize(0) {
53}
54
55RTPSender::~RTPSender() {
56    if (mRTCPSessionID != 0) {
57        mNetSession->destroySession(mRTCPSessionID);
58        mRTCPSessionID = 0;
59    }
60
61    if (mRTPSessionID != 0) {
62        mNetSession->destroySession(mRTPSessionID);
63        mRTPSessionID = 0;
64    }
65}
66
67// static
68int32_t RTPBase::PickRandomRTPPort() {
69    // Pick an even integer in range [1024, 65534)
70
71    static const size_t kRange = (65534 - 1024) / 2;
72
73    return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
74}
75
76status_t RTPSender::initAsync(
77        const char *remoteHost,
78        int32_t remoteRTPPort,
79        TransportMode rtpMode,
80        int32_t remoteRTCPPort,
81        TransportMode rtcpMode,
82        int32_t *outLocalRTPPort) {
83    if (mRTPMode != TRANSPORT_UNDEFINED
84            || rtpMode == TRANSPORT_UNDEFINED
85            || rtpMode == TRANSPORT_NONE
86            || rtcpMode == TRANSPORT_UNDEFINED) {
87        return INVALID_OPERATION;
88    }
89
90    CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
91    CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
92
93    if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
94            || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
95        return INVALID_OPERATION;
96    }
97
98    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
99
100    sp<AMessage> rtcpNotify;
101    if (remoteRTCPPort >= 0) {
102        rtcpNotify = new AMessage(kWhatRTCPNotify, id());
103    }
104
105    CHECK_EQ(mRTPSessionID, 0);
106    CHECK_EQ(mRTCPSessionID, 0);
107
108    int32_t localRTPPort;
109
110    for (;;) {
111        localRTPPort = PickRandomRTPPort();
112
113        status_t err;
114        if (rtpMode == TRANSPORT_UDP) {
115            err = mNetSession->createUDPSession(
116                    localRTPPort,
117                    remoteHost,
118                    remoteRTPPort,
119                    rtpNotify,
120                    &mRTPSessionID);
121        } else {
122            CHECK_EQ(rtpMode, TRANSPORT_TCP);
123            err = mNetSession->createTCPDatagramSession(
124                    localRTPPort,
125                    remoteHost,
126                    remoteRTPPort,
127                    rtpNotify,
128                    &mRTPSessionID);
129        }
130
131        if (err != OK) {
132            continue;
133        }
134
135        if (remoteRTCPPort < 0) {
136            break;
137        }
138
139        if (rtcpMode == TRANSPORT_UDP) {
140            err = mNetSession->createUDPSession(
141                    localRTPPort + 1,
142                    remoteHost,
143                    remoteRTCPPort,
144                    rtcpNotify,
145                    &mRTCPSessionID);
146        } else {
147            CHECK_EQ(rtcpMode, TRANSPORT_TCP);
148            err = mNetSession->createTCPDatagramSession(
149                    localRTPPort + 1,
150                    remoteHost,
151                    remoteRTCPPort,
152                    rtcpNotify,
153                    &mRTCPSessionID);
154        }
155
156        if (err == OK) {
157            break;
158        }
159
160        mNetSession->destroySession(mRTPSessionID);
161        mRTPSessionID = 0;
162    }
163
164    if (rtpMode == TRANSPORT_UDP) {
165        mRTPConnected = true;
166    }
167
168    if (rtcpMode == TRANSPORT_UDP) {
169        mRTCPConnected = true;
170    }
171
172    mRTPMode = rtpMode;
173    mRTCPMode = rtcpMode;
174    *outLocalRTPPort = localRTPPort;
175
176    if (mRTPMode == TRANSPORT_UDP
177            && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
178        notifyInitDone(OK);
179    }
180
181    return OK;
182}
183
184status_t RTPSender::queueBuffer(
185        const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
186    status_t err;
187
188    switch (mode) {
189        case PACKETIZATION_NONE:
190            err = queueRawPacket(buffer, packetType);
191            break;
192
193        case PACKETIZATION_TRANSPORT_STREAM:
194            err = queueTSPackets(buffer, packetType);
195            break;
196
197        case PACKETIZATION_H264:
198            err  = queueAVCBuffer(buffer, packetType);
199            break;
200
201        default:
202            TRESPASS();
203    }
204
205    return err;
206}
207
208status_t RTPSender::queueRawPacket(
209        const sp<ABuffer> &packet, uint8_t packetType) {
210    CHECK_LE(packet->size(), kMaxUDPPacketSize - 12);
211
212    int64_t timeUs;
213    CHECK(packet->meta()->findInt64("timeUs", &timeUs));
214
215    sp<ABuffer> udpPacket = new ABuffer(12 + packet->size());
216
217    udpPacket->setInt32Data(mRTPSeqNo);
218
219    uint8_t *rtp = udpPacket->data();
220    rtp[0] = 0x80;
221    rtp[1] = packetType;
222
223    rtp[2] = (mRTPSeqNo >> 8) & 0xff;
224    rtp[3] = mRTPSeqNo & 0xff;
225    ++mRTPSeqNo;
226
227    uint32_t rtpTime = (timeUs * 9) / 100ll;
228
229    rtp[4] = rtpTime >> 24;
230    rtp[5] = (rtpTime >> 16) & 0xff;
231    rtp[6] = (rtpTime >> 8) & 0xff;
232    rtp[7] = rtpTime & 0xff;
233
234    rtp[8] = kSourceID >> 24;
235    rtp[9] = (kSourceID >> 16) & 0xff;
236    rtp[10] = (kSourceID >> 8) & 0xff;
237    rtp[11] = kSourceID & 0xff;
238
239    memcpy(&rtp[12], packet->data(), packet->size());
240
241    return sendRTPPacket(
242            udpPacket,
243            true /* storeInHistory */,
244            true /* timeValid */,
245            ALooper::GetNowUs());
246}
247
248status_t RTPSender::queueTSPackets(
249        const sp<ABuffer> &tsPackets, uint8_t packetType) {
250    CHECK_EQ(0, tsPackets->size() % 188);
251
252    int64_t timeUs;
253    CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
254
255    const size_t numTSPackets = tsPackets->size() / 188;
256
257    size_t srcOffset = 0;
258    while (srcOffset < tsPackets->size()) {
259        sp<ABuffer> udpPacket =
260            new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
261
262        udpPacket->setInt32Data(mRTPSeqNo);
263
264        uint8_t *rtp = udpPacket->data();
265        rtp[0] = 0x80;
266        rtp[1] = packetType;
267
268        rtp[2] = (mRTPSeqNo >> 8) & 0xff;
269        rtp[3] = mRTPSeqNo & 0xff;
270        ++mRTPSeqNo;
271
272        int64_t nowUs = ALooper::GetNowUs();
273        uint32_t rtpTime = (nowUs * 9) / 100ll;
274
275        rtp[4] = rtpTime >> 24;
276        rtp[5] = (rtpTime >> 16) & 0xff;
277        rtp[6] = (rtpTime >> 8) & 0xff;
278        rtp[7] = rtpTime & 0xff;
279
280        rtp[8] = kSourceID >> 24;
281        rtp[9] = (kSourceID >> 16) & 0xff;
282        rtp[10] = (kSourceID >> 8) & 0xff;
283        rtp[11] = kSourceID & 0xff;
284
285        size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
286        if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
287            numTSPackets = kMaxNumTSPacketsPerRTPPacket;
288        }
289
290        memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
291
292        udpPacket->setRange(0, 12 + numTSPackets * 188);
293
294        srcOffset += numTSPackets * 188;
295        bool isLastPacket = (srcOffset == tsPackets->size());
296
297        status_t err = sendRTPPacket(
298                udpPacket,
299                true /* storeInHistory */,
300                isLastPacket /* timeValid */,
301                timeUs);
302
303        if (err != OK) {
304            return err;
305        }
306    }
307
308    return OK;
309}
310
311status_t RTPSender::queueAVCBuffer(
312        const sp<ABuffer> &accessUnit, uint8_t packetType) {
313    int64_t timeUs;
314    CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
315
316    uint32_t rtpTime = (timeUs * 9 / 100ll);
317
318    List<sp<ABuffer> > packets;
319
320    sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
321    size_t outBytesUsed = 12;  // Placeholder for RTP header.
322
323    const uint8_t *data = accessUnit->data();
324    size_t size = accessUnit->size();
325    const uint8_t *nalStart;
326    size_t nalSize;
327    while (getNextNALUnit(
328                &data, &size, &nalStart, &nalSize,
329                true /* startCodeFollows */) == OK) {
330        size_t bytesNeeded = nalSize + 2;
331        if (outBytesUsed == 12) {
332            ++bytesNeeded;
333        }
334
335        if (outBytesUsed + bytesNeeded > out->capacity()) {
336            bool emitSingleNALPacket = false;
337
338            if (outBytesUsed == 12
339                    && outBytesUsed + nalSize <= out->capacity()) {
340                // We haven't emitted anything into the current packet yet and
341                // this NAL unit fits into a single-NAL-unit-packet while
342                // it wouldn't have fit as part of a STAP-A packet.
343
344                memcpy(out->data() + outBytesUsed, nalStart, nalSize);
345                outBytesUsed += nalSize;
346
347                emitSingleNALPacket = true;
348            }
349
350            if (outBytesUsed > 12) {
351                out->setRange(0, outBytesUsed);
352                packets.push_back(out);
353                out = new ABuffer(kMaxUDPPacketSize);
354                outBytesUsed = 12;  // Placeholder for RTP header
355            }
356
357            if (emitSingleNALPacket) {
358                continue;
359            }
360        }
361
362        if (outBytesUsed + bytesNeeded <= out->capacity()) {
363            uint8_t *dst = out->data() + outBytesUsed;
364
365            if (outBytesUsed == 12) {
366                *dst++ = 24;  // STAP-A header
367            }
368
369            *dst++ = (nalSize >> 8) & 0xff;
370            *dst++ = nalSize & 0xff;
371            memcpy(dst, nalStart, nalSize);
372
373            outBytesUsed += bytesNeeded;
374            continue;
375        }
376
377        // This single NAL unit does not fit into a single RTP packet,
378        // we need to emit an FU-A.
379
380        CHECK_EQ(outBytesUsed, 12u);
381
382        uint8_t nalType = nalStart[0] & 0x1f;
383        uint8_t nri = (nalStart[0] >> 5) & 3;
384
385        size_t srcOffset = 1;
386        while (srcOffset < nalSize) {
387            size_t copy = out->capacity() - outBytesUsed - 2;
388            if (copy > nalSize - srcOffset) {
389                copy = nalSize - srcOffset;
390            }
391
392            uint8_t *dst = out->data() + outBytesUsed;
393            dst[0] = (nri << 5) | 28;
394
395            dst[1] = nalType;
396
397            if (srcOffset == 1) {
398                dst[1] |= 0x80;
399            }
400
401            if (srcOffset + copy == nalSize) {
402                dst[1] |= 0x40;
403            }
404
405            memcpy(&dst[2], nalStart + srcOffset, copy);
406            srcOffset += copy;
407
408            out->setRange(0, outBytesUsed + copy + 2);
409
410            packets.push_back(out);
411            out = new ABuffer(kMaxUDPPacketSize);
412            outBytesUsed = 12;  // Placeholder for RTP header
413        }
414    }
415
416    if (outBytesUsed > 12) {
417        out->setRange(0, outBytesUsed);
418        packets.push_back(out);
419    }
420
421    while (!packets.empty()) {
422        sp<ABuffer> out = *packets.begin();
423        packets.erase(packets.begin());
424
425        out->setInt32Data(mRTPSeqNo);
426
427        bool last = packets.empty();
428
429        uint8_t *dst = out->data();
430
431        dst[0] = 0x80;
432
433        dst[1] = packetType;
434        if (last) {
435            dst[1] |= 1 << 7;  // M-bit
436        }
437
438        dst[2] = (mRTPSeqNo >> 8) & 0xff;
439        dst[3] = mRTPSeqNo & 0xff;
440        ++mRTPSeqNo;
441
442        dst[4] = rtpTime >> 24;
443        dst[5] = (rtpTime >> 16) & 0xff;
444        dst[6] = (rtpTime >> 8) & 0xff;
445        dst[7] = rtpTime & 0xff;
446        dst[8] = kSourceID >> 24;
447        dst[9] = (kSourceID >> 16) & 0xff;
448        dst[10] = (kSourceID >> 8) & 0xff;
449        dst[11] = kSourceID & 0xff;
450
451        status_t err = sendRTPPacket(out, true /* storeInHistory */);
452
453        if (err != OK) {
454            return err;
455        }
456    }
457
458    return OK;
459}
460
461status_t RTPSender::sendRTPPacket(
462        const sp<ABuffer> &buffer, bool storeInHistory,
463        bool timeValid, int64_t timeUs) {
464    CHECK(mRTPConnected);
465
466    status_t err = mNetSession->sendRequest(
467            mRTPSessionID, buffer->data(), buffer->size(),
468            timeValid, timeUs);
469
470    if (err != OK) {
471        return err;
472    }
473
474    mLastNTPTime = GetNowNTP();
475    mLastRTPTime = U32_AT(buffer->data() + 4);
476
477    ++mNumRTPSent;
478    mNumRTPOctetsSent += buffer->size() - 12;
479
480    if (storeInHistory) {
481        if (mHistorySize == kMaxHistorySize) {
482            mHistory.erase(mHistory.begin());
483        } else {
484            ++mHistorySize;
485        }
486        mHistory.push_back(buffer);
487    }
488
489    return OK;
490}
491
492// static
493uint64_t RTPSender::GetNowNTP() {
494    struct timeval tv;
495    gettimeofday(&tv, NULL /* timezone */);
496
497    uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
498
499    nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
500
501    uint64_t hi = nowUs / 1000000ll;
502    uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
503
504    return (hi << 32) | lo;
505}
506
507void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
508    switch (msg->what()) {
509        case kWhatRTPNotify:
510        case kWhatRTCPNotify:
511            onNetNotify(msg->what() == kWhatRTPNotify, msg);
512            break;
513
514        default:
515            TRESPASS();
516    }
517}
518
519void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
520    int32_t reason;
521    CHECK(msg->findInt32("reason", &reason));
522
523    switch (reason) {
524        case ANetworkSession::kWhatError:
525        {
526            int32_t sessionID;
527            CHECK(msg->findInt32("sessionID", &sessionID));
528
529            int32_t err;
530            CHECK(msg->findInt32("err", &err));
531
532            int32_t errorOccuredDuringSend;
533            CHECK(msg->findInt32("send", &errorOccuredDuringSend));
534
535            AString detail;
536            CHECK(msg->findString("detail", &detail));
537
538            ALOGE("An error occurred during %s in session %d "
539                  "(%d, '%s' (%s)).",
540                  errorOccuredDuringSend ? "send" : "receive",
541                  sessionID,
542                  err,
543                  detail.c_str(),
544                  strerror(-err));
545
546            mNetSession->destroySession(sessionID);
547
548            if (sessionID == mRTPSessionID) {
549                mRTPSessionID = 0;
550            } else if (sessionID == mRTCPSessionID) {
551                mRTCPSessionID = 0;
552            }
553
554            if (!mRTPConnected
555                    || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
556                // We haven't completed initialization, attach the error
557                // to the notification instead.
558                notifyInitDone(err);
559                break;
560            }
561
562            notifyError(err);
563            break;
564        }
565
566        case ANetworkSession::kWhatDatagram:
567        {
568            sp<ABuffer> data;
569            CHECK(msg->findBuffer("data", &data));
570
571            if (isRTP) {
572                ALOGW("Huh? Received data on RTP connection...");
573            } else {
574                onRTCPData(data);
575            }
576            break;
577        }
578
579        case ANetworkSession::kWhatConnected:
580        {
581            int32_t sessionID;
582            CHECK(msg->findInt32("sessionID", &sessionID));
583
584            if  (isRTP) {
585                CHECK_EQ(mRTPMode, TRANSPORT_TCP);
586                CHECK_EQ(sessionID, mRTPSessionID);
587                mRTPConnected = true;
588            } else {
589                CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
590                CHECK_EQ(sessionID, mRTCPSessionID);
591                mRTCPConnected = true;
592            }
593
594            if (mRTPConnected
595                    && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
596                notifyInitDone(OK);
597            }
598            break;
599        }
600
601        case ANetworkSession::kWhatNetworkStall:
602        {
603            size_t numBytesQueued;
604            CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
605
606            notifyNetworkStall(numBytesQueued);
607            break;
608        }
609
610        default:
611            TRESPASS();
612    }
613}
614
615status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
616    const uint8_t *data = buffer->data();
617    size_t size = buffer->size();
618
619    while (size > 0) {
620        if (size < 8) {
621            // Too short to be a valid RTCP header
622            return ERROR_MALFORMED;
623        }
624
625        if ((data[0] >> 6) != 2) {
626            // Unsupported version.
627            return ERROR_UNSUPPORTED;
628        }
629
630        if (data[0] & 0x20) {
631            // Padding present.
632
633            size_t paddingLength = data[size - 1];
634
635            if (paddingLength + 12 > size) {
636                // If we removed this much padding we'd end up with something
637                // that's too short to be a valid RTP header.
638                return ERROR_MALFORMED;
639            }
640
641            size -= paddingLength;
642        }
643
644        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
645
646        if (size < headerLength) {
647            // Only received a partial packet?
648            return ERROR_MALFORMED;
649        }
650
651        switch (data[1]) {
652            case 200:
653            case 201:  // RR
654                parseReceiverReport(data, headerLength);
655                break;
656
657            case 202:  // SDES
658            case 203:
659                break;
660
661            case 204:  // APP
662                parseAPP(data, headerLength);
663                break;
664
665            case 205:  // TSFB (transport layer specific feedback)
666                parseTSFB(data, headerLength);
667                break;
668
669            case 206:  // PSFB (payload specific feedback)
670                // hexdump(data, headerLength);
671                break;
672
673            default:
674            {
675                ALOGW("Unknown RTCP packet type %u of size %d",
676                     (unsigned)data[1], headerLength);
677                break;
678            }
679        }
680
681        data += headerLength;
682        size -= headerLength;
683    }
684
685    return OK;
686}
687
688status_t RTPSender::parseReceiverReport(
689        const uint8_t *data, size_t /* size */) {
690    float fractionLost = data[12] / 256.0f;
691
692    ALOGI("lost %.2f %% of packets during report interval.",
693          100.0f * fractionLost);
694
695    return OK;
696}
697
698status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
699    if ((data[0] & 0x1f) != 1) {
700        return ERROR_UNSUPPORTED;  // We only support NACK for now.
701    }
702
703    uint32_t srcId = U32_AT(&data[8]);
704    if (srcId != kSourceID) {
705        return ERROR_MALFORMED;
706    }
707
708    for (size_t i = 12; i < size; i += 4) {
709        uint16_t seqNo = U16_AT(&data[i]);
710        uint16_t blp = U16_AT(&data[i + 2]);
711
712        List<sp<ABuffer> >::iterator it = mHistory.begin();
713        bool foundSeqNo = false;
714        while (it != mHistory.end()) {
715            const sp<ABuffer> &buffer = *it;
716
717            uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
718
719            bool retransmit = false;
720            if (bufferSeqNo == seqNo) {
721                retransmit = true;
722            } else if (blp != 0) {
723                for (size_t i = 0; i < 16; ++i) {
724                    if ((blp & (1 << i))
725                        && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
726                        blp &= ~(1 << i);
727                        retransmit = true;
728                    }
729                }
730            }
731
732            if (retransmit) {
733                ALOGV("retransmitting seqNo %d", bufferSeqNo);
734
735                CHECK_EQ((status_t)OK,
736                         sendRTPPacket(buffer, false /* storeInHistory */));
737
738                if (bufferSeqNo == seqNo) {
739                    foundSeqNo = true;
740                }
741
742                if (foundSeqNo && blp == 0) {
743                    break;
744                }
745            }
746
747            ++it;
748        }
749
750        if (!foundSeqNo || blp != 0) {
751            ALOGI("Some sequence numbers were no longer available for "
752                  "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
753                  seqNo, foundSeqNo, blp);
754
755            if (!mHistory.empty()) {
756                int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
757                int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
758
759                ALOGI("have seq numbers from %d - %d", earliest, latest);
760            }
761        }
762    }
763
764    return OK;
765}
766
767status_t RTPSender::parseAPP(const uint8_t *data, size_t size) {
768    if (!memcmp("late", &data[8], 4)) {
769        int64_t avgLatencyUs = (int64_t)U64_AT(&data[12]);
770        int64_t maxLatencyUs = (int64_t)U64_AT(&data[20]);
771
772        sp<AMessage> notify = mNotify->dup();
773        notify->setInt32("what", kWhatInformSender);
774        notify->setInt64("avgLatencyUs", avgLatencyUs);
775        notify->setInt64("maxLatencyUs", maxLatencyUs);
776        notify->post();
777    }
778
779    return OK;
780}
781
782void RTPSender::notifyInitDone(status_t err) {
783    sp<AMessage> notify = mNotify->dup();
784    notify->setInt32("what", kWhatInitDone);
785    notify->setInt32("err", err);
786    notify->post();
787}
788
789void RTPSender::notifyError(status_t err) {
790    sp<AMessage> notify = mNotify->dup();
791    notify->setInt32("what", kWhatError);
792    notify->setInt32("err", err);
793    notify->post();
794}
795
796void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
797    sp<AMessage> notify = mNotify->dup();
798    notify->setInt32("what", kWhatNetworkStall);
799    notify->setSize("numBytesQueued", numBytesQueued);
800    notify->post();
801}
802
803}  // namespace android
804
805