// LiveSession.cpp revision 7d3044d64294cca6fadd184648a57185e92cf5c6
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <utils/Mutex.h> 41 42#include <ctype.h> 43#include <openssl/aes.h> 44#include <openssl/md5.h> 45 46namespace android { 47 48LiveSession::LiveSession( 49 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 50 : mNotify(notify), 51 mFlags(flags), 52 mUIDValid(uidValid), 53 mUID(uid), 54 mInPreparationPhase(true), 55 mHTTPDataSource( 56 HTTPBase::Create( 57 (mFlags & kFlagIncognito) 58 ? 
HTTPBase::kFlagIncognito 59 : 0)), 60 mPrevBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0) { 72 if (mUIDValid) { 73 mHTTPDataSource->setUID(mUID); 74 } 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 } 84} 85 86LiveSession::~LiveSession() { 87} 88 89sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 90 ABuffer *discontinuity = new ABuffer(0); 91 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 92 discontinuity->meta()->setInt32("swapPacketSource", swap); 93 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 94 discontinuity->meta()->setInt64("timeUs", -1); 95 return discontinuity; 96} 97 98void LiveSession::swapPacketSource(StreamType stream) { 99 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 100 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 101 sp<AnotherPacketSource> tmp = aps; 102 aps = aps2; 103 aps2 = tmp; 104 aps2->clear(); 105} 106 107status_t LiveSession::dequeueAccessUnit( 108 StreamType stream, sp<ABuffer> *accessUnit) { 109 if (!(mStreamMask & stream)) { 110 // return -EWOULDBLOCK to avoid halting the decoder 111 // when switching between audio/video and audio only. 
112 return -EWOULDBLOCK; 113 } 114 115 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 116 117 status_t finalResult; 118 if (!packetSource->hasBufferAvailable(&finalResult)) { 119 return finalResult == OK ? -EAGAIN : finalResult; 120 } 121 122 status_t err = packetSource->dequeueAccessUnit(accessUnit); 123 124 const char *streamStr; 125 switch (stream) { 126 case STREAMTYPE_AUDIO: 127 streamStr = "audio"; 128 break; 129 case STREAMTYPE_VIDEO: 130 streamStr = "video"; 131 break; 132 case STREAMTYPE_SUBTITLES: 133 streamStr = "subs"; 134 break; 135 default: 136 TRESPASS(); 137 } 138 139 if (err == INFO_DISCONTINUITY) { 140 int32_t type; 141 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 142 143 sp<AMessage> extra; 144 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 145 extra.clear(); 146 } 147 148 ALOGI("[%s] read discontinuity of type %d, extra = %s", 149 streamStr, 150 type, 151 extra == NULL ? "NULL" : extra->debugString().c_str()); 152 153 int32_t swap; 154 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 155 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 156 && swap) { 157 158 int32_t switchGeneration; 159 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 160 { 161 Mutex::Autolock lock(mSwapMutex); 162 if (switchGeneration == mSwitchGeneration) { 163 swapPacketSource(stream); 164 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 165 msg->setInt32("stream", stream); 166 msg->setInt32("switchGeneration", switchGeneration); 167 msg->post(); 168 } 169 } 170 } 171 } else if (err == OK) { 172 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 173 int64_t timeUs; 174 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 175 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 176 177 mLastDequeuedTimeUs = timeUs; 178 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 179 } else if (stream == STREAMTYPE_SUBTITLES) { 180 
(*accessUnit)->meta()->setInt32( 181 "trackIndex", mPlaylist->getSelectedIndex()); 182 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 183 } 184 } else { 185 ALOGI("[%s] encountered error %d", streamStr, err); 186 } 187 188 return err; 189} 190 191status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 192 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 193 if (!(mStreamMask & stream)) { 194 return UNKNOWN_ERROR; 195 } 196 197 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 198 199 sp<MetaData> meta = packetSource->getFormat(); 200 201 if (meta == NULL) { 202 return -EAGAIN; 203 } 204 205 return convertMetaDataToMessage(meta, format); 206} 207 208void LiveSession::connectAsync( 209 const char *url, const KeyedVector<String8, String8> *headers) { 210 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 211 msg->setString("url", url); 212 213 if (headers != NULL) { 214 msg->setPointer( 215 "headers", 216 new KeyedVector<String8, String8>(*headers)); 217 } 218 219 msg->post(); 220} 221 222status_t LiveSession::disconnect() { 223 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 224 225 sp<AMessage> response; 226 status_t err = msg->postAndAwaitResponse(&response); 227 228 return err; 229} 230 231status_t LiveSession::seekTo(int64_t timeUs) { 232 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 233 msg->setInt64("timeUs", timeUs); 234 235 sp<AMessage> response; 236 status_t err = msg->postAndAwaitResponse(&response); 237 238 uint32_t replyID; 239 CHECK(response == mSeekReply && 0 != mSeekReplyID); 240 mSeekReply.clear(); 241 mSeekReplyID = 0; 242 return err; 243} 244 245void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 246 switch (msg->what()) { 247 case kWhatConnect: 248 { 249 onConnect(msg); 250 break; 251 } 252 253 case kWhatDisconnect: 254 { 255 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 256 257 if (mReconfigurationInProgress) { 258 
break; 259 } 260 261 finishDisconnect(); 262 break; 263 } 264 265 case kWhatSeek: 266 { 267 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 268 269 status_t err = onSeek(msg); 270 271 mSeekReply = new AMessage; 272 mSeekReply->setInt32("err", err); 273 break; 274 } 275 276 case kWhatFetcherNotify: 277 { 278 int32_t what; 279 CHECK(msg->findInt32("what", &what)); 280 281 switch (what) { 282 case PlaylistFetcher::kWhatStarted: 283 break; 284 case PlaylistFetcher::kWhatPaused: 285 case PlaylistFetcher::kWhatStopped: 286 { 287 if (what == PlaylistFetcher::kWhatStopped) { 288 AString uri; 289 CHECK(msg->findString("uri", &uri)); 290 if (mFetcherInfos.removeItem(uri) < 0) { 291 // ignore duplicated kWhatStopped messages. 292 break; 293 } 294 295 tryToFinishBandwidthSwitch(); 296 } 297 298 if (mContinuation != NULL) { 299 CHECK_GT(mContinuationCounter, 0); 300 if (--mContinuationCounter == 0) { 301 mContinuation->post(); 302 303 if (mSeekReplyID != 0) { 304 CHECK(mSeekReply != NULL); 305 mSeekReply->postReply(mSeekReplyID); 306 } 307 } 308 } 309 break; 310 } 311 312 case PlaylistFetcher::kWhatDurationUpdate: 313 { 314 AString uri; 315 CHECK(msg->findString("uri", &uri)); 316 317 int64_t durationUs; 318 CHECK(msg->findInt64("durationUs", &durationUs)); 319 320 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 321 info->mDurationUs = durationUs; 322 break; 323 } 324 325 case PlaylistFetcher::kWhatError: 326 { 327 status_t err; 328 CHECK(msg->findInt32("err", &err)); 329 330 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 331 332 if (mInPreparationPhase) { 333 postPrepared(err); 334 } 335 336 cancelBandwidthSwitch(); 337 338 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 339 340 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 341 342 mPacketSources.valueFor( 343 STREAMTYPE_SUBTITLES)->signalEOS(err); 344 345 sp<AMessage> notify = mNotify->dup(); 346 notify->setInt32("what", kWhatError); 347 notify->setInt32("err", err); 348 
notify->post(); 349 break; 350 } 351 352 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 353 { 354 AString uri; 355 CHECK(msg->findString("uri", &uri)); 356 357 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 358 info->mIsPrepared = true; 359 360 if (mInPreparationPhase) { 361 bool allFetchersPrepared = true; 362 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 363 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 364 allFetchersPrepared = false; 365 break; 366 } 367 } 368 369 if (allFetchersPrepared) { 370 postPrepared(OK); 371 } 372 } 373 break; 374 } 375 376 case PlaylistFetcher::kWhatStartedAt: 377 { 378 int32_t switchGeneration; 379 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 380 381 if (switchGeneration != mSwitchGeneration) { 382 break; 383 } 384 385 // Resume fetcher for the original variant; the resumed fetcher should 386 // continue until the timestamps found in msg, which is stored by the 387 // new fetcher to indicate where the new variant has started buffering. 
388 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 389 const FetcherInfo info = mFetcherInfos.valueAt(i); 390 if (info.mToBeRemoved) { 391 info.mFetcher->resumeUntilAsync(msg); 392 } 393 } 394 break; 395 } 396 397 default: 398 TRESPASS(); 399 } 400 401 break; 402 } 403 404 case kWhatCheckBandwidth: 405 { 406 int32_t generation; 407 CHECK(msg->findInt32("generation", &generation)); 408 409 if (generation != mCheckBandwidthGeneration) { 410 break; 411 } 412 413 onCheckBandwidth(); 414 break; 415 } 416 417 case kWhatChangeConfiguration: 418 { 419 onChangeConfiguration(msg); 420 break; 421 } 422 423 case kWhatChangeConfiguration2: 424 { 425 onChangeConfiguration2(msg); 426 break; 427 } 428 429 case kWhatChangeConfiguration3: 430 { 431 onChangeConfiguration3(msg); 432 break; 433 } 434 435 case kWhatFinishDisconnect2: 436 { 437 onFinishDisconnect2(); 438 break; 439 } 440 441 case kWhatSwapped: 442 { 443 onSwapped(msg); 444 break; 445 } 446 default: 447 TRESPASS(); 448 break; 449 } 450} 451 452// static 453int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 454 if (a->mBandwidth < b->mBandwidth) { 455 return -1; 456 } else if (a->mBandwidth == b->mBandwidth) { 457 return 0; 458 } 459 460 return 1; 461} 462 463// static 464LiveSession::StreamType LiveSession::indexToType(int idx) { 465 CHECK(idx >= 0 && idx < kMaxStreams); 466 return (StreamType)(1 << idx); 467} 468 469void LiveSession::onConnect(const sp<AMessage> &msg) { 470 AString url; 471 CHECK(msg->findString("url", &url)); 472 473 KeyedVector<String8, String8> *headers = NULL; 474 if (!msg->findPointer("headers", (void **)&headers)) { 475 mExtraHeaders.clear(); 476 } else { 477 mExtraHeaders = *headers; 478 479 delete headers; 480 headers = NULL; 481 } 482 483#if 1 484 ALOGI("onConnect <URL suppressed>"); 485#else 486 ALOGI("onConnect %s", url.c_str()); 487#endif 488 489 mMasterURL = url; 490 491 bool dummy; 492 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, 
&dummy); 493 494 if (mPlaylist == NULL) { 495 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 496 497 postPrepared(ERROR_IO); 498 return; 499 } 500 501 // We trust the content provider to make a reasonable choice of preferred 502 // initial bandwidth by listing it first in the variant playlist. 503 // At startup we really don't have a good estimate on the available 504 // network bandwidth since we haven't tranferred any data yet. Once 505 // we have we can make a better informed choice. 506 size_t initialBandwidth = 0; 507 size_t initialBandwidthIndex = 0; 508 509 if (mPlaylist->isVariantPlaylist()) { 510 for (size_t i = 0; i < mPlaylist->size(); ++i) { 511 BandwidthItem item; 512 513 item.mPlaylistIndex = i; 514 515 sp<AMessage> meta; 516 AString uri; 517 mPlaylist->itemAt(i, &uri, &meta); 518 519 unsigned long bandwidth; 520 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 521 522 if (initialBandwidth == 0) { 523 initialBandwidth = item.mBandwidth; 524 } 525 526 mBandwidthItems.push(item); 527 } 528 529 CHECK_GT(mBandwidthItems.size(), 0u); 530 531 mBandwidthItems.sort(SortByBandwidth); 532 533 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 534 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 535 initialBandwidthIndex = i; 536 break; 537 } 538 } 539 } else { 540 // dummy item. 541 BandwidthItem item; 542 item.mPlaylistIndex = 0; 543 item.mBandwidth = 0; 544 mBandwidthItems.push(item); 545 } 546 547 changeConfiguration( 548 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 549} 550 551void LiveSession::finishDisconnect() { 552 // No reconfiguration is currently pending, make sure none will trigger 553 // during disconnection either. 554 cancelCheckBandwidthEvent(); 555 556 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 
557 // (finishDisconnect, onFinishDisconnect2) 558 cancelBandwidthSwitch(); 559 560 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 561 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 562 } 563 564 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 565 566 mContinuationCounter = mFetcherInfos.size(); 567 mContinuation = msg; 568 569 if (mContinuationCounter == 0) { 570 msg->post(); 571 } 572} 573 574void LiveSession::onFinishDisconnect2() { 575 mContinuation.clear(); 576 577 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 578 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 579 580 mPacketSources.valueFor( 581 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 582 583 sp<AMessage> response = new AMessage; 584 response->setInt32("err", OK); 585 586 response->postReply(mDisconnectReplyID); 587 mDisconnectReplyID = 0; 588} 589 590sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 591 ssize_t index = mFetcherInfos.indexOfKey(uri); 592 593 if (index >= 0) { 594 return NULL; 595 } 596 597 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 598 notify->setString("uri", uri); 599 notify->setInt32("switchGeneration", mSwitchGeneration); 600 601 FetcherInfo info; 602 info.mFetcher = new PlaylistFetcher(notify, this, uri); 603 info.mDurationUs = -1ll; 604 info.mIsPrepared = false; 605 info.mToBeRemoved = false; 606 looper()->registerHandler(info.mFetcher); 607 608 mFetcherInfos.add(uri, info); 609 610 return info.mFetcher; 611} 612 613/* 614 * Illustration of parameters: 615 * 616 * 0 `range_offset` 617 * +------------+-------------------------------------------------------+--+--+ 618 * | | | next block to fetch | | | 619 * | | `source` handle => `out` buffer | | | | 620 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 621 * | |<----------- `range_length` / buffer capacity ----------->| | 622 * |<------------------------------ file_size 
------------------------------->| 623 * 624 * Special parameter values: 625 * - range_length == -1 means entire file 626 * - block_size == 0 means entire range 627 * 628 */ 629status_t LiveSession::fetchFile( 630 const char *url, sp<ABuffer> *out, 631 int64_t range_offset, int64_t range_length, 632 uint32_t block_size, /* download block size */ 633 sp<DataSource> *source, /* to return and reuse source */ 634 String8 *actualUrl) { 635 off64_t size; 636 sp<DataSource> temp_source; 637 if (source == NULL) { 638 source = &temp_source; 639 } 640 641 if (*source == NULL) { 642 if (!strncasecmp(url, "file://", 7)) { 643 *source = new FileSource(url + 7); 644 } else if (strncasecmp(url, "http://", 7) 645 && strncasecmp(url, "https://", 8)) { 646 return ERROR_UNSUPPORTED; 647 } else { 648 KeyedVector<String8, String8> headers = mExtraHeaders; 649 if (range_offset > 0 || range_length >= 0) { 650 headers.add( 651 String8("Range"), 652 String8( 653 StringPrintf( 654 "bytes=%lld-%s", 655 range_offset, 656 range_length < 0 657 ? "" : StringPrintf("%lld", 658 range_offset + range_length - 1).c_str()).c_str())); 659 } 660 status_t err = mHTTPDataSource->connect(url, &headers); 661 662 if (err != OK) { 663 return err; 664 } 665 666 *source = mHTTPDataSource; 667 } 668 } 669 670 status_t getSizeErr = (*source)->getSize(&size); 671 if (getSizeErr != OK) { 672 size = 65536; 673 } 674 675 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 676 if (*out == NULL) { 677 buffer->setRange(0, 0); 678 } 679 680 // adjust range_length if only reading partial block 681 if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { 682 range_length = buffer->size() + block_size; 683 } 684 for (;;) { 685 // Only resize when we don't know the size. 
686 size_t bufferRemaining = buffer->capacity() - buffer->size(); 687 if (bufferRemaining == 0 && getSizeErr != OK) { 688 bufferRemaining = 32768; 689 690 ALOGV("increasing download buffer to %d bytes", 691 buffer->size() + bufferRemaining); 692 693 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 694 memcpy(copy->data(), buffer->data(), buffer->size()); 695 copy->setRange(0, buffer->size()); 696 697 buffer = copy; 698 } 699 700 size_t maxBytesToRead = bufferRemaining; 701 if (range_length >= 0) { 702 int64_t bytesLeftInRange = range_length - buffer->size(); 703 if (bytesLeftInRange < maxBytesToRead) { 704 maxBytesToRead = bytesLeftInRange; 705 706 if (bytesLeftInRange == 0) { 707 break; 708 } 709 } 710 } 711 712 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 713 // to help us break out of the loop. 714 ssize_t n = (*source)->readAt( 715 buffer->size(), buffer->data() + buffer->size(), 716 maxBytesToRead); 717 718 if (n < 0) { 719 return n; 720 } 721 722 if (n == 0) { 723 break; 724 } 725 726 buffer->setRange(0, buffer->size() + (size_t)n); 727 } 728 729 *out = buffer; 730 if (actualUrl != NULL) { 731 *actualUrl = (*source)->getUri(); 732 if (actualUrl->isEmpty()) { 733 *actualUrl = url; 734 } 735 } 736 737 return OK; 738} 739 740sp<M3UParser> LiveSession::fetchPlaylist( 741 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 742 ALOGV("fetchPlaylist '%s'", url); 743 744 *unchanged = false; 745 746 sp<ABuffer> buffer; 747 String8 actualUrl; 748 status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 749 750 if (err != OK) { 751 return NULL; 752 } 753 754 // MD5 functionality is not available on the simulator, treat all 755 // playlists as changed. 
756 757#if defined(HAVE_ANDROID_OS) 758 uint8_t hash[16]; 759 760 MD5_CTX m; 761 MD5_Init(&m); 762 MD5_Update(&m, buffer->data(), buffer->size()); 763 764 MD5_Final(hash, &m); 765 766 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 767 // playlist unchanged 768 *unchanged = true; 769 770 ALOGV("Playlist unchanged, refresh state is now %d", 771 (int)mRefreshState); 772 773 return NULL; 774 } 775 776 if (curPlaylistHash != NULL) { 777 memcpy(curPlaylistHash, hash, sizeof(hash)); 778 } 779#endif 780 781 sp<M3UParser> playlist = 782 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 783 784 if (playlist->initCheck() != OK) { 785 ALOGE("failed to parse .m3u8 playlist"); 786 787 return NULL; 788 } 789 790 return playlist; 791} 792 793static double uniformRand() { 794 return (double)rand() / RAND_MAX; 795} 796 797size_t LiveSession::getBandwidthIndex() { 798 if (mBandwidthItems.size() == 0) { 799 return 0; 800 } 801 802#if 1 803 char value[PROPERTY_VALUE_MAX]; 804 ssize_t index = -1; 805 if (property_get("media.httplive.bw-index", value, NULL)) { 806 char *end; 807 index = strtol(value, &end, 10); 808 CHECK(end > value && *end == '\0'); 809 810 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 811 index = mBandwidthItems.size() - 1; 812 } 813 } 814 815 if (index < 0) { 816 int32_t bandwidthBps; 817 if (mHTTPDataSource != NULL 818 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 819 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 820 } else { 821 ALOGV("no bandwidth estimate."); 822 return 0; // Pick the lowest bandwidth stream by default. 
823 } 824 825 char value[PROPERTY_VALUE_MAX]; 826 if (property_get("media.httplive.max-bw", value, NULL)) { 827 char *end; 828 long maxBw = strtoul(value, &end, 10); 829 if (end > value && *end == '\0') { 830 if (maxBw > 0 && bandwidthBps > maxBw) { 831 ALOGV("bandwidth capped to %ld bps", maxBw); 832 bandwidthBps = maxBw; 833 } 834 } 835 } 836 837 // Consider only 80% of the available bandwidth usable. 838 bandwidthBps = (bandwidthBps * 8) / 10; 839 840 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 841 842 index = mBandwidthItems.size() - 1; 843 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 844 > (size_t)bandwidthBps) { 845 --index; 846 } 847 } 848#elif 0 849 // Change bandwidth at random() 850 size_t index = uniformRand() * mBandwidthItems.size(); 851#elif 0 852 // There's a 50% chance to stay on the current bandwidth and 853 // a 50% chance to switch to the next higher bandwidth (wrapping around 854 // to lowest) 855 const size_t kMinIndex = 0; 856 857 static ssize_t mPrevBandwidthIndex = -1; 858 859 size_t index; 860 if (mPrevBandwidthIndex < 0) { 861 index = kMinIndex; 862 } else if (uniformRand() < 0.5) { 863 index = (size_t)mPrevBandwidthIndex; 864 } else { 865 index = mPrevBandwidthIndex + 1; 866 if (index == mBandwidthItems.size()) { 867 index = kMinIndex; 868 } 869 } 870 mPrevBandwidthIndex = index; 871#elif 0 872 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 873 874 size_t index = mBandwidthItems.size() - 1; 875 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 876 --index; 877 } 878#elif 1 879 char value[PROPERTY_VALUE_MAX]; 880 size_t index; 881 if (property_get("media.httplive.bw-index", value, NULL)) { 882 char *end; 883 index = strtoul(value, &end, 10); 884 CHECK(end > value && *end == '\0'); 885 886 if (index >= mBandwidthItems.size()) { 887 index = mBandwidthItems.size() - 1; 888 } 889 } else { 890 index = 0; 891 } 892#else 893 size_t index = 
mBandwidthItems.size() - 1; // Highest bandwidth stream 894#endif 895 896 CHECK_GE(index, 0); 897 898 return index; 899} 900 901status_t LiveSession::onSeek(const sp<AMessage> &msg) { 902 int64_t timeUs; 903 CHECK(msg->findInt64("timeUs", &timeUs)); 904 905 if (!mReconfigurationInProgress) { 906 changeConfiguration(timeUs, getBandwidthIndex()); 907 } 908 909 return OK; 910} 911 912status_t LiveSession::getDuration(int64_t *durationUs) const { 913 int64_t maxDurationUs = 0ll; 914 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 915 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 916 917 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 918 maxDurationUs = fetcherDurationUs; 919 } 920 } 921 922 *durationUs = maxDurationUs; 923 924 return OK; 925} 926 927bool LiveSession::isSeekable() const { 928 int64_t durationUs; 929 return getDuration(&durationUs) == OK && durationUs >= 0; 930} 931 932bool LiveSession::hasDynamicDuration() const { 933 return false; 934} 935 936status_t LiveSession::getTrackInfo(Parcel *reply) const { 937 return mPlaylist->getTrackInfo(reply); 938} 939 940status_t LiveSession::selectTrack(size_t index, bool select) { 941 status_t err = mPlaylist->selectTrack(index, select); 942 if (err == OK) { 943 (new AMessage(kWhatChangeConfiguration, id()))->post(); 944 } 945 return err; 946} 947 948bool LiveSession::canSwitchUp() { 949 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 950 status_t err = OK; 951 for (size_t i = 0; i < mPacketSources.size(); ++i) { 952 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 953 int64_t dur = source->getBufferedDurationUs(&err); 954 if (err == OK && dur > 10000000) { 955 return true; 956 } 957 } 958 return false; 959} 960 961void LiveSession::changeConfiguration( 962 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 963 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 
964 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 965 cancelBandwidthSwitch(); 966 967 CHECK(!mReconfigurationInProgress); 968 mReconfigurationInProgress = true; 969 970 mPrevBandwidthIndex = bandwidthIndex; 971 972 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 973 timeUs, bandwidthIndex, pickTrack); 974 975 if (pickTrack) { 976 mPlaylist->pickRandomMediaItems(); 977 } 978 979 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 980 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 981 982 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 983 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 984 985 AString URIs[kMaxStreams]; 986 for (size_t i = 0; i < kMaxStreams; ++i) { 987 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 988 streamMask |= indexToType(i); 989 } 990 } 991 992 // Step 1, stop and discard fetchers that are no longer needed. 993 // Pause those that we'll reuse. 994 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 995 const AString &uri = mFetcherInfos.keyAt(i); 996 997 bool discardFetcher = true; 998 999 // If we're seeking all current fetchers are discarded. 1000 if (timeUs < 0ll) { 1001 // delay fetcher removal 1002 discardFetcher = false; 1003 1004 for (size_t j = 0; j < kMaxStreams; ++j) { 1005 StreamType type = indexToType(j); 1006 if ((streamMask & type) && uri == URIs[j]) { 1007 resumeMask |= type; 1008 streamMask &= ~type; 1009 } 1010 } 1011 } 1012 1013 if (discardFetcher) { 1014 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1015 } else { 1016 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1017 } 1018 } 1019 1020 sp<AMessage> msg; 1021 if (timeUs < 0ll) { 1022 // skip onChangeConfiguration2 (decoder destruction) if switching. 
1023 msg = new AMessage(kWhatChangeConfiguration3, id()); 1024 } else { 1025 msg = new AMessage(kWhatChangeConfiguration2, id()); 1026 } 1027 msg->setInt32("streamMask", streamMask); 1028 msg->setInt32("resumeMask", resumeMask); 1029 msg->setInt64("timeUs", timeUs); 1030 for (size_t i = 0; i < kMaxStreams; ++i) { 1031 if (streamMask & indexToType(i)) { 1032 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1033 } 1034 } 1035 1036 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1037 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1038 // fetchers have completed their asynchronous operation, we'll post 1039 // mContinuation, which then is handled below in onChangeConfiguration2. 1040 mContinuationCounter = mFetcherInfos.size(); 1041 mContinuation = msg; 1042 1043 if (mContinuationCounter == 0) { 1044 msg->post(); 1045 1046 if (mSeekReplyID != 0) { 1047 CHECK(mSeekReply != NULL); 1048 mSeekReply->postReply(mSeekReplyID); 1049 } 1050 } 1051} 1052 1053void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1054 if (!mReconfigurationInProgress) { 1055 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1056 } else { 1057 msg->post(1000000ll); // retry in 1 sec 1058 } 1059} 1060 1061void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1062 mContinuation.clear(); 1063 1064 // All fetchers are either suspended or have been removed now. 
// Read the requested stream mask and the playlist URI of every requested stream.
    uint32_t streamMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            // The URI of stream i is stored in |msg| under that stream's own key.
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    // Determine which decoders to shut down on the player side.
    // A decoder has to be shut down if either
    // 1) its stream type was active before but no longer is,
    // or
    // 2) its stream type was already active and still is, but the URI
    //    has changed.
    uint32_t changedMask = 0;
    // The loop terminates on reaching kSubtitleIndex, so the subtitle stream
    // (and any index after it) never contributes to changedMask.
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                        && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed immediately.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed: inform the player, which will shut down the
    // corresponding decoders and post the reply once that is done.
    // Handling the reply continues in onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Re-target |msg| at this handler so that it can be posted back as the reply.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(id());

    notify->setMessage("reply", msg);
    notify->post();
}

// Last step of a configuration change.
//
// Reads "streamMask", "resumeMask", the per-stream URIs and "timeUs" from
// |msg|, then:
//   * restarts every existing fetcher whose URI still matches a stream in
//     resumeMask and flags all other existing fetchers for removal;
//   * calls addFetcher() once per distinct URI among the streams in
//     streamMask and starts the returned fetcher;
//   * schedules the next bandwidth check and clears
//     mReconfigurationInProgress.
//
// A negative "timeUs" marks the change as a switch: mLastDequeuedTimeUs is
// used as the time instead, new fetchers are given the sources from
// mPacketSources2 rather than mPacketSources, and mSwitchInProgress is set
// instead of committing mNewStreamMask to mStreamMask right away.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shut down
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // Commit the new URI of every stream named in streamMask.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
        }
    }

    int64_t timeUs;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));

    // A negative time means "no explicit position": continue from the last
    // dequeued timestamp and treat the change as a switch.
    if (timeUs < 0ll) {
        timeUs = mLastDequeuedTimeUs;
        switching = true;
    }
    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;

    mNewStreamMask = streamMask;

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        // sources[j] stays NULL unless stream j is to be resumed from this URI.
        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));
            }
        }

        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            // No stream uses this fetcher any more.
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    // NOTE(review): streamMask is not modified in this function before this
    // point; presumably the sender of |msg| already excluded the resumed
    // types from "streamMask" -- confirm against the caller.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // Highest "seq" / "timeUs" found in the latest metadata of the
        // sources this fetcher will feed (only gathered when switching).
        int32_t latestSeq = -1;
        int64_t latestTimeUs = 0ll;
        sp<AnotherPacketSource> sources[kMaxStreams];

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (!switching) {
                    sources[j]->clear();
                } else {
                    int32_t type, seq;
                    int64_t srcTimeUs;
                    sp<AMessage> meta = sources[j]->getLatestMeta();

                    // Metadata that carries a "discontinuity" entry is skipped.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        CHECK(meta->findInt32("seq", &seq));
                        if (seq > latestSeq) {
                            latestSeq = seq;
                        }
                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
                        if (srcTimeUs > latestTimeUs) {
                            latestTimeUs = srcTimeUs;
                        }
                    }

                    // While switching, the new fetcher fills the secondary
                    // packet source; the primary one is left untouched here.
                    sources[j] = mPacketSources2.valueFor(indexToType(j));
                    sources[j]->clear();
                    // Streams present in the new mask but not in the current one
                    // start with a format-change marker.
                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                    if (extraStreams & indexToType(j)) {
                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
                    }
                }

                // This stream is handled; later iterations of the outer loop skip it.
                streamMask &= ~indexToType(j);
            }
        }

        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                timeUs,
                latestTimeUs /* min start time(us) */,
                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is committed later, in tryToFinishBandwidthSwitch().
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    // A disconnect request arrived in the meantime; complete it now.
    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

// Handles a swap message for one stream.
//
// Reads "switchGeneration" and "stream" from |msg|. Messages whose
// generation differs from mSwitchGeneration are ignored. Otherwise the
// stream is added to mSwapMask; once mSwapMask equals mStreamMask, the
// packet sources of all streams that exist only in mNewStreamMask are
// swapped and tryToFinishBandwidthSwitch() is called.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));
    mSwapMask |= stream;
    if (mSwapMask != mStreamMask) {
        // Not every currently active stream has reported yet.
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit, i.e. one stream type per pass.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;
    }

    tryToFinishBandwidthSwitch();
}

// Mark switch done when:
//   1. all old buffers are swapped out, AND
//   2. all old fetchers are removed.
1279void LiveSession::tryToFinishBandwidthSwitch() { 1280 bool needToRemoveFetchers = false; 1281 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1282 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1283 needToRemoveFetchers = true; 1284 break; 1285 } 1286 } 1287 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1288 mStreamMask = mNewStreamMask; 1289 mSwitchInProgress = false; 1290 mSwapMask = 0; 1291 } 1292} 1293 1294void LiveSession::scheduleCheckBandwidthEvent() { 1295 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1296 msg->setInt32("generation", mCheckBandwidthGeneration); 1297 msg->post(10000000ll); 1298} 1299 1300void LiveSession::cancelCheckBandwidthEvent() { 1301 ++mCheckBandwidthGeneration; 1302} 1303 1304void LiveSession::cancelBandwidthSwitch() { 1305 Mutex::Autolock lock(mSwapMutex); 1306 mSwitchGeneration++; 1307 mSwitchInProgress = false; 1308 mSwapMask = 0; 1309} 1310 1311bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1312 if (mReconfigurationInProgress || mSwitchInProgress) { 1313 return false; 1314 } 1315 1316 if (mPrevBandwidthIndex < 0) { 1317 return true; 1318 } 1319 1320 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1321 return false; 1322 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1323 return canSwitchUp(); 1324 } else { 1325 return true; 1326 } 1327} 1328 1329void LiveSession::onCheckBandwidth() { 1330 size_t bandwidthIndex = getBandwidthIndex(); 1331 if (canSwitchBandwidthTo(bandwidthIndex)) { 1332 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1333 } else { 1334 scheduleCheckBandwidthEvent(); 1335 } 1336 1337 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1338 // schedule another one on return, only an explicit call to 1339 // scheduleCheckBandwidthEvent will do that. 1340 // This ensures that only one configuration change is ongoing at any 1341 // one time, once that completes it'll schedule another check bandwidth 1342 // event. 
1343} 1344 1345void LiveSession::postPrepared(status_t err) { 1346 CHECK(mInPreparationPhase); 1347 1348 sp<AMessage> notify = mNotify->dup(); 1349 if (err == OK || err == ERROR_END_OF_STREAM) { 1350 notify->setInt32("what", kWhatPrepared); 1351 } else { 1352 notify->setInt32("what", kWhatPreparationFailed); 1353 notify->setInt32("err", err); 1354 } 1355 1356 notify->post(); 1357 1358 mInPreparationPhase = false; 1359} 1360 1361} // namespace android 1362 1363