LiveSession.cpp revision 174609765fb9c8cbd6aeb61f489746c3570bfee2
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mSubtitleGeneration(0), 67 mLastDequeuedTimeUs(0ll), 68 mRealTimeBaseUs(0ll), 69 mReconfigurationInProgress(false), 70 mSwitchInProgress(false), 71 mDisconnectReplyID(0), 72 mSeekReplyID(0), 73 mFirstTimeUsValid(false), 74 mFirstTimeUs(0), 75 mLastSeekTimeUs(0) { 76 77 mStreams[kAudioIndex] = StreamItem("audio"); 78 mStreams[kVideoIndex] = StreamItem("video"); 79 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 80 81 for (size_t i = 0; i < kMaxStreams; ++i) { 82 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 85 mBuffering[i] = false; 86 } 87} 88 89LiveSession::~LiveSession() { 90} 91 92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 93 ABuffer *discontinuity = new ABuffer(0); 94 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 95 discontinuity->meta()->setInt32("swapPacketSource", swap); 96 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 97 discontinuity->meta()->setInt64("timeUs", -1); 98 return discontinuity; 99} 100 101void LiveSession::swapPacketSource(StreamType stream) { 102 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 103 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 104 sp<AnotherPacketSource> tmp = aps; 105 aps = aps2; 106 aps2 = tmp; 107 aps2->clear(); 108} 109 110status_t LiveSession::dequeueAccessUnit( 111 StreamType stream, sp<ABuffer> *accessUnit) { 112 if (!(mStreamMask & stream)) { 113 // return -EWOULDBLOCK to avoid halting the decoder 114 // when switching between audio/video and audio only. 115 return -EWOULDBLOCK; 116 } 117 118 status_t finalResult; 119 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 120 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 121 discontinuityQueue->dequeueAccessUnit(accessUnit); 122 // seeking, track switching 123 sp<AMessage> extra; 124 int64_t timeUs; 125 if ((*accessUnit)->meta()->findMessage("extra", &extra) 126 && extra != NULL 127 && extra->findInt64("timeUs", &timeUs)) { 128 // seeking only 129 mLastSeekTimeUs = timeUs; 130 mDiscontinuityOffsetTimesUs.clear(); 131 mDiscontinuityAbsStartTimesUs.clear(); 132 } 133 return INFO_DISCONTINUITY; 134 } 135 136 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 137 138 ssize_t idx = typeToIndex(stream); 139 if (!packetSource->hasBufferAvailable(&finalResult)) { 140 if (finalResult == OK) { 141 mBuffering[idx] = true; 142 return -EAGAIN; 143 } else { 144 return finalResult; 145 } 146 } 147 148 int32_t targetDuration = 0; 149 sp<AMessage> meta = packetSource->getLatestEnqueuedMeta(); 150 if (meta != NULL) { 151 meta->findInt32("targetDuration", &targetDuration); 152 } 153 154 int64_t targetDurationUs = targetDuration * 1000000ll; 155 if (targetDurationUs == 0 || 156 targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) { 157 // Fetchers limit buffering to 158 // min(3 * targetDuration, kMinBufferedDurationUs) 159 targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs; 160 } 161 162 if (mBuffering[idx]) { 163 if (mSwitchInProgress 164 || packetSource->isFinished(0) 165 || packetSource->getEstimatedDurationUs() > targetDurationUs) { 166 mBuffering[idx] = false; 167 } 168 } 169 170 if (mBuffering[idx]) { 171 return -EAGAIN; 172 } 173 174 // wait for counterpart 175 sp<AnotherPacketSource> otherSource; 176 uint32_t mask = mNewStreamMask & mStreamMask; 177 uint32_t fetchersMask = 0; 178 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 179 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 180 fetchersMask |= fetcherMask; 181 } 182 mask &= fetchersMask; 183 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 184 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 185 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 186 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 187 } 188 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 189 return finalResult == OK ? -EAGAIN : finalResult; 190 } 191 192 status_t err = packetSource->dequeueAccessUnit(accessUnit); 193 194 size_t streamIdx; 195 const char *streamStr; 196 switch (stream) { 197 case STREAMTYPE_AUDIO: 198 streamIdx = kAudioIndex; 199 streamStr = "audio"; 200 break; 201 case STREAMTYPE_VIDEO: 202 streamIdx = kVideoIndex; 203 streamStr = "video"; 204 break; 205 case STREAMTYPE_SUBTITLES: 206 streamIdx = kSubtitleIndex; 207 streamStr = "subs"; 208 break; 209 default: 210 TRESPASS(); 211 } 212 213 StreamItem& strm = mStreams[streamIdx]; 214 if (err == INFO_DISCONTINUITY) { 215 // adaptive streaming, discontinuities in the playlist 216 int32_t type; 217 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 218 219 sp<AMessage> extra; 220 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 221 extra.clear(); 222 } 223 224 ALOGI("[%s] read discontinuity of type %d, extra = %s", 225 streamStr, 226 type, 227 extra == NULL ? "NULL" : extra->debugString().c_str()); 228 229 int32_t swap; 230 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 231 int32_t switchGeneration; 232 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 233 { 234 Mutex::Autolock lock(mSwapMutex); 235 if (switchGeneration == mSwitchGeneration) { 236 swapPacketSource(stream); 237 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 238 msg->setInt32("stream", stream); 239 msg->setInt32("switchGeneration", switchGeneration); 240 msg->post(); 241 } 242 } 243 } else { 244 size_t seq = strm.mCurDiscontinuitySeq; 245 int64_t offsetTimeUs; 246 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 247 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 248 } else { 249 offsetTimeUs = 0; 250 } 251 252 seq += 1; 253 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 254 int64_t firstTimeUs; 255 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 256 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 257 offsetTimeUs += strm.mLastSampleDurationUs; 258 } else { 259 offsetTimeUs += strm.mLastSampleDurationUs; 260 } 261 262 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 263 } 264 } else if (err == OK) { 265 266 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 267 int64_t timeUs; 268 int32_t discontinuitySeq = 0; 269 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 270 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 271 strm.mCurDiscontinuitySeq = discontinuitySeq; 272 273 int32_t discard = 0; 274 int64_t firstTimeUs; 275 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 276 int64_t durUs; // approximate sample duration 277 if (timeUs > strm.mLastDequeuedTimeUs) { 278 durUs = timeUs - strm.mLastDequeuedTimeUs; 279 } else { 280 durUs = strm.mLastDequeuedTimeUs - timeUs; 281 } 282 strm.mLastSampleDurationUs = durUs; 283 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 284 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 285 firstTimeUs = timeUs; 286 } else { 287 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 288 firstTimeUs = timeUs; 289 } 290 291 strm.mLastDequeuedTimeUs = timeUs; 292 if (timeUs >= firstTimeUs) { 293 timeUs -= firstTimeUs; 294 } else { 295 timeUs = 0; 296 } 297 timeUs += mLastSeekTimeUs; 298 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 299 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 300 } 301 302 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 303 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 304 mLastDequeuedTimeUs = timeUs; 305 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 306 } else if (stream == STREAMTYPE_SUBTITLES) { 307 int32_t subtitleGeneration; 308 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 309 && subtitleGeneration != mSubtitleGeneration) { 310 return -EAGAIN; 311 }; 312 (*accessUnit)->meta()->setInt32( 313 "trackIndex", mPlaylist->getSelectedIndex()); 314 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 315 } 316 } else { 317 ALOGI("[%s] encountered error %d", streamStr, err); 318 } 319 320 return err; 321} 322 323status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 324 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 325 if (!(mStreamMask & stream)) { 326 return UNKNOWN_ERROR; 327 } 328 329 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 330 331 sp<MetaData> meta = packetSource->getFormat(); 332 333 if (meta == NULL) { 334 return -EAGAIN; 335 } 336 337 return convertMetaDataToMessage(meta, format); 338} 339 340void LiveSession::connectAsync( 341 const char *url, const KeyedVector<String8, String8> *headers) { 342 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 343 msg->setString("url", url); 344 345 if (headers != NULL) { 346 msg->setPointer( 347 "headers", 348 new KeyedVector<String8, String8>(*headers)); 349 } 350 351 msg->post(); 352} 353 354status_t LiveSession::disconnect() { 355 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 356 357 sp<AMessage> response; 358 status_t err = msg->postAndAwaitResponse(&response); 359 360 return err; 361} 362 363status_t LiveSession::seekTo(int64_t timeUs) { 364 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 365 msg->setInt64("timeUs", timeUs); 366 367 sp<AMessage> response; 368 status_t err = msg->postAndAwaitResponse(&response); 369 370 return err; 371} 372 373void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 374 switch (msg->what()) { 375 case kWhatConnect: 376 { 377 onConnect(msg); 378 break; 379 } 380 381 case kWhatDisconnect: 382 { 383 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 384 385 if (mReconfigurationInProgress) { 386 break; 387 } 388 389 finishDisconnect(); 390 break; 391 } 392 393 case kWhatSeek: 394 { 395 uint32_t seekReplyID; 396 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 397 mSeekReplyID = seekReplyID; 398 mSeekReply = new AMessage; 399 400 status_t err = onSeek(msg); 401 402 if (err != OK) { 403 msg->post(50000); 404 } 405 break; 406 } 407 408 case kWhatFetcherNotify: 409 { 410 int32_t what; 411 CHECK(msg->findInt32("what", &what)); 412 413 switch (what) { 414 case PlaylistFetcher::kWhatStarted: 415 break; 416 case PlaylistFetcher::kWhatPaused: 417 case PlaylistFetcher::kWhatStopped: 418 { 419 if (what == PlaylistFetcher::kWhatStopped) { 420 AString uri; 421 CHECK(msg->findString("uri", &uri)); 422 if (mFetcherInfos.removeItem(uri) < 0) { 423 // ignore duplicated kWhatStopped messages. 424 break; 425 } 426 427 if (mSwitchInProgress) { 428 tryToFinishBandwidthSwitch(); 429 } 430 } 431 432 if (mContinuation != NULL) { 433 CHECK_GT(mContinuationCounter, 0); 434 if (--mContinuationCounter == 0) { 435 mContinuation->post(); 436 437 if (mSeekReplyID != 0) { 438 CHECK(mSeekReply != NULL); 439 mSeekReply->setInt32("err", OK); 440 mSeekReply->postReply(mSeekReplyID); 441 mSeekReplyID = 0; 442 mSeekReply.clear(); 443 } 444 } 445 } 446 break; 447 } 448 449 case PlaylistFetcher::kWhatDurationUpdate: 450 { 451 AString uri; 452 CHECK(msg->findString("uri", &uri)); 453 454 int64_t durationUs; 455 CHECK(msg->findInt64("durationUs", &durationUs)); 456 457 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 458 info->mDurationUs = durationUs; 459 break; 460 } 461 462 case PlaylistFetcher::kWhatError: 463 { 464 status_t err; 465 CHECK(msg->findInt32("err", &err)); 466 467 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 468 469 // handle EOS on subtitle tracks independently 470 AString uri; 471 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 472 ssize_t i = mFetcherInfos.indexOfKey(uri); 473 if (i >= 0) { 474 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 475 if (fetcher != NULL) { 476 uint32_t type = fetcher->getStreamTypeMask(); 477 if (type == STREAMTYPE_SUBTITLES) { 478 mPacketSources.valueFor( 479 STREAMTYPE_SUBTITLES)->signalEOS(err);; 480 break; 481 } 482 } 483 } 484 } 485 486 if (mInPreparationPhase) { 487 postPrepared(err); 488 } 489 490 cancelBandwidthSwitch(); 491 492 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 493 494 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 495 496 mPacketSources.valueFor( 497 STREAMTYPE_SUBTITLES)->signalEOS(err); 498 499 sp<AMessage> notify = mNotify->dup(); 500 notify->setInt32("what", kWhatError); 501 notify->setInt32("err", err); 502 notify->post(); 503 break; 504 } 505 506 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 507 { 508 AString uri; 509 CHECK(msg->findString("uri", &uri)); 510 511 if (mFetcherInfos.indexOfKey(uri) < 0) { 512 ALOGE("couldn't find uri"); 513 break; 514 } 515 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 516 info->mIsPrepared = true; 517 518 if (mInPreparationPhase) { 519 bool allFetchersPrepared = true; 520 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 521 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 522 allFetchersPrepared = false; 523 break; 524 } 525 } 526 527 if (allFetchersPrepared) { 528 postPrepared(OK); 529 } 530 } 531 break; 532 } 533 534 case PlaylistFetcher::kWhatStartedAt: 535 { 536 int32_t switchGeneration; 537 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 538 539 if (switchGeneration != mSwitchGeneration) { 540 break; 541 } 542 543 // Resume fetcher for the original variant; the resumed fetcher should 544 // continue until the timestamps found in msg, which is stored by the 545 // new fetcher to indicate where the new variant has started buffering. 546 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 547 const FetcherInfo info = mFetcherInfos.valueAt(i); 548 if (info.mToBeRemoved) { 549 info.mFetcher->resumeUntilAsync(msg); 550 } 551 } 552 break; 553 } 554 555 default: 556 TRESPASS(); 557 } 558 559 break; 560 } 561 562 case kWhatCheckBandwidth: 563 { 564 int32_t generation; 565 CHECK(msg->findInt32("generation", &generation)); 566 567 if (generation != mCheckBandwidthGeneration) { 568 break; 569 } 570 571 onCheckBandwidth(msg); 572 break; 573 } 574 575 case kWhatChangeConfiguration: 576 { 577 onChangeConfiguration(msg); 578 break; 579 } 580 581 case kWhatChangeConfiguration2: 582 { 583 onChangeConfiguration2(msg); 584 break; 585 } 586 587 case kWhatChangeConfiguration3: 588 { 589 onChangeConfiguration3(msg); 590 break; 591 } 592 593 case kWhatFinishDisconnect2: 594 { 595 onFinishDisconnect2(); 596 break; 597 } 598 599 case kWhatSwapped: 600 { 601 onSwapped(msg); 602 break; 603 } 604 605 case kWhatCheckSwitchDown: 606 { 607 onCheckSwitchDown(); 608 break; 609 } 610 611 case kWhatSwitchDown: 612 { 613 onSwitchDown(); 614 break; 615 } 616 617 default: 618 TRESPASS(); 619 break; 620 } 621} 622 623// static 624int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 625 if (a->mBandwidth < b->mBandwidth) { 626 return -1; 627 } else if (a->mBandwidth == b->mBandwidth) { 628 return 0; 629 } 630 631 return 1; 632} 633 634// static 635LiveSession::StreamType LiveSession::indexToType(int idx) { 636 CHECK(idx >= 0 && idx < kMaxStreams); 637 return (StreamType)(1 << idx); 638} 639 640// static 641ssize_t LiveSession::typeToIndex(int32_t type) { 642 switch (type) { 643 case STREAMTYPE_AUDIO: 644 return 0; 645 case STREAMTYPE_VIDEO: 646 return 1; 647 case STREAMTYPE_SUBTITLES: 648 return 2; 649 default: 650 return -1; 651 }; 652 return -1; 653} 654 655void LiveSession::onConnect(const sp<AMessage> &msg) { 656 AString url; 657 CHECK(msg->findString("url", &url)); 658 659 KeyedVector<String8, String8> *headers = NULL; 660 if (!msg->findPointer("headers", (void **)&headers)) { 661 mExtraHeaders.clear(); 662 } else { 663 mExtraHeaders = *headers; 664 665 delete headers; 666 headers = NULL; 667 } 668 669 // TODO currently we don't know if we are coming here from incognito mode 670 ALOGI("onConnect %s", uriDebugString(url).c_str()); 671 672 mMasterURL = url; 673 674 bool dummy; 675 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 676 677 if (mPlaylist == NULL) { 678 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 679 680 postPrepared(ERROR_IO); 681 return; 682 } 683 684 // We trust the content provider to make a reasonable choice of preferred 685 // initial bandwidth by listing it first in the variant playlist. 686 // At startup we really don't have a good estimate on the available 687 // network bandwidth since we haven't tranferred any data yet. Once 688 // we have we can make a better informed choice. 689 size_t initialBandwidth = 0; 690 size_t initialBandwidthIndex = 0; 691 692 if (mPlaylist->isVariantPlaylist()) { 693 for (size_t i = 0; i < mPlaylist->size(); ++i) { 694 BandwidthItem item; 695 696 item.mPlaylistIndex = i; 697 698 sp<AMessage> meta; 699 AString uri; 700 mPlaylist->itemAt(i, &uri, &meta); 701 702 unsigned long bandwidth; 703 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 704 705 if (initialBandwidth == 0) { 706 initialBandwidth = item.mBandwidth; 707 } 708 709 mBandwidthItems.push(item); 710 } 711 712 CHECK_GT(mBandwidthItems.size(), 0u); 713 714 mBandwidthItems.sort(SortByBandwidth); 715 716 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 717 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 718 initialBandwidthIndex = i; 719 break; 720 } 721 } 722 } else { 723 // dummy item. 724 BandwidthItem item; 725 item.mPlaylistIndex = 0; 726 item.mBandwidth = 0; 727 mBandwidthItems.push(item); 728 } 729 730 mPlaylist->pickRandomMediaItems(); 731 changeConfiguration( 732 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 733} 734 735void LiveSession::finishDisconnect() { 736 // No reconfiguration is currently pending, make sure none will trigger 737 // during disconnection either. 738 cancelCheckBandwidthEvent(); 739 740 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 741 // (finishDisconnect, onFinishDisconnect2) 742 cancelBandwidthSwitch(); 743 744 // cancel switch down monitor 745 mSwitchDownMonitor.clear(); 746 747 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 748 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 749 } 750 751 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 752 753 mContinuationCounter = mFetcherInfos.size(); 754 mContinuation = msg; 755 756 if (mContinuationCounter == 0) { 757 msg->post(); 758 } 759} 760 761void LiveSession::onFinishDisconnect2() { 762 mContinuation.clear(); 763 764 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 765 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 766 767 mPacketSources.valueFor( 768 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 769 770 sp<AMessage> response = new AMessage; 771 response->setInt32("err", OK); 772 773 response->postReply(mDisconnectReplyID); 774 mDisconnectReplyID = 0; 775} 776 777sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 778 ssize_t index = mFetcherInfos.indexOfKey(uri); 779 780 if (index >= 0) { 781 return NULL; 782 } 783 784 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 785 notify->setString("uri", uri); 786 notify->setInt32("switchGeneration", mSwitchGeneration); 787 788 FetcherInfo info; 789 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 790 info.mDurationUs = -1ll; 791 info.mIsPrepared = false; 792 info.mToBeRemoved = false; 793 looper()->registerHandler(info.mFetcher); 794 795 mFetcherInfos.add(uri, info); 796 797 return info.mFetcher; 798} 799 800/* 801 * Illustration of parameters: 802 * 803 * 0 `range_offset` 804 * +------------+-------------------------------------------------------+--+--+ 805 * | | | next block to fetch | | | 806 * | | `source` handle => `out` buffer | | | | 807 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 808 * | |<----------- `range_length` / buffer capacity ----------->| | 809 * |<------------------------------ file_size ------------------------------->| 810 * 811 * Special parameter values: 812 * - range_length == -1 means entire file 813 * - block_size == 0 means entire range 814 * 815 */ 816ssize_t LiveSession::fetchFile( 817 const char *url, sp<ABuffer> *out, 818 int64_t range_offset, int64_t range_length, 819 uint32_t block_size, /* download block size */ 820 sp<DataSource> *source, /* to return and reuse source */ 821 String8 *actualUrl) { 822 off64_t size; 823 sp<DataSource> temp_source; 824 if (source == NULL) { 825 source = &temp_source; 826 } 827 828 if (*source == NULL) { 829 if (!strncasecmp(url, "file://", 7)) { 830 *source = new FileSource(url + 7); 831 } else if (strncasecmp(url, "http://", 7) 832 && strncasecmp(url, "https://", 8)) { 833 return ERROR_UNSUPPORTED; 834 } else { 835 KeyedVector<String8, String8> headers = mExtraHeaders; 836 if (range_offset > 0 || range_length >= 0) { 837 headers.add( 838 String8("Range"), 839 String8( 840 StringPrintf( 841 "bytes=%lld-%s", 842 range_offset, 843 range_length < 0 844 ? "" : StringPrintf("%lld", 845 range_offset + range_length - 1).c_str()).c_str())); 846 } 847 status_t err = mHTTPDataSource->connect(url, &headers); 848 849 if (err != OK) { 850 return err; 851 } 852 853 *source = mHTTPDataSource; 854 } 855 } 856 857 status_t getSizeErr = (*source)->getSize(&size); 858 if (getSizeErr != OK) { 859 size = 65536; 860 } 861 862 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 863 if (*out == NULL) { 864 buffer->setRange(0, 0); 865 } 866 867 ssize_t bytesRead = 0; 868 // adjust range_length if only reading partial block 869 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 870 range_length = buffer->size() + block_size; 871 } 872 for (;;) { 873 // Only resize when we don't know the size. 874 size_t bufferRemaining = buffer->capacity() - buffer->size(); 875 if (bufferRemaining == 0 && getSizeErr != OK) { 876 bufferRemaining = 32768; 877 878 ALOGV("increasing download buffer to %zu bytes", 879 buffer->size() + bufferRemaining); 880 881 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 882 memcpy(copy->data(), buffer->data(), buffer->size()); 883 copy->setRange(0, buffer->size()); 884 885 buffer = copy; 886 } 887 888 size_t maxBytesToRead = bufferRemaining; 889 if (range_length >= 0) { 890 int64_t bytesLeftInRange = range_length - buffer->size(); 891 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 892 maxBytesToRead = bytesLeftInRange; 893 894 if (bytesLeftInRange == 0) { 895 break; 896 } 897 } 898 } 899 900 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 901 // to help us break out of the loop. 902 ssize_t n = (*source)->readAt( 903 buffer->size(), buffer->data() + buffer->size(), 904 maxBytesToRead); 905 906 if (n < 0) { 907 return n; 908 } 909 910 if (n == 0) { 911 break; 912 } 913 914 buffer->setRange(0, buffer->size() + (size_t)n); 915 bytesRead += n; 916 } 917 918 *out = buffer; 919 if (actualUrl != NULL) { 920 *actualUrl = (*source)->getUri(); 921 if (actualUrl->isEmpty()) { 922 *actualUrl = url; 923 } 924 } 925 926 return bytesRead; 927} 928 929sp<M3UParser> LiveSession::fetchPlaylist( 930 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 931 ALOGV("fetchPlaylist '%s'", url); 932 933 *unchanged = false; 934 935 sp<ABuffer> buffer; 936 String8 actualUrl; 937 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 938 939 if (err <= 0) { 940 return NULL; 941 } 942 943 // MD5 functionality is not available on the simulator, treat all 944 // playlists as changed. 945 946#if defined(HAVE_ANDROID_OS) 947 uint8_t hash[16]; 948 949 MD5_CTX m; 950 MD5_Init(&m); 951 MD5_Update(&m, buffer->data(), buffer->size()); 952 953 MD5_Final(hash, &m); 954 955 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 956 // playlist unchanged 957 *unchanged = true; 958 959 return NULL; 960 } 961 962 if (curPlaylistHash != NULL) { 963 memcpy(curPlaylistHash, hash, sizeof(hash)); 964 } 965#endif 966 967 sp<M3UParser> playlist = 968 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 969 970 if (playlist->initCheck() != OK) { 971 ALOGE("failed to parse .m3u8 playlist"); 972 973 return NULL; 974 } 975 976 return playlist; 977} 978 979static double uniformRand() { 980 return (double)rand() / RAND_MAX; 981} 982 983size_t LiveSession::getBandwidthIndex() { 984 if (mBandwidthItems.size() == 0) { 985 return 0; 986 } 987 988#if 1 989 char value[PROPERTY_VALUE_MAX]; 990 ssize_t index = -1; 991 if (property_get("media.httplive.bw-index", value, NULL)) { 992 char *end; 993 index = strtol(value, &end, 10); 994 CHECK(end > value && *end == '\0'); 995 996 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 997 index = mBandwidthItems.size() - 1; 998 } 999 } 1000 1001 if (index < 0) { 1002 int32_t bandwidthBps; 1003 if (mHTTPDataSource != NULL 1004 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 1005 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 1006 } else { 1007 ALOGV("no bandwidth estimate."); 1008 return 0; // Pick the lowest bandwidth stream by default. 1009 } 1010 1011 char value[PROPERTY_VALUE_MAX]; 1012 if (property_get("media.httplive.max-bw", value, NULL)) { 1013 char *end; 1014 long maxBw = strtoul(value, &end, 10); 1015 if (end > value && *end == '\0') { 1016 if (maxBw > 0 && bandwidthBps > maxBw) { 1017 ALOGV("bandwidth capped to %ld bps", maxBw); 1018 bandwidthBps = maxBw; 1019 } 1020 } 1021 } 1022 1023 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1024 1025 index = mBandwidthItems.size() - 1; 1026 while (index > 0) { 1027 // consider only 80% of the available bandwidth, but if we are switching up, 1028 // be even more conservative (70%) to avoid overestimating and immediately 1029 // switching back. 1030 size_t adjustedBandwidthBps = bandwidthBps; 1031 if (index > mCurBandwidthIndex) { 1032 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1033 } else { 1034 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1035 } 1036 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1037 break; 1038 } 1039 --index; 1040 } 1041 } 1042#elif 0 1043 // Change bandwidth at random() 1044 size_t index = uniformRand() * mBandwidthItems.size(); 1045#elif 0 1046 // There's a 50% chance to stay on the current bandwidth and 1047 // a 50% chance to switch to the next higher bandwidth (wrapping around 1048 // to lowest) 1049 const size_t kMinIndex = 0; 1050 1051 static ssize_t mCurBandwidthIndex = -1; 1052 1053 size_t index; 1054 if (mCurBandwidthIndex < 0) { 1055 index = kMinIndex; 1056 } else if (uniformRand() < 0.5) { 1057 index = (size_t)mCurBandwidthIndex; 1058 } else { 1059 index = mCurBandwidthIndex + 1; 1060 if (index == mBandwidthItems.size()) { 1061 index = kMinIndex; 1062 } 1063 } 1064 mCurBandwidthIndex = index; 1065#elif 0 1066 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1067 1068 size_t index = mBandwidthItems.size() - 1; 1069 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1070 --index; 1071 } 1072#elif 1 1073 char value[PROPERTY_VALUE_MAX]; 1074 size_t index; 1075 if (property_get("media.httplive.bw-index", value, NULL)) { 1076 char *end; 1077 index = strtoul(value, &end, 10); 1078 CHECK(end > value && *end == '\0'); 1079 1080 if (index >= mBandwidthItems.size()) { 1081 index = mBandwidthItems.size() - 1; 1082 } 1083 } else { 1084 index = 0; 1085 } 1086#else 1087 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1088#endif 1089 1090 CHECK_GE(index, 0); 1091 1092 return index; 1093} 1094 1095int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1096 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1097 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1098 if (audioMeta != NULL) { 1099 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1100 } 1101 1102 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1103 if (videoMeta != NULL 1104 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1105 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1106 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1107 } 1108 1109 } 1110 return minSegmentStartTimeUs; 1111} 1112 1113status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1114 int64_t timeUs; 1115 CHECK(msg->findInt64("timeUs", &timeUs)); 1116 1117 if (!mReconfigurationInProgress) { 1118 changeConfiguration(timeUs, mCurBandwidthIndex); 1119 return OK; 1120 } else { 1121 return -EWOULDBLOCK; 1122 } 1123} 1124 1125status_t LiveSession::getDuration(int64_t *durationUs) const { 1126 int64_t maxDurationUs = -1ll; 1127 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1128 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1129 1130 if (fetcherDurationUs > maxDurationUs) { 1131 maxDurationUs = fetcherDurationUs; 1132 } 1133 } 1134 1135 *durationUs = maxDurationUs; 1136 1137 return OK; 1138} 1139 1140bool LiveSession::isSeekable() const { 1141 int64_t durationUs; 1142 return getDuration(&durationUs) == OK && durationUs >= 0; 1143} 1144 1145bool LiveSession::hasDynamicDuration() const { 1146 return false; 1147} 1148 1149size_t LiveSession::getTrackCount() const { 1150 if (mPlaylist == NULL) { 1151 return 0; 1152 } else { 1153 return mPlaylist->getTrackCount(); 1154 } 1155} 1156 1157sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1158 if (mPlaylist == NULL) { 1159 return NULL; 1160 } else { 1161 return mPlaylist->getTrackInfo(trackIndex); 1162 } 1163} 1164 1165status_t LiveSession::selectTrack(size_t index, bool select) { 1166 if (mPlaylist == NULL) { 1167 return INVALID_OPERATION; 1168 } 1169 1170 ++mSubtitleGeneration; 1171 status_t err = mPlaylist->selectTrack(index, select); 1172 if (err == OK) { 1173 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1174 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1175 msg->setInt32("pickTrack", select); 1176 msg->post(); 1177 } 1178 return err; 1179} 1180 1181ssize_t LiveSession::getSelectedTrack(media_track_type type) const { 1182 if (mPlaylist == NULL) { 1183 return -1; 1184 } else { 1185 return mPlaylist->getSelectedTrack(type); 1186 } 1187} 1188 1189bool LiveSession::canSwitchUp() { 1190 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1191 status_t err = OK; 1192 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1193 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1194 int64_t dur = source->getBufferedDurationUs(&err); 1195 if (err == OK && dur > 10000000) { 1196 return true; 1197 } 1198 } 1199 return false; 1200} 1201 1202void LiveSession::changeConfiguration( 1203 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1204 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1205 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1206 cancelBandwidthSwitch(); 1207 1208 CHECK(!mReconfigurationInProgress); 1209 mReconfigurationInProgress = true; 1210 1211 mCurBandwidthIndex = bandwidthIndex; 1212 1213 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1214 timeUs, bandwidthIndex, pickTrack); 1215 1216 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1217 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1218 1219 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1220 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1221 1222 AString URIs[kMaxStreams]; 1223 for (size_t i = 0; i < kMaxStreams; ++i) { 1224 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1225 streamMask |= indexToType(i); 1226 } 1227 } 1228 1229 // Step 1, stop and discard fetchers that are no longer needed. 1230 // Pause those that we'll reuse. 1231 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1232 const AString &uri = mFetcherInfos.keyAt(i); 1233 1234 bool discardFetcher = true; 1235 1236 // If we're seeking all current fetchers are discarded. 1237 if (timeUs < 0ll) { 1238 // delay fetcher removal if not picking tracks 1239 discardFetcher = pickTrack; 1240 1241 for (size_t j = 0; j < kMaxStreams; ++j) { 1242 StreamType type = indexToType(j); 1243 if ((streamMask & type) && uri == URIs[j]) { 1244 resumeMask |= type; 1245 streamMask &= ~type; 1246 discardFetcher = false; 1247 } 1248 } 1249 } 1250 1251 if (discardFetcher) { 1252 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1253 } else { 1254 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1255 } 1256 } 1257 1258 sp<AMessage> msg; 1259 if (timeUs < 0ll) { 1260 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1261 msg = new AMessage(kWhatChangeConfiguration3, id()); 1262 } else { 1263 msg = new AMessage(kWhatChangeConfiguration2, id()); 1264 } 1265 msg->setInt32("streamMask", streamMask); 1266 msg->setInt32("resumeMask", resumeMask); 1267 msg->setInt32("pickTrack", pickTrack); 1268 msg->setInt64("timeUs", timeUs); 1269 for (size_t i = 0; i < kMaxStreams; ++i) { 1270 if ((streamMask | resumeMask) & indexToType(i)) { 1271 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1272 } 1273 } 1274 1275 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1276 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1277 // fetchers have completed their asynchronous operation, we'll post 1278 // mContinuation, which then is handled below in onChangeConfiguration2. 1279 mContinuationCounter = mFetcherInfos.size(); 1280 mContinuation = msg; 1281 1282 if (mContinuationCounter == 0) { 1283 msg->post(); 1284 1285 if (mSeekReplyID != 0) { 1286 CHECK(mSeekReply != NULL); 1287 mSeekReply->setInt32("err", OK); 1288 mSeekReply->postReply(mSeekReplyID); 1289 mSeekReplyID = 0; 1290 mSeekReply.clear(); 1291 } 1292 } 1293} 1294 1295void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1296 if (!mReconfigurationInProgress) { 1297 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1298 msg->findInt32("pickTrack", &pickTrack); 1299 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1300 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1301 } else { 1302 msg->post(1000000ll); // retry in 1 sec 1303 } 1304} 1305 1306void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1307 mContinuation.clear(); 1308 1309 // All fetchers are either suspended or have been removed now. 1310 1311 uint32_t streamMask, resumeMask; 1312 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1313 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1314 1315 // currently onChangeConfiguration2 is only called for seeking; 1316 // remove the following CHECK if using it else where. 1317 CHECK_EQ(resumeMask, 0); 1318 streamMask |= resumeMask; 1319 1320 AString URIs[kMaxStreams]; 1321 for (size_t i = 0; i < kMaxStreams; ++i) { 1322 if (streamMask & indexToType(i)) { 1323 const AString &uriKey = mStreams[i].uriKey(); 1324 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1325 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1326 } 1327 } 1328 1329 // Determine which decoders to shutdown on the player side, 1330 // a decoder has to be shutdown if either 1331 // 1) its streamtype was active before but now longer isn't. 1332 // or 1333 // 2) its streamtype was already active and still is but the URI 1334 // has changed. 1335 uint32_t changedMask = 0; 1336 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1337 if (((mStreamMask & streamMask & indexToType(i)) 1338 && !(URIs[i] == mStreams[i].mUri)) 1339 || (mStreamMask & ~streamMask & indexToType(i))) { 1340 changedMask |= indexToType(i); 1341 } 1342 } 1343 1344 if (changedMask == 0) { 1345 // If nothing changed as far as the audio/video decoders 1346 // are concerned we can proceed. 1347 onChangeConfiguration3(msg); 1348 return; 1349 } 1350 1351 // Something changed, inform the player which will shutdown the 1352 // corresponding decoders and will post the reply once that's done. 1353 // Handling the reply will continue executing below in 1354 // onChangeConfiguration3. 1355 sp<AMessage> notify = mNotify->dup(); 1356 notify->setInt32("what", kWhatStreamsChanged); 1357 notify->setInt32("changedMask", changedMask); 1358 1359 msg->setWhat(kWhatChangeConfiguration3); 1360 msg->setTarget(id()); 1361 1362 notify->setMessage("reply", msg); 1363 notify->post(); 1364} 1365 1366void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1367 mContinuation.clear(); 1368 // All remaining fetchers are still suspended, the player has shutdown 1369 // any decoders that needed it. 1370 1371 uint32_t streamMask, resumeMask; 1372 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1373 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1374 1375 int64_t timeUs; 1376 int32_t pickTrack; 1377 bool switching = false; 1378 CHECK(msg->findInt64("timeUs", &timeUs)); 1379 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1380 1381 if (timeUs < 0ll) { 1382 if (!pickTrack) { 1383 switching = true; 1384 } 1385 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1386 } else { 1387 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1388 } 1389 1390 for (size_t i = 0; i < kMaxStreams; ++i) { 1391 if (streamMask & indexToType(i)) { 1392 if (switching) { 1393 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1394 } else { 1395 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1396 } 1397 } 1398 } 1399 1400 mNewStreamMask = streamMask | resumeMask; 1401 if (switching) { 1402 mSwapMask = mStreamMask & ~resumeMask; 1403 } 1404 1405 // Of all existing fetchers: 1406 // * Resume fetchers that are still needed and assign them original packet sources. 1407 // * Mark otherwise unneeded fetchers for removal. 1408 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1409 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1410 const AString &uri = mFetcherInfos.keyAt(i); 1411 1412 sp<AnotherPacketSource> sources[kMaxStreams]; 1413 for (size_t j = 0; j < kMaxStreams; ++j) { 1414 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1415 sources[j] = mPacketSources.valueFor(indexToType(j)); 1416 1417 if (j != kSubtitleIndex) { 1418 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1419 sp<AnotherPacketSource> discontinuityQueue; 1420 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1421 discontinuityQueue->queueDiscontinuity( 1422 ATSParser::DISCONTINUITY_NONE, 1423 NULL, 1424 true); 1425 } 1426 } 1427 } 1428 1429 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1430 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1431 || sources[kSubtitleIndex] != NULL) { 1432 info.mFetcher->startAsync( 1433 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1434 } else { 1435 info.mToBeRemoved = true; 1436 } 1437 } 1438 1439 // streamMask now only contains the types that need a new fetcher created. 1440 1441 if (streamMask != 0) { 1442 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1443 } 1444 1445 // Find out when the original fetchers have buffered up to and start the new fetchers 1446 // at a later timestamp. 1447 for (size_t i = 0; i < kMaxStreams; i++) { 1448 if (!(indexToType(i) & streamMask)) { 1449 continue; 1450 } 1451 1452 AString uri; 1453 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1454 1455 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1456 CHECK(fetcher != NULL); 1457 1458 int32_t latestSeq = -1; 1459 int64_t startTimeUs = -1; 1460 int64_t segmentStartTimeUs = -1ll; 1461 int32_t discontinuitySeq = -1; 1462 sp<AnotherPacketSource> sources[kMaxStreams]; 1463 1464 if (i == kSubtitleIndex) { 1465 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1466 } 1467 1468 // TRICKY: looping from i as earlier streams are already removed from streamMask 1469 for (size_t j = i; j < kMaxStreams; ++j) { 1470 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1471 if ((streamMask & indexToType(j)) && uri == streamUri) { 1472 sources[j] = mPacketSources.valueFor(indexToType(j)); 1473 1474 if (timeUs >= 0) { 1475 sources[j]->clear(); 1476 startTimeUs = timeUs; 1477 1478 sp<AnotherPacketSource> discontinuityQueue; 1479 sp<AMessage> extra = new AMessage; 1480 extra->setInt64("timeUs", timeUs); 1481 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1482 discontinuityQueue->queueDiscontinuity( 1483 ATSParser::DISCONTINUITY_TIME, extra, true); 1484 } else { 1485 int32_t type; 1486 int64_t srcSegmentStartTimeUs; 1487 sp<AMessage> meta; 1488 if (pickTrack) { 1489 // selecting 1490 meta = sources[j]->getLatestDequeuedMeta(); 1491 } else { 1492 // adapting 1493 meta = sources[j]->getLatestEnqueuedMeta(); 1494 } 1495 1496 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1497 int64_t tmpUs; 1498 CHECK(meta->findInt64("timeUs", &tmpUs)); 1499 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1500 startTimeUs = tmpUs; 1501 } 1502 1503 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1504 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1505 segmentStartTimeUs = tmpUs; 1506 } 1507 1508 int32_t seq; 1509 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1510 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1511 discontinuitySeq = seq; 1512 } 1513 } 1514 1515 if (pickTrack) { 1516 // selecting track, queue discontinuities before content 1517 sources[j]->clear(); 1518 if (j == kSubtitleIndex) { 1519 break; 1520 } 1521 sp<AnotherPacketSource> discontinuityQueue; 1522 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1523 discontinuityQueue->queueDiscontinuity( 1524 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1525 } else { 1526 // adapting, queue discontinuities after resume 1527 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1528 sources[j]->clear(); 1529 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1530 if (extraStreams & indexToType(j)) { 1531 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1532 } 1533 } 1534 } 1535 1536 streamMask &= ~indexToType(j); 1537 } 1538 } 1539 1540 fetcher->startAsync( 1541 sources[kAudioIndex], 1542 sources[kVideoIndex], 1543 sources[kSubtitleIndex], 1544 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1545 segmentStartTimeUs, 1546 discontinuitySeq, 1547 switching); 1548 } 1549 1550 // All fetchers have now been started, the configuration change 1551 // has completed. 1552 1553 cancelCheckBandwidthEvent(); 1554 scheduleCheckBandwidthEvent(); 1555 1556 ALOGV("XXX configuration change completed."); 1557 mReconfigurationInProgress = false; 1558 if (switching) { 1559 mSwitchInProgress = true; 1560 } else { 1561 mStreamMask = mNewStreamMask; 1562 } 1563 1564 if (mDisconnectReplyID != 0) { 1565 finishDisconnect(); 1566 } 1567} 1568 1569void LiveSession::onSwapped(const sp<AMessage> &msg) { 1570 int32_t switchGeneration; 1571 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1572 if (switchGeneration != mSwitchGeneration) { 1573 return; 1574 } 1575 1576 int32_t stream; 1577 CHECK(msg->findInt32("stream", &stream)); 1578 1579 ssize_t idx = typeToIndex(stream); 1580 CHECK(idx >= 0); 1581 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1582 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1583 } 1584 mStreams[idx].mUri = mStreams[idx].mNewUri; 1585 mStreams[idx].mNewUri.clear(); 1586 1587 mSwapMask &= ~stream; 1588 if (mSwapMask != 0) { 1589 return; 1590 } 1591 1592 // Check if new variant contains extra streams. 1593 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1594 while (extraStreams) { 1595 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1596 swapPacketSource(extraStream); 1597 extraStreams &= ~extraStream; 1598 1599 idx = typeToIndex(extraStream); 1600 CHECK(idx >= 0); 1601 if (mStreams[idx].mNewUri.empty()) { 1602 ALOGW("swapping extra stream type %d %s to empty stream", 1603 extraStream, mStreams[idx].mUri.c_str()); 1604 } 1605 mStreams[idx].mUri = mStreams[idx].mNewUri; 1606 mStreams[idx].mNewUri.clear(); 1607 } 1608 1609 tryToFinishBandwidthSwitch(); 1610} 1611 1612void LiveSession::onCheckSwitchDown() { 1613 if (mSwitchDownMonitor == NULL) { 1614 return; 1615 } 1616 1617 for (size_t i = 0; i < kMaxStreams; ++i) { 1618 int32_t targetDuration; 1619 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1620 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1621 1622 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1623 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1624 int64_t targetDurationUs = targetDuration * 1000000ll; 1625 1626 if (bufferedDurationUs < targetDurationUs / 3) { 1627 (new AMessage(kWhatSwitchDown, id()))->post(); 1628 break; 1629 } 1630 } 1631 } 1632 1633 mSwitchDownMonitor->post(1000000ll); 1634} 1635 1636void LiveSession::onSwitchDown() { 1637 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1638 return; 1639 } 1640 1641 ssize_t bandwidthIndex = getBandwidthIndex(); 1642 if (bandwidthIndex < mCurBandwidthIndex) { 1643 changeConfiguration(-1, bandwidthIndex, false); 1644 return; 1645 } 1646 1647 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1648} 1649 1650// Mark switch done when: 1651// 1. all old buffers are swapped out 1652void LiveSession::tryToFinishBandwidthSwitch() { 1653 if (!mSwitchInProgress) { 1654 return; 1655 } 1656 1657 bool needToRemoveFetchers = false; 1658 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1659 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1660 needToRemoveFetchers = true; 1661 break; 1662 } 1663 } 1664 1665 if (!needToRemoveFetchers && mSwapMask == 0) { 1666 ALOGI("mSwitchInProgress = false"); 1667 mStreamMask = mNewStreamMask; 1668 mSwitchInProgress = false; 1669 } 1670} 1671 1672void LiveSession::scheduleCheckBandwidthEvent() { 1673 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1674 msg->setInt32("generation", mCheckBandwidthGeneration); 1675 msg->post(10000000ll); 1676} 1677 1678void LiveSession::cancelCheckBandwidthEvent() { 1679 ++mCheckBandwidthGeneration; 1680} 1681 1682void LiveSession::cancelBandwidthSwitch() { 1683 Mutex::Autolock lock(mSwapMutex); 1684 mSwitchGeneration++; 1685 mSwitchInProgress = false; 1686 mSwapMask = 0; 1687 1688 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1689 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1690 if (info.mToBeRemoved) { 1691 info.mToBeRemoved = false; 1692 } 1693 } 1694 1695 for (size_t i = 0; i < kMaxStreams; ++i) { 1696 if (!mStreams[i].mNewUri.empty()) { 1697 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1698 if (j < 0) { 1699 mStreams[i].mNewUri.clear(); 1700 continue; 1701 } 1702 1703 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1704 info.mFetcher->stopAsync(); 1705 mFetcherInfos.removeItemsAt(j); 1706 mStreams[i].mNewUri.clear(); 1707 } 1708 } 1709} 1710 1711bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1712 if (mReconfigurationInProgress || mSwitchInProgress) { 1713 return false; 1714 } 1715 1716 if (mCurBandwidthIndex < 0) { 1717 return true; 1718 } 1719 1720 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1721 return false; 1722 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1723 return canSwitchUp(); 1724 } else { 1725 return true; 1726 } 1727} 1728 1729void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1730 size_t bandwidthIndex = getBandwidthIndex(); 1731 if (canSwitchBandwidthTo(bandwidthIndex)) { 1732 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1733 } else { 1734 // Come back and check again 10 seconds later in case there is nothing to do now. 1735 // If we DO change configuration, once that completes it'll schedule a new 1736 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1737 msg->post(10000000ll); 1738 } 1739} 1740 1741void LiveSession::postPrepared(status_t err) { 1742 CHECK(mInPreparationPhase); 1743 1744 sp<AMessage> notify = mNotify->dup(); 1745 if (err == OK || err == ERROR_END_OF_STREAM) { 1746 notify->setInt32("what", kWhatPrepared); 1747 } else { 1748 notify->setInt32("what", kWhatPreparationFailed); 1749 notify->setInt32("err", err); 1750 } 1751 1752 notify->post(); 1753 1754 mInPreparationPhase = false; 1755 1756 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1757 mSwitchDownMonitor->post(); 1758} 1759 1760} // namespace android 1761 1762