1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "StreamingSource" 19#include <utils/Log.h> 20 21#include "StreamingSource.h" 22 23#include "ATSParser.h" 24#include "AnotherPacketSource.h" 25#include "NuPlayerStreamListener.h" 26 27#include <media/MediaSource.h> 28#include <media/stagefright/foundation/ABuffer.h> 29#include <media/stagefright/foundation/ADebug.h> 30#include <media/stagefright/foundation/AMessage.h> 31#include <media/stagefright/foundation/MediaKeys.h> 32#include <media/stagefright/MetaData.h> 33#include <media/stagefright/Utils.h> 34 35namespace android { 36 37const int32_t kNumListenerQueuePackets = 80; 38 39NuPlayer::StreamingSource::StreamingSource( 40 const sp<AMessage> ¬ify, 41 const sp<IStreamSource> &source) 42 : Source(notify), 43 mSource(source), 44 mFinalResult(OK), 45 mBuffering(false) { 46} 47 48NuPlayer::StreamingSource::~StreamingSource() { 49 if (mLooper != NULL) { 50 mLooper->unregisterHandler(id()); 51 mLooper->stop(); 52 } 53} 54 55status_t NuPlayer::StreamingSource::getBufferingSettings( 56 BufferingSettings *buffering /* nonnull */) { 57 *buffering = BufferingSettings(); 58 return OK; 59} 60 61status_t NuPlayer::StreamingSource::setBufferingSettings( 62 const BufferingSettings & /* buffering */) { 63 return OK; 64} 65 66void NuPlayer::StreamingSource::prepareAsync() { 67 if (mLooper == NULL) { 68 mLooper = new ALooper; 69 mLooper->setName("streaming"); 70 mLooper->start(); 71 72 mLooper->registerHandler(this); 73 } 74 75 notifyVideoSizeChanged(); 76 notifyFlagsChanged(0); 77 notifyPrepared(); 78} 79 80void NuPlayer::StreamingSource::start() { 81 mStreamListener = new NuPlayerStreamListener(mSource, NULL); 82 83 uint32_t sourceFlags = mSource->flags(); 84 85 uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE; 86 if (sourceFlags & IStreamSource::kFlagAlignedVideoData) { 87 parserFlags |= ATSParser::ALIGNED_VIDEO_DATA; 88 } 89 90 mTSParser = new ATSParser(parserFlags); 91 92 mStreamListener->start(); 93 94 postReadBuffer(); 95} 96 97status_t NuPlayer::StreamingSource::feedMoreTSData() { 98 return postReadBuffer(); 99} 100 101void NuPlayer::StreamingSource::onReadBuffer() { 102 for (int32_t i = 0; i < kNumListenerQueuePackets; ++i) { 103 char buffer[188]; 104 sp<AMessage> extra; 105 ssize_t n = mStreamListener->read(buffer, sizeof(buffer), &extra); 106 107 if (n == 0) { 108 ALOGI("input data EOS reached."); 109 mTSParser->signalEOS(ERROR_END_OF_STREAM); 110 setError(ERROR_END_OF_STREAM); 111 break; 112 } else if (n == INFO_DISCONTINUITY) { 113 int32_t type = ATSParser::DISCONTINUITY_TIME; 114 115 int32_t mask; 116 if (extra != NULL 117 && extra->findInt32( 118 kIStreamListenerKeyDiscontinuityMask, &mask)) { 119 if (mask == 0) { 120 ALOGE("Client specified an illegal discontinuity type."); 121 setError(ERROR_UNSUPPORTED); 122 break; 123 } 124 125 type = mask; 126 } 127 128 mTSParser->signalDiscontinuity( 129 (ATSParser::DiscontinuityType)type, extra); 130 } else if (n < 0) { 131 break; 132 } else { 133 if (buffer[0] == 0x00) { 134 // XXX legacy 135 136 if (extra == NULL) { 137 extra = new AMessage; 138 } 139 140 uint8_t type = buffer[1]; 141 142 if (type & 2) { 143 int64_t mediaTimeUs; 144 memcpy(&mediaTimeUs, &buffer[2], sizeof(mediaTimeUs)); 145 146 extra->setInt64(kATSParserKeyMediaTimeUs, mediaTimeUs); 147 } 148 149 mTSParser->signalDiscontinuity( 150 ((type & 1) == 0) 151 ? ATSParser::DISCONTINUITY_TIME 152 : ATSParser::DISCONTINUITY_FORMATCHANGE, 153 extra); 154 } else { 155 status_t err = mTSParser->feedTSPacket(buffer, sizeof(buffer)); 156 157 if (err != OK) { 158 ALOGE("TS Parser returned error %d", err); 159 160 mTSParser->signalEOS(err); 161 setError(err); 162 break; 163 } 164 } 165 } 166 } 167} 168 169status_t NuPlayer::StreamingSource::postReadBuffer() { 170 { 171 Mutex::Autolock _l(mBufferingLock); 172 if (mFinalResult != OK) { 173 return mFinalResult; 174 } 175 if (mBuffering) { 176 return OK; 177 } 178 mBuffering = true; 179 } 180 181 (new AMessage(kWhatReadBuffer, this))->post(); 182 return OK; 183} 184 185bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() { 186 // We're going to buffer at least 2 secs worth data on all tracks before 187 // starting playback (both at startup and after a seek). 188 189 static const int64_t kMinDurationUs = 2000000ll; 190 191 sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/); 192 sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/); 193 194 status_t err; 195 int64_t durationUs; 196 if (audioTrack != NULL 197 && (durationUs = audioTrack->getBufferedDurationUs(&err)) 198 < kMinDurationUs 199 && err == OK) { 200 ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)", 201 durationUs / 1E6); 202 return false; 203 } 204 205 if (videoTrack != NULL 206 && (durationUs = videoTrack->getBufferedDurationUs(&err)) 207 < kMinDurationUs 208 && err == OK) { 209 ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)", 210 durationUs / 1E6); 211 return false; 212 } 213 214 return true; 215} 216 217void NuPlayer::StreamingSource::setError(status_t err) { 218 Mutex::Autolock _l(mBufferingLock); 219 mFinalResult = err; 220} 221 222sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) { 223 if (mTSParser == NULL) { 224 return NULL; 225 } 226 227 sp<MediaSource> source = mTSParser->getSource( 228 audio ? ATSParser::AUDIO : ATSParser::VIDEO); 229 230 return static_cast<AnotherPacketSource *>(source.get()); 231} 232 233sp<AMessage> NuPlayer::StreamingSource::getFormat(bool audio) { 234 sp<AnotherPacketSource> source = getSource(audio); 235 236 sp<AMessage> format = new AMessage; 237 if (source == NULL) { 238 format->setInt32("err", -EWOULDBLOCK); 239 return format; 240 } 241 242 sp<MetaData> meta = source->getFormat(); 243 if (meta == NULL) { 244 format->setInt32("err", -EWOULDBLOCK); 245 return format; 246 } 247 status_t err = convertMetaDataToMessage(meta, &format); 248 if (err != OK) { // format may have been cleared on error 249 return NULL; 250 } 251 return format; 252} 253 254status_t NuPlayer::StreamingSource::dequeueAccessUnit( 255 bool audio, sp<ABuffer> *accessUnit) { 256 sp<AnotherPacketSource> source = getSource(audio); 257 258 if (source == NULL) { 259 return -EWOULDBLOCK; 260 } 261 262 if (!haveSufficientDataOnAllTracks()) { 263 postReadBuffer(); 264 } 265 266 status_t finalResult; 267 if (!source->hasBufferAvailable(&finalResult)) { 268 return finalResult == OK ? -EWOULDBLOCK : finalResult; 269 } 270 271 status_t err = source->dequeueAccessUnit(accessUnit); 272 273#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0 274 if (err == OK) { 275 int64_t timeUs; 276 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 277 ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs); 278 } 279#endif 280 281 return err; 282} 283 284bool NuPlayer::StreamingSource::isRealTime() const { 285 return mSource->flags() & IStreamSource::kFlagIsRealTimeData; 286} 287 288void NuPlayer::StreamingSource::onMessageReceived( 289 const sp<AMessage> &msg) { 290 switch (msg->what()) { 291 case kWhatReadBuffer: 292 { 293 onReadBuffer(); 294 295 { 296 Mutex::Autolock _l(mBufferingLock); 297 mBuffering = false; 298 } 299 break; 300 } 301 default: 302 { 303 TRESPASS(); 304 } 305 } 306} 307 308 309} // namespace android 310 311