MyHandler.h revision a6925e6149faf4a936a5b557a769d117454413d8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
// Steady-state watchdog: if no access units are received within 5 secs,
// assume that the rtp stream has ended and signal end of stream.
// Declared const: the value is only ever read and must not be modified.
static const int64_t kAccessUnitTimeoutUs = 5000000ll;

// Startup watchdog: if no access units arrive for the first 10 secs after
// starting the stream, assume none ever will and signal EOS or switch
// transports.
static const int64_t kStartupTimeoutUs = 10000000ll;
50
51namespace android {
52
53static void MakeUserAgentString(AString *s) {
54    s->setTo("stagefright/1.1 (Linux;Android ");
55
56#if (PROPERTY_VALUE_MAX < 8)
57#error "PROPERTY_VALUE_MAX must be at least 8"
58#endif
59
60    char value[PROPERTY_VALUE_MAX];
61    property_get("ro.build.version.release", value, "Unknown");
62    s->append(value);
63    s->append(")");
64}
65
66static bool GetAttribute(const char *s, const char *key, AString *value) {
67    value->clear();
68
69    size_t keyLen = strlen(key);
70
71    for (;;) {
72        while (isspace(*s)) {
73            ++s;
74        }
75
76        const char *colonPos = strchr(s, ';');
77
78        size_t len =
79            (colonPos == NULL) ? strlen(s) : colonPos - s;
80
81        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
82            value->setTo(&s[keyLen + 1], len - keyLen - 1);
83            return true;
84        }
85
86        if (colonPos == NULL) {
87            return false;
88        }
89
90        s = colonPos + 1;
91    }
92}
93
94struct MyHandler : public AHandler {
95    MyHandler(const char *url, const sp<ALooper> &looper)
96        : mLooper(looper),
97          mNetLooper(new ALooper),
98          mConn(new ARTSPConnection),
99          mRTPConn(new ARTPConnection),
100          mOriginalSessionURL(url),
101          mSessionURL(url),
102          mSetupTracksSuccessful(false),
103          mSeekPending(false),
104          mFirstAccessUnit(true),
105          mNTPAnchorUs(-1),
106          mMediaAnchorUs(-1),
107          mLastMediaTimeUs(0),
108          mNumAccessUnitsReceived(0),
109          mCheckPending(false),
110          mCheckGeneration(0),
111          mTryTCPInterleaving(false),
112          mTryFakeRTCP(false),
113          mReceivedFirstRTCPPacket(false),
114          mReceivedFirstRTPPacket(false),
115          mSeekable(false) {
116        mNetLooper->setName("rtsp net");
117        mNetLooper->start(false /* runOnCallingThread */,
118                          false /* canCallJava */,
119                          PRIORITY_HIGHEST);
120
121        // Strip any authentication info from the session url, we don't
122        // want to transmit user/pass in cleartext.
123        AString host, path, user, pass;
124        unsigned port;
125        CHECK(ARTSPConnection::ParseURL(
126                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
127
128        if (user.size() > 0) {
129            mSessionURL.clear();
130            mSessionURL.append("rtsp://");
131            mSessionURL.append(host);
132            mSessionURL.append(":");
133            mSessionURL.append(StringPrintf("%u", port));
134            mSessionURL.append(path);
135
136            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
137        }
138
139        mSessionHost = host;
140    }
141
142    void connect(const sp<AMessage> &doneMsg) {
143        mDoneMsg = doneMsg;
144
145        mLooper->registerHandler(this);
146        mLooper->registerHandler(mConn);
147        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
148
149        sp<AMessage> notify = new AMessage('biny', id());
150        mConn->observeBinaryData(notify);
151
152        sp<AMessage> reply = new AMessage('conn', id());
153        mConn->connect(mOriginalSessionURL.c_str(), reply);
154    }
155
156    void disconnect(const sp<AMessage> &doneMsg) {
157        mDoneMsg = doneMsg;
158
159        (new AMessage('abor', id()))->post();
160    }
161
162    void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
163        sp<AMessage> msg = new AMessage('seek', id());
164        msg->setInt64("time", timeUs);
165        msg->setMessage("doneMsg", doneMsg);
166        msg->post();
167    }
168
169    int64_t getNormalPlayTimeUs() {
170        int64_t maxTimeUs = 0;
171        for (size_t i = 0; i < mTracks.size(); ++i) {
172            int64_t timeUs = mTracks.editItemAt(i).mPacketSource
173                ->getNormalPlayTimeUs();
174
175            if (i == 0 || timeUs > maxTimeUs) {
176                maxTimeUs = timeUs;
177            }
178        }
179
180        return maxTimeUs;
181    }
182
183    static void addRR(const sp<ABuffer> &buf) {
184        uint8_t *ptr = buf->data() + buf->size();
185        ptr[0] = 0x80 | 0;
186        ptr[1] = 201;  // RR
187        ptr[2] = 0;
188        ptr[3] = 1;
189        ptr[4] = 0xde;  // SSRC
190        ptr[5] = 0xad;
191        ptr[6] = 0xbe;
192        ptr[7] = 0xef;
193
194        buf->setRange(0, buf->size() + 8);
195    }
196
197    static void addSDES(int s, const sp<ABuffer> &buffer) {
198        struct sockaddr_in addr;
199        socklen_t addrSize = sizeof(addr);
200        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
201
202        uint8_t *data = buffer->data() + buffer->size();
203        data[0] = 0x80 | 1;
204        data[1] = 202;  // SDES
205        data[4] = 0xde;  // SSRC
206        data[5] = 0xad;
207        data[6] = 0xbe;
208        data[7] = 0xef;
209
210        size_t offset = 8;
211
212        data[offset++] = 1;  // CNAME
213
214        AString cname = "stagefright@";
215        cname.append(inet_ntoa(addr.sin_addr));
216        data[offset++] = cname.size();
217
218        memcpy(&data[offset], cname.c_str(), cname.size());
219        offset += cname.size();
220
221        data[offset++] = 6;  // TOOL
222
223        AString tool;
224        MakeUserAgentString(&tool);
225
226        data[offset++] = tool.size();
227
228        memcpy(&data[offset], tool.c_str(), tool.size());
229        offset += tool.size();
230
231        data[offset++] = 0;
232
233        if ((offset % 4) > 0) {
234            size_t count = 4 - (offset % 4);
235            switch (count) {
236                case 3:
237                    data[offset++] = 0;
238                case 2:
239                    data[offset++] = 0;
240                case 1:
241                    data[offset++] = 0;
242            }
243        }
244
245        size_t numWords = (offset / 4) - 1;
246        data[2] = numWords >> 8;
247        data[3] = numWords & 0xff;
248
249        buffer->setRange(buffer->offset(), buffer->size() + offset);
250    }
251
252    // In case we're behind NAT, fire off two UDP packets to the remote
253    // rtp/rtcp ports to poke a hole into the firewall for future incoming
254    // packets. We're going to send an RR/SDES RTCP packet to both of them.
255    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
256        struct sockaddr_in addr;
257        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
258        addr.sin_family = AF_INET;
259
260        AString source;
261        AString server_port;
262        if (!GetAttribute(transport.c_str(),
263                          "source",
264                          &source)) {
265            LOGW("Missing 'source' field in Transport response. Using "
266                 "RTSP endpoint address.");
267
268            struct hostent *ent = gethostbyname(mSessionHost.c_str());
269            if (ent == NULL) {
270                LOGE("Failed to look up address of session host '%s'",
271                     mSessionHost.c_str());
272
273                return false;
274            }
275
276            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
277        } else {
278            addr.sin_addr.s_addr = inet_addr(source.c_str());
279        }
280
281        if (!GetAttribute(transport.c_str(),
282                                 "server_port",
283                                 &server_port)) {
284            LOGI("Missing 'server_port' field in Transport response.");
285            return false;
286        }
287
288        int rtpPort, rtcpPort;
289        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
290                || rtpPort <= 0 || rtpPort > 65535
291                || rtcpPort <=0 || rtcpPort > 65535
292                || rtcpPort != rtpPort + 1) {
293            LOGE("Server picked invalid RTP/RTCP port pair %s,"
294                 " RTP port must be even, RTCP port must be one higher.",
295                 server_port.c_str());
296
297            return false;
298        }
299
300        if (rtpPort & 1) {
301            LOGW("Server picked an odd RTP port, it should've picked an "
302                 "even one, we'll let it pass for now, but this may break "
303                 "in the future.");
304        }
305
306        if (addr.sin_addr.s_addr == INADDR_NONE) {
307            return true;
308        }
309
310        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
311            // No firewalls to traverse on the loopback interface.
312            return true;
313        }
314
315        // Make up an RR/SDES RTCP packet.
316        sp<ABuffer> buf = new ABuffer(65536);
317        buf->setRange(0, 0);
318        addRR(buf);
319        addSDES(rtpSocket, buf);
320
321        addr.sin_port = htons(rtpPort);
322
323        ssize_t n = sendto(
324                rtpSocket, buf->data(), buf->size(), 0,
325                (const sockaddr *)&addr, sizeof(addr));
326
327        if (n < (ssize_t)buf->size()) {
328            LOGE("failed to poke a hole for RTP packets");
329            return false;
330        }
331
332        addr.sin_port = htons(rtcpPort);
333
334        n = sendto(
335                rtcpSocket, buf->data(), buf->size(), 0,
336                (const sockaddr *)&addr, sizeof(addr));
337
338        if (n < (ssize_t)buf->size()) {
339            LOGE("failed to poke a hole for RTCP packets");
340            return false;
341        }
342
343        LOGV("successfully poked holes.");
344
345        return true;
346    }
347
348    virtual void onMessageReceived(const sp<AMessage> &msg) {
349        switch (msg->what()) {
350            case 'conn':
351            {
352                int32_t result;
353                CHECK(msg->findInt32("result", &result));
354
355                LOGI("connection request completed with result %d (%s)",
356                     result, strerror(-result));
357
358                if (result == OK) {
359                    AString request;
360                    request = "DESCRIBE ";
361                    request.append(mSessionURL);
362                    request.append(" RTSP/1.0\r\n");
363                    request.append("Accept: application/sdp\r\n");
364                    request.append("\r\n");
365
366                    sp<AMessage> reply = new AMessage('desc', id());
367                    mConn->sendRequest(request.c_str(), reply);
368                } else {
369                    (new AMessage('disc', id()))->post();
370                }
371                break;
372            }
373
374            case 'disc':
375            {
376                int32_t reconnect;
377                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
378                    sp<AMessage> reply = new AMessage('conn', id());
379                    mConn->connect(mOriginalSessionURL.c_str(), reply);
380                } else {
381                    (new AMessage('quit', id()))->post();
382                }
383                break;
384            }
385
386            case 'desc':
387            {
388                int32_t result;
389                CHECK(msg->findInt32("result", &result));
390
391                LOGI("DESCRIBE completed with result %d (%s)",
392                     result, strerror(-result));
393
394                if (result == OK) {
395                    sp<RefBase> obj;
396                    CHECK(msg->findObject("response", &obj));
397                    sp<ARTSPResponse> response =
398                        static_cast<ARTSPResponse *>(obj.get());
399
400                    if (response->mStatusCode == 302) {
401                        ssize_t i = response->mHeaders.indexOfKey("location");
402                        CHECK_GE(i, 0);
403
404                        mSessionURL = response->mHeaders.valueAt(i);
405
406                        AString request;
407                        request = "DESCRIBE ";
408                        request.append(mSessionURL);
409                        request.append(" RTSP/1.0\r\n");
410                        request.append("Accept: application/sdp\r\n");
411                        request.append("\r\n");
412
413                        sp<AMessage> reply = new AMessage('desc', id());
414                        mConn->sendRequest(request.c_str(), reply);
415                        break;
416                    }
417
418                    if (response->mStatusCode != 200) {
419                        result = UNKNOWN_ERROR;
420                    } else {
421                        mSessionDesc = new ASessionDescription;
422
423                        mSessionDesc->setTo(
424                                response->mContent->data(),
425                                response->mContent->size());
426
427                        if (!mSessionDesc->isValid()) {
428                            LOGE("Failed to parse session description.");
429                            result = ERROR_MALFORMED;
430                        } else {
431                            ssize_t i = response->mHeaders.indexOfKey("content-base");
432                            if (i >= 0) {
433                                mBaseURL = response->mHeaders.valueAt(i);
434                            } else {
435                                i = response->mHeaders.indexOfKey("content-location");
436                                if (i >= 0) {
437                                    mBaseURL = response->mHeaders.valueAt(i);
438                                } else {
439                                    mBaseURL = mSessionURL;
440                                }
441                            }
442
443                            if (!mBaseURL.startsWith("rtsp://")) {
444                                // Some misbehaving servers specify a relative
445                                // URL in one of the locations above, combine
446                                // it with the absolute session URL to get
447                                // something usable...
448
449                                LOGW("Server specified a non-absolute base URL"
450                                     ", combining it with the session URL to "
451                                     "get something usable...");
452
453                                AString tmp;
454                                CHECK(MakeURL(
455                                            mSessionURL.c_str(),
456                                            mBaseURL.c_str(),
457                                            &tmp));
458
459                                mBaseURL = tmp;
460                            }
461
462                            CHECK_GT(mSessionDesc->countTracks(), 1u);
463                            setupTrack(1);
464                        }
465                    }
466                }
467
468                if (result != OK) {
469                    sp<AMessage> reply = new AMessage('disc', id());
470                    mConn->disconnect(reply);
471                }
472                break;
473            }
474
475            case 'setu':
476            {
477                size_t index;
478                CHECK(msg->findSize("index", &index));
479
480                TrackInfo *track = NULL;
481                size_t trackIndex;
482                if (msg->findSize("track-index", &trackIndex)) {
483                    track = &mTracks.editItemAt(trackIndex);
484                }
485
486                int32_t result;
487                CHECK(msg->findInt32("result", &result));
488
489                LOGI("SETUP(%d) completed with result %d (%s)",
490                     index, result, strerror(-result));
491
492                if (result == OK) {
493                    CHECK(track != NULL);
494
495                    sp<RefBase> obj;
496                    CHECK(msg->findObject("response", &obj));
497                    sp<ARTSPResponse> response =
498                        static_cast<ARTSPResponse *>(obj.get());
499
500                    if (response->mStatusCode != 200) {
501                        result = UNKNOWN_ERROR;
502                    } else {
503                        ssize_t i = response->mHeaders.indexOfKey("session");
504                        CHECK_GE(i, 0);
505
506                        mSessionID = response->mHeaders.valueAt(i);
507                        i = mSessionID.find(";");
508                        if (i >= 0) {
509                            // Remove options, i.e. ";timeout=90"
510                            mSessionID.erase(i, mSessionID.size() - i);
511                        }
512
513                        sp<AMessage> notify = new AMessage('accu', id());
514                        notify->setSize("track-index", trackIndex);
515
516                        i = response->mHeaders.indexOfKey("transport");
517                        CHECK_GE(i, 0);
518
519                        if (!track->mUsingInterleavedTCP) {
520                            AString transport = response->mHeaders.valueAt(i);
521
522                            // We are going to continue even if we were
523                            // unable to poke a hole into the firewall...
524                            pokeAHole(
525                                    track->mRTPSocket,
526                                    track->mRTCPSocket,
527                                    transport);
528                        }
529
530                        mRTPConn->addStream(
531                                track->mRTPSocket, track->mRTCPSocket,
532                                mSessionDesc, index,
533                                notify, track->mUsingInterleavedTCP);
534
535                        mSetupTracksSuccessful = true;
536                    }
537                }
538
539                if (result != OK) {
540                    if (track) {
541                        if (!track->mUsingInterleavedTCP) {
542                            close(track->mRTPSocket);
543                            close(track->mRTCPSocket);
544                        }
545
546                        mTracks.removeItemsAt(trackIndex);
547                    }
548                }
549
550                ++index;
551                if (index < mSessionDesc->countTracks()) {
552                    setupTrack(index);
553                } else if (mSetupTracksSuccessful) {
554                    AString request = "PLAY ";
555                    request.append(mSessionURL);
556                    request.append(" RTSP/1.0\r\n");
557
558                    request.append("Session: ");
559                    request.append(mSessionID);
560                    request.append("\r\n");
561
562                    request.append("\r\n");
563
564                    sp<AMessage> reply = new AMessage('play', id());
565                    mConn->sendRequest(request.c_str(), reply);
566                } else {
567                    sp<AMessage> reply = new AMessage('disc', id());
568                    mConn->disconnect(reply);
569                }
570                break;
571            }
572
573            case 'play':
574            {
575                int32_t result;
576                CHECK(msg->findInt32("result", &result));
577
578                LOGI("PLAY completed with result %d (%s)",
579                     result, strerror(-result));
580
581                if (result == OK) {
582                    sp<RefBase> obj;
583                    CHECK(msg->findObject("response", &obj));
584                    sp<ARTSPResponse> response =
585                        static_cast<ARTSPResponse *>(obj.get());
586
587                    if (response->mStatusCode != 200) {
588                        result = UNKNOWN_ERROR;
589                    } else {
590                        parsePlayResponse(response);
591
592                        sp<AMessage> timeout = new AMessage('tiou', id());
593                        timeout->post(kStartupTimeoutUs);
594                    }
595                }
596
597                if (result != OK) {
598                    sp<AMessage> reply = new AMessage('disc', id());
599                    mConn->disconnect(reply);
600                }
601
602                break;
603            }
604
605            case 'abor':
606            {
607                for (size_t i = 0; i < mTracks.size(); ++i) {
608                    TrackInfo *info = &mTracks.editItemAt(i);
609
610                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
611
612                    if (!info->mUsingInterleavedTCP) {
613                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
614
615                        close(info->mRTPSocket);
616                        close(info->mRTCPSocket);
617                    }
618                }
619                mTracks.clear();
620                mSetupTracksSuccessful = false;
621                mSeekPending = false;
622                mFirstAccessUnit = true;
623                mNTPAnchorUs = -1;
624                mMediaAnchorUs = -1;
625                mNumAccessUnitsReceived = 0;
626                mReceivedFirstRTCPPacket = false;
627                mReceivedFirstRTPPacket = false;
628                mSeekable = false;
629
630                sp<AMessage> reply = new AMessage('tear', id());
631
632                int32_t reconnect;
633                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
634                    reply->setInt32("reconnect", true);
635                }
636
637                AString request;
638                request = "TEARDOWN ";
639
640                // XXX should use aggregate url from SDP here...
641                request.append(mSessionURL);
642                request.append(" RTSP/1.0\r\n");
643
644                request.append("Session: ");
645                request.append(mSessionID);
646                request.append("\r\n");
647
648                request.append("\r\n");
649
650                mConn->sendRequest(request.c_str(), reply);
651                break;
652            }
653
654            case 'tear':
655            {
656                int32_t result;
657                CHECK(msg->findInt32("result", &result));
658
659                LOGI("TEARDOWN completed with result %d (%s)",
660                     result, strerror(-result));
661
662                sp<AMessage> reply = new AMessage('disc', id());
663
664                int32_t reconnect;
665                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
666                    reply->setInt32("reconnect", true);
667                }
668
669                mConn->disconnect(reply);
670                break;
671            }
672
673            case 'quit':
674            {
675                if (mDoneMsg != NULL) {
676                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
677                    mDoneMsg->post();
678                    mDoneMsg = NULL;
679                }
680                break;
681            }
682
683            case 'chek':
684            {
685                int32_t generation;
686                CHECK(msg->findInt32("generation", &generation));
687                if (generation != mCheckGeneration) {
688                    // This is an outdated message. Ignore.
689                    break;
690                }
691
692                if (mNumAccessUnitsReceived == 0) {
693                    LOGI("stream ended? aborting.");
694                    (new AMessage('abor', id()))->post();
695                    break;
696                }
697
698                mNumAccessUnitsReceived = 0;
699                msg->post(kAccessUnitTimeoutUs);
700                break;
701            }
702
703            case 'accu':
704            {
705                int32_t timeUpdate;
706                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
707                    size_t trackIndex;
708                    CHECK(msg->findSize("track-index", &trackIndex));
709
710                    uint32_t rtpTime;
711                    uint64_t ntpTime;
712                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
713                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
714
715                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
716                    break;
717                }
718
719                int32_t first;
720                if (msg->findInt32("first-rtcp", &first)) {
721                    mReceivedFirstRTCPPacket = true;
722                    break;
723                }
724
725                if (msg->findInt32("first-rtp", &first)) {
726                    mReceivedFirstRTPPacket = true;
727                    break;
728                }
729
730                ++mNumAccessUnitsReceived;
731                postAccessUnitTimeoutCheck();
732
733                size_t trackIndex;
734                CHECK(msg->findSize("track-index", &trackIndex));
735
736                if (trackIndex >= mTracks.size()) {
737                    LOGV("late packets ignored.");
738                    break;
739                }
740
741                TrackInfo *track = &mTracks.editItemAt(trackIndex);
742
743                int32_t eos;
744                if (msg->findInt32("eos", &eos)) {
745                    LOGI("received BYE on track index %d", trackIndex);
746#if 0
747                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
748#endif
749                    return;
750                }
751
752                sp<RefBase> obj;
753                CHECK(msg->findObject("access-unit", &obj));
754
755                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
756
757                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
758
759                if (mSeekPending) {
760                    LOGV("we're seeking, dropping stale packet.");
761                    break;
762                }
763
764                if (seqNum < track->mFirstSeqNumInSegment) {
765                    LOGV("dropping stale access-unit (%d < %d)",
766                         seqNum, track->mFirstSeqNumInSegment);
767                    break;
768                }
769
770                if (track->mNewSegment) {
771                    track->mNewSegment = false;
772                }
773
774                onAccessUnitComplete(trackIndex, accessUnit);
775                break;
776            }
777
778            case 'seek':
779            {
780                sp<AMessage> doneMsg;
781                CHECK(msg->findMessage("doneMsg", &doneMsg));
782
783                if (mSeekPending) {
784                    doneMsg->post();
785                    break;
786                }
787
788                if (!mSeekable) {
789                    LOGW("This is a live stream, ignoring seek request.");
790                    doneMsg->post();
791                    break;
792                }
793
794                int64_t timeUs;
795                CHECK(msg->findInt64("time", &timeUs));
796
797                mSeekPending = true;
798
799                // Disable the access unit timeout until we resumed
800                // playback again.
801                mCheckPending = true;
802                ++mCheckGeneration;
803
804                AString request = "PAUSE ";
805                request.append(mSessionURL);
806                request.append(" RTSP/1.0\r\n");
807
808                request.append("Session: ");
809                request.append(mSessionID);
810                request.append("\r\n");
811
812                request.append("\r\n");
813
814                sp<AMessage> reply = new AMessage('see1', id());
815                reply->setInt64("time", timeUs);
816                reply->setMessage("doneMsg", doneMsg);
817                mConn->sendRequest(request.c_str(), reply);
818                break;
819            }
820
821            case 'see1':
822            {
823                // Session is paused now.
824                for (size_t i = 0; i < mTracks.size(); ++i) {
825                    TrackInfo *info = &mTracks.editItemAt(i);
826
827                    info->mPacketSource->flushQueue();
828                    info->mRTPAnchor = 0;
829                    info->mNTPAnchorUs = -1;
830                }
831
832                mNTPAnchorUs = -1;
833
834                int64_t timeUs;
835                CHECK(msg->findInt64("time", &timeUs));
836
837                AString request = "PLAY ";
838                request.append(mSessionURL);
839                request.append(" RTSP/1.0\r\n");
840
841                request.append("Session: ");
842                request.append(mSessionID);
843                request.append("\r\n");
844
845                request.append(
846                        StringPrintf(
847                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
848
849                request.append("\r\n");
850
851                sp<AMessage> doneMsg;
852                CHECK(msg->findMessage("doneMsg", &doneMsg));
853
854                sp<AMessage> reply = new AMessage('see2', id());
855                reply->setMessage("doneMsg", doneMsg);
856                mConn->sendRequest(request.c_str(), reply);
857                break;
858            }
859
860            case 'see2':
861            {
862                CHECK(mSeekPending);
863
864                int32_t result;
865                CHECK(msg->findInt32("result", &result));
866
867                LOGI("PLAY completed with result %d (%s)",
868                     result, strerror(-result));
869
870                mCheckPending = false;
871                postAccessUnitTimeoutCheck();
872
873                if (result == OK) {
874                    sp<RefBase> obj;
875                    CHECK(msg->findObject("response", &obj));
876                    sp<ARTSPResponse> response =
877                        static_cast<ARTSPResponse *>(obj.get());
878
879                    if (response->mStatusCode != 200) {
880                        result = UNKNOWN_ERROR;
881                    } else {
882                        parsePlayResponse(response);
883
884                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
885                        CHECK_GE(i, 0);
886
887                        LOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
888
889                        LOGI("seek completed.");
890                    }
891                }
892
893                if (result != OK) {
894                    LOGE("seek failed, aborting.");
895                    (new AMessage('abor', id()))->post();
896                }
897
898                mSeekPending = false;
899
900                sp<AMessage> doneMsg;
901                CHECK(msg->findMessage("doneMsg", &doneMsg));
902
903                doneMsg->post();
904                break;
905            }
906
907            case 'biny':
908            {
909                sp<RefBase> obj;
910                CHECK(msg->findObject("buffer", &obj));
911                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
912
913                int32_t index;
914                CHECK(buffer->meta()->findInt32("index", &index));
915
916                mRTPConn->injectPacket(index, buffer);
917                break;
918            }
919
920            case 'tiou':
921            {
922                if (!mReceivedFirstRTCPPacket) {
923                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
924                        LOGW("We received RTP packets but no RTCP packets, "
925                             "using fake timestamps.");
926
927                        mTryFakeRTCP = true;
928
929                        mReceivedFirstRTCPPacket = true;
930
931                        fakeTimestamps();
932                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
933                        LOGW("Never received any data, switching transports.");
934
935                        mTryTCPInterleaving = true;
936
937                        sp<AMessage> msg = new AMessage('abor', id());
938                        msg->setInt32("reconnect", true);
939                        msg->post();
940                    } else {
941                        LOGW("Never received any data, disconnecting.");
942                        (new AMessage('abor', id()))->post();
943                    }
944                }
945                break;
946            }
947
948            default:
949                TRESPASS();
950                break;
951        }
952    }
953
954    void postAccessUnitTimeoutCheck() {
955        if (mCheckPending) {
956            return;
957        }
958
959        mCheckPending = true;
960        sp<AMessage> check = new AMessage('chek', id());
961        check->setInt32("generation", mCheckGeneration);
962        check->post(kAccessUnitTimeoutUs);
963    }
964
965    static void SplitString(
966            const AString &s, const char *separator, List<AString> *items) {
967        items->clear();
968        size_t start = 0;
969        while (start < s.size()) {
970            ssize_t offset = s.find(separator, start);
971
972            if (offset < 0) {
973                items->push_back(AString(s, start, s.size() - start));
974                break;
975            }
976
977            items->push_back(AString(s, start, offset - start));
978            start = offset + strlen(separator);
979        }
980    }
981
982    void parsePlayResponse(const sp<ARTSPResponse> &response) {
983        mSeekable = false;
984
985        ssize_t i = response->mHeaders.indexOfKey("range");
986        if (i < 0) {
987            // Server doesn't even tell use what range it is going to
988            // play, therefore we won't support seeking.
989            return;
990        }
991
992        AString range = response->mHeaders.valueAt(i);
993        LOGV("Range: %s", range.c_str());
994
995        AString val;
996        CHECK(GetAttribute(range.c_str(), "npt", &val));
997
998        bool seekable = true;
999
1000        float npt1, npt2;
1001        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1002            // This is a live stream and therefore not seekable.
1003            seekable = false;
1004        }
1005
1006        i = response->mHeaders.indexOfKey("rtp-info");
1007        CHECK_GE(i, 0);
1008
1009        AString rtpInfo = response->mHeaders.valueAt(i);
1010        List<AString> streamInfos;
1011        SplitString(rtpInfo, ",", &streamInfos);
1012
1013        int n = 1;
1014        for (List<AString>::iterator it = streamInfos.begin();
1015             it != streamInfos.end(); ++it) {
1016            (*it).trim();
1017            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
1018
1019            CHECK(GetAttribute((*it).c_str(), "url", &val));
1020
1021            size_t trackIndex = 0;
1022            while (trackIndex < mTracks.size()
1023                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1024                ++trackIndex;
1025            }
1026            CHECK_LT(trackIndex, mTracks.size());
1027
1028            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1029
1030            char *end;
1031            unsigned long seq = strtoul(val.c_str(), &end, 10);
1032
1033            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1034            info->mFirstSeqNumInSegment = seq;
1035            info->mNewSegment = true;
1036
1037            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1038
1039            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1040
1041            LOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1042
1043            info->mPacketSource->setNormalPlayTimeMapping(
1044                    rtpTime, (int64_t)(npt1 * 1E6));
1045
1046            ++n;
1047        }
1048
1049        mSeekable = seekable;
1050    }
1051
1052    sp<APacketSource> getPacketSource(size_t index) {
1053        CHECK_GE(index, 0u);
1054        CHECK_LT(index, mTracks.size());
1055
1056        return mTracks.editItemAt(index).mPacketSource;
1057    }
1058
1059    size_t countTracks() const {
1060        return mTracks.size();
1061    }
1062
1063private:
    // Per-track state. setupTrack() appends one entry to mTracks for every
    // track it sends a SETUP request for.
    struct TrackInfo {
        // Absolute control URL of the track as built by MakeURL(); matched
        // against the "url" attribute of the PLAY response's rtp-info.
        AString mURL;
        // Filled in by ARTPConnection::MakePortPair() for UDP transport.
        // With interleaved TCP these hold the interleave channel numbers
        // (2 * k and 2 * k + 1 for the k-th track) instead.
        int mRTPSocket;
        int mRTCPSocket;
        // True if the SETUP request asked for RTP/AVP/TCP interleaving.
        bool mUsingInterleavedTCP;
        // "seq" value from the most recent PLAY response (0 after setup).
        uint32_t mFirstSeqNumInSegment;
        // Set on setup and again by every parsed PLAY response.
        bool mNewSegment;

        // RTP time and corresponding NTP time (in us) of the last time
        // update for this track; mNTPAnchorUs is -1 while none is known.
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // RTP clock ticks per second, used to convert RTP time deltas to
        // microseconds. Comes from ASessionDescription::ParseFormatDesc().
        int32_t mTimeScale;

        // Receives the timestamped access units of this track.
        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet.
        List<sp<ABuffer> > mPackets;
    };
1082
    sp<ALooper> mLooper;
    sp<ALooper> mNetLooper;
    // RTSP control connection; requests are sent through sendRequest().
    sp<ARTSPConnection> mConn;
    // RTP connection; interleaved binary packets are injected into it.
    sp<ARTPConnection> mRTPConn;
    // Session description the tracks' packet sources are created from.
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // URL used in the request line of PLAY requests.
    AString mSessionURL;
    AString mSessionHost;
    // Base against which per-track control URLs are resolved.
    AString mBaseURL;
    // Value sent in the "Session:" header of requests.
    AString mSessionID;
    bool mSetupTracksSuccessful;
    // True while a seek (PLAY with a new range) is in flight.
    bool mSeekPending;
    // True until the first access unit completes; at that point mDoneMsg
    // is posted with result OK.
    bool mFirstAccessUnit;

    // Session-wide time anchors: NTP time (us) of the first time update
    // (-1 while unknown) and the media time it corresponds to.
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    // Largest media time (us) computed for any access unit so far.
    int64_t mLastMediaTimeUs;

    int64_t mNumAccessUnitsReceived;
    // True while a 'chek' timeout message is outstanding.
    bool mCheckPending;
    // Attached to every 'chek' message as "generation".
    int32_t mCheckGeneration;
    // When set, setupTrack() requests interleaved TCP instead of UDP.
    bool mTryTCPInterleaving;
    // Set once fake timestamps have been substituted for missing RTCP.
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // Result of parsing the most recent PLAY response.
    bool mSeekable;

    // One entry per track a SETUP request was issued for.
    Vector<TrackInfo> mTracks;

    // Posted (with result OK) when the first access unit completes.
    sp<AMessage> mDoneMsg;
1113
1114    void setupTrack(size_t index) {
1115        sp<APacketSource> source =
1116            new APacketSource(mSessionDesc, index);
1117
1118        if (source->initCheck() != OK) {
1119            LOGW("Unsupported format. Ignoring track #%d.", index);
1120
1121            sp<AMessage> reply = new AMessage('setu', id());
1122            reply->setSize("index", index);
1123            reply->setInt32("result", ERROR_UNSUPPORTED);
1124            reply->post();
1125            return;
1126        }
1127
1128        AString url;
1129        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1130
1131        AString trackURL;
1132        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1133
1134        mTracks.push(TrackInfo());
1135        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1136        info->mURL = trackURL;
1137        info->mPacketSource = source;
1138        info->mUsingInterleavedTCP = false;
1139        info->mFirstSeqNumInSegment = 0;
1140        info->mNewSegment = true;
1141        info->mRTPAnchor = 0;
1142        info->mNTPAnchorUs = -1;
1143
1144        unsigned long PT;
1145        AString formatDesc;
1146        AString formatParams;
1147        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1148
1149        int32_t timescale;
1150        int32_t numChannels;
1151        ASessionDescription::ParseFormatDesc(
1152                formatDesc.c_str(), &timescale, &numChannels);
1153
1154        info->mTimeScale = timescale;
1155
1156        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1157
1158        AString request = "SETUP ";
1159        request.append(trackURL);
1160        request.append(" RTSP/1.0\r\n");
1161
1162        if (mTryTCPInterleaving) {
1163            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1164            info->mUsingInterleavedTCP = true;
1165            info->mRTPSocket = interleaveIndex;
1166            info->mRTCPSocket = interleaveIndex + 1;
1167
1168            request.append("Transport: RTP/AVP/TCP;interleaved=");
1169            request.append(interleaveIndex);
1170            request.append("-");
1171            request.append(interleaveIndex + 1);
1172        } else {
1173            unsigned rtpPort;
1174            ARTPConnection::MakePortPair(
1175                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1176
1177            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1178            request.append(rtpPort);
1179            request.append("-");
1180            request.append(rtpPort + 1);
1181        }
1182
1183        request.append("\r\n");
1184
1185        if (index > 1) {
1186            request.append("Session: ");
1187            request.append(mSessionID);
1188            request.append("\r\n");
1189        }
1190
1191        request.append("\r\n");
1192
1193        sp<AMessage> reply = new AMessage('setu', id());
1194        reply->setSize("index", index);
1195        reply->setSize("track-index", mTracks.size() - 1);
1196        mConn->sendRequest(request.c_str(), reply);
1197    }
1198
1199    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1200        out->clear();
1201
1202        if (strncasecmp("rtsp://", baseURL, 7)) {
1203            // Base URL must be absolute
1204            return false;
1205        }
1206
1207        if (!strncasecmp("rtsp://", url, 7)) {
1208            // "url" is already an absolute URL, ignore base URL.
1209            out->setTo(url);
1210            return true;
1211        }
1212
1213        size_t n = strlen(baseURL);
1214        if (baseURL[n - 1] == '/') {
1215            out->setTo(baseURL);
1216            out->append(url);
1217        } else {
1218            const char *slashPos = strrchr(baseURL, '/');
1219
1220            if (slashPos > &baseURL[6]) {
1221                out->setTo(baseURL, slashPos - baseURL);
1222            } else {
1223                out->setTo(baseURL);
1224            }
1225
1226            out->append("/");
1227            out->append(url);
1228        }
1229
1230        return true;
1231    }
1232
1233    void fakeTimestamps() {
1234        for (size_t i = 0; i < mTracks.size(); ++i) {
1235            onTimeUpdate(i, 0, 0ll);
1236        }
1237    }
1238
1239    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1240        LOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1241             trackIndex, rtpTime, ntpTime);
1242
1243        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1244
1245        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1246
1247        track->mRTPAnchor = rtpTime;
1248        track->mNTPAnchorUs = ntpTimeUs;
1249
1250        if (mNTPAnchorUs < 0) {
1251            mNTPAnchorUs = ntpTimeUs;
1252            mMediaAnchorUs = mLastMediaTimeUs;
1253        }
1254    }
1255
1256    void onAccessUnitComplete(
1257            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1258        LOGV("onAccessUnitComplete track %d", trackIndex);
1259
1260        if (mFirstAccessUnit) {
1261            mDoneMsg->setInt32("result", OK);
1262            mDoneMsg->post();
1263            mDoneMsg = NULL;
1264
1265            mFirstAccessUnit = false;
1266        }
1267
1268        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1269
1270        if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
1271            LOGV("storing accessUnit, no time established yet");
1272            track->mPackets.push_back(accessUnit);
1273            return;
1274        }
1275
1276        while (!track->mPackets.empty()) {
1277            sp<ABuffer> accessUnit = *track->mPackets.begin();
1278            track->mPackets.erase(track->mPackets.begin());
1279
1280            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1281                track->mPacketSource->queueAccessUnit(accessUnit);
1282            }
1283        }
1284
1285        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1286            track->mPacketSource->queueAccessUnit(accessUnit);
1287        }
1288    }
1289
1290    bool addMediaTimestamp(
1291            int32_t trackIndex, const TrackInfo *track,
1292            const sp<ABuffer> &accessUnit) {
1293        uint32_t rtpTime;
1294        CHECK(accessUnit->meta()->findInt32(
1295                    "rtp-time", (int32_t *)&rtpTime));
1296
1297        int64_t relRtpTimeUs =
1298            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1299                / track->mTimeScale;
1300
1301        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1302
1303        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1304
1305        if (mediaTimeUs > mLastMediaTimeUs) {
1306            mLastMediaTimeUs = mediaTimeUs;
1307        }
1308
1309        if (mediaTimeUs < 0) {
1310            LOGV("dropping early accessUnit.");
1311            return false;
1312        }
1313
1314        LOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1315             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1316
1317        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1318
1319        return true;
1320    }
1321
1322
1323    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1324};
1325
1326}  // namespace android
1327
1328#endif  // MY_HANDLER_H_
1329