PlaylistFetcher.cpp revision f0d689934e70d3e5b3784265e890377db04c7c1d
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20 21#include "PlaylistFetcher.h" 22 23#include "LiveDataSource.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26 27#include "include/avc_utils.h" 28#include "include/HTTPBase.h" 29#include "include/ID3.h" 30#include "mpeg2ts/AnotherPacketSource.h" 31 32#include <media/IStreamSource.h> 33#include <media/stagefright/foundation/ABitReader.h> 34#include <media/stagefright/foundation/ABuffer.h> 35#include <media/stagefright/foundation/ADebug.h> 36#include <media/stagefright/foundation/hexdump.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaDefs.h> 39#include <media/stagefright/MetaData.h> 40#include <media/stagefright/Utils.h> 41 42#include <ctype.h> 43#include <inttypes.h> 44#include <openssl/aes.h> 45#include <openssl/md5.h> 46 47namespace android { 48 49// static 50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 52const int32_t PlaylistFetcher::kDownloadBlockSize = 2048; 53const int32_t PlaylistFetcher::kNumSkipFrames = 10; 54 55PlaylistFetcher::PlaylistFetcher( 56 const sp<AMessage> ¬ify, 57 const sp<LiveSession> &session, 58 const char *uri, 59 int32_t subtitleGeneration) 60 : mNotify(notify), 61 mStartTimeUsNotify(notify->dup()), 62 mSession(session), 63 mURI(uri), 64 mStreamTypeMask(0), 65 mStartTimeUs(-1ll), 66 mSegmentStartTimeUs(-1ll), 67 mDiscontinuitySeq(-1ll), 68 mStartTimeUsRelative(false), 69 mLastPlaylistFetchTimeUs(-1ll), 70 mSeqNumber(-1), 71 mNumRetries(0), 72 mStartup(true), 73 mAdaptive(false), 74 mPrepared(false), 75 mNextPTSTimeUs(-1ll), 76 mMonitorQueueGeneration(0), 77 mSubtitleGeneration(subtitleGeneration), 78 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 79 mFirstPTSValid(false), 80 mAbsoluteTimeAnchorUs(0ll), 81 mVideoBuffer(new AnotherPacketSource(NULL)) { 82 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 83 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 84 mStartTimeUsNotify->setInt32("streamMask", 0); 85} 86 87PlaylistFetcher::~PlaylistFetcher() { 88} 89 90int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 91 CHECK(mPlaylist != NULL); 92 93 int32_t firstSeqNumberInPlaylist; 94 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 95 "media-sequence", &firstSeqNumberInPlaylist)) { 96 firstSeqNumberInPlaylist = 0; 97 } 98 99 int32_t lastSeqNumberInPlaylist = 100 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 101 102 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 103 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 104 105 int64_t segmentStartUs = 0ll; 106 for (int32_t index = 0; 107 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 108 sp<AMessage> itemMeta; 109 CHECK(mPlaylist->itemAt( 110 index, NULL /* uri */, &itemMeta)); 111 112 int64_t itemDurationUs; 113 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 114 115 segmentStartUs += itemDurationUs; 116 } 117 118 return segmentStartUs; 119} 120 121int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 122 int64_t nowUs = ALooper::GetNowUs(); 123 124 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 125 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 126 return 0ll; 127 } 128 129 if (mPlaylist->isComplete()) { 130 return (~0llu >> 1); 131 } 132 133 int32_t targetDurationSecs; 134 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 135 136 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 137 138 int64_t minPlaylistAgeUs; 139 140 switch (mRefreshState) { 141 case INITIAL_MINIMUM_RELOAD_DELAY: 142 { 143 size_t n = mPlaylist->size(); 144 if (n > 0) { 145 sp<AMessage> itemMeta; 146 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 147 148 int64_t itemDurationUs; 149 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 150 151 minPlaylistAgeUs = itemDurationUs; 152 break; 153 } 154 155 // fall through 156 } 157 158 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 159 { 160 minPlaylistAgeUs = targetDurationUs / 2; 161 break; 162 } 163 164 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 165 { 166 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 167 break; 168 } 169 170 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 171 { 172 minPlaylistAgeUs = targetDurationUs * 3; 173 break; 174 } 175 176 default: 177 TRESPASS(); 178 break; 179 } 180 181 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 182 return delayUs > 0ll ? delayUs : 0ll; 183} 184 185status_t PlaylistFetcher::decryptBuffer( 186 size_t playlistIndex, const sp<ABuffer> &buffer, 187 bool first) { 188 sp<AMessage> itemMeta; 189 bool found = false; 190 AString method; 191 192 for (ssize_t i = playlistIndex; i >= 0; --i) { 193 AString uri; 194 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 195 196 if (itemMeta->findString("cipher-method", &method)) { 197 found = true; 198 break; 199 } 200 } 201 202 if (!found) { 203 method = "NONE"; 204 } 205 buffer->meta()->setString("cipher-method", method.c_str()); 206 207 if (method == "NONE") { 208 return OK; 209 } else if (!(method == "AES-128")) { 210 ALOGE("Unsupported cipher method '%s'", method.c_str()); 211 return ERROR_UNSUPPORTED; 212 } 213 214 AString keyURI; 215 if (!itemMeta->findString("cipher-uri", &keyURI)) { 216 ALOGE("Missing key uri"); 217 return ERROR_MALFORMED; 218 } 219 220 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 221 222 sp<ABuffer> key; 223 if (index >= 0) { 224 key = mAESKeyForURI.valueAt(index); 225 } else { 226 ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); 227 228 if (err < 0) { 229 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 230 return ERROR_IO; 231 } else if (key->size() != 16) { 232 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 233 return ERROR_MALFORMED; 234 } 235 236 mAESKeyForURI.add(keyURI, key); 237 } 238 239 AES_KEY aes_key; 240 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 241 ALOGE("failed to set AES decryption key."); 242 return UNKNOWN_ERROR; 243 } 244 245 size_t n = buffer->size(); 246 if (!n) { 247 return OK; 248 } 249 CHECK(n % 16 == 0); 250 251 if (first) { 252 // If decrypting the first block in a file, read the iv from the manifest 253 // or derive the iv from the file's sequence number. 254 255 AString iv; 256 if (itemMeta->findString("cipher-iv", &iv)) { 257 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 258 || iv.size() != 16 * 2 + 2) { 259 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 260 return ERROR_MALFORMED; 261 } 262 263 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 264 for (size_t i = 0; i < 16; ++i) { 265 char c1 = tolower(iv.c_str()[2 + 2 * i]); 266 char c2 = tolower(iv.c_str()[3 + 2 * i]); 267 if (!isxdigit(c1) || !isxdigit(c2)) { 268 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 269 return ERROR_MALFORMED; 270 } 271 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 272 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 273 274 mAESInitVec[i] = nibble1 << 4 | nibble2; 275 } 276 } else { 277 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 278 mAESInitVec[15] = mSeqNumber & 0xff; 279 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 280 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 281 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 282 } 283 } 284 285 AES_cbc_encrypt( 286 buffer->data(), buffer->data(), buffer->size(), 287 &aes_key, mAESInitVec, AES_DECRYPT); 288 289 return OK; 290} 291 292status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 293 status_t err; 294 AString method; 295 CHECK(buffer->meta()->findString("cipher-method", &method)); 296 if (method == "NONE") { 297 return OK; 298 } 299 300 uint8_t padding = 0; 301 if (buffer->size() > 0) { 302 padding = buffer->data()[buffer->size() - 1]; 303 } 304 305 if (padding > 16) { 306 return ERROR_MALFORMED; 307 } 308 309 for (size_t i = buffer->size() - padding; i < padding; i++) { 310 if (buffer->data()[i] != padding) { 311 return ERROR_MALFORMED; 312 } 313 } 314 315 buffer->setRange(buffer->offset(), buffer->size() - padding); 316 return OK; 317} 318 319void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 320 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 321 if (maxDelayUs < minDelayUs) { 322 maxDelayUs = minDelayUs; 323 } 324 if (delayUs > maxDelayUs) { 325 ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); 326 delayUs = maxDelayUs; 327 } 328 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 329 msg->setInt32("generation", mMonitorQueueGeneration); 330 msg->post(delayUs); 331} 332 333void PlaylistFetcher::cancelMonitorQueue() { 334 ++mMonitorQueueGeneration; 335} 336 337void PlaylistFetcher::startAsync( 338 const sp<AnotherPacketSource> &audioSource, 339 const sp<AnotherPacketSource> &videoSource, 340 const sp<AnotherPacketSource> &subtitleSource, 341 int64_t startTimeUs, 342 int64_t segmentStartTimeUs, 343 int32_t startDiscontinuitySeq, 344 bool adaptive) { 345 sp<AMessage> msg = new AMessage(kWhatStart, id()); 346 347 uint32_t streamTypeMask = 0ul; 348 349 if (audioSource != NULL) { 350 msg->setPointer("audioSource", audioSource.get()); 351 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 352 } 353 354 if (videoSource != NULL) { 355 msg->setPointer("videoSource", videoSource.get()); 356 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 357 } 358 359 if (subtitleSource != NULL) { 360 msg->setPointer("subtitleSource", subtitleSource.get()); 361 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 362 } 363 364 msg->setInt32("streamTypeMask", streamTypeMask); 365 msg->setInt64("startTimeUs", startTimeUs); 366 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 367 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 368 msg->setInt32("adaptive", adaptive); 369 msg->post(); 370} 371 372void PlaylistFetcher::pauseAsync() { 373 (new AMessage(kWhatPause, id()))->post(); 374} 375 376void PlaylistFetcher::stopAsync(bool clear) { 377 sp<AMessage> msg = new AMessage(kWhatStop, id()); 378 msg->setInt32("clear", clear); 379 msg->post(); 380} 381 382void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 383 AMessage* msg = new AMessage(kWhatResumeUntil, id()); 384 msg->setMessage("params", params); 385 msg->post(); 386} 387 388void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 389 switch (msg->what()) { 390 case kWhatStart: 391 { 392 status_t err = onStart(msg); 393 394 sp<AMessage> notify = mNotify->dup(); 395 notify->setInt32("what", kWhatStarted); 396 notify->setInt32("err", err); 397 notify->post(); 398 break; 399 } 400 401 case kWhatPause: 402 { 403 onPause(); 404 405 sp<AMessage> notify = mNotify->dup(); 406 notify->setInt32("what", kWhatPaused); 407 notify->post(); 408 break; 409 } 410 411 case kWhatStop: 412 { 413 onStop(msg); 414 415 sp<AMessage> notify = mNotify->dup(); 416 notify->setInt32("what", kWhatStopped); 417 notify->post(); 418 break; 419 } 420 421 case kWhatMonitorQueue: 422 case kWhatDownloadNext: 423 { 424 int32_t generation; 425 CHECK(msg->findInt32("generation", &generation)); 426 427 if (generation != mMonitorQueueGeneration) { 428 // Stale event 429 break; 430 } 431 432 if (msg->what() == kWhatMonitorQueue) { 433 onMonitorQueue(); 434 } else { 435 onDownloadNext(); 436 } 437 break; 438 } 439 440 case kWhatResumeUntil: 441 { 442 onResumeUntil(msg); 443 break; 444 } 445 446 default: 447 TRESPASS(); 448 } 449} 450 451status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 452 mPacketSources.clear(); 453 454 uint32_t streamTypeMask; 455 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 456 457 int64_t startTimeUs; 458 int64_t segmentStartTimeUs; 459 int32_t startDiscontinuitySeq; 460 int32_t adaptive; 461 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 462 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 463 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 464 CHECK(msg->findInt32("adaptive", &adaptive)); 465 466 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 467 void *ptr; 468 CHECK(msg->findPointer("audioSource", &ptr)); 469 470 mPacketSources.add( 471 LiveSession::STREAMTYPE_AUDIO, 472 static_cast<AnotherPacketSource *>(ptr)); 473 } 474 475 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 476 void *ptr; 477 CHECK(msg->findPointer("videoSource", &ptr)); 478 479 mPacketSources.add( 480 LiveSession::STREAMTYPE_VIDEO, 481 static_cast<AnotherPacketSource *>(ptr)); 482 } 483 484 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 485 void *ptr; 486 CHECK(msg->findPointer("subtitleSource", &ptr)); 487 488 mPacketSources.add( 489 LiveSession::STREAMTYPE_SUBTITLES, 490 static_cast<AnotherPacketSource *>(ptr)); 491 } 492 493 mStreamTypeMask = streamTypeMask; 494 495 mSegmentStartTimeUs = segmentStartTimeUs; 496 mDiscontinuitySeq = startDiscontinuitySeq; 497 498 if (startTimeUs >= 0) { 499 mStartTimeUs = startTimeUs; 500 mSeqNumber = -1; 501 mStartup = true; 502 mPrepared = false; 503 mAdaptive = adaptive; 504 } 505 506 postMonitorQueue(); 507 508 return OK; 509} 510 511void PlaylistFetcher::onPause() { 512 cancelMonitorQueue(); 513} 514 515void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 516 cancelMonitorQueue(); 517 518 int32_t clear; 519 CHECK(msg->findInt32("clear", &clear)); 520 if (clear) { 521 for (size_t i = 0; i < mPacketSources.size(); i++) { 522 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 523 packetSource->clear(); 524 } 525 } 526 527 mPacketSources.clear(); 528 mStreamTypeMask = 0; 529} 530 531// Resume until we have reached the boundary timestamps listed in `msg`; when 532// the remaining time is too short (within a resume threshold) stop immediately 533// instead. 534status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 535 sp<AMessage> params; 536 CHECK(msg->findMessage("params", ¶ms)); 537 538 bool stop = false; 539 for (size_t i = 0; i < mPacketSources.size(); i++) { 540 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 541 542 const char *stopKey; 543 int streamType = mPacketSources.keyAt(i); 544 switch (streamType) { 545 case LiveSession::STREAMTYPE_VIDEO: 546 stopKey = "timeUsVideo"; 547 break; 548 549 case LiveSession::STREAMTYPE_AUDIO: 550 stopKey = "timeUsAudio"; 551 break; 552 553 case LiveSession::STREAMTYPE_SUBTITLES: 554 stopKey = "timeUsSubtitle"; 555 break; 556 557 default: 558 TRESPASS(); 559 } 560 561 // Don't resume if we would stop within a resume threshold. 562 int32_t discontinuitySeq; 563 int64_t latestTimeUs = 0, stopTimeUs = 0; 564 sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta(); 565 if (latestMeta != NULL 566 && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) 567 && discontinuitySeq == mDiscontinuitySeq 568 && latestMeta->findInt64("timeUs", &latestTimeUs) 569 && params->findInt64(stopKey, &stopTimeUs) 570 && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { 571 stop = true; 572 } 573 } 574 575 if (stop) { 576 for (size_t i = 0; i < mPacketSources.size(); i++) { 577 mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); 578 } 579 stopAsync(/* clear = */ false); 580 return OK; 581 } 582 583 mStopParams = params; 584 postMonitorQueue(); 585 586 return OK; 587} 588 589void PlaylistFetcher::notifyError(status_t err) { 590 sp<AMessage> notify = mNotify->dup(); 591 notify->setInt32("what", kWhatError); 592 notify->setInt32("err", err); 593 notify->post(); 594} 595 596void PlaylistFetcher::queueDiscontinuity( 597 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 598 for (size_t i = 0; i < mPacketSources.size(); ++i) { 599 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 600 // (seek will discard buffer by abandoning old fetchers) 601 mPacketSources.valueAt(i)->queueDiscontinuity( 602 type, extra, false /* discard */); 603 } 604} 605 606void PlaylistFetcher::onMonitorQueue() { 607 bool downloadMore = false; 608 refreshPlaylist(); 609 610 int32_t targetDurationSecs; 611 int64_t targetDurationUs = kMinBufferedDurationUs; 612 if (mPlaylist != NULL) { 613 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 614 targetDurationUs = targetDurationSecs * 1000000ll; 615 } 616 617 // buffer at least 3 times the target duration, or up to 10 seconds 618 int64_t durationToBufferUs = targetDurationUs * 3; 619 if (durationToBufferUs > kMinBufferedDurationUs) { 620 durationToBufferUs = kMinBufferedDurationUs; 621 } 622 623 int64_t bufferedDurationUs = 0ll; 624 status_t finalResult = NOT_ENOUGH_DATA; 625 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 626 sp<AnotherPacketSource> packetSource = 627 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 628 629 bufferedDurationUs = 630 packetSource->getBufferedDurationUs(&finalResult); 631 finalResult = OK; 632 } else { 633 // Use max stream duration to prevent us from waiting on a non-existent stream; 634 // when we cannot make out from the manifest what streams are included in a playlist 635 // we might assume extra streams. 636 for (size_t i = 0; i < mPacketSources.size(); ++i) { 637 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 638 continue; 639 } 640 641 int64_t bufferedStreamDurationUs = 642 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 643 ALOGV("buffered %" PRId64 " for stream %d", 644 bufferedStreamDurationUs, mPacketSources.keyAt(i)); 645 if (bufferedStreamDurationUs > bufferedDurationUs) { 646 bufferedDurationUs = bufferedStreamDurationUs; 647 } 648 } 649 } 650 downloadMore = (bufferedDurationUs < durationToBufferUs); 651 652 // signal start if buffered up at least the target size 653 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 654 mPrepared = true; 655 656 ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", 657 bufferedDurationUs, targetDurationUs); 658 sp<AMessage> msg = mNotify->dup(); 659 msg->setInt32("what", kWhatTemporarilyDoneFetching); 660 msg->post(); 661 } 662 663 if (finalResult == OK && downloadMore) { 664 ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", 665 bufferedDurationUs, durationToBufferUs); 666 // delay the next download slightly; hopefully this gives other concurrent fetchers 667 // a better chance to run. 668 // onDownloadNext(); 669 sp<AMessage> msg = new AMessage(kWhatDownloadNext, id()); 670 msg->setInt32("generation", mMonitorQueueGeneration); 671 msg->post(1000l); 672 } else { 673 // Nothing to do yet, try again in a second. 674 675 sp<AMessage> msg = mNotify->dup(); 676 msg->setInt32("what", kWhatTemporarilyDoneFetching); 677 msg->post(); 678 679 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 680 ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", 681 delayUs, bufferedDurationUs, durationToBufferUs); 682 // :TRICKY: need to enforce minimum delay because the delay to 683 // refresh the playlist will become 0 684 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 685 } 686} 687 688status_t PlaylistFetcher::refreshPlaylist() { 689 if (delayUsToRefreshPlaylist() <= 0) { 690 bool unchanged; 691 sp<M3UParser> playlist = mSession->fetchPlaylist( 692 mURI.c_str(), mPlaylistHash, &unchanged); 693 694 if (playlist == NULL) { 695 if (unchanged) { 696 // We succeeded in fetching the playlist, but it was 697 // unchanged from the last time we tried. 698 699 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 700 mRefreshState = (RefreshState)(mRefreshState + 1); 701 } 702 } else { 703 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 704 return ERROR_IO; 705 } 706 } else { 707 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 708 mPlaylist = playlist; 709 710 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 711 updateDuration(); 712 } 713 } 714 715 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 716 } 717 return OK; 718} 719 720// static 721bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 722 return buffer->size() > 0 && buffer->data()[0] == 0x47; 723} 724 725void PlaylistFetcher::onDownloadNext() { 726 status_t err = refreshPlaylist(); 727 int32_t firstSeqNumberInPlaylist = 0; 728 int32_t lastSeqNumberInPlaylist = 0; 729 bool discontinuity = false; 730 731 if (mPlaylist != NULL) { 732 if (mPlaylist->meta() != NULL) { 733 mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist); 734 } 735 736 lastSeqNumberInPlaylist = 737 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 738 739 if (mDiscontinuitySeq < 0) { 740 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 741 } 742 } 743 744 if (mPlaylist != NULL && mSeqNumber < 0) { 745 CHECK_GE(mStartTimeUs, 0ll); 746 747 if (mSegmentStartTimeUs < 0) { 748 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 749 // If this is a live session, start 3 segments from the end on connect 750 mSeqNumber = lastSeqNumberInPlaylist - 3; 751 if (mSeqNumber < firstSeqNumberInPlaylist) { 752 mSeqNumber = firstSeqNumberInPlaylist; 753 } 754 } else { 755 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 756 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 757 } 758 mStartTimeUsRelative = true; 759 ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", 760 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 761 lastSeqNumberInPlaylist); 762 } else { 763 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 764 if (mAdaptive) { 765 // avoid double fetch/decode 766 mSeqNumber += 1; 767 } 768 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 769 if (mSeqNumber < minSeq) { 770 mSeqNumber = minSeq; 771 } 772 773 if (mSeqNumber < firstSeqNumberInPlaylist) { 774 mSeqNumber = firstSeqNumberInPlaylist; 775 } 776 777 if (mSeqNumber > lastSeqNumberInPlaylist) { 778 mSeqNumber = lastSeqNumberInPlaylist; 779 } 780 ALOGV("Initial sequence number for live event %d from (%d .. %d)", 781 mSeqNumber, firstSeqNumberInPlaylist, 782 lastSeqNumberInPlaylist); 783 } 784 } 785 786 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 787 if (mSeqNumber < firstSeqNumberInPlaylist 788 || mSeqNumber > lastSeqNumberInPlaylist 789 || err != OK) { 790 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 791 ++mNumRetries; 792 793 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 794 // make sure we reach this retry logic on refresh failures 795 // by adding an err != OK clause to all enclosing if's. 796 797 // refresh in increasing fraction (1/2, 1/3, ...) of the 798 // playlist's target duration or 3 seconds, whichever is less 799 int64_t delayUs = kMaxMonitorDelayUs; 800 if (mPlaylist != NULL && mPlaylist->meta() != NULL) { 801 int32_t targetDurationSecs; 802 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 803 delayUs = mPlaylist->size() * targetDurationSecs * 804 1000000ll / (1 + mNumRetries); 805 } 806 if (delayUs > kMaxMonitorDelayUs) { 807 delayUs = kMaxMonitorDelayUs; 808 } 809 ALOGV("sequence number high: %d from (%d .. %d), " 810 "monitor in %" PRId64 " (retry=%d)", 811 mSeqNumber, firstSeqNumberInPlaylist, 812 lastSeqNumberInPlaylist, delayUs, mNumRetries); 813 postMonitorQueue(delayUs); 814 return; 815 } 816 817 if (err != OK) { 818 notifyError(err); 819 return; 820 } 821 822 // we've missed the boat, let's start 3 segments prior to the latest sequence 823 // number available and signal a discontinuity. 824 825 ALOGI("We've missed the boat, restarting playback." 826 " mStartup=%d, was looking for %d in %d-%d", 827 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 828 lastSeqNumberInPlaylist); 829 if (mStopParams != NULL) { 830 // we should have kept on fetching until we hit the boundaries in mStopParams, 831 // but since the segments we are supposed to fetch have already rolled off 832 // the playlist, i.e. we have already missed the boat, we inevitably have to 833 // skip. 834 for (size_t i = 0; i < mPacketSources.size(); i++) { 835 sp<ABuffer> formatChange = mSession->createFormatChangeBuffer(); 836 mPacketSources.valueAt(i)->queueAccessUnit(formatChange); 837 } 838 stopAsync(/* clear = */ false); 839 return; 840 } 841 mSeqNumber = lastSeqNumberInPlaylist - 3; 842 if (mSeqNumber < firstSeqNumberInPlaylist) { 843 mSeqNumber = firstSeqNumberInPlaylist; 844 } 845 discontinuity = true; 846 847 // fall through 848 } else { 849 ALOGE("Cannot find sequence number %d in playlist " 850 "(contains %d - %d)", 851 mSeqNumber, firstSeqNumberInPlaylist, 852 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 853 854 notifyError(ERROR_END_OF_STREAM); 855 return; 856 } 857 } 858 859 mNumRetries = 0; 860 861 AString uri; 862 sp<AMessage> itemMeta; 863 CHECK(mPlaylist->itemAt( 864 mSeqNumber - firstSeqNumberInPlaylist, 865 &uri, 866 &itemMeta)); 867 868 int32_t val; 869 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 870 mDiscontinuitySeq++; 871 discontinuity = true; 872 } 873 874 int64_t range_offset, range_length; 875 if (!itemMeta->findInt64("range-offset", &range_offset) 876 || !itemMeta->findInt64("range-length", &range_length)) { 877 range_offset = 0; 878 range_length = -1; 879 } 880 881 ALOGV("fetching segment %d from (%d .. %d)", 882 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 883 884 ALOGV("fetching '%s'", uri.c_str()); 885 886 sp<DataSource> source; 887 sp<ABuffer> buffer, tsBuffer; 888 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 889 // this avoids interleaved connections to the key and segment file. 890 { 891 sp<ABuffer> junk = new ABuffer(16); 892 junk->setRange(0, 16); 893 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 894 true /* first */); 895 if (err != OK) { 896 notifyError(err); 897 return; 898 } 899 } 900 901 // block-wise download 902 bool startup = mStartup; 903 ssize_t bytesRead; 904 do { 905 bytesRead = mSession->fetchFile( 906 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); 907 908 if (bytesRead < 0) { 909 status_t err = bytesRead; 910 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 911 notifyError(err); 912 return; 913 } 914 915 CHECK(buffer != NULL); 916 917 size_t size = buffer->size(); 918 // Set decryption range. 919 buffer->setRange(size - bytesRead, bytesRead); 920 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 921 buffer->offset() == 0 /* first */); 922 // Unset decryption range. 923 buffer->setRange(0, size); 924 925 if (err != OK) { 926 ALOGE("decryptBuffer failed w/ error %d", err); 927 928 notifyError(err); 929 return; 930 } 931 932 if (startup || discontinuity) { 933 // Signal discontinuity. 934 935 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 936 // If this was a live event this made no sense since 937 // we don't have access to all the segment before the current 938 // one. 939 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 940 } 941 942 if (discontinuity) { 943 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 944 945 queueDiscontinuity( 946 ATSParser::DISCONTINUITY_FORMATCHANGE, 947 NULL /* extra */); 948 949 discontinuity = false; 950 } 951 952 startup = false; 953 } 954 955 err = OK; 956 if (bufferStartsWithTsSyncByte(buffer)) { 957 // Incremental extraction is only supported for MPEG2 transport streams. 958 if (tsBuffer == NULL) { 959 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 960 tsBuffer->setRange(0, 0); 961 } else if (tsBuffer->capacity() != buffer->capacity()) { 962 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 963 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 964 tsBuffer->setRange(tsOff, tsSize); 965 } 966 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 967 968 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 969 } 970 971 if (err == -EAGAIN) { 972 // starting sequence number too low/high 973 mTSParser.clear(); 974 postMonitorQueue(); 975 return; 976 } else if (err == ERROR_OUT_OF_RANGE) { 977 // reached stopping point 978 stopAsync(/* clear = */ false); 979 return; 980 } else if (err != OK) { 981 notifyError(err); 982 return; 983 } 984 985 } while (bytesRead != 0); 986 987 if (bufferStartsWithTsSyncByte(buffer)) { 988 // If we don't see a stream in the program table after fetching a full ts segment 989 // mark it as nonexistent. 990 const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; 991 ATSParser::SourceType srcTypes[kNumTypes] = 992 { ATSParser::VIDEO, ATSParser::AUDIO }; 993 LiveSession::StreamType streamTypes[kNumTypes] = 994 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 995 996 for (size_t i = 0; i < kNumTypes; i++) { 997 ATSParser::SourceType srcType = srcTypes[i]; 998 LiveSession::StreamType streamType = streamTypes[i]; 999 1000 sp<AnotherPacketSource> source = 1001 static_cast<AnotherPacketSource *>( 1002 mTSParser->getSource(srcType).get()); 1003 1004 if (!mTSParser->hasSource(srcType)) { 1005 ALOGW("MPEG2 Transport stream does not contain %s data.", 1006 srcType == ATSParser::VIDEO ? "video" : "audio"); 1007 1008 mStreamTypeMask &= ~streamType; 1009 mPacketSources.removeItem(streamType); 1010 } 1011 } 1012 1013 } 1014 1015 if (checkDecryptPadding(buffer) != OK) { 1016 ALOGE("Incorrect padding bytes after decryption."); 1017 notifyError(ERROR_MALFORMED); 1018 return; 1019 } 1020 1021 err = OK; 1022 if (tsBuffer != NULL) { 1023 AString method; 1024 CHECK(buffer->meta()->findString("cipher-method", &method)); 1025 if ((tsBuffer->size() > 0 && method == "NONE") 1026 || tsBuffer->size() > 16) { 1027 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1028 "bytes in length."); 1029 notifyError(ERROR_MALFORMED); 1030 return; 1031 } 1032 } 1033 1034 // bulk extract non-ts files 1035 if (tsBuffer == NULL) { 1036 err = extractAndQueueAccessUnits(buffer, itemMeta); 1037 if (err == -EAGAIN) { 1038 // starting sequence number too low/high 1039 postMonitorQueue(); 1040 return; 1041 } else if (err == ERROR_OUT_OF_RANGE) { 1042 // reached stopping point 1043 stopAsync(/* clear = */false); 1044 return; 1045 } 1046 } 1047 1048 if (err != OK) { 1049 notifyError(err); 1050 return; 1051 } 1052 1053 ++mSeqNumber; 1054 1055 postMonitorQueue(); 1056} 1057 1058int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { 1059 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 1060 if (mPlaylist->meta() == NULL 1061 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1062 firstSeqNumberInPlaylist = 0; 1063 } 1064 lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; 1065 1066 int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; 1067 while (index >= 0 && anchorTimeUs > mStartTimeUs) { 1068 sp<AMessage> itemMeta; 1069 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1070 1071 int64_t itemDurationUs; 1072 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1073 1074 anchorTimeUs -= itemDurationUs; 1075 --index; 1076 } 1077 1078 int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1; 1079 if (newSeqNumber <= lastSeqNumberInPlaylist) { 1080 return newSeqNumber; 1081 } else { 1082 return lastSeqNumberInPlaylist; 1083 } 1084} 1085 1086int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1087 int32_t firstSeqNumberInPlaylist; 1088 if (mPlaylist->meta() == NULL 1089 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1090 firstSeqNumberInPlaylist = 0; 1091 } 1092 1093 size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 1094 if (discontinuitySeq < curDiscontinuitySeq) { 1095 return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); 1096 } 1097 1098 size_t index = 0; 1099 while (index < mPlaylist->size()) { 1100 sp<AMessage> itemMeta; 1101 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1102 1103 int64_t discontinuity; 1104 if (itemMeta->findInt64("discontinuity", &discontinuity)) { 1105 curDiscontinuitySeq++; 1106 } 1107 1108 if (curDiscontinuitySeq == discontinuitySeq) { 1109 return firstSeqNumberInPlaylist + index; 1110 } 1111 1112 ++index; 1113 } 1114 1115 return firstSeqNumberInPlaylist + mPlaylist->size(); 1116} 1117 1118int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1119 int32_t firstSeqNumberInPlaylist; 1120 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1121 "media-sequence", &firstSeqNumberInPlaylist)) { 1122 firstSeqNumberInPlaylist = 0; 1123 } 1124 1125 size_t index = 0; 1126 int64_t segmentStartUs = 0; 1127 while (index < mPlaylist->size()) { 1128 sp<AMessage> itemMeta; 1129 CHECK(mPlaylist->itemAt( 1130 index, NULL /* uri */, &itemMeta)); 1131 1132 int64_t itemDurationUs; 1133 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1134 1135 if (timeUs < segmentStartUs + itemDurationUs) { 1136 break; 1137 } 1138 1139 segmentStartUs += itemDurationUs; 1140 ++index; 1141 } 1142 1143 if (index >= mPlaylist->size()) { 1144 index = mPlaylist->size() - 1; 1145 } 1146 1147 return firstSeqNumberInPlaylist + index; 1148} 1149 1150const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1151 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1152 sp<MetaData> format = source->getFormat(); 1153 if (format != NULL) { 1154 // for simplicity, store a reference to the format in each unit 1155 accessUnit->meta()->setObject("format", format); 1156 } 1157 1158 if (discard) { 1159 accessUnit->meta()->setInt32("discard", discard); 1160 } 1161 1162 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1163 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1164 return accessUnit; 1165} 1166 1167status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1168 if (mTSParser == NULL) { 1169 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1170 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1171 } 1172 1173 if (mNextPTSTimeUs >= 0ll) { 1174 sp<AMessage> extra = new AMessage; 1175 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1176 // ATSParser from skewing the timestamps of access units. 1177 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1178 1179 mTSParser->signalDiscontinuity( 1180 ATSParser::DISCONTINUITY_TIME, extra); 1181 1182 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1183 mNextPTSTimeUs = -1ll; 1184 mFirstPTSValid = false; 1185 } 1186 1187 size_t offset = 0; 1188 while (offset + 188 <= buffer->size()) { 1189 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1190 1191 if (err != OK) { 1192 return err; 1193 } 1194 1195 offset += 188; 1196 } 1197 // setRange to indicate consumed bytes. 1198 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1199 1200 status_t err = OK; 1201 for (size_t i = mPacketSources.size(); i-- > 0;) { 1202 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1203 1204 const char *key; 1205 ATSParser::SourceType type; 1206 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1207 switch (stream) { 1208 case LiveSession::STREAMTYPE_VIDEO: 1209 type = ATSParser::VIDEO; 1210 key = "timeUsVideo"; 1211 break; 1212 1213 case LiveSession::STREAMTYPE_AUDIO: 1214 type = ATSParser::AUDIO; 1215 key = "timeUsAudio"; 1216 break; 1217 1218 case LiveSession::STREAMTYPE_SUBTITLES: 1219 { 1220 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1221 return ERROR_MALFORMED; 1222 break; 1223 } 1224 1225 default: 1226 TRESPASS(); 1227 } 1228 1229 sp<AnotherPacketSource> source = 1230 static_cast<AnotherPacketSource *>( 1231 mTSParser->getSource(type).get()); 1232 1233 if (source == NULL) { 1234 continue; 1235 } 1236 1237 int64_t timeUs; 1238 sp<ABuffer> accessUnit; 1239 status_t finalResult; 1240 while (source->hasBufferAvailable(&finalResult) 1241 && source->dequeueAccessUnit(&accessUnit) == OK) { 1242 1243 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1244 1245 if (mStartup) { 1246 if (!mFirstPTSValid) { 1247 mFirstTimeUs = timeUs; 1248 mFirstPTSValid = true; 1249 } 1250 if (mStartTimeUsRelative) { 1251 timeUs -= mFirstTimeUs; 1252 if (timeUs < 0) { 1253 timeUs = 0; 1254 } 1255 } 1256 1257 if (timeUs < mStartTimeUs) { 1258 // buffer up to the closest preceding IDR frame 1259 ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", 1260 timeUs, mStartTimeUs); 1261 const char *mime; 1262 sp<MetaData> format = source->getFormat(); 1263 bool isAvc = false; 1264 if (format != NULL && format->findCString(kKeyMIMEType, &mime) 1265 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { 1266 isAvc = true; 1267 } 1268 if (isAvc && IsIDR(accessUnit)) { 1269 mVideoBuffer->clear(); 1270 } 1271 if (isAvc) { 1272 mVideoBuffer->queueAccessUnit(accessUnit); 1273 } 1274 1275 continue; 1276 } 1277 } 1278 1279 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1280 if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { 1281 int32_t firstSeqNumberInPlaylist; 1282 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1283 "media-sequence", &firstSeqNumberInPlaylist)) { 1284 firstSeqNumberInPlaylist = 0; 1285 } 1286 1287 int32_t targetDurationSecs; 1288 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1289 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1290 // mStartup 1291 // mStartup is true until we have queued a packet for all the streams 1292 // we are fetching. We queue packets whose timestamps are greater than 1293 // mStartTimeUs. 1294 // mSegmentStartTimeUs >= 0 1295 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1296 // mSeqNumber > firstSeqNumberInPlaylist 1297 // don't decrement mSeqNumber if it already points to the 1st segment 1298 // timeUs - mStartTimeUs > targetDurationUs: 1299 // This and the 2 above conditions should only happen when adapting in a live 1300 // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher 1301 // would start fetching after timeUs, which should be greater than mStartTimeUs; 1302 // the old fetcher would then continue fetching data until timeUs. We don't want 1303 // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to 1304 // stop as early as possible. The definition of being "too far ahead" is 1305 // arbitrary; here we use targetDurationUs as threshold. 1306 if (mStartup && mSegmentStartTimeUs >= 0 1307 && mSeqNumber > firstSeqNumberInPlaylist 1308 && timeUs - mStartTimeUs > targetDurationUs) { 1309 // we just guessed a starting timestamp that is too high when adapting in a 1310 // live stream; re-adjust based on the actual timestamp extracted from the 1311 // media segment; if we didn't move backward after the re-adjustment 1312 // (newSeqNumber), start at least 1 segment prior. 1313 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1314 if (newSeqNumber >= mSeqNumber) { 1315 --mSeqNumber; 1316 } else { 1317 mSeqNumber = newSeqNumber; 1318 } 1319 mStartTimeUsNotify = mNotify->dup(); 1320 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1321 return -EAGAIN; 1322 } 1323 1324 int32_t seq; 1325 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { 1326 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1327 } 1328 int64_t startTimeUs; 1329 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1330 mStartTimeUsNotify->setInt64(key, timeUs); 1331 1332 uint32_t streamMask = 0; 1333 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1334 streamMask |= mPacketSources.keyAt(i); 1335 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1336 1337 if (streamMask == mStreamTypeMask) { 1338 mStartup = false; 1339 mStartTimeUsNotify->post(); 1340 mStartTimeUsNotify.clear(); 1341 } 1342 } 1343 } 1344 1345 if (mStopParams != NULL) { 1346 // Queue discontinuity in original stream. 1347 int32_t discontinuitySeq; 1348 int64_t stopTimeUs; 1349 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1350 || discontinuitySeq > mDiscontinuitySeq 1351 || !mStopParams->findInt64(key, &stopTimeUs) 1352 || (discontinuitySeq == mDiscontinuitySeq 1353 && timeUs >= stopTimeUs)) { 1354 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1355 mStreamTypeMask &= ~stream; 1356 mPacketSources.removeItemsAt(i); 1357 break; 1358 } 1359 } 1360 1361 // Note that we do NOT dequeue any discontinuities except for format change. 1362 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1363 const bool discard = true; 1364 status_t status; 1365 while (mVideoBuffer->hasBufferAvailable(&status)) { 1366 sp<ABuffer> videoBuffer; 1367 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1368 setAccessUnitProperties(videoBuffer, source, discard); 1369 packetSource->queueAccessUnit(videoBuffer); 1370 } 1371 } 1372 1373 setAccessUnitProperties(accessUnit, source); 1374 packetSource->queueAccessUnit(accessUnit); 1375 } 1376 1377 if (err != OK) { 1378 break; 1379 } 1380 } 1381 1382 if (err != OK) { 1383 for (size_t i = mPacketSources.size(); i-- > 0;) { 1384 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1385 packetSource->clear(); 1386 } 1387 return err; 1388 } 1389 1390 if (!mStreamTypeMask) { 1391 // Signal gap is filled between original and new stream. 1392 ALOGV("ERROR OUT OF RANGE"); 1393 return ERROR_OUT_OF_RANGE; 1394 } 1395 1396 return OK; 1397} 1398 1399/* static */ 1400bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1401 const sp<ABuffer> &buffer) { 1402 size_t pos = 0; 1403 1404 // skip possible BOM 1405 if (buffer->size() >= pos + 3 && 1406 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1407 pos += 3; 1408 } 1409 1410 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1411 if (buffer->size() < pos + 6 || 1412 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1413 return false; 1414 } 1415 pos += 6; 1416 1417 if (buffer->size() == pos) { 1418 return true; 1419 } 1420 1421 uint8_t sep = buffer->data()[pos]; 1422 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1423} 1424 1425status_t PlaylistFetcher::extractAndQueueAccessUnits( 1426 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1427 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1428 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1429 ALOGE("This stream only contains subtitles."); 1430 return ERROR_MALFORMED; 1431 } 1432 1433 const sp<AnotherPacketSource> packetSource = 1434 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1435 1436 int64_t durationUs; 1437 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1438 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1439 buffer->meta()->setInt64("durationUs", durationUs); 1440 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1441 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1442 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1443 1444 packetSource->queueAccessUnit(buffer); 1445 return OK; 1446 } 1447 1448 if (mNextPTSTimeUs >= 0ll) { 1449 mFirstPTSValid = false; 1450 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1451 mNextPTSTimeUs = -1ll; 1452 } 1453 1454 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1455 // stream prefixed by an ID3 tag. 1456 1457 bool firstID3Tag = true; 1458 uint64_t PTS = 0; 1459 1460 for (;;) { 1461 // Make sure to skip all ID3 tags preceding the audio data. 1462 // At least one must be present to provide the PTS timestamp. 1463 1464 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1465 if (!id3.isValid()) { 1466 if (firstID3Tag) { 1467 ALOGE("Unable to parse ID3 tag."); 1468 return ERROR_MALFORMED; 1469 } else { 1470 break; 1471 } 1472 } 1473 1474 if (firstID3Tag) { 1475 bool found = false; 1476 1477 ID3::Iterator it(id3, "PRIV"); 1478 while (!it.done()) { 1479 size_t length; 1480 const uint8_t *data = it.getData(&length); 1481 1482 static const char *kMatchName = 1483 "com.apple.streaming.transportStreamTimestamp"; 1484 static const size_t kMatchNameLen = strlen(kMatchName); 1485 1486 if (length == kMatchNameLen + 1 + 8 1487 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1488 found = true; 1489 PTS = U64_AT(&data[kMatchNameLen + 1]); 1490 } 1491 1492 it.next(); 1493 } 1494 1495 if (!found) { 1496 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1497 return ERROR_MALFORMED; 1498 } 1499 } 1500 1501 // skip the ID3 tag 1502 buffer->setRange( 1503 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1504 1505 firstID3Tag = false; 1506 } 1507 1508 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1509 ALOGW("This stream only contains audio data!"); 1510 1511 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1512 1513 if (mStreamTypeMask == 0) { 1514 return OK; 1515 } 1516 } 1517 1518 sp<AnotherPacketSource> packetSource = 1519 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1520 1521 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1522 ABitReader bits(buffer->data(), buffer->size()); 1523 1524 // adts_fixed_header 1525 1526 CHECK_EQ(bits.getBits(12), 0xfffu); 1527 bits.skipBits(3); // ID, layer 1528 bool protection_absent = bits.getBits(1) != 0; 1529 1530 unsigned profile = bits.getBits(2); 1531 CHECK_NE(profile, 3u); 1532 unsigned sampling_freq_index = bits.getBits(4); 1533 bits.getBits(1); // private_bit 1534 unsigned channel_configuration = bits.getBits(3); 1535 CHECK_NE(channel_configuration, 0u); 1536 bits.skipBits(2); // original_copy, home 1537 1538 sp<MetaData> meta = MakeAACCodecSpecificData( 1539 profile, sampling_freq_index, channel_configuration); 1540 1541 meta->setInt32(kKeyIsADTS, true); 1542 1543 packetSource->setFormat(meta); 1544 } 1545 1546 int64_t numSamples = 0ll; 1547 int32_t sampleRate; 1548 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1549 1550 int64_t timeUs = (PTS * 100ll) / 9ll; 1551 if (!mFirstPTSValid) { 1552 mFirstPTSValid = true; 1553 mFirstTimeUs = timeUs; 1554 } 1555 1556 size_t offset = 0; 1557 while (offset < buffer->size()) { 1558 const uint8_t *adtsHeader = buffer->data() + offset; 1559 CHECK_LT(offset + 5, buffer->size()); 1560 1561 unsigned aac_frame_length = 1562 ((adtsHeader[3] & 3) << 11) 1563 | (adtsHeader[4] << 3) 1564 | (adtsHeader[5] >> 5); 1565 1566 if (aac_frame_length == 0) { 1567 const uint8_t *id3Header = adtsHeader; 1568 if (!memcmp(id3Header, "ID3", 3)) { 1569 ID3 id3(id3Header, buffer->size() - offset, true); 1570 if (id3.isValid()) { 1571 offset += id3.rawSize(); 1572 continue; 1573 }; 1574 } 1575 return ERROR_MALFORMED; 1576 } 1577 1578 CHECK_LE(offset + aac_frame_length, buffer->size()); 1579 1580 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1581 offset += aac_frame_length; 1582 1583 // Each AAC frame encodes 1024 samples. 1584 numSamples += 1024; 1585 1586 if (mStartup) { 1587 int64_t startTimeUs = unitTimeUs; 1588 if (mStartTimeUsRelative) { 1589 startTimeUs -= mFirstTimeUs; 1590 if (startTimeUs < 0) { 1591 startTimeUs = 0; 1592 } 1593 } 1594 if (startTimeUs < mStartTimeUs) { 1595 continue; 1596 } 1597 1598 if (mStartTimeUsNotify != NULL) { 1599 int32_t targetDurationSecs; 1600 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1601 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1602 1603 // Duplicated logic from how we handle .ts playlists. 1604 if (mStartup && mSegmentStartTimeUs >= 0 1605 && timeUs - mStartTimeUs > targetDurationUs) { 1606 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1607 if (newSeqNumber >= mSeqNumber) { 1608 --mSeqNumber; 1609 } else { 1610 mSeqNumber = newSeqNumber; 1611 } 1612 return -EAGAIN; 1613 } 1614 1615 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); 1616 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1617 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 1618 mStartTimeUsNotify->post(); 1619 mStartTimeUsNotify.clear(); 1620 mStartup = false; 1621 } 1622 } 1623 1624 if (mStopParams != NULL) { 1625 // Queue discontinuity in original stream. 1626 int32_t discontinuitySeq; 1627 int64_t stopTimeUs; 1628 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1629 || discontinuitySeq > mDiscontinuitySeq 1630 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 1631 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 1632 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1633 mStreamTypeMask = 0; 1634 mPacketSources.clear(); 1635 return ERROR_OUT_OF_RANGE; 1636 } 1637 } 1638 1639 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1640 memcpy(unit->data(), adtsHeader, aac_frame_length); 1641 1642 unit->meta()->setInt64("timeUs", unitTimeUs); 1643 setAccessUnitProperties(unit, packetSource); 1644 packetSource->queueAccessUnit(unit); 1645 } 1646 1647 return OK; 1648} 1649 1650void PlaylistFetcher::updateDuration() { 1651 int64_t durationUs = 0ll; 1652 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1653 sp<AMessage> itemMeta; 1654 CHECK(mPlaylist->itemAt( 1655 index, NULL /* uri */, &itemMeta)); 1656 1657 int64_t itemDurationUs; 1658 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1659 1660 durationUs += itemDurationUs; 1661 } 1662 1663 sp<AMessage> msg = mNotify->dup(); 1664 msg->setInt32("what", kWhatDurationUpdate); 1665 msg->setInt64("durationUs", durationUs); 1666 msg->post(); 1667} 1668 1669int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { 1670 int64_t durationUs, threshold; 1671 if (msg->findInt64("durationUs", &durationUs)) { 1672 return kNumSkipFrames * durationUs; 1673 } 1674 1675 sp<RefBase> obj; 1676 msg->findObject("format", &obj); 1677 MetaData *format = static_cast<MetaData *>(obj.get()); 1678 1679 const char *mime; 1680 CHECK(format->findCString(kKeyMIMEType, &mime)); 1681 bool audio = !strncasecmp(mime, "audio/", 6); 1682 if (audio) { 1683 // Assumes 1000 samples per frame. 1684 int32_t sampleRate; 1685 CHECK(format->findInt32(kKeySampleRate, &sampleRate)); 1686 return kNumSkipFrames /* frames */ * 1000 /* samples */ 1687 * (1000000 / sampleRate) /* sample duration (us) */; 1688 } else { 1689 int32_t frameRate; 1690 if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { 1691 return kNumSkipFrames * (1000000 / frameRate); 1692 } 1693 } 1694 1695 return 500000ll; 1696} 1697 1698} // namespace android 1699