MyHandler.h revision bbbf9c4552402ab18b255f4058e9e6e506f3f106
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef MY_HANDLER_H_
18
19#define MY_HANDLER_H_
20
21//#define LOG_NDEBUG 0
22#define LOG_TAG "MyHandler"
23#include <utils/Log.h>
24
25#include "APacketSource.h"
26#include "ARTPConnection.h"
27#include "ARTSPConnection.h"
28#include "ASessionDescription.h"
29
30#include <ctype.h>
31#include <cutils/properties.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/ALooper.h>
36#include <media/stagefright/foundation/AMessage.h>
37#include <media/stagefright/MediaErrors.h>
38
39#include <arpa/inet.h>
40#include <sys/socket.h>
41#include <netdb.h>
42
43#include "HTTPBase.h"
44
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static const int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static const int64_t kStartupTimeoutUs = 10000000ll;

// Keep-alive timeout (60 secs) used unless the server's "Session" header
// supplies a well-formed "timeout" attribute of at least 15 secs.
static const int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54
55namespace android {
56
57static void MakeUserAgentString(AString *s) {
58    s->setTo("stagefright/1.1 (Linux;Android ");
59
60#if (PROPERTY_VALUE_MAX < 8)
61#error "PROPERTY_VALUE_MAX must be at least 8"
62#endif
63
64    char value[PROPERTY_VALUE_MAX];
65    property_get("ro.build.version.release", value, "Unknown");
66    s->append(value);
67    s->append(")");
68}
69
70static bool GetAttribute(const char *s, const char *key, AString *value) {
71    value->clear();
72
73    size_t keyLen = strlen(key);
74
75    for (;;) {
76        while (isspace(*s)) {
77            ++s;
78        }
79
80        const char *colonPos = strchr(s, ';');
81
82        size_t len =
83            (colonPos == NULL) ? strlen(s) : colonPos - s;
84
85        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
86            value->setTo(&s[keyLen + 1], len - keyLen - 1);
87            return true;
88        }
89
90        if (colonPos == NULL) {
91            return false;
92        }
93
94        s = colonPos + 1;
95    }
96}
97
98struct MyHandler : public AHandler {
99    enum {
100        kWhatConnected                  = 'conn',
101        kWhatDisconnected               = 'disc',
102        kWhatSeekDone                   = 'sdon',
103
104        kWhatAccessUnit                 = 'accU',
105        kWhatEOS                        = 'eos!',
106        kWhatSeekDiscontinuity          = 'seeD',
107        kWhatNormalPlayTimeMapping      = 'nptM',
108    };
109
110    MyHandler(
111            const char *url,
112            const sp<AMessage> &notify,
113            bool uidValid = false, uid_t uid = 0)
114        : mNotify(notify),
115          mUIDValid(uidValid),
116          mUID(uid),
117          mNetLooper(new ALooper),
118          mConn(new ARTSPConnection(mUIDValid, mUID)),
119          mRTPConn(new ARTPConnection),
120          mOriginalSessionURL(url),
121          mSessionURL(url),
122          mSetupTracksSuccessful(false),
123          mSeekPending(false),
124          mFirstAccessUnit(true),
125          mAllTracksHaveTime(false),
126          mNTPAnchorUs(-1),
127          mMediaAnchorUs(-1),
128          mLastMediaTimeUs(0),
129          mNumAccessUnitsReceived(0),
130          mCheckPending(false),
131          mCheckGeneration(0),
132          mTryTCPInterleaving(false),
133          mTryFakeRTCP(false),
134          mReceivedFirstRTCPPacket(false),
135          mReceivedFirstRTPPacket(false),
136          mSeekable(false),
137          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
138          mKeepAliveGeneration(0),
139          mPlayResponseParsed(false) {
140        mNetLooper->setName("rtsp net");
141        mNetLooper->start(false /* runOnCallingThread */,
142                          false /* canCallJava */,
143                          PRIORITY_HIGHEST);
144
145        // Strip any authentication info from the session url, we don't
146        // want to transmit user/pass in cleartext.
147        AString host, path, user, pass;
148        unsigned port;
149        CHECK(ARTSPConnection::ParseURL(
150                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));
151
152        if (user.size() > 0) {
153            mSessionURL.clear();
154            mSessionURL.append("rtsp://");
155            mSessionURL.append(host);
156            mSessionURL.append(":");
157            mSessionURL.append(StringPrintf("%u", port));
158            mSessionURL.append(path);
159
160            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
161        }
162
163        mSessionHost = host;
164    }
165
166    void connect() {
167        looper()->registerHandler(mConn);
168        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
169
170        sp<AMessage> notify = new AMessage('biny', id());
171        mConn->observeBinaryData(notify);
172
173        sp<AMessage> reply = new AMessage('conn', id());
174        mConn->connect(mOriginalSessionURL.c_str(), reply);
175    }
176
177    void disconnect() {
178        (new AMessage('abor', id()))->post();
179    }
180
181    void seek(int64_t timeUs) {
182        sp<AMessage> msg = new AMessage('seek', id());
183        msg->setInt64("time", timeUs);
184        msg->post();
185    }
186
187    static void addRR(const sp<ABuffer> &buf) {
188        uint8_t *ptr = buf->data() + buf->size();
189        ptr[0] = 0x80 | 0;
190        ptr[1] = 201;  // RR
191        ptr[2] = 0;
192        ptr[3] = 1;
193        ptr[4] = 0xde;  // SSRC
194        ptr[5] = 0xad;
195        ptr[6] = 0xbe;
196        ptr[7] = 0xef;
197
198        buf->setRange(0, buf->size() + 8);
199    }
200
201    static void addSDES(int s, const sp<ABuffer> &buffer) {
202        struct sockaddr_in addr;
203        socklen_t addrSize = sizeof(addr);
204        CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
205
206        uint8_t *data = buffer->data() + buffer->size();
207        data[0] = 0x80 | 1;
208        data[1] = 202;  // SDES
209        data[4] = 0xde;  // SSRC
210        data[5] = 0xad;
211        data[6] = 0xbe;
212        data[7] = 0xef;
213
214        size_t offset = 8;
215
216        data[offset++] = 1;  // CNAME
217
218        AString cname = "stagefright@";
219        cname.append(inet_ntoa(addr.sin_addr));
220        data[offset++] = cname.size();
221
222        memcpy(&data[offset], cname.c_str(), cname.size());
223        offset += cname.size();
224
225        data[offset++] = 6;  // TOOL
226
227        AString tool;
228        MakeUserAgentString(&tool);
229
230        data[offset++] = tool.size();
231
232        memcpy(&data[offset], tool.c_str(), tool.size());
233        offset += tool.size();
234
235        data[offset++] = 0;
236
237        if ((offset % 4) > 0) {
238            size_t count = 4 - (offset % 4);
239            switch (count) {
240                case 3:
241                    data[offset++] = 0;
242                case 2:
243                    data[offset++] = 0;
244                case 1:
245                    data[offset++] = 0;
246            }
247        }
248
249        size_t numWords = (offset / 4) - 1;
250        data[2] = numWords >> 8;
251        data[3] = numWords & 0xff;
252
253        buffer->setRange(buffer->offset(), buffer->size() + offset);
254    }
255
256    // In case we're behind NAT, fire off two UDP packets to the remote
257    // rtp/rtcp ports to poke a hole into the firewall for future incoming
258    // packets. We're going to send an RR/SDES RTCP packet to both of them.
259    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
260        struct sockaddr_in addr;
261        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
262        addr.sin_family = AF_INET;
263
264        AString source;
265        AString server_port;
266        if (!GetAttribute(transport.c_str(),
267                          "source",
268                          &source)) {
269            ALOGW("Missing 'source' field in Transport response. Using "
270                 "RTSP endpoint address.");
271
272            struct hostent *ent = gethostbyname(mSessionHost.c_str());
273            if (ent == NULL) {
274                ALOGE("Failed to look up address of session host '%s'",
275                     mSessionHost.c_str());
276
277                return false;
278            }
279
280            addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
281        } else {
282            addr.sin_addr.s_addr = inet_addr(source.c_str());
283        }
284
285        if (!GetAttribute(transport.c_str(),
286                                 "server_port",
287                                 &server_port)) {
288            ALOGI("Missing 'server_port' field in Transport response.");
289            return false;
290        }
291
292        int rtpPort, rtcpPort;
293        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
294                || rtpPort <= 0 || rtpPort > 65535
295                || rtcpPort <=0 || rtcpPort > 65535
296                || rtcpPort != rtpPort + 1) {
297            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
298                 " RTP port must be even, RTCP port must be one higher.",
299                 server_port.c_str());
300
301            return false;
302        }
303
304        if (rtpPort & 1) {
305            ALOGW("Server picked an odd RTP port, it should've picked an "
306                 "even one, we'll let it pass for now, but this may break "
307                 "in the future.");
308        }
309
310        if (addr.sin_addr.s_addr == INADDR_NONE) {
311            return true;
312        }
313
314        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
315            // No firewalls to traverse on the loopback interface.
316            return true;
317        }
318
319        // Make up an RR/SDES RTCP packet.
320        sp<ABuffer> buf = new ABuffer(65536);
321        buf->setRange(0, 0);
322        addRR(buf);
323        addSDES(rtpSocket, buf);
324
325        addr.sin_port = htons(rtpPort);
326
327        ssize_t n = sendto(
328                rtpSocket, buf->data(), buf->size(), 0,
329                (const sockaddr *)&addr, sizeof(addr));
330
331        if (n < (ssize_t)buf->size()) {
332            ALOGE("failed to poke a hole for RTP packets");
333            return false;
334        }
335
336        addr.sin_port = htons(rtcpPort);
337
338        n = sendto(
339                rtcpSocket, buf->data(), buf->size(), 0,
340                (const sockaddr *)&addr, sizeof(addr));
341
342        if (n < (ssize_t)buf->size()) {
343            ALOGE("failed to poke a hole for RTCP packets");
344            return false;
345        }
346
347        ALOGV("successfully poked holes.");
348
349        return true;
350    }
351
352    virtual void onMessageReceived(const sp<AMessage> &msg) {
353        switch (msg->what()) {
354            case 'conn':
355            {
356                int32_t result;
357                CHECK(msg->findInt32("result", &result));
358
359                ALOGI("connection request completed with result %d (%s)",
360                     result, strerror(-result));
361
362                if (result == OK) {
363                    AString request;
364                    request = "DESCRIBE ";
365                    request.append(mSessionURL);
366                    request.append(" RTSP/1.0\r\n");
367                    request.append("Accept: application/sdp\r\n");
368                    request.append("\r\n");
369
370                    sp<AMessage> reply = new AMessage('desc', id());
371                    mConn->sendRequest(request.c_str(), reply);
372                } else {
373                    (new AMessage('disc', id()))->post();
374                }
375                break;
376            }
377
378            case 'disc':
379            {
380                ++mKeepAliveGeneration;
381
382                int32_t reconnect;
383                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
384                    sp<AMessage> reply = new AMessage('conn', id());
385                    mConn->connect(mOriginalSessionURL.c_str(), reply);
386                } else {
387                    (new AMessage('quit', id()))->post();
388                }
389                break;
390            }
391
392            case 'desc':
393            {
394                int32_t result;
395                CHECK(msg->findInt32("result", &result));
396
397                ALOGI("DESCRIBE completed with result %d (%s)",
398                     result, strerror(-result));
399
400                if (result == OK) {
401                    sp<RefBase> obj;
402                    CHECK(msg->findObject("response", &obj));
403                    sp<ARTSPResponse> response =
404                        static_cast<ARTSPResponse *>(obj.get());
405
406                    if (response->mStatusCode == 302) {
407                        ssize_t i = response->mHeaders.indexOfKey("location");
408                        CHECK_GE(i, 0);
409
410                        mSessionURL = response->mHeaders.valueAt(i);
411
412                        AString request;
413                        request = "DESCRIBE ";
414                        request.append(mSessionURL);
415                        request.append(" RTSP/1.0\r\n");
416                        request.append("Accept: application/sdp\r\n");
417                        request.append("\r\n");
418
419                        sp<AMessage> reply = new AMessage('desc', id());
420                        mConn->sendRequest(request.c_str(), reply);
421                        break;
422                    }
423
424                    if (response->mStatusCode != 200) {
425                        result = UNKNOWN_ERROR;
426                    } else if (response->mContent == NULL) {
427                        result = ERROR_MALFORMED;
428                        ALOGE("The response has no content.");
429                    } else {
430                        mSessionDesc = new ASessionDescription;
431
432                        mSessionDesc->setTo(
433                                response->mContent->data(),
434                                response->mContent->size());
435
436                        if (!mSessionDesc->isValid()) {
437                            ALOGE("Failed to parse session description.");
438                            result = ERROR_MALFORMED;
439                        } else {
440                            ssize_t i = response->mHeaders.indexOfKey("content-base");
441                            if (i >= 0) {
442                                mBaseURL = response->mHeaders.valueAt(i);
443                            } else {
444                                i = response->mHeaders.indexOfKey("content-location");
445                                if (i >= 0) {
446                                    mBaseURL = response->mHeaders.valueAt(i);
447                                } else {
448                                    mBaseURL = mSessionURL;
449                                }
450                            }
451
452                            if (!mBaseURL.startsWith("rtsp://")) {
453                                // Some misbehaving servers specify a relative
454                                // URL in one of the locations above, combine
455                                // it with the absolute session URL to get
456                                // something usable...
457
458                                ALOGW("Server specified a non-absolute base URL"
459                                     ", combining it with the session URL to "
460                                     "get something usable...");
461
462                                AString tmp;
463                                CHECK(MakeURL(
464                                            mSessionURL.c_str(),
465                                            mBaseURL.c_str(),
466                                            &tmp));
467
468                                mBaseURL = tmp;
469                            }
470
471                            if (mSessionDesc->countTracks() < 2) {
472                                // There's no actual tracks in this session.
473                                // The first "track" is merely session meta
474                                // data.
475
476                                ALOGW("Session doesn't contain any playable "
477                                     "tracks. Aborting.");
478                                result = ERROR_UNSUPPORTED;
479                            } else {
480                                setupTrack(1);
481                            }
482                        }
483                    }
484                }
485
486                if (result != OK) {
487                    sp<AMessage> reply = new AMessage('disc', id());
488                    mConn->disconnect(reply);
489                }
490                break;
491            }
492
493            case 'setu':
494            {
495                size_t index;
496                CHECK(msg->findSize("index", &index));
497
498                TrackInfo *track = NULL;
499                size_t trackIndex;
500                if (msg->findSize("track-index", &trackIndex)) {
501                    track = &mTracks.editItemAt(trackIndex);
502                }
503
504                int32_t result;
505                CHECK(msg->findInt32("result", &result));
506
507                ALOGI("SETUP(%d) completed with result %d (%s)",
508                     index, result, strerror(-result));
509
510                if (result == OK) {
511                    CHECK(track != NULL);
512
513                    sp<RefBase> obj;
514                    CHECK(msg->findObject("response", &obj));
515                    sp<ARTSPResponse> response =
516                        static_cast<ARTSPResponse *>(obj.get());
517
518                    if (response->mStatusCode != 200) {
519                        result = UNKNOWN_ERROR;
520                    } else {
521                        ssize_t i = response->mHeaders.indexOfKey("session");
522                        CHECK_GE(i, 0);
523
524                        mSessionID = response->mHeaders.valueAt(i);
525
526                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
527                        AString timeoutStr;
528                        if (GetAttribute(
529                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
530                            char *end;
531                            unsigned long timeoutSecs =
532                                strtoul(timeoutStr.c_str(), &end, 10);
533
534                            if (end == timeoutStr.c_str() || *end != '\0') {
535                                ALOGW("server specified malformed timeout '%s'",
536                                     timeoutStr.c_str());
537
538                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
539                            } else if (timeoutSecs < 15) {
540                                ALOGW("server specified too short a timeout "
541                                     "(%lu secs), using default.",
542                                     timeoutSecs);
543
544                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
545                            } else {
546                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
547
548                                ALOGI("server specified timeout of %lu secs.",
549                                     timeoutSecs);
550                            }
551                        }
552
553                        i = mSessionID.find(";");
554                        if (i >= 0) {
555                            // Remove options, i.e. ";timeout=90"
556                            mSessionID.erase(i, mSessionID.size() - i);
557                        }
558
559                        sp<AMessage> notify = new AMessage('accu', id());
560                        notify->setSize("track-index", trackIndex);
561
562                        i = response->mHeaders.indexOfKey("transport");
563                        CHECK_GE(i, 0);
564
565                        if (!track->mUsingInterleavedTCP) {
566                            AString transport = response->mHeaders.valueAt(i);
567
568                            // We are going to continue even if we were
569                            // unable to poke a hole into the firewall...
570                            pokeAHole(
571                                    track->mRTPSocket,
572                                    track->mRTCPSocket,
573                                    transport);
574                        }
575
576                        mRTPConn->addStream(
577                                track->mRTPSocket, track->mRTCPSocket,
578                                mSessionDesc, index,
579                                notify, track->mUsingInterleavedTCP);
580
581                        mSetupTracksSuccessful = true;
582                    }
583                }
584
585                if (result != OK) {
586                    if (track) {
587                        if (!track->mUsingInterleavedTCP) {
588                            // Clear the tag
589                            if (mUIDValid) {
590                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
591                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
592                            }
593
594                            close(track->mRTPSocket);
595                            close(track->mRTCPSocket);
596                        }
597
598                        mTracks.removeItemsAt(trackIndex);
599                    }
600                }
601
602                ++index;
603                if (index < mSessionDesc->countTracks()) {
604                    setupTrack(index);
605                } else if (mSetupTracksSuccessful) {
606                    ++mKeepAliveGeneration;
607                    postKeepAlive();
608
609                    AString request = "PLAY ";
610                    request.append(mSessionURL);
611                    request.append(" RTSP/1.0\r\n");
612
613                    request.append("Session: ");
614                    request.append(mSessionID);
615                    request.append("\r\n");
616
617                    request.append("\r\n");
618
619                    sp<AMessage> reply = new AMessage('play', id());
620                    mConn->sendRequest(request.c_str(), reply);
621                } else {
622                    sp<AMessage> reply = new AMessage('disc', id());
623                    mConn->disconnect(reply);
624                }
625                break;
626            }
627
628            case 'play':
629            {
630                int32_t result;
631                CHECK(msg->findInt32("result", &result));
632
633                ALOGI("PLAY completed with result %d (%s)",
634                     result, strerror(-result));
635
636                if (result == OK) {
637                    sp<RefBase> obj;
638                    CHECK(msg->findObject("response", &obj));
639                    sp<ARTSPResponse> response =
640                        static_cast<ARTSPResponse *>(obj.get());
641
642                    if (response->mStatusCode != 200) {
643                        result = UNKNOWN_ERROR;
644                    } else {
645                        parsePlayResponse(response);
646
647                        sp<AMessage> timeout = new AMessage('tiou', id());
648                        timeout->post(kStartupTimeoutUs);
649                    }
650                }
651
652                if (result != OK) {
653                    sp<AMessage> reply = new AMessage('disc', id());
654                    mConn->disconnect(reply);
655                }
656
657                break;
658            }
659
660            case 'aliv':
661            {
662                int32_t generation;
663                CHECK(msg->findInt32("generation", &generation));
664
665                if (generation != mKeepAliveGeneration) {
666                    // obsolete event.
667                    break;
668                }
669
670                AString request;
671                request.append("OPTIONS ");
672                request.append(mSessionURL);
673                request.append(" RTSP/1.0\r\n");
674                request.append("Session: ");
675                request.append(mSessionID);
676                request.append("\r\n");
677                request.append("\r\n");
678
679                sp<AMessage> reply = new AMessage('opts', id());
680                reply->setInt32("generation", mKeepAliveGeneration);
681                mConn->sendRequest(request.c_str(), reply);
682                break;
683            }
684
685            case 'opts':
686            {
687                int32_t result;
688                CHECK(msg->findInt32("result", &result));
689
690                ALOGI("OPTIONS completed with result %d (%s)",
691                     result, strerror(-result));
692
693                int32_t generation;
694                CHECK(msg->findInt32("generation", &generation));
695
696                if (generation != mKeepAliveGeneration) {
697                    // obsolete event.
698                    break;
699                }
700
701                postKeepAlive();
702                break;
703            }
704
705            case 'abor':
706            {
707                for (size_t i = 0; i < mTracks.size(); ++i) {
708                    TrackInfo *info = &mTracks.editItemAt(i);
709
710                    if (!mFirstAccessUnit) {
711                        postQueueEOS(i, ERROR_END_OF_STREAM);
712                    }
713
714                    if (!info->mUsingInterleavedTCP) {
715                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
716
717                        // Clear the tag
718                        if (mUIDValid) {
719                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
720                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
721                        }
722
723                        close(info->mRTPSocket);
724                        close(info->mRTCPSocket);
725                    }
726                }
727                mTracks.clear();
728                mSetupTracksSuccessful = false;
729                mSeekPending = false;
730                mFirstAccessUnit = true;
731                mAllTracksHaveTime = false;
732                mNTPAnchorUs = -1;
733                mMediaAnchorUs = -1;
734                mNumAccessUnitsReceived = 0;
735                mReceivedFirstRTCPPacket = false;
736                mReceivedFirstRTPPacket = false;
737                mSeekable = false;
738
739                sp<AMessage> reply = new AMessage('tear', id());
740
741                int32_t reconnect;
742                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
743                    reply->setInt32("reconnect", true);
744                }
745
746                AString request;
747                request = "TEARDOWN ";
748
749                // XXX should use aggregate url from SDP here...
750                request.append(mSessionURL);
751                request.append(" RTSP/1.0\r\n");
752
753                request.append("Session: ");
754                request.append(mSessionID);
755                request.append("\r\n");
756
757                request.append("\r\n");
758
759                mConn->sendRequest(request.c_str(), reply);
760                break;
761            }
762
763            case 'tear':
764            {
765                int32_t result;
766                CHECK(msg->findInt32("result", &result));
767
768                ALOGI("TEARDOWN completed with result %d (%s)",
769                     result, strerror(-result));
770
771                sp<AMessage> reply = new AMessage('disc', id());
772
773                int32_t reconnect;
774                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
775                    reply->setInt32("reconnect", true);
776                }
777
778                mConn->disconnect(reply);
779                break;
780            }
781
782            case 'quit':
783            {
784                sp<AMessage> msg = mNotify->dup();
785                msg->setInt32("what", kWhatDisconnected);
786                msg->setInt32("result", UNKNOWN_ERROR);
787                msg->post();
788                break;
789            }
790
791            case 'chek':
792            {
793                int32_t generation;
794                CHECK(msg->findInt32("generation", &generation));
795                if (generation != mCheckGeneration) {
796                    // This is an outdated message. Ignore.
797                    break;
798                }
799
800                if (mNumAccessUnitsReceived == 0) {
801#if 1
802                    ALOGI("stream ended? aborting.");
803                    (new AMessage('abor', id()))->post();
804                    break;
805#else
806                    ALOGI("haven't seen an AU in a looong time.");
807#endif
808                }
809
810                mNumAccessUnitsReceived = 0;
811                msg->post(kAccessUnitTimeoutUs);
812                break;
813            }
814
815            case 'accu':
816            {
817                int32_t timeUpdate;
818                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
819                    size_t trackIndex;
820                    CHECK(msg->findSize("track-index", &trackIndex));
821
822                    uint32_t rtpTime;
823                    uint64_t ntpTime;
824                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
825                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
826
827                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
828                    break;
829                }
830
831                int32_t first;
832                if (msg->findInt32("first-rtcp", &first)) {
833                    mReceivedFirstRTCPPacket = true;
834                    break;
835                }
836
837                if (msg->findInt32("first-rtp", &first)) {
838                    mReceivedFirstRTPPacket = true;
839                    break;
840                }
841
842                ++mNumAccessUnitsReceived;
843                postAccessUnitTimeoutCheck();
844
845                size_t trackIndex;
846                CHECK(msg->findSize("track-index", &trackIndex));
847
848                if (trackIndex >= mTracks.size()) {
849                    ALOGV("late packets ignored.");
850                    break;
851                }
852
853                TrackInfo *track = &mTracks.editItemAt(trackIndex);
854
855                int32_t eos;
856                if (msg->findInt32("eos", &eos)) {
857                    ALOGI("received BYE on track index %d", trackIndex);
858#if 0
859                    track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
860#endif
861                    return;
862                }
863
864                sp<ABuffer> accessUnit;
865                CHECK(msg->findBuffer("access-unit", &accessUnit));
866
867                uint32_t seqNum = (uint32_t)accessUnit->int32Data();
868
869                if (mSeekPending) {
870                    ALOGV("we're seeking, dropping stale packet.");
871                    break;
872                }
873
874                if (seqNum < track->mFirstSeqNumInSegment) {
875                    ALOGV("dropping stale access-unit (%d < %d)",
876                         seqNum, track->mFirstSeqNumInSegment);
877                    break;
878                }
879
880                if (track->mNewSegment) {
881                    track->mNewSegment = false;
882                }
883
884                onAccessUnitComplete(trackIndex, accessUnit);
885                break;
886            }
887
888            case 'seek':
889            {
890                if (!mSeekable) {
891                    ALOGW("This is a live stream, ignoring seek request.");
892
893                    sp<AMessage> msg = mNotify->dup();
894                    msg->setInt32("what", kWhatSeekDone);
895                    msg->post();
896                    break;
897                }
898
899                int64_t timeUs;
900                CHECK(msg->findInt64("time", &timeUs));
901
902                mSeekPending = true;
903
904                // Disable the access unit timeout until we resumed
905                // playback again.
906                mCheckPending = true;
907                ++mCheckGeneration;
908
909                AString request = "PAUSE ";
910                request.append(mSessionURL);
911                request.append(" RTSP/1.0\r\n");
912
913                request.append("Session: ");
914                request.append(mSessionID);
915                request.append("\r\n");
916
917                request.append("\r\n");
918
919                sp<AMessage> reply = new AMessage('see1', id());
920                reply->setInt64("time", timeUs);
921                mConn->sendRequest(request.c_str(), reply);
922                break;
923            }
924
925            case 'see1':
926            {
927                // Session is paused now.
928                for (size_t i = 0; i < mTracks.size(); ++i) {
929                    TrackInfo *info = &mTracks.editItemAt(i);
930
931                    postQueueSeekDiscontinuity(i);
932
933                    info->mRTPAnchor = 0;
934                    info->mNTPAnchorUs = -1;
935                }
936
937                mAllTracksHaveTime = false;
938                mNTPAnchorUs = -1;
939
940                int64_t timeUs;
941                CHECK(msg->findInt64("time", &timeUs));
942
943                AString request = "PLAY ";
944                request.append(mSessionURL);
945                request.append(" RTSP/1.0\r\n");
946
947                request.append("Session: ");
948                request.append(mSessionID);
949                request.append("\r\n");
950
951                request.append(
952                        StringPrintf(
953                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));
954
955                request.append("\r\n");
956
957                sp<AMessage> reply = new AMessage('see2', id());
958                mConn->sendRequest(request.c_str(), reply);
959                break;
960            }
961
962            case 'see2':
963            {
964                CHECK(mSeekPending);
965
966                int32_t result;
967                CHECK(msg->findInt32("result", &result));
968
969                ALOGI("PLAY completed with result %d (%s)",
970                     result, strerror(-result));
971
972                mCheckPending = false;
973                postAccessUnitTimeoutCheck();
974
975                if (result == OK) {
976                    sp<RefBase> obj;
977                    CHECK(msg->findObject("response", &obj));
978                    sp<ARTSPResponse> response =
979                        static_cast<ARTSPResponse *>(obj.get());
980
981                    if (response->mStatusCode != 200) {
982                        result = UNKNOWN_ERROR;
983                    } else {
984                        parsePlayResponse(response);
985
986                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
987                        CHECK_GE(i, 0);
988
989                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
990
991                        ALOGI("seek completed.");
992                    }
993                }
994
995                if (result != OK) {
996                    ALOGE("seek failed, aborting.");
997                    (new AMessage('abor', id()))->post();
998                }
999
1000                mSeekPending = false;
1001
1002                sp<AMessage> msg = mNotify->dup();
1003                msg->setInt32("what", kWhatSeekDone);
1004                msg->post();
1005                break;
1006            }
1007
1008            case 'biny':
1009            {
1010                sp<ABuffer> buffer;
1011                CHECK(msg->findBuffer("buffer", &buffer));
1012
1013                int32_t index;
1014                CHECK(buffer->meta()->findInt32("index", &index));
1015
1016                mRTPConn->injectPacket(index, buffer);
1017                break;
1018            }
1019
1020            case 'tiou':
1021            {
1022                if (!mReceivedFirstRTCPPacket) {
1023                    if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
1024                        ALOGW("We received RTP packets but no RTCP packets, "
1025                             "using fake timestamps.");
1026
1027                        mTryFakeRTCP = true;
1028
1029                        mReceivedFirstRTCPPacket = true;
1030
1031                        fakeTimestamps();
1032                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1033                        ALOGW("Never received any data, switching transports.");
1034
1035                        mTryTCPInterleaving = true;
1036
1037                        sp<AMessage> msg = new AMessage('abor', id());
1038                        msg->setInt32("reconnect", true);
1039                        msg->post();
1040                    } else {
1041                        ALOGW("Never received any data, disconnecting.");
1042                        (new AMessage('abor', id()))->post();
1043                    }
1044                } else {
1045                    if (!mAllTracksHaveTime) {
1046                        ALOGW("We received some RTCP packets, but time "
1047                              "could not be established on all tracks, now "
1048                              "using fake timestamps");
1049
1050                        fakeTimestamps();
1051                    }
1052                }
1053                break;
1054            }
1055
1056            default:
1057                TRESPASS();
1058                break;
1059        }
1060    }
1061
1062    void postKeepAlive() {
1063        sp<AMessage> msg = new AMessage('aliv', id());
1064        msg->setInt32("generation", mKeepAliveGeneration);
1065        msg->post((mKeepAliveTimeoutUs * 9) / 10);
1066    }
1067
1068    void postAccessUnitTimeoutCheck() {
1069        if (mCheckPending) {
1070            return;
1071        }
1072
1073        mCheckPending = true;
1074        sp<AMessage> check = new AMessage('chek', id());
1075        check->setInt32("generation", mCheckGeneration);
1076        check->post(kAccessUnitTimeoutUs);
1077    }
1078
1079    static void SplitString(
1080            const AString &s, const char *separator, List<AString> *items) {
1081        items->clear();
1082        size_t start = 0;
1083        while (start < s.size()) {
1084            ssize_t offset = s.find(separator, start);
1085
1086            if (offset < 0) {
1087                items->push_back(AString(s, start, s.size() - start));
1088                break;
1089            }
1090
1091            items->push_back(AString(s, start, offset - start));
1092            start = offset + strlen(separator);
1093        }
1094    }
1095
1096    void parsePlayResponse(const sp<ARTSPResponse> &response) {
1097        mPlayResponseParsed = true;
1098        mSeekable = false;
1099        if (mTracks.size() == 0) {
1100            ALOGV("parsePlayResponse: late packets ignored.");
1101            return;
1102        }
1103
1104        ssize_t i = response->mHeaders.indexOfKey("range");
1105        if (i < 0) {
1106            // Server doesn't even tell use what range it is going to
1107            // play, therefore we won't support seeking.
1108            return;
1109        }
1110
1111        AString range = response->mHeaders.valueAt(i);
1112        ALOGV("Range: %s", range.c_str());
1113
1114        AString val;
1115        CHECK(GetAttribute(range.c_str(), "npt", &val));
1116
1117        float npt1, npt2;
1118        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1119            // This is a live stream and therefore not seekable.
1120
1121            ALOGI("This is a live stream");
1122            return;
1123        }
1124
1125        i = response->mHeaders.indexOfKey("rtp-info");
1126        CHECK_GE(i, 0);
1127
1128        AString rtpInfo = response->mHeaders.valueAt(i);
1129        List<AString> streamInfos;
1130        SplitString(rtpInfo, ",", &streamInfos);
1131
1132        int n = 1;
1133        for (List<AString>::iterator it = streamInfos.begin();
1134             it != streamInfos.end(); ++it) {
1135            (*it).trim();
1136            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1137
1138            CHECK(GetAttribute((*it).c_str(), "url", &val));
1139
1140            size_t trackIndex = 0;
1141            while (trackIndex < mTracks.size()
1142                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1143                ++trackIndex;
1144            }
1145            CHECK_LT(trackIndex, mTracks.size());
1146
1147            CHECK(GetAttribute((*it).c_str(), "seq", &val));
1148
1149            char *end;
1150            unsigned long seq = strtoul(val.c_str(), &end, 10);
1151
1152            TrackInfo *info = &mTracks.editItemAt(trackIndex);
1153            info->mFirstSeqNumInSegment = seq;
1154            info->mNewSegment = true;
1155
1156            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1157
1158            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1159
1160            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1161
1162            info->mNormalPlayTimeRTP = rtpTime;
1163            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1164
1165            if (!mFirstAccessUnit) {
1166                postNormalPlayTimeMapping(
1167                        trackIndex,
1168                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1169            }
1170
1171            ++n;
1172        }
1173
1174        mSeekable = true;
1175    }
1176
1177    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1178        CHECK_GE(index, 0u);
1179        CHECK_LT(index, mTracks.size());
1180
1181        const TrackInfo &info = mTracks.itemAt(index);
1182
1183        *timeScale = info.mTimeScale;
1184
1185        return info.mPacketSource->getFormat();
1186    }
1187
1188    size_t countTracks() const {
1189        return mTracks.size();
1190    }
1191
1192private:
1193    struct TrackInfo {
1194        AString mURL;
1195        int mRTPSocket;
1196        int mRTCPSocket;
1197        bool mUsingInterleavedTCP;
1198        uint32_t mFirstSeqNumInSegment;
1199        bool mNewSegment;
1200
1201        uint32_t mRTPAnchor;
1202        int64_t mNTPAnchorUs;
1203        int32_t mTimeScale;
1204
1205        uint32_t mNormalPlayTimeRTP;
1206        int64_t mNormalPlayTimeUs;
1207
1208        sp<APacketSource> mPacketSource;
1209
1210        // Stores packets temporarily while no notion of time
1211        // has been established yet.
1212        List<sp<ABuffer> > mPackets;
1213    };
1214
1215    sp<AMessage> mNotify;
1216    bool mUIDValid;
1217    uid_t mUID;
1218    sp<ALooper> mNetLooper;
1219    sp<ARTSPConnection> mConn;
1220    sp<ARTPConnection> mRTPConn;
1221    sp<ASessionDescription> mSessionDesc;
1222    AString mOriginalSessionURL;  // This one still has user:pass@
1223    AString mSessionURL;
1224    AString mSessionHost;
1225    AString mBaseURL;
1226    AString mSessionID;
1227    bool mSetupTracksSuccessful;
1228    bool mSeekPending;
1229    bool mFirstAccessUnit;
1230
1231    bool mAllTracksHaveTime;
1232    int64_t mNTPAnchorUs;
1233    int64_t mMediaAnchorUs;
1234    int64_t mLastMediaTimeUs;
1235
1236    int64_t mNumAccessUnitsReceived;
1237    bool mCheckPending;
1238    int32_t mCheckGeneration;
1239    bool mTryTCPInterleaving;
1240    bool mTryFakeRTCP;
1241    bool mReceivedFirstRTCPPacket;
1242    bool mReceivedFirstRTPPacket;
1243    bool mSeekable;
1244    int64_t mKeepAliveTimeoutUs;
1245    int32_t mKeepAliveGeneration;
1246
1247    Vector<TrackInfo> mTracks;
1248
1249    bool mPlayResponseParsed;
1250
1251    void setupTrack(size_t index) {
1252        sp<APacketSource> source =
1253            new APacketSource(mSessionDesc, index);
1254
1255        if (source->initCheck() != OK) {
1256            ALOGW("Unsupported format. Ignoring track #%d.", index);
1257
1258            sp<AMessage> reply = new AMessage('setu', id());
1259            reply->setSize("index", index);
1260            reply->setInt32("result", ERROR_UNSUPPORTED);
1261            reply->post();
1262            return;
1263        }
1264
1265        AString url;
1266        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1267
1268        AString trackURL;
1269        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1270
1271        mTracks.push(TrackInfo());
1272        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1273        info->mURL = trackURL;
1274        info->mPacketSource = source;
1275        info->mUsingInterleavedTCP = false;
1276        info->mFirstSeqNumInSegment = 0;
1277        info->mNewSegment = true;
1278        info->mRTPAnchor = 0;
1279        info->mNTPAnchorUs = -1;
1280        info->mNormalPlayTimeRTP = 0;
1281        info->mNormalPlayTimeUs = 0ll;
1282
1283        unsigned long PT;
1284        AString formatDesc;
1285        AString formatParams;
1286        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1287
1288        int32_t timescale;
1289        int32_t numChannels;
1290        ASessionDescription::ParseFormatDesc(
1291                formatDesc.c_str(), &timescale, &numChannels);
1292
1293        info->mTimeScale = timescale;
1294
1295        ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1296
1297        AString request = "SETUP ";
1298        request.append(trackURL);
1299        request.append(" RTSP/1.0\r\n");
1300
1301        if (mTryTCPInterleaving) {
1302            size_t interleaveIndex = 2 * (mTracks.size() - 1);
1303            info->mUsingInterleavedTCP = true;
1304            info->mRTPSocket = interleaveIndex;
1305            info->mRTCPSocket = interleaveIndex + 1;
1306
1307            request.append("Transport: RTP/AVP/TCP;interleaved=");
1308            request.append(interleaveIndex);
1309            request.append("-");
1310            request.append(interleaveIndex + 1);
1311        } else {
1312            unsigned rtpPort;
1313            ARTPConnection::MakePortPair(
1314                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1315
1316            if (mUIDValid) {
1317                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1318                                                (uint32_t)*(uint32_t*) "RTP_");
1319                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1320                                                (uint32_t)*(uint32_t*) "RTP_");
1321            }
1322
1323            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1324            request.append(rtpPort);
1325            request.append("-");
1326            request.append(rtpPort + 1);
1327        }
1328
1329        request.append("\r\n");
1330
1331        if (index > 1) {
1332            request.append("Session: ");
1333            request.append(mSessionID);
1334            request.append("\r\n");
1335        }
1336
1337        request.append("\r\n");
1338
1339        sp<AMessage> reply = new AMessage('setu', id());
1340        reply->setSize("index", index);
1341        reply->setSize("track-index", mTracks.size() - 1);
1342        mConn->sendRequest(request.c_str(), reply);
1343    }
1344
1345    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1346        out->clear();
1347
1348        if (strncasecmp("rtsp://", baseURL, 7)) {
1349            // Base URL must be absolute
1350            return false;
1351        }
1352
1353        if (!strncasecmp("rtsp://", url, 7)) {
1354            // "url" is already an absolute URL, ignore base URL.
1355            out->setTo(url);
1356            return true;
1357        }
1358
1359        size_t n = strlen(baseURL);
1360        if (baseURL[n - 1] == '/') {
1361            out->setTo(baseURL);
1362            out->append(url);
1363        } else {
1364            const char *slashPos = strrchr(baseURL, '/');
1365
1366            if (slashPos > &baseURL[6]) {
1367                out->setTo(baseURL, slashPos - baseURL);
1368            } else {
1369                out->setTo(baseURL);
1370            }
1371
1372            out->append("/");
1373            out->append(url);
1374        }
1375
1376        return true;
1377    }
1378
1379    void fakeTimestamps() {
1380        mNTPAnchorUs = -1ll;
1381        for (size_t i = 0; i < mTracks.size(); ++i) {
1382            onTimeUpdate(i, 0, 0ll);
1383        }
1384    }
1385
1386    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1387        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1388             trackIndex, rtpTime, ntpTime);
1389
1390        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1391
1392        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1393
1394        track->mRTPAnchor = rtpTime;
1395        track->mNTPAnchorUs = ntpTimeUs;
1396
1397        if (mNTPAnchorUs < 0) {
1398            mNTPAnchorUs = ntpTimeUs;
1399            mMediaAnchorUs = mLastMediaTimeUs;
1400        }
1401
1402        if (!mAllTracksHaveTime) {
1403            bool allTracksHaveTime = true;
1404            for (size_t i = 0; i < mTracks.size(); ++i) {
1405                TrackInfo *track = &mTracks.editItemAt(i);
1406                if (track->mNTPAnchorUs < 0) {
1407                    allTracksHaveTime = false;
1408                    break;
1409                }
1410            }
1411            if (allTracksHaveTime) {
1412                mAllTracksHaveTime = true;
1413                ALOGI("Time now established for all tracks.");
1414            }
1415        }
1416    }
1417
1418    void onAccessUnitComplete(
1419            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1420        ALOGV("onAccessUnitComplete track %d", trackIndex);
1421
1422        if(!mPlayResponseParsed){
1423            ALOGI("play response is not parsed, storing accessunit");
1424            TrackInfo *track = &mTracks.editItemAt(trackIndex);
1425            track->mPackets.push_back(accessUnit);
1426            return;
1427        }
1428
1429        if (mFirstAccessUnit) {
1430            sp<AMessage> msg = mNotify->dup();
1431            msg->setInt32("what", kWhatConnected);
1432            msg->post();
1433
1434            if (mSeekable) {
1435                for (size_t i = 0; i < mTracks.size(); ++i) {
1436                    TrackInfo *info = &mTracks.editItemAt(i);
1437
1438                    postNormalPlayTimeMapping(
1439                            i,
1440                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1441                }
1442            }
1443
1444            mFirstAccessUnit = false;
1445        }
1446
1447        TrackInfo *track = &mTracks.editItemAt(trackIndex);
1448
1449        if (!mAllTracksHaveTime) {
1450            ALOGV("storing accessUnit, no time established yet");
1451            track->mPackets.push_back(accessUnit);
1452            return;
1453        }
1454
1455        while (!track->mPackets.empty()) {
1456            sp<ABuffer> accessUnit = *track->mPackets.begin();
1457            track->mPackets.erase(track->mPackets.begin());
1458
1459            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1460                postQueueAccessUnit(trackIndex, accessUnit);
1461            }
1462        }
1463
1464        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1465            postQueueAccessUnit(trackIndex, accessUnit);
1466        }
1467    }
1468
1469    bool addMediaTimestamp(
1470            int32_t trackIndex, const TrackInfo *track,
1471            const sp<ABuffer> &accessUnit) {
1472        uint32_t rtpTime;
1473        CHECK(accessUnit->meta()->findInt32(
1474                    "rtp-time", (int32_t *)&rtpTime));
1475
1476        int64_t relRtpTimeUs =
1477            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1478                / track->mTimeScale;
1479
1480        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1481
1482        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1483
1484        if (mediaTimeUs > mLastMediaTimeUs) {
1485            mLastMediaTimeUs = mediaTimeUs;
1486        }
1487
1488        if (mediaTimeUs < 0) {
1489            ALOGV("dropping early accessUnit.");
1490            return false;
1491        }
1492
1493        ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1494             trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1495
1496        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1497
1498        return true;
1499    }
1500
1501    void postQueueAccessUnit(
1502            size_t trackIndex, const sp<ABuffer> &accessUnit) {
1503        sp<AMessage> msg = mNotify->dup();
1504        msg->setInt32("what", kWhatAccessUnit);
1505        msg->setSize("trackIndex", trackIndex);
1506        msg->setBuffer("accessUnit", accessUnit);
1507        msg->post();
1508    }
1509
1510    void postQueueEOS(size_t trackIndex, status_t finalResult) {
1511        sp<AMessage> msg = mNotify->dup();
1512        msg->setInt32("what", kWhatEOS);
1513        msg->setSize("trackIndex", trackIndex);
1514        msg->setInt32("finalResult", finalResult);
1515        msg->post();
1516    }
1517
1518    void postQueueSeekDiscontinuity(size_t trackIndex) {
1519        sp<AMessage> msg = mNotify->dup();
1520        msg->setInt32("what", kWhatSeekDiscontinuity);
1521        msg->setSize("trackIndex", trackIndex);
1522        msg->post();
1523    }
1524
1525    void postNormalPlayTimeMapping(
1526            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1527        sp<AMessage> msg = mNotify->dup();
1528        msg->setInt32("what", kWhatNormalPlayTimeMapping);
1529        msg->setSize("trackIndex", trackIndex);
1530        msg->setInt32("rtpTime", rtpTime);
1531        msg->setInt64("nptUs", nptUs);
1532        msg->post();
1533    }
1534
1535    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1536};
1537
1538}  // namespace android
1539
1540#endif  // MY_HANDLER_H_
1541