LiveSession.cpp revision a1151185c7eb3b4c483f7067deba1775fd0a2510
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/foundation/AUtils.h> 37#include <media/stagefright/DataSource.h> 38#include <media/stagefright/FileSource.h> 39#include <media/stagefright/MediaErrors.h> 40#include <media/stagefright/MediaHTTP.h> 41#include <media/stagefright/MetaData.h> 42#include <media/stagefright/Utils.h> 43 44#include <utils/Mutex.h> 45 46#include <ctype.h> 47#include <inttypes.h> 48#include <openssl/aes.h> 49#include <openssl/md5.h> 50 51namespace android { 52 53// static 54// Bandwidth Switch Mark Defaults 55const int64_t LiveSession::kUpSwitchMarkUs = 15000000ll; 56const int64_t LiveSession::kDownSwitchMarkUs = 20000000ll; 57const int64_t LiveSession::kUpSwitchMarginUs = 5000000ll; 58const int64_t LiveSession::kResumeThresholdUs = 100000ll; 59 60// Buffer Prepare/Ready/Underflow Marks 61const int64_t LiveSession::kReadyMarkUs = 5000000ll; 62const int64_t LiveSession::kPrepareMarkUs = 1500000ll; 63const int64_t LiveSession::kUnderflowMarkUs = 1000000ll; 64 65struct LiveSession::BandwidthEstimator : public RefBase { 66 BandwidthEstimator(); 67 68 void addBandwidthMeasurement(size_t numBytes, int64_t delayUs); 69 bool estimateBandwidth(int32_t *bandwidth); 70 71private: 72 // Bandwidth estimation parameters 73 static const int32_t kMaxBandwidthHistoryItems = 20; 74 static const int64_t kMaxBandwidthHistoryWindowUs = 5000000ll; // 5 sec 75 76 struct BandwidthEntry { 77 int64_t mDelayUs; 78 size_t mNumBytes; 79 }; 80 81 Mutex mLock; 82 List<BandwidthEntry> mBandwidthHistory; 83 int64_t mTotalTransferTimeUs; 84 size_t mTotalTransferBytes; 85 86 DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator); 87}; 88 89LiveSession::BandwidthEstimator::BandwidthEstimator() : 90 mTotalTransferTimeUs(0), 91 mTotalTransferBytes(0) { 92} 93 94void LiveSession::BandwidthEstimator::addBandwidthMeasurement( 95 size_t numBytes, int64_t delayUs) { 96 AutoMutex autoLock(mLock); 97 98 BandwidthEntry entry; 99 entry.mDelayUs = delayUs; 100 entry.mNumBytes = numBytes; 101 mTotalTransferTimeUs += delayUs; 102 mTotalTransferBytes += numBytes; 103 mBandwidthHistory.push_back(entry); 104 105 // trim old samples, keeping at least kMaxBandwidthHistoryItems samples, 106 // and total transfer time at least kMaxBandwidthHistoryWindowUs. 107 while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) { 108 List<BandwidthEntry>::iterator it = mBandwidthHistory.begin(); 109 if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) { 110 break; 111 } 112 mTotalTransferTimeUs -= it->mDelayUs; 113 mTotalTransferBytes -= it->mNumBytes; 114 mBandwidthHistory.erase(mBandwidthHistory.begin()); 115 } 116} 117 118bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) { 119 AutoMutex autoLock(mLock); 120 121 if (mBandwidthHistory.size() < 2) { 122 return false; 123 } 124 125 *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs); 126 return true; 127} 128 129//static 130const char *LiveSession::getKeyForStream(StreamType type) { 131 switch (type) { 132 case STREAMTYPE_VIDEO: 133 return "timeUsVideo"; 134 case STREAMTYPE_AUDIO: 135 return "timeUsAudio"; 136 case STREAMTYPE_SUBTITLES: 137 return "timeUsSubtitle"; 138 default: 139 TRESPASS(); 140 } 141 return NULL; 142} 143 144//static 145const char *LiveSession::getNameForStream(StreamType type) { 146 switch (type) { 147 case STREAMTYPE_VIDEO: 148 return "video"; 149 case STREAMTYPE_AUDIO: 150 return "audio"; 151 case STREAMTYPE_SUBTITLES: 152 return "subs"; 153 default: 154 break; 155 } 156 return "unknown"; 157} 158 159LiveSession::LiveSession( 160 const sp<AMessage> ¬ify, uint32_t flags, 161 const sp<IMediaHTTPService> &httpService) 162 : mNotify(notify), 163 mFlags(flags), 164 mHTTPService(httpService), 165 mBuffering(false), 166 mInPreparationPhase(true), 167 mPollBufferingGeneration(0), 168 mPrevBufferPercentage(-1), 169 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 170 mCurBandwidthIndex(-1), 171 mOrigBandwidthIndex(-1), 172 mLastBandwidthBps(-1ll), 173 mBandwidthEstimator(new BandwidthEstimator()), 174 mStreamMask(0), 175 mNewStreamMask(0), 176 mSwapMask(0), 177 mSwitchGeneration(0), 178 mSubtitleGeneration(0), 179 mLastDequeuedTimeUs(0ll), 180 mRealTimeBaseUs(0ll), 181 mReconfigurationInProgress(false), 182 mSwitchInProgress(false), 183 mUpSwitchMark(kUpSwitchMarkUs), 184 mDownSwitchMark(kDownSwitchMarkUs), 185 mUpSwitchMargin(kUpSwitchMarginUs), 186 mFirstTimeUsValid(false), 187 mFirstTimeUs(0), 188 mLastSeekTimeUs(0) { 189 mStreams[kAudioIndex] = StreamItem("audio"); 190 mStreams[kVideoIndex] = StreamItem("video"); 191 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 192 193 for (size_t i = 0; i < kMaxStreams; ++i) { 194 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 195 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 196 } 197} 198 199LiveSession::~LiveSession() { 200 if (mFetcherLooper != NULL) { 201 mFetcherLooper->stop(); 202 } 203} 204 205status_t LiveSession::dequeueAccessUnit( 206 StreamType stream, sp<ABuffer> *accessUnit) { 207 status_t finalResult = OK; 208 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 209 210 ssize_t streamIdx = typeToIndex(stream); 211 if (streamIdx < 0) { 212 return INVALID_VALUE; 213 } 214 const char *streamStr = getNameForStream(stream); 215 // Do not let client pull data if we don't have data packets yet. 216 // We might only have a format discontinuity queued without data. 217 // When NuPlayerDecoder dequeues the format discontinuity, it will 218 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 219 // thinks it can do seamless change, so will not shutdown decoder. 220 // When the actual format arrives, it can't handle it and get stuck. 221 if (!packetSource->hasDataBufferAvailable(&finalResult)) { 222 ALOGV("[%s] dequeueAccessUnit: no buffer available (finalResult=%d)", 223 streamStr, finalResult); 224 225 if (finalResult == OK) { 226 return -EAGAIN; 227 } else { 228 return finalResult; 229 } 230 } 231 232 // Let the client dequeue as long as we have buffers available 233 // Do not make pause/resume decisions here. 234 235 status_t err = packetSource->dequeueAccessUnit(accessUnit); 236 237 StreamItem& strm = mStreams[streamIdx]; 238 if (err == INFO_DISCONTINUITY) { 239 // adaptive streaming, discontinuities in the playlist 240 int32_t type; 241 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 242 243 sp<AMessage> extra; 244 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 245 extra.clear(); 246 } 247 248 ALOGI("[%s] read discontinuity of type %d, extra = %s", 249 streamStr, 250 type, 251 extra == NULL ? "NULL" : extra->debugString().c_str()); 252 } else if (err == OK) { 253 254 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 255 int64_t timeUs, originalTimeUs; 256 int32_t discontinuitySeq = 0; 257 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 258 originalTimeUs = timeUs; 259 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 260 if (discontinuitySeq > (int32_t) strm.mCurDiscontinuitySeq) { 261 int64_t offsetTimeUs; 262 if (mDiscontinuityOffsetTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 263 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(strm.mCurDiscontinuitySeq); 264 } else { 265 offsetTimeUs = 0; 266 } 267 268 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 269 int64_t firstTimeUs; 270 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 271 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 272 offsetTimeUs += strm.mLastSampleDurationUs; 273 } else { 274 offsetTimeUs += strm.mLastSampleDurationUs; 275 } 276 277 mDiscontinuityOffsetTimesUs.add(discontinuitySeq, offsetTimeUs); 278 strm.mCurDiscontinuitySeq = discontinuitySeq; 279 } 280 281 int32_t discard = 0; 282 int64_t firstTimeUs; 283 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 284 int64_t durUs; // approximate sample duration 285 if (timeUs > strm.mLastDequeuedTimeUs) { 286 durUs = timeUs - strm.mLastDequeuedTimeUs; 287 } else { 288 durUs = strm.mLastDequeuedTimeUs - timeUs; 289 } 290 strm.mLastSampleDurationUs = durUs; 291 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 292 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 293 firstTimeUs = timeUs; 294 } else { 295 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 296 firstTimeUs = timeUs; 297 } 298 299 strm.mLastDequeuedTimeUs = timeUs; 300 if (timeUs >= firstTimeUs) { 301 timeUs -= firstTimeUs; 302 } else { 303 timeUs = 0; 304 } 305 timeUs += mLastSeekTimeUs; 306 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 307 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 308 } 309 310 ALOGV("[%s] dequeueAccessUnit: time %lld us, original %lld us", 311 streamStr, (long long)timeUs, (long long)originalTimeUs); 312 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 313 mLastDequeuedTimeUs = timeUs; 314 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 315 } else if (stream == STREAMTYPE_SUBTITLES) { 316 int32_t subtitleGeneration; 317 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 318 && subtitleGeneration != mSubtitleGeneration) { 319 return -EAGAIN; 320 }; 321 (*accessUnit)->meta()->setInt32( 322 "trackIndex", mPlaylist->getSelectedIndex()); 323 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 324 } 325 } else { 326 ALOGI("[%s] encountered error %d", streamStr, err); 327 } 328 329 return err; 330} 331 332status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 333 if (!(mStreamMask & stream)) { 334 return UNKNOWN_ERROR; 335 } 336 337 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 338 339 sp<MetaData> meta = packetSource->getFormat(); 340 341 if (meta == NULL) { 342 return -EAGAIN; 343 } 344 345 if (stream == STREAMTYPE_AUDIO) { 346 // set AAC input buffer size to 32K bytes (256kbps x 1sec) 347 meta->setInt32(kKeyMaxInputSize, 32 * 1024); 348 } 349 350 return convertMetaDataToMessage(meta, format); 351} 352 353sp<HTTPBase> LiveSession::getHTTPDataSource() { 354 return new MediaHTTP(mHTTPService->makeHTTPConnection()); 355} 356 357void LiveSession::connectAsync( 358 const char *url, const KeyedVector<String8, String8> *headers) { 359 sp<AMessage> msg = new AMessage(kWhatConnect, this); 360 msg->setString("url", url); 361 362 if (headers != NULL) { 363 msg->setPointer( 364 "headers", 365 new KeyedVector<String8, String8>(*headers)); 366 } 367 368 msg->post(); 369} 370 371status_t LiveSession::disconnect() { 372 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 373 374 sp<AMessage> response; 375 status_t err = msg->postAndAwaitResponse(&response); 376 377 return err; 378} 379 380status_t LiveSession::seekTo(int64_t timeUs) { 381 sp<AMessage> msg = new AMessage(kWhatSeek, this); 382 msg->setInt64("timeUs", timeUs); 383 384 sp<AMessage> response; 385 status_t err = msg->postAndAwaitResponse(&response); 386 387 return err; 388} 389 390bool LiveSession::checkSwitchProgress( 391 sp<AMessage> &stopParams, int64_t delayUs, bool *needResumeUntil) { 392 AString newUri; 393 CHECK(stopParams->findString("uri", &newUri)); 394 395 *needResumeUntil = false; 396 sp<AMessage> firstNewMeta[kMaxStreams]; 397 for (size_t i = 0; i < kMaxStreams; ++i) { 398 StreamType stream = indexToType(i); 399 if (!(mSwapMask & mNewStreamMask & stream) 400 || (mStreams[i].mNewUri != newUri)) { 401 continue; 402 } 403 if (stream == STREAMTYPE_SUBTITLES) { 404 continue; 405 } 406 sp<AnotherPacketSource> &source = mPacketSources.editValueAt(i); 407 408 // First, get latest dequeued meta, which is where the decoder is at. 409 // (when upswitching, we take the meta after a certain delay, so that 410 // the decoder is left with some cushion) 411 sp<AMessage> lastDequeueMeta, lastEnqueueMeta; 412 if (delayUs > 0) { 413 lastDequeueMeta = source->getMetaAfterLastDequeued(delayUs); 414 if (lastDequeueMeta == NULL) { 415 // this means we don't have enough cushion, try again later 416 ALOGV("[%s] up switching failed due to insufficient buffer", 417 getNameForStream(stream)); 418 return false; 419 } 420 } else { 421 // It's okay for lastDequeueMeta to be NULL here, it means the 422 // decoder hasn't even started dequeueing 423 lastDequeueMeta = source->getLatestDequeuedMeta(); 424 } 425 // Then, trim off packets at beginning of mPacketSources2 that's before 426 // the latest dequeued time. These samples are definitely too late. 427 firstNewMeta[i] = mPacketSources2.editValueAt(i) 428 ->trimBuffersBeforeMeta(lastDequeueMeta); 429 430 // Now firstNewMeta[i] is the first sample after the trim. 431 // If it's NULL, we failed because dequeue already past all samples 432 // in mPacketSource2, we have to try again. 433 if (firstNewMeta[i] == NULL) { 434 HLSTime dequeueTime(lastDequeueMeta); 435 ALOGV("[%s] dequeue time (%d, %lld) past start time", 436 getNameForStream(stream), 437 dequeueTime.mSeq, (long long) dequeueTime.mTimeUs); 438 return false; 439 } 440 441 // Otherwise, we check if mPacketSources2 overlaps with what old fetcher 442 // already fetched, and see if we need to resumeUntil 443 lastEnqueueMeta = source->getLatestEnqueuedMeta(); 444 // lastEnqueueMeta == NULL means old fetcher stopped at a discontinuity 445 // boundary, no need to resume as the content will look different anyways 446 if (lastEnqueueMeta != NULL) { 447 HLSTime lastTime(lastEnqueueMeta), startTime(firstNewMeta[i]); 448 449 // no need to resume old fetcher if new fetcher started in different 450 // discontinuity sequence, as the content will look different. 451 *needResumeUntil |= (startTime.mSeq == lastTime.mSeq 452 && startTime.mTimeUs - lastTime.mTimeUs > kResumeThresholdUs); 453 454 // update the stopTime for resumeUntil 455 stopParams->setInt32("discontinuitySeq", startTime.mSeq); 456 stopParams->setInt64(getKeyForStream(stream), startTime.mTimeUs); 457 } 458 } 459 460 // if we're here, it means dequeue progress hasn't passed some samples in 461 // mPacketSource2, we can trim off the excess in mPacketSource. 462 // (old fetcher might still need to resumeUntil the start time of new fetcher) 463 for (size_t i = 0; i < kMaxStreams; ++i) { 464 StreamType stream = indexToType(i); 465 if (!(mSwapMask & mNewStreamMask & stream) 466 || (newUri != mStreams[i].mNewUri) 467 || stream == STREAMTYPE_SUBTITLES) { 468 continue; 469 } 470 mPacketSources.valueFor(stream)->trimBuffersAfterMeta(firstNewMeta[i]); 471 } 472 473 // no resumeUntil if already underflow 474 *needResumeUntil &= !mBuffering; 475 476 return true; 477} 478 479void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 480 switch (msg->what()) { 481 case kWhatConnect: 482 { 483 onConnect(msg); 484 break; 485 } 486 487 case kWhatDisconnect: 488 { 489 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 490 491 if (mReconfigurationInProgress) { 492 break; 493 } 494 495 finishDisconnect(); 496 break; 497 } 498 499 case kWhatSeek: 500 { 501 if (mReconfigurationInProgress) { 502 msg->post(50000); 503 break; 504 } 505 506 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 507 mSeekReply = new AMessage; 508 509 onSeek(msg); 510 break; 511 } 512 513 case kWhatFetcherNotify: 514 { 515 int32_t what; 516 CHECK(msg->findInt32("what", &what)); 517 518 switch (what) { 519 case PlaylistFetcher::kWhatStarted: 520 break; 521 case PlaylistFetcher::kWhatPaused: 522 case PlaylistFetcher::kWhatStopped: 523 { 524 AString uri; 525 CHECK(msg->findString("uri", &uri)); 526 ssize_t index = mFetcherInfos.indexOfKey(uri); 527 if (index < 0) { 528 // ignore msgs from fetchers that's already gone 529 break; 530 } 531 532 ALOGV("fetcher-%d %s", 533 mFetcherInfos[index].mFetcher->getFetcherID(), 534 what == PlaylistFetcher::kWhatPaused ? 535 "paused" : "stopped"); 536 537 if (what == PlaylistFetcher::kWhatStopped) { 538 mFetcherLooper->unregisterHandler( 539 mFetcherInfos[index].mFetcher->id()); 540 mFetcherInfos.removeItemsAt(index); 541 } else if (what == PlaylistFetcher::kWhatPaused) { 542 int32_t seekMode; 543 CHECK(msg->findInt32("seekMode", &seekMode)); 544 for (size_t i = 0; i < kMaxStreams; ++i) { 545 if (mStreams[i].mUri == uri) { 546 mStreams[i].mSeekMode = (SeekMode) seekMode; 547 } 548 } 549 } 550 551 if (mContinuation != NULL) { 552 CHECK_GT(mContinuationCounter, 0); 553 if (--mContinuationCounter == 0) { 554 mContinuation->post(); 555 } 556 ALOGV("%zu fetcher(s) left", mContinuationCounter); 557 } 558 break; 559 } 560 561 case PlaylistFetcher::kWhatDurationUpdate: 562 { 563 AString uri; 564 CHECK(msg->findString("uri", &uri)); 565 566 int64_t durationUs; 567 CHECK(msg->findInt64("durationUs", &durationUs)); 568 569 ssize_t index = mFetcherInfos.indexOfKey(uri); 570 if (index >= 0) { 571 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 572 info->mDurationUs = durationUs; 573 } 574 break; 575 } 576 577 case PlaylistFetcher::kWhatTargetDurationUpdate: 578 { 579 int64_t targetDurationUs; 580 CHECK(msg->findInt64("targetDurationUs", &targetDurationUs)); 581 mUpSwitchMark = min(kUpSwitchMarkUs, targetDurationUs * 7 / 4); 582 mDownSwitchMark = min(kDownSwitchMarkUs, targetDurationUs * 9 / 4); 583 mUpSwitchMargin = min(kUpSwitchMarginUs, targetDurationUs); 584 break; 585 } 586 587 case PlaylistFetcher::kWhatError: 588 { 589 status_t err; 590 CHECK(msg->findInt32("err", &err)); 591 592 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 593 594 // handle EOS on subtitle tracks independently 595 AString uri; 596 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 597 ssize_t i = mFetcherInfos.indexOfKey(uri); 598 if (i >= 0) { 599 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 600 if (fetcher != NULL) { 601 uint32_t type = fetcher->getStreamTypeMask(); 602 if (type == STREAMTYPE_SUBTITLES) { 603 mPacketSources.valueFor( 604 STREAMTYPE_SUBTITLES)->signalEOS(err);; 605 break; 606 } 607 } 608 } 609 } 610 611 if (mInPreparationPhase) { 612 postPrepared(err); 613 } 614 615 cancelBandwidthSwitch(); 616 617 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 618 619 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 620 621 mPacketSources.valueFor( 622 STREAMTYPE_SUBTITLES)->signalEOS(err); 623 624 postError(err); 625 break; 626 } 627 628 case PlaylistFetcher::kWhatStopReached: 629 { 630 ALOGV("kWhatStopReached"); 631 632 AString oldUri; 633 CHECK(msg->findString("uri", &oldUri)); 634 635 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 636 if (index < 0) { 637 break; 638 } 639 640 tryToFinishBandwidthSwitch(oldUri); 641 break; 642 } 643 644 case PlaylistFetcher::kWhatStartedAt: 645 { 646 int32_t switchGeneration; 647 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 648 649 ALOGV("kWhatStartedAt: switchGen=%d, mSwitchGen=%d", 650 switchGeneration, mSwitchGeneration); 651 652 if (switchGeneration != mSwitchGeneration) { 653 break; 654 } 655 656 AString uri; 657 CHECK(msg->findString("uri", &uri)); 658 659 // mark new fetcher mToBeResumed 660 ssize_t index = mFetcherInfos.indexOfKey(uri); 661 if (index >= 0) { 662 mFetcherInfos.editValueAt(index).mToBeResumed = true; 663 } 664 665 // temporarily disable packet sources to be swapped to prevent 666 // NuPlayerDecoder from dequeuing while we check progress 667 for (size_t i = 0; i < mPacketSources.size(); ++i) { 668 if ((mSwapMask & mPacketSources.keyAt(i)) 669 && uri == mStreams[i].mNewUri) { 670 mPacketSources.editValueAt(i)->enable(false); 671 } 672 } 673 bool switchUp = (mCurBandwidthIndex > mOrigBandwidthIndex); 674 // If switching up, require a cushion bigger than kUnderflowMark 675 // to avoid buffering immediately after the switch. 676 // (If we don't have that cushion we'd rather cancel and try again.) 677 int64_t delayUs = switchUp ? (kUnderflowMarkUs + 1000000ll) : 0; 678 bool needResumeUntil = false; 679 sp<AMessage> stopParams = msg; 680 if (checkSwitchProgress(stopParams, delayUs, &needResumeUntil)) { 681 // playback time hasn't passed startAt time 682 if (!needResumeUntil) { 683 ALOGV("finish switch"); 684 for (size_t i = 0; i < kMaxStreams; ++i) { 685 if ((mSwapMask & indexToType(i)) 686 && uri == mStreams[i].mNewUri) { 687 // have to make a copy of mStreams[i].mUri because 688 // tryToFinishBandwidthSwitch is modifying mStreams[] 689 AString oldURI = mStreams[i].mUri; 690 tryToFinishBandwidthSwitch(oldURI); 691 break; 692 } 693 } 694 } else { 695 // startAt time is after last enqueue time 696 // Resume fetcher for the original variant; the resumed fetcher should 697 // continue until the timestamps found in msg, which is stored by the 698 // new fetcher to indicate where the new variant has started buffering. 699 ALOGV("finish switch with resumeUntilAsync"); 700 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 701 const FetcherInfo &info = mFetcherInfos.valueAt(i); 702 if (info.mToBeRemoved) { 703 info.mFetcher->resumeUntilAsync(stopParams); 704 } 705 } 706 } 707 } else { 708 // playback time passed startAt time 709 if (switchUp) { 710 // if switching up, cancel and retry if condition satisfies again 711 ALOGV("cancel up switch because we're too late"); 712 cancelBandwidthSwitch(true /* resume */); 713 } else { 714 ALOGV("retry down switch at next sample"); 715 resumeFetcher(uri, mSwapMask, -1, true /* newUri */); 716 } 717 } 718 // re-enable all packet sources 719 for (size_t i = 0; i < mPacketSources.size(); ++i) { 720 mPacketSources.editValueAt(i)->enable(true); 721 } 722 723 break; 724 } 725 726 default: 727 TRESPASS(); 728 } 729 730 break; 731 } 732 733 case kWhatChangeConfiguration: 734 { 735 onChangeConfiguration(msg); 736 break; 737 } 738 739 case kWhatChangeConfiguration2: 740 { 741 onChangeConfiguration2(msg); 742 break; 743 } 744 745 case kWhatChangeConfiguration3: 746 { 747 onChangeConfiguration3(msg); 748 break; 749 } 750 751 case kWhatFinishDisconnect2: 752 { 753 onFinishDisconnect2(); 754 break; 755 } 756 757 case kWhatPollBuffering: 758 { 759 int32_t generation; 760 CHECK(msg->findInt32("generation", &generation)); 761 if (generation == mPollBufferingGeneration) { 762 onPollBuffering(); 763 } 764 break; 765 } 766 767 default: 768 TRESPASS(); 769 break; 770 } 771} 772 773// static 774int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 775 if (a->mBandwidth < b->mBandwidth) { 776 return -1; 777 } else if (a->mBandwidth == b->mBandwidth) { 778 return 0; 779 } 780 781 return 1; 782} 783 784// static 785LiveSession::StreamType LiveSession::indexToType(int idx) { 786 CHECK(idx >= 0 && idx < kMaxStreams); 787 return (StreamType)(1 << idx); 788} 789 790// static 791ssize_t LiveSession::typeToIndex(int32_t type) { 792 switch (type) { 793 case STREAMTYPE_AUDIO: 794 return 0; 795 case STREAMTYPE_VIDEO: 796 return 1; 797 case STREAMTYPE_SUBTITLES: 798 return 2; 799 default: 800 return -1; 801 }; 802 return -1; 803} 804 805void LiveSession::onConnect(const sp<AMessage> &msg) { 806 AString url; 807 CHECK(msg->findString("url", &url)); 808 809 KeyedVector<String8, String8> *headers = NULL; 810 if (!msg->findPointer("headers", (void **)&headers)) { 811 mExtraHeaders.clear(); 812 } else { 813 mExtraHeaders = *headers; 814 815 delete headers; 816 headers = NULL; 817 } 818 819 // TODO currently we don't know if we are coming here from incognito mode 820 ALOGI("onConnect %s", uriDebugString(url).c_str()); 821 822 mMasterURL = url; 823 824 bool dummy; 825 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 826 827 if (mPlaylist == NULL) { 828 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 829 830 postPrepared(ERROR_IO); 831 return; 832 } 833 834 // create looper for fetchers 835 if (mFetcherLooper == NULL) { 836 mFetcherLooper = new ALooper(); 837 838 mFetcherLooper->setName("Fetcher"); 839 mFetcherLooper->start(false, false); 840 } 841 842 // We trust the content provider to make a reasonable choice of preferred 843 // initial bandwidth by listing it first in the variant playlist. 844 // At startup we really don't have a good estimate on the available 845 // network bandwidth since we haven't tranferred any data yet. Once 846 // we have we can make a better informed choice. 847 size_t initialBandwidth = 0; 848 size_t initialBandwidthIndex = 0; 849 850 if (mPlaylist->isVariantPlaylist()) { 851 Vector<BandwidthItem> itemsWithVideo; 852 for (size_t i = 0; i < mPlaylist->size(); ++i) { 853 BandwidthItem item; 854 855 item.mPlaylistIndex = i; 856 857 sp<AMessage> meta; 858 AString uri; 859 mPlaylist->itemAt(i, &uri, &meta); 860 861 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 862 863 mBandwidthItems.push(item); 864 if (mPlaylist->hasType(i, "video")) { 865 itemsWithVideo.push(item); 866 } 867 } 868 // remove the audio-only variants if we have at least one with video 869 if (!itemsWithVideo.empty() 870 && itemsWithVideo.size() < mBandwidthItems.size()) { 871 mBandwidthItems.clear(); 872 for (size_t i = 0; i < itemsWithVideo.size(); ++i) { 873 mBandwidthItems.push(itemsWithVideo[i]); 874 } 875 } 876 877 CHECK_GT(mBandwidthItems.size(), 0u); 878 initialBandwidth = mBandwidthItems[0].mBandwidth; 879 880 mBandwidthItems.sort(SortByBandwidth); 881 882 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 883 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 884 initialBandwidthIndex = i; 885 break; 886 } 887 } 888 } else { 889 // dummy item. 890 BandwidthItem item; 891 item.mPlaylistIndex = 0; 892 item.mBandwidth = 0; 893 mBandwidthItems.push(item); 894 } 895 896 mPlaylist->pickRandomMediaItems(); 897 changeConfiguration( 898 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 899} 900 901void LiveSession::finishDisconnect() { 902 ALOGV("finishDisconnect"); 903 904 // No reconfiguration is currently pending, make sure none will trigger 905 // during disconnection either. 906 cancelBandwidthSwitch(); 907 908 // cancel buffer polling 909 cancelPollBuffering(); 910 911 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 912 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 913 } 914 915 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); 916 917 mContinuationCounter = mFetcherInfos.size(); 918 mContinuation = msg; 919 920 if (mContinuationCounter == 0) { 921 msg->post(); 922 } 923} 924 925void LiveSession::onFinishDisconnect2() { 926 mContinuation.clear(); 927 928 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 929 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 930 931 mPacketSources.valueFor( 932 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 933 934 sp<AMessage> response = new AMessage; 935 response->setInt32("err", OK); 936 937 response->postReply(mDisconnectReplyID); 938 mDisconnectReplyID.clear(); 939} 940 941sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 942 ssize_t index = mFetcherInfos.indexOfKey(uri); 943 944 if (index >= 0) { 945 return NULL; 946 } 947 948 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 949 notify->setString("uri", uri); 950 notify->setInt32("switchGeneration", mSwitchGeneration); 951 952 FetcherInfo info; 953 info.mFetcher = new PlaylistFetcher( 954 notify, this, uri, mCurBandwidthIndex, mSubtitleGeneration); 955 info.mDurationUs = -1ll; 956 info.mToBeRemoved = false; 957 info.mToBeResumed = false; 958 mFetcherLooper->registerHandler(info.mFetcher); 959 960 mFetcherInfos.add(uri, info); 961 962 return info.mFetcher; 963} 964 965/* 966 * Illustration of parameters: 967 * 968 * 0 `range_offset` 969 * +------------+-------------------------------------------------------+--+--+ 970 * | | | next block to fetch | | | 971 * | | `source` handle => `out` buffer | | | | 972 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 973 * | |<----------- `range_length` / buffer capacity ----------->| | 974 * |<------------------------------ file_size ------------------------------->| 975 * 976 * Special parameter values: 977 * - range_length == -1 means entire file 978 * - block_size == 0 means entire range 979 * 980 */ 981ssize_t LiveSession::fetchFile( 982 const char *url, sp<ABuffer> *out, 983 int64_t range_offset, int64_t range_length, 984 uint32_t block_size, /* download block size */ 985 sp<DataSource> *source, /* to return and reuse source */ 986 String8 *actualUrl, 987 bool forceConnectHTTP /* force connect HTTP when resuing source */) { 988 off64_t size; 989 sp<DataSource> temp_source; 990 if (source == NULL) { 991 source = &temp_source; 992 } 993 994 if (*source == NULL || forceConnectHTTP) { 995 if (!strncasecmp(url, "file://", 7)) { 996 *source = new FileSource(url + 7); 997 } else if (strncasecmp(url, "http://", 7) 998 && strncasecmp(url, "https://", 8)) { 999 return ERROR_UNSUPPORTED; 1000 } else { 1001 KeyedVector<String8, String8> headers = mExtraHeaders; 1002 if (range_offset > 0 || range_length >= 0) { 1003 headers.add( 1004 String8("Range"), 1005 String8( 1006 AStringPrintf( 1007 "bytes=%lld-%s", 1008 range_offset, 1009 range_length < 0 1010 ? "" : AStringPrintf("%lld", 1011 range_offset + range_length - 1).c_str()).c_str())); 1012 } 1013 1014 HTTPBase* httpDataSource = 1015 (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get(); 1016 status_t err = httpDataSource->connect(url, &headers); 1017 1018 if (err != OK) { 1019 return err; 1020 } 1021 1022 if (*source == NULL) { 1023 *source = mHTTPDataSource; 1024 } 1025 } 1026 } 1027 1028 status_t getSizeErr = (*source)->getSize(&size); 1029 if (getSizeErr != OK) { 1030 size = 65536; 1031 } 1032 1033 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 1034 if (*out == NULL) { 1035 buffer->setRange(0, 0); 1036 } 1037 1038 ssize_t bytesRead = 0; 1039 // adjust range_length if only reading partial block 1040 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 1041 range_length = buffer->size() + block_size; 1042 } 1043 for (;;) { 1044 // Only resize when we don't know the size. 1045 size_t bufferRemaining = buffer->capacity() - buffer->size(); 1046 if (bufferRemaining == 0 && getSizeErr != OK) { 1047 size_t bufferIncrement = buffer->size() / 2; 1048 if (bufferIncrement < 32768) { 1049 bufferIncrement = 32768; 1050 } 1051 bufferRemaining = bufferIncrement; 1052 1053 ALOGV("increasing download buffer to %zu bytes", 1054 buffer->size() + bufferRemaining); 1055 1056 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 1057 memcpy(copy->data(), buffer->data(), buffer->size()); 1058 copy->setRange(0, buffer->size()); 1059 1060 buffer = copy; 1061 } 1062 1063 size_t maxBytesToRead = bufferRemaining; 1064 if (range_length >= 0) { 1065 int64_t bytesLeftInRange = range_length - buffer->size(); 1066 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 1067 maxBytesToRead = bytesLeftInRange; 1068 1069 if (bytesLeftInRange == 0) { 1070 break; 1071 } 1072 } 1073 } 1074 1075 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 1076 // to help us break out of the loop. 1077 ssize_t n = (*source)->readAt( 1078 buffer->size(), buffer->data() + buffer->size(), 1079 maxBytesToRead); 1080 1081 if (n < 0) { 1082 return n; 1083 } 1084 1085 if (n == 0) { 1086 break; 1087 } 1088 1089 buffer->setRange(0, buffer->size() + (size_t)n); 1090 bytesRead += n; 1091 } 1092 1093 *out = buffer; 1094 if (actualUrl != NULL) { 1095 *actualUrl = (*source)->getUri(); 1096 if (actualUrl->isEmpty()) { 1097 *actualUrl = url; 1098 } 1099 } 1100 1101 return bytesRead; 1102} 1103 1104sp<M3UParser> LiveSession::fetchPlaylist( 1105 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 1106 ALOGV("fetchPlaylist '%s'", url); 1107 1108 *unchanged = false; 1109 1110 sp<ABuffer> buffer; 1111 String8 actualUrl; 1112 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 1113 1114 // close off the connection after use 1115 mHTTPDataSource->disconnect(); 1116 1117 if (err <= 0) { 1118 return NULL; 1119 } 1120 1121 // MD5 functionality is not available on the simulator, treat all 1122 // playlists as changed. 1123 1124#if defined(HAVE_ANDROID_OS) 1125 uint8_t hash[16]; 1126 1127 MD5_CTX m; 1128 MD5_Init(&m); 1129 MD5_Update(&m, buffer->data(), buffer->size()); 1130 1131 MD5_Final(hash, &m); 1132 1133 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 1134 // playlist unchanged 1135 *unchanged = true; 1136 1137 return NULL; 1138 } 1139 1140 if (curPlaylistHash != NULL) { 1141 memcpy(curPlaylistHash, hash, sizeof(hash)); 1142 } 1143#endif 1144 1145 sp<M3UParser> playlist = 1146 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 1147 1148 if (playlist->initCheck() != OK) { 1149 ALOGE("failed to parse .m3u8 playlist"); 1150 1151 return NULL; 1152 } 1153 1154 return playlist; 1155} 1156 1157#if 0 1158static double uniformRand() { 1159 return (double)rand() / RAND_MAX; 1160} 1161#endif 1162 1163bool LiveSession::resumeFetcher( 1164 const AString &uri, uint32_t streamMask, int64_t timeUs, bool newUri) { 1165 ssize_t index = mFetcherInfos.indexOfKey(uri); 1166 if (index < 0) { 1167 ALOGE("did not find fetcher for uri: %s", uri.c_str()); 1168 return false; 1169 } 1170 1171 bool resume = false; 1172 sp<AnotherPacketSource> sources[kMaxStreams]; 1173 for (size_t i = 0; i < kMaxStreams; ++i) { 1174 if ((streamMask & indexToType(i)) 1175 && ((!newUri && uri == mStreams[i].mUri) 1176 || (newUri && uri == mStreams[i].mNewUri))) { 1177 resume = true; 1178 if (newUri) { 1179 sources[i] = mPacketSources2.valueFor(indexToType(i)); 1180 sources[i]->clear(); 1181 } else { 1182 sources[i] = mPacketSources.valueFor(indexToType(i)); 1183 } 1184 } 1185 } 1186 1187 if (resume) { 1188 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(index).mFetcher; 1189 SeekMode seekMode = newUri ? kSeekModeNextSample : kSeekModeExactPosition; 1190 1191 ALOGV("resuming fetcher-%d, timeUs=%lld, seekMode=%d", 1192 fetcher->getFetcherID(), (long long)timeUs, seekMode); 1193 1194 fetcher->startAsync( 1195 sources[kAudioIndex], 1196 sources[kVideoIndex], 1197 sources[kSubtitleIndex], 1198 timeUs, -1, -1, seekMode); 1199 } 1200 1201 return resume; 1202} 1203 1204float LiveSession::getAbortThreshold( 1205 ssize_t currentBWIndex, ssize_t targetBWIndex) const { 1206 float abortThreshold = -1.0f; 1207 if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) { 1208 /* 1209 If we're switching down, we need to decide whether to 1210 1211 1) finish last segment of high-bandwidth variant, or 1212 2) abort last segment of high-bandwidth variant, and fetch an 1213 overlapping portion from low-bandwidth variant. 1214 1215 Here we try to maximize the amount of buffer left when the 1216 switch point is met. Given the following parameters: 1217 1218 B: our current buffering level in seconds 1219 T: target duration in seconds 1220 X: sample duration in seconds remain to fetch in last segment 1221 bw0: bandwidth of old variant (as specified in playlist) 1222 bw1: bandwidth of new variant (as specified in playlist) 1223 bw: measured bandwidth available 1224 1225 If we choose 1), when switch happens at the end of current 1226 segment, our buffering will be 1227 B + X - X * bw0 / bw 1228 1229 If we choose 2), when switch happens where we aborted current 1230 segment, our buffering will be 1231 B - (T - X) * bw1 / bw 1232 1233 We should only choose 1) if 1234 X/T < bw1 / (bw1 + bw0 - bw) 1235 */ 1236 1237 // Taking the measured current bandwidth at 50% face value only, 1238 // as our bandwidth estimation is a lagging indicator. Being 1239 // conservative on this, we prefer switching to lower bandwidth 1240 // unless we're really confident finishing up the last segment 1241 // of higher bandwidth will be fast. 1242 CHECK(mLastBandwidthBps >= 0); 1243 abortThreshold = 1244 (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1245 / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth 1246 + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth 1247 - (float)mLastBandwidthBps * 0.5f); 1248 if (abortThreshold < 0.0f) { 1249 abortThreshold = -1.0f; // do not abort 1250 } 1251 ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f", 1252 mBandwidthItems.itemAt(currentBWIndex).mBandwidth, 1253 mBandwidthItems.itemAt(targetBWIndex).mBandwidth, 1254 mLastBandwidthBps, 1255 abortThreshold); 1256 } 1257 return abortThreshold; 1258} 1259 1260void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) { 1261 mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs); 1262} 1263 1264size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) { 1265 if (mBandwidthItems.size() < 2) { 1266 // shouldn't be here if we only have 1 bandwidth, check 1267 // logic to get rid of redundant bandwidth polling 1268 ALOGW("getBandwidthIndex() called for single bandwidth playlist!"); 1269 return 0; 1270 } 1271 1272#if 1 1273 char value[PROPERTY_VALUE_MAX]; 1274 ssize_t index = -1; 1275 if (property_get("media.httplive.bw-index", value, NULL)) { 1276 char *end; 1277 index = strtol(value, &end, 10); 1278 CHECK(end > value && *end == '\0'); 1279 1280 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1281 index = mBandwidthItems.size() - 1; 1282 } 1283 } 1284 1285 if (index < 0) { 1286 char value[PROPERTY_VALUE_MAX]; 1287 if (property_get("media.httplive.max-bw", value, NULL)) { 1288 char *end; 1289 long maxBw = strtoul(value, &end, 10); 1290 if (end > value && *end == '\0') { 1291 if (maxBw > 0 && bandwidthBps > maxBw) { 1292 ALOGV("bandwidth capped to %ld bps", maxBw); 1293 bandwidthBps = maxBw; 1294 } 1295 } 1296 } 1297 1298 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1299 1300 index = mBandwidthItems.size() - 1; 1301 while (index > 0) { 1302 // be conservative (70%) to avoid overestimating and immediately 1303 // switching down again. 1304 size_t adjustedBandwidthBps = bandwidthBps * 7 / 10; 1305 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1306 break; 1307 } 1308 --index; 1309 } 1310 } 1311#elif 0 1312 // Change bandwidth at random() 1313 size_t index = uniformRand() * mBandwidthItems.size(); 1314#elif 0 1315 // There's a 50% chance to stay on the current bandwidth and 1316 // a 50% chance to switch to the next higher bandwidth (wrapping around 1317 // to lowest) 1318 const size_t kMinIndex = 0; 1319 1320 static ssize_t mCurBandwidthIndex = -1; 1321 1322 size_t index; 1323 if (mCurBandwidthIndex < 0) { 1324 index = kMinIndex; 1325 } else if (uniformRand() < 0.5) { 1326 index = (size_t)mCurBandwidthIndex; 1327 } else { 1328 index = mCurBandwidthIndex + 1; 1329 if (index == mBandwidthItems.size()) { 1330 index = kMinIndex; 1331 } 1332 } 1333 mCurBandwidthIndex = index; 1334#elif 0 1335 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1336 1337 size_t index = mBandwidthItems.size() - 1; 1338 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1339 --index; 1340 } 1341#elif 1 1342 char value[PROPERTY_VALUE_MAX]; 1343 size_t index; 1344 if (property_get("media.httplive.bw-index", value, NULL)) { 1345 char *end; 1346 index = strtoul(value, &end, 10); 1347 CHECK(end > value && *end == '\0'); 1348 1349 if (index >= mBandwidthItems.size()) { 1350 index = mBandwidthItems.size() - 1; 1351 } 1352 } else { 1353 index = 0; 1354 } 1355#else 1356 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1357#endif 1358 1359 CHECK_GE(index, 0); 1360 1361 return index; 1362} 1363 1364HLSTime LiveSession::latestMediaSegmentStartTime() const { 1365 HLSTime audioTime(mPacketSources.valueFor( 1366 STREAMTYPE_AUDIO)->getLatestDequeuedMeta()); 1367 1368 HLSTime videoTime(mPacketSources.valueFor( 1369 STREAMTYPE_VIDEO)->getLatestDequeuedMeta()); 1370 1371 return audioTime < videoTime ? videoTime : audioTime; 1372} 1373 1374void LiveSession::onSeek(const sp<AMessage> &msg) { 1375 int64_t timeUs; 1376 CHECK(msg->findInt64("timeUs", &timeUs)); 1377 changeConfiguration(timeUs); 1378} 1379 1380status_t LiveSession::getDuration(int64_t *durationUs) const { 1381 int64_t maxDurationUs = -1ll; 1382 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1383 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1384 1385 if (fetcherDurationUs > maxDurationUs) { 1386 maxDurationUs = fetcherDurationUs; 1387 } 1388 } 1389 1390 *durationUs = maxDurationUs; 1391 1392 return OK; 1393} 1394 1395bool LiveSession::isSeekable() const { 1396 int64_t durationUs; 1397 return getDuration(&durationUs) == OK && durationUs >= 0; 1398} 1399 1400bool LiveSession::hasDynamicDuration() const { 1401 return false; 1402} 1403 1404size_t LiveSession::getTrackCount() const { 1405 if (mPlaylist == NULL) { 1406 return 0; 1407 } else { 1408 return mPlaylist->getTrackCount(); 1409 } 1410} 1411 1412sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1413 if (mPlaylist == NULL) { 1414 return NULL; 1415 } else { 1416 return mPlaylist->getTrackInfo(trackIndex); 1417 } 1418} 1419 1420status_t LiveSession::selectTrack(size_t index, bool select) { 1421 if (mPlaylist == NULL) { 1422 return INVALID_OPERATION; 1423 } 1424 1425 ALOGV("selectTrack: index=%zu, select=%d, mSubtitleGen=%d++", 1426 index, select, mSubtitleGeneration); 1427 1428 ++mSubtitleGeneration; 1429 status_t err = mPlaylist->selectTrack(index, select); 1430 if (err == OK) { 1431 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this); 1432 msg->setInt32("pickTrack", select); 1433 msg->post(); 1434 } 1435 return err; 1436} 1437 1438ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1439 if (mPlaylist == NULL) { 1440 return -1; 1441 } else { 1442 return mPlaylist->getSelectedTrack(type); 1443 } 1444} 1445 1446void LiveSession::changeConfiguration( 1447 int64_t timeUs, ssize_t bandwidthIndex, bool pickTrack) { 1448 ALOGV("changeConfiguration: timeUs=%lld us, bwIndex=%zd, pickTrack=%d", 1449 (long long)timeUs, bandwidthIndex, pickTrack); 1450 1451 cancelBandwidthSwitch(); 1452 1453 CHECK(!mReconfigurationInProgress); 1454 mReconfigurationInProgress = true; 1455 if (bandwidthIndex >= 0) { 1456 mOrigBandwidthIndex = mCurBandwidthIndex; 1457 mCurBandwidthIndex = bandwidthIndex; 1458 } 1459 CHECK_LT(mCurBandwidthIndex, mBandwidthItems.size()); 1460 const BandwidthItem &item = mBandwidthItems.itemAt(mCurBandwidthIndex); 1461 1462 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1463 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1464 1465 AString URIs[kMaxStreams]; 1466 for (size_t i = 0; i < kMaxStreams; ++i) { 1467 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1468 streamMask |= indexToType(i); 1469 } 1470 } 1471 1472 // Step 1, stop and discard fetchers that are no longer needed. 1473 // Pause those that we'll reuse. 1474 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1475 // skip fetchers that are marked mToBeRemoved, 1476 // these are done and can't be reused 1477 if (mFetcherInfos[i].mToBeRemoved) { 1478 continue; 1479 } 1480 1481 const AString &uri = mFetcherInfos.keyAt(i); 1482 sp<PlaylistFetcher> &fetcher = mFetcherInfos.editValueAt(i).mFetcher; 1483 1484 bool discardFetcher = true, delayRemoval = false; 1485 for (size_t j = 0; j < kMaxStreams; ++j) { 1486 StreamType type = indexToType(j); 1487 if ((streamMask & type) && uri == URIs[j]) { 1488 resumeMask |= type; 1489 streamMask &= ~type; 1490 discardFetcher = false; 1491 } 1492 } 1493 // Delay fetcher removal if not picking tracks, AND old fetcher 1494 // has stream mask that overlaps new variant. (Okay to discard 1495 // old fetcher now, if completely no overlap.) 1496 if (discardFetcher && timeUs < 0ll && !pickTrack 1497 && (fetcher->getStreamTypeMask() & streamMask)) { 1498 discardFetcher = false; 1499 delayRemoval = true; 1500 } 1501 1502 if (discardFetcher) { 1503 ALOGV("discarding fetcher-%d", fetcher->getFetcherID()); 1504 fetcher->stopAsync(); 1505 } else { 1506 float threshold = -1.0f; // always finish fetching by default 1507 if (timeUs >= 0ll) { 1508 // seeking, no need to finish fetching 1509 threshold = 0.0f; 1510 } else if (delayRemoval) { 1511 // adapting, abort if remaining of current segment is over threshold 1512 threshold = getAbortThreshold( 1513 mOrigBandwidthIndex, mCurBandwidthIndex); 1514 } 1515 1516 ALOGV("pausing fetcher-%d, threshold=%.2f", 1517 fetcher->getFetcherID(), threshold); 1518 fetcher->pauseAsync(threshold); 1519 } 1520 } 1521 1522 sp<AMessage> msg; 1523 if (timeUs < 0ll) { 1524 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1525 msg = new AMessage(kWhatChangeConfiguration3, this); 1526 } else { 1527 msg = new AMessage(kWhatChangeConfiguration2, this); 1528 } 1529 msg->setInt32("streamMask", streamMask); 1530 msg->setInt32("resumeMask", resumeMask); 1531 msg->setInt32("pickTrack", pickTrack); 1532 msg->setInt64("timeUs", timeUs); 1533 for (size_t i = 0; i < kMaxStreams; ++i) { 1534 if ((streamMask | resumeMask) & indexToType(i)) { 1535 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1536 } 1537 } 1538 1539 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1540 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1541 // fetchers have completed their asynchronous operation, we'll post 1542 // mContinuation, which then is handled below in onChangeConfiguration2. 1543 mContinuationCounter = mFetcherInfos.size(); 1544 mContinuation = msg; 1545 1546 if (mContinuationCounter == 0) { 1547 msg->post(); 1548 } 1549} 1550 1551void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1552 ALOGV("onChangeConfiguration"); 1553 1554 if (!mReconfigurationInProgress) { 1555 int32_t pickTrack = 0; 1556 msg->findInt32("pickTrack", &pickTrack); 1557 changeConfiguration(-1ll /* timeUs */, -1, pickTrack); 1558 } else { 1559 msg->post(1000000ll); // retry in 1 sec 1560 } 1561} 1562 1563void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1564 ALOGV("onChangeConfiguration2"); 1565 1566 mContinuation.clear(); 1567 1568 // All fetchers are either suspended or have been removed now. 1569 1570 // If we're seeking, clear all packet sources before we report 1571 // seek complete, to prevent decoder from pulling stale data. 1572 int64_t timeUs; 1573 CHECK(msg->findInt64("timeUs", &timeUs)); 1574 1575 if (timeUs >= 0) { 1576 mLastSeekTimeUs = timeUs; 1577 1578 for (size_t i = 0; i < mPacketSources.size(); i++) { 1579 mPacketSources.editValueAt(i)->clear(); 1580 } 1581 1582 for (size_t i = 0; i < kMaxStreams; ++i) { 1583 mStreams[i].mCurDiscontinuitySeq = 0; 1584 } 1585 1586 mDiscontinuityOffsetTimesUs.clear(); 1587 mDiscontinuityAbsStartTimesUs.clear(); 1588 1589 if (mSeekReplyID != NULL) { 1590 CHECK(mSeekReply != NULL); 1591 mSeekReply->setInt32("err", OK); 1592 mSeekReply->postReply(mSeekReplyID); 1593 mSeekReplyID.clear(); 1594 mSeekReply.clear(); 1595 } 1596 1597 // restart buffer polling after seek becauese previous 1598 // buffering position is no longer valid. 1599 restartPollBuffering(); 1600 } 1601 1602 uint32_t streamMask, resumeMask; 1603 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1604 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1605 1606 streamMask |= resumeMask; 1607 1608 AString URIs[kMaxStreams]; 1609 for (size_t i = 0; i < kMaxStreams; ++i) { 1610 if (streamMask & indexToType(i)) { 1611 const AString &uriKey = mStreams[i].uriKey(); 1612 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1613 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1614 } 1615 } 1616 1617 uint32_t changedMask = 0; 1618 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1619 // stream URI could change even if onChangeConfiguration2 is only 1620 // used for seek. Seek could happen during a bw switch, in this 1621 // case bw switch will be cancelled, but the seekTo position will 1622 // fetch from the new URI. 1623 if ((mStreamMask & streamMask & indexToType(i)) 1624 && !mStreams[i].mUri.empty() 1625 && !(URIs[i] == mStreams[i].mUri)) { 1626 ALOGV("stream %zu changed: oldURI %s, newURI %s", i, 1627 mStreams[i].mUri.c_str(), URIs[i].c_str()); 1628 sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i)); 1629 source->queueDiscontinuity( 1630 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1631 } 1632 // Determine which decoders to shutdown on the player side, 1633 // a decoder has to be shutdown if its streamtype was active 1634 // before but now longer isn't. 1635 if ((mStreamMask & ~streamMask & indexToType(i))) { 1636 changedMask |= indexToType(i); 1637 } 1638 } 1639 1640 if (changedMask == 0) { 1641 // If nothing changed as far as the audio/video decoders 1642 // are concerned we can proceed. 1643 onChangeConfiguration3(msg); 1644 return; 1645 } 1646 1647 // Something changed, inform the player which will shutdown the 1648 // corresponding decoders and will post the reply once that's done. 1649 // Handling the reply will continue executing below in 1650 // onChangeConfiguration3. 1651 sp<AMessage> notify = mNotify->dup(); 1652 notify->setInt32("what", kWhatStreamsChanged); 1653 notify->setInt32("changedMask", changedMask); 1654 1655 msg->setWhat(kWhatChangeConfiguration3); 1656 msg->setTarget(this); 1657 1658 notify->setMessage("reply", msg); 1659 notify->post(); 1660} 1661 1662void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1663 mContinuation.clear(); 1664 // All remaining fetchers are still suspended, the player has shutdown 1665 // any decoders that needed it. 1666 1667 uint32_t streamMask, resumeMask; 1668 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1669 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1670 1671 mNewStreamMask = streamMask | resumeMask; 1672 1673 int64_t timeUs; 1674 int32_t pickTrack; 1675 bool switching = false; 1676 CHECK(msg->findInt64("timeUs", &timeUs)); 1677 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1678 1679 if (timeUs < 0ll) { 1680 if (!pickTrack) { 1681 // mSwapMask contains streams that are in both old and new variant, 1682 // (in mNewStreamMask & mStreamMask) but with different URIs 1683 // (not in resumeMask). 1684 // For example, old variant has video and audio in two separate 1685 // URIs, and new variant has only audio with unchanged URI. mSwapMask 1686 // should be 0 as there is nothing to swap. We only need to stop video, 1687 // and resume audio. 1688 mSwapMask = mNewStreamMask & mStreamMask & ~resumeMask; 1689 switching = (mSwapMask != 0); 1690 if (!switching) { 1691 ALOGV("#### Finishing Bandwidth Switch Early: %zd => %zd", 1692 mOrigBandwidthIndex, mCurBandwidthIndex); 1693 } 1694 } 1695 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1696 } else { 1697 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1698 } 1699 1700 ALOGV("onChangeConfiguration3: timeUs=%lld, switching=%d, pickTrack=%d, " 1701 "mStreamMask=0x%x, mNewStreamMask=0x%x, mSwapMask=0x%x", 1702 (long long)timeUs, switching, pickTrack, 1703 mStreamMask, mNewStreamMask, mSwapMask); 1704 1705 for (size_t i = 0; i < kMaxStreams; ++i) { 1706 if (streamMask & indexToType(i)) { 1707 if (switching) { 1708 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1709 } else { 1710 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1711 } 1712 } 1713 } 1714 1715 // Of all existing fetchers: 1716 // * Resume fetchers that are still needed and assign them original packet sources. 1717 // * Mark otherwise unneeded fetchers for removal. 1718 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1719 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1720 const AString &uri = mFetcherInfos.keyAt(i); 1721 if (!resumeFetcher(uri, resumeMask, timeUs)) { 1722 ALOGV("marking fetcher-%d to be removed", 1723 mFetcherInfos[i].mFetcher->getFetcherID()); 1724 1725 mFetcherInfos.editValueAt(i).mToBeRemoved = true; 1726 } 1727 } 1728 1729 // streamMask now only contains the types that need a new fetcher created. 1730 if (streamMask != 0) { 1731 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1732 } 1733 1734 // Find out when the original fetchers have buffered up to and start the new fetchers 1735 // at a later timestamp. 1736 for (size_t i = 0; i < kMaxStreams; i++) { 1737 if (!(indexToType(i) & streamMask)) { 1738 continue; 1739 } 1740 1741 AString uri; 1742 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1743 1744 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1745 CHECK(fetcher != NULL); 1746 1747 HLSTime startTime; 1748 SeekMode seekMode = kSeekModeExactPosition; 1749 sp<AnotherPacketSource> sources[kMaxStreams]; 1750 1751 if (i == kSubtitleIndex || (!pickTrack && !switching)) { 1752 startTime = latestMediaSegmentStartTime(); 1753 } 1754 1755 // TRICKY: looping from i as earlier streams are already removed from streamMask 1756 for (size_t j = i; j < kMaxStreams; ++j) { 1757 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1758 if ((streamMask & indexToType(j)) && uri == streamUri) { 1759 sources[j] = mPacketSources.valueFor(indexToType(j)); 1760 1761 if (timeUs >= 0) { 1762 startTime.mTimeUs = timeUs; 1763 } else { 1764 int32_t type; 1765 sp<AMessage> meta; 1766 if (!switching) { 1767 // selecting, or adapting but no swap required 1768 meta = sources[j]->getLatestDequeuedMeta(); 1769 } else { 1770 // adapting and swap required 1771 meta = sources[j]->getLatestEnqueuedMeta(); 1772 if (meta != NULL && mCurBandwidthIndex > mOrigBandwidthIndex) { 1773 // switching up 1774 meta = sources[j]->getMetaAfterLastDequeued(mUpSwitchMargin); 1775 } 1776 } 1777 1778 if (j != kSubtitleIndex && meta != NULL 1779 && !meta->findInt32("discontinuity", &type)) { 1780 HLSTime tmpTime(meta); 1781 if (startTime < tmpTime) { 1782 startTime = tmpTime; 1783 } 1784 } 1785 1786 if (!switching) { 1787 // selecting, or adapting but no swap required 1788 sources[j]->clear(); 1789 if (j == kSubtitleIndex) { 1790 break; 1791 } 1792 1793 ALOGV("stream[%zu]: queue format change", j); 1794 sources[j]->queueDiscontinuity( 1795 ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true); 1796 } else { 1797 // switching, queue discontinuities after resume 1798 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1799 sources[j]->clear(); 1800 // the new fetcher might be providing streams that used to be 1801 // provided by two different fetchers, if one of the fetcher 1802 // paused in the middle while the other somehow paused in next 1803 // seg, we have to start from next seg. 1804 if (seekMode < mStreams[j].mSeekMode) { 1805 seekMode = mStreams[j].mSeekMode; 1806 } 1807 } 1808 } 1809 1810 streamMask &= ~indexToType(j); 1811 } 1812 } 1813 1814 ALOGV("[fetcher-%d] startAsync: startTimeUs %lld mLastSeekTimeUs %lld " 1815 "segmentStartTimeUs %lld seekMode %d", 1816 fetcher->getFetcherID(), 1817 (long long)startTime.mTimeUs, 1818 (long long)mLastSeekTimeUs, 1819 (long long)startTime.getSegmentTimeUs(true /* midpoint */), 1820 seekMode); 1821 1822 // Set the target segment start time to the middle point of the 1823 // segment where the last sample was. 1824 // This gives a better guess if segments of the two variants are not 1825 // perfectly aligned. (If the corresponding segment in new variant 1826 // starts slightly later than that in the old variant, we still want 1827 // to pick that segment, not the one before) 1828 fetcher->startAsync( 1829 sources[kAudioIndex], 1830 sources[kVideoIndex], 1831 sources[kSubtitleIndex], 1832 startTime.mTimeUs < 0 ? mLastSeekTimeUs : startTime.mTimeUs, 1833 startTime.getSegmentTimeUs(true /* midpoint */), 1834 startTime.mSeq, 1835 seekMode); 1836 } 1837 1838 // All fetchers have now been started, the configuration change 1839 // has completed. 1840 1841 mReconfigurationInProgress = false; 1842 if (switching) { 1843 mSwitchInProgress = true; 1844 } else { 1845 mStreamMask = mNewStreamMask; 1846 mOrigBandwidthIndex = mCurBandwidthIndex; 1847 } 1848 1849 ALOGV("onChangeConfiguration3: mSwitchInProgress %d, mStreamMask 0x%x", 1850 mSwitchInProgress, mStreamMask); 1851 1852 if (mDisconnectReplyID != NULL) { 1853 finishDisconnect(); 1854 } 1855} 1856 1857void LiveSession::swapPacketSource(StreamType stream) { 1858 ALOGV("[%s] swapPacketSource", getNameForStream(stream)); 1859 1860 // transfer packets from source2 to source 1861 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 1862 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 1863 1864 // queue discontinuity in mPacketSource 1865 aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false); 1866 1867 // queue packets in mPacketSource2 to mPacketSource 1868 status_t finalResult = OK; 1869 sp<ABuffer> accessUnit; 1870 while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK && 1871 OK == aps2->dequeueAccessUnit(&accessUnit)) { 1872 aps->queueAccessUnit(accessUnit); 1873 } 1874 aps2->clear(); 1875} 1876 1877void LiveSession::tryToFinishBandwidthSwitch(const AString &oldUri) { 1878 if (!mSwitchInProgress) { 1879 return; 1880 } 1881 1882 ssize_t index = mFetcherInfos.indexOfKey(oldUri); 1883 if (index < 0 || !mFetcherInfos[index].mToBeRemoved) { 1884 return; 1885 } 1886 1887 // Swap packet source of streams provided by old variant 1888 for (size_t idx = 0; idx < kMaxStreams; idx++) { 1889 StreamType stream = indexToType(idx); 1890 if ((mSwapMask & stream) && (oldUri == mStreams[idx].mUri)) { 1891 swapPacketSource(stream); 1892 1893 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1894 ALOGW("swapping stream type %d %s to empty stream", 1895 stream, mStreams[idx].mUri.c_str()); 1896 } 1897 mStreams[idx].mUri = mStreams[idx].mNewUri; 1898 mStreams[idx].mNewUri.clear(); 1899 1900 mSwapMask &= ~stream; 1901 } 1902 } 1903 1904 mFetcherInfos.editValueAt(index).mFetcher->stopAsync(false /* clear */); 1905 1906 ALOGV("tryToFinishBandwidthSwitch: mSwapMask=0x%x", mSwapMask); 1907 if (mSwapMask != 0) { 1908 return; 1909 } 1910 1911 // Check if new variant contains extra streams. 1912 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1913 while (extraStreams) { 1914 StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1915 extraStreams &= ~stream; 1916 1917 swapPacketSource(stream); 1918 1919 ssize_t idx = typeToIndex(stream); 1920 CHECK(idx >= 0); 1921 if (mStreams[idx].mNewUri.empty()) { 1922 ALOGW("swapping extra stream type %d %s to empty stream", 1923 stream, mStreams[idx].mUri.c_str()); 1924 } 1925 mStreams[idx].mUri = mStreams[idx].mNewUri; 1926 mStreams[idx].mNewUri.clear(); 1927 } 1928 1929 // Restart new fetcher (it was paused after the first 47k block) 1930 // and let it fetch into mPacketSources (not mPacketSources2) 1931 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1932 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1933 if (info.mToBeResumed) { 1934 resumeFetcher(mFetcherInfos.keyAt(i), mNewStreamMask); 1935 info.mToBeResumed = false; 1936 } 1937 } 1938 1939 ALOGI("#### Finished Bandwidth Switch: %zd => %zd", 1940 mOrigBandwidthIndex, mCurBandwidthIndex); 1941 1942 mStreamMask = mNewStreamMask; 1943 mSwitchInProgress = false; 1944 mOrigBandwidthIndex = mCurBandwidthIndex; 1945 1946 restartPollBuffering(); 1947} 1948 1949void LiveSession::schedulePollBuffering() { 1950 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 1951 msg->setInt32("generation", mPollBufferingGeneration); 1952 msg->post(1000000ll); 1953} 1954 1955void LiveSession::cancelPollBuffering() { 1956 ++mPollBufferingGeneration; 1957 mPrevBufferPercentage = -1; 1958} 1959 1960void LiveSession::restartPollBuffering() { 1961 cancelPollBuffering(); 1962 onPollBuffering(); 1963} 1964 1965void LiveSession::onPollBuffering() { 1966 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 1967 "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x", 1968 mSwitchInProgress, mReconfigurationInProgress, 1969 mInPreparationPhase, mCurBandwidthIndex, mStreamMask); 1970 1971 bool underflow, ready, down, up; 1972 if (checkBuffering(underflow, ready, down, up)) { 1973 if (mInPreparationPhase && ready) { 1974 postPrepared(OK); 1975 } 1976 1977 // don't switch before we report prepared 1978 if (!mInPreparationPhase) { 1979 if (ready) { 1980 stopBufferingIfNecessary(); 1981 } else if (underflow) { 1982 startBufferingIfNecessary(); 1983 } 1984 switchBandwidthIfNeeded(up, down); 1985 } 1986 1987 } 1988 1989 schedulePollBuffering(); 1990} 1991 1992void LiveSession::cancelBandwidthSwitch(bool resume) { 1993 ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++, orig %zd, cur %zd", 1994 mSwitchGeneration, mOrigBandwidthIndex, mCurBandwidthIndex); 1995 if (!mSwitchInProgress) { 1996 return; 1997 } 1998 1999 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 2000 FetcherInfo& info = mFetcherInfos.editValueAt(i); 2001 if (info.mToBeRemoved) { 2002 info.mToBeRemoved = false; 2003 if (resume) { 2004 resumeFetcher(mFetcherInfos.keyAt(i), mSwapMask); 2005 } 2006 } 2007 } 2008 2009 for (size_t i = 0; i < kMaxStreams; ++i) { 2010 AString newUri = mStreams[i].mNewUri; 2011 if (!newUri.empty()) { 2012 // clear all mNewUri matching this newUri 2013 for (size_t j = i; j < kMaxStreams; ++j) { 2014 if (mStreams[j].mNewUri == newUri) { 2015 mStreams[j].mNewUri.clear(); 2016 } 2017 } 2018 ALOGV("stopping newUri = %s", newUri.c_str()); 2019 ssize_t index = mFetcherInfos.indexOfKey(newUri); 2020 if (index < 0) { 2021 ALOGE("did not find fetcher for newUri: %s", newUri.c_str()); 2022 continue; 2023 } 2024 FetcherInfo &info = mFetcherInfos.editValueAt(index); 2025 info.mToBeRemoved = true; 2026 info.mFetcher->stopAsync(); 2027 } 2028 } 2029 2030 ALOGI("#### Canceled Bandwidth Switch: %zd => %zd", 2031 mOrigBandwidthIndex, mCurBandwidthIndex); 2032 2033 mSwitchGeneration++; 2034 mSwitchInProgress = false; 2035 mCurBandwidthIndex = mOrigBandwidthIndex; 2036 mSwapMask = 0; 2037} 2038 2039bool LiveSession::checkBuffering( 2040 bool &underflow, bool &ready, bool &down, bool &up) { 2041 underflow = ready = down = up = false; 2042 2043 if (mReconfigurationInProgress) { 2044 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 2045 return false; 2046 } 2047 2048 size_t activeCount, underflowCount, readyCount, downCount, upCount; 2049 activeCount = underflowCount = readyCount = downCount = upCount =0; 2050 int32_t minBufferPercent = -1; 2051 int64_t durationUs; 2052 if (getDuration(&durationUs) != OK) { 2053 durationUs = -1; 2054 } 2055 for (size_t i = 0; i < mPacketSources.size(); ++i) { 2056 // we don't check subtitles for buffering level 2057 if (!(mStreamMask & mPacketSources.keyAt(i) 2058 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 2059 continue; 2060 } 2061 // ignore streams that never had any packet queued. 2062 // (it's possible that the variant only has audio or video) 2063 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 2064 if (meta == NULL) { 2065 continue; 2066 } 2067 2068 int64_t bufferedDurationUs = 2069 mPacketSources[i]->getEstimatedDurationUs(); 2070 ALOGV("[%s] buffered %lld us", 2071 getNameForStream(mPacketSources.keyAt(i)), 2072 (long long)bufferedDurationUs); 2073 if (durationUs >= 0) { 2074 int32_t percent; 2075 if (mPacketSources[i]->isFinished(0 /* duration */)) { 2076 percent = 100; 2077 } else { 2078 percent = (int32_t)(100.0 * (mLastDequeuedTimeUs + bufferedDurationUs) / durationUs); 2079 } 2080 if (minBufferPercent < 0 || percent < minBufferPercent) { 2081 minBufferPercent = percent; 2082 } 2083 } 2084 2085 ++activeCount; 2086 int64_t readyMark = mInPreparationPhase ? kPrepareMarkUs : kReadyMarkUs; 2087 if (bufferedDurationUs > readyMark 2088 || mPacketSources[i]->isFinished(0)) { 2089 ++readyCount; 2090 } 2091 if (!mPacketSources[i]->isFinished(0)) { 2092 if (bufferedDurationUs < kUnderflowMarkUs) { 2093 ++underflowCount; 2094 } 2095 if (bufferedDurationUs > mUpSwitchMark) { 2096 ++upCount; 2097 } 2098 if (bufferedDurationUs < mDownSwitchMark) { 2099 ++downCount; 2100 } 2101 } 2102 } 2103 2104 if (minBufferPercent >= 0) { 2105 notifyBufferingUpdate(minBufferPercent); 2106 } 2107 2108 if (activeCount > 0) { 2109 up = (upCount == activeCount); 2110 down = (downCount > 0); 2111 ready = (readyCount == activeCount); 2112 underflow = (underflowCount > 0); 2113 return true; 2114 } 2115 2116 return false; 2117} 2118 2119void LiveSession::startBufferingIfNecessary() { 2120 ALOGV("startBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2121 mInPreparationPhase, mBuffering); 2122 if (!mBuffering) { 2123 mBuffering = true; 2124 2125 sp<AMessage> notify = mNotify->dup(); 2126 notify->setInt32("what", kWhatBufferingStart); 2127 notify->post(); 2128 } 2129} 2130 2131void LiveSession::stopBufferingIfNecessary() { 2132 ALOGV("stopBufferingIfNecessary: mInPreparationPhase=%d, mBuffering=%d", 2133 mInPreparationPhase, mBuffering); 2134 2135 if (mBuffering) { 2136 mBuffering = false; 2137 2138 sp<AMessage> notify = mNotify->dup(); 2139 notify->setInt32("what", kWhatBufferingEnd); 2140 notify->post(); 2141 } 2142} 2143 2144void LiveSession::notifyBufferingUpdate(int32_t percentage) { 2145 if (percentage < mPrevBufferPercentage) { 2146 percentage = mPrevBufferPercentage; 2147 } else if (percentage > 100) { 2148 percentage = 100; 2149 } 2150 2151 mPrevBufferPercentage = percentage; 2152 2153 ALOGV("notifyBufferingUpdate: percentage=%d%%", percentage); 2154 2155 sp<AMessage> notify = mNotify->dup(); 2156 notify->setInt32("what", kWhatBufferingUpdate); 2157 notify->setInt32("percentage", percentage); 2158 notify->post(); 2159} 2160 2161void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) { 2162 // no need to check bandwidth if we only have 1 bandwidth settings 2163 if (mSwitchInProgress || mBandwidthItems.size() < 2) { 2164 return; 2165 } 2166 2167 int32_t bandwidthBps; 2168 if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) { 2169 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 2170 mLastBandwidthBps = bandwidthBps; 2171 } else { 2172 ALOGV("no bandwidth estimate."); 2173 return; 2174 } 2175 2176 int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth; 2177 // canSwithDown and canSwitchUp can't both be true. 2178 // we only want to switch up when measured bw is 120% higher than current variant, 2179 // and we only want to switch down when measured bw is below current variant. 2180 bool canSwithDown = bufferLow 2181 && (bandwidthBps < (int32_t)curBandwidth); 2182 bool canSwitchUp = bufferHigh 2183 && (bandwidthBps > (int32_t)curBandwidth * 12 / 10); 2184 2185 if (canSwithDown || canSwitchUp) { 2186 ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps); 2187 2188 // it's possible that we're checking for canSwitchUp case, but the returned 2189 // bandwidthIndex is < mCurBandwidthIndex, as getBandwidthIndex() only uses 70% 2190 // of measured bw. In that case we don't want to do anything, since we have 2191 // both enough buffer and enough bw. 2192 if (bandwidthIndex == mCurBandwidthIndex 2193 || (canSwitchUp && bandwidthIndex < mCurBandwidthIndex) 2194 || (canSwithDown && bandwidthIndex > mCurBandwidthIndex)) { 2195 return; 2196 } 2197 2198 ALOGI("#### Starting Bandwidth Switch: %zd => %zd", 2199 mCurBandwidthIndex, bandwidthIndex); 2200 changeConfiguration(-1, bandwidthIndex, false); 2201 } 2202} 2203 2204void LiveSession::postError(status_t err) { 2205 // if we reached EOS, notify buffering of 100% 2206 if (err == ERROR_END_OF_STREAM) { 2207 notifyBufferingUpdate(100); 2208 } 2209 // we'll stop buffer polling now, before that notify 2210 // stop buffering to stop the spinning icon 2211 stopBufferingIfNecessary(); 2212 cancelPollBuffering(); 2213 2214 sp<AMessage> notify = mNotify->dup(); 2215 notify->setInt32("what", kWhatError); 2216 notify->setInt32("err", err); 2217 notify->post(); 2218} 2219 2220void LiveSession::postPrepared(status_t err) { 2221 CHECK(mInPreparationPhase); 2222 2223 sp<AMessage> notify = mNotify->dup(); 2224 if (err == OK || err == ERROR_END_OF_STREAM) { 2225 notify->setInt32("what", kWhatPrepared); 2226 } else { 2227 cancelPollBuffering(); 2228 2229 notify->setInt32("what", kWhatPreparationFailed); 2230 notify->setInt32("err", err); 2231 } 2232 2233 notify->post(); 2234 2235 mInPreparationPhase = false; 2236} 2237 2238 2239} // namespace android 2240 2241