LiveSession.cpp revision 15f8ecfa23b650b3efa8fe841d2be6bd0c9523fb
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0), 72 mFirstTimeUsValid(false), 73 mFirstTimeUs(0), 74 mLastSeekTimeUs(0) { 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mBuffering[i] = false; 85 } 86} 87 88LiveSession::~LiveSession() { 89} 90 91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 92 ABuffer *discontinuity = new ABuffer(0); 93 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 94 discontinuity->meta()->setInt32("swapPacketSource", swap); 95 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 96 discontinuity->meta()->setInt64("timeUs", -1); 97 return discontinuity; 98} 99 100void LiveSession::swapPacketSource(StreamType stream) { 101 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 102 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 103 sp<AnotherPacketSource> tmp = aps; 104 aps = aps2; 105 aps2 = tmp; 106 aps2->clear(); 107} 108 109status_t LiveSession::dequeueAccessUnit( 110 StreamType stream, sp<ABuffer> *accessUnit) { 111 if (!(mStreamMask & stream)) { 112 // return -EWOULDBLOCK to avoid halting the decoder 113 // when switching between audio/video and audio only. 114 return -EWOULDBLOCK; 115 } 116 117 status_t finalResult; 118 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 119 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 120 discontinuityQueue->dequeueAccessUnit(accessUnit); 121 // seeking, track switching 122 sp<AMessage> extra; 123 int64_t timeUs; 124 if ((*accessUnit)->meta()->findMessage("extra", &extra) 125 && extra != NULL 126 && extra->findInt64("timeUs", &timeUs)) { 127 // seeking only 128 mLastSeekTimeUs = timeUs; 129 mDiscontinuityOffsetTimesUs.clear(); 130 mDiscontinuityAbsStartTimesUs.clear(); 131 } 132 return INFO_DISCONTINUITY; 133 } 134 135 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 136 137 ssize_t idx = typeToIndex(stream); 138 if (!packetSource->hasBufferAvailable(&finalResult)) { 139 if (finalResult == OK) { 140 mBuffering[idx] = true; 141 return -EAGAIN; 142 } else { 143 return finalResult; 144 } 145 } 146 147 if (mBuffering[idx]) { 148 if (mSwitchInProgress 149 || packetSource->isFinished(0) 150 || packetSource->getEstimatedDurationUs() > 10000000ll) { 151 mBuffering[idx] = false; 152 } 153 } 154 155 if (mBuffering[idx]) { 156 return -EAGAIN; 157 } 158 159 // wait for counterpart 160 sp<AnotherPacketSource> otherSource; 161 uint32_t mask = mNewStreamMask & mStreamMask; 162 uint32_t fetchersMask = 0; 163 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 164 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 165 fetchersMask |= fetcherMask; 166 } 167 mask &= fetchersMask; 168 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 169 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 170 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 171 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 172 } 173 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 174 return finalResult == OK ? -EAGAIN : finalResult; 175 } 176 177 status_t err = packetSource->dequeueAccessUnit(accessUnit); 178 179 size_t streamIdx; 180 const char *streamStr; 181 switch (stream) { 182 case STREAMTYPE_AUDIO: 183 streamIdx = kAudioIndex; 184 streamStr = "audio"; 185 break; 186 case STREAMTYPE_VIDEO: 187 streamIdx = kVideoIndex; 188 streamStr = "video"; 189 break; 190 case STREAMTYPE_SUBTITLES: 191 streamIdx = kSubtitleIndex; 192 streamStr = "subs"; 193 break; 194 default: 195 TRESPASS(); 196 } 197 198 StreamItem& strm = mStreams[streamIdx]; 199 if (err == INFO_DISCONTINUITY) { 200 // adaptive streaming, discontinuities in the playlist 201 int32_t type; 202 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 203 204 sp<AMessage> extra; 205 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 206 extra.clear(); 207 } 208 209 ALOGI("[%s] read discontinuity of type %d, extra = %s", 210 streamStr, 211 type, 212 extra == NULL ? "NULL" : extra->debugString().c_str()); 213 214 int32_t swap; 215 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 216 int32_t switchGeneration; 217 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 218 { 219 Mutex::Autolock lock(mSwapMutex); 220 if (switchGeneration == mSwitchGeneration) { 221 swapPacketSource(stream); 222 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 223 msg->setInt32("stream", stream); 224 msg->setInt32("switchGeneration", switchGeneration); 225 msg->post(); 226 } 227 } 228 } else { 229 size_t seq = strm.mCurDiscontinuitySeq; 230 int64_t offsetTimeUs; 231 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 232 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 233 } else { 234 offsetTimeUs = 0; 235 } 236 237 seq += 1; 238 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 239 int64_t firstTimeUs; 240 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 241 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 242 offsetTimeUs += strm.mLastSampleDurationUs; 243 } else { 244 offsetTimeUs += strm.mLastSampleDurationUs; 245 } 246 247 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 248 } 249 } else if (err == OK) { 250 251 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 252 int64_t timeUs; 253 int32_t discontinuitySeq = 0; 254 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 255 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 256 strm.mCurDiscontinuitySeq = discontinuitySeq; 257 258 int32_t discard = 0; 259 int64_t firstTimeUs; 260 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 261 int64_t durUs; // approximate sample duration 262 if (timeUs > strm.mLastDequeuedTimeUs) { 263 durUs = timeUs - strm.mLastDequeuedTimeUs; 264 } else { 265 durUs = strm.mLastDequeuedTimeUs - timeUs; 266 } 267 strm.mLastSampleDurationUs = durUs; 268 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 269 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 270 firstTimeUs = timeUs; 271 } else { 272 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 273 firstTimeUs = timeUs; 274 } 275 276 strm.mLastDequeuedTimeUs = timeUs; 277 if (timeUs >= firstTimeUs) { 278 timeUs -= firstTimeUs; 279 } else { 280 timeUs = 0; 281 } 282 timeUs += mLastSeekTimeUs; 283 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 284 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 285 } 286 287 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 288 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 289 mLastDequeuedTimeUs = timeUs; 290 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 291 } else if (stream == STREAMTYPE_SUBTITLES) { 292 (*accessUnit)->meta()->setInt32( 293 "trackIndex", mPlaylist->getSelectedIndex()); 294 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 295 } 296 } else { 297 ALOGI("[%s] encountered error %d", streamStr, err); 298 } 299 300 return err; 301} 302 303status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 304 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 305 if (!(mStreamMask & stream)) { 306 return UNKNOWN_ERROR; 307 } 308 309 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 310 311 sp<MetaData> meta = packetSource->getFormat(); 312 313 if (meta == NULL) { 314 return -EAGAIN; 315 } 316 317 return convertMetaDataToMessage(meta, format); 318} 319 320void LiveSession::connectAsync( 321 const char *url, const KeyedVector<String8, String8> *headers) { 322 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 323 msg->setString("url", url); 324 325 if (headers != NULL) { 326 msg->setPointer( 327 "headers", 328 new KeyedVector<String8, String8>(*headers)); 329 } 330 331 msg->post(); 332} 333 334status_t LiveSession::disconnect() { 335 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 336 337 sp<AMessage> response; 338 status_t err = msg->postAndAwaitResponse(&response); 339 340 return err; 341} 342 343status_t LiveSession::seekTo(int64_t timeUs) { 344 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 345 msg->setInt64("timeUs", timeUs); 346 347 sp<AMessage> response; 348 status_t err = msg->postAndAwaitResponse(&response); 349 350 uint32_t replyID; 351 CHECK(response == mSeekReply && 0 != mSeekReplyID); 352 mSeekReply.clear(); 353 mSeekReplyID = 0; 354 return err; 355} 356 357void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 358 switch (msg->what()) { 359 case kWhatConnect: 360 { 361 onConnect(msg); 362 break; 363 } 364 365 case kWhatDisconnect: 366 { 367 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 368 369 if (mReconfigurationInProgress) { 370 break; 371 } 372 373 finishDisconnect(); 374 break; 375 } 376 377 case kWhatSeek: 378 { 379 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 380 381 status_t err = onSeek(msg); 382 383 mSeekReply = new AMessage; 384 mSeekReply->setInt32("err", err); 385 break; 386 } 387 388 case kWhatFetcherNotify: 389 { 390 int32_t what; 391 CHECK(msg->findInt32("what", &what)); 392 393 switch (what) { 394 case PlaylistFetcher::kWhatStarted: 395 break; 396 case PlaylistFetcher::kWhatPaused: 397 case PlaylistFetcher::kWhatStopped: 398 { 399 if (what == PlaylistFetcher::kWhatStopped) { 400 AString uri; 401 CHECK(msg->findString("uri", &uri)); 402 if (mFetcherInfos.removeItem(uri) < 0) { 403 // ignore duplicated kWhatStopped messages. 404 break; 405 } 406 407 if (mSwitchInProgress) { 408 tryToFinishBandwidthSwitch(); 409 } 410 } 411 412 if (mContinuation != NULL) { 413 CHECK_GT(mContinuationCounter, 0); 414 if (--mContinuationCounter == 0) { 415 mContinuation->post(); 416 417 if (mSeekReplyID != 0) { 418 CHECK(mSeekReply != NULL); 419 mSeekReply->postReply(mSeekReplyID); 420 } 421 } 422 } 423 break; 424 } 425 426 case PlaylistFetcher::kWhatDurationUpdate: 427 { 428 AString uri; 429 CHECK(msg->findString("uri", &uri)); 430 431 int64_t durationUs; 432 CHECK(msg->findInt64("durationUs", &durationUs)); 433 434 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 435 info->mDurationUs = durationUs; 436 break; 437 } 438 439 case PlaylistFetcher::kWhatError: 440 { 441 status_t err; 442 CHECK(msg->findInt32("err", &err)); 443 444 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 445 446 // handle EOS on subtitle tracks independently 447 AString uri; 448 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 449 ssize_t i = mFetcherInfos.indexOfKey(uri); 450 if (i >= 0) { 451 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 452 if (fetcher != NULL) { 453 uint32_t type = fetcher->getStreamTypeMask(); 454 if (type == STREAMTYPE_SUBTITLES) { 455 mPacketSources.valueFor( 456 STREAMTYPE_SUBTITLES)->signalEOS(err);; 457 break; 458 } 459 } 460 } 461 } 462 463 if (mInPreparationPhase) { 464 postPrepared(err); 465 } 466 467 cancelBandwidthSwitch(); 468 469 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 470 471 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 472 473 mPacketSources.valueFor( 474 STREAMTYPE_SUBTITLES)->signalEOS(err); 475 476 sp<AMessage> notify = mNotify->dup(); 477 notify->setInt32("what", kWhatError); 478 notify->setInt32("err", err); 479 notify->post(); 480 break; 481 } 482 483 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 484 { 485 AString uri; 486 CHECK(msg->findString("uri", &uri)); 487 488 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 489 info->mIsPrepared = true; 490 491 if (mInPreparationPhase) { 492 bool allFetchersPrepared = true; 493 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 494 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 495 allFetchersPrepared = false; 496 break; 497 } 498 } 499 500 if (allFetchersPrepared) { 501 postPrepared(OK); 502 } 503 } 504 break; 505 } 506 507 case PlaylistFetcher::kWhatStartedAt: 508 { 509 int32_t switchGeneration; 510 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 511 512 if (switchGeneration != mSwitchGeneration) { 513 break; 514 } 515 516 // Resume fetcher for the original variant; the resumed fetcher should 517 // continue until the timestamps found in msg, which is stored by the 518 // new fetcher to indicate where the new variant has started buffering. 519 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 520 const FetcherInfo info = mFetcherInfos.valueAt(i); 521 if (info.mToBeRemoved) { 522 info.mFetcher->resumeUntilAsync(msg); 523 } 524 } 525 break; 526 } 527 528 default: 529 TRESPASS(); 530 } 531 532 break; 533 } 534 535 case kWhatCheckBandwidth: 536 { 537 int32_t generation; 538 CHECK(msg->findInt32("generation", &generation)); 539 540 if (generation != mCheckBandwidthGeneration) { 541 break; 542 } 543 544 onCheckBandwidth(msg); 545 break; 546 } 547 548 case kWhatChangeConfiguration: 549 { 550 onChangeConfiguration(msg); 551 break; 552 } 553 554 case kWhatChangeConfiguration2: 555 { 556 onChangeConfiguration2(msg); 557 break; 558 } 559 560 case kWhatChangeConfiguration3: 561 { 562 onChangeConfiguration3(msg); 563 break; 564 } 565 566 case kWhatFinishDisconnect2: 567 { 568 onFinishDisconnect2(); 569 break; 570 } 571 572 case kWhatSwapped: 573 { 574 onSwapped(msg); 575 break; 576 } 577 578 case kWhatCheckSwitchDown: 579 { 580 onCheckSwitchDown(); 581 break; 582 } 583 584 case kWhatSwitchDown: 585 { 586 onSwitchDown(); 587 break; 588 } 589 590 default: 591 TRESPASS(); 592 break; 593 } 594} 595 596// static 597int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 598 if (a->mBandwidth < b->mBandwidth) { 599 return -1; 600 } else if (a->mBandwidth == b->mBandwidth) { 601 return 0; 602 } 603 604 return 1; 605} 606 607// static 608LiveSession::StreamType LiveSession::indexToType(int idx) { 609 CHECK(idx >= 0 && idx < kMaxStreams); 610 return (StreamType)(1 << idx); 611} 612 613// static 614ssize_t LiveSession::typeToIndex(int32_t type) { 615 switch (type) { 616 case STREAMTYPE_AUDIO: 617 return 0; 618 case STREAMTYPE_VIDEO: 619 return 1; 620 case STREAMTYPE_SUBTITLES: 621 return 2; 622 default: 623 return -1; 624 }; 625 return -1; 626} 627 628void LiveSession::onConnect(const sp<AMessage> &msg) { 629 AString url; 630 CHECK(msg->findString("url", &url)); 631 632 KeyedVector<String8, String8> *headers = NULL; 633 if (!msg->findPointer("headers", (void **)&headers)) { 634 mExtraHeaders.clear(); 635 } else { 636 mExtraHeaders = *headers; 637 638 delete headers; 639 headers = NULL; 640 } 641 642 // TODO currently we don't know if we are coming here from incognito mode 643 ALOGI("onConnect %s", uriDebugString(url).c_str()); 644 645 mMasterURL = url; 646 647 bool dummy; 648 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 649 650 if (mPlaylist == NULL) { 651 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 652 653 postPrepared(ERROR_IO); 654 return; 655 } 656 657 // We trust the content provider to make a reasonable choice of preferred 658 // initial bandwidth by listing it first in the variant playlist. 659 // At startup we really don't have a good estimate on the available 660 // network bandwidth since we haven't tranferred any data yet. Once 661 // we have we can make a better informed choice. 662 size_t initialBandwidth = 0; 663 size_t initialBandwidthIndex = 0; 664 665 if (mPlaylist->isVariantPlaylist()) { 666 for (size_t i = 0; i < mPlaylist->size(); ++i) { 667 BandwidthItem item; 668 669 item.mPlaylistIndex = i; 670 671 sp<AMessage> meta; 672 AString uri; 673 mPlaylist->itemAt(i, &uri, &meta); 674 675 unsigned long bandwidth; 676 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 677 678 if (initialBandwidth == 0) { 679 initialBandwidth = item.mBandwidth; 680 } 681 682 mBandwidthItems.push(item); 683 } 684 685 CHECK_GT(mBandwidthItems.size(), 0u); 686 687 mBandwidthItems.sort(SortByBandwidth); 688 689 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 690 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 691 initialBandwidthIndex = i; 692 break; 693 } 694 } 695 } else { 696 // dummy item. 697 BandwidthItem item; 698 item.mPlaylistIndex = 0; 699 item.mBandwidth = 0; 700 mBandwidthItems.push(item); 701 } 702 703 mPlaylist->pickRandomMediaItems(); 704 changeConfiguration( 705 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 706} 707 708void LiveSession::finishDisconnect() { 709 // No reconfiguration is currently pending, make sure none will trigger 710 // during disconnection either. 711 cancelCheckBandwidthEvent(); 712 713 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 714 // (finishDisconnect, onFinishDisconnect2) 715 cancelBandwidthSwitch(); 716 717 // cancel switch down monitor 718 mSwitchDownMonitor.clear(); 719 720 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 721 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 722 } 723 724 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 725 726 mContinuationCounter = mFetcherInfos.size(); 727 mContinuation = msg; 728 729 if (mContinuationCounter == 0) { 730 msg->post(); 731 } 732} 733 734void LiveSession::onFinishDisconnect2() { 735 mContinuation.clear(); 736 737 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 738 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 739 740 mPacketSources.valueFor( 741 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 742 743 sp<AMessage> response = new AMessage; 744 response->setInt32("err", OK); 745 746 response->postReply(mDisconnectReplyID); 747 mDisconnectReplyID = 0; 748} 749 750sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 751 ssize_t index = mFetcherInfos.indexOfKey(uri); 752 753 if (index >= 0) { 754 return NULL; 755 } 756 757 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 758 notify->setString("uri", uri); 759 notify->setInt32("switchGeneration", mSwitchGeneration); 760 761 FetcherInfo info; 762 info.mFetcher = new PlaylistFetcher(notify, this, uri); 763 info.mDurationUs = -1ll; 764 info.mIsPrepared = false; 765 info.mToBeRemoved = false; 766 looper()->registerHandler(info.mFetcher); 767 768 mFetcherInfos.add(uri, info); 769 770 return info.mFetcher; 771} 772 773/* 774 * Illustration of parameters: 775 * 776 * 0 `range_offset` 777 * +------------+-------------------------------------------------------+--+--+ 778 * | | | next block to fetch | | | 779 * | | `source` handle => `out` buffer | | | | 780 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 781 * | |<----------- `range_length` / buffer capacity ----------->| | 782 * |<------------------------------ file_size ------------------------------->| 783 * 784 * Special parameter values: 785 * - range_length == -1 means entire file 786 * - block_size == 0 means entire range 787 * 788 */ 789ssize_t LiveSession::fetchFile( 790 const char *url, sp<ABuffer> *out, 791 int64_t range_offset, int64_t range_length, 792 uint32_t block_size, /* download block size */ 793 sp<DataSource> *source, /* to return and reuse source */ 794 String8 *actualUrl) { 795 off64_t size; 796 sp<DataSource> temp_source; 797 if (source == NULL) { 798 source = &temp_source; 799 } 800 801 if (*source == NULL) { 802 if (!strncasecmp(url, "file://", 7)) { 803 *source = new FileSource(url + 7); 804 } else if (strncasecmp(url, "http://", 7) 805 && strncasecmp(url, "https://", 8)) { 806 return ERROR_UNSUPPORTED; 807 } else { 808 KeyedVector<String8, String8> headers = mExtraHeaders; 809 if (range_offset > 0 || range_length >= 0) { 810 headers.add( 811 String8("Range"), 812 String8( 813 StringPrintf( 814 "bytes=%lld-%s", 815 range_offset, 816 range_length < 0 817 ? "" : StringPrintf("%lld", 818 range_offset + range_length - 1).c_str()).c_str())); 819 } 820 status_t err = mHTTPDataSource->connect(url, &headers); 821 822 if (err != OK) { 823 return err; 824 } 825 826 *source = mHTTPDataSource; 827 } 828 } 829 830 status_t getSizeErr = (*source)->getSize(&size); 831 if (getSizeErr != OK) { 832 size = 65536; 833 } 834 835 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 836 if (*out == NULL) { 837 buffer->setRange(0, 0); 838 } 839 840 ssize_t bytesRead = 0; 841 // adjust range_length if only reading partial block 842 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 843 range_length = buffer->size() + block_size; 844 } 845 for (;;) { 846 // Only resize when we don't know the size. 847 size_t bufferRemaining = buffer->capacity() - buffer->size(); 848 if (bufferRemaining == 0 && getSizeErr != OK) { 849 bufferRemaining = 32768; 850 851 ALOGV("increasing download buffer to %zu bytes", 852 buffer->size() + bufferRemaining); 853 854 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 855 memcpy(copy->data(), buffer->data(), buffer->size()); 856 copy->setRange(0, buffer->size()); 857 858 buffer = copy; 859 } 860 861 size_t maxBytesToRead = bufferRemaining; 862 if (range_length >= 0) { 863 int64_t bytesLeftInRange = range_length - buffer->size(); 864 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 865 maxBytesToRead = bytesLeftInRange; 866 867 if (bytesLeftInRange == 0) { 868 break; 869 } 870 } 871 } 872 873 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 874 // to help us break out of the loop. 875 ssize_t n = (*source)->readAt( 876 buffer->size(), buffer->data() + buffer->size(), 877 maxBytesToRead); 878 879 if (n < 0) { 880 return n; 881 } 882 883 if (n == 0) { 884 break; 885 } 886 887 buffer->setRange(0, buffer->size() + (size_t)n); 888 bytesRead += n; 889 } 890 891 *out = buffer; 892 if (actualUrl != NULL) { 893 *actualUrl = (*source)->getUri(); 894 if (actualUrl->isEmpty()) { 895 *actualUrl = url; 896 } 897 } 898 899 return bytesRead; 900} 901 902sp<M3UParser> LiveSession::fetchPlaylist( 903 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 904 ALOGV("fetchPlaylist '%s'", url); 905 906 *unchanged = false; 907 908 sp<ABuffer> buffer; 909 String8 actualUrl; 910 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 911 912 if (err <= 0) { 913 return NULL; 914 } 915 916 // MD5 functionality is not available on the simulator, treat all 917 // playlists as changed. 918 919#if defined(HAVE_ANDROID_OS) 920 uint8_t hash[16]; 921 922 MD5_CTX m; 923 MD5_Init(&m); 924 MD5_Update(&m, buffer->data(), buffer->size()); 925 926 MD5_Final(hash, &m); 927 928 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 929 // playlist unchanged 930 *unchanged = true; 931 932 return NULL; 933 } 934 935 if (curPlaylistHash != NULL) { 936 memcpy(curPlaylistHash, hash, sizeof(hash)); 937 } 938#endif 939 940 sp<M3UParser> playlist = 941 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 942 943 if (playlist->initCheck() != OK) { 944 ALOGE("failed to parse .m3u8 playlist"); 945 946 return NULL; 947 } 948 949 return playlist; 950} 951 952static double uniformRand() { 953 return (double)rand() / RAND_MAX; 954} 955 956size_t LiveSession::getBandwidthIndex() { 957 if (mBandwidthItems.size() == 0) { 958 return 0; 959 } 960 961#if 1 962 char value[PROPERTY_VALUE_MAX]; 963 ssize_t index = -1; 964 if (property_get("media.httplive.bw-index", value, NULL)) { 965 char *end; 966 index = strtol(value, &end, 10); 967 CHECK(end > value && *end == '\0'); 968 969 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 970 index = mBandwidthItems.size() - 1; 971 } 972 } 973 974 if (index < 0) { 975 int32_t bandwidthBps; 976 if (mHTTPDataSource != NULL 977 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 978 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 979 } else { 980 ALOGV("no bandwidth estimate."); 981 return 0; // Pick the lowest bandwidth stream by default. 982 } 983 984 char value[PROPERTY_VALUE_MAX]; 985 if (property_get("media.httplive.max-bw", value, NULL)) { 986 char *end; 987 long maxBw = strtoul(value, &end, 10); 988 if (end > value && *end == '\0') { 989 if (maxBw > 0 && bandwidthBps > maxBw) { 990 ALOGV("bandwidth capped to %ld bps", maxBw); 991 bandwidthBps = maxBw; 992 } 993 } 994 } 995 996 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 997 998 index = mBandwidthItems.size() - 1; 999 while (index > 0) { 1000 // consider only 80% of the available bandwidth, but if we are switching up, 1001 // be even more conservative (70%) to avoid overestimating and immediately 1002 // switching back. 1003 size_t adjustedBandwidthBps = bandwidthBps; 1004 if (index > mCurBandwidthIndex) { 1005 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1006 } else { 1007 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1008 } 1009 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1010 break; 1011 } 1012 --index; 1013 } 1014 } 1015#elif 0 1016 // Change bandwidth at random() 1017 size_t index = uniformRand() * mBandwidthItems.size(); 1018#elif 0 1019 // There's a 50% chance to stay on the current bandwidth and 1020 // a 50% chance to switch to the next higher bandwidth (wrapping around 1021 // to lowest) 1022 const size_t kMinIndex = 0; 1023 1024 static ssize_t mCurBandwidthIndex = -1; 1025 1026 size_t index; 1027 if (mCurBandwidthIndex < 0) { 1028 index = kMinIndex; 1029 } else if (uniformRand() < 0.5) { 1030 index = (size_t)mCurBandwidthIndex; 1031 } else { 1032 index = mCurBandwidthIndex + 1; 1033 if (index == mBandwidthItems.size()) { 1034 index = kMinIndex; 1035 } 1036 } 1037 mCurBandwidthIndex = index; 1038#elif 0 1039 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1040 1041 size_t index = mBandwidthItems.size() - 1; 1042 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1043 --index; 1044 } 1045#elif 1 1046 char value[PROPERTY_VALUE_MAX]; 1047 size_t index; 1048 if (property_get("media.httplive.bw-index", value, NULL)) { 1049 char *end; 1050 index = strtoul(value, &end, 10); 1051 CHECK(end > value && *end == '\0'); 1052 1053 if (index >= mBandwidthItems.size()) { 1054 index = mBandwidthItems.size() - 1; 1055 } 1056 } else { 1057 index = 0; 1058 } 1059#else 1060 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1061#endif 1062 1063 CHECK_GE(index, 0); 1064 1065 return index; 1066} 1067 1068status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1069 int64_t timeUs; 1070 CHECK(msg->findInt64("timeUs", &timeUs)); 1071 1072 if (!mReconfigurationInProgress) { 1073 changeConfiguration(timeUs, getBandwidthIndex()); 1074 } 1075 1076 return OK; 1077} 1078 1079status_t LiveSession::getDuration(int64_t *durationUs) const { 1080 int64_t maxDurationUs = 0ll; 1081 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1082 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1083 1084 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1085 maxDurationUs = fetcherDurationUs; 1086 } 1087 } 1088 1089 *durationUs = maxDurationUs; 1090 1091 return OK; 1092} 1093 1094bool LiveSession::isSeekable() const { 1095 int64_t durationUs; 1096 return getDuration(&durationUs) == OK && durationUs >= 0; 1097} 1098 1099bool LiveSession::hasDynamicDuration() const { 1100 return false; 1101} 1102 1103size_t LiveSession::getTrackCount() const { 1104 if (mPlaylist == NULL) { 1105 return 0; 1106 } else { 1107 return mPlaylist->getTrackCount(); 1108 } 1109} 1110 1111sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1112 if (mPlaylist == NULL) { 1113 return NULL; 1114 } else { 1115 return mPlaylist->getTrackInfo(trackIndex); 1116 } 1117} 1118 1119status_t LiveSession::selectTrack(size_t index, bool select) { 1120 status_t err = mPlaylist->selectTrack(index, select); 1121 if (err == OK) { 1122 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1123 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1124 msg->setInt32("pickTrack", select); 1125 msg->post(); 1126 } 1127 return err; 1128} 1129 1130bool LiveSession::canSwitchUp() { 1131 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1132 status_t err = OK; 1133 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1134 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1135 int64_t dur = source->getBufferedDurationUs(&err); 1136 if (err == OK && dur > 10000000) { 1137 return true; 1138 } 1139 } 1140 return false; 1141} 1142 1143void LiveSession::changeConfiguration( 1144 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1145 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1146 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1147 cancelBandwidthSwitch(); 1148 1149 CHECK(!mReconfigurationInProgress); 1150 mReconfigurationInProgress = true; 1151 1152 mCurBandwidthIndex = bandwidthIndex; 1153 1154 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1155 timeUs, bandwidthIndex, pickTrack); 1156 1157 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1158 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1159 1160 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1161 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1162 1163 AString URIs[kMaxStreams]; 1164 for (size_t i = 0; i < kMaxStreams; ++i) { 1165 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1166 streamMask |= indexToType(i); 1167 } 1168 } 1169 1170 // Step 1, stop and discard fetchers that are no longer needed. 1171 // Pause those that we'll reuse. 1172 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1173 const AString &uri = mFetcherInfos.keyAt(i); 1174 1175 bool discardFetcher = true; 1176 1177 // If we're seeking all current fetchers are discarded. 1178 if (timeUs < 0ll) { 1179 // delay fetcher removal if not picking tracks 1180 discardFetcher = pickTrack; 1181 1182 for (size_t j = 0; j < kMaxStreams; ++j) { 1183 StreamType type = indexToType(j); 1184 if ((streamMask & type) && uri == URIs[j]) { 1185 resumeMask |= type; 1186 streamMask &= ~type; 1187 discardFetcher = false; 1188 } 1189 } 1190 } 1191 1192 if (discardFetcher) { 1193 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1194 } else { 1195 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1196 } 1197 } 1198 1199 sp<AMessage> msg; 1200 if (timeUs < 0ll) { 1201 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1202 msg = new AMessage(kWhatChangeConfiguration3, id()); 1203 } else { 1204 msg = new AMessage(kWhatChangeConfiguration2, id()); 1205 } 1206 msg->setInt32("streamMask", streamMask); 1207 msg->setInt32("resumeMask", resumeMask); 1208 msg->setInt32("pickTrack", pickTrack); 1209 msg->setInt64("timeUs", timeUs); 1210 for (size_t i = 0; i < kMaxStreams; ++i) { 1211 if ((streamMask | resumeMask) & indexToType(i)) { 1212 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1213 } 1214 } 1215 1216 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1217 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1218 // fetchers have completed their asynchronous operation, we'll post 1219 // mContinuation, which then is handled below in onChangeConfiguration2. 1220 mContinuationCounter = mFetcherInfos.size(); 1221 mContinuation = msg; 1222 1223 if (mContinuationCounter == 0) { 1224 msg->post(); 1225 1226 if (mSeekReplyID != 0) { 1227 CHECK(mSeekReply != NULL); 1228 mSeekReply->postReply(mSeekReplyID); 1229 } 1230 } 1231} 1232 1233void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1234 if (!mReconfigurationInProgress) { 1235 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1236 msg->findInt32("pickTrack", &pickTrack); 1237 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1238 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1239 } else { 1240 msg->post(1000000ll); // retry in 1 sec 1241 } 1242} 1243 1244void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1245 mContinuation.clear(); 1246 1247 // All fetchers are either suspended or have been removed now. 1248 1249 uint32_t streamMask, resumeMask; 1250 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1251 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1252 1253 // currently onChangeConfiguration2 is only called for seeking; 1254 // remove the following CHECK if using it else where. 1255 CHECK_EQ(resumeMask, 0); 1256 streamMask |= resumeMask; 1257 1258 AString URIs[kMaxStreams]; 1259 for (size_t i = 0; i < kMaxStreams; ++i) { 1260 if (streamMask & indexToType(i)) { 1261 const AString &uriKey = mStreams[i].uriKey(); 1262 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1263 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1264 } 1265 } 1266 1267 // Determine which decoders to shutdown on the player side, 1268 // a decoder has to be shutdown if either 1269 // 1) its streamtype was active before but now longer isn't. 1270 // or 1271 // 2) its streamtype was already active and still is but the URI 1272 // has changed. 1273 uint32_t changedMask = 0; 1274 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1275 if (((mStreamMask & streamMask & indexToType(i)) 1276 && !(URIs[i] == mStreams[i].mUri)) 1277 || (mStreamMask & ~streamMask & indexToType(i))) { 1278 changedMask |= indexToType(i); 1279 } 1280 } 1281 1282 if (changedMask == 0) { 1283 // If nothing changed as far as the audio/video decoders 1284 // are concerned we can proceed. 1285 onChangeConfiguration3(msg); 1286 return; 1287 } 1288 1289 // Something changed, inform the player which will shutdown the 1290 // corresponding decoders and will post the reply once that's done. 1291 // Handling the reply will continue executing below in 1292 // onChangeConfiguration3. 1293 sp<AMessage> notify = mNotify->dup(); 1294 notify->setInt32("what", kWhatStreamsChanged); 1295 notify->setInt32("changedMask", changedMask); 1296 1297 msg->setWhat(kWhatChangeConfiguration3); 1298 msg->setTarget(id()); 1299 1300 notify->setMessage("reply", msg); 1301 notify->post(); 1302} 1303 1304void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1305 mContinuation.clear(); 1306 // All remaining fetchers are still suspended, the player has shutdown 1307 // any decoders that needed it. 1308 1309 uint32_t streamMask, resumeMask; 1310 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1311 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1312 1313 int64_t timeUs; 1314 int32_t pickTrack; 1315 bool switching = false; 1316 CHECK(msg->findInt64("timeUs", &timeUs)); 1317 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1318 1319 if (timeUs < 0ll) { 1320 if (!pickTrack) { 1321 switching = true; 1322 } 1323 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1324 } else { 1325 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1326 } 1327 1328 for (size_t i = 0; i < kMaxStreams; ++i) { 1329 if (streamMask & indexToType(i)) { 1330 if (switching) { 1331 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1332 } else { 1333 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1334 } 1335 } 1336 } 1337 1338 mNewStreamMask = streamMask | resumeMask; 1339 if (switching) { 1340 mSwapMask = mStreamMask & ~resumeMask; 1341 } 1342 1343 // Of all existing fetchers: 1344 // * Resume fetchers that are still needed and assign them original packet sources. 1345 // * Mark otherwise unneeded fetchers for removal. 1346 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1347 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1348 const AString &uri = mFetcherInfos.keyAt(i); 1349 1350 sp<AnotherPacketSource> sources[kMaxStreams]; 1351 for (size_t j = 0; j < kMaxStreams; ++j) { 1352 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1353 sources[j] = mPacketSources.valueFor(indexToType(j)); 1354 1355 if (j != kSubtitleIndex) { 1356 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1357 sp<AnotherPacketSource> discontinuityQueue; 1358 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1359 discontinuityQueue->queueDiscontinuity( 1360 ATSParser::DISCONTINUITY_NONE, 1361 NULL, 1362 true); 1363 } 1364 } 1365 } 1366 1367 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1368 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1369 || sources[kSubtitleIndex] != NULL) { 1370 info.mFetcher->startAsync( 1371 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1372 } else { 1373 info.mToBeRemoved = true; 1374 } 1375 } 1376 1377 // streamMask now only contains the types that need a new fetcher created. 1378 1379 if (streamMask != 0) { 1380 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1381 } 1382 1383 // Find out when the original fetchers have buffered up to and start the new fetchers 1384 // at a later timestamp. 1385 for (size_t i = 0; i < kMaxStreams; i++) { 1386 if (!(indexToType(i) & streamMask)) { 1387 continue; 1388 } 1389 1390 AString uri; 1391 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1392 1393 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1394 CHECK(fetcher != NULL); 1395 1396 int32_t latestSeq = -1; 1397 int64_t startTimeUs = -1; 1398 int64_t segmentStartTimeUs = -1ll; 1399 int32_t discontinuitySeq = -1; 1400 sp<AnotherPacketSource> sources[kMaxStreams]; 1401 1402 // TRICKY: looping from i as earlier streams are already removed from streamMask 1403 for (size_t j = i; j < kMaxStreams; ++j) { 1404 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1405 if ((streamMask & indexToType(j)) && uri == streamUri) { 1406 sources[j] = mPacketSources.valueFor(indexToType(j)); 1407 1408 if (timeUs >= 0) { 1409 sources[j]->clear(); 1410 startTimeUs = timeUs; 1411 1412 sp<AnotherPacketSource> discontinuityQueue; 1413 sp<AMessage> extra = new AMessage; 1414 extra->setInt64("timeUs", timeUs); 1415 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1416 discontinuityQueue->queueDiscontinuity( 1417 ATSParser::DISCONTINUITY_SEEK, extra, true); 1418 } else { 1419 int32_t type; 1420 int64_t srcSegmentStartTimeUs; 1421 sp<AMessage> meta; 1422 if (pickTrack) { 1423 // selecting 1424 meta = sources[j]->getLatestDequeuedMeta(); 1425 } else { 1426 // adapting 1427 meta = sources[j]->getLatestEnqueuedMeta(); 1428 } 1429 1430 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1431 int64_t tmpUs; 1432 CHECK(meta->findInt64("timeUs", &tmpUs)); 1433 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1434 startTimeUs = tmpUs; 1435 } 1436 1437 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1438 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1439 segmentStartTimeUs = tmpUs; 1440 } 1441 1442 int32_t seq; 1443 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1444 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1445 discontinuitySeq = seq; 1446 } 1447 } 1448 1449 if (pickTrack) { 1450 // selecting track, queue discontinuities before content 1451 sources[j]->clear(); 1452 if (j == kSubtitleIndex) { 1453 break; 1454 } 1455 sp<AnotherPacketSource> discontinuityQueue; 1456 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1457 discontinuityQueue->queueDiscontinuity( 1458 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1459 } else { 1460 // adapting, queue discontinuities after resume 1461 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1462 sources[j]->clear(); 1463 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1464 if (extraStreams & indexToType(j)) { 1465 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1466 } 1467 } 1468 } 1469 1470 streamMask &= ~indexToType(j); 1471 } 1472 } 1473 1474 fetcher->startAsync( 1475 sources[kAudioIndex], 1476 sources[kVideoIndex], 1477 sources[kSubtitleIndex], 1478 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1479 segmentStartTimeUs, 1480 discontinuitySeq, 1481 switching); 1482 } 1483 1484 // All fetchers have now been started, the configuration change 1485 // has completed. 1486 1487 cancelCheckBandwidthEvent(); 1488 scheduleCheckBandwidthEvent(); 1489 1490 ALOGV("XXX configuration change completed."); 1491 mReconfigurationInProgress = false; 1492 if (switching) { 1493 mSwitchInProgress = true; 1494 } else { 1495 mStreamMask = mNewStreamMask; 1496 } 1497 1498 if (mDisconnectReplyID != 0) { 1499 finishDisconnect(); 1500 } 1501} 1502 1503void LiveSession::onSwapped(const sp<AMessage> &msg) { 1504 int32_t switchGeneration; 1505 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1506 if (switchGeneration != mSwitchGeneration) { 1507 return; 1508 } 1509 1510 int32_t stream; 1511 CHECK(msg->findInt32("stream", &stream)); 1512 1513 ssize_t idx = typeToIndex(stream); 1514 CHECK(idx >= 0); 1515 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1516 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1517 } 1518 mStreams[idx].mUri = mStreams[idx].mNewUri; 1519 mStreams[idx].mNewUri.clear(); 1520 1521 mSwapMask &= ~stream; 1522 if (mSwapMask != 0) { 1523 return; 1524 } 1525 1526 // Check if new variant contains extra streams. 1527 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1528 while (extraStreams) { 1529 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1530 swapPacketSource(extraStream); 1531 extraStreams &= ~extraStream; 1532 1533 idx = typeToIndex(extraStream); 1534 CHECK(idx >= 0); 1535 if (mStreams[idx].mNewUri.empty()) { 1536 ALOGW("swapping extra stream type %d %s to empty stream", 1537 extraStream, mStreams[idx].mUri.c_str()); 1538 } 1539 mStreams[idx].mUri = mStreams[idx].mNewUri; 1540 mStreams[idx].mNewUri.clear(); 1541 } 1542 1543 tryToFinishBandwidthSwitch(); 1544} 1545 1546void LiveSession::onCheckSwitchDown() { 1547 if (mSwitchDownMonitor == NULL) { 1548 return; 1549 } 1550 1551 for (size_t i = 0; i < kMaxStreams; ++i) { 1552 int32_t targetDuration; 1553 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1554 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1555 1556 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1557 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1558 int64_t targetDurationUs = targetDuration * 1000000ll; 1559 1560 if (bufferedDurationUs < targetDurationUs / 3) { 1561 (new AMessage(kWhatSwitchDown, id()))->post(); 1562 break; 1563 } 1564 } 1565 } 1566 1567 mSwitchDownMonitor->post(1000000ll); 1568} 1569 1570void LiveSession::onSwitchDown() { 1571 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1572 return; 1573 } 1574 1575 ssize_t bandwidthIndex = getBandwidthIndex(); 1576 if (bandwidthIndex < mCurBandwidthIndex) { 1577 changeConfiguration(-1, bandwidthIndex, false); 1578 return; 1579 } 1580 1581 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1582} 1583 1584// Mark switch done when: 1585// 1. all old buffers are swapped out 1586void LiveSession::tryToFinishBandwidthSwitch() { 1587 if (!mSwitchInProgress) { 1588 return; 1589 } 1590 1591 bool needToRemoveFetchers = false; 1592 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1593 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1594 needToRemoveFetchers = true; 1595 break; 1596 } 1597 } 1598 1599 if (!needToRemoveFetchers && mSwapMask == 0) { 1600 ALOGI("mSwitchInProgress = false"); 1601 mStreamMask = mNewStreamMask; 1602 mSwitchInProgress = false; 1603 } 1604} 1605 1606void LiveSession::scheduleCheckBandwidthEvent() { 1607 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1608 msg->setInt32("generation", mCheckBandwidthGeneration); 1609 msg->post(10000000ll); 1610} 1611 1612void LiveSession::cancelCheckBandwidthEvent() { 1613 ++mCheckBandwidthGeneration; 1614} 1615 1616void LiveSession::cancelBandwidthSwitch() { 1617 Mutex::Autolock lock(mSwapMutex); 1618 mSwitchGeneration++; 1619 mSwitchInProgress = false; 1620 mSwapMask = 0; 1621 1622 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1623 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1624 if (info.mToBeRemoved) { 1625 info.mToBeRemoved = false; 1626 } 1627 } 1628 1629 for (size_t i = 0; i < kMaxStreams; ++i) { 1630 if (!mStreams[i].mNewUri.empty()) { 1631 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1632 if (j < 0) { 1633 mStreams[i].mNewUri.clear(); 1634 continue; 1635 } 1636 1637 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1638 info.mFetcher->stopAsync(); 1639 mFetcherInfos.removeItemsAt(j); 1640 mStreams[i].mNewUri.clear(); 1641 } 1642 } 1643} 1644 1645bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1646 if (mReconfigurationInProgress || mSwitchInProgress) { 1647 return false; 1648 } 1649 1650 if (mCurBandwidthIndex < 0) { 1651 return true; 1652 } 1653 1654 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1655 return false; 1656 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1657 return canSwitchUp(); 1658 } else { 1659 return true; 1660 } 1661} 1662 1663void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1664 size_t bandwidthIndex = getBandwidthIndex(); 1665 if (canSwitchBandwidthTo(bandwidthIndex)) { 1666 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1667 } else { 1668 // Come back and check again 10 seconds later in case there is nothing to do now. 1669 // If we DO change configuration, once that completes it'll schedule a new 1670 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1671 msg->post(10000000ll); 1672 } 1673} 1674 1675void LiveSession::postPrepared(status_t err) { 1676 CHECK(mInPreparationPhase); 1677 1678 sp<AMessage> notify = mNotify->dup(); 1679 if (err == OK || err == ERROR_END_OF_STREAM) { 1680 notify->setInt32("what", kWhatPrepared); 1681 } else { 1682 notify->setInt32("what", kWhatPreparationFailed); 1683 notify->setInt32("err", err); 1684 } 1685 1686 notify->post(); 1687 1688 mInPreparationPhase = false; 1689 1690 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1691 mSwitchDownMonitor->post(); 1692} 1693 1694} // namespace android 1695 1696