MyHandler.h revision ec29a2bfb5364a5968b77559fd13821b827d173a
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
45// If no access units are received within 5 secs, assume that the rtp
46// stream has ended and signal end of stream.
47static int64_t kAccessUnitTimeoutUs = 10000000ll;
48
49// If no access units arrive for the first 10 secs after starting the
50// stream, assume none ever will and signal EOS or switch transports.
51static int64_t kStartupTimeoutUs = 10000000ll;
52
53static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54
55namespace android {
56
57static void MakeUserAgentString(AString *s) {
58    s->setTo("stagefright/1.1 (Linux;Android ");
59
60#if (PROPERTY_VALUE_MAX < 8)
61#error "PROPERTY_VALUE_MAX must be at least 8"
62#endif
63
64    char value[PROPERTY_VALUE_MAX];
65    property_get("ro.build.version.release", value, "Unknown");
66    s->append(value);
67    s->append(")");
68}
69
70static bool GetAttribute(const char *s, const char *key, AString *value) {
71    value->clear();
72
73    size_t keyLen = strlen(key);
74
75    for (;;) {
76        while (isspace(*s)) {
77            ++s;
78        }
79
80        const char *colonPos = strchr(s, ';');
81
82        size_t len =
83            (colonPos == NULL) ? strlen(s) : colonPos - s;
84
85        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
86            value->setTo(&s[keyLen + 1], len - keyLen - 1);
87            return true;
88        }
89
90        if (colonPos == NULL) {
91            return false;
92        }
93
94        s = colonPos + 1;
95    }
96}
97
98struct MyHandler : public AHandler {
99    enum {
100        kWhatConnected                  = 'conn',
101        kWhatDisconnected               = 'disc',
102        kWhatSeekDone                   = 'sdon',
103
104        kWhatAccessUnit                 = 'accU',
105        kWhatEOS                        = 'eos!',
106        kWhatSeekDiscontinuity          = 'seeD',
107        kWhatNormalPlayTimeMapping      = 'nptM',
108    };
109
110    MyHandler(
111            const char *url,
112            const sp<AMessage> &notify,
113            bool uidValid = false, uid_t uid = 0)
114        : mNotify(notify),
115          mUIDValid(uidValid),
116          mUID(uid),
117          mNetLooper(new ALooper),
118          mConn(new ARTSPConnection(mUIDValid, mUID)),
119          mRTPConn(new ARTPConnection),
120          mOriginalSessionURL(url),
121          mSessionURL(url),
122          mSetupTracksSuccessful(false),
123          mSeekPending(false),
124          mFirstAccessUnit(true),
125          mAllTracksHaveTime(false),
126          mNTPAnchorUs(-1),
127          mMediaAnchorUs(-1),
128          mLastMediaTimeUs(0),
129          mNumAccessUnitsReceived(0),
130          mCheckPending(false),
131          mCheckGeneration(0),
132          mTryTCPInterleaving(false),
133          mTryFakeRTCP(false),
134          mReceivedFirstRTCPPacket(false),
135          mReceivedFirstRTPPacket(false),
136          mSeekable(true),
137          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
138          mKeepAliveGeneration(0) {
139        mNetLooper->setName("rtsp net");
140        mNetLooper->start(false /* runOnCallingThread */,
141                          false /* canCallJava */,
142                          PRIORITY_HIGHEST);
143
144        // Strip any authentication info from the session url, we don't
145        // want to transmit user/pass in cleartext.
146        AString host, path, user, pass;
147        unsigned port;
148        CHECK(ARTSPConnection::ParseURL(
149                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
150
151        if (user.size() > 0) {
152            mSessionURL.clear();
153            mSessionURL.append("rtsp://");
154            mSessionURL.append(host);
155            mSessionURL.append(":");
156            mSessionURL.append(StringPrintf("%u", port));
157            mSessionURL.append(path);
158
159            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
160        }
161
162        mSessionHost = host;
163    }
164
165    void connect() {
166        looper()->registerHandler(mConn);
167        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
168
169        sp<AMessage> notify = new AMessage('biny', id());
170        mConn->observeBinaryData(notify);
171
172        sp<AMessage> reply = new AMessage('conn', id());
173        mConn->connect(mOriginalSessionURL.c_str(), reply);
174    }
175
176    void loadSDP(const sp<ASessionDescription>& desc) {
177        looper()->registerHandler(mConn);
178        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
179
180        sp<AMessage> notify = new AMessage('biny', id());
181        mConn->observeBinaryData(notify);
182
183        sp<AMessage> reply = new AMessage('sdpl', id());
184        reply->setObject("description", desc);
185        mConn->connect(mOriginalSessionURL.c_str(), reply);
186    }
187
188    void disconnect() {
189        (new AMessage('abor', id()))->post();
190    }
191
192    void seek(int64_t timeUs) {
193        sp<AMessage> msg = new AMessage('seek', id());
194        msg->setInt64("time", timeUs);
195        msg->post();
196    }
197
198    static void addRR(const sp<ABuffer> &buf) {
199        uint8_t *ptr = buf->data() + buf->size();
200        ptr[0] = 0x80 | 0;
201        ptr[1] = 201;  // RR
202        ptr[2] = 0;
203        ptr[3] = 1;
204        ptr[4] = 0xde;  // SSRC
205        ptr[5] = 0xad;
206        ptr[6] = 0xbe;
207        ptr[7] = 0xef;
208
209        buf->setRange(0, buf->size() + 8);
210    }
211
212    static void addSDES(int s, const sp<ABuffer> &buffer) {
213        struct sockaddr_in addr;
214        socklen_t addrSize = sizeof(addr);
215        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
216
217        uint8_t *data = buffer->data() + buffer->size();
218        data[0] = 0x80 | 1;
219        data[1] = 202;  // SDES
220        data[4] = 0xde;  // SSRC
221        data[5] = 0xad;
222        data[6] = 0xbe;
223        data[7] = 0xef;
224
225        size_t offset = 8;
226
227        data[offset++] = 1;  // CNAME
228
229        AString cname = "stagefright@";
230        cname.append(inet_ntoa(addr.sin_addr));
231        data[offset++] = cname.size();
232
233        memcpy(&data[offset], cname.c_str(), cname.size());
234        offset += cname.size();
235
236        data[offset++] = 6;  // TOOL
237
238        AString tool;
239        MakeUserAgentString(&tool);
240
241        data[offset++] = tool.size();
242
243        memcpy(&data[offset], tool.c_str(), tool.size());
244        offset += tool.size();
245
246        data[offset++] = 0;
247
248        if ((offset % 4) > 0) {
249            size_t count = 4 - (offset % 4);
250            switch (count) {
251                case 3:
252                    data[offset++] = 0;
253                case 2:
254                    data[offset++] = 0;
255                case 1:
256                    data[offset++] = 0;
257            }
258        }
259
260        size_t numWords = (offset / 4) - 1;
261        data[2] = numWords >> 8;
262        data[3] = numWords & 0xff;
263
264        buffer->setRange(buffer->offset(), buffer->size() + offset);
265    }
266
267    // In case we're behind NAT, fire off two UDP packets to the remote
268    // rtp/rtcp ports to poke a hole into the firewall for future incoming
269    // packets. We're going to send an RR/SDES RTCP packet to both of them.
270    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
271        struct sockaddr_in addr;
272        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
273        addr.sin_family = AF_INET;
274
275        AString source;
276        AString server_port;
277        if (!GetAttribute(transport.c_str(),
278                          "source",
279                          &source)) {
280            ALOGW("Missing 'source' field in Transport response. Using "
281                 "RTSP endpoint address.");
282
283            struct hostent *ent = gethostbyname(mSessionHost.c_str());
284            if (ent == NULL) {
285                ALOGE("Failed to look up address of session host '%s'",
286                     mSessionHost.c_str());
287
288                return false;
289            }
290
291            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
292        } else {
293            addr.sin_addr.s_addr = inet_addr(source.c_str());
294        }
295
296        if (!GetAttribute(transport.c_str(),
297                                 "server_port",
298                                 &server_port)) {
299            ALOGI("Missing 'server_port' field in Transport response.");
300            return false;
301        }
302
303        int rtpPort, rtcpPort;
304        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
305                || rtpPort <= 0 || rtpPort > 65535
306                || rtcpPort <=0 || rtcpPort > 65535
307                || rtcpPort != rtpPort + 1) {
308            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
309                 " RTP port must be even, RTCP port must be one higher.",
310                 server_port.c_str());
311
312            return false;
313        }
314
315        if (rtpPort & 1) {
316            ALOGW("Server picked an odd RTP port, it should've picked an "
317                 "even one, we'll let it pass for now, but this may break "
318                 "in the future.");
319        }
320
321        if (addr.sin_addr.s_addr == INADDR_NONE) {
322            return true;
323        }
324
325        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
326            // No firewalls to traverse on the loopback interface.
327            return true;
328        }
329
330        // Make up an RR/SDES RTCP packet.
331        sp<ABuffer> buf = new ABuffer(65536);
332        buf->setRange(0, 0);
333        addRR(buf);
334        addSDES(rtpSocket, buf);
335
336        addr.sin_port = htons(rtpPort);
337
338        ssize_t n = sendto(
339                rtpSocket, buf->data(), buf->size(), 0,
340                (const sockaddr *)&addr, sizeof(addr));
341
342        if (n < (ssize_t)buf->size()) {
343            ALOGE("failed to poke a hole for RTP packets");
344            return false;
345        }
346
347        addr.sin_port = htons(rtcpPort);
348
349        n = sendto(
350                rtcpSocket, buf->data(), buf->size(), 0,
351                (const sockaddr *)&addr, sizeof(addr));
352
353        if (n < (ssize_t)buf->size()) {
354            ALOGE("failed to poke a hole for RTCP packets");
355            return false;
356        }
357
358        ALOGV("successfully poked holes.");
359
360        return true;
361    }
362
363    static bool isLiveStream(const sp<ASessionDescription> &desc) {
364        AString attrLiveStream;
365        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
366            ssize_t semicolonPos = attrLiveStream.find(";", 2);
367
368            const char* liveStreamValue;
369            if (semicolonPos < 0) {
370                liveStreamValue = attrLiveStream.c_str();
371            } else {
372                AString valString;
373                valString.setTo(attrLiveStream,
374                        semicolonPos + 1,
375                        attrLiveStream.size() - semicolonPos - 1);
376                liveStreamValue = valString.c_str();
377            }
378
379            uint32_t value = strtoul(liveStreamValue, NULL, 10);
380            if (value == 1) {
381                ALOGV("found live stream");
382                return true;
383            }
384        } else {
385            // It is a live stream if no duration is returned
386            int64_t durationUs;
387            if (!desc->getDurationUs(&durationUs)) {
388                ALOGV("No duration found, assume live stream");
389                return true;
390            }
391        }
392
393        return false;
394    }
395
396    virtual void onMessageReceived(const sp<AMessage> &msg) {
397        switch (msg->what()) {
398            case 'conn':
399            {
400                int32_t result;
401                CHECK(msg->findInt32("result", &result));
402
403                ALOGI("connection request completed with result %d (%s)",
404                     result, strerror(-result));
405
406                if (result == OK) {
407                    AString request;
408                    request = "DESCRIBE ";
409                    request.append(mSessionURL);
410                    request.append(" RTSP/1.0\r\n");
411                    request.append("Accept: application/sdp\r\n");
412                    request.append("\r\n");
413
414                    sp<AMessage> reply = new AMessage('desc', id());
415                    mConn->sendRequest(request.c_str(), reply);
416                } else {
417                    (new AMessage('disc', id()))->post();
418                }
419                break;
420            }
421
422            case 'disc':
423            {
424                ++mKeepAliveGeneration;
425
426                int32_t reconnect;
427                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
428                    sp<AMessage> reply = new AMessage('conn', id());
429                    mConn->connect(mOriginalSessionURL.c_str(), reply);
430                } else {
431                    (new AMessage('quit', id()))->post();
432                }
433                break;
434            }
435
436            case 'desc':
437            {
438                int32_t result;
439                CHECK(msg->findInt32("result", &result));
440
441                ALOGI("DESCRIBE completed with result %d (%s)",
442                     result, strerror(-result));
443
444                if (result == OK) {
445                    sp<RefBase> obj;
446                    CHECK(msg->findObject("response", &obj));
447                    sp<ARTSPResponse> response =
448                        static_cast<ARTSPResponse *>(obj.get());
449
450                    if (response->mStatusCode == 302) {
451                        ssize_t i = response->mHeaders.indexOfKey("location");
452                        CHECK_GE(i, 0);
453
454                        mSessionURL = response->mHeaders.valueAt(i);
455
456                        AString request;
457                        request = "DESCRIBE ";
458                        request.append(mSessionURL);
459                        request.append(" RTSP/1.0\r\n");
460                        request.append("Accept: application/sdp\r\n");
461                        request.append("\r\n");
462
463                        sp<AMessage> reply = new AMessage('desc', id());
464                        mConn->sendRequest(request.c_str(), reply);
465                        break;
466                    }
467
468                    if (response->mStatusCode != 200) {
469                        result = UNKNOWN_ERROR;
470                    } else {
471                        mSessionDesc = new ASessionDescription;
472
473                        mSessionDesc->setTo(
474                                response->mContent->data(),
475                                response->mContent->size());
476
477                        if (!mSessionDesc->isValid()) {
478                            ALOGE("Failed to parse session description.");
479                            result = ERROR_MALFORMED;
480                        } else {
481                            ssize_t i = response->mHeaders.indexOfKey("content-base");
482                            if (i >= 0) {
483                                mBaseURL = response->mHeaders.valueAt(i);
484                            } else {
485                                i = response->mHeaders.indexOfKey("content-location");
486                                if (i >= 0) {
487                                    mBaseURL = response->mHeaders.valueAt(i);
488                                } else {
489                                    mBaseURL = mSessionURL;
490                                }
491                            }
492
493                            mSeekable = !isLiveStream(mSessionDesc);
494
495                            if (!mBaseURL.startsWith("rtsp://")) {
496                                // Some misbehaving servers specify a relative
497                                // URL in one of the locations above, combine
498                                // it with the absolute session URL to get
499                                // something usable...
500
501                                ALOGW("Server specified a non-absolute base URL"
502                                     ", combining it with the session URL to "
503                                     "get something usable...");
504
505                                AString tmp;
506                                CHECK(MakeURL(
507                                            mSessionURL.c_str(),
508                                            mBaseURL.c_str(),
509                                            &tmp));
510
511                                mBaseURL = tmp;
512                            }
513
514                            if (mSessionDesc->countTracks() < 2) {
515                                // There's no actual tracks in this session.
516                                // The first "track" is merely session meta
517                                // data.
518
519                                ALOGW("Session doesn't contain any playable "
520                                     "tracks. Aborting.");
521                                result = ERROR_UNSUPPORTED;
522                            } else {
523                                setupTrack(1);
524                            }
525                        }
526                    }
527                }
528
529                if (result != OK) {
530                    sp<AMessage> reply = new AMessage('disc', id());
531                    mConn->disconnect(reply);
532                }
533                break;
534            }
535
536            case 'sdpl':
537            {
538                int32_t result;
539                CHECK(msg->findInt32("result", &result));
540
541                ALOGI("SDP connection request completed with result %d (%s)",
542                     result, strerror(-result));
543
544                if (result == OK) {
545                    sp<RefBase> obj;
546                    CHECK(msg->findObject("description", &obj));
547                    mSessionDesc =
548                        static_cast<ASessionDescription *>(obj.get());
549
550                    if (!mSessionDesc->isValid()) {
551                        ALOGE("Failed to parse session description.");
552                        result = ERROR_MALFORMED;
553                    } else {
554                        mBaseURL = mSessionURL;
555
556                        mSeekable = !isLiveStream(mSessionDesc);
557
558                        if (mSessionDesc->countTracks() < 2) {
559                            // There's no actual tracks in this session.
560                            // The first "track" is merely session meta
561                            // data.
562
563                            ALOGW("Session doesn't contain any playable "
564                                 "tracks. Aborting.");
565                            result = ERROR_UNSUPPORTED;
566                        } else {
567                            setupTrack(1);
568                        }
569                    }
570                }
571
572                if (result != OK) {
573                    sp<AMessage> reply = new AMessage('disc', id());
574                    mConn->disconnect(reply);
575                }
576                break;
577            }
578
579            case 'setu':
580            {
581                size_t index;
582                CHECK(msg->findSize("index", &index));
583
584                TrackInfo *track = NULL;
585                size_t trackIndex;
586                if (msg->findSize("track-index", &trackIndex)) {
587                    track = &mTracks.editItemAt(trackIndex);
588                }
589
590                int32_t result;
591                CHECK(msg->findInt32("result", &result));
592
593                ALOGI("SETUP(%d) completed with result %d (%s)",
594                     index, result, strerror(-result));
595
596                if (result == OK) {
597                    CHECK(track != NULL);
598
599                    sp<RefBase> obj;
600                    CHECK(msg->findObject("response", &obj));
601                    sp<ARTSPResponse> response =
602                        static_cast<ARTSPResponse *>(obj.get());
603
604                    if (response->mStatusCode != 200) {
605                        result = UNKNOWN_ERROR;
606                    } else {
607                        ssize_t i = response->mHeaders.indexOfKey("session");
608                        CHECK_GE(i, 0);
609
610                        mSessionID = response->mHeaders.valueAt(i);
611
612                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
613                        AString timeoutStr;
614                        if (GetAttribute(
615                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
616                            char *end;
617                            unsigned long timeoutSecs =
618                                strtoul(timeoutStr.c_str(), &end, 10);
619
620                            if (end == timeoutStr.c_str() || *end != '\0') {
621                                ALOGW("server specified malformed timeout '%s'",
622                                     timeoutStr.c_str());
623
624                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
625                            } else if (timeoutSecs < 15) {
626                                ALOGW("server specified too short a timeout "
627                                     "(%lu secs), using default.",
628                                     timeoutSecs);
629
630                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
631                            } else {
632                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
633
634                                ALOGI("server specified timeout of %lu secs.",
635                                     timeoutSecs);
636                            }
637                        }
638
639                        i = mSessionID.find(";");
640                        if (i >= 0) {
641                            // Remove options, i.e. ";timeout=90"
642                            mSessionID.erase(i, mSessionID.size() - i);
643                        }
644
645                        sp<AMessage> notify = new AMessage('accu', id());
646                        notify->setSize("track-index", trackIndex);
647
648                        i = response->mHeaders.indexOfKey("transport");
649                        CHECK_GE(i, 0);
650
651                        if (!track->mUsingInterleavedTCP) {
652                            AString transport = response->mHeaders.valueAt(i);
653
654                            // We are going to continue even if we were
655                            // unable to poke a hole into the firewall...
656                            pokeAHole(
657                                    track->mRTPSocket,
658                                    track->mRTCPSocket,
659                                    transport);
660                        }
661
662                        mRTPConn->addStream(
663                                track->mRTPSocket, track->mRTCPSocket,
664                                mSessionDesc, index,
665                                notify, track->mUsingInterleavedTCP);
666
667                        mSetupTracksSuccessful = true;
668                    }
669                }
670
671                if (result != OK) {
672                    if (track) {
673                        if (!track->mUsingInterleavedTCP) {
674                            // Clear the tag
675                            if (mUIDValid) {
676                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
677                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
678                            }
679
680                            close(track->mRTPSocket);
681                            close(track->mRTCPSocket);
682                        }
683
684                        mTracks.removeItemsAt(trackIndex);
685                    }
686                }
687
688                ++index;
689                if (index < mSessionDesc->countTracks()) {
690                    setupTrack(index);
691                } else if (mSetupTracksSuccessful) {
692                    ++mKeepAliveGeneration;
693                    postKeepAlive();
694
695                    AString request = "PLAY ";
696                    request.append(mSessionURL);
697                    request.append(" RTSP/1.0\r\n");
698
699                    request.append("Session: ");
700                    request.append(mSessionID);
701                    request.append("\r\n");
702
703                    request.append("\r\n");
704
705                    sp<AMessage> reply = new AMessage('play', id());
706                    mConn->sendRequest(request.c_str(), reply);
707                } else {
708                    sp<AMessage> reply = new AMessage('disc', id());
709                    mConn->disconnect(reply);
710                }
711                break;
712            }
713
714            case 'play':
715            {
716                int32_t result;
717                CHECK(msg->findInt32("result", &result));
718
719                ALOGI("PLAY completed with result %d (%s)",
720                     result, strerror(-result));
721
722                if (result == OK) {
723                    sp<RefBase> obj;
724                    CHECK(msg->findObject("response", &obj));
725                    sp<ARTSPResponse> response =
726                        static_cast<ARTSPResponse *>(obj.get());
727
728                    if (response->mStatusCode != 200) {
729                        result = UNKNOWN_ERROR;
730                    } else {
731                        parsePlayResponse(response);
732
733                        sp<AMessage> timeout = new AMessage('tiou', id());
734                        timeout->post(kStartupTimeoutUs);
735                    }
736                }
737
738                if (result != OK) {
739                    sp<AMessage> reply = new AMessage('disc', id());
740                    mConn->disconnect(reply);
741                }
742
743                break;
744            }
745
746            case 'aliv':
747            {
748                int32_t generation;
749                CHECK(msg->findInt32("generation", &generation));
750
751                if (generation != mKeepAliveGeneration) {
752                    // obsolete event.
753                    break;
754                }
755
756                AString request;
757                request.append("OPTIONS ");
758                request.append(mSessionURL);
759                request.append(" RTSP/1.0\r\n");
760                request.append("Session: ");
761                request.append(mSessionID);
762                request.append("\r\n");
763                request.append("\r\n");
764
765                sp<AMessage> reply = new AMessage('opts', id());
766                reply->setInt32("generation", mKeepAliveGeneration);
767                mConn->sendRequest(request.c_str(), reply);
768                break;
769            }
770
771            case 'opts':
772            {
773                int32_t result;
774                CHECK(msg->findInt32("result", &result));
775
776                ALOGI("OPTIONS completed with result %d (%s)",
777                     result, strerror(-result));
778
779                int32_t generation;
780                CHECK(msg->findInt32("generation", &generation));
781
782                if (generation != mKeepAliveGeneration) {
783                    // obsolete event.
784                    break;
785                }
786
787                postKeepAlive();
788                break;
789            }
790
791            case 'abor':
792            {
793                for (size_t i = 0; i < mTracks.size(); ++i) {
794                    TrackInfo *info = &mTracks.editItemAt(i);
795
796                    if (!mFirstAccessUnit) {
797                        postQueueEOS(i, ERROR_END_OF_STREAM);
798                    }
799
800                    if (!info->mUsingInterleavedTCP) {
801                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
802
803                        // Clear the tag
804                        if (mUIDValid) {
805                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
806                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
807                        }
808
809                        close(info->mRTPSocket);
810                        close(info->mRTCPSocket);
811                    }
812                }
813                mTracks.clear();
814                mSetupTracksSuccessful = false;
815                mSeekPending = false;
816                mFirstAccessUnit = true;
817                mAllTracksHaveTime = false;
818                mNTPAnchorUs = -1;
819                mMediaAnchorUs = -1;
820                mNumAccessUnitsReceived = 0;
821                mReceivedFirstRTCPPacket = false;
822                mReceivedFirstRTPPacket = false;
823                mSeekable = true;
824
825                sp<AMessage> reply = new AMessage('tear', id());
826
827                int32_t reconnect;
828                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
829                    reply->setInt32("reconnect", true);
830                }
831
832                AString request;
833                request = "TEARDOWN ";
834
835                // XXX should use aggregate url from SDP here...
836                request.append(mSessionURL);
837                request.append(" RTSP/1.0\r\n");
838
839                request.append("Session: ");
840                request.append(mSessionID);
841                request.append("\r\n");
842
843                request.append("\r\n");
844
845                mConn->sendRequest(request.c_str(), reply);
846                break;
847            }
848
849            case 'tear':
850            {
851                int32_t result;
852                CHECK(msg->findInt32("result", &result));
853
854                ALOGI("TEARDOWN completed with result %d (%s)",
855                     result, strerror(-result));
856
857                sp<AMessage> reply = new AMessage('disc', id());
858
859                int32_t reconnect;
860                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
861                    reply->setInt32("reconnect", true);
862                }
863
864                mConn->disconnect(reply);
865                break;
866            }
867
868            case 'quit':
869            {
870                sp<AMessage> msg = mNotify->dup();
871                msg->setInt32("what", kWhatDisconnected);
872                msg->setInt32("result", UNKNOWN_ERROR);
873                msg->post();
874                break;
875            }
876
877            case 'chek':
878            {
879                int32_t generation;
880                CHECK(msg->findInt32("generation", &generation));
881                if (generation != mCheckGeneration) {
882                    // This is an outdated message. Ignore.
883                    break;
884                }
885
886                if (mNumAccessUnitsReceived == 0) {
887#if 1
888                    ALOGI("stream ended? aborting.");
889                    (new AMessage('abor', id()))->post();
890                    break;
891#else
892                    ALOGI("haven't seen an AU in a looong time.");
893#endif
894                }
895
896                mNumAccessUnitsReceived = 0;
897                msg->post(kAccessUnitTimeoutUs);
898                break;
899            }
900
901            case 'accu':
902            {
903                int32_t timeUpdate;
904                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
905                    size_t trackIndex;
906                    CHECK(msg->findSize("track-index", &trackIndex));
907
908                    uint32_t rtpTime;
909                    uint64_t ntpTime;
910                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
911                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
912
913                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
914                    break;
915                }
916
917                int32_t first;
918                if (msg->findInt32("first-rtcp", &first)) {
919                    mReceivedFirstRTCPPacket = true;
920                    break;
921                }
922
923                if (msg->findInt32("first-rtp", &first)) {
924                    mReceivedFirstRTPPacket = true;
925                    break;
926                }
927
928                ++mNumAccessUnitsReceived;
929                postAccessUnitTimeoutCheck();
930
931                size_t trackIndex;
932                CHECK(msg->findSize("track-index", &trackIndex));
933
934                if (trackIndex >= mTracks.size()) {
935                    ALOGV("late packets ignored.");
936                    break;
937                }
938
939                TrackInfo *track = &mTracks.editItemAt(trackIndex);
940
941                int32_t eos;
942                if (msg->findInt32("eos", &eos)) {
943                    ALOGI("received BYE on track index %d", trackIndex);
944#if 0
945                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
946#endif
947                    return;
948                }
949
950                sp<ABuffer> accessUnit;
951                CHECK(msg->findBuffer("access-unit", &accessUnit));
952
953                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
954
955                if (mSeekPending) {
956                    ALOGV("we're seeking, dropping stale packet.");
957                    break;
958                }
959
960                if (seqNum < track->mFirstSeqNumInSegment) {
961                    ALOGV("dropping stale access-unit (%d < %d)",
962                         seqNum, track->mFirstSeqNumInSegment);
963                    break;
964                }
965
966                if (track->mNewSegment) {
967                    track->mNewSegment = false;
968                }
969
970                onAccessUnitComplete(trackIndex, accessUnit);
971                break;
972            }
973
974            case 'seek':
975            {
976                if (!mSeekable) {
977                    ALOGW("This is a live stream, ignoring seek request.");
978
979                    sp<AMessage> msg = mNotify->dup();
980                    msg->setInt32("what", kWhatSeekDone);
981                    msg->post();
982                    break;
983                }
984
985                int64_t timeUs;
986                CHECK(msg->findInt64("time", &timeUs));
987
988                mSeekPending = true;
989
990                // Disable the access unit timeout until we resumed
991                // playback again.
992                mCheckPending = true;
993                ++mCheckGeneration;
994
995                AString request = "PAUSE ";
996                request.append(mSessionURL);
997                request.append(" RTSP/1.0\r\n");
998
999                request.append("Session: ");
1000                request.append(mSessionID);
1001                request.append("\r\n");
1002
1003                request.append("\r\n");
1004
1005                sp<AMessage> reply = new AMessage('see1', id());
1006                reply->setInt64("time", timeUs);
1007                mConn->sendRequest(request.c_str(), reply);
1008                break;
1009            }
1010
1011            case 'see1':
1012            {
1013                // Session is paused now.
1014                for (size_t i = 0; i < mTracks.size(); ++i) {
1015                    TrackInfo *info = &mTracks.editItemAt(i);
1016
1017                    postQueueSeekDiscontinuity(i);
1018
1019                    info->mRTPAnchor = 0;
1020                    info->mNTPAnchorUs = -1;
1021                }
1022
1023                mAllTracksHaveTime = false;
1024                mNTPAnchorUs = -1;
1025
1026                int64_t timeUs;
1027                CHECK(msg->findInt64("time", &timeUs));
1028
1029                AString request = "PLAY ";
1030                request.append(mSessionURL);
1031                request.append(" RTSP/1.0\r\n");
1032
1033                request.append("Session: ");
1034                request.append(mSessionID);
1035                request.append("\r\n");
1036
1037                request.append(
1038                        StringPrintf(
1039                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1040
1041                request.append("\r\n");
1042
1043                sp<AMessage> reply = new AMessage('see2', id());
1044                mConn->sendRequest(request.c_str(), reply);
1045                break;
1046            }
1047
1048            case 'see2':
1049            {
1050                CHECK(mSeekPending);
1051
1052                int32_t result;
1053                CHECK(msg->findInt32("result", &result));
1054
1055                ALOGI("PLAY completed with result %d (%s)",
1056                     result, strerror(-result));
1057
1058                mCheckPending = false;
1059                postAccessUnitTimeoutCheck();
1060
1061                if (result == OK) {
1062                    sp<RefBase> obj;
1063                    CHECK(msg->findObject("response", &obj));
1064                    sp<ARTSPResponse> response =
1065                        static_cast<ARTSPResponse *>(obj.get());
1066
1067                    if (response->mStatusCode != 200) {
1068                        result = UNKNOWN_ERROR;
1069                    } else {
1070                        parsePlayResponse(response);
1071
1072                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1073                        CHECK_GE(i, 0);
1074
1075                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1076
1077                        ALOGI("seek completed.");
1078                    }
1079                }
1080
1081                if (result != OK) {
1082                    ALOGE("seek failed, aborting.");
1083                    (new AMessage('abor', id()))->post();
1084                }
1085
1086                mSeekPending = false;
1087
1088                sp<AMessage> msg = mNotify->dup();
1089                msg->setInt32("what", kWhatSeekDone);
1090                msg->post();
1091                break;
1092            }
1093
1094            case 'biny':
1095            {
1096                sp<ABuffer> buffer;
1097                CHECK(msg->findBuffer("buffer", &buffer));
1098
1099                int32_t index;
1100                CHECK(buffer->meta()->findInt32("index", &index));
1101
1102                mRTPConn->injectPacket(index, buffer);
1103                break;
1104            }
1105
1106            case 'tiou':
1107            {
1108                if (!mReceivedFirstRTCPPacket) {
1109                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
1110                        ALOGW("We received RTP packets but no RTCP packets, "
1111                             "using fake timestamps.");
1112
1113                        mTryFakeRTCP = true;
1114
1115                        mReceivedFirstRTCPPacket = true;
1116
1117                        fakeTimestamps();
1118                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1119                        ALOGW("Never received any data, switching transports.");
1120
1121                        mTryTCPInterleaving = true;
1122
1123                        sp<AMessage> msg = new AMessage('abor', id());
1124                        msg->setInt32("reconnect", true);
1125                        msg->post();
1126                    } else {
1127                        ALOGW("Never received any data, disconnecting.");
1128                        (new AMessage('abor', id()))->post();
1129                    }
1130                } else {
1131                    if (!mAllTracksHaveTime) {
1132                        ALOGW("We received some RTCP packets, but time "
1133                              "could not be established on all tracks, now "
1134                              "using fake timestamps");
1135
1136                        fakeTimestamps();
1137                    }
1138                }
1139                break;
1140            }
1141
1142            default:
1143                TRESPASS();
1144                break;
1145        }
1146    }
1147
1148    void postKeepAlive() {
1149        sp<AMessage> msg = new AMessage('aliv', id());
1150        msg->setInt32("generation", mKeepAliveGeneration);
1151        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1152    }
1153
1154    void postAccessUnitTimeoutCheck() {
1155        if (mCheckPending) {
1156            return;
1157        }
1158
1159        mCheckPending = true;
1160        sp<AMessage> check = new AMessage('chek', id());
1161        check->setInt32("generation", mCheckGeneration);
1162        check->post(kAccessUnitTimeoutUs);
1163    }
1164
1165    static void SplitString(
1166            const AString &s, const char *separator, List<AString> *items) {
1167        items->clear();
1168        size_t start = 0;
1169        while (start < s.size()) {
1170            ssize_t offset = s.find(separator, start);
1171
1172            if (offset < 0) {
1173                items->push_back(AString(s, start, s.size() - start));
1174                break;
1175            }
1176
1177            items->push_back(AString(s, start, offset - start));
1178            start = offset + strlen(separator);
1179        }
1180    }
1181
1182    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1183        if (mTracks.size() == 0) {
1184            ALOGV("parsePlayResponse: late packets ignored.");
1185            return;
1186        }
1187
1188        ssize_t i = response->mHeaders.indexOfKey("range");
1189        if (i < 0) {
1190            // Server doesn't even tell use what range it is going to
1191            // play, therefore we won't support seeking.
1192            return;
1193        }
1194
1195        AString range = response->mHeaders.valueAt(i);
1196        ALOGV("Range: %s", range.c_str());
1197
1198        AString val;
1199        CHECK(GetAttribute(range.c_str(), "npt", &val));
1200
1201        float npt1, npt2;
1202        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1203            // This is a live stream and therefore not seekable.
1204
1205            ALOGI("This is a live stream");
1206            return;
1207        }
1208
1209        i = response->mHeaders.indexOfKey("rtp-info");
1210        CHECK_GE(i, 0);
1211
1212        AString rtpInfo = response->mHeaders.valueAt(i);
1213        List<AString> streamInfos;
1214        SplitString(rtpInfo, ",", &streamInfos);
1215
1216        int n = 1;
1217        for (List<AString>::iterator it = streamInfos.begin();
1218             it != streamInfos.end(); ++it) {
1219            (*it).trim();
1220            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1221
1222            CHECK(GetAttribute((*it).c_str(), "url", &val));
1223
1224            size_t trackIndex = 0;
1225            while (trackIndex < mTracks.size()
1226                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1227                ++trackIndex;
1228            }
1229            CHECK_LT(trackIndex, mTracks.size());
1230
1231            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1232
1233            char *end;
1234            unsigned long seq = strtoul(val.c_str(), &end, 10);
1235
1236            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1237            info->mFirstSeqNumInSegment = seq;
1238            info->mNewSegment = true;
1239
1240            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1241
1242            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1243
1244            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1245
1246            info->mNormalPlayTimeRTP = rtpTime;
1247            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1248
1249            if (!mFirstAccessUnit) {
1250                postNormalPlayTimeMapping(
1251                        trackIndex,
1252                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1253            }
1254
1255            ++n;
1256        }
1257    }
1258
1259    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1260        CHECK_GE(index, 0u);
1261        CHECK_LT(index, mTracks.size());
1262
1263        const TrackInfo &info = mTracks.itemAt(index);
1264
1265        *timeScale = info.mTimeScale;
1266
1267        return info.mPacketSource->getFormat();
1268    }
1269
1270    size_t countTracks() const {
1271        return mTracks.size();
1272    }
1273
1274private:
1275    struct TrackInfo {
1276        AString mURL;
1277        int mRTPSocket;
1278        int mRTCPSocket;
1279        bool mUsingInterleavedTCP;
1280        uint32_t mFirstSeqNumInSegment;
1281        bool mNewSegment;
1282
1283        uint32_t mRTPAnchor;
1284        int64_t mNTPAnchorUs;
1285        int32_t mTimeScale;
1286
1287        uint32_t mNormalPlayTimeRTP;
1288        int64_t mNormalPlayTimeUs;
1289
1290        sp<APacketSource> mPacketSource;
1291
1292        // Stores packets temporarily while no notion of time
1293        // has been established yet.
1294        List<sp<ABuffer> > mPackets;
1295    };
1296
1297    sp<AMessage> mNotify;
1298    bool mUIDValid;
1299    uid_t mUID;
1300    sp<ALooper> mNetLooper;
1301    sp<ARTSPConnection> mConn;
1302    sp<ARTPConnection> mRTPConn;
1303    sp<ASessionDescription> mSessionDesc;
1304    AString mOriginalSessionURL;  // This one still has user:pass@
1305    AString mSessionURL;
1306    AString mSessionHost;
1307    AString mBaseURL;
1308    AString mSessionID;
1309    bool mSetupTracksSuccessful;
1310    bool mSeekPending;
1311    bool mFirstAccessUnit;
1312
1313    bool mAllTracksHaveTime;
1314    int64_t mNTPAnchorUs;
1315    int64_t mMediaAnchorUs;
1316    int64_t mLastMediaTimeUs;
1317
1318    int64_t mNumAccessUnitsReceived;
1319    bool mCheckPending;
1320    int32_t mCheckGeneration;
1321    bool mTryTCPInterleaving;
1322    bool mTryFakeRTCP;
1323    bool mReceivedFirstRTCPPacket;
1324    bool mReceivedFirstRTPPacket;
1325    bool mSeekable;
1326    int64_t mKeepAliveTimeoutUs;
1327    int32_t mKeepAliveGeneration;
1328
1329    Vector<TrackInfo> mTracks;
1330
1331    void setupTrack(size_t index) {
1332        sp<APacketSource> source =
1333            new APacketSource(mSessionDesc, index);
1334
1335        if (source->initCheck() != OK) {
1336            ALOGW("Unsupported format. Ignoring track #%d.", index);
1337
1338            sp<AMessage> reply = new AMessage('setu', id());
1339            reply->setSize("index", index);
1340            reply->setInt32("result", ERROR_UNSUPPORTED);
1341            reply->post();
1342            return;
1343        }
1344
1345        AString url;
1346        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1347
1348        AString trackURL;
1349        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1350
1351        mTracks.push(TrackInfo());
1352        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1353        info->mURL = trackURL;
1354        info->mPacketSource = source;
1355        info->mUsingInterleavedTCP = false;
1356        info->mFirstSeqNumInSegment = 0;
1357        info->mNewSegment = true;
1358        info->mRTPAnchor = 0;
1359        info->mNTPAnchorUs = -1;
1360        info->mNormalPlayTimeRTP = 0;
1361        info->mNormalPlayTimeUs = 0ll;
1362
1363        unsigned long PT;
1364        AString formatDesc;
1365        AString formatParams;
1366        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1367
1368        int32_t timescale;
1369        int32_t numChannels;
1370        ASessionDescription::ParseFormatDesc(
1371                formatDesc.c_str(), &timescale, &numChannels);
1372
1373        info->mTimeScale = timescale;
1374
1375        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1376
1377        AString request = "SETUP ";
1378        request.append(trackURL);
1379        request.append(" RTSP/1.0\r\n");
1380
1381        if (mTryTCPInterleaving) {
1382            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1383            info->mUsingInterleavedTCP = true;
1384            info->mRTPSocket = interleaveIndex;
1385            info->mRTCPSocket = interleaveIndex + 1;
1386
1387            request.append("Transport: RTP/AVP/TCP;interleaved=");
1388            request.append(interleaveIndex);
1389            request.append("-");
1390            request.append(interleaveIndex + 1);
1391        } else {
1392            unsigned rtpPort;
1393            ARTPConnection::MakePortPair(
1394                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1395
1396            if (mUIDValid) {
1397                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1398                                                (uint32_t)*(uint32_t*) "RTP_");
1399                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1400                                                (uint32_t)*(uint32_t*) "RTP_");
1401            }
1402
1403            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1404            request.append(rtpPort);
1405            request.append("-");
1406            request.append(rtpPort + 1);
1407        }
1408
1409        request.append("\r\n");
1410
1411        if (index > 1) {
1412            request.append("Session: ");
1413            request.append(mSessionID);
1414            request.append("\r\n");
1415        }
1416
1417        request.append("\r\n");
1418
1419        sp<AMessage> reply = new AMessage('setu', id());
1420        reply->setSize("index", index);
1421        reply->setSize("track-index", mTracks.size() - 1);
1422        mConn->sendRequest(request.c_str(), reply);
1423    }
1424
1425    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1426        out->clear();
1427
1428        if (strncasecmp("rtsp://", baseURL, 7)) {
1429            // Base URL must be absolute
1430            return false;
1431        }
1432
1433        if (!strncasecmp("rtsp://", url, 7)) {
1434            // "url" is already an absolute URL, ignore base URL.
1435            out->setTo(url);
1436            return true;
1437        }
1438
1439        size_t n = strlen(baseURL);
1440        if (baseURL[n - 1] == '/') {
1441            out->setTo(baseURL);
1442            out->append(url);
1443        } else {
1444            const char *slashPos = strrchr(baseURL, '/');
1445
1446            if (slashPos > &baseURL[6]) {
1447                out->setTo(baseURL, slashPos - baseURL);
1448            } else {
1449                out->setTo(baseURL);
1450            }
1451
1452            out->append("/");
1453            out->append(url);
1454        }
1455
1456        return true;
1457    }
1458
1459    void fakeTimestamps() {
1460        mNTPAnchorUs = -1ll;
1461        for (size_t i = 0; i < mTracks.size(); ++i) {
1462            onTimeUpdate(i, 0, 0ll);
1463        }
1464    }
1465
1466    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1467        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1468             trackIndex, rtpTime, ntpTime);
1469
1470        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1471
1472        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1473
1474        track->mRTPAnchor = rtpTime;
1475        track->mNTPAnchorUs = ntpTimeUs;
1476
1477        if (mNTPAnchorUs < 0) {
1478            mNTPAnchorUs = ntpTimeUs;
1479            mMediaAnchorUs = mLastMediaTimeUs;
1480        }
1481
1482        if (!mAllTracksHaveTime) {
1483            bool allTracksHaveTime = true;
1484            for (size_t i = 0; i < mTracks.size(); ++i) {
1485                TrackInfo *track = &mTracks.editItemAt(i);
1486                if (track->mNTPAnchorUs < 0) {
1487                    allTracksHaveTime = false;
1488                    break;
1489                }
1490            }
1491            if (allTracksHaveTime) {
1492                mAllTracksHaveTime = true;
1493                ALOGI("Time now established for all tracks.");
1494            }
1495        }
1496    }
1497
1498    void onAccessUnitComplete(
1499            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1500        ALOGV("onAccessUnitComplete track %d", trackIndex);
1501
1502        if (mFirstAccessUnit) {
1503            sp<AMessage> msg = mNotify->dup();
1504            msg->setInt32("what", kWhatConnected);
1505            msg->post();
1506
1507            if (mSeekable) {
1508                for (size_t i = 0; i < mTracks.size(); ++i) {
1509                    TrackInfo *info = &mTracks.editItemAt(i);
1510
1511                    postNormalPlayTimeMapping(
1512                            i,
1513                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1514                }
1515            }
1516
1517            mFirstAccessUnit = false;
1518        }
1519
1520        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1521
1522        if (!mAllTracksHaveTime) {
1523            ALOGV("storing accessUnit, no time established yet");
1524            track->mPackets.push_back(accessUnit);
1525            return;
1526        }
1527
1528        while (!track->mPackets.empty()) {
1529            sp<ABuffer> accessUnit = *track->mPackets.begin();
1530            track->mPackets.erase(track->mPackets.begin());
1531
1532            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1533                postQueueAccessUnit(trackIndex, accessUnit);
1534            }
1535        }
1536
1537        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1538            postQueueAccessUnit(trackIndex, accessUnit);
1539        }
1540    }
1541
1542    bool addMediaTimestamp(
1543            int32_t trackIndex, const TrackInfo *track,
1544            const sp<ABuffer> &accessUnit) {
1545        uint32_t rtpTime;
1546        CHECK(accessUnit->meta()->findInt32(
1547                    "rtp-time", (int32_t *)&rtpTime));
1548
1549        int64_t relRtpTimeUs =
1550            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1551                / track->mTimeScale;
1552
1553        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1554
1555        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1556
1557        if (mediaTimeUs > mLastMediaTimeUs) {
1558            mLastMediaTimeUs = mediaTimeUs;
1559        }
1560
1561        if (mediaTimeUs < 0) {
1562            ALOGV("dropping early accessUnit.");
1563            return false;
1564        }
1565
1566        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1567             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1568
1569        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1570
1571        return true;
1572    }
1573
1574    void postQueueAccessUnit(
1575            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1576        sp<AMessage> msg = mNotify->dup();
1577        msg->setInt32("what", kWhatAccessUnit);
1578        msg->setSize("trackIndex", trackIndex);
1579        msg->setBuffer("accessUnit", accessUnit);
1580        msg->post();
1581    }
1582
1583    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1584        sp<AMessage> msg = mNotify->dup();
1585        msg->setInt32("what", kWhatEOS);
1586        msg->setSize("trackIndex", trackIndex);
1587        msg->setInt32("finalResult", finalResult);
1588        msg->post();
1589    }
1590
1591    void postQueueSeekDiscontinuity(size_t trackIndex) {
1592        sp<AMessage> msg = mNotify->dup();
1593        msg->setInt32("what", kWhatSeekDiscontinuity);
1594        msg->setSize("trackIndex", trackIndex);
1595        msg->post();
1596    }
1597
1598    void postNormalPlayTimeMapping(
1599            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1600        sp<AMessage> msg = mNotify->dup();
1601        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1602        msg->setSize("trackIndex", trackIndex);
1603        msg->setInt32("rtpTime", rtpTime);
1604        msg->setInt64("nptUs", nptUs);
1605        msg->post();
1606    }
1607
1608    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1609};
1610
1611}  // namespace android
1612
1613#endif  // MY_HANDLER_H_
1614