// AnotherPacketSource.cpp revision 0852843d304006e3ab333081fddda13b07193de8
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "AnotherPacketSource" 19 20#include "AnotherPacketSource.h" 21 22#include "include/avc_utils.h" 23 24#include <media/stagefright/foundation/ABuffer.h> 25#include <media/stagefright/foundation/ADebug.h> 26#include <media/stagefright/foundation/AMessage.h> 27#include <media/stagefright/foundation/AString.h> 28#include <media/stagefright/foundation/hexdump.h> 29#include <media/stagefright/MediaBuffer.h> 30#include <media/stagefright/MediaDefs.h> 31#include <media/stagefright/MetaData.h> 32#include <media/stagefright/Utils.h> 33#include <utils/Vector.h> 34 35#include <inttypes.h> 36 37namespace android { 38 39const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs 40 41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta) 42 : mIsAudio(false), 43 mIsVideo(false), 44 mEnabled(true), 45 mFormat(NULL), 46 mLastQueuedTimeUs(0), 47 mEOSResult(OK), 48 mLatestEnqueuedMeta(NULL), 49 mLatestDequeuedMeta(NULL), 50 mQueuedDiscontinuityCount(0) { 51 setFormat(meta); 52} 53 54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) { 55 if (mFormat != NULL) { 56 // Only allowed to be set once. Requires explicit clear to reset. 
57 return; 58 } 59 60 mIsAudio = false; 61 mIsVideo = false; 62 63 if (meta == NULL) { 64 return; 65 } 66 67 mFormat = meta; 68 const char *mime; 69 CHECK(meta->findCString(kKeyMIMEType, &mime)); 70 71 if (!strncasecmp("audio/", mime, 6)) { 72 mIsAudio = true; 73 } else if (!strncasecmp("video/", mime, 6)) { 74 mIsVideo = true; 75 } else { 76 CHECK(!strncasecmp("text/", mime, 5) || !strncasecmp("application/", mime, 12)); 77 } 78} 79 80AnotherPacketSource::~AnotherPacketSource() { 81} 82 83status_t AnotherPacketSource::start(MetaData * /* params */) { 84 return OK; 85} 86 87status_t AnotherPacketSource::stop() { 88 return OK; 89} 90 91sp<MetaData> AnotherPacketSource::getFormat() { 92 Mutex::Autolock autoLock(mLock); 93 if (mFormat != NULL) { 94 return mFormat; 95 } 96 97 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 98 while (it != mBuffers.end()) { 99 sp<ABuffer> buffer = *it; 100 int32_t discontinuity; 101 if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) { 102 sp<RefBase> object; 103 if (buffer->meta()->findObject("format", &object)) { 104 setFormat(static_cast<MetaData*>(object.get())); 105 return mFormat; 106 } 107 } 108 109 ++it; 110 } 111 return NULL; 112} 113 114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) { 115 buffer->clear(); 116 117 Mutex::Autolock autoLock(mLock); 118 while (mEOSResult == OK && mBuffers.empty()) { 119 mCondition.wait(mLock); 120 } 121 122 if (!mBuffers.empty()) { 123 *buffer = *mBuffers.begin(); 124 mBuffers.erase(mBuffers.begin()); 125 126 int32_t discontinuity; 127 if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) { 128 if (wasFormatChange(discontinuity)) { 129 mFormat.clear(); 130 } 131 132 --mQueuedDiscontinuityCount; 133 return INFO_DISCONTINUITY; 134 } 135 136 mLatestDequeuedMeta = (*buffer)->meta()->dup(); 137 138 sp<RefBase> object; 139 if ((*buffer)->meta()->findObject("format", &object)) { 140 setFormat(static_cast<MetaData*>(object.get())); 141 } 142 143 return 
OK; 144 } 145 146 return mEOSResult; 147} 148 149void AnotherPacketSource::requeueAccessUnit(const sp<ABuffer> &buffer) { 150 // TODO: update corresponding book keeping info. 151 Mutex::Autolock autoLock(mLock); 152 mBuffers.push_front(buffer); 153} 154 155status_t AnotherPacketSource::read( 156 MediaBuffer **out, const ReadOptions *) { 157 *out = NULL; 158 159 Mutex::Autolock autoLock(mLock); 160 while (mEOSResult == OK && mBuffers.empty()) { 161 mCondition.wait(mLock); 162 } 163 164 if (!mBuffers.empty()) { 165 166 const sp<ABuffer> buffer = *mBuffers.begin(); 167 mBuffers.erase(mBuffers.begin()); 168 169 int32_t discontinuity; 170 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 171 if (wasFormatChange(discontinuity)) { 172 mFormat.clear(); 173 } 174 175 return INFO_DISCONTINUITY; 176 } 177 178 mLatestDequeuedMeta = buffer->meta()->dup(); 179 180 sp<RefBase> object; 181 if (buffer->meta()->findObject("format", &object)) { 182 setFormat(static_cast<MetaData*>(object.get())); 183 } 184 185 int64_t timeUs; 186 CHECK(buffer->meta()->findInt64("timeUs", &timeUs)); 187 188 MediaBuffer *mediaBuffer = new MediaBuffer(buffer); 189 190 mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs); 191 192 int32_t isSync; 193 if (buffer->meta()->findInt32("isSync", &isSync)) { 194 mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync); 195 } 196 197 *out = mediaBuffer; 198 return OK; 199 } 200 201 return mEOSResult; 202} 203 204bool AnotherPacketSource::wasFormatChange( 205 int32_t discontinuityType) const { 206 if (mIsAudio) { 207 return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0; 208 } 209 210 if (mIsVideo) { 211 return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0; 212 } 213 214 return false; 215} 216 217void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { 218 int32_t damaged; 219 if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { 220 // LOG(VERBOSE) << "discarding damaged AU"; 221 
return; 222 } 223 224 Mutex::Autolock autoLock(mLock); 225 mBuffers.push_back(buffer); 226 mCondition.signal(); 227 228 int32_t discontinuity; 229 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 230 // discontinuity handling needs to be consistent with queueDiscontinuity() 231 ++mQueuedDiscontinuityCount; 232 mLastQueuedTimeUs = 0ll; 233 mEOSResult = OK; 234 mLatestEnqueuedMeta = NULL; 235 return; 236 } 237 238 int64_t lastQueuedTimeUs; 239 CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); 240 mLastQueuedTimeUs = lastQueuedTimeUs; 241 ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", 242 mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); 243 244 if (mLatestEnqueuedMeta == NULL) { 245 mLatestEnqueuedMeta = buffer->meta()->dup(); 246 } else { 247 int64_t latestTimeUs = 0; 248 int64_t frameDeltaUs = 0; 249 CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs)); 250 if (lastQueuedTimeUs > latestTimeUs) { 251 mLatestEnqueuedMeta = buffer->meta()->dup(); 252 frameDeltaUs = lastQueuedTimeUs - latestTimeUs; 253 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 254 } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) { 255 // For B frames 256 frameDeltaUs = latestTimeUs - lastQueuedTimeUs; 257 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 258 } 259 } 260} 261 262void AnotherPacketSource::clear() { 263 Mutex::Autolock autoLock(mLock); 264 265 mBuffers.clear(); 266 mEOSResult = OK; 267 mQueuedDiscontinuityCount = 0; 268 269 mFormat = NULL; 270 mLatestEnqueuedMeta = NULL; 271} 272 273void AnotherPacketSource::queueDiscontinuity( 274 ATSParser::DiscontinuityType type, 275 const sp<AMessage> &extra, 276 bool discard) { 277 Mutex::Autolock autoLock(mLock); 278 279 if (discard) { 280 // Leave only discontinuities in the queue. 
281 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 282 while (it != mBuffers.end()) { 283 sp<ABuffer> oldBuffer = *it; 284 285 int32_t oldDiscontinuityType; 286 if (!oldBuffer->meta()->findInt32( 287 "discontinuity", &oldDiscontinuityType)) { 288 it = mBuffers.erase(it); 289 continue; 290 } 291 292 ++it; 293 } 294 } 295 296 mEOSResult = OK; 297 mLastQueuedTimeUs = 0; 298 mLatestEnqueuedMeta = NULL; 299 300 if (type == ATSParser::DISCONTINUITY_NONE) { 301 return; 302 } 303 304 ++mQueuedDiscontinuityCount; 305 sp<ABuffer> buffer = new ABuffer(0); 306 buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type)); 307 buffer->meta()->setMessage("extra", extra); 308 309 mBuffers.push_back(buffer); 310 mCondition.signal(); 311} 312 313void AnotherPacketSource::signalEOS(status_t result) { 314 CHECK(result != OK); 315 316 Mutex::Autolock autoLock(mLock); 317 mEOSResult = result; 318 mCondition.signal(); 319} 320 321bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { 322 Mutex::Autolock autoLock(mLock); 323 *finalResult = OK; 324 if (!mEnabled) { 325 return false; 326 } 327 if (!mBuffers.empty()) { 328 return true; 329 } 330 331 *finalResult = mEOSResult; 332 return false; 333} 334 335bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { 336 Mutex::Autolock autoLock(mLock); 337 *finalResult = OK; 338 if (!mEnabled) { 339 return false; 340 } 341 List<sp<ABuffer> >::iterator it; 342 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 343 int32_t discontinuity; 344 if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) { 345 return true; 346 } 347 } 348 349 *finalResult = mEOSResult; 350 return false; 351} 352 353int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { 354 Mutex::Autolock autoLock(mLock); 355 return getBufferedDurationUs_l(finalResult); 356} 357 358int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) { 359 *finalResult = mEOSResult; 360 361 if 
(mBuffers.empty()) { 362 return 0; 363 } 364 365 int64_t time1 = -1; 366 int64_t time2 = -1; 367 int64_t durationUs = 0; 368 369 List<sp<ABuffer> >::iterator it; 370 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 371 const sp<ABuffer> &buffer = *it; 372 373 int32_t discard; 374 if (buffer->meta()->findInt32("discard", &discard) && discard) { 375 continue; 376 } 377 378 int64_t timeUs; 379 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 380 if (time1 < 0 || timeUs < time1) { 381 time1 = timeUs; 382 } 383 384 if (time2 < 0 || timeUs > time2) { 385 time2 = timeUs; 386 } 387 } else { 388 // This is a discontinuity, reset everything. 389 durationUs += time2 - time1; 390 time1 = time2 = -1; 391 } 392 } 393 394 return durationUs + (time2 - time1); 395} 396 397// A cheaper but less precise version of getBufferedDurationUs that we would like to use in 398// LiveSession::dequeueAccessUnit to trigger downwards adaptation. 399int64_t AnotherPacketSource::getEstimatedDurationUs() { 400 Mutex::Autolock autoLock(mLock); 401 if (mBuffers.empty()) { 402 return 0; 403 } 404 405 if (mQueuedDiscontinuityCount > 0) { 406 status_t finalResult; 407 return getBufferedDurationUs_l(&finalResult); 408 } 409 410 sp<ABuffer> buffer; 411 int32_t discard; 412 int64_t startTimeUs = -1ll; 413 List<sp<ABuffer> >::iterator it; 414 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 415 buffer = *it; 416 if (buffer->meta()->findInt32("discard", &discard) && discard) { 417 continue; 418 } 419 buffer->meta()->findInt64("timeUs", &startTimeUs); 420 break; 421 } 422 423 if (startTimeUs < 0) { 424 return 0; 425 } 426 427 it = mBuffers.end(); 428 --it; 429 buffer = *it; 430 431 int64_t endTimeUs; 432 buffer->meta()->findInt64("timeUs", &endTimeUs); 433 if (endTimeUs < 0) { 434 return 0; 435 } 436 437 int64_t diffUs; 438 if (endTimeUs > startTimeUs) { 439 diffUs = endTimeUs - startTimeUs; 440 } else { 441 diffUs = startTimeUs - endTimeUs; 442 } 443 return diffUs; 444} 445 446status_t 
AnotherPacketSource::nextBufferTime(int64_t *timeUs) { 447 *timeUs = 0; 448 449 Mutex::Autolock autoLock(mLock); 450 451 if (mBuffers.empty()) { 452 return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK; 453 } 454 455 sp<ABuffer> buffer = *mBuffers.begin(); 456 CHECK(buffer->meta()->findInt64("timeUs", timeUs)); 457 458 return OK; 459} 460 461bool AnotherPacketSource::isFinished(int64_t duration) const { 462 if (duration > 0) { 463 int64_t diff = duration - mLastQueuedTimeUs; 464 if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) { 465 ALOGV("Detecting EOS due to near end"); 466 return true; 467 } 468 } 469 return (mEOSResult != OK); 470} 471 472sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() { 473 Mutex::Autolock autoLock(mLock); 474 return mLatestEnqueuedMeta; 475} 476 477sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() { 478 Mutex::Autolock autoLock(mLock); 479 return mLatestDequeuedMeta; 480} 481 482void AnotherPacketSource::enable(bool enable) { 483 Mutex::Autolock autoLock(mLock); 484 mEnabled = enable; 485} 486 487/* 488 * returns the sample meta that's delayUs after queue head 489 * (NULL if such sample is unavailable) 490 */ 491sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) { 492 Mutex::Autolock autoLock(mLock); 493 int64_t firstUs = -1; 494 int64_t lastUs = -1; 495 int64_t durationUs = 0; 496 497 List<sp<ABuffer> >::iterator it; 498 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 499 const sp<ABuffer> &buffer = *it; 500 int32_t discontinuity; 501 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 502 durationUs += lastUs - firstUs; 503 firstUs = -1; 504 lastUs = -1; 505 continue; 506 } 507 int64_t timeUs; 508 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 509 if (firstUs < 0) { 510 firstUs = timeUs; 511 } 512 if (lastUs < 0 || timeUs > lastUs) { 513 lastUs = timeUs; 514 } 515 if (durationUs + (lastUs - firstUs) >= delayUs) { 516 return buffer->meta(); 517 } 518 } 519 } 520 
return NULL; 521} 522 523/* 524 * removes samples with time equal or after meta 525 */ 526void AnotherPacketSource::trimBuffersAfterMeta( 527 const sp<AMessage> &meta) { 528 if (meta == NULL) { 529 ALOGW("trimming with NULL meta, ignoring"); 530 return; 531 } 532 533 Mutex::Autolock autoLock(mLock); 534 if (mBuffers.empty()) { 535 return; 536 } 537 538 HLSTime stopTime(meta); 539 ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld", 540 stopTime.mSeq, (long long)stopTime.mTimeUs); 541 542 List<sp<ABuffer> >::iterator it; 543 sp<AMessage> newLatestEnqueuedMeta = NULL; 544 int64_t newLastQueuedTimeUs = 0; 545 size_t newDiscontinuityCount = 0; 546 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 547 const sp<ABuffer> &buffer = *it; 548 int32_t discontinuity; 549 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 550 newDiscontinuityCount++; 551 continue; 552 } 553 554 HLSTime curTime(buffer->meta()); 555 if (!(curTime < stopTime)) { 556 ALOGV("trimming from %lld (inclusive) to end", 557 (long long)curTime.mTimeUs); 558 break; 559 } 560 newLatestEnqueuedMeta = buffer->meta(); 561 newLastQueuedTimeUs = curTime.mTimeUs; 562 } 563 mBuffers.erase(it, mBuffers.end()); 564 mLatestEnqueuedMeta = newLatestEnqueuedMeta; 565 mLastQueuedTimeUs = newLastQueuedTimeUs; 566 mQueuedDiscontinuityCount = newDiscontinuityCount; 567} 568 569/* 570 * removes samples with time equal or before meta; 571 * returns first sample left in the queue. 572 * 573 * (for AVC, if trim happens, the samples left will always start 574 * at next IDR.) 
575 */ 576sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta( 577 const sp<AMessage> &meta) { 578 HLSTime startTime(meta); 579 ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld", 580 startTime.mSeq, (long long)startTime.mTimeUs); 581 582 sp<AMessage> firstMeta; 583 Mutex::Autolock autoLock(mLock); 584 if (mBuffers.empty()) { 585 return NULL; 586 } 587 588 sp<MetaData> format; 589 bool isAvc = false; 590 591 List<sp<ABuffer> >::iterator it; 592 size_t discontinuityCount = 0; 593 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 594 const sp<ABuffer> &buffer = *it; 595 int32_t discontinuity; 596 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 597 format = NULL; 598 isAvc = false; 599 discontinuityCount++; 600 continue; 601 } 602 if (format == NULL) { 603 sp<RefBase> object; 604 if (buffer->meta()->findObject("format", &object)) { 605 const char* mime; 606 format = static_cast<MetaData*>(object.get()); 607 isAvc = format != NULL 608 && format->findCString(kKeyMIMEType, &mime) 609 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); 610 } 611 } 612 if (isAvc && !IsIDR(buffer)) { 613 continue; 614 } 615 616 HLSTime curTime(buffer->meta()); 617 if (startTime < curTime) { 618 ALOGV("trimming from beginning to %lld (not inclusive)", 619 (long long)curTime.mTimeUs); 620 firstMeta = buffer->meta(); 621 break; 622 } 623 } 624 mBuffers.erase(mBuffers.begin(), it); 625 mQueuedDiscontinuityCount -= discontinuityCount; 626 mLatestDequeuedMeta = NULL; 627 return firstMeta; 628} 629 630} // namespace android 631