StreamingSource.cpp revision 093024bfa271988655327e0fb761b581afa8bc11
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "StreamingSource"
19#include <utils/Log.h>
20
21#include "StreamingSource.h"
22
23#include "ATSParser.h"
24#include "AnotherPacketSource.h"
25#include "NuPlayerStreamListener.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaSource.h>
31#include <media/stagefright/MetaData.h>
32#include <media/stagefright/Utils.h>
33
34namespace android {
35
// Upper bound on the number of reads issued to the stream listener during a
// single onReadBuffer() pass.
static const int32_t kNumListenerQueuePackets = 80;
37
38NuPlayer::StreamingSource::StreamingSource(
39        const sp<AMessage> &notify,
40        const sp<IStreamSource> &source)
41    : Source(notify),
42      mSource(source),
43      mFinalResult(OK),
44      mBuffering(false) {
45}
46
47NuPlayer::StreamingSource::~StreamingSource() {
48    if (mLooper != NULL) {
49        mLooper->unregisterHandler(id());
50        mLooper->stop();
51    }
52}
53
54void NuPlayer::StreamingSource::prepareAsync() {
55    if (mLooper == NULL) {
56        mLooper = new ALooper;
57        mLooper->setName("streaming");
58        mLooper->start();
59
60        mLooper->registerHandler(this);
61    }
62
63    notifyVideoSizeChanged();
64    notifyFlagsChanged(0);
65    notifyPrepared();
66}
67
68void NuPlayer::StreamingSource::start() {
69    mStreamListener = new NuPlayerStreamListener(mSource, NULL);
70
71    uint32_t sourceFlags = mSource->flags();
72
73    uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
74    if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
75        parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
76    }
77
78    mTSParser = new ATSParser(parserFlags);
79
80    mStreamListener->start();
81
82    postReadBuffer();
83}
84
85status_t NuPlayer::StreamingSource::feedMoreTSData() {
86    return postReadBuffer();
87}
88
89void NuPlayer::StreamingSource::onReadBuffer() {
90    for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) {
91        char buffer[188];
92        sp<AMessage> extra;
93        ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
94
95        if (n == 0) {
96            ALOGI("input data EOS reached.");
97            mTSParser->signalEOS(ERROR_END_OF_STREAM);
98            setError(ERROR_END_OF_STREAM);
99            break;
100        } else if (n == INFO_DISCONTINUITY) {
101            int32_t type = ATSParser::DISCONTINUITY_TIME;
102
103            int32_t mask;
104            if (extra != NULL
105                    && extra->findInt32(
106                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
107                if (mask == 0) {
108                    ALOGE("Client specified an illegal discontinuity type.");
109                    setError(ERROR_UNSUPPORTED);
110                    break;
111                }
112
113                type = mask;
114            }
115
116            mTSParser->signalDiscontinuity(
117                    (ATSParser::DiscontinuityType)type, extra);
118        } else if (n < 0) {
119            break;
120        } else {
121            if (buffer[0] == 0x00) {
122                // XXX legacy
123
124                if (extra == NULL) {
125                    extra = new AMessage;
126                }
127
128                uint8_t type = buffer[1];
129
130                if (type & 2) {
131                    int64_t mediaTimeUs;
132                    memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
133
134                    extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
135                }
136
137                mTSParser->signalDiscontinuity(
138                        ((type & 1) == 0)
139                            ? ATSParser::DISCONTINUITY_TIME
140                            : ATSParser::DISCONTINUITY_FORMATCHANGE,
141                        extra);
142            } else {
143                status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
144
145                if (err != OK) {
146                    ALOGE("TS Parser returned error %d", err);
147
148                    mTSParser->signalEOS(err);
149                    setError(err);
150                    break;
151                }
152            }
153        }
154    }
155}
156
157status_t NuPlayer::StreamingSource::postReadBuffer() {
158    {
159        Mutex::Autolock _l(mBufferingLock);
160        if (mFinalResult != OK) {
161            return mFinalResult;
162        }
163        if (mBuffering) {
164            return OK;
165        }
166        mBuffering = true;
167    }
168
169    (new AMessage(kWhatReadBuffer, this))->post();
170    return OK;
171}
172
173bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
174    // We're going to buffer at least 2 secs worth data on all tracks before
175    // starting playback (both at startup and after a seek).
176
177    static const int64_t kMinDurationUs = 2000000ll;
178
179    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
180    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
181
182    status_t err;
183    int64_t durationUs;
184    if (audioTrack != NULL
185            && (durationUs = audioTrack->getBufferedDurationUs(&err))
186                    < kMinDurationUs
187            && err == OK) {
188        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
189              durationUs / 1E6);
190        return false;
191    }
192
193    if (videoTrack != NULL
194            && (durationUs = videoTrack->getBufferedDurationUs(&err))
195                    < kMinDurationUs
196            && err == OK) {
197        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
198              durationUs / 1E6);
199        return false;
200    }
201
202    return true;
203}
204
205void NuPlayer::StreamingSource::setError(status_t err) {
206    Mutex::Autolock _l(mBufferingLock);
207    mFinalResult = err;
208}
209
210sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
211    if (mTSParser == NULL) {
212        return NULL;
213    }
214
215    sp<MediaSource> source = mTSParser->getSource(
216            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
217
218    return static_cast<AnotherPacketSource *>(source.get());
219}
220
221sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) {
222    sp<AnotherPacketSource> source = getSource(audio);
223
224    sp<AMessage> format = new AMessage;
225    if (source == NULL) {
226        format->setInt32("err", -EWOULDBLOCK);
227        return format;
228    }
229
230    sp<MetaData> meta = source->getFormat();
231    status_t err = convertMetaDataToMessage(meta, &format);
232    if (err != OK) {
233        format->setInt32("err", err);
234    }
235    return format;
236}
237
238status_t NuPlayer::StreamingSource::dequeueAccessUnit(
239        bool audio, sp<ABuffer> *accessUnit) {
240    sp<AnotherPacketSource> source = getSource(audio);
241
242    if (source == NULL) {
243        return -EWOULDBLOCK;
244    }
245
246    if (!haveSufficientDataOnAllTracks()) {
247        postReadBuffer();
248    }
249
250    status_t finalResult;
251    if (!source->hasBufferAvailable(&finalResult)) {
252        return finalResult == OK ? -EWOULDBLOCK : finalResult;
253    }
254
255    status_t err = source->dequeueAccessUnit(accessUnit);
256
257#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
258    if (err == OK) {
259        int64_t timeUs;
260        CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
261        ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
262    }
263#endif
264
265    return err;
266}
267
268bool NuPlayer::StreamingSource::isRealTime() const {
269    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
270}
271
272void NuPlayer::StreamingSource::onMessageReceived(
273        const sp<AMessage> &msg) {
274    switch (msg->what()) {
275        case kWhatReadBuffer:
276        {
277            onReadBuffer();
278
279            {
280                Mutex::Autolock _l(mBufferingLock);
281                mBuffering = false;
282            }
283            break;
284        }
285        default:
286        {
287            TRESPASS();
288        }
289    }
290}
291
292
293}  // namespace android
294
295