LiveSession.cpp revision ec5206c99694d263ac099bf2c37f8119f43f74f1
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mSubtitleGeneration(0), 67 mLastDequeuedTimeUs(0ll), 68 mRealTimeBaseUs(0ll), 69 mReconfigurationInProgress(false), 70 mSwitchInProgress(false), 71 mDisconnectReplyID(0), 72 mSeekReplyID(0), 73 mFirstTimeUsValid(false), 74 mFirstTimeUs(0), 75 mLastSeekTimeUs(0) { 76 77 mStreams[kAudioIndex] = StreamItem("audio"); 78 mStreams[kVideoIndex] = StreamItem("video"); 79 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 80 81 for (size_t i = 0; i < kMaxStreams; ++i) { 82 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 85 mBuffering[i] = false; 86 } 87} 88 89LiveSession::~LiveSession() { 90} 91 92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 93 ABuffer *discontinuity = new ABuffer(0); 94 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 95 discontinuity->meta()->setInt32("swapPacketSource", swap); 96 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 97 discontinuity->meta()->setInt64("timeUs", -1); 98 return discontinuity; 99} 100 101void LiveSession::swapPacketSource(StreamType stream) { 102 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 103 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 104 sp<AnotherPacketSource> tmp = aps; 105 aps = aps2; 106 aps2 = tmp; 107 aps2->clear(); 108} 109 110status_t LiveSession::dequeueAccessUnit( 111 StreamType stream, sp<ABuffer> *accessUnit) { 112 if (!(mStreamMask & stream)) { 113 // return -EWOULDBLOCK to avoid halting the decoder 114 // when switching between audio/video and audio only. 115 return -EWOULDBLOCK; 116 } 117 118 status_t finalResult; 119 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 120 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 121 discontinuityQueue->dequeueAccessUnit(accessUnit); 122 // seeking, track switching 123 sp<AMessage> extra; 124 int64_t timeUs; 125 if ((*accessUnit)->meta()->findMessage("extra", &extra) 126 && extra != NULL 127 && extra->findInt64("timeUs", &timeUs)) { 128 // seeking only 129 mLastSeekTimeUs = timeUs; 130 mDiscontinuityOffsetTimesUs.clear(); 131 mDiscontinuityAbsStartTimesUs.clear(); 132 } 133 return INFO_DISCONTINUITY; 134 } 135 136 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 137 138 ssize_t idx = typeToIndex(stream); 139 if (!packetSource->hasBufferAvailable(&finalResult)) { 140 if (finalResult == OK) { 141 mBuffering[idx] = true; 142 return -EAGAIN; 143 } else { 144 return finalResult; 145 } 146 } 147 148 if (mBuffering[idx]) { 149 if (mSwitchInProgress 150 || packetSource->isFinished(0) 151 || packetSource->getEstimatedDurationUs() > 10000000ll) { 152 mBuffering[idx] = false; 153 } 154 } 155 156 if (mBuffering[idx]) { 157 return -EAGAIN; 158 } 159 160 // wait for counterpart 161 sp<AnotherPacketSource> otherSource; 162 uint32_t mask = mNewStreamMask & mStreamMask; 163 uint32_t fetchersMask = 0; 164 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 165 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 166 fetchersMask |= fetcherMask; 167 } 168 mask &= fetchersMask; 169 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 170 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 171 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 172 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 173 } 174 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 175 return finalResult == OK ? -EAGAIN : finalResult; 176 } 177 178 status_t err = packetSource->dequeueAccessUnit(accessUnit); 179 180 size_t streamIdx; 181 const char *streamStr; 182 switch (stream) { 183 case STREAMTYPE_AUDIO: 184 streamIdx = kAudioIndex; 185 streamStr = "audio"; 186 break; 187 case STREAMTYPE_VIDEO: 188 streamIdx = kVideoIndex; 189 streamStr = "video"; 190 break; 191 case STREAMTYPE_SUBTITLES: 192 streamIdx = kSubtitleIndex; 193 streamStr = "subs"; 194 break; 195 default: 196 TRESPASS(); 197 } 198 199 StreamItem& strm = mStreams[streamIdx]; 200 if (err == INFO_DISCONTINUITY) { 201 // adaptive streaming, discontinuities in the playlist 202 int32_t type; 203 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 204 205 sp<AMessage> extra; 206 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 207 extra.clear(); 208 } 209 210 ALOGI("[%s] read discontinuity of type %d, extra = %s", 211 streamStr, 212 type, 213 extra == NULL ? "NULL" : extra->debugString().c_str()); 214 215 int32_t swap; 216 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 217 int32_t switchGeneration; 218 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 219 { 220 Mutex::Autolock lock(mSwapMutex); 221 if (switchGeneration == mSwitchGeneration) { 222 swapPacketSource(stream); 223 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 224 msg->setInt32("stream", stream); 225 msg->setInt32("switchGeneration", switchGeneration); 226 msg->post(); 227 } 228 } 229 } else { 230 size_t seq = strm.mCurDiscontinuitySeq; 231 int64_t offsetTimeUs; 232 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 233 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 234 } else { 235 offsetTimeUs = 0; 236 } 237 238 seq += 1; 239 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 240 int64_t firstTimeUs; 241 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 242 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 243 offsetTimeUs += strm.mLastSampleDurationUs; 244 } else { 245 offsetTimeUs += strm.mLastSampleDurationUs; 246 } 247 248 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 249 } 250 } else if (err == OK) { 251 252 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 253 int64_t timeUs; 254 int32_t discontinuitySeq = 0; 255 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 256 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 257 strm.mCurDiscontinuitySeq = discontinuitySeq; 258 259 int32_t discard = 0; 260 int64_t firstTimeUs; 261 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 262 int64_t durUs; // approximate sample duration 263 if (timeUs > strm.mLastDequeuedTimeUs) { 264 durUs = timeUs - strm.mLastDequeuedTimeUs; 265 } else { 266 durUs = strm.mLastDequeuedTimeUs - timeUs; 267 } 268 strm.mLastSampleDurationUs = durUs; 269 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 270 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 271 firstTimeUs = timeUs; 272 } else { 273 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 274 firstTimeUs = timeUs; 275 } 276 277 strm.mLastDequeuedTimeUs = timeUs; 278 if (timeUs >= firstTimeUs) { 279 timeUs -= firstTimeUs; 280 } else { 281 timeUs = 0; 282 } 283 timeUs += mLastSeekTimeUs; 284 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 285 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 286 } 287 288 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 289 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 290 mLastDequeuedTimeUs = timeUs; 291 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 292 } else if (stream == STREAMTYPE_SUBTITLES) { 293 int32_t subtitleGeneration; 294 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 295 && subtitleGeneration != mSubtitleGeneration) { 296 return -EAGAIN; 297 }; 298 (*accessUnit)->meta()->setInt32( 299 "trackIndex", mPlaylist->getSelectedIndex()); 300 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 301 } 302 } else { 303 ALOGI("[%s] encountered error %d", streamStr, err); 304 } 305 306 return err; 307} 308 309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 310 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 311 if (!(mStreamMask & stream)) { 312 return UNKNOWN_ERROR; 313 } 314 315 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 316 317 sp<MetaData> meta = packetSource->getFormat(); 318 319 if (meta == NULL) { 320 return -EAGAIN; 321 } 322 323 return convertMetaDataToMessage(meta, format); 324} 325 326void LiveSession::connectAsync( 327 const char *url, const KeyedVector<String8, String8> *headers) { 328 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 329 msg->setString("url", url); 330 331 if (headers != NULL) { 332 msg->setPointer( 333 "headers", 334 new KeyedVector<String8, String8>(*headers)); 335 } 336 337 msg->post(); 338} 339 340status_t LiveSession::disconnect() { 341 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 342 343 sp<AMessage> response; 344 status_t err = msg->postAndAwaitResponse(&response); 345 346 return err; 347} 348 349status_t LiveSession::seekTo(int64_t timeUs) { 350 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 351 msg->setInt64("timeUs", timeUs); 352 353 sp<AMessage> response; 354 status_t err = msg->postAndAwaitResponse(&response); 355 356 return err; 357} 358 359void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 360 switch (msg->what()) { 361 case kWhatConnect: 362 { 363 onConnect(msg); 364 break; 365 } 366 367 case kWhatDisconnect: 368 { 369 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 370 371 if (mReconfigurationInProgress) { 372 break; 373 } 374 375 finishDisconnect(); 376 break; 377 } 378 379 case kWhatSeek: 380 { 381 uint32_t seekReplyID; 382 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 383 mSeekReplyID = seekReplyID; 384 mSeekReply = new AMessage; 385 386 status_t err = onSeek(msg); 387 388 if (err != OK) { 389 msg->post(50000); 390 } 391 break; 392 } 393 394 case kWhatFetcherNotify: 395 { 396 int32_t what; 397 CHECK(msg->findInt32("what", &what)); 398 399 switch (what) { 400 case PlaylistFetcher::kWhatStarted: 401 break; 402 case PlaylistFetcher::kWhatPaused: 403 case PlaylistFetcher::kWhatStopped: 404 { 405 if (what == PlaylistFetcher::kWhatStopped) { 406 AString uri; 407 CHECK(msg->findString("uri", &uri)); 408 if (mFetcherInfos.removeItem(uri) < 0) { 409 // ignore duplicated kWhatStopped messages. 410 break; 411 } 412 413 if (mSwitchInProgress) { 414 tryToFinishBandwidthSwitch(); 415 } 416 } 417 418 if (mContinuation != NULL) { 419 CHECK_GT(mContinuationCounter, 0); 420 if (--mContinuationCounter == 0) { 421 mContinuation->post(); 422 423 if (mSeekReplyID != 0) { 424 CHECK(mSeekReply != NULL); 425 mSeekReply->setInt32("err", OK); 426 mSeekReply->postReply(mSeekReplyID); 427 mSeekReplyID = 0; 428 mSeekReply.clear(); 429 } 430 } 431 } 432 break; 433 } 434 435 case PlaylistFetcher::kWhatDurationUpdate: 436 { 437 AString uri; 438 CHECK(msg->findString("uri", &uri)); 439 440 int64_t durationUs; 441 CHECK(msg->findInt64("durationUs", &durationUs)); 442 443 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 444 info->mDurationUs = durationUs; 445 break; 446 } 447 448 case PlaylistFetcher::kWhatError: 449 { 450 status_t err; 451 CHECK(msg->findInt32("err", &err)); 452 453 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 454 455 // handle EOS on subtitle tracks independently 456 AString uri; 457 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 458 ssize_t i = mFetcherInfos.indexOfKey(uri); 459 if (i >= 0) { 460 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 461 if (fetcher != NULL) { 462 uint32_t type = fetcher->getStreamTypeMask(); 463 if (type == STREAMTYPE_SUBTITLES) { 464 mPacketSources.valueFor( 465 STREAMTYPE_SUBTITLES)->signalEOS(err);; 466 break; 467 } 468 } 469 } 470 } 471 472 if (mInPreparationPhase) { 473 postPrepared(err); 474 } 475 476 cancelBandwidthSwitch(); 477 478 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 479 480 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 481 482 mPacketSources.valueFor( 483 STREAMTYPE_SUBTITLES)->signalEOS(err); 484 485 sp<AMessage> notify = mNotify->dup(); 486 notify->setInt32("what", kWhatError); 487 notify->setInt32("err", err); 488 notify->post(); 489 break; 490 } 491 492 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 493 { 494 AString uri; 495 CHECK(msg->findString("uri", &uri)); 496 497 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 498 info->mIsPrepared = true; 499 500 if (mInPreparationPhase) { 501 bool allFetchersPrepared = true; 502 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 503 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 504 allFetchersPrepared = false; 505 break; 506 } 507 } 508 509 if (allFetchersPrepared) { 510 postPrepared(OK); 511 } 512 } 513 break; 514 } 515 516 case PlaylistFetcher::kWhatStartedAt: 517 { 518 int32_t switchGeneration; 519 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 520 521 if (switchGeneration != mSwitchGeneration) { 522 break; 523 } 524 525 // Resume fetcher for the original variant; the resumed fetcher should 526 // continue until the timestamps found in msg, which is stored by the 527 // new fetcher to indicate where the new variant has started buffering. 528 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 529 const FetcherInfo info = mFetcherInfos.valueAt(i); 530 if (info.mToBeRemoved) { 531 info.mFetcher->resumeUntilAsync(msg); 532 } 533 } 534 break; 535 } 536 537 default: 538 TRESPASS(); 539 } 540 541 break; 542 } 543 544 case kWhatCheckBandwidth: 545 { 546 int32_t generation; 547 CHECK(msg->findInt32("generation", &generation)); 548 549 if (generation != mCheckBandwidthGeneration) { 550 break; 551 } 552 553 onCheckBandwidth(msg); 554 break; 555 } 556 557 case kWhatChangeConfiguration: 558 { 559 onChangeConfiguration(msg); 560 break; 561 } 562 563 case kWhatChangeConfiguration2: 564 { 565 onChangeConfiguration2(msg); 566 break; 567 } 568 569 case kWhatChangeConfiguration3: 570 { 571 onChangeConfiguration3(msg); 572 break; 573 } 574 575 case kWhatFinishDisconnect2: 576 { 577 onFinishDisconnect2(); 578 break; 579 } 580 581 case kWhatSwapped: 582 { 583 onSwapped(msg); 584 break; 585 } 586 587 case kWhatCheckSwitchDown: 588 { 589 onCheckSwitchDown(); 590 break; 591 } 592 593 case kWhatSwitchDown: 594 { 595 onSwitchDown(); 596 break; 597 } 598 599 default: 600 TRESPASS(); 601 break; 602 } 603} 604 605// static 606int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 607 if (a->mBandwidth < b->mBandwidth) { 608 return -1; 609 } else if (a->mBandwidth == b->mBandwidth) { 610 return 0; 611 } 612 613 return 1; 614} 615 616// static 617LiveSession::StreamType LiveSession::indexToType(int idx) { 618 CHECK(idx >= 0 && idx < kMaxStreams); 619 return (StreamType)(1 << idx); 620} 621 622// static 623ssize_t LiveSession::typeToIndex(int32_t type) { 624 switch (type) { 625 case STREAMTYPE_AUDIO: 626 return 0; 627 case STREAMTYPE_VIDEO: 628 return 1; 629 case STREAMTYPE_SUBTITLES: 630 return 2; 631 default: 632 return -1; 633 }; 634 return -1; 635} 636 637void LiveSession::onConnect(const sp<AMessage> &msg) { 638 AString url; 639 CHECK(msg->findString("url", &url)); 640 641 KeyedVector<String8, String8> *headers = NULL; 642 if (!msg->findPointer("headers", (void **)&headers)) { 643 mExtraHeaders.clear(); 644 } else { 645 mExtraHeaders = *headers; 646 647 delete headers; 648 headers = NULL; 649 } 650 651 // TODO currently we don't know if we are coming here from incognito mode 652 ALOGI("onConnect %s", uriDebugString(url).c_str()); 653 654 mMasterURL = url; 655 656 bool dummy; 657 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 658 659 if (mPlaylist == NULL) { 660 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 661 662 postPrepared(ERROR_IO); 663 return; 664 } 665 666 // We trust the content provider to make a reasonable choice of preferred 667 // initial bandwidth by listing it first in the variant playlist. 668 // At startup we really don't have a good estimate on the available 669 // network bandwidth since we haven't tranferred any data yet. Once 670 // we have we can make a better informed choice. 671 size_t initialBandwidth = 0; 672 size_t initialBandwidthIndex = 0; 673 674 if (mPlaylist->isVariantPlaylist()) { 675 for (size_t i = 0; i < mPlaylist->size(); ++i) { 676 BandwidthItem item; 677 678 item.mPlaylistIndex = i; 679 680 sp<AMessage> meta; 681 AString uri; 682 mPlaylist->itemAt(i, &uri, &meta); 683 684 unsigned long bandwidth; 685 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 686 687 if (initialBandwidth == 0) { 688 initialBandwidth = item.mBandwidth; 689 } 690 691 mBandwidthItems.push(item); 692 } 693 694 CHECK_GT(mBandwidthItems.size(), 0u); 695 696 mBandwidthItems.sort(SortByBandwidth); 697 698 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 699 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 700 initialBandwidthIndex = i; 701 break; 702 } 703 } 704 } else { 705 // dummy item. 706 BandwidthItem item; 707 item.mPlaylistIndex = 0; 708 item.mBandwidth = 0; 709 mBandwidthItems.push(item); 710 } 711 712 mPlaylist->pickRandomMediaItems(); 713 changeConfiguration( 714 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 715} 716 717void LiveSession::finishDisconnect() { 718 // No reconfiguration is currently pending, make sure none will trigger 719 // during disconnection either. 720 cancelCheckBandwidthEvent(); 721 722 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 723 // (finishDisconnect, onFinishDisconnect2) 724 cancelBandwidthSwitch(); 725 726 // cancel switch down monitor 727 mSwitchDownMonitor.clear(); 728 729 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 730 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 731 } 732 733 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 734 735 mContinuationCounter = mFetcherInfos.size(); 736 mContinuation = msg; 737 738 if (mContinuationCounter == 0) { 739 msg->post(); 740 } 741} 742 743void LiveSession::onFinishDisconnect2() { 744 mContinuation.clear(); 745 746 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 747 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 748 749 mPacketSources.valueFor( 750 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 751 752 sp<AMessage> response = new AMessage; 753 response->setInt32("err", OK); 754 755 response->postReply(mDisconnectReplyID); 756 mDisconnectReplyID = 0; 757} 758 759sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 760 ssize_t index = mFetcherInfos.indexOfKey(uri); 761 762 if (index >= 0) { 763 return NULL; 764 } 765 766 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 767 notify->setString("uri", uri); 768 notify->setInt32("switchGeneration", mSwitchGeneration); 769 770 FetcherInfo info; 771 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 772 info.mDurationUs = -1ll; 773 info.mIsPrepared = false; 774 info.mToBeRemoved = false; 775 looper()->registerHandler(info.mFetcher); 776 777 mFetcherInfos.add(uri, info); 778 779 return info.mFetcher; 780} 781 782/* 783 * Illustration of parameters: 784 * 785 * 0 `range_offset` 786 * +------------+-------------------------------------------------------+--+--+ 787 * | | | next block to fetch | | | 788 * | | `source` handle => `out` buffer | | | | 789 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 790 * | |<----------- `range_length` / buffer capacity ----------->| | 791 * |<------------------------------ file_size ------------------------------->| 792 * 793 * Special parameter values: 794 * - range_length == -1 means entire file 795 * - block_size == 0 means entire range 796 * 797 */ 798ssize_t LiveSession::fetchFile( 799 const char *url, sp<ABuffer> *out, 800 int64_t range_offset, int64_t range_length, 801 uint32_t block_size, /* download block size */ 802 sp<DataSource> *source, /* to return and reuse source */ 803 String8 *actualUrl) { 804 off64_t size; 805 sp<DataSource> temp_source; 806 if (source == NULL) { 807 source = &temp_source; 808 } 809 810 if (*source == NULL) { 811 if (!strncasecmp(url, "file://", 7)) { 812 *source = new FileSource(url + 7); 813 } else if (strncasecmp(url, "http://", 7) 814 && strncasecmp(url, "https://", 8)) { 815 return ERROR_UNSUPPORTED; 816 } else { 817 KeyedVector<String8, String8> headers = mExtraHeaders; 818 if (range_offset > 0 || range_length >= 0) { 819 headers.add( 820 String8("Range"), 821 String8( 822 StringPrintf( 823 "bytes=%lld-%s", 824 range_offset, 825 range_length < 0 826 ? "" : StringPrintf("%lld", 827 range_offset + range_length - 1).c_str()).c_str())); 828 } 829 status_t err = mHTTPDataSource->connect(url, &headers); 830 831 if (err != OK) { 832 return err; 833 } 834 835 *source = mHTTPDataSource; 836 } 837 } 838 839 status_t getSizeErr = (*source)->getSize(&size); 840 if (getSizeErr != OK) { 841 size = 65536; 842 } 843 844 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 845 if (*out == NULL) { 846 buffer->setRange(0, 0); 847 } 848 849 ssize_t bytesRead = 0; 850 // adjust range_length if only reading partial block 851 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 852 range_length = buffer->size() + block_size; 853 } 854 for (;;) { 855 // Only resize when we don't know the size. 856 size_t bufferRemaining = buffer->capacity() - buffer->size(); 857 if (bufferRemaining == 0 && getSizeErr != OK) { 858 bufferRemaining = 32768; 859 860 ALOGV("increasing download buffer to %zu bytes", 861 buffer->size() + bufferRemaining); 862 863 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 864 memcpy(copy->data(), buffer->data(), buffer->size()); 865 copy->setRange(0, buffer->size()); 866 867 buffer = copy; 868 } 869 870 size_t maxBytesToRead = bufferRemaining; 871 if (range_length >= 0) { 872 int64_t bytesLeftInRange = range_length - buffer->size(); 873 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 874 maxBytesToRead = bytesLeftInRange; 875 876 if (bytesLeftInRange == 0) { 877 break; 878 } 879 } 880 } 881 882 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 883 // to help us break out of the loop. 884 ssize_t n = (*source)->readAt( 885 buffer->size(), buffer->data() + buffer->size(), 886 maxBytesToRead); 887 888 if (n < 0) { 889 return n; 890 } 891 892 if (n == 0) { 893 break; 894 } 895 896 buffer->setRange(0, buffer->size() + (size_t)n); 897 bytesRead += n; 898 } 899 900 *out = buffer; 901 if (actualUrl != NULL) { 902 *actualUrl = (*source)->getUri(); 903 if (actualUrl->isEmpty()) { 904 *actualUrl = url; 905 } 906 } 907 908 return bytesRead; 909} 910 911sp<M3UParser> LiveSession::fetchPlaylist( 912 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 913 ALOGV("fetchPlaylist '%s'", url); 914 915 *unchanged = false; 916 917 sp<ABuffer> buffer; 918 String8 actualUrl; 919 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 920 921 if (err <= 0) { 922 return NULL; 923 } 924 925 // MD5 functionality is not available on the simulator, treat all 926 // playlists as changed. 927 928#if defined(HAVE_ANDROID_OS) 929 uint8_t hash[16]; 930 931 MD5_CTX m; 932 MD5_Init(&m); 933 MD5_Update(&m, buffer->data(), buffer->size()); 934 935 MD5_Final(hash, &m); 936 937 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 938 // playlist unchanged 939 *unchanged = true; 940 941 return NULL; 942 } 943 944 if (curPlaylistHash != NULL) { 945 memcpy(curPlaylistHash, hash, sizeof(hash)); 946 } 947#endif 948 949 sp<M3UParser> playlist = 950 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 951 952 if (playlist->initCheck() != OK) { 953 ALOGE("failed to parse .m3u8 playlist"); 954 955 return NULL; 956 } 957 958 return playlist; 959} 960 961static double uniformRand() { 962 return (double)rand() / RAND_MAX; 963} 964 965size_t LiveSession::getBandwidthIndex() { 966 if (mBandwidthItems.size() == 0) { 967 return 0; 968 } 969 970#if 1 971 char value[PROPERTY_VALUE_MAX]; 972 ssize_t index = -1; 973 if (property_get("media.httplive.bw-index", value, NULL)) { 974 char *end; 975 index = strtol(value, &end, 10); 976 CHECK(end > value && *end == '\0'); 977 978 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 979 index = mBandwidthItems.size() - 1; 980 } 981 } 982 983 if (index < 0) { 984 int32_t bandwidthBps; 985 if (mHTTPDataSource != NULL 986 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 987 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 988 } else { 989 ALOGV("no bandwidth estimate."); 990 return 0; // Pick the lowest bandwidth stream by default. 991 } 992 993 char value[PROPERTY_VALUE_MAX]; 994 if (property_get("media.httplive.max-bw", value, NULL)) { 995 char *end; 996 long maxBw = strtoul(value, &end, 10); 997 if (end > value && *end == '\0') { 998 if (maxBw > 0 && bandwidthBps > maxBw) { 999 ALOGV("bandwidth capped to %ld bps", maxBw); 1000 bandwidthBps = maxBw; 1001 } 1002 } 1003 } 1004 1005 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1006 1007 index = mBandwidthItems.size() - 1; 1008 while (index > 0) { 1009 // consider only 80% of the available bandwidth, but if we are switching up, 1010 // be even more conservative (70%) to avoid overestimating and immediately 1011 // switching back. 1012 size_t adjustedBandwidthBps = bandwidthBps; 1013 if (index > mCurBandwidthIndex) { 1014 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1015 } else { 1016 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1017 } 1018 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1019 break; 1020 } 1021 --index; 1022 } 1023 } 1024#elif 0 1025 // Change bandwidth at random() 1026 size_t index = uniformRand() * mBandwidthItems.size(); 1027#elif 0 1028 // There's a 50% chance to stay on the current bandwidth and 1029 // a 50% chance to switch to the next higher bandwidth (wrapping around 1030 // to lowest) 1031 const size_t kMinIndex = 0; 1032 1033 static ssize_t mCurBandwidthIndex = -1; 1034 1035 size_t index; 1036 if (mCurBandwidthIndex < 0) { 1037 index = kMinIndex; 1038 } else if (uniformRand() < 0.5) { 1039 index = (size_t)mCurBandwidthIndex; 1040 } else { 1041 index = mCurBandwidthIndex + 1; 1042 if (index == mBandwidthItems.size()) { 1043 index = kMinIndex; 1044 } 1045 } 1046 mCurBandwidthIndex = index; 1047#elif 0 1048 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1049 1050 size_t index = mBandwidthItems.size() - 1; 1051 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1052 --index; 1053 } 1054#elif 1 1055 char value[PROPERTY_VALUE_MAX]; 1056 size_t index; 1057 if (property_get("media.httplive.bw-index", value, NULL)) { 1058 char *end; 1059 index = strtoul(value, &end, 10); 1060 CHECK(end > value && *end == '\0'); 1061 1062 if (index >= mBandwidthItems.size()) { 1063 index = mBandwidthItems.size() - 1; 1064 } 1065 } else { 1066 index = 0; 1067 } 1068#else 1069 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1070#endif 1071 1072 CHECK_GE(index, 0); 1073 1074 return index; 1075} 1076 1077int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1078 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1079 int64_t minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1; 1080 if (audioMeta != NULL) { 1081 audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs); 1082 } 1083 1084 sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta(); 1085 if (videoMeta != NULL 1086 && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) { 1087 if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) { 1088 minSegmentStartTimeUs = videoSegmentStartTimeUs; 1089 } 1090 1091 } 1092 return minSegmentStartTimeUs; 1093} 1094 1095status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1096 int64_t timeUs; 1097 CHECK(msg->findInt64("timeUs", &timeUs)); 1098 1099 if (!mReconfigurationInProgress) { 1100 changeConfiguration(timeUs, mCurBandwidthIndex); 1101 return OK; 1102 } else { 1103 return -EWOULDBLOCK; 1104 } 1105} 1106 1107status_t LiveSession::getDuration(int64_t *durationUs) const { 1108 int64_t maxDurationUs = 0ll; 1109 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1110 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1111 1112 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1113 maxDurationUs = fetcherDurationUs; 1114 } 1115 } 1116 1117 *durationUs = maxDurationUs; 1118 1119 return OK; 1120} 1121 1122bool LiveSession::isSeekable() const { 1123 int64_t durationUs; 1124 return getDuration(&durationUs) == OK && durationUs >= 0; 1125} 1126 1127bool LiveSession::hasDynamicDuration() const { 1128 return false; 1129} 1130 1131size_t LiveSession::getTrackCount() const { 1132 if (mPlaylist == NULL) { 1133 return 0; 1134 } else { 1135 return mPlaylist->getTrackCount(); 1136 } 1137} 1138 1139sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1140 if (mPlaylist == NULL) { 1141 return NULL; 1142 } else { 1143 return mPlaylist->getTrackInfo(trackIndex); 1144 } 1145} 1146 1147status_t LiveSession::selectTrack(size_t index, bool select) { 1148 if (mPlaylist == NULL) { 1149 return INVALID_OPERATION; 1150 } 1151 1152 ++mSubtitleGeneration; 1153 status_t err = mPlaylist->selectTrack(index, select); 1154 if (err == OK) { 1155 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1156 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1157 msg->setInt32("pickTrack", select); 1158 msg->post(); 1159 } 1160 return err; 1161} 1162 1163bool LiveSession::canSwitchUp() { 1164 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1165 status_t err = OK; 1166 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1167 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1168 int64_t dur = source->getBufferedDurationUs(&err); 1169 if (err == OK && dur > 10000000) { 1170 return true; 1171 } 1172 } 1173 return false; 1174} 1175 1176void LiveSession::changeConfiguration( 1177 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1178 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1179 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1180 cancelBandwidthSwitch(); 1181 1182 CHECK(!mReconfigurationInProgress); 1183 mReconfigurationInProgress = true; 1184 1185 mCurBandwidthIndex = bandwidthIndex; 1186 1187 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1188 timeUs, bandwidthIndex, pickTrack); 1189 1190 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1191 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1192 1193 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1194 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1195 1196 AString URIs[kMaxStreams]; 1197 for (size_t i = 0; i < kMaxStreams; ++i) { 1198 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1199 streamMask |= indexToType(i); 1200 } 1201 } 1202 1203 // Step 1, stop and discard fetchers that are no longer needed. 1204 // Pause those that we'll reuse. 1205 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1206 const AString &uri = mFetcherInfos.keyAt(i); 1207 1208 bool discardFetcher = true; 1209 1210 // If we're seeking all current fetchers are discarded. 1211 if (timeUs < 0ll) { 1212 // delay fetcher removal if not picking tracks 1213 discardFetcher = pickTrack; 1214 1215 for (size_t j = 0; j < kMaxStreams; ++j) { 1216 StreamType type = indexToType(j); 1217 if ((streamMask & type) && uri == URIs[j]) { 1218 resumeMask |= type; 1219 streamMask &= ~type; 1220 discardFetcher = false; 1221 } 1222 } 1223 } 1224 1225 if (discardFetcher) { 1226 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1227 } else { 1228 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1229 } 1230 } 1231 1232 sp<AMessage> msg; 1233 if (timeUs < 0ll) { 1234 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1235 msg = new AMessage(kWhatChangeConfiguration3, id()); 1236 } else { 1237 msg = new AMessage(kWhatChangeConfiguration2, id()); 1238 } 1239 msg->setInt32("streamMask", streamMask); 1240 msg->setInt32("resumeMask", resumeMask); 1241 msg->setInt32("pickTrack", pickTrack); 1242 msg->setInt64("timeUs", timeUs); 1243 for (size_t i = 0; i < kMaxStreams; ++i) { 1244 if ((streamMask | resumeMask) & indexToType(i)) { 1245 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1246 } 1247 } 1248 1249 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1250 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1251 // fetchers have completed their asynchronous operation, we'll post 1252 // mContinuation, which then is handled below in onChangeConfiguration2. 1253 mContinuationCounter = mFetcherInfos.size(); 1254 mContinuation = msg; 1255 1256 if (mContinuationCounter == 0) { 1257 msg->post(); 1258 1259 if (mSeekReplyID != 0) { 1260 CHECK(mSeekReply != NULL); 1261 mSeekReply->setInt32("err", OK); 1262 mSeekReply->postReply(mSeekReplyID); 1263 mSeekReplyID = 0; 1264 mSeekReply.clear(); 1265 } 1266 } 1267} 1268 1269void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1270 if (!mReconfigurationInProgress) { 1271 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1272 msg->findInt32("pickTrack", &pickTrack); 1273 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1274 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1275 } else { 1276 msg->post(1000000ll); // retry in 1 sec 1277 } 1278} 1279 1280void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1281 mContinuation.clear(); 1282 1283 // All fetchers are either suspended or have been removed now. 1284 1285 uint32_t streamMask, resumeMask; 1286 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1287 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1288 1289 // currently onChangeConfiguration2 is only called for seeking; 1290 // remove the following CHECK if using it else where. 1291 CHECK_EQ(resumeMask, 0); 1292 streamMask |= resumeMask; 1293 1294 AString URIs[kMaxStreams]; 1295 for (size_t i = 0; i < kMaxStreams; ++i) { 1296 if (streamMask & indexToType(i)) { 1297 const AString &uriKey = mStreams[i].uriKey(); 1298 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1299 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1300 } 1301 } 1302 1303 // Determine which decoders to shutdown on the player side, 1304 // a decoder has to be shutdown if either 1305 // 1) its streamtype was active before but now longer isn't. 1306 // or 1307 // 2) its streamtype was already active and still is but the URI 1308 // has changed. 1309 uint32_t changedMask = 0; 1310 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1311 if (((mStreamMask & streamMask & indexToType(i)) 1312 && !(URIs[i] == mStreams[i].mUri)) 1313 || (mStreamMask & ~streamMask & indexToType(i))) { 1314 changedMask |= indexToType(i); 1315 } 1316 } 1317 1318 if (changedMask == 0) { 1319 // If nothing changed as far as the audio/video decoders 1320 // are concerned we can proceed. 1321 onChangeConfiguration3(msg); 1322 return; 1323 } 1324 1325 // Something changed, inform the player which will shutdown the 1326 // corresponding decoders and will post the reply once that's done. 1327 // Handling the reply will continue executing below in 1328 // onChangeConfiguration3. 1329 sp<AMessage> notify = mNotify->dup(); 1330 notify->setInt32("what", kWhatStreamsChanged); 1331 notify->setInt32("changedMask", changedMask); 1332 1333 msg->setWhat(kWhatChangeConfiguration3); 1334 msg->setTarget(id()); 1335 1336 notify->setMessage("reply", msg); 1337 notify->post(); 1338} 1339 1340void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1341 mContinuation.clear(); 1342 // All remaining fetchers are still suspended, the player has shutdown 1343 // any decoders that needed it. 1344 1345 uint32_t streamMask, resumeMask; 1346 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1347 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1348 1349 int64_t timeUs; 1350 int32_t pickTrack; 1351 bool switching = false; 1352 CHECK(msg->findInt64("timeUs", &timeUs)); 1353 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1354 1355 if (timeUs < 0ll) { 1356 if (!pickTrack) { 1357 switching = true; 1358 } 1359 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1360 } else { 1361 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1362 } 1363 1364 for (size_t i = 0; i < kMaxStreams; ++i) { 1365 if (streamMask & indexToType(i)) { 1366 if (switching) { 1367 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1368 } else { 1369 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1370 } 1371 } 1372 } 1373 1374 mNewStreamMask = streamMask | resumeMask; 1375 if (switching) { 1376 mSwapMask = mStreamMask & ~resumeMask; 1377 } 1378 1379 // Of all existing fetchers: 1380 // * Resume fetchers that are still needed and assign them original packet sources. 1381 // * Mark otherwise unneeded fetchers for removal. 1382 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1383 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1384 const AString &uri = mFetcherInfos.keyAt(i); 1385 1386 sp<AnotherPacketSource> sources[kMaxStreams]; 1387 for (size_t j = 0; j < kMaxStreams; ++j) { 1388 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1389 sources[j] = mPacketSources.valueFor(indexToType(j)); 1390 1391 if (j != kSubtitleIndex) { 1392 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1393 sp<AnotherPacketSource> discontinuityQueue; 1394 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1395 discontinuityQueue->queueDiscontinuity( 1396 ATSParser::DISCONTINUITY_NONE, 1397 NULL, 1398 true); 1399 } 1400 } 1401 } 1402 1403 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1404 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1405 || sources[kSubtitleIndex] != NULL) { 1406 info.mFetcher->startAsync( 1407 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1408 } else { 1409 info.mToBeRemoved = true; 1410 } 1411 } 1412 1413 // streamMask now only contains the types that need a new fetcher created. 1414 1415 if (streamMask != 0) { 1416 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1417 } 1418 1419 // Find out when the original fetchers have buffered up to and start the new fetchers 1420 // at a later timestamp. 1421 for (size_t i = 0; i < kMaxStreams; i++) { 1422 if (!(indexToType(i) & streamMask)) { 1423 continue; 1424 } 1425 1426 AString uri; 1427 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1428 1429 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1430 CHECK(fetcher != NULL); 1431 1432 int32_t latestSeq = -1; 1433 int64_t startTimeUs = -1; 1434 int64_t segmentStartTimeUs = -1ll; 1435 int32_t discontinuitySeq = -1; 1436 sp<AnotherPacketSource> sources[kMaxStreams]; 1437 1438 if (i == kSubtitleIndex) { 1439 segmentStartTimeUs = latestMediaSegmentStartTimeUs(); 1440 } 1441 1442 // TRICKY: looping from i as earlier streams are already removed from streamMask 1443 for (size_t j = i; j < kMaxStreams; ++j) { 1444 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1445 if ((streamMask & indexToType(j)) && uri == streamUri) { 1446 sources[j] = mPacketSources.valueFor(indexToType(j)); 1447 1448 if (timeUs >= 0) { 1449 sources[j]->clear(); 1450 startTimeUs = timeUs; 1451 1452 sp<AnotherPacketSource> discontinuityQueue; 1453 sp<AMessage> extra = new AMessage; 1454 extra->setInt64("timeUs", timeUs); 1455 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1456 discontinuityQueue->queueDiscontinuity( 1457 ATSParser::DISCONTINUITY_SEEK, extra, true); 1458 } else { 1459 int32_t type; 1460 int64_t srcSegmentStartTimeUs; 1461 sp<AMessage> meta; 1462 if (pickTrack) { 1463 // selecting 1464 meta = sources[j]->getLatestDequeuedMeta(); 1465 } else { 1466 // adapting 1467 meta = sources[j]->getLatestEnqueuedMeta(); 1468 } 1469 1470 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1471 int64_t tmpUs; 1472 CHECK(meta->findInt64("timeUs", &tmpUs)); 1473 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1474 startTimeUs = tmpUs; 1475 } 1476 1477 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1478 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1479 segmentStartTimeUs = tmpUs; 1480 } 1481 1482 int32_t seq; 1483 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1484 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1485 discontinuitySeq = seq; 1486 } 1487 } 1488 1489 if (pickTrack) { 1490 // selecting track, queue discontinuities before content 1491 sources[j]->clear(); 1492 if (j == kSubtitleIndex) { 1493 break; 1494 } 1495 sp<AnotherPacketSource> discontinuityQueue; 1496 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1497 discontinuityQueue->queueDiscontinuity( 1498 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1499 } else { 1500 // adapting, queue discontinuities after resume 1501 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1502 sources[j]->clear(); 1503 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1504 if (extraStreams & indexToType(j)) { 1505 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1506 } 1507 } 1508 } 1509 1510 streamMask &= ~indexToType(j); 1511 } 1512 } 1513 1514 fetcher->startAsync( 1515 sources[kAudioIndex], 1516 sources[kVideoIndex], 1517 sources[kSubtitleIndex], 1518 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1519 segmentStartTimeUs, 1520 discontinuitySeq, 1521 switching); 1522 } 1523 1524 // All fetchers have now been started, the configuration change 1525 // has completed. 1526 1527 cancelCheckBandwidthEvent(); 1528 scheduleCheckBandwidthEvent(); 1529 1530 ALOGV("XXX configuration change completed."); 1531 mReconfigurationInProgress = false; 1532 if (switching) { 1533 mSwitchInProgress = true; 1534 } else { 1535 mStreamMask = mNewStreamMask; 1536 } 1537 1538 if (mDisconnectReplyID != 0) { 1539 finishDisconnect(); 1540 } 1541} 1542 1543void LiveSession::onSwapped(const sp<AMessage> &msg) { 1544 int32_t switchGeneration; 1545 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1546 if (switchGeneration != mSwitchGeneration) { 1547 return; 1548 } 1549 1550 int32_t stream; 1551 CHECK(msg->findInt32("stream", &stream)); 1552 1553 ssize_t idx = typeToIndex(stream); 1554 CHECK(idx >= 0); 1555 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1556 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1557 } 1558 mStreams[idx].mUri = mStreams[idx].mNewUri; 1559 mStreams[idx].mNewUri.clear(); 1560 1561 mSwapMask &= ~stream; 1562 if (mSwapMask != 0) { 1563 return; 1564 } 1565 1566 // Check if new variant contains extra streams. 1567 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1568 while (extraStreams) { 1569 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1570 swapPacketSource(extraStream); 1571 extraStreams &= ~extraStream; 1572 1573 idx = typeToIndex(extraStream); 1574 CHECK(idx >= 0); 1575 if (mStreams[idx].mNewUri.empty()) { 1576 ALOGW("swapping extra stream type %d %s to empty stream", 1577 extraStream, mStreams[idx].mUri.c_str()); 1578 } 1579 mStreams[idx].mUri = mStreams[idx].mNewUri; 1580 mStreams[idx].mNewUri.clear(); 1581 } 1582 1583 tryToFinishBandwidthSwitch(); 1584} 1585 1586void LiveSession::onCheckSwitchDown() { 1587 if (mSwitchDownMonitor == NULL) { 1588 return; 1589 } 1590 1591 for (size_t i = 0; i < kMaxStreams; ++i) { 1592 int32_t targetDuration; 1593 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1594 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1595 1596 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1597 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1598 int64_t targetDurationUs = targetDuration * 1000000ll; 1599 1600 if (bufferedDurationUs < targetDurationUs / 3) { 1601 (new AMessage(kWhatSwitchDown, id()))->post(); 1602 break; 1603 } 1604 } 1605 } 1606 1607 mSwitchDownMonitor->post(1000000ll); 1608} 1609 1610void LiveSession::onSwitchDown() { 1611 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1612 return; 1613 } 1614 1615 ssize_t bandwidthIndex = getBandwidthIndex(); 1616 if (bandwidthIndex < mCurBandwidthIndex) { 1617 changeConfiguration(-1, bandwidthIndex, false); 1618 return; 1619 } 1620 1621 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1622} 1623 1624// Mark switch done when: 1625// 1. all old buffers are swapped out 1626void LiveSession::tryToFinishBandwidthSwitch() { 1627 if (!mSwitchInProgress) { 1628 return; 1629 } 1630 1631 bool needToRemoveFetchers = false; 1632 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1633 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1634 needToRemoveFetchers = true; 1635 break; 1636 } 1637 } 1638 1639 if (!needToRemoveFetchers && mSwapMask == 0) { 1640 ALOGI("mSwitchInProgress = false"); 1641 mStreamMask = mNewStreamMask; 1642 mSwitchInProgress = false; 1643 } 1644} 1645 1646void LiveSession::scheduleCheckBandwidthEvent() { 1647 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1648 msg->setInt32("generation", mCheckBandwidthGeneration); 1649 msg->post(10000000ll); 1650} 1651 1652void LiveSession::cancelCheckBandwidthEvent() { 1653 ++mCheckBandwidthGeneration; 1654} 1655 1656void LiveSession::cancelBandwidthSwitch() { 1657 Mutex::Autolock lock(mSwapMutex); 1658 mSwitchGeneration++; 1659 mSwitchInProgress = false; 1660 mSwapMask = 0; 1661 1662 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1663 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1664 if (info.mToBeRemoved) { 1665 info.mToBeRemoved = false; 1666 } 1667 } 1668 1669 for (size_t i = 0; i < kMaxStreams; ++i) { 1670 if (!mStreams[i].mNewUri.empty()) { 1671 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1672 if (j < 0) { 1673 mStreams[i].mNewUri.clear(); 1674 continue; 1675 } 1676 1677 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1678 info.mFetcher->stopAsync(); 1679 mFetcherInfos.removeItemsAt(j); 1680 mStreams[i].mNewUri.clear(); 1681 } 1682 } 1683} 1684 1685bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1686 if (mReconfigurationInProgress || mSwitchInProgress) { 1687 return false; 1688 } 1689 1690 if (mCurBandwidthIndex < 0) { 1691 return true; 1692 } 1693 1694 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1695 return false; 1696 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1697 return canSwitchUp(); 1698 } else { 1699 return true; 1700 } 1701} 1702 1703void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1704 size_t bandwidthIndex = getBandwidthIndex(); 1705 if (canSwitchBandwidthTo(bandwidthIndex)) { 1706 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1707 } else { 1708 // Come back and check again 10 seconds later in case there is nothing to do now. 1709 // If we DO change configuration, once that completes it'll schedule a new 1710 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1711 msg->post(10000000ll); 1712 } 1713} 1714 1715void LiveSession::postPrepared(status_t err) { 1716 CHECK(mInPreparationPhase); 1717 1718 sp<AMessage> notify = mNotify->dup(); 1719 if (err == OK || err == ERROR_END_OF_STREAM) { 1720 notify->setInt32("what", kWhatPrepared); 1721 } else { 1722 notify->setInt32("what", kWhatPreparationFailed); 1723 notify->setInt32("err", err); 1724 } 1725 1726 notify->post(); 1727 1728 mInPreparationPhase = false; 1729 1730 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1731 mSwitchDownMonitor->post(); 1732} 1733 1734} // namespace android 1735 1736