LiveSession.cpp revision a0d0ba51ad60a68117a0ee78e37ab78715b8a069
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/foundation/AUtils.h> 37#include <media/stagefright/DataSource.h> 38#include <media/stagefright/FileSource.h> 39#include <media/stagefright/MediaErrors.h> 40#include <media/stagefright/MediaHTTP.h> 41#include <media/stagefright/MetaData.h> 42#include <media/stagefright/Utils.h> 43 44#include <utils/Mutex.h> 45 46#include <ctype.h> 47#include <inttypes.h> 48#include <openssl/aes.h> 49#include <openssl/md5.h> 50 51namespace android { 52 53// static 54// Bandwidth Switch Mark Defaults 55const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 56const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 57const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 58const int64_t LiveSession::kResumeThresholdUs = 100000ll; 59 60// Buffer Prepare/Ready/Underflow Marks 61const int64_t LiveSession::kReadyMarkUs = 5000000ll; 62const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 63const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 64 65struct LiveSession::BandwidthEstimator : public RefBase { 66 BandwidthEstimator(); 67 68 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 69 bool estimateBandwidth(int32_t *bandwidth); 70 71private: 72 // Bandwidth estimation parameters 73 static const int32_t kMaxBandwidthHistoryItems = 20; 74 static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec 75 76 struct BandwidthEntry { 77 int64_t mDelayUs; 78 size_t mNumBytes; 79 }; 80 81 Mutex mLock; 82 List<BandwidthEntry> mBandwidthHistory; 83 int64_t mTotalTransferTimeUs; 84 size_t mTotalTransferBytes; 85 86 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 87}; 88 89LiveSession::BandwidthEstimator::BandwidthEstimator() : 90 mTotalTransferTimeUs(0), 91 mTotalTransferBytes(0) { 92} 93 94void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 95 size_t numBytes, int64_t delayUs) { 96 AutoMutex autoLock(mLock); 97 98 BandwidthEntry entry; 99 entry.mDelayUs = delayUs; 100 entry.mNumBytes = numBytes; 101 mTotalTransferTimeUs += delayUs; 102 mTotalTransferBytes += numBytes; 103 mBandwidthHistory.push_back(entry); 104 105 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 106 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 107 while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) { 108 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 109 if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) { 110 break; 111 } 112 mTotalTransferTimeUs -= it->mDelayUs; 113 mTotalTransferBytes -= it->mNumBytes; 114 mBandwidthHistory.erase(mBandwidthHistory.begin()); 115 } 116} 117 118bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { 119 AutoMutex autoLock(mLock); 120 121 if (mBandwidthHistory.size() < 2) { 122 return false; 123 } 124 125 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 126 return true; 127} 128 129//static 130const char *LiveSession::getKeyForStream(StreamType type) { 131 switch (type) { 132 case STREAMTYPE_VIDEO: 133 return "timeUsVideo"; 134 case STREAMTYPE_AUDIO: 135 return "timeUsAudio"; 136 case STREAMTYPE_SUBTITLES: 137 return "timeUsSubtitle"; 138 default: 139 TRESPASS(); 140 } 141 return NULL; 142} 143 144//static 145const char *LiveSession::getNameForStream(StreamType type) { 146 switch (type) { 147 case STREAMTYPE_VIDEO: 148 return "video"; 149 case STREAMTYPE_AUDIO: 150 return "audio"; 151 case STREAMTYPE_SUBTITLES: 152 return "subs"; 153 default: 154 break; 155 } 156 return "unknown"; 157} 158 159LiveSession::LiveSession( 160 const sp<AMessage> ¬ify, uint32_t flags, 161 const sp<IMediaHTTPService> &httpService) 162 : mNotify(notify), 163 mFlags(flags), 164 mHTTPService(httpService), 165 mBuffering(false), 166 mInPreparationPhase(true), 167 mPollBufferingGeneration(0), 168 mPrevBufferPercentage(-1), 169 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 170 mCurBandwidthIndex(-1), 171 mOrigBandwidthIndex(-1), 172 mLastBandwidthBps(-1ll), 173 mBandwidthEstimator(new BandwidthEstimator()), 174 mMaxWidth(720), 175 mMaxHeight(480), 176 mStreamMask(0), 177 mNewStreamMask(0), 178 mSwapMask(0), 179 mSwitchGeneration(0), 180 mSubtitleGeneration(0), 181 mLastDequeuedTimeUs(0ll), 182 mRealTimeBaseUs(0ll), 183 mReconfigurationInProgress(false), 184 mSwitchInProgress(false), 185 mUpSwitchMark(kUpSwitchMarkUs), 186 mDownSwitchMark(kDownSwitchMarkUs), 187 mUpSwitchMargin(kUpSwitchMarginUs), 188 mFirstTimeUsValid(false), 189 mFirstTimeUs(0), 190 mLastSeekTimeUs(0) { 191 mStreams[kAudioIndex] = StreamItem("audio"); 192 mStreams[kVideoIndex] = StreamItem("video"); 193 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 194 195 for (size_t i = 0; i < kMaxStreams; ++i) { 196 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 197 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 198 } 199} 200 201LiveSession::~LiveSession() { 202 if (mFetcherLooper != NULL) { 203 mFetcherLooper->stop(); 204 } 205} 206 207status_t LiveSession::dequeueAccessUnit( 208 StreamType stream, sp<ABuffer> *accessUnit) { 209 status_t finalResult = OK; 210 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 211 212 ssize_t streamIdx = typeToIndex(stream); 213 if (streamIdx < 0) { 214 return INVALID_VALUE; 215 } 216 const char *streamStr = getNameForStream(stream); 217 // Do not let client pull data if we don't have data packets yet. 218 // We might only have a format discontinuity queued without data. 219 // When NuPlayerDecoder dequeues the format discontinuity, it will 220 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 221 // thinks it can do seamless change, so will not shutdown decoder. 222 // When the actual format arrives, it can't handle it and get stuck. 223 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 224 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 225 streamStr, finalResult); 226 227 if (finalResult == OK) { 228 return -EAGAIN; 229 } else { 230 return finalResult; 231 } 232 } 233 234 // Let the client dequeue as long as we have buffers available 235 // Do not make pause/resume decisions here. 236 237 status_t err = packetSource->dequeueAccessUnit(accessUnit); 238 239 StreamItem& strm = mStreams[streamIdx]; 240 if (err == INFO_DISCONTINUITY) { 241 // adaptive streaming, discontinuities in the playlist 242 int32_t type; 243 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 244 245 sp<AMessage> extra; 246 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 247 extra.clear(); 248 } 249 250 ALOGI("[%s] read discontinuity of type %d, extra = %s", 251 streamStr, 252 type, 253 extra == NULL ? "NULL" : extra->debugString().c_str()); 254 } else if (err == OK) { 255 256 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 257 int64_t timeUs, originalTimeUs; 258 int32_t discontinuitySeq = 0; 259 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 260 originalTimeUs = timeUs; 261 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 262 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 263 int64_t offsetTimeUs; 264 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 265 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 266 } else { 267 offsetTimeUs = 0; 268 } 269 270 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 271 int64_t firstTimeUs; 272 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 273 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 274 offsetTimeUs += strm.mLastSampleDurationUs; 275 } else { 276 offsetTimeUs += strm.mLastSampleDurationUs; 277 } 278 279 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 280 strm.mCurDiscontinuitySeq = discontinuitySeq; 281 } 282 283 int32_t discard = 0; 284 int64_t firstTimeUs; 285 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 286 int64_t durUs; // approximate sample duration 287 if (timeUs > strm.mLastDequeuedTimeUs) { 288 durUs = timeUs - strm.mLastDequeuedTimeUs; 289 } else { 290 durUs = strm.mLastDequeuedTimeUs - timeUs; 291 } 292 strm.mLastSampleDurationUs = durUs; 293 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 294 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 295 firstTimeUs = timeUs; 296 } else { 297 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 298 firstTimeUs = timeUs; 299 } 300 301 strm.mLastDequeuedTimeUs = timeUs; 302 if (timeUs >= firstTimeUs) { 303 timeUs -= firstTimeUs; 304 } else { 305 timeUs = 0; 306 } 307 timeUs += mLastSeekTimeUs; 308 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 309 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 310 } 311 312 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 313 streamStr, (long long)timeUs, (long long)originalTimeUs); 314 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 315 mLastDequeuedTimeUs = timeUs; 316 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 317 } else if (stream == STREAMTYPE_SUBTITLES) { 318 int32_t subtitleGeneration; 319 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 320 && subtitleGeneration != mSubtitleGeneration) { 321 return -EAGAIN; 322 }; 323 (*accessUnit)->meta()->setInt32( 324 "trackIndex", mPlaylist->getSelectedIndex()); 325 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 326 } 327 } else { 328 ALOGI("[%s] encountered error %d", streamStr, err); 329 } 330 331 return err; 332} 333 334status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 335 if (!(mStreamMask & stream)) { 336 return UNKNOWN_ERROR; 337 } 338 339 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 340 341 sp<MetaData> meta = packetSource->getFormat(); 342 343 if (meta == NULL) { 344 return -EAGAIN; 345 } 346 347 if (stream == STREAMTYPE_AUDIO) { 348 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 349 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 350 } else if (stream == STREAMTYPE_VIDEO) { 351 meta->setInt32(kKeyMaxWidth, mMaxWidth); 352 meta->setInt32(kKeyMaxHeight, mMaxHeight); 353 } 354 355 return convertMetaDataToMessage(meta, format); 356} 357 358sp<HTTPBase> LiveSession::getHTTPDataSource() { 359 return new MediaHTTP(mHTTPService->makeHTTPConnection()); 360} 361 362void LiveSession::connectAsync( 363 const char *url, const KeyedVector<String8, String8> *headers) { 364 sp<AMessage> msg = new AMessage(kWhatConnect, this); 365 msg->setString("url", url); 366 367 if (headers != NULL) { 368 msg->setPointer( 369 "headers", 370 new KeyedVector<String8, String8>(*headers)); 371 } 372 373 msg->post(); 374} 375 376status_t LiveSession::disconnect() { 377 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 378 379 sp<AMessage> response; 380 status_t err = msg->postAndAwaitResponse(&response); 381 382 return err; 383} 384 385status_t LiveSession::seekTo(int64_t timeUs) { 386 sp<AMessage> msg = new AMessage(kWhatSeek, this); 387 msg->setInt64("timeUs", timeUs); 388 389 sp<AMessage> response; 390 status_t err = msg->postAndAwaitResponse(&response); 391 392 return err; 393} 394 395bool LiveSession::checkSwitchProgress( 396 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 397 AString newUri; 398 CHECK(stopParams->findString("uri", &newUri)); 399 400 *needResumeUntil = false; 401 sp<AMessage> firstNewMeta[kMaxStreams]; 402 for (size_t i = 0; i < kMaxStreams; ++i) { 403 StreamType stream = indexToType(i); 404 if (!(mSwapMask & mNewStreamMask & stream) 405 || (mStreams[i].mNewUri != newUri)) { 406 continue; 407 } 408 if (stream == STREAMTYPE_SUBTITLES) { 409 continue; 410 } 411 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 412 413 // First, get latest dequeued meta, which is where the decoder is at. 414 // (when upswitching, we take the meta after a certain delay, so that 415 // the decoder is left with some cushion) 416 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 417 if (delayUs > 0) { 418 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 419 if (lastDequeueMeta == NULL) { 420 // this means we don't have enough cushion, try again later 421 ALOGV("[%s] up switching failed due to insufficient buffer", 422 getNameForStream(stream)); 423 return false; 424 } 425 } else { 426 // It's okay for lastDequeueMeta to be NULL here, it means the 427 // decoder hasn't even started dequeueing 428 lastDequeueMeta = source->getLatestDequeuedMeta(); 429 } 430 // Then, trim off packets at beginning of mPacketSources2 that's before 431 // the latest dequeued time. These samples are definitely too late. 432 firstNewMeta[i] = mPacketSources2.editValueAt(i) 433 ->trimBuffersBeforeMeta(lastDequeueMeta); 434 435 // Now firstNewMeta[i] is the first sample after the trim. 436 // If it's NULL, we failed because dequeue already past all samples 437 // in mPacketSource2, we have to try again. 438 if (firstNewMeta[i] == NULL) { 439 HLSTime dequeueTime(lastDequeueMeta); 440 ALOGV("[%s] dequeue time (%d, %lld) past start time", 441 getNameForStream(stream), 442 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 443 return false; 444 } 445 446 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 447 // already fetched, and see if we need to resumeUntil 448 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 449 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 450 // boundary, no need to resume as the content will look different anyways 451 if (lastEnqueueMeta != NULL) { 452 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 453 454 // no need to resume old fetcher if new fetcher started in different 455 // discontinuity sequence, as the content will look different. 456 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 457 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 458 459 // update the stopTime for resumeUntil 460 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 461 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 462 } 463 } 464 465 // if we're here, it means dequeue progress hasn't passed some samples in 466 // mPacketSource2, we can trim off the excess in mPacketSource. 467 // (old fetcher might still need to resumeUntil the start time of new fetcher) 468 for (size_t i = 0; i < kMaxStreams; ++i) { 469 StreamType stream = indexToType(i); 470 if (!(mSwapMask & mNewStreamMask & stream) 471 || (newUri != mStreams[i].mNewUri) 472 || stream == STREAMTYPE_SUBTITLES) { 473 continue; 474 } 475 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 476 } 477 478 // no resumeUntil if already underflow 479 *needResumeUntil &= !mBuffering; 480 481 return true; 482} 483 484void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 485 switch (msg->what()) { 486 case kWhatConnect: 487 { 488 onConnect(msg); 489 break; 490 } 491 492 case kWhatDisconnect: 493 { 494 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 495 496 if (mReconfigurationInProgress) { 497 break; 498 } 499 500 finishDisconnect(); 501 break; 502 } 503 504 case kWhatSeek: 505 { 506 if (mReconfigurationInProgress) { 507 msg->post(50000); 508 break; 509 } 510 511 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 512 mSeekReply = new AMessage; 513 514 onSeek(msg); 515 break; 516 } 517 518 case kWhatFetcherNotify: 519 { 520 int32_t what; 521 CHECK(msg->findInt32("what", &what)); 522 523 switch (what) { 524 case PlaylistFetcher::kWhatStarted: 525 break; 526 case PlaylistFetcher::kWhatPaused: 527 case PlaylistFetcher::kWhatStopped: 528 { 529 AString uri; 530 CHECK(msg->findString("uri", &uri)); 531 ssize_t index = mFetcherInfos.indexOfKey(uri); 532 if (index < 0) { 533 // ignore msgs from fetchers that's already gone 534 break; 535 } 536 537 ALOGV("fetcher-%d %s", 538 mFetcherInfos[index].mFetcher->getFetcherID(), 539 what == PlaylistFetcher::kWhatPaused ? 540 "paused" : "stopped"); 541 542 if (what == PlaylistFetcher::kWhatStopped) { 543 mFetcherLooper->unregisterHandler( 544 mFetcherInfos[index].mFetcher->id()); 545 mFetcherInfos.removeItemsAt(index); 546 } else if (what == PlaylistFetcher::kWhatPaused) { 547 int32_t seekMode; 548 CHECK(msg->findInt32("seekMode", &seekMode)); 549 for (size_t i = 0; i < kMaxStreams; ++i) { 550 if (mStreams[i].mUri == uri) { 551 mStreams[i].mSeekMode = (SeekMode) seekMode; 552 } 553 } 554 } 555 556 if (mContinuation != NULL) { 557 CHECK_GT(mContinuationCounter, 0); 558 if (--mContinuationCounter == 0) { 559 mContinuation->post(); 560 } 561 ALOGV("%zu fetcher(s) left", mContinuationCounter); 562 } 563 break; 564 } 565 566 case PlaylistFetcher::kWhatDurationUpdate: 567 { 568 AString uri; 569 CHECK(msg->findString("uri", &uri)); 570 571 int64_t durationUs; 572 CHECK(msg->findInt64("durationUs", &durationUs)); 573 574 ssize_t index = mFetcherInfos.indexOfKey(uri); 575 if (index >= 0) { 576 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 577 info->mDurationUs = durationUs; 578 } 579 break; 580 } 581 582 case PlaylistFetcher::kWhatTargetDurationUpdate: 583 { 584 int64_t targetDurationUs; 585 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 586 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 587 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 588 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 589 break; 590 } 591 592 case PlaylistFetcher::kWhatError: 593 { 594 status_t err; 595 CHECK(msg->findInt32("err", &err)); 596 597 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 598 599 // handle EOS on subtitle tracks independently 600 AString uri; 601 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 602 ssize_t i = mFetcherInfos.indexOfKey(uri); 603 if (i >= 0) { 604 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 605 if (fetcher != NULL) { 606 uint32_t type = fetcher->getStreamTypeMask(); 607 if (type == STREAMTYPE_SUBTITLES) { 608 mPacketSources.valueFor( 609 STREAMTYPE_SUBTITLES)->signalEOS(err);; 610 break; 611 } 612 } 613 } 614 } 615 616 if (mInPreparationPhase) { 617 postPrepared(err); 618 } 619 620 cancelBandwidthSwitch(); 621 622 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 623 624 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 625 626 mPacketSources.valueFor( 627 STREAMTYPE_SUBTITLES)->signalEOS(err); 628 629 postError(err); 630 break; 631 } 632 633 case PlaylistFetcher::kWhatStopReached: 634 { 635 ALOGV("kWhatStopReached"); 636 637 AString oldUri; 638 CHECK(msg->findString("uri", &oldUri)); 639 640 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 641 if (index < 0) { 642 break; 643 } 644 645 tryToFinishBandwidthSwitch(oldUri); 646 break; 647 } 648 649 case PlaylistFetcher::kWhatStartedAt: 650 { 651 int32_t switchGeneration; 652 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 653 654 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 655 switchGeneration, mSwitchGeneration); 656 657 if (switchGeneration != mSwitchGeneration) { 658 break; 659 } 660 661 AString uri; 662 CHECK(msg->findString("uri", &uri)); 663 664 // mark new fetcher mToBeResumed 665 ssize_t index = mFetcherInfos.indexOfKey(uri); 666 if (index >= 0) { 667 mFetcherInfos.editValueAt(index).mToBeResumed = true; 668 } 669 670 // temporarily disable packet sources to be swapped to prevent 671 // NuPlayerDecoder from dequeuing while we check progress 672 for (size_t i = 0; i < mPacketSources.size(); ++i) { 673 if ((mSwapMask & mPacketSources.keyAt(i)) 674 && uri == mStreams[i].mNewUri) { 675 mPacketSources.editValueAt(i)->enable(false); 676 } 677 } 678 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 679 // If switching up, require a cushion bigger than kUnderflowMark 680 // to avoid buffering immediately after the switch. 681 // (If we don't have that cushion we'd rather cancel and try again.) 682 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 683 bool needResumeUntil = false; 684 sp<AMessage> stopParams = msg; 685 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 686 // playback time hasn't passed startAt time 687 if (!needResumeUntil) { 688 ALOGV("finish switch"); 689 for (size_t i = 0; i < kMaxStreams; ++i) { 690 if ((mSwapMask & indexToType(i)) 691 && uri == mStreams[i].mNewUri) { 692 // have to make a copy of mStreams[i].mUri because 693 // tryToFinishBandwidthSwitch is modifying mStreams[] 694 AString oldURI = mStreams[i].mUri; 695 tryToFinishBandwidthSwitch(oldURI); 696 break; 697 } 698 } 699 } else { 700 // startAt time is after last enqueue time 701 // Resume fetcher for the original variant; the resumed fetcher should 702 // continue until the timestamps found in msg, which is stored by the 703 // new fetcher to indicate where the new variant has started buffering. 704 ALOGV("finish switch with resumeUntilAsync"); 705 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 706 const FetcherInfo &info = mFetcherInfos.valueAt(i); 707 if (info.mToBeRemoved) { 708 info.mFetcher->resumeUntilAsync(stopParams); 709 } 710 } 711 } 712 } else { 713 // playback time passed startAt time 714 if (switchUp) { 715 // if switching up, cancel and retry if condition satisfies again 716 ALOGV("cancel up switch because we're too late"); 717 cancelBandwidthSwitch(true /* resume */); 718 } else { 719 ALOGV("retry down switch at next sample"); 720 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 721 } 722 } 723 // re-enable all packet sources 724 for (size_t i = 0; i < mPacketSources.size(); ++i) { 725 mPacketSources.editValueAt(i)->enable(true); 726 } 727 728 break; 729 } 730 731 default: 732 TRESPASS(); 733 } 734 735 break; 736 } 737 738 case kWhatChangeConfiguration: 739 { 740 onChangeConfiguration(msg); 741 break; 742 } 743 744 case kWhatChangeConfiguration2: 745 { 746 onChangeConfiguration2(msg); 747 break; 748 } 749 750 case kWhatChangeConfiguration3: 751 { 752 onChangeConfiguration3(msg); 753 break; 754 } 755 756 case kWhatFinishDisconnect2: 757 { 758 onFinishDisconnect2(); 759 break; 760 } 761 762 case kWhatPollBuffering: 763 { 764 int32_t generation; 765 CHECK(msg->findInt32("generation", &generation)); 766 if (generation == mPollBufferingGeneration) { 767 onPollBuffering(); 768 } 769 break; 770 } 771 772 default: 773 TRESPASS(); 774 break; 775 } 776} 777 778// static 779int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 780 if (a->mBandwidth < b->mBandwidth) { 781 return -1; 782 } else if (a->mBandwidth == b->mBandwidth) { 783 return 0; 784 } 785 786 return 1; 787} 788 789// static 790LiveSession::StreamType LiveSession::indexToType(int idx) { 791 CHECK(idx >= 0 && idx < kMaxStreams); 792 return (StreamType)(1 << idx); 793} 794 795// static 796ssize_t LiveSession::typeToIndex(int32_t type) { 797 switch (type) { 798 case STREAMTYPE_AUDIO: 799 return 0; 800 case STREAMTYPE_VIDEO: 801 return 1; 802 case STREAMTYPE_SUBTITLES: 803 return 2; 804 default: 805 return -1; 806 }; 807 return -1; 808} 809 810void LiveSession::onConnect(const sp<AMessage> &msg) { 811 AString url; 812 CHECK(msg->findString("url", &url)); 813 814 KeyedVector<String8, String8> *headers = NULL; 815 if (!msg->findPointer("headers", (void **)&headers)) { 816 mExtraHeaders.clear(); 817 } else { 818 mExtraHeaders = *headers; 819 820 delete headers; 821 headers = NULL; 822 } 823 824 // TODO currently we don't know if we are coming here from incognito mode 825 ALOGI("onConnect %s", uriDebugString(url).c_str()); 826 827 mMasterURL = url; 828 829 bool dummy; 830 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 831 832 if (mPlaylist == NULL) { 833 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 834 835 postPrepared(ERROR_IO); 836 return; 837 } 838 839 // create looper for fetchers 840 if (mFetcherLooper == NULL) { 841 mFetcherLooper = new ALooper(); 842 843 mFetcherLooper->setName("Fetcher"); 844 mFetcherLooper->start(false, false); 845 } 846 847 // We trust the content provider to make a reasonable choice of preferred 848 // initial bandwidth by listing it first in the variant playlist. 849 // At startup we really don't have a good estimate on the available 850 // network bandwidth since we haven't tranferred any data yet. Once 851 // we have we can make a better informed choice. 852 size_t initialBandwidth = 0; 853 size_t initialBandwidthIndex = 0; 854 855 int32_t maxWidth = 0; 856 int32_t maxHeight = 0; 857 858 if (mPlaylist->isVariantPlaylist()) { 859 Vector<BandwidthItem> itemsWithVideo; 860 for (size_t i = 0; i < mPlaylist->size(); ++i) { 861 BandwidthItem item; 862 863 item.mPlaylistIndex = i; 864 865 sp<AMessage> meta; 866 AString uri; 867 mPlaylist->itemAt(i, &uri, &meta); 868 869 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 870 871 int32_t width, height; 872 if (meta->findInt32("width", &width)) { 873 maxWidth = max(maxWidth, width); 874 } 875 if (meta->findInt32("height", &height)) { 876 maxHeight = max(maxHeight, height); 877 } 878 879 mBandwidthItems.push(item); 880 if (mPlaylist->hasType(i, "video")) { 881 itemsWithVideo.push(item); 882 } 883 } 884 // remove the audio-only variants if we have at least one with video 885 if (!itemsWithVideo.empty() 886 && itemsWithVideo.size() < mBandwidthItems.size()) { 887 mBandwidthItems.clear(); 888 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 889 mBandwidthItems.push(itemsWithVideo[i]); 890 } 891 } 892 893 CHECK_GT(mBandwidthItems.size(), 0u); 894 initialBandwidth = mBandwidthItems[0].mBandwidth; 895 896 mBandwidthItems.sort(SortByBandwidth); 897 898 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 899 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 900 initialBandwidthIndex = i; 901 break; 902 } 903 } 904 } else { 905 // dummy item. 906 BandwidthItem item; 907 item.mPlaylistIndex = 0; 908 item.mBandwidth = 0; 909 mBandwidthItems.push(item); 910 } 911 912 mMaxWidth = maxWidth > 0 ? maxWidth : mMaxWidth; 913 mMaxHeight = maxHeight > 0 ? maxHeight : mMaxHeight; 914 915 mPlaylist->pickRandomMediaItems(); 916 changeConfiguration( 917 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 918} 919 920void LiveSession::finishDisconnect() { 921 ALOGV("finishDisconnect"); 922 923 // No reconfiguration is currently pending, make sure none will trigger 924 // during disconnection either. 925 cancelBandwidthSwitch(); 926 927 // cancel buffer polling 928 cancelPollBuffering(); 929 930 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 931 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 932 } 933 934 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); 935 936 mContinuationCounter = mFetcherInfos.size(); 937 mContinuation = msg; 938 939 if (mContinuationCounter == 0) { 940 msg->post(); 941 } 942} 943 944void LiveSession::onFinishDisconnect2() { 945 mContinuation.clear(); 946 947 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 948 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 949 950 mPacketSources.valueFor( 951 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 952 953 sp<AMessage> response = new AMessage; 954 response->setInt32("err", OK); 955 956 response->postReply(mDisconnectReplyID); 957 mDisconnectReplyID.clear(); 958} 959 960sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 961 ssize_t index = mFetcherInfos.indexOfKey(uri); 962 963 if (index >= 0) { 964 return NULL; 965 } 966 967 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 968 notify->setString("uri", uri); 969 notify->setInt32("switchGeneration", mSwitchGeneration); 970 971 FetcherInfo info; 972 info.mFetcher = new PlaylistFetcher( 973 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 974 info.mDurationUs = -1ll; 975 info.mToBeRemoved = false; 976 info.mToBeResumed = false; 977 mFetcherLooper->registerHandler(info.mFetcher); 978 979 mFetcherInfos.add(uri, info); 980 981 return info.mFetcher; 982} 983 984/* 985 * Illustration of parameters: 986 * 987 * 0 `range_offset` 988 * +------------+-------------------------------------------------------+--+--+ 989 * | | | next block to fetch | | | 990 * | | `source` handle => `out` buffer | | | | 991 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 992 * | |<----------- `range_length` / buffer capacity ----------->| | 993 * |<------------------------------ file_size ------------------------------->| 994 * 995 * Special parameter values: 996 * - range_length == -1 means entire file 997 * - block_size == 0 means entire range 998 * 999 */ 1000ssize_t LiveSession::fetchFile( 1001 const char *url, sp<ABuffer> *out, 1002 int64_t range_offset, int64_t range_length, 1003 uint32_t block_size, /* download block size */ 1004 sp<DataSource> *source, /* to return and reuse source */ 1005 String8 *actualUrl, 1006 bool forceConnectHTTP /* force connect HTTP when resuing source */) { 1007 off64_t size; 1008 sp<DataSource> temp_source; 1009 if (source == NULL) { 1010 source = &temp_source; 1011 } 1012 1013 if (*source == NULL || forceConnectHTTP) { 1014 if (!strncasecmp(url, "file://", 7)) { 1015 *source = new FileSource(url + 7); 1016 } else if (strncasecmp(url, "http://", 7) 1017 && strncasecmp(url, "https://", 8)) { 1018 return ERROR_UNSUPPORTED; 1019 } else { 1020 KeyedVector<String8, String8> headers = mExtraHeaders; 1021 if (range_offset > 0 || range_length >= 0) { 1022 headers.add( 1023 String8("Range"), 1024 String8( 1025 AStringPrintf( 1026 "bytes=%lld-%s", 1027 range_offset, 1028 range_length < 0 1029 ? "" : AStringPrintf("%lld", 1030 range_offset + range_length - 1).c_str()).c_str())); 1031 } 1032 1033 HTTPBase* httpDataSource = 1034 (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get(); 1035 status_t err = httpDataSource->connect(url, &headers); 1036 1037 if (err != OK) { 1038 return err; 1039 } 1040 1041 if (*source == NULL) { 1042 *source = mHTTPDataSource; 1043 } 1044 } 1045 } 1046 1047 status_t getSizeErr = (*source)->getSize(&size); 1048 if (getSizeErr != OK) { 1049 size = 65536; 1050 } 1051 1052 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 1053 if (*out == NULL) { 1054 buffer->setRange(0, 0); 1055 } 1056 1057 ssize_t bytesRead = 0; 1058 // adjust range_length if only reading partial block 1059 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 1060 range_length = buffer->size() + block_size; 1061 } 1062 for (;;) { 1063 // Only resize when we don't know the size. 1064 size_t bufferRemaining = buffer->capacity() - buffer->size(); 1065 if (bufferRemaining == 0 && getSizeErr != OK) { 1066 size_t bufferIncrement = buffer->size() / 2; 1067 if (bufferIncrement < 32768) { 1068 bufferIncrement = 32768; 1069 } 1070 bufferRemaining = bufferIncrement; 1071 1072 ALOGV("increasing download buffer to %zu bytes", 1073 buffer->size() + bufferRemaining); 1074 1075 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 1076 memcpy(copy->data(), buffer->data(), buffer->size()); 1077 copy->setRange(0, buffer->size()); 1078 1079 buffer = copy; 1080 } 1081 1082 size_t maxBytesToRead = bufferRemaining; 1083 if (range_length >= 0) { 1084 int64_t bytesLeftInRange = range_length - buffer->size(); 1085 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 1086 maxBytesToRead = bytesLeftInRange; 1087 1088 if (bytesLeftInRange == 0) { 1089 break; 1090 } 1091 } 1092 } 1093 1094 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 1095 // to help us break out of the loop. 1096 ssize_t n = (*source)->readAt( 1097 buffer->size(), buffer->data() + buffer->size(), 1098 maxBytesToRead); 1099 1100 if (n < 0) { 1101 return n; 1102 } 1103 1104 if (n == 0) { 1105 break; 1106 } 1107 1108 buffer->setRange(0, buffer->size() + (size_t)n); 1109 bytesRead += n; 1110 } 1111 1112 *out = buffer; 1113 if (actualUrl != NULL) { 1114 *actualUrl = (*source)->getUri(); 1115 if (actualUrl->isEmpty()) { 1116 *actualUrl = url; 1117 } 1118 } 1119 1120 return bytesRead; 1121} 1122 1123sp<M3UParser> LiveSession::fetchPlaylist( 1124 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 1125 ALOGV("fetchPlaylist '%s'", url); 1126 1127 *unchanged = false; 1128 1129 sp<ABuffer> buffer; 1130 String8 actualUrl; 1131 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 1132 1133 // close off the connection after use 1134 mHTTPDataSource->disconnect(); 1135 1136 if (err <= 0) { 1137 return NULL; 1138 } 1139 1140 // MD5 functionality is not available on the simulator, treat all 1141 // playlists as changed. 1142 1143#if defined(HAVE_ANDROID_OS) 1144 uint8_t hash[16]; 1145 1146 MD5_CTX m; 1147 MD5_Init(&m); 1148 MD5_Update(&m, buffer->data(), buffer->size()); 1149 1150 MD5_Final(hash, &m); 1151 1152 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 1153 // playlist unchanged 1154 *unchanged = true; 1155 1156 return NULL; 1157 } 1158 1159 if (curPlaylistHash != NULL) { 1160 memcpy(curPlaylistHash, hash, sizeof(hash)); 1161 } 1162#endif 1163 1164 sp<M3UParser> playlist = 1165 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 1166 1167 if (playlist->initCheck() != OK) { 1168 ALOGE("failed to parse .m3u8 playlist"); 1169 1170 return NULL; 1171 } 1172 1173 return playlist; 1174} 1175 1176#if 0 1177static double uniformRand() { 1178 return (double)rand() / RAND_MAX; 1179} 1180#endif 1181 1182bool LiveSession::resumeFetcher( 1183 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1184 ssize_t index = mFetcherInfos.indexOfKey(uri); 1185 if (index < 0) { 1186 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1187 return false; 1188 } 1189 1190 bool resume = false; 1191 sp<AnotherPacketSource> sources[kMaxStreams]; 1192 for (size_t i = 0; i < kMaxStreams; ++i) { 1193 if ((streamMask & indexToType(i)) 1194 && ((!newUri && uri == mStreams[i].mUri) 1195 || (newUri && uri == mStreams[i].mNewUri))) { 1196 resume = true; 1197 if (newUri) { 1198 sources[i] = mPacketSources2.valueFor(indexToType(i)); 1199 sources[i]->clear(); 1200 } else { 1201 sources[i] = mPacketSources.valueFor(indexToType(i)); 1202 } 1203 } 1204 } 1205 1206 if (resume) { 1207 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1208 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1209 1210 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1211 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1212 1213 fetcher->startAsync( 1214 sources[kAudioIndex], 1215 sources[kVideoIndex], 1216 sources[kSubtitleIndex], 1217 timeUs, -1, -1, seekMode); 1218 } 1219 1220 return resume; 1221} 1222 1223float LiveSession::getAbortThreshold( 1224 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1225 float abortThreshold = -1.0f; 1226 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1227 /* 1228 If we're switching down, we need to decide whether to 1229 1230 1) finish last segment of high-bandwidth variant, or 1231 2) abort last segment of high-bandwidth variant, and fetch an 1232 overlapping portion from low-bandwidth variant. 1233 1234 Here we try to maximize the amount of buffer left when the 1235 switch point is met. Given the following parameters: 1236 1237 B: our current buffering level in seconds 1238 T: target duration in seconds 1239 X: sample duration in seconds remain to fetch in last segment 1240 bw0: bandwidth of old variant (as specified in playlist) 1241 bw1: bandwidth of new variant (as specified in playlist) 1242 bw: measured bandwidth available 1243 1244 If we choose 1), when switch happens at the end of current 1245 segment, our buffering will be 1246 B + X - X * bw0 / bw 1247 1248 If we choose 2), when switch happens where we aborted current 1249 segment, our buffering will be 1250 B - (T - X) * bw1 / bw 1251 1252 We should only choose 1) if 1253 X/T < bw1 / (bw1 + bw0 - bw) 1254 */ 1255 1256 // Taking the measured current bandwidth at 50% face value only, 1257 // as our bandwidth estimation is a lagging indicator. Being 1258 // conservative on this, we prefer switching to lower bandwidth 1259 // unless we're really confident finishing up the last segment 1260 // of higher bandwidth will be fast. 1261 CHECK(mLastBandwidthBps >= 0); 1262 abortThreshold = 1263 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1264 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1265 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1266 - (float)mLastBandwidthBps * 0.5f); 1267 if (abortThreshold < 0.0f) { 1268 abortThreshold = -1.0f; // do not abort 1269 } 1270 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1271 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1272 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1273 mLastBandwidthBps, 1274 abortThreshold); 1275 } 1276 return abortThreshold; 1277} 1278 1279void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1280 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1281} 1282 1283size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1284 if (mBandwidthItems.size() < 2) { 1285 // shouldn't be here if we only have 1 bandwidth, check 1286 // logic to get rid of redundant bandwidth polling 1287 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1288 return 0; 1289 } 1290 1291#if 1 1292 char value[PROPERTY_VALUE_MAX]; 1293 ssize_t index = -1; 1294 if (property_get("media.httplive.bw-index", value, NULL)) { 1295 char *end; 1296 index = strtol(value, &end, 10); 1297 CHECK(end > value && *end == '\0'); 1298 1299 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1300 index = mBandwidthItems.size() - 1; 1301 } 1302 } 1303 1304 if (index < 0) { 1305 char value[PROPERTY_VALUE_MAX]; 1306 if (property_get("media.httplive.max-bw", value, NULL)) { 1307 char *end; 1308 long maxBw = strtoul(value, &end, 10); 1309 if (end > value && *end == '\0') { 1310 if (maxBw > 0 && bandwidthBps > maxBw) { 1311 ALOGV("bandwidth capped to %ld bps", maxBw); 1312 bandwidthBps = maxBw; 1313 } 1314 } 1315 } 1316 1317 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1318 1319 index = mBandwidthItems.size() - 1; 1320 while (index > 0) { 1321 // be conservative (70%) to avoid overestimating and immediately 1322 // switching down again. 1323 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1324 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1325 break; 1326 } 1327 --index; 1328 } 1329 } 1330#elif 0 1331 // Change bandwidth at random() 1332 size_t index = uniformRand() * mBandwidthItems.size(); 1333#elif 0 1334 // There's a 50% chance to stay on the current bandwidth and 1335 // a 50% chance to switch to the next higher bandwidth (wrapping around 1336 // to lowest) 1337 const size_t kMinIndex = 0; 1338 1339 static ssize_t mCurBandwidthIndex = -1; 1340 1341 size_t index; 1342 if (mCurBandwidthIndex < 0) { 1343 index = kMinIndex; 1344 } else if (uniformRand() < 0.5) { 1345 index = (size_t)mCurBandwidthIndex; 1346 } else { 1347 index = mCurBandwidthIndex + 1; 1348 if (index == mBandwidthItems.size()) { 1349 index = kMinIndex; 1350 } 1351 } 1352 mCurBandwidthIndex = index; 1353#elif 0 1354 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1355 1356 size_t index = mBandwidthItems.size() - 1; 1357 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1358 --index; 1359 } 1360#elif 1 1361 char value[PROPERTY_VALUE_MAX]; 1362 size_t index; 1363 if (property_get("media.httplive.bw-index", value, NULL)) { 1364 char *end; 1365 index = strtoul(value, &end, 10); 1366 CHECK(end > value && *end == '\0'); 1367 1368 if (index >= mBandwidthItems.size()) { 1369 index = mBandwidthItems.size() - 1; 1370 } 1371 } else { 1372 index = 0; 1373 } 1374#else 1375 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1376#endif 1377 1378 CHECK_GE(index, 0); 1379 1380 return index; 1381} 1382 1383HLSTime LiveSession::latestMediaSegmentStartTime() const { 1384 HLSTime audioTime(mPacketSources.valueFor( 1385 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1386 1387 HLSTime videoTime(mPacketSources.valueFor( 1388 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1389 1390 return audioTime < videoTime ? videoTime : audioTime; 1391} 1392 1393void LiveSession::onSeek(const sp<AMessage> &msg) { 1394 int64_t timeUs; 1395 CHECK(msg->findInt64("timeUs", &timeUs)); 1396 changeConfiguration(timeUs); 1397} 1398 1399status_t LiveSession::getDuration(int64_t *durationUs) const { 1400 int64_t maxDurationUs = -1ll; 1401 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1402 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1403 1404 if (fetcherDurationUs > maxDurationUs) { 1405 maxDurationUs = fetcherDurationUs; 1406 } 1407 } 1408 1409 *durationUs = maxDurationUs; 1410 1411 return OK; 1412} 1413 1414bool LiveSession::isSeekable() const { 1415 int64_t durationUs; 1416 return getDuration(&durationUs) == OK && durationUs >= 0; 1417} 1418 1419bool LiveSession::hasDynamicDuration() const { 1420 return false; 1421} 1422 1423size_t LiveSession::getTrackCount() const { 1424 if (mPlaylist == NULL) { 1425 return 0; 1426 } else { 1427 return mPlaylist->getTrackCount(); 1428 } 1429} 1430 1431sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1432 if (mPlaylist == NULL) { 1433 return NULL; 1434 } else { 1435 return mPlaylist->getTrackInfo(trackIndex); 1436 } 1437} 1438 1439status_t LiveSession::selectTrack(size_t index, bool select) { 1440 if (mPlaylist == NULL) { 1441 return INVALID_OPERATION; 1442 } 1443 1444 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1445 index, select, mSubtitleGeneration); 1446 1447 ++mSubtitleGeneration; 1448 status_t err = mPlaylist->selectTrack(index, select); 1449 if (err == OK) { 1450 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1451 msg->setInt32("pickTrack", select); 1452 msg->post(); 1453 } 1454 return err; 1455} 1456 1457ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1458 if (mPlaylist == NULL) { 1459 return -1; 1460 } else { 1461 return mPlaylist->getSelectedTrack(type); 1462 } 1463} 1464 1465void LiveSession::changeConfiguration( 1466 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1467 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1468 (long long)timeUs, bandwidthIndex, pickTrack); 1469 1470 cancelBandwidthSwitch(); 1471 1472 CHECK(!mReconfigurationInProgress); 1473 mReconfigurationInProgress = true; 1474 if (bandwidthIndex >= 0) { 1475 mOrigBandwidthIndex = mCurBandwidthIndex; 1476 mCurBandwidthIndex = bandwidthIndex; 1477 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1478 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 1479 mOrigBandwidthIndex, mCurBandwidthIndex); 1480 } 1481 } 1482 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1483 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1484 1485 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1486 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1487 1488 AString URIs[kMaxStreams]; 1489 for (size_t i = 0; i < kMaxStreams; ++i) { 1490 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1491 streamMask |= indexToType(i); 1492 } 1493 } 1494 1495 // Step 1, stop and discard fetchers that are no longer needed. 1496 // Pause those that we'll reuse. 1497 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1498 // skip fetchers that are marked mToBeRemoved, 1499 // these are done and can't be reused 1500 if (mFetcherInfos[i].mToBeRemoved) { 1501 continue; 1502 } 1503 1504 const AString &uri = mFetcherInfos.keyAt(i); 1505 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1506 1507 bool discardFetcher = true, delayRemoval = false; 1508 for (size_t j = 0; j < kMaxStreams; ++j) { 1509 StreamType type = indexToType(j); 1510 if ((streamMask & type) && uri == URIs[j]) { 1511 resumeMask |= type; 1512 streamMask &= ~type; 1513 discardFetcher = false; 1514 } 1515 } 1516 // Delay fetcher removal if not picking tracks, AND old fetcher 1517 // has stream mask that overlaps new variant. (Okay to discard 1518 // old fetcher now, if completely no overlap.) 1519 if (discardFetcher && timeUs < 0ll && !pickTrack 1520 && (fetcher->getStreamTypeMask() & streamMask)) { 1521 discardFetcher = false; 1522 delayRemoval = true; 1523 } 1524 1525 if (discardFetcher) { 1526 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1527 fetcher->stopAsync(); 1528 } else { 1529 float threshold = -1.0f; // always finish fetching by default 1530 if (timeUs >= 0ll) { 1531 // seeking, no need to finish fetching 1532 threshold = 0.0f; 1533 } else if (delayRemoval) { 1534 // adapting, abort if remaining of current segment is over threshold 1535 threshold = getAbortThreshold( 1536 mOrigBandwidthIndex, mCurBandwidthIndex); 1537 } 1538 1539 ALOGV("pausing fetcher-%d, threshold=%.2f", 1540 fetcher->getFetcherID(), threshold); 1541 fetcher->pauseAsync(threshold); 1542 } 1543 } 1544 1545 sp<AMessage> msg; 1546 if (timeUs < 0ll) { 1547 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1548 msg = new AMessage(kWhatChangeConfiguration3, this); 1549 } else { 1550 msg = new AMessage(kWhatChangeConfiguration2, this); 1551 } 1552 msg->setInt32("streamMask", streamMask); 1553 msg->setInt32("resumeMask", resumeMask); 1554 msg->setInt32("pickTrack", pickTrack); 1555 msg->setInt64("timeUs", timeUs); 1556 for (size_t i = 0; i < kMaxStreams; ++i) { 1557 if ((streamMask | resumeMask) & indexToType(i)) { 1558 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1559 } 1560 } 1561 1562 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1563 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1564 // fetchers have completed their asynchronous operation, we'll post 1565 // mContinuation, which then is handled below in onChangeConfiguration2. 1566 mContinuationCounter = mFetcherInfos.size(); 1567 mContinuation = msg; 1568 1569 if (mContinuationCounter == 0) { 1570 msg->post(); 1571 } 1572} 1573 1574void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1575 ALOGV("onChangeConfiguration"); 1576 1577 if (!mReconfigurationInProgress) { 1578 int32_t pickTrack = 0; 1579 msg->findInt32("pickTrack", &pickTrack); 1580 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1581 } else { 1582 msg->post(1000000ll); // retry in 1 sec 1583 } 1584} 1585 1586void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1587 ALOGV("onChangeConfiguration2"); 1588 1589 mContinuation.clear(); 1590 1591 // All fetchers are either suspended or have been removed now. 1592 1593 // If we're seeking, clear all packet sources before we report 1594 // seek complete, to prevent decoder from pulling stale data. 1595 int64_t timeUs; 1596 CHECK(msg->findInt64("timeUs", &timeUs)); 1597 1598 if (timeUs >= 0) { 1599 mLastSeekTimeUs = timeUs; 1600 mLastDequeuedTimeUs = timeUs; 1601 1602 for (size_t i = 0; i < mPacketSources.size(); i++) { 1603 mPacketSources.editValueAt(i)->clear(); 1604 } 1605 1606 for (size_t i = 0; i < kMaxStreams; ++i) { 1607 mStreams[i].mCurDiscontinuitySeq = 0; 1608 } 1609 1610 mDiscontinuityOffsetTimesUs.clear(); 1611 mDiscontinuityAbsStartTimesUs.clear(); 1612 1613 if (mSeekReplyID != NULL) { 1614 CHECK(mSeekReply != NULL); 1615 mSeekReply->setInt32("err", OK); 1616 mSeekReply->postReply(mSeekReplyID); 1617 mSeekReplyID.clear(); 1618 mSeekReply.clear(); 1619 } 1620 1621 // restart buffer polling after seek becauese previous 1622 // buffering position is no longer valid. 1623 restartPollBuffering(); 1624 } 1625 1626 uint32_t streamMask, resumeMask; 1627 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1628 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1629 1630 streamMask |= resumeMask; 1631 1632 AString URIs[kMaxStreams]; 1633 for (size_t i = 0; i < kMaxStreams; ++i) { 1634 if (streamMask & indexToType(i)) { 1635 const AString &uriKey = mStreams[i].uriKey(); 1636 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1637 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1638 } 1639 } 1640 1641 uint32_t changedMask = 0; 1642 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1643 // stream URI could change even if onChangeConfiguration2 is only 1644 // used for seek. Seek could happen during a bw switch, in this 1645 // case bw switch will be cancelled, but the seekTo position will 1646 // fetch from the new URI. 1647 if ((mStreamMask & streamMask & indexToType(i)) 1648 && !mStreams[i].mUri.empty() 1649 && !(URIs[i] == mStreams[i].mUri)) { 1650 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1651 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1652 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1653 if (source->getLatestDequeuedMeta() != NULL) { 1654 source->queueDiscontinuity( 1655 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1656 } 1657 } 1658 // Determine which decoders to shutdown on the player side, 1659 // a decoder has to be shutdown if its streamtype was active 1660 // before but now longer isn't. 1661 if ((mStreamMask & ~streamMask & indexToType(i))) { 1662 changedMask |= indexToType(i); 1663 } 1664 } 1665 1666 if (changedMask == 0) { 1667 // If nothing changed as far as the audio/video decoders 1668 // are concerned we can proceed. 1669 onChangeConfiguration3(msg); 1670 return; 1671 } 1672 1673 // Something changed, inform the player which will shutdown the 1674 // corresponding decoders and will post the reply once that's done. 1675 // Handling the reply will continue executing below in 1676 // onChangeConfiguration3. 1677 sp<AMessage> notify = mNotify->dup(); 1678 notify->setInt32("what", kWhatStreamsChanged); 1679 notify->setInt32("changedMask", changedMask); 1680 1681 msg->setWhat(kWhatChangeConfiguration3); 1682 msg->setTarget(this); 1683 1684 notify->setMessage("reply", msg); 1685 notify->post(); 1686} 1687 1688void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1689 mContinuation.clear(); 1690 // All remaining fetchers are still suspended, the player has shutdown 1691 // any decoders that needed it. 1692 1693 uint32_t streamMask, resumeMask; 1694 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1695 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1696 1697 mNewStreamMask = streamMask | resumeMask; 1698 1699 int64_t timeUs; 1700 int32_t pickTrack; 1701 bool switching = false; 1702 CHECK(msg->findInt64("timeUs", &timeUs)); 1703 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1704 1705 if (timeUs < 0ll) { 1706 if (!pickTrack) { 1707 // mSwapMask contains streams that are in both old and new variant, 1708 // (in mNewStreamMask & mStreamMask) but with different URIs 1709 // (not in resumeMask). 1710 // For example, old variant has video and audio in two separate 1711 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1712 // should be 0 as there is nothing to swap. We only need to stop video, 1713 // and resume audio. 1714 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1715 switching = (mSwapMask != 0); 1716 } 1717 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1718 } else { 1719 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1720 } 1721 1722 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1723 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1724 (long long)timeUs, switching, pickTrack, 1725 mStreamMask, mNewStreamMask, mSwapMask); 1726 1727 for (size_t i = 0; i < kMaxStreams; ++i) { 1728 if (streamMask & indexToType(i)) { 1729 if (switching) { 1730 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1731 } else { 1732 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1733 } 1734 } 1735 } 1736 1737 // Of all existing fetchers: 1738 // * Resume fetchers that are still needed and assign them original packet sources. 1739 // * Mark otherwise unneeded fetchers for removal. 1740 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1741 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1742 const AString &uri = mFetcherInfos.keyAt(i); 1743 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1744 ALOGV("marking fetcher-%d to be removed", 1745 mFetcherInfos[i].mFetcher->getFetcherID()); 1746 1747 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1748 } 1749 } 1750 1751 // streamMask now only contains the types that need a new fetcher created. 1752 if (streamMask != 0) { 1753 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1754 } 1755 1756 // Find out when the original fetchers have buffered up to and start the new fetchers 1757 // at a later timestamp. 1758 for (size_t i = 0; i < kMaxStreams; i++) { 1759 if (!(indexToType(i) & streamMask)) { 1760 continue; 1761 } 1762 1763 AString uri; 1764 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1765 1766 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1767 CHECK(fetcher != NULL); 1768 1769 HLSTime startTime; 1770 SeekMode seekMode = kSeekModeExactPosition; 1771 sp<AnotherPacketSource> sources[kMaxStreams]; 1772 1773 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1774 startTime = latestMediaSegmentStartTime(); 1775 } 1776 1777 // TRICKY: looping from i as earlier streams are already removed from streamMask 1778 for (size_t j = i; j < kMaxStreams; ++j) { 1779 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1780 if ((streamMask & indexToType(j)) && uri == streamUri) { 1781 sources[j] = mPacketSources.valueFor(indexToType(j)); 1782 1783 if (timeUs >= 0) { 1784 startTime.mTimeUs = timeUs; 1785 } else { 1786 int32_t type; 1787 sp<AMessage> meta; 1788 if (!switching) { 1789 // selecting, or adapting but no swap required 1790 meta = sources[j]->getLatestDequeuedMeta(); 1791 } else { 1792 // adapting and swap required 1793 meta = sources[j]->getLatestEnqueuedMeta(); 1794 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1795 // switching up 1796 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1797 } 1798 } 1799 1800 if (j != kSubtitleIndex && meta != NULL 1801 && !meta->findInt32("discontinuity", &type)) { 1802 HLSTime tmpTime(meta); 1803 if (startTime < tmpTime) { 1804 startTime = tmpTime; 1805 } 1806 } 1807 1808 if (!switching) { 1809 // selecting, or adapting but no swap required 1810 sources[j]->clear(); 1811 if (j == kSubtitleIndex) { 1812 break; 1813 } 1814 1815 ALOGV("stream[%zu]: queue format change", j); 1816 sources[j]->queueDiscontinuity( 1817 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1818 } else { 1819 // switching, queue discontinuities after resume 1820 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1821 sources[j]->clear(); 1822 // the new fetcher might be providing streams that used to be 1823 // provided by two different fetchers, if one of the fetcher 1824 // paused in the middle while the other somehow paused in next 1825 // seg, we have to start from next seg. 1826 if (seekMode < mStreams[j].mSeekMode) { 1827 seekMode = mStreams[j].mSeekMode; 1828 } 1829 } 1830 } 1831 1832 streamMask &= ~indexToType(j); 1833 } 1834 } 1835 1836 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1837 "segmentStartTimeUs %lld seekMode %d", 1838 fetcher->getFetcherID(), 1839 (long long)startTime.mTimeUs, 1840 (long long)mLastSeekTimeUs, 1841 (long long)startTime.getSegmentTimeUs(true /* midpoint */), 1842 seekMode); 1843 1844 // Set the target segment start time to the middle point of the 1845 // segment where the last sample was. 1846 // This gives a better guess if segments of the two variants are not 1847 // perfectly aligned. (If the corresponding segment in new variant 1848 // starts slightly later than that in the old variant, we still want 1849 // to pick that segment, not the one before) 1850 fetcher->startAsync( 1851 sources[kAudioIndex], 1852 sources[kVideoIndex], 1853 sources[kSubtitleIndex], 1854 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1855 startTime.getSegmentTimeUs(true /* midpoint */), 1856 startTime.mSeq, 1857 seekMode); 1858 } 1859 1860 // All fetchers have now been started, the configuration change 1861 // has completed. 1862 1863 mReconfigurationInProgress = false; 1864 if (switching) { 1865 mSwitchInProgress = true; 1866 } else { 1867 mStreamMask = mNewStreamMask; 1868 if (mOrigBandwidthIndex != mCurBandwidthIndex) { 1869 ALOGV("#### Finished Bandwidth Switch Early: %zd => %zd", 1870 mOrigBandwidthIndex, mCurBandwidthIndex); 1871 mOrigBandwidthIndex = mCurBandwidthIndex; 1872 } 1873 } 1874 1875 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1876 mSwitchInProgress, mStreamMask); 1877 1878 if (mDisconnectReplyID != NULL) { 1879 finishDisconnect(); 1880 } 1881} 1882 1883void LiveSession::swapPacketSource(StreamType stream) { 1884 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1885 1886 // transfer packets from source2 to source 1887 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1888 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1889 1890 // queue discontinuity in mPacketSource 1891 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1892 1893 // queue packets in mPacketSource2 to mPacketSource 1894 status_t finalResult = OK; 1895 sp<ABuffer> accessUnit; 1896 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1897 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1898 aps->queueAccessUnit(accessUnit); 1899 } 1900 aps2->clear(); 1901} 1902 1903void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1904 if (!mSwitchInProgress) { 1905 return; 1906 } 1907 1908 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1909 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1910 return; 1911 } 1912 1913 // Swap packet source of streams provided by old variant 1914 for (size_t idx = 0; idx < kMaxStreams; idx++) { 1915 StreamType stream = indexToType(idx); 1916 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 1917 swapPacketSource(stream); 1918 1919 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1920 ALOGW("swapping stream type %d %s to empty stream", 1921 stream, mStreams[idx].mUri.c_str()); 1922 } 1923 mStreams[idx].mUri = mStreams[idx].mNewUri; 1924 mStreams[idx].mNewUri.clear(); 1925 1926 mSwapMask &= ~stream; 1927 } 1928 } 1929 1930 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 1931 1932 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 1933 if (mSwapMask != 0) { 1934 return; 1935 } 1936 1937 // Check if new variant contains extra streams. 1938 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1939 while (extraStreams) { 1940 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1941 extraStreams &= ~stream; 1942 1943 swapPacketSource(stream); 1944 1945 ssize_t idx = typeToIndex(stream); 1946 CHECK(idx >= 0); 1947 if (mStreams[idx].mNewUri.empty()) { 1948 ALOGW("swapping extra stream type %d %s to empty stream", 1949 stream, mStreams[idx].mUri.c_str()); 1950 } 1951 mStreams[idx].mUri = mStreams[idx].mNewUri; 1952 mStreams[idx].mNewUri.clear(); 1953 } 1954 1955 // Restart new fetcher (it was paused after the first 47k block) 1956 // and let it fetch into mPacketSources (not mPacketSources2) 1957 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1958 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1959 if (info.mToBeResumed) { 1960 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 1961 info.mToBeResumed = false; 1962 } 1963 } 1964 1965 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 1966 mOrigBandwidthIndex, mCurBandwidthIndex); 1967 1968 mStreamMask = mNewStreamMask; 1969 mSwitchInProgress = false; 1970 mOrigBandwidthIndex = mCurBandwidthIndex; 1971 1972 restartPollBuffering(); 1973} 1974 1975void LiveSession::schedulePollBuffering() { 1976 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 1977 msg->setInt32("generation", mPollBufferingGeneration); 1978 msg->post(1000000ll); 1979} 1980 1981void LiveSession::cancelPollBuffering() { 1982 ++mPollBufferingGeneration; 1983 mPrevBufferPercentage = -1; 1984} 1985 1986void LiveSession::restartPollBuffering() { 1987 cancelPollBuffering(); 1988 onPollBuffering(); 1989} 1990 1991void LiveSession::onPollBuffering() { 1992 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 1993 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 1994 mSwitchInProgress, mReconfigurationInProgress, 1995 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 1996 1997 bool underflow, ready, down, up; 1998 if (checkBuffering(underflow, ready, down, up)) { 1999 if (mInPreparationPhase) { 2000 // Allow down switch even if we're still preparing. 2001 // 2002 // Some streams have a high bandwidth index as default, 2003 // when bandwidth is low, it takes a long time to buffer 2004 // to ready mark, then it immediately pauses after start 2005 // as we have to do a down switch. It's better experience 2006 // to restart from a lower index, if we detect low bw. 2007 if (!switchBandwidthIfNeeded(false /* up */, down) && ready) { 2008 postPrepared(OK); 2009 } 2010 } 2011 2012 if (!mInPreparationPhase) { 2013 if (ready) { 2014 stopBufferingIfNecessary(); 2015 } else if (underflow) { 2016 startBufferingIfNecessary(); 2017 } 2018 switchBandwidthIfNeeded(up, down); 2019 } 2020 } 2021 2022 schedulePollBuffering(); 2023} 2024 2025void LiveSession::cancelBandwidthSwitch(bool resume) { 2026 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 2027 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 2028 if (!mSwitchInProgress) { 2029 return; 2030 } 2031 2032 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2033 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2034 if (info.mToBeRemoved) { 2035 info.mToBeRemoved = false; 2036 if (resume) { 2037 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2038 } 2039 } 2040 } 2041 2042 for (size_t i = 0; i < kMaxStreams; ++i) { 2043 AString newUri = mStreams[i].mNewUri; 2044 if (!newUri.empty()) { 2045 // clear all mNewUri matching this newUri 2046 for (size_t j = i; j < kMaxStreams; ++j) { 2047 if (mStreams[j].mNewUri == newUri) { 2048 mStreams[j].mNewUri.clear(); 2049 } 2050 } 2051 ALOGV("stopping newUri = %s", newUri.c_str()); 2052 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2053 if (index < 0) { 2054 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2055 continue; 2056 } 2057 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2058 info.mToBeRemoved = true; 2059 info.mFetcher->stopAsync(); 2060 } 2061 } 2062 2063 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2064 mOrigBandwidthIndex, mCurBandwidthIndex); 2065 2066 mSwitchGeneration++; 2067 mSwitchInProgress = false; 2068 mCurBandwidthIndex = mOrigBandwidthIndex; 2069 mSwapMask = 0; 2070} 2071 2072bool LiveSession::checkBuffering( 2073 bool &underflow, bool &ready, bool &down, bool &up) { 2074 underflow = ready = down = up = false; 2075 2076 if (mReconfigurationInProgress) { 2077 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2078 return false; 2079 } 2080 2081 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2082 activeCount = underflowCount = readyCount = downCount = upCount =0; 2083 int32_t minBufferPercent = -1; 2084 int64_t durationUs; 2085 if (getDuration(&durationUs) != OK) { 2086 durationUs = -1; 2087 } 2088 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2089 // we don't check subtitles for buffering level 2090 if (!(mStreamMask & mPacketSources.keyAt(i) 2091 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2092 continue; 2093 } 2094 // ignore streams that never had any packet queued. 2095 // (it's possible that the variant only has audio or video) 2096 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2097 if (meta == NULL) { 2098 continue; 2099 } 2100 2101 int64_t bufferedDurationUs = 2102 mPacketSources[i]->getEstimatedDurationUs(); 2103 ALOGV("[%s] buffered %lld us", 2104 getNameForStream(mPacketSources.keyAt(i)), 2105 (long long)bufferedDurationUs); 2106 if (durationUs >= 0) { 2107 int32_t percent; 2108 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2109 percent = 100; 2110 } else { 2111 percent = (int32_t)(100.0 * 2112 (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2113 } 2114 if (minBufferPercent < 0 || percent < minBufferPercent) { 2115 minBufferPercent = percent; 2116 } 2117 } 2118 2119 ++activeCount; 2120 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2121 if (bufferedDurationUs > readyMark 2122 || mPacketSources[i]->isFinished(0)) { 2123 ++readyCount; 2124 } 2125 if (!mPacketSources[i]->isFinished(0)) { 2126 if (bufferedDurationUs < kUnderflowMarkUs) { 2127 ++underflowCount; 2128 } 2129 if (bufferedDurationUs > mUpSwitchMark) { 2130 ++upCount; 2131 } 2132 if (bufferedDurationUs < mDownSwitchMark) { 2133 ++downCount; 2134 } 2135 } 2136 } 2137 2138 if (minBufferPercent >= 0) { 2139 notifyBufferingUpdate(minBufferPercent); 2140 } 2141 2142 if (activeCount > 0) { 2143 up = (upCount == activeCount); 2144 down = (downCount > 0); 2145 ready = (readyCount == activeCount); 2146 underflow = (underflowCount > 0); 2147 return true; 2148 } 2149 2150 return false; 2151} 2152 2153void LiveSession::startBufferingIfNecessary() { 2154 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2155 mInPreparationPhase, mBuffering); 2156 if (!mBuffering) { 2157 mBuffering = true; 2158 2159 sp<AMessage> notify = mNotify->dup(); 2160 notify->setInt32("what", kWhatBufferingStart); 2161 notify->post(); 2162 } 2163} 2164 2165void LiveSession::stopBufferingIfNecessary() { 2166 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2167 mInPreparationPhase, mBuffering); 2168 2169 if (mBuffering) { 2170 mBuffering = false; 2171 2172 sp<AMessage> notify = mNotify->dup(); 2173 notify->setInt32("what", kWhatBufferingEnd); 2174 notify->post(); 2175 } 2176} 2177 2178void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2179 if (percentage < mPrevBufferPercentage) { 2180 percentage = mPrevBufferPercentage; 2181 } else if (percentage > 100) { 2182 percentage = 100; 2183 } 2184 2185 mPrevBufferPercentage = percentage; 2186 2187 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2188 2189 sp<AMessage> notify = mNotify->dup(); 2190 notify->setInt32("what", kWhatBufferingUpdate); 2191 notify->setInt32("percentage", percentage); 2192 notify->post(); 2193} 2194 2195/* 2196 * returns true if a bandwidth switch is actually needed (and started), 2197 * returns false otherwise 2198 */ 2199bool LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2200 // no need to check bandwidth if we only have 1 bandwidth settings 2201 if (mSwitchInProgress || mBandwidthItems.size() < 2) { 2202 return false; 2203 } 2204 2205 int32_t bandwidthBps; 2206 if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) { 2207 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 2208 mLastBandwidthBps = bandwidthBps; 2209 } else { 2210 ALOGV("no bandwidth estimate."); 2211 return false; 2212 } 2213 2214 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2215 // canSwithDown and canSwitchUp can't both be true. 2216 // we only want to switch up when measured bw is 120% higher than current variant, 2217 // and we only want to switch down when measured bw is below current variant. 2218 bool canSwithDown = bufferLow 2219 && (bandwidthBps < (int32_t)curBandwidth); 2220 bool canSwitchUp = bufferHigh 2221 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2222 2223 if (canSwithDown || canSwitchUp) { 2224 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2225 2226 // it's possible that we're checking for canSwitchUp case, but the returned 2227 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2228 // of measured bw. In that case we don't want to do anything, since we have 2229 // both enough buffer and enough bw. 2230 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 2231 || (canSwithDown && bandwidthIndex < mCurBandwidthIndex)) { 2232 // if not yet prepared, just restart again with new bw index. 2233 // this is faster and playback experience is cleaner. 2234 changeConfiguration( 2235 mInPreparationPhase ? 0 : -1ll, bandwidthIndex); 2236 return true; 2237 } 2238 } 2239 return false; 2240} 2241 2242void LiveSession::postError(status_t err) { 2243 // if we reached EOS, notify buffering of 100% 2244 if (err == ERROR_END_OF_STREAM) { 2245 notifyBufferingUpdate(100); 2246 } 2247 // we'll stop buffer polling now, before that notify 2248 // stop buffering to stop the spinning icon 2249 stopBufferingIfNecessary(); 2250 cancelPollBuffering(); 2251 2252 sp<AMessage> notify = mNotify->dup(); 2253 notify->setInt32("what", kWhatError); 2254 notify->setInt32("err", err); 2255 notify->post(); 2256} 2257 2258void LiveSession::postPrepared(status_t err) { 2259 CHECK(mInPreparationPhase); 2260 2261 sp<AMessage> notify = mNotify->dup(); 2262 if (err == OK || err == ERROR_END_OF_STREAM) { 2263 notify->setInt32("what", kWhatPrepared); 2264 } else { 2265 cancelPollBuffering(); 2266 2267 notify->setInt32("what", kWhatPreparationFailed); 2268 notify->setInt32("err", err); 2269 } 2270 2271 notify->post(); 2272 2273 mInPreparationPhase = false; 2274} 2275 2276 2277} // namespace android 2278 2279