LiveSession.cpp revision 0852843d304006e3ab333081fddda13b07193de8
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/foundation/AUtils.h> 37#include <media/stagefright/DataSource.h> 38#include <media/stagefright/FileSource.h> 39#include <media/stagefright/MediaErrors.h> 40#include <media/stagefright/MediaHTTP.h> 41#include <media/stagefright/MediaDefs.h> 42#include <media/stagefright/MetaData.h> 43#include <media/stagefright/Utils.h> 44 45#include <utils/Mutex.h> 46 47#include <ctype.h> 48#include <inttypes.h> 49#include <openssl/aes.h> 50#include <openssl/md5.h> 51 52namespace android { 53 54// static 55// Bandwidth Switch Mark Defaults 56const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 57const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 58const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 59const int64_t LiveSession::kResumeThresholdUs = 100000ll; 60 61// Buffer Prepare/Ready/Underflow Marks 62const int64_t LiveSession::kReadyMarkUs = 5000000ll; 63const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 64const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 65 66struct LiveSession::BandwidthEstimator : public RefBase { 67 BandwidthEstimator(); 68 69 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 70 bool estimateBandwidth(int32_t *bandwidth); 71 72private: 73 // Bandwidth estimation parameters 74 static const int32_t kMaxBandwidthHistoryItems = 20; 75 static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec 76 77 struct BandwidthEntry { 78 int64_t mDelayUs; 79 size_t mNumBytes; 80 }; 81 82 Mutex mLock; 83 List<BandwidthEntry> mBandwidthHistory; 84 int64_t mTotalTransferTimeUs; 85 size_t mTotalTransferBytes; 86 87 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 88}; 89 90LiveSession::BandwidthEstimator::BandwidthEstimator() : 91 mTotalTransferTimeUs(0), 92 mTotalTransferBytes(0) { 93} 94 95void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 96 size_t numBytes, int64_t delayUs) { 97 AutoMutex autoLock(mLock); 98 99 BandwidthEntry entry; 100 entry.mDelayUs = delayUs; 101 entry.mNumBytes = numBytes; 102 mTotalTransferTimeUs += delayUs; 103 mTotalTransferBytes += numBytes; 104 mBandwidthHistory.push_back(entry); 105 106 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 107 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 108 while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) { 109 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 110 if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) { 111 break; 112 } 113 mTotalTransferTimeUs -= it->mDelayUs; 114 mTotalTransferBytes -= it->mNumBytes; 115 mBandwidthHistory.erase(mBandwidthHistory.begin()); 116 } 117} 118 119bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { 120 AutoMutex autoLock(mLock); 121 122 if (mBandwidthHistory.size() < 2) { 123 return false; 124 } 125 126 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 127 return true; 128} 129 130//static 131const char *LiveSession::getKeyForStream(StreamType type) { 132 switch (type) { 133 case STREAMTYPE_VIDEO: 134 return "timeUsVideo"; 135 case STREAMTYPE_AUDIO: 136 return "timeUsAudio"; 137 case STREAMTYPE_SUBTITLES: 138 return "timeUsSubtitle"; 139 case STREAMTYPE_METADATA: 140 return "timeUsMetadata"; // unused 141 default: 142 TRESPASS(); 143 } 144 return NULL; 145} 146 147//static 148const char *LiveSession::getNameForStream(StreamType type) { 149 switch (type) { 150 case STREAMTYPE_VIDEO: 151 return "video"; 152 case STREAMTYPE_AUDIO: 153 return "audio"; 154 case STREAMTYPE_SUBTITLES: 155 return "subs"; 156 case STREAMTYPE_METADATA: 157 return "metadata"; 158 default: 159 break; 160 } 161 return "unknown"; 162} 163 164//static 165ATSParser::SourceType LiveSession::getSourceTypeForStream(StreamType type) { 166 switch (type) { 167 case STREAMTYPE_VIDEO: 168 return ATSParser::VIDEO; 169 case STREAMTYPE_AUDIO: 170 return ATSParser::AUDIO; 171 case STREAMTYPE_METADATA: 172 return ATSParser::META; 173 case STREAMTYPE_SUBTITLES: 174 default: 175 TRESPASS(); 176 } 177 return ATSParser::NUM_SOURCE_TYPES; // should not reach here 178} 179 180LiveSession::LiveSession( 181 const sp<AMessage> ¬ify, uint32_t flags, 182 const sp<IMediaHTTPService> &httpService) 183 : mNotify(notify), 184 mFlags(flags), 185 mHTTPService(httpService), 186 mBuffering(false), 187 mInPreparationPhase(true), 188 mPollBufferingGeneration(0), 189 mPrevBufferPercentage(-1), 190 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 191 mCurBandwidthIndex(-1), 192 mOrigBandwidthIndex(-1), 193 mLastBandwidthBps(-1ll), 194 mBandwidthEstimator(new BandwidthEstimator()), 195 mMaxWidth(720), 196 mMaxHeight(480), 197 mStreamMask(0), 198 mNewStreamMask(0), 199 mSwapMask(0), 200 mSwitchGeneration(0), 201 mSubtitleGeneration(0), 202 mLastDequeuedTimeUs(0ll), 203 mRealTimeBaseUs(0ll), 204 mReconfigurationInProgress(false), 205 mSwitchInProgress(false), 206 mUpSwitchMark(kUpSwitchMarkUs), 207 mDownSwitchMark(kDownSwitchMarkUs), 208 mUpSwitchMargin(kUpSwitchMarginUs), 209 mFirstTimeUsValid(false), 210 mFirstTimeUs(0), 211 mLastSeekTimeUs(0), 212 mHasMetadata(false) { 213 mStreams[kAudioIndex] = StreamItem("audio"); 214 mStreams[kVideoIndex] = StreamItem("video"); 215 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 216 217 for (size_t i = 0; i < kNumSources; ++i) { 218 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 219 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 220 } 221} 222 223LiveSession::~LiveSession() { 224 if (mFetcherLooper != NULL) { 225 mFetcherLooper->stop(); 226 } 227} 228 229int64_t LiveSession::calculateMediaTimeUs( 230 int64_t firstTimeUs, int64_t timeUs, int32_t discontinuitySeq) { 231 if (timeUs >= firstTimeUs) { 232 timeUs -= firstTimeUs; 233 } else { 234 timeUs = 0; 235 } 236 timeUs += mLastSeekTimeUs; 237 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 238 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 239 } 240 return timeUs; 241} 242 243status_t LiveSession::dequeueAccessUnit( 244 StreamType stream, sp<ABuffer> *accessUnit) { 245 status_t finalResult = OK; 246 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 247 248 ssize_t streamIdx = typeToIndex(stream); 249 if (streamIdx < 0) { 250 return INVALID_VALUE; 251 } 252 const char *streamStr = getNameForStream(stream); 253 // Do not let client pull data if we don't have data packets yet. 254 // We might only have a format discontinuity queued without data. 255 // When NuPlayerDecoder dequeues the format discontinuity, it will 256 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 257 // thinks it can do seamless change, so will not shutdown decoder. 258 // When the actual format arrives, it can't handle it and get stuck. 259 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 260 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 261 streamStr, finalResult); 262 263 if (finalResult == OK) { 264 return -EAGAIN; 265 } else { 266 return finalResult; 267 } 268 } 269 270 // Let the client dequeue as long as we have buffers available 271 // Do not make pause/resume decisions here. 272 273 status_t err = packetSource->dequeueAccessUnit(accessUnit); 274 275 if (err == INFO_DISCONTINUITY) { 276 // adaptive streaming, discontinuities in the playlist 277 int32_t type; 278 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 279 280 sp<AMessage> extra; 281 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 282 extra.clear(); 283 } 284 285 ALOGI("[%s] read discontinuity of type %d, extra = %s", 286 streamStr, 287 type, 288 extra == NULL ? "NULL" : extra->debugString().c_str()); 289 } else if (err == OK) { 290 291 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 292 int64_t timeUs, originalTimeUs; 293 int32_t discontinuitySeq = 0; 294 StreamItem& strm = mStreams[streamIdx]; 295 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 296 originalTimeUs = timeUs; 297 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 298 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 299 int64_t offsetTimeUs; 300 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 301 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 302 } else { 303 offsetTimeUs = 0; 304 } 305 306 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 307 int64_t firstTimeUs; 308 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 309 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 310 offsetTimeUs += strm.mLastSampleDurationUs; 311 } else { 312 offsetTimeUs += strm.mLastSampleDurationUs; 313 } 314 315 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 316 strm.mCurDiscontinuitySeq = discontinuitySeq; 317 } 318 319 int32_t discard = 0; 320 int64_t firstTimeUs; 321 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 322 int64_t durUs; // approximate sample duration 323 if (timeUs > strm.mLastDequeuedTimeUs) { 324 durUs = timeUs - strm.mLastDequeuedTimeUs; 325 } else { 326 durUs = strm.mLastDequeuedTimeUs - timeUs; 327 } 328 strm.mLastSampleDurationUs = durUs; 329 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 330 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 331 firstTimeUs = timeUs; 332 } else { 333 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 334 firstTimeUs = timeUs; 335 } 336 337 strm.mLastDequeuedTimeUs = timeUs; 338 timeUs = calculateMediaTimeUs(firstTimeUs, timeUs, discontinuitySeq); 339 340 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 341 streamStr, (long long)timeUs, (long long)originalTimeUs); 342 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 343 mLastDequeuedTimeUs = timeUs; 344 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 345 } else if (stream == STREAMTYPE_SUBTITLES) { 346 int32_t subtitleGeneration; 347 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 348 && subtitleGeneration != mSubtitleGeneration) { 349 return -EAGAIN; 350 }; 351 (*accessUnit)->meta()->setInt32( 352 "trackIndex", mPlaylist->getSelectedIndex()); 353 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 354 } else if (stream == STREAMTYPE_METADATA) { 355 HLSTime mdTime((*accessUnit)->meta()); 356 if (mDiscontinuityAbsStartTimesUs.indexOfKey(mdTime.mSeq) < 0) { 357 packetSource->requeueAccessUnit((*accessUnit)); 358 return -EAGAIN; 359 } else { 360 int64_t firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(mdTime.mSeq); 361 int64_t timeUs = calculateMediaTimeUs(firstTimeUs, mdTime.mTimeUs, mdTime.mSeq); 362 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 363 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 364 } 365 } 366 } else { 367 ALOGI("[%s] encountered error %d", streamStr, err); 368 } 369 370 return err; 371} 372 373status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 374 if (!(mStreamMask & stream)) { 375 return UNKNOWN_ERROR; 376 } 377 378 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 379 380 sp<MetaData> meta = packetSource->getFormat(); 381 382 if (meta == NULL) { 383 return -EAGAIN; 384 } 385 386 if (stream == STREAMTYPE_AUDIO) { 387 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 388 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 389 } else if (stream == STREAMTYPE_VIDEO) { 390 meta->setInt32(kKeyMaxWidth, mMaxWidth); 391 meta->setInt32(kKeyMaxHeight, mMaxHeight); 392 } 393 394 return convertMetaDataToMessage(meta, format); 395} 396 397sp<HTTPBase> LiveSession::getHTTPDataSource() { 398 return new MediaHTTP(mHTTPService->makeHTTPConnection()); 399} 400 401void LiveSession::connectAsync( 402 const char *url, const KeyedVector<String8, String8> *headers) { 403 sp<AMessage> msg = new AMessage(kWhatConnect, this); 404 msg->setString("url", url); 405 406 if (headers != NULL) { 407 msg->setPointer( 408 "headers", 409 new KeyedVector<String8, String8>(*headers)); 410 } 411 412 msg->post(); 413} 414 415status_t LiveSession::disconnect() { 416 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 417 418 sp<AMessage> response; 419 status_t err = msg->postAndAwaitResponse(&response); 420 421 return err; 422} 423 424status_t LiveSession::seekTo(int64_t timeUs) { 425 sp<AMessage> msg = new AMessage(kWhatSeek, this); 426 msg->setInt64("timeUs", timeUs); 427 428 sp<AMessage> response; 429 status_t err = msg->postAndAwaitResponse(&response); 430 431 return err; 432} 433 434bool LiveSession::checkSwitchProgress( 435 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 436 AString newUri; 437 CHECK(stopParams->findString("uri", &newUri)); 438 439 *needResumeUntil = false; 440 sp<AMessage> firstNewMeta[kMaxStreams]; 441 for (size_t i = 0; i < kMaxStreams; ++i) { 442 StreamType stream = indexToType(i); 443 if (!(mSwapMask & mNewStreamMask & stream) 444 || (mStreams[i].mNewUri != newUri)) { 445 continue; 446 } 447 if (stream == STREAMTYPE_SUBTITLES) { 448 continue; 449 } 450 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 451 452 // First, get latest dequeued meta, which is where the decoder is at. 453 // (when upswitching, we take the meta after a certain delay, so that 454 // the decoder is left with some cushion) 455 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 456 if (delayUs > 0) { 457 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 458 if (lastDequeueMeta == NULL) { 459 // this means we don't have enough cushion, try again later 460 ALOGV("[%s] up switching failed due to insufficient buffer", 461 getNameForStream(stream)); 462 return false; 463 } 464 } else { 465 // It's okay for lastDequeueMeta to be NULL here, it means the 466 // decoder hasn't even started dequeueing 467 lastDequeueMeta = source->getLatestDequeuedMeta(); 468 } 469 // Then, trim off packets at beginning of mPacketSources2 that's before 470 // the latest dequeued time. These samples are definitely too late. 471 firstNewMeta[i] = mPacketSources2.editValueAt(i) 472 ->trimBuffersBeforeMeta(lastDequeueMeta); 473 474 // Now firstNewMeta[i] is the first sample after the trim. 475 // If it's NULL, we failed because dequeue already past all samples 476 // in mPacketSource2, we have to try again. 477 if (firstNewMeta[i] == NULL) { 478 HLSTime dequeueTime(lastDequeueMeta); 479 ALOGV("[%s] dequeue time (%d, %lld) past start time", 480 getNameForStream(stream), 481 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 482 return false; 483 } 484 485 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 486 // already fetched, and see if we need to resumeUntil 487 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 488 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 489 // boundary, no need to resume as the content will look different anyways 490 if (lastEnqueueMeta != NULL) { 491 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 492 493 // no need to resume old fetcher if new fetcher started in different 494 // discontinuity sequence, as the content will look different. 495 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 496 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 497 498 // update the stopTime for resumeUntil 499 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 500 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 501 } 502 } 503 504 // if we're here, it means dequeue progress hasn't passed some samples in 505 // mPacketSource2, we can trim off the excess in mPacketSource. 506 // (old fetcher might still need to resumeUntil the start time of new fetcher) 507 for (size_t i = 0; i < kMaxStreams; ++i) { 508 StreamType stream = indexToType(i); 509 if (!(mSwapMask & mNewStreamMask & stream) 510 || (newUri != mStreams[i].mNewUri) 511 || stream == STREAMTYPE_SUBTITLES) { 512 continue; 513 } 514 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 515 } 516 517 // no resumeUntil if already underflow 518 *needResumeUntil &= !mBuffering; 519 520 return true; 521} 522 523void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 524 switch (msg->what()) { 525 case kWhatConnect: 526 { 527 onConnect(msg); 528 break; 529 } 530 531 case kWhatDisconnect: 532 { 533 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 534 535 if (mReconfigurationInProgress) { 536 break; 537 } 538 539 finishDisconnect(); 540 break; 541 } 542 543 case kWhatSeek: 544 { 545 if (mReconfigurationInProgress) { 546 msg->post(50000); 547 break; 548 } 549 550 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 551 mSeekReply = new AMessage; 552 553 onSeek(msg); 554 break; 555 } 556 557 case kWhatFetcherNotify: 558 { 559 int32_t what; 560 CHECK(msg->findInt32("what", &what)); 561 562 switch (what) { 563 case PlaylistFetcher::kWhatStarted: 564 break; 565 case PlaylistFetcher::kWhatPaused: 566 case PlaylistFetcher::kWhatStopped: 567 { 568 AString uri; 569 CHECK(msg->findString("uri", &uri)); 570 ssize_t index = mFetcherInfos.indexOfKey(uri); 571 if (index < 0) { 572 // ignore msgs from fetchers that's already gone 573 break; 574 } 575 576 ALOGV("fetcher-%d %s", 577 mFetcherInfos[index].mFetcher->getFetcherID(), 578 what == PlaylistFetcher::kWhatPaused ? 579 "paused" : "stopped"); 580 581 if (what == PlaylistFetcher::kWhatStopped) { 582 mFetcherLooper->unregisterHandler( 583 mFetcherInfos[index].mFetcher->id()); 584 mFetcherInfos.removeItemsAt(index); 585 } else if (what == PlaylistFetcher::kWhatPaused) { 586 int32_t seekMode; 587 CHECK(msg->findInt32("seekMode", &seekMode)); 588 for (size_t i = 0; i < kMaxStreams; ++i) { 589 if (mStreams[i].mUri == uri) { 590 mStreams[i].mSeekMode = (SeekMode) seekMode; 591 } 592 } 593 } 594 595 if (mContinuation != NULL) { 596 CHECK_GT(mContinuationCounter, 0); 597 if (--mContinuationCounter == 0) { 598 mContinuation->post(); 599 } 600 ALOGV("%zu fetcher(s) left", mContinuationCounter); 601 } 602 break; 603 } 604 605 case PlaylistFetcher::kWhatDurationUpdate: 606 { 607 AString uri; 608 CHECK(msg->findString("uri", &uri)); 609 610 int64_t durationUs; 611 CHECK(msg->findInt64("durationUs", &durationUs)); 612 613 ssize_t index = mFetcherInfos.indexOfKey(uri); 614 if (index >= 0) { 615 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 616 info->mDurationUs = durationUs; 617 } 618 break; 619 } 620 621 case PlaylistFetcher::kWhatTargetDurationUpdate: 622 { 623 int64_t targetDurationUs; 624 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 625 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 626 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 627 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 628 break; 629 } 630 631 case PlaylistFetcher::kWhatError: 632 { 633 status_t err; 634 CHECK(msg->findInt32("err", &err)); 635 636 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 637 638 // handle EOS on subtitle tracks independently 639 AString uri; 640 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 641 ssize_t i = mFetcherInfos.indexOfKey(uri); 642 if (i >= 0) { 643 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 644 if (fetcher != NULL) { 645 uint32_t type = fetcher->getStreamTypeMask(); 646 if (type == STREAMTYPE_SUBTITLES) { 647 mPacketSources.valueFor( 648 STREAMTYPE_SUBTITLES)->signalEOS(err);; 649 break; 650 } 651 } 652 } 653 } 654 655 if (mInPreparationPhase) { 656 postPrepared(err); 657 } 658 659 cancelBandwidthSwitch(); 660 661 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 662 663 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 664 665 mPacketSources.valueFor( 666 STREAMTYPE_SUBTITLES)->signalEOS(err); 667 668 postError(err); 669 break; 670 } 671 672 case PlaylistFetcher::kWhatStopReached: 673 { 674 ALOGV("kWhatStopReached"); 675 676 AString oldUri; 677 CHECK(msg->findString("uri", &oldUri)); 678 679 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 680 if (index < 0) { 681 break; 682 } 683 684 tryToFinishBandwidthSwitch(oldUri); 685 break; 686 } 687 688 case PlaylistFetcher::kWhatStartedAt: 689 { 690 int32_t switchGeneration; 691 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 692 693 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 694 switchGeneration, mSwitchGeneration); 695 696 if (switchGeneration != mSwitchGeneration) { 697 break; 698 } 699 700 AString uri; 701 CHECK(msg->findString("uri", &uri)); 702 703 // mark new fetcher mToBeResumed 704 ssize_t index = mFetcherInfos.indexOfKey(uri); 705 if (index >= 0) { 706 mFetcherInfos.editValueAt(index).mToBeResumed = true; 707 } 708 709 // temporarily disable packet sources to be swapped to prevent 710 // NuPlayerDecoder from dequeuing while we check progress 711 for (size_t i = 0; i < mPacketSources.size(); ++i) { 712 if ((mSwapMask & mPacketSources.keyAt(i)) 713 && uri == mStreams[i].mNewUri) { 714 mPacketSources.editValueAt(i)->enable(false); 715 } 716 } 717 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 718 // If switching up, require a cushion bigger than kUnderflowMark 719 // to avoid buffering immediately after the switch. 720 // (If we don't have that cushion we'd rather cancel and try again.) 721 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 722 bool needResumeUntil = false; 723 sp<AMessage> stopParams = msg; 724 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 725 // playback time hasn't passed startAt time 726 if (!needResumeUntil) { 727 ALOGV("finish switch"); 728 for (size_t i = 0; i < kMaxStreams; ++i) { 729 if ((mSwapMask & indexToType(i)) 730 && uri == mStreams[i].mNewUri) { 731 // have to make a copy of mStreams[i].mUri because 732 // tryToFinishBandwidthSwitch is modifying mStreams[] 733 AString oldURI = mStreams[i].mUri; 734 tryToFinishBandwidthSwitch(oldURI); 735 break; 736 } 737 } 738 } else { 739 // startAt time is after last enqueue time 740 // Resume fetcher for the original variant; the resumed fetcher should 741 // continue until the timestamps found in msg, which is stored by the 742 // new fetcher to indicate where the new variant has started buffering. 743 ALOGV("finish switch with resumeUntilAsync"); 744 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 745 const FetcherInfo &info = mFetcherInfos.valueAt(i); 746 if (info.mToBeRemoved) { 747 info.mFetcher->resumeUntilAsync(stopParams); 748 } 749 } 750 } 751 } else { 752 // playback time passed startAt time 753 if (switchUp) { 754 // if switching up, cancel and retry if condition satisfies again 755 ALOGV("cancel up switch because we're too late"); 756 cancelBandwidthSwitch(true /* resume */); 757 } else { 758 ALOGV("retry down switch at next sample"); 759 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 760 } 761 } 762 // re-enable all packet sources 763 for (size_t i = 0; i < mPacketSources.size(); ++i) { 764 mPacketSources.editValueAt(i)->enable(true); 765 } 766 767 break; 768 } 769 770 case PlaylistFetcher::kWhatMetadataDetected: 771 { 772 if (!mHasMetadata) { 773 mHasMetadata = true; 774 sp<AMessage> notify = mNotify->dup(); 775 notify->setInt32("what", kWhatMetadataDetected); 776 notify->post(); 777 } 778 break; 779 } 780 781 default: 782 TRESPASS(); 783 } 784 785 break; 786 } 787 788 case kWhatChangeConfiguration: 789 { 790 onChangeConfiguration(msg); 791 break; 792 } 793 794 case kWhatChangeConfiguration2: 795 { 796 onChangeConfiguration2(msg); 797 break; 798 } 799 800 case kWhatChangeConfiguration3: 801 { 802 onChangeConfiguration3(msg); 803 break; 804 } 805 806 case kWhatFinishDisconnect2: 807 { 808 onFinishDisconnect2(); 809 break; 810 } 811 812 case kWhatPollBuffering: 813 { 814 int32_t generation; 815 CHECK(msg->findInt32("generation", &generation)); 816 if (generation == mPollBufferingGeneration) { 817 onPollBuffering(); 818 } 819 break; 820 } 821 822 default: 823 TRESPASS(); 824 break; 825 } 826} 827 828// static 829int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 830 if (a->mBandwidth < b->mBandwidth) { 831 return -1; 832 } else if (a->mBandwidth == b->mBandwidth) { 833 return 0; 834 } 835 836 return 1; 837} 838 839// static 840LiveSession::StreamType LiveSession::indexToType(int idx) { 841 CHECK(idx >= 0 && idx < kNumSources); 842 return (StreamType)(1 << idx); 843} 844 845// static 846ssize_t LiveSession::typeToIndex(int32_t type) { 847 switch (type) { 848 case STREAMTYPE_AUDIO: 849 return 0; 850 case STREAMTYPE_VIDEO: 851 return 1; 852 case STREAMTYPE_SUBTITLES: 853 return 2; 854 case STREAMTYPE_METADATA: 855 return 3; 856 default: 857 return -1; 858 }; 859 return -1; 860} 861 862void LiveSession::onConnect(const sp<AMessage> &msg) { 863 AString url; 864 CHECK(msg->findString("url", &url)); 865 866 KeyedVector<String8, String8> *headers = NULL; 867 if (!msg->findPointer("headers", (void **)&headers)) { 868 mExtraHeaders.clear(); 869 } else { 870 mExtraHeaders = *headers; 871 872 delete headers; 873 headers = NULL; 874 } 875 876 // TODO currently we don't know if we are coming here from incognito mode 877 ALOGI("onConnect %s", uriDebugString(url).c_str()); 878 879 mMasterURL = url; 880 881 bool dummy; 882 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 883 884 if (mPlaylist == NULL) { 885 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 886 887 postPrepared(ERROR_IO); 888 return; 889 } 890 891 // create looper for fetchers 892 if (mFetcherLooper == NULL) { 893 mFetcherLooper = new ALooper(); 894 895 mFetcherLooper->setName("Fetcher"); 896 mFetcherLooper->start(false, false); 897 } 898 899 // We trust the content provider to make a reasonable choice of preferred 900 // initial bandwidth by listing it first in the variant playlist. 901 // At startup we really don't have a good estimate on the available 902 // network bandwidth since we haven't tranferred any data yet. Once 903 // we have we can make a better informed choice. 904 size_t initialBandwidth = 0; 905 size_t initialBandwidthIndex = 0; 906 907 int32_t maxWidth = 0; 908 int32_t maxHeight = 0; 909 910 if (mPlaylist->isVariantPlaylist()) { 911 Vector<BandwidthItem> itemsWithVideo; 912 for (size_t i = 0; i < mPlaylist->size(); ++i) { 913 BandwidthItem item; 914 915 item.mPlaylistIndex = i; 916 917 sp<AMessage> meta; 918 AString uri; 919 mPlaylist->itemAt(i, &uri, &meta); 920 921 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 922 923 int32_t width, height; 924 if (meta->findInt32("width", &width)) { 925 maxWidth = max(maxWidth, width); 926 } 927 if (meta->findInt32("height", &height)) { 928 maxHeight = max(maxHeight, height); 929 } 930 931 mBandwidthItems.push(item); 932 if (mPlaylist->hasType(i, "video")) { 933 itemsWithVideo.push(item); 934 } 935 } 936 // remove the audio-only variants if we have at least one with video 937 if (!itemsWithVideo.empty() 938 && itemsWithVideo.size() < mBandwidthItems.size()) { 939 mBandwidthItems.clear(); 940 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 941 mBandwidthItems.push(itemsWithVideo[i]); 942 } 943 } 944 945 CHECK_GT(mBandwidthItems.size(), 0u); 946 initialBandwidth = mBandwidthItems[0].mBandwidth; 947 948 mBandwidthItems.sort(SortByBandwidth); 949 950 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 951 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 952 initialBandwidthIndex = i; 953 break; 954 } 955 } 956 } else { 957 // dummy item. 958 BandwidthItem item; 959 item.mPlaylistIndex = 0; 960 item.mBandwidth = 0; 961 mBandwidthItems.push(item); 962 } 963 964 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 965 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 966 967 mPlaylist->pickRandomMediaItems(); 968 changeConfiguration( 969 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 970} 971 972void LiveSession::finishDisconnect() { 973 ALOGV("finishDisconnect"); 974 975 // No reconfiguration is currently pending, make sure none will trigger 976 // during disconnection either. 977 cancelBandwidthSwitch(); 978 979 // cancel buffer polling 980 cancelPollBuffering(); 981 982 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 983 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 984 } 985 986 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); 987 988 mContinuationCounter = mFetcherInfos.size(); 989 mContinuation = msg; 990 991 if (mContinuationCounter == 0) { 992 msg->post(); 993 } 994} 995 996void LiveSession::onFinishDisconnect2() { 997 mContinuation.clear(); 998 999 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 1000 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 1001 1002 mPacketSources.valueFor( 1003 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 1004 1005 sp<AMessage> response = new AMessage; 1006 response->setInt32("err", OK); 1007 1008 response->postReply(mDisconnectReplyID); 1009 mDisconnectReplyID.clear(); 1010} 1011 1012sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 1013 ssize_t index = mFetcherInfos.indexOfKey(uri); 1014 1015 if (index >= 0) { 1016 return NULL; 1017 } 1018 1019 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 1020 notify->setString("uri", uri); 1021 notify->setInt32("switchGeneration", mSwitchGeneration); 1022 1023 FetcherInfo info; 1024 info.mFetcher = new PlaylistFetcher( 1025 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 1026 info.mDurationUs = -1ll; 1027 info.mToBeRemoved = false; 1028 info.mToBeResumed = false; 1029 mFetcherLooper->registerHandler(info.mFetcher); 1030 1031 mFetcherInfos.add(uri, info); 1032 1033 return info.mFetcher; 1034} 1035 1036/* 1037 * Illustration of parameters: 1038 * 1039 * 0 `range_offset` 1040 * +------------+-------------------------------------------------------+--+--+ 1041 * | | | next block to fetch | | | 1042 * | | `source` handle => `out` buffer | | | | 1043 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 1044 * | |<----------- `range_length` / buffer capacity ----------->| | 1045 * |<------------------------------ file_size ------------------------------->| 1046 * 1047 * Special parameter values: 1048 * - range_length == -1 means entire file 1049 * - block_size == 0 means entire range 1050 * 1051 */ 1052ssize_t LiveSession::fetchFile( 1053 const char *url, sp<ABuffer> *out, 1054 int64_t range_offset, int64_t range_length, 1055 uint32_t block_size, /* download block size */ 1056 sp<DataSource> *source, /* to return and reuse source */ 1057 String8 *actualUrl, 1058 bool forceConnectHTTP /* force connect HTTP when resuing source */) { 1059 off64_t size; 1060 sp<DataSource> temp_source; 1061 if (source == NULL) { 1062 source = &temp_source; 1063 } 1064 1065 if (*source == NULL || forceConnectHTTP) { 1066 if (!strncasecmp(url, "file://", 7)) { 1067 *source = new FileSource(url + 7); 1068 } else if (strncasecmp(url, "http://", 7) 1069 && strncasecmp(url, "https://", 8)) { 1070 return ERROR_UNSUPPORTED; 1071 } else { 1072 KeyedVector<String8, String8> headers = mExtraHeaders; 1073 if (range_offset > 0 || range_length >= 0) { 1074 headers.add( 1075 String8("Range"), 1076 String8( 1077 AStringPrintf( 1078 "bytes=%lld-%s", 1079 range_offset, 1080 range_length < 0 1081 ? "" : AStringPrintf("%lld", 1082 range_offset + range_length - 1).c_str()).c_str())); 1083 } 1084 1085 HTTPBase* httpDataSource = 1086 (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get(); 1087 status_t err = httpDataSource->connect(url, &headers); 1088 1089 if (err != OK) { 1090 return err; 1091 } 1092 1093 if (*source == NULL) { 1094 *source = mHTTPDataSource; 1095 } 1096 } 1097 } 1098 1099 status_t getSizeErr = (*source)->getSize(&size); 1100 if (getSizeErr != OK) { 1101 size = 65536; 1102 } 1103 1104 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 1105 if (*out == NULL) { 1106 buffer->setRange(0, 0); 1107 } 1108 1109 ssize_t bytesRead = 0; 1110 // adjust range_length if only reading partial block 1111 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 1112 range_length = buffer->size() + block_size; 1113 } 1114 for (;;) { 1115 // Only resize when we don't know the size. 1116 size_t bufferRemaining = buffer->capacity() - buffer->size(); 1117 if (bufferRemaining == 0 && getSizeErr != OK) { 1118 size_t bufferIncrement = buffer->size() / 2; 1119 if (bufferIncrement < 32768) { 1120 bufferIncrement = 32768; 1121 } 1122 bufferRemaining = bufferIncrement; 1123 1124 ALOGV("increasing download buffer to %zu bytes", 1125 buffer->size() + bufferRemaining); 1126 1127 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 1128 memcpy(copy->data(), buffer->data(), buffer->size()); 1129 copy->setRange(0, buffer->size()); 1130 1131 buffer = copy; 1132 } 1133 1134 size_t maxBytesToRead = bufferRemaining; 1135 if (range_length >= 0) { 1136 int64_t bytesLeftInRange = range_length - buffer->size(); 1137 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 1138 maxBytesToRead = bytesLeftInRange; 1139 1140 if (bytesLeftInRange == 0) { 1141 break; 1142 } 1143 } 1144 } 1145 1146 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 1147 // to help us break out of the loop. 1148 ssize_t n = (*source)->readAt( 1149 buffer->size(), buffer->data() + buffer->size(), 1150 maxBytesToRead); 1151 1152 if (n < 0) { 1153 return n; 1154 } 1155 1156 if (n == 0) { 1157 break; 1158 } 1159 1160 buffer->setRange(0, buffer->size() + (size_t)n); 1161 bytesRead += n; 1162 } 1163 1164 *out = buffer; 1165 if (actualUrl != NULL) { 1166 *actualUrl = (*source)->getUri(); 1167 if (actualUrl->isEmpty()) { 1168 *actualUrl = url; 1169 } 1170 } 1171 1172 return bytesRead; 1173} 1174 1175sp<M3UParser> LiveSession::fetchPlaylist( 1176 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 1177 ALOGV("fetchPlaylist '%s'", url); 1178 1179 *unchanged = false; 1180 1181 sp<ABuffer> buffer; 1182 String8 actualUrl; 1183 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 1184 1185 // close off the connection after use 1186 mHTTPDataSource->disconnect(); 1187 1188 if (err <= 0) { 1189 return NULL; 1190 } 1191 1192 // MD5 functionality is not available on the simulator, treat all 1193 // playlists as changed. 1194 1195#if defined(HAVE_ANDROID_OS) 1196 uint8_t hash[16]; 1197 1198 MD5_CTX m; 1199 MD5_Init(&m); 1200 MD5_Update(&m, buffer->data(), buffer->size()); 1201 1202 MD5_Final(hash, &m); 1203 1204 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 1205 // playlist unchanged 1206 *unchanged = true; 1207 1208 return NULL; 1209 } 1210 1211 if (curPlaylistHash != NULL) { 1212 memcpy(curPlaylistHash, hash, sizeof(hash)); 1213 } 1214#endif 1215 1216 sp<M3UParser> playlist = 1217 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 1218 1219 if (playlist->initCheck() != OK) { 1220 ALOGE("failed to parse .m3u8 playlist"); 1221 1222 return NULL; 1223 } 1224 1225 return playlist; 1226} 1227 1228#if 0 1229static double uniformRand() { 1230 return (double)rand() / RAND_MAX; 1231} 1232#endif 1233 1234bool LiveSession::UriIsSameAsIndex(const AString &uri, int32_t i, bool newUri) { 1235 ALOGI("[timed_id3] i %d UriIsSameAsIndex newUri %s, %s", i, 1236 newUri ? "true" : "false", 1237 newUri ? mStreams[i].mNewUri.c_str() : mStreams[i].mUri.c_str()); 1238 return i >= 0 1239 && ((!newUri && uri == mStreams[i].mUri) 1240 || (newUri && uri == mStreams[i].mNewUri)); 1241} 1242 1243sp<AnotherPacketSource> LiveSession::getPacketSourceForStreamIndex( 1244 size_t trackIndex, bool newUri) { 1245 StreamType type = indexToType(trackIndex); 1246 sp<AnotherPacketSource> source = NULL; 1247 if (newUri) { 1248 source = mPacketSources2.valueFor(type); 1249 source->clear(); 1250 } else { 1251 source = mPacketSources.valueFor(type); 1252 }; 1253 return source; 1254} 1255 1256sp<AnotherPacketSource> LiveSession::getMetadataSource( 1257 sp<AnotherPacketSource> sources[kNumSources], uint32_t streamMask, bool newUri) { 1258 // todo: One case where the following strategy can fail is when audio and video 1259 // are in separate playlists, both are transport streams, and the metadata 1260 // is actually contained in the audio stream. 1261 ALOGV("[timed_id3] getMetadataSourceForUri streamMask %x newUri %s", 1262 streamMask, newUri ? "true" : "false"); 1263 1264 if ((sources[kVideoIndex] != NULL) // video fetcher; or ... 1265 || (!(streamMask & STREAMTYPE_VIDEO) && sources[kAudioIndex] != NULL)) { 1266 // ... audio fetcher for audio only variant 1267 return getPacketSourceForStreamIndex(kMetaDataIndex, newUri); 1268 } 1269 1270 return NULL; 1271} 1272 1273bool LiveSession::resumeFetcher( 1274 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1275 ssize_t index = mFetcherInfos.indexOfKey(uri); 1276 if (index < 0) { 1277 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1278 return false; 1279 } 1280 1281 bool resume = false; 1282 sp<AnotherPacketSource> sources[kNumSources]; 1283 for (size_t i = 0; i < kMaxStreams; ++i) { 1284 if ((streamMask & indexToType(i)) && UriIsSameAsIndex(uri, i, newUri)) { 1285 resume = true; 1286 sources[i] = getPacketSourceForStreamIndex(i, newUri); 1287 } 1288 } 1289 1290 if (resume) { 1291 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1292 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1293 1294 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1295 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1296 1297 fetcher->startAsync( 1298 sources[kAudioIndex], 1299 sources[kVideoIndex], 1300 sources[kSubtitleIndex], 1301 getMetadataSource(sources, streamMask, newUri), 1302 timeUs, -1, -1, seekMode); 1303 } 1304 1305 return resume; 1306} 1307 1308float LiveSession::getAbortThreshold( 1309 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1310 float abortThreshold = -1.0f; 1311 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1312 /* 1313 If we're switching down, we need to decide whether to 1314 1315 1) finish last segment of high-bandwidth variant, or 1316 2) abort last segment of high-bandwidth variant, and fetch an 1317 overlapping portion from low-bandwidth variant. 1318 1319 Here we try to maximize the amount of buffer left when the 1320 switch point is met. Given the following parameters: 1321 1322 B: our current buffering level in seconds 1323 T: target duration in seconds 1324 X: sample duration in seconds remain to fetch in last segment 1325 bw0: bandwidth of old variant (as specified in playlist) 1326 bw1: bandwidth of new variant (as specified in playlist) 1327 bw: measured bandwidth available 1328 1329 If we choose 1), when switch happens at the end of current 1330 segment, our buffering will be 1331 B + X - X * bw0 / bw 1332 1333 If we choose 2), when switch happens where we aborted current 1334 segment, our buffering will be 1335 B - (T - X) * bw1 / bw 1336 1337 We should only choose 1) if 1338 X/T < bw1 / (bw1 + bw0 - bw) 1339 */ 1340 1341 // Taking the measured current bandwidth at 50% face value only, 1342 // as our bandwidth estimation is a lagging indicator. Being 1343 // conservative on this, we prefer switching to lower bandwidth 1344 // unless we're really confident finishing up the last segment 1345 // of higher bandwidth will be fast. 1346 CHECK(mLastBandwidthBps >= 0); 1347 abortThreshold = 1348 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1349 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1350 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1351 - (float)mLastBandwidthBps * 0.5f); 1352 if (abortThreshold < 0.0f) { 1353 abortThreshold = -1.0f; // do not abort 1354 } 1355 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1356 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1357 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1358 mLastBandwidthBps, 1359 abortThreshold); 1360 } 1361 return abortThreshold; 1362} 1363 1364void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1365 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1366} 1367 1368size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1369 if (mBandwidthItems.size() < 2) { 1370 // shouldn't be here if we only have 1 bandwidth, check 1371 // logic to get rid of redundant bandwidth polling 1372 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1373 return 0; 1374 } 1375 1376#if 1 1377 char value[PROPERTY_VALUE_MAX]; 1378 ssize_t index = -1; 1379 if (property_get("media.httplive.bw-index", value, NULL)) { 1380 char *end; 1381 index = strtol(value, &end, 10); 1382 CHECK(end > value && *end == '\0'); 1383 1384 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1385 index = mBandwidthItems.size() - 1; 1386 } 1387 } 1388 1389 if (index < 0) { 1390 char value[PROPERTY_VALUE_MAX]; 1391 if (property_get("media.httplive.max-bw", value, NULL)) { 1392 char *end; 1393 long maxBw = strtoul(value, &end, 10); 1394 if (end > value && *end == '\0') { 1395 if (maxBw > 0 && bandwidthBps > maxBw) { 1396 ALOGV("bandwidth capped to %ld bps", maxBw); 1397 bandwidthBps = maxBw; 1398 } 1399 } 1400 } 1401 1402 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1403 1404 index = mBandwidthItems.size() - 1; 1405 while (index > 0) { 1406 // be conservative (70%) to avoid overestimating and immediately 1407 // switching down again. 1408 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1409 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1410 break; 1411 } 1412 --index; 1413 } 1414 } 1415#elif 0 1416 // Change bandwidth at random() 1417 size_t index = uniformRand() * mBandwidthItems.size(); 1418#elif 0 1419 // There's a 50% chance to stay on the current bandwidth and 1420 // a 50% chance to switch to the next higher bandwidth (wrapping around 1421 // to lowest) 1422 const size_t kMinIndex = 0; 1423 1424 static ssize_t mCurBandwidthIndex = -1; 1425 1426 size_t index; 1427 if (mCurBandwidthIndex < 0) { 1428 index = kMinIndex; 1429 } else if (uniformRand() < 0.5) { 1430 index = (size_t)mCurBandwidthIndex; 1431 } else { 1432 index = mCurBandwidthIndex + 1; 1433 if (index == mBandwidthItems.size()) { 1434 index = kMinIndex; 1435 } 1436 } 1437 mCurBandwidthIndex = index; 1438#elif 0 1439 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1440 1441 size_t index = mBandwidthItems.size() - 1; 1442 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1443 --index; 1444 } 1445#elif 1 1446 char value[PROPERTY_VALUE_MAX]; 1447 size_t index; 1448 if (property_get("media.httplive.bw-index", value, NULL)) { 1449 char *end; 1450 index = strtoul(value, &end, 10); 1451 CHECK(end > value && *end == '\0'); 1452 1453 if (index >= mBandwidthItems.size()) { 1454 index = mBandwidthItems.size() - 1; 1455 } 1456 } else { 1457 index = 0; 1458 } 1459#else 1460 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1461#endif 1462 1463 CHECK_GE(index, 0); 1464 1465 return index; 1466} 1467 1468HLSTime LiveSession::latestMediaSegmentStartTime() const { 1469 HLSTime audioTime(mPacketSources.valueFor( 1470 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1471 1472 HLSTime videoTime(mPacketSources.valueFor( 1473 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1474 1475 return audioTime < videoTime ? videoTime : audioTime; 1476} 1477 1478void LiveSession::onSeek(const sp<AMessage> &msg) { 1479 int64_t timeUs; 1480 CHECK(msg->findInt64("timeUs", &timeUs)); 1481 changeConfiguration(timeUs); 1482} 1483 1484status_t LiveSession::getDuration(int64_t *durationUs) const { 1485 int64_t maxDurationUs = -1ll; 1486 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1487 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1488 1489 if (fetcherDurationUs > maxDurationUs) { 1490 maxDurationUs = fetcherDurationUs; 1491 } 1492 } 1493 1494 *durationUs = maxDurationUs; 1495 1496 return OK; 1497} 1498 1499bool LiveSession::isSeekable() const { 1500 int64_t durationUs; 1501 return getDuration(&durationUs) == OK && durationUs >= 0; 1502} 1503 1504bool LiveSession::hasDynamicDuration() const { 1505 return false; 1506} 1507 1508size_t LiveSession::getTrackCount() const { 1509 if (mPlaylist == NULL) { 1510 return 0; 1511 } else { 1512 return mPlaylist->getTrackCount() + (mHasMetadata ? 1 : 0); 1513 } 1514} 1515 1516sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1517 if (mPlaylist == NULL) { 1518 return NULL; 1519 } else { 1520 if (trackIndex == mPlaylist->getTrackCount() && mHasMetadata) { 1521 sp<AMessage> format = new AMessage(); 1522 format->setInt32("type", MEDIA_TRACK_TYPE_METADATA); 1523 format->setString("language", "und"); 1524 format->setString("mime", MEDIA_MIMETYPE_DATA_METADATA); 1525 return format; 1526 } 1527 return mPlaylist->getTrackInfo(trackIndex); 1528 } 1529} 1530 1531status_t LiveSession::selectTrack(size_t index, bool select) { 1532 if (mPlaylist == NULL) { 1533 return INVALID_OPERATION; 1534 } 1535 1536 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1537 index, select, mSubtitleGeneration); 1538 1539 ++mSubtitleGeneration; 1540 status_t err = mPlaylist->selectTrack(index, select); 1541 if (err == OK) { 1542 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1543 msg->setInt32("pickTrack", select); 1544 msg->post(); 1545 } 1546 return err; 1547} 1548 1549ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1550 if (mPlaylist == NULL) { 1551 return -1; 1552 } else { 1553 return mPlaylist->getSelectedTrack(type); 1554 } 1555} 1556 1557void LiveSession::changeConfiguration( 1558 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1559 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1560 (long long)timeUs, bandwidthIndex, pickTrack); 1561 1562 cancelBandwidthSwitch(); 1563 1564 CHECK(!mReconfigurationInProgress); 1565 mReconfigurationInProgress = true; 1566 if (bandwidthIndex >= 0) { 1567 mOrigBandwidthIndex = mCurBandwidthIndex; 1568 mCurBandwidthIndex = bandwidthIndex; 1569 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1570 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1571 mOrigBandwidthIndex, mCurBandwidthIndex); 1572 } 1573 } 1574 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1575 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1576 1577 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1578 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1579 1580 AString URIs[kMaxStreams]; 1581 for (size_t i = 0; i < kMaxStreams; ++i) { 1582 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1583 streamMask |= indexToType(i); 1584 } 1585 } 1586 1587 // Step 1, stop and discard fetchers that are no longer needed. 1588 // Pause those that we'll reuse. 1589 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1590 // skip fetchers that are marked mToBeRemoved, 1591 // these are done and can't be reused 1592 if (mFetcherInfos[i].mToBeRemoved) { 1593 continue; 1594 } 1595 1596 const AString &uri = mFetcherInfos.keyAt(i); 1597 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1598 1599 bool discardFetcher = true, delayRemoval = false; 1600 for (size_t j = 0; j < kMaxStreams; ++j) { 1601 StreamType type = indexToType(j); 1602 if ((streamMask & type) && uri == URIs[j]) { 1603 resumeMask |= type; 1604 streamMask &= ~type; 1605 discardFetcher = false; 1606 } 1607 } 1608 // Delay fetcher removal if not picking tracks, AND old fetcher 1609 // has stream mask that overlaps new variant. (Okay to discard 1610 // old fetcher now, if completely no overlap.) 1611 if (discardFetcher && timeUs < 0ll && !pickTrack 1612 && (fetcher->getStreamTypeMask() & streamMask)) { 1613 discardFetcher = false; 1614 delayRemoval = true; 1615 } 1616 1617 if (discardFetcher) { 1618 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1619 fetcher->stopAsync(); 1620 } else { 1621 float threshold = -1.0f; // always finish fetching by default 1622 if (timeUs >= 0ll) { 1623 // seeking, no need to finish fetching 1624 threshold = 0.0f; 1625 } else if (delayRemoval) { 1626 // adapting, abort if remaining of current segment is over threshold 1627 threshold = getAbortThreshold( 1628 mOrigBandwidthIndex, mCurBandwidthIndex); 1629 } 1630 1631 ALOGV("pausing fetcher-%d, threshold=%.2f", 1632 fetcher->getFetcherID(), threshold); 1633 fetcher->pauseAsync(threshold); 1634 } 1635 } 1636 1637 sp<AMessage> msg; 1638 if (timeUs < 0ll) { 1639 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1640 msg = new AMessage(kWhatChangeConfiguration3, this); 1641 } else { 1642 msg = new AMessage(kWhatChangeConfiguration2, this); 1643 } 1644 msg->setInt32("streamMask", streamMask); 1645 msg->setInt32("resumeMask", resumeMask); 1646 msg->setInt32("pickTrack", pickTrack); 1647 msg->setInt64("timeUs", timeUs); 1648 for (size_t i = 0; i < kMaxStreams; ++i) { 1649 if ((streamMask | resumeMask) & indexToType(i)) { 1650 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1651 } 1652 } 1653 1654 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1655 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1656 // fetchers have completed their asynchronous operation, we'll post 1657 // mContinuation, which then is handled below in onChangeConfiguration2. 1658 mContinuationCounter = mFetcherInfos.size(); 1659 mContinuation = msg; 1660 1661 if (mContinuationCounter == 0) { 1662 msg->post(); 1663 } 1664} 1665 1666void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1667 ALOGV("onChangeConfiguration"); 1668 1669 if (!mReconfigurationInProgress) { 1670 int32_t pickTrack = 0; 1671 msg->findInt32("pickTrack", &pickTrack); 1672 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1673 } else { 1674 msg->post(1000000ll); // retry in 1 sec 1675 } 1676} 1677 1678void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1679 ALOGV("onChangeConfiguration2"); 1680 1681 mContinuation.clear(); 1682 1683 // All fetchers are either suspended or have been removed now. 1684 1685 // If we're seeking, clear all packet sources before we report 1686 // seek complete, to prevent decoder from pulling stale data. 1687 int64_t timeUs; 1688 CHECK(msg->findInt64("timeUs", &timeUs)); 1689 1690 if (timeUs >= 0) { 1691 mLastSeekTimeUs = timeUs; 1692 mLastDequeuedTimeUs = timeUs; 1693 1694 for (size_t i = 0; i < mPacketSources.size(); i++) { 1695 mPacketSources.editValueAt(i)->clear(); 1696 } 1697 1698 for (size_t i = 0; i < kMaxStreams; ++i) { 1699 mStreams[i].mCurDiscontinuitySeq = 0; 1700 } 1701 1702 mDiscontinuityOffsetTimesUs.clear(); 1703 mDiscontinuityAbsStartTimesUs.clear(); 1704 1705 if (mSeekReplyID != NULL) { 1706 CHECK(mSeekReply != NULL); 1707 mSeekReply->setInt32("err", OK); 1708 mSeekReply->postReply(mSeekReplyID); 1709 mSeekReplyID.clear(); 1710 mSeekReply.clear(); 1711 } 1712 1713 // restart buffer polling after seek becauese previous 1714 // buffering position is no longer valid. 1715 restartPollBuffering(); 1716 } 1717 1718 uint32_t streamMask, resumeMask; 1719 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1720 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1721 1722 streamMask |= resumeMask; 1723 1724 AString URIs[kMaxStreams]; 1725 for (size_t i = 0; i < kMaxStreams; ++i) { 1726 if (streamMask & indexToType(i)) { 1727 const AString &uriKey = mStreams[i].uriKey(); 1728 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1729 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1730 } 1731 } 1732 1733 uint32_t changedMask = 0; 1734 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1735 // stream URI could change even if onChangeConfiguration2 is only 1736 // used for seek. Seek could happen during a bw switch, in this 1737 // case bw switch will be cancelled, but the seekTo position will 1738 // fetch from the new URI. 1739 if ((mStreamMask & streamMask & indexToType(i)) 1740 && !mStreams[i].mUri.empty() 1741 && !(URIs[i] == mStreams[i].mUri)) { 1742 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1743 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1744 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1745 if (source->getLatestDequeuedMeta() != NULL) { 1746 source->queueDiscontinuity( 1747 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1748 } 1749 } 1750 // Determine which decoders to shutdown on the player side, 1751 // a decoder has to be shutdown if its streamtype was active 1752 // before but now longer isn't. 1753 if ((mStreamMask & ~streamMask & indexToType(i))) { 1754 changedMask |= indexToType(i); 1755 } 1756 } 1757 1758 if (changedMask == 0) { 1759 // If nothing changed as far as the audio/video decoders 1760 // are concerned we can proceed. 1761 onChangeConfiguration3(msg); 1762 return; 1763 } 1764 1765 // Something changed, inform the player which will shutdown the 1766 // corresponding decoders and will post the reply once that's done. 1767 // Handling the reply will continue executing below in 1768 // onChangeConfiguration3. 1769 sp<AMessage> notify = mNotify->dup(); 1770 notify->setInt32("what", kWhatStreamsChanged); 1771 notify->setInt32("changedMask", changedMask); 1772 1773 msg->setWhat(kWhatChangeConfiguration3); 1774 msg->setTarget(this); 1775 1776 notify->setMessage("reply", msg); 1777 notify->post(); 1778} 1779 1780void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1781 mContinuation.clear(); 1782 // All remaining fetchers are still suspended, the player has shutdown 1783 // any decoders that needed it. 1784 1785 uint32_t streamMask, resumeMask; 1786 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1787 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1788 1789 mNewStreamMask = streamMask | resumeMask; 1790 1791 int64_t timeUs; 1792 int32_t pickTrack; 1793 bool switching = false; 1794 CHECK(msg->findInt64("timeUs", &timeUs)); 1795 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1796 1797 if (timeUs < 0ll) { 1798 if (!pickTrack) { 1799 // mSwapMask contains streams that are in both old and new variant, 1800 // (in mNewStreamMask & mStreamMask) but with different URIs 1801 // (not in resumeMask). 1802 // For example, old variant has video and audio in two separate 1803 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1804 // should be 0 as there is nothing to swap. We only need to stop video, 1805 // and resume audio. 1806 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1807 switching = (mSwapMask != 0); 1808 } 1809 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1810 } else { 1811 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1812 } 1813 1814 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1815 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1816 (long long)timeUs, switching, pickTrack, 1817 mStreamMask, mNewStreamMask, mSwapMask); 1818 1819 for (size_t i = 0; i < kMaxStreams; ++i) { 1820 if (streamMask & indexToType(i)) { 1821 if (switching) { 1822 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1823 } else { 1824 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1825 } 1826 } 1827 } 1828 1829 // Of all existing fetchers: 1830 // * Resume fetchers that are still needed and assign them original packet sources. 1831 // * Mark otherwise unneeded fetchers for removal. 1832 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1833 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1834 const AString &uri = mFetcherInfos.keyAt(i); 1835 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1836 ALOGV("marking fetcher-%d to be removed", 1837 mFetcherInfos[i].mFetcher->getFetcherID()); 1838 1839 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1840 } 1841 } 1842 1843 // streamMask now only contains the types that need a new fetcher created. 1844 if (streamMask != 0) { 1845 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1846 } 1847 1848 // Find out when the original fetchers have buffered up to and start the new fetchers 1849 // at a later timestamp. 1850 for (size_t i = 0; i < kMaxStreams; i++) { 1851 if (!(indexToType(i) & streamMask)) { 1852 continue; 1853 } 1854 1855 AString uri; 1856 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1857 1858 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1859 CHECK(fetcher != NULL); 1860 1861 HLSTime startTime; 1862 SeekMode seekMode = kSeekModeExactPosition; 1863 sp<AnotherPacketSource> sources[kNumSources]; 1864 1865 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1866 startTime = latestMediaSegmentStartTime(); 1867 } 1868 1869 // TRICKY: looping from i as earlier streams are already removed from streamMask 1870 for (size_t j = i; j < kMaxStreams; ++j) { 1871 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1872 if ((streamMask & indexToType(j)) && uri == streamUri) { 1873 sources[j] = mPacketSources.valueFor(indexToType(j)); 1874 1875 if (timeUs >= 0) { 1876 startTime.mTimeUs = timeUs; 1877 } else { 1878 int32_t type; 1879 sp<AMessage> meta; 1880 if (!switching) { 1881 // selecting, or adapting but no swap required 1882 meta = sources[j]->getLatestDequeuedMeta(); 1883 } else { 1884 // adapting and swap required 1885 meta = sources[j]->getLatestEnqueuedMeta(); 1886 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1887 // switching up 1888 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1889 } 1890 } 1891 1892 if ((j == kAudioIndex || j == kVideoIndex) 1893 && meta != NULL && !meta->findInt32("discontinuity", &type)) { 1894 HLSTime tmpTime(meta); 1895 if (startTime < tmpTime) { 1896 startTime = tmpTime; 1897 } 1898 } 1899 1900 if (!switching) { 1901 // selecting, or adapting but no swap required 1902 sources[j]->clear(); 1903 if (j == kSubtitleIndex) { 1904 break; 1905 } 1906 1907 ALOGV("stream[%zu]: queue format change", j); 1908 sources[j]->queueDiscontinuity( 1909 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1910 } else { 1911 // switching, queue discontinuities after resume 1912 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1913 sources[j]->clear(); 1914 // the new fetcher might be providing streams that used to be 1915 // provided by two different fetchers, if one of the fetcher 1916 // paused in the middle while the other somehow paused in next 1917 // seg, we have to start from next seg. 1918 if (seekMode < mStreams[j].mSeekMode) { 1919 seekMode = mStreams[j].mSeekMode; 1920 } 1921 } 1922 } 1923 1924 streamMask &= ~indexToType(j); 1925 } 1926 } 1927 1928 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1929 "segmentStartTimeUs %lld seekMode %d", 1930 fetcher->getFetcherID(), 1931 (long long)startTime.mTimeUs, 1932 (long long)mLastSeekTimeUs, 1933 (long long)startTime.getSegmentTimeUs(true /* midpoint */), 1934 seekMode); 1935 1936 // Set the target segment start time to the middle point of the 1937 // segment where the last sample was. 1938 // This gives a better guess if segments of the two variants are not 1939 // perfectly aligned. (If the corresponding segment in new variant 1940 // starts slightly later than that in the old variant, we still want 1941 // to pick that segment, not the one before) 1942 fetcher->startAsync( 1943 sources[kAudioIndex], 1944 sources[kVideoIndex], 1945 sources[kSubtitleIndex], 1946 getMetadataSource(sources, mNewStreamMask, switching), 1947 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1948 startTime.getSegmentTimeUs(true /* midpoint */), 1949 startTime.mSeq, 1950 seekMode); 1951 } 1952 1953 // All fetchers have now been started, the configuration change 1954 // has completed. 1955 1956 mReconfigurationInProgress = false; 1957 if (switching) { 1958 mSwitchInProgress = true; 1959 } else { 1960 mStreamMask = mNewStreamMask; 1961 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1962 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1963 mOrigBandwidthIndex, mCurBandwidthIndex); 1964 mOrigBandwidthIndex = mCurBandwidthIndex; 1965 } 1966 } 1967 1968 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1969 mSwitchInProgress, mStreamMask); 1970 1971 if (mDisconnectReplyID != NULL) { 1972 finishDisconnect(); 1973 } 1974} 1975 1976void LiveSession::swapPacketSource(StreamType stream) { 1977 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1978 1979 // transfer packets from source2 to source 1980 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1981 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1982 1983 // queue discontinuity in mPacketSource 1984 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1985 1986 // queue packets in mPacketSource2 to mPacketSource 1987 status_t finalResult = OK; 1988 sp<ABuffer> accessUnit; 1989 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1990 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1991 aps->queueAccessUnit(accessUnit); 1992 } 1993 aps2->clear(); 1994} 1995 1996void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1997 if (!mSwitchInProgress) { 1998 return; 1999 } 2000 2001 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 2002 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 2003 return; 2004 } 2005 2006 // Swap packet source of streams provided by old variant 2007 for (size_t idx = 0; idx < kMaxStreams; idx++) { 2008 StreamType stream = indexToType(idx); 2009 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 2010 swapPacketSource(stream); 2011 2012 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 2013 ALOGW("swapping stream type %d %s to empty stream", 2014 stream, mStreams[idx].mUri.c_str()); 2015 } 2016 mStreams[idx].mUri = mStreams[idx].mNewUri; 2017 mStreams[idx].mNewUri.clear(); 2018 2019 mSwapMask &= ~stream; 2020 } 2021 } 2022 2023 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 2024 2025 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 2026 if (mSwapMask != 0) { 2027 return; 2028 } 2029 2030 // Check if new variant contains extra streams. 2031 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 2032 while (extraStreams) { 2033 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 2034 extraStreams &= ~stream; 2035 2036 swapPacketSource(stream); 2037 2038 ssize_t idx = typeToIndex(stream); 2039 CHECK(idx >= 0); 2040 if (mStreams[idx].mNewUri.empty()) { 2041 ALOGW("swapping extra stream type %d %s to empty stream", 2042 stream, mStreams[idx].mUri.c_str()); 2043 } 2044 mStreams[idx].mUri = mStreams[idx].mNewUri; 2045 mStreams[idx].mNewUri.clear(); 2046 } 2047 2048 // Restart new fetcher (it was paused after the first 47k block) 2049 // and let it fetch into mPacketSources (not mPacketSources2) 2050 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2051 FetcherInfo &info = mFetcherInfos.editValueAt(i); 2052 if (info.mToBeResumed) { 2053 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 2054 info.mToBeResumed = false; 2055 } 2056 } 2057 2058 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 2059 mOrigBandwidthIndex, mCurBandwidthIndex); 2060 2061 mStreamMask = mNewStreamMask; 2062 mSwitchInProgress = false; 2063 mOrigBandwidthIndex = mCurBandwidthIndex; 2064 2065 restartPollBuffering(); 2066} 2067 2068void LiveSession::schedulePollBuffering() { 2069 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 2070 msg->setInt32("generation", mPollBufferingGeneration); 2071 msg->post(1000000ll); 2072} 2073 2074void LiveSession::cancelPollBuffering() { 2075 ++mPollBufferingGeneration; 2076 mPrevBufferPercentage = -1; 2077} 2078 2079void LiveSession::restartPollBuffering() { 2080 cancelPollBuffering(); 2081 onPollBuffering(); 2082} 2083 2084void LiveSession::onPollBuffering() { 2085 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 2086 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 2087 mSwitchInProgress, mReconfigurationInProgress, 2088 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 2089 2090 bool underflow, ready, down, up; 2091 if (checkBuffering(underflow, ready, down, up)) { 2092 if (mInPreparationPhase) { 2093 // Allow down switch even if we're still preparing. 2094 // 2095 // Some streams have a high bandwidth index as default, 2096 // when bandwidth is low, it takes a long time to buffer 2097 // to ready mark, then it immediately pauses after start 2098 // as we have to do a down switch. It's better experience 2099 // to restart from a lower index, if we detect low bw. 2100 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 2101 postPrepared(OK); 2102 } 2103 } 2104 2105 if (!mInPreparationPhase) { 2106 if (ready) { 2107 stopBufferingIfNecessary(); 2108 } else if (underflow) { 2109 startBufferingIfNecessary(); 2110 } 2111 switchBandwidthIfNeeded(up, down); 2112 } 2113 } 2114 2115 schedulePollBuffering(); 2116} 2117 2118void LiveSession::cancelBandwidthSwitch(bool resume) { 2119 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2120 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2121 if (!mSwitchInProgress) { 2122 return; 2123 } 2124 2125 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2126 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2127 if (info.mToBeRemoved) { 2128 info.mToBeRemoved = false; 2129 if (resume) { 2130 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2131 } 2132 } 2133 } 2134 2135 for (size_t i = 0; i < kMaxStreams; ++i) { 2136 AString newUri = mStreams[i].mNewUri; 2137 if (!newUri.empty()) { 2138 // clear all mNewUri matching this newUri 2139 for (size_t j = i; j < kMaxStreams; ++j) { 2140 if (mStreams[j].mNewUri == newUri) { 2141 mStreams[j].mNewUri.clear(); 2142 } 2143 } 2144 ALOGV("stopping newUri = %s", newUri.c_str()); 2145 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2146 if (index < 0) { 2147 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2148 continue; 2149 } 2150 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2151 info.mToBeRemoved = true; 2152 info.mFetcher->stopAsync(); 2153 } 2154 } 2155 2156 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2157 mOrigBandwidthIndex, mCurBandwidthIndex); 2158 2159 mSwitchGeneration++; 2160 mSwitchInProgress = false; 2161 mCurBandwidthIndex = mOrigBandwidthIndex; 2162 mSwapMask = 0; 2163} 2164 2165bool LiveSession::checkBuffering( 2166 bool &underflow, bool &ready, bool &down, bool &up) { 2167 underflow = ready = down = up = false; 2168 2169 if (mReconfigurationInProgress) { 2170 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2171 return false; 2172 } 2173 2174 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2175 activeCount = underflowCount = readyCount = downCount = upCount =0; 2176 int32_t minBufferPercent = -1; 2177 int64_t durationUs; 2178 if (getDuration(&durationUs) != OK) { 2179 durationUs = -1; 2180 } 2181 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2182 // we don't check subtitles for buffering level 2183 if (!(mStreamMask & mPacketSources.keyAt(i) 2184 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2185 continue; 2186 } 2187 // ignore streams that never had any packet queued. 2188 // (it's possible that the variant only has audio or video) 2189 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2190 if (meta == NULL) { 2191 continue; 2192 } 2193 2194 int64_t bufferedDurationUs = 2195 mPacketSources[i]->getEstimatedDurationUs(); 2196 ALOGV("[%s] buffered %lld us", 2197 getNameForStream(mPacketSources.keyAt(i)), 2198 (long long)bufferedDurationUs); 2199 if (durationUs >= 0) { 2200 int32_t percent; 2201 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2202 percent = 100; 2203 } else { 2204 percent = (int32_t)(100.0 * 2205 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2206 } 2207 if (minBufferPercent < 0 || percent < minBufferPercent) { 2208 minBufferPercent = percent; 2209 } 2210 } 2211 2212 ++activeCount; 2213 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2214 if (bufferedDurationUs > readyMark 2215 || mPacketSources[i]->isFinished(0)) { 2216 ++readyCount; 2217 } 2218 if (!mPacketSources[i]->isFinished(0)) { 2219 if (bufferedDurationUs < kUnderflowMarkUs) { 2220 ++underflowCount; 2221 } 2222 if (bufferedDurationUs > mUpSwitchMark) { 2223 ++upCount; 2224 } 2225 if (bufferedDurationUs < mDownSwitchMark) { 2226 ++downCount; 2227 } 2228 } 2229 } 2230 2231 if (minBufferPercent >= 0) { 2232 notifyBufferingUpdate(minBufferPercent); 2233 } 2234 2235 if (activeCount > 0) { 2236 up = (upCount == activeCount); 2237 down = (downCount > 0); 2238 ready = (readyCount == activeCount); 2239 underflow = (underflowCount > 0); 2240 return true; 2241 } 2242 2243 return false; 2244} 2245 2246void LiveSession::startBufferingIfNecessary() { 2247 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2248 mInPreparationPhase, mBuffering); 2249 if (!mBuffering) { 2250 mBuffering = true; 2251 2252 sp<AMessage> notify = mNotify->dup(); 2253 notify->setInt32("what", kWhatBufferingStart); 2254 notify->post(); 2255 } 2256} 2257 2258void LiveSession::stopBufferingIfNecessary() { 2259 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2260 mInPreparationPhase, mBuffering); 2261 2262 if (mBuffering) { 2263 mBuffering = false; 2264 2265 sp<AMessage> notify = mNotify->dup(); 2266 notify->setInt32("what", kWhatBufferingEnd); 2267 notify->post(); 2268 } 2269} 2270 2271void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2272 if (percentage < mPrevBufferPercentage) { 2273 percentage = mPrevBufferPercentage; 2274 } else if (percentage > 100) { 2275 percentage = 100; 2276 } 2277 2278 mPrevBufferPercentage = percentage; 2279 2280 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2281 2282 sp<AMessage> notify = mNotify->dup(); 2283 notify->setInt32("what", kWhatBufferingUpdate); 2284 notify->setInt32("percentage", percentage); 2285 notify->post(); 2286} 2287 2288/* 2289 * returns true if a bandwidth switch is actually needed (and started), 2290 * returns false otherwise 2291 */ 2292bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2293 // no need to check bandwidth if we only have 1 bandwidth settings 2294 if (mSwitchInProgress || mBandwidthItems.size() < 2) { 2295 return false; 2296 } 2297 2298 int32_t bandwidthBps; 2299 if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) { 2300 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 2301 mLastBandwidthBps = bandwidthBps; 2302 } else { 2303 ALOGV("no bandwidth estimate."); 2304 return false; 2305 } 2306 2307 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2308 // canSwithDown and canSwitchUp can't both be true. 2309 // we only want to switch up when measured bw is 120% higher than current variant, 2310 // and we only want to switch down when measured bw is below current variant. 2311 bool canSwithDown = bufferLow 2312 && (bandwidthBps < (int32_t)curBandwidth); 2313 bool canSwitchUp = bufferHigh 2314 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2315 2316 if (canSwithDown || canSwitchUp) { 2317 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2318 2319 // it's possible that we're checking for canSwitchUp case, but the returned 2320 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2321 // of measured bw. In that case we don't want to do anything, since we have 2322 // both enough buffer and enough bw. 2323 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2324 || (canSwithDown && bandwidthIndex < mCurBandwidthIndex)) { 2325 // if not yet prepared, just restart again with new bw index. 2326 // this is faster and playback experience is cleaner. 2327 changeConfiguration( 2328 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2329 return true; 2330 } 2331 } 2332 return false; 2333} 2334 2335void LiveSession::postError(status_t err) { 2336 // if we reached EOS, notify buffering of 100% 2337 if (err == ERROR_END_OF_STREAM) { 2338 notifyBufferingUpdate(100); 2339 } 2340 // we'll stop buffer polling now, before that notify 2341 // stop buffering to stop the spinning icon 2342 stopBufferingIfNecessary(); 2343 cancelPollBuffering(); 2344 2345 sp<AMessage> notify = mNotify->dup(); 2346 notify->setInt32("what", kWhatError); 2347 notify->setInt32("err", err); 2348 notify->post(); 2349} 2350 2351void LiveSession::postPrepared(status_t err) { 2352 CHECK(mInPreparationPhase); 2353 2354 sp<AMessage> notify = mNotify->dup(); 2355 if (err == OK || err == ERROR_END_OF_STREAM) { 2356 notify->setInt32("what", kWhatPrepared); 2357 } else { 2358 cancelPollBuffering(); 2359 2360 notify->setInt32("what", kWhatPreparationFailed); 2361 notify->setInt32("err", err); 2362 } 2363 2364 notify->post(); 2365 2366 mInPreparationPhase = false; 2367} 2368 2369 2370} // namespace android 2371 2372