LiveSession.cpp revision 4604458dfe57b0e91a464aefafea50ae7b9876c1
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52// static 53// Number of recently-read bytes to use for bandwidth estimation 54const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; 55// High water mark to start up switch or report prepared) 56const int64_t LiveSession::kHighWaterMark = 8000000ll; 57const int64_t LiveSession::kMidWaterMark = 5000000ll; 58const int64_t LiveSession::kLowWaterMark = 
3000000ll; 59 60LiveSession::LiveSession( 61 const sp<AMessage> ¬ify, uint32_t flags, 62 const sp<IMediaHTTPService> &httpService) 63 : mNotify(notify), 64 mFlags(flags), 65 mHTTPService(httpService), 66 mInPreparationPhase(true), 67 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 68 mCurBandwidthIndex(-1), 69 mStreamMask(0), 70 mNewStreamMask(0), 71 mSwapMask(0), 72 mCheckBandwidthGeneration(0), 73 mSwitchGeneration(0), 74 mSubtitleGeneration(0), 75 mLastDequeuedTimeUs(0ll), 76 mRealTimeBaseUs(0ll), 77 mReconfigurationInProgress(false), 78 mSwitchInProgress(false), 79 mFirstTimeUsValid(false), 80 mFirstTimeUs(0), 81 mLastSeekTimeUs(0), 82 mPollBufferingGeneration(0) { 83 84 mStreams[kAudioIndex] = StreamItem("audio"); 85 mStreams[kVideoIndex] = StreamItem("video"); 86 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 87 88 for (size_t i = 0; i < kMaxStreams; ++i) { 89 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 90 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 91 } 92 93 size_t numHistoryItems = kBandwidthHistoryBytes / 94 PlaylistFetcher::kDownloadBlockSize + 1; 95 if (numHistoryItems < 5) { 96 numHistoryItems = 5; 97 } 98 mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); 99} 100 101LiveSession::~LiveSession() { 102 if (mFetcherLooper != NULL) { 103 mFetcherLooper->stop(); 104 } 105} 106 107sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 108 ABuffer *discontinuity = new ABuffer(0); 109 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 110 discontinuity->meta()->setInt32("swapPacketSource", swap); 111 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 112 discontinuity->meta()->setInt64("timeUs", -1); 113 return discontinuity; 114} 115 116void LiveSession::swapPacketSource(StreamType stream) { 117 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 118 sp<AnotherPacketSource> 
&aps2 = mPacketSources2.editValueFor(stream); 119 sp<AnotherPacketSource> tmp = aps; 120 aps = aps2; 121 aps2 = tmp; 122 aps2->clear(); 123} 124 125status_t LiveSession::dequeueAccessUnit( 126 StreamType stream, sp<ABuffer> *accessUnit) { 127 if (!(mStreamMask & stream)) { 128 // return -EWOULDBLOCK to avoid halting the decoder 129 // when switching between audio/video and audio only. 130 return -EWOULDBLOCK; 131 } 132 133 status_t finalResult = OK; 134 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 135 136 ssize_t idx = typeToIndex(stream); 137 if (!packetSource->hasBufferAvailable(&finalResult)) { 138 if (finalResult == OK) { 139 return -EAGAIN; 140 } else { 141 return finalResult; 142 } 143 } 144 145 // Do not let client pull data if we don't have format yet. 146 // We might only have a format discontinuity queued without actual data. 147 // When NuPlayerDecoder dequeues the format discontinuity, it will 148 // immediately try to getFormat. If we return NULL, NuPlayerDecoder 149 // thinks it can do seamless change, so will not shutdown decoder. 150 // When the actual format arrives, it can't handle it and get stuck. 151 // TODO: We need a method to check if the packet source has any 152 // data packets available, dequeuing should only start then. 
153 sp<MetaData> format = packetSource->getFormat(); 154 if (format == NULL) { 155 return -EAGAIN; 156 } 157 int32_t targetDuration = 0; 158 sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); 159 if (meta != NULL) { 160 meta->findInt32("targetDuration", &targetDuration); 161 } 162 163 int64_t targetDurationUs = targetDuration * 1000000ll; 164 if (targetDurationUs == 0 || 165 targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { 166 // Fetchers limit buffering to 167 // min(3 * targetDuration, kMinBufferedDurationUs) 168 targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; 169 } 170 171 // wait for counterpart 172 sp<AnotherPacketSource> otherSource; 173 uint32_t mask = mNewStreamMask & mStreamMask; 174 uint32_t fetchersMask = 0; 175 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 176 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 177 fetchersMask |= fetcherMask; 178 } 179 mask &= fetchersMask; 180 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 181 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 182 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 183 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 184 } 185 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 186 return finalResult == OK ? 
-EAGAIN : finalResult; 187 } 188 189 status_t err = packetSource->dequeueAccessUnit(accessUnit); 190 191 size_t streamIdx; 192 const char *streamStr; 193 switch (stream) { 194 case STREAMTYPE_AUDIO: 195 streamIdx = kAudioIndex; 196 streamStr = "audio"; 197 break; 198 case STREAMTYPE_VIDEO: 199 streamIdx = kVideoIndex; 200 streamStr = "video"; 201 break; 202 case STREAMTYPE_SUBTITLES: 203 streamIdx = kSubtitleIndex; 204 streamStr = "subs"; 205 break; 206 default: 207 TRESPASS(); 208 } 209 210 StreamItem& strm = mStreams[streamIdx]; 211 if (err == INFO_DISCONTINUITY) { 212 // adaptive streaming, discontinuities in the playlist 213 int32_t type; 214 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 215 216 sp<AMessage> extra; 217 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 218 extra.clear(); 219 } 220 221 ALOGI("[%s] read discontinuity of type %d, extra = %s", 222 streamStr, 223 type, 224 extra == NULL ? "NULL" : extra->debugString().c_str()); 225 226 int32_t swap; 227 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 228 int32_t switchGeneration; 229 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 230 { 231 Mutex::Autolock lock(mSwapMutex); 232 if (switchGeneration == mSwitchGeneration) { 233 swapPacketSource(stream); 234 sp<AMessage> msg = new AMessage(kWhatSwapped, this); 235 msg->setInt32("stream", stream); 236 msg->setInt32("switchGeneration", switchGeneration); 237 msg->post(); 238 } 239 } 240 } else { 241 size_t seq = strm.mCurDiscontinuitySeq; 242 int64_t offsetTimeUs; 243 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 244 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 245 } else { 246 offsetTimeUs = 0; 247 } 248 249 seq += 1; 250 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 251 int64_t firstTimeUs; 252 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 253 offsetTimeUs += 
strm.mLastDequeuedTimeUs - firstTimeUs; 254 offsetTimeUs += strm.mLastSampleDurationUs; 255 } else { 256 offsetTimeUs += strm.mLastSampleDurationUs; 257 } 258 259 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 260 } 261 } else if (err == OK) { 262 263 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 264 int64_t timeUs; 265 int32_t discontinuitySeq = 0; 266 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 267 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 268 strm.mCurDiscontinuitySeq = discontinuitySeq; 269 270 int32_t discard = 0; 271 int64_t firstTimeUs; 272 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 273 int64_t durUs; // approximate sample duration 274 if (timeUs > strm.mLastDequeuedTimeUs) { 275 durUs = timeUs - strm.mLastDequeuedTimeUs; 276 } else { 277 durUs = strm.mLastDequeuedTimeUs - timeUs; 278 } 279 strm.mLastSampleDurationUs = durUs; 280 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 281 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 282 firstTimeUs = timeUs; 283 } else { 284 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 285 firstTimeUs = timeUs; 286 } 287 288 strm.mLastDequeuedTimeUs = timeUs; 289 if (timeUs >= firstTimeUs) { 290 timeUs -= firstTimeUs; 291 } else { 292 timeUs = 0; 293 } 294 timeUs += mLastSeekTimeUs; 295 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 296 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 297 } 298 299 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 300 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 301 mLastDequeuedTimeUs = timeUs; 302 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 303 } else if (stream == STREAMTYPE_SUBTITLES) { 304 int32_t subtitleGeneration; 305 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 306 && subtitleGeneration != 
mSubtitleGeneration) { 307 return -EAGAIN; 308 }; 309 (*accessUnit)->meta()->setInt32( 310 "trackIndex", mPlaylist->getSelectedIndex()); 311 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 312 } 313 } else { 314 ALOGI("[%s] encountered error %d", streamStr, err); 315 } 316 317 return err; 318} 319 320status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 321 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 322 if (!(mStreamMask & stream)) { 323 return UNKNOWN_ERROR; 324 } 325 326 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 327 328 sp<MetaData> meta = packetSource->getFormat(); 329 330 if (meta == NULL) { 331 return -EAGAIN; 332 } 333 334 return convertMetaDataToMessage(meta, format); 335} 336 337void LiveSession::connectAsync( 338 const char *url, const KeyedVector<String8, String8> *headers) { 339 sp<AMessage> msg = new AMessage(kWhatConnect, this); 340 msg->setString("url", url); 341 342 if (headers != NULL) { 343 msg->setPointer( 344 "headers", 345 new KeyedVector<String8, String8>(*headers)); 346 } 347 348 msg->post(); 349} 350 351status_t LiveSession::disconnect() { 352 sp<AMessage> msg = new AMessage(kWhatDisconnect, this); 353 354 sp<AMessage> response; 355 status_t err = msg->postAndAwaitResponse(&response); 356 357 return err; 358} 359 360status_t LiveSession::seekTo(int64_t timeUs) { 361 sp<AMessage> msg = new AMessage(kWhatSeek, this); 362 msg->setInt64("timeUs", timeUs); 363 364 sp<AMessage> response; 365 status_t err = msg->postAndAwaitResponse(&response); 366 367 return err; 368} 369 370void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 371 switch (msg->what()) { 372 case kWhatConnect: 373 { 374 onConnect(msg); 375 break; 376 } 377 378 case kWhatDisconnect: 379 { 380 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 381 382 if (mReconfigurationInProgress) { 383 break; 384 } 385 386 finishDisconnect(); 387 break; 388 } 389 390 case 
kWhatSeek: 391 { 392 sp<AReplyToken> seekReplyID; 393 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 394 mSeekReplyID = seekReplyID; 395 mSeekReply = new AMessage; 396 397 status_t err = onSeek(msg); 398 399 if (err != OK) { 400 msg->post(50000); 401 } 402 break; 403 } 404 405 case kWhatFetcherNotify: 406 { 407 int32_t what; 408 CHECK(msg->findInt32("what", &what)); 409 410 switch (what) { 411 case PlaylistFetcher::kWhatStarted: 412 break; 413 case PlaylistFetcher::kWhatPaused: 414 case PlaylistFetcher::kWhatStopped: 415 { 416 if (what == PlaylistFetcher::kWhatStopped) { 417 AString uri; 418 CHECK(msg->findString("uri", &uri)); 419 ssize_t index = mFetcherInfos.indexOfKey(uri); 420 if (index < 0) { 421 // ignore duplicated kWhatStopped messages. 422 break; 423 } 424 425 mFetcherLooper->unregisterHandler( 426 mFetcherInfos[index].mFetcher->id()); 427 mFetcherInfos.removeItemsAt(index); 428 429 if (mSwitchInProgress) { 430 tryToFinishBandwidthSwitch(); 431 } 432 } 433 434 if (mContinuation != NULL) { 435 CHECK_GT(mContinuationCounter, 0); 436 if (--mContinuationCounter == 0) { 437 mContinuation->post(); 438 } 439 } 440 break; 441 } 442 443 case PlaylistFetcher::kWhatDurationUpdate: 444 { 445 AString uri; 446 CHECK(msg->findString("uri", &uri)); 447 448 int64_t durationUs; 449 CHECK(msg->findInt64("durationUs", &durationUs)); 450 451 ssize_t index = mFetcherInfos.indexOfKey(uri); 452 if (index >= 0) { 453 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 454 info->mDurationUs = durationUs; 455 } 456 break; 457 } 458 459 case PlaylistFetcher::kWhatError: 460 { 461 status_t err; 462 CHECK(msg->findInt32("err", &err)); 463 464 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 465 466 // handle EOS on subtitle tracks independently 467 AString uri; 468 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 469 ssize_t i = mFetcherInfos.indexOfKey(uri); 470 if (i >= 0) { 471 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 472 
if (fetcher != NULL) { 473 uint32_t type = fetcher->getStreamTypeMask(); 474 if (type == STREAMTYPE_SUBTITLES) { 475 mPacketSources.valueFor( 476 STREAMTYPE_SUBTITLES)->signalEOS(err);; 477 break; 478 } 479 } 480 } 481 } 482 483 if (mInPreparationPhase) { 484 postPrepared(err); 485 } 486 487 cancelBandwidthSwitch(); 488 489 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 490 491 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 492 493 mPacketSources.valueFor( 494 STREAMTYPE_SUBTITLES)->signalEOS(err); 495 496 sp<AMessage> notify = mNotify->dup(); 497 notify->setInt32("what", kWhatError); 498 notify->setInt32("err", err); 499 notify->post(); 500 break; 501 } 502 503 case PlaylistFetcher::kWhatStartedAt: 504 { 505 int32_t switchGeneration; 506 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 507 508 if (switchGeneration != mSwitchGeneration) { 509 break; 510 } 511 512 // Resume fetcher for the original variant; the resumed fetcher should 513 // continue until the timestamps found in msg, which is stored by the 514 // new fetcher to indicate where the new variant has started buffering. 
515 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 516 const FetcherInfo info = mFetcherInfos.valueAt(i); 517 if (info.mToBeRemoved) { 518 info.mFetcher->resumeUntilAsync(msg); 519 } 520 } 521 break; 522 } 523 524 default: 525 TRESPASS(); 526 } 527 528 break; 529 } 530 531 case kWhatChangeConfiguration: 532 { 533 onChangeConfiguration(msg); 534 break; 535 } 536 537 case kWhatChangeConfiguration2: 538 { 539 onChangeConfiguration2(msg); 540 break; 541 } 542 543 case kWhatChangeConfiguration3: 544 { 545 onChangeConfiguration3(msg); 546 break; 547 } 548 549 case kWhatFinishDisconnect2: 550 { 551 onFinishDisconnect2(); 552 break; 553 } 554 555 case kWhatSwapped: 556 { 557 onSwapped(msg); 558 break; 559 } 560 561 case kWhatPollBuffering: 562 { 563 int32_t generation; 564 CHECK(msg->findInt32("generation", &generation)); 565 if (generation == mPollBufferingGeneration) { 566 onPollBuffering(); 567 } 568 break; 569 } 570 571 default: 572 TRESPASS(); 573 break; 574 } 575} 576 577// static 578int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 579 if (a->mBandwidth < b->mBandwidth) { 580 return -1; 581 } else if (a->mBandwidth == b->mBandwidth) { 582 return 0; 583 } 584 585 return 1; 586} 587 588// static 589LiveSession::StreamType LiveSession::indexToType(int idx) { 590 CHECK(idx >= 0 && idx < kMaxStreams); 591 return (StreamType)(1 << idx); 592} 593 594// static 595ssize_t LiveSession::typeToIndex(int32_t type) { 596 switch (type) { 597 case STREAMTYPE_AUDIO: 598 return 0; 599 case STREAMTYPE_VIDEO: 600 return 1; 601 case STREAMTYPE_SUBTITLES: 602 return 2; 603 default: 604 return -1; 605 }; 606 return -1; 607} 608 609void LiveSession::onConnect(const sp<AMessage> &msg) { 610 AString url; 611 CHECK(msg->findString("url", &url)); 612 613 KeyedVector<String8, String8> *headers = NULL; 614 if (!msg->findPointer("headers", (void **)&headers)) { 615 mExtraHeaders.clear(); 616 } else { 617 mExtraHeaders = *headers; 618 619 delete headers; 620 
headers = NULL; 621 } 622 623 // TODO currently we don't know if we are coming here from incognito mode 624 ALOGI("onConnect %s", uriDebugString(url).c_str()); 625 626 mMasterURL = url; 627 628 bool dummy; 629 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 630 631 if (mPlaylist == NULL) { 632 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 633 634 postPrepared(ERROR_IO); 635 return; 636 } 637 638 // create looper for fetchers 639 if (mFetcherLooper == NULL) { 640 mFetcherLooper = new ALooper(); 641 642 mFetcherLooper->setName("Fetcher"); 643 mFetcherLooper->start(false, false); 644 } 645 646 // We trust the content provider to make a reasonable choice of preferred 647 // initial bandwidth by listing it first in the variant playlist. 648 // At startup we really don't have a good estimate on the available 649 // network bandwidth since we haven't tranferred any data yet. Once 650 // we have we can make a better informed choice. 651 size_t initialBandwidth = 0; 652 size_t initialBandwidthIndex = 0; 653 654 if (mPlaylist->isVariantPlaylist()) { 655 for (size_t i = 0; i < mPlaylist->size(); ++i) { 656 BandwidthItem item; 657 658 item.mPlaylistIndex = i; 659 660 sp<AMessage> meta; 661 AString uri; 662 mPlaylist->itemAt(i, &uri, &meta); 663 664 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 665 666 if (initialBandwidth == 0) { 667 initialBandwidth = item.mBandwidth; 668 } 669 670 mBandwidthItems.push(item); 671 } 672 673 CHECK_GT(mBandwidthItems.size(), 0u); 674 675 mBandwidthItems.sort(SortByBandwidth); 676 677 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 678 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 679 initialBandwidthIndex = i; 680 break; 681 } 682 } 683 } else { 684 // dummy item. 
685 BandwidthItem item; 686 item.mPlaylistIndex = 0; 687 item.mBandwidth = 0; 688 mBandwidthItems.push(item); 689 } 690 691 mPlaylist->pickRandomMediaItems(); 692 changeConfiguration( 693 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 694 695 schedulePollBuffering(); 696} 697 698void LiveSession::finishDisconnect() { 699 // No reconfiguration is currently pending, make sure none will trigger 700 // during disconnection either. 701 702 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 703 // (finishDisconnect, onFinishDisconnect2) 704 cancelBandwidthSwitch(); 705 706 // cancel buffer polling 707 cancelPollBuffering(); 708 709 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 710 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 711 } 712 713 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this); 714 715 mContinuationCounter = mFetcherInfos.size(); 716 mContinuation = msg; 717 718 if (mContinuationCounter == 0) { 719 msg->post(); 720 } 721} 722 723void LiveSession::onFinishDisconnect2() { 724 mContinuation.clear(); 725 726 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 727 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 728 729 mPacketSources.valueFor( 730 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 731 732 sp<AMessage> response = new AMessage; 733 response->setInt32("err", OK); 734 735 response->postReply(mDisconnectReplyID); 736 mDisconnectReplyID.clear(); 737} 738 739sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 740 ssize_t index = mFetcherInfos.indexOfKey(uri); 741 742 if (index >= 0) { 743 return NULL; 744 } 745 746 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, this); 747 notify->setString("uri", uri); 748 notify->setInt32("switchGeneration", mSwitchGeneration); 749 750 FetcherInfo info; 751 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 752 info.mDurationUs = -1ll; 753 
info.mIsPrepared = false; 754 info.mToBeRemoved = false; 755 mFetcherLooper->registerHandler(info.mFetcher); 756 757 mFetcherInfos.add(uri, info); 758 759 return info.mFetcher; 760} 761 762/* 763 * Illustration of parameters: 764 * 765 * 0 `range_offset` 766 * +------------+-------------------------------------------------------+--+--+ 767 * | | | next block to fetch | | | 768 * | | `source` handle => `out` buffer | | | | 769 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 770 * | |<----------- `range_length` / buffer capacity ----------->| | 771 * |<------------------------------ file_size ------------------------------->| 772 * 773 * Special parameter values: 774 * - range_length == -1 means entire file 775 * - block_size == 0 means entire range 776 * 777 */ 778ssize_t LiveSession::fetchFile( 779 const char *url, sp<ABuffer> *out, 780 int64_t range_offset, int64_t range_length, 781 uint32_t block_size, /* download block size */ 782 sp<DataSource> *source, /* to return and reuse source */ 783 String8 *actualUrl) { 784 off64_t size; 785 sp<DataSource> temp_source; 786 if (source == NULL) { 787 source = &temp_source; 788 } 789 790 if (*source == NULL) { 791 if (!strncasecmp(url, "file://", 7)) { 792 *source = new FileSource(url + 7); 793 } else if (strncasecmp(url, "http://", 7) 794 && strncasecmp(url, "https://", 8)) { 795 return ERROR_UNSUPPORTED; 796 } else { 797 KeyedVector<String8, String8> headers = mExtraHeaders; 798 if (range_offset > 0 || range_length >= 0) { 799 headers.add( 800 String8("Range"), 801 String8( 802 AStringPrintf( 803 "bytes=%lld-%s", 804 range_offset, 805 range_length < 0 806 ? 
"" : AStringPrintf("%lld", 807 range_offset + range_length - 1).c_str()).c_str())); 808 } 809 status_t err = mHTTPDataSource->connect(url, &headers); 810 811 if (err != OK) { 812 return err; 813 } 814 815 *source = mHTTPDataSource; 816 } 817 } 818 819 status_t getSizeErr = (*source)->getSize(&size); 820 if (getSizeErr != OK) { 821 size = 65536; 822 } 823 824 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 825 if (*out == NULL) { 826 buffer->setRange(0, 0); 827 } 828 829 ssize_t bytesRead = 0; 830 // adjust range_length if only reading partial block 831 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 832 range_length = buffer->size() + block_size; 833 } 834 for (;;) { 835 // Only resize when we don't know the size. 836 size_t bufferRemaining = buffer->capacity() - buffer->size(); 837 if (bufferRemaining == 0 && getSizeErr != OK) { 838 size_t bufferIncrement = buffer->size() / 2; 839 if (bufferIncrement < 32768) { 840 bufferIncrement = 32768; 841 } 842 bufferRemaining = bufferIncrement; 843 844 ALOGV("increasing download buffer to %zu bytes", 845 buffer->size() + bufferRemaining); 846 847 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 848 memcpy(copy->data(), buffer->data(), buffer->size()); 849 copy->setRange(0, buffer->size()); 850 851 buffer = copy; 852 } 853 854 size_t maxBytesToRead = bufferRemaining; 855 if (range_length >= 0) { 856 int64_t bytesLeftInRange = range_length - buffer->size(); 857 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 858 maxBytesToRead = bytesLeftInRange; 859 860 if (bytesLeftInRange == 0) { 861 break; 862 } 863 } 864 } 865 866 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 867 // to help us break out of the loop. 
868 ssize_t n = (*source)->readAt( 869 buffer->size(), buffer->data() + buffer->size(), 870 maxBytesToRead); 871 872 if (n < 0) { 873 return n; 874 } 875 876 if (n == 0) { 877 break; 878 } 879 880 buffer->setRange(0, buffer->size() + (size_t)n); 881 bytesRead += n; 882 } 883 884 *out = buffer; 885 if (actualUrl != NULL) { 886 *actualUrl = (*source)->getUri(); 887 if (actualUrl->isEmpty()) { 888 *actualUrl = url; 889 } 890 } 891 892 return bytesRead; 893} 894 895sp<M3UParser> LiveSession::fetchPlaylist( 896 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 897 ALOGV("fetchPlaylist '%s'", url); 898 899 *unchanged = false; 900 901 sp<ABuffer> buffer; 902 String8 actualUrl; 903 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 904 905 if (err <= 0) { 906 return NULL; 907 } 908 909 // MD5 functionality is not available on the simulator, treat all 910 // playlists as changed. 911 912#if defined(HAVE_ANDROID_OS) 913 uint8_t hash[16]; 914 915 MD5_CTX m; 916 MD5_Init(&m); 917 MD5_Update(&m, buffer->data(), buffer->size()); 918 919 MD5_Final(hash, &m); 920 921 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 922 // playlist unchanged 923 *unchanged = true; 924 925 return NULL; 926 } 927 928 if (curPlaylistHash != NULL) { 929 memcpy(curPlaylistHash, hash, sizeof(hash)); 930 } 931#endif 932 933 sp<M3UParser> playlist = 934 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 935 936 if (playlist->initCheck() != OK) { 937 ALOGE("failed to parse .m3u8 playlist"); 938 939 return NULL; 940 } 941 942 return playlist; 943} 944 945#if 0 946static double uniformRand() { 947 return (double)rand() / RAND_MAX; 948} 949#endif 950 951size_t LiveSession::getBandwidthIndex() { 952 if (mBandwidthItems.size() == 0) { 953 return 0; 954 } 955 956#if 1 957 char value[PROPERTY_VALUE_MAX]; 958 ssize_t index = -1; 959 if (property_get("media.httplive.bw-index", value, NULL)) { 960 char *end; 961 index = strtol(value, &end, 10); 962 
CHECK(end > value && *end == '\0'); 963 964 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 965 index = mBandwidthItems.size() - 1; 966 } 967 } 968 969 if (index < 0) { 970 int32_t bandwidthBps; 971 if (mHTTPDataSource != NULL 972 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 973 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 974 } else { 975 ALOGV("no bandwidth estimate."); 976 return 0; // Pick the lowest bandwidth stream by default. 977 } 978 979 char value[PROPERTY_VALUE_MAX]; 980 if (property_get("media.httplive.max-bw", value, NULL)) { 981 char *end; 982 long maxBw = strtoul(value, &end, 10); 983 if (end > value && *end == '\0') { 984 if (maxBw > 0 && bandwidthBps > maxBw) { 985 ALOGV("bandwidth capped to %ld bps", maxBw); 986 bandwidthBps = maxBw; 987 } 988 } 989 } 990 991 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 992 993 index = mBandwidthItems.size() - 1; 994 while (index > 0) { 995 // consider only 80% of the available bandwidth, but if we are switching up, 996 // be even more conservative (70%) to avoid overestimating and immediately 997 // switching back. 
// NOTE(review): the statements down to the first closing '}' at column 0
// are the tail of a function whose signature and opening #if start above
// this excerpt; they are reproduced unchanged.
            size_t adjustedBandwidthBps = bandwidthBps;
            if (index > mCurBandwidthIndex) {
                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
            } else {
                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
            }
            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
                break;
            }
            --index;
        }
    }
#elif 0
    // Change bandwidth at random()
    size_t index = uniformRand() * mBandwidthItems.size();
#elif 0
    // There's a 50% chance to stay on the current bandwidth and
    // a 50% chance to switch to the next higher bandwidth (wrapping around
    // to lowest)
    const size_t kMinIndex = 0;

    static ssize_t mCurBandwidthIndex = -1;

    size_t index;
    if (mCurBandwidthIndex < 0) {
        index = kMinIndex;
    } else if (uniformRand() < 0.5) {
        index = (size_t)mCurBandwidthIndex;
    } else {
        index = mCurBandwidthIndex + 1;
        if (index == mBandwidthItems.size()) {
            index = kMinIndex;
        }
    }
    mCurBandwidthIndex = index;
#elif 0
    // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec

    size_t index = mBandwidthItems.size() - 1;
    while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
        --index;
    }
#elif 1
    // Take the index from the "media.httplive.bw-index" system property,
    // clamped to the last bandwidth item; default to index 0 when unset.
    char value[PROPERTY_VALUE_MAX];
    size_t index;
    if (property_get("media.httplive.bw-index", value, NULL)) {
        char *end;
        index = strtoul(value, &end, 10);
        // The property must be a non-empty, fully numeric string.
        CHECK(end > value && *end == '\0');

        if (index >= mBandwidthItems.size()) {
            index = mBandwidthItems.size() - 1;
        }
    } else {
        index = 0;
    }
#else
    size_t index = mBandwidthItems.size() - 1;  // Highest bandwidth stream
#endif

    CHECK_GE(index, 0);

    return index;
}

// Returns the smaller of the "segmentStartTimeUs" values found in the most
// recently dequeued metadata of the audio and video packet sources.
// The result starts out as -1 and is only replaced by values read from that
// metadata, so -1 is returned when neither source supplies one.
int64_t LiveSession::latestMediaSegmentStartTimeUs() {
    sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
    int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
    if (audioMeta != NULL) {
        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
    }

    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
    if (videoMeta != NULL
            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
        // Video wins when audio gave no (non-negative) value or video is earlier.
        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
            minSegmentStartTimeUs = videoSegmentStartTimeUs;
        }

    }
    return minSegmentStartTimeUs;
}

// Handles a seek request carrying a mandatory "timeUs" field.
// Starts a reconfiguration at the current bandwidth index and returns OK,
// or returns -EWOULDBLOCK if a reconfiguration is already in progress.
status_t LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (!mReconfigurationInProgress) {
        changeConfiguration(timeUs, mCurBandwidthIndex);
        return OK;
    } else {
        return -EWOULDBLOCK;
    }
}

// Stores in *durationUs the largest mDurationUs among all fetcher infos,
// or -1 when there are no fetchers (or none exceeds -1). Always returns OK.
status_t LiveSession::getDuration(int64_t *durationUs) const {
    int64_t maxDurationUs = -1ll;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;

        if (fetcherDurationUs > maxDurationUs) {
            maxDurationUs = fetcherDurationUs;
        }
    }

    *durationUs = maxDurationUs;

    return OK;
}

// The session is seekable when getDuration() reports a non-negative duration.
bool LiveSession::isSeekable() const {
    int64_t durationUs;
    return getDuration(&durationUs) == OK && durationUs >= 0;
}

// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}

// Returns the playlist's track count, or 0 when no playlist is set.
size_t LiveSession::getTrackCount() const {
    if (mPlaylist == NULL) {
        return 0;
    } else {
        return mPlaylist->getTrackCount();
    }
}

// Returns the playlist's info for trackIndex, or NULL when no playlist is set.
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
    if (mPlaylist == NULL) {
        return NULL;
    } else {
        return mPlaylist->getTrackInfo(trackIndex);
    }
}

// Selects or deselects a track through the playlist.
// Returns INVALID_OPERATION when no playlist is set, otherwise the
// playlist's status. On success a kWhatChangeConfiguration message is posted
// carrying the current bandwidth index and the select flag as "pickTrack".
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    // Bumped on every call that reaches the playlist, even if it then fails.
    ++mSubtitleGeneration;
    status_t err = mPlaylist->selectTrack(index, select);
    if (err == OK) {
        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, this);
        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
        msg->setInt32("pickTrack", select);
        msg->post();
    }
    return err;
}

// Returns the playlist's selected track for the given type, or -1 when no
// playlist is set.
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
    if (mPlaylist == NULL) {
        return -1;
    } else {
        return mPlaylist->getSelectedTrack(type);
    }
}

// First stage of a configuration change.
//   timeUs         >= 0 for a seek to that position, negative otherwise.
//   bandwidthIndex index into mBandwidthItems; must be in range (CHECKed).
//   pickTrack      forwarded in the continuation message; when not seeking
//                  it also decides whether unmatched fetchers are stopped
//                  right away.
// Must not be called while a reconfiguration is in progress (CHECKed).
// Stops or pauses every existing fetcher and stores a continuation message
// (kWhatChangeConfiguration2 when seeking, kWhatChangeConfiguration3
// otherwise) in mContinuation; it is posted immediately only when there are
// no fetchers to wait for.
void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0;  // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0;  // streams that should be fetched by the original fetcher

    // Collect the URI of every stream type the selected playlist item provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        bool discardFetcher = true;

        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

        }

        // A fetcher whose URI matches a wanted stream is kept: that stream
        // moves from streamMask (needs a new fetcher) to resumeMask.
        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            if ((streamMask & type) && uri == URIs[j]) {
                resumeMask |= type;
                streamMask &= ~type;
                discardFetcher = false;
            }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            // if we're seeking, pause immediately (no need to finish the segment)
            bool immediate = (timeUs >= 0ll);
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, this);
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, this);
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the URI of every stream that will be fetched, new or resumed.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        msg->post();
    }
}

// Handles kWhatChangeConfiguration. When idle, starts a non-seek
// reconfiguration using the optional "pickTrack" (default 0) and
// "bandwidthIndex" (default: current index) fields; otherwise re-posts the
// same message with a delay to retry later.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    if (!mReconfigurationInProgress) {
        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
        msg->findInt32("pickTrack", &pickTrack);
        msg->findInt32("bandwidthIndex", &bandwidthIndex);
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
    } else {
        msg->post(1000000ll);  // retry in 1 sec
    }
}

// Second stage of a configuration change. Requires "timeUs", "streamMask",
// "resumeMask" and one URI string per selected stream in msg (all CHECKed).
// For a seek it clears the packet sources and answers a pending seek reply.
// It then works out which audio/video stream types were active but no longer
// are; if there are none it continues directly with onChangeConfiguration3,
// otherwise it notifies via kWhatStreamsChanged and hands over msg, retargeted
// to kWhatChangeConfiguration3, as the "reply" message.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // If we're seeking, clear all packet sources before we report
    // seek complete, to prevent decoder from pulling stale data.
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs >= 0) {
        mLastSeekTimeUs = timeUs;

        for (size_t i = 0; i < mPacketSources.size(); i++) {
            mPacketSources.editValueAt(i)->clear();
        }

        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID.clear();
            mSeekReply.clear();
        }
    }

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // From here on streamMask covers every stream of the new configuration.
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    uint32_t changedMask = 0;
    // The loop stops at the first index equal to kSubtitleIndex, so the
    // subtitle stream (and any index after it) is never examined here.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %zu changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            source->queueDiscontinuity(
                    ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but no longer is.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(this);

    notify->setMessage("reply", msg);
    notify->post();
}

// Final stage of a configuration change. Requires "streamMask", "resumeMask",
// "timeUs" and "pickTrack" in msg (all CHECKed).
// Resumes the fetchers that still serve a stream in resumeMask, marks the
// others for removal, creates and starts a new fetcher for every stream left
// in streamMask, and finally clears mReconfigurationInProgress.
// "switching" (not seeking and not picking a track) puts the new URIs into
// mNewUri and leaves mStreamMask untouched until the swap completes.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            switching = true;
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    // Record the URI of each stream that needs a new fetcher: as the pending
    // mNewUri while switching, directly as mUri otherwise.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    mNewStreamMask = streamMask | resumeMask;
    if (switching) {
        // Currently active streams that are not simply resumed.
        mSwapMask = mStreamMask & ~resumeMask;
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));
            }
        }
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        sp<AnotherPacketSource> sources[kMaxStreams];

        if (i == kSubtitleIndex) {
            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            // Every remaining stream sharing this URI is served by the same fetcher.
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    startTimeUs = timeUs;
                } else {
                    int32_t type;
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
                        meta = sources[j]->getLatestEnqueuedMeta();
                    }

                    // Metadata that carries a "discontinuity" field is skipped.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        int64_t tmpUs;
                        int64_t tmpSegmentUs;

                        CHECK(meta->findInt64("timeUs", &tmpUs));
                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
                        // Keep the earliest segment start seen so far and,
                        // within the same segment, the earliest timestamp.
                        if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
                            startTimeUs = tmpUs;
                            segmentStartTimeUs = tmpSegmentUs;
                        } else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
                            startTimeUs = tmpUs;
                        }

                        int32_t seq;
                        CHECK(meta->findInt32("discontinuitySeq", &seq));
                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
                            discontinuitySeq = seq;
                        }
                    }

                    if (pickTrack) {
                        // selecting track, queue discontinuities before content
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // NOTE: leaves the inner loop before this
                            // stream's bit is cleared from streamMask below.
                            break;
                        }

                        ALOGV("stream[%zu]: queue format change", j);

                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        // Fall back to the last seek position when no start time was found.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}

// Handles the notification that one stream has been swapped. Requires
// "switchGeneration" and "stream" in msg (CHECKed). Messages whose generation
// differs from mSwitchGeneration are ignored. Promotes the stream's mNewUri
// to mUri, clears its bit in mSwapMask and, once the mask is empty, swaps in
// any stream types present only in the new configuration before trying to
// finish the bandwidth switch.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));

    ssize_t idx = typeToIndex(stream);
    CHECK(idx >= 0);
    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
    }
    mStreams[idx].mUri = mStreams[idx].mNewUri;
    mStreams[idx].mNewUri.clear();

    mSwapMask &= ~stream;
    if (mSwapMask != 0) {
        // Other streams are still waiting to be swapped.
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit, i.e. one stream type per pass.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;

        idx = typeToIndex(extraStream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    extraStream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    tryToFinishBandwidthSwitch();
}

// Posts a delayed kWhatPollBuffering message to this session, tagged with the
// current mPollBufferingGeneration. The delay is 1000000 (presumably
// microseconds, i.e. one second -- confirm against AMessage::post).
void LiveSession::schedulePollBuffering() {
    sp<AMessage> msg = new AMessage(kWhatPollBuffering, this);
    msg->setInt32("generation", mPollBufferingGeneration);
    msg->post(1000000ll);
}

// Advances the polling generation. NOTE(review): presumably the message
// handler drops poll messages carrying an older generation -- the handler is
// not part of this excerpt.
void LiveSession::cancelPollBuffering() {
    ++mPollBufferingGeneration;
}

// Periodic buffering check: reports prepared once every active stream is
// above the mid mark during preparation, and afterwards considers a bandwidth
// switch when buffering is low or high. Always schedules the next poll.
void LiveSession::onPollBuffering() {
    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
            "mInPreparationPhase %d, mStreamMask 0x%x",
        mSwitchInProgress, mReconfigurationInProgress,
        mInPreparationPhase, mStreamMask);

    bool low, mid, high;
    if (checkBuffering(low, mid, high)) {
        if (mInPreparationPhase && mid) {
            postPrepared(OK);
        }

        // don't switch before we report prepared
        if (!mInPreparationPhase && (low || high)) {
            switchBandwidthIfNeeded(high);
        }
    }

    schedulePollBuffering();
}

// Mark switch done when:
// 1.
all old buffers are swapped out 1601void LiveSession::tryToFinishBandwidthSwitch() { 1602 if (!mSwitchInProgress) { 1603 return; 1604 } 1605 1606 bool needToRemoveFetchers = false; 1607 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1608 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1609 needToRemoveFetchers = true; 1610 break; 1611 } 1612 } 1613 1614 if (!needToRemoveFetchers && mSwapMask == 0) { 1615 ALOGI("mSwitchInProgress = false"); 1616 mStreamMask = mNewStreamMask; 1617 mSwitchInProgress = false; 1618 } 1619} 1620 1621void LiveSession::cancelBandwidthSwitch() { 1622 Mutex::Autolock lock(mSwapMutex); 1623 mSwitchGeneration++; 1624 mSwitchInProgress = false; 1625 mSwapMask = 0; 1626 1627 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1628 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1629 if (info.mToBeRemoved) { 1630 info.mToBeRemoved = false; 1631 } 1632 } 1633 1634 for (size_t i = 0; i < kMaxStreams; ++i) { 1635 if (!mStreams[i].mNewUri.empty()) { 1636 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1637 if (j < 0) { 1638 mStreams[i].mNewUri.clear(); 1639 continue; 1640 } 1641 1642 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1643 info.mFetcher->stopAsync(); 1644 mFetcherInfos.removeItemsAt(j); 1645 mStreams[i].mNewUri.clear(); 1646 } 1647 } 1648} 1649 1650bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { 1651 low = mid = high = false; 1652 1653 if (mSwitchInProgress || mReconfigurationInProgress) { 1654 ALOGV("Switch/Reconfig in progress, defer buffer polling"); 1655 return false; 1656 } 1657 1658 // TODO: Fine tune low/high mark. 1659 // We also need to pause playback if buffering is too low. 1660 // Currently during underflow, we depend on decoder to starve 1661 // to pause, but A/V could have different buffering left, 1662 // they're not paused together. 
1663 // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE 1664 1665 // Switch down if any of the fetchers are below low mark; 1666 // Switch up if all of the fetchers are over high mark. 1667 size_t activeCount, lowCount, midCount, highCount; 1668 activeCount = lowCount = midCount = highCount = 0; 1669 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1670 // we don't check subtitles for buffering level 1671 if (!(mStreamMask & mPacketSources.keyAt(i) 1672 & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { 1673 continue; 1674 } 1675 // ignore streams that never had any packet queued. 1676 // (it's possible that the variant only has audio or video) 1677 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 1678 if (meta == NULL) { 1679 continue; 1680 } 1681 1682 ++activeCount; 1683 int64_t bufferedDurationUs = 1684 mPacketSources[i]->getEstimatedDurationUs(); 1685 ALOGV("source[%zu]: buffered %lld us", i, (long long)bufferedDurationUs); 1686 if (bufferedDurationUs < kLowWaterMark) { 1687 ++lowCount; 1688 break; 1689 } else if (bufferedDurationUs > kHighWaterMark) { 1690 ++midCount; 1691 ++highCount; 1692 } else if (bufferedDurationUs > kMidWaterMark) { 1693 ++midCount; 1694 } 1695 } 1696 1697 if (activeCount > 0) { 1698 high = (highCount == activeCount); 1699 mid = (midCount == activeCount); 1700 low = (lowCount > 0); 1701 return true; 1702 } 1703 1704 return false; 1705} 1706 1707void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) { 1708 ssize_t bandwidthIndex = getBandwidthIndex(); 1709 1710 if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) 1711 || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) { 1712 changeConfiguration(-1, bandwidthIndex, false); 1713 } 1714} 1715 1716void LiveSession::postPrepared(status_t err) { 1717 CHECK(mInPreparationPhase); 1718 1719 sp<AMessage> notify = mNotify->dup(); 1720 if (err == OK || err == ERROR_END_OF_STREAM) { 1721 notify->setInt32("what", kWhatPrepared); 1722 } else { 1723 
notify->setInt32("what", kWhatPreparationFailed); 1724 notify->setInt32("err", err); 1725 } 1726 1727 notify->post(); 1728 1729 mInPreparationPhase = false; 1730} 1731 1732 1733} // namespace android 1734 1735