// LiveSession.cpp revision df42f949c8bd05b81d94633767514fff88f52062
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "LiveSession" 19#include <utils/Log.h> 20 21#include "include/LiveSession.h" 22 23#include "LiveDataSource.h" 24 25#include "include/M3UParser.h" 26#include "include/NuHTTPDataSource.h" 27 28#include <cutils/properties.h> 29#include <media/stagefright/foundation/hexdump.h> 30#include <media/stagefright/foundation/ABuffer.h> 31#include <media/stagefright/foundation/ADebug.h> 32#include <media/stagefright/foundation/AMessage.h> 33#include <media/stagefright/DataSource.h> 34#include <media/stagefright/FileSource.h> 35#include <media/stagefright/MediaErrors.h> 36 37#include <ctype.h> 38#include <openssl/aes.h> 39 40namespace android { 41 42const int64_t LiveSession::kMaxPlaylistAgeUs = 15000000ll; 43 44LiveSession::LiveSession() 45 : mDataSource(new LiveDataSource), 46 mHTTPDataSource(new NuHTTPDataSource), 47 mPrevBandwidthIndex(-1), 48 mLastPlaylistFetchTimeUs(-1), 49 mSeqNumber(-1), 50 mSeekTimeUs(-1), 51 mNumRetries(0), 52 mDurationUs(-1), 53 mSeekDone(false), 54 mMonitorQueueGeneration(0) { 55} 56 57LiveSession::~LiveSession() { 58} 59 60sp<DataSource> LiveSession::getDataSource() { 61 return mDataSource; 62} 63 64void LiveSession::connect(const char *url) { 65 sp<AMessage> msg = new AMessage(kWhatConnect, id()); 66 msg->setString("url", url); 67 msg->post(); 68} 69 70void 
LiveSession::disconnect() { 71 (new AMessage(kWhatDisconnect, id()))->post(); 72} 73 74void LiveSession::seekTo(int64_t timeUs) { 75 Mutex::Autolock autoLock(mLock); 76 mSeekDone = false; 77 78 sp<AMessage> msg = new AMessage(kWhatSeek, id()); 79 msg->setInt64("timeUs", timeUs); 80 msg->post(); 81 82 while (!mSeekDone) { 83 mCondition.wait(mLock); 84 } 85} 86 87void LiveSession::onMessageReceived(const sp<AMessage> &msg) { 88 switch (msg->what()) { 89 case kWhatConnect: 90 onConnect(msg); 91 break; 92 93 case kWhatDisconnect: 94 onDisconnect(); 95 break; 96 97 case kWhatMonitorQueue: 98 { 99 int32_t generation; 100 CHECK(msg->findInt32("generation", &generation)); 101 102 if (generation != mMonitorQueueGeneration) { 103 // Stale event 104 break; 105 } 106 107 onMonitorQueue(); 108 break; 109 } 110 111 case kWhatSeek: 112 onSeek(msg); 113 break; 114 115 default: 116 TRESPASS(); 117 break; 118 } 119} 120 121// static 122int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) { 123 if (a->mBandwidth < b->mBandwidth) { 124 return -1; 125 } else if (a->mBandwidth == b->mBandwidth) { 126 return 0; 127 } 128 129 return 1; 130} 131 132void LiveSession::onConnect(const sp<AMessage> &msg) { 133 AString url; 134 CHECK(msg->findString("url", &url)); 135 136 LOGI("onConnect '%s'", url.c_str()); 137 138 mMasterURL = url; 139 140 sp<M3UParser> playlist = fetchPlaylist(url.c_str()); 141 CHECK(playlist != NULL); 142 143 if (playlist->isVariantPlaylist()) { 144 for (size_t i = 0; i < playlist->size(); ++i) { 145 BandwidthItem item; 146 147 sp<AMessage> meta; 148 playlist->itemAt(i, &item.mURI, &meta); 149 150 unsigned long bandwidth; 151 CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth)); 152 153 mBandwidthItems.push(item); 154 } 155 156 CHECK_GT(mBandwidthItems.size(), 0u); 157 158 mBandwidthItems.sort(SortByBandwidth); 159 160 if (mBandwidthItems.size() > 1) { 161 // XXX Remove the lowest bitrate stream for now... 
162 mBandwidthItems.removeAt(0); 163 } 164 } 165 166 postMonitorQueue(); 167} 168 169void LiveSession::onDisconnect() { 170 LOGI("onDisconnect"); 171 172 mDataSource->queueEOS(ERROR_END_OF_STREAM); 173} 174 175status_t LiveSession::fetchFile(const char *url, sp<ABuffer> *out) { 176 *out = NULL; 177 178 sp<DataSource> source; 179 180 if (!strncasecmp(url, "file://", 7)) { 181 source = new FileSource(url + 7); 182 } else if (strncasecmp(url, "http://", 7)) { 183 return ERROR_UNSUPPORTED; 184 } else { 185 status_t err = mHTTPDataSource->connect(url); 186 187 if (err != OK) { 188 return err; 189 } 190 191 source = mHTTPDataSource; 192 } 193 194 off64_t size; 195 status_t err = source->getSize(&size); 196 197 if (err != OK) { 198 size = 65536; 199 } 200 201 sp<ABuffer> buffer = new ABuffer(size); 202 buffer->setRange(0, 0); 203 204 for (;;) { 205 size_t bufferRemaining = buffer->capacity() - buffer->size(); 206 207 if (bufferRemaining == 0) { 208 bufferRemaining = 32768; 209 210 LOGV("increasing download buffer to %d bytes", 211 buffer->size() + bufferRemaining); 212 213 sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining); 214 memcpy(copy->data(), buffer->data(), buffer->size()); 215 copy->setRange(0, buffer->size()); 216 217 buffer = copy; 218 } 219 220 ssize_t n = source->readAt( 221 buffer->size(), buffer->data() + buffer->size(), 222 bufferRemaining); 223 224 if (n < 0) { 225 return err; 226 } 227 228 if (n == 0) { 229 break; 230 } 231 232 buffer->setRange(0, buffer->size() + (size_t)n); 233 } 234 235 *out = buffer; 236 237 return OK; 238} 239 240sp<M3UParser> LiveSession::fetchPlaylist(const char *url) { 241 sp<ABuffer> buffer; 242 status_t err = fetchFile(url, &buffer); 243 244 if (err != OK) { 245 return NULL; 246 } 247 248 sp<M3UParser> playlist = 249 new M3UParser(url, buffer->data(), buffer->size()); 250 251 if (playlist->initCheck() != OK) { 252 return NULL; 253 } 254 255 return playlist; 256} 257 258static double uniformRand() { 259 return 
(double)rand() / RAND_MAX; 260} 261 262size_t LiveSession::getBandwidthIndex() { 263 if (mBandwidthItems.size() == 0) { 264 return 0; 265 } 266 267#if 1 268 int32_t bandwidthBps; 269 if (mHTTPDataSource != NULL 270 && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) { 271 LOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f); 272 } else { 273 LOGV("no bandwidth estimate."); 274 return 0; // Pick the lowest bandwidth stream by default. 275 } 276 277 char value[PROPERTY_VALUE_MAX]; 278 if (property_get("media.httplive.max-bw", value, NULL)) { 279 char *end; 280 long maxBw = strtoul(value, &end, 10); 281 if (end > value && *end == '\0') { 282 if (maxBw > 0 && bandwidthBps > maxBw) { 283 LOGV("bandwidth capped to %ld bps", maxBw); 284 bandwidthBps = maxBw; 285 } 286 } 287 } 288 289 // Consider only 80% of the available bandwidth usable. 290 bandwidthBps = (bandwidthBps * 8) / 10; 291 292 // Pick the highest bandwidth stream below or equal to estimated bandwidth. 293 294 size_t index = mBandwidthItems.size() - 1; 295 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth 296 > (size_t)bandwidthBps) { 297 --index; 298 } 299#elif 0 300 // Change bandwidth at random() 301 size_t index = uniformRand() * mBandwidthItems.size(); 302#elif 0 303 // There's a 50% chance to stay on the current bandwidth and 304 // a 50% chance to switch to the next higher bandwidth (wrapping around 305 // to lowest) 306 const size_t kMinIndex = 0; 307 308 size_t index; 309 if (mPrevBandwidthIndex < 0) { 310 index = kMinIndex; 311 } else if (uniformRand() < 0.5) { 312 index = (size_t)mPrevBandwidthIndex; 313 } else { 314 index = mPrevBandwidthIndex + 1; 315 if (index == mBandwidthItems.size()) { 316 index = kMinIndex; 317 } 318 } 319#elif 0 320 // Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec 321 322 size_t index = mBandwidthItems.size() - 1; 323 while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) { 324 --index; 325 } 326#else 327 size_t 
index = mBandwidthItems.size() - 1; // Highest bandwidth stream 328#endif 329 330 return index; 331} 332 333void LiveSession::onDownloadNext() { 334 size_t bandwidthIndex = getBandwidthIndex(); 335 336 int64_t nowUs = ALooper::GetNowUs(); 337 338 if (mLastPlaylistFetchTimeUs < 0 339 || (ssize_t)bandwidthIndex != mPrevBandwidthIndex 340 || (!mPlaylist->isComplete() 341 && mLastPlaylistFetchTimeUs + kMaxPlaylistAgeUs <= nowUs)) { 342 AString url; 343 if (mBandwidthItems.size() > 0) { 344 url = mBandwidthItems.editItemAt(bandwidthIndex).mURI; 345 } else { 346 url = mMasterURL; 347 } 348 349 bool firstTime = (mPlaylist == NULL); 350 351 mPlaylist = fetchPlaylist(url.c_str()); 352 if (mPlaylist == NULL) { 353 LOGE("failed to load playlist at url '%s'", url.c_str()); 354 mDataSource->queueEOS(ERROR_IO); 355 return; 356 } 357 358 if (firstTime) { 359 Mutex::Autolock autoLock(mLock); 360 361 int32_t targetDuration; 362 if (!mPlaylist->isComplete() 363 || !mPlaylist->meta()->findInt32( 364 "target-duration", &targetDuration)) { 365 mDurationUs = -1; 366 } else { 367 mDurationUs = 1000000ll * targetDuration * mPlaylist->size(); 368 } 369 } 370 371 mLastPlaylistFetchTimeUs = ALooper::GetNowUs(); 372 } 373 374 int32_t firstSeqNumberInPlaylist; 375 if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32( 376 "media-sequence", &firstSeqNumberInPlaylist)) { 377 firstSeqNumberInPlaylist = 0; 378 } 379 380 bool explicitDiscontinuity = false; 381 382 if (mSeekTimeUs >= 0) { 383 int32_t targetDuration; 384 if (mPlaylist->isComplete() && 385 mPlaylist->meta()->findInt32( 386 "target-duration", &targetDuration)) { 387 int64_t seekTimeSecs = (mSeekTimeUs + 500000ll) / 1000000ll; 388 int64_t index = seekTimeSecs / targetDuration; 389 390 if (index >= 0 && index < mPlaylist->size()) { 391 mSeqNumber = firstSeqNumberInPlaylist + index; 392 mDataSource->reset(); 393 394 explicitDiscontinuity = true; 395 } 396 } 397 398 mSeekTimeUs = -1; 399 400 Mutex::Autolock autoLock(mLock); 401 
mSeekDone = true; 402 mCondition.broadcast(); 403 } 404 405 if (mSeqNumber < 0) { 406 if (mPlaylist->isComplete()) { 407 mSeqNumber = firstSeqNumberInPlaylist; 408 } else { 409 mSeqNumber = firstSeqNumberInPlaylist + mPlaylist->size() / 2; 410 } 411 } 412 413 int32_t lastSeqNumberInPlaylist = 414 firstSeqNumberInPlaylist + (int32_t)mPlaylist->size() - 1; 415 416 if (mSeqNumber < firstSeqNumberInPlaylist 417 || mSeqNumber > lastSeqNumberInPlaylist) { 418 if (!mPlaylist->isComplete() 419 && mSeqNumber > lastSeqNumberInPlaylist 420 && mNumRetries < kMaxNumRetries) { 421 ++mNumRetries; 422 423 mLastPlaylistFetchTimeUs = -1; 424 postMonitorQueue(1000000ll); 425 return; 426 } 427 428 LOGE("Cannot find sequence number %d in playlist " 429 "(contains %d - %d)", 430 mSeqNumber, firstSeqNumberInPlaylist, 431 firstSeqNumberInPlaylist + mPlaylist->size() - 1); 432 433 mDataSource->queueEOS(ERROR_END_OF_STREAM); 434 return; 435 } 436 437 mNumRetries = 0; 438 439 AString uri; 440 sp<AMessage> itemMeta; 441 CHECK(mPlaylist->itemAt( 442 mSeqNumber - firstSeqNumberInPlaylist, 443 &uri, 444 &itemMeta)); 445 446 int32_t val; 447 if (itemMeta->findInt32("discontinuity", &val) && val != 0) { 448 explicitDiscontinuity = true; 449 } 450 451 sp<ABuffer> buffer; 452 status_t err = fetchFile(uri.c_str(), &buffer); 453 if (err != OK) { 454 LOGE("failed to fetch .ts segment at url '%s'", uri.c_str()); 455 mDataSource->queueEOS(err); 456 return; 457 } 458 459 CHECK_EQ((status_t)OK, 460 decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer)); 461 462 if (buffer->size() == 0 || buffer->data()[0] != 0x47) { 463 // Not a transport stream??? 464 465 LOGE("This doesn't look like a transport stream..."); 466 467 mDataSource->queueEOS(ERROR_UNSUPPORTED); 468 return; 469 } 470 471 bool bandwidthChanged = 472 mPrevBandwidthIndex >= 0 473 && (size_t)mPrevBandwidthIndex != bandwidthIndex; 474 475 if (explicitDiscontinuity || bandwidthChanged) { 476 // Signal discontinuity. 
477 478 sp<ABuffer> tmp = new ABuffer(188); 479 memset(tmp->data(), 0, tmp->size()); 480 tmp->data()[1] = bandwidthChanged; 481 482 mDataSource->queueBuffer(tmp); 483 } 484 485 mDataSource->queueBuffer(buffer); 486 487 mPrevBandwidthIndex = bandwidthIndex; 488 ++mSeqNumber; 489 490 postMonitorQueue(); 491} 492 493void LiveSession::onMonitorQueue() { 494 if (mSeekTimeUs >= 0 495 || mDataSource->countQueuedBuffers() < kMaxNumQueuedFragments) { 496 onDownloadNext(); 497 } else { 498 postMonitorQueue(1000000ll); 499 } 500} 501 502status_t LiveSession::decryptBuffer( 503 size_t playlistIndex, const sp<ABuffer> &buffer) { 504 sp<AMessage> itemMeta; 505 bool found = false; 506 AString method; 507 508 for (ssize_t i = playlistIndex; i >= 0; --i) { 509 AString uri; 510 CHECK(mPlaylist->itemAt(i, &uri, &itemMeta)); 511 512 if (itemMeta->findString("cipher-method", &method)) { 513 found = true; 514 break; 515 } 516 } 517 518 if (!found) { 519 method = "NONE"; 520 } 521 522 if (method == "NONE") { 523 return OK; 524 } else if (!(method == "AES-128")) { 525 LOGE("Unsupported cipher method '%s'", method.c_str()); 526 return ERROR_UNSUPPORTED; 527 } 528 529 AString keyURI; 530 if (!itemMeta->findString("cipher-uri", &keyURI)) { 531 LOGE("Missing key uri"); 532 return ERROR_MALFORMED; 533 } 534 535 ssize_t index = mAESKeyForURI.indexOfKey(keyURI); 536 537 sp<ABuffer> key; 538 if (index >= 0) { 539 key = mAESKeyForURI.valueAt(index); 540 } else { 541 key = new ABuffer(16); 542 543 sp<NuHTTPDataSource> keySource = new NuHTTPDataSource; 544 status_t err = keySource->connect(keyURI.c_str()); 545 546 if (err == OK) { 547 size_t offset = 0; 548 while (offset < 16) { 549 ssize_t n = keySource->readAt( 550 offset, key->data() + offset, 16 - offset); 551 if (n <= 0) { 552 err = ERROR_IO; 553 break; 554 } 555 556 offset += n; 557 } 558 } 559 560 if (err != OK) { 561 LOGE("failed to fetch cipher key from '%s'.", keyURI.c_str()); 562 return ERROR_IO; 563 } 564 565 mAESKeyForURI.add(keyURI, 
key); 566 } 567 568 AES_KEY aes_key; 569 if (AES_set_decrypt_key(key->data(), 128, &aes_key) != 0) { 570 LOGE("failed to set AES decryption key."); 571 return UNKNOWN_ERROR; 572 } 573 574 unsigned char aes_ivec[16]; 575 576 AString iv; 577 if (itemMeta->findString("cipher-iv", &iv)) { 578 if ((!iv.startsWith("0x") && !iv.startsWith("0X")) 579 || iv.size() != 16 * 2 + 2) { 580 LOGE("malformed cipher IV '%s'.", iv.c_str()); 581 return ERROR_MALFORMED; 582 } 583 584 memset(aes_ivec, 0, sizeof(aes_ivec)); 585 for (size_t i = 0; i < 16; ++i) { 586 char c1 = tolower(iv.c_str()[2 + 2 * i]); 587 char c2 = tolower(iv.c_str()[3 + 2 * i]); 588 if (!isxdigit(c1) || !isxdigit(c2)) { 589 LOGE("malformed cipher IV '%s'.", iv.c_str()); 590 return ERROR_MALFORMED; 591 } 592 uint8_t nibble1 = isdigit(c1) ? c1 - '0' : c1 - 'a' + 10; 593 uint8_t nibble2 = isdigit(c2) ? c2 - '0' : c2 - 'a' + 10; 594 595 aes_ivec[i] = nibble1 << 4 | nibble2; 596 } 597 } else { 598 memset(aes_ivec, 0, sizeof(aes_ivec)); 599 aes_ivec[15] = mSeqNumber & 0xff; 600 aes_ivec[14] = (mSeqNumber >> 8) & 0xff; 601 aes_ivec[13] = (mSeqNumber >> 16) & 0xff; 602 aes_ivec[12] = (mSeqNumber >> 24) & 0xff; 603 } 604 605 AES_cbc_encrypt( 606 buffer->data(), buffer->data(), buffer->size(), 607 &aes_key, aes_ivec, AES_DECRYPT); 608 609 // hexdump(buffer->data(), buffer->size()); 610 611 size_t n = buffer->size(); 612 CHECK_GT(n, 0u); 613 614 size_t pad = buffer->data()[n - 1]; 615 616 CHECK_GT(pad, 0u); 617 CHECK_LE(pad, 16u); 618 CHECK_GE((size_t)n, pad); 619 for (size_t i = 0; i < pad; ++i) { 620 CHECK_EQ((unsigned)buffer->data()[n - 1 - i], pad); 621 } 622 623 n -= pad; 624 625 buffer->setRange(buffer->offset(), n); 626 627 return OK; 628} 629 630void LiveSession::postMonitorQueue(int64_t delayUs) { 631 sp<AMessage> msg = new AMessage(kWhatMonitorQueue, id()); 632 msg->setInt32("generation", ++mMonitorQueueGeneration); 633 msg->post(delayUs); 634} 635 636void LiveSession::onSeek(const sp<AMessage> &msg) { 637 int64_t 
timeUs; 638 CHECK(msg->findInt64("timeUs", &timeUs)); 639 640 mSeekTimeUs = timeUs; 641 postMonitorQueue(); 642} 643 644status_t LiveSession::getDuration(int64_t *durationUs) { 645 Mutex::Autolock autoLock(mLock); 646 *durationUs = mDurationUs; 647 648 return OK; 649} 650 651bool LiveSession::isSeekable() { 652 int64_t durationUs; 653 return getDuration(&durationUs) == OK && durationUs >= 0; 654} 655 656} // namespace android 657 658