LiveSession.cpp revision ed8d14f6a934072cd012992c4ef16990a54baa9a
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "include/LiveSession.h" 22 23#include "LiveDataSource.h" 24 25#include "include/M3UParser.h" 26#include "include/NuHTTPDataSource.h" 27 28#include <cutils/properties.h> 29#include <media/stagefright/foundation/hexdump.h> 30#include <media/stagefright/foundation/ABuffer.h> 31#include <media/stagefright/foundation/ADebug.h> 32#include <media/stagefright/foundation/AMessage.h> 33#include <media/stagefright/DataSource.h> 34#include <media/stagefright/FileSource.h> 35#include <media/stagefright/MediaErrors.h> 36 37#include <ctype.h> 38#include <openssl/aes.h> 39 40namespace android { 41 42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll; 43 44LiveSession::LiveSession() 45 : mDataSource(new LiveDataSource), 46 mHTTPDataSource(new NuHTTPDataSource), 47 mPrevBandwidthIndex(-1), 48 mLastPlaylistFetchTimeUs(-1), 49 mSeqNumber(-1), 50 mSeekTimeUs(-1), 51 mNumRetries(0), 52 mDurationUs(-1), 53 mSeekDone(false), 54 mDisconnectPending(false), 55 mMonitorQueueGeneration(0) { 56} 57 58LiveSession::~LiveSession() { 59} 60 61sp<DataSource> LiveSession::getDataSource() { 62 return mDataSource; 63} 64 65void LiveSession::connect(const char *url) { 66 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 67 msg->setString("url", url); 68 msg->post(); 69} 70 71void LiveSession::disconnect() { 72 Mutex::Autolock autoLock(mLock); 73 mDisconnectPending = true; 74 75 mHTTPDataSource->disconnect(); 76 77 (new AMessage(kWhatDisconnect, id()))->post(); 78} 79 80void LiveSession::seekTo(int64_t timeUs) { 81 Mutex::Autolock autoLock(mLock); 82 mSeekDone = false; 83 84 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 85 msg->setInt64("timeUs", timeUs); 86 msg->post(); 87 88 while (!mSeekDone) { 89 mCondition.wait(mLock); 90 } 91} 92 93void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 94 switch (msg->what()) { 95 case kWhatConnect: 96 onConnect(msg); 97 break; 98 99 case kWhatDisconnect: 100 onDisconnect(); 101 break; 102 103 case kWhatMonitorQueue: 104 { 105 int32_t generation; 106 CHECK(msg->findInt32("generation", &generation)); 107 108 if (generation != mMonitorQueueGeneration) { 109 // Stale event 110 break; 111 } 112 113 onMonitorQueue(); 114 break; 115 } 116 117 case kWhatSeek: 118 onSeek(msg); 119 break; 120 121 default: 122 TRESPASS(); 123 break; 124 } 125} 126 127// static 128int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 129 if (a->mBandwidth < b->mBandwidth) { 130 return -1; 131 } else if (a->mBandwidth == b->mBandwidth) { 132 return 0; 133 } 134 135 return 1; 136} 137 138void LiveSession::onConnect(const sp<AMessage> &msg) { 139 AString url; 140 CHECK(msg->findString("url", &url)); 141 142 LOGI("onConnect '%s'", url.c_str()); 143 144 mMasterURL = url; 145 146 sp<M3UParser> playlist = fetchPlaylist(url.c_str()); 147 148 if (playlist == NULL) { 149 LOGE("unable to fetch master playlist '%s'.", url.c_str()); 150 151 mDataSource->queueEOS(ERROR_IO); 152 return; 153 } 154 155 if (playlist->isVariantPlaylist()) { 156 for (size_t i = 0; i < playlist->size(); ++i) { 157 BandwidthItem item; 158 159 sp<AMessage> meta; 160 playlist->itemAt(i, &item.mURI, &meta); 161 162 unsigned long bandwidth; 163 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 164 165 mBandwidthItems.push(item); 166 } 167 168 CHECK_GT(mBandwidthItems.size(), 0u); 169 170 mBandwidthItems.sort(SortByBandwidth); 171 } 172 173 postMonitorQueue(); 174} 175 176void LiveSession::onDisconnect() { 177 LOGI("onDisconnect"); 178 179 mDataSource->queueEOS(ERROR_END_OF_STREAM); 180 181 Mutex::Autolock autoLock(mLock); 182 mDisconnectPending = false; 183} 184 185status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) { 186 *out = NULL; 187 188 sp<DataSource> source; 189 190 if (!strncasecmp(url, "file://", 7)) { 191 source = new FileSource(url + 7); 192 } else if (strncasecmp(url, "http://", 7)) { 193 return ERROR_UNSUPPORTED; 194 } else { 195 { 196 Mutex::Autolock autoLock(mLock); 197 198 if (mDisconnectPending) { 199 return ERROR_IO; 200 } 201 } 202 203 status_t err = mHTTPDataSource->connect(url); 204 205 if (err != OK) { 206 return err; 207 } 208 209 source = mHTTPDataSource; 210 } 211 212 off64_t size; 213 status_t err = source->getSize(&size); 214 215 if (err != OK) { 216 size = 65536; 217 } 218 219 sp<ABuffer> buffer = new ABuffer(size); 220 buffer->setRange(0, 0); 221 222 for (;;) { 223 size_t bufferRemaining = buffer->capacity() - buffer->size(); 224 225 if (bufferRemaining == 0) { 226 bufferRemaining = 32768; 227 228 LOGV("increasing download buffer to %d bytes", 229 buffer->size() + bufferRemaining); 230 231 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 232 memcpy(copy->data(), buffer->data(), buffer->size()); 233 copy->setRange(0, buffer->size()); 234 235 buffer = copy; 236 } 237 238 ssize_t n = source->readAt( 239 buffer->size(), buffer->data() + buffer->size(), 240 bufferRemaining); 241 242 if (n < 0) { 243 return n; 244 } 245 246 if (n == 0) { 247 break; 248 } 249 250 buffer->setRange(0, buffer->size() + (size_t)n); 251 } 252 253 *out = buffer; 254 255 return OK; 256} 257 258sp<M3UParser> LiveSession::fetchPlaylist(const char *url) { 259 sp<ABuffer> buffer; 260 status_t err = fetchFile(url, &buffer); 261 262 if (err != OK) { 263 return NULL; 264 } 265 266 sp<M3UParser> playlist = 267 new M3UParser(url, buffer->data(), buffer->size()); 268 269 if (playlist->initCheck() != OK) { 270 return NULL; 271 } 272 273 return playlist; 274} 275 276static double uniformRand() { 277 return (double)rand() / RAND_MAX; 278} 279 280size_t LiveSession::getBandwidthIndex() { 281 if (mBandwidthItems.size() == 0) { 282 return 0; 283 } 284 285#if 1 286 int32_t bandwidthBps; 287 if (mHTTPDataSource != NULL 288 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 289 LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 290 } else { 291 LOGV("no bandwidth estimate."); 292 return 0; // Pick the lowest bandwidth stream by default. 293 } 294 295 char value[PROPERTY_VALUE_MAX]; 296 if (property_get("media.httplive.max-bw", value, NULL)) { 297 char *end; 298 long maxBw = strtoul(value, &end, 10); 299 if (end > value && *end == '\0') { 300 if (maxBw > 0 && bandwidthBps > maxBw) { 301 LOGV("bandwidth capped to %ld bps", maxBw); 302 bandwidthBps = maxBw; 303 } 304 } 305 } 306 307 // Consider only 80% of the available bandwidth usable. 308 bandwidthBps = (bandwidthBps * 8) / 10; 309 310 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 311 312 size_t index = mBandwidthItems.size() - 1; 313 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 314 > (size_t)bandwidthBps) { 315 --index; 316 } 317#elif 0 318 // Change bandwidth at random() 319 size_t index = uniformRand() * mBandwidthItems.size(); 320#elif 0 321 // There's a 50% chance to stay on the current bandwidth and 322 // a 50% chance to switch to the next higher bandwidth (wrapping around 323 // to lowest) 324 const size_t kMinIndex = 0; 325 326 size_t index; 327 if (mPrevBandwidthIndex < 0) { 328 index = kMinIndex; 329 } else if (uniformRand() < 0.5) { 330 index = (size_t)mPrevBandwidthIndex; 331 } else { 332 index = mPrevBandwidthIndex + 1; 333 if (index == mBandwidthItems.size()) { 334 index = kMinIndex; 335 } 336 } 337#elif 0 338 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 339 340 size_t index = mBandwidthItems.size() - 1; 341 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 342 --index; 343 } 344#else 345 size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream 346#endif 347 348 return index; 349} 350 351void LiveSession::onDownloadNext() { 352 size_t bandwidthIndex = getBandwidthIndex(); 353 354rinse_repeat: 355 int64_t nowUs = ALooper::GetNowUs(); 356 357 if (mLastPlaylistFetchTimeUs < 0 358 || (ssize_t)bandwidthIndex != mPrevBandwidthIndex 359 || (!mPlaylist->isComplete() 360 && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) { 361 AString url; 362 if (mBandwidthItems.size() > 0) { 363 url = mBandwidthItems.editItemAt(bandwidthIndex).mURI; 364 } else { 365 url = mMasterURL; 366 } 367 368 bool firstTime = (mPlaylist == NULL); 369 370 mPlaylist = fetchPlaylist(url.c_str()); 371 if (mPlaylist == NULL) { 372 LOGE("failed to load playlist at url '%s'", url.c_str()); 373 mDataSource->queueEOS(ERROR_IO); 374 return; 375 } 376 377 if (firstTime) { 378 Mutex::Autolock autoLock(mLock); 379 380 int32_t targetDuration; 381 if (!mPlaylist->isComplete() 382 || !mPlaylist->meta()->findInt32( 383 "target-duration", &targetDuration)) { 384 mDurationUs = -1; 385 } else { 386 mDurationUs = 1000000ll * targetDuration * mPlaylist->size(); 387 } 388 } 389 390 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 391 } 392 393 int32_t firstSeqNumberInPlaylist; 394 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 395 "media-sequence", &firstSeqNumberInPlaylist)) { 396 firstSeqNumberInPlaylist = 0; 397 } 398 399 bool explicitDiscontinuity = false; 400 bool bandwidthChanged = false; 401 402 if (mSeekTimeUs >= 0) { 403 int32_t targetDuration; 404 if (mPlaylist->isComplete() && 405 mPlaylist->meta()->findInt32( 406 "target-duration", &targetDuration)) { 407 int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll; 408 int64_t index = seekTimeSecs / targetDuration; 409 410 if (index >= 0 && index < mPlaylist->size()) { 411 int32_t newSeqNumber = firstSeqNumberInPlaylist + index; 412 413 if (newSeqNumber != mSeqNumber) { 414 LOGI("seeking to seq no %d", newSeqNumber); 415 416 mSeqNumber = newSeqNumber; 417 418 mDataSource->reset(); 419 420 // reseting the data source will have had the 421 // side effect of discarding any previously queued 422 // bandwidth change discontinuity. 423 // Therefore we'll need to treat these explicit 424 // discontinuities as involving a bandwidth change 425 // even if they aren't directly. 426 explicitDiscontinuity = true; 427 bandwidthChanged = true; 428 } 429 } 430 } 431 432 mSeekTimeUs = -1; 433 434 Mutex::Autolock autoLock(mLock); 435 mSeekDone = true; 436 mCondition.broadcast(); 437 } 438 439 if (mSeqNumber < 0) { 440 if (mPlaylist->isComplete()) { 441 mSeqNumber = firstSeqNumberInPlaylist; 442 } else { 443 mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2; 444 } 445 } 446 447 int32_t lastSeqNumberInPlaylist = 448 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 449 450 if (mSeqNumber < firstSeqNumberInPlaylist 451 || mSeqNumber > lastSeqNumberInPlaylist) { 452 if (mPrevBandwidthIndex != (ssize_t)bandwidthIndex) { 453 // Go back to the previous bandwidth. 454 455 LOGI("new bandwidth does not have the sequence number " 456 "we're looking for, switching back to previous bandwidth"); 457 458 mLastPlaylistFetchTimeUs = -1; 459 bandwidthIndex = mPrevBandwidthIndex; 460 goto rinse_repeat; 461 } 462 463 if (!mPlaylist->isComplete() 464 && mSeqNumber > lastSeqNumberInPlaylist 465 && mNumRetries < kMaxNumRetries) { 466 ++mNumRetries; 467 468 mLastPlaylistFetchTimeUs = -1; 469 postMonitorQueue(3000000ll); 470 return; 471 } 472 473 LOGE("Cannot find sequence number %d in playlist " 474 "(contains %d - %d)", 475 mSeqNumber, firstSeqNumberInPlaylist, 476 firstSeqNumberInPlaylist + mPlaylist->size() - 1); 477 478 mDataSource->queueEOS(ERROR_END_OF_STREAM); 479 return; 480 } 481 482 mNumRetries = 0; 483 484 AString uri; 485 sp<AMessage> itemMeta; 486 CHECK(mPlaylist->itemAt( 487 mSeqNumber - firstSeqNumberInPlaylist, 488 &uri, 489 &itemMeta)); 490 491 int32_t val; 492 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 493 explicitDiscontinuity = true; 494 } 495 496 sp<ABuffer> buffer; 497 status_t err = fetchFile(uri.c_str(), &buffer); 498 if (err != OK) { 499 LOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 500 mDataSource->queueEOS(err); 501 return; 502 } 503 504 CHECK(buffer != NULL); 505 506 err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer); 507 508 if (err != OK) { 509 LOGE("decryptBuffer failed w/ error %d", err); 510 511 mDataSource->queueEOS(err); 512 return; 513 } 514 515 if (buffer->size() == 0 || buffer->data()[0] != 0x47) { 516 // Not a transport stream??? 517 518 LOGE("This doesn't look like a transport stream..."); 519 520 mBandwidthItems.removeAt(bandwidthIndex); 521 522 if (mBandwidthItems.isEmpty()) { 523 mDataSource->queueEOS(ERROR_UNSUPPORTED); 524 return; 525 } 526 527 LOGI("Retrying with a different bandwidth stream."); 528 529 mLastPlaylistFetchTimeUs = -1; 530 bandwidthIndex = getBandwidthIndex(); 531 mPrevBandwidthIndex = bandwidthIndex; 532 mSeqNumber = -1; 533 534 goto rinse_repeat; 535 } 536 537 if ((size_t)mPrevBandwidthIndex != bandwidthIndex) { 538 bandwidthChanged = true; 539 } 540 541 if (mPrevBandwidthIndex < 0) { 542 // Don't signal a bandwidth change at the very beginning of 543 // playback. 544 bandwidthChanged = false; 545 } 546 547 if (explicitDiscontinuity || bandwidthChanged) { 548 // Signal discontinuity. 549 550 LOGI("queueing discontinuity (explicit=%d, bandwidthChanged=%d)", 551 explicitDiscontinuity, bandwidthChanged); 552 553 sp<ABuffer> tmp = new ABuffer(188); 554 memset(tmp->data(), 0, tmp->size()); 555 tmp->data()[1] = bandwidthChanged; 556 557 mDataSource->queueBuffer(tmp); 558 } 559 560 mDataSource->queueBuffer(buffer); 561 562 mPrevBandwidthIndex = bandwidthIndex; 563 ++mSeqNumber; 564 565 postMonitorQueue(); 566} 567 568void LiveSession::onMonitorQueue() { 569 if (mSeekTimeUs >= 0 570 || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) { 571 onDownloadNext(); 572 } else { 573 postMonitorQueue(1000000ll); 574 } 575} 576 577status_t LiveSession::decryptBuffer( 578 size_t playlistIndex, const sp<ABuffer> &buffer) { 579 sp<AMessage> itemMeta; 580 bool found = false; 581 AString method; 582 583 for (ssize_t i = playlistIndex; i >= 0; --i) { 584 AString uri; 585 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 586 587 if (itemMeta->findString("cipher-method", &method)) { 588 found = true; 589 break; 590 } 591 } 592 593 if (!found) { 594 method = "NONE"; 595 } 596 597 if (method == "NONE") { 598 return OK; 599 } else if (!(method == "AES-128")) { 600 LOGE("Unsupported cipher method '%s'", method.c_str()); 601 return ERROR_UNSUPPORTED; 602 } 603 604 AString keyURI; 605 if (!itemMeta->findString("cipher-uri", &keyURI)) { 606 LOGE("Missing key uri"); 607 return ERROR_MALFORMED; 608 } 609 610 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 611 612 sp<ABuffer> key; 613 if (index >= 0) { 614 key = mAESKeyForURI.valueAt(index); 615 } else { 616 key = new ABuffer(16); 617 618 sp<NuHTTPDataSource> keySource = new NuHTTPDataSource; 619 status_t err = keySource->connect(keyURI.c_str()); 620 621 if (err == OK) { 622 size_t offset = 0; 623 while (offset < 16) { 624 ssize_t n = keySource->readAt( 625 offset, key->data() + offset, 16 - offset); 626 if (n <= 0) { 627 err = ERROR_IO; 628 break; 629 } 630 631 offset += n; 632 } 633 } 634 635 if (err != OK) { 636 LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 637 return ERROR_IO; 638 } 639 640 mAESKeyForURI.add(keyURI, key); 641 } 642 643 AES_KEY aes_key; 644 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 645 LOGE("failed to set AES decryption key."); 646 return UNKNOWN_ERROR; 647 } 648 649 unsigned char aes_ivec[16]; 650 651 AString iv; 652 if (itemMeta->findString("cipher-iv", &iv)) { 653 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 654 || iv.size() != 16 * 2 + 2) { 655 LOGE("malformed cipher IV '%s'.", iv.c_str()); 656 return ERROR_MALFORMED; 657 } 658 659 memset(aes_ivec, 0, sizeof(aes_ivec)); 660 for (size_t i = 0; i < 16; ++i) { 661 char c1 = tolower(iv.c_str()[2 + 2 * i]); 662 char c2 = tolower(iv.c_str()[3 + 2 * i]); 663 if (!isxdigit(c1) || !isxdigit(c2)) { 664 LOGE("malformed cipher IV '%s'.", iv.c_str()); 665 return ERROR_MALFORMED; 666 } 667 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 668 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 669 670 aes_ivec[i] = nibble1 << 4 | nibble2; 671 } 672 } else { 673 memset(aes_ivec, 0, sizeof(aes_ivec)); 674 aes_ivec[15] = mSeqNumber & 0xff; 675 aes_ivec[14] = (mSeqNumber >> 8) & 0xff; 676 aes_ivec[13] = (mSeqNumber >> 16) & 0xff; 677 aes_ivec[12] = (mSeqNumber >> 24) & 0xff; 678 } 679 680 AES_cbc_encrypt( 681 buffer->data(), buffer->data(), buffer->size(), 682 &aes_key, aes_ivec, AES_DECRYPT); 683 684 // hexdump(buffer->data(), buffer->size()); 685 686 size_t n = buffer->size(); 687 CHECK_GT(n, 0u); 688 689 size_t pad = buffer->data()[n - 1]; 690 691 CHECK_GT(pad, 0u); 692 CHECK_LE(pad, 16u); 693 CHECK_GE((size_t)n, pad); 694 for (size_t i = 0; i < pad; ++i) { 695 CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad); 696 } 697 698 n -= pad; 699 700 buffer->setRange(buffer->offset(), n); 701 702 return OK; 703} 704 705void LiveSession::postMonitorQueue(int64_t delayUs) { 706 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 707 msg->setInt32("generation", ++mMonitorQueueGeneration); 708 msg->post(delayUs); 709} 710 711void LiveSession::onSeek(const sp<AMessage> &msg) { 712 int64_t timeUs; 713 CHECK(msg->findInt64("timeUs", &timeUs)); 714 715 mSeekTimeUs = timeUs; 716 postMonitorQueue(); 717} 718 719status_t LiveSession::getDuration(int64_t *durationUs) { 720 Mutex::Autolock autoLock(mLock); 721 *durationUs = mDurationUs; 722 723 return OK; 724} 725 726bool LiveSession::isSeekable() { 727 int64_t durationUs; 728 return getDuration(&durationUs) == OK && durationUs >= 0; 729} 730 731} // namespace android 732 733