MyHandler.h revision a8b8488f703bb6bda039d7d98f87e4f9d845664d
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31
32#include <media/stagefright/foundation/ABuffer.h>
33#include <media/stagefright/foundation/ADebug.h>
34#include <media/stagefright/foundation/ALooper.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/MediaErrors.h>
37#include <media/stagefright/Utils.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static const int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static const int64_t kStartupTimeoutUs = 10000000ll;

// Keep-alive interval (60 secs) used unless the server's SETUP response
// carries a well-formed "timeout" attribute of at least 15 secs.
static const int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// Delay (3 secs) with which pause() posts its 'paus' message.
static const int64_t kPauseDelayUs = 3000000ll;
56
57namespace android {
58
59static bool GetAttribute(const char *s, const char *key, AString *value) {
60    value->clear();
61
62    size_t keyLen = strlen(key);
63
64    for (;;) {
65        while (isspace(*s)) {
66            ++s;
67        }
68
69        const char *colonPos = strchr(s, ';');
70
71        size_t len =
72            (colonPos == NULL) ? strlen(s) : colonPos - s;
73
74        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
75            value->setTo(&s[keyLen + 1], len - keyLen - 1);
76            return true;
77        }
78
79        if (colonPos == NULL) {
80            return false;
81        }
82
83        s = colonPos + 1;
84    }
85}
86
87struct MyHandler : public AHandler {
88    enum {
89        kWhatConnected                  = 'conn',
90        kWhatDisconnected               = 'disc',
91        kWhatSeekDone                   = 'sdon',
92
93        kWhatAccessUnit                 = 'accU',
94        kWhatEOS                        = 'eos!',
95        kWhatSeekDiscontinuity          = 'seeD',
96        kWhatNormalPlayTimeMapping      = 'nptM',
97    };
98
99    MyHandler(
100            const char *url,
101            const sp<AMessage> &notify,
102            bool uidValid = false, uid_t uid = 0)
103        : mNotify(notify),
104          mUIDValid(uidValid),
105          mUID(uid),
106          mNetLooper(new ALooper),
107          mConn(new ARTSPConnection(mUIDValid, mUID)),
108          mRTPConn(new ARTPConnection),
109          mOriginalSessionURL(url),
110          mSessionURL(url),
111          mSetupTracksSuccessful(false),
112          mSeekPending(false),
113          mFirstAccessUnit(true),
114          mAllTracksHaveTime(false),
115          mNTPAnchorUs(-1),
116          mMediaAnchorUs(-1),
117          mLastMediaTimeUs(0),
118          mNumAccessUnitsReceived(0),
119          mCheckPending(false),
120          mCheckGeneration(0),
121          mCheckTimeoutGeneration(0),
122          mTryTCPInterleaving(false),
123          mTryFakeRTCP(false),
124          mReceivedFirstRTCPPacket(false),
125          mReceivedFirstRTPPacket(false),
126          mSeekable(true),
127          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
128          mKeepAliveGeneration(0),
129          mPausing(false),
130          mPauseGeneration(0),
131          mPlayResponseParsed(false) {
132        mNetLooper->setName("rtsp net");
133        mNetLooper->start(false /* runOnCallingThread */,
134                          false /* canCallJava */,
135                          PRIORITY_HIGHEST);
136
137        // Strip any authentication info from the session url, we don't
138        // want to transmit user/pass in cleartext.
139        AString host, path, user, pass;
140        unsigned port;
141        CHECK(ARTSPConnection::ParseURL(
142                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
143
144        if (user.size() > 0) {
145            mSessionURL.clear();
146            mSessionURL.append("rtsp://");
147            mSessionURL.append(host);
148            mSessionURL.append(":");
149            mSessionURL.append(StringPrintf("%u", port));
150            mSessionURL.append(path);
151
152            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
153        }
154
155        mSessionHost = host;
156    }
157
158    void connect() {
159        looper()->registerHandler(mConn);
160        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
161
162        sp<AMessage> notify = new AMessage('biny', id());
163        mConn->observeBinaryData(notify);
164
165        sp<AMessage> reply = new AMessage('conn', id());
166        mConn->connect(mOriginalSessionURL.c_str(), reply);
167    }
168
169    void loadSDP(const sp<ASessionDescription>& desc) {
170        looper()->registerHandler(mConn);
171        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
172
173        sp<AMessage> notify = new AMessage('biny', id());
174        mConn->observeBinaryData(notify);
175
176        sp<AMessage> reply = new AMessage('sdpl', id());
177        reply->setObject("description", desc);
178        mConn->connect(mOriginalSessionURL.c_str(), reply);
179    }
180
181    AString getControlURL(sp<ASessionDescription> desc) {
182        AString sessionLevelControlURL;
183        if (mSessionDesc->findAttribute(
184                0,
185                "a=control",
186                &sessionLevelControlURL)) {
187            if (sessionLevelControlURL.compare("*") == 0) {
188                return mBaseURL;
189            } else {
190                AString controlURL;
191                CHECK(MakeURL(
192                        mBaseURL.c_str(),
193                        sessionLevelControlURL.c_str(),
194                        &controlURL));
195                return controlURL;
196            }
197        } else {
198            return mSessionURL;
199        }
200    }
201
202    void disconnect() {
203        (new AMessage('abor', id()))->post();
204    }
205
206    void seek(int64_t timeUs) {
207        sp<AMessage> msg = new AMessage('seek', id());
208        msg->setInt64("time", timeUs);
209        mPauseGeneration++;
210        msg->post();
211    }
212
213    bool isSeekable() const {
214        return mSeekable;
215    }
216
217    void pause() {
218        sp<AMessage> msg = new AMessage('paus', id());
219        mPauseGeneration++;
220        msg->setInt32("pausecheck", mPauseGeneration);
221        msg->post(kPauseDelayUs);
222    }
223
224    void resume() {
225        sp<AMessage> msg = new AMessage('resu', id());
226        mPauseGeneration++;
227        msg->post();
228    }
229
230    static void addRR(const sp<ABuffer> &buf) {
231        uint8_t *ptr = buf->data() + buf->size();
232        ptr[0] = 0x80 | 0;
233        ptr[1] = 201;  // RR
234        ptr[2] = 0;
235        ptr[3] = 1;
236        ptr[4] = 0xde;  // SSRC
237        ptr[5] = 0xad;
238        ptr[6] = 0xbe;
239        ptr[7] = 0xef;
240
241        buf->setRange(0, buf->size() + 8);
242    }
243
244    static void addSDES(int s, const sp<ABuffer> &buffer) {
245        struct sockaddr_in addr;
246        socklen_t addrSize = sizeof(addr);
247        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
248
249        uint8_t *data = buffer->data() + buffer->size();
250        data[0] = 0x80 | 1;
251        data[1] = 202;  // SDES
252        data[4] = 0xde;  // SSRC
253        data[5] = 0xad;
254        data[6] = 0xbe;
255        data[7] = 0xef;
256
257        size_t offset = 8;
258
259        data[offset++] = 1;  // CNAME
260
261        AString cname = "stagefright@";
262        cname.append(inet_ntoa(addr.sin_addr));
263        data[offset++] = cname.size();
264
265        memcpy(&data[offset], cname.c_str(), cname.size());
266        offset += cname.size();
267
268        data[offset++] = 6;  // TOOL
269
270        AString tool = MakeUserAgent();
271
272        data[offset++] = tool.size();
273
274        memcpy(&data[offset], tool.c_str(), tool.size());
275        offset += tool.size();
276
277        data[offset++] = 0;
278
279        if ((offset % 4) > 0) {
280            size_t count = 4 - (offset % 4);
281            switch (count) {
282                case 3:
283                    data[offset++] = 0;
284                case 2:
285                    data[offset++] = 0;
286                case 1:
287                    data[offset++] = 0;
288            }
289        }
290
291        size_t numWords = (offset / 4) - 1;
292        data[2] = numWords >> 8;
293        data[3] = numWords & 0xff;
294
295        buffer->setRange(buffer->offset(), buffer->size() + offset);
296    }
297
298    // In case we're behind NAT, fire off two UDP packets to the remote
299    // rtp/rtcp ports to poke a hole into the firewall for future incoming
300    // packets. We're going to send an RR/SDES RTCP packet to both of them.
301    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
302        struct sockaddr_in addr;
303        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
304        addr.sin_family = AF_INET;
305
306        AString source;
307        AString server_port;
308        if (!GetAttribute(transport.c_str(),
309                          "source",
310                          &source)) {
311            ALOGW("Missing 'source' field in Transport response. Using "
312                 "RTSP endpoint address.");
313
314            struct hostent *ent = gethostbyname(mSessionHost.c_str());
315            if (ent == NULL) {
316                ALOGE("Failed to look up address of session host '%s'",
317                     mSessionHost.c_str());
318
319                return false;
320            }
321
322            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
323        } else {
324            addr.sin_addr.s_addr = inet_addr(source.c_str());
325        }
326
327        if (!GetAttribute(transport.c_str(),
328                                 "server_port",
329                                 &server_port)) {
330            ALOGI("Missing 'server_port' field in Transport response.");
331            return false;
332        }
333
334        int rtpPort, rtcpPort;
335        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
336                || rtpPort <= 0 || rtpPort > 65535
337                || rtcpPort <=0 || rtcpPort > 65535
338                || rtcpPort != rtpPort + 1) {
339            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
340                 " RTP port must be even, RTCP port must be one higher.",
341                 server_port.c_str());
342
343            return false;
344        }
345
346        if (rtpPort & 1) {
347            ALOGW("Server picked an odd RTP port, it should've picked an "
348                 "even one, we'll let it pass for now, but this may break "
349                 "in the future.");
350        }
351
352        if (addr.sin_addr.s_addr == INADDR_NONE) {
353            return true;
354        }
355
356        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
357            // No firewalls to traverse on the loopback interface.
358            return true;
359        }
360
361        // Make up an RR/SDES RTCP packet.
362        sp<ABuffer> buf = new ABuffer(65536);
363        buf->setRange(0, 0);
364        addRR(buf);
365        addSDES(rtpSocket, buf);
366
367        addr.sin_port = htons(rtpPort);
368
369        ssize_t n = sendto(
370                rtpSocket, buf->data(), buf->size(), 0,
371                (const sockaddr *)&addr, sizeof(addr));
372
373        if (n < (ssize_t)buf->size()) {
374            ALOGE("failed to poke a hole for RTP packets");
375            return false;
376        }
377
378        addr.sin_port = htons(rtcpPort);
379
380        n = sendto(
381                rtcpSocket, buf->data(), buf->size(), 0,
382                (const sockaddr *)&addr, sizeof(addr));
383
384        if (n < (ssize_t)buf->size()) {
385            ALOGE("failed to poke a hole for RTCP packets");
386            return false;
387        }
388
389        ALOGV("successfully poked holes.");
390
391        return true;
392    }
393
394    static bool isLiveStream(const sp<ASessionDescription> &desc) {
395        AString attrLiveStream;
396        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
397            ssize_t semicolonPos = attrLiveStream.find(";", 2);
398
399            const char* liveStreamValue;
400            if (semicolonPos < 0) {
401                liveStreamValue = attrLiveStream.c_str();
402            } else {
403                AString valString;
404                valString.setTo(attrLiveStream,
405                        semicolonPos + 1,
406                        attrLiveStream.size() - semicolonPos - 1);
407                liveStreamValue = valString.c_str();
408            }
409
410            uint32_t value = strtoul(liveStreamValue, NULL, 10);
411            if (value == 1) {
412                ALOGV("found live stream");
413                return true;
414            }
415        } else {
416            // It is a live stream if no duration is returned
417            int64_t durationUs;
418            if (!desc->getDurationUs(&durationUs)) {
419                ALOGV("No duration found, assume live stream");
420                return true;
421            }
422        }
423
424        return false;
425    }
426
427    virtual void onMessageReceived(const sp<AMessage> &msg) {
428        switch (msg->what()) {
429            case 'conn':
430            {
431                int32_t result;
432                CHECK(msg->findInt32("result", &result));
433
434                ALOGI("connection request completed with result %d (%s)",
435                     result, strerror(-result));
436
437                if (result == OK) {
438                    AString request;
439                    request = "DESCRIBE ";
440                    request.append(mSessionURL);
441                    request.append(" RTSP/1.0\r\n");
442                    request.append("Accept: application/sdp\r\n");
443                    request.append("\r\n");
444
445                    sp<AMessage> reply = new AMessage('desc', id());
446                    mConn->sendRequest(request.c_str(), reply);
447                } else {
448                    (new AMessage('disc', id()))->post();
449                }
450                break;
451            }
452
453            case 'disc':
454            {
455                ++mKeepAliveGeneration;
456
457                int32_t reconnect;
458                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
459                    sp<AMessage> reply = new AMessage('conn', id());
460                    mConn->connect(mOriginalSessionURL.c_str(), reply);
461                } else {
462                    (new AMessage('quit', id()))->post();
463                }
464                break;
465            }
466
467            case 'desc':
468            {
469                int32_t result;
470                CHECK(msg->findInt32("result", &result));
471
472                ALOGI("DESCRIBE completed with result %d (%s)",
473                     result, strerror(-result));
474
475                if (result == OK) {
476                    sp<RefBase> obj;
477                    CHECK(msg->findObject("response", &obj));
478                    sp<ARTSPResponse> response =
479                        static_cast<ARTSPResponse *>(obj.get());
480
481                    if (response->mStatusCode == 302) {
482                        ssize_t i = response->mHeaders.indexOfKey("location");
483                        CHECK_GE(i, 0);
484
485                        mSessionURL = response->mHeaders.valueAt(i);
486
487                        AString request;
488                        request = "DESCRIBE ";
489                        request.append(mSessionURL);
490                        request.append(" RTSP/1.0\r\n");
491                        request.append("Accept: application/sdp\r\n");
492                        request.append("\r\n");
493
494                        sp<AMessage> reply = new AMessage('desc', id());
495                        mConn->sendRequest(request.c_str(), reply);
496                        break;
497                    }
498
499                    if (response->mStatusCode != 200) {
500                        result = UNKNOWN_ERROR;
501                    } else if (response->mContent == NULL) {
502                        result = ERROR_MALFORMED;
503                        ALOGE("The response has no content.");
504                    } else {
505                        mSessionDesc = new ASessionDescription;
506
507                        mSessionDesc->setTo(
508                                response->mContent->data(),
509                                response->mContent->size());
510
511                        if (!mSessionDesc->isValid()) {
512                            ALOGE("Failed to parse session description.");
513                            result = ERROR_MALFORMED;
514                        } else {
515                            ssize_t i = response->mHeaders.indexOfKey("content-base");
516                            if (i >= 0) {
517                                mBaseURL = response->mHeaders.valueAt(i);
518                            } else {
519                                i = response->mHeaders.indexOfKey("content-location");
520                                if (i >= 0) {
521                                    mBaseURL = response->mHeaders.valueAt(i);
522                                } else {
523                                    mBaseURL = mSessionURL;
524                                }
525                            }
526
527                            mSeekable = !isLiveStream(mSessionDesc);
528
529                            if (!mBaseURL.startsWith("rtsp://")) {
530                                // Some misbehaving servers specify a relative
531                                // URL in one of the locations above, combine
532                                // it with the absolute session URL to get
533                                // something usable...
534
535                                ALOGW("Server specified a non-absolute base URL"
536                                     ", combining it with the session URL to "
537                                     "get something usable...");
538
539                                AString tmp;
540                                CHECK(MakeURL(
541                                            mSessionURL.c_str(),
542                                            mBaseURL.c_str(),
543                                            &tmp));
544
545                                mBaseURL = tmp;
546                            }
547
548                            mControlURL = getControlURL(mSessionDesc);
549
550                            if (mSessionDesc->countTracks() < 2) {
551                                // There's no actual tracks in this session.
552                                // The first "track" is merely session meta
553                                // data.
554
555                                ALOGW("Session doesn't contain any playable "
556                                     "tracks. Aborting.");
557                                result = ERROR_UNSUPPORTED;
558                            } else {
559                                setupTrack(1);
560                            }
561                        }
562                    }
563                }
564
565                if (result != OK) {
566                    sp<AMessage> reply = new AMessage('disc', id());
567                    mConn->disconnect(reply);
568                }
569                break;
570            }
571
572            case 'sdpl':
573            {
574                int32_t result;
575                CHECK(msg->findInt32("result", &result));
576
577                ALOGI("SDP connection request completed with result %d (%s)",
578                     result, strerror(-result));
579
580                if (result == OK) {
581                    sp<RefBase> obj;
582                    CHECK(msg->findObject("description", &obj));
583                    mSessionDesc =
584                        static_cast<ASessionDescription *>(obj.get());
585
586                    if (!mSessionDesc->isValid()) {
587                        ALOGE("Failed to parse session description.");
588                        result = ERROR_MALFORMED;
589                    } else {
590                        mBaseURL = mSessionURL;
591
592                        mSeekable = !isLiveStream(mSessionDesc);
593
594                        mControlURL = getControlURL(mSessionDesc);
595
596                        if (mSessionDesc->countTracks() < 2) {
597                            // There's no actual tracks in this session.
598                            // The first "track" is merely session meta
599                            // data.
600
601                            ALOGW("Session doesn't contain any playable "
602                                 "tracks. Aborting.");
603                            result = ERROR_UNSUPPORTED;
604                        } else {
605                            setupTrack(1);
606                        }
607                    }
608                }
609
610                if (result != OK) {
611                    sp<AMessage> reply = new AMessage('disc', id());
612                    mConn->disconnect(reply);
613                }
614                break;
615            }
616
617            case 'setu':
618            {
619                size_t index;
620                CHECK(msg->findSize("index", &index));
621
622                TrackInfo *track = NULL;
623                size_t trackIndex;
624                if (msg->findSize("track-index", &trackIndex)) {
625                    track = &mTracks.editItemAt(trackIndex);
626                }
627
628                int32_t result;
629                CHECK(msg->findInt32("result", &result));
630
631                ALOGI("SETUP(%d) completed with result %d (%s)",
632                     index, result, strerror(-result));
633
634                if (result == OK) {
635                    CHECK(track != NULL);
636
637                    sp<RefBase> obj;
638                    CHECK(msg->findObject("response", &obj));
639                    sp<ARTSPResponse> response =
640                        static_cast<ARTSPResponse *>(obj.get());
641
642                    if (response->mStatusCode != 200) {
643                        result = UNKNOWN_ERROR;
644                    } else {
645                        ssize_t i = response->mHeaders.indexOfKey("session");
646                        CHECK_GE(i, 0);
647
648                        mSessionID = response->mHeaders.valueAt(i);
649
650                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
651                        AString timeoutStr;
652                        if (GetAttribute(
653                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
654                            char *end;
655                            unsigned long timeoutSecs =
656                                strtoul(timeoutStr.c_str(), &end, 10);
657
658                            if (end == timeoutStr.c_str() || *end != '\0') {
659                                ALOGW("server specified malformed timeout '%s'",
660                                     timeoutStr.c_str());
661
662                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
663                            } else if (timeoutSecs < 15) {
664                                ALOGW("server specified too short a timeout "
665                                     "(%lu secs), using default.",
666                                     timeoutSecs);
667
668                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
669                            } else {
670                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
671
672                                ALOGI("server specified timeout of %lu secs.",
673                                     timeoutSecs);
674                            }
675                        }
676
677                        i = mSessionID.find(";");
678                        if (i >= 0) {
679                            // Remove options, i.e. ";timeout=90"
680                            mSessionID.erase(i, mSessionID.size() - i);
681                        }
682
683                        sp<AMessage> notify = new AMessage('accu', id());
684                        notify->setSize("track-index", trackIndex);
685
686                        i = response->mHeaders.indexOfKey("transport");
687                        CHECK_GE(i, 0);
688
689                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
690                            if (!track->mUsingInterleavedTCP) {
691                                AString transport = response->mHeaders.valueAt(i);
692
693                                // We are going to continue even if we were
694                                // unable to poke a hole into the firewall...
695                                pokeAHole(
696                                        track->mRTPSocket,
697                                        track->mRTCPSocket,
698                                        transport);
699                            }
700
701                            mRTPConn->addStream(
702                                    track->mRTPSocket, track->mRTCPSocket,
703                                    mSessionDesc, index,
704                                    notify, track->mUsingInterleavedTCP);
705
706                            mSetupTracksSuccessful = true;
707                        } else {
708                            result = BAD_VALUE;
709                        }
710                    }
711                }
712
713                if (result != OK) {
714                    if (track) {
715                        if (!track->mUsingInterleavedTCP) {
716                            // Clear the tag
717                            if (mUIDValid) {
718                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
719                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
720                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
721                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
722                            }
723
724                            close(track->mRTPSocket);
725                            close(track->mRTCPSocket);
726                        }
727
728                        mTracks.removeItemsAt(trackIndex);
729                    }
730                }
731
732                ++index;
733                if (result == OK && index < mSessionDesc->countTracks()) {
734                    setupTrack(index);
735                } else if (mSetupTracksSuccessful) {
736                    ++mKeepAliveGeneration;
737                    postKeepAlive();
738
739                    AString request = "PLAY ";
740                    request.append(mControlURL);
741                    request.append(" RTSP/1.0\r\n");
742
743                    request.append("Session: ");
744                    request.append(mSessionID);
745                    request.append("\r\n");
746
747                    request.append("\r\n");
748
749                    sp<AMessage> reply = new AMessage('play', id());
750                    mConn->sendRequest(request.c_str(), reply);
751                } else {
752                    sp<AMessage> reply = new AMessage('disc', id());
753                    mConn->disconnect(reply);
754                }
755                break;
756            }
757
758            case 'play':
759            {
760                int32_t result;
761                CHECK(msg->findInt32("result", &result));
762
763                ALOGI("PLAY completed with result %d (%s)",
764                     result, strerror(-result));
765
766                if (result == OK) {
767                    sp<RefBase> obj;
768                    CHECK(msg->findObject("response", &obj));
769                    sp<ARTSPResponse> response =
770                        static_cast<ARTSPResponse *>(obj.get());
771
772                    if (response->mStatusCode != 200) {
773                        result = UNKNOWN_ERROR;
774                    } else {
775                        parsePlayResponse(response);
776
777                        sp<AMessage> timeout = new AMessage('tiou', id());
778                        mCheckTimeoutGeneration++;
779                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
780                        timeout->post(kStartupTimeoutUs);
781                    }
782                }
783
784                if (result != OK) {
785                    sp<AMessage> reply = new AMessage('disc', id());
786                    mConn->disconnect(reply);
787                }
788
789                break;
790            }
791
792            case 'aliv':
793            {
794                int32_t generation;
795                CHECK(msg->findInt32("generation", &generation));
796
797                if (generation != mKeepAliveGeneration) {
798                    // obsolete event.
799                    break;
800                }
801
802                AString request;
803                request.append("OPTIONS ");
804                request.append(mSessionURL);
805                request.append(" RTSP/1.0\r\n");
806                request.append("Session: ");
807                request.append(mSessionID);
808                request.append("\r\n");
809                request.append("\r\n");
810
811                sp<AMessage> reply = new AMessage('opts', id());
812                reply->setInt32("generation", mKeepAliveGeneration);
813                mConn->sendRequest(request.c_str(), reply);
814                break;
815            }
816
817            case 'opts':
818            {
819                int32_t result;
820                CHECK(msg->findInt32("result", &result));
821
822                ALOGI("OPTIONS completed with result %d (%s)",
823                     result, strerror(-result));
824
825                int32_t generation;
826                CHECK(msg->findInt32("generation", &generation));
827
828                if (generation != mKeepAliveGeneration) {
829                    // obsolete event.
830                    break;
831                }
832
833                postKeepAlive();
834                break;
835            }
836
837            case 'abor':
838            {
839                for (size_t i = 0; i < mTracks.size(); ++i) {
840                    TrackInfo *info = &mTracks.editItemAt(i);
841
842                    if (!mFirstAccessUnit) {
843                        postQueueEOS(i, ERROR_END_OF_STREAM);
844                    }
845
846                    if (!info->mUsingInterleavedTCP) {
847                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
848
849                        // Clear the tag
850                        if (mUIDValid) {
851                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
852                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
853                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
854                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
855                        }
856
857                        close(info->mRTPSocket);
858                        close(info->mRTCPSocket);
859                    }
860                }
861                mTracks.clear();
862                mSetupTracksSuccessful = false;
863                mSeekPending = false;
864                mFirstAccessUnit = true;
865                mAllTracksHaveTime = false;
866                mNTPAnchorUs = -1;
867                mMediaAnchorUs = -1;
868                mNumAccessUnitsReceived = 0;
869                mReceivedFirstRTCPPacket = false;
870                mReceivedFirstRTPPacket = false;
871                mPausing = false;
872                mSeekable = true;
873
874                sp<AMessage> reply = new AMessage('tear', id());
875
876                int32_t reconnect;
877                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
878                    reply->setInt32("reconnect", true);
879                }
880
881                AString request;
882                request = "TEARDOWN ";
883
884                // XXX should use aggregate url from SDP here...
885                request.append(mSessionURL);
886                request.append(" RTSP/1.0\r\n");
887
888                request.append("Session: ");
889                request.append(mSessionID);
890                request.append("\r\n");
891
892                request.append("\r\n");
893
894                mConn->sendRequest(request.c_str(), reply);
895                break;
896            }
897
898            case 'tear':
899            {
900                int32_t result;
901                CHECK(msg->findInt32("result", &result));
902
903                ALOGI("TEARDOWN completed with result %d (%s)",
904                     result, strerror(-result));
905
906                sp<AMessage> reply = new AMessage('disc', id());
907
908                int32_t reconnect;
909                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
910                    reply->setInt32("reconnect", true);
911                }
912
913                mConn->disconnect(reply);
914                break;
915            }
916
917            case 'quit':
918            {
919                sp<AMessage> msg = mNotify->dup();
920                msg->setInt32("what", kWhatDisconnected);
921                msg->setInt32("result", UNKNOWN_ERROR);
922                msg->post();
923                break;
924            }
925
926            case 'chek':
927            {
928                int32_t generation;
929                CHECK(msg->findInt32("generation", &generation));
930                if (generation != mCheckGeneration) {
931                    // This is an outdated message. Ignore.
932                    break;
933                }
934
935                if (mNumAccessUnitsReceived == 0) {
936#if 1
937                    ALOGI("stream ended? aborting.");
938                    (new AMessage('abor', id()))->post();
939                    break;
940#else
941                    ALOGI("haven't seen an AU in a looong time.");
942#endif
943                }
944
945                mNumAccessUnitsReceived = 0;
946                msg->post(kAccessUnitTimeoutUs);
947                break;
948            }
949
950            case 'accu':
951            {
952                int32_t timeUpdate;
953                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
954                    size_t trackIndex;
955                    CHECK(msg->findSize("track-index", &trackIndex));
956
957                    uint32_t rtpTime;
958                    uint64_t ntpTime;
959                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
960                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
961
962                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
963                    break;
964                }
965
966                int32_t first;
967                if (msg->findInt32("first-rtcp", &first)) {
968                    mReceivedFirstRTCPPacket = true;
969                    break;
970                }
971
972                if (msg->findInt32("first-rtp", &first)) {
973                    mReceivedFirstRTPPacket = true;
974                    break;
975                }
976
977                ++mNumAccessUnitsReceived;
978                postAccessUnitTimeoutCheck();
979
980                size_t trackIndex;
981                CHECK(msg->findSize("track-index", &trackIndex));
982
983                if (trackIndex >= mTracks.size()) {
984                    ALOGV("late packets ignored.");
985                    break;
986                }
987
988                TrackInfo *track = &mTracks.editItemAt(trackIndex);
989
990                int32_t eos;
991                if (msg->findInt32("eos", &eos)) {
992                    ALOGI("received BYE on track index %d", trackIndex);
993                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
994                        ALOGI("No time established => fake existing data");
995
996                        track->mEOSReceived = true;
997                        mTryFakeRTCP = true;
998                        mReceivedFirstRTCPPacket = true;
999                        fakeTimestamps();
1000                    } else {
1001                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1002                    }
1003                    return;
1004                }
1005
1006                sp<ABuffer> accessUnit;
1007                CHECK(msg->findBuffer("access-unit", &accessUnit));
1008
1009                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1010
1011                if (mSeekPending) {
1012                    ALOGV("we're seeking, dropping stale packet.");
1013                    break;
1014                }
1015
1016                if (seqNum < track->mFirstSeqNumInSegment) {
1017                    ALOGV("dropping stale access-unit (%d < %d)",
1018                         seqNum, track->mFirstSeqNumInSegment);
1019                    break;
1020                }
1021
1022                if (track->mNewSegment) {
1023                    track->mNewSegment = false;
1024                }
1025
1026                onAccessUnitComplete(trackIndex, accessUnit);
1027                break;
1028            }
1029
1030            case 'paus':
1031            {
1032                int32_t generation;
1033                CHECK(msg->findInt32("pausecheck", &generation));
1034                if (generation != mPauseGeneration) {
1035                    ALOGV("Ignoring outdated pause message.");
1036                    break;
1037                }
1038
1039                if (!mSeekable) {
1040                    ALOGW("This is a live stream, ignoring pause request.");
1041                    break;
1042                }
1043                mCheckPending = true;
1044                ++mCheckGeneration;
1045                mPausing = true;
1046
1047                AString request = "PAUSE ";
1048                request.append(mControlURL);
1049                request.append(" RTSP/1.0\r\n");
1050
1051                request.append("Session: ");
1052                request.append(mSessionID);
1053                request.append("\r\n");
1054
1055                request.append("\r\n");
1056
1057                sp<AMessage> reply = new AMessage('pau2', id());
1058                mConn->sendRequest(request.c_str(), reply);
1059                break;
1060            }
1061
1062            case 'pau2':
1063            {
1064                int32_t result;
1065                CHECK(msg->findInt32("result", &result));
1066                mCheckTimeoutGeneration++;
1067
1068                ALOGI("PAUSE completed with result %d (%s)",
1069                     result, strerror(-result));
1070                break;
1071            }
1072
1073            case 'resu':
1074            {
1075                if (mPausing && mSeekPending) {
1076                    // If seeking, Play will be sent from see1 instead
1077                    break;
1078                }
1079
1080                if (!mPausing) {
1081                    // Dont send PLAY if we have not paused
1082                    break;
1083                }
1084                AString request = "PLAY ";
1085                request.append(mControlURL);
1086                request.append(" RTSP/1.0\r\n");
1087
1088                request.append("Session: ");
1089                request.append(mSessionID);
1090                request.append("\r\n");
1091
1092                request.append("\r\n");
1093
1094                sp<AMessage> reply = new AMessage('res2', id());
1095                mConn->sendRequest(request.c_str(), reply);
1096                break;
1097            }
1098
1099            case 'res2':
1100            {
1101                int32_t result;
1102                CHECK(msg->findInt32("result", &result));
1103
1104                ALOGI("PLAY completed with result %d (%s)",
1105                     result, strerror(-result));
1106
1107                mCheckPending = false;
1108                postAccessUnitTimeoutCheck();
1109
1110                if (result == OK) {
1111                    sp<RefBase> obj;
1112                    CHECK(msg->findObject("response", &obj));
1113                    sp<ARTSPResponse> response =
1114                        static_cast<ARTSPResponse *>(obj.get());
1115
1116                    if (response->mStatusCode != 200) {
1117                        result = UNKNOWN_ERROR;
1118                    } else {
1119                        parsePlayResponse(response);
1120
1121                        // Post new timeout in order to make sure to use
1122                        // fake timestamps if no new Sender Reports arrive
1123                        sp<AMessage> timeout = new AMessage('tiou', id());
1124                        mCheckTimeoutGeneration++;
1125                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1126                        timeout->post(kStartupTimeoutUs);
1127                    }
1128                }
1129
1130                if (result != OK) {
1131                    ALOGE("resume failed, aborting.");
1132                    (new AMessage('abor', id()))->post();
1133                }
1134
1135                mPausing = false;
1136                break;
1137            }
1138
1139            case 'seek':
1140            {
1141                if (!mSeekable) {
1142                    ALOGW("This is a live stream, ignoring seek request.");
1143
1144                    sp<AMessage> msg = mNotify->dup();
1145                    msg->setInt32("what", kWhatSeekDone);
1146                    msg->post();
1147                    break;
1148                }
1149
1150                int64_t timeUs;
1151                CHECK(msg->findInt64("time", &timeUs));
1152
1153                mSeekPending = true;
1154
1155                // Disable the access unit timeout until we resumed
1156                // playback again.
1157                mCheckPending = true;
1158                ++mCheckGeneration;
1159
1160                sp<AMessage> reply = new AMessage('see1', id());
1161                reply->setInt64("time", timeUs);
1162
1163                if (mPausing) {
1164                    // PAUSE already sent
1165                    ALOGI("Pause already sent");
1166                    reply->post();
1167                    break;
1168                }
1169                AString request = "PAUSE ";
1170                request.append(mControlURL);
1171                request.append(" RTSP/1.0\r\n");
1172
1173                request.append("Session: ");
1174                request.append(mSessionID);
1175                request.append("\r\n");
1176
1177                request.append("\r\n");
1178
1179                mConn->sendRequest(request.c_str(), reply);
1180                break;
1181            }
1182
1183            case 'see1':
1184            {
1185                // Session is paused now.
1186                for (size_t i = 0; i < mTracks.size(); ++i) {
1187                    TrackInfo *info = &mTracks.editItemAt(i);
1188
1189                    postQueueSeekDiscontinuity(i);
1190                    info->mEOSReceived = false;
1191
1192                    info->mRTPAnchor = 0;
1193                    info->mNTPAnchorUs = -1;
1194                }
1195
1196                mAllTracksHaveTime = false;
1197                mNTPAnchorUs = -1;
1198
1199                // Start new timeoutgeneration to avoid getting timeout
1200                // before PLAY response arrive
1201                sp<AMessage> timeout = new AMessage('tiou', id());
1202                mCheckTimeoutGeneration++;
1203                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1204                timeout->post(kStartupTimeoutUs);
1205
1206                int64_t timeUs;
1207                CHECK(msg->findInt64("time", &timeUs));
1208
1209                AString request = "PLAY ";
1210                request.append(mControlURL);
1211                request.append(" RTSP/1.0\r\n");
1212
1213                request.append("Session: ");
1214                request.append(mSessionID);
1215                request.append("\r\n");
1216
1217                request.append(
1218                        StringPrintf(
1219                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1220
1221                request.append("\r\n");
1222
1223                sp<AMessage> reply = new AMessage('see2', id());
1224                mConn->sendRequest(request.c_str(), reply);
1225                break;
1226            }
1227
1228            case 'see2':
1229            {
1230                if (mTracks.size() == 0) {
1231                    // We have already hit abor, break
1232                    break;
1233                }
1234
1235                int32_t result;
1236                CHECK(msg->findInt32("result", &result));
1237
1238                ALOGI("PLAY completed with result %d (%s)",
1239                     result, strerror(-result));
1240
1241                mCheckPending = false;
1242                postAccessUnitTimeoutCheck();
1243
1244                if (result == OK) {
1245                    sp<RefBase> obj;
1246                    CHECK(msg->findObject("response", &obj));
1247                    sp<ARTSPResponse> response =
1248                        static_cast<ARTSPResponse *>(obj.get());
1249
1250                    if (response->mStatusCode != 200) {
1251                        result = UNKNOWN_ERROR;
1252                    } else {
1253                        parsePlayResponse(response);
1254
1255                        // Post new timeout in order to make sure to use
1256                        // fake timestamps if no new Sender Reports arrive
1257                        sp<AMessage> timeout = new AMessage('tiou', id());
1258                        mCheckTimeoutGeneration++;
1259                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1260                        timeout->post(kStartupTimeoutUs);
1261
1262                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1263                        CHECK_GE(i, 0);
1264
1265                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1266
1267                        ALOGI("seek completed.");
1268                    }
1269                }
1270
1271                if (result != OK) {
1272                    ALOGE("seek failed, aborting.");
1273                    (new AMessage('abor', id()))->post();
1274                }
1275
1276                mPausing = false;
1277                mSeekPending = false;
1278
1279                sp<AMessage> msg = mNotify->dup();
1280                msg->setInt32("what", kWhatSeekDone);
1281                msg->post();
1282                break;
1283            }
1284
1285            case 'biny':
1286            {
1287                sp<ABuffer> buffer;
1288                CHECK(msg->findBuffer("buffer", &buffer));
1289
1290                int32_t index;
1291                CHECK(buffer->meta()->findInt32("index", &index));
1292
1293                mRTPConn->injectPacket(index, buffer);
1294                break;
1295            }
1296
1297            case 'tiou':
1298            {
1299                int32_t timeoutGenerationCheck;
1300                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1301                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1302                    // This is an outdated message. Ignore.
1303                    // This typically happens if a lot of seeks are
1304                    // performed, since new timeout messages now are
1305                    // posted at seek as well.
1306                    break;
1307                }
1308                if (!mReceivedFirstRTCPPacket) {
1309                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1310                        ALOGW("We received RTP packets but no RTCP packets, "
1311                             "using fake timestamps.");
1312
1313                        mTryFakeRTCP = true;
1314
1315                        mReceivedFirstRTCPPacket = true;
1316
1317                        fakeTimestamps();
1318                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1319                        ALOGW("Never received any data, switching transports.");
1320
1321                        mTryTCPInterleaving = true;
1322
1323                        sp<AMessage> msg = new AMessage('abor', id());
1324                        msg->setInt32("reconnect", true);
1325                        msg->post();
1326                    } else {
1327                        ALOGW("Never received any data, disconnecting.");
1328                        (new AMessage('abor', id()))->post();
1329                    }
1330                } else {
1331                    if (!mAllTracksHaveTime) {
1332                        ALOGW("We received some RTCP packets, but time "
1333                              "could not be established on all tracks, now "
1334                              "using fake timestamps");
1335
1336                        fakeTimestamps();
1337                    }
1338                }
1339                break;
1340            }
1341
1342            default:
1343                TRESPASS();
1344                break;
1345        }
1346    }
1347
1348    void postKeepAlive() {
1349        sp<AMessage> msg = new AMessage('aliv', id());
1350        msg->setInt32("generation", mKeepAliveGeneration);
1351        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1352    }
1353
1354    void postAccessUnitTimeoutCheck() {
1355        if (mCheckPending) {
1356            return;
1357        }
1358
1359        mCheckPending = true;
1360        sp<AMessage> check = new AMessage('chek', id());
1361        check->setInt32("generation", mCheckGeneration);
1362        check->post(kAccessUnitTimeoutUs);
1363    }
1364
1365    static void SplitString(
1366            const AString &s, const char *separator, List<AString> *items) {
1367        items->clear();
1368        size_t start = 0;
1369        while (start < s.size()) {
1370            ssize_t offset = s.find(separator, start);
1371
1372            if (offset < 0) {
1373                items->push_back(AString(s, start, s.size() - start));
1374                break;
1375            }
1376
1377            items->push_back(AString(s, start, offset - start));
1378            start = offset + strlen(separator);
1379        }
1380    }
1381
1382    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1383        mPlayResponseParsed = true;
1384        if (mTracks.size() == 0) {
1385            ALOGV("parsePlayResponse: late packets ignored.");
1386            return;
1387        }
1388
1389        ssize_t i = response->mHeaders.indexOfKey("range");
1390        if (i < 0) {
1391            // Server doesn't even tell use what range it is going to
1392            // play, therefore we won't support seeking.
1393            return;
1394        }
1395
1396        AString range = response->mHeaders.valueAt(i);
1397        ALOGV("Range: %s", range.c_str());
1398
1399        AString val;
1400        CHECK(GetAttribute(range.c_str(), "npt", &val));
1401
1402        float npt1, npt2;
1403        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1404            // This is a live stream and therefore not seekable.
1405
1406            ALOGI("This is a live stream");
1407            return;
1408        }
1409
1410        i = response->mHeaders.indexOfKey("rtp-info");
1411        CHECK_GE(i, 0);
1412
1413        AString rtpInfo = response->mHeaders.valueAt(i);
1414        List<AString> streamInfos;
1415        SplitString(rtpInfo, ",", &streamInfos);
1416
1417        int n = 1;
1418        for (List<AString>::iterator it = streamInfos.begin();
1419             it != streamInfos.end(); ++it) {
1420            (*it).trim();
1421            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1422
1423            CHECK(GetAttribute((*it).c_str(), "url", &val));
1424
1425            size_t trackIndex = 0;
1426            while (trackIndex < mTracks.size()
1427                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1428                ++trackIndex;
1429            }
1430            CHECK_LT(trackIndex, mTracks.size());
1431
1432            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1433
1434            char *end;
1435            unsigned long seq = strtoul(val.c_str(), &end, 10);
1436
1437            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1438            info->mFirstSeqNumInSegment = seq;
1439            info->mNewSegment = true;
1440
1441            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1442
1443            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1444
1445            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1446
1447            info->mNormalPlayTimeRTP = rtpTime;
1448            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1449
1450            if (!mFirstAccessUnit) {
1451                postNormalPlayTimeMapping(
1452                        trackIndex,
1453                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1454            }
1455
1456            ++n;
1457        }
1458    }
1459
1460    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1461        CHECK_GE(index, 0u);
1462        CHECK_LT(index, mTracks.size());
1463
1464        const TrackInfo &info = mTracks.itemAt(index);
1465
1466        *timeScale = info.mTimeScale;
1467
1468        return info.mPacketSource->getFormat();
1469    }
1470
1471    size_t countTracks() const {
1472        return mTracks.size();
1473    }
1474
1475private:
    // Per-track state; one entry is pushed onto mTracks by setupTrack().
    struct TrackInfo {
        // Absolute track URL used in the SETUP request and matched against
        // the "url" attribute of RTP-Info entries.
        AString mURL;
        // With UDP transport these hold the sockets created by
        // ARTPConnection::MakePortPair(); with interleaved TCP they hold
        // the interleave channel indices instead (see setupTrack()).
        int mRTPSocket;
        int mRTCPSocket;
        bool mUsingInterleavedTCP;
        // Taken from the "seq" attribute of RTP-Info; access units with a
        // smaller sequence number are dropped as stale.
        uint32_t mFirstSeqNumInSegment;
        // Set when a play response has been parsed for this track and
        // cleared when the next access unit is accepted.
        bool mNewSegment;

        // Values from the most recent onTimeUpdate() for this track.
        // mNTPAnchorUs < 0 means no time has been established yet.
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // Time scale obtained from ASessionDescription::ParseFormatDesc().
        int32_t mTimeScale;
        // Set when a BYE arrives before time is established; the EOS is
        // then queued from onTimeUpdate() once buffered packets are flushed.
        bool mEOSReceived;

        // RTP time ("rtptime" of RTP-Info) and the corresponding normal
        // play time in microseconds (start of the npt range).
        uint32_t mNormalPlayTimeRTP;
        int64_t mNormalPlayTimeUs;

        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet.
        List<sp<ABuffer> > mPackets;
    };
1498
    // Template notification; handlers dup() it, set "what" and post it.
    sp<AMessage> mNotify;
    // When mUIDValid is set, RTP/RTCP sockets are tagged and marked with mUID.
    bool mUIDValid;
    uid_t mUID;
    sp<ALooper> mNetLooper;
    // RTSP requests are sent through mConn.
    sp<ARTSPConnection> mConn;
    sp<ARTPConnection> mRTPConn;
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // URL used in the OPTIONS and TEARDOWN request lines.
    AString mSessionURL;
    AString mSessionHost;
    // Base against which per-track control URLs are resolved in setupTrack().
    AString mBaseURL;
    // URL used in the PAUSE and PLAY request lines.
    AString mControlURL;
    // Value sent in the "Session:" header of requests.
    AString mSessionID;
    bool mSetupTracksSuccessful;
    // True from 'seek' until 'see2'; access units are dropped meanwhile.
    bool mSeekPending;
    // True until handleFirstAccessUnit() has run.
    bool mFirstAccessUnit;

    // Set by onTimeUpdate() once every track has an NTP anchor.
    bool mAllTracksHaveTime;
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    int64_t mLastMediaTimeUs;

    // Access units counted since the last 'chek'; reset by the 'chek' handler.
    int64_t mNumAccessUnitsReceived;
    // True while an access-unit timeout check is posted or deliberately
    // suppressed (during pause/seek).
    bool mCheckPending;
    // 'chek' messages carrying a different generation are ignored.
    int32_t mCheckGeneration;
    // 'tiou' messages carrying a different "tioucheck" value are ignored.
    int32_t mCheckTimeoutGeneration;
    // When set, setupTrack() requests RTP/AVP/TCP interleaved transport.
    bool mTryTCPInterleaving;
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // Pause and seek requests are ignored while this is false.
    bool mSeekable;
    // postKeepAlive() schedules the next keep-alive at 90% of this value.
    int64_t mKeepAliveTimeoutUs;
    // 'aliv'/'opts' messages carrying a different generation are ignored.
    int32_t mKeepAliveGeneration;
    // Set when a PAUSE is issued from 'paus'; cleared after the PLAY reply.
    bool mPausing;
    // 'paus' messages carrying a different "pausecheck" value are ignored.
    int32_t mPauseGeneration;

    Vector<TrackInfo> mTracks;

    // Set by parsePlayResponse(); until then onAccessUnitComplete() stores
    // incoming access units on the track.
    bool mPlayResponseParsed;
1538
1539    void setupTrack(size_t index) {
1540        sp<APacketSource> source =
1541            new APacketSource(mSessionDesc, index);
1542
1543        if (source->initCheck() != OK) {
1544            ALOGW("Unsupported format. Ignoring track #%d.", index);
1545
1546            sp<AMessage> reply = new AMessage('setu', id());
1547            reply->setSize("index", index);
1548            reply->setInt32("result", ERROR_UNSUPPORTED);
1549            reply->post();
1550            return;
1551        }
1552
1553        AString url;
1554        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1555
1556        AString trackURL;
1557        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1558
1559        mTracks.push(TrackInfo());
1560        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1561        info->mURL = trackURL;
1562        info->mPacketSource = source;
1563        info->mUsingInterleavedTCP = false;
1564        info->mFirstSeqNumInSegment = 0;
1565        info->mNewSegment = true;
1566        info->mRTPSocket = -1;
1567        info->mRTCPSocket = -1;
1568        info->mRTPAnchor = 0;
1569        info->mNTPAnchorUs = -1;
1570        info->mNormalPlayTimeRTP = 0;
1571        info->mNormalPlayTimeUs = 0ll;
1572
1573        unsigned long PT;
1574        AString formatDesc;
1575        AString formatParams;
1576        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1577
1578        int32_t timescale;
1579        int32_t numChannels;
1580        ASessionDescription::ParseFormatDesc(
1581                formatDesc.c_str(), &timescale, &numChannels);
1582
1583        info->mTimeScale = timescale;
1584        info->mEOSReceived = false;
1585
1586        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1587
1588        AString request = "SETUP ";
1589        request.append(trackURL);
1590        request.append(" RTSP/1.0\r\n");
1591
1592        if (mTryTCPInterleaving) {
1593            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1594            info->mUsingInterleavedTCP = true;
1595            info->mRTPSocket = interleaveIndex;
1596            info->mRTCPSocket = interleaveIndex + 1;
1597
1598            request.append("Transport: RTP/AVP/TCP;interleaved=");
1599            request.append(interleaveIndex);
1600            request.append("-");
1601            request.append(interleaveIndex + 1);
1602        } else {
1603            unsigned rtpPort;
1604            ARTPConnection::MakePortPair(
1605                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1606
1607            if (mUIDValid) {
1608                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1609                                                (uint32_t)*(uint32_t*) "RTP_");
1610                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1611                                                (uint32_t)*(uint32_t*) "RTP_");
1612                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1613                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1614            }
1615
1616            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1617            request.append(rtpPort);
1618            request.append("-");
1619            request.append(rtpPort + 1);
1620        }
1621
1622        request.append("\r\n");
1623
1624        if (index > 1) {
1625            request.append("Session: ");
1626            request.append(mSessionID);
1627            request.append("\r\n");
1628        }
1629
1630        request.append("\r\n");
1631
1632        sp<AMessage> reply = new AMessage('setu', id());
1633        reply->setSize("index", index);
1634        reply->setSize("track-index", mTracks.size() - 1);
1635        mConn->sendRequest(request.c_str(), reply);
1636    }
1637
1638    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1639        out->clear();
1640
1641        if (strncasecmp("rtsp://", baseURL, 7)) {
1642            // Base URL must be absolute
1643            return false;
1644        }
1645
1646        if (!strncasecmp("rtsp://", url, 7)) {
1647            // "url" is already an absolute URL, ignore base URL.
1648            out->setTo(url);
1649            return true;
1650        }
1651
1652        size_t n = strlen(baseURL);
1653        if (baseURL[n - 1] == '/') {
1654            out->setTo(baseURL);
1655            out->append(url);
1656        } else {
1657            const char *slashPos = strrchr(baseURL, '/');
1658
1659            if (slashPos > &baseURL[6]) {
1660                out->setTo(baseURL, slashPos - baseURL);
1661            } else {
1662                out->setTo(baseURL);
1663            }
1664
1665            out->append("/");
1666            out->append(url);
1667        }
1668
1669        return true;
1670    }
1671
1672    void fakeTimestamps() {
1673        mNTPAnchorUs = -1ll;
1674        for (size_t i = 0; i < mTracks.size(); ++i) {
1675            onTimeUpdate(i, 0, 0ll);
1676        }
1677    }
1678
1679    bool dataReceivedOnAllChannels() {
1680        TrackInfo *track;
1681        for (size_t i = 0; i < mTracks.size(); ++i) {
1682            track = &mTracks.editItemAt(i);
1683            if (track->mPackets.empty()) {
1684                return false;
1685            }
1686        }
1687        return true;
1688    }
1689
1690    void handleFirstAccessUnit() {
1691        if (mFirstAccessUnit) {
1692            sp<AMessage> msg = mNotify->dup();
1693            msg->setInt32("what", kWhatConnected);
1694            msg->post();
1695
1696            if (mSeekable) {
1697                for (size_t i = 0; i < mTracks.size(); ++i) {
1698                    TrackInfo *info = &mTracks.editItemAt(i);
1699
1700                    postNormalPlayTimeMapping(
1701                            i,
1702                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1703                }
1704            }
1705
1706            mFirstAccessUnit = false;
1707        }
1708    }
1709
1710    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1711        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1712             trackIndex, rtpTime, ntpTime);
1713
1714        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1715
1716        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1717
1718        track->mRTPAnchor = rtpTime;
1719        track->mNTPAnchorUs = ntpTimeUs;
1720
1721        if (mNTPAnchorUs < 0) {
1722            mNTPAnchorUs = ntpTimeUs;
1723            mMediaAnchorUs = mLastMediaTimeUs;
1724        }
1725
1726        if (!mAllTracksHaveTime) {
1727            bool allTracksHaveTime = true;
1728            for (size_t i = 0; i < mTracks.size(); ++i) {
1729                TrackInfo *track = &mTracks.editItemAt(i);
1730                if (track->mNTPAnchorUs < 0) {
1731                    allTracksHaveTime = false;
1732                    break;
1733                }
1734            }
1735            if (allTracksHaveTime) {
1736                mAllTracksHaveTime = true;
1737                ALOGI("Time now established for all tracks.");
1738            }
1739        }
1740        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1741            handleFirstAccessUnit();
1742
1743            // Time is now established, lets start timestamping immediately
1744            for (size_t i = 0; i < mTracks.size(); ++i) {
1745                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1746                while (!trackInfo->mPackets.empty()) {
1747                    sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1748                    trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1749
1750                    if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1751                        postQueueAccessUnit(i, accessUnit);
1752                    }
1753                }
1754            }
1755            for (size_t i = 0; i < mTracks.size(); ++i) {
1756                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1757                if (trackInfo->mEOSReceived) {
1758                    postQueueEOS(i, ERROR_END_OF_STREAM);
1759                    trackInfo->mEOSReceived = false;
1760                }
1761            }
1762        }
1763    }
1764
1765    void onAccessUnitComplete(
1766            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1767        ALOGV("onAccessUnitComplete track %d", trackIndex);
1768
1769        if(!mPlayResponseParsed){
1770            ALOGI("play response is not parsed, storing accessunit");
1771            TrackInfo *track = &mTracks.editItemAt(trackIndex);
1772            track->mPackets.push_back(accessUnit);
1773            return;
1774        }
1775
1776        handleFirstAccessUnit();
1777
1778        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1779
1780        if (!mAllTracksHaveTime) {
1781            ALOGV("storing accessUnit, no time established yet");
1782            track->mPackets.push_back(accessUnit);
1783            return;
1784        }
1785
1786        while (!track->mPackets.empty()) {
1787            sp<ABuffer> accessUnit = *track->mPackets.begin();
1788            track->mPackets.erase(track->mPackets.begin());
1789
1790            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1791                postQueueAccessUnit(trackIndex, accessUnit);
1792            }
1793        }
1794
1795        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1796            postQueueAccessUnit(trackIndex, accessUnit);
1797        }
1798
1799        if (track->mEOSReceived) {
1800            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1801            track->mEOSReceived = false;
1802        }
1803    }
1804
1805    bool addMediaTimestamp(
1806            int32_t trackIndex, const TrackInfo *track,
1807            const sp<ABuffer> &accessUnit) {
1808        uint32_t rtpTime;
1809        CHECK(accessUnit->meta()->findInt32(
1810                    "rtp-time", (int32_t *)&rtpTime));
1811
1812        int64_t relRtpTimeUs =
1813            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1814                / track->mTimeScale;
1815
1816        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1817
1818        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1819
1820        if (mediaTimeUs > mLastMediaTimeUs) {
1821            mLastMediaTimeUs = mediaTimeUs;
1822        }
1823
1824        if (mediaTimeUs < 0) {
1825            ALOGV("dropping early accessUnit.");
1826            return false;
1827        }
1828
1829        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1830             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1831
1832        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1833
1834        return true;
1835    }
1836
1837    void postQueueAccessUnit(
1838            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1839        sp<AMessage> msg = mNotify->dup();
1840        msg->setInt32("what", kWhatAccessUnit);
1841        msg->setSize("trackIndex", trackIndex);
1842        msg->setBuffer("accessUnit", accessUnit);
1843        msg->post();
1844    }
1845
1846    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1847        sp<AMessage> msg = mNotify->dup();
1848        msg->setInt32("what", kWhatEOS);
1849        msg->setSize("trackIndex", trackIndex);
1850        msg->setInt32("finalResult", finalResult);
1851        msg->post();
1852    }
1853
1854    void postQueueSeekDiscontinuity(size_t trackIndex) {
1855        sp<AMessage> msg = mNotify->dup();
1856        msg->setInt32("what", kWhatSeekDiscontinuity);
1857        msg->setSize("trackIndex", trackIndex);
1858        msg->post();
1859    }
1860
1861    void postNormalPlayTimeMapping(
1862            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1863        sp<AMessage> msg = mNotify->dup();
1864        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1865        msg->setSize("trackIndex", trackIndex);
1866        msg->setInt32("rtpTime", rtpTime);
1867        msg->setInt64("nptUs", nptUs);
1868        msg->post();
1869    }
1870
1871    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1872};
1873
1874}  // namespace android
1875
1876#endif  // MY_HANDLER_H_
1877