MyHandler.h revision 4579b7d49f6dd4f37e6043e59debfd72d69b8e7b
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41
// If no access units are received within 3 secs, assume that the rtp
// stream has ended and signal end of stream.
// Value is in microseconds; it is the delay used when posting the periodic
// 'chek' message.
static int64_t kAccessUnitTimeoutUs = 3000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
// Value is in microseconds; it is the delay of the 'tiou' message posted
// after a successful PLAY.
static int64_t kStartupTimeoutUs = 10000000ll;
49
50namespace android {
51
52static void MakeUserAgentString(AString *s) {
53    s->setTo("stagefright/1.1 (Linux;Android ");
54
55#if (PROPERTY_VALUE_MAX < 8)
56#error "PROPERTY_VALUE_MAX must be at least 8"
57#endif
58
59    char value[PROPERTY_VALUE_MAX];
60    property_get("ro.build.version.release", value, "Unknown");
61    s->append(value);
62    s->append(")");
63}
64
65static bool GetAttribute(const char *s, const char *key, AString *value) {
66    value->clear();
67
68    size_t keyLen = strlen(key);
69
70    for (;;) {
71        while (isspace(*s)) {
72            ++s;
73        }
74
75        const char *colonPos = strchr(s, ';');
76
77        size_t len =
78            (colonPos == NULL) ? strlen(s) : colonPos - s;
79
80        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
81            value->setTo(&s[keyLen + 1], len - keyLen - 1);
82            return true;
83        }
84
85        if (colonPos == NULL) {
86            return false;
87        }
88
89        s = colonPos + 1;
90    }
91}
92
93struct MyHandler : public AHandler {
94    MyHandler(const char *url, const sp<ALooper> &looper)
95        : mLooper(looper),
96          mNetLooper(new ALooper),
97          mConn(new ARTSPConnection),
98          mRTPConn(new ARTPConnection),
99          mOriginalSessionURL(url),
100          mSessionURL(url),
101          mSetupTracksSuccessful(false),
102          mSeekPending(false),
103          mFirstAccessUnit(true),
104          mFirstAccessUnitNTP(0),
105          mNumAccessUnitsReceived(0),
106          mCheckPending(false),
107          mCheckGeneration(0),
108          mTryTCPInterleaving(false),
109          mTryFakeRTCP(false),
110          mReceivedFirstRTCPPacket(false),
111          mReceivedFirstRTPPacket(false),
112          mSeekable(false) {
113        mNetLooper->setName("rtsp net");
114        mNetLooper->start(false /* runOnCallingThread */,
115                          false /* canCallJava */,
116                          PRIORITY_HIGHEST);
117
118        // Strip any authentication info from the session url, we don't
119        // want to transmit user/pass in cleartext.
120        AString host, path, user, pass;
121        unsigned port;
122        if (ARTSPConnection::ParseURL(
123                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
124                && user.size() > 0) {
125            mSessionURL.clear();
126            mSessionURL.append("rtsp://");
127            mSessionURL.append(host);
128            mSessionURL.append(":");
129            mSessionURL.append(StringPrintf("%u", port));
130            mSessionURL.append(path);
131
132            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
133        }
134    }
135
136    void connect(const sp<AMessage> &doneMsg) {
137        mDoneMsg = doneMsg;
138
139        mLooper->registerHandler(this);
140        mLooper->registerHandler(mConn);
141        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
142
143        sp<AMessage> notify = new AMessage('biny', id());
144        mConn->observeBinaryData(notify);
145
146        sp<AMessage> reply = new AMessage('conn', id());
147        mConn->connect(mOriginalSessionURL.c_str(), reply);
148    }
149
150    void disconnect(const sp<AMessage> &doneMsg) {
151        mDoneMsg = doneMsg;
152
153        (new AMessage('abor', id()))->post();
154    }
155
156    void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
157        sp<AMessage> msg = new AMessage('seek', id());
158        msg->setInt64("time", timeUs);
159        msg->setMessage("doneMsg", doneMsg);
160        msg->post();
161    }
162
163    int64_t getNormalPlayTimeUs() {
164        int64_t maxTimeUs = 0;
165        for (size_t i = 0; i < mTracks.size(); ++i) {
166            int64_t timeUs = mTracks.editItemAt(i).mPacketSource
167                ->getNormalPlayTimeUs();
168
169            if (i == 0 || timeUs > maxTimeUs) {
170                maxTimeUs = timeUs;
171            }
172        }
173
174        return maxTimeUs;
175    }
176
177    static void addRR(const sp<ABuffer> &buf) {
178        uint8_t *ptr = buf->data() + buf->size();
179        ptr[0] = 0x80 | 0;
180        ptr[1] = 201;  // RR
181        ptr[2] = 0;
182        ptr[3] = 1;
183        ptr[4] = 0xde;  // SSRC
184        ptr[5] = 0xad;
185        ptr[6] = 0xbe;
186        ptr[7] = 0xef;
187
188        buf->setRange(0, buf->size() + 8);
189    }
190
191    static void addSDES(int s, const sp<ABuffer> &buffer) {
192        struct sockaddr_in addr;
193        socklen_t addrSize = sizeof(addr);
194        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
195
196        uint8_t *data = buffer->data() + buffer->size();
197        data[0] = 0x80 | 1;
198        data[1] = 202;  // SDES
199        data[4] = 0xde;  // SSRC
200        data[5] = 0xad;
201        data[6] = 0xbe;
202        data[7] = 0xef;
203
204        size_t offset = 8;
205
206        data[offset++] = 1;  // CNAME
207
208        AString cname = "stagefright@";
209        cname.append(inet_ntoa(addr.sin_addr));
210        data[offset++] = cname.size();
211
212        memcpy(&data[offset], cname.c_str(), cname.size());
213        offset += cname.size();
214
215        data[offset++] = 6;  // TOOL
216
217        AString tool;
218        MakeUserAgentString(&tool);
219
220        data[offset++] = tool.size();
221
222        memcpy(&data[offset], tool.c_str(), tool.size());
223        offset += tool.size();
224
225        data[offset++] = 0;
226
227        if ((offset % 4) > 0) {
228            size_t count = 4 - (offset % 4);
229            switch (count) {
230                case 3:
231                    data[offset++] = 0;
232                case 2:
233                    data[offset++] = 0;
234                case 1:
235                    data[offset++] = 0;
236            }
237        }
238
239        size_t numWords = (offset / 4) - 1;
240        data[2] = numWords >> 8;
241        data[3] = numWords & 0xff;
242
243        buffer->setRange(buffer->offset(), buffer->size() + offset);
244    }
245
246    // In case we're behind NAT, fire off two UDP packets to the remote
247    // rtp/rtcp ports to poke a hole into the firewall for future incoming
248    // packets. We're going to send an RR/SDES RTCP packet to both of them.
249    void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
250        AString source;
251        AString server_port;
252        if (!GetAttribute(transport.c_str(),
253                          "source",
254                          &source)
255                || !GetAttribute(transport.c_str(),
256                                 "server_port",
257                                 &server_port)) {
258            return;
259        }
260
261        int rtpPort, rtcpPort;
262        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
263                || rtpPort <= 0 || rtpPort > 65535
264                || rtcpPort <=0 || rtcpPort > 65535
265                || rtcpPort != rtpPort + 1
266                || (rtpPort & 1) != 0) {
267            return;
268        }
269
270        struct sockaddr_in addr;
271        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
272        addr.sin_family = AF_INET;
273        addr.sin_addr.s_addr = inet_addr(source.c_str());
274
275        if (addr.sin_addr.s_addr == INADDR_NONE) {
276            return;
277        }
278
279        // Make up an RR/SDES RTCP packet.
280        sp<ABuffer> buf = new ABuffer(65536);
281        buf->setRange(0, 0);
282        addRR(buf);
283        addSDES(rtpSocket, buf);
284
285        addr.sin_port = htons(rtpPort);
286
287        ssize_t n = sendto(
288                rtpSocket, buf->data(), buf->size(), 0,
289                (const sockaddr *)&addr, sizeof(addr));
290        CHECK_EQ(n, (ssize_t)buf->size());
291
292        addr.sin_port = htons(rtcpPort);
293
294        n = sendto(
295                rtcpSocket, buf->data(), buf->size(), 0,
296                (const sockaddr *)&addr, sizeof(addr));
297        CHECK_EQ(n, (ssize_t)buf->size());
298
299        LOGV("successfully poked holes.");
300    }
301
302    virtual void onMessageReceived(const sp<AMessage> &msg) {
303        switch (msg->what()) {
304            case 'conn':
305            {
306                int32_t result;
307                CHECK(msg->findInt32("result", &result));
308
309                LOGI("connection request completed with result %d (%s)",
310                     result, strerror(-result));
311
312                if (result == OK) {
313                    AString request;
314                    request = "DESCRIBE ";
315                    request.append(mSessionURL);
316                    request.append(" RTSP/1.0\r\n");
317                    request.append("Accept: application/sdp\r\n");
318                    request.append("\r\n");
319
320                    sp<AMessage> reply = new AMessage('desc', id());
321                    mConn->sendRequest(request.c_str(), reply);
322                } else {
323                    (new AMessage('disc', id()))->post();
324                }
325                break;
326            }
327
328            case 'disc':
329            {
330                int32_t reconnect;
331                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
332                    sp<AMessage> reply = new AMessage('conn', id());
333                    mConn->connect(mOriginalSessionURL.c_str(), reply);
334                } else {
335                    (new AMessage('quit', id()))->post();
336                }
337                break;
338            }
339
340            case 'desc':
341            {
342                int32_t result;
343                CHECK(msg->findInt32("result", &result));
344
345                LOGI("DESCRIBE completed with result %d (%s)",
346                     result, strerror(-result));
347
348                if (result == OK) {
349                    sp<RefBase> obj;
350                    CHECK(msg->findObject("response", &obj));
351                    sp<ARTSPResponse> response =
352                        static_cast<ARTSPResponse *>(obj.get());
353
354                    if (response->mStatusCode == 302) {
355                        ssize_t i = response->mHeaders.indexOfKey("location");
356                        CHECK_GE(i, 0);
357
358                        mSessionURL = response->mHeaders.valueAt(i);
359
360                        AString request;
361                        request = "DESCRIBE ";
362                        request.append(mSessionURL);
363                        request.append(" RTSP/1.0\r\n");
364                        request.append("Accept: application/sdp\r\n");
365                        request.append("\r\n");
366
367                        sp<AMessage> reply = new AMessage('desc', id());
368                        mConn->sendRequest(request.c_str(), reply);
369                        break;
370                    }
371
372                    if (response->mStatusCode != 200) {
373                        result = UNKNOWN_ERROR;
374                    } else {
375                        mSessionDesc = new ASessionDescription;
376
377                        mSessionDesc->setTo(
378                                response->mContent->data(),
379                                response->mContent->size());
380
381                        if (!mSessionDesc->isValid()) {
382                            result = ERROR_MALFORMED;
383                        } else {
384                            ssize_t i = response->mHeaders.indexOfKey("content-base");
385                            if (i >= 0) {
386                                mBaseURL = response->mHeaders.valueAt(i);
387                            } else {
388                                i = response->mHeaders.indexOfKey("content-location");
389                                if (i >= 0) {
390                                    mBaseURL = response->mHeaders.valueAt(i);
391                                } else {
392                                    mBaseURL = mSessionURL;
393                                }
394                            }
395
396                            CHECK_GT(mSessionDesc->countTracks(), 1u);
397                            setupTrack(1);
398                        }
399                    }
400                }
401
402                if (result != OK) {
403                    sp<AMessage> reply = new AMessage('disc', id());
404                    mConn->disconnect(reply);
405                }
406                break;
407            }
408
409            case 'setu':
410            {
411                size_t index;
412                CHECK(msg->findSize("index", &index));
413
414                TrackInfo *track = NULL;
415                size_t trackIndex;
416                if (msg->findSize("track-index", &trackIndex)) {
417                    track = &mTracks.editItemAt(trackIndex);
418                }
419
420                int32_t result;
421                CHECK(msg->findInt32("result", &result));
422
423                LOGI("SETUP(%d) completed with result %d (%s)",
424                     index, result, strerror(-result));
425
426                if (result == OK) {
427                    CHECK(track != NULL);
428
429                    sp<RefBase> obj;
430                    CHECK(msg->findObject("response", &obj));
431                    sp<ARTSPResponse> response =
432                        static_cast<ARTSPResponse *>(obj.get());
433
434                    if (response->mStatusCode != 200) {
435                        result = UNKNOWN_ERROR;
436                    } else {
437                        ssize_t i = response->mHeaders.indexOfKey("session");
438                        CHECK_GE(i, 0);
439
440                        mSessionID = response->mHeaders.valueAt(i);
441                        i = mSessionID.find(";");
442                        if (i >= 0) {
443                            // Remove options, i.e. ";timeout=90"
444                            mSessionID.erase(i, mSessionID.size() - i);
445                        }
446
447                        sp<AMessage> notify = new AMessage('accu', id());
448                        notify->setSize("track-index", trackIndex);
449
450                        i = response->mHeaders.indexOfKey("transport");
451                        CHECK_GE(i, 0);
452
453                        if (!track->mUsingInterleavedTCP) {
454                            AString transport = response->mHeaders.valueAt(i);
455
456                            pokeAHole(track->mRTPSocket,
457                                      track->mRTCPSocket,
458                                      transport);
459                        }
460
461                        mRTPConn->addStream(
462                                track->mRTPSocket, track->mRTCPSocket,
463                                mSessionDesc, index,
464                                notify, track->mUsingInterleavedTCP);
465
466                        mSetupTracksSuccessful = true;
467                    }
468                }
469
470                if (result != OK) {
471                    if (track) {
472                        if (!track->mUsingInterleavedTCP) {
473                            close(track->mRTPSocket);
474                            close(track->mRTCPSocket);
475                        }
476
477                        mTracks.removeItemsAt(trackIndex);
478                    }
479                }
480
481                ++index;
482                if (index < mSessionDesc->countTracks()) {
483                    setupTrack(index);
484                } else if (mSetupTracksSuccessful) {
485                    AString request = "PLAY ";
486                    request.append(mSessionURL);
487                    request.append(" RTSP/1.0\r\n");
488
489                    request.append("Session: ");
490                    request.append(mSessionID);
491                    request.append("\r\n");
492
493                    request.append("\r\n");
494
495                    sp<AMessage> reply = new AMessage('play', id());
496                    mConn->sendRequest(request.c_str(), reply);
497                } else {
498                    sp<AMessage> reply = new AMessage('disc', id());
499                    mConn->disconnect(reply);
500                }
501                break;
502            }
503
504            case 'play':
505            {
506                int32_t result;
507                CHECK(msg->findInt32("result", &result));
508
509                LOGI("PLAY completed with result %d (%s)",
510                     result, strerror(-result));
511
512                if (result == OK) {
513                    sp<RefBase> obj;
514                    CHECK(msg->findObject("response", &obj));
515                    sp<ARTSPResponse> response =
516                        static_cast<ARTSPResponse *>(obj.get());
517
518                    if (response->mStatusCode != 200) {
519                        result = UNKNOWN_ERROR;
520                    } else {
521                        parsePlayResponse(response);
522
523                        sp<AMessage> timeout = new AMessage('tiou', id());
524                        timeout->post(kStartupTimeoutUs);
525                    }
526                }
527
528                if (result != OK) {
529                    sp<AMessage> reply = new AMessage('disc', id());
530                    mConn->disconnect(reply);
531                }
532
533                break;
534            }
535
536            case 'abor':
537            {
538                for (size_t i = 0; i < mTracks.size(); ++i) {
539                    TrackInfo *info = &mTracks.editItemAt(i);
540
541                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
542
543                    if (!info->mUsingInterleavedTCP) {
544                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
545
546                        close(info->mRTPSocket);
547                        close(info->mRTCPSocket);
548                    }
549                }
550                mTracks.clear();
551                mSetupTracksSuccessful = false;
552                mSeekPending = false;
553                mFirstAccessUnit = true;
554                mFirstAccessUnitNTP = 0;
555                mNumAccessUnitsReceived = 0;
556                mReceivedFirstRTCPPacket = false;
557                mReceivedFirstRTPPacket = false;
558                mSeekable = false;
559
560                sp<AMessage> reply = new AMessage('tear', id());
561
562                int32_t reconnect;
563                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
564                    reply->setInt32("reconnect", true);
565                }
566
567                AString request;
568                request = "TEARDOWN ";
569
570                // XXX should use aggregate url from SDP here...
571                request.append(mSessionURL);
572                request.append(" RTSP/1.0\r\n");
573
574                request.append("Session: ");
575                request.append(mSessionID);
576                request.append("\r\n");
577
578                request.append("\r\n");
579
580                mConn->sendRequest(request.c_str(), reply);
581                break;
582            }
583
584            case 'tear':
585            {
586                int32_t result;
587                CHECK(msg->findInt32("result", &result));
588
589                LOGI("TEARDOWN completed with result %d (%s)",
590                     result, strerror(-result));
591
592                sp<AMessage> reply = new AMessage('disc', id());
593
594                int32_t reconnect;
595                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
596                    reply->setInt32("reconnect", true);
597                }
598
599                mConn->disconnect(reply);
600                break;
601            }
602
603            case 'quit':
604            {
605                if (mDoneMsg != NULL) {
606                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
607                    mDoneMsg->post();
608                    mDoneMsg = NULL;
609                }
610                break;
611            }
612
613            case 'chek':
614            {
615                int32_t generation;
616                CHECK(msg->findInt32("generation", &generation));
617                if (generation != mCheckGeneration) {
618                    // This is an outdated message. Ignore.
619                    break;
620                }
621
622                if (mNumAccessUnitsReceived == 0) {
623                    LOGI("stream ended? aborting.");
624                    (new AMessage('abor', id()))->post();
625                    break;
626                }
627
628                mNumAccessUnitsReceived = 0;
629                msg->post(kAccessUnitTimeoutUs);
630                break;
631            }
632
633            case 'accu':
634            {
635                int32_t first;
636                if (msg->findInt32("first-rtcp", &first)) {
637                    mReceivedFirstRTCPPacket = true;
638                    break;
639                }
640
641                if (msg->findInt32("first-rtp", &first)) {
642                    mReceivedFirstRTPPacket = true;
643                    break;
644                }
645
646                ++mNumAccessUnitsReceived;
647                postAccessUnitTimeoutCheck();
648
649                size_t trackIndex;
650                CHECK(msg->findSize("track-index", &trackIndex));
651
652                if (trackIndex >= mTracks.size()) {
653                    LOGV("late packets ignored.");
654                    break;
655                }
656
657                TrackInfo *track = &mTracks.editItemAt(trackIndex);
658
659                int32_t eos;
660                if (msg->findInt32("eos", &eos)) {
661                    LOGI("received BYE on track index %d", trackIndex);
662#if 0
663                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
664#endif
665                    return;
666                }
667
668                sp<RefBase> obj;
669                CHECK(msg->findObject("access-unit", &obj));
670
671                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
672
673                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
674
675                if (mSeekPending) {
676                    LOGV("we're seeking, dropping stale packet.");
677                    break;
678                }
679
680                if (seqNum < track->mFirstSeqNumInSegment) {
681                    LOGV("dropping stale access-unit (%d < %d)",
682                         seqNum, track->mFirstSeqNumInSegment);
683                    break;
684                }
685
686                uint64_t ntpTime;
687                CHECK(accessUnit->meta()->findInt64(
688                            "ntp-time", (int64_t *)&ntpTime));
689
690                uint32_t rtpTime;
691                CHECK(accessUnit->meta()->findInt32(
692                            "rtp-time", (int32_t *)&rtpTime));
693
694                if (track->mNewSegment) {
695                    track->mNewSegment = false;
696
697                    LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d",
698                         ntpTime, rtpTime, seqNum);
699                }
700
701                if (mFirstAccessUnit) {
702                    mDoneMsg->setInt32("result", OK);
703                    mDoneMsg->post();
704                    mDoneMsg = NULL;
705
706                    mFirstAccessUnit = false;
707                    mFirstAccessUnitNTP = ntpTime;
708                }
709
710                if (ntpTime >= mFirstAccessUnitNTP) {
711                    ntpTime -= mFirstAccessUnitNTP;
712                } else {
713                    ntpTime = 0;
714                }
715
716                int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
717
718                accessUnit->meta()->setInt64("timeUs", timeUs);
719
720#if 0
721                int32_t damaged;
722                if (accessUnit->meta()->findInt32("damaged", &damaged)
723                        && damaged != 0) {
724                    LOGI("ignoring damaged AU");
725                } else
726#endif
727                {
728                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
729                    track->mPacketSource->queueAccessUnit(accessUnit);
730                }
731                break;
732            }
733
734            case 'seek':
735            {
736                sp<AMessage> doneMsg;
737                CHECK(msg->findMessage("doneMsg", &doneMsg));
738
739                if (mSeekPending) {
740                    doneMsg->post();
741                    break;
742                }
743
744                if (!mSeekable) {
745                    LOGW("This is a live stream, ignoring seek request.");
746                    doneMsg->post();
747                    break;
748                }
749
750                int64_t timeUs;
751                CHECK(msg->findInt64("time", &timeUs));
752
753                mSeekPending = true;
754
755                // Disable the access unit timeout until we resumed
756                // playback again.
757                mCheckPending = true;
758                ++mCheckGeneration;
759
760                AString request = "PAUSE ";
761                request.append(mSessionURL);
762                request.append(" RTSP/1.0\r\n");
763
764                request.append("Session: ");
765                request.append(mSessionID);
766                request.append("\r\n");
767
768                request.append("\r\n");
769
770                sp<AMessage> reply = new AMessage('see1', id());
771                reply->setInt64("time", timeUs);
772                reply->setMessage("doneMsg", doneMsg);
773                mConn->sendRequest(request.c_str(), reply);
774                break;
775            }
776
777            case 'see1':
778            {
779                // Session is paused now.
780                for (size_t i = 0; i < mTracks.size(); ++i) {
781                    mTracks.editItemAt(i).mPacketSource->flushQueue();
782                }
783
784                int64_t timeUs;
785                CHECK(msg->findInt64("time", &timeUs));
786
787                AString request = "PLAY ";
788                request.append(mSessionURL);
789                request.append(" RTSP/1.0\r\n");
790
791                request.append("Session: ");
792                request.append(mSessionID);
793                request.append("\r\n");
794
795                request.append(
796                        StringPrintf(
797                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
798
799                request.append("\r\n");
800
801                sp<AMessage> doneMsg;
802                CHECK(msg->findMessage("doneMsg", &doneMsg));
803
804                sp<AMessage> reply = new AMessage('see2', id());
805                reply->setMessage("doneMsg", doneMsg);
806                mConn->sendRequest(request.c_str(), reply);
807                break;
808            }
809
810            case 'see2':
811            {
812                CHECK(mSeekPending);
813
814                int32_t result;
815                CHECK(msg->findInt32("result", &result));
816
817                LOGI("PLAY completed with result %d (%s)",
818                     result, strerror(-result));
819
820                mCheckPending = false;
821                postAccessUnitTimeoutCheck();
822
823                if (result == OK) {
824                    sp<RefBase> obj;
825                    CHECK(msg->findObject("response", &obj));
826                    sp<ARTSPResponse> response =
827                        static_cast<ARTSPResponse *>(obj.get());
828
829                    if (response->mStatusCode != 200) {
830                        result = UNKNOWN_ERROR;
831                    } else {
832                        parsePlayResponse(response);
833
834                        LOGI("seek completed.");
835                    }
836                }
837
838                if (result != OK) {
839                    LOGE("seek failed, aborting.");
840                    (new AMessage('abor', id()))->post();
841                }
842
843                mSeekPending = false;
844
845                sp<AMessage> doneMsg;
846                CHECK(msg->findMessage("doneMsg", &doneMsg));
847
848                doneMsg->post();
849                break;
850            }
851
852            case 'biny':
853            {
854                sp<RefBase> obj;
855                CHECK(msg->findObject("buffer", &obj));
856                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
857
858                int32_t index;
859                CHECK(buffer->meta()->findInt32("index", &index));
860
861                mRTPConn->injectPacket(index, buffer);
862                break;
863            }
864
865            case 'tiou':
866            {
867                if (!mReceivedFirstRTCPPacket) {
868                    if (mTryFakeRTCP) {
869                        LOGW("Never received any data, disconnecting.");
870                        (new AMessage('abor', id()))->post();
871                    } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) {
872                        LOGW("We received RTP packets but no RTCP packets, "
873                             "using fake timestamps.");
874
875                        mTryFakeRTCP = true;
876
877                        mReceivedFirstRTCPPacket = true;
878                        mRTPConn->fakeTimestamps();
879                    } else {
880                        LOGW("Never received any data, switching transports.");
881
882                        mTryTCPInterleaving = true;
883
884                        sp<AMessage> msg = new AMessage('abor', id());
885                        msg->setInt32("reconnect", true);
886                        msg->post();
887                    }
888                }
889                break;
890            }
891
892            default:
893                TRESPASS();
894                break;
895        }
896    }
897
898    void postAccessUnitTimeoutCheck() {
899        if (mCheckPending) {
900            return;
901        }
902
903        mCheckPending = true;
904        sp<AMessage> check = new AMessage('chek', id());
905        check->setInt32("generation", mCheckGeneration);
906        check->post(kAccessUnitTimeoutUs);
907    }
908
909    static void SplitString(
910            const AString &s, const char *separator, List<AString> *items) {
911        items->clear();
912        size_t start = 0;
913        while (start < s.size()) {
914            ssize_t offset = s.find(separator, start);
915
916            if (offset < 0) {
917                items->push_back(AString(s, start, s.size() - start));
918                break;
919            }
920
921            items->push_back(AString(s, start, offset - start));
922            start = offset + strlen(separator);
923        }
924    }
925
926    void parsePlayResponse(const sp<ARTSPResponse> &response) {
927        mSeekable = false;
928
929        ssize_t i = response->mHeaders.indexOfKey("range");
930        if (i < 0) {
931            // Server doesn't even tell use what range it is going to
932            // play, therefore we won't support seeking.
933            return;
934        }
935
936        AString range = response->mHeaders.valueAt(i);
937        LOGV("Range: %s", range.c_str());
938
939        AString val;
940        CHECK(GetAttribute(range.c_str(), "npt", &val));
941        float npt1, npt2;
942
943        if (val == "now-" || val == "0-") {
944            // This is a live stream and therefore not seekable.
945            return;
946        } else {
947            CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2);
948        }
949
950        i = response->mHeaders.indexOfKey("rtp-info");
951        CHECK_GE(i, 0);
952
953        AString rtpInfo = response->mHeaders.valueAt(i);
954        List<AString> streamInfos;
955        SplitString(rtpInfo, ",", &streamInfos);
956
957        int n = 1;
958        for (List<AString>::iterator it = streamInfos.begin();
959             it != streamInfos.end(); ++it) {
960            (*it).trim();
961            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
962
963            CHECK(GetAttribute((*it).c_str(), "url", &val));
964
965            size_t trackIndex = 0;
966            while (trackIndex < mTracks.size()
967                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
968                ++trackIndex;
969            }
970            CHECK_LT(trackIndex, mTracks.size());
971
972            CHECK(GetAttribute((*it).c_str(), "seq", &val));
973
974            char *end;
975            unsigned long seq = strtoul(val.c_str(), &end, 10);
976
977            TrackInfo *info = &mTracks.editItemAt(trackIndex);
978            info->mFirstSeqNumInSegment = seq;
979            info->mNewSegment = true;
980
981            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
982
983            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
984
985            LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1);
986
987            info->mPacketSource->setNormalPlayTimeMapping(
988                    rtpTime, (int64_t)(npt1 * 1E6));
989
990            ++n;
991        }
992
993        mSeekable = true;
994    }
995
996    sp<APacketSource> getPacketSource(size_t index) {
997        CHECK_GE(index, 0u);
998        CHECK_LT(index, mTracks.size());
999
1000        return mTracks.editItemAt(index).mPacketSource;
1001    }
1002
1003    size_t countTracks() const {
1004        return mTracks.size();
1005    }
1006
1007private:
1008    sp<ALooper> mLooper;
1009    sp<ALooper> mNetLooper;
1010    sp<ARTSPConnection> mConn;
1011    sp<ARTPConnection> mRTPConn;
1012    sp<ASessionDescription> mSessionDesc;
1013    AString mOriginalSessionURL;  // This one still has user:pass@
1014    AString mSessionURL;
1015    AString mBaseURL;
1016    AString mSessionID;
1017    bool mSetupTracksSuccessful;
1018    bool mSeekPending;
1019    bool mFirstAccessUnit;
1020    uint64_t mFirstAccessUnitNTP;
1021    int64_t mNumAccessUnitsReceived;
1022    bool mCheckPending;
1023    int32_t mCheckGeneration;
1024    bool mTryTCPInterleaving;
1025    bool mTryFakeRTCP;
1026    bool mReceivedFirstRTCPPacket;
1027    bool mReceivedFirstRTPPacket;
1028    bool mSeekable;
1029
1030    struct TrackInfo {
1031        AString mURL;
1032        int mRTPSocket;
1033        int mRTCPSocket;
1034        bool mUsingInterleavedTCP;
1035        uint32_t mFirstSeqNumInSegment;
1036        bool mNewSegment;
1037
1038        sp<APacketSource> mPacketSource;
1039    };
1040    Vector<TrackInfo> mTracks;
1041
1042    sp<AMessage> mDoneMsg;
1043
1044    void setupTrack(size_t index) {
1045        sp<APacketSource> source =
1046            new APacketSource(mSessionDesc, index);
1047
1048        if (source->initCheck() != OK) {
1049            LOGW("Unsupported format. Ignoring track #%d.", index);
1050
1051            sp<AMessage> reply = new AMessage('setu', id());
1052            reply->setSize("index", index);
1053            reply->setInt32("result", ERROR_UNSUPPORTED);
1054            reply->post();
1055            return;
1056        }
1057
1058        AString url;
1059        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1060
1061        AString trackURL;
1062        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1063
1064        mTracks.push(TrackInfo());
1065        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1066        info->mURL = trackURL;
1067        info->mPacketSource = source;
1068        info->mUsingInterleavedTCP = false;
1069        info->mFirstSeqNumInSegment = 0;
1070        info->mNewSegment = true;
1071
1072        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1073
1074        AString request = "SETUP ";
1075        request.append(trackURL);
1076        request.append(" RTSP/1.0\r\n");
1077
1078        if (mTryTCPInterleaving) {
1079            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1080            info->mUsingInterleavedTCP = true;
1081            info->mRTPSocket = interleaveIndex;
1082            info->mRTCPSocket = interleaveIndex + 1;
1083
1084            request.append("Transport: RTP/AVP/TCP;interleaved=");
1085            request.append(interleaveIndex);
1086            request.append("-");
1087            request.append(interleaveIndex + 1);
1088        } else {
1089            unsigned rtpPort;
1090            ARTPConnection::MakePortPair(
1091                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1092
1093            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1094            request.append(rtpPort);
1095            request.append("-");
1096            request.append(rtpPort + 1);
1097        }
1098
1099        request.append("\r\n");
1100
1101        if (index > 1) {
1102            request.append("Session: ");
1103            request.append(mSessionID);
1104            request.append("\r\n");
1105        }
1106
1107        request.append("\r\n");
1108
1109        sp<AMessage> reply = new AMessage('setu', id());
1110        reply->setSize("index", index);
1111        reply->setSize("track-index", mTracks.size() - 1);
1112        mConn->sendRequest(request.c_str(), reply);
1113    }
1114
1115    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1116        out->clear();
1117
1118        if (strncasecmp("rtsp://", baseURL, 7)) {
1119            // Base URL must be absolute
1120            return false;
1121        }
1122
1123        if (!strncasecmp("rtsp://", url, 7)) {
1124            // "url" is already an absolute URL, ignore base URL.
1125            out->setTo(url);
1126            return true;
1127        }
1128
1129        size_t n = strlen(baseURL);
1130        if (baseURL[n - 1] == '/') {
1131            out->setTo(baseURL);
1132            out->append(url);
1133        } else {
1134            const char *slashPos = strrchr(baseURL, '/');
1135
1136            if (slashPos > &baseURL[6]) {
1137                out->setTo(baseURL, slashPos - baseURL);
1138            } else {
1139                out->setTo(baseURL);
1140            }
1141
1142            out->append("/");
1143            out->append(url);
1144        }
1145
1146        return true;
1147    }
1148
1149    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1150};
1151
1152}  // namespace android
1153
1154#endif  // MY_HANDLER_H_
1155