LiveSession.cpp revision f4a48dfa8570d6a4708a868b8b15d1236f7ca54b
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0), 72 mFirstTimeUsValid(false), 73 mFirstTimeUs(0), 74 mLastSeekTimeUs(0) { 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 mBuffering[i] = false; 85 } 86} 87 88LiveSession::~LiveSession() { 89} 90 91sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 92 ABuffer *discontinuity = new ABuffer(0); 93 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 94 discontinuity->meta()->setInt32("swapPacketSource", swap); 95 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 96 discontinuity->meta()->setInt64("timeUs", -1); 97 return discontinuity; 98} 99 100void LiveSession::swapPacketSource(StreamType stream) { 101 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 102 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 103 sp<AnotherPacketSource> tmp = aps; 104 aps = aps2; 105 aps2 = tmp; 106 aps2->clear(); 107} 108 109status_t LiveSession::dequeueAccessUnit( 110 StreamType stream, sp<ABuffer> *accessUnit) { 111 if (!(mStreamMask & stream)) { 112 // return -EWOULDBLOCK to avoid halting the decoder 113 // when switching between audio/video and audio only. 114 return -EWOULDBLOCK; 115 } 116 117 status_t finalResult; 118 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 119 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 120 discontinuityQueue->dequeueAccessUnit(accessUnit); 121 // seeking, track switching 122 sp<AMessage> extra; 123 int64_t timeUs; 124 if ((*accessUnit)->meta()->findMessage("extra", &extra) 125 && extra != NULL 126 && extra->findInt64("timeUs", &timeUs)) { 127 // seeking only 128 mLastSeekTimeUs = timeUs; 129 mDiscontinuityOffsetTimesUs.clear(); 130 mDiscontinuityAbsStartTimesUs.clear(); 131 } 132 return INFO_DISCONTINUITY; 133 } 134 135 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 136 137 ssize_t idx = typeToIndex(stream); 138 if (!packetSource->hasBufferAvailable(&finalResult)) { 139 if (finalResult == OK) { 140 mBuffering[idx] = true; 141 return -EAGAIN; 142 } else { 143 return finalResult; 144 } 145 } 146 147 if (mBuffering[idx]) { 148 if (mSwitchInProgress 149 || packetSource->isFinished(0) 150 || packetSource->getEstimatedDurationUs() > 10000000ll) { 151 mBuffering[idx] = false; 152 } 153 } 154 155 if (mBuffering[idx]) { 156 return -EAGAIN; 157 } 158 159 // wait for counterpart 160 sp<AnotherPacketSource> otherSource; 161 if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) { 162 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 163 } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) { 164 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 165 } 166 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 167 return finalResult == OK ? -EAGAIN : finalResult; 168 } 169 170 status_t err = packetSource->dequeueAccessUnit(accessUnit); 171 172 size_t streamIdx; 173 const char *streamStr; 174 switch (stream) { 175 case STREAMTYPE_AUDIO: 176 streamIdx = kAudioIndex; 177 streamStr = "audio"; 178 break; 179 case STREAMTYPE_VIDEO: 180 streamIdx = kVideoIndex; 181 streamStr = "video"; 182 break; 183 case STREAMTYPE_SUBTITLES: 184 streamIdx = kSubtitleIndex; 185 streamStr = "subs"; 186 break; 187 default: 188 TRESPASS(); 189 } 190 191 StreamItem& strm = mStreams[streamIdx]; 192 if (err == INFO_DISCONTINUITY) { 193 // adaptive streaming, discontinuities in the playlist 194 int32_t type; 195 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 196 197 sp<AMessage> extra; 198 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 199 extra.clear(); 200 } 201 202 ALOGI("[%s] read discontinuity of type %d, extra = %s", 203 streamStr, 204 type, 205 extra == NULL ? "NULL" : extra->debugString().c_str()); 206 207 int32_t swap; 208 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 209 int32_t switchGeneration; 210 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 211 { 212 Mutex::Autolock lock(mSwapMutex); 213 if (switchGeneration == mSwitchGeneration) { 214 swapPacketSource(stream); 215 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 216 msg->setInt32("stream", stream); 217 msg->setInt32("switchGeneration", switchGeneration); 218 msg->post(); 219 } 220 } 221 } else { 222 size_t seq = strm.mCurDiscontinuitySeq; 223 int64_t offsetTimeUs; 224 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 225 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 226 } else { 227 offsetTimeUs = 0; 228 } 229 230 seq += 1; 231 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 232 int64_t firstTimeUs; 233 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 234 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 235 offsetTimeUs += strm.mLastSampleDurationUs; 236 } else { 237 offsetTimeUs += strm.mLastSampleDurationUs; 238 } 239 240 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 241 } 242 } else if (err == OK) { 243 244 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 245 int64_t timeUs; 246 int32_t discontinuitySeq = 0; 247 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 248 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 249 strm.mCurDiscontinuitySeq = discontinuitySeq; 250 251 int32_t discard = 0; 252 int64_t firstTimeUs; 253 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 254 int64_t durUs; // approximate sample duration 255 if (timeUs > strm.mLastDequeuedTimeUs) { 256 durUs = timeUs - strm.mLastDequeuedTimeUs; 257 } else { 258 durUs = strm.mLastDequeuedTimeUs - timeUs; 259 } 260 strm.mLastSampleDurationUs = durUs; 261 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 262 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 263 firstTimeUs = timeUs; 264 } else { 265 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 266 firstTimeUs = timeUs; 267 } 268 269 strm.mLastDequeuedTimeUs = timeUs; 270 if (timeUs >= firstTimeUs) { 271 timeUs -= firstTimeUs; 272 } else { 273 timeUs = 0; 274 } 275 timeUs += mLastSeekTimeUs; 276 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 277 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 278 } 279 280 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 281 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 282 mLastDequeuedTimeUs = timeUs; 283 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 284 } else if (stream == STREAMTYPE_SUBTITLES) { 285 (*accessUnit)->meta()->setInt32( 286 "trackIndex", mPlaylist->getSelectedIndex()); 287 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 288 } 289 } else { 290 ALOGI("[%s] encountered error %d", streamStr, err); 291 } 292 293 return err; 294} 295 296status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 297 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 298 if (!(mStreamMask & stream)) { 299 return UNKNOWN_ERROR; 300 } 301 302 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 303 304 sp<MetaData> meta = packetSource->getFormat(); 305 306 if (meta == NULL) { 307 return -EAGAIN; 308 } 309 310 return convertMetaDataToMessage(meta, format); 311} 312 313void LiveSession::connectAsync( 314 const char *url, const KeyedVector<String8, String8> *headers) { 315 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 316 msg->setString("url", url); 317 318 if (headers != NULL) { 319 msg->setPointer( 320 "headers", 321 new KeyedVector<String8, String8>(*headers)); 322 } 323 324 msg->post(); 325} 326 327status_t LiveSession::disconnect() { 328 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 329 330 sp<AMessage> response; 331 status_t err = msg->postAndAwaitResponse(&response); 332 333 return err; 334} 335 336status_t LiveSession::seekTo(int64_t timeUs) { 337 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 338 msg->setInt64("timeUs", timeUs); 339 340 sp<AMessage> response; 341 status_t err = msg->postAndAwaitResponse(&response); 342 343 uint32_t replyID; 344 CHECK(response == mSeekReply && 0 != mSeekReplyID); 345 mSeekReply.clear(); 346 mSeekReplyID = 0; 347 return err; 348} 349 350void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 351 switch (msg->what()) { 352 case kWhatConnect: 353 { 354 onConnect(msg); 355 break; 356 } 357 358 case kWhatDisconnect: 359 { 360 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 361 362 if (mReconfigurationInProgress) { 363 break; 364 } 365 366 finishDisconnect(); 367 break; 368 } 369 370 case kWhatSeek: 371 { 372 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 373 374 status_t err = onSeek(msg); 375 376 mSeekReply = new AMessage; 377 mSeekReply->setInt32("err", err); 378 break; 379 } 380 381 case kWhatFetcherNotify: 382 { 383 int32_t what; 384 CHECK(msg->findInt32("what", &what)); 385 386 switch (what) { 387 case PlaylistFetcher::kWhatStarted: 388 break; 389 case PlaylistFetcher::kWhatPaused: 390 case PlaylistFetcher::kWhatStopped: 391 { 392 if (what == PlaylistFetcher::kWhatStopped) { 393 AString uri; 394 CHECK(msg->findString("uri", &uri)); 395 if (mFetcherInfos.removeItem(uri) < 0) { 396 // ignore duplicated kWhatStopped messages. 397 break; 398 } 399 400 if (mSwitchInProgress) { 401 tryToFinishBandwidthSwitch(); 402 } 403 } 404 405 if (mContinuation != NULL) { 406 CHECK_GT(mContinuationCounter, 0); 407 if (--mContinuationCounter == 0) { 408 mContinuation->post(); 409 410 if (mSeekReplyID != 0) { 411 CHECK(mSeekReply != NULL); 412 mSeekReply->postReply(mSeekReplyID); 413 } 414 } 415 } 416 break; 417 } 418 419 case PlaylistFetcher::kWhatDurationUpdate: 420 { 421 AString uri; 422 CHECK(msg->findString("uri", &uri)); 423 424 int64_t durationUs; 425 CHECK(msg->findInt64("durationUs", &durationUs)); 426 427 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 428 info->mDurationUs = durationUs; 429 break; 430 } 431 432 case PlaylistFetcher::kWhatError: 433 { 434 status_t err; 435 CHECK(msg->findInt32("err", &err)); 436 437 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 438 439 if (mInPreparationPhase) { 440 postPrepared(err); 441 } 442 443 cancelBandwidthSwitch(); 444 445 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 446 447 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 448 449 mPacketSources.valueFor( 450 STREAMTYPE_SUBTITLES)->signalEOS(err); 451 452 sp<AMessage> notify = mNotify->dup(); 453 notify->setInt32("what", kWhatError); 454 notify->setInt32("err", err); 455 notify->post(); 456 break; 457 } 458 459 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 460 { 461 AString uri; 462 CHECK(msg->findString("uri", &uri)); 463 464 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 465 info->mIsPrepared = true; 466 467 if (mInPreparationPhase) { 468 bool allFetchersPrepared = true; 469 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 470 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 471 allFetchersPrepared = false; 472 break; 473 } 474 } 475 476 if (allFetchersPrepared) { 477 postPrepared(OK); 478 } 479 } 480 break; 481 } 482 483 case PlaylistFetcher::kWhatStartedAt: 484 { 485 int32_t switchGeneration; 486 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 487 488 if (switchGeneration != mSwitchGeneration) { 489 break; 490 } 491 492 // Resume fetcher for the original variant; the resumed fetcher should 493 // continue until the timestamps found in msg, which is stored by the 494 // new fetcher to indicate where the new variant has started buffering. 495 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 496 const FetcherInfo info = mFetcherInfos.valueAt(i); 497 if (info.mToBeRemoved) { 498 info.mFetcher->resumeUntilAsync(msg); 499 } 500 } 501 break; 502 } 503 504 default: 505 TRESPASS(); 506 } 507 508 break; 509 } 510 511 case kWhatCheckBandwidth: 512 { 513 int32_t generation; 514 CHECK(msg->findInt32("generation", &generation)); 515 516 if (generation != mCheckBandwidthGeneration) { 517 break; 518 } 519 520 onCheckBandwidth(msg); 521 break; 522 } 523 524 case kWhatChangeConfiguration: 525 { 526 onChangeConfiguration(msg); 527 break; 528 } 529 530 case kWhatChangeConfiguration2: 531 { 532 onChangeConfiguration2(msg); 533 break; 534 } 535 536 case kWhatChangeConfiguration3: 537 { 538 onChangeConfiguration3(msg); 539 break; 540 } 541 542 case kWhatFinishDisconnect2: 543 { 544 onFinishDisconnect2(); 545 break; 546 } 547 548 case kWhatSwapped: 549 { 550 onSwapped(msg); 551 break; 552 } 553 554 case kWhatCheckSwitchDown: 555 { 556 onCheckSwitchDown(); 557 break; 558 } 559 560 case kWhatSwitchDown: 561 { 562 onSwitchDown(); 563 break; 564 } 565 566 default: 567 TRESPASS(); 568 break; 569 } 570} 571 572// static 573int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 574 if (a->mBandwidth < b->mBandwidth) { 575 return -1; 576 } else if (a->mBandwidth == b->mBandwidth) { 577 return 0; 578 } 579 580 return 1; 581} 582 583// static 584LiveSession::StreamType LiveSession::indexToType(int idx) { 585 CHECK(idx >= 0 && idx < kMaxStreams); 586 return (StreamType)(1 << idx); 587} 588 589// static 590ssize_t LiveSession::typeToIndex(int32_t type) { 591 switch (type) { 592 case STREAMTYPE_AUDIO: 593 return 0; 594 case STREAMTYPE_VIDEO: 595 return 1; 596 case STREAMTYPE_SUBTITLES: 597 return 2; 598 default: 599 return -1; 600 }; 601 return -1; 602} 603 604void LiveSession::onConnect(const sp<AMessage> &msg) { 605 AString url; 606 CHECK(msg->findString("url", &url)); 607 608 KeyedVector<String8, String8> *headers = NULL; 609 if (!msg->findPointer("headers", (void **)&headers)) { 610 mExtraHeaders.clear(); 611 } else { 612 mExtraHeaders = *headers; 613 614 delete headers; 615 headers = NULL; 616 } 617 618 // TODO currently we don't know if we are coming here from incognito mode 619 ALOGI("onConnect %s", uriDebugString(url).c_str()); 620 621 mMasterURL = url; 622 623 bool dummy; 624 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 625 626 if (mPlaylist == NULL) { 627 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 628 629 postPrepared(ERROR_IO); 630 return; 631 } 632 633 // We trust the content provider to make a reasonable choice of preferred 634 // initial bandwidth by listing it first in the variant playlist. 635 // At startup we really don't have a good estimate on the available 636 // network bandwidth since we haven't tranferred any data yet. Once 637 // we have we can make a better informed choice. 638 size_t initialBandwidth = 0; 639 size_t initialBandwidthIndex = 0; 640 641 if (mPlaylist->isVariantPlaylist()) { 642 for (size_t i = 0; i < mPlaylist->size(); ++i) { 643 BandwidthItem item; 644 645 item.mPlaylistIndex = i; 646 647 sp<AMessage> meta; 648 AString uri; 649 mPlaylist->itemAt(i, &uri, &meta); 650 651 unsigned long bandwidth; 652 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 653 654 if (initialBandwidth == 0) { 655 initialBandwidth = item.mBandwidth; 656 } 657 658 mBandwidthItems.push(item); 659 } 660 661 CHECK_GT(mBandwidthItems.size(), 0u); 662 663 mBandwidthItems.sort(SortByBandwidth); 664 665 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 666 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 667 initialBandwidthIndex = i; 668 break; 669 } 670 } 671 } else { 672 // dummy item. 673 BandwidthItem item; 674 item.mPlaylistIndex = 0; 675 item.mBandwidth = 0; 676 mBandwidthItems.push(item); 677 } 678 679 mPlaylist->pickRandomMediaItems(); 680 changeConfiguration( 681 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 682} 683 684void LiveSession::finishDisconnect() { 685 // No reconfiguration is currently pending, make sure none will trigger 686 // during disconnection either. 687 cancelCheckBandwidthEvent(); 688 689 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 690 // (finishDisconnect, onFinishDisconnect2) 691 cancelBandwidthSwitch(); 692 693 // cancel switch down monitor 694 mSwitchDownMonitor.clear(); 695 696 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 697 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 698 } 699 700 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 701 702 mContinuationCounter = mFetcherInfos.size(); 703 mContinuation = msg; 704 705 if (mContinuationCounter == 0) { 706 msg->post(); 707 } 708} 709 710void LiveSession::onFinishDisconnect2() { 711 mContinuation.clear(); 712 713 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 714 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 715 716 mPacketSources.valueFor( 717 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 718 719 sp<AMessage> response = new AMessage; 720 response->setInt32("err", OK); 721 722 response->postReply(mDisconnectReplyID); 723 mDisconnectReplyID = 0; 724} 725 726sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 727 ssize_t index = mFetcherInfos.indexOfKey(uri); 728 729 if (index >= 0) { 730 return NULL; 731 } 732 733 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 734 notify->setString("uri", uri); 735 notify->setInt32("switchGeneration", mSwitchGeneration); 736 737 FetcherInfo info; 738 info.mFetcher = new PlaylistFetcher(notify, this, uri); 739 info.mDurationUs = -1ll; 740 info.mIsPrepared = false; 741 info.mToBeRemoved = false; 742 looper()->registerHandler(info.mFetcher); 743 744 mFetcherInfos.add(uri, info); 745 746 return info.mFetcher; 747} 748 749/* 750 * Illustration of parameters: 751 * 752 * 0 `range_offset` 753 * +------------+-------------------------------------------------------+--+--+ 754 * | | | next block to fetch | | | 755 * | | `source` handle => `out` buffer | | | | 756 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 757 * | |<----------- `range_length` / buffer capacity ----------->| | 758 * |<------------------------------ file_size ------------------------------->| 759 * 760 * Special parameter values: 761 * - range_length == -1 means entire file 762 * - block_size == 0 means entire range 763 * 764 */ 765ssize_t LiveSession::fetchFile( 766 const char *url, sp<ABuffer> *out, 767 int64_t range_offset, int64_t range_length, 768 uint32_t block_size, /* download block size */ 769 sp<DataSource> *source, /* to return and reuse source */ 770 String8 *actualUrl) { 771 off64_t size; 772 sp<DataSource> temp_source; 773 if (source == NULL) { 774 source = &temp_source; 775 } 776 777 if (*source == NULL) { 778 if (!strncasecmp(url, "file://", 7)) { 779 *source = new FileSource(url + 7); 780 } else if (strncasecmp(url, "http://", 7) 781 && strncasecmp(url, "https://", 8)) { 782 return ERROR_UNSUPPORTED; 783 } else { 784 KeyedVector<String8, String8> headers = mExtraHeaders; 785 if (range_offset > 0 || range_length >= 0) { 786 headers.add( 787 String8("Range"), 788 String8( 789 StringPrintf( 790 "bytes=%lld-%s", 791 range_offset, 792 range_length < 0 793 ? "" : StringPrintf("%lld", 794 range_offset + range_length - 1).c_str()).c_str())); 795 } 796 status_t err = mHTTPDataSource->connect(url, &headers); 797 798 if (err != OK) { 799 return err; 800 } 801 802 *source = mHTTPDataSource; 803 } 804 } 805 806 status_t getSizeErr = (*source)->getSize(&size); 807 if (getSizeErr != OK) { 808 size = 65536; 809 } 810 811 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 812 if (*out == NULL) { 813 buffer->setRange(0, 0); 814 } 815 816 ssize_t bytesRead = 0; 817 // adjust range_length if only reading partial block 818 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 819 range_length = buffer->size() + block_size; 820 } 821 for (;;) { 822 // Only resize when we don't know the size. 823 size_t bufferRemaining = buffer->capacity() - buffer->size(); 824 if (bufferRemaining == 0 && getSizeErr != OK) { 825 bufferRemaining = 32768; 826 827 ALOGV("increasing download buffer to %zu bytes", 828 buffer->size() + bufferRemaining); 829 830 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 831 memcpy(copy->data(), buffer->data(), buffer->size()); 832 copy->setRange(0, buffer->size()); 833 834 buffer = copy; 835 } 836 837 size_t maxBytesToRead = bufferRemaining; 838 if (range_length >= 0) { 839 int64_t bytesLeftInRange = range_length - buffer->size(); 840 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 841 maxBytesToRead = bytesLeftInRange; 842 843 if (bytesLeftInRange == 0) { 844 break; 845 } 846 } 847 } 848 849 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 850 // to help us break out of the loop. 851 ssize_t n = (*source)->readAt( 852 buffer->size(), buffer->data() + buffer->size(), 853 maxBytesToRead); 854 855 if (n < 0) { 856 return n; 857 } 858 859 if (n == 0) { 860 break; 861 } 862 863 buffer->setRange(0, buffer->size() + (size_t)n); 864 bytesRead += n; 865 } 866 867 *out = buffer; 868 if (actualUrl != NULL) { 869 *actualUrl = (*source)->getUri(); 870 if (actualUrl->isEmpty()) { 871 *actualUrl = url; 872 } 873 } 874 875 return bytesRead; 876} 877 878sp<M3UParser> LiveSession::fetchPlaylist( 879 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 880 ALOGV("fetchPlaylist '%s'", url); 881 882 *unchanged = false; 883 884 sp<ABuffer> buffer; 885 String8 actualUrl; 886 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 887 888 if (err <= 0) { 889 return NULL; 890 } 891 892 // MD5 functionality is not available on the simulator, treat all 893 // playlists as changed. 894 895#if defined(HAVE_ANDROID_OS) 896 uint8_t hash[16]; 897 898 MD5_CTX m; 899 MD5_Init(&m); 900 MD5_Update(&m, buffer->data(), buffer->size()); 901 902 MD5_Final(hash, &m); 903 904 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 905 // playlist unchanged 906 *unchanged = true; 907 908 return NULL; 909 } 910 911 if (curPlaylistHash != NULL) { 912 memcpy(curPlaylistHash, hash, sizeof(hash)); 913 } 914#endif 915 916 sp<M3UParser> playlist = 917 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 918 919 if (playlist->initCheck() != OK) { 920 ALOGE("failed to parse .m3u8 playlist"); 921 922 return NULL; 923 } 924 925 return playlist; 926} 927 928static double uniformRand() { 929 return (double)rand() / RAND_MAX; 930} 931 932size_t LiveSession::getBandwidthIndex() { 933 if (mBandwidthItems.size() == 0) { 934 return 0; 935 } 936 937#if 1 938 char value[PROPERTY_VALUE_MAX]; 939 ssize_t index = -1; 940 if (property_get("media.httplive.bw-index", value, NULL)) { 941 char *end; 942 index = strtol(value, &end, 10); 943 CHECK(end > value && *end == '\0'); 944 945 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 946 index = mBandwidthItems.size() - 1; 947 } 948 } 949 950 if (index < 0) { 951 int32_t bandwidthBps; 952 if (mHTTPDataSource != NULL 953 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 954 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 955 } else { 956 ALOGV("no bandwidth estimate."); 957 return 0; // Pick the lowest bandwidth stream by default. 958 } 959 960 char value[PROPERTY_VALUE_MAX]; 961 if (property_get("media.httplive.max-bw", value, NULL)) { 962 char *end; 963 long maxBw = strtoul(value, &end, 10); 964 if (end > value && *end == '\0') { 965 if (maxBw > 0 && bandwidthBps > maxBw) { 966 ALOGV("bandwidth capped to %ld bps", maxBw); 967 bandwidthBps = maxBw; 968 } 969 } 970 } 971 972 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 973 974 index = mBandwidthItems.size() - 1; 975 while (index > 0) { 976 // consider only 80% of the available bandwidth, but if we are switching up, 977 // be even more conservative (70%) to avoid overestimating and immediately 978 // switching back. 979 size_t adjustedBandwidthBps = bandwidthBps; 980 if (index > mCurBandwidthIndex) { 981 adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10; 982 } else { 983 adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10; 984 } 985 if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) { 986 break; 987 } 988 --index; 989 } 990 } 991#elif 0 992 // Change bandwidth at random() 993 size_t index = uniformRand() * mBandwidthItems.size(); 994#elif 0 995 // There's a 50% chance to stay on the current bandwidth and 996 // a 50% chance to switch to the next higher bandwidth (wrapping around 997 // to lowest) 998 const size_t kMinIndex = 0; 999 1000 static ssize_t mCurBandwidthIndex = -1; 1001 1002 size_t index; 1003 if (mCurBandwidthIndex < 0) { 1004 index = kMinIndex; 1005 } else if (uniformRand() < 0.5) { 1006 index = (size_t)mCurBandwidthIndex; 1007 } else { 1008 index = mCurBandwidthIndex + 1; 1009 if (index == mBandwidthItems.size()) { 1010 index = kMinIndex; 1011 } 1012 } 1013 mCurBandwidthIndex = index; 1014#elif 0 1015 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 1016 1017 size_t index = mBandwidthItems.size() - 1; 1018 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 1019 --index; 1020 } 1021#elif 1 1022 char value[PROPERTY_VALUE_MAX]; 1023 size_t index; 1024 if (property_get("media.httplive.bw-index", value, NULL)) { 1025 char *end; 1026 index = strtoul(value, &end, 10); 1027 CHECK(end > value && *end == '\0'); 1028 1029 if (index >= mBandwidthItems.size()) { 1030 index = mBandwidthItems.size() - 1; 1031 } 1032 } else { 1033 index = 0; 1034 } 1035#else 1036 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 1037#endif 1038 1039 CHECK_GE(index, 0); 1040 1041 return index; 1042} 1043 1044status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1045 int64_t timeUs; 1046 CHECK(msg->findInt64("timeUs", &timeUs)); 1047 1048 if (!mReconfigurationInProgress) { 1049 changeConfiguration(timeUs, getBandwidthIndex()); 1050 } 1051 1052 return OK; 1053} 1054 1055status_t LiveSession::getDuration(int64_t *durationUs) const { 1056 int64_t maxDurationUs = 0ll; 1057 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1058 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1059 1060 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1061 maxDurationUs = fetcherDurationUs; 1062 } 1063 } 1064 1065 *durationUs = maxDurationUs; 1066 1067 return OK; 1068} 1069 1070bool LiveSession::isSeekable() const { 1071 int64_t durationUs; 1072 return getDuration(&durationUs) == OK && durationUs >= 0; 1073} 1074 1075bool LiveSession::hasDynamicDuration() const { 1076 return false; 1077} 1078 1079size_t LiveSession::getTrackCount() const { 1080 if (mPlaylist == NULL) { 1081 return 0; 1082 } else { 1083 return mPlaylist->getTrackCount(); 1084 } 1085} 1086 1087sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1088 if (mPlaylist == NULL) { 1089 return NULL; 1090 } else { 1091 return mPlaylist->getTrackInfo(trackIndex); 1092 } 1093} 1094 1095status_t LiveSession::selectTrack(size_t index, bool select) { 1096 status_t err = mPlaylist->selectTrack(index, select); 1097 if (err == OK) { 1098 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1099 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1100 msg->setInt32("pickTrack", select); 1101 msg->post(); 1102 } 1103 return err; 1104} 1105 1106bool LiveSession::canSwitchUp() { 1107 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1108 status_t err = OK; 1109 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1110 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1111 int64_t dur = source->getBufferedDurationUs(&err); 1112 if (err == OK && dur > 10000000) { 1113 return true; 1114 } 1115 } 1116 return false; 1117} 1118 1119void LiveSession::changeConfiguration( 1120 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1121 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1122 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1123 cancelBandwidthSwitch(); 1124 1125 CHECK(!mReconfigurationInProgress); 1126 mReconfigurationInProgress = true; 1127 1128 mCurBandwidthIndex = bandwidthIndex; 1129 1130 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1131 timeUs, bandwidthIndex, pickTrack); 1132 1133 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1134 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1135 1136 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1137 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1138 1139 AString URIs[kMaxStreams]; 1140 for (size_t i = 0; i < kMaxStreams; ++i) { 1141 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1142 streamMask |= indexToType(i); 1143 } 1144 } 1145 1146 // Step 1, stop and discard fetchers that are no longer needed. 1147 // Pause those that we'll reuse. 1148 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1149 const AString &uri = mFetcherInfos.keyAt(i); 1150 1151 bool discardFetcher = true; 1152 1153 // If we're seeking all current fetchers are discarded. 1154 if (timeUs < 0ll) { 1155 // delay fetcher removal if not picking tracks 1156 discardFetcher = pickTrack; 1157 1158 for (size_t j = 0; j < kMaxStreams; ++j) { 1159 StreamType type = indexToType(j); 1160 if ((streamMask & type) && uri == URIs[j]) { 1161 resumeMask |= type; 1162 streamMask &= ~type; 1163 discardFetcher = false; 1164 } 1165 } 1166 } 1167 1168 if (discardFetcher) { 1169 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1170 } else { 1171 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1172 } 1173 } 1174 1175 sp<AMessage> msg; 1176 if (timeUs < 0ll) { 1177 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1178 msg = new AMessage(kWhatChangeConfiguration3, id()); 1179 } else { 1180 msg = new AMessage(kWhatChangeConfiguration2, id()); 1181 } 1182 msg->setInt32("streamMask", streamMask); 1183 msg->setInt32("resumeMask", resumeMask); 1184 msg->setInt32("pickTrack", pickTrack); 1185 msg->setInt64("timeUs", timeUs); 1186 for (size_t i = 0; i < kMaxStreams; ++i) { 1187 if ((streamMask | resumeMask) & indexToType(i)) { 1188 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1189 } 1190 } 1191 1192 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1193 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1194 // fetchers have completed their asynchronous operation, we'll post 1195 // mContinuation, which then is handled below in onChangeConfiguration2. 1196 mContinuationCounter = mFetcherInfos.size(); 1197 mContinuation = msg; 1198 1199 if (mContinuationCounter == 0) { 1200 msg->post(); 1201 1202 if (mSeekReplyID != 0) { 1203 CHECK(mSeekReply != NULL); 1204 mSeekReply->postReply(mSeekReplyID); 1205 } 1206 } 1207} 1208 1209void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1210 if (!mReconfigurationInProgress) { 1211 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1212 msg->findInt32("pickTrack", &pickTrack); 1213 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1214 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1215 } else { 1216 msg->post(1000000ll); // retry in 1 sec 1217 } 1218} 1219 1220void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1221 mContinuation.clear(); 1222 1223 // All fetchers are either suspended or have been removed now. 1224 1225 uint32_t streamMask, resumeMask; 1226 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1227 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1228 1229 // currently onChangeConfiguration2 is only called for seeking; 1230 // remove the following CHECK if using it else where. 1231 CHECK_EQ(resumeMask, 0); 1232 streamMask |= resumeMask; 1233 1234 AString URIs[kMaxStreams]; 1235 for (size_t i = 0; i < kMaxStreams; ++i) { 1236 if (streamMask & indexToType(i)) { 1237 const AString &uriKey = mStreams[i].uriKey(); 1238 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1239 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1240 } 1241 } 1242 1243 // Determine which decoders to shutdown on the player side, 1244 // a decoder has to be shutdown if either 1245 // 1) its streamtype was active before but now longer isn't. 1246 // or 1247 // 2) its streamtype was already active and still is but the URI 1248 // has changed. 1249 uint32_t changedMask = 0; 1250 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1251 if (((mStreamMask & streamMask & indexToType(i)) 1252 && !(URIs[i] == mStreams[i].mUri)) 1253 || (mStreamMask & ~streamMask & indexToType(i))) { 1254 changedMask |= indexToType(i); 1255 } 1256 } 1257 1258 if (changedMask == 0) { 1259 // If nothing changed as far as the audio/video decoders 1260 // are concerned we can proceed. 1261 onChangeConfiguration3(msg); 1262 return; 1263 } 1264 1265 // Something changed, inform the player which will shutdown the 1266 // corresponding decoders and will post the reply once that's done. 1267 // Handling the reply will continue executing below in 1268 // onChangeConfiguration3. 1269 sp<AMessage> notify = mNotify->dup(); 1270 notify->setInt32("what", kWhatStreamsChanged); 1271 notify->setInt32("changedMask", changedMask); 1272 1273 msg->setWhat(kWhatChangeConfiguration3); 1274 msg->setTarget(id()); 1275 1276 notify->setMessage("reply", msg); 1277 notify->post(); 1278} 1279 1280void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1281 mContinuation.clear(); 1282 // All remaining fetchers are still suspended, the player has shutdown 1283 // any decoders that needed it. 1284 1285 uint32_t streamMask, resumeMask; 1286 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1287 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1288 1289 int64_t timeUs; 1290 int32_t pickTrack; 1291 bool switching = false; 1292 CHECK(msg->findInt64("timeUs", &timeUs)); 1293 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1294 1295 if (timeUs < 0ll) { 1296 if (!pickTrack) { 1297 switching = true; 1298 } 1299 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1300 } else { 1301 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1302 } 1303 1304 for (size_t i = 0; i < kMaxStreams; ++i) { 1305 if (streamMask & indexToType(i)) { 1306 if (switching) { 1307 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri)); 1308 } else { 1309 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1310 } 1311 } 1312 } 1313 1314 mNewStreamMask = streamMask | resumeMask; 1315 if (switching) { 1316 mSwapMask = mStreamMask & ~resumeMask; 1317 } 1318 1319 // Of all existing fetchers: 1320 // * Resume fetchers that are still needed and assign them original packet sources. 1321 // * Mark otherwise unneeded fetchers for removal. 1322 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1323 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1324 const AString &uri = mFetcherInfos.keyAt(i); 1325 1326 sp<AnotherPacketSource> sources[kMaxStreams]; 1327 for (size_t j = 0; j < kMaxStreams; ++j) { 1328 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1329 sources[j] = mPacketSources.valueFor(indexToType(j)); 1330 1331 if (j != kSubtitleIndex) { 1332 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1333 sp<AnotherPacketSource> discontinuityQueue; 1334 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1335 discontinuityQueue->queueDiscontinuity( 1336 ATSParser::DISCONTINUITY_NONE, 1337 NULL, 1338 true); 1339 } 1340 } 1341 } 1342 1343 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1344 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1345 || sources[kSubtitleIndex] != NULL) { 1346 info.mFetcher->startAsync( 1347 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1348 } else { 1349 info.mToBeRemoved = true; 1350 } 1351 } 1352 1353 // streamMask now only contains the types that need a new fetcher created. 1354 1355 if (streamMask != 0) { 1356 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1357 } 1358 1359 // Find out when the original fetchers have buffered up to and start the new fetchers 1360 // at a later timestamp. 1361 for (size_t i = 0; i < kMaxStreams; i++) { 1362 if (!(indexToType(i) & streamMask)) { 1363 continue; 1364 } 1365 1366 AString uri; 1367 uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri; 1368 1369 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1370 CHECK(fetcher != NULL); 1371 1372 int32_t latestSeq = -1; 1373 int64_t startTimeUs = -1; 1374 int64_t segmentStartTimeUs = -1ll; 1375 int32_t discontinuitySeq = -1; 1376 sp<AnotherPacketSource> sources[kMaxStreams]; 1377 1378 // TRICKY: looping from i as earlier streams are already removed from streamMask 1379 for (size_t j = i; j < kMaxStreams; ++j) { 1380 const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri; 1381 if ((streamMask & indexToType(j)) && uri == streamUri) { 1382 sources[j] = mPacketSources.valueFor(indexToType(j)); 1383 1384 if (timeUs >= 0) { 1385 sources[j]->clear(); 1386 startTimeUs = timeUs; 1387 1388 sp<AnotherPacketSource> discontinuityQueue; 1389 sp<AMessage> extra = new AMessage; 1390 extra->setInt64("timeUs", timeUs); 1391 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1392 discontinuityQueue->queueDiscontinuity( 1393 ATSParser::DISCONTINUITY_SEEK, extra, true); 1394 } else { 1395 int32_t type; 1396 int64_t srcSegmentStartTimeUs; 1397 sp<AMessage> meta; 1398 if (pickTrack) { 1399 // selecting 1400 meta = sources[j]->getLatestDequeuedMeta(); 1401 } else { 1402 // adapting 1403 meta = sources[j]->getLatestEnqueuedMeta(); 1404 } 1405 1406 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1407 int64_t tmpUs; 1408 CHECK(meta->findInt64("timeUs", &tmpUs)); 1409 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1410 startTimeUs = tmpUs; 1411 } 1412 1413 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1414 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1415 segmentStartTimeUs = tmpUs; 1416 } 1417 1418 int32_t seq; 1419 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1420 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1421 discontinuitySeq = seq; 1422 } 1423 } 1424 1425 if (pickTrack) { 1426 // selecting track, queue discontinuities before content 1427 sources[j]->clear(); 1428 if (j == kSubtitleIndex) { 1429 break; 1430 } 1431 sp<AnotherPacketSource> discontinuityQueue; 1432 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1433 discontinuityQueue->queueDiscontinuity( 1434 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1435 } else { 1436 // adapting, queue discontinuities after resume 1437 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1438 sources[j]->clear(); 1439 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1440 if (extraStreams & indexToType(j)) { 1441 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1442 } 1443 } 1444 } 1445 1446 streamMask &= ~indexToType(j); 1447 } 1448 } 1449 1450 fetcher->startAsync( 1451 sources[kAudioIndex], 1452 sources[kVideoIndex], 1453 sources[kSubtitleIndex], 1454 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1455 segmentStartTimeUs, 1456 discontinuitySeq, 1457 switching); 1458 } 1459 1460 // All fetchers have now been started, the configuration change 1461 // has completed. 1462 1463 cancelCheckBandwidthEvent(); 1464 scheduleCheckBandwidthEvent(); 1465 1466 ALOGV("XXX configuration change completed."); 1467 mReconfigurationInProgress = false; 1468 if (switching) { 1469 mSwitchInProgress = true; 1470 } else { 1471 mStreamMask = mNewStreamMask; 1472 } 1473 1474 if (mDisconnectReplyID != 0) { 1475 finishDisconnect(); 1476 } 1477} 1478 1479void LiveSession::onSwapped(const sp<AMessage> &msg) { 1480 int32_t switchGeneration; 1481 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1482 if (switchGeneration != mSwitchGeneration) { 1483 return; 1484 } 1485 1486 int32_t stream; 1487 CHECK(msg->findInt32("stream", &stream)); 1488 1489 ssize_t idx = typeToIndex(stream); 1490 CHECK(idx >= 0); 1491 if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) { 1492 ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str()); 1493 } 1494 mStreams[idx].mUri = mStreams[idx].mNewUri; 1495 mStreams[idx].mNewUri.clear(); 1496 1497 mSwapMask &= ~stream; 1498 if (mSwapMask != 0) { 1499 return; 1500 } 1501 1502 // Check if new variant contains extra streams. 1503 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1504 while (extraStreams) { 1505 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1506 swapPacketSource(extraStream); 1507 extraStreams &= ~extraStream; 1508 1509 idx = typeToIndex(extraStream); 1510 CHECK(idx >= 0); 1511 if (mStreams[idx].mNewUri.empty()) { 1512 ALOGW("swapping extra stream type %d %s to empty stream", 1513 extraStream, mStreams[idx].mUri.c_str()); 1514 } 1515 mStreams[idx].mUri = mStreams[idx].mNewUri; 1516 mStreams[idx].mNewUri.clear(); 1517 } 1518 1519 tryToFinishBandwidthSwitch(); 1520} 1521 1522void LiveSession::onCheckSwitchDown() { 1523 if (mSwitchDownMonitor == NULL) { 1524 return; 1525 } 1526 1527 for (size_t i = 0; i < kMaxStreams; ++i) { 1528 int32_t targetDuration; 1529 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1530 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1531 1532 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1533 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1534 int64_t targetDurationUs = targetDuration * 1000000ll; 1535 1536 if (bufferedDurationUs < targetDurationUs / 3) { 1537 (new AMessage(kWhatSwitchDown, id()))->post(); 1538 break; 1539 } 1540 } 1541 } 1542 1543 mSwitchDownMonitor->post(1000000ll); 1544} 1545 1546void LiveSession::onSwitchDown() { 1547 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1548 return; 1549 } 1550 1551 ssize_t bandwidthIndex = getBandwidthIndex(); 1552 if (bandwidthIndex < mCurBandwidthIndex) { 1553 changeConfiguration(-1, bandwidthIndex, false); 1554 return; 1555 } 1556 1557 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1558} 1559 1560// Mark switch done when: 1561// 1. all old buffers are swapped out 1562void LiveSession::tryToFinishBandwidthSwitch() { 1563 if (!mSwitchInProgress) { 1564 return; 1565 } 1566 1567 bool needToRemoveFetchers = false; 1568 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1569 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1570 needToRemoveFetchers = true; 1571 break; 1572 } 1573 } 1574 1575 if (!needToRemoveFetchers && mSwapMask == 0) { 1576 ALOGI("mSwitchInProgress = false"); 1577 mStreamMask = mNewStreamMask; 1578 mSwitchInProgress = false; 1579 } 1580} 1581 1582void LiveSession::scheduleCheckBandwidthEvent() { 1583 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1584 msg->setInt32("generation", mCheckBandwidthGeneration); 1585 msg->post(10000000ll); 1586} 1587 1588void LiveSession::cancelCheckBandwidthEvent() { 1589 ++mCheckBandwidthGeneration; 1590} 1591 1592void LiveSession::cancelBandwidthSwitch() { 1593 Mutex::Autolock lock(mSwapMutex); 1594 mSwitchGeneration++; 1595 mSwitchInProgress = false; 1596 mSwapMask = 0; 1597 1598 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1599 FetcherInfo& info = mFetcherInfos.editValueAt(i); 1600 if (info.mToBeRemoved) { 1601 info.mToBeRemoved = false; 1602 } 1603 } 1604 1605 for (size_t i = 0; i < kMaxStreams; ++i) { 1606 if (!mStreams[i].mNewUri.empty()) { 1607 ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri); 1608 if (j < 0) { 1609 mStreams[i].mNewUri.clear(); 1610 continue; 1611 } 1612 1613 const FetcherInfo &info = mFetcherInfos.valueAt(j); 1614 info.mFetcher->stopAsync(); 1615 mFetcherInfos.removeItemsAt(j); 1616 mStreams[i].mNewUri.clear(); 1617 } 1618 } 1619} 1620 1621bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1622 if (mReconfigurationInProgress || mSwitchInProgress) { 1623 return false; 1624 } 1625 1626 if (mCurBandwidthIndex < 0) { 1627 return true; 1628 } 1629 1630 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1631 return false; 1632 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1633 return canSwitchUp(); 1634 } else { 1635 return true; 1636 } 1637} 1638 1639void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { 1640 size_t bandwidthIndex = getBandwidthIndex(); 1641 if (canSwitchBandwidthTo(bandwidthIndex)) { 1642 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1643 } else { 1644 // Come back and check again 10 seconds later in case there is nothing to do now. 1645 // If we DO change configuration, once that completes it'll schedule a new 1646 // check bandwidth event with an incremented mCheckBandwidthGeneration. 1647 msg->post(10000000ll); 1648 } 1649} 1650 1651void LiveSession::postPrepared(status_t err) { 1652 CHECK(mInPreparationPhase); 1653 1654 sp<AMessage> notify = mNotify->dup(); 1655 if (err == OK || err == ERROR_END_OF_STREAM) { 1656 notify->setInt32("what", kWhatPrepared); 1657 } else { 1658 notify->setInt32("what", kWhatPreparationFailed); 1659 notify->setInt32("err", err); 1660 } 1661 1662 notify->post(); 1663 1664 mInPreparationPhase = false; 1665 1666 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1667 mSwitchDownMonitor->post(); 1668} 1669 1670} // namespace android 1671 1672