PlaylistFetcher.cpp revision 73d2847af14cdd5fdf8bd1ac80fb7ddf9ae7d9a7
/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "PlaylistFetcher"
#include <utils/Log.h>

#include "PlaylistFetcher.h"

#include "LiveDataSource.h"
#include "LiveSession.h"
#include "M3UParser.h"

#include "include/avc_utils.h"
#include "include/HTTPBase.h"
#include "include/ID3.h"
#include "mpeg2ts/AnotherPacketSource.h"

#include <media/IStreamSource.h>
#include <media/stagefright/foundation/ABitReader.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/hexdump.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>

#include <ctype.h>
#include <inttypes.h>
#include <openssl/aes.h>
#include <openssl/md5.h>

namespace android {

// static
// Upper bound (10 s) on how much media onMonitorQueue() tries to keep buffered.
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
// Longest delay (3 s) used between monitor-queue passes / retry polls.
const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
// Block size passed to LiveSession::fetchFile() for block-wise segment download.
const int32_t PlaylistFetcher::kDownloadBlockSize = 2048;
const int32_t PlaylistFetcher::kNumSkipFrames = 10;

// Creates a fetcher for the playlist at |uri|.
//   notify  - template message; it is dup()'ed for every notification posted
//             by this class (started/paused/stopped/error/...).
//   session - owning LiveSession, used to fetch playlists, keys and segments.
// All time/sequence members start at -1 ("unknown"); the fetcher starts in the
// "startup" state with the refresh state at INITIAL_MINIMUM_RELOAD_DELAY.
PlaylistFetcher::PlaylistFetcher(
        const sp<AMessage> &notify,
        const sp<LiveSession> &session,
        const char *uri)
    : mNotify(notify),
      mStartTimeUsNotify(notify->dup()),
      mSession(session),
      mURI(uri),
      mStreamTypeMask(0),
      mStartTimeUs(-1ll),
      mSegmentStartTimeUs(-1ll),
      mDiscontinuitySeq(-1ll),
      mStartTimeUsRelative(false),
      mLastPlaylistFetchTimeUs(-1ll),
      mSeqNumber(-1),
      mNumRetries(0),
      mStartup(true),
      mAdaptive(false),
      mPrepared(false),
      mNextPTSTimeUs(-1ll),
      mMonitorQueueGeneration(0),
      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
      mFirstPTSValid(false),
      mAbsoluteTimeAnchorUs(0ll),
      mVideoBuffer(new AnotherPacketSource(NULL)) {
    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
    // Pre-fill the "started at" notification; the stream mask starts empty.
    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
    mStartTimeUsNotify->setInt32("streamMask", 0);
}

PlaylistFetcher::~PlaylistFetcher() {
}

// Returns the start time (in us) of segment |seqNumber| relative to the first
// item of the current playlist, i.e. the sum of "durationUs" of all preceding
// items. Aborts (CHECK) if there is no playlist or |seqNumber| is outside the
// playlist's sequence-number range.
int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const {
    CHECK(mPlaylist != NULL);

    // A playlist without a "media-sequence" entry is numbered from 0.
    int32_t firstSeqNumberInPlaylist;
    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
                "media-sequence", &firstSeqNumberInPlaylist)) {
        firstSeqNumberInPlaylist = 0;
    }

    int32_t lastSeqNumberInPlaylist =
        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;

    CHECK_GE(seqNumber, firstSeqNumberInPlaylist);
    CHECK_LE(seqNumber, lastSeqNumberInPlaylist);

    int64_t segmentStartUs = 0ll;
    for (int32_t index = 0;
            index < seqNumber - firstSeqNumberInPlaylist; ++index) {
        sp<AMessage> itemMeta;
        CHECK(mPlaylist->itemAt(
                    index, NULL /* uri */, &itemMeta));

        int64_t itemDurationUs;
        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));

        segmentStartUs += itemDurationUs;
    }

    return segmentStartUs;
}

// Returns how long (in us) to wait before the playlist may be reloaded:
//   - 0 if no playlist has been fetched yet,
//   - the largest int64 value if the playlist is complete (never reload),
//   - otherwise the time remaining until the playlist reaches a minimum age
//     that depends on how many consecutive reloads returned it unchanged
//     (mRefreshState), clamped at 0.
int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const {
    int64_t nowUs = ALooper::GetNowUs();

    if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) {
        CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY);
        return 0ll;
    }

    if (mPlaylist->isComplete()) {
        // INT64_MAX: effectively "never".
        return (~0llu >> 1);
    }

    int32_t targetDurationSecs;
    CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));

    int64_t targetDurationUs = targetDurationSecs * 1000000ll;

    int64_t minPlaylistAgeUs;

    switch (mRefreshState) {
        case INITIAL_MINIMUM_RELOAD_DELAY:
        {
            // Freshly (re)loaded playlist: wait for the duration of its last
            // item; an empty playlist falls through to half a target duration.
            size_t n = mPlaylist->size();
            if (n > 0) {
                sp<AMessage> itemMeta;
                CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta));

                int64_t itemDurationUs;
                CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));

                minPlaylistAgeUs = itemDurationUs;
                break;
            }

            // fall through
        }

        case FIRST_UNCHANGED_RELOAD_ATTEMPT:
        {
            minPlaylistAgeUs = targetDurationUs / 2;
            break;
        }

        case SECOND_UNCHANGED_RELOAD_ATTEMPT:
        {
            minPlaylistAgeUs = (targetDurationUs * 3) / 2;
            break;
        }

        case THIRD_UNCHANGED_RELOAD_ATTEMPT:
        {
            minPlaylistAgeUs = targetDurationUs * 3;
            break;
        }

        default:
            TRESPASS();
            break;
    }

    int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs;
    return delayUs > 0ll ?
delayUs : 0ll; 181} 182 183status_t PlaylistFetcher::decryptBuffer( 184 size_t playlistIndex, const sp<ABuffer> &buffer, 185 bool first) { 186 sp<AMessage> itemMeta; 187 bool found = false; 188 AString method; 189 190 for (ssize_t i = playlistIndex; i >= 0; --i) { 191 AString uri; 192 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 193 194 if (itemMeta->findString("cipher-method", &method)) { 195 found = true; 196 break; 197 } 198 } 199 200 if (!found) { 201 method = "NONE"; 202 } 203 buffer->meta()->setString("cipher-method", method.c_str()); 204 205 if (method == "NONE") { 206 return OK; 207 } else if (!(method == "AES-128")) { 208 ALOGE("Unsupported cipher method '%s'", method.c_str()); 209 return ERROR_UNSUPPORTED; 210 } 211 212 AString keyURI; 213 if (!itemMeta->findString("cipher-uri", &keyURI)) { 214 ALOGE("Missing key uri"); 215 return ERROR_MALFORMED; 216 } 217 218 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 219 220 sp<ABuffer> key; 221 if (index >= 0) { 222 key = mAESKeyForURI.valueAt(index); 223 } else { 224 ssize_t err = mSession->fetchFile(keyURI.c_str(), &key); 225 226 if (err < 0) { 227 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 228 return ERROR_IO; 229 } else if (key->size() != 16) { 230 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 231 return ERROR_MALFORMED; 232 } 233 234 mAESKeyForURI.add(keyURI, key); 235 } 236 237 AES_KEY aes_key; 238 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 239 ALOGE("failed to set AES decryption key."); 240 return UNKNOWN_ERROR; 241 } 242 243 size_t n = buffer->size(); 244 if (!n) { 245 return OK; 246 } 247 CHECK(n % 16 == 0); 248 249 if (first) { 250 // If decrypting the first block in a file, read the iv from the manifest 251 // or derive the iv from the file's sequence number. 
252 253 AString iv; 254 if (itemMeta->findString("cipher-iv", &iv)) { 255 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 256 || iv.size() != 16 * 2 + 2) { 257 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 258 return ERROR_MALFORMED; 259 } 260 261 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 262 for (size_t i = 0; i < 16; ++i) { 263 char c1 = tolower(iv.c_str()[2 + 2 * i]); 264 char c2 = tolower(iv.c_str()[3 + 2 * i]); 265 if (!isxdigit(c1) || !isxdigit(c2)) { 266 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 267 return ERROR_MALFORMED; 268 } 269 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 270 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 271 272 mAESInitVec[i] = nibble1 << 4 | nibble2; 273 } 274 } else { 275 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 276 mAESInitVec[15] = mSeqNumber & 0xff; 277 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 278 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 279 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 280 } 281 } 282 283 AES_cbc_encrypt( 284 buffer->data(), buffer->data(), buffer->size(), 285 &aes_key, mAESInitVec, AES_DECRYPT); 286 287 return OK; 288} 289 290status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 291 status_t err; 292 AString method; 293 CHECK(buffer->meta()->findString("cipher-method", &method)); 294 if (method == "NONE") { 295 return OK; 296 } 297 298 uint8_t padding = 0; 299 if (buffer->size() > 0) { 300 padding = buffer->data()[buffer->size() - 1]; 301 } 302 303 if (padding > 16) { 304 return ERROR_MALFORMED; 305 } 306 307 for (size_t i = buffer->size() - padding; i < padding; i++) { 308 if (buffer->data()[i] != padding) { 309 return ERROR_MALFORMED; 310 } 311 } 312 313 buffer->setRange(buffer->offset(), buffer->size() - padding); 314 return OK; 315} 316 317void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 318 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 319 if (maxDelayUs < minDelayUs) { 320 maxDelayUs = minDelayUs; 321 } 
322 if (delayUs > maxDelayUs) { 323 ALOGV("Need to refresh playlist in %" PRId64 , maxDelayUs); 324 delayUs = maxDelayUs; 325 } 326 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 327 msg->setInt32("generation", mMonitorQueueGeneration); 328 msg->post(delayUs); 329} 330 331void PlaylistFetcher::cancelMonitorQueue() { 332 ++mMonitorQueueGeneration; 333} 334 335void PlaylistFetcher::startAsync( 336 const sp<AnotherPacketSource> &audioSource, 337 const sp<AnotherPacketSource> &videoSource, 338 const sp<AnotherPacketSource> &subtitleSource, 339 int64_t startTimeUs, 340 int64_t segmentStartTimeUs, 341 int32_t startDiscontinuitySeq, 342 bool adaptive) { 343 sp<AMessage> msg = new AMessage(kWhatStart, id()); 344 345 uint32_t streamTypeMask = 0ul; 346 347 if (audioSource != NULL) { 348 msg->setPointer("audioSource", audioSource.get()); 349 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 350 } 351 352 if (videoSource != NULL) { 353 msg->setPointer("videoSource", videoSource.get()); 354 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 355 } 356 357 if (subtitleSource != NULL) { 358 msg->setPointer("subtitleSource", subtitleSource.get()); 359 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 360 } 361 362 msg->setInt32("streamTypeMask", streamTypeMask); 363 msg->setInt64("startTimeUs", startTimeUs); 364 msg->setInt64("segmentStartTimeUs", segmentStartTimeUs); 365 msg->setInt32("startDiscontinuitySeq", startDiscontinuitySeq); 366 msg->setInt32("adaptive", adaptive); 367 msg->post(); 368} 369 370void PlaylistFetcher::pauseAsync() { 371 (new AMessage(kWhatPause, id()))->post(); 372} 373 374void PlaylistFetcher::stopAsync(bool clear) { 375 sp<AMessage> msg = new AMessage(kWhatStop, id()); 376 msg->setInt32("clear", clear); 377 msg->post(); 378} 379 380void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 381 AMessage* msg = new AMessage(kWhatResumeUntil, id()); 382 msg->setMessage("params", params); 383 msg->post(); 384} 385 386void 
PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
    // Message dispatcher. Start/pause/stop are handled synchronously and then
    // acknowledged with a dup of mNotify; monitor-queue and download-next
    // messages are only honoured if their generation is still current.
    switch (msg->what()) {
        case kWhatStart:
        {
            status_t err = onStart(msg);

            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("what", kWhatStarted);
            notify->setInt32("err", err);
            notify->post();
            break;
        }

        case kWhatPause:
        {
            onPause();

            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("what", kWhatPaused);
            notify->post();
            break;
        }

        case kWhatStop:
        {
            onStop(msg);

            sp<AMessage> notify = mNotify->dup();
            notify->setInt32("what", kWhatStopped);
            notify->post();
            break;
        }

        case kWhatMonitorQueue:
        case kWhatDownloadNext:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            if (generation != mMonitorQueueGeneration) {
                // Stale event
                break;
            }

            if (msg->what() == kWhatMonitorQueue) {
                onMonitorQueue();
            } else {
                onDownloadNext();
            }
            break;
        }

        case kWhatResumeUntil:
        {
            onResumeUntil(msg);
            break;
        }

        default:
            TRESPASS();
    }
}

// Handles kWhatStart: rebuilds mPacketSources from the raw source pointers in
// |msg| (one per bit set in "streamTypeMask"), records the segment start time
// and discontinuity sequence, and kicks off queue monitoring. The fetch
// position (mSeqNumber, startup/prepared flags, adaptive mode) is only reset
// when a non-negative "startTimeUs" is supplied. Always returns OK.
status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
    mPacketSources.clear();

    uint32_t streamTypeMask;
    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));

    int64_t startTimeUs;
    int64_t segmentStartTimeUs;
    int32_t startDiscontinuitySeq;
    int32_t adaptive;
    CHECK(msg->findInt64("startTimeUs", &startTimeUs));
    CHECK(msg->findInt64("segmentStartTimeUs", &segmentStartTimeUs));
    CHECK(msg->findInt32("startDiscontinuitySeq", &startDiscontinuitySeq));
    CHECK(msg->findInt32("adaptive", &adaptive));

    if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) {
        void *ptr;
        CHECK(msg->findPointer("audioSource", &ptr));

        mPacketSources.add(
                LiveSession::STREAMTYPE_AUDIO,
                static_cast<AnotherPacketSource *>(ptr));
    }

    if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) {
        void *ptr;
        CHECK(msg->findPointer("videoSource", &ptr));

        mPacketSources.add(
                LiveSession::STREAMTYPE_VIDEO,
                static_cast<AnotherPacketSource *>(ptr));
    }

    if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) {
        void *ptr;
        CHECK(msg->findPointer("subtitleSource", &ptr));

        mPacketSources.add(
                LiveSession::STREAMTYPE_SUBTITLES,
                static_cast<AnotherPacketSource *>(ptr));
    }

    mStreamTypeMask = streamTypeMask;

    mSegmentStartTimeUs = segmentStartTimeUs;
    mDiscontinuitySeq = startDiscontinuitySeq;

    if (startTimeUs >= 0) {
        mStartTimeUs = startTimeUs;
        // -1 makes onDownloadNext() pick the initial sequence number.
        mSeqNumber = -1;
        mStartup = true;
        mPrepared = false;
        mAdaptive = adaptive;
    }

    postMonitorQueue();

    return OK;
}

// Handles kWhatPause: drops any pending monitor/download messages.
void PlaylistFetcher::onPause() {
    cancelMonitorQueue();
}

// Handles kWhatStop: drops pending monitor/download messages, optionally
// clears the contents of every packet source ("clear" != 0), then detaches
// from all sources.
void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
    cancelMonitorQueue();

    int32_t clear;
    CHECK(msg->findInt32("clear", &clear));
    if (clear) {
        for (size_t i = 0; i < mPacketSources.size(); i++) {
            sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);
            packetSource->clear();
        }
    }

    mPacketSources.clear();
    mStreamTypeMask = 0;
}

// Resume until we have reached the boundary timestamps listed in `msg`; when
// the remaining time is too short (within a resume threshold) stop immediately
// instead.
// |msg| carries a "params" message with per-stream stop times under
// "timeUsVideo" / "timeUsAudio" / "timeUsSubtitle". Always returns OK.
status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) {
    sp<AMessage> params;
    CHECK(msg->findMessage("params", &params));

    bool stop = false;
    for (size_t i = 0; i < mPacketSources.size(); i++) {
        sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i);

        const char *stopKey;
        int streamType = mPacketSources.keyAt(i);
        switch (streamType) {
        case LiveSession::STREAMTYPE_VIDEO:
            stopKey = "timeUsVideo";
            break;

        case LiveSession::STREAMTYPE_AUDIO:
            stopKey = "timeUsAudio";
            break;

        case LiveSession::STREAMTYPE_SUBTITLES:
            stopKey = "timeUsSubtitle";
            break;

        default:
            TRESPASS();
        }

        // Don't resume if we would stop within a resume threshold.
        // Only applies when the last dequeued unit belongs to the current
        // discontinuity sequence and a stop time exists for this stream.
        int32_t discontinuitySeq;
        int64_t latestTimeUs = 0, stopTimeUs = 0;
        sp<AMessage> latestMeta = packetSource->getLatestDequeuedMeta();
        if (latestMeta != NULL
                && latestMeta->findInt32("discontinuitySeq", &discontinuitySeq)
                && discontinuitySeq == mDiscontinuitySeq
                && latestMeta->findInt64("timeUs", &latestTimeUs)
                && params->findInt64(stopKey, &stopTimeUs)
                && stopTimeUs - latestTimeUs < resumeThreshold(latestMeta)) {
            stop = true;
        }
    }

    if (stop) {
        // Queue a format-change marker on every source, then stop without
        // clearing what is already buffered.
        for (size_t i = 0; i < mPacketSources.size(); i++) {
            mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer());
        }
        stopAsync(/* clear = */ false);
        return OK;
    }

    mStopParams = params;
    postMonitorQueue();

    return OK;
}

// Posts a kWhatError notification carrying |err| to the notify target.
void PlaylistFetcher::notifyError(status_t err) {
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kWhatError);
    notify->setInt32("err", err);
    notify->post();
}

// Queues a discontinuity of |type| (with optional |extra| data) on every
// attached packet source, without discarding already-buffered data.
void PlaylistFetcher::queueDiscontinuity(
        ATSParser::DiscontinuityType type, const sp<AMessage> &extra) {
    for (size_t i = 0; i < mPacketSources.size(); ++i) {
        // do not discard buffer upon #EXT-X-DISCONTINUITY tag
        // (seek will discard buffer by abandoning old fetchers)
        mPacketSources.valueAt(i)->queueDiscontinuity(
                type, extra, false /* discard */);
    }
}

// Handles kWhatMonitorQueue: refreshes the playlist if due, measures how much
// media is buffered and either schedules the next segment download or goes
// back to sleep, notifying kWhatTemporarilyDoneFetching in the latter case.
void PlaylistFetcher::onMonitorQueue() {
    bool downloadMore = false;
    refreshPlaylist();

    // Without a playlist, fall back to kMinBufferedDurationUs as the target.
    int32_t targetDurationSecs;
    int64_t targetDurationUs = kMinBufferedDurationUs;
    if (mPlaylist != NULL) {
        CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs));
        targetDurationUs = targetDurationSecs * 1000000ll;
    }

    // buffer at least 3 times the target duration, or up to 10 seconds
    int64_t durationToBufferUs = targetDurationUs * 3;
    if (durationToBufferUs > kMinBufferedDurationUs) {
        durationToBufferUs = kMinBufferedDurationUs;
    }

    int64_t bufferedDurationUs = 0ll;
    status_t finalResult = NOT_ENOUGH_DATA;
    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
        sp<AnotherPacketSource> packetSource =
            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);

        bufferedDurationUs =
                packetSource->getBufferedDurationUs(&finalResult);
        // The subtitle source's final result is deliberately overridden.
        finalResult = OK;
    } else {
        // Use max stream duration to prevent us from waiting on a non-existent stream;
        // when we cannot make out from the manifest what streams are included in a playlist
        // we might assume extra streams.
        for (size_t i = 0; i < mPacketSources.size(); ++i) {
            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
                continue;
            }

            int64_t bufferedStreamDurationUs =
                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
            ALOGV("buffered %" PRId64 " for stream %d",
                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
            if (bufferedStreamDurationUs > bufferedDurationUs) {
                bufferedDurationUs = bufferedStreamDurationUs;
            }
        }
    }
    downloadMore = (bufferedDurationUs < durationToBufferUs);

    // signal start if buffered up at least the target size
    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
        mPrepared = true;

        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
                bufferedDurationUs, targetDurationUs);
        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatTemporarilyDoneFetching);
        msg->post();
    }

    if (finalResult == OK && downloadMore) {
        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
                bufferedDurationUs, durationToBufferUs);
        // delay the next download slightly; hopefully this gives other concurrent fetchers
        // a better chance to run.
        // onDownloadNext();
        sp<AMessage> msg = new AMessage(kWhatDownloadNext, id());
        msg->setInt32("generation", mMonitorQueueGeneration);
        msg->post(1000l);
    } else {
        // Nothing to do yet, try again in a second.

        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatTemporarilyDoneFetching);
        msg->post();

        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
                delayUs, bufferedDurationUs, durationToBufferUs);
        // :TRICKY: need to enforce minimum delay because the delay to
        // refresh the playlist will become 0
        postMonitorQueue(delayUs, mPrepared ?
targetDurationUs * 2 : 0);
    }
}

// Reloads the playlist if delayUsToRefreshPlaylist() says it is due.
//   - New playlist: replaces mPlaylist and resets the refresh back-off.
//   - Unchanged playlist: advances the back-off state (up to the third
//     unchanged attempt).
//   - Fetch failure: posts ERROR_IO via notifyError() and returns ERROR_IO.
// Returns OK in every other case, including when no reload was due.
status_t PlaylistFetcher::refreshPlaylist() {
    if (delayUsToRefreshPlaylist() <= 0) {
        bool unchanged;
        sp<M3UParser> playlist = mSession->fetchPlaylist(
                mURI.c_str(), mPlaylistHash, &unchanged);

        if (playlist == NULL) {
            if (unchanged) {
                // We succeeded in fetching the playlist, but it was
                // unchanged from the last time we tried.

                if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) {
                    mRefreshState = (RefreshState)(mRefreshState + 1);
                }
            } else {
                ALOGE("failed to load playlist at url '%s'", mURI.c_str());
                notifyError(ERROR_IO);
                return ERROR_IO;
            }
        } else {
            mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;
            mPlaylist = playlist;

            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
                updateDuration();
            }
        }

        mLastPlaylistFetchTimeUs = ALooper::GetNowUs();
    }
    return OK;
}

// static
// True if |buffer| is non-empty and begins with 0x47, the MPEG2-TS sync byte.
bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
    return buffer->size() > 0 && buffer->data()[0] == 0x47;
}

// Handles kWhatDownloadNext: picks the segment to fetch (choosing an initial
// sequence number on first use), downloads it block by block, decrypts each
// block, feeds transport-stream data incrementally to the TS extractor (other
// formats are extracted in bulk once the download finishes), then advances
// mSeqNumber and re-arms the monitor queue. Errors are reported through
// notifyError().
void PlaylistFetcher::onDownloadNext() {
    if (refreshPlaylist() != OK) {
        return;
    }

    // A playlist without a "media-sequence" entry is numbered from 0.
    int32_t firstSeqNumberInPlaylist;
    if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
                "media-sequence", &firstSeqNumberInPlaylist)) {
        firstSeqNumberInPlaylist = 0;
    }

    bool discontinuity = false;

    const int32_t lastSeqNumberInPlaylist =
        firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1;

    if (mDiscontinuitySeq < 0) {
        mDiscontinuitySeq = mPlaylist->getDiscontinuitySeq();
    }

    // No position yet: derive the initial sequence number.
    if (mSeqNumber < 0) {
        CHECK_GE(mStartTimeUs, 0ll);

        if (mSegmentStartTimeUs < 0) {
            if (!mPlaylist->isComplete() && !mPlaylist->isEvent()) {
                // If this is a live session, start 3 segments from the end on connect
                mSeqNumber = lastSeqNumberInPlaylist - 3;
                if (mSeqNumber < firstSeqNumberInPlaylist) {
                    mSeqNumber = firstSeqNumberInPlaylist;
                }
            } else {
                mSeqNumber = getSeqNumberForTime(mStartTimeUs);
                // Make the start time relative to the chosen segment's start.
                mStartTimeUs -= getSegmentStartTimeUs(mSeqNumber);
            }
            mStartTimeUsRelative = true;
            ALOGV("Initial sequence number for time %" PRId64 " is %d from (%d .. %d)",
                    mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        } else {
            mSeqNumber = getSeqNumberForTime(mSegmentStartTimeUs);
            if (mAdaptive) {
                // avoid double fetch/decode
                mSeqNumber += 1;
            }
            // Never start before the requested discontinuity sequence, and
            // clamp the result into the playlist's range.
            ssize_t minSeq = getSeqNumberForDiscontinuity(mDiscontinuitySeq);
            if (mSeqNumber < minSeq) {
                mSeqNumber = minSeq;
            }

            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }

            if (mSeqNumber > lastSeqNumberInPlaylist) {
                mSeqNumber = lastSeqNumberInPlaylist;
            }
            ALOGV("Initial sequence number for live event %d from (%d .. %d)",
                    mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
        }
    }

    // The wanted segment is not in the playlist (any more / yet).
    if (mSeqNumber < firstSeqNumberInPlaylist
            || mSeqNumber > lastSeqNumberInPlaylist) {
        if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) {
            ++mNumRetries;

            if (mSeqNumber > lastSeqNumberInPlaylist) {
                // refresh in increasing fraction (1/2, 1/3, ...) of the
                // playlist's target duration or 3 seconds, whichever is less
                int32_t targetDurationSecs;
                CHECK(mPlaylist->meta()->findInt32(
                        "target-duration", &targetDurationSecs));
                int64_t delayUs = mPlaylist->size() * targetDurationSecs *
                        1000000ll / (1 + mNumRetries);
                if (delayUs > kMaxMonitorDelayUs) {
                    delayUs = kMaxMonitorDelayUs;
                }
                ALOGV("sequence number high: %d from (%d .. %d), "
                      "monitor in %" PRId64 " (retry=%d)",
                        mSeqNumber, firstSeqNumberInPlaylist,
                        lastSeqNumberInPlaylist, delayUs, mNumRetries);
                postMonitorQueue(delayUs);
                return;
            }

            // we've missed the boat, let's start from the lowest sequence
            // number available and signal a discontinuity.

            ALOGI("We've missed the boat, restarting playback."
                  " mStartup=%d, was looking for %d in %d-%d",
                    mStartup, mSeqNumber, firstSeqNumberInPlaylist,
                    lastSeqNumberInPlaylist);
            // NOTE(review): the code restarts 3 segments from the end (clamped
            // to the first segment), not at the lowest sequence number as the
            // comment above says.
            mSeqNumber = lastSeqNumberInPlaylist - 3;
            if (mSeqNumber < firstSeqNumberInPlaylist) {
                mSeqNumber = firstSeqNumberInPlaylist;
            }
            discontinuity = true;

            // fall through
        } else {
            ALOGE("Cannot find sequence number %d in playlist "
                 "(contains %d - %d)",
                 mSeqNumber, firstSeqNumberInPlaylist,
                  firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1);

            notifyError(ERROR_END_OF_STREAM);
            return;
        }
    }

    mNumRetries = 0;

    AString uri;
    sp<AMessage> itemMeta;
    CHECK(mPlaylist->itemAt(
                mSeqNumber - firstSeqNumberInPlaylist,
                &uri,
                &itemMeta));

    // An item flagged with a non-zero "discontinuity" starts a new
    // discontinuity sequence.
    int32_t val;
    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
        mDiscontinuitySeq++;
        discontinuity = true;
    }

    // Without both range entries, fetch the whole file (length -1).
    int64_t range_offset, range_length;
    if (!itemMeta->findInt64("range-offset", &range_offset)
            || !itemMeta->findInt64("range-length", &range_length)) {
        range_offset = 0;
        range_length = -1;
    }

    ALOGV("fetching segment %d from (%d .. %d)",
          mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist);

    ALOGV("fetching '%s'", uri.c_str());

    sp<DataSource> source;
    sp<ABuffer> buffer, tsBuffer;
    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    // this avoids interleaved connections to the key and segment file.
    {
        sp<ABuffer> junk = new ABuffer(16);
        junk->setRange(0, 16);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                true /* first */);
        if (err != OK) {
            notifyError(err);
            return;
        }
    }

    // block-wise download
    bool startup = mStartup;
    ssize_t bytesRead;
    do {
        bytesRead = mSession->fetchFile(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);

        if (bytesRead < 0) {
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
            notifyError(err);
            return;
        }

        CHECK(buffer != NULL);

        // |buffer| accumulates the segment; only the newly read tail is
        // decrypted on each pass.
        size_t size = buffer->size();
        // Set decryption range.
        buffer->setRange(size - bytesRead, bytesRead);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
                buffer->offset() == 0 /* first */);
        // Unset decryption range.
        buffer->setRange(0, size);

        if (err != OK) {
            ALOGE("decryptBuffer failed w/ error %d", err);

            notifyError(err);
            return;
        }

        if (startup || discontinuity) {
            // Signal discontinuity.

            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
                // If this was a live event this made no sense since
                // we don't have access to all the segment before the current
                // one.
                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
            }

            if (discontinuity) {
                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

                queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE,
                        NULL /* extra */);

                discontinuity = false;
            }

            startup = false;
        }

        err = OK;
        if (bufferStartsWithTsSyncByte(buffer)) {
            // Incremental extraction is only supported for MPEG2 transport streams.
            // |tsBuffer| is a window onto |buffer|'s storage; it is rebuilt
            // (keeping its range) whenever |buffer|'s capacity changes.
            if (tsBuffer == NULL) {
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(0, 0);
            } else if (tsBuffer->capacity() != buffer->capacity()) {
                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(tsOff, tsSize);
            }
            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);

            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
        }

        if (err == -EAGAIN) {
            // starting sequence number too low/high
            mTSParser.clear();
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            stopAsync(/* clear = */ false);
            return;
        } else if (err != OK) {
            notifyError(err);
            return;
        }

    } while (bytesRead != 0);

    if (bufferStartsWithTsSyncByte(buffer)) {
        // If we still don't see a stream after fetching a full ts segment mark it as
        // nonexistent.
        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
        ATSParser::SourceType srcTypes[kNumTypes] =
                { ATSParser::VIDEO, ATSParser::AUDIO };
        LiveSession::StreamType streamTypes[kNumTypes] =
                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };

        for (size_t i = 0; i < kNumTypes; i++) {
            ATSParser::SourceType srcType = srcTypes[i];
            LiveSession::StreamType streamType = streamTypes[i];

            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(srcType).get());

            if (source == NULL) {
                ALOGW("MPEG2 Transport stream does not contain %s data.",
                      srcType == ATSParser::VIDEO ? "video" : "audio");

                mStreamTypeMask &= ~streamType;
                mPacketSources.removeItem(streamType);
            }
        }

    }

    if (checkDecryptPadding(buffer) != OK) {
        ALOGE("Incorrect padding bytes after decryption.");
        notifyError(ERROR_MALFORMED);
        return;
    }

    status_t err = OK;
    if (tsBuffer != NULL) {
        // Bytes left unconsumed by the TS extractor: none are tolerated for
        // unencrypted streams, at most 16 for encrypted ones.
        AString method;
        CHECK(buffer->meta()->findString("cipher-method", &method));
        if ((tsBuffer->size() > 0 && method == "NONE")
                || tsBuffer->size() > 16) {
            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
                    "bytes in length.");
            notifyError(ERROR_MALFORMED);
            return;
        }
    }

    // bulk extract non-ts files
    if (tsBuffer == NULL) {
        err = extractAndQueueAccessUnits(buffer, itemMeta);
        if (err == -EAGAIN) {
            // starting sequence number too low/high
            postMonitorQueue();
            return;
        } else if (err == ERROR_OUT_OF_RANGE) {
            // reached stopping point
            stopAsync(/* clear = */false);
            return;
        }
    }

    if (err != OK) {
        notifyError(err);
        return;
    }

    ++mSeqNumber;

    postMonitorQueue();
}

// Walks backwards from the segment preceding mSeqNumber, subtracting each
// item's duration from |anchorTimeUs| until it no longer exceeds mStartTimeUs
// (or the start of the playlist is reached), and returns the resulting
// sequence number, capped at the last one in the playlist.
int32_t PlaylistFetcher::getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const {
    int32_t firstSeqNumberInPlaylist, lastSeqNumberInPlaylist;
    if (mPlaylist->meta() == NULL
            || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) {
        firstSeqNumberInPlaylist = 0;
    }
    lastSeqNumberInPlaylist = firstSeqNumberInPlaylist + mPlaylist->size() - 1;

    int32_t index = mSeqNumber - firstSeqNumberInPlaylist - 1;
    while (index >= 0 && anchorTimeUs > mStartTimeUs) {
        sp<AMessage> itemMeta;
        CHECK(mPlaylist->itemAt(index, NULL /* uri */, &itemMeta));

        int64_t itemDurationUs;
        CHECK(itemMeta->findInt64("durationUs", &itemDurationUs));

        anchorTimeUs -= itemDurationUs;
        --index;
    }

    int32_t newSeqNumber =
firstSeqNumberInPlaylist + index + 1; 1055 if (newSeqNumber <= lastSeqNumberInPlaylist) { 1056 return newSeqNumber; 1057 } else { 1058 return lastSeqNumberInPlaylist; 1059 } 1060} 1061 1062int32_t PlaylistFetcher::getSeqNumberForDiscontinuity(size_t discontinuitySeq) const { 1063 int32_t firstSeqNumberInPlaylist; 1064 if (mPlaylist->meta() == NULL 1065 || !mPlaylist->meta()->findInt32("media-sequence", &firstSeqNumberInPlaylist)) { 1066 firstSeqNumberInPlaylist = 0; 1067 } 1068 1069 size_t curDiscontinuitySeq = mPlaylist->getDiscontinuitySeq(); 1070 if (discontinuitySeq < curDiscontinuitySeq) { 1071 return firstSeqNumberInPlaylist <= 0 ? 0 : (firstSeqNumberInPlaylist - 1); 1072 } 1073 1074 size_t index = 0; 1075 while (index < mPlaylist->size()) { 1076 sp<AMessage> itemMeta; 1077 CHECK(mPlaylist->itemAt( index, NULL /* uri */, &itemMeta)); 1078 1079 int64_t discontinuity; 1080 if (itemMeta->findInt64("discontinuity", &discontinuity)) { 1081 curDiscontinuitySeq++; 1082 } 1083 1084 if (curDiscontinuitySeq == discontinuitySeq) { 1085 return firstSeqNumberInPlaylist + index; 1086 } 1087 1088 ++index; 1089 } 1090 1091 return firstSeqNumberInPlaylist + mPlaylist->size(); 1092} 1093 1094int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 1095 int32_t firstSeqNumberInPlaylist; 1096 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 1097 "media-sequence", &firstSeqNumberInPlaylist)) { 1098 firstSeqNumberInPlaylist = 0; 1099 } 1100 1101 size_t index = 0; 1102 int64_t segmentStartUs = 0; 1103 while (index < mPlaylist->size()) { 1104 sp<AMessage> itemMeta; 1105 CHECK(mPlaylist->itemAt( 1106 index, NULL /* uri */, &itemMeta)); 1107 1108 int64_t itemDurationUs; 1109 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1110 1111 if (timeUs < segmentStartUs + itemDurationUs) { 1112 break; 1113 } 1114 1115 segmentStartUs += itemDurationUs; 1116 ++index; 1117 } 1118 1119 if (index >= mPlaylist->size()) { 1120 index = mPlaylist->size() - 1; 1121 } 
1122 1123 return firstSeqNumberInPlaylist + index; 1124} 1125 1126const sp<ABuffer> &PlaylistFetcher::setAccessUnitProperties( 1127 const sp<ABuffer> &accessUnit, const sp<AnotherPacketSource> &source, bool discard) { 1128 sp<MetaData> format = source->getFormat(); 1129 if (format != NULL) { 1130 // for simplicity, store a reference to the format in each unit 1131 accessUnit->meta()->setObject("format", format); 1132 } 1133 1134 if (discard) { 1135 accessUnit->meta()->setInt32("discard", discard); 1136 } 1137 1138 accessUnit->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1139 accessUnit->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1140 return accessUnit; 1141} 1142 1143status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) { 1144 if (mTSParser == NULL) { 1145 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 1146 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 1147 } 1148 1149 if (mNextPTSTimeUs >= 0ll) { 1150 sp<AMessage> extra = new AMessage; 1151 // Since we are using absolute timestamps, signal an offset of 0 to prevent 1152 // ATSParser from skewing the timestamps of access units. 1153 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 1154 1155 mTSParser->signalDiscontinuity( 1156 ATSParser::DISCONTINUITY_SEEK, extra); 1157 1158 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1159 mNextPTSTimeUs = -1ll; 1160 mFirstPTSValid = false; 1161 } 1162 1163 size_t offset = 0; 1164 while (offset + 188 <= buffer->size()) { 1165 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 1166 1167 if (err != OK) { 1168 return err; 1169 } 1170 1171 offset += 188; 1172 } 1173 // setRange to indicate consumed bytes. 
1174 buffer->setRange(buffer->offset() + offset, buffer->size() - offset); 1175 1176 status_t err = OK; 1177 for (size_t i = mPacketSources.size(); i-- > 0;) { 1178 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1179 1180 const char *key; 1181 ATSParser::SourceType type; 1182 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 1183 switch (stream) { 1184 case LiveSession::STREAMTYPE_VIDEO: 1185 type = ATSParser::VIDEO; 1186 key = "timeUsVideo"; 1187 break; 1188 1189 case LiveSession::STREAMTYPE_AUDIO: 1190 type = ATSParser::AUDIO; 1191 key = "timeUsAudio"; 1192 break; 1193 1194 case LiveSession::STREAMTYPE_SUBTITLES: 1195 { 1196 ALOGE("MPEG2 Transport streams do not contain subtitles."); 1197 return ERROR_MALFORMED; 1198 break; 1199 } 1200 1201 default: 1202 TRESPASS(); 1203 } 1204 1205 sp<AnotherPacketSource> source = 1206 static_cast<AnotherPacketSource *>( 1207 mTSParser->getSource(type).get()); 1208 1209 if (source == NULL) { 1210 continue; 1211 } 1212 1213 int64_t timeUs; 1214 sp<ABuffer> accessUnit; 1215 status_t finalResult; 1216 while (source->hasBufferAvailable(&finalResult) 1217 && source->dequeueAccessUnit(&accessUnit) == OK) { 1218 1219 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1220 1221 if (mStartup) { 1222 if (!mFirstPTSValid) { 1223 mFirstTimeUs = timeUs; 1224 mFirstPTSValid = true; 1225 } 1226 if (mStartTimeUsRelative) { 1227 timeUs -= mFirstTimeUs; 1228 if (timeUs < 0) { 1229 timeUs = 0; 1230 } 1231 } 1232 1233 if (timeUs < mStartTimeUs) { 1234 // buffer up to the closest preceding IDR frame 1235 ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us", 1236 timeUs, mStartTimeUs); 1237 const char *mime; 1238 sp<MetaData> format = source->getFormat(); 1239 bool isAvc = false; 1240 if (format != NULL && format->findCString(kKeyMIMEType, &mime) 1241 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) { 1242 isAvc = true; 1243 } 1244 if (isAvc && IsIDR(accessUnit)) { 1245 mVideoBuffer->clear(); 1246 } 1247 
if (isAvc) { 1248 mVideoBuffer->queueAccessUnit(accessUnit); 1249 } 1250 1251 continue; 1252 } 1253 } 1254 1255 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1256 if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) { 1257 1258 int32_t targetDurationSecs; 1259 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1260 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1261 // mStartup 1262 // mStartup is true until we have queued a packet for all the streams 1263 // we are fetching. We queue packets whose timestamps are greater than 1264 // mStartTimeUs. 1265 // mSegmentStartTimeUs >= 0 1266 // mSegmentStartTimeUs is non-negative when adapting or switching tracks 1267 // timeUs - mStartTimeUs > targetDurationUs: 1268 // This and the 2 above conditions should only happen when adapting in a live 1269 // stream; the old fetcher has already fetched to mStartTimeUs; the new fetcher 1270 // would start fetching after timeUs, which should be greater than mStartTimeUs; 1271 // the old fetcher would then continue fetching data until timeUs. We don't want 1272 // timeUs to be too far ahead of mStartTimeUs because we want the old fetcher to 1273 // stop as early as possible. The definition of being "too far ahead" is 1274 // arbitrary; here we use targetDurationUs as threshold. 1275 if (mStartup && mSegmentStartTimeUs >= 0 1276 && timeUs - mStartTimeUs > targetDurationUs) { 1277 // we just guessed a starting timestamp that is too high when adapting in a 1278 // live stream; re-adjust based on the actual timestamp extracted from the 1279 // media segment; if we didn't move backward after the re-adjustment 1280 // (newSeqNumber), start at least 1 segment prior. 
1281 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1282 if (newSeqNumber >= mSeqNumber) { 1283 --mSeqNumber; 1284 } else { 1285 mSeqNumber = newSeqNumber; 1286 } 1287 mStartTimeUsNotify = mNotify->dup(); 1288 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 1289 return -EAGAIN; 1290 } 1291 1292 int32_t seq; 1293 if (!mStartTimeUsNotify->findInt32("discontinuitySeq", &seq)) { 1294 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1295 } 1296 int64_t startTimeUs; 1297 if (!mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1298 mStartTimeUsNotify->setInt64(key, timeUs); 1299 1300 uint32_t streamMask = 0; 1301 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1302 streamMask |= mPacketSources.keyAt(i); 1303 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1304 1305 if (streamMask == mStreamTypeMask) { 1306 mStartup = false; 1307 mStartTimeUsNotify->post(); 1308 mStartTimeUsNotify.clear(); 1309 } 1310 } 1311 } 1312 1313 if (mStopParams != NULL) { 1314 // Queue discontinuity in original stream. 1315 int32_t discontinuitySeq; 1316 int64_t stopTimeUs; 1317 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1318 || discontinuitySeq > mDiscontinuitySeq 1319 || !mStopParams->findInt64(key, &stopTimeUs) 1320 || (discontinuitySeq == mDiscontinuitySeq 1321 && timeUs >= stopTimeUs)) { 1322 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1323 mStreamTypeMask &= ~stream; 1324 mPacketSources.removeItemsAt(i); 1325 break; 1326 } 1327 } 1328 1329 // Note that we do NOT dequeue any discontinuities except for format change. 
1330 if (stream == LiveSession::STREAMTYPE_VIDEO) { 1331 const bool discard = true; 1332 status_t status; 1333 while (mVideoBuffer->hasBufferAvailable(&status)) { 1334 sp<ABuffer> videoBuffer; 1335 mVideoBuffer->dequeueAccessUnit(&videoBuffer); 1336 setAccessUnitProperties(videoBuffer, source, discard); 1337 packetSource->queueAccessUnit(videoBuffer); 1338 } 1339 } 1340 1341 setAccessUnitProperties(accessUnit, source); 1342 packetSource->queueAccessUnit(accessUnit); 1343 } 1344 1345 if (err != OK) { 1346 break; 1347 } 1348 } 1349 1350 if (err != OK) { 1351 for (size_t i = mPacketSources.size(); i-- > 0;) { 1352 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1353 packetSource->clear(); 1354 } 1355 return err; 1356 } 1357 1358 if (!mStreamTypeMask) { 1359 // Signal gap is filled between original and new stream. 1360 ALOGV("ERROR OUT OF RANGE"); 1361 return ERROR_OUT_OF_RANGE; 1362 } 1363 1364 return OK; 1365} 1366 1367/* static */ 1368bool PlaylistFetcher::bufferStartsWithWebVTTMagicSequence( 1369 const sp<ABuffer> &buffer) { 1370 size_t pos = 0; 1371 1372 // skip possible BOM 1373 if (buffer->size() >= pos + 3 && 1374 !memcmp("\xef\xbb\xbf", buffer->data() + pos, 3)) { 1375 pos += 3; 1376 } 1377 1378 // accept WEBVTT followed by SPACE, TAB or (CR) LF 1379 if (buffer->size() < pos + 6 || 1380 memcmp("WEBVTT", buffer->data() + pos, 6)) { 1381 return false; 1382 } 1383 pos += 6; 1384 1385 if (buffer->size() == pos) { 1386 return true; 1387 } 1388 1389 uint8_t sep = buffer->data()[pos]; 1390 return sep == ' ' || sep == '\t' || sep == '\n' || sep == '\r'; 1391} 1392 1393status_t PlaylistFetcher::extractAndQueueAccessUnits( 1394 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 1395 if (bufferStartsWithWebVTTMagicSequence(buffer)) { 1396 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1397 ALOGE("This stream only contains subtitles."); 1398 return ERROR_MALFORMED; 1399 } 1400 1401 const sp<AnotherPacketSource> packetSource = 1402 
mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1403 1404 int64_t durationUs; 1405 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1406 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1407 buffer->meta()->setInt64("durationUs", durationUs); 1408 buffer->meta()->setInt64("segmentStartTimeUs", getSegmentStartTimeUs(mSeqNumber)); 1409 buffer->meta()->setInt32("discontinuitySeq", mDiscontinuitySeq); 1410 1411 packetSource->queueAccessUnit(buffer); 1412 return OK; 1413 } 1414 1415 if (mNextPTSTimeUs >= 0ll) { 1416 mFirstPTSValid = false; 1417 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1418 mNextPTSTimeUs = -1ll; 1419 } 1420 1421 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1422 // stream prefixed by an ID3 tag. 1423 1424 bool firstID3Tag = true; 1425 uint64_t PTS = 0; 1426 1427 for (;;) { 1428 // Make sure to skip all ID3 tags preceding the audio data. 1429 // At least one must be present to provide the PTS timestamp. 1430 1431 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1432 if (!id3.isValid()) { 1433 if (firstID3Tag) { 1434 ALOGE("Unable to parse ID3 tag."); 1435 return ERROR_MALFORMED; 1436 } else { 1437 break; 1438 } 1439 } 1440 1441 if (firstID3Tag) { 1442 bool found = false; 1443 1444 ID3::Iterator it(id3, "PRIV"); 1445 while (!it.done()) { 1446 size_t length; 1447 const uint8_t *data = it.getData(&length); 1448 1449 static const char *kMatchName = 1450 "com.apple.streaming.transportStreamTimestamp"; 1451 static const size_t kMatchNameLen = strlen(kMatchName); 1452 1453 if (length == kMatchNameLen + 1 + 8 1454 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1455 found = true; 1456 PTS = U64_AT(&data[kMatchNameLen + 1]); 1457 } 1458 1459 it.next(); 1460 } 1461 1462 if (!found) { 1463 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1464 return ERROR_MALFORMED; 1465 } 1466 } 1467 1468 // skip the ID3 tag 1469 buffer->setRange( 1470 buffer->offset() + 
id3.rawSize(), buffer->size() - id3.rawSize()); 1471 1472 firstID3Tag = false; 1473 } 1474 1475 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1476 ALOGW("This stream only contains audio data!"); 1477 1478 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1479 1480 if (mStreamTypeMask == 0) { 1481 return OK; 1482 } 1483 } 1484 1485 sp<AnotherPacketSource> packetSource = 1486 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1487 1488 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1489 ABitReader bits(buffer->data(), buffer->size()); 1490 1491 // adts_fixed_header 1492 1493 CHECK_EQ(bits.getBits(12), 0xfffu); 1494 bits.skipBits(3); // ID, layer 1495 bool protection_absent = bits.getBits(1) != 0; 1496 1497 unsigned profile = bits.getBits(2); 1498 CHECK_NE(profile, 3u); 1499 unsigned sampling_freq_index = bits.getBits(4); 1500 bits.getBits(1); // private_bit 1501 unsigned channel_configuration = bits.getBits(3); 1502 CHECK_NE(channel_configuration, 0u); 1503 bits.skipBits(2); // original_copy, home 1504 1505 sp<MetaData> meta = MakeAACCodecSpecificData( 1506 profile, sampling_freq_index, channel_configuration); 1507 1508 meta->setInt32(kKeyIsADTS, true); 1509 1510 packetSource->setFormat(meta); 1511 } 1512 1513 int64_t numSamples = 0ll; 1514 int32_t sampleRate; 1515 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1516 1517 int64_t timeUs = (PTS * 100ll) / 9ll; 1518 if (!mFirstPTSValid) { 1519 mFirstPTSValid = true; 1520 mFirstTimeUs = timeUs; 1521 } 1522 1523 size_t offset = 0; 1524 while (offset < buffer->size()) { 1525 const uint8_t *adtsHeader = buffer->data() + offset; 1526 CHECK_LT(offset + 5, buffer->size()); 1527 1528 unsigned aac_frame_length = 1529 ((adtsHeader[3] & 3) << 11) 1530 | (adtsHeader[4] << 3) 1531 | (adtsHeader[5] >> 5); 1532 1533 if (aac_frame_length == 0) { 1534 const uint8_t *id3Header = adtsHeader; 1535 if (!memcmp(id3Header, "ID3", 3)) { 1536 ID3 id3(id3Header, buffer->size() - offset, 
true); 1537 if (id3.isValid()) { 1538 offset += id3.rawSize(); 1539 continue; 1540 }; 1541 } 1542 return ERROR_MALFORMED; 1543 } 1544 1545 CHECK_LE(offset + aac_frame_length, buffer->size()); 1546 1547 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1548 offset += aac_frame_length; 1549 1550 // Each AAC frame encodes 1024 samples. 1551 numSamples += 1024; 1552 1553 if (mStartup) { 1554 int64_t startTimeUs = unitTimeUs; 1555 if (mStartTimeUsRelative) { 1556 startTimeUs -= mFirstTimeUs; 1557 if (startTimeUs < 0) { 1558 startTimeUs = 0; 1559 } 1560 } 1561 if (startTimeUs < mStartTimeUs) { 1562 continue; 1563 } 1564 1565 if (mStartTimeUsNotify != NULL) { 1566 int32_t targetDurationSecs; 1567 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 1568 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 1569 1570 // Duplicated logic from how we handle .ts playlists. 1571 if (mStartup && mSegmentStartTimeUs >= 0 1572 && timeUs - mStartTimeUs > targetDurationUs) { 1573 int32_t newSeqNumber = getSeqNumberWithAnchorTime(timeUs); 1574 if (newSeqNumber >= mSeqNumber) { 1575 --mSeqNumber; 1576 } else { 1577 mSeqNumber = newSeqNumber; 1578 } 1579 return -EAGAIN; 1580 } 1581 1582 mStartTimeUsNotify->setInt64("timeUsAudio", timeUs); 1583 mStartTimeUsNotify->setInt32("discontinuitySeq", mDiscontinuitySeq); 1584 mStartTimeUsNotify->setInt32("streamMask", LiveSession::STREAMTYPE_AUDIO); 1585 mStartTimeUsNotify->post(); 1586 mStartTimeUsNotify.clear(); 1587 } 1588 } 1589 1590 if (mStopParams != NULL) { 1591 // Queue discontinuity in original stream. 
1592 int32_t discontinuitySeq; 1593 int64_t stopTimeUs; 1594 if (!mStopParams->findInt32("discontinuitySeq", &discontinuitySeq) 1595 || discontinuitySeq > mDiscontinuitySeq 1596 || !mStopParams->findInt64("timeUsAudio", &stopTimeUs) 1597 || (discontinuitySeq == mDiscontinuitySeq && unitTimeUs >= stopTimeUs)) { 1598 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1599 mStreamTypeMask = 0; 1600 mPacketSources.clear(); 1601 return ERROR_OUT_OF_RANGE; 1602 } 1603 } 1604 1605 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1606 memcpy(unit->data(), adtsHeader, aac_frame_length); 1607 1608 unit->meta()->setInt64("timeUs", unitTimeUs); 1609 setAccessUnitProperties(unit, packetSource); 1610 packetSource->queueAccessUnit(unit); 1611 } 1612 1613 return OK; 1614} 1615 1616void PlaylistFetcher::updateDuration() { 1617 int64_t durationUs = 0ll; 1618 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1619 sp<AMessage> itemMeta; 1620 CHECK(mPlaylist->itemAt( 1621 index, NULL /* uri */, &itemMeta)); 1622 1623 int64_t itemDurationUs; 1624 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1625 1626 durationUs += itemDurationUs; 1627 } 1628 1629 sp<AMessage> msg = mNotify->dup(); 1630 msg->setInt32("what", kWhatDurationUpdate); 1631 msg->setInt64("durationUs", durationUs); 1632 msg->post(); 1633} 1634 1635int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { 1636 int64_t durationUs, threshold; 1637 if (msg->findInt64("durationUs", &durationUs)) { 1638 return kNumSkipFrames * durationUs; 1639 } 1640 1641 sp<RefBase> obj; 1642 msg->findObject("format", &obj); 1643 MetaData *format = static_cast<MetaData *>(obj.get()); 1644 1645 const char *mime; 1646 CHECK(format->findCString(kKeyMIMEType, &mime)); 1647 bool audio = !strncasecmp(mime, "audio/", 6); 1648 if (audio) { 1649 // Assumes 1000 samples per frame. 
1650 int32_t sampleRate; 1651 CHECK(format->findInt32(kKeySampleRate, &sampleRate)); 1652 return kNumSkipFrames /* frames */ * 1000 /* samples */ 1653 * (1000000 / sampleRate) /* sample duration (us) */; 1654 } else { 1655 int32_t frameRate; 1656 if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { 1657 return kNumSkipFrames * (1000000 / frameRate); 1658 } 1659 } 1660 1661 return 500000ll; 1662} 1663 1664} // namespace android 1665