LiveSession.cpp revision b44ce2f84691559672cfaf6bb8fd3a9ac43904f2
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 
mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mSubtitleGeneration(0), 67 mLastDequeuedTimeUs(0ll), 68 mRealTimeBaseUs(0ll), 69 mReconfigurationInProgress(false), 70 mSwitchInProgress(false), 71 mDisconnectReplyID(0), 72 mSeekReplyID(0), 73 mFirstTimeUsValid(false), 74 mFirstTimeUs(0), 75 mLastSeekTimeUs(0) { 76 77 mStreams[kAudioIndex] = StreamItem("audio"); 78 mStreams[kVideoIndex] = StreamItem("video"); 79 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 80 81 for (size_t i = 0; i < kMaxStreams; ++i) { 82 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 85 mBuffering[i] = false; 86 } 87} 88 89LiveSession::~LiveSession() { 90} 91 92sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 93 ABuffer *discontinuity = new ABuffer(0); 94 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 95 discontinuity->meta()->setInt32("swapPacketSource", swap); 96 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 97 discontinuity->meta()->setInt64("timeUs", -1); 98 return discontinuity; 99} 100 101void LiveSession::swapPacketSource(StreamType stream) { 102 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 103 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 104 sp<AnotherPacketSource> tmp = aps; 105 aps = aps2; 106 aps2 = tmp; 107 aps2->clear(); 108} 109 110status_t LiveSession::dequeueAccessUnit( 111 StreamType stream, sp<ABuffer> *accessUnit) { 112 if (!(mStreamMask & stream)) { 113 // return -EWOULDBLOCK to avoid halting the decoder 114 // when switching between audio/video and audio only. 
115 return -EWOULDBLOCK; 116 } 117 118 status_t finalResult; 119 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 120 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 121 discontinuityQueue->dequeueAccessUnit(accessUnit); 122 // seeking, track switching 123 sp<AMessage> extra; 124 int64_t timeUs; 125 if ((*accessUnit)->meta()->findMessage("extra", &extra) 126 && extra != NULL 127 && extra->findInt64("timeUs", &timeUs)) { 128 // seeking only 129 mLastSeekTimeUs = timeUs; 130 mDiscontinuityOffsetTimesUs.clear(); 131 mDiscontinuityAbsStartTimesUs.clear(); 132 } 133 return INFO_DISCONTINUITY; 134 } 135 136 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 137 138 ssize_t idx = typeToIndex(stream); 139 if (!packetSource->hasBufferAvailable(&finalResult)) { 140 if (finalResult == OK) { 141 mBuffering[idx] = true; 142 return -EAGAIN; 143 } else { 144 return finalResult; 145 } 146 } 147 148 if (mBuffering[idx]) { 149 if (mSwitchInProgress 150 || packetSource->isFinished(0) 151 || packetSource->getEstimatedDurationUs() > 10000000ll) { 152 mBuffering[idx] = false; 153 } 154 } 155 156 if (mBuffering[idx]) { 157 return -EAGAIN; 158 } 159 160 // wait for counterpart 161 sp<AnotherPacketSource> otherSource; 162 uint32_t mask = mNewStreamMask & mStreamMask; 163 uint32_t fetchersMask = 0; 164 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 165 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 166 fetchersMask |= fetcherMask; 167 } 168 mask &= fetchersMask; 169 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 170 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 171 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 172 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 173 } 174 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 175 return finalResult == OK ? 
-EAGAIN : finalResult; 176 } 177 178 status_t err = packetSource->dequeueAccessUnit(accessUnit); 179 180 size_t streamIdx; 181 const char *streamStr; 182 switch (stream) { 183 case STREAMTYPE_AUDIO: 184 streamIdx = kAudioIndex; 185 streamStr = "audio"; 186 break; 187 case STREAMTYPE_VIDEO: 188 streamIdx = kVideoIndex; 189 streamStr = "video"; 190 break; 191 case STREAMTYPE_SUBTITLES: 192 streamIdx = kSubtitleIndex; 193 streamStr = "subs"; 194 break; 195 default: 196 TRESPASS(); 197 } 198 199 StreamItem& strm = mStreams[streamIdx]; 200 if (err == INFO_DISCONTINUITY) { 201 // adaptive streaming, discontinuities in the playlist 202 int32_t type; 203 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 204 205 sp<AMessage> extra; 206 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 207 extra.clear(); 208 } 209 210 ALOGI("[%s] read discontinuity of type %d, extra = %s", 211 streamStr, 212 type, 213 extra == NULL ? "NULL" : extra->debugString().c_str()); 214 215 int32_t swap; 216 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 217 int32_t switchGeneration; 218 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 219 { 220 Mutex::Autolock lock(mSwapMutex); 221 if (switchGeneration == mSwitchGeneration) { 222 swapPacketSource(stream); 223 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 224 msg->setInt32("stream", stream); 225 msg->setInt32("switchGeneration", switchGeneration); 226 msg->post(); 227 } 228 } 229 } else { 230 size_t seq = strm.mCurDiscontinuitySeq; 231 int64_t offsetTimeUs; 232 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 233 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 234 } else { 235 offsetTimeUs = 0; 236 } 237 238 seq += 1; 239 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 240 int64_t firstTimeUs; 241 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 242 offsetTimeUs += 
strm.mLastDequeuedTimeUs - firstTimeUs; 243 offsetTimeUs += strm.mLastSampleDurationUs; 244 } else { 245 offsetTimeUs += strm.mLastSampleDurationUs; 246 } 247 248 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 249 } 250 } else if (err == OK) { 251 252 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 253 int64_t timeUs; 254 int32_t discontinuitySeq = 0; 255 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 256 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 257 strm.mCurDiscontinuitySeq = discontinuitySeq; 258 259 int32_t discard = 0; 260 int64_t firstTimeUs; 261 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 262 int64_t durUs; // approximate sample duration 263 if (timeUs > strm.mLastDequeuedTimeUs) { 264 durUs = timeUs - strm.mLastDequeuedTimeUs; 265 } else { 266 durUs = strm.mLastDequeuedTimeUs - timeUs; 267 } 268 strm.mLastSampleDurationUs = durUs; 269 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 270 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 271 firstTimeUs = timeUs; 272 } else { 273 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 274 firstTimeUs = timeUs; 275 } 276 277 strm.mLastDequeuedTimeUs = timeUs; 278 if (timeUs >= firstTimeUs) { 279 timeUs -= firstTimeUs; 280 } else { 281 timeUs = 0; 282 } 283 timeUs += mLastSeekTimeUs; 284 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 285 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 286 } 287 288 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 289 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 290 mLastDequeuedTimeUs = timeUs; 291 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 292 } else if (stream == STREAMTYPE_SUBTITLES) { 293 int32_t subtitleGeneration; 294 if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration) 295 && subtitleGeneration != 
mSubtitleGeneration) { 296 return -EAGAIN; 297 }; 298 (*accessUnit)->meta()->setInt32( 299 "trackIndex", mPlaylist->getSelectedIndex()); 300 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 301 } 302 } else { 303 ALOGI("[%s] encountered error %d", streamStr, err); 304 } 305 306 return err; 307} 308 309status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 310 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 311 if (!(mStreamMask & stream)) { 312 return UNKNOWN_ERROR; 313 } 314 315 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 316 317 sp<MetaData> meta = packetSource->getFormat(); 318 319 if (meta == NULL) { 320 return -EAGAIN; 321 } 322 323 return convertMetaDataToMessage(meta, format); 324} 325 326void LiveSession::connectAsync( 327 const char *url, const KeyedVector<String8, String8> *headers) { 328 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 329 msg->setString("url", url); 330 331 if (headers != NULL) { 332 msg->setPointer( 333 "headers", 334 new KeyedVector<String8, String8>(*headers)); 335 } 336 337 msg->post(); 338} 339 340status_t LiveSession::disconnect() { 341 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 342 343 sp<AMessage> response; 344 status_t err = msg->postAndAwaitResponse(&response); 345 346 return err; 347} 348 349status_t LiveSession::seekTo(int64_t timeUs) { 350 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 351 msg->setInt64("timeUs", timeUs); 352 353 sp<AMessage> response; 354 status_t err = msg->postAndAwaitResponse(&response); 355 356 uint32_t replyID; 357 CHECK(response == mSeekReply && 0 != mSeekReplyID); 358 mSeekReply.clear(); 359 mSeekReplyID = 0; 360 return err; 361} 362 363void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 364 switch (msg->what()) { 365 case kWhatConnect: 366 { 367 onConnect(msg); 368 break; 369 } 370 371 case kWhatDisconnect: 372 { 373 
CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 374 375 if (mReconfigurationInProgress) { 376 break; 377 } 378 379 finishDisconnect(); 380 break; 381 } 382 383 case kWhatSeek: 384 { 385 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 386 387 status_t err = onSeek(msg); 388 389 mSeekReply = new AMessage; 390 mSeekReply->setInt32("err", err); 391 break; 392 } 393 394 case kWhatFetcherNotify: 395 { 396 int32_t what; 397 CHECK(msg->findInt32("what", &what)); 398 399 switch (what) { 400 case PlaylistFetcher::kWhatStarted: 401 break; 402 case PlaylistFetcher::kWhatPaused: 403 case PlaylistFetcher::kWhatStopped: 404 { 405 if (what == PlaylistFetcher::kWhatStopped) { 406 AString uri; 407 CHECK(msg->findString("uri", &uri)); 408 if (mFetcherInfos.removeItem(uri) < 0) { 409 // ignore duplicated kWhatStopped messages. 410 break; 411 } 412 413 if (mSwitchInProgress) { 414 tryToFinishBandwidthSwitch(); 415 } 416 } 417 418 if (mContinuation != NULL) { 419 CHECK_GT(mContinuationCounter, 0); 420 if (--mContinuationCounter == 0) { 421 mContinuation->post(); 422 423 if (mSeekReplyID != 0) { 424 CHECK(mSeekReply != NULL); 425 mSeekReply->postReply(mSeekReplyID); 426 } 427 } 428 } 429 break; 430 } 431 432 case PlaylistFetcher::kWhatDurationUpdate: 433 { 434 AString uri; 435 CHECK(msg->findString("uri", &uri)); 436 437 int64_t durationUs; 438 CHECK(msg->findInt64("durationUs", &durationUs)); 439 440 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 441 info->mDurationUs = durationUs; 442 break; 443 } 444 445 case PlaylistFetcher::kWhatError: 446 { 447 status_t err; 448 CHECK(msg->findInt32("err", &err)); 449 450 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 451 452 // handle EOS on subtitle tracks independently 453 AString uri; 454 if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) { 455 ssize_t i = mFetcherInfos.indexOfKey(uri); 456 if (i >= 0) { 457 const sp<PlaylistFetcher> &fetcher = mFetcherInfos.valueAt(i).mFetcher; 458 if (fetcher != NULL) { 
459 uint32_t type = fetcher->getStreamTypeMask(); 460 if (type == STREAMTYPE_SUBTITLES) { 461 mPacketSources.valueFor( 462 STREAMTYPE_SUBTITLES)->signalEOS(err);; 463 break; 464 } 465 } 466 } 467 } 468 469 if (mInPreparationPhase) { 470 postPrepared(err); 471 } 472 473 cancelBandwidthSwitch(); 474 475 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 476 477 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 478 479 mPacketSources.valueFor( 480 STREAMTYPE_SUBTITLES)->signalEOS(err); 481 482 sp<AMessage> notify = mNotify->dup(); 483 notify->setInt32("what", kWhatError); 484 notify->setInt32("err", err); 485 notify->post(); 486 break; 487 } 488 489 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 490 { 491 AString uri; 492 CHECK(msg->findString("uri", &uri)); 493 494 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 495 info->mIsPrepared = true; 496 497 if (mInPreparationPhase) { 498 bool allFetchersPrepared = true; 499 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 500 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 501 allFetchersPrepared = false; 502 break; 503 } 504 } 505 506 if (allFetchersPrepared) { 507 postPrepared(OK); 508 } 509 } 510 break; 511 } 512 513 case PlaylistFetcher::kWhatStartedAt: 514 { 515 int32_t switchGeneration; 516 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 517 518 if (switchGeneration != mSwitchGeneration) { 519 break; 520 } 521 522 // Resume fetcher for the original variant; the resumed fetcher should 523 // continue until the timestamps found in msg, which is stored by the 524 // new fetcher to indicate where the new variant has started buffering. 
525 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 526 const FetcherInfo info = mFetcherInfos.valueAt(i); 527 if (info.mToBeRemoved) { 528 info.mFetcher->resumeUntilAsync(msg); 529 } 530 } 531 break; 532 } 533 534 default: 535 TRESPASS(); 536 } 537 538 break; 539 } 540 541 case kWhatCheckBandwidth: 542 { 543 int32_t generation; 544 CHECK(msg->findInt32("generation", &generation)); 545 546 if (generation != mCheckBandwidthGeneration) { 547 break; 548 } 549 550 onCheckBandwidth(msg); 551 break; 552 } 553 554 case kWhatChangeConfiguration: 555 { 556 onChangeConfiguration(msg); 557 break; 558 } 559 560 case kWhatChangeConfiguration2: 561 { 562 onChangeConfiguration2(msg); 563 break; 564 } 565 566 case kWhatChangeConfiguration3: 567 { 568 onChangeConfiguration3(msg); 569 break; 570 } 571 572 case kWhatFinishDisconnect2: 573 { 574 onFinishDisconnect2(); 575 break; 576 } 577 578 case kWhatSwapped: 579 { 580 onSwapped(msg); 581 break; 582 } 583 584 case kWhatCheckSwitchDown: 585 { 586 onCheckSwitchDown(); 587 break; 588 } 589 590 case kWhatSwitchDown: 591 { 592 onSwitchDown(); 593 break; 594 } 595 596 default: 597 TRESPASS(); 598 break; 599 } 600} 601 602// static 603int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 604 if (a->mBandwidth < b->mBandwidth) { 605 return -1; 606 } else if (a->mBandwidth == b->mBandwidth) { 607 return 0; 608 } 609 610 return 1; 611} 612 613// static 614LiveSession::StreamType LiveSession::indexToType(int idx) { 615 CHECK(idx >= 0 && idx < kMaxStreams); 616 return (StreamType)(1 << idx); 617} 618 619// static 620ssize_t LiveSession::typeToIndex(int32_t type) { 621 switch (type) { 622 case STREAMTYPE_AUDIO: 623 return 0; 624 case STREAMTYPE_VIDEO: 625 return 1; 626 case STREAMTYPE_SUBTITLES: 627 return 2; 628 default: 629 return -1; 630 }; 631 return -1; 632} 633 634void LiveSession::onConnect(const sp<AMessage> &msg) { 635 AString url; 636 CHECK(msg->findString("url", &url)); 637 638 KeyedVector<String8, 
String8> *headers = NULL; 639 if (!msg->findPointer("headers", (void **)&headers)) { 640 mExtraHeaders.clear(); 641 } else { 642 mExtraHeaders = *headers; 643 644 delete headers; 645 headers = NULL; 646 } 647 648 // TODO currently we don't know if we are coming here from incognito mode 649 ALOGI("onConnect %s", uriDebugString(url).c_str()); 650 651 mMasterURL = url; 652 653 bool dummy; 654 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 655 656 if (mPlaylist == NULL) { 657 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 658 659 postPrepared(ERROR_IO); 660 return; 661 } 662 663 // We trust the content provider to make a reasonable choice of preferred 664 // initial bandwidth by listing it first in the variant playlist. 665 // At startup we really don't have a good estimate on the available 666 // network bandwidth since we haven't tranferred any data yet. Once 667 // we have we can make a better informed choice. 668 size_t initialBandwidth = 0; 669 size_t initialBandwidthIndex = 0; 670 671 if (mPlaylist->isVariantPlaylist()) { 672 for (size_t i = 0; i < mPlaylist->size(); ++i) { 673 BandwidthItem item; 674 675 item.mPlaylistIndex = i; 676 677 sp<AMessage> meta; 678 AString uri; 679 mPlaylist->itemAt(i, &uri, &meta); 680 681 unsigned long bandwidth; 682 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 683 684 if (initialBandwidth == 0) { 685 initialBandwidth = item.mBandwidth; 686 } 687 688 mBandwidthItems.push(item); 689 } 690 691 CHECK_GT(mBandwidthItems.size(), 0u); 692 693 mBandwidthItems.sort(SortByBandwidth); 694 695 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 696 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 697 initialBandwidthIndex = i; 698 break; 699 } 700 } 701 } else { 702 // dummy item. 
703 BandwidthItem item; 704 item.mPlaylistIndex = 0; 705 item.mBandwidth = 0; 706 mBandwidthItems.push(item); 707 } 708 709 mPlaylist->pickRandomMediaItems(); 710 changeConfiguration( 711 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 712} 713 714void LiveSession::finishDisconnect() { 715 // No reconfiguration is currently pending, make sure none will trigger 716 // during disconnection either. 717 cancelCheckBandwidthEvent(); 718 719 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 720 // (finishDisconnect, onFinishDisconnect2) 721 cancelBandwidthSwitch(); 722 723 // cancel switch down monitor 724 mSwitchDownMonitor.clear(); 725 726 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 727 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 728 } 729 730 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 731 732 mContinuationCounter = mFetcherInfos.size(); 733 mContinuation = msg; 734 735 if (mContinuationCounter == 0) { 736 msg->post(); 737 } 738} 739 740void LiveSession::onFinishDisconnect2() { 741 mContinuation.clear(); 742 743 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 744 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 745 746 mPacketSources.valueFor( 747 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 748 749 sp<AMessage> response = new AMessage; 750 response->setInt32("err", OK); 751 752 response->postReply(mDisconnectReplyID); 753 mDisconnectReplyID = 0; 754} 755 756sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 757 ssize_t index = mFetcherInfos.indexOfKey(uri); 758 759 if (index >= 0) { 760 return NULL; 761 } 762 763 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 764 notify->setString("uri", uri); 765 notify->setInt32("switchGeneration", mSwitchGeneration); 766 767 FetcherInfo info; 768 info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration); 769 info.mDurationUs = -1ll; 770 
info.mIsPrepared = false; 771 info.mToBeRemoved = false; 772 looper()->registerHandler(info.mFetcher); 773 774 mFetcherInfos.add(uri, info); 775 776 return info.mFetcher; 777} 778 779/* 780 * Illustration of parameters: 781 * 782 * 0 `range_offset` 783 * +------------+-------------------------------------------------------+--+--+ 784 * | | | next block to fetch | | | 785 * | | `source` handle => `out` buffer | | | | 786 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 787 * | |<----------- `range_length` / buffer capacity ----------->| | 788 * |<------------------------------ file_size ------------------------------->| 789 * 790 * Special parameter values: 791 * - range_length == -1 means entire file 792 * - block_size == 0 means entire range 793 * 794 */ 795ssize_t LiveSession::fetchFile( 796 const char *url, sp<ABuffer> *out, 797 int64_t range_offset, int64_t range_length, 798 uint32_t block_size, /* download block size */ 799 sp<DataSource> *source, /* to return and reuse source */ 800 String8 *actualUrl) { 801 off64_t size; 802 sp<DataSource> temp_source; 803 if (source == NULL) { 804 source = &temp_source; 805 } 806 807 if (*source == NULL) { 808 if (!strncasecmp(url, "file://", 7)) { 809 *source = new FileSource(url + 7); 810 } else if (strncasecmp(url, "http://", 7) 811 && strncasecmp(url, "https://", 8)) { 812 return ERROR_UNSUPPORTED; 813 } else { 814 KeyedVector<String8, String8> headers = mExtraHeaders; 815 if (range_offset > 0 || range_length >= 0) { 816 headers.add( 817 String8("Range"), 818 String8( 819 StringPrintf( 820 "bytes=%lld-%s", 821 range_offset, 822 range_length < 0 823 ? 
"" : StringPrintf("%lld", 824 range_offset + range_length - 1).c_str()).c_str())); 825 } 826 status_t err = mHTTPDataSource->connect(url, &headers); 827 828 if (err != OK) { 829 return err; 830 } 831 832 *source = mHTTPDataSource; 833 } 834 } 835 836 status_t getSizeErr = (*source)->getSize(&size); 837 if (getSizeErr != OK) { 838 size = 65536; 839 } 840 841 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 842 if (*out == NULL) { 843 buffer->setRange(0, 0); 844 } 845 846 ssize_t bytesRead = 0; 847 // adjust range_length if only reading partial block 848 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 849 range_length = buffer->size() + block_size; 850 } 851 for (;;) { 852 // Only resize when we don't know the size. 853 size_t bufferRemaining = buffer->capacity() - buffer->size(); 854 if (bufferRemaining == 0 && getSizeErr != OK) { 855 bufferRemaining = 32768; 856 857 ALOGV("increasing download buffer to %zu bytes", 858 buffer->size() + bufferRemaining); 859 860 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 861 memcpy(copy->data(), buffer->data(), buffer->size()); 862 copy->setRange(0, buffer->size()); 863 864 buffer = copy; 865 } 866 867 size_t maxBytesToRead = bufferRemaining; 868 if (range_length >= 0) { 869 int64_t bytesLeftInRange = range_length - buffer->size(); 870 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 871 maxBytesToRead = bytesLeftInRange; 872 873 if (bytesLeftInRange == 0) { 874 break; 875 } 876 } 877 } 878 879 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 880 // to help us break out of the loop. 
881 ssize_t n = (*source)->readAt( 882 buffer->size(), buffer->data() + buffer->size(), 883 maxBytesToRead); 884 885 if (n < 0) { 886 return n; 887 } 888 889 if (n == 0) { 890 break; 891 } 892 893 buffer->setRange(0, buffer->size() + (size_t)n); 894 bytesRead += n; 895 } 896 897 *out = buffer; 898 if (actualUrl != NULL) { 899 *actualUrl = (*source)->getUri(); 900 if (actualUrl->isEmpty()) { 901 *actualUrl = url; 902 } 903 } 904 905 return bytesRead; 906} 907 908sp<M3UParser> LiveSession::fetchPlaylist( 909 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 910 ALOGV("fetchPlaylist '%s'", url); 911 912 *unchanged = false; 913 914 sp<ABuffer> buffer; 915 String8 actualUrl; 916 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 917 918 if (err <= 0) { 919 return NULL; 920 } 921 922 // MD5 functionality is not available on the simulator, treat all 923 // playlists as changed. 924 925#if defined(HAVE_ANDROID_OS) 926 uint8_t hash[16]; 927 928 MD5_CTX m; 929 MD5_Init(&m); 930 MD5_Update(&m, buffer->data(), buffer->size()); 931 932 MD5_Final(hash, &m); 933 934 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 935 // playlist unchanged 936 *unchanged = true; 937 938 return NULL; 939 } 940 941 if (curPlaylistHash != NULL) { 942 memcpy(curPlaylistHash, hash, sizeof(hash)); 943 } 944#endif 945 946 sp<M3UParser> playlist = 947 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 948 949 if (playlist->initCheck() != OK) { 950 ALOGE("failed to parse .m3u8 playlist"); 951 952 return NULL; 953 } 954 955 return playlist; 956} 957 958static double uniformRand() { 959 return (double)rand() / RAND_MAX; 960} 961 962size_t LiveSession::getBandwidthIndex() { 963 if (mBandwidthItems.size() == 0) { 964 return 0; 965 } 966 967#if 1 968 char value[PROPERTY_VALUE_MAX]; 969 ssize_t index = -1; 970 if (property_get("media.httplive.bw-index", value, NULL)) { 971 char *end; 972 index = strtol(value, &end, 10); 973 CHECK(end > value && 
*end == '\0'); 974 975 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 976 index = mBandwidthItems.size() - 1; 977 } 978 } 979 980 if (index < 0) { 981 int32_t bandwidthBps; 982 if (mHTTPDataSource != NULL 983 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 984 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 985 } else { 986 ALOGV("no bandwidth estimate."); 987 return 0; // Pick the lowest bandwidth stream by default. 988 } 989 990 char value[PROPERTY_VALUE_MAX]; 991 if (property_get("media.httplive.max-bw", value, NULL)) { 992 char *end; 993 long maxBw = strtoul(value, &end, 10); 994 if (end > value && *end == '\0') { 995 if (maxBw > 0 && bandwidthBps > maxBw) { 996 ALOGV("bandwidth capped to %ld bps", maxBw); 997 bandwidthBps = maxBw; 998 } 999 } 1000 } 1001 1002 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 1003 1004 index = mBandwidthItems.size() - 1; 1005 while (index > 0) { 1006 // consider only 80% of the available bandwidth, but if we are switching up, 1007 // be even more conservative (70%) to avoid overestimating and immediately 1008 // switching back. 
1009 size_t adjustedBandwidthBps = bandwidthBps; 1010 if (index > mCurBandwidthIndex) { 1011 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 1012 } else { 1013 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 1014 } 1015 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 1016 break; 1017 } 1018 --index; 1019 } 1020 } 1021#elif 0 1022 // Change bandwidth at random() 1023 size_t index = uniformRand() * mBandwidthItems.size(); 1024#elif 0 1025 // There's a 50% chance to stay on the current bandwidth and 1026 // a 50% chance to switch to the next higher bandwidth (wrapping around 1027 // to lowest) 1028 const size_t kMinIndex = 0; 1029 1030 static ssize_t mCurBandwidthIndex = -1; 1031 1032 size_t index; 1033 if (mCurBandwidthIndex < 0) { 1034 index = kMinIndex; 1035 } else if (uniformRand() < 0.5) { 1036 index = (size_t)mCurBandwidthIndex; 1037 } else { 1038 index = mCurBandwidthIndex + 1; 1039 if (index == mBandwidthItems.size()) { 1040 index = kMinIndex; 1041 } 1042 } 1043 mCurBandwidthIndex = index; 1044#elif 0 1045 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1046 1047 size_t index = mBandwidthItems.size() - 1; 1048 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1049 --index; 1050 } 1051#elif 1 1052 char value[PROPERTY_VALUE_MAX]; 1053 size_t index; 1054 if (property_get("media.httplive.bw-index", value, NULL)) { 1055 char *end; 1056 index = strtoul(value, &end, 10); 1057 CHECK(end > value && *end == '\0'); 1058 1059 if (index >= mBandwidthItems.size()) { 1060 index = mBandwidthItems.size() - 1; 1061 } 1062 } else { 1063 index = 0; 1064 } 1065#else 1066 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1067#endif 1068 1069 CHECK_GE(index, 0); 1070 1071 return index; 1072} 1073 1074int64_t LiveSession::latestMediaSegmentStartTimeUs() { 1075 sp<AMessage> audioMeta = mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta(); 1076 int64_t 
minSegmentStartTimeUs = -1, videoSegmentStartTimeUs = -1;
    if (audioMeta != NULL) {
        audioMeta->findInt64("segmentStartTimeUs", &minSegmentStartTimeUs);
    }

    sp<AMessage> videoMeta = mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
    if (videoMeta != NULL
            && videoMeta->findInt64("segmentStartTimeUs", &videoSegmentStartTimeUs)) {
        if (minSegmentStartTimeUs < 0 || videoSegmentStartTimeUs < minSegmentStartTimeUs) {
            minSegmentStartTimeUs = videoSegmentStartTimeUs;
        }

    }
    return minSegmentStartTimeUs;
}

// Handles a seek request carried by |msg|.
//
// Reads the mandatory "timeUs" field and, unless a reconfiguration is already
// running, starts a configuration change at that time using the bandwidth
// index returned by getBandwidthIndex().
//
// Always returns OK.
// NOTE(review): when mReconfigurationInProgress is set the request is not
// acted upon in this function and OK is still returned - confirm that the
// caller is expected to tolerate a dropped seek.
status_t LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (!mReconfigurationInProgress) {
        changeConfiguration(timeUs, getBandwidthIndex());
    }

    return OK;
}

// Stores in |*durationUs| the largest non-negative mDurationUs found among
// the entries of mFetcherInfos, or 0 when there is none.
//
// Always returns OK.
status_t LiveSession::getDuration(int64_t *durationUs) const {
    int64_t maxDurationUs = 0ll;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs;

        // Negative durations are ignored; they never lower the maximum.
        if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) {
            maxDurationUs = fetcherDurationUs;
        }
    }

    *durationUs = maxDurationUs;

    return OK;
}

// Returns true when getDuration() succeeds and reports a non-negative value.
// NOTE(review): getDuration() as written above always returns OK with a
// value >= 0, so this currently evaluates to true in every case.
bool LiveSession::isSeekable() const {
    int64_t durationUs;
    return getDuration(&durationUs) == OK && durationUs >= 0;
}

// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    return false;
}

// Returns the playlist's track count, or 0 while no playlist is set.
size_t LiveSession::getTrackCount() const {
    if (mPlaylist == NULL) {
        return 0;
    } else {
        return mPlaylist->getTrackCount();
    }
}

// Returns the playlist's description of track |trackIndex|, or NULL while no
// playlist is set.
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
    if (mPlaylist == NULL) {
        return NULL;
    } else {
        return mPlaylist->getTrackInfo(trackIndex);
    }
}

// Selects or deselects track |index| on the playlist.
//
// Returns INVALID_OPERATION while no playlist is set, otherwise the status of
// M3UParser::selectTrack(). On success a kWhatChangeConfiguration message is
// posted to this handler carrying the current bandwidth index and
// "pickTrack" = |select|.
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    // Bumped on every attempt, including attempts the playlist rejects.
    ++mSubtitleGeneration;
    status_t err = mPlaylist->selectTrack(index, select);
    if (err == OK) {
        sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id());
        msg->setInt32("bandwidthIndex", mCurBandwidthIndex);
        msg->setInt32("pickTrack", select);
        msg->post();
    }
    return err;
}

// Returns true as soon as one packet source reports, without error, more than
// 10000000 us of buffered data; false when none does.
bool LiveSession::canSwitchUp() {
    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
    status_t err = OK;
    for (size_t i = 0; i < mPacketSources.size(); ++i) {
        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
        int64_t dur = source->getBufferedDurationUs(&err);
        if (err == OK && dur > 10000000) {
            return true;
        }
    }
    return false;
}

// Starts an asynchronous configuration change.
//
//   timeUs          >= 0 requests a seek to that position; a negative value
//                   means "no seek" (bandwidth switch or track selection).
//   bandwidthIndex  index into mBandwidthItems; CHECKed to be in range.
//   pickTrack       true when the change is triggered by a track selection.
//                   (Callers in this file also invoke this with two
//                   arguments, so the header declares a default.)
//
// Existing fetchers are asked to stop or pause, and a continuation message
// (kWhatChangeConfiguration2 when seeking, kWhatChangeConfiguration3
// otherwise) is stored in mContinuation. It is posted immediately when there
// are no fetchers to wait for.
void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    // Note: the member is updated before the range check below.
    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    // Collect the URI of every stream type the selected playlist item provides.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
            streamMask |= indexToType(i);
        }
    }

    // Step 1, stop and discard fetchers that are no longer needed.
    // Pause those that we'll reuse.
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        bool discardFetcher = true;

        // When seeking (timeUs >= 0) all current fetchers are discarded; only
        // in the non-seek case below can a fetcher be kept.
        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

            // A fetcher whose URI is still wanted keeps serving that stream:
            // move the stream from streamMask to resumeMask.
            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                    discardFetcher = false;
                }
            }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
        }
    }

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
        msg = new AMessage(kWhatChangeConfiguration3, id());
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, id());
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt32("pickTrack", pickTrack);
    msg->setInt64("timeUs", timeUs);
    // Attach the URI of every stream that will be fetched, new or resumed.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if ((streamMask | resumeMask) & indexToType(i)) {
            msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
        }
    }

    // Every time a fetcher acknowledges the stopAsync or pauseAsync request
    // we'll decrement mContinuationCounter, once it reaches zero, i.e. all
    // fetchers have completed their asynchronous operation, we'll post
    // mContinuation, which is then handled in onChangeConfiguration2 (seek)
    // or onChangeConfiguration3 (no seek), matching the message built above.
    mContinuationCounter = mFetcherInfos.size();
    mContinuation = msg;

    if (mContinuationCounter == 0) {
        // Nothing to wait for: continue right away.
        msg->post();

        // NOTE(review): mSeekReplyID / mSeekReply are not reset here after the
        // reply is posted - confirm they are cleared elsewhere.
        if (mSeekReplyID != 0) {
            CHECK(mSeekReply != NULL);
            mSeekReply->postReply(mSeekReplyID);
        }
    }
}

// Handles kWhatChangeConfiguration (posted by selectTrack()).
//
// When no reconfiguration is running, starts a non-seek configuration change
// using the optional "bandwidthIndex" (default: mCurBandwidthIndex) and
// "pickTrack" (default: 0) fields of |msg|. Otherwise re-posts |msg| with a
// delay so the request is retried later.
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
    if (!mReconfigurationInProgress) {
        int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
        msg->findInt32("pickTrack", &pickTrack);
        msg->findInt32("bandwidthIndex", &bandwidthIndex);
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
    } else {
        msg->post(1000000ll); // retry in 1 sec
    }
}

// Second stage of a configuration change (seek path only).
//
// Works out which audio/video streams changed. If none did, continues
// directly with onChangeConfiguration3(). Otherwise notifies the player with
// kWhatStreamsChanged and hands it |msg|, retargeted to
// kWhatChangeConfiguration3, as the "reply" to post when it is done.
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    mContinuation.clear();

    // All fetchers are either suspended or have been removed now.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // currently onChangeConfiguration2 is only called for seeking;
    // remove the following CHECK if using it elsewhere.
    CHECK_EQ(resumeMask, 0);
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    // Determine which decoders to shutdown on the player side,
    // a decoder has to be shutdown if either
    // 1) its streamtype was active before but no longer is.
    // or
    // 2) its streamtype was already active and still is but the URI
    //    has changed.
    // The loop stops at kSubtitleIndex, so the subtitle stream (and any index
    // after it) never contributes to changedMask.
    uint32_t changedMask = 0;
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                    && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(id());

    notify->setMessage("reply", msg);
    notify->post();
}

// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and "pickTrack" from |msg|,
// resumes the fetchers that keep serving a stream, marks the others for
// removal, creates and starts a fetcher for every stream left in streamMask,
// and finally clears mReconfigurationInProgress.
//
// Three cases are distinguished below:
//   timeUs >= 0               seek
//   timeUs < 0 && pickTrack   track selection
//   timeUs < 0 && !pickTrack  bandwidth adaptation ("switching")
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    int64_t timeUs;
    int32_t pickTrack;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));
    CHECK(msg->findInt32("pickTrack", &pickTrack));

    if (timeUs < 0ll) {
        if (!pickTrack) {
            switching = true;
        }
        mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
    } else {
        mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
    }

    // While switching, the new URIs are parked in mNewUri (they are moved to
    // mUri in onSwapped()); otherwise they replace mUri immediately.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            if (switching) {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
            } else {
                CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
            }
        }
    }

    mNewStreamMask = streamMask | resumeMask;
    if (switching) {
        // Currently active streams that are not simply resumed.
        mSwapMask = mStreamMask & ~resumeMask;
    }

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (j != kSubtitleIndex) {
                    ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
                    sp<AnotherPacketSource> discontinuityQueue;
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_NONE,
                            NULL,
                            true);
                }
            }
        }

        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            // Serves no resumed stream; the flag is inspected by
            // tryToFinishBandwidthSwitch() and reset by cancelBandwidthSwitch().
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // NOTE(review): latestSeq is never read in this function.
        int32_t latestSeq = -1;
        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        sp<AnotherPacketSource> sources[kMaxStreams];

        if (i == kSubtitleIndex) {
            segmentStartTimeUs = latestMediaSegmentStartTimeUs();
        }

        // One fetcher serves every remaining stream that shares this URI.
        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
            if ((streamMask & indexToType(j)) && uri == streamUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (timeUs >= 0) {
                    // Seeking: drop buffered data and announce the seek.
                    sources[j]->clear();
                    startTimeUs = timeUs;

                    sp<AnotherPacketSource> discontinuityQueue;
                    sp<AMessage> extra = new AMessage;
                    extra->setInt64("timeUs", timeUs);
                    discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                    discontinuityQueue->queueDiscontinuity(
                            ATSParser::DISCONTINUITY_SEEK, extra, true);
                } else {
                    int32_t type;
                    // NOTE(review): srcSegmentStartTimeUs is never read.
                    int64_t srcSegmentStartTimeUs;
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting
                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
                        meta = sources[j]->getLatestEnqueuedMeta();
                    }

                    // Keep the smallest timeUs / segmentStartTimeUs /
                    // discontinuitySeq seen across the streams sharing this
                    // fetcher. Metadata that carries a "discontinuity" field
                    // is skipped.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        int64_t tmpUs;
                        CHECK(meta->findInt64("timeUs", &tmpUs));
                        if (startTimeUs < 0 || tmpUs < startTimeUs) {
                            startTimeUs = tmpUs;
                        }

                        CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs));
                        if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) {
                            segmentStartTimeUs = tmpUs;
                        }

                        int32_t seq;
                        CHECK(meta->findInt32("discontinuitySeq", &seq));
                        if (discontinuitySeq < 0 || seq < discontinuitySeq) {
                            discontinuitySeq = seq;
                        }
                    }

                    if (pickTrack) {
                        // selecting track, queue discontinuities before content
                        sources[j]->clear();
                        if (j == kSubtitleIndex) {
                            // NOTE(review): this break also skips the
                            // streamMask update at the bottom of the loop, so
                            // the subtitle bit stays set in streamMask.
                            break;
                        }
                        sp<AnotherPacketSource> discontinuityQueue;
                        discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
                        discontinuityQueue->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        // The new fetcher fills the secondary packet source.
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                        }
                    }
                }

                streamMask &= ~indexToType(j);
            }
        }

        // Falls back to mLastSeekTimeUs when no start time could be derived.
        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    // Restart the periodic bandwidth check with a fresh generation.
    cancelCheckBandwidthEvent();
    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is updated later, in tryToFinishBandwidthSwitch().
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    // A disconnect was requested while reconfiguring; complete it now.
    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

// Handles the notification that the packet source of one stream type has
// been swapped during a bandwidth switch.
//
// Messages whose "switchGeneration" differs from mSwitchGeneration are
// ignored. Otherwise the stream's pending URI (mNewUri) becomes its current
// URI and the stream is removed from mSwapMask. Once mSwapMask is empty, the
// streams that exist only in the new variant are swapped as well and
// tryToFinishBandwidthSwitch() is called.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));

    ssize_t idx = typeToIndex(stream);
    CHECK(idx >= 0);
    if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
    }
    mStreams[idx].mUri = mStreams[idx].mNewUri;
    mStreams[idx].mNewUri.clear();

    mSwapMask &= ~stream;
    if (mSwapMask != 0) {
        // Still waiting for other streams to be swapped.
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit of x.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;

        idx = typeToIndex(extraStream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    extraStream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    tryToFinishBandwidthSwitch();
}

// Periodic check (driven by mSwitchDownMonitor) that requests a downward
// bandwidth switch when a stream is running low on buffered data.
//
// Does nothing while mSwitchDownMonitor is NULL. Otherwise posts
// kWhatSwitchDown for the first stream whose estimated buffered duration is
// below one third of the "targetDuration" found in its latest dequeued
// metadata, then re-arms the monitor.
void LiveSession::onCheckSwitchDown() {
    if (mSwitchDownMonitor == NULL) {
        return;
    }

    for (size_t i = 0; i < kMaxStreams; ++i) {
        int32_t targetDuration;
        sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
        sp<AMessage> meta = packetSource->getLatestDequeuedMeta();

        if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
            int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
            // presumably targetDuration is expressed in seconds - TODO confirm
            int64_t targetDurationUs = targetDuration * 1000000ll;

            if (bufferedDurationUs < targetDurationUs / 3) {
                (new AMessage(kWhatSwitchDown, id()))->post();
                break;
            }
        }
    }

    // Re-arm using the same delay value that onChangeConfiguration()
    // describes as 1 sec.
    mSwitchDownMonitor->post(1000000ll);
}

// Handles kWhatSwitchDown: starts a non-seek, non-pickTrack configuration
// change towards a lower bandwidth index.
//
// Does nothing while a reconfiguration or switch is in progress, or when
// mCurBandwidthIndex is already 0. The target is getBandwidthIndex() when
// that is lower than the current index, otherwise the current index minus 1.
void LiveSession::onSwitchDown() {
    if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
        return;
    }

    ssize_t bandwidthIndex = getBandwidthIndex();
    if (bandwidthIndex < mCurBandwidthIndex) {
        changeConfiguration(-1, bandwidthIndex, false);
        return;
    }

    changeConfiguration(-1, mCurBandwidthIndex - 1, false);
}

// Mark switch done when:
//   1.
all old buffers are swapped out 1619void LiveSession::tryToFinishBandwidthSwitch() { 1620 if (!mSwitchInProgress) { 1621 return; 1622 } 1623 1624 bool needToRemoveFetchers = false; 1625 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1626 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1627 needToRemoveFetchers = true; 1628 break; 1629 } 1630 } 1631 1632 if (!needToRemoveFetchers && mSwapMask == 0) { 1633 ALOGI("mSwitchInProgress = false"); 1634 mStreamMask = mNewStreamMask; 1635 mSwitchInProgress = false; 1636 } 1637} 1638 1639void LiveSession::scheduleCheckBandwidthEvent() { 1640 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1641 msg->setInt32("generation", mCheckBandwidthGeneration); 1642 msg->post(10000000ll); 1643} 1644 1645void LiveSession::cancelCheckBandwidthEvent() { 1646 ++mCheckBandwidthGeneration; 1647} 1648 1649void LiveSession::cancelBandwidthSwitch() { 1650 Mutex::Autolock lock(mSwapMutex); 1651 mSwitchGeneration++; 1652 mSwitchInProgress = false; 1653 mSwapMask = 0; 1654 1655 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1656 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1657 if (info.mToBeRemoved) { 1658 info.mToBeRemoved = false; 1659 } 1660 } 1661 1662 for (size_t i = 0; i < kMaxStreams; ++i) { 1663 if (!mStreams[i].mNewUri.empty()) { 1664 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1665 if (j < 0) { 1666 mStreams[i].mNewUri.clear(); 1667 continue; 1668 } 1669 1670 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1671 info.mFetcher->stopAsync(); 1672 mFetcherInfos.removeItemsAt(j); 1673 mStreams[i].mNewUri.clear(); 1674 } 1675 } 1676} 1677 1678bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1679 if (mReconfigurationInProgress || mSwitchInProgress) { 1680 return false; 1681 } 1682 1683 if (mCurBandwidthIndex < 0) { 1684 return true; 1685 } 1686 1687 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1688 return false; 1689 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1690 
return canSwitchUp(); 1691 } else { 1692 return true; 1693 } 1694} 1695 1696void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1697 size_t bandwidthIndex = getBandwidthIndex(); 1698 if (canSwitchBandwidthTo(bandwidthIndex)) { 1699 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1700 } else { 1701 // Come back and check again 10 seconds later in case there is nothing to do now. 1702 // If we DO change configuration, once that completes it'll schedule a new 1703 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1704 msg->post(10000000ll); 1705 } 1706} 1707 1708void LiveSession::postPrepared(status_t err) { 1709 CHECK(mInPreparationPhase); 1710 1711 sp<AMessage> notify = mNotify->dup(); 1712 if (err == OK || err == ERROR_END_OF_STREAM) { 1713 notify->setInt32("what", kWhatPrepared); 1714 } else { 1715 notify->setInt32("what", kWhatPreparationFailed); 1716 notify->setInt32("err", err); 1717 } 1718 1719 notify->post(); 1720 1721 mInPreparationPhase = false; 1722 1723 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1724 mSwitchDownMonitor->post(); 1725} 1726 1727} // namespace android 1728 1729