MyHandler.h revision 8370be11debc574b4a9fee62009009d999e29fa3
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21#include "APacketSource.h"
22#include "ARTPConnection.h"
23#include "ARTSPConnection.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaErrors.h>
31
32namespace android {
33
34struct MyHandler : public AHandler {
35    MyHandler(const char *url, const sp<ALooper> &looper)
36        : mLooper(looper),
37          mNetLooper(new ALooper),
38          mConn(new ARTSPConnection),
39          mRTPConn(new ARTPConnection),
40          mSessionURL(url),
41          mSetupTracksSuccessful(false) {
42
43        mNetLooper->start(false /* runOnCallingThread */,
44                          false /* canCallJava */,
45                          PRIORITY_HIGHEST);
46    }
47
48    void connect(const sp<AMessage> &doneMsg) {
49        mDoneMsg = doneMsg;
50
51        mLooper->registerHandler(this);
52        mLooper->registerHandler(mConn);
53        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
54
55        sp<AMessage> reply = new AMessage('conn', id());
56        mConn->connect(mSessionURL.c_str(), reply);
57    }
58
59    void disconnect(const sp<AMessage> &doneMsg) {
60        mDoneMsg = doneMsg;
61
62        (new AMessage('abor', id()))->post();
63    }
64
65    virtual void onMessageReceived(const sp<AMessage> &msg) {
66        switch (msg->what()) {
67            case 'conn':
68            {
69                int32_t result;
70                CHECK(msg->findInt32("result", &result));
71
72                LOG(INFO) << "connection request completed with result "
73                     << result << " (" << strerror(-result) << ")";
74
75                if (result == OK) {
76                    AString request;
77                    request = "DESCRIBE ";
78                    request.append(mSessionURL);
79                    request.append(" RTSP/1.0\r\n");
80                    request.append("Accept: application/sdp\r\n");
81                    request.append("\r\n");
82
83                    sp<AMessage> reply = new AMessage('desc', id());
84                    mConn->sendRequest(request.c_str(), reply);
85                }
86                break;
87            }
88
89            case 'disc':
90            {
91                LOG(INFO) << "disconnect completed";
92
93                (new AMessage('quit', id()))->post();
94                break;
95            }
96
97            case 'desc':
98            {
99                int32_t result;
100                CHECK(msg->findInt32("result", &result));
101
102                LOG(INFO) << "DESCRIBE completed with result "
103                     << result << " (" << strerror(-result) << ")";
104
105                if (result == OK) {
106                    sp<RefBase> obj;
107                    CHECK(msg->findObject("response", &obj));
108                    sp<ARTSPResponse> response =
109                        static_cast<ARTSPResponse *>(obj.get());
110
111                    if (response->mStatusCode == 302) {
112                        ssize_t i = response->mHeaders.indexOfKey("location");
113                        CHECK_GE(i, 0);
114
115                        mSessionURL = response->mHeaders.valueAt(i);
116
117                        AString request;
118                        request = "DESCRIBE ";
119                        request.append(mSessionURL);
120                        request.append(" RTSP/1.0\r\n");
121                        request.append("Accept: application/sdp\r\n");
122                        request.append("\r\n");
123
124                        sp<AMessage> reply = new AMessage('desc', id());
125                        mConn->sendRequest(request.c_str(), reply);
126                        break;
127                    }
128
129                    CHECK_EQ(response->mStatusCode, 200u);
130
131                    mSessionDesc = new ASessionDescription;
132
133                    mSessionDesc->setTo(
134                            response->mContent->data(),
135                            response->mContent->size());
136
137                    CHECK(mSessionDesc->isValid());
138
139                    ssize_t i = response->mHeaders.indexOfKey("content-base");
140                    if (i >= 0) {
141                        mBaseURL = response->mHeaders.valueAt(i);
142                    } else {
143                        i = response->mHeaders.indexOfKey("content-location");
144                        if (i >= 0) {
145                            mBaseURL = response->mHeaders.valueAt(i);
146                        } else {
147                            mBaseURL = mSessionURL;
148                        }
149                    }
150
151                    CHECK_GT(mSessionDesc->countTracks(), 1u);
152                    setupTrack(1);
153                } else {
154                    sp<AMessage> reply = new AMessage('disc', id());
155                    mConn->disconnect(reply);
156                }
157                break;
158            }
159
160            case 'setu':
161            {
162                size_t index;
163                CHECK(msg->findSize("index", &index));
164
165                TrackInfo *track = NULL;
166                size_t trackIndex;
167                if (msg->findSize("track-index", &trackIndex)) {
168                    track = &mTracks.editItemAt(trackIndex);
169                }
170
171                int32_t result;
172                CHECK(msg->findInt32("result", &result));
173
174                LOG(INFO) << "SETUP(" << index << ") completed with result "
175                     << result << " (" << strerror(-result) << ")";
176
177                if (result != OK) {
178                    if (track) {
179                        close(track->mRTPSocket);
180                        close(track->mRTCPSocket);
181
182                        mTracks.removeItemsAt(trackIndex);
183                    }
184                } else {
185                    CHECK(track != NULL);
186
187                    sp<RefBase> obj;
188                    CHECK(msg->findObject("response", &obj));
189                    sp<ARTSPResponse> response =
190                        static_cast<ARTSPResponse *>(obj.get());
191
192                    CHECK_EQ(response->mStatusCode, 200u);
193
194                    ssize_t i = response->mHeaders.indexOfKey("session");
195                    CHECK_GE(i, 0);
196
197                    if (index == 1) {
198                        mSessionID = response->mHeaders.valueAt(i);
199                        i = mSessionID.find(";");
200                        if (i >= 0) {
201                            // Remove options, i.e. ";timeout=90"
202                            mSessionID.erase(i, mSessionID.size() - i);
203                        }
204                    }
205
206                    sp<AMessage> notify = new AMessage('accu', id());
207                    notify->setSize("track-index", trackIndex);
208
209                    mRTPConn->addStream(
210                            track->mRTPSocket, track->mRTCPSocket,
211                            mSessionDesc, index,
212                            notify);
213
214                    mSetupTracksSuccessful = true;
215                }
216
217                ++index;
218                if (index < mSessionDesc->countTracks()) {
219                    setupTrack(index);
220                } else if (mSetupTracksSuccessful) {
221                    AString request = "PLAY ";
222                    request.append(mSessionURL);
223                    request.append(" RTSP/1.0\r\n");
224
225                    request.append("Session: ");
226                    request.append(mSessionID);
227                    request.append("\r\n");
228
229                    request.append("\r\n");
230
231                    sp<AMessage> reply = new AMessage('play', id());
232                    mConn->sendRequest(request.c_str(), reply);
233                } else {
234                    sp<AMessage> reply = new AMessage('disc', id());
235                    mConn->disconnect(reply);
236                }
237                break;
238            }
239
240            case 'play':
241            {
242                int32_t result;
243                CHECK(msg->findInt32("result", &result));
244
245                LOG(INFO) << "PLAY completed with result "
246                     << result << " (" << strerror(-result) << ")";
247
248                if (result == OK) {
249                    sp<RefBase> obj;
250                    CHECK(msg->findObject("response", &obj));
251                    sp<ARTSPResponse> response =
252                        static_cast<ARTSPResponse *>(obj.get());
253
254                    CHECK_EQ(response->mStatusCode, 200u);
255
256                    mDoneMsg->setInt32("result", OK);
257                    mDoneMsg->post();
258                    mDoneMsg = NULL;
259                } else {
260                    sp<AMessage> reply = new AMessage('disc', id());
261                    mConn->disconnect(reply);
262                }
263
264                break;
265            }
266
267            case 'abor':
268            {
269                for (size_t i = 0; i < mTracks.size(); ++i) {
270                    mTracks.editItemAt(i).mPacketSource->signalEOS(
271                            ERROR_END_OF_STREAM);
272                }
273
274                sp<AMessage> reply = new AMessage('tear', id());
275
276                AString request;
277                request = "TEARDOWN ";
278
279                // XXX should use aggregate url from SDP here...
280                request.append(mSessionURL);
281                request.append(" RTSP/1.0\r\n");
282
283                request.append("Session: ");
284                request.append(mSessionID);
285                request.append("\r\n");
286
287                request.append("\r\n");
288
289                mConn->sendRequest(request.c_str(), reply);
290                break;
291            }
292
293            case 'tear':
294            {
295                int32_t result;
296                CHECK(msg->findInt32("result", &result));
297
298                LOG(INFO) << "TEARDOWN completed with result "
299                     << result << " (" << strerror(-result) << ")";
300
301                sp<AMessage> reply = new AMessage('disc', id());
302                mConn->disconnect(reply);
303                break;
304            }
305
306            case 'quit':
307            {
308                if (mDoneMsg != NULL) {
309                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
310                    mDoneMsg->post();
311                    mDoneMsg = NULL;
312                }
313                break;
314            }
315
316            case 'accu':
317            {
318                size_t trackIndex;
319                CHECK(msg->findSize("track-index", &trackIndex));
320
321                int32_t eos;
322                if (msg->findInt32("eos", &eos)) {
323                    LOG(INFO) << "received BYE on track index " << trackIndex;
324#if 0
325                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
326                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
327#endif
328                    return;
329                }
330
331                sp<RefBase> obj;
332                CHECK(msg->findObject("access-unit", &obj));
333
334                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
335
336                uint64_t ntpTime;
337                CHECK(accessUnit->meta()->findInt64(
338                            "ntp-time", (int64_t *)&ntpTime));
339
340                accessUnit->meta()->setInt64("ntp-time", ntpTime);
341
342#if 0
343                int32_t damaged;
344                if (accessUnit->meta()->findInt32("damaged", &damaged)
345                        && damaged != 0) {
346                    LOG(INFO) << "ignoring damaged AU";
347                } else
348#endif
349                {
350                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
351                    track->mPacketSource->queueAccessUnit(accessUnit);
352                }
353                break;
354            }
355
356            default:
357                TRESPASS();
358                break;
359        }
360    }
361
362    sp<APacketSource> getPacketSource(size_t index) {
363        CHECK_GE(index, 0u);
364        CHECK_LT(index, mTracks.size());
365
366        return mTracks.editItemAt(index).mPacketSource;
367    }
368
369    size_t countTracks() const {
370        return mTracks.size();
371    }
372
373private:
374    sp<ALooper> mLooper;
375    sp<ALooper> mNetLooper;
376    sp<ARTSPConnection> mConn;
377    sp<ARTPConnection> mRTPConn;
378    sp<ASessionDescription> mSessionDesc;
379    AString mSessionURL;
380    AString mBaseURL;
381    AString mSessionID;
382    bool mSetupTracksSuccessful;
383
384    struct TrackInfo {
385        int mRTPSocket;
386        int mRTCPSocket;
387
388        sp<APacketSource> mPacketSource;
389    };
390    Vector<TrackInfo> mTracks;
391
392    sp<AMessage> mDoneMsg;
393
394    void setupTrack(size_t index) {
395        sp<APacketSource> source =
396            new APacketSource(mSessionDesc, index);
397        if (source->initCheck() != OK) {
398            LOG(WARNING) << "Unsupported format. Ignoring track #"
399                         << index << ".";
400
401            sp<AMessage> reply = new AMessage('setu', id());
402            reply->setSize("index", index);
403            reply->setInt32("result", ERROR_UNSUPPORTED);
404            reply->post();
405            return;
406        }
407
408        AString url;
409        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
410
411        AString trackURL;
412        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
413
414        mTracks.push(TrackInfo());
415        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
416        info->mPacketSource = source;
417
418        unsigned rtpPort;
419        ARTPConnection::MakePortPair(
420                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
421
422        AString request = "SETUP ";
423        request.append(trackURL);
424        request.append(" RTSP/1.0\r\n");
425
426        request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
427        request.append(rtpPort);
428        request.append("-");
429        request.append(rtpPort + 1);
430        request.append("\r\n");
431
432        if (index > 1) {
433            request.append("Session: ");
434            request.append(mSessionID);
435            request.append("\r\n");
436        }
437
438        request.append("\r\n");
439
440        sp<AMessage> reply = new AMessage('setu', id());
441        reply->setSize("index", index);
442        reply->setSize("track-index", mTracks.size() - 1);
443        mConn->sendRequest(request.c_str(), reply);
444    }
445
446    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
447        out->clear();
448
449        if (strncasecmp("rtsp://", baseURL, 7)) {
450            // Base URL must be absolute
451            return false;
452        }
453
454        if (!strncasecmp("rtsp://", url, 7)) {
455            // "url" is already an absolute URL, ignore base URL.
456            out->setTo(url);
457            return true;
458        }
459
460        size_t n = strlen(baseURL);
461        if (baseURL[n - 1] == '/') {
462            out->setTo(baseURL);
463            out->append(url);
464        } else {
465            char *slashPos = strrchr(baseURL, '/');
466
467            if (slashPos > &baseURL[6]) {
468                out->setTo(baseURL, slashPos - baseURL);
469            } else {
470                out->setTo(baseURL);
471            }
472
473            out->append("/");
474            out->append(url);
475        }
476
477        return true;
478    }
479
480    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
481};
482
483}  // namespace android
484
485#endif  // MY_HANDLER_H_
486