1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22
23#ifndef LOG_TAG
24#define LOG_TAG "MyHandler"
25#endif
26
27#include <utils/Log.h>
28
29#include "APacketSource.h"
30#include "ARTPConnection.h"
31#include "ARTSPConnection.h"
32#include "ASessionDescription.h"
33
34#include <ctype.h>
35
36#include <media/stagefright/foundation/ABuffer.h>
37#include <media/stagefright/foundation/ADebug.h>
38#include <media/stagefright/foundation/ALooper.h>
39#include <media/stagefright/foundation/AMessage.h>
40#include <media/stagefright/MediaErrors.h>
41#include <media/stagefright/Utils.h>
42
43#include <arpa/inet.h>
44#include <sys/socket.h>
45#include <netdb.h>
46
47#include "HTTPBase.h"
48
49#if LOG_NDEBUG
50#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
51#else
52#define UNUSED_UNLESS_VERBOSE(x)
53#endif
54
55// If no access units are received within 5 secs, assume that the rtp
56// stream has ended and signal end of stream.
57static int64_t kAccessUnitTimeoutUs = 10000000ll;
58
59// If no access units arrive for the first 10 secs after starting the
60// stream, assume none ever will and signal EOS or switch transports.
61static int64_t kStartupTimeoutUs = 10000000ll;
62
63static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
64
65static int64_t kPauseDelayUs = 3000000ll;
66
67namespace android {
68
69static bool GetAttribute(const char *s, const char *key, AString *value) {
70    value->clear();
71
72    size_t keyLen = strlen(key);
73
74    for (;;) {
75        while (isspace(*s)) {
76            ++s;
77        }
78
79        const char *colonPos = strchr(s, ';');
80
81        size_t len =
82            (colonPos == NULL) ? strlen(s) : colonPos - s;
83
84        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
85            value->setTo(&s[keyLen + 1], len - keyLen - 1);
86            return true;
87        }
88
89        if (colonPos == NULL) {
90            return false;
91        }
92
93        s = colonPos + 1;
94    }
95}
96
97struct MyHandler : public AHandler {
98    enum {
99        kWhatConnected                  = 'conn',
100        kWhatDisconnected               = 'disc',
101        kWhatSeekPaused                 = 'spau',
102        kWhatSeekDone                   = 'sdon',
103
104        kWhatAccessUnit                 = 'accU',
105        kWhatEOS                        = 'eos!',
106        kWhatSeekDiscontinuity          = 'seeD',
107        kWhatNormalPlayTimeMapping      = 'nptM',
108    };
109
110    MyHandler(
111            const char *url,
112            const sp<AMessage> &notify,
113            bool uidValid = false, uid_t uid = 0)
114        : mNotify(notify),
115          mUIDValid(uidValid),
116          mUID(uid),
117          mNetLooper(new ALooper),
118          mConn(new ARTSPConnection(mUIDValid, mUID)),
119          mRTPConn(new ARTPConnection),
120          mOriginalSessionURL(url),
121          mSessionURL(url),
122          mSetupTracksSuccessful(false),
123          mSeekPending(false),
124          mFirstAccessUnit(true),
125          mAllTracksHaveTime(false),
126          mNTPAnchorUs(-1),
127          mMediaAnchorUs(-1),
128          mLastMediaTimeUs(0),
129          mNumAccessUnitsReceived(0),
130          mCheckPending(false),
131          mCheckGeneration(0),
132          mCheckTimeoutGeneration(0),
133          mTryTCPInterleaving(false),
134          mTryFakeRTCP(false),
135          mReceivedFirstRTCPPacket(false),
136          mReceivedFirstRTPPacket(false),
137          mSeekable(true),
138          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
139          mKeepAliveGeneration(0),
140          mPausing(false),
141          mPauseGeneration(0),
142          mPlayResponseParsed(false) {
143        mNetLooper->setName("rtsp net");
144        mNetLooper->start(false /* runOnCallingThread */,
145                          false /* canCallJava */,
146                          PRIORITY_HIGHEST);
147
148        // Strip any authentication info from the session url, we don't
149        // want to transmit user/pass in cleartext.
150        AString host, path, user, pass;
151        unsigned port;
152        CHECK(ARTSPConnection::ParseURL(
153                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
154
155        if (user.size() > 0) {
156            mSessionURL.clear();
157            mSessionURL.append("rtsp://");
158            mSessionURL.append(host);
159            mSessionURL.append(":");
160            mSessionURL.append(AStringPrintf("%u", port));
161            mSessionURL.append(path);
162
163            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
164        }
165
166        mSessionHost = host;
167    }
168
169    void connect() {
170        looper()->registerHandler(mConn);
171        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
172
173        sp<AMessage> notify = new AMessage('biny', this);
174        mConn->observeBinaryData(notify);
175
176        sp<AMessage> reply = new AMessage('conn', this);
177        mConn->connect(mOriginalSessionURL.c_str(), reply);
178    }
179
180    void loadSDP(const sp<ASessionDescription>& desc) {
181        looper()->registerHandler(mConn);
182        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
183
184        sp<AMessage> notify = new AMessage('biny', this);
185        mConn->observeBinaryData(notify);
186
187        sp<AMessage> reply = new AMessage('sdpl', this);
188        reply->setObject("description", desc);
189        mConn->connect(mOriginalSessionURL.c_str(), reply);
190    }
191
192    AString getControlURL() {
193        AString sessionLevelControlURL;
194        if (mSessionDesc->findAttribute(
195                0,
196                "a=control",
197                &sessionLevelControlURL)) {
198            if (sessionLevelControlURL.compare("*") == 0) {
199                return mBaseURL;
200            } else {
201                AString controlURL;
202                CHECK(MakeURL(
203                        mBaseURL.c_str(),
204                        sessionLevelControlURL.c_str(),
205                        &controlURL));
206                return controlURL;
207            }
208        } else {
209            return mSessionURL;
210        }
211    }
212
213    void disconnect() {
214        (new AMessage('abor', this))->post();
215    }
216
217    void seek(int64_t timeUs) {
218        sp<AMessage> msg = new AMessage('seek', this);
219        msg->setInt64("time", timeUs);
220        mPauseGeneration++;
221        msg->post();
222    }
223
224    void continueSeekAfterPause(int64_t timeUs) {
225        sp<AMessage> msg = new AMessage('see1', this);
226        msg->setInt64("time", timeUs);
227        msg->post();
228    }
229
230    bool isSeekable() const {
231        return mSeekable;
232    }
233
234    void pause() {
235        sp<AMessage> msg = new AMessage('paus', this);
236        mPauseGeneration++;
237        msg->setInt32("pausecheck", mPauseGeneration);
238        msg->post(kPauseDelayUs);
239    }
240
241    void resume() {
242        sp<AMessage> msg = new AMessage('resu', this);
243        mPauseGeneration++;
244        msg->post();
245    }
246
247    static void addRR(const sp<ABuffer> &buf) {
248        uint8_t *ptr = buf->data() + buf->size();
249        ptr[0] = 0x80 | 0;
250        ptr[1] = 201;  // RR
251        ptr[2] = 0;
252        ptr[3] = 1;
253        ptr[4] = 0xde;  // SSRC
254        ptr[5] = 0xad;
255        ptr[6] = 0xbe;
256        ptr[7] = 0xef;
257
258        buf->setRange(0, buf->size() + 8);
259    }
260
261    static void addSDES(int s, const sp<ABuffer> &buffer) {
262        struct sockaddr_in addr;
263        socklen_t addrSize = sizeof(addr);
264        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
265            inet_aton("0.0.0.0", &(addr.sin_addr));
266        }
267
268        uint8_t *data = buffer->data() + buffer->size();
269        data[0] = 0x80 | 1;
270        data[1] = 202;  // SDES
271        data[4] = 0xde;  // SSRC
272        data[5] = 0xad;
273        data[6] = 0xbe;
274        data[7] = 0xef;
275
276        size_t offset = 8;
277
278        data[offset++] = 1;  // CNAME
279
280        AString cname = "stagefright@";
281        cname.append(inet_ntoa(addr.sin_addr));
282        data[offset++] = cname.size();
283
284        memcpy(&data[offset], cname.c_str(), cname.size());
285        offset += cname.size();
286
287        data[offset++] = 6;  // TOOL
288
289        AString tool = MakeUserAgent();
290
291        data[offset++] = tool.size();
292
293        memcpy(&data[offset], tool.c_str(), tool.size());
294        offset += tool.size();
295
296        data[offset++] = 0;
297
298        if ((offset % 4) > 0) {
299            size_t count = 4 - (offset % 4);
300            switch (count) {
301                case 3:
302                    data[offset++] = 0;
303                case 2:
304                    data[offset++] = 0;
305                case 1:
306                    data[offset++] = 0;
307            }
308        }
309
310        size_t numWords = (offset / 4) - 1;
311        data[2] = numWords >> 8;
312        data[3] = numWords & 0xff;
313
314        buffer->setRange(buffer->offset(), buffer->size() + offset);
315    }
316
317    // In case we're behind NAT, fire off two UDP packets to the remote
318    // rtp/rtcp ports to poke a hole into the firewall for future incoming
319    // packets. We're going to send an RR/SDES RTCP packet to both of them.
320    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
321        struct sockaddr_in addr;
322        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
323        addr.sin_family = AF_INET;
324
325        AString source;
326        AString server_port;
327        if (!GetAttribute(transport.c_str(),
328                          "source",
329                          &source)) {
330            ALOGW("Missing 'source' field in Transport response. Using "
331                 "RTSP endpoint address.");
332
333            struct hostent *ent = gethostbyname(mSessionHost.c_str());
334            if (ent == NULL) {
335                ALOGE("Failed to look up address of session host '%s'",
336                     mSessionHost.c_str());
337
338                return false;
339            }
340
341            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
342        } else {
343            addr.sin_addr.s_addr = inet_addr(source.c_str());
344        }
345
346        if (!GetAttribute(transport.c_str(),
347                                 "server_port",
348                                 &server_port)) {
349            ALOGI("Missing 'server_port' field in Transport response.");
350            return false;
351        }
352
353        int rtpPort, rtcpPort;
354        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
355                || rtpPort <= 0 || rtpPort > 65535
356                || rtcpPort <=0 || rtcpPort > 65535
357                || rtcpPort != rtpPort + 1) {
358            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
359                 " RTP port must be even, RTCP port must be one higher.",
360                 server_port.c_str());
361
362            return false;
363        }
364
365        if (rtpPort & 1) {
366            ALOGW("Server picked an odd RTP port, it should've picked an "
367                 "even one, we'll let it pass for now, but this may break "
368                 "in the future.");
369        }
370
371        if (addr.sin_addr.s_addr == INADDR_NONE) {
372            return true;
373        }
374
375        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
376            // No firewalls to traverse on the loopback interface.
377            return true;
378        }
379
380        // Make up an RR/SDES RTCP packet.
381        sp<ABuffer> buf = new ABuffer(65536);
382        buf->setRange(0, 0);
383        addRR(buf);
384        addSDES(rtpSocket, buf);
385
386        addr.sin_port = htons(rtpPort);
387
388        ssize_t n = sendto(
389                rtpSocket, buf->data(), buf->size(), 0,
390                (const sockaddr *)&addr, sizeof(addr));
391
392        if (n < (ssize_t)buf->size()) {
393            ALOGE("failed to poke a hole for RTP packets");
394            return false;
395        }
396
397        addr.sin_port = htons(rtcpPort);
398
399        n = sendto(
400                rtcpSocket, buf->data(), buf->size(), 0,
401                (const sockaddr *)&addr, sizeof(addr));
402
403        if (n < (ssize_t)buf->size()) {
404            ALOGE("failed to poke a hole for RTCP packets");
405            return false;
406        }
407
408        ALOGV("successfully poked holes.");
409
410        return true;
411    }
412
413    static bool isLiveStream(const sp<ASessionDescription> &desc) {
414        AString attrLiveStream;
415        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
416            ssize_t semicolonPos = attrLiveStream.find(";", 2);
417
418            const char* liveStreamValue;
419            if (semicolonPos < 0) {
420                liveStreamValue = attrLiveStream.c_str();
421            } else {
422                AString valString;
423                valString.setTo(attrLiveStream,
424                        semicolonPos + 1,
425                        attrLiveStream.size() - semicolonPos - 1);
426                liveStreamValue = valString.c_str();
427            }
428
429            uint32_t value = strtoul(liveStreamValue, NULL, 10);
430            if (value == 1) {
431                ALOGV("found live stream");
432                return true;
433            }
434        } else {
435            // It is a live stream if no duration is returned
436            int64_t durationUs;
437            if (!desc->getDurationUs(&durationUs)) {
438                ALOGV("No duration found, assume live stream");
439                return true;
440            }
441        }
442
443        return false;
444    }
445
446    virtual void onMessageReceived(const sp<AMessage> &msg) {
447        switch (msg->what()) {
448            case 'conn':
449            {
450                int32_t result;
451                CHECK(msg->findInt32("result", &result));
452
453                ALOGI("connection request completed with result %d (%s)",
454                     result, strerror(-result));
455
456                if (result == OK) {
457                    AString request;
458                    request = "DESCRIBE ";
459                    request.append(mSessionURL);
460                    request.append(" RTSP/1.0\r\n");
461                    request.append("Accept: application/sdp\r\n");
462                    request.append("\r\n");
463
464                    sp<AMessage> reply = new AMessage('desc', this);
465                    mConn->sendRequest(request.c_str(), reply);
466                } else {
467                    (new AMessage('disc', this))->post();
468                }
469                break;
470            }
471
472            case 'disc':
473            {
474                ++mKeepAliveGeneration;
475
476                int32_t reconnect;
477                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
478                    sp<AMessage> reply = new AMessage('conn', this);
479                    mConn->connect(mOriginalSessionURL.c_str(), reply);
480                } else {
481                    (new AMessage('quit', this))->post();
482                }
483                break;
484            }
485
486            case 'desc':
487            {
488                int32_t result;
489                CHECK(msg->findInt32("result", &result));
490
491                ALOGI("DESCRIBE completed with result %d (%s)",
492                     result, strerror(-result));
493
494                if (result == OK) {
495                    sp<RefBase> obj;
496                    CHECK(msg->findObject("response", &obj));
497                    sp<ARTSPResponse> response =
498                        static_cast<ARTSPResponse *>(obj.get());
499
500                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
501                        ssize_t i = response->mHeaders.indexOfKey("location");
502                        CHECK_GE(i, 0);
503
504                        mOriginalSessionURL = response->mHeaders.valueAt(i);
505                        mSessionURL = mOriginalSessionURL;
506
507                        // Strip any authentication info from the session url, we don't
508                        // want to transmit user/pass in cleartext.
509                        AString host, path, user, pass;
510                        unsigned port;
511                        if (ARTSPConnection::ParseURL(
512                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
513                                && user.size() > 0) {
514                            mSessionURL.clear();
515                            mSessionURL.append("rtsp://");
516                            mSessionURL.append(host);
517                            mSessionURL.append(":");
518                            mSessionURL.append(AStringPrintf("%u", port));
519                            mSessionURL.append(path);
520
521                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
522                        }
523
524                        sp<AMessage> reply = new AMessage('conn', this);
525                        mConn->connect(mOriginalSessionURL.c_str(), reply);
526                        break;
527                    }
528
529                    if (response->mStatusCode != 200) {
530                        result = UNKNOWN_ERROR;
531                    } else if (response->mContent == NULL) {
532                        result = ERROR_MALFORMED;
533                        ALOGE("The response has no content.");
534                    } else {
535                        mSessionDesc = new ASessionDescription;
536
537                        mSessionDesc->setTo(
538                                response->mContent->data(),
539                                response->mContent->size());
540
541                        if (!mSessionDesc->isValid()) {
542                            ALOGE("Failed to parse session description.");
543                            result = ERROR_MALFORMED;
544                        } else {
545                            ssize_t i = response->mHeaders.indexOfKey("content-base");
546                            if (i >= 0) {
547                                mBaseURL = response->mHeaders.valueAt(i);
548                            } else {
549                                i = response->mHeaders.indexOfKey("content-location");
550                                if (i >= 0) {
551                                    mBaseURL = response->mHeaders.valueAt(i);
552                                } else {
553                                    mBaseURL = mSessionURL;
554                                }
555                            }
556
557                            mSeekable = !isLiveStream(mSessionDesc);
558
559                            if (!mBaseURL.startsWith("rtsp://")) {
560                                // Some misbehaving servers specify a relative
561                                // URL in one of the locations above, combine
562                                // it with the absolute session URL to get
563                                // something usable...
564
565                                ALOGW("Server specified a non-absolute base URL"
566                                     ", combining it with the session URL to "
567                                     "get something usable...");
568
569                                AString tmp;
570                                CHECK(MakeURL(
571                                            mSessionURL.c_str(),
572                                            mBaseURL.c_str(),
573                                            &tmp));
574
575                                mBaseURL = tmp;
576                            }
577
578                            mControlURL = getControlURL();
579
580                            if (mSessionDesc->countTracks() < 2) {
581                                // There's no actual tracks in this session.
582                                // The first "track" is merely session meta
583                                // data.
584
585                                ALOGW("Session doesn't contain any playable "
586                                     "tracks. Aborting.");
587                                result = ERROR_UNSUPPORTED;
588                            } else {
589                                setupTrack(1);
590                            }
591                        }
592                    }
593                }
594
595                if (result != OK) {
596                    sp<AMessage> reply = new AMessage('disc', this);
597                    mConn->disconnect(reply);
598                }
599                break;
600            }
601
602            case 'sdpl':
603            {
604                int32_t result;
605                CHECK(msg->findInt32("result", &result));
606
607                ALOGI("SDP connection request completed with result %d (%s)",
608                     result, strerror(-result));
609
610                if (result == OK) {
611                    sp<RefBase> obj;
612                    CHECK(msg->findObject("description", &obj));
613                    mSessionDesc =
614                        static_cast<ASessionDescription *>(obj.get());
615
616                    if (!mSessionDesc->isValid()) {
617                        ALOGE("Failed to parse session description.");
618                        result = ERROR_MALFORMED;
619                    } else {
620                        mBaseURL = mSessionURL;
621
622                        mSeekable = !isLiveStream(mSessionDesc);
623
624                        mControlURL = getControlURL();
625
626                        if (mSessionDesc->countTracks() < 2) {
627                            // There's no actual tracks in this session.
628                            // The first "track" is merely session meta
629                            // data.
630
631                            ALOGW("Session doesn't contain any playable "
632                                 "tracks. Aborting.");
633                            result = ERROR_UNSUPPORTED;
634                        } else {
635                            setupTrack(1);
636                        }
637                    }
638                }
639
640                if (result != OK) {
641                    sp<AMessage> reply = new AMessage('disc', this);
642                    mConn->disconnect(reply);
643                }
644                break;
645            }
646
647            case 'setu':
648            {
649                size_t index;
650                CHECK(msg->findSize("index", &index));
651
652                TrackInfo *track = NULL;
653                size_t trackIndex;
654                if (msg->findSize("track-index", &trackIndex)) {
655                    track = &mTracks.editItemAt(trackIndex);
656                }
657
658                int32_t result;
659                CHECK(msg->findInt32("result", &result));
660
661                ALOGI("SETUP(%zu) completed with result %d (%s)",
662                     index, result, strerror(-result));
663
664                if (result == OK) {
665                    CHECK(track != NULL);
666
667                    sp<RefBase> obj;
668                    CHECK(msg->findObject("response", &obj));
669                    sp<ARTSPResponse> response =
670                        static_cast<ARTSPResponse *>(obj.get());
671
672                    if (response->mStatusCode != 200) {
673                        result = UNKNOWN_ERROR;
674                    } else {
675                        ssize_t i = response->mHeaders.indexOfKey("session");
676                        CHECK_GE(i, 0);
677
678                        mSessionID = response->mHeaders.valueAt(i);
679
680                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
681                        AString timeoutStr;
682                        if (GetAttribute(
683                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
684                            char *end;
685                            unsigned long timeoutSecs =
686                                strtoul(timeoutStr.c_str(), &end, 10);
687
688                            if (end == timeoutStr.c_str() || *end != '\0') {
689                                ALOGW("server specified malformed timeout '%s'",
690                                     timeoutStr.c_str());
691
692                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
693                            } else if (timeoutSecs < 15) {
694                                ALOGW("server specified too short a timeout "
695                                     "(%lu secs), using default.",
696                                     timeoutSecs);
697
698                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
699                            } else {
700                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
701
702                                ALOGI("server specified timeout of %lu secs.",
703                                     timeoutSecs);
704                            }
705                        }
706
707                        i = mSessionID.find(";");
708                        if (i >= 0) {
709                            // Remove options, i.e. ";timeout=90"
710                            mSessionID.erase(i, mSessionID.size() - i);
711                        }
712
713                        sp<AMessage> notify = new AMessage('accu', this);
714                        notify->setSize("track-index", trackIndex);
715
716                        i = response->mHeaders.indexOfKey("transport");
717                        CHECK_GE(i, 0);
718
719                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
720                            if (!track->mUsingInterleavedTCP) {
721                                AString transport = response->mHeaders.valueAt(i);
722
723                                // We are going to continue even if we were
724                                // unable to poke a hole into the firewall...
725                                pokeAHole(
726                                        track->mRTPSocket,
727                                        track->mRTCPSocket,
728                                        transport);
729                            }
730
731                            mRTPConn->addStream(
732                                    track->mRTPSocket, track->mRTCPSocket,
733                                    mSessionDesc, index,
734                                    notify, track->mUsingInterleavedTCP);
735
736                            mSetupTracksSuccessful = true;
737                        } else {
738                            result = BAD_VALUE;
739                        }
740                    }
741                }
742
743                if (result != OK) {
744                    if (track) {
745                        if (!track->mUsingInterleavedTCP) {
746                            // Clear the tag
747                            if (mUIDValid) {
748                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
749                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
750                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
751                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
752                            }
753
754                            close(track->mRTPSocket);
755                            close(track->mRTCPSocket);
756                        }
757
758                        mTracks.removeItemsAt(trackIndex);
759                    }
760                }
761
762                ++index;
763                if (result == OK && index < mSessionDesc->countTracks()) {
764                    setupTrack(index);
765                } else if (mSetupTracksSuccessful) {
766                    ++mKeepAliveGeneration;
767                    postKeepAlive();
768
769                    AString request = "PLAY ";
770                    request.append(mControlURL);
771                    request.append(" RTSP/1.0\r\n");
772
773                    request.append("Session: ");
774                    request.append(mSessionID);
775                    request.append("\r\n");
776
777                    request.append("\r\n");
778
779                    sp<AMessage> reply = new AMessage('play', this);
780                    mConn->sendRequest(request.c_str(), reply);
781                } else {
782                    sp<AMessage> reply = new AMessage('disc', this);
783                    mConn->disconnect(reply);
784                }
785                break;
786            }
787
788            case 'play':
789            {
790                int32_t result;
791                CHECK(msg->findInt32("result", &result));
792
793                ALOGI("PLAY completed with result %d (%s)",
794                     result, strerror(-result));
795
796                if (result == OK) {
797                    sp<RefBase> obj;
798                    CHECK(msg->findObject("response", &obj));
799                    sp<ARTSPResponse> response =
800                        static_cast<ARTSPResponse *>(obj.get());
801
802                    if (response->mStatusCode != 200) {
803                        result = UNKNOWN_ERROR;
804                    } else {
805                        parsePlayResponse(response);
806
807                        sp<AMessage> timeout = new AMessage('tiou', this);
808                        mCheckTimeoutGeneration++;
809                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
810                        timeout->post(kStartupTimeoutUs);
811                    }
812                }
813
814                if (result != OK) {
815                    sp<AMessage> reply = new AMessage('disc', this);
816                    mConn->disconnect(reply);
817                }
818
819                break;
820            }
821
822            case 'aliv':
823            {
824                int32_t generation;
825                CHECK(msg->findInt32("generation", &generation));
826
827                if (generation != mKeepAliveGeneration) {
828                    // obsolete event.
829                    break;
830                }
831
832                AString request;
833                request.append("OPTIONS ");
834                request.append(mSessionURL);
835                request.append(" RTSP/1.0\r\n");
836                request.append("Session: ");
837                request.append(mSessionID);
838                request.append("\r\n");
839                request.append("\r\n");
840
841                sp<AMessage> reply = new AMessage('opts', this);
842                reply->setInt32("generation", mKeepAliveGeneration);
843                mConn->sendRequest(request.c_str(), reply);
844                break;
845            }
846
847            case 'opts':
848            {
849                int32_t result;
850                CHECK(msg->findInt32("result", &result));
851
852                ALOGI("OPTIONS completed with result %d (%s)",
853                     result, strerror(-result));
854
855                int32_t generation;
856                CHECK(msg->findInt32("generation", &generation));
857
858                if (generation != mKeepAliveGeneration) {
859                    // obsolete event.
860                    break;
861                }
862
863                postKeepAlive();
864                break;
865            }
866
867            case 'abor':
868            {
869                for (size_t i = 0; i < mTracks.size(); ++i) {
870                    TrackInfo *info = &mTracks.editItemAt(i);
871
872                    if (!mFirstAccessUnit) {
873                        postQueueEOS(i, ERROR_END_OF_STREAM);
874                    }
875
876                    if (!info->mUsingInterleavedTCP) {
877                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
878
879                        // Clear the tag
880                        if (mUIDValid) {
881                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
882                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
883                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
884                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
885                        }
886
887                        close(info->mRTPSocket);
888                        close(info->mRTCPSocket);
889                    }
890                }
891                mTracks.clear();
892                mSetupTracksSuccessful = false;
893                mSeekPending = false;
894                mFirstAccessUnit = true;
895                mAllTracksHaveTime = false;
896                mNTPAnchorUs = -1;
897                mMediaAnchorUs = -1;
898                mNumAccessUnitsReceived = 0;
899                mReceivedFirstRTCPPacket = false;
900                mReceivedFirstRTPPacket = false;
901                mPausing = false;
902                mSeekable = true;
903
904                sp<AMessage> reply = new AMessage('tear', this);
905
906                int32_t reconnect;
907                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
908                    reply->setInt32("reconnect", true);
909                }
910
911                AString request;
912                request = "TEARDOWN ";
913
914                // XXX should use aggregate url from SDP here...
915                request.append(mSessionURL);
916                request.append(" RTSP/1.0\r\n");
917
918                request.append("Session: ");
919                request.append(mSessionID);
920                request.append("\r\n");
921
922                request.append("\r\n");
923
924                mConn->sendRequest(request.c_str(), reply);
925                break;
926            }
927
928            case 'tear':
929            {
930                int32_t result;
931                CHECK(msg->findInt32("result", &result));
932
933                ALOGI("TEARDOWN completed with result %d (%s)",
934                     result, strerror(-result));
935
936                sp<AMessage> reply = new AMessage('disc', this);
937
938                int32_t reconnect;
939                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
940                    reply->setInt32("reconnect", true);
941                }
942
943                mConn->disconnect(reply);
944                break;
945            }
946
947            case 'quit':
948            {
949                sp<AMessage> msg = mNotify->dup();
950                msg->setInt32("what", kWhatDisconnected);
951                msg->setInt32("result", UNKNOWN_ERROR);
952                msg->post();
953                break;
954            }
955
956            case 'chek':
957            {
958                int32_t generation;
959                CHECK(msg->findInt32("generation", &generation));
960                if (generation != mCheckGeneration) {
961                    // This is an outdated message. Ignore.
962                    break;
963                }
964
965                if (mNumAccessUnitsReceived == 0) {
966#if 1
967                    ALOGI("stream ended? aborting.");
968                    (new AMessage('abor', this))->post();
969                    break;
970#else
971                    ALOGI("haven't seen an AU in a looong time.");
972#endif
973                }
974
975                mNumAccessUnitsReceived = 0;
976                msg->post(kAccessUnitTimeoutUs);
977                break;
978            }
979
980            case 'accu':
981            {
982                int32_t timeUpdate;
983                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
984                    size_t trackIndex;
985                    CHECK(msg->findSize("track-index", &trackIndex));
986
987                    uint32_t rtpTime;
988                    uint64_t ntpTime;
989                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
990                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
991
992                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
993                    break;
994                }
995
996                int32_t first;
997                if (msg->findInt32("first-rtcp", &first)) {
998                    mReceivedFirstRTCPPacket = true;
999                    break;
1000                }
1001
1002                if (msg->findInt32("first-rtp", &first)) {
1003                    mReceivedFirstRTPPacket = true;
1004                    break;
1005                }
1006
1007                ++mNumAccessUnitsReceived;
1008                postAccessUnitTimeoutCheck();
1009
1010                size_t trackIndex;
1011                CHECK(msg->findSize("track-index", &trackIndex));
1012
1013                if (trackIndex >= mTracks.size()) {
1014                    ALOGV("late packets ignored.");
1015                    break;
1016                }
1017
1018                TrackInfo *track = &mTracks.editItemAt(trackIndex);
1019
1020                int32_t eos;
1021                if (msg->findInt32("eos", &eos)) {
1022                    ALOGI("received BYE on track index %zu", trackIndex);
1023                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1024                        ALOGI("No time established => fake existing data");
1025
1026                        track->mEOSReceived = true;
1027                        mTryFakeRTCP = true;
1028                        mReceivedFirstRTCPPacket = true;
1029                        fakeTimestamps();
1030                    } else {
1031                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1032                    }
1033                    return;
1034                }
1035
1036                sp<ABuffer> accessUnit;
1037                CHECK(msg->findBuffer("access-unit", &accessUnit));
1038
1039                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1040
1041                if (mSeekPending) {
1042                    ALOGV("we're seeking, dropping stale packet.");
1043                    break;
1044                }
1045
1046                if (seqNum < track->mFirstSeqNumInSegment) {
1047                    ALOGV("dropping stale access-unit (%d < %d)",
1048                         seqNum, track->mFirstSeqNumInSegment);
1049                    break;
1050                }
1051
1052                if (track->mNewSegment) {
1053                    track->mNewSegment = false;
1054                }
1055
1056                onAccessUnitComplete(trackIndex, accessUnit);
1057                break;
1058            }
1059
1060            case 'paus':
1061            {
1062                int32_t generation;
1063                CHECK(msg->findInt32("pausecheck", &generation));
1064                if (generation != mPauseGeneration) {
1065                    ALOGV("Ignoring outdated pause message.");
1066                    break;
1067                }
1068
1069                if (!mSeekable) {
1070                    ALOGW("This is a live stream, ignoring pause request.");
1071                    break;
1072                }
1073                mCheckPending = true;
1074                ++mCheckGeneration;
1075                mPausing = true;
1076
1077                AString request = "PAUSE ";
1078                request.append(mControlURL);
1079                request.append(" RTSP/1.0\r\n");
1080
1081                request.append("Session: ");
1082                request.append(mSessionID);
1083                request.append("\r\n");
1084
1085                request.append("\r\n");
1086
1087                sp<AMessage> reply = new AMessage('pau2', this);
1088                mConn->sendRequest(request.c_str(), reply);
1089                break;
1090            }
1091
1092            case 'pau2':
1093            {
1094                int32_t result;
1095                CHECK(msg->findInt32("result", &result));
1096                mCheckTimeoutGeneration++;
1097
1098                ALOGI("PAUSE completed with result %d (%s)",
1099                     result, strerror(-result));
1100                break;
1101            }
1102
1103            case 'resu':
1104            {
1105                if (mPausing && mSeekPending) {
1106                    // If seeking, Play will be sent from see1 instead
1107                    break;
1108                }
1109
1110                if (!mPausing) {
1111                    // Dont send PLAY if we have not paused
1112                    break;
1113                }
1114                AString request = "PLAY ";
1115                request.append(mControlURL);
1116                request.append(" RTSP/1.0\r\n");
1117
1118                request.append("Session: ");
1119                request.append(mSessionID);
1120                request.append("\r\n");
1121
1122                request.append("\r\n");
1123
1124                sp<AMessage> reply = new AMessage('res2', this);
1125                mConn->sendRequest(request.c_str(), reply);
1126                break;
1127            }
1128
1129            case 'res2':
1130            {
1131                int32_t result;
1132                CHECK(msg->findInt32("result", &result));
1133
1134                ALOGI("PLAY completed with result %d (%s)",
1135                     result, strerror(-result));
1136
1137                mCheckPending = false;
1138                postAccessUnitTimeoutCheck();
1139
1140                if (result == OK) {
1141                    sp<RefBase> obj;
1142                    CHECK(msg->findObject("response", &obj));
1143                    sp<ARTSPResponse> response =
1144                        static_cast<ARTSPResponse *>(obj.get());
1145
1146                    if (response->mStatusCode != 200) {
1147                        result = UNKNOWN_ERROR;
1148                    } else {
1149                        parsePlayResponse(response);
1150
1151                        // Post new timeout in order to make sure to use
1152                        // fake timestamps if no new Sender Reports arrive
1153                        sp<AMessage> timeout = new AMessage('tiou', this);
1154                        mCheckTimeoutGeneration++;
1155                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1156                        timeout->post(kStartupTimeoutUs);
1157                    }
1158                }
1159
1160                if (result != OK) {
1161                    ALOGE("resume failed, aborting.");
1162                    (new AMessage('abor', this))->post();
1163                }
1164
1165                mPausing = false;
1166                break;
1167            }
1168
1169            case 'seek':
1170            {
1171                if (!mSeekable) {
1172                    ALOGW("This is a live stream, ignoring seek request.");
1173
1174                    sp<AMessage> msg = mNotify->dup();
1175                    msg->setInt32("what", kWhatSeekDone);
1176                    msg->post();
1177                    break;
1178                }
1179
1180                int64_t timeUs;
1181                CHECK(msg->findInt64("time", &timeUs));
1182
1183                mSeekPending = true;
1184
1185                // Disable the access unit timeout until we resumed
1186                // playback again.
1187                mCheckPending = true;
1188                ++mCheckGeneration;
1189
1190                sp<AMessage> reply = new AMessage('see0', this);
1191                reply->setInt64("time", timeUs);
1192
1193                if (mPausing) {
1194                    // PAUSE already sent
1195                    ALOGI("Pause already sent");
1196                    reply->post();
1197                    break;
1198                }
1199                AString request = "PAUSE ";
1200                request.append(mControlURL);
1201                request.append(" RTSP/1.0\r\n");
1202
1203                request.append("Session: ");
1204                request.append(mSessionID);
1205                request.append("\r\n");
1206
1207                request.append("\r\n");
1208
1209                mConn->sendRequest(request.c_str(), reply);
1210                break;
1211            }
1212
1213            case 'see0':
1214            {
1215                // Session is paused now.
1216                status_t err = OK;
1217                msg->findInt32("result", &err);
1218
1219                int64_t timeUs;
1220                CHECK(msg->findInt64("time", &timeUs));
1221
1222                sp<AMessage> notify = mNotify->dup();
1223                notify->setInt32("what", kWhatSeekPaused);
1224                notify->setInt32("err", err);
1225                notify->setInt64("time", timeUs);
1226                notify->post();
1227                break;
1228
1229            }
1230
1231            case 'see1':
1232            {
1233                for (size_t i = 0; i < mTracks.size(); ++i) {
1234                    TrackInfo *info = &mTracks.editItemAt(i);
1235
1236                    postQueueSeekDiscontinuity(i);
1237                    info->mEOSReceived = false;
1238
1239                    info->mRTPAnchor = 0;
1240                    info->mNTPAnchorUs = -1;
1241                }
1242
1243                mAllTracksHaveTime = false;
1244                mNTPAnchorUs = -1;
1245
1246                // Start new timeoutgeneration to avoid getting timeout
1247                // before PLAY response arrive
1248                sp<AMessage> timeout = new AMessage('tiou', this);
1249                mCheckTimeoutGeneration++;
1250                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1251                timeout->post(kStartupTimeoutUs);
1252
1253                int64_t timeUs;
1254                CHECK(msg->findInt64("time", &timeUs));
1255
1256                AString request = "PLAY ";
1257                request.append(mControlURL);
1258                request.append(" RTSP/1.0\r\n");
1259
1260                request.append("Session: ");
1261                request.append(mSessionID);
1262                request.append("\r\n");
1263
1264                request.append(
1265                        AStringPrintf(
1266                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1267
1268                request.append("\r\n");
1269
1270                sp<AMessage> reply = new AMessage('see2', this);
1271                mConn->sendRequest(request.c_str(), reply);
1272                break;
1273            }
1274
1275            case 'see2':
1276            {
1277                if (mTracks.size() == 0) {
1278                    // We have already hit abor, break
1279                    break;
1280                }
1281
1282                int32_t result;
1283                CHECK(msg->findInt32("result", &result));
1284
1285                ALOGI("PLAY completed with result %d (%s)",
1286                     result, strerror(-result));
1287
1288                mCheckPending = false;
1289                postAccessUnitTimeoutCheck();
1290
1291                if (result == OK) {
1292                    sp<RefBase> obj;
1293                    CHECK(msg->findObject("response", &obj));
1294                    sp<ARTSPResponse> response =
1295                        static_cast<ARTSPResponse *>(obj.get());
1296
1297                    if (response->mStatusCode != 200) {
1298                        result = UNKNOWN_ERROR;
1299                    } else {
1300                        parsePlayResponse(response);
1301
1302                        // Post new timeout in order to make sure to use
1303                        // fake timestamps if no new Sender Reports arrive
1304                        sp<AMessage> timeout = new AMessage('tiou', this);
1305                        mCheckTimeoutGeneration++;
1306                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1307                        timeout->post(kStartupTimeoutUs);
1308
1309                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1310                        CHECK_GE(i, 0);
1311
1312                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1313
1314                        ALOGI("seek completed.");
1315                    }
1316                }
1317
1318                if (result != OK) {
1319                    ALOGE("seek failed, aborting.");
1320                    (new AMessage('abor', this))->post();
1321                }
1322
1323                mPausing = false;
1324                mSeekPending = false;
1325
1326                sp<AMessage> msg = mNotify->dup();
1327                msg->setInt32("what", kWhatSeekDone);
1328                msg->post();
1329                break;
1330            }
1331
1332            case 'biny':
1333            {
1334                sp<ABuffer> buffer;
1335                CHECK(msg->findBuffer("buffer", &buffer));
1336
1337                int32_t index;
1338                CHECK(buffer->meta()->findInt32("index", &index));
1339
1340                mRTPConn->injectPacket(index, buffer);
1341                break;
1342            }
1343
1344            case 'tiou':
1345            {
1346                int32_t timeoutGenerationCheck;
1347                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1348                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1349                    // This is an outdated message. Ignore.
1350                    // This typically happens if a lot of seeks are
1351                    // performed, since new timeout messages now are
1352                    // posted at seek as well.
1353                    break;
1354                }
1355                if (!mReceivedFirstRTCPPacket) {
1356                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1357                        ALOGW("We received RTP packets but no RTCP packets, "
1358                             "using fake timestamps.");
1359
1360                        mTryFakeRTCP = true;
1361
1362                        mReceivedFirstRTCPPacket = true;
1363
1364                        fakeTimestamps();
1365                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1366                        ALOGW("Never received any data, switching transports.");
1367
1368                        mTryTCPInterleaving = true;
1369
1370                        sp<AMessage> msg = new AMessage('abor', this);
1371                        msg->setInt32("reconnect", true);
1372                        msg->post();
1373                    } else {
1374                        ALOGW("Never received any data, disconnecting.");
1375                        (new AMessage('abor', this))->post();
1376                    }
1377                } else {
1378                    if (!mAllTracksHaveTime) {
1379                        ALOGW("We received some RTCP packets, but time "
1380                              "could not be established on all tracks, now "
1381                              "using fake timestamps");
1382
1383                        fakeTimestamps();
1384                    }
1385                }
1386                break;
1387            }
1388
1389            default:
1390                TRESPASS();
1391                break;
1392        }
1393    }
1394
1395    void postKeepAlive() {
1396        sp<AMessage> msg = new AMessage('aliv', this);
1397        msg->setInt32("generation", mKeepAliveGeneration);
1398        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1399    }
1400
1401    void postAccessUnitTimeoutCheck() {
1402        if (mCheckPending) {
1403            return;
1404        }
1405
1406        mCheckPending = true;
1407        sp<AMessage> check = new AMessage('chek', this);
1408        check->setInt32("generation", mCheckGeneration);
1409        check->post(kAccessUnitTimeoutUs);
1410    }
1411
1412    static void SplitString(
1413            const AString &s, const char *separator, List<AString> *items) {
1414        items->clear();
1415        size_t start = 0;
1416        while (start < s.size()) {
1417            ssize_t offset = s.find(separator, start);
1418
1419            if (offset < 0) {
1420                items->push_back(AString(s, start, s.size() - start));
1421                break;
1422            }
1423
1424            items->push_back(AString(s, start, offset - start));
1425            start = offset + strlen(separator);
1426        }
1427    }
1428
1429    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1430        mPlayResponseParsed = true;
1431        if (mTracks.size() == 0) {
1432            ALOGV("parsePlayResponse: late packets ignored.");
1433            return;
1434        }
1435
1436        ssize_t i = response->mHeaders.indexOfKey("range");
1437        if (i < 0) {
1438            // Server doesn't even tell use what range it is going to
1439            // play, therefore we won't support seeking.
1440            return;
1441        }
1442
1443        AString range = response->mHeaders.valueAt(i);
1444        ALOGV("Range: %s", range.c_str());
1445
1446        AString val;
1447        CHECK(GetAttribute(range.c_str(), "npt", &val));
1448
1449        float npt1, npt2;
1450        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1451            // This is a live stream and therefore not seekable.
1452
1453            ALOGI("This is a live stream");
1454            return;
1455        }
1456
1457        i = response->mHeaders.indexOfKey("rtp-info");
1458        CHECK_GE(i, 0);
1459
1460        AString rtpInfo = response->mHeaders.valueAt(i);
1461        List<AString> streamInfos;
1462        SplitString(rtpInfo, ",", &streamInfos);
1463
1464        int n = 1;
1465        for (List<AString>::iterator it = streamInfos.begin();
1466             it != streamInfos.end(); ++it) {
1467            (*it).trim();
1468            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1469
1470            CHECK(GetAttribute((*it).c_str(), "url", &val));
1471
1472            size_t trackIndex = 0;
1473            while (trackIndex < mTracks.size()
1474                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1475                ++trackIndex;
1476            }
1477            CHECK_LT(trackIndex, mTracks.size());
1478
1479            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1480
1481            char *end;
1482            unsigned long seq = strtoul(val.c_str(), &end, 10);
1483
1484            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1485            info->mFirstSeqNumInSegment = seq;
1486            info->mNewSegment = true;
1487
1488            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1489
1490            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1491
1492            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1493
1494            info->mNormalPlayTimeRTP = rtpTime;
1495            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1496
1497            if (!mFirstAccessUnit) {
1498                postNormalPlayTimeMapping(
1499                        trackIndex,
1500                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1501            }
1502
1503            ++n;
1504        }
1505    }
1506
1507    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1508        CHECK_GE(index, 0u);
1509        CHECK_LT(index, mTracks.size());
1510
1511        const TrackInfo &info = mTracks.itemAt(index);
1512
1513        *timeScale = info.mTimeScale;
1514
1515        return info.mPacketSource->getFormat();
1516    }
1517
1518    size_t countTracks() const {
1519        return mTracks.size();
1520    }
1521
1522private:
1523    struct TrackInfo {
1524        AString mURL;
1525        int mRTPSocket;
1526        int mRTCPSocket;
1527        bool mUsingInterleavedTCP;
1528        uint32_t mFirstSeqNumInSegment;
1529        bool mNewSegment;
1530
1531        uint32_t mRTPAnchor;
1532        int64_t mNTPAnchorUs;
1533        int32_t mTimeScale;
1534        bool mEOSReceived;
1535
1536        uint32_t mNormalPlayTimeRTP;
1537        int64_t mNormalPlayTimeUs;
1538
1539        sp<APacketSource> mPacketSource;
1540
1541        // Stores packets temporarily while no notion of time
1542        // has been established yet.
1543        List<sp<ABuffer> > mPackets;
1544    };
1545
1546    sp<AMessage> mNotify;
1547    bool mUIDValid;
1548    uid_t mUID;
1549    sp<ALooper> mNetLooper;
1550    sp<ARTSPConnection> mConn;
1551    sp<ARTPConnection> mRTPConn;
1552    sp<ASessionDescription> mSessionDesc;
1553    AString mOriginalSessionURL;  // This one still has user:pass@
1554    AString mSessionURL;
1555    AString mSessionHost;
1556    AString mBaseURL;
1557    AString mControlURL;
1558    AString mSessionID;
1559    bool mSetupTracksSuccessful;
1560    bool mSeekPending;
1561    bool mFirstAccessUnit;
1562
1563    bool mAllTracksHaveTime;
1564    int64_t mNTPAnchorUs;
1565    int64_t mMediaAnchorUs;
1566    int64_t mLastMediaTimeUs;
1567
1568    int64_t mNumAccessUnitsReceived;
1569    bool mCheckPending;
1570    int32_t mCheckGeneration;
1571    int32_t mCheckTimeoutGeneration;
1572    bool mTryTCPInterleaving;
1573    bool mTryFakeRTCP;
1574    bool mReceivedFirstRTCPPacket;
1575    bool mReceivedFirstRTPPacket;
1576    bool mSeekable;
1577    int64_t mKeepAliveTimeoutUs;
1578    int32_t mKeepAliveGeneration;
1579    bool mPausing;
1580    int32_t mPauseGeneration;
1581
1582    Vector<TrackInfo> mTracks;
1583
1584    bool mPlayResponseParsed;
1585
1586    void setupTrack(size_t index) {
1587        sp<APacketSource> source =
1588            new APacketSource(mSessionDesc, index);
1589
1590        if (source->initCheck() != OK) {
1591            ALOGW("Unsupported format. Ignoring track #%zu.", index);
1592
1593            sp<AMessage> reply = new AMessage('setu', this);
1594            reply->setSize("index", index);
1595            reply->setInt32("result", ERROR_UNSUPPORTED);
1596            reply->post();
1597            return;
1598        }
1599
1600        AString url;
1601        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1602
1603        AString trackURL;
1604        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1605
1606        mTracks.push(TrackInfo());
1607        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1608        info->mURL = trackURL;
1609        info->mPacketSource = source;
1610        info->mUsingInterleavedTCP = false;
1611        info->mFirstSeqNumInSegment = 0;
1612        info->mNewSegment = true;
1613        info->mRTPSocket = -1;
1614        info->mRTCPSocket = -1;
1615        info->mRTPAnchor = 0;
1616        info->mNTPAnchorUs = -1;
1617        info->mNormalPlayTimeRTP = 0;
1618        info->mNormalPlayTimeUs = 0ll;
1619
1620        unsigned long PT;
1621        AString formatDesc;
1622        AString formatParams;
1623        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1624
1625        int32_t timescale;
1626        int32_t numChannels;
1627        ASessionDescription::ParseFormatDesc(
1628                formatDesc.c_str(), &timescale, &numChannels);
1629
1630        info->mTimeScale = timescale;
1631        info->mEOSReceived = false;
1632
1633        ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1634
1635        AString request = "SETUP ";
1636        request.append(trackURL);
1637        request.append(" RTSP/1.0\r\n");
1638
1639        if (mTryTCPInterleaving) {
1640            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1641            info->mUsingInterleavedTCP = true;
1642            info->mRTPSocket = interleaveIndex;
1643            info->mRTCPSocket = interleaveIndex + 1;
1644
1645            request.append("Transport: RTP/AVP/TCP;interleaved=");
1646            request.append(interleaveIndex);
1647            request.append("-");
1648            request.append(interleaveIndex + 1);
1649        } else {
1650            unsigned rtpPort;
1651            ARTPConnection::MakePortPair(
1652                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1653
1654            if (mUIDValid) {
1655                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1656                                                (uint32_t)*(uint32_t*) "RTP_");
1657                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1658                                                (uint32_t)*(uint32_t*) "RTP_");
1659                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1660                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1661            }
1662
1663            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1664            request.append(rtpPort);
1665            request.append("-");
1666            request.append(rtpPort + 1);
1667        }
1668
1669        request.append("\r\n");
1670
1671        if (index > 1) {
1672            request.append("Session: ");
1673            request.append(mSessionID);
1674            request.append("\r\n");
1675        }
1676
1677        request.append("\r\n");
1678
1679        sp<AMessage> reply = new AMessage('setu', this);
1680        reply->setSize("index", index);
1681        reply->setSize("track-index", mTracks.size() - 1);
1682        mConn->sendRequest(request.c_str(), reply);
1683    }
1684
1685    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1686        out->clear();
1687
1688        if (strncasecmp("rtsp://", baseURL, 7)) {
1689            // Base URL must be absolute
1690            return false;
1691        }
1692
1693        if (!strncasecmp("rtsp://", url, 7)) {
1694            // "url" is already an absolute URL, ignore base URL.
1695            out->setTo(url);
1696            return true;
1697        }
1698
1699        size_t n = strlen(baseURL);
1700        out->setTo(baseURL);
1701        if (baseURL[n - 1] != '/') {
1702            out->append("/");
1703        }
1704        out->append(url);
1705
1706        return true;
1707    }
1708
1709    void fakeTimestamps() {
1710        mNTPAnchorUs = -1ll;
1711        for (size_t i = 0; i < mTracks.size(); ++i) {
1712            onTimeUpdate(i, 0, 0ll);
1713        }
1714    }
1715
1716    bool dataReceivedOnAllChannels() {
1717        TrackInfo *track;
1718        for (size_t i = 0; i < mTracks.size(); ++i) {
1719            track = &mTracks.editItemAt(i);
1720            if (track->mPackets.empty()) {
1721                return false;
1722            }
1723        }
1724        return true;
1725    }
1726
1727    void handleFirstAccessUnit() {
1728        if (mFirstAccessUnit) {
1729            sp<AMessage> msg = mNotify->dup();
1730            msg->setInt32("what", kWhatConnected);
1731            msg->post();
1732
1733            if (mSeekable) {
1734                for (size_t i = 0; i < mTracks.size(); ++i) {
1735                    TrackInfo *info = &mTracks.editItemAt(i);
1736
1737                    postNormalPlayTimeMapping(
1738                            i,
1739                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1740                }
1741            }
1742
1743            mFirstAccessUnit = false;
1744        }
1745    }
1746
1747    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1748        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1749             trackIndex, rtpTime, (long long)ntpTime);
1750
1751        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1752
1753        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1754
1755        track->mRTPAnchor = rtpTime;
1756        track->mNTPAnchorUs = ntpTimeUs;
1757
1758        if (mNTPAnchorUs < 0) {
1759            mNTPAnchorUs = ntpTimeUs;
1760            mMediaAnchorUs = mLastMediaTimeUs;
1761        }
1762
1763        if (!mAllTracksHaveTime) {
1764            bool allTracksHaveTime = (mTracks.size() > 0);
1765            for (size_t i = 0; i < mTracks.size(); ++i) {
1766                TrackInfo *track = &mTracks.editItemAt(i);
1767                if (track->mNTPAnchorUs < 0) {
1768                    allTracksHaveTime = false;
1769                    break;
1770                }
1771            }
1772            if (allTracksHaveTime) {
1773                mAllTracksHaveTime = true;
1774                ALOGI("Time now established for all tracks.");
1775            }
1776        }
1777        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1778            handleFirstAccessUnit();
1779
1780            // Time is now established, lets start timestamping immediately
1781            for (size_t i = 0; i < mTracks.size(); ++i) {
1782                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1783                while (!trackInfo->mPackets.empty()) {
1784                    sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1785                    trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1786
1787                    if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1788                        postQueueAccessUnit(i, accessUnit);
1789                    }
1790                }
1791            }
1792            for (size_t i = 0; i < mTracks.size(); ++i) {
1793                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1794                if (trackInfo->mEOSReceived) {
1795                    postQueueEOS(i, ERROR_END_OF_STREAM);
1796                    trackInfo->mEOSReceived = false;
1797                }
1798            }
1799        }
1800    }
1801
1802    void onAccessUnitComplete(
1803            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1804        ALOGV("onAccessUnitComplete track %d", trackIndex);
1805
1806        if(!mPlayResponseParsed){
1807            ALOGI("play response is not parsed, storing accessunit");
1808            TrackInfo *track = &mTracks.editItemAt(trackIndex);
1809            track->mPackets.push_back(accessUnit);
1810            return;
1811        }
1812
1813        handleFirstAccessUnit();
1814
1815        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1816
1817        if (!mAllTracksHaveTime) {
1818            ALOGV("storing accessUnit, no time established yet");
1819            track->mPackets.push_back(accessUnit);
1820            return;
1821        }
1822
1823        while (!track->mPackets.empty()) {
1824            sp<ABuffer> accessUnit = *track->mPackets.begin();
1825            track->mPackets.erase(track->mPackets.begin());
1826
1827            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1828                postQueueAccessUnit(trackIndex, accessUnit);
1829            }
1830        }
1831
1832        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1833            postQueueAccessUnit(trackIndex, accessUnit);
1834        }
1835
1836        if (track->mEOSReceived) {
1837            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1838            track->mEOSReceived = false;
1839        }
1840    }
1841
1842    bool addMediaTimestamp(
1843            int32_t trackIndex, const TrackInfo *track,
1844            const sp<ABuffer> &accessUnit) {
1845        UNUSED_UNLESS_VERBOSE(trackIndex);
1846
1847        uint32_t rtpTime;
1848        CHECK(accessUnit->meta()->findInt32(
1849                    "rtp-time", (int32_t *)&rtpTime));
1850
1851        int64_t relRtpTimeUs =
1852            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1853                / track->mTimeScale;
1854
1855        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1856
1857        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1858
1859        if (mediaTimeUs > mLastMediaTimeUs) {
1860            mLastMediaTimeUs = mediaTimeUs;
1861        }
1862
1863        if (mediaTimeUs < 0) {
1864            ALOGV("dropping early accessUnit.");
1865            return false;
1866        }
1867
1868        ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1869             trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1870
1871        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1872
1873        return true;
1874    }
1875
1876    void postQueueAccessUnit(
1877            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1878        sp<AMessage> msg = mNotify->dup();
1879        msg->setInt32("what", kWhatAccessUnit);
1880        msg->setSize("trackIndex", trackIndex);
1881        msg->setBuffer("accessUnit", accessUnit);
1882        msg->post();
1883    }
1884
1885    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1886        sp<AMessage> msg = mNotify->dup();
1887        msg->setInt32("what", kWhatEOS);
1888        msg->setSize("trackIndex", trackIndex);
1889        msg->setInt32("finalResult", finalResult);
1890        msg->post();
1891    }
1892
1893    void postQueueSeekDiscontinuity(size_t trackIndex) {
1894        sp<AMessage> msg = mNotify->dup();
1895        msg->setInt32("what", kWhatSeekDiscontinuity);
1896        msg->setSize("trackIndex", trackIndex);
1897        msg->post();
1898    }
1899
1900    void postNormalPlayTimeMapping(
1901            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1902        sp<AMessage> msg = mNotify->dup();
1903        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1904        msg->setSize("trackIndex", trackIndex);
1905        msg->setInt32("rtpTime", rtpTime);
1906        msg->setInt64("nptUs", nptUs);
1907        msg->post();
1908    }
1909
1910    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1911};
1912
1913}  // namespace android
1914
1915#endif  // MY_HANDLER_H_
1916