// AnotherPacketSource.cpp revision 4123d6db0642cd13e69230705b12d6b6fee6f73f
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "AnotherPacketSource" 19 20#include "AnotherPacketSource.h" 21 22#include "include/avc_utils.h" 23 24#include <media/stagefright/foundation/ABuffer.h> 25#include <media/stagefright/foundation/ADebug.h> 26#include <media/stagefright/foundation/AMessage.h> 27#include <media/stagefright/foundation/AString.h> 28#include <media/stagefright/foundation/hexdump.h> 29#include <media/stagefright/MediaBuffer.h> 30#include <media/stagefright/MediaDefs.h> 31#include <media/stagefright/MetaData.h> 32#include <media/stagefright/Utils.h> 33#include <utils/Vector.h> 34 35#include <inttypes.h> 36 37namespace android { 38 39const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs 40 41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta) 42 : mIsAudio(false), 43 mIsVideo(false), 44 mEnabled(true), 45 mFormat(NULL), 46 mLastQueuedTimeUs(0), 47 mEOSResult(OK), 48 mLatestEnqueuedMeta(NULL), 49 mLatestDequeuedMeta(NULL), 50 mQueuedDiscontinuityCount(0) { 51 setFormat(meta); 52} 53 54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) { 55 if (mFormat != NULL) { 56 // Only allowed to be set once. Requires explicit clear to reset. 
57 return; 58 } 59 60 mIsAudio = false; 61 mIsVideo = false; 62 63 if (meta == NULL) { 64 return; 65 } 66 67 mFormat = meta; 68 const char *mime; 69 CHECK(meta->findCString(kKeyMIMEType, &mime)); 70 71 if (!strncasecmp("audio/", mime, 6)) { 72 mIsAudio = true; 73 } else if (!strncasecmp("video/", mime, 6)) { 74 mIsVideo = true; 75 } else { 76 CHECK(!strncasecmp("text/", mime, 5)); 77 } 78} 79 80AnotherPacketSource::~AnotherPacketSource() { 81} 82 83status_t AnotherPacketSource::start(MetaData * /* params */) { 84 return OK; 85} 86 87status_t AnotherPacketSource::stop() { 88 return OK; 89} 90 91sp<MetaData> AnotherPacketSource::getFormat() { 92 Mutex::Autolock autoLock(mLock); 93 if (mFormat != NULL) { 94 return mFormat; 95 } 96 97 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 98 while (it != mBuffers.end()) { 99 sp<ABuffer> buffer = *it; 100 int32_t discontinuity; 101 if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) { 102 sp<RefBase> object; 103 if (buffer->meta()->findObject("format", &object)) { 104 setFormat(static_cast<MetaData*>(object.get())); 105 return mFormat; 106 } 107 } 108 109 ++it; 110 } 111 return NULL; 112} 113 114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) { 115 buffer->clear(); 116 117 Mutex::Autolock autoLock(mLock); 118 while (mEOSResult == OK && mBuffers.empty()) { 119 mCondition.wait(mLock); 120 } 121 122 if (!mBuffers.empty()) { 123 *buffer = *mBuffers.begin(); 124 mBuffers.erase(mBuffers.begin()); 125 126 int32_t discontinuity; 127 if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) { 128 if (wasFormatChange(discontinuity)) { 129 mFormat.clear(); 130 } 131 132 --mQueuedDiscontinuityCount; 133 return INFO_DISCONTINUITY; 134 } 135 136 mLatestDequeuedMeta = (*buffer)->meta()->dup(); 137 138 sp<RefBase> object; 139 if ((*buffer)->meta()->findObject("format", &object)) { 140 setFormat(static_cast<MetaData*>(object.get())); 141 } 142 143 return OK; 144 } 145 146 return mEOSResult; 147} 
148 149status_t AnotherPacketSource::read( 150 MediaBuffer **out, const ReadOptions *) { 151 *out = NULL; 152 153 Mutex::Autolock autoLock(mLock); 154 while (mEOSResult == OK && mBuffers.empty()) { 155 mCondition.wait(mLock); 156 } 157 158 if (!mBuffers.empty()) { 159 160 const sp<ABuffer> buffer = *mBuffers.begin(); 161 mBuffers.erase(mBuffers.begin()); 162 163 int32_t discontinuity; 164 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 165 if (wasFormatChange(discontinuity)) { 166 mFormat.clear(); 167 } 168 169 return INFO_DISCONTINUITY; 170 } 171 172 mLatestDequeuedMeta = buffer->meta()->dup(); 173 174 sp<RefBase> object; 175 if (buffer->meta()->findObject("format", &object)) { 176 setFormat(static_cast<MetaData*>(object.get())); 177 } 178 179 int64_t timeUs; 180 CHECK(buffer->meta()->findInt64("timeUs", &timeUs)); 181 182 MediaBuffer *mediaBuffer = new MediaBuffer(buffer); 183 184 mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs); 185 186 int32_t isSync; 187 if (buffer->meta()->findInt32("isSync", &isSync)) { 188 mediaBuffer->meta_data()->setInt32(kKeyIsSyncFrame, isSync); 189 } 190 191 *out = mediaBuffer; 192 return OK; 193 } 194 195 return mEOSResult; 196} 197 198bool AnotherPacketSource::wasFormatChange( 199 int32_t discontinuityType) const { 200 if (mIsAudio) { 201 return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0; 202 } 203 204 if (mIsVideo) { 205 return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0; 206 } 207 208 return false; 209} 210 211void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { 212 int32_t damaged; 213 if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { 214 // LOG(VERBOSE) << "discarding damaged AU"; 215 return; 216 } 217 218 Mutex::Autolock autoLock(mLock); 219 mBuffers.push_back(buffer); 220 mCondition.signal(); 221 222 int32_t discontinuity; 223 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 224 // discontinuity handling needs to 
be consistent with queueDiscontinuity() 225 ++mQueuedDiscontinuityCount; 226 mLastQueuedTimeUs = 0ll; 227 mEOSResult = OK; 228 mLatestEnqueuedMeta = NULL; 229 return; 230 } 231 232 int64_t lastQueuedTimeUs; 233 CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); 234 mLastQueuedTimeUs = lastQueuedTimeUs; 235 ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", 236 mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); 237 238 if (mLatestEnqueuedMeta == NULL) { 239 mLatestEnqueuedMeta = buffer->meta()->dup(); 240 } else { 241 int64_t latestTimeUs = 0; 242 int64_t frameDeltaUs = 0; 243 CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs)); 244 if (lastQueuedTimeUs > latestTimeUs) { 245 mLatestEnqueuedMeta = buffer->meta()->dup(); 246 frameDeltaUs = lastQueuedTimeUs - latestTimeUs; 247 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 248 } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) { 249 // For B frames 250 frameDeltaUs = latestTimeUs - lastQueuedTimeUs; 251 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 252 } 253 } 254} 255 256void AnotherPacketSource::clear() { 257 Mutex::Autolock autoLock(mLock); 258 259 mBuffers.clear(); 260 mEOSResult = OK; 261 mQueuedDiscontinuityCount = 0; 262 263 mFormat = NULL; 264 mLatestEnqueuedMeta = NULL; 265} 266 267void AnotherPacketSource::queueDiscontinuity( 268 ATSParser::DiscontinuityType type, 269 const sp<AMessage> &extra, 270 bool discard) { 271 Mutex::Autolock autoLock(mLock); 272 273 if (discard) { 274 // Leave only discontinuities in the queue. 
275 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 276 while (it != mBuffers.end()) { 277 sp<ABuffer> oldBuffer = *it; 278 279 int32_t oldDiscontinuityType; 280 if (!oldBuffer->meta()->findInt32( 281 "discontinuity", &oldDiscontinuityType)) { 282 it = mBuffers.erase(it); 283 continue; 284 } 285 286 ++it; 287 } 288 } 289 290 mEOSResult = OK; 291 mLastQueuedTimeUs = 0; 292 mLatestEnqueuedMeta = NULL; 293 294 if (type == ATSParser::DISCONTINUITY_NONE) { 295 return; 296 } 297 298 ++mQueuedDiscontinuityCount; 299 sp<ABuffer> buffer = new ABuffer(0); 300 buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type)); 301 buffer->meta()->setMessage("extra", extra); 302 303 mBuffers.push_back(buffer); 304 mCondition.signal(); 305} 306 307void AnotherPacketSource::signalEOS(status_t result) { 308 CHECK(result != OK); 309 310 Mutex::Autolock autoLock(mLock); 311 mEOSResult = result; 312 mCondition.signal(); 313} 314 315bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { 316 Mutex::Autolock autoLock(mLock); 317 *finalResult = OK; 318 if (!mEnabled) { 319 return false; 320 } 321 if (!mBuffers.empty()) { 322 return true; 323 } 324 325 *finalResult = mEOSResult; 326 return false; 327} 328 329bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { 330 Mutex::Autolock autoLock(mLock); 331 *finalResult = OK; 332 if (!mEnabled) { 333 return false; 334 } 335 List<sp<ABuffer> >::iterator it; 336 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 337 int32_t discontinuity; 338 if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) { 339 return true; 340 } 341 } 342 343 *finalResult = mEOSResult; 344 return false; 345} 346 347int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { 348 Mutex::Autolock autoLock(mLock); 349 return getBufferedDurationUs_l(finalResult); 350} 351 352int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) { 353 *finalResult = mEOSResult; 354 355 if 
(mBuffers.empty()) { 356 return 0; 357 } 358 359 int64_t time1 = -1; 360 int64_t time2 = -1; 361 int64_t durationUs = 0; 362 363 List<sp<ABuffer> >::iterator it; 364 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 365 const sp<ABuffer> &buffer = *it; 366 367 int32_t discard; 368 if (buffer->meta()->findInt32("discard", &discard) && discard) { 369 continue; 370 } 371 372 int64_t timeUs; 373 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 374 if (time1 < 0 || timeUs < time1) { 375 time1 = timeUs; 376 } 377 378 if (time2 < 0 || timeUs > time2) { 379 time2 = timeUs; 380 } 381 } else { 382 // This is a discontinuity, reset everything. 383 durationUs += time2 - time1; 384 time1 = time2 = -1; 385 } 386 } 387 388 return durationUs + (time2 - time1); 389} 390 391// A cheaper but less precise version of getBufferedDurationUs that we would like to use in 392// LiveSession::dequeueAccessUnit to trigger downwards adaptation. 393int64_t AnotherPacketSource::getEstimatedDurationUs() { 394 Mutex::Autolock autoLock(mLock); 395 if (mBuffers.empty()) { 396 return 0; 397 } 398 399 if (mQueuedDiscontinuityCount > 0) { 400 status_t finalResult; 401 return getBufferedDurationUs_l(&finalResult); 402 } 403 404 sp<ABuffer> buffer; 405 int32_t discard; 406 int64_t startTimeUs = -1ll; 407 List<sp<ABuffer> >::iterator it; 408 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 409 buffer = *it; 410 if (buffer->meta()->findInt32("discard", &discard) && discard) { 411 continue; 412 } 413 buffer->meta()->findInt64("timeUs", &startTimeUs); 414 break; 415 } 416 417 if (startTimeUs < 0) { 418 return 0; 419 } 420 421 it = mBuffers.end(); 422 --it; 423 buffer = *it; 424 425 int64_t endTimeUs; 426 buffer->meta()->findInt64("timeUs", &endTimeUs); 427 if (endTimeUs < 0) { 428 return 0; 429 } 430 431 int64_t diffUs; 432 if (endTimeUs > startTimeUs) { 433 diffUs = endTimeUs - startTimeUs; 434 } else { 435 diffUs = startTimeUs - endTimeUs; 436 } 437 return diffUs; 438} 439 440status_t 
AnotherPacketSource::nextBufferTime(int64_t *timeUs) { 441 *timeUs = 0; 442 443 Mutex::Autolock autoLock(mLock); 444 445 if (mBuffers.empty()) { 446 return mEOSResult != OK ? mEOSResult : -EWOULDBLOCK; 447 } 448 449 sp<ABuffer> buffer = *mBuffers.begin(); 450 CHECK(buffer->meta()->findInt64("timeUs", timeUs)); 451 452 return OK; 453} 454 455bool AnotherPacketSource::isFinished(int64_t duration) const { 456 if (duration > 0) { 457 int64_t diff = duration - mLastQueuedTimeUs; 458 if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) { 459 ALOGV("Detecting EOS due to near end"); 460 return true; 461 } 462 } 463 return (mEOSResult != OK); 464} 465 466sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() { 467 Mutex::Autolock autoLock(mLock); 468 return mLatestEnqueuedMeta; 469} 470 471sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() { 472 Mutex::Autolock autoLock(mLock); 473 return mLatestDequeuedMeta; 474} 475 476void AnotherPacketSource::enable(bool enable) { 477 Mutex::Autolock autoLock(mLock); 478 mEnabled = enable; 479} 480 481/* 482 * returns the sample meta that's delayUs after queue head 483 * (NULL if such sample is unavailable) 484 */ 485sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) { 486 Mutex::Autolock autoLock(mLock); 487 int64_t firstUs = -1; 488 int64_t lastUs = -1; 489 int64_t durationUs = 0; 490 491 List<sp<ABuffer> >::iterator it; 492 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 493 const sp<ABuffer> &buffer = *it; 494 int32_t discontinuity; 495 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 496 durationUs += lastUs - firstUs; 497 firstUs = -1; 498 lastUs = -1; 499 continue; 500 } 501 int64_t timeUs; 502 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 503 if (firstUs < 0) { 504 firstUs = timeUs; 505 } 506 if (lastUs < 0 || timeUs > lastUs) { 507 lastUs = timeUs; 508 } 509 if (durationUs + (lastUs - firstUs) >= delayUs) { 510 return buffer->meta(); 511 } 512 } 513 } 514 
return NULL; 515} 516 517/* 518 * removes samples with time equal or after meta 519 */ 520void AnotherPacketSource::trimBuffersAfterMeta( 521 const sp<AMessage> &meta) { 522 if (meta == NULL) { 523 ALOGW("trimming with NULL meta, ignoring"); 524 return; 525 } 526 527 Mutex::Autolock autoLock(mLock); 528 if (mBuffers.empty()) { 529 return; 530 } 531 532 HLSTime stopTime(meta); 533 ALOGV("trimBuffersAfterMeta: discontinuitySeq %d, timeUs %lld", 534 stopTime.mSeq, (long long)stopTime.mTimeUs); 535 536 List<sp<ABuffer> >::iterator it; 537 sp<AMessage> newLatestEnqueuedMeta = NULL; 538 int64_t newLastQueuedTimeUs = 0; 539 size_t newDiscontinuityCount = 0; 540 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 541 const sp<ABuffer> &buffer = *it; 542 int32_t discontinuity; 543 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 544 newDiscontinuityCount++; 545 continue; 546 } 547 548 HLSTime curTime(buffer->meta()); 549 if (!(curTime < stopTime)) { 550 ALOGV("trimming from %lld (inclusive) to end", 551 (long long)curTime.mTimeUs); 552 break; 553 } 554 newLatestEnqueuedMeta = buffer->meta(); 555 newLastQueuedTimeUs = curTime.mTimeUs; 556 } 557 mBuffers.erase(it, mBuffers.end()); 558 mLatestEnqueuedMeta = newLatestEnqueuedMeta; 559 mLastQueuedTimeUs = newLastQueuedTimeUs; 560 mQueuedDiscontinuityCount = newDiscontinuityCount; 561} 562 563/* 564 * removes samples with time equal or before meta; 565 * returns first sample left in the queue. 566 * 567 * (for AVC, if trim happens, the samples left will always start 568 * at next IDR.) 
569 */ 570sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta( 571 const sp<AMessage> &meta) { 572 HLSTime startTime(meta); 573 ALOGV("trimBuffersBeforeMeta: discontinuitySeq %d, timeUs %lld", 574 startTime.mSeq, (long long)startTime.mTimeUs); 575 576 sp<AMessage> firstMeta; 577 Mutex::Autolock autoLock(mLock); 578 if (mBuffers.empty()) { 579 return NULL; 580 } 581 582 sp<MetaData> format; 583 bool isAvc = false; 584 585 List<sp<ABuffer> >::iterator it; 586 size_t discontinuityCount = 0; 587 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 588 const sp<ABuffer> &buffer = *it; 589 int32_t discontinuity; 590 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 591 format = NULL; 592 isAvc = false; 593 discontinuityCount++; 594 continue; 595 } 596 if (format == NULL) { 597 sp<RefBase> object; 598 if (buffer->meta()->findObject("format", &object)) { 599 const char* mime; 600 format = static_cast<MetaData*>(object.get()); 601 isAvc = format != NULL 602 && format->findCString(kKeyMIMEType, &mime) 603 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); 604 } 605 } 606 if (isAvc && !IsIDR(buffer)) { 607 continue; 608 } 609 610 HLSTime curTime(buffer->meta()); 611 if (startTime < curTime) { 612 ALOGV("trimming from beginning to %lld (not inclusive)", 613 (long long)curTime.mTimeUs); 614 firstMeta = buffer->meta(); 615 break; 616 } 617 } 618 mBuffers.erase(mBuffers.begin(), it); 619 mQueuedDiscontinuityCount -= discontinuityCount; 620 mLatestDequeuedMeta = NULL; 621 return firstMeta; 622} 623 624} // namespace android 625