MyHandler.h revision 820c4893fdec784321826fd903da34fe3d609b93
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22
23#ifndef LOG_TAG
24#define LOG_TAG "MyHandler"
25#endif
26
27#include <utils/Log.h>
28
29#include "APacketSource.h"
30#include "ARTPConnection.h"
31#include "ARTSPConnection.h"
32#include "ASessionDescription.h"
33
34#include <ctype.h>
35
36#include <media/stagefright/foundation/ABuffer.h>
37#include <media/stagefright/foundation/ADebug.h>
38#include <media/stagefright/foundation/ALooper.h>
39#include <media/stagefright/foundation/AMessage.h>
40#include <media/stagefright/MediaErrors.h>
41#include <media/stagefright/Utils.h>
42
43#include <arpa/inet.h>
44#include <sys/socket.h>
45#include <netdb.h>
46
47#include "HTTPBase.h"
48
49#if LOG_NDEBUG
50#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
51#else
52#define UNUSED_UNLESS_VERBOSE(x)
53#endif
54
55// If no access units are received within 5 secs, assume that the rtp
56// stream has ended and signal end of stream.
57static int64_t kAccessUnitTimeoutUs = 10000000ll;
58
59// If no access units arrive for the first 10 secs after starting the
60// stream, assume none ever will and signal EOS or switch transports.
61static int64_t kStartupTimeoutUs = 10000000ll;
62
63static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
64
65static int64_t kPauseDelayUs = 3000000ll;
66
67namespace android {
68
69static bool GetAttribute(const char *s, const char *key, AString *value) {
70    value->clear();
71
72    size_t keyLen = strlen(key);
73
74    for (;;) {
75        while (isspace(*s)) {
76            ++s;
77        }
78
79        const char *colonPos = strchr(s, ';');
80
81        size_t len =
82            (colonPos == NULL) ? strlen(s) : colonPos - s;
83
84        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
85            value->setTo(&s[keyLen + 1], len - keyLen - 1);
86            return true;
87        }
88
89        if (colonPos == NULL) {
90            return false;
91        }
92
93        s = colonPos + 1;
94    }
95}
96
97struct MyHandler : public AHandler {
98    enum {
99        kWhatConnected                  = 'conn',
100        kWhatDisconnected               = 'disc',
101        kWhatSeekDone                   = 'sdon',
102
103        kWhatAccessUnit                 = 'accU',
104        kWhatEOS                        = 'eos!',
105        kWhatSeekDiscontinuity          = 'seeD',
106        kWhatNormalPlayTimeMapping      = 'nptM',
107    };
108
109    MyHandler(
110            const char *url,
111            const sp<AMessage> &notify,
112            bool uidValid = false, uid_t uid = 0)
113        : mNotify(notify),
114          mUIDValid(uidValid),
115          mUID(uid),
116          mNetLooper(new ALooper),
117          mConn(new ARTSPConnection(mUIDValid, mUID)),
118          mRTPConn(new ARTPConnection),
119          mOriginalSessionURL(url),
120          mSessionURL(url),
121          mSetupTracksSuccessful(false),
122          mSeekPending(false),
123          mFirstAccessUnit(true),
124          mAllTracksHaveTime(false),
125          mNTPAnchorUs(-1),
126          mMediaAnchorUs(-1),
127          mLastMediaTimeUs(0),
128          mNumAccessUnitsReceived(0),
129          mCheckPending(false),
130          mCheckGeneration(0),
131          mCheckTimeoutGeneration(0),
132          mTryTCPInterleaving(false),
133          mTryFakeRTCP(false),
134          mReceivedFirstRTCPPacket(false),
135          mReceivedFirstRTPPacket(false),
136          mSeekable(true),
137          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
138          mKeepAliveGeneration(0),
139          mPausing(false),
140          mPauseGeneration(0),
141          mPlayResponseParsed(false) {
142        mNetLooper->setName("rtsp net");
143        mNetLooper->start(false /* runOnCallingThread */,
144                          false /* canCallJava */,
145                          PRIORITY_HIGHEST);
146
147        // Strip any authentication info from the session url, we don't
148        // want to transmit user/pass in cleartext.
149        AString host, path, user, pass;
150        unsigned port;
151        CHECK(ARTSPConnection::ParseURL(
152                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
153
154        if (user.size() > 0) {
155            mSessionURL.clear();
156            mSessionURL.append("rtsp://");
157            mSessionURL.append(host);
158            mSessionURL.append(":");
159            mSessionURL.append(StringPrintf("%u", port));
160            mSessionURL.append(path);
161
162            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
163        }
164
165        mSessionHost = host;
166    }
167
168    void connect() {
169        looper()->registerHandler(mConn);
170        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
171
172        sp<AMessage> notify = new AMessage('biny', id());
173        mConn->observeBinaryData(notify);
174
175        sp<AMessage> reply = new AMessage('conn', id());
176        mConn->connect(mOriginalSessionURL.c_str(), reply);
177    }
178
179    void loadSDP(const sp<ASessionDescription>& desc) {
180        looper()->registerHandler(mConn);
181        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
182
183        sp<AMessage> notify = new AMessage('biny', id());
184        mConn->observeBinaryData(notify);
185
186        sp<AMessage> reply = new AMessage('sdpl', id());
187        reply->setObject("description", desc);
188        mConn->connect(mOriginalSessionURL.c_str(), reply);
189    }
190
191    AString getControlURL() {
192        AString sessionLevelControlURL;
193        if (mSessionDesc->findAttribute(
194                0,
195                "a=control",
196                &sessionLevelControlURL)) {
197            if (sessionLevelControlURL.compare("*") == 0) {
198                return mBaseURL;
199            } else {
200                AString controlURL;
201                CHECK(MakeURL(
202                        mBaseURL.c_str(),
203                        sessionLevelControlURL.c_str(),
204                        &controlURL));
205                return controlURL;
206            }
207        } else {
208            return mSessionURL;
209        }
210    }
211
212    void disconnect() {
213        (new AMessage('abor', id()))->post();
214    }
215
216    void seek(int64_t timeUs) {
217        sp<AMessage> msg = new AMessage('seek', id());
218        msg->setInt64("time", timeUs);
219        mPauseGeneration++;
220        msg->post();
221    }
222
223    bool isSeekable() const {
224        return mSeekable;
225    }
226
227    void pause() {
228        sp<AMessage> msg = new AMessage('paus', id());
229        mPauseGeneration++;
230        msg->setInt32("pausecheck", mPauseGeneration);
231        msg->post(kPauseDelayUs);
232    }
233
234    void resume() {
235        sp<AMessage> msg = new AMessage('resu', id());
236        mPauseGeneration++;
237        msg->post();
238    }
239
240    static void addRR(const sp<ABuffer> &buf) {
241        uint8_t *ptr = buf->data() + buf->size();
242        ptr[0] = 0x80 | 0;
243        ptr[1] = 201;  // RR
244        ptr[2] = 0;
245        ptr[3] = 1;
246        ptr[4] = 0xde;  // SSRC
247        ptr[5] = 0xad;
248        ptr[6] = 0xbe;
249        ptr[7] = 0xef;
250
251        buf->setRange(0, buf->size() + 8);
252    }
253
254    static void addSDES(int s, const sp<ABuffer> &buffer) {
255        struct sockaddr_in addr;
256        socklen_t addrSize = sizeof(addr);
257        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
258            inet_aton("0.0.0.0", &(addr.sin_addr));
259        }
260
261        uint8_t *data = buffer->data() + buffer->size();
262        data[0] = 0x80 | 1;
263        data[1] = 202;  // SDES
264        data[4] = 0xde;  // SSRC
265        data[5] = 0xad;
266        data[6] = 0xbe;
267        data[7] = 0xef;
268
269        size_t offset = 8;
270
271        data[offset++] = 1;  // CNAME
272
273        AString cname = "stagefright@";
274        cname.append(inet_ntoa(addr.sin_addr));
275        data[offset++] = cname.size();
276
277        memcpy(&data[offset], cname.c_str(), cname.size());
278        offset += cname.size();
279
280        data[offset++] = 6;  // TOOL
281
282        AString tool = MakeUserAgent();
283
284        data[offset++] = tool.size();
285
286        memcpy(&data[offset], tool.c_str(), tool.size());
287        offset += tool.size();
288
289        data[offset++] = 0;
290
291        if ((offset % 4) > 0) {
292            size_t count = 4 - (offset % 4);
293            switch (count) {
294                case 3:
295                    data[offset++] = 0;
296                case 2:
297                    data[offset++] = 0;
298                case 1:
299                    data[offset++] = 0;
300            }
301        }
302
303        size_t numWords = (offset / 4) - 1;
304        data[2] = numWords >> 8;
305        data[3] = numWords & 0xff;
306
307        buffer->setRange(buffer->offset(), buffer->size() + offset);
308    }
309
310    // In case we're behind NAT, fire off two UDP packets to the remote
311    // rtp/rtcp ports to poke a hole into the firewall for future incoming
312    // packets. We're going to send an RR/SDES RTCP packet to both of them.
313    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
314        struct sockaddr_in addr;
315        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
316        addr.sin_family = AF_INET;
317
318        AString source;
319        AString server_port;
320        if (!GetAttribute(transport.c_str(),
321                          "source",
322                          &source)) {
323            ALOGW("Missing 'source' field in Transport response. Using "
324                 "RTSP endpoint address.");
325
326            struct hostent *ent = gethostbyname(mSessionHost.c_str());
327            if (ent == NULL) {
328                ALOGE("Failed to look up address of session host '%s'",
329                     mSessionHost.c_str());
330
331                return false;
332            }
333
334            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
335        } else {
336            addr.sin_addr.s_addr = inet_addr(source.c_str());
337        }
338
339        if (!GetAttribute(transport.c_str(),
340                                 "server_port",
341                                 &server_port)) {
342            ALOGI("Missing 'server_port' field in Transport response.");
343            return false;
344        }
345
346        int rtpPort, rtcpPort;
347        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
348                || rtpPort <= 0 || rtpPort > 65535
349                || rtcpPort <=0 || rtcpPort > 65535
350                || rtcpPort != rtpPort + 1) {
351            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
352                 " RTP port must be even, RTCP port must be one higher.",
353                 server_port.c_str());
354
355            return false;
356        }
357
358        if (rtpPort & 1) {
359            ALOGW("Server picked an odd RTP port, it should've picked an "
360                 "even one, we'll let it pass for now, but this may break "
361                 "in the future.");
362        }
363
364        if (addr.sin_addr.s_addr == INADDR_NONE) {
365            return true;
366        }
367
368        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
369            // No firewalls to traverse on the loopback interface.
370            return true;
371        }
372
373        // Make up an RR/SDES RTCP packet.
374        sp<ABuffer> buf = new ABuffer(65536);
375        buf->setRange(0, 0);
376        addRR(buf);
377        addSDES(rtpSocket, buf);
378
379        addr.sin_port = htons(rtpPort);
380
381        ssize_t n = sendto(
382                rtpSocket, buf->data(), buf->size(), 0,
383                (const sockaddr *)&addr, sizeof(addr));
384
385        if (n < (ssize_t)buf->size()) {
386            ALOGE("failed to poke a hole for RTP packets");
387            return false;
388        }
389
390        addr.sin_port = htons(rtcpPort);
391
392        n = sendto(
393                rtcpSocket, buf->data(), buf->size(), 0,
394                (const sockaddr *)&addr, sizeof(addr));
395
396        if (n < (ssize_t)buf->size()) {
397            ALOGE("failed to poke a hole for RTCP packets");
398            return false;
399        }
400
401        ALOGV("successfully poked holes.");
402
403        return true;
404    }
405
406    static bool isLiveStream(const sp<ASessionDescription> &desc) {
407        AString attrLiveStream;
408        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
409            ssize_t semicolonPos = attrLiveStream.find(";", 2);
410
411            const char* liveStreamValue;
412            if (semicolonPos < 0) {
413                liveStreamValue = attrLiveStream.c_str();
414            } else {
415                AString valString;
416                valString.setTo(attrLiveStream,
417                        semicolonPos + 1,
418                        attrLiveStream.size() - semicolonPos - 1);
419                liveStreamValue = valString.c_str();
420            }
421
422            uint32_t value = strtoul(liveStreamValue, NULL, 10);
423            if (value == 1) {
424                ALOGV("found live stream");
425                return true;
426            }
427        } else {
428            // It is a live stream if no duration is returned
429            int64_t durationUs;
430            if (!desc->getDurationUs(&durationUs)) {
431                ALOGV("No duration found, assume live stream");
432                return true;
433            }
434        }
435
436        return false;
437    }
438
439    virtual void onMessageReceived(const sp<AMessage> &msg) {
440        switch (msg->what()) {
441            case 'conn':
442            {
443                int32_t result;
444                CHECK(msg->findInt32("result", &result));
445
446                ALOGI("connection request completed with result %d (%s)",
447                     result, strerror(-result));
448
449                if (result == OK) {
450                    AString request;
451                    request = "DESCRIBE ";
452                    request.append(mSessionURL);
453                    request.append(" RTSP/1.0\r\n");
454                    request.append("Accept: application/sdp\r\n");
455                    request.append("\r\n");
456
457                    sp<AMessage> reply = new AMessage('desc', id());
458                    mConn->sendRequest(request.c_str(), reply);
459                } else {
460                    (new AMessage('disc', id()))->post();
461                }
462                break;
463            }
464
465            case 'disc':
466            {
467                ++mKeepAliveGeneration;
468
469                int32_t reconnect;
470                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
471                    sp<AMessage> reply = new AMessage('conn', id());
472                    mConn->connect(mOriginalSessionURL.c_str(), reply);
473                } else {
474                    (new AMessage('quit', id()))->post();
475                }
476                break;
477            }
478
479            case 'desc':
480            {
481                int32_t result;
482                CHECK(msg->findInt32("result", &result));
483
484                ALOGI("DESCRIBE completed with result %d (%s)",
485                     result, strerror(-result));
486
487                if (result == OK) {
488                    sp<RefBase> obj;
489                    CHECK(msg->findObject("response", &obj));
490                    sp<ARTSPResponse> response =
491                        static_cast<ARTSPResponse *>(obj.get());
492
493                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
494                        ssize_t i = response->mHeaders.indexOfKey("location");
495                        CHECK_GE(i, 0);
496
497                        mOriginalSessionURL = response->mHeaders.valueAt(i);
498                        mSessionURL = mOriginalSessionURL;
499
500                        // Strip any authentication info from the session url, we don't
501                        // want to transmit user/pass in cleartext.
502                        AString host, path, user, pass;
503                        unsigned port;
504                        if (ARTSPConnection::ParseURL(
505                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
506                                && user.size() > 0) {
507                            mSessionURL.clear();
508                            mSessionURL.append("rtsp://");
509                            mSessionURL.append(host);
510                            mSessionURL.append(":");
511                            mSessionURL.append(StringPrintf("%u", port));
512                            mSessionURL.append(path);
513
514                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
515                        }
516
517                        sp<AMessage> reply = new AMessage('conn', id());
518                        mConn->connect(mOriginalSessionURL.c_str(), reply);
519                        break;
520                    }
521
522                    if (response->mStatusCode != 200) {
523                        result = UNKNOWN_ERROR;
524                    } else if (response->mContent == NULL) {
525                        result = ERROR_MALFORMED;
526                        ALOGE("The response has no content.");
527                    } else {
528                        mSessionDesc = new ASessionDescription;
529
530                        mSessionDesc->setTo(
531                                response->mContent->data(),
532                                response->mContent->size());
533
534                        if (!mSessionDesc->isValid()) {
535                            ALOGE("Failed to parse session description.");
536                            result = ERROR_MALFORMED;
537                        } else {
538                            ssize_t i = response->mHeaders.indexOfKey("content-base");
539                            if (i >= 0) {
540                                mBaseURL = response->mHeaders.valueAt(i);
541                            } else {
542                                i = response->mHeaders.indexOfKey("content-location");
543                                if (i >= 0) {
544                                    mBaseURL = response->mHeaders.valueAt(i);
545                                } else {
546                                    mBaseURL = mSessionURL;
547                                }
548                            }
549
550                            mSeekable = !isLiveStream(mSessionDesc);
551
552                            if (!mBaseURL.startsWith("rtsp://")) {
553                                // Some misbehaving servers specify a relative
554                                // URL in one of the locations above, combine
555                                // it with the absolute session URL to get
556                                // something usable...
557
558                                ALOGW("Server specified a non-absolute base URL"
559                                     ", combining it with the session URL to "
560                                     "get something usable...");
561
562                                AString tmp;
563                                CHECK(MakeURL(
564                                            mSessionURL.c_str(),
565                                            mBaseURL.c_str(),
566                                            &tmp));
567
568                                mBaseURL = tmp;
569                            }
570
571                            mControlURL = getControlURL();
572
573                            if (mSessionDesc->countTracks() < 2) {
574                                // There's no actual tracks in this session.
575                                // The first "track" is merely session meta
576                                // data.
577
578                                ALOGW("Session doesn't contain any playable "
579                                     "tracks. Aborting.");
580                                result = ERROR_UNSUPPORTED;
581                            } else {
582                                setupTrack(1);
583                            }
584                        }
585                    }
586                }
587
588                if (result != OK) {
589                    sp<AMessage> reply = new AMessage('disc', id());
590                    mConn->disconnect(reply);
591                }
592                break;
593            }
594
595            case 'sdpl':
596            {
597                int32_t result;
598                CHECK(msg->findInt32("result", &result));
599
600                ALOGI("SDP connection request completed with result %d (%s)",
601                     result, strerror(-result));
602
603                if (result == OK) {
604                    sp<RefBase> obj;
605                    CHECK(msg->findObject("description", &obj));
606                    mSessionDesc =
607                        static_cast<ASessionDescription *>(obj.get());
608
609                    if (!mSessionDesc->isValid()) {
610                        ALOGE("Failed to parse session description.");
611                        result = ERROR_MALFORMED;
612                    } else {
613                        mBaseURL = mSessionURL;
614
615                        mSeekable = !isLiveStream(mSessionDesc);
616
617                        mControlURL = getControlURL();
618
619                        if (mSessionDesc->countTracks() < 2) {
620                            // There's no actual tracks in this session.
621                            // The first "track" is merely session meta
622                            // data.
623
624                            ALOGW("Session doesn't contain any playable "
625                                 "tracks. Aborting.");
626                            result = ERROR_UNSUPPORTED;
627                        } else {
628                            setupTrack(1);
629                        }
630                    }
631                }
632
633                if (result != OK) {
634                    sp<AMessage> reply = new AMessage('disc', id());
635                    mConn->disconnect(reply);
636                }
637                break;
638            }
639
640            case 'setu':
641            {
642                size_t index;
643                CHECK(msg->findSize("index", &index));
644
645                TrackInfo *track = NULL;
646                size_t trackIndex;
647                if (msg->findSize("track-index", &trackIndex)) {
648                    track = &mTracks.editItemAt(trackIndex);
649                }
650
651                int32_t result;
652                CHECK(msg->findInt32("result", &result));
653
654                ALOGI("SETUP(%d) completed with result %d (%s)",
655                     index, result, strerror(-result));
656
657                if (result == OK) {
658                    CHECK(track != NULL);
659
660                    sp<RefBase> obj;
661                    CHECK(msg->findObject("response", &obj));
662                    sp<ARTSPResponse> response =
663                        static_cast<ARTSPResponse *>(obj.get());
664
665                    if (response->mStatusCode != 200) {
666                        result = UNKNOWN_ERROR;
667                    } else {
668                        ssize_t i = response->mHeaders.indexOfKey("session");
669                        CHECK_GE(i, 0);
670
671                        mSessionID = response->mHeaders.valueAt(i);
672
673                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
674                        AString timeoutStr;
675                        if (GetAttribute(
676                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
677                            char *end;
678                            unsigned long timeoutSecs =
679                                strtoul(timeoutStr.c_str(), &end, 10);
680
681                            if (end == timeoutStr.c_str() || *end != '\0') {
682                                ALOGW("server specified malformed timeout '%s'",
683                                     timeoutStr.c_str());
684
685                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
686                            } else if (timeoutSecs < 15) {
687                                ALOGW("server specified too short a timeout "
688                                     "(%lu secs), using default.",
689                                     timeoutSecs);
690
691                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
692                            } else {
693                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
694
695                                ALOGI("server specified timeout of %lu secs.",
696                                     timeoutSecs);
697                            }
698                        }
699
700                        i = mSessionID.find(";");
701                        if (i >= 0) {
702                            // Remove options, i.e. ";timeout=90"
703                            mSessionID.erase(i, mSessionID.size() - i);
704                        }
705
706                        sp<AMessage> notify = new AMessage('accu', id());
707                        notify->setSize("track-index", trackIndex);
708
709                        i = response->mHeaders.indexOfKey("transport");
710                        CHECK_GE(i, 0);
711
712                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
713                            if (!track->mUsingInterleavedTCP) {
714                                AString transport = response->mHeaders.valueAt(i);
715
716                                // We are going to continue even if we were
717                                // unable to poke a hole into the firewall...
718                                pokeAHole(
719                                        track->mRTPSocket,
720                                        track->mRTCPSocket,
721                                        transport);
722                            }
723
724                            mRTPConn->addStream(
725                                    track->mRTPSocket, track->mRTCPSocket,
726                                    mSessionDesc, index,
727                                    notify, track->mUsingInterleavedTCP);
728
729                            mSetupTracksSuccessful = true;
730                        } else {
731                            result = BAD_VALUE;
732                        }
733                    }
734                }
735
736                if (result != OK) {
737                    if (track) {
738                        if (!track->mUsingInterleavedTCP) {
739                            // Clear the tag
740                            if (mUIDValid) {
741                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
742                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
743                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
744                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
745                            }
746
747                            close(track->mRTPSocket);
748                            close(track->mRTCPSocket);
749                        }
750
751                        mTracks.removeItemsAt(trackIndex);
752                    }
753                }
754
755                ++index;
756                if (result == OK && index < mSessionDesc->countTracks()) {
757                    setupTrack(index);
758                } else if (mSetupTracksSuccessful) {
759                    ++mKeepAliveGeneration;
760                    postKeepAlive();
761
762                    AString request = "PLAY ";
763                    request.append(mControlURL);
764                    request.append(" RTSP/1.0\r\n");
765
766                    request.append("Session: ");
767                    request.append(mSessionID);
768                    request.append("\r\n");
769
770                    request.append("\r\n");
771
772                    sp<AMessage> reply = new AMessage('play', id());
773                    mConn->sendRequest(request.c_str(), reply);
774                } else {
775                    sp<AMessage> reply = new AMessage('disc', id());
776                    mConn->disconnect(reply);
777                }
778                break;
779            }
780
781            case 'play':
782            {
783                int32_t result;
784                CHECK(msg->findInt32("result", &result));
785
786                ALOGI("PLAY completed with result %d (%s)",
787                     result, strerror(-result));
788
789                if (result == OK) {
790                    sp<RefBase> obj;
791                    CHECK(msg->findObject("response", &obj));
792                    sp<ARTSPResponse> response =
793                        static_cast<ARTSPResponse *>(obj.get());
794
795                    if (response->mStatusCode != 200) {
796                        result = UNKNOWN_ERROR;
797                    } else {
798                        parsePlayResponse(response);
799
800                        sp<AMessage> timeout = new AMessage('tiou', id());
801                        mCheckTimeoutGeneration++;
802                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
803                        timeout->post(kStartupTimeoutUs);
804                    }
805                }
806
807                if (result != OK) {
808                    sp<AMessage> reply = new AMessage('disc', id());
809                    mConn->disconnect(reply);
810                }
811
812                break;
813            }
814
815            case 'aliv':
816            {
817                int32_t generation;
818                CHECK(msg->findInt32("generation", &generation));
819
820                if (generation != mKeepAliveGeneration) {
821                    // obsolete event.
822                    break;
823                }
824
825                AString request;
826                request.append("OPTIONS ");
827                request.append(mSessionURL);
828                request.append(" RTSP/1.0\r\n");
829                request.append("Session: ");
830                request.append(mSessionID);
831                request.append("\r\n");
832                request.append("\r\n");
833
834                sp<AMessage> reply = new AMessage('opts', id());
835                reply->setInt32("generation", mKeepAliveGeneration);
836                mConn->sendRequest(request.c_str(), reply);
837                break;
838            }
839
840            case 'opts':
841            {
842                int32_t result;
843                CHECK(msg->findInt32("result", &result));
844
845                ALOGI("OPTIONS completed with result %d (%s)",
846                     result, strerror(-result));
847
848                int32_t generation;
849                CHECK(msg->findInt32("generation", &generation));
850
851                if (generation != mKeepAliveGeneration) {
852                    // obsolete event.
853                    break;
854                }
855
856                postKeepAlive();
857                break;
858            }
859
860            case 'abor':
861            {
862                for (size_t i = 0; i < mTracks.size(); ++i) {
863                    TrackInfo *info = &mTracks.editItemAt(i);
864
865                    if (!mFirstAccessUnit) {
866                        postQueueEOS(i, ERROR_END_OF_STREAM);
867                    }
868
869                    if (!info->mUsingInterleavedTCP) {
870                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
871
872                        // Clear the tag
873                        if (mUIDValid) {
874                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
875                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
876                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
877                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
878                        }
879
880                        close(info->mRTPSocket);
881                        close(info->mRTCPSocket);
882                    }
883                }
884                mTracks.clear();
885                mSetupTracksSuccessful = false;
886                mSeekPending = false;
887                mFirstAccessUnit = true;
888                mAllTracksHaveTime = false;
889                mNTPAnchorUs = -1;
890                mMediaAnchorUs = -1;
891                mNumAccessUnitsReceived = 0;
892                mReceivedFirstRTCPPacket = false;
893                mReceivedFirstRTPPacket = false;
894                mPausing = false;
895                mSeekable = true;
896
897                sp<AMessage> reply = new AMessage('tear', id());
898
899                int32_t reconnect;
900                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
901                    reply->setInt32("reconnect", true);
902                }
903
904                AString request;
905                request = "TEARDOWN ";
906
907                // XXX should use aggregate url from SDP here...
908                request.append(mSessionURL);
909                request.append(" RTSP/1.0\r\n");
910
911                request.append("Session: ");
912                request.append(mSessionID);
913                request.append("\r\n");
914
915                request.append("\r\n");
916
917                mConn->sendRequest(request.c_str(), reply);
918                break;
919            }
920
921            case 'tear':
922            {
923                int32_t result;
924                CHECK(msg->findInt32("result", &result));
925
926                ALOGI("TEARDOWN completed with result %d (%s)",
927                     result, strerror(-result));
928
929                sp<AMessage> reply = new AMessage('disc', id());
930
931                int32_t reconnect;
932                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
933                    reply->setInt32("reconnect", true);
934                }
935
936                mConn->disconnect(reply);
937                break;
938            }
939
940            case 'quit':
941            {
942                sp<AMessage> msg = mNotify->dup();
943                msg->setInt32("what", kWhatDisconnected);
944                msg->setInt32("result", UNKNOWN_ERROR);
945                msg->post();
946                break;
947            }
948
949            case 'chek':
950            {
951                int32_t generation;
952                CHECK(msg->findInt32("generation", &generation));
953                if (generation != mCheckGeneration) {
954                    // This is an outdated message. Ignore.
955                    break;
956                }
957
958                if (mNumAccessUnitsReceived == 0) {
959#if 1
960                    ALOGI("stream ended? aborting.");
961                    (new AMessage('abor', id()))->post();
962                    break;
963#else
964                    ALOGI("haven't seen an AU in a looong time.");
965#endif
966                }
967
968                mNumAccessUnitsReceived = 0;
969                msg->post(kAccessUnitTimeoutUs);
970                break;
971            }
972
973            case 'accu':
974            {
975                int32_t timeUpdate;
976                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
977                    size_t trackIndex;
978                    CHECK(msg->findSize("track-index", &trackIndex));
979
980                    uint32_t rtpTime;
981                    uint64_t ntpTime;
982                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
983                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
984
985                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
986                    break;
987                }
988
989                int32_t first;
990                if (msg->findInt32("first-rtcp", &first)) {
991                    mReceivedFirstRTCPPacket = true;
992                    break;
993                }
994
995                if (msg->findInt32("first-rtp", &first)) {
996                    mReceivedFirstRTPPacket = true;
997                    break;
998                }
999
1000                ++mNumAccessUnitsReceived;
1001                postAccessUnitTimeoutCheck();
1002
1003                size_t trackIndex;
1004                CHECK(msg->findSize("track-index", &trackIndex));
1005
1006                if (trackIndex >= mTracks.size()) {
1007                    ALOGV("late packets ignored.");
1008                    break;
1009                }
1010
1011                TrackInfo *track = &mTracks.editItemAt(trackIndex);
1012
1013                int32_t eos;
1014                if (msg->findInt32("eos", &eos)) {
1015                    ALOGI("received BYE on track index %d", trackIndex);
1016                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1017                        ALOGI("No time established => fake existing data");
1018
1019                        track->mEOSReceived = true;
1020                        mTryFakeRTCP = true;
1021                        mReceivedFirstRTCPPacket = true;
1022                        fakeTimestamps();
1023                    } else {
1024                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1025                    }
1026                    return;
1027                }
1028
1029                sp<ABuffer> accessUnit;
1030                CHECK(msg->findBuffer("access-unit", &accessUnit));
1031
1032                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1033
1034                if (mSeekPending) {
1035                    ALOGV("we're seeking, dropping stale packet.");
1036                    break;
1037                }
1038
1039                if (seqNum < track->mFirstSeqNumInSegment) {
1040                    ALOGV("dropping stale access-unit (%d < %d)",
1041                         seqNum, track->mFirstSeqNumInSegment);
1042                    break;
1043                }
1044
1045                if (track->mNewSegment) {
1046                    track->mNewSegment = false;
1047                }
1048
1049                onAccessUnitComplete(trackIndex, accessUnit);
1050                break;
1051            }
1052
1053            case 'paus':
1054            {
1055                int32_t generation;
1056                CHECK(msg->findInt32("pausecheck", &generation));
1057                if (generation != mPauseGeneration) {
1058                    ALOGV("Ignoring outdated pause message.");
1059                    break;
1060                }
1061
1062                if (!mSeekable) {
1063                    ALOGW("This is a live stream, ignoring pause request.");
1064                    break;
1065                }
1066                mCheckPending = true;
1067                ++mCheckGeneration;
1068                mPausing = true;
1069
1070                AString request = "PAUSE ";
1071                request.append(mControlURL);
1072                request.append(" RTSP/1.0\r\n");
1073
1074                request.append("Session: ");
1075                request.append(mSessionID);
1076                request.append("\r\n");
1077
1078                request.append("\r\n");
1079
1080                sp<AMessage> reply = new AMessage('pau2', id());
1081                mConn->sendRequest(request.c_str(), reply);
1082                break;
1083            }
1084
1085            case 'pau2':
1086            {
1087                int32_t result;
1088                CHECK(msg->findInt32("result", &result));
1089                mCheckTimeoutGeneration++;
1090
1091                ALOGI("PAUSE completed with result %d (%s)",
1092                     result, strerror(-result));
1093                break;
1094            }
1095
1096            case 'resu':
1097            {
1098                if (mPausing && mSeekPending) {
1099                    // If seeking, Play will be sent from see1 instead
1100                    break;
1101                }
1102
1103                if (!mPausing) {
1104                    // Dont send PLAY if we have not paused
1105                    break;
1106                }
1107                AString request = "PLAY ";
1108                request.append(mControlURL);
1109                request.append(" RTSP/1.0\r\n");
1110
1111                request.append("Session: ");
1112                request.append(mSessionID);
1113                request.append("\r\n");
1114
1115                request.append("\r\n");
1116
1117                sp<AMessage> reply = new AMessage('res2', id());
1118                mConn->sendRequest(request.c_str(), reply);
1119                break;
1120            }
1121
1122            case 'res2':
1123            {
1124                int32_t result;
1125                CHECK(msg->findInt32("result", &result));
1126
1127                ALOGI("PLAY completed with result %d (%s)",
1128                     result, strerror(-result));
1129
1130                mCheckPending = false;
1131                postAccessUnitTimeoutCheck();
1132
1133                if (result == OK) {
1134                    sp<RefBase> obj;
1135                    CHECK(msg->findObject("response", &obj));
1136                    sp<ARTSPResponse> response =
1137                        static_cast<ARTSPResponse *>(obj.get());
1138
1139                    if (response->mStatusCode != 200) {
1140                        result = UNKNOWN_ERROR;
1141                    } else {
1142                        parsePlayResponse(response);
1143
1144                        // Post new timeout in order to make sure to use
1145                        // fake timestamps if no new Sender Reports arrive
1146                        sp<AMessage> timeout = new AMessage('tiou', id());
1147                        mCheckTimeoutGeneration++;
1148                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1149                        timeout->post(kStartupTimeoutUs);
1150                    }
1151                }
1152
1153                if (result != OK) {
1154                    ALOGE("resume failed, aborting.");
1155                    (new AMessage('abor', id()))->post();
1156                }
1157
1158                mPausing = false;
1159                break;
1160            }
1161
1162            case 'seek':
1163            {
1164                if (!mSeekable) {
1165                    ALOGW("This is a live stream, ignoring seek request.");
1166
1167                    sp<AMessage> msg = mNotify->dup();
1168                    msg->setInt32("what", kWhatSeekDone);
1169                    msg->post();
1170                    break;
1171                }
1172
1173                int64_t timeUs;
1174                CHECK(msg->findInt64("time", &timeUs));
1175
1176                mSeekPending = true;
1177
1178                // Disable the access unit timeout until we resumed
1179                // playback again.
1180                mCheckPending = true;
1181                ++mCheckGeneration;
1182
1183                sp<AMessage> reply = new AMessage('see1', id());
1184                reply->setInt64("time", timeUs);
1185
1186                if (mPausing) {
1187                    // PAUSE already sent
1188                    ALOGI("Pause already sent");
1189                    reply->post();
1190                    break;
1191                }
1192                AString request = "PAUSE ";
1193                request.append(mControlURL);
1194                request.append(" RTSP/1.0\r\n");
1195
1196                request.append("Session: ");
1197                request.append(mSessionID);
1198                request.append("\r\n");
1199
1200                request.append("\r\n");
1201
1202                mConn->sendRequest(request.c_str(), reply);
1203                break;
1204            }
1205
1206            case 'see1':
1207            {
1208                // Session is paused now.
1209                for (size_t i = 0; i < mTracks.size(); ++i) {
1210                    TrackInfo *info = &mTracks.editItemAt(i);
1211
1212                    postQueueSeekDiscontinuity(i);
1213                    info->mEOSReceived = false;
1214
1215                    info->mRTPAnchor = 0;
1216                    info->mNTPAnchorUs = -1;
1217                }
1218
1219                mAllTracksHaveTime = false;
1220                mNTPAnchorUs = -1;
1221
1222                // Start new timeoutgeneration to avoid getting timeout
1223                // before PLAY response arrive
1224                sp<AMessage> timeout = new AMessage('tiou', id());
1225                mCheckTimeoutGeneration++;
1226                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1227                timeout->post(kStartupTimeoutUs);
1228
1229                int64_t timeUs;
1230                CHECK(msg->findInt64("time", &timeUs));
1231
1232                AString request = "PLAY ";
1233                request.append(mControlURL);
1234                request.append(" RTSP/1.0\r\n");
1235
1236                request.append("Session: ");
1237                request.append(mSessionID);
1238                request.append("\r\n");
1239
1240                request.append(
1241                        StringPrintf(
1242                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1243
1244                request.append("\r\n");
1245
1246                sp<AMessage> reply = new AMessage('see2', id());
1247                mConn->sendRequest(request.c_str(), reply);
1248                break;
1249            }
1250
1251            case 'see2':
1252            {
1253                if (mTracks.size() == 0) {
1254                    // We have already hit abor, break
1255                    break;
1256                }
1257
1258                int32_t result;
1259                CHECK(msg->findInt32("result", &result));
1260
1261                ALOGI("PLAY completed with result %d (%s)",
1262                     result, strerror(-result));
1263
1264                mCheckPending = false;
1265                postAccessUnitTimeoutCheck();
1266
1267                if (result == OK) {
1268                    sp<RefBase> obj;
1269                    CHECK(msg->findObject("response", &obj));
1270                    sp<ARTSPResponse> response =
1271                        static_cast<ARTSPResponse *>(obj.get());
1272
1273                    if (response->mStatusCode != 200) {
1274                        result = UNKNOWN_ERROR;
1275                    } else {
1276                        parsePlayResponse(response);
1277
1278                        // Post new timeout in order to make sure to use
1279                        // fake timestamps if no new Sender Reports arrive
1280                        sp<AMessage> timeout = new AMessage('tiou', id());
1281                        mCheckTimeoutGeneration++;
1282                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1283                        timeout->post(kStartupTimeoutUs);
1284
1285                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1286                        CHECK_GE(i, 0);
1287
1288                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1289
1290                        ALOGI("seek completed.");
1291                    }
1292                }
1293
1294                if (result != OK) {
1295                    ALOGE("seek failed, aborting.");
1296                    (new AMessage('abor', id()))->post();
1297                }
1298
1299                mPausing = false;
1300                mSeekPending = false;
1301
1302                sp<AMessage> msg = mNotify->dup();
1303                msg->setInt32("what", kWhatSeekDone);
1304                msg->post();
1305                break;
1306            }
1307
1308            case 'biny':
1309            {
1310                sp<ABuffer> buffer;
1311                CHECK(msg->findBuffer("buffer", &buffer));
1312
1313                int32_t index;
1314                CHECK(buffer->meta()->findInt32("index", &index));
1315
1316                mRTPConn->injectPacket(index, buffer);
1317                break;
1318            }
1319
1320            case 'tiou':
1321            {
1322                int32_t timeoutGenerationCheck;
1323                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1324                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1325                    // This is an outdated message. Ignore.
1326                    // This typically happens if a lot of seeks are
1327                    // performed, since new timeout messages now are
1328                    // posted at seek as well.
1329                    break;
1330                }
1331                if (!mReceivedFirstRTCPPacket) {
1332                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1333                        ALOGW("We received RTP packets but no RTCP packets, "
1334                             "using fake timestamps.");
1335
1336                        mTryFakeRTCP = true;
1337
1338                        mReceivedFirstRTCPPacket = true;
1339
1340                        fakeTimestamps();
1341                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1342                        ALOGW("Never received any data, switching transports.");
1343
1344                        mTryTCPInterleaving = true;
1345
1346                        sp<AMessage> msg = new AMessage('abor', id());
1347                        msg->setInt32("reconnect", true);
1348                        msg->post();
1349                    } else {
1350                        ALOGW("Never received any data, disconnecting.");
1351                        (new AMessage('abor', id()))->post();
1352                    }
1353                } else {
1354                    if (!mAllTracksHaveTime) {
1355                        ALOGW("We received some RTCP packets, but time "
1356                              "could not be established on all tracks, now "
1357                              "using fake timestamps");
1358
1359                        fakeTimestamps();
1360                    }
1361                }
1362                break;
1363            }
1364
1365            default:
1366                TRESPASS();
1367                break;
1368        }
1369    }
1370
1371    void postKeepAlive() {
1372        sp<AMessage> msg = new AMessage('aliv', id());
1373        msg->setInt32("generation", mKeepAliveGeneration);
1374        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1375    }
1376
1377    void postAccessUnitTimeoutCheck() {
1378        if (mCheckPending) {
1379            return;
1380        }
1381
1382        mCheckPending = true;
1383        sp<AMessage> check = new AMessage('chek', id());
1384        check->setInt32("generation", mCheckGeneration);
1385        check->post(kAccessUnitTimeoutUs);
1386    }
1387
1388    static void SplitString(
1389            const AString &s, const char *separator, List<AString> *items) {
1390        items->clear();
1391        size_t start = 0;
1392        while (start < s.size()) {
1393            ssize_t offset = s.find(separator, start);
1394
1395            if (offset < 0) {
1396                items->push_back(AString(s, start, s.size() - start));
1397                break;
1398            }
1399
1400            items->push_back(AString(s, start, offset - start));
1401            start = offset + strlen(separator);
1402        }
1403    }
1404
1405    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1406        mPlayResponseParsed = true;
1407        if (mTracks.size() == 0) {
1408            ALOGV("parsePlayResponse: late packets ignored.");
1409            return;
1410        }
1411
1412        ssize_t i = response->mHeaders.indexOfKey("range");
1413        if (i < 0) {
1414            // Server doesn't even tell use what range it is going to
1415            // play, therefore we won't support seeking.
1416            return;
1417        }
1418
1419        AString range = response->mHeaders.valueAt(i);
1420        ALOGV("Range: %s", range.c_str());
1421
1422        AString val;
1423        CHECK(GetAttribute(range.c_str(), "npt", &val));
1424
1425        float npt1, npt2;
1426        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1427            // This is a live stream and therefore not seekable.
1428
1429            ALOGI("This is a live stream");
1430            return;
1431        }
1432
1433        i = response->mHeaders.indexOfKey("rtp-info");
1434        CHECK_GE(i, 0);
1435
1436        AString rtpInfo = response->mHeaders.valueAt(i);
1437        List<AString> streamInfos;
1438        SplitString(rtpInfo, ",", &streamInfos);
1439
1440        int n = 1;
1441        for (List<AString>::iterator it = streamInfos.begin();
1442             it != streamInfos.end(); ++it) {
1443            (*it).trim();
1444            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1445
1446            CHECK(GetAttribute((*it).c_str(), "url", &val));
1447
1448            size_t trackIndex = 0;
1449            while (trackIndex < mTracks.size()
1450                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1451                ++trackIndex;
1452            }
1453            CHECK_LT(trackIndex, mTracks.size());
1454
1455            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1456
1457            char *end;
1458            unsigned long seq = strtoul(val.c_str(), &end, 10);
1459
1460            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1461            info->mFirstSeqNumInSegment = seq;
1462            info->mNewSegment = true;
1463
1464            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1465
1466            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1467
1468            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1469
1470            info->mNormalPlayTimeRTP = rtpTime;
1471            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1472
1473            if (!mFirstAccessUnit) {
1474                postNormalPlayTimeMapping(
1475                        trackIndex,
1476                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1477            }
1478
1479            ++n;
1480        }
1481    }
1482
1483    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1484        CHECK_GE(index, 0u);
1485        CHECK_LT(index, mTracks.size());
1486
1487        const TrackInfo &info = mTracks.itemAt(index);
1488
1489        *timeScale = info.mTimeScale;
1490
1491        return info.mPacketSource->getFormat();
1492    }
1493
1494    size_t countTracks() const {
1495        return mTracks.size();
1496    }
1497
1498private:
1499    struct TrackInfo {
1500        AString mURL;
1501        int mRTPSocket;
1502        int mRTCPSocket;
1503        bool mUsingInterleavedTCP;
1504        uint32_t mFirstSeqNumInSegment;
1505        bool mNewSegment;
1506
1507        uint32_t mRTPAnchor;
1508        int64_t mNTPAnchorUs;
1509        int32_t mTimeScale;
1510        bool mEOSReceived;
1511
1512        uint32_t mNormalPlayTimeRTP;
1513        int64_t mNormalPlayTimeUs;
1514
1515        sp<APacketSource> mPacketSource;
1516
1517        // Stores packets temporarily while no notion of time
1518        // has been established yet.
1519        List<sp<ABuffer> > mPackets;
1520    };
1521
1522    sp<AMessage> mNotify;
1523    bool mUIDValid;
1524    uid_t mUID;
1525    sp<ALooper> mNetLooper;
1526    sp<ARTSPConnection> mConn;
1527    sp<ARTPConnection> mRTPConn;
1528    sp<ASessionDescription> mSessionDesc;
1529    AString mOriginalSessionURL;  // This one still has user:pass@
1530    AString mSessionURL;
1531    AString mSessionHost;
1532    AString mBaseURL;
1533    AString mControlURL;
1534    AString mSessionID;
1535    bool mSetupTracksSuccessful;
1536    bool mSeekPending;
1537    bool mFirstAccessUnit;
1538
1539    bool mAllTracksHaveTime;
1540    int64_t mNTPAnchorUs;
1541    int64_t mMediaAnchorUs;
1542    int64_t mLastMediaTimeUs;
1543
1544    int64_t mNumAccessUnitsReceived;
1545    bool mCheckPending;
1546    int32_t mCheckGeneration;
1547    int32_t mCheckTimeoutGeneration;
1548    bool mTryTCPInterleaving;
1549    bool mTryFakeRTCP;
1550    bool mReceivedFirstRTCPPacket;
1551    bool mReceivedFirstRTPPacket;
1552    bool mSeekable;
1553    int64_t mKeepAliveTimeoutUs;
1554    int32_t mKeepAliveGeneration;
1555    bool mPausing;
1556    int32_t mPauseGeneration;
1557
1558    Vector<TrackInfo> mTracks;
1559
1560    bool mPlayResponseParsed;
1561
1562    void setupTrack(size_t index) {
1563        sp<APacketSource> source =
1564            new APacketSource(mSessionDesc, index);
1565
1566        if (source->initCheck() != OK) {
1567            ALOGW("Unsupported format. Ignoring track #%d.", index);
1568
1569            sp<AMessage> reply = new AMessage('setu', id());
1570            reply->setSize("index", index);
1571            reply->setInt32("result", ERROR_UNSUPPORTED);
1572            reply->post();
1573            return;
1574        }
1575
1576        AString url;
1577        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1578
1579        AString trackURL;
1580        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1581
1582        mTracks.push(TrackInfo());
1583        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1584        info->mURL = trackURL;
1585        info->mPacketSource = source;
1586        info->mUsingInterleavedTCP = false;
1587        info->mFirstSeqNumInSegment = 0;
1588        info->mNewSegment = true;
1589        info->mRTPSocket = -1;
1590        info->mRTCPSocket = -1;
1591        info->mRTPAnchor = 0;
1592        info->mNTPAnchorUs = -1;
1593        info->mNormalPlayTimeRTP = 0;
1594        info->mNormalPlayTimeUs = 0ll;
1595
1596        unsigned long PT;
1597        AString formatDesc;
1598        AString formatParams;
1599        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1600
1601        int32_t timescale;
1602        int32_t numChannels;
1603        ASessionDescription::ParseFormatDesc(
1604                formatDesc.c_str(), &timescale, &numChannels);
1605
1606        info->mTimeScale = timescale;
1607        info->mEOSReceived = false;
1608
1609        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1610
1611        AString request = "SETUP ";
1612        request.append(trackURL);
1613        request.append(" RTSP/1.0\r\n");
1614
1615        if (mTryTCPInterleaving) {
1616            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1617            info->mUsingInterleavedTCP = true;
1618            info->mRTPSocket = interleaveIndex;
1619            info->mRTCPSocket = interleaveIndex + 1;
1620
1621            request.append("Transport: RTP/AVP/TCP;interleaved=");
1622            request.append(interleaveIndex);
1623            request.append("-");
1624            request.append(interleaveIndex + 1);
1625        } else {
1626            unsigned rtpPort;
1627            ARTPConnection::MakePortPair(
1628                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1629
1630            if (mUIDValid) {
1631                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1632                                                (uint32_t)*(uint32_t*) "RTP_");
1633                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1634                                                (uint32_t)*(uint32_t*) "RTP_");
1635                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1636                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1637            }
1638
1639            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1640            request.append(rtpPort);
1641            request.append("-");
1642            request.append(rtpPort + 1);
1643        }
1644
1645        request.append("\r\n");
1646
1647        if (index > 1) {
1648            request.append("Session: ");
1649            request.append(mSessionID);
1650            request.append("\r\n");
1651        }
1652
1653        request.append("\r\n");
1654
1655        sp<AMessage> reply = new AMessage('setu', id());
1656        reply->setSize("index", index);
1657        reply->setSize("track-index", mTracks.size() - 1);
1658        mConn->sendRequest(request.c_str(), reply);
1659    }
1660
1661    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1662        out->clear();
1663
1664        if (strncasecmp("rtsp://", baseURL, 7)) {
1665            // Base URL must be absolute
1666            return false;
1667        }
1668
1669        if (!strncasecmp("rtsp://", url, 7)) {
1670            // "url" is already an absolute URL, ignore base URL.
1671            out->setTo(url);
1672            return true;
1673        }
1674
1675        size_t n = strlen(baseURL);
1676        if (baseURL[n - 1] == '/') {
1677            out->setTo(baseURL);
1678            out->append(url);
1679        } else {
1680            const char *slashPos = strrchr(baseURL, '/');
1681
1682            if (slashPos > &baseURL[6]) {
1683                out->setTo(baseURL, slashPos - baseURL);
1684            } else {
1685                out->setTo(baseURL);
1686            }
1687
1688            out->append("/");
1689            out->append(url);
1690        }
1691
1692        return true;
1693    }
1694
1695    void fakeTimestamps() {
1696        mNTPAnchorUs = -1ll;
1697        for (size_t i = 0; i < mTracks.size(); ++i) {
1698            onTimeUpdate(i, 0, 0ll);
1699        }
1700    }
1701
1702    bool dataReceivedOnAllChannels() {
1703        TrackInfo *track;
1704        for (size_t i = 0; i < mTracks.size(); ++i) {
1705            track = &mTracks.editItemAt(i);
1706            if (track->mPackets.empty()) {
1707                return false;
1708            }
1709        }
1710        return true;
1711    }
1712
1713    void handleFirstAccessUnit() {
1714        if (mFirstAccessUnit) {
1715            sp<AMessage> msg = mNotify->dup();
1716            msg->setInt32("what", kWhatConnected);
1717            msg->post();
1718
1719            if (mSeekable) {
1720                for (size_t i = 0; i < mTracks.size(); ++i) {
1721                    TrackInfo *info = &mTracks.editItemAt(i);
1722
1723                    postNormalPlayTimeMapping(
1724                            i,
1725                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1726                }
1727            }
1728
1729            mFirstAccessUnit = false;
1730        }
1731    }
1732
1733    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1734        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1735             trackIndex, rtpTime, ntpTime);
1736
1737        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1738
1739        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1740
1741        track->mRTPAnchor = rtpTime;
1742        track->mNTPAnchorUs = ntpTimeUs;
1743
1744        if (mNTPAnchorUs < 0) {
1745            mNTPAnchorUs = ntpTimeUs;
1746            mMediaAnchorUs = mLastMediaTimeUs;
1747        }
1748
1749        if (!mAllTracksHaveTime) {
1750            bool allTracksHaveTime = true;
1751            for (size_t i = 0; i < mTracks.size(); ++i) {
1752                TrackInfo *track = &mTracks.editItemAt(i);
1753                if (track->mNTPAnchorUs < 0) {
1754                    allTracksHaveTime = false;
1755                    break;
1756                }
1757            }
1758            if (allTracksHaveTime) {
1759                mAllTracksHaveTime = true;
1760                ALOGI("Time now established for all tracks.");
1761            }
1762        }
1763        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1764            handleFirstAccessUnit();
1765
1766            // Time is now established, lets start timestamping immediately
1767            for (size_t i = 0; i < mTracks.size(); ++i) {
1768                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1769                while (!trackInfo->mPackets.empty()) {
1770                    sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1771                    trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1772
1773                    if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1774                        postQueueAccessUnit(i, accessUnit);
1775                    }
1776                }
1777            }
1778            for (size_t i = 0; i < mTracks.size(); ++i) {
1779                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1780                if (trackInfo->mEOSReceived) {
1781                    postQueueEOS(i, ERROR_END_OF_STREAM);
1782                    trackInfo->mEOSReceived = false;
1783                }
1784            }
1785        }
1786    }
1787
1788    void onAccessUnitComplete(
1789            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1790        ALOGV("onAccessUnitComplete track %d", trackIndex);
1791
1792        if(!mPlayResponseParsed){
1793            ALOGI("play response is not parsed, storing accessunit");
1794            TrackInfo *track = &mTracks.editItemAt(trackIndex);
1795            track->mPackets.push_back(accessUnit);
1796            return;
1797        }
1798
1799        handleFirstAccessUnit();
1800
1801        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1802
1803        if (!mAllTracksHaveTime) {
1804            ALOGV("storing accessUnit, no time established yet");
1805            track->mPackets.push_back(accessUnit);
1806            return;
1807        }
1808
1809        while (!track->mPackets.empty()) {
1810            sp<ABuffer> accessUnit = *track->mPackets.begin();
1811            track->mPackets.erase(track->mPackets.begin());
1812
1813            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1814                postQueueAccessUnit(trackIndex, accessUnit);
1815            }
1816        }
1817
1818        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1819            postQueueAccessUnit(trackIndex, accessUnit);
1820        }
1821
1822        if (track->mEOSReceived) {
1823            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1824            track->mEOSReceived = false;
1825        }
1826    }
1827
1828    bool addMediaTimestamp(
1829            int32_t trackIndex, const TrackInfo *track,
1830            const sp<ABuffer> &accessUnit) {
1831        UNUSED_UNLESS_VERBOSE(trackIndex);
1832
1833        uint32_t rtpTime;
1834        CHECK(accessUnit->meta()->findInt32(
1835                    "rtp-time", (int32_t *)&rtpTime));
1836
1837        int64_t relRtpTimeUs =
1838            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1839                / track->mTimeScale;
1840
1841        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1842
1843        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1844
1845        if (mediaTimeUs > mLastMediaTimeUs) {
1846            mLastMediaTimeUs = mediaTimeUs;
1847        }
1848
1849        if (mediaTimeUs < 0) {
1850            ALOGV("dropping early accessUnit.");
1851            return false;
1852        }
1853
1854        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1855             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1856
1857        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1858
1859        return true;
1860    }
1861
1862    void postQueueAccessUnit(
1863            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1864        sp<AMessage> msg = mNotify->dup();
1865        msg->setInt32("what", kWhatAccessUnit);
1866        msg->setSize("trackIndex", trackIndex);
1867        msg->setBuffer("accessUnit", accessUnit);
1868        msg->post();
1869    }
1870
1871    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1872        sp<AMessage> msg = mNotify->dup();
1873        msg->setInt32("what", kWhatEOS);
1874        msg->setSize("trackIndex", trackIndex);
1875        msg->setInt32("finalResult", finalResult);
1876        msg->post();
1877    }
1878
1879    void postQueueSeekDiscontinuity(size_t trackIndex) {
1880        sp<AMessage> msg = mNotify->dup();
1881        msg->setInt32("what", kWhatSeekDiscontinuity);
1882        msg->setSize("trackIndex", trackIndex);
1883        msg->post();
1884    }
1885
1886    void postNormalPlayTimeMapping(
1887            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1888        sp<AMessage> msg = mNotify->dup();
1889        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1890        msg->setSize("trackIndex", trackIndex);
1891        msg->setInt32("rtpTime", rtpTime);
1892        msg->setInt64("nptUs", nptUs);
1893        msg->post();
1894    }
1895
1896    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1897};
1898
1899}  // namespace android
1900
1901#endif  // MY_HANDLER_H_
1902