LiveSession.cpp revision a8b8488f703bb6bda039d7d98f87e4f9d845664d
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <ctype.h> 41#include <openssl/aes.h> 42#include <openssl/md5.h> 43 44namespace android { 45 46LiveSession::LiveSession( 47 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 48 : mNotify(notify), 49 mFlags(flags), 50 mUIDValid(uidValid), 51 mUID(uid), 52 mInPreparationPhase(true), 53 mHTTPDataSource( 54 HTTPBase::Create( 55 (mFlags & kFlagIncognito) 56 ? HTTPBase::kFlagIncognito 57 : 0)), 58 mPrevBandwidthIndex(-1), 59 mStreamMask(0), 60 mCheckBandwidthGeneration(0), 61 mLastDequeuedTimeUs(0ll), 62 mRealTimeBaseUs(0ll), 63 mReconfigurationInProgress(false), 64 mDisconnectReplyID(0) { 65 if (mUIDValid) { 66 mHTTPDataSource->setUID(mUID); 67 } 68 69 mPacketSources.add( 70 STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */)); 71 72 mPacketSources.add( 73 STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */)); 74 75 mPacketSources.add( 76 STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */)); 77} 78 79LiveSession::~LiveSession() { 80} 81 82status_t LiveSession::dequeueAccessUnit( 83 StreamType stream, sp<ABuffer> *accessUnit) { 84 if (!(mStreamMask & stream)) { 85 return UNKNOWN_ERROR; 86 } 87 88 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 89 90 status_t finalResult; 91 if (!packetSource->hasBufferAvailable(&finalResult)) { 92 return finalResult == OK ? -EAGAIN : finalResult; 93 } 94 95 status_t err = packetSource->dequeueAccessUnit(accessUnit); 96 97 const char *streamStr; 98 switch (stream) { 99 case STREAMTYPE_AUDIO: 100 streamStr = "audio"; 101 break; 102 case STREAMTYPE_VIDEO: 103 streamStr = "video"; 104 break; 105 case STREAMTYPE_SUBTITLES: 106 streamStr = "subs"; 107 break; 108 default: 109 TRESPASS(); 110 } 111 112 if (err == INFO_DISCONTINUITY) { 113 int32_t type; 114 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 115 116 sp<AMessage> extra; 117 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 118 extra.clear(); 119 } 120 121 ALOGI("[%s] read discontinuity of type %d, extra = %s", 122 streamStr, 123 type, 124 extra == NULL ? "NULL" : extra->debugString().c_str()); 125 } else if (err == OK) { 126 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 127 int64_t timeUs; 128 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 129 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 130 131 mLastDequeuedTimeUs = timeUs; 132 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 133 } else if (stream == STREAMTYPE_SUBTITLES) { 134 (*accessUnit)->meta()->setInt32( 135 "trackIndex", mPlaylist->getSelectedIndex()); 136 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 137 } 138 } else { 139 ALOGI("[%s] encountered error %d", streamStr, err); 140 } 141 142 return err; 143} 144 145status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 146 if (!(mStreamMask & stream)) { 147 return UNKNOWN_ERROR; 148 } 149 150 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 151 152 sp<MetaData> meta = packetSource->getFormat(); 153 154 if (meta == NULL) { 155 return -EAGAIN; 156 } 157 158 return convertMetaDataToMessage(meta, format); 159} 160 161void LiveSession::connectAsync( 162 const char *url, const KeyedVector<String8, String8> *headers) { 163 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 164 msg->setString("url", url); 165 166 if (headers != NULL) { 167 msg->setPointer( 168 "headers", 169 new KeyedVector<String8, String8>(*headers)); 170 } 171 172 msg->post(); 173} 174 175status_t LiveSession::disconnect() { 176 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 177 178 sp<AMessage> response; 179 status_t err = msg->postAndAwaitResponse(&response); 180 181 return err; 182} 183 184status_t LiveSession::seekTo(int64_t timeUs) { 185 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 186 msg->setInt64("timeUs", timeUs); 187 188 sp<AMessage> response; 189 status_t err = msg->postAndAwaitResponse(&response); 190 191 return err; 192} 193 194void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 195 switch (msg->what()) { 196 case kWhatConnect: 197 { 198 onConnect(msg); 199 break; 200 } 201 202 case kWhatDisconnect: 203 { 204 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 205 206 if (mReconfigurationInProgress) { 207 break; 208 } 209 210 finishDisconnect(); 211 break; 212 } 213 214 case kWhatSeek: 215 { 216 uint32_t replyID; 217 CHECK(msg->senderAwaitsResponse(&replyID)); 218 219 status_t err = onSeek(msg); 220 221 sp<AMessage> response = new AMessage; 222 response->setInt32("err", err); 223 224 response->postReply(replyID); 225 break; 226 } 227 228 case kWhatFetcherNotify: 229 { 230 int32_t what; 231 CHECK(msg->findInt32("what", &what)); 232 233 switch (what) { 234 case PlaylistFetcher::kWhatStarted: 235 break; 236 case PlaylistFetcher::kWhatPaused: 237 case PlaylistFetcher::kWhatStopped: 238 { 239 if (what == PlaylistFetcher::kWhatStopped) { 240 AString uri; 241 CHECK(msg->findString("uri", &uri)); 242 mFetcherInfos.removeItem(uri); 243 } 244 245 if (mContinuation != NULL) { 246 CHECK_GT(mContinuationCounter, 0); 247 if (--mContinuationCounter == 0) { 248 mContinuation->post(); 249 } 250 } 251 break; 252 } 253 254 case PlaylistFetcher::kWhatDurationUpdate: 255 { 256 AString uri; 257 CHECK(msg->findString("uri", &uri)); 258 259 int64_t durationUs; 260 CHECK(msg->findInt64("durationUs", &durationUs)); 261 262 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 263 info->mDurationUs = durationUs; 264 break; 265 } 266 267 case PlaylistFetcher::kWhatError: 268 { 269 status_t err; 270 CHECK(msg->findInt32("err", &err)); 271 272 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 273 274 if (mInPreparationPhase) { 275 postPrepared(err); 276 } 277 278 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 279 280 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 281 282 mPacketSources.valueFor( 283 STREAMTYPE_SUBTITLES)->signalEOS(err); 284 285 sp<AMessage> notify = mNotify->dup(); 286 notify->setInt32("what", kWhatError); 287 notify->setInt32("err", err); 288 notify->post(); 289 break; 290 } 291 292 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 293 { 294 AString uri; 295 CHECK(msg->findString("uri", &uri)); 296 297 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 298 info->mIsPrepared = true; 299 300 if (mInPreparationPhase) { 301 bool allFetchersPrepared = true; 302 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 303 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 304 allFetchersPrepared = false; 305 break; 306 } 307 } 308 309 if (allFetchersPrepared) { 310 postPrepared(OK); 311 } 312 } 313 break; 314 } 315 316 default: 317 TRESPASS(); 318 } 319 320 break; 321 } 322 323 case kWhatCheckBandwidth: 324 { 325 int32_t generation; 326 CHECK(msg->findInt32("generation", &generation)); 327 328 if (generation != mCheckBandwidthGeneration) { 329 break; 330 } 331 332 onCheckBandwidth(); 333 break; 334 } 335 336 case kWhatChangeConfiguration: 337 { 338 onChangeConfiguration(msg); 339 break; 340 } 341 342 case kWhatChangeConfiguration2: 343 { 344 onChangeConfiguration2(msg); 345 break; 346 } 347 348 case kWhatChangeConfiguration3: 349 { 350 onChangeConfiguration3(msg); 351 break; 352 } 353 354 case kWhatFinishDisconnect2: 355 { 356 onFinishDisconnect2(); 357 break; 358 } 359 360 default: 361 TRESPASS(); 362 break; 363 } 364} 365 366// static 367int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 368 if (a->mBandwidth < b->mBandwidth) { 369 return -1; 370 } else if (a->mBandwidth == b->mBandwidth) { 371 return 0; 372 } 373 374 return 1; 375} 376 377void LiveSession::onConnect(const sp<AMessage> &msg) { 378 AString url; 379 CHECK(msg->findString("url", &url)); 380 381 KeyedVector<String8, String8> *headers = NULL; 382 if (!msg->findPointer("headers", (void **)&headers)) { 383 mExtraHeaders.clear(); 384 } else { 385 mExtraHeaders = *headers; 386 387 delete headers; 388 headers = NULL; 389 } 390 391#if 1 392 ALOGI("onConnect <URL suppressed>"); 393#else 394 ALOGI("onConnect %s", url.c_str()); 395#endif 396 397 mMasterURL = url; 398 399 bool dummy; 400 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 401 402 if (mPlaylist == NULL) { 403 ALOGE("unable to fetch master playlist <URL suppressed>."); 404 405 postPrepared(ERROR_IO); 406 return; 407 } 408 409 // We trust the content provider to make a reasonable choice of preferred 410 // initial bandwidth by listing it first in the variant playlist. 411 // At startup we really don't have a good estimate on the available 412 // network bandwidth since we haven't tranferred any data yet. Once 413 // we have we can make a better informed choice. 414 size_t initialBandwidth = 0; 415 size_t initialBandwidthIndex = 0; 416 417 if (mPlaylist->isVariantPlaylist()) { 418 for (size_t i = 0; i < mPlaylist->size(); ++i) { 419 BandwidthItem item; 420 421 item.mPlaylistIndex = i; 422 423 sp<AMessage> meta; 424 AString uri; 425 mPlaylist->itemAt(i, &uri, &meta); 426 427 unsigned long bandwidth; 428 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 429 430 if (initialBandwidth == 0) { 431 initialBandwidth = item.mBandwidth; 432 } 433 434 mBandwidthItems.push(item); 435 } 436 437 CHECK_GT(mBandwidthItems.size(), 0u); 438 439 mBandwidthItems.sort(SortByBandwidth); 440 441 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 442 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 443 initialBandwidthIndex = i; 444 break; 445 } 446 } 447 } else { 448 // dummy item. 449 BandwidthItem item; 450 item.mPlaylistIndex = 0; 451 item.mBandwidth = 0; 452 mBandwidthItems.push(item); 453 } 454 455 changeConfiguration( 456 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 457} 458 459void LiveSession::finishDisconnect() { 460 // No reconfiguration is currently pending, make sure none will trigger 461 // during disconnection either. 462 cancelCheckBandwidthEvent(); 463 464 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 465 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 466 } 467 468 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 469 470 mContinuationCounter = mFetcherInfos.size(); 471 mContinuation = msg; 472 473 if (mContinuationCounter == 0) { 474 msg->post(); 475 } 476} 477 478void LiveSession::onFinishDisconnect2() { 479 mContinuation.clear(); 480 481 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 482 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 483 484 mPacketSources.valueFor( 485 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 486 487 sp<AMessage> response = new AMessage; 488 response->setInt32("err", OK); 489 490 response->postReply(mDisconnectReplyID); 491 mDisconnectReplyID = 0; 492} 493 494sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 495 ssize_t index = mFetcherInfos.indexOfKey(uri); 496 497 if (index >= 0) { 498 return NULL; 499 } 500 501 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 502 notify->setString("uri", uri); 503 504 FetcherInfo info; 505 info.mFetcher = new PlaylistFetcher(notify, this, uri); 506 info.mDurationUs = -1ll; 507 info.mIsPrepared = false; 508 looper()->registerHandler(info.mFetcher); 509 510 mFetcherInfos.add(uri, info); 511 512 return info.mFetcher; 513} 514 515status_t LiveSession::fetchFile( 516 const char *url, sp<ABuffer> *out, 517 int64_t range_offset, int64_t range_length, 518 String8 *actualUrl) { 519 *out = NULL; 520 521 sp<DataSource> source; 522 523 if (!strncasecmp(url, "file://", 7)) { 524 source = new FileSource(url + 7); 525 } else if (strncasecmp(url, "http://", 7) 526 && strncasecmp(url, "https://", 8)) { 527 return ERROR_UNSUPPORTED; 528 } else { 529 KeyedVector<String8, String8> headers = mExtraHeaders; 530 if (range_offset > 0 || range_length >= 0) { 531 headers.add( 532 String8("Range"), 533 String8( 534 StringPrintf( 535 "bytes=%lld-%s", 536 range_offset, 537 range_length < 0 538 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); 539 } 540 status_t err = mHTTPDataSource->connect(url, &headers); 541 542 if (err != OK) { 543 return err; 544 } 545 546 source = mHTTPDataSource; 547 } 548 549 off64_t size; 550 status_t err = source->getSize(&size); 551 552 if (err != OK) { 553 size = 65536; 554 } 555 556 sp<ABuffer> buffer = new ABuffer(size); 557 buffer->setRange(0, 0); 558 559 for (;;) { 560 size_t bufferRemaining = buffer->capacity() - buffer->size(); 561 562 if (bufferRemaining == 0) { 563 bufferRemaining = 32768; 564 565 ALOGV("increasing download buffer to %d bytes", 566 buffer->size() + bufferRemaining); 567 568 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 569 memcpy(copy->data(), buffer->data(), buffer->size()); 570 copy->setRange(0, buffer->size()); 571 572 buffer = copy; 573 } 574 575 size_t maxBytesToRead = bufferRemaining; 576 if (range_length >= 0) { 577 int64_t bytesLeftInRange = range_length - buffer->size(); 578 if (bytesLeftInRange < maxBytesToRead) { 579 maxBytesToRead = bytesLeftInRange; 580 581 if (bytesLeftInRange == 0) { 582 break; 583 } 584 } 585 } 586 587 ssize_t n = source->readAt( 588 buffer->size(), buffer->data() + buffer->size(), 589 maxBytesToRead); 590 591 if (n < 0) { 592 return n; 593 } 594 595 if (n == 0) { 596 break; 597 } 598 599 buffer->setRange(0, buffer->size() + (size_t)n); 600 } 601 602 *out = buffer; 603 if (actualUrl != NULL) { 604 *actualUrl = source->getUri(); 605 if (actualUrl->isEmpty()) { 606 *actualUrl = url; 607 } 608 } 609 610 return OK; 611} 612 613sp<M3UParser> LiveSession::fetchPlaylist( 614 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 615 ALOGV("fetchPlaylist '%s'", url); 616 617 *unchanged = false; 618 619 sp<ABuffer> buffer; 620 String8 actualUrl; 621 status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl); 622 623 if (err != OK) { 624 return NULL; 625 } 626 627 // MD5 functionality is not available on the simulator, treat all 628 // playlists as changed. 629 630#if defined(HAVE_ANDROID_OS) 631 uint8_t hash[16]; 632 633 MD5_CTX m; 634 MD5_Init(&m); 635 MD5_Update(&m, buffer->data(), buffer->size()); 636 637 MD5_Final(hash, &m); 638 639 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 640 // playlist unchanged 641 *unchanged = true; 642 643 return NULL; 644 } 645 646 if (curPlaylistHash != NULL) { 647 memcpy(curPlaylistHash, hash, sizeof(hash)); 648 } 649#endif 650 651 sp<M3UParser> playlist = 652 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 653 654 if (playlist->initCheck() != OK) { 655 ALOGE("failed to parse .m3u8 playlist"); 656 657 return NULL; 658 } 659 660 return playlist; 661} 662 663static double uniformRand() { 664 return (double)rand() / RAND_MAX; 665} 666 667size_t LiveSession::getBandwidthIndex() { 668 if (mBandwidthItems.size() == 0) { 669 return 0; 670 } 671 672#if 1 673 char value[PROPERTY_VALUE_MAX]; 674 ssize_t index = -1; 675 if (property_get("media.httplive.bw-index", value, NULL)) { 676 char *end; 677 index = strtol(value, &end, 10); 678 CHECK(end > value && *end == '\0'); 679 680 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 681 index = mBandwidthItems.size() - 1; 682 } 683 } 684 685 if (index < 0) { 686 int32_t bandwidthBps; 687 if (mHTTPDataSource != NULL 688 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 689 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 690 } else { 691 ALOGV("no bandwidth estimate."); 692 return 0; // Pick the lowest bandwidth stream by default. 693 } 694 695 char value[PROPERTY_VALUE_MAX]; 696 if (property_get("media.httplive.max-bw", value, NULL)) { 697 char *end; 698 long maxBw = strtoul(value, &end, 10); 699 if (end > value && *end == '\0') { 700 if (maxBw > 0 && bandwidthBps > maxBw) { 701 ALOGV("bandwidth capped to %ld bps", maxBw); 702 bandwidthBps = maxBw; 703 } 704 } 705 } 706 707 // Consider only 80% of the available bandwidth usable. 708 bandwidthBps = (bandwidthBps * 8) / 10; 709 710 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 711 712 index = mBandwidthItems.size() - 1; 713 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 714 > (size_t)bandwidthBps) { 715 --index; 716 } 717 } 718#elif 0 719 // Change bandwidth at random() 720 size_t index = uniformRand() * mBandwidthItems.size(); 721#elif 0 722 // There's a 50% chance to stay on the current bandwidth and 723 // a 50% chance to switch to the next higher bandwidth (wrapping around 724 // to lowest) 725 const size_t kMinIndex = 0; 726 727 static ssize_t mPrevBandwidthIndex = -1; 728 729 size_t index; 730 if (mPrevBandwidthIndex < 0) { 731 index = kMinIndex; 732 } else if (uniformRand() < 0.5) { 733 index = (size_t)mPrevBandwidthIndex; 734 } else { 735 index = mPrevBandwidthIndex + 1; 736 if (index == mBandwidthItems.size()) { 737 index = kMinIndex; 738 } 739 } 740 mPrevBandwidthIndex = index; 741#elif 0 742 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 743 744 size_t index = mBandwidthItems.size() - 1; 745 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 746 --index; 747 } 748#elif 1 749 char value[PROPERTY_VALUE_MAX]; 750 size_t index; 751 if (property_get("media.httplive.bw-index", value, NULL)) { 752 char *end; 753 index = strtoul(value, &end, 10); 754 CHECK(end > value && *end == '\0'); 755 756 if (index >= mBandwidthItems.size()) { 757 index = mBandwidthItems.size() - 1; 758 } 759 } else { 760 index = 0; 761 } 762#else 763 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 764#endif 765 766 CHECK_GE(index, 0); 767 768 return index; 769} 770 771status_t LiveSession::onSeek(const sp<AMessage> &msg) { 772 int64_t timeUs; 773 CHECK(msg->findInt64("timeUs", &timeUs)); 774 775 if (!mReconfigurationInProgress) { 776 changeConfiguration(timeUs, getBandwidthIndex()); 777 } 778 779 return OK; 780} 781 782status_t LiveSession::getDuration(int64_t *durationUs) const { 783 int64_t maxDurationUs = 0ll; 784 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 785 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 786 787 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 788 maxDurationUs = fetcherDurationUs; 789 } 790 } 791 792 *durationUs = maxDurationUs; 793 794 return OK; 795} 796 797bool LiveSession::isSeekable() const { 798 int64_t durationUs; 799 return getDuration(&durationUs) == OK && durationUs >= 0; 800} 801 802bool LiveSession::hasDynamicDuration() const { 803 return false; 804} 805 806status_t LiveSession::getTrackInfo(Parcel *reply) const { 807 return mPlaylist->getTrackInfo(reply); 808} 809 810status_t LiveSession::selectTrack(size_t index, bool select) { 811 status_t err = mPlaylist->selectTrack(index, select); 812 if (err == OK) { 813 (new AMessage(kWhatChangeConfiguration, id()))->post(); 814 } 815 return err; 816} 817 818void LiveSession::changeConfiguration( 819 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 820 CHECK(!mReconfigurationInProgress); 821 mReconfigurationInProgress = true; 822 823 mPrevBandwidthIndex = bandwidthIndex; 824 825 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 826 timeUs, bandwidthIndex, pickTrack); 827 828 if (pickTrack) { 829 mPlaylist->pickRandomMediaItems(); 830 } 831 832 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 833 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 834 835 uint32_t streamMask = 0; 836 837 AString audioURI; 838 if (mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) { 839 streamMask |= STREAMTYPE_AUDIO; 840 } 841 842 AString videoURI; 843 if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) { 844 streamMask |= STREAMTYPE_VIDEO; 845 } 846 847 AString subtitleURI; 848 if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) { 849 streamMask |= STREAMTYPE_SUBTITLES; 850 } 851 852 // Step 1, stop and discard fetchers that are no longer needed. 853 // Pause those that we'll reuse. 854 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 855 const AString &uri = mFetcherInfos.keyAt(i); 856 857 bool discardFetcher = true; 858 859 // If we're seeking all current fetchers are discarded. 860 if (timeUs < 0ll) { 861 if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) 862 || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) 863 || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) { 864 discardFetcher = false; 865 } 866 } 867 868 if (discardFetcher) { 869 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 870 } else { 871 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 872 } 873 } 874 875 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); 876 msg->setInt32("streamMask", streamMask); 877 msg->setInt64("timeUs", timeUs); 878 if (streamMask & STREAMTYPE_AUDIO) { 879 msg->setString("audioURI", audioURI.c_str()); 880 } 881 if (streamMask & STREAMTYPE_VIDEO) { 882 msg->setString("videoURI", videoURI.c_str()); 883 } 884 if (streamMask & STREAMTYPE_SUBTITLES) { 885 msg->setString("subtitleURI", subtitleURI.c_str()); 886 } 887 888 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 889 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 890 // fetchers have completed their asynchronous operation, we'll post 891 // mContinuation, which then is handled below in onChangeConfiguration2. 892 mContinuationCounter = mFetcherInfos.size(); 893 mContinuation = msg; 894 895 if (mContinuationCounter == 0) { 896 msg->post(); 897 } 898} 899 900void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 901 if (!mReconfigurationInProgress) { 902 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 903 } else { 904 msg->post(1000000ll); // retry in 1 sec 905 } 906} 907 908void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 909 mContinuation.clear(); 910 911 // All fetchers are either suspended or have been removed now. 912 913 uint32_t streamMask; 914 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 915 916 AString audioURI, videoURI, subtitleURI; 917 if (streamMask & STREAMTYPE_AUDIO) { 918 CHECK(msg->findString("audioURI", &audioURI)); 919 ALOGV("audioURI = '%s'", audioURI.c_str()); 920 } 921 if (streamMask & STREAMTYPE_VIDEO) { 922 CHECK(msg->findString("videoURI", &videoURI)); 923 ALOGV("videoURI = '%s'", videoURI.c_str()); 924 } 925 if (streamMask & STREAMTYPE_SUBTITLES) { 926 CHECK(msg->findString("subtitleURI", &subtitleURI)); 927 ALOGV("subtitleURI = '%s'", subtitleURI.c_str()); 928 } 929 930 // Determine which decoders to shutdown on the player side, 931 // a decoder has to be shutdown if either 932 // 1) its streamtype was active before but now longer isn't. 933 // or 934 // 2) its streamtype was already active and still is but the URI 935 // has changed. 936 uint32_t changedMask = 0; 937 if (((mStreamMask & streamMask & STREAMTYPE_AUDIO) 938 && !(audioURI == mAudioURI)) 939 || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) { 940 changedMask |= STREAMTYPE_AUDIO; 941 } 942 if (((mStreamMask & streamMask & STREAMTYPE_VIDEO) 943 && !(videoURI == mVideoURI)) 944 || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) { 945 changedMask |= STREAMTYPE_VIDEO; 946 } 947 948 if (changedMask == 0) { 949 // If nothing changed as far as the audio/video decoders 950 // are concerned we can proceed. 951 onChangeConfiguration3(msg); 952 return; 953 } 954 955 // Something changed, inform the player which will shutdown the 956 // corresponding decoders and will post the reply once that's done. 957 // Handling the reply will continue executing below in 958 // onChangeConfiguration3. 959 sp<AMessage> notify = mNotify->dup(); 960 notify->setInt32("what", kWhatStreamsChanged); 961 notify->setInt32("changedMask", changedMask); 962 963 msg->setWhat(kWhatChangeConfiguration3); 964 msg->setTarget(id()); 965 966 notify->setMessage("reply", msg); 967 notify->post(); 968} 969 970void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 971 // All remaining fetchers are still suspended, the player has shutdown 972 // any decoders that needed it. 973 974 uint32_t streamMask; 975 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 976 977 AString audioURI, videoURI, subtitleURI; 978 if (streamMask & STREAMTYPE_AUDIO) { 979 CHECK(msg->findString("audioURI", &audioURI)); 980 } 981 if (streamMask & STREAMTYPE_VIDEO) { 982 CHECK(msg->findString("videoURI", &videoURI)); 983 } 984 if (streamMask & STREAMTYPE_SUBTITLES) { 985 CHECK(msg->findString("subtitleURI", &subtitleURI)); 986 } 987 988 int64_t timeUs; 989 CHECK(msg->findInt64("timeUs", &timeUs)); 990 991 if (timeUs < 0ll) { 992 timeUs = mLastDequeuedTimeUs; 993 } 994 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 995 996 mStreamMask = streamMask; 997 mAudioURI = audioURI; 998 mVideoURI = videoURI; 999 mSubtitleURI = subtitleURI; 1000 1001 // Resume all existing fetchers and assign them packet sources. 1002 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1003 const AString &uri = mFetcherInfos.keyAt(i); 1004 1005 uint32_t resumeMask = 0; 1006 1007 sp<AnotherPacketSource> audioSource; 1008 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1009 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1010 resumeMask |= STREAMTYPE_AUDIO; 1011 } 1012 1013 sp<AnotherPacketSource> videoSource; 1014 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1015 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1016 resumeMask |= STREAMTYPE_VIDEO; 1017 } 1018 1019 sp<AnotherPacketSource> subtitleSource; 1020 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1021 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1022 resumeMask |= STREAMTYPE_SUBTITLES; 1023 } 1024 1025 CHECK_NE(resumeMask, 0u); 1026 1027 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1028 1029 streamMask &= ~resumeMask; 1030 1031 mFetcherInfos.valueAt(i).mFetcher->startAsync( 1032 audioSource, videoSource, subtitleSource); 1033 } 1034 1035 // streamMask now only contains the types that need a new fetcher created. 1036 1037 if (streamMask != 0) { 1038 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1039 } 1040 1041 while (streamMask != 0) { 1042 StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1)); 1043 1044 AString uri; 1045 switch (streamType) { 1046 case STREAMTYPE_AUDIO: 1047 uri = audioURI; 1048 break; 1049 case STREAMTYPE_VIDEO: 1050 uri = videoURI; 1051 break; 1052 case STREAMTYPE_SUBTITLES: 1053 uri = subtitleURI; 1054 break; 1055 default: 1056 TRESPASS(); 1057 } 1058 1059 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1060 CHECK(fetcher != NULL); 1061 1062 sp<AnotherPacketSource> audioSource; 1063 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1064 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1065 audioSource->clear(); 1066 1067 streamMask &= ~STREAMTYPE_AUDIO; 1068 } 1069 1070 sp<AnotherPacketSource> videoSource; 1071 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1072 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1073 videoSource->clear(); 1074 1075 streamMask &= ~STREAMTYPE_VIDEO; 1076 } 1077 1078 sp<AnotherPacketSource> subtitleSource; 1079 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1080 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1081 subtitleSource->clear(); 1082 1083 streamMask &= ~STREAMTYPE_SUBTITLES; 1084 } 1085 1086 fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs); 1087 } 1088 1089 // All fetchers have now been started, the configuration change 1090 // has completed. 1091 1092 scheduleCheckBandwidthEvent(); 1093 1094 ALOGV("XXX configuration change completed."); 1095 1096 mReconfigurationInProgress = false; 1097 1098 if (mDisconnectReplyID != 0) { 1099 finishDisconnect(); 1100 } 1101} 1102 1103void LiveSession::scheduleCheckBandwidthEvent() { 1104 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1105 msg->setInt32("generation", mCheckBandwidthGeneration); 1106 msg->post(10000000ll); 1107} 1108 1109void LiveSession::cancelCheckBandwidthEvent() { 1110 ++mCheckBandwidthGeneration; 1111} 1112 1113void LiveSession::onCheckBandwidth() { 1114 if (mReconfigurationInProgress) { 1115 scheduleCheckBandwidthEvent(); 1116 return; 1117 } 1118 1119 size_t bandwidthIndex = getBandwidthIndex(); 1120 if (mPrevBandwidthIndex < 0 1121 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { 1122 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1123 } 1124 1125 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1126 // schedule another one on return, only an explicit call to 1127 // scheduleCheckBandwidthEvent will do that. 1128 // This ensures that only one configuration change is ongoing at any 1129 // one time, once that completes it'll schedule another check bandwidth 1130 // event. 1131} 1132 1133void LiveSession::postPrepared(status_t err) { 1134 CHECK(mInPreparationPhase); 1135 1136 sp<AMessage> notify = mNotify->dup(); 1137 if (err == OK || err == ERROR_END_OF_STREAM) { 1138 notify->setInt32("what", kWhatPrepared); 1139 } else { 1140 notify->setInt32("what", kWhatPreparationFailed); 1141 notify->setInt32("err", err); 1142 } 1143 1144 notify->post(); 1145 1146 mInPreparationPhase = false; 1147} 1148 1149} // namespace android 1150 1151