MyHandler.h revision 2bfdd428c56c7524d1a11979f200a1762866032d
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
45// If no access units are received within 5 secs, assume that the rtp
46// stream has ended and signal end of stream.
47static int64_t kAccessUnitTimeoutUs = 5000000ll;
48
49// If no access units arrive for the first 10 secs after starting the
50// stream, assume none ever will and signal EOS or switch transports.
51static int64_t kStartupTimeoutUs = 10000000ll;
52
53namespace android {
54
55static void MakeUserAgentString(AString *s) {
56    s->setTo("stagefright/1.1 (Linux;Android ");
57
58#if (PROPERTY_VALUE_MAX < 8)
59#error "PROPERTY_VALUE_MAX must be at least 8"
60#endif
61
62    char value[PROPERTY_VALUE_MAX];
63    property_get("ro.build.version.release", value, "Unknown");
64    s->append(value);
65    s->append(")");
66}
67
68static bool GetAttribute(const char *s, const char *key, AString *value) {
69    value->clear();
70
71    size_t keyLen = strlen(key);
72
73    for (;;) {
74        while (isspace(*s)) {
75            ++s;
76        }
77
78        const char *colonPos = strchr(s, ';');
79
80        size_t len =
81            (colonPos == NULL) ? strlen(s) : colonPos - s;
82
83        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
84            value->setTo(&s[keyLen + 1], len - keyLen - 1);
85            return true;
86        }
87
88        if (colonPos == NULL) {
89            return false;
90        }
91
92        s = colonPos + 1;
93    }
94}
95
96struct MyHandler : public AHandler {
97    enum {
98        kWhatConnected                  = 'conn',
99        kWhatDisconnected               = 'disc',
100        kWhatSeekDone                   = 'sdon',
101
102        kWhatAccessUnit                 = 'accU',
103        kWhatEOS                        = 'eos!',
104        kWhatSeekDiscontinuity          = 'seeD',
105        kWhatNormalPlayTimeMapping      = 'nptM',
106    };
107
108    MyHandler(
109            const char *url,
110            const sp<AMessage> &notify,
111            bool uidValid = false, uid_t uid = 0)
112        : mNotify(notify),
113          mUIDValid(uidValid),
114          mUID(uid),
115          mNetLooper(new ALooper),
116          mConn(new ARTSPConnection(mUIDValid, mUID)),
117          mRTPConn(new ARTPConnection),
118          mOriginalSessionURL(url),
119          mSessionURL(url),
120          mSetupTracksSuccessful(false),
121          mSeekPending(false),
122          mFirstAccessUnit(true),
123          mNTPAnchorUs(-1),
124          mMediaAnchorUs(-1),
125          mLastMediaTimeUs(0),
126          mNumAccessUnitsReceived(0),
127          mCheckPending(false),
128          mCheckGeneration(0),
129          mTryTCPInterleaving(false),
130          mTryFakeRTCP(false),
131          mReceivedFirstRTCPPacket(false),
132          mReceivedFirstRTPPacket(false),
133          mSeekable(false) {
134        mNetLooper->setName("rtsp net");
135        mNetLooper->start(false /* runOnCallingThread */,
136                          false /* canCallJava */,
137                          PRIORITY_HIGHEST);
138
139        // Strip any authentication info from the session url, we don't
140        // want to transmit user/pass in cleartext.
141        AString host, path, user, pass;
142        unsigned port;
143        CHECK(ARTSPConnection::ParseURL(
144                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
145
146        if (user.size() > 0) {
147            mSessionURL.clear();
148            mSessionURL.append("rtsp://");
149            mSessionURL.append(host);
150            mSessionURL.append(":");
151            mSessionURL.append(StringPrintf("%u", port));
152            mSessionURL.append(path);
153
154            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
155        }
156
157        mSessionHost = host;
158    }
159
160    void connect() {
161        looper()->registerHandler(mConn);
162        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
163
164        sp<AMessage> notify = new AMessage('biny', id());
165        mConn->observeBinaryData(notify);
166
167        sp<AMessage> reply = new AMessage('conn', id());
168        mConn->connect(mOriginalSessionURL.c_str(), reply);
169    }
170
171    void disconnect() {
172        (new AMessage('abor', id()))->post();
173    }
174
175    void seek(int64_t timeUs) {
176        sp<AMessage> msg = new AMessage('seek', id());
177        msg->setInt64("time", timeUs);
178        msg->post();
179    }
180
181    static void addRR(const sp<ABuffer> &buf) {
182        uint8_t *ptr = buf->data() + buf->size();
183        ptr[0] = 0x80 | 0;
184        ptr[1] = 201;  // RR
185        ptr[2] = 0;
186        ptr[3] = 1;
187        ptr[4] = 0xde;  // SSRC
188        ptr[5] = 0xad;
189        ptr[6] = 0xbe;
190        ptr[7] = 0xef;
191
192        buf->setRange(0, buf->size() + 8);
193    }
194
195    static void addSDES(int s, const sp<ABuffer> &buffer) {
196        struct sockaddr_in addr;
197        socklen_t addrSize = sizeof(addr);
198        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
199
200        uint8_t *data = buffer->data() + buffer->size();
201        data[0] = 0x80 | 1;
202        data[1] = 202;  // SDES
203        data[4] = 0xde;  // SSRC
204        data[5] = 0xad;
205        data[6] = 0xbe;
206        data[7] = 0xef;
207
208        size_t offset = 8;
209
210        data[offset++] = 1;  // CNAME
211
212        AString cname = "stagefright@";
213        cname.append(inet_ntoa(addr.sin_addr));
214        data[offset++] = cname.size();
215
216        memcpy(&data[offset], cname.c_str(), cname.size());
217        offset += cname.size();
218
219        data[offset++] = 6;  // TOOL
220
221        AString tool;
222        MakeUserAgentString(&tool);
223
224        data[offset++] = tool.size();
225
226        memcpy(&data[offset], tool.c_str(), tool.size());
227        offset += tool.size();
228
229        data[offset++] = 0;
230
231        if ((offset % 4) > 0) {
232            size_t count = 4 - (offset % 4);
233            switch (count) {
234                case 3:
235                    data[offset++] = 0;
236                case 2:
237                    data[offset++] = 0;
238                case 1:
239                    data[offset++] = 0;
240            }
241        }
242
243        size_t numWords = (offset / 4) - 1;
244        data[2] = numWords >> 8;
245        data[3] = numWords & 0xff;
246
247        buffer->setRange(buffer->offset(), buffer->size() + offset);
248    }
249
250    // In case we're behind NAT, fire off two UDP packets to the remote
251    // rtp/rtcp ports to poke a hole into the firewall for future incoming
252    // packets. We're going to send an RR/SDES RTCP packet to both of them.
253    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
254        struct sockaddr_in addr;
255        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
256        addr.sin_family = AF_INET;
257
258        AString source;
259        AString server_port;
260        if (!GetAttribute(transport.c_str(),
261                          "source",
262                          &source)) {
263            LOGW("Missing 'source' field in Transport response. Using "
264                 "RTSP endpoint address.");
265
266            struct hostent *ent = gethostbyname(mSessionHost.c_str());
267            if (ent == NULL) {
268                LOGE("Failed to look up address of session host '%s'",
269                     mSessionHost.c_str());
270
271                return false;
272            }
273
274            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
275        } else {
276            addr.sin_addr.s_addr = inet_addr(source.c_str());
277        }
278
279        if (!GetAttribute(transport.c_str(),
280                                 "server_port",
281                                 &server_port)) {
282            LOGI("Missing 'server_port' field in Transport response.");
283            return false;
284        }
285
286        int rtpPort, rtcpPort;
287        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
288                || rtpPort <= 0 || rtpPort > 65535
289                || rtcpPort <=0 || rtcpPort > 65535
290                || rtcpPort != rtpPort + 1) {
291            LOGE("Server picked invalid RTP/RTCP port pair %s,"
292                 " RTP port must be even, RTCP port must be one higher.",
293                 server_port.c_str());
294
295            return false;
296        }
297
298        if (rtpPort & 1) {
299            LOGW("Server picked an odd RTP port, it should've picked an "
300                 "even one, we'll let it pass for now, but this may break "
301                 "in the future.");
302        }
303
304        if (addr.sin_addr.s_addr == INADDR_NONE) {
305            return true;
306        }
307
308        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
309            // No firewalls to traverse on the loopback interface.
310            return true;
311        }
312
313        // Make up an RR/SDES RTCP packet.
314        sp<ABuffer> buf = new ABuffer(65536);
315        buf->setRange(0, 0);
316        addRR(buf);
317        addSDES(rtpSocket, buf);
318
319        addr.sin_port = htons(rtpPort);
320
321        ssize_t n = sendto(
322                rtpSocket, buf->data(), buf->size(), 0,
323                (const sockaddr *)&addr, sizeof(addr));
324
325        if (n < (ssize_t)buf->size()) {
326            LOGE("failed to poke a hole for RTP packets");
327            return false;
328        }
329
330        addr.sin_port = htons(rtcpPort);
331
332        n = sendto(
333                rtcpSocket, buf->data(), buf->size(), 0,
334                (const sockaddr *)&addr, sizeof(addr));
335
336        if (n < (ssize_t)buf->size()) {
337            LOGE("failed to poke a hole for RTCP packets");
338            return false;
339        }
340
341        LOGV("successfully poked holes.");
342
343        return true;
344    }
345
346    virtual void onMessageReceived(const sp<AMessage> &msg) {
347        switch (msg->what()) {
348            case 'conn':
349            {
350                int32_t result;
351                CHECK(msg->findInt32("result", &result));
352
353                LOGI("connection request completed with result %d (%s)",
354                     result, strerror(-result));
355
356                if (result == OK) {
357                    AString request;
358                    request = "DESCRIBE ";
359                    request.append(mSessionURL);
360                    request.append(" RTSP/1.0\r\n");
361                    request.append("Accept: application/sdp\r\n");
362                    request.append("\r\n");
363
364                    sp<AMessage> reply = new AMessage('desc', id());
365                    mConn->sendRequest(request.c_str(), reply);
366                } else {
367                    (new AMessage('disc', id()))->post();
368                }
369                break;
370            }
371
372            case 'disc':
373            {
374                int32_t reconnect;
375                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
376                    sp<AMessage> reply = new AMessage('conn', id());
377                    mConn->connect(mOriginalSessionURL.c_str(), reply);
378                } else {
379                    (new AMessage('quit', id()))->post();
380                }
381                break;
382            }
383
384            case 'desc':
385            {
386                int32_t result;
387                CHECK(msg->findInt32("result", &result));
388
389                LOGI("DESCRIBE completed with result %d (%s)",
390                     result, strerror(-result));
391
392                if (result == OK) {
393                    sp<RefBase> obj;
394                    CHECK(msg->findObject("response", &obj));
395                    sp<ARTSPResponse> response =
396                        static_cast<ARTSPResponse *>(obj.get());
397
398                    if (response->mStatusCode == 302) {
399                        ssize_t i = response->mHeaders.indexOfKey("location");
400                        CHECK_GE(i, 0);
401
402                        mSessionURL = response->mHeaders.valueAt(i);
403
404                        AString request;
405                        request = "DESCRIBE ";
406                        request.append(mSessionURL);
407                        request.append(" RTSP/1.0\r\n");
408                        request.append("Accept: application/sdp\r\n");
409                        request.append("\r\n");
410
411                        sp<AMessage> reply = new AMessage('desc', id());
412                        mConn->sendRequest(request.c_str(), reply);
413                        break;
414                    }
415
416                    if (response->mStatusCode != 200) {
417                        result = UNKNOWN_ERROR;
418                    } else {
419                        mSessionDesc = new ASessionDescription;
420
421                        mSessionDesc->setTo(
422                                response->mContent->data(),
423                                response->mContent->size());
424
425                        if (!mSessionDesc->isValid()) {
426                            LOGE("Failed to parse session description.");
427                            result = ERROR_MALFORMED;
428                        } else {
429                            ssize_t i = response->mHeaders.indexOfKey("content-base");
430                            if (i >= 0) {
431                                mBaseURL = response->mHeaders.valueAt(i);
432                            } else {
433                                i = response->mHeaders.indexOfKey("content-location");
434                                if (i >= 0) {
435                                    mBaseURL = response->mHeaders.valueAt(i);
436                                } else {
437                                    mBaseURL = mSessionURL;
438                                }
439                            }
440
441                            if (!mBaseURL.startsWith("rtsp://")) {
442                                // Some misbehaving servers specify a relative
443                                // URL in one of the locations above, combine
444                                // it with the absolute session URL to get
445                                // something usable...
446
447                                LOGW("Server specified a non-absolute base URL"
448                                     ", combining it with the session URL to "
449                                     "get something usable...");
450
451                                AString tmp;
452                                CHECK(MakeURL(
453                                            mSessionURL.c_str(),
454                                            mBaseURL.c_str(),
455                                            &tmp));
456
457                                mBaseURL = tmp;
458                            }
459
460                            CHECK_GT(mSessionDesc->countTracks(), 1u);
461                            setupTrack(1);
462                        }
463                    }
464                }
465
466                if (result != OK) {
467                    sp<AMessage> reply = new AMessage('disc', id());
468                    mConn->disconnect(reply);
469                }
470                break;
471            }
472
473            case 'setu':
474            {
475                size_t index;
476                CHECK(msg->findSize("index", &index));
477
478                TrackInfo *track = NULL;
479                size_t trackIndex;
480                if (msg->findSize("track-index", &trackIndex)) {
481                    track = &mTracks.editItemAt(trackIndex);
482                }
483
484                int32_t result;
485                CHECK(msg->findInt32("result", &result));
486
487                LOGI("SETUP(%d) completed with result %d (%s)",
488                     index, result, strerror(-result));
489
490                if (result == OK) {
491                    CHECK(track != NULL);
492
493                    sp<RefBase> obj;
494                    CHECK(msg->findObject("response", &obj));
495                    sp<ARTSPResponse> response =
496                        static_cast<ARTSPResponse *>(obj.get());
497
498                    if (response->mStatusCode != 200) {
499                        result = UNKNOWN_ERROR;
500                    } else {
501                        ssize_t i = response->mHeaders.indexOfKey("session");
502                        CHECK_GE(i, 0);
503
504                        mSessionID = response->mHeaders.valueAt(i);
505                        i = mSessionID.find(";");
506                        if (i >= 0) {
507                            // Remove options, i.e. ";timeout=90"
508                            mSessionID.erase(i, mSessionID.size() - i);
509                        }
510
511                        sp<AMessage> notify = new AMessage('accu', id());
512                        notify->setSize("track-index", trackIndex);
513
514                        i = response->mHeaders.indexOfKey("transport");
515                        CHECK_GE(i, 0);
516
517                        if (!track->mUsingInterleavedTCP) {
518                            AString transport = response->mHeaders.valueAt(i);
519
520                            // We are going to continue even if we were
521                            // unable to poke a hole into the firewall...
522                            pokeAHole(
523                                    track->mRTPSocket,
524                                    track->mRTCPSocket,
525                                    transport);
526                        }
527
528                        mRTPConn->addStream(
529                                track->mRTPSocket, track->mRTCPSocket,
530                                mSessionDesc, index,
531                                notify, track->mUsingInterleavedTCP);
532
533                        mSetupTracksSuccessful = true;
534                    }
535                }
536
537                if (result != OK) {
538                    if (track) {
539                        if (!track->mUsingInterleavedTCP) {
540                            // Clear the tag
541                            if (mUIDValid) {
542                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
543                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
544                            }
545
546                            close(track->mRTPSocket);
547                            close(track->mRTCPSocket);
548                        }
549
550                        mTracks.removeItemsAt(trackIndex);
551                    }
552                }
553
554                ++index;
555                if (index < mSessionDesc->countTracks()) {
556                    setupTrack(index);
557                } else if (mSetupTracksSuccessful) {
558                    AString request = "PLAY ";
559                    request.append(mSessionURL);
560                    request.append(" RTSP/1.0\r\n");
561
562                    request.append("Session: ");
563                    request.append(mSessionID);
564                    request.append("\r\n");
565
566                    request.append("\r\n");
567
568                    sp<AMessage> reply = new AMessage('play', id());
569                    mConn->sendRequest(request.c_str(), reply);
570                } else {
571                    sp<AMessage> reply = new AMessage('disc', id());
572                    mConn->disconnect(reply);
573                }
574                break;
575            }
576
577            case 'play':
578            {
579                int32_t result;
580                CHECK(msg->findInt32("result", &result));
581
582                LOGI("PLAY completed with result %d (%s)",
583                     result, strerror(-result));
584
585                if (result == OK) {
586                    sp<RefBase> obj;
587                    CHECK(msg->findObject("response", &obj));
588                    sp<ARTSPResponse> response =
589                        static_cast<ARTSPResponse *>(obj.get());
590
591                    if (response->mStatusCode != 200) {
592                        result = UNKNOWN_ERROR;
593                    } else {
594                        parsePlayResponse(response);
595
596                        sp<AMessage> timeout = new AMessage('tiou', id());
597                        timeout->post(kStartupTimeoutUs);
598                    }
599                }
600
601                if (result != OK) {
602                    sp<AMessage> reply = new AMessage('disc', id());
603                    mConn->disconnect(reply);
604                }
605
606                break;
607            }
608
609            case 'abor':
610            {
611                for (size_t i = 0; i < mTracks.size(); ++i) {
612                    TrackInfo *info = &mTracks.editItemAt(i);
613
614                    if (!mFirstAccessUnit) {
615                        postQueueEOS(i, ERROR_END_OF_STREAM);
616                    }
617
618                    if (!info->mUsingInterleavedTCP) {
619                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
620
621                        // Clear the tag
622                        if (mUIDValid) {
623                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
624                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
625                        }
626
627                        close(info->mRTPSocket);
628                        close(info->mRTCPSocket);
629                    }
630                }
631                mTracks.clear();
632                mSetupTracksSuccessful = false;
633                mSeekPending = false;
634                mFirstAccessUnit = true;
635                mNTPAnchorUs = -1;
636                mMediaAnchorUs = -1;
637                mNumAccessUnitsReceived = 0;
638                mReceivedFirstRTCPPacket = false;
639                mReceivedFirstRTPPacket = false;
640                mSeekable = false;
641
642                sp<AMessage> reply = new AMessage('tear', id());
643
644                int32_t reconnect;
645                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
646                    reply->setInt32("reconnect", true);
647                }
648
649                AString request;
650                request = "TEARDOWN ";
651
652                // XXX should use aggregate url from SDP here...
653                request.append(mSessionURL);
654                request.append(" RTSP/1.0\r\n");
655
656                request.append("Session: ");
657                request.append(mSessionID);
658                request.append("\r\n");
659
660                request.append("\r\n");
661
662                mConn->sendRequest(request.c_str(), reply);
663                break;
664            }
665
666            case 'tear':
667            {
668                int32_t result;
669                CHECK(msg->findInt32("result", &result));
670
671                LOGI("TEARDOWN completed with result %d (%s)",
672                     result, strerror(-result));
673
674                sp<AMessage> reply = new AMessage('disc', id());
675
676                int32_t reconnect;
677                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
678                    reply->setInt32("reconnect", true);
679                }
680
681                mConn->disconnect(reply);
682                break;
683            }
684
685            case 'quit':
686            {
687                sp<AMessage> msg = mNotify->dup();
688                msg->setInt32("what", kWhatDisconnected);
689                msg->setInt32("result", UNKNOWN_ERROR);
690                msg->post();
691                break;
692            }
693
694            case 'chek':
695            {
696                int32_t generation;
697                CHECK(msg->findInt32("generation", &generation));
698                if (generation != mCheckGeneration) {
699                    // This is an outdated message. Ignore.
700                    break;
701                }
702
703                if (mNumAccessUnitsReceived == 0) {
704                    LOGI("stream ended? aborting.");
705                    (new AMessage('abor', id()))->post();
706                    break;
707                }
708
709                mNumAccessUnitsReceived = 0;
710                msg->post(kAccessUnitTimeoutUs);
711                break;
712            }
713
714            case 'accu':
715            {
716                int32_t timeUpdate;
717                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
718                    size_t trackIndex;
719                    CHECK(msg->findSize("track-index", &trackIndex));
720
721                    uint32_t rtpTime;
722                    uint64_t ntpTime;
723                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
724                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
725
726                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
727                    break;
728                }
729
730                int32_t first;
731                if (msg->findInt32("first-rtcp", &first)) {
732                    mReceivedFirstRTCPPacket = true;
733                    break;
734                }
735
736                if (msg->findInt32("first-rtp", &first)) {
737                    mReceivedFirstRTPPacket = true;
738                    break;
739                }
740
741                ++mNumAccessUnitsReceived;
742                postAccessUnitTimeoutCheck();
743
744                size_t trackIndex;
745                CHECK(msg->findSize("track-index", &trackIndex));
746
747                if (trackIndex >= mTracks.size()) {
748                    LOGV("late packets ignored.");
749                    break;
750                }
751
752                TrackInfo *track = &mTracks.editItemAt(trackIndex);
753
754                int32_t eos;
755                if (msg->findInt32("eos", &eos)) {
756                    LOGI("received BYE on track index %d", trackIndex);
757#if 0
758                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
759#endif
760                    return;
761                }
762
763                sp<RefBase> obj;
764                CHECK(msg->findObject("access-unit", &obj));
765
766                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
767
768                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
769
770                if (mSeekPending) {
771                    LOGV("we're seeking, dropping stale packet.");
772                    break;
773                }
774
775                if (seqNum < track->mFirstSeqNumInSegment) {
776                    LOGV("dropping stale access-unit (%d < %d)",
777                         seqNum, track->mFirstSeqNumInSegment);
778                    break;
779                }
780
781                if (track->mNewSegment) {
782                    track->mNewSegment = false;
783                }
784
785                onAccessUnitComplete(trackIndex, accessUnit);
786                break;
787            }
788
789            case 'seek':
790            {
791                if (!mSeekable) {
792                    LOGW("This is a live stream, ignoring seek request.");
793
794                    sp<AMessage> msg = mNotify->dup();
795                    msg->setInt32("what", kWhatSeekDone);
796                    msg->post();
797                    break;
798                }
799
800                int64_t timeUs;
801                CHECK(msg->findInt64("time", &timeUs));
802
803                mSeekPending = true;
804
805                // Disable the access unit timeout until we resumed
806                // playback again.
807                mCheckPending = true;
808                ++mCheckGeneration;
809
810                AString request = "PAUSE ";
811                request.append(mSessionURL);
812                request.append(" RTSP/1.0\r\n");
813
814                request.append("Session: ");
815                request.append(mSessionID);
816                request.append("\r\n");
817
818                request.append("\r\n");
819
820                sp<AMessage> reply = new AMessage('see1', id());
821                reply->setInt64("time", timeUs);
822                mConn->sendRequest(request.c_str(), reply);
823                break;
824            }
825
826            case 'see1':
827            {
828                // Session is paused now.
829                for (size_t i = 0; i < mTracks.size(); ++i) {
830                    TrackInfo *info = &mTracks.editItemAt(i);
831
832                    postQueueSeekDiscontinuity(i);
833
834                    info->mRTPAnchor = 0;
835                    info->mNTPAnchorUs = -1;
836                }
837
838                mNTPAnchorUs = -1;
839
840                int64_t timeUs;
841                CHECK(msg->findInt64("time", &timeUs));
842
843                AString request = "PLAY ";
844                request.append(mSessionURL);
845                request.append(" RTSP/1.0\r\n");
846
847                request.append("Session: ");
848                request.append(mSessionID);
849                request.append("\r\n");
850
851                request.append(
852                        StringPrintf(
853                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
854
855                request.append("\r\n");
856
857                sp<AMessage> reply = new AMessage('see2', id());
858                mConn->sendRequest(request.c_str(), reply);
859                break;
860            }
861
862            case 'see2':
863            {
864                CHECK(mSeekPending);
865
866                int32_t result;
867                CHECK(msg->findInt32("result", &result));
868
869                LOGI("PLAY completed with result %d (%s)",
870                     result, strerror(-result));
871
872                mCheckPending = false;
873                postAccessUnitTimeoutCheck();
874
875                if (result == OK) {
876                    sp<RefBase> obj;
877                    CHECK(msg->findObject("response", &obj));
878                    sp<ARTSPResponse> response =
879                        static_cast<ARTSPResponse *>(obj.get());
880
881                    if (response->mStatusCode != 200) {
882                        result = UNKNOWN_ERROR;
883                    } else {
884                        parsePlayResponse(response);
885
886                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
887                        CHECK_GE(i, 0);
888
889                        LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
890
891                        LOGI("seek completed.");
892                    }
893                }
894
895                if (result != OK) {
896                    LOGE("seek failed, aborting.");
897                    (new AMessage('abor', id()))->post();
898                }
899
900                mSeekPending = false;
901
902                sp<AMessage> msg = mNotify->dup();
903                msg->setInt32("what", kWhatSeekDone);
904                msg->post();
905                break;
906            }
907
908            case 'biny':
909            {
910                sp<RefBase> obj;
911                CHECK(msg->findObject("buffer", &obj));
912                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
913
914                int32_t index;
915                CHECK(buffer->meta()->findInt32("index", &index));
916
917                mRTPConn->injectPacket(index, buffer);
918                break;
919            }
920
921            case 'tiou':
922            {
923                if (!mReceivedFirstRTCPPacket) {
924                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
925                        LOGW("We received RTP packets but no RTCP packets, "
926                             "using fake timestamps.");
927
928                        mTryFakeRTCP = true;
929
930                        mReceivedFirstRTCPPacket = true;
931
932                        fakeTimestamps();
933                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
934                        LOGW("Never received any data, switching transports.");
935
936                        mTryTCPInterleaving = true;
937
938                        sp<AMessage> msg = new AMessage('abor', id());
939                        msg->setInt32("reconnect", true);
940                        msg->post();
941                    } else {
942                        LOGW("Never received any data, disconnecting.");
943                        (new AMessage('abor', id()))->post();
944                    }
945                }
946                break;
947            }
948
949            default:
950                TRESPASS();
951                break;
952        }
953    }
954
955    void postAccessUnitTimeoutCheck() {
956        if (mCheckPending) {
957            return;
958        }
959
960        mCheckPending = true;
961        sp<AMessage> check = new AMessage('chek', id());
962        check->setInt32("generation", mCheckGeneration);
963        check->post(kAccessUnitTimeoutUs);
964    }
965
966    static void SplitString(
967            const AString &s, const char *separator, List<AString> *items) {
968        items->clear();
969        size_t start = 0;
970        while (start < s.size()) {
971            ssize_t offset = s.find(separator, start);
972
973            if (offset < 0) {
974                items->push_back(AString(s, start, s.size() - start));
975                break;
976            }
977
978            items->push_back(AString(s, start, offset - start));
979            start = offset + strlen(separator);
980        }
981    }
982
983    void parsePlayResponse(const sp<ARTSPResponse> &response) {
984        mSeekable = false;
985
986        ssize_t i = response->mHeaders.indexOfKey("range");
987        if (i < 0) {
988            // Server doesn't even tell use what range it is going to
989            // play, therefore we won't support seeking.
990            return;
991        }
992
993        AString range = response->mHeaders.valueAt(i);
994        LOGV("Range: %s", range.c_str());
995
996        AString val;
997        CHECK(GetAttribute(range.c_str(), "npt", &val));
998
999        float npt1, npt2;
1000        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1001            // This is a live stream and therefore not seekable.
1002            return;
1003        }
1004
1005        i = response->mHeaders.indexOfKey("rtp-info");
1006        CHECK_GE(i, 0);
1007
1008        AString rtpInfo = response->mHeaders.valueAt(i);
1009        List<AString> streamInfos;
1010        SplitString(rtpInfo, ",", &streamInfos);
1011
1012        int n = 1;
1013        for (List<AString>::iterator it = streamInfos.begin();
1014             it != streamInfos.end(); ++it) {
1015            (*it).trim();
1016            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
1017
1018            CHECK(GetAttribute((*it).c_str(), "url", &val));
1019
1020            size_t trackIndex = 0;
1021            while (trackIndex < mTracks.size()
1022                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1023                ++trackIndex;
1024            }
1025            CHECK_LT(trackIndex, mTracks.size());
1026
1027            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1028
1029            char *end;
1030            unsigned long seq = strtoul(val.c_str(), &end, 10);
1031
1032            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1033            info->mFirstSeqNumInSegment = seq;
1034            info->mNewSegment = true;
1035
1036            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1037
1038            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1039
1040            LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1041
1042            info->mNormalPlayTimeRTP = rtpTime;
1043            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1044
1045            if (!mFirstAccessUnit) {
1046                postNormalPlayTimeMapping(
1047                        trackIndex,
1048                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1049            }
1050
1051            ++n;
1052        }
1053
1054        mSeekable = true;
1055    }
1056
1057    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1058        CHECK_GE(index, 0u);
1059        CHECK_LT(index, mTracks.size());
1060
1061        const TrackInfo &info = mTracks.itemAt(index);
1062
1063        *timeScale = info.mTimeScale;
1064
1065        return info.mPacketSource->getFormat();
1066    }
1067
1068    size_t countTracks() const {
1069        return mTracks.size();
1070    }
1071
1072private:
1073    struct TrackInfo {
1074        AString mURL;
1075        int mRTPSocket;
1076        int mRTCPSocket;
1077        bool mUsingInterleavedTCP;
1078        uint32_t mFirstSeqNumInSegment;
1079        bool mNewSegment;
1080
1081        uint32_t mRTPAnchor;
1082        int64_t mNTPAnchorUs;
1083        int32_t mTimeScale;
1084
1085        uint32_t mNormalPlayTimeRTP;
1086        int64_t mNormalPlayTimeUs;
1087
1088        sp<APacketSource> mPacketSource;
1089
1090        // Stores packets temporarily while no notion of time
1091        // has been established yet.
1092        List<sp<ABuffer> > mPackets;
1093    };
1094
1095    sp<AMessage> mNotify;
1096    bool mUIDValid;
1097    uid_t mUID;
1098    sp<ALooper> mNetLooper;
1099    sp<ARTSPConnection> mConn;
1100    sp<ARTPConnection> mRTPConn;
1101    sp<ASessionDescription> mSessionDesc;
1102    AString mOriginalSessionURL;  // This one still has user:pass@
1103    AString mSessionURL;
1104    AString mSessionHost;
1105    AString mBaseURL;
1106    AString mSessionID;
1107    bool mSetupTracksSuccessful;
1108    bool mSeekPending;
1109    bool mFirstAccessUnit;
1110
1111    int64_t mNTPAnchorUs;
1112    int64_t mMediaAnchorUs;
1113    int64_t mLastMediaTimeUs;
1114
1115    int64_t mNumAccessUnitsReceived;
1116    bool mCheckPending;
1117    int32_t mCheckGeneration;
1118    bool mTryTCPInterleaving;
1119    bool mTryFakeRTCP;
1120    bool mReceivedFirstRTCPPacket;
1121    bool mReceivedFirstRTPPacket;
1122    bool mSeekable;
1123
1124    Vector<TrackInfo> mTracks;
1125
1126    void setupTrack(size_t index) {
1127        sp<APacketSource> source =
1128            new APacketSource(mSessionDesc, index);
1129
1130        if (source->initCheck() != OK) {
1131            LOGW("Unsupported format. Ignoring track #%d.", index);
1132
1133            sp<AMessage> reply = new AMessage('setu', id());
1134            reply->setSize("index", index);
1135            reply->setInt32("result", ERROR_UNSUPPORTED);
1136            reply->post();
1137            return;
1138        }
1139
1140        AString url;
1141        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1142
1143        AString trackURL;
1144        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1145
1146        mTracks.push(TrackInfo());
1147        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1148        info->mURL = trackURL;
1149        info->mPacketSource = source;
1150        info->mUsingInterleavedTCP = false;
1151        info->mFirstSeqNumInSegment = 0;
1152        info->mNewSegment = true;
1153        info->mRTPAnchor = 0;
1154        info->mNTPAnchorUs = -1;
1155        info->mNormalPlayTimeRTP = 0;
1156        info->mNormalPlayTimeUs = 0ll;
1157
1158        unsigned long PT;
1159        AString formatDesc;
1160        AString formatParams;
1161        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1162
1163        int32_t timescale;
1164        int32_t numChannels;
1165        ASessionDescription::ParseFormatDesc(
1166                formatDesc.c_str(), &timescale, &numChannels);
1167
1168        info->mTimeScale = timescale;
1169
1170        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1171
1172        AString request = "SETUP ";
1173        request.append(trackURL);
1174        request.append(" RTSP/1.0\r\n");
1175
1176        if (mTryTCPInterleaving) {
1177            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1178            info->mUsingInterleavedTCP = true;
1179            info->mRTPSocket = interleaveIndex;
1180            info->mRTCPSocket = interleaveIndex + 1;
1181
1182            request.append("Transport: RTP/AVP/TCP;interleaved=");
1183            request.append(interleaveIndex);
1184            request.append("-");
1185            request.append(interleaveIndex + 1);
1186        } else {
1187            unsigned rtpPort;
1188            ARTPConnection::MakePortPair(
1189                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1190
1191            if (mUIDValid) {
1192                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1193                                                (uint32_t)*(uint32_t*) "RTP_");
1194                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1195                                                (uint32_t)*(uint32_t*) "RTP_");
1196            }
1197
1198            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1199            request.append(rtpPort);
1200            request.append("-");
1201            request.append(rtpPort + 1);
1202        }
1203
1204        request.append("\r\n");
1205
1206        if (index > 1) {
1207            request.append("Session: ");
1208            request.append(mSessionID);
1209            request.append("\r\n");
1210        }
1211
1212        request.append("\r\n");
1213
1214        sp<AMessage> reply = new AMessage('setu', id());
1215        reply->setSize("index", index);
1216        reply->setSize("track-index", mTracks.size() - 1);
1217        mConn->sendRequest(request.c_str(), reply);
1218    }
1219
1220    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1221        out->clear();
1222
1223        if (strncasecmp("rtsp://", baseURL, 7)) {
1224            // Base URL must be absolute
1225            return false;
1226        }
1227
1228        if (!strncasecmp("rtsp://", url, 7)) {
1229            // "url" is already an absolute URL, ignore base URL.
1230            out->setTo(url);
1231            return true;
1232        }
1233
1234        size_t n = strlen(baseURL);
1235        if (baseURL[n - 1] == '/') {
1236            out->setTo(baseURL);
1237            out->append(url);
1238        } else {
1239            const char *slashPos = strrchr(baseURL, '/');
1240
1241            if (slashPos > &baseURL[6]) {
1242                out->setTo(baseURL, slashPos - baseURL);
1243            } else {
1244                out->setTo(baseURL);
1245            }
1246
1247            out->append("/");
1248            out->append(url);
1249        }
1250
1251        return true;
1252    }
1253
1254    void fakeTimestamps() {
1255        for (size_t i = 0; i < mTracks.size(); ++i) {
1256            onTimeUpdate(i, 0, 0ll);
1257        }
1258    }
1259
1260    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1261        LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1262             trackIndex, rtpTime, ntpTime);
1263
1264        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1265
1266        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1267
1268        track->mRTPAnchor = rtpTime;
1269        track->mNTPAnchorUs = ntpTimeUs;
1270
1271        if (mNTPAnchorUs < 0) {
1272            mNTPAnchorUs = ntpTimeUs;
1273            mMediaAnchorUs = mLastMediaTimeUs;
1274        }
1275    }
1276
1277    void onAccessUnitComplete(
1278            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1279        LOGV("onAccessUnitComplete track %d", trackIndex);
1280
1281        if (mFirstAccessUnit) {
1282            sp<AMessage> msg = mNotify->dup();
1283            msg->setInt32("what", kWhatConnected);
1284            msg->post();
1285
1286            for (size_t i = 0; i < mTracks.size(); ++i) {
1287                TrackInfo *info = &mTracks.editItemAt(i);
1288
1289                postNormalPlayTimeMapping(
1290                        i,
1291                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1292            }
1293
1294            mFirstAccessUnit = false;
1295        }
1296
1297        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1298
1299        if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
1300            LOGV("storing accessUnit, no time established yet");
1301            track->mPackets.push_back(accessUnit);
1302            return;
1303        }
1304
1305        while (!track->mPackets.empty()) {
1306            sp<ABuffer> accessUnit = *track->mPackets.begin();
1307            track->mPackets.erase(track->mPackets.begin());
1308
1309            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1310                postQueueAccessUnit(trackIndex, accessUnit);
1311            }
1312        }
1313
1314        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1315            postQueueAccessUnit(trackIndex, accessUnit);
1316        }
1317    }
1318
1319    bool addMediaTimestamp(
1320            int32_t trackIndex, const TrackInfo *track,
1321            const sp<ABuffer> &accessUnit) {
1322        uint32_t rtpTime;
1323        CHECK(accessUnit->meta()->findInt32(
1324                    "rtp-time", (int32_t *)&rtpTime));
1325
1326        int64_t relRtpTimeUs =
1327            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1328                / track->mTimeScale;
1329
1330        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1331
1332        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1333
1334        if (mediaTimeUs > mLastMediaTimeUs) {
1335            mLastMediaTimeUs = mediaTimeUs;
1336        }
1337
1338        if (mediaTimeUs < 0) {
1339            LOGV("dropping early accessUnit.");
1340            return false;
1341        }
1342
1343        LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1344             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1345
1346        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1347
1348        return true;
1349    }
1350
1351    void postQueueAccessUnit(
1352            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1353        sp<AMessage> msg = mNotify->dup();
1354        msg->setInt32("what", kWhatAccessUnit);
1355        msg->setSize("trackIndex", trackIndex);
1356        msg->setObject("accessUnit", accessUnit);
1357        msg->post();
1358    }
1359
1360    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1361        sp<AMessage> msg = mNotify->dup();
1362        msg->setInt32("what", kWhatEOS);
1363        msg->setSize("trackIndex", trackIndex);
1364        msg->setInt32("finalResult", finalResult);
1365        msg->post();
1366    }
1367
1368    void postQueueSeekDiscontinuity(size_t trackIndex) {
1369        sp<AMessage> msg = mNotify->dup();
1370        msg->setInt32("what", kWhatSeekDiscontinuity);
1371        msg->setSize("trackIndex", trackIndex);
1372        msg->post();
1373    }
1374
1375    void postNormalPlayTimeMapping(
1376            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1377        sp<AMessage> msg = mNotify->dup();
1378        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1379        msg->setSize("trackIndex", trackIndex);
1380        msg->setInt32("rtpTime", rtpTime);
1381        msg->setInt64("nptUs", nptUs);
1382        msg->post();
1383    }
1384
1385    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1386};
1387
1388}  // namespace android
1389
1390#endif  // MY_HANDLER_H_
1391