MyHandler.h revision cf7b9c7aae758ac0b99833915053c63c2ac46e09
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21#include "APacketSource.h"
22#include "ARTPConnection.h"
23#include "ARTSPConnection.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaErrors.h>
31
32namespace android {
33
34struct MyHandler : public AHandler {
35    MyHandler(const char *url, const sp<ALooper> &looper)
36        : mLooper(looper),
37          mConn(new ARTSPConnection),
38          mRTPConn(new ARTPConnection),
39          mSessionURL(url),
40          mSetupTracksSuccessful(false),
41          mFirstAccessUnit(true),
42          mFirstAccessUnitNTP(-1) {
43        mLooper->registerHandler(this);
44        mLooper->registerHandler(mConn);
45        mLooper->registerHandler(mRTPConn);
46        sp<AMessage> reply = new AMessage('conn', id());
47        mConn->connect(mSessionURL.c_str(), reply);
48    }
49
50    virtual void onMessageReceived(const sp<AMessage> &msg) {
51        switch (msg->what()) {
52            case 'conn':
53            {
54                int32_t result;
55                CHECK(msg->findInt32("result", &result));
56
57                LOG(INFO) << "connection request completed with result "
58                     << result << " (" << strerror(-result) << ")";
59
60                if (result == OK) {
61                    AString request;
62                    request = "DESCRIBE ";
63                    request.append(mSessionURL);
64                    request.append(" RTSP/1.0\r\n");
65                    request.append("Accept: application/sdp\r\n");
66                    request.append("\r\n");
67
68                    sp<AMessage> reply = new AMessage('desc', id());
69                    mConn->sendRequest(request.c_str(), reply);
70                }
71                break;
72            }
73
74            case 'disc':
75            {
76                LOG(INFO) << "disconnect completed";
77
78                (new AMessage('quit', id()))->post();
79                break;
80            }
81
82            case 'desc':
83            {
84                int32_t result;
85                CHECK(msg->findInt32("result", &result));
86
87                LOG(INFO) << "DESCRIBE completed with result "
88                     << result << " (" << strerror(-result) << ")";
89
90                if (result == OK) {
91                    sp<RefBase> obj;
92                    CHECK(msg->findObject("response", &obj));
93                    sp<ARTSPResponse> response =
94                        static_cast<ARTSPResponse *>(obj.get());
95
96                    if (response->mStatusCode == 302) {
97                        ssize_t i = response->mHeaders.indexOfKey("location");
98                        CHECK_GE(i, 0);
99
100                        mSessionURL = response->mHeaders.valueAt(i);
101
102                        AString request;
103                        request = "DESCRIBE ";
104                        request.append(mSessionURL);
105                        request.append(" RTSP/1.0\r\n");
106                        request.append("Accept: application/sdp\r\n");
107                        request.append("\r\n");
108
109                        sp<AMessage> reply = new AMessage('desc', id());
110                        mConn->sendRequest(request.c_str(), reply);
111                        break;
112                    }
113
114                    CHECK_EQ(response->mStatusCode, 200u);
115
116                    mSessionDesc = new ASessionDescription;
117
118                    mSessionDesc->setTo(
119                            response->mContent->data(),
120                            response->mContent->size());
121
122                    CHECK(mSessionDesc->isValid());
123
124                    ssize_t i = response->mHeaders.indexOfKey("content-base");
125                    if (i >= 0) {
126                        mBaseURL = response->mHeaders.valueAt(i);
127                    } else {
128                        i = response->mHeaders.indexOfKey("content-location");
129                        if (i >= 0) {
130                            mBaseURL = response->mHeaders.valueAt(i);
131                        } else {
132                            mBaseURL = mSessionURL;
133                        }
134                    }
135
136                    CHECK_GT(mSessionDesc->countTracks(), 1u);
137                    setupTrack(1);
138                } else {
139                    sp<AMessage> reply = new AMessage('disc', id());
140                    mConn->disconnect(reply);
141                }
142                break;
143            }
144
145            case 'setu':
146            {
147                size_t index;
148                CHECK(msg->findSize("index", &index));
149
150                size_t trackIndex;
151                CHECK(msg->findSize("track-index", &trackIndex));
152
153                int32_t result;
154                CHECK(msg->findInt32("result", &result));
155
156                LOG(INFO) << "SETUP(" << index << ") completed with result "
157                     << result << " (" << strerror(-result) << ")";
158
159                TrackInfo *track = &mTracks.editItemAt(trackIndex);
160
161                if (result == OK) {
162                    sp<RefBase> obj;
163                    CHECK(msg->findObject("response", &obj));
164                    sp<ARTSPResponse> response =
165                        static_cast<ARTSPResponse *>(obj.get());
166
167                    CHECK_EQ(response->mStatusCode, 200u);
168
169                    ssize_t i = response->mHeaders.indexOfKey("session");
170                    CHECK_GE(i, 0);
171
172                    if (index == 1) {
173                        mSessionID = response->mHeaders.valueAt(i);
174                        i = mSessionID.find(";");
175                        if (i >= 0) {
176                            // Remove options, i.e. ";timeout=90"
177                            mSessionID.erase(i, mSessionID.size() - i);
178                        }
179                    }
180
181                    sp<AMessage> notify = new AMessage('accu', id());
182                    notify->setSize("track-index", trackIndex);
183
184                    mRTPConn->addStream(
185                            track->mRTPSocket, track->mRTCPSocket,
186                            mSessionDesc, index,
187                            notify);
188
189                    track->mPacketSource =
190                        new APacketSource(mSessionDesc, index);
191
192                    mSetupTracksSuccessful = true;
193
194                    ++index;
195                    if (index < mSessionDesc->countTracks()) {
196                        setupTrack(index);
197                        break;
198                    }
199                } else {
200                    close(track->mRTPSocket);
201                    close(track->mRTCPSocket);
202
203                    mTracks.removeItemsAt(mTracks.size() - 1);
204                }
205
206                if (mSetupTracksSuccessful) {
207                    AString request = "PLAY ";
208                    request.append(mSessionURL);
209                    request.append(" RTSP/1.0\r\n");
210
211                    request.append("Session: ");
212                    request.append(mSessionID);
213                    request.append("\r\n");
214
215                    request.append("\r\n");
216
217                    sp<AMessage> reply = new AMessage('play', id());
218                    mConn->sendRequest(request.c_str(), reply);
219                } else {
220                    sp<AMessage> reply = new AMessage('disc', id());
221                    mConn->disconnect(reply);
222                }
223                break;
224            }
225
226            case 'play':
227            {
228                int32_t result;
229                CHECK(msg->findInt32("result", &result));
230
231                LOG(INFO) << "PLAY completed with result "
232                     << result << " (" << strerror(-result) << ")";
233
234                if (result == OK) {
235                    sp<RefBase> obj;
236                    CHECK(msg->findObject("response", &obj));
237                    sp<ARTSPResponse> response =
238                        static_cast<ARTSPResponse *>(obj.get());
239
240                    CHECK_EQ(response->mStatusCode, 200u);
241
242                    sp<AMessage> msg = new AMessage('abor', id());
243                    msg->post(60000000ll);
244                } else {
245                    sp<AMessage> reply = new AMessage('disc', id());
246                    mConn->disconnect(reply);
247                }
248
249                break;
250            }
251
252            case 'abor':
253            {
254                for (size_t i = 0; i < mTracks.size(); ++i) {
255                    mTracks.editItemAt(i).mPacketSource->signalEOS(
256                            ERROR_END_OF_STREAM);
257                }
258
259                sp<AMessage> reply = new AMessage('tear', id());
260
261                AString request;
262                request = "TEARDOWN ";
263
264                // XXX should use aggregate url from SDP here...
265                request.append(mSessionURL);
266                request.append(" RTSP/1.0\r\n");
267
268                request.append("Session: ");
269                request.append(mSessionID);
270                request.append("\r\n");
271
272                request.append("\r\n");
273
274                mConn->sendRequest(request.c_str(), reply);
275                break;
276            }
277
278            case 'tear':
279            {
280                int32_t result;
281                CHECK(msg->findInt32("result", &result));
282
283                LOG(INFO) << "TEARDOWN completed with result "
284                     << result << " (" << strerror(-result) << ")";
285
286                sp<AMessage> reply = new AMessage('disc', id());
287                mConn->disconnect(reply);
288                break;
289            }
290
291            case 'quit':
292            {
293                mLooper->stop();
294                break;
295            }
296
297            case 'accu':
298            {
299                size_t trackIndex;
300                CHECK(msg->findSize("track-index", &trackIndex));
301
302                sp<RefBase> obj;
303                CHECK(msg->findObject("access-unit", &obj));
304
305                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
306
307                uint64_t ntpTime;
308                CHECK(accessUnit->meta()->findInt64(
309                            "ntp-time", (int64_t *)&ntpTime));
310
311                if (mFirstAccessUnit) {
312                    mFirstAccessUnit = false;
313                    mFirstAccessUnitNTP = ntpTime;
314                }
315                if (ntpTime > mFirstAccessUnitNTP) {
316                    ntpTime -= mFirstAccessUnitNTP;
317                } else {
318                    ntpTime = 0;
319                }
320
321                accessUnit->meta()->setInt64("ntp-time", ntpTime);
322
323                TrackInfo *track = &mTracks.editItemAt(trackIndex);
324                track->mPacketSource->queueAccessUnit(accessUnit);
325                break;
326            }
327
328            default:
329                TRESPASS();
330                break;
331        }
332    }
333
334    sp<APacketSource> getPacketSource(size_t index) {
335        CHECK_GE(index, 0u);
336        CHECK_LT(index, mTracks.size());
337
338        return mTracks.editItemAt(index).mPacketSource;
339    }
340
341    size_t countTracks() const {
342        return mTracks.size();
343    }
344
345private:
346    sp<ALooper> mLooper;
347    sp<ARTSPConnection> mConn;
348    sp<ARTPConnection> mRTPConn;
349    sp<ASessionDescription> mSessionDesc;
350    AString mSessionURL;
351    AString mBaseURL;
352    AString mSessionID;
353    bool mSetupTracksSuccessful;
354    bool mFirstAccessUnit;
355    uint64_t mFirstAccessUnitNTP;
356
357    struct TrackInfo {
358        int mRTPSocket;
359        int mRTCPSocket;
360
361        sp<APacketSource> mPacketSource;
362    };
363    Vector<TrackInfo> mTracks;
364
365    void setupTrack(size_t index) {
366        AString url;
367        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
368
369        AString trackURL;
370        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
371
372        mTracks.push(TrackInfo());
373        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
374
375        unsigned rtpPort;
376        ARTPConnection::MakePortPair(
377                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
378
379        AString request = "SETUP ";
380        request.append(trackURL);
381        request.append(" RTSP/1.0\r\n");
382
383        request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
384        request.append(rtpPort);
385        request.append("-");
386        request.append(rtpPort + 1);
387        request.append("\r\n");
388
389        if (index > 1) {
390            request.append("Session: ");
391            request.append(mSessionID);
392            request.append("\r\n");
393        }
394
395        request.append("\r\n");
396
397        sp<AMessage> reply = new AMessage('setu', id());
398        reply->setSize("index", index);
399        reply->setSize("track-index", mTracks.size() - 1);
400        mConn->sendRequest(request.c_str(), reply);
401    }
402
403    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
404        out->clear();
405
406        if (strncasecmp("rtsp://", baseURL, 7)) {
407            // Base URL must be absolute
408            return false;
409        }
410
411        if (!strncasecmp("rtsp://", url, 7)) {
412            // "url" is already an absolute URL, ignore base URL.
413            out->setTo(url);
414            return true;
415        }
416
417        size_t n = strlen(baseURL);
418        if (baseURL[n - 1] == '/') {
419            out->setTo(baseURL);
420            out->append(url);
421        } else {
422            char *slashPos = strrchr(baseURL, '/');
423
424            if (slashPos > &baseURL[6]) {
425                out->setTo(baseURL, slashPos - baseURL);
426            } else {
427                out->setTo(baseURL);
428            }
429
430            out->append("/");
431            out->append(url);
432        }
433
434        return true;
435    }
436
437    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
438};
439
440}  // namespace android
441
442#endif  // MY_HANDLER_H_
443