LiveSession.cpp revision 678bcdc852dd8f801f5c46fdc85db587b721d83d
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52// Number of recently-read bytes to use for bandwidth estimation 53const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; 54 55LiveSession::LiveSession( 56 const sp<AMessage> ¬ify, uint32_t flags, 57 const sp<IMediaHTTPService> &httpService) 58 : mNotify(notify), 59 mFlags(flags), 60 mHTTPService(httpService), 61 mInPreparationPhase(true), 62 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 63 mCurBandwidthIndex(-1), 64 mStreamMask(0), 65 mNewStreamMask(0), 66 mSwapMask(0), 67 mCheckBandwidthGeneration(0), 68 mSwitchGeneration(0), 69 mSubtitleGeneration(0), 70 mLastDequeuedTimeUs(0ll), 71 mRealTimeBaseUs(0ll), 72 mReconfigurationInProgress(false), 73 mSwitchInProgress(false), 74 mDisconnectReplyID(0), 75 mSeekReplyID(0), 76 mFirstTimeUsValid(false), 77 mFirstTimeUs(0), 78 mLastSeekTimeUs(0) { 79 80 mStreams[kAudioIndex] = StreamItem("audio"); 81 mStreams[kVideoIndex] = StreamItem("video"); 82 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 83 84 for (size_t i = 0; i < kMaxStreams; ++i) { 85 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 86 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 87 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 88 mBuffering[i] = false; 89 } 90 91 size_t numHistoryItems = kBandwidthHistoryBytes / 92 PlaylistFetcher::kDownloadBlockSize + 1; 93 if (numHistoryItems < 5) { 94 numHistoryItems = 5; 95 } 96 mHTTPDataSource->setBandwidthHistorySize(numHistoryItems); 97} 98 99LiveSession::~LiveSession() { 100} 101 102sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 103 ABuffer *discontinuity = new ABuffer(0); 104 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 105 discontinuity->meta()->setInt32("swapPacketSource", swap); 106 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 107 discontinuity->meta()->setInt64("timeUs", -1); 108 return discontinuity; 109} 110 111void LiveSession::swapPacketSource(StreamType stream) { 112 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 113 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 114 sp<AnotherPacketSource> tmp = aps; 115 aps = aps2; 116 aps2 = tmp; 117 aps2->clear(); 118} 119 120status_t LiveSession::dequeueAccessUnit( 121 StreamType stream, sp<ABuffer> *accessUnit) { 122 if (!(mStreamMask & stream)) { 123 // return -EWOULDBLOCK to avoid halting the decoder 124 // when switching between audio/video and audio only. 125 return -EWOULDBLOCK; 126 } 127 128 status_t finalResult; 129 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 130 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 131 discontinuityQueue->dequeueAccessUnit(accessUnit); 132 // seeking, track switching 133 sp<AMessage> extra; 134 int64_t timeUs; 135 if ((*accessUnit)->meta()->findMessage("extra", &extra) 136 && extra != NULL 137 && extra->findInt64("timeUs", &timeUs)) { 138 // seeking only 139 mLastSeekTimeUs = timeUs; 140 mDiscontinuityOffsetTimesUs.clear(); 141 mDiscontinuityAbsStartTimesUs.clear(); 142 } 143 return INFO_DISCONTINUITY; 144 } 145 146 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 147 148 ssize_t idx = typeToIndex(stream); 149 if (!packetSource->hasBufferAvailable(&finalResult)) { 150 if (finalResult == OK) { 151 mBuffering[idx] = true; 152 return -EAGAIN; 153 } else { 154 return finalResult; 155 } 156 } 157 158 int32_t targetDuration = 0; 159 sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); 160 if (meta != NULL) { 161 meta->findInt32("targetDuration", &targetDuration); 162 } 163 164 int64_t targetDurationUs = targetDuration * 1000000ll; 165 if (targetDurationUs == 0 || 166 targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { 167 // Fetchers limit buffering to 168 // min(3 * targetDuration, kMinBufferedDurationUs) 169 targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; 170 } 171 172 if (mBuffering[idx]) { 173 if (mSwitchInProgress 174 || packetSource->isFinished(0) 175 || packetSource->getEstimatedDurationUs() > targetDurationUs) { 176 mBuffering[idx] = false; 177 } 178 } 179 180 if (mBuffering[idx]) { 181 return -EAGAIN; 182 } 183 184 // wait for counterpart 185 sp<AnotherPacketSource> otherSource; 186 uint32_t mask = mNewStreamMask & mStreamMask; 187 uint32_t fetchersMask = 0; 188 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 189 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 190 fetchersMask |= fetcherMask; 191 } 192 mask &= fetchersMask; 193 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 194 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 195 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 196 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 197 } 198 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 199 return finalResult == OK ? -EAGAIN : finalResult; 200 } 201 202 status_t err = packetSource->dequeueAccessUnit(accessUnit); 203 204 size_t streamIdx; 205 const char *streamStr; 206 switch (stream) { 207 case STREAMTYPE_AUDIO: 208 streamIdx = kAudioIndex; 209 streamStr = "audio"; 210 break; 211 case STREAMTYPE_VIDEO: 212 streamIdx = kVideoIndex; 213 streamStr = "video"; 214 break; 215 case STREAMTYPE_SUBTITLES: 216 streamIdx = kSubtitleIndex; 217 streamStr = "subs"; 218 break; 219 default: 220 TRESPASS(); 221 } 222 223 StreamItem& strm = mStreams[streamIdx]; 224 if (err == INFO_DISCONTINUITY) { 225 // adaptive streaming, discontinuities in the playlist 226 int32_t type; 227 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 228 229 sp<AMessage> extra; 230 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 231 extra.clear(); 232 } 233 234 ALOGI("[%s] read discontinuity of type %d, extra = %s", 235 streamStr, 236 type, 237 extra == NULL ? "NULL" : extra->debugString().c_str()); 238 239 int32_t swap; 240 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 241 int32_t switchGeneration; 242 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 243 { 244 Mutex::Autolock lock(mSwapMutex); 245 if (switchGeneration == mSwitchGeneration) { 246 swapPacketSource(stream); 247 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 248 msg->setInt32("stream", stream); 249 msg->setInt32("switchGeneration", switchGeneration); 250 msg->post(); 251 } 252 } 253 } else { 254 size_t seq = strm.mCurDiscontinuitySeq; 255 int64_t offsetTimeUs; 256 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 257 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 258 } else { 259 offsetTimeUs = 0; 260 } 261 262 seq += 1; 263 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 264 int64_t firstTimeUs; 265 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 266 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 267 offsetTimeUs += strm.mLastSampleDurationUs; 268 } else { 269 offsetTimeUs += strm.mLastSampleDurationUs; 270 } 271 272 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 273 } 274 } else if (err == OK) { 275 276 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 277 int64_t timeUs; 278 int32_t discontinuitySeq = 0; 279 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 280 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 281 strm.mCurDiscontinuitySeq = discontinuitySeq; 282 283 int32_t discard = 0; 284 int64_t firstTimeUs; 285 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 286 int64_t durUs; // approximate sample duration 287 if (timeUs > strm.mLastDequeuedTimeUs) { 288 durUs = timeUs - strm.mLastDequeuedTimeUs; 289 } else { 290 durUs = strm.mLastDequeuedTimeUs - timeUs; 291 } 292 strm.mLastSampleDurationUs = durUs; 293 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 294 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 295 firstTimeUs = timeUs; 296 } else { 297 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 298 firstTimeUs = timeUs; 299 } 300 301 strm.mLastDequeuedTimeUs = timeUs; 302 if (timeUs >= firstTimeUs) { 303 timeUs -= firstTimeUs; 304 } else { 305 timeUs = 0; 306 } 307 timeUs += mLastSeekTimeUs; 308 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 309 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 310 } 311 312 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 313 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 314 mLastDequeuedTimeUs = timeUs; 315 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 316 } else if (stream == STREAMTYPE_SUBTITLES) { 317 int32_t subtitleGeneration; 318 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 319 && subtitleGeneration != mSubtitleGeneration) { 320 return -EAGAIN; 321 }; 322 (*accessUnit)->meta()->setInt32( 323 "trackIndex", mPlaylist->getSelectedIndex()); 324 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 325 } 326 } else { 327 ALOGI("[%s] encountered error %d", streamStr, err); 328 } 329 330 return err; 331} 332 333status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 334 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 335 if (!(mStreamMask & stream)) { 336 return UNKNOWN_ERROR; 337 } 338 339 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 340 341 sp<MetaData> meta = packetSource->getFormat(); 342 343 if (meta == NULL) { 344 return -EAGAIN; 345 } 346 347 return convertMetaDataToMessage(meta, format); 348} 349 350void LiveSession::connectAsync( 351 const char *url, const KeyedVector<String8, String8> *headers) { 352 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 353 msg->setString("url", url); 354 355 if (headers != NULL) { 356 msg->setPointer( 357 "headers", 358 new KeyedVector<String8, String8>(*headers)); 359 } 360 361 msg->post(); 362} 363 364status_t LiveSession::disconnect() { 365 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 366 367 sp<AMessage> response; 368 status_t err = msg->postAndAwaitResponse(&response); 369 370 return err; 371} 372 373status_t LiveSession::seekTo(int64_t timeUs) { 374 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 375 msg->setInt64("timeUs", timeUs); 376 377 sp<AMessage> response; 378 status_t err = msg->postAndAwaitResponse(&response); 379 380 return err; 381} 382 383void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 384 switch (msg->what()) { 385 case kWhatConnect: 386 { 387 onConnect(msg); 388 break; 389 } 390 391 case kWhatDisconnect: 392 { 393 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 394 395 if (mReconfigurationInProgress) { 396 break; 397 } 398 399 finishDisconnect(); 400 break; 401 } 402 403 case kWhatSeek: 404 { 405 uint32_t seekReplyID; 406 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 407 mSeekReplyID = seekReplyID; 408 mSeekReply = new AMessage; 409 410 status_t err = onSeek(msg); 411 412 if (err != OK) { 413 msg->post(50000); 414 } 415 break; 416 } 417 418 case kWhatFetcherNotify: 419 { 420 int32_t what; 421 CHECK(msg->findInt32("what", &what)); 422 423 switch (what) { 424 case PlaylistFetcher::kWhatStarted: 425 break; 426 case PlaylistFetcher::kWhatPaused: 427 case PlaylistFetcher::kWhatStopped: 428 { 429 if (what == PlaylistFetcher::kWhatStopped) { 430 AString uri; 431 CHECK(msg->findString("uri", &uri)); 432 if (mFetcherInfos.removeItem(uri) < 0) { 433 // ignore duplicated kWhatStopped messages. 434 break; 435 } 436 437 if (mSwitchInProgress) { 438 tryToFinishBandwidthSwitch(); 439 } 440 } 441 442 if (mContinuation != NULL) { 443 CHECK_GT(mContinuationCounter, 0); 444 if (--mContinuationCounter == 0) { 445 mContinuation->post(); 446 447 if (mSeekReplyID != 0) { 448 CHECK(mSeekReply != NULL); 449 mSeekReply->setInt32("err", OK); 450 mSeekReply->postReply(mSeekReplyID); 451 mSeekReplyID = 0; 452 mSeekReply.clear(); 453 } 454 } 455 } 456 break; 457 } 458 459 case PlaylistFetcher::kWhatDurationUpdate: 460 { 461 AString uri; 462 CHECK(msg->findString("uri", &uri)); 463 464 int64_t durationUs; 465 CHECK(msg->findInt64("durationUs", &durationUs)); 466 467 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 468 info->mDurationUs = durationUs; 469 break; 470 } 471 472 case PlaylistFetcher::kWhatError: 473 { 474 status_t err; 475 CHECK(msg->findInt32("err", &err)); 476 477 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 478 479 // handle EOS on subtitle tracks independently 480 AString uri; 481 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 482 ssize_t i = mFetcherInfos.indexOfKey(uri); 483 if (i >= 0) { 484 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 485 if (fetcher != NULL) { 486 uint32_t type = fetcher->getStreamTypeMask(); 487 if (type == STREAMTYPE_SUBTITLES) { 488 mPacketSources.valueFor( 489 STREAMTYPE_SUBTITLES)->signalEOS(err);; 490 break; 491 } 492 } 493 } 494 } 495 496 if (mInPreparationPhase) { 497 postPrepared(err); 498 } 499 500 cancelBandwidthSwitch(); 501 502 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 503 504 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 505 506 mPacketSources.valueFor( 507 STREAMTYPE_SUBTITLES)->signalEOS(err); 508 509 sp<AMessage> notify = mNotify->dup(); 510 notify->setInt32("what", kWhatError); 511 notify->setInt32("err", err); 512 notify->post(); 513 break; 514 } 515 516 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 517 { 518 AString uri; 519 CHECK(msg->findString("uri", &uri)); 520 521 if (mFetcherInfos.indexOfKey(uri) < 0) { 522 ALOGE("couldn't find uri"); 523 break; 524 } 525 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 526 info->mIsPrepared = true; 527 528 if (mInPreparationPhase) { 529 bool allFetchersPrepared = true; 530 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 531 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 532 allFetchersPrepared = false; 533 break; 534 } 535 } 536 537 if (allFetchersPrepared) { 538 postPrepared(OK); 539 } 540 } 541 break; 542 } 543 544 case PlaylistFetcher::kWhatStartedAt: 545 { 546 int32_t switchGeneration; 547 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 548 549 if (switchGeneration != mSwitchGeneration) { 550 break; 551 } 552 553 // Resume fetcher for the original variant; the resumed fetcher should 554 // continue until the timestamps found in msg, which is stored by the 555 // new fetcher to indicate where the new variant has started buffering. 556 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 557 const FetcherInfo info = mFetcherInfos.valueAt(i); 558 if (info.mToBeRemoved) { 559 info.mFetcher->resumeUntilAsync(msg); 560 } 561 } 562 break; 563 } 564 565 default: 566 TRESPASS(); 567 } 568 569 break; 570 } 571 572 case kWhatCheckBandwidth: 573 { 574 int32_t generation; 575 CHECK(msg->findInt32("generation", &generation)); 576 577 if (generation != mCheckBandwidthGeneration) { 578 break; 579 } 580 581 onCheckBandwidth(msg); 582 break; 583 } 584 585 case kWhatChangeConfiguration: 586 { 587 onChangeConfiguration(msg); 588 break; 589 } 590 591 case kWhatChangeConfiguration2: 592 { 593 onChangeConfiguration2(msg); 594 break; 595 } 596 597 case kWhatChangeConfiguration3: 598 { 599 onChangeConfiguration3(msg); 600 break; 601 } 602 603 case kWhatFinishDisconnect2: 604 { 605 onFinishDisconnect2(); 606 break; 607 } 608 609 case kWhatSwapped: 610 { 611 onSwapped(msg); 612 break; 613 } 614 615 case kWhatCheckSwitchDown: 616 { 617 onCheckSwitchDown(); 618 break; 619 } 620 621 case kWhatSwitchDown: 622 { 623 onSwitchDown(); 624 break; 625 } 626 627 default: 628 TRESPASS(); 629 break; 630 } 631} 632 633// static 634int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 635 if (a->mBandwidth < b->mBandwidth) { 636 return -1; 637 } else if (a->mBandwidth == b->mBandwidth) { 638 return 0; 639 } 640 641 return 1; 642} 643 644// static 645LiveSession::StreamType LiveSession::indexToType(int idx) { 646 CHECK(idx >= 0 && idx < kMaxStreams); 647 return (StreamType)(1 << idx); 648} 649 650// static 651ssize_t LiveSession::typeToIndex(int32_t type) { 652 switch (type) { 653 case STREAMTYPE_AUDIO: 654 return 0; 655 case STREAMTYPE_VIDEO: 656 return 1; 657 case STREAMTYPE_SUBTITLES: 658 return 2; 659 default: 660 return -1; 661 }; 662 return -1; 663} 664 665void LiveSession::onConnect(const sp<AMessage> &msg) { 666 AString url; 667 CHECK(msg->findString("url", &url)); 668 669 KeyedVector<String8, String8> *headers = NULL; 670 if (!msg->findPointer("headers", (void **)&headers)) { 671 mExtraHeaders.clear(); 672 } else { 673 mExtraHeaders = *headers; 674 675 delete headers; 676 headers = NULL; 677 } 678 679 // TODO currently we don't know if we are coming here from incognito mode 680 ALOGI("onConnect %s", uriDebugString(url).c_str()); 681 682 mMasterURL = url; 683 684 bool dummy; 685 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 686 687 if (mPlaylist == NULL) { 688 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 689 690 postPrepared(ERROR_IO); 691 return; 692 } 693 694 // We trust the content provider to make a reasonable choice of preferred 695 // initial bandwidth by listing it first in the variant playlist. 696 // At startup we really don't have a good estimate on the available 697 // network bandwidth since we haven't tranferred any data yet. Once 698 // we have we can make a better informed choice. 699 size_t initialBandwidth = 0; 700 size_t initialBandwidthIndex = 0; 701 702 if (mPlaylist->isVariantPlaylist()) { 703 for (size_t i = 0; i < mPlaylist->size(); ++i) { 704 BandwidthItem item; 705 706 item.mPlaylistIndex = i; 707 708 sp<AMessage> meta; 709 AString uri; 710 mPlaylist->itemAt(i, &uri, &meta); 711 712 unsigned long bandwidth; 713 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 714 715 if (initialBandwidth == 0) { 716 initialBandwidth = item.mBandwidth; 717 } 718 719 mBandwidthItems.push(item); 720 } 721 722 CHECK_GT(mBandwidthItems.size(), 0u); 723 724 mBandwidthItems.sort(SortByBandwidth); 725 726 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 727 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 728 initialBandwidthIndex = i; 729 break; 730 } 731 } 732 } else { 733 // dummy item. 734 BandwidthItem item; 735 item.mPlaylistIndex = 0; 736 item.mBandwidth = 0; 737 mBandwidthItems.push(item); 738 } 739 740 mPlaylist->pickRandomMediaItems(); 741 changeConfiguration( 742 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 743} 744 745void LiveSession::finishDisconnect() { 746 // No reconfiguration is currently pending, make sure none will trigger 747 // during disconnection either. 748 cancelCheckBandwidthEvent(); 749 750 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 751 // (finishDisconnect, onFinishDisconnect2) 752 cancelBandwidthSwitch(); 753 754 // cancel switch down monitor 755 mSwitchDownMonitor.clear(); 756 757 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 758 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 759 } 760 761 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 762 763 mContinuationCounter = mFetcherInfos.size(); 764 mContinuation = msg; 765 766 if (mContinuationCounter == 0) { 767 msg->post(); 768 } 769} 770 771void LiveSession::onFinishDisconnect2() { 772 mContinuation.clear(); 773 774 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 775 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 776 777 mPacketSources.valueFor( 778 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 779 780 sp<AMessage> response = new AMessage; 781 response->setInt32("err", OK); 782 783 response->postReply(mDisconnectReplyID); 784 mDisconnectReplyID = 0; 785} 786 787sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 788 ssize_t index = mFetcherInfos.indexOfKey(uri); 789 790 if (index >= 0) { 791 return NULL; 792 } 793 794 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 795 notify->setString("uri", uri); 796 notify->setInt32("switchGeneration", mSwitchGeneration); 797 798 FetcherInfo info; 799 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 800 info.mDurationUs = -1ll; 801 info.mIsPrepared = false; 802 info.mToBeRemoved = false; 803 looper()->registerHandler(info.mFetcher); 804 805 mFetcherInfos.add(uri, info); 806 807 return info.mFetcher; 808} 809 810/* 811 * Illustration of parameters: 812 * 813 * 0 `range_offset` 814 * +------------+-------------------------------------------------------+--+--+ 815 * | | | next block to fetch | | | 816 * | | `source` handle => `out` buffer | | | | 817 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 818 * | |<----------- `range_length` / buffer capacity ----------->| | 819 * |<------------------------------ file_size ------------------------------->| 820 * 821 * Special parameter values: 822 * - range_length == -1 means entire file 823 * - block_size == 0 means entire range 824 * 825 */ 826ssize_t LiveSession::fetchFile( 827 const char *url, sp<ABuffer> *out, 828 int64_t range_offset, int64_t range_length, 829 uint32_t block_size, /* download block size */ 830 sp<DataSource> *source, /* to return and reuse source */ 831 String8 *actualUrl) { 832 off64_t size; 833 sp<DataSource> temp_source; 834 if (source == NULL) { 835 source = &temp_source; 836 } 837 838 if (*source == NULL) { 839 if (!strncasecmp(url, "file://", 7)) { 840 *source = new FileSource(url + 7); 841 } else if (strncasecmp(url, "http://", 7) 842 && strncasecmp(url, "https://", 8)) { 843 return ERROR_UNSUPPORTED; 844 } else { 845 KeyedVector<String8, String8> headers = mExtraHeaders; 846 if (range_offset > 0 || range_length >= 0) { 847 headers.add( 848 String8("Range"), 849 String8( 850 StringPrintf( 851 "bytes=%lld-%s", 852 range_offset, 853 range_length < 0 854 ? "" : StringPrintf("%lld", 855 range_offset + range_length - 1).c_str()).c_str())); 856 } 857 status_t err = mHTTPDataSource->connect(url, &headers); 858 859 if (err != OK) { 860 return err; 861 } 862 863 *source = mHTTPDataSource; 864 } 865 } 866 867 status_t getSizeErr = (*source)->getSize(&size); 868 if (getSizeErr != OK) { 869 size = 65536; 870 } 871 872 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 873 if (*out == NULL) { 874 buffer->setRange(0, 0); 875 } 876 877 ssize_t bytesRead = 0; 878 // adjust range_length if only reading partial block 879 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 880 range_length = buffer->size() + block_size; 881 } 882 for (;;) { 883 // Only resize when we don't know the size. 884 size_t bufferRemaining = buffer->capacity() - buffer->size(); 885 if (bufferRemaining == 0 && getSizeErr != OK) { 886 size_t bufferIncrement = buffer->size() / 2; 887 if (bufferIncrement < 32768) { 888 bufferIncrement = 32768; 889 } 890 bufferRemaining = bufferIncrement; 891 892 ALOGV("increasing download buffer to %zu bytes", 893 buffer->size() + bufferRemaining); 894 895 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 896 memcpy(copy->data(), buffer->data(), buffer->size()); 897 copy->setRange(0, buffer->size()); 898 899 buffer = copy; 900 } 901 902 size_t maxBytesToRead = bufferRemaining; 903 if (range_length >= 0) { 904 int64_t bytesLeftInRange = range_length - buffer->size(); 905 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 906 maxBytesToRead = bytesLeftInRange; 907 908 if (bytesLeftInRange == 0) { 909 break; 910 } 911 } 912 } 913 914 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 915 // to help us break out of the loop. 916 ssize_t n = (*source)->readAt( 917 buffer->size(), buffer->data() + buffer->size(), 918 maxBytesToRead); 919 920 if (n < 0) { 921 return n; 922 } 923 924 if (n == 0) { 925 break; 926 } 927 928 buffer->setRange(0, buffer->size() + (size_t)n); 929 bytesRead += n; 930 } 931 932 *out = buffer; 933 if (actualUrl != NULL) { 934 *actualUrl = (*source)->getUri(); 935 if (actualUrl->isEmpty()) { 936 *actualUrl = url; 937 } 938 } 939 940 return bytesRead; 941} 942 943sp<M3UParser> LiveSession::fetchPlaylist( 944 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 945 ALOGV("fetchPlaylist '%s'", url); 946 947 *unchanged = false; 948 949 sp<ABuffer> buffer; 950 String8 actualUrl; 951 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 952 953 if (err <= 0) { 954 return NULL; 955 } 956 957 // MD5 functionality is not available on the simulator, treat all 958 // playlists as changed. 959 960#if defined(HAVE_ANDROID_OS) 961 uint8_t hash[16]; 962 963 MD5_CTX m; 964 MD5_Init(&m); 965 MD5_Update(&m, buffer->data(), buffer->size()); 966 967 MD5_Final(hash, &m); 968 969 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 970 // playlist unchanged 971 *unchanged = true; 972 973 return NULL; 974 } 975 976 if (curPlaylistHash != NULL) { 977 memcpy(curPlaylistHash, hash, sizeof(hash)); 978 } 979#endif 980 981 sp<M3UParser> playlist = 982 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 983 984 if (playlist->initCheck() != OK) { 985 ALOGE("failed to parse .m3u8 playlist"); 986 987 return NULL; 988 } 989 990 return playlist; 991} 992 993static double uniformRand() { 994 return (double)rand() / RAND_MAX; 995} 996 997size_t LiveSession::getBandwidthIndex() { 998 if (mBandwidthItems.size() == 0) { 999 return 0; 1000 } 1001 1002#if 1 1003 char value[PROPERTY_VALUE_MAX]; 1004 ssize_t index = -1; 1005 if (property_get("media.httplive.bw-index", value, NULL)) { 1006 char *end; 1007 index = strtol(value, &end, 10); 1008 CHECK(end > value && *end == '\0'); 1009 1010 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 1011 index = mBandwidthItems.size() - 1; 1012 } 1013 } 1014 1015 if (index < 0) { 1016 int32_t bandwidthBps; 1017 if (mHTTPDataSource != NULL 1018 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 1019 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 1020 } else { 1021 ALOGV("no bandwidth estimate."); 1022 return 0; // Pick the lowest bandwidth stream by default. 1023 } 1024 1025 char value[PROPERTY_VALUE_MAX]; 1026 if (property_get("media.httplive.max-bw", value, NULL)) { 1027 char *end; 1028 long maxBw = strtoul(value, &end, 10); 1029 if (end > value && *end == '\0') { 1030 if (maxBw > 0 && bandwidthBps > maxBw) { 1031 ALOGV("bandwidth capped to %ld bps", maxBw); 1032 bandwidthBps = maxBw; 1033 } 1034 } 1035 } 1036 1037 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1038 1039 index = mBandwidthItems.size() - 1; 1040 while (index > 0) { 1041 // consider only 80% of the available bandwidth, but if we are switching up, 1042 // be even more conservative (70%) to avoid overestimating and immediately 1043 // switching back. 1044 size_t adjustedBandwidthBps = bandwidthBps; 1045 if (index > mCurBandwidthIndex) { 1046 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1047 } else { 1048 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1049 } 1050 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1051 break; 1052 } 1053 --index; 1054 } 1055 } 1056#elif 0 1057 // Change bandwidth at random() 1058 size_t index = uniformRand() * mBandwidthItems.size(); 1059#elif 0 1060 // There's a 50% chance to stay on the current bandwidth and 1061 // a 50% chance to switch to the next higher bandwidth (wrapping around 1062 // to lowest) 1063 const size_t kMinIndex = 0; 1064 1065 static ssize_t mCurBandwidthIndex = -1; 1066 1067 size_t index; 1068 if (mCurBandwidthIndex < 0) { 1069 index = kMinIndex; 1070 } else if (uniformRand() < 0.5) { 1071 index = (size_t)mCurBandwidthIndex; 1072 } else { 1073 index = mCurBandwidthIndex + 1; 1074 if (index == mBandwidthItems.size()) { 1075 index = kMinIndex; 1076 } 1077 } 1078 mCurBandwidthIndex = index; 1079#elif 0 1080 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1081 1082 size_t index = mBandwidthItems.size() - 1; 1083 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1084 --index; 1085 } 1086#elif 1 1087 char value[PROPERTY_VALUE_MAX]; 1088 size_t index; 1089 if (property_get("media.httplive.bw-index", value, NULL)) { 1090 char *end; 1091 index = strtoul(value, &end, 10); 1092 CHECK(end > value && *end == '\0'); 1093 1094 if (index >= mBandwidthItems.size()) { 1095 index = mBandwidthItems.size() - 1; 1096 } 1097 } else { 1098 index = 0; 1099 } 1100#else 1101 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1102#endif 1103 1104 CHECK_GE(index, 0); 1105 1106 return index; 1107} 1108 1109int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1110 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1111 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1112 if (audioMeta != NULL) { 1113 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1114 } 1115 1116 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1117 if (videoMeta != NULL 1118 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1119 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1120 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1121 } 1122 1123 } 1124 return minSegmentStartTimeUs; 1125} 1126 1127status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1128 int64_t timeUs; 1129 CHECK(msg->findInt64("timeUs", &timeUs)); 1130 1131 if (!mReconfigurationInProgress) { 1132 changeConfiguration(timeUs, mCurBandwidthIndex); 1133 return OK; 1134 } else { 1135 return -EWOULDBLOCK; 1136 } 1137} 1138 1139status_t LiveSession::getDuration(int64_t *durationUs) const { 1140 int64_t maxDurationUs = -1ll; 1141 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1142 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1143 1144 if (fetcherDurationUs > maxDurationUs) { 1145 maxDurationUs = fetcherDurationUs; 1146 } 1147 } 1148 1149 *durationUs = maxDurationUs; 1150 1151 return OK; 1152} 1153 1154bool LiveSession::isSeekable() const { 1155 int64_t durationUs; 1156 return getDuration(&durationUs) == OK && durationUs >= 0; 1157} 1158 1159bool LiveSession::hasDynamicDuration() const { 1160 return false; 1161} 1162 1163size_t LiveSession::getTrackCount() const { 1164 if (mPlaylist == NULL) { 1165 return 0; 1166 } else { 1167 return mPlaylist->getTrackCount(); 1168 } 1169} 1170 1171sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1172 if (mPlaylist == NULL) { 1173 return NULL; 1174 } else { 1175 return mPlaylist->getTrackInfo(trackIndex); 1176 } 1177} 1178 1179status_t LiveSession::selectTrack(size_t index, bool select) { 1180 if (mPlaylist == NULL) { 1181 return INVALID_OPERATION; 1182 } 1183 1184 ++mSubtitleGeneration; 1185 status_t err = mPlaylist->selectTrack(index, select); 1186 if (err == OK) { 1187 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1188 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1189 msg->setInt32("pickTrack", select); 1190 msg->post(); 1191 } 1192 return err; 1193} 1194 1195ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1196 if (mPlaylist == NULL) { 1197 return -1; 1198 } else { 1199 return mPlaylist->getSelectedTrack(type); 1200 } 1201} 1202 1203bool LiveSession::canSwitchUp() { 1204 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1205 status_t err = OK; 1206 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1207 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1208 int64_t dur = source->getBufferedDurationUs(&err); 1209 if (err == OK && dur > 10000000) { 1210 return true; 1211 } 1212 } 1213 return false; 1214} 1215 1216void LiveSession::changeConfiguration( 1217 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1218 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1219 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1220 cancelBandwidthSwitch(); 1221 1222 CHECK(!mReconfigurationInProgress); 1223 mReconfigurationInProgress = true; 1224 1225 mCurBandwidthIndex = bandwidthIndex; 1226 1227 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1228 timeUs, bandwidthIndex, pickTrack); 1229 1230 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1231 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1232 1233 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1234 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1235 1236 AString URIs[kMaxStreams]; 1237 for (size_t i = 0; i < kMaxStreams; ++i) { 1238 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1239 streamMask |= indexToType(i); 1240 } 1241 } 1242 1243 // Step 1, stop and discard fetchers that are no longer needed. 1244 // Pause those that we'll reuse. 1245 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1246 const AString &uri = mFetcherInfos.keyAt(i); 1247 1248 bool discardFetcher = true; 1249 1250 // If we're seeking all current fetchers are discarded. 1251 if (timeUs < 0ll) { 1252 // delay fetcher removal if not picking tracks 1253 discardFetcher = pickTrack; 1254 1255 for (size_t j = 0; j < kMaxStreams; ++j) { 1256 StreamType type = indexToType(j); 1257 if ((streamMask & type) && uri == URIs[j]) { 1258 resumeMask |= type; 1259 streamMask &= ~type; 1260 discardFetcher = false; 1261 } 1262 } 1263 } 1264 1265 if (discardFetcher) { 1266 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1267 } else { 1268 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1269 } 1270 } 1271 1272 sp<AMessage> msg; 1273 if (timeUs < 0ll) { 1274 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1275 msg = new AMessage(kWhatChangeConfiguration3, id()); 1276 } else { 1277 msg = new AMessage(kWhatChangeConfiguration2, id()); 1278 } 1279 msg->setInt32("streamMask", streamMask); 1280 msg->setInt32("resumeMask", resumeMask); 1281 msg->setInt32("pickTrack", pickTrack); 1282 msg->setInt64("timeUs", timeUs); 1283 for (size_t i = 0; i < kMaxStreams; ++i) { 1284 if ((streamMask | resumeMask) & indexToType(i)) { 1285 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1286 } 1287 } 1288 1289 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1290 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1291 // fetchers have completed their asynchronous operation, we'll post 1292 // mContinuation, which then is handled below in onChangeConfiguration2. 1293 mContinuationCounter = mFetcherInfos.size(); 1294 mContinuation = msg; 1295 1296 if (mContinuationCounter == 0) { 1297 msg->post(); 1298 1299 if (mSeekReplyID != 0) { 1300 CHECK(mSeekReply != NULL); 1301 mSeekReply->setInt32("err", OK); 1302 mSeekReply->postReply(mSeekReplyID); 1303 mSeekReplyID = 0; 1304 mSeekReply.clear(); 1305 } 1306 } 1307} 1308 1309void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1310 if (!mReconfigurationInProgress) { 1311 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1312 msg->findInt32("pickTrack", &pickTrack); 1313 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1314 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1315 } else { 1316 msg->post(1000000ll); // retry in 1 sec 1317 } 1318} 1319 1320void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1321 mContinuation.clear(); 1322 1323 // All fetchers are either suspended or have been removed now. 1324 1325 uint32_t streamMask, resumeMask; 1326 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1327 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1328 1329 // currently onChangeConfiguration2 is only called for seeking; 1330 // remove the following CHECK if using it else where. 1331 CHECK_EQ(resumeMask, 0); 1332 streamMask |= resumeMask; 1333 1334 AString URIs[kMaxStreams]; 1335 for (size_t i = 0; i < kMaxStreams; ++i) { 1336 if (streamMask & indexToType(i)) { 1337 const AString &uriKey = mStreams[i].uriKey(); 1338 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1339 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1340 } 1341 } 1342 1343 // Determine which decoders to shutdown on the player side, 1344 // a decoder has to be shutdown if either 1345 // 1) its streamtype was active before but now longer isn't. 1346 // or 1347 // 2) its streamtype was already active and still is but the URI 1348 // has changed. 1349 uint32_t changedMask = 0; 1350 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1351 if (((mStreamMask & streamMask & indexToType(i)) 1352 && !(URIs[i] == mStreams[i].mUri)) 1353 || (mStreamMask & ~streamMask & indexToType(i))) { 1354 changedMask |= indexToType(i); 1355 } 1356 } 1357 1358 if (changedMask == 0) { 1359 // If nothing changed as far as the audio/video decoders 1360 // are concerned we can proceed. 1361 onChangeConfiguration3(msg); 1362 return; 1363 } 1364 1365 // Something changed, inform the player which will shutdown the 1366 // corresponding decoders and will post the reply once that's done. 1367 // Handling the reply will continue executing below in 1368 // onChangeConfiguration3. 1369 sp<AMessage> notify = mNotify->dup(); 1370 notify->setInt32("what", kWhatStreamsChanged); 1371 notify->setInt32("changedMask", changedMask); 1372 1373 msg->setWhat(kWhatChangeConfiguration3); 1374 msg->setTarget(id()); 1375 1376 notify->setMessage("reply", msg); 1377 notify->post(); 1378} 1379 1380void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1381 mContinuation.clear(); 1382 // All remaining fetchers are still suspended, the player has shutdown 1383 // any decoders that needed it. 1384 1385 uint32_t streamMask, resumeMask; 1386 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1387 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1388 1389 int64_t timeUs; 1390 int32_t pickTrack; 1391 bool switching = false; 1392 CHECK(msg->findInt64("timeUs", &timeUs)); 1393 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1394 1395 if (timeUs < 0ll) { 1396 if (!pickTrack) { 1397 switching = true; 1398 } 1399 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1400 } else { 1401 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1402 } 1403 1404 for (size_t i = 0; i < kMaxStreams; ++i) { 1405 if (streamMask & indexToType(i)) { 1406 if (switching) { 1407 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1408 } else { 1409 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1410 } 1411 } 1412 } 1413 1414 mNewStreamMask = streamMask | resumeMask; 1415 if (switching) { 1416 mSwapMask = mStreamMask & ~resumeMask; 1417 } 1418 1419 // Of all existing fetchers: 1420 // * Resume fetchers that are still needed and assign them original packet sources. 1421 // * Mark otherwise unneeded fetchers for removal. 1422 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1423 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1424 const AString &uri = mFetcherInfos.keyAt(i); 1425 1426 sp<AnotherPacketSource> sources[kMaxStreams]; 1427 for (size_t j = 0; j < kMaxStreams; ++j) { 1428 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1429 sources[j] = mPacketSources.valueFor(indexToType(j)); 1430 1431 if (j != kSubtitleIndex) { 1432 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1433 sp<AnotherPacketSource> discontinuityQueue; 1434 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1435 discontinuityQueue->queueDiscontinuity( 1436 ATSParser::DISCONTINUITY_NONE, 1437 NULL, 1438 true); 1439 } 1440 } 1441 } 1442 1443 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1444 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1445 || sources[kSubtitleIndex] != NULL) { 1446 info.mFetcher->startAsync( 1447 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1448 } else { 1449 info.mToBeRemoved = true; 1450 } 1451 } 1452 1453 // streamMask now only contains the types that need a new fetcher created. 1454 1455 if (streamMask != 0) { 1456 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1457 } 1458 1459 // Find out when the original fetchers have buffered up to and start the new fetchers 1460 // at a later timestamp. 1461 for (size_t i = 0; i < kMaxStreams; i++) { 1462 if (!(indexToType(i) & streamMask)) { 1463 continue; 1464 } 1465 1466 AString uri; 1467 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1468 1469 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1470 CHECK(fetcher != NULL); 1471 1472 int32_t latestSeq = -1; 1473 int64_t startTimeUs = -1; 1474 int64_t segmentStartTimeUs = -1ll; 1475 int32_t discontinuitySeq = -1; 1476 sp<AnotherPacketSource> sources[kMaxStreams]; 1477 1478 if (i == kSubtitleIndex) { 1479 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1480 } 1481 1482 // TRICKY: looping from i as earlier streams are already removed from streamMask 1483 for (size_t j = i; j < kMaxStreams; ++j) { 1484 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1485 if ((streamMask & indexToType(j)) && uri == streamUri) { 1486 sources[j] = mPacketSources.valueFor(indexToType(j)); 1487 1488 if (timeUs >= 0) { 1489 sources[j]->clear(); 1490 startTimeUs = timeUs; 1491 1492 sp<AnotherPacketSource> discontinuityQueue; 1493 sp<AMessage> extra = new AMessage; 1494 extra->setInt64("timeUs", timeUs); 1495 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1496 discontinuityQueue->queueDiscontinuity( 1497 ATSParser::DISCONTINUITY_TIME, extra, true); 1498 } else { 1499 int32_t type; 1500 int64_t srcSegmentStartTimeUs; 1501 sp<AMessage> meta; 1502 if (pickTrack) { 1503 // selecting 1504 meta = sources[j]->getLatestDequeuedMeta(); 1505 } else { 1506 // adapting 1507 meta = sources[j]->getLatestEnqueuedMeta(); 1508 } 1509 1510 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1511 int64_t tmpUs; 1512 CHECK(meta->findInt64("timeUs", &tmpUs)); 1513 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1514 startTimeUs = tmpUs; 1515 } 1516 1517 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1518 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1519 segmentStartTimeUs = tmpUs; 1520 } 1521 1522 int32_t seq; 1523 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1524 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1525 discontinuitySeq = seq; 1526 } 1527 } 1528 1529 if (pickTrack) { 1530 // selecting track, queue discontinuities before content 1531 sources[j]->clear(); 1532 if (j == kSubtitleIndex) { 1533 break; 1534 } 1535 sp<AnotherPacketSource> discontinuityQueue; 1536 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1537 discontinuityQueue->queueDiscontinuity( 1538 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1539 } else { 1540 // adapting, queue discontinuities after resume 1541 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1542 sources[j]->clear(); 1543 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1544 if (extraStreams & indexToType(j)) { 1545 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1546 } 1547 } 1548 } 1549 1550 streamMask &= ~indexToType(j); 1551 } 1552 } 1553 1554 fetcher->startAsync( 1555 sources[kAudioIndex], 1556 sources[kVideoIndex], 1557 sources[kSubtitleIndex], 1558 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1559 segmentStartTimeUs, 1560 discontinuitySeq, 1561 switching); 1562 } 1563 1564 // All fetchers have now been started, the configuration change 1565 // has completed. 1566 1567 cancelCheckBandwidthEvent(); 1568 scheduleCheckBandwidthEvent(); 1569 1570 ALOGV("XXX configuration change completed."); 1571 mReconfigurationInProgress = false; 1572 if (switching) { 1573 mSwitchInProgress = true; 1574 } else { 1575 mStreamMask = mNewStreamMask; 1576 } 1577 1578 if (mDisconnectReplyID != 0) { 1579 finishDisconnect(); 1580 } 1581} 1582 1583void LiveSession::onSwapped(const sp<AMessage> &msg) { 1584 int32_t switchGeneration; 1585 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1586 if (switchGeneration != mSwitchGeneration) { 1587 return; 1588 } 1589 1590 int32_t stream; 1591 CHECK(msg->findInt32("stream", &stream)); 1592 1593 ssize_t idx = typeToIndex(stream); 1594 CHECK(idx >= 0); 1595 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1596 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1597 } 1598 mStreams[idx].mUri = mStreams[idx].mNewUri; 1599 mStreams[idx].mNewUri.clear(); 1600 1601 mSwapMask &= ~stream; 1602 if (mSwapMask != 0) { 1603 return; 1604 } 1605 1606 // Check if new variant contains extra streams. 1607 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1608 while (extraStreams) { 1609 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1610 swapPacketSource(extraStream); 1611 extraStreams &= ~extraStream; 1612 1613 idx = typeToIndex(extraStream); 1614 CHECK(idx >= 0); 1615 if (mStreams[idx].mNewUri.empty()) { 1616 ALOGW("swapping extra stream type %d %s to empty stream", 1617 extraStream, mStreams[idx].mUri.c_str()); 1618 } 1619 mStreams[idx].mUri = mStreams[idx].mNewUri; 1620 mStreams[idx].mNewUri.clear(); 1621 } 1622 1623 tryToFinishBandwidthSwitch(); 1624} 1625 1626void LiveSession::onCheckSwitchDown() { 1627 if (mSwitchDownMonitor == NULL) { 1628 return; 1629 } 1630 1631 if (mSwitchInProgress || mReconfigurationInProgress) { 1632 ALOGV("Switch/Reconfig in progress, defer switch down"); 1633 mSwitchDownMonitor->post(1000000ll); 1634 return; 1635 } 1636 1637 for (size_t i = 0; i < kMaxStreams; ++i) { 1638 int32_t targetDuration; 1639 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1640 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1641 1642 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1643 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1644 int64_t targetDurationUs = targetDuration * 1000000ll; 1645 1646 if (bufferedDurationUs < targetDurationUs / 3) { 1647 (new AMessage(kWhatSwitchDown, id()))->post(); 1648 break; 1649 } 1650 } 1651 } 1652 1653 mSwitchDownMonitor->post(1000000ll); 1654} 1655 1656void LiveSession::onSwitchDown() { 1657 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1658 return; 1659 } 1660 1661 ssize_t bandwidthIndex = getBandwidthIndex(); 1662 if (bandwidthIndex < mCurBandwidthIndex) { 1663 changeConfiguration(-1, bandwidthIndex, false); 1664 return; 1665 } 1666 1667} 1668 1669// Mark switch done when: 1670// 1. all old buffers are swapped out 1671void LiveSession::tryToFinishBandwidthSwitch() { 1672 if (!mSwitchInProgress) { 1673 return; 1674 } 1675 1676 bool needToRemoveFetchers = false; 1677 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1678 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1679 needToRemoveFetchers = true; 1680 break; 1681 } 1682 } 1683 1684 if (!needToRemoveFetchers && mSwapMask == 0) { 1685 ALOGI("mSwitchInProgress = false"); 1686 mStreamMask = mNewStreamMask; 1687 mSwitchInProgress = false; 1688 } 1689} 1690 1691void LiveSession::scheduleCheckBandwidthEvent() { 1692 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1693 msg->setInt32("generation", mCheckBandwidthGeneration); 1694 msg->post(10000000ll); 1695} 1696 1697void LiveSession::cancelCheckBandwidthEvent() { 1698 ++mCheckBandwidthGeneration; 1699} 1700 1701void LiveSession::cancelBandwidthSwitch() { 1702 Mutex::Autolock lock(mSwapMutex); 1703 mSwitchGeneration++; 1704 mSwitchInProgress = false; 1705 mSwapMask = 0; 1706 1707 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1708 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1709 if (info.mToBeRemoved) { 1710 info.mToBeRemoved = false; 1711 } 1712 } 1713 1714 for (size_t i = 0; i < kMaxStreams; ++i) { 1715 if (!mStreams[i].mNewUri.empty()) { 1716 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1717 if (j < 0) { 1718 mStreams[i].mNewUri.clear(); 1719 continue; 1720 } 1721 1722 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1723 info.mFetcher->stopAsync(); 1724 mFetcherInfos.removeItemsAt(j); 1725 mStreams[i].mNewUri.clear(); 1726 } 1727 } 1728} 1729 1730bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1731 if (mReconfigurationInProgress || mSwitchInProgress) { 1732 return false; 1733 } 1734 1735 if (mCurBandwidthIndex < 0) { 1736 return true; 1737 } 1738 1739 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1740 return false; 1741 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1742 return canSwitchUp(); 1743 } else { 1744 return true; 1745 } 1746} 1747 1748void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1749 size_t bandwidthIndex = getBandwidthIndex(); 1750 if (canSwitchBandwidthTo(bandwidthIndex)) { 1751 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1752 } else { 1753 // Come back and check again 10 seconds later in case there is nothing to do now. 1754 // If we DO change configuration, once that completes it'll schedule a new 1755 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1756 msg->post(10000000ll); 1757 } 1758} 1759 1760void LiveSession::postPrepared(status_t err) { 1761 CHECK(mInPreparationPhase); 1762 1763 sp<AMessage> notify = mNotify->dup(); 1764 if (err == OK || err == ERROR_END_OF_STREAM) { 1765 notify->setInt32("what", kWhatPrepared); 1766 } else { 1767 notify->setInt32("what", kWhatPreparationFailed); 1768 notify->setInt32("err", err); 1769 } 1770 1771 notify->post(); 1772 1773 mInPreparationPhase = false; 1774 1775 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1776 mSwitchDownMonitor->post(); 1777} 1778 1779} // namespace android 1780 1781