1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "StreamingSource"
19#include <utils/Log.h>
20
21#include "StreamingSource.h"
22
23#include "ATSParser.h"
24#include "AnotherPacketSource.h"
25#include "NuPlayerStreamListener.h"
26
27#include <media/stagefright/foundation/ABuffer.h>
28#include <media/stagefright/foundation/ADebug.h>
29#include <media/stagefright/foundation/AMessage.h>
30#include <media/stagefright/MediaSource.h>
31#include <media/stagefright/MetaData.h>
32
33namespace android {
34
35NuPlayer::StreamingSource::StreamingSource(
36        const sp<AMessage> &notify,
37        const sp<IStreamSource> &source)
38    : Source(notify),
39      mSource(source),
40      mFinalResult(OK),
41      mBuffering(false) {
42}
43
44NuPlayer::StreamingSource::~StreamingSource() {
45    if (mLooper != NULL) {
46        mLooper->unregisterHandler(id());
47        mLooper->stop();
48    }
49}
50
51void NuPlayer::StreamingSource::prepareAsync() {
52    if (mLooper == NULL) {
53        mLooper = new ALooper;
54        mLooper->setName("streaming");
55        mLooper->start();
56
57        mLooper->registerHandler(this);
58    }
59
60    notifyVideoSizeChanged();
61    notifyFlagsChanged(0);
62    notifyPrepared();
63}
64
65void NuPlayer::StreamingSource::start() {
66    mStreamListener = new NuPlayerStreamListener(mSource, 0);
67
68    uint32_t sourceFlags = mSource->flags();
69
70    uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
71    if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
72        parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
73    }
74
75    mTSParser = new ATSParser(parserFlags);
76
77    mStreamListener->start();
78
79    postReadBuffer();
80}
81
82status_t NuPlayer::StreamingSource::feedMoreTSData() {
83    return postReadBuffer();
84}
85
86void NuPlayer::StreamingSource::onReadBuffer() {
87    for (int32_t i = 0; i < 50; ++i) {
88        char buffer[188];
89        sp<AMessage> extra;
90        ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra);
91
92        if (n == 0) {
93            ALOGI("input data EOS reached.");
94            mTSParser->signalEOS(ERROR_END_OF_STREAM);
95            setError(ERROR_END_OF_STREAM);
96            break;
97        } else if (n == INFO_DISCONTINUITY) {
98            int32_t type = ATSParser::DISCONTINUITY_TIME;
99
100            int32_t mask;
101            if (extra != NULL
102                    && extra->findInt32(
103                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
104                if (mask == 0) {
105                    ALOGE("Client specified an illegal discontinuity type.");
106                    setError(ERROR_UNSUPPORTED);
107                    break;
108                }
109
110                type = mask;
111            }
112
113            mTSParser->signalDiscontinuity(
114                    (ATSParser::DiscontinuityType)type, extra);
115        } else if (n < 0) {
116            break;
117        } else {
118            if (buffer[0] == 0x00) {
119                // XXX legacy
120
121                if (extra == NULL) {
122                    extra = new AMessage;
123                }
124
125                uint8_t type = buffer[1];
126
127                if (type & 2) {
128                    int64_t mediaTimeUs;
129                    memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs));
130
131                    extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs);
132                }
133
134                mTSParser->signalDiscontinuity(
135                        ((type & 1) == 0)
136                            ? ATSParser::DISCONTINUITY_TIME
137                            : ATSParser::DISCONTINUITY_FORMATCHANGE,
138                        extra);
139            } else {
140                status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer));
141
142                if (err != OK) {
143                    ALOGE("TS Parser returned error %d", err);
144
145                    mTSParser->signalEOS(err);
146                    setError(err);
147                    break;
148                }
149            }
150        }
151    }
152}
153
154status_t NuPlayer::StreamingSource::postReadBuffer() {
155    {
156        Mutex::Autolock _l(mBufferingLock);
157        if (mFinalResult != OK) {
158            return mFinalResult;
159        }
160        if (mBuffering) {
161            return OK;
162        }
163        mBuffering = true;
164    }
165
166    (new AMessage(kWhatReadBuffer, id()))->post();
167    return OK;
168}
169
170bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
171    // We're going to buffer at least 2 secs worth data on all tracks before
172    // starting playback (both at startup and after a seek).
173
174    static const int64_t kMinDurationUs = 2000000ll;
175
176    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
177    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
178
179    status_t err;
180    int64_t durationUs;
181    if (audioTrack != NULL
182            && (durationUs = audioTrack->getBufferedDurationUs(&err))
183                    < kMinDurationUs
184            && err == OK) {
185        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
186              durationUs / 1E6);
187        return false;
188    }
189
190    if (videoTrack != NULL
191            && (durationUs = videoTrack->getBufferedDurationUs(&err))
192                    < kMinDurationUs
193            && err == OK) {
194        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
195              durationUs / 1E6);
196        return false;
197    }
198
199    return true;
200}
201
202void NuPlayer::StreamingSource::setError(status_t err) {
203    Mutex::Autolock _l(mBufferingLock);
204    mFinalResult = err;
205}
206
207sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
208    if (mTSParser == NULL) {
209        return NULL;
210    }
211
212    sp<MediaSource> source = mTSParser->getSource(
213            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
214
215    return static_cast<AnotherPacketSource *>(source.get());
216}
217
218sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
219    sp<AnotherPacketSource> source = getSource(audio);
220
221    if (source == NULL) {
222        return NULL;
223    }
224
225    return source->getFormat();
226}
227
228status_t NuPlayer::StreamingSource::dequeueAccessUnit(
229        bool audio, sp<ABuffer> *accessUnit) {
230    sp<AnotherPacketSource> source = getSource(audio);
231
232    if (source == NULL) {
233        return -EWOULDBLOCK;
234    }
235
236    if (!haveSufficientDataOnAllTracks()) {
237        postReadBuffer();
238    }
239
240    status_t finalResult;
241    if (!source->hasBufferAvailable(&finalResult)) {
242        return finalResult == OK ? -EWOULDBLOCK : finalResult;
243    }
244
245    status_t err = source->dequeueAccessUnit(accessUnit);
246
247#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
248    if (err == OK) {
249        int64_t timeUs;
250        CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
251        ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
252    }
253#endif
254
255    return err;
256}
257
258bool NuPlayer::StreamingSource::isRealTime() const {
259    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
260}
261
262void NuPlayer::StreamingSource::onMessageReceived(
263        const sp<AMessage> &msg) {
264    switch (msg->what()) {
265        case kWhatReadBuffer:
266        {
267            onReadBuffer();
268
269            {
270                Mutex::Autolock _l(mBufferingLock);
271                mBuffering = false;
272            }
273            break;
274        }
275        default:
276        {
277            TRESPASS();
278        }
279    }
280}
281
282
283}  // namespace android
284
285