1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22
23#ifndef LOG_TAG
24#define LOG_TAG "MyHandler"
25#endif
26
27#include <utils/Log.h>
28#include <cutils/properties.h> // for property_get
29
30#include "APacketSource.h"
31#include "ARTPConnection.h"
32#include "ARTSPConnection.h"
33#include "ASessionDescription.h"
34
35#include <ctype.h>
36
37#include <media/stagefright/foundation/ABuffer.h>
38#include <media/stagefright/foundation/ADebug.h>
39#include <media/stagefright/foundation/ALooper.h>
40#include <media/stagefright/foundation/AMessage.h>
41#include <media/stagefright/MediaErrors.h>
42#include <media/stagefright/Utils.h>
43
44#include <arpa/inet.h>
45#include <sys/socket.h>
46#include <netdb.h>
47
48#include "HTTPBase.h"
49
50#if LOG_NDEBUG
51#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
52#else
53#define UNUSED_UNLESS_VERBOSE(x)
54#endif
55
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10000000ll;

// Keep-alive timeout (60 secs, in microseconds) used until a SETUP response
// supplies a usable "timeout" attribute; also the fallback when that
// attribute is malformed or shorter than 15 secs.
static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// 3 secs, in microseconds.
// NOTE(review): presumably the delay before a requested pause is acted
// upon -- confirm against the code that handles 'paus'.
static int64_t kPauseDelayUs = 3000000ll;

// The allowed maximum number of stale access units at the beginning of
// a new sequence.
static int32_t kMaxAllowedStaleAccessUnits = 20;
71
72namespace android {
73
74static bool GetAttribute(const char *s, const char *key, AString *value) {
75    value->clear();
76
77    size_t keyLen = strlen(key);
78
79    for (;;) {
80        while (isspace(*s)) {
81            ++s;
82        }
83
84        const char *colonPos = strchr(s, ';');
85
86        size_t len =
87            (colonPos == NULL) ? strlen(s) : colonPos - s;
88
89        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
90            value->setTo(&s[keyLen + 1], len - keyLen - 1);
91            return true;
92        }
93
94        if (colonPos == NULL) {
95            return false;
96        }
97
98        s = colonPos + 1;
99    }
100}
101
102struct MyHandler : public AHandler {
    // Four-character event codes.
    // NOTE(review): presumably used as the what() of the messages posted
    // through mNotify -- confirm against the code that sends them.
    enum {
        kWhatConnected                  = 'conn',
        kWhatDisconnected               = 'disc',
        kWhatSeekPaused                 = 'spau',
        kWhatSeekDone                   = 'sdon',

        kWhatAccessUnit                 = 'accU',
        kWhatEOS                        = 'eos!',
        kWhatSeekDiscontinuity          = 'seeD',
        kWhatNormalPlayTimeMapping      = 'nptM',
    };
114
115    MyHandler(
116            const char *url,
117            const sp<AMessage> &notify,
118            bool uidValid = false, uid_t uid = 0)
119        : mNotify(notify),
120          mUIDValid(uidValid),
121          mUID(uid),
122          mNetLooper(new ALooper),
123          mConn(new ARTSPConnection(mUIDValid, mUID)),
124          mRTPConn(new ARTPConnection),
125          mOriginalSessionURL(url),
126          mSessionURL(url),
127          mSetupTracksSuccessful(false),
128          mSeekPending(false),
129          mFirstAccessUnit(true),
130          mAllTracksHaveTime(false),
131          mNTPAnchorUs(-1),
132          mMediaAnchorUs(-1),
133          mLastMediaTimeUs(0),
134          mNumAccessUnitsReceived(0),
135          mCheckPending(false),
136          mCheckGeneration(0),
137          mCheckTimeoutGeneration(0),
138          mTryTCPInterleaving(false),
139          mTryFakeRTCP(false),
140          mReceivedFirstRTCPPacket(false),
141          mReceivedFirstRTPPacket(false),
142          mSeekable(true),
143          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
144          mKeepAliveGeneration(0),
145          mPausing(false),
146          mPauseGeneration(0),
147          mPlayResponseParsed(false) {
148        mNetLooper->setName("rtsp net");
149        mNetLooper->start(false /* runOnCallingThread */,
150                          false /* canCallJava */,
151                          PRIORITY_HIGHEST);
152
153        // Strip any authentication info from the session url, we don't
154        // want to transmit user/pass in cleartext.
155        AString host, path, user, pass;
156        unsigned port;
157        CHECK(ARTSPConnection::ParseURL(
158                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
159
160        if (user.size() > 0) {
161            mSessionURL.clear();
162            mSessionURL.append("rtsp://");
163            mSessionURL.append(host);
164            mSessionURL.append(":");
165            mSessionURL.append(AStringPrintf("%u", port));
166            mSessionURL.append(path);
167
168            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
169        }
170
171        mSessionHost = host;
172    }
173
174    void connect() {
175        looper()->registerHandler(mConn);
176        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
177
178        sp<AMessage> notify = new AMessage('biny', this);
179        mConn->observeBinaryData(notify);
180
181        sp<AMessage> reply = new AMessage('conn', this);
182        mConn->connect(mOriginalSessionURL.c_str(), reply);
183    }
184
185    void loadSDP(const sp<ASessionDescription>& desc) {
186        looper()->registerHandler(mConn);
187        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
188
189        sp<AMessage> notify = new AMessage('biny', this);
190        mConn->observeBinaryData(notify);
191
192        sp<AMessage> reply = new AMessage('sdpl', this);
193        reply->setObject("description", desc);
194        mConn->connect(mOriginalSessionURL.c_str(), reply);
195    }
196
197    AString getControlURL() {
198        AString sessionLevelControlURL;
199        if (mSessionDesc->findAttribute(
200                0,
201                "a=control",
202                &sessionLevelControlURL)) {
203            if (sessionLevelControlURL.compare("*") == 0) {
204                return mBaseURL;
205            } else {
206                AString controlURL;
207                CHECK(MakeURL(
208                        mBaseURL.c_str(),
209                        sessionLevelControlURL.c_str(),
210                        &controlURL));
211                return controlURL;
212            }
213        } else {
214            return mSessionURL;
215        }
216    }
217
218    void disconnect() {
219        (new AMessage('abor', this))->post();
220    }
221
222    void seek(int64_t timeUs) {
223        sp<AMessage> msg = new AMessage('seek', this);
224        msg->setInt64("time", timeUs);
225        mPauseGeneration++;
226        msg->post();
227    }
228
229    void continueSeekAfterPause(int64_t timeUs) {
230        sp<AMessage> msg = new AMessage('see1', this);
231        msg->setInt64("time", timeUs);
232        msg->post();
233    }
234
    // Returns mSeekable. It starts out true and is set to
    // !isLiveStream(mSessionDesc) once a session description is available.
    bool isSeekable() const {
        return mSeekable;
    }
238
239    void pause() {
240        sp<AMessage> msg = new AMessage('paus', this);
241        mPauseGeneration++;
242        msg->setInt32("pausecheck", mPauseGeneration);
243        msg->post();
244    }
245
246    void resume() {
247        sp<AMessage> msg = new AMessage('resu', this);
248        mPauseGeneration++;
249        msg->post();
250    }
251
252    static void addRR(const sp<ABuffer> &buf) {
253        uint8_t *ptr = buf->data() + buf->size();
254        ptr[0] = 0x80 | 0;
255        ptr[1] = 201;  // RR
256        ptr[2] = 0;
257        ptr[3] = 1;
258        ptr[4] = 0xde;  // SSRC
259        ptr[5] = 0xad;
260        ptr[6] = 0xbe;
261        ptr[7] = 0xef;
262
263        buf->setRange(0, buf->size() + 8);
264    }
265
266    static void addSDES(int s, const sp<ABuffer> &buffer) {
267        struct sockaddr_in addr;
268        socklen_t addrSize = sizeof(addr);
269        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
270            inet_aton("0.0.0.0", &(addr.sin_addr));
271        }
272
273        uint8_t *data = buffer->data() + buffer->size();
274        data[0] = 0x80 | 1;
275        data[1] = 202;  // SDES
276        data[4] = 0xde;  // SSRC
277        data[5] = 0xad;
278        data[6] = 0xbe;
279        data[7] = 0xef;
280
281        size_t offset = 8;
282
283        data[offset++] = 1;  // CNAME
284
285        AString cname = "stagefright@";
286        cname.append(inet_ntoa(addr.sin_addr));
287        data[offset++] = cname.size();
288
289        memcpy(&data[offset], cname.c_str(), cname.size());
290        offset += cname.size();
291
292        data[offset++] = 6;  // TOOL
293
294        AString tool = MakeUserAgent();
295
296        data[offset++] = tool.size();
297
298        memcpy(&data[offset], tool.c_str(), tool.size());
299        offset += tool.size();
300
301        data[offset++] = 0;
302
303        if ((offset % 4) > 0) {
304            size_t count = 4 - (offset % 4);
305            switch (count) {
306                case 3:
307                    data[offset++] = 0;
308                case 2:
309                    data[offset++] = 0;
310                case 1:
311                    data[offset++] = 0;
312            }
313        }
314
315        size_t numWords = (offset / 4) - 1;
316        data[2] = numWords >> 8;
317        data[3] = numWords & 0xff;
318
319        buffer->setRange(buffer->offset(), buffer->size() + offset);
320    }
321
322    // In case we're behind NAT, fire off two UDP packets to the remote
323    // rtp/rtcp ports to poke a hole into the firewall for future incoming
324    // packets. We're going to send an RR/SDES RTCP packet to both of them.
325    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
326        struct sockaddr_in addr;
327        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
328        addr.sin_family = AF_INET;
329
330        AString source;
331        AString server_port;
332        if (!GetAttribute(transport.c_str(),
333                          "source",
334                          &source)) {
335            ALOGW("Missing 'source' field in Transport response. Using "
336                 "RTSP endpoint address.");
337
338            struct hostent *ent = gethostbyname(mSessionHost.c_str());
339            if (ent == NULL) {
340                ALOGE("Failed to look up address of session host '%s'",
341                     mSessionHost.c_str());
342
343                return false;
344            }
345
346            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
347        } else {
348            addr.sin_addr.s_addr = inet_addr(source.c_str());
349        }
350
351        if (!GetAttribute(transport.c_str(),
352                                 "server_port",
353                                 &server_port)) {
354            ALOGI("Missing 'server_port' field in Transport response.");
355            return false;
356        }
357
358        int rtpPort, rtcpPort;
359        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
360                || rtpPort <= 0 || rtpPort > 65535
361                || rtcpPort <=0 || rtcpPort > 65535
362                || rtcpPort != rtpPort + 1) {
363            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
364                 " RTP port must be even, RTCP port must be one higher.",
365                 server_port.c_str());
366
367            return false;
368        }
369
370        if (rtpPort & 1) {
371            ALOGW("Server picked an odd RTP port, it should've picked an "
372                 "even one, we'll let it pass for now, but this may break "
373                 "in the future.");
374        }
375
376        if (addr.sin_addr.s_addr == INADDR_NONE) {
377            return true;
378        }
379
380        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
381            // No firewalls to traverse on the loopback interface.
382            return true;
383        }
384
385        // Make up an RR/SDES RTCP packet.
386        sp<ABuffer> buf = new ABuffer(65536);
387        buf->setRange(0, 0);
388        addRR(buf);
389        addSDES(rtpSocket, buf);
390
391        addr.sin_port = htons(rtpPort);
392
393        ssize_t n = sendto(
394                rtpSocket, buf->data(), buf->size(), 0,
395                (const sockaddr *)&addr, sizeof(addr));
396
397        if (n < (ssize_t)buf->size()) {
398            ALOGE("failed to poke a hole for RTP packets");
399            return false;
400        }
401
402        addr.sin_port = htons(rtcpPort);
403
404        n = sendto(
405                rtcpSocket, buf->data(), buf->size(), 0,
406                (const sockaddr *)&addr, sizeof(addr));
407
408        if (n < (ssize_t)buf->size()) {
409            ALOGE("failed to poke a hole for RTCP packets");
410            return false;
411        }
412
413        ALOGV("successfully poked holes.");
414
415        return true;
416    }
417
418    static bool isLiveStream(const sp<ASessionDescription> &desc) {
419        AString attrLiveStream;
420        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
421            ssize_t semicolonPos = attrLiveStream.find(";", 2);
422
423            const char* liveStreamValue;
424            if (semicolonPos < 0) {
425                liveStreamValue = attrLiveStream.c_str();
426            } else {
427                AString valString;
428                valString.setTo(attrLiveStream,
429                        semicolonPos + 1,
430                        attrLiveStream.size() - semicolonPos - 1);
431                liveStreamValue = valString.c_str();
432            }
433
434            uint32_t value = strtoul(liveStreamValue, NULL, 10);
435            if (value == 1) {
436                ALOGV("found live stream");
437                return true;
438            }
439        } else {
440            // It is a live stream if no duration is returned
441            int64_t durationUs;
442            if (!desc->getDurationUs(&durationUs)) {
443                ALOGV("No duration found, assume live stream");
444                return true;
445            }
446        }
447
448        return false;
449    }
450
451    virtual void onMessageReceived(const sp<AMessage> &msg) {
452        switch (msg->what()) {
453            case 'conn':
454            {
455                int32_t result;
456                CHECK(msg->findInt32("result", &result));
457
458                ALOGI("connection request completed with result %d (%s)",
459                     result, strerror(-result));
460
461                if (result == OK) {
462                    AString request;
463                    request = "DESCRIBE ";
464                    request.append(mSessionURL);
465                    request.append(" RTSP/1.0\r\n");
466                    request.append("Accept: application/sdp\r\n");
467                    request.append("\r\n");
468
469                    sp<AMessage> reply = new AMessage('desc', this);
470                    mConn->sendRequest(request.c_str(), reply);
471                } else {
472                    (new AMessage('disc', this))->post();
473                }
474                break;
475            }
476
477            case 'disc':
478            {
479                ++mKeepAliveGeneration;
480
481                int32_t reconnect;
482                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
483                    sp<AMessage> reply = new AMessage('conn', this);
484                    mConn->connect(mOriginalSessionURL.c_str(), reply);
485                } else {
486                    (new AMessage('quit', this))->post();
487                }
488                break;
489            }
490
491            case 'desc':
492            {
493                int32_t result;
494                CHECK(msg->findInt32("result", &result));
495
496                ALOGI("DESCRIBE completed with result %d (%s)",
497                     result, strerror(-result));
498
499                if (result == OK) {
500                    sp<RefBase> obj;
501                    CHECK(msg->findObject("response", &obj));
502                    sp<ARTSPResponse> response =
503                        static_cast<ARTSPResponse *>(obj.get());
504
505                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
506                        ssize_t i = response->mHeaders.indexOfKey("location");
507                        CHECK_GE(i, 0);
508
509                        mOriginalSessionURL = response->mHeaders.valueAt(i);
510                        mSessionURL = mOriginalSessionURL;
511
512                        // Strip any authentication info from the session url, we don't
513                        // want to transmit user/pass in cleartext.
514                        AString host, path, user, pass;
515                        unsigned port;
516                        if (ARTSPConnection::ParseURL(
517                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
518                                && user.size() > 0) {
519                            mSessionURL.clear();
520                            mSessionURL.append("rtsp://");
521                            mSessionURL.append(host);
522                            mSessionURL.append(":");
523                            mSessionURL.append(AStringPrintf("%u", port));
524                            mSessionURL.append(path);
525
526                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
527                        }
528
529                        sp<AMessage> reply = new AMessage('conn', this);
530                        mConn->connect(mOriginalSessionURL.c_str(), reply);
531                        break;
532                    }
533
534                    if (response->mStatusCode != 200) {
535                        result = UNKNOWN_ERROR;
536                    } else if (response->mContent == NULL) {
537                        result = ERROR_MALFORMED;
538                        ALOGE("The response has no content.");
539                    } else {
540                        mSessionDesc = new ASessionDescription;
541
542                        mSessionDesc->setTo(
543                                response->mContent->data(),
544                                response->mContent->size());
545
546                        if (!mSessionDesc->isValid()) {
547                            ALOGE("Failed to parse session description.");
548                            result = ERROR_MALFORMED;
549                        } else {
550                            ssize_t i = response->mHeaders.indexOfKey("content-base");
551                            if (i >= 0) {
552                                mBaseURL = response->mHeaders.valueAt(i);
553                            } else {
554                                i = response->mHeaders.indexOfKey("content-location");
555                                if (i >= 0) {
556                                    mBaseURL = response->mHeaders.valueAt(i);
557                                } else {
558                                    mBaseURL = mSessionURL;
559                                }
560                            }
561
562                            mSeekable = !isLiveStream(mSessionDesc);
563
564                            if (!mBaseURL.startsWith("rtsp://")) {
565                                // Some misbehaving servers specify a relative
566                                // URL in one of the locations above, combine
567                                // it with the absolute session URL to get
568                                // something usable...
569
570                                ALOGW("Server specified a non-absolute base URL"
571                                     ", combining it with the session URL to "
572                                     "get something usable...");
573
574                                AString tmp;
575                                CHECK(MakeURL(
576                                            mSessionURL.c_str(),
577                                            mBaseURL.c_str(),
578                                            &tmp));
579
580                                mBaseURL = tmp;
581                            }
582
583                            mControlURL = getControlURL();
584
585                            if (mSessionDesc->countTracks() < 2) {
586                                // There's no actual tracks in this session.
587                                // The first "track" is merely session meta
588                                // data.
589
590                                ALOGW("Session doesn't contain any playable "
591                                     "tracks. Aborting.");
592                                result = ERROR_UNSUPPORTED;
593                            } else {
594                                setupTrack(1);
595                            }
596                        }
597                    }
598                }
599
600                if (result != OK) {
601                    sp<AMessage> reply = new AMessage('disc', this);
602                    mConn->disconnect(reply);
603                }
604                break;
605            }
606
607            case 'sdpl':
608            {
609                int32_t result;
610                CHECK(msg->findInt32("result", &result));
611
612                ALOGI("SDP connection request completed with result %d (%s)",
613                     result, strerror(-result));
614
615                if (result == OK) {
616                    sp<RefBase> obj;
617                    CHECK(msg->findObject("description", &obj));
618                    mSessionDesc =
619                        static_cast<ASessionDescription *>(obj.get());
620
621                    if (!mSessionDesc->isValid()) {
622                        ALOGE("Failed to parse session description.");
623                        result = ERROR_MALFORMED;
624                    } else {
625                        mBaseURL = mSessionURL;
626
627                        mSeekable = !isLiveStream(mSessionDesc);
628
629                        mControlURL = getControlURL();
630
631                        if (mSessionDesc->countTracks() < 2) {
632                            // There's no actual tracks in this session.
633                            // The first "track" is merely session meta
634                            // data.
635
636                            ALOGW("Session doesn't contain any playable "
637                                 "tracks. Aborting.");
638                            result = ERROR_UNSUPPORTED;
639                        } else {
640                            setupTrack(1);
641                        }
642                    }
643                }
644
645                if (result != OK) {
646                    sp<AMessage> reply = new AMessage('disc', this);
647                    mConn->disconnect(reply);
648                }
649                break;
650            }
651
652            case 'setu':
653            {
654                size_t index;
655                CHECK(msg->findSize("index", &index));
656
657                TrackInfo *track = NULL;
658                size_t trackIndex;
659                if (msg->findSize("track-index", &trackIndex)) {
660                    track = &mTracks.editItemAt(trackIndex);
661                }
662
663                int32_t result;
664                CHECK(msg->findInt32("result", &result));
665
666                ALOGI("SETUP(%zu) completed with result %d (%s)",
667                     index, result, strerror(-result));
668
669                if (result == OK) {
670                    CHECK(track != NULL);
671
672                    sp<RefBase> obj;
673                    CHECK(msg->findObject("response", &obj));
674                    sp<ARTSPResponse> response =
675                        static_cast<ARTSPResponse *>(obj.get());
676
677                    if (response->mStatusCode != 200) {
678                        result = UNKNOWN_ERROR;
679                    } else {
680                        ssize_t i = response->mHeaders.indexOfKey("session");
681                        CHECK_GE(i, 0);
682
683                        mSessionID = response->mHeaders.valueAt(i);
684
685                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
686                        AString timeoutStr;
687                        if (GetAttribute(
688                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
689                            char *end;
690                            unsigned long timeoutSecs =
691                                strtoul(timeoutStr.c_str(), &end, 10);
692
693                            if (end == timeoutStr.c_str() || *end != '\0') {
694                                ALOGW("server specified malformed timeout '%s'",
695                                     timeoutStr.c_str());
696
697                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
698                            } else if (timeoutSecs < 15) {
699                                ALOGW("server specified too short a timeout "
700                                     "(%lu secs), using default.",
701                                     timeoutSecs);
702
703                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
704                            } else {
705                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
706
707                                ALOGI("server specified timeout of %lu secs.",
708                                     timeoutSecs);
709                            }
710                        }
711
712                        i = mSessionID.find(";");
713                        if (i >= 0) {
714                            // Remove options, i.e. ";timeout=90"
715                            mSessionID.erase(i, mSessionID.size() - i);
716                        }
717
718                        sp<AMessage> notify = new AMessage('accu', this);
719                        notify->setSize("track-index", trackIndex);
720
721                        i = response->mHeaders.indexOfKey("transport");
722                        CHECK_GE(i, 0);
723
724                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
725                            if (!track->mUsingInterleavedTCP) {
726                                AString transport = response->mHeaders.valueAt(i);
727
728                                // We are going to continue even if we were
729                                // unable to poke a hole into the firewall...
730                                pokeAHole(
731                                        track->mRTPSocket,
732                                        track->mRTCPSocket,
733                                        transport);
734                            }
735
736                            mRTPConn->addStream(
737                                    track->mRTPSocket, track->mRTCPSocket,
738                                    mSessionDesc, index,
739                                    notify, track->mUsingInterleavedTCP);
740
741                            mSetupTracksSuccessful = true;
742                        } else {
743                            result = BAD_VALUE;
744                        }
745                    }
746                }
747
748                if (result != OK) {
749                    if (track) {
750                        if (!track->mUsingInterleavedTCP) {
751                            // Clear the tag
752                            if (mUIDValid) {
753                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
754                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
755                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
756                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
757                            }
758
759                            close(track->mRTPSocket);
760                            close(track->mRTCPSocket);
761                        }
762
763                        mTracks.removeItemsAt(trackIndex);
764                    }
765                }
766
767                ++index;
768                if (result == OK && index < mSessionDesc->countTracks()) {
769                    setupTrack(index);
770                } else if (mSetupTracksSuccessful) {
771                    ++mKeepAliveGeneration;
772                    postKeepAlive();
773
774                    AString request = "PLAY ";
775                    request.append(mControlURL);
776                    request.append(" RTSP/1.0\r\n");
777
778                    request.append("Session: ");
779                    request.append(mSessionID);
780                    request.append("\r\n");
781
782                    request.append("\r\n");
783
784                    sp<AMessage> reply = new AMessage('play', this);
785                    mConn->sendRequest(request.c_str(), reply);
786                } else {
787                    sp<AMessage> reply = new AMessage('disc', this);
788                    mConn->disconnect(reply);
789                }
790                break;
791            }
792
793            case 'play':
794            {
795                int32_t result;
796                CHECK(msg->findInt32("result", &result));
797
798                ALOGI("PLAY completed with result %d (%s)",
799                     result, strerror(-result));
800
801                if (result == OK) {
802                    sp<RefBase> obj;
803                    CHECK(msg->findObject("response", &obj));
804                    sp<ARTSPResponse> response =
805                        static_cast<ARTSPResponse *>(obj.get());
806
807                    if (response->mStatusCode != 200) {
808                        result = UNKNOWN_ERROR;
809                    } else {
810                        parsePlayResponse(response);
811                        postTimeout();
812                    }
813                }
814
815                if (result != OK) {
816                    sp<AMessage> reply = new AMessage('disc', this);
817                    mConn->disconnect(reply);
818                }
819
820                break;
821            }
822
823            case 'aliv':
824            {
825                int32_t generation;
826                CHECK(msg->findInt32("generation", &generation));
827
828                if (generation != mKeepAliveGeneration) {
829                    // obsolete event.
830                    break;
831                }
832
833                AString request;
834                request.append("OPTIONS ");
835                request.append(mSessionURL);
836                request.append(" RTSP/1.0\r\n");
837                request.append("Session: ");
838                request.append(mSessionID);
839                request.append("\r\n");
840                request.append("\r\n");
841
842                sp<AMessage> reply = new AMessage('opts', this);
843                reply->setInt32("generation", mKeepAliveGeneration);
844                mConn->sendRequest(request.c_str(), reply);
845                break;
846            }
847
848            case 'opts':
849            {
850                int32_t result;
851                CHECK(msg->findInt32("result", &result));
852
853                ALOGI("OPTIONS completed with result %d (%s)",
854                     result, strerror(-result));
855
856                int32_t generation;
857                CHECK(msg->findInt32("generation", &generation));
858
859                if (generation != mKeepAliveGeneration) {
860                    // obsolete event.
861                    break;
862                }
863
864                postKeepAlive();
865                break;
866            }
867
868            case 'abor':
869            {
870                for (size_t i = 0; i < mTracks.size(); ++i) {
871                    TrackInfo *info = &mTracks.editItemAt(i);
872
873                    if (!mFirstAccessUnit) {
874                        postQueueEOS(i, ERROR_END_OF_STREAM);
875                    }
876
877                    if (!info->mUsingInterleavedTCP) {
878                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
879
880                        // Clear the tag
881                        if (mUIDValid) {
882                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
883                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
884                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
885                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
886                        }
887
888                        close(info->mRTPSocket);
889                        close(info->mRTCPSocket);
890                    }
891                }
892                mTracks.clear();
893                mSetupTracksSuccessful = false;
894                mSeekPending = false;
895                mFirstAccessUnit = true;
896                mAllTracksHaveTime = false;
897                mNTPAnchorUs = -1;
898                mMediaAnchorUs = -1;
899                mNumAccessUnitsReceived = 0;
900                mReceivedFirstRTCPPacket = false;
901                mReceivedFirstRTPPacket = false;
902                mPausing = false;
903                mSeekable = true;
904
905                sp<AMessage> reply = new AMessage('tear', this);
906
907                int32_t reconnect;
908                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
909                    reply->setInt32("reconnect", true);
910                }
911
912                AString request;
913                request = "TEARDOWN ";
914
915                // XXX should use aggregate url from SDP here...
916                request.append(mSessionURL);
917                request.append(" RTSP/1.0\r\n");
918
919                request.append("Session: ");
920                request.append(mSessionID);
921                request.append("\r\n");
922
923                request.append("\r\n");
924
925                mConn->sendRequest(request.c_str(), reply);
926                break;
927            }
928
929            case 'tear':
930            {
931                int32_t result;
932                CHECK(msg->findInt32("result", &result));
933
934                ALOGI("TEARDOWN completed with result %d (%s)",
935                     result, strerror(-result));
936
937                sp<AMessage> reply = new AMessage('disc', this);
938
939                int32_t reconnect;
940                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
941                    reply->setInt32("reconnect", true);
942                }
943
944                mConn->disconnect(reply);
945                break;
946            }
947
948            case 'quit':
949            {
950                sp<AMessage> msg = mNotify->dup();
951                msg->setInt32("what", kWhatDisconnected);
952                msg->setInt32("result", UNKNOWN_ERROR);
953                msg->post();
954                break;
955            }
956
957            case 'chek':
958            {
959                int32_t generation;
960                CHECK(msg->findInt32("generation", &generation));
961                if (generation != mCheckGeneration) {
962                    // This is an outdated message. Ignore.
963                    break;
964                }
965
966                if (mNumAccessUnitsReceived == 0) {
967#if 1
968                    ALOGI("stream ended? aborting.");
969                    (new AMessage('abor', this))->post();
970                    break;
971#else
972                    ALOGI("haven't seen an AU in a looong time.");
973#endif
974                }
975
976                mNumAccessUnitsReceived = 0;
977                msg->post(kAccessUnitTimeoutUs);
978                break;
979            }
980
981            case 'accu':
982            {
983                if (mSeekPending) {
984                    ALOGV("Stale access unit.");
985                    break;
986                }
987
988                int32_t timeUpdate;
989                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
990                    size_t trackIndex;
991                    CHECK(msg->findSize("track-index", &trackIndex));
992
993                    uint32_t rtpTime;
994                    uint64_t ntpTime;
995                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
996                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
997
998                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
999                    break;
1000                }
1001
1002                int32_t first;
1003                if (msg->findInt32("first-rtcp", &first)) {
1004                    mReceivedFirstRTCPPacket = true;
1005                    break;
1006                }
1007
1008                if (msg->findInt32("first-rtp", &first)) {
1009                    mReceivedFirstRTPPacket = true;
1010                    break;
1011                }
1012
1013                ++mNumAccessUnitsReceived;
1014                postAccessUnitTimeoutCheck();
1015
1016                size_t trackIndex;
1017                CHECK(msg->findSize("track-index", &trackIndex));
1018
1019                if (trackIndex >= mTracks.size()) {
1020                    ALOGV("late packets ignored.");
1021                    break;
1022                }
1023
1024                TrackInfo *track = &mTracks.editItemAt(trackIndex);
1025
1026                int32_t eos;
1027                if (msg->findInt32("eos", &eos)) {
1028                    ALOGI("received BYE on track index %zu", trackIndex);
1029                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1030                        ALOGI("No time established => fake existing data");
1031
1032                        track->mEOSReceived = true;
1033                        mTryFakeRTCP = true;
1034                        mReceivedFirstRTCPPacket = true;
1035                        fakeTimestamps();
1036                    } else {
1037                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1038                    }
1039                    return;
1040                }
1041
1042                if (mSeekPending) {
1043                    ALOGV("we're seeking, dropping stale packet.");
1044                    break;
1045                }
1046
1047                sp<ABuffer> accessUnit;
1048                CHECK(msg->findBuffer("access-unit", &accessUnit));
1049                onAccessUnitComplete(trackIndex, accessUnit);
1050                break;
1051            }
1052
1053            case 'paus':
1054            {
1055                int32_t generation;
1056                CHECK(msg->findInt32("pausecheck", &generation));
1057                if (generation != mPauseGeneration) {
1058                    ALOGV("Ignoring outdated pause message.");
1059                    break;
1060                }
1061
1062                if (!mSeekable) {
1063                    ALOGW("This is a live stream, ignoring pause request.");
1064                    break;
1065                }
1066
1067                if (mPausing) {
1068                    ALOGV("This stream is already paused.");
1069                    break;
1070                }
1071
1072                mCheckPending = true;
1073                ++mCheckGeneration;
1074                mPausing = true;
1075
1076                AString request = "PAUSE ";
1077                request.append(mControlURL);
1078                request.append(" RTSP/1.0\r\n");
1079
1080                request.append("Session: ");
1081                request.append(mSessionID);
1082                request.append("\r\n");
1083
1084                request.append("\r\n");
1085
1086                sp<AMessage> reply = new AMessage('pau2', this);
1087                mConn->sendRequest(request.c_str(), reply);
1088                break;
1089            }
1090
1091            case 'pau2':
1092            {
1093                int32_t result;
1094                CHECK(msg->findInt32("result", &result));
1095                mCheckTimeoutGeneration++;
1096
1097                ALOGI("PAUSE completed with result %d (%s)",
1098                     result, strerror(-result));
1099                break;
1100            }
1101
1102            case 'resu':
1103            {
1104                if (mPausing && mSeekPending) {
1105                    // If seeking, Play will be sent from see1 instead
1106                    break;
1107                }
1108
1109                if (!mPausing) {
1110                    // Dont send PLAY if we have not paused
1111                    break;
1112                }
1113                AString request = "PLAY ";
1114                request.append(mControlURL);
1115                request.append(" RTSP/1.0\r\n");
1116
1117                request.append("Session: ");
1118                request.append(mSessionID);
1119                request.append("\r\n");
1120
1121                request.append("\r\n");
1122
1123                sp<AMessage> reply = new AMessage('res2', this);
1124                mConn->sendRequest(request.c_str(), reply);
1125                break;
1126            }
1127
1128            case 'res2':
1129            {
1130                int32_t result;
1131                CHECK(msg->findInt32("result", &result));
1132
1133                ALOGI("PLAY (for resume) completed with result %d (%s)",
1134                     result, strerror(-result));
1135
1136                mCheckPending = false;
1137                ++mCheckGeneration;
1138                postAccessUnitTimeoutCheck();
1139
1140                if (result == OK) {
1141                    sp<RefBase> obj;
1142                    CHECK(msg->findObject("response", &obj));
1143                    sp<ARTSPResponse> response =
1144                        static_cast<ARTSPResponse *>(obj.get());
1145
1146                    if (response->mStatusCode != 200) {
1147                        result = UNKNOWN_ERROR;
1148                    } else {
1149                        parsePlayResponse(response);
1150
1151                        // Post new timeout in order to make sure to use
1152                        // fake timestamps if no new Sender Reports arrive
1153                        postTimeout();
1154                    }
1155                }
1156
1157                if (result != OK) {
1158                    ALOGE("resume failed, aborting.");
1159                    (new AMessage('abor', this))->post();
1160                }
1161
1162                mPausing = false;
1163                break;
1164            }
1165
1166            case 'seek':
1167            {
1168                if (!mSeekable) {
1169                    ALOGW("This is a live stream, ignoring seek request.");
1170
1171                    sp<AMessage> msg = mNotify->dup();
1172                    msg->setInt32("what", kWhatSeekDone);
1173                    msg->post();
1174                    break;
1175                }
1176
1177                int64_t timeUs;
1178                CHECK(msg->findInt64("time", &timeUs));
1179
1180                mSeekPending = true;
1181
1182                // Disable the access unit timeout until we resumed
1183                // playback again.
1184                mCheckPending = true;
1185                ++mCheckGeneration;
1186
1187                sp<AMessage> reply = new AMessage('see0', this);
1188                reply->setInt64("time", timeUs);
1189
1190                if (mPausing) {
1191                    // PAUSE already sent
1192                    ALOGI("Pause already sent");
1193                    reply->post();
1194                    break;
1195                }
1196                AString request = "PAUSE ";
1197                request.append(mControlURL);
1198                request.append(" RTSP/1.0\r\n");
1199
1200                request.append("Session: ");
1201                request.append(mSessionID);
1202                request.append("\r\n");
1203
1204                request.append("\r\n");
1205
1206                mConn->sendRequest(request.c_str(), reply);
1207                break;
1208            }
1209
1210            case 'see0':
1211            {
1212                // Session is paused now.
1213                status_t err = OK;
1214                msg->findInt32("result", &err);
1215
1216                int64_t timeUs;
1217                CHECK(msg->findInt64("time", &timeUs));
1218
1219                sp<AMessage> notify = mNotify->dup();
1220                notify->setInt32("what", kWhatSeekPaused);
1221                notify->setInt32("err", err);
1222                notify->setInt64("time", timeUs);
1223                notify->post();
1224                break;
1225
1226            }
1227
1228            case 'see1':
1229            {
1230                for (size_t i = 0; i < mTracks.size(); ++i) {
1231                    TrackInfo *info = &mTracks.editItemAt(i);
1232
1233                    postQueueSeekDiscontinuity(i);
1234                    info->mEOSReceived = false;
1235
1236                    info->mRTPAnchor = 0;
1237                    info->mNTPAnchorUs = -1;
1238                }
1239
1240                mAllTracksHaveTime = false;
1241                mNTPAnchorUs = -1;
1242
1243                // Start new timeoutgeneration to avoid getting timeout
1244                // before PLAY response arrive
1245                postTimeout();
1246
1247                int64_t timeUs;
1248                CHECK(msg->findInt64("time", &timeUs));
1249
1250                AString request = "PLAY ";
1251                request.append(mControlURL);
1252                request.append(" RTSP/1.0\r\n");
1253
1254                request.append("Session: ");
1255                request.append(mSessionID);
1256                request.append("\r\n");
1257
1258                request.append(
1259                        AStringPrintf(
1260                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1261
1262                request.append("\r\n");
1263
1264                sp<AMessage> reply = new AMessage('see2', this);
1265                mConn->sendRequest(request.c_str(), reply);
1266                break;
1267            }
1268
1269            case 'see2':
1270            {
1271                if (mTracks.size() == 0) {
1272                    // We have already hit abor, break
1273                    break;
1274                }
1275
1276                int32_t result;
1277                CHECK(msg->findInt32("result", &result));
1278
1279                ALOGI("PLAY (for seek) completed with result %d (%s)",
1280                     result, strerror(-result));
1281
1282                mCheckPending = false;
1283                ++mCheckGeneration;
1284                postAccessUnitTimeoutCheck();
1285
1286                if (result == OK) {
1287                    sp<RefBase> obj;
1288                    CHECK(msg->findObject("response", &obj));
1289                    sp<ARTSPResponse> response =
1290                        static_cast<ARTSPResponse *>(obj.get());
1291
1292                    if (response->mStatusCode != 200) {
1293                        result = UNKNOWN_ERROR;
1294                    } else {
1295                        parsePlayResponse(response);
1296
1297                        // Post new timeout in order to make sure to use
1298                        // fake timestamps if no new Sender Reports arrive
1299                        postTimeout();
1300
1301                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1302                        CHECK_GE(i, 0);
1303
1304                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1305
1306                        ALOGI("seek completed.");
1307                    }
1308                }
1309
1310                if (result != OK) {
1311                    ALOGE("seek failed, aborting.");
1312                    (new AMessage('abor', this))->post();
1313                }
1314
1315                mPausing = false;
1316                mSeekPending = false;
1317
1318                // Discard all stale access units.
1319                for (size_t i = 0; i < mTracks.size(); ++i) {
1320                    TrackInfo *track = &mTracks.editItemAt(i);
1321                    track->mPackets.clear();
1322                }
1323
1324                sp<AMessage> msg = mNotify->dup();
1325                msg->setInt32("what", kWhatSeekDone);
1326                msg->post();
1327                break;
1328            }
1329
1330            case 'biny':
1331            {
1332                sp<ABuffer> buffer;
1333                CHECK(msg->findBuffer("buffer", &buffer));
1334
1335                int32_t index;
1336                CHECK(buffer->meta()->findInt32("index", &index));
1337
1338                mRTPConn->injectPacket(index, buffer);
1339                break;
1340            }
1341
1342            case 'tiou':
1343            {
1344                int32_t timeoutGenerationCheck;
1345                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1346                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1347                    // This is an outdated message. Ignore.
1348                    // This typically happens if a lot of seeks are
1349                    // performed, since new timeout messages now are
1350                    // posted at seek as well.
1351                    break;
1352                }
1353                if (!mReceivedFirstRTCPPacket) {
1354                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1355                        ALOGW("We received RTP packets but no RTCP packets, "
1356                             "using fake timestamps.");
1357
1358                        mTryFakeRTCP = true;
1359
1360                        mReceivedFirstRTCPPacket = true;
1361
1362                        fakeTimestamps();
1363                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1364                        ALOGW("Never received any data, switching transports.");
1365
1366                        mTryTCPInterleaving = true;
1367
1368                        sp<AMessage> msg = new AMessage('abor', this);
1369                        msg->setInt32("reconnect", true);
1370                        msg->post();
1371                    } else {
1372                        ALOGW("Never received any data, disconnecting.");
1373                        (new AMessage('abor', this))->post();
1374                    }
1375                } else {
1376                    if (!mAllTracksHaveTime) {
1377                        ALOGW("We received some RTCP packets, but time "
1378                              "could not be established on all tracks, now "
1379                              "using fake timestamps");
1380
1381                        fakeTimestamps();
1382                    }
1383                }
1384                break;
1385            }
1386
1387            default:
1388                TRESPASS();
1389                break;
1390        }
1391    }
1392
1393    void postKeepAlive() {
1394        sp<AMessage> msg = new AMessage('aliv', this);
1395        msg->setInt32("generation", mKeepAliveGeneration);
1396        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1397    }
1398
1399    void cancelAccessUnitTimeoutCheck() {
1400        ALOGV("cancelAccessUnitTimeoutCheck");
1401        ++mCheckGeneration;
1402    }
1403
1404    void postAccessUnitTimeoutCheck() {
1405        if (mCheckPending) {
1406            return;
1407        }
1408
1409        mCheckPending = true;
1410        sp<AMessage> check = new AMessage('chek', this);
1411        check->setInt32("generation", mCheckGeneration);
1412        check->post(kAccessUnitTimeoutUs);
1413    }
1414
1415    static void SplitString(
1416            const AString &s, const char *separator, List<AString> *items) {
1417        items->clear();
1418        size_t start = 0;
1419        while (start < s.size()) {
1420            ssize_t offset = s.find(separator, start);
1421
1422            if (offset < 0) {
1423                items->push_back(AString(s, start, s.size() - start));
1424                break;
1425            }
1426
1427            items->push_back(AString(s, start, offset - start));
1428            start = offset + strlen(separator);
1429        }
1430    }
1431
1432    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1433        mPlayResponseParsed = true;
1434        if (mTracks.size() == 0) {
1435            ALOGV("parsePlayResponse: late packets ignored.");
1436            return;
1437        }
1438
1439        ssize_t i = response->mHeaders.indexOfKey("range");
1440        if (i < 0) {
1441            // Server doesn't even tell use what range it is going to
1442            // play, therefore we won't support seeking.
1443            return;
1444        }
1445
1446        AString range = response->mHeaders.valueAt(i);
1447        ALOGV("Range: %s", range.c_str());
1448
1449        AString val;
1450        CHECK(GetAttribute(range.c_str(), "npt", &val));
1451
1452        float npt1, npt2;
1453        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1454            // This is a live stream and therefore not seekable.
1455
1456            ALOGI("This is a live stream");
1457            return;
1458        }
1459
1460        i = response->mHeaders.indexOfKey("rtp-info");
1461        CHECK_GE(i, 0);
1462
1463        AString rtpInfo = response->mHeaders.valueAt(i);
1464        List<AString> streamInfos;
1465        SplitString(rtpInfo, ",", &streamInfos);
1466
1467        int n = 1;
1468        for (List<AString>::iterator it = streamInfos.begin();
1469             it != streamInfos.end(); ++it) {
1470            (*it).trim();
1471            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1472
1473            CHECK(GetAttribute((*it).c_str(), "url", &val));
1474
1475            size_t trackIndex = 0;
1476            while (trackIndex < mTracks.size()
1477                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1478                ++trackIndex;
1479            }
1480            CHECK_LT(trackIndex, mTracks.size());
1481
1482            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1483
1484            char *end;
1485            unsigned long seq = strtoul(val.c_str(), &end, 10);
1486
1487            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1488            info->mFirstSeqNumInSegment = seq;
1489            info->mNewSegment = true;
1490            info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1491
1492            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1493
1494            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1495
1496            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1497
1498            info->mNormalPlayTimeRTP = rtpTime;
1499            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1500
1501            if (!mFirstAccessUnit) {
1502                postNormalPlayTimeMapping(
1503                        trackIndex,
1504                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1505            }
1506
1507            ++n;
1508        }
1509    }
1510
1511    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1512        CHECK_GE(index, 0u);
1513        CHECK_LT(index, mTracks.size());
1514
1515        const TrackInfo &info = mTracks.itemAt(index);
1516
1517        *timeScale = info.mTimeScale;
1518
1519        return info.mPacketSource->getFormat();
1520    }
1521
1522    size_t countTracks() const {
1523        return mTracks.size();
1524    }
1525
1526private:
    // State kept for each track that setupTrack() has added to mTracks.
    struct TrackInfo {
        // Track URL built by setupTrack(); parsePlayResponse() compares it
        // with the "url" attribute of each RTP-Info entry.
        AString mURL;
        // For UDP transport these are the descriptors returned by
        // ARTPConnection::MakePortPair(). For interleaved TCP they hold the
        // interleaved channel numbers instead. Both start out as -1.
        int mRTPSocket;
        int mRTCPSocket;
        // True when the track was set up with RTP/AVP/TCP interleaving.
        bool mUsingInterleavedTCP;
        // "seq" value from the latest RTP-Info entry for this track
        // (0 until a PLAY response provided one).
        uint32_t mFirstSeqNumInSegment;
        // Set at setup time and whenever parsePlayResponse() updates the
        // track.
        bool mNewSegment;
        // Reset to kMaxAllowedStaleAccessUnits together with mNewSegment.
        // NOTE(review): presumably a budget of stale access units to drop
        // after a new segment starts; confirm against the consumer.
        int32_t mAllowedStaleAccessUnits;

        // RTP time and NTP time (in us) recorded by the last onTimeUpdate()
        // for this track. mNTPAnchorUs is -1 while no time has been
        // established for the track.
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // Time scale obtained from ASessionDescription::ParseFormatDesc().
        int32_t mTimeScale;
        // Set when a BYE ("eos") arrives while timestamps have to be faked;
        // cleared again when a seek restarts playback.
        bool mEOSReceived;

        // Mapping taken from the PLAY response: the "rtptime" of the track's
        // RTP-Info entry and the start of the npt range converted to us.
        uint32_t mNormalPlayTimeRTP;
        int64_t mNormalPlayTimeUs;

        // Packet source created for this track by setupTrack().
        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet.
        List<sp<ABuffer> > mPackets;
    };
1550
    // Template for notifications; handlers dup() it, set "what" and post it.
    sp<AMessage> mNotify;
    // When mUIDValid is set, the UDP sockets of each track are tagged and
    // marked with mUID in setupTrack() and untagged again on abort.
    bool mUIDValid;
    uid_t mUID;
    sp<ALooper> mNetLooper;
    // RTSP control connection used for all requests.
    sp<ARTSPConnection> mConn;
    // RTP connection that streams are removed from on abort and that
    // receives injected interleaved packets.
    sp<ARTPConnection> mRTPConn;
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // URL used in the OPTIONS and TEARDOWN requests.
    AString mSessionURL;
    AString mSessionHost;
    // Base against which per-track control URLs are resolved.
    AString mBaseURL;
    // URL used in the PAUSE and PLAY requests.
    AString mControlURL;
    // Value sent in the "Session:" header of requests.
    AString mSessionID;
    bool mSetupTracksSuccessful;
    // True between a 'seek' request and the completion of its PLAY;
    // access units arriving in between are dropped as stale.
    bool mSeekPending;
    // True until handleFirstAccessUnit() has posted kWhatConnected.
    bool mFirstAccessUnit;

    // Set by onTimeUpdate() once every track has a non-negative NTP anchor.
    bool mAllTracksHaveTime;
    // Session-wide anchors; -1 means "not established".
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    int64_t mLastMediaTimeUs;

    // Access units counted since the last 'chek'; zero at check time
    // triggers an abort.
    int64_t mNumAccessUnitsReceived;
    // Prevents postAccessUnitTimeoutCheck() from posting a second check.
    bool mCheckPending;
    // Generation counters: a delayed message whose stored generation no
    // longer matches the member is treated as obsolete and ignored.
    int32_t mCheckGeneration;
    int32_t mCheckTimeoutGeneration;
    // Set when no data arrived at all and the session is re-established;
    // makes setupTrack() request RTP/AVP/TCP interleaved transport.
    bool mTryTCPInterleaving;
    // Set once fake timestamps were substituted for missing RTCP reports.
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // When false, pause and seek requests are ignored as "live stream".
    bool mSeekable;
    // Keep-alive messages are posted after 90% of this timeout.
    int64_t mKeepAliveTimeoutUs;
    int32_t mKeepAliveGeneration;
    // True from the moment PAUSE is sent until a resume or seek PLAY
    // completes.
    bool mPausing;
    int32_t mPauseGeneration;

    Vector<TrackInfo> mTracks;

    // Set as soon as parsePlayResponse() has been called.
    bool mPlayResponseParsed;
1590
1591    void setupTrack(size_t index) {
1592        sp<APacketSource> source =
1593            new APacketSource(mSessionDesc, index);
1594
1595        if (source->initCheck() != OK) {
1596            ALOGW("Unsupported format. Ignoring track #%zu.", index);
1597
1598            sp<AMessage> reply = new AMessage('setu', this);
1599            reply->setSize("index", index);
1600            reply->setInt32("result", ERROR_UNSUPPORTED);
1601            reply->post();
1602            return;
1603        }
1604
1605        AString url;
1606        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1607
1608        AString trackURL;
1609        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1610
1611        mTracks.push(TrackInfo());
1612        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1613        info->mURL = trackURL;
1614        info->mPacketSource = source;
1615        info->mUsingInterleavedTCP = false;
1616        info->mFirstSeqNumInSegment = 0;
1617        info->mNewSegment = true;
1618        info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1619        info->mRTPSocket = -1;
1620        info->mRTCPSocket = -1;
1621        info->mRTPAnchor = 0;
1622        info->mNTPAnchorUs = -1;
1623        info->mNormalPlayTimeRTP = 0;
1624        info->mNormalPlayTimeUs = 0ll;
1625
1626        unsigned long PT;
1627        AString formatDesc;
1628        AString formatParams;
1629        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1630
1631        int32_t timescale;
1632        int32_t numChannels;
1633        ASessionDescription::ParseFormatDesc(
1634                formatDesc.c_str(), &timescale, &numChannels);
1635
1636        info->mTimeScale = timescale;
1637        info->mEOSReceived = false;
1638
1639        ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1640
1641        AString request = "SETUP ";
1642        request.append(trackURL);
1643        request.append(" RTSP/1.0\r\n");
1644
1645        if (mTryTCPInterleaving) {
1646            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1647            info->mUsingInterleavedTCP = true;
1648            info->mRTPSocket = interleaveIndex;
1649            info->mRTCPSocket = interleaveIndex + 1;
1650
1651            request.append("Transport: RTP/AVP/TCP;interleaved=");
1652            request.append(interleaveIndex);
1653            request.append("-");
1654            request.append(interleaveIndex + 1);
1655        } else {
1656            unsigned rtpPort;
1657            ARTPConnection::MakePortPair(
1658                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1659
1660            if (mUIDValid) {
1661                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1662                                                (uint32_t)*(uint32_t*) "RTP_");
1663                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1664                                                (uint32_t)*(uint32_t*) "RTP_");
1665                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1666                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1667            }
1668
1669            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1670            request.append(rtpPort);
1671            request.append("-");
1672            request.append(rtpPort + 1);
1673        }
1674
1675        request.append("\r\n");
1676
1677        if (index > 1) {
1678            request.append("Session: ");
1679            request.append(mSessionID);
1680            request.append("\r\n");
1681        }
1682
1683        request.append("\r\n");
1684
1685        sp<AMessage> reply = new AMessage('setu', this);
1686        reply->setSize("index", index);
1687        reply->setSize("track-index", mTracks.size() - 1);
1688        mConn->sendRequest(request.c_str(), reply);
1689    }
1690
1691    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1692        out->clear();
1693
1694        if (strncasecmp("rtsp://", baseURL, 7)) {
1695            // Base URL must be absolute
1696            return false;
1697        }
1698
1699        if (!strncasecmp("rtsp://", url, 7)) {
1700            // "url" is already an absolute URL, ignore base URL.
1701            out->setTo(url);
1702            return true;
1703        }
1704
1705        size_t n = strlen(baseURL);
1706        out->setTo(baseURL);
1707        if (baseURL[n - 1] != '/') {
1708            out->append("/");
1709        }
1710        out->append(url);
1711
1712        return true;
1713    }
1714
1715    void fakeTimestamps() {
1716        mNTPAnchorUs = -1ll;
1717        for (size_t i = 0; i < mTracks.size(); ++i) {
1718            onTimeUpdate(i, 0, 0ll);
1719        }
1720    }
1721
1722    bool dataReceivedOnAllChannels() {
1723        TrackInfo *track;
1724        for (size_t i = 0; i < mTracks.size(); ++i) {
1725            track = &mTracks.editItemAt(i);
1726            if (track->mPackets.empty()) {
1727                return false;
1728            }
1729        }
1730        return true;
1731    }
1732
1733    void handleFirstAccessUnit() {
1734        if (mFirstAccessUnit) {
1735            sp<AMessage> msg = mNotify->dup();
1736            msg->setInt32("what", kWhatConnected);
1737            msg->post();
1738
1739            if (mSeekable) {
1740                for (size_t i = 0; i < mTracks.size(); ++i) {
1741                    TrackInfo *info = &mTracks.editItemAt(i);
1742
1743                    postNormalPlayTimeMapping(
1744                            i,
1745                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1746                }
1747            }
1748
1749            mFirstAccessUnit = false;
1750        }
1751    }
1752
1753    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1754        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1755             trackIndex, rtpTime, (long long)ntpTime);
1756
1757        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1758
1759        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1760
1761        track->mRTPAnchor = rtpTime;
1762        track->mNTPAnchorUs = ntpTimeUs;
1763
1764        if (mNTPAnchorUs < 0) {
1765            mNTPAnchorUs = ntpTimeUs;
1766            mMediaAnchorUs = mLastMediaTimeUs;
1767        }
1768
1769        if (!mAllTracksHaveTime) {
1770            bool allTracksHaveTime = (mTracks.size() > 0);
1771            for (size_t i = 0; i < mTracks.size(); ++i) {
1772                TrackInfo *track = &mTracks.editItemAt(i);
1773                if (track->mNTPAnchorUs < 0) {
1774                    allTracksHaveTime = false;
1775                    break;
1776                }
1777            }
1778            if (allTracksHaveTime) {
1779                mAllTracksHaveTime = true;
1780                ALOGI("Time now established for all tracks.");
1781            }
1782        }
1783        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1784            handleFirstAccessUnit();
1785
1786            // Time is now established, lets start timestamping immediately
1787            for (size_t i = 0; i < mTracks.size(); ++i) {
1788                if (OK != processAccessUnitQueue(i)) {
1789                    return;
1790                }
1791            }
1792            for (size_t i = 0; i < mTracks.size(); ++i) {
1793                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1794                if (trackInfo->mEOSReceived) {
1795                    postQueueEOS(i, ERROR_END_OF_STREAM);
1796                    trackInfo->mEOSReceived = false;
1797                }
1798            }
1799        }
1800    }
1801
    // Drains the packet queue (mPackets) of track |trackIndex| from the front.
    //
    // Each access unit is removed from the queue and then either dropped or,
    // if addMediaTimestamp() accepts it, posted via postQueueAccessUnit().
    //
    // While the track is flagged mNewSegment on a seekable stream, access
    // units whose low 16 sequence bits fall outside the window
    // [first, first + kMaxAllowedStaleAccessUnits] are discarded as stale,
    // consuming one unit of mAllowedStaleAccessUnits each. Once that allowance
    // is no longer positive the function resets mNumAccessUnitsReceived, marks
    // the allowance as -1 and bails out.
    //
    // Returns OK once the queue has been emptied, or UNKNOWN_ERROR when the
    // stale allowance is exhausted (the access unit that was just dequeued is
    // not posted in that case).
    status_t processAccessUnitQueue(int32_t trackIndex) {
        TrackInfo *track = &mTracks.editItemAt(trackIndex);
        while (!track->mPackets.empty()) {
            // Pop the oldest queued access unit.
            sp<ABuffer> accessUnit = *track->mPackets.begin();
            track->mPackets.erase(track->mPackets.begin());

            // The buffer's int32Data() is used as the sequence number here.
            uint32_t seqNum = (uint32_t)accessUnit->int32Data();
            if (track->mNewSegment) {
                // The sequence number from RTP packet has only 16 bits and is extended
                // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
                // RTSP "PLAY" command should be used to detect the first RTP packet
                // after seeking.
                if (mSeekable) {
                    if (track->mAllowedStaleAccessUnits > 0) {
                        uint32_t seqNum16 = seqNum & 0xffff;
                        uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
                        // NOTE(review): this window test is done on the raw
                        // 16-bit values and does not account for wraparound at
                        // 0xffff (e.g. first = 0xfff0 and a packet at 0x0002
                        // compares as "before first") -- confirm this is
                        // acceptable.
                        if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
                                || seqNum16 < firstSeqNumInSegment16) {
                            // Not the first rtp packet of the stream after seeking, discarding.
                            track->mAllowedStaleAccessUnits--;
                            ALOGV("discarding stale access unit (0x%x : 0x%x)",
                                 seqNum, track->mFirstSeqNumInSegment);
                            continue;
                        }
                        // Inside the window but not an exact match: the expected
                        // first packet was presumably lost, so accept this one.
                        ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
                                "Missing the first packet(%u), now take packet(%u) as first one",
                                track->mFirstSeqNumInSegment, seqNum);
                    } else { // track->mAllowedStaleAccessUnits <= 0
                        mNumAccessUnitsReceived = 0;
                        // Warn only on the transition (allowance exactly 0); the
                        // -1 stored below suppresses the warning on later calls.
                        ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
                             "Still no first rtp packet after %d stale ones",
                             kMaxAllowedStaleAccessUnits);
                        track->mAllowedStaleAccessUnits = -1;
                        return UNKNOWN_ERROR;
                    }
                }

                // Now found the first rtp packet of the stream after seeking.
                track->mFirstSeqNumInSegment = seqNum;
                track->mNewSegment = false;
            }

            // Anything older than the segment's first packet is stale.
            // NOTE(review): seqNum is uint32_t but is logged with %d below.
            if (seqNum < track->mFirstSeqNumInSegment) {
                ALOGV("dropping stale access-unit (%d < %d)",
                     seqNum, track->mFirstSeqNumInSegment);
                continue;
            }

            // Only access units that received a valid media timestamp are
            // forwarded; addMediaTimestamp() returning false drops the unit.
            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
                postQueueAccessUnit(trackIndex, accessUnit);
            }
        }
        return OK;
    }
1856
1857    void onAccessUnitComplete(
1858            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1859        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1860        track->mPackets.push_back(accessUnit);
1861
1862        uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1863        ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1864
1865        if(!mPlayResponseParsed){
1866            ALOGV("play response is not parsed");
1867            return;
1868        }
1869
1870        handleFirstAccessUnit();
1871
1872        if (!mAllTracksHaveTime) {
1873            ALOGV("storing accessUnit, no time established yet");
1874            return;
1875        }
1876
1877        if (OK != processAccessUnitQueue(trackIndex)) {
1878            return;
1879        }
1880
1881        if (track->mEOSReceived) {
1882            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1883            track->mEOSReceived = false;
1884        }
1885    }
1886
1887    bool addMediaTimestamp(
1888            int32_t trackIndex, const TrackInfo *track,
1889            const sp<ABuffer> &accessUnit) {
1890        UNUSED_UNLESS_VERBOSE(trackIndex);
1891
1892        uint32_t rtpTime;
1893        CHECK(accessUnit->meta()->findInt32(
1894                    "rtp-time", (int32_t *)&rtpTime));
1895
1896        int64_t relRtpTimeUs =
1897            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1898                / track->mTimeScale;
1899
1900        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1901
1902        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1903
1904        if (mediaTimeUs > mLastMediaTimeUs) {
1905            mLastMediaTimeUs = mediaTimeUs;
1906        }
1907
1908        if (mediaTimeUs < 0) {
1909            ALOGV("dropping early accessUnit.");
1910            return false;
1911        }
1912
1913        ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1914             trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1915
1916        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1917
1918        return true;
1919    }
1920
1921    void postQueueAccessUnit(
1922            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1923        sp<AMessage> msg = mNotify->dup();
1924        msg->setInt32("what", kWhatAccessUnit);
1925        msg->setSize("trackIndex", trackIndex);
1926        msg->setBuffer("accessUnit", accessUnit);
1927        msg->post();
1928    }
1929
1930    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1931        sp<AMessage> msg = mNotify->dup();
1932        msg->setInt32("what", kWhatEOS);
1933        msg->setSize("trackIndex", trackIndex);
1934        msg->setInt32("finalResult", finalResult);
1935        msg->post();
1936    }
1937
1938    void postQueueSeekDiscontinuity(size_t trackIndex) {
1939        sp<AMessage> msg = mNotify->dup();
1940        msg->setInt32("what", kWhatSeekDiscontinuity);
1941        msg->setSize("trackIndex", trackIndex);
1942        msg->post();
1943    }
1944
1945    void postNormalPlayTimeMapping(
1946            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1947        sp<AMessage> msg = mNotify->dup();
1948        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1949        msg->setSize("trackIndex", trackIndex);
1950        msg->setInt32("rtpTime", rtpTime);
1951        msg->setInt64("nptUs", nptUs);
1952        msg->post();
1953    }
1954
1955    void postTimeout() {
1956        sp<AMessage> timeout = new AMessage('tiou', this);
1957        mCheckTimeoutGeneration++;
1958        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1959
1960        int64_t startupTimeoutUs;
1961        startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1962        timeout->post(startupTimeoutUs);
1963    }
1964
1965    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1966};
1967
1968}  // namespace android
1969
1970#endif  // MY_HANDLER_H_
1971