LiveSession.cpp revision 800599cdd50737de1cde483a34b39923750b0658
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0), 72 mFirstTimeUsValid(false), 73 mFirstTimeUs(0), 74 mLastSeekTimeUs(0) { 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mBuffering[i] = false; 85 } 86} 87 88LiveSession::~LiveSession() { 89} 90 91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 92 ABuffer *discontinuity = new ABuffer(0); 93 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 94 discontinuity->meta()->setInt32("swapPacketSource", swap); 95 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 96 discontinuity->meta()->setInt64("timeUs", -1); 97 return discontinuity; 98} 99 100void LiveSession::swapPacketSource(StreamType stream) { 101 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 102 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 103 sp<AnotherPacketSource> tmp = aps; 104 aps = aps2; 105 aps2 = tmp; 106 aps2->clear(); 107} 108 109status_t LiveSession::dequeueAccessUnit( 110 StreamType stream, sp<ABuffer> *accessUnit) { 111 if (!(mStreamMask & stream)) { 112 // return -EWOULDBLOCK to avoid halting the decoder 113 // when switching between audio/video and audio only. 114 return -EWOULDBLOCK; 115 } 116 117 status_t finalResult; 118 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 119 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 120 discontinuityQueue->dequeueAccessUnit(accessUnit); 121 // seeking, track switching 122 sp<AMessage> extra; 123 int64_t timeUs; 124 if ((*accessUnit)->meta()->findMessage("extra", &extra) 125 && extra != NULL 126 && extra->findInt64("timeUs", &timeUs)) { 127 // seeking only 128 mLastSeekTimeUs = timeUs; 129 mDiscontinuityOffsetTimesUs.clear(); 130 mDiscontinuityAbsStartTimesUs.clear(); 131 } 132 return INFO_DISCONTINUITY; 133 } 134 135 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 136 137 ssize_t idx = typeToIndex(stream); 138 if (!packetSource->hasBufferAvailable(&finalResult)) { 139 if (finalResult == OK) { 140 mBuffering[idx] = true; 141 return -EAGAIN; 142 } else { 143 return finalResult; 144 } 145 } 146 147 if (mBuffering[idx]) { 148 if (mSwitchInProgress 149 || packetSource->isFinished(0) 150 || packetSource->getEstimatedDurationUs() > 10000000ll) { 151 mBuffering[idx] = false; 152 } 153 } 154 155 if (mBuffering[idx]) { 156 return -EAGAIN; 157 } 158 159 // wait for counterpart 160 sp<AnotherPacketSource> otherSource; 161 uint32_t mask = mNewStreamMask & mStreamMask; 162 uint32_t fetchersMask = 0; 163 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 164 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 165 fetchersMask |= fetcherMask; 166 } 167 mask &= fetchersMask; 168 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 169 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 170 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 171 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 172 } 173 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 174 return finalResult == OK ? -EAGAIN : finalResult; 175 } 176 177 status_t err = packetSource->dequeueAccessUnit(accessUnit); 178 179 size_t streamIdx; 180 const char *streamStr; 181 switch (stream) { 182 case STREAMTYPE_AUDIO: 183 streamIdx = kAudioIndex; 184 streamStr = "audio"; 185 break; 186 case STREAMTYPE_VIDEO: 187 streamIdx = kVideoIndex; 188 streamStr = "video"; 189 break; 190 case STREAMTYPE_SUBTITLES: 191 streamIdx = kSubtitleIndex; 192 streamStr = "subs"; 193 break; 194 default: 195 TRESPASS(); 196 } 197 198 StreamItem& strm = mStreams[streamIdx]; 199 if (err == INFO_DISCONTINUITY) { 200 // adaptive streaming, discontinuities in the playlist 201 int32_t type; 202 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 203 204 sp<AMessage> extra; 205 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 206 extra.clear(); 207 } 208 209 ALOGI("[%s] read discontinuity of type %d, extra = %s", 210 streamStr, 211 type, 212 extra == NULL ? "NULL" : extra->debugString().c_str()); 213 214 int32_t swap; 215 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 216 int32_t switchGeneration; 217 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 218 { 219 Mutex::Autolock lock(mSwapMutex); 220 if (switchGeneration == mSwitchGeneration) { 221 swapPacketSource(stream); 222 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 223 msg->setInt32("stream", stream); 224 msg->setInt32("switchGeneration", switchGeneration); 225 msg->post(); 226 } 227 } 228 } else { 229 size_t seq = strm.mCurDiscontinuitySeq; 230 int64_t offsetTimeUs; 231 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 232 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 233 } else { 234 offsetTimeUs = 0; 235 } 236 237 seq += 1; 238 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 239 int64_t firstTimeUs; 240 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 241 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 242 offsetTimeUs += strm.mLastSampleDurationUs; 243 } else { 244 offsetTimeUs += strm.mLastSampleDurationUs; 245 } 246 247 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 248 } 249 } else if (err == OK) { 250 251 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 252 int64_t timeUs; 253 int32_t discontinuitySeq = 0; 254 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 255 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 256 strm.mCurDiscontinuitySeq = discontinuitySeq; 257 258 int32_t discard = 0; 259 int64_t firstTimeUs; 260 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 261 int64_t durUs; // approximate sample duration 262 if (timeUs > strm.mLastDequeuedTimeUs) { 263 durUs = timeUs - strm.mLastDequeuedTimeUs; 264 } else { 265 durUs = strm.mLastDequeuedTimeUs - timeUs; 266 } 267 strm.mLastSampleDurationUs = durUs; 268 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 269 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 270 firstTimeUs = timeUs; 271 } else { 272 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 273 firstTimeUs = timeUs; 274 } 275 276 strm.mLastDequeuedTimeUs = timeUs; 277 if (timeUs >= firstTimeUs) { 278 timeUs -= firstTimeUs; 279 } else { 280 timeUs = 0; 281 } 282 timeUs += mLastSeekTimeUs; 283 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 284 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 285 } 286 287 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 288 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 289 mLastDequeuedTimeUs = timeUs; 290 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 291 } else if (stream == STREAMTYPE_SUBTITLES) { 292 (*accessUnit)->meta()->setInt32( 293 "trackIndex", mPlaylist->getSelectedIndex()); 294 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 295 } 296 } else { 297 ALOGI("[%s] encountered error %d", streamStr, err); 298 } 299 300 return err; 301} 302 303status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 304 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 305 if (!(mStreamMask & stream)) { 306 return UNKNOWN_ERROR; 307 } 308 309 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 310 311 sp<MetaData> meta = packetSource->getFormat(); 312 313 if (meta == NULL) { 314 return -EAGAIN; 315 } 316 317 return convertMetaDataToMessage(meta, format); 318} 319 320void LiveSession::connectAsync( 321 const char *url, const KeyedVector<String8, String8> *headers) { 322 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 323 msg->setString("url", url); 324 325 if (headers != NULL) { 326 msg->setPointer( 327 "headers", 328 new KeyedVector<String8, String8>(*headers)); 329 } 330 331 msg->post(); 332} 333 334status_t LiveSession::disconnect() { 335 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 336 337 sp<AMessage> response; 338 status_t err = msg->postAndAwaitResponse(&response); 339 340 return err; 341} 342 343status_t LiveSession::seekTo(int64_t timeUs) { 344 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 345 msg->setInt64("timeUs", timeUs); 346 347 sp<AMessage> response; 348 status_t err = msg->postAndAwaitResponse(&response); 349 350 return err; 351} 352 353void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 354 switch (msg->what()) { 355 case kWhatConnect: 356 { 357 onConnect(msg); 358 break; 359 } 360 361 case kWhatDisconnect: 362 { 363 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 364 365 if (mReconfigurationInProgress) { 366 break; 367 } 368 369 finishDisconnect(); 370 break; 371 } 372 373 case kWhatSeek: 374 { 375 uint32_t seekReplyID; 376 CHECK(msg->senderAwaitsResponse(&seekReplyID)); 377 mSeekReplyID = seekReplyID; 378 mSeekReply = new AMessage; 379 380 status_t err = onSeek(msg); 381 382 if (err != OK) { 383 msg->post(50000); 384 } 385 break; 386 } 387 388 case kWhatFetcherNotify: 389 { 390 int32_t what; 391 CHECK(msg->findInt32("what", &what)); 392 393 switch (what) { 394 case PlaylistFetcher::kWhatStarted: 395 break; 396 case PlaylistFetcher::kWhatPaused: 397 case PlaylistFetcher::kWhatStopped: 398 { 399 if (what == PlaylistFetcher::kWhatStopped) { 400 AString uri; 401 CHECK(msg->findString("uri", &uri)); 402 if (mFetcherInfos.removeItem(uri) < 0) { 403 // ignore duplicated kWhatStopped messages. 404 break; 405 } 406 407 if (mSwitchInProgress) { 408 tryToFinishBandwidthSwitch(); 409 } 410 } 411 412 if (mContinuation != NULL) { 413 CHECK_GT(mContinuationCounter, 0); 414 if (--mContinuationCounter == 0) { 415 mContinuation->post(); 416 417 if (mSeekReplyID != 0) { 418 CHECK(mSeekReply != NULL); 419 mSeekReply->setInt32("err", OK); 420 mSeekReply->postReply(mSeekReplyID); 421 mSeekReplyID = 0; 422 mSeekReply.clear(); 423 } 424 } 425 } 426 break; 427 } 428 429 case PlaylistFetcher::kWhatDurationUpdate: 430 { 431 AString uri; 432 CHECK(msg->findString("uri", &uri)); 433 434 int64_t durationUs; 435 CHECK(msg->findInt64("durationUs", &durationUs)); 436 437 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 438 info->mDurationUs = durationUs; 439 break; 440 } 441 442 case PlaylistFetcher::kWhatError: 443 { 444 status_t err; 445 CHECK(msg->findInt32("err", &err)); 446 447 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 448 449 // handle EOS on subtitle tracks independently 450 AString uri; 451 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 452 ssize_t i = mFetcherInfos.indexOfKey(uri); 453 if (i >= 0) { 454 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 455 if (fetcher != NULL) { 456 uint32_t type = fetcher->getStreamTypeMask(); 457 if (type == STREAMTYPE_SUBTITLES) { 458 mPacketSources.valueFor( 459 STREAMTYPE_SUBTITLES)->signalEOS(err);; 460 break; 461 } 462 } 463 } 464 } 465 466 if (mInPreparationPhase) { 467 postPrepared(err); 468 } 469 470 cancelBandwidthSwitch(); 471 472 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 473 474 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 475 476 mPacketSources.valueFor( 477 STREAMTYPE_SUBTITLES)->signalEOS(err); 478 479 sp<AMessage> notify = mNotify->dup(); 480 notify->setInt32("what", kWhatError); 481 notify->setInt32("err", err); 482 notify->post(); 483 break; 484 } 485 486 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 487 { 488 AString uri; 489 CHECK(msg->findString("uri", &uri)); 490 491 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 492 info->mIsPrepared = true; 493 494 if (mInPreparationPhase) { 495 bool allFetchersPrepared = true; 496 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 497 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 498 allFetchersPrepared = false; 499 break; 500 } 501 } 502 503 if (allFetchersPrepared) { 504 postPrepared(OK); 505 } 506 } 507 break; 508 } 509 510 case PlaylistFetcher::kWhatStartedAt: 511 { 512 int32_t switchGeneration; 513 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 514 515 if (switchGeneration != mSwitchGeneration) { 516 break; 517 } 518 519 // Resume fetcher for the original variant; the resumed fetcher should 520 // continue until the timestamps found in msg, which is stored by the 521 // new fetcher to indicate where the new variant has started buffering. 522 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 523 const FetcherInfo info = mFetcherInfos.valueAt(i); 524 if (info.mToBeRemoved) { 525 info.mFetcher->resumeUntilAsync(msg); 526 } 527 } 528 break; 529 } 530 531 default: 532 TRESPASS(); 533 } 534 535 break; 536 } 537 538 case kWhatCheckBandwidth: 539 { 540 int32_t generation; 541 CHECK(msg->findInt32("generation", &generation)); 542 543 if (generation != mCheckBandwidthGeneration) { 544 break; 545 } 546 547 onCheckBandwidth(msg); 548 break; 549 } 550 551 case kWhatChangeConfiguration: 552 { 553 onChangeConfiguration(msg); 554 break; 555 } 556 557 case kWhatChangeConfiguration2: 558 { 559 onChangeConfiguration2(msg); 560 break; 561 } 562 563 case kWhatChangeConfiguration3: 564 { 565 onChangeConfiguration3(msg); 566 break; 567 } 568 569 case kWhatFinishDisconnect2: 570 { 571 onFinishDisconnect2(); 572 break; 573 } 574 575 case kWhatSwapped: 576 { 577 onSwapped(msg); 578 break; 579 } 580 581 case kWhatCheckSwitchDown: 582 { 583 onCheckSwitchDown(); 584 break; 585 } 586 587 case kWhatSwitchDown: 588 { 589 onSwitchDown(); 590 break; 591 } 592 593 default: 594 TRESPASS(); 595 break; 596 } 597} 598 599// static 600int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 601 if (a->mBandwidth < b->mBandwidth) { 602 return -1; 603 } else if (a->mBandwidth == b->mBandwidth) { 604 return 0; 605 } 606 607 return 1; 608} 609 610// static 611LiveSession::StreamType LiveSession::indexToType(int idx) { 612 CHECK(idx >= 0 && idx < kMaxStreams); 613 return (StreamType)(1 << idx); 614} 615 616// static 617ssize_t LiveSession::typeToIndex(int32_t type) { 618 switch (type) { 619 case STREAMTYPE_AUDIO: 620 return 0; 621 case STREAMTYPE_VIDEO: 622 return 1; 623 case STREAMTYPE_SUBTITLES: 624 return 2; 625 default: 626 return -1; 627 }; 628 return -1; 629} 630 631void LiveSession::onConnect(const sp<AMessage> &msg) { 632 AString url; 633 CHECK(msg->findString("url", &url)); 634 635 KeyedVector<String8, String8> *headers = NULL; 636 if (!msg->findPointer("headers", (void **)&headers)) { 637 mExtraHeaders.clear(); 638 } else { 639 mExtraHeaders = *headers; 640 641 delete headers; 642 headers = NULL; 643 } 644 645 // TODO currently we don't know if we are coming here from incognito mode 646 ALOGI("onConnect %s", uriDebugString(url).c_str()); 647 648 mMasterURL = url; 649 650 bool dummy; 651 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 652 653 if (mPlaylist == NULL) { 654 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 655 656 postPrepared(ERROR_IO); 657 return; 658 } 659 660 // We trust the content provider to make a reasonable choice of preferred 661 // initial bandwidth by listing it first in the variant playlist. 662 // At startup we really don't have a good estimate on the available 663 // network bandwidth since we haven't tranferred any data yet. Once 664 // we have we can make a better informed choice. 665 size_t initialBandwidth = 0; 666 size_t initialBandwidthIndex = 0; 667 668 if (mPlaylist->isVariantPlaylist()) { 669 for (size_t i = 0; i < mPlaylist->size(); ++i) { 670 BandwidthItem item; 671 672 item.mPlaylistIndex = i; 673 674 sp<AMessage> meta; 675 AString uri; 676 mPlaylist->itemAt(i, &uri, &meta); 677 678 unsigned long bandwidth; 679 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 680 681 if (initialBandwidth == 0) { 682 initialBandwidth = item.mBandwidth; 683 } 684 685 mBandwidthItems.push(item); 686 } 687 688 CHECK_GT(mBandwidthItems.size(), 0u); 689 690 mBandwidthItems.sort(SortByBandwidth); 691 692 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 693 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 694 initialBandwidthIndex = i; 695 break; 696 } 697 } 698 } else { 699 // dummy item. 700 BandwidthItem item; 701 item.mPlaylistIndex = 0; 702 item.mBandwidth = 0; 703 mBandwidthItems.push(item); 704 } 705 706 mPlaylist->pickRandomMediaItems(); 707 changeConfiguration( 708 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 709} 710 711void LiveSession::finishDisconnect() { 712 // No reconfiguration is currently pending, make sure none will trigger 713 // during disconnection either. 714 cancelCheckBandwidthEvent(); 715 716 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 717 // (finishDisconnect, onFinishDisconnect2) 718 cancelBandwidthSwitch(); 719 720 // cancel switch down monitor 721 mSwitchDownMonitor.clear(); 722 723 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 724 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 725 } 726 727 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 728 729 mContinuationCounter = mFetcherInfos.size(); 730 mContinuation = msg; 731 732 if (mContinuationCounter == 0) { 733 msg->post(); 734 } 735} 736 737void LiveSession::onFinishDisconnect2() { 738 mContinuation.clear(); 739 740 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 741 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 742 743 mPacketSources.valueFor( 744 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 745 746 sp<AMessage> response = new AMessage; 747 response->setInt32("err", OK); 748 749 response->postReply(mDisconnectReplyID); 750 mDisconnectReplyID = 0; 751} 752 753sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 754 ssize_t index = mFetcherInfos.indexOfKey(uri); 755 756 if (index >= 0) { 757 return NULL; 758 } 759 760 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 761 notify->setString("uri", uri); 762 notify->setInt32("switchGeneration", mSwitchGeneration); 763 764 FetcherInfo info; 765 info.mFetcher = new PlaylistFetcher(notify, this, uri); 766 info.mDurationUs = -1ll; 767 info.mIsPrepared = false; 768 info.mToBeRemoved = false; 769 looper()->registerHandler(info.mFetcher); 770 771 mFetcherInfos.add(uri, info); 772 773 return info.mFetcher; 774} 775 776/* 777 * Illustration of parameters: 778 * 779 * 0 `range_offset` 780 * +------------+-------------------------------------------------------+--+--+ 781 * | | | next block to fetch | | | 782 * | | `source` handle => `out` buffer | | | | 783 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 784 * | |<----------- `range_length` / buffer capacity ----------->| | 785 * |<------------------------------ file_size ------------------------------->| 786 * 787 * Special parameter values: 788 * - range_length == -1 means entire file 789 * - block_size == 0 means entire range 790 * 791 */ 792ssize_t LiveSession::fetchFile( 793 const char *url, sp<ABuffer> *out, 794 int64_t range_offset, int64_t range_length, 795 uint32_t block_size, /* download block size */ 796 sp<DataSource> *source, /* to return and reuse source */ 797 String8 *actualUrl) { 798 off64_t size; 799 sp<DataSource> temp_source; 800 if (source == NULL) { 801 source = &temp_source; 802 } 803 804 if (*source == NULL) { 805 if (!strncasecmp(url, "file://", 7)) { 806 *source = new FileSource(url + 7); 807 } else if (strncasecmp(url, "http://", 7) 808 && strncasecmp(url, "https://", 8)) { 809 return ERROR_UNSUPPORTED; 810 } else { 811 KeyedVector<String8, String8> headers = mExtraHeaders; 812 if (range_offset > 0 || range_length >= 0) { 813 headers.add( 814 String8("Range"), 815 String8( 816 StringPrintf( 817 "bytes=%lld-%s", 818 range_offset, 819 range_length < 0 820 ? "" : StringPrintf("%lld", 821 range_offset + range_length - 1).c_str()).c_str())); 822 } 823 status_t err = mHTTPDataSource->connect(url, &headers); 824 825 if (err != OK) { 826 return err; 827 } 828 829 *source = mHTTPDataSource; 830 } 831 } 832 833 status_t getSizeErr = (*source)->getSize(&size); 834 if (getSizeErr != OK) { 835 size = 65536; 836 } 837 838 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 839 if (*out == NULL) { 840 buffer->setRange(0, 0); 841 } 842 843 ssize_t bytesRead = 0; 844 // adjust range_length if only reading partial block 845 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 846 range_length = buffer->size() + block_size; 847 } 848 for (;;) { 849 // Only resize when we don't know the size. 850 size_t bufferRemaining = buffer->capacity() - buffer->size(); 851 if (bufferRemaining == 0 && getSizeErr != OK) { 852 bufferRemaining = 32768; 853 854 ALOGV("increasing download buffer to %zu bytes", 855 buffer->size() + bufferRemaining); 856 857 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 858 memcpy(copy->data(), buffer->data(), buffer->size()); 859 copy->setRange(0, buffer->size()); 860 861 buffer = copy; 862 } 863 864 size_t maxBytesToRead = bufferRemaining; 865 if (range_length >= 0) { 866 int64_t bytesLeftInRange = range_length - buffer->size(); 867 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 868 maxBytesToRead = bytesLeftInRange; 869 870 if (bytesLeftInRange == 0) { 871 break; 872 } 873 } 874 } 875 876 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 877 // to help us break out of the loop. 878 ssize_t n = (*source)->readAt( 879 buffer->size(), buffer->data() + buffer->size(), 880 maxBytesToRead); 881 882 if (n < 0) { 883 return n; 884 } 885 886 if (n == 0) { 887 break; 888 } 889 890 buffer->setRange(0, buffer->size() + (size_t)n); 891 bytesRead += n; 892 } 893 894 *out = buffer; 895 if (actualUrl != NULL) { 896 *actualUrl = (*source)->getUri(); 897 if (actualUrl->isEmpty()) { 898 *actualUrl = url; 899 } 900 } 901 902 return bytesRead; 903} 904 905sp<M3UParser> LiveSession::fetchPlaylist( 906 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 907 ALOGV("fetchPlaylist '%s'", url); 908 909 *unchanged = false; 910 911 sp<ABuffer> buffer; 912 String8 actualUrl; 913 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 914 915 if (err <= 0) { 916 return NULL; 917 } 918 919 // MD5 functionality is not available on the simulator, treat all 920 // playlists as changed. 921 922#if defined(HAVE_ANDROID_OS) 923 uint8_t hash[16]; 924 925 MD5_CTX m; 926 MD5_Init(&m); 927 MD5_Update(&m, buffer->data(), buffer->size()); 928 929 MD5_Final(hash, &m); 930 931 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 932 // playlist unchanged 933 *unchanged = true; 934 935 return NULL; 936 } 937 938 if (curPlaylistHash != NULL) { 939 memcpy(curPlaylistHash, hash, sizeof(hash)); 940 } 941#endif 942 943 sp<M3UParser> playlist = 944 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 945 946 if (playlist->initCheck() != OK) { 947 ALOGE("failed to parse .m3u8 playlist"); 948 949 return NULL; 950 } 951 952 return playlist; 953} 954 955static double uniformRand() { 956 return (double)rand() / RAND_MAX; 957} 958 959size_t LiveSession::getBandwidthIndex() { 960 if (mBandwidthItems.size() == 0) { 961 return 0; 962 } 963 964#if 1 965 char value[PROPERTY_VALUE_MAX]; 966 ssize_t index = -1; 967 if (property_get("media.httplive.bw-index", value, NULL)) { 968 char *end; 969 index = strtol(value, &end, 10); 970 CHECK(end > value && *end == '\0'); 971 972 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 973 index = mBandwidthItems.size() - 1; 974 } 975 } 976 977 if (index < 0) { 978 int32_t bandwidthBps; 979 if (mHTTPDataSource != NULL 980 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 981 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 982 } else { 983 ALOGV("no bandwidth estimate."); 984 return 0; // Pick the lowest bandwidth stream by default. 985 } 986 987 char value[PROPERTY_VALUE_MAX]; 988 if (property_get("media.httplive.max-bw", value, NULL)) { 989 char *end; 990 long maxBw = strtoul(value, &end, 10); 991 if (end > value && *end == '\0') { 992 if (maxBw > 0 && bandwidthBps > maxBw) { 993 ALOGV("bandwidth capped to %ld bps", maxBw); 994 bandwidthBps = maxBw; 995 } 996 } 997 } 998 999 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1000 1001 index = mBandwidthItems.size() - 1; 1002 while (index > 0) { 1003 // consider only 80% of the available bandwidth, but if we are switching up, 1004 // be even more conservative (70%) to avoid overestimating and immediately 1005 // switching back. 1006 size_t adjustedBandwidthBps = bandwidthBps; 1007 if (index > mCurBandwidthIndex) { 1008 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1009 } else { 1010 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1011 } 1012 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1013 break; 1014 } 1015 --index; 1016 } 1017 } 1018#elif 0 1019 // Change bandwidth at random() 1020 size_t index = uniformRand() * mBandwidthItems.size(); 1021#elif 0 1022 // There's a 50% chance to stay on the current bandwidth and 1023 // a 50% chance to switch to the next higher bandwidth (wrapping around 1024 // to lowest) 1025 const size_t kMinIndex = 0; 1026 1027 static ssize_t mCurBandwidthIndex = -1; 1028 1029 size_t index; 1030 if (mCurBandwidthIndex < 0) { 1031 index = kMinIndex; 1032 } else if (uniformRand() < 0.5) { 1033 index = (size_t)mCurBandwidthIndex; 1034 } else { 1035 index = mCurBandwidthIndex + 1; 1036 if (index == mBandwidthItems.size()) { 1037 index = kMinIndex; 1038 } 1039 } 1040 mCurBandwidthIndex = index; 1041#elif 0 1042 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1043 1044 size_t index = mBandwidthItems.size() - 1; 1045 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1046 --index; 1047 } 1048#elif 1 1049 char value[PROPERTY_VALUE_MAX]; 1050 size_t index; 1051 if (property_get("media.httplive.bw-index", value, NULL)) { 1052 char *end; 1053 index = strtoul(value, &end, 10); 1054 CHECK(end > value && *end == '\0'); 1055 1056 if (index >= mBandwidthItems.size()) { 1057 index = mBandwidthItems.size() - 1; 1058 } 1059 } else { 1060 index = 0; 1061 } 1062#else 1063 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1064#endif 1065 1066 CHECK_GE(index, 0); 1067 1068 return index; 1069} 1070 1071status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1072 int64_t timeUs; 1073 CHECK(msg->findInt64("timeUs", &timeUs)); 1074 1075 if (!mReconfigurationInProgress) { 1076 changeConfiguration(timeUs, mCurBandwidthIndex); 1077 return OK; 1078 } else { 1079 return -EWOULDBLOCK; 1080 } 1081} 1082 1083status_t LiveSession::getDuration(int64_t *durationUs) const { 1084 int64_t maxDurationUs = 0ll; 1085 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1086 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1087 1088 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1089 maxDurationUs = fetcherDurationUs; 1090 } 1091 } 1092 1093 *durationUs = maxDurationUs; 1094 1095 return OK; 1096} 1097 1098bool LiveSession::isSeekable() const { 1099 int64_t durationUs; 1100 return getDuration(&durationUs) == OK && durationUs >= 0; 1101} 1102 1103bool LiveSession::hasDynamicDuration() const { 1104 return false; 1105} 1106 1107size_t LiveSession::getTrackCount() const { 1108 if (mPlaylist == NULL) { 1109 return 0; 1110 } else { 1111 return mPlaylist->getTrackCount(); 1112 } 1113} 1114 1115sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1116 if (mPlaylist == NULL) { 1117 return NULL; 1118 } else { 1119 return mPlaylist->getTrackInfo(trackIndex); 1120 } 1121} 1122 1123status_t LiveSession::selectTrack(size_t index, bool select) { 1124 status_t err = mPlaylist->selectTrack(index, select); 1125 if (err == OK) { 1126 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1127 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1128 msg->setInt32("pickTrack", select); 1129 msg->post(); 1130 } 1131 return err; 1132} 1133 1134bool LiveSession::canSwitchUp() { 1135 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1136 status_t err = OK; 1137 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1138 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1139 int64_t dur = source->getBufferedDurationUs(&err); 1140 if (err == OK && dur > 10000000) { 1141 return true; 1142 } 1143 } 1144 return false; 1145} 1146 1147void LiveSession::changeConfiguration( 1148 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1149 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1150 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1151 cancelBandwidthSwitch(); 1152 1153 CHECK(!mReconfigurationInProgress); 1154 mReconfigurationInProgress = true; 1155 1156 mCurBandwidthIndex = bandwidthIndex; 1157 1158 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1159 timeUs, bandwidthIndex, pickTrack); 1160 1161 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1162 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1163 1164 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1165 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1166 1167 AString URIs[kMaxStreams]; 1168 for (size_t i = 0; i < kMaxStreams; ++i) { 1169 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1170 streamMask |= indexToType(i); 1171 } 1172 } 1173 1174 // Step 1, stop and discard fetchers that are no longer needed. 1175 // Pause those that we'll reuse. 1176 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1177 const AString &uri = mFetcherInfos.keyAt(i); 1178 1179 bool discardFetcher = true; 1180 1181 // If we're seeking all current fetchers are discarded. 1182 if (timeUs < 0ll) { 1183 // delay fetcher removal if not picking tracks 1184 discardFetcher = pickTrack; 1185 1186 for (size_t j = 0; j < kMaxStreams; ++j) { 1187 StreamType type = indexToType(j); 1188 if ((streamMask & type) && uri == URIs[j]) { 1189 resumeMask |= type; 1190 streamMask &= ~type; 1191 discardFetcher = false; 1192 } 1193 } 1194 } 1195 1196 if (discardFetcher) { 1197 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1198 } else { 1199 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1200 } 1201 } 1202 1203 sp<AMessage> msg; 1204 if (timeUs < 0ll) { 1205 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1206 msg = new AMessage(kWhatChangeConfiguration3, id()); 1207 } else { 1208 msg = new AMessage(kWhatChangeConfiguration2, id()); 1209 } 1210 msg->setInt32("streamMask", streamMask); 1211 msg->setInt32("resumeMask", resumeMask); 1212 msg->setInt32("pickTrack", pickTrack); 1213 msg->setInt64("timeUs", timeUs); 1214 for (size_t i = 0; i < kMaxStreams; ++i) { 1215 if ((streamMask | resumeMask) & indexToType(i)) { 1216 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1217 } 1218 } 1219 1220 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1221 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1222 // fetchers have completed their asynchronous operation, we'll post 1223 // mContinuation, which then is handled below in onChangeConfiguration2. 1224 mContinuationCounter = mFetcherInfos.size(); 1225 mContinuation = msg; 1226 1227 if (mContinuationCounter == 0) { 1228 msg->post(); 1229 1230 if (mSeekReplyID != 0) { 1231 CHECK(mSeekReply != NULL); 1232 mSeekReply->setInt32("err", OK); 1233 mSeekReply->postReply(mSeekReplyID); 1234 mSeekReplyID = 0; 1235 mSeekReply.clear(); 1236 } 1237 } 1238} 1239 1240void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1241 if (!mReconfigurationInProgress) { 1242 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1243 msg->findInt32("pickTrack", &pickTrack); 1244 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1245 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1246 } else { 1247 msg->post(1000000ll); // retry in 1 sec 1248 } 1249} 1250 1251void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1252 mContinuation.clear(); 1253 1254 // All fetchers are either suspended or have been removed now. 1255 1256 uint32_t streamMask, resumeMask; 1257 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1258 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1259 1260 // currently onChangeConfiguration2 is only called for seeking; 1261 // remove the following CHECK if using it else where. 1262 CHECK_EQ(resumeMask, 0); 1263 streamMask |= resumeMask; 1264 1265 AString URIs[kMaxStreams]; 1266 for (size_t i = 0; i < kMaxStreams; ++i) { 1267 if (streamMask & indexToType(i)) { 1268 const AString &uriKey = mStreams[i].uriKey(); 1269 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1270 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1271 } 1272 } 1273 1274 // Determine which decoders to shutdown on the player side, 1275 // a decoder has to be shutdown if either 1276 // 1) its streamtype was active before but now longer isn't. 1277 // or 1278 // 2) its streamtype was already active and still is but the URI 1279 // has changed. 1280 uint32_t changedMask = 0; 1281 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1282 if (((mStreamMask & streamMask & indexToType(i)) 1283 && !(URIs[i] == mStreams[i].mUri)) 1284 || (mStreamMask & ~streamMask & indexToType(i))) { 1285 changedMask |= indexToType(i); 1286 } 1287 } 1288 1289 if (changedMask == 0) { 1290 // If nothing changed as far as the audio/video decoders 1291 // are concerned we can proceed. 1292 onChangeConfiguration3(msg); 1293 return; 1294 } 1295 1296 // Something changed, inform the player which will shutdown the 1297 // corresponding decoders and will post the reply once that's done. 1298 // Handling the reply will continue executing below in 1299 // onChangeConfiguration3. 1300 sp<AMessage> notify = mNotify->dup(); 1301 notify->setInt32("what", kWhatStreamsChanged); 1302 notify->setInt32("changedMask", changedMask); 1303 1304 msg->setWhat(kWhatChangeConfiguration3); 1305 msg->setTarget(id()); 1306 1307 notify->setMessage("reply", msg); 1308 notify->post(); 1309} 1310 1311void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1312 mContinuation.clear(); 1313 // All remaining fetchers are still suspended, the player has shutdown 1314 // any decoders that needed it. 1315 1316 uint32_t streamMask, resumeMask; 1317 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1318 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1319 1320 int64_t timeUs; 1321 int32_t pickTrack; 1322 bool switching = false; 1323 CHECK(msg->findInt64("timeUs", &timeUs)); 1324 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1325 1326 if (timeUs < 0ll) { 1327 if (!pickTrack) { 1328 switching = true; 1329 } 1330 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1331 } else { 1332 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1333 } 1334 1335 for (size_t i = 0; i < kMaxStreams; ++i) { 1336 if (streamMask & indexToType(i)) { 1337 if (switching) { 1338 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1339 } else { 1340 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1341 } 1342 } 1343 } 1344 1345 mNewStreamMask = streamMask | resumeMask; 1346 if (switching) { 1347 mSwapMask = mStreamMask & ~resumeMask; 1348 } 1349 1350 // Of all existing fetchers: 1351 // * Resume fetchers that are still needed and assign them original packet sources. 1352 // * Mark otherwise unneeded fetchers for removal. 1353 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1354 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1355 const AString &uri = mFetcherInfos.keyAt(i); 1356 1357 sp<AnotherPacketSource> sources[kMaxStreams]; 1358 for (size_t j = 0; j < kMaxStreams; ++j) { 1359 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1360 sources[j] = mPacketSources.valueFor(indexToType(j)); 1361 1362 if (j != kSubtitleIndex) { 1363 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1364 sp<AnotherPacketSource> discontinuityQueue; 1365 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1366 discontinuityQueue->queueDiscontinuity( 1367 ATSParser::DISCONTINUITY_NONE, 1368 NULL, 1369 true); 1370 } 1371 } 1372 } 1373 1374 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1375 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1376 || sources[kSubtitleIndex] != NULL) { 1377 info.mFetcher->startAsync( 1378 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1379 } else { 1380 info.mToBeRemoved = true; 1381 } 1382 } 1383 1384 // streamMask now only contains the types that need a new fetcher created. 1385 1386 if (streamMask != 0) { 1387 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1388 } 1389 1390 // Find out when the original fetchers have buffered up to and start the new fetchers 1391 // at a later timestamp. 1392 for (size_t i = 0; i < kMaxStreams; i++) { 1393 if (!(indexToType(i) & streamMask)) { 1394 continue; 1395 } 1396 1397 AString uri; 1398 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1399 1400 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1401 CHECK(fetcher != NULL); 1402 1403 int32_t latestSeq = -1; 1404 int64_t startTimeUs = -1; 1405 int64_t segmentStartTimeUs = -1ll; 1406 int32_t discontinuitySeq = -1; 1407 sp<AnotherPacketSource> sources[kMaxStreams]; 1408 1409 // TRICKY: looping from i as earlier streams are already removed from streamMask 1410 for (size_t j = i; j < kMaxStreams; ++j) { 1411 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1412 if ((streamMask & indexToType(j)) && uri == streamUri) { 1413 sources[j] = mPacketSources.valueFor(indexToType(j)); 1414 1415 if (timeUs >= 0) { 1416 sources[j]->clear(); 1417 startTimeUs = timeUs; 1418 1419 sp<AnotherPacketSource> discontinuityQueue; 1420 sp<AMessage> extra = new AMessage; 1421 extra->setInt64("timeUs", timeUs); 1422 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1423 discontinuityQueue->queueDiscontinuity( 1424 ATSParser::DISCONTINUITY_SEEK, extra, true); 1425 } else { 1426 int32_t type; 1427 int64_t srcSegmentStartTimeUs; 1428 sp<AMessage> meta; 1429 if (pickTrack) { 1430 // selecting 1431 meta = sources[j]->getLatestDequeuedMeta(); 1432 } else { 1433 // adapting 1434 meta = sources[j]->getLatestEnqueuedMeta(); 1435 } 1436 1437 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1438 int64_t tmpUs; 1439 CHECK(meta->findInt64("timeUs", &tmpUs)); 1440 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1441 startTimeUs = tmpUs; 1442 } 1443 1444 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1445 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1446 segmentStartTimeUs = tmpUs; 1447 } 1448 1449 int32_t seq; 1450 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1451 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1452 discontinuitySeq = seq; 1453 } 1454 } 1455 1456 if (pickTrack) { 1457 // selecting track, queue discontinuities before content 1458 sources[j]->clear(); 1459 if (j == kSubtitleIndex) { 1460 break; 1461 } 1462 sp<AnotherPacketSource> discontinuityQueue; 1463 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1464 discontinuityQueue->queueDiscontinuity( 1465 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1466 } else { 1467 // adapting, queue discontinuities after resume 1468 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1469 sources[j]->clear(); 1470 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1471 if (extraStreams & indexToType(j)) { 1472 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1473 } 1474 } 1475 } 1476 1477 streamMask &= ~indexToType(j); 1478 } 1479 } 1480 1481 fetcher->startAsync( 1482 sources[kAudioIndex], 1483 sources[kVideoIndex], 1484 sources[kSubtitleIndex], 1485 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1486 segmentStartTimeUs, 1487 discontinuitySeq, 1488 switching); 1489 } 1490 1491 // All fetchers have now been started, the configuration change 1492 // has completed. 1493 1494 cancelCheckBandwidthEvent(); 1495 scheduleCheckBandwidthEvent(); 1496 1497 ALOGV("XXX configuration change completed."); 1498 mReconfigurationInProgress = false; 1499 if (switching) { 1500 mSwitchInProgress = true; 1501 } else { 1502 mStreamMask = mNewStreamMask; 1503 } 1504 1505 if (mDisconnectReplyID != 0) { 1506 finishDisconnect(); 1507 } 1508} 1509 1510void LiveSession::onSwapped(const sp<AMessage> &msg) { 1511 int32_t switchGeneration; 1512 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1513 if (switchGeneration != mSwitchGeneration) { 1514 return; 1515 } 1516 1517 int32_t stream; 1518 CHECK(msg->findInt32("stream", &stream)); 1519 1520 ssize_t idx = typeToIndex(stream); 1521 CHECK(idx >= 0); 1522 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1523 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1524 } 1525 mStreams[idx].mUri = mStreams[idx].mNewUri; 1526 mStreams[idx].mNewUri.clear(); 1527 1528 mSwapMask &= ~stream; 1529 if (mSwapMask != 0) { 1530 return; 1531 } 1532 1533 // Check if new variant contains extra streams. 1534 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1535 while (extraStreams) { 1536 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1537 swapPacketSource(extraStream); 1538 extraStreams &= ~extraStream; 1539 1540 idx = typeToIndex(extraStream); 1541 CHECK(idx >= 0); 1542 if (mStreams[idx].mNewUri.empty()) { 1543 ALOGW("swapping extra stream type %d %s to empty stream", 1544 extraStream, mStreams[idx].mUri.c_str()); 1545 } 1546 mStreams[idx].mUri = mStreams[idx].mNewUri; 1547 mStreams[idx].mNewUri.clear(); 1548 } 1549 1550 tryToFinishBandwidthSwitch(); 1551} 1552 1553void LiveSession::onCheckSwitchDown() { 1554 if (mSwitchDownMonitor == NULL) { 1555 return; 1556 } 1557 1558 for (size_t i = 0; i < kMaxStreams; ++i) { 1559 int32_t targetDuration; 1560 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1561 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1562 1563 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1564 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1565 int64_t targetDurationUs = targetDuration * 1000000ll; 1566 1567 if (bufferedDurationUs < targetDurationUs / 3) { 1568 (new AMessage(kWhatSwitchDown, id()))->post(); 1569 break; 1570 } 1571 } 1572 } 1573 1574 mSwitchDownMonitor->post(1000000ll); 1575} 1576 1577void LiveSession::onSwitchDown() { 1578 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1579 return; 1580 } 1581 1582 ssize_t bandwidthIndex = getBandwidthIndex(); 1583 if (bandwidthIndex < mCurBandwidthIndex) { 1584 changeConfiguration(-1, bandwidthIndex, false); 1585 return; 1586 } 1587 1588 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1589} 1590 1591// Mark switch done when: 1592// 1. all old buffers are swapped out 1593void LiveSession::tryToFinishBandwidthSwitch() { 1594 if (!mSwitchInProgress) { 1595 return; 1596 } 1597 1598 bool needToRemoveFetchers = false; 1599 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1600 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1601 needToRemoveFetchers = true; 1602 break; 1603 } 1604 } 1605 1606 if (!needToRemoveFetchers && mSwapMask == 0) { 1607 ALOGI("mSwitchInProgress = false"); 1608 mStreamMask = mNewStreamMask; 1609 mSwitchInProgress = false; 1610 } 1611} 1612 1613void LiveSession::scheduleCheckBandwidthEvent() { 1614 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1615 msg->setInt32("generation", mCheckBandwidthGeneration); 1616 msg->post(10000000ll); 1617} 1618 1619void LiveSession::cancelCheckBandwidthEvent() { 1620 ++mCheckBandwidthGeneration; 1621} 1622 1623void LiveSession::cancelBandwidthSwitch() { 1624 Mutex::Autolock lock(mSwapMutex); 1625 mSwitchGeneration++; 1626 mSwitchInProgress = false; 1627 mSwapMask = 0; 1628 1629 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1630 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1631 if (info.mToBeRemoved) { 1632 info.mToBeRemoved = false; 1633 } 1634 } 1635 1636 for (size_t i = 0; i < kMaxStreams; ++i) { 1637 if (!mStreams[i].mNewUri.empty()) { 1638 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1639 if (j < 0) { 1640 mStreams[i].mNewUri.clear(); 1641 continue; 1642 } 1643 1644 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1645 info.mFetcher->stopAsync(); 1646 mFetcherInfos.removeItemsAt(j); 1647 mStreams[i].mNewUri.clear(); 1648 } 1649 } 1650} 1651 1652bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1653 if (mReconfigurationInProgress || mSwitchInProgress) { 1654 return false; 1655 } 1656 1657 if (mCurBandwidthIndex < 0) { 1658 return true; 1659 } 1660 1661 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1662 return false; 1663 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1664 return canSwitchUp(); 1665 } else { 1666 return true; 1667 } 1668} 1669 1670void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1671 size_t bandwidthIndex = getBandwidthIndex(); 1672 if (canSwitchBandwidthTo(bandwidthIndex)) { 1673 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1674 } else { 1675 // Come back and check again 10 seconds later in case there is nothing to do now. 1676 // If we DO change configuration, once that completes it'll schedule a new 1677 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1678 msg->post(10000000ll); 1679 } 1680} 1681 1682void LiveSession::postPrepared(status_t err) { 1683 CHECK(mInPreparationPhase); 1684 1685 sp<AMessage> notify = mNotify->dup(); 1686 if (err == OK || err == ERROR_END_OF_STREAM) { 1687 notify->setInt32("what", kWhatPrepared); 1688 } else { 1689 notify->setInt32("what", kWhatPreparationFailed); 1690 notify->setInt32("err", err); 1691 } 1692 1693 notify->post(); 1694 1695 mInPreparationPhase = false; 1696 1697 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1698 mSwitchDownMonitor->post(); 1699} 1700 1701} // namespace android 1702 1703