LiveSession.cpp revision 404fced9bfa8fa423ee210a271ca051ffd1bec13
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/IMediaHTTPConnection.h> 31#include <media/IMediaHTTPService.h> 32#include <media/stagefright/foundation/hexdump.h> 33#include <media/stagefright/foundation/ABuffer.h> 34#include <media/stagefright/foundation/ADebug.h> 35#include <media/stagefright/foundation/AMessage.h> 36#include <media/stagefright/DataSource.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaErrors.h> 39#include <media/stagefright/MediaHTTP.h> 40#include <media/stagefright/MetaData.h> 41#include <media/stagefright/Utils.h> 42 43#include <utils/Mutex.h> 44 45#include <ctype.h> 46#include <inttypes.h> 47#include <openssl/aes.h> 48#include <openssl/md5.h> 49 50namespace android { 51 52LiveSession::LiveSession( 53 const sp<AMessage> ¬ify, uint32_t flags, 54 const sp<IMediaHTTPService> &httpService) 55 : mNotify(notify), 56 mFlags(flags), 57 mHTTPService(httpService), 58 mInPreparationPhase(true), 59 mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), 60 mPrevBandwidthIndex(-1), 61 mStreamMask(0), 62 mNewStreamMask(0), 63 mSwapMask(0), 64 mCheckBandwidthGeneration(0), 65 mSwitchGeneration(0), 66 mLastDequeuedTimeUs(0ll), 67 mRealTimeBaseUs(0ll), 68 mReconfigurationInProgress(false), 69 mSwitchInProgress(false), 70 mDisconnectReplyID(0), 71 mSeekReplyID(0) { 72 73 mStreams[kAudioIndex] = StreamItem("audio"); 74 mStreams[kVideoIndex] = StreamItem("video"); 75 mStreams[kSubtitleIndex] = StreamItem("subtitles"); 76 77 for (size_t i = 0; i < kMaxStreams; ++i) { 78 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 79 mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 80 } 81} 82 83LiveSession::~LiveSession() { 84} 85 86sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { 87 ABuffer *discontinuity = new ABuffer(0); 88 discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); 89 discontinuity->meta()->setInt32("swapPacketSource", swap); 90 discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); 91 discontinuity->meta()->setInt64("timeUs", -1); 92 return discontinuity; 93} 94 95void LiveSession::swapPacketSource(StreamType stream) { 96 sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); 97 sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); 98 sp<AnotherPacketSource> tmp = aps; 99 aps = aps2; 100 aps2 = tmp; 101 aps2->clear(); 102} 103 104status_t LiveSession::dequeueAccessUnit( 105 StreamType stream, sp<ABuffer> *accessUnit) { 106 if (!(mStreamMask & stream)) { 107 // return -EWOULDBLOCK to avoid halting the decoder 108 // when switching between audio/video and audio only. 109 return -EWOULDBLOCK; 110 } 111 112 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 113 114 status_t finalResult; 115 if (!packetSource->hasBufferAvailable(&finalResult)) { 116 return finalResult == OK ? -EAGAIN : finalResult; 117 } 118 119 status_t err = packetSource->dequeueAccessUnit(accessUnit); 120 121 const char *streamStr; 122 switch (stream) { 123 case STREAMTYPE_AUDIO: 124 streamStr = "audio"; 125 break; 126 case STREAMTYPE_VIDEO: 127 streamStr = "video"; 128 break; 129 case STREAMTYPE_SUBTITLES: 130 streamStr = "subs"; 131 break; 132 default: 133 TRESPASS(); 134 } 135 136 if (err == INFO_DISCONTINUITY) { 137 int32_t type; 138 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 139 140 sp<AMessage> extra; 141 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 142 extra.clear(); 143 } 144 145 ALOGI("[%s] read discontinuity of type %d, extra = %s", 146 streamStr, 147 type, 148 extra == NULL ? "NULL" : extra->debugString().c_str()); 149 150 int32_t swap; 151 if (type == ATSParser::DISCONTINUITY_FORMATCHANGE 152 && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) 153 && swap) { 154 155 int32_t switchGeneration; 156 CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); 157 { 158 Mutex::Autolock lock(mSwapMutex); 159 if (switchGeneration == mSwitchGeneration) { 160 swapPacketSource(stream); 161 sp<AMessage> msg = new AMessage(kWhatSwapped, id()); 162 msg->setInt32("stream", stream); 163 msg->setInt32("switchGeneration", switchGeneration); 164 msg->post(); 165 } 166 } 167 } 168 } else if (err == OK) { 169 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 170 int64_t timeUs; 171 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 172 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 173 174 mLastDequeuedTimeUs = timeUs; 175 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 176 } else if (stream == STREAMTYPE_SUBTITLES) { 177 (*accessUnit)->meta()->setInt32( 178 "trackIndex", mPlaylist->getSelectedIndex()); 179 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 180 } 181 } else { 182 ALOGI("[%s] encountered error %d", streamStr, err); 183 } 184 185 return err; 186} 187 188status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 189 // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. 190 if (!(mStreamMask & stream)) { 191 return UNKNOWN_ERROR; 192 } 193 194 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 195 196 sp<MetaData> meta = packetSource->getFormat(); 197 198 if (meta == NULL) { 199 return -EAGAIN; 200 } 201 202 return convertMetaDataToMessage(meta, format); 203} 204 205void LiveSession::connectAsync( 206 const char *url, const KeyedVector<String8, String8> *headers) { 207 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 208 msg->setString("url", url); 209 210 if (headers != NULL) { 211 msg->setPointer( 212 "headers", 213 new KeyedVector<String8, String8>(*headers)); 214 } 215 216 msg->post(); 217} 218 219status_t LiveSession::disconnect() { 220 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 221 222 sp<AMessage> response; 223 status_t err = msg->postAndAwaitResponse(&response); 224 225 return err; 226} 227 228status_t LiveSession::seekTo(int64_t timeUs) { 229 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 230 msg->setInt64("timeUs", timeUs); 231 232 sp<AMessage> response; 233 status_t err = msg->postAndAwaitResponse(&response); 234 235 uint32_t replyID; 236 CHECK(response == mSeekReply && 0 != mSeekReplyID); 237 mSeekReply.clear(); 238 mSeekReplyID = 0; 239 return err; 240} 241 242void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 243 switch (msg->what()) { 244 case kWhatConnect: 245 { 246 onConnect(msg); 247 break; 248 } 249 250 case kWhatDisconnect: 251 { 252 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 253 254 if (mReconfigurationInProgress) { 255 break; 256 } 257 258 finishDisconnect(); 259 break; 260 } 261 262 case kWhatSeek: 263 { 264 CHECK(msg->senderAwaitsResponse(&mSeekReplyID)); 265 266 status_t err = onSeek(msg); 267 268 mSeekReply = new AMessage; 269 mSeekReply->setInt32("err", err); 270 break; 271 } 272 273 case kWhatFetcherNotify: 274 { 275 int32_t what; 276 CHECK(msg->findInt32("what", &what)); 277 278 switch (what) { 279 case PlaylistFetcher::kWhatStarted: 280 break; 281 case PlaylistFetcher::kWhatPaused: 282 case PlaylistFetcher::kWhatStopped: 283 { 284 if (what == PlaylistFetcher::kWhatStopped) { 285 AString uri; 286 CHECK(msg->findString("uri", &uri)); 287 if (mFetcherInfos.removeItem(uri) < 0) { 288 // ignore duplicated kWhatStopped messages. 289 break; 290 } 291 292 tryToFinishBandwidthSwitch(); 293 } 294 295 if (mContinuation != NULL) { 296 CHECK_GT(mContinuationCounter, 0); 297 if (--mContinuationCounter == 0) { 298 mContinuation->post(); 299 300 if (mSeekReplyID != 0) { 301 CHECK(mSeekReply != NULL); 302 mSeekReply->postReply(mSeekReplyID); 303 } 304 } 305 } 306 break; 307 } 308 309 case PlaylistFetcher::kWhatDurationUpdate: 310 { 311 AString uri; 312 CHECK(msg->findString("uri", &uri)); 313 314 int64_t durationUs; 315 CHECK(msg->findInt64("durationUs", &durationUs)); 316 317 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 318 info->mDurationUs = durationUs; 319 break; 320 } 321 322 case PlaylistFetcher::kWhatError: 323 { 324 status_t err; 325 CHECK(msg->findInt32("err", &err)); 326 327 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 328 329 if (mInPreparationPhase) { 330 postPrepared(err); 331 } 332 333 cancelBandwidthSwitch(); 334 335 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 336 337 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 338 339 mPacketSources.valueFor( 340 STREAMTYPE_SUBTITLES)->signalEOS(err); 341 342 sp<AMessage> notify = mNotify->dup(); 343 notify->setInt32("what", kWhatError); 344 notify->setInt32("err", err); 345 notify->post(); 346 break; 347 } 348 349 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 350 { 351 AString uri; 352 CHECK(msg->findString("uri", &uri)); 353 354 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 355 info->mIsPrepared = true; 356 357 if (mInPreparationPhase) { 358 bool allFetchersPrepared = true; 359 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 360 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 361 allFetchersPrepared = false; 362 break; 363 } 364 } 365 366 if (allFetchersPrepared) { 367 postPrepared(OK); 368 } 369 } 370 break; 371 } 372 373 case PlaylistFetcher::kWhatStartedAt: 374 { 375 int32_t switchGeneration; 376 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 377 378 if (switchGeneration != mSwitchGeneration) { 379 break; 380 } 381 382 // Resume fetcher for the original variant; the resumed fetcher should 383 // continue until the timestamps found in msg, which is stored by the 384 // new fetcher to indicate where the new variant has started buffering. 385 for (size_t i = 0; i < mFetcherInfos.size(); i++) { 386 const FetcherInfo info = mFetcherInfos.valueAt(i); 387 if (info.mToBeRemoved) { 388 info.mFetcher->resumeUntilAsync(msg); 389 } 390 } 391 break; 392 } 393 394 default: 395 TRESPASS(); 396 } 397 398 break; 399 } 400 401 case kWhatCheckBandwidth: 402 { 403 int32_t generation; 404 CHECK(msg->findInt32("generation", &generation)); 405 406 if (generation != mCheckBandwidthGeneration) { 407 break; 408 } 409 410 onCheckBandwidth(); 411 break; 412 } 413 414 case kWhatChangeConfiguration: 415 { 416 onChangeConfiguration(msg); 417 break; 418 } 419 420 case kWhatChangeConfiguration2: 421 { 422 onChangeConfiguration2(msg); 423 break; 424 } 425 426 case kWhatChangeConfiguration3: 427 { 428 onChangeConfiguration3(msg); 429 break; 430 } 431 432 case kWhatFinishDisconnect2: 433 { 434 onFinishDisconnect2(); 435 break; 436 } 437 438 case kWhatSwapped: 439 { 440 onSwapped(msg); 441 break; 442 } 443 default: 444 TRESPASS(); 445 break; 446 } 447} 448 449// static 450int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 451 if (a->mBandwidth < b->mBandwidth) { 452 return -1; 453 } else if (a->mBandwidth == b->mBandwidth) { 454 return 0; 455 } 456 457 return 1; 458} 459 460// static 461LiveSession::StreamType LiveSession::indexToType(int idx) { 462 CHECK(idx >= 0 && idx < kMaxStreams); 463 return (StreamType)(1 << idx); 464} 465 466void LiveSession::onConnect(const sp<AMessage> &msg) { 467 AString url; 468 CHECK(msg->findString("url", &url)); 469 470 KeyedVector<String8, String8> *headers = NULL; 471 if (!msg->findPointer("headers", (void **)&headers)) { 472 mExtraHeaders.clear(); 473 } else { 474 mExtraHeaders = *headers; 475 476 delete headers; 477 headers = NULL; 478 } 479 480 // TODO currently we don't know if we are coming here from incognito mode 481 ALOGI("onConnect %s", uriDebugString(url).c_str()); 482 483 mMasterURL = url; 484 485 bool dummy; 486 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 487 488 if (mPlaylist == NULL) { 489 ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str()); 490 491 postPrepared(ERROR_IO); 492 return; 493 } 494 495 // We trust the content provider to make a reasonable choice of preferred 496 // initial bandwidth by listing it first in the variant playlist. 497 // At startup we really don't have a good estimate on the available 498 // network bandwidth since we haven't tranferred any data yet. Once 499 // we have we can make a better informed choice. 500 size_t initialBandwidth = 0; 501 size_t initialBandwidthIndex = 0; 502 503 if (mPlaylist->isVariantPlaylist()) { 504 for (size_t i = 0; i < mPlaylist->size(); ++i) { 505 BandwidthItem item; 506 507 item.mPlaylistIndex = i; 508 509 sp<AMessage> meta; 510 AString uri; 511 mPlaylist->itemAt(i, &uri, &meta); 512 513 unsigned long bandwidth; 514 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 515 516 if (initialBandwidth == 0) { 517 initialBandwidth = item.mBandwidth; 518 } 519 520 mBandwidthItems.push(item); 521 } 522 523 CHECK_GT(mBandwidthItems.size(), 0u); 524 525 mBandwidthItems.sort(SortByBandwidth); 526 527 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 528 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 529 initialBandwidthIndex = i; 530 break; 531 } 532 } 533 } else { 534 // dummy item. 535 BandwidthItem item; 536 item.mPlaylistIndex = 0; 537 item.mBandwidth = 0; 538 mBandwidthItems.push(item); 539 } 540 541 changeConfiguration( 542 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 543} 544 545void LiveSession::finishDisconnect() { 546 // No reconfiguration is currently pending, make sure none will trigger 547 // during disconnection either. 548 cancelCheckBandwidthEvent(); 549 550 // Protect mPacketSources from a swapPacketSource race condition through disconnect. 551 // (finishDisconnect, onFinishDisconnect2) 552 cancelBandwidthSwitch(); 553 554 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 555 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 556 } 557 558 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 559 560 mContinuationCounter = mFetcherInfos.size(); 561 mContinuation = msg; 562 563 if (mContinuationCounter == 0) { 564 msg->post(); 565 } 566} 567 568void LiveSession::onFinishDisconnect2() { 569 mContinuation.clear(); 570 571 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 572 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 573 574 mPacketSources.valueFor( 575 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 576 577 sp<AMessage> response = new AMessage; 578 response->setInt32("err", OK); 579 580 response->postReply(mDisconnectReplyID); 581 mDisconnectReplyID = 0; 582} 583 584sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 585 ssize_t index = mFetcherInfos.indexOfKey(uri); 586 587 if (index >= 0) { 588 return NULL; 589 } 590 591 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 592 notify->setString("uri", uri); 593 notify->setInt32("switchGeneration", mSwitchGeneration); 594 595 FetcherInfo info; 596 info.mFetcher = new PlaylistFetcher(notify, this, uri); 597 info.mDurationUs = -1ll; 598 info.mIsPrepared = false; 599 info.mToBeRemoved = false; 600 looper()->registerHandler(info.mFetcher); 601 602 mFetcherInfos.add(uri, info); 603 604 return info.mFetcher; 605} 606 607/* 608 * Illustration of parameters: 609 * 610 * 0 `range_offset` 611 * +------------+-------------------------------------------------------+--+--+ 612 * | | | next block to fetch | | | 613 * | | `source` handle => `out` buffer | | | | 614 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 615 * | |<----------- `range_length` / buffer capacity ----------->| | 616 * |<------------------------------ file_size ------------------------------->| 617 * 618 * Special parameter values: 619 * - range_length == -1 means entire file 620 * - block_size == 0 means entire range 621 * 622 */ 623ssize_t LiveSession::fetchFile( 624 const char *url, sp<ABuffer> *out, 625 int64_t range_offset, int64_t range_length, 626 uint32_t block_size, /* download block size */ 627 sp<DataSource> *source, /* to return and reuse source */ 628 String8 *actualUrl) { 629 off64_t size; 630 sp<DataSource> temp_source; 631 if (source == NULL) { 632 source = &temp_source; 633 } 634 635 if (*source == NULL) { 636 if (!strncasecmp(url, "file://", 7)) { 637 *source = new FileSource(url + 7); 638 } else if (strncasecmp(url, "http://", 7) 639 && strncasecmp(url, "https://", 8)) { 640 return ERROR_UNSUPPORTED; 641 } else { 642 KeyedVector<String8, String8> headers = mExtraHeaders; 643 if (range_offset > 0 || range_length >= 0) { 644 headers.add( 645 String8("Range"), 646 String8( 647 StringPrintf( 648 "bytes=%lld-%s", 649 range_offset, 650 range_length < 0 651 ? "" : StringPrintf("%lld", 652 range_offset + range_length - 1).c_str()).c_str())); 653 } 654 status_t err = mHTTPDataSource->connect(url, &headers); 655 656 if (err != OK) { 657 return err; 658 } 659 660 *source = mHTTPDataSource; 661 } 662 } 663 664 status_t getSizeErr = (*source)->getSize(&size); 665 if (getSizeErr != OK) { 666 size = 65536; 667 } 668 669 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 670 if (*out == NULL) { 671 buffer->setRange(0, 0); 672 } 673 674 ssize_t bytesRead = 0; 675 // adjust range_length if only reading partial block 676 if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) { 677 range_length = buffer->size() + block_size; 678 } 679 for (;;) { 680 // Only resize when we don't know the size. 681 size_t bufferRemaining = buffer->capacity() - buffer->size(); 682 if (bufferRemaining == 0 && getSizeErr != OK) { 683 bufferRemaining = 32768; 684 685 ALOGV("increasing download buffer to %zu bytes", 686 buffer->size() + bufferRemaining); 687 688 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 689 memcpy(copy->data(), buffer->data(), buffer->size()); 690 copy->setRange(0, buffer->size()); 691 692 buffer = copy; 693 } 694 695 size_t maxBytesToRead = bufferRemaining; 696 if (range_length >= 0) { 697 int64_t bytesLeftInRange = range_length - buffer->size(); 698 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 699 maxBytesToRead = bytesLeftInRange; 700 701 if (bytesLeftInRange == 0) { 702 break; 703 } 704 } 705 } 706 707 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 708 // to help us break out of the loop. 709 ssize_t n = (*source)->readAt( 710 buffer->size(), buffer->data() + buffer->size(), 711 maxBytesToRead); 712 713 if (n < 0) { 714 return n; 715 } 716 717 if (n == 0) { 718 break; 719 } 720 721 buffer->setRange(0, buffer->size() + (size_t)n); 722 bytesRead += n; 723 } 724 725 *out = buffer; 726 if (actualUrl != NULL) { 727 *actualUrl = (*source)->getUri(); 728 if (actualUrl->isEmpty()) { 729 *actualUrl = url; 730 } 731 } 732 733 return bytesRead; 734} 735 736sp<M3UParser> LiveSession::fetchPlaylist( 737 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 738 ALOGV("fetchPlaylist '%s'", url); 739 740 *unchanged = false; 741 742 sp<ABuffer> buffer; 743 String8 actualUrl; 744 ssize_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl); 745 746 if (err <= 0) { 747 return NULL; 748 } 749 750 // MD5 functionality is not available on the simulator, treat all 751 // playlists as changed. 752 753#if defined(HAVE_ANDROID_OS) 754 uint8_t hash[16]; 755 756 MD5_CTX m; 757 MD5_Init(&m); 758 MD5_Update(&m, buffer->data(), buffer->size()); 759 760 MD5_Final(hash, &m); 761 762 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 763 // playlist unchanged 764 *unchanged = true; 765 766 return NULL; 767 } 768 769 if (curPlaylistHash != NULL) { 770 memcpy(curPlaylistHash, hash, sizeof(hash)); 771 } 772#endif 773 774 sp<M3UParser> playlist = 775 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 776 777 if (playlist->initCheck() != OK) { 778 ALOGE("failed to parse .m3u8 playlist"); 779 780 return NULL; 781 } 782 783 return playlist; 784} 785 786static double uniformRand() { 787 return (double)rand() / RAND_MAX; 788} 789 790size_t LiveSession::getBandwidthIndex() { 791 if (mBandwidthItems.size() == 0) { 792 return 0; 793 } 794 795#if 1 796 char value[PROPERTY_VALUE_MAX]; 797 ssize_t index = -1; 798 if (property_get("media.httplive.bw-index", value, NULL)) { 799 char *end; 800 index = strtol(value, &end, 10); 801 CHECK(end > value && *end == '\0'); 802 803 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 804 index = mBandwidthItems.size() - 1; 805 } 806 } 807 808 if (index < 0) { 809 int32_t bandwidthBps; 810 if (mHTTPDataSource != NULL 811 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 812 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 813 } else { 814 ALOGV("no bandwidth estimate."); 815 return 0; // Pick the lowest bandwidth stream by default. 816 } 817 818 char value[PROPERTY_VALUE_MAX]; 819 if (property_get("media.httplive.max-bw", value, NULL)) { 820 char *end; 821 long maxBw = strtoul(value, &end, 10); 822 if (end > value && *end == '\0') { 823 if (maxBw > 0 && bandwidthBps > maxBw) { 824 ALOGV("bandwidth capped to %ld bps", maxBw); 825 bandwidthBps = maxBw; 826 } 827 } 828 } 829 830 // Consider only 80% of the available bandwidth usable. 831 bandwidthBps = (bandwidthBps * 8) / 10; 832 833 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 834 835 index = mBandwidthItems.size() - 1; 836 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 837 > (size_t)bandwidthBps) { 838 --index; 839 } 840 } 841#elif 0 842 // Change bandwidth at random() 843 size_t index = uniformRand() * mBandwidthItems.size(); 844#elif 0 845 // There's a 50% chance to stay on the current bandwidth and 846 // a 50% chance to switch to the next higher bandwidth (wrapping around 847 // to lowest) 848 const size_t kMinIndex = 0; 849 850 static ssize_t mPrevBandwidthIndex = -1; 851 852 size_t index; 853 if (mPrevBandwidthIndex < 0) { 854 index = kMinIndex; 855 } else if (uniformRand() < 0.5) { 856 index = (size_t)mPrevBandwidthIndex; 857 } else { 858 index = mPrevBandwidthIndex + 1; 859 if (index == mBandwidthItems.size()) { 860 index = kMinIndex; 861 } 862 } 863 mPrevBandwidthIndex = index; 864#elif 0 865 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 866 867 size_t index = mBandwidthItems.size() - 1; 868 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 869 --index; 870 } 871#elif 1 872 char value[PROPERTY_VALUE_MAX]; 873 size_t index; 874 if (property_get("media.httplive.bw-index", value, NULL)) { 875 char *end; 876 index = strtoul(value, &end, 10); 877 CHECK(end > value && *end == '\0'); 878 879 if (index >= mBandwidthItems.size()) { 880 index = mBandwidthItems.size() - 1; 881 } 882 } else { 883 index = 0; 884 } 885#else 886 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 887#endif 888 889 CHECK_GE(index, 0); 890 891 return index; 892} 893 894status_t LiveSession::onSeek(const sp<AMessage> &msg) { 895 int64_t timeUs; 896 CHECK(msg->findInt64("timeUs", &timeUs)); 897 898 if (!mReconfigurationInProgress) { 899 changeConfiguration(timeUs, getBandwidthIndex()); 900 } 901 902 return OK; 903} 904 905status_t LiveSession::getDuration(int64_t *durationUs) const { 906 int64_t maxDurationUs = 0ll; 907 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 908 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 909 910 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 911 maxDurationUs = fetcherDurationUs; 912 } 913 } 914 915 *durationUs = maxDurationUs; 916 917 return OK; 918} 919 920bool LiveSession::isSeekable() const { 921 int64_t durationUs; 922 return getDuration(&durationUs) == OK && durationUs >= 0; 923} 924 925bool LiveSession::hasDynamicDuration() const { 926 return false; 927} 928 929size_t LiveSession::getTrackCount() const { 930 return mPlaylist->getTrackCount(); 931} 932 933sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const { 934 return mPlaylist->getTrackInfo(trackIndex); 935} 936 937status_t LiveSession::selectTrack(size_t index, bool select) { 938 status_t err = mPlaylist->selectTrack(index, select); 939 if (err == OK) { 940 (new AMessage(kWhatChangeConfiguration, id()))->post(); 941 } 942 return err; 943} 944 945bool LiveSession::canSwitchUp() { 946 // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. 947 status_t err = OK; 948 for (size_t i = 0; i < mPacketSources.size(); ++i) { 949 sp<AnotherPacketSource> source = mPacketSources.valueAt(i); 950 int64_t dur = source->getBufferedDurationUs(&err); 951 if (err == OK && dur > 10000000) { 952 return true; 953 } 954 } 955 return false; 956} 957 958void LiveSession::changeConfiguration( 959 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 960 // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. 961 // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). 962 cancelBandwidthSwitch(); 963 964 CHECK(!mReconfigurationInProgress); 965 mReconfigurationInProgress = true; 966 967 mPrevBandwidthIndex = bandwidthIndex; 968 969 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 970 timeUs, bandwidthIndex, pickTrack); 971 972 if (pickTrack) { 973 mPlaylist->pickRandomMediaItems(); 974 } 975 976 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 977 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 978 979 uint32_t streamMask = 0; // streams that should be fetched by the new fetcher 980 uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher 981 982 AString URIs[kMaxStreams]; 983 for (size_t i = 0; i < kMaxStreams; ++i) { 984 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 985 streamMask |= indexToType(i); 986 } 987 } 988 989 // Step 1, stop and discard fetchers that are no longer needed. 990 // Pause those that we'll reuse. 991 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 992 const AString &uri = mFetcherInfos.keyAt(i); 993 994 bool discardFetcher = true; 995 996 // If we're seeking all current fetchers are discarded. 997 if (timeUs < 0ll) { 998 // delay fetcher removal 999 discardFetcher = false; 1000 1001 for (size_t j = 0; j < kMaxStreams; ++j) { 1002 StreamType type = indexToType(j); 1003 if ((streamMask & type) && uri == URIs[j]) { 1004 resumeMask |= type; 1005 streamMask &= ~type; 1006 } 1007 } 1008 } 1009 1010 if (discardFetcher) { 1011 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 1012 } else { 1013 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 1014 } 1015 } 1016 1017 sp<AMessage> msg; 1018 if (timeUs < 0ll) { 1019 // skip onChangeConfiguration2 (decoder destruction) if switching. 1020 msg = new AMessage(kWhatChangeConfiguration3, id()); 1021 } else { 1022 msg = new AMessage(kWhatChangeConfiguration2, id()); 1023 } 1024 msg->setInt32("streamMask", streamMask); 1025 msg->setInt32("resumeMask", resumeMask); 1026 msg->setInt64("timeUs", timeUs); 1027 for (size_t i = 0; i < kMaxStreams; ++i) { 1028 if (streamMask & indexToType(i)) { 1029 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 1030 } 1031 } 1032 1033 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 1034 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 1035 // fetchers have completed their asynchronous operation, we'll post 1036 // mContinuation, which then is handled below in onChangeConfiguration2. 1037 mContinuationCounter = mFetcherInfos.size(); 1038 mContinuation = msg; 1039 1040 if (mContinuationCounter == 0) { 1041 msg->post(); 1042 1043 if (mSeekReplyID != 0) { 1044 CHECK(mSeekReply != NULL); 1045 mSeekReply->postReply(mSeekReplyID); 1046 } 1047 } 1048} 1049 1050void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 1051 if (!mReconfigurationInProgress) { 1052 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 1053 } else { 1054 msg->post(1000000ll); // retry in 1 sec 1055 } 1056} 1057 1058void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 1059 mContinuation.clear(); 1060 1061 // All fetchers are either suspended or have been removed now. 1062 1063 uint32_t streamMask; 1064 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1065 1066 AString URIs[kMaxStreams]; 1067 for (size_t i = 0; i < kMaxStreams; ++i) { 1068 if (streamMask & indexToType(i)) { 1069 const AString &uriKey = mStreams[i].uriKey(); 1070 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 1071 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 1072 } 1073 } 1074 1075 // Determine which decoders to shutdown on the player side, 1076 // a decoder has to be shutdown if either 1077 // 1) its streamtype was active before but now longer isn't. 1078 // or 1079 // 2) its streamtype was already active and still is but the URI 1080 // has changed. 1081 uint32_t changedMask = 0; 1082 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 1083 if (((mStreamMask & streamMask & indexToType(i)) 1084 && !(URIs[i] == mStreams[i].mUri)) 1085 || (mStreamMask & ~streamMask & indexToType(i))) { 1086 changedMask |= indexToType(i); 1087 } 1088 } 1089 1090 if (changedMask == 0) { 1091 // If nothing changed as far as the audio/video decoders 1092 // are concerned we can proceed. 1093 onChangeConfiguration3(msg); 1094 return; 1095 } 1096 1097 // Something changed, inform the player which will shutdown the 1098 // corresponding decoders and will post the reply once that's done. 1099 // Handling the reply will continue executing below in 1100 // onChangeConfiguration3. 1101 sp<AMessage> notify = mNotify->dup(); 1102 notify->setInt32("what", kWhatStreamsChanged); 1103 notify->setInt32("changedMask", changedMask); 1104 1105 msg->setWhat(kWhatChangeConfiguration3); 1106 msg->setTarget(id()); 1107 1108 notify->setMessage("reply", msg); 1109 notify->post(); 1110} 1111 1112void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 1113 mContinuation.clear(); 1114 // All remaining fetchers are still suspended, the player has shutdown 1115 // any decoders that needed it. 1116 1117 uint32_t streamMask, resumeMask; 1118 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 1119 CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); 1120 1121 for (size_t i = 0; i < kMaxStreams; ++i) { 1122 if (streamMask & indexToType(i)) { 1123 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 1124 } 1125 } 1126 1127 int64_t timeUs; 1128 bool switching = false; 1129 CHECK(msg->findInt64("timeUs", &timeUs)); 1130 1131 if (timeUs < 0ll) { 1132 timeUs = mLastDequeuedTimeUs; 1133 switching = true; 1134 } 1135 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 1136 1137 mNewStreamMask = streamMask; 1138 1139 // Of all existing fetchers: 1140 // * Resume fetchers that are still needed and assign them original packet sources. 1141 // * Mark otherwise unneeded fetchers for removal. 1142 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1143 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1144 const AString &uri = mFetcherInfos.keyAt(i); 1145 1146 sp<AnotherPacketSource> sources[kMaxStreams]; 1147 for (size_t j = 0; j < kMaxStreams; ++j) { 1148 if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { 1149 sources[j] = mPacketSources.valueFor(indexToType(j)); 1150 } 1151 } 1152 1153 FetcherInfo &info = mFetcherInfos.editValueAt(i); 1154 if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL 1155 || sources[kSubtitleIndex] != NULL) { 1156 info.mFetcher->startAsync( 1157 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1158 } else { 1159 info.mToBeRemoved = true; 1160 } 1161 } 1162 1163 // streamMask now only contains the types that need a new fetcher created. 1164 1165 if (streamMask != 0) { 1166 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1167 } 1168 1169 // Find out when the original fetchers have buffered up to and start the new fetchers 1170 // at a later timestamp. 1171 for (size_t i = 0; i < kMaxStreams; i++) { 1172 if (!(indexToType(i) & streamMask)) { 1173 continue; 1174 } 1175 1176 AString uri; 1177 uri = mStreams[i].mUri; 1178 1179 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1180 CHECK(fetcher != NULL); 1181 1182 int32_t latestSeq = -1; 1183 int64_t latestTimeUs = 0ll; 1184 sp<AnotherPacketSource> sources[kMaxStreams]; 1185 1186 // TRICKY: looping from i as earlier streams are already removed from streamMask 1187 for (size_t j = i; j < kMaxStreams; ++j) { 1188 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1189 sources[j] = mPacketSources.valueFor(indexToType(j)); 1190 1191 if (!switching) { 1192 sources[j]->clear(); 1193 } else { 1194 int32_t type, seq; 1195 int64_t srcTimeUs; 1196 sp<AMessage> meta = sources[j]->getLatestMeta(); 1197 1198 if (meta != NULL && !meta->findInt32("discontinuity", &type)) { 1199 CHECK(meta->findInt32("seq", &seq)); 1200 if (seq > latestSeq) { 1201 latestSeq = seq; 1202 } 1203 CHECK(meta->findInt64("timeUs", &srcTimeUs)); 1204 if (srcTimeUs > latestTimeUs) { 1205 latestTimeUs = srcTimeUs; 1206 } 1207 } 1208 1209 sources[j] = mPacketSources2.valueFor(indexToType(j)); 1210 sources[j]->clear(); 1211 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1212 if (extraStreams & indexToType(j)) { 1213 sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); 1214 } 1215 } 1216 1217 streamMask &= ~indexToType(j); 1218 } 1219 } 1220 1221 fetcher->startAsync( 1222 sources[kAudioIndex], 1223 sources[kVideoIndex], 1224 sources[kSubtitleIndex], 1225 timeUs, 1226 latestTimeUs /* min start time(us) */, 1227 latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); 1228 } 1229 1230 // All fetchers have now been started, the configuration change 1231 // has completed. 1232 1233 scheduleCheckBandwidthEvent(); 1234 1235 ALOGV("XXX configuration change completed."); 1236 mReconfigurationInProgress = false; 1237 if (switching) { 1238 mSwitchInProgress = true; 1239 } else { 1240 mStreamMask = mNewStreamMask; 1241 } 1242 1243 if (mDisconnectReplyID != 0) { 1244 finishDisconnect(); 1245 } 1246} 1247 1248void LiveSession::onSwapped(const sp<AMessage> &msg) { 1249 int32_t switchGeneration; 1250 CHECK(msg->findInt32("switchGeneration", &switchGeneration)); 1251 if (switchGeneration != mSwitchGeneration) { 1252 return; 1253 } 1254 1255 int32_t stream; 1256 CHECK(msg->findInt32("stream", &stream)); 1257 mSwapMask |= stream; 1258 if (mSwapMask != mStreamMask) { 1259 return; 1260 } 1261 1262 // Check if new variant contains extra streams. 1263 uint32_t extraStreams = mNewStreamMask & (~mStreamMask); 1264 while (extraStreams) { 1265 StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); 1266 swapPacketSource(extraStream); 1267 extraStreams &= ~extraStream; 1268 } 1269 1270 tryToFinishBandwidthSwitch(); 1271} 1272 1273// Mark switch done when: 1274// 1. all old buffers are swapped out, AND 1275// 2. all old fetchers are removed. 1276void LiveSession::tryToFinishBandwidthSwitch() { 1277 bool needToRemoveFetchers = false; 1278 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1279 if (mFetcherInfos.valueAt(i).mToBeRemoved) { 1280 needToRemoveFetchers = true; 1281 break; 1282 } 1283 } 1284 if (!needToRemoveFetchers && mSwapMask == mStreamMask) { 1285 mStreamMask = mNewStreamMask; 1286 mSwitchInProgress = false; 1287 mSwapMask = 0; 1288 } 1289} 1290 1291void LiveSession::scheduleCheckBandwidthEvent() { 1292 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1293 msg->setInt32("generation", mCheckBandwidthGeneration); 1294 msg->post(10000000ll); 1295} 1296 1297void LiveSession::cancelCheckBandwidthEvent() { 1298 ++mCheckBandwidthGeneration; 1299} 1300 1301void LiveSession::cancelBandwidthSwitch() { 1302 Mutex::Autolock lock(mSwapMutex); 1303 mSwitchGeneration++; 1304 mSwitchInProgress = false; 1305 mSwapMask = 0; 1306} 1307 1308bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { 1309 if (mReconfigurationInProgress || mSwitchInProgress) { 1310 return false; 1311 } 1312 1313 if (mPrevBandwidthIndex < 0) { 1314 return true; 1315 } 1316 1317 if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { 1318 return false; 1319 } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { 1320 return canSwitchUp(); 1321 } else { 1322 return true; 1323 } 1324} 1325 1326void LiveSession::onCheckBandwidth() { 1327 size_t bandwidthIndex = getBandwidthIndex(); 1328 if (canSwitchBandwidthTo(bandwidthIndex)) { 1329 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1330 } else { 1331 scheduleCheckBandwidthEvent(); 1332 } 1333 1334 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1335 // schedule another one on return, only an explicit call to 1336 // scheduleCheckBandwidthEvent will do that. 1337 // This ensures that only one configuration change is ongoing at any 1338 // one time, once that completes it'll schedule another check bandwidth 1339 // event. 1340} 1341 1342void LiveSession::postPrepared(status_t err) { 1343 CHECK(mInPreparationPhase); 1344 1345 sp<AMessage> notify = mNotify->dup(); 1346 if (err == OK || err == ERROR_END_OF_STREAM) { 1347 notify->setInt32("what", kWhatPrepared); 1348 } else { 1349 notify->setInt32("what", kWhatPreparationFailed); 1350 notify->setInt32("err", err); 1351 } 1352 1353 notify->post(); 1354 1355 mInPreparationPhase = false; 1356} 1357 1358} // namespace android 1359 1360