MyHandler.h revision 39ddf8e0f18766f7ba1e3246b774aa6ebd93eea8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21#include "APacketSource.h"
22#include "ARTPConnection.h"
23#include "ARTSPConnection.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaErrors.h>
31
32namespace android {
33
34struct MyHandler : public AHandler {
35    MyHandler(const char *url, const sp<ALooper> &looper)
36        : mLooper(looper),
37          mNetLooper(new ALooper),
38          mConn(new ARTSPConnection),
39          mRTPConn(new ARTPConnection),
40          mSessionURL(url),
41          mSetupTracksSuccessful(false) {
42
43        mNetLooper->start(false /* runOnCallingThread */,
44                          false /* canCallJava */,
45                          PRIORITY_HIGHEST);
46    }
47
48    void connect() {
49        mLooper->registerHandler(this);
50        mLooper->registerHandler(mConn);
51        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
52        sp<AMessage> reply = new AMessage('conn', id());
53
54        mConn->connect(mSessionURL.c_str(), reply);
55    }
56
57    void disconnect() {
58        sp<AMessage> reply = new AMessage('disc', id());
59        mConn->disconnect(reply);
60    }
61
62    virtual void onMessageReceived(const sp<AMessage> &msg) {
63        switch (msg->what()) {
64            case 'conn':
65            {
66                int32_t result;
67                CHECK(msg->findInt32("result", &result));
68
69                LOG(INFO) << "connection request completed with result "
70                     << result << " (" << strerror(-result) << ")";
71
72                if (result == OK) {
73                    AString request;
74                    request = "DESCRIBE ";
75                    request.append(mSessionURL);
76                    request.append(" RTSP/1.0\r\n");
77                    request.append("Accept: application/sdp\r\n");
78                    request.append("\r\n");
79
80                    sp<AMessage> reply = new AMessage('desc', id());
81                    mConn->sendRequest(request.c_str(), reply);
82                }
83                break;
84            }
85
86            case 'disc':
87            {
88                LOG(INFO) << "disconnect completed";
89
90                (new AMessage('quit', id()))->post();
91                break;
92            }
93
94            case 'desc':
95            {
96                int32_t result;
97                CHECK(msg->findInt32("result", &result));
98
99                LOG(INFO) << "DESCRIBE completed with result "
100                     << result << " (" << strerror(-result) << ")";
101
102                if (result == OK) {
103                    sp<RefBase> obj;
104                    CHECK(msg->findObject("response", &obj));
105                    sp<ARTSPResponse> response =
106                        static_cast<ARTSPResponse *>(obj.get());
107
108                    if (response->mStatusCode == 302) {
109                        ssize_t i = response->mHeaders.indexOfKey("location");
110                        CHECK_GE(i, 0);
111
112                        mSessionURL = response->mHeaders.valueAt(i);
113
114                        AString request;
115                        request = "DESCRIBE ";
116                        request.append(mSessionURL);
117                        request.append(" RTSP/1.0\r\n");
118                        request.append("Accept: application/sdp\r\n");
119                        request.append("\r\n");
120
121                        sp<AMessage> reply = new AMessage('desc', id());
122                        mConn->sendRequest(request.c_str(), reply);
123                        break;
124                    }
125
126                    CHECK_EQ(response->mStatusCode, 200u);
127
128                    mSessionDesc = new ASessionDescription;
129
130                    mSessionDesc->setTo(
131                            response->mContent->data(),
132                            response->mContent->size());
133
134                    CHECK(mSessionDesc->isValid());
135
136                    ssize_t i = response->mHeaders.indexOfKey("content-base");
137                    if (i >= 0) {
138                        mBaseURL = response->mHeaders.valueAt(i);
139                    } else {
140                        i = response->mHeaders.indexOfKey("content-location");
141                        if (i >= 0) {
142                            mBaseURL = response->mHeaders.valueAt(i);
143                        } else {
144                            mBaseURL = mSessionURL;
145                        }
146                    }
147
148                    CHECK_GT(mSessionDesc->countTracks(), 1u);
149                    setupTrack(1);
150                } else {
151                    sp<AMessage> reply = new AMessage('disc', id());
152                    mConn->disconnect(reply);
153                }
154                break;
155            }
156
157            case 'setu':
158            {
159                size_t index;
160                CHECK(msg->findSize("index", &index));
161
162                TrackInfo *track = NULL;
163                size_t trackIndex;
164                if (msg->findSize("track-index", &trackIndex)) {
165                    track = &mTracks.editItemAt(trackIndex);
166                }
167
168                int32_t result;
169                CHECK(msg->findInt32("result", &result));
170
171                LOG(INFO) << "SETUP(" << index << ") completed with result "
172                     << result << " (" << strerror(-result) << ")";
173
174                if (result != OK) {
175                    if (track) {
176                        close(track->mRTPSocket);
177                        close(track->mRTCPSocket);
178
179                        mTracks.removeItemsAt(trackIndex);
180                    }
181                } else {
182                    CHECK(track != NULL);
183
184                    sp<RefBase> obj;
185                    CHECK(msg->findObject("response", &obj));
186                    sp<ARTSPResponse> response =
187                        static_cast<ARTSPResponse *>(obj.get());
188
189                    CHECK_EQ(response->mStatusCode, 200u);
190
191                    ssize_t i = response->mHeaders.indexOfKey("session");
192                    CHECK_GE(i, 0);
193
194                    if (index == 1) {
195                        mSessionID = response->mHeaders.valueAt(i);
196                        i = mSessionID.find(";");
197                        if (i >= 0) {
198                            // Remove options, i.e. ";timeout=90"
199                            mSessionID.erase(i, mSessionID.size() - i);
200                        }
201                    }
202
203                    sp<AMessage> notify = new AMessage('accu', id());
204                    notify->setSize("track-index", trackIndex);
205
206                    mRTPConn->addStream(
207                            track->mRTPSocket, track->mRTCPSocket,
208                            mSessionDesc, index,
209                            notify);
210
211                    mSetupTracksSuccessful = true;
212                }
213
214                ++index;
215                if (index < mSessionDesc->countTracks()) {
216                    setupTrack(index);
217                } else if (mSetupTracksSuccessful) {
218                    AString request = "PLAY ";
219                    request.append(mSessionURL);
220                    request.append(" RTSP/1.0\r\n");
221
222                    request.append("Session: ");
223                    request.append(mSessionID);
224                    request.append("\r\n");
225
226                    request.append("\r\n");
227
228                    sp<AMessage> reply = new AMessage('play', id());
229                    mConn->sendRequest(request.c_str(), reply);
230                } else {
231                    sp<AMessage> reply = new AMessage('disc', id());
232                    mConn->disconnect(reply);
233                }
234                break;
235            }
236
237            case 'play':
238            {
239                int32_t result;
240                CHECK(msg->findInt32("result", &result));
241
242                LOG(INFO) << "PLAY completed with result "
243                     << result << " (" << strerror(-result) << ")";
244
245                if (result == OK) {
246                    sp<RefBase> obj;
247                    CHECK(msg->findObject("response", &obj));
248                    sp<ARTSPResponse> response =
249                        static_cast<ARTSPResponse *>(obj.get());
250
251                    CHECK_EQ(response->mStatusCode, 200u);
252
253                    sp<AMessage> msg = new AMessage('abor', id());
254                    msg->post(60000000ll);
255                } else {
256                    sp<AMessage> reply = new AMessage('disc', id());
257                    mConn->disconnect(reply);
258                }
259
260                break;
261            }
262
263            case 'abor':
264            {
265                for (size_t i = 0; i < mTracks.size(); ++i) {
266                    mTracks.editItemAt(i).mPacketSource->signalEOS(
267                            ERROR_END_OF_STREAM);
268                }
269
270                sp<AMessage> reply = new AMessage('tear', id());
271
272                AString request;
273                request = "TEARDOWN ";
274
275                // XXX should use aggregate url from SDP here...
276                request.append(mSessionURL);
277                request.append(" RTSP/1.0\r\n");
278
279                request.append("Session: ");
280                request.append(mSessionID);
281                request.append("\r\n");
282
283                request.append("\r\n");
284
285                mConn->sendRequest(request.c_str(), reply);
286                break;
287            }
288
289            case 'tear':
290            {
291                int32_t result;
292                CHECK(msg->findInt32("result", &result));
293
294                LOG(INFO) << "TEARDOWN completed with result "
295                     << result << " (" << strerror(-result) << ")";
296
297                sp<AMessage> reply = new AMessage('disc', id());
298                mConn->disconnect(reply);
299                break;
300            }
301
302            case 'quit':
303            {
304                break;
305            }
306
307            case 'accu':
308            {
309                size_t trackIndex;
310                CHECK(msg->findSize("track-index", &trackIndex));
311
312                sp<RefBase> obj;
313                CHECK(msg->findObject("access-unit", &obj));
314
315                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
316
317                uint64_t ntpTime;
318                CHECK(accessUnit->meta()->findInt64(
319                            "ntp-time", (int64_t *)&ntpTime));
320
321                accessUnit->meta()->setInt64("ntp-time", ntpTime);
322
323#if 0
324                int32_t damaged;
325                if (accessUnit->meta()->findInt32("damaged", &damaged)
326                        && damaged != 0) {
327                    LOG(INFO) << "ignoring damaged AU";
328                } else
329#endif
330                {
331                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
332                    track->mPacketSource->queueAccessUnit(accessUnit);
333                }
334                break;
335            }
336
337            default:
338                TRESPASS();
339                break;
340        }
341    }
342
343    sp<APacketSource> getPacketSource(size_t index) {
344        CHECK_GE(index, 0u);
345        CHECK_LT(index, mTracks.size());
346
347        return mTracks.editItemAt(index).mPacketSource;
348    }
349
350    size_t countTracks() const {
351        return mTracks.size();
352    }
353
354private:
355    sp<ALooper> mLooper;
356    sp<ALooper> mNetLooper;
357    sp<ARTSPConnection> mConn;
358    sp<ARTPConnection> mRTPConn;
359    sp<ASessionDescription> mSessionDesc;
360    AString mSessionURL;
361    AString mBaseURL;
362    AString mSessionID;
363    bool mSetupTracksSuccessful;
364
365    struct TrackInfo {
366        int mRTPSocket;
367        int mRTCPSocket;
368
369        sp<APacketSource> mPacketSource;
370    };
371    Vector<TrackInfo> mTracks;
372
373    void setupTrack(size_t index) {
374        sp<APacketSource> source =
375            new APacketSource(mSessionDesc, index);
376        if (source->initCheck() != OK) {
377            LOG(WARNING) << "Unsupported format. Ignoring track #"
378                         << index << ".";
379
380            sp<AMessage> reply = new AMessage('setu', id());
381            reply->setSize("index", index);
382            reply->setInt32("result", ERROR_UNSUPPORTED);
383            reply->post();
384            return;
385        }
386
387        AString url;
388        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
389
390        AString trackURL;
391        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
392
393        mTracks.push(TrackInfo());
394        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
395        info->mPacketSource = source;
396
397        unsigned rtpPort;
398        ARTPConnection::MakePortPair(
399                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
400
401        AString request = "SETUP ";
402        request.append(trackURL);
403        request.append(" RTSP/1.0\r\n");
404
405        request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
406        request.append(rtpPort);
407        request.append("-");
408        request.append(rtpPort + 1);
409        request.append("\r\n");
410
411        if (index > 1) {
412            request.append("Session: ");
413            request.append(mSessionID);
414            request.append("\r\n");
415        }
416
417        request.append("\r\n");
418
419        sp<AMessage> reply = new AMessage('setu', id());
420        reply->setSize("index", index);
421        reply->setSize("track-index", mTracks.size() - 1);
422        mConn->sendRequest(request.c_str(), reply);
423    }
424
425    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
426        out->clear();
427
428        if (strncasecmp("rtsp://", baseURL, 7)) {
429            // Base URL must be absolute
430            return false;
431        }
432
433        if (!strncasecmp("rtsp://", url, 7)) {
434            // "url" is already an absolute URL, ignore base URL.
435            out->setTo(url);
436            return true;
437        }
438
439        size_t n = strlen(baseURL);
440        if (baseURL[n - 1] == '/') {
441            out->setTo(baseURL);
442            out->append(url);
443        } else {
444            char *slashPos = strrchr(baseURL, '/');
445
446            if (slashPos > &baseURL[6]) {
447                out->setTo(baseURL, slashPos - baseURL);
448            } else {
449                out->setTo(baseURL);
450            }
451
452            out->append("/");
453            out->append(url);
454        }
455
456        return true;
457    }
458
459    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
460};
461
462}  // namespace android
463
464#endif  // MY_HANDLER_H_
465