PlaylistFetcher.cpp revision a540058ec00b6d147f40a7cdcf6f47acbd70f6c9
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20#include <utils/misc.h> 21 22#include "PlaylistFetcher.h" 23#include "HTTPDownloader.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26#include "include/avc_utils.h" 27#include "include/ID3.h" 28#include "mpeg2ts/AnotherPacketSource.h" 29 30#include <media/stagefright/foundation/ABitReader.h> 31#include <media/stagefright/foundation/ABuffer.h> 32#include <media/stagefright/foundation/ADebug.h> 33#include <media/stagefright/MediaDefs.h> 34#include <media/stagefright/MetaData.h> 35#include <media/stagefright/Utils.h> 36 37#include <ctype.h> 38#include <inttypes.h> 39#include <openssl/aes.h> 40 41#define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__) 42#define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \ 43 LiveSession::getNameForStream(stream), ##__VA_ARGS__) 44 45namespace android { 46 47// static 48const int64_t PlaylistFetcher::kMinBufferedDurationUs = 30000000ll; 49const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 50// LCM of 188 (size of a TS packet) & 1k works well 51const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; 52 53struct PlaylistFetcher::DownloadState : public RefBase { 54 DownloadState(); 55 void resetState(); 56 bool hasSavedState() const; 57 void restoreState( 58 AString &uri, 59 sp<AMessage> &itemMeta, 60 sp<ABuffer> &buffer, 61 sp<ABuffer> &tsBuffer, 62 int32_t &firstSeqNumberInPlaylist, 63 int32_t &lastSeqNumberInPlaylist); 64 void saveState( 65 AString &uri, 66 sp<AMessage> &itemMeta, 67 sp<ABuffer> &buffer, 68 sp<ABuffer> &tsBuffer, 69 int32_t &firstSeqNumberInPlaylist, 70 int32_t &lastSeqNumberInPlaylist); 71 72private: 73 bool mHasSavedState; 74 AString mUri; 75 sp<AMessage> mItemMeta; 76 sp<ABuffer> mBuffer; 77 sp<ABuffer> mTsBuffer; 78 int32_t mFirstSeqNumberInPlaylist; 79 int32_t mLastSeqNumberInPlaylist; 80}; 81 82PlaylistFetcher::DownloadState::DownloadState() { 83 resetState(); 84} 85 86bool PlaylistFetcher::DownloadState::hasSavedState() const { 87 return mHasSavedState; 88} 89 90void PlaylistFetcher::DownloadState::resetState() { 91 mHasSavedState = false; 92 93 mUri.clear(); 94 mItemMeta = NULL; 95 mBuffer = NULL; 96 mTsBuffer = NULL; 97 mFirstSeqNumberInPlaylist = 0; 98 mLastSeqNumberInPlaylist = 0; 99} 100 101void PlaylistFetcher::DownloadState::restoreState( 102 AString &uri, 103 sp<AMessage> &itemMeta, 104 sp<ABuffer> &buffer, 105 sp<ABuffer> &tsBuffer, 106 int32_t &firstSeqNumberInPlaylist, 107 int32_t &lastSeqNumberInPlaylist) { 108 if (!mHasSavedState) { 109 return; 110 } 111 112 uri = mUri; 113 itemMeta = mItemMeta; 114 buffer = mBuffer; 115 tsBuffer = mTsBuffer; 116 firstSeqNumberInPlaylist = mFirstSeqNumberInPlaylist; 117 lastSeqNumberInPlaylist = mLastSeqNumberInPlaylist; 118 119 resetState(); 120} 121 122void PlaylistFetcher::DownloadState::saveState( 123 AString &uri, 124 sp<AMessage> &itemMeta, 125 sp<ABuffer> &buffer, 126 sp<ABuffer> &tsBuffer, 127 int32_t &firstSeqNumberInPlaylist, 128 int32_t &lastSeqNumberInPlaylist) { 129 mHasSavedState = true; 130 131 mUri = uri; 132 mItemMeta = itemMeta; 133 mBuffer = buffer; 134 mTsBuffer = tsBuffer; 135 mFirstSeqNumberInPlaylist = firstSeqNumberInPlaylist; 136 mLastSeqNumberInPlaylist = lastSeqNumberInPlaylist; 137} 138 139PlaylistFetcher::PlaylistFetcher( 140 const sp<AMessage> ¬ify, 141 const sp<LiveSession> &session, 142 const char *uri, 143 int32_t id, 144 int32_t subtitleGeneration) 145 : mNotify(notify), 146 mSession(session), 147 mURI(uri), 148 mFetcherID(id), 149 mStreamTypeMask(0), 150 mStartTimeUs(-1ll), 151 mSegmentStartTimeUs(-1ll), 152 mDiscontinuitySeq(-1ll), 153 mStartTimeUsRelative(false), 154 mLastPlaylistFetchTimeUs(-1ll), 155 mPlaylistTimeUs(-1ll), 156 mSeqNumber(-1), 157 mNumRetries(0), 158 mStartup(true), 159 mIDRFound(false), 160 mSeekMode(LiveSession::kSeekModeExactPosition), 161 mTimeChangeSignaled(false), 162 mNextPTSTimeUs(-1ll), 163 mMonitorQueueGeneration(0), 164 mSubtitleGeneration(subtitleGeneration), 165 mLastDiscontinuitySeq(-1ll), 166 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 167 mFirstPTSValid(false), 168 mFirstTimeUs(-1ll), 169 mVideoBuffer(new AnotherPacketSource(NULL)), 170 mThresholdRatio(-1.0f), 171 mDownloadState(new DownloadState()), 172 mHasMetadata(false) { 173 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 174 mHTTPDownloader = mSession->getHTTPDownloader(); 175} 176 177PlaylistFetcher::~PlaylistFetcher() { 178} 179 180int32_t PlaylistFetcher::getFetcherID() const { 181 return mFetcherID; 182} 183 184int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 185 CHECK(mPlaylist != NULL); 186 187 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 188 mPlaylist->getSeqNumberRange( 189 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 190 191 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 192 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 193 194 int64_t segmentStartUs = 0ll; 195 for (int32_t index = 0; 196 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 197 sp<AMessage> itemMeta; 198 CHECK(mPlaylist->itemAt( 199 index, NULL /* uri */, &itemMeta)); 200 201 int64_t itemDurationUs; 202 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 203 204 segmentStartUs += itemDurationUs; 205 } 206 207 return segmentStartUs; 208} 209 210int64_t PlaylistFetcher::getSegmentDurationUs(int32_t seqNumber) const { 211 CHECK(mPlaylist != NULL); 212 213 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 214 mPlaylist->getSeqNumberRange( 215 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 216 217 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 218 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 219 220 int32_t index = seqNumber - firstSeqNumberInPlaylist; 221 sp<AMessage> itemMeta; 222 CHECK(mPlaylist->itemAt( 223 index, NULL /* uri */, &itemMeta)); 224 225 int64_t itemDurationUs; 226 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 227 228 return itemDurationUs; 229} 230 231int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 232 int64_t nowUs = ALooper::GetNowUs(); 233 234 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 235 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 236 return 0ll; 237 } 238 239 if (mPlaylist->isComplete()) { 240 return (~0llu >> 1); 241 } 242 243 int64_t targetDurationUs = mPlaylist->getTargetDuration(); 244 245 int64_t minPlaylistAgeUs; 246 247 switch (mRefreshState) { 248 case INITIAL_MINIMUM_RELOAD_DELAY: 249 { 250 size_t n = mPlaylist->size(); 251 if (n > 0) { 252 sp<AMessage> itemMeta; 253 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 254 255 int64_t itemDurationUs; 256 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 257 258 minPlaylistAgeUs = itemDurationUs; 259 break; 260 } 261 262 // fall through 263 } 264 265 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 266 { 267 minPlaylistAgeUs = targetDurationUs / 2; 268 break; 269 } 270 271 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 272 { 273 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 274 break; 275 } 276 277 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 278 { 279 minPlaylistAgeUs = targetDurationUs * 3; 280 break; 281 } 282 283 default: 284 TRESPASS(); 285 break; 286 } 287 288 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 289 return delayUs > 0ll ? delayUs : 0ll; 290} 291 292status_t PlaylistFetcher::decryptBuffer( 293 size_t playlistIndex, const sp<ABuffer> &buffer, 294 bool first) { 295 sp<AMessage> itemMeta; 296 bool found = false; 297 AString method; 298 299 for (ssize_t i = playlistIndex; i >= 0; --i) { 300 AString uri; 301 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 302 303 if (itemMeta->findString("cipher-method", &method)) { 304 found = true; 305 break; 306 } 307 } 308 309 if (!found) { 310 method = "NONE"; 311 } 312 buffer->meta()->setString("cipher-method", method.c_str()); 313 314 if (method == "NONE") { 315 return OK; 316 } else if (!(method == "AES-128")) { 317 ALOGE("Unsupported cipher method '%s'", method.c_str()); 318 return ERROR_UNSUPPORTED; 319 } 320 321 AString keyURI; 322 if (!itemMeta->findString("cipher-uri", &keyURI)) { 323 ALOGE("Missing key uri"); 324 return ERROR_MALFORMED; 325 } 326 327 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 328 329 sp<ABuffer> key; 330 if (index >= 0) { 331 key = mAESKeyForURI.valueAt(index); 332 } else { 333 ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key); 334 335 if (err == ERROR_NOT_CONNECTED) { 336 return ERROR_NOT_CONNECTED; 337 } else if (err < 0) { 338 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 339 return ERROR_IO; 340 } else if (key->size() != 16) { 341 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 342 return ERROR_MALFORMED; 343 } 344 345 mAESKeyForURI.add(keyURI, key); 346 } 347 348 AES_KEY aes_key; 349 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 350 ALOGE("failed to set AES decryption key."); 351 return UNKNOWN_ERROR; 352 } 353 354 size_t n = buffer->size(); 355 if (!n) { 356 return OK; 357 } 358 CHECK(n % 16 == 0); 359 360 if (first) { 361 // If decrypting the first block in a file, read the iv from the manifest 362 // or derive the iv from the file's sequence number. 363 364 AString iv; 365 if (itemMeta->findString("cipher-iv", &iv)) { 366 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 367 || iv.size() != 16 * 2 + 2) { 368 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 369 return ERROR_MALFORMED; 370 } 371 372 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 373 for (size_t i = 0; i < 16; ++i) { 374 char c1 = tolower(iv.c_str()[2 + 2 * i]); 375 char c2 = tolower(iv.c_str()[3 + 2 * i]); 376 if (!isxdigit(c1) || !isxdigit(c2)) { 377 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 378 return ERROR_MALFORMED; 379 } 380 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 381 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 382 383 mAESInitVec[i] = nibble1 << 4 | nibble2; 384 } 385 } else { 386 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 387 mAESInitVec[15] = mSeqNumber & 0xff; 388 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 389 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 390 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 391 } 392 } 393 394 AES_cbc_encrypt( 395 buffer->data(), buffer->data(), buffer->size(), 396 &aes_key, mAESInitVec, AES_DECRYPT); 397 398 return OK; 399} 400 401status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 402 AString method; 403 CHECK(buffer->meta()->findString("cipher-method", &method)); 404 if (method == "NONE") { 405 return OK; 406 } 407 408 uint8_t padding = 0; 409 if (buffer->size() > 0) { 410 padding = buffer->data()[buffer->size() - 1]; 411 } 412 413 if (padding > 16) { 414 return ERROR_MALFORMED; 415 } 416 417 for (size_t i = buffer->size() - padding; i < padding; i++) { 418 if (buffer->data()[i] != padding) { 419 return ERROR_MALFORMED; 420 } 421 } 422 423 buffer->setRange(buffer->offset(), buffer->size() - padding); 424 return OK; 425} 426 427void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 428 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 429 if (maxDelayUs < minDelayUs) { 430 maxDelayUs = minDelayUs; 431 } 432 if (delayUs > maxDelayUs) { 433 FLOGV("Need to refresh playlist in %lld", (long long)maxDelayUs); 434 delayUs = maxDelayUs; 435 } 436 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this); 437 msg->setInt32("generation", mMonitorQueueGeneration); 438 msg->post(delayUs); 439} 440 441void PlaylistFetcher::cancelMonitorQueue() { 442 ++mMonitorQueueGeneration; 443} 444 445void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) { 446 { 447 AutoMutex _l(mThresholdLock); 448 mThresholdRatio = thresholdRatio; 449 } 450 if (disconnect) { 451 mHTTPDownloader->disconnect(); 452 } 453} 454 455void PlaylistFetcher::resetStoppingThreshold(bool disconnect) { 456 { 457 AutoMutex _l(mThresholdLock); 458 mThresholdRatio = -1.0f; 459 } 460 if (disconnect) { 461 mHTTPDownloader->disconnect(); 462 } else { 463 // allow reconnect 464 mHTTPDownloader->reconnect(); 465 } 466} 467 468float PlaylistFetcher::getStoppingThreshold() { 469 AutoMutex _l(mThresholdLock); 470 return mThresholdRatio; 471} 472 473void PlaylistFetcher::startAsync( 474 const sp<AnotherPacketSource> &audioSource, 475 const sp<AnotherPacketSource> &videoSource, 476 const sp<AnotherPacketSource> &subtitleSource, 477 const sp<AnotherPacketSource> &metadataSource, 478 int64_t startTimeUs, 479 int64_t segmentStartTimeUs, 480 int32_t startDiscontinuitySeq, 481 LiveSession::SeekMode seekMode) { 482 sp<AMessage> msg = new AMessage(kWhatStart, this); 483 484 uint32_t streamTypeMask = 0ul; 485 486 if (audioSource != NULL) { 487 msg->setPointer("audioSource", audioSource.get()); 488 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 489 } 490 491 if (videoSource != NULL) { 492 msg->setPointer("videoSource", videoSource.get()); 493 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 494 } 495 496 if (subtitleSource != NULL) { 497 msg->setPointer("subtitleSource", subtitleSource.get()); 498 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 499 } 500 501 if (metadataSource != NULL) { 502 msg->setPointer("metadataSource", metadataSource.get()); 503 // metadataSource does not affect streamTypeMask. 504 } 505 506 msg->setInt32("streamTypeMask", streamTypeMask); 507 msg->setInt64("startTimeUs", startTimeUs); 508 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 509 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 510 msg->setInt32("seekMode", seekMode); 511 msg->post(); 512} 513 514void PlaylistFetcher::pauseAsync( 515 float thresholdRatio, bool disconnect) { 516 setStoppingThreshold(thresholdRatio, disconnect); 517 518 (new AMessage(kWhatPause, this))->post(); 519} 520 521void PlaylistFetcher::stopAsync(bool clear) { 522 setStoppingThreshold(0.0f, true /* disconncect */); 523 524 sp<AMessage> msg = new AMessage(kWhatStop, this); 525 msg->setInt32("clear", clear); 526 msg->post(); 527} 528 529void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 530 FLOGV("resumeUntilAsync: params=%s", params->debugString().c_str()); 531 532 AMessage* msg = new AMessage(kWhatResumeUntil, this); 533 msg->setMessage("params", params); 534 msg->post(); 535} 536 537void PlaylistFetcher::fetchPlaylistAsync() { 538 (new AMessage(kWhatFetchPlaylist, this))->post(); 539} 540 541void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 542 switch (msg->what()) { 543 case kWhatStart: 544 { 545 status_t err = onStart(msg); 546 547 sp<AMessage> notify = mNotify->dup(); 548 notify->setInt32("what", kWhatStarted); 549 notify->setInt32("err", err); 550 notify->post(); 551 break; 552 } 553 554 case kWhatPause: 555 { 556 onPause(); 557 558 sp<AMessage> notify = mNotify->dup(); 559 notify->setInt32("what", kWhatPaused); 560 notify->setInt32("seekMode", 561 mDownloadState->hasSavedState() 562 ? LiveSession::kSeekModeNextSample 563 : LiveSession::kSeekModeNextSegment); 564 notify->post(); 565 break; 566 } 567 568 case kWhatStop: 569 { 570 onStop(msg); 571 572 sp<AMessage> notify = mNotify->dup(); 573 notify->setInt32("what", kWhatStopped); 574 notify->post(); 575 break; 576 } 577 578 case kWhatFetchPlaylist: 579 { 580 bool unchanged; 581 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist( 582 mURI.c_str(), NULL /* curPlaylistHash */, &unchanged); 583 584 sp<AMessage> notify = mNotify->dup(); 585 notify->setInt32("what", kWhatPlaylistFetched); 586 notify->setObject("playlist", playlist); 587 notify->post(); 588 break; 589 } 590 591 case kWhatMonitorQueue: 592 case kWhatDownloadNext: 593 { 594 int32_t generation; 595 CHECK(msg->findInt32("generation", &generation)); 596 597 if (generation != mMonitorQueueGeneration) { 598 // Stale event 599 break; 600 } 601 602 if (msg->what() == kWhatMonitorQueue) { 603 onMonitorQueue(); 604 } else { 605 onDownloadNext(); 606 } 607 break; 608 } 609 610 case kWhatResumeUntil: 611 { 612 onResumeUntil(msg); 613 break; 614 } 615 616 default: 617 TRESPASS(); 618 } 619} 620 621status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 622 mPacketSources.clear(); 623 mStopParams.clear(); 624 mStartTimeUsNotify = mNotify->dup(); 625 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 626 mStartTimeUsNotify->setString("uri", mURI); 627 628 uint32_t streamTypeMask; 629 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 630 631 int64_t startTimeUs; 632 int64_t segmentStartTimeUs; 633 int32_t startDiscontinuitySeq; 634 int32_t seekMode; 635 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 636 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 637 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 638 CHECK(msg->findInt32("seekMode", &seekMode)); 639 640 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 641 void *ptr; 642 CHECK(msg->findPointer("audioSource", &ptr)); 643 644 mPacketSources.add( 645 LiveSession::STREAMTYPE_AUDIO, 646 static_cast<AnotherPacketSource *>(ptr)); 647 } 648 649 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 650 void *ptr; 651 CHECK(msg->findPointer("videoSource", &ptr)); 652 653 mPacketSources.add( 654 LiveSession::STREAMTYPE_VIDEO, 655 static_cast<AnotherPacketSource *>(ptr)); 656 } 657 658 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 659 void *ptr; 660 CHECK(msg->findPointer("subtitleSource", &ptr)); 661 662 mPacketSources.add( 663 LiveSession::STREAMTYPE_SUBTITLES, 664 static_cast<AnotherPacketSource *>(ptr)); 665 } 666 667 void *ptr; 668 // metadataSource is not part of streamTypeMask 669 if ((streamTypeMask & (LiveSession::STREAMTYPE_AUDIO | LiveSession::STREAMTYPE_VIDEO)) 670 && msg->findPointer("metadataSource", &ptr)) { 671 mPacketSources.add( 672 LiveSession::STREAMTYPE_METADATA, 673 static_cast<AnotherPacketSource *>(ptr)); 674 } 675 676 mStreamTypeMask = streamTypeMask; 677 678 mSegmentStartTimeUs = segmentStartTimeUs; 679 680 if (startDiscontinuitySeq >= 0) { 681 mDiscontinuitySeq = startDiscontinuitySeq; 682 } 683 684 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 685 mSeekMode = (LiveSession::SeekMode) seekMode; 686 687 if (startTimeUs >= 0 || mSeekMode == LiveSession::kSeekModeNextSample) { 688 mStartup = true; 689 mIDRFound = false; 690 mVideoBuffer->clear(); 691 } 692 693 if (startTimeUs >= 0) { 694 mStartTimeUs = startTimeUs; 695 mFirstPTSValid = false; 696 mSeqNumber = -1; 697 mTimeChangeSignaled = false; 698 mDownloadState->resetState(); 699 } 700 701 postMonitorQueue(); 702 703 return OK; 704} 705 706void PlaylistFetcher::onPause() { 707 cancelMonitorQueue(); 708 mLastDiscontinuitySeq = mDiscontinuitySeq; 709 710 resetStoppingThreshold(false /* disconnect */); 711} 712 713void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 714 cancelMonitorQueue(); 715 716 int32_t clear; 717 CHECK(msg->findInt32("clear", &clear)); 718 if (clear) { 719 for (size_t i = 0; i < mPacketSources.size(); i++) { 720 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 721 packetSource->clear(); 722 } 723 } 724 725 mDownloadState->resetState(); 726 mPacketSources.clear(); 727 mStreamTypeMask = 0; 728 729 resetStoppingThreshold(true /* disconnect */); 730} 731 732// Resume until we have reached the boundary timestamps listed in `msg`; when 733// the remaining time is too short (within a resume threshold) stop immediately 734// instead. 735status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 736 sp<AMessage> params; 737 CHECK(msg->findMessage("params", ¶ms)); 738 739 mStopParams = params; 740 onDownloadNext(); 741 742 return OK; 743} 744 745void PlaylistFetcher::notifyStopReached() { 746 sp<AMessage> notify = mNotify->dup(); 747 notify->setInt32("what", kWhatStopReached); 748 notify->post(); 749} 750 751void PlaylistFetcher::notifyError(status_t err) { 752 sp<AMessage> notify = mNotify->dup(); 753 notify->setInt32("what", kWhatError); 754 notify->setInt32("err", err); 755 notify->post(); 756} 757 758void PlaylistFetcher::queueDiscontinuity( 759 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 760 for (size_t i = 0; i < mPacketSources.size(); ++i) { 761 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 762 // (seek will discard buffer by abandoning old fetchers) 763 mPacketSources.valueAt(i)->queueDiscontinuity( 764 type, extra, false /* discard */); 765 } 766} 767 768void PlaylistFetcher::onMonitorQueue() { 769 // in the middle of an unfinished download, delay 770 // playlist refresh as it'll change seq numbers 771 if (!mDownloadState->hasSavedState()) { 772 refreshPlaylist(); 773 } 774 775 int64_t targetDurationUs = kMinBufferedDurationUs; 776 if (mPlaylist != NULL) { 777 targetDurationUs = mPlaylist->getTargetDuration(); 778 } 779 780 int64_t bufferedDurationUs = 0ll; 781 status_t finalResult = OK; 782 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 783 sp<AnotherPacketSource> packetSource = 784 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 785 786 bufferedDurationUs = 787 packetSource->getBufferedDurationUs(&finalResult); 788 } else { 789 // Use min stream duration, but ignore streams that never have any packet 790 // enqueued to prevent us from waiting on a non-existent stream; 791 // when we cannot make out from the manifest what streams are included in 792 // a playlist we might assume extra streams. 793 bufferedDurationUs = -1ll; 794 for (size_t i = 0; i < mPacketSources.size(); ++i) { 795 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0 796 || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) { 797 continue; 798 } 799 800 int64_t bufferedStreamDurationUs = 801 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 802 803 FSLOGV(mPacketSources.keyAt(i), "buffered %lld", (long long)bufferedStreamDurationUs); 804 805 if (bufferedDurationUs == -1ll 806 || bufferedStreamDurationUs < bufferedDurationUs) { 807 bufferedDurationUs = bufferedStreamDurationUs; 808 } 809 } 810 if (bufferedDurationUs == -1ll) { 811 bufferedDurationUs = 0ll; 812 } 813 } 814 815 if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) { 816 FLOGV("monitoring, buffered=%lld < %lld", 817 (long long)bufferedDurationUs, (long long)kMinBufferedDurationUs); 818 819 // delay the next download slightly; hopefully this gives other concurrent fetchers 820 // a better chance to run. 821 // onDownloadNext(); 822 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this); 823 msg->setInt32("generation", mMonitorQueueGeneration); 824 msg->post(1000l); 825 } else { 826 // We'd like to maintain buffering above durationToBufferUs, so try 827 // again when buffer just about to go below durationToBufferUs 828 // (or after targetDurationUs / 2, whichever is smaller). 829 int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll; 830 if (delayUs > targetDurationUs / 2) { 831 delayUs = targetDurationUs / 2; 832 } 833 834 FLOGV("pausing for %lld, buffered=%lld > %lld", 835 (long long)delayUs, 836 (long long)bufferedDurationUs, 837 (long long)kMinBufferedDurationUs); 838 839 postMonitorQueue(delayUs); 840 } 841} 842 843status_t PlaylistFetcher::refreshPlaylist() { 844 if (delayUsToRefreshPlaylist() <= 0) { 845 bool unchanged; 846 sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist( 847 mURI.c_str(), mPlaylistHash, &unchanged); 848 849 if (playlist == NULL) { 850 if (unchanged) { 851 // We succeeded in fetching the playlist, but it was 852 // unchanged from the last time we tried. 853 854 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 855 mRefreshState = (RefreshState)(mRefreshState + 1); 856 } 857 } else { 858 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 859 return ERROR_IO; 860 } 861 } else { 862 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 863 mPlaylist = playlist; 864 865 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 866 updateDuration(); 867 } 868 // Notify LiveSession to use target-duration based buffering level 869 // for up/down switch. Default LiveSession::kUpSwitchMark may not 870 // be reachable for live streams, as our max buffering amount is 871 // limited to 3 segments. 872 if (!mPlaylist->isComplete()) { 873 updateTargetDuration(); 874 } 875 mPlaylistTimeUs = ALooper::GetNowUs(); 876 } 877 878 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 879 } 880 return OK; 881} 882 883// static 884bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 885 return buffer->size() > 0 && buffer->data()[0] == 0x47; 886} 887 888bool PlaylistFetcher::shouldPauseDownload() { 889 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 890 // doesn't apply to subtitles 891 return false; 892 } 893 894 // Calculate threshold to abort current download 895 float thresholdRatio = getStoppingThreshold(); 896 897 if (thresholdRatio < 0.0f) { 898 // never abort 899 return false; 900 } else if (thresholdRatio == 0.0f) { 901 // immediately abort 902 return true; 903 } 904 905 // now we have a positive thresholdUs, abort if remaining 906 // portion to download is over that threshold. 907 if (mSegmentFirstPTS < 0) { 908 // this means we haven't even find the first access unit, 909 // abort now as we must be very far away from the end. 910 return true; 911 } 912 int64_t lastEnqueueUs = mSegmentFirstPTS; 913 for (size_t i = 0; i < mPacketSources.size(); ++i) { 914 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 915 continue; 916 } 917 sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); 918 int32_t type; 919 if (meta == NULL || meta->findInt32("discontinuity", &type)) { 920 continue; 921 } 922 int64_t tmpUs; 923 CHECK(meta->findInt64("timeUs", &tmpUs)); 924 if (tmpUs > lastEnqueueUs) { 925 lastEnqueueUs = tmpUs; 926 } 927 } 928 lastEnqueueUs -= mSegmentFirstPTS; 929 930 int64_t targetDurationUs = mPlaylist->getTargetDuration(); 931 int64_t thresholdUs = thresholdRatio * targetDurationUs; 932 933 FLOGV("%spausing now, thresholdUs %lld, remaining %lld", 934 targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ", 935 (long long)thresholdUs, 936 (long long)(targetDurationUs - lastEnqueueUs)); 937 938 if (targetDurationUs - lastEnqueueUs > thresholdUs) { 939 return true; 940 } 941 return false; 942} 943 944bool PlaylistFetcher::initDownloadState( 945 AString &uri, 946 sp<AMessage> &itemMeta, 947 int32_t &firstSeqNumberInPlaylist, 948 int32_t &lastSeqNumberInPlaylist) { 949 status_t err = refreshPlaylist(); 950 firstSeqNumberInPlaylist = 0; 951 lastSeqNumberInPlaylist = 0; 952 bool discontinuity = false; 953 954 if (mPlaylist != NULL) { 955 mPlaylist->getSeqNumberRange( 956 &firstSeqNumberInPlaylist, &lastSeqNumberInPlaylist); 957 958 if (mDiscontinuitySeq < 0) { 959 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 960 } 961 } 962 963 mSegmentFirstPTS = -1ll; 964 965 if (mPlaylist != NULL && mSeqNumber < 0) { 966 CHECK_GE(mStartTimeUs, 0ll); 967 968 if (mSegmentStartTimeUs < 0) { 969 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 970 // If this is a live session, start 3 segments from the end on connect 971 mSeqNumber = lastSeqNumberInPlaylist - 3; 972 if (mSeqNumber < firstSeqNumberInPlaylist) { 973 mSeqNumber = firstSeqNumberInPlaylist; 974 } 975 } else { 976 // When seeking mSegmentStartTimeUs is unavailable (< 0), we 977 // use mStartTimeUs (client supplied timestamp) to determine both start segment 978 // and relative position inside a segment 979 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 980 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 981 } 982 mStartTimeUsRelative = true; 983 FLOGV("Initial sequence number for time %lld is %d from (%d .. %d)", 984 (long long)mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 985 lastSeqNumberInPlaylist); 986 } else { 987 // When adapting or track switching, mSegmentStartTimeUs (relative 988 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute 989 // timestamps coming from the media container) is used to determine the position 990 // inside a segments. 991 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES 992 && mSeekMode != LiveSession::kSeekModeNextSample) { 993 // avoid double fetch/decode 994 // Use (mSegmentStartTimeUs + 1/2 * targetDurationUs) to search 995 // for the starting segment in new variant. 996 // If the two variants' segments are aligned, this gives the 997 // next segment. If they're not aligned, this gives the segment 998 // that overlaps no more than 1/2 * targetDurationUs. 999 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs 1000 + mPlaylist->getTargetDuration() / 2); 1001 } else { 1002 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 1003 } 1004 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 1005 if (mSeqNumber < minSeq) { 1006 mSeqNumber = minSeq; 1007 } 1008 1009 if (mSeqNumber < firstSeqNumberInPlaylist) { 1010 mSeqNumber = firstSeqNumberInPlaylist; 1011 } 1012 1013 if (mSeqNumber > lastSeqNumberInPlaylist) { 1014 mSeqNumber = lastSeqNumberInPlaylist; 1015 } 1016 FLOGV("Initial sequence number is %d from (%d .. %d)", 1017 mSeqNumber, firstSeqNumberInPlaylist, 1018 lastSeqNumberInPlaylist); 1019 } 1020 } 1021 1022 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 1023 if (mSeqNumber < firstSeqNumberInPlaylist 1024 || mSeqNumber > lastSeqNumberInPlaylist 1025 || err != OK) { 1026 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 1027 ++mNumRetries; 1028 1029 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 1030 // make sure we reach this retry logic on refresh failures 1031 // by adding an err != OK clause to all enclosing if's. 1032 1033 // refresh in increasing fraction (1/2, 1/3, ...) of the 1034 // playlist's target duration or 3 seconds, whichever is less 1035 int64_t delayUs = kMaxMonitorDelayUs; 1036 if (mPlaylist != NULL) { 1037 delayUs = mPlaylist->size() * mPlaylist->getTargetDuration() 1038 / (1 + mNumRetries); 1039 } 1040 if (delayUs > kMaxMonitorDelayUs) { 1041 delayUs = kMaxMonitorDelayUs; 1042 } 1043 FLOGV("sequence number high: %d from (%d .. %d), " 1044 "monitor in %lld (retry=%d)", 1045 mSeqNumber, firstSeqNumberInPlaylist, 1046 lastSeqNumberInPlaylist, (long long)delayUs, mNumRetries); 1047 postMonitorQueue(delayUs); 1048 return false; 1049 } 1050 1051 if (err != OK) { 1052 notifyError(err); 1053 return false; 1054 } 1055 1056 // we've missed the boat, let's start 3 segments prior to the latest sequence 1057 // number available and signal a discontinuity. 1058 1059 ALOGI("We've missed the boat, restarting playback." 1060 " mStartup=%d, was looking for %d in %d-%d", 1061 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 1062 lastSeqNumberInPlaylist); 1063 if (mStopParams != NULL) { 1064 // we should have kept on fetching until we hit the boundaries in mStopParams, 1065 // but since the segments we are supposed to fetch have already rolled off 1066 // the playlist, i.e. we have already missed the boat, we inevitably have to 1067 // skip. 1068 notifyStopReached(); 1069 return false; 1070 } 1071 mSeqNumber = lastSeqNumberInPlaylist - 3; 1072 if (mSeqNumber < firstSeqNumberInPlaylist) { 1073 mSeqNumber = firstSeqNumberInPlaylist; 1074 } 1075 discontinuity = true; 1076 1077 // fall through 1078 } else { 1079 if (mPlaylist != NULL) { 1080 ALOGE("Cannot find sequence number %d in playlist " 1081 "(contains %d - %d)", 1082 mSeqNumber, firstSeqNumberInPlaylist, 1083 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 1084 1085 if (mTSParser != NULL) { 1086 mTSParser->signalEOS(ERROR_END_OF_STREAM); 1087 // Use an empty buffer; we don't have any new data, just want to extract 1088 // potential new access units after flush. Reset mSeqNumber to 1089 // lastSeqNumberInPlaylist such that we set the correct access unit 1090 // properties in extractAndQueueAccessUnitsFromTs. 1091 sp<ABuffer> buffer = new ABuffer(0); 1092 mSeqNumber = lastSeqNumberInPlaylist; 1093 extractAndQueueAccessUnitsFromTs(buffer); 1094 } 1095 notifyError(ERROR_END_OF_STREAM); 1096 } else { 1097 // It's possible that we were never able to download the playlist. 1098 // In this case we should notify error, instead of EOS, as EOS during 1099 // prepare means we succeeded in downloading everything. 1100 ALOGE("Failed to download playlist!"); 1101 notifyError(ERROR_IO); 1102 } 1103 1104 return false; 1105 } 1106 } 1107 1108 mNumRetries = 0; 1109 1110 CHECK(mPlaylist->itemAt( 1111 mSeqNumber - firstSeqNumberInPlaylist, 1112 &uri, 1113 &itemMeta)); 1114 1115 CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq)); 1116 1117 int32_t val; 1118 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 1119 discontinuity = true; 1120 } else if (mLastDiscontinuitySeq >= 0 1121 && mDiscontinuitySeq != mLastDiscontinuitySeq) { 1122 // Seek jumped to a new discontinuity sequence. We need to signal 1123 // a format change to decoder. Decoder needs to shutdown and be 1124 // created again if seamless format change is unsupported. 1125 FLOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, " 1126 "mDiscontinuitySeq %d, mStartTimeUs %lld", 1127 mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs); 1128 discontinuity = true; 1129 } 1130 mLastDiscontinuitySeq = -1; 1131 1132 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 1133 // this avoids interleaved connections to the key and segment file. 1134 { 1135 sp<ABuffer> junk = new ABuffer(16); 1136 junk->setRange(0, 16); 1137 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 1138 true /* first */); 1139 if (err == ERROR_NOT_CONNECTED) { 1140 return false; 1141 } else if (err != OK) { 1142 notifyError(err); 1143 return false; 1144 } 1145 } 1146 1147 if ((mStartup && !mTimeChangeSignaled) || discontinuity) { 1148 // We need to signal a time discontinuity to ATSParser on the 1149 // first segment after start, or on a discontinuity segment. 1150 // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX() 1151 // to send the time discontinuity. 1152 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 1153 // If this was a live event this made no sense since 1154 // we don't have access to all the segment before the current 1155 // one. 1156 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 1157 } 1158 1159 // Setting mTimeChangeSignaled to true, so that if start time 1160 // searching goes into 2nd segment (without a discontinuity), 1161 // we don't reset time again. It causes corruption when pending 1162 // data in ATSParser is cleared. 1163 mTimeChangeSignaled = true; 1164 } 1165 1166 if (discontinuity) { 1167 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 1168 1169 // Signal a format discontinuity to ATSParser to clear partial data 1170 // from previous streams. Not doing this causes bitstream corruption. 1171 if (mTSParser != NULL) { 1172 mTSParser->signalDiscontinuity( 1173 ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */); 1174 } 1175 1176 queueDiscontinuity( 1177 ATSParser::DISCONTINUITY_FORMAT_ONLY, 1178 NULL /* extra */); 1179 1180 if (mStartup && mStartTimeUsRelative && mFirstPTSValid) { 1181 // This means we guessed mStartTimeUs to be in the previous 1182 // segment (likely very close to the end), but either video or 1183 // audio has not found start by the end of that segment. 1184 // 1185 // If this new segment is not a discontinuity, keep searching. 1186 // 1187 // If this new segment even got a discontinuity marker, just 1188 // set mStartTimeUs=0, and take all samples from now on. 1189 mStartTimeUs = 0; 1190 mFirstPTSValid = false; 1191 mIDRFound = false; 1192 mVideoBuffer->clear(); 1193 } 1194 } 1195 1196 FLOGV("fetching segment %d from (%d .. %d)", 1197 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 1198 return true; 1199} 1200 1201void PlaylistFetcher::onDownloadNext() { 1202 AString uri; 1203 sp<AMessage> itemMeta; 1204 sp<ABuffer> buffer; 1205 sp<ABuffer> tsBuffer; 1206 int32_t firstSeqNumberInPlaylist = 0; 1207 int32_t lastSeqNumberInPlaylist = 0; 1208 bool connectHTTP = true; 1209 1210 if (mDownloadState->hasSavedState()) { 1211 mDownloadState->restoreState( 1212 uri, 1213 itemMeta, 1214 buffer, 1215 tsBuffer, 1216 firstSeqNumberInPlaylist, 1217 lastSeqNumberInPlaylist); 1218 connectHTTP = false; 1219 FLOGV("resuming: '%s'", uri.c_str()); 1220 } else { 1221 if (!initDownloadState( 1222 uri, 1223 itemMeta, 1224 firstSeqNumberInPlaylist, 1225 lastSeqNumberInPlaylist)) { 1226 return; 1227 } 1228 FLOGV("fetching: '%s'", uri.c_str()); 1229 } 1230 1231 int64_t range_offset, range_length; 1232 if (!itemMeta->findInt64("range-offset", &range_offset) 1233 || !itemMeta->findInt64("range-length", &range_length)) { 1234 range_offset = 0; 1235 range_length = -1; 1236 } 1237 1238 // block-wise download 1239 bool shouldPause = false; 1240 ssize_t bytesRead; 1241 do { 1242 int64_t startUs = ALooper::GetNowUs(); 1243 bytesRead = mHTTPDownloader->fetchBlock( 1244 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, 1245 NULL /* actualURL */, connectHTTP); 1246 int64_t delayUs = ALooper::GetNowUs() - startUs; 1247 1248 if (bytesRead == ERROR_NOT_CONNECTED) { 1249 return; 1250 } 1251 if (bytesRead < 0) { 1252 status_t err = bytesRead; 1253 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 1254 notifyError(err); 1255 return; 1256 } 1257 1258 // add sample for bandwidth estimation, excluding samples from subtitles (as 1259 // its too small), or during startup/resumeUntil (when we could have more than 1260 // one connection open which affects bandwidth) 1261 if (!mStartup && mStopParams == NULL && bytesRead > 0 1262 && (mStreamTypeMask 1263 & (LiveSession::STREAMTYPE_AUDIO 1264 | LiveSession::STREAMTYPE_VIDEO))) { 1265 mSession->addBandwidthMeasurement(bytesRead, delayUs); 1266 if (delayUs > 2000000ll) { 1267 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip", 1268 bytesRead, (double)delayUs / 1.0e6); 1269 } 1270 } 1271 1272 connectHTTP = false; 1273 1274 CHECK(buffer != NULL); 1275 1276 size_t size = buffer->size(); 1277 // Set decryption range. 1278 buffer->setRange(size - bytesRead, bytesRead); 1279 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 1280 buffer->offset() == 0 /* first */); 1281 // Unset decryption range. 1282 buffer->setRange(0, size); 1283 1284 if (err != OK) { 1285 ALOGE("decryptBuffer failed w/ error %d", err); 1286 1287 notifyError(err); 1288 return; 1289 } 1290 1291 bool startUp = mStartup; // save current start up state 1292 1293 err = OK; 1294 if (bufferStartsWithTsSyncByte(buffer)) { 1295 // Incremental extraction is only supported for MPEG2 transport streams. 1296 if (tsBuffer == NULL) { 1297 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 1298 tsBuffer->setRange(0, 0); 1299 } else if (tsBuffer->capacity() != buffer->capacity()) { 1300 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 1301 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 1302 tsBuffer->setRange(tsOff, tsSize); 1303 } 1304 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 1305 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 1306 } 1307 1308 if (err == -EAGAIN) { 1309 // starting sequence number too low/high 1310 mTSParser.clear(); 1311 for (size_t i = 0; i < mPacketSources.size(); i++) { 1312 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1313 packetSource->clear(); 1314 } 1315 postMonitorQueue(); 1316 return; 1317 } else if (err == ERROR_OUT_OF_RANGE) { 1318 // reached stopping point 1319 notifyStopReached(); 1320 return; 1321 } else if (err != OK) { 1322 notifyError(err); 1323 return; 1324 } 1325 // If we're switching, post start notification 1326 // this should only be posted when the last chunk is full processed by TSParser 1327 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { 1328 CHECK(mStartTimeUsNotify != NULL); 1329 mStartTimeUsNotify->post(); 1330 mStartTimeUsNotify.clear(); 1331 shouldPause = true; 1332 } 1333 if (shouldPause || shouldPauseDownload()) { 1334 // save state and return if this is not the last chunk, 1335 // leaving the fetcher in paused state. 1336 if (bytesRead != 0) { 1337 mDownloadState->saveState( 1338 uri, 1339 itemMeta, 1340 buffer, 1341 tsBuffer, 1342 firstSeqNumberInPlaylist, 1343 lastSeqNumberInPlaylist); 1344 return; 1345 } 1346 shouldPause = true; 1347 } 1348 } while (bytesRead != 0); 1349 1350 if (bufferStartsWithTsSyncByte(buffer)) { 1351 // If we don't see a stream in the program table after fetching a full ts segment 1352 // mark it as nonexistent. 1353 ATSParser::SourceType srcTypes[] = 1354 { ATSParser::VIDEO, ATSParser::AUDIO }; 1355 LiveSession::StreamType streamTypes[] = 1356 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 1357 const size_t kNumTypes = NELEM(srcTypes); 1358 1359 for (size_t i = 0; i < kNumTypes; i++) { 1360 ATSParser::SourceType srcType = srcTypes[i]; 1361 LiveSession::StreamType streamType = streamTypes[i]; 1362 1363 sp<AnotherPacketSource> source = 1364 static_cast<AnotherPacketSource *>( 1365 mTSParser->getSource(srcType).get()); 1366 1367 if (!mTSParser->hasSource(srcType)) { 1368 ALOGW("MPEG2 Transport stream does not contain %s data.", 1369 srcType == ATSParser::VIDEO ? "video" : "audio"); 1370 1371 mStreamTypeMask &= ~streamType; 1372 mPacketSources.removeItem(streamType); 1373 } 1374 } 1375 1376 } 1377 1378 if (checkDecryptPadding(buffer) != OK) { 1379 ALOGE("Incorrect padding bytes after decryption."); 1380 notifyError(ERROR_MALFORMED); 1381 return; 1382 } 1383 1384 if (tsBuffer != NULL) { 1385 AString method; 1386 CHECK(buffer->meta()->findString("cipher-method", &method)); 1387 if ((tsBuffer->size() > 0 && method == "NONE") 1388 || tsBuffer->size() > 16) { 1389 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1390 "bytes in length."); 1391 notifyError(ERROR_MALFORMED); 1392 return; 1393 } 1394 } 1395 1396 // bulk extract non-ts files 1397 bool startUp = mStartup; 1398 if (tsBuffer == NULL) { 1399 status_t err = extractAndQueueAccessUnits(buffer, itemMeta); 1400 if (err == -EAGAIN) { 1401 // starting sequence number too low/high 1402 postMonitorQueue(); 1403 return; 1404 } else if (err == ERROR_OUT_OF_RANGE) { 1405 // reached stopping point 1406 notifyStopReached(); 1407 return; 1408 } else if (err != OK) { 1409 notifyError(err); 1410 return; 1411 } 1412 } 1413 1414 ++mSeqNumber; 1415 1416 // if adapting, pause after found the next starting point 1417 if (mSeekMode != LiveSession::kSeekModeExactPosition && startUp != mStartup) { 1418 CHECK(mStartTimeUsNotify != NULL); 1419 mStartTimeUsNotify->post(); 1420 mStartTimeUsNotify.clear(); 1421 shouldPause = true; 1422 } 1423 1424 if (!shouldPause) { 1425 postMonitorQueue(); 1426 } 1427} 1428 1429/* 1430 * returns true if we need to adjust mSeqNumber 1431 */ 1432bool PlaylistFetcher::adjustSeqNumberWithAnchorTime(int64_t anchorTimeUs) { 1433 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); 1434 1435 int64_t minDiffUs, maxDiffUs; 1436 if (mSeekMode == LiveSession::kSeekModeNextSample) { 1437 // if the previous fetcher paused in the middle of a segment, we 1438 // want to start at a segment that overlaps the last sample 1439 minDiffUs = -mPlaylist->getTargetDuration(); 1440 maxDiffUs = 0ll; 1441 } else { 1442 // if the previous fetcher paused at the end of a segment, ideally 1443 // we want to start at the segment that's roughly aligned with its 1444 // next segment, but if the two variants are not well aligned we 1445 // adjust the diff to within (-T/2, T/2) 1446 minDiffUs = -mPlaylist->getTargetDuration() / 2; 1447 maxDiffUs = mPlaylist->getTargetDuration() / 2; 1448 } 1449 1450 int32_t oldSeqNumber = mSeqNumber; 1451 ssize_t index = mSeqNumber - firstSeqNumberInPlaylist; 1452 1453 // adjust anchorTimeUs to within (minDiffUs, maxDiffUs) from mStartTimeUs 1454 int64_t diffUs = anchorTimeUs - mStartTimeUs; 1455 if (diffUs > maxDiffUs) { 1456 while (index > 0 && diffUs > maxDiffUs) { 1457 --index; 1458 1459 sp<AMessage> itemMeta; 1460 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1461 1462 int64_t itemDurationUs; 1463 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1464 1465 diffUs -= itemDurationUs; 1466 } 1467 } else if (diffUs < minDiffUs) { 1468 while (index + 1 < (ssize_t) mPlaylist->size() 1469 && diffUs < minDiffUs) { 1470 ++index; 1471 1472 sp<AMessage> itemMeta; 1473 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1474 1475 int64_t itemDurationUs; 1476 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1477 1478 diffUs += itemDurationUs; 1479 } 1480 } 1481 1482 mSeqNumber = firstSeqNumberInPlaylist + index; 1483 1484 if (mSeqNumber != oldSeqNumber) { 1485 FLOGV("guessed wrong seg number: diff %lld out of [%lld, %lld]", 1486 (long long) anchorTimeUs - mStartTimeUs, 1487 (long long) minDiffUs, 1488 (long long) maxDiffUs); 1489 return true; 1490 } 1491 return false; 1492} 1493 1494int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1495 int32_t firstSeqNumberInPlaylist = mPlaylist->getFirstSeqNumber(); 1496 1497 size_t index = 0; 1498 while (index < mPlaylist->size()) { 1499 sp<AMessage> itemMeta; 1500 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1501 size_t curDiscontinuitySeq; 1502 CHECK(itemMeta->findInt32("discontinuity-sequence", (int32_t *)&curDiscontinuitySeq)); 1503 int32_t seqNumber = firstSeqNumberInPlaylist + index; 1504 if (curDiscontinuitySeq == discontinuitySeq) { 1505 return seqNumber; 1506 } else if (curDiscontinuitySeq > discontinuitySeq) { 1507 return seqNumber <= 0 ? 0 : seqNumber - 1; 1508 } 1509 1510 ++index; 1511 } 1512 1513 return firstSeqNumberInPlaylist + mPlaylist->size(); 1514} 1515 1516int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1517 size_t index = 0; 1518 int64_t segmentStartUs = 0; 1519 while (index < mPlaylist->size()) { 1520 sp<AMessage> itemMeta; 1521 CHECK(mPlaylist->itemAt( 1522 index, NULL /* uri */, &itemMeta)); 1523 1524 int64_t itemDurationUs; 1525 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1526 1527 if (timeUs < segmentStartUs + itemDurationUs) { 1528 break; 1529 } 1530 1531 segmentStartUs += itemDurationUs; 1532 ++index; 1533 } 1534 1535 if (index >= mPlaylist->size()) { 1536 index = mPlaylist->size() - 1; 1537 } 1538 1539 return mPlaylist->getFirstSeqNumber() + index; 1540} 1541 1542const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1543 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1544 sp<MetaData> format = source->getFormat(); 1545 if (format != NULL) { 1546 // for simplicity, store a reference to the format in each unit 1547 accessUnit->meta()->setObject("format", format); 1548 } 1549 1550 if (discard) { 1551 accessUnit->meta()->setInt32("discard", discard); 1552 } 1553 1554 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1555 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1556 accessUnit->meta()->setInt64("segmentFirstTimeUs", mSegmentFirstPTS); 1557 accessUnit->meta()->setInt64("segmentDurationUs", getSegmentDurationUs(mSeqNumber)); 1558 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 1559 accessUnit->meta()->setInt64("playlistTimeUs", mPlaylistTimeUs); 1560 } 1561 return accessUnit; 1562} 1563 1564bool PlaylistFetcher::isStartTimeReached(int64_t timeUs) { 1565 if (!mFirstPTSValid) { 1566 mFirstTimeUs = timeUs; 1567 mFirstPTSValid = true; 1568 } 1569 bool startTimeReached = true; 1570 if (mStartTimeUsRelative) { 1571 FLOGV("startTimeUsRelative, timeUs (%lld) - %lld = %lld", 1572 (long long)timeUs, 1573 (long long)mFirstTimeUs, 1574 (long long)(timeUs - mFirstTimeUs)); 1575 timeUs -= mFirstTimeUs; 1576 if (timeUs < 0) { 1577 FLOGV("clamp negative timeUs to 0"); 1578 timeUs = 0; 1579 } 1580 startTimeReached = (timeUs >= mStartTimeUs); 1581 } 1582 return startTimeReached; 1583} 1584 1585status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1586 if (mTSParser == NULL) { 1587 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1588 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1589 } 1590 1591 if (mNextPTSTimeUs >= 0ll) { 1592 sp<AMessage> extra = new AMessage; 1593 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1594 // ATSParser from skewing the timestamps of access units. 1595 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1596 1597 // When adapting, signal a recent media time to the parser, 1598 // so that PTS wrap around is handled for the new variant. 1599 if (mStartTimeUs >= 0 && !mStartTimeUsRelative) { 1600 extra->setInt64(IStreamListener::kKeyRecentMediaTimeUs, mStartTimeUs); 1601 } 1602 1603 mTSParser->signalDiscontinuity( 1604 ATSParser::DISCONTINUITY_TIME, extra); 1605 1606 mNextPTSTimeUs = -1ll; 1607 } 1608 1609 size_t offset = 0; 1610 while (offset + 188 <= buffer->size()) { 1611 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1612 1613 if (err != OK) { 1614 return err; 1615 } 1616 1617 offset += 188; 1618 } 1619 // setRange to indicate consumed bytes. 1620 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1621 1622 if (mSegmentFirstPTS < 0ll) { 1623 // get the smallest first PTS from all streams present in this parser 1624 for (size_t i = mPacketSources.size(); i-- > 0;) { 1625 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1626 if (stream == LiveSession::STREAMTYPE_SUBTITLES) { 1627 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1628 return ERROR_MALFORMED; 1629 } 1630 if (stream == LiveSession::STREAMTYPE_METADATA) { 1631 continue; 1632 } 1633 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); 1634 sp<AnotherPacketSource> source = 1635 static_cast<AnotherPacketSource *>( 1636 mTSParser->getSource(type).get()); 1637 1638 if (source == NULL) { 1639 continue; 1640 } 1641 sp<AMessage> meta = source->getMetaAfterLastDequeued(0); 1642 if (meta != NULL) { 1643 int64_t timeUs; 1644 CHECK(meta->findInt64("timeUs", &timeUs)); 1645 if (mSegmentFirstPTS < 0ll || timeUs < mSegmentFirstPTS) { 1646 mSegmentFirstPTS = timeUs; 1647 } 1648 } 1649 } 1650 if (mSegmentFirstPTS < 0ll) { 1651 // didn't find any TS packet, can return early 1652 return OK; 1653 } 1654 if (!mStartTimeUsRelative) { 1655 // mStartup 1656 // mStartup is true until we have queued a packet for all the streams 1657 // we are fetching. We queue packets whose timestamps are greater than 1658 // mStartTimeUs. 1659 // mSegmentStartTimeUs >= 0 1660 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1661 // adjustSeqNumberWithAnchorTime(timeUs) == true 1662 // we guessed a seq number that's either too large or too small. 1663 // If this happens, we'll adjust mSeqNumber and restart fetching from new 1664 // location. Note that we only want to adjust once, so set mSegmentStartTimeUs 1665 // to -1 so that we don't enter this chunk next time. 1666 if (mStartup && mSegmentStartTimeUs >= 0 1667 && adjustSeqNumberWithAnchorTime(mSegmentFirstPTS)) { 1668 mStartTimeUsNotify = mNotify->dup(); 1669 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1670 mStartTimeUsNotify->setString("uri", mURI); 1671 mIDRFound = false; 1672 mSegmentStartTimeUs = -1; 1673 return -EAGAIN; 1674 } 1675 } 1676 } 1677 1678 status_t err = OK; 1679 for (size_t i = mPacketSources.size(); i-- > 0;) { 1680 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1681 1682 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1683 if (stream == LiveSession::STREAMTYPE_SUBTITLES) { 1684 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1685 return ERROR_MALFORMED; 1686 } 1687 1688 const char *key = LiveSession::getKeyForStream(stream); 1689 ATSParser::SourceType type =LiveSession::getSourceTypeForStream(stream); 1690 1691 sp<AnotherPacketSource> source = 1692 static_cast<AnotherPacketSource *>( 1693 mTSParser->getSource(type).get()); 1694 1695 if (source == NULL) { 1696 continue; 1697 } 1698 1699 const char *mime; 1700 sp<MetaData> format = source->getFormat(); 1701 bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime) 1702 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); 1703 1704 sp<ABuffer> accessUnit; 1705 status_t finalResult; 1706 while (source->hasBufferAvailable(&finalResult) 1707 && source->dequeueAccessUnit(&accessUnit) == OK) { 1708 1709 int64_t timeUs; 1710 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1711 1712 if (mStartup) { 1713 bool startTimeReached = isStartTimeReached(timeUs); 1714 1715 if (!startTimeReached || (isAvc && !mIDRFound)) { 1716 // buffer up to the closest preceding IDR frame in the next segement, 1717 // or the closest succeeding IDR frame after the exact position 1718 FSLOGV(stream, "timeUs(%lld)-mStartTimeUs(%lld)=%lld, mIDRFound=%d", 1719 (long long)timeUs, 1720 (long long)mStartTimeUs, 1721 (long long)timeUs - mStartTimeUs, 1722 mIDRFound); 1723 if (isAvc) { 1724 if (IsIDR(accessUnit)) { 1725 mVideoBuffer->clear(); 1726 FSLOGV(stream, "found IDR, clear mVideoBuffer"); 1727 mIDRFound = true; 1728 } 1729 if (mIDRFound && mStartTimeUsRelative && !startTimeReached) { 1730 mVideoBuffer->queueAccessUnit(accessUnit); 1731 FSLOGV(stream, "saving AVC video AccessUnit"); 1732 } 1733 } 1734 if (!startTimeReached || (isAvc && !mIDRFound)) { 1735 continue; 1736 } 1737 } 1738 } 1739 1740 if (mStartTimeUsNotify != NULL) { 1741 uint32_t streamMask = 0; 1742 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1743 if ((mStreamTypeMask & mPacketSources.keyAt(i)) 1744 && !(streamMask & mPacketSources.keyAt(i))) { 1745 streamMask |= mPacketSources.keyAt(i); 1746 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1747 FSLOGV(stream, "found start point, timeUs=%lld, streamMask becomes %x", 1748 (long long)timeUs, streamMask); 1749 1750 if (streamMask == mStreamTypeMask) { 1751 FLOGV("found start point for all streams"); 1752 mStartup = false; 1753 } 1754 } 1755 } 1756 1757 if (mStopParams != NULL) { 1758 int32_t discontinuitySeq; 1759 int64_t stopTimeUs; 1760 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1761 || discontinuitySeq > mDiscontinuitySeq 1762 || !mStopParams->findInt64(key, &stopTimeUs) 1763 || (discontinuitySeq == mDiscontinuitySeq 1764 && timeUs >= stopTimeUs)) { 1765 FSLOGV(stream, "reached stop point, timeUs=%lld", (long long)timeUs); 1766 mStreamTypeMask &= ~stream; 1767 mPacketSources.removeItemsAt(i); 1768 break; 1769 } 1770 } 1771 1772 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1773 const bool discard = true; 1774 status_t status; 1775 while (mVideoBuffer->hasBufferAvailable(&status)) { 1776 sp<ABuffer> videoBuffer; 1777 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1778 setAccessUnitProperties(videoBuffer, source, discard); 1779 packetSource->queueAccessUnit(videoBuffer); 1780 int64_t bufferTimeUs; 1781 CHECK(videoBuffer->meta()->findInt64("timeUs", &bufferTimeUs)); 1782 FSLOGV(stream, "queueAccessUnit (saved), timeUs=%lld", 1783 (long long)bufferTimeUs); 1784 } 1785 } else if (stream == LiveSession::STREAMTYPE_METADATA && !mHasMetadata) { 1786 mHasMetadata = true; 1787 sp<AMessage> notify = mNotify->dup(); 1788 notify->setInt32("what", kWhatMetadataDetected); 1789 notify->post(); 1790 } 1791 1792 setAccessUnitProperties(accessUnit, source); 1793 packetSource->queueAccessUnit(accessUnit); 1794 FSLOGV(stream, "queueAccessUnit, timeUs=%lld", (long long)timeUs); 1795 } 1796 1797 if (err != OK) { 1798 break; 1799 } 1800 } 1801 1802 if (err != OK) { 1803 for (size_t i = mPacketSources.size(); i-- > 0;) { 1804 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1805 packetSource->clear(); 1806 } 1807 return err; 1808 } 1809 1810 if (!mStreamTypeMask) { 1811 // Signal gap is filled between original and new stream. 1812 FLOGV("reached stop point for all streams"); 1813 return ERROR_OUT_OF_RANGE; 1814 } 1815 1816 return OK; 1817} 1818 1819/* static */ 1820bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1821 const sp<ABuffer> &buffer) { 1822 size_t pos = 0; 1823 1824 // skip possible BOM 1825 if (buffer->size() >= pos + 3 && 1826 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1827 pos += 3; 1828 } 1829 1830 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1831 if (buffer->size() < pos + 6 || 1832 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1833 return false; 1834 } 1835 pos += 6; 1836 1837 if (buffer->size() == pos) { 1838 return true; 1839 } 1840 1841 uint8_t sep = buffer->data()[pos]; 1842 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1843} 1844 1845status_t PlaylistFetcher::extractAndQueueAccessUnits( 1846 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1847 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1848 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1849 ALOGE("This stream only contains subtitles."); 1850 return ERROR_MALFORMED; 1851 } 1852 1853 const sp<AnotherPacketSource> packetSource = 1854 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1855 1856 int64_t durationUs; 1857 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1858 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1859 buffer->meta()->setInt64("durationUs", durationUs); 1860 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1861 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1862 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1863 packetSource->queueAccessUnit(buffer); 1864 return OK; 1865 } 1866 1867 if (mNextPTSTimeUs >= 0ll) { 1868 mNextPTSTimeUs = -1ll; 1869 } 1870 1871 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1872 // stream prefixed by an ID3 tag. 1873 1874 bool firstID3Tag = true; 1875 uint64_t PTS = 0; 1876 1877 for (;;) { 1878 // Make sure to skip all ID3 tags preceding the audio data. 1879 // At least one must be present to provide the PTS timestamp. 1880 1881 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1882 if (!id3.isValid()) { 1883 if (firstID3Tag) { 1884 ALOGE("Unable to parse ID3 tag."); 1885 return ERROR_MALFORMED; 1886 } else { 1887 break; 1888 } 1889 } 1890 1891 if (firstID3Tag) { 1892 bool found = false; 1893 1894 ID3::Iterator it(id3, "PRIV"); 1895 while (!it.done()) { 1896 size_t length; 1897 const uint8_t *data = it.getData(&length); 1898 1899 static const char *kMatchName = 1900 "com.apple.streaming.transportStreamTimestamp"; 1901 static const size_t kMatchNameLen = strlen(kMatchName); 1902 1903 if (length == kMatchNameLen + 1 + 8 1904 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1905 found = true; 1906 PTS = U64_AT(&data[kMatchNameLen + 1]); 1907 } 1908 1909 it.next(); 1910 } 1911 1912 if (!found) { 1913 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1914 return ERROR_MALFORMED; 1915 } 1916 } 1917 1918 // skip the ID3 tag 1919 buffer->setRange( 1920 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1921 1922 firstID3Tag = false; 1923 } 1924 1925 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1926 ALOGW("This stream only contains audio data!"); 1927 1928 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1929 1930 if (mStreamTypeMask == 0) { 1931 return OK; 1932 } 1933 } 1934 1935 sp<AnotherPacketSource> packetSource = 1936 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1937 1938 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1939 ABitReader bits(buffer->data(), buffer->size()); 1940 1941 // adts_fixed_header 1942 1943 CHECK_EQ(bits.getBits(12), 0xfffu); 1944 bits.skipBits(3); // ID, layer 1945 bool protection_absent __unused = bits.getBits(1) != 0; 1946 1947 unsigned profile = bits.getBits(2); 1948 CHECK_NE(profile, 3u); 1949 unsigned sampling_freq_index = bits.getBits(4); 1950 bits.getBits(1); // private_bit 1951 unsigned channel_configuration = bits.getBits(3); 1952 CHECK_NE(channel_configuration, 0u); 1953 bits.skipBits(2); // original_copy, home 1954 1955 sp<MetaData> meta = MakeAACCodecSpecificData( 1956 profile, sampling_freq_index, channel_configuration); 1957 1958 meta->setInt32(kKeyIsADTS, true); 1959 1960 packetSource->setFormat(meta); 1961 } 1962 1963 int64_t numSamples = 0ll; 1964 int32_t sampleRate; 1965 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1966 1967 int64_t timeUs = (PTS * 100ll) / 9ll; 1968 if (mStartup && !mFirstPTSValid) { 1969 mFirstPTSValid = true; 1970 mFirstTimeUs = timeUs; 1971 } 1972 1973 if (mSegmentFirstPTS < 0ll) { 1974 mSegmentFirstPTS = timeUs; 1975 if (!mStartTimeUsRelative) { 1976 // Duplicated logic from how we handle .ts playlists. 1977 if (mStartup && mSegmentStartTimeUs >= 0 1978 && adjustSeqNumberWithAnchorTime(timeUs)) { 1979 mSegmentStartTimeUs = -1; 1980 return -EAGAIN; 1981 } 1982 } 1983 } 1984 1985 size_t offset = 0; 1986 while (offset < buffer->size()) { 1987 const uint8_t *adtsHeader = buffer->data() + offset; 1988 CHECK_LT(offset + 5, buffer->size()); 1989 1990 unsigned aac_frame_length = 1991 ((adtsHeader[3] & 3) << 11) 1992 | (adtsHeader[4] << 3) 1993 | (adtsHeader[5] >> 5); 1994 1995 if (aac_frame_length == 0) { 1996 const uint8_t *id3Header = adtsHeader; 1997 if (!memcmp(id3Header, "ID3", 3)) { 1998 ID3 id3(id3Header, buffer->size() - offset, true); 1999 if (id3.isValid()) { 2000 offset += id3.rawSize(); 2001 continue; 2002 }; 2003 } 2004 return ERROR_MALFORMED; 2005 } 2006 2007 CHECK_LE(offset + aac_frame_length, buffer->size()); 2008 2009 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 2010 offset += aac_frame_length; 2011 2012 // Each AAC frame encodes 1024 samples. 2013 numSamples += 1024; 2014 2015 if (mStartup) { 2016 int64_t startTimeUs = unitTimeUs; 2017 if (mStartTimeUsRelative) { 2018 startTimeUs -= mFirstTimeUs; 2019 if (startTimeUs < 0) { 2020 startTimeUs = 0; 2021 } 2022 } 2023 if (startTimeUs < mStartTimeUs) { 2024 continue; 2025 } 2026 2027 if (mStartTimeUsNotify != NULL) { 2028 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 2029 mStartup = false; 2030 } 2031 } 2032 2033 if (mStopParams != NULL) { 2034 int32_t discontinuitySeq; 2035 int64_t stopTimeUs; 2036 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 2037 || discontinuitySeq > mDiscontinuitySeq 2038 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 2039 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 2040 mStreamTypeMask = 0; 2041 mPacketSources.clear(); 2042 return ERROR_OUT_OF_RANGE; 2043 } 2044 } 2045 2046 sp<ABuffer> unit = new ABuffer(aac_frame_length); 2047 memcpy(unit->data(), adtsHeader, aac_frame_length); 2048 2049 unit->meta()->setInt64("timeUs", unitTimeUs); 2050 setAccessUnitProperties(unit, packetSource); 2051 packetSource->queueAccessUnit(unit); 2052 } 2053 2054 return OK; 2055} 2056 2057void PlaylistFetcher::updateDuration() { 2058 int64_t durationUs = 0ll; 2059 for (size_t index = 0; index < mPlaylist->size(); ++index) { 2060 sp<AMessage> itemMeta; 2061 CHECK(mPlaylist->itemAt( 2062 index, NULL /* uri */, &itemMeta)); 2063 2064 int64_t itemDurationUs; 2065 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 2066 2067 durationUs += itemDurationUs; 2068 } 2069 2070 sp<AMessage> msg = mNotify->dup(); 2071 msg->setInt32("what", kWhatDurationUpdate); 2072 msg->setInt64("durationUs", durationUs); 2073 msg->post(); 2074} 2075 2076void PlaylistFetcher::updateTargetDuration() { 2077 sp<AMessage> msg = mNotify->dup(); 2078 msg->setInt32("what", kWhatTargetDurationUpdate); 2079 msg->setInt64("targetDurationUs", mPlaylist->getTargetDuration()); 2080 msg->post(); 2081} 2082 2083} // namespace android 2084