LiveSession.cpp revision 73d2847af14cdd5fdf8bd1ac80fb7ddf9ae7d9a7
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0), 72 mFirstTimeUsValid(false), 73 mFirstTimeUs(0), 74 mLastSeekTimeUs(0) { 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mBuffering[i] = false; 85 } 86} 87 88LiveSession::~LiveSession() { 89} 90 91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 92 ABuffer *discontinuity = new ABuffer(0); 93 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 94 discontinuity->meta()->setInt32("swapPacketSource", swap); 95 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 96 discontinuity->meta()->setInt64("timeUs", -1); 97 return discontinuity; 98} 99 100void LiveSession::swapPacketSource(StreamType stream) { 101 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 102 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 103 sp<AnotherPacketSource> tmp = aps; 104 aps = aps2; 105 aps2 = tmp; 106 aps2->clear(); 107} 108 109status_t LiveSession::dequeueAccessUnit( 110 StreamType stream, sp<ABuffer> *accessUnit) { 111 if (!(mStreamMask & stream)) { 112 // return -EWOULDBLOCK to avoid halting the decoder 113 // when switching between audio/video and audio only. 114 return -EWOULDBLOCK; 115 } 116 117 status_t finalResult; 118 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 119 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 120 discontinuityQueue->dequeueAccessUnit(accessUnit); 121 // seeking, track switching 122 sp<AMessage> extra; 123 int64_t timeUs; 124 if ((*accessUnit)->meta()->findMessage("extra", &extra) 125 && extra != NULL 126 && extra->findInt64("timeUs", &timeUs)) { 127 // seeking only 128 mLastSeekTimeUs = timeUs; 129 mDiscontinuityOffsetTimesUs.clear(); 130 mDiscontinuityAbsStartTimesUs.clear(); 131 } 132 return INFO_DISCONTINUITY; 133 } 134 135 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 136 137 ssize_t idx = typeToIndex(stream); 138 if (!packetSource->hasBufferAvailable(&finalResult)) { 139 if (finalResult == OK) { 140 mBuffering[idx] = true; 141 return -EAGAIN; 142 } else { 143 return finalResult; 144 } 145 } 146 147 if (mBuffering[idx]) { 148 if (mSwitchInProgress 149 || packetSource->isFinished(0) 150 || packetSource->getEstimatedDurationUs() > 10000000ll) { 151 mBuffering[idx] = false; 152 } 153 } 154 155 if (mBuffering[idx]) { 156 return -EAGAIN; 157 } 158 159 // wait for counterpart 160 sp<AnotherPacketSource> otherSource; 161 uint32_t mask = mNewStreamMask & mStreamMask; 162 uint32_t fetchersMask = 0; 163 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 164 uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask(); 165 fetchersMask |= fetcherMask; 166 } 167 mask &= fetchersMask; 168 if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) { 169 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 170 } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) { 171 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 172 } 173 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 174 return finalResult == OK ? -EAGAIN : finalResult; 175 } 176 177 status_t err = packetSource->dequeueAccessUnit(accessUnit); 178 179 size_t streamIdx; 180 const char *streamStr; 181 switch (stream) { 182 case STREAMTYPE_AUDIO: 183 streamIdx = kAudioIndex; 184 streamStr = "audio"; 185 break; 186 case STREAMTYPE_VIDEO: 187 streamIdx = kVideoIndex; 188 streamStr = "video"; 189 break; 190 case STREAMTYPE_SUBTITLES: 191 streamIdx = kSubtitleIndex; 192 streamStr = "subs"; 193 break; 194 default: 195 TRESPASS(); 196 } 197 198 StreamItem& strm = mStreams[streamIdx]; 199 if (err == INFO_DISCONTINUITY) { 200 // adaptive streaming, discontinuities in the playlist 201 int32_t type; 202 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 203 204 sp<AMessage> extra; 205 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 206 extra.clear(); 207 } 208 209 ALOGI("[%s] read discontinuity of type %d, extra = %s", 210 streamStr, 211 type, 212 extra == NULL ? "NULL" : extra->debugString().c_str()); 213 214 int32_t swap; 215 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 216 int32_t switchGeneration; 217 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 218 { 219 Mutex::Autolock lock(mSwapMutex); 220 if (switchGeneration == mSwitchGeneration) { 221 swapPacketSource(stream); 222 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 223 msg->setInt32("stream", stream); 224 msg->setInt32("switchGeneration", switchGeneration); 225 msg->post(); 226 } 227 } 228 } else { 229 size_t seq = strm.mCurDiscontinuitySeq; 230 int64_t offsetTimeUs; 231 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 232 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 233 } else { 234 offsetTimeUs = 0; 235 } 236 237 seq += 1; 238 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 239 int64_t firstTimeUs; 240 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 241 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 242 offsetTimeUs += strm.mLastSampleDurationUs; 243 } else { 244 offsetTimeUs += strm.mLastSampleDurationUs; 245 } 246 247 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 248 } 249 } else if (err == OK) { 250 251 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 252 int64_t timeUs; 253 int32_t discontinuitySeq = 0; 254 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 255 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 256 strm.mCurDiscontinuitySeq = discontinuitySeq; 257 258 int32_t discard = 0; 259 int64_t firstTimeUs; 260 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 261 int64_t durUs; // approximate sample duration 262 if (timeUs > strm.mLastDequeuedTimeUs) { 263 durUs = timeUs - strm.mLastDequeuedTimeUs; 264 } else { 265 durUs = strm.mLastDequeuedTimeUs - timeUs; 266 } 267 strm.mLastSampleDurationUs = durUs; 268 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 269 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 270 firstTimeUs = timeUs; 271 } else { 272 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 273 firstTimeUs = timeUs; 274 } 275 276 strm.mLastDequeuedTimeUs = timeUs; 277 if (timeUs >= firstTimeUs) { 278 timeUs -= firstTimeUs; 279 } else { 280 timeUs = 0; 281 } 282 timeUs += mLastSeekTimeUs; 283 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 284 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 285 } 286 287 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 288 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 289 mLastDequeuedTimeUs = timeUs; 290 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 291 } else if (stream == STREAMTYPE_SUBTITLES) { 292 (*accessUnit)->meta()->setInt32( 293 "trackIndex", mPlaylist->getSelectedIndex()); 294 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 295 } 296 } else { 297 ALOGI("[%s] encountered error %d", streamStr, err); 298 } 299 300 return err; 301} 302 303status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 304 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 305 if (!(mStreamMask & stream)) { 306 return UNKNOWN_ERROR; 307 } 308 309 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 310 311 sp<MetaData> meta = packetSource->getFormat(); 312 313 if (meta == NULL) { 314 return -EAGAIN; 315 } 316 317 return convertMetaDataToMessage(meta, format); 318} 319 320void LiveSession::connectAsync( 321 const char *url, const KeyedVector<String8, String8> *headers) { 322 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 323 msg->setString("url", url); 324 325 if (headers != NULL) { 326 msg->setPointer( 327 "headers", 328 new KeyedVector<String8, String8>(*headers)); 329 } 330 331 msg->post(); 332} 333 334status_t LiveSession::disconnect() { 335 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 336 337 sp<AMessage> response; 338 status_t err = msg->postAndAwaitResponse(&response); 339 340 return err; 341} 342 343status_t LiveSession::seekTo(int64_t timeUs) { 344 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 345 msg->setInt64("timeUs", timeUs); 346 347 sp<AMessage> response; 348 status_t err = msg->postAndAwaitResponse(&response); 349 350 uint32_t replyID; 351 CHECK(response == mSeekReply && 0 != mSeekReplyID); 352 mSeekReply.clear(); 353 mSeekReplyID = 0; 354 return err; 355} 356 357void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 358 switch (msg->what()) { 359 case kWhatConnect: 360 { 361 onConnect(msg); 362 break; 363 } 364 365 case kWhatDisconnect: 366 { 367 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 368 369 if (mReconfigurationInProgress) { 370 break; 371 } 372 373 finishDisconnect(); 374 break; 375 } 376 377 case kWhatSeek: 378 { 379 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 380 381 status_t err = onSeek(msg); 382 383 mSeekReply = new AMessage; 384 mSeekReply->setInt32("err", err); 385 break; 386 } 387 388 case kWhatFetcherNotify: 389 { 390 int32_t what; 391 CHECK(msg->findInt32("what", &what)); 392 393 switch (what) { 394 case PlaylistFetcher::kWhatStarted: 395 break; 396 case PlaylistFetcher::kWhatPaused: 397 case PlaylistFetcher::kWhatStopped: 398 { 399 if (what == PlaylistFetcher::kWhatStopped) { 400 AString uri; 401 CHECK(msg->findString("uri", &uri)); 402 if (mFetcherInfos.removeItem(uri) < 0) { 403 // ignore duplicated kWhatStopped messages. 404 break; 405 } 406 407 if (mSwitchInProgress) { 408 tryToFinishBandwidthSwitch(); 409 } 410 } 411 412 if (mContinuation != NULL) { 413 CHECK_GT(mContinuationCounter, 0); 414 if (--mContinuationCounter == 0) { 415 mContinuation->post(); 416 417 if (mSeekReplyID != 0) { 418 CHECK(mSeekReply != NULL); 419 mSeekReply->postReply(mSeekReplyID); 420 } 421 } 422 } 423 break; 424 } 425 426 case PlaylistFetcher::kWhatDurationUpdate: 427 { 428 AString uri; 429 CHECK(msg->findString("uri", &uri)); 430 431 int64_t durationUs; 432 CHECK(msg->findInt64("durationUs", &durationUs)); 433 434 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 435 info->mDurationUs = durationUs; 436 break; 437 } 438 439 case PlaylistFetcher::kWhatError: 440 { 441 status_t err; 442 CHECK(msg->findInt32("err", &err)); 443 444 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 445 446 if (mInPreparationPhase) { 447 postPrepared(err); 448 } 449 450 cancelBandwidthSwitch(); 451 452 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 453 454 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 455 456 mPacketSources.valueFor( 457 STREAMTYPE_SUBTITLES)->signalEOS(err); 458 459 sp<AMessage> notify = mNotify->dup(); 460 notify->setInt32("what", kWhatError); 461 notify->setInt32("err", err); 462 notify->post(); 463 break; 464 } 465 466 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 467 { 468 AString uri; 469 CHECK(msg->findString("uri", &uri)); 470 471 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 472 info->mIsPrepared = true; 473 474 if (mInPreparationPhase) { 475 bool allFetchersPrepared = true; 476 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 477 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 478 allFetchersPrepared = false; 479 break; 480 } 481 } 482 483 if (allFetchersPrepared) { 484 postPrepared(OK); 485 } 486 } 487 break; 488 } 489 490 case PlaylistFetcher::kWhatStartedAt: 491 { 492 int32_t switchGeneration; 493 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 494 495 if (switchGeneration != mSwitchGeneration) { 496 break; 497 } 498 499 // Resume fetcher for the original variant; the resumed fetcher should 500 // continue until the timestamps found in msg, which is stored by the 501 // new fetcher to indicate where the new variant has started buffering. 502 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 503 const FetcherInfo info = mFetcherInfos.valueAt(i); 504 if (info.mToBeRemoved) { 505 info.mFetcher->resumeUntilAsync(msg); 506 } 507 } 508 break; 509 } 510 511 default: 512 TRESPASS(); 513 } 514 515 break; 516 } 517 518 case kWhatCheckBandwidth: 519 { 520 int32_t generation; 521 CHECK(msg->findInt32("generation", &generation)); 522 523 if (generation != mCheckBandwidthGeneration) { 524 break; 525 } 526 527 onCheckBandwidth(msg); 528 break; 529 } 530 531 case kWhatChangeConfiguration: 532 { 533 onChangeConfiguration(msg); 534 break; 535 } 536 537 case kWhatChangeConfiguration2: 538 { 539 onChangeConfiguration2(msg); 540 break; 541 } 542 543 case kWhatChangeConfiguration3: 544 { 545 onChangeConfiguration3(msg); 546 break; 547 } 548 549 case kWhatFinishDisconnect2: 550 { 551 onFinishDisconnect2(); 552 break; 553 } 554 555 case kWhatSwapped: 556 { 557 onSwapped(msg); 558 break; 559 } 560 561 case kWhatCheckSwitchDown: 562 { 563 onCheckSwitchDown(); 564 break; 565 } 566 567 case kWhatSwitchDown: 568 { 569 onSwitchDown(); 570 break; 571 } 572 573 default: 574 TRESPASS(); 575 break; 576 } 577} 578 579// static 580int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 581 if (a->mBandwidth < b->mBandwidth) { 582 return -1; 583 } else if (a->mBandwidth == b->mBandwidth) { 584 return 0; 585 } 586 587 return 1; 588} 589 590// static 591LiveSession::StreamType LiveSession::indexToType(int idx) { 592 CHECK(idx >= 0 && idx < kMaxStreams); 593 return (StreamType)(1 << idx); 594} 595 596// static 597ssize_t LiveSession::typeToIndex(int32_t type) { 598 switch (type) { 599 case STREAMTYPE_AUDIO: 600 return 0; 601 case STREAMTYPE_VIDEO: 602 return 1; 603 case STREAMTYPE_SUBTITLES: 604 return 2; 605 default: 606 return -1; 607 }; 608 return -1; 609} 610 611void LiveSession::onConnect(const sp<AMessage> &msg) { 612 AString url; 613 CHECK(msg->findString("url", &url)); 614 615 KeyedVector<String8, String8> *headers = NULL; 616 if (!msg->findPointer("headers", (void **)&headers)) { 617 mExtraHeaders.clear(); 618 } else { 619 mExtraHeaders = *headers; 620 621 delete headers; 622 headers = NULL; 623 } 624 625 // TODO currently we don't know if we are coming here from incognito mode 626 ALOGI("onConnect %s", uriDebugString(url).c_str()); 627 628 mMasterURL = url; 629 630 bool dummy; 631 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 632 633 if (mPlaylist == NULL) { 634 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 635 636 postPrepared(ERROR_IO); 637 return; 638 } 639 640 // We trust the content provider to make a reasonable choice of preferred 641 // initial bandwidth by listing it first in the variant playlist. 642 // At startup we really don't have a good estimate on the available 643 // network bandwidth since we haven't tranferred any data yet. Once 644 // we have we can make a better informed choice. 645 size_t initialBandwidth = 0; 646 size_t initialBandwidthIndex = 0; 647 648 if (mPlaylist->isVariantPlaylist()) { 649 for (size_t i = 0; i < mPlaylist->size(); ++i) { 650 BandwidthItem item; 651 652 item.mPlaylistIndex = i; 653 654 sp<AMessage> meta; 655 AString uri; 656 mPlaylist->itemAt(i, &uri, &meta); 657 658 unsigned long bandwidth; 659 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 660 661 if (initialBandwidth == 0) { 662 initialBandwidth = item.mBandwidth; 663 } 664 665 mBandwidthItems.push(item); 666 } 667 668 CHECK_GT(mBandwidthItems.size(), 0u); 669 670 mBandwidthItems.sort(SortByBandwidth); 671 672 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 673 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 674 initialBandwidthIndex = i; 675 break; 676 } 677 } 678 } else { 679 // dummy item. 680 BandwidthItem item; 681 item.mPlaylistIndex = 0; 682 item.mBandwidth = 0; 683 mBandwidthItems.push(item); 684 } 685 686 mPlaylist->pickRandomMediaItems(); 687 changeConfiguration( 688 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 689} 690 691void LiveSession::finishDisconnect() { 692 // No reconfiguration is currently pending, make sure none will trigger 693 // during disconnection either. 694 cancelCheckBandwidthEvent(); 695 696 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 697 // (finishDisconnect, onFinishDisconnect2) 698 cancelBandwidthSwitch(); 699 700 // cancel switch down monitor 701 mSwitchDownMonitor.clear(); 702 703 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 704 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 705 } 706 707 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 708 709 mContinuationCounter = mFetcherInfos.size(); 710 mContinuation = msg; 711 712 if (mContinuationCounter == 0) { 713 msg->post(); 714 } 715} 716 717void LiveSession::onFinishDisconnect2() { 718 mContinuation.clear(); 719 720 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 721 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 722 723 mPacketSources.valueFor( 724 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 725 726 sp<AMessage> response = new AMessage; 727 response->setInt32("err", OK); 728 729 response->postReply(mDisconnectReplyID); 730 mDisconnectReplyID = 0; 731} 732 733sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 734 ssize_t index = mFetcherInfos.indexOfKey(uri); 735 736 if (index >= 0) { 737 return NULL; 738 } 739 740 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 741 notify->setString("uri", uri); 742 notify->setInt32("switchGeneration", mSwitchGeneration); 743 744 FetcherInfo info; 745 info.mFetcher = new PlaylistFetcher(notify, this, uri); 746 info.mDurationUs = -1ll; 747 info.mIsPrepared = false; 748 info.mToBeRemoved = false; 749 looper()->registerHandler(info.mFetcher); 750 751 mFetcherInfos.add(uri, info); 752 753 return info.mFetcher; 754} 755 756/* 757 * Illustration of parameters: 758 * 759 * 0 `range_offset` 760 * +------------+-------------------------------------------------------+--+--+ 761 * | | | next block to fetch | | | 762 * | | `source` handle => `out` buffer | | | | 763 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 764 * | |<----------- `range_length` / buffer capacity ----------->| | 765 * |<------------------------------ file_size ------------------------------->| 766 * 767 * Special parameter values: 768 * - range_length == -1 means entire file 769 * - block_size == 0 means entire range 770 * 771 */ 772ssize_t LiveSession::fetchFile( 773 const char *url, sp<ABuffer> *out, 774 int64_t range_offset, int64_t range_length, 775 uint32_t block_size, /* download block size */ 776 sp<DataSource> *source, /* to return and reuse source */ 777 String8 *actualUrl) { 778 off64_t size; 779 sp<DataSource> temp_source; 780 if (source == NULL) { 781 source = &temp_source; 782 } 783 784 if (*source == NULL) { 785 if (!strncasecmp(url, "file://", 7)) { 786 *source = new FileSource(url + 7); 787 } else if (strncasecmp(url, "http://", 7) 788 && strncasecmp(url, "https://", 8)) { 789 return ERROR_UNSUPPORTED; 790 } else { 791 KeyedVector<String8, String8> headers = mExtraHeaders; 792 if (range_offset > 0 || range_length >= 0) { 793 headers.add( 794 String8("Range"), 795 String8( 796 StringPrintf( 797 "bytes=%lld-%s", 798 range_offset, 799 range_length < 0 800 ? "" : StringPrintf("%lld", 801 range_offset + range_length - 1).c_str()).c_str())); 802 } 803 status_t err = mHTTPDataSource->connect(url, &headers); 804 805 if (err != OK) { 806 return err; 807 } 808 809 *source = mHTTPDataSource; 810 } 811 } 812 813 status_t getSizeErr = (*source)->getSize(&size); 814 if (getSizeErr != OK) { 815 size = 65536; 816 } 817 818 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 819 if (*out == NULL) { 820 buffer->setRange(0, 0); 821 } 822 823 ssize_t bytesRead = 0; 824 // adjust range_length if only reading partial block 825 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 826 range_length = buffer->size() + block_size; 827 } 828 for (;;) { 829 // Only resize when we don't know the size. 830 size_t bufferRemaining = buffer->capacity() - buffer->size(); 831 if (bufferRemaining == 0 && getSizeErr != OK) { 832 bufferRemaining = 32768; 833 834 ALOGV("increasing download buffer to %zu bytes", 835 buffer->size() + bufferRemaining); 836 837 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 838 memcpy(copy->data(), buffer->data(), buffer->size()); 839 copy->setRange(0, buffer->size()); 840 841 buffer = copy; 842 } 843 844 size_t maxBytesToRead = bufferRemaining; 845 if (range_length >= 0) { 846 int64_t bytesLeftInRange = range_length - buffer->size(); 847 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 848 maxBytesToRead = bytesLeftInRange; 849 850 if (bytesLeftInRange == 0) { 851 break; 852 } 853 } 854 } 855 856 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 857 // to help us break out of the loop. 858 ssize_t n = (*source)->readAt( 859 buffer->size(), buffer->data() + buffer->size(), 860 maxBytesToRead); 861 862 if (n < 0) { 863 return n; 864 } 865 866 if (n == 0) { 867 break; 868 } 869 870 buffer->setRange(0, buffer->size() + (size_t)n); 871 bytesRead += n; 872 } 873 874 *out = buffer; 875 if (actualUrl != NULL) { 876 *actualUrl = (*source)->getUri(); 877 if (actualUrl->isEmpty()) { 878 *actualUrl = url; 879 } 880 } 881 882 return bytesRead; 883} 884 885sp<M3UParser> LiveSession::fetchPlaylist( 886 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 887 ALOGV("fetchPlaylist '%s'", url); 888 889 *unchanged = false; 890 891 sp<ABuffer> buffer; 892 String8 actualUrl; 893 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 894 895 if (err <= 0) { 896 return NULL; 897 } 898 899 // MD5 functionality is not available on the simulator, treat all 900 // playlists as changed. 901 902#if defined(HAVE_ANDROID_OS) 903 uint8_t hash[16]; 904 905 MD5_CTX m; 906 MD5_Init(&m); 907 MD5_Update(&m, buffer->data(), buffer->size()); 908 909 MD5_Final(hash, &m); 910 911 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 912 // playlist unchanged 913 *unchanged = true; 914 915 return NULL; 916 } 917 918 if (curPlaylistHash != NULL) { 919 memcpy(curPlaylistHash, hash, sizeof(hash)); 920 } 921#endif 922 923 sp<M3UParser> playlist = 924 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 925 926 if (playlist->initCheck() != OK) { 927 ALOGE("failed to parse .m3u8 playlist"); 928 929 return NULL; 930 } 931 932 return playlist; 933} 934 935static double uniformRand() { 936 return (double)rand() / RAND_MAX; 937} 938 939size_t LiveSession::getBandwidthIndex() { 940 if (mBandwidthItems.size() == 0) { 941 return 0; 942 } 943 944#if 1 945 char value[PROPERTY_VALUE_MAX]; 946 ssize_t index = -1; 947 if (property_get("media.httplive.bw-index", value, NULL)) { 948 char *end; 949 index = strtol(value, &end, 10); 950 CHECK(end > value && *end == '\0'); 951 952 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 953 index = mBandwidthItems.size() - 1; 954 } 955 } 956 957 if (index < 0) { 958 int32_t bandwidthBps; 959 if (mHTTPDataSource != NULL 960 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 961 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 962 } else { 963 ALOGV("no bandwidth estimate."); 964 return 0; // Pick the lowest bandwidth stream by default. 965 } 966 967 char value[PROPERTY_VALUE_MAX]; 968 if (property_get("media.httplive.max-bw", value, NULL)) { 969 char *end; 970 long maxBw = strtoul(value, &end, 10); 971 if (end > value && *end == '\0') { 972 if (maxBw > 0 && bandwidthBps > maxBw) { 973 ALOGV("bandwidth capped to %ld bps", maxBw); 974 bandwidthBps = maxBw; 975 } 976 } 977 } 978 979 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 980 981 index = mBandwidthItems.size() - 1; 982 while (index > 0) { 983 // consider only 80% of the available bandwidth, but if we are switching up, 984 // be even more conservative (70%) to avoid overestimating and immediately 985 // switching back. 986 size_t adjustedBandwidthBps = bandwidthBps; 987 if (index > mCurBandwidthIndex) { 988 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 989 } else { 990 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 991 } 992 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 993 break; 994 } 995 --index; 996 } 997 } 998#elif 0 999 // Change bandwidth at random() 1000 size_t index = uniformRand() * mBandwidthItems.size(); 1001#elif 0 1002 // There's a 50% chance to stay on the current bandwidth and 1003 // a 50% chance to switch to the next higher bandwidth (wrapping around 1004 // to lowest) 1005 const size_t kMinIndex = 0; 1006 1007 static ssize_t mCurBandwidthIndex = -1; 1008 1009 size_t index; 1010 if (mCurBandwidthIndex < 0) { 1011 index = kMinIndex; 1012 } else if (uniformRand() < 0.5) { 1013 index = (size_t)mCurBandwidthIndex; 1014 } else { 1015 index = mCurBandwidthIndex + 1; 1016 if (index == mBandwidthItems.size()) { 1017 index = kMinIndex; 1018 } 1019 } 1020 mCurBandwidthIndex = index; 1021#elif 0 1022 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1023 1024 size_t index = mBandwidthItems.size() - 1; 1025 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1026 --index; 1027 } 1028#elif 1 1029 char value[PROPERTY_VALUE_MAX]; 1030 size_t index; 1031 if (property_get("media.httplive.bw-index", value, NULL)) { 1032 char *end; 1033 index = strtoul(value, &end, 10); 1034 CHECK(end > value && *end == '\0'); 1035 1036 if (index >= mBandwidthItems.size()) { 1037 index = mBandwidthItems.size() - 1; 1038 } 1039 } else { 1040 index = 0; 1041 } 1042#else 1043 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1044#endif 1045 1046 CHECK_GE(index, 0); 1047 1048 return index; 1049} 1050 1051status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1052 int64_t timeUs; 1053 CHECK(msg->findInt64("timeUs", &timeUs)); 1054 1055 if (!mReconfigurationInProgress) { 1056 changeConfiguration(timeUs, getBandwidthIndex()); 1057 } 1058 1059 return OK; 1060} 1061 1062status_t LiveSession::getDuration(int64_t *durationUs) const { 1063 int64_t maxDurationUs = 0ll; 1064 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1065 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1066 1067 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1068 maxDurationUs = fetcherDurationUs; 1069 } 1070 } 1071 1072 *durationUs = maxDurationUs; 1073 1074 return OK; 1075} 1076 1077bool LiveSession::isSeekable() const { 1078 int64_t durationUs; 1079 return getDuration(&durationUs) == OK && durationUs >= 0; 1080} 1081 1082bool LiveSession::hasDynamicDuration() const { 1083 return false; 1084} 1085 1086size_t LiveSession::getTrackCount() const { 1087 if (mPlaylist == NULL) { 1088 return 0; 1089 } else { 1090 return mPlaylist->getTrackCount(); 1091 } 1092} 1093 1094sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1095 if (mPlaylist == NULL) { 1096 return NULL; 1097 } else { 1098 return mPlaylist->getTrackInfo(trackIndex); 1099 } 1100} 1101 1102status_t LiveSession::selectTrack(size_t index, bool select) { 1103 status_t err = mPlaylist->selectTrack(index, select); 1104 if (err == OK) { 1105 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1106 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1107 msg->setInt32("pickTrack", select); 1108 msg->post(); 1109 } 1110 return err; 1111} 1112 1113bool LiveSession::canSwitchUp() { 1114 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1115 status_t err = OK; 1116 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1117 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1118 int64_t dur = source->getBufferedDurationUs(&err); 1119 if (err == OK && dur > 10000000) { 1120 return true; 1121 } 1122 } 1123 return false; 1124} 1125 1126void LiveSession::changeConfiguration( 1127 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1128 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1129 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1130 cancelBandwidthSwitch(); 1131 1132 CHECK(!mReconfigurationInProgress); 1133 mReconfigurationInProgress = true; 1134 1135 mCurBandwidthIndex = bandwidthIndex; 1136 1137 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1138 timeUs, bandwidthIndex, pickTrack); 1139 1140 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1141 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1142 1143 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1144 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1145 1146 AString URIs[kMaxStreams]; 1147 for (size_t i = 0; i < kMaxStreams; ++i) { 1148 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1149 streamMask |= indexToType(i); 1150 } 1151 } 1152 1153 // Step 1, stop and discard fetchers that are no longer needed. 1154 // Pause those that we'll reuse. 1155 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1156 const AString &uri = mFetcherInfos.keyAt(i); 1157 1158 bool discardFetcher = true; 1159 1160 // If we're seeking all current fetchers are discarded. 1161 if (timeUs < 0ll) { 1162 // delay fetcher removal if not picking tracks 1163 discardFetcher = pickTrack; 1164 1165 for (size_t j = 0; j < kMaxStreams; ++j) { 1166 StreamType type = indexToType(j); 1167 if ((streamMask & type) && uri == URIs[j]) { 1168 resumeMask |= type; 1169 streamMask &= ~type; 1170 discardFetcher = false; 1171 } 1172 } 1173 } 1174 1175 if (discardFetcher) { 1176 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1177 } else { 1178 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1179 } 1180 } 1181 1182 sp<AMessage> msg; 1183 if (timeUs < 0ll) { 1184 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1185 msg = new AMessage(kWhatChangeConfiguration3, id()); 1186 } else { 1187 msg = new AMessage(kWhatChangeConfiguration2, id()); 1188 } 1189 msg->setInt32("streamMask", streamMask); 1190 msg->setInt32("resumeMask", resumeMask); 1191 msg->setInt32("pickTrack", pickTrack); 1192 msg->setInt64("timeUs", timeUs); 1193 for (size_t i = 0; i < kMaxStreams; ++i) { 1194 if ((streamMask | resumeMask) & indexToType(i)) { 1195 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1196 } 1197 } 1198 1199 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1200 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1201 // fetchers have completed their asynchronous operation, we'll post 1202 // mContinuation, which then is handled below in onChangeConfiguration2. 1203 mContinuationCounter = mFetcherInfos.size(); 1204 mContinuation = msg; 1205 1206 if (mContinuationCounter == 0) { 1207 msg->post(); 1208 1209 if (mSeekReplyID != 0) { 1210 CHECK(mSeekReply != NULL); 1211 mSeekReply->postReply(mSeekReplyID); 1212 } 1213 } 1214} 1215 1216void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1217 if (!mReconfigurationInProgress) { 1218 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1219 msg->findInt32("pickTrack", &pickTrack); 1220 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1221 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1222 } else { 1223 msg->post(1000000ll); // retry in 1 sec 1224 } 1225} 1226 1227void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1228 mContinuation.clear(); 1229 1230 // All fetchers are either suspended or have been removed now. 1231 1232 uint32_t streamMask, resumeMask; 1233 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1234 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1235 1236 // currently onChangeConfiguration2 is only called for seeking; 1237 // remove the following CHECK if using it else where. 1238 CHECK_EQ(resumeMask, 0); 1239 streamMask |= resumeMask; 1240 1241 AString URIs[kMaxStreams]; 1242 for (size_t i = 0; i < kMaxStreams; ++i) { 1243 if (streamMask & indexToType(i)) { 1244 const AString &uriKey = mStreams[i].uriKey(); 1245 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1246 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1247 } 1248 } 1249 1250 // Determine which decoders to shutdown on the player side, 1251 // a decoder has to be shutdown if either 1252 // 1) its streamtype was active before but now longer isn't. 1253 // or 1254 // 2) its streamtype was already active and still is but the URI 1255 // has changed. 1256 uint32_t changedMask = 0; 1257 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1258 if (((mStreamMask & streamMask & indexToType(i)) 1259 && !(URIs[i] == mStreams[i].mUri)) 1260 || (mStreamMask & ~streamMask & indexToType(i))) { 1261 changedMask |= indexToType(i); 1262 } 1263 } 1264 1265 if (changedMask == 0) { 1266 // If nothing changed as far as the audio/video decoders 1267 // are concerned we can proceed. 1268 onChangeConfiguration3(msg); 1269 return; 1270 } 1271 1272 // Something changed, inform the player which will shutdown the 1273 // corresponding decoders and will post the reply once that's done. 1274 // Handling the reply will continue executing below in 1275 // onChangeConfiguration3. 1276 sp<AMessage> notify = mNotify->dup(); 1277 notify->setInt32("what", kWhatStreamsChanged); 1278 notify->setInt32("changedMask", changedMask); 1279 1280 msg->setWhat(kWhatChangeConfiguration3); 1281 msg->setTarget(id()); 1282 1283 notify->setMessage("reply", msg); 1284 notify->post(); 1285} 1286 1287void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1288 mContinuation.clear(); 1289 // All remaining fetchers are still suspended, the player has shutdown 1290 // any decoders that needed it. 1291 1292 uint32_t streamMask, resumeMask; 1293 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1294 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1295 1296 int64_t timeUs; 1297 int32_t pickTrack; 1298 bool switching = false; 1299 CHECK(msg->findInt64("timeUs", &timeUs)); 1300 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1301 1302 if (timeUs < 0ll) { 1303 if (!pickTrack) { 1304 switching = true; 1305 } 1306 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1307 } else { 1308 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1309 } 1310 1311 for (size_t i = 0; i < kMaxStreams; ++i) { 1312 if (streamMask & indexToType(i)) { 1313 if (switching) { 1314 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1315 } else { 1316 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1317 } 1318 } 1319 } 1320 1321 mNewStreamMask = streamMask | resumeMask; 1322 if (switching) { 1323 mSwapMask = mStreamMask & ~resumeMask; 1324 } 1325 1326 // Of all existing fetchers: 1327 // * Resume fetchers that are still needed and assign them original packet sources. 1328 // * Mark otherwise unneeded fetchers for removal. 1329 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1330 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1331 const AString &uri = mFetcherInfos.keyAt(i); 1332 1333 sp<AnotherPacketSource> sources[kMaxStreams]; 1334 for (size_t j = 0; j < kMaxStreams; ++j) { 1335 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1336 sources[j] = mPacketSources.valueFor(indexToType(j)); 1337 1338 if (j != kSubtitleIndex) { 1339 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1340 sp<AnotherPacketSource> discontinuityQueue; 1341 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1342 discontinuityQueue->queueDiscontinuity( 1343 ATSParser::DISCONTINUITY_NONE, 1344 NULL, 1345 true); 1346 } 1347 } 1348 } 1349 1350 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1351 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1352 || sources[kSubtitleIndex] != NULL) { 1353 info.mFetcher->startAsync( 1354 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1355 } else { 1356 info.mToBeRemoved = true; 1357 } 1358 } 1359 1360 // streamMask now only contains the types that need a new fetcher created. 1361 1362 if (streamMask != 0) { 1363 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1364 } 1365 1366 // Find out when the original fetchers have buffered up to and start the new fetchers 1367 // at a later timestamp. 1368 for (size_t i = 0; i < kMaxStreams; i++) { 1369 if (!(indexToType(i) & streamMask)) { 1370 continue; 1371 } 1372 1373 AString uri; 1374 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1375 1376 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1377 CHECK(fetcher != NULL); 1378 1379 int32_t latestSeq = -1; 1380 int64_t startTimeUs = -1; 1381 int64_t segmentStartTimeUs = -1ll; 1382 int32_t discontinuitySeq = -1; 1383 sp<AnotherPacketSource> sources[kMaxStreams]; 1384 1385 // TRICKY: looping from i as earlier streams are already removed from streamMask 1386 for (size_t j = i; j < kMaxStreams; ++j) { 1387 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1388 if ((streamMask & indexToType(j)) && uri == streamUri) { 1389 sources[j] = mPacketSources.valueFor(indexToType(j)); 1390 1391 if (timeUs >= 0) { 1392 sources[j]->clear(); 1393 startTimeUs = timeUs; 1394 1395 sp<AnotherPacketSource> discontinuityQueue; 1396 sp<AMessage> extra = new AMessage; 1397 extra->setInt64("timeUs", timeUs); 1398 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1399 discontinuityQueue->queueDiscontinuity( 1400 ATSParser::DISCONTINUITY_SEEK, extra, true); 1401 } else { 1402 int32_t type; 1403 int64_t srcSegmentStartTimeUs; 1404 sp<AMessage> meta; 1405 if (pickTrack) { 1406 // selecting 1407 meta = sources[j]->getLatestDequeuedMeta(); 1408 } else { 1409 // adapting 1410 meta = sources[j]->getLatestEnqueuedMeta(); 1411 } 1412 1413 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1414 int64_t tmpUs; 1415 CHECK(meta->findInt64("timeUs", &tmpUs)); 1416 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1417 startTimeUs = tmpUs; 1418 } 1419 1420 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1421 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1422 segmentStartTimeUs = tmpUs; 1423 } 1424 1425 int32_t seq; 1426 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1427 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1428 discontinuitySeq = seq; 1429 } 1430 } 1431 1432 if (pickTrack) { 1433 // selecting track, queue discontinuities before content 1434 sources[j]->clear(); 1435 if (j == kSubtitleIndex) { 1436 break; 1437 } 1438 sp<AnotherPacketSource> discontinuityQueue; 1439 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1440 discontinuityQueue->queueDiscontinuity( 1441 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1442 } else { 1443 // adapting, queue discontinuities after resume 1444 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1445 sources[j]->clear(); 1446 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1447 if (extraStreams & indexToType(j)) { 1448 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1449 } 1450 } 1451 } 1452 1453 streamMask &= ~indexToType(j); 1454 } 1455 } 1456 1457 fetcher->startAsync( 1458 sources[kAudioIndex], 1459 sources[kVideoIndex], 1460 sources[kSubtitleIndex], 1461 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1462 segmentStartTimeUs, 1463 discontinuitySeq, 1464 switching); 1465 } 1466 1467 // All fetchers have now been started, the configuration change 1468 // has completed. 1469 1470 cancelCheckBandwidthEvent(); 1471 scheduleCheckBandwidthEvent(); 1472 1473 ALOGV("XXX configuration change completed."); 1474 mReconfigurationInProgress = false; 1475 if (switching) { 1476 mSwitchInProgress = true; 1477 } else { 1478 mStreamMask = mNewStreamMask; 1479 } 1480 1481 if (mDisconnectReplyID != 0) { 1482 finishDisconnect(); 1483 } 1484} 1485 1486void LiveSession::onSwapped(const sp<AMessage> &msg) { 1487 int32_t switchGeneration; 1488 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1489 if (switchGeneration != mSwitchGeneration) { 1490 return; 1491 } 1492 1493 int32_t stream; 1494 CHECK(msg->findInt32("stream", &stream)); 1495 1496 ssize_t idx = typeToIndex(stream); 1497 CHECK(idx >= 0); 1498 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1499 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1500 } 1501 mStreams[idx].mUri = mStreams[idx].mNewUri; 1502 mStreams[idx].mNewUri.clear(); 1503 1504 mSwapMask &= ~stream; 1505 if (mSwapMask != 0) { 1506 return; 1507 } 1508 1509 // Check if new variant contains extra streams. 1510 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1511 while (extraStreams) { 1512 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1513 swapPacketSource(extraStream); 1514 extraStreams &= ~extraStream; 1515 1516 idx = typeToIndex(extraStream); 1517 CHECK(idx >= 0); 1518 if (mStreams[idx].mNewUri.empty()) { 1519 ALOGW("swapping extra stream type %d %s to empty stream", 1520 extraStream, mStreams[idx].mUri.c_str()); 1521 } 1522 mStreams[idx].mUri = mStreams[idx].mNewUri; 1523 mStreams[idx].mNewUri.clear(); 1524 } 1525 1526 tryToFinishBandwidthSwitch(); 1527} 1528 1529void LiveSession::onCheckSwitchDown() { 1530 if (mSwitchDownMonitor == NULL) { 1531 return; 1532 } 1533 1534 for (size_t i = 0; i < kMaxStreams; ++i) { 1535 int32_t targetDuration; 1536 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1537 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1538 1539 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1540 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1541 int64_t targetDurationUs = targetDuration * 1000000ll; 1542 1543 if (bufferedDurationUs < targetDurationUs / 3) { 1544 (new AMessage(kWhatSwitchDown, id()))->post(); 1545 break; 1546 } 1547 } 1548 } 1549 1550 mSwitchDownMonitor->post(1000000ll); 1551} 1552 1553void LiveSession::onSwitchDown() { 1554 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1555 return; 1556 } 1557 1558 ssize_t bandwidthIndex = getBandwidthIndex(); 1559 if (bandwidthIndex < mCurBandwidthIndex) { 1560 changeConfiguration(-1, bandwidthIndex, false); 1561 return; 1562 } 1563 1564 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1565} 1566 1567// Mark switch done when: 1568// 1. all old buffers are swapped out 1569void LiveSession::tryToFinishBandwidthSwitch() { 1570 if (!mSwitchInProgress) { 1571 return; 1572 } 1573 1574 bool needToRemoveFetchers = false; 1575 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1576 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1577 needToRemoveFetchers = true; 1578 break; 1579 } 1580 } 1581 1582 if (!needToRemoveFetchers && mSwapMask == 0) { 1583 ALOGI("mSwitchInProgress = false"); 1584 mStreamMask = mNewStreamMask; 1585 mSwitchInProgress = false; 1586 } 1587} 1588 1589void LiveSession::scheduleCheckBandwidthEvent() { 1590 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1591 msg->setInt32("generation", mCheckBandwidthGeneration); 1592 msg->post(10000000ll); 1593} 1594 1595void LiveSession::cancelCheckBandwidthEvent() { 1596 ++mCheckBandwidthGeneration; 1597} 1598 1599void LiveSession::cancelBandwidthSwitch() { 1600 Mutex::Autolock lock(mSwapMutex); 1601 mSwitchGeneration++; 1602 mSwitchInProgress = false; 1603 mSwapMask = 0; 1604 1605 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1606 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1607 if (info.mToBeRemoved) { 1608 info.mToBeRemoved = false; 1609 } 1610 } 1611 1612 for (size_t i = 0; i < kMaxStreams; ++i) { 1613 if (!mStreams[i].mNewUri.empty()) { 1614 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1615 if (j < 0) { 1616 mStreams[i].mNewUri.clear(); 1617 continue; 1618 } 1619 1620 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1621 info.mFetcher->stopAsync(); 1622 mFetcherInfos.removeItemsAt(j); 1623 mStreams[i].mNewUri.clear(); 1624 } 1625 } 1626} 1627 1628bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1629 if (mReconfigurationInProgress || mSwitchInProgress) { 1630 return false; 1631 } 1632 1633 if (mCurBandwidthIndex < 0) { 1634 return true; 1635 } 1636 1637 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1638 return false; 1639 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1640 return canSwitchUp(); 1641 } else { 1642 return true; 1643 } 1644} 1645 1646void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1647 size_t bandwidthIndex = getBandwidthIndex(); 1648 if (canSwitchBandwidthTo(bandwidthIndex)) { 1649 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1650 } else { 1651 // Come back and check again 10 seconds later in case there is nothing to do now. 1652 // If we DO change configuration, once that completes it'll schedule a new 1653 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1654 msg->post(10000000ll); 1655 } 1656} 1657 1658void LiveSession::postPrepared(status_t err) { 1659 CHECK(mInPreparationPhase); 1660 1661 sp<AMessage> notify = mNotify->dup(); 1662 if (err == OK || err == ERROR_END_OF_STREAM) { 1663 notify->setInt32("what", kWhatPrepared); 1664 } else { 1665 notify->setInt32("what", kWhatPreparationFailed); 1666 notify->setInt32("err", err); 1667 } 1668 1669 notify->post(); 1670 1671 mInPreparationPhase = false; 1672 1673 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1674 mSwitchDownMonitor->post(); 1675} 1676 1677} // namespace android 1678 1679