LiveSession.cpp revision 0ad776d2e4c6b4968d9dcd9bf34b962366b312a9
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mCurBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0), 72 mFirstTimeUsValid(false), 73 mFirstTimeUs(0), 74 mLastSeekTimeUs(0) { 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 84 } 85} 86 87LiveSession::~LiveSession() { 88} 89 90sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 91 ABuffer *discontinuity = new ABuffer(0); 92 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 93 discontinuity->meta()->setInt32("swapPacketSource", swap); 94 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 95 discontinuity->meta()->setInt64("timeUs", -1); 96 return discontinuity; 97} 98 99void LiveSession::swapPacketSource(StreamType stream) { 100 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 101 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 102 sp<AnotherPacketSource> tmp = aps; 103 aps = aps2; 104 aps2 = tmp; 105 aps2->clear(); 106} 107 108status_t LiveSession::dequeueAccessUnit( 109 StreamType stream, sp<ABuffer> *accessUnit) { 110 if (!(mStreamMask & stream)) { 111 // return -EWOULDBLOCK to avoid halting the decoder 112 // when switching between audio/video and audio only. 113 return -EWOULDBLOCK; 114 } 115 116 status_t finalResult; 117 sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); 118 if (discontinuityQueue->hasBufferAvailable(&finalResult)) { 119 discontinuityQueue->dequeueAccessUnit(accessUnit); 120 // seeking, track switching 121 sp<AMessage> extra; 122 int64_t timeUs; 123 if ((*accessUnit)->meta()->findMessage("extra", &extra) 124 && extra != NULL 125 && extra->findInt64("timeUs", &timeUs)) { 126 // seeking only 127 mLastSeekTimeUs = timeUs; 128 mDiscontinuityOffsetTimesUs.clear(); 129 mDiscontinuityAbsStartTimesUs.clear(); 130 } 131 return INFO_DISCONTINUITY; 132 } 133 134 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 135 136 if (!packetSource->hasBufferAvailable(&finalResult)) { 137 return finalResult == OK ? -EAGAIN : finalResult; 138 } 139 140 // wait for counterpart 141 sp<AnotherPacketSource> otherSource; 142 if (stream == STREAMTYPE_AUDIO && (mStreamMask & STREAMTYPE_VIDEO)) { 143 otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 144 } else if (stream == STREAMTYPE_VIDEO && (mStreamMask & STREAMTYPE_AUDIO)) { 145 otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 146 } 147 if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) { 148 return finalResult == OK ? -EAGAIN : finalResult; 149 } 150 151 status_t err = packetSource->dequeueAccessUnit(accessUnit); 152 153 size_t streamIdx; 154 const char *streamStr; 155 switch (stream) { 156 case STREAMTYPE_AUDIO: 157 streamIdx = kAudioIndex; 158 streamStr = "audio"; 159 break; 160 case STREAMTYPE_VIDEO: 161 streamIdx = kVideoIndex; 162 streamStr = "video"; 163 break; 164 case STREAMTYPE_SUBTITLES: 165 streamIdx = kSubtitleIndex; 166 streamStr = "subs"; 167 break; 168 default: 169 TRESPASS(); 170 } 171 172 StreamItem& strm = mStreams[streamIdx]; 173 if (err == INFO_DISCONTINUITY) { 174 // adaptive streaming, discontinuities in the playlist 175 int32_t type; 176 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 177 178 sp<AMessage> extra; 179 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 180 extra.clear(); 181 } 182 183 ALOGI("[%s] read discontinuity of type %d, extra = %s", 184 streamStr, 185 type, 186 extra == NULL ? "NULL" : extra->debugString().c_str()); 187 188 int32_t swap; 189 if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { 190 int32_t switchGeneration; 191 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 192 { 193 Mutex::Autolock lock(mSwapMutex); 194 if (switchGeneration == mSwitchGeneration) { 195 swapPacketSource(stream); 196 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 197 msg->setInt32("stream", stream); 198 msg->setInt32("switchGeneration", switchGeneration); 199 msg->post(); 200 } 201 } 202 } else { 203 size_t seq = strm.mCurDiscontinuitySeq; 204 int64_t offsetTimeUs; 205 if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) { 206 offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq); 207 } else { 208 offsetTimeUs = 0; 209 } 210 211 seq += 1; 212 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 213 int64_t firstTimeUs; 214 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 215 offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs; 216 offsetTimeUs += strm.mLastSampleDurationUs; 217 } else { 218 offsetTimeUs += strm.mLastSampleDurationUs; 219 } 220 221 mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs); 222 } 223 } else if (err == OK) { 224 225 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 226 int64_t timeUs; 227 int32_t discontinuitySeq = 0; 228 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 229 (*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq); 230 strm.mCurDiscontinuitySeq = discontinuitySeq; 231 232 int32_t discard = 0; 233 int64_t firstTimeUs; 234 if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) { 235 int64_t durUs; // approximate sample duration 236 if (timeUs > strm.mLastDequeuedTimeUs) { 237 durUs = timeUs - strm.mLastDequeuedTimeUs; 238 } else { 239 durUs = strm.mLastDequeuedTimeUs - timeUs; 240 } 241 strm.mLastSampleDurationUs = durUs; 242 firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq); 243 } else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) { 244 firstTimeUs = timeUs; 245 } else { 246 mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs); 247 firstTimeUs = timeUs; 248 } 249 250 strm.mLastDequeuedTimeUs = timeUs; 251 if (timeUs >= firstTimeUs) { 252 timeUs -= firstTimeUs; 253 } else { 254 timeUs = 0; 255 } 256 timeUs += mLastSeekTimeUs; 257 if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) { 258 timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq); 259 } 260 261 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 262 (*accessUnit)->meta()->setInt64("timeUs", timeUs); 263 mLastDequeuedTimeUs = timeUs; 264 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 265 } else if (stream == STREAMTYPE_SUBTITLES) { 266 (*accessUnit)->meta()->setInt32( 267 "trackIndex", mPlaylist->getSelectedIndex()); 268 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 269 } 270 } else { 271 ALOGI("[%s] encountered error %d", streamStr, err); 272 } 273 274 return err; 275} 276 277status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 278 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 279 if (!(mStreamMask & stream)) { 280 return UNKNOWN_ERROR; 281 } 282 283 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 284 285 sp<MetaData> meta = packetSource->getFormat(); 286 287 if (meta == NULL) { 288 return -EAGAIN; 289 } 290 291 return convertMetaDataToMessage(meta, format); 292} 293 294void LiveSession::connectAsync( 295 const char *url, const KeyedVector<String8, String8> *headers) { 296 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 297 msg->setString("url", url); 298 299 if (headers != NULL) { 300 msg->setPointer( 301 "headers", 302 new KeyedVector<String8, String8>(*headers)); 303 } 304 305 msg->post(); 306} 307 308status_t LiveSession::disconnect() { 309 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 310 311 sp<AMessage> response; 312 status_t err = msg->postAndAwaitResponse(&response); 313 314 return err; 315} 316 317status_t LiveSession::seekTo(int64_t timeUs) { 318 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 319 msg->setInt64("timeUs", timeUs); 320 321 sp<AMessage> response; 322 status_t err = msg->postAndAwaitResponse(&response); 323 324 uint32_t replyID; 325 CHECK(response == mSeekReply && 0 != mSeekReplyID); 326 mSeekReply.clear(); 327 mSeekReplyID = 0; 328 return err; 329} 330 331void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 332 switch (msg->what()) { 333 case kWhatConnect: 334 { 335 onConnect(msg); 336 break; 337 } 338 339 case kWhatDisconnect: 340 { 341 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 342 343 if (mReconfigurationInProgress) { 344 break; 345 } 346 347 finishDisconnect(); 348 break; 349 } 350 351 case kWhatSeek: 352 { 353 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 354 355 status_t err = onSeek(msg); 356 357 mSeekReply = new AMessage; 358 mSeekReply->setInt32("err", err); 359 break; 360 } 361 362 case kWhatFetcherNotify: 363 { 364 int32_t what; 365 CHECK(msg->findInt32("what", &what)); 366 367 switch (what) { 368 case PlaylistFetcher::kWhatStarted: 369 break; 370 case PlaylistFetcher::kWhatPaused: 371 case PlaylistFetcher::kWhatStopped: 372 { 373 if (what == PlaylistFetcher::kWhatStopped) { 374 AString uri; 375 CHECK(msg->findString("uri", &uri)); 376 if (mFetcherInfos.removeItem(uri) < 0) { 377 // ignore duplicated kWhatStopped messages. 378 break; 379 } 380 381 if (mSwitchInProgress) { 382 tryToFinishBandwidthSwitch(); 383 } 384 } 385 386 if (mContinuation != NULL) { 387 CHECK_GT(mContinuationCounter, 0); 388 if (--mContinuationCounter == 0) { 389 mContinuation->post(); 390 391 if (mSeekReplyID != 0) { 392 CHECK(mSeekReply != NULL); 393 mSeekReply->postReply(mSeekReplyID); 394 } 395 } 396 } 397 break; 398 } 399 400 case PlaylistFetcher::kWhatDurationUpdate: 401 { 402 AString uri; 403 CHECK(msg->findString("uri", &uri)); 404 405 int64_t durationUs; 406 CHECK(msg->findInt64("durationUs", &durationUs)); 407 408 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 409 info->mDurationUs = durationUs; 410 break; 411 } 412 413 case PlaylistFetcher::kWhatError: 414 { 415 status_t err; 416 CHECK(msg->findInt32("err", &err)); 417 418 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 419 420 if (mInPreparationPhase) { 421 postPrepared(err); 422 } 423 424 cancelBandwidthSwitch(); 425 426 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 427 428 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 429 430 mPacketSources.valueFor( 431 STREAMTYPE_SUBTITLES)->signalEOS(err); 432 433 sp<AMessage> notify = mNotify->dup(); 434 notify->setInt32("what", kWhatError); 435 notify->setInt32("err", err); 436 notify->post(); 437 break; 438 } 439 440 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 441 { 442 AString uri; 443 CHECK(msg->findString("uri", &uri)); 444 445 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 446 info->mIsPrepared = true; 447 448 if (mInPreparationPhase) { 449 bool allFetchersPrepared = true; 450 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 451 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 452 allFetchersPrepared = false; 453 break; 454 } 455 } 456 457 if (allFetchersPrepared) { 458 postPrepared(OK); 459 } 460 } 461 break; 462 } 463 464 case PlaylistFetcher::kWhatStartedAt: 465 { 466 int32_t switchGeneration; 467 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 468 469 if (switchGeneration != mSwitchGeneration) { 470 break; 471 } 472 473 // Resume fetcher for the original variant; the resumed fetcher should 474 // continue until the timestamps found in msg, which is stored by the 475 // new fetcher to indicate where the new variant has started buffering. 476 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 477 const FetcherInfo info = mFetcherInfos.valueAt(i); 478 if (info.mToBeRemoved) { 479 info.mFetcher->resumeUntilAsync(msg); 480 } 481 } 482 break; 483 } 484 485 default: 486 TRESPASS(); 487 } 488 489 break; 490 } 491 492 case kWhatCheckBandwidth: 493 { 494 int32_t generation; 495 CHECK(msg->findInt32("generation", &generation)); 496 497 if (generation != mCheckBandwidthGeneration) { 498 break; 499 } 500 501 onCheckBandwidth(); 502 break; 503 } 504 505 case kWhatChangeConfiguration: 506 { 507 onChangeConfiguration(msg); 508 break; 509 } 510 511 case kWhatChangeConfiguration2: 512 { 513 onChangeConfiguration2(msg); 514 break; 515 } 516 517 case kWhatChangeConfiguration3: 518 { 519 onChangeConfiguration3(msg); 520 break; 521 } 522 523 case kWhatFinishDisconnect2: 524 { 525 onFinishDisconnect2(); 526 break; 527 } 528 529 case kWhatSwapped: 530 { 531 onSwapped(msg); 532 break; 533 } 534 535 case kWhatCheckSwitchDown: 536 { 537 onCheckSwitchDown(); 538 break; 539 } 540 541 case kWhatSwitchDown: 542 { 543 onSwitchDown(); 544 break; 545 } 546 547 default: 548 TRESPASS(); 549 break; 550 } 551} 552 553// static 554int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 555 if (a->mBandwidth < b->mBandwidth) { 556 return -1; 557 } else if (a->mBandwidth == b->mBandwidth) { 558 return 0; 559 } 560 561 return 1; 562} 563 564// static 565LiveSession::StreamType LiveSession::indexToType(int idx) { 566 CHECK(idx >= 0 && idx < kMaxStreams); 567 return (StreamType)(1 << idx); 568} 569 570void LiveSession::onConnect(const sp<AMessage> &msg) { 571 AString url; 572 CHECK(msg->findString("url", &url)); 573 574 KeyedVector<String8, String8> *headers = NULL; 575 if (!msg->findPointer("headers", (void **)&headers)) { 576 mExtraHeaders.clear(); 577 } else { 578 mExtraHeaders = *headers; 579 580 delete headers; 581 headers = NULL; 582 } 583 584 // TODO currently we don't know if we are coming here from incognito mode 585 ALOGI("onConnect %s", uriDebugString(url).c_str()); 586 587 mMasterURL = url; 588 589 bool dummy; 590 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 591 592 if (mPlaylist == NULL) { 593 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 594 595 postPrepared(ERROR_IO); 596 return; 597 } 598 599 // We trust the content provider to make a reasonable choice of preferred 600 // initial bandwidth by listing it first in the variant playlist. 601 // At startup we really don't have a good estimate on the available 602 // network bandwidth since we haven't tranferred any data yet. Once 603 // we have we can make a better informed choice. 604 size_t initialBandwidth = 0; 605 size_t initialBandwidthIndex = 0; 606 607 if (mPlaylist->isVariantPlaylist()) { 608 for (size_t i = 0; i < mPlaylist->size(); ++i) { 609 BandwidthItem item; 610 611 item.mPlaylistIndex = i; 612 613 sp<AMessage> meta; 614 AString uri; 615 mPlaylist->itemAt(i, &uri, &meta); 616 617 unsigned long bandwidth; 618 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 619 620 if (initialBandwidth == 0) { 621 initialBandwidth = item.mBandwidth; 622 } 623 624 mBandwidthItems.push(item); 625 } 626 627 CHECK_GT(mBandwidthItems.size(), 0u); 628 629 mBandwidthItems.sort(SortByBandwidth); 630 631 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 632 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 633 initialBandwidthIndex = i; 634 break; 635 } 636 } 637 } else { 638 // dummy item. 639 BandwidthItem item; 640 item.mPlaylistIndex = 0; 641 item.mBandwidth = 0; 642 mBandwidthItems.push(item); 643 } 644 645 mPlaylist->pickRandomMediaItems(); 646 changeConfiguration( 647 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); 648} 649 650void LiveSession::finishDisconnect() { 651 // No reconfiguration is currently pending, make sure none will trigger 652 // during disconnection either. 653 cancelCheckBandwidthEvent(); 654 655 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 656 // (finishDisconnect, onFinishDisconnect2) 657 cancelBandwidthSwitch(); 658 659 // cancel switch down monitor 660 mSwitchDownMonitor.clear(); 661 662 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 663 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 664 } 665 666 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 667 668 mContinuationCounter = mFetcherInfos.size(); 669 mContinuation = msg; 670 671 if (mContinuationCounter == 0) { 672 msg->post(); 673 } 674} 675 676void LiveSession::onFinishDisconnect2() { 677 mContinuation.clear(); 678 679 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 680 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 681 682 mPacketSources.valueFor( 683 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 684 685 sp<AMessage> response = new AMessage; 686 response->setInt32("err", OK); 687 688 response->postReply(mDisconnectReplyID); 689 mDisconnectReplyID = 0; 690} 691 692sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 693 ssize_t index = mFetcherInfos.indexOfKey(uri); 694 695 if (index >= 0) { 696 return NULL; 697 } 698 699 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 700 notify->setString("uri", uri); 701 notify->setInt32("switchGeneration", mSwitchGeneration); 702 703 FetcherInfo info; 704 info.mFetcher = new PlaylistFetcher(notify, this, uri); 705 info.mDurationUs = -1ll; 706 info.mIsPrepared = false; 707 info.mToBeRemoved = false; 708 looper()->registerHandler(info.mFetcher); 709 710 mFetcherInfos.add(uri, info); 711 712 return info.mFetcher; 713} 714 715/* 716 * Illustration of parameters: 717 * 718 * 0 `range_offset` 719 * +------------+-------------------------------------------------------+--+--+ 720 * | | | next block to fetch | | | 721 * | | `source` handle => `out` buffer | | | | 722 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 723 * | |<----------- `range_length` / buffer capacity ----------->| | 724 * |<------------------------------ file_size ------------------------------->| 725 * 726 * Special parameter values: 727 * - range_length == -1 means entire file 728 * - block_size == 0 means entire range 729 * 730 */ 731ssize_t LiveSession::fetchFile( 732 const char *url, sp<ABuffer> *out, 733 int64_t range_offset, int64_t range_length, 734 uint32_t block_size, /* download block size */ 735 sp<DataSource> *source, /* to return and reuse source */ 736 String8 *actualUrl) { 737 off64_t size; 738 sp<DataSource> temp_source; 739 if (source == NULL) { 740 source = &temp_source; 741 } 742 743 if (*source == NULL) { 744 if (!strncasecmp(url, "file://", 7)) { 745 *source = new FileSource(url + 7); 746 } else if (strncasecmp(url, "http://", 7) 747 && strncasecmp(url, "https://", 8)) { 748 return ERROR_UNSUPPORTED; 749 } else { 750 KeyedVector<String8, String8> headers = mExtraHeaders; 751 if (range_offset > 0 || range_length >= 0) { 752 headers.add( 753 String8("Range"), 754 String8( 755 StringPrintf( 756 "bytes=%lld-%s", 757 range_offset, 758 range_length < 0 759 ? "" : StringPrintf("%lld", 760 range_offset + range_length - 1).c_str()).c_str())); 761 } 762 status_t err = mHTTPDataSource->connect(url, &headers); 763 764 if (err != OK) { 765 return err; 766 } 767 768 *source = mHTTPDataSource; 769 } 770 } 771 772 status_t getSizeErr = (*source)->getSize(&size); 773 if (getSizeErr != OK) { 774 size = 65536; 775 } 776 777 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 778 if (*out == NULL) { 779 buffer->setRange(0, 0); 780 } 781 782 ssize_t bytesRead = 0; 783 // adjust range_length if only reading partial block 784 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 785 range_length = buffer->size() + block_size; 786 } 787 for (;;) { 788 // Only resize when we don't know the size. 789 size_t bufferRemaining = buffer->capacity() - buffer->size(); 790 if (bufferRemaining == 0 && getSizeErr != OK) { 791 bufferRemaining = 32768; 792 793 ALOGV("increasing download buffer to %zu bytes", 794 buffer->size() + bufferRemaining); 795 796 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 797 memcpy(copy->data(), buffer->data(), buffer->size()); 798 copy->setRange(0, buffer->size()); 799 800 buffer = copy; 801 } 802 803 size_t maxBytesToRead = bufferRemaining; 804 if (range_length >= 0) { 805 int64_t bytesLeftInRange = range_length - buffer->size(); 806 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 807 maxBytesToRead = bytesLeftInRange; 808 809 if (bytesLeftInRange == 0) { 810 break; 811 } 812 } 813 } 814 815 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 816 // to help us break out of the loop. 817 ssize_t n = (*source)->readAt( 818 buffer->size(), buffer->data() + buffer->size(), 819 maxBytesToRead); 820 821 if (n < 0) { 822 return n; 823 } 824 825 if (n == 0) { 826 break; 827 } 828 829 buffer->setRange(0, buffer->size() + (size_t)n); 830 bytesRead += n; 831 } 832 833 *out = buffer; 834 if (actualUrl != NULL) { 835 *actualUrl = (*source)->getUri(); 836 if (actualUrl->isEmpty()) { 837 *actualUrl = url; 838 } 839 } 840 841 return bytesRead; 842} 843 844sp<M3UParser> LiveSession::fetchPlaylist( 845 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 846 ALOGV("fetchPlaylist '%s'", url); 847 848 *unchanged = false; 849 850 sp<ABuffer> buffer; 851 String8 actualUrl; 852 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 853 854 if (err <= 0) { 855 return NULL; 856 } 857 858 // MD5 functionality is not available on the simulator, treat all 859 // playlists as changed. 860 861#if defined(HAVE_ANDROID_OS) 862 uint8_t hash[16]; 863 864 MD5_CTX m; 865 MD5_Init(&m); 866 MD5_Update(&m, buffer->data(), buffer->size()); 867 868 MD5_Final(hash, &m); 869 870 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 871 // playlist unchanged 872 *unchanged = true; 873 874 return NULL; 875 } 876 877 if (curPlaylistHash != NULL) { 878 memcpy(curPlaylistHash, hash, sizeof(hash)); 879 } 880#endif 881 882 sp<M3UParser> playlist = 883 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 884 885 if (playlist->initCheck() != OK) { 886 ALOGE("failed to parse .m3u8 playlist"); 887 888 return NULL; 889 } 890 891 return playlist; 892} 893 894static double uniformRand() { 895 return (double)rand() / RAND_MAX; 896} 897 898size_t LiveSession::getBandwidthIndex() { 899 if (mBandwidthItems.size() == 0) { 900 return 0; 901 } 902 903#if 1 904 char value[PROPERTY_VALUE_MAX]; 905 ssize_t index = -1; 906 if (property_get("media.httplive.bw-index", value, NULL)) { 907 char *end; 908 index = strtol(value, &end, 10); 909 CHECK(end > value && *end == '\0'); 910 911 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 912 index = mBandwidthItems.size() - 1; 913 } 914 } 915 916 if (index < 0) { 917 int32_t bandwidthBps; 918 if (mHTTPDataSource != NULL 919 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 920 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 921 } else { 922 ALOGV("no bandwidth estimate."); 923 return 0; // Pick the lowest bandwidth stream by default. 924 } 925 926 char value[PROPERTY_VALUE_MAX]; 927 if (property_get("media.httplive.max-bw", value, NULL)) { 928 char *end; 929 long maxBw = strtoul(value, &end, 10); 930 if (end > value && *end == '\0') { 931 if (maxBw > 0 && bandwidthBps > maxBw) { 932 ALOGV("bandwidth capped to %ld bps", maxBw); 933 bandwidthBps = maxBw; 934 } 935 } 936 } 937 938 // Consider only 80% of the available bandwidth usable. 939 bandwidthBps = (bandwidthBps * 8) / 10; 940 941 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 942 943 index = mBandwidthItems.size() - 1; 944 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 945 > (size_t)bandwidthBps) { 946 --index; 947 } 948 } 949#elif 0 950 // Change bandwidth at random() 951 size_t index = uniformRand() * mBandwidthItems.size(); 952#elif 0 953 // There's a 50% chance to stay on the current bandwidth and 954 // a 50% chance to switch to the next higher bandwidth (wrapping around 955 // to lowest) 956 const size_t kMinIndex = 0; 957 958 static ssize_t mCurBandwidthIndex = -1; 959 960 size_t index; 961 if (mCurBandwidthIndex < 0) { 962 index = kMinIndex; 963 } else if (uniformRand() < 0.5) { 964 index = (size_t)mCurBandwidthIndex; 965 } else { 966 index = mCurBandwidthIndex + 1; 967 if (index == mBandwidthItems.size()) { 968 index = kMinIndex; 969 } 970 } 971 mCurBandwidthIndex = index; 972#elif 0 973 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 974 975 size_t index = mBandwidthItems.size() - 1; 976 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 977 --index; 978 } 979#elif 1 980 char value[PROPERTY_VALUE_MAX]; 981 size_t index; 982 if (property_get("media.httplive.bw-index", value, NULL)) { 983 char *end; 984 index = strtoul(value, &end, 10); 985 CHECK(end > value && *end == '\0'); 986 987 if (index >= mBandwidthItems.size()) { 988 index = mBandwidthItems.size() - 1; 989 } 990 } else { 991 index = 0; 992 } 993#else 994 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 995#endif 996 997 CHECK_GE(index, 0); 998 999 return index; 1000} 1001 1002status_t LiveSession::onSeek(const sp<AMessage> &msg) { 1003 int64_t timeUs; 1004 CHECK(msg->findInt64("timeUs", &timeUs)); 1005 1006 if (!mReconfigurationInProgress) { 1007 changeConfiguration(timeUs, getBandwidthIndex()); 1008 } 1009 1010 return OK; 1011} 1012 1013status_t LiveSession::getDuration(int64_t *durationUs) const { 1014 int64_t maxDurationUs = 0ll; 1015 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1016 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 1017 1018 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 1019 maxDurationUs = fetcherDurationUs; 1020 } 1021 } 1022 1023 *durationUs = maxDurationUs; 1024 1025 return OK; 1026} 1027 1028bool LiveSession::isSeekable() const { 1029 int64_t durationUs; 1030 return getDuration(&durationUs) == OK && durationUs >= 0; 1031} 1032 1033bool LiveSession::hasDynamicDuration() const { 1034 return false; 1035} 1036 1037size_t LiveSession::getTrackCount() const { 1038 if (mPlaylist == NULL) { 1039 return 0; 1040 } else { 1041 return mPlaylist->getTrackCount(); 1042 } 1043} 1044 1045sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 1046 if (mPlaylist == NULL) { 1047 return NULL; 1048 } else { 1049 return mPlaylist->getTrackInfo(trackIndex); 1050 } 1051} 1052 1053status_t LiveSession::selectTrack(size_t index, bool select) { 1054 status_t err = mPlaylist->selectTrack(index, select); 1055 if (err == OK) { 1056 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration, id()); 1057 msg->setInt32("bandwidthIndex", mCurBandwidthIndex); 1058 msg->setInt32("pickTrack", select); 1059 msg->post(); 1060 } 1061 return err; 1062} 1063 1064bool LiveSession::canSwitchUp() { 1065 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 1066 status_t err = OK; 1067 for (size_t i = 0; i < mPacketSources.size(); ++i) { 1068 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 1069 int64_t dur = source->getBufferedDurationUs(&err); 1070 if (err == OK && dur > 10000000) { 1071 return true; 1072 } 1073 } 1074 return false; 1075} 1076 1077void LiveSession::changeConfiguration( 1078 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 1079 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 1080 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 1081 cancelBandwidthSwitch(); 1082 1083 CHECK(!mReconfigurationInProgress); 1084 mReconfigurationInProgress = true; 1085 1086 mCurBandwidthIndex = bandwidthIndex; 1087 1088 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 1089 timeUs, bandwidthIndex, pickTrack); 1090 1091 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 1092 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 1093 1094 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 1095 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 1096 1097 AString URIs[kMaxStreams]; 1098 for (size_t i = 0; i < kMaxStreams; ++i) { 1099 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 1100 streamMask |= indexToType(i); 1101 } 1102 } 1103 1104 // Step 1, stop and discard fetchers that are no longer needed. 1105 // Pause those that we'll reuse. 1106 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1107 const AString &uri = mFetcherInfos.keyAt(i); 1108 1109 bool discardFetcher = true; 1110 1111 // If we're seeking all current fetchers are discarded. 1112 if (timeUs < 0ll) { 1113 // delay fetcher removal if not picking tracks 1114 discardFetcher = pickTrack; 1115 1116 for (size_t j = 0; j < kMaxStreams; ++j) { 1117 StreamType type = indexToType(j); 1118 if ((streamMask & type) && uri == URIs[j]) { 1119 resumeMask |= type; 1120 streamMask &= ~type; 1121 discardFetcher = false; 1122 } 1123 } 1124 } 1125 1126 if (discardFetcher) { 1127 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1128 } else { 1129 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1130 } 1131 } 1132 1133 sp<AMessage> msg; 1134 if (timeUs < 0ll) { 1135 // skip onChangeConfiguration2 (decoder destruction) if not seeking. 1136 msg = new AMessage(kWhatChangeConfiguration3, id()); 1137 } else { 1138 msg = new AMessage(kWhatChangeConfiguration2, id()); 1139 } 1140 msg->setInt32("streamMask", streamMask); 1141 msg->setInt32("resumeMask", resumeMask); 1142 msg->setInt32("pickTrack", pickTrack); 1143 msg->setInt64("timeUs", timeUs); 1144 for (size_t i = 0; i < kMaxStreams; ++i) { 1145 if ((streamMask | resumeMask) & indexToType(i)) { 1146 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1147 } 1148 } 1149 1150 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1151 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1152 // fetchers have completed their asynchronous operation, we'll post 1153 // mContinuation, which then is handled below in onChangeConfiguration2. 1154 mContinuationCounter = mFetcherInfos.size(); 1155 mContinuation = msg; 1156 1157 if (mContinuationCounter == 0) { 1158 msg->post(); 1159 1160 if (mSeekReplyID != 0) { 1161 CHECK(mSeekReply != NULL); 1162 mSeekReply->postReply(mSeekReplyID); 1163 } 1164 } 1165} 1166 1167void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1168 if (!mReconfigurationInProgress) { 1169 int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex; 1170 msg->findInt32("pickTrack", &pickTrack); 1171 msg->findInt32("bandwidthIndex", &bandwidthIndex); 1172 changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack); 1173 } else { 1174 msg->post(1000000ll); // retry in 1 sec 1175 } 1176} 1177 1178void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1179 mContinuation.clear(); 1180 1181 // All fetchers are either suspended or have been removed now. 1182 1183 uint32_t streamMask, resumeMask; 1184 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1185 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1186 1187 // currently onChangeConfiguration2 is only called for seeking; 1188 // remove the following CHECK if using it else where. 1189 CHECK_EQ(resumeMask, 0); 1190 streamMask |= resumeMask; 1191 1192 AString URIs[kMaxStreams]; 1193 for (size_t i = 0; i < kMaxStreams; ++i) { 1194 if (streamMask & indexToType(i)) { 1195 const AString &uriKey = mStreams[i].uriKey(); 1196 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1197 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1198 } 1199 } 1200 1201 // Determine which decoders to shutdown on the player side, 1202 // a decoder has to be shutdown if either 1203 // 1) its streamtype was active before but now longer isn't. 1204 // or 1205 // 2) its streamtype was already active and still is but the URI 1206 // has changed. 1207 uint32_t changedMask = 0; 1208 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1209 if (((mStreamMask & streamMask & indexToType(i)) 1210 && !(URIs[i] == mStreams[i].mUri)) 1211 || (mStreamMask & ~streamMask & indexToType(i))) { 1212 changedMask |= indexToType(i); 1213 } 1214 } 1215 1216 if (changedMask == 0) { 1217 // If nothing changed as far as the audio/video decoders 1218 // are concerned we can proceed. 1219 onChangeConfiguration3(msg); 1220 return; 1221 } 1222 1223 // Something changed, inform the player which will shutdown the 1224 // corresponding decoders and will post the reply once that's done. 1225 // Handling the reply will continue executing below in 1226 // onChangeConfiguration3. 1227 sp<AMessage> notify = mNotify->dup(); 1228 notify->setInt32("what", kWhatStreamsChanged); 1229 notify->setInt32("changedMask", changedMask); 1230 1231 msg->setWhat(kWhatChangeConfiguration3); 1232 msg->setTarget(id()); 1233 1234 notify->setMessage("reply", msg); 1235 notify->post(); 1236} 1237 1238void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1239 mContinuation.clear(); 1240 // All remaining fetchers are still suspended, the player has shutdown 1241 // any decoders that needed it. 1242 1243 uint32_t streamMask, resumeMask; 1244 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1245 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1246 1247 for (size_t i = 0; i < kMaxStreams; ++i) { 1248 if (streamMask & indexToType(i)) { 1249 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1250 } 1251 } 1252 1253 int64_t timeUs; 1254 int32_t pickTrack; 1255 bool switching = false; 1256 CHECK(msg->findInt64("timeUs", &timeUs)); 1257 CHECK(msg->findInt32("pickTrack", &pickTrack)); 1258 1259 if (timeUs < 0ll) { 1260 if (!pickTrack) { 1261 switching = true; 1262 } 1263 mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs; 1264 } else { 1265 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1266 } 1267 1268 mNewStreamMask = streamMask | resumeMask; 1269 1270 // Of all existing fetchers: 1271 // * Resume fetchers that are still needed and assign them original packet sources. 1272 // * Mark otherwise unneeded fetchers for removal. 1273 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1274 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1275 const AString &uri = mFetcherInfos.keyAt(i); 1276 1277 sp<AnotherPacketSource> sources[kMaxStreams]; 1278 for (size_t j = 0; j < kMaxStreams; ++j) { 1279 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1280 sources[j] = mPacketSources.valueFor(indexToType(j)); 1281 1282 if (j != kSubtitleIndex) { 1283 ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); 1284 sp<AnotherPacketSource> discontinuityQueue; 1285 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1286 discontinuityQueue->queueDiscontinuity( 1287 ATSParser::DISCONTINUITY_NONE, 1288 NULL, 1289 true); 1290 } 1291 } 1292 } 1293 1294 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1295 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1296 || sources[kSubtitleIndex] != NULL) { 1297 info.mFetcher->startAsync( 1298 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1299 } else { 1300 info.mToBeRemoved = true; 1301 } 1302 } 1303 1304 // streamMask now only contains the types that need a new fetcher created. 1305 1306 if (streamMask != 0) { 1307 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1308 } 1309 1310 // Find out when the original fetchers have buffered up to and start the new fetchers 1311 // at a later timestamp. 1312 for (size_t i = 0; i < kMaxStreams; i++) { 1313 if (!(indexToType(i) & streamMask)) { 1314 continue; 1315 } 1316 1317 AString uri; 1318 uri = mStreams[i].mUri; 1319 1320 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1321 CHECK(fetcher != NULL); 1322 1323 int32_t latestSeq = -1; 1324 int64_t startTimeUs = -1; 1325 int64_t segmentStartTimeUs = -1ll; 1326 int32_t discontinuitySeq = -1; 1327 sp<AnotherPacketSource> sources[kMaxStreams]; 1328 1329 // TRICKY: looping from i as earlier streams are already removed from streamMask 1330 for (size_t j = i; j < kMaxStreams; ++j) { 1331 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1332 sources[j] = mPacketSources.valueFor(indexToType(j)); 1333 1334 if (timeUs >= 0) { 1335 sources[j]->clear(); 1336 startTimeUs = timeUs; 1337 1338 sp<AnotherPacketSource> discontinuityQueue; 1339 sp<AMessage> extra = new AMessage; 1340 extra->setInt64("timeUs", timeUs); 1341 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1342 discontinuityQueue->queueDiscontinuity( 1343 ATSParser::DISCONTINUITY_SEEK, extra, true); 1344 } else { 1345 int32_t type; 1346 int64_t srcSegmentStartTimeUs; 1347 sp<AMessage> meta; 1348 if (pickTrack) { 1349 // selecting 1350 meta = sources[j]->getLatestDequeuedMeta(); 1351 } else { 1352 // adapting 1353 meta = sources[j]->getLatestEnqueuedMeta(); 1354 } 1355 1356 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1357 int64_t tmpUs; 1358 CHECK(meta->findInt64("timeUs", &tmpUs)); 1359 if (startTimeUs < 0 || tmpUs < startTimeUs) { 1360 startTimeUs = tmpUs; 1361 } 1362 1363 CHECK(meta->findInt64("segmentStartTimeUs", &tmpUs)); 1364 if (segmentStartTimeUs < 0 || tmpUs < segmentStartTimeUs) { 1365 segmentStartTimeUs = tmpUs; 1366 } 1367 1368 int32_t seq; 1369 CHECK(meta->findInt32("discontinuitySeq", &seq)); 1370 if (discontinuitySeq < 0 || seq < discontinuitySeq) { 1371 discontinuitySeq = seq; 1372 } 1373 } 1374 1375 if (pickTrack) { 1376 // selecting track, queue discontinuities before content 1377 sources[j]->clear(); 1378 if (j == kSubtitleIndex) { 1379 break; 1380 } 1381 sp<AnotherPacketSource> discontinuityQueue; 1382 discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); 1383 discontinuityQueue->queueDiscontinuity( 1384 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); 1385 } else { 1386 // adapting, queue discontinuities after resume 1387 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1388 sources[j]->clear(); 1389 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1390 if (extraStreams & indexToType(j)) { 1391 sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false)); 1392 } 1393 } 1394 } 1395 1396 streamMask &= ~indexToType(j); 1397 } 1398 } 1399 1400 fetcher->startAsync( 1401 sources[kAudioIndex], 1402 sources[kVideoIndex], 1403 sources[kSubtitleIndex], 1404 startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs, 1405 segmentStartTimeUs, 1406 discontinuitySeq, 1407 switching); 1408 } 1409 1410 // All fetchers have now been started, the configuration change 1411 // has completed. 1412 1413 scheduleCheckBandwidthEvent(); 1414 1415 ALOGV("XXX configuration change completed."); 1416 mReconfigurationInProgress = false; 1417 if (switching) { 1418 mSwitchInProgress = true; 1419 mSwapMask = streamMask; 1420 } else { 1421 mStreamMask = mNewStreamMask; 1422 } 1423 1424 if (mDisconnectReplyID != 0) { 1425 finishDisconnect(); 1426 } 1427} 1428 1429void LiveSession::onSwapped(const sp<AMessage> &msg) { 1430 int32_t switchGeneration; 1431 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1432 if (switchGeneration != mSwitchGeneration) { 1433 return; 1434 } 1435 1436 int32_t stream; 1437 CHECK(msg->findInt32("stream", &stream)); 1438 mSwapMask &= ~stream; 1439 if (mSwapMask != 0) { 1440 return; 1441 } 1442 1443 // Check if new variant contains extra streams. 1444 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1445 while (extraStreams) { 1446 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1447 swapPacketSource(extraStream); 1448 extraStreams &= ~extraStream; 1449 } 1450 1451 tryToFinishBandwidthSwitch(); 1452} 1453 1454void LiveSession::onCheckSwitchDown() { 1455 if (mSwitchDownMonitor == NULL) { 1456 return; 1457 } 1458 1459 for (size_t i = 0; i < kMaxStreams; ++i) { 1460 int32_t targetDuration; 1461 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); 1462 sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); 1463 1464 if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { 1465 int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); 1466 int64_t targetDurationUs = targetDuration * 1000000ll; 1467 1468 if (bufferedDurationUs < targetDurationUs / 3) { 1469 (new AMessage(kWhatSwitchDown, id()))->post(); 1470 break; 1471 } 1472 } 1473 } 1474 1475 mSwitchDownMonitor->post(1000000ll); 1476} 1477 1478void LiveSession::onSwitchDown() { 1479 if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { 1480 return; 1481 } 1482 1483 ssize_t bandwidthIndex = getBandwidthIndex(); 1484 if (bandwidthIndex < mCurBandwidthIndex) { 1485 changeConfiguration(-1, bandwidthIndex, false); 1486 return; 1487 } 1488 1489 changeConfiguration(-1, mCurBandwidthIndex - 1, false); 1490} 1491 1492// Mark switch done when: 1493// 1. all old buffers are swapped out 1494void LiveSession::tryToFinishBandwidthSwitch() { 1495 if (!mSwitchInProgress) { 1496 return; 1497 } 1498 1499 bool needToRemoveFetchers = false; 1500 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1501 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1502 needToRemoveFetchers = true; 1503 break; 1504 } 1505 } 1506 1507 if (!needToRemoveFetchers && mSwapMask == 0) { 1508 ALOGI("mSwitchInProgress = false"); 1509 mStreamMask = mNewStreamMask; 1510 mSwitchInProgress = false; 1511 } 1512} 1513 1514void LiveSession::scheduleCheckBandwidthEvent() { 1515 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1516 msg->setInt32("generation", mCheckBandwidthGeneration); 1517 msg->post(10000000ll); 1518} 1519 1520void LiveSession::cancelCheckBandwidthEvent() { 1521 ++mCheckBandwidthGeneration; 1522} 1523 1524void LiveSession::cancelBandwidthSwitch() { 1525 Mutex::Autolock lock(mSwapMutex); 1526 mSwitchGeneration++; 1527 mSwitchInProgress = false; 1528 mSwapMask = 0; 1529} 1530 1531bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1532 if (mReconfigurationInProgress || mSwitchInProgress) { 1533 return false; 1534 } 1535 1536 if (mCurBandwidthIndex < 0) { 1537 return true; 1538 } 1539 1540 if (bandwidthIndex == (size_t)mCurBandwidthIndex) { 1541 return false; 1542 } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { 1543 return canSwitchUp(); 1544 } else { 1545 return true; 1546 } 1547} 1548 1549void LiveSession::onCheckBandwidth() { 1550 size_t bandwidthIndex = getBandwidthIndex(); 1551 if (canSwitchBandwidthTo(bandwidthIndex)) { 1552 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1553 } else { 1554 scheduleCheckBandwidthEvent(); 1555 } 1556 1557 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1558 // schedule another one on return, only an explicit call to 1559 // scheduleCheckBandwidthEvent will do that. 1560 // This ensures that only one configuration change is ongoing at any 1561 // one time, once that completes it'll schedule another check bandwidth 1562 // event. 1563} 1564 1565void LiveSession::postPrepared(status_t err) { 1566 CHECK(mInPreparationPhase); 1567 1568 sp<AMessage> notify = mNotify->dup(); 1569 if (err == OK || err == ERROR_END_OF_STREAM) { 1570 notify->setInt32("what", kWhatPrepared); 1571 } else { 1572 notify->setInt32("what", kWhatPreparationFailed); 1573 notify->setInt32("err", err); 1574 } 1575 1576 notify->post(); 1577 1578 mInPreparationPhase = false; 1579 1580 mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id()); 1581 mSwitchDownMonitor->post(); 1582} 1583 1584} // namespace android 1585 1586