LiveSession.cpp revision ff430c633e52f15acdd305953e6071b75f1e700c
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52// Number of recently-read bytes to use for bandwidth estimation 53const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; 54 55LiveSession::LiveSession( 56 const sp<AMessage> ¬ify, uint32_t flags, 57 const sp<IMediaHTTPService> &httpService) 58 : mNotify(notify), 59 mFlags(flags), 60 mHTTPService(httpService), 61 mInPreparationPhase(true), 62 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 63 mCurBandwidthIndex(-1), 64 mStreamMask(0), 65 mNewStreamMask(0), 66 mSwapMask(0), 67 mCheckBandwidthGeneration(0), 68 mSwitchGeneration(0), 69 mSubtitleGeneration(0), 70 mLastDequeuedTimeUs(0ll), 71 mRealTimeBaseUs(0ll), 72 mReconfigurationInProgress(false), 73 mSwitchInProgress(false), 74 mDisconnectReplyID(0), 75 mSeekReplyID(0), 76 mFirstTimeUsValid(false), 77 mFirstTimeUs(0), 78 mLastSeekTimeUs(0) { 79 80 mStreams[kAudioIndex] = StreamItem("audio"); 81 mStreams[kVideoIndex] = StreamItem("video"); 82 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 83 84 for (size_t i = 0; i < kMaxStreams; ++i) { 85 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 86 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 87 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 88 mBuffering[i] = false; 89 } 90 91 size_t numHistoryItems = kBandwidthHistoryBytes / 92 PlaylistFetcher::kDownloadBlockSize + 1; 93 if (numHistoryItems < 5) { 94 numHistoryItems = 5; 95 } 96 mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); 97} 98 99LiveSession::~LiveSession() { 100} 101 102sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 103 ABuffer *discontinuity = new ABuffer(0); 104 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 105 discontinuity->meta()->setInt32("swapPacketSource", swap); 106 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 107 discontinuity->meta()->setInt64("timeUs", -1); 108 return discontinuity; 109} 110 111void LiveSession::swapPacketSource(StreamType stream) { 112 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 113 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 114 sp<AnotherPacketSource> tmp = aps; 115 aps = aps2; 116 aps2 = tmp; 117 aps2->clear(); 118} 119 120status_t LiveSession::dequeueAccessUnit( 121 StreamType stream, sp<ABuffer> *accessUnit) { 122 if (!(mStreamMask & stream)) { 123 // return -EWOULDBLOCK to avoid halting the decoder 124 // when switching between audio/video and audio only. 125 return -EWOULDBLOCK; 126 } 127 128 status_t finalResult; 129 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 130 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 131 discontinuityQueue->dequeueAccessUnit(accessUnit); 132 // seeking, track switching 133 sp<AMessage> extra; 134 int64_t timeUs; 135 if ((*accessUnit)->meta()->findMessage("extra", &extra) 136 && extra != NULL 137 && extra->findInt64("timeUs", &timeUs)) { 138 // seeking only 139 mLastSeekTimeUs = timeUs; 140 mDiscontinuityOffsetTimesUs.clear(); 141 mDiscontinuityAbsStartTimesUs.clear(); 142 } 143 return INFO_DISCONTINUITY; 144 } 145 146 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 147 148 ssize_t idx = typeToIndex(stream); 149 if (!packetSource->hasBufferAvailable(&finalResult)) { 150 if (finalResult == OK) { 151 mBuffering[idx] = true; 152 return -EAGAIN; 153 } else { 154 return finalResult; 155 } 156 } 157 158 int32_t targetDuration = 0; 159 sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); 160 if (meta != NULL) { 161 meta->findInt32("targetDuration", &targetDuration); 162 } 163 164 int64_t targetDurationUs = targetDuration * 1000000ll; 165 if (targetDurationUs == 0 || 166 targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { 167 // Fetchers limit buffering to 168 // min(3 * targetDuration, kMinBufferedDurationUs) 169 targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; 170 } 171 172 if (mBuffering[idx]) { 173 if (mSwitchInProgress 174 || packetSource->isFinished(0) 175 || packetSource->getEstimatedDurationUs() > targetDurationUs) { 176 mBuffering[idx] = false; 177 } 178 } 179 180 if (mBuffering[idx]) { 181 return -EAGAIN; 182 } 183 184 // wait for counterpart 185 sp<AnotherPacketSource> otherSource; 186 uint32_t mask = mNewStreamMask & mStreamMask; 187 uint32_t fetchersMask = 0; 188 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 189 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 190 fetchersMask |= fetcherMask; 191 } 192 mask &= fetchersMask; 193 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 194 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 195 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 196 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 197 } 198 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 199 return finalResult == OK ? -EAGAIN : finalResult; 200 } 201 202 status_t err = packetSource->dequeueAccessUnit(accessUnit); 203 204 size_t streamIdx; 205 const char *streamStr; 206 switch (stream) { 207 case STREAMTYPE_AUDIO: 208 streamIdx = kAudioIndex; 209 streamStr = "audio"; 210 break; 211 case STREAMTYPE_VIDEO: 212 streamIdx = kVideoIndex; 213 streamStr = "video"; 214 break; 215 case STREAMTYPE_SUBTITLES: 216 streamIdx = kSubtitleIndex; 217 streamStr = "subs"; 218 break; 219 default: 220 TRESPASS(); 221 } 222 223 StreamItem& strm = mStreams[streamIdx]; 224 if (err == INFO_DISCONTINUITY) { 225 // adaptive streaming, discontinuities in the playlist 226 int32_t type; 227 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 228 229 sp<AMessage> extra; 230 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 231 extra.clear(); 232 } 233 234 ALOGI("[%s] read discontinuity of type %d, extra = %s", 235 streamStr, 236 type, 237 extra == NULL ? "NULL" : extra->debugString().c_str()); 238 239 int32_t swap; 240 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 241 int32_t switchGeneration; 242 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 243 { 244 Mutex::Autolock lock(mSwapMutex); 245 if (switchGeneration == mSwitchGeneration) { 246 swapPacketSource(stream); 247 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 248 msg->setInt32("stream", stream); 249 msg->setInt32("switchGeneration", switchGeneration); 250 msg->post(); 251 } 252 } 253 } else { 254 size_t seq = strm.mCurDiscontinuitySeq; 255 int64_t offsetTimeUs; 256 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 257 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 258 } else { 259 offsetTimeUs = 0; 260 } 261 262 seq += 1; 263 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 264 int64_t firstTimeUs; 265 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 266 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 267 offsetTimeUs += strm.mLastSampleDurationUs; 268 } else { 269 offsetTimeUs += strm.mLastSampleDurationUs; 270 } 271 272 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 273 } 274 } else if (err == OK) { 275 276 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 277 int64_t timeUs; 278 int32_t discontinuitySeq = 0; 279 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 280 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 281 strm.mCurDiscontinuitySeq = discontinuitySeq; 282 283 int32_t discard = 0; 284 int64_t firstTimeUs; 285 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 286 int64_t durUs; // approximate sample duration 287 if (timeUs > strm.mLastDequeuedTimeUs) { 288 durUs = timeUs - strm.mLastDequeuedTimeUs; 289 } else { 290 durUs = strm.mLastDequeuedTimeUs - timeUs; 291 } 292 strm.mLastSampleDurationUs = durUs; 293 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 294 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 295 firstTimeUs = timeUs; 296 } else { 297 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 298 firstTimeUs = timeUs; 299 } 300 301 strm.mLastDequeuedTimeUs = timeUs; 302 if (timeUs >= firstTimeUs) { 303 timeUs -= firstTimeUs; 304 } else { 305 timeUs = 0; 306 } 307 timeUs += mLastSeekTimeUs; 308 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 309 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 310 } 311 312 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 313 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 314 mLastDequeuedTimeUs = timeUs; 315 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 316 } else if (stream == STREAMTYPE_SUBTITLES) { 317 int32_t subtitleGeneration; 318 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 319 && subtitleGeneration != mSubtitleGeneration) { 320 return -EAGAIN; 321 }; 322 (*accessUnit)->meta()->setInt32( 323 "trackIndex", mPlaylist->getSelectedIndex()); 324 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 325 } 326 } else { 327 ALOGI("[%s] encountered error %d", streamStr, err); 328 } 329 330 return err; 331} 332 333status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 334 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 335 if (!(mStreamMask & stream)) { 336 return UNKNOWN_ERROR; 337 } 338 339 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 340 341 sp<MetaData> meta = packetSource->getFormat(); 342 343 if (meta == NULL) { 344 return -EAGAIN; 345 } 346 347 return convertMetaDataToMessage(meta, format); 348} 349 350void LiveSession::connectAsync( 351 const char *url, const KeyedVector<String8, String8> *headers) { 352 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 353 msg->setString("url", url); 354 355 if (headers != NULL) { 356 msg->setPointer( 357 "headers", 358 new KeyedVector<String8, String8>(*headers)); 359 } 360 361 msg->post(); 362} 363 364status_t LiveSession::disconnect() { 365 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 366 367 sp<AMessage> response; 368 status_t err = msg->postAndAwaitResponse(&response); 369 370 return err; 371} 372 373status_t LiveSession::seekTo(int64_t timeUs) { 374 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 375 msg->setInt64("timeUs", timeUs); 376 377 sp<AMessage> response; 378 status_t err = msg->postAndAwaitResponse(&response); 379 380 return err; 381} 382 383void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 384 switch (msg->what()) { 385 case kWhatConnect: 386 { 387 onConnect(msg); 388 break; 389 } 390 391 case kWhatDisconnect: 392 { 393 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 394 395 if (mReconfigurationInProgress) { 396 break; 397 } 398 399 finishDisconnect(); 400 break; 401 } 402 403 case kWhatSeek: 404 { 405 uint32_t seekReplyID; 406 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 407 mSeekReplyID = seekReplyID; 408 mSeekReply = new AMessage; 409 410 status_t err = onSeek(msg); 411 412 if (err != OK) { 413 msg->post(50000); 414 } 415 break; 416 } 417 418 case kWhatFetcherNotify: 419 { 420 int32_t what; 421 CHECK(msg->findInt32("what", &what)); 422 423 switch (what) { 424 case PlaylistFetcher::kWhatStarted: 425 break; 426 case PlaylistFetcher::kWhatPaused: 427 case PlaylistFetcher::kWhatStopped: 428 { 429 if (what == PlaylistFetcher::kWhatStopped) { 430 AString uri; 431 CHECK(msg->findString("uri", &uri)); 432 if (mFetcherInfos.removeItem(uri) < 0) { 433 // ignore duplicated kWhatStopped messages. 434 break; 435 } 436 437 if (mSwitchInProgress) { 438 tryToFinishBandwidthSwitch(); 439 } 440 } 441 442 if (mContinuation != NULL) { 443 CHECK_GT(mContinuationCounter, 0); 444 if (--mContinuationCounter == 0) { 445 mContinuation->post(); 446 447 if (mSeekReplyID != 0) { 448 CHECK(mSeekReply != NULL); 449 mSeekReply->setInt32("err", OK); 450 mSeekReply->postReply(mSeekReplyID); 451 mSeekReplyID = 0; 452 mSeekReply.clear(); 453 } 454 } 455 } 456 break; 457 } 458 459 case PlaylistFetcher::kWhatDurationUpdate: 460 { 461 AString uri; 462 CHECK(msg->findString("uri", &uri)); 463 464 int64_t durationUs; 465 CHECK(msg->findInt64("durationUs", &durationUs)); 466 467 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 468 info->mDurationUs = durationUs; 469 break; 470 } 471 472 case PlaylistFetcher::kWhatError: 473 { 474 status_t err; 475 CHECK(msg->findInt32("err", &err)); 476 477 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 478 479 // handle EOS on subtitle tracks independently 480 AString uri; 481 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 482 ssize_t i = mFetcherInfos.indexOfKey(uri); 483 if (i >= 0) { 484 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 485 if (fetcher != NULL) { 486 uint32_t type = fetcher->getStreamTypeMask(); 487 if (type == STREAMTYPE_SUBTITLES) { 488 mPacketSources.valueFor( 489 STREAMTYPE_SUBTITLES)->signalEOS(err);; 490 break; 491 } 492 } 493 } 494 } 495 496 if (mInPreparationPhase) { 497 postPrepared(err); 498 } 499 500 cancelBandwidthSwitch(); 501 502 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 503 504 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 505 506 mPacketSources.valueFor( 507 STREAMTYPE_SUBTITLES)->signalEOS(err); 508 509 sp<AMessage> notify = mNotify->dup(); 510 notify->setInt32("what", kWhatError); 511 notify->setInt32("err", err); 512 notify->post(); 513 break; 514 } 515 516 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 517 { 518 AString uri; 519 CHECK(msg->findString("uri", &uri)); 520 521 if (mFetcherInfos.indexOfKey(uri) < 0) { 522 ALOGE("couldn't find uri"); 523 break; 524 } 525 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 526 info->mIsPrepared = true; 527 528 if (mInPreparationPhase) { 529 bool allFetchersPrepared = true; 530 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 531 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 532 allFetchersPrepared = false; 533 break; 534 } 535 } 536 537 if (allFetchersPrepared) { 538 postPrepared(OK); 539 } 540 } 541 break; 542 } 543 544 case PlaylistFetcher::kWhatStartedAt: 545 { 546 int32_t switchGeneration; 547 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 548 549 if (switchGeneration != mSwitchGeneration) { 550 break; 551 } 552 553 // Resume fetcher for the original variant; the resumed fetcher should 554 // continue until the timestamps found in msg, which is stored by the 555 // new fetcher to indicate where the new variant has started buffering. 556 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 557 const FetcherInfo info = mFetcherInfos.valueAt(i); 558 if (info.mToBeRemoved) { 559 info.mFetcher->resumeUntilAsync(msg); 560 } 561 } 562 break; 563 } 564 565 default: 566 TRESPASS(); 567 } 568 569 break; 570 } 571 572 case kWhatCheckBandwidth: 573 { 574 int32_t generation; 575 CHECK(msg->findInt32("generation", &generation)); 576 577 if (generation != mCheckBandwidthGeneration) { 578 break; 579 } 580 581 onCheckBandwidth(msg); 582 break; 583 } 584 585 case kWhatChangeConfiguration: 586 { 587 onChangeConfiguration(msg); 588 break; 589 } 590 591 case kWhatChangeConfiguration2: 592 { 593 onChangeConfiguration2(msg); 594 break; 595 } 596 597 case kWhatChangeConfiguration3: 598 { 599 onChangeConfiguration3(msg); 600 break; 601 } 602 603 case kWhatFinishDisconnect2: 604 { 605 onFinishDisconnect2(); 606 break; 607 } 608 609 case kWhatSwapped: 610 { 611 onSwapped(msg); 612 break; 613 } 614 615 case kWhatCheckSwitchDown: 616 { 617 onCheckSwitchDown(); 618 break; 619 } 620 621 case kWhatSwitchDown: 622 { 623 onSwitchDown(); 624 break; 625 } 626 627 default: 628 TRESPASS(); 629 break; 630 } 631} 632 633// static 634int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 635 if (a->mBandwidth < b->mBandwidth) { 636 return -1; 637 } else if (a->mBandwidth == b->mBandwidth) { 638 return 0; 639 } 640 641 return 1; 642} 643 644// static 645LiveSession::StreamType LiveSession::indexToType(int idx) { 646 CHECK(idx >= 0 && idx < kMaxStreams); 647 return (StreamType)(1 << idx); 648} 649 650// static 651ssize_t LiveSession::typeToIndex(int32_t type) { 652 switch (type) { 653 case STREAMTYPE_AUDIO: 654 return 0; 655 case STREAMTYPE_VIDEO: 656 return 1; 657 case STREAMTYPE_SUBTITLES: 658 return 2; 659 default: 660 return -1; 661 }; 662 return -1; 663} 664 665void LiveSession::onConnect(const sp<AMessage> &msg) { 666 AString url; 667 CHECK(msg->findString("url", &url)); 668 669 KeyedVector<String8, String8> *headers = NULL; 670 if (!msg->findPointer("headers", (void **)&headers)) { 671 mExtraHeaders.clear(); 672 } else { 673 mExtraHeaders = *headers; 674 675 delete headers; 676 headers = NULL; 677 } 678 679 // TODO currently we don't know if we are coming here from incognito mode 680 ALOGI("onConnect %s", uriDebugString(url).c_str()); 681 682 mMasterURL = url; 683 684 bool dummy; 685 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 686 687 if (mPlaylist == NULL) { 688 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 689 690 postPrepared(ERROR_IO); 691 return; 692 } 693 694 // We trust the content provider to make a reasonable choice of preferred 695 // initial bandwidth by listing it first in the variant playlist. 696 // At startup we really don't have a good estimate on the available 697 // network bandwidth since we haven't tranferred any data yet. Once 698 // we have we can make a better informed choice. 699 size_t initialBandwidth = 0; 700 size_t initialBandwidthIndex = 0; 701 702 if (mPlaylist->isVariantPlaylist()) { 703 for (size_t i = 0; i < mPlaylist->size(); ++i) { 704 BandwidthItem item; 705 706 item.mPlaylistIndex = i; 707 708 sp<AMessage> meta; 709 AString uri; 710 mPlaylist->itemAt(i, &uri, &meta); 711 712 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 713 714 if (initialBandwidth == 0) { 715 initialBandwidth = item.mBandwidth; 716 } 717 718 mBandwidthItems.push(item); 719 } 720 721 CHECK_GT(mBandwidthItems.size(), 0u); 722 723 mBandwidthItems.sort(SortByBandwidth); 724 725 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 726 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 727 initialBandwidthIndex = i; 728 break; 729 } 730 } 731 } else { 732 // dummy item. 733 BandwidthItem item; 734 item.mPlaylistIndex = 0; 735 item.mBandwidth = 0; 736 mBandwidthItems.push(item); 737 } 738 739 mPlaylist->pickRandomMediaItems(); 740 changeConfiguration( 741 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 742} 743 744void LiveSession::finishDisconnect() { 745 // No reconfiguration is currently pending, make sure none will trigger 746 // during disconnection either. 747 cancelCheckBandwidthEvent(); 748 749 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 750 // (finishDisconnect, onFinishDisconnect2) 751 cancelBandwidthSwitch(); 752 753 // cancel switch down monitor 754 mSwitchDownMonitor.clear(); 755 756 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 757 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 758 } 759 760 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 761 762 mContinuationCounter = mFetcherInfos.size(); 763 mContinuation = msg; 764 765 if (mContinuationCounter == 0) { 766 msg->post(); 767 } 768} 769 770void LiveSession::onFinishDisconnect2() { 771 mContinuation.clear(); 772 773 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 774 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 775 776 mPacketSources.valueFor( 777 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 778 779 sp<AMessage> response = new AMessage; 780 response->setInt32("err", OK); 781 782 response->postReply(mDisconnectReplyID); 783 mDisconnectReplyID = 0; 784} 785 786sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 787 ssize_t index = mFetcherInfos.indexOfKey(uri); 788 789 if (index >= 0) { 790 return NULL; 791 } 792 793 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 794 notify->setString("uri", uri); 795 notify->setInt32("switchGeneration", mSwitchGeneration); 796 797 FetcherInfo info; 798 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 799 info.mDurationUs = -1ll; 800 info.mIsPrepared = false; 801 info.mToBeRemoved = false; 802 looper()->registerHandler(info.mFetcher); 803 804 mFetcherInfos.add(uri, info); 805 806 return info.mFetcher; 807} 808 809/* 810 * Illustration of parameters: 811 * 812 * 0 `range_offset` 813 * +------------+-------------------------------------------------------+--+--+ 814 * | | | next block to fetch | | | 815 * | | `source` handle => `out` buffer | | | | 816 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 817 * | |<----------- `range_length` / buffer capacity ----------->| | 818 * |<------------------------------ file_size ------------------------------->| 819 * 820 * Special parameter values: 821 * - range_length == -1 means entire file 822 * - block_size == 0 means entire range 823 * 824 */ 825ssize_t LiveSession::fetchFile( 826 const char *url, sp<ABuffer> *out, 827 int64_t range_offset, int64_t range_length, 828 uint32_t block_size, /* download block size */ 829 sp<DataSource> *source, /* to return and reuse source */ 830 String8 *actualUrl) { 831 off64_t size; 832 sp<DataSource> temp_source; 833 if (source == NULL) { 834 source = &temp_source; 835 } 836 837 if (*source == NULL) { 838 if (!strncasecmp(url, "file://", 7)) { 839 *source = new FileSource(url + 7); 840 } else if (strncasecmp(url, "http://", 7) 841 && strncasecmp(url, "https://", 8)) { 842 return ERROR_UNSUPPORTED; 843 } else { 844 KeyedVector<String8, String8> headers = mExtraHeaders; 845 if (range_offset > 0 || range_length >= 0) { 846 headers.add( 847 String8("Range"), 848 String8( 849 StringPrintf( 850 "bytes=%lld-%s", 851 range_offset, 852 range_length < 0 853 ? "" : StringPrintf("%lld", 854 range_offset + range_length - 1).c_str()).c_str())); 855 } 856 status_t err = mHTTPDataSource->connect(url, &headers); 857 858 if (err != OK) { 859 return err; 860 } 861 862 *source = mHTTPDataSource; 863 } 864 } 865 866 status_t getSizeErr = (*source)->getSize(&size); 867 if (getSizeErr != OK) { 868 size = 65536; 869 } 870 871 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 872 if (*out == NULL) { 873 buffer->setRange(0, 0); 874 } 875 876 ssize_t bytesRead = 0; 877 // adjust range_length if only reading partial block 878 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 879 range_length = buffer->size() + block_size; 880 } 881 for (;;) { 882 // Only resize when we don't know the size. 883 size_t bufferRemaining = buffer->capacity() - buffer->size(); 884 if (bufferRemaining == 0 && getSizeErr != OK) { 885 size_t bufferIncrement = buffer->size() / 2; 886 if (bufferIncrement < 32768) { 887 bufferIncrement = 32768; 888 } 889 bufferRemaining = bufferIncrement; 890 891 ALOGV("increasing download buffer to %zu bytes", 892 buffer->size() + bufferRemaining); 893 894 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 895 memcpy(copy->data(), buffer->data(), buffer->size()); 896 copy->setRange(0, buffer->size()); 897 898 buffer = copy; 899 } 900 901 size_t maxBytesToRead = bufferRemaining; 902 if (range_length >= 0) { 903 int64_t bytesLeftInRange = range_length - buffer->size(); 904 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 905 maxBytesToRead = bytesLeftInRange; 906 907 if (bytesLeftInRange == 0) { 908 break; 909 } 910 } 911 } 912 913 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 914 // to help us break out of the loop. 915 ssize_t n = (*source)->readAt( 916 buffer->size(), buffer->data() + buffer->size(), 917 maxBytesToRead); 918 919 if (n < 0) { 920 return n; 921 } 922 923 if (n == 0) { 924 break; 925 } 926 927 buffer->setRange(0, buffer->size() + (size_t)n); 928 bytesRead += n; 929 } 930 931 *out = buffer; 932 if (actualUrl != NULL) { 933 *actualUrl = (*source)->getUri(); 934 if (actualUrl->isEmpty()) { 935 *actualUrl = url; 936 } 937 } 938 939 return bytesRead; 940} 941 942sp<M3UParser> LiveSession::fetchPlaylist( 943 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 944 ALOGV("fetchPlaylist '%s'", url); 945 946 *unchanged = false; 947 948 sp<ABuffer> buffer; 949 String8 actualUrl; 950 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 951 952 if (err <= 0) { 953 return NULL; 954 } 955 956 // MD5 functionality is not available on the simulator, treat all 957 // playlists as changed. 958 959#if defined(HAVE_ANDROID_OS) 960 uint8_t hash[16]; 961 962 MD5_CTX m; 963 MD5_Init(&m); 964 MD5_Update(&m, buffer->data(), buffer->size()); 965 966 MD5_Final(hash, &m); 967 968 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 969 // playlist unchanged 970 *unchanged = true; 971 972 return NULL; 973 } 974 975 if (curPlaylistHash != NULL) { 976 memcpy(curPlaylistHash, hash, sizeof(hash)); 977 } 978#endif 979 980 sp<M3UParser> playlist = 981 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 982 983 if (playlist->initCheck() != OK) { 984 ALOGE("failed to parse .m3u8 playlist"); 985 986 return NULL; 987 } 988 989 return playlist; 990} 991 992#if 0 993static double uniformRand() { 994 return (double)rand() / RAND_MAX; 995} 996#endif 997 998size_t LiveSession::getBandwidthIndex() { 999 if (mBandwidthItems.size() == 0) { 1000 return 0; 1001 } 1002 1003#if 1 1004 char value[PROPERTY_VALUE_MAX]; 1005 ssize_t index = -1; 1006 if (property_get("media.httplive.bw-index", value, NULL)) { 1007 char *end; 1008 index = strtol(value, &end, 10); 1009 CHECK(end > value && *end == '\0'); 1010 1011 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1012 index = mBandwidthItems.size() - 1; 1013 } 1014 } 1015 1016 if (index < 0) { 1017 int32_t bandwidthBps; 1018 if (mHTTPDataSource != NULL 1019 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 1020 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 1021 } else { 1022 ALOGV("no bandwidth estimate."); 1023 return 0; // Pick the lowest bandwidth stream by default. 1024 } 1025 1026 char value[PROPERTY_VALUE_MAX]; 1027 if (property_get("media.httplive.max-bw", value, NULL)) { 1028 char *end; 1029 long maxBw = strtoul(value, &end, 10); 1030 if (end > value && *end == '\0') { 1031 if (maxBw > 0 && bandwidthBps > maxBw) { 1032 ALOGV("bandwidth capped to %ld bps", maxBw); 1033 bandwidthBps = maxBw; 1034 } 1035 } 1036 } 1037 1038 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1039 1040 index = mBandwidthItems.size() - 1; 1041 while (index > 0) { 1042 // consider only 80% of the available bandwidth, but if we are switching up, 1043 // be even more conservative (70%) to avoid overestimating and immediately 1044 // switching back. 1045 size_t adjustedBandwidthBps = bandwidthBps; 1046 if (index > mCurBandwidthIndex) { 1047 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1048 } else { 1049 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1050 } 1051 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1052 break; 1053 } 1054 --index; 1055 } 1056 } 1057#elif 0 1058 // Change bandwidth at random() 1059 size_t index = uniformRand() * mBandwidthItems.size(); 1060#elif 0 1061 // There's a 50% chance to stay on the current bandwidth and 1062 // a 50% chance to switch to the next higher bandwidth (wrapping around 1063 // to lowest) 1064 const size_t kMinIndex = 0; 1065 1066 static ssize_t mCurBandwidthIndex = -1; 1067 1068 size_t index; 1069 if (mCurBandwidthIndex < 0) { 1070 index = kMinIndex; 1071 } else if (uniformRand() < 0.5) { 1072 index = (size_t)mCurBandwidthIndex; 1073 } else { 1074 index = mCurBandwidthIndex + 1; 1075 if (index == mBandwidthItems.size()) { 1076 index = kMinIndex; 1077 } 1078 } 1079 mCurBandwidthIndex = index; 1080#elif 0 1081 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1082 1083 size_t index = mBandwidthItems.size() - 1; 1084 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1085 --index; 1086 } 1087#elif 1 1088 char value[PROPERTY_VALUE_MAX]; 1089 size_t index; 1090 if (property_get("media.httplive.bw-index", value, NULL)) { 1091 char *end; 1092 index = strtoul(value, &end, 10); 1093 CHECK(end > value && *end == '\0'); 1094 1095 if (index >= mBandwidthItems.size()) { 1096 index = mBandwidthItems.size() - 1; 1097 } 1098 } else { 1099 index = 0; 1100 } 1101#else 1102 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1103#endif 1104 1105 CHECK_GE(index, 0); 1106 1107 return index; 1108} 1109 1110int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1111 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1112 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1113 if (audioMeta != NULL) { 1114 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1115 } 1116 1117 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1118 if (videoMeta != NULL 1119 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1120 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1121 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1122 } 1123 1124 } 1125 return minSegmentStartTimeUs; 1126} 1127 1128status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1129 int64_t timeUs; 1130 CHECK(msg->findInt64("timeUs", &timeUs)); 1131 1132 if (!mReconfigurationInProgress) { 1133 changeConfiguration(timeUs, mCurBandwidthIndex); 1134 return OK; 1135 } else { 1136 return -EWOULDBLOCK; 1137 } 1138} 1139 1140status_t LiveSession::getDuration(int64_t *durationUs) const { 1141 int64_t maxDurationUs = -1ll; 1142 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1143 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1144 1145 if (fetcherDurationUs > maxDurationUs) { 1146 maxDurationUs = fetcherDurationUs; 1147 } 1148 } 1149 1150 *durationUs = maxDurationUs; 1151 1152 return OK; 1153} 1154 1155bool LiveSession::isSeekable() const { 1156 int64_t durationUs; 1157 return getDuration(&durationUs) == OK && durationUs >= 0; 1158} 1159 1160bool LiveSession::hasDynamicDuration() const { 1161 return false; 1162} 1163 1164size_t LiveSession::getTrackCount() const { 1165 if (mPlaylist == NULL) { 1166 return 0; 1167 } else { 1168 return mPlaylist->getTrackCount(); 1169 } 1170} 1171 1172sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1173 if (mPlaylist == NULL) { 1174 return NULL; 1175 } else { 1176 return mPlaylist->getTrackInfo(trackIndex); 1177 } 1178} 1179 1180status_t LiveSession::selectTrack(size_t index, bool select) { 1181 if (mPlaylist == NULL) { 1182 return INVALID_OPERATION; 1183 } 1184 1185 ++mSubtitleGeneration; 1186 status_t err = mPlaylist->selectTrack(index, select); 1187 if (err == OK) { 1188 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1189 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1190 msg->setInt32("pickTrack", select); 1191 msg->post(); 1192 } 1193 return err; 1194} 1195 1196ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1197 if (mPlaylist == NULL) { 1198 return -1; 1199 } else { 1200 return mPlaylist->getSelectedTrack(type); 1201 } 1202} 1203 1204bool LiveSession::canSwitchUp() { 1205 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1206 status_t err = OK; 1207 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1208 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1209 int64_t dur = source->getBufferedDurationUs(&err); 1210 if (err == OK && dur > 10000000) { 1211 return true; 1212 } 1213 } 1214 return false; 1215} 1216 1217void LiveSession::changeConfiguration( 1218 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1219 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1220 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1221 cancelBandwidthSwitch(); 1222 1223 CHECK(!mReconfigurationInProgress); 1224 mReconfigurationInProgress = true; 1225 1226 mCurBandwidthIndex = bandwidthIndex; 1227 1228 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1229 timeUs, bandwidthIndex, pickTrack); 1230 1231 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1232 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1233 1234 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1235 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1236 1237 AString URIs[kMaxStreams]; 1238 for (size_t i = 0; i < kMaxStreams; ++i) { 1239 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1240 streamMask |= indexToType(i); 1241 } 1242 } 1243 1244 // Step 1, stop and discard fetchers that are no longer needed. 1245 // Pause those that we'll reuse. 1246 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1247 const AString &uri = mFetcherInfos.keyAt(i); 1248 1249 bool discardFetcher = true; 1250 1251 // If we're seeking all current fetchers are discarded. 1252 if (timeUs < 0ll) { 1253 // delay fetcher removal if not picking tracks 1254 discardFetcher = pickTrack; 1255 1256 for (size_t j = 0; j < kMaxStreams; ++j) { 1257 StreamType type = indexToType(j); 1258 if ((streamMask & type) && uri == URIs[j]) { 1259 resumeMask |= type; 1260 streamMask &= ~type; 1261 discardFetcher = false; 1262 } 1263 } 1264 } 1265 1266 if (discardFetcher) { 1267 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1268 } else { 1269 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1270 } 1271 } 1272 1273 sp<AMessage> msg; 1274 if (timeUs < 0ll) { 1275 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1276 msg = new AMessage(kWhatChangeConfiguration3, id()); 1277 } else { 1278 msg = new AMessage(kWhatChangeConfiguration2, id()); 1279 } 1280 msg->setInt32("streamMask", streamMask); 1281 msg->setInt32("resumeMask", resumeMask); 1282 msg->setInt32("pickTrack", pickTrack); 1283 msg->setInt64("timeUs", timeUs); 1284 for (size_t i = 0; i < kMaxStreams; ++i) { 1285 if ((streamMask | resumeMask) & indexToType(i)) { 1286 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1287 } 1288 } 1289 1290 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1291 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1292 // fetchers have completed their asynchronous operation, we'll post 1293 // mContinuation, which then is handled below in onChangeConfiguration2. 1294 mContinuationCounter = mFetcherInfos.size(); 1295 mContinuation = msg; 1296 1297 if (mContinuationCounter == 0) { 1298 msg->post(); 1299 1300 if (mSeekReplyID != 0) { 1301 CHECK(mSeekReply != NULL); 1302 mSeekReply->setInt32("err", OK); 1303 mSeekReply->postReply(mSeekReplyID); 1304 mSeekReplyID = 0; 1305 mSeekReply.clear(); 1306 } 1307 } 1308} 1309 1310void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1311 if (!mReconfigurationInProgress) { 1312 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1313 msg->findInt32("pickTrack", &pickTrack); 1314 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1315 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1316 } else { 1317 msg->post(1000000ll); // retry in 1 sec 1318 } 1319} 1320 1321void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1322 mContinuation.clear(); 1323 1324 // All fetchers are either suspended or have been removed now. 1325 1326 uint32_t streamMask, resumeMask; 1327 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1328 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1329 1330 // currently onChangeConfiguration2 is only called for seeking; 1331 // remove the following CHECK if using it else where. 1332 CHECK_EQ(resumeMask, 0); 1333 streamMask |= resumeMask; 1334 1335 AString URIs[kMaxStreams]; 1336 for (size_t i = 0; i < kMaxStreams; ++i) { 1337 if (streamMask & indexToType(i)) { 1338 const AString &uriKey = mStreams[i].uriKey(); 1339 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1340 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1341 } 1342 } 1343 1344 // Determine which decoders to shutdown on the player side, 1345 // a decoder has to be shutdown if either 1346 // 1) its streamtype was active before but now longer isn't. 1347 // or 1348 // 2) its streamtype was already active and still is but the URI 1349 // has changed. 1350 uint32_t changedMask = 0; 1351 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1352 if (((mStreamMask & streamMask & indexToType(i)) 1353 && !(URIs[i] == mStreams[i].mUri)) 1354 || (mStreamMask & ~streamMask & indexToType(i))) { 1355 changedMask |= indexToType(i); 1356 } 1357 } 1358 1359 if (changedMask == 0) { 1360 // If nothing changed as far as the audio/video decoders 1361 // are concerned we can proceed. 1362 onChangeConfiguration3(msg); 1363 return; 1364 } 1365 1366 // Something changed, inform the player which will shutdown the 1367 // corresponding decoders and will post the reply once that's done. 1368 // Handling the reply will continue executing below in 1369 // onChangeConfiguration3. 1370 sp<AMessage> notify = mNotify->dup(); 1371 notify->setInt32("what", kWhatStreamsChanged); 1372 notify->setInt32("changedMask", changedMask); 1373 1374 msg->setWhat(kWhatChangeConfiguration3); 1375 msg->setTarget(id()); 1376 1377 notify->setMessage("reply", msg); 1378 notify->post(); 1379} 1380 1381void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1382 mContinuation.clear(); 1383 // All remaining fetchers are still suspended, the player has shutdown 1384 // any decoders that needed it. 1385 1386 uint32_t streamMask, resumeMask; 1387 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1388 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1389 1390 int64_t timeUs; 1391 int32_t pickTrack; 1392 bool switching = false; 1393 CHECK(msg->findInt64("timeUs", &timeUs)); 1394 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1395 1396 if (timeUs < 0ll) { 1397 if (!pickTrack) { 1398 switching = true; 1399 } 1400 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1401 } else { 1402 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1403 } 1404 1405 for (size_t i = 0; i < kMaxStreams; ++i) { 1406 if (streamMask & indexToType(i)) { 1407 if (switching) { 1408 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1409 } else { 1410 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1411 } 1412 } 1413 } 1414 1415 mNewStreamMask = streamMask | resumeMask; 1416 if (switching) { 1417 mSwapMask = mStreamMask & ~resumeMask; 1418 } 1419 1420 // Of all existing fetchers: 1421 // * Resume fetchers that are still needed and assign them original packet sources. 1422 // * Mark otherwise unneeded fetchers for removal. 1423 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1424 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1425 const AString &uri = mFetcherInfos.keyAt(i); 1426 1427 sp<AnotherPacketSource> sources[kMaxStreams]; 1428 for (size_t j = 0; j < kMaxStreams; ++j) { 1429 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1430 sources[j] = mPacketSources.valueFor(indexToType(j)); 1431 1432 if (j != kSubtitleIndex) { 1433 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1434 sp<AnotherPacketSource> discontinuityQueue; 1435 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1436 discontinuityQueue->queueDiscontinuity( 1437 ATSParser::DISCONTINUITY_NONE, 1438 NULL, 1439 true); 1440 } 1441 } 1442 } 1443 1444 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1445 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1446 || sources[kSubtitleIndex] != NULL) { 1447 info.mFetcher->startAsync( 1448 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1449 } else { 1450 info.mToBeRemoved = true; 1451 } 1452 } 1453 1454 // streamMask now only contains the types that need a new fetcher created. 1455 1456 if (streamMask != 0) { 1457 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1458 } 1459 1460 // Find out when the original fetchers have buffered up to and start the new fetchers 1461 // at a later timestamp. 1462 for (size_t i = 0; i < kMaxStreams; i++) { 1463 if (!(indexToType(i) & streamMask)) { 1464 continue; 1465 } 1466 1467 AString uri; 1468 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1469 1470 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1471 CHECK(fetcher != NULL); 1472 1473 int64_t startTimeUs = -1; 1474 int64_t segmentStartTimeUs = -1ll; 1475 int32_t discontinuitySeq = -1; 1476 sp<AnotherPacketSource> sources[kMaxStreams]; 1477 1478 if (i == kSubtitleIndex) { 1479 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1480 } 1481 1482 // TRICKY: looping from i as earlier streams are already removed from streamMask 1483 for (size_t j = i; j < kMaxStreams; ++j) { 1484 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1485 if ((streamMask & indexToType(j)) && uri == streamUri) { 1486 sources[j] = mPacketSources.valueFor(indexToType(j)); 1487 1488 if (timeUs >= 0) { 1489 sources[j]->clear(); 1490 startTimeUs = timeUs; 1491 1492 sp<AnotherPacketSource> discontinuityQueue; 1493 sp<AMessage> extra = new AMessage; 1494 extra->setInt64("timeUs", timeUs); 1495 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1496 discontinuityQueue->queueDiscontinuity( 1497 ATSParser::DISCONTINUITY_TIME, extra, true); 1498 } else { 1499 int32_t type; 1500 sp<AMessage> meta; 1501 if (pickTrack) { 1502 // selecting 1503 meta = sources[j]->getLatestDequeuedMeta(); 1504 } else { 1505 // adapting 1506 meta = sources[j]->getLatestEnqueuedMeta(); 1507 } 1508 1509 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1510 int64_t tmpUs; 1511 CHECK(meta->findInt64("timeUs", &tmpUs)); 1512 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1513 startTimeUs = tmpUs; 1514 } 1515 1516 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1517 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1518 segmentStartTimeUs = tmpUs; 1519 } 1520 1521 int32_t seq; 1522 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1523 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1524 discontinuitySeq = seq; 1525 } 1526 } 1527 1528 if (pickTrack) { 1529 // selecting track, queue discontinuities before content 1530 sources[j]->clear(); 1531 if (j == kSubtitleIndex) { 1532 break; 1533 } 1534 sp<AnotherPacketSource> discontinuityQueue; 1535 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1536 discontinuityQueue->queueDiscontinuity( 1537 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1538 } else { 1539 // adapting, queue discontinuities after resume 1540 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1541 sources[j]->clear(); 1542 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1543 if (extraStreams & indexToType(j)) { 1544 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1545 } 1546 } 1547 } 1548 1549 streamMask &= ~indexToType(j); 1550 } 1551 } 1552 1553 fetcher->startAsync( 1554 sources[kAudioIndex], 1555 sources[kVideoIndex], 1556 sources[kSubtitleIndex], 1557 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1558 segmentStartTimeUs, 1559 discontinuitySeq, 1560 switching); 1561 } 1562 1563 // All fetchers have now been started, the configuration change 1564 // has completed. 1565 1566 cancelCheckBandwidthEvent(); 1567 scheduleCheckBandwidthEvent(); 1568 1569 ALOGV("XXX configuration change completed."); 1570 mReconfigurationInProgress = false; 1571 if (switching) { 1572 mSwitchInProgress = true; 1573 } else { 1574 mStreamMask = mNewStreamMask; 1575 } 1576 1577 if (mDisconnectReplyID != 0) { 1578 finishDisconnect(); 1579 } 1580} 1581 1582void LiveSession::onSwapped(const sp<AMessage> &msg) { 1583 int32_t switchGeneration; 1584 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1585 if (switchGeneration != mSwitchGeneration) { 1586 return; 1587 } 1588 1589 int32_t stream; 1590 CHECK(msg->findInt32("stream", &stream)); 1591 1592 ssize_t idx = typeToIndex(stream); 1593 CHECK(idx >= 0); 1594 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1595 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1596 } 1597 mStreams[idx].mUri = mStreams[idx].mNewUri; 1598 mStreams[idx].mNewUri.clear(); 1599 1600 mSwapMask &= ~stream; 1601 if (mSwapMask != 0) { 1602 return; 1603 } 1604 1605 // Check if new variant contains extra streams. 1606 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1607 while (extraStreams) { 1608 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1609 swapPacketSource(extraStream); 1610 extraStreams &= ~extraStream; 1611 1612 idx = typeToIndex(extraStream); 1613 CHECK(idx >= 0); 1614 if (mStreams[idx].mNewUri.empty()) { 1615 ALOGW("swapping extra stream type %d %s to empty stream", 1616 extraStream, mStreams[idx].mUri.c_str()); 1617 } 1618 mStreams[idx].mUri = mStreams[idx].mNewUri; 1619 mStreams[idx].mNewUri.clear(); 1620 } 1621 1622 tryToFinishBandwidthSwitch(); 1623} 1624 1625void LiveSession::onCheckSwitchDown() { 1626 if (mSwitchDownMonitor == NULL) { 1627 return; 1628 } 1629 1630 if (mSwitchInProgress || mReconfigurationInProgress) { 1631 ALOGV("Switch/Reconfig in progress, defer switch down"); 1632 mSwitchDownMonitor->post(1000000ll); 1633 return; 1634 } 1635 1636 for (size_t i = 0; i < kMaxStreams; ++i) { 1637 int32_t targetDuration; 1638 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1639 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1640 1641 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1642 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1643 int64_t targetDurationUs = targetDuration * 1000000ll; 1644 1645 if (bufferedDurationUs < targetDurationUs / 3) { 1646 (new AMessage(kWhatSwitchDown, id()))->post(); 1647 break; 1648 } 1649 } 1650 } 1651 1652 mSwitchDownMonitor->post(1000000ll); 1653} 1654 1655void LiveSession::onSwitchDown() { 1656 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1657 return; 1658 } 1659 1660 ssize_t bandwidthIndex = getBandwidthIndex(); 1661 if (bandwidthIndex < mCurBandwidthIndex) { 1662 changeConfiguration(-1, bandwidthIndex, false); 1663 return; 1664 } 1665 1666} 1667 1668// Mark switch done when: 1669// 1. all old buffers are swapped out 1670void LiveSession::tryToFinishBandwidthSwitch() { 1671 if (!mSwitchInProgress) { 1672 return; 1673 } 1674 1675 bool needToRemoveFetchers = false; 1676 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1677 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1678 needToRemoveFetchers = true; 1679 break; 1680 } 1681 } 1682 1683 if (!needToRemoveFetchers && mSwapMask == 0) { 1684 ALOGI("mSwitchInProgress = false"); 1685 mStreamMask = mNewStreamMask; 1686 mSwitchInProgress = false; 1687 } 1688} 1689 1690void LiveSession::scheduleCheckBandwidthEvent() { 1691 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1692 msg->setInt32("generation", mCheckBandwidthGeneration); 1693 msg->post(10000000ll); 1694} 1695 1696void LiveSession::cancelCheckBandwidthEvent() { 1697 ++mCheckBandwidthGeneration; 1698} 1699 1700void LiveSession::cancelBandwidthSwitch() { 1701 Mutex::Autolock lock(mSwapMutex); 1702 mSwitchGeneration++; 1703 mSwitchInProgress = false; 1704 mSwapMask = 0; 1705 1706 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1707 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1708 if (info.mToBeRemoved) { 1709 info.mToBeRemoved = false; 1710 } 1711 } 1712 1713 for (size_t i = 0; i < kMaxStreams; ++i) { 1714 if (!mStreams[i].mNewUri.empty()) { 1715 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1716 if (j < 0) { 1717 mStreams[i].mNewUri.clear(); 1718 continue; 1719 } 1720 1721 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1722 info.mFetcher->stopAsync(); 1723 mFetcherInfos.removeItemsAt(j); 1724 mStreams[i].mNewUri.clear(); 1725 } 1726 } 1727} 1728 1729bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1730 if (mReconfigurationInProgress || mSwitchInProgress) { 1731 return false; 1732 } 1733 1734 if (mCurBandwidthIndex < 0) { 1735 return true; 1736 } 1737 1738 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1739 return false; 1740 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1741 return canSwitchUp(); 1742 } else { 1743 return true; 1744 } 1745} 1746 1747void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1748 size_t bandwidthIndex = getBandwidthIndex(); 1749 if (canSwitchBandwidthTo(bandwidthIndex)) { 1750 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1751 } else { 1752 // Come back and check again 10 seconds later in case there is nothing to do now. 1753 // If we DO change configuration, once that completes it'll schedule a new 1754 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1755 msg->post(10000000ll); 1756 } 1757} 1758 1759void LiveSession::postPrepared(status_t err) { 1760 CHECK(mInPreparationPhase); 1761 1762 sp<AMessage> notify = mNotify->dup(); 1763 if (err == OK || err == ERROR_END_OF_STREAM) { 1764 notify->setInt32("what", kWhatPrepared); 1765 } else { 1766 notify->setInt32("what", kWhatPreparationFailed); 1767 notify->setInt32("err", err); 1768 } 1769 1770 notify->post(); 1771 1772 mInPreparationPhase = false; 1773 1774 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1775 mSwitchDownMonitor->post(); 1776} 1777 1778} // namespace android 1779 1780