LiveSession.cpp revision 895651b07fec30b0f9b0d2499599a179d95c9be4
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 
mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mSubtitleGeneration(0), 67 mLastDequeuedTimeUs(0ll), 68 mRealTimeBaseUs(0ll), 69 mReconfigurationInProgress(false), 70 mSwitchInProgress(false), 71 mDisconnectReplyID(0), 72 mSeekReplyID(0), 73 mFirstTimeUsValid(false), 74 mFirstTimeUs(0), 75 mLastSeekTimeUs(0) { 76 77 mStreams[kAudioIndex] = StreamItem("audio"); 78 mStreams[kVideoIndex] = StreamItem("video"); 79 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 80 81 for (size_t i = 0; i < kMaxStreams; ++i) { 82 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 85 mBuffering[i] = false; 86 } 87} 88 89LiveSession::~LiveSession() { 90} 91 92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 93 ABuffer *discontinuity = new ABuffer(0); 94 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 95 discontinuity->meta()->setInt32("swapPacketSource", swap); 96 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 97 discontinuity->meta()->setInt64("timeUs", -1); 98 return discontinuity; 99} 100 101void LiveSession::swapPacketSource(StreamType stream) { 102 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 103 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 104 sp<AnotherPacketSource> tmp = aps; 105 aps = aps2; 106 aps2 = tmp; 107 aps2->clear(); 108} 109 110status_t LiveSession::dequeueAccessUnit( 111 StreamType stream, sp<ABuffer> *accessUnit) { 112 if (!(mStreamMask & stream)) { 113 // return -EWOULDBLOCK to avoid halting the decoder 114 // when switching between audio/video and audio only. 
115 return -EWOULDBLOCK; 116 } 117 118 status_t finalResult; 119 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 120 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 121 discontinuityQueue->dequeueAccessUnit(accessUnit); 122 // seeking, track switching 123 sp<AMessage> extra; 124 int64_t timeUs; 125 if ((*accessUnit)->meta()->findMessage("extra", &extra) 126 && extra != NULL 127 && extra->findInt64("timeUs", &timeUs)) { 128 // seeking only 129 mLastSeekTimeUs = timeUs; 130 mDiscontinuityOffsetTimesUs.clear(); 131 mDiscontinuityAbsStartTimesUs.clear(); 132 } 133 return INFO_DISCONTINUITY; 134 } 135 136 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 137 138 ssize_t idx = typeToIndex(stream); 139 if (!packetSource->hasBufferAvailable(&finalResult)) { 140 if (finalResult == OK) { 141 mBuffering[idx] = true; 142 return -EAGAIN; 143 } else { 144 return finalResult; 145 } 146 } 147 148 if (mBuffering[idx]) { 149 if (mSwitchInProgress 150 || packetSource->isFinished(0) 151 || packetSource->getEstimatedDurationUs() > 10000000ll) { 152 mBuffering[idx] = false; 153 } 154 } 155 156 if (mBuffering[idx]) { 157 return -EAGAIN; 158 } 159 160 // wait for counterpart 161 sp<AnotherPacketSource> otherSource; 162 uint32_t mask = mNewStreamMask & mStreamMask; 163 uint32_t fetchersMask = 0; 164 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 165 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 166 fetchersMask |= fetcherMask; 167 } 168 mask &= fetchersMask; 169 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 170 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 171 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 172 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 173 } 174 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 175 return finalResult == OK ? 
-EAGAIN : finalResult; 176 } 177 178 status_t err = packetSource->dequeueAccessUnit(accessUnit); 179 180 size_t streamIdx; 181 const char *streamStr; 182 switch (stream) { 183 case STREAMTYPE_AUDIO: 184 streamIdx = kAudioIndex; 185 streamStr = "audio"; 186 break; 187 case STREAMTYPE_VIDEO: 188 streamIdx = kVideoIndex; 189 streamStr = "video"; 190 break; 191 case STREAMTYPE_SUBTITLES: 192 streamIdx = kSubtitleIndex; 193 streamStr = "subs"; 194 break; 195 default: 196 TRESPASS(); 197 } 198 199 StreamItem& strm = mStreams[streamIdx]; 200 if (err == INFO_DISCONTINUITY) { 201 // adaptive streaming, discontinuities in the playlist 202 int32_t type; 203 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 204 205 sp<AMessage> extra; 206 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 207 extra.clear(); 208 } 209 210 ALOGI("[%s] read discontinuity of type %d, extra = %s", 211 streamStr, 212 type, 213 extra == NULL ? "NULL" : extra->debugString().c_str()); 214 215 int32_t swap; 216 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 217 int32_t switchGeneration; 218 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 219 { 220 Mutex::Autolock lock(mSwapMutex); 221 if (switchGeneration == mSwitchGeneration) { 222 swapPacketSource(stream); 223 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 224 msg->setInt32("stream", stream); 225 msg->setInt32("switchGeneration", switchGeneration); 226 msg->post(); 227 } 228 } 229 } else { 230 size_t seq = strm.mCurDiscontinuitySeq; 231 int64_t offsetTimeUs; 232 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 233 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 234 } else { 235 offsetTimeUs = 0; 236 } 237 238 seq += 1; 239 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 240 int64_t firstTimeUs; 241 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 242 offsetTimeUs += 
strm.mLastDequeuedTimeUs - firstTimeUs; 243 offsetTimeUs += strm.mLastSampleDurationUs; 244 } else { 245 offsetTimeUs += strm.mLastSampleDurationUs; 246 } 247 248 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 249 } 250 } else if (err == OK) { 251 252 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 253 int64_t timeUs; 254 int32_t discontinuitySeq = 0; 255 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 256 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 257 strm.mCurDiscontinuitySeq = discontinuitySeq; 258 259 int32_t discard = 0; 260 int64_t firstTimeUs; 261 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 262 int64_t durUs; // approximate sample duration 263 if (timeUs > strm.mLastDequeuedTimeUs) { 264 durUs = timeUs - strm.mLastDequeuedTimeUs; 265 } else { 266 durUs = strm.mLastDequeuedTimeUs - timeUs; 267 } 268 strm.mLastSampleDurationUs = durUs; 269 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 270 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 271 firstTimeUs = timeUs; 272 } else { 273 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 274 firstTimeUs = timeUs; 275 } 276 277 strm.mLastDequeuedTimeUs = timeUs; 278 if (timeUs >= firstTimeUs) { 279 timeUs -= firstTimeUs; 280 } else { 281 timeUs = 0; 282 } 283 timeUs += mLastSeekTimeUs; 284 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 285 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 286 } 287 288 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 289 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 290 mLastDequeuedTimeUs = timeUs; 291 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 292 } else if (stream == STREAMTYPE_SUBTITLES) { 293 int32_t subtitleGeneration; 294 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 295 && subtitleGeneration != 
mSubtitleGeneration) { 296 return -EAGAIN; 297 }; 298 (*accessUnit)->meta()->setInt32( 299 "trackIndex", mPlaylist->getSelectedIndex()); 300 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 301 } 302 } else { 303 ALOGI("[%s] encountered error %d", streamStr, err); 304 } 305 306 return err; 307} 308 309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 310 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 311 if (!(mStreamMask & stream)) { 312 return UNKNOWN_ERROR; 313 } 314 315 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 316 317 sp<MetaData> meta = packetSource->getFormat(); 318 319 if (meta == NULL) { 320 return -EAGAIN; 321 } 322 323 return convertMetaDataToMessage(meta, format); 324} 325 326void LiveSession::connectAsync( 327 const char *url, const KeyedVector<String8, String8> *headers) { 328 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 329 msg->setString("url", url); 330 331 if (headers != NULL) { 332 msg->setPointer( 333 "headers", 334 new KeyedVector<String8, String8>(*headers)); 335 } 336 337 msg->post(); 338} 339 340status_t LiveSession::disconnect() { 341 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 342 343 sp<AMessage> response; 344 status_t err = msg->postAndAwaitResponse(&response); 345 346 return err; 347} 348 349status_t LiveSession::seekTo(int64_t timeUs) { 350 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 351 msg->setInt64("timeUs", timeUs); 352 353 sp<AMessage> response; 354 status_t err = msg->postAndAwaitResponse(&response); 355 356 return err; 357} 358 359void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 360 switch (msg->what()) { 361 case kWhatConnect: 362 { 363 onConnect(msg); 364 break; 365 } 366 367 case kWhatDisconnect: 368 { 369 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 370 371 if (mReconfigurationInProgress) { 372 break; 373 } 374 375 finishDisconnect(); 376 break; 377 } 378 379 case 
kWhatSeek: 380 { 381 uint32_t seekReplyID; 382 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 383 mSeekReplyID = seekReplyID; 384 mSeekReply = new AMessage; 385 386 status_t err = onSeek(msg); 387 388 if (err != OK) { 389 msg->post(50000); 390 } 391 break; 392 } 393 394 case kWhatFetcherNotify: 395 { 396 int32_t what; 397 CHECK(msg->findInt32("what", &what)); 398 399 switch (what) { 400 case PlaylistFetcher::kWhatStarted: 401 break; 402 case PlaylistFetcher::kWhatPaused: 403 case PlaylistFetcher::kWhatStopped: 404 { 405 if (what == PlaylistFetcher::kWhatStopped) { 406 AString uri; 407 CHECK(msg->findString("uri", &uri)); 408 if (mFetcherInfos.removeItem(uri) < 0) { 409 // ignore duplicated kWhatStopped messages. 410 break; 411 } 412 413 if (mSwitchInProgress) { 414 tryToFinishBandwidthSwitch(); 415 } 416 } 417 418 if (mContinuation != NULL) { 419 CHECK_GT(mContinuationCounter, 0); 420 if (--mContinuationCounter == 0) { 421 mContinuation->post(); 422 423 if (mSeekReplyID != 0) { 424 CHECK(mSeekReply != NULL); 425 mSeekReply->setInt32("err", OK); 426 mSeekReply->postReply(mSeekReplyID); 427 mSeekReplyID = 0; 428 mSeekReply.clear(); 429 } 430 } 431 } 432 break; 433 } 434 435 case PlaylistFetcher::kWhatDurationUpdate: 436 { 437 AString uri; 438 CHECK(msg->findString("uri", &uri)); 439 440 int64_t durationUs; 441 CHECK(msg->findInt64("durationUs", &durationUs)); 442 443 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 444 info->mDurationUs = durationUs; 445 break; 446 } 447 448 case PlaylistFetcher::kWhatError: 449 { 450 status_t err; 451 CHECK(msg->findInt32("err", &err)); 452 453 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 454 455 // handle EOS on subtitle tracks independently 456 AString uri; 457 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 458 ssize_t i = mFetcherInfos.indexOfKey(uri); 459 if (i >= 0) { 460 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 461 if (fetcher != NULL) { 462 uint32_t type = 
fetcher->getStreamTypeMask(); 463 if (type == STREAMTYPE_SUBTITLES) { 464 mPacketSources.valueFor( 465 STREAMTYPE_SUBTITLES)->signalEOS(err);; 466 break; 467 } 468 } 469 } 470 } 471 472 if (mInPreparationPhase) { 473 postPrepared(err); 474 } 475 476 cancelBandwidthSwitch(); 477 478 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 479 480 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 481 482 mPacketSources.valueFor( 483 STREAMTYPE_SUBTITLES)->signalEOS(err); 484 485 sp<AMessage> notify = mNotify->dup(); 486 notify->setInt32("what", kWhatError); 487 notify->setInt32("err", err); 488 notify->post(); 489 break; 490 } 491 492 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 493 { 494 AString uri; 495 CHECK(msg->findString("uri", &uri)); 496 497 if (mFetcherInfos.indexOfKey(uri) < 0) { 498 ALOGE("couldn't find uri"); 499 break; 500 } 501 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 502 info->mIsPrepared = true; 503 504 if (mInPreparationPhase) { 505 bool allFetchersPrepared = true; 506 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 507 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 508 allFetchersPrepared = false; 509 break; 510 } 511 } 512 513 if (allFetchersPrepared) { 514 postPrepared(OK); 515 } 516 } 517 break; 518 } 519 520 case PlaylistFetcher::kWhatStartedAt: 521 { 522 int32_t switchGeneration; 523 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 524 525 if (switchGeneration != mSwitchGeneration) { 526 break; 527 } 528 529 // Resume fetcher for the original variant; the resumed fetcher should 530 // continue until the timestamps found in msg, which is stored by the 531 // new fetcher to indicate where the new variant has started buffering. 
532 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 533 const FetcherInfo info = mFetcherInfos.valueAt(i); 534 if (info.mToBeRemoved) { 535 info.mFetcher->resumeUntilAsync(msg); 536 } 537 } 538 break; 539 } 540 541 default: 542 TRESPASS(); 543 } 544 545 break; 546 } 547 548 case kWhatCheckBandwidth: 549 { 550 int32_t generation; 551 CHECK(msg->findInt32("generation", &generation)); 552 553 if (generation != mCheckBandwidthGeneration) { 554 break; 555 } 556 557 onCheckBandwidth(msg); 558 break; 559 } 560 561 case kWhatChangeConfiguration: 562 { 563 onChangeConfiguration(msg); 564 break; 565 } 566 567 case kWhatChangeConfiguration2: 568 { 569 onChangeConfiguration2(msg); 570 break; 571 } 572 573 case kWhatChangeConfiguration3: 574 { 575 onChangeConfiguration3(msg); 576 break; 577 } 578 579 case kWhatFinishDisconnect2: 580 { 581 onFinishDisconnect2(); 582 break; 583 } 584 585 case kWhatSwapped: 586 { 587 onSwapped(msg); 588 break; 589 } 590 591 case kWhatCheckSwitchDown: 592 { 593 onCheckSwitchDown(); 594 break; 595 } 596 597 case kWhatSwitchDown: 598 { 599 onSwitchDown(); 600 break; 601 } 602 603 default: 604 TRESPASS(); 605 break; 606 } 607} 608 609// static 610int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 611 if (a->mBandwidth < b->mBandwidth) { 612 return -1; 613 } else if (a->mBandwidth == b->mBandwidth) { 614 return 0; 615 } 616 617 return 1; 618} 619 620// static 621LiveSession::StreamType LiveSession::indexToType(int idx) { 622 CHECK(idx >= 0 && idx < kMaxStreams); 623 return (StreamType)(1 << idx); 624} 625 626// static 627ssize_t LiveSession::typeToIndex(int32_t type) { 628 switch (type) { 629 case STREAMTYPE_AUDIO: 630 return 0; 631 case STREAMTYPE_VIDEO: 632 return 1; 633 case STREAMTYPE_SUBTITLES: 634 return 2; 635 default: 636 return -1; 637 }; 638 return -1; 639} 640 641void LiveSession::onConnect(const sp<AMessage> &msg) { 642 AString url; 643 CHECK(msg->findString("url", &url)); 644 645 KeyedVector<String8, 
String8> *headers = NULL; 646 if (!msg->findPointer("headers", (void **)&headers)) { 647 mExtraHeaders.clear(); 648 } else { 649 mExtraHeaders = *headers; 650 651 delete headers; 652 headers = NULL; 653 } 654 655 // TODO currently we don't know if we are coming here from incognito mode 656 ALOGI("onConnect %s", uriDebugString(url).c_str()); 657 658 mMasterURL = url; 659 660 bool dummy; 661 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 662 663 if (mPlaylist == NULL) { 664 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 665 666 postPrepared(ERROR_IO); 667 return; 668 } 669 670 // We trust the content provider to make a reasonable choice of preferred 671 // initial bandwidth by listing it first in the variant playlist. 672 // At startup we really don't have a good estimate on the available 673 // network bandwidth since we haven't tranferred any data yet. Once 674 // we have we can make a better informed choice. 675 size_t initialBandwidth = 0; 676 size_t initialBandwidthIndex = 0; 677 678 if (mPlaylist->isVariantPlaylist()) { 679 for (size_t i = 0; i < mPlaylist->size(); ++i) { 680 BandwidthItem item; 681 682 item.mPlaylistIndex = i; 683 684 sp<AMessage> meta; 685 AString uri; 686 mPlaylist->itemAt(i, &uri, &meta); 687 688 unsigned long bandwidth; 689 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 690 691 if (initialBandwidth == 0) { 692 initialBandwidth = item.mBandwidth; 693 } 694 695 mBandwidthItems.push(item); 696 } 697 698 CHECK_GT(mBandwidthItems.size(), 0u); 699 700 mBandwidthItems.sort(SortByBandwidth); 701 702 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 703 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 704 initialBandwidthIndex = i; 705 break; 706 } 707 } 708 } else { 709 // dummy item. 
710 BandwidthItem item; 711 item.mPlaylistIndex = 0; 712 item.mBandwidth = 0; 713 mBandwidthItems.push(item); 714 } 715 716 mPlaylist->pickRandomMediaItems(); 717 changeConfiguration( 718 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 719} 720 721void LiveSession::finishDisconnect() { 722 // No reconfiguration is currently pending, make sure none will trigger 723 // during disconnection either. 724 cancelCheckBandwidthEvent(); 725 726 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 727 // (finishDisconnect, onFinishDisconnect2) 728 cancelBandwidthSwitch(); 729 730 // cancel switch down monitor 731 mSwitchDownMonitor.clear(); 732 733 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 734 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 735 } 736 737 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 738 739 mContinuationCounter = mFetcherInfos.size(); 740 mContinuation = msg; 741 742 if (mContinuationCounter == 0) { 743 msg->post(); 744 } 745} 746 747void LiveSession::onFinishDisconnect2() { 748 mContinuation.clear(); 749 750 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 751 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 752 753 mPacketSources.valueFor( 754 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 755 756 sp<AMessage> response = new AMessage; 757 response->setInt32("err", OK); 758 759 response->postReply(mDisconnectReplyID); 760 mDisconnectReplyID = 0; 761} 762 763sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 764 ssize_t index = mFetcherInfos.indexOfKey(uri); 765 766 if (index >= 0) { 767 return NULL; 768 } 769 770 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 771 notify->setString("uri", uri); 772 notify->setInt32("switchGeneration", mSwitchGeneration); 773 774 FetcherInfo info; 775 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 776 info.mDurationUs = -1ll; 777 
info.mIsPrepared = false; 778 info.mToBeRemoved = false; 779 looper()->registerHandler(info.mFetcher); 780 781 mFetcherInfos.add(uri, info); 782 783 return info.mFetcher; 784} 785 786/* 787 * Illustration of parameters: 788 * 789 * 0 `range_offset` 790 * +------------+-------------------------------------------------------+--+--+ 791 * | | | next block to fetch | | | 792 * | | `source` handle => `out` buffer | | | | 793 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 794 * | |<----------- `range_length` / buffer capacity ----------->| | 795 * |<------------------------------ file_size ------------------------------->| 796 * 797 * Special parameter values: 798 * - range_length == -1 means entire file 799 * - block_size == 0 means entire range 800 * 801 */ 802ssize_t LiveSession::fetchFile( 803 const char *url, sp<ABuffer> *out, 804 int64_t range_offset, int64_t range_length, 805 uint32_t block_size, /* download block size */ 806 sp<DataSource> *source, /* to return and reuse source */ 807 String8 *actualUrl) { 808 off64_t size; 809 sp<DataSource> temp_source; 810 if (source == NULL) { 811 source = &temp_source; 812 } 813 814 if (*source == NULL) { 815 if (!strncasecmp(url, "file://", 7)) { 816 *source = new FileSource(url + 7); 817 } else if (strncasecmp(url, "http://", 7) 818 && strncasecmp(url, "https://", 8)) { 819 return ERROR_UNSUPPORTED; 820 } else { 821 KeyedVector<String8, String8> headers = mExtraHeaders; 822 if (range_offset > 0 || range_length >= 0) { 823 headers.add( 824 String8("Range"), 825 String8( 826 StringPrintf( 827 "bytes=%lld-%s", 828 range_offset, 829 range_length < 0 830 ? 
"" : StringPrintf("%lld", 831 range_offset + range_length - 1).c_str()).c_str())); 832 } 833 status_t err = mHTTPDataSource->connect(url, &headers); 834 835 if (err != OK) { 836 return err; 837 } 838 839 *source = mHTTPDataSource; 840 } 841 } 842 843 status_t getSizeErr = (*source)->getSize(&size); 844 if (getSizeErr != OK) { 845 size = 65536; 846 } 847 848 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 849 if (*out == NULL) { 850 buffer->setRange(0, 0); 851 } 852 853 ssize_t bytesRead = 0; 854 // adjust range_length if only reading partial block 855 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 856 range_length = buffer->size() + block_size; 857 } 858 for (;;) { 859 // Only resize when we don't know the size. 860 size_t bufferRemaining = buffer->capacity() - buffer->size(); 861 if (bufferRemaining == 0 && getSizeErr != OK) { 862 bufferRemaining = 32768; 863 864 ALOGV("increasing download buffer to %zu bytes", 865 buffer->size() + bufferRemaining); 866 867 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 868 memcpy(copy->data(), buffer->data(), buffer->size()); 869 copy->setRange(0, buffer->size()); 870 871 buffer = copy; 872 } 873 874 size_t maxBytesToRead = bufferRemaining; 875 if (range_length >= 0) { 876 int64_t bytesLeftInRange = range_length - buffer->size(); 877 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 878 maxBytesToRead = bytesLeftInRange; 879 880 if (bytesLeftInRange == 0) { 881 break; 882 } 883 } 884 } 885 886 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 887 // to help us break out of the loop. 
888 ssize_t n = (*source)->readAt( 889 buffer->size(), buffer->data() + buffer->size(), 890 maxBytesToRead); 891 892 if (n < 0) { 893 return n; 894 } 895 896 if (n == 0) { 897 break; 898 } 899 900 buffer->setRange(0, buffer->size() + (size_t)n); 901 bytesRead += n; 902 } 903 904 *out = buffer; 905 if (actualUrl != NULL) { 906 *actualUrl = (*source)->getUri(); 907 if (actualUrl->isEmpty()) { 908 *actualUrl = url; 909 } 910 } 911 912 return bytesRead; 913} 914 915sp<M3UParser> LiveSession::fetchPlaylist( 916 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 917 ALOGV("fetchPlaylist '%s'", url); 918 919 *unchanged = false; 920 921 sp<ABuffer> buffer; 922 String8 actualUrl; 923 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 924 925 if (err <= 0) { 926 return NULL; 927 } 928 929 // MD5 functionality is not available on the simulator, treat all 930 // playlists as changed. 931 932#if defined(HAVE_ANDROID_OS) 933 uint8_t hash[16]; 934 935 MD5_CTX m; 936 MD5_Init(&m); 937 MD5_Update(&m, buffer->data(), buffer->size()); 938 939 MD5_Final(hash, &m); 940 941 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 942 // playlist unchanged 943 *unchanged = true; 944 945 return NULL; 946 } 947 948 if (curPlaylistHash != NULL) { 949 memcpy(curPlaylistHash, hash, sizeof(hash)); 950 } 951#endif 952 953 sp<M3UParser> playlist = 954 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 955 956 if (playlist->initCheck() != OK) { 957 ALOGE("failed to parse .m3u8 playlist"); 958 959 return NULL; 960 } 961 962 return playlist; 963} 964 965static double uniformRand() { 966 return (double)rand() / RAND_MAX; 967} 968 969size_t LiveSession::getBandwidthIndex() { 970 if (mBandwidthItems.size() == 0) { 971 return 0; 972 } 973 974#if 1 975 char value[PROPERTY_VALUE_MAX]; 976 ssize_t index = -1; 977 if (property_get("media.httplive.bw-index", value, NULL)) { 978 char *end; 979 index = strtol(value, &end, 10); 980 CHECK(end > value && 
*end == '\0'); 981 982 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 983 index = mBandwidthItems.size() - 1; 984 } 985 } 986 987 if (index < 0) { 988 int32_t bandwidthBps; 989 if (mHTTPDataSource != NULL 990 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 991 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 992 } else { 993 ALOGV("no bandwidth estimate."); 994 return 0; // Pick the lowest bandwidth stream by default. 995 } 996 997 char value[PROPERTY_VALUE_MAX]; 998 if (property_get("media.httplive.max-bw", value, NULL)) { 999 char *end; 1000 long maxBw = strtoul(value, &end, 10); 1001 if (end > value && *end == '\0') { 1002 if (maxBw > 0 && bandwidthBps > maxBw) { 1003 ALOGV("bandwidth capped to %ld bps", maxBw); 1004 bandwidthBps = maxBw; 1005 } 1006 } 1007 } 1008 1009 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1010 1011 index = mBandwidthItems.size() - 1; 1012 while (index > 0) { 1013 // consider only 80% of the available bandwidth, but if we are switching up, 1014 // be even more conservative (70%) to avoid overestimating and immediately 1015 // switching back. 
1016 size_t adjustedBandwidthBps = bandwidthBps; 1017 if (index > mCurBandwidthIndex) { 1018 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1019 } else { 1020 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1021 } 1022 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1023 break; 1024 } 1025 --index; 1026 } 1027 } 1028#elif 0 1029 // Change bandwidth at random() 1030 size_t index = uniformRand() * mBandwidthItems.size(); 1031#elif 0 1032 // There's a 50% chance to stay on the current bandwidth and 1033 // a 50% chance to switch to the next higher bandwidth (wrapping around 1034 // to lowest) 1035 const size_t kMinIndex = 0; 1036 1037 static ssize_t mCurBandwidthIndex = -1; 1038 1039 size_t index; 1040 if (mCurBandwidthIndex < 0) { 1041 index = kMinIndex; 1042 } else if (uniformRand() < 0.5) { 1043 index = (size_t)mCurBandwidthIndex; 1044 } else { 1045 index = mCurBandwidthIndex + 1; 1046 if (index == mBandwidthItems.size()) { 1047 index = kMinIndex; 1048 } 1049 } 1050 mCurBandwidthIndex = index; 1051#elif 0 1052 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1053 1054 size_t index = mBandwidthItems.size() - 1; 1055 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1056 --index; 1057 } 1058#elif 1 1059 char value[PROPERTY_VALUE_MAX]; 1060 size_t index; 1061 if (property_get("media.httplive.bw-index", value, NULL)) { 1062 char *end; 1063 index = strtoul(value, &end, 10); 1064 CHECK(end > value && *end == '\0'); 1065 1066 if (index >= mBandwidthItems.size()) { 1067 index = mBandwidthItems.size() - 1; 1068 } 1069 } else { 1070 index = 0; 1071 } 1072#else 1073 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1074#endif 1075 1076 CHECK_GE(index, 0); 1077 1078 return index; 1079} 1080 1081int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1082 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1083 int64_t 
minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
    if (audioMeta != NULL) {
        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
    }

    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
    if (videoMeta != NULL
            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
            minSegmentStartTimeUs = videoSegmentStartTimeUs;
        }

    }
    return minSegmentStartTimeUs;
}

// Handles a seek request. |msg| must carry an int64 "timeUs" (CHECK aborts
// otherwise). If no reconfiguration is running, a configuration change is
// started at that time with the current bandwidth index and OK is returned;
// otherwise nothing is changed and -EWOULDBLOCK is returned.
status_t LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (!mReconfigurationInProgress) {
        changeConfiguration(timeUs, mCurBandwidthIndex);
        return OK;
    } else {
        return -EWOULDBLOCK;
    }
}

// Stores in |*durationUs| the largest mDurationUs found in mFetcherInfos, or
// -1 when there are no fetchers or none reports a larger value.
// Always returns OK.
status_t LiveSession::getDuration(int64_t *durationUs) const {
    int64_t maxDurationUs = -1ll;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;

        if (fetcherDurationUs > maxDurationUs) {
            maxDurationUs = fetcherDurationUs;
        }
    }

    *durationUs = maxDurationUs;

    return OK;
}

// The session is seekable when getDuration() yields a non-negative duration.
bool LiveSession::isSeekable() const {
    int64_t durationUs;
    return getDuration(&durationUs) == OK && durationUs >= 0;
}

// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}

// Returns the playlist's track count, or 0 when no playlist is set.
size_t LiveSession::getTrackCount() const {
    if (mPlaylist == NULL) {
        return 0;
    } else {
        return mPlaylist->getTrackCount();
    }
}

// Returns the playlist's info for |trackIndex|, or NULL when no playlist is set.
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
    if (mPlaylist == NULL) {
        return NULL;
    } else {
        return mPlaylist->getTrackInfo(trackIndex);
    }
}

// Selects (|select| == true) or unselects track |index| on the playlist.
// Returns INVALID_OPERATION when no playlist is set, otherwise the status of
// mPlaylist->selectTrack(). On success a kWhatChangeConfiguration message
// targeted at id() is posted with the current bandwidth index and
// "pickTrack" set to |select|.
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    // Note: the generation is bumped before the playlist call, so it changes
    // even when the selection itself fails.
    ++mSubtitleGeneration;
    status_t err = mPlaylist->selectTrack(index, select);
    if (err == OK) {
        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
        msg->setInt32("pickTrack", select);
        msg->post();
    }
    return err;
}

// Returns the playlist's selected track for |type|, or -1 when no playlist is set.
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
    if (mPlaylist == NULL) {
        return -1;
    } else {
        return mPlaylist->getSelectedTrack(type);
    }
}

// Returns true as soon as one entry of mPacketSources reports, with err == OK,
// a buffered duration above 10000000 us; false otherwise.
bool LiveSession::canSwitchUp() {
    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
    status_t err = OK;
    for (size_t i = 0; i < mPacketSources.size(); ++i) {
        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
        int64_t dur = source->getBufferedDurationUs(&err);
        if (err == OK && dur > 10000000) {
            return true;
        }
    }
    return false;
}

// Begins a configuration change.
//   timeUs          >= 0 when seeking to that time, negative when not seeking.
//   bandwidthIndex  index into mBandwidthItems; CHECK_LT aborts if out of range.
//   pickTrack       forwarded in the continuation message; when not seeking it
//                   also decides whether fetchers whose URI is not reused are
//                   stopped (true) or only paused (false).
// Must not be called while mReconfigurationInProgress is set (CHECK).
// Existing fetchers are stopped/paused asynchronously. The continuation
// (kWhatChangeConfiguration3 when not seeking, kWhatChangeConfiguration2 when
// seeking) is stored in mContinuation and is posted here only when there are
// no fetchers to wait for.
void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    // Note: assigned before the range CHECK on bandwidthIndex below.
    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI of every stream type the selected playlist item provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        // If we're seeking (timeUs >= 0) all current fetchers are discarded;
        // the default below is only revised in the not-seeking branch.
        bool discardFetcher = true;

        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

            // A fetcher whose URI is still wanted keeps serving those stream
            // types: move them from streamMask to resumeMask.
            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                    discardFetcher = false;
                }
            }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, id());
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, id());
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the URI of every stream type that is either new or resumed.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which then is handled below in onChangeConfiguration2.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        // No fetcher to wait for: continue right away.
        msg->post();

        // If a seek reply is pending, answer it with OK and forget it.
        if (mSeekReplyID != 0) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID = 0;
            mSeekReply.clear();
        }
    }
}

// Handles a change-configuration request message (presumably dispatched for
// kWhatChangeConfiguration -- confirm in onMessageReceived). When no
// reconfiguration is running, starts a non-seeking configuration change using
// the optional "pickTrack" and "bandwidthIndex" entries of |msg| (defaults: 0
// and mCurBandwidthIndex). Otherwise the same message is re-posted with a delay.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    if (!mReconfigurationInProgress) {
        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
        msg->findInt32("pickTrack", &pickTrack);
        msg->findInt32("bandwidthIndex", &bandwidthIndex);
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
    } else {
        msg->post(1000000ll); // retry in 1 sec
    }
}

// Second stage of a configuration change (seek path). Works out which stream
// types before kSubtitleIndex need their decoder shut down. If none do, it
// continues straight into onChangeConfiguration3(); otherwise it notifies the
// player with kWhatStreamsChanged and hands it |msg|, retargeted as
// kWhatChangeConfiguration3, as the reply to post.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    // The masks were stored with setInt32, hence the pointer casts.
    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // currently onChangeConfiguration2 is only called for seeking;
    // remove the following CHECK if using it elsewhere.
    CHECK_EQ(resumeMask, 0);
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    // Determine which decoders to shutdown on the player side,
    // a decoder has to be shutdown if either
    // 1) its streamtype was active before but no longer is.
    // or
    // 2) its streamtype was already active and still is but the URI
    //    has changed.
    uint32_t changedMask = 0;
    // The loop ends at the first index equal to kSubtitleIndex, so that index
    // (and any after it) never contributes to changedMask.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(id());

    notify->setMessage("reply", msg);
    notify->post();
}

// Final stage of a configuration change. Resumes the existing fetchers that
// still serve a stream in resumeMask, marks the other existing fetchers for
// removal, creates and starts a fetcher for every stream type left in
// streamMask, reschedules the bandwidth check and clears
// mReconfigurationInProgress.
//
// |msg| must carry "streamMask", "resumeMask", "timeUs", "pickTrack" and the
// URI string of every stream in streamMask (all enforced with CHECK).
// "switching" is true when not seeking (timeUs < 0) and not picking a track:
// new URIs are then stored in mNewUri, new fetchers write into
// mPacketSources2, and mSwitchInProgress is set on exit.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            switching = true;
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    // Record the URIs of the streams that get a new fetcher: as pending
    // (mNewUri) while switching, as current (mUri) otherwise.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    mNewStreamMask = streamMask | resumeMask;
    if (switching) {
        // Currently active streams that are not resumed must be swapped.
        mSwapMask = mStreamMask & ~resumeMask;
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (j != kSubtitleIndex) {
                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
                    sp<AnotherPacketSource> discontinuityQueue;
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_NONE,
                            NULL,
                            true);
                }
            }
        }

        // A fetcher that ended up with no packet source is no longer needed.
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // NOTE(review): latestSeq is never read in this function.
        int32_t latestSeq = -1;
        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        sp<AnotherPacketSource> sources[kMaxStreams];

        if (i == kSubtitleIndex) {
            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
        }

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: drop buffered data and queue a time
                    // discontinuity carrying the seek position.
                    sources[j]->clear();
                    startTimeUs = timeUs;

                    sp<AnotherPacketSource> discontinuityQueue;
                    sp<AMessage> extra = new AMessage;
                    extra->setInt64("timeUs", timeUs);
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_TIME, extra, true);
                } else {
                    int32_t type;
                    // NOTE(review): srcSegmentStartTimeUs is never read in this function.
                    int64_t srcSegmentStartTimeUs;
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
                        meta = sources[j]->getLatestEnqueuedMeta();
                    }

                    // Keep the smallest timeUs / segmentStartTimeUs /
                    // discontinuitySeq over the streams sharing this fetcher;
                    // meta carrying a "discontinuity" entry is skipped.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        int64_t tmpUs;
                        CHECK(meta->findInt64("timeUs", &tmpUs));
                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
                            startTimeUs = tmpUs;
                        }

                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
                            segmentStartTimeUs = tmpUs;
                        }

                        int32_t seq;
                        CHECK(meta->findInt32("discontinuitySeq", &seq));
                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
                            discontinuitySeq = seq;
                        }
                    }

                    if (pickTrack) {
                        // selecting track, queue discontinuities before content
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // NOTE(review): this leaves the j loop before the
                            // streamMask bit for j is cleared below.
                            break;
                        }
                        sp<AnotherPacketSource> discontinuityQueue;
                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                        discontinuityQueue->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        // Stream types present only in the new configuration
                        // start with a format-change buffer.
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        // Fall back to mLastSeekTimeUs when no start time could be derived.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    cancelCheckBandwidthEvent();
    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is updated later, in tryToFinishBandwidthSwitch().
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

// Handles a swap notification for one stream type. |msg| must carry
// "switchGeneration" and "stream". Notifications whose generation differs from
// mSwitchGeneration are ignored. The stream's pending mNewUri becomes its mUri
// and its bit is cleared from mSwapMask; once mSwapMask is empty, the packet
// sources of stream types present only in the new mask are swapped too and the
// bandwidth switch is finished if possible.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));

    ssize_t idx = typeToIndex(stream);
    CHECK(idx >= 0);
    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
    }
    mStreams[idx].mUri = mStreams[idx].mNewUri;
    mStreams[idx].mNewUri.clear();

    mSwapMask &= ~stream;
    if (mSwapMask != 0) {
        // Other streams are still waiting to be swapped.
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit, i.e. one stream type per pass.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;

        idx = typeToIndex(extraStream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    extraStream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    tryToFinishBandwidthSwitch();
}

// Periodic buffer check. Does nothing when mSwitchDownMonitor is NULL.
// For each stream whose latest dequeued meta carries "targetDuration": if the
// estimated buffered duration is below one third of that target, a
// kWhatSwitchDown message is posted (at most one per call). The monitor
// message is then re-posted with a 1000000 us delay.
void LiveSession::onCheckSwitchDown() {
    if (mSwitchDownMonitor == NULL) {
        return;
    }

    for (size_t i = 0; i < kMaxStreams; ++i) {
        int32_t targetDuration;
        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();

        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
            // presumably targetDuration is in seconds -- TODO confirm
            int64_t targetDurationUs = targetDuration * 1000000ll;

            if (bufferedDurationUs < targetDurationUs / 3) {
                (new AMessage(kWhatSwitchDown, id()))->post();
                break;
            }
        }
    }

    mSwitchDownMonitor->post(1000000ll);
}

// Switches to a lower bandwidth index. No-op while a reconfiguration or switch
// is in progress, or when the current index is already 0. Uses
// getBandwidthIndex() when it is below the current index, otherwise steps down
// by exactly one.
void LiveSession::onSwitchDown() {
    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
        return;
    }

    ssize_t bandwidthIndex = getBandwidthIndex();
    if (bandwidthIndex < mCurBandwidthIndex) {
        changeConfiguration(-1, bandwidthIndex, false);
        return;
    }

    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
}

// Mark switch done when:
//   1. all old buffers are swapped out (mSwapMask == 0), and
//   2. no fetcher is still marked mToBeRemoved.
// On completion mStreamMask takes the value of mNewStreamMask.
void LiveSession::tryToFinishBandwidthSwitch() {
    if (!mSwitchInProgress) {
        return;
    }

    bool needToRemoveFetchers = false;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
            needToRemoveFetchers = true;
            break;
        }
    }

    if (!needToRemoveFetchers && mSwapMask == 0) {
        ALOGI("mSwitchInProgress = false");
        mStreamMask = mNewStreamMask;
        mSwitchInProgress = false;
    }
}

// Posts a kWhatCheckBandwidth message tagged with the current
// mCheckBandwidthGeneration, delayed by 10000000 us.
void LiveSession::scheduleCheckBandwidthEvent() {
    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
    msg->setInt32("generation", mCheckBandwidthGeneration);
    msg->post(10000000ll);
}

// Bumps mCheckBandwidthGeneration; presumably the message handler drops
// kWhatCheckBandwidth messages carrying an older generation -- confirm in
// onMessageReceived.
void LiveSession::cancelCheckBandwidthEvent() {
    ++mCheckBandwidthGeneration;
}

// Aborts a bandwidth switch, holding mSwapMutex for the duration of the call:
// bumps mSwitchGeneration, clears mSwitchInProgress and mSwapMask, resets every
// fetcher's mToBeRemoved flag, and for each stream with a pending mNewUri stops
// and removes the fetcher registered under that URI (if any) before clearing
// the pending URI.
void LiveSession::cancelBandwidthSwitch() {
    Mutex::Autolock lock(mSwapMutex);
    mSwitchGeneration++;
    mSwitchInProgress = false;
    mSwapMask = 0;

    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        FetcherInfo& info = mFetcherInfos.editValueAt(i);
        if (info.mToBeRemoved) {
            info.mToBeRemoved = false;
        }
    }

    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (!mStreams[i].mNewUri.empty()) {
            ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
            if (j < 0) {
                // No fetcher was registered for the pending URI.
                mStreams[i].mNewUri.clear();
                continue;
            }

            // stopAsync() is issued before the entry is removed, while the
            // reference into mFetcherInfos is still valid.
            const FetcherInfo &info = mFetcherInfos.valueAt(j);
            info.mFetcher->stopAsync();
            mFetcherInfos.removeItemsAt(j);
            mStreams[i].mNewUri.clear();
        }
    }
}

// Decides whether a switch to |bandwidthIndex| may start now:
//   - never while a reconfiguration or switch is in progress;
//   - always when no bandwidth index has been chosen yet (mCurBandwidthIndex < 0);
//   - never to the current index;
//   - upwards only if canSwitchUp(); downwards always.
bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
    if (mReconfigurationInProgress || mSwitchInProgress) {
        return false;
    }

    if (mCurBandwidthIndex < 0) {
        return true;
    }

    if (bandwidthIndex == (size_t)mCurBandwidthIndex) {
        return false;
    } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) {
        return canSwitchUp();
    } else {
        return true;
    }
}

// Handles a bandwidth check: reconfigures to getBandwidthIndex() when
// canSwitchBandwidthTo() allows it, otherwise re-posts |msg| for a later retry.
void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
    size_t bandwidthIndex = getBandwidthIndex();
    if (canSwitchBandwidthTo(bandwidthIndex)) {
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
    } else {
        // Come back and check again 10 seconds later in case there is nothing to do now.
        // If we DO change configuration, once that completes it'll schedule a new
        // check bandwidth event with an incremented mCheckBandwidthGeneration.
        msg->post(10000000ll);
    }
}

// Reports the end of the preparation phase to the client. Must only be called
// while mInPreparationPhase is set (CHECK). OK and ERROR_END_OF_STREAM are
// reported as kWhatPrepared; any other |err| as kWhatPreparationFailed with
// "err" attached. Afterwards the preparation phase is left and the switch-down
// monitor (kWhatCheckSwitchDown) is created and posted.
void LiveSession::postPrepared(status_t err) {
    CHECK(mInPreparationPhase);

    sp<AMessage> notify = mNotify->dup();
    if (err == OK || err == ERROR_END_OF_STREAM) {
        notify->setInt32("what", kWhatPrepared);
    } else {
        notify->setInt32("what", kWhatPreparationFailed);
        notify->setInt32("err", err);
    }

    notify->post();

    mInPreparationPhase = false;

    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
    mSwitchDownMonitor->post();
}

}  // namespace android
