MyHandler.h revision 437ab8c4b66a6c9dc47faa257df90089ebef10a9
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21#include "APacketSource.h"
22#include "ARTPConnection.h"
23#include "ARTSPConnection.h"
24#include "ASessionDescription.h"
25
26#include <media/stagefright/foundation/ABuffer.h>
27#include <media/stagefright/foundation/ADebug.h>
28#include <media/stagefright/foundation/ALooper.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaErrors.h>
31
32#define USE_TCP_INTERLEAVED     0
33
34namespace android {
35
36struct MyHandler : public AHandler {
37    MyHandler(const char *url, const sp<ALooper> &looper)
38        : mLooper(looper),
39          mNetLooper(new ALooper),
40          mConn(new ARTSPConnection),
41          mRTPConn(new ARTPConnection),
42          mSessionURL(url),
43          mSetupTracksSuccessful(false),
44          mSeekPending(false),
45          mFirstAccessUnit(true),
46          mFirstAccessUnitNTP(0) {
47
48        mNetLooper->start(false /* runOnCallingThread */,
49                          false /* canCallJava */,
50                          PRIORITY_HIGHEST);
51    }
52
53    void connect(const sp<AMessage> &doneMsg) {
54        mDoneMsg = doneMsg;
55
56        mLooper->registerHandler(this);
57        mLooper->registerHandler(mConn);
58        (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
59
60        sp<AMessage> notify = new AMessage('biny', id());
61        mConn->observeBinaryData(notify);
62
63        sp<AMessage> reply = new AMessage('conn', id());
64        mConn->connect(mSessionURL.c_str(), reply);
65    }
66
67    void disconnect(const sp<AMessage> &doneMsg) {
68        mDoneMsg = doneMsg;
69
70        (new AMessage('abor', id()))->post();
71    }
72
73    void seek(int64_t timeUs) {
74        sp<AMessage> msg = new AMessage('seek', id());
75        msg->setInt64("time", timeUs);
76        msg->post();
77    }
78
79    virtual void onMessageReceived(const sp<AMessage> &msg) {
80        switch (msg->what()) {
81            case 'conn':
82            {
83                int32_t result;
84                CHECK(msg->findInt32("result", &result));
85
86                LOG(INFO) << "connection request completed with result "
87                     << result << " (" << strerror(-result) << ")";
88
89                if (result == OK) {
90                    AString request;
91                    request = "DESCRIBE ";
92                    request.append(mSessionURL);
93                    request.append(" RTSP/1.0\r\n");
94                    request.append("Accept: application/sdp\r\n");
95                    request.append("\r\n");
96
97                    sp<AMessage> reply = new AMessage('desc', id());
98                    mConn->sendRequest(request.c_str(), reply);
99                } else {
100                    (new AMessage('disc', id()))->post();
101                }
102                break;
103            }
104
105            case 'disc':
106            {
107                (new AMessage('quit', id()))->post();
108                break;
109            }
110
111            case 'desc':
112            {
113                int32_t result;
114                CHECK(msg->findInt32("result", &result));
115
116                LOG(INFO) << "DESCRIBE completed with result "
117                     << result << " (" << strerror(-result) << ")";
118
119                if (result == OK) {
120                    sp<RefBase> obj;
121                    CHECK(msg->findObject("response", &obj));
122                    sp<ARTSPResponse> response =
123                        static_cast<ARTSPResponse *>(obj.get());
124
125                    if (response->mStatusCode == 302) {
126                        ssize_t i = response->mHeaders.indexOfKey("location");
127                        CHECK_GE(i, 0);
128
129                        mSessionURL = response->mHeaders.valueAt(i);
130
131                        AString request;
132                        request = "DESCRIBE ";
133                        request.append(mSessionURL);
134                        request.append(" RTSP/1.0\r\n");
135                        request.append("Accept: application/sdp\r\n");
136                        request.append("\r\n");
137
138                        sp<AMessage> reply = new AMessage('desc', id());
139                        mConn->sendRequest(request.c_str(), reply);
140                        break;
141                    }
142
143                    CHECK_EQ(response->mStatusCode, 200u);
144
145                    mSessionDesc = new ASessionDescription;
146
147                    mSessionDesc->setTo(
148                            response->mContent->data(),
149                            response->mContent->size());
150
151                    CHECK(mSessionDesc->isValid());
152
153                    ssize_t i = response->mHeaders.indexOfKey("content-base");
154                    if (i >= 0) {
155                        mBaseURL = response->mHeaders.valueAt(i);
156                    } else {
157                        i = response->mHeaders.indexOfKey("content-location");
158                        if (i >= 0) {
159                            mBaseURL = response->mHeaders.valueAt(i);
160                        } else {
161                            mBaseURL = mSessionURL;
162                        }
163                    }
164
165                    CHECK_GT(mSessionDesc->countTracks(), 1u);
166                    setupTrack(1);
167                } else {
168                    sp<AMessage> reply = new AMessage('disc', id());
169                    mConn->disconnect(reply);
170                }
171                break;
172            }
173
174            case 'setu':
175            {
176                size_t index;
177                CHECK(msg->findSize("index", &index));
178
179                TrackInfo *track = NULL;
180                size_t trackIndex;
181                if (msg->findSize("track-index", &trackIndex)) {
182                    track = &mTracks.editItemAt(trackIndex);
183                }
184
185                int32_t result;
186                CHECK(msg->findInt32("result", &result));
187
188                LOG(INFO) << "SETUP(" << index << ") completed with result "
189                     << result << " (" << strerror(-result) << ")";
190
191                if (result != OK) {
192                    if (track) {
193                        if (!track->mUsingInterleavedTCP) {
194                            close(track->mRTPSocket);
195                            close(track->mRTCPSocket);
196                        }
197
198                        mTracks.removeItemsAt(trackIndex);
199                    }
200                } else {
201                    CHECK(track != NULL);
202
203                    sp<RefBase> obj;
204                    CHECK(msg->findObject("response", &obj));
205                    sp<ARTSPResponse> response =
206                        static_cast<ARTSPResponse *>(obj.get());
207
208                    CHECK_EQ(response->mStatusCode, 200u);
209
210                    ssize_t i = response->mHeaders.indexOfKey("session");
211                    CHECK_GE(i, 0);
212
213                    if (index == 1) {
214                        mSessionID = response->mHeaders.valueAt(i);
215                        i = mSessionID.find(";");
216                        if (i >= 0) {
217                            // Remove options, i.e. ";timeout=90"
218                            mSessionID.erase(i, mSessionID.size() - i);
219                        }
220                    }
221
222                    sp<AMessage> notify = new AMessage('accu', id());
223                    notify->setSize("track-index", trackIndex);
224
225                    mRTPConn->addStream(
226                            track->mRTPSocket, track->mRTCPSocket,
227                            mSessionDesc, index,
228                            notify, track->mUsingInterleavedTCP);
229
230                    mSetupTracksSuccessful = true;
231                }
232
233                ++index;
234                if (index < mSessionDesc->countTracks()) {
235                    setupTrack(index);
236                } else if (mSetupTracksSuccessful) {
237                    AString request = "PLAY ";
238                    request.append(mSessionURL);
239                    request.append(" RTSP/1.0\r\n");
240
241                    request.append("Session: ");
242                    request.append(mSessionID);
243                    request.append("\r\n");
244
245                    request.append("\r\n");
246
247                    sp<AMessage> reply = new AMessage('play', id());
248                    mConn->sendRequest(request.c_str(), reply);
249                } else {
250                    sp<AMessage> reply = new AMessage('disc', id());
251                    mConn->disconnect(reply);
252                }
253                break;
254            }
255
256            case 'play':
257            {
258                int32_t result;
259                CHECK(msg->findInt32("result", &result));
260
261                LOG(INFO) << "PLAY completed with result "
262                     << result << " (" << strerror(-result) << ")";
263
264                if (result == OK) {
265                    sp<RefBase> obj;
266                    CHECK(msg->findObject("response", &obj));
267                    sp<ARTSPResponse> response =
268                        static_cast<ARTSPResponse *>(obj.get());
269
270                    CHECK_EQ(response->mStatusCode, 200u);
271
272                    mDoneMsg->setInt32("result", OK);
273                    mDoneMsg->post();
274                    mDoneMsg = NULL;
275
276                    sp<AMessage> timeout = new AMessage('tiou', id());
277                    timeout->post(10000000ll);
278                } else {
279                    sp<AMessage> reply = new AMessage('disc', id());
280                    mConn->disconnect(reply);
281                }
282
283                break;
284            }
285
286            case 'abor':
287            {
288                for (size_t i = 0; i < mTracks.size(); ++i) {
289                    mTracks.editItemAt(i).mPacketSource->signalEOS(
290                            ERROR_END_OF_STREAM);
291                }
292
293                sp<AMessage> reply = new AMessage('tear', id());
294
295                AString request;
296                request = "TEARDOWN ";
297
298                // XXX should use aggregate url from SDP here...
299                request.append(mSessionURL);
300                request.append(" RTSP/1.0\r\n");
301
302                request.append("Session: ");
303                request.append(mSessionID);
304                request.append("\r\n");
305
306                request.append("\r\n");
307
308                mConn->sendRequest(request.c_str(), reply);
309                break;
310            }
311
312            case 'tear':
313            {
314                int32_t result;
315                CHECK(msg->findInt32("result", &result));
316
317                LOG(INFO) << "TEARDOWN completed with result "
318                     << result << " (" << strerror(-result) << ")";
319
320                sp<AMessage> reply = new AMessage('disc', id());
321                mConn->disconnect(reply);
322                break;
323            }
324
325            case 'quit':
326            {
327                if (mDoneMsg != NULL) {
328                    mDoneMsg->setInt32("result", UNKNOWN_ERROR);
329                    mDoneMsg->post();
330                    mDoneMsg = NULL;
331                }
332                break;
333            }
334
335            case 'accu':
336            {
337                size_t trackIndex;
338                CHECK(msg->findSize("track-index", &trackIndex));
339
340                int32_t eos;
341                if (msg->findInt32("eos", &eos)) {
342                    LOG(INFO) << "received BYE on track index " << trackIndex;
343#if 0
344                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
345                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
346#endif
347                    return;
348                }
349
350                sp<RefBase> obj;
351                CHECK(msg->findObject("access-unit", &obj));
352
353                sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get());
354
355                uint64_t ntpTime;
356                CHECK(accessUnit->meta()->findInt64(
357                            "ntp-time", (int64_t *)&ntpTime));
358
359                if (mFirstAccessUnit) {
360                    mFirstAccessUnit = false;
361                    mFirstAccessUnitNTP = ntpTime;
362                }
363
364                if (ntpTime >= mFirstAccessUnitNTP) {
365                    ntpTime -= mFirstAccessUnitNTP;
366                } else {
367                    ntpTime = 0;
368                }
369
370                int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
371
372                accessUnit->meta()->setInt64("timeUs", timeUs);
373
374#if 0
375                int32_t damaged;
376                if (accessUnit->meta()->findInt32("damaged", &damaged)
377                        && damaged != 0) {
378                    LOG(INFO) << "ignoring damaged AU";
379                } else
380#endif
381                {
382                    TrackInfo *track = &mTracks.editItemAt(trackIndex);
383                    track->mPacketSource->queueAccessUnit(accessUnit);
384                }
385                break;
386            }
387
388            case 'seek':
389            {
390                if (mSeekPending) {
391                    break;
392                }
393
394                int64_t timeUs;
395                CHECK(msg->findInt64("time", &timeUs));
396
397                mSeekPending = true;
398
399                AString request = "PAUSE ";
400                request.append(mSessionURL);
401                request.append(" RTSP/1.0\r\n");
402
403                request.append("Session: ");
404                request.append(mSessionID);
405                request.append("\r\n");
406
407                request.append("\r\n");
408
409                sp<AMessage> reply = new AMessage('see1', id());
410                reply->setInt64("time", timeUs);
411                mConn->sendRequest(request.c_str(), reply);
412                break;
413            }
414
415            case 'see1':
416            {
417                int64_t timeUs;
418                CHECK(msg->findInt64("time", &timeUs));
419
420                AString request = "PLAY ";
421                request.append(mSessionURL);
422                request.append(" RTSP/1.0\r\n");
423
424                request.append("Session: ");
425                request.append(mSessionID);
426                request.append("\r\n");
427
428                request.append(
429                        StringPrintf(
430                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
431
432                request.append("\r\n");
433
434                sp<AMessage> reply = new AMessage('see2', id());
435                mConn->sendRequest(request.c_str(), reply);
436                break;
437            }
438
439            case 'see2':
440            {
441                CHECK(mSeekPending);
442
443                LOG(INFO) << "seek completed.";
444                mSeekPending = false;
445
446                int32_t result;
447                CHECK(msg->findInt32("result", &result));
448                if (result != OK) {
449                    LOG(ERROR) << "seek FAILED";
450                    break;
451                }
452
453                sp<RefBase> obj;
454                CHECK(msg->findObject("response", &obj));
455                sp<ARTSPResponse> response =
456                    static_cast<ARTSPResponse *>(obj.get());
457
458                CHECK_EQ(response->mStatusCode, 200u);
459
460                for (size_t i = 0; i < mTracks.size(); ++i) {
461                    mTracks.editItemAt(i).mPacketSource->flushQueue();
462                }
463                break;
464            }
465
466            case 'biny':
467            {
468                sp<RefBase> obj;
469                CHECK(msg->findObject("buffer", &obj));
470                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
471
472                int32_t index;
473                CHECK(buffer->meta()->findInt32("index", &index));
474
475                mRTPConn->injectPacket(index, buffer);
476                break;
477            }
478
479            case 'tiou':
480            {
481                if (mFirstAccessUnit) {
482                    LOG(WARNING) << "Never received any data, disconnecting.";
483                    (new AMessage('abor', id()))->post();
484                }
485                break;
486            }
487
488            default:
489                TRESPASS();
490                break;
491        }
492    }
493
494    sp<APacketSource> getPacketSource(size_t index) {
495        CHECK_GE(index, 0u);
496        CHECK_LT(index, mTracks.size());
497
498        return mTracks.editItemAt(index).mPacketSource;
499    }
500
501    size_t countTracks() const {
502        return mTracks.size();
503    }
504
505private:
    // Looper this handler and the RTSP connection are registered on by
    // connect().
    sp<ALooper> mLooper;
    // Looper created and started by the constructor; connect() registers the
    // RTP connection on it.
    sp<ALooper> mNetLooper;
    // RTSP connection used for all requests sent by this handler.
    sp<ARTSPConnection> mConn;
    // RTP connection that streams are added to after a successful SETUP.
    sp<ARTPConnection> mRTPConn;
    // Session description built from the content of the DESCRIBE response.
    sp<ASessionDescription> mSessionDesc;
    // URL given to the constructor; replaced by the "location" header when
    // DESCRIBE is answered with a 302.
    AString mSessionURL;
    // Taken from the "content-base" or "content-location" header of the
    // DESCRIBE response, falling back to mSessionURL; track URLs are built
    // relative to it.
    AString mBaseURL;
    // Value of the "session" header of a SETUP response, with any ";..."
    // options removed.
    AString mSessionID;
    // Set once at least one SETUP completed successfully.
    bool mSetupTracksSuccessful;
    // True from the moment a 'seek' is accepted until 'see2' is handled.
    bool mSeekPending;
    // True until the first access unit arrives; also consulted by 'tiou'.
    bool mFirstAccessUnit;
    // "ntp-time" of the first access unit; subtracted from later ones.
    uint64_t mFirstAccessUnitNTP;

    // Per-track state created by setupTrack().
    struct TrackInfo {
        // Filled in by ARTPConnection::MakePortPair(); when
        // USE_TCP_INTERLEAVED is enabled these hold the interleave indices
        // instead.
        int mRTPSocket;
        int mRTCPSocket;
        // When false, the two values above are close()d if SETUP fails.
        bool mUsingInterleavedTCP;

        // Receives the access units delivered for this track.
        sp<APacketSource> mPacketSource;
    };
    // One entry per track for which a SETUP request was sent; the entry is
    // removed again if that SETUP fails.
    Vector<TrackInfo> mTracks;

    // Completion message supplied to connect()/disconnect(); posted with an
    // int32 "result" and then reset to NULL.
    sp<AMessage> mDoneMsg;
529
530    void setupTrack(size_t index) {
531        sp<APacketSource> source =
532            new APacketSource(mSessionDesc, index);
533        if (source->initCheck() != OK) {
534            LOG(WARNING) << "Unsupported format. Ignoring track #"
535                         << index << ".";
536
537            sp<AMessage> reply = new AMessage('setu', id());
538            reply->setSize("index", index);
539            reply->setInt32("result", ERROR_UNSUPPORTED);
540            reply->post();
541            return;
542        }
543
544        AString url;
545        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
546
547        AString trackURL;
548        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
549
550        mTracks.push(TrackInfo());
551        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
552        info->mPacketSource = source;
553        info->mUsingInterleavedTCP = false;
554
555        AString request = "SETUP ";
556        request.append(trackURL);
557        request.append(" RTSP/1.0\r\n");
558
559#if USE_TCP_INTERLEAVED
560        size_t interleaveIndex = 2 * (mTracks.size() - 1);
561        info->mUsingInterleavedTCP = true;
562        info->mRTPSocket = interleaveIndex;
563        info->mRTCPSocket = interleaveIndex + 1;
564
565        request.append("Transport: RTP/AVP/TCP;interleaved=");
566        request.append(interleaveIndex);
567        request.append("-");
568        request.append(interleaveIndex + 1);
569#else
570        unsigned rtpPort;
571        ARTPConnection::MakePortPair(
572                &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
573
574        request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
575        request.append(rtpPort);
576        request.append("-");
577        request.append(rtpPort + 1);
578#endif
579
580        request.append("\r\n");
581
582        if (index > 1) {
583            request.append("Session: ");
584            request.append(mSessionID);
585            request.append("\r\n");
586        }
587
588        request.append("\r\n");
589
590        sp<AMessage> reply = new AMessage('setu', id());
591        reply->setSize("index", index);
592        reply->setSize("track-index", mTracks.size() - 1);
593        mConn->sendRequest(request.c_str(), reply);
594    }
595
596    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
597        out->clear();
598
599        if (strncasecmp("rtsp://", baseURL, 7)) {
600            // Base URL must be absolute
601            return false;
602        }
603
604        if (!strncasecmp("rtsp://", url, 7)) {
605            // "url" is already an absolute URL, ignore base URL.
606            out->setTo(url);
607            return true;
608        }
609
610        size_t n = strlen(baseURL);
611        if (baseURL[n - 1] == '/') {
612            out->setTo(baseURL);
613            out->append(url);
614        } else {
615            const char *slashPos = strrchr(baseURL, '/');
616
617            if (slashPos > &baseURL[6]) {
618                out->setTo(baseURL, slashPos - baseURL);
619            } else {
620                out->setTo(baseURL);
621            }
622
623            out->append("/");
624            out->append(url);
625        }
626
627        return true;
628    }
629
630    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
631};
632
633}  // namespace android
634
635#endif  // MY_HANDLER_H_
636