StreamingSource.cpp revision 48fa06d1e80a872c7495804979256e021e566ae0
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "StreamingSource"
19#include <utils/Log.h>
20
21#include "StreamingSource.h"
22
23#include "ATSParser.h"
24#include "AnotherPacketSource.h"
25#include "NuPlayerStreamListener.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaSource.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33
34namespace android {
35
// Upper bound on the number of reads from the stream listener that a single
// onReadBuffer() pass performs before returning.
static const int32_t kNumListenerQueuePackets = 80;
37
38NuPlayer::StreamingSource::StreamingSource(
39        const sp<AMessage> &notify,
40        const sp<IStreamSource> &source)
41    : Source(notify),
42      mSource(source),
43      mFinalResult(OK),
44      mBuffering(false) {
45}
46
47NuPlayer::StreamingSource::~StreamingSource() {
48    if (mLooper != NULL) {
49        mLooper->unregisterHandler(id());
50        mLooper->stop();
51    }
52}
53
54status_t NuPlayer::StreamingSource::getDefaultBufferingSettings(
55        BufferingSettings *buffering /* nonnull */) {
56    *buffering = BufferingSettings();
57    return OK;
58}
59
60status_t NuPlayer::StreamingSource::setBufferingSettings(
61        const BufferingSettings &buffering) {
62    if (buffering.mInitialBufferingMode != BUFFERING_MODE_NONE
63            || buffering.mRebufferingMode != BUFFERING_MODE_NONE) {
64        return BAD_VALUE;
65    }
66
67    return OK;
68}
69
70void NuPlayer::StreamingSource::prepareAsync() {
71    if (mLooper == NULL) {
72        mLooper = new ALooper;
73        mLooper->setName("streaming");
74        mLooper->start();
75
76        mLooper->registerHandler(this);
77    }
78
79    notifyVideoSizeChanged();
80    notifyFlagsChanged(0);
81    notifyPrepared();
82}
83
84void NuPlayer::StreamingSource::start() {
85    mStreamListener = new NuPlayerStreamListener(mSource, NULL);
86
87    uint32_t sourceFlags = mSource->flags();
88
89    uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
90    if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
91        parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
92    }
93
94    mTSParser = new ATSParser(parserFlags);
95
96    mStreamListener->start();
97
98    postReadBuffer();
99}
100
101status_t NuPlayer::StreamingSource::feedMoreTSData() {
102    return postReadBuffer();
103}
104
105void NuPlayer::StreamingSource::onReadBuffer() {
106    for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
107        char buffer[188];
108        sp<AMessage> extra;
109        ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
110
111        if (n == 0) {
112            ALOGI("input data EOS reached.");
113            mTSParser->signalEOS(ERROR_END_OF_STREAM);
114            setError(ERROR_END_OF_STREAM);
115            break;
116        } else if (n == INFO_DISCONTINUITY) {
117            int32_t type = ATSParser::DISCONTINUITY_TIME;
118
119            int32_t mask;
120            if (extra != NULL
121                    && extra->findInt32(
122                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
123                if (mask == 0) {
124                    ALOGE("Client specified an illegal discontinuity type.");
125                    setError(ERROR_UNSUPPORTED);
126                    break;
127                }
128
129                type = mask;
130            }
131
132            mTSParser->signalDiscontinuity(
133                    (ATSParser::DiscontinuityType)type, extra);
134        } else if (n < 0) {
135            break;
136        } else {
137            if (buffer[0] == 0x00) {
138                // XXX legacy
139
140                if (extra == NULL) {
141                    extra = new AMessage;
142                }
143
144                uint8_t type = buffer[1];
145
146                if (type & 2) {
147                    int64_t mediaTimeUs;
148                    memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
149
150                    extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
151                }
152
153                mTSParser->signalDiscontinuity(
154                        ((type & 1) == 0)
155                            ? ATSParser::DISCONTINUITY_TIME
156                            : ATSParser::DISCONTINUITY_FORMATCHANGE,
157                        extra);
158            } else {
159                status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
160
161                if (err != OK) {
162                    ALOGE("TS Parser returned error %d", err);
163
164                    mTSParser->signalEOS(err);
165                    setError(err);
166                    break;
167                }
168            }
169        }
170    }
171}
172
173status_t NuPlayer::StreamingSource::postReadBuffer() {
174    {
175        Mutex::Autolock _l(mBufferingLock);
176        if (mFinalResult != OK) {
177            return mFinalResult;
178        }
179        if (mBuffering) {
180            return OK;
181        }
182        mBuffering = true;
183    }
184
185    (new AMessage(kWhatReadBuffer, this))->post();
186    return OK;
187}
188
189bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
190    // We're going to buffer at least 2 secs worth data on all tracks before
191    // starting playback (both at startup and after a seek).
192
193    static const int64_t kMinDurationUs = 2000000ll;
194
195    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
196    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
197
198    status_t err;
199    int64_t durationUs;
200    if (audioTrack != NULL
201            && (durationUs = audioTrack->getBufferedDurationUs(&err))
202                    < kMinDurationUs
203            && err == OK) {
204        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
205              durationUs / 1E6);
206        return false;
207    }
208
209    if (videoTrack != NULL
210            && (durationUs = videoTrack->getBufferedDurationUs(&err))
211                    < kMinDurationUs
212            && err == OK) {
213        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
214              durationUs / 1E6);
215        return false;
216    }
217
218    return true;
219}
220
221void NuPlayer::StreamingSource::setError(status_t err) {
222    Mutex::Autolock _l(mBufferingLock);
223    mFinalResult = err;
224}
225
226sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
227    if (mTSParser == NULL) {
228        return NULL;
229    }
230
231    sp<MediaSource> source = mTSParser->getSource(
232            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
233
234    return static_cast<AnotherPacketSource *>(source.get());
235}
236
237sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) {
238    sp<AnotherPacketSource> source = getSource(audio);
239
240    sp<AMessage> format = new AMessage;
241    if (source == NULL) {
242        format->setInt32("err", -EWOULDBLOCK);
243        return format;
244    }
245
246    sp<MetaData> meta = source->getFormat();
247    if (meta == NULL) {
248        format->setInt32("err", -EWOULDBLOCK);
249        return format;
250    }
251    status_t err = convertMetaDataToMessage(meta, &format);
252    if (err != OK) { // format may have been cleared on error
253        format = new AMessage;
254        format->setInt32("err", err);
255    }
256    return format;
257}
258
259status_t NuPlayer::StreamingSource::dequeueAccessUnit(
260        bool audio, sp<ABuffer> *accessUnit) {
261    sp<AnotherPacketSource> source = getSource(audio);
262
263    if (source == NULL) {
264        return -EWOULDBLOCK;
265    }
266
267    if (!haveSufficientDataOnAllTracks()) {
268        postReadBuffer();
269    }
270
271    status_t finalResult;
272    if (!source->hasBufferAvailable(&finalResult)) {
273        return finalResult == OK ? -EWOULDBLOCK : finalResult;
274    }
275
276    status_t err = source->dequeueAccessUnit(accessUnit);
277
278#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
279    if (err == OK) {
280        int64_t timeUs;
281        CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
282        ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
283    }
284#endif
285
286    return err;
287}
288
289bool NuPlayer::StreamingSource::isRealTime() const {
290    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
291}
292
293void NuPlayer::StreamingSource::onMessageReceived(
294        const sp<AMessage> &msg) {
295    switch (msg->what()) {
296        case kWhatReadBuffer:
297        {
298            onReadBuffer();
299
300            {
301                Mutex::Autolock _l(mBufferingLock);
302                mBuffering = false;
303            }
304            break;
305        }
306        default:
307        {
308            TRESPASS();
309        }
310    }
311}
312
313
314}  // namespace android
315
316