1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
45// If no access units are received within 5 secs, assume that the rtp
46// stream has ended and signal end of stream.
47static int64_t kAccessUnitTimeoutUs = 10000000ll;
48
49// If no access units arrive for the first 10 secs after starting the
50// stream, assume none ever will and signal EOS or switch transports.
51static int64_t kStartupTimeoutUs = 10000000ll;
52
53static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54
55namespace android {
56
57static void MakeUserAgentString(AString *s) {
58    s->setTo("stagefright/1.1 (Linux;Android ");
59
60#if (PROPERTY_VALUE_MAX < 8)
61#error "PROPERTY_VALUE_MAX must be at least 8"
62#endif
63
64    char value[PROPERTY_VALUE_MAX];
65    property_get("ro.build.version.release", value, "Unknown");
66    s->append(value);
67    s->append(")");
68}
69
70static bool GetAttribute(const char *s, const char *key, AString *value) {
71    value->clear();
72
73    size_t keyLen = strlen(key);
74
75    for (;;) {
76        while (isspace(*s)) {
77            ++s;
78        }
79
80        const char *colonPos = strchr(s, ';');
81
82        size_t len =
83            (colonPos == NULL) ? strlen(s) : colonPos - s;
84
85        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
86            value->setTo(&s[keyLen + 1], len - keyLen - 1);
87            return true;
88        }
89
90        if (colonPos == NULL) {
91            return false;
92        }
93
94        s = colonPos + 1;
95    }
96}
97
98struct MyHandler : public AHandler {
99    enum {
100        kWhatConnected                  = 'conn',
101        kWhatDisconnected               = 'disc',
102        kWhatSeekDone                   = 'sdon',
103
104        kWhatAccessUnit                 = 'accU',
105        kWhatEOS                        = 'eos!',
106        kWhatSeekDiscontinuity          = 'seeD',
107        kWhatNormalPlayTimeMapping      = 'nptM',
108    };
109
110    MyHandler(
111            const char *url,
112            const sp<AMessage> &notify,
113            bool uidValid = false, uid_t uid = 0)
114        : mNotify(notify),
115          mUIDValid(uidValid),
116          mUID(uid),
117          mNetLooper(new ALooper),
118          mConn(new ARTSPConnection(mUIDValid, mUID)),
119          mRTPConn(new ARTPConnection),
120          mOriginalSessionURL(url),
121          mSessionURL(url),
122          mSetupTracksSuccessful(false),
123          mSeekPending(false),
124          mFirstAccessUnit(true),
125          mAllTracksHaveTime(false),
126          mNTPAnchorUs(-1),
127          mMediaAnchorUs(-1),
128          mLastMediaTimeUs(0),
129          mNumAccessUnitsReceived(0),
130          mCheckPending(false),
131          mCheckGeneration(0),
132          mTryTCPInterleaving(false),
133          mTryFakeRTCP(false),
134          mReceivedFirstRTCPPacket(false),
135          mReceivedFirstRTPPacket(false),
136          mSeekable(false),
137          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
138          mKeepAliveGeneration(0) {
139        mNetLooper->setName("rtsp net");
140        mNetLooper->start(false /* runOnCallingThread */,
141                          false /* canCallJava */,
142                          PRIORITY_HIGHEST);
143
144        // Strip any authentication info from the session url, we don't
145        // want to transmit user/pass in cleartext.
146        AString host, path, user, pass;
147        unsigned port;
148        CHECK(ARTSPConnection::ParseURL(
149                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
150
151        if (user.size() > 0) {
152            mSessionURL.clear();
153            mSessionURL.append("rtsp://");
154            mSessionURL.append(host);
155            mSessionURL.append(":");
156            mSessionURL.append(StringPrintf("%u", port));
157            mSessionURL.append(path);
158
159            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
160        }
161
162        mSessionHost = host;
163    }
164
165    void connect() {
166        looper()->registerHandler(mConn);
167        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
168
169        sp<AMessage> notify = new AMessage('biny', id());
170        mConn->observeBinaryData(notify);
171
172        sp<AMessage> reply = new AMessage('conn', id());
173        mConn->connect(mOriginalSessionURL.c_str(), reply);
174    }
175
176    void disconnect() {
177        (new AMessage('abor', id()))->post();
178    }
179
180    void seek(int64_t timeUs) {
181        sp<AMessage> msg = new AMessage('seek', id());
182        msg->setInt64("time", timeUs);
183        msg->post();
184    }
185
186    static void addRR(const sp<ABuffer> &buf) {
187        uint8_t *ptr = buf->data() + buf->size();
188        ptr[0] = 0x80 | 0;
189        ptr[1] = 201;  // RR
190        ptr[2] = 0;
191        ptr[3] = 1;
192        ptr[4] = 0xde;  // SSRC
193        ptr[5] = 0xad;
194        ptr[6] = 0xbe;
195        ptr[7] = 0xef;
196
197        buf->setRange(0, buf->size() + 8);
198    }
199
200    static void addSDES(int s, const sp<ABuffer> &buffer) {
201        struct sockaddr_in addr;
202        socklen_t addrSize = sizeof(addr);
203        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
204
205        uint8_t *data = buffer->data() + buffer->size();
206        data[0] = 0x80 | 1;
207        data[1] = 202;  // SDES
208        data[4] = 0xde;  // SSRC
209        data[5] = 0xad;
210        data[6] = 0xbe;
211        data[7] = 0xef;
212
213        size_t offset = 8;
214
215        data[offset++] = 1;  // CNAME
216
217        AString cname = "stagefright@";
218        cname.append(inet_ntoa(addr.sin_addr));
219        data[offset++] = cname.size();
220
221        memcpy(&data[offset], cname.c_str(), cname.size());
222        offset += cname.size();
223
224        data[offset++] = 6;  // TOOL
225
226        AString tool;
227        MakeUserAgentString(&tool);
228
229        data[offset++] = tool.size();
230
231        memcpy(&data[offset], tool.c_str(), tool.size());
232        offset += tool.size();
233
234        data[offset++] = 0;
235
236        if ((offset % 4) > 0) {
237            size_t count = 4 - (offset % 4);
238            switch (count) {
239                case 3:
240                    data[offset++] = 0;
241                case 2:
242                    data[offset++] = 0;
243                case 1:
244                    data[offset++] = 0;
245            }
246        }
247
248        size_t numWords = (offset / 4) - 1;
249        data[2] = numWords >> 8;
250        data[3] = numWords & 0xff;
251
252        buffer->setRange(buffer->offset(), buffer->size() + offset);
253    }
254
255    // In case we're behind NAT, fire off two UDP packets to the remote
256    // rtp/rtcp ports to poke a hole into the firewall for future incoming
257    // packets. We're going to send an RR/SDES RTCP packet to both of them.
258    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
259        struct sockaddr_in addr;
260        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
261        addr.sin_family = AF_INET;
262
263        AString source;
264        AString server_port;
265        if (!GetAttribute(transport.c_str(),
266                          "source",
267                          &source)) {
268            ALOGW("Missing 'source' field in Transport response. Using "
269                 "RTSP endpoint address.");
270
271            struct hostent *ent = gethostbyname(mSessionHost.c_str());
272            if (ent == NULL) {
273                ALOGE("Failed to look up address of session host '%s'",
274                     mSessionHost.c_str());
275
276                return false;
277            }
278
279            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
280        } else {
281            addr.sin_addr.s_addr = inet_addr(source.c_str());
282        }
283
284        if (!GetAttribute(transport.c_str(),
285                                 "server_port",
286                                 &server_port)) {
287            ALOGI("Missing 'server_port' field in Transport response.");
288            return false;
289        }
290
291        int rtpPort, rtcpPort;
292        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
293                || rtpPort <= 0 || rtpPort > 65535
294                || rtcpPort <=0 || rtcpPort > 65535
295                || rtcpPort != rtpPort + 1) {
296            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
297                 " RTP port must be even, RTCP port must be one higher.",
298                 server_port.c_str());
299
300            return false;
301        }
302
303        if (rtpPort & 1) {
304            ALOGW("Server picked an odd RTP port, it should've picked an "
305                 "even one, we'll let it pass for now, but this may break "
306                 "in the future.");
307        }
308
309        if (addr.sin_addr.s_addr == INADDR_NONE) {
310            return true;
311        }
312
313        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
314            // No firewalls to traverse on the loopback interface.
315            return true;
316        }
317
318        // Make up an RR/SDES RTCP packet.
319        sp<ABuffer> buf = new ABuffer(65536);
320        buf->setRange(0, 0);
321        addRR(buf);
322        addSDES(rtpSocket, buf);
323
324        addr.sin_port = htons(rtpPort);
325
326        ssize_t n = sendto(
327                rtpSocket, buf->data(), buf->size(), 0,
328                (const sockaddr *)&addr, sizeof(addr));
329
330        if (n < (ssize_t)buf->size()) {
331            ALOGE("failed to poke a hole for RTP packets");
332            return false;
333        }
334
335        addr.sin_port = htons(rtcpPort);
336
337        n = sendto(
338                rtcpSocket, buf->data(), buf->size(), 0,
339                (const sockaddr *)&addr, sizeof(addr));
340
341        if (n < (ssize_t)buf->size()) {
342            ALOGE("failed to poke a hole for RTCP packets");
343            return false;
344        }
345
346        ALOGV("successfully poked holes.");
347
348        return true;
349    }
350
351    virtual void onMessageReceived(const sp<AMessage> &msg) {
352        switch (msg->what()) {
353            case 'conn':
354            {
355                int32_t result;
356                CHECK(msg->findInt32("result", &result));
357
358                ALOGI("connection request completed with result %d (%s)",
359                     result, strerror(-result));
360
361                if (result == OK) {
362                    AString request;
363                    request = "DESCRIBE ";
364                    request.append(mSessionURL);
365                    request.append(" RTSP/1.0\r\n");
366                    request.append("Accept: application/sdp\r\n");
367                    request.append("\r\n");
368
369                    sp<AMessage> reply = new AMessage('desc', id());
370                    mConn->sendRequest(request.c_str(), reply);
371                } else {
372                    (new AMessage('disc', id()))->post();
373                }
374                break;
375            }
376
377            case 'disc':
378            {
379                ++mKeepAliveGeneration;
380
381                int32_t reconnect;
382                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
383                    sp<AMessage> reply = new AMessage('conn', id());
384                    mConn->connect(mOriginalSessionURL.c_str(), reply);
385                } else {
386                    (new AMessage('quit', id()))->post();
387                }
388                break;
389            }
390
391            case 'desc':
392            {
393                int32_t result;
394                CHECK(msg->findInt32("result", &result));
395
396                ALOGI("DESCRIBE completed with result %d (%s)",
397                     result, strerror(-result));
398
399                if (result == OK) {
400                    sp<RefBase> obj;
401                    CHECK(msg->findObject("response", &obj));
402                    sp<ARTSPResponse> response =
403                        static_cast<ARTSPResponse *>(obj.get());
404
405                    if (response->mStatusCode == 302) {
406                        ssize_t i = response->mHeaders.indexOfKey("location");
407                        CHECK_GE(i, 0);
408
409                        mSessionURL = response->mHeaders.valueAt(i);
410
411                        AString request;
412                        request = "DESCRIBE ";
413                        request.append(mSessionURL);
414                        request.append(" RTSP/1.0\r\n");
415                        request.append("Accept: application/sdp\r\n");
416                        request.append("\r\n");
417
418                        sp<AMessage> reply = new AMessage('desc', id());
419                        mConn->sendRequest(request.c_str(), reply);
420                        break;
421                    }
422
423                    if (response->mStatusCode != 200) {
424                        result = UNKNOWN_ERROR;
425                    } else {
426                        mSessionDesc = new ASessionDescription;
427
428                        mSessionDesc->setTo(
429                                response->mContent->data(),
430                                response->mContent->size());
431
432                        if (!mSessionDesc->isValid()) {
433                            ALOGE("Failed to parse session description.");
434                            result = ERROR_MALFORMED;
435                        } else {
436                            ssize_t i = response->mHeaders.indexOfKey("content-base");
437                            if (i >= 0) {
438                                mBaseURL = response->mHeaders.valueAt(i);
439                            } else {
440                                i = response->mHeaders.indexOfKey("content-location");
441                                if (i >= 0) {
442                                    mBaseURL = response->mHeaders.valueAt(i);
443                                } else {
444                                    mBaseURL = mSessionURL;
445                                }
446                            }
447
448                            if (!mBaseURL.startsWith("rtsp://")) {
449                                // Some misbehaving servers specify a relative
450                                // URL in one of the locations above, combine
451                                // it with the absolute session URL to get
452                                // something usable...
453
454                                ALOGW("Server specified a non-absolute base URL"
455                                     ", combining it with the session URL to "
456                                     "get something usable...");
457
458                                AString tmp;
459                                CHECK(MakeURL(
460                                            mSessionURL.c_str(),
461                                            mBaseURL.c_str(),
462                                            &tmp));
463
464                                mBaseURL = tmp;
465                            }
466
467                            if (mSessionDesc->countTracks() < 2) {
468                                // There's no actual tracks in this session.
469                                // The first "track" is merely session meta
470                                // data.
471
472                                ALOGW("Session doesn't contain any playable "
473                                     "tracks. Aborting.");
474                                result = ERROR_UNSUPPORTED;
475                            } else {
476                                setupTrack(1);
477                            }
478                        }
479                    }
480                }
481
482                if (result != OK) {
483                    sp<AMessage> reply = new AMessage('disc', id());
484                    mConn->disconnect(reply);
485                }
486                break;
487            }
488
489            case 'setu':
490            {
491                size_t index;
492                CHECK(msg->findSize("index", &index));
493
494                TrackInfo *track = NULL;
495                size_t trackIndex;
496                if (msg->findSize("track-index", &trackIndex)) {
497                    track = &mTracks.editItemAt(trackIndex);
498                }
499
500                int32_t result;
501                CHECK(msg->findInt32("result", &result));
502
503                ALOGI("SETUP(%d) completed with result %d (%s)",
504                     index, result, strerror(-result));
505
506                if (result == OK) {
507                    CHECK(track != NULL);
508
509                    sp<RefBase> obj;
510                    CHECK(msg->findObject("response", &obj));
511                    sp<ARTSPResponse> response =
512                        static_cast<ARTSPResponse *>(obj.get());
513
514                    if (response->mStatusCode != 200) {
515                        result = UNKNOWN_ERROR;
516                    } else {
517                        ssize_t i = response->mHeaders.indexOfKey("session");
518                        CHECK_GE(i, 0);
519
520                        mSessionID = response->mHeaders.valueAt(i);
521
522                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
523                        AString timeoutStr;
524                        if (GetAttribute(
525                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
526                            char *end;
527                            unsigned long timeoutSecs =
528                                strtoul(timeoutStr.c_str(), &end, 10);
529
530                            if (end == timeoutStr.c_str() || *end != '\0') {
531                                ALOGW("server specified malformed timeout '%s'",
532                                     timeoutStr.c_str());
533
534                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
535                            } else if (timeoutSecs < 15) {
536                                ALOGW("server specified too short a timeout "
537                                     "(%lu secs), using default.",
538                                     timeoutSecs);
539
540                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
541                            } else {
542                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
543
544                                ALOGI("server specified timeout of %lu secs.",
545                                     timeoutSecs);
546                            }
547                        }
548
549                        i = mSessionID.find(";");
550                        if (i >= 0) {
551                            // Remove options, i.e. ";timeout=90"
552                            mSessionID.erase(i, mSessionID.size() - i);
553                        }
554
555                        sp<AMessage> notify = new AMessage('accu', id());
556                        notify->setSize("track-index", trackIndex);
557
558                        i = response->mHeaders.indexOfKey("transport");
559                        CHECK_GE(i, 0);
560
561                        if (!track->mUsingInterleavedTCP) {
562                            AString transport = response->mHeaders.valueAt(i);
563
564                            // We are going to continue even if we were
565                            // unable to poke a hole into the firewall...
566                            pokeAHole(
567                                    track->mRTPSocket,
568                                    track->mRTCPSocket,
569                                    transport);
570                        }
571
572                        mRTPConn->addStream(
573                                track->mRTPSocket, track->mRTCPSocket,
574                                mSessionDesc, index,
575                                notify, track->mUsingInterleavedTCP);
576
577                        mSetupTracksSuccessful = true;
578                    }
579                }
580
581                if (result != OK) {
582                    if (track) {
583                        if (!track->mUsingInterleavedTCP) {
584                            // Clear the tag
585                            if (mUIDValid) {
586                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
587                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
588                            }
589
590                            close(track->mRTPSocket);
591                            close(track->mRTCPSocket);
592                        }
593
594                        mTracks.removeItemsAt(trackIndex);
595                    }
596                }
597
598                ++index;
599                if (index < mSessionDesc->countTracks()) {
600                    setupTrack(index);
601                } else if (mSetupTracksSuccessful) {
602                    ++mKeepAliveGeneration;
603                    postKeepAlive();
604
605                    AString request = "PLAY ";
606                    request.append(mSessionURL);
607                    request.append(" RTSP/1.0\r\n");
608
609                    request.append("Session: ");
610                    request.append(mSessionID);
611                    request.append("\r\n");
612
613                    request.append("\r\n");
614
615                    sp<AMessage> reply = new AMessage('play', id());
616                    mConn->sendRequest(request.c_str(), reply);
617                } else {
618                    sp<AMessage> reply = new AMessage('disc', id());
619                    mConn->disconnect(reply);
620                }
621                break;
622            }
623
624            case 'play':
625            {
626                int32_t result;
627                CHECK(msg->findInt32("result", &result));
628
629                ALOGI("PLAY completed with result %d (%s)",
630                     result, strerror(-result));
631
632                if (result == OK) {
633                    sp<RefBase> obj;
634                    CHECK(msg->findObject("response", &obj));
635                    sp<ARTSPResponse> response =
636                        static_cast<ARTSPResponse *>(obj.get());
637
638                    if (response->mStatusCode != 200) {
639                        result = UNKNOWN_ERROR;
640                    } else {
641                        parsePlayResponse(response);
642
643                        sp<AMessage> timeout = new AMessage('tiou', id());
644                        timeout->post(kStartupTimeoutUs);
645                    }
646                }
647
648                if (result != OK) {
649                    sp<AMessage> reply = new AMessage('disc', id());
650                    mConn->disconnect(reply);
651                }
652
653                break;
654            }
655
656            case 'aliv':
657            {
658                int32_t generation;
659                CHECK(msg->findInt32("generation", &generation));
660
661                if (generation != mKeepAliveGeneration) {
662                    // obsolete event.
663                    break;
664                }
665
666                AString request;
667                request.append("OPTIONS ");
668                request.append(mSessionURL);
669                request.append(" RTSP/1.0\r\n");
670                request.append("Session: ");
671                request.append(mSessionID);
672                request.append("\r\n");
673                request.append("\r\n");
674
675                sp<AMessage> reply = new AMessage('opts', id());
676                reply->setInt32("generation", mKeepAliveGeneration);
677                mConn->sendRequest(request.c_str(), reply);
678                break;
679            }
680
681            case 'opts':
682            {
683                int32_t result;
684                CHECK(msg->findInt32("result", &result));
685
686                ALOGI("OPTIONS completed with result %d (%s)",
687                     result, strerror(-result));
688
689                int32_t generation;
690                CHECK(msg->findInt32("generation", &generation));
691
692                if (generation != mKeepAliveGeneration) {
693                    // obsolete event.
694                    break;
695                }
696
697                postKeepAlive();
698                break;
699            }
700
701            case 'abor':
702            {
703                for (size_t i = 0; i < mTracks.size(); ++i) {
704                    TrackInfo *info = &mTracks.editItemAt(i);
705
706                    if (!mFirstAccessUnit) {
707                        postQueueEOS(i, ERROR_END_OF_STREAM);
708                    }
709
710                    if (!info->mUsingInterleavedTCP) {
711                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
712
713                        // Clear the tag
714                        if (mUIDValid) {
715                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
716                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
717                        }
718
719                        close(info->mRTPSocket);
720                        close(info->mRTCPSocket);
721                    }
722                }
723                mTracks.clear();
724                mSetupTracksSuccessful = false;
725                mSeekPending = false;
726                mFirstAccessUnit = true;
727                mAllTracksHaveTime = false;
728                mNTPAnchorUs = -1;
729                mMediaAnchorUs = -1;
730                mNumAccessUnitsReceived = 0;
731                mReceivedFirstRTCPPacket = false;
732                mReceivedFirstRTPPacket = false;
733                mSeekable = false;
734
735                sp<AMessage> reply = new AMessage('tear', id());
736
737                int32_t reconnect;
738                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
739                    reply->setInt32("reconnect", true);
740                }
741
742                AString request;
743                request = "TEARDOWN ";
744
745                // XXX should use aggregate url from SDP here...
746                request.append(mSessionURL);
747                request.append(" RTSP/1.0\r\n");
748
749                request.append("Session: ");
750                request.append(mSessionID);
751                request.append("\r\n");
752
753                request.append("\r\n");
754
755                mConn->sendRequest(request.c_str(), reply);
756                break;
757            }
758
759            case 'tear':
760            {
761                int32_t result;
762                CHECK(msg->findInt32("result", &result));
763
764                ALOGI("TEARDOWN completed with result %d (%s)",
765                     result, strerror(-result));
766
767                sp<AMessage> reply = new AMessage('disc', id());
768
769                int32_t reconnect;
770                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
771                    reply->setInt32("reconnect", true);
772                }
773
774                mConn->disconnect(reply);
775                break;
776            }
777
778            case 'quit':
779            {
780                sp<AMessage> msg = mNotify->dup();
781                msg->setInt32("what", kWhatDisconnected);
782                msg->setInt32("result", UNKNOWN_ERROR);
783                msg->post();
784                break;
785            }
786
787            case 'chek':
788            {
789                int32_t generation;
790                CHECK(msg->findInt32("generation", &generation));
791                if (generation != mCheckGeneration) {
792                    // This is an outdated message. Ignore.
793                    break;
794                }
795
796                if (mNumAccessUnitsReceived == 0) {
797#if 1
798                    ALOGI("stream ended? aborting.");
799                    (new AMessage('abor', id()))->post();
800                    break;
801#else
802                    ALOGI("haven't seen an AU in a looong time.");
803#endif
804                }
805
806                mNumAccessUnitsReceived = 0;
807                msg->post(kAccessUnitTimeoutUs);
808                break;
809            }
810
811            case 'accu':
812            {
813                int32_t timeUpdate;
814                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
815                    size_t trackIndex;
816                    CHECK(msg->findSize("track-index", &trackIndex));
817
818                    uint32_t rtpTime;
819                    uint64_t ntpTime;
820                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
821                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
822
823                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
824                    break;
825                }
826
827                int32_t first;
828                if (msg->findInt32("first-rtcp", &first)) {
829                    mReceivedFirstRTCPPacket = true;
830                    break;
831                }
832
833                if (msg->findInt32("first-rtp", &first)) {
834                    mReceivedFirstRTPPacket = true;
835                    break;
836                }
837
838                ++mNumAccessUnitsReceived;
839                postAccessUnitTimeoutCheck();
840
841                size_t trackIndex;
842                CHECK(msg->findSize("track-index", &trackIndex));
843
844                if (trackIndex >= mTracks.size()) {
845                    ALOGV("late packets ignored.");
846                    break;
847                }
848
849                TrackInfo *track = &mTracks.editItemAt(trackIndex);
850
851                int32_t eos;
852                if (msg->findInt32("eos", &eos)) {
853                    ALOGI("received BYE on track index %d", trackIndex);
854#if 0
855                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
856#endif
857                    return;
858                }
859
860                sp<ABuffer> accessUnit;
861                CHECK(msg->findBuffer("access-unit", &accessUnit));
862
863                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
864
865                if (mSeekPending) {
866                    ALOGV("we're seeking, dropping stale packet.");
867                    break;
868                }
869
870                if (seqNum < track->mFirstSeqNumInSegment) {
871                    ALOGV("dropping stale access-unit (%d < %d)",
872                         seqNum, track->mFirstSeqNumInSegment);
873                    break;
874                }
875
876                if (track->mNewSegment) {
877                    track->mNewSegment = false;
878                }
879
880                onAccessUnitComplete(trackIndex, accessUnit);
881                break;
882            }
883
884            case 'seek':
885            {
886                if (!mSeekable) {
887                    ALOGW("This is a live stream, ignoring seek request.");
888
889                    sp<AMessage> msg = mNotify->dup();
890                    msg->setInt32("what", kWhatSeekDone);
891                    msg->post();
892                    break;
893                }
894
895                int64_t timeUs;
896                CHECK(msg->findInt64("time", &timeUs));
897
898                mSeekPending = true;
899
900                // Disable the access unit timeout until we resumed
901                // playback again.
902                mCheckPending = true;
903                ++mCheckGeneration;
904
905                AString request = "PAUSE ";
906                request.append(mSessionURL);
907                request.append(" RTSP/1.0\r\n");
908
909                request.append("Session: ");
910                request.append(mSessionID);
911                request.append("\r\n");
912
913                request.append("\r\n");
914
915                sp<AMessage> reply = new AMessage('see1', id());
916                reply->setInt64("time", timeUs);
917                mConn->sendRequest(request.c_str(), reply);
918                break;
919            }
920
921            case 'see1':
922            {
923                // Session is paused now.
924                for (size_t i = 0; i < mTracks.size(); ++i) {
925                    TrackInfo *info = &mTracks.editItemAt(i);
926
927                    postQueueSeekDiscontinuity(i);
928
929                    info->mRTPAnchor = 0;
930                    info->mNTPAnchorUs = -1;
931                }
932
933                mAllTracksHaveTime = false;
934                mNTPAnchorUs = -1;
935
936                int64_t timeUs;
937                CHECK(msg->findInt64("time", &timeUs));
938
939                AString request = "PLAY ";
940                request.append(mSessionURL);
941                request.append(" RTSP/1.0\r\n");
942
943                request.append("Session: ");
944                request.append(mSessionID);
945                request.append("\r\n");
946
947                request.append(
948                        StringPrintf(
949                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
950
951                request.append("\r\n");
952
953                sp<AMessage> reply = new AMessage('see2', id());
954                mConn->sendRequest(request.c_str(), reply);
955                break;
956            }
957
958            case 'see2':
959            {
960                CHECK(mSeekPending);
961
962                int32_t result;
963                CHECK(msg->findInt32("result", &result));
964
965                ALOGI("PLAY completed with result %d (%s)",
966                     result, strerror(-result));
967
968                mCheckPending = false;
969                postAccessUnitTimeoutCheck();
970
971                if (result == OK) {
972                    sp<RefBase> obj;
973                    CHECK(msg->findObject("response", &obj));
974                    sp<ARTSPResponse> response =
975                        static_cast<ARTSPResponse *>(obj.get());
976
977                    if (response->mStatusCode != 200) {
978                        result = UNKNOWN_ERROR;
979                    } else {
980                        parsePlayResponse(response);
981
982                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
983                        CHECK_GE(i, 0);
984
985                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
986
987                        ALOGI("seek completed.");
988                    }
989                }
990
991                if (result != OK) {
992                    ALOGE("seek failed, aborting.");
993                    (new AMessage('abor', id()))->post();
994                }
995
996                mSeekPending = false;
997
998                sp<AMessage> msg = mNotify->dup();
999                msg->setInt32("what", kWhatSeekDone);
1000                msg->post();
1001                break;
1002            }
1003
1004            case 'biny':
1005            {
1006                sp<ABuffer> buffer;
1007                CHECK(msg->findBuffer("buffer", &buffer));
1008
1009                int32_t index;
1010                CHECK(buffer->meta()->findInt32("index", &index));
1011
1012                mRTPConn->injectPacket(index, buffer);
1013                break;
1014            }
1015
1016            case 'tiou':
1017            {
1018                if (!mReceivedFirstRTCPPacket) {
1019                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
1020                        ALOGW("We received RTP packets but no RTCP packets, "
1021                             "using fake timestamps.");
1022
1023                        mTryFakeRTCP = true;
1024
1025                        mReceivedFirstRTCPPacket = true;
1026
1027                        fakeTimestamps();
1028                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1029                        ALOGW("Never received any data, switching transports.");
1030
1031                        mTryTCPInterleaving = true;
1032
1033                        sp<AMessage> msg = new AMessage('abor', id());
1034                        msg->setInt32("reconnect", true);
1035                        msg->post();
1036                    } else {
1037                        ALOGW("Never received any data, disconnecting.");
1038                        (new AMessage('abor', id()))->post();
1039                    }
1040                } else {
1041                    if (!mAllTracksHaveTime) {
1042                        ALOGW("We received some RTCP packets, but time "
1043                              "could not be established on all tracks, now "
1044                              "using fake timestamps");
1045
1046                        fakeTimestamps();
1047                    }
1048                }
1049                break;
1050            }
1051
1052            default:
1053                TRESPASS();
1054                break;
1055        }
1056    }
1057
1058    void postKeepAlive() {
1059        sp<AMessage> msg = new AMessage('aliv', id());
1060        msg->setInt32("generation", mKeepAliveGeneration);
1061        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1062    }
1063
1064    void postAccessUnitTimeoutCheck() {
1065        if (mCheckPending) {
1066            return;
1067        }
1068
1069        mCheckPending = true;
1070        sp<AMessage> check = new AMessage('chek', id());
1071        check->setInt32("generation", mCheckGeneration);
1072        check->post(kAccessUnitTimeoutUs);
1073    }
1074
1075    static void SplitString(
1076            const AString &s, const char *separator, List<AString> *items) {
1077        items->clear();
1078        size_t start = 0;
1079        while (start < s.size()) {
1080            ssize_t offset = s.find(separator, start);
1081
1082            if (offset < 0) {
1083                items->push_back(AString(s, start, s.size() - start));
1084                break;
1085            }
1086
1087            items->push_back(AString(s, start, offset - start));
1088            start = offset + strlen(separator);
1089        }
1090    }
1091
1092    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1093        mSeekable = false;
1094
1095        ssize_t i = response->mHeaders.indexOfKey("range");
1096        if (i < 0) {
1097            // Server doesn't even tell use what range it is going to
1098            // play, therefore we won't support seeking.
1099            return;
1100        }
1101
1102        AString range = response->mHeaders.valueAt(i);
1103        ALOGV("Range: %s", range.c_str());
1104
1105        AString val;
1106        CHECK(GetAttribute(range.c_str(), "npt", &val));
1107
1108        float npt1, npt2;
1109        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1110            // This is a live stream and therefore not seekable.
1111
1112            ALOGI("This is a live stream");
1113            return;
1114        }
1115
1116        i = response->mHeaders.indexOfKey("rtp-info");
1117        CHECK_GE(i, 0);
1118
1119        AString rtpInfo = response->mHeaders.valueAt(i);
1120        List<AString> streamInfos;
1121        SplitString(rtpInfo, ",", &streamInfos);
1122
1123        int n = 1;
1124        for (List<AString>::iterator it = streamInfos.begin();
1125             it != streamInfos.end(); ++it) {
1126            (*it).trim();
1127            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1128
1129            CHECK(GetAttribute((*it).c_str(), "url", &val));
1130
1131            size_t trackIndex = 0;
1132            while (trackIndex < mTracks.size()
1133                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1134                ++trackIndex;
1135            }
1136            CHECK_LT(trackIndex, mTracks.size());
1137
1138            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1139
1140            char *end;
1141            unsigned long seq = strtoul(val.c_str(), &end, 10);
1142
1143            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1144            info->mFirstSeqNumInSegment = seq;
1145            info->mNewSegment = true;
1146
1147            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1148
1149            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1150
1151            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1152
1153            info->mNormalPlayTimeRTP = rtpTime;
1154            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1155
1156            if (!mFirstAccessUnit) {
1157                postNormalPlayTimeMapping(
1158                        trackIndex,
1159                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1160            }
1161
1162            ++n;
1163        }
1164
1165        mSeekable = true;
1166    }
1167
1168    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1169        CHECK_GE(index, 0u);
1170        CHECK_LT(index, mTracks.size());
1171
1172        const TrackInfo &info = mTracks.itemAt(index);
1173
1174        *timeScale = info.mTimeScale;
1175
1176        return info.mPacketSource->getFormat();
1177    }
1178
1179    size_t countTracks() const {
1180        return mTracks.size();
1181    }
1182
1183private:
1184    struct TrackInfo {
1185        AString mURL;
1186        int mRTPSocket;
1187        int mRTCPSocket;
1188        bool mUsingInterleavedTCP;
1189        uint32_t mFirstSeqNumInSegment;
1190        bool mNewSegment;
1191
1192        uint32_t mRTPAnchor;
1193        int64_t mNTPAnchorUs;
1194        int32_t mTimeScale;
1195
1196        uint32_t mNormalPlayTimeRTP;
1197        int64_t mNormalPlayTimeUs;
1198
1199        sp<APacketSource> mPacketSource;
1200
1201        // Stores packets temporarily while no notion of time
1202        // has been established yet.
1203        List<sp<ABuffer> > mPackets;
1204    };
1205
1206    sp<AMessage> mNotify;
1207    bool mUIDValid;
1208    uid_t mUID;
1209    sp<ALooper> mNetLooper;
1210    sp<ARTSPConnection> mConn;
1211    sp<ARTPConnection> mRTPConn;
1212    sp<ASessionDescription> mSessionDesc;
1213    AString mOriginalSessionURL;  // This one still has user:pass@
1214    AString mSessionURL;
1215    AString mSessionHost;
1216    AString mBaseURL;
1217    AString mSessionID;
1218    bool mSetupTracksSuccessful;
1219    bool mSeekPending;
1220    bool mFirstAccessUnit;
1221
1222    bool mAllTracksHaveTime;
1223    int64_t mNTPAnchorUs;
1224    int64_t mMediaAnchorUs;
1225    int64_t mLastMediaTimeUs;
1226
1227    int64_t mNumAccessUnitsReceived;
1228    bool mCheckPending;
1229    int32_t mCheckGeneration;
1230    bool mTryTCPInterleaving;
1231    bool mTryFakeRTCP;
1232    bool mReceivedFirstRTCPPacket;
1233    bool mReceivedFirstRTPPacket;
1234    bool mSeekable;
1235    int64_t mKeepAliveTimeoutUs;
1236    int32_t mKeepAliveGeneration;
1237
1238    Vector<TrackInfo> mTracks;
1239
1240    void setupTrack(size_t index) {
1241        sp<APacketSource> source =
1242            new APacketSource(mSessionDesc, index);
1243
1244        if (source->initCheck() != OK) {
1245            ALOGW("Unsupported format. Ignoring track #%d.", index);
1246
1247            sp<AMessage> reply = new AMessage('setu', id());
1248            reply->setSize("index", index);
1249            reply->setInt32("result", ERROR_UNSUPPORTED);
1250            reply->post();
1251            return;
1252        }
1253
1254        AString url;
1255        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1256
1257        AString trackURL;
1258        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1259
1260        mTracks.push(TrackInfo());
1261        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1262        info->mURL = trackURL;
1263        info->mPacketSource = source;
1264        info->mUsingInterleavedTCP = false;
1265        info->mFirstSeqNumInSegment = 0;
1266        info->mNewSegment = true;
1267        info->mRTPAnchor = 0;
1268        info->mNTPAnchorUs = -1;
1269        info->mNormalPlayTimeRTP = 0;
1270        info->mNormalPlayTimeUs = 0ll;
1271
1272        unsigned long PT;
1273        AString formatDesc;
1274        AString formatParams;
1275        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1276
1277        int32_t timescale;
1278        int32_t numChannels;
1279        ASessionDescription::ParseFormatDesc(
1280                formatDesc.c_str(), &timescale, &numChannels);
1281
1282        info->mTimeScale = timescale;
1283
1284        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1285
1286        AString request = "SETUP ";
1287        request.append(trackURL);
1288        request.append(" RTSP/1.0\r\n");
1289
1290        if (mTryTCPInterleaving) {
1291            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1292            info->mUsingInterleavedTCP = true;
1293            info->mRTPSocket = interleaveIndex;
1294            info->mRTCPSocket = interleaveIndex + 1;
1295
1296            request.append("Transport: RTP/AVP/TCP;interleaved=");
1297            request.append(interleaveIndex);
1298            request.append("-");
1299            request.append(interleaveIndex + 1);
1300        } else {
1301            unsigned rtpPort;
1302            ARTPConnection::MakePortPair(
1303                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1304
1305            if (mUIDValid) {
1306                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1307                                                (uint32_t)*(uint32_t*) "RTP_");
1308                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1309                                                (uint32_t)*(uint32_t*) "RTP_");
1310            }
1311
1312            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1313            request.append(rtpPort);
1314            request.append("-");
1315            request.append(rtpPort + 1);
1316        }
1317
1318        request.append("\r\n");
1319
1320        if (index > 1) {
1321            request.append("Session: ");
1322            request.append(mSessionID);
1323            request.append("\r\n");
1324        }
1325
1326        request.append("\r\n");
1327
1328        sp<AMessage> reply = new AMessage('setu', id());
1329        reply->setSize("index", index);
1330        reply->setSize("track-index", mTracks.size() - 1);
1331        mConn->sendRequest(request.c_str(), reply);
1332    }
1333
1334    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1335        out->clear();
1336
1337        if (strncasecmp("rtsp://", baseURL, 7)) {
1338            // Base URL must be absolute
1339            return false;
1340        }
1341
1342        if (!strncasecmp("rtsp://", url, 7)) {
1343            // "url" is already an absolute URL, ignore base URL.
1344            out->setTo(url);
1345            return true;
1346        }
1347
1348        size_t n = strlen(baseURL);
1349        if (baseURL[n - 1] == '/') {
1350            out->setTo(baseURL);
1351            out->append(url);
1352        } else {
1353            const char *slashPos = strrchr(baseURL, '/');
1354
1355            if (slashPos > &baseURL[6]) {
1356                out->setTo(baseURL, slashPos - baseURL);
1357            } else {
1358                out->setTo(baseURL);
1359            }
1360
1361            out->append("/");
1362            out->append(url);
1363        }
1364
1365        return true;
1366    }
1367
1368    void fakeTimestamps() {
1369        mNTPAnchorUs = -1ll;
1370        for (size_t i = 0; i < mTracks.size(); ++i) {
1371            onTimeUpdate(i, 0, 0ll);
1372        }
1373    }
1374
1375    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1376        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1377             trackIndex, rtpTime, ntpTime);
1378
1379        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1380
1381        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1382
1383        track->mRTPAnchor = rtpTime;
1384        track->mNTPAnchorUs = ntpTimeUs;
1385
1386        if (mNTPAnchorUs < 0) {
1387            mNTPAnchorUs = ntpTimeUs;
1388            mMediaAnchorUs = mLastMediaTimeUs;
1389        }
1390
1391        if (!mAllTracksHaveTime) {
1392            bool allTracksHaveTime = true;
1393            for (size_t i = 0; i < mTracks.size(); ++i) {
1394                TrackInfo *track = &mTracks.editItemAt(i);
1395                if (track->mNTPAnchorUs < 0) {
1396                    allTracksHaveTime = false;
1397                    break;
1398                }
1399            }
1400            if (allTracksHaveTime) {
1401                mAllTracksHaveTime = true;
1402                ALOGI("Time now established for all tracks.");
1403            }
1404        }
1405    }
1406
1407    void onAccessUnitComplete(
1408            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1409        ALOGV("onAccessUnitComplete track %d", trackIndex);
1410
1411        if (mFirstAccessUnit) {
1412            sp<AMessage> msg = mNotify->dup();
1413            msg->setInt32("what", kWhatConnected);
1414            msg->post();
1415
1416            if (mSeekable) {
1417                for (size_t i = 0; i < mTracks.size(); ++i) {
1418                    TrackInfo *info = &mTracks.editItemAt(i);
1419
1420                    postNormalPlayTimeMapping(
1421                            i,
1422                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1423                }
1424            }
1425
1426            mFirstAccessUnit = false;
1427        }
1428
1429        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1430
1431        if (!mAllTracksHaveTime) {
1432            ALOGV("storing accessUnit, no time established yet");
1433            track->mPackets.push_back(accessUnit);
1434            return;
1435        }
1436
1437        while (!track->mPackets.empty()) {
1438            sp<ABuffer> accessUnit = *track->mPackets.begin();
1439            track->mPackets.erase(track->mPackets.begin());
1440
1441            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1442                postQueueAccessUnit(trackIndex, accessUnit);
1443            }
1444        }
1445
1446        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1447            postQueueAccessUnit(trackIndex, accessUnit);
1448        }
1449    }
1450
1451    bool addMediaTimestamp(
1452            int32_t trackIndex, const TrackInfo *track,
1453            const sp<ABuffer> &accessUnit) {
1454        uint32_t rtpTime;
1455        CHECK(accessUnit->meta()->findInt32(
1456                    "rtp-time", (int32_t *)&rtpTime));
1457
1458        int64_t relRtpTimeUs =
1459            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1460                / track->mTimeScale;
1461
1462        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1463
1464        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1465
1466        if (mediaTimeUs > mLastMediaTimeUs) {
1467            mLastMediaTimeUs = mediaTimeUs;
1468        }
1469
1470        if (mediaTimeUs < 0) {
1471            ALOGV("dropping early accessUnit.");
1472            return false;
1473        }
1474
1475        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1476             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1477
1478        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1479
1480        return true;
1481    }
1482
1483    void postQueueAccessUnit(
1484            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1485        sp<AMessage> msg = mNotify->dup();
1486        msg->setInt32("what", kWhatAccessUnit);
1487        msg->setSize("trackIndex", trackIndex);
1488        msg->setBuffer("accessUnit", accessUnit);
1489        msg->post();
1490    }
1491
1492    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1493        sp<AMessage> msg = mNotify->dup();
1494        msg->setInt32("what", kWhatEOS);
1495        msg->setSize("trackIndex", trackIndex);
1496        msg->setInt32("finalResult", finalResult);
1497        msg->post();
1498    }
1499
1500    void postQueueSeekDiscontinuity(size_t trackIndex) {
1501        sp<AMessage> msg = mNotify->dup();
1502        msg->setInt32("what", kWhatSeekDiscontinuity);
1503        msg->setSize("trackIndex", trackIndex);
1504        msg->post();
1505    }
1506
1507    void postNormalPlayTimeMapping(
1508            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1509        sp<AMessage> msg = mNotify->dup();
1510        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1511        msg->setSize("trackIndex", trackIndex);
1512        msg->setInt32("rtpTime", rtpTime);
1513        msg->setInt64("nptUs", nptUs);
1514        msg->post();
1515    }
1516
1517    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1518};
1519
1520}  // namespace android
1521
1522#endif  // MY_HANDLER_H_
1523