LiveSession.cpp revision a0b94395dc82c90ca437bb6fed7aa01fcbbffffe
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <utils/Mutex.h> 41 42#include <ctype.h> 43#include <openssl/aes.h> 44#include <openssl/md5.h> 45 46namespace android { 47 48LiveSession::LiveSession( 49 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 50 : mNotify(notify), 51 mFlags(flags), 52 mUIDValid(uidValid), 53 mUID(uid), 54 mInPreparationPhase(true), 55 mHTTPDataSource( 56 HTTPBase::Create( 57 (mFlags & kFlagIncognito) 58 ? 
HTTPBase::kFlagIncognito 59 : 0)), 60 mPrevBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0) { 72 if (mUIDValid) { 73 mHTTPDataSource->setUID(mUID); 74 } 75 76 mStreams[kAudioIndex] = StreamItem("audio"); 77 mStreams[kVideoIndex] = StreamItem("video"); 78 mStreams[kSubtitleIndex] = StreamItem("subtitle"); 79 80 for (size_t i = 0; i < kMaxStreams; ++i) { 81 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 82 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 83 } 84} 85 86LiveSession::~LiveSession() { 87} 88 89sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 90 ABuffer *discontinuity = new ABuffer(0); 91 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 92 discontinuity->meta()->setInt32("swapPacketSource", swap); 93 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 94 discontinuity->meta()->setInt64("timeUs", -1); 95 return discontinuity; 96} 97 98void LiveSession::swapPacketSource(StreamType stream) { 99 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 100 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 101 sp<AnotherPacketSource> tmp = aps; 102 aps = aps2; 103 aps2 = tmp; 104 aps2->clear(); 105} 106 107status_t LiveSession::dequeueAccessUnit( 108 StreamType stream, sp<ABuffer> *accessUnit) { 109 if (!(mStreamMask & stream)) { 110 // return -EWOULDBLOCK to avoid halting the decoder 111 // when switching between audio/video and audio only. 
112 return -EWOULDBLOCK; 113 } 114 115 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 116 117 status_t finalResult; 118 if (!packetSource->hasBufferAvailable(&finalResult)) { 119 return finalResult == OK ? -EAGAIN : finalResult; 120 } 121 122 status_t err = packetSource->dequeueAccessUnit(accessUnit); 123 124 const char *streamStr; 125 switch (stream) { 126 case STREAMTYPE_AUDIO: 127 streamStr = "audio"; 128 break; 129 case STREAMTYPE_VIDEO: 130 streamStr = "video"; 131 break; 132 case STREAMTYPE_SUBTITLES: 133 streamStr = "subs"; 134 break; 135 default: 136 TRESPASS(); 137 } 138 139 if (err == INFO_DISCONTINUITY) { 140 int32_t type; 141 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 142 143 sp<AMessage> extra; 144 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 145 extra.clear(); 146 } 147 148 ALOGI("[%s] read discontinuity of type %d, extra = %s", 149 streamStr, 150 type, 151 extra == NULL ? "NULL" : extra->debugString().c_str()); 152 153 int32_t swap; 154 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 155 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 156 && swap) { 157 158 int32_t switchGeneration; 159 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 160 { 161 Mutex::Autolock lock(mSwapMutex); 162 if (switchGeneration == mSwitchGeneration) { 163 swapPacketSource(stream); 164 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 165 msg->setInt32("stream", stream); 166 msg->setInt32("switchGeneration", switchGeneration); 167 msg->post(); 168 } 169 } 170 } 171 } else if (err == OK) { 172 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 173 int64_t timeUs; 174 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 175 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 176 177 mLastDequeuedTimeUs = timeUs; 178 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 179 } else if (stream == STREAMTYPE_SUBTITLES) { 180 
(*accessUnit)->meta()->setInt32( 181 "trackIndex", mPlaylist->getSelectedIndex()); 182 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 183 } 184 } else { 185 ALOGI("[%s] encountered error %d", streamStr, err); 186 } 187 188 return err; 189} 190 191status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 192 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 193 if (!(mStreamMask & stream)) { 194 return UNKNOWN_ERROR; 195 } 196 197 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 198 199 sp<MetaData> meta = packetSource->getFormat(); 200 201 if (meta == NULL) { 202 return -EAGAIN; 203 } 204 205 return convertMetaDataToMessage(meta, format); 206} 207 208void LiveSession::connectAsync( 209 const char *url, const KeyedVector<String8, String8> *headers) { 210 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 211 msg->setString("url", url); 212 213 if (headers != NULL) { 214 msg->setPointer( 215 "headers", 216 new KeyedVector<String8, String8>(*headers)); 217 } 218 219 msg->post(); 220} 221 222status_t LiveSession::disconnect() { 223 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 224 225 sp<AMessage> response; 226 status_t err = msg->postAndAwaitResponse(&response); 227 228 return err; 229} 230 231status_t LiveSession::seekTo(int64_t timeUs) { 232 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 233 msg->setInt64("timeUs", timeUs); 234 235 sp<AMessage> response; 236 status_t err = msg->postAndAwaitResponse(&response); 237 238 uint32_t replyID; 239 CHECK(response == mSeekReply && 0 != mSeekReplyID); 240 mSeekReply.clear(); 241 mSeekReplyID = 0; 242 return err; 243} 244 245void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 246 switch (msg->what()) { 247 case kWhatConnect: 248 { 249 onConnect(msg); 250 break; 251 } 252 253 case kWhatDisconnect: 254 { 255 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 256 257 if (mReconfigurationInProgress) { 258 
break; 259 } 260 261 finishDisconnect(); 262 break; 263 } 264 265 case kWhatSeek: 266 { 267 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 268 269 status_t err = onSeek(msg); 270 271 mSeekReply = new AMessage; 272 mSeekReply->setInt32("err", err); 273 break; 274 } 275 276 case kWhatFetcherNotify: 277 { 278 int32_t what; 279 CHECK(msg->findInt32("what", &what)); 280 281 switch (what) { 282 case PlaylistFetcher::kWhatStarted: 283 break; 284 case PlaylistFetcher::kWhatPaused: 285 case PlaylistFetcher::kWhatStopped: 286 { 287 if (what == PlaylistFetcher::kWhatStopped) { 288 AString uri; 289 CHECK(msg->findString("uri", &uri)); 290 if (mFetcherInfos.removeItem(uri) < 0) { 291 // ignore duplicated kWhatStopped messages. 292 break; 293 } 294 295 tryToFinishBandwidthSwitch(); 296 } 297 298 if (mContinuation != NULL) { 299 CHECK_GT(mContinuationCounter, 0); 300 if (--mContinuationCounter == 0) { 301 mContinuation->post(); 302 303 if (mSeekReplyID != 0) { 304 CHECK(mSeekReply != NULL); 305 mSeekReply->postReply(mSeekReplyID); 306 } 307 } 308 } 309 break; 310 } 311 312 case PlaylistFetcher::kWhatDurationUpdate: 313 { 314 AString uri; 315 CHECK(msg->findString("uri", &uri)); 316 317 int64_t durationUs; 318 CHECK(msg->findInt64("durationUs", &durationUs)); 319 320 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 321 info->mDurationUs = durationUs; 322 break; 323 } 324 325 case PlaylistFetcher::kWhatError: 326 { 327 status_t err; 328 CHECK(msg->findInt32("err", &err)); 329 330 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 331 332 if (mInPreparationPhase) { 333 postPrepared(err); 334 } 335 336 cancelBandwidthSwitch(); 337 338 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 339 340 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 341 342 mPacketSources.valueFor( 343 STREAMTYPE_SUBTITLES)->signalEOS(err); 344 345 sp<AMessage> notify = mNotify->dup(); 346 notify->setInt32("what", kWhatError); 347 notify->setInt32("err", err); 348 
notify->post(); 349 break; 350 } 351 352 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 353 { 354 AString uri; 355 CHECK(msg->findString("uri", &uri)); 356 357 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 358 info->mIsPrepared = true; 359 360 if (mInPreparationPhase) { 361 bool allFetchersPrepared = true; 362 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 363 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 364 allFetchersPrepared = false; 365 break; 366 } 367 } 368 369 if (allFetchersPrepared) { 370 postPrepared(OK); 371 } 372 } 373 break; 374 } 375 376 case PlaylistFetcher::kWhatStartedAt: 377 { 378 int32_t switchGeneration; 379 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 380 381 if (switchGeneration != mSwitchGeneration) { 382 break; 383 } 384 385 // Resume fetcher for the original variant; the resumed fetcher should 386 // continue until the timestamps found in msg, which is stored by the 387 // new fetcher to indicate where the new variant has started buffering. 
388 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 389 const FetcherInfo info = mFetcherInfos.valueAt(i); 390 if (info.mToBeRemoved) { 391 info.mFetcher->resumeUntilAsync(msg); 392 } 393 } 394 break; 395 } 396 397 default: 398 TRESPASS(); 399 } 400 401 break; 402 } 403 404 case kWhatCheckBandwidth: 405 { 406 int32_t generation; 407 CHECK(msg->findInt32("generation", &generation)); 408 409 if (generation != mCheckBandwidthGeneration) { 410 break; 411 } 412 413 onCheckBandwidth(); 414 break; 415 } 416 417 case kWhatChangeConfiguration: 418 { 419 onChangeConfiguration(msg); 420 break; 421 } 422 423 case kWhatChangeConfiguration2: 424 { 425 onChangeConfiguration2(msg); 426 break; 427 } 428 429 case kWhatChangeConfiguration3: 430 { 431 onChangeConfiguration3(msg); 432 break; 433 } 434 435 case kWhatFinishDisconnect2: 436 { 437 onFinishDisconnect2(); 438 break; 439 } 440 441 case kWhatSwapped: 442 { 443 onSwapped(msg); 444 break; 445 } 446 default: 447 TRESPASS(); 448 break; 449 } 450} 451 452// static 453int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 454 if (a->mBandwidth < b->mBandwidth) { 455 return -1; 456 } else if (a->mBandwidth == b->mBandwidth) { 457 return 0; 458 } 459 460 return 1; 461} 462 463// static 464LiveSession::StreamType LiveSession::indexToType(int idx) { 465 CHECK(idx >= 0 && idx < kMaxStreams); 466 return (StreamType)(1 << idx); 467} 468 469void LiveSession::onConnect(const sp<AMessage> &msg) { 470 AString url; 471 CHECK(msg->findString("url", &url)); 472 473 KeyedVector<String8, String8> *headers = NULL; 474 if (!msg->findPointer("headers", (void **)&headers)) { 475 mExtraHeaders.clear(); 476 } else { 477 mExtraHeaders = *headers; 478 479 delete headers; 480 headers = NULL; 481 } 482 483#if 1 484 ALOGI("onConnect <URL suppressed>"); 485#else 486 ALOGI("onConnect %s", url.c_str()); 487#endif 488 489 mMasterURL = url; 490 491 bool dummy; 492 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, 
&dummy); 493 494 if (mPlaylist == NULL) { 495 ALOGE("unable to fetch master playlist <URL suppressed>."); 496 497 postPrepared(ERROR_IO); 498 return; 499 } 500 501 // We trust the content provider to make a reasonable choice of preferred 502 // initial bandwidth by listing it first in the variant playlist. 503 // At startup we really don't have a good estimate on the available 504 // network bandwidth since we haven't tranferred any data yet. Once 505 // we have we can make a better informed choice. 506 size_t initialBandwidth = 0; 507 size_t initialBandwidthIndex = 0; 508 509 if (mPlaylist->isVariantPlaylist()) { 510 for (size_t i = 0; i < mPlaylist->size(); ++i) { 511 BandwidthItem item; 512 513 item.mPlaylistIndex = i; 514 515 sp<AMessage> meta; 516 AString uri; 517 mPlaylist->itemAt(i, &uri, &meta); 518 519 unsigned long bandwidth; 520 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 521 522 if (initialBandwidth == 0) { 523 initialBandwidth = item.mBandwidth; 524 } 525 526 mBandwidthItems.push(item); 527 } 528 529 CHECK_GT(mBandwidthItems.size(), 0u); 530 531 mBandwidthItems.sort(SortByBandwidth); 532 533 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 534 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 535 initialBandwidthIndex = i; 536 break; 537 } 538 } 539 } else { 540 // dummy item. 541 BandwidthItem item; 542 item.mPlaylistIndex = 0; 543 item.mBandwidth = 0; 544 mBandwidthItems.push(item); 545 } 546 547 changeConfiguration( 548 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 549} 550 551void LiveSession::finishDisconnect() { 552 // No reconfiguration is currently pending, make sure none will trigger 553 // during disconnection either. 554 cancelCheckBandwidthEvent(); 555 556 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 
557 // (finishDisconnect, onFinishDisconnect2) 558 cancelBandwidthSwitch(); 559 560 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 561 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 562 } 563 564 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 565 566 mContinuationCounter = mFetcherInfos.size(); 567 mContinuation = msg; 568 569 if (mContinuationCounter == 0) { 570 msg->post(); 571 } 572} 573 574void LiveSession::onFinishDisconnect2() { 575 mContinuation.clear(); 576 577 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 578 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 579 580 mPacketSources.valueFor( 581 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 582 583 sp<AMessage> response = new AMessage; 584 response->setInt32("err", OK); 585 586 response->postReply(mDisconnectReplyID); 587 mDisconnectReplyID = 0; 588} 589 590sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 591 ssize_t index = mFetcherInfos.indexOfKey(uri); 592 593 if (index >= 0) { 594 return NULL; 595 } 596 597 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 598 notify->setString("uri", uri); 599 notify->setInt32("switchGeneration", mSwitchGeneration); 600 601 FetcherInfo info; 602 info.mFetcher = new PlaylistFetcher(notify, this, uri); 603 info.mDurationUs = -1ll; 604 info.mIsPrepared = false; 605 info.mToBeRemoved = false; 606 looper()->registerHandler(info.mFetcher); 607 608 mFetcherInfos.add(uri, info); 609 610 return info.mFetcher; 611} 612 613/* 614 * Illustration of parameters: 615 * 616 * 0 `range_offset` 617 * +------------+-------------------------------------------------------+--+--+ 618 * | | | next block to fetch | | | 619 * | | `source` handle => `out` buffer | | | | 620 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 621 * | |<----------- `range_length` / buffer capacity ----------->| | 622 * |<------------------------------ file_size 
------------------------------->| 623 * 624 * Special parameter values: 625 * - range_length == -1 means entire file 626 * - block_size == 0 means entire range 627 * 628 */ 629status_t LiveSession::fetchFile( 630 const char *url, sp<ABuffer> *out, 631 int64_t range_offset, int64_t range_length, 632 uint32_t block_size, /* download block size */ 633 sp<DataSource> *source, /* to return and reuse source */ 634 String8 *actualUrl) { 635 off64_t size; 636 sp<DataSource> temp_source; 637 if (source == NULL) { 638 source = &temp_source; 639 } 640 641 if (*source == NULL) { 642 if (!strncasecmp(url, "file://", 7)) { 643 *source = new FileSource(url + 7); 644 } else if (strncasecmp(url, "http://", 7) 645 && strncasecmp(url, "https://", 8)) { 646 return ERROR_UNSUPPORTED; 647 } else { 648 KeyedVector<String8, String8> headers = mExtraHeaders; 649 if (range_offset > 0 || range_length >= 0) { 650 headers.add( 651 String8("Range"), 652 String8( 653 StringPrintf( 654 "bytes=%lld-%s", 655 range_offset, 656 range_length < 0 657 ? "" : StringPrintf("%lld", 658 range_offset + range_length - 1).c_str()).c_str())); 659 } 660 status_t err = mHTTPDataSource->connect(url, &headers); 661 662 if (err != OK) { 663 return err; 664 } 665 666 *source = mHTTPDataSource; 667 } 668 } 669 670 status_t getSizeErr = (*source)->getSize(&size); 671 if (getSizeErr != OK) { 672 size = 65536; 673 } 674 675 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 676 if (*out == NULL) { 677 buffer->setRange(0, 0); 678 } 679 680 // adjust range_length if only reading partial block 681 if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { 682 range_length = buffer->size() + block_size; 683 } 684 for (;;) { 685 // Only resize when we don't know the size. 
686 size_t bufferRemaining = buffer->capacity() - buffer->size(); 687 if (bufferRemaining == 0 && getSizeErr != OK) { 688 bufferRemaining = 32768; 689 690 ALOGV("increasing download buffer to %d bytes", 691 buffer->size() + bufferRemaining); 692 693 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 694 memcpy(copy->data(), buffer->data(), buffer->size()); 695 copy->setRange(0, buffer->size()); 696 697 buffer = copy; 698 } 699 700 size_t maxBytesToRead = bufferRemaining; 701 if (range_length >= 0) { 702 int64_t bytesLeftInRange = range_length - buffer->size(); 703 if (bytesLeftInRange < maxBytesToRead) { 704 maxBytesToRead = bytesLeftInRange; 705 706 if (bytesLeftInRange == 0) { 707 break; 708 } 709 } 710 } 711 712 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 713 // to help us break out of the loop. 714 ssize_t n = (*source)->readAt( 715 buffer->size(), buffer->data() + buffer->size(), 716 maxBytesToRead); 717 718 if (n < 0) { 719 return n; 720 } 721 722 if (n == 0) { 723 break; 724 } 725 726 buffer->setRange(0, buffer->size() + (size_t)n); 727 } 728 729 *out = buffer; 730 if (actualUrl != NULL) { 731 *actualUrl = (*source)->getUri(); 732 if (actualUrl->isEmpty()) { 733 *actualUrl = url; 734 } 735 } 736 737 return OK; 738} 739 740sp<M3UParser> LiveSession::fetchPlaylist( 741 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 742 ALOGV("fetchPlaylist '%s'", url); 743 744 *unchanged = false; 745 746 sp<ABuffer> buffer; 747 String8 actualUrl; 748 status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 749 750 if (err != OK) { 751 return NULL; 752 } 753 754 // MD5 functionality is not available on the simulator, treat all 755 // playlists as changed. 
756 757#if defined(HAVE_ANDROID_OS) 758 uint8_t hash[16]; 759 760 MD5_CTX m; 761 MD5_Init(&m); 762 MD5_Update(&m, buffer->data(), buffer->size()); 763 764 MD5_Final(hash, &m); 765 766 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 767 // playlist unchanged 768 *unchanged = true; 769 770 return NULL; 771 } 772 773 if (curPlaylistHash != NULL) { 774 memcpy(curPlaylistHash, hash, sizeof(hash)); 775 } 776#endif 777 778 sp<M3UParser> playlist = 779 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 780 781 if (playlist->initCheck() != OK) { 782 ALOGE("failed to parse .m3u8 playlist"); 783 784 return NULL; 785 } 786 787 return playlist; 788} 789 790static double uniformRand() { 791 return (double)rand() / RAND_MAX; 792} 793 794size_t LiveSession::getBandwidthIndex() { 795 if (mBandwidthItems.size() == 0) { 796 return 0; 797 } 798 799#if 1 800 char value[PROPERTY_VALUE_MAX]; 801 ssize_t index = -1; 802 if (property_get("media.httplive.bw-index", value, NULL)) { 803 char *end; 804 index = strtol(value, &end, 10); 805 CHECK(end > value && *end == '\0'); 806 807 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 808 index = mBandwidthItems.size() - 1; 809 } 810 } 811 812 if (index < 0) { 813 int32_t bandwidthBps; 814 if (mHTTPDataSource != NULL 815 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 816 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 817 } else { 818 ALOGV("no bandwidth estimate."); 819 return 0; // Pick the lowest bandwidth stream by default. 820 } 821 822 char value[PROPERTY_VALUE_MAX]; 823 if (property_get("media.httplive.max-bw", value, NULL)) { 824 char *end; 825 long maxBw = strtoul(value, &end, 10); 826 if (end > value && *end == '\0') { 827 if (maxBw > 0 && bandwidthBps > maxBw) { 828 ALOGV("bandwidth capped to %ld bps", maxBw); 829 bandwidthBps = maxBw; 830 } 831 } 832 } 833 834 // Consider only 80% of the available bandwidth usable. 
835 bandwidthBps = (bandwidthBps * 8) / 10; 836 837 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 838 839 index = mBandwidthItems.size() - 1; 840 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 841 > (size_t)bandwidthBps) { 842 --index; 843 } 844 } 845#elif 0 846 // Change bandwidth at random() 847 size_t index = uniformRand() * mBandwidthItems.size(); 848#elif 0 849 // There's a 50% chance to stay on the current bandwidth and 850 // a 50% chance to switch to the next higher bandwidth (wrapping around 851 // to lowest) 852 const size_t kMinIndex = 0; 853 854 static ssize_t mPrevBandwidthIndex = -1; 855 856 size_t index; 857 if (mPrevBandwidthIndex < 0) { 858 index = kMinIndex; 859 } else if (uniformRand() < 0.5) { 860 index = (size_t)mPrevBandwidthIndex; 861 } else { 862 index = mPrevBandwidthIndex + 1; 863 if (index == mBandwidthItems.size()) { 864 index = kMinIndex; 865 } 866 } 867 mPrevBandwidthIndex = index; 868#elif 0 869 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 870 871 size_t index = mBandwidthItems.size() - 1; 872 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 873 --index; 874 } 875#elif 1 876 char value[PROPERTY_VALUE_MAX]; 877 size_t index; 878 if (property_get("media.httplive.bw-index", value, NULL)) { 879 char *end; 880 index = strtoul(value, &end, 10); 881 CHECK(end > value && *end == '\0'); 882 883 if (index >= mBandwidthItems.size()) { 884 index = mBandwidthItems.size() - 1; 885 } 886 } else { 887 index = 0; 888 } 889#else 890 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 891#endif 892 893 CHECK_GE(index, 0); 894 895 return index; 896} 897 898status_t LiveSession::onSeek(const sp<AMessage> &msg) { 899 int64_t timeUs; 900 CHECK(msg->findInt64("timeUs", &timeUs)); 901 902 if (!mReconfigurationInProgress) { 903 changeConfiguration(timeUs, getBandwidthIndex()); 904 } 905 906 return OK; 907} 908 909status_t 
LiveSession::getDuration(int64_t *durationUs) const { 910 int64_t maxDurationUs = 0ll; 911 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 912 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 913 914 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 915 maxDurationUs = fetcherDurationUs; 916 } 917 } 918 919 *durationUs = maxDurationUs; 920 921 return OK; 922} 923 924bool LiveSession::isSeekable() const { 925 int64_t durationUs; 926 return getDuration(&durationUs) == OK && durationUs >= 0; 927} 928 929bool LiveSession::hasDynamicDuration() const { 930 return false; 931} 932 933status_t LiveSession::getTrackInfo(Parcel *reply) const { 934 return mPlaylist->getTrackInfo(reply); 935} 936 937status_t LiveSession::selectTrack(size_t index, bool select) { 938 status_t err = mPlaylist->selectTrack(index, select); 939 if (err == OK) { 940 (new AMessage(kWhatChangeConfiguration, id()))->post(); 941 } 942 return err; 943} 944 945bool LiveSession::canSwitchUp() { 946 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 947 status_t err = OK; 948 for (size_t i = 0; i < mPacketSources.size(); ++i) { 949 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 950 int64_t dur = source->getBufferedDurationUs(&err); 951 if (err == OK && dur > 10000000) { 952 return true; 953 } 954 } 955 return false; 956} 957 958void LiveSession::changeConfiguration( 959 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 960 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 961 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 
962 cancelBandwidthSwitch(); 963 964 CHECK(!mReconfigurationInProgress); 965 mReconfigurationInProgress = true; 966 967 mPrevBandwidthIndex = bandwidthIndex; 968 969 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 970 timeUs, bandwidthIndex, pickTrack); 971 972 if (pickTrack) { 973 mPlaylist->pickRandomMediaItems(); 974 } 975 976 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 977 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 978 979 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 980 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 981 982 AString URIs[kMaxStreams]; 983 for (size_t i = 0; i < kMaxStreams; ++i) { 984 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 985 streamMask |= indexToType(i); 986 } 987 } 988 989 // Step 1, stop and discard fetchers that are no longer needed. 990 // Pause those that we'll reuse. 991 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 992 const AString &uri = mFetcherInfos.keyAt(i); 993 994 bool discardFetcher = true; 995 996 // If we're seeking all current fetchers are discarded. 997 if (timeUs < 0ll) { 998 // delay fetcher removal 999 discardFetcher = false; 1000 1001 for (size_t j = 0; j < kMaxStreams; ++j) { 1002 StreamType type = indexToType(j); 1003 if ((streamMask & type) && uri == URIs[j]) { 1004 resumeMask |= type; 1005 streamMask &= ~type; 1006 } 1007 } 1008 } 1009 1010 if (discardFetcher) { 1011 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1012 } else { 1013 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1014 } 1015 } 1016 1017 sp<AMessage> msg; 1018 if (timeUs < 0ll) { 1019 // skip onChangeConfiguration2 (decoder destruction) if switching. 
1020 msg = new AMessage(kWhatChangeConfiguration3, id()); 1021 } else { 1022 msg = new AMessage(kWhatChangeConfiguration2, id()); 1023 } 1024 msg->setInt32("streamMask", streamMask); 1025 msg->setInt32("resumeMask", resumeMask); 1026 msg->setInt64("timeUs", timeUs); 1027 for (size_t i = 0; i < kMaxStreams; ++i) { 1028 if (streamMask & indexToType(i)) { 1029 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1030 } 1031 } 1032 1033 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1034 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1035 // fetchers have completed their asynchronous operation, we'll post 1036 // mContinuation, which then is handled below in onChangeConfiguration2. 1037 mContinuationCounter = mFetcherInfos.size(); 1038 mContinuation = msg; 1039 1040 if (mContinuationCounter == 0) { 1041 msg->post(); 1042 1043 if (mSeekReplyID != 0) { 1044 CHECK(mSeekReply != NULL); 1045 mSeekReply->postReply(mSeekReplyID); 1046 } 1047 } 1048} 1049 1050void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1051 if (!mReconfigurationInProgress) { 1052 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1053 } else { 1054 msg->post(1000000ll); // retry in 1 sec 1055 } 1056} 1057 1058void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1059 mContinuation.clear(); 1060 1061 // All fetchers are either suspended or have been removed now. 
    uint32_t streamMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));

    // Collect the requested URI of every stream type present in streamMask;
    // entries for absent types stay empty.
    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            const AString &uriKey = mStreams[i].uriKey();
            CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
            ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
        }
    }

    // Determine which decoders to shutdown on the player side,
    // a decoder has to be shutdown if either
    // 1) its streamtype was active before but no longer is.
    // or
    // 2) its streamtype was already active and still is but the URI
    //    has changed.
    // The loop stops as soon as i reaches kSubtitleIndex, so the subtitle
    // stream never contributes to changedMask.
    uint32_t changedMask = 0;
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                        && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }

    if (changedMask == 0) {
        // If nothing changed as far as the audio/video decoders
        // are concerned we can proceed.
        onChangeConfiguration3(msg);
        return;
    }

    // Something changed, inform the player which will shutdown the
    // corresponding decoders and will post the reply once that's done.
    // Handling the reply will continue executing below in
    // onChangeConfiguration3.
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatStreamsChanged);
    notify->setInt32("changedMask", changedMask);

    // Re-target the incoming message at this handler so that it can be
    // posted back as the "reply" carried by the notification.
    msg->setWhat(kWhatChangeConfiguration3);
    msg->setTarget(id());

    notify->setMessage("reply", msg);
    notify->post();
}

// Last stage of a configuration change.
//
// Reads from msg (each lookup is CHECKed):
//   "streamMask"  - stream types of the new configuration,
//   "resumeMask"  - stream types whose existing fetcher should be resumed,
//   <uriKey>      - one string per type in streamMask, stored into mStreams,
//   "timeUs"      - start time; a negative value selects the "switching"
//                   path, which starts from mLastDequeuedTimeUs instead.
//
// Existing fetchers that match a resumed stream are restarted with the
// primary packet sources, all others are flagged mToBeRemoved. A new fetcher
// is then created for every type still set in streamMask (types sharing a
// URI share one fetcher). Finally the bandwidth check is re-armed and the
// reconfiguration/switch flags are updated.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // Adopt the new URI of every stream type in the requested configuration.
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
            CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
        }
    }

    int64_t timeUs;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));

    // A negative time means "continue from where playback currently is":
    // use the last dequeued timestamp and take the switching path below.
    if (timeUs < 0ll) {
        timeUs = mLastDequeuedTimeUs;
        switching = true;
    }
    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;

    // Remember the requested mask before streamMask is consumed bit by bit
    // in the fetcher-creation loop further down.
    mNewStreamMask = streamMask;

    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        // A fetcher serves stream type j if j is to be resumed and the
        // fetcher's URI equals the (new) URI of that stream.
        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));
            }
        }

        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
    // NOTE(review): nothing in this function clears bits of streamMask before
    // this point; presumably the sender of msg already excluded the resumed
    // types - confirm against the code that builds the message.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
        }

        AString uri;
        uri = mStreams[i].mUri;

        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        // Highest sequence number / timestamp seen in the packet sources this
        // fetcher takes over; only updated on the switching path.
        int32_t latestSeq = -1;
        int64_t latestTimeUs = 0ll;
        sp<AnotherPacketSource> sources[kMaxStreams];

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (!switching) {
                    // Not switching: the primary source is reused after
                    // dropping whatever it still holds.
                    sources[j]->clear();
                } else {
                    int32_t type, seq;
                    int64_t srcTimeUs;
                    sp<AMessage> meta = sources[j]->getLatestMeta();

                    // Only metadata without a "discontinuity" entry is used
                    // to derive the sequence number and timestamp.
                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        CHECK(meta->findInt32("seq", &seq));
                        if (seq > latestSeq) {
                            latestSeq = seq;
                        }
                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
                        if (srcTimeUs > latestTimeUs) {
                            latestTimeUs = srcTimeUs;
                        }
                    }

                    // While switching, the new fetcher writes into the
                    // secondary source; the primary one is left untouched.
                    sources[j] = mPacketSources2.valueFor(indexToType(j));
                    sources[j]->clear();
                    // A stream type that is in the new mask but not in the
                    // current one gets a format-change buffer queued first.
                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                    if (extraStreams & indexToType(j)) {
                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
                    }
                }

                // This type is now served; later iterations of the outer
                // loop must not create another fetcher for it.
                streamMask &= ~indexToType(j);
            }
        }

        fetcher->startAsync(
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                timeUs,
                latestTimeUs /* min start time(us) */,
                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
    }

    // All fetchers have now been started, the configuration change
    // has completed.

    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");
    mReconfigurationInProgress = false;
    if (switching) {
        // mStreamMask is left unchanged here; tryToFinishBandwidthSwitch()
        // assigns mNewStreamMask to it once its conditions are met.
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    // A disconnect request is pending (non-zero reply ID): complete it now.
    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

// Handles the notification that one stream type has been swapped.
//
// Reads "switchGeneration" and "stream" from msg (both CHECKed). Messages
// whose generation differs from mSwitchGeneration are ignored;
// cancelBandwidthSwitch() increments that counter. Once the accumulated
// mSwapMask equals mStreamMask, the packet sources of stream types that
// exist only in the new configuration are swapped as well and completion of
// the switch is attempted.
// NOTE(review): the equality test is against the current mStreamMask; whether
// a swap is reported for every currently active stream type (e.g. when the
// new variant has fewer streams) cannot be seen from here - confirm.
void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));
    mSwapMask |= stream;
    if (mSwapMask != mStreamMask) {
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        // x & ~(x - 1) isolates the lowest set bit, i.e. one stream type.
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;
    }

    tryToFinishBandwidthSwitch();
}

// Mark switch done when:
//   1. all old buffers are swapped out, AND
//   2. all old fetchers are removed.
1276void LiveSession::tryToFinishBandwidthSwitch() { 1277 bool needToRemoveFetchers = false; 1278 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1279 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1280 needToRemoveFetchers = true; 1281 break; 1282 } 1283 } 1284 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1285 mStreamMask = mNewStreamMask; 1286 mSwitchInProgress = false; 1287 mSwapMask = 0; 1288 } 1289} 1290 1291void LiveSession::scheduleCheckBandwidthEvent() { 1292 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1293 msg->setInt32("generation", mCheckBandwidthGeneration); 1294 msg->post(10000000ll); 1295} 1296 1297void LiveSession::cancelCheckBandwidthEvent() { 1298 ++mCheckBandwidthGeneration; 1299} 1300 1301void LiveSession::cancelBandwidthSwitch() { 1302 Mutex::Autolock lock(mSwapMutex); 1303 mSwitchGeneration++; 1304 mSwitchInProgress = false; 1305 mSwapMask = 0; 1306} 1307 1308bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1309 if (mReconfigurationInProgress || mSwitchInProgress) { 1310 return false; 1311 } 1312 1313 if (mPrevBandwidthIndex < 0) { 1314 return true; 1315 } 1316 1317 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1318 return false; 1319 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1320 return canSwitchUp(); 1321 } else { 1322 return true; 1323 } 1324} 1325 1326void LiveSession::onCheckBandwidth() { 1327 size_t bandwidthIndex = getBandwidthIndex(); 1328 if (canSwitchBandwidthTo(bandwidthIndex)) { 1329 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1330 } else { 1331 scheduleCheckBandwidthEvent(); 1332 } 1333 1334 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1335 // schedule another one on return, only an explicit call to 1336 // scheduleCheckBandwidthEvent will do that. 1337 // This ensures that only one configuration change is ongoing at any 1338 // one time, once that completes it'll schedule another check bandwidth 1339 // event. 
1340} 1341 1342void LiveSession::postPrepared(status_t err) { 1343 CHECK(mInPreparationPhase); 1344 1345 sp<AMessage> notify = mNotify->dup(); 1346 if (err == OK || err == ERROR_END_OF_STREAM) { 1347 notify->setInt32("what", kWhatPrepared); 1348 } else { 1349 notify->setInt32("what", kWhatPreparationFailed); 1350 notify->setInt32("err", err); 1351 } 1352 1353 notify->post(); 1354 1355 mInPreparationPhase = false; 1356} 1357 1358} // namespace android 1359 1360