PlaylistFetcher.cpp revision 7dbbc7ec95c3040668388162a0ffbc45b68af6f1
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20 21#include "PlaylistFetcher.h" 22 23#include "LiveDataSource.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26 27#include "include/avc_utils.h" 28#include "include/HTTPBase.h" 29#include "include/ID3.h" 30#include "mpeg2ts/AnotherPacketSource.h" 31 32#include <media/IStreamSource.h> 33#include <media/stagefright/foundation/ABitReader.h> 34#include <media/stagefright/foundation/ABuffer.h> 35#include <media/stagefright/foundation/ADebug.h> 36#include <media/stagefright/foundation/hexdump.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaDefs.h> 39#include <media/stagefright/MetaData.h> 40#include <media/stagefright/Utils.h> 41 42#include <ctype.h> 43#include <inttypes.h> 44#include <openssl/aes.h> 45#include <openssl/md5.h> 46 47namespace android { 48 49// static 50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 52// LCM of 188 (size of a TS packet) & 1k works well 53const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; 54const int32_t PlaylistFetcher::kNumSkipFrames = 5; 55 56PlaylistFetcher::PlaylistFetcher( 57 const sp<AMessage> ¬ify, 58 const sp<LiveSession> &session, 59 const char *uri, 60 int32_t subtitleGeneration) 61 : mNotify(notify), 62 mStartTimeUsNotify(notify->dup()), 63 mSession(session), 64 mURI(uri), 65 mStreamTypeMask(0), 66 mStartTimeUs(-1ll), 67 mSegmentStartTimeUs(-1ll), 68 mDiscontinuitySeq(-1ll), 69 mStartTimeUsRelative(false), 70 mLastPlaylistFetchTimeUs(-1ll), 71 mSeqNumber(-1), 72 mNumRetries(0), 73 mStartup(true), 74 mAdaptive(false), 75 mPrepared(false), 76 mNextPTSTimeUs(-1ll), 77 mMonitorQueueGeneration(0), 78 mSubtitleGeneration(subtitleGeneration), 79 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 80 mFirstPTSValid(false), 81 mAbsoluteTimeAnchorUs(0ll), 82 mVideoBuffer(new AnotherPacketSource(NULL)) { 83 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 84 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 85 mStartTimeUsNotify->setInt32("streamMask", 0); 86} 87 88PlaylistFetcher::~PlaylistFetcher() { 89} 90 91int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 92 CHECK(mPlaylist != NULL); 93 94 int32_t firstSeqNumberInPlaylist; 95 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 96 "media-sequence", &firstSeqNumberInPlaylist)) { 97 firstSeqNumberInPlaylist = 0; 98 } 99 100 int32_t lastSeqNumberInPlaylist = 101 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 102 103 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 104 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 105 106 int64_t segmentStartUs = 0ll; 107 for (int32_t index = 0; 108 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 109 sp<AMessage> itemMeta; 110 CHECK(mPlaylist->itemAt( 111 index, NULL /* uri */, &itemMeta)); 112 113 int64_t itemDurationUs; 114 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 115 116 segmentStartUs += itemDurationUs; 117 } 118 119 return segmentStartUs; 120} 121 122int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 123 int64_t nowUs = ALooper::GetNowUs(); 124 125 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 126 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 127 return 0ll; 128 } 129 130 if (mPlaylist->isComplete()) { 131 return (~0llu >> 1); 132 } 133 134 int32_t targetDurationSecs; 135 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 136 137 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 138 139 int64_t minPlaylistAgeUs; 140 141 switch (mRefreshState) { 142 case INITIAL_MINIMUM_RELOAD_DELAY: 143 { 144 size_t n = mPlaylist->size(); 145 if (n > 0) { 146 sp<AMessage> itemMeta; 147 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 148 149 int64_t itemDurationUs; 150 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 151 152 minPlaylistAgeUs = itemDurationUs; 153 break; 154 } 155 156 // fall through 157 } 158 159 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 160 { 161 minPlaylistAgeUs = targetDurationUs / 2; 162 break; 163 } 164 165 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 166 { 167 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 168 break; 169 } 170 171 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 172 { 173 minPlaylistAgeUs = targetDurationUs * 3; 174 break; 175 } 176 177 default: 178 TRESPASS(); 179 break; 180 } 181 182 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 183 return delayUs > 0ll ? delayUs : 0ll; 184} 185 186status_t PlaylistFetcher::decryptBuffer( 187 size_t playlistIndex, const sp<ABuffer> &buffer, 188 bool first) { 189 sp<AMessage> itemMeta; 190 bool found = false; 191 AString method; 192 193 for (ssize_t i = playlistIndex; i >= 0; --i) { 194 AString uri; 195 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 196 197 if (itemMeta->findString("cipher-method", &method)) { 198 found = true; 199 break; 200 } 201 } 202 203 if (!found) { 204 method = "NONE"; 205 } 206 buffer->meta()->setString("cipher-method", method.c_str()); 207 208 if (method == "NONE") { 209 return OK; 210 } else if (!(method == "AES-128")) { 211 ALOGE("Unsupported cipher method '%s'", method.c_str()); 212 return ERROR_UNSUPPORTED; 213 } 214 215 AString keyURI; 216 if (!itemMeta->findString("cipher-uri", &keyURI)) { 217 ALOGE("Missing key uri"); 218 return ERROR_MALFORMED; 219 } 220 221 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 222 223 sp<ABuffer> key; 224 if (index >= 0) { 225 key = mAESKeyForURI.valueAt(index); 226 } else { 227 ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); 228 229 if (err < 0) { 230 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 231 return ERROR_IO; 232 } else if (key->size() != 16) { 233 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 234 return ERROR_MALFORMED; 235 } 236 237 mAESKeyForURI.add(keyURI, key); 238 } 239 240 AES_KEY aes_key; 241 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 242 ALOGE("failed to set AES decryption key."); 243 return UNKNOWN_ERROR; 244 } 245 246 size_t n = buffer->size(); 247 if (!n) { 248 return OK; 249 } 250 CHECK(n % 16 == 0); 251 252 if (first) { 253 // If decrypting the first block in a file, read the iv from the manifest 254 // or derive the iv from the file's sequence number. 255 256 AString iv; 257 if (itemMeta->findString("cipher-iv", &iv)) { 258 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 259 || iv.size() != 16 * 2 + 2) { 260 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 261 return ERROR_MALFORMED; 262 } 263 264 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 265 for (size_t i = 0; i < 16; ++i) { 266 char c1 = tolower(iv.c_str()[2 + 2 * i]); 267 char c2 = tolower(iv.c_str()[3 + 2 * i]); 268 if (!isxdigit(c1) || !isxdigit(c2)) { 269 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 270 return ERROR_MALFORMED; 271 } 272 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 273 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 274 275 mAESInitVec[i] = nibble1 << 4 | nibble2; 276 } 277 } else { 278 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 279 mAESInitVec[15] = mSeqNumber & 0xff; 280 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 281 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 282 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 283 } 284 } 285 286 AES_cbc_encrypt( 287 buffer->data(), buffer->data(), buffer->size(), 288 &aes_key, mAESInitVec, AES_DECRYPT); 289 290 return OK; 291} 292 293status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 294 AString method; 295 CHECK(buffer->meta()->findString("cipher-method", &method)); 296 if (method == "NONE") { 297 return OK; 298 } 299 300 uint8_t padding = 0; 301 if (buffer->size() > 0) { 302 padding = buffer->data()[buffer->size() - 1]; 303 } 304 305 if (padding > 16) { 306 return ERROR_MALFORMED; 307 } 308 309 for (size_t i = buffer->size() - padding; i < padding; i++) { 310 if (buffer->data()[i] != padding) { 311 return ERROR_MALFORMED; 312 } 313 } 314 315 buffer->setRange(buffer->offset(), buffer->size() - padding); 316 return OK; 317} 318 319void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 320 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 321 if (maxDelayUs < minDelayUs) { 322 maxDelayUs = minDelayUs; 323 } 324 if (delayUs > maxDelayUs) { 325 ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); 326 delayUs = maxDelayUs; 327 } 328 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 329 msg->setInt32("generation", mMonitorQueueGeneration); 330 msg->post(delayUs); 331} 332 333void PlaylistFetcher::cancelMonitorQueue() { 334 ++mMonitorQueueGeneration; 335} 336 337void PlaylistFetcher::startAsync( 338 const sp<AnotherPacketSource> &audioSource, 339 const sp<AnotherPacketSource> &videoSource, 340 const sp<AnotherPacketSource> &subtitleSource, 341 int64_t startTimeUs, 342 int64_t segmentStartTimeUs, 343 int32_t startDiscontinuitySeq, 344 bool adaptive) { 345 sp<AMessage> msg = new AMessage(kWhatStart, id()); 346 347 uint32_t streamTypeMask = 0ul; 348 349 if (audioSource != NULL) { 350 msg->setPointer("audioSource", audioSource.get()); 351 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 352 } 353 354 if (videoSource != NULL) { 355 msg->setPointer("videoSource", videoSource.get()); 356 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 357 } 358 359 if (subtitleSource != NULL) { 360 msg->setPointer("subtitleSource", subtitleSource.get()); 361 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 362 } 363 364 msg->setInt32("streamTypeMask", streamTypeMask); 365 msg->setInt64("startTimeUs", startTimeUs); 366 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 367 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 368 msg->setInt32("adaptive", adaptive); 369 msg->post(); 370} 371 372void PlaylistFetcher::pauseAsync() { 373 (new AMessage(kWhatPause, id()))->post(); 374} 375 376void PlaylistFetcher::stopAsync(bool clear) { 377 sp<AMessage> msg = new AMessage(kWhatStop, id()); 378 msg->setInt32("clear", clear); 379 msg->post(); 380} 381 382void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 383 AMessage* msg = new AMessage(kWhatResumeUntil, id()); 384 msg->setMessage("params", params); 385 msg->post(); 386} 387 388void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 389 switch (msg->what()) { 390 case kWhatStart: 391 { 392 status_t err = onStart(msg); 393 394 sp<AMessage> notify = mNotify->dup(); 395 notify->setInt32("what", kWhatStarted); 396 notify->setInt32("err", err); 397 notify->post(); 398 break; 399 } 400 401 case kWhatPause: 402 { 403 onPause(); 404 405 sp<AMessage> notify = mNotify->dup(); 406 notify->setInt32("what", kWhatPaused); 407 notify->post(); 408 break; 409 } 410 411 case kWhatStop: 412 { 413 onStop(msg); 414 415 sp<AMessage> notify = mNotify->dup(); 416 notify->setInt32("what", kWhatStopped); 417 notify->post(); 418 break; 419 } 420 421 case kWhatMonitorQueue: 422 case kWhatDownloadNext: 423 { 424 int32_t generation; 425 CHECK(msg->findInt32("generation", &generation)); 426 427 if (generation != mMonitorQueueGeneration) { 428 // Stale event 429 break; 430 } 431 432 if (msg->what() == kWhatMonitorQueue) { 433 onMonitorQueue(); 434 } else { 435 onDownloadNext(); 436 } 437 break; 438 } 439 440 case kWhatResumeUntil: 441 { 442 onResumeUntil(msg); 443 break; 444 } 445 446 default: 447 TRESPASS(); 448 } 449} 450 451status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 452 mPacketSources.clear(); 453 454 uint32_t streamTypeMask; 455 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 456 457 int64_t startTimeUs; 458 int64_t segmentStartTimeUs; 459 int32_t startDiscontinuitySeq; 460 int32_t adaptive; 461 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 462 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 463 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 464 CHECK(msg->findInt32("adaptive", &adaptive)); 465 466 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 467 void *ptr; 468 CHECK(msg->findPointer("audioSource", &ptr)); 469 470 mPacketSources.add( 471 LiveSession::STREAMTYPE_AUDIO, 472 static_cast<AnotherPacketSource *>(ptr)); 473 } 474 475 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 476 void *ptr; 477 CHECK(msg->findPointer("videoSource", &ptr)); 478 479 mPacketSources.add( 480 LiveSession::STREAMTYPE_VIDEO, 481 static_cast<AnotherPacketSource *>(ptr)); 482 } 483 484 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 485 void *ptr; 486 CHECK(msg->findPointer("subtitleSource", &ptr)); 487 488 mPacketSources.add( 489 LiveSession::STREAMTYPE_SUBTITLES, 490 static_cast<AnotherPacketSource *>(ptr)); 491 } 492 493 mStreamTypeMask = streamTypeMask; 494 495 mSegmentStartTimeUs = segmentStartTimeUs; 496 mDiscontinuitySeq = startDiscontinuitySeq; 497 498 if (startTimeUs >= 0) { 499 mStartTimeUs = startTimeUs; 500 mSeqNumber = -1; 501 mStartup = true; 502 mPrepared = false; 503 mAdaptive = adaptive; 504 } 505 506 postMonitorQueue(); 507 508 return OK; 509} 510 511void PlaylistFetcher::onPause() { 512 cancelMonitorQueue(); 513} 514 515void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 516 cancelMonitorQueue(); 517 518 int32_t clear; 519 CHECK(msg->findInt32("clear", &clear)); 520 if (clear) { 521 for (size_t i = 0; i < mPacketSources.size(); i++) { 522 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 523 packetSource->clear(); 524 } 525 } 526 527 mPacketSources.clear(); 528 mStreamTypeMask = 0; 529} 530 531// Resume until we have reached the boundary timestamps listed in `msg`; when 532// the remaining time is too short (within a resume threshold) stop immediately 533// instead. 534status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 535 sp<AMessage> params; 536 CHECK(msg->findMessage("params", ¶ms)); 537 538 bool stop = false; 539 for (size_t i = 0; i < mPacketSources.size(); i++) { 540 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 541 542 const char *stopKey; 543 int streamType = mPacketSources.keyAt(i); 544 switch (streamType) { 545 case LiveSession::STREAMTYPE_VIDEO: 546 stopKey = "timeUsVideo"; 547 break; 548 549 case LiveSession::STREAMTYPE_AUDIO: 550 stopKey = "timeUsAudio"; 551 break; 552 553 case LiveSession::STREAMTYPE_SUBTITLES: 554 stopKey = "timeUsSubtitle"; 555 break; 556 557 default: 558 TRESPASS(); 559 } 560 561 // Don't resume if we would stop within a resume threshold. 562 int32_t discontinuitySeq; 563 int64_t latestTimeUs = 0, stopTimeUs = 0; 564 sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta(); 565 if (latestMeta != NULL 566 && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) 567 && discontinuitySeq == mDiscontinuitySeq 568 && latestMeta->findInt64("timeUs", &latestTimeUs) 569 && params->findInt64(stopKey, &stopTimeUs) 570 && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) { 571 stop = true; 572 } 573 } 574 575 if (stop) { 576 for (size_t i = 0; i < mPacketSources.size(); i++) { 577 mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); 578 } 579 stopAsync(/* clear = */ false); 580 return OK; 581 } 582 583 mStopParams = params; 584 postMonitorQueue(); 585 586 return OK; 587} 588 589void PlaylistFetcher::notifyError(status_t err) { 590 sp<AMessage> notify = mNotify->dup(); 591 notify->setInt32("what", kWhatError); 592 notify->setInt32("err", err); 593 notify->post(); 594} 595 596void PlaylistFetcher::queueDiscontinuity( 597 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 598 for (size_t i = 0; i < mPacketSources.size(); ++i) { 599 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 600 // (seek will discard buffer by abandoning old fetchers) 601 mPacketSources.valueAt(i)->queueDiscontinuity( 602 type, extra, false /* discard */); 603 } 604} 605 606void PlaylistFetcher::onMonitorQueue() { 607 bool downloadMore = false; 608 refreshPlaylist(); 609 610 int32_t targetDurationSecs; 611 int64_t targetDurationUs = kMinBufferedDurationUs; 612 if (mPlaylist != NULL) { 613 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 614 "target-duration", &targetDurationSecs)) { 615 ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag"); 616 notifyError(ERROR_MALFORMED); 617 return; 618 } 619 targetDurationUs = targetDurationSecs * 1000000ll; 620 } 621 622 // buffer at least 3 times the target duration, or up to 10 seconds 623 int64_t durationToBufferUs = targetDurationUs * 3; 624 if (durationToBufferUs > kMinBufferedDurationUs) { 625 durationToBufferUs = kMinBufferedDurationUs; 626 } 627 628 int64_t bufferedDurationUs = 0ll; 629 status_t finalResult = NOT_ENOUGH_DATA; 630 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 631 sp<AnotherPacketSource> packetSource = 632 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 633 634 bufferedDurationUs = 635 packetSource->getBufferedDurationUs(&finalResult); 636 finalResult = OK; 637 } else { 638 // Use max stream duration to prevent us from waiting on a non-existent stream; 639 // when we cannot make out from the manifest what streams are included in a playlist 640 // we might assume extra streams. 641 for (size_t i = 0; i < mPacketSources.size(); ++i) { 642 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 643 continue; 644 } 645 646 int64_t bufferedStreamDurationUs = 647 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 648 ALOGV("buffered %" PRId64 " for stream %d", 649 bufferedStreamDurationUs, mPacketSources.keyAt(i)); 650 if (bufferedStreamDurationUs > bufferedDurationUs) { 651 bufferedDurationUs = bufferedStreamDurationUs; 652 } 653 } 654 } 655 downloadMore = (bufferedDurationUs < durationToBufferUs); 656 657 // signal start if buffered up at least the target size 658 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 659 mPrepared = true; 660 661 ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", 662 bufferedDurationUs, targetDurationUs); 663 sp<AMessage> msg = mNotify->dup(); 664 msg->setInt32("what", kWhatTemporarilyDoneFetching); 665 msg->post(); 666 } 667 668 if (finalResult == OK && downloadMore) { 669 ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", 670 bufferedDurationUs, durationToBufferUs); 671 // delay the next download slightly; hopefully this gives other concurrent fetchers 672 // a better chance to run. 673 // onDownloadNext(); 674 sp<AMessage> msg = new AMessage(kWhatDownloadNext, id()); 675 msg->setInt32("generation", mMonitorQueueGeneration); 676 msg->post(1000l); 677 } else { 678 // Nothing to do yet, try again in a second. 679 680 sp<AMessage> msg = mNotify->dup(); 681 msg->setInt32("what", kWhatTemporarilyDoneFetching); 682 msg->post(); 683 684 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 685 ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", 686 delayUs, bufferedDurationUs, durationToBufferUs); 687 // :TRICKY: need to enforce minimum delay because the delay to 688 // refresh the playlist will become 0 689 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 690 } 691} 692 693status_t PlaylistFetcher::refreshPlaylist() { 694 if (delayUsToRefreshPlaylist() <= 0) { 695 bool unchanged; 696 sp<M3UParser> playlist = mSession->fetchPlaylist( 697 mURI.c_str(), mPlaylistHash, &unchanged); 698 699 if (playlist == NULL) { 700 if (unchanged) { 701 // We succeeded in fetching the playlist, but it was 702 // unchanged from the last time we tried. 703 704 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 705 mRefreshState = (RefreshState)(mRefreshState + 1); 706 } 707 } else { 708 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 709 return ERROR_IO; 710 } 711 } else { 712 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 713 mPlaylist = playlist; 714 715 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 716 updateDuration(); 717 } 718 } 719 720 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 721 } 722 return OK; 723} 724 725// static 726bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 727 return buffer->size() > 0 && buffer->data()[0] == 0x47; 728} 729 730void PlaylistFetcher::onDownloadNext() { 731 status_t err = refreshPlaylist(); 732 int32_t firstSeqNumberInPlaylist = 0; 733 int32_t lastSeqNumberInPlaylist = 0; 734 bool discontinuity = false; 735 736 if (mPlaylist != NULL) { 737 if (mPlaylist->meta() != NULL) { 738 mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist); 739 } 740 741 lastSeqNumberInPlaylist = 742 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 743 744 if (mDiscontinuitySeq < 0) { 745 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 746 } 747 } 748 749 if (mPlaylist != NULL && mSeqNumber < 0) { 750 CHECK_GE(mStartTimeUs, 0ll); 751 752 if (mSegmentStartTimeUs < 0) { 753 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 754 // If this is a live session, start 3 segments from the end on connect 755 mSeqNumber = lastSeqNumberInPlaylist - 3; 756 if (mSeqNumber < firstSeqNumberInPlaylist) { 757 mSeqNumber = firstSeqNumberInPlaylist; 758 } 759 } else { 760 // When seeking mSegmentStartTimeUs is unavailable (< 0), we 761 // use mStartTimeUs (client supplied timestamp) to determine both start segment 762 // and relative position inside a segment 763 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 764 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 765 } 766 mStartTimeUsRelative = true; 767 ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", 768 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 769 lastSeqNumberInPlaylist); 770 } else { 771 // When adapting or track switching, mSegmentStartTimeUs (relative 772 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute 773 // timestamps coming from the media container) is used to determine the position 774 // inside a segments. 775 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 776 if (mAdaptive) { 777 // avoid double fetch/decode 778 mSeqNumber += 1; 779 } 780 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 781 if (mSeqNumber < minSeq) { 782 mSeqNumber = minSeq; 783 } 784 785 if (mSeqNumber < firstSeqNumberInPlaylist) { 786 mSeqNumber = firstSeqNumberInPlaylist; 787 } 788 789 if (mSeqNumber > lastSeqNumberInPlaylist) { 790 mSeqNumber = lastSeqNumberInPlaylist; 791 } 792 ALOGV("Initial sequence number for live event %d from (%d .. %d)", 793 mSeqNumber, firstSeqNumberInPlaylist, 794 lastSeqNumberInPlaylist); 795 } 796 } 797 798 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 799 if (mSeqNumber < firstSeqNumberInPlaylist 800 || mSeqNumber > lastSeqNumberInPlaylist 801 || err != OK) { 802 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 803 ++mNumRetries; 804 805 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 806 // make sure we reach this retry logic on refresh failures 807 // by adding an err != OK clause to all enclosing if's. 808 809 // refresh in increasing fraction (1/2, 1/3, ...) of the 810 // playlist's target duration or 3 seconds, whichever is less 811 int64_t delayUs = kMaxMonitorDelayUs; 812 if (mPlaylist != NULL && mPlaylist->meta() != NULL) { 813 int32_t targetDurationSecs; 814 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 815 delayUs = mPlaylist->size() * targetDurationSecs * 816 1000000ll / (1 + mNumRetries); 817 } 818 if (delayUs > kMaxMonitorDelayUs) { 819 delayUs = kMaxMonitorDelayUs; 820 } 821 ALOGV("sequence number high: %d from (%d .. %d), " 822 "monitor in %" PRId64 " (retry=%d)", 823 mSeqNumber, firstSeqNumberInPlaylist, 824 lastSeqNumberInPlaylist, delayUs, mNumRetries); 825 postMonitorQueue(delayUs); 826 return; 827 } 828 829 if (err != OK) { 830 notifyError(err); 831 return; 832 } 833 834 // we've missed the boat, let's start 3 segments prior to the latest sequence 835 // number available and signal a discontinuity. 836 837 ALOGI("We've missed the boat, restarting playback." 838 " mStartup=%d, was looking for %d in %d-%d", 839 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 840 lastSeqNumberInPlaylist); 841 if (mStopParams != NULL) { 842 // we should have kept on fetching until we hit the boundaries in mStopParams, 843 // but since the segments we are supposed to fetch have already rolled off 844 // the playlist, i.e. we have already missed the boat, we inevitably have to 845 // skip. 846 for (size_t i = 0; i < mPacketSources.size(); i++) { 847 sp<ABuffer> formatChange = mSession->createFormatChangeBuffer(); 848 mPacketSources.valueAt(i)->queueAccessUnit(formatChange); 849 } 850 stopAsync(/* clear = */ false); 851 return; 852 } 853 mSeqNumber = lastSeqNumberInPlaylist - 3; 854 if (mSeqNumber < firstSeqNumberInPlaylist) { 855 mSeqNumber = firstSeqNumberInPlaylist; 856 } 857 discontinuity = true; 858 859 // fall through 860 } else { 861 ALOGE("Cannot find sequence number %d in playlist " 862 "(contains %d - %d)", 863 mSeqNumber, firstSeqNumberInPlaylist, 864 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 865 866 notifyError(ERROR_END_OF_STREAM); 867 return; 868 } 869 } 870 871 mNumRetries = 0; 872 873 AString uri; 874 sp<AMessage> itemMeta; 875 CHECK(mPlaylist->itemAt( 876 mSeqNumber - firstSeqNumberInPlaylist, 877 &uri, 878 &itemMeta)); 879 880 int32_t val; 881 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 882 mDiscontinuitySeq++; 883 discontinuity = true; 884 } 885 886 int64_t range_offset, range_length; 887 if (!itemMeta->findInt64("range-offset", &range_offset) 888 || !itemMeta->findInt64("range-length", &range_length)) { 889 range_offset = 0; 890 range_length = -1; 891 } 892 893 ALOGV("fetching segment %d from (%d .. %d)", 894 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 895 896 ALOGV("fetching '%s'", uri.c_str()); 897 898 sp<DataSource> source; 899 sp<ABuffer> buffer, tsBuffer; 900 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 901 // this avoids interleaved connections to the key and segment file. 902 { 903 sp<ABuffer> junk = new ABuffer(16); 904 junk->setRange(0, 16); 905 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 906 true /* first */); 907 if (err != OK) { 908 notifyError(err); 909 return; 910 } 911 } 912 913 // block-wise download 914 bool startup = mStartup; 915 ssize_t bytesRead; 916 do { 917 bytesRead = mSession->fetchFile( 918 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); 919 920 if (bytesRead < 0) { 921 status_t err = bytesRead; 922 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 923 notifyError(err); 924 return; 925 } 926 927 CHECK(buffer != NULL); 928 929 size_t size = buffer->size(); 930 // Set decryption range. 931 buffer->setRange(size - bytesRead, bytesRead); 932 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 933 buffer->offset() == 0 /* first */); 934 // Unset decryption range. 935 buffer->setRange(0, size); 936 937 if (err != OK) { 938 ALOGE("decryptBuffer failed w/ error %d", err); 939 940 notifyError(err); 941 return; 942 } 943 944 if (startup || discontinuity) { 945 // Signal discontinuity. 946 947 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 948 // If this was a live event this made no sense since 949 // we don't have access to all the segment before the current 950 // one. 951 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 952 } 953 954 if (discontinuity) { 955 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 956 957 queueDiscontinuity( 958 ATSParser::DISCONTINUITY_FORMATCHANGE, 959 NULL /* extra */); 960 961 discontinuity = false; 962 } 963 964 startup = false; 965 } 966 967 err = OK; 968 if (bufferStartsWithTsSyncByte(buffer)) { 969 // Incremental extraction is only supported for MPEG2 transport streams. 970 if (tsBuffer == NULL) { 971 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 972 tsBuffer->setRange(0, 0); 973 } else if (tsBuffer->capacity() != buffer->capacity()) { 974 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 975 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 976 tsBuffer->setRange(tsOff, tsSize); 977 } 978 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 979 980 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 981 } 982 983 if (err == -EAGAIN) { 984 // starting sequence number too low/high 985 mTSParser.clear(); 986 for (size_t i = 0; i < mPacketSources.size(); i++) { 987 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 988 packetSource->clear(); 989 } 990 postMonitorQueue(); 991 return; 992 } else if (err == ERROR_OUT_OF_RANGE) { 993 // reached stopping point 994 stopAsync(/* clear = */ false); 995 return; 996 } else if (err != OK) { 997 notifyError(err); 998 return; 999 } 1000 1001 } while (bytesRead != 0); 1002 1003 if (bufferStartsWithTsSyncByte(buffer)) { 1004 // If we don't see a stream in the program table after fetching a full ts segment 1005 // mark it as nonexistent. 1006 const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; 1007 ATSParser::SourceType srcTypes[kNumTypes] = 1008 { ATSParser::VIDEO, ATSParser::AUDIO }; 1009 LiveSession::StreamType streamTypes[kNumTypes] = 1010 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 1011 1012 for (size_t i = 0; i < kNumTypes; i++) { 1013 ATSParser::SourceType srcType = srcTypes[i]; 1014 LiveSession::StreamType streamType = streamTypes[i]; 1015 1016 sp<AnotherPacketSource> source = 1017 static_cast<AnotherPacketSource *>( 1018 mTSParser->getSource(srcType).get()); 1019 1020 if (!mTSParser->hasSource(srcType)) { 1021 ALOGW("MPEG2 Transport stream does not contain %s data.", 1022 srcType == ATSParser::VIDEO ? "video" : "audio"); 1023 1024 mStreamTypeMask &= ~streamType; 1025 mPacketSources.removeItem(streamType); 1026 } 1027 } 1028 1029 } 1030 1031 if (checkDecryptPadding(buffer) != OK) { 1032 ALOGE("Incorrect padding bytes after decryption."); 1033 notifyError(ERROR_MALFORMED); 1034 return; 1035 } 1036 1037 err = OK; 1038 if (tsBuffer != NULL) { 1039 AString method; 1040 CHECK(buffer->meta()->findString("cipher-method", &method)); 1041 if ((tsBuffer->size() > 0 && method == "NONE") 1042 || tsBuffer->size() > 16) { 1043 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1044 "bytes in length."); 1045 notifyError(ERROR_MALFORMED); 1046 return; 1047 } 1048 } 1049 1050 // bulk extract non-ts files 1051 if (tsBuffer == NULL) { 1052 err = extractAndQueueAccessUnits(buffer, itemMeta); 1053 if (err == -EAGAIN) { 1054 // starting sequence number too low/high 1055 postMonitorQueue(); 1056 return; 1057 } else if (err == ERROR_OUT_OF_RANGE) { 1058 // reached stopping point 1059 stopAsync(/* clear = */false); 1060 return; 1061 } 1062 } 1063 1064 if (err != OK) { 1065 notifyError(err); 1066 return; 1067 } 1068 1069 ++mSeqNumber; 1070 1071 postMonitorQueue(); 1072} 1073 1074int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { 1075 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 1076 if (mPlaylist->meta() == NULL 1077 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1078 firstSeqNumberInPlaylist = 0; 1079 } 1080 lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; 1081 1082 int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; 1083 while (index >= 0 && anchorTimeUs > mStartTimeUs) { 1084 sp<AMessage> itemMeta; 1085 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1086 1087 int64_t itemDurationUs; 1088 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1089 1090 anchorTimeUs -= itemDurationUs; 1091 --index; 1092 } 1093 1094 int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1; 1095 if (newSeqNumber <= lastSeqNumberInPlaylist) { 1096 return newSeqNumber; 1097 } else { 1098 return lastSeqNumberInPlaylist; 1099 } 1100} 1101 1102int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1103 int32_t firstSeqNumberInPlaylist; 1104 if (mPlaylist->meta() == NULL 1105 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1106 firstSeqNumberInPlaylist = 0; 1107 } 1108 1109 size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 1110 if (discontinuitySeq < curDiscontinuitySeq) { 1111 return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); 1112 } 1113 1114 size_t index = 0; 1115 while (index < mPlaylist->size()) { 1116 sp<AMessage> itemMeta; 1117 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1118 1119 int64_t discontinuity; 1120 if (itemMeta->findInt64("discontinuity", &discontinuity)) { 1121 curDiscontinuitySeq++; 1122 } 1123 1124 if (curDiscontinuitySeq == discontinuitySeq) { 1125 return firstSeqNumberInPlaylist + index; 1126 } 1127 1128 ++index; 1129 } 1130 1131 return firstSeqNumberInPlaylist + mPlaylist->size(); 1132} 1133 1134int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1135 int32_t firstSeqNumberInPlaylist; 1136 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1137 "media-sequence", &firstSeqNumberInPlaylist)) { 1138 firstSeqNumberInPlaylist = 0; 1139 } 1140 1141 size_t index = 0; 1142 int64_t segmentStartUs = 0; 1143 while (index < mPlaylist->size()) { 1144 sp<AMessage> itemMeta; 1145 CHECK(mPlaylist->itemAt( 1146 index, NULL /* uri */, &itemMeta)); 1147 1148 int64_t itemDurationUs; 1149 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1150 1151 if (timeUs < segmentStartUs + itemDurationUs) { 1152 break; 1153 } 1154 1155 segmentStartUs += itemDurationUs; 1156 ++index; 1157 } 1158 1159 if (index >= mPlaylist->size()) { 1160 index = mPlaylist->size() - 1; 1161 } 1162 1163 return firstSeqNumberInPlaylist + index; 1164} 1165 1166const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1167 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1168 sp<MetaData> format = source->getFormat(); 1169 if (format != NULL) { 1170 // for simplicity, store a reference to the format in each unit 1171 accessUnit->meta()->setObject("format", format); 1172 } 1173 1174 if (discard) { 1175 accessUnit->meta()->setInt32("discard", discard); 1176 } 1177 1178 int32_t targetDurationSecs; 1179 if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) { 1180 accessUnit->meta()->setInt32("targetDuration", targetDurationSecs); 1181 } 1182 1183 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1184 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1185 return accessUnit; 1186} 1187 1188status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1189 if (mTSParser == NULL) { 1190 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1191 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1192 } 1193 1194 if (mNextPTSTimeUs >= 0ll) { 1195 sp<AMessage> extra = new AMessage; 1196 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1197 // ATSParser from skewing the timestamps of access units. 1198 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1199 1200 mTSParser->signalDiscontinuity( 1201 ATSParser::DISCONTINUITY_TIME, extra); 1202 1203 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1204 mNextPTSTimeUs = -1ll; 1205 mFirstPTSValid = false; 1206 } 1207 1208 size_t offset = 0; 1209 while (offset + 188 <= buffer->size()) { 1210 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1211 1212 if (err != OK) { 1213 return err; 1214 } 1215 1216 offset += 188; 1217 } 1218 // setRange to indicate consumed bytes. 1219 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1220 1221 status_t err = OK; 1222 for (size_t i = mPacketSources.size(); i-- > 0;) { 1223 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1224 1225 const char *key; 1226 ATSParser::SourceType type; 1227 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1228 switch (stream) { 1229 case LiveSession::STREAMTYPE_VIDEO: 1230 type = ATSParser::VIDEO; 1231 key = "timeUsVideo"; 1232 break; 1233 1234 case LiveSession::STREAMTYPE_AUDIO: 1235 type = ATSParser::AUDIO; 1236 key = "timeUsAudio"; 1237 break; 1238 1239 case LiveSession::STREAMTYPE_SUBTITLES: 1240 { 1241 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1242 return ERROR_MALFORMED; 1243 break; 1244 } 1245 1246 default: 1247 TRESPASS(); 1248 } 1249 1250 sp<AnotherPacketSource> source = 1251 static_cast<AnotherPacketSource *>( 1252 mTSParser->getSource(type).get()); 1253 1254 if (source == NULL) { 1255 continue; 1256 } 1257 1258 int64_t timeUs; 1259 sp<ABuffer> accessUnit; 1260 status_t finalResult; 1261 while (source->hasBufferAvailable(&finalResult) 1262 && source->dequeueAccessUnit(&accessUnit) == OK) { 1263 1264 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1265 1266 if (mStartup) { 1267 if (!mFirstPTSValid) { 1268 mFirstTimeUs = timeUs; 1269 mFirstPTSValid = true; 1270 } 1271 if (mStartTimeUsRelative) { 1272 timeUs -= mFirstTimeUs; 1273 if (timeUs < 0) { 1274 timeUs = 0; 1275 } 1276 } 1277 1278 if (timeUs < mStartTimeUs) { 1279 // buffer up to the closest preceding IDR frame 1280 ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", 1281 timeUs, mStartTimeUs); 1282 const char *mime; 1283 sp<MetaData> format = source->getFormat(); 1284 bool isAvc = false; 1285 if (format != NULL && format->findCString(kKeyMIMEType, &mime) 1286 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { 1287 isAvc = true; 1288 } 1289 if (isAvc && IsIDR(accessUnit)) { 1290 mVideoBuffer->clear(); 1291 } 1292 if (isAvc) { 1293 mVideoBuffer->queueAccessUnit(accessUnit); 1294 } 1295 1296 continue; 1297 } 1298 } 1299 1300 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1301 if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { 1302 int32_t firstSeqNumberInPlaylist; 1303 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1304 "media-sequence", &firstSeqNumberInPlaylist)) { 1305 firstSeqNumberInPlaylist = 0; 1306 } 1307 1308 int32_t targetDurationSecs; 1309 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1310 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1311 // mStartup 1312 // mStartup is true until we have queued a packet for all the streams 1313 // we are fetching. We queue packets whose timestamps are greater than 1314 // mStartTimeUs. 1315 // mSegmentStartTimeUs >= 0 1316 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1317 // mSeqNumber > firstSeqNumberInPlaylist 1318 // don't decrement mSeqNumber if it already points to the 1st segment 1319 // timeUs - mStartTimeUs > targetDurationUs: 1320 // This and the 2 above conditions should only happen when adapting in a live 1321 // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher 1322 // would start fetching after timeUs, which should be greater than mStartTimeUs; 1323 // the old fetcher would then continue fetching data until timeUs. We don't want 1324 // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to 1325 // stop as early as possible. The definition of being "too far ahead" is 1326 // arbitrary; here we use targetDurationUs as threshold. 1327 if (mStartup && mSegmentStartTimeUs >= 0 1328 && mSeqNumber > firstSeqNumberInPlaylist 1329 && timeUs - mStartTimeUs > targetDurationUs) { 1330 // we just guessed a starting timestamp that is too high when adapting in a 1331 // live stream; re-adjust based on the actual timestamp extracted from the 1332 // media segment; if we didn't move backward after the re-adjustment 1333 // (newSeqNumber), start at least 1 segment prior. 1334 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1335 if (newSeqNumber >= mSeqNumber) { 1336 --mSeqNumber; 1337 } else { 1338 mSeqNumber = newSeqNumber; 1339 } 1340 mStartTimeUsNotify = mNotify->dup(); 1341 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1342 return -EAGAIN; 1343 } 1344 1345 int32_t seq; 1346 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { 1347 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1348 } 1349 int64_t startTimeUs; 1350 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1351 mStartTimeUsNotify->setInt64(key, timeUs); 1352 1353 uint32_t streamMask = 0; 1354 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1355 streamMask |= mPacketSources.keyAt(i); 1356 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1357 1358 if (streamMask == mStreamTypeMask) { 1359 mStartup = false; 1360 mStartTimeUsNotify->post(); 1361 mStartTimeUsNotify.clear(); 1362 } 1363 } 1364 } 1365 1366 if (mStopParams != NULL) { 1367 // Queue discontinuity in original stream. 1368 int32_t discontinuitySeq; 1369 int64_t stopTimeUs; 1370 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1371 || discontinuitySeq > mDiscontinuitySeq 1372 || !mStopParams->findInt64(key, &stopTimeUs) 1373 || (discontinuitySeq == mDiscontinuitySeq 1374 && timeUs >= stopTimeUs)) { 1375 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1376 mStreamTypeMask &= ~stream; 1377 mPacketSources.removeItemsAt(i); 1378 break; 1379 } 1380 } 1381 1382 // Note that we do NOT dequeue any discontinuities except for format change. 1383 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1384 const bool discard = true; 1385 status_t status; 1386 while (mVideoBuffer->hasBufferAvailable(&status)) { 1387 sp<ABuffer> videoBuffer; 1388 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1389 setAccessUnitProperties(videoBuffer, source, discard); 1390 packetSource->queueAccessUnit(videoBuffer); 1391 } 1392 } 1393 1394 setAccessUnitProperties(accessUnit, source); 1395 packetSource->queueAccessUnit(accessUnit); 1396 } 1397 1398 if (err != OK) { 1399 break; 1400 } 1401 } 1402 1403 if (err != OK) { 1404 for (size_t i = mPacketSources.size(); i-- > 0;) { 1405 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1406 packetSource->clear(); 1407 } 1408 return err; 1409 } 1410 1411 if (!mStreamTypeMask) { 1412 // Signal gap is filled between original and new stream. 1413 ALOGV("ERROR OUT OF RANGE"); 1414 return ERROR_OUT_OF_RANGE; 1415 } 1416 1417 return OK; 1418} 1419 1420/* static */ 1421bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1422 const sp<ABuffer> &buffer) { 1423 size_t pos = 0; 1424 1425 // skip possible BOM 1426 if (buffer->size() >= pos + 3 && 1427 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1428 pos += 3; 1429 } 1430 1431 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1432 if (buffer->size() < pos + 6 || 1433 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1434 return false; 1435 } 1436 pos += 6; 1437 1438 if (buffer->size() == pos) { 1439 return true; 1440 } 1441 1442 uint8_t sep = buffer->data()[pos]; 1443 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1444} 1445 1446status_t PlaylistFetcher::extractAndQueueAccessUnits( 1447 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1448 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1449 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1450 ALOGE("This stream only contains subtitles."); 1451 return ERROR_MALFORMED; 1452 } 1453 1454 const sp<AnotherPacketSource> packetSource = 1455 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1456 1457 int64_t durationUs; 1458 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1459 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1460 buffer->meta()->setInt64("durationUs", durationUs); 1461 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1462 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1463 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1464 1465 packetSource->queueAccessUnit(buffer); 1466 return OK; 1467 } 1468 1469 if (mNextPTSTimeUs >= 0ll) { 1470 mFirstPTSValid = false; 1471 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1472 mNextPTSTimeUs = -1ll; 1473 } 1474 1475 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1476 // stream prefixed by an ID3 tag. 1477 1478 bool firstID3Tag = true; 1479 uint64_t PTS = 0; 1480 1481 for (;;) { 1482 // Make sure to skip all ID3 tags preceding the audio data. 1483 // At least one must be present to provide the PTS timestamp. 1484 1485 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1486 if (!id3.isValid()) { 1487 if (firstID3Tag) { 1488 ALOGE("Unable to parse ID3 tag."); 1489 return ERROR_MALFORMED; 1490 } else { 1491 break; 1492 } 1493 } 1494 1495 if (firstID3Tag) { 1496 bool found = false; 1497 1498 ID3::Iterator it(id3, "PRIV"); 1499 while (!it.done()) { 1500 size_t length; 1501 const uint8_t *data = it.getData(&length); 1502 1503 static const char *kMatchName = 1504 "com.apple.streaming.transportStreamTimestamp"; 1505 static const size_t kMatchNameLen = strlen(kMatchName); 1506 1507 if (length == kMatchNameLen + 1 + 8 1508 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1509 found = true; 1510 PTS = U64_AT(&data[kMatchNameLen + 1]); 1511 } 1512 1513 it.next(); 1514 } 1515 1516 if (!found) { 1517 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1518 return ERROR_MALFORMED; 1519 } 1520 } 1521 1522 // skip the ID3 tag 1523 buffer->setRange( 1524 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1525 1526 firstID3Tag = false; 1527 } 1528 1529 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1530 ALOGW("This stream only contains audio data!"); 1531 1532 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1533 1534 if (mStreamTypeMask == 0) { 1535 return OK; 1536 } 1537 } 1538 1539 sp<AnotherPacketSource> packetSource = 1540 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1541 1542 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1543 ABitReader bits(buffer->data(), buffer->size()); 1544 1545 // adts_fixed_header 1546 1547 CHECK_EQ(bits.getBits(12), 0xfffu); 1548 bits.skipBits(3); // ID, layer 1549 bool protection_absent __unused = bits.getBits(1) != 0; 1550 1551 unsigned profile = bits.getBits(2); 1552 CHECK_NE(profile, 3u); 1553 unsigned sampling_freq_index = bits.getBits(4); 1554 bits.getBits(1); // private_bit 1555 unsigned channel_configuration = bits.getBits(3); 1556 CHECK_NE(channel_configuration, 0u); 1557 bits.skipBits(2); // original_copy, home 1558 1559 sp<MetaData> meta = MakeAACCodecSpecificData( 1560 profile, sampling_freq_index, channel_configuration); 1561 1562 meta->setInt32(kKeyIsADTS, true); 1563 1564 packetSource->setFormat(meta); 1565 } 1566 1567 int64_t numSamples = 0ll; 1568 int32_t sampleRate; 1569 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1570 1571 int64_t timeUs = (PTS * 100ll) / 9ll; 1572 if (!mFirstPTSValid) { 1573 mFirstPTSValid = true; 1574 mFirstTimeUs = timeUs; 1575 } 1576 1577 size_t offset = 0; 1578 while (offset < buffer->size()) { 1579 const uint8_t *adtsHeader = buffer->data() + offset; 1580 CHECK_LT(offset + 5, buffer->size()); 1581 1582 unsigned aac_frame_length = 1583 ((adtsHeader[3] & 3) << 11) 1584 | (adtsHeader[4] << 3) 1585 | (adtsHeader[5] >> 5); 1586 1587 if (aac_frame_length == 0) { 1588 const uint8_t *id3Header = adtsHeader; 1589 if (!memcmp(id3Header, "ID3", 3)) { 1590 ID3 id3(id3Header, buffer->size() - offset, true); 1591 if (id3.isValid()) { 1592 offset += id3.rawSize(); 1593 continue; 1594 }; 1595 } 1596 return ERROR_MALFORMED; 1597 } 1598 1599 CHECK_LE(offset + aac_frame_length, buffer->size()); 1600 1601 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1602 offset += aac_frame_length; 1603 1604 // Each AAC frame encodes 1024 samples. 1605 numSamples += 1024; 1606 1607 if (mStartup) { 1608 int64_t startTimeUs = unitTimeUs; 1609 if (mStartTimeUsRelative) { 1610 startTimeUs -= mFirstTimeUs; 1611 if (startTimeUs < 0) { 1612 startTimeUs = 0; 1613 } 1614 } 1615 if (startTimeUs < mStartTimeUs) { 1616 continue; 1617 } 1618 1619 if (mStartTimeUsNotify != NULL) { 1620 int32_t targetDurationSecs; 1621 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1622 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1623 1624 // Duplicated logic from how we handle .ts playlists. 1625 if (mStartup && mSegmentStartTimeUs >= 0 1626 && timeUs - mStartTimeUs > targetDurationUs) { 1627 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1628 if (newSeqNumber >= mSeqNumber) { 1629 --mSeqNumber; 1630 } else { 1631 mSeqNumber = newSeqNumber; 1632 } 1633 return -EAGAIN; 1634 } 1635 1636 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); 1637 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1638 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 1639 mStartTimeUsNotify->post(); 1640 mStartTimeUsNotify.clear(); 1641 mStartup = false; 1642 } 1643 } 1644 1645 if (mStopParams != NULL) { 1646 // Queue discontinuity in original stream. 1647 int32_t discontinuitySeq; 1648 int64_t stopTimeUs; 1649 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1650 || discontinuitySeq > mDiscontinuitySeq 1651 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 1652 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 1653 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1654 mStreamTypeMask = 0; 1655 mPacketSources.clear(); 1656 return ERROR_OUT_OF_RANGE; 1657 } 1658 } 1659 1660 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1661 memcpy(unit->data(), adtsHeader, aac_frame_length); 1662 1663 unit->meta()->setInt64("timeUs", unitTimeUs); 1664 setAccessUnitProperties(unit, packetSource); 1665 packetSource->queueAccessUnit(unit); 1666 } 1667 1668 return OK; 1669} 1670 1671void PlaylistFetcher::updateDuration() { 1672 int64_t durationUs = 0ll; 1673 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1674 sp<AMessage> itemMeta; 1675 CHECK(mPlaylist->itemAt( 1676 index, NULL /* uri */, &itemMeta)); 1677 1678 int64_t itemDurationUs; 1679 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1680 1681 durationUs += itemDurationUs; 1682 } 1683 1684 sp<AMessage> msg = mNotify->dup(); 1685 msg->setInt32("what", kWhatDurationUpdate); 1686 msg->setInt64("durationUs", durationUs); 1687 msg->post(); 1688} 1689 1690int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { 1691 int64_t durationUs; 1692 if (msg->findInt64("durationUs", &durationUs) && durationUs > 0) { 1693 return kNumSkipFrames * durationUs; 1694 } 1695 1696 sp<RefBase> obj; 1697 msg->findObject("format", &obj); 1698 MetaData *format = static_cast<MetaData *>(obj.get()); 1699 1700 const char *mime; 1701 CHECK(format->findCString(kKeyMIMEType, &mime)); 1702 bool audio = !strncasecmp(mime, "audio/", 6); 1703 if (audio) { 1704 // Assumes 1000 samples per frame. 1705 int32_t sampleRate; 1706 CHECK(format->findInt32(kKeySampleRate, &sampleRate)); 1707 return kNumSkipFrames /* frames */ * 1000 /* samples */ 1708 * (1000000 / sampleRate) /* sample duration (us) */; 1709 } else { 1710 int32_t frameRate; 1711 if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { 1712 return kNumSkipFrames * (1000000 / frameRate); 1713 } 1714 } 1715 1716 return 500000ll; 1717} 1718 1719} // namespace android 1720