// MyHandler.h revision e0dd7d396051942ccce0429d7a1fe968d63ac3f7
/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21#include "APacketSource.h"
22#include "ARTPConnection.h"
23#include "ARTSPConnection.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaErrors.h>
31
32namespace android {
33
34struct MyHandler : public AHandler {
35    MyHandler(const char *url, const sp<ALooper> &looper)
36        : mLooper(looper),
37          mNetLooper(new ALooper),
38          mConn(new ARTSPConnection),
39          mRTPConn(new ARTPConnection),
40          mSessionURL(url),
41          mSetupTracksSuccessful(false),
42          mSeekPending(false),
43          mFirstAccessUnit(true),
44          mFirstAccessUnitNTP(0) {
45
46        mNetLooper->start(false /* runOnCallingThread */,
47                          false /* canCallJava */,
48                          PRIORITY_HIGHEST);
49    }
50
51    void connect(const sp<AMessage> &doneMsg) {
52        mDoneMsg = doneMsg;
53
54        mLooper->registerHandler(this);
55        mLooper->registerHandler(mConn);
56        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
57
58        sp<AMessage> reply = new AMessage('conn', id());
59        mConn->connect(mSessionURL.c_str(), reply);
60    }
61
62    void disconnect(const sp<AMessage> &doneMsg) {
63        mDoneMsg = doneMsg;
64
65        (new AMessage('abor', id()))->post();
66    }
67
68    void seek(int64_t timeUs) {
69        sp<AMessage> msg = new AMessage('seek', id());
70        msg->setInt64("time", timeUs);
71        msg->post();
72    }
73
74    virtual void onMessageReceived(const sp<AMessage> &msg) {
75        switch (msg->what()) {
76            case 'conn':
77            {
78                int32_t result;
79                CHECK(msg->findInt32("result", &result));
80
81                LOG(INFO) << "connection request completed with result "
82                     << result << " (" << strerror(-result) << ")";
83
84                if (result == OK) {
85                    AString request;
86                    request = "DESCRIBE ";
87                    request.append(mSessionURL);
88                    request.append(" RTSP/1.0\r\n");
89                    request.append("Accept: application/sdp\r\n");
90                    request.append("\r\n");
91
92                    sp<AMessage> reply = new AMessage('desc', id());
93                    mConn->sendRequest(request.c_str(), reply);
94                }
95                break;
96            }
97
98            case 'disc':
99            {
100                (new AMessage('quit', id()))->post();
101                break;
102            }
103
104            case 'desc':
105            {
106                int32_t result;
107                CHECK(msg->findInt32("result", &result));
108
109                LOG(INFO) << "DESCRIBE completed with result "
110                     << result << " (" << strerror(-result) << ")";
111
112                if (result == OK) {
113                    sp<RefBase> obj;
114                    CHECK(msg->findObject("response", &obj));
115                    sp<ARTSPResponse> response =
116                        static_cast<ARTSPResponse *>(obj.get());
117
118                    if (response->mStatusCode == 302) {
119                        ssize_t i = response->mHeaders.indexOfKey("location");
120                        CHECK_GE(i, 0);
121
122                        mSessionURL = response->mHeaders.valueAt(i);
123
124                        AString request;
125                        request = "DESCRIBE ";
126                        request.append(mSessionURL);
127                        request.append(" RTSP/1.0\r\n");
128                        request.append("Accept: application/sdp\r\n");
129                        request.append("\r\n");
130
131                        sp<AMessage> reply = new AMessage('desc', id());
132                        mConn->sendRequest(request.c_str(), reply);
133                        break;
134                    }
135
136                    CHECK_EQ(response->mStatusCode, 200u);
137
138                    mSessionDesc = new ASessionDescription;
139
140                    mSessionDesc->setTo(
141                            response->mContent->data(),
142                            response->mContent->size());
143
144                    CHECK(mSessionDesc->isValid());
145
146                    ssize_t i = response->mHeaders.indexOfKey("content-base");
147                    if (i >= 0) {
148                        mBaseURL = response->mHeaders.valueAt(i);
149                    } else {
150                        i = response->mHeaders.indexOfKey("content-location");
151                        if (i >= 0) {
152                            mBaseURL = response->mHeaders.valueAt(i);
153                        } else {
154                            mBaseURL = mSessionURL;
155                        }
156                    }
157
158                    CHECK_GT(mSessionDesc->countTracks(), 1u);
159                    setupTrack(1);
160                } else {
161                    sp<AMessage> reply = new AMessage('disc', id());
162                    mConn->disconnect(reply);
163                }
164                break;
165            }
166
167            case 'setu':
168            {
169                size_t index;
170                CHECK(msg->findSize("index", &index));
171
172                TrackInfo *track = NULL;
173                size_t trackIndex;
174                if (msg->findSize("track-index", &trackIndex)) {
175                    track = &mTracks.editItemAt(trackIndex);
176                }
177
178                int32_t result;
179                CHECK(msg->findInt32("result", &result));
180
181                LOG(INFO) << "SETUP(" << index << ") completed with result "
182                     << result << " (" << strerror(-result) << ")";
183
184                if (result != OK) {
185                    if (track) {
186                        close(track->mRTPSocket);
187                        close(track->mRTCPSocket);
188
189                        mTracks.removeItemsAt(trackIndex);
190                    }
191                } else {
192                    CHECK(track != NULL);
193
194                    sp<RefBase> obj;
195                    CHECK(msg->findObject("response", &obj));
196                    sp<ARTSPResponse> response =
197                        static_cast<ARTSPResponse *>(obj.get());
198
199                    CHECK_EQ(response->mStatusCode, 200u);
200
201                    ssize_t i = response->mHeaders.indexOfKey("session");
202                    CHECK_GE(i, 0);
203
204                    if (index == 1) {
205                        mSessionID = response->mHeaders.valueAt(i);
206                        i = mSessionID.find(";");
207                        if (i >= 0) {
208                            // Remove options, i.e. ";timeout=90"
209                            mSessionID.erase(i, mSessionID.size() - i);
210                        }
211                    }
212
213                    sp<AMessage> notify = new AMessage('accu', id());
214                    notify->setSize("track-index", trackIndex);
215
216                    mRTPConn->addStream(
217                            track->mRTPSocket, track->mRTCPSocket,
218                            mSessionDesc, index,
219                            notify);
220
221                    mSetupTracksSuccessful = true;
222                }
223
224                ++index;
225                if (index < mSessionDesc->countTracks()) {
226                    setupTrack(index);
227                } else if (mSetupTracksSuccessful) {
228                    AString request = "PLAY ";
229                    request.append(mSessionURL);
230                    request.append(" RTSP/1.0\r\n");
231
232                    request.append("Session: ");
233                    request.append(mSessionID);
234                    request.append("\r\n");
235
236                    request.append("\r\n");
237
238                    sp<AMessage> reply = new AMessage('play', id());
239                    mConn->sendRequest(request.c_str(), reply);
240                } else {
241                    sp<AMessage> reply = new AMessage('disc', id());
242                    mConn->disconnect(reply);
243                }
244                break;
245            }
246
247            case 'play':
248            {
249                int32_t result;
250                CHECK(msg->findInt32("result", &result));
251
252                LOG(INFO) << "PLAY completed with result "
253                     << result << " (" << strerror(-result) << ")";
254
255                if (result == OK) {
256                    sp<RefBase> obj;
257                    CHECK(msg->findObject("response", &obj));
258                    sp<ARTSPResponse> response =
259                        static_cast<ARTSPResponse *>(obj.get());
260
261                    CHECK_EQ(response->mStatusCode, 200u);
262
263                    mDoneMsg->setInt32("result", OK);
264                    mDoneMsg->post();
265                    mDoneMsg = NULL;
266                } else {
267                    sp<AMessage> reply = new AMessage('disc', id());
268                    mConn->disconnect(reply);
269                }
270
271                break;
272            }
273
274            case 'abor':
275            {
276                for (size_t i = 0; i < mTracks.size(); ++i) {
277                    mTracks.editItemAt(i).mPacketSource->signalEOS(
278                            ERROR_END_OF_STREAM);
279                }
280
281                sp<AMessage> reply = new AMessage('tear', id());
282
283                AString request;
284                request = "TEARDOWN ";
285
286                // XXX should use aggregate url from SDP here...
287                request.append(mSessionURL);
288                request.append(" RTSP/1.0\r\n");
289
290                request.append("Session: ");
291                request.append(mSessionID);
292                request.append("\r\n");
293
294                request.append("\r\n");
295
296                mConn->sendRequest(request.c_str(), reply);
297                break;
298            }
299
300            case 'tear':
301            {
302                int32_t result;
303                CHECK(msg->findInt32("result", &result));
304
305                LOG(INFO) << "TEARDOWN completed with result "
306                     << result << " (" << strerror(-result) << ")";
307
308                sp<AMessage> reply = new AMessage('disc', id());
309                mConn->disconnect(reply);
310                break;
311            }
312
313            case 'quit':
314            {
315                if (mDoneMsg != NULL) {
316                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
317                    mDoneMsg->post();
318                    mDoneMsg = NULL;
319                }
320                break;
321            }
322
323            case 'accu':
324            {
325                size_t trackIndex;
326                CHECK(msg->findSize("track-index", &trackIndex));
327
328                int32_t eos;
329                if (msg->findInt32("eos", &eos)) {
330                    LOG(INFO) << "received BYE on track index " << trackIndex;
331#if 0
332                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
333                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
334#endif
335                    return;
336                }
337
338                sp<RefBase> obj;
339                CHECK(msg->findObject("access-unit", &obj));
340
341                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
342
343                uint64_t ntpTime;
344                CHECK(accessUnit->meta()->findInt64(
345                            "ntp-time", (int64_t *)&ntpTime));
346
347                if (mFirstAccessUnit) {
348                    mFirstAccessUnit = false;
349                    mFirstAccessUnitNTP = ntpTime;
350                }
351
352                if (ntpTime >= mFirstAccessUnitNTP) {
353                    ntpTime -= mFirstAccessUnitNTP;
354                } else {
355                    ntpTime = 0;
356                }
357
358                int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
359
360                accessUnit->meta()->setInt64("timeUs", timeUs);
361
362#if 0
363                int32_t damaged;
364                if (accessUnit->meta()->findInt32("damaged", &damaged)
365                        && damaged != 0) {
366                    LOG(INFO) << "ignoring damaged AU";
367                } else
368#endif
369                {
370                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
371                    track->mPacketSource->queueAccessUnit(accessUnit);
372                }
373                break;
374            }
375
376            case 'seek':
377            {
378                if (mSeekPending) {
379                    break;
380                }
381
382                int64_t timeUs;
383                CHECK(msg->findInt64("time", &timeUs));
384
385                mSeekPending = true;
386
387                AString request = "PAUSE ";
388                request.append(mSessionURL);
389                request.append(" RTSP/1.0\r\n");
390
391                request.append("Session: ");
392                request.append(mSessionID);
393                request.append("\r\n");
394
395                request.append("\r\n");
396
397                sp<AMessage> reply = new AMessage('see1', id());
398                reply->setInt64("time", timeUs);
399                mConn->sendRequest(request.c_str(), reply);
400                break;
401            }
402
403            case 'see1':
404            {
405                int64_t timeUs;
406                CHECK(msg->findInt64("time", &timeUs));
407
408                AString request = "PLAY ";
409                request.append(mSessionURL);
410                request.append(" RTSP/1.0\r\n");
411
412                request.append("Session: ");
413                request.append(mSessionID);
414                request.append("\r\n");
415
416                request.append(
417                        StringPrintf(
418                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
419
420                request.append("\r\n");
421
422                sp<AMessage> reply = new AMessage('see2', id());
423                mConn->sendRequest(request.c_str(), reply);
424                break;
425            }
426
427            case 'see2':
428            {
429                CHECK(mSeekPending);
430
431                LOG(INFO) << "seek completed.";
432                mSeekPending = false;
433
434                int32_t result;
435                CHECK(msg->findInt32("result", &result));
436                if (result != OK) {
437                    LOG(ERROR) << "seek FAILED";
438                    break;
439                }
440
441                sp<RefBase> obj;
442                CHECK(msg->findObject("response", &obj));
443                sp<ARTSPResponse> response =
444                    static_cast<ARTSPResponse *>(obj.get());
445
446                CHECK_EQ(response->mStatusCode, 200u);
447
448                for (size_t i = 0; i < mTracks.size(); ++i) {
449                    mTracks.editItemAt(i).mPacketSource->flushQueue();
450                }
451                break;
452            }
453
454            default:
455                TRESPASS();
456                break;
457        }
458    }
459
460    sp<APacketSource> getPacketSource(size_t index) {
461        CHECK_GE(index, 0u);
462        CHECK_LT(index, mTracks.size());
463
464        return mTracks.editItemAt(index).mPacketSource;
465    }
466
467    size_t countTracks() const {
468        return mTracks.size();
469    }
470
471private:
472    sp<ALooper> mLooper;
473    sp<ALooper> mNetLooper;
474    sp<ARTSPConnection> mConn;
475    sp<ARTPConnection> mRTPConn;
476    sp<ASessionDescription> mSessionDesc;
477    AString mSessionURL;
478    AString mBaseURL;
479    AString mSessionID;
480    bool mSetupTracksSuccessful;
481    bool mSeekPending;
482    bool mFirstAccessUnit;
483    uint64_t mFirstAccessUnitNTP;
484
485    struct TrackInfo {
486        int mRTPSocket;
487        int mRTCPSocket;
488
489        sp<APacketSource> mPacketSource;
490    };
491    Vector<TrackInfo> mTracks;
492
493    sp<AMessage> mDoneMsg;
494
495    void setupTrack(size_t index) {
496        sp<APacketSource> source =
497            new APacketSource(mSessionDesc, index);
498        if (source->initCheck() != OK) {
499            LOG(WARNING) << "Unsupported format. Ignoring track #"
500                         << index << ".";
501
502            sp<AMessage> reply = new AMessage('setu', id());
503            reply->setSize("index", index);
504            reply->setInt32("result", ERROR_UNSUPPORTED);
505            reply->post();
506            return;
507        }
508
509        AString url;
510        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
511
512        AString trackURL;
513        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
514
515        mTracks.push(TrackInfo());
516        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
517        info->mPacketSource = source;
518
519        unsigned rtpPort;
520        ARTPConnection::MakePortPair(
521                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
522
523        AString request = "SETUP ";
524        request.append(trackURL);
525        request.append(" RTSP/1.0\r\n");
526
527        request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
528        request.append(rtpPort);
529        request.append("-");
530        request.append(rtpPort + 1);
531        request.append("\r\n");
532
533        if (index > 1) {
534            request.append("Session: ");
535            request.append(mSessionID);
536            request.append("\r\n");
537        }
538
539        request.append("\r\n");
540
541        sp<AMessage> reply = new AMessage('setu', id());
542        reply->setSize("index", index);
543        reply->setSize("track-index", mTracks.size() - 1);
544        mConn->sendRequest(request.c_str(), reply);
545    }
546
547    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
548        out->clear();
549
550        if (strncasecmp("rtsp://", baseURL, 7)) {
551            // Base URL must be absolute
552            return false;
553        }
554
555        if (!strncasecmp("rtsp://", url, 7)) {
556            // "url" is already an absolute URL, ignore base URL.
557            out->setTo(url);
558            return true;
559        }
560
561        size_t n = strlen(baseURL);
562        if (baseURL[n - 1] == '/') {
563            out->setTo(baseURL);
564            out->append(url);
565        } else {
566            char *slashPos = strrchr(baseURL, '/');
567
568            if (slashPos > &baseURL[6]) {
569                out->setTo(baseURL, slashPos - baseURL);
570            } else {
571                out->setTo(baseURL);
572            }
573
574            out->append("/");
575            out->append(url);
576        }
577
578        return true;
579    }
580
581    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
582};
583
584}  // namespace android
585
586#endif  // MY_HANDLER_H_
587