1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "Sender"
19#include <utils/Log.h>
20
21#include "Sender.h"
22
23#include "ANetworkSession.h"
24
25#include <media/stagefright/foundation/ABuffer.h>
26#include <media/stagefright/foundation/ADebug.h>
27#include <media/stagefright/foundation/AMessage.h>
28#include <media/stagefright/foundation/hexdump.h>
29#include <media/stagefright/MediaErrors.h>
30#include <media/stagefright/Utils.h>
31
32#include <math.h>
33
34#define DEBUG_JITTER    0
35
36namespace android {
37
38////////////////////////////////////////////////////////////////////////////////
39
40#if DEBUG_JITTER
// Sliding window over the most recent kHistorySize samples, used by the
// jitter-debugging code to report a running mean and standard deviation.
struct TimeSeries {
    TimeSeries();

    // Records |val|; once the window is full the oldest sample is dropped.
    void add(double val);

    // Arithmetic mean of the retained samples (0.0 while empty).
    double mean() const;

    // Standard deviation of the retained samples around mean(), dividing by
    // the sample count (0.0 while empty).
    double sdev() const;

private:
    enum {
        kHistorySize = 20
    };
    double mValues[kHistorySize];

    size_t mCount;
    double mSum;
};

TimeSeries::TimeSeries()
    : mCount(0),
      mSum(0.0) {
}

void TimeSeries::add(double val) {
    if (mCount >= kHistorySize) {
        // Window is full: retire the oldest sample and slide the rest down
        // by one slot before storing the new value at the end.
        mSum -= mValues[0];

        for (size_t slot = 1; slot < kHistorySize; ++slot) {
            mValues[slot - 1] = mValues[slot];
        }

        mValues[kHistorySize - 1] = val;
    } else {
        mValues[mCount++] = val;
    }

    mSum += val;
}

double TimeSeries::mean() const {
    return (mCount < 1) ? 0.0 : mSum / mCount;
}

double TimeSeries::sdev() const {
    if (mCount < 1) {
        return 0.0;
    }

    const double average = mean();

    double squaredTotal = 0.0;
    for (size_t slot = 0; slot < mCount; ++slot) {
        const double delta = mValues[slot] - average;
        const double squared = delta * delta;

        squaredTotal += squared;
    }

    return sqrt(squaredTotal / mCount);
}
101#endif  // DEBUG_JITTER
102
103////////////////////////////////////////////////////////////////////////////////
104
// Upper bound, in bytes, assumed for one RTP packet (header plus payload).
static const size_t kMaxRTPPacketSize = 1500;

// Number of whole 188-byte TS packets that fit into one RTP packet after the
// 12-byte RTP header.
static const size_t kMaxNumTSPacketsPerRTPPacket = (kMaxRTPPacketSize - 12) / 188;
107
// Constructs a Sender that performs its socket I/O through |netSession| and
// reports events to its owner by posting copies of |notify|.
//
// The TS queue is allocated with room for the 12-byte RTP header plus
// kMaxNumTSPacketsPerRTPPacket 188-byte TS packets. Session ids, ports,
// sequence numbers, counters and flags all start out as 0/false; the two
// "first output buffer" timestamps start at -1 (queuePackets() tests the
// ready time for < 0 to recognize the very first buffer).
Sender::Sender(
        const sp<ANetworkSession> &netSession,
        const sp<AMessage> &notify)
    : mNetSession(netSession),
      mNotify(notify),
      mTSQueue(new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188)),
      mTransportMode(TRANSPORT_UDP),
      mRTPChannel(0),
      mRTCPChannel(0),
      mRTPPort(0),
      mRTPSessionID(0),
      mRTCPSessionID(0),
#if ENABLE_RETRANSMISSION
      mRTPRetransmissionSessionID(0),
      mRTCPRetransmissionSessionID(0),
#endif
      mClientRTPPort(0),
      mClientRTCPPort(0),
      mRTPConnected(false),
      mRTCPConnected(false),
      mFirstOutputBufferReadyTimeUs(-1ll),
      mFirstOutputBufferSentTimeUs(-1ll),
      mRTPSeqNo(0),
#if ENABLE_RETRANSMISSION
      mRTPRetransmissionSeqNo(0),
#endif
      mLastNTPTime(0),
      mLastRTPTime(0),
      mNumRTPSent(0),
      mNumRTPOctetsSent(0),
      mNumSRsSent(0),
      mSendSRPending(false)
#if ENABLE_RETRANSMISSION
      ,mHistoryLength(0)
#endif
#if TRACK_BANDWIDTH
      ,mFirstPacketTimeUs(-1ll)
      ,mTotalBytesSent(0ll)
#endif
#if LOG_TRANSPORT_STREAM
    ,mLogFile(NULL)
#endif
{
    // Reserve the first 12 bytes of the queue for the RTP header; TS payload
    // gets appended behind it by appendTSData().
    mTSQueue->setRange(0, 12);

#if LOG_TRANSPORT_STREAM
    // Debug aid: dump every queued TS packet to a file. A failed fopen()
    // leaves mLogFile NULL, which the writers check for.
    mLogFile = fopen("/system/etc/log.ts", "wb");
#endif
}
157
158Sender::~Sender() {
159#if ENABLE_RETRANSMISSION
160    if (mRTCPRetransmissionSessionID != 0) {
161        mNetSession->destroySession(mRTCPRetransmissionSessionID);
162    }
163
164    if (mRTPRetransmissionSessionID != 0) {
165        mNetSession->destroySession(mRTPRetransmissionSessionID);
166    }
167#endif
168
169    if (mRTCPSessionID != 0) {
170        mNetSession->destroySession(mRTCPSessionID);
171    }
172
173    if (mRTPSessionID != 0) {
174        mNetSession->destroySession(mRTPSessionID);
175    }
176
177#if LOG_TRANSPORT_STREAM
178    if (mLogFile != NULL) {
179        fclose(mLogFile);
180        mLogFile = NULL;
181    }
182#endif
183}
184
185status_t Sender::init(
186        const char *clientIP, int32_t clientRtp, int32_t clientRtcp,
187        TransportMode transportMode) {
188    mClientIP = clientIP;
189    mTransportMode = transportMode;
190
191    if (transportMode == TRANSPORT_TCP_INTERLEAVED) {
192        mRTPChannel = clientRtp;
193        mRTCPChannel = clientRtcp;
194        mRTPPort = 0;
195        mRTPSessionID = 0;
196        mRTCPSessionID = 0;
197        return OK;
198    }
199
200    mRTPChannel = 0;
201    mRTCPChannel = 0;
202
203    if (mTransportMode == TRANSPORT_TCP) {
204        // XXX This is wrong, we need to allocate sockets here, we only
205        // need to do this because the dongles are not establishing their
206        // end until after PLAY instead of before SETUP.
207        mRTPPort = 20000;
208        mRTPSessionID = 0;
209        mRTCPSessionID = 0;
210        mClientRTPPort = clientRtp;
211        mClientRTCPPort = clientRtcp;
212        return OK;
213    }
214
215    int serverRtp;
216
217    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
218    sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
219
220#if ENABLE_RETRANSMISSION
221    sp<AMessage> rtpRetransmissionNotify =
222        new AMessage(kWhatRTPRetransmissionNotify, id());
223
224    sp<AMessage> rtcpRetransmissionNotify =
225        new AMessage(kWhatRTCPRetransmissionNotify, id());
226#endif
227
228    status_t err;
229    for (serverRtp = 15550;; serverRtp += 2) {
230        int32_t rtpSession;
231        if (mTransportMode == TRANSPORT_UDP) {
232            err = mNetSession->createUDPSession(
233                        serverRtp, clientIP, clientRtp,
234                        rtpNotify, &rtpSession);
235        } else {
236            err = mNetSession->createTCPDatagramSession(
237                        serverRtp, clientIP, clientRtp,
238                        rtpNotify, &rtpSession);
239        }
240
241        if (err != OK) {
242            ALOGI("failed to create RTP socket on port %d", serverRtp);
243            continue;
244        }
245
246        int32_t rtcpSession = 0;
247
248        if (clientRtcp >= 0) {
249            if (mTransportMode == TRANSPORT_UDP) {
250                err = mNetSession->createUDPSession(
251                        serverRtp + 1, clientIP, clientRtcp,
252                        rtcpNotify, &rtcpSession);
253            } else {
254                err = mNetSession->createTCPDatagramSession(
255                        serverRtp + 1, clientIP, clientRtcp,
256                        rtcpNotify, &rtcpSession);
257            }
258
259            if (err != OK) {
260                ALOGI("failed to create RTCP socket on port %d", serverRtp + 1);
261
262                mNetSession->destroySession(rtpSession);
263                continue;
264            }
265        }
266
267#if ENABLE_RETRANSMISSION
268        if (mTransportMode == TRANSPORT_UDP) {
269            int32_t rtpRetransmissionSession;
270
271            err = mNetSession->createUDPSession(
272                        serverRtp + kRetransmissionPortOffset,
273                        clientIP,
274                        clientRtp + kRetransmissionPortOffset,
275                        rtpRetransmissionNotify,
276                        &rtpRetransmissionSession);
277
278            if (err != OK) {
279                mNetSession->destroySession(rtcpSession);
280                mNetSession->destroySession(rtpSession);
281                continue;
282            }
283
284            CHECK_GE(clientRtcp, 0);
285
286            int32_t rtcpRetransmissionSession;
287            err = mNetSession->createUDPSession(
288                        serverRtp + 1 + kRetransmissionPortOffset,
289                        clientIP,
290                        clientRtp + 1 + kRetransmissionPortOffset,
291                        rtcpRetransmissionNotify,
292                        &rtcpRetransmissionSession);
293
294            if (err != OK) {
295                mNetSession->destroySession(rtpRetransmissionSession);
296                mNetSession->destroySession(rtcpSession);
297                mNetSession->destroySession(rtpSession);
298                continue;
299            }
300
301            mRTPRetransmissionSessionID = rtpRetransmissionSession;
302            mRTCPRetransmissionSessionID = rtcpRetransmissionSession;
303
304            ALOGI("rtpRetransmissionSessionID = %d, "
305                  "rtcpRetransmissionSessionID = %d",
306                  rtpRetransmissionSession, rtcpRetransmissionSession);
307        }
308#endif
309
310        mRTPPort = serverRtp;
311        mRTPSessionID = rtpSession;
312        mRTCPSessionID = rtcpSession;
313
314        ALOGI("rtpSessionID = %d, rtcpSessionID = %d", rtpSession, rtcpSession);
315        break;
316    }
317
318    if (mRTPPort == 0) {
319        return UNKNOWN_ERROR;
320    }
321
322    return OK;
323}
324
325status_t Sender::finishInit() {
326    if (mTransportMode != TRANSPORT_TCP) {
327        notifyInitDone();
328        return OK;
329    }
330
331    sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, id());
332
333    status_t err = mNetSession->createTCPDatagramSession(
334                mRTPPort, mClientIP.c_str(), mClientRTPPort,
335                rtpNotify, &mRTPSessionID);
336
337    if (err != OK) {
338        return err;
339    }
340
341    if (mClientRTCPPort >= 0) {
342        sp<AMessage> rtcpNotify = new AMessage(kWhatRTCPNotify, id());
343
344        err = mNetSession->createTCPDatagramSession(
345                mRTPPort + 1, mClientIP.c_str(), mClientRTCPPort,
346                rtcpNotify, &mRTCPSessionID);
347
348        if (err != OK) {
349            return err;
350        }
351    }
352
353    return OK;
354}
355
356int32_t Sender::getRTPPort() const {
357    return mRTPPort;
358}
359
360void Sender::queuePackets(
361        int64_t timeUs, const sp<ABuffer> &packets) {
362    bool isVideo = false;
363
364    int32_t dummy;
365    if (packets->meta()->findInt32("isVideo", &dummy)) {
366        isVideo = true;
367    }
368
369    int64_t delayUs;
370    int64_t whenUs;
371
372    if (mFirstOutputBufferReadyTimeUs < 0ll) {
373        mFirstOutputBufferReadyTimeUs = timeUs;
374        mFirstOutputBufferSentTimeUs = whenUs = ALooper::GetNowUs();
375        delayUs = 0ll;
376    } else {
377        int64_t nowUs = ALooper::GetNowUs();
378
379        whenUs = (timeUs - mFirstOutputBufferReadyTimeUs)
380                + mFirstOutputBufferSentTimeUs;
381
382        delayUs = whenUs - nowUs;
383    }
384
385    sp<AMessage> msg = new AMessage(kWhatQueuePackets, id());
386    msg->setBuffer("packets", packets);
387
388    packets->meta()->setInt64("timeUs", timeUs);
389    packets->meta()->setInt64("whenUs", whenUs);
390    packets->meta()->setInt64("delayUs", delayUs);
391    msg->post(delayUs > 0 ? delayUs : 0);
392}
393
394void Sender::onMessageReceived(const sp<AMessage> &msg) {
395    switch (msg->what()) {
396        case kWhatRTPNotify:
397        case kWhatRTCPNotify:
398#if ENABLE_RETRANSMISSION
399        case kWhatRTPRetransmissionNotify:
400        case kWhatRTCPRetransmissionNotify:
401#endif
402        {
403            int32_t reason;
404            CHECK(msg->findInt32("reason", &reason));
405
406            switch (reason) {
407                case ANetworkSession::kWhatError:
408                {
409                    int32_t sessionID;
410                    CHECK(msg->findInt32("sessionID", &sessionID));
411
412                    int32_t err;
413                    CHECK(msg->findInt32("err", &err));
414
415                    int32_t errorOccuredDuringSend;
416                    CHECK(msg->findInt32("send", &errorOccuredDuringSend));
417
418                    AString detail;
419                    CHECK(msg->findString("detail", &detail));
420
421                    if ((msg->what() == kWhatRTPNotify
422#if ENABLE_RETRANSMISSION
423                            || msg->what() == kWhatRTPRetransmissionNotify
424#endif
425                        ) && !errorOccuredDuringSend) {
426                        // This is ok, we don't expect to receive anything on
427                        // the RTP socket.
428                        break;
429                    }
430
431                    ALOGE("An error occurred during %s in session %d "
432                          "(%d, '%s' (%s)).",
433                          errorOccuredDuringSend ? "send" : "receive",
434                          sessionID,
435                          err,
436                          detail.c_str(),
437                          strerror(-err));
438
439                    mNetSession->destroySession(sessionID);
440
441                    if (sessionID == mRTPSessionID) {
442                        mRTPSessionID = 0;
443                    } else if (sessionID == mRTCPSessionID) {
444                        mRTCPSessionID = 0;
445                    }
446#if ENABLE_RETRANSMISSION
447                    else if (sessionID == mRTPRetransmissionSessionID) {
448                        mRTPRetransmissionSessionID = 0;
449                    } else if (sessionID == mRTCPRetransmissionSessionID) {
450                        mRTCPRetransmissionSessionID = 0;
451                    }
452#endif
453
454                    notifySessionDead();
455                    break;
456                }
457
458                case ANetworkSession::kWhatDatagram:
459                {
460                    int32_t sessionID;
461                    CHECK(msg->findInt32("sessionID", &sessionID));
462
463                    sp<ABuffer> data;
464                    CHECK(msg->findBuffer("data", &data));
465
466                    status_t err;
467                    if (msg->what() == kWhatRTCPNotify
468#if ENABLE_RETRANSMISSION
469                            || msg->what() == kWhatRTCPRetransmissionNotify
470#endif
471                       )
472                    {
473                        err = parseRTCP(data);
474                    }
475                    break;
476                }
477
478                case ANetworkSession::kWhatConnected:
479                {
480                    CHECK_EQ(mTransportMode, TRANSPORT_TCP);
481
482                    int32_t sessionID;
483                    CHECK(msg->findInt32("sessionID", &sessionID));
484
485                    if (sessionID == mRTPSessionID) {
486                        CHECK(!mRTPConnected);
487                        mRTPConnected = true;
488                        ALOGI("RTP Session now connected.");
489                    } else if (sessionID == mRTCPSessionID) {
490                        CHECK(!mRTCPConnected);
491                        mRTCPConnected = true;
492                        ALOGI("RTCP Session now connected.");
493                    } else {
494                        TRESPASS();
495                    }
496
497                    if (mRTPConnected
498                            && (mClientRTCPPort < 0 || mRTCPConnected)) {
499                        notifyInitDone();
500                    }
501                    break;
502                }
503
504                default:
505                    TRESPASS();
506            }
507            break;
508        }
509
510        case kWhatQueuePackets:
511        {
512            sp<ABuffer> packets;
513            CHECK(msg->findBuffer("packets", &packets));
514
515            onQueuePackets(packets);
516            break;
517        }
518
519        case kWhatSendSR:
520        {
521            mSendSRPending = false;
522
523            if (mRTCPSessionID == 0) {
524                break;
525            }
526
527            onSendSR();
528
529            scheduleSendSR();
530            break;
531        }
532    }
533}
534
535void Sender::onQueuePackets(const sp<ABuffer> &packets) {
536#if DEBUG_JITTER
537    int32_t dummy;
538    if (packets->meta()->findInt32("isVideo", &dummy)) {
539        static int64_t lastTimeUs = 0ll;
540        int64_t nowUs = ALooper::GetNowUs();
541
542        static TimeSeries series;
543        series.add((double)(nowUs - lastTimeUs));
544
545        ALOGI("deltaTimeUs = %lld us, mean %.2f, sdev %.2f",
546              nowUs - lastTimeUs, series.mean(), series.sdev());
547
548        lastTimeUs = nowUs;
549    }
550#endif
551
552    int64_t startTimeUs = ALooper::GetNowUs();
553
554    for (size_t offset = 0;
555            offset < packets->size(); offset += 188) {
556        bool lastTSPacket = (offset + 188 >= packets->size());
557
558        appendTSData(
559                packets->data() + offset,
560                188,
561                true /* timeDiscontinuity */,
562                lastTSPacket /* flush */);
563    }
564
565#if 0
566    int64_t netTimeUs = ALooper::GetNowUs() - startTimeUs;
567
568    int64_t whenUs;
569    CHECK(packets->meta()->findInt64("whenUs", &whenUs));
570
571    int64_t delayUs;
572    CHECK(packets->meta()->findInt64("delayUs", &delayUs));
573
574    bool isVideo = false;
575    int32_t dummy;
576    if (packets->meta()->findInt32("isVideo", &dummy)) {
577        isVideo = true;
578    }
579
580    int64_t nowUs = ALooper::GetNowUs();
581
582    if (nowUs - whenUs > 2000) {
583        ALOGI("[%s] delayUs = %lld us, delta = %lld us",
584              isVideo ? "video" : "audio", delayUs, nowUs - netTimeUs - whenUs);
585    }
586#endif
587
588#if LOG_TRANSPORT_STREAM
589    if (mLogFile != NULL) {
590        fwrite(packets->data(), 1, packets->size(), mLogFile);
591    }
592#endif
593}
594
595ssize_t Sender::appendTSData(
596        const void *data, size_t size, bool timeDiscontinuity, bool flush) {
597    CHECK_EQ(size, 188);
598
599    CHECK_LE(mTSQueue->size() + size, mTSQueue->capacity());
600
601    memcpy(mTSQueue->data() + mTSQueue->size(), data, size);
602    mTSQueue->setRange(0, mTSQueue->size() + size);
603
604    if (flush || mTSQueue->size() == mTSQueue->capacity()) {
605        // flush
606
607        int64_t nowUs = ALooper::GetNowUs();
608
609#if TRACK_BANDWIDTH
610        if (mFirstPacketTimeUs < 0ll) {
611            mFirstPacketTimeUs = nowUs;
612        }
613#endif
614
615        // 90kHz time scale
616        uint32_t rtpTime = (nowUs * 9ll) / 100ll;
617
618        uint8_t *rtp = mTSQueue->data();
619        rtp[0] = 0x80;
620        rtp[1] = 33 | (timeDiscontinuity ? (1 << 7) : 0);  // M-bit
621        rtp[2] = (mRTPSeqNo >> 8) & 0xff;
622        rtp[3] = mRTPSeqNo & 0xff;
623        rtp[4] = rtpTime >> 24;
624        rtp[5] = (rtpTime >> 16) & 0xff;
625        rtp[6] = (rtpTime >> 8) & 0xff;
626        rtp[7] = rtpTime & 0xff;
627        rtp[8] = kSourceID >> 24;
628        rtp[9] = (kSourceID >> 16) & 0xff;
629        rtp[10] = (kSourceID >> 8) & 0xff;
630        rtp[11] = kSourceID & 0xff;
631
632        ++mRTPSeqNo;
633        ++mNumRTPSent;
634        mNumRTPOctetsSent += mTSQueue->size() - 12;
635
636        mLastRTPTime = rtpTime;
637        mLastNTPTime = GetNowNTP();
638
639        if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
640            sp<AMessage> notify = mNotify->dup();
641            notify->setInt32("what", kWhatBinaryData);
642
643            sp<ABuffer> data = new ABuffer(mTSQueue->size());
644            memcpy(data->data(), rtp, mTSQueue->size());
645
646            notify->setInt32("channel", mRTPChannel);
647            notify->setBuffer("data", data);
648            notify->post();
649        } else {
650            sendPacket(mRTPSessionID, rtp, mTSQueue->size());
651
652#if TRACK_BANDWIDTH
653            mTotalBytesSent += mTSQueue->size();
654            int64_t delayUs = ALooper::GetNowUs() - mFirstPacketTimeUs;
655
656            if (delayUs > 0ll) {
657                ALOGI("approx. net bandwidth used: %.2f Mbit/sec",
658                        mTotalBytesSent * 8.0 / delayUs);
659            }
660#endif
661        }
662
663#if ENABLE_RETRANSMISSION
664        mTSQueue->setInt32Data(mRTPSeqNo - 1);
665
666        mHistory.push_back(mTSQueue);
667        ++mHistoryLength;
668
669        if (mHistoryLength > kMaxHistoryLength) {
670            mTSQueue = *mHistory.begin();
671            mHistory.erase(mHistory.begin());
672
673            --mHistoryLength;
674        } else {
675            mTSQueue = new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
676        }
677#endif
678
679        mTSQueue->setRange(0, 12);
680    }
681
682    return size;
683}
684
685void Sender::scheduleSendSR() {
686    if (mSendSRPending || mRTCPSessionID == 0) {
687        return;
688    }
689
690    mSendSRPending = true;
691    (new AMessage(kWhatSendSR, id()))->post(kSendSRIntervalUs);
692}
693
694void Sender::addSR(const sp<ABuffer> &buffer) {
695    uint8_t *data = buffer->data() + buffer->size();
696
697    // TODO: Use macros/utility functions to clean up all the bitshifts below.
698
699    data[0] = 0x80 | 0;
700    data[1] = 200;  // SR
701    data[2] = 0;
702    data[3] = 6;
703    data[4] = kSourceID >> 24;
704    data[5] = (kSourceID >> 16) & 0xff;
705    data[6] = (kSourceID >> 8) & 0xff;
706    data[7] = kSourceID & 0xff;
707
708    data[8] = mLastNTPTime >> (64 - 8);
709    data[9] = (mLastNTPTime >> (64 - 16)) & 0xff;
710    data[10] = (mLastNTPTime >> (64 - 24)) & 0xff;
711    data[11] = (mLastNTPTime >> 32) & 0xff;
712    data[12] = (mLastNTPTime >> 24) & 0xff;
713    data[13] = (mLastNTPTime >> 16) & 0xff;
714    data[14] = (mLastNTPTime >> 8) & 0xff;
715    data[15] = mLastNTPTime & 0xff;
716
717    data[16] = (mLastRTPTime >> 24) & 0xff;
718    data[17] = (mLastRTPTime >> 16) & 0xff;
719    data[18] = (mLastRTPTime >> 8) & 0xff;
720    data[19] = mLastRTPTime & 0xff;
721
722    data[20] = mNumRTPSent >> 24;
723    data[21] = (mNumRTPSent >> 16) & 0xff;
724    data[22] = (mNumRTPSent >> 8) & 0xff;
725    data[23] = mNumRTPSent & 0xff;
726
727    data[24] = mNumRTPOctetsSent >> 24;
728    data[25] = (mNumRTPOctetsSent >> 16) & 0xff;
729    data[26] = (mNumRTPOctetsSent >> 8) & 0xff;
730    data[27] = mNumRTPOctetsSent & 0xff;
731
732    buffer->setRange(buffer->offset(), buffer->size() + 28);
733}
734
735void Sender::addSDES(const sp<ABuffer> &buffer) {
736    uint8_t *data = buffer->data() + buffer->size();
737    data[0] = 0x80 | 1;
738    data[1] = 202;  // SDES
739    data[4] = kSourceID >> 24;
740    data[5] = (kSourceID >> 16) & 0xff;
741    data[6] = (kSourceID >> 8) & 0xff;
742    data[7] = kSourceID & 0xff;
743
744    size_t offset = 8;
745
746    data[offset++] = 1;  // CNAME
747
748    static const char *kCNAME = "someone@somewhere";
749    data[offset++] = strlen(kCNAME);
750
751    memcpy(&data[offset], kCNAME, strlen(kCNAME));
752    offset += strlen(kCNAME);
753
754    data[offset++] = 7;  // NOTE
755
756    static const char *kNOTE = "Hell's frozen over.";
757    data[offset++] = strlen(kNOTE);
758
759    memcpy(&data[offset], kNOTE, strlen(kNOTE));
760    offset += strlen(kNOTE);
761
762    data[offset++] = 0;
763
764    if ((offset % 4) > 0) {
765        size_t count = 4 - (offset % 4);
766        switch (count) {
767            case 3:
768                data[offset++] = 0;
769            case 2:
770                data[offset++] = 0;
771            case 1:
772                data[offset++] = 0;
773        }
774    }
775
776    size_t numWords = (offset / 4) - 1;
777    data[2] = numWords >> 8;
778    data[3] = numWords & 0xff;
779
780    buffer->setRange(buffer->offset(), buffer->size() + offset);
781}
782
783// static
784uint64_t Sender::GetNowNTP() {
785    uint64_t nowUs = ALooper::GetNowUs();
786
787    nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
788
789    uint64_t hi = nowUs / 1000000ll;
790    uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
791
792    return (hi << 32) | lo;
793}
794
795void Sender::onSendSR() {
796    sp<ABuffer> buffer = new ABuffer(1500);
797    buffer->setRange(0, 0);
798
799    addSR(buffer);
800    addSDES(buffer);
801
802    if (mTransportMode == TRANSPORT_TCP_INTERLEAVED) {
803        sp<AMessage> notify = mNotify->dup();
804        notify->setInt32("what", kWhatBinaryData);
805        notify->setInt32("channel", mRTCPChannel);
806        notify->setBuffer("data", buffer);
807        notify->post();
808    } else {
809        sendPacket(mRTCPSessionID, buffer->data(), buffer->size());
810    }
811
812    ++mNumSRsSent;
813}
814
815#if ENABLE_RETRANSMISSION
816status_t Sender::parseTSFB(
817        const uint8_t *data, size_t size) {
818    if ((data[0] & 0x1f) != 1) {
819        return ERROR_UNSUPPORTED;  // We only support NACK for now.
820    }
821
822    uint32_t srcId = U32_AT(&data[8]);
823    if (srcId != kSourceID) {
824        return ERROR_MALFORMED;
825    }
826
827    for (size_t i = 12; i < size; i += 4) {
828        uint16_t seqNo = U16_AT(&data[i]);
829        uint16_t blp = U16_AT(&data[i + 2]);
830
831        List<sp<ABuffer> >::iterator it = mHistory.begin();
832        bool foundSeqNo = false;
833        while (it != mHistory.end()) {
834            const sp<ABuffer> &buffer = *it;
835
836            uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
837
838            bool retransmit = false;
839            if (bufferSeqNo == seqNo) {
840                retransmit = true;
841            } else if (blp != 0) {
842                for (size_t i = 0; i < 16; ++i) {
843                    if ((blp & (1 << i))
844                        && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
845                        blp &= ~(1 << i);
846                        retransmit = true;
847                    }
848                }
849            }
850
851            if (retransmit) {
852                ALOGI("retransmitting seqNo %d", bufferSeqNo);
853
854                sp<ABuffer> retransRTP = new ABuffer(2 + buffer->size());
855                uint8_t *rtp = retransRTP->data();
856                memcpy(rtp, buffer->data(), 12);
857                rtp[2] = (mRTPRetransmissionSeqNo >> 8) & 0xff;
858                rtp[3] = mRTPRetransmissionSeqNo & 0xff;
859                rtp[12] = (bufferSeqNo >> 8) & 0xff;
860                rtp[13] = bufferSeqNo & 0xff;
861                memcpy(&rtp[14], buffer->data() + 12, buffer->size() - 12);
862
863                ++mRTPRetransmissionSeqNo;
864
865                sendPacket(
866                        mRTPRetransmissionSessionID,
867                        retransRTP->data(), retransRTP->size());
868
869                if (bufferSeqNo == seqNo) {
870                    foundSeqNo = true;
871                }
872
873                if (foundSeqNo && blp == 0) {
874                    break;
875                }
876            }
877
878            ++it;
879        }
880
881        if (!foundSeqNo || blp != 0) {
882            ALOGI("Some sequence numbers were no longer available for "
883                  "retransmission");
884        }
885    }
886
887    return OK;
888}
889#endif
890
891status_t Sender::parseRTCP(
892        const sp<ABuffer> &buffer) {
893    const uint8_t *data = buffer->data();
894    size_t size = buffer->size();
895
896    while (size > 0) {
897        if (size < 8) {
898            // Too short to be a valid RTCP header
899            return ERROR_MALFORMED;
900        }
901
902        if ((data[0] >> 6) != 2) {
903            // Unsupported version.
904            return ERROR_UNSUPPORTED;
905        }
906
907        if (data[0] & 0x20) {
908            // Padding present.
909
910            size_t paddingLength = data[size - 1];
911
912            if (paddingLength + 12 > size) {
913                // If we removed this much padding we'd end up with something
914                // that's too short to be a valid RTP header.
915                return ERROR_MALFORMED;
916            }
917
918            size -= paddingLength;
919        }
920
921        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
922
923        if (size < headerLength) {
924            // Only received a partial packet?
925            return ERROR_MALFORMED;
926        }
927
928        switch (data[1]) {
929            case 200:
930            case 201:  // RR
931            case 202:  // SDES
932            case 203:
933            case 204:  // APP
934                break;
935
936#if ENABLE_RETRANSMISSION
937            case 205:  // TSFB (transport layer specific feedback)
938                parseTSFB(data, headerLength);
939                break;
940#endif
941
942            case 206:  // PSFB (payload specific feedback)
943                hexdump(data, headerLength);
944                break;
945
946            default:
947            {
948                ALOGW("Unknown RTCP packet type %u of size %d",
949                     (unsigned)data[1], headerLength);
950                break;
951            }
952        }
953
954        data += headerLength;
955        size -= headerLength;
956    }
957
958    return OK;
959}
960
961status_t Sender::sendPacket(
962        int32_t sessionID, const void *data, size_t size) {
963    return mNetSession->sendRequest(sessionID, data, size);
964}
965
966void Sender::notifyInitDone() {
967    sp<AMessage> notify = mNotify->dup();
968    notify->setInt32("what", kWhatInitDone);
969    notify->post();
970}
971
972void Sender::notifySessionDead() {
973    sp<AMessage> notify = mNotify->dup();
974    notify->setInt32("what", kWhatSessionDead);
975    notify->post();
976}
977
978}  // namespace android
979
980