// LiveSession.cpp revision f6d0c1fd6d9e697bb3a891fae14c7e9d4b685de6
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <ctype.h> 41#include <inttypes.h> 42#include <openssl/aes.h> 43#include <openssl/md5.h> 44 45namespace android { 46 47LiveSession::LiveSession( 48 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 49 : mNotify(notify), 50 mFlags(flags), 51 mUIDValid(uidValid), 52 mUID(uid), 53 mInPreparationPhase(true), 54 mHTTPDataSource( 55 HTTPBase::Create( 56 (mFlags & kFlagIncognito) 57 ? 
HTTPBase::kFlagIncognito 58 : 0)), 59 mPrevBandwidthIndex(-1), 60 mStreamMask(0), 61 mCheckBandwidthGeneration(0), 62 mLastDequeuedTimeUs(0ll), 63 mRealTimeBaseUs(0ll), 64 mReconfigurationInProgress(false), 65 mDisconnectReplyID(0) { 66 if (mUIDValid) { 67 mHTTPDataSource->setUID(mUID); 68 } 69 70 mPacketSources.add( 71 STREAMTYPE_AUDIO, new AnotherPacketSource(NULL /* meta */)); 72 73 mPacketSources.add( 74 STREAMTYPE_VIDEO, new AnotherPacketSource(NULL /* meta */)); 75 76 mPacketSources.add( 77 STREAMTYPE_SUBTITLES, new AnotherPacketSource(NULL /* meta */)); 78} 79 80LiveSession::~LiveSession() { 81} 82 83status_t LiveSession::dequeueAccessUnit( 84 StreamType stream, sp<ABuffer> *accessUnit) { 85 if (!(mStreamMask & stream)) { 86 return UNKNOWN_ERROR; 87 } 88 89 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 90 91 status_t finalResult; 92 if (!packetSource->hasBufferAvailable(&finalResult)) { 93 return finalResult == OK ? -EAGAIN : finalResult; 94 } 95 96 status_t err = packetSource->dequeueAccessUnit(accessUnit); 97 98 const char *streamStr; 99 switch (stream) { 100 case STREAMTYPE_AUDIO: 101 streamStr = "audio"; 102 break; 103 case STREAMTYPE_VIDEO: 104 streamStr = "video"; 105 break; 106 case STREAMTYPE_SUBTITLES: 107 streamStr = "subs"; 108 break; 109 default: 110 TRESPASS(); 111 } 112 113 if (err == INFO_DISCONTINUITY) { 114 int32_t type; 115 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 116 117 sp<AMessage> extra; 118 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 119 extra.clear(); 120 } 121 122 ALOGI("[%s] read discontinuity of type %d, extra = %s", 123 streamStr, 124 type, 125 extra == NULL ? 
"NULL" : extra->debugString().c_str()); 126 } else if (err == OK) { 127 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 128 int64_t timeUs; 129 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 130 ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs); 131 132 mLastDequeuedTimeUs = timeUs; 133 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 134 } else if (stream == STREAMTYPE_SUBTITLES) { 135 (*accessUnit)->meta()->setInt32( 136 "trackIndex", mPlaylist->getSelectedIndex()); 137 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 138 } 139 } else { 140 ALOGI("[%s] encountered error %d", streamStr, err); 141 } 142 143 return err; 144} 145 146status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 147 if (!(mStreamMask & stream)) { 148 return UNKNOWN_ERROR; 149 } 150 151 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 152 153 sp<MetaData> meta = packetSource->getFormat(); 154 155 if (meta == NULL) { 156 return -EAGAIN; 157 } 158 159 return convertMetaDataToMessage(meta, format); 160} 161 162void LiveSession::connectAsync( 163 const char *url, const KeyedVector<String8, String8> *headers) { 164 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 165 msg->setString("url", url); 166 167 if (headers != NULL) { 168 msg->setPointer( 169 "headers", 170 new KeyedVector<String8, String8>(*headers)); 171 } 172 173 msg->post(); 174} 175 176status_t LiveSession::disconnect() { 177 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 178 179 sp<AMessage> response; 180 status_t err = msg->postAndAwaitResponse(&response); 181 182 return err; 183} 184 185status_t LiveSession::seekTo(int64_t timeUs) { 186 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 187 msg->setInt64("timeUs", timeUs); 188 189 sp<AMessage> response; 190 status_t err = msg->postAndAwaitResponse(&response); 191 192 return err; 193} 194 195void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 196 switch 
(msg->what()) { 197 case kWhatConnect: 198 { 199 onConnect(msg); 200 break; 201 } 202 203 case kWhatDisconnect: 204 { 205 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 206 207 if (mReconfigurationInProgress) { 208 break; 209 } 210 211 finishDisconnect(); 212 break; 213 } 214 215 case kWhatSeek: 216 { 217 uint32_t replyID; 218 CHECK(msg->senderAwaitsResponse(&replyID)); 219 220 status_t err = onSeek(msg); 221 222 sp<AMessage> response = new AMessage; 223 response->setInt32("err", err); 224 225 response->postReply(replyID); 226 break; 227 } 228 229 case kWhatFetcherNotify: 230 { 231 int32_t what; 232 CHECK(msg->findInt32("what", &what)); 233 234 switch (what) { 235 case PlaylistFetcher::kWhatStarted: 236 break; 237 case PlaylistFetcher::kWhatPaused: 238 case PlaylistFetcher::kWhatStopped: 239 { 240 if (what == PlaylistFetcher::kWhatStopped) { 241 AString uri; 242 CHECK(msg->findString("uri", &uri)); 243 mFetcherInfos.removeItem(uri); 244 } 245 246 if (mContinuation != NULL) { 247 CHECK_GT(mContinuationCounter, 0); 248 if (--mContinuationCounter == 0) { 249 mContinuation->post(); 250 } 251 } 252 break; 253 } 254 255 case PlaylistFetcher::kWhatDurationUpdate: 256 { 257 AString uri; 258 CHECK(msg->findString("uri", &uri)); 259 260 int64_t durationUs; 261 CHECK(msg->findInt64("durationUs", &durationUs)); 262 263 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 264 info->mDurationUs = durationUs; 265 break; 266 } 267 268 case PlaylistFetcher::kWhatError: 269 { 270 status_t err; 271 CHECK(msg->findInt32("err", &err)); 272 273 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 274 275 if (mInPreparationPhase) { 276 postPrepared(err); 277 } 278 279 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 280 281 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 282 283 mPacketSources.valueFor( 284 STREAMTYPE_SUBTITLES)->signalEOS(err); 285 286 sp<AMessage> notify = mNotify->dup(); 287 notify->setInt32("what", kWhatError); 288 
notify->setInt32("err", err); 289 notify->post(); 290 break; 291 } 292 293 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 294 { 295 AString uri; 296 CHECK(msg->findString("uri", &uri)); 297 298 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 299 info->mIsPrepared = true; 300 301 if (mInPreparationPhase) { 302 bool allFetchersPrepared = true; 303 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 304 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 305 allFetchersPrepared = false; 306 break; 307 } 308 } 309 310 if (allFetchersPrepared) { 311 postPrepared(OK); 312 } 313 } 314 break; 315 } 316 317 default: 318 TRESPASS(); 319 } 320 321 break; 322 } 323 324 case kWhatCheckBandwidth: 325 { 326 int32_t generation; 327 CHECK(msg->findInt32("generation", &generation)); 328 329 if (generation != mCheckBandwidthGeneration) { 330 break; 331 } 332 333 onCheckBandwidth(); 334 break; 335 } 336 337 case kWhatChangeConfiguration: 338 { 339 onChangeConfiguration(msg); 340 break; 341 } 342 343 case kWhatChangeConfiguration2: 344 { 345 onChangeConfiguration2(msg); 346 break; 347 } 348 349 case kWhatChangeConfiguration3: 350 { 351 onChangeConfiguration3(msg); 352 break; 353 } 354 355 case kWhatFinishDisconnect2: 356 { 357 onFinishDisconnect2(); 358 break; 359 } 360 361 default: 362 TRESPASS(); 363 break; 364 } 365} 366 367// static 368int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 369 if (a->mBandwidth < b->mBandwidth) { 370 return -1; 371 } else if (a->mBandwidth == b->mBandwidth) { 372 return 0; 373 } 374 375 return 1; 376} 377 378void LiveSession::onConnect(const sp<AMessage> &msg) { 379 AString url; 380 CHECK(msg->findString("url", &url)); 381 382 KeyedVector<String8, String8> *headers = NULL; 383 if (!msg->findPointer("headers", (void **)&headers)) { 384 mExtraHeaders.clear(); 385 } else { 386 mExtraHeaders = *headers; 387 388 delete headers; 389 headers = NULL; 390 } 391 392#if 1 393 ALOGI("onConnect <URL suppressed>"); 394#else 395 
ALOGI("onConnect %s", url.c_str()); 396#endif 397 398 mMasterURL = url; 399 400 bool dummy; 401 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 402 403 if (mPlaylist == NULL) { 404 ALOGE("unable to fetch master playlist <URL suppressed>."); 405 406 postPrepared(ERROR_IO); 407 return; 408 } 409 410 // We trust the content provider to make a reasonable choice of preferred 411 // initial bandwidth by listing it first in the variant playlist. 412 // At startup we really don't have a good estimate on the available 413 // network bandwidth since we haven't tranferred any data yet. Once 414 // we have we can make a better informed choice. 415 size_t initialBandwidth = 0; 416 size_t initialBandwidthIndex = 0; 417 418 if (mPlaylist->isVariantPlaylist()) { 419 for (size_t i = 0; i < mPlaylist->size(); ++i) { 420 BandwidthItem item; 421 422 item.mPlaylistIndex = i; 423 424 sp<AMessage> meta; 425 AString uri; 426 mPlaylist->itemAt(i, &uri, &meta); 427 428 unsigned long bandwidth; 429 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 430 431 if (initialBandwidth == 0) { 432 initialBandwidth = item.mBandwidth; 433 } 434 435 mBandwidthItems.push(item); 436 } 437 438 CHECK_GT(mBandwidthItems.size(), 0u); 439 440 mBandwidthItems.sort(SortByBandwidth); 441 442 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 443 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 444 initialBandwidthIndex = i; 445 break; 446 } 447 } 448 } else { 449 // dummy item. 450 BandwidthItem item; 451 item.mPlaylistIndex = 0; 452 item.mBandwidth = 0; 453 mBandwidthItems.push(item); 454 } 455 456 changeConfiguration( 457 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 458} 459 460void LiveSession::finishDisconnect() { 461 // No reconfiguration is currently pending, make sure none will trigger 462 // during disconnection either. 
463 cancelCheckBandwidthEvent(); 464 465 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 466 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 467 } 468 469 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 470 471 mContinuationCounter = mFetcherInfos.size(); 472 mContinuation = msg; 473 474 if (mContinuationCounter == 0) { 475 msg->post(); 476 } 477} 478 479void LiveSession::onFinishDisconnect2() { 480 mContinuation.clear(); 481 482 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 483 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 484 485 mPacketSources.valueFor( 486 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 487 488 sp<AMessage> response = new AMessage; 489 response->setInt32("err", OK); 490 491 response->postReply(mDisconnectReplyID); 492 mDisconnectReplyID = 0; 493} 494 495sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 496 ssize_t index = mFetcherInfos.indexOfKey(uri); 497 498 if (index >= 0) { 499 return NULL; 500 } 501 502 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 503 notify->setString("uri", uri); 504 505 FetcherInfo info; 506 info.mFetcher = new PlaylistFetcher(notify, this, uri); 507 info.mDurationUs = -1ll; 508 info.mIsPrepared = false; 509 looper()->registerHandler(info.mFetcher); 510 511 mFetcherInfos.add(uri, info); 512 513 return info.mFetcher; 514} 515 516status_t LiveSession::fetchFile( 517 const char *url, sp<ABuffer> *out, 518 int64_t range_offset, int64_t range_length, 519 String8 *actualUrl) { 520 *out = NULL; 521 522 sp<DataSource> source; 523 524 if (!strncasecmp(url, "file://", 7)) { 525 source = new FileSource(url + 7); 526 } else if (strncasecmp(url, "http://", 7) 527 && strncasecmp(url, "https://", 8)) { 528 return ERROR_UNSUPPORTED; 529 } else { 530 KeyedVector<String8, String8> headers = mExtraHeaders; 531 if (range_offset > 0 || range_length >= 0) { 532 headers.add( 533 String8("Range"), 534 String8( 535 StringPrintf( 536 
"bytes=%lld-%s", 537 range_offset, 538 range_length < 0 539 ? "" : StringPrintf("%lld", range_offset + range_length - 1).c_str()).c_str())); 540 } 541 status_t err = mHTTPDataSource->connect(url, &headers); 542 543 if (err != OK) { 544 return err; 545 } 546 547 source = mHTTPDataSource; 548 } 549 550 off64_t size; 551 status_t err = source->getSize(&size); 552 553 if (err != OK) { 554 size = 65536; 555 } 556 557 sp<ABuffer> buffer = new ABuffer(size); 558 buffer->setRange(0, 0); 559 560 for (;;) { 561 size_t bufferRemaining = buffer->capacity() - buffer->size(); 562 563 if (bufferRemaining == 0) { 564 bufferRemaining = 32768; 565 566 ALOGV("increasing download buffer to %zu bytes", 567 buffer->size() + bufferRemaining); 568 569 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 570 memcpy(copy->data(), buffer->data(), buffer->size()); 571 copy->setRange(0, buffer->size()); 572 573 buffer = copy; 574 } 575 576 size_t maxBytesToRead = bufferRemaining; 577 if (range_length >= 0) { 578 int64_t bytesLeftInRange = range_length - buffer->size(); 579 if (bytesLeftInRange < (int64_t)maxBytesToRead) { 580 maxBytesToRead = bytesLeftInRange; 581 582 if (bytesLeftInRange == 0) { 583 break; 584 } 585 } 586 } 587 588 ssize_t n = source->readAt( 589 buffer->size(), buffer->data() + buffer->size(), 590 maxBytesToRead); 591 592 if (n < 0) { 593 return n; 594 } 595 596 if (n == 0) { 597 break; 598 } 599 600 buffer->setRange(0, buffer->size() + (size_t)n); 601 } 602 603 *out = buffer; 604 if (actualUrl != NULL) { 605 *actualUrl = source->getUri(); 606 if (actualUrl->isEmpty()) { 607 *actualUrl = url; 608 } 609 } 610 611 return OK; 612} 613 614sp<M3UParser> LiveSession::fetchPlaylist( 615 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 616 ALOGV("fetchPlaylist '%s'", url); 617 618 *unchanged = false; 619 620 sp<ABuffer> buffer; 621 String8 actualUrl; 622 status_t err = fetchFile(url, &buffer, 0, -1, &actualUrl); 623 624 if (err != OK) { 625 return NULL; 626 
} 627 628 // MD5 functionality is not available on the simulator, treat all 629 // playlists as changed. 630 631#if defined(HAVE_ANDROID_OS) 632 uint8_t hash[16]; 633 634 MD5_CTX m; 635 MD5_Init(&m); 636 MD5_Update(&m, buffer->data(), buffer->size()); 637 638 MD5_Final(hash, &m); 639 640 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 641 // playlist unchanged 642 *unchanged = true; 643 644 return NULL; 645 } 646 647 if (curPlaylistHash != NULL) { 648 memcpy(curPlaylistHash, hash, sizeof(hash)); 649 } 650#endif 651 652 sp<M3UParser> playlist = 653 new M3UParser(actualUrl.string(), buffer->data(), buffer->size()); 654 655 if (playlist->initCheck() != OK) { 656 ALOGE("failed to parse .m3u8 playlist"); 657 658 return NULL; 659 } 660 661 return playlist; 662} 663 664static double uniformRand() { 665 return (double)rand() / RAND_MAX; 666} 667 668size_t LiveSession::getBandwidthIndex() { 669 if (mBandwidthItems.size() == 0) { 670 return 0; 671 } 672 673#if 1 674 char value[PROPERTY_VALUE_MAX]; 675 ssize_t index = -1; 676 if (property_get("media.httplive.bw-index", value, NULL)) { 677 char *end; 678 index = strtol(value, &end, 10); 679 CHECK(end > value && *end == '\0'); 680 681 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 682 index = mBandwidthItems.size() - 1; 683 } 684 } 685 686 if (index < 0) { 687 int32_t bandwidthBps; 688 if (mHTTPDataSource != NULL 689 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 690 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 691 } else { 692 ALOGV("no bandwidth estimate."); 693 return 0; // Pick the lowest bandwidth stream by default. 
694 } 695 696 char value[PROPERTY_VALUE_MAX]; 697 if (property_get("media.httplive.max-bw", value, NULL)) { 698 char *end; 699 long maxBw = strtoul(value, &end, 10); 700 if (end > value && *end == '\0') { 701 if (maxBw > 0 && bandwidthBps > maxBw) { 702 ALOGV("bandwidth capped to %ld bps", maxBw); 703 bandwidthBps = maxBw; 704 } 705 } 706 } 707 708 // Consider only 80% of the available bandwidth usable. 709 bandwidthBps = (bandwidthBps * 8) / 10; 710 711 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 712 713 index = mBandwidthItems.size() - 1; 714 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 715 > (size_t)bandwidthBps) { 716 --index; 717 } 718 } 719#elif 0 720 // Change bandwidth at random() 721 size_t index = uniformRand() * mBandwidthItems.size(); 722#elif 0 723 // There's a 50% chance to stay on the current bandwidth and 724 // a 50% chance to switch to the next higher bandwidth (wrapping around 725 // to lowest) 726 const size_t kMinIndex = 0; 727 728 static ssize_t mPrevBandwidthIndex = -1; 729 730 size_t index; 731 if (mPrevBandwidthIndex < 0) { 732 index = kMinIndex; 733 } else if (uniformRand() < 0.5) { 734 index = (size_t)mPrevBandwidthIndex; 735 } else { 736 index = mPrevBandwidthIndex + 1; 737 if (index == mBandwidthItems.size()) { 738 index = kMinIndex; 739 } 740 } 741 mPrevBandwidthIndex = index; 742#elif 0 743 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 744 745 size_t index = mBandwidthItems.size() - 1; 746 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 747 --index; 748 } 749#elif 1 750 char value[PROPERTY_VALUE_MAX]; 751 size_t index; 752 if (property_get("media.httplive.bw-index", value, NULL)) { 753 char *end; 754 index = strtoul(value, &end, 10); 755 CHECK(end > value && *end == '\0'); 756 757 if (index >= mBandwidthItems.size()) { 758 index = mBandwidthItems.size() - 1; 759 } 760 } else { 761 index = 0; 762 } 763#else 764 size_t index = 
mBandwidthItems.size() - 1; // Highest bandwidth stream 765#endif 766 767 CHECK_GE(index, 0); 768 769 return index; 770} 771 772status_t LiveSession::onSeek(const sp<AMessage> &msg) { 773 int64_t timeUs; 774 CHECK(msg->findInt64("timeUs", &timeUs)); 775 776 if (!mReconfigurationInProgress) { 777 changeConfiguration(timeUs, getBandwidthIndex()); 778 } 779 780 return OK; 781} 782 783status_t LiveSession::getDuration(int64_t *durationUs) const { 784 int64_t maxDurationUs = 0ll; 785 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 786 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 787 788 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 789 maxDurationUs = fetcherDurationUs; 790 } 791 } 792 793 *durationUs = maxDurationUs; 794 795 return OK; 796} 797 798bool LiveSession::isSeekable() const { 799 int64_t durationUs; 800 return getDuration(&durationUs) == OK && durationUs >= 0; 801} 802 803bool LiveSession::hasDynamicDuration() const { 804 return false; 805} 806 807status_t LiveSession::getTrackInfo(Parcel *reply) const { 808 return mPlaylist->getTrackInfo(reply); 809} 810 811status_t LiveSession::selectTrack(size_t index, bool select) { 812 status_t err = mPlaylist->selectTrack(index, select); 813 if (err == OK) { 814 (new AMessage(kWhatChangeConfiguration, id()))->post(); 815 } 816 return err; 817} 818 819void LiveSession::changeConfiguration( 820 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 821 CHECK(!mReconfigurationInProgress); 822 mReconfigurationInProgress = true; 823 824 mPrevBandwidthIndex = bandwidthIndex; 825 826 ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d", 827 timeUs, bandwidthIndex, pickTrack); 828 829 if (pickTrack) { 830 mPlaylist->pickRandomMediaItems(); 831 } 832 833 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 834 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 835 836 uint32_t streamMask = 0; 837 838 AString audioURI; 839 if 
(mPlaylist->getAudioURI(item.mPlaylistIndex, &audioURI)) { 840 streamMask |= STREAMTYPE_AUDIO; 841 } 842 843 AString videoURI; 844 if (mPlaylist->getVideoURI(item.mPlaylistIndex, &videoURI)) { 845 streamMask |= STREAMTYPE_VIDEO; 846 } 847 848 AString subtitleURI; 849 if (mPlaylist->getSubtitleURI(item.mPlaylistIndex, &subtitleURI)) { 850 streamMask |= STREAMTYPE_SUBTITLES; 851 } 852 853 // Step 1, stop and discard fetchers that are no longer needed. 854 // Pause those that we'll reuse. 855 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 856 const AString &uri = mFetcherInfos.keyAt(i); 857 858 bool discardFetcher = true; 859 860 // If we're seeking all current fetchers are discarded. 861 if (timeUs < 0ll) { 862 if (((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) 863 || ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) 864 || ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI)) { 865 discardFetcher = false; 866 } 867 } 868 869 if (discardFetcher) { 870 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 871 } else { 872 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 873 } 874 } 875 876 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); 877 msg->setInt32("streamMask", streamMask); 878 msg->setInt64("timeUs", timeUs); 879 if (streamMask & STREAMTYPE_AUDIO) { 880 msg->setString("audioURI", audioURI.c_str()); 881 } 882 if (streamMask & STREAMTYPE_VIDEO) { 883 msg->setString("videoURI", videoURI.c_str()); 884 } 885 if (streamMask & STREAMTYPE_SUBTITLES) { 886 msg->setString("subtitleURI", subtitleURI.c_str()); 887 } 888 889 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 890 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 891 // fetchers have completed their asynchronous operation, we'll post 892 // mContinuation, which then is handled below in onChangeConfiguration2. 
893 mContinuationCounter = mFetcherInfos.size(); 894 mContinuation = msg; 895 896 if (mContinuationCounter == 0) { 897 msg->post(); 898 } 899} 900 901void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 902 if (!mReconfigurationInProgress) { 903 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 904 } else { 905 msg->post(1000000ll); // retry in 1 sec 906 } 907} 908 909void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 910 mContinuation.clear(); 911 912 // All fetchers are either suspended or have been removed now. 913 914 uint32_t streamMask; 915 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 916 917 AString audioURI, videoURI, subtitleURI; 918 if (streamMask & STREAMTYPE_AUDIO) { 919 CHECK(msg->findString("audioURI", &audioURI)); 920 ALOGV("audioURI = '%s'", audioURI.c_str()); 921 } 922 if (streamMask & STREAMTYPE_VIDEO) { 923 CHECK(msg->findString("videoURI", &videoURI)); 924 ALOGV("videoURI = '%s'", videoURI.c_str()); 925 } 926 if (streamMask & STREAMTYPE_SUBTITLES) { 927 CHECK(msg->findString("subtitleURI", &subtitleURI)); 928 ALOGV("subtitleURI = '%s'", subtitleURI.c_str()); 929 } 930 931 // Determine which decoders to shutdown on the player side, 932 // a decoder has to be shutdown if either 933 // 1) its streamtype was active before but now longer isn't. 934 // or 935 // 2) its streamtype was already active and still is but the URI 936 // has changed. 937 uint32_t changedMask = 0; 938 if (((mStreamMask & streamMask & STREAMTYPE_AUDIO) 939 && !(audioURI == mAudioURI)) 940 || (mStreamMask & ~streamMask & STREAMTYPE_AUDIO)) { 941 changedMask |= STREAMTYPE_AUDIO; 942 } 943 if (((mStreamMask & streamMask & STREAMTYPE_VIDEO) 944 && !(videoURI == mVideoURI)) 945 || (mStreamMask & ~streamMask & STREAMTYPE_VIDEO)) { 946 changedMask |= STREAMTYPE_VIDEO; 947 } 948 949 if (changedMask == 0) { 950 // If nothing changed as far as the audio/video decoders 951 // are concerned we can proceed. 
952 onChangeConfiguration3(msg); 953 return; 954 } 955 956 // Something changed, inform the player which will shutdown the 957 // corresponding decoders and will post the reply once that's done. 958 // Handling the reply will continue executing below in 959 // onChangeConfiguration3. 960 sp<AMessage> notify = mNotify->dup(); 961 notify->setInt32("what", kWhatStreamsChanged); 962 notify->setInt32("changedMask", changedMask); 963 964 msg->setWhat(kWhatChangeConfiguration3); 965 msg->setTarget(id()); 966 967 notify->setMessage("reply", msg); 968 notify->post(); 969} 970 971void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 972 // All remaining fetchers are still suspended, the player has shutdown 973 // any decoders that needed it. 974 975 uint32_t streamMask; 976 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 977 978 AString audioURI, videoURI, subtitleURI; 979 if (streamMask & STREAMTYPE_AUDIO) { 980 CHECK(msg->findString("audioURI", &audioURI)); 981 } 982 if (streamMask & STREAMTYPE_VIDEO) { 983 CHECK(msg->findString("videoURI", &videoURI)); 984 } 985 if (streamMask & STREAMTYPE_SUBTITLES) { 986 CHECK(msg->findString("subtitleURI", &subtitleURI)); 987 } 988 989 int64_t timeUs; 990 CHECK(msg->findInt64("timeUs", &timeUs)); 991 992 if (timeUs < 0ll) { 993 timeUs = mLastDequeuedTimeUs; 994 } 995 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 996 997 mStreamMask = streamMask; 998 mAudioURI = audioURI; 999 mVideoURI = videoURI; 1000 mSubtitleURI = subtitleURI; 1001 1002 // Resume all existing fetchers and assign them packet sources. 
1003 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1004 const AString &uri = mFetcherInfos.keyAt(i); 1005 1006 uint32_t resumeMask = 0; 1007 1008 sp<AnotherPacketSource> audioSource; 1009 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1010 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1011 resumeMask |= STREAMTYPE_AUDIO; 1012 } 1013 1014 sp<AnotherPacketSource> videoSource; 1015 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1016 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1017 resumeMask |= STREAMTYPE_VIDEO; 1018 } 1019 1020 sp<AnotherPacketSource> subtitleSource; 1021 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1022 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1023 resumeMask |= STREAMTYPE_SUBTITLES; 1024 } 1025 1026 CHECK_NE(resumeMask, 0u); 1027 1028 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1029 1030 streamMask &= ~resumeMask; 1031 1032 mFetcherInfos.valueAt(i).mFetcher->startAsync( 1033 audioSource, videoSource, subtitleSource); 1034 } 1035 1036 // streamMask now only contains the types that need a new fetcher created. 
1037 1038 if (streamMask != 0) { 1039 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1040 } 1041 1042 while (streamMask != 0) { 1043 StreamType streamType = (StreamType)(streamMask & ~(streamMask - 1)); 1044 1045 AString uri; 1046 switch (streamType) { 1047 case STREAMTYPE_AUDIO: 1048 uri = audioURI; 1049 break; 1050 case STREAMTYPE_VIDEO: 1051 uri = videoURI; 1052 break; 1053 case STREAMTYPE_SUBTITLES: 1054 uri = subtitleURI; 1055 break; 1056 default: 1057 TRESPASS(); 1058 } 1059 1060 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1061 CHECK(fetcher != NULL); 1062 1063 sp<AnotherPacketSource> audioSource; 1064 if ((streamMask & STREAMTYPE_AUDIO) && uri == audioURI) { 1065 audioSource = mPacketSources.valueFor(STREAMTYPE_AUDIO); 1066 audioSource->clear(); 1067 1068 streamMask &= ~STREAMTYPE_AUDIO; 1069 } 1070 1071 sp<AnotherPacketSource> videoSource; 1072 if ((streamMask & STREAMTYPE_VIDEO) && uri == videoURI) { 1073 videoSource = mPacketSources.valueFor(STREAMTYPE_VIDEO); 1074 videoSource->clear(); 1075 1076 streamMask &= ~STREAMTYPE_VIDEO; 1077 } 1078 1079 sp<AnotherPacketSource> subtitleSource; 1080 if ((streamMask & STREAMTYPE_SUBTITLES) && uri == subtitleURI) { 1081 subtitleSource = mPacketSources.valueFor(STREAMTYPE_SUBTITLES); 1082 subtitleSource->clear(); 1083 1084 streamMask &= ~STREAMTYPE_SUBTITLES; 1085 } 1086 1087 fetcher->startAsync(audioSource, videoSource, subtitleSource, timeUs); 1088 } 1089 1090 // All fetchers have now been started, the configuration change 1091 // has completed. 
1092 1093 scheduleCheckBandwidthEvent(); 1094 1095 ALOGV("XXX configuration change completed."); 1096 1097 mReconfigurationInProgress = false; 1098 1099 if (mDisconnectReplyID != 0) { 1100 finishDisconnect(); 1101 } 1102} 1103 1104void LiveSession::scheduleCheckBandwidthEvent() { 1105 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1106 msg->setInt32("generation", mCheckBandwidthGeneration); 1107 msg->post(10000000ll); 1108} 1109 1110void LiveSession::cancelCheckBandwidthEvent() { 1111 ++mCheckBandwidthGeneration; 1112} 1113 1114void LiveSession::onCheckBandwidth() { 1115 if (mReconfigurationInProgress) { 1116 scheduleCheckBandwidthEvent(); 1117 return; 1118 } 1119 1120 size_t bandwidthIndex = getBandwidthIndex(); 1121 if (mPrevBandwidthIndex < 0 1122 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { 1123 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1124 } 1125 1126 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1127 // schedule another one on return, only an explicit call to 1128 // scheduleCheckBandwidthEvent will do that. 1129 // This ensures that only one configuration change is ongoing at any 1130 // one time, once that completes it'll schedule another check bandwidth 1131 // event. 1132} 1133 1134void LiveSession::postPrepared(status_t err) { 1135 CHECK(mInPreparationPhase); 1136 1137 sp<AMessage> notify = mNotify->dup(); 1138 if (err == OK || err == ERROR_END_OF_STREAM) { 1139 notify->setInt32("what", kWhatPrepared); 1140 } else { 1141 notify->setInt32("what", kWhatPreparationFailed); 1142 notify->setInt32("err", err); 1143 } 1144 1145 notify->post(); 1146 1147 mInPreparationPhase = false; 1148} 1149 1150} // namespace android 1151 1152