StreamingSource.cpp revision 9737d3497f6ef8bf6a1083aabd8a03569944795d
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "StreamingSource" 19#include <utils/Log.h> 20 21#include "StreamingSource.h" 22 23#include "ATSParser.h" 24#include "AnotherPacketSource.h" 25#include "NuPlayerStreamListener.h" 26 27#include <media/stagefright/foundation/ABuffer.h> 28#include <media/stagefright/foundation/ADebug.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaSource.h> 31#include <media/stagefright/MetaData.h> 32#include <media/stagefright/Utils.h> 33 34namespace android { 35 36const int32_t kNumListenerQueuePackets = 80; 37 38NuPlayer::StreamingSource::StreamingSource( 39 const sp<AMessage> ¬ify, 40 const sp<IStreamSource> &source) 41 : Source(notify), 42 mSource(source), 43 mFinalResult(OK), 44 mBuffering(false) { 45} 46 47NuPlayer::StreamingSource::~StreamingSource() { 48 if (mLooper != NULL) { 49 mLooper->unregisterHandler(id()); 50 mLooper->stop(); 51 } 52} 53 54void NuPlayer::StreamingSource::prepareAsync() { 55 if (mLooper == NULL) { 56 mLooper = new ALooper; 57 mLooper->setName("streaming"); 58 mLooper->start(); 59 60 mLooper->registerHandler(this); 61 } 62 63 notifyVideoSizeChanged(); 64 notifyFlagsChanged(0); 65 notifyPrepared(); 66} 67 68void NuPlayer::StreamingSource::start() { 69 mStreamListener = new NuPlayerStreamListener(mSource, NULL); 70 71 uint32_t sourceFlags = mSource->flags(); 72 73 uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE; 74 if (sourceFlags & IStreamSource::kFlagAlignedVideoData) { 75 parserFlags |= ATSParser::ALIGNED_VIDEO_DATA; 76 } 77 78 mTSParser = new ATSParser(parserFlags); 79 80 mStreamListener->start(); 81 82 postReadBuffer(); 83} 84 85status_t NuPlayer::StreamingSource::feedMoreTSData() { 86 return postReadBuffer(); 87} 88 89void NuPlayer::StreamingSource::onReadBuffer() { 90 for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) { 91 char buffer[188]; 92 sp<AMessage> extra; 93 ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra); 94 95 if (n == 0) { 96 ALOGI("input data EOS reached."); 97 mTSParser->signalEOS(ERROR_END_OF_STREAM); 98 setError(ERROR_END_OF_STREAM); 99 break; 100 } else if (n == INFO_DISCONTINUITY) { 101 int32_t type = ATSParser::DISCONTINUITY_TIME; 102 103 int32_t mask; 104 if (extra != NULL 105 && extra->findInt32( 106 IStreamListener::kKeyDiscontinuityMask, &mask)) { 107 if (mask == 0) { 108 ALOGE("Client specified an illegal discontinuity type."); 109 setError(ERROR_UNSUPPORTED); 110 break; 111 } 112 113 type = mask; 114 } 115 116 mTSParser->signalDiscontinuity( 117 (ATSParser::DiscontinuityType)type, extra); 118 } else if (n < 0) { 119 break; 120 } else { 121 if (buffer[0] == 0x00) { 122 // XXX legacy 123 124 if (extra == NULL) { 125 extra = new AMessage; 126 } 127 128 uint8_t type = buffer[1]; 129 130 if (type & 2) { 131 int64_t mediaTimeUs; 132 memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs)); 133 134 extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs); 135 } 136 137 mTSParser->signalDiscontinuity( 138 ((type & 1) == 0) 139 ? ATSParser::DISCONTINUITY_TIME 140 : ATSParser::DISCONTINUITY_FORMATCHANGE, 141 extra); 142 } else { 143 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer)); 144 145 if (err != OK) { 146 ALOGE("TS Parser returned error %d", err); 147 148 mTSParser->signalEOS(err); 149 setError(err); 150 break; 151 } 152 } 153 } 154 } 155} 156 157status_t NuPlayer::StreamingSource::postReadBuffer() { 158 { 159 Mutex::Autolock _l(mBufferingLock); 160 if (mFinalResult != OK) { 161 return mFinalResult; 162 } 163 if (mBuffering) { 164 return OK; 165 } 166 mBuffering = true; 167 } 168 169 (new AMessage(kWhatReadBuffer, this))->post(); 170 return OK; 171} 172 173bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() { 174 // We're going to buffer at least 2 secs worth data on all tracks before 175 // starting playback (both at startup and after a seek). 176 177 static const int64_t kMinDurationUs = 2000000ll; 178 179 sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/); 180 sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/); 181 182 status_t err; 183 int64_t durationUs; 184 if (audioTrack != NULL 185 && (durationUs = audioTrack->getBufferedDurationUs(&err)) 186 < kMinDurationUs 187 && err == OK) { 188 ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)", 189 durationUs / 1E6); 190 return false; 191 } 192 193 if (videoTrack != NULL 194 && (durationUs = videoTrack->getBufferedDurationUs(&err)) 195 < kMinDurationUs 196 && err == OK) { 197 ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)", 198 durationUs / 1E6); 199 return false; 200 } 201 202 return true; 203} 204 205void NuPlayer::StreamingSource::setError(status_t err) { 206 Mutex::Autolock _l(mBufferingLock); 207 mFinalResult = err; 208} 209 210sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) { 211 if (mTSParser == NULL) { 212 return NULL; 213 } 214 215 sp<MediaSource> source = mTSParser->getSource( 216 audio ? ATSParser::AUDIO : ATSParser::VIDEO); 217 218 return static_cast<AnotherPacketSource *>(source.get()); 219} 220 221sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) { 222 sp<AnotherPacketSource> source = getSource(audio); 223 224 sp<AMessage> format = new AMessage; 225 if (source == NULL) { 226 format->setInt32("err", -EWOULDBLOCK); 227 return format; 228 } 229 230 sp<MetaData> meta = source->getFormat(); 231 if (meta == NULL) { 232 format->setInt32("err", -EWOULDBLOCK); 233 return format; 234 } 235 status_t err = convertMetaDataToMessage(meta, &format); 236 if (err != OK) { // format may have been cleared on error 237 return NULL; 238 } 239 return format; 240} 241 242status_t NuPlayer::StreamingSource::dequeueAccessUnit( 243 bool audio, sp<ABuffer> *accessUnit) { 244 sp<AnotherPacketSource> source = getSource(audio); 245 246 if (source == NULL) { 247 return -EWOULDBLOCK; 248 } 249 250 if (!haveSufficientDataOnAllTracks()) { 251 postReadBuffer(); 252 } 253 254 status_t finalResult; 255 if (!source->hasBufferAvailable(&finalResult)) { 256 return finalResult == OK ? -EWOULDBLOCK : finalResult; 257 } 258 259 status_t err = source->dequeueAccessUnit(accessUnit); 260 261#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0 262 if (err == OK) { 263 int64_t timeUs; 264 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 265 ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs); 266 } 267#endif 268 269 return err; 270} 271 272bool NuPlayer::StreamingSource::isRealTime() const { 273 return mSource->flags() & IStreamSource::kFlagIsRealTimeData; 274} 275 276void NuPlayer::StreamingSource::onMessageReceived( 277 const sp<AMessage> &msg) { 278 switch (msg->what()) { 279 case kWhatReadBuffer: 280 { 281 onReadBuffer(); 282 283 { 284 Mutex::Autolock _l(mBufferingLock); 285 mBuffering = false; 286 } 287 break; 288 } 289 default: 290 { 291 TRESPASS(); 292 } 293 } 294} 295 296 297} // namespace android 298 299