1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22
23#ifndef LOG_TAG
24#define LOG_TAG "MyHandler"
25#endif
26
27#include <utils/Log.h>
28
29#include "APacketSource.h"
30#include "ARTPConnection.h"
31#include "ARTSPConnection.h"
32#include "ASessionDescription.h"
33
34#include <ctype.h>
35
36#include <media/stagefright/foundation/ABuffer.h>
37#include <media/stagefright/foundation/ADebug.h>
38#include <media/stagefright/foundation/ALooper.h>
39#include <media/stagefright/foundation/AMessage.h>
40#include <media/stagefright/MediaErrors.h>
41#include <media/stagefright/Utils.h>
42
43#include <arpa/inet.h>
44#include <sys/socket.h>
45#include <netdb.h>
46
47#include "HTTPBase.h"
48
49#if LOG_NDEBUG
50#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
51#else
52#define UNUSED_UNLESS_VERBOSE(x)
53#endif
54
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
57static int64_t kAccessUnitTimeoutUs = 10000000ll;
58
59// If no access units arrive for the first 10 secs after starting the
60// stream, assume none ever will and signal EOS or switch transports.
61static int64_t kStartupTimeoutUs = 10000000ll;
62
63static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
64
65static int64_t kPauseDelayUs = 3000000ll;
66
67// The allowed maximum number of stale access units at the beginning of
68// a new sequence.
69static int32_t kMaxAllowedStaleAccessUnits = 20;
70
71namespace android {
72
73static bool GetAttribute(const char *s, const char *key, AString *value) {
74    value->clear();
75
76    size_t keyLen = strlen(key);
77
78    for (;;) {
79        while (isspace(*s)) {
80            ++s;
81        }
82
83        const char *colonPos = strchr(s, ';');
84
85        size_t len =
86            (colonPos == NULL) ? strlen(s) : colonPos - s;
87
88        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
89            value->setTo(&s[keyLen + 1], len - keyLen - 1);
90            return true;
91        }
92
93        if (colonPos == NULL) {
94            return false;
95        }
96
97        s = colonPos + 1;
98    }
99}
100
101struct MyHandler : public AHandler {
102    enum {
103        kWhatConnected                  = 'conn',
104        kWhatDisconnected               = 'disc',
105        kWhatSeekPaused                 = 'spau',
106        kWhatSeekDone                   = 'sdon',
107
108        kWhatAccessUnit                 = 'accU',
109        kWhatEOS                        = 'eos!',
110        kWhatSeekDiscontinuity          = 'seeD',
111        kWhatNormalPlayTimeMapping      = 'nptM',
112    };
113
114    MyHandler(
115            const char *url,
116            const sp<AMessage> &notify,
117            bool uidValid = false, uid_t uid = 0)
118        : mNotify(notify),
119          mUIDValid(uidValid),
120          mUID(uid),
121          mNetLooper(new ALooper),
122          mConn(new ARTSPConnection(mUIDValid, mUID)),
123          mRTPConn(new ARTPConnection),
124          mOriginalSessionURL(url),
125          mSessionURL(url),
126          mSetupTracksSuccessful(false),
127          mSeekPending(false),
128          mFirstAccessUnit(true),
129          mAllTracksHaveTime(false),
130          mNTPAnchorUs(-1),
131          mMediaAnchorUs(-1),
132          mLastMediaTimeUs(0),
133          mNumAccessUnitsReceived(0),
134          mCheckPending(false),
135          mCheckGeneration(0),
136          mCheckTimeoutGeneration(0),
137          mTryTCPInterleaving(false),
138          mTryFakeRTCP(false),
139          mReceivedFirstRTCPPacket(false),
140          mReceivedFirstRTPPacket(false),
141          mSeekable(true),
142          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
143          mKeepAliveGeneration(0),
144          mPausing(false),
145          mPauseGeneration(0),
146          mPlayResponseParsed(false) {
147        mNetLooper->setName("rtsp net");
148        mNetLooper->start(false /* runOnCallingThread */,
149                          false /* canCallJava */,
150                          PRIORITY_HIGHEST);
151
152        // Strip any authentication info from the session url, we don't
153        // want to transmit user/pass in cleartext.
154        AString host, path, user, pass;
155        unsigned port;
156        CHECK(ARTSPConnection::ParseURL(
157                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
158
159        if (user.size() > 0) {
160            mSessionURL.clear();
161            mSessionURL.append("rtsp://");
162            mSessionURL.append(host);
163            mSessionURL.append(":");
164            mSessionURL.append(AStringPrintf("%u", port));
165            mSessionURL.append(path);
166
167            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
168        }
169
170        mSessionHost = host;
171    }
172
173    void connect() {
174        looper()->registerHandler(mConn);
175        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
176
177        sp<AMessage> notify = new AMessage('biny', this);
178        mConn->observeBinaryData(notify);
179
180        sp<AMessage> reply = new AMessage('conn', this);
181        mConn->connect(mOriginalSessionURL.c_str(), reply);
182    }
183
184    void loadSDP(const sp<ASessionDescription>& desc) {
185        looper()->registerHandler(mConn);
186        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
187
188        sp<AMessage> notify = new AMessage('biny', this);
189        mConn->observeBinaryData(notify);
190
191        sp<AMessage> reply = new AMessage('sdpl', this);
192        reply->setObject("description", desc);
193        mConn->connect(mOriginalSessionURL.c_str(), reply);
194    }
195
196    AString getControlURL() {
197        AString sessionLevelControlURL;
198        if (mSessionDesc->findAttribute(
199                0,
200                "a=control",
201                &sessionLevelControlURL)) {
202            if (sessionLevelControlURL.compare("*") == 0) {
203                return mBaseURL;
204            } else {
205                AString controlURL;
206                CHECK(MakeURL(
207                        mBaseURL.c_str(),
208                        sessionLevelControlURL.c_str(),
209                        &controlURL));
210                return controlURL;
211            }
212        } else {
213            return mSessionURL;
214        }
215    }
216
217    void disconnect() {
218        (new AMessage('abor', this))->post();
219    }
220
221    void seek(int64_t timeUs) {
222        sp<AMessage> msg = new AMessage('seek', this);
223        msg->setInt64("time", timeUs);
224        mPauseGeneration++;
225        msg->post();
226    }
227
228    void continueSeekAfterPause(int64_t timeUs) {
229        sp<AMessage> msg = new AMessage('see1', this);
230        msg->setInt64("time", timeUs);
231        msg->post();
232    }
233
    // Returns the current value of mSeekable. The flag starts out true and is
    // set to !isLiveStream(mSessionDesc) once a session description has been
    // accepted (see the 'desc' and 'sdpl' cases of onMessageReceived).
    bool isSeekable() const {
        return mSeekable;
    }
237
238    void pause() {
239        sp<AMessage> msg = new AMessage('paus', this);
240        mPauseGeneration++;
241        msg->setInt32("pausecheck", mPauseGeneration);
242        msg->post();
243    }
244
245    void resume() {
246        sp<AMessage> msg = new AMessage('resu', this);
247        mPauseGeneration++;
248        msg->post();
249    }
250
251    static void addRR(const sp<ABuffer> &buf) {
252        uint8_t *ptr = buf->data() + buf->size();
253        ptr[0] = 0x80 | 0;
254        ptr[1] = 201;  // RR
255        ptr[2] = 0;
256        ptr[3] = 1;
257        ptr[4] = 0xde;  // SSRC
258        ptr[5] = 0xad;
259        ptr[6] = 0xbe;
260        ptr[7] = 0xef;
261
262        buf->setRange(0, buf->size() + 8);
263    }
264
265    static void addSDES(int s, const sp<ABuffer> &buffer) {
266        struct sockaddr_in addr;
267        socklen_t addrSize = sizeof(addr);
268        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
269            inet_aton("0.0.0.0", &(addr.sin_addr));
270        }
271
272        uint8_t *data = buffer->data() + buffer->size();
273        data[0] = 0x80 | 1;
274        data[1] = 202;  // SDES
275        data[4] = 0xde;  // SSRC
276        data[5] = 0xad;
277        data[6] = 0xbe;
278        data[7] = 0xef;
279
280        size_t offset = 8;
281
282        data[offset++] = 1;  // CNAME
283
284        AString cname = "stagefright@";
285        cname.append(inet_ntoa(addr.sin_addr));
286        data[offset++] = cname.size();
287
288        memcpy(&data[offset], cname.c_str(), cname.size());
289        offset += cname.size();
290
291        data[offset++] = 6;  // TOOL
292
293        AString tool = MakeUserAgent();
294
295        data[offset++] = tool.size();
296
297        memcpy(&data[offset], tool.c_str(), tool.size());
298        offset += tool.size();
299
300        data[offset++] = 0;
301
302        if ((offset % 4) > 0) {
303            size_t count = 4 - (offset % 4);
304            switch (count) {
305                case 3:
306                    data[offset++] = 0;
307                case 2:
308                    data[offset++] = 0;
309                case 1:
310                    data[offset++] = 0;
311            }
312        }
313
314        size_t numWords = (offset / 4) - 1;
315        data[2] = numWords >> 8;
316        data[3] = numWords & 0xff;
317
318        buffer->setRange(buffer->offset(), buffer->size() + offset);
319    }
320
321    // In case we're behind NAT, fire off two UDP packets to the remote
322    // rtp/rtcp ports to poke a hole into the firewall for future incoming
323    // packets. We're going to send an RR/SDES RTCP packet to both of them.
324    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
325        struct sockaddr_in addr;
326        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
327        addr.sin_family = AF_INET;
328
329        AString source;
330        AString server_port;
331        if (!GetAttribute(transport.c_str(),
332                          "source",
333                          &source)) {
334            ALOGW("Missing 'source' field in Transport response. Using "
335                 "RTSP endpoint address.");
336
337            struct hostent *ent = gethostbyname(mSessionHost.c_str());
338            if (ent == NULL) {
339                ALOGE("Failed to look up address of session host '%s'",
340                     mSessionHost.c_str());
341
342                return false;
343            }
344
345            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
346        } else {
347            addr.sin_addr.s_addr = inet_addr(source.c_str());
348        }
349
350        if (!GetAttribute(transport.c_str(),
351                                 "server_port",
352                                 &server_port)) {
353            ALOGI("Missing 'server_port' field in Transport response.");
354            return false;
355        }
356
357        int rtpPort, rtcpPort;
358        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
359                || rtpPort <= 0 || rtpPort > 65535
360                || rtcpPort <=0 || rtcpPort > 65535
361                || rtcpPort != rtpPort + 1) {
362            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
363                 " RTP port must be even, RTCP port must be one higher.",
364                 server_port.c_str());
365
366            return false;
367        }
368
369        if (rtpPort & 1) {
370            ALOGW("Server picked an odd RTP port, it should've picked an "
371                 "even one, we'll let it pass for now, but this may break "
372                 "in the future.");
373        }
374
375        if (addr.sin_addr.s_addr == INADDR_NONE) {
376            return true;
377        }
378
379        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
380            // No firewalls to traverse on the loopback interface.
381            return true;
382        }
383
384        // Make up an RR/SDES RTCP packet.
385        sp<ABuffer> buf = new ABuffer(65536);
386        buf->setRange(0, 0);
387        addRR(buf);
388        addSDES(rtpSocket, buf);
389
390        addr.sin_port = htons(rtpPort);
391
392        ssize_t n = sendto(
393                rtpSocket, buf->data(), buf->size(), 0,
394                (const sockaddr *)&addr, sizeof(addr));
395
396        if (n < (ssize_t)buf->size()) {
397            ALOGE("failed to poke a hole for RTP packets");
398            return false;
399        }
400
401        addr.sin_port = htons(rtcpPort);
402
403        n = sendto(
404                rtcpSocket, buf->data(), buf->size(), 0,
405                (const sockaddr *)&addr, sizeof(addr));
406
407        if (n < (ssize_t)buf->size()) {
408            ALOGE("failed to poke a hole for RTCP packets");
409            return false;
410        }
411
412        ALOGV("successfully poked holes.");
413
414        return true;
415    }
416
417    static bool isLiveStream(const sp<ASessionDescription> &desc) {
418        AString attrLiveStream;
419        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
420            ssize_t semicolonPos = attrLiveStream.find(";", 2);
421
422            const char* liveStreamValue;
423            if (semicolonPos < 0) {
424                liveStreamValue = attrLiveStream.c_str();
425            } else {
426                AString valString;
427                valString.setTo(attrLiveStream,
428                        semicolonPos + 1,
429                        attrLiveStream.size() - semicolonPos - 1);
430                liveStreamValue = valString.c_str();
431            }
432
433            uint32_t value = strtoul(liveStreamValue, NULL, 10);
434            if (value == 1) {
435                ALOGV("found live stream");
436                return true;
437            }
438        } else {
439            // It is a live stream if no duration is returned
440            int64_t durationUs;
441            if (!desc->getDurationUs(&durationUs)) {
442                ALOGV("No duration found, assume live stream");
443                return true;
444            }
445        }
446
447        return false;
448    }
449
450    virtual void onMessageReceived(const sp<AMessage> &msg) {
451        switch (msg->what()) {
452            case 'conn':
453            {
454                int32_t result;
455                CHECK(msg->findInt32("result", &result));
456
457                ALOGI("connection request completed with result %d (%s)",
458                     result, strerror(-result));
459
460                if (result == OK) {
461                    AString request;
462                    request = "DESCRIBE ";
463                    request.append(mSessionURL);
464                    request.append(" RTSP/1.0\r\n");
465                    request.append("Accept: application/sdp\r\n");
466                    request.append("\r\n");
467
468                    sp<AMessage> reply = new AMessage('desc', this);
469                    mConn->sendRequest(request.c_str(), reply);
470                } else {
471                    (new AMessage('disc', this))->post();
472                }
473                break;
474            }
475
476            case 'disc':
477            {
478                ++mKeepAliveGeneration;
479
480                int32_t reconnect;
481                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
482                    sp<AMessage> reply = new AMessage('conn', this);
483                    mConn->connect(mOriginalSessionURL.c_str(), reply);
484                } else {
485                    (new AMessage('quit', this))->post();
486                }
487                break;
488            }
489
490            case 'desc':
491            {
492                int32_t result;
493                CHECK(msg->findInt32("result", &result));
494
495                ALOGI("DESCRIBE completed with result %d (%s)",
496                     result, strerror(-result));
497
498                if (result == OK) {
499                    sp<RefBase> obj;
500                    CHECK(msg->findObject("response", &obj));
501                    sp<ARTSPResponse> response =
502                        static_cast<ARTSPResponse *>(obj.get());
503
504                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
505                        ssize_t i = response->mHeaders.indexOfKey("location");
506                        CHECK_GE(i, 0);
507
508                        mOriginalSessionURL = response->mHeaders.valueAt(i);
509                        mSessionURL = mOriginalSessionURL;
510
511                        // Strip any authentication info from the session url, we don't
512                        // want to transmit user/pass in cleartext.
513                        AString host, path, user, pass;
514                        unsigned port;
515                        if (ARTSPConnection::ParseURL(
516                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
517                                && user.size() > 0) {
518                            mSessionURL.clear();
519                            mSessionURL.append("rtsp://");
520                            mSessionURL.append(host);
521                            mSessionURL.append(":");
522                            mSessionURL.append(AStringPrintf("%u", port));
523                            mSessionURL.append(path);
524
525                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
526                        }
527
528                        sp<AMessage> reply = new AMessage('conn', this);
529                        mConn->connect(mOriginalSessionURL.c_str(), reply);
530                        break;
531                    }
532
533                    if (response->mStatusCode != 200) {
534                        result = UNKNOWN_ERROR;
535                    } else if (response->mContent == NULL) {
536                        result = ERROR_MALFORMED;
537                        ALOGE("The response has no content.");
538                    } else {
539                        mSessionDesc = new ASessionDescription;
540
541                        mSessionDesc->setTo(
542                                response->mContent->data(),
543                                response->mContent->size());
544
545                        if (!mSessionDesc->isValid()) {
546                            ALOGE("Failed to parse session description.");
547                            result = ERROR_MALFORMED;
548                        } else {
549                            ssize_t i = response->mHeaders.indexOfKey("content-base");
550                            if (i >= 0) {
551                                mBaseURL = response->mHeaders.valueAt(i);
552                            } else {
553                                i = response->mHeaders.indexOfKey("content-location");
554                                if (i >= 0) {
555                                    mBaseURL = response->mHeaders.valueAt(i);
556                                } else {
557                                    mBaseURL = mSessionURL;
558                                }
559                            }
560
561                            mSeekable = !isLiveStream(mSessionDesc);
562
563                            if (!mBaseURL.startsWith("rtsp://")) {
564                                // Some misbehaving servers specify a relative
565                                // URL in one of the locations above, combine
566                                // it with the absolute session URL to get
567                                // something usable...
568
569                                ALOGW("Server specified a non-absolute base URL"
570                                     ", combining it with the session URL to "
571                                     "get something usable...");
572
573                                AString tmp;
574                                CHECK(MakeURL(
575                                            mSessionURL.c_str(),
576                                            mBaseURL.c_str(),
577                                            &tmp));
578
579                                mBaseURL = tmp;
580                            }
581
582                            mControlURL = getControlURL();
583
584                            if (mSessionDesc->countTracks() < 2) {
585                                // There's no actual tracks in this session.
586                                // The first "track" is merely session meta
587                                // data.
588
589                                ALOGW("Session doesn't contain any playable "
590                                     "tracks. Aborting.");
591                                result = ERROR_UNSUPPORTED;
592                            } else {
593                                setupTrack(1);
594                            }
595                        }
596                    }
597                }
598
599                if (result != OK) {
600                    sp<AMessage> reply = new AMessage('disc', this);
601                    mConn->disconnect(reply);
602                }
603                break;
604            }
605
606            case 'sdpl':
607            {
608                int32_t result;
609                CHECK(msg->findInt32("result", &result));
610
611                ALOGI("SDP connection request completed with result %d (%s)",
612                     result, strerror(-result));
613
614                if (result == OK) {
615                    sp<RefBase> obj;
616                    CHECK(msg->findObject("description", &obj));
617                    mSessionDesc =
618                        static_cast<ASessionDescription *>(obj.get());
619
620                    if (!mSessionDesc->isValid()) {
621                        ALOGE("Failed to parse session description.");
622                        result = ERROR_MALFORMED;
623                    } else {
624                        mBaseURL = mSessionURL;
625
626                        mSeekable = !isLiveStream(mSessionDesc);
627
628                        mControlURL = getControlURL();
629
630                        if (mSessionDesc->countTracks() < 2) {
631                            // There's no actual tracks in this session.
632                            // The first "track" is merely session meta
633                            // data.
634
635                            ALOGW("Session doesn't contain any playable "
636                                 "tracks. Aborting.");
637                            result = ERROR_UNSUPPORTED;
638                        } else {
639                            setupTrack(1);
640                        }
641                    }
642                }
643
644                if (result != OK) {
645                    sp<AMessage> reply = new AMessage('disc', this);
646                    mConn->disconnect(reply);
647                }
648                break;
649            }
650
651            case 'setu':
652            {
653                size_t index;
654                CHECK(msg->findSize("index", &index));
655
656                TrackInfo *track = NULL;
657                size_t trackIndex;
658                if (msg->findSize("track-index", &trackIndex)) {
659                    track = &mTracks.editItemAt(trackIndex);
660                }
661
662                int32_t result;
663                CHECK(msg->findInt32("result", &result));
664
665                ALOGI("SETUP(%zu) completed with result %d (%s)",
666                     index, result, strerror(-result));
667
668                if (result == OK) {
669                    CHECK(track != NULL);
670
671                    sp<RefBase> obj;
672                    CHECK(msg->findObject("response", &obj));
673                    sp<ARTSPResponse> response =
674                        static_cast<ARTSPResponse *>(obj.get());
675
676                    if (response->mStatusCode != 200) {
677                        result = UNKNOWN_ERROR;
678                    } else {
679                        ssize_t i = response->mHeaders.indexOfKey("session");
680                        CHECK_GE(i, 0);
681
682                        mSessionID = response->mHeaders.valueAt(i);
683
684                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
685                        AString timeoutStr;
686                        if (GetAttribute(
687                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
688                            char *end;
689                            unsigned long timeoutSecs =
690                                strtoul(timeoutStr.c_str(), &end, 10);
691
692                            if (end == timeoutStr.c_str() || *end != '\0') {
693                                ALOGW("server specified malformed timeout '%s'",
694                                     timeoutStr.c_str());
695
696                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
697                            } else if (timeoutSecs < 15) {
698                                ALOGW("server specified too short a timeout "
699                                     "(%lu secs), using default.",
700                                     timeoutSecs);
701
702                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
703                            } else {
704                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
705
706                                ALOGI("server specified timeout of %lu secs.",
707                                     timeoutSecs);
708                            }
709                        }
710
711                        i = mSessionID.find(";");
712                        if (i >= 0) {
713                            // Remove options, i.e. ";timeout=90"
714                            mSessionID.erase(i, mSessionID.size() - i);
715                        }
716
717                        sp<AMessage> notify = new AMessage('accu', this);
718                        notify->setSize("track-index", trackIndex);
719
720                        i = response->mHeaders.indexOfKey("transport");
721                        CHECK_GE(i, 0);
722
723                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
724                            if (!track->mUsingInterleavedTCP) {
725                                AString transport = response->mHeaders.valueAt(i);
726
727                                // We are going to continue even if we were
728                                // unable to poke a hole into the firewall...
729                                pokeAHole(
730                                        track->mRTPSocket,
731                                        track->mRTCPSocket,
732                                        transport);
733                            }
734
735                            mRTPConn->addStream(
736                                    track->mRTPSocket, track->mRTCPSocket,
737                                    mSessionDesc, index,
738                                    notify, track->mUsingInterleavedTCP);
739
740                            mSetupTracksSuccessful = true;
741                        } else {
742                            result = BAD_VALUE;
743                        }
744                    }
745                }
746
747                if (result != OK) {
748                    if (track) {
749                        if (!track->mUsingInterleavedTCP) {
750                            // Clear the tag
751                            if (mUIDValid) {
752                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
753                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
754                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
755                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
756                            }
757
758                            close(track->mRTPSocket);
759                            close(track->mRTCPSocket);
760                        }
761
762                        mTracks.removeItemsAt(trackIndex);
763                    }
764                }
765
766                ++index;
767                if (result == OK && index < mSessionDesc->countTracks()) {
768                    setupTrack(index);
769                } else if (mSetupTracksSuccessful) {
770                    ++mKeepAliveGeneration;
771                    postKeepAlive();
772
773                    AString request = "PLAY ";
774                    request.append(mControlURL);
775                    request.append(" RTSP/1.0\r\n");
776
777                    request.append("Session: ");
778                    request.append(mSessionID);
779                    request.append("\r\n");
780
781                    request.append("\r\n");
782
783                    sp<AMessage> reply = new AMessage('play', this);
784                    mConn->sendRequest(request.c_str(), reply);
785                } else {
786                    sp<AMessage> reply = new AMessage('disc', this);
787                    mConn->disconnect(reply);
788                }
789                break;
790            }
791
792            case 'play':
793            {
794                int32_t result;
795                CHECK(msg->findInt32("result", &result));
796
797                ALOGI("PLAY completed with result %d (%s)",
798                     result, strerror(-result));
799
800                if (result == OK) {
801                    sp<RefBase> obj;
802                    CHECK(msg->findObject("response", &obj));
803                    sp<ARTSPResponse> response =
804                        static_cast<ARTSPResponse *>(obj.get());
805
806                    if (response->mStatusCode != 200) {
807                        result = UNKNOWN_ERROR;
808                    } else {
809                        parsePlayResponse(response);
810
811                        sp<AMessage> timeout = new AMessage('tiou', this);
812                        mCheckTimeoutGeneration++;
813                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
814                        timeout->post(kStartupTimeoutUs);
815                    }
816                }
817
818                if (result != OK) {
819                    sp<AMessage> reply = new AMessage('disc', this);
820                    mConn->disconnect(reply);
821                }
822
823                break;
824            }
825
826            case 'aliv':
827            {
828                int32_t generation;
829                CHECK(msg->findInt32("generation", &generation));
830
831                if (generation != mKeepAliveGeneration) {
832                    // obsolete event.
833                    break;
834                }
835
836                AString request;
837                request.append("OPTIONS ");
838                request.append(mSessionURL);
839                request.append(" RTSP/1.0\r\n");
840                request.append("Session: ");
841                request.append(mSessionID);
842                request.append("\r\n");
843                request.append("\r\n");
844
845                sp<AMessage> reply = new AMessage('opts', this);
846                reply->setInt32("generation", mKeepAliveGeneration);
847                mConn->sendRequest(request.c_str(), reply);
848                break;
849            }
850
851            case 'opts':
852            {
853                int32_t result;
854                CHECK(msg->findInt32("result", &result));
855
856                ALOGI("OPTIONS completed with result %d (%s)",
857                     result, strerror(-result));
858
859                int32_t generation;
860                CHECK(msg->findInt32("generation", &generation));
861
862                if (generation != mKeepAliveGeneration) {
863                    // obsolete event.
864                    break;
865                }
866
867                postKeepAlive();
868                break;
869            }
870
871            case 'abor':
872            {
873                for (size_t i = 0; i < mTracks.size(); ++i) {
874                    TrackInfo *info = &mTracks.editItemAt(i);
875
876                    if (!mFirstAccessUnit) {
877                        postQueueEOS(i, ERROR_END_OF_STREAM);
878                    }
879
880                    if (!info->mUsingInterleavedTCP) {
881                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
882
883                        // Clear the tag
884                        if (mUIDValid) {
885                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
886                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
887                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
888                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
889                        }
890
891                        close(info->mRTPSocket);
892                        close(info->mRTCPSocket);
893                    }
894                }
895                mTracks.clear();
896                mSetupTracksSuccessful = false;
897                mSeekPending = false;
898                mFirstAccessUnit = true;
899                mAllTracksHaveTime = false;
900                mNTPAnchorUs = -1;
901                mMediaAnchorUs = -1;
902                mNumAccessUnitsReceived = 0;
903                mReceivedFirstRTCPPacket = false;
904                mReceivedFirstRTPPacket = false;
905                mPausing = false;
906                mSeekable = true;
907
908                sp<AMessage> reply = new AMessage('tear', this);
909
910                int32_t reconnect;
911                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
912                    reply->setInt32("reconnect", true);
913                }
914
915                AString request;
916                request = "TEARDOWN ";
917
918                // XXX should use aggregate url from SDP here...
919                request.append(mSessionURL);
920                request.append(" RTSP/1.0\r\n");
921
922                request.append("Session: ");
923                request.append(mSessionID);
924                request.append("\r\n");
925
926                request.append("\r\n");
927
928                mConn->sendRequest(request.c_str(), reply);
929                break;
930            }
931
932            case 'tear':
933            {
934                int32_t result;
935                CHECK(msg->findInt32("result", &result));
936
937                ALOGI("TEARDOWN completed with result %d (%s)",
938                     result, strerror(-result));
939
940                sp<AMessage> reply = new AMessage('disc', this);
941
942                int32_t reconnect;
943                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
944                    reply->setInt32("reconnect", true);
945                }
946
947                mConn->disconnect(reply);
948                break;
949            }
950
951            case 'quit':
952            {
953                sp<AMessage> msg = mNotify->dup();
954                msg->setInt32("what", kWhatDisconnected);
955                msg->setInt32("result", UNKNOWN_ERROR);
956                msg->post();
957                break;
958            }
959
960            case 'chek':
961            {
962                int32_t generation;
963                CHECK(msg->findInt32("generation", &generation));
964                if (generation != mCheckGeneration) {
965                    // This is an outdated message. Ignore.
966                    break;
967                }
968
969                if (mNumAccessUnitsReceived == 0) {
970#if 1
971                    ALOGI("stream ended? aborting.");
972                    (new AMessage('abor', this))->post();
973                    break;
974#else
975                    ALOGI("haven't seen an AU in a looong time.");
976#endif
977                }
978
979                mNumAccessUnitsReceived = 0;
980                msg->post(kAccessUnitTimeoutUs);
981                break;
982            }
983
984            case 'accu':
985            {
986                if (mSeekPending) {
987                    ALOGV("Stale access unit.");
988                    break;
989                }
990
991                int32_t timeUpdate;
992                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
993                    size_t trackIndex;
994                    CHECK(msg->findSize("track-index", &trackIndex));
995
996                    uint32_t rtpTime;
997                    uint64_t ntpTime;
998                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
999                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1000
1001                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
1002                    break;
1003                }
1004
1005                int32_t first;
1006                if (msg->findInt32("first-rtcp", &first)) {
1007                    mReceivedFirstRTCPPacket = true;
1008                    break;
1009                }
1010
1011                if (msg->findInt32("first-rtp", &first)) {
1012                    mReceivedFirstRTPPacket = true;
1013                    break;
1014                }
1015
1016                ++mNumAccessUnitsReceived;
1017                postAccessUnitTimeoutCheck();
1018
1019                size_t trackIndex;
1020                CHECK(msg->findSize("track-index", &trackIndex));
1021
1022                if (trackIndex >= mTracks.size()) {
1023                    ALOGV("late packets ignored.");
1024                    break;
1025                }
1026
1027                TrackInfo *track = &mTracks.editItemAt(trackIndex);
1028
1029                int32_t eos;
1030                if (msg->findInt32("eos", &eos)) {
1031                    ALOGI("received BYE on track index %zu", trackIndex);
1032                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1033                        ALOGI("No time established => fake existing data");
1034
1035                        track->mEOSReceived = true;
1036                        mTryFakeRTCP = true;
1037                        mReceivedFirstRTCPPacket = true;
1038                        fakeTimestamps();
1039                    } else {
1040                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1041                    }
1042                    return;
1043                }
1044
1045                sp<ABuffer> accessUnit;
1046                CHECK(msg->findBuffer("access-unit", &accessUnit));
1047
1048                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1049
1050                if (mSeekPending) {
1051                    ALOGV("we're seeking, dropping stale packet.");
1052                    break;
1053                }
1054
1055                if (track->mNewSegment) {
1056                    // The sequence number from RTP packet has only 16 bits and is extended
1057                    // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1058                    // RTSP "PLAY" command should be used to detect the first RTP packet
1059                    // after seeking.
1060                    if (track->mAllowedStaleAccessUnits > 0) {
1061                        if ((((seqNum ^ track->mFirstSeqNumInSegment) & 0xffff) != 0)) {
1062                            // Not the first rtp packet of the stream after seeking, discarding.
1063                            track->mAllowedStaleAccessUnits--;
1064                            ALOGV("discarding stale access unit (0x%x : 0x%x)",
1065                                 seqNum, track->mFirstSeqNumInSegment);
1066                            break;
1067                        }
1068                    } else { // track->mAllowedStaleAccessUnits <= 0
1069                        mNumAccessUnitsReceived = 0;
1070                        ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1071                             "Still no first rtp packet after %d stale ones",
1072                             kMaxAllowedStaleAccessUnits);
1073                        track->mAllowedStaleAccessUnits = -1;
1074                        break;
1075                    }
1076
1077                    // Now found the first rtp packet of the stream after seeking.
1078                    track->mFirstSeqNumInSegment = seqNum;
1079                    track->mNewSegment = false;
1080                }
1081
1082                if (seqNum < track->mFirstSeqNumInSegment) {
1083                    ALOGV("dropping stale access-unit (%d < %d)",
1084                         seqNum, track->mFirstSeqNumInSegment);
1085                    break;
1086                }
1087
1088                onAccessUnitComplete(trackIndex, accessUnit);
1089                break;
1090            }
1091
1092            case 'paus':
1093            {
1094                int32_t generation;
1095                CHECK(msg->findInt32("pausecheck", &generation));
1096                if (generation != mPauseGeneration) {
1097                    ALOGV("Ignoring outdated pause message.");
1098                    break;
1099                }
1100
1101                if (!mSeekable) {
1102                    ALOGW("This is a live stream, ignoring pause request.");
1103                    break;
1104                }
1105
1106                if (mPausing) {
1107                    ALOGV("This stream is already paused.");
1108                    break;
1109                }
1110
1111                mCheckPending = true;
1112                ++mCheckGeneration;
1113                mPausing = true;
1114
1115                AString request = "PAUSE ";
1116                request.append(mControlURL);
1117                request.append(" RTSP/1.0\r\n");
1118
1119                request.append("Session: ");
1120                request.append(mSessionID);
1121                request.append("\r\n");
1122
1123                request.append("\r\n");
1124
1125                sp<AMessage> reply = new AMessage('pau2', this);
1126                mConn->sendRequest(request.c_str(), reply);
1127                break;
1128            }
1129
1130            case 'pau2':
1131            {
1132                int32_t result;
1133                CHECK(msg->findInt32("result", &result));
1134                mCheckTimeoutGeneration++;
1135
1136                ALOGI("PAUSE completed with result %d (%s)",
1137                     result, strerror(-result));
1138                break;
1139            }
1140
1141            case 'resu':
1142            {
1143                if (mPausing && mSeekPending) {
1144                    // If seeking, Play will be sent from see1 instead
1145                    break;
1146                }
1147
1148                if (!mPausing) {
1149                    // Don't send PLAY if we have not paused
1150                    break;
1151                }
1152                AString request = "PLAY ";
1153                request.append(mControlURL);
1154                request.append(" RTSP/1.0\r\n");
1155
1156                request.append("Session: ");
1157                request.append(mSessionID);
1158                request.append("\r\n");
1159
1160                request.append("\r\n");
1161
1162                sp<AMessage> reply = new AMessage('res2', this);
1163                mConn->sendRequest(request.c_str(), reply);
1164                break;
1165            }
1166
1167            case 'res2':
1168            {
1169                int32_t result;
1170                CHECK(msg->findInt32("result", &result));
1171
1172                ALOGI("PLAY (for resume) completed with result %d (%s)",
1173                     result, strerror(-result));
1174
1175                mCheckPending = false;
1176                ++mCheckGeneration;
1177                postAccessUnitTimeoutCheck();
1178
1179                if (result == OK) {
1180                    sp<RefBase> obj;
1181                    CHECK(msg->findObject("response", &obj));
1182                    sp<ARTSPResponse> response =
1183                        static_cast<ARTSPResponse *>(obj.get());
1184
1185                    if (response->mStatusCode != 200) {
1186                        result = UNKNOWN_ERROR;
1187                    } else {
1188                        parsePlayResponse(response);
1189
1190                        // Post new timeout in order to make sure to use
1191                        // fake timestamps if no new Sender Reports arrive
1192                        sp<AMessage> timeout = new AMessage('tiou', this);
1193                        mCheckTimeoutGeneration++;
1194                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1195                        timeout->post(kStartupTimeoutUs);
1196                    }
1197                }
1198
1199                if (result != OK) {
1200                    ALOGE("resume failed, aborting.");
1201                    (new AMessage('abor', this))->post();
1202                }
1203
1204                mPausing = false;
1205                break;
1206            }
1207
1208            case 'seek':
1209            {
1210                if (!mSeekable) {
1211                    ALOGW("This is a live stream, ignoring seek request.");
1212
1213                    sp<AMessage> msg = mNotify->dup();
1214                    msg->setInt32("what", kWhatSeekDone);
1215                    msg->post();
1216                    break;
1217                }
1218
1219                int64_t timeUs;
1220                CHECK(msg->findInt64("time", &timeUs));
1221
1222                mSeekPending = true;
1223
1224                // Disable the access unit timeout until we resumed
1225                // playback again.
1226                mCheckPending = true;
1227                ++mCheckGeneration;
1228
1229                sp<AMessage> reply = new AMessage('see0', this);
1230                reply->setInt64("time", timeUs);
1231
1232                if (mPausing) {
1233                    // PAUSE already sent
1234                    ALOGI("Pause already sent");
1235                    reply->post();
1236                    break;
1237                }
1238                AString request = "PAUSE ";
1239                request.append(mControlURL);
1240                request.append(" RTSP/1.0\r\n");
1241
1242                request.append("Session: ");
1243                request.append(mSessionID);
1244                request.append("\r\n");
1245
1246                request.append("\r\n");
1247
1248                mConn->sendRequest(request.c_str(), reply);
1249                break;
1250            }
1251
1252            case 'see0':
1253            {
1254                // Session is paused now.
1255                status_t err = OK;
1256                msg->findInt32("result", &err);
1257
1258                int64_t timeUs;
1259                CHECK(msg->findInt64("time", &timeUs));
1260
1261                sp<AMessage> notify = mNotify->dup();
1262                notify->setInt32("what", kWhatSeekPaused);
1263                notify->setInt32("err", err);
1264                notify->setInt64("time", timeUs);
1265                notify->post();
1266                break;
1267
1268            }
1269
1270            case 'see1':
1271            {
1272                for (size_t i = 0; i < mTracks.size(); ++i) {
1273                    TrackInfo *info = &mTracks.editItemAt(i);
1274
1275                    postQueueSeekDiscontinuity(i);
1276                    info->mEOSReceived = false;
1277
1278                    info->mRTPAnchor = 0;
1279                    info->mNTPAnchorUs = -1;
1280                }
1281
1282                mAllTracksHaveTime = false;
1283                mNTPAnchorUs = -1;
1284
1285                // Start a new timeout generation to avoid getting a timeout
1286                // before the PLAY response arrives.
1287                sp<AMessage> timeout = new AMessage('tiou', this);
1288                mCheckTimeoutGeneration++;
1289                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1290                timeout->post(kStartupTimeoutUs);
1291
1292                int64_t timeUs;
1293                CHECK(msg->findInt64("time", &timeUs));
1294
1295                AString request = "PLAY ";
1296                request.append(mControlURL);
1297                request.append(" RTSP/1.0\r\n");
1298
1299                request.append("Session: ");
1300                request.append(mSessionID);
1301                request.append("\r\n");
1302
1303                request.append(
1304                        AStringPrintf(
1305                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1306
1307                request.append("\r\n");
1308
1309                sp<AMessage> reply = new AMessage('see2', this);
1310                mConn->sendRequest(request.c_str(), reply);
1311                break;
1312            }
1313
1314            case 'see2':
1315            {
1316                if (mTracks.size() == 0) {
1317                    // We have already hit abor, break
1318                    break;
1319                }
1320
1321                int32_t result;
1322                CHECK(msg->findInt32("result", &result));
1323
1324                ALOGI("PLAY (for seek) completed with result %d (%s)",
1325                     result, strerror(-result));
1326
1327                mCheckPending = false;
1328                ++mCheckGeneration;
1329                postAccessUnitTimeoutCheck();
1330
1331                if (result == OK) {
1332                    sp<RefBase> obj;
1333                    CHECK(msg->findObject("response", &obj));
1334                    sp<ARTSPResponse> response =
1335                        static_cast<ARTSPResponse *>(obj.get());
1336
1337                    if (response->mStatusCode != 200) {
1338                        result = UNKNOWN_ERROR;
1339                    } else {
1340                        parsePlayResponse(response);
1341
1342                        // Post new timeout in order to make sure to use
1343                        // fake timestamps if no new Sender Reports arrive
1344                        sp<AMessage> timeout = new AMessage('tiou', this);
1345                        mCheckTimeoutGeneration++;
1346                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1347                        timeout->post(kStartupTimeoutUs);
1348
1349                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1350                        CHECK_GE(i, 0);
1351
1352                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1353
1354                        ALOGI("seek completed.");
1355                    }
1356                }
1357
1358                if (result != OK) {
1359                    ALOGE("seek failed, aborting.");
1360                    (new AMessage('abor', this))->post();
1361                }
1362
1363                mPausing = false;
1364                mSeekPending = false;
1365
1366                // Discard all stale access units.
1367                for (size_t i = 0; i < mTracks.size(); ++i) {
1368                    TrackInfo *track = &mTracks.editItemAt(i);
1369                    track->mPackets.clear();
1370                }
1371
1372                sp<AMessage> msg = mNotify->dup();
1373                msg->setInt32("what", kWhatSeekDone);
1374                msg->post();
1375                break;
1376            }
1377
1378            case 'biny':
1379            {
1380                sp<ABuffer> buffer;
1381                CHECK(msg->findBuffer("buffer", &buffer));
1382
1383                int32_t index;
1384                CHECK(buffer->meta()->findInt32("index", &index));
1385
1386                mRTPConn->injectPacket(index, buffer);
1387                break;
1388            }
1389
1390            case 'tiou':
1391            {
1392                int32_t timeoutGenerationCheck;
1393                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1394                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1395                    // This is an outdated message. Ignore.
1396                    // This typically happens if a lot of seeks are
1397                    // performed, since new timeout messages now are
1398                    // posted at seek as well.
1399                    break;
1400                }
1401                if (!mReceivedFirstRTCPPacket) {
1402                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1403                        ALOGW("We received RTP packets but no RTCP packets, "
1404                             "using fake timestamps.");
1405
1406                        mTryFakeRTCP = true;
1407
1408                        mReceivedFirstRTCPPacket = true;
1409
1410                        fakeTimestamps();
1411                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1412                        ALOGW("Never received any data, switching transports.");
1413
1414                        mTryTCPInterleaving = true;
1415
1416                        sp<AMessage> msg = new AMessage('abor', this);
1417                        msg->setInt32("reconnect", true);
1418                        msg->post();
1419                    } else {
1420                        ALOGW("Never received any data, disconnecting.");
1421                        (new AMessage('abor', this))->post();
1422                    }
1423                } else {
1424                    if (!mAllTracksHaveTime) {
1425                        ALOGW("We received some RTCP packets, but time "
1426                              "could not be established on all tracks, now "
1427                              "using fake timestamps");
1428
1429                        fakeTimestamps();
1430                    }
1431                }
1432                break;
1433            }
1434
1435            default:
1436                TRESPASS();
1437                break;
1438        }
1439    }
1440
1441    void postKeepAlive() {
1442        sp<AMessage> msg = new AMessage('aliv', this);
1443        msg->setInt32("generation", mKeepAliveGeneration);
1444        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1445    }
1446
1447    void postAccessUnitTimeoutCheck() {
1448        if (mCheckPending) {
1449            return;
1450        }
1451
1452        mCheckPending = true;
1453        sp<AMessage> check = new AMessage('chek', this);
1454        check->setInt32("generation", mCheckGeneration);
1455        check->post(kAccessUnitTimeoutUs);
1456    }
1457
1458    static void SplitString(
1459            const AString &s, const char *separator, List<AString> *items) {
1460        items->clear();
1461        size_t start = 0;
1462        while (start < s.size()) {
1463            ssize_t offset = s.find(separator, start);
1464
1465            if (offset < 0) {
1466                items->push_back(AString(s, start, s.size() - start));
1467                break;
1468            }
1469
1470            items->push_back(AString(s, start, offset - start));
1471            start = offset + strlen(separator);
1472        }
1473    }
1474
1475    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1476        mPlayResponseParsed = true;
1477        if (mTracks.size() == 0) {
1478            ALOGV("parsePlayResponse: late packets ignored.");
1479            return;
1480        }
1481
1482        ssize_t i = response->mHeaders.indexOfKey("range");
1483        if (i < 0) {
1484            // Server doesn't even tell use what range it is going to
1485            // play, therefore we won't support seeking.
1486            return;
1487        }
1488
1489        AString range = response->mHeaders.valueAt(i);
1490        ALOGV("Range: %s", range.c_str());
1491
1492        AString val;
1493        CHECK(GetAttribute(range.c_str(), "npt", &val));
1494
1495        float npt1, npt2;
1496        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1497            // This is a live stream and therefore not seekable.
1498
1499            ALOGI("This is a live stream");
1500            return;
1501        }
1502
1503        i = response->mHeaders.indexOfKey("rtp-info");
1504        CHECK_GE(i, 0);
1505
1506        AString rtpInfo = response->mHeaders.valueAt(i);
1507        List<AString> streamInfos;
1508        SplitString(rtpInfo, ",", &streamInfos);
1509
1510        int n = 1;
1511        for (List<AString>::iterator it = streamInfos.begin();
1512             it != streamInfos.end(); ++it) {
1513            (*it).trim();
1514            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1515
1516            CHECK(GetAttribute((*it).c_str(), "url", &val));
1517
1518            size_t trackIndex = 0;
1519            while (trackIndex < mTracks.size()
1520                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1521                ++trackIndex;
1522            }
1523            CHECK_LT(trackIndex, mTracks.size());
1524
1525            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1526
1527            char *end;
1528            unsigned long seq = strtoul(val.c_str(), &end, 10);
1529
1530            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1531            info->mFirstSeqNumInSegment = seq;
1532            info->mNewSegment = true;
1533            info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1534
1535            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1536
1537            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1538
1539            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1540
1541            info->mNormalPlayTimeRTP = rtpTime;
1542            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1543
1544            if (!mFirstAccessUnit) {
1545                postNormalPlayTimeMapping(
1546                        trackIndex,
1547                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1548            }
1549
1550            ++n;
1551        }
1552    }
1553
1554    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1555        CHECK_GE(index, 0u);
1556        CHECK_LT(index, mTracks.size());
1557
1558        const TrackInfo &info = mTracks.itemAt(index);
1559
1560        *timeScale = info.mTimeScale;
1561
1562        return info.mPacketSource->getFormat();
1563    }
1564
1565    size_t countTracks() const {
1566        return mTracks.size();
1567    }
1568
1569private:
    // Per-track state. setupTrack() appends one entry to mTracks for every
    // track it sends a SETUP request for and initializes all scalar fields.
1570    struct TrackInfo {
        // Absolute track URL, built by setupTrack() from mBaseURL and the
        // track's "a=control" attribute. parsePlayResponse() matches it
        // against the "url" attribute of RTP-Info entries.
1571        AString mURL;
        // Either the socket pair obtained from ARTPConnection::MakePortPair()
        // or, when mUsingInterleavedTCP is set, the interleaved channel
        // indices (2 * track position and that value + 1). Only closed as
        // sockets on abort when mUsingInterleavedTCP is false.
1572        int mRTPSocket;
1573        int mRTCPSocket;
1574        bool mUsingInterleavedTCP;
        // Sequence number that opens the current segment: first taken from
        // the RTP-Info "seq" attribute, then replaced by the sequence number
        // of the access unit whose low 16 bits matched it.
1575        uint32_t mFirstSeqNumInSegment;
        // True from setup / PLAY response until that first access unit of
        // the segment has been found.
1576        bool mNewSegment;
        // How many more non-matching access units may be discarded as stale
        // while mNewSegment is set. Starts at kMaxAllowedStaleAccessUnits and
        // is set to -1 once the budget has run out.
1577        int32_t mAllowedStaleAccessUnits;
1578
        // Reset to 0 / -1 at setup and on seek ('see1').
        // NOTE(review): presumably the per-track RTP/NTP anchor pair; they
        // are assigned outside the code visible here -- confirm.
1579        uint32_t mRTPAnchor;
1580        int64_t mNTPAnchorUs;
        // Time scale from ASessionDescription::ParseFormatDesc(); handed out
        // by getTrackFormat().
1581        int32_t mTimeScale;
        // Set when an "eos" access unit arrives while timestamps still have
        // to be faked; cleared at setup and on seek.
1582        bool mEOSReceived;
1583
        // Normal-play-time mapping from the last PLAY response: the
        // "rtptime" attribute and the start of the npt range in microseconds.
1584        uint32_t mNormalPlayTimeRTP;
1585        int64_t mNormalPlayTimeUs;
1586
        // Created by setupTrack() for this track; provides the track format.
1587        sp<APacketSource> mPacketSource;
1588
1589        // Stores packets temporarily while no notion of time
1590        // has been established yet.
1591        List<sp<ABuffer> > mPackets;
1592    };
1593
    // Notification template: dup()ed and posted with a "what" such as
    // kWhatDisconnected, kWhatSeekDone or kWhatSeekPaused.
1594    sp<AMessage> mNotify;
    // When mUIDValid is set, the RTP/RTCP sockets created by setupTrack()
    // are tagged and marked with mUID (and untagged again on abort).
1595    bool mUIDValid;
1596    uid_t mUID;
1597    sp<ALooper> mNetLooper;
    // RTSP connection that all requests are sent on and that gets
    // disconnected on failure / teardown.
1598    sp<ARTSPConnection> mConn;
    // RTP connection: streams are removed from it on abort and interleaved
    // binary data ('biny') is injected into it.
1599    sp<ARTPConnection> mRTPConn;
    // Session description that setupTrack() queries for the per-track
    // control attribute and format.
1600    sp<ASessionDescription> mSessionDesc;
1601    AString mOriginalSessionURL;  // This one still has user:pass@
    // Request URL of the OPTIONS keep-alive and of TEARDOWN.
1602    AString mSessionURL;
1603    AString mSessionHost;
    // Base that setupTrack() resolves track control URLs against.
1604    AString mBaseURL;
    // Request URL of PAUSE and PLAY.
1605    AString mControlURL;
    // Value of the "Session:" header added to requests.
1606    AString mSessionID;
1607    bool mSetupTracksSuccessful;
    // Set by 'seek', cleared by 'see2' and 'abor'; access units arriving
    // while it is set are dropped.
1608    bool mSeekPending;
    // Reset to true by 'abor'. While true, parsePlayResponse() does not post
    // the normal-play-time mapping and 'abor' does not queue EOS.
1609    bool mFirstAccessUnit;
1610
    // Reset to false by 'abor' and 'see1'; while false, a 'tiou' timeout or
    // a BYE falls back to fake timestamps.
1611    bool mAllTracksHaveTime;
1612    int64_t mNTPAnchorUs;
1613    int64_t mMediaAnchorUs;
1614    int64_t mLastMediaTimeUs;
1615
    // Incremented for access units handled by 'accu', reset by every 'chek';
    // a 'chek' that finds it at zero aborts the stream.
1616    int64_t mNumAccessUnitsReceived;
    // True while a 'chek' is outstanding, and also while the check is
    // suspended during pause / seek.
1617    bool mCheckPending;
    // 'chek' messages carrying a different generation are ignored.
1618    int32_t mCheckGeneration;
    // 'tiou' messages carrying a different generation are ignored.
1619    int32_t mCheckTimeoutGeneration;
    // Makes setupTrack() request RTP over interleaved TCP; set by 'tiou'
    // when no data was ever received.
1620    bool mTryTCPInterleaving;
    // Set once fake timestamps are used because no RTCP was received.
1621    bool mTryFakeRTCP;
1622    bool mReceivedFirstRTCPPacket;
1623    bool mReceivedFirstRTPPacket;
    // Pause and seek requests are ignored while this is false.
1624    bool mSeekable;
    // Keep-alives are posted after 9/10 of this interval.
1625    int64_t mKeepAliveTimeoutUs;
    // 'aliv' / 'opts' messages carrying a different generation are ignored.
1626    int32_t mKeepAliveGeneration;
    // Set when 'paus' sends PAUSE; cleared when the PLAY response for a
    // resume or seek has been handled, and by 'abor'.
1627    bool mPausing;
    // 'paus' messages carrying a different generation are ignored.
1628    int32_t mPauseGeneration;
1629
    // One entry per track that setupTrack() issued a SETUP for.
1630    Vector<TrackInfo> mTracks;
1631
    // Set by parsePlayResponse().
1632    bool mPlayResponseParsed;
1633
1634    void setupTrack(size_t index) {
1635        sp<APacketSource> source =
1636            new APacketSource(mSessionDesc, index);
1637
1638        if (source->initCheck() != OK) {
1639            ALOGW("Unsupported format. Ignoring track #%zu.", index);
1640
1641            sp<AMessage> reply = new AMessage('setu', this);
1642            reply->setSize("index", index);
1643            reply->setInt32("result", ERROR_UNSUPPORTED);
1644            reply->post();
1645            return;
1646        }
1647
1648        AString url;
1649        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1650
1651        AString trackURL;
1652        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1653
1654        mTracks.push(TrackInfo());
1655        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1656        info->mURL = trackURL;
1657        info->mPacketSource = source;
1658        info->mUsingInterleavedTCP = false;
1659        info->mFirstSeqNumInSegment = 0;
1660        info->mNewSegment = true;
1661        info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1662        info->mRTPSocket = -1;
1663        info->mRTCPSocket = -1;
1664        info->mRTPAnchor = 0;
1665        info->mNTPAnchorUs = -1;
1666        info->mNormalPlayTimeRTP = 0;
1667        info->mNormalPlayTimeUs = 0ll;
1668
1669        unsigned long PT;
1670        AString formatDesc;
1671        AString formatParams;
1672        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1673
1674        int32_t timescale;
1675        int32_t numChannels;
1676        ASessionDescription::ParseFormatDesc(
1677                formatDesc.c_str(), &timescale, &numChannels);
1678
1679        info->mTimeScale = timescale;
1680        info->mEOSReceived = false;
1681
1682        ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1683
1684        AString request = "SETUP ";
1685        request.append(trackURL);
1686        request.append(" RTSP/1.0\r\n");
1687
1688        if (mTryTCPInterleaving) {
1689            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1690            info->mUsingInterleavedTCP = true;
1691            info->mRTPSocket = interleaveIndex;
1692            info->mRTCPSocket = interleaveIndex + 1;
1693
1694            request.append("Transport: RTP/AVP/TCP;interleaved=");
1695            request.append(interleaveIndex);
1696            request.append("-");
1697            request.append(interleaveIndex + 1);
1698        } else {
1699            unsigned rtpPort;
1700            ARTPConnection::MakePortPair(
1701                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1702
1703            if (mUIDValid) {
1704                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1705                                                (uint32_t)*(uint32_t*) "RTP_");
1706                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1707                                                (uint32_t)*(uint32_t*) "RTP_");
1708                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1709                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1710            }
1711
1712            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1713            request.append(rtpPort);
1714            request.append("-");
1715            request.append(rtpPort + 1);
1716        }
1717
1718        request.append("\r\n");
1719
1720        if (index > 1) {
1721            request.append("Session: ");
1722            request.append(mSessionID);
1723            request.append("\r\n");
1724        }
1725
1726        request.append("\r\n");
1727
1728        sp<AMessage> reply = new AMessage('setu', this);
1729        reply->setSize("index", index);
1730        reply->setSize("track-index", mTracks.size() - 1);
1731        mConn->sendRequest(request.c_str(), reply);
1732    }
1733
1734    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1735        out->clear();
1736
1737        if (strncasecmp("rtsp://", baseURL, 7)) {
1738            // Base URL must be absolute
1739            return false;
1740        }
1741
1742        if (!strncasecmp("rtsp://", url, 7)) {
1743            // "url" is already an absolute URL, ignore base URL.
1744            out->setTo(url);
1745            return true;
1746        }
1747
1748        size_t n = strlen(baseURL);
1749        out->setTo(baseURL);
1750        if (baseURL[n - 1] != '/') {
1751            out->append("/");
1752        }
1753        out->append(url);
1754
1755        return true;
1756    }
1757
1758    void fakeTimestamps() {
1759        mNTPAnchorUs = -1ll;
1760        for (size_t i = 0; i < mTracks.size(); ++i) {
1761            onTimeUpdate(i, 0, 0ll);
1762        }
1763    }
1764
1765    bool dataReceivedOnAllChannels() {
1766        TrackInfo *track;
1767        for (size_t i = 0; i < mTracks.size(); ++i) {
1768            track = &mTracks.editItemAt(i);
1769            if (track->mPackets.empty()) {
1770                return false;
1771            }
1772        }
1773        return true;
1774    }
1775
1776    void handleFirstAccessUnit() {
1777        if (mFirstAccessUnit) {
1778            sp<AMessage> msg = mNotify->dup();
1779            msg->setInt32("what", kWhatConnected);
1780            msg->post();
1781
1782            if (mSeekable) {
1783                for (size_t i = 0; i < mTracks.size(); ++i) {
1784                    TrackInfo *info = &mTracks.editItemAt(i);
1785
1786                    postNormalPlayTimeMapping(
1787                            i,
1788                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1789                }
1790            }
1791
1792            mFirstAccessUnit = false;
1793        }
1794    }
1795
1796    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1797        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1798             trackIndex, rtpTime, (long long)ntpTime);
1799
1800        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1801
1802        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1803
1804        track->mRTPAnchor = rtpTime;
1805        track->mNTPAnchorUs = ntpTimeUs;
1806
1807        if (mNTPAnchorUs < 0) {
1808            mNTPAnchorUs = ntpTimeUs;
1809            mMediaAnchorUs = mLastMediaTimeUs;
1810        }
1811
1812        if (!mAllTracksHaveTime) {
1813            bool allTracksHaveTime = (mTracks.size() > 0);
1814            for (size_t i = 0; i < mTracks.size(); ++i) {
1815                TrackInfo *track = &mTracks.editItemAt(i);
1816                if (track->mNTPAnchorUs < 0) {
1817                    allTracksHaveTime = false;
1818                    break;
1819                }
1820            }
1821            if (allTracksHaveTime) {
1822                mAllTracksHaveTime = true;
1823                ALOGI("Time now established for all tracks.");
1824            }
1825        }
1826        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1827            handleFirstAccessUnit();
1828
1829            // Time is now established, lets start timestamping immediately
1830            for (size_t i = 0; i < mTracks.size(); ++i) {
1831                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1832                while (!trackInfo->mPackets.empty()) {
1833                    sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1834                    trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1835
1836                    if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1837                        postQueueAccessUnit(i, accessUnit);
1838                    }
1839                }
1840            }
1841            for (size_t i = 0; i < mTracks.size(); ++i) {
1842                TrackInfo *trackInfo = &mTracks.editItemAt(i);
1843                if (trackInfo->mEOSReceived) {
1844                    postQueueEOS(i, ERROR_END_OF_STREAM);
1845                    trackInfo->mEOSReceived = false;
1846                }
1847            }
1848        }
1849    }
1850
1851    void onAccessUnitComplete(
1852            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1853        ALOGV("onAccessUnitComplete track %d", trackIndex);
1854
1855        if(!mPlayResponseParsed){
1856            ALOGI("play response is not parsed, storing accessunit");
1857            TrackInfo *track = &mTracks.editItemAt(trackIndex);
1858            track->mPackets.push_back(accessUnit);
1859            return;
1860        }
1861
1862        handleFirstAccessUnit();
1863
1864        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1865
1866        if (!mAllTracksHaveTime) {
1867            ALOGV("storing accessUnit, no time established yet");
1868            track->mPackets.push_back(accessUnit);
1869            return;
1870        }
1871
1872        while (!track->mPackets.empty()) {
1873            sp<ABuffer> accessUnit = *track->mPackets.begin();
1874            track->mPackets.erase(track->mPackets.begin());
1875
1876            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1877                postQueueAccessUnit(trackIndex, accessUnit);
1878            }
1879        }
1880
1881        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1882            postQueueAccessUnit(trackIndex, accessUnit);
1883        }
1884
1885        if (track->mEOSReceived) {
1886            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1887            track->mEOSReceived = false;
1888        }
1889    }
1890
1891    bool addMediaTimestamp(
1892            int32_t trackIndex, const TrackInfo *track,
1893            const sp<ABuffer> &accessUnit) {
1894        UNUSED_UNLESS_VERBOSE(trackIndex);
1895
1896        uint32_t rtpTime;
1897        CHECK(accessUnit->meta()->findInt32(
1898                    "rtp-time", (int32_t *)&rtpTime));
1899
1900        int64_t relRtpTimeUs =
1901            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1902                / track->mTimeScale;
1903
1904        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1905
1906        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1907
1908        if (mediaTimeUs > mLastMediaTimeUs) {
1909            mLastMediaTimeUs = mediaTimeUs;
1910        }
1911
1912        if (mediaTimeUs < 0) {
1913            ALOGV("dropping early accessUnit.");
1914            return false;
1915        }
1916
1917        ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1918             trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1919
1920        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1921
1922        return true;
1923    }
1924
1925    void postQueueAccessUnit(
1926            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1927        sp<AMessage> msg = mNotify->dup();
1928        msg->setInt32("what", kWhatAccessUnit);
1929        msg->setSize("trackIndex", trackIndex);
1930        msg->setBuffer("accessUnit", accessUnit);
1931        msg->post();
1932    }
1933
1934    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1935        sp<AMessage> msg = mNotify->dup();
1936        msg->setInt32("what", kWhatEOS);
1937        msg->setSize("trackIndex", trackIndex);
1938        msg->setInt32("finalResult", finalResult);
1939        msg->post();
1940    }
1941
1942    void postQueueSeekDiscontinuity(size_t trackIndex) {
1943        sp<AMessage> msg = mNotify->dup();
1944        msg->setInt32("what", kWhatSeekDiscontinuity);
1945        msg->setSize("trackIndex", trackIndex);
1946        msg->post();
1947    }
1948
1949    void postNormalPlayTimeMapping(
1950            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1951        sp<AMessage> msg = mNotify->dup();
1952        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1953        msg->setSize("trackIndex", trackIndex);
1954        msg->setInt32("rtpTime", rtpTime);
1955        msg->setInt64("nptUs", nptUs);
1956        msg->post();
1957    }
1958
1959    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1960};
1961
1962}  // namespace android
1963
1964#endif  // MY_HANDLER_H_
1965