LiveSession.cpp revision 5abbd3dcbb0bb32a3d4b90dddbcf90458967eb6f
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22#include "HTTPDownloader.h" 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "mpeg2ts/AnotherPacketSource.h" 27 28#include <cutils/properties.h> 29#include <media/IMediaHTTPService.h> 30#include <media/stagefright/foundation/ABuffer.h> 31#include <media/stagefright/foundation/ADebug.h> 32#include <media/stagefright/foundation/AMessage.h> 33#include <media/stagefright/foundation/AUtils.h> 34#include <media/stagefright/MediaDefs.h> 35#include <media/stagefright/MetaData.h> 36#include <media/stagefright/Utils.h> 37 38#include <utils/Mutex.h> 39 40#include <ctype.h> 41#include <inttypes.h> 42 43namespace android { 44 45// static 46// Bandwidth Switch Mark Defaults 47const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 48const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 49const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 50const int64_t LiveSession::kResumeThresholdUs = 100000ll; 51 52// Buffer Prepare/Ready/Underflow Marks 53const int64_t LiveSession::kReadyMarkUs = 5000000ll; 54const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 55const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 56 57struct LiveSession::BandwidthEstimator : public RefBase { 58 BandwidthEstimator(); 59 60 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 61 bool estimateBandwidth(int32_t *bandwidth, bool *isStable = NULL); 62 63private: 64 // Bandwidth estimation parameters 65 static const int32_t kMinBandwidthHistoryItems = 20; 66 static const int64_t kMinBandwidthHistoryWindowUs = 5000000ll; // 5 sec 67 static const int64_t kMaxBandwidthHistoryWindowUs = 30000000ll; // 30 sec 68 69 struct BandwidthEntry { 70 int64_t mDelayUs; 71 size_t mNumBytes; 72 }; 73 74 Mutex mLock; 75 List<BandwidthEntry> mBandwidthHistory; 76 List<int32_t> mPrevEstimates; 77 bool mHasNewSample; 78 bool mIsStable; 79 int64_t mTotalTransferTimeUs; 80 size_t mTotalTransferBytes; 81 82 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 83}; 84 85LiveSession::BandwidthEstimator::BandwidthEstimator() : 86 mHasNewSample(false), 87 mIsStable(true), 88 mTotalTransferTimeUs(0), 89 mTotalTransferBytes(0) { 90} 91 92void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 93 size_t numBytes, int64_t delayUs) { 94 AutoMutex autoLock(mLock); 95 96 BandwidthEntry entry; 97 entry.mDelayUs = delayUs; 98 entry.mNumBytes = numBytes; 99 mTotalTransferTimeUs += delayUs; 100 mTotalTransferBytes += numBytes; 101 mBandwidthHistory.push_back(entry); 102 mHasNewSample = true; 103 104 // Remove no more than 10% of total transfer time at a time 105 // to avoid sudden jump on bandwidth estimation. There might 106 // be long blocking reads that takes up signification time, 107 // we have to keep a longer window in that case. 108 int64_t bandwidthHistoryWindowUs = mTotalTransferTimeUs * 9 / 10; 109 if (bandwidthHistoryWindowUs < kMinBandwidthHistoryWindowUs) { 110 bandwidthHistoryWindowUs = kMinBandwidthHistoryWindowUs; 111 } else if (bandwidthHistoryWindowUs > kMaxBandwidthHistoryWindowUs) { 112 bandwidthHistoryWindowUs = kMaxBandwidthHistoryWindowUs; 113 } 114 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 115 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 116 while (mBandwidthHistory.size() > kMinBandwidthHistoryItems) { 117 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 118 if (mTotalTransferTimeUs - it->mDelayUs < bandwidthHistoryWindowUs) { 119 break; 120 } 121 mTotalTransferTimeUs -= it->mDelayUs; 122 mTotalTransferBytes -= it->mNumBytes; 123 mBandwidthHistory.erase(mBandwidthHistory.begin()); 124 } 125} 126 127bool LiveSession::BandwidthEstimator::estimateBandwidth( 128 int32_t *bandwidthBps, bool *isStable) { 129 AutoMutex autoLock(mLock); 130 131 if (mBandwidthHistory.size() < 2) { 132 return false; 133 } 134 135 if (!mHasNewSample) { 136 *bandwidthBps = *(--mPrevEstimates.end()); 137 if (isStable) { 138 *isStable = mIsStable; 139 } 140 return true; 141 } 142 143 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 144 mPrevEstimates.push_back(*bandwidthBps); 145 while (mPrevEstimates.size() > 3) { 146 mPrevEstimates.erase(mPrevEstimates.begin()); 147 } 148 mHasNewSample = false; 149 150 int32_t minEstimate = -1, maxEstimate = -1; 151 List<int32_t>::iterator it; 152 for (it = mPrevEstimates.begin(); it != mPrevEstimates.end(); it++) { 153 int32_t estimate = *it; 154 if (minEstimate < 0 || minEstimate > estimate) { 155 minEstimate = estimate; 156 } 157 if (maxEstimate < 0 || maxEstimate < estimate) { 158 maxEstimate = estimate; 159 } 160 } 161 mIsStable = (maxEstimate <= minEstimate * 4 / 3); 162 if (isStable) { 163 *isStable = mIsStable; 164 } 165#if 0 166 { 167 char dumpStr[1024] = {0}; 168 size_t itemIdx = 0; 169 size_t histSize = mBandwidthHistory.size(); 170 sprintf(dumpStr, "estimate bps=%d stable=%d history (n=%d): {", 171 *bandwidthBps, mIsStable, histSize); 172 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 173 for (; it != mBandwidthHistory.end(); ++it) { 174 if (itemIdx > 50) { 175 sprintf(dumpStr + strlen(dumpStr), 176 "...(%zd more items)... }", histSize - itemIdx); 177 break; 178 } 179 sprintf(dumpStr + strlen(dumpStr), "%dk/%.3fs%s", 180 it->mNumBytes / 1024, 181 (double)it->mDelayUs * 1.0e-6, 182 (it == (--mBandwidthHistory.end())) ? "}" : ", "); 183 itemIdx++; 184 } 185 ALOGE(dumpStr); 186 } 187#endif 188 return true; 189} 190 191//static 192const char *LiveSession::getKeyForStream(StreamType type) { 193 switch (type) { 194 case STREAMTYPE_VIDEO: 195 return "timeUsVideo"; 196 case STREAMTYPE_AUDIO: 197 return "timeUsAudio"; 198 case STREAMTYPE_SUBTITLES: 199 return "timeUsSubtitle"; 200 case STREAMTYPE_METADATA: 201 return "timeUsMetadata"; // unused 202 default: 203 TRESPASS(); 204 } 205 return NULL; 206} 207 208//static 209const char *LiveSession::getNameForStream(StreamType type) { 210 switch (type) { 211 case STREAMTYPE_VIDEO: 212 return "video"; 213 case STREAMTYPE_AUDIO: 214 return "audio"; 215 case STREAMTYPE_SUBTITLES: 216 return "subs"; 217 case STREAMTYPE_METADATA: 218 return "metadata"; 219 default: 220 break; 221 } 222 return "unknown"; 223} 224 225//static 226ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 227 switch (type) { 228 case STREAMTYPE_VIDEO: 229 return ATSParser::VIDEO; 230 case STREAMTYPE_AUDIO: 231 return ATSParser::AUDIO; 232 case STREAMTYPE_METADATA: 233 return ATSParser::META; 234 case STREAMTYPE_SUBTITLES: 235 default: 236 TRESPASS(); 237 } 238 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 239} 240 241LiveSession::LiveSession( 242 const sp<AMessage> ¬ify, uint32_t flags, 243 const sp<IMediaHTTPService> &httpService) 244 : mNotify(notify), 245 mFlags(flags), 246 mHTTPService(httpService), 247 mBuffering(false), 248 mInPreparationPhase(true), 249 mPollBufferingGeneration(0), 250 mPrevBufferPercentage(-1), 251 mCurBandwidthIndex(-1), 252 mOrigBandwidthIndex(-1), 253 mLastBandwidthBps(-1ll), 254 mBandwidthEstimator(new BandwidthEstimator()), 255 mMaxWidth(720), 256 mMaxHeight(480), 257 mStreamMask(0), 258 mNewStreamMask(0), 259 mSwapMask(0), 260 mSwitchGeneration(0), 261 mSubtitleGeneration(0), 262 mLastDequeuedTimeUs(0ll), 263 mRealTimeBaseUs(0ll), 264 mReconfigurationInProgress(false), 265 mSwitchInProgress(false), 266 mUpSwitchMark(kUpSwitchMarkUs), 267 mDownSwitchMark(kDownSwitchMarkUs), 268 mUpSwitchMargin(kUpSwitchMarginUs), 269 mFirstTimeUsValid(false), 270 mFirstTimeUs(0), 271 mLastSeekTimeUs(0), 272 mHasMetadata(false) { 273 mStreams[kAudioIndex] = StreamItem("audio"); 274 mStreams[kVideoIndex] = StreamItem("video"); 275 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 276 277 for (size_t i = 0; i < kNumSources; ++i) { 278 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 279 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 280 } 281} 282 283LiveSession::~LiveSession() { 284 if (mFetcherLooper != NULL) { 285 mFetcherLooper->stop(); 286 } 287} 288 289int64_t LiveSession::calculateMediaTimeUs( 290 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 291 if (timeUs >= firstTimeUs) { 292 timeUs -= firstTimeUs; 293 } else { 294 timeUs = 0; 295 } 296 timeUs += mLastSeekTimeUs; 297 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 298 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 299 } 300 return timeUs; 301} 302 303status_t LiveSession::dequeueAccessUnit( 304 StreamType stream, sp<ABuffer> *accessUnit) { 305 status_t finalResult = OK; 306 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 307 308 ssize_t streamIdx = typeToIndex(stream); 309 if (streamIdx < 0) { 310 return BAD_VALUE; 311 } 312 const char *streamStr = getNameForStream(stream); 313 // Do not let client pull data if we don't have data packets yet. 314 // We might only have a format discontinuity queued without data. 315 // When NuPlayerDecoder dequeues the format discontinuity, it will 316 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 317 // thinks it can do seamless change, so will not shutdown decoder. 318 // When the actual format arrives, it can't handle it and get stuck. 319 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 320 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 321 streamStr, finalResult); 322 323 if (finalResult == OK) { 324 return -EAGAIN; 325 } else { 326 return finalResult; 327 } 328 } 329 330 // Let the client dequeue as long as we have buffers available 331 // Do not make pause/resume decisions here. 332 333 status_t err = packetSource->dequeueAccessUnit(accessUnit); 334 335 if (err == INFO_DISCONTINUITY) { 336 // adaptive streaming, discontinuities in the playlist 337 int32_t type; 338 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 339 340 sp<AMessage> extra; 341 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 342 extra.clear(); 343 } 344 345 ALOGI("[%s] read discontinuity of type %d, extra = %s", 346 streamStr, 347 type, 348 extra == NULL ? "NULL" : extra->debugString().c_str()); 349 } else if (err == OK) { 350 351 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 352 int64_t timeUs, originalTimeUs; 353 int32_t discontinuitySeq = 0; 354 StreamItem& strm = mStreams[streamIdx]; 355 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 356 originalTimeUs = timeUs; 357 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 358 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 359 int64_t offsetTimeUs; 360 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 361 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 362 } else { 363 offsetTimeUs = 0; 364 } 365 366 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0 367 && strm.mLastDequeuedTimeUs >= 0) { 368 int64_t firstTimeUs; 369 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 370 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 371 offsetTimeUs += strm.mLastSampleDurationUs; 372 } else { 373 offsetTimeUs += strm.mLastSampleDurationUs; 374 } 375 376 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 377 strm.mCurDiscontinuitySeq = discontinuitySeq; 378 } 379 380 int32_t discard = 0; 381 int64_t firstTimeUs; 382 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 383 int64_t durUs; // approximate sample duration 384 if (timeUs > strm.mLastDequeuedTimeUs) { 385 durUs = timeUs - strm.mLastDequeuedTimeUs; 386 } else { 387 durUs = strm.mLastDequeuedTimeUs - timeUs; 388 } 389 strm.mLastSampleDurationUs = durUs; 390 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 391 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 392 firstTimeUs = timeUs; 393 } else { 394 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 395 firstTimeUs = timeUs; 396 } 397 398 strm.mLastDequeuedTimeUs = timeUs; 399 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 400 401 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 402 streamStr, (long long)timeUs, (long long)originalTimeUs); 403 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 404 mLastDequeuedTimeUs = timeUs; 405 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 406 } else if (stream == STREAMTYPE_SUBTITLES) { 407 int32_t subtitleGeneration; 408 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 409 && subtitleGeneration != mSubtitleGeneration) { 410 return -EAGAIN; 411 }; 412 (*accessUnit)->meta()->setInt32( 413 "trackIndex", mPlaylist->getSelectedIndex()); 414 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 415 } else if (stream == STREAMTYPE_METADATA) { 416 HLSTime mdTime((*accessUnit)->meta()); 417 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 418 packetSource->requeueAccessUnit((*accessUnit)); 419 return -EAGAIN; 420 } else { 421 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 422 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq); 423 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 424 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 425 } 426 } 427 } else { 428 ALOGI("[%s] encountered error %d", streamStr, err); 429 } 430 431 return err; 432} 433 434status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 435 if (!(mStreamMask & stream)) { 436 return UNKNOWN_ERROR; 437 } 438 439 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 440 441 sp<MetaData> meta = packetSource->getFormat(); 442 443 if (meta == NULL) { 444 return -EAGAIN; 445 } 446 447 if (stream == STREAMTYPE_AUDIO) { 448 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 449 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 450 } else if (stream == STREAMTYPE_VIDEO) { 451 meta->setInt32(kKeyMaxWidth, mMaxWidth); 452 meta->setInt32(kKeyMaxHeight, mMaxHeight); 453 } 454 455 return convertMetaDataToMessage(meta, format); 456} 457 458sp<HTTPDownloader> LiveSession::getHTTPDownloader() { 459 return new HTTPDownloader(mHTTPService, mExtraHeaders); 460} 461 462void LiveSession::connectAsync( 463 const char *url, const KeyedVector<String8, String8> *headers) { 464 sp<AMessage> msg = new AMessage(kWhatConnect, this); 465 msg->setString("url", url); 466 467 if (headers != NULL) { 468 msg->setPointer( 469 "headers", 470 new KeyedVector<String8, String8>(*headers)); 471 } 472 473 msg->post(); 474} 475 476status_t LiveSession::disconnect() { 477 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 478 479 sp<AMessage> response; 480 status_t err = msg->postAndAwaitResponse(&response); 481 482 return err; 483} 484 485status_t LiveSession::seekTo(int64_t timeUs) { 486 sp<AMessage> msg = new AMessage(kWhatSeek, this); 487 msg->setInt64("timeUs", timeUs); 488 489 sp<AMessage> response; 490 status_t err = msg->postAndAwaitResponse(&response); 491 492 return err; 493} 494 495bool LiveSession::checkSwitchProgress( 496 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 497 AString newUri; 498 CHECK(stopParams->findString("uri", &newUri)); 499 500 *needResumeUntil = false; 501 sp<AMessage> firstNewMeta[kMaxStreams]; 502 for (size_t i = 0; i < kMaxStreams; ++i) { 503 StreamType stream = indexToType(i); 504 if (!(mSwapMask & mNewStreamMask & stream) 505 || (mStreams[i].mNewUri != newUri)) { 506 continue; 507 } 508 if (stream == STREAMTYPE_SUBTITLES) { 509 continue; 510 } 511 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 512 513 // First, get latest dequeued meta, which is where the decoder is at. 514 // (when upswitching, we take the meta after a certain delay, so that 515 // the decoder is left with some cushion) 516 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 517 if (delayUs > 0) { 518 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 519 if (lastDequeueMeta == NULL) { 520 // this means we don't have enough cushion, try again later 521 ALOGV("[%s] up switching failed due to insufficient buffer", 522 getNameForStream(stream)); 523 return false; 524 } 525 } else { 526 // It's okay for lastDequeueMeta to be NULL here, it means the 527 // decoder hasn't even started dequeueing 528 lastDequeueMeta = source->getLatestDequeuedMeta(); 529 } 530 // Then, trim off packets at beginning of mPacketSources2 that's before 531 // the latest dequeued time. These samples are definitely too late. 532 firstNewMeta[i] = mPacketSources2.editValueAt(i) 533 ->trimBuffersBeforeMeta(lastDequeueMeta); 534 535 // Now firstNewMeta[i] is the first sample after the trim. 536 // If it's NULL, we failed because dequeue already past all samples 537 // in mPacketSource2, we have to try again. 538 if (firstNewMeta[i] == NULL) { 539 HLSTime dequeueTime(lastDequeueMeta); 540 ALOGV("[%s] dequeue time (%d, %lld) past start time", 541 getNameForStream(stream), 542 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 543 return false; 544 } 545 546 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 547 // already fetched, and see if we need to resumeUntil 548 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 549 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 550 // boundary, no need to resume as the content will look different anyways 551 if (lastEnqueueMeta != NULL) { 552 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 553 554 // no need to resume old fetcher if new fetcher started in different 555 // discontinuity sequence, as the content will look different. 556 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 557 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 558 559 // update the stopTime for resumeUntil 560 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 561 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 562 } 563 } 564 565 // if we're here, it means dequeue progress hasn't passed some samples in 566 // mPacketSource2, we can trim off the excess in mPacketSource. 567 // (old fetcher might still need to resumeUntil the start time of new fetcher) 568 for (size_t i = 0; i < kMaxStreams; ++i) { 569 StreamType stream = indexToType(i); 570 if (!(mSwapMask & mNewStreamMask & stream) 571 || (newUri != mStreams[i].mNewUri) 572 || stream == STREAMTYPE_SUBTITLES) { 573 continue; 574 } 575 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 576 } 577 578 // no resumeUntil if already underflow 579 *needResumeUntil &= !mBuffering; 580 581 return true; 582} 583 584void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 585 switch (msg->what()) { 586 case kWhatConnect: 587 { 588 onConnect(msg); 589 break; 590 } 591 592 case kWhatDisconnect: 593 { 594 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 595 596 if (mReconfigurationInProgress) { 597 break; 598 } 599 600 finishDisconnect(); 601 break; 602 } 603 604 case kWhatSeek: 605 { 606 if (mReconfigurationInProgress) { 607 msg->post(50000); 608 break; 609 } 610 611 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 612 mSeekReply = new AMessage; 613 614 onSeek(msg); 615 break; 616 } 617 618 case kWhatFetcherNotify: 619 { 620 int32_t what; 621 CHECK(msg->findInt32("what", &what)); 622 623 switch (what) { 624 case PlaylistFetcher::kWhatStarted: 625 break; 626 case PlaylistFetcher::kWhatPaused: 627 case PlaylistFetcher::kWhatStopped: 628 { 629 AString uri; 630 CHECK(msg->findString("uri", &uri)); 631 ssize_t index = mFetcherInfos.indexOfKey(uri); 632 if (index < 0) { 633 // ignore msgs from fetchers that's already gone 634 break; 635 } 636 637 ALOGV("fetcher-%d %s", 638 mFetcherInfos[index].mFetcher->getFetcherID(), 639 what == PlaylistFetcher::kWhatPaused ? 640 "paused" : "stopped"); 641 642 if (what == PlaylistFetcher::kWhatStopped) { 643 mFetcherLooper->unregisterHandler( 644 mFetcherInfos[index].mFetcher->id()); 645 mFetcherInfos.removeItemsAt(index); 646 } else if (what == PlaylistFetcher::kWhatPaused) { 647 int32_t seekMode; 648 CHECK(msg->findInt32("seekMode", &seekMode)); 649 for (size_t i = 0; i < kMaxStreams; ++i) { 650 if (mStreams[i].mUri == uri) { 651 mStreams[i].mSeekMode = (SeekMode) seekMode; 652 } 653 } 654 } 655 656 if (mContinuation != NULL) { 657 CHECK_GT(mContinuationCounter, 0); 658 if (--mContinuationCounter == 0) { 659 mContinuation->post(); 660 } 661 ALOGV("%zu fetcher(s) left", mContinuationCounter); 662 } 663 break; 664 } 665 666 case PlaylistFetcher::kWhatDurationUpdate: 667 { 668 AString uri; 669 CHECK(msg->findString("uri", &uri)); 670 671 int64_t durationUs; 672 CHECK(msg->findInt64("durationUs", &durationUs)); 673 674 ssize_t index = mFetcherInfos.indexOfKey(uri); 675 if (index >= 0) { 676 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 677 info->mDurationUs = durationUs; 678 } 679 break; 680 } 681 682 case PlaylistFetcher::kWhatTargetDurationUpdate: 683 { 684 int64_t targetDurationUs; 685 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 686 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 687 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 688 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 689 break; 690 } 691 692 case PlaylistFetcher::kWhatError: 693 { 694 status_t err; 695 CHECK(msg->findInt32("err", &err)); 696 697 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 698 699 // handle EOS on subtitle tracks independently 700 AString uri; 701 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 702 ssize_t i = mFetcherInfos.indexOfKey(uri); 703 if (i >= 0) { 704 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 705 if (fetcher != NULL) { 706 uint32_t type = fetcher->getStreamTypeMask(); 707 if (type == STREAMTYPE_SUBTITLES) { 708 mPacketSources.valueFor( 709 STREAMTYPE_SUBTITLES)->signalEOS(err);; 710 break; 711 } 712 } 713 } 714 } 715 716 if (mInPreparationPhase) { 717 postPrepared(err); 718 } 719 720 cancelBandwidthSwitch(); 721 722 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 723 724 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 725 726 mPacketSources.valueFor( 727 STREAMTYPE_SUBTITLES)->signalEOS(err); 728 729 postError(err); 730 break; 731 } 732 733 case PlaylistFetcher::kWhatStopReached: 734 { 735 ALOGV("kWhatStopReached"); 736 737 AString oldUri; 738 CHECK(msg->findString("uri", &oldUri)); 739 740 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 741 if (index < 0) { 742 break; 743 } 744 745 tryToFinishBandwidthSwitch(oldUri); 746 break; 747 } 748 749 case PlaylistFetcher::kWhatStartedAt: 750 { 751 int32_t switchGeneration; 752 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 753 754 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 755 switchGeneration, mSwitchGeneration); 756 757 if (switchGeneration != mSwitchGeneration) { 758 break; 759 } 760 761 AString uri; 762 CHECK(msg->findString("uri", &uri)); 763 764 // mark new fetcher mToBeResumed 765 ssize_t index = mFetcherInfos.indexOfKey(uri); 766 if (index >= 0) { 767 mFetcherInfos.editValueAt(index).mToBeResumed = true; 768 } 769 770 // temporarily disable packet sources to be swapped to prevent 771 // NuPlayerDecoder from dequeuing while we check progress 772 for (size_t i = 0; i < mPacketSources.size(); ++i) { 773 if ((mSwapMask & mPacketSources.keyAt(i)) 774 && uri == mStreams[i].mNewUri) { 775 mPacketSources.editValueAt(i)->enable(false); 776 } 777 } 778 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 779 // If switching up, require a cushion bigger than kUnderflowMark 780 // to avoid buffering immediately after the switch. 781 // (If we don't have that cushion we'd rather cancel and try again.) 782 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 783 bool needResumeUntil = false; 784 sp<AMessage> stopParams = msg; 785 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 786 // playback time hasn't passed startAt time 787 if (!needResumeUntil) { 788 ALOGV("finish switch"); 789 for (size_t i = 0; i < kMaxStreams; ++i) { 790 if ((mSwapMask & indexToType(i)) 791 && uri == mStreams[i].mNewUri) { 792 // have to make a copy of mStreams[i].mUri because 793 // tryToFinishBandwidthSwitch is modifying mStreams[] 794 AString oldURI = mStreams[i].mUri; 795 tryToFinishBandwidthSwitch(oldURI); 796 break; 797 } 798 } 799 } else { 800 // startAt time is after last enqueue time 801 // Resume fetcher for the original variant; the resumed fetcher should 802 // continue until the timestamps found in msg, which is stored by the 803 // new fetcher to indicate where the new variant has started buffering. 804 ALOGV("finish switch with resumeUntilAsync"); 805 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 806 const FetcherInfo &info = mFetcherInfos.valueAt(i); 807 if (info.mToBeRemoved) { 808 info.mFetcher->resumeUntilAsync(stopParams); 809 } 810 } 811 } 812 } else { 813 // playback time passed startAt time 814 if (switchUp) { 815 // if switching up, cancel and retry if condition satisfies again 816 ALOGV("cancel up switch because we're too late"); 817 cancelBandwidthSwitch(true /* resume */); 818 } else { 819 ALOGV("retry down switch at next sample"); 820 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 821 } 822 } 823 // re-enable all packet sources 824 for (size_t i = 0; i < mPacketSources.size(); ++i) { 825 mPacketSources.editValueAt(i)->enable(true); 826 } 827 828 break; 829 } 830 831 case PlaylistFetcher::kWhatPlaylistFetched: 832 { 833 onMasterPlaylistFetched(msg); 834 break; 835 } 836 837 case PlaylistFetcher::kWhatMetadataDetected: 838 { 839 if (!mHasMetadata) { 840 mHasMetadata = true; 841 sp<AMessage> notify = mNotify->dup(); 842 notify->setInt32("what", kWhatMetadataDetected); 843 notify->post(); 844 } 845 break; 846 } 847 848 default: 849 TRESPASS(); 850 } 851 852 break; 853 } 854 855 case kWhatChangeConfiguration: 856 { 857 onChangeConfiguration(msg); 858 break; 859 } 860 861 case kWhatChangeConfiguration2: 862 { 863 onChangeConfiguration2(msg); 864 break; 865 } 866 867 case kWhatChangeConfiguration3: 868 { 869 onChangeConfiguration3(msg); 870 break; 871 } 872 873 case kWhatPollBuffering: 874 { 875 int32_t generation; 876 CHECK(msg->findInt32("generation", &generation)); 877 if (generation == mPollBufferingGeneration) { 878 onPollBuffering(); 879 } 880 break; 881 } 882 883 default: 884 TRESPASS(); 885 break; 886 } 887} 888 889// static 890int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 891 if (a->mBandwidth < b->mBandwidth) { 892 return -1; 893 } else if (a->mBandwidth == b->mBandwidth) { 894 return 0; 895 } 896 897 return 1; 898} 899 900// static 901LiveSession::StreamType LiveSession::indexToType(int idx) { 902 CHECK(idx >= 0 && idx < kNumSources); 903 return (StreamType)(1 << idx); 904} 905 906// static 907ssize_t LiveSession::typeToIndex(int32_t type) { 908 switch (type) { 909 case STREAMTYPE_AUDIO: 910 return 0; 911 case STREAMTYPE_VIDEO: 912 return 1; 913 case STREAMTYPE_SUBTITLES: 914 return 2; 915 case STREAMTYPE_METADATA: 916 return 3; 917 default: 918 return -1; 919 }; 920 return -1; 921} 922 923void LiveSession::onConnect(const sp<AMessage> &msg) { 924 CHECK(msg->findString("url", &mMasterURL)); 925 926 // TODO currently we don't know if we are coming here from incognito mode 927 ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str()); 928 929 KeyedVector<String8, String8> *headers = NULL; 930 if (!msg->findPointer("headers", (void **)&headers)) { 931 mExtraHeaders.clear(); 932 } else { 933 mExtraHeaders = *headers; 934 935 delete headers; 936 headers = NULL; 937 } 938 939 // create looper for fetchers 940 if (mFetcherLooper == NULL) { 941 mFetcherLooper = new ALooper(); 942 943 mFetcherLooper->setName("Fetcher"); 944 mFetcherLooper->start(false, false); 945 } 946 947 // create fetcher to fetch the master playlist 948 addFetcher(mMasterURL.c_str())->fetchPlaylistAsync(); 949} 950 951void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) { 952 AString uri; 953 CHECK(msg->findString("uri", &uri)); 954 ssize_t index = mFetcherInfos.indexOfKey(uri); 955 if (index < 0) { 956 ALOGW("fetcher for master playlist is gone."); 957 return; 958 } 959 960 // no longer useful, remove 961 mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id()); 962 mFetcherInfos.removeItemsAt(index); 963 964 CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist)); 965 if (mPlaylist == NULL) { 966 ALOGE("unable to fetch master playlist %s.", 967 uriDebugString(mMasterURL).c_str()); 968 969 postPrepared(ERROR_IO); 970 return; 971 } 972 // We trust the content provider to make a reasonable choice of preferred 973 // initial bandwidth by listing it first in the variant playlist. 974 // At startup we really don't have a good estimate on the available 975 // network bandwidth since we haven't tranferred any data yet. Once 976 // we have we can make a better informed choice. 977 size_t initialBandwidth = 0; 978 size_t initialBandwidthIndex = 0; 979 980 int32_t maxWidth = 0; 981 int32_t maxHeight = 0; 982 983 if (mPlaylist->isVariantPlaylist()) { 984 Vector<BandwidthItem> itemsWithVideo; 985 for (size_t i = 0; i < mPlaylist->size(); ++i) { 986 BandwidthItem item; 987 988 item.mPlaylistIndex = i; 989 990 sp<AMessage> meta; 991 AString uri; 992 mPlaylist->itemAt(i, &uri, &meta); 993 994 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 995 996 int32_t width, height; 997 if (meta->findInt32("width", &width)) { 998 maxWidth = max(maxWidth, width); 999 } 1000 if (meta->findInt32("height", &height)) { 1001 maxHeight = max(maxHeight, height); 1002 } 1003 1004 mBandwidthItems.push(item); 1005 if (mPlaylist->hasType(i, "video")) { 1006 itemsWithVideo.push(item); 1007 } 1008 } 1009 // remove the audio-only variants if we have at least one with video 1010 if (!itemsWithVideo.empty() 1011 && itemsWithVideo.size() < mBandwidthItems.size()) { 1012 mBandwidthItems.clear(); 1013 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 1014 mBandwidthItems.push(itemsWithVideo[i]); 1015 } 1016 } 1017 1018 CHECK_GT(mBandwidthItems.size(), 0u); 1019 initialBandwidth = mBandwidthItems[0].mBandwidth; 1020 1021 mBandwidthItems.sort(SortByBandwidth); 1022 1023 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 1024 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 1025 initialBandwidthIndex = i; 1026 break; 1027 } 1028 } 1029 } else { 1030 // dummy item. 1031 BandwidthItem item; 1032 item.mPlaylistIndex = 0; 1033 item.mBandwidth = 0; 1034 mBandwidthItems.push(item); 1035 } 1036 1037 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 1038 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 1039 1040 mPlaylist->pickRandomMediaItems(); 1041 changeConfiguration( 1042 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 1043} 1044 1045void LiveSession::finishDisconnect() { 1046 ALOGV("finishDisconnect"); 1047 1048 // No reconfiguration is currently pending, make sure none will trigger 1049 // during disconnection either. 1050 cancelBandwidthSwitch(); 1051 1052 // cancel buffer polling 1053 cancelPollBuffering(); 1054 1055 // TRICKY: don't wait for all fetcher to be stopped when disconnecting 1056 // 1057 // Some fetchers might be stuck in connect/getSize at this point. These 1058 // operations will eventually timeout (as we have a timeout set in 1059 // MediaHTTPConnection), but we don't want to block the main UI thread 1060 // until then. Here we just need to make sure we clear all references 1061 // to the fetchers, so that when they finally exit from the blocking 1062 // operation, they can be destructed. 1063 // 1064 // There is one very tricky point though. For this scheme to work, the 1065 // fecther must hold a reference to LiveSession, so that LiveSession is 1066 // destroyed after fetcher. Otherwise LiveSession would get stuck in its 1067 // own destructor when it waits for mFetcherLooper to stop, which still 1068 // blocks main UI thread. 1069 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1070 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1071 mFetcherLooper->unregisterHandler( 1072 mFetcherInfos.valueAt(i).mFetcher->id()); 1073 } 1074 mFetcherInfos.clear(); 1075 1076 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1077 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1078 1079 mPacketSources.valueFor( 1080 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1081 1082 sp<AMessage> response = new AMessage; 1083 response->setInt32("err", OK); 1084 1085 response->postReply(mDisconnectReplyID); 1086 mDisconnectReplyID.clear(); 1087} 1088 1089sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1090 ssize_t index = mFetcherInfos.indexOfKey(uri); 1091 1092 if (index >= 0) { 1093 return NULL; 1094 } 1095 1096 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1097 notify->setString("uri", uri); 1098 notify->setInt32("switchGeneration", mSwitchGeneration); 1099 1100 FetcherInfo info; 1101 info.mFetcher = new PlaylistFetcher( 1102 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1103 info.mDurationUs = -1ll; 1104 info.mToBeRemoved = false; 1105 info.mToBeResumed = false; 1106 mFetcherLooper->registerHandler(info.mFetcher); 1107 1108 mFetcherInfos.add(uri, info); 1109 1110 return info.mFetcher; 1111} 1112 1113#if 0 1114static double uniformRand() { 1115 return (double)rand() / RAND_MAX; 1116} 1117#endif 1118 1119bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1120 ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1121 newUri ? "true" : "false", 1122 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1123 return i >= 0 1124 && ((!newUri && uri == mStreams[i].mUri) 1125 || (newUri && uri == mStreams[i].mNewUri)); 1126} 1127 1128sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1129 size_t trackIndex, bool newUri) { 1130 StreamType type = indexToType(trackIndex); 1131 sp<AnotherPacketSource> source = NULL; 1132 if (newUri) { 1133 source = mPacketSources2.valueFor(type); 1134 source->clear(); 1135 } else { 1136 source = mPacketSources.valueFor(type); 1137 }; 1138 return source; 1139} 1140 1141sp<AnotherPacketSource> LiveSession::getMetadataSource( 1142 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1143 // todo: One case where the following strategy can fail is when audio and video 1144 // are in separate playlists, both are transport streams, and the metadata 1145 // is actually contained in the audio stream. 1146 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1147 streamMask, newUri ? "true" : "false"); 1148 1149 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1150 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1151 // ... audio fetcher for audio only variant 1152 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1153 } 1154 1155 return NULL; 1156} 1157 1158bool LiveSession::resumeFetcher( 1159 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1160 ssize_t index = mFetcherInfos.indexOfKey(uri); 1161 if (index < 0) { 1162 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1163 return false; 1164 } 1165 1166 bool resume = false; 1167 sp<AnotherPacketSource> sources[kNumSources]; 1168 for (size_t i = 0; i < kMaxStreams; ++i) { 1169 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1170 resume = true; 1171 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1172 } 1173 } 1174 1175 if (resume) { 1176 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1177 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1178 1179 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1180 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1181 1182 fetcher->startAsync( 1183 sources[kAudioIndex], 1184 sources[kVideoIndex], 1185 sources[kSubtitleIndex], 1186 getMetadataSource(sources, streamMask, newUri), 1187 timeUs, -1, -1, seekMode); 1188 } 1189 1190 return resume; 1191} 1192 1193float LiveSession::getAbortThreshold( 1194 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1195 float abortThreshold = -1.0f; 1196 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1197 /* 1198 If we're switching down, we need to decide whether to 1199 1200 1) finish last segment of high-bandwidth variant, or 1201 2) abort last segment of high-bandwidth variant, and fetch an 1202 overlapping portion from low-bandwidth variant. 1203 1204 Here we try to maximize the amount of buffer left when the 1205 switch point is met. Given the following parameters: 1206 1207 B: our current buffering level in seconds 1208 T: target duration in seconds 1209 X: sample duration in seconds remain to fetch in last segment 1210 bw0: bandwidth of old variant (as specified in playlist) 1211 bw1: bandwidth of new variant (as specified in playlist) 1212 bw: measured bandwidth available 1213 1214 If we choose 1), when switch happens at the end of current 1215 segment, our buffering will be 1216 B + X - X * bw0 / bw 1217 1218 If we choose 2), when switch happens where we aborted current 1219 segment, our buffering will be 1220 B - (T - X) * bw1 / bw 1221 1222 We should only choose 1) if 1223 X/T < bw1 / (bw1 + bw0 - bw) 1224 */ 1225 1226 // Taking the measured current bandwidth at 50% face value only, 1227 // as our bandwidth estimation is a lagging indicator. Being 1228 // conservative on this, we prefer switching to lower bandwidth 1229 // unless we're really confident finishing up the last segment 1230 // of higher bandwidth will be fast. 1231 CHECK(mLastBandwidthBps >= 0); 1232 abortThreshold = 1233 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1234 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1235 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1236 - (float)mLastBandwidthBps * 0.5f); 1237 if (abortThreshold < 0.0f) { 1238 abortThreshold = -1.0f; // do not abort 1239 } 1240 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1241 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1242 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1243 mLastBandwidthBps, 1244 abortThreshold); 1245 } 1246 return abortThreshold; 1247} 1248 1249void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1250 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1251} 1252 1253size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1254 if (mBandwidthItems.size() < 2) { 1255 // shouldn't be here if we only have 1 bandwidth, check 1256 // logic to get rid of redundant bandwidth polling 1257 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1258 return 0; 1259 } 1260 1261#if 1 1262 char value[PROPERTY_VALUE_MAX]; 1263 ssize_t index = -1; 1264 if (property_get("media.httplive.bw-index", value, NULL)) { 1265 char *end; 1266 index = strtol(value, &end, 10); 1267 CHECK(end > value && *end == '\0'); 1268 1269 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1270 index = mBandwidthItems.size() - 1; 1271 } 1272 } 1273 1274 if (index < 0) { 1275 char value[PROPERTY_VALUE_MAX]; 1276 if (property_get("media.httplive.max-bw", value, NULL)) { 1277 char *end; 1278 long maxBw = strtoul(value, &end, 10); 1279 if (end > value && *end == '\0') { 1280 if (maxBw > 0 && bandwidthBps > maxBw) { 1281 ALOGV("bandwidth capped to %ld bps", maxBw); 1282 bandwidthBps = maxBw; 1283 } 1284 } 1285 } 1286 1287 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1288 1289 index = mBandwidthItems.size() - 1; 1290 while (index > 0) { 1291 // be conservative (70%) to avoid overestimating and immediately 1292 // switching down again. 1293 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1294 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1295 break; 1296 } 1297 --index; 1298 } 1299 } 1300#elif 0 1301 // Change bandwidth at random() 1302 size_t index = uniformRand() * mBandwidthItems.size(); 1303#elif 0 1304 // There's a 50% chance to stay on the current bandwidth and 1305 // a 50% chance to switch to the next higher bandwidth (wrapping around 1306 // to lowest) 1307 const size_t kMinIndex = 0; 1308 1309 static ssize_t mCurBandwidthIndex = -1; 1310 1311 size_t index; 1312 if (mCurBandwidthIndex < 0) { 1313 index = kMinIndex; 1314 } else if (uniformRand() < 0.5) { 1315 index = (size_t)mCurBandwidthIndex; 1316 } else { 1317 index = mCurBandwidthIndex + 1; 1318 if (index == mBandwidthItems.size()) { 1319 index = kMinIndex; 1320 } 1321 } 1322 mCurBandwidthIndex = index; 1323#elif 0 1324 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1325 1326 size_t index = mBandwidthItems.size() - 1; 1327 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1328 --index; 1329 } 1330#elif 1 1331 char value[PROPERTY_VALUE_MAX]; 1332 size_t index; 1333 if (property_get("media.httplive.bw-index", value, NULL)) { 1334 char *end; 1335 index = strtoul(value, &end, 10); 1336 CHECK(end > value && *end == '\0'); 1337 1338 if (index >= mBandwidthItems.size()) { 1339 index = mBandwidthItems.size() - 1; 1340 } 1341 } else { 1342 index = 0; 1343 } 1344#else 1345 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1346#endif 1347 1348 CHECK_GE(index, 0); 1349 1350 return index; 1351} 1352 1353HLSTime LiveSession::latestMediaSegmentStartTime() const { 1354 HLSTime audioTime(mPacketSources.valueFor( 1355 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1356 1357 HLSTime videoTime(mPacketSources.valueFor( 1358 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1359 1360 return audioTime < videoTime ? videoTime : audioTime; 1361} 1362 1363void LiveSession::onSeek(const sp<AMessage> &msg) { 1364 int64_t timeUs; 1365 CHECK(msg->findInt64("timeUs", &timeUs)); 1366 changeConfiguration(timeUs); 1367} 1368 1369status_t LiveSession::getDuration(int64_t *durationUs) const { 1370 int64_t maxDurationUs = -1ll; 1371 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1372 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1373 1374 if (fetcherDurationUs > maxDurationUs) { 1375 maxDurationUs = fetcherDurationUs; 1376 } 1377 } 1378 1379 *durationUs = maxDurationUs; 1380 1381 return OK; 1382} 1383 1384bool LiveSession::isSeekable() const { 1385 int64_t durationUs; 1386 return getDuration(&durationUs) == OK && durationUs >= 0; 1387} 1388 1389bool LiveSession::hasDynamicDuration() const { 1390 return false; 1391} 1392 1393size_t LiveSession::getTrackCount() const { 1394 if (mPlaylist == NULL) { 1395 return 0; 1396 } else { 1397 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0); 1398 } 1399} 1400 1401sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1402 if (mPlaylist == NULL) { 1403 return NULL; 1404 } else { 1405 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { 1406 sp<AMessage> format = new AMessage(); 1407 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); 1408 format->setString("language", "und"); 1409 format->setString("mime", MEDIA_MIMETYPE_DATA_METADATA); 1410 return format; 1411 } 1412 return mPlaylist->getTrackInfo(trackIndex); 1413 } 1414} 1415 1416status_t LiveSession::selectTrack(size_t index, bool select) { 1417 if (mPlaylist == NULL) { 1418 return INVALID_OPERATION; 1419 } 1420 1421 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1422 index, select, mSubtitleGeneration); 1423 1424 ++mSubtitleGeneration; 1425 status_t err = mPlaylist->selectTrack(index, select); 1426 if (err == OK) { 1427 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1428 msg->setInt32("pickTrack", select); 1429 msg->post(); 1430 } 1431 return err; 1432} 1433 1434ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1435 if (mPlaylist == NULL) { 1436 return -1; 1437 } else { 1438 return mPlaylist->getSelectedTrack(type); 1439 } 1440} 1441 1442void LiveSession::changeConfiguration( 1443 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1444 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1445 (long long)timeUs, bandwidthIndex, pickTrack); 1446 1447 cancelBandwidthSwitch(); 1448 1449 CHECK(!mReconfigurationInProgress); 1450 mReconfigurationInProgress = true; 1451 if (bandwidthIndex >= 0) { 1452 mOrigBandwidthIndex = mCurBandwidthIndex; 1453 mCurBandwidthIndex = bandwidthIndex; 1454 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1455 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1456 mOrigBandwidthIndex, mCurBandwidthIndex); 1457 } 1458 } 1459 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1460 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1461 1462 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1463 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1464 1465 AString URIs[kMaxStreams]; 1466 for (size_t i = 0; i < kMaxStreams; ++i) { 1467 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1468 streamMask |= indexToType(i); 1469 } 1470 } 1471 1472 // Step 1, stop and discard fetchers that are no longer needed. 1473 // Pause those that we'll reuse. 1474 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1475 // skip fetchers that are marked mToBeRemoved, 1476 // these are done and can't be reused 1477 if (mFetcherInfos[i].mToBeRemoved) { 1478 continue; 1479 } 1480 1481 const AString &uri = mFetcherInfos.keyAt(i); 1482 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1483 1484 bool discardFetcher = true, delayRemoval = false; 1485 for (size_t j = 0; j < kMaxStreams; ++j) { 1486 StreamType type = indexToType(j); 1487 if ((streamMask & type) && uri == URIs[j]) { 1488 resumeMask |= type; 1489 streamMask &= ~type; 1490 discardFetcher = false; 1491 } 1492 } 1493 // Delay fetcher removal if not picking tracks, AND old fetcher 1494 // has stream mask that overlaps new variant. (Okay to discard 1495 // old fetcher now, if completely no overlap.) 1496 if (discardFetcher && timeUs < 0ll && !pickTrack 1497 && (fetcher->getStreamTypeMask() & streamMask)) { 1498 discardFetcher = false; 1499 delayRemoval = true; 1500 } 1501 1502 if (discardFetcher) { 1503 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1504 fetcher->stopAsync(); 1505 } else { 1506 float threshold = -1.0f; // always finish fetching by default 1507 bool disconnect = false; 1508 if (timeUs >= 0ll) { 1509 // seeking, no need to finish fetching 1510 threshold = 0.0f; 1511 disconnect = true; 1512 } else if (delayRemoval) { 1513 // adapting, abort if remaining of current segment is over threshold 1514 threshold = getAbortThreshold( 1515 mOrigBandwidthIndex, mCurBandwidthIndex); 1516 } 1517 1518 ALOGV("pausing fetcher-%d, threshold=%.2f", 1519 fetcher->getFetcherID(), threshold); 1520 fetcher->pauseAsync(threshold, disconnect); 1521 } 1522 } 1523 1524 sp<AMessage> msg; 1525 if (timeUs < 0ll) { 1526 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1527 msg = new AMessage(kWhatChangeConfiguration3, this); 1528 } else { 1529 msg = new AMessage(kWhatChangeConfiguration2, this); 1530 } 1531 msg->setInt32("streamMask", streamMask); 1532 msg->setInt32("resumeMask", resumeMask); 1533 msg->setInt32("pickTrack", pickTrack); 1534 msg->setInt64("timeUs", timeUs); 1535 for (size_t i = 0; i < kMaxStreams; ++i) { 1536 if ((streamMask | resumeMask) & indexToType(i)) { 1537 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1538 } 1539 } 1540 1541 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1542 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1543 // fetchers have completed their asynchronous operation, we'll post 1544 // mContinuation, which then is handled below in onChangeConfiguration2. 1545 mContinuationCounter = mFetcherInfos.size(); 1546 mContinuation = msg; 1547 1548 if (mContinuationCounter == 0) { 1549 msg->post(); 1550 } 1551} 1552 1553void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1554 ALOGV("onChangeConfiguration"); 1555 1556 if (!mReconfigurationInProgress) { 1557 int32_t pickTrack = 0; 1558 msg->findInt32("pickTrack", &pickTrack); 1559 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1560 } else { 1561 msg->post(1000000ll); // retry in 1 sec 1562 } 1563} 1564 1565void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1566 ALOGV("onChangeConfiguration2"); 1567 1568 mContinuation.clear(); 1569 1570 // All fetchers are either suspended or have been removed now. 1571 1572 // If we're seeking, clear all packet sources before we report 1573 // seek complete, to prevent decoder from pulling stale data. 1574 int64_t timeUs; 1575 CHECK(msg->findInt64("timeUs", &timeUs)); 1576 1577 if (timeUs >= 0) { 1578 mLastSeekTimeUs = timeUs; 1579 mLastDequeuedTimeUs = timeUs; 1580 1581 for (size_t i = 0; i < mPacketSources.size(); i++) { 1582 mPacketSources.editValueAt(i)->clear(); 1583 } 1584 1585 for (size_t i = 0; i < kMaxStreams; ++i) { 1586 mStreams[i].reset(); 1587 } 1588 1589 mDiscontinuityOffsetTimesUs.clear(); 1590 mDiscontinuityAbsStartTimesUs.clear(); 1591 1592 if (mSeekReplyID != NULL) { 1593 CHECK(mSeekReply != NULL); 1594 mSeekReply->setInt32("err", OK); 1595 mSeekReply->postReply(mSeekReplyID); 1596 mSeekReplyID.clear(); 1597 mSeekReply.clear(); 1598 } 1599 1600 // restart buffer polling after seek becauese previous 1601 // buffering position is no longer valid. 1602 restartPollBuffering(); 1603 } 1604 1605 uint32_t streamMask, resumeMask; 1606 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1607 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1608 1609 streamMask |= resumeMask; 1610 1611 AString URIs[kMaxStreams]; 1612 for (size_t i = 0; i < kMaxStreams; ++i) { 1613 if (streamMask & indexToType(i)) { 1614 const AString &uriKey = mStreams[i].uriKey(); 1615 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1616 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1617 } 1618 } 1619 1620 uint32_t changedMask = 0; 1621 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1622 // stream URI could change even if onChangeConfiguration2 is only 1623 // used for seek. Seek could happen during a bw switch, in this 1624 // case bw switch will be cancelled, but the seekTo position will 1625 // fetch from the new URI. 1626 if ((mStreamMask & streamMask & indexToType(i)) 1627 && !mStreams[i].mUri.empty() 1628 && !(URIs[i] == mStreams[i].mUri)) { 1629 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1630 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1631 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1632 if (source->getLatestDequeuedMeta() != NULL) { 1633 source->queueDiscontinuity( 1634 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1635 } 1636 } 1637 // Determine which decoders to shutdown on the player side, 1638 // a decoder has to be shutdown if its streamtype was active 1639 // before but now longer isn't. 1640 if ((mStreamMask & ~streamMask & indexToType(i))) { 1641 changedMask |= indexToType(i); 1642 } 1643 } 1644 1645 if (changedMask == 0) { 1646 // If nothing changed as far as the audio/video decoders 1647 // are concerned we can proceed. 1648 onChangeConfiguration3(msg); 1649 return; 1650 } 1651 1652 // Something changed, inform the player which will shutdown the 1653 // corresponding decoders and will post the reply once that's done. 1654 // Handling the reply will continue executing below in 1655 // onChangeConfiguration3. 1656 sp<AMessage> notify = mNotify->dup(); 1657 notify->setInt32("what", kWhatStreamsChanged); 1658 notify->setInt32("changedMask", changedMask); 1659 1660 msg->setWhat(kWhatChangeConfiguration3); 1661 msg->setTarget(this); 1662 1663 notify->setMessage("reply", msg); 1664 notify->post(); 1665} 1666 1667void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1668 mContinuation.clear(); 1669 // All remaining fetchers are still suspended, the player has shutdown 1670 // any decoders that needed it. 1671 1672 uint32_t streamMask, resumeMask; 1673 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1674 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1675 1676 mNewStreamMask = streamMask | resumeMask; 1677 1678 int64_t timeUs; 1679 int32_t pickTrack; 1680 bool switching = false; 1681 CHECK(msg->findInt64("timeUs", &timeUs)); 1682 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1683 1684 if (timeUs < 0ll) { 1685 if (!pickTrack) { 1686 // mSwapMask contains streams that are in both old and new variant, 1687 // (in mNewStreamMask & mStreamMask) but with different URIs 1688 // (not in resumeMask). 1689 // For example, old variant has video and audio in two separate 1690 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1691 // should be 0 as there is nothing to swap. We only need to stop video, 1692 // and resume audio. 1693 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1694 switching = (mSwapMask != 0); 1695 } 1696 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1697 } else { 1698 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1699 } 1700 1701 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1702 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1703 (long long)timeUs, switching, pickTrack, 1704 mStreamMask, mNewStreamMask, mSwapMask); 1705 1706 for (size_t i = 0; i < kMaxStreams; ++i) { 1707 if (streamMask & indexToType(i)) { 1708 if (switching) { 1709 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1710 } else { 1711 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1712 } 1713 } 1714 } 1715 1716 // Of all existing fetchers: 1717 // * Resume fetchers that are still needed and assign them original packet sources. 1718 // * Mark otherwise unneeded fetchers for removal. 1719 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1720 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1721 const AString &uri = mFetcherInfos.keyAt(i); 1722 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1723 ALOGV("marking fetcher-%d to be removed", 1724 mFetcherInfos[i].mFetcher->getFetcherID()); 1725 1726 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1727 } 1728 } 1729 1730 // streamMask now only contains the types that need a new fetcher created. 1731 if (streamMask != 0) { 1732 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1733 } 1734 1735 // Find out when the original fetchers have buffered up to and start the new fetchers 1736 // at a later timestamp. 1737 for (size_t i = 0; i < kMaxStreams; i++) { 1738 if (!(indexToType(i) & streamMask)) { 1739 continue; 1740 } 1741 1742 AString uri; 1743 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1744 1745 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1746 CHECK(fetcher != NULL); 1747 1748 HLSTime startTime; 1749 SeekMode seekMode = kSeekModeExactPosition; 1750 sp<AnotherPacketSource> sources[kNumSources]; 1751 1752 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1753 startTime = latestMediaSegmentStartTime(); 1754 } 1755 1756 // TRICKY: looping from i as earlier streams are already removed from streamMask 1757 for (size_t j = i; j < kMaxStreams; ++j) { 1758 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1759 if ((streamMask & indexToType(j)) && uri == streamUri) { 1760 sources[j] = mPacketSources.valueFor(indexToType(j)); 1761 1762 if (timeUs >= 0) { 1763 startTime.mTimeUs = timeUs; 1764 } else { 1765 int32_t type; 1766 sp<AMessage> meta; 1767 if (!switching) { 1768 // selecting, or adapting but no swap required 1769 meta = sources[j]->getLatestDequeuedMeta(); 1770 } else { 1771 // adapting and swap required 1772 meta = sources[j]->getLatestEnqueuedMeta(); 1773 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1774 // switching up 1775 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1776 } 1777 } 1778 1779 if ((j == kAudioIndex || j == kVideoIndex) 1780 && meta != NULL && !meta->findInt32("discontinuity", &type)) { 1781 HLSTime tmpTime(meta); 1782 if (startTime < tmpTime) { 1783 startTime = tmpTime; 1784 } 1785 } 1786 1787 if (!switching) { 1788 // selecting, or adapting but no swap required 1789 sources[j]->clear(); 1790 if (j == kSubtitleIndex) { 1791 break; 1792 } 1793 1794 ALOGV("stream[%zu]: queue format change", j); 1795 sources[j]->queueDiscontinuity( 1796 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1797 } else { 1798 // switching, queue discontinuities after resume 1799 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1800 sources[j]->clear(); 1801 // the new fetcher might be providing streams that used to be 1802 // provided by two different fetchers, if one of the fetcher 1803 // paused in the middle while the other somehow paused in next 1804 // seg, we have to start from next seg. 1805 if (seekMode < mStreams[j].mSeekMode) { 1806 seekMode = mStreams[j].mSeekMode; 1807 } 1808 } 1809 } 1810 1811 streamMask &= ~indexToType(j); 1812 } 1813 } 1814 1815 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1816 "segmentStartTimeUs %lld seekMode %d", 1817 fetcher->getFetcherID(), 1818 (long long)startTime.mTimeUs, 1819 (long long)mLastSeekTimeUs, 1820 (long long)startTime.getSegmentTimeUs(), 1821 seekMode); 1822 1823 // Set the target segment start time to the middle point of the 1824 // segment where the last sample was. 1825 // This gives a better guess if segments of the two variants are not 1826 // perfectly aligned. (If the corresponding segment in new variant 1827 // starts slightly later than that in the old variant, we still want 1828 // to pick that segment, not the one before) 1829 fetcher->startAsync( 1830 sources[kAudioIndex], 1831 sources[kVideoIndex], 1832 sources[kSubtitleIndex], 1833 getMetadataSource(sources, mNewStreamMask, switching), 1834 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1835 startTime.getSegmentTimeUs(), 1836 startTime.mSeq, 1837 seekMode); 1838 } 1839 1840 // All fetchers have now been started, the configuration change 1841 // has completed. 1842 1843 mReconfigurationInProgress = false; 1844 if (switching) { 1845 mSwitchInProgress = true; 1846 } else { 1847 mStreamMask = mNewStreamMask; 1848 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1849 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1850 mOrigBandwidthIndex, mCurBandwidthIndex); 1851 mOrigBandwidthIndex = mCurBandwidthIndex; 1852 } 1853 } 1854 1855 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1856 mSwitchInProgress, mStreamMask); 1857 1858 if (mDisconnectReplyID != NULL) { 1859 finishDisconnect(); 1860 } 1861} 1862 1863void LiveSession::swapPacketSource(StreamType stream) { 1864 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1865 1866 // transfer packets from source2 to source 1867 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1868 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1869 1870 // queue discontinuity in mPacketSource 1871 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1872 1873 // queue packets in mPacketSource2 to mPacketSource 1874 status_t finalResult = OK; 1875 sp<ABuffer> accessUnit; 1876 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1877 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1878 aps->queueAccessUnit(accessUnit); 1879 } 1880 aps2->clear(); 1881} 1882 1883void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1884 if (!mSwitchInProgress) { 1885 return; 1886 } 1887 1888 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1889 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1890 return; 1891 } 1892 1893 // Swap packet source of streams provided by old variant 1894 for (size_t idx = 0; idx < kMaxStreams; idx++) { 1895 StreamType stream = indexToType(idx); 1896 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 1897 swapPacketSource(stream); 1898 1899 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1900 ALOGW("swapping stream type %d %s to empty stream", 1901 stream, mStreams[idx].mUri.c_str()); 1902 } 1903 mStreams[idx].mUri = mStreams[idx].mNewUri; 1904 mStreams[idx].mNewUri.clear(); 1905 1906 mSwapMask &= ~stream; 1907 } 1908 } 1909 1910 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 1911 1912 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 1913 if (mSwapMask != 0) { 1914 return; 1915 } 1916 1917 // Check if new variant contains extra streams. 1918 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1919 while (extraStreams) { 1920 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1921 extraStreams &= ~stream; 1922 1923 swapPacketSource(stream); 1924 1925 ssize_t idx = typeToIndex(stream); 1926 CHECK(idx >= 0); 1927 if (mStreams[idx].mNewUri.empty()) { 1928 ALOGW("swapping extra stream type %d %s to empty stream", 1929 stream, mStreams[idx].mUri.c_str()); 1930 } 1931 mStreams[idx].mUri = mStreams[idx].mNewUri; 1932 mStreams[idx].mNewUri.clear(); 1933 } 1934 1935 // Restart new fetcher (it was paused after the first 47k block) 1936 // and let it fetch into mPacketSources (not mPacketSources2) 1937 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1938 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1939 if (info.mToBeResumed) { 1940 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 1941 info.mToBeResumed = false; 1942 } 1943 } 1944 1945 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 1946 mOrigBandwidthIndex, mCurBandwidthIndex); 1947 1948 mStreamMask = mNewStreamMask; 1949 mSwitchInProgress = false; 1950 mOrigBandwidthIndex = mCurBandwidthIndex; 1951 1952 restartPollBuffering(); 1953} 1954 1955void LiveSession::schedulePollBuffering() { 1956 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 1957 msg->setInt32("generation", mPollBufferingGeneration); 1958 msg->post(1000000ll); 1959} 1960 1961void LiveSession::cancelPollBuffering() { 1962 ++mPollBufferingGeneration; 1963 mPrevBufferPercentage = -1; 1964} 1965 1966void LiveSession::restartPollBuffering() { 1967 cancelPollBuffering(); 1968 onPollBuffering(); 1969} 1970 1971void LiveSession::onPollBuffering() { 1972 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 1973 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 1974 mSwitchInProgress, mReconfigurationInProgress, 1975 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 1976 1977 bool underflow, ready, down, up; 1978 if (checkBuffering(underflow, ready, down, up)) { 1979 if (mInPreparationPhase) { 1980 // Allow down switch even if we're still preparing. 1981 // 1982 // Some streams have a high bandwidth index as default, 1983 // when bandwidth is low, it takes a long time to buffer 1984 // to ready mark, then it immediately pauses after start 1985 // as we have to do a down switch. It's better experience 1986 // to restart from a lower index, if we detect low bw. 1987 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 1988 postPrepared(OK); 1989 } 1990 } 1991 1992 if (!mInPreparationPhase) { 1993 if (ready) { 1994 stopBufferingIfNecessary(); 1995 } else if (underflow) { 1996 startBufferingIfNecessary(); 1997 } 1998 switchBandwidthIfNeeded(up, down); 1999 } 2000 } 2001 2002 schedulePollBuffering(); 2003} 2004 2005void LiveSession::cancelBandwidthSwitch(bool resume) { 2006 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2007 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2008 if (!mSwitchInProgress) { 2009 return; 2010 } 2011 2012 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2013 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2014 if (info.mToBeRemoved) { 2015 info.mToBeRemoved = false; 2016 if (resume) { 2017 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2018 } 2019 } 2020 } 2021 2022 for (size_t i = 0; i < kMaxStreams; ++i) { 2023 AString newUri = mStreams[i].mNewUri; 2024 if (!newUri.empty()) { 2025 // clear all mNewUri matching this newUri 2026 for (size_t j = i; j < kMaxStreams; ++j) { 2027 if (mStreams[j].mNewUri == newUri) { 2028 mStreams[j].mNewUri.clear(); 2029 } 2030 } 2031 ALOGV("stopping newUri = %s", newUri.c_str()); 2032 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2033 if (index < 0) { 2034 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2035 continue; 2036 } 2037 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2038 info.mToBeRemoved = true; 2039 info.mFetcher->stopAsync(); 2040 } 2041 } 2042 2043 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2044 mOrigBandwidthIndex, mCurBandwidthIndex); 2045 2046 mSwitchGeneration++; 2047 mSwitchInProgress = false; 2048 mCurBandwidthIndex = mOrigBandwidthIndex; 2049 mSwapMask = 0; 2050} 2051 2052bool LiveSession::checkBuffering( 2053 bool &underflow, bool &ready, bool &down, bool &up) { 2054 underflow = ready = down = up = false; 2055 2056 if (mReconfigurationInProgress) { 2057 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2058 return false; 2059 } 2060 2061 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2062 activeCount = underflowCount = readyCount = downCount = upCount =0; 2063 int32_t minBufferPercent = -1; 2064 int64_t durationUs; 2065 if (getDuration(&durationUs) != OK) { 2066 durationUs = -1; 2067 } 2068 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2069 // we don't check subtitles for buffering level 2070 if (!(mStreamMask & mPacketSources.keyAt(i) 2071 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2072 continue; 2073 } 2074 // ignore streams that never had any packet queued. 2075 // (it's possible that the variant only has audio or video) 2076 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2077 if (meta == NULL) { 2078 continue; 2079 } 2080 2081 status_t finalResult; 2082 int64_t bufferedDurationUs = 2083 mPacketSources[i]->getBufferedDurationUs(&finalResult); 2084 ALOGV("[%s] buffered %lld us", 2085 getNameForStream(mPacketSources.keyAt(i)), 2086 (long long)bufferedDurationUs); 2087 if (durationUs >= 0) { 2088 int32_t percent; 2089 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2090 percent = 100; 2091 } else { 2092 percent = (int32_t)(100.0 * 2093 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2094 } 2095 if (minBufferPercent < 0 || percent < minBufferPercent) { 2096 minBufferPercent = percent; 2097 } 2098 } 2099 2100 ++activeCount; 2101 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2102 if (bufferedDurationUs > readyMark 2103 || mPacketSources[i]->isFinished(0)) { 2104 ++readyCount; 2105 } 2106 if (!mPacketSources[i]->isFinished(0)) { 2107 if (bufferedDurationUs < kUnderflowMarkUs) { 2108 ++underflowCount; 2109 } 2110 if (bufferedDurationUs > mUpSwitchMark) { 2111 ++upCount; 2112 } 2113 if (bufferedDurationUs < mDownSwitchMark) { 2114 ++downCount; 2115 } 2116 } 2117 } 2118 2119 if (minBufferPercent >= 0) { 2120 notifyBufferingUpdate(minBufferPercent); 2121 } 2122 2123 if (activeCount > 0) { 2124 up = (upCount == activeCount); 2125 down = (downCount > 0); 2126 ready = (readyCount == activeCount); 2127 underflow = (underflowCount > 0); 2128 return true; 2129 } 2130 2131 return false; 2132} 2133 2134void LiveSession::startBufferingIfNecessary() { 2135 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2136 mInPreparationPhase, mBuffering); 2137 if (!mBuffering) { 2138 mBuffering = true; 2139 2140 sp<AMessage> notify = mNotify->dup(); 2141 notify->setInt32("what", kWhatBufferingStart); 2142 notify->post(); 2143 } 2144} 2145 2146void LiveSession::stopBufferingIfNecessary() { 2147 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2148 mInPreparationPhase, mBuffering); 2149 2150 if (mBuffering) { 2151 mBuffering = false; 2152 2153 sp<AMessage> notify = mNotify->dup(); 2154 notify->setInt32("what", kWhatBufferingEnd); 2155 notify->post(); 2156 } 2157} 2158 2159void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2160 if (percentage < mPrevBufferPercentage) { 2161 percentage = mPrevBufferPercentage; 2162 } else if (percentage > 100) { 2163 percentage = 100; 2164 } 2165 2166 mPrevBufferPercentage = percentage; 2167 2168 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2169 2170 sp<AMessage> notify = mNotify->dup(); 2171 notify->setInt32("what", kWhatBufferingUpdate); 2172 notify->setInt32("percentage", percentage); 2173 notify->post(); 2174} 2175 2176/* 2177 * returns true if a bandwidth switch is actually needed (and started), 2178 * returns false otherwise 2179 */ 2180bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2181 // no need to check bandwidth if we only have 1 bandwidth settings 2182 if (mSwitchInProgress || mBandwidthItems.size() < 2) { 2183 return false; 2184 } 2185 2186 int32_t bandwidthBps; 2187 bool isStable; 2188 if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps, &isStable)) { 2189 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 2190 mLastBandwidthBps = bandwidthBps; 2191 } else { 2192 ALOGV("no bandwidth estimate."); 2193 return false; 2194 } 2195 2196 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2197 // canSwithDown and canSwitchUp can't both be true. 2198 // we only want to switch up when measured bw is 120% higher than current variant, 2199 // and we only want to switch down when measured bw is below current variant. 2200 bool canSwitchDown = bufferLow 2201 && (bandwidthBps < (int32_t)curBandwidth); 2202 bool canSwitchUp = bufferHigh 2203 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2204 2205 if (canSwitchDown || canSwitchUp) { 2206 // bandwidth estimating has some delay, if we have to downswitch when 2207 // it hasn't stabilized, be very conservative on bandwidth. 2208 if (!isStable && canSwitchDown) { 2209 bandwidthBps /= 2; 2210 } 2211 2212 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2213 2214 // it's possible that we're checking for canSwitchUp case, but the returned 2215 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2216 // of measured bw. In that case we don't want to do anything, since we have 2217 // both enough buffer and enough bw. 2218 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2219 || (canSwitchDown && bandwidthIndex < mCurBandwidthIndex)) { 2220 // if not yet prepared, just restart again with new bw index. 2221 // this is faster and playback experience is cleaner. 2222 changeConfiguration( 2223 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2224 return true; 2225 } 2226 } 2227 return false; 2228} 2229 2230void LiveSession::postError(status_t err) { 2231 // if we reached EOS, notify buffering of 100% 2232 if (err == ERROR_END_OF_STREAM) { 2233 notifyBufferingUpdate(100); 2234 } 2235 // we'll stop buffer polling now, before that notify 2236 // stop buffering to stop the spinning icon 2237 stopBufferingIfNecessary(); 2238 cancelPollBuffering(); 2239 2240 sp<AMessage> notify = mNotify->dup(); 2241 notify->setInt32("what", kWhatError); 2242 notify->setInt32("err", err); 2243 notify->post(); 2244} 2245 2246void LiveSession::postPrepared(status_t err) { 2247 CHECK(mInPreparationPhase); 2248 2249 sp<AMessage> notify = mNotify->dup(); 2250 if (err == OK || err == ERROR_END_OF_STREAM) { 2251 notify->setInt32("what", kWhatPrepared); 2252 } else { 2253 cancelPollBuffering(); 2254 2255 notify->setInt32("what", kWhatPreparationFailed); 2256 notify->setInt32("err", err); 2257 } 2258 2259 notify->post(); 2260 2261 mInPreparationPhase = false; 2262} 2263 2264 2265} // namespace android 2266 2267