PlaylistFetcher.cpp revision 964adb17885185808398507d2de88665fe193ee2
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20 21#include "PlaylistFetcher.h" 22 23#include "LiveDataSource.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26 27#include "include/avc_utils.h" 28#include "include/HTTPBase.h" 29#include "include/ID3.h" 30#include "mpeg2ts/AnotherPacketSource.h" 31 32#include <media/IStreamSource.h> 33#include <media/stagefright/foundation/ABitReader.h> 34#include <media/stagefright/foundation/ABuffer.h> 35#include <media/stagefright/foundation/ADebug.h> 36#include <media/stagefright/foundation/hexdump.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaDefs.h> 39#include <media/stagefright/MetaData.h> 40#include <media/stagefright/Utils.h> 41 42#include <ctype.h> 43#include <inttypes.h> 44#include <openssl/aes.h> 45#include <openssl/md5.h> 46 47namespace android { 48 49// static 50const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 51const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 52const int64_t PlaylistFetcher::kFetcherResumeThreshold = 100000ll; 53// LCM of 188 (size of a TS packet) & 1k works well 54const int32_t PlaylistFetcher::kDownloadBlockSize = 47 * 1024; 55const int32_t PlaylistFetcher::kNumSkipFrames = 5; 56 57PlaylistFetcher::PlaylistFetcher( 58 const sp<AMessage> ¬ify, 59 const sp<LiveSession> &session, 60 const char *uri, 61 int32_t subtitleGeneration) 62 : mNotify(notify), 63 mStartTimeUsNotify(notify->dup()), 64 mSession(session), 65 mURI(uri), 66 mStreamTypeMask(0), 67 mStartTimeUs(-1ll), 68 mSegmentStartTimeUs(-1ll), 69 mDiscontinuitySeq(-1ll), 70 mStartTimeUsRelative(false), 71 mLastPlaylistFetchTimeUs(-1ll), 72 mSeqNumber(-1), 73 mNumRetries(0), 74 mStartup(true), 75 mAdaptive(false), 76 mPrepared(false), 77 mNextPTSTimeUs(-1ll), 78 mMonitorQueueGeneration(0), 79 mSubtitleGeneration(subtitleGeneration), 80 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 81 mFirstPTSValid(false), 82 mAbsoluteTimeAnchorUs(0ll), 83 mVideoBuffer(new AnotherPacketSource(NULL)) { 84 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 85 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 86 mStartTimeUsNotify->setInt32("streamMask", 0); 87} 88 89PlaylistFetcher::~PlaylistFetcher() { 90} 91 92int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 93 CHECK(mPlaylist != NULL); 94 95 int32_t firstSeqNumberInPlaylist; 96 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 97 "media-sequence", &firstSeqNumberInPlaylist)) { 98 firstSeqNumberInPlaylist = 0; 99 } 100 101 int32_t lastSeqNumberInPlaylist = 102 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 103 104 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 105 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 106 107 int64_t segmentStartUs = 0ll; 108 for (int32_t index = 0; 109 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 110 sp<AMessage> itemMeta; 111 CHECK(mPlaylist->itemAt( 112 index, NULL /* uri */, &itemMeta)); 113 114 int64_t itemDurationUs; 115 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 116 117 segmentStartUs += itemDurationUs; 118 } 119 120 return segmentStartUs; 121} 122 123int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 124 int64_t nowUs = ALooper::GetNowUs(); 125 126 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 127 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 128 return 0ll; 129 } 130 131 if (mPlaylist->isComplete()) { 132 return (~0llu >> 1); 133 } 134 135 int32_t targetDurationSecs; 136 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 137 138 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 139 140 int64_t minPlaylistAgeUs; 141 142 switch (mRefreshState) { 143 case INITIAL_MINIMUM_RELOAD_DELAY: 144 { 145 size_t n = mPlaylist->size(); 146 if (n > 0) { 147 sp<AMessage> itemMeta; 148 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 149 150 int64_t itemDurationUs; 151 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 152 153 minPlaylistAgeUs = itemDurationUs; 154 break; 155 } 156 157 // fall through 158 } 159 160 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 161 { 162 minPlaylistAgeUs = targetDurationUs / 2; 163 break; 164 } 165 166 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 167 { 168 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 169 break; 170 } 171 172 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 173 { 174 minPlaylistAgeUs = targetDurationUs * 3; 175 break; 176 } 177 178 default: 179 TRESPASS(); 180 break; 181 } 182 183 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 184 return delayUs > 0ll ? delayUs : 0ll; 185} 186 187status_t PlaylistFetcher::decryptBuffer( 188 size_t playlistIndex, const sp<ABuffer> &buffer, 189 bool first) { 190 sp<AMessage> itemMeta; 191 bool found = false; 192 AString method; 193 194 for (ssize_t i = playlistIndex; i >= 0; --i) { 195 AString uri; 196 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 197 198 if (itemMeta->findString("cipher-method", &method)) { 199 found = true; 200 break; 201 } 202 } 203 204 if (!found) { 205 method = "NONE"; 206 } 207 buffer->meta()->setString("cipher-method", method.c_str()); 208 209 if (method == "NONE") { 210 return OK; 211 } else if (!(method == "AES-128")) { 212 ALOGE("Unsupported cipher method '%s'", method.c_str()); 213 return ERROR_UNSUPPORTED; 214 } 215 216 AString keyURI; 217 if (!itemMeta->findString("cipher-uri", &keyURI)) { 218 ALOGE("Missing key uri"); 219 return ERROR_MALFORMED; 220 } 221 222 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 223 224 sp<ABuffer> key; 225 if (index >= 0) { 226 key = mAESKeyForURI.valueAt(index); 227 } else { 228 ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); 229 230 if (err < 0) { 231 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 232 return ERROR_IO; 233 } else if (key->size() != 16) { 234 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 235 return ERROR_MALFORMED; 236 } 237 238 mAESKeyForURI.add(keyURI, key); 239 } 240 241 AES_KEY aes_key; 242 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 243 ALOGE("failed to set AES decryption key."); 244 return UNKNOWN_ERROR; 245 } 246 247 size_t n = buffer->size(); 248 if (!n) { 249 return OK; 250 } 251 CHECK(n % 16 == 0); 252 253 if (first) { 254 // If decrypting the first block in a file, read the iv from the manifest 255 // or derive the iv from the file's sequence number. 256 257 AString iv; 258 if (itemMeta->findString("cipher-iv", &iv)) { 259 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 260 || iv.size() != 16 * 2 + 2) { 261 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 262 return ERROR_MALFORMED; 263 } 264 265 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 266 for (size_t i = 0; i < 16; ++i) { 267 char c1 = tolower(iv.c_str()[2 + 2 * i]); 268 char c2 = tolower(iv.c_str()[3 + 2 * i]); 269 if (!isxdigit(c1) || !isxdigit(c2)) { 270 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 271 return ERROR_MALFORMED; 272 } 273 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 274 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 275 276 mAESInitVec[i] = nibble1 << 4 | nibble2; 277 } 278 } else { 279 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 280 mAESInitVec[15] = mSeqNumber & 0xff; 281 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 282 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 283 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 284 } 285 } 286 287 AES_cbc_encrypt( 288 buffer->data(), buffer->data(), buffer->size(), 289 &aes_key, mAESInitVec, AES_DECRYPT); 290 291 return OK; 292} 293 294status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 295 AString method; 296 CHECK(buffer->meta()->findString("cipher-method", &method)); 297 if (method == "NONE") { 298 return OK; 299 } 300 301 uint8_t padding = 0; 302 if (buffer->size() > 0) { 303 padding = buffer->data()[buffer->size() - 1]; 304 } 305 306 if (padding > 16) { 307 return ERROR_MALFORMED; 308 } 309 310 for (size_t i = buffer->size() - padding; i < padding; i++) { 311 if (buffer->data()[i] != padding) { 312 return ERROR_MALFORMED; 313 } 314 } 315 316 buffer->setRange(buffer->offset(), buffer->size() - padding); 317 return OK; 318} 319 320void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 321 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 322 if (maxDelayUs < minDelayUs) { 323 maxDelayUs = minDelayUs; 324 } 325 if (delayUs > maxDelayUs) { 326 ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); 327 delayUs = maxDelayUs; 328 } 329 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, this); 330 msg->setInt32("generation", mMonitorQueueGeneration); 331 msg->post(delayUs); 332} 333 334void PlaylistFetcher::cancelMonitorQueue() { 335 ++mMonitorQueueGeneration; 336} 337 338void PlaylistFetcher::startAsync( 339 const sp<AnotherPacketSource> &audioSource, 340 const sp<AnotherPacketSource> &videoSource, 341 const sp<AnotherPacketSource> &subtitleSource, 342 int64_t startTimeUs, 343 int64_t segmentStartTimeUs, 344 int32_t startDiscontinuitySeq, 345 bool adaptive) { 346 sp<AMessage> msg = new AMessage(kWhatStart, this); 347 348 uint32_t streamTypeMask = 0ul; 349 350 if (audioSource != NULL) { 351 msg->setPointer("audioSource", audioSource.get()); 352 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 353 } 354 355 if (videoSource != NULL) { 356 msg->setPointer("videoSource", videoSource.get()); 357 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 358 } 359 360 if (subtitleSource != NULL) { 361 msg->setPointer("subtitleSource", subtitleSource.get()); 362 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 363 } 364 365 msg->setInt32("streamTypeMask", streamTypeMask); 366 msg->setInt64("startTimeUs", startTimeUs); 367 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 368 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 369 msg->setInt32("adaptive", adaptive); 370 msg->post(); 371} 372 373void PlaylistFetcher::pauseAsync() { 374 (new AMessage(kWhatPause, this))->post(); 375} 376 377void PlaylistFetcher::stopAsync(bool clear) { 378 sp<AMessage> msg = new AMessage(kWhatStop, this); 379 msg->setInt32("clear", clear); 380 msg->post(); 381} 382 383void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 384 AMessage* msg = new AMessage(kWhatResumeUntil, this); 385 msg->setMessage("params", params); 386 msg->post(); 387} 388 389void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 390 switch (msg->what()) { 391 case kWhatStart: 392 { 393 status_t err = onStart(msg); 394 395 sp<AMessage> notify = mNotify->dup(); 396 notify->setInt32("what", kWhatStarted); 397 notify->setInt32("err", err); 398 notify->post(); 399 break; 400 } 401 402 case kWhatPause: 403 { 404 onPause(); 405 406 sp<AMessage> notify = mNotify->dup(); 407 notify->setInt32("what", kWhatPaused); 408 notify->post(); 409 break; 410 } 411 412 case kWhatStop: 413 { 414 onStop(msg); 415 416 sp<AMessage> notify = mNotify->dup(); 417 notify->setInt32("what", kWhatStopped); 418 notify->post(); 419 break; 420 } 421 422 case kWhatMonitorQueue: 423 case kWhatDownloadNext: 424 { 425 int32_t generation; 426 CHECK(msg->findInt32("generation", &generation)); 427 428 if (generation != mMonitorQueueGeneration) { 429 // Stale event 430 break; 431 } 432 433 if (msg->what() == kWhatMonitorQueue) { 434 onMonitorQueue(); 435 } else { 436 onDownloadNext(); 437 } 438 break; 439 } 440 441 case kWhatResumeUntil: 442 { 443 onResumeUntil(msg); 444 break; 445 } 446 447 default: 448 TRESPASS(); 449 } 450} 451 452status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 453 mPacketSources.clear(); 454 455 uint32_t streamTypeMask; 456 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 457 458 int64_t startTimeUs; 459 int64_t segmentStartTimeUs; 460 int32_t startDiscontinuitySeq; 461 int32_t adaptive; 462 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 463 CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs)); 464 CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq)); 465 CHECK(msg->findInt32("adaptive", &adaptive)); 466 467 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 468 void *ptr; 469 CHECK(msg->findPointer("audioSource", &ptr)); 470 471 mPacketSources.add( 472 LiveSession::STREAMTYPE_AUDIO, 473 static_cast<AnotherPacketSource *>(ptr)); 474 } 475 476 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 477 void *ptr; 478 CHECK(msg->findPointer("videoSource", &ptr)); 479 480 mPacketSources.add( 481 LiveSession::STREAMTYPE_VIDEO, 482 static_cast<AnotherPacketSource *>(ptr)); 483 } 484 485 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 486 void *ptr; 487 CHECK(msg->findPointer("subtitleSource", &ptr)); 488 489 mPacketSources.add( 490 LiveSession::STREAMTYPE_SUBTITLES, 491 static_cast<AnotherPacketSource *>(ptr)); 492 } 493 494 mStreamTypeMask = streamTypeMask; 495 496 mSegmentStartTimeUs = segmentStartTimeUs; 497 mDiscontinuitySeq = startDiscontinuitySeq; 498 499 if (startTimeUs >= 0) { 500 mStartTimeUs = startTimeUs; 501 mSeqNumber = -1; 502 mStartup = true; 503 mPrepared = false; 504 mAdaptive = adaptive; 505 } 506 507 postMonitorQueue(); 508 509 return OK; 510} 511 512void PlaylistFetcher::onPause() { 513 cancelMonitorQueue(); 514} 515 516void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 517 cancelMonitorQueue(); 518 519 int32_t clear; 520 CHECK(msg->findInt32("clear", &clear)); 521 if (clear) { 522 for (size_t i = 0; i < mPacketSources.size(); i++) { 523 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 524 packetSource->clear(); 525 } 526 } 527 528 mPacketSources.clear(); 529 mStreamTypeMask = 0; 530} 531 532// Resume until we have reached the boundary timestamps listed in `msg`; when 533// the remaining time is too short (within a resume threshold) stop immediately 534// instead. 535status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 536 sp<AMessage> params; 537 CHECK(msg->findMessage("params", ¶ms)); 538 539 size_t stopCount = 0; 540 for (size_t i = 0; i < mPacketSources.size(); i++) { 541 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 542 543 const char *stopKey; 544 int streamType = mPacketSources.keyAt(i); 545 546 if (streamType == LiveSession::STREAMTYPE_SUBTITLES) { 547 // the subtitle track can always be stopped 548 ++stopCount; 549 continue; 550 } 551 552 switch (streamType) { 553 case LiveSession::STREAMTYPE_VIDEO: 554 stopKey = "timeUsVideo"; 555 break; 556 557 case LiveSession::STREAMTYPE_AUDIO: 558 stopKey = "timeUsAudio"; 559 break; 560 561 default: 562 TRESPASS(); 563 } 564 565 // check if this stream has too little data left to be resumed 566 int32_t discontinuitySeq; 567 int64_t latestTimeUs = 0, stopTimeUs = 0; 568 sp<AMessage> latestMeta = packetSource->getLatestEnqueuedMeta(); 569 if (latestMeta != NULL 570 && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq) 571 && discontinuitySeq == mDiscontinuitySeq 572 && latestMeta->findInt64("timeUs", &latestTimeUs) 573 && params->findInt64(stopKey, &stopTimeUs) 574 && stopTimeUs - latestTimeUs < kFetcherResumeThreshold) { 575 ++stopCount; 576 } 577 } 578 579 // Don't resume if all streams are within a resume threshold 580 if (stopCount == mPacketSources.size()) { 581 for (size_t i = 0; i < mPacketSources.size(); i++) { 582 mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); 583 } 584 stopAsync(/* clear = */ false); 585 return OK; 586 } 587 588 mStopParams = params; 589 onDownloadNext(); 590 591 return OK; 592} 593 594void PlaylistFetcher::notifyError(status_t err) { 595 sp<AMessage> notify = mNotify->dup(); 596 notify->setInt32("what", kWhatError); 597 notify->setInt32("err", err); 598 notify->post(); 599} 600 601void PlaylistFetcher::queueDiscontinuity( 602 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 603 for (size_t i = 0; i < mPacketSources.size(); ++i) { 604 // do not discard buffer upon #EXT-X-DISCONTINUITY tag 605 // (seek will discard buffer by abandoning old fetchers) 606 mPacketSources.valueAt(i)->queueDiscontinuity( 607 type, extra, false /* discard */); 608 } 609} 610 611void PlaylistFetcher::onMonitorQueue() { 612 bool downloadMore = false; 613 refreshPlaylist(); 614 615 int32_t targetDurationSecs; 616 int64_t targetDurationUs = kMinBufferedDurationUs; 617 if (mPlaylist != NULL) { 618 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 619 "target-duration", &targetDurationSecs)) { 620 ALOGE("Playlist is missing required EXT-X-TARGETDURATION tag"); 621 notifyError(ERROR_MALFORMED); 622 return; 623 } 624 targetDurationUs = targetDurationSecs * 1000000ll; 625 } 626 627 // buffer at least 3 times the target duration, or up to 10 seconds 628 int64_t durationToBufferUs = targetDurationUs * 3; 629 if (durationToBufferUs > kMinBufferedDurationUs) { 630 durationToBufferUs = kMinBufferedDurationUs; 631 } 632 633 int64_t bufferedDurationUs = 0ll; 634 status_t finalResult = NOT_ENOUGH_DATA; 635 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 636 sp<AnotherPacketSource> packetSource = 637 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 638 639 bufferedDurationUs = 640 packetSource->getBufferedDurationUs(&finalResult); 641 finalResult = OK; 642 } else { 643 // Use max stream duration to prevent us from waiting on a non-existent stream; 644 // when we cannot make out from the manifest what streams are included in a playlist 645 // we might assume extra streams. 646 for (size_t i = 0; i < mPacketSources.size(); ++i) { 647 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 648 continue; 649 } 650 651 int64_t bufferedStreamDurationUs = 652 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 653 ALOGV("buffered %" PRId64 " for stream %d", 654 bufferedStreamDurationUs, mPacketSources.keyAt(i)); 655 if (bufferedStreamDurationUs > bufferedDurationUs) { 656 bufferedDurationUs = bufferedStreamDurationUs; 657 } 658 } 659 } 660 downloadMore = (bufferedDurationUs < durationToBufferUs); 661 662 // signal start if buffered up at least the target size 663 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 664 mPrepared = true; 665 666 ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "", 667 bufferedDurationUs, targetDurationUs); 668 } 669 670 if (finalResult == OK && downloadMore) { 671 ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "", 672 bufferedDurationUs, durationToBufferUs); 673 // delay the next download slightly; hopefully this gives other concurrent fetchers 674 // a better chance to run. 675 // onDownloadNext(); 676 sp<AMessage> msg = new AMessage(kWhatDownloadNext, this); 677 msg->setInt32("generation", mMonitorQueueGeneration); 678 msg->post(1000l); 679 } else { 680 // Nothing to do yet, try again in a second. 681 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 682 ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "", 683 delayUs, bufferedDurationUs, durationToBufferUs); 684 // :TRICKY: need to enforce minimum delay because the delay to 685 // refresh the playlist will become 0 686 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 687 } 688} 689 690status_t PlaylistFetcher::refreshPlaylist() { 691 if (delayUsToRefreshPlaylist() <= 0) { 692 bool unchanged; 693 sp<M3UParser> playlist = mSession->fetchPlaylist( 694 mURI.c_str(), mPlaylistHash, &unchanged); 695 696 if (playlist == NULL) { 697 if (unchanged) { 698 // We succeeded in fetching the playlist, but it was 699 // unchanged from the last time we tried. 700 701 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 702 mRefreshState = (RefreshState)(mRefreshState + 1); 703 } 704 } else { 705 ALOGE("failed to load playlist at url '%s'", uriDebugString(mURI).c_str()); 706 return ERROR_IO; 707 } 708 } else { 709 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 710 mPlaylist = playlist; 711 712 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 713 updateDuration(); 714 } 715 } 716 717 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 718 } 719 return OK; 720} 721 722// static 723bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) { 724 return buffer->size() > 0 && buffer->data()[0] == 0x47; 725} 726 727void PlaylistFetcher::onDownloadNext() { 728 status_t err = refreshPlaylist(); 729 int32_t firstSeqNumberInPlaylist = 0; 730 int32_t lastSeqNumberInPlaylist = 0; 731 bool discontinuity = false; 732 733 if (mPlaylist != NULL) { 734 if (mPlaylist->meta() != NULL) { 735 mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist); 736 } 737 738 lastSeqNumberInPlaylist = 739 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 740 741 if (mDiscontinuitySeq < 0) { 742 mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 743 } 744 } 745 746 if (mPlaylist != NULL && mSeqNumber < 0) { 747 CHECK_GE(mStartTimeUs, 0ll); 748 749 if (mSegmentStartTimeUs < 0) { 750 if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) { 751 // If this is a live session, start 3 segments from the end on connect 752 mSeqNumber = lastSeqNumberInPlaylist - 3; 753 if (mSeqNumber < firstSeqNumberInPlaylist) { 754 mSeqNumber = firstSeqNumberInPlaylist; 755 } 756 } else { 757 // When seeking mSegmentStartTimeUs is unavailable (< 0), we 758 // use mStartTimeUs (client supplied timestamp) to determine both start segment 759 // and relative position inside a segment 760 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 761 mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber); 762 } 763 mStartTimeUsRelative = true; 764 ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)", 765 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 766 lastSeqNumberInPlaylist); 767 } else { 768 // When adapting or track switching, mSegmentStartTimeUs (relative 769 // to media time 0) is used to determine the start segment; mStartTimeUs (absolute 770 // timestamps coming from the media container) is used to determine the position 771 // inside a segments. 772 mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs); 773 if (mAdaptive) { 774 // avoid double fetch/decode 775 mSeqNumber += 1; 776 } 777 ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq); 778 if (mSeqNumber < minSeq) { 779 mSeqNumber = minSeq; 780 } 781 782 if (mSeqNumber < firstSeqNumberInPlaylist) { 783 mSeqNumber = firstSeqNumberInPlaylist; 784 } 785 786 if (mSeqNumber > lastSeqNumberInPlaylist) { 787 mSeqNumber = lastSeqNumberInPlaylist; 788 } 789 ALOGV("Initial sequence number for live event %d from (%d .. %d)", 790 mSeqNumber, firstSeqNumberInPlaylist, 791 lastSeqNumberInPlaylist); 792 } 793 } 794 795 // if mPlaylist is NULL then err must be non-OK; but the other way around might not be true 796 if (mSeqNumber < firstSeqNumberInPlaylist 797 || mSeqNumber > lastSeqNumberInPlaylist 798 || err != OK) { 799 if ((err != OK || !mPlaylist->isComplete()) && mNumRetries < kMaxNumRetries) { 800 ++mNumRetries; 801 802 if (mSeqNumber > lastSeqNumberInPlaylist || err != OK) { 803 // make sure we reach this retry logic on refresh failures 804 // by adding an err != OK clause to all enclosing if's. 805 806 // refresh in increasing fraction (1/2, 1/3, ...) of the 807 // playlist's target duration or 3 seconds, whichever is less 808 int64_t delayUs = kMaxMonitorDelayUs; 809 if (mPlaylist != NULL && mPlaylist->meta() != NULL) { 810 int32_t targetDurationSecs; 811 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 812 delayUs = mPlaylist->size() * targetDurationSecs * 813 1000000ll / (1 + mNumRetries); 814 } 815 if (delayUs > kMaxMonitorDelayUs) { 816 delayUs = kMaxMonitorDelayUs; 817 } 818 ALOGV("sequence number high: %d from (%d .. %d), " 819 "monitor in %" PRId64 " (retry=%d)", 820 mSeqNumber, firstSeqNumberInPlaylist, 821 lastSeqNumberInPlaylist, delayUs, mNumRetries); 822 postMonitorQueue(delayUs); 823 return; 824 } 825 826 if (err != OK) { 827 notifyError(err); 828 return; 829 } 830 831 // we've missed the boat, let's start 3 segments prior to the latest sequence 832 // number available and signal a discontinuity. 833 834 ALOGI("We've missed the boat, restarting playback." 835 " mStartup=%d, was looking for %d in %d-%d", 836 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 837 lastSeqNumberInPlaylist); 838 if (mStopParams != NULL) { 839 // we should have kept on fetching until we hit the boundaries in mStopParams, 840 // but since the segments we are supposed to fetch have already rolled off 841 // the playlist, i.e. we have already missed the boat, we inevitably have to 842 // skip. 843 for (size_t i = 0; i < mPacketSources.size(); i++) { 844 sp<ABuffer> formatChange = mSession->createFormatChangeBuffer(); 845 mPacketSources.valueAt(i)->queueAccessUnit(formatChange); 846 } 847 stopAsync(/* clear = */ false); 848 return; 849 } 850 mSeqNumber = lastSeqNumberInPlaylist - 3; 851 if (mSeqNumber < firstSeqNumberInPlaylist) { 852 mSeqNumber = firstSeqNumberInPlaylist; 853 } 854 discontinuity = true; 855 856 // fall through 857 } else { 858 ALOGE("Cannot find sequence number %d in playlist " 859 "(contains %d - %d)", 860 mSeqNumber, firstSeqNumberInPlaylist, 861 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1); 862 863 notifyError(ERROR_END_OF_STREAM); 864 return; 865 } 866 } 867 868 mNumRetries = 0; 869 870 AString uri; 871 sp<AMessage> itemMeta; 872 CHECK(mPlaylist->itemAt( 873 mSeqNumber - firstSeqNumberInPlaylist, 874 &uri, 875 &itemMeta)); 876 877 int32_t val; 878 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 879 mDiscontinuitySeq++; 880 discontinuity = true; 881 } 882 883 int64_t range_offset, range_length; 884 if (!itemMeta->findInt64("range-offset", &range_offset) 885 || !itemMeta->findInt64("range-length", &range_length)) { 886 range_offset = 0; 887 range_length = -1; 888 } 889 890 ALOGV("fetching segment %d from (%d .. %d)", 891 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 892 893 ALOGV("fetching '%s'", uri.c_str()); 894 895 sp<DataSource> source; 896 sp<ABuffer> buffer, tsBuffer; 897 // decrypt a junk buffer to prefetch key; since a session uses only one http connection, 898 // this avoids interleaved connections to the key and segment file. 899 { 900 sp<ABuffer> junk = new ABuffer(16); 901 junk->setRange(0, 16); 902 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk, 903 true /* first */); 904 if (err != OK) { 905 notifyError(err); 906 return; 907 } 908 } 909 910 // block-wise download 911 bool startup = mStartup; 912 ssize_t bytesRead; 913 do { 914 bytesRead = mSession->fetchFile( 915 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source); 916 917 if (bytesRead < 0) { 918 status_t err = bytesRead; 919 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 920 notifyError(err); 921 return; 922 } 923 924 CHECK(buffer != NULL); 925 926 size_t size = buffer->size(); 927 // Set decryption range. 928 buffer->setRange(size - bytesRead, bytesRead); 929 status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer, 930 buffer->offset() == 0 /* first */); 931 // Unset decryption range. 932 buffer->setRange(0, size); 933 934 if (err != OK) { 935 ALOGE("decryptBuffer failed w/ error %d", err); 936 937 notifyError(err); 938 return; 939 } 940 941 if (startup || discontinuity) { 942 // Signal discontinuity. 943 944 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 945 // If this was a live event this made no sense since 946 // we don't have access to all the segment before the current 947 // one. 948 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 949 } 950 951 if (discontinuity) { 952 ALOGI("queueing discontinuity (explicit=%d)", discontinuity); 953 954 queueDiscontinuity( 955 ATSParser::DISCONTINUITY_FORMATCHANGE, 956 NULL /* extra */); 957 958 discontinuity = false; 959 } 960 961 startup = false; 962 } 963 964 err = OK; 965 if (bufferStartsWithTsSyncByte(buffer)) { 966 // Incremental extraction is only supported for MPEG2 transport streams. 967 if (tsBuffer == NULL) { 968 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 969 tsBuffer->setRange(0, 0); 970 } else if (tsBuffer->capacity() != buffer->capacity()) { 971 size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size(); 972 tsBuffer = new ABuffer(buffer->data(), buffer->capacity()); 973 tsBuffer->setRange(tsOff, tsSize); 974 } 975 tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead); 976 977 err = extractAndQueueAccessUnitsFromTs(tsBuffer); 978 } 979 980 if (err == -EAGAIN) { 981 // starting sequence number too low/high 982 mTSParser.clear(); 983 for (size_t i = 0; i < mPacketSources.size(); i++) { 984 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 985 packetSource->clear(); 986 } 987 postMonitorQueue(); 988 return; 989 } else if (err == ERROR_OUT_OF_RANGE) { 990 // reached stopping point 991 stopAsync(/* clear = */ false); 992 return; 993 } else if (err != OK) { 994 notifyError(err); 995 return; 996 } 997 998 } while (bytesRead != 0); 999 1000 if (bufferStartsWithTsSyncByte(buffer)) { 1001 // If we don't see a stream in the program table after fetching a full ts segment 1002 // mark it as nonexistent. 1003 const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES; 1004 ATSParser::SourceType srcTypes[kNumTypes] = 1005 { ATSParser::VIDEO, ATSParser::AUDIO }; 1006 LiveSession::StreamType streamTypes[kNumTypes] = 1007 { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO }; 1008 1009 for (size_t i = 0; i < kNumTypes; i++) { 1010 ATSParser::SourceType srcType = srcTypes[i]; 1011 LiveSession::StreamType streamType = streamTypes[i]; 1012 1013 sp<AnotherPacketSource> source = 1014 static_cast<AnotherPacketSource *>( 1015 mTSParser->getSource(srcType).get()); 1016 1017 if (!mTSParser->hasSource(srcType)) { 1018 ALOGW("MPEG2 Transport stream does not contain %s data.", 1019 srcType == ATSParser::VIDEO ? "video" : "audio"); 1020 1021 mStreamTypeMask &= ~streamType; 1022 mPacketSources.removeItem(streamType); 1023 } 1024 } 1025 1026 } 1027 1028 if (checkDecryptPadding(buffer) != OK) { 1029 ALOGE("Incorrect padding bytes after decryption."); 1030 notifyError(ERROR_MALFORMED); 1031 return; 1032 } 1033 1034 err = OK; 1035 if (tsBuffer != NULL) { 1036 AString method; 1037 CHECK(buffer->meta()->findString("cipher-method", &method)); 1038 if ((tsBuffer->size() > 0 && method == "NONE") 1039 || tsBuffer->size() > 16) { 1040 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 1041 "bytes in length."); 1042 notifyError(ERROR_MALFORMED); 1043 return; 1044 } 1045 } 1046 1047 // bulk extract non-ts files 1048 if (tsBuffer == NULL) { 1049 err = extractAndQueueAccessUnits(buffer, itemMeta); 1050 if (err == -EAGAIN) { 1051 // starting sequence number too low/high 1052 postMonitorQueue(); 1053 return; 1054 } else if (err == ERROR_OUT_OF_RANGE) { 1055 // reached stopping point 1056 stopAsync(/* clear = */false); 1057 return; 1058 } 1059 } 1060 1061 if (err != OK) { 1062 notifyError(err); 1063 return; 1064 } 1065 1066 ++mSeqNumber; 1067 1068 postMonitorQueue(); 1069} 1070 1071int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const { 1072 int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist; 1073 if (mPlaylist->meta() == NULL 1074 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1075 firstSeqNumberInPlaylist = 0; 1076 } 1077 lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1; 1078 1079 int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1; 1080 while (index >= 0 && anchorTimeUs > mStartTimeUs) { 1081 sp<AMessage> itemMeta; 1082 CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta)); 1083 1084 int64_t itemDurationUs; 1085 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1086 1087 anchorTimeUs -= itemDurationUs; 1088 --index; 1089 } 1090 1091 int32_t newSeqNumber = firstSeqNumberInPlaylist + index + 1; 1092 if (newSeqNumber <= lastSeqNumberInPlaylist) { 1093 return newSeqNumber; 1094 } else { 1095 return lastSeqNumberInPlaylist; 1096 } 1097} 1098 1099int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1100 int32_t firstSeqNumberInPlaylist; 1101 if (mPlaylist->meta() == NULL 1102 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1103 firstSeqNumberInPlaylist = 0; 1104 } 1105 1106 size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 1107 if (discontinuitySeq < curDiscontinuitySeq) { 1108 return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); 1109 } 1110 1111 size_t index = 0; 1112 while (index < mPlaylist->size()) { 1113 sp<AMessage> itemMeta; 1114 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1115 1116 int64_t discontinuity; 1117 if (itemMeta->findInt64("discontinuity", &discontinuity)) { 1118 curDiscontinuitySeq++; 1119 } 1120 1121 if (curDiscontinuitySeq == discontinuitySeq) { 1122 return firstSeqNumberInPlaylist + index; 1123 } 1124 1125 ++index; 1126 } 1127 1128 return firstSeqNumberInPlaylist + mPlaylist->size(); 1129} 1130 1131int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1132 int32_t firstSeqNumberInPlaylist; 1133 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1134 "media-sequence", &firstSeqNumberInPlaylist)) { 1135 firstSeqNumberInPlaylist = 0; 1136 } 1137 1138 size_t index = 0; 1139 int64_t segmentStartUs = 0; 1140 while (index < mPlaylist->size()) { 1141 sp<AMessage> itemMeta; 1142 CHECK(mPlaylist->itemAt( 1143 index, NULL /* uri */, &itemMeta)); 1144 1145 int64_t itemDurationUs; 1146 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1147 1148 if (timeUs < segmentStartUs + itemDurationUs) { 1149 break; 1150 } 1151 1152 segmentStartUs += itemDurationUs; 1153 ++index; 1154 } 1155 1156 if (index >= mPlaylist->size()) { 1157 index = mPlaylist->size() - 1; 1158 } 1159 1160 return firstSeqNumberInPlaylist + index; 1161} 1162 1163const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1164 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1165 sp<MetaData> format = source->getFormat(); 1166 if (format != NULL) { 1167 // for simplicity, store a reference to the format in each unit 1168 accessUnit->meta()->setObject("format", format); 1169 } 1170 1171 if (discard) { 1172 accessUnit->meta()->setInt32("discard", discard); 1173 } 1174 1175 int32_t targetDurationSecs; 1176 if (mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)) { 1177 accessUnit->meta()->setInt32("targetDuration", targetDurationSecs); 1178 } 1179 1180 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1181 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1182 return accessUnit; 1183} 1184 1185status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1186 if (mTSParser == NULL) { 1187 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1188 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1189 } 1190 1191 if (mNextPTSTimeUs >= 0ll) { 1192 sp<AMessage> extra = new AMessage; 1193 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1194 // ATSParser from skewing the timestamps of access units. 1195 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1196 1197 mTSParser->signalDiscontinuity( 1198 ATSParser::DISCONTINUITY_TIME, extra); 1199 1200 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1201 mNextPTSTimeUs = -1ll; 1202 mFirstPTSValid = false; 1203 } 1204 1205 size_t offset = 0; 1206 while (offset + 188 <= buffer->size()) { 1207 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1208 1209 if (err != OK) { 1210 return err; 1211 } 1212 1213 offset += 188; 1214 } 1215 // setRange to indicate consumed bytes. 1216 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1217 1218 status_t err = OK; 1219 for (size_t i = mPacketSources.size(); i-- > 0;) { 1220 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1221 1222 const char *key; 1223 ATSParser::SourceType type; 1224 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1225 switch (stream) { 1226 case LiveSession::STREAMTYPE_VIDEO: 1227 type = ATSParser::VIDEO; 1228 key = "timeUsVideo"; 1229 break; 1230 1231 case LiveSession::STREAMTYPE_AUDIO: 1232 type = ATSParser::AUDIO; 1233 key = "timeUsAudio"; 1234 break; 1235 1236 case LiveSession::STREAMTYPE_SUBTITLES: 1237 { 1238 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1239 return ERROR_MALFORMED; 1240 break; 1241 } 1242 1243 default: 1244 TRESPASS(); 1245 } 1246 1247 sp<AnotherPacketSource> source = 1248 static_cast<AnotherPacketSource *>( 1249 mTSParser->getSource(type).get()); 1250 1251 if (source == NULL) { 1252 continue; 1253 } 1254 1255 int64_t timeUs; 1256 sp<ABuffer> accessUnit; 1257 status_t finalResult; 1258 while (source->hasBufferAvailable(&finalResult) 1259 && source->dequeueAccessUnit(&accessUnit) == OK) { 1260 1261 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1262 1263 if (mStartup) { 1264 if (!mFirstPTSValid) { 1265 mFirstTimeUs = timeUs; 1266 mFirstPTSValid = true; 1267 } 1268 if (mStartTimeUsRelative) { 1269 timeUs -= mFirstTimeUs; 1270 if (timeUs < 0) { 1271 timeUs = 0; 1272 } 1273 } 1274 1275 if (timeUs < mStartTimeUs) { 1276 // buffer up to the closest preceding IDR frame 1277 ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", 1278 timeUs, mStartTimeUs); 1279 const char *mime; 1280 sp<MetaData> format = source->getFormat(); 1281 bool isAvc = false; 1282 if (format != NULL && format->findCString(kKeyMIMEType, &mime) 1283 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { 1284 isAvc = true; 1285 } 1286 if (isAvc && IsIDR(accessUnit)) { 1287 mVideoBuffer->clear(); 1288 } 1289 if (isAvc) { 1290 mVideoBuffer->queueAccessUnit(accessUnit); 1291 } 1292 1293 continue; 1294 } 1295 } 1296 1297 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1298 if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { 1299 int32_t firstSeqNumberInPlaylist; 1300 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1301 "media-sequence", &firstSeqNumberInPlaylist)) { 1302 firstSeqNumberInPlaylist = 0; 1303 } 1304 1305 int32_t targetDurationSecs; 1306 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1307 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1308 // mStartup 1309 // mStartup is true until we have queued a packet for all the streams 1310 // we are fetching. We queue packets whose timestamps are greater than 1311 // mStartTimeUs. 1312 // mSegmentStartTimeUs >= 0 1313 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1314 // mSeqNumber > firstSeqNumberInPlaylist 1315 // don't decrement mSeqNumber if it already points to the 1st segment 1316 // timeUs - mStartTimeUs > targetDurationUs: 1317 // This and the 2 above conditions should only happen when adapting in a live 1318 // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher 1319 // would start fetching after timeUs, which should be greater than mStartTimeUs; 1320 // the old fetcher would then continue fetching data until timeUs. We don't want 1321 // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to 1322 // stop as early as possible. The definition of being "too far ahead" is 1323 // arbitrary; here we use targetDurationUs as threshold. 1324 if (mStartup && mSegmentStartTimeUs >= 0 1325 && mSeqNumber > firstSeqNumberInPlaylist 1326 && timeUs - mStartTimeUs > targetDurationUs) { 1327 // we just guessed a starting timestamp that is too high when adapting in a 1328 // live stream; re-adjust based on the actual timestamp extracted from the 1329 // media segment; if we didn't move backward after the re-adjustment 1330 // (newSeqNumber), start at least 1 segment prior. 1331 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1332 if (newSeqNumber >= mSeqNumber) { 1333 --mSeqNumber; 1334 } else { 1335 mSeqNumber = newSeqNumber; 1336 } 1337 mStartTimeUsNotify = mNotify->dup(); 1338 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1339 return -EAGAIN; 1340 } 1341 1342 int32_t seq; 1343 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { 1344 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1345 } 1346 int64_t startTimeUs; 1347 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1348 mStartTimeUsNotify->setInt64(key, timeUs); 1349 1350 uint32_t streamMask = 0; 1351 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1352 streamMask |= mPacketSources.keyAt(i); 1353 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1354 1355 if (streamMask == mStreamTypeMask) { 1356 mStartup = false; 1357 mStartTimeUsNotify->post(); 1358 mStartTimeUsNotify.clear(); 1359 } 1360 } 1361 } 1362 1363 if (mStopParams != NULL) { 1364 // Queue discontinuity in original stream. 1365 int32_t discontinuitySeq; 1366 int64_t stopTimeUs; 1367 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1368 || discontinuitySeq > mDiscontinuitySeq 1369 || !mStopParams->findInt64(key, &stopTimeUs) 1370 || (discontinuitySeq == mDiscontinuitySeq 1371 && timeUs >= stopTimeUs)) { 1372 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1373 mStreamTypeMask &= ~stream; 1374 mPacketSources.removeItemsAt(i); 1375 break; 1376 } 1377 } 1378 1379 // Note that we do NOT dequeue any discontinuities except for format change. 1380 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1381 const bool discard = true; 1382 status_t status; 1383 while (mVideoBuffer->hasBufferAvailable(&status)) { 1384 sp<ABuffer> videoBuffer; 1385 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1386 setAccessUnitProperties(videoBuffer, source, discard); 1387 packetSource->queueAccessUnit(videoBuffer); 1388 } 1389 } 1390 1391 setAccessUnitProperties(accessUnit, source); 1392 packetSource->queueAccessUnit(accessUnit); 1393 } 1394 1395 if (err != OK) { 1396 break; 1397 } 1398 } 1399 1400 if (err != OK) { 1401 for (size_t i = mPacketSources.size(); i-- > 0;) { 1402 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1403 packetSource->clear(); 1404 } 1405 return err; 1406 } 1407 1408 if (!mStreamTypeMask) { 1409 // Signal gap is filled between original and new stream. 1410 ALOGV("ERROR OUT OF RANGE"); 1411 return ERROR_OUT_OF_RANGE; 1412 } 1413 1414 return OK; 1415} 1416 1417/* static */ 1418bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1419 const sp<ABuffer> &buffer) { 1420 size_t pos = 0; 1421 1422 // skip possible BOM 1423 if (buffer->size() >= pos + 3 && 1424 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1425 pos += 3; 1426 } 1427 1428 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1429 if (buffer->size() < pos + 6 || 1430 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1431 return false; 1432 } 1433 pos += 6; 1434 1435 if (buffer->size() == pos) { 1436 return true; 1437 } 1438 1439 uint8_t sep = buffer->data()[pos]; 1440 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1441} 1442 1443status_t PlaylistFetcher::extractAndQueueAccessUnits( 1444 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1445 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1446 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1447 ALOGE("This stream only contains subtitles."); 1448 return ERROR_MALFORMED; 1449 } 1450 1451 const sp<AnotherPacketSource> packetSource = 1452 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1453 1454 int64_t durationUs; 1455 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1456 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1457 buffer->meta()->setInt64("durationUs", durationUs); 1458 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1459 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1460 buffer->meta()->setInt32("subtitleGeneration", mSubtitleGeneration); 1461 1462 packetSource->queueAccessUnit(buffer); 1463 return OK; 1464 } 1465 1466 if (mNextPTSTimeUs >= 0ll) { 1467 mFirstPTSValid = false; 1468 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1469 mNextPTSTimeUs = -1ll; 1470 } 1471 1472 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1473 // stream prefixed by an ID3 tag. 1474 1475 bool firstID3Tag = true; 1476 uint64_t PTS = 0; 1477 1478 for (;;) { 1479 // Make sure to skip all ID3 tags preceding the audio data. 1480 // At least one must be present to provide the PTS timestamp. 1481 1482 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1483 if (!id3.isValid()) { 1484 if (firstID3Tag) { 1485 ALOGE("Unable to parse ID3 tag."); 1486 return ERROR_MALFORMED; 1487 } else { 1488 break; 1489 } 1490 } 1491 1492 if (firstID3Tag) { 1493 bool found = false; 1494 1495 ID3::Iterator it(id3, "PRIV"); 1496 while (!it.done()) { 1497 size_t length; 1498 const uint8_t *data = it.getData(&length); 1499 1500 static const char *kMatchName = 1501 "com.apple.streaming.transportStreamTimestamp"; 1502 static const size_t kMatchNameLen = strlen(kMatchName); 1503 1504 if (length == kMatchNameLen + 1 + 8 1505 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1506 found = true; 1507 PTS = U64_AT(&data[kMatchNameLen + 1]); 1508 } 1509 1510 it.next(); 1511 } 1512 1513 if (!found) { 1514 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1515 return ERROR_MALFORMED; 1516 } 1517 } 1518 1519 // skip the ID3 tag 1520 buffer->setRange( 1521 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1522 1523 firstID3Tag = false; 1524 } 1525 1526 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1527 ALOGW("This stream only contains audio data!"); 1528 1529 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1530 1531 if (mStreamTypeMask == 0) { 1532 return OK; 1533 } 1534 } 1535 1536 sp<AnotherPacketSource> packetSource = 1537 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1538 1539 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1540 ABitReader bits(buffer->data(), buffer->size()); 1541 1542 // adts_fixed_header 1543 1544 CHECK_EQ(bits.getBits(12), 0xfffu); 1545 bits.skipBits(3); // ID, layer 1546 bool protection_absent __unused = bits.getBits(1) != 0; 1547 1548 unsigned profile = bits.getBits(2); 1549 CHECK_NE(profile, 3u); 1550 unsigned sampling_freq_index = bits.getBits(4); 1551 bits.getBits(1); // private_bit 1552 unsigned channel_configuration = bits.getBits(3); 1553 CHECK_NE(channel_configuration, 0u); 1554 bits.skipBits(2); // original_copy, home 1555 1556 sp<MetaData> meta = MakeAACCodecSpecificData( 1557 profile, sampling_freq_index, channel_configuration); 1558 1559 meta->setInt32(kKeyIsADTS, true); 1560 1561 packetSource->setFormat(meta); 1562 } 1563 1564 int64_t numSamples = 0ll; 1565 int32_t sampleRate; 1566 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1567 1568 int64_t timeUs = (PTS * 100ll) / 9ll; 1569 if (!mFirstPTSValid) { 1570 mFirstPTSValid = true; 1571 mFirstTimeUs = timeUs; 1572 } 1573 1574 size_t offset = 0; 1575 while (offset < buffer->size()) { 1576 const uint8_t *adtsHeader = buffer->data() + offset; 1577 CHECK_LT(offset + 5, buffer->size()); 1578 1579 unsigned aac_frame_length = 1580 ((adtsHeader[3] & 3) << 11) 1581 | (adtsHeader[4] << 3) 1582 | (adtsHeader[5] >> 5); 1583 1584 if (aac_frame_length == 0) { 1585 const uint8_t *id3Header = adtsHeader; 1586 if (!memcmp(id3Header, "ID3", 3)) { 1587 ID3 id3(id3Header, buffer->size() - offset, true); 1588 if (id3.isValid()) { 1589 offset += id3.rawSize(); 1590 continue; 1591 }; 1592 } 1593 return ERROR_MALFORMED; 1594 } 1595 1596 CHECK_LE(offset + aac_frame_length, buffer->size()); 1597 1598 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1599 offset += aac_frame_length; 1600 1601 // Each AAC frame encodes 1024 samples. 1602 numSamples += 1024; 1603 1604 if (mStartup) { 1605 int64_t startTimeUs = unitTimeUs; 1606 if (mStartTimeUsRelative) { 1607 startTimeUs -= mFirstTimeUs; 1608 if (startTimeUs < 0) { 1609 startTimeUs = 0; 1610 } 1611 } 1612 if (startTimeUs < mStartTimeUs) { 1613 continue; 1614 } 1615 1616 if (mStartTimeUsNotify != NULL) { 1617 int32_t targetDurationSecs; 1618 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1619 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1620 1621 // Duplicated logic from how we handle .ts playlists. 1622 if (mStartup && mSegmentStartTimeUs >= 0 1623 && timeUs - mStartTimeUs > targetDurationUs) { 1624 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1625 if (newSeqNumber >= mSeqNumber) { 1626 --mSeqNumber; 1627 } else { 1628 mSeqNumber = newSeqNumber; 1629 } 1630 return -EAGAIN; 1631 } 1632 1633 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); 1634 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1635 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 1636 mStartTimeUsNotify->post(); 1637 mStartTimeUsNotify.clear(); 1638 mStartup = false; 1639 } 1640 } 1641 1642 if (mStopParams != NULL) { 1643 // Queue discontinuity in original stream. 1644 int32_t discontinuitySeq; 1645 int64_t stopTimeUs; 1646 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1647 || discontinuitySeq > mDiscontinuitySeq 1648 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 1649 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 1650 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1651 mStreamTypeMask = 0; 1652 mPacketSources.clear(); 1653 return ERROR_OUT_OF_RANGE; 1654 } 1655 } 1656 1657 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1658 memcpy(unit->data(), adtsHeader, aac_frame_length); 1659 1660 unit->meta()->setInt64("timeUs", unitTimeUs); 1661 setAccessUnitProperties(unit, packetSource); 1662 packetSource->queueAccessUnit(unit); 1663 } 1664 1665 return OK; 1666} 1667 1668void PlaylistFetcher::updateDuration() { 1669 int64_t durationUs = 0ll; 1670 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1671 sp<AMessage> itemMeta; 1672 CHECK(mPlaylist->itemAt( 1673 index, NULL /* uri */, &itemMeta)); 1674 1675 int64_t itemDurationUs; 1676 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1677 1678 durationUs += itemDurationUs; 1679 } 1680 1681 sp<AMessage> msg = mNotify->dup(); 1682 msg->setInt32("what", kWhatDurationUpdate); 1683 msg->setInt64("durationUs", durationUs); 1684 msg->post(); 1685} 1686 1687} // namespace android 1688