LiveSession.cpp revision 98d53011c390ab0c3cb8d5d9e47251876174d5d4
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52// static 53// Number of recently-read bytes to use for bandwidth estimation 54const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; 55// High water mark to start up switch or report prepared) 56const int64_t LiveSession::kHighWaterMark = 8000000ll; 57const int64_t LiveSession::kMidWaterMark = 5000000ll; 58const int64_t LiveSession::kLowWaterMark = 
3000000ll; 59 60LiveSession::LiveSession( 61 const sp<AMessage> ¬ify, uint32_t flags, 62 const sp<IMediaHTTPService> &httpService) 63 : mNotify(notify), 64 mFlags(flags), 65 mHTTPService(httpService), 66 mInPreparationPhase(true), 67 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 68 mCurBandwidthIndex(-1), 69 mStreamMask(0), 70 mNewStreamMask(0), 71 mSwapMask(0), 72 mCheckBandwidthGeneration(0), 73 mSwitchGeneration(0), 74 mSubtitleGeneration(0), 75 mLastDequeuedTimeUs(0ll), 76 mRealTimeBaseUs(0ll), 77 mReconfigurationInProgress(false), 78 mSwitchInProgress(false), 79 mDisconnectReplyID(0), 80 mSeekReplyID(0), 81 mFirstTimeUsValid(false), 82 mFirstTimeUs(0), 83 mLastSeekTimeUs(0), 84 mPollBufferingGeneration(0) { 85 86 mStreams[kAudioIndex] = StreamItem("audio"); 87 mStreams[kVideoIndex] = StreamItem("video"); 88 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 89 90 for (size_t i = 0; i < kMaxStreams; ++i) { 91 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 92 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 93 mBuffering[i] = false; 94 } 95 96 size_t numHistoryItems = kBandwidthHistoryBytes / 97 PlaylistFetcher::kDownloadBlockSize + 1; 98 if (numHistoryItems < 5) { 99 numHistoryItems = 5; 100 } 101 mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); 102} 103 104LiveSession::~LiveSession() { 105 if (mFetcherLooper != NULL) { 106 mFetcherLooper->stop(); 107 } 108} 109 110sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 111 ABuffer *discontinuity = new ABuffer(0); 112 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 113 discontinuity->meta()->setInt32("swapPacketSource", swap); 114 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 115 discontinuity->meta()->setInt64("timeUs", -1); 116 return discontinuity; 117} 118 119void LiveSession::swapPacketSource(StreamType stream) { 120 sp<AnotherPacketSource> 
&aps = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
    // Both are references into the containers, so the three assignments below
    // exchange the stored entries themselves.
    sp<AnotherPacketSource> tmp = aps;
    aps = aps2;
    aps2 = tmp;
    // aps2 is now the former primary source; drop whatever it still holds.
    aps2->clear();
}

// Hands the next access unit of `stream` to the caller.
//
// Returns:
//   -EWOULDBLOCK  if `stream` is not part of mStreamMask,
//   -EAGAIN       while this stream (or its audio/video counterpart) has no
//                 buffer available, or for a subtitle unit of a stale
//                 subtitle generation,
//   the packet source's final result once the source has ended,
//   otherwise the result of AnotherPacketSource::dequeueAccessUnit
//   (OK, INFO_DISCONTINUITY, or an error).
//
// On OK for audio/video the unit's "timeUs" is rewritten relative to the first
// timestamp seen for its discontinuity sequence, plus mLastSeekTimeUs and any
// accumulated per-sequence offset.
status_t LiveSession::dequeueAccessUnit(
        StreamType stream, sp<ABuffer> *accessUnit) {
    if (!(mStreamMask & stream)) {
        // return -EWOULDBLOCK to avoid halting the decoder
        // when switching between audio/video and audio only.
        return -EWOULDBLOCK;
    }

    status_t finalResult = OK;
    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);

    ssize_t idx = typeToIndex(stream);
    if (!packetSource->hasBufferAvailable(&finalResult)) {
        if (finalResult == OK) {
            // Nothing queued but the source has not ended: remember that this
            // stream is buffering and ask the caller to retry.
            mBuffering[idx] = true;
            return -EAGAIN;
        } else {
            return finalResult;
        }
    }

    // NOTE(review): targetDurationUs is computed here but is not read again
    // anywhere in the visible remainder of this function.
    int32_t targetDuration = 0;
    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
    if (meta != NULL) {
        meta->findInt32("targetDuration", &targetDuration);
    }

    int64_t targetDurationUs = targetDuration * 1000000ll;
    if (targetDurationUs == 0 ||
            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
        // Fetchers limit buffering to
        // min(3 * targetDuration, kMinBufferedDurationUs)
        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
    }

    // Leave the buffering state as soon as a switch is in progress, the source
    // is finished, or data is available again.
    if (mBuffering[idx]) {
        if (mSwitchInProgress
                || packetSource->isFinished(0)
                || packetSource->hasBufferAvailable(&finalResult)) {
            mBuffering[idx] = false;
        }
    }

    if (mBuffering[idx]) {
        return -EAGAIN;
    }

    // wait for counterpart
    // Only streams that are in both the current and the new stream mask and
    // are actually served by some fetcher are considered counterparts.
    sp<AnotherPacketSource> otherSource;
    uint32_t mask = mNewStreamMask & mStreamMask;
    uint32_t fetchersMask = 0;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
        fetchersMask |= fetcherMask;
    }
    mask &= fetchersMask;
    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
    }
    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
        return finalResult == OK ? -EAGAIN : finalResult;
    }

    status_t err = packetSource->dequeueAccessUnit(accessUnit);

    // Map the stream type to its mStreams slot and a short tag for logging.
    size_t streamIdx;
    const char *streamStr;
    switch (stream) {
        case STREAMTYPE_AUDIO:
            streamIdx = kAudioIndex;
            streamStr = "audio";
            break;
        case STREAMTYPE_VIDEO:
            streamIdx = kVideoIndex;
            streamStr = "video";
            break;
        case STREAMTYPE_SUBTITLES:
            streamIdx = kSubtitleIndex;
            streamStr = "subs";
            break;
        default:
            TRESPASS();
    }

    StreamItem& strm = mStreams[streamIdx];
    if (err == INFO_DISCONTINUITY) {
        // adaptive streaming, discontinuities in the playlist
        int32_t type;
        CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));

        sp<AMessage> extra;
        if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
            extra.clear();
        }

        ALOGI("[%s] read discontinuity of type %d, extra = %s",
              streamStr,
              type,
              extra == NULL ? "NULL" : extra->debugString().c_str());

        int32_t swap;
        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
            int32_t switchGeneration;
            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
            {
                // The swap is performed under mSwapMutex and only if the
                // discontinuity belongs to the current switch generation.
                Mutex::Autolock lock(mSwapMutex);
                if (switchGeneration == mSwitchGeneration) {
                    swapPacketSource(stream);
                    sp<AMessage> msg = new AMessage(kWhatSwapped, this);
                    msg->setInt32("stream", stream);
                    msg->setInt32("switchGeneration", switchGeneration);
                    msg->post();
                }
            }
        } else {
            // Playlist discontinuity: compute the time offset to apply to the
            // next discontinuity sequence (seq + 1). It is the offset of the
            // current sequence, plus the span dequeued so far in the current
            // sequence (when its start time is known), plus the last sample
            // duration.
            size_t seq = strm.mCurDiscontinuitySeq;
            int64_t offsetTimeUs;
            if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
                offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
            } else {
                offsetTimeUs = 0;
            }

            seq += 1;
            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
                int64_t firstTimeUs;
                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
                offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
                offsetTimeUs += strm.mLastSampleDurationUs;
            } else {
                offsetTimeUs += strm.mLastSampleDurationUs;
            }

            mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
        }
    } else if (err == OK) {

        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
            int64_t timeUs;
            int32_t discontinuitySeq = 0;
            CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
            (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
            strm.mCurDiscontinuitySeq = discontinuitySeq;

            // firstTimeUs is the absolute timestamp that maps to zero for this
            // discontinuity sequence.
            int32_t discard = 0;
            int64_t firstTimeUs;
            if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
                int64_t durUs; // approximate sample duration
                if (timeUs > strm.mLastDequeuedTimeUs) {
                    durUs = timeUs - strm.mLastDequeuedTimeUs;
                } else {
                    durUs = strm.mLastDequeuedTimeUs - timeUs;
                }
                strm.mLastSampleDurationUs = durUs;
                firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
            } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
                // Units flagged "discard" do not establish the start time of
                // the sequence.
                firstTimeUs = timeUs;
            } else {
                mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
                firstTimeUs = timeUs;
            }

            strm.mLastDequeuedTimeUs = timeUs;
            // Rebase onto the sequence start, clamping at zero.
            if (timeUs >= firstTimeUs) {
                timeUs -= firstTimeUs;
            } else {
                timeUs = 0;
            }
            timeUs += mLastSeekTimeUs;
            if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
                timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
            }

            ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
            (*accessUnit)->meta()->setInt64("timeUs", timeUs);
            mLastDequeuedTimeUs = timeUs;
            mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
        } else if (stream == STREAMTYPE_SUBTITLES) {
            // Reject subtitle units produced for an older subtitle generation.
            int32_t subtitleGeneration;
            if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
                    && subtitleGeneration != mSubtitleGeneration) {
                return -EAGAIN;
            };
            (*accessUnit)->meta()->setInt32(
                    "trackIndex", mPlaylist->getSelectedIndex());
            (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
        }
    } else {
        ALOGI("[%s] encountered error %d", streamStr, err);
    }

    return err;
}

status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
326 if (!(mStreamMask & stream)) { 327 return UNKNOWN_ERROR; 328 } 329 330 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 331 332 sp<MetaData> meta = packetSource->getFormat(); 333 334 if (meta == NULL) { 335 return -EAGAIN; 336 } 337 338 return convertMetaDataToMessage(meta, format); 339} 340 341void LiveSession::connectAsync( 342 const char *url, const KeyedVector<String8, String8> *headers) { 343 sp<AMessage> msg = new AMessage(kWhatConnect, this); 344 msg->setString("url", url); 345 346 if (headers != NULL) { 347 msg->setPointer( 348 "headers", 349 new KeyedVector<String8, String8>(*headers)); 350 } 351 352 msg->post(); 353} 354 355status_t LiveSession::disconnect() { 356 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 357 358 sp<AMessage> response; 359 status_t err = msg->postAndAwaitResponse(&response); 360 361 return err; 362} 363 364status_t LiveSession::seekTo(int64_t timeUs) { 365 sp<AMessage> msg = new AMessage(kWhatSeek, this); 366 msg->setInt64("timeUs", timeUs); 367 368 sp<AMessage> response; 369 status_t err = msg->postAndAwaitResponse(&response); 370 371 return err; 372} 373 374void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 375 switch (msg->what()) { 376 case kWhatConnect: 377 { 378 onConnect(msg); 379 break; 380 } 381 382 case kWhatDisconnect: 383 { 384 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 385 386 if (mReconfigurationInProgress) { 387 break; 388 } 389 390 finishDisconnect(); 391 break; 392 } 393 394 case kWhatSeek: 395 { 396 sp<AReplyToken> seekReplyID; 397 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 398 mSeekReplyID = seekReplyID; 399 mSeekReply = new AMessage; 400 401 status_t err = onSeek(msg); 402 403 if (err != OK) { 404 msg->post(50000); 405 } 406 break; 407 } 408 409 case kWhatFetcherNotify: 410 { 411 int32_t what; 412 CHECK(msg->findInt32("what", &what)); 413 414 switch (what) { 415 case PlaylistFetcher::kWhatStarted: 416 break; 417 case PlaylistFetcher::kWhatPaused: 418 
case PlaylistFetcher::kWhatStopped: 419 { 420 if (what == PlaylistFetcher::kWhatStopped) { 421 AString uri; 422 CHECK(msg->findString("uri", &uri)); 423 ssize_t index = mFetcherInfos.indexOfKey(uri); 424 if (index < 0) { 425 // ignore duplicated kWhatStopped messages. 426 break; 427 } 428 429 mFetcherLooper->unregisterHandler( 430 mFetcherInfos[index].mFetcher->id()); 431 mFetcherInfos.removeItemsAt(index); 432 433 if (mSwitchInProgress) { 434 tryToFinishBandwidthSwitch(); 435 } 436 } 437 438 if (mContinuation != NULL) { 439 CHECK_GT(mContinuationCounter, 0); 440 if (--mContinuationCounter == 0) { 441 mContinuation->post(); 442 } 443 } 444 break; 445 } 446 447 case PlaylistFetcher::kWhatDurationUpdate: 448 { 449 AString uri; 450 CHECK(msg->findString("uri", &uri)); 451 452 int64_t durationUs; 453 CHECK(msg->findInt64("durationUs", &durationUs)); 454 455 ssize_t index = mFetcherInfos.indexOfKey(uri); 456 if (index >= 0) { 457 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 458 info->mDurationUs = durationUs; 459 } 460 break; 461 } 462 463 case PlaylistFetcher::kWhatError: 464 { 465 status_t err; 466 CHECK(msg->findInt32("err", &err)); 467 468 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 469 470 // handle EOS on subtitle tracks independently 471 AString uri; 472 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 473 ssize_t i = mFetcherInfos.indexOfKey(uri); 474 if (i >= 0) { 475 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 476 if (fetcher != NULL) { 477 uint32_t type = fetcher->getStreamTypeMask(); 478 if (type == STREAMTYPE_SUBTITLES) { 479 mPacketSources.valueFor( 480 STREAMTYPE_SUBTITLES)->signalEOS(err);; 481 break; 482 } 483 } 484 } 485 } 486 487 if (mInPreparationPhase) { 488 postPrepared(err); 489 } 490 491 cancelBandwidthSwitch(); 492 493 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 494 495 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 496 497 mPacketSources.valueFor( 
498 STREAMTYPE_SUBTITLES)->signalEOS(err); 499 500 sp<AMessage> notify = mNotify->dup(); 501 notify->setInt32("what", kWhatError); 502 notify->setInt32("err", err); 503 notify->post(); 504 break; 505 } 506 507 case PlaylistFetcher::kWhatStartedAt: 508 { 509 int32_t switchGeneration; 510 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 511 512 if (switchGeneration != mSwitchGeneration) { 513 break; 514 } 515 516 // Resume fetcher for the original variant; the resumed fetcher should 517 // continue until the timestamps found in msg, which is stored by the 518 // new fetcher to indicate where the new variant has started buffering. 519 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 520 const FetcherInfo info = mFetcherInfos.valueAt(i); 521 if (info.mToBeRemoved) { 522 info.mFetcher->resumeUntilAsync(msg); 523 } 524 } 525 break; 526 } 527 528 default: 529 TRESPASS(); 530 } 531 532 break; 533 } 534 535 case kWhatChangeConfiguration: 536 { 537 onChangeConfiguration(msg); 538 break; 539 } 540 541 case kWhatChangeConfiguration2: 542 { 543 onChangeConfiguration2(msg); 544 break; 545 } 546 547 case kWhatChangeConfiguration3: 548 { 549 onChangeConfiguration3(msg); 550 break; 551 } 552 553 case kWhatFinishDisconnect2: 554 { 555 onFinishDisconnect2(); 556 break; 557 } 558 559 case kWhatSwapped: 560 { 561 onSwapped(msg); 562 break; 563 } 564 565 case kWhatPollBuffering: 566 { 567 int32_t generation; 568 CHECK(msg->findInt32("generation", &generation)); 569 if (generation == mPollBufferingGeneration) { 570 onPollBuffering(); 571 } 572 break; 573 } 574 575 default: 576 TRESPASS(); 577 break; 578 } 579} 580 581// static 582int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 583 if (a->mBandwidth < b->mBandwidth) { 584 return -1; 585 } else if (a->mBandwidth == b->mBandwidth) { 586 return 0; 587 } 588 589 return 1; 590} 591 592// static 593LiveSession::StreamType LiveSession::indexToType(int idx) { 594 CHECK(idx >= 0 && idx < 
kMaxStreams); 595 return (StreamType)(1 << idx); 596} 597 598// static 599ssize_t LiveSession::typeToIndex(int32_t type) { 600 switch (type) { 601 case STREAMTYPE_AUDIO: 602 return 0; 603 case STREAMTYPE_VIDEO: 604 return 1; 605 case STREAMTYPE_SUBTITLES: 606 return 2; 607 default: 608 return -1; 609 }; 610 return -1; 611} 612 613void LiveSession::onConnect(const sp<AMessage> &msg) { 614 AString url; 615 CHECK(msg->findString("url", &url)); 616 617 KeyedVector<String8, String8> *headers = NULL; 618 if (!msg->findPointer("headers", (void **)&headers)) { 619 mExtraHeaders.clear(); 620 } else { 621 mExtraHeaders = *headers; 622 623 delete headers; 624 headers = NULL; 625 } 626 627 // TODO currently we don't know if we are coming here from incognito mode 628 ALOGI("onConnect %s", uriDebugString(url).c_str()); 629 630 mMasterURL = url; 631 632 bool dummy; 633 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 634 635 if (mPlaylist == NULL) { 636 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 637 638 postPrepared(ERROR_IO); 639 return; 640 } 641 642 // create looper for fetchers 643 if (mFetcherLooper == NULL) { 644 mFetcherLooper = new ALooper(); 645 646 mFetcherLooper->setName("Fetcher"); 647 mFetcherLooper->start(false, false); 648 } 649 650 // We trust the content provider to make a reasonable choice of preferred 651 // initial bandwidth by listing it first in the variant playlist. 652 // At startup we really don't have a good estimate on the available 653 // network bandwidth since we haven't tranferred any data yet. Once 654 // we have we can make a better informed choice. 
655 size_t initialBandwidth = 0; 656 size_t initialBandwidthIndex = 0; 657 658 if (mPlaylist->isVariantPlaylist()) { 659 for (size_t i = 0; i < mPlaylist->size(); ++i) { 660 BandwidthItem item; 661 662 item.mPlaylistIndex = i; 663 664 sp<AMessage> meta; 665 AString uri; 666 mPlaylist->itemAt(i, &uri, &meta); 667 668 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 669 670 if (initialBandwidth == 0) { 671 initialBandwidth = item.mBandwidth; 672 } 673 674 mBandwidthItems.push(item); 675 } 676 677 CHECK_GT(mBandwidthItems.size(), 0u); 678 679 mBandwidthItems.sort(SortByBandwidth); 680 681 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 682 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 683 initialBandwidthIndex = i; 684 break; 685 } 686 } 687 } else { 688 // dummy item. 689 BandwidthItem item; 690 item.mPlaylistIndex = 0; 691 item.mBandwidth = 0; 692 mBandwidthItems.push(item); 693 } 694 695 mPlaylist->pickRandomMediaItems(); 696 changeConfiguration( 697 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 698 699 schedulePollBuffering(); 700} 701 702void LiveSession::finishDisconnect() { 703 // No reconfiguration is currently pending, make sure none will trigger 704 // during disconnection either. 705 706 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 
707 // (finishDisconnect, onFinishDisconnect2) 708 cancelBandwidthSwitch(); 709 710 // cancel buffer polling 711 cancelPollBuffering(); 712 713 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 714 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 715 } 716 717 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); 718 719 mContinuationCounter = mFetcherInfos.size(); 720 mContinuation = msg; 721 722 if (mContinuationCounter == 0) { 723 msg->post(); 724 } 725} 726 727void LiveSession::onFinishDisconnect2() { 728 mContinuation.clear(); 729 730 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 731 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 732 733 mPacketSources.valueFor( 734 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 735 736 sp<AMessage> response = new AMessage; 737 response->setInt32("err", OK); 738 739 response->postReply(mDisconnectReplyID); 740 mDisconnectReplyID = 0; 741} 742 743sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 744 ssize_t index = mFetcherInfos.indexOfKey(uri); 745 746 if (index >= 0) { 747 return NULL; 748 } 749 750 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 751 notify->setString("uri", uri); 752 notify->setInt32("switchGeneration", mSwitchGeneration); 753 754 FetcherInfo info; 755 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 756 info.mDurationUs = -1ll; 757 info.mIsPrepared = false; 758 info.mToBeRemoved = false; 759 mFetcherLooper->registerHandler(info.mFetcher); 760 761 mFetcherInfos.add(uri, info); 762 763 return info.mFetcher; 764} 765 766/* 767 * Illustration of parameters: 768 * 769 * 0 `range_offset` 770 * +------------+-------------------------------------------------------+--+--+ 771 * | | | next block to fetch | | | 772 * | | `source` handle => `out` buffer | | | | 773 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 774 * | |<----------- `range_length` / buffer 
capacity ----------->| | 775 * |<------------------------------ file_size ------------------------------->| 776 * 777 * Special parameter values: 778 * - range_length == -1 means entire file 779 * - block_size == 0 means entire range 780 * 781 */ 782ssize_t LiveSession::fetchFile( 783 const char *url, sp<ABuffer> *out, 784 int64_t range_offset, int64_t range_length, 785 uint32_t block_size, /* download block size */ 786 sp<DataSource> *source, /* to return and reuse source */ 787 String8 *actualUrl) { 788 off64_t size; 789 sp<DataSource> temp_source; 790 if (source == NULL) { 791 source = &temp_source; 792 } 793 794 if (*source == NULL) { 795 if (!strncasecmp(url, "file://", 7)) { 796 *source = new FileSource(url + 7); 797 } else if (strncasecmp(url, "http://", 7) 798 && strncasecmp(url, "https://", 8)) { 799 return ERROR_UNSUPPORTED; 800 } else { 801 KeyedVector<String8, String8> headers = mExtraHeaders; 802 if (range_offset > 0 || range_length >= 0) { 803 headers.add( 804 String8("Range"), 805 String8( 806 AStringPrintf( 807 "bytes=%lld-%s", 808 range_offset, 809 range_length < 0 810 ? "" : AStringPrintf("%lld", 811 range_offset + range_length - 1).c_str()).c_str())); 812 } 813 status_t err = mHTTPDataSource->connect(url, &headers); 814 815 if (err != OK) { 816 return err; 817 } 818 819 *source = mHTTPDataSource; 820 } 821 } 822 823 status_t getSizeErr = (*source)->getSize(&size); 824 if (getSizeErr != OK) { 825 size = 65536; 826 } 827 828 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 829 if (*out == NULL) { 830 buffer->setRange(0, 0); 831 } 832 833 ssize_t bytesRead = 0; 834 // adjust range_length if only reading partial block 835 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 836 range_length = buffer->size() + block_size; 837 } 838 for (;;) { 839 // Only resize when we don't know the size. 
840 size_t bufferRemaining = buffer->capacity() - buffer->size(); 841 if (bufferRemaining == 0 && getSizeErr != OK) { 842 size_t bufferIncrement = buffer->size() / 2; 843 if (bufferIncrement < 32768) { 844 bufferIncrement = 32768; 845 } 846 bufferRemaining = bufferIncrement; 847 848 ALOGV("increasing download buffer to %zu bytes", 849 buffer->size() + bufferRemaining); 850 851 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 852 memcpy(copy->data(), buffer->data(), buffer->size()); 853 copy->setRange(0, buffer->size()); 854 855 buffer = copy; 856 } 857 858 size_t maxBytesToRead = bufferRemaining; 859 if (range_length >= 0) { 860 int64_t bytesLeftInRange = range_length - buffer->size(); 861 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 862 maxBytesToRead = bytesLeftInRange; 863 864 if (bytesLeftInRange == 0) { 865 break; 866 } 867 } 868 } 869 870 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 871 // to help us break out of the loop. 872 ssize_t n = (*source)->readAt( 873 buffer->size(), buffer->data() + buffer->size(), 874 maxBytesToRead); 875 876 if (n < 0) { 877 return n; 878 } 879 880 if (n == 0) { 881 break; 882 } 883 884 buffer->setRange(0, buffer->size() + (size_t)n); 885 bytesRead += n; 886 } 887 888 *out = buffer; 889 if (actualUrl != NULL) { 890 *actualUrl = (*source)->getUri(); 891 if (actualUrl->isEmpty()) { 892 *actualUrl = url; 893 } 894 } 895 896 return bytesRead; 897} 898 899sp<M3UParser> LiveSession::fetchPlaylist( 900 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 901 ALOGV("fetchPlaylist '%s'", url); 902 903 *unchanged = false; 904 905 sp<ABuffer> buffer; 906 String8 actualUrl; 907 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 908 909 if (err <= 0) { 910 return NULL; 911 } 912 913 // MD5 functionality is not available on the simulator, treat all 914 // playlists as changed. 
915 916#if defined(HAVE_ANDROID_OS) 917 uint8_t hash[16]; 918 919 MD5_CTX m; 920 MD5_Init(&m); 921 MD5_Update(&m, buffer->data(), buffer->size()); 922 923 MD5_Final(hash, &m); 924 925 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 926 // playlist unchanged 927 *unchanged = true; 928 929 return NULL; 930 } 931 932 if (curPlaylistHash != NULL) { 933 memcpy(curPlaylistHash, hash, sizeof(hash)); 934 } 935#endif 936 937 sp<M3UParser> playlist = 938 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 939 940 if (playlist->initCheck() != OK) { 941 ALOGE("failed to parse .m3u8 playlist"); 942 943 return NULL; 944 } 945 946 return playlist; 947} 948 949#if 0 950static double uniformRand() { 951 return (double)rand() / RAND_MAX; 952} 953#endif 954 955size_t LiveSession::getBandwidthIndex() { 956 if (mBandwidthItems.size() == 0) { 957 return 0; 958 } 959 960#if 1 961 char value[PROPERTY_VALUE_MAX]; 962 ssize_t index = -1; 963 if (property_get("media.httplive.bw-index", value, NULL)) { 964 char *end; 965 index = strtol(value, &end, 10); 966 CHECK(end > value && *end == '\0'); 967 968 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 969 index = mBandwidthItems.size() - 1; 970 } 971 } 972 973 if (index < 0) { 974 int32_t bandwidthBps; 975 if (mHTTPDataSource != NULL 976 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 977 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 978 } else { 979 ALOGV("no bandwidth estimate."); 980 return 0; // Pick the lowest bandwidth stream by default. 981 } 982 983 char value[PROPERTY_VALUE_MAX]; 984 if (property_get("media.httplive.max-bw", value, NULL)) { 985 char *end; 986 long maxBw = strtoul(value, &end, 10); 987 if (end > value && *end == '\0') { 988 if (maxBw > 0 && bandwidthBps > maxBw) { 989 ALOGV("bandwidth capped to %ld bps", maxBw); 990 bandwidthBps = maxBw; 991 } 992 } 993 } 994 995 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 
996 997 index = mBandwidthItems.size() - 1; 998 while (index > 0) { 999 // consider only 80% of the available bandwidth, but if we are switching up, 1000 // be even more conservative (70%) to avoid overestimating and immediately 1001 // switching back. 1002 size_t adjustedBandwidthBps = bandwidthBps; 1003 if (index > mCurBandwidthIndex) { 1004 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1005 } else { 1006 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1007 } 1008 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1009 break; 1010 } 1011 --index; 1012 } 1013 } 1014#elif 0 1015 // Change bandwidth at random() 1016 size_t index = uniformRand() * mBandwidthItems.size(); 1017#elif 0 1018 // There's a 50% chance to stay on the current bandwidth and 1019 // a 50% chance to switch to the next higher bandwidth (wrapping around 1020 // to lowest) 1021 const size_t kMinIndex = 0; 1022 1023 static ssize_t mCurBandwidthIndex = -1; 1024 1025 size_t index; 1026 if (mCurBandwidthIndex < 0) { 1027 index = kMinIndex; 1028 } else if (uniformRand() < 0.5) { 1029 index = (size_t)mCurBandwidthIndex; 1030 } else { 1031 index = mCurBandwidthIndex + 1; 1032 if (index == mBandwidthItems.size()) { 1033 index = kMinIndex; 1034 } 1035 } 1036 mCurBandwidthIndex = index; 1037#elif 0 1038 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1039 1040 size_t index = mBandwidthItems.size() - 1; 1041 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1042 --index; 1043 } 1044#elif 1 1045 char value[PROPERTY_VALUE_MAX]; 1046 size_t index; 1047 if (property_get("media.httplive.bw-index", value, NULL)) { 1048 char *end; 1049 index = strtoul(value, &end, 10); 1050 CHECK(end > value && *end == '\0'); 1051 1052 if (index >= mBandwidthItems.size()) { 1053 index = mBandwidthItems.size() - 1; 1054 } 1055 } else { 1056 index = 0; 1057 } 1058#else 1059 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1060#endif 
1061 1062 CHECK_GE(index, 0); 1063 1064 return index; 1065} 1066 1067int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1068 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1069 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1070 if (audioMeta != NULL) { 1071 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1072 } 1073 1074 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1075 if (videoMeta != NULL 1076 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1077 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1078 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1079 } 1080 1081 } 1082 return minSegmentStartTimeUs; 1083} 1084 1085status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1086 int64_t timeUs; 1087 CHECK(msg->findInt64("timeUs", &timeUs)); 1088 1089 if (!mReconfigurationInProgress) { 1090 changeConfiguration(timeUs, mCurBandwidthIndex); 1091 return OK; 1092 } else { 1093 return -EWOULDBLOCK; 1094 } 1095} 1096 1097status_t LiveSession::getDuration(int64_t *durationUs) const { 1098 int64_t maxDurationUs = -1ll; 1099 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1100 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1101 1102 if (fetcherDurationUs > maxDurationUs) { 1103 maxDurationUs = fetcherDurationUs; 1104 } 1105 } 1106 1107 *durationUs = maxDurationUs; 1108 1109 return OK; 1110} 1111 1112bool LiveSession::isSeekable() const { 1113 int64_t durationUs; 1114 return getDuration(&durationUs) == OK && durationUs >= 0; 1115} 1116 1117bool LiveSession::hasDynamicDuration() const { 1118 return false; 1119} 1120 1121size_t LiveSession::getTrackCount() const { 1122 if (mPlaylist == NULL) { 1123 return 0; 1124 } else { 1125 return mPlaylist->getTrackCount(); 1126 } 1127} 1128 1129sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1130 if 
(mPlaylist == NULL) {
        return NULL;
    } else {
        return mPlaylist->getTrackInfo(trackIndex);
    }
}

// Selects or deselects a playlist track and, on success, posts
// kWhatChangeConfiguration so fetchers are reconfigured for it.
// Returns INVALID_OPERATION before a playlist is loaded, otherwise the
// playlist's selectTrack result.
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    // Bumped before the playlist call, so it advances even if selection fails.
    ++mSubtitleGeneration;
    status_t err = mPlaylist->selectTrack(index, select);
    if (err == OK) {
        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
        msg->setInt32("pickTrack", select);
        msg->post();
    }
    return err;
}

// Selected track of the given type from the current playlist; -1 before a
// playlist is loaded.
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
    if (mPlaylist == NULL) {
        return -1;
    } else {
        return mPlaylist->getSelectedTrack(type);
    }
}

// Begins a reconfiguration to variant `bandwidthIndex`.
//
// timeUs    - seek position; a negative value means "not seeking" (see the
//             continuation selection below).
// pickTrack - forwarded in the continuation message; when not seeking it also
//             decides whether fetchers that are not reused are stopped
//             (true) or only paused (false).
//
// Must not be called while another reconfiguration is in progress (CHECKed).
// Existing fetchers are stopped or paused asynchronously; the continuation
// message is posted once all of them have acknowledged.
void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI for each stream type that the chosen variant provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        bool discardFetcher = true;

        // If we're seeking all current fetchers are discarded.
        // (The branch below is the not-seeking case, where a fetcher whose URI
        // is still wanted is kept and its streams move to resumeMask.)
        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                    discardFetcher = false;
                }
            }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        msg->post();
    }
}

// Handles kWhatChangeConfiguration: starts a non-seeking reconfiguration
// (timeUs = -1) with the bandwidth index and pickTrack flag carried by the
// message, or re-posts the message to retry later if a reconfiguration is
// already running.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    if (!mReconfigurationInProgress) {
        // Both fields are optional; the defaults are used when absent.
        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
        msg->findInt32("pickTrack", &pickTrack);
        msg->findInt32("bandwidthIndex", &bandwidthIndex);
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
    } else {
        msg->post(1000000ll); // retry in 1 sec
    }
}

void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            mPacketSources.editValueAt(i)->clear();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        if (mSeekReplyID != 0) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID = 0;
            mSeekReply.clear();
        }
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // currently onChangeConfiguration2 is only called for seeking;
    // remove the following CHECK if using it else where.
1292 CHECK_EQ(resumeMask, 0); 1293 streamMask |= resumeMask; 1294 1295 AString URIs[kMaxStreams]; 1296 for (size_t i = 0; i < kMaxStreams; ++i) { 1297 if (streamMask & indexToType(i)) { 1298 const AString &uriKey = mStreams[i].uriKey(); 1299 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1300 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1301 } 1302 } 1303 1304 // Determine which decoders to shutdown on the player side, 1305 // a decoder has to be shutdown if either 1306 // 1) its streamtype was active before but now longer isn't. 1307 // or 1308 // 2) its streamtype was already active and still is but the URI 1309 // has changed. 1310 uint32_t changedMask = 0; 1311 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1312 if (((mStreamMask & streamMask & indexToType(i)) 1313 && !(URIs[i] == mStreams[i].mUri)) 1314 || (mStreamMask & ~streamMask & indexToType(i))) { 1315 changedMask |= indexToType(i); 1316 } 1317 } 1318 1319 if (changedMask == 0) { 1320 // If nothing changed as far as the audio/video decoders 1321 // are concerned we can proceed. 1322 onChangeConfiguration3(msg); 1323 return; 1324 } 1325 1326 // Something changed, inform the player which will shutdown the 1327 // corresponding decoders and will post the reply once that's done. 1328 // Handling the reply will continue executing below in 1329 // onChangeConfiguration3. 1330 sp<AMessage> notify = mNotify->dup(); 1331 notify->setInt32("what", kWhatStreamsChanged); 1332 notify->setInt32("changedMask", changedMask); 1333 1334 msg->setWhat(kWhatChangeConfiguration3); 1335 msg->setTarget(this); 1336 1337 notify->setMessage("reply", msg); 1338 notify->post(); 1339} 1340 1341void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1342 mContinuation.clear(); 1343 // All remaining fetchers are still suspended, the player has shutdown 1344 // any decoders that needed it. 
1345 1346 uint32_t streamMask, resumeMask; 1347 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1348 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1349 1350 int64_t timeUs; 1351 int32_t pickTrack; 1352 bool switching = false; 1353 CHECK(msg->findInt64("timeUs", &timeUs)); 1354 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1355 1356 if (timeUs < 0ll) { 1357 if (!pickTrack) { 1358 switching = true; 1359 } 1360 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1361 } else { 1362 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1363 } 1364 1365 for (size_t i = 0; i < kMaxStreams; ++i) { 1366 if (streamMask & indexToType(i)) { 1367 if (switching) { 1368 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1369 } else { 1370 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1371 } 1372 } 1373 } 1374 1375 mNewStreamMask = streamMask | resumeMask; 1376 if (switching) { 1377 mSwapMask = mStreamMask & ~resumeMask; 1378 } 1379 1380 // Of all existing fetchers: 1381 // * Resume fetchers that are still needed and assign them original packet sources. 1382 // * Mark otherwise unneeded fetchers for removal. 
1383 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1384 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1385 const AString &uri = mFetcherInfos.keyAt(i); 1386 1387 sp<AnotherPacketSource> sources[kMaxStreams]; 1388 for (size_t j = 0; j < kMaxStreams; ++j) { 1389 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1390 sources[j] = mPacketSources.valueFor(indexToType(j)); 1391 } 1392 } 1393 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1394 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1395 || sources[kSubtitleIndex] != NULL) { 1396 info.mFetcher->startAsync( 1397 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1398 } else { 1399 info.mToBeRemoved = true; 1400 } 1401 } 1402 1403 // streamMask now only contains the types that need a new fetcher created. 1404 1405 if (streamMask != 0) { 1406 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1407 } 1408 1409 // Find out when the original fetchers have buffered up to and start the new fetchers 1410 // at a later timestamp. 1411 for (size_t i = 0; i < kMaxStreams; i++) { 1412 if (!(indexToType(i) & streamMask)) { 1413 continue; 1414 } 1415 1416 AString uri; 1417 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1418 1419 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1420 CHECK(fetcher != NULL); 1421 1422 int64_t startTimeUs = -1; 1423 int64_t segmentStartTimeUs = -1ll; 1424 int32_t discontinuitySeq = -1; 1425 sp<AnotherPacketSource> sources[kMaxStreams]; 1426 1427 if (i == kSubtitleIndex) { 1428 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1429 } 1430 1431 // TRICKY: looping from i as earlier streams are already removed from streamMask 1432 for (size_t j = i; j < kMaxStreams; ++j) { 1433 const AString &streamUri = switching ? 
mStreams[j].mNewUri : mStreams[j].mUri; 1434 if ((streamMask & indexToType(j)) && uri == streamUri) { 1435 sources[j] = mPacketSources.valueFor(indexToType(j)); 1436 1437 if (timeUs >= 0) { 1438 startTimeUs = timeUs; 1439 } else { 1440 int32_t type; 1441 sp<AMessage> meta; 1442 if (pickTrack) { 1443 // selecting 1444 meta = sources[j]->getLatestDequeuedMeta(); 1445 } else { 1446 // adapting 1447 meta = sources[j]->getLatestEnqueuedMeta(); 1448 } 1449 1450 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1451 int64_t tmpUs; 1452 int64_t tmpSegmentUs; 1453 1454 CHECK(meta->findInt64("timeUs", &tmpUs)); 1455 CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs)); 1456 if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) { 1457 startTimeUs = tmpUs; 1458 segmentStartTimeUs = tmpSegmentUs; 1459 } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) { 1460 startTimeUs = tmpUs; 1461 } 1462 1463 int32_t seq; 1464 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1465 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1466 discontinuitySeq = seq; 1467 } 1468 } 1469 1470 if (pickTrack) { 1471 // selecting track, queue discontinuities before content 1472 sources[j]->clear(); 1473 if (j == kSubtitleIndex) { 1474 break; 1475 } 1476 1477 ALOGV("stream[%d]: queue format change", j); 1478 1479 sources[j]->queueDiscontinuity( 1480 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1481 } else { 1482 // adapting, queue discontinuities after resume 1483 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1484 sources[j]->clear(); 1485 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1486 if (extraStreams & indexToType(j)) { 1487 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1488 } 1489 } 1490 } 1491 1492 streamMask &= ~indexToType(j); 1493 } 1494 } 1495 1496 fetcher->startAsync( 1497 sources[kAudioIndex], 1498 sources[kVideoIndex], 1499 sources[kSubtitleIndex], 1500 startTimeUs < 0 ? 
mLastSeekTimeUs : startTimeUs, 1501 segmentStartTimeUs, 1502 discontinuitySeq, 1503 switching); 1504 } 1505 1506 // All fetchers have now been started, the configuration change 1507 // has completed. 1508 1509 ALOGV("XXX configuration change completed."); 1510 mReconfigurationInProgress = false; 1511 if (switching) { 1512 mSwitchInProgress = true; 1513 } else { 1514 mStreamMask = mNewStreamMask; 1515 } 1516 1517 if (mDisconnectReplyID != 0) { 1518 finishDisconnect(); 1519 } 1520} 1521 1522void LiveSession::onSwapped(const sp<AMessage> &msg) { 1523 int32_t switchGeneration; 1524 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1525 if (switchGeneration != mSwitchGeneration) { 1526 return; 1527 } 1528 1529 int32_t stream; 1530 CHECK(msg->findInt32("stream", &stream)); 1531 1532 ssize_t idx = typeToIndex(stream); 1533 CHECK(idx >= 0); 1534 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1535 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1536 } 1537 mStreams[idx].mUri = mStreams[idx].mNewUri; 1538 mStreams[idx].mNewUri.clear(); 1539 1540 mSwapMask &= ~stream; 1541 if (mSwapMask != 0) { 1542 return; 1543 } 1544 1545 // Check if new variant contains extra streams. 
1546 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1547 while (extraStreams) { 1548 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1549 swapPacketSource(extraStream); 1550 extraStreams &= ~extraStream; 1551 1552 idx = typeToIndex(extraStream); 1553 CHECK(idx >= 0); 1554 if (mStreams[idx].mNewUri.empty()) { 1555 ALOGW("swapping extra stream type %d %s to empty stream", 1556 extraStream, mStreams[idx].mUri.c_str()); 1557 } 1558 mStreams[idx].mUri = mStreams[idx].mNewUri; 1559 mStreams[idx].mNewUri.clear(); 1560 } 1561 1562 tryToFinishBandwidthSwitch(); 1563} 1564 1565void LiveSession::schedulePollBuffering() { 1566 sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); 1567 msg->setInt32("generation", mPollBufferingGeneration); 1568 msg->post(1000000ll); 1569} 1570 1571void LiveSession::cancelPollBuffering() { 1572 ++mPollBufferingGeneration; 1573} 1574 1575void LiveSession::onPollBuffering() { 1576 ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " 1577 "mInPreparationPhase %d, mStreamMask 0x%x", 1578 mSwitchInProgress, mReconfigurationInProgress, 1579 mInPreparationPhase, mStreamMask); 1580 1581 bool low, mid, high; 1582 if (checkBuffering(low, mid, high)) { 1583 if (mInPreparationPhase && mid) { 1584 postPrepared(OK); 1585 } 1586 1587 // don't switch before we report prepared 1588 if (!mInPreparationPhase && (low || high)) { 1589 switchBandwidthIfNeeded(high); 1590 } 1591 } 1592 1593 schedulePollBuffering(); 1594} 1595 1596// Mark switch done when: 1597// 1. 
all old buffers are swapped out 1598void LiveSession::tryToFinishBandwidthSwitch() { 1599 if (!mSwitchInProgress) { 1600 return; 1601 } 1602 1603 bool needToRemoveFetchers = false; 1604 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1605 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1606 needToRemoveFetchers = true; 1607 break; 1608 } 1609 } 1610 1611 if (!needToRemoveFetchers && mSwapMask == 0) { 1612 ALOGI("mSwitchInProgress = false"); 1613 mStreamMask = mNewStreamMask; 1614 mSwitchInProgress = false; 1615 } 1616} 1617 1618void LiveSession::cancelBandwidthSwitch() { 1619 Mutex::Autolock lock(mSwapMutex); 1620 mSwitchGeneration++; 1621 mSwitchInProgress = false; 1622 mSwapMask = 0; 1623 1624 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1625 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1626 if (info.mToBeRemoved) { 1627 info.mToBeRemoved = false; 1628 } 1629 } 1630 1631 for (size_t i = 0; i < kMaxStreams; ++i) { 1632 if (!mStreams[i].mNewUri.empty()) { 1633 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1634 if (j < 0) { 1635 mStreams[i].mNewUri.clear(); 1636 continue; 1637 } 1638 1639 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1640 info.mFetcher->stopAsync(); 1641 mFetcherInfos.removeItemsAt(j); 1642 mStreams[i].mNewUri.clear(); 1643 } 1644 } 1645} 1646 1647bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { 1648 low = mid = high = false; 1649 1650 if (mSwitchInProgress || mReconfigurationInProgress) { 1651 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 1652 return false; 1653 } 1654 1655 // TODO: Fine tune low/high mark. 1656 // We also need to pause playback if buffering is too low. 1657 // Currently during underflow, we depend on decoder to starve 1658 // to pause, but A/V could have different buffering left, 1659 // they're not paused together. 
1660 // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE 1661 1662 // Switch down if any of the fetchers are below low mark; 1663 // Switch up if all of the fetchers are over high mark. 1664 size_t activeCount, lowCount, midCount, highCount; 1665 activeCount = lowCount = midCount = highCount = 0; 1666 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1667 // we don't check subtitles for buffering level 1668 if (!(mStreamMask & mPacketSources.keyAt(i) 1669 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 1670 continue; 1671 } 1672 // ignore streams that never had any packet queued. 1673 // (it's possible that the variant only has audio or video) 1674 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 1675 if (meta == NULL) { 1676 continue; 1677 } 1678 1679 ++activeCount; 1680 int64_t bufferedDurationUs = 1681 mPacketSources[i]->getEstimatedDurationUs(); 1682 ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs); 1683 if (bufferedDurationUs < kLowWaterMark) { 1684 ++lowCount; 1685 break; 1686 } else if (bufferedDurationUs > kHighWaterMark) { 1687 ++midCount; 1688 ++highCount; 1689 } else if (bufferedDurationUs > kMidWaterMark) { 1690 ++midCount; 1691 } 1692 } 1693 1694 if (activeCount > 0) { 1695 high = (highCount == activeCount); 1696 mid = (midCount == activeCount); 1697 low = (lowCount > 0); 1698 return true; 1699 } 1700 1701 return false; 1702} 1703 1704void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) { 1705 ssize_t bandwidthIndex = getBandwidthIndex(); 1706 1707 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 1708 || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) { 1709 changeConfiguration(-1, bandwidthIndex, false); 1710 } 1711} 1712 1713void LiveSession::postPrepared(status_t err) { 1714 CHECK(mInPreparationPhase); 1715 1716 sp<AMessage> notify = mNotify->dup(); 1717 if (err == OK || err == ERROR_END_OF_STREAM) { 1718 notify->setInt32("what", kWhatPrepared); 1719 } else { 1720 notify->setInt32("what", 
kWhatPreparationFailed); 1721 notify->setInt32("err", err); 1722 } 1723 1724 notify->post(); 1725 1726 mInPreparationPhase = false; 1727} 1728 1729 1730} // namespace android 1731 1732