StreamingSource.cpp revision 093024bfa271988655327e0fb761b581afa8bc11
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "StreamingSource" 19#include <utils/Log.h> 20 21#include "StreamingSource.h" 22 23#include "ATSParser.h" 24#include "AnotherPacketSource.h" 25#include "NuPlayerStreamListener.h" 26 27#include <media/stagefright/foundation/ABuffer.h> 28#include <media/stagefright/foundation/ADebug.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaSource.h> 31#include <media/stagefright/MetaData.h> 32#include <media/stagefright/Utils.h> 33 34namespace android { 35 36const int32_t kNumListenerQueuePackets = 80; 37 38NuPlayer::StreamingSource::StreamingSource( 39 const sp<AMessage> ¬ify, 40 const sp<IStreamSource> &source) 41 : Source(notify), 42 mSource(source), 43 mFinalResult(OK), 44 mBuffering(false) { 45} 46 47NuPlayer::StreamingSource::~StreamingSource() { 48 if (mLooper != NULL) { 49 mLooper->unregisterHandler(id()); 50 mLooper->stop(); 51 } 52} 53 54void NuPlayer::StreamingSource::prepareAsync() { 55 if (mLooper == NULL) { 56 mLooper = new ALooper; 57 mLooper->setName("streaming"); 58 mLooper->start(); 59 60 mLooper->registerHandler(this); 61 } 62 63 notifyVideoSizeChanged(); 64 notifyFlagsChanged(0); 65 notifyPrepared(); 66} 67 68void NuPlayer::StreamingSource::start() { 69 mStreamListener = new NuPlayerStreamListener(mSource, NULL); 70 71 uint32_t sourceFlags = mSource->flags(); 72 73 uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE; 74 if (sourceFlags & IStreamSource::kFlagAlignedVideoData) { 75 parserFlags |= ATSParser::ALIGNED_VIDEO_DATA; 76 } 77 78 mTSParser = new ATSParser(parserFlags); 79 80 mStreamListener->start(); 81 82 postReadBuffer(); 83} 84 85status_t NuPlayer::StreamingSource::feedMoreTSData() { 86 return postReadBuffer(); 87} 88 89void NuPlayer::StreamingSource::onReadBuffer() { 90 for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) { 91 char buffer[188]; 92 sp<AMessage> extra; 93 ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra); 94 95 if (n == 0) { 96 ALOGI("input data EOS reached."); 97 mTSParser->signalEOS(ERROR_END_OF_STREAM); 98 setError(ERROR_END_OF_STREAM); 99 break; 100 } else if (n == INFO_DISCONTINUITY) { 101 int32_t type = ATSParser::DISCONTINUITY_TIME; 102 103 int32_t mask; 104 if (extra != NULL 105 && extra->findInt32( 106 IStreamListener::kKeyDiscontinuityMask, &mask)) { 107 if (mask == 0) { 108 ALOGE("Client specified an illegal discontinuity type."); 109 setError(ERROR_UNSUPPORTED); 110 break; 111 } 112 113 type = mask; 114 } 115 116 mTSParser->signalDiscontinuity( 117 (ATSParser::DiscontinuityType)type, extra); 118 } else if (n < 0) { 119 break; 120 } else { 121 if (buffer[0] == 0x00) { 122 // XXX legacy 123 124 if (extra == NULL) { 125 extra = new AMessage; 126 } 127 128 uint8_t type = buffer[1]; 129 130 if (type & 2) { 131 int64_t mediaTimeUs; 132 memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs)); 133 134 extra->setInt64(IStreamListener::kKeyMediaTimeUs, mediaTimeUs); 135 } 136 137 mTSParser->signalDiscontinuity( 138 ((type & 1) == 0) 139 ? ATSParser::DISCONTINUITY_TIME 140 : ATSParser::DISCONTINUITY_FORMATCHANGE, 141 extra); 142 } else { 143 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer)); 144 145 if (err != OK) { 146 ALOGE("TS Parser returned error %d", err); 147 148 mTSParser->signalEOS(err); 149 setError(err); 150 break; 151 } 152 } 153 } 154 } 155} 156 157status_t NuPlayer::StreamingSource::postReadBuffer() { 158 { 159 Mutex::Autolock _l(mBufferingLock); 160 if (mFinalResult != OK) { 161 return mFinalResult; 162 } 163 if (mBuffering) { 164 return OK; 165 } 166 mBuffering = true; 167 } 168 169 (new AMessage(kWhatReadBuffer, this))->post(); 170 return OK; 171} 172 173bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() { 174 // We're going to buffer at least 2 secs worth data on all tracks before 175 // starting playback (both at startup and after a seek). 176 177 static const int64_t kMinDurationUs = 2000000ll; 178 179 sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/); 180 sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/); 181 182 status_t err; 183 int64_t durationUs; 184 if (audioTrack != NULL 185 && (durationUs = audioTrack->getBufferedDurationUs(&err)) 186 < kMinDurationUs 187 && err == OK) { 188 ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)", 189 durationUs / 1E6); 190 return false; 191 } 192 193 if (videoTrack != NULL 194 && (durationUs = videoTrack->getBufferedDurationUs(&err)) 195 < kMinDurationUs 196 && err == OK) { 197 ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)", 198 durationUs / 1E6); 199 return false; 200 } 201 202 return true; 203} 204 205void NuPlayer::StreamingSource::setError(status_t err) { 206 Mutex::Autolock _l(mBufferingLock); 207 mFinalResult = err; 208} 209 210sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) { 211 if (mTSParser == NULL) { 212 return NULL; 213 } 214 215 sp<MediaSource> source = mTSParser->getSource( 216 audio ? ATSParser::AUDIO : ATSParser::VIDEO); 217 218 return static_cast<AnotherPacketSource *>(source.get()); 219} 220 221sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) { 222 sp<AnotherPacketSource> source = getSource(audio); 223 224 sp<AMessage> format = new AMessage; 225 if (source == NULL) { 226 format->setInt32("err", -EWOULDBLOCK); 227 return format; 228 } 229 230 sp<MetaData> meta = source->getFormat(); 231 status_t err = convertMetaDataToMessage(meta, &format); 232 if (err != OK) { 233 format->setInt32("err", err); 234 } 235 return format; 236} 237 238status_t NuPlayer::StreamingSource::dequeueAccessUnit( 239 bool audio, sp<ABuffer> *accessUnit) { 240 sp<AnotherPacketSource> source = getSource(audio); 241 242 if (source == NULL) { 243 return -EWOULDBLOCK; 244 } 245 246 if (!haveSufficientDataOnAllTracks()) { 247 postReadBuffer(); 248 } 249 250 status_t finalResult; 251 if (!source->hasBufferAvailable(&finalResult)) { 252 return finalResult == OK ? -EWOULDBLOCK : finalResult; 253 } 254 255 status_t err = source->dequeueAccessUnit(accessUnit); 256 257#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0 258 if (err == OK) { 259 int64_t timeUs; 260 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 261 ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs); 262 } 263#endif 264 265 return err; 266} 267 268bool NuPlayer::StreamingSource::isRealTime() const { 269 return mSource->flags() & IStreamSource::kFlagIsRealTimeData; 270} 271 272void NuPlayer::StreamingSource::onMessageReceived( 273 const sp<AMessage> &msg) { 274 switch (msg->what()) { 275 case kWhatReadBuffer: 276 { 277 onReadBuffer(); 278 279 { 280 Mutex::Autolock _l(mBufferingLock); 281 mBuffering = false; 282 } 283 break; 284 } 285 default: 286 { 287 TRESPASS(); 288 } 289 } 290} 291 292 293} // namespace android 294 295