// LiveSession.cpp revision 94dcc94b16cc6c2a7aa02df2d0d6b8743d738d78
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <ctype.h> 41#include <openssl/aes.h> 42#include <openssl/md5.h> 43 44namespace android { 45 46LiveSession::LiveSession( 47 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 48 : mNotify(notify), 49 mFlags(flags), 50 mUIDValid(uidValid), 51 mUID(uid), 52 mInPreparationPhase(true), 53 mHTTPDataSource( 54 HTTPBase::Create( 55 (mFlags & kFlagIncognito) 56 ? 
HTTPBase::kFlagIncognito 57 : 0)), 58 mPrevBandwidthIndex(-1), 59 mStreamMask(0), 60 mCheckBandwidthGeneration(0), 61 mLastDequeuedTimeUs(0ll), 62 mRealTimeBaseUs(0ll), 63 mReconfigurationInProgress(false), 64 mDisconnectReplyID(0) { 65 if (mUIDValid) { 66 mHTTPDataSource->setUID(mUID); 67 } 68 69 mPacketSources.add( 70 STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */)); 71 72 mPacketSources.add( 73 STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */)); 74 75 mPacketSources.add( 76 STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */)); 77} 78 79LiveSession::~LiveSession() { 80} 81 82status_t LiveSession::dequeueAccessUnit( 83 StreamType stream, sp<ABuffer> *accessUnit) { 84 if (!(mStreamMask & stream)) { 85 return UNKNOWN_ERROR; 86 } 87 88 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 89 90 status_t finalResult; 91 if (!packetSource->hasBufferAvailable(&finalResult)) { 92 return finalResult == OK ? -EAGAIN : finalResult; 93 } 94 95 status_t err = packetSource->dequeueAccessUnit(accessUnit); 96 97 const char *streamStr; 98 switch (stream) { 99 case STREAMTYPE_AUDIO: 100 streamStr = "audio"; 101 break; 102 case STREAMTYPE_VIDEO: 103 streamStr = "video"; 104 break; 105 case STREAMTYPE_SUBTITLES: 106 streamStr = "subs"; 107 break; 108 default: 109 TRESPASS(); 110 } 111 112 if (err == INFO_DISCONTINUITY) { 113 int32_t type; 114 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 115 116 sp<AMessage> extra; 117 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 118 extra.clear(); 119 } 120 121 ALOGI("[%s] read discontinuity of type %d, extra = %s", 122 streamStr, 123 type, 124 extra == NULL ? 
"NULL" : extra->debugString().c_str()); 125 } else if (err == OK) { 126 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 127 int64_t timeUs; 128 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 129 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 130 131 mLastDequeuedTimeUs = timeUs; 132 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 133 } else if (stream == STREAMTYPE_SUBTITLES) { 134 (*accessUnit)->meta()->setInt32( 135 "trackIndex", mPlaylist->getSelectedIndex()); 136 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 137 } 138 } else { 139 ALOGI("[%s] encountered error %d", streamStr, err); 140 } 141 142 return err; 143} 144 145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 146 if (!(mStreamMask & stream)) { 147 return UNKNOWN_ERROR; 148 } 149 150 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 151 152 sp<MetaData> meta = packetSource->getFormat(); 153 154 if (meta == NULL) { 155 return -EAGAIN; 156 } 157 158 return convertMetaDataToMessage(meta, format); 159} 160 161void LiveSession::connectAsync( 162 const char *url, const KeyedVector<String8, String8> *headers) { 163 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 164 msg->setString("url", url); 165 166 if (headers != NULL) { 167 msg->setPointer( 168 "headers", 169 new KeyedVector<String8, String8>(*headers)); 170 } 171 172 msg->post(); 173} 174 175status_t LiveSession::disconnect() { 176 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 177 178 sp<AMessage> response; 179 status_t err = msg->postAndAwaitResponse(&response); 180 181 return err; 182} 183 184status_t LiveSession::seekTo(int64_t timeUs) { 185 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 186 msg->setInt64("timeUs", timeUs); 187 188 sp<AMessage> response; 189 status_t err = msg->postAndAwaitResponse(&response); 190 191 return err; 192} 193 194void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 195 switch 
(msg->what()) { 196 case kWhatConnect: 197 { 198 onConnect(msg); 199 break; 200 } 201 202 case kWhatDisconnect: 203 { 204 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 205 206 if (mReconfigurationInProgress) { 207 break; 208 } 209 210 finishDisconnect(); 211 break; 212 } 213 214 case kWhatSeek: 215 { 216 uint32_t replyID; 217 CHECK(msg->senderAwaitsResponse(&replyID)); 218 219 status_t err = onSeek(msg); 220 221 sp<AMessage> response = new AMessage; 222 response->setInt32("err", err); 223 224 response->postReply(replyID); 225 break; 226 } 227 228 case kWhatFetcherNotify: 229 { 230 int32_t what; 231 CHECK(msg->findInt32("what", &what)); 232 233 switch (what) { 234 case PlaylistFetcher::kWhatStarted: 235 break; 236 case PlaylistFetcher::kWhatPaused: 237 case PlaylistFetcher::kWhatStopped: 238 { 239 if (what == PlaylistFetcher::kWhatStopped) { 240 AString uri; 241 CHECK(msg->findString("uri", &uri)); 242 mFetcherInfos.removeItem(uri); 243 } 244 245 if (mContinuation != NULL) { 246 CHECK_GT(mContinuationCounter, 0); 247 if (--mContinuationCounter == 0) { 248 mContinuation->post(); 249 } 250 } 251 break; 252 } 253 254 case PlaylistFetcher::kWhatDurationUpdate: 255 { 256 AString uri; 257 CHECK(msg->findString("uri", &uri)); 258 259 int64_t durationUs; 260 CHECK(msg->findInt64("durationUs", &durationUs)); 261 262 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 263 info->mDurationUs = durationUs; 264 break; 265 } 266 267 case PlaylistFetcher::kWhatError: 268 { 269 status_t err; 270 CHECK(msg->findInt32("err", &err)); 271 272 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 273 274 if (mInPreparationPhase) { 275 postPrepared(err); 276 } 277 278 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 279 280 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 281 282 mPacketSources.valueFor( 283 STREAMTYPE_SUBTITLES)->signalEOS(err); 284 285 sp<AMessage> notify = mNotify->dup(); 286 notify->setInt32("what", kWhatError); 287 
notify->setInt32("err", err); 288 notify->post(); 289 break; 290 } 291 292 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 293 { 294 AString uri; 295 CHECK(msg->findString("uri", &uri)); 296 297 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 298 info->mIsPrepared = true; 299 300 if (mInPreparationPhase) { 301 bool allFetchersPrepared = true; 302 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 303 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 304 allFetchersPrepared = false; 305 break; 306 } 307 } 308 309 if (allFetchersPrepared) { 310 postPrepared(OK); 311 } 312 } 313 break; 314 } 315 316 default: 317 TRESPASS(); 318 } 319 320 break; 321 } 322 323 case kWhatCheckBandwidth: 324 { 325 int32_t generation; 326 CHECK(msg->findInt32("generation", &generation)); 327 328 if (generation != mCheckBandwidthGeneration) { 329 break; 330 } 331 332 onCheckBandwidth(); 333 break; 334 } 335 336 case kWhatChangeConfiguration: 337 { 338 onChangeConfiguration(msg); 339 break; 340 } 341 342 case kWhatChangeConfiguration2: 343 { 344 onChangeConfiguration2(msg); 345 break; 346 } 347 348 case kWhatChangeConfiguration3: 349 { 350 onChangeConfiguration3(msg); 351 break; 352 } 353 354 case kWhatFinishDisconnect2: 355 { 356 onFinishDisconnect2(); 357 break; 358 } 359 360 default: 361 TRESPASS(); 362 break; 363 } 364} 365 366// static 367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 368 if (a->mBandwidth < b->mBandwidth) { 369 return -1; 370 } else if (a->mBandwidth == b->mBandwidth) { 371 return 0; 372 } 373 374 return 1; 375} 376 377void LiveSession::onConnect(const sp<AMessage> &msg) { 378 AString url; 379 CHECK(msg->findString("url", &url)); 380 381 KeyedVector<String8, String8> *headers = NULL; 382 if (!msg->findPointer("headers", (void **)&headers)) { 383 mExtraHeaders.clear(); 384 } else { 385 mExtraHeaders = *headers; 386 387 delete headers; 388 headers = NULL; 389 } 390 391#if 1 392 ALOGI("onConnect <URL suppressed>"); 393#else 394 
ALOGI("onConnect %s", url.c_str()); 395#endif 396 397 mMasterURL = url; 398 399 bool dummy; 400 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 401 402 if (mPlaylist == NULL) { 403 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 404 405 postPrepared(ERROR_IO); 406 return; 407 } 408 409 // We trust the content provider to make a reasonable choice of preferred 410 // initial bandwidth by listing it first in the variant playlist. 411 // At startup we really don't have a good estimate on the available 412 // network bandwidth since we haven't tranferred any data yet. Once 413 // we have we can make a better informed choice. 414 size_t initialBandwidth = 0; 415 size_t initialBandwidthIndex = 0; 416 417 if (mPlaylist->isVariantPlaylist()) { 418 for (size_t i = 0; i < mPlaylist->size(); ++i) { 419 BandwidthItem item; 420 421 item.mPlaylistIndex = i; 422 423 sp<AMessage> meta; 424 AString uri; 425 mPlaylist->itemAt(i, &uri, &meta); 426 427 unsigned long bandwidth; 428 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 429 430 if (initialBandwidth == 0) { 431 initialBandwidth = item.mBandwidth; 432 } 433 434 mBandwidthItems.push(item); 435 } 436 437 CHECK_GT(mBandwidthItems.size(), 0u); 438 439 mBandwidthItems.sort(SortByBandwidth); 440 441 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 442 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 443 initialBandwidthIndex = i; 444 break; 445 } 446 } 447 } else { 448 // dummy item. 449 BandwidthItem item; 450 item.mPlaylistIndex = 0; 451 item.mBandwidth = 0; 452 mBandwidthItems.push(item); 453 } 454 455 changeConfiguration( 456 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 457} 458 459void LiveSession::finishDisconnect() { 460 // No reconfiguration is currently pending, make sure none will trigger 461 // during disconnection either. 
462 cancelCheckBandwidthEvent(); 463 464 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 465 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 466 } 467 468 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 469 470 mContinuationCounter = mFetcherInfos.size(); 471 mContinuation = msg; 472 473 if (mContinuationCounter == 0) { 474 msg->post(); 475 } 476} 477 478void LiveSession::onFinishDisconnect2() { 479 mContinuation.clear(); 480 481 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 482 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 483 484 mPacketSources.valueFor( 485 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 486 487 sp<AMessage> response = new AMessage; 488 response->setInt32("err", OK); 489 490 response->postReply(mDisconnectReplyID); 491 mDisconnectReplyID = 0; 492} 493 494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 495 ssize_t index = mFetcherInfos.indexOfKey(uri); 496 497 if (index >= 0) { 498 return NULL; 499 } 500 501 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 502 notify->setString("uri", uri); 503 504 FetcherInfo info; 505 info.mFetcher = new PlaylistFetcher(notify, this, uri); 506 info.mDurationUs = -1ll; 507 info.mIsPrepared = false; 508 looper()->registerHandler(info.mFetcher); 509 510 mFetcherInfos.add(uri, info); 511 512 return info.mFetcher; 513} 514 515status_t LiveSession::fetchFile( 516 const char *url, sp<ABuffer> *out, 517 int64_t range_offset, int64_t range_length) { 518 *out = NULL; 519 520 sp<DataSource> source; 521 522 if (!strncasecmp(url, "file://", 7)) { 523 source = new FileSource(url + 7); 524 } else if (strncasecmp(url, "http://", 7) 525 && strncasecmp(url, "https://", 8)) { 526 return ERROR_UNSUPPORTED; 527 } else { 528 KeyedVector<String8, String8> headers = mExtraHeaders; 529 if (range_offset > 0 || range_length >= 0) { 530 headers.add( 531 String8("Range"), 532 String8( 533 StringPrintf( 534 "bytes=%lld-%s", 535 
range_offset, 536 range_length < 0 537 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); 538 } 539 status_t err = mHTTPDataSource->connect(url, &headers); 540 541 if (err != OK) { 542 return err; 543 } 544 545 source = mHTTPDataSource; 546 } 547 548 off64_t size; 549 status_t err = source->getSize(&size); 550 551 if (err != OK) { 552 size = 65536; 553 } 554 555 sp<ABuffer> buffer = new ABuffer(size); 556 buffer->setRange(0, 0); 557 558 for (;;) { 559 size_t bufferRemaining = buffer->capacity() - buffer->size(); 560 561 if (bufferRemaining == 0) { 562 bufferRemaining = 32768; 563 564 ALOGV("increasing download buffer to %d bytes", 565 buffer->size() + bufferRemaining); 566 567 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 568 memcpy(copy->data(), buffer->data(), buffer->size()); 569 copy->setRange(0, buffer->size()); 570 571 buffer = copy; 572 } 573 574 size_t maxBytesToRead = bufferRemaining; 575 if (range_length >= 0) { 576 int64_t bytesLeftInRange = range_length - buffer->size(); 577 if (bytesLeftInRange < maxBytesToRead) { 578 maxBytesToRead = bytesLeftInRange; 579 580 if (bytesLeftInRange == 0) { 581 break; 582 } 583 } 584 } 585 586 ssize_t n = source->readAt( 587 buffer->size(), buffer->data() + buffer->size(), 588 maxBytesToRead); 589 590 if (n < 0) { 591 return n; 592 } 593 594 if (n == 0) { 595 break; 596 } 597 598 buffer->setRange(0, buffer->size() + (size_t)n); 599 } 600 601 *out = buffer; 602 603 return OK; 604} 605 606sp<M3UParser> LiveSession::fetchPlaylist( 607 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 608 ALOGV("fetchPlaylist '%s'", url); 609 610 *unchanged = false; 611 612 sp<ABuffer> buffer; 613 status_t err = fetchFile(url, &buffer); 614 615 if (err != OK) { 616 return NULL; 617 } 618 619 // MD5 functionality is not available on the simulator, treat all 620 // playlists as changed. 
621 622#if defined(HAVE_ANDROID_OS) 623 uint8_t hash[16]; 624 625 MD5_CTX m; 626 MD5_Init(&m); 627 MD5_Update(&m, buffer->data(), buffer->size()); 628 629 MD5_Final(hash, &m); 630 631 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 632 // playlist unchanged 633 *unchanged = true; 634 635 return NULL; 636 } 637 638 if (curPlaylistHash != NULL) { 639 memcpy(curPlaylistHash, hash, sizeof(hash)); 640 } 641#endif 642 643 sp<M3UParser> playlist = 644 new M3UParser(url, buffer->data(), buffer->size()); 645 646 if (playlist->initCheck() != OK) { 647 ALOGE("failed to parse .m3u8 playlist"); 648 649 return NULL; 650 } 651 652 return playlist; 653} 654 655static double uniformRand() { 656 return (double)rand() / RAND_MAX; 657} 658 659size_t LiveSession::getBandwidthIndex() { 660 if (mBandwidthItems.size() == 0) { 661 return 0; 662 } 663 664#if 1 665 char value[PROPERTY_VALUE_MAX]; 666 ssize_t index = -1; 667 if (property_get("media.httplive.bw-index", value, NULL)) { 668 char *end; 669 index = strtol(value, &end, 10); 670 CHECK(end > value && *end == '\0'); 671 672 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 673 index = mBandwidthItems.size() - 1; 674 } 675 } 676 677 if (index < 0) { 678 int32_t bandwidthBps; 679 if (mHTTPDataSource != NULL 680 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 681 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 682 } else { 683 ALOGV("no bandwidth estimate."); 684 return 0; // Pick the lowest bandwidth stream by default. 685 } 686 687 char value[PROPERTY_VALUE_MAX]; 688 if (property_get("media.httplive.max-bw", value, NULL)) { 689 char *end; 690 long maxBw = strtoul(value, &end, 10); 691 if (end > value && *end == '\0') { 692 if (maxBw > 0 && bandwidthBps > maxBw) { 693 ALOGV("bandwidth capped to %ld bps", maxBw); 694 bandwidthBps = maxBw; 695 } 696 } 697 } 698 699 // Consider only 80% of the available bandwidth usable. 
700 bandwidthBps = (bandwidthBps * 8) / 10; 701 702 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 703 704 index = mBandwidthItems.size() - 1; 705 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 706 > (size_t)bandwidthBps) { 707 --index; 708 } 709 } 710#elif 0 711 // Change bandwidth at random() 712 size_t index = uniformRand() * mBandwidthItems.size(); 713#elif 0 714 // There's a 50% chance to stay on the current bandwidth and 715 // a 50% chance to switch to the next higher bandwidth (wrapping around 716 // to lowest) 717 const size_t kMinIndex = 0; 718 719 static ssize_t mPrevBandwidthIndex = -1; 720 721 size_t index; 722 if (mPrevBandwidthIndex < 0) { 723 index = kMinIndex; 724 } else if (uniformRand() < 0.5) { 725 index = (size_t)mPrevBandwidthIndex; 726 } else { 727 index = mPrevBandwidthIndex + 1; 728 if (index == mBandwidthItems.size()) { 729 index = kMinIndex; 730 } 731 } 732 mPrevBandwidthIndex = index; 733#elif 0 734 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 735 736 size_t index = mBandwidthItems.size() - 1; 737 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 738 --index; 739 } 740#elif 1 741 char value[PROPERTY_VALUE_MAX]; 742 size_t index; 743 if (property_get("media.httplive.bw-index", value, NULL)) { 744 char *end; 745 index = strtoul(value, &end, 10); 746 CHECK(end > value && *end == '\0'); 747 748 if (index >= mBandwidthItems.size()) { 749 index = mBandwidthItems.size() - 1; 750 } 751 } else { 752 index = 0; 753 } 754#else 755 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 756#endif 757 758 CHECK_GE(index, 0); 759 760 return index; 761} 762 763status_t LiveSession::onSeek(const sp<AMessage> &msg) { 764 int64_t timeUs; 765 CHECK(msg->findInt64("timeUs", &timeUs)); 766 767 if (!mReconfigurationInProgress) { 768 changeConfiguration(timeUs, getBandwidthIndex()); 769 } 770 771 return OK; 772} 773 774status_t 
LiveSession::getDuration(int64_t *durationUs) const { 775 int64_t maxDurationUs = 0ll; 776 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 777 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 778 779 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 780 maxDurationUs = fetcherDurationUs; 781 } 782 } 783 784 *durationUs = maxDurationUs; 785 786 return OK; 787} 788 789bool LiveSession::isSeekable() const { 790 int64_t durationUs; 791 return getDuration(&durationUs) == OK && durationUs >= 0; 792} 793 794bool LiveSession::hasDynamicDuration() const { 795 return false; 796} 797 798status_t LiveSession::getTrackInfo(Parcel *reply) const { 799 return mPlaylist->getTrackInfo(reply); 800} 801 802status_t LiveSession::selectTrack(size_t index, bool select) { 803 status_t err = mPlaylist->selectTrack(index, select); 804 if (err == OK) { 805 (new AMessage(kWhatChangeConfiguration, id()))->post(); 806 } 807 return err; 808} 809 810void LiveSession::changeConfiguration( 811 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 812 CHECK(!mReconfigurationInProgress); 813 mReconfigurationInProgress = true; 814 815 mPrevBandwidthIndex = bandwidthIndex; 816 817 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 818 timeUs, bandwidthIndex, pickTrack); 819 820 if (pickTrack) { 821 mPlaylist->pickRandomMediaItems(); 822 } 823 824 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 825 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 826 827 uint32_t streamMask = 0; 828 829 AString audioURI; 830 if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) { 831 streamMask |= STREAMTYPE_AUDIO; 832 } 833 834 AString videoURI; 835 if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) { 836 streamMask |= STREAMTYPE_VIDEO; 837 } 838 839 AString subtitleURI; 840 if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) { 841 streamMask |= STREAMTYPE_SUBTITLES; 842 } 843 844 // Step 1, stop and 
discard fetchers that are no longer needed. 845 // Pause those that we'll reuse. 846 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 847 const AString &uri = mFetcherInfos.keyAt(i); 848 849 bool discardFetcher = true; 850 851 // If we're seeking all current fetchers are discarded. 852 if (timeUs < 0ll) { 853 if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) 854 || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) 855 || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) { 856 discardFetcher = false; 857 } 858 } 859 860 if (discardFetcher) { 861 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 862 } else { 863 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 864 } 865 } 866 867 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); 868 msg->setInt32("streamMask", streamMask); 869 msg->setInt64("timeUs", timeUs); 870 if (streamMask & STREAMTYPE_AUDIO) { 871 msg->setString("audioURI", audioURI.c_str()); 872 } 873 if (streamMask & STREAMTYPE_VIDEO) { 874 msg->setString("videoURI", videoURI.c_str()); 875 } 876 if (streamMask & STREAMTYPE_SUBTITLES) { 877 msg->setString("subtitleURI", subtitleURI.c_str()); 878 } 879 880 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 881 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 882 // fetchers have completed their asynchronous operation, we'll post 883 // mContinuation, which then is handled below in onChangeConfiguration2. 
884 mContinuationCounter = mFetcherInfos.size(); 885 mContinuation = msg; 886 887 if (mContinuationCounter == 0) { 888 msg->post(); 889 } 890} 891 892void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 893 if (!mReconfigurationInProgress) { 894 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 895 } else { 896 msg->post(1000000ll); // retry in 1 sec 897 } 898} 899 900void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 901 mContinuation.clear(); 902 903 // All fetchers are either suspended or have been removed now. 904 905 uint32_t streamMask; 906 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 907 908 AString audioURI, videoURI, subtitleURI; 909 if (streamMask & STREAMTYPE_AUDIO) { 910 CHECK(msg->findString("audioURI", &audioURI)); 911 ALOGV("audioURI = '%s'", audioURI.c_str()); 912 } 913 if (streamMask & STREAMTYPE_VIDEO) { 914 CHECK(msg->findString("videoURI", &videoURI)); 915 ALOGV("videoURI = '%s'", videoURI.c_str()); 916 } 917 if (streamMask & STREAMTYPE_SUBTITLES) { 918 CHECK(msg->findString("subtitleURI", &subtitleURI)); 919 ALOGV("subtitleURI = '%s'", subtitleURI.c_str()); 920 } 921 922 // Determine which decoders to shutdown on the player side, 923 // a decoder has to be shutdown if either 924 // 1) its streamtype was active before but now longer isn't. 925 // or 926 // 2) its streamtype was already active and still is but the URI 927 // has changed. 928 uint32_t changedMask = 0; 929 if (((mStreamMask & streamMask & STREAMTYPE_AUDIO) 930 && !(audioURI == mAudioURI)) 931 || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) { 932 changedMask |= STREAMTYPE_AUDIO; 933 } 934 if (((mStreamMask & streamMask & STREAMTYPE_VIDEO) 935 && !(videoURI == mVideoURI)) 936 || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) { 937 changedMask |= STREAMTYPE_VIDEO; 938 } 939 940 if (changedMask == 0) { 941 // If nothing changed as far as the audio/video decoders 942 // are concerned we can proceed. 
943 onChangeConfiguration3(msg); 944 return; 945 } 946 947 // Something changed, inform the player which will shutdown the 948 // corresponding decoders and will post the reply once that's done. 949 // Handling the reply will continue executing below in 950 // onChangeConfiguration3. 951 sp<AMessage> notify = mNotify->dup(); 952 notify->setInt32("what", kWhatStreamsChanged); 953 notify->setInt32("changedMask", changedMask); 954 955 msg->setWhat(kWhatChangeConfiguration3); 956 msg->setTarget(id()); 957 958 notify->setMessage("reply", msg); 959 notify->post(); 960} 961 962void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 963 // All remaining fetchers are still suspended, the player has shutdown 964 // any decoders that needed it. 965 966 uint32_t streamMask; 967 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 968 969 AString audioURI, videoURI, subtitleURI; 970 if (streamMask & STREAMTYPE_AUDIO) { 971 CHECK(msg->findString("audioURI", &audioURI)); 972 } 973 if (streamMask & STREAMTYPE_VIDEO) { 974 CHECK(msg->findString("videoURI", &videoURI)); 975 } 976 if (streamMask & STREAMTYPE_SUBTITLES) { 977 CHECK(msg->findString("subtitleURI", &subtitleURI)); 978 } 979 980 int64_t timeUs; 981 CHECK(msg->findInt64("timeUs", &timeUs)); 982 983 if (timeUs < 0ll) { 984 timeUs = mLastDequeuedTimeUs; 985 } 986 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 987 988 mStreamMask = streamMask; 989 mAudioURI = audioURI; 990 mVideoURI = videoURI; 991 mSubtitleURI = subtitleURI; 992 993 // Resume all existing fetchers and assign them packet sources. 
994 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 995 const AString &uri = mFetcherInfos.keyAt(i); 996 997 uint32_t resumeMask = 0; 998 999 sp<AnotherPacketSource> audioSource; 1000 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1001 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1002 resumeMask |= STREAMTYPE_AUDIO; 1003 } 1004 1005 sp<AnotherPacketSource> videoSource; 1006 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1007 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1008 resumeMask |= STREAMTYPE_VIDEO; 1009 } 1010 1011 sp<AnotherPacketSource> subtitleSource; 1012 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1013 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1014 resumeMask |= STREAMTYPE_SUBTITLES; 1015 } 1016 1017 CHECK_NE(resumeMask, 0u); 1018 1019 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1020 1021 streamMask &= ~resumeMask; 1022 1023 mFetcherInfos.valueAt(i).mFetcher->startAsync( 1024 audioSource, videoSource, subtitleSource); 1025 } 1026 1027 // streamMask now only contains the types that need a new fetcher created. 
1028 1029 if (streamMask != 0) { 1030 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1031 } 1032 1033 while (streamMask != 0) { 1034 StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1)); 1035 1036 AString uri; 1037 switch (streamType) { 1038 case STREAMTYPE_AUDIO: 1039 uri = audioURI; 1040 break; 1041 case STREAMTYPE_VIDEO: 1042 uri = videoURI; 1043 break; 1044 case STREAMTYPE_SUBTITLES: 1045 uri = subtitleURI; 1046 break; 1047 default: 1048 TRESPASS(); 1049 } 1050 1051 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1052 CHECK(fetcher != NULL); 1053 1054 sp<AnotherPacketSource> audioSource; 1055 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1056 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1057 audioSource->clear(); 1058 1059 streamMask &= ~STREAMTYPE_AUDIO; 1060 } 1061 1062 sp<AnotherPacketSource> videoSource; 1063 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1064 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1065 videoSource->clear(); 1066 1067 streamMask &= ~STREAMTYPE_VIDEO; 1068 } 1069 1070 sp<AnotherPacketSource> subtitleSource; 1071 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1072 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1073 subtitleSource->clear(); 1074 1075 streamMask &= ~STREAMTYPE_SUBTITLES; 1076 } 1077 1078 fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs); 1079 } 1080 1081 // All fetchers have now been started, the configuration change 1082 // has completed. 
1083 1084 scheduleCheckBandwidthEvent(); 1085 1086 ALOGV("XXX configuration change completed."); 1087 1088 mReconfigurationInProgress = false; 1089 1090 if (mDisconnectReplyID != 0) { 1091 finishDisconnect(); 1092 } 1093} 1094 1095void LiveSession::scheduleCheckBandwidthEvent() { 1096 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1097 msg->setInt32("generation", mCheckBandwidthGeneration); 1098 msg->post(10000000ll); 1099} 1100 1101void LiveSession::cancelCheckBandwidthEvent() { 1102 ++mCheckBandwidthGeneration; 1103} 1104 1105void LiveSession::onCheckBandwidth() { 1106 if (mReconfigurationInProgress) { 1107 scheduleCheckBandwidthEvent(); 1108 return; 1109 } 1110 1111 size_t bandwidthIndex = getBandwidthIndex(); 1112 if (mPrevBandwidthIndex < 0 1113 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { 1114 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1115 } 1116 1117 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1118 // schedule another one on return, only an explicit call to 1119 // scheduleCheckBandwidthEvent will do that. 1120 // This ensures that only one configuration change is ongoing at any 1121 // one time, once that completes it'll schedule another check bandwidth 1122 // event. 1123} 1124 1125void LiveSession::postPrepared(status_t err) { 1126 CHECK(mInPreparationPhase); 1127 1128 sp<AMessage> notify = mNotify->dup(); 1129 if (err == OK || err == ERROR_END_OF_STREAM) { 1130 notify->setInt32("what", kWhatPrepared); 1131 } else { 1132 notify->setInt32("what", kWhatPreparationFailed); 1133 notify->setInt32("err", err); 1134 } 1135 1136 notify->post(); 1137 1138 mInPreparationPhase = false; 1139} 1140 1141} // namespace android 1142 1143