MyHandler.h revision ea47cb4edea4426b0da7807db10548ddae7104f2
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43// If no access units are received within 3 secs, assume that the rtp
44// stream has ended and signal end of stream.
45static int64_t kAccessUnitTimeoutUs = 3000000ll;
46
47// If no access units arrive for the first 10 secs after starting the
48// stream, assume none ever will and signal EOS or switch transports.
49static int64_t kStartupTimeoutUs = 10000000ll;
50
51namespace android {
52
53static void MakeUserAgentString(AString *s) {
54    s->setTo("stagefright/1.1 (Linux;Android ");
55
56#if (PROPERTY_VALUE_MAX < 8)
57#error "PROPERTY_VALUE_MAX must be at least 8"
58#endif
59
60    char value[PROPERTY_VALUE_MAX];
61    property_get("ro.build.version.release", value, "Unknown");
62    s->append(value);
63    s->append(")");
64}
65
66static bool GetAttribute(const char *s, const char *key, AString *value) {
67    value->clear();
68
69    size_t keyLen = strlen(key);
70
71    for (;;) {
72        while (isspace(*s)) {
73            ++s;
74        }
75
76        const char *colonPos = strchr(s, ';');
77
78        size_t len =
79            (colonPos == NULL) ? strlen(s) : colonPos - s;
80
81        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
82            value->setTo(&s[keyLen + 1], len - keyLen - 1);
83            return true;
84        }
85
86        if (colonPos == NULL) {
87            return false;
88        }
89
90        s = colonPos + 1;
91    }
92}
93
94struct MyHandler : public AHandler {
95    MyHandler(const char *url, const sp<ALooper> &looper)
96        : mLooper(looper),
97          mNetLooper(new ALooper),
98          mConn(new ARTSPConnection),
99          mRTPConn(new ARTPConnection),
100          mOriginalSessionURL(url),
101          mSessionURL(url),
102          mSetupTracksSuccessful(false),
103          mSeekPending(false),
104          mFirstAccessUnit(true),
105          mFirstAccessUnitNTP(0),
106          mNumAccessUnitsReceived(0),
107          mCheckPending(false),
108          mCheckGeneration(0),
109          mTryTCPInterleaving(false),
110          mTryFakeRTCP(false),
111          mReceivedFirstRTCPPacket(false),
112          mReceivedFirstRTPPacket(false),
113          mSeekable(false) {
114        mNetLooper->setName("rtsp net");
115        mNetLooper->start(false /* runOnCallingThread */,
116                          false /* canCallJava */,
117                          PRIORITY_HIGHEST);
118
119        // Strip any authentication info from the session url, we don't
120        // want to transmit user/pass in cleartext.
121        AString host, path, user, pass;
122        unsigned port;
123        CHECK(ARTSPConnection::ParseURL(
124                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
125
126        if (user.size() > 0) {
127            mSessionURL.clear();
128            mSessionURL.append("rtsp://");
129            mSessionURL.append(host);
130            mSessionURL.append(":");
131            mSessionURL.append(StringPrintf("%u", port));
132            mSessionURL.append(path);
133
134            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
135        }
136
137        mSessionHost = host;
138    }
139
140    void connect(const sp<AMessage> &doneMsg) {
141        mDoneMsg = doneMsg;
142
143        mLooper->registerHandler(this);
144        mLooper->registerHandler(mConn);
145        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
146
147        sp<AMessage> notify = new AMessage('biny', id());
148        mConn->observeBinaryData(notify);
149
150        sp<AMessage> reply = new AMessage('conn', id());
151        mConn->connect(mOriginalSessionURL.c_str(), reply);
152    }
153
154    void disconnect(const sp<AMessage> &doneMsg) {
155        mDoneMsg = doneMsg;
156
157        (new AMessage('abor', id()))->post();
158    }
159
160    void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
161        sp<AMessage> msg = new AMessage('seek', id());
162        msg->setInt64("time", timeUs);
163        msg->setMessage("doneMsg", doneMsg);
164        msg->post();
165    }
166
167    int64_t getNormalPlayTimeUs() {
168        int64_t maxTimeUs = 0;
169        for (size_t i = 0; i < mTracks.size(); ++i) {
170            int64_t timeUs = mTracks.editItemAt(i).mPacketSource
171                ->getNormalPlayTimeUs();
172
173            if (i == 0 || timeUs > maxTimeUs) {
174                maxTimeUs = timeUs;
175            }
176        }
177
178        return maxTimeUs;
179    }
180
181    static void addRR(const sp<ABuffer> &buf) {
182        uint8_t *ptr = buf->data() + buf->size();
183        ptr[0] = 0x80 | 0;
184        ptr[1] = 201;  // RR
185        ptr[2] = 0;
186        ptr[3] = 1;
187        ptr[4] = 0xde;  // SSRC
188        ptr[5] = 0xad;
189        ptr[6] = 0xbe;
190        ptr[7] = 0xef;
191
192        buf->setRange(0, buf->size() + 8);
193    }
194
195    static void addSDES(int s, const sp<ABuffer> &buffer) {
196        struct sockaddr_in addr;
197        socklen_t addrSize = sizeof(addr);
198        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
199
200        uint8_t *data = buffer->data() + buffer->size();
201        data[0] = 0x80 | 1;
202        data[1] = 202;  // SDES
203        data[4] = 0xde;  // SSRC
204        data[5] = 0xad;
205        data[6] = 0xbe;
206        data[7] = 0xef;
207
208        size_t offset = 8;
209
210        data[offset++] = 1;  // CNAME
211
212        AString cname = "stagefright@";
213        cname.append(inet_ntoa(addr.sin_addr));
214        data[offset++] = cname.size();
215
216        memcpy(&data[offset], cname.c_str(), cname.size());
217        offset += cname.size();
218
219        data[offset++] = 6;  // TOOL
220
221        AString tool;
222        MakeUserAgentString(&tool);
223
224        data[offset++] = tool.size();
225
226        memcpy(&data[offset], tool.c_str(), tool.size());
227        offset += tool.size();
228
229        data[offset++] = 0;
230
231        if ((offset % 4) > 0) {
232            size_t count = 4 - (offset % 4);
233            switch (count) {
234                case 3:
235                    data[offset++] = 0;
236                case 2:
237                    data[offset++] = 0;
238                case 1:
239                    data[offset++] = 0;
240            }
241        }
242
243        size_t numWords = (offset / 4) - 1;
244        data[2] = numWords >> 8;
245        data[3] = numWords & 0xff;
246
247        buffer->setRange(buffer->offset(), buffer->size() + offset);
248    }
249
250    // In case we're behind NAT, fire off two UDP packets to the remote
251    // rtp/rtcp ports to poke a hole into the firewall for future incoming
252    // packets. We're going to send an RR/SDES RTCP packet to both of them.
253    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
254        struct sockaddr_in addr;
255        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
256        addr.sin_family = AF_INET;
257
258        AString source;
259        AString server_port;
260        if (!GetAttribute(transport.c_str(),
261                          "source",
262                          &source)) {
263            LOGW("Missing 'source' field in Transport response. Using "
264                 "RTSP endpoint address.");
265
266            struct hostent *ent = gethostbyname(mSessionHost.c_str());
267            if (ent == NULL) {
268                LOGE("Failed to look up address of session host '%s'",
269                     mSessionHost.c_str());
270
271                return false;
272            }
273
274            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
275        } else {
276            addr.sin_addr.s_addr = inet_addr(source.c_str());
277        }
278
279        if (!GetAttribute(transport.c_str(),
280                                 "server_port",
281                                 &server_port)) {
282            LOGI("Missing 'server_port' field in Transport response.");
283            return false;
284        }
285
286        int rtpPort, rtcpPort;
287        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
288                || rtpPort <= 0 || rtpPort > 65535
289                || rtcpPort <=0 || rtcpPort > 65535
290                || rtcpPort != rtpPort + 1) {
291            LOGE("Server picked invalid RTP/RTCP port pair %s,"
292                 " RTP port must be even, RTCP port must be one higher.",
293                 server_port.c_str());
294
295            return false;
296        }
297
298        if (rtpPort & 1) {
299            LOGW("Server picked an odd RTP port, it should've picked an "
300                 "even one, we'll let it pass for now, but this may break "
301                 "in the future.");
302        }
303
304        if (addr.sin_addr.s_addr == INADDR_NONE) {
305            return true;
306        }
307
308        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
309            // No firewalls to traverse on the loopback interface.
310            return true;
311        }
312
313        // Make up an RR/SDES RTCP packet.
314        sp<ABuffer> buf = new ABuffer(65536);
315        buf->setRange(0, 0);
316        addRR(buf);
317        addSDES(rtpSocket, buf);
318
319        addr.sin_port = htons(rtpPort);
320
321        ssize_t n = sendto(
322                rtpSocket, buf->data(), buf->size(), 0,
323                (const sockaddr *)&addr, sizeof(addr));
324
325        if (n < (ssize_t)buf->size()) {
326            LOGE("failed to poke a hole for RTP packets");
327            return false;
328        }
329
330        addr.sin_port = htons(rtcpPort);
331
332        n = sendto(
333                rtcpSocket, buf->data(), buf->size(), 0,
334                (const sockaddr *)&addr, sizeof(addr));
335
336        if (n < (ssize_t)buf->size()) {
337            LOGE("failed to poke a hole for RTCP packets");
338            return false;
339        }
340
341        LOGV("successfully poked holes.");
342
343        return true;
344    }
345
346    virtual void onMessageReceived(const sp<AMessage> &msg) {
347        switch (msg->what()) {
348            case 'conn':
349            {
350                int32_t result;
351                CHECK(msg->findInt32("result", &result));
352
353                LOGI("connection request completed with result %d (%s)",
354                     result, strerror(-result));
355
356                if (result == OK) {
357                    AString request;
358                    request = "DESCRIBE ";
359                    request.append(mSessionURL);
360                    request.append(" RTSP/1.0\r\n");
361                    request.append("Accept: application/sdp\r\n");
362                    request.append("\r\n");
363
364                    sp<AMessage> reply = new AMessage('desc', id());
365                    mConn->sendRequest(request.c_str(), reply);
366                } else {
367                    (new AMessage('disc', id()))->post();
368                }
369                break;
370            }
371
372            case 'disc':
373            {
374                int32_t reconnect;
375                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
376                    sp<AMessage> reply = new AMessage('conn', id());
377                    mConn->connect(mOriginalSessionURL.c_str(), reply);
378                } else {
379                    (new AMessage('quit', id()))->post();
380                }
381                break;
382            }
383
384            case 'desc':
385            {
386                int32_t result;
387                CHECK(msg->findInt32("result", &result));
388
389                LOGI("DESCRIBE completed with result %d (%s)",
390                     result, strerror(-result));
391
392                if (result == OK) {
393                    sp<RefBase> obj;
394                    CHECK(msg->findObject("response", &obj));
395                    sp<ARTSPResponse> response =
396                        static_cast<ARTSPResponse *>(obj.get());
397
398                    if (response->mStatusCode == 302) {
399                        ssize_t i = response->mHeaders.indexOfKey("location");
400                        CHECK_GE(i, 0);
401
402                        mSessionURL = response->mHeaders.valueAt(i);
403
404                        AString request;
405                        request = "DESCRIBE ";
406                        request.append(mSessionURL);
407                        request.append(" RTSP/1.0\r\n");
408                        request.append("Accept: application/sdp\r\n");
409                        request.append("\r\n");
410
411                        sp<AMessage> reply = new AMessage('desc', id());
412                        mConn->sendRequest(request.c_str(), reply);
413                        break;
414                    }
415
416                    if (response->mStatusCode != 200) {
417                        result = UNKNOWN_ERROR;
418                    } else {
419                        mSessionDesc = new ASessionDescription;
420
421                        mSessionDesc->setTo(
422                                response->mContent->data(),
423                                response->mContent->size());
424
425                        if (!mSessionDesc->isValid()) {
426                            LOGE("Failed to parse session description.");
427                            result = ERROR_MALFORMED;
428                        } else {
429                            ssize_t i = response->mHeaders.indexOfKey("content-base");
430                            if (i >= 0) {
431                                mBaseURL = response->mHeaders.valueAt(i);
432                            } else {
433                                i = response->mHeaders.indexOfKey("content-location");
434                                if (i >= 0) {
435                                    mBaseURL = response->mHeaders.valueAt(i);
436                                } else {
437                                    mBaseURL = mSessionURL;
438                                }
439                            }
440
441                            if (!mBaseURL.startsWith("rtsp://")) {
442                                // Some misbehaving servers specify a relative
443                                // URL in one of the locations above, combine
444                                // it with the absolute session URL to get
445                                // something usable...
446
447                                LOGW("Server specified a non-absolute base URL"
448                                     ", combining it with the session URL to "
449                                     "get something usable...");
450
451                                AString tmp;
452                                CHECK(MakeURL(
453                                            mSessionURL.c_str(),
454                                            mBaseURL.c_str(),
455                                            &tmp));
456
457                                mBaseURL = tmp;
458                            }
459
460                            CHECK_GT(mSessionDesc->countTracks(), 1u);
461                            setupTrack(1);
462                        }
463                    }
464                }
465
466                if (result != OK) {
467                    sp<AMessage> reply = new AMessage('disc', id());
468                    mConn->disconnect(reply);
469                }
470                break;
471            }
472
473            case 'setu':
474            {
475                size_t index;
476                CHECK(msg->findSize("index", &index));
477
478                TrackInfo *track = NULL;
479                size_t trackIndex;
480                if (msg->findSize("track-index", &trackIndex)) {
481                    track = &mTracks.editItemAt(trackIndex);
482                }
483
484                int32_t result;
485                CHECK(msg->findInt32("result", &result));
486
487                LOGI("SETUP(%d) completed with result %d (%s)",
488                     index, result, strerror(-result));
489
490                if (result == OK) {
491                    CHECK(track != NULL);
492
493                    sp<RefBase> obj;
494                    CHECK(msg->findObject("response", &obj));
495                    sp<ARTSPResponse> response =
496                        static_cast<ARTSPResponse *>(obj.get());
497
498                    if (response->mStatusCode != 200) {
499                        result = UNKNOWN_ERROR;
500                    } else {
501                        ssize_t i = response->mHeaders.indexOfKey("session");
502                        CHECK_GE(i, 0);
503
504                        mSessionID = response->mHeaders.valueAt(i);
505                        i = mSessionID.find(";");
506                        if (i >= 0) {
507                            // Remove options, i.e. ";timeout=90"
508                            mSessionID.erase(i, mSessionID.size() - i);
509                        }
510
511                        sp<AMessage> notify = new AMessage('accu', id());
512                        notify->setSize("track-index", trackIndex);
513
514                        i = response->mHeaders.indexOfKey("transport");
515                        CHECK_GE(i, 0);
516
517                        if (!track->mUsingInterleavedTCP) {
518                            AString transport = response->mHeaders.valueAt(i);
519
520                            // We are going to continue even if we were
521                            // unable to poke a hole into the firewall...
522                            pokeAHole(
523                                    track->mRTPSocket,
524                                    track->mRTCPSocket,
525                                    transport);
526                        }
527
528                        mRTPConn->addStream(
529                                track->mRTPSocket, track->mRTCPSocket,
530                                mSessionDesc, index,
531                                notify, track->mUsingInterleavedTCP);
532
533                        mSetupTracksSuccessful = true;
534                    }
535                }
536
537                if (result != OK) {
538                    if (track) {
539                        if (!track->mUsingInterleavedTCP) {
540                            close(track->mRTPSocket);
541                            close(track->mRTCPSocket);
542                        }
543
544                        mTracks.removeItemsAt(trackIndex);
545                    }
546                }
547
548                ++index;
549                if (index < mSessionDesc->countTracks()) {
550                    setupTrack(index);
551                } else if (mSetupTracksSuccessful) {
552                    AString request = "PLAY ";
553                    request.append(mSessionURL);
554                    request.append(" RTSP/1.0\r\n");
555
556                    request.append("Session: ");
557                    request.append(mSessionID);
558                    request.append("\r\n");
559
560                    request.append("\r\n");
561
562                    sp<AMessage> reply = new AMessage('play', id());
563                    mConn->sendRequest(request.c_str(), reply);
564                } else {
565                    sp<AMessage> reply = new AMessage('disc', id());
566                    mConn->disconnect(reply);
567                }
568                break;
569            }
570
571            case 'play':
572            {
573                int32_t result;
574                CHECK(msg->findInt32("result", &result));
575
576                LOGI("PLAY completed with result %d (%s)",
577                     result, strerror(-result));
578
579                if (result == OK) {
580                    sp<RefBase> obj;
581                    CHECK(msg->findObject("response", &obj));
582                    sp<ARTSPResponse> response =
583                        static_cast<ARTSPResponse *>(obj.get());
584
585                    if (response->mStatusCode != 200) {
586                        result = UNKNOWN_ERROR;
587                    } else {
588                        parsePlayResponse(response);
589
590                        sp<AMessage> timeout = new AMessage('tiou', id());
591                        timeout->post(kStartupTimeoutUs);
592                    }
593                }
594
595                if (result != OK) {
596                    sp<AMessage> reply = new AMessage('disc', id());
597                    mConn->disconnect(reply);
598                }
599
600                break;
601            }
602
603            case 'abor':
604            {
605                for (size_t i = 0; i < mTracks.size(); ++i) {
606                    TrackInfo *info = &mTracks.editItemAt(i);
607
608                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
609
610                    if (!info->mUsingInterleavedTCP) {
611                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
612
613                        close(info->mRTPSocket);
614                        close(info->mRTCPSocket);
615                    }
616                }
617                mTracks.clear();
618                mSetupTracksSuccessful = false;
619                mSeekPending = false;
620                mFirstAccessUnit = true;
621                mFirstAccessUnitNTP = 0;
622                mNumAccessUnitsReceived = 0;
623                mReceivedFirstRTCPPacket = false;
624                mReceivedFirstRTPPacket = false;
625                mSeekable = false;
626
627                sp<AMessage> reply = new AMessage('tear', id());
628
629                int32_t reconnect;
630                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
631                    reply->setInt32("reconnect", true);
632                }
633
634                AString request;
635                request = "TEARDOWN ";
636
637                // XXX should use aggregate url from SDP here...
638                request.append(mSessionURL);
639                request.append(" RTSP/1.0\r\n");
640
641                request.append("Session: ");
642                request.append(mSessionID);
643                request.append("\r\n");
644
645                request.append("\r\n");
646
647                mConn->sendRequest(request.c_str(), reply);
648                break;
649            }
650
651            case 'tear':
652            {
653                int32_t result;
654                CHECK(msg->findInt32("result", &result));
655
656                LOGI("TEARDOWN completed with result %d (%s)",
657                     result, strerror(-result));
658
659                sp<AMessage> reply = new AMessage('disc', id());
660
661                int32_t reconnect;
662                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
663                    reply->setInt32("reconnect", true);
664                }
665
666                mConn->disconnect(reply);
667                break;
668            }
669
670            case 'quit':
671            {
672                if (mDoneMsg != NULL) {
673                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
674                    mDoneMsg->post();
675                    mDoneMsg = NULL;
676                }
677                break;
678            }
679
680            case 'chek':
681            {
682                int32_t generation;
683                CHECK(msg->findInt32("generation", &generation));
684                if (generation != mCheckGeneration) {
685                    // This is an outdated message. Ignore.
686                    break;
687                }
688
689                if (mNumAccessUnitsReceived == 0) {
690                    LOGI("stream ended? aborting.");
691                    (new AMessage('abor', id()))->post();
692                    break;
693                }
694
695                mNumAccessUnitsReceived = 0;
696                msg->post(kAccessUnitTimeoutUs);
697                break;
698            }
699
700            case 'accu':
701            {
702                int32_t first;
703                if (msg->findInt32("first-rtcp", &first)) {
704                    mReceivedFirstRTCPPacket = true;
705                    break;
706                }
707
708                if (msg->findInt32("first-rtp", &first)) {
709                    mReceivedFirstRTPPacket = true;
710                    break;
711                }
712
713                ++mNumAccessUnitsReceived;
714                postAccessUnitTimeoutCheck();
715
716                size_t trackIndex;
717                CHECK(msg->findSize("track-index", &trackIndex));
718
719                if (trackIndex >= mTracks.size()) {
720                    LOGV("late packets ignored.");
721                    break;
722                }
723
724                TrackInfo *track = &mTracks.editItemAt(trackIndex);
725
726                int32_t eos;
727                if (msg->findInt32("eos", &eos)) {
728                    LOGI("received BYE on track index %d", trackIndex);
729#if 0
730                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
731#endif
732                    return;
733                }
734
735                sp<RefBase> obj;
736                CHECK(msg->findObject("access-unit", &obj));
737
738                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
739
740                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
741
742                if (mSeekPending) {
743                    LOGV("we're seeking, dropping stale packet.");
744                    break;
745                }
746
747                if (seqNum < track->mFirstSeqNumInSegment) {
748                    LOGV("dropping stale access-unit (%d < %d)",
749                         seqNum, track->mFirstSeqNumInSegment);
750                    break;
751                }
752
753                uint64_t ntpTime;
754                CHECK(accessUnit->meta()->findInt64(
755                            "ntp-time", (int64_t *)&ntpTime));
756
757                uint32_t rtpTime;
758                CHECK(accessUnit->meta()->findInt32(
759                            "rtp-time", (int32_t *)&rtpTime));
760
761                if (track->mNewSegment) {
762                    track->mNewSegment = false;
763
764                    LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d",
765                         ntpTime, rtpTime, seqNum);
766                }
767
768                if (mFirstAccessUnit) {
769                    mDoneMsg->setInt32("result", OK);
770                    mDoneMsg->post();
771                    mDoneMsg = NULL;
772
773                    mFirstAccessUnit = false;
774                    mFirstAccessUnitNTP = ntpTime;
775                }
776
777                if (ntpTime >= mFirstAccessUnitNTP) {
778                    ntpTime -= mFirstAccessUnitNTP;
779                } else {
780                    ntpTime = 0;
781                }
782
783                int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
784
785                accessUnit->meta()->setInt64("timeUs", timeUs);
786
787#if 0
788                int32_t damaged;
789                if (accessUnit->meta()->findInt32("damaged", &damaged)
790                        && damaged != 0) {
791                    LOGI("ignoring damaged AU");
792                } else
793#endif
794                {
795                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
796                    track->mPacketSource->queueAccessUnit(accessUnit);
797                }
798                break;
799            }
800
801            case 'seek':
802            {
803                sp<AMessage> doneMsg;
804                CHECK(msg->findMessage("doneMsg", &doneMsg));
805
806                if (mSeekPending) {
807                    doneMsg->post();
808                    break;
809                }
810
811                if (!mSeekable) {
812                    LOGW("This is a live stream, ignoring seek request.");
813                    doneMsg->post();
814                    break;
815                }
816
817                int64_t timeUs;
818                CHECK(msg->findInt64("time", &timeUs));
819
820                mSeekPending = true;
821
822                // Disable the access unit timeout until we resumed
823                // playback again.
824                mCheckPending = true;
825                ++mCheckGeneration;
826
827                AString request = "PAUSE ";
828                request.append(mSessionURL);
829                request.append(" RTSP/1.0\r\n");
830
831                request.append("Session: ");
832                request.append(mSessionID);
833                request.append("\r\n");
834
835                request.append("\r\n");
836
837                sp<AMessage> reply = new AMessage('see1', id());
838                reply->setInt64("time", timeUs);
839                reply->setMessage("doneMsg", doneMsg);
840                mConn->sendRequest(request.c_str(), reply);
841                break;
842            }
843
844            case 'see1':
845            {
846                // Session is paused now.
847                for (size_t i = 0; i < mTracks.size(); ++i) {
848                    mTracks.editItemAt(i).mPacketSource->flushQueue();
849                }
850
851                int64_t timeUs;
852                CHECK(msg->findInt64("time", &timeUs));
853
854                AString request = "PLAY ";
855                request.append(mSessionURL);
856                request.append(" RTSP/1.0\r\n");
857
858                request.append("Session: ");
859                request.append(mSessionID);
860                request.append("\r\n");
861
862                request.append(
863                        StringPrintf(
864                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
865
866                request.append("\r\n");
867
868                sp<AMessage> doneMsg;
869                CHECK(msg->findMessage("doneMsg", &doneMsg));
870
871                sp<AMessage> reply = new AMessage('see2', id());
872                reply->setMessage("doneMsg", doneMsg);
873                mConn->sendRequest(request.c_str(), reply);
874                break;
875            }
876
877            case 'see2':
878            {
879                CHECK(mSeekPending);
880
881                int32_t result;
882                CHECK(msg->findInt32("result", &result));
883
884                LOGI("PLAY completed with result %d (%s)",
885                     result, strerror(-result));
886
887                mCheckPending = false;
888                postAccessUnitTimeoutCheck();
889
890                if (result == OK) {
891                    sp<RefBase> obj;
892                    CHECK(msg->findObject("response", &obj));
893                    sp<ARTSPResponse> response =
894                        static_cast<ARTSPResponse *>(obj.get());
895
896                    if (response->mStatusCode != 200) {
897                        result = UNKNOWN_ERROR;
898                    } else {
899                        parsePlayResponse(response);
900
901                        LOGI("seek completed.");
902                    }
903                }
904
905                if (result != OK) {
906                    LOGE("seek failed, aborting.");
907                    (new AMessage('abor', id()))->post();
908                }
909
910                mSeekPending = false;
911
912                sp<AMessage> doneMsg;
913                CHECK(msg->findMessage("doneMsg", &doneMsg));
914
915                doneMsg->post();
916                break;
917            }
918
919            case 'biny':
920            {
921                sp<RefBase> obj;
922                CHECK(msg->findObject("buffer", &obj));
923                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
924
925                int32_t index;
926                CHECK(buffer->meta()->findInt32("index", &index));
927
928                mRTPConn->injectPacket(index, buffer);
929                break;
930            }
931
932            case 'tiou':
933            {
934                if (!mReceivedFirstRTCPPacket) {
935                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
936                        LOGW("We received RTP packets but no RTCP packets, "
937                             "using fake timestamps.");
938
939                        mTryFakeRTCP = true;
940
941                        mReceivedFirstRTCPPacket = true;
942                        mRTPConn->fakeTimestamps();
943                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
944                        LOGW("Never received any data, switching transports.");
945
946                        mTryTCPInterleaving = true;
947
948                        sp<AMessage> msg = new AMessage('abor', id());
949                        msg->setInt32("reconnect", true);
950                        msg->post();
951                    } else {
952                        LOGW("Never received any data, disconnecting.");
953                        (new AMessage('abor', id()))->post();
954                    }
955                }
956                break;
957            }
958
959            default:
960                TRESPASS();
961                break;
962        }
963    }
964
965    void postAccessUnitTimeoutCheck() {
966        if (mCheckPending) {
967            return;
968        }
969
970        mCheckPending = true;
971        sp<AMessage> check = new AMessage('chek', id());
972        check->setInt32("generation", mCheckGeneration);
973        check->post(kAccessUnitTimeoutUs);
974    }
975
976    static void SplitString(
977            const AString &s, const char *separator, List<AString> *items) {
978        items->clear();
979        size_t start = 0;
980        while (start < s.size()) {
981            ssize_t offset = s.find(separator, start);
982
983            if (offset < 0) {
984                items->push_back(AString(s, start, s.size() - start));
985                break;
986            }
987
988            items->push_back(AString(s, start, offset - start));
989            start = offset + strlen(separator);
990        }
991    }
992
993    void parsePlayResponse(const sp<ARTSPResponse> &response) {
994        mSeekable = false;
995
996        ssize_t i = response->mHeaders.indexOfKey("range");
997        if (i < 0) {
998            // Server doesn't even tell use what range it is going to
999            // play, therefore we won't support seeking.
1000            return;
1001        }
1002
1003        AString range = response->mHeaders.valueAt(i);
1004        LOGV("Range: %s", range.c_str());
1005
1006        AString val;
1007        CHECK(GetAttribute(range.c_str(), "npt", &val));
1008
1009        float npt1, npt2;
1010        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1011            // This is a live stream and therefore not seekable.
1012            return;
1013        }
1014
1015        i = response->mHeaders.indexOfKey("rtp-info");
1016        CHECK_GE(i, 0);
1017
1018        AString rtpInfo = response->mHeaders.valueAt(i);
1019        List<AString> streamInfos;
1020        SplitString(rtpInfo, ",", &streamInfos);
1021
1022        int n = 1;
1023        for (List<AString>::iterator it = streamInfos.begin();
1024             it != streamInfos.end(); ++it) {
1025            (*it).trim();
1026            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
1027
1028            CHECK(GetAttribute((*it).c_str(), "url", &val));
1029
1030            size_t trackIndex = 0;
1031            while (trackIndex < mTracks.size()
1032                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1033                ++trackIndex;
1034            }
1035            CHECK_LT(trackIndex, mTracks.size());
1036
1037            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1038
1039            char *end;
1040            unsigned long seq = strtoul(val.c_str(), &end, 10);
1041
1042            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1043            info->mFirstSeqNumInSegment = seq;
1044            info->mNewSegment = true;
1045
1046            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1047
1048            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1049
1050            LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1);
1051
1052            info->mPacketSource->setNormalPlayTimeMapping(
1053                    rtpTime, (int64_t)(npt1 * 1E6));
1054
1055            ++n;
1056        }
1057
1058        mSeekable = true;
1059    }
1060
1061    sp<APacketSource> getPacketSource(size_t index) {
1062        CHECK_GE(index, 0u);
1063        CHECK_LT(index, mTracks.size());
1064
1065        return mTracks.editItemAt(index).mPacketSource;
1066    }
1067
1068    size_t countTracks() const {
1069        return mTracks.size();
1070    }
1071
1072private:
1073    sp<ALooper> mLooper;
1074    sp<ALooper> mNetLooper;
1075    sp<ARTSPConnection> mConn;
1076    sp<ARTPConnection> mRTPConn;
1077    sp<ASessionDescription> mSessionDesc;
1078    AString mOriginalSessionURL;  // This one still has user:pass@
1079    AString mSessionURL;
1080    AString mSessionHost;
1081    AString mBaseURL;
1082    AString mSessionID;
1083    bool mSetupTracksSuccessful;
1084    bool mSeekPending;
1085    bool mFirstAccessUnit;
1086    uint64_t mFirstAccessUnitNTP;
1087    int64_t mNumAccessUnitsReceived;
1088    bool mCheckPending;
1089    int32_t mCheckGeneration;
1090    bool mTryTCPInterleaving;
1091    bool mTryFakeRTCP;
1092    bool mReceivedFirstRTCPPacket;
1093    bool mReceivedFirstRTPPacket;
1094    bool mSeekable;
1095
1096    struct TrackInfo {
1097        AString mURL;
1098        int mRTPSocket;
1099        int mRTCPSocket;
1100        bool mUsingInterleavedTCP;
1101        uint32_t mFirstSeqNumInSegment;
1102        bool mNewSegment;
1103
1104        sp<APacketSource> mPacketSource;
1105    };
1106    Vector<TrackInfo> mTracks;
1107
1108    sp<AMessage> mDoneMsg;
1109
1110    void setupTrack(size_t index) {
1111        sp<APacketSource> source =
1112            new APacketSource(mSessionDesc, index);
1113
1114        if (source->initCheck() != OK) {
1115            LOGW("Unsupported format. Ignoring track #%d.", index);
1116
1117            sp<AMessage> reply = new AMessage('setu', id());
1118            reply->setSize("index", index);
1119            reply->setInt32("result", ERROR_UNSUPPORTED);
1120            reply->post();
1121            return;
1122        }
1123
1124        AString url;
1125        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1126
1127        AString trackURL;
1128        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1129
1130        mTracks.push(TrackInfo());
1131        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1132        info->mURL = trackURL;
1133        info->mPacketSource = source;
1134        info->mUsingInterleavedTCP = false;
1135        info->mFirstSeqNumInSegment = 0;
1136        info->mNewSegment = true;
1137
1138        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1139
1140        AString request = "SETUP ";
1141        request.append(trackURL);
1142        request.append(" RTSP/1.0\r\n");
1143
1144        if (mTryTCPInterleaving) {
1145            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1146            info->mUsingInterleavedTCP = true;
1147            info->mRTPSocket = interleaveIndex;
1148            info->mRTCPSocket = interleaveIndex + 1;
1149
1150            request.append("Transport: RTP/AVP/TCP;interleaved=");
1151            request.append(interleaveIndex);
1152            request.append("-");
1153            request.append(interleaveIndex + 1);
1154        } else {
1155            unsigned rtpPort;
1156            ARTPConnection::MakePortPair(
1157                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1158
1159            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1160            request.append(rtpPort);
1161            request.append("-");
1162            request.append(rtpPort + 1);
1163        }
1164
1165        request.append("\r\n");
1166
1167        if (index > 1) {
1168            request.append("Session: ");
1169            request.append(mSessionID);
1170            request.append("\r\n");
1171        }
1172
1173        request.append("\r\n");
1174
1175        sp<AMessage> reply = new AMessage('setu', id());
1176        reply->setSize("index", index);
1177        reply->setSize("track-index", mTracks.size() - 1);
1178        mConn->sendRequest(request.c_str(), reply);
1179    }
1180
1181    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1182        out->clear();
1183
1184        if (strncasecmp("rtsp://", baseURL, 7)) {
1185            // Base URL must be absolute
1186            return false;
1187        }
1188
1189        if (!strncasecmp("rtsp://", url, 7)) {
1190            // "url" is already an absolute URL, ignore base URL.
1191            out->setTo(url);
1192            return true;
1193        }
1194
1195        size_t n = strlen(baseURL);
1196        if (baseURL[n - 1] == '/') {
1197            out->setTo(baseURL);
1198            out->append(url);
1199        } else {
1200            char *slashPos = strrchr(baseURL, '/');
1201
1202            if (slashPos > &baseURL[6]) {
1203                out->setTo(baseURL, slashPos - baseURL);
1204            } else {
1205                out->setTo(baseURL);
1206            }
1207
1208            out->append("/");
1209            out->append(url);
1210        }
1211
1212        return true;
1213    }
1214
1215    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1216};
1217
1218}  // namespace android
1219
1220#endif  // MY_HANDLER_H_
1221