MyHandler.h revision 3856b090cd04ba5dd4a59a12430ed724d5995909
1c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru/*
2c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru * Copyright (C) 2010 The Android Open Source Project
3c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru *
4c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru * Licensed under the Apache License, Version 2.0 (the "License");
5c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru * you may not use this file except in compliance with the License.
692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * You may obtain a copy of the License at
792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev *
892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev *      http://www.apache.org/licenses/LICENSE-2.0
992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev *
1092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * Unless required by applicable law or agreed to in writing, software
1192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * distributed under the License is distributed on an "AS IS" BASIS,
1292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * See the License for the specific language governing permissions and
1492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev * limitations under the License.
1592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev */
1692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
1792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev#ifndef MY_HANDLER_H_
18c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
19c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#define MY_HANDLER_H_
20c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
21c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru//#define LOG_NDEBUG 0
22c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#define LOG_TAG "MyHandler"
23c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <utils/Log.h>
24c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
25c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include "APacketSource.h"
26c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include "ARTPConnection.h"
27c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include "ARTSPConnection.h"
28c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include "ASessionDescription.h"
29c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
30c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <ctype.h>
31c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <cutils/properties.h>
32c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
33c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <media/stagefright/foundation/ABuffer.h>
34c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <media/stagefright/foundation/ADebug.h>
35c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <media/stagefright/foundation/ALooper.h>
36c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <media/stagefright/foundation/AMessage.h>
37c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <media/stagefright/MediaErrors.h>
38c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
39c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include <arpa/inet.h>
4092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev#include <sys/socket.h>
4192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev#include <netdb.h>
42c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
43c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#include "HTTPBase.h"
44c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
// If no access units are received within 5 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 5 * 1000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10 * 1000000ll;
52c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
53c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Querunamespace android {
54c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
55c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Querustatic void MakeUserAgentString(AString *s) {
56c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    s->setTo("stagefright/1.1 (Linux;Android ");
57c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
58c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#if (PROPERTY_VALUE_MAX < 8)
59c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#error "PROPERTY_VALUE_MAX must be at least 8"
60c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru#endif
6192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
6292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    char value[PROPERTY_VALUE_MAX];
6392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    property_get("ro.build.version.release", value, "Unknown");
6492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    s->append(value);
6592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    s->append(")");
6692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev}
6792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
6892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchevstatic bool GetAttribute(const char *s, const char *key, AString *value) {
6992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    value->clear();
7092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
7192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    size_t keyLen = strlen(key);
7292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
7392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    for (;;) {
7492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        while (isspace(*s)) {
7592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            ++s;
7692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        }
7792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
7892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        const char *colonPos = strchr(s, ';');
7992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
8092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        size_t len =
8192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            (colonPos == NULL) ? strlen(s) : colonPos - s;
8292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
8392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
8492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            value->setTo(&s[keyLen + 1], len - keyLen - 1);
8592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            return true;
8692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        }
87c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
88c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        if (colonPos == NULL) {
89c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            return false;
90c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        }
91c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
92c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        s = colonPos + 1;
93c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
94c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru}
95c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
96c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Querustruct MyHandler : public AHandler {
// Message codes ("what" values), spelled as four-character constants.
// NOTE(review): the code that posts these is outside this excerpt;
// presumably they are delivered through mNotify -- confirm against the
// rest of the file.
enum {
    // Connection / session state.
    kWhatConnected             = 'conn',
    kWhatDisconnected          = 'disc',
    kWhatSeekDone              = 'sdon',

    // Stream data and timing.
    kWhatAccessUnit            = 'accU',
    kWhatEOS                   = 'eos!',
    kWhatSeekDiscontinuity     = 'seeD',
    kWhatNormalPlayTimeMapping = 'nptM',
};
107c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
108c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    MyHandler(
109c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            const char *url,
11092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            const sp<AMessage> &notify,
11192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev            bool uidValid = false, uid_t uid = 0)
11292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        : mNotify(notify),
11392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mUIDValid(uidValid),
11492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mUID(uid),
11592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mNetLooper(new ALooper),
11692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mConn(new ARTSPConnection(mUIDValid, mUID)),
11792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mRTPConn(new ARTPConnection),
11892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mOriginalSessionURL(url),
11992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mSessionURL(url),
12092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mSetupTracksSuccessful(false),
12192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mSeekPending(false),
12292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mFirstAccessUnit(true),
12392a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mNTPAnchorUs(-1),
12492a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mMediaAnchorUs(-1),
12592a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mLastMediaTimeUs(0),
12692a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mNumAccessUnitsReceived(0),
12792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mCheckPending(false),
12892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mCheckGeneration(0),
12992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mTryTCPInterleaving(false),
13092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mTryFakeRTCP(false),
13192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev          mReceivedFirstRTCPPacket(false),
132c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru          mReceivedFirstRTPPacket(false),
133c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru          mSeekable(false) {
134c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        mNetLooper->setName("rtsp net");
135c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        mNetLooper->start(false /* runOnCallingThread */,
136c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru                          false /* canCallJava */,
137c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru                          PRIORITY_HIGHEST);
138c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
139c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        // Strip any authentication info from the session url, we don't
140c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        // want to transmit user/pass in cleartext.
141c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        AString host, path, user, pass;
142c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        unsigned port;
143c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        CHECK(ARTSPConnection::ParseURL(
144c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
145c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
146c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        if (user.size() > 0) {
147c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.clear();
148c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.append("rtsp://");
149c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.append(host);
150c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.append(":");
151c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.append(StringPrintf("%u", port));
152c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            mSessionURL.append(path);
153c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
154c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru            LOGI("rewritten session url: '%s'", mSessionURL.c_str());
155c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        }
156c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
157c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        mSessionHost = host;
158c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
159c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
160c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    void connect() {
161c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        looper()->registerHandler(mConn);
162c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
163c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
164c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        sp<AMessage> notify = new AMessage('biny', id());
165c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        mConn->observeBinaryData(notify);
166c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
167c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        sp<AMessage> reply = new AMessage('conn', id());
168c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        mConn->connect(mOriginalSessionURL.c_str(), reply);
169c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
17092a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev
17192a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev    void disconnect() {
17292a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        (new AMessage('abor', id()))->post();
173c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
174c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
175c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    void seek(int64_t timeUs) {
176c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        sp<AMessage> msg = new AMessage('seek', id());
177c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        msg->setInt64("time", timeUs);
178c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        msg->post();
179c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
180c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
181c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    static void addRR(const sp<ABuffer> &buf) {
182c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        uint8_t *ptr = buf->data() + buf->size();
183c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        ptr[0] = 0x80 | 0;
184c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        ptr[1] = 201;  // RR
185c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        ptr[2] = 0;
186c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        ptr[3] = 1;
18792a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        ptr[4] = 0xde;  // SSRC
18892a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        ptr[5] = 0xad;
18992a0538051ff3deeda67521c966f476bed90d23bIliyan Malchev        ptr[6] = 0xbe;
190c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        ptr[7] = 0xef;
191c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
192c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru        buf->setRange(0, buf->size() + 8);
193c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    }
194c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru
195c559cd81139f97cecad1ad91a0b2e25a5936d53Jean-Baptiste Queru    static void addSDES(int s, const sp<ABuffer> &buffer) {
196        struct sockaddr_in addr;
197        socklen_t addrSize = sizeof(addr);
198        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
199
200        uint8_t *data = buffer->data() + buffer->size();
201        data[0] = 0x80 | 1;
202        data[1] = 202;  // SDES
203        data[4] = 0xde;  // SSRC
204        data[5] = 0xad;
205        data[6] = 0xbe;
206        data[7] = 0xef;
207
208        size_t offset = 8;
209
210        data[offset++] = 1;  // CNAME
211
212        AString cname = "stagefright@";
213        cname.append(inet_ntoa(addr.sin_addr));
214        data[offset++] = cname.size();
215
216        memcpy(&data[offset], cname.c_str(), cname.size());
217        offset += cname.size();
218
219        data[offset++] = 6;  // TOOL
220
221        AString tool;
222        MakeUserAgentString(&tool);
223
224        data[offset++] = tool.size();
225
226        memcpy(&data[offset], tool.c_str(), tool.size());
227        offset += tool.size();
228
229        data[offset++] = 0;
230
231        if ((offset % 4) > 0) {
232            size_t count = 4 - (offset % 4);
233            switch (count) {
234                case 3:
235                    data[offset++] = 0;
236                case 2:
237                    data[offset++] = 0;
238                case 1:
239                    data[offset++] = 0;
240            }
241        }
242
243        size_t numWords = (offset / 4) - 1;
244        data[2] = numWords >> 8;
245        data[3] = numWords & 0xff;
246
247        buffer->setRange(buffer->offset(), buffer->size() + offset);
248    }
249
250    // In case we're behind NAT, fire off two UDP packets to the remote
251    // rtp/rtcp ports to poke a hole into the firewall for future incoming
252    // packets. We're going to send an RR/SDES RTCP packet to both of them.
253    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
254        struct sockaddr_in addr;
255        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
256        addr.sin_family = AF_INET;
257
258        AString source;
259        AString server_port;
260        if (!GetAttribute(transport.c_str(),
261                          "source",
262                          &source)) {
263            LOGW("Missing 'source' field in Transport response. Using "
264                 "RTSP endpoint address.");
265
266            struct hostent *ent = gethostbyname(mSessionHost.c_str());
267            if (ent == NULL) {
268                LOGE("Failed to look up address of session host '%s'",
269                     mSessionHost.c_str());
270
271                return false;
272            }
273
274            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
275        } else {
276            addr.sin_addr.s_addr = inet_addr(source.c_str());
277        }
278
279        if (!GetAttribute(transport.c_str(),
280                                 "server_port",
281                                 &server_port)) {
282            LOGI("Missing 'server_port' field in Transport response.");
283            return false;
284        }
285
286        int rtpPort, rtcpPort;
287        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
288                || rtpPort <= 0 || rtpPort > 65535
289                || rtcpPort <=0 || rtcpPort > 65535
290                || rtcpPort != rtpPort + 1) {
291            LOGE("Server picked invalid RTP/RTCP port pair %s,"
292                 " RTP port must be even, RTCP port must be one higher.",
293                 server_port.c_str());
294
295            return false;
296        }
297
298        if (rtpPort & 1) {
299            LOGW("Server picked an odd RTP port, it should've picked an "
300                 "even one, we'll let it pass for now, but this may break "
301                 "in the future.");
302        }
303
304        if (addr.sin_addr.s_addr == INADDR_NONE) {
305            return true;
306        }
307
308        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
309            // No firewalls to traverse on the loopback interface.
310            return true;
311        }
312
313        // Make up an RR/SDES RTCP packet.
314        sp<ABuffer> buf = new ABuffer(65536);
315        buf->setRange(0, 0);
316        addRR(buf);
317        addSDES(rtpSocket, buf);
318
319        addr.sin_port = htons(rtpPort);
320
321        ssize_t n = sendto(
322                rtpSocket, buf->data(), buf->size(), 0,
323                (const sockaddr *)&addr, sizeof(addr));
324
325        if (n < (ssize_t)buf->size()) {
326            LOGE("failed to poke a hole for RTP packets");
327            return false;
328        }
329
330        addr.sin_port = htons(rtcpPort);
331
332        n = sendto(
333                rtcpSocket, buf->data(), buf->size(), 0,
334                (const sockaddr *)&addr, sizeof(addr));
335
336        if (n < (ssize_t)buf->size()) {
337            LOGE("failed to poke a hole for RTCP packets");
338            return false;
339        }
340
341        ALOGV("successfully poked holes.");
342
343        return true;
344    }
345
346    virtual void onMessageReceived(const sp<AMessage> &msg) {
347        switch (msg->what()) {
348            case 'conn':
349            {
350                int32_t result;
351                CHECK(msg->findInt32("result", &result));
352
353                LOGI("connection request completed with result %d (%s)",
354                     result, strerror(-result));
355
356                if (result == OK) {
357                    AString request;
358                    request = "DESCRIBE ";
359                    request.append(mSessionURL);
360                    request.append(" RTSP/1.0\r\n");
361                    request.append("Accept: application/sdp\r\n");
362                    request.append("\r\n");
363
364                    sp<AMessage> reply = new AMessage('desc', id());
365                    mConn->sendRequest(request.c_str(), reply);
366                } else {
367                    (new AMessage('disc', id()))->post();
368                }
369                break;
370            }
371
372            case 'disc':
373            {
374                int32_t reconnect;
375                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
376                    sp<AMessage> reply = new AMessage('conn', id());
377                    mConn->connect(mOriginalSessionURL.c_str(), reply);
378                } else {
379                    (new AMessage('quit', id()))->post();
380                }
381                break;
382            }
383
384            case 'desc':
385            {
386                int32_t result;
387                CHECK(msg->findInt32("result", &result));
388
389                LOGI("DESCRIBE completed with result %d (%s)",
390                     result, strerror(-result));
391
392                if (result == OK) {
393                    sp<RefBase> obj;
394                    CHECK(msg->findObject("response", &obj));
395                    sp<ARTSPResponse> response =
396                        static_cast<ARTSPResponse *>(obj.get());
397
398                    if (response->mStatusCode == 302) {
399                        ssize_t i = response->mHeaders.indexOfKey("location");
400                        CHECK_GE(i, 0);
401
402                        mSessionURL = response->mHeaders.valueAt(i);
403
404                        AString request;
405                        request = "DESCRIBE ";
406                        request.append(mSessionURL);
407                        request.append(" RTSP/1.0\r\n");
408                        request.append("Accept: application/sdp\r\n");
409                        request.append("\r\n");
410
411                        sp<AMessage> reply = new AMessage('desc', id());
412                        mConn->sendRequest(request.c_str(), reply);
413                        break;
414                    }
415
416                    if (response->mStatusCode != 200) {
417                        result = UNKNOWN_ERROR;
418                    } else {
419                        mSessionDesc = new ASessionDescription;
420
421                        mSessionDesc->setTo(
422                                response->mContent->data(),
423                                response->mContent->size());
424
425                        if (!mSessionDesc->isValid()) {
426                            LOGE("Failed to parse session description.");
427                            result = ERROR_MALFORMED;
428                        } else {
429                            ssize_t i = response->mHeaders.indexOfKey("content-base");
430                            if (i >= 0) {
431                                mBaseURL = response->mHeaders.valueAt(i);
432                            } else {
433                                i = response->mHeaders.indexOfKey("content-location");
434                                if (i >= 0) {
435                                    mBaseURL = response->mHeaders.valueAt(i);
436                                } else {
437                                    mBaseURL = mSessionURL;
438                                }
439                            }
440
441                            if (!mBaseURL.startsWith("rtsp://")) {
442                                // Some misbehaving servers specify a relative
443                                // URL in one of the locations above, combine
444                                // it with the absolute session URL to get
445                                // something usable...
446
447                                LOGW("Server specified a non-absolute base URL"
448                                     ", combining it with the session URL to "
449                                     "get something usable...");
450
451                                AString tmp;
452                                CHECK(MakeURL(
453                                            mSessionURL.c_str(),
454                                            mBaseURL.c_str(),
455                                            &tmp));
456
457                                mBaseURL = tmp;
458                            }
459
460                            CHECK_GT(mSessionDesc->countTracks(), 1u);
461                            setupTrack(1);
462                        }
463                    }
464                }
465
466                if (result != OK) {
467                    sp<AMessage> reply = new AMessage('disc', id());
468                    mConn->disconnect(reply);
469                }
470                break;
471            }
472
473            case 'setu':
474            {
475                size_t index;
476                CHECK(msg->findSize("index", &index));
477
478                TrackInfo *track = NULL;
479                size_t trackIndex;
480                if (msg->findSize("track-index", &trackIndex)) {
481                    track = &mTracks.editItemAt(trackIndex);
482                }
483
484                int32_t result;
485                CHECK(msg->findInt32("result", &result));
486
487                LOGI("SETUP(%d) completed with result %d (%s)",
488                     index, result, strerror(-result));
489
490                if (result == OK) {
491                    CHECK(track != NULL);
492
493                    sp<RefBase> obj;
494                    CHECK(msg->findObject("response", &obj));
495                    sp<ARTSPResponse> response =
496                        static_cast<ARTSPResponse *>(obj.get());
497
498                    if (response->mStatusCode != 200) {
499                        result = UNKNOWN_ERROR;
500                    } else {
501                        ssize_t i = response->mHeaders.indexOfKey("session");
502                        CHECK_GE(i, 0);
503
504                        mSessionID = response->mHeaders.valueAt(i);
505                        i = mSessionID.find(";");
506                        if (i >= 0) {
507                            // Remove options, i.e. ";timeout=90"
508                            mSessionID.erase(i, mSessionID.size() - i);
509                        }
510
511                        sp<AMessage> notify = new AMessage('accu', id());
512                        notify->setSize("track-index", trackIndex);
513
514                        i = response->mHeaders.indexOfKey("transport");
515                        CHECK_GE(i, 0);
516
517                        if (!track->mUsingInterleavedTCP) {
518                            AString transport = response->mHeaders.valueAt(i);
519
520                            // We are going to continue even if we were
521                            // unable to poke a hole into the firewall...
522                            pokeAHole(
523                                    track->mRTPSocket,
524                                    track->mRTCPSocket,
525                                    transport);
526                        }
527
528                        mRTPConn->addStream(
529                                track->mRTPSocket, track->mRTCPSocket,
530                                mSessionDesc, index,
531                                notify, track->mUsingInterleavedTCP);
532
533                        mSetupTracksSuccessful = true;
534                    }
535                }
536
537                if (result != OK) {
538                    if (track) {
539                        if (!track->mUsingInterleavedTCP) {
540                            // Clear the tag
541                            if (mUIDValid) {
542                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
543                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
544                            }
545
546                            close(track->mRTPSocket);
547                            close(track->mRTCPSocket);
548                        }
549
550                        mTracks.removeItemsAt(trackIndex);
551                    }
552                }
553
554                ++index;
555                if (index < mSessionDesc->countTracks()) {
556                    setupTrack(index);
557                } else if (mSetupTracksSuccessful) {
558                    AString request = "PLAY ";
559                    request.append(mSessionURL);
560                    request.append(" RTSP/1.0\r\n");
561
562                    request.append("Session: ");
563                    request.append(mSessionID);
564                    request.append("\r\n");
565
566                    request.append("\r\n");
567
568                    sp<AMessage> reply = new AMessage('play', id());
569                    mConn->sendRequest(request.c_str(), reply);
570                } else {
571                    sp<AMessage> reply = new AMessage('disc', id());
572                    mConn->disconnect(reply);
573                }
574                break;
575            }
576
577            case 'play':
578            {
579                int32_t result;
580                CHECK(msg->findInt32("result", &result));
581
582                LOGI("PLAY completed with result %d (%s)",
583                     result, strerror(-result));
584
585                if (result == OK) {
586                    sp<RefBase> obj;
587                    CHECK(msg->findObject("response", &obj));
588                    sp<ARTSPResponse> response =
589                        static_cast<ARTSPResponse *>(obj.get());
590
591                    if (response->mStatusCode != 200) {
592                        result = UNKNOWN_ERROR;
593                    } else {
594                        parsePlayResponse(response);
595
596                        sp<AMessage> timeout = new AMessage('tiou', id());
597                        timeout->post(kStartupTimeoutUs);
598                    }
599                }
600
601                if (result != OK) {
602                    sp<AMessage> reply = new AMessage('disc', id());
603                    mConn->disconnect(reply);
604                }
605
606                break;
607            }
608
609            case 'abor':
610            {
611                for (size_t i = 0; i < mTracks.size(); ++i) {
612                    TrackInfo *info = &mTracks.editItemAt(i);
613
614                    if (!mFirstAccessUnit) {
615                        postQueueEOS(i, ERROR_END_OF_STREAM);
616                    }
617
618                    if (!info->mUsingInterleavedTCP) {
619                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
620
621                        // Clear the tag
622                        if (mUIDValid) {
623                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
624                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
625                        }
626
627                        close(info->mRTPSocket);
628                        close(info->mRTCPSocket);
629                    }
630                }
631                mTracks.clear();
632                mSetupTracksSuccessful = false;
633                mSeekPending = false;
634                mFirstAccessUnit = true;
635                mNTPAnchorUs = -1;
636                mMediaAnchorUs = -1;
637                mNumAccessUnitsReceived = 0;
638                mReceivedFirstRTCPPacket = false;
639                mReceivedFirstRTPPacket = false;
640                mSeekable = false;
641
642                sp<AMessage> reply = new AMessage('tear', id());
643
644                int32_t reconnect;
645                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
646                    reply->setInt32("reconnect", true);
647                }
648
649                AString request;
650                request = "TEARDOWN ";
651
652                // XXX should use aggregate url from SDP here...
653                request.append(mSessionURL);
654                request.append(" RTSP/1.0\r\n");
655
656                request.append("Session: ");
657                request.append(mSessionID);
658                request.append("\r\n");
659
660                request.append("\r\n");
661
662                mConn->sendRequest(request.c_str(), reply);
663                break;
664            }
665
666            case 'tear':
667            {
668                int32_t result;
669                CHECK(msg->findInt32("result", &result));
670
671                LOGI("TEARDOWN completed with result %d (%s)",
672                     result, strerror(-result));
673
674                sp<AMessage> reply = new AMessage('disc', id());
675
676                int32_t reconnect;
677                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
678                    reply->setInt32("reconnect", true);
679                }
680
681                mConn->disconnect(reply);
682                break;
683            }
684
685            case 'quit':
686            {
687                sp<AMessage> msg = mNotify->dup();
688                msg->setInt32("what", kWhatDisconnected);
689                msg->setInt32("result", UNKNOWN_ERROR);
690                msg->post();
691                break;
692            }
693
694            case 'chek':
695            {
696                int32_t generation;
697                CHECK(msg->findInt32("generation", &generation));
698                if (generation != mCheckGeneration) {
699                    // This is an outdated message. Ignore.
700                    break;
701                }
702
703                if (mNumAccessUnitsReceived == 0) {
704                    LOGI("stream ended? aborting.");
705                    (new AMessage('abor', id()))->post();
706                    break;
707                }
708
709                mNumAccessUnitsReceived = 0;
710                msg->post(kAccessUnitTimeoutUs);
711                break;
712            }
713
714            case 'accu':
715            {
716                int32_t timeUpdate;
717                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
718                    size_t trackIndex;
719                    CHECK(msg->findSize("track-index", &trackIndex));
720
721                    uint32_t rtpTime;
722                    uint64_t ntpTime;
723                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
724                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
725
726                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
727                    break;
728                }
729
730                int32_t first;
731                if (msg->findInt32("first-rtcp", &first)) {
732                    mReceivedFirstRTCPPacket = true;
733                    break;
734                }
735
736                if (msg->findInt32("first-rtp", &first)) {
737                    mReceivedFirstRTPPacket = true;
738                    break;
739                }
740
741                ++mNumAccessUnitsReceived;
742                postAccessUnitTimeoutCheck();
743
744                size_t trackIndex;
745                CHECK(msg->findSize("track-index", &trackIndex));
746
747                if (trackIndex >= mTracks.size()) {
748                    ALOGV("late packets ignored.");
749                    break;
750                }
751
752                TrackInfo *track = &mTracks.editItemAt(trackIndex);
753
754                int32_t eos;
755                if (msg->findInt32("eos", &eos)) {
756                    LOGI("received BYE on track index %d", trackIndex);
757#if 0
758                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
759#endif
760                    return;
761                }
762
763                sp<RefBase> obj;
764                CHECK(msg->findObject("access-unit", &obj));
765
766                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
767
768                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
769
770                if (mSeekPending) {
771                    ALOGV("we're seeking, dropping stale packet.");
772                    break;
773                }
774
775                if (seqNum < track->mFirstSeqNumInSegment) {
776                    ALOGV("dropping stale access-unit (%d < %d)",
777                         seqNum, track->mFirstSeqNumInSegment);
778                    break;
779                }
780
781                if (track->mNewSegment) {
782                    track->mNewSegment = false;
783                }
784
785                onAccessUnitComplete(trackIndex, accessUnit);
786                break;
787            }
788
789            case 'seek':
790            {
791                if (!mSeekable) {
792                    LOGW("This is a live stream, ignoring seek request.");
793
794                    sp<AMessage> msg = mNotify->dup();
795                    msg->setInt32("what", kWhatSeekDone);
796                    msg->post();
797                    break;
798                }
799
800                int64_t timeUs;
801                CHECK(msg->findInt64("time", &timeUs));
802
803                mSeekPending = true;
804
805                // Disable the access unit timeout until we resumed
806                // playback again.
807                mCheckPending = true;
808                ++mCheckGeneration;
809
810                AString request = "PAUSE ";
811                request.append(mSessionURL);
812                request.append(" RTSP/1.0\r\n");
813
814                request.append("Session: ");
815                request.append(mSessionID);
816                request.append("\r\n");
817
818                request.append("\r\n");
819
820                sp<AMessage> reply = new AMessage('see1', id());
821                reply->setInt64("time", timeUs);
822                mConn->sendRequest(request.c_str(), reply);
823                break;
824            }
825
826            case 'see1':
827            {
828                // Session is paused now.
829                for (size_t i = 0; i < mTracks.size(); ++i) {
830                    TrackInfo *info = &mTracks.editItemAt(i);
831
832                    postQueueSeekDiscontinuity(i);
833
834                    info->mRTPAnchor = 0;
835                    info->mNTPAnchorUs = -1;
836                }
837
838                mNTPAnchorUs = -1;
839
840                int64_t timeUs;
841                CHECK(msg->findInt64("time", &timeUs));
842
843                AString request = "PLAY ";
844                request.append(mSessionURL);
845                request.append(" RTSP/1.0\r\n");
846
847                request.append("Session: ");
848                request.append(mSessionID);
849                request.append("\r\n");
850
851                request.append(
852                        StringPrintf(
853                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
854
855                request.append("\r\n");
856
857                sp<AMessage> reply = new AMessage('see2', id());
858                mConn->sendRequest(request.c_str(), reply);
859                break;
860            }
861
862            case 'see2':
863            {
864                CHECK(mSeekPending);
865
866                int32_t result;
867                CHECK(msg->findInt32("result", &result));
868
869                LOGI("PLAY completed with result %d (%s)",
870                     result, strerror(-result));
871
872                mCheckPending = false;
873                postAccessUnitTimeoutCheck();
874
875                if (result == OK) {
876                    sp<RefBase> obj;
877                    CHECK(msg->findObject("response", &obj));
878                    sp<ARTSPResponse> response =
879                        static_cast<ARTSPResponse *>(obj.get());
880
881                    if (response->mStatusCode != 200) {
882                        result = UNKNOWN_ERROR;
883                    } else {
884                        parsePlayResponse(response);
885
886                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
887                        CHECK_GE(i, 0);
888
889                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
890
891                        LOGI("seek completed.");
892                    }
893                }
894
895                if (result != OK) {
896                    LOGE("seek failed, aborting.");
897                    (new AMessage('abor', id()))->post();
898                }
899
900                mSeekPending = false;
901
902                sp<AMessage> msg = mNotify->dup();
903                msg->setInt32("what", kWhatSeekDone);
904                msg->post();
905                break;
906            }
907
908            case 'biny':
909            {
910                sp<RefBase> obj;
911                CHECK(msg->findObject("buffer", &obj));
912                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
913
914                int32_t index;
915                CHECK(buffer->meta()->findInt32("index", &index));
916
917                mRTPConn->injectPacket(index, buffer);
918                break;
919            }
920
921            case 'tiou':
922            {
923                if (!mReceivedFirstRTCPPacket) {
924                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
925                        LOGW("We received RTP packets but no RTCP packets, "
926                             "using fake timestamps.");
927
928                        mTryFakeRTCP = true;
929
930                        mReceivedFirstRTCPPacket = true;
931
932                        fakeTimestamps();
933                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
934                        LOGW("Never received any data, switching transports.");
935
936                        mTryTCPInterleaving = true;
937
938                        sp<AMessage> msg = new AMessage('abor', id());
939                        msg->setInt32("reconnect", true);
940                        msg->post();
941                    } else {
942                        LOGW("Never received any data, disconnecting.");
943                        (new AMessage('abor', id()))->post();
944                    }
945                }
946                break;
947            }
948
949            default:
950                TRESPASS();
951                break;
952        }
953    }
954
955    void postAccessUnitTimeoutCheck() {
956        if (mCheckPending) {
957            return;
958        }
959
960        mCheckPending = true;
961        sp<AMessage> check = new AMessage('chek', id());
962        check->setInt32("generation", mCheckGeneration);
963        check->post(kAccessUnitTimeoutUs);
964    }
965
966    static void SplitString(
967            const AString &s, const char *separator, List<AString> *items) {
968        items->clear();
969        size_t start = 0;
970        while (start < s.size()) {
971            ssize_t offset = s.find(separator, start);
972
973            if (offset < 0) {
974                items->push_back(AString(s, start, s.size() - start));
975                break;
976            }
977
978            items->push_back(AString(s, start, offset - start));
979            start = offset + strlen(separator);
980        }
981    }
982
983    void parsePlayResponse(const sp<ARTSPResponse> &response) {
984        mSeekable = false;
985
986        ssize_t i = response->mHeaders.indexOfKey("range");
987        if (i < 0) {
988            // Server doesn't even tell use what range it is going to
989            // play, therefore we won't support seeking.
990            return;
991        }
992
993        AString range = response->mHeaders.valueAt(i);
994        ALOGV("Range: %s", range.c_str());
995
996        AString val;
997        CHECK(GetAttribute(range.c_str(), "npt", &val));
998
999        float npt1, npt2;
1000        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1001            // This is a live stream and therefore not seekable.
1002            return;
1003        }
1004
1005        i = response->mHeaders.indexOfKey("rtp-info");
1006        CHECK_GE(i, 0);
1007
1008        AString rtpInfo = response->mHeaders.valueAt(i);
1009        List<AString> streamInfos;
1010        SplitString(rtpInfo, ",", &streamInfos);
1011
1012        int n = 1;
1013        for (List<AString>::iterator it = streamInfos.begin();
1014             it != streamInfos.end(); ++it) {
1015            (*it).trim();
1016            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1017
1018            CHECK(GetAttribute((*it).c_str(), "url", &val));
1019
1020            size_t trackIndex = 0;
1021            while (trackIndex < mTracks.size()
1022                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1023                ++trackIndex;
1024            }
1025            CHECK_LT(trackIndex, mTracks.size());
1026
1027            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1028
1029            char *end;
1030            unsigned long seq = strtoul(val.c_str(), &end, 10);
1031
1032            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1033            info->mFirstSeqNumInSegment = seq;
1034            info->mNewSegment = true;
1035
1036            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1037
1038            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1039
1040            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1041
1042            info->mNormalPlayTimeRTP = rtpTime;
1043            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1044
1045            if (!mFirstAccessUnit) {
1046                postNormalPlayTimeMapping(
1047                        trackIndex,
1048                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1049            }
1050
1051            ++n;
1052        }
1053
1054        mSeekable = true;
1055    }
1056
1057    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1058        CHECK_GE(index, 0u);
1059        CHECK_LT(index, mTracks.size());
1060
1061        const TrackInfo &info = mTracks.itemAt(index);
1062
1063        *timeScale = info.mTimeScale;
1064
1065        return info.mPacketSource->getFormat();
1066    }
1067
1068    size_t countTracks() const {
1069        return mTracks.size();
1070    }
1071
1072private:
1073    struct TrackInfo {  // Per-track state; setupTrack() appends one entry to mTracks.
1074        AString mURL;  // Track URL built by MakeURL(); matched against the "url" of rtp-info entries.
1075        int mRTPSocket;  // From MakePortPair(), or the interleave channel index when using TCP.
1076        int mRTCPSocket;  // From MakePortPair(), or the interleave channel index + 1 when using TCP.
1077        bool mUsingInterleavedTCP;  // True when SETUP asked for RTP/AVP/TCP interleaving.
1078        uint32_t mFirstSeqNumInSegment;  // "seq" from rtp-info; access units below it are dropped.
1079        bool mNewSegment;  // Set by setupTrack()/parsePlayResponse(); cleared once an access unit is accepted.
1080
1081        uint32_t mRTPAnchor;  // RTP time recorded by the most recent onTimeUpdate().
1082        int64_t mNTPAnchorUs;  // Time in us paired with mRTPAnchor; -1 until a time update arrives.
1083        int32_t mTimeScale;  // From ParseFormatDesc(); divisor turning RTP time deltas into seconds.
1084
1085        uint32_t mNormalPlayTimeRTP;  // "rtptime" from rtp-info (0 until a PLAY response supplies one).
1086        int64_t mNormalPlayTimeUs;  // Start of the npt range, in us, paired with mNormalPlayTimeRTP.
1087
1088        sp<APacketSource> mPacketSource;  // Created in setupTrack(); supplies the track format.
1089
1090        // Stores packets temporarily while no notion of time
1091        // has been established yet.
1092        List<sp<ABuffer> > mPackets;
1093    };
1094
1095    sp<AMessage> mNotify;  // Template message; dup()'ed for every notification sent to the observer.
1096    bool mUIDValid;  // When true, UDP sockets are tagged/untagged through HTTPBase.
1097    uid_t mUID;  // UID passed to HTTPBase::RegisterSocketUserTag().
1098    sp<ALooper> mNetLooper;
1099    sp<ARTSPConnection> mConn;  // Carries the RTSP requests (SETUP, PLAY, PAUSE, TEARDOWN).
1100    sp<ARTPConnection> mRTPConn;  // Streams are added to/removed from it; interleaved packets are injected.
1101    sp<ASessionDescription> mSessionDesc;  // Source of track count, "a=control" and format info.
1102    AString mOriginalSessionURL;  // This one still has user:pass@
1103    AString mSessionURL;  // URL used in PLAY, PAUSE and TEARDOWN request lines.
1104    AString mSessionHost;
1105    AString mBaseURL;  // Base against which track control URLs are resolved (see MakeURL()).
1106    AString mSessionID;  // Value of the "Session:" request header.
1107    bool mSetupTracksSuccessful;  // True once at least one track was added to mRTPConn; gates PLAY.
1108    bool mSeekPending;  // Set by 'seek', cleared by 'see2'; access units are dropped while set.
1109    bool mFirstAccessUnit;  // True until onAccessUnitComplete() has handled its first access unit.
1110
1111    int64_t mNTPAnchorUs;  // Session-wide time anchor in us; negative while not established.
1112    int64_t mMediaAnchorUs;  // Media time paired with mNTPAnchorUs; negative while not established.
1113    int64_t mLastMediaTimeUs;  // Largest media time computed by addMediaTimestamp() so far.
1114
1115    int64_t mNumAccessUnitsReceived;  // Counted per 'accu'; reset by each 'chek' that finds it non-zero.
1116    bool mCheckPending;  // True while no new 'chek' may be posted (one is pending, or a seek is active).
1117    int32_t mCheckGeneration;  // Bumped by 'seek'; 'chek' messages with another generation are ignored.
1118    bool mTryTCPInterleaving;  // Set by 'tiou' when no data arrived; makes setupTrack() use TCP.
1119    bool mTryFakeRTCP;  // Set by 'tiou' when RTP but no RTCP arrived and timestamps were faked.
1120    bool mReceivedFirstRTCPPacket;  // Set on "first-rtcp", or when timestamps are faked.
1121    bool mReceivedFirstRTPPacket;  // Set on "first-rtp".
1122    bool mSeekable;  // Result of the last parsePlayResponse(); 'seek' is refused when false.
1123
1124    Vector<TrackInfo> mTracks;  // Tracks set up so far; cleared by 'abor'.
1125
1126    void setupTrack(size_t index) {
1127        sp<APacketSource> source =
1128            new APacketSource(mSessionDesc, index);
1129
1130        if (source->initCheck() != OK) {
1131            LOGW("Unsupported format. Ignoring track #%d.", index);
1132
1133            sp<AMessage> reply = new AMessage('setu', id());
1134            reply->setSize("index", index);
1135            reply->setInt32("result", ERROR_UNSUPPORTED);
1136            reply->post();
1137            return;
1138        }
1139
1140        AString url;
1141        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1142
1143        AString trackURL;
1144        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1145
1146        mTracks.push(TrackInfo());
1147        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1148        info->mURL = trackURL;
1149        info->mPacketSource = source;
1150        info->mUsingInterleavedTCP = false;
1151        info->mFirstSeqNumInSegment = 0;
1152        info->mNewSegment = true;
1153        info->mRTPAnchor = 0;
1154        info->mNTPAnchorUs = -1;
1155        info->mNormalPlayTimeRTP = 0;
1156        info->mNormalPlayTimeUs = 0ll;
1157
1158        unsigned long PT;
1159        AString formatDesc;
1160        AString formatParams;
1161        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1162
1163        int32_t timescale;
1164        int32_t numChannels;
1165        ASessionDescription::ParseFormatDesc(
1166                formatDesc.c_str(), &timescale, &numChannels);
1167
1168        info->mTimeScale = timescale;
1169
1170        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1171
1172        AString request = "SETUP ";
1173        request.append(trackURL);
1174        request.append(" RTSP/1.0\r\n");
1175
1176        if (mTryTCPInterleaving) {
1177            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1178            info->mUsingInterleavedTCP = true;
1179            info->mRTPSocket = interleaveIndex;
1180            info->mRTCPSocket = interleaveIndex + 1;
1181
1182            request.append("Transport: RTP/AVP/TCP;interleaved=");
1183            request.append(interleaveIndex);
1184            request.append("-");
1185            request.append(interleaveIndex + 1);
1186        } else {
1187            unsigned rtpPort;
1188            ARTPConnection::MakePortPair(
1189                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1190
1191            if (mUIDValid) {
1192                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1193                                                (uint32_t)*(uint32_t*) "RTP_");
1194                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1195                                                (uint32_t)*(uint32_t*) "RTP_");
1196            }
1197
1198            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1199            request.append(rtpPort);
1200            request.append("-");
1201            request.append(rtpPort + 1);
1202        }
1203
1204        request.append("\r\n");
1205
1206        if (index > 1) {
1207            request.append("Session: ");
1208            request.append(mSessionID);
1209            request.append("\r\n");
1210        }
1211
1212        request.append("\r\n");
1213
1214        sp<AMessage> reply = new AMessage('setu', id());
1215        reply->setSize("index", index);
1216        reply->setSize("track-index", mTracks.size() - 1);
1217        mConn->sendRequest(request.c_str(), reply);
1218    }
1219
1220    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1221        out->clear();
1222
1223        if (strncasecmp("rtsp://", baseURL, 7)) {
1224            // Base URL must be absolute
1225            return false;
1226        }
1227
1228        if (!strncasecmp("rtsp://", url, 7)) {
1229            // "url" is already an absolute URL, ignore base URL.
1230            out->setTo(url);
1231            return true;
1232        }
1233
1234        size_t n = strlen(baseURL);
1235        if (baseURL[n - 1] == '/') {
1236            out->setTo(baseURL);
1237            out->append(url);
1238        } else {
1239            const char *slashPos = strrchr(baseURL, '/');
1240
1241            if (slashPos > &baseURL[6]) {
1242                out->setTo(baseURL, slashPos - baseURL);
1243            } else {
1244                out->setTo(baseURL);
1245            }
1246
1247            out->append("/");
1248            out->append(url);
1249        }
1250
1251        return true;
1252    }
1253
1254    void fakeTimestamps() {
1255        for (size_t i = 0; i < mTracks.size(); ++i) {
1256            onTimeUpdate(i, 0, 0ll);
1257        }
1258    }
1259
1260    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1261        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1262             trackIndex, rtpTime, ntpTime);
1263
1264        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1265
1266        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1267
1268        track->mRTPAnchor = rtpTime;
1269        track->mNTPAnchorUs = ntpTimeUs;
1270
1271        if (mNTPAnchorUs < 0) {
1272            mNTPAnchorUs = ntpTimeUs;
1273            mMediaAnchorUs = mLastMediaTimeUs;
1274        }
1275    }
1276
1277    void onAccessUnitComplete(
1278            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1279        ALOGV("onAccessUnitComplete track %d", trackIndex);
1280
1281        if (mFirstAccessUnit) {
1282            sp<AMessage> msg = mNotify->dup();
1283            msg->setInt32("what", kWhatConnected);
1284            msg->post();
1285
1286            for (size_t i = 0; i < mTracks.size(); ++i) {
1287                TrackInfo *info = &mTracks.editItemAt(i);
1288
1289                postNormalPlayTimeMapping(
1290                        i,
1291                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1292            }
1293
1294            mFirstAccessUnit = false;
1295        }
1296
1297        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1298
1299        if (mNTPAnchorUs < 0 || mMediaAnchorUs < 0 || track->mNTPAnchorUs < 0) {
1300            ALOGV("storing accessUnit, no time established yet");
1301            track->mPackets.push_back(accessUnit);
1302            return;
1303        }
1304
1305        while (!track->mPackets.empty()) {
1306            sp<ABuffer> accessUnit = *track->mPackets.begin();
1307            track->mPackets.erase(track->mPackets.begin());
1308
1309            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1310                postQueueAccessUnit(trackIndex, accessUnit);
1311            }
1312        }
1313
1314        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1315            postQueueAccessUnit(trackIndex, accessUnit);
1316        }
1317    }
1318
1319    bool addMediaTimestamp(
1320            int32_t trackIndex, const TrackInfo *track,
1321            const sp<ABuffer> &accessUnit) {
1322        uint32_t rtpTime;
1323        CHECK(accessUnit->meta()->findInt32(
1324                    "rtp-time", (int32_t *)&rtpTime));
1325
1326        int64_t relRtpTimeUs =
1327            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1328                / track->mTimeScale;
1329
1330        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1331
1332        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1333
1334        if (mediaTimeUs > mLastMediaTimeUs) {
1335            mLastMediaTimeUs = mediaTimeUs;
1336        }
1337
1338        if (mediaTimeUs < 0) {
1339            ALOGV("dropping early accessUnit.");
1340            return false;
1341        }
1342
1343        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1344             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1345
1346        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1347
1348        return true;
1349    }
1350
1351    void postQueueAccessUnit(
1352            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1353        sp<AMessage> msg = mNotify->dup();
1354        msg->setInt32("what", kWhatAccessUnit);
1355        msg->setSize("trackIndex", trackIndex);
1356        msg->setObject("accessUnit", accessUnit);
1357        msg->post();
1358    }
1359
1360    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1361        sp<AMessage> msg = mNotify->dup();
1362        msg->setInt32("what", kWhatEOS);
1363        msg->setSize("trackIndex", trackIndex);
1364        msg->setInt32("finalResult", finalResult);
1365        msg->post();
1366    }
1367
1368    void postQueueSeekDiscontinuity(size_t trackIndex) {
1369        sp<AMessage> msg = mNotify->dup();
1370        msg->setInt32("what", kWhatSeekDiscontinuity);
1371        msg->setSize("trackIndex", trackIndex);
1372        msg->post();
1373    }
1374
1375    void postNormalPlayTimeMapping(
1376            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1377        sp<AMessage> msg = mNotify->dup();
1378        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1379        msg->setSize("trackIndex", trackIndex);
1380        msg->setInt32("rtpTime", rtpTime);
1381        msg->setInt64("nptUs", nptUs);
1382        msg->post();
1383    }
1384
1385    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1386};
1387
1388}  // namespace android
1389
1390#endif  // MY_HANDLER_H_
1391