PlaylistFetcher.cpp revision 5ce50c1931e1e3d8f113394bbe2c9f99354f4c5f
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20 21#include "PlaylistFetcher.h" 22 23#include "LiveDataSource.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26 27#include "include/avc_utils.h" 28#include "include/HTTPBase.h" 29#include "include/ID3.h" 30#include "mpeg2ts/AnotherPacketSource.h" 31 32#include <media/IStreamSource.h> 33#include <media/stagefright/foundation/ABitReader.h> 34#include <media/stagefright/foundation/ABuffer.h> 35#include <media/stagefright/foundation/ADebug.h> 36#include <media/stagefright/foundation/hexdump.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaDefs.h> 39#include <media/stagefright/MetaData.h> 40#include <media/stagefright/Utils.h> 41 42#include <ctype.h> 43#include <openssl/aes.h> 44#include <openssl/md5.h> 45 46namespace android { 47 48// static 49const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 50const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 51const int32_t PlaylistFetcher::kNumSkipFrames = 10; 52 53PlaylistFetcher::PlaylistFetcher( 54 const sp<AMessage> ¬ify, 55 const sp<LiveSession> &session, 56 const char *uri) 57 : mNotify(notify), 58 mStartTimeUsNotify(notify->dup()), 59 mSession(session), 60 mURI(uri), 61 mStreamTypeMask(0), 62 mStartTimeUs(-1ll), 63 mMinStartTimeUs(0ll), 64 mStopParams(NULL), 65 mLastPlaylistFetchTimeUs(-1ll), 66 mSeqNumber(-1), 67 mNumRetries(0), 68 mStartup(true), 69 mPrepared(false), 70 mNextPTSTimeUs(-1ll), 71 mMonitorQueueGeneration(0), 72 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 73 mFirstPTSValid(false), 74 mAbsoluteTimeAnchorUs(0ll) { 75 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 76 mStartTimeUsNotify->setInt32("what", kWhatStartedAt); 77 mStartTimeUsNotify->setInt32("streamMask", 0); 78} 79 80PlaylistFetcher::~PlaylistFetcher() { 81} 82 83int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 84 CHECK(mPlaylist != NULL); 85 86 int32_t firstSeqNumberInPlaylist; 87 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 88 "media-sequence", &firstSeqNumberInPlaylist)) { 89 firstSeqNumberInPlaylist = 0; 90 } 91 92 int32_t lastSeqNumberInPlaylist = 93 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 94 95 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 96 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 97 98 int64_t segmentStartUs = 0ll; 99 for (int32_t index = 0; 100 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 101 sp<AMessage> itemMeta; 102 CHECK(mPlaylist->itemAt( 103 index, NULL /* uri */, &itemMeta)); 104 105 int64_t itemDurationUs; 106 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 107 108 segmentStartUs += itemDurationUs; 109 } 110 111 return segmentStartUs; 112} 113 114int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 115 int64_t nowUs = ALooper::GetNowUs(); 116 117 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 118 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 119 return 0ll; 120 } 121 122 if (mPlaylist->isComplete()) { 123 return (~0llu >> 1); 124 } 125 126 int32_t targetDurationSecs; 127 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 128 129 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 130 131 int64_t minPlaylistAgeUs; 132 133 switch (mRefreshState) { 134 case INITIAL_MINIMUM_RELOAD_DELAY: 135 { 136 size_t n = mPlaylist->size(); 137 if (n > 0) { 138 sp<AMessage> itemMeta; 139 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 140 141 int64_t itemDurationUs; 142 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 143 144 minPlaylistAgeUs = itemDurationUs; 145 break; 146 } 147 148 // fall through 149 } 150 151 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 152 { 153 minPlaylistAgeUs = targetDurationUs / 2; 154 break; 155 } 156 157 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 158 { 159 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 160 break; 161 } 162 163 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 164 { 165 minPlaylistAgeUs = targetDurationUs * 3; 166 break; 167 } 168 169 default: 170 TRESPASS(); 171 break; 172 } 173 174 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 175 return delayUs > 0ll ? delayUs : 0ll; 176} 177 178status_t PlaylistFetcher::decryptBuffer( 179 size_t playlistIndex, const sp<ABuffer> &buffer, 180 bool first) { 181 sp<AMessage> itemMeta; 182 bool found = false; 183 AString method; 184 185 for (ssize_t i = playlistIndex; i >= 0; --i) { 186 AString uri; 187 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 188 189 if (itemMeta->findString("cipher-method", &method)) { 190 found = true; 191 break; 192 } 193 } 194 195 if (!found) { 196 method = "NONE"; 197 } 198 buffer->meta()->setString("cipher-method", method.c_str()); 199 200 if (method == "NONE") { 201 return OK; 202 } else if (!(method == "AES-128")) { 203 ALOGE("Unsupported cipher method '%s'", method.c_str()); 204 return ERROR_UNSUPPORTED; 205 } 206 207 AString keyURI; 208 if (!itemMeta->findString("cipher-uri", &keyURI)) { 209 ALOGE("Missing key uri"); 210 return ERROR_MALFORMED; 211 } 212 213 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 214 215 sp<ABuffer> key; 216 if (index >= 0) { 217 key = mAESKeyForURI.valueAt(index); 218 } else { 219 status_t err = mSession->fetchFile(keyURI.c_str(), &key); 220 221 if (err != OK) { 222 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 223 return ERROR_IO; 224 } else if (key->size() != 16) { 225 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 226 return ERROR_MALFORMED; 227 } 228 229 mAESKeyForURI.add(keyURI, key); 230 } 231 232 AES_KEY aes_key; 233 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 234 ALOGE("failed to set AES decryption key."); 235 return UNKNOWN_ERROR; 236 } 237 238 size_t n = buffer->size(); 239 if (!n) { 240 return OK; 241 } 242 CHECK(n % 16 == 0); 243 244 if (first) { 245 // If decrypting the first block in a file, read the iv from the manifest 246 // or derive the iv from the file's sequence number. 247 248 AString iv; 249 if (itemMeta->findString("cipher-iv", &iv)) { 250 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 251 || iv.size() != 16 * 2 + 2) { 252 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 253 return ERROR_MALFORMED; 254 } 255 256 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 257 for (size_t i = 0; i < 16; ++i) { 258 char c1 = tolower(iv.c_str()[2 + 2 * i]); 259 char c2 = tolower(iv.c_str()[3 + 2 * i]); 260 if (!isxdigit(c1) || !isxdigit(c2)) { 261 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 262 return ERROR_MALFORMED; 263 } 264 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 265 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 266 267 mAESInitVec[i] = nibble1 << 4 | nibble2; 268 } 269 } else { 270 memset(mAESInitVec, 0, sizeof(mAESInitVec)); 271 mAESInitVec[15] = mSeqNumber & 0xff; 272 mAESInitVec[14] = (mSeqNumber >> 8) & 0xff; 273 mAESInitVec[13] = (mSeqNumber >> 16) & 0xff; 274 mAESInitVec[12] = (mSeqNumber >> 24) & 0xff; 275 } 276 } 277 278 AES_cbc_encrypt( 279 buffer->data(), buffer->data(), buffer->size(), 280 &aes_key, mAESInitVec, AES_DECRYPT); 281 282 return OK; 283} 284 285status_t PlaylistFetcher::checkDecryptPadding(const sp<ABuffer> &buffer) { 286 status_t err; 287 AString method; 288 CHECK(buffer->meta()->findString("cipher-method", &method)); 289 if (method == "NONE") { 290 return OK; 291 } 292 293 uint8_t padding = 0; 294 if (buffer->size() > 0) { 295 padding = buffer->data()[buffer->size() - 1]; 296 } 297 298 if (padding > 16) { 299 return ERROR_MALFORMED; 300 } 301 302 for (size_t i = buffer->size() - padding; i < padding; i++) { 303 if (buffer->data()[i] != padding) { 304 return ERROR_MALFORMED; 305 } 306 } 307 308 buffer->setRange(buffer->offset(), buffer->size() - padding); 309 return OK; 310} 311 312void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 313 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 314 if (maxDelayUs < minDelayUs) { 315 maxDelayUs = minDelayUs; 316 } 317 if (delayUs > maxDelayUs) { 318 ALOGV("Need to refresh playlist in %lld", maxDelayUs); 319 delayUs = maxDelayUs; 320 } 321 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 322 msg->setInt32("generation", mMonitorQueueGeneration); 323 msg->post(delayUs); 324} 325 326void PlaylistFetcher::cancelMonitorQueue() { 327 ++mMonitorQueueGeneration; 328} 329 330void PlaylistFetcher::startAsync( 331 const sp<AnotherPacketSource> &audioSource, 332 const sp<AnotherPacketSource> &videoSource, 333 const sp<AnotherPacketSource> &subtitleSource, 334 int64_t startTimeUs, 335 int64_t minStartTimeUs, 336 int32_t startSeqNumberHint) { 337 sp<AMessage> msg = new AMessage(kWhatStart, id()); 338 339 uint32_t streamTypeMask = 0ul; 340 341 if (audioSource != NULL) { 342 msg->setPointer("audioSource", audioSource.get()); 343 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 344 } 345 346 if (videoSource != NULL) { 347 msg->setPointer("videoSource", videoSource.get()); 348 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 349 } 350 351 if (subtitleSource != NULL) { 352 msg->setPointer("subtitleSource", subtitleSource.get()); 353 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 354 } 355 356 msg->setInt32("streamTypeMask", streamTypeMask); 357 msg->setInt64("startTimeUs", startTimeUs); 358 msg->setInt64("minStartTimeUs", minStartTimeUs); 359 msg->setInt32("startSeqNumberHint", startSeqNumberHint); 360 msg->post(); 361} 362 363void PlaylistFetcher::pauseAsync() { 364 (new AMessage(kWhatPause, id()))->post(); 365} 366 367void PlaylistFetcher::stopAsync(bool selfTriggered) { 368 sp<AMessage> msg = new AMessage(kWhatStop, id()); 369 msg->setInt32("selfTriggered", selfTriggered); 370 msg->post(); 371} 372 373void PlaylistFetcher::resumeUntilAsync(const sp<AMessage> ¶ms) { 374 AMessage* msg = new AMessage(kWhatResumeUntil, id()); 375 msg->setMessage("params", params); 376 msg->post(); 377} 378 379void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 380 switch (msg->what()) { 381 case kWhatStart: 382 { 383 status_t err = onStart(msg); 384 385 sp<AMessage> notify = mNotify->dup(); 386 notify->setInt32("what", kWhatStarted); 387 notify->setInt32("err", err); 388 notify->post(); 389 break; 390 } 391 392 case kWhatPause: 393 { 394 onPause(); 395 396 sp<AMessage> notify = mNotify->dup(); 397 notify->setInt32("what", kWhatPaused); 398 notify->post(); 399 break; 400 } 401 402 case kWhatStop: 403 { 404 onStop(msg); 405 406 sp<AMessage> notify = mNotify->dup(); 407 notify->setInt32("what", kWhatStopped); 408 notify->post(); 409 break; 410 } 411 412 case kWhatMonitorQueue: 413 case kWhatDownloadNext: 414 { 415 int32_t generation; 416 CHECK(msg->findInt32("generation", &generation)); 417 418 if (generation != mMonitorQueueGeneration) { 419 // Stale event 420 break; 421 } 422 423 if (msg->what() == kWhatMonitorQueue) { 424 onMonitorQueue(); 425 } else { 426 onDownloadNext(); 427 } 428 break; 429 } 430 431 case kWhatResumeUntil: 432 { 433 onResumeUntil(msg); 434 break; 435 } 436 437 default: 438 TRESPASS(); 439 } 440} 441 442status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 443 mPacketSources.clear(); 444 445 uint32_t streamTypeMask; 446 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 447 448 int64_t startTimeUs; 449 int32_t startSeqNumberHint; 450 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 451 CHECK(msg->findInt64("minStartTimeUs", (int64_t *) &mMinStartTimeUs)); 452 CHECK(msg->findInt32("startSeqNumberHint", &startSeqNumberHint)); 453 454 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 455 void *ptr; 456 CHECK(msg->findPointer("audioSource", &ptr)); 457 458 mPacketSources.add( 459 LiveSession::STREAMTYPE_AUDIO, 460 static_cast<AnotherPacketSource *>(ptr)); 461 } 462 463 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 464 void *ptr; 465 CHECK(msg->findPointer("videoSource", &ptr)); 466 467 mPacketSources.add( 468 LiveSession::STREAMTYPE_VIDEO, 469 static_cast<AnotherPacketSource *>(ptr)); 470 } 471 472 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 473 void *ptr; 474 CHECK(msg->findPointer("subtitleSource", &ptr)); 475 476 mPacketSources.add( 477 LiveSession::STREAMTYPE_SUBTITLES, 478 static_cast<AnotherPacketSource *>(ptr)); 479 } 480 481 mStreamTypeMask = streamTypeMask; 482 mStartTimeUs = startTimeUs; 483 484 if (mStartTimeUs >= 0ll) { 485 mSeqNumber = -1; 486 mStartup = true; 487 mPrepared = false; 488 } 489 490 if (startSeqNumberHint >= 0) { 491 mSeqNumber = startSeqNumberHint; 492 } 493 494 postMonitorQueue(); 495 496 return OK; 497} 498 499void PlaylistFetcher::onPause() { 500 cancelMonitorQueue(); 501} 502 503void PlaylistFetcher::onStop(const sp<AMessage> &msg) { 504 cancelMonitorQueue(); 505 506 int32_t selfTriggered; 507 CHECK(msg->findInt32("selfTriggered", &selfTriggered)); 508 if (!selfTriggered) { 509 // Self triggered stops only happen during switching, in which case we do not want 510 // to clear the discontinuities queued at the end of packet sources. 511 for (size_t i = 0; i < mPacketSources.size(); i++) { 512 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 513 packetSource->clear(); 514 } 515 } 516 517 mPacketSources.clear(); 518 mStreamTypeMask = 0; 519} 520 521// Resume until we have reached the boundary timestamps listed in `msg`; when 522// the remaining time is too short (within a resume threshold) stop immediately 523// instead. 524status_t PlaylistFetcher::onResumeUntil(const sp<AMessage> &msg) { 525 sp<AMessage> params; 526 CHECK(msg->findMessage("params", ¶ms)); 527 528 bool stop = false; 529 for (size_t i = 0; i < mPacketSources.size(); i++) { 530 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 531 532 const char *stopKey; 533 int streamType = mPacketSources.keyAt(i); 534 switch (streamType) { 535 case LiveSession::STREAMTYPE_VIDEO: 536 stopKey = "timeUsVideo"; 537 break; 538 539 case LiveSession::STREAMTYPE_AUDIO: 540 stopKey = "timeUsAudio"; 541 break; 542 543 case LiveSession::STREAMTYPE_SUBTITLES: 544 stopKey = "timeUsSubtitle"; 545 break; 546 547 default: 548 TRESPASS(); 549 } 550 551 // Don't resume if we would stop within a resume threshold. 552 int64_t latestTimeUs = 0, stopTimeUs = 0; 553 sp<AMessage> latestMeta = packetSource->getLatestMeta(); 554 if (latestMeta != NULL 555 && (latestMeta->findInt64("timeUs", &latestTimeUs) 556 && params->findInt64(stopKey, &stopTimeUs))) { 557 int64_t diffUs = stopTimeUs - latestTimeUs; 558 if (diffUs < resumeThreshold(latestMeta)) { 559 stop = true; 560 } 561 } 562 } 563 564 if (stop) { 565 for (size_t i = 0; i < mPacketSources.size(); i++) { 566 mPacketSources.valueAt(i)->queueAccessUnit(mSession->createFormatChangeBuffer()); 567 } 568 stopAsync(/* selfTriggered = */ true); 569 return OK; 570 } 571 572 mStopParams = params; 573 postMonitorQueue(); 574 575 return OK; 576} 577 578void PlaylistFetcher::notifyError(status_t err) { 579 sp<AMessage> notify = mNotify->dup(); 580 notify->setInt32("what", kWhatError); 581 notify->setInt32("err", err); 582 notify->post(); 583} 584 585void PlaylistFetcher::queueDiscontinuity( 586 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 587 for (size_t i = 0; i < mPacketSources.size(); ++i) { 588 mPacketSources.valueAt(i)->queueDiscontinuity(type, extra); 589 } 590} 591 592void PlaylistFetcher::onMonitorQueue() { 593 bool downloadMore = false; 594 refreshPlaylist(); 595 596 int32_t targetDurationSecs; 597 int64_t targetDurationUs = kMinBufferedDurationUs; 598 if (mPlaylist != NULL) { 599 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 600 targetDurationUs = targetDurationSecs * 1000000ll; 601 } 602 603 // buffer at least 3 times the target duration, or up to 10 seconds 604 int64_t durationToBufferUs = targetDurationUs * 3; 605 if (durationToBufferUs > kMinBufferedDurationUs) { 606 durationToBufferUs = kMinBufferedDurationUs; 607 } 608 609 int64_t bufferedDurationUs = 0ll; 610 status_t finalResult = NOT_ENOUGH_DATA; 611 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 612 sp<AnotherPacketSource> packetSource = 613 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 614 615 bufferedDurationUs = 616 packetSource->getBufferedDurationUs(&finalResult); 617 finalResult = OK; 618 } else { 619 // Use max stream duration to prevent us from waiting on a non-existent stream; 620 // when we cannot make out from the manifest what streams are included in a playlist 621 // we might assume extra streams. 622 for (size_t i = 0; i < mPacketSources.size(); ++i) { 623 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 624 continue; 625 } 626 627 int64_t bufferedStreamDurationUs = 628 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 629 ALOGV("buffered %lld for stream %d", 630 bufferedStreamDurationUs, mPacketSources.keyAt(i)); 631 if (bufferedStreamDurationUs > bufferedDurationUs) { 632 bufferedDurationUs = bufferedStreamDurationUs; 633 } 634 } 635 } 636 downloadMore = (bufferedDurationUs < durationToBufferUs); 637 638 // signal start if buffered up at least the target size 639 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 640 mPrepared = true; 641 642 ALOGV("prepared, buffered=%lld > %lld", 643 bufferedDurationUs, targetDurationUs); 644 sp<AMessage> msg = mNotify->dup(); 645 msg->setInt32("what", kWhatTemporarilyDoneFetching); 646 msg->post(); 647 } 648 649 if (finalResult == OK && downloadMore) { 650 ALOGV("monitoring, buffered=%lld < %lld", 651 bufferedDurationUs, durationToBufferUs); 652 // delay the next download slightly; hopefully this gives other concurrent fetchers 653 // a better chance to run. 654 // onDownloadNext(); 655 sp<AMessage> msg = new AMessage(kWhatDownloadNext, id()); 656 msg->setInt32("generation", mMonitorQueueGeneration); 657 msg->post(1000l); 658 } else { 659 // Nothing to do yet, try again in a second. 660 661 sp<AMessage> msg = mNotify->dup(); 662 msg->setInt32("what", kWhatTemporarilyDoneFetching); 663 msg->post(); 664 665 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 666 ALOGV("pausing for %lld, buffered=%lld > %lld", 667 delayUs, bufferedDurationUs, durationToBufferUs); 668 // :TRICKY: need to enforce minimum delay because the delay to 669 // refresh the playlist will become 0 670 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 671 } 672} 673 674status_t PlaylistFetcher::refreshPlaylist() { 675 if (delayUsToRefreshPlaylist() <= 0) { 676 bool unchanged; 677 sp<M3UParser> playlist = mSession->fetchPlaylist( 678 mURI.c_str(), mPlaylistHash, &unchanged); 679 680 if (playlist == NULL) { 681 if (unchanged) { 682 // We succeeded in fetching the playlist, but it was 683 // unchanged from the last time we tried. 684 685 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 686 mRefreshState = (RefreshState)(mRefreshState + 1); 687 } 688 } else { 689 ALOGE("failed to load playlist at url '%s'", mURI.c_str()); 690 notifyError(ERROR_IO); 691 return ERROR_IO; 692 } 693 } else { 694 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 695 mPlaylist = playlist; 696 697 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 698 updateDuration(); 699 } 700 } 701 702 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 703 } 704 return OK; 705} 706 707void PlaylistFetcher::onDownloadNext() { 708 if (refreshPlaylist() != OK) { 709 return; 710 } 711 712 int32_t firstSeqNumberInPlaylist; 713 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 714 "media-sequence", &firstSeqNumberInPlaylist)) { 715 firstSeqNumberInPlaylist = 0; 716 } 717 718 bool seekDiscontinuity = false; 719 bool explicitDiscontinuity = false; 720 721 const int32_t lastSeqNumberInPlaylist = 722 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 723 724 if (mStartup && mSeqNumber >= 0 725 && (mSeqNumber < firstSeqNumberInPlaylist || mSeqNumber > lastSeqNumberInPlaylist)) { 726 // in case we guessed wrong during reconfiguration, try fetching the latest content. 727 mSeqNumber = lastSeqNumberInPlaylist; 728 } 729 730 if (mSeqNumber < 0) { 731 CHECK_GE(mStartTimeUs, 0ll); 732 733 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 734 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 735 ALOGV("Initial sequence number for time %lld is %d from (%d .. %d)", 736 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 737 lastSeqNumberInPlaylist); 738 } else { 739 // If this is a live session, start 3 segments from the end. 740 mSeqNumber = lastSeqNumberInPlaylist - 3; 741 if (mSeqNumber < firstSeqNumberInPlaylist) { 742 mSeqNumber = firstSeqNumberInPlaylist; 743 } 744 ALOGV("Initial sequence number for live event %d from (%d .. %d)", 745 mSeqNumber, firstSeqNumberInPlaylist, 746 lastSeqNumberInPlaylist); 747 } 748 749 mStartTimeUs = -1ll; 750 } 751 752 if (mSeqNumber < firstSeqNumberInPlaylist 753 || mSeqNumber > lastSeqNumberInPlaylist) { 754 if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) { 755 ++mNumRetries; 756 757 if (mSeqNumber > lastSeqNumberInPlaylist) { 758 // refresh in increasing fraction (1/2, 1/3, ...) of the 759 // playlist's target duration or 3 seconds, whichever is less 760 int32_t targetDurationSecs; 761 CHECK(mPlaylist->meta()->findInt32( 762 "target-duration", &targetDurationSecs)); 763 int64_t delayUs = mPlaylist->size() * targetDurationSecs * 764 1000000ll / (1 + mNumRetries); 765 if (delayUs > kMaxMonitorDelayUs) { 766 delayUs = kMaxMonitorDelayUs; 767 } 768 ALOGV("sequence number high: %d from (%d .. %d), " 769 "monitor in %lld (retry=%d)", 770 mSeqNumber, firstSeqNumberInPlaylist, 771 lastSeqNumberInPlaylist, delayUs, mNumRetries); 772 postMonitorQueue(delayUs); 773 return; 774 } 775 776 // we've missed the boat, let's start from the lowest sequence 777 // number available and signal a discontinuity. 778 779 ALOGI("We've missed the boat, restarting playback." 780 " mStartup=%d, was looking for %d in %d-%d", 781 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 782 lastSeqNumberInPlaylist); 783 mSeqNumber = lastSeqNumberInPlaylist - 3; 784 if (mSeqNumber < firstSeqNumberInPlaylist) { 785 mSeqNumber = firstSeqNumberInPlaylist; 786 } 787 explicitDiscontinuity = true; 788 789 // fall through 790 } else { 791 ALOGE("Cannot find sequence number %d in playlist " 792 "(contains %d - %d)", 793 mSeqNumber, firstSeqNumberInPlaylist, 794 firstSeqNumberInPlaylist + mPlaylist->size() - 1); 795 796 notifyError(ERROR_END_OF_STREAM); 797 return; 798 } 799 } 800 801 mNumRetries = 0; 802 803 AString uri; 804 sp<AMessage> itemMeta; 805 CHECK(mPlaylist->itemAt( 806 mSeqNumber - firstSeqNumberInPlaylist, 807 &uri, 808 &itemMeta)); 809 810 int32_t val; 811 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 812 explicitDiscontinuity = true; 813 } 814 815 int64_t range_offset, range_length; 816 if (!itemMeta->findInt64("range-offset", &range_offset) 817 || !itemMeta->findInt64("range-length", &range_length)) { 818 range_offset = 0; 819 range_length = -1; 820 } 821 822 ALOGV("fetching segment %d from (%d .. %d)", 823 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 824 825 ALOGV("fetching '%s'", uri.c_str()); 826 827 sp<ABuffer> buffer; 828 status_t err = mSession->fetchFile( 829 uri.c_str(), &buffer, range_offset, range_length); 830 831 if (err != OK) { 832 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 833 notifyError(err); 834 return; 835 } 836 837 CHECK(buffer != NULL); 838 839 err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer); 840 if (err == OK) { 841 err = checkDecryptPadding(buffer); 842 } 843 844 if (err != OK) { 845 ALOGE("decryptBuffer failed w/ error %d", err); 846 847 notifyError(err); 848 return; 849 } 850 851 if (mStartup || seekDiscontinuity || explicitDiscontinuity) { 852 // Signal discontinuity. 853 854 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 855 // If this was a live event this made no sense since 856 // we don't have access to all the segment before the current 857 // one. 858 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 859 } 860 861 if (seekDiscontinuity || explicitDiscontinuity) { 862 ALOGI("queueing discontinuity (seek=%d, explicit=%d)", 863 seekDiscontinuity, explicitDiscontinuity); 864 865 queueDiscontinuity( 866 explicitDiscontinuity 867 ? ATSParser::DISCONTINUITY_FORMATCHANGE 868 : ATSParser::DISCONTINUITY_SEEK, 869 NULL /* extra */); 870 } 871 } 872 873 err = extractAndQueueAccessUnits(buffer, itemMeta); 874 875 if (err == -EAGAIN) { 876 // bad starting sequence number hint 877 postMonitorQueue(); 878 return; 879 } 880 881 if (err == ERROR_OUT_OF_RANGE) { 882 // reached stopping point 883 stopAsync(/* selfTriggered = */ true); 884 return; 885 } 886 887 if (err != OK) { 888 notifyError(err); 889 return; 890 } 891 892 ++mSeqNumber; 893 894 postMonitorQueue(); 895 896 mStartup = false; 897} 898 899int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 900 int32_t firstSeqNumberInPlaylist; 901 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 902 "media-sequence", &firstSeqNumberInPlaylist)) { 903 firstSeqNumberInPlaylist = 0; 904 } 905 906 size_t index = 0; 907 int64_t segmentStartUs = 0; 908 while (index < mPlaylist->size()) { 909 sp<AMessage> itemMeta; 910 CHECK(mPlaylist->itemAt( 911 index, NULL /* uri */, &itemMeta)); 912 913 int64_t itemDurationUs; 914 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 915 916 if (timeUs < segmentStartUs + itemDurationUs) { 917 break; 918 } 919 920 segmentStartUs += itemDurationUs; 921 ++index; 922 } 923 924 if (index >= mPlaylist->size()) { 925 index = mPlaylist->size() - 1; 926 } 927 928 return firstSeqNumberInPlaylist + index; 929} 930 931status_t PlaylistFetcher::extractAndQueueAccessUnits( 932 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 933 if (buffer->size() > 0 && buffer->data()[0] == 0x47) { 934 // Let's assume this is an MPEG2 transport stream. 935 936 if ((buffer->size() % 188) != 0) { 937 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 938 "bytes in length."); 939 return ERROR_MALFORMED; 940 } 941 942 if (mTSParser == NULL) { 943 // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers. 944 mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE); 945 } 946 947 if (mNextPTSTimeUs >= 0ll) { 948 sp<AMessage> extra = new AMessage; 949 // Since we are using absolute timestamps, signal an offset of 0 to prevent 950 // ATSParser from skewing the timestamps of access units. 951 extra->setInt64(IStreamListener::kKeyMediaTimeUs, 0); 952 953 mTSParser->signalDiscontinuity( 954 ATSParser::DISCONTINUITY_SEEK, extra); 955 956 mNextPTSTimeUs = -1ll; 957 } 958 959 size_t offset = 0; 960 while (offset < buffer->size()) { 961 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 962 963 if (err != OK) { 964 return err; 965 } 966 967 offset += 188; 968 } 969 970 status_t err = OK; 971 for (size_t i = mPacketSources.size(); i-- > 0;) { 972 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 973 974 const char *key; 975 ATSParser::SourceType type; 976 const LiveSession::StreamType stream = mPacketSources.keyAt(i); 977 switch (stream) { 978 979 case LiveSession::STREAMTYPE_VIDEO: 980 type = ATSParser::VIDEO; 981 key = "timeUsVideo"; 982 break; 983 984 case LiveSession::STREAMTYPE_AUDIO: 985 type = ATSParser::AUDIO; 986 key = "timeUsAudio"; 987 break; 988 989 case LiveSession::STREAMTYPE_SUBTITLES: 990 { 991 ALOGE("MPEG2 Transport streams do not contain subtitles."); 992 return ERROR_MALFORMED; 993 break; 994 } 995 996 default: 997 TRESPASS(); 998 } 999 1000 sp<AnotherPacketSource> source = 1001 static_cast<AnotherPacketSource *>( 1002 mTSParser->getSource(type).get()); 1003 1004 if (source == NULL) { 1005 ALOGW("MPEG2 Transport stream does not contain %s data.", 1006 type == ATSParser::VIDEO ? "video" : "audio"); 1007 1008 mStreamTypeMask &= ~mPacketSources.keyAt(i); 1009 mPacketSources.removeItemsAt(i); 1010 continue; 1011 } 1012 1013 int64_t timeUs; 1014 sp<ABuffer> accessUnit; 1015 status_t finalResult; 1016 while (source->hasBufferAvailable(&finalResult) 1017 && source->dequeueAccessUnit(&accessUnit) == OK) { 1018 1019 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs)); 1020 if (mMinStartTimeUs > 0) { 1021 if (timeUs < mMinStartTimeUs) { 1022 // TODO untested path 1023 // try a later ts 1024 int32_t targetDuration; 1025 mPlaylist->meta()->findInt32("target-duration", &targetDuration); 1026 int32_t incr = (mMinStartTimeUs - timeUs) / 1000000 / targetDuration; 1027 if (incr == 0) { 1028 // increment mSeqNumber by at least one 1029 incr = 1; 1030 } 1031 mSeqNumber += incr; 1032 err = -EAGAIN; 1033 break; 1034 } else { 1035 int64_t startTimeUs; 1036 if (mStartTimeUsNotify != NULL 1037 && !mStartTimeUsNotify->findInt64(key, &startTimeUs)) { 1038 mStartTimeUsNotify->setInt64(key, timeUs); 1039 1040 uint32_t streamMask = 0; 1041 mStartTimeUsNotify->findInt32("streamMask", (int32_t *) &streamMask); 1042 streamMask |= mPacketSources.keyAt(i); 1043 mStartTimeUsNotify->setInt32("streamMask", streamMask); 1044 1045 if (streamMask == mStreamTypeMask) { 1046 mStartTimeUsNotify->post(); 1047 mStartTimeUsNotify.clear(); 1048 } 1049 } 1050 } 1051 } 1052 1053 if (mStopParams != NULL) { 1054 // Queue discontinuity in original stream. 1055 int64_t stopTimeUs; 1056 if (!mStopParams->findInt64(key, &stopTimeUs) || timeUs >= stopTimeUs) { 1057 packetSource->queueAccessUnit(mSession->createFormatChangeBuffer()); 1058 mStreamTypeMask &= ~stream; 1059 mPacketSources.removeItemsAt(i); 1060 break; 1061 } 1062 } 1063 1064 // Note that we do NOT dequeue any discontinuities except for format change. 1065 1066 // for simplicity, store a reference to the format in each unit 1067 sp<MetaData> format = source->getFormat(); 1068 if (format != NULL) { 1069 accessUnit->meta()->setObject("format", format); 1070 } 1071 1072 // Stash the sequence number so we can hint future fetchers where to start at. 1073 accessUnit->meta()->setInt32("seq", mSeqNumber); 1074 packetSource->queueAccessUnit(accessUnit); 1075 } 1076 1077 if (err != OK) { 1078 break; 1079 } 1080 } 1081 1082 if (err != OK) { 1083 for (size_t i = mPacketSources.size(); i-- > 0;) { 1084 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 1085 packetSource->clear(); 1086 } 1087 return err; 1088 } 1089 1090 if (!mStreamTypeMask) { 1091 // Signal gap is filled between original and new stream. 1092 ALOGV("ERROR OUT OF RANGE"); 1093 return ERROR_OUT_OF_RANGE; 1094 } 1095 1096 return OK; 1097 } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { 1098 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 1099 ALOGE("This stream only contains subtitles."); 1100 return ERROR_MALFORMED; 1101 } 1102 1103 const sp<AnotherPacketSource> packetSource = 1104 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 1105 1106 int64_t durationUs; 1107 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 1108 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 1109 buffer->meta()->setInt64("durationUs", durationUs); 1110 buffer->meta()->setInt32("seq", mSeqNumber); 1111 1112 packetSource->queueAccessUnit(buffer); 1113 return OK; 1114 } 1115 1116 if (mNextPTSTimeUs >= 0ll) { 1117 mFirstPTSValid = false; 1118 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 1119 mNextPTSTimeUs = -1ll; 1120 } 1121 1122 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 1123 // stream prefixed by an ID3 tag. 1124 1125 bool firstID3Tag = true; 1126 uint64_t PTS = 0; 1127 1128 for (;;) { 1129 // Make sure to skip all ID3 tags preceding the audio data. 1130 // At least one must be present to provide the PTS timestamp. 1131 1132 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 1133 if (!id3.isValid()) { 1134 if (firstID3Tag) { 1135 ALOGE("Unable to parse ID3 tag."); 1136 return ERROR_MALFORMED; 1137 } else { 1138 break; 1139 } 1140 } 1141 1142 if (firstID3Tag) { 1143 bool found = false; 1144 1145 ID3::Iterator it(id3, "PRIV"); 1146 while (!it.done()) { 1147 size_t length; 1148 const uint8_t *data = it.getData(&length); 1149 1150 static const char *kMatchName = 1151 "com.apple.streaming.transportStreamTimestamp"; 1152 static const size_t kMatchNameLen = strlen(kMatchName); 1153 1154 if (length == kMatchNameLen + 1 + 8 1155 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 1156 found = true; 1157 PTS = U64_AT(&data[kMatchNameLen + 1]); 1158 } 1159 1160 it.next(); 1161 } 1162 1163 if (!found) { 1164 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 1165 return ERROR_MALFORMED; 1166 } 1167 } 1168 1169 // skip the ID3 tag 1170 buffer->setRange( 1171 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 1172 1173 firstID3Tag = false; 1174 } 1175 1176 if (!mFirstPTSValid) { 1177 mFirstPTSValid = true; 1178 mFirstPTS = PTS; 1179 } 1180 PTS -= mFirstPTS; 1181 1182 int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs; 1183 1184 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 1185 ALOGW("This stream only contains audio data!"); 1186 1187 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 1188 1189 if (mStreamTypeMask == 0) { 1190 return OK; 1191 } 1192 } 1193 1194 sp<AnotherPacketSource> packetSource = 1195 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 1196 1197 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 1198 ABitReader bits(buffer->data(), buffer->size()); 1199 1200 // adts_fixed_header 1201 1202 CHECK_EQ(bits.getBits(12), 0xfffu); 1203 bits.skipBits(3); // ID, layer 1204 bool protection_absent = bits.getBits(1) != 0; 1205 1206 unsigned profile = bits.getBits(2); 1207 CHECK_NE(profile, 3u); 1208 unsigned sampling_freq_index = bits.getBits(4); 1209 bits.getBits(1); // private_bit 1210 unsigned channel_configuration = bits.getBits(3); 1211 CHECK_NE(channel_configuration, 0u); 1212 bits.skipBits(2); // original_copy, home 1213 1214 sp<MetaData> meta = MakeAACCodecSpecificData( 1215 profile, sampling_freq_index, channel_configuration); 1216 1217 meta->setInt32(kKeyIsADTS, true); 1218 1219 packetSource->setFormat(meta); 1220 } 1221 1222 int64_t numSamples = 0ll; 1223 int32_t sampleRate; 1224 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1225 1226 size_t offset = 0; 1227 while (offset < buffer->size()) { 1228 const uint8_t *adtsHeader = buffer->data() + offset; 1229 CHECK_LT(offset + 5, buffer->size()); 1230 1231 unsigned aac_frame_length = 1232 ((adtsHeader[3] & 3) << 11) 1233 | (adtsHeader[4] << 3) 1234 | (adtsHeader[5] >> 5); 1235 1236 CHECK_LE(offset + aac_frame_length, buffer->size()); 1237 1238 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1239 memcpy(unit->data(), adtsHeader, aac_frame_length); 1240 1241 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1242 unit->meta()->setInt64("timeUs", unitTimeUs); 1243 1244 // Each AAC frame encodes 1024 samples. 1245 numSamples += 1024; 1246 1247 unit->meta()->setInt32("seq", mSeqNumber); 1248 packetSource->queueAccessUnit(unit); 1249 1250 offset += aac_frame_length; 1251 } 1252 1253 return OK; 1254} 1255 1256void PlaylistFetcher::updateDuration() { 1257 int64_t durationUs = 0ll; 1258 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1259 sp<AMessage> itemMeta; 1260 CHECK(mPlaylist->itemAt( 1261 index, NULL /* uri */, &itemMeta)); 1262 1263 int64_t itemDurationUs; 1264 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1265 1266 durationUs += itemDurationUs; 1267 } 1268 1269 sp<AMessage> msg = mNotify->dup(); 1270 msg->setInt32("what", kWhatDurationUpdate); 1271 msg->setInt64("durationUs", durationUs); 1272 msg->post(); 1273} 1274 1275int64_t PlaylistFetcher::resumeThreshold(const sp<AMessage> &msg) { 1276 int64_t durationUs, threshold; 1277 if (msg->findInt64("durationUs", &durationUs)) { 1278 return kNumSkipFrames * durationUs; 1279 } 1280 1281 sp<RefBase> obj; 1282 msg->findObject("format", &obj); 1283 MetaData *format = static_cast<MetaData *>(obj.get()); 1284 1285 const char *mime; 1286 CHECK(format->findCString(kKeyMIMEType, &mime)); 1287 bool audio = !strncasecmp(mime, "audio/", 6); 1288 if (audio) { 1289 // Assumes 1000 samples per frame. 1290 int32_t sampleRate; 1291 CHECK(format->findInt32(kKeySampleRate, &sampleRate)); 1292 return kNumSkipFrames /* frames */ * 1000 /* samples */ 1293 * (1000000 / sampleRate) /* sample duration (us) */; 1294 } else { 1295 int32_t frameRate; 1296 if (format->findInt32(kKeyFrameRate, &frameRate) && frameRate > 0) { 1297 return kNumSkipFrames * (1000000 / frameRate); 1298 } 1299 } 1300 1301 return 500000ll; 1302} 1303 1304} // namespace android 1305