MyHandler.h revision aa5ba9a27f4c483ee116b7b296a681f4f8e23e62
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
45// If no access units are received within 10 secs, assume that the rtp
46// stream has ended and signal end of stream.
47static int64_t kAccessUnitTimeoutUs = 10000000ll;
48
49// If no access units arrive for the first 10 secs after starting the
50// stream, assume none ever will and signal EOS or switch transports.
51static int64_t kStartupTimeoutUs = 10000000ll;
52
// Default keep-alive timeout (60 secs). It seeds mKeepAliveTimeoutUs and is
// kept by the SETUP reply handler unless the server's Session header
// carries a well-formed "timeout" attribute of at least 15 secs.
53static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54
55namespace android {
56
57static void MakeUserAgentString(AString *s) {
58    s->setTo("stagefright/1.1 (Linux;Android ");
59
60#if (PROPERTY_VALUE_MAX < 8)
61#error "PROPERTY_VALUE_MAX must be at least 8"
62#endif
63
64    char value[PROPERTY_VALUE_MAX];
65    property_get("ro.build.version.release", value, "Unknown");
66    s->append(value);
67    s->append(")");
68}
69
70static bool GetAttribute(const char *s, const char *key, AString *value) {
71    value->clear();
72
73    size_t keyLen = strlen(key);
74
75    for (;;) {
76        while (isspace(*s)) {
77            ++s;
78        }
79
80        const char *colonPos = strchr(s, ';');
81
82        size_t len =
83            (colonPos == NULL) ? strlen(s) : colonPos - s;
84
85        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
86            value->setTo(&s[keyLen + 1], len - keyLen - 1);
87            return true;
88        }
89
90        if (colonPos == NULL) {
91            return false;
92        }
93
94        s = colonPos + 1;
95    }
96}
97
98struct MyHandler : public AHandler {
99    enum {
        // Codes stored in the "what" field of messages duplicated from
        // mNotify. Within this part of the file that is visible for
        // kWhatDisconnected and kWhatSeekDone; the remaining codes are
        // presumably delivered the same way -- confirm against the rest of
        // the file.
100        kWhatConnected                  = 'conn',
101        kWhatDisconnected               = 'disc',
102        kWhatSeekDone                   = 'sdon',
103
104        kWhatAccessUnit                 = 'accU',
105        kWhatEOS                        = 'eos!',
106        kWhatSeekDiscontinuity          = 'seeD',
107        kWhatNormalPlayTimeMapping      = 'nptM',
108    };
109
110    MyHandler(
111            const char *url,
112            const sp<AMessage> &notify,
113            bool uidValid = false, uid_t uid = 0)
114        : mNotify(notify),
115          mUIDValid(uidValid),
116          mUID(uid),
117          mNetLooper(new ALooper),
118          mConn(new ARTSPConnection(mUIDValid, mUID)),
119          mRTPConn(new ARTPConnection),
120          mOriginalSessionURL(url),
121          mSessionURL(url),
122          mSetupTracksSuccessful(false),
123          mSeekPending(false),
124          mFirstAccessUnit(true),
125          mNTPAnchorUs(-1),
126          mMediaAnchorUs(-1),
127          mLastMediaTimeUs(0),
128          mNumAccessUnitsReceived(0),
129          mCheckPending(false),
130          mCheckGeneration(0),
131          mTryTCPInterleaving(false),
132          mTryFakeRTCP(false),
133          mReceivedFirstRTCPPacket(false),
134          mReceivedFirstRTPPacket(false),
135          mSeekable(false),
136          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
137          mKeepAliveGeneration(0) {
138        mNetLooper->setName("rtsp net");
139        mNetLooper->start(false /* runOnCallingThread */,
140                          false /* canCallJava */,
141                          PRIORITY_HIGHEST);
142
143        // Strip any authentication info from the session url, we don't
144        // want to transmit user/pass in cleartext.
145        AString host, path, user, pass;
146        unsigned port;
147        CHECK(ARTSPConnection::ParseURL(
148                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
149
150        if (user.size() > 0) {
151            mSessionURL.clear();
152            mSessionURL.append("rtsp://");
153            mSessionURL.append(host);
154            mSessionURL.append(":");
155            mSessionURL.append(StringPrintf("%u", port));
156            mSessionURL.append(path);
157
158            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
159        }
160
161        mSessionHost = host;
162    }
163
164    void connect() {
165        looper()->registerHandler(mConn);
166        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
167
168        sp<AMessage> notify = new AMessage('biny', id());
169        mConn->observeBinaryData(notify);
170
171        sp<AMessage> reply = new AMessage('conn', id());
172        mConn->connect(mOriginalSessionURL.c_str(), reply);
173    }
174
175    void disconnect() {
176        (new AMessage('abor', id()))->post();
177    }
178
179    void seek(int64_t timeUs) {
180        sp<AMessage> msg = new AMessage('seek', id());
181        msg->setInt64("time", timeUs);
182        msg->post();
183    }
184
185    static void addRR(const sp<ABuffer> &buf) {
186        uint8_t *ptr = buf->data() + buf->size();
187        ptr[0] = 0x80 | 0;
188        ptr[1] = 201;  // RR
189        ptr[2] = 0;
190        ptr[3] = 1;
191        ptr[4] = 0xde;  // SSRC
192        ptr[5] = 0xad;
193        ptr[6] = 0xbe;
194        ptr[7] = 0xef;
195
196        buf->setRange(0, buf->size() + 8);
197    }
198
199    static void addSDES(int s, const sp<ABuffer> &buffer) {
200        struct sockaddr_in addr;
201        socklen_t addrSize = sizeof(addr);
202        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
203
204        uint8_t *data = buffer->data() + buffer->size();
205        data[0] = 0x80 | 1;
206        data[1] = 202;  // SDES
207        data[4] = 0xde;  // SSRC
208        data[5] = 0xad;
209        data[6] = 0xbe;
210        data[7] = 0xef;
211
212        size_t offset = 8;
213
214        data[offset++] = 1;  // CNAME
215
216        AString cname = "stagefright@";
217        cname.append(inet_ntoa(addr.sin_addr));
218        data[offset++] = cname.size();
219
220        memcpy(&data[offset], cname.c_str(), cname.size());
221        offset += cname.size();
222
223        data[offset++] = 6;  // TOOL
224
225        AString tool;
226        MakeUserAgentString(&tool);
227
228        data[offset++] = tool.size();
229
230        memcpy(&data[offset], tool.c_str(), tool.size());
231        offset += tool.size();
232
233        data[offset++] = 0;
234
235        if ((offset % 4) > 0) {
236            size_t count = 4 - (offset % 4);
237            switch (count) {
238                case 3:
239                    data[offset++] = 0;
240                case 2:
241                    data[offset++] = 0;
242                case 1:
243                    data[offset++] = 0;
244            }
245        }
246
247        size_t numWords = (offset / 4) - 1;
248        data[2] = numWords >> 8;
249        data[3] = numWords & 0xff;
250
251        buffer->setRange(buffer->offset(), buffer->size() + offset);
252    }
253
254    // In case we're behind NAT, fire off two UDP packets to the remote
255    // rtp/rtcp ports to poke a hole into the firewall for future incoming
256    // packets. We're going to send an RR/SDES RTCP packet to both of them.
257    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
258        struct sockaddr_in addr;
259        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
260        addr.sin_family = AF_INET;
261
262        AString source;
263        AString server_port;
264        if (!GetAttribute(transport.c_str(),
265                          "source",
266                          &source)) {
267            LOGW("Missing 'source' field in Transport response. Using "
268                 "RTSP endpoint address.");
269
270            struct hostent *ent = gethostbyname(mSessionHost.c_str());
271            if (ent == NULL) {
272                LOGE("Failed to look up address of session host '%s'",
273                     mSessionHost.c_str());
274
275                return false;
276            }
277
278            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
279        } else {
280            addr.sin_addr.s_addr = inet_addr(source.c_str());
281        }
282
283        if (!GetAttribute(transport.c_str(),
284                                 "server_port",
285                                 &server_port)) {
286            LOGI("Missing 'server_port' field in Transport response.");
287            return false;
288        }
289
290        int rtpPort, rtcpPort;
291        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
292                || rtpPort <= 0 || rtpPort > 65535
293                || rtcpPort <=0 || rtcpPort > 65535
294                || rtcpPort != rtpPort + 1) {
295            LOGE("Server picked invalid RTP/RTCP port pair %s,"
296                 " RTP port must be even, RTCP port must be one higher.",
297                 server_port.c_str());
298
299            return false;
300        }
301
302        if (rtpPort & 1) {
303            LOGW("Server picked an odd RTP port, it should've picked an "
304                 "even one, we'll let it pass for now, but this may break "
305                 "in the future.");
306        }
307
308        if (addr.sin_addr.s_addr == INADDR_NONE) {
309            return true;
310        }
311
312        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
313            // No firewalls to traverse on the loopback interface.
314            return true;
315        }
316
317        // Make up an RR/SDES RTCP packet.
318        sp<ABuffer> buf = new ABuffer(65536);
319        buf->setRange(0, 0);
320        addRR(buf);
321        addSDES(rtpSocket, buf);
322
323        addr.sin_port = htons(rtpPort);
324
325        ssize_t n = sendto(
326                rtpSocket, buf->data(), buf->size(), 0,
327                (const sockaddr *)&addr, sizeof(addr));
328
329        if (n < (ssize_t)buf->size()) {
330            LOGE("failed to poke a hole for RTP packets");
331            return false;
332        }
333
334        addr.sin_port = htons(rtcpPort);
335
336        n = sendto(
337                rtcpSocket, buf->data(), buf->size(), 0,
338                (const sockaddr *)&addr, sizeof(addr));
339
340        if (n < (ssize_t)buf->size()) {
341            LOGE("failed to poke a hole for RTCP packets");
342            return false;
343        }
344
345        ALOGV("successfully poked holes.");
346
347        return true;
348    }
349
350    virtual void onMessageReceived(const sp<AMessage> &msg) {
351        switch (msg->what()) {
352            case 'conn':
353            {
354                int32_t result;
355                CHECK(msg->findInt32("result", &result));
356
357                LOGI("connection request completed with result %d (%s)",
358                     result, strerror(-result));
359
360                if (result == OK) {
361                    AString request;
362                    request = "DESCRIBE ";
363                    request.append(mSessionURL);
364                    request.append(" RTSP/1.0\r\n");
365                    request.append("Accept: application/sdp\r\n");
366                    request.append("\r\n");
367
368                    sp<AMessage> reply = new AMessage('desc', id());
369                    mConn->sendRequest(request.c_str(), reply);
370                } else {
371                    (new AMessage('disc', id()))->post();
372                }
373                break;
374            }
375
376            case 'disc':
377            {
378                ++mKeepAliveGeneration;
379
380                int32_t reconnect;
381                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
382                    sp<AMessage> reply = new AMessage('conn', id());
383                    mConn->connect(mOriginalSessionURL.c_str(), reply);
384                } else {
385                    (new AMessage('quit', id()))->post();
386                }
387                break;
388            }
389
390            case 'desc':
391            {
392                int32_t result;
393                CHECK(msg->findInt32("result", &result));
394
395                LOGI("DESCRIBE completed with result %d (%s)",
396                     result, strerror(-result));
397
398                if (result == OK) {
399                    sp<RefBase> obj;
400                    CHECK(msg->findObject("response", &obj));
401                    sp<ARTSPResponse> response =
402                        static_cast<ARTSPResponse *>(obj.get());
403
404                    if (response->mStatusCode == 302) {
405                        ssize_t i = response->mHeaders.indexOfKey("location");
406                        CHECK_GE(i, 0);
407
408                        mSessionURL = response->mHeaders.valueAt(i);
409
410                        AString request;
411                        request = "DESCRIBE ";
412                        request.append(mSessionURL);
413                        request.append(" RTSP/1.0\r\n");
414                        request.append("Accept: application/sdp\r\n");
415                        request.append("\r\n");
416
417                        sp<AMessage> reply = new AMessage('desc', id());
418                        mConn->sendRequest(request.c_str(), reply);
419                        break;
420                    }
421
422                    if (response->mStatusCode != 200) {
423                        result = UNKNOWN_ERROR;
424                    } else {
425                        mSessionDesc = new ASessionDescription;
426
427                        mSessionDesc->setTo(
428                                response->mContent->data(),
429                                response->mContent->size());
430
431                        if (!mSessionDesc->isValid()) {
432                            LOGE("Failed to parse session description.");
433                            result = ERROR_MALFORMED;
434                        } else {
435                            ssize_t i = response->mHeaders.indexOfKey("content-base");
436                            if (i >= 0) {
437                                mBaseURL = response->mHeaders.valueAt(i);
438                            } else {
439                                i = response->mHeaders.indexOfKey("content-location");
440                                if (i >= 0) {
441                                    mBaseURL = response->mHeaders.valueAt(i);
442                                } else {
443                                    mBaseURL = mSessionURL;
444                                }
445                            }
446
447                            if (!mBaseURL.startsWith("rtsp://")) {
448                                // Some misbehaving servers specify a relative
449                                // URL in one of the locations above, combine
450                                // it with the absolute session URL to get
451                                // something usable...
452
453                                LOGW("Server specified a non-absolute base URL"
454                                     ", combining it with the session URL to "
455                                     "get something usable...");
456
457                                AString tmp;
458                                CHECK(MakeURL(
459                                            mSessionURL.c_str(),
460                                            mBaseURL.c_str(),
461                                            &tmp));
462
463                                mBaseURL = tmp;
464                            }
465
466                            if (mSessionDesc->countTracks() < 2) {
467                                // There's no actual tracks in this session.
468                                // The first "track" is merely session meta
469                                // data.
470
471                                LOGW("Session doesn't contain any playable "
472                                     "tracks. Aborting.");
473                                result = ERROR_UNSUPPORTED;
474                            } else {
475                                setupTrack(1);
476                            }
477                        }
478                    }
479                }
480
481                if (result != OK) {
482                    sp<AMessage> reply = new AMessage('disc', id());
483                    mConn->disconnect(reply);
484                }
485                break;
486            }
487
488            case 'setu':
489            {
490                size_t index;
491                CHECK(msg->findSize("index", &index));
492
493                TrackInfo *track = NULL;
494                size_t trackIndex;
495                if (msg->findSize("track-index", &trackIndex)) {
496                    track = &mTracks.editItemAt(trackIndex);
497                }
498
499                int32_t result;
500                CHECK(msg->findInt32("result", &result));
501
502                LOGI("SETUP(%d) completed with result %d (%s)",
503                     index, result, strerror(-result));
504
505                if (result == OK) {
506                    CHECK(track != NULL);
507
508                    sp<RefBase> obj;
509                    CHECK(msg->findObject("response", &obj));
510                    sp<ARTSPResponse> response =
511                        static_cast<ARTSPResponse *>(obj.get());
512
513                    if (response->mStatusCode != 200) {
514                        result = UNKNOWN_ERROR;
515                    } else {
516                        ssize_t i = response->mHeaders.indexOfKey("session");
517                        CHECK_GE(i, 0);
518
519                        mSessionID = response->mHeaders.valueAt(i);
520
521                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
522                        AString timeoutStr;
523                        if (GetAttribute(
524                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
525                            char *end;
526                            unsigned long timeoutSecs =
527                                strtoul(timeoutStr.c_str(), &end, 10);
528
529                            if (end == timeoutStr.c_str() || *end != '\0') {
530                                LOGW("server specified malformed timeout '%s'",
531                                     timeoutStr.c_str());
532
533                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
534                            } else if (timeoutSecs < 15) {
535                                LOGW("server specified too short a timeout "
536                                     "(%lu secs), using default.",
537                                     timeoutSecs);
538
539                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
540                            } else {
541                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
542
543                                LOGI("server specified timeout of %lu secs.",
544                                     timeoutSecs);
545                            }
546                        }
547
548                        i = mSessionID.find(";");
549                        if (i >= 0) {
550                            // Remove options, i.e. ";timeout=90"
551                            mSessionID.erase(i, mSessionID.size() - i);
552                        }
553
554                        sp<AMessage> notify = new AMessage('accu', id());
555                        notify->setSize("track-index", trackIndex);
556
557                        i = response->mHeaders.indexOfKey("transport");
558                        CHECK_GE(i, 0);
559
560                        if (!track->mUsingInterleavedTCP) {
561                            AString transport = response->mHeaders.valueAt(i);
562
563                            // We are going to continue even if we were
564                            // unable to poke a hole into the firewall...
565                            pokeAHole(
566                                    track->mRTPSocket,
567                                    track->mRTCPSocket,
568                                    transport);
569                        }
570
571                        mRTPConn->addStream(
572                                track->mRTPSocket, track->mRTCPSocket,
573                                mSessionDesc, index,
574                                notify, track->mUsingInterleavedTCP);
575
576                        mSetupTracksSuccessful = true;
577                    }
578                }
579
580                if (result != OK) {
581                    if (track) {
582                        if (!track->mUsingInterleavedTCP) {
583                            // Clear the tag
584                            if (mUIDValid) {
585                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
586                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
587                            }
588
589                            close(track->mRTPSocket);
590                            close(track->mRTCPSocket);
591                        }
592
593                        mTracks.removeItemsAt(trackIndex);
594                    }
595                }
596
597                ++index;
598                if (index < mSessionDesc->countTracks()) {
599                    setupTrack(index);
600                } else if (mSetupTracksSuccessful) {
601                    ++mKeepAliveGeneration;
602                    postKeepAlive();
603
604                    AString request = "PLAY ";
605                    request.append(mSessionURL);
606                    request.append(" RTSP/1.0\r\n");
607
608                    request.append("Session: ");
609                    request.append(mSessionID);
610                    request.append("\r\n");
611
612                    request.append("\r\n");
613
614                    sp<AMessage> reply = new AMessage('play', id());
615                    mConn->sendRequest(request.c_str(), reply);
616                } else {
617                    sp<AMessage> reply = new AMessage('disc', id());
618                    mConn->disconnect(reply);
619                }
620                break;
621            }
622
623            case 'play':
624            {
625                int32_t result;
626                CHECK(msg->findInt32("result", &result));
627
628                LOGI("PLAY completed with result %d (%s)",
629                     result, strerror(-result));
630
631                if (result == OK) {
632                    sp<RefBase> obj;
633                    CHECK(msg->findObject("response", &obj));
634                    sp<ARTSPResponse> response =
635                        static_cast<ARTSPResponse *>(obj.get());
636
637                    if (response->mStatusCode != 200) {
638                        result = UNKNOWN_ERROR;
639                    } else {
640                        parsePlayResponse(response);
641
642                        sp<AMessage> timeout = new AMessage('tiou', id());
643                        timeout->post(kStartupTimeoutUs);
644                    }
645                }
646
647                if (result != OK) {
648                    sp<AMessage> reply = new AMessage('disc', id());
649                    mConn->disconnect(reply);
650                }
651
652                break;
653            }
654
655            case 'aliv':
656            {
657                int32_t generation;
658                CHECK(msg->findInt32("generation", &generation));
659
660                if (generation != mKeepAliveGeneration) {
661                    // obsolete event.
662                    break;
663                }
664
665                AString request;
666                request.append("OPTIONS ");
667                request.append(mSessionURL);
668                request.append(" RTSP/1.0\r\n");
669                request.append("Session: ");
670                request.append(mSessionID);
671                request.append("\r\n");
672                request.append("\r\n");
673
674                sp<AMessage> reply = new AMessage('opts', id());
675                reply->setInt32("generation", mKeepAliveGeneration);
676                mConn->sendRequest(request.c_str(), reply);
677                break;
678            }
679
680            case 'opts':
681            {
682                int32_t result;
683                CHECK(msg->findInt32("result", &result));
684
685                LOGI("OPTIONS completed with result %d (%s)",
686                     result, strerror(-result));
687
688                int32_t generation;
689                CHECK(msg->findInt32("generation", &generation));
690
691                if (generation != mKeepAliveGeneration) {
692                    // obsolete event.
693                    break;
694                }
695
696                postKeepAlive();
697                break;
698            }
699
700            case 'abor':
701            {
702                for (size_t i = 0; i < mTracks.size(); ++i) {
703                    TrackInfo *info = &mTracks.editItemAt(i);
704
705                    if (!mFirstAccessUnit) {
706                        postQueueEOS(i, ERROR_END_OF_STREAM);
707                    }
708
709                    if (!info->mUsingInterleavedTCP) {
710                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
711
712                        // Clear the tag
713                        if (mUIDValid) {
714                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
715                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
716                        }
717
718                        close(info->mRTPSocket);
719                        close(info->mRTCPSocket);
720                    }
721                }
722                mTracks.clear();
723                mSetupTracksSuccessful = false;
724                mSeekPending = false;
725                mFirstAccessUnit = true;
726                mNTPAnchorUs = -1;
727                mMediaAnchorUs = -1;
728                mNumAccessUnitsReceived = 0;
729                mReceivedFirstRTCPPacket = false;
730                mReceivedFirstRTPPacket = false;
731                mSeekable = false;
732
733                sp<AMessage> reply = new AMessage('tear', id());
734
735                int32_t reconnect;
736                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
737                    reply->setInt32("reconnect", true);
738                }
739
740                AString request;
741                request = "TEARDOWN ";
742
743                // XXX should use aggregate url from SDP here...
744                request.append(mSessionURL);
745                request.append(" RTSP/1.0\r\n");
746
747                request.append("Session: ");
748                request.append(mSessionID);
749                request.append("\r\n");
750
751                request.append("\r\n");
752
753                mConn->sendRequest(request.c_str(), reply);
754                break;
755            }
756
757            case 'tear':
758            {
759                int32_t result;
760                CHECK(msg->findInt32("result", &result));
761
762                LOGI("TEARDOWN completed with result %d (%s)",
763                     result, strerror(-result));
764
765                sp<AMessage> reply = new AMessage('disc', id());
766
767                int32_t reconnect;
768                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
769                    reply->setInt32("reconnect", true);
770                }
771
772                mConn->disconnect(reply);
773                break;
774            }
775
776            case 'quit':
777            {
778                sp<AMessage> msg = mNotify->dup();
779                msg->setInt32("what", kWhatDisconnected);
780                msg->setInt32("result", UNKNOWN_ERROR);
781                msg->post();
782                break;
783            }
784
785            case 'chek':
786            {
787                int32_t generation;
788                CHECK(msg->findInt32("generation", &generation));
789                if (generation != mCheckGeneration) {
790                    // This is an outdated message. Ignore.
791                    break;
792                }
793
794                if (mNumAccessUnitsReceived == 0) {
795#if 1
796                    LOGI("stream ended? aborting.");
797                    (new AMessage('abor', id()))->post();
798                    break;
799#else
800                    LOGI("haven't seen an AU in a looong time.");
801#endif
802                }
803
804                mNumAccessUnitsReceived = 0;
805                msg->post(kAccessUnitTimeoutUs);
806                break;
807            }
808
809            case 'accu':
810            {
811                int32_t timeUpdate;
812                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
813                    size_t trackIndex;
814                    CHECK(msg->findSize("track-index", &trackIndex));
815
816                    uint32_t rtpTime;
817                    uint64_t ntpTime;
818                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
819                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
820
821                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
822                    break;
823                }
824
825                int32_t first;
826                if (msg->findInt32("first-rtcp", &first)) {
827                    mReceivedFirstRTCPPacket = true;
828                    break;
829                }
830
831                if (msg->findInt32("first-rtp", &first)) {
832                    mReceivedFirstRTPPacket = true;
833                    break;
834                }
835
836                ++mNumAccessUnitsReceived;
837                postAccessUnitTimeoutCheck();
838
839                size_t trackIndex;
840                CHECK(msg->findSize("track-index", &trackIndex));
841
842                if (trackIndex >= mTracks.size()) {
843                    ALOGV("late packets ignored.");
844                    break;
845                }
846
847                TrackInfo *track = &mTracks.editItemAt(trackIndex);
848
849                int32_t eos;
850                if (msg->findInt32("eos", &eos)) {
851                    LOGI("received BYE on track index %d", trackIndex);
852#if 0
853                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
854#endif
855                    return;
856                }
857
858                sp<RefBase> obj;
859                CHECK(msg->findObject("access-unit", &obj));
860
861                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
862
863                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
864
865                if (mSeekPending) {
866                    ALOGV("we're seeking, dropping stale packet.");
867                    break;
868                }
869
870                if (seqNum < track->mFirstSeqNumInSegment) {
871                    ALOGV("dropping stale access-unit (%d < %d)",
872                         seqNum, track->mFirstSeqNumInSegment);
873                    break;
874                }
875
876                if (track->mNewSegment) {
877                    track->mNewSegment = false;
878                }
879
880                onAccessUnitComplete(trackIndex, accessUnit);
881                break;
882            }
883
884            case 'seek':
885            {
886                if (!mSeekable) {
887                    LOGW("This is a live stream, ignoring seek request.");
888
889                    sp<AMessage> msg = mNotify->dup();
890                    msg->setInt32("what", kWhatSeekDone);
891                    msg->post();
892                    break;
893                }
894
895                int64_t timeUs;
896                CHECK(msg->findInt64("time", &timeUs));
897
898                mSeekPending = true;
899
900                // Disable the access unit timeout until we resumed
901                // playback again.
902                mCheckPending = true;
903                ++mCheckGeneration;
904
905                AString request = "PAUSE ";
906                request.append(mSessionURL);
907                request.append(" RTSP/1.0\r\n");
908
909                request.append("Session: ");
910                request.append(mSessionID);
911                request.append("\r\n");
912
913                request.append("\r\n");
914
915                sp<AMessage> reply = new AMessage('see1', id());
916                reply->setInt64("time", timeUs);
917                mConn->sendRequest(request.c_str(), reply);
918                break;
919            }
920
921            case 'see1':
922            {
923                // Session is paused now.
924                for (size_t i = 0; i < mTracks.size(); ++i) {
925                    TrackInfo *info = &mTracks.editItemAt(i);
926
927                    postQueueSeekDiscontinuity(i);
928
929                    info->mRTPAnchor = 0;
930                    info->mNTPAnchorUs = -1;
931                }
932
933                mNTPAnchorUs = -1;
934
935                int64_t timeUs;
936                CHECK(msg->findInt64("time", &timeUs));
937
938                AString request = "PLAY ";
939                request.append(mSessionURL);
940                request.append(" RTSP/1.0\r\n");
941
942                request.append("Session: ");
943                request.append(mSessionID);
944                request.append("\r\n");
945
946                request.append(
947                        StringPrintf(
948                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
949
950                request.append("\r\n");
951
952                sp<AMessage> reply = new AMessage('see2', id());
953                mConn->sendRequest(request.c_str(), reply);
954                break;
955            }
956
957            case 'see2':
958            {
959                CHECK(mSeekPending);
960
961                int32_t result;
962                CHECK(msg->findInt32("result", &result));
963
964                LOGI("PLAY completed with result %d (%s)",
965                     result, strerror(-result));
966
967                mCheckPending = false;
968                postAccessUnitTimeoutCheck();
969
970                if (result == OK) {
971                    sp<RefBase> obj;
972                    CHECK(msg->findObject("response", &obj));
973                    sp<ARTSPResponse> response =
974                        static_cast<ARTSPResponse *>(obj.get());
975
976                    if (response->mStatusCode != 200) {
977                        result = UNKNOWN_ERROR;
978                    } else {
979                        parsePlayResponse(response);
980
981                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
982                        CHECK_GE(i, 0);
983
984                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
985
986                        LOGI("seek completed.");
987                    }
988                }
989
990                if (result != OK) {
991                    LOGE("seek failed, aborting.");
992                    (new AMessage('abor', id()))->post();
993                }
994
995                mSeekPending = false;
996
997                sp<AMessage> msg = mNotify->dup();
998                msg->setInt32("what", kWhatSeekDone);
999                msg->post();
1000                break;
1001            }
1002
1003            case 'biny':
1004            {
1005                sp<RefBase> obj;
1006                CHECK(msg->findObject("buffer", &obj));
1007                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
1008
1009                int32_t index;
1010                CHECK(buffer->meta()->findInt32("index", &index));
1011
1012                mRTPConn->injectPacket(index, buffer);
1013                break;
1014            }
1015
1016            case 'tiou':
1017            {
1018                if (!mReceivedFirstRTCPPacket) {
1019                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
1020                        LOGW("We received RTP packets but no RTCP packets, "
1021                             "using fake timestamps.");
1022
1023                        mTryFakeRTCP = true;
1024
1025                        mReceivedFirstRTCPPacket = true;
1026
1027                        fakeTimestamps();
1028                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1029                        LOGW("Never received any data, switching transports.");
1030
1031                        mTryTCPInterleaving = true;
1032
1033                        sp<AMessage> msg = new AMessage('abor', id());
1034                        msg->setInt32("reconnect", true);
1035                        msg->post();
1036                    } else {
1037                        LOGW("Never received any data, disconnecting.");
1038                        (new AMessage('abor', id()))->post();
1039                    }
1040                }
1041                break;
1042            }
1043
1044            default:
1045                TRESPASS();
1046                break;
1047        }
1048    }
1049
1050    void postKeepAlive() {
1051        sp<AMessage> msg = new AMessage('aliv', id());
1052        msg->setInt32("generation", mKeepAliveGeneration);
1053        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1054    }
1055
1056    void postAccessUnitTimeoutCheck() {
1057        if (mCheckPending) {
1058            return;
1059        }
1060
1061        mCheckPending = true;
1062        sp<AMessage> check = new AMessage('chek', id());
1063        check->setInt32("generation", mCheckGeneration);
1064        check->post(kAccessUnitTimeoutUs);
1065    }
1066
1067    static void SplitString(
1068            const AString &s, const char *separator, List<AString> *items) {
1069        items->clear();
1070        size_t start = 0;
1071        while (start < s.size()) {
1072            ssize_t offset = s.find(separator, start);
1073
1074            if (offset < 0) {
1075                items->push_back(AString(s, start, s.size() - start));
1076                break;
1077            }
1078
1079            items->push_back(AString(s, start, offset - start));
1080            start = offset + strlen(separator);
1081        }
1082    }
1083
1084    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1085        mSeekable = false;
1086
1087        ssize_t i = response->mHeaders.indexOfKey("range");
1088        if (i < 0) {
1089            // Server doesn't even tell use what range it is going to
1090            // play, therefore we won't support seeking.
1091            return;
1092        }
1093
1094        AString range = response->mHeaders.valueAt(i);
1095        ALOGV("Range: %s", range.c_str());
1096
1097        AString val;
1098        CHECK(GetAttribute(range.c_str(), "npt", &val));
1099
1100        float npt1, npt2;
1101        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1102            // This is a live stream and therefore not seekable.
1103
1104            LOGI("This is a live stream");
1105            return;
1106        }
1107
1108        i = response->mHeaders.indexOfKey("rtp-info");
1109        CHECK_GE(i, 0);
1110
1111        AString rtpInfo = response->mHeaders.valueAt(i);
1112        List<AString> streamInfos;
1113        SplitString(rtpInfo, ",", &streamInfos);
1114
1115        int n = 1;
1116        for (List<AString>::iterator it = streamInfos.begin();
1117             it != streamInfos.end(); ++it) {
1118            (*it).trim();
1119            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1120
1121            CHECK(GetAttribute((*it).c_str(), "url", &val));
1122
1123            size_t trackIndex = 0;
1124            while (trackIndex < mTracks.size()
1125                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1126                ++trackIndex;
1127            }
1128            CHECK_LT(trackIndex, mTracks.size());
1129
1130            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1131
1132            char *end;
1133            unsigned long seq = strtoul(val.c_str(), &end, 10);
1134
1135            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1136            info->mFirstSeqNumInSegment = seq;
1137            info->mNewSegment = true;
1138
1139            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1140
1141            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1142
1143            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1144
1145            info->mNormalPlayTimeRTP = rtpTime;
1146            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1147
1148            if (!mFirstAccessUnit) {
1149                postNormalPlayTimeMapping(
1150                        trackIndex,
1151                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1152            }
1153
1154            ++n;
1155        }
1156
1157        mSeekable = true;
1158    }
1159
1160    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1161        CHECK_GE(index, 0u);
1162        CHECK_LT(index, mTracks.size());
1163
1164        const TrackInfo &info = mTracks.itemAt(index);
1165
1166        *timeScale = info.mTimeScale;
1167
1168        return info.mPacketSource->getFormat();
1169    }
1170
1171    size_t countTracks() const {
1172        return mTracks.size();
1173    }
1174
1175private:
1176    struct TrackInfo {  // Per-track state; setupTrack() pushes one entry per track.
1177        AString mURL;  // Track URL; matched against the "url" of RTP-Info entries.
1178        int mRTPSocket;  // From MakePortPair(), or the interleave index in TCP mode.
1179        int mRTCPSocket;  // From MakePortPair(), or interleave index + 1 in TCP mode.
1180        bool mUsingInterleavedTCP;  // Set by setupTrack() when mTryTCPInterleaving is on.
1181        uint32_t mFirstSeqNumInSegment;  // "seq" from RTP-Info; lower-numbered units are dropped.
1182        bool mNewSegment;  // Set by setupTrack()/parsePlayResponse(); cleared by 'accu'.
1183
1184        uint32_t mRTPAnchor;  // RTP time from the latest onTimeUpdate(); 0 after a seek.
1185        int64_t mNTPAnchorUs;  // NTP time (us) paired with mRTPAnchor; -1 while unknown.
1186        int32_t mTimeScale;  // From ParseFormatDesc(); converts RTP time deltas to us.
1187
1188        uint32_t mNormalPlayTimeRTP;  // "rtptime" from the RTP-Info entry of this track.
1189        int64_t mNormalPlayTimeUs;  // Start of the PLAY response's npt range, in us.
1190
1191        sp<APacketSource> mPacketSource;  // Created in setupTrack(); provides the format.
1192
1193        // Stores packets temporarily while no notion of time
1194        // has been established yet.
1195        List<sp<ABuffer> > mPackets;
1196    };
1197
1198    sp<AMessage> mNotify;  // Template message; dup()ed for every notification posted.
1199    bool mUIDValid;  // When set, setupTrack() tags the RTP/RTCP sockets with mUID.
1200    uid_t mUID;
1201    sp<ALooper> mNetLooper;
1202    sp<ARTSPConnection> mConn;  // Used to send the RTSP requests (SETUP/PAUSE/PLAY).
1203    sp<ARTPConnection> mRTPConn;  // Receives packets injected by the 'biny' handler.
1204    sp<ASessionDescription> mSessionDesc;  // Queried by setupTrack() for track attributes.
1205    AString mOriginalSessionURL;  // This one still has user:pass@
1206    AString mSessionURL;  // Request URL of the PAUSE/PLAY requests.
1207    AString mSessionHost;
1208    AString mBaseURL;  // Base against which setupTrack() resolves "a=control".
1209    AString mSessionID;  // Value of the "Session:" request header.
1210    bool mSetupTracksSuccessful;
1211    bool mSeekPending;  // True from 'seek' until 'see2'; access units are dropped meanwhile.
1212    bool mFirstAccessUnit;  // Cleared by the first onAccessUnitComplete().
1213
1214    int64_t mNTPAnchorUs;  // Set by onTimeUpdate() while negative; reset to -1 on seek.
1215    int64_t mMediaAnchorUs;  // mLastMediaTimeUs at the time mNTPAnchorUs was set.
1216    int64_t mLastMediaTimeUs;  // Largest media time computed by addMediaTimestamp().
1217
1218    int64_t mNumAccessUnitsReceived;  // Incremented by 'accu' for each access-unit message.
1219    bool mCheckPending;  // A 'chek' is outstanding; also forced true while seeking.
1220    int32_t mCheckGeneration;  // Stamped into 'chek' messages; bumped by 'seek'.
1221    bool mTryTCPInterleaving;  // Set by 'tiou' when no data arrived; selects RTP/AVP/TCP.
1222    bool mTryFakeRTCP;  // Set by 'tiou' when RTP arrived without any RTCP.
1223    bool mReceivedFirstRTCPPacket;  // Set on "first-rtcp", or when timestamps are faked.
1224    bool mReceivedFirstRTPPacket;  // Set on "first-rtp".
1225    bool mSeekable;  // Result of parsePlayResponse(); 'seek' is ignored when false.
1226    int64_t mKeepAliveTimeoutUs;  // postKeepAlive() fires after 90% of this.
1227    int32_t mKeepAliveGeneration;  // Stamped into 'aliv' messages.
1228
1229    Vector<TrackInfo> mTracks;  // Tracks registered by setupTrack().
1230
1231    void setupTrack(size_t index) {
1232        sp<APacketSource> source =
1233            new APacketSource(mSessionDesc, index);
1234
1235        if (source->initCheck() != OK) {
1236            LOGW("Unsupported format. Ignoring track #%d.", index);
1237
1238            sp<AMessage> reply = new AMessage('setu', id());
1239            reply->setSize("index", index);
1240            reply->setInt32("result", ERROR_UNSUPPORTED);
1241            reply->post();
1242            return;
1243        }
1244
1245        AString url;
1246        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1247
1248        AString trackURL;
1249        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1250
1251        mTracks.push(TrackInfo());
1252        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1253        info->mURL = trackURL;
1254        info->mPacketSource = source;
1255        info->mUsingInterleavedTCP = false;
1256        info->mFirstSeqNumInSegment = 0;
1257        info->mNewSegment = true;
1258        info->mRTPAnchor = 0;
1259        info->mNTPAnchorUs = -1;
1260        info->mNormalPlayTimeRTP = 0;
1261        info->mNormalPlayTimeUs = 0ll;
1262
1263        unsigned long PT;
1264        AString formatDesc;
1265        AString formatParams;
1266        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1267
1268        int32_t timescale;
1269        int32_t numChannels;
1270        ASessionDescription::ParseFormatDesc(
1271                formatDesc.c_str(), &timescale, &numChannels);
1272
1273        info->mTimeScale = timescale;
1274
1275        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1276
1277        AString request = "SETUP ";
1278        request.append(trackURL);
1279        request.append(" RTSP/1.0\r\n");
1280
1281        if (mTryTCPInterleaving) {
1282            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1283            info->mUsingInterleavedTCP = true;
1284            info->mRTPSocket = interleaveIndex;
1285            info->mRTCPSocket = interleaveIndex + 1;
1286
1287            request.append("Transport: RTP/AVP/TCP;interleaved=");
1288            request.append(interleaveIndex);
1289            request.append("-");
1290            request.append(interleaveIndex + 1);
1291        } else {
1292            unsigned rtpPort;
1293            ARTPConnection::MakePortPair(
1294                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1295
1296            if (mUIDValid) {
1297                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1298                                                (uint32_t)*(uint32_t*) "RTP_");
1299                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1300                                                (uint32_t)*(uint32_t*) "RTP_");
1301            }
1302
1303            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1304            request.append(rtpPort);
1305            request.append("-");
1306            request.append(rtpPort + 1);
1307        }
1308
1309        request.append("\r\n");
1310
1311        if (index > 1) {
1312            request.append("Session: ");
1313            request.append(mSessionID);
1314            request.append("\r\n");
1315        }
1316
1317        request.append("\r\n");
1318
1319        sp<AMessage> reply = new AMessage('setu', id());
1320        reply->setSize("index", index);
1321        reply->setSize("track-index", mTracks.size() - 1);
1322        mConn->sendRequest(request.c_str(), reply);
1323    }
1324
1325    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1326        out->clear();
1327
1328        if (strncasecmp("rtsp://", baseURL, 7)) {
1329            // Base URL must be absolute
1330            return false;
1331        }
1332
1333        if (!strncasecmp("rtsp://", url, 7)) {
1334            // "url" is already an absolute URL, ignore base URL.
1335            out->setTo(url);
1336            return true;
1337        }
1338
1339        size_t n = strlen(baseURL);
1340        if (baseURL[n - 1] == '/') {
1341            out->setTo(baseURL);
1342            out->append(url);
1343        } else {
1344            const char *slashPos = strrchr(baseURL, '/');
1345
1346            if (slashPos > &baseURL[6]) {
1347                out->setTo(baseURL, slashPos - baseURL);
1348            } else {
1349                out->setTo(baseURL);
1350            }
1351
1352            out->append("/");
1353            out->append(url);
1354        }
1355
1356        return true;
1357    }
1358
1359    void fakeTimestamps() {
1360        for (size_t i = 0; i < mTracks.size(); ++i) {
1361            onTimeUpdate(i, 0, 0ll);
1362        }
1363    }
1364
1365    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1366        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1367             trackIndex, rtpTime, ntpTime);
1368
1369        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1370
1371        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1372
1373        track->mRTPAnchor = rtpTime;
1374        track->mNTPAnchorUs = ntpTimeUs;
1375
1376        if (mNTPAnchorUs < 0) {
1377            mNTPAnchorUs = ntpTimeUs;
1378            mMediaAnchorUs = mLastMediaTimeUs;
1379        }
1380    }
1381
1382    void onAccessUnitComplete(
1383            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1384        ALOGV("onAccessUnitComplete track %d", trackIndex);
1385
1386        if (mFirstAccessUnit) {
1387            sp<AMessage> msg = mNotify->dup();
1388            msg->setInt32("what", kWhatConnected);
1389            msg->post();
1390
1391            if (mSeekable) {
1392                for (size_t i = 0; i < mTracks.size(); ++i) {
1393                    TrackInfo *info = &mTracks.editItemAt(i);
1394
1395                    postNormalPlayTimeMapping(
1396                            i,
1397                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1398                }
1399            }
1400
1401            mFirstAccessUnit = false;
1402        }
1403
1404        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1405
1406        if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
1407            ALOGV("storing accessUnit, no time established yet");
1408            track->mPackets.push_back(accessUnit);
1409            return;
1410        }
1411
1412        while (!track->mPackets.empty()) {
1413            sp<ABuffer> accessUnit = *track->mPackets.begin();
1414            track->mPackets.erase(track->mPackets.begin());
1415
1416            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1417                postQueueAccessUnit(trackIndex, accessUnit);
1418            }
1419        }
1420
1421        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1422            postQueueAccessUnit(trackIndex, accessUnit);
1423        }
1424    }
1425
1426    bool addMediaTimestamp(
1427            int32_t trackIndex, const TrackInfo *track,
1428            const sp<ABuffer> &accessUnit) {
1429        uint32_t rtpTime;
1430        CHECK(accessUnit->meta()->findInt32(
1431                    "rtp-time", (int32_t *)&rtpTime));
1432
1433        int64_t relRtpTimeUs =
1434            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1435                / track->mTimeScale;
1436
1437        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1438
1439        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1440
1441        if (mediaTimeUs > mLastMediaTimeUs) {
1442            mLastMediaTimeUs = mediaTimeUs;
1443        }
1444
1445        if (mediaTimeUs < 0) {
1446            ALOGV("dropping early accessUnit.");
1447            return false;
1448        }
1449
1450        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1451             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1452
1453        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1454
1455        return true;
1456    }
1457
1458    void postQueueAccessUnit(
1459            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1460        sp<AMessage> msg = mNotify->dup();
1461        msg->setInt32("what", kWhatAccessUnit);
1462        msg->setSize("trackIndex", trackIndex);
1463        msg->setObject("accessUnit", accessUnit);
1464        msg->post();
1465    }
1466
1467    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1468        sp<AMessage> msg = mNotify->dup();
1469        msg->setInt32("what", kWhatEOS);
1470        msg->setSize("trackIndex", trackIndex);
1471        msg->setInt32("finalResult", finalResult);
1472        msg->post();
1473    }
1474
1475    void postQueueSeekDiscontinuity(size_t trackIndex) {
1476        sp<AMessage> msg = mNotify->dup();
1477        msg->setInt32("what", kWhatSeekDiscontinuity);
1478        msg->setSize("trackIndex", trackIndex);
1479        msg->post();
1480    }
1481
1482    void postNormalPlayTimeMapping(
1483            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1484        sp<AMessage> msg = mNotify->dup();
1485        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1486        msg->setSize("trackIndex", trackIndex);
1487        msg->setInt32("rtpTime", rtpTime);
1488        msg->setInt64("nptUs", nptUs);
1489        msg->post();
1490    }
1491
1492    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1493};
1494
1495}  // namespace android
1496
1497#endif  // MY_HANDLER_H_
1498