MyHandler.h revision a44501ea0896c2508bd6b807185d9049be6752f3
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41
42// If no access units are received within 3 secs, assume that the rtp
43// stream has ended and signal end of stream.
44static int64_t kAccessUnitTimeoutUs = 3000000ll;
45
46// If no access units arrive for the first 10 secs after starting the
47// stream, assume none ever will and signal EOS or switch transports.
48static int64_t kStartupTimeoutUs = 10000000ll;
49
50namespace android {
51
52static void MakeUserAgentString(AString *s) {
53    s->setTo("stagefright/1.1 (Linux;Android ");
54
55#if (PROPERTY_VALUE_MAX < 8)
56#error "PROPERTY_VALUE_MAX must be at least 8"
57#endif
58
59    char value[PROPERTY_VALUE_MAX];
60    property_get("ro.build.version.release", value, "Unknown");
61    s->append(value);
62    s->append(")");
63}
64
65static bool GetAttribute(const char *s, const char *key, AString *value) {
66    value->clear();
67
68    size_t keyLen = strlen(key);
69
70    for (;;) {
71        while (isspace(*s)) {
72            ++s;
73        }
74
75        const char *colonPos = strchr(s, ';');
76
77        size_t len =
78            (colonPos == NULL) ? strlen(s) : colonPos - s;
79
80        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
81            value->setTo(&s[keyLen + 1], len - keyLen - 1);
82            return true;
83        }
84
85        if (colonPos == NULL) {
86            return false;
87        }
88
89        s = colonPos + 1;
90    }
91}
92
93struct MyHandler : public AHandler {
94    MyHandler(const char *url, const sp<ALooper> &looper)
95        : mLooper(looper),
96          mNetLooper(new ALooper),
97          mConn(new ARTSPConnection),
98          mRTPConn(new ARTPConnection),
99          mSessionURL(url),
100          mSetupTracksSuccessful(false),
101          mSeekPending(false),
102          mFirstAccessUnit(true),
103          mFirstAccessUnitNTP(0),
104          mNumAccessUnitsReceived(0),
105          mCheckPending(false),
106          mCheckGeneration(0),
107          mTryTCPInterleaving(false),
108          mTryFakeRTCP(false),
109          mReceivedFirstRTCPPacket(false),
110          mReceivedFirstRTPPacket(false),
111          mSeekable(false) {
112        mNetLooper->setName("rtsp net");
113        mNetLooper->start(false /* runOnCallingThread */,
114                          false /* canCallJava */,
115                          PRIORITY_HIGHEST);
116    }
117
118    void connect(const sp<AMessage> &doneMsg) {
119        mDoneMsg = doneMsg;
120
121        mLooper->registerHandler(this);
122        mLooper->registerHandler(mConn);
123        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
124
125        sp<AMessage> notify = new AMessage('biny', id());
126        mConn->observeBinaryData(notify);
127
128        sp<AMessage> reply = new AMessage('conn', id());
129        mConn->connect(mSessionURL.c_str(), reply);
130    }
131
132    void disconnect(const sp<AMessage> &doneMsg) {
133        mDoneMsg = doneMsg;
134
135        (new AMessage('abor', id()))->post();
136    }
137
138    void seek(int64_t timeUs, const sp<AMessage> &doneMsg) {
139        sp<AMessage> msg = new AMessage('seek', id());
140        msg->setInt64("time", timeUs);
141        msg->setMessage("doneMsg", doneMsg);
142        msg->post();
143    }
144
145    int64_t getNormalPlayTimeUs() {
146        int64_t maxTimeUs = 0;
147        for (size_t i = 0; i < mTracks.size(); ++i) {
148            int64_t timeUs = mTracks.editItemAt(i).mPacketSource
149                ->getNormalPlayTimeUs();
150
151            if (i == 0 || timeUs > maxTimeUs) {
152                maxTimeUs = timeUs;
153            }
154        }
155
156        return maxTimeUs;
157    }
158
159    static void addRR(const sp<ABuffer> &buf) {
160        uint8_t *ptr = buf->data() + buf->size();
161        ptr[0] = 0x80 | 0;
162        ptr[1] = 201;  // RR
163        ptr[2] = 0;
164        ptr[3] = 1;
165        ptr[4] = 0xde;  // SSRC
166        ptr[5] = 0xad;
167        ptr[6] = 0xbe;
168        ptr[7] = 0xef;
169
170        buf->setRange(0, buf->size() + 8);
171    }
172
173    static void addSDES(int s, const sp<ABuffer> &buffer) {
174        struct sockaddr_in addr;
175        socklen_t addrSize = sizeof(addr);
176        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
177
178        uint8_t *data = buffer->data() + buffer->size();
179        data[0] = 0x80 | 1;
180        data[1] = 202;  // SDES
181        data[4] = 0xde;  // SSRC
182        data[5] = 0xad;
183        data[6] = 0xbe;
184        data[7] = 0xef;
185
186        size_t offset = 8;
187
188        data[offset++] = 1;  // CNAME
189
190        AString cname = "stagefright@";
191        cname.append(inet_ntoa(addr.sin_addr));
192        data[offset++] = cname.size();
193
194        memcpy(&data[offset], cname.c_str(), cname.size());
195        offset += cname.size();
196
197        data[offset++] = 6;  // TOOL
198
199        AString tool;
200        MakeUserAgentString(&tool);
201
202        data[offset++] = tool.size();
203
204        memcpy(&data[offset], tool.c_str(), tool.size());
205        offset += tool.size();
206
207        data[offset++] = 0;
208
209        if ((offset % 4) > 0) {
210            size_t count = 4 - (offset % 4);
211            switch (count) {
212                case 3:
213                    data[offset++] = 0;
214                case 2:
215                    data[offset++] = 0;
216                case 1:
217                    data[offset++] = 0;
218            }
219        }
220
221        size_t numWords = (offset / 4) - 1;
222        data[2] = numWords >> 8;
223        data[3] = numWords & 0xff;
224
225        buffer->setRange(buffer->offset(), buffer->size() + offset);
226    }
227
228    // In case we're behind NAT, fire off two UDP packets to the remote
229    // rtp/rtcp ports to poke a hole into the firewall for future incoming
230    // packets. We're going to send an RR/SDES RTCP packet to both of them.
231    void pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
232        AString source;
233        AString server_port;
234        if (!GetAttribute(transport.c_str(),
235                          "source",
236                          &source)
237                || !GetAttribute(transport.c_str(),
238                                 "server_port",
239                                 &server_port)) {
240            return;
241        }
242
243        int rtpPort, rtcpPort;
244        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
245                || rtpPort <= 0 || rtpPort > 65535
246                || rtcpPort <=0 || rtcpPort > 65535
247                || rtcpPort != rtpPort + 1
248                || (rtpPort & 1) != 0) {
249            return;
250        }
251
252        struct sockaddr_in addr;
253        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
254        addr.sin_family = AF_INET;
255        addr.sin_addr.s_addr = inet_addr(source.c_str());
256
257        if (addr.sin_addr.s_addr == INADDR_NONE) {
258            return;
259        }
260
261        // Make up an RR/SDES RTCP packet.
262        sp<ABuffer> buf = new ABuffer(65536);
263        buf->setRange(0, 0);
264        addRR(buf);
265        addSDES(rtpSocket, buf);
266
267        addr.sin_port = htons(rtpPort);
268
269        ssize_t n = sendto(
270                rtpSocket, buf->data(), buf->size(), 0,
271                (const sockaddr *)&addr, sizeof(addr));
272        CHECK_EQ(n, (ssize_t)buf->size());
273
274        addr.sin_port = htons(rtcpPort);
275
276        n = sendto(
277                rtcpSocket, buf->data(), buf->size(), 0,
278                (const sockaddr *)&addr, sizeof(addr));
279        CHECK_EQ(n, (ssize_t)buf->size());
280
281        LOGV("successfully poked holes.");
282    }
283
284    virtual void onMessageReceived(const sp<AMessage> &msg) {
285        switch (msg->what()) {
286            case 'conn':
287            {
288                int32_t result;
289                CHECK(msg->findInt32("result", &result));
290
291                LOGI("connection request completed with result %d (%s)",
292                     result, strerror(-result));
293
294                if (result == OK) {
295                    AString request;
296                    request = "DESCRIBE ";
297                    request.append(mSessionURL);
298                    request.append(" RTSP/1.0\r\n");
299                    request.append("Accept: application/sdp\r\n");
300                    request.append("\r\n");
301
302                    sp<AMessage> reply = new AMessage('desc', id());
303                    mConn->sendRequest(request.c_str(), reply);
304                } else {
305                    (new AMessage('disc', id()))->post();
306                }
307                break;
308            }
309
310            case 'disc':
311            {
312                int32_t reconnect;
313                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
314                    sp<AMessage> reply = new AMessage('conn', id());
315                    mConn->connect(mSessionURL.c_str(), reply);
316                } else {
317                    (new AMessage('quit', id()))->post();
318                }
319                break;
320            }
321
322            case 'desc':
323            {
324                int32_t result;
325                CHECK(msg->findInt32("result", &result));
326
327                LOGI("DESCRIBE completed with result %d (%s)",
328                     result, strerror(-result));
329
330                if (result == OK) {
331                    sp<RefBase> obj;
332                    CHECK(msg->findObject("response", &obj));
333                    sp<ARTSPResponse> response =
334                        static_cast<ARTSPResponse *>(obj.get());
335
336                    if (response->mStatusCode == 302) {
337                        ssize_t i = response->mHeaders.indexOfKey("location");
338                        CHECK_GE(i, 0);
339
340                        mSessionURL = response->mHeaders.valueAt(i);
341
342                        AString request;
343                        request = "DESCRIBE ";
344                        request.append(mSessionURL);
345                        request.append(" RTSP/1.0\r\n");
346                        request.append("Accept: application/sdp\r\n");
347                        request.append("\r\n");
348
349                        sp<AMessage> reply = new AMessage('desc', id());
350                        mConn->sendRequest(request.c_str(), reply);
351                        break;
352                    }
353
354                    if (response->mStatusCode != 200) {
355                        result = UNKNOWN_ERROR;
356                    } else {
357                        mSessionDesc = new ASessionDescription;
358
359                        mSessionDesc->setTo(
360                                response->mContent->data(),
361                                response->mContent->size());
362
363                        if (!mSessionDesc->isValid()) {
364                            result = ERROR_MALFORMED;
365                        } else {
366                            ssize_t i = response->mHeaders.indexOfKey("content-base");
367                            if (i >= 0) {
368                                mBaseURL = response->mHeaders.valueAt(i);
369                            } else {
370                                i = response->mHeaders.indexOfKey("content-location");
371                                if (i >= 0) {
372                                    mBaseURL = response->mHeaders.valueAt(i);
373                                } else {
374                                    mBaseURL = mSessionURL;
375                                }
376                            }
377
378                            CHECK_GT(mSessionDesc->countTracks(), 1u);
379                            setupTrack(1);
380                        }
381                    }
382                }
383
384                if (result != OK) {
385                    sp<AMessage> reply = new AMessage('disc', id());
386                    mConn->disconnect(reply);
387                }
388                break;
389            }
390
391            case 'setu':
392            {
393                size_t index;
394                CHECK(msg->findSize("index", &index));
395
396                TrackInfo *track = NULL;
397                size_t trackIndex;
398                if (msg->findSize("track-index", &trackIndex)) {
399                    track = &mTracks.editItemAt(trackIndex);
400                }
401
402                int32_t result;
403                CHECK(msg->findInt32("result", &result));
404
405                LOGI("SETUP(%d) completed with result %d (%s)",
406                     index, result, strerror(-result));
407
408                if (result == OK) {
409                    CHECK(track != NULL);
410
411                    sp<RefBase> obj;
412                    CHECK(msg->findObject("response", &obj));
413                    sp<ARTSPResponse> response =
414                        static_cast<ARTSPResponse *>(obj.get());
415
416                    if (response->mStatusCode != 200) {
417                        result = UNKNOWN_ERROR;
418                    } else {
419                        ssize_t i = response->mHeaders.indexOfKey("session");
420                        CHECK_GE(i, 0);
421
422                        mSessionID = response->mHeaders.valueAt(i);
423                        i = mSessionID.find(";");
424                        if (i >= 0) {
425                            // Remove options, i.e. ";timeout=90"
426                            mSessionID.erase(i, mSessionID.size() - i);
427                        }
428
429                        sp<AMessage> notify = new AMessage('accu', id());
430                        notify->setSize("track-index", trackIndex);
431
432                        i = response->mHeaders.indexOfKey("transport");
433                        CHECK_GE(i, 0);
434
435                        if (!track->mUsingInterleavedTCP) {
436                            AString transport = response->mHeaders.valueAt(i);
437
438                            pokeAHole(track->mRTPSocket,
439                                      track->mRTCPSocket,
440                                      transport);
441                        }
442
443                        mRTPConn->addStream(
444                                track->mRTPSocket, track->mRTCPSocket,
445                                mSessionDesc, index,
446                                notify, track->mUsingInterleavedTCP);
447
448                        mSetupTracksSuccessful = true;
449                    }
450                }
451
452                if (result != OK) {
453                    if (track) {
454                        if (!track->mUsingInterleavedTCP) {
455                            close(track->mRTPSocket);
456                            close(track->mRTCPSocket);
457                        }
458
459                        mTracks.removeItemsAt(trackIndex);
460                    }
461                }
462
463                ++index;
464                if (index < mSessionDesc->countTracks()) {
465                    setupTrack(index);
466                } else if (mSetupTracksSuccessful) {
467                    AString request = "PLAY ";
468                    request.append(mSessionURL);
469                    request.append(" RTSP/1.0\r\n");
470
471                    request.append("Session: ");
472                    request.append(mSessionID);
473                    request.append("\r\n");
474
475                    request.append("\r\n");
476
477                    sp<AMessage> reply = new AMessage('play', id());
478                    mConn->sendRequest(request.c_str(), reply);
479                } else {
480                    sp<AMessage> reply = new AMessage('disc', id());
481                    mConn->disconnect(reply);
482                }
483                break;
484            }
485
486            case 'play':
487            {
488                int32_t result;
489                CHECK(msg->findInt32("result", &result));
490
491                LOGI("PLAY completed with result %d (%s)",
492                     result, strerror(-result));
493
494                if (result == OK) {
495                    sp<RefBase> obj;
496                    CHECK(msg->findObject("response", &obj));
497                    sp<ARTSPResponse> response =
498                        static_cast<ARTSPResponse *>(obj.get());
499
500                    if (response->mStatusCode != 200) {
501                        result = UNKNOWN_ERROR;
502                    } else {
503                        parsePlayResponse(response);
504
505                        sp<AMessage> timeout = new AMessage('tiou', id());
506                        timeout->post(kStartupTimeoutUs);
507                    }
508                }
509
510                if (result != OK) {
511                    sp<AMessage> reply = new AMessage('disc', id());
512                    mConn->disconnect(reply);
513                }
514
515                break;
516            }
517
518            case 'abor':
519            {
520                for (size_t i = 0; i < mTracks.size(); ++i) {
521                    TrackInfo *info = &mTracks.editItemAt(i);
522
523                    info->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
524
525                    if (!info->mUsingInterleavedTCP) {
526                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
527
528                        close(info->mRTPSocket);
529                        close(info->mRTCPSocket);
530                    }
531                }
532                mTracks.clear();
533                mSetupTracksSuccessful = false;
534                mSeekPending = false;
535                mFirstAccessUnit = true;
536                mFirstAccessUnitNTP = 0;
537                mNumAccessUnitsReceived = 0;
538                mReceivedFirstRTCPPacket = false;
539                mReceivedFirstRTPPacket = false;
540                mSeekable = false;
541
542                sp<AMessage> reply = new AMessage('tear', id());
543
544                int32_t reconnect;
545                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
546                    reply->setInt32("reconnect", true);
547                }
548
549                AString request;
550                request = "TEARDOWN ";
551
552                // XXX should use aggregate url from SDP here...
553                request.append(mSessionURL);
554                request.append(" RTSP/1.0\r\n");
555
556                request.append("Session: ");
557                request.append(mSessionID);
558                request.append("\r\n");
559
560                request.append("\r\n");
561
562                mConn->sendRequest(request.c_str(), reply);
563                break;
564            }
565
566            case 'tear':
567            {
568                int32_t result;
569                CHECK(msg->findInt32("result", &result));
570
571                LOGI("TEARDOWN completed with result %d (%s)",
572                     result, strerror(-result));
573
574                sp<AMessage> reply = new AMessage('disc', id());
575
576                int32_t reconnect;
577                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
578                    reply->setInt32("reconnect", true);
579                }
580
581                mConn->disconnect(reply);
582                break;
583            }
584
585            case 'quit':
586            {
587                if (mDoneMsg != NULL) {
588                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
589                    mDoneMsg->post();
590                    mDoneMsg = NULL;
591                }
592                break;
593            }
594
595            case 'chek':
596            {
597                int32_t generation;
598                CHECK(msg->findInt32("generation", &generation));
599                if (generation != mCheckGeneration) {
600                    // This is an outdated message. Ignore.
601                    break;
602                }
603
604                if (mNumAccessUnitsReceived == 0) {
605                    LOGI("stream ended? aborting.");
606                    (new AMessage('abor', id()))->post();
607                    break;
608                }
609
610                mNumAccessUnitsReceived = 0;
611                msg->post(kAccessUnitTimeoutUs);
612                break;
613            }
614
615            case 'accu':
616            {
617                int32_t first;
618                if (msg->findInt32("first-rtcp", &first)) {
619                    mReceivedFirstRTCPPacket = true;
620                    break;
621                }
622
623                if (msg->findInt32("first-rtp", &first)) {
624                    mReceivedFirstRTPPacket = true;
625                    break;
626                }
627
628                ++mNumAccessUnitsReceived;
629                postAccessUnitTimeoutCheck();
630
631                size_t trackIndex;
632                CHECK(msg->findSize("track-index", &trackIndex));
633
634                if (trackIndex >= mTracks.size()) {
635                    LOGV("late packets ignored.");
636                    break;
637                }
638
639                TrackInfo *track = &mTracks.editItemAt(trackIndex);
640
641                int32_t eos;
642                if (msg->findInt32("eos", &eos)) {
643                    LOGI("received BYE on track index %d", trackIndex);
644#if 0
645                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
646#endif
647                    return;
648                }
649
650                sp<RefBase> obj;
651                CHECK(msg->findObject("access-unit", &obj));
652
653                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
654
655                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
656
657                if (mSeekPending) {
658                    LOGV("we're seeking, dropping stale packet.");
659                    break;
660                }
661
662                if (seqNum < track->mFirstSeqNumInSegment) {
663                    LOGV("dropping stale access-unit (%d < %d)",
664                         seqNum, track->mFirstSeqNumInSegment);
665                    break;
666                }
667
668                uint64_t ntpTime;
669                CHECK(accessUnit->meta()->findInt64(
670                            "ntp-time", (int64_t *)&ntpTime));
671
672                uint32_t rtpTime;
673                CHECK(accessUnit->meta()->findInt32(
674                            "rtp-time", (int32_t *)&rtpTime));
675
676                if (track->mNewSegment) {
677                    track->mNewSegment = false;
678
679                    LOGV("first segment unit ntpTime=0x%016llx rtpTime=%u seq=%d",
680                         ntpTime, rtpTime, seqNum);
681                }
682
683                if (mFirstAccessUnit) {
684                    mDoneMsg->setInt32("result", OK);
685                    mDoneMsg->post();
686                    mDoneMsg = NULL;
687
688                    mFirstAccessUnit = false;
689                    mFirstAccessUnitNTP = ntpTime;
690                }
691
692                if (ntpTime >= mFirstAccessUnitNTP) {
693                    ntpTime -= mFirstAccessUnitNTP;
694                } else {
695                    ntpTime = 0;
696                }
697
698                int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
699
700                accessUnit->meta()->setInt64("timeUs", timeUs);
701
702#if 0
703                int32_t damaged;
704                if (accessUnit->meta()->findInt32("damaged", &damaged)
705                        && damaged != 0) {
706                    LOGI("ignoring damaged AU");
707                } else
708#endif
709                {
710                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
711                    track->mPacketSource->queueAccessUnit(accessUnit);
712                }
713                break;
714            }
715
716            case 'seek':
717            {
718                sp<AMessage> doneMsg;
719                CHECK(msg->findMessage("doneMsg", &doneMsg));
720
721                if (mSeekPending) {
722                    doneMsg->post();
723                    break;
724                }
725
726                if (!mSeekable) {
727                    LOGW("This is a live stream, ignoring seek request.");
728                    doneMsg->post();
729                    break;
730                }
731
732                int64_t timeUs;
733                CHECK(msg->findInt64("time", &timeUs));
734
735                mSeekPending = true;
736
737                // Disable the access unit timeout until we resumed
738                // playback again.
739                mCheckPending = true;
740                ++mCheckGeneration;
741
742                AString request = "PAUSE ";
743                request.append(mSessionURL);
744                request.append(" RTSP/1.0\r\n");
745
746                request.append("Session: ");
747                request.append(mSessionID);
748                request.append("\r\n");
749
750                request.append("\r\n");
751
752                sp<AMessage> reply = new AMessage('see1', id());
753                reply->setInt64("time", timeUs);
754                reply->setMessage("doneMsg", doneMsg);
755                mConn->sendRequest(request.c_str(), reply);
756                break;
757            }
758
759            case 'see1':
760            {
761                // Session is paused now.
762                for (size_t i = 0; i < mTracks.size(); ++i) {
763                    mTracks.editItemAt(i).mPacketSource->flushQueue();
764                }
765
766                int64_t timeUs;
767                CHECK(msg->findInt64("time", &timeUs));
768
769                AString request = "PLAY ";
770                request.append(mSessionURL);
771                request.append(" RTSP/1.0\r\n");
772
773                request.append("Session: ");
774                request.append(mSessionID);
775                request.append("\r\n");
776
777                request.append(
778                        StringPrintf(
779                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
780
781                request.append("\r\n");
782
783                sp<AMessage> doneMsg;
784                CHECK(msg->findMessage("doneMsg", &doneMsg));
785
786                sp<AMessage> reply = new AMessage('see2', id());
787                reply->setMessage("doneMsg", doneMsg);
788                mConn->sendRequest(request.c_str(), reply);
789                break;
790            }
791
792            case 'see2':
793            {
794                CHECK(mSeekPending);
795
796                int32_t result;
797                CHECK(msg->findInt32("result", &result));
798
799                LOGI("PLAY completed with result %d (%s)",
800                     result, strerror(-result));
801
802                mCheckPending = false;
803                postAccessUnitTimeoutCheck();
804
805                if (result == OK) {
806                    sp<RefBase> obj;
807                    CHECK(msg->findObject("response", &obj));
808                    sp<ARTSPResponse> response =
809                        static_cast<ARTSPResponse *>(obj.get());
810
811                    if (response->mStatusCode != 200) {
812                        result = UNKNOWN_ERROR;
813                    } else {
814                        parsePlayResponse(response);
815
816                        LOGI("seek completed.");
817                    }
818                }
819
820                if (result != OK) {
821                    LOGE("seek failed, aborting.");
822                    (new AMessage('abor', id()))->post();
823                }
824
825                mSeekPending = false;
826
827                sp<AMessage> doneMsg;
828                CHECK(msg->findMessage("doneMsg", &doneMsg));
829
830                doneMsg->post();
831                break;
832            }
833
834            case 'biny':
835            {
836                sp<RefBase> obj;
837                CHECK(msg->findObject("buffer", &obj));
838                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
839
840                int32_t index;
841                CHECK(buffer->meta()->findInt32("index", &index));
842
843                mRTPConn->injectPacket(index, buffer);
844                break;
845            }
846
847            case 'tiou':
848            {
849                if (!mReceivedFirstRTCPPacket) {
850                    if (mTryFakeRTCP) {
851                        LOGW("Never received any data, disconnecting.");
852                        (new AMessage('abor', id()))->post();
853                    } else if (mTryTCPInterleaving && mReceivedFirstRTPPacket) {
854                        LOGW("We received RTP packets but no RTCP packets, "
855                             "using fake timestamps.");
856
857                        mTryFakeRTCP = true;
858
859                        mReceivedFirstRTCPPacket = true;
860                        mRTPConn->fakeTimestamps();
861                    } else {
862                        LOGW("Never received any data, switching transports.");
863
864                        mTryTCPInterleaving = true;
865
866                        sp<AMessage> msg = new AMessage('abor', id());
867                        msg->setInt32("reconnect", true);
868                        msg->post();
869                    }
870                }
871                break;
872            }
873
874            default:
875                TRESPASS();
876                break;
877        }
878    }
879
880    void postAccessUnitTimeoutCheck() {
881        if (mCheckPending) {
882            return;
883        }
884
885        mCheckPending = true;
886        sp<AMessage> check = new AMessage('chek', id());
887        check->setInt32("generation", mCheckGeneration);
888        check->post(kAccessUnitTimeoutUs);
889    }
890
891    static void SplitString(
892            const AString &s, const char *separator, List<AString> *items) {
893        items->clear();
894        size_t start = 0;
895        while (start < s.size()) {
896            ssize_t offset = s.find(separator, start);
897
898            if (offset < 0) {
899                items->push_back(AString(s, start, s.size() - start));
900                break;
901            }
902
903            items->push_back(AString(s, start, offset - start));
904            start = offset + strlen(separator);
905        }
906    }
907
908    void parsePlayResponse(const sp<ARTSPResponse> &response) {
909        mSeekable = false;
910
911        ssize_t i = response->mHeaders.indexOfKey("range");
912        if (i < 0) {
913            // Server doesn't even tell use what range it is going to
914            // play, therefore we won't support seeking.
915            return;
916        }
917
918        AString range = response->mHeaders.valueAt(i);
919        LOGV("Range: %s", range.c_str());
920
921        AString val;
922        CHECK(GetAttribute(range.c_str(), "npt", &val));
923        float npt1, npt2;
924
925        if (val == "now-") {
926            // This is a live stream and therefore not seekable.
927            return;
928        } else {
929            CHECK_EQ(sscanf(val.c_str(), "%f-%f", &npt1, &npt2), 2);
930        }
931
932        i = response->mHeaders.indexOfKey("rtp-info");
933        CHECK_GE(i, 0);
934
935        AString rtpInfo = response->mHeaders.valueAt(i);
936        List<AString> streamInfos;
937        SplitString(rtpInfo, ",", &streamInfos);
938
939        int n = 1;
940        for (List<AString>::iterator it = streamInfos.begin();
941             it != streamInfos.end(); ++it) {
942            (*it).trim();
943            LOGV("streamInfo[%d] = %s", n, (*it).c_str());
944
945            CHECK(GetAttribute((*it).c_str(), "url", &val));
946
947            size_t trackIndex = 0;
948            while (trackIndex < mTracks.size()
949                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
950                ++trackIndex;
951            }
952            CHECK_LT(trackIndex, mTracks.size());
953
954            CHECK(GetAttribute((*it).c_str(), "seq", &val));
955
956            char *end;
957            unsigned long seq = strtoul(val.c_str(), &end, 10);
958
959            TrackInfo *info = &mTracks.editItemAt(trackIndex);
960            info->mFirstSeqNumInSegment = seq;
961            info->mNewSegment = true;
962
963            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
964
965            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
966
967            LOGV("track #%d: rtpTime=%u <=> ntp=%.2f", n, rtpTime, npt1);
968
969            info->mPacketSource->setNormalPlayTimeMapping(
970                    rtpTime, (int64_t)(npt1 * 1E6));
971
972            ++n;
973        }
974
975        mSeekable = true;
976    }
977
978    sp<APacketSource> getPacketSource(size_t index) {
979        CHECK_GE(index, 0u);
980        CHECK_LT(index, mTracks.size());
981
982        return mTracks.editItemAt(index).mPacketSource;
983    }
984
985    size_t countTracks() const {
986        return mTracks.size();
987    }
988
989private:
990    sp<ALooper> mLooper;
991    sp<ALooper> mNetLooper;
992    sp<ARTSPConnection> mConn;
993    sp<ARTPConnection> mRTPConn;
994    sp<ASessionDescription> mSessionDesc;
995    AString mSessionURL;
996    AString mBaseURL;
997    AString mSessionID;
998    bool mSetupTracksSuccessful;
999    bool mSeekPending;
1000    bool mFirstAccessUnit;
1001    uint64_t mFirstAccessUnitNTP;
1002    int64_t mNumAccessUnitsReceived;
1003    bool mCheckPending;
1004    int32_t mCheckGeneration;
1005    bool mTryTCPInterleaving;
1006    bool mTryFakeRTCP;
1007    bool mReceivedFirstRTCPPacket;
1008    bool mReceivedFirstRTPPacket;
1009    bool mSeekable;
1010
1011    struct TrackInfo {
1012        AString mURL;
1013        int mRTPSocket;
1014        int mRTCPSocket;
1015        bool mUsingInterleavedTCP;
1016        uint32_t mFirstSeqNumInSegment;
1017        bool mNewSegment;
1018
1019        sp<APacketSource> mPacketSource;
1020    };
1021    Vector<TrackInfo> mTracks;
1022
1023    sp<AMessage> mDoneMsg;
1024
1025    void setupTrack(size_t index) {
1026        sp<APacketSource> source =
1027            new APacketSource(mSessionDesc, index);
1028
1029        if (source->initCheck() != OK) {
1030            LOGW("Unsupported format. Ignoring track #%d.", index);
1031
1032            sp<AMessage> reply = new AMessage('setu', id());
1033            reply->setSize("index", index);
1034            reply->setInt32("result", ERROR_UNSUPPORTED);
1035            reply->post();
1036            return;
1037        }
1038
1039        AString url;
1040        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1041
1042        AString trackURL;
1043        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1044
1045        mTracks.push(TrackInfo());
1046        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1047        info->mURL = trackURL;
1048        info->mPacketSource = source;
1049        info->mUsingInterleavedTCP = false;
1050        info->mFirstSeqNumInSegment = 0;
1051        info->mNewSegment = true;
1052
1053        LOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1054
1055        AString request = "SETUP ";
1056        request.append(trackURL);
1057        request.append(" RTSP/1.0\r\n");
1058
1059        if (mTryTCPInterleaving) {
1060            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1061            info->mUsingInterleavedTCP = true;
1062            info->mRTPSocket = interleaveIndex;
1063            info->mRTCPSocket = interleaveIndex + 1;
1064
1065            request.append("Transport: RTP/AVP/TCP;interleaved=");
1066            request.append(interleaveIndex);
1067            request.append("-");
1068            request.append(interleaveIndex + 1);
1069        } else {
1070            unsigned rtpPort;
1071            ARTPConnection::MakePortPair(
1072                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1073
1074            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1075            request.append(rtpPort);
1076            request.append("-");
1077            request.append(rtpPort + 1);
1078        }
1079
1080        request.append("\r\n");
1081
1082        if (index > 1) {
1083            request.append("Session: ");
1084            request.append(mSessionID);
1085            request.append("\r\n");
1086        }
1087
1088        request.append("\r\n");
1089
1090        sp<AMessage> reply = new AMessage('setu', id());
1091        reply->setSize("index", index);
1092        reply->setSize("track-index", mTracks.size() - 1);
1093        mConn->sendRequest(request.c_str(), reply);
1094    }
1095
1096    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1097        out->clear();
1098
1099        if (strncasecmp("rtsp://", baseURL, 7)) {
1100            // Base URL must be absolute
1101            return false;
1102        }
1103
1104        if (!strncasecmp("rtsp://", url, 7)) {
1105            // "url" is already an absolute URL, ignore base URL.
1106            out->setTo(url);
1107            return true;
1108        }
1109
1110        size_t n = strlen(baseURL);
1111        if (baseURL[n - 1] == '/') {
1112            out->setTo(baseURL);
1113            out->append(url);
1114        } else {
1115            const char *slashPos = strrchr(baseURL, '/');
1116
1117            if (slashPos > &baseURL[6]) {
1118                out->setTo(baseURL, slashPos - baseURL);
1119            } else {
1120                out->setTo(baseURL);
1121            }
1122
1123            out->append("/");
1124            out->append(url);
1125        }
1126
1127        return true;
1128    }
1129
1130    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1131};
1132
1133}  // namespace android
1134
1135#endif  // MY_HANDLER_H_
1136