LiveSession.cpp revision b4a7a2df4c28c3f32b5d877b54831d2cc5d78f81
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mPrevBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0) { 72 73 mStreams[kAudioIndex] = StreamItem("audio"); 74 mStreams[kVideoIndex] = StreamItem("video"); 75 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 76 77 for (size_t i = 0; i < kMaxStreams; ++i) { 78 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 79 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 80 } 81} 82 83LiveSession::~LiveSession() { 84} 85 86sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 87 ABuffer *discontinuity = new ABuffer(0); 88 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 89 discontinuity->meta()->setInt32("swapPacketSource", swap); 90 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 91 discontinuity->meta()->setInt64("timeUs", -1); 92 return discontinuity; 93} 94 95void LiveSession::swapPacketSource(StreamType stream) { 96 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 97 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 98 sp<AnotherPacketSource> tmp = aps; 99 aps = aps2; 100 aps2 = tmp; 101 aps2->clear(); 102} 103 104status_t LiveSession::dequeueAccessUnit( 105 StreamType stream, sp<ABuffer> *accessUnit) { 106 if (!(mStreamMask & stream)) { 107 // return -EWOULDBLOCK to avoid halting the decoder 108 // when switching between audio/video and audio only. 109 return -EWOULDBLOCK; 110 } 111 112 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 113 114 status_t finalResult; 115 if (!packetSource->hasBufferAvailable(&finalResult)) { 116 return finalResult == OK ? -EAGAIN : finalResult; 117 } 118 119 status_t err = packetSource->dequeueAccessUnit(accessUnit); 120 121 const char *streamStr; 122 switch (stream) { 123 case STREAMTYPE_AUDIO: 124 streamStr = "audio"; 125 break; 126 case STREAMTYPE_VIDEO: 127 streamStr = "video"; 128 break; 129 case STREAMTYPE_SUBTITLES: 130 streamStr = "subs"; 131 break; 132 default: 133 TRESPASS(); 134 } 135 136 if (err == INFO_DISCONTINUITY) { 137 int32_t type; 138 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 139 140 sp<AMessage> extra; 141 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 142 extra.clear(); 143 } 144 145 ALOGI("[%s] read discontinuity of type %d, extra = %s", 146 streamStr, 147 type, 148 extra == NULL ? "NULL" : extra->debugString().c_str()); 149 150 int32_t swap; 151 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 152 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 153 && swap) { 154 155 int32_t switchGeneration; 156 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 157 { 158 Mutex::Autolock lock(mSwapMutex); 159 if (switchGeneration == mSwitchGeneration) { 160 swapPacketSource(stream); 161 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 162 msg->setInt32("stream", stream); 163 msg->setInt32("switchGeneration", switchGeneration); 164 msg->post(); 165 } 166 } 167 } 168 } else if (err == OK) { 169 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 170 int64_t timeUs; 171 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 172 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 173 174 mLastDequeuedTimeUs = timeUs; 175 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 176 } else if (stream == STREAMTYPE_SUBTITLES) { 177 (*accessUnit)->meta()->setInt32( 178 "trackIndex", mPlaylist->getSelectedIndex()); 179 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 180 } 181 } else { 182 ALOGI("[%s] encountered error %d", streamStr, err); 183 } 184 185 return err; 186} 187 188status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 189 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 190 if (!(mStreamMask & stream)) { 191 return UNKNOWN_ERROR; 192 } 193 194 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 195 196 sp<MetaData> meta = packetSource->getFormat(); 197 198 if (meta == NULL) { 199 return -EAGAIN; 200 } 201 202 return convertMetaDataToMessage(meta, format); 203} 204 205void LiveSession::connectAsync( 206 const char *url, const KeyedVector<String8, String8> *headers) { 207 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 208 msg->setString("url", url); 209 210 if (headers != NULL) { 211 msg->setPointer( 212 "headers", 213 new KeyedVector<String8, String8>(*headers)); 214 } 215 216 msg->post(); 217} 218 219status_t LiveSession::disconnect() { 220 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 221 222 sp<AMessage> response; 223 status_t err = msg->postAndAwaitResponse(&response); 224 225 return err; 226} 227 228status_t LiveSession::seekTo(int64_t timeUs) { 229 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 230 msg->setInt64("timeUs", timeUs); 231 232 sp<AMessage> response; 233 status_t err = msg->postAndAwaitResponse(&response); 234 235 uint32_t replyID; 236 CHECK(response == mSeekReply && 0 != mSeekReplyID); 237 mSeekReply.clear(); 238 mSeekReplyID = 0; 239 return err; 240} 241 242void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 243 switch (msg->what()) { 244 case kWhatConnect: 245 { 246 onConnect(msg); 247 break; 248 } 249 250 case kWhatDisconnect: 251 { 252 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 253 254 if (mReconfigurationInProgress) { 255 break; 256 } 257 258 finishDisconnect(); 259 break; 260 } 261 262 case kWhatSeek: 263 { 264 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 265 266 status_t err = onSeek(msg); 267 268 mSeekReply = new AMessage; 269 mSeekReply->setInt32("err", err); 270 break; 271 } 272 273 case kWhatFetcherNotify: 274 { 275 int32_t what; 276 CHECK(msg->findInt32("what", &what)); 277 278 switch (what) { 279 case PlaylistFetcher::kWhatStarted: 280 break; 281 case PlaylistFetcher::kWhatPaused: 282 case PlaylistFetcher::kWhatStopped: 283 { 284 if (what == PlaylistFetcher::kWhatStopped) { 285 AString uri; 286 CHECK(msg->findString("uri", &uri)); 287 if (mFetcherInfos.removeItem(uri) < 0) { 288 // ignore duplicated kWhatStopped messages. 289 break; 290 } 291 292 tryToFinishBandwidthSwitch(); 293 } 294 295 if (mContinuation != NULL) { 296 CHECK_GT(mContinuationCounter, 0); 297 if (--mContinuationCounter == 0) { 298 mContinuation->post(); 299 300 if (mSeekReplyID != 0) { 301 CHECK(mSeekReply != NULL); 302 mSeekReply->postReply(mSeekReplyID); 303 } 304 } 305 } 306 break; 307 } 308 309 case PlaylistFetcher::kWhatDurationUpdate: 310 { 311 AString uri; 312 CHECK(msg->findString("uri", &uri)); 313 314 int64_t durationUs; 315 CHECK(msg->findInt64("durationUs", &durationUs)); 316 317 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 318 info->mDurationUs = durationUs; 319 break; 320 } 321 322 case PlaylistFetcher::kWhatError: 323 { 324 status_t err; 325 CHECK(msg->findInt32("err", &err)); 326 327 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 328 329 if (mInPreparationPhase) { 330 postPrepared(err); 331 } 332 333 cancelBandwidthSwitch(); 334 335 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 336 337 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 338 339 mPacketSources.valueFor( 340 STREAMTYPE_SUBTITLES)->signalEOS(err); 341 342 sp<AMessage> notify = mNotify->dup(); 343 notify->setInt32("what", kWhatError); 344 notify->setInt32("err", err); 345 notify->post(); 346 break; 347 } 348 349 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 350 { 351 AString uri; 352 CHECK(msg->findString("uri", &uri)); 353 354 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 355 info->mIsPrepared = true; 356 357 if (mInPreparationPhase) { 358 bool allFetchersPrepared = true; 359 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 360 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 361 allFetchersPrepared = false; 362 break; 363 } 364 } 365 366 if (allFetchersPrepared) { 367 postPrepared(OK); 368 } 369 } 370 break; 371 } 372 373 case PlaylistFetcher::kWhatStartedAt: 374 { 375 int32_t switchGeneration; 376 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 377 378 if (switchGeneration != mSwitchGeneration) { 379 break; 380 } 381 382 // Resume fetcher for the original variant; the resumed fetcher should 383 // continue until the timestamps found in msg, which is stored by the 384 // new fetcher to indicate where the new variant has started buffering. 385 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 386 const FetcherInfo info = mFetcherInfos.valueAt(i); 387 if (info.mToBeRemoved) { 388 info.mFetcher->resumeUntilAsync(msg); 389 } 390 } 391 break; 392 } 393 394 default: 395 TRESPASS(); 396 } 397 398 break; 399 } 400 401 case kWhatCheckBandwidth: 402 { 403 int32_t generation; 404 CHECK(msg->findInt32("generation", &generation)); 405 406 if (generation != mCheckBandwidthGeneration) { 407 break; 408 } 409 410 onCheckBandwidth(); 411 break; 412 } 413 414 case kWhatChangeConfiguration: 415 { 416 onChangeConfiguration(msg); 417 break; 418 } 419 420 case kWhatChangeConfiguration2: 421 { 422 onChangeConfiguration2(msg); 423 break; 424 } 425 426 case kWhatChangeConfiguration3: 427 { 428 onChangeConfiguration3(msg); 429 break; 430 } 431 432 case kWhatFinishDisconnect2: 433 { 434 onFinishDisconnect2(); 435 break; 436 } 437 438 case kWhatSwapped: 439 { 440 onSwapped(msg); 441 break; 442 } 443 default: 444 TRESPASS(); 445 break; 446 } 447} 448 449// static 450int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 451 if (a->mBandwidth < b->mBandwidth) { 452 return -1; 453 } else if (a->mBandwidth == b->mBandwidth) { 454 return 0; 455 } 456 457 return 1; 458} 459 460// static 461LiveSession::StreamType LiveSession::indexToType(int idx) { 462 CHECK(idx >= 0 && idx < kMaxStreams); 463 return (StreamType)(1 << idx); 464} 465 466void LiveSession::onConnect(const sp<AMessage> &msg) { 467 AString url; 468 CHECK(msg->findString("url", &url)); 469 470 KeyedVector<String8, String8> *headers = NULL; 471 if (!msg->findPointer("headers", (void **)&headers)) { 472 mExtraHeaders.clear(); 473 } else { 474 mExtraHeaders = *headers; 475 476 delete headers; 477 headers = NULL; 478 } 479 480#if 1 481 ALOGI("onConnect <URL suppressed>"); 482#else 483 ALOGI("onConnect %s", url.c_str()); 484#endif 485 486 mMasterURL = url; 487 488 bool dummy; 489 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 490 491 if (mPlaylist == NULL) { 492 ALOGE("unable to fetch master playlist <URL suppressed>."); 493 494 postPrepared(ERROR_IO); 495 return; 496 } 497 498 // We trust the content provider to make a reasonable choice of preferred 499 // initial bandwidth by listing it first in the variant playlist. 500 // At startup we really don't have a good estimate on the available 501 // network bandwidth since we haven't tranferred any data yet. Once 502 // we have we can make a better informed choice. 503 size_t initialBandwidth = 0; 504 size_t initialBandwidthIndex = 0; 505 506 if (mPlaylist->isVariantPlaylist()) { 507 for (size_t i = 0; i < mPlaylist->size(); ++i) { 508 BandwidthItem item; 509 510 item.mPlaylistIndex = i; 511 512 sp<AMessage> meta; 513 AString uri; 514 mPlaylist->itemAt(i, &uri, &meta); 515 516 unsigned long bandwidth; 517 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 518 519 if (initialBandwidth == 0) { 520 initialBandwidth = item.mBandwidth; 521 } 522 523 mBandwidthItems.push(item); 524 } 525 526 CHECK_GT(mBandwidthItems.size(), 0u); 527 528 mBandwidthItems.sort(SortByBandwidth); 529 530 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 531 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 532 initialBandwidthIndex = i; 533 break; 534 } 535 } 536 } else { 537 // dummy item. 538 BandwidthItem item; 539 item.mPlaylistIndex = 0; 540 item.mBandwidth = 0; 541 mBandwidthItems.push(item); 542 } 543 544 changeConfiguration( 545 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 546} 547 548void LiveSession::finishDisconnect() { 549 // No reconfiguration is currently pending, make sure none will trigger 550 // during disconnection either. 551 cancelCheckBandwidthEvent(); 552 553 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 554 // (finishDisconnect, onFinishDisconnect2) 555 cancelBandwidthSwitch(); 556 557 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 558 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 559 } 560 561 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 562 563 mContinuationCounter = mFetcherInfos.size(); 564 mContinuation = msg; 565 566 if (mContinuationCounter == 0) { 567 msg->post(); 568 } 569} 570 571void LiveSession::onFinishDisconnect2() { 572 mContinuation.clear(); 573 574 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 575 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 576 577 mPacketSources.valueFor( 578 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 579 580 sp<AMessage> response = new AMessage; 581 response->setInt32("err", OK); 582 583 response->postReply(mDisconnectReplyID); 584 mDisconnectReplyID = 0; 585} 586 587sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 588 ssize_t index = mFetcherInfos.indexOfKey(uri); 589 590 if (index >= 0) { 591 return NULL; 592 } 593 594 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 595 notify->setString("uri", uri); 596 notify->setInt32("switchGeneration", mSwitchGeneration); 597 598 FetcherInfo info; 599 info.mFetcher = new PlaylistFetcher(notify, this, uri); 600 info.mDurationUs = -1ll; 601 info.mIsPrepared = false; 602 info.mToBeRemoved = false; 603 looper()->registerHandler(info.mFetcher); 604 605 mFetcherInfos.add(uri, info); 606 607 return info.mFetcher; 608} 609 610/* 611 * Illustration of parameters: 612 * 613 * 0 `range_offset` 614 * +------------+-------------------------------------------------------+--+--+ 615 * | | | next block to fetch | | | 616 * | | `source` handle => `out` buffer | | | | 617 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 618 * | |<----------- `range_length` / buffer capacity ----------->| | 619 * |<------------------------------ file_size ------------------------------->| 620 * 621 * Special parameter values: 622 * - range_length == -1 means entire file 623 * - block_size == 0 means entire range 624 * 625 */ 626ssize_t LiveSession::fetchFile( 627 const char *url, sp<ABuffer> *out, 628 int64_t range_offset, int64_t range_length, 629 uint32_t block_size, /* download block size */ 630 sp<DataSource> *source, /* to return and reuse source */ 631 String8 *actualUrl) { 632 off64_t size; 633 sp<DataSource> temp_source; 634 if (source == NULL) { 635 source = &temp_source; 636 } 637 638 if (*source == NULL) { 639 if (!strncasecmp(url, "file://", 7)) { 640 *source = new FileSource(url + 7); 641 } else if (strncasecmp(url, "http://", 7) 642 && strncasecmp(url, "https://", 8)) { 643 return ERROR_UNSUPPORTED; 644 } else { 645 KeyedVector<String8, String8> headers = mExtraHeaders; 646 if (range_offset > 0 || range_length >= 0) { 647 headers.add( 648 String8("Range"), 649 String8( 650 StringPrintf( 651 "bytes=%lld-%s", 652 range_offset, 653 range_length < 0 654 ? "" : StringPrintf("%lld", 655 range_offset + range_length - 1).c_str()).c_str())); 656 } 657 status_t err = mHTTPDataSource->connect(url, &headers); 658 659 if (err != OK) { 660 return err; 661 } 662 663 *source = mHTTPDataSource; 664 } 665 } 666 667 status_t getSizeErr = (*source)->getSize(&size); 668 if (getSizeErr != OK) { 669 size = 65536; 670 } 671 672 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 673 if (*out == NULL) { 674 buffer->setRange(0, 0); 675 } 676 677 ssize_t bytesRead = 0; 678 // adjust range_length if only reading partial block 679 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 680 range_length = buffer->size() + block_size; 681 } 682 for (;;) { 683 // Only resize when we don't know the size. 684 size_t bufferRemaining = buffer->capacity() - buffer->size(); 685 if (bufferRemaining == 0 && getSizeErr != OK) { 686 bufferRemaining = 32768; 687 688 ALOGV("increasing download buffer to %zu bytes", 689 buffer->size() + bufferRemaining); 690 691 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 692 memcpy(copy->data(), buffer->data(), buffer->size()); 693 copy->setRange(0, buffer->size()); 694 695 buffer = copy; 696 } 697 698 size_t maxBytesToRead = bufferRemaining; 699 if (range_length >= 0) { 700 int64_t bytesLeftInRange = range_length - buffer->size(); 701 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 702 maxBytesToRead = bytesLeftInRange; 703 704 if (bytesLeftInRange == 0) { 705 break; 706 } 707 } 708 } 709 710 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 711 // to help us break out of the loop. 712 ssize_t n = (*source)->readAt( 713 buffer->size(), buffer->data() + buffer->size(), 714 maxBytesToRead); 715 716 if (n < 0) { 717 return n; 718 } 719 720 if (n == 0) { 721 break; 722 } 723 724 buffer->setRange(0, buffer->size() + (size_t)n); 725 bytesRead += n; 726 } 727 728 *out = buffer; 729 if (actualUrl != NULL) { 730 *actualUrl = (*source)->getUri(); 731 if (actualUrl->isEmpty()) { 732 *actualUrl = url; 733 } 734 } 735 736 return bytesRead; 737} 738 739sp<M3UParser> LiveSession::fetchPlaylist( 740 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 741 ALOGV("fetchPlaylist '%s'", url); 742 743 *unchanged = false; 744 745 sp<ABuffer> buffer; 746 String8 actualUrl; 747 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 748 749 if (err <= 0) { 750 return NULL; 751 } 752 753 // MD5 functionality is not available on the simulator, treat all 754 // playlists as changed. 755 756#if defined(HAVE_ANDROID_OS) 757 uint8_t hash[16]; 758 759 MD5_CTX m; 760 MD5_Init(&m); 761 MD5_Update(&m, buffer->data(), buffer->size()); 762 763 MD5_Final(hash, &m); 764 765 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 766 // playlist unchanged 767 *unchanged = true; 768 769 return NULL; 770 } 771 772 if (curPlaylistHash != NULL) { 773 memcpy(curPlaylistHash, hash, sizeof(hash)); 774 } 775#endif 776 777 sp<M3UParser> playlist = 778 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 779 780 if (playlist->initCheck() != OK) { 781 ALOGE("failed to parse .m3u8 playlist"); 782 783 return NULL; 784 } 785 786 return playlist; 787} 788 789static double uniformRand() { 790 return (double)rand() / RAND_MAX; 791} 792 793size_t LiveSession::getBandwidthIndex() { 794 if (mBandwidthItems.size() == 0) { 795 return 0; 796 } 797 798#if 1 799 char value[PROPERTY_VALUE_MAX]; 800 ssize_t index = -1; 801 if (property_get("media.httplive.bw-index", value, NULL)) { 802 char *end; 803 index = strtol(value, &end, 10); 804 CHECK(end > value && *end == '\0'); 805 806 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 807 index = mBandwidthItems.size() - 1; 808 } 809 } 810 811 if (index < 0) { 812 int32_t bandwidthBps; 813 if (mHTTPDataSource != NULL 814 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 815 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 816 } else { 817 ALOGV("no bandwidth estimate."); 818 return 0; // Pick the lowest bandwidth stream by default. 819 } 820 821 char value[PROPERTY_VALUE_MAX]; 822 if (property_get("media.httplive.max-bw", value, NULL)) { 823 char *end; 824 long maxBw = strtoul(value, &end, 10); 825 if (end > value && *end == '\0') { 826 if (maxBw > 0 && bandwidthBps > maxBw) { 827 ALOGV("bandwidth capped to %ld bps", maxBw); 828 bandwidthBps = maxBw; 829 } 830 } 831 } 832 833 // Consider only 80% of the available bandwidth usable. 834 bandwidthBps = (bandwidthBps * 8) / 10; 835 836 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 837 838 index = mBandwidthItems.size() - 1; 839 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 840 > (size_t)bandwidthBps) { 841 --index; 842 } 843 } 844#elif 0 845 // Change bandwidth at random() 846 size_t index = uniformRand() * mBandwidthItems.size(); 847#elif 0 848 // There's a 50% chance to stay on the current bandwidth and 849 // a 50% chance to switch to the next higher bandwidth (wrapping around 850 // to lowest) 851 const size_t kMinIndex = 0; 852 853 static ssize_t mPrevBandwidthIndex = -1; 854 855 size_t index; 856 if (mPrevBandwidthIndex < 0) { 857 index = kMinIndex; 858 } else if (uniformRand() < 0.5) { 859 index = (size_t)mPrevBandwidthIndex; 860 } else { 861 index = mPrevBandwidthIndex + 1; 862 if (index == mBandwidthItems.size()) { 863 index = kMinIndex; 864 } 865 } 866 mPrevBandwidthIndex = index; 867#elif 0 868 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 869 870 size_t index = mBandwidthItems.size() - 1; 871 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 872 --index; 873 } 874#elif 1 875 char value[PROPERTY_VALUE_MAX]; 876 size_t index; 877 if (property_get("media.httplive.bw-index", value, NULL)) { 878 char *end; 879 index = strtoul(value, &end, 10); 880 CHECK(end > value && *end == '\0'); 881 882 if (index >= mBandwidthItems.size()) { 883 index = mBandwidthItems.size() - 1; 884 } 885 } else { 886 index = 0; 887 } 888#else 889 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 890#endif 891 892 CHECK_GE(index, 0); 893 894 return index; 895} 896 897status_t LiveSession::onSeek(const sp<AMessage> &msg) { 898 int64_t timeUs; 899 CHECK(msg->findInt64("timeUs", &timeUs)); 900 901 if (!mReconfigurationInProgress) { 902 changeConfiguration(timeUs, getBandwidthIndex()); 903 } 904 905 return OK; 906} 907 908status_t LiveSession::getDuration(int64_t *durationUs) const { 909 int64_t maxDurationUs = 0ll; 910 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 911 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 912 913 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 914 maxDurationUs = fetcherDurationUs; 915 } 916 } 917 918 *durationUs = maxDurationUs; 919 920 return OK; 921} 922 923bool LiveSession::isSeekable() const { 924 int64_t durationUs; 925 return getDuration(&durationUs) == OK && durationUs >= 0; 926} 927 928bool LiveSession::hasDynamicDuration() const { 929 return false; 930} 931 932status_t LiveSession::getTrackInfo(Parcel *reply) const { 933 return mPlaylist->getTrackInfo(reply); 934} 935 936status_t LiveSession::selectTrack(size_t index, bool select) { 937 status_t err = mPlaylist->selectTrack(index, select); 938 if (err == OK) { 939 (new AMessage(kWhatChangeConfiguration, id()))->post(); 940 } 941 return err; 942} 943 944bool LiveSession::canSwitchUp() { 945 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 946 status_t err = OK; 947 for (size_t i = 0; i < mPacketSources.size(); ++i) { 948 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 949 int64_t dur = source->getBufferedDurationUs(&err); 950 if (err == OK && dur > 10000000) { 951 return true; 952 } 953 } 954 return false; 955} 956 957void LiveSession::changeConfiguration( 958 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 959 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 960 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 961 cancelBandwidthSwitch(); 962 963 CHECK(!mReconfigurationInProgress); 964 mReconfigurationInProgress = true; 965 966 mPrevBandwidthIndex = bandwidthIndex; 967 968 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 969 timeUs, bandwidthIndex, pickTrack); 970 971 if (pickTrack) { 972 mPlaylist->pickRandomMediaItems(); 973 } 974 975 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 976 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 977 978 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 979 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 980 981 AString URIs[kMaxStreams]; 982 for (size_t i = 0; i < kMaxStreams; ++i) { 983 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 984 streamMask |= indexToType(i); 985 } 986 } 987 988 // Step 1, stop and discard fetchers that are no longer needed. 989 // Pause those that we'll reuse. 990 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 991 const AString &uri = mFetcherInfos.keyAt(i); 992 993 bool discardFetcher = true; 994 995 // If we're seeking all current fetchers are discarded. 996 if (timeUs < 0ll) { 997 // delay fetcher removal 998 discardFetcher = false; 999 1000 for (size_t j = 0; j < kMaxStreams; ++j) { 1001 StreamType type = indexToType(j); 1002 if ((streamMask & type) && uri == URIs[j]) { 1003 resumeMask |= type; 1004 streamMask &= ~type; 1005 } 1006 } 1007 } 1008 1009 if (discardFetcher) { 1010 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1011 } else { 1012 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1013 } 1014 } 1015 1016 sp<AMessage> msg; 1017 if (timeUs < 0ll) { 1018 // skip onChangeConfiguration2 (decoder destruction) if switching. 1019 msg = new AMessage(kWhatChangeConfiguration3, id()); 1020 } else { 1021 msg = new AMessage(kWhatChangeConfiguration2, id()); 1022 } 1023 msg->setInt32("streamMask", streamMask); 1024 msg->setInt32("resumeMask", resumeMask); 1025 msg->setInt64("timeUs", timeUs); 1026 for (size_t i = 0; i < kMaxStreams; ++i) { 1027 if (streamMask & indexToType(i)) { 1028 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1029 } 1030 } 1031 1032 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1033 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1034 // fetchers have completed their asynchronous operation, we'll post 1035 // mContinuation, which then is handled below in onChangeConfiguration2. 1036 mContinuationCounter = mFetcherInfos.size(); 1037 mContinuation = msg; 1038 1039 if (mContinuationCounter == 0) { 1040 msg->post(); 1041 1042 if (mSeekReplyID != 0) { 1043 CHECK(mSeekReply != NULL); 1044 mSeekReply->postReply(mSeekReplyID); 1045 } 1046 } 1047} 1048 1049void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1050 if (!mReconfigurationInProgress) { 1051 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1052 } else { 1053 msg->post(1000000ll); // retry in 1 sec 1054 } 1055} 1056 1057void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1058 mContinuation.clear(); 1059 1060 // All fetchers are either suspended or have been removed now. 1061 1062 uint32_t streamMask; 1063 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1064 1065 AString URIs[kMaxStreams]; 1066 for (size_t i = 0; i < kMaxStreams; ++i) { 1067 if (streamMask & indexToType(i)) { 1068 const AString &uriKey = mStreams[i].uriKey(); 1069 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1070 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1071 } 1072 } 1073 1074 // Determine which decoders to shutdown on the player side, 1075 // a decoder has to be shutdown if either 1076 // 1) its streamtype was active before but now longer isn't. 1077 // or 1078 // 2) its streamtype was already active and still is but the URI 1079 // has changed. 1080 uint32_t changedMask = 0; 1081 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1082 if (((mStreamMask & streamMask & indexToType(i)) 1083 && !(URIs[i] == mStreams[i].mUri)) 1084 || (mStreamMask & ~streamMask & indexToType(i))) { 1085 changedMask |= indexToType(i); 1086 } 1087 } 1088 1089 if (changedMask == 0) { 1090 // If nothing changed as far as the audio/video decoders 1091 // are concerned we can proceed. 1092 onChangeConfiguration3(msg); 1093 return; 1094 } 1095 1096 // Something changed, inform the player which will shutdown the 1097 // corresponding decoders and will post the reply once that's done. 1098 // Handling the reply will continue executing below in 1099 // onChangeConfiguration3. 1100 sp<AMessage> notify = mNotify->dup(); 1101 notify->setInt32("what", kWhatStreamsChanged); 1102 notify->setInt32("changedMask", changedMask); 1103 1104 msg->setWhat(kWhatChangeConfiguration3); 1105 msg->setTarget(id()); 1106 1107 notify->setMessage("reply", msg); 1108 notify->post(); 1109} 1110 1111void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1112 mContinuation.clear(); 1113 // All remaining fetchers are still suspended, the player has shutdown 1114 // any decoders that needed it. 1115 1116 uint32_t streamMask, resumeMask; 1117 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1118 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1119 1120 for (size_t i = 0; i < kMaxStreams; ++i) { 1121 if (streamMask & indexToType(i)) { 1122 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1123 } 1124 } 1125 1126 int64_t timeUs; 1127 bool switching = false; 1128 CHECK(msg->findInt64("timeUs", &timeUs)); 1129 1130 if (timeUs < 0ll) { 1131 timeUs = mLastDequeuedTimeUs; 1132 switching = true; 1133 } 1134 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1135 1136 mNewStreamMask = streamMask; 1137 1138 // Of all existing fetchers: 1139 // * Resume fetchers that are still needed and assign them original packet sources. 1140 // * Mark otherwise unneeded fetchers for removal. 1141 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1142 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1143 const AString &uri = mFetcherInfos.keyAt(i); 1144 1145 sp<AnotherPacketSource> sources[kMaxStreams]; 1146 for (size_t j = 0; j < kMaxStreams; ++j) { 1147 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1148 sources[j] = mPacketSources.valueFor(indexToType(j)); 1149 } 1150 } 1151 1152 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1153 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1154 || sources[kSubtitleIndex] != NULL) { 1155 info.mFetcher->startAsync( 1156 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1157 } else { 1158 info.mToBeRemoved = true; 1159 } 1160 } 1161 1162 // streamMask now only contains the types that need a new fetcher created. 1163 1164 if (streamMask != 0) { 1165 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1166 } 1167 1168 // Find out when the original fetchers have buffered up to and start the new fetchers 1169 // at a later timestamp. 1170 for (size_t i = 0; i < kMaxStreams; i++) { 1171 if (!(indexToType(i) & streamMask)) { 1172 continue; 1173 } 1174 1175 AString uri; 1176 uri = mStreams[i].mUri; 1177 1178 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1179 CHECK(fetcher != NULL); 1180 1181 int32_t latestSeq = -1; 1182 int64_t latestTimeUs = 0ll; 1183 sp<AnotherPacketSource> sources[kMaxStreams]; 1184 1185 // TRICKY: looping from i as earlier streams are already removed from streamMask 1186 for (size_t j = i; j < kMaxStreams; ++j) { 1187 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1188 sources[j] = mPacketSources.valueFor(indexToType(j)); 1189 1190 if (!switching) { 1191 sources[j]->clear(); 1192 } else { 1193 int32_t type, seq; 1194 int64_t srcTimeUs; 1195 sp<AMessage> meta = sources[j]->getLatestMeta(); 1196 1197 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1198 CHECK(meta->findInt32("seq", &seq)); 1199 if (seq > latestSeq) { 1200 latestSeq = seq; 1201 } 1202 CHECK(meta->findInt64("timeUs", &srcTimeUs)); 1203 if (srcTimeUs > latestTimeUs) { 1204 latestTimeUs = srcTimeUs; 1205 } 1206 } 1207 1208 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1209 sources[j]->clear(); 1210 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1211 if (extraStreams & indexToType(j)) { 1212 sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); 1213 } 1214 } 1215 1216 streamMask &= ~indexToType(j); 1217 } 1218 } 1219 1220 fetcher->startAsync( 1221 sources[kAudioIndex], 1222 sources[kVideoIndex], 1223 sources[kSubtitleIndex], 1224 timeUs, 1225 latestTimeUs /* min start time(us) */, 1226 latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); 1227 } 1228 1229 // All fetchers have now been started, the configuration change 1230 // has completed. 1231 1232 scheduleCheckBandwidthEvent(); 1233 1234 ALOGV("XXX configuration change completed."); 1235 mReconfigurationInProgress = false; 1236 if (switching) { 1237 mSwitchInProgress = true; 1238 } else { 1239 mStreamMask = mNewStreamMask; 1240 } 1241 1242 if (mDisconnectReplyID != 0) { 1243 finishDisconnect(); 1244 } 1245} 1246 1247void LiveSession::onSwapped(const sp<AMessage> &msg) { 1248 int32_t switchGeneration; 1249 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1250 if (switchGeneration != mSwitchGeneration) { 1251 return; 1252 } 1253 1254 int32_t stream; 1255 CHECK(msg->findInt32("stream", &stream)); 1256 mSwapMask |= stream; 1257 if (mSwapMask != mStreamMask) { 1258 return; 1259 } 1260 1261 // Check if new variant contains extra streams. 1262 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1263 while (extraStreams) { 1264 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1265 swapPacketSource(extraStream); 1266 extraStreams &= ~extraStream; 1267 } 1268 1269 tryToFinishBandwidthSwitch(); 1270} 1271 1272// Mark switch done when: 1273// 1. all old buffers are swapped out, AND 1274// 2. all old fetchers are removed. 1275void LiveSession::tryToFinishBandwidthSwitch() { 1276 bool needToRemoveFetchers = false; 1277 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1278 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1279 needToRemoveFetchers = true; 1280 break; 1281 } 1282 } 1283 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1284 mStreamMask = mNewStreamMask; 1285 mSwitchInProgress = false; 1286 mSwapMask = 0; 1287 } 1288} 1289 1290void LiveSession::scheduleCheckBandwidthEvent() { 1291 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1292 msg->setInt32("generation", mCheckBandwidthGeneration); 1293 msg->post(10000000ll); 1294} 1295 1296void LiveSession::cancelCheckBandwidthEvent() { 1297 ++mCheckBandwidthGeneration; 1298} 1299 1300void LiveSession::cancelBandwidthSwitch() { 1301 Mutex::Autolock lock(mSwapMutex); 1302 mSwitchGeneration++; 1303 mSwitchInProgress = false; 1304 mSwapMask = 0; 1305} 1306 1307bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1308 if (mReconfigurationInProgress || mSwitchInProgress) { 1309 return false; 1310 } 1311 1312 if (mPrevBandwidthIndex < 0) { 1313 return true; 1314 } 1315 1316 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1317 return false; 1318 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1319 return canSwitchUp(); 1320 } else { 1321 return true; 1322 } 1323} 1324 1325void LiveSession::onCheckBandwidth() { 1326 size_t bandwidthIndex = getBandwidthIndex(); 1327 if (canSwitchBandwidthTo(bandwidthIndex)) { 1328 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1329 } else { 1330 scheduleCheckBandwidthEvent(); 1331 } 1332 1333 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1334 // schedule another one on return, only an explicit call to 1335 // scheduleCheckBandwidthEvent will do that. 1336 // This ensures that only one configuration change is ongoing at any 1337 // one time, once that completes it'll schedule another check bandwidth 1338 // event. 1339} 1340 1341void LiveSession::postPrepared(status_t err) { 1342 CHECK(mInPreparationPhase); 1343 1344 sp<AMessage> notify = mNotify->dup(); 1345 if (err == OK || err == ERROR_END_OF_STREAM) { 1346 notify->setInt32("what", kWhatPrepared); 1347 } else { 1348 notify->setInt32("what", kWhatPreparationFailed); 1349 notify->setInt32("err", err); 1350 } 1351 1352 notify->post(); 1353 1354 mInPreparationPhase = false; 1355} 1356 1357} // namespace android 1358 1359