LiveSession.cpp revision 6c8495c8f1ccc35db972ee7ac0dbb8baf5843548
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "LiveSession.h" 22 23#include "M3UParser.h" 24#include "PlaylistFetcher.h" 25 26#include "include/HTTPBase.h" 27#include "mpeg2ts/AnotherPacketSource.h" 28 29#include <cutils/properties.h> 30#include <media/stagefright/foundation/hexdump.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/foundation/AMessage.h> 34#include <media/stagefright/DataSource.h> 35#include <media/stagefright/FileSource.h> 36#include <media/stagefright/MediaErrors.h> 37#include <media/stagefright/MetaData.h> 38#include <media/stagefright/Utils.h> 39 40#include <ctype.h> 41#include <openssl/aes.h> 42#include <openssl/md5.h> 43 44namespace android { 45 46LiveSession::LiveSession( 47 const sp<AMessage> ¬ify, uint32_t flags, bool uidValid, uid_t uid) 48 : mNotify(notify), 49 mFlags(flags), 50 mUIDValid(uidValid), 51 mUID(uid), 52 mInPreparationPhase(true), 53 mHTTPDataSource( 54 HTTPBase::Create( 55 (mFlags & kFlagIncognito) 56 ? HTTPBase::kFlagIncognito 57 : 0)), 58 mPrevBandwidthIndex(-1), 59 mStreamMask(0), 60 mCheckBandwidthGeneration(0), 61 mLastDequeuedTimeUs(0ll), 62 mRealTimeBaseUs(0ll), 63 mReconfigurationInProgress(false), 64 mDisconnectReplyID(0) { 65 if (mUIDValid) { 66 mHTTPDataSource->setUID(mUID); 67 } 68 69 mStreams[kAudioIndex] = StreamItem("audio"); 70 mStreams[kVideoIndex] = StreamItem("video"); 71 mStreams[kSubtitleIndex] = StreamItem("subtitle"); 72 73 for (size_t i = 0; i < kMaxStreams; ++i) { 74 mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); 75 } 76} 77 78LiveSession::~LiveSession() { 79} 80 81status_t LiveSession::dequeueAccessUnit( 82 StreamType stream, sp<ABuffer> *accessUnit) { 83 if (!(mStreamMask & stream)) { 84 return UNKNOWN_ERROR; 85 } 86 87 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 88 89 status_t finalResult; 90 if (!packetSource->hasBufferAvailable(&finalResult)) { 91 return finalResult == OK ? -EAGAIN : finalResult; 92 } 93 94 status_t err = packetSource->dequeueAccessUnit(accessUnit); 95 96 const char *streamStr; 97 switch (stream) { 98 case STREAMTYPE_AUDIO: 99 streamStr = "audio"; 100 break; 101 case STREAMTYPE_VIDEO: 102 streamStr = "video"; 103 break; 104 case STREAMTYPE_SUBTITLES: 105 streamStr = "subs"; 106 break; 107 default: 108 TRESPASS(); 109 } 110 111 if (err == INFO_DISCONTINUITY) { 112 int32_t type; 113 CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type)); 114 115 sp<AMessage> extra; 116 if (!(*accessUnit)->meta()->findMessage("extra", &extra)) { 117 extra.clear(); 118 } 119 120 ALOGI("[%s] read discontinuity of type %d, extra = %s", 121 streamStr, 122 type, 123 extra == NULL ? "NULL" : extra->debugString().c_str()); 124 } else if (err == OK) { 125 if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { 126 int64_t timeUs; 127 CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs)); 128 ALOGV("[%s] read buffer at time %lld us", streamStr, timeUs); 129 130 mLastDequeuedTimeUs = timeUs; 131 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 132 } else if (stream == STREAMTYPE_SUBTITLES) { 133 (*accessUnit)->meta()->setInt32( 134 "trackIndex", mPlaylist->getSelectedIndex()); 135 (*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs); 136 } 137 } else { 138 ALOGI("[%s] encountered error %d", streamStr, err); 139 } 140 141 return err; 142} 143 144status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { 145 if (!(mStreamMask & stream)) { 146 return UNKNOWN_ERROR; 147 } 148 149 sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); 150 151 sp<MetaData> meta = packetSource->getFormat(); 152 153 if (meta == NULL) { 154 return -EAGAIN; 155 } 156 157 return convertMetaDataToMessage(meta, format); 158} 159 160void LiveSession::connectAsync( 161 const char *url, const KeyedVector<String8, String8> *headers) { 162 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 163 msg->setString("url", url); 164 165 if (headers != NULL) { 166 msg->setPointer( 167 "headers", 168 new KeyedVector<String8, String8>(*headers)); 169 } 170 171 msg->post(); 172} 173 174status_t LiveSession::disconnect() { 175 sp<AMessage> msg = new AMessage(kWhatDisconnect, id()); 176 177 sp<AMessage> response; 178 status_t err = msg->postAndAwaitResponse(&response); 179 180 return err; 181} 182 183status_t LiveSession::seekTo(int64_t timeUs) { 184 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 185 msg->setInt64("timeUs", timeUs); 186 187 sp<AMessage> response; 188 status_t err = msg->postAndAwaitResponse(&response); 189 190 return err; 191} 192 193void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 194 switch (msg->what()) { 195 case kWhatConnect: 196 { 197 onConnect(msg); 198 break; 199 } 200 201 case kWhatDisconnect: 202 { 203 CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID)); 204 205 if (mReconfigurationInProgress) { 206 break; 207 } 208 209 finishDisconnect(); 210 break; 211 } 212 213 case kWhatSeek: 214 { 215 uint32_t replyID; 216 CHECK(msg->senderAwaitsResponse(&replyID)); 217 218 status_t err = onSeek(msg); 219 220 sp<AMessage> response = new AMessage; 221 response->setInt32("err", err); 222 223 response->postReply(replyID); 224 break; 225 } 226 227 case kWhatFetcherNotify: 228 { 229 int32_t what; 230 CHECK(msg->findInt32("what", &what)); 231 232 switch (what) { 233 case PlaylistFetcher::kWhatStarted: 234 break; 235 case PlaylistFetcher::kWhatPaused: 236 case PlaylistFetcher::kWhatStopped: 237 { 238 if (what == PlaylistFetcher::kWhatStopped) { 239 AString uri; 240 CHECK(msg->findString("uri", &uri)); 241 mFetcherInfos.removeItem(uri); 242 } 243 244 if (mContinuation != NULL) { 245 CHECK_GT(mContinuationCounter, 0); 246 if (--mContinuationCounter == 0) { 247 mContinuation->post(); 248 } 249 } 250 break; 251 } 252 253 case PlaylistFetcher::kWhatDurationUpdate: 254 { 255 AString uri; 256 CHECK(msg->findString("uri", &uri)); 257 258 int64_t durationUs; 259 CHECK(msg->findInt64("durationUs", &durationUs)); 260 261 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 262 info->mDurationUs = durationUs; 263 break; 264 } 265 266 case PlaylistFetcher::kWhatError: 267 { 268 status_t err; 269 CHECK(msg->findInt32("err", &err)); 270 271 ALOGE("XXX Received error %d from PlaylistFetcher.", err); 272 273 if (mInPreparationPhase) { 274 postPrepared(err); 275 } 276 277 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); 278 279 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); 280 281 mPacketSources.valueFor( 282 STREAMTYPE_SUBTITLES)->signalEOS(err); 283 284 sp<AMessage> notify = mNotify->dup(); 285 notify->setInt32("what", kWhatError); 286 notify->setInt32("err", err); 287 notify->post(); 288 break; 289 } 290 291 case PlaylistFetcher::kWhatTemporarilyDoneFetching: 292 { 293 AString uri; 294 CHECK(msg->findString("uri", &uri)); 295 296 FetcherInfo *info = &mFetcherInfos.editValueFor(uri); 297 info->mIsPrepared = true; 298 299 if (mInPreparationPhase) { 300 bool allFetchersPrepared = true; 301 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 302 if (!mFetcherInfos.valueAt(i).mIsPrepared) { 303 allFetchersPrepared = false; 304 break; 305 } 306 } 307 308 if (allFetchersPrepared) { 309 postPrepared(OK); 310 } 311 } 312 break; 313 } 314 315 default: 316 TRESPASS(); 317 } 318 319 break; 320 } 321 322 case kWhatCheckBandwidth: 323 { 324 int32_t generation; 325 CHECK(msg->findInt32("generation", &generation)); 326 327 if (generation != mCheckBandwidthGeneration) { 328 break; 329 } 330 331 onCheckBandwidth(); 332 break; 333 } 334 335 case kWhatChangeConfiguration: 336 { 337 onChangeConfiguration(msg); 338 break; 339 } 340 341 case kWhatChangeConfiguration2: 342 { 343 onChangeConfiguration2(msg); 344 break; 345 } 346 347 case kWhatChangeConfiguration3: 348 { 349 onChangeConfiguration3(msg); 350 break; 351 } 352 353 case kWhatFinishDisconnect2: 354 { 355 onFinishDisconnect2(); 356 break; 357 } 358 359 default: 360 TRESPASS(); 361 break; 362 } 363} 364 365// static 366int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 367 if (a->mBandwidth < b->mBandwidth) { 368 return -1; 369 } else if (a->mBandwidth == b->mBandwidth) { 370 return 0; 371 } 372 373 return 1; 374} 375 376// static 377LiveSession::StreamType LiveSession::indexToType(int idx) { 378 CHECK(idx >= 0 && idx < kMaxStreams); 379 return (StreamType)(1 << idx); 380} 381 382void LiveSession::onConnect(const sp<AMessage> &msg) { 383 AString url; 384 CHECK(msg->findString("url", &url)); 385 386 KeyedVector<String8, String8> *headers = NULL; 387 if (!msg->findPointer("headers", (void **)&headers)) { 388 mExtraHeaders.clear(); 389 } else { 390 mExtraHeaders = *headers; 391 392 delete headers; 393 headers = NULL; 394 } 395 396#if 1 397 ALOGI("onConnect <URL suppressed>"); 398#else 399 ALOGI("onConnect %s", url.c_str()); 400#endif 401 402 mMasterURL = url; 403 404 bool dummy; 405 mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy); 406 407 if (mPlaylist == NULL) { 408 ALOGE("unable to fetch master playlist '%s'.", url.c_str()); 409 410 postPrepared(ERROR_IO); 411 return; 412 } 413 414 // We trust the content provider to make a reasonable choice of preferred 415 // initial bandwidth by listing it first in the variant playlist. 416 // At startup we really don't have a good estimate on the available 417 // network bandwidth since we haven't tranferred any data yet. Once 418 // we have we can make a better informed choice. 419 size_t initialBandwidth = 0; 420 size_t initialBandwidthIndex = 0; 421 422 if (mPlaylist->isVariantPlaylist()) { 423 for (size_t i = 0; i < mPlaylist->size(); ++i) { 424 BandwidthItem item; 425 426 item.mPlaylistIndex = i; 427 428 sp<AMessage> meta; 429 AString uri; 430 mPlaylist->itemAt(i, &uri, &meta); 431 432 unsigned long bandwidth; 433 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 434 435 if (initialBandwidth == 0) { 436 initialBandwidth = item.mBandwidth; 437 } 438 439 mBandwidthItems.push(item); 440 } 441 442 CHECK_GT(mBandwidthItems.size(), 0u); 443 444 mBandwidthItems.sort(SortByBandwidth); 445 446 for (size_t i = 0; i < mBandwidthItems.size(); ++i) { 447 if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) { 448 initialBandwidthIndex = i; 449 break; 450 } 451 } 452 } else { 453 // dummy item. 454 BandwidthItem item; 455 item.mPlaylistIndex = 0; 456 item.mBandwidth = 0; 457 mBandwidthItems.push(item); 458 } 459 460 changeConfiguration( 461 0ll /* timeUs */, initialBandwidthIndex, true /* pickTrack */); 462} 463 464void LiveSession::finishDisconnect() { 465 // No reconfiguration is currently pending, make sure none will trigger 466 // during disconnection either. 467 cancelCheckBandwidthEvent(); 468 469 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 470 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 471 } 472 473 sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id()); 474 475 mContinuationCounter = mFetcherInfos.size(); 476 mContinuation = msg; 477 478 if (mContinuationCounter == 0) { 479 msg->post(); 480 } 481} 482 483void LiveSession::onFinishDisconnect2() { 484 mContinuation.clear(); 485 486 mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM); 487 mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM); 488 489 mPacketSources.valueFor( 490 STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM); 491 492 sp<AMessage> response = new AMessage; 493 response->setInt32("err", OK); 494 495 response->postReply(mDisconnectReplyID); 496 mDisconnectReplyID = 0; 497} 498 499sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { 500 ssize_t index = mFetcherInfos.indexOfKey(uri); 501 502 if (index >= 0) { 503 return NULL; 504 } 505 506 sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); 507 notify->setString("uri", uri); 508 509 FetcherInfo info; 510 info.mFetcher = new PlaylistFetcher(notify, this, uri); 511 info.mDurationUs = -1ll; 512 info.mIsPrepared = false; 513 looper()->registerHandler(info.mFetcher); 514 515 mFetcherInfos.add(uri, info); 516 517 return info.mFetcher; 518} 519 520/* 521 * Illustration of parameters: 522 * 523 * 0 `range_offset` 524 * +------------+-------------------------------------------------------+--+--+ 525 * | | | next block to fetch | | | 526 * | | `source` handle => `out` buffer | | | | 527 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->| | | 528 * | |<----------- `range_length` / buffer capacity ----------->| | 529 * |<------------------------------ file_size ------------------------------->| 530 * 531 * Special parameter values: 532 * - range_length == -1 means entire file 533 * - block_size == 0 means entire range 534 * 535 */ 536status_t LiveSession::fetchFile( 537 const char *url, sp<ABuffer> *out, 538 int64_t range_offset, int64_t range_length, 539 uint32_t block_size, /* download block size */ 540 sp<DataSource> *source /* to return and reuse source */) { 541 off64_t size; 542 sp<DataSource> temp_source; 543 if (source == NULL) { 544 source = &temp_source; 545 } 546 547 if (*source == NULL) { 548 if (!strncasecmp(url, "file://", 7)) { 549 *source = new FileSource(url + 7); 550 } else if (strncasecmp(url, "http://", 7) 551 && strncasecmp(url, "https://", 8)) { 552 return ERROR_UNSUPPORTED; 553 } else { 554 KeyedVector<String8, String8> headers = mExtraHeaders; 555 if (range_offset > 0 || range_length >= 0) { 556 headers.add( 557 String8("Range"), 558 String8( 559 StringPrintf( 560 "bytes=%lld-%s", 561 range_offset, 562 range_length < 0 563 ? "" : StringPrintf("%lld", 564 range_offset + range_length - 1).c_str()).c_str())); 565 } 566 status_t err = mHTTPDataSource->connect(url, &headers); 567 568 if (err != OK) { 569 return err; 570 } 571 572 *source = mHTTPDataSource; 573 } 574 } 575 576 status_t getSizeErr = (*source)->getSize(&size); 577 if (getSizeErr != OK) { 578 size = 65536; 579 } 580 581 sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size); 582 if (*out == NULL) { 583 buffer->setRange(0, 0); 584 } 585 586 // adjust range_length if only reading partial block 587 if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) { 588 range_length = buffer->size() + block_size; 589 } 590 for (;;) { 591 // Only resize when we don't know the size. 592 size_t bufferRemaining = buffer->capacity() - buffer->size(); 593 if (bufferRemaining == 0 && getSizeErr != OK) { 594 bufferRemaining = 32768; 595 596 ALOGV("increasing download buffer to %d bytes", 597 buffer->size() + bufferRemaining); 598 599 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 600 memcpy(copy->data(), buffer->data(), buffer->size()); 601 copy->setRange(0, buffer->size()); 602 603 buffer = copy; 604 } 605 606 size_t maxBytesToRead = bufferRemaining; 607 if (range_length >= 0) { 608 int64_t bytesLeftInRange = range_length - buffer->size(); 609 if (bytesLeftInRange < maxBytesToRead) { 610 maxBytesToRead = bytesLeftInRange; 611 612 if (bytesLeftInRange == 0) { 613 break; 614 } 615 } 616 } 617 618 // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0) 619 // to help us break out of the loop. 620 ssize_t n = (*source)->readAt( 621 buffer->size(), buffer->data() + buffer->size(), 622 maxBytesToRead); 623 624 if (n < 0) { 625 return n; 626 } 627 628 if (n == 0) { 629 break; 630 } 631 632 buffer->setRange(0, buffer->size() + (size_t)n); 633 } 634 635 *out = buffer; 636 637 return OK; 638} 639 640sp<M3UParser> LiveSession::fetchPlaylist( 641 const char *url, uint8_t *curPlaylistHash, bool *unchanged) { 642 ALOGV("fetchPlaylist '%s'", url); 643 644 *unchanged = false; 645 646 sp<ABuffer> buffer; 647 status_t err = fetchFile(url, &buffer); 648 649 if (err != OK) { 650 return NULL; 651 } 652 653 // MD5 functionality is not available on the simulator, treat all 654 // playlists as changed. 655 656#if defined(HAVE_ANDROID_OS) 657 uint8_t hash[16]; 658 659 MD5_CTX m; 660 MD5_Init(&m); 661 MD5_Update(&m, buffer->data(), buffer->size()); 662 663 MD5_Final(hash, &m); 664 665 if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) { 666 // playlist unchanged 667 *unchanged = true; 668 669 ALOGV("Playlist unchanged, refresh state is now %d", 670 (int)mRefreshState); 671 672 return NULL; 673 } 674 675 if (curPlaylistHash != NULL) { 676 memcpy(curPlaylistHash, hash, sizeof(hash)); 677 } 678#endif 679 680 sp<M3UParser> playlist = 681 new M3UParser(url, buffer->data(), buffer->size()); 682 683 if (playlist->initCheck() != OK) { 684 ALOGE("failed to parse .m3u8 playlist"); 685 686 return NULL; 687 } 688 689 return playlist; 690} 691 692static double uniformRand() { 693 return (double)rand() / RAND_MAX; 694} 695 696size_t LiveSession::getBandwidthIndex() { 697 if (mBandwidthItems.size() == 0) { 698 return 0; 699 } 700 701#if 1 702 char value[PROPERTY_VALUE_MAX]; 703 ssize_t index = -1; 704 if (property_get("media.httplive.bw-index", value, NULL)) { 705 char *end; 706 index = strtol(value, &end, 10); 707 CHECK(end > value && *end == '\0'); 708 709 if (index >= 0 && (size_t)index >= mBandwidthItems.size()) { 710 index = mBandwidthItems.size() - 1; 711 } 712 } 713 714 if (index < 0) { 715 int32_t bandwidthBps; 716 if (mHTTPDataSource != NULL 717 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 718 ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 719 } else { 720 ALOGV("no bandwidth estimate."); 721 return 0; // Pick the lowest bandwidth stream by default. 722 } 723 724 char value[PROPERTY_VALUE_MAX]; 725 if (property_get("media.httplive.max-bw", value, NULL)) { 726 char *end; 727 long maxBw = strtoul(value, &end, 10); 728 if (end > value && *end == '\0') { 729 if (maxBw > 0 && bandwidthBps > maxBw) { 730 ALOGV("bandwidth capped to %ld bps", maxBw); 731 bandwidthBps = maxBw; 732 } 733 } 734 } 735 736 // Consider only 80% of the available bandwidth usable. 737 bandwidthBps = (bandwidthBps * 8) / 10; 738 739 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 740 741 index = mBandwidthItems.size() - 1; 742 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 743 > (size_t)bandwidthBps) { 744 --index; 745 } 746 } 747#elif 0 748 // Change bandwidth at random() 749 size_t index = uniformRand() * mBandwidthItems.size(); 750#elif 0 751 // There's a 50% chance to stay on the current bandwidth and 752 // a 50% chance to switch to the next higher bandwidth (wrapping around 753 // to lowest) 754 const size_t kMinIndex = 0; 755 756 static ssize_t mPrevBandwidthIndex = -1; 757 758 size_t index; 759 if (mPrevBandwidthIndex < 0) { 760 index = kMinIndex; 761 } else if (uniformRand() < 0.5) { 762 index = (size_t)mPrevBandwidthIndex; 763 } else { 764 index = mPrevBandwidthIndex + 1; 765 if (index == mBandwidthItems.size()) { 766 index = kMinIndex; 767 } 768 } 769 mPrevBandwidthIndex = index; 770#elif 0 771 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 772 773 size_t index = mBandwidthItems.size() - 1; 774 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 775 --index; 776 } 777#elif 1 778 char value[PROPERTY_VALUE_MAX]; 779 size_t index; 780 if (property_get("media.httplive.bw-index", value, NULL)) { 781 char *end; 782 index = strtoul(value, &end, 10); 783 CHECK(end > value && *end == '\0'); 784 785 if (index >= mBandwidthItems.size()) { 786 index = mBandwidthItems.size() - 1; 787 } 788 } else { 789 index = 0; 790 } 791#else 792 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 793#endif 794 795 CHECK_GE(index, 0); 796 797 return index; 798} 799 800status_t LiveSession::onSeek(const sp<AMessage> &msg) { 801 int64_t timeUs; 802 CHECK(msg->findInt64("timeUs", &timeUs)); 803 804 if (!mReconfigurationInProgress) { 805 changeConfiguration(timeUs, getBandwidthIndex()); 806 } 807 808 return OK; 809} 810 811status_t LiveSession::getDuration(int64_t *durationUs) const { 812 int64_t maxDurationUs = 0ll; 813 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 814 int64_t fetcherDurationUs = mFetcherInfos.valueAt(i).mDurationUs; 815 816 if (fetcherDurationUs >= 0ll && fetcherDurationUs > maxDurationUs) { 817 maxDurationUs = fetcherDurationUs; 818 } 819 } 820 821 *durationUs = maxDurationUs; 822 823 return OK; 824} 825 826bool LiveSession::isSeekable() const { 827 int64_t durationUs; 828 return getDuration(&durationUs) == OK && durationUs >= 0; 829} 830 831bool LiveSession::hasDynamicDuration() const { 832 return false; 833} 834 835status_t LiveSession::getTrackInfo(Parcel *reply) const { 836 return mPlaylist->getTrackInfo(reply); 837} 838 839status_t LiveSession::selectTrack(size_t index, bool select) { 840 status_t err = mPlaylist->selectTrack(index, select); 841 if (err == OK) { 842 (new AMessage(kWhatChangeConfiguration, id()))->post(); 843 } 844 return err; 845} 846 847void LiveSession::changeConfiguration( 848 int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { 849 CHECK(!mReconfigurationInProgress); 850 mReconfigurationInProgress = true; 851 852 mPrevBandwidthIndex = bandwidthIndex; 853 854 ALOGV("changeConfiguration => timeUs:%lld us, bwIndex:%d, pickTrack:%d", 855 timeUs, bandwidthIndex, pickTrack); 856 857 if (pickTrack) { 858 mPlaylist->pickRandomMediaItems(); 859 } 860 861 CHECK_LT(bandwidthIndex, mBandwidthItems.size()); 862 const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); 863 864 uint32_t streamMask = 0; 865 866 AString URIs[kMaxStreams]; 867 for (size_t i = 0; i < kMaxStreams; ++i) { 868 if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) { 869 streamMask |= indexToType(i); 870 } 871 } 872 873 // Step 1, stop and discard fetchers that are no longer needed. 874 // Pause those that we'll reuse. 875 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 876 const AString &uri = mFetcherInfos.keyAt(i); 877 878 bool discardFetcher = true; 879 880 // If we're seeking all current fetchers are discarded. 881 if (timeUs < 0ll) { 882 for (size_t j = 0; j < kMaxStreams; ++j) { 883 if ((streamMask & indexToType(j)) && uri == URIs[j]) { 884 discardFetcher = false; 885 } 886 } 887 } 888 889 if (discardFetcher) { 890 mFetcherInfos.valueAt(i).mFetcher->stopAsync(); 891 } else { 892 mFetcherInfos.valueAt(i).mFetcher->pauseAsync(); 893 } 894 } 895 896 sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); 897 msg->setInt32("streamMask", streamMask); 898 msg->setInt64("timeUs", timeUs); 899 for (size_t i = 0; i < kMaxStreams; ++i) { 900 if (streamMask & indexToType(i)) { 901 msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str()); 902 } 903 } 904 905 // Every time a fetcher acknowledges the stopAsync or pauseAsync request 906 // we'll decrement mContinuationCounter, once it reaches zero, i.e. all 907 // fetchers have completed their asynchronous operation, we'll post 908 // mContinuation, which then is handled below in onChangeConfiguration2. 909 mContinuationCounter = mFetcherInfos.size(); 910 mContinuation = msg; 911 912 if (mContinuationCounter == 0) { 913 msg->post(); 914 } 915} 916 917void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) { 918 if (!mReconfigurationInProgress) { 919 changeConfiguration(-1ll /* timeUs */, getBandwidthIndex()); 920 } else { 921 msg->post(1000000ll); // retry in 1 sec 922 } 923} 924 925void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { 926 mContinuation.clear(); 927 928 // All fetchers are either suspended or have been removed now. 929 930 uint32_t streamMask; 931 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 932 933 AString URIs[kMaxStreams]; 934 for (size_t i = 0; i < kMaxStreams; ++i) { 935 if (streamMask & indexToType(i)) { 936 const AString &uriKey = mStreams[i].uriKey(); 937 CHECK(msg->findString(uriKey.c_str(), &URIs[i])); 938 ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str()); 939 } 940 } 941 942 // Determine which decoders to shutdown on the player side, 943 // a decoder has to be shutdown if either 944 // 1) its streamtype was active before but now longer isn't. 945 // or 946 // 2) its streamtype was already active and still is but the URI 947 // has changed. 948 uint32_t changedMask = 0; 949 for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) { 950 if (((mStreamMask & streamMask & indexToType(i)) 951 && !(URIs[i] == mStreams[i].mUri)) 952 || (mStreamMask & ~streamMask & indexToType(i))) { 953 changedMask |= indexToType(i); 954 } 955 } 956 957 if (changedMask == 0) { 958 // If nothing changed as far as the audio/video decoders 959 // are concerned we can proceed. 960 onChangeConfiguration3(msg); 961 return; 962 } 963 964 // Something changed, inform the player which will shutdown the 965 // corresponding decoders and will post the reply once that's done. 966 // Handling the reply will continue executing below in 967 // onChangeConfiguration3. 968 sp<AMessage> notify = mNotify->dup(); 969 notify->setInt32("what", kWhatStreamsChanged); 970 notify->setInt32("changedMask", changedMask); 971 972 msg->setWhat(kWhatChangeConfiguration3); 973 msg->setTarget(id()); 974 975 notify->setMessage("reply", msg); 976 notify->post(); 977} 978 979void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { 980 // All remaining fetchers are still suspended, the player has shutdown 981 // any decoders that needed it. 982 983 uint32_t streamMask; 984 CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); 985 986 for (size_t i = 0; i < kMaxStreams; ++i) { 987 if (streamMask & indexToType(i)) { 988 CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri)); 989 } 990 } 991 992 int64_t timeUs; 993 CHECK(msg->findInt64("timeUs", &timeUs)); 994 995 if (timeUs < 0ll) { 996 timeUs = mLastDequeuedTimeUs; 997 } 998 mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; 999 1000 mStreamMask = streamMask; 1001 1002 // Resume all existing fetchers and assign them packet sources. 1003 for (size_t i = 0; i < mFetcherInfos.size(); ++i) { 1004 const AString &uri = mFetcherInfos.keyAt(i); 1005 1006 uint32_t resumeMask = 0; 1007 1008 sp<AnotherPacketSource> sources[kMaxStreams]; 1009 for (size_t j = 0; j < kMaxStreams; ++j) { 1010 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1011 sources[j] = mPacketSources.valueFor(indexToType(j)); 1012 resumeMask |= indexToType(j); 1013 } 1014 } 1015 1016 CHECK_NE(resumeMask, 0u); 1017 1018 ALOGV("resuming fetchers for mask 0x%08x", resumeMask); 1019 1020 streamMask &= ~resumeMask; 1021 1022 mFetcherInfos.valueAt(i).mFetcher->startAsync( 1023 sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); 1024 } 1025 1026 // streamMask now only contains the types that need a new fetcher created. 1027 1028 if (streamMask != 0) { 1029 ALOGV("creating new fetchers for mask 0x%08x", streamMask); 1030 } 1031 1032 for (size_t i = 0; i < kMaxStreams; i++) { 1033 if (!(indexToType(i) & streamMask)) { 1034 continue; 1035 } 1036 1037 AString uri; 1038 uri = mStreams[i].mUri; 1039 1040 sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); 1041 CHECK(fetcher != NULL); 1042 1043 sp<AnotherPacketSource> sources[kMaxStreams]; 1044 // TRICKY: looping from i as earlier streams are already removed from streamMask 1045 for (size_t j = i; j < kMaxStreams; ++j) { 1046 if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { 1047 sources[j] = mPacketSources.valueFor(indexToType(j)); 1048 sources[j]->clear(); 1049 1050 streamMask &= ~indexToType(j); 1051 } 1052 } 1053 1054 fetcher->startAsync( 1055 sources[kAudioIndex], 1056 sources[kVideoIndex], 1057 sources[kSubtitleIndex], 1058 timeUs); 1059 } 1060 1061 // All fetchers have now been started, the configuration change 1062 // has completed. 1063 1064 scheduleCheckBandwidthEvent(); 1065 1066 ALOGV("XXX configuration change completed."); 1067 1068 mReconfigurationInProgress = false; 1069 1070 if (mDisconnectReplyID != 0) { 1071 finishDisconnect(); 1072 } 1073} 1074 1075void LiveSession::scheduleCheckBandwidthEvent() { 1076 sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); 1077 msg->setInt32("generation", mCheckBandwidthGeneration); 1078 msg->post(10000000ll); 1079} 1080 1081void LiveSession::cancelCheckBandwidthEvent() { 1082 ++mCheckBandwidthGeneration; 1083} 1084 1085void LiveSession::onCheckBandwidth() { 1086 if (mReconfigurationInProgress) { 1087 scheduleCheckBandwidthEvent(); 1088 return; 1089 } 1090 1091 size_t bandwidthIndex = getBandwidthIndex(); 1092 if (mPrevBandwidthIndex < 0 1093 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { 1094 changeConfiguration(-1ll /* timeUs */, bandwidthIndex); 1095 } 1096 1097 // Handling the kWhatCheckBandwidth even here does _not_ automatically 1098 // schedule another one on return, only an explicit call to 1099 // scheduleCheckBandwidthEvent will do that. 1100 // This ensures that only one configuration change is ongoing at any 1101 // one time, once that completes it'll schedule another check bandwidth 1102 // event. 1103} 1104 1105void LiveSession::postPrepared(status_t err) { 1106 CHECK(mInPreparationPhase); 1107 1108 sp<AMessage> notify = mNotify->dup(); 1109 if (err == OK || err == ERROR_END_OF_STREAM) { 1110 notify->setInt32("what", kWhatPrepared); 1111 } else { 1112 notify->setInt32("what", kWhatPreparationFailed); 1113 notify->setInt32("err", err); 1114 } 1115 1116 notify->post(); 1117 1118 mInPreparationPhase = false; 1119} 1120 1121} // namespace android 1122 1123