MyHandler.h revision 100a4408968b90e314526185d572c72ea4cc784a
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41
// If no access units are received within 5 secs, assume that the rtp
// stream has ended and signal end of stream.
// (const: this header-defined value is only ever read as a timeout.)
static const int64_t kAccessUnitTimeoutUs = 5000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static const int64_t kStartupTimeoutUs = 10000000ll;
49
50namespace android {
51
52static void MakeUserAgentString(AString *s) {
53    s->setTo("stagefright/1.1 (Linux;Android ");
54
55#if (PROPERTY_VALUE_MAX < 8)
56#error "PROPERTY_VALUE_MAX must be at least 8"
57#endif
58
59    char value[PROPERTY_VALUE_MAX];
60    property_get("ro.build.version.release", value, "Unknown");
61    s->append(value);
62    s->append(")");
63}
64
65static bool GetAttribute(const char *s, const char *key, AString *value) {
66    value->clear();
67
68    size_t keyLen = strlen(key);
69
70    for (;;) {
71        while (isspace(*s)) {
72            ++s;
73        }
74
75        const char *colonPos = strchr(s, ';');
76
77        size_t len =
78            (colonPos == NULL) ? strlen(s) : colonPos - s;
79
80        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
81            value->setTo(&s[keyLen + 1], len - keyLen - 1);
82            return true;
83        }
84
85        if (colonPos == NULL) {
86            return false;
87        }
88
89        s = colonPos + 1;
90    }
91}
92
93struct MyHandler : public AHandler {
94    MyHandler(const char *url, const sp<ALooper> &looper)
95        : mLooper(looper),
96          mNetLooper(new ALooper),
97          mConn(new ARTSPConnection),
98          mRTPConn(new ARTPConnection),
99          mOriginalSessionURL(url),
100          mSessionURL(url),
101          mSetupTracksSuccessful(false),
102          mSeekPending(false),
103          mFirstAccessUnit(true),
104          mNTPAnchorUs(-1),
105          mMediaAnchorUs(-1),
106          mLastMediaTimeUs(0),
107          mNumAccessUnitsReceived(0),
108          mCheckPending(false),
109          mCheckGeneration(0),
110          mTryTCPInterleaving(false),
111          mTryFakeRTCP(false),
112          mReceivedFirstRTCPPacket(false),
113          mReceivedFirstRTPPacket(false),
114          mSeekable(false) {
115        mNetLooper->setName("rtsp net");
116        mNetLooper->start(false /* runOnCallingThread */,
117                          false /* canCallJava */,
118                          PRIORITY_HIGHEST);
119
120        // Strip any authentication info from the session url, we don't
121        // want to transmit user/pass in cleartext.
122        AString host, path, user, pass;
123        unsigned port;
124        if (ARTSPConnection::ParseURL(
125                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
126                && user.size() > 0) {
127            mSessionURL.clear();
128            mSessionURL.append("rtsp://");
129            mSessionURL.append(host);
130            mSessionURL.append(":");
131            mSessionURL.append(StringPrintf("%u", port));
132            mSessionURL.append(path);
133
134            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
135        }
136    }
137
138    void connect(const sp<AMessage> &doneMsg) {
139        mDoneMsg = doneMsg;
140
141        mLooper->registerHandler(this);
142        mLooper->registerHandler(mConn);
143        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
144
145        sp<AMessage> notify = new AMessage('biny', id());
146        mConn->observeBinaryData(notify);
147
148        sp<AMessage> reply = new AMessage('conn', id());
149        mConn->connect(mOriginalSessionURL.c_str(), reply);
150    }
151
152    void disconnect(const sp<AMessage> &doneMsg) {
153        mDoneMsg = doneMsg;
154
155        (new AMessage('abor', id()))->post();
156    }
157
158    void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
159        sp<AMessage> msg = new AMessage('seek', id());
160        msg->setInt64("time", timeUs);
161        msg->setMessage("doneMsg", doneMsg);
162        msg->post();
163    }
164
165    int64_t getNormalPlayTimeUs() {
166        int64_t maxTimeUs = 0;
167        for (size_t i = 0; i < mTracks.size(); ++i) {
168            int64_t timeUs = mTracks.editItemAt(i).mPacketSource
169                ->getNormalPlayTimeUs();
170
171            if (i == 0 || timeUs > maxTimeUs) {
172                maxTimeUs = timeUs;
173            }
174        }
175
176        return maxTimeUs;
177    }
178
179    static void addRR(const sp<ABuffer> &buf) {
180        uint8_t *ptr = buf->data() + buf->size();
181        ptr[0] = 0x80 | 0;
182        ptr[1] = 201;  // RR
183        ptr[2] = 0;
184        ptr[3] = 1;
185        ptr[4] = 0xde;  // SSRC
186        ptr[5] = 0xad;
187        ptr[6] = 0xbe;
188        ptr[7] = 0xef;
189
190        buf->setRange(0, buf->size() + 8);
191    }
192
193    static void addSDES(int s, const sp<ABuffer> &buffer) {
194        struct sockaddr_in addr;
195        socklen_t addrSize = sizeof(addr);
196        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
197
198        uint8_t *data = buffer->data() + buffer->size();
199        data[0] = 0x80 | 1;
200        data[1] = 202;  // SDES
201        data[4] = 0xde;  // SSRC
202        data[5] = 0xad;
203        data[6] = 0xbe;
204        data[7] = 0xef;
205
206        size_t offset = 8;
207
208        data[offset++] = 1;  // CNAME
209
210        AString cname = "stagefright@";
211        cname.append(inet_ntoa(addr.sin_addr));
212        data[offset++] = cname.size();
213
214        memcpy(&data[offset], cname.c_str(), cname.size());
215        offset += cname.size();
216
217        data[offset++] = 6;  // TOOL
218
219        AString tool;
220        MakeUserAgentString(&tool);
221
222        data[offset++] = tool.size();
223
224        memcpy(&data[offset], tool.c_str(), tool.size());
225        offset += tool.size();
226
227        data[offset++] = 0;
228
229        if ((offset % 4) > 0) {
230            size_t count = 4 - (offset % 4);
231            switch (count) {
232                case 3:
233                    data[offset++] = 0;
234                case 2:
235                    data[offset++] = 0;
236                case 1:
237                    data[offset++] = 0;
238            }
239        }
240
241        size_t numWords = (offset / 4) - 1;
242        data[2] = numWords >> 8;
243        data[3] = numWords & 0xff;
244
245        buffer->setRange(buffer->offset(), buffer->size() + offset);
246    }
247
248    // In case we're behind NAT, fire off two UDP packets to the remote
249    // rtp/rtcp ports to poke a hole into the firewall for future incoming
250    // packets. We're going to send an RR/SDES RTCP packet to both of them.
251    void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
252        AString source;
253        AString server_port;
254        if (!GetAttribute(transport.c_str(),
255                          "source",
256                          &source)
257                || !GetAttribute(transport.c_str(),
258                                 "server_port",
259                                 &server_port)) {
260            return;
261        }
262
263        int rtpPort, rtcpPort;
264        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
265                || rtpPort <= 0 || rtpPort > 65535
266                || rtcpPort <=0 || rtcpPort > 65535
267                || rtcpPort != rtpPort + 1
268                || (rtpPort & 1) != 0) {
269            return;
270        }
271
272        struct sockaddr_in addr;
273        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
274        addr.sin_family = AF_INET;
275        addr.sin_addr.s_addr = inet_addr(source.c_str());
276
277        if (addr.sin_addr.s_addr == INADDR_NONE) {
278            return;
279        }
280
281        // Make up an RR/SDES RTCP packet.
282        sp<ABuffer> buf = new ABuffer(65536);
283        buf->setRange(0, 0);
284        addRR(buf);
285        addSDES(rtpSocket, buf);
286
287        addr.sin_port = htons(rtpPort);
288
289        ssize_t n = sendto(
290                rtpSocket, buf->data(), buf->size(), 0,
291                (const sockaddr *)&addr, sizeof(addr));
292        CHECK_EQ(n, (ssize_t)buf->size());
293
294        addr.sin_port = htons(rtcpPort);
295
296        n = sendto(
297                rtcpSocket, buf->data(), buf->size(), 0,
298                (const sockaddr *)&addr, sizeof(addr));
299        CHECK_EQ(n, (ssize_t)buf->size());
300
301        LOGV("successfully poked holes.");
302    }
303
304    virtual void onMessageReceived(const sp<AMessage> &msg) {
305        switch (msg->what()) {
306            case 'conn':
307            {
308                int32_t result;
309                CHECK(msg->findInt32("result", &result));
310
311                LOGI("connection request completed with result %d (%s)",
312                     result, strerror(-result));
313
314                if (result == OK) {
315                    AString request;
316                    request = "DESCRIBE ";
317                    request.append(mSessionURL);
318                    request.append(" RTSP/1.0\r\n");
319                    request.append("Accept: application/sdp\r\n");
320                    request.append("\r\n");
321
322                    sp<AMessage> reply = new AMessage('desc', id());
323                    mConn->sendRequest(request.c_str(), reply);
324                } else {
325                    (new AMessage('disc', id()))->post();
326                }
327                break;
328            }
329
330            case 'disc':
331            {
332                int32_t reconnect;
333                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
334                    sp<AMessage> reply = new AMessage('conn', id());
335                    mConn->connect(mOriginalSessionURL.c_str(), reply);
336                } else {
337                    (new AMessage('quit', id()))->post();
338                }
339                break;
340            }
341
342            case 'desc':
343            {
344                int32_t result;
345                CHECK(msg->findInt32("result", &result));
346
347                LOGI("DESCRIBE completed with result %d (%s)",
348                     result, strerror(-result));
349
350                if (result == OK) {
351                    sp<RefBase> obj;
352                    CHECK(msg->findObject("response", &obj));
353                    sp<ARTSPResponse> response =
354                        static_cast<ARTSPResponse *>(obj.get());
355
356                    if (response->mStatusCode == 302) {
357                        ssize_t i = response->mHeaders.indexOfKey("location");
358                        CHECK_GE(i, 0);
359
360                        mSessionURL = response->mHeaders.valueAt(i);
361
362                        AString request;
363                        request = "DESCRIBE ";
364                        request.append(mSessionURL);
365                        request.append(" RTSP/1.0\r\n");
366                        request.append("Accept: application/sdp\r\n");
367                        request.append("\r\n");
368
369                        sp<AMessage> reply = new AMessage('desc', id());
370                        mConn->sendRequest(request.c_str(), reply);
371                        break;
372                    }
373
374                    if (response->mStatusCode != 200) {
375                        result = UNKNOWN_ERROR;
376                    } else {
377                        mSessionDesc = new ASessionDescription;
378
379                        mSessionDesc->setTo(
380                                response->mContent->data(),
381                                response->mContent->size());
382
383                        if (!mSessionDesc->isValid()) {
384                            result = ERROR_MALFORMED;
385                        } else {
386                            ssize_t i = response->mHeaders.indexOfKey("content-base");
387                            if (i >= 0) {
388                                mBaseURL = response->mHeaders.valueAt(i);
389                            } else {
390                                i = response->mHeaders.indexOfKey("content-location");
391                                if (i >= 0) {
392                                    mBaseURL = response->mHeaders.valueAt(i);
393                                } else {
394                                    mBaseURL = mSessionURL;
395                                }
396                            }
397
398                            CHECK_GT(mSessionDesc->countTracks(), 1u);
399                            setupTrack(1);
400                        }
401                    }
402                }
403
404                if (result != OK) {
405                    sp<AMessage> reply = new AMessage('disc', id());
406                    mConn->disconnect(reply);
407                }
408                break;
409            }
410
411            case 'setu':
412            {
413                size_t index;
414                CHECK(msg->findSize("index", &index));
415
416                TrackInfo *track = NULL;
417                size_t trackIndex;
418                if (msg->findSize("track-index", &trackIndex)) {
419                    track = &mTracks.editItemAt(trackIndex);
420                }
421
422                int32_t result;
423                CHECK(msg->findInt32("result", &result));
424
425                LOGI("SETUP(%d) completed with result %d (%s)",
426                     index, result, strerror(-result));
427
428                if (result == OK) {
429                    CHECK(track != NULL);
430
431                    sp<RefBase> obj;
432                    CHECK(msg->findObject("response", &obj));
433                    sp<ARTSPResponse> response =
434                        static_cast<ARTSPResponse *>(obj.get());
435
436                    if (response->mStatusCode != 200) {
437                        result = UNKNOWN_ERROR;
438                    } else {
439                        ssize_t i = response->mHeaders.indexOfKey("session");
440                        CHECK_GE(i, 0);
441
442                        mSessionID = response->mHeaders.valueAt(i);
443                        i = mSessionID.find(";");
444                        if (i >= 0) {
445                            // Remove options, i.e. ";timeout=90"
446                            mSessionID.erase(i, mSessionID.size() - i);
447                        }
448
449                        sp<AMessage> notify = new AMessage('accu', id());
450                        notify->setSize("track-index", trackIndex);
451
452                        i = response->mHeaders.indexOfKey("transport");
453                        CHECK_GE(i, 0);
454
455                        if (!track->mUsingInterleavedTCP) {
456                            AString transport = response->mHeaders.valueAt(i);
457
458                            pokeAHole(track->mRTPSocket,
459                                      track->mRTCPSocket,
460                                      transport);
461                        }
462
463                        mRTPConn->addStream(
464                                track->mRTPSocket, track->mRTCPSocket,
465                                mSessionDesc, index,
466                                notify, track->mUsingInterleavedTCP);
467
468                        mSetupTracksSuccessful = true;
469                    }
470                }
471
472                if (result != OK) {
473                    if (track) {
474                        if (!track->mUsingInterleavedTCP) {
475                            close(track->mRTPSocket);
476                            close(track->mRTCPSocket);
477                        }
478
479                        mTracks.removeItemsAt(trackIndex);
480                    }
481                }
482
483                ++index;
484                if (index < mSessionDesc->countTracks()) {
485                    setupTrack(index);
486                } else if (mSetupTracksSuccessful) {
487                    AString request = "PLAY ";
488                    request.append(mSessionURL);
489                    request.append(" RTSP/1.0\r\n");
490
491                    request.append("Session: ");
492                    request.append(mSessionID);
493                    request.append("\r\n");
494
495                    request.append("\r\n");
496
497                    sp<AMessage> reply = new AMessage('play', id());
498                    mConn->sendRequest(request.c_str(), reply);
499                } else {
500                    sp<AMessage> reply = new AMessage('disc', id());
501                    mConn->disconnect(reply);
502                }
503                break;
504            }
505
506            case 'play':
507            {
508                int32_t result;
509                CHECK(msg->findInt32("result", &result));
510
511                LOGI("PLAY completed with result %d (%s)",
512                     result, strerror(-result));
513
514                if (result == OK) {
515                    sp<RefBase> obj;
516                    CHECK(msg->findObject("response", &obj));
517                    sp<ARTSPResponse> response =
518                        static_cast<ARTSPResponse *>(obj.get());
519
520                    if (response->mStatusCode != 200) {
521                        result = UNKNOWN_ERROR;
522                    } else {
523                        parsePlayResponse(response);
524
525                        sp<AMessage> timeout = new AMessage('tiou', id());
526                        timeout->post(kStartupTimeoutUs);
527                    }
528                }
529
530                if (result != OK) {
531                    sp<AMessage> reply = new AMessage('disc', id());
532                    mConn->disconnect(reply);
533                }
534
535                break;
536            }
537
538            case 'abor':
539            {
540                for (size_t i = 0; i < mTracks.size(); ++i) {
541                    TrackInfo *info = &mTracks.editItemAt(i);
542
543                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
544
545                    if (!info->mUsingInterleavedTCP) {
546                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
547
548                        close(info->mRTPSocket);
549                        close(info->mRTCPSocket);
550                    }
551                }
552                mTracks.clear();
553                mSetupTracksSuccessful = false;
554                mSeekPending = false;
555                mFirstAccessUnit = true;
556                mNTPAnchorUs = -1;
557                mMediaAnchorUs = -1;
558                mNumAccessUnitsReceived = 0;
559                mReceivedFirstRTCPPacket = false;
560                mReceivedFirstRTPPacket = false;
561                mSeekable = false;
562
563                sp<AMessage> reply = new AMessage('tear', id());
564
565                int32_t reconnect;
566                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
567                    reply->setInt32("reconnect", true);
568                }
569
570                AString request;
571                request = "TEARDOWN ";
572
573                // XXX should use aggregate url from SDP here...
574                request.append(mSessionURL);
575                request.append(" RTSP/1.0\r\n");
576
577                request.append("Session: ");
578                request.append(mSessionID);
579                request.append("\r\n");
580
581                request.append("\r\n");
582
583                mConn->sendRequest(request.c_str(), reply);
584                break;
585            }
586
587            case 'tear':
588            {
589                int32_t result;
590                CHECK(msg->findInt32("result", &result));
591
592                LOGI("TEARDOWN completed with result %d (%s)",
593                     result, strerror(-result));
594
595                sp<AMessage> reply = new AMessage('disc', id());
596
597                int32_t reconnect;
598                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
599                    reply->setInt32("reconnect", true);
600                }
601
602                mConn->disconnect(reply);
603                break;
604            }
605
606            case 'quit':
607            {
608                if (mDoneMsg != NULL) {
609                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
610                    mDoneMsg->post();
611                    mDoneMsg = NULL;
612                }
613                break;
614            }
615
616            case 'chek':
617            {
618                int32_t generation;
619                CHECK(msg->findInt32("generation", &generation));
620                if (generation != mCheckGeneration) {
621                    // This is an outdated message. Ignore.
622                    break;
623                }
624
625                if (mNumAccessUnitsReceived == 0) {
626                    LOGI("stream ended? aborting.");
627                    (new AMessage('abor', id()))->post();
628                    break;
629                }
630
631                mNumAccessUnitsReceived = 0;
632                msg->post(kAccessUnitTimeoutUs);
633                break;
634            }
635
636            case 'accu':
637            {
638                int32_t timeUpdate;
639                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
640                    size_t trackIndex;
641                    CHECK(msg->findSize("track-index", &trackIndex));
642
643                    uint32_t rtpTime;
644                    uint64_t ntpTime;
645                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
646                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
647
648                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
649                    break;
650                }
651
652                int32_t first;
653                if (msg->findInt32("first-rtcp", &first)) {
654                    mReceivedFirstRTCPPacket = true;
655                    break;
656                }
657
658                if (msg->findInt32("first-rtp", &first)) {
659                    mReceivedFirstRTPPacket = true;
660                    break;
661                }
662
663                ++mNumAccessUnitsReceived;
664                postAccessUnitTimeoutCheck();
665
666                size_t trackIndex;
667                CHECK(msg->findSize("track-index", &trackIndex));
668
669                if (trackIndex >= mTracks.size()) {
670                    LOGV("late packets ignored.");
671                    break;
672                }
673
674                TrackInfo *track = &mTracks.editItemAt(trackIndex);
675
676                int32_t eos;
677                if (msg->findInt32("eos", &eos)) {
678                    LOGI("received BYE on track index %d", trackIndex);
679#if 0
680                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
681#endif
682                    return;
683                }
684
685                sp<RefBase> obj;
686                CHECK(msg->findObject("access-unit", &obj));
687
688                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
689
690                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
691
692                if (mSeekPending) {
693                    LOGV("we're seeking, dropping stale packet.");
694                    break;
695                }
696
697                if (seqNum < track->mFirstSeqNumInSegment) {
698                    LOGV("dropping stale access-unit (%d < %d)",
699                         seqNum, track->mFirstSeqNumInSegment);
700                    break;
701                }
702
703                if (track->mNewSegment) {
704                    track->mNewSegment = false;
705                }
706
707                onAccessUnitComplete(trackIndex, accessUnit);
708                break;
709            }
710
711            case 'seek':
712            {
713                sp<AMessage> doneMsg;
714                CHECK(msg->findMessage("doneMsg", &doneMsg));
715
716                if (mSeekPending) {
717                    doneMsg->post();
718                    break;
719                }
720
721                if (!mSeekable) {
722                    LOGW("This is a live stream, ignoring seek request.");
723                    doneMsg->post();
724                    break;
725                }
726
727                int64_t timeUs;
728                CHECK(msg->findInt64("time", &timeUs));
729
730                mSeekPending = true;
731
732                // Disable the access unit timeout until we resumed
733                // playback again.
734                mCheckPending = true;
735                ++mCheckGeneration;
736
737                AString request = "PAUSE ";
738                request.append(mSessionURL);
739                request.append(" RTSP/1.0\r\n");
740
741                request.append("Session: ");
742                request.append(mSessionID);
743                request.append("\r\n");
744
745                request.append("\r\n");
746
747                sp<AMessage> reply = new AMessage('see1', id());
748                reply->setInt64("time", timeUs);
749                reply->setMessage("doneMsg", doneMsg);
750                mConn->sendRequest(request.c_str(), reply);
751                break;
752            }
753
754            case 'see1':
755            {
756                // Session is paused now.
757                for (size_t i = 0; i < mTracks.size(); ++i) {
758                    TrackInfo *info = &mTracks.editItemAt(i);
759
760                    info->mPacketSource->flushQueue();
761                    info->mRTPAnchor = 0;
762                    info->mNTPAnchorUs = -1;
763                }
764
765                mNTPAnchorUs = -1;
766
767                int64_t timeUs;
768                CHECK(msg->findInt64("time", &timeUs));
769
770                AString request = "PLAY ";
771                request.append(mSessionURL);
772                request.append(" RTSP/1.0\r\n");
773
774                request.append("Session: ");
775                request.append(mSessionID);
776                request.append("\r\n");
777
778                request.append(
779                        StringPrintf(
780                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
781
782                request.append("\r\n");
783
784                sp<AMessage> doneMsg;
785                CHECK(msg->findMessage("doneMsg", &doneMsg));
786
787                sp<AMessage> reply = new AMessage('see2', id());
788                reply->setMessage("doneMsg", doneMsg);
789                mConn->sendRequest(request.c_str(), reply);
790                break;
791            }
792
793            case 'see2':
794            {
795                CHECK(mSeekPending);
796
797                int32_t result;
798                CHECK(msg->findInt32("result", &result));
799
800                LOGI("PLAY completed with result %d (%s)",
801                     result, strerror(-result));
802
803                mCheckPending = false;
804                postAccessUnitTimeoutCheck();
805
806                if (result == OK) {
807                    sp<RefBase> obj;
808                    CHECK(msg->findObject("response", &obj));
809                    sp<ARTSPResponse> response =
810                        static_cast<ARTSPResponse *>(obj.get());
811
812                    if (response->mStatusCode != 200) {
813                        result = UNKNOWN_ERROR;
814                    } else {
815                        parsePlayResponse(response);
816
817                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
818                        CHECK_GE(i, 0);
819
820                        LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
821
822                        LOGI("seek completed.");
823                    }
824                }
825
826                if (result != OK) {
827                    LOGE("seek failed, aborting.");
828                    (new AMessage('abor', id()))->post();
829                }
830
831                mSeekPending = false;
832
833                sp<AMessage> doneMsg;
834                CHECK(msg->findMessage("doneMsg", &doneMsg));
835
836                doneMsg->post();
837                break;
838            }
839
840            case 'biny':
841            {
842                sp<RefBase> obj;
843                CHECK(msg->findObject("buffer", &obj));
844                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
845
846                int32_t index;
847                CHECK(buffer->meta()->findInt32("index", &index));
848
849                mRTPConn->injectPacket(index, buffer);
850                break;
851            }
852
853            case 'tiou':
854            {
855                if (!mReceivedFirstRTCPPacket) {
856                    if (mTryFakeRTCP) {
857                        LOGW("Never received any data, disconnecting.");
858                        (new AMessage('abor', id()))->post();
859                    } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) {
860                        LOGW("We received RTP packets but no RTCP packets, "
861                             "using fake timestamps.");
862
863                        mTryFakeRTCP = true;
864
865                        mReceivedFirstRTCPPacket = true;
866                    } else {
867                        LOGW("Never received any data, switching transports.");
868
869                        mTryTCPInterleaving = true;
870
871                        sp<AMessage> msg = new AMessage('abor', id());
872                        msg->setInt32("reconnect", true);
873                        msg->post();
874                    }
875                }
876                break;
877            }
878
879            default:
880                TRESPASS();
881                break;
882        }
883    }
884
885    void postAccessUnitTimeoutCheck() {
886        if (mCheckPending) {
887            return;
888        }
889
890        mCheckPending = true;
891        sp<AMessage> check = new AMessage('chek', id());
892        check->setInt32("generation", mCheckGeneration);
893        check->post(kAccessUnitTimeoutUs);
894    }
895
896    static void SplitString(
897            const AString &s, const char *separator, List<AString> *items) {
898        items->clear();
899        size_t start = 0;
900        while (start < s.size()) {
901            ssize_t offset = s.find(separator, start);
902
903            if (offset < 0) {
904                items->push_back(AString(s, start, s.size() - start));
905                break;
906            }
907
908            items->push_back(AString(s, start, offset - start));
909            start = offset + strlen(separator);
910        }
911    }
912
913    void parsePlayResponse(const sp<ARTSPResponse> &response) {
914        mSeekable = false;
915
916        ssize_t i = response->mHeaders.indexOfKey("range");
917        if (i < 0) {
918            // Server doesn't even tell use what range it is going to
919            // play, therefore we won't support seeking.
920            return;
921        }
922
923        AString range = response->mHeaders.valueAt(i);
924        LOGV("Range: %s", range.c_str());
925
926        AString val;
927        CHECK(GetAttribute(range.c_str(), "npt", &val));
928
929        float npt1, npt2;
930        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
931            // This is a live stream and therefore not seekable.
932            return;
933        }
934
935        i = response->mHeaders.indexOfKey("rtp-info");
936        CHECK_GE(i, 0);
937
938        AString rtpInfo = response->mHeaders.valueAt(i);
939        List<AString> streamInfos;
940        SplitString(rtpInfo, ",", &streamInfos);
941
942        int n = 1;
943        for (List<AString>::iterator it = streamInfos.begin();
944             it != streamInfos.end(); ++it) {
945            (*it).trim();
946            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
947
948            CHECK(GetAttribute((*it).c_str(), "url", &val));
949
950            size_t trackIndex = 0;
951            while (trackIndex < mTracks.size()
952                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
953                ++trackIndex;
954            }
955            CHECK_LT(trackIndex, mTracks.size());
956
957            CHECK(GetAttribute((*it).c_str(), "seq", &val));
958
959            char *end;
960            unsigned long seq = strtoul(val.c_str(), &end, 10);
961
962            TrackInfo *info = &mTracks.editItemAt(trackIndex);
963            info->mFirstSeqNumInSegment = seq;
964            info->mNewSegment = true;
965
966            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
967
968            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
969
970            LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
971
972            info->mPacketSource->setNormalPlayTimeMapping(
973                    rtpTime, (int64_t)(npt1 * 1E6));
974
975            ++n;
976        }
977
978        mSeekable = true;
979    }
980
981    sp<APacketSource> getPacketSource(size_t index) {
982        CHECK_GE(index, 0u);
983        CHECK_LT(index, mTracks.size());
984
985        return mTracks.editItemAt(index).mPacketSource;
986    }
987
988    size_t countTracks() const {
989        return mTracks.size();
990    }
991
992private:
993    struct TrackInfo {
           // Per-track state. setupTrack() appends one entry to mTracks for
           // every track whose packet source initialised successfully.

           // URL used in this track's SETUP request; parsePlayResponse()
           // matches it against the "url" attribute of RTP-Info entries.
994        AString mURL;
           // Filled in by ARTPConnection::MakePortPair() when UDP is used.
           // When mUsingInterleavedTCP is set they hold the interleaved
           // channel numbers (2k and 2k + 1) instead.
995        int mRTPSocket;
996        int mRTCPSocket;
           // True if the track was set up with "RTP/AVP/TCP;interleaved=".
997        bool mUsingInterleavedTCP;
           // "seq" value taken from the PLAY response's RTP-Info entry for
           // this track; 0 right after setupTrack().
998        uint32_t mFirstSeqNumInSegment;
           // Set to true by setupTrack() and by parsePlayResponse().
999        bool mNewSegment;
1000
            // RTP time and NTP time (in us) recorded by the most recent
            // onTimeUpdate() for this track. mNTPAnchorUs is -1 after
            // setupTrack(), i.e. until the first time update arrives.
1001        uint32_t mRTPAnchor;
1002        int64_t mNTPAnchorUs;
            // Timescale reported by ASessionDescription::ParseFormatDesc();
            // addMediaTimestamp() divides RTP time differences by it to
            // obtain microseconds.
1003        int32_t mTimeScale;
1004
            // Destination of access units once they carry a "timeUs" value.
1005        sp<APacketSource> mPacketSource;
1006
1007        // Stores packets temporarily while no notion of time
1008        // has been established yet.
1009        List<sp<ABuffer> > mPackets;
1010    };
1011
1012    sp<ALooper> mLooper;
1013    sp<ALooper> mNetLooper;
        // RTSP connection; setupTrack() sends its SETUP requests through it.
1014    sp<ARTSPConnection> mConn;
1015    sp<ARTPConnection> mRTPConn;
        // Session description from which setupTrack() creates the packet
        // sources and reads each track's "a=control" attribute.
1016    sp<ASessionDescription> mSessionDesc;
1017    AString mOriginalSessionURL;  // This one still has user:pass@
1018    AString mSessionURL;
        // Base against which setupTrack() resolves track control URLs.
1019    AString mBaseURL;
        // Sent in the "Session:" header of SETUP requests with index > 1.
1020    AString mSessionID;
1021    bool mSetupTracksSuccessful;
1022    bool mSeekPending;
        // While true, the next completed access unit posts mDoneMsg with
        // result OK and clears this flag.
1023    bool mFirstAccessUnit;
1024
        // Session-wide time anchors. onTimeUpdate() sets mNTPAnchorUs (and
        // mMediaAnchorUs, from mLastMediaTimeUs) while mNTPAnchorUs is still
        // negative. mLastMediaTimeUs tracks the largest media time computed
        // by addMediaTimestamp().
1025    int64_t mNTPAnchorUs;
1026    int64_t mMediaAnchorUs;
1027    int64_t mLastMediaTimeUs;
1028
1029    int64_t mNumAccessUnitsReceived;
        // Set by postAccessUnitTimeoutCheck() when it posts a 'chek'
        // message; mCheckGeneration is attached to that message.
1030    bool mCheckPending;
1031    int32_t mCheckGeneration;
        // When set, setupTrack() requests TCP-interleaved transport instead
        // of UDP.
1032    bool mTryTCPInterleaving;
        // Set by the 'tiou' handler when RTP but no RTCP packets arrived
        // over interleaved TCP.
1033    bool mTryFakeRTCP;
1034    bool mReceivedFirstRTCPPacket;
1035    bool mReceivedFirstRTPPacket;
        // Result of the last parsePlayResponse().
1036    bool mSeekable;
1037
        // One entry per track set up by setupTrack().
1038    Vector<TrackInfo> mTracks;
1039
        // Posted once with result OK from onAccessUnitComplete() and then
        // reset to NULL.
1040    sp<AMessage> mDoneMsg;
1041
1042    void setupTrack(size_t index) {
1043        sp<APacketSource> source =
1044            new APacketSource(mSessionDesc, index);
1045
1046        if (source->initCheck() != OK) {
1047            LOGW("Unsupported format. Ignoring track #%d.", index);
1048
1049            sp<AMessage> reply = new AMessage('setu', id());
1050            reply->setSize("index", index);
1051            reply->setInt32("result", ERROR_UNSUPPORTED);
1052            reply->post();
1053            return;
1054        }
1055
1056        AString url;
1057        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1058
1059        AString trackURL;
1060        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1061
1062        mTracks.push(TrackInfo());
1063        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1064        info->mURL = trackURL;
1065        info->mPacketSource = source;
1066        info->mUsingInterleavedTCP = false;
1067        info->mFirstSeqNumInSegment = 0;
1068        info->mNewSegment = true;
1069        info->mRTPAnchor = 0;
1070        info->mNTPAnchorUs = -1;
1071
1072        unsigned long PT;
1073        AString formatDesc;
1074        AString formatParams;
1075        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1076
1077        int32_t timescale;
1078        int32_t numChannels;
1079        ASessionDescription::ParseFormatDesc(
1080                formatDesc.c_str(), &timescale, &numChannels);
1081
1082        info->mTimeScale = timescale;
1083
1084        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1085
1086        AString request = "SETUP ";
1087        request.append(trackURL);
1088        request.append(" RTSP/1.0\r\n");
1089
1090        if (mTryTCPInterleaving) {
1091            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1092            info->mUsingInterleavedTCP = true;
1093            info->mRTPSocket = interleaveIndex;
1094            info->mRTCPSocket = interleaveIndex + 1;
1095
1096            request.append("Transport: RTP/AVP/TCP;interleaved=");
1097            request.append(interleaveIndex);
1098            request.append("-");
1099            request.append(interleaveIndex + 1);
1100        } else {
1101            unsigned rtpPort;
1102            ARTPConnection::MakePortPair(
1103                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1104
1105            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1106            request.append(rtpPort);
1107            request.append("-");
1108            request.append(rtpPort + 1);
1109        }
1110
1111        request.append("\r\n");
1112
1113        if (index > 1) {
1114            request.append("Session: ");
1115            request.append(mSessionID);
1116            request.append("\r\n");
1117        }
1118
1119        request.append("\r\n");
1120
1121        sp<AMessage> reply = new AMessage('setu', id());
1122        reply->setSize("index", index);
1123        reply->setSize("track-index", mTracks.size() - 1);
1124        mConn->sendRequest(request.c_str(), reply);
1125    }
1126
1127    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1128        out->clear();
1129
1130        if (strncasecmp("rtsp://", baseURL, 7)) {
1131            // Base URL must be absolute
1132            return false;
1133        }
1134
1135        if (!strncasecmp("rtsp://", url, 7)) {
1136            // "url" is already an absolute URL, ignore base URL.
1137            out->setTo(url);
1138            return true;
1139        }
1140
1141        size_t n = strlen(baseURL);
1142        if (baseURL[n - 1] == '/') {
1143            out->setTo(baseURL);
1144            out->append(url);
1145        } else {
1146            const char *slashPos = strrchr(baseURL, '/');
1147
1148            if (slashPos > &baseURL[6]) {
1149                out->setTo(baseURL, slashPos - baseURL);
1150            } else {
1151                out->setTo(baseURL);
1152            }
1153
1154            out->append("/");
1155            out->append(url);
1156        }
1157
1158        return true;
1159    }
1160
1161    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1162        LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1163             trackIndex, rtpTime, ntpTime);
1164
1165        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1166
1167        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1168
1169        track->mRTPAnchor = rtpTime;
1170        track->mNTPAnchorUs = ntpTimeUs;
1171
1172        if (mNTPAnchorUs < 0) {
1173            mNTPAnchorUs = ntpTimeUs;
1174            mMediaAnchorUs = mLastMediaTimeUs;
1175        }
1176    }
1177
1178    void onAccessUnitComplete(
1179            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1180        LOGV("onAccessUnitComplete track %d", trackIndex);
1181
1182        if (mFirstAccessUnit) {
1183            mDoneMsg->setInt32("result", OK);
1184            mDoneMsg->post();
1185            mDoneMsg = NULL;
1186
1187            mFirstAccessUnit = false;
1188        }
1189
1190        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1191
1192        if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
1193            LOGV("storing accessUnit, no time established yet");
1194            track->mPackets.push_back(accessUnit);
1195            return;
1196        }
1197
1198        while (!track->mPackets.empty()) {
1199            sp<ABuffer> accessUnit = *track->mPackets.begin();
1200            track->mPackets.erase(track->mPackets.begin());
1201
1202            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1203                track->mPacketSource->queueAccessUnit(accessUnit);
1204            }
1205        }
1206
1207        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1208            track->mPacketSource->queueAccessUnit(accessUnit);
1209        }
1210    }
1211
1212    bool addMediaTimestamp(
1213            int32_t trackIndex, const TrackInfo *track,
1214            const sp<ABuffer> &accessUnit) {
1215        uint32_t rtpTime;
1216        CHECK(accessUnit->meta()->findInt32(
1217                    "rtp-time", (int32_t *)&rtpTime));
1218
1219        int64_t relRtpTimeUs =
1220            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1221                / track->mTimeScale;
1222
1223        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1224
1225        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1226
1227        if (mediaTimeUs > mLastMediaTimeUs) {
1228            mLastMediaTimeUs = mediaTimeUs;
1229        }
1230
1231        if (mediaTimeUs < 0) {
1232            LOGV("dropping early accessUnit.");
1233            return false;
1234        }
1235
1236        LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1237             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1238
1239        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1240
1241        return true;
1242    }
1243
1244
1245    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1246};
1247
1248}  // namespace android
1249
1250#endif  // MY_HANDLER_H_
1251