LiveSession.cpp revision 6f9c5e26c710dbee50e57316f1c460dda4850fa5
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22#include "HTTPDownloader.h" 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "mpeg2ts/AnotherPacketSource.h" 27 28#include <cutils/properties.h> 29#include <media/IMediaHTTPService.h> 30#include <media/stagefright/foundation/ABuffer.h> 31#include <media/stagefright/foundation/ADebug.h> 32#include <media/stagefright/foundation/AMessage.h> 33#include <media/stagefright/foundation/AUtils.h> 34#include <media/stagefright/MediaDefs.h> 35#include <media/stagefright/MetaData.h> 36#include <media/stagefright/Utils.h> 37 38#include <utils/Mutex.h> 39 40#include <ctype.h> 41#include <inttypes.h> 42 43namespace android { 44 45// static 46// Bandwidth Switch Mark Defaults 47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 50const int64_t LiveSession::kResumeThresholdUs = 100000ll; 51 52// Buffer Prepare/Ready/Underflow Marks 53const int64_t LiveSession::kReadyMarkUs = 5000000ll; 54const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 55const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 56 57struct LiveSession::BandwidthEstimator : public RefBase { 58 BandwidthEstimator(); 59 60 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 61 bool estimateBandwidth(int32_t *bandwidth, bool *isStable = NULL); 62 63private: 64 // Bandwidth estimation parameters 65 static const int32_t kMinBandwidthHistoryItems = 20; 66 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec 67 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec 68 69 struct BandwidthEntry { 70 int64_t mDelayUs; 71 size_t mNumBytes; 72 }; 73 74 Mutex mLock; 75 List<BandwidthEntry> mBandwidthHistory; 76 List<int32_t> mPrevEstimates; 77 bool mHasNewSample; 78 bool mIsStable; 79 int64_t mTotalTransferTimeUs; 80 size_t mTotalTransferBytes; 81 82 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 83}; 84 85LiveSession::BandwidthEstimator::BandwidthEstimator() : 86 mHasNewSample(false), 87 mIsStable(true), 88 mTotalTransferTimeUs(0), 89 mTotalTransferBytes(0) { 90} 91 92void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 93 size_t numBytes, int64_t delayUs) { 94 AutoMutex autoLock(mLock); 95 96 BandwidthEntry entry; 97 entry.mDelayUs = delayUs; 98 entry.mNumBytes = numBytes; 99 mTotalTransferTimeUs += delayUs; 100 mTotalTransferBytes += numBytes; 101 mBandwidthHistory.push_back(entry); 102 mHasNewSample = true; 103 104 // Remove no more than 10% of total transfer time at a time 105 // to avoid sudden jump on bandwidth estimation. There might 106 // be long blocking reads that takes up signification time, 107 // we have to keep a longer window in that case. 108 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; 109 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { 110 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; 111 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { 112 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; 113 } 114 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 115 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 116 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { 117 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 118 if (mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { 119 break; 120 } 121 mTotalTransferTimeUs -= it->mDelayUs; 122 mTotalTransferBytes -= it->mNumBytes; 123 mBandwidthHistory.erase(mBandwidthHistory.begin()); 124 } 125} 126 127bool LiveSession::BandwidthEstimator::estimateBandwidth( 128 int32_t *bandwidthBps, bool *isStable) { 129 AutoMutex autoLock(mLock); 130 131 if (mBandwidthHistory.size() < 2) { 132 return false; 133 } 134 135 if (!mHasNewSample) { 136 *bandwidthBps = *(--mPrevEstimates.end()); 137 if (isStable) { 138 *isStable = mIsStable; 139 } 140 return true; 141 } 142 143 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 144 mPrevEstimates.push_back(*bandwidthBps); 145 while (mPrevEstimates.size() > 3) { 146 mPrevEstimates.erase(mPrevEstimates.begin()); 147 } 148 mHasNewSample = false; 149 150 int32_t minEstimate = -1, maxEstimate = -1; 151 List<int32_t>::iterator it; 152 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { 153 int32_t estimate = *it; 154 if (minEstimate < 0 || minEstimate > estimate) { 155 minEstimate = estimate; 156 } 157 if (maxEstimate < 0 || maxEstimate < estimate) { 158 maxEstimate = estimate; 159 } 160 } 161 mIsStable = (maxEstimate <= minEstimate * 4 / 3); 162 if (isStable) { 163 *isStable = mIsStable; 164 } 165#if 0 166 { 167 char dumpStr[1024] = {0}; 168 size_t itemIdx = 0; 169 size_t histSize = mBandwidthHistory.size(); 170 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", 171 *bandwidthBps, mIsStable, histSize); 172 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 173 for (; it != mBandwidthHistory.end(); ++it) { 174 if (itemIdx > 50) { 175 sprintf(dumpStr + strlen(dumpStr), 176 "...(%zd more items)... }", histSize - itemIdx); 177 break; 178 } 179 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", 180 it->mNumBytes / 1024, 181 (double)it->mDelayUs * 1.0e-6, 182 (it == (--mBandwidthHistory.end())) ? "}" : ", "); 183 itemIdx++; 184 } 185 ALOGE(dumpStr); 186 } 187#endif 188 return true; 189} 190 191//static 192const char *LiveSession::getKeyForStream(StreamType type) { 193 switch (type) { 194 case STREAMTYPE_VIDEO: 195 return "timeUsVideo"; 196 case STREAMTYPE_AUDIO: 197 return "timeUsAudio"; 198 case STREAMTYPE_SUBTITLES: 199 return "timeUsSubtitle"; 200 case STREAMTYPE_METADATA: 201 return "timeUsMetadata"; // unused 202 default: 203 TRESPASS(); 204 } 205 return NULL; 206} 207 208//static 209const char *LiveSession::getNameForStream(StreamType type) { 210 switch (type) { 211 case STREAMTYPE_VIDEO: 212 return "video"; 213 case STREAMTYPE_AUDIO: 214 return "audio"; 215 case STREAMTYPE_SUBTITLES: 216 return "subs"; 217 case STREAMTYPE_METADATA: 218 return "metadata"; 219 default: 220 break; 221 } 222 return "unknown"; 223} 224 225//static 226ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 227 switch (type) { 228 case STREAMTYPE_VIDEO: 229 return ATSParser::VIDEO; 230 case STREAMTYPE_AUDIO: 231 return ATSParser::AUDIO; 232 case STREAMTYPE_METADATA: 233 return ATSParser::META; 234 case STREAMTYPE_SUBTITLES: 235 default: 236 TRESPASS(); 237 } 238 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 239} 240 241LiveSession::LiveSession( 242 const sp<AMessage> ¬ify, uint32_t flags, 243 const sp<IMediaHTTPService> &httpService) 244 : mNotify(notify), 245 mFlags(flags), 246 mHTTPService(httpService), 247 mBuffering(false), 248 mInPreparationPhase(true), 249 mPollBufferingGeneration(0), 250 mPrevBufferPercentage(-1), 251 mCurBandwidthIndex(-1), 252 mOrigBandwidthIndex(-1), 253 mLastBandwidthBps(-1ll), 254 mBandwidthEstimator(new BandwidthEstimator()), 255 mMaxWidth(720), 256 mMaxHeight(480), 257 mStreamMask(0), 258 mNewStreamMask(0), 259 mSwapMask(0), 260 mSwitchGeneration(0), 261 mSubtitleGeneration(0), 262 mLastDequeuedTimeUs(0ll), 263 mRealTimeBaseUs(0ll), 264 mReconfigurationInProgress(false), 265 mSwitchInProgress(false), 266 mUpSwitchMark(kUpSwitchMarkUs), 267 mDownSwitchMark(kDownSwitchMarkUs), 268 mUpSwitchMargin(kUpSwitchMarginUs), 269 mFirstTimeUsValid(false), 270 mFirstTimeUs(0), 271 mLastSeekTimeUs(0), 272 mHasMetadata(false) { 273 mStreams[kAudioIndex] = StreamItem("audio"); 274 mStreams[kVideoIndex] = StreamItem("video"); 275 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 276 277 for (size_t i = 0; i < kNumSources; ++i) { 278 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 279 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 280 } 281} 282 283LiveSession::~LiveSession() { 284 if (mFetcherLooper != NULL) { 285 mFetcherLooper->stop(); 286 } 287} 288 289int64_t LiveSession::calculateMediaTimeUs( 290 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 291 if (timeUs >= firstTimeUs) { 292 timeUs -= firstTimeUs; 293 } else { 294 timeUs = 0; 295 } 296 timeUs += mLastSeekTimeUs; 297 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 298 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 299 } 300 return timeUs; 301} 302 303status_t LiveSession::dequeueAccessUnit( 304 StreamType stream, sp<ABuffer> *accessUnit) { 305 status_t finalResult = OK; 306 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 307 308 ssize_t streamIdx = typeToIndex(stream); 309 if (streamIdx < 0) { 310 return BAD_VALUE; 311 } 312 const char *streamStr = getNameForStream(stream); 313 // Do not let client pull data if we don't have data packets yet. 314 // We might only have a format discontinuity queued without data. 315 // When NuPlayerDecoder dequeues the format discontinuity, it will 316 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 317 // thinks it can do seamless change, so will not shutdown decoder. 318 // When the actual format arrives, it can't handle it and get stuck. 319 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 320 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 321 streamStr, finalResult); 322 323 if (finalResult == OK) { 324 return -EAGAIN; 325 } else { 326 return finalResult; 327 } 328 } 329 330 // Let the client dequeue as long as we have buffers available 331 // Do not make pause/resume decisions here. 332 333 status_t err = packetSource->dequeueAccessUnit(accessUnit); 334 335 if (err == INFO_DISCONTINUITY) { 336 // adaptive streaming, discontinuities in the playlist 337 int32_t type; 338 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 339 340 sp<AMessage> extra; 341 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 342 extra.clear(); 343 } 344 345 ALOGI("[%s] read discontinuity of type %d, extra = %s", 346 streamStr, 347 type, 348 extra == NULL ? "NULL" : extra->debugString().c_str()); 349 } else if (err == OK) { 350 351 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 352 int64_t timeUs, originalTimeUs; 353 int32_t discontinuitySeq = 0; 354 StreamItem& strm = mStreams[streamIdx]; 355 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 356 originalTimeUs = timeUs; 357 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 358 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 359 int64_t offsetTimeUs; 360 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 361 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 362 } else { 363 offsetTimeUs = 0; 364 } 365 366 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 367 && strm.mLastDequeuedTimeUs >= 0) { 368 int64_t firstTimeUs; 369 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 370 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 371 offsetTimeUs += strm.mLastSampleDurationUs; 372 } else { 373 offsetTimeUs += strm.mLastSampleDurationUs; 374 } 375 376 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 377 strm.mCurDiscontinuitySeq = discontinuitySeq; 378 } 379 380 int32_t discard = 0; 381 int64_t firstTimeUs; 382 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 383 int64_t durUs; // approximate sample duration 384 if (timeUs > strm.mLastDequeuedTimeUs) { 385 durUs = timeUs - strm.mLastDequeuedTimeUs; 386 } else { 387 durUs = strm.mLastDequeuedTimeUs - timeUs; 388 } 389 strm.mLastSampleDurationUs = durUs; 390 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 391 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 392 firstTimeUs = timeUs; 393 } else { 394 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 395 firstTimeUs = timeUs; 396 } 397 398 strm.mLastDequeuedTimeUs = timeUs; 399 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 400 401 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 402 streamStr, (long long)timeUs, (long long)originalTimeUs); 403 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 404 mLastDequeuedTimeUs = timeUs; 405 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 406 } else if (stream == STREAMTYPE_SUBTITLES) { 407 int32_t subtitleGeneration; 408 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 409 && subtitleGeneration != mSubtitleGeneration) { 410 return -EAGAIN; 411 }; 412 (*accessUnit)->meta()->setInt32( 413 "trackIndex", mPlaylist->getSelectedIndex()); 414 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 415 } else if (stream == STREAMTYPE_METADATA) { 416 HLSTime mdTime((*accessUnit)->meta()); 417 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 418 packetSource->requeueAccessUnit((*accessUnit)); 419 return -EAGAIN; 420 } else { 421 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 422 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq); 423 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 424 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 425 } 426 } 427 } else { 428 ALOGI("[%s] encountered error %d", streamStr, err); 429 } 430 431 return err; 432} 433 434status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 435 if (!(mStreamMask & stream)) { 436 return UNKNOWN_ERROR; 437 } 438 439 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 440 441 sp<MetaData> meta = packetSource->getFormat(); 442 443 if (meta == NULL) { 444 return -EAGAIN; 445 } 446 447 if (stream == STREAMTYPE_AUDIO) { 448 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 449 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 450 } else if (stream == STREAMTYPE_VIDEO) { 451 meta->setInt32(kKeyMaxWidth, mMaxWidth); 452 meta->setInt32(kKeyMaxHeight, mMaxHeight); 453 } 454 455 return convertMetaDataToMessage(meta, format); 456} 457 458sp<HTTPDownloader> LiveSession::getHTTPDownloader() { 459 return new HTTPDownloader(mHTTPService, mExtraHeaders); 460} 461 462void LiveSession::connectAsync( 463 const char *url, const KeyedVector<String8, String8> *headers) { 464 sp<AMessage> msg = new AMessage(kWhatConnect, this); 465 msg->setString("url", url); 466 467 if (headers != NULL) { 468 msg->setPointer( 469 "headers", 470 new KeyedVector<String8, String8>(*headers)); 471 } 472 473 msg->post(); 474} 475 476status_t LiveSession::disconnect() { 477 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 478 479 sp<AMessage> response; 480 status_t err = msg->postAndAwaitResponse(&response); 481 482 return err; 483} 484 485status_t LiveSession::seekTo(int64_t timeUs) { 486 sp<AMessage> msg = new AMessage(kWhatSeek, this); 487 msg->setInt64("timeUs", timeUs); 488 489 sp<AMessage> response; 490 status_t err = msg->postAndAwaitResponse(&response); 491 492 return err; 493} 494 495bool LiveSession::checkSwitchProgress( 496 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 497 AString newUri; 498 CHECK(stopParams->findString("uri", &newUri)); 499 500 *needResumeUntil = false; 501 sp<AMessage> firstNewMeta[kMaxStreams]; 502 for (size_t i = 0; i < kMaxStreams; ++i) { 503 StreamType stream = indexToType(i); 504 if (!(mSwapMask & mNewStreamMask & stream) 505 || (mStreams[i].mNewUri != newUri)) { 506 continue; 507 } 508 if (stream == STREAMTYPE_SUBTITLES) { 509 continue; 510 } 511 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 512 513 // First, get latest dequeued meta, which is where the decoder is at. 514 // (when upswitching, we take the meta after a certain delay, so that 515 // the decoder is left with some cushion) 516 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 517 if (delayUs > 0) { 518 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 519 if (lastDequeueMeta == NULL) { 520 // this means we don't have enough cushion, try again later 521 ALOGV("[%s] up switching failed due to insufficient buffer", 522 getNameForStream(stream)); 523 return false; 524 } 525 } else { 526 // It's okay for lastDequeueMeta to be NULL here, it means the 527 // decoder hasn't even started dequeueing 528 lastDequeueMeta = source->getLatestDequeuedMeta(); 529 } 530 // Then, trim off packets at beginning of mPacketSources2 that's before 531 // the latest dequeued time. These samples are definitely too late. 532 firstNewMeta[i] = mPacketSources2.editValueAt(i) 533 ->trimBuffersBeforeMeta(lastDequeueMeta); 534 535 // Now firstNewMeta[i] is the first sample after the trim. 536 // If it's NULL, we failed because dequeue already past all samples 537 // in mPacketSource2, we have to try again. 538 if (firstNewMeta[i] == NULL) { 539 HLSTime dequeueTime(lastDequeueMeta); 540 ALOGV("[%s] dequeue time (%d, %lld) past start time", 541 getNameForStream(stream), 542 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 543 return false; 544 } 545 546 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 547 // already fetched, and see if we need to resumeUntil 548 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 549 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 550 // boundary, no need to resume as the content will look different anyways 551 if (lastEnqueueMeta != NULL) { 552 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 553 554 // no need to resume old fetcher if new fetcher started in different 555 // discontinuity sequence, as the content will look different. 556 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 557 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 558 559 // update the stopTime for resumeUntil 560 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 561 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 562 } 563 } 564 565 // if we're here, it means dequeue progress hasn't passed some samples in 566 // mPacketSource2, we can trim off the excess in mPacketSource. 567 // (old fetcher might still need to resumeUntil the start time of new fetcher) 568 for (size_t i = 0; i < kMaxStreams; ++i) { 569 StreamType stream = indexToType(i); 570 if (!(mSwapMask & mNewStreamMask & stream) 571 || (newUri != mStreams[i].mNewUri) 572 || stream == STREAMTYPE_SUBTITLES) { 573 continue; 574 } 575 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 576 } 577 578 // no resumeUntil if already underflow 579 *needResumeUntil &= !mBuffering; 580 581 return true; 582} 583 584void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 585 switch (msg->what()) { 586 case kWhatConnect: 587 { 588 onConnect(msg); 589 break; 590 } 591 592 case kWhatDisconnect: 593 { 594 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 595 596 if (mReconfigurationInProgress) { 597 break; 598 } 599 600 finishDisconnect(); 601 break; 602 } 603 604 case kWhatSeek: 605 { 606 if (mReconfigurationInProgress) { 607 msg->post(50000); 608 break; 609 } 610 611 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 612 mSeekReply = new AMessage; 613 614 onSeek(msg); 615 break; 616 } 617 618 case kWhatFetcherNotify: 619 { 620 int32_t what; 621 CHECK(msg->findInt32("what", &what)); 622 623 switch (what) { 624 case PlaylistFetcher::kWhatStarted: 625 break; 626 case PlaylistFetcher::kWhatPaused: 627 case PlaylistFetcher::kWhatStopped: 628 { 629 AString uri; 630 CHECK(msg->findString("uri", &uri)); 631 ssize_t index = mFetcherInfos.indexOfKey(uri); 632 if (index < 0) { 633 // ignore msgs from fetchers that's already gone 634 break; 635 } 636 637 ALOGV("fetcher-%d %s", 638 mFetcherInfos[index].mFetcher->getFetcherID(), 639 what == PlaylistFetcher::kWhatPaused ? 640 "paused" : "stopped"); 641 642 if (what == PlaylistFetcher::kWhatStopped) { 643 mFetcherLooper->unregisterHandler( 644 mFetcherInfos[index].mFetcher->id()); 645 mFetcherInfos.removeItemsAt(index); 646 } else if (what == PlaylistFetcher::kWhatPaused) { 647 int32_t seekMode; 648 CHECK(msg->findInt32("seekMode", &seekMode)); 649 for (size_t i = 0; i < kMaxStreams; ++i) { 650 if (mStreams[i].mUri == uri) { 651 mStreams[i].mSeekMode = (SeekMode) seekMode; 652 } 653 } 654 } 655 656 if (mContinuation != NULL) { 657 CHECK_GT(mContinuationCounter, 0); 658 if (--mContinuationCounter == 0) { 659 mContinuation->post(); 660 } 661 ALOGV("%zu fetcher(s) left", mContinuationCounter); 662 } 663 break; 664 } 665 666 case PlaylistFetcher::kWhatDurationUpdate: 667 { 668 AString uri; 669 CHECK(msg->findString("uri", &uri)); 670 671 int64_t durationUs; 672 CHECK(msg->findInt64("durationUs", &durationUs)); 673 674 ssize_t index = mFetcherInfos.indexOfKey(uri); 675 if (index >= 0) { 676 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 677 info->mDurationUs = durationUs; 678 } 679 break; 680 } 681 682 case PlaylistFetcher::kWhatTargetDurationUpdate: 683 { 684 int64_t targetDurationUs; 685 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 686 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 687 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 688 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 689 break; 690 } 691 692 case PlaylistFetcher::kWhatError: 693 { 694 status_t err; 695 CHECK(msg->findInt32("err", &err)); 696 697 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 698 699 // handle EOS on subtitle tracks independently 700 AString uri; 701 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 702 ssize_t i = mFetcherInfos.indexOfKey(uri); 703 if (i >= 0) { 704 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 705 if (fetcher != NULL) { 706 uint32_t type = fetcher->getStreamTypeMask(); 707 if (type == STREAMTYPE_SUBTITLES) { 708 mPacketSources.valueFor( 709 STREAMTYPE_SUBTITLES)->signalEOS(err);; 710 break; 711 } 712 } 713 } 714 } 715 716 if (mInPreparationPhase) { 717 postPrepared(err); 718 } 719 720 cancelBandwidthSwitch(); 721 722 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 723 724 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 725 726 mPacketSources.valueFor( 727 STREAMTYPE_SUBTITLES)->signalEOS(err); 728 729 postError(err); 730 break; 731 } 732 733 case PlaylistFetcher::kWhatStopReached: 734 { 735 ALOGV("kWhatStopReached"); 736 737 AString oldUri; 738 CHECK(msg->findString("uri", &oldUri)); 739 740 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 741 if (index < 0) { 742 break; 743 } 744 745 tryToFinishBandwidthSwitch(oldUri); 746 break; 747 } 748 749 case PlaylistFetcher::kWhatStartedAt: 750 { 751 int32_t switchGeneration; 752 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 753 754 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 755 switchGeneration, mSwitchGeneration); 756 757 if (switchGeneration != mSwitchGeneration) { 758 break; 759 } 760 761 AString uri; 762 CHECK(msg->findString("uri", &uri)); 763 764 // mark new fetcher mToBeResumed 765 ssize_t index = mFetcherInfos.indexOfKey(uri); 766 if (index >= 0) { 767 mFetcherInfos.editValueAt(index).mToBeResumed = true; 768 } 769 770 // temporarily disable packet sources to be swapped to prevent 771 // NuPlayerDecoder from dequeuing while we check progress 772 for (size_t i = 0; i < mPacketSources.size(); ++i) { 773 if ((mSwapMask & mPacketSources.keyAt(i)) 774 && uri == mStreams[i].mNewUri) { 775 mPacketSources.editValueAt(i)->enable(false); 776 } 777 } 778 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 779 // If switching up, require a cushion bigger than kUnderflowMark 780 // to avoid buffering immediately after the switch. 781 // (If we don't have that cushion we'd rather cancel and try again.) 782 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 783 bool needResumeUntil = false; 784 sp<AMessage> stopParams = msg; 785 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 786 // playback time hasn't passed startAt time 787 if (!needResumeUntil) { 788 ALOGV("finish switch"); 789 for (size_t i = 0; i < kMaxStreams; ++i) { 790 if ((mSwapMask & indexToType(i)) 791 && uri == mStreams[i].mNewUri) { 792 // have to make a copy of mStreams[i].mUri because 793 // tryToFinishBandwidthSwitch is modifying mStreams[] 794 AString oldURI = mStreams[i].mUri; 795 tryToFinishBandwidthSwitch(oldURI); 796 break; 797 } 798 } 799 } else { 800 // startAt time is after last enqueue time 801 // Resume fetcher for the original variant; the resumed fetcher should 802 // continue until the timestamps found in msg, which is stored by the 803 // new fetcher to indicate where the new variant has started buffering. 804 ALOGV("finish switch with resumeUntilAsync"); 805 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 806 const FetcherInfo &info = mFetcherInfos.valueAt(i); 807 if (info.mToBeRemoved) { 808 info.mFetcher->resumeUntilAsync(stopParams); 809 } 810 } 811 } 812 } else { 813 // playback time passed startAt time 814 if (switchUp) { 815 // if switching up, cancel and retry if condition satisfies again 816 ALOGV("cancel up switch because we're too late"); 817 cancelBandwidthSwitch(true /* resume */); 818 } else { 819 ALOGV("retry down switch at next sample"); 820 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 821 } 822 } 823 // re-enable all packet sources 824 for (size_t i = 0; i < mPacketSources.size(); ++i) { 825 mPacketSources.editValueAt(i)->enable(true); 826 } 827 828 break; 829 } 830 831 case PlaylistFetcher::kWhatPlaylistFetched: 832 { 833 onMasterPlaylistFetched(msg); 834 break; 835 } 836 837 case PlaylistFetcher::kWhatMetadataDetected: 838 { 839 if (!mHasMetadata) { 840 mHasMetadata = true; 841 sp<AMessage> notify = mNotify->dup(); 842 notify->setInt32("what", kWhatMetadataDetected); 843 notify->post(); 844 } 845 break; 846 } 847 848 default: 849 TRESPASS(); 850 } 851 852 break; 853 } 854 855 case kWhatChangeConfiguration: 856 { 857 onChangeConfiguration(msg); 858 break; 859 } 860 861 case kWhatChangeConfiguration2: 862 { 863 onChangeConfiguration2(msg); 864 break; 865 } 866 867 case kWhatChangeConfiguration3: 868 { 869 onChangeConfiguration3(msg); 870 break; 871 } 872 873 case kWhatPollBuffering: 874 { 875 int32_t generation; 876 CHECK(msg->findInt32("generation", &generation)); 877 if (generation == mPollBufferingGeneration) { 878 onPollBuffering(); 879 } 880 break; 881 } 882 883 default: 884 TRESPASS(); 885 break; 886 } 887} 888 889// static 890int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 891 if (a->mBandwidth < b->mBandwidth) { 892 return -1; 893 } else if (a->mBandwidth == b->mBandwidth) { 894 return 0; 895 } 896 897 return 1; 898} 899 900// static 901LiveSession::StreamType LiveSession::indexToType(int idx) { 902 CHECK(idx >= 0 && idx < kNumSources); 903 return (StreamType)(1 << idx); 904} 905 906// static 907ssize_t LiveSession::typeToIndex(int32_t type) { 908 switch (type) { 909 case STREAMTYPE_AUDIO: 910 return 0; 911 case STREAMTYPE_VIDEO: 912 return 1; 913 case STREAMTYPE_SUBTITLES: 914 return 2; 915 case STREAMTYPE_METADATA: 916 return 3; 917 default: 918 return -1; 919 }; 920 return -1; 921} 922 923void LiveSession::onConnect(const sp<AMessage> &msg) { 924 CHECK(msg->findString("url", &mMasterURL)); 925 926 // TODO currently we don't know if we are coming here from incognito mode 927 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); 928 929 KeyedVector<String8, String8> *headers = NULL; 930 if (!msg->findPointer("headers", (void **)&headers)) { 931 mExtraHeaders.clear(); 932 } else { 933 mExtraHeaders = *headers; 934 935 delete headers; 936 headers = NULL; 937 } 938 939 // create looper for fetchers 940 if (mFetcherLooper == NULL) { 941 mFetcherLooper = new ALooper(); 942 943 mFetcherLooper->setName("Fetcher"); 944 mFetcherLooper->start(false, false); 945 } 946 947 // create fetcher to fetch the master playlist 948 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); 949} 950 951void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) { 952 AString uri; 953 CHECK(msg->findString("uri", &uri)); 954 ssize_t index = mFetcherInfos.indexOfKey(uri); 955 if (index < 0) { 956 ALOGW("fetcher for master playlist is gone."); 957 return; 958 } 959 960 // no longer useful, remove 961 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); 962 mFetcherInfos.removeItemsAt(index); 963 964 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist)); 965 if (mPlaylist == NULL) { 966 ALOGE("unable to fetch master playlist %s.", 967 uriDebugString(mMasterURL).c_str()); 968 969 postPrepared(ERROR_IO); 970 return; 971 } 972 // We trust the content provider to make a reasonable choice of preferred 973 // initial bandwidth by listing it first in the variant playlist. 974 // At startup we really don't have a good estimate on the available 975 // network bandwidth since we haven't tranferred any data yet. Once 976 // we have we can make a better informed choice. 977 size_t initialBandwidth = 0; 978 size_t initialBandwidthIndex = 0; 979 980 int32_t maxWidth = 0; 981 int32_t maxHeight = 0; 982 983 if (mPlaylist->isVariantPlaylist()) { 984 Vector<BandwidthItem> itemsWithVideo; 985 for (size_t i = 0; i < mPlaylist->size(); ++i) { 986 BandwidthItem item; 987 988 item.mPlaylistIndex = i; 989 990 sp<AMessage> meta; 991 AString uri; 992 mPlaylist->itemAt(i, &uri, &meta); 993 994 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 995 996 int32_t width, height; 997 if (meta->findInt32("width", &width)) { 998 maxWidth = max(maxWidth, width); 999 } 1000 if (meta->findInt32("height", &height)) { 1001 maxHeight = max(maxHeight, height); 1002 } 1003 1004 mBandwidthItems.push(item); 1005 if (mPlaylist->hasType(i, "video")) { 1006 itemsWithVideo.push(item); 1007 } 1008 } 1009 // remove the audio-only variants if we have at least one with video 1010 if (!itemsWithVideo.empty() 1011 && itemsWithVideo.size() < mBandwidthItems.size()) { 1012 mBandwidthItems.clear(); 1013 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 1014 mBandwidthItems.push(itemsWithVideo[i]); 1015 } 1016 } 1017 1018 CHECK_GT(mBandwidthItems.size(), 0u); 1019 initialBandwidth = mBandwidthItems[0].mBandwidth; 1020 1021 mBandwidthItems.sort(SortByBandwidth); 1022 1023 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 1024 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 1025 initialBandwidthIndex = i; 1026 break; 1027 } 1028 } 1029 } else { 1030 // dummy item. 1031 BandwidthItem item; 1032 item.mPlaylistIndex = 0; 1033 item.mBandwidth = 0; 1034 mBandwidthItems.push(item); 1035 } 1036 1037 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 1038 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 1039 1040 mPlaylist->pickRandomMediaItems(); 1041 changeConfiguration( 1042 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 1043} 1044 1045void LiveSession::finishDisconnect() { 1046 ALOGV("finishDisconnect"); 1047 1048 // No reconfiguration is currently pending, make sure none will trigger 1049 // during disconnection either. 1050 cancelBandwidthSwitch(); 1051 1052 // cancel buffer polling 1053 cancelPollBuffering(); 1054 1055 // TRICKY: don't wait for all fetcher to be stopped when disconnecting 1056 // 1057 // Some fetchers might be stuck in connect/getSize at this point. These 1058 // operations will eventually timeout (as we have a timeout set in 1059 // MediaHTTPConnection), but we don't want to block the main UI thread 1060 // until then. Here we just need to make sure we clear all references 1061 // to the fetchers, so that when they finally exit from the blocking 1062 // operation, they can be destructed. 1063 // 1064 // There is one very tricky point though. For this scheme to work, the 1065 // fecther must hold a reference to LiveSession, so that LiveSession is 1066 // destroyed after fetcher. Otherwise LiveSession would get stuck in its 1067 // own destructor when it waits for mFetcherLooper to stop, which still 1068 // blocks main UI thread. 1069 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1070 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1071 mFetcherLooper->unregisterHandler( 1072 mFetcherInfos.valueAt(i).mFetcher->id()); 1073 } 1074 mFetcherInfos.clear(); 1075 1076 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1077 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1078 1079 mPacketSources.valueFor( 1080 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1081 1082 sp<AMessage> response = new AMessage; 1083 response->setInt32("err", OK); 1084 1085 response->postReply(mDisconnectReplyID); 1086 mDisconnectReplyID.clear(); 1087} 1088 1089sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1090 ssize_t index = mFetcherInfos.indexOfKey(uri); 1091 1092 if (index >= 0) { 1093 return NULL; 1094 } 1095 1096 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1097 notify->setString("uri", uri); 1098 notify->setInt32("switchGeneration", mSwitchGeneration); 1099 1100 FetcherInfo info; 1101 info.mFetcher = new PlaylistFetcher( 1102 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1103 info.mDurationUs = -1ll; 1104 info.mToBeRemoved = false; 1105 info.mToBeResumed = false; 1106 mFetcherLooper->registerHandler(info.mFetcher); 1107 1108 mFetcherInfos.add(uri, info); 1109 1110 return info.mFetcher; 1111} 1112 1113#if 0 1114static double uniformRand() { 1115 return (double)rand() / RAND_MAX; 1116} 1117#endif 1118 1119bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1120 ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1121 newUri ? "true" : "false", 1122 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1123 return i >= 0 1124 && ((!newUri && uri == mStreams[i].mUri) 1125 || (newUri && uri == mStreams[i].mNewUri)); 1126} 1127 1128sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1129 size_t trackIndex, bool newUri) { 1130 StreamType type = indexToType(trackIndex); 1131 sp<AnotherPacketSource> source = NULL; 1132 if (newUri) { 1133 source = mPacketSources2.valueFor(type); 1134 source->clear(); 1135 } else { 1136 source = mPacketSources.valueFor(type); 1137 }; 1138 return source; 1139} 1140 1141sp<AnotherPacketSource> LiveSession::getMetadataSource( 1142 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1143 // todo: One case where the following strategy can fail is when audio and video 1144 // are in separate playlists, both are transport streams, and the metadata 1145 // is actually contained in the audio stream. 1146 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1147 streamMask, newUri ? "true" : "false"); 1148 1149 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1150 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1151 // ... audio fetcher for audio only variant 1152 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1153 } 1154 1155 return NULL; 1156} 1157 1158bool LiveSession::resumeFetcher( 1159 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1160 ssize_t index = mFetcherInfos.indexOfKey(uri); 1161 if (index < 0) { 1162 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1163 return false; 1164 } 1165 1166 bool resume = false; 1167 sp<AnotherPacketSource> sources[kNumSources]; 1168 for (size_t i = 0; i < kMaxStreams; ++i) { 1169 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1170 resume = true; 1171 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1172 } 1173 } 1174 1175 if (resume) { 1176 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1177 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1178 1179 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1180 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1181 1182 fetcher->startAsync( 1183 sources[kAudioIndex], 1184 sources[kVideoIndex], 1185 sources[kSubtitleIndex], 1186 getMetadataSource(sources, streamMask, newUri), 1187 timeUs, -1, -1, seekMode); 1188 } 1189 1190 return resume; 1191} 1192 1193float LiveSession::getAbortThreshold( 1194 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1195 float abortThreshold = -1.0f; 1196 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1197 /* 1198 If we're switching down, we need to decide whether to 1199 1200 1) finish last segment of high-bandwidth variant, or 1201 2) abort last segment of high-bandwidth variant, and fetch an 1202 overlapping portion from low-bandwidth variant. 1203 1204 Here we try to maximize the amount of buffer left when the 1205 switch point is met. Given the following parameters: 1206 1207 B: our current buffering level in seconds 1208 T: target duration in seconds 1209 X: sample duration in seconds remain to fetch in last segment 1210 bw0: bandwidth of old variant (as specified in playlist) 1211 bw1: bandwidth of new variant (as specified in playlist) 1212 bw: measured bandwidth available 1213 1214 If we choose 1), when switch happens at the end of current 1215 segment, our buffering will be 1216 B + X - X * bw0 / bw 1217 1218 If we choose 2), when switch happens where we aborted current 1219 segment, our buffering will be 1220 B - (T - X) * bw1 / bw 1221 1222 We should only choose 1) if 1223 X/T < bw1 / (bw1 + bw0 - bw) 1224 */ 1225 1226 // Taking the measured current bandwidth at 50% face value only, 1227 // as our bandwidth estimation is a lagging indicator. Being 1228 // conservative on this, we prefer switching to lower bandwidth 1229 // unless we're really confident finishing up the last segment 1230 // of higher bandwidth will be fast. 1231 CHECK(mLastBandwidthBps >= 0); 1232 abortThreshold = 1233 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1234 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1235 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1236 - (float)mLastBandwidthBps * 0.5f); 1237 if (abortThreshold < 0.0f) { 1238 abortThreshold = -1.0f; // do not abort 1239 } 1240 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1241 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1242 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1243 mLastBandwidthBps, 1244 abortThreshold); 1245 } 1246 return abortThreshold; 1247} 1248 1249void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1250 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1251} 1252 1253size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1254 if (mBandwidthItems.size() < 2) { 1255 // shouldn't be here if we only have 1 bandwidth, check 1256 // logic to get rid of redundant bandwidth polling 1257 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1258 return 0; 1259 } 1260 1261#if 1 1262 char value[PROPERTY_VALUE_MAX]; 1263 ssize_t index = -1; 1264 if (property_get("media.httplive.bw-index", value, NULL)) { 1265 char *end; 1266 index = strtol(value, &end, 10); 1267 CHECK(end > value && *end == '\0'); 1268 1269 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1270 index = mBandwidthItems.size() - 1; 1271 } 1272 } 1273 1274 if (index < 0) { 1275 char value[PROPERTY_VALUE_MAX]; 1276 if (property_get("media.httplive.max-bw", value, NULL)) { 1277 char *end; 1278 long maxBw = strtoul(value, &end, 10); 1279 if (end > value && *end == '\0') { 1280 if (maxBw > 0 && bandwidthBps > maxBw) { 1281 ALOGV("bandwidth capped to %ld bps", maxBw); 1282 bandwidthBps = maxBw; 1283 } 1284 } 1285 } 1286 1287 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1288 1289 index = mBandwidthItems.size() - 1; 1290 while (index > 0) { 1291 // be conservative (70%) to avoid overestimating and immediately 1292 // switching down again. 1293 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1294 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1295 break; 1296 } 1297 --index; 1298 } 1299 } 1300#elif 0 1301 // Change bandwidth at random() 1302 size_t index = uniformRand() * mBandwidthItems.size(); 1303#elif 0 1304 // There's a 50% chance to stay on the current bandwidth and 1305 // a 50% chance to switch to the next higher bandwidth (wrapping around 1306 // to lowest) 1307 const size_t kMinIndex = 0; 1308 1309 static ssize_t mCurBandwidthIndex = -1; 1310 1311 size_t index; 1312 if (mCurBandwidthIndex < 0) { 1313 index = kMinIndex; 1314 } else if (uniformRand() < 0.5) { 1315 index = (size_t)mCurBandwidthIndex; 1316 } else { 1317 index = mCurBandwidthIndex + 1; 1318 if (index == mBandwidthItems.size()) { 1319 index = kMinIndex; 1320 } 1321 } 1322 mCurBandwidthIndex = index; 1323#elif 0 1324 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1325 1326 size_t index = mBandwidthItems.size() - 1; 1327 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1328 --index; 1329 } 1330#elif 1 1331 char value[PROPERTY_VALUE_MAX]; 1332 size_t index; 1333 if (property_get("media.httplive.bw-index", value, NULL)) { 1334 char *end; 1335 index = strtoul(value, &end, 10); 1336 CHECK(end > value && *end == '\0'); 1337 1338 if (index >= mBandwidthItems.size()) { 1339 index = mBandwidthItems.size() - 1; 1340 } 1341 } else { 1342 index = 0; 1343 } 1344#else 1345 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1346#endif 1347 1348 CHECK_GE(index, 0); 1349 1350 return index; 1351} 1352 1353HLSTime LiveSession::latestMediaSegmentStartTime() const { 1354 HLSTime audioTime(mPacketSources.valueFor( 1355 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1356 1357 HLSTime videoTime(mPacketSources.valueFor( 1358 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1359 1360 return audioTime < videoTime ? videoTime : audioTime; 1361} 1362 1363void LiveSession::onSeek(const sp<AMessage> &msg) { 1364 int64_t timeUs; 1365 CHECK(msg->findInt64("timeUs", &timeUs)); 1366 changeConfiguration(timeUs); 1367} 1368 1369status_t LiveSession::getDuration(int64_t *durationUs) const { 1370 int64_t maxDurationUs = -1ll; 1371 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1372 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1373 1374 if (fetcherDurationUs > maxDurationUs) { 1375 maxDurationUs = fetcherDurationUs; 1376 } 1377 } 1378 1379 *durationUs = maxDurationUs; 1380 1381 return OK; 1382} 1383 1384bool LiveSession::isSeekable() const { 1385 int64_t durationUs; 1386 return getDuration(&durationUs) == OK && durationUs >= 0; 1387} 1388 1389bool LiveSession::hasDynamicDuration() const { 1390 return false; 1391} 1392 1393size_t LiveSession::getTrackCount() const { 1394 if (mPlaylist == NULL) { 1395 return 0; 1396 } else { 1397 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0); 1398 } 1399} 1400 1401sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1402 if (mPlaylist == NULL) { 1403 return NULL; 1404 } else { 1405 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { 1406 sp<AMessage> format = new AMessage(); 1407 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); 1408 format->setString("language", "und"); 1409 format->setString("mime", MEDIA_MIMETYPE_DATA_TIMED_ID3); 1410 return format; 1411 } 1412 return mPlaylist->getTrackInfo(trackIndex); 1413 } 1414} 1415 1416status_t LiveSession::selectTrack(size_t index, bool select) { 1417 if (mPlaylist == NULL) { 1418 return INVALID_OPERATION; 1419 } 1420 1421 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1422 index, select, mSubtitleGeneration); 1423 1424 ++mSubtitleGeneration; 1425 status_t err = mPlaylist->selectTrack(index, select); 1426 if (err == OK) { 1427 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1428 msg->setInt32("pickTrack", select); 1429 msg->post(); 1430 } 1431 return err; 1432} 1433 1434ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1435 if (mPlaylist == NULL) { 1436 return -1; 1437 } else { 1438 return mPlaylist->getSelectedTrack(type); 1439 } 1440} 1441 1442void LiveSession::changeConfiguration( 1443 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1444 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1445 (long long)timeUs, bandwidthIndex, pickTrack); 1446 1447 cancelBandwidthSwitch(); 1448 1449 CHECK(!mReconfigurationInProgress); 1450 mReconfigurationInProgress = true; 1451 if (bandwidthIndex >= 0) { 1452 mOrigBandwidthIndex = mCurBandwidthIndex; 1453 mCurBandwidthIndex = bandwidthIndex; 1454 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1455 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1456 mOrigBandwidthIndex, mCurBandwidthIndex); 1457 } 1458 } 1459 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1460 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1461 1462 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1463 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1464 1465 AString URIs[kMaxStreams]; 1466 for (size_t i = 0; i < kMaxStreams; ++i) { 1467 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1468 streamMask |= indexToType(i); 1469 } 1470 } 1471 1472 // Step 1, stop and discard fetchers that are no longer needed. 1473 // Pause those that we'll reuse. 1474 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1475 // skip fetchers that are marked mToBeRemoved, 1476 // these are done and can't be reused 1477 if (mFetcherInfos[i].mToBeRemoved) { 1478 continue; 1479 } 1480 1481 const AString &uri = mFetcherInfos.keyAt(i); 1482 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1483 1484 bool discardFetcher = true, delayRemoval = false; 1485 for (size_t j = 0; j < kMaxStreams; ++j) { 1486 StreamType type = indexToType(j); 1487 if ((streamMask & type) && uri == URIs[j]) { 1488 resumeMask |= type; 1489 streamMask &= ~type; 1490 discardFetcher = false; 1491 } 1492 } 1493 // Delay fetcher removal if not picking tracks, AND old fetcher 1494 // has stream mask that overlaps new variant. (Okay to discard 1495 // old fetcher now, if completely no overlap.) 1496 if (discardFetcher && timeUs < 0ll && !pickTrack 1497 && (fetcher->getStreamTypeMask() & streamMask)) { 1498 discardFetcher = false; 1499 delayRemoval = true; 1500 } 1501 1502 if (discardFetcher) { 1503 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1504 fetcher->stopAsync(); 1505 } else { 1506 float threshold = 0.0f; // default to pause after current block (47Kbytes) 1507 bool disconnect = false; 1508 if (timeUs >= 0ll) { 1509 // seeking, no need to finish fetching 1510 disconnect = true; 1511 } else if (delayRemoval) { 1512 // adapting, abort if remaining of current segment is over threshold 1513 threshold = getAbortThreshold( 1514 mOrigBandwidthIndex, mCurBandwidthIndex); 1515 } 1516 1517 ALOGV("pausing fetcher-%d, threshold=%.2f", 1518 fetcher->getFetcherID(), threshold); 1519 fetcher->pauseAsync(threshold, disconnect); 1520 } 1521 } 1522 1523 sp<AMessage> msg; 1524 if (timeUs < 0ll) { 1525 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1526 msg = new AMessage(kWhatChangeConfiguration3, this); 1527 } else { 1528 msg = new AMessage(kWhatChangeConfiguration2, this); 1529 } 1530 msg->setInt32("streamMask", streamMask); 1531 msg->setInt32("resumeMask", resumeMask); 1532 msg->setInt32("pickTrack", pickTrack); 1533 msg->setInt64("timeUs", timeUs); 1534 for (size_t i = 0; i < kMaxStreams; ++i) { 1535 if ((streamMask | resumeMask) & indexToType(i)) { 1536 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1537 } 1538 } 1539 1540 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1541 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1542 // fetchers have completed their asynchronous operation, we'll post 1543 // mContinuation, which then is handled below in onChangeConfiguration2. 1544 mContinuationCounter = mFetcherInfos.size(); 1545 mContinuation = msg; 1546 1547 if (mContinuationCounter == 0) { 1548 msg->post(); 1549 } 1550} 1551 1552void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1553 ALOGV("onChangeConfiguration"); 1554 1555 if (!mReconfigurationInProgress) { 1556 int32_t pickTrack = 0; 1557 msg->findInt32("pickTrack", &pickTrack); 1558 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1559 } else { 1560 msg->post(1000000ll); // retry in 1 sec 1561 } 1562} 1563 1564void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1565 ALOGV("onChangeConfiguration2"); 1566 1567 mContinuation.clear(); 1568 1569 // All fetchers are either suspended or have been removed now. 1570 1571 // If we're seeking, clear all packet sources before we report 1572 // seek complete, to prevent decoder from pulling stale data. 1573 int64_t timeUs; 1574 CHECK(msg->findInt64("timeUs", &timeUs)); 1575 1576 if (timeUs >= 0) { 1577 mLastSeekTimeUs = timeUs; 1578 mLastDequeuedTimeUs = timeUs; 1579 1580 for (size_t i = 0; i < mPacketSources.size(); i++) { 1581 mPacketSources.editValueAt(i)->clear(); 1582 } 1583 1584 for (size_t i = 0; i < kMaxStreams; ++i) { 1585 mStreams[i].reset(); 1586 } 1587 1588 mDiscontinuityOffsetTimesUs.clear(); 1589 mDiscontinuityAbsStartTimesUs.clear(); 1590 1591 if (mSeekReplyID != NULL) { 1592 CHECK(mSeekReply != NULL); 1593 mSeekReply->setInt32("err", OK); 1594 mSeekReply->postReply(mSeekReplyID); 1595 mSeekReplyID.clear(); 1596 mSeekReply.clear(); 1597 } 1598 1599 // restart buffer polling after seek becauese previous 1600 // buffering position is no longer valid. 1601 restartPollBuffering(); 1602 } 1603 1604 uint32_t streamMask, resumeMask; 1605 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1606 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1607 1608 streamMask |= resumeMask; 1609 1610 AString URIs[kMaxStreams]; 1611 for (size_t i = 0; i < kMaxStreams; ++i) { 1612 if (streamMask & indexToType(i)) { 1613 const AString &uriKey = mStreams[i].uriKey(); 1614 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1615 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1616 } 1617 } 1618 1619 uint32_t changedMask = 0; 1620 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1621 // stream URI could change even if onChangeConfiguration2 is only 1622 // used for seek. Seek could happen during a bw switch, in this 1623 // case bw switch will be cancelled, but the seekTo position will 1624 // fetch from the new URI. 1625 if ((mStreamMask & streamMask & indexToType(i)) 1626 && !mStreams[i].mUri.empty() 1627 && !(URIs[i] == mStreams[i].mUri)) { 1628 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1629 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1630 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1631 if (source->getLatestDequeuedMeta() != NULL) { 1632 source->queueDiscontinuity( 1633 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1634 } 1635 } 1636 // Determine which decoders to shutdown on the player side, 1637 // a decoder has to be shutdown if its streamtype was active 1638 // before but now longer isn't. 1639 if ((mStreamMask & ~streamMask & indexToType(i))) { 1640 changedMask |= indexToType(i); 1641 } 1642 } 1643 1644 if (changedMask == 0) { 1645 // If nothing changed as far as the audio/video decoders 1646 // are concerned we can proceed. 1647 onChangeConfiguration3(msg); 1648 return; 1649 } 1650 1651 // Something changed, inform the player which will shutdown the 1652 // corresponding decoders and will post the reply once that's done. 1653 // Handling the reply will continue executing below in 1654 // onChangeConfiguration3. 1655 sp<AMessage> notify = mNotify->dup(); 1656 notify->setInt32("what", kWhatStreamsChanged); 1657 notify->setInt32("changedMask", changedMask); 1658 1659 msg->setWhat(kWhatChangeConfiguration3); 1660 msg->setTarget(this); 1661 1662 notify->setMessage("reply", msg); 1663 notify->post(); 1664} 1665 1666void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1667 mContinuation.clear(); 1668 // All remaining fetchers are still suspended, the player has shutdown 1669 // any decoders that needed it. 1670 1671 uint32_t streamMask, resumeMask; 1672 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1673 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1674 1675 mNewStreamMask = streamMask | resumeMask; 1676 1677 int64_t timeUs; 1678 int32_t pickTrack; 1679 bool switching = false; 1680 CHECK(msg->findInt64("timeUs", &timeUs)); 1681 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1682 1683 if (timeUs < 0ll) { 1684 if (!pickTrack) { 1685 // mSwapMask contains streams that are in both old and new variant, 1686 // (in mNewStreamMask & mStreamMask) but with different URIs 1687 // (not in resumeMask). 1688 // For example, old variant has video and audio in two separate 1689 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1690 // should be 0 as there is nothing to swap. We only need to stop video, 1691 // and resume audio. 1692 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1693 switching = (mSwapMask != 0); 1694 } 1695 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1696 } else { 1697 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1698 } 1699 1700 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1701 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1702 (long long)timeUs, switching, pickTrack, 1703 mStreamMask, mNewStreamMask, mSwapMask); 1704 1705 for (size_t i = 0; i < kMaxStreams; ++i) { 1706 if (streamMask & indexToType(i)) { 1707 if (switching) { 1708 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1709 } else { 1710 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1711 } 1712 } 1713 } 1714 1715 // Of all existing fetchers: 1716 // * Resume fetchers that are still needed and assign them original packet sources. 1717 // * Mark otherwise unneeded fetchers for removal. 1718 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1719 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1720 const AString &uri = mFetcherInfos.keyAt(i); 1721 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1722 ALOGV("marking fetcher-%d to be removed", 1723 mFetcherInfos[i].mFetcher->getFetcherID()); 1724 1725 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1726 } 1727 } 1728 1729 // streamMask now only contains the types that need a new fetcher created. 1730 if (streamMask != 0) { 1731 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1732 } 1733 1734 // Find out when the original fetchers have buffered up to and start the new fetchers 1735 // at a later timestamp. 1736 for (size_t i = 0; i < kMaxStreams; i++) { 1737 if (!(indexToType(i) & streamMask)) { 1738 continue; 1739 } 1740 1741 AString uri; 1742 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1743 1744 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1745 CHECK(fetcher != NULL); 1746 1747 HLSTime startTime; 1748 SeekMode seekMode = kSeekModeExactPosition; 1749 sp<AnotherPacketSource> sources[kNumSources]; 1750 1751 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1752 startTime = latestMediaSegmentStartTime(); 1753 } 1754 1755 // TRICKY: looping from i as earlier streams are already removed from streamMask 1756 for (size_t j = i; j < kMaxStreams; ++j) { 1757 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1758 if ((streamMask & indexToType(j)) && uri == streamUri) { 1759 sources[j] = mPacketSources.valueFor(indexToType(j)); 1760 1761 if (timeUs >= 0) { 1762 startTime.mTimeUs = timeUs; 1763 } else { 1764 int32_t type; 1765 sp<AMessage> meta; 1766 if (!switching) { 1767 // selecting, or adapting but no swap required 1768 meta = sources[j]->getLatestDequeuedMeta(); 1769 } else { 1770 // adapting and swap required 1771 meta = sources[j]->getLatestEnqueuedMeta(); 1772 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1773 // switching up 1774 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1775 } 1776 } 1777 1778 if ((j == kAudioIndex || j == kVideoIndex) 1779 && meta != NULL && !meta->findInt32("discontinuity", &type)) { 1780 HLSTime tmpTime(meta); 1781 if (startTime < tmpTime) { 1782 startTime = tmpTime; 1783 } 1784 } 1785 1786 if (!switching) { 1787 // selecting, or adapting but no swap required 1788 sources[j]->clear(); 1789 if (j == kSubtitleIndex) { 1790 break; 1791 } 1792 1793 ALOGV("stream[%zu]: queue format change", j); 1794 sources[j]->queueDiscontinuity( 1795 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1796 } else { 1797 // switching, queue discontinuities after resume 1798 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1799 sources[j]->clear(); 1800 // the new fetcher might be providing streams that used to be 1801 // provided by two different fetchers, if one of the fetcher 1802 // paused in the middle while the other somehow paused in next 1803 // seg, we have to start from next seg. 1804 if (seekMode < mStreams[j].mSeekMode) { 1805 seekMode = mStreams[j].mSeekMode; 1806 } 1807 } 1808 } 1809 1810 streamMask &= ~indexToType(j); 1811 } 1812 } 1813 1814 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1815 "segmentStartTimeUs %lld seekMode %d", 1816 fetcher->getFetcherID(), 1817 (long long)startTime.mTimeUs, 1818 (long long)mLastSeekTimeUs, 1819 (long long)startTime.getSegmentTimeUs(), 1820 seekMode); 1821 1822 // Set the target segment start time to the middle point of the 1823 // segment where the last sample was. 1824 // This gives a better guess if segments of the two variants are not 1825 // perfectly aligned. (If the corresponding segment in new variant 1826 // starts slightly later than that in the old variant, we still want 1827 // to pick that segment, not the one before) 1828 fetcher->startAsync( 1829 sources[kAudioIndex], 1830 sources[kVideoIndex], 1831 sources[kSubtitleIndex], 1832 getMetadataSource(sources, mNewStreamMask, switching), 1833 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1834 startTime.getSegmentTimeUs(), 1835 startTime.mSeq, 1836 seekMode); 1837 } 1838 1839 // All fetchers have now been started, the configuration change 1840 // has completed. 1841 1842 mReconfigurationInProgress = false; 1843 if (switching) { 1844 mSwitchInProgress = true; 1845 } else { 1846 mStreamMask = mNewStreamMask; 1847 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1848 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1849 mOrigBandwidthIndex, mCurBandwidthIndex); 1850 mOrigBandwidthIndex = mCurBandwidthIndex; 1851 } 1852 } 1853 1854 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1855 mSwitchInProgress, mStreamMask); 1856 1857 if (mDisconnectReplyID != NULL) { 1858 finishDisconnect(); 1859 } 1860} 1861 1862void LiveSession::swapPacketSource(StreamType stream) { 1863 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1864 1865 // transfer packets from source2 to source 1866 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1867 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1868 1869 // queue discontinuity in mPacketSource 1870 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1871 1872 // queue packets in mPacketSource2 to mPacketSource 1873 status_t finalResult = OK; 1874 sp<ABuffer> accessUnit; 1875 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1876 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1877 aps->queueAccessUnit(accessUnit); 1878 } 1879 aps2->clear(); 1880} 1881 1882void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1883 if (!mSwitchInProgress) { 1884 return; 1885 } 1886 1887 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1888 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1889 return; 1890 } 1891 1892 // Swap packet source of streams provided by old variant 1893 for (size_t idx = 0; idx < kMaxStreams; idx++) { 1894 StreamType stream = indexToType(idx); 1895 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 1896 swapPacketSource(stream); 1897 1898 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1899 ALOGW("swapping stream type %d %s to empty stream", 1900 stream, mStreams[idx].mUri.c_str()); 1901 } 1902 mStreams[idx].mUri = mStreams[idx].mNewUri; 1903 mStreams[idx].mNewUri.clear(); 1904 1905 mSwapMask &= ~stream; 1906 } 1907 } 1908 1909 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 1910 1911 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 1912 if (mSwapMask != 0) { 1913 return; 1914 } 1915 1916 // Check if new variant contains extra streams. 1917 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1918 while (extraStreams) { 1919 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1920 extraStreams &= ~stream; 1921 1922 swapPacketSource(stream); 1923 1924 ssize_t idx = typeToIndex(stream); 1925 CHECK(idx >= 0); 1926 if (mStreams[idx].mNewUri.empty()) { 1927 ALOGW("swapping extra stream type %d %s to empty stream", 1928 stream, mStreams[idx].mUri.c_str()); 1929 } 1930 mStreams[idx].mUri = mStreams[idx].mNewUri; 1931 mStreams[idx].mNewUri.clear(); 1932 } 1933 1934 // Restart new fetcher (it was paused after the first 47k block) 1935 // and let it fetch into mPacketSources (not mPacketSources2) 1936 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1937 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1938 if (info.mToBeResumed) { 1939 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 1940 info.mToBeResumed = false; 1941 } 1942 } 1943 1944 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 1945 mOrigBandwidthIndex, mCurBandwidthIndex); 1946 1947 mStreamMask = mNewStreamMask; 1948 mSwitchInProgress = false; 1949 mOrigBandwidthIndex = mCurBandwidthIndex; 1950 1951 restartPollBuffering(); 1952} 1953 1954void LiveSession::schedulePollBuffering() { 1955 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 1956 msg->setInt32("generation", mPollBufferingGeneration); 1957 msg->post(1000000ll); 1958} 1959 1960void LiveSession::cancelPollBuffering() { 1961 ++mPollBufferingGeneration; 1962 mPrevBufferPercentage = -1; 1963} 1964 1965void LiveSession::restartPollBuffering() { 1966 cancelPollBuffering(); 1967 onPollBuffering(); 1968} 1969 1970void LiveSession::onPollBuffering() { 1971 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 1972 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 1973 mSwitchInProgress, mReconfigurationInProgress, 1974 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 1975 1976 bool underflow, ready, down, up; 1977 if (checkBuffering(underflow, ready, down, up)) { 1978 if (mInPreparationPhase) { 1979 // Allow down switch even if we're still preparing. 1980 // 1981 // Some streams have a high bandwidth index as default, 1982 // when bandwidth is low, it takes a long time to buffer 1983 // to ready mark, then it immediately pauses after start 1984 // as we have to do a down switch. It's better experience 1985 // to restart from a lower index, if we detect low bw. 1986 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 1987 postPrepared(OK); 1988 } 1989 } 1990 1991 if (!mInPreparationPhase) { 1992 if (ready) { 1993 stopBufferingIfNecessary(); 1994 } else if (underflow) { 1995 startBufferingIfNecessary(); 1996 } 1997 switchBandwidthIfNeeded(up, down); 1998 } 1999 } 2000 2001 schedulePollBuffering(); 2002} 2003 2004void LiveSession::cancelBandwidthSwitch(bool resume) { 2005 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2006 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2007 if (!mSwitchInProgress) { 2008 return; 2009 } 2010 2011 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2012 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2013 if (info.mToBeRemoved) { 2014 info.mToBeRemoved = false; 2015 if (resume) { 2016 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2017 } 2018 } 2019 } 2020 2021 for (size_t i = 0; i < kMaxStreams; ++i) { 2022 AString newUri = mStreams[i].mNewUri; 2023 if (!newUri.empty()) { 2024 // clear all mNewUri matching this newUri 2025 for (size_t j = i; j < kMaxStreams; ++j) { 2026 if (mStreams[j].mNewUri == newUri) { 2027 mStreams[j].mNewUri.clear(); 2028 } 2029 } 2030 ALOGV("stopping newUri = %s", newUri.c_str()); 2031 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2032 if (index < 0) { 2033 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2034 continue; 2035 } 2036 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2037 info.mToBeRemoved = true; 2038 info.mFetcher->stopAsync(); 2039 } 2040 } 2041 2042 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2043 mOrigBandwidthIndex, mCurBandwidthIndex); 2044 2045 mSwitchGeneration++; 2046 mSwitchInProgress = false; 2047 mCurBandwidthIndex = mOrigBandwidthIndex; 2048 mSwapMask = 0; 2049} 2050 2051bool LiveSession::checkBuffering( 2052 bool &underflow, bool &ready, bool &down, bool &up) { 2053 underflow = ready = down = up = false; 2054 2055 if (mReconfigurationInProgress) { 2056 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2057 return false; 2058 } 2059 2060 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2061 activeCount = underflowCount = readyCount = downCount = upCount =0; 2062 int32_t minBufferPercent = -1; 2063 int64_t durationUs; 2064 if (getDuration(&durationUs) != OK) { 2065 durationUs = -1; 2066 } 2067 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2068 // we don't check subtitles for buffering level 2069 if (!(mStreamMask & mPacketSources.keyAt(i) 2070 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2071 continue; 2072 } 2073 // ignore streams that never had any packet queued. 2074 // (it's possible that the variant only has audio or video) 2075 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2076 if (meta == NULL) { 2077 continue; 2078 } 2079 2080 status_t finalResult; 2081 int64_t bufferedDurationUs = 2082 mPacketSources[i]->getBufferedDurationUs(&finalResult); 2083 ALOGV("[%s] buffered %lld us", 2084 getNameForStream(mPacketSources.keyAt(i)), 2085 (long long)bufferedDurationUs); 2086 if (durationUs >= 0) { 2087 int32_t percent; 2088 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2089 percent = 100; 2090 } else { 2091 percent = (int32_t)(100.0 * 2092 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2093 } 2094 if (minBufferPercent < 0 || percent < minBufferPercent) { 2095 minBufferPercent = percent; 2096 } 2097 } 2098 2099 ++activeCount; 2100 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2101 if (bufferedDurationUs > readyMark 2102 || mPacketSources[i]->isFinished(0)) { 2103 ++readyCount; 2104 } 2105 if (!mPacketSources[i]->isFinished(0)) { 2106 if (bufferedDurationUs < kUnderflowMarkUs) { 2107 ++underflowCount; 2108 } 2109 if (bufferedDurationUs > mUpSwitchMark) { 2110 ++upCount; 2111 } 2112 if (bufferedDurationUs < mDownSwitchMark) { 2113 ++downCount; 2114 } 2115 } 2116 } 2117 2118 if (minBufferPercent >= 0) { 2119 notifyBufferingUpdate(minBufferPercent); 2120 } 2121 2122 if (activeCount > 0) { 2123 up = (upCount == activeCount); 2124 down = (downCount > 0); 2125 ready = (readyCount == activeCount); 2126 underflow = (underflowCount > 0); 2127 return true; 2128 } 2129 2130 return false; 2131} 2132 2133void LiveSession::startBufferingIfNecessary() { 2134 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2135 mInPreparationPhase, mBuffering); 2136 if (!mBuffering) { 2137 mBuffering = true; 2138 2139 sp<AMessage> notify = mNotify->dup(); 2140 notify->setInt32("what", kWhatBufferingStart); 2141 notify->post(); 2142 } 2143} 2144 2145void LiveSession::stopBufferingIfNecessary() { 2146 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2147 mInPreparationPhase, mBuffering); 2148 2149 if (mBuffering) { 2150 mBuffering = false; 2151 2152 sp<AMessage> notify = mNotify->dup(); 2153 notify->setInt32("what", kWhatBufferingEnd); 2154 notify->post(); 2155 } 2156} 2157 2158void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2159 if (percentage < mPrevBufferPercentage) { 2160 percentage = mPrevBufferPercentage; 2161 } else if (percentage > 100) { 2162 percentage = 100; 2163 } 2164 2165 mPrevBufferPercentage = percentage; 2166 2167 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2168 2169 sp<AMessage> notify = mNotify->dup(); 2170 notify->setInt32("what", kWhatBufferingUpdate); 2171 notify->setInt32("percentage", percentage); 2172 notify->post(); 2173} 2174 2175/* 2176 * returns true if a bandwidth switch is actually needed (and started), 2177 * returns false otherwise 2178 */ 2179bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2180 // no need to check bandwidth if we only have 1 bandwidth settings 2181 if (mSwitchInProgress || mBandwidthItems.size() < 2) { 2182 return false; 2183 } 2184 2185 int32_t bandwidthBps; 2186 bool isStable; 2187 if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps, &isStable)) { 2188 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 2189 mLastBandwidthBps = bandwidthBps; 2190 } else { 2191 ALOGV("no bandwidth estimate."); 2192 return false; 2193 } 2194 2195 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2196 // canSwithDown and canSwitchUp can't both be true. 2197 // we only want to switch up when measured bw is 120% higher than current variant, 2198 // and we only want to switch down when measured bw is below current variant. 2199 bool canSwitchDown = bufferLow 2200 && (bandwidthBps < (int32_t)curBandwidth); 2201 bool canSwitchUp = bufferHigh 2202 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2203 2204 if (canSwitchDown || canSwitchUp) { 2205 // bandwidth estimating has some delay, if we have to downswitch when 2206 // it hasn't stabilized, be very conservative on bandwidth. 2207 if (!isStable && canSwitchDown) { 2208 bandwidthBps /= 2; 2209 } 2210 2211 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2212 2213 // it's possible that we're checking for canSwitchUp case, but the returned 2214 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2215 // of measured bw. In that case we don't want to do anything, since we have 2216 // both enough buffer and enough bw. 2217 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2218 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { 2219 // if not yet prepared, just restart again with new bw index. 2220 // this is faster and playback experience is cleaner. 2221 changeConfiguration( 2222 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2223 return true; 2224 } 2225 } 2226 return false; 2227} 2228 2229void LiveSession::postError(status_t err) { 2230 // if we reached EOS, notify buffering of 100% 2231 if (err == ERROR_END_OF_STREAM) { 2232 notifyBufferingUpdate(100); 2233 } 2234 // we'll stop buffer polling now, before that notify 2235 // stop buffering to stop the spinning icon 2236 stopBufferingIfNecessary(); 2237 cancelPollBuffering(); 2238 2239 sp<AMessage> notify = mNotify->dup(); 2240 notify->setInt32("what", kWhatError); 2241 notify->setInt32("err", err); 2242 notify->post(); 2243} 2244 2245void LiveSession::postPrepared(status_t err) { 2246 CHECK(mInPreparationPhase); 2247 2248 sp<AMessage> notify = mNotify->dup(); 2249 if (err == OK || err == ERROR_END_OF_STREAM) { 2250 notify->setInt32("what", kWhatPrepared); 2251 } else { 2252 cancelPollBuffering(); 2253 2254 notify->setInt32("what", kWhatPreparationFailed); 2255 notify->setInt32("err", err); 2256 } 2257 2258 notify->post(); 2259 2260 mInPreparationPhase = false; 2261} 2262 2263 2264} // namespace android 2265 2266