StreamingSource.cpp revision 8cf4ced8d25e9b1b56b69b544339acc1550e4038
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "StreamingSource"
19#include <utils/Log.h>
20
21#include "StreamingSource.h"
22
23#include "ATSParser.h"
24#include "AnotherPacketSource.h"
25#include "NuPlayerStreamListener.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaSource.h>
31#include <media/stagefright/MetaData.h>
32
33namespace android {
34
// Upper bound on how many reads from the stream listener a single
// onReadBuffer() pass performs before it returns.
static const int32_t kNumListenerQueuePackets = 80;
36
37NuPlayer::StreamingSource::StreamingSource(
38        const sp<AMessage> &notify,
39        const sp<IStreamSource> &source)
40    : Source(notify),
41      mSource(source),
42      mFinalResult(OK),
43      mBuffering(false) {
44}
45
46NuPlayer::StreamingSource::~StreamingSource() {
47    if (mLooper != NULL) {
48        mLooper->unregisterHandler(id());
49        mLooper->stop();
50    }
51}
52
53void NuPlayer::StreamingSource::prepareAsync() {
54    if (mLooper == NULL) {
55        mLooper = new ALooper;
56        mLooper->setName("streaming");
57        mLooper->start();
58
59        mLooper->registerHandler(this);
60    }
61
62    notifyVideoSizeChanged();
63    notifyFlagsChanged(0);
64    notifyPrepared();
65}
66
67void NuPlayer::StreamingSource::start() {
68    mStreamListener = new NuPlayerStreamListener(mSource, NULL);
69
70    uint32_t sourceFlags = mSource->flags();
71
72    uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
73    if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
74        parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
75    }
76
77    mTSParser = new ATSParser(parserFlags);
78
79    mStreamListener->start();
80
81    postReadBuffer();
82}
83
84status_t NuPlayer::StreamingSource::feedMoreTSData() {
85    return postReadBuffer();
86}
87
88void NuPlayer::StreamingSource::onReadBuffer() {
89    for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
90        char buffer[188];
91        sp<AMessage> extra;
92        ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
93
94        if (n == 0) {
95            ALOGI("input data EOS reached.");
96            mTSParser->signalEOS(ERROR_END_OF_STREAM);
97            setError(ERROR_END_OF_STREAM);
98            break;
99        } else if (n == INFO_DISCONTINUITY) {
100            int32_t type = ATSParser::DISCONTINUITY_TIME;
101
102            int32_t mask;
103            if (extra != NULL
104                    && extra->findInt32(
105                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
106                if (mask == 0) {
107                    ALOGE("Client specified an illegal discontinuity type.");
108                    setError(ERROR_UNSUPPORTED);
109                    break;
110                }
111
112                type = mask;
113            }
114
115            mTSParser->signalDiscontinuity(
116                    (ATSParser::DiscontinuityType)type, extra);
117        } else if (n < 0) {
118            break;
119        } else {
120            if (buffer[0] == 0x00) {
121                // XXX legacy
122
123                if (extra == NULL) {
124                    extra = new AMessage;
125                }
126
127                uint8_t type = buffer[1];
128
129                if (type & 2) {
130                    int64_t mediaTimeUs;
131                    memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
132
133                    extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
134                }
135
136                mTSParser->signalDiscontinuity(
137                        ((type & 1) == 0)
138                            ? ATSParser::DISCONTINUITY_TIME
139                            : ATSParser::DISCONTINUITY_FORMATCHANGE,
140                        extra);
141            } else {
142                status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
143
144                if (err != OK) {
145                    ALOGE("TS Parser returned error %d", err);
146
147                    mTSParser->signalEOS(err);
148                    setError(err);
149                    break;
150                }
151            }
152        }
153    }
154}
155
156status_t NuPlayer::StreamingSource::postReadBuffer() {
157    {
158        Mutex::Autolock _l(mBufferingLock);
159        if (mFinalResult != OK) {
160            return mFinalResult;
161        }
162        if (mBuffering) {
163            return OK;
164        }
165        mBuffering = true;
166    }
167
168    (new AMessage(kWhatReadBuffer, this))->post();
169    return OK;
170}
171
172bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
173    // We're going to buffer at least 2 secs worth data on all tracks before
174    // starting playback (both at startup and after a seek).
175
176    static const int64_t kMinDurationUs = 2000000ll;
177
178    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
179    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
180
181    status_t err;
182    int64_t durationUs;
183    if (audioTrack != NULL
184            && (durationUs = audioTrack->getBufferedDurationUs(&err))
185                    < kMinDurationUs
186            && err == OK) {
187        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
188              durationUs / 1E6);
189        return false;
190    }
191
192    if (videoTrack != NULL
193            && (durationUs = videoTrack->getBufferedDurationUs(&err))
194                    < kMinDurationUs
195            && err == OK) {
196        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
197              durationUs / 1E6);
198        return false;
199    }
200
201    return true;
202}
203
204void NuPlayer::StreamingSource::setError(status_t err) {
205    Mutex::Autolock _l(mBufferingLock);
206    mFinalResult = err;
207}
208
209sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
210    if (mTSParser == NULL) {
211        return NULL;
212    }
213
214    sp<MediaSource> source = mTSParser->getSource(
215            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
216
217    return static_cast<AnotherPacketSource *>(source.get());
218}
219
220sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
221    sp<AnotherPacketSource> source = getSource(audio);
222
223    if (source == NULL) {
224        return NULL;
225    }
226
227    return source->getFormat();
228}
229
230status_t NuPlayer::StreamingSource::dequeueAccessUnit(
231        bool audio, sp<ABuffer> *accessUnit) {
232    sp<AnotherPacketSource> source = getSource(audio);
233
234    if (source == NULL) {
235        return -EWOULDBLOCK;
236    }
237
238    if (!haveSufficientDataOnAllTracks()) {
239        postReadBuffer();
240    }
241
242    status_t finalResult;
243    if (!source->hasBufferAvailable(&finalResult)) {
244        return finalResult == OK ? -EWOULDBLOCK : finalResult;
245    }
246
247    status_t err = source->dequeueAccessUnit(accessUnit);
248
249#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
250    if (err == OK) {
251        int64_t timeUs;
252        CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
253        ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
254    }
255#endif
256
257    return err;
258}
259
260bool NuPlayer::StreamingSource::isRealTime() const {
261    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
262}
263
264void NuPlayer::StreamingSource::onMessageReceived(
265        const sp<AMessage> &msg) {
266    switch (msg->what()) {
267        case kWhatReadBuffer:
268        {
269            onReadBuffer();
270
271            {
272                Mutex::Autolock _l(mBufferingLock);
273                mBuffering = false;
274            }
275            break;
276        }
277        default:
278        {
279            TRESPASS();
280        }
281    }
282}
283
284
285}  // namespace android
286
287