// AnotherPacketSource.cpp revision d47dfcb5a2e5901c96fc92662cec7aa30f7f8843
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17//#define LOG_NDEBUG 0 18#define LOG_TAG "AnotherPacketSource" 19 20#include "AnotherPacketSource.h" 21 22#include "include/avc_utils.h" 23 24#include <media/stagefright/foundation/ABuffer.h> 25#include <media/stagefright/foundation/ADebug.h> 26#include <media/stagefright/foundation/AMessage.h> 27#include <media/stagefright/foundation/AString.h> 28#include <media/stagefright/foundation/hexdump.h> 29#include <media/stagefright/MediaBuffer.h> 30#include <media/stagefright/MediaDefs.h> 31#include <media/stagefright/MetaData.h> 32#include <media/stagefright/Utils.h> 33#include <utils/Vector.h> 34 35#include <inttypes.h> 36 37namespace android { 38 39const int64_t kNearEOSMarkUs = 2000000ll; // 2 secs 40 41AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta) 42 : mIsAudio(false), 43 mIsVideo(false), 44 mEnabled(true), 45 mFormat(NULL), 46 mLastQueuedTimeUs(0), 47 mEOSResult(OK), 48 mLatestEnqueuedMeta(NULL), 49 mLatestDequeuedMeta(NULL), 50 mQueuedDiscontinuityCount(0) { 51 setFormat(meta); 52} 53 54void AnotherPacketSource::setFormat(const sp<MetaData> &meta) { 55 if (mFormat != NULL) { 56 // Only allowed to be set once. Requires explicit clear to reset. 
57 return; 58 } 59 60 mIsAudio = false; 61 mIsVideo = false; 62 63 if (meta == NULL) { 64 return; 65 } 66 67 mFormat = meta; 68 const char *mime; 69 CHECK(meta->findCString(kKeyMIMEType, &mime)); 70 71 if (!strncasecmp("audio/", mime, 6)) { 72 mIsAudio = true; 73 } else if (!strncasecmp("video/", mime, 6)) { 74 mIsVideo = true; 75 } else { 76 CHECK(!strncasecmp("text/", mime, 5)); 77 } 78} 79 80AnotherPacketSource::~AnotherPacketSource() { 81} 82 83status_t AnotherPacketSource::start(MetaData * /* params */) { 84 return OK; 85} 86 87status_t AnotherPacketSource::stop() { 88 return OK; 89} 90 91sp<MetaData> AnotherPacketSource::getFormat() { 92 Mutex::Autolock autoLock(mLock); 93 if (mFormat != NULL) { 94 return mFormat; 95 } 96 97 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 98 while (it != mBuffers.end()) { 99 sp<ABuffer> buffer = *it; 100 int32_t discontinuity; 101 if (!buffer->meta()->findInt32("discontinuity", &discontinuity)) { 102 sp<RefBase> object; 103 if (buffer->meta()->findObject("format", &object)) { 104 setFormat(static_cast<MetaData*>(object.get())); 105 return mFormat; 106 } 107 } 108 109 ++it; 110 } 111 return NULL; 112} 113 114status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) { 115 buffer->clear(); 116 117 Mutex::Autolock autoLock(mLock); 118 while (mEOSResult == OK && mBuffers.empty()) { 119 mCondition.wait(mLock); 120 } 121 122 if (!mBuffers.empty()) { 123 *buffer = *mBuffers.begin(); 124 mBuffers.erase(mBuffers.begin()); 125 126 int32_t discontinuity; 127 if ((*buffer)->meta()->findInt32("discontinuity", &discontinuity)) { 128 if (wasFormatChange(discontinuity)) { 129 mFormat.clear(); 130 } 131 132 --mQueuedDiscontinuityCount; 133 return INFO_DISCONTINUITY; 134 } 135 136 mLatestDequeuedMeta = (*buffer)->meta()->dup(); 137 138 sp<RefBase> object; 139 if ((*buffer)->meta()->findObject("format", &object)) { 140 setFormat(static_cast<MetaData*>(object.get())); 141 } 142 143 return OK; 144 } 145 146 return mEOSResult; 147} 
148 149status_t AnotherPacketSource::read( 150 MediaBuffer **out, const ReadOptions *) { 151 *out = NULL; 152 153 Mutex::Autolock autoLock(mLock); 154 while (mEOSResult == OK && mBuffers.empty()) { 155 mCondition.wait(mLock); 156 } 157 158 if (!mBuffers.empty()) { 159 160 const sp<ABuffer> buffer = *mBuffers.begin(); 161 mBuffers.erase(mBuffers.begin()); 162 163 int32_t discontinuity; 164 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 165 if (wasFormatChange(discontinuity)) { 166 mFormat.clear(); 167 } 168 169 return INFO_DISCONTINUITY; 170 } 171 172 mLatestDequeuedMeta = buffer->meta()->dup(); 173 174 sp<RefBase> object; 175 if (buffer->meta()->findObject("format", &object)) { 176 setFormat(static_cast<MetaData*>(object.get())); 177 } 178 179 int64_t timeUs; 180 CHECK(buffer->meta()->findInt64("timeUs", &timeUs)); 181 182 MediaBuffer *mediaBuffer = new MediaBuffer(buffer); 183 184 mediaBuffer->meta_data()->setInt64(kKeyTime, timeUs); 185 186 *out = mediaBuffer; 187 return OK; 188 } 189 190 return mEOSResult; 191} 192 193bool AnotherPacketSource::wasFormatChange( 194 int32_t discontinuityType) const { 195 if (mIsAudio) { 196 return (discontinuityType & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0; 197 } 198 199 if (mIsVideo) { 200 return (discontinuityType & ATSParser::DISCONTINUITY_VIDEO_FORMAT) != 0; 201 } 202 203 return false; 204} 205 206void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) { 207 int32_t damaged; 208 if (buffer->meta()->findInt32("damaged", &damaged) && damaged) { 209 // LOG(VERBOSE) << "discarding damaged AU"; 210 return; 211 } 212 213 Mutex::Autolock autoLock(mLock); 214 mBuffers.push_back(buffer); 215 mCondition.signal(); 216 217 int32_t discontinuity; 218 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 219 // discontinuity handling needs to be consistent with queueDiscontinuity() 220 ++mQueuedDiscontinuityCount; 221 mLastQueuedTimeUs = 0ll; 222 mEOSResult = OK; 223 mLatestEnqueuedMeta = 
NULL; 224 return; 225 } 226 227 int64_t lastQueuedTimeUs; 228 CHECK(buffer->meta()->findInt64("timeUs", &lastQueuedTimeUs)); 229 mLastQueuedTimeUs = lastQueuedTimeUs; 230 ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)", 231 mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6); 232 233 if (mLatestEnqueuedMeta == NULL) { 234 mLatestEnqueuedMeta = buffer->meta()->dup(); 235 } else { 236 int64_t latestTimeUs = 0; 237 int64_t frameDeltaUs = 0; 238 CHECK(mLatestEnqueuedMeta->findInt64("timeUs", &latestTimeUs)); 239 if (lastQueuedTimeUs > latestTimeUs) { 240 mLatestEnqueuedMeta = buffer->meta()->dup(); 241 frameDeltaUs = lastQueuedTimeUs - latestTimeUs; 242 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 243 } else if (!mLatestEnqueuedMeta->findInt64("durationUs", &frameDeltaUs)) { 244 // For B frames 245 frameDeltaUs = latestTimeUs - lastQueuedTimeUs; 246 mLatestEnqueuedMeta->setInt64("durationUs", frameDeltaUs); 247 } 248 } 249} 250 251void AnotherPacketSource::clear() { 252 Mutex::Autolock autoLock(mLock); 253 254 mBuffers.clear(); 255 mEOSResult = OK; 256 mQueuedDiscontinuityCount = 0; 257 258 mFormat = NULL; 259 mLatestEnqueuedMeta = NULL; 260} 261 262void AnotherPacketSource::queueDiscontinuity( 263 ATSParser::DiscontinuityType type, 264 const sp<AMessage> &extra, 265 bool discard) { 266 Mutex::Autolock autoLock(mLock); 267 268 if (discard) { 269 // Leave only discontinuities in the queue. 
270 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 271 while (it != mBuffers.end()) { 272 sp<ABuffer> oldBuffer = *it; 273 274 int32_t oldDiscontinuityType; 275 if (!oldBuffer->meta()->findInt32( 276 "discontinuity", &oldDiscontinuityType)) { 277 it = mBuffers.erase(it); 278 continue; 279 } 280 281 ++it; 282 } 283 } 284 285 mEOSResult = OK; 286 mLastQueuedTimeUs = 0; 287 mLatestEnqueuedMeta = NULL; 288 289 if (type == ATSParser::DISCONTINUITY_NONE) { 290 return; 291 } 292 293 ++mQueuedDiscontinuityCount; 294 sp<ABuffer> buffer = new ABuffer(0); 295 buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type)); 296 buffer->meta()->setMessage("extra", extra); 297 298 mBuffers.push_back(buffer); 299 mCondition.signal(); 300} 301 302void AnotherPacketSource::signalEOS(status_t result) { 303 CHECK(result != OK); 304 305 Mutex::Autolock autoLock(mLock); 306 mEOSResult = result; 307 mCondition.signal(); 308} 309 310bool AnotherPacketSource::hasBufferAvailable(status_t *finalResult) { 311 Mutex::Autolock autoLock(mLock); 312 *finalResult = OK; 313 if (!mEnabled) { 314 return false; 315 } 316 if (!mBuffers.empty()) { 317 return true; 318 } 319 320 *finalResult = mEOSResult; 321 return false; 322} 323 324bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) { 325 Mutex::Autolock autoLock(mLock); 326 *finalResult = OK; 327 if (!mEnabled) { 328 return false; 329 } 330 List<sp<ABuffer> >::iterator it; 331 for (it = mBuffers.begin(); it != mBuffers.end(); it++) { 332 int32_t discontinuity; 333 if (!(*it)->meta()->findInt32("discontinuity", &discontinuity)) { 334 return true; 335 } 336 } 337 338 *finalResult = mEOSResult; 339 return false; 340} 341 342int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) { 343 Mutex::Autolock autoLock(mLock); 344 return getBufferedDurationUs_l(finalResult); 345} 346 347int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) { 348 *finalResult = mEOSResult; 349 350 if 
(mBuffers.empty()) { 351 return 0; 352 } 353 354 int64_t time1 = -1; 355 int64_t time2 = -1; 356 int64_t durationUs = 0; 357 358 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 359 while (it != mBuffers.end()) { 360 const sp<ABuffer> &buffer = *it; 361 362 int64_t timeUs; 363 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 364 if (time1 < 0 || timeUs < time1) { 365 time1 = timeUs; 366 } 367 368 if (time2 < 0 || timeUs > time2) { 369 time2 = timeUs; 370 } 371 } else { 372 // This is a discontinuity, reset everything. 373 durationUs += time2 - time1; 374 time1 = time2 = -1; 375 } 376 377 ++it; 378 } 379 380 return durationUs + (time2 - time1); 381} 382 383// A cheaper but less precise version of getBufferedDurationUs that we would like to use in 384// LiveSession::dequeueAccessUnit to trigger downwards adaptation. 385int64_t AnotherPacketSource::getEstimatedDurationUs() { 386 Mutex::Autolock autoLock(mLock); 387 if (mBuffers.empty()) { 388 return 0; 389 } 390 391 if (mQueuedDiscontinuityCount > 0) { 392 status_t finalResult; 393 return getBufferedDurationUs_l(&finalResult); 394 } 395 396 List<sp<ABuffer> >::iterator it = mBuffers.begin(); 397 sp<ABuffer> buffer = *it; 398 399 int64_t startTimeUs; 400 buffer->meta()->findInt64("timeUs", &startTimeUs); 401 if (startTimeUs < 0) { 402 return 0; 403 } 404 405 it = mBuffers.end(); 406 --it; 407 buffer = *it; 408 409 int64_t endTimeUs; 410 buffer->meta()->findInt64("timeUs", &endTimeUs); 411 if (endTimeUs < 0) { 412 return 0; 413 } 414 415 int64_t diffUs; 416 if (endTimeUs > startTimeUs) { 417 diffUs = endTimeUs - startTimeUs; 418 } else { 419 diffUs = startTimeUs - endTimeUs; 420 } 421 return diffUs; 422} 423 424status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) { 425 *timeUs = 0; 426 427 Mutex::Autolock autoLock(mLock); 428 429 if (mBuffers.empty()) { 430 return mEOSResult != OK ? 
mEOSResult : -EWOULDBLOCK; 431 } 432 433 sp<ABuffer> buffer = *mBuffers.begin(); 434 CHECK(buffer->meta()->findInt64("timeUs", timeUs)); 435 436 return OK; 437} 438 439bool AnotherPacketSource::isFinished(int64_t duration) const { 440 if (duration > 0) { 441 int64_t diff = duration - mLastQueuedTimeUs; 442 if (diff < kNearEOSMarkUs && diff > -kNearEOSMarkUs) { 443 ALOGV("Detecting EOS due to near end"); 444 return true; 445 } 446 } 447 return (mEOSResult != OK); 448} 449 450sp<AMessage> AnotherPacketSource::getLatestEnqueuedMeta() { 451 Mutex::Autolock autoLock(mLock); 452 return mLatestEnqueuedMeta; 453} 454 455sp<AMessage> AnotherPacketSource::getLatestDequeuedMeta() { 456 Mutex::Autolock autoLock(mLock); 457 return mLatestDequeuedMeta; 458} 459 460void AnotherPacketSource::enable(bool enable) { 461 Mutex::Autolock autoLock(mLock); 462 mEnabled = enable; 463} 464 465/* 466 * returns the sample meta that's delayUs after queue head 467 * (NULL if such sample is unavailable) 468 */ 469sp<AMessage> AnotherPacketSource::getMetaAfterLastDequeued(int64_t delayUs) { 470 Mutex::Autolock autoLock(mLock); 471 int64_t firstUs = -1; 472 int64_t lastUs = -1; 473 int64_t durationUs = 0; 474 475 List<sp<ABuffer> >::iterator it; 476 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 477 const sp<ABuffer> &buffer = *it; 478 int32_t discontinuity; 479 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 480 durationUs += lastUs - firstUs; 481 firstUs = -1; 482 lastUs = -1; 483 continue; 484 } 485 int64_t timeUs; 486 if (buffer->meta()->findInt64("timeUs", &timeUs)) { 487 if (firstUs < 0) { 488 firstUs = timeUs; 489 } 490 if (lastUs < 0 || timeUs > lastUs) { 491 lastUs = timeUs; 492 } 493 if (durationUs + (lastUs - firstUs) >= delayUs) { 494 return buffer->meta(); 495 } 496 } 497 } 498 return NULL; 499} 500 501/* 502 * removes samples with time equal or after meta 503 */ 504void AnotherPacketSource::trimBuffersAfterMeta( 505 const sp<AMessage> &meta) { 506 if 
(meta == NULL) { 507 ALOGW("trimming with NULL meta, ignoring"); 508 return; 509 } 510 511 Mutex::Autolock autoLock(mLock); 512 if (mBuffers.empty()) { 513 return; 514 } 515 516 HLSTime stopTime(meta); 517 ALOGV("trimBuffersAfterMeta: discontinuitySeq %zu, timeUs %lld", 518 stopTime.mSeq, (long long)stopTime.mTimeUs); 519 520 List<sp<ABuffer> >::iterator it; 521 sp<AMessage> newLatestEnqueuedMeta = NULL; 522 int64_t newLastQueuedTimeUs = 0; 523 size_t newDiscontinuityCount = 0; 524 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 525 const sp<ABuffer> &buffer = *it; 526 int32_t discontinuity; 527 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 528 newDiscontinuityCount++; 529 continue; 530 } 531 532 HLSTime curTime(buffer->meta()); 533 if (!(curTime < stopTime)) { 534 ALOGV("trimming from %lld (inclusive) to end", 535 (long long)curTime.mTimeUs); 536 break; 537 } 538 newLatestEnqueuedMeta = buffer->meta(); 539 newLastQueuedTimeUs = curTime.mTimeUs; 540 } 541 mBuffers.erase(it, mBuffers.end()); 542 mLatestEnqueuedMeta = newLatestEnqueuedMeta; 543 mLastQueuedTimeUs = newLastQueuedTimeUs; 544 mQueuedDiscontinuityCount = newDiscontinuityCount; 545} 546 547/* 548 * removes samples with time equal or before meta; 549 * returns first sample left in the queue. 550 * 551 * (for AVC, if trim happens, the samples left will always start 552 * at next IDR.) 
553 */ 554sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta( 555 const sp<AMessage> &meta) { 556 HLSTime startTime(meta); 557 ALOGV("trimBuffersBeforeMeta: discontinuitySeq %zu, timeUs %lld", 558 startTime.mSeq, (long long)startTime.mTimeUs); 559 560 sp<AMessage> firstMeta; 561 Mutex::Autolock autoLock(mLock); 562 if (mBuffers.empty()) { 563 return NULL; 564 } 565 566 sp<MetaData> format; 567 bool isAvc = false; 568 569 List<sp<ABuffer> >::iterator it; 570 size_t discontinuityCount = 0; 571 for (it = mBuffers.begin(); it != mBuffers.end(); ++it) { 572 const sp<ABuffer> &buffer = *it; 573 int32_t discontinuity; 574 if (buffer->meta()->findInt32("discontinuity", &discontinuity)) { 575 format = NULL; 576 isAvc = false; 577 discontinuityCount++; 578 continue; 579 } 580 if (format == NULL) { 581 sp<RefBase> object; 582 if (buffer->meta()->findObject("format", &object)) { 583 const char* mime; 584 format = static_cast<MetaData*>(object.get()); 585 isAvc = format != NULL 586 && format->findCString(kKeyMIMEType, &mime) 587 && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC); 588 } 589 } 590 if (isAvc && !IsIDR(buffer)) { 591 continue; 592 } 593 594 HLSTime curTime(buffer->meta()); 595 if (startTime < curTime) { 596 ALOGV("trimming from beginning to %lld (not inclusive)", 597 (long long)curTime.mTimeUs); 598 firstMeta = buffer->meta(); 599 break; 600 } 601 } 602 mBuffers.erase(mBuffers.begin(), it); 603 mQueuedDiscontinuityCount -= discontinuityCount; 604 mLatestDequeuedMeta = NULL; 605 return firstMeta; 606} 607 608} // namespace android 609