// LiveSession.cpp revision e4f25c280a8f1655c31a745978e0fcbc61f91dee
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <openssl/aes.h> 47#include <openssl/md5.h> 48 49namespace android { 50 51LiveSession::LiveSession( 52 const sp<AMessage> ¬ify, uint32_t flags, 53 const sp<IMediaHTTPService> &httpService) 54 : mNotify(notify), 55 mFlags(flags), 56 mHTTPService(httpService), 57 mInPreparationPhase(true), 58 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 59 mPrevBandwidthIndex(-1), 60 mStreamMask(0), 61 mNewStreamMask(0), 62 mSwapMask(0), 63 
mCheckBandwidthGeneration(0), 64 mSwitchGeneration(0), 65 mLastDequeuedTimeUs(0ll), 66 mRealTimeBaseUs(0ll), 67 mReconfigurationInProgress(false), 68 mSwitchInProgress(false), 69 mDisconnectReplyID(0), 70 mSeekReplyID(0) { 71 72 mStreams[kAudioIndex] = StreamItem("audio"); 73 mStreams[kVideoIndex] = StreamItem("video"); 74 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 75 76 for (size_t i = 0; i < kMaxStreams; ++i) { 77 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 78 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 79 } 80} 81 82LiveSession::~LiveSession() { 83} 84 85sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 86 ABuffer *discontinuity = new ABuffer(0); 87 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 88 discontinuity->meta()->setInt32("swapPacketSource", swap); 89 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 90 discontinuity->meta()->setInt64("timeUs", -1); 91 return discontinuity; 92} 93 94void LiveSession::swapPacketSource(StreamType stream) { 95 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 96 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 97 sp<AnotherPacketSource> tmp = aps; 98 aps = aps2; 99 aps2 = tmp; 100 aps2->clear(); 101} 102 103status_t LiveSession::dequeueAccessUnit( 104 StreamType stream, sp<ABuffer> *accessUnit) { 105 if (!(mStreamMask & stream)) { 106 // return -EWOULDBLOCK to avoid halting the decoder 107 // when switching between audio/video and audio only. 108 return -EWOULDBLOCK; 109 } 110 111 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 112 113 status_t finalResult; 114 if (!packetSource->hasBufferAvailable(&finalResult)) { 115 return finalResult == OK ? 
-EAGAIN : finalResult; 116 } 117 118 status_t err = packetSource->dequeueAccessUnit(accessUnit); 119 120 const char *streamStr; 121 switch (stream) { 122 case STREAMTYPE_AUDIO: 123 streamStr = "audio"; 124 break; 125 case STREAMTYPE_VIDEO: 126 streamStr = "video"; 127 break; 128 case STREAMTYPE_SUBTITLES: 129 streamStr = "subs"; 130 break; 131 default: 132 TRESPASS(); 133 } 134 135 if (err == INFO_DISCONTINUITY) { 136 int32_t type; 137 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 138 139 sp<AMessage> extra; 140 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 141 extra.clear(); 142 } 143 144 ALOGI("[%s] read discontinuity of type %d, extra = %s", 145 streamStr, 146 type, 147 extra == NULL ? "NULL" : extra->debugString().c_str()); 148 149 int32_t swap; 150 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 151 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 152 && swap) { 153 154 int32_t switchGeneration; 155 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 156 { 157 Mutex::Autolock lock(mSwapMutex); 158 if (switchGeneration == mSwitchGeneration) { 159 swapPacketSource(stream); 160 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 161 msg->setInt32("stream", stream); 162 msg->setInt32("switchGeneration", switchGeneration); 163 msg->post(); 164 } 165 } 166 } 167 } else if (err == OK) { 168 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 169 int64_t timeUs; 170 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 171 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 172 173 mLastDequeuedTimeUs = timeUs; 174 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 175 } else if (stream == STREAMTYPE_SUBTITLES) { 176 (*accessUnit)->meta()->setInt32( 177 "trackIndex", mPlaylist->getSelectedIndex()); 178 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 179 } 180 } else { 181 ALOGI("[%s] encountered error %d", streamStr, err); 182 } 183 184 return err; 185} 
186 187status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 188 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 189 if (!(mStreamMask & stream)) { 190 return UNKNOWN_ERROR; 191 } 192 193 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 194 195 sp<MetaData> meta = packetSource->getFormat(); 196 197 if (meta == NULL) { 198 return -EAGAIN; 199 } 200 201 return convertMetaDataToMessage(meta, format); 202} 203 204void LiveSession::connectAsync( 205 const char *url, const KeyedVector<String8, String8> *headers) { 206 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 207 msg->setString("url", url); 208 209 if (headers != NULL) { 210 msg->setPointer( 211 "headers", 212 new KeyedVector<String8, String8>(*headers)); 213 } 214 215 msg->post(); 216} 217 218status_t LiveSession::disconnect() { 219 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 220 221 sp<AMessage> response; 222 status_t err = msg->postAndAwaitResponse(&response); 223 224 return err; 225} 226 227status_t LiveSession::seekTo(int64_t timeUs) { 228 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 229 msg->setInt64("timeUs", timeUs); 230 231 sp<AMessage> response; 232 status_t err = msg->postAndAwaitResponse(&response); 233 234 uint32_t replyID; 235 CHECK(response == mSeekReply && 0 != mSeekReplyID); 236 mSeekReply.clear(); 237 mSeekReplyID = 0; 238 return err; 239} 240 241void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 242 switch (msg->what()) { 243 case kWhatConnect: 244 { 245 onConnect(msg); 246 break; 247 } 248 249 case kWhatDisconnect: 250 { 251 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 252 253 if (mReconfigurationInProgress) { 254 break; 255 } 256 257 finishDisconnect(); 258 break; 259 } 260 261 case kWhatSeek: 262 { 263 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 264 265 status_t err = onSeek(msg); 266 267 mSeekReply = new AMessage; 268 mSeekReply->setInt32("err", err); 269 
break; 270 } 271 272 case kWhatFetcherNotify: 273 { 274 int32_t what; 275 CHECK(msg->findInt32("what", &what)); 276 277 switch (what) { 278 case PlaylistFetcher::kWhatStarted: 279 break; 280 case PlaylistFetcher::kWhatPaused: 281 case PlaylistFetcher::kWhatStopped: 282 { 283 if (what == PlaylistFetcher::kWhatStopped) { 284 AString uri; 285 CHECK(msg->findString("uri", &uri)); 286 if (mFetcherInfos.removeItem(uri) < 0) { 287 // ignore duplicated kWhatStopped messages. 288 break; 289 } 290 291 tryToFinishBandwidthSwitch(); 292 } 293 294 if (mContinuation != NULL) { 295 CHECK_GT(mContinuationCounter, 0); 296 if (--mContinuationCounter == 0) { 297 mContinuation->post(); 298 299 if (mSeekReplyID != 0) { 300 CHECK(mSeekReply != NULL); 301 mSeekReply->postReply(mSeekReplyID); 302 } 303 } 304 } 305 break; 306 } 307 308 case PlaylistFetcher::kWhatDurationUpdate: 309 { 310 AString uri; 311 CHECK(msg->findString("uri", &uri)); 312 313 int64_t durationUs; 314 CHECK(msg->findInt64("durationUs", &durationUs)); 315 316 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 317 info->mDurationUs = durationUs; 318 break; 319 } 320 321 case PlaylistFetcher::kWhatError: 322 { 323 status_t err; 324 CHECK(msg->findInt32("err", &err)); 325 326 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 327 328 if (mInPreparationPhase) { 329 postPrepared(err); 330 } 331 332 cancelBandwidthSwitch(); 333 334 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 335 336 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 337 338 mPacketSources.valueFor( 339 STREAMTYPE_SUBTITLES)->signalEOS(err); 340 341 sp<AMessage> notify = mNotify->dup(); 342 notify->setInt32("what", kWhatError); 343 notify->setInt32("err", err); 344 notify->post(); 345 break; 346 } 347 348 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 349 { 350 AString uri; 351 CHECK(msg->findString("uri", &uri)); 352 353 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 354 info->mIsPrepared = true; 355 356 if 
(mInPreparationPhase) { 357 bool allFetchersPrepared = true; 358 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 359 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 360 allFetchersPrepared = false; 361 break; 362 } 363 } 364 365 if (allFetchersPrepared) { 366 postPrepared(OK); 367 } 368 } 369 break; 370 } 371 372 case PlaylistFetcher::kWhatStartedAt: 373 { 374 int32_t switchGeneration; 375 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 376 377 if (switchGeneration != mSwitchGeneration) { 378 break; 379 } 380 381 // Resume fetcher for the original variant; the resumed fetcher should 382 // continue until the timestamps found in msg, which is stored by the 383 // new fetcher to indicate where the new variant has started buffering. 384 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 385 const FetcherInfo info = mFetcherInfos.valueAt(i); 386 if (info.mToBeRemoved) { 387 info.mFetcher->resumeUntilAsync(msg); 388 } 389 } 390 break; 391 } 392 393 default: 394 TRESPASS(); 395 } 396 397 break; 398 } 399 400 case kWhatCheckBandwidth: 401 { 402 int32_t generation; 403 CHECK(msg->findInt32("generation", &generation)); 404 405 if (generation != mCheckBandwidthGeneration) { 406 break; 407 } 408 409 onCheckBandwidth(); 410 break; 411 } 412 413 case kWhatChangeConfiguration: 414 { 415 onChangeConfiguration(msg); 416 break; 417 } 418 419 case kWhatChangeConfiguration2: 420 { 421 onChangeConfiguration2(msg); 422 break; 423 } 424 425 case kWhatChangeConfiguration3: 426 { 427 onChangeConfiguration3(msg); 428 break; 429 } 430 431 case kWhatFinishDisconnect2: 432 { 433 onFinishDisconnect2(); 434 break; 435 } 436 437 case kWhatSwapped: 438 { 439 onSwapped(msg); 440 break; 441 } 442 default: 443 TRESPASS(); 444 break; 445 } 446} 447 448// static 449int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 450 if (a->mBandwidth < b->mBandwidth) { 451 return -1; 452 } else if (a->mBandwidth == b->mBandwidth) { 453 return 0; 454 } 455 456 return 
1; 457} 458 459// static 460LiveSession::StreamType LiveSession::indexToType(int idx) { 461 CHECK(idx >= 0 && idx < kMaxStreams); 462 return (StreamType)(1 << idx); 463} 464 465void LiveSession::onConnect(const sp<AMessage> &msg) { 466 AString url; 467 CHECK(msg->findString("url", &url)); 468 469 KeyedVector<String8, String8> *headers = NULL; 470 if (!msg->findPointer("headers", (void **)&headers)) { 471 mExtraHeaders.clear(); 472 } else { 473 mExtraHeaders = *headers; 474 475 delete headers; 476 headers = NULL; 477 } 478 479#if 1 480 ALOGI("onConnect <URL suppressed>"); 481#else 482 ALOGI("onConnect %s", url.c_str()); 483#endif 484 485 mMasterURL = url; 486 487 bool dummy; 488 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 489 490 if (mPlaylist == NULL) { 491 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 492 493 postPrepared(ERROR_IO); 494 return; 495 } 496 497 // We trust the content provider to make a reasonable choice of preferred 498 // initial bandwidth by listing it first in the variant playlist. 499 // At startup we really don't have a good estimate on the available 500 // network bandwidth since we haven't tranferred any data yet. Once 501 // we have we can make a better informed choice. 
502 size_t initialBandwidth = 0; 503 size_t initialBandwidthIndex = 0; 504 505 if (mPlaylist->isVariantPlaylist()) { 506 for (size_t i = 0; i < mPlaylist->size(); ++i) { 507 BandwidthItem item; 508 509 item.mPlaylistIndex = i; 510 511 sp<AMessage> meta; 512 AString uri; 513 mPlaylist->itemAt(i, &uri, &meta); 514 515 unsigned long bandwidth; 516 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 517 518 if (initialBandwidth == 0) { 519 initialBandwidth = item.mBandwidth; 520 } 521 522 mBandwidthItems.push(item); 523 } 524 525 CHECK_GT(mBandwidthItems.size(), 0u); 526 527 mBandwidthItems.sort(SortByBandwidth); 528 529 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 530 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 531 initialBandwidthIndex = i; 532 break; 533 } 534 } 535 } else { 536 // dummy item. 537 BandwidthItem item; 538 item.mPlaylistIndex = 0; 539 item.mBandwidth = 0; 540 mBandwidthItems.push(item); 541 } 542 543 changeConfiguration( 544 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 545} 546 547void LiveSession::finishDisconnect() { 548 // No reconfiguration is currently pending, make sure none will trigger 549 // during disconnection either. 550 cancelCheckBandwidthEvent(); 551 552 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 
553 // (finishDisconnect, onFinishDisconnect2) 554 cancelBandwidthSwitch(); 555 556 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 557 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 558 } 559 560 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 561 562 mContinuationCounter = mFetcherInfos.size(); 563 mContinuation = msg; 564 565 if (mContinuationCounter == 0) { 566 msg->post(); 567 } 568} 569 570void LiveSession::onFinishDisconnect2() { 571 mContinuation.clear(); 572 573 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 574 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 575 576 mPacketSources.valueFor( 577 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 578 579 sp<AMessage> response = new AMessage; 580 response->setInt32("err", OK); 581 582 response->postReply(mDisconnectReplyID); 583 mDisconnectReplyID = 0; 584} 585 586sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 587 ssize_t index = mFetcherInfos.indexOfKey(uri); 588 589 if (index >= 0) { 590 return NULL; 591 } 592 593 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 594 notify->setString("uri", uri); 595 notify->setInt32("switchGeneration", mSwitchGeneration); 596 597 FetcherInfo info; 598 info.mFetcher = new PlaylistFetcher(notify, this, uri); 599 info.mDurationUs = -1ll; 600 info.mIsPrepared = false; 601 info.mToBeRemoved = false; 602 looper()->registerHandler(info.mFetcher); 603 604 mFetcherInfos.add(uri, info); 605 606 return info.mFetcher; 607} 608 609/* 610 * Illustration of parameters: 611 * 612 * 0 `range_offset` 613 * +------------+-------------------------------------------------------+--+--+ 614 * | | | next block to fetch | | | 615 * | | `source` handle => `out` buffer | | | | 616 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 617 * | |<----------- `range_length` / buffer capacity ----------->| | 618 * |<------------------------------ file_size 
------------------------------->| 619 * 620 * Special parameter values: 621 * - range_length == -1 means entire file 622 * - block_size == 0 means entire range 623 * 624 */ 625status_t LiveSession::fetchFile( 626 const char *url, sp<ABuffer> *out, 627 int64_t range_offset, int64_t range_length, 628 uint32_t block_size, /* download block size */ 629 sp<DataSource> *source, /* to return and reuse source */ 630 String8 *actualUrl) { 631 off64_t size; 632 sp<DataSource> temp_source; 633 if (source == NULL) { 634 source = &temp_source; 635 } 636 637 if (*source == NULL) { 638 if (!strncasecmp(url, "file://", 7)) { 639 *source = new FileSource(url + 7); 640 } else if (strncasecmp(url, "http://", 7) 641 && strncasecmp(url, "https://", 8)) { 642 return ERROR_UNSUPPORTED; 643 } else { 644 KeyedVector<String8, String8> headers = mExtraHeaders; 645 if (range_offset > 0 || range_length >= 0) { 646 headers.add( 647 String8("Range"), 648 String8( 649 StringPrintf( 650 "bytes=%lld-%s", 651 range_offset, 652 range_length < 0 653 ? "" : StringPrintf("%lld", 654 range_offset + range_length - 1).c_str()).c_str())); 655 } 656 status_t err = mHTTPDataSource->connect(url, &headers); 657 658 if (err != OK) { 659 return err; 660 } 661 662 *source = mHTTPDataSource; 663 } 664 } 665 666 status_t getSizeErr = (*source)->getSize(&size); 667 if (getSizeErr != OK) { 668 size = 65536; 669 } 670 671 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 672 if (*out == NULL) { 673 buffer->setRange(0, 0); 674 } 675 676 // adjust range_length if only reading partial block 677 if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { 678 range_length = buffer->size() + block_size; 679 } 680 for (;;) { 681 // Only resize when we don't know the size. 
682 size_t bufferRemaining = buffer->capacity() - buffer->size(); 683 if (bufferRemaining == 0 && getSizeErr != OK) { 684 bufferRemaining = 32768; 685 686 ALOGV("increasing download buffer to %d bytes", 687 buffer->size() + bufferRemaining); 688 689 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 690 memcpy(copy->data(), buffer->data(), buffer->size()); 691 copy->setRange(0, buffer->size()); 692 693 buffer = copy; 694 } 695 696 size_t maxBytesToRead = bufferRemaining; 697 if (range_length >= 0) { 698 int64_t bytesLeftInRange = range_length - buffer->size(); 699 if (bytesLeftInRange < maxBytesToRead) { 700 maxBytesToRead = bytesLeftInRange; 701 702 if (bytesLeftInRange == 0) { 703 break; 704 } 705 } 706 } 707 708 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 709 // to help us break out of the loop. 710 ssize_t n = (*source)->readAt( 711 buffer->size(), buffer->data() + buffer->size(), 712 maxBytesToRead); 713 714 if (n < 0) { 715 return n; 716 } 717 718 if (n == 0) { 719 break; 720 } 721 722 buffer->setRange(0, buffer->size() + (size_t)n); 723 } 724 725 *out = buffer; 726 if (actualUrl != NULL) { 727 *actualUrl = (*source)->getUri(); 728 if (actualUrl->isEmpty()) { 729 *actualUrl = url; 730 } 731 } 732 733 return OK; 734} 735 736sp<M3UParser> LiveSession::fetchPlaylist( 737 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 738 ALOGV("fetchPlaylist '%s'", url); 739 740 *unchanged = false; 741 742 sp<ABuffer> buffer; 743 String8 actualUrl; 744 status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 745 746 if (err != OK) { 747 return NULL; 748 } 749 750 // MD5 functionality is not available on the simulator, treat all 751 // playlists as changed. 
752 753#if defined(HAVE_ANDROID_OS) 754 uint8_t hash[16]; 755 756 MD5_CTX m; 757 MD5_Init(&m); 758 MD5_Update(&m, buffer->data(), buffer->size()); 759 760 MD5_Final(hash, &m); 761 762 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 763 // playlist unchanged 764 *unchanged = true; 765 766 return NULL; 767 } 768 769 if (curPlaylistHash != NULL) { 770 memcpy(curPlaylistHash, hash, sizeof(hash)); 771 } 772#endif 773 774 sp<M3UParser> playlist = 775 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 776 777 if (playlist->initCheck() != OK) { 778 ALOGE("failed to parse .m3u8 playlist"); 779 780 return NULL; 781 } 782 783 return playlist; 784} 785 786static double uniformRand() { 787 return (double)rand() / RAND_MAX; 788} 789 790size_t LiveSession::getBandwidthIndex() { 791 if (mBandwidthItems.size() == 0) { 792 return 0; 793 } 794 795#if 1 796 char value[PROPERTY_VALUE_MAX]; 797 ssize_t index = -1; 798 if (property_get("media.httplive.bw-index", value, NULL)) { 799 char *end; 800 index = strtol(value, &end, 10); 801 CHECK(end > value && *end == '\0'); 802 803 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 804 index = mBandwidthItems.size() - 1; 805 } 806 } 807 808 if (index < 0) { 809 int32_t bandwidthBps; 810 if (mHTTPDataSource != NULL 811 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 812 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 813 } else { 814 ALOGV("no bandwidth estimate."); 815 return 0; // Pick the lowest bandwidth stream by default. 816 } 817 818 char value[PROPERTY_VALUE_MAX]; 819 if (property_get("media.httplive.max-bw", value, NULL)) { 820 char *end; 821 long maxBw = strtoul(value, &end, 10); 822 if (end > value && *end == '\0') { 823 if (maxBw > 0 && bandwidthBps > maxBw) { 824 ALOGV("bandwidth capped to %ld bps", maxBw); 825 bandwidthBps = maxBw; 826 } 827 } 828 } 829 830 // Consider only 80% of the available bandwidth usable. 
831 bandwidthBps = (bandwidthBps * 8) / 10; 832 833 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 834 835 index = mBandwidthItems.size() - 1; 836 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 837 > (size_t)bandwidthBps) { 838 --index; 839 } 840 } 841#elif 0 842 // Change bandwidth at random() 843 size_t index = uniformRand() * mBandwidthItems.size(); 844#elif 0 845 // There's a 50% chance to stay on the current bandwidth and 846 // a 50% chance to switch to the next higher bandwidth (wrapping around 847 // to lowest) 848 const size_t kMinIndex = 0; 849 850 static ssize_t mPrevBandwidthIndex = -1; 851 852 size_t index; 853 if (mPrevBandwidthIndex < 0) { 854 index = kMinIndex; 855 } else if (uniformRand() < 0.5) { 856 index = (size_t)mPrevBandwidthIndex; 857 } else { 858 index = mPrevBandwidthIndex + 1; 859 if (index == mBandwidthItems.size()) { 860 index = kMinIndex; 861 } 862 } 863 mPrevBandwidthIndex = index; 864#elif 0 865 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 866 867 size_t index = mBandwidthItems.size() - 1; 868 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 869 --index; 870 } 871#elif 1 872 char value[PROPERTY_VALUE_MAX]; 873 size_t index; 874 if (property_get("media.httplive.bw-index", value, NULL)) { 875 char *end; 876 index = strtoul(value, &end, 10); 877 CHECK(end > value && *end == '\0'); 878 879 if (index >= mBandwidthItems.size()) { 880 index = mBandwidthItems.size() - 1; 881 } 882 } else { 883 index = 0; 884 } 885#else 886 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 887#endif 888 889 CHECK_GE(index, 0); 890 891 return index; 892} 893 894status_t LiveSession::onSeek(const sp<AMessage> &msg) { 895 int64_t timeUs; 896 CHECK(msg->findInt64("timeUs", &timeUs)); 897 898 if (!mReconfigurationInProgress) { 899 changeConfiguration(timeUs, getBandwidthIndex()); 900 } 901 902 return OK; 903} 904 905status_t 
LiveSession::getDuration(int64_t *durationUs) const { 906 int64_t maxDurationUs = 0ll; 907 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 908 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 909 910 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 911 maxDurationUs = fetcherDurationUs; 912 } 913 } 914 915 *durationUs = maxDurationUs; 916 917 return OK; 918} 919 920bool LiveSession::isSeekable() const { 921 int64_t durationUs; 922 return getDuration(&durationUs) == OK && durationUs >= 0; 923} 924 925bool LiveSession::hasDynamicDuration() const { 926 return false; 927} 928 929status_t LiveSession::getTrackInfo(Parcel *reply) const { 930 return mPlaylist->getTrackInfo(reply); 931} 932 933status_t LiveSession::selectTrack(size_t index, bool select) { 934 status_t err = mPlaylist->selectTrack(index, select); 935 if (err == OK) { 936 (new AMessage(kWhatChangeConfiguration, id()))->post(); 937 } 938 return err; 939} 940 941bool LiveSession::canSwitchUp() { 942 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 943 status_t err = OK; 944 for (size_t i = 0; i < mPacketSources.size(); ++i) { 945 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 946 int64_t dur = source->getBufferedDurationUs(&err); 947 if (err == OK && dur > 10000000) { 948 return true; 949 } 950 } 951 return false; 952} 953 954void LiveSession::changeConfiguration( 955 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 956 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 957 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 
958 cancelBandwidthSwitch(); 959 960 CHECK(!mReconfigurationInProgress); 961 mReconfigurationInProgress = true; 962 963 mPrevBandwidthIndex = bandwidthIndex; 964 965 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 966 timeUs, bandwidthIndex, pickTrack); 967 968 if (pickTrack) { 969 mPlaylist->pickRandomMediaItems(); 970 } 971 972 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 973 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 974 975 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 976 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 977 978 AString URIs[kMaxStreams]; 979 for (size_t i = 0; i < kMaxStreams; ++i) { 980 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 981 streamMask |= indexToType(i); 982 } 983 } 984 985 // Step 1, stop and discard fetchers that are no longer needed. 986 // Pause those that we'll reuse. 987 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 988 const AString &uri = mFetcherInfos.keyAt(i); 989 990 bool discardFetcher = true; 991 992 // If we're seeking all current fetchers are discarded. 993 if (timeUs < 0ll) { 994 // delay fetcher removal 995 discardFetcher = false; 996 997 for (size_t j = 0; j < kMaxStreams; ++j) { 998 StreamType type = indexToType(j); 999 if ((streamMask & type) && uri == URIs[j]) { 1000 resumeMask |= type; 1001 streamMask &= ~type; 1002 } 1003 } 1004 } 1005 1006 if (discardFetcher) { 1007 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1008 } else { 1009 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1010 } 1011 } 1012 1013 sp<AMessage> msg; 1014 if (timeUs < 0ll) { 1015 // skip onChangeConfiguration2 (decoder destruction) if switching. 
1016 msg = new AMessage(kWhatChangeConfiguration3, id()); 1017 } else { 1018 msg = new AMessage(kWhatChangeConfiguration2, id()); 1019 } 1020 msg->setInt32("streamMask", streamMask); 1021 msg->setInt32("resumeMask", resumeMask); 1022 msg->setInt64("timeUs", timeUs); 1023 for (size_t i = 0; i < kMaxStreams; ++i) { 1024 if (streamMask & indexToType(i)) { 1025 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1026 } 1027 } 1028 1029 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1030 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1031 // fetchers have completed their asynchronous operation, we'll post 1032 // mContinuation, which then is handled below in onChangeConfiguration2. 1033 mContinuationCounter = mFetcherInfos.size(); 1034 mContinuation = msg; 1035 1036 if (mContinuationCounter == 0) { 1037 msg->post(); 1038 1039 if (mSeekReplyID != 0) { 1040 CHECK(mSeekReply != NULL); 1041 mSeekReply->postReply(mSeekReplyID); 1042 } 1043 } 1044} 1045 1046void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1047 if (!mReconfigurationInProgress) { 1048 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1049 } else { 1050 msg->post(1000000ll); // retry in 1 sec 1051 } 1052} 1053 1054void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1055 mContinuation.clear(); 1056 1057 // All fetchers are either suspended or have been removed now. 
1058 1059 uint32_t streamMask; 1060 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1061 1062 AString URIs[kMaxStreams]; 1063 for (size_t i = 0; i < kMaxStreams; ++i) { 1064 if (streamMask & indexToType(i)) { 1065 const AString &uriKey = mStreams[i].uriKey(); 1066 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1067 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1068 } 1069 } 1070 1071 // Determine which decoders to shutdown on the player side, 1072 // a decoder has to be shutdown if either 1073 // 1) its streamtype was active before but now longer isn't. 1074 // or 1075 // 2) its streamtype was already active and still is but the URI 1076 // has changed. 1077 uint32_t changedMask = 0; 1078 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1079 if (((mStreamMask & streamMask & indexToType(i)) 1080 && !(URIs[i] == mStreams[i].mUri)) 1081 || (mStreamMask & ~streamMask & indexToType(i))) { 1082 changedMask |= indexToType(i); 1083 } 1084 } 1085 1086 if (changedMask == 0) { 1087 // If nothing changed as far as the audio/video decoders 1088 // are concerned we can proceed. 1089 onChangeConfiguration3(msg); 1090 return; 1091 } 1092 1093 // Something changed, inform the player which will shutdown the 1094 // corresponding decoders and will post the reply once that's done. 1095 // Handling the reply will continue executing below in 1096 // onChangeConfiguration3. 1097 sp<AMessage> notify = mNotify->dup(); 1098 notify->setInt32("what", kWhatStreamsChanged); 1099 notify->setInt32("changedMask", changedMask); 1100 1101 msg->setWhat(kWhatChangeConfiguration3); 1102 msg->setTarget(id()); 1103 1104 notify->setMessage("reply", msg); 1105 notify->post(); 1106} 1107 1108void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1109 mContinuation.clear(); 1110 // All remaining fetchers are still suspended, the player has shutdown 1111 // any decoders that needed it. 
1112 1113 uint32_t streamMask, resumeMask; 1114 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1115 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1116 1117 for (size_t i = 0; i < kMaxStreams; ++i) { 1118 if (streamMask & indexToType(i)) { 1119 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1120 } 1121 } 1122 1123 int64_t timeUs; 1124 bool switching = false; 1125 CHECK(msg->findInt64("timeUs", &timeUs)); 1126 1127 if (timeUs < 0ll) { 1128 timeUs = mLastDequeuedTimeUs; 1129 switching = true; 1130 } 1131 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1132 1133 mNewStreamMask = streamMask; 1134 1135 // Of all existing fetchers: 1136 // * Resume fetchers that are still needed and assign them original packet sources. 1137 // * Mark otherwise unneeded fetchers for removal. 1138 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1139 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1140 const AString &uri = mFetcherInfos.keyAt(i); 1141 1142 sp<AnotherPacketSource> sources[kMaxStreams]; 1143 for (size_t j = 0; j < kMaxStreams; ++j) { 1144 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1145 sources[j] = mPacketSources.valueFor(indexToType(j)); 1146 } 1147 } 1148 1149 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1150 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1151 || sources[kSubtitleIndex] != NULL) { 1152 info.mFetcher->startAsync( 1153 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1154 } else { 1155 info.mToBeRemoved = true; 1156 } 1157 } 1158 1159 // streamMask now only contains the types that need a new fetcher created. 1160 1161 if (streamMask != 0) { 1162 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1163 } 1164 1165 // Find out when the original fetchers have buffered up to and start the new fetchers 1166 // at a later timestamp. 
1167 for (size_t i = 0; i < kMaxStreams; i++) { 1168 if (!(indexToType(i) & streamMask)) { 1169 continue; 1170 } 1171 1172 AString uri; 1173 uri = mStreams[i].mUri; 1174 1175 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1176 CHECK(fetcher != NULL); 1177 1178 int32_t latestSeq = -1; 1179 int64_t latestTimeUs = 0ll; 1180 sp<AnotherPacketSource> sources[kMaxStreams]; 1181 1182 // TRICKY: looping from i as earlier streams are already removed from streamMask 1183 for (size_t j = i; j < kMaxStreams; ++j) { 1184 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1185 sources[j] = mPacketSources.valueFor(indexToType(j)); 1186 1187 if (!switching) { 1188 sources[j]->clear(); 1189 } else { 1190 int32_t type, seq; 1191 int64_t srcTimeUs; 1192 sp<AMessage> meta = sources[j]->getLatestMeta(); 1193 1194 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1195 CHECK(meta->findInt32("seq", &seq)); 1196 if (seq > latestSeq) { 1197 latestSeq = seq; 1198 } 1199 CHECK(meta->findInt64("timeUs", &srcTimeUs)); 1200 if (srcTimeUs > latestTimeUs) { 1201 latestTimeUs = srcTimeUs; 1202 } 1203 } 1204 1205 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1206 sources[j]->clear(); 1207 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1208 if (extraStreams & indexToType(j)) { 1209 sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); 1210 } 1211 } 1212 1213 streamMask &= ~indexToType(j); 1214 } 1215 } 1216 1217 fetcher->startAsync( 1218 sources[kAudioIndex], 1219 sources[kVideoIndex], 1220 sources[kSubtitleIndex], 1221 timeUs, 1222 latestTimeUs /* min start time(us) */, 1223 latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); 1224 } 1225 1226 // All fetchers have now been started, the configuration change 1227 // has completed. 
1228 1229 scheduleCheckBandwidthEvent(); 1230 1231 ALOGV("XXX configuration change completed."); 1232 mReconfigurationInProgress = false; 1233 if (switching) { 1234 mSwitchInProgress = true; 1235 } else { 1236 mStreamMask = mNewStreamMask; 1237 } 1238 1239 if (mDisconnectReplyID != 0) { 1240 finishDisconnect(); 1241 } 1242} 1243 1244void LiveSession::onSwapped(const sp<AMessage> &msg) { 1245 int32_t switchGeneration; 1246 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1247 if (switchGeneration != mSwitchGeneration) { 1248 return; 1249 } 1250 1251 int32_t stream; 1252 CHECK(msg->findInt32("stream", &stream)); 1253 mSwapMask |= stream; 1254 if (mSwapMask != mStreamMask) { 1255 return; 1256 } 1257 1258 // Check if new variant contains extra streams. 1259 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1260 while (extraStreams) { 1261 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1262 swapPacketSource(extraStream); 1263 extraStreams &= ~extraStream; 1264 } 1265 1266 tryToFinishBandwidthSwitch(); 1267} 1268 1269// Mark switch done when: 1270// 1. all old buffers are swapped out, AND 1271// 2. all old fetchers are removed. 
1272void LiveSession::tryToFinishBandwidthSwitch() { 1273 bool needToRemoveFetchers = false; 1274 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1275 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1276 needToRemoveFetchers = true; 1277 break; 1278 } 1279 } 1280 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1281 mStreamMask = mNewStreamMask; 1282 mSwitchInProgress = false; 1283 mSwapMask = 0; 1284 } 1285} 1286 1287void LiveSession::scheduleCheckBandwidthEvent() { 1288 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1289 msg->setInt32("generation", mCheckBandwidthGeneration); 1290 msg->post(10000000ll); 1291} 1292 1293void LiveSession::cancelCheckBandwidthEvent() { 1294 ++mCheckBandwidthGeneration; 1295} 1296 1297void LiveSession::cancelBandwidthSwitch() { 1298 Mutex::Autolock lock(mSwapMutex); 1299 mSwitchGeneration++; 1300 mSwitchInProgress = false; 1301 mSwapMask = 0; 1302} 1303 1304bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1305 if (mReconfigurationInProgress || mSwitchInProgress) { 1306 return false; 1307 } 1308 1309 if (mPrevBandwidthIndex < 0) { 1310 return true; 1311 } 1312 1313 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1314 return false; 1315 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1316 return canSwitchUp(); 1317 } else { 1318 return true; 1319 } 1320} 1321 1322void LiveSession::onCheckBandwidth() { 1323 size_t bandwidthIndex = getBandwidthIndex(); 1324 if (canSwitchBandwidthTo(bandwidthIndex)) { 1325 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1326 } else { 1327 scheduleCheckBandwidthEvent(); 1328 } 1329 1330 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1331 // schedule another one on return, only an explicit call to 1332 // scheduleCheckBandwidthEvent will do that. 1333 // This ensures that only one configuration change is ongoing at any 1334 // one time, once that completes it'll schedule another check bandwidth 1335 // event. 
1336} 1337 1338void LiveSession::postPrepared(status_t err) { 1339 CHECK(mInPreparationPhase); 1340 1341 sp<AMessage> notify = mNotify->dup(); 1342 if (err == OK || err == ERROR_END_OF_STREAM) { 1343 notify->setInt32("what", kWhatPrepared); 1344 } else { 1345 notify->setInt32("what", kWhatPreparationFailed); 1346 notify->setInt32("err", err); 1347 } 1348 1349 notify->post(); 1350 1351 mInPreparationPhase = false; 1352} 1353 1354} // namespace android 1355 1356