// PlaylistFetcher.cpp revision 84333e0475bc911adc16417f4ca327c975cf6c36
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "PlaylistFetcher" 19#include <utils/Log.h> 20 21#include "PlaylistFetcher.h" 22 23#include "LiveDataSource.h" 24#include "LiveSession.h" 25#include "M3UParser.h" 26 27#include "include/avc_utils.h" 28#include "include/HTTPBase.h" 29#include "include/ID3.h" 30#include "mpeg2ts/AnotherPacketSource.h" 31 32#include <media/IStreamSource.h> 33#include <media/stagefright/foundation/ABitReader.h> 34#include <media/stagefright/foundation/ABuffer.h> 35#include <media/stagefright/foundation/ADebug.h> 36#include <media/stagefright/foundation/hexdump.h> 37#include <media/stagefright/FileSource.h> 38#include <media/stagefright/MediaDefs.h> 39#include <media/stagefright/MetaData.h> 40#include <media/stagefright/Utils.h> 41 42#include <ctype.h> 43#include <openssl/aes.h> 44#include <openssl/md5.h> 45 46namespace android { 47 48// static 49const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll; 50const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll; 51 52PlaylistFetcher::PlaylistFetcher( 53 const sp<AMessage> ¬ify, 54 const sp<LiveSession> &session, 55 const char *uri) 56 : mNotify(notify), 57 mSession(session), 58 mURI(uri), 59 mStreamTypeMask(0), 60 mStartTimeUs(-1ll), 61 mLastPlaylistFetchTimeUs(-1ll), 62 mSeqNumber(-1), 63 mNumRetries(0), 64 mStartup(true), 65 mPrepared(false), 66 
mNextPTSTimeUs(-1ll), 67 mMonitorQueueGeneration(0), 68 mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY), 69 mFirstPTSValid(false), 70 mAbsoluteTimeAnchorUs(0ll) { 71 memset(mPlaylistHash, 0, sizeof(mPlaylistHash)); 72} 73 74PlaylistFetcher::~PlaylistFetcher() { 75} 76 77int64_t PlaylistFetcher::getSegmentStartTimeUs(int32_t seqNumber) const { 78 CHECK(mPlaylist != NULL); 79 80 int32_t firstSeqNumberInPlaylist; 81 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 82 "media-sequence", &firstSeqNumberInPlaylist)) { 83 firstSeqNumberInPlaylist = 0; 84 } 85 86 int32_t lastSeqNumberInPlaylist = 87 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 88 89 CHECK_GE(seqNumber, firstSeqNumberInPlaylist); 90 CHECK_LE(seqNumber, lastSeqNumberInPlaylist); 91 92 int64_t segmentStartUs = 0ll; 93 for (int32_t index = 0; 94 index < seqNumber - firstSeqNumberInPlaylist; ++index) { 95 sp<AMessage> itemMeta; 96 CHECK(mPlaylist->itemAt( 97 index, NULL /* uri */, &itemMeta)); 98 99 int64_t itemDurationUs; 100 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 101 102 segmentStartUs += itemDurationUs; 103 } 104 105 return segmentStartUs; 106} 107 108int64_t PlaylistFetcher::delayUsToRefreshPlaylist() const { 109 int64_t nowUs = ALooper::GetNowUs(); 110 111 if (mPlaylist == NULL || mLastPlaylistFetchTimeUs < 0ll) { 112 CHECK_EQ((int)mRefreshState, (int)INITIAL_MINIMUM_RELOAD_DELAY); 113 return 0ll; 114 } 115 116 if (mPlaylist->isComplete()) { 117 return (~0llu >> 1); 118 } 119 120 int32_t targetDurationSecs; 121 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 122 123 int64_t targetDurationUs = targetDurationSecs * 1000000ll; 124 125 int64_t minPlaylistAgeUs; 126 127 switch (mRefreshState) { 128 case INITIAL_MINIMUM_RELOAD_DELAY: 129 { 130 size_t n = mPlaylist->size(); 131 if (n > 0) { 132 sp<AMessage> itemMeta; 133 CHECK(mPlaylist->itemAt(n - 1, NULL /* uri */, &itemMeta)); 134 135 int64_t itemDurationUs; 136 
CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 137 138 minPlaylistAgeUs = itemDurationUs; 139 break; 140 } 141 142 // fall through 143 } 144 145 case FIRST_UNCHANGED_RELOAD_ATTEMPT: 146 { 147 minPlaylistAgeUs = targetDurationUs / 2; 148 break; 149 } 150 151 case SECOND_UNCHANGED_RELOAD_ATTEMPT: 152 { 153 minPlaylistAgeUs = (targetDurationUs * 3) / 2; 154 break; 155 } 156 157 case THIRD_UNCHANGED_RELOAD_ATTEMPT: 158 { 159 minPlaylistAgeUs = targetDurationUs * 3; 160 break; 161 } 162 163 default: 164 TRESPASS(); 165 break; 166 } 167 168 int64_t delayUs = mLastPlaylistFetchTimeUs + minPlaylistAgeUs - nowUs; 169 return delayUs > 0ll ? delayUs : 0ll; 170} 171 172status_t PlaylistFetcher::decryptBuffer( 173 size_t playlistIndex, const sp<ABuffer> &buffer) { 174 sp<AMessage> itemMeta; 175 bool found = false; 176 AString method; 177 178 for (ssize_t i = playlistIndex; i >= 0; --i) { 179 AString uri; 180 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 181 182 if (itemMeta->findString("cipher-method", &method)) { 183 found = true; 184 break; 185 } 186 } 187 188 if (!found) { 189 method = "NONE"; 190 } 191 192 if (method == "NONE") { 193 return OK; 194 } else if (!(method == "AES-128")) { 195 ALOGE("Unsupported cipher method '%s'", method.c_str()); 196 return ERROR_UNSUPPORTED; 197 } 198 199 AString keyURI; 200 if (!itemMeta->findString("cipher-uri", &keyURI)) { 201 ALOGE("Missing key uri"); 202 return ERROR_MALFORMED; 203 } 204 205 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 206 207 sp<ABuffer> key; 208 if (index >= 0) { 209 key = mAESKeyForURI.valueAt(index); 210 } else { 211 status_t err = mSession->fetchFile(keyURI.c_str(), &key); 212 213 if (err != OK) { 214 ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 215 return ERROR_IO; 216 } else if (key->size() != 16) { 217 ALOGE("key file '%s' wasn't 16 bytes in size.", keyURI.c_str()); 218 return ERROR_MALFORMED; 219 } 220 221 mAESKeyForURI.add(keyURI, key); 222 } 223 224 AES_KEY aes_key; 225 if 
(AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 226 ALOGE("failed to set AES decryption key."); 227 return UNKNOWN_ERROR; 228 } 229 230 unsigned char aes_ivec[16]; 231 232 AString iv; 233 if (itemMeta->findString("cipher-iv", &iv)) { 234 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 235 || iv.size() != 16 * 2 + 2) { 236 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 237 return ERROR_MALFORMED; 238 } 239 240 memset(aes_ivec, 0, sizeof(aes_ivec)); 241 for (size_t i = 0; i < 16; ++i) { 242 char c1 = tolower(iv.c_str()[2 + 2 * i]); 243 char c2 = tolower(iv.c_str()[3 + 2 * i]); 244 if (!isxdigit(c1) || !isxdigit(c2)) { 245 ALOGE("malformed cipher IV '%s'.", iv.c_str()); 246 return ERROR_MALFORMED; 247 } 248 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 249 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 250 251 aes_ivec[i] = nibble1 << 4 | nibble2; 252 } 253 } else { 254 memset(aes_ivec, 0, sizeof(aes_ivec)); 255 aes_ivec[15] = mSeqNumber & 0xff; 256 aes_ivec[14] = (mSeqNumber >> 8) & 0xff; 257 aes_ivec[13] = (mSeqNumber >> 16) & 0xff; 258 aes_ivec[12] = (mSeqNumber >> 24) & 0xff; 259 } 260 261 AES_cbc_encrypt( 262 buffer->data(), buffer->data(), buffer->size(), 263 &aes_key, aes_ivec, AES_DECRYPT); 264 265 // hexdump(buffer->data(), buffer->size()); 266 267 size_t n = buffer->size(); 268 CHECK_GT(n, 0u); 269 270 size_t pad = buffer->data()[n - 1]; 271 272 CHECK_GT(pad, 0u); 273 CHECK_LE(pad, 16u); 274 CHECK_GE((size_t)n, pad); 275 for (size_t i = 0; i < pad; ++i) { 276 CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad); 277 } 278 279 n -= pad; 280 281 buffer->setRange(buffer->offset(), n); 282 283 return OK; 284} 285 286void PlaylistFetcher::postMonitorQueue(int64_t delayUs, int64_t minDelayUs) { 287 int64_t maxDelayUs = delayUsToRefreshPlaylist(); 288 if (maxDelayUs < minDelayUs) { 289 maxDelayUs = minDelayUs; 290 } 291 if (delayUs > maxDelayUs) { 292 ALOGV("Need to refresh playlist in %lld", maxDelayUs); 293 delayUs = maxDelayUs; 
294 } 295 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 296 msg->setInt32("generation", mMonitorQueueGeneration); 297 msg->post(delayUs); 298} 299 300void PlaylistFetcher::cancelMonitorQueue() { 301 ++mMonitorQueueGeneration; 302} 303 304void PlaylistFetcher::startAsync( 305 const sp<AnotherPacketSource> &audioSource, 306 const sp<AnotherPacketSource> &videoSource, 307 const sp<AnotherPacketSource> &subtitleSource, 308 int64_t startTimeUs) { 309 sp<AMessage> msg = new AMessage(kWhatStart, id()); 310 311 uint32_t streamTypeMask = 0ul; 312 313 if (audioSource != NULL) { 314 msg->setPointer("audioSource", audioSource.get()); 315 streamTypeMask |= LiveSession::STREAMTYPE_AUDIO; 316 } 317 318 if (videoSource != NULL) { 319 msg->setPointer("videoSource", videoSource.get()); 320 streamTypeMask |= LiveSession::STREAMTYPE_VIDEO; 321 } 322 323 if (subtitleSource != NULL) { 324 msg->setPointer("subtitleSource", subtitleSource.get()); 325 streamTypeMask |= LiveSession::STREAMTYPE_SUBTITLES; 326 } 327 328 msg->setInt32("streamTypeMask", streamTypeMask); 329 msg->setInt64("startTimeUs", startTimeUs); 330 msg->post(); 331} 332 333void PlaylistFetcher::pauseAsync() { 334 (new AMessage(kWhatPause, id()))->post(); 335} 336 337void PlaylistFetcher::stopAsync() { 338 (new AMessage(kWhatStop, id()))->post(); 339} 340 341void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) { 342 switch (msg->what()) { 343 case kWhatStart: 344 { 345 status_t err = onStart(msg); 346 347 sp<AMessage> notify = mNotify->dup(); 348 notify->setInt32("what", kWhatStarted); 349 notify->setInt32("err", err); 350 notify->post(); 351 break; 352 } 353 354 case kWhatPause: 355 { 356 onPause(); 357 358 sp<AMessage> notify = mNotify->dup(); 359 notify->setInt32("what", kWhatPaused); 360 notify->post(); 361 break; 362 } 363 364 case kWhatStop: 365 { 366 onStop(); 367 368 sp<AMessage> notify = mNotify->dup(); 369 notify->setInt32("what", kWhatStopped); 370 notify->post(); 371 break; 372 } 373 374 
case kWhatMonitorQueue: 375 { 376 int32_t generation; 377 CHECK(msg->findInt32("generation", &generation)); 378 379 if (generation != mMonitorQueueGeneration) { 380 // Stale event 381 break; 382 } 383 384 onMonitorQueue(); 385 break; 386 } 387 388 default: 389 TRESPASS(); 390 } 391} 392 393status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) { 394 mPacketSources.clear(); 395 396 uint32_t streamTypeMask; 397 CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask)); 398 399 int64_t startTimeUs; 400 CHECK(msg->findInt64("startTimeUs", &startTimeUs)); 401 402 if (streamTypeMask & LiveSession::STREAMTYPE_AUDIO) { 403 void *ptr; 404 CHECK(msg->findPointer("audioSource", &ptr)); 405 406 mPacketSources.add( 407 LiveSession::STREAMTYPE_AUDIO, 408 static_cast<AnotherPacketSource *>(ptr)); 409 } 410 411 if (streamTypeMask & LiveSession::STREAMTYPE_VIDEO) { 412 void *ptr; 413 CHECK(msg->findPointer("videoSource", &ptr)); 414 415 mPacketSources.add( 416 LiveSession::STREAMTYPE_VIDEO, 417 static_cast<AnotherPacketSource *>(ptr)); 418 } 419 420 if (streamTypeMask & LiveSession::STREAMTYPE_SUBTITLES) { 421 void *ptr; 422 CHECK(msg->findPointer("subtitleSource", &ptr)); 423 424 mPacketSources.add( 425 LiveSession::STREAMTYPE_SUBTITLES, 426 static_cast<AnotherPacketSource *>(ptr)); 427 } 428 429 mStreamTypeMask = streamTypeMask; 430 mStartTimeUs = startTimeUs; 431 432 if (mStartTimeUs >= 0ll) { 433 mSeqNumber = -1; 434 mStartup = true; 435 mPrepared = false; 436 } 437 438 postMonitorQueue(); 439 440 return OK; 441} 442 443void PlaylistFetcher::onPause() { 444 cancelMonitorQueue(); 445 446 mPacketSources.clear(); 447 mStreamTypeMask = 0; 448} 449 450void PlaylistFetcher::onStop() { 451 cancelMonitorQueue(); 452 453 for (size_t i = 0; i < mPacketSources.size(); ++i) { 454 mPacketSources.valueAt(i)->clear(); 455 } 456 457 mPacketSources.clear(); 458 mStreamTypeMask = 0; 459} 460 461void PlaylistFetcher::notifyError(status_t err) { 462 sp<AMessage> notify = 
mNotify->dup(); 463 notify->setInt32("what", kWhatError); 464 notify->setInt32("err", err); 465 notify->post(); 466} 467 468void PlaylistFetcher::queueDiscontinuity( 469 ATSParser::DiscontinuityType type, const sp<AMessage> &extra) { 470 for (size_t i = 0; i < mPacketSources.size(); ++i) { 471 mPacketSources.valueAt(i)->queueDiscontinuity(type, extra); 472 } 473} 474 475void PlaylistFetcher::onMonitorQueue() { 476 bool downloadMore = false; 477 refreshPlaylist(); 478 479 int32_t targetDurationSecs; 480 int64_t targetDurationUs = kMinBufferedDurationUs; 481 if (mPlaylist != NULL) { 482 CHECK(mPlaylist->meta()->findInt32("target-duration", &targetDurationSecs)); 483 targetDurationUs = targetDurationSecs * 1000000ll; 484 } 485 486 // buffer at least 3 times the target duration, or up to 10 seconds 487 int64_t durationToBufferUs = targetDurationUs * 3; 488 if (durationToBufferUs > kMinBufferedDurationUs) { 489 durationToBufferUs = kMinBufferedDurationUs; 490 } 491 492 int64_t bufferedDurationUs = 0ll; 493 status_t finalResult = NOT_ENOUGH_DATA; 494 if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) { 495 sp<AnotherPacketSource> packetSource = 496 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 497 498 bufferedDurationUs = 499 packetSource->getBufferedDurationUs(&finalResult); 500 finalResult = OK; 501 } else { 502 bool first = true; 503 504 for (size_t i = 0; i < mPacketSources.size(); ++i) { 505 if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) { 506 continue; 507 } 508 509 int64_t bufferedStreamDurationUs = 510 mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult); 511 if (first || bufferedStreamDurationUs < bufferedDurationUs) { 512 bufferedDurationUs = bufferedStreamDurationUs; 513 first = false; 514 } 515 } 516 } 517 downloadMore = (bufferedDurationUs < durationToBufferUs); 518 519 // signal start if buffered up at least the target size 520 if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) { 521 mPrepared = 
true; 522 523 ALOGV("prepared, buffered=%lld > %lld", 524 bufferedDurationUs, targetDurationUs); 525 sp<AMessage> msg = mNotify->dup(); 526 msg->setInt32("what", kWhatTemporarilyDoneFetching); 527 msg->post(); 528 } 529 530 if (finalResult == OK && downloadMore) { 531 ALOGV("monitoring, buffered=%lld < %lld", 532 bufferedDurationUs, durationToBufferUs); 533 onDownloadNext(); 534 } else { 535 // Nothing to do yet, try again in a second. 536 537 sp<AMessage> msg = mNotify->dup(); 538 msg->setInt32("what", kWhatTemporarilyDoneFetching); 539 msg->post(); 540 541 int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2; 542 ALOGV("pausing for %lld, buffered=%lld > %lld", 543 delayUs, bufferedDurationUs, durationToBufferUs); 544 // :TRICKY: need to enforce minimum delay because the delay to 545 // refresh the playlist will become 0 546 postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0); 547 } 548} 549 550status_t PlaylistFetcher::refreshPlaylist() { 551 if (delayUsToRefreshPlaylist() <= 0) { 552 bool unchanged; 553 sp<M3UParser> playlist = mSession->fetchPlaylist( 554 mURI.c_str(), mPlaylistHash, &unchanged); 555 556 if (playlist == NULL) { 557 if (unchanged) { 558 // We succeeded in fetching the playlist, but it was 559 // unchanged from the last time we tried. 
560 561 if (mRefreshState != THIRD_UNCHANGED_RELOAD_ATTEMPT) { 562 mRefreshState = (RefreshState)(mRefreshState + 1); 563 } 564 } else { 565 ALOGE("failed to load playlist at url '%s'", mURI.c_str()); 566 notifyError(ERROR_IO); 567 return ERROR_IO; 568 } 569 } else { 570 mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY; 571 mPlaylist = playlist; 572 573 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 574 updateDuration(); 575 } 576 } 577 578 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 579 } 580 return OK; 581} 582 583void PlaylistFetcher::onDownloadNext() { 584 if (refreshPlaylist() != OK) { 585 return; 586 } 587 588 int32_t firstSeqNumberInPlaylist; 589 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 590 "media-sequence", &firstSeqNumberInPlaylist)) { 591 firstSeqNumberInPlaylist = 0; 592 } 593 594 bool seekDiscontinuity = false; 595 bool explicitDiscontinuity = false; 596 597 const int32_t lastSeqNumberInPlaylist = 598 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 599 600 if (mSeqNumber < 0) { 601 CHECK_GE(mStartTimeUs, 0ll); 602 603 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 604 mSeqNumber = getSeqNumberForTime(mStartTimeUs); 605 ALOGV("Initial sequence number for time %lld is %d from (%d .. %d)", 606 mStartTimeUs, mSeqNumber, firstSeqNumberInPlaylist, 607 lastSeqNumberInPlaylist); 608 } else { 609 // If this is a live session, start 3 segments from the end. 610 mSeqNumber = lastSeqNumberInPlaylist - 3; 611 if (mSeqNumber < firstSeqNumberInPlaylist) { 612 mSeqNumber = firstSeqNumberInPlaylist; 613 } 614 ALOGV("Initial sequence number for live event %d from (%d .. 
%d)", 615 mSeqNumber, firstSeqNumberInPlaylist, 616 lastSeqNumberInPlaylist); 617 } 618 619 mStartTimeUs = -1ll; 620 } 621 622 if (mSeqNumber < firstSeqNumberInPlaylist 623 || mSeqNumber > lastSeqNumberInPlaylist) { 624 if (!mPlaylist->isComplete() && mNumRetries < kMaxNumRetries) { 625 ++mNumRetries; 626 627 if (mSeqNumber > lastSeqNumberInPlaylist) { 628 // refresh in increasing fraction (1/2, 1/3, ...) of the 629 // playlist's target duration or 3 seconds, whichever is less 630 int32_t targetDurationSecs; 631 CHECK(mPlaylist->meta()->findInt32( 632 "target-duration", &targetDurationSecs)); 633 int64_t delayUs = mPlaylist->size() * targetDurationSecs * 634 1000000ll / (1 + mNumRetries); 635 if (delayUs > kMaxMonitorDelayUs) { 636 delayUs = kMaxMonitorDelayUs; 637 } 638 ALOGV("sequence number high: %d from (%d .. %d), " 639 "monitor in %lld (retry=%d)", 640 mSeqNumber, firstSeqNumberInPlaylist, 641 lastSeqNumberInPlaylist, delayUs, mNumRetries); 642 postMonitorQueue(delayUs); 643 return; 644 } 645 646 // we've missed the boat, let's start from the lowest sequence 647 // number available and signal a discontinuity. 648 649 ALOGI("We've missed the boat, restarting playback." 
650 " mStartup=%d, was looking for %d in %d-%d", 651 mStartup, mSeqNumber, firstSeqNumberInPlaylist, 652 lastSeqNumberInPlaylist); 653 mSeqNumber = lastSeqNumberInPlaylist - 3; 654 if (mSeqNumber < firstSeqNumberInPlaylist) { 655 mSeqNumber = firstSeqNumberInPlaylist; 656 } 657 explicitDiscontinuity = true; 658 659 // fall through 660 } else { 661 ALOGE("Cannot find sequence number %d in playlist " 662 "(contains %d - %d)", 663 mSeqNumber, firstSeqNumberInPlaylist, 664 firstSeqNumberInPlaylist + mPlaylist->size() - 1); 665 666 notifyError(ERROR_END_OF_STREAM); 667 return; 668 } 669 } 670 671 mNumRetries = 0; 672 673 AString uri; 674 sp<AMessage> itemMeta; 675 CHECK(mPlaylist->itemAt( 676 mSeqNumber - firstSeqNumberInPlaylist, 677 &uri, 678 &itemMeta)); 679 680 int32_t val; 681 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 682 explicitDiscontinuity = true; 683 } 684 685 int64_t range_offset, range_length; 686 if (!itemMeta->findInt64("range-offset", &range_offset) 687 || !itemMeta->findInt64("range-length", &range_length)) { 688 range_offset = 0; 689 range_length = -1; 690 } 691 692 ALOGV("fetching segment %d from (%d .. %d)", 693 mSeqNumber, firstSeqNumberInPlaylist, lastSeqNumberInPlaylist); 694 695 ALOGV("fetching '%s'", uri.c_str()); 696 697 sp<ABuffer> buffer; 698 status_t err = mSession->fetchFile( 699 uri.c_str(), &buffer, range_offset, range_length); 700 701 if (err != OK) { 702 ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 703 notifyError(err); 704 return; 705 } 706 707 CHECK(buffer != NULL); 708 709 err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer); 710 711 if (err != OK) { 712 ALOGE("decryptBuffer failed w/ error %d", err); 713 714 notifyError(err); 715 return; 716 } 717 718 if (mStartup || seekDiscontinuity || explicitDiscontinuity) { 719 // Signal discontinuity. 
720 721 if (mPlaylist->isComplete() || mPlaylist->isEvent()) { 722 // If this was a live event this made no sense since 723 // we don't have access to all the segment before the current 724 // one. 725 mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber); 726 } 727 728 if (seekDiscontinuity || explicitDiscontinuity) { 729 ALOGI("queueing discontinuity (seek=%d, explicit=%d)", 730 seekDiscontinuity, explicitDiscontinuity); 731 732 queueDiscontinuity( 733 explicitDiscontinuity 734 ? ATSParser::DISCONTINUITY_FORMATCHANGE 735 : ATSParser::DISCONTINUITY_SEEK, 736 NULL /* extra */); 737 } 738 } 739 740 err = extractAndQueueAccessUnits(buffer, itemMeta); 741 742 if (err != OK) { 743 notifyError(err); 744 return; 745 } 746 747 ++mSeqNumber; 748 749 postMonitorQueue(); 750 751 mStartup = false; 752} 753 754int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const { 755 int32_t firstSeqNumberInPlaylist; 756 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 757 "media-sequence", &firstSeqNumberInPlaylist)) { 758 firstSeqNumberInPlaylist = 0; 759 } 760 761 size_t index = 0; 762 int64_t segmentStartUs = 0; 763 while (index < mPlaylist->size()) { 764 sp<AMessage> itemMeta; 765 CHECK(mPlaylist->itemAt( 766 index, NULL /* uri */, &itemMeta)); 767 768 int64_t itemDurationUs; 769 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 770 771 if (timeUs < segmentStartUs + itemDurationUs) { 772 break; 773 } 774 775 segmentStartUs += itemDurationUs; 776 ++index; 777 } 778 779 if (index >= mPlaylist->size()) { 780 index = mPlaylist->size() - 1; 781 } 782 783 return firstSeqNumberInPlaylist + index; 784} 785 786status_t PlaylistFetcher::extractAndQueueAccessUnits( 787 const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) { 788 if (buffer->size() > 0 && buffer->data()[0] == 0x47) { 789 // Let's assume this is an MPEG2 transport stream. 
790 791 if ((buffer->size() % 188) != 0) { 792 ALOGE("MPEG2 transport stream is not an even multiple of 188 " 793 "bytes in length."); 794 return ERROR_MALFORMED; 795 } 796 797 if (mTSParser == NULL) { 798 mTSParser = new ATSParser; 799 } 800 801 if (mNextPTSTimeUs >= 0ll) { 802 sp<AMessage> extra = new AMessage; 803 extra->setInt64(IStreamListener::kKeyMediaTimeUs, mNextPTSTimeUs); 804 805 mTSParser->signalDiscontinuity( 806 ATSParser::DISCONTINUITY_SEEK, extra); 807 808 mNextPTSTimeUs = -1ll; 809 } 810 811 size_t offset = 0; 812 while (offset < buffer->size()) { 813 status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188); 814 815 if (err != OK) { 816 return err; 817 } 818 819 offset += 188; 820 } 821 822 for (size_t i = mPacketSources.size(); i-- > 0;) { 823 sp<AnotherPacketSource> packetSource = mPacketSources.valueAt(i); 824 825 ATSParser::SourceType type; 826 switch (mPacketSources.keyAt(i)) { 827 case LiveSession::STREAMTYPE_VIDEO: 828 type = ATSParser::VIDEO; 829 break; 830 831 case LiveSession::STREAMTYPE_AUDIO: 832 type = ATSParser::AUDIO; 833 break; 834 835 case LiveSession::STREAMTYPE_SUBTITLES: 836 { 837 ALOGE("MPEG2 Transport streams do not contain subtitles."); 838 return ERROR_MALFORMED; 839 break; 840 } 841 842 default: 843 TRESPASS(); 844 } 845 846 sp<AnotherPacketSource> source = 847 static_cast<AnotherPacketSource *>( 848 mTSParser->getSource(type).get()); 849 850 if (source == NULL) { 851 ALOGW("MPEG2 Transport stream does not contain %s data.", 852 type == ATSParser::VIDEO ? "video" : "audio"); 853 854 mStreamTypeMask &= ~mPacketSources.keyAt(i); 855 mPacketSources.removeItemsAt(i); 856 continue; 857 } 858 859 sp<ABuffer> accessUnit; 860 status_t finalResult; 861 while (source->hasBufferAvailable(&finalResult) 862 && source->dequeueAccessUnit(&accessUnit) == OK) { 863 // Note that we do NOT dequeue any discontinuities. 
864 865 // for simplicity, store a reference to the format in each unit 866 sp<MetaData> format = source->getFormat(); 867 if (format != NULL) { 868 accessUnit->meta()->setObject("format", format); 869 } 870 packetSource->queueAccessUnit(accessUnit); 871 } 872 } 873 874 return OK; 875 } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) { 876 if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) { 877 ALOGE("This stream only contains subtitles."); 878 return ERROR_MALFORMED; 879 } 880 881 const sp<AnotherPacketSource> packetSource = 882 mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES); 883 884 int64_t durationUs; 885 CHECK(itemMeta->findInt64("durationUs", &durationUs)); 886 buffer->meta()->setInt64("timeUs", getSegmentStartTimeUs(mSeqNumber)); 887 buffer->meta()->setInt64("durationUs", durationUs); 888 889 packetSource->queueAccessUnit(buffer); 890 return OK; 891 } 892 893 if (mNextPTSTimeUs >= 0ll) { 894 mFirstPTSValid = false; 895 mAbsoluteTimeAnchorUs = mNextPTSTimeUs; 896 mNextPTSTimeUs = -1ll; 897 } 898 899 // This better be an ISO 13818-7 (AAC) or ISO 13818-1 (MPEG) audio 900 // stream prefixed by an ID3 tag. 901 902 bool firstID3Tag = true; 903 uint64_t PTS = 0; 904 905 for (;;) { 906 // Make sure to skip all ID3 tags preceding the audio data. 907 // At least one must be present to provide the PTS timestamp. 
908 909 ID3 id3(buffer->data(), buffer->size(), true /* ignoreV1 */); 910 if (!id3.isValid()) { 911 if (firstID3Tag) { 912 ALOGE("Unable to parse ID3 tag."); 913 return ERROR_MALFORMED; 914 } else { 915 break; 916 } 917 } 918 919 if (firstID3Tag) { 920 bool found = false; 921 922 ID3::Iterator it(id3, "PRIV"); 923 while (!it.done()) { 924 size_t length; 925 const uint8_t *data = it.getData(&length); 926 927 static const char *kMatchName = 928 "com.apple.streaming.transportStreamTimestamp"; 929 static const size_t kMatchNameLen = strlen(kMatchName); 930 931 if (length == kMatchNameLen + 1 + 8 932 && !strncmp((const char *)data, kMatchName, kMatchNameLen)) { 933 found = true; 934 PTS = U64_AT(&data[kMatchNameLen + 1]); 935 } 936 937 it.next(); 938 } 939 940 if (!found) { 941 ALOGE("Unable to extract transportStreamTimestamp from ID3 tag."); 942 return ERROR_MALFORMED; 943 } 944 } 945 946 // skip the ID3 tag 947 buffer->setRange( 948 buffer->offset() + id3.rawSize(), buffer->size() - id3.rawSize()); 949 950 firstID3Tag = false; 951 } 952 953 if (!mFirstPTSValid) { 954 mFirstPTSValid = true; 955 mFirstPTS = PTS; 956 } 957 PTS -= mFirstPTS; 958 959 int64_t timeUs = (PTS * 100ll) / 9ll + mAbsoluteTimeAnchorUs; 960 961 if (mStreamTypeMask != LiveSession::STREAMTYPE_AUDIO) { 962 ALOGW("This stream only contains audio data!"); 963 964 mStreamTypeMask &= LiveSession::STREAMTYPE_AUDIO; 965 966 if (mStreamTypeMask == 0) { 967 return OK; 968 } 969 } 970 971 sp<AnotherPacketSource> packetSource = 972 mPacketSources.valueFor(LiveSession::STREAMTYPE_AUDIO); 973 974 if (packetSource->getFormat() == NULL && buffer->size() >= 7) { 975 ABitReader bits(buffer->data(), buffer->size()); 976 977 // adts_fixed_header 978 979 CHECK_EQ(bits.getBits(12), 0xfffu); 980 bits.skipBits(3); // ID, layer 981 bool protection_absent = bits.getBits(1) != 0; 982 983 unsigned profile = bits.getBits(2); 984 CHECK_NE(profile, 3u); 985 unsigned sampling_freq_index = bits.getBits(4); 986 bits.getBits(1); // 
private_bit 987 unsigned channel_configuration = bits.getBits(3); 988 CHECK_NE(channel_configuration, 0u); 989 bits.skipBits(2); // original_copy, home 990 991 sp<MetaData> meta = MakeAACCodecSpecificData( 992 profile, sampling_freq_index, channel_configuration); 993 994 meta->setInt32(kKeyIsADTS, true); 995 996 packetSource->setFormat(meta); 997 } 998 999 int64_t numSamples = 0ll; 1000 int32_t sampleRate; 1001 CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate)); 1002 1003 size_t offset = 0; 1004 while (offset < buffer->size()) { 1005 const uint8_t *adtsHeader = buffer->data() + offset; 1006 CHECK_LT(offset + 5, buffer->size()); 1007 1008 unsigned aac_frame_length = 1009 ((adtsHeader[3] & 3) << 11) 1010 | (adtsHeader[4] << 3) 1011 | (adtsHeader[5] >> 5); 1012 1013 CHECK_LE(offset + aac_frame_length, buffer->size()); 1014 1015 sp<ABuffer> unit = new ABuffer(aac_frame_length); 1016 memcpy(unit->data(), adtsHeader, aac_frame_length); 1017 1018 int64_t unitTimeUs = timeUs + numSamples * 1000000ll / sampleRate; 1019 unit->meta()->setInt64("timeUs", unitTimeUs); 1020 1021 // Each AAC frame encodes 1024 samples. 1022 numSamples += 1024; 1023 1024 packetSource->queueAccessUnit(unit); 1025 1026 offset += aac_frame_length; 1027 } 1028 1029 return OK; 1030} 1031 1032void PlaylistFetcher::updateDuration() { 1033 int64_t durationUs = 0ll; 1034 for (size_t index = 0; index < mPlaylist->size(); ++index) { 1035 sp<AMessage> itemMeta; 1036 CHECK(mPlaylist->itemAt( 1037 index, NULL /* uri */, &itemMeta)); 1038 1039 int64_t itemDurationUs; 1040 CHECK(itemMeta->findInt64("durationUs", &itemDurationUs)); 1041 1042 durationUs += itemDurationUs; 1043 } 1044 1045 sp<AMessage> msg = mNotify->dup(); 1046 msg->setInt32("what", kWhatDurationUpdate); 1047 msg->setInt64("durationUs", durationUs); 1048 msg->post(); 1049} 1050 1051} // namespace android 1052