1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.tv.tuner.exoplayer.buffer; 18 19import android.media.MediaCodec; 20import android.os.ConditionVariable; 21import android.os.Handler; 22import android.os.HandlerThread; 23import android.os.Message; 24import android.util.ArraySet; 25import android.util.Log; 26import android.util.Pair; 27 28import com.google.android.exoplayer.MediaFormat; 29import com.google.android.exoplayer.SampleHolder; 30import com.google.android.exoplayer.util.MimeTypes; 31import com.android.tv.common.SoftPreconditions; 32import com.android.tv.tuner.exoplayer.buffer.RecordingSampleBuffer.BufferReason; 33 34import java.io.IOException; 35import java.util.LinkedList; 36import java.util.List; 37import java.util.Set; 38import java.util.concurrent.ConcurrentLinkedQueue; 39 40/** 41 * Handles all {@link SampleChunk} I/O operations. 42 * An I/O dedicated thread handles all I/O operations for synchronization. 43 */ 44public class SampleChunkIoHelper implements Handler.Callback { 45 private static final String TAG = "SampleChunkIoHelper"; 46 47 private static final int MAX_READ_BUFFER_SAMPLES = 3; 48 private static final int READ_RESCHEDULING_DELAY_MS = 10; 49 50 private static final int MSG_OPEN_READ = 1; 51 private static final int MSG_OPEN_WRITE = 2; 52 private static final int MSG_CLOSE_READ = 3; 53 private static final int MSG_CLOSE_WRITE = 4; 54 private static final int MSG_READ = 5; 55 private static final int MSG_WRITE = 6; 56 private static final int MSG_RELEASE = 7; 57 58 private final long mSampleChunkDurationUs; 59 private final int mTrackCount; 60 private final List<String> mIds; 61 private final List<MediaFormat> mMediaFormats; 62 private final @BufferReason int mBufferReason; 63 private final BufferManager mBufferManager; 64 private final SamplePool mSamplePool; 65 private final IoCallback mIoCallback; 66 67 private Handler mIoHandler; 68 private final ConcurrentLinkedQueue<SampleHolder> mReadSampleBuffers[]; 69 private final ConcurrentLinkedQueue<SampleHolder> mHandlerReadSampleBuffers[]; 70 private final long[] mWriteIndexEndPositionUs; 71 private final long[] mWriteChunkEndPositionUs; 72 private final SampleChunk.IoState[] mReadIoStates; 73 private final SampleChunk.IoState[] mWriteIoStates; 74 private final Set<Integer> mSelectedTracks = new ArraySet<>(); 75 private long mBufferDurationUs = 0; 76 private boolean mWriteEnded; 77 private boolean mErrorNotified; 78 private boolean mFinished; 79 80 /** 81 * A Callback for I/O events. 82 */ 83 public static abstract class IoCallback { 84 85 /** 86 * Called when there is no sample to read. 87 */ 88 public void onIoReachedEos() { 89 } 90 91 /** 92 * Called when there is an irrecoverable error during I/O. 93 */ 94 public void onIoError() { 95 } 96 } 97 98 private class IoParams { 99 private final int index; 100 private final long positionUs; 101 private final SampleHolder sample; 102 private final ConditionVariable conditionVariable; 103 private final ConcurrentLinkedQueue<SampleHolder> readSampleBuffer; 104 105 private IoParams(int index, long positionUs, SampleHolder sample, 106 ConditionVariable conditionVariable, 107 ConcurrentLinkedQueue<SampleHolder> readSampleBuffer) { 108 this.index = index; 109 this.positionUs = positionUs; 110 this.sample = sample; 111 this.conditionVariable = conditionVariable; 112 this.readSampleBuffer = readSampleBuffer; 113 } 114 } 115 116 /** 117 * Creates {@link SampleChunk} I/O handler. 118 * 119 * @param ids track names 120 * @param mediaFormats {@link android.media.MediaFormat} for each track 121 * @param bufferReason reason to be buffered 122 * @param bufferManager manager of {@link SampleChunk} collections 123 * @param samplePool allocator for a sample 124 * @param ioCallback listeners for I/O events 125 */ 126 public SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats, 127 @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool, 128 IoCallback ioCallback) { 129 mTrackCount = ids.size(); 130 mIds = ids; 131 mMediaFormats = mediaFormats; 132 mBufferReason = bufferReason; 133 mBufferManager = bufferManager; 134 mSamplePool = samplePool; 135 mIoCallback = ioCallback; 136 137 mReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount]; 138 mHandlerReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount]; 139 mWriteIndexEndPositionUs = new long[mTrackCount]; 140 mWriteChunkEndPositionUs = new long[mTrackCount]; 141 mReadIoStates = new SampleChunk.IoState[mTrackCount]; 142 mWriteIoStates = new SampleChunk.IoState[mTrackCount]; 143 144 // Small chunk duration for live playback will give more fine grained storage usage 145 // and eviction handling for trickplay. 146 mSampleChunkDurationUs = 147 bufferReason == RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK ? 148 RecordingSampleBuffer.MIN_SEEK_DURATION_US : 149 RecordingSampleBuffer.RECORDING_CHUNK_DURATION_US; 150 for (int i = 0; i < mTrackCount; ++i) { 151 mWriteIndexEndPositionUs[i] = RecordingSampleBuffer.MIN_SEEK_DURATION_US; 152 mWriteChunkEndPositionUs[i] = mSampleChunkDurationUs; 153 mReadIoStates[i] = new SampleChunk.IoState(); 154 mWriteIoStates[i] = new SampleChunk.IoState(); 155 } 156 } 157 158 /** 159 * Prepares and initializes for I/O operations. 160 * 161 * @throws IOException 162 */ 163 public void init() throws IOException { 164 HandlerThread handlerThread = new HandlerThread(TAG); 165 handlerThread.start(); 166 mIoHandler = new Handler(handlerThread.getLooper(), this); 167 if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDED_PLAYBACK) { 168 for (int i = 0; i < mTrackCount; ++i) { 169 mBufferManager.loadTrackFromStorage(mIds.get(i), mSamplePool); 170 } 171 mWriteEnded = true; 172 } else { 173 for (int i = 0; i < mTrackCount; ++i) { 174 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_WRITE, i)); 175 } 176 } 177 } 178 179 /** 180 * Reads a sample if it is available. 181 * 182 * @param index track index 183 * @return {@code null} if a sample is not available, otherwise returns a sample 184 */ 185 public SampleHolder readSample(int index) { 186 SampleHolder sample = mReadSampleBuffers[index].poll(); 187 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index)); 188 return sample; 189 } 190 191 /** 192 * Writes a sample. 193 * 194 * @param index track index 195 * @param sample to write 196 * @param conditionVariable which will be wait until the write is finished 197 * @throws IOException 198 */ 199 public void writeSample(int index, SampleHolder sample, 200 ConditionVariable conditionVariable) throws IOException { 201 if (mErrorNotified) { 202 throw new IOException("Storage I/O error happened"); 203 } 204 conditionVariable.close(); 205 IoParams params = new IoParams(index, 0, sample, conditionVariable, null); 206 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_WRITE, params)); 207 } 208 209 /** 210 * Starts read from the specified position. 211 * 212 * @param index track index 213 * @param positionUs the specified position 214 */ 215 public void openRead(int index, long positionUs) { 216 // Old mReadSampleBuffers may have a pending read. 217 mReadSampleBuffers[index] = new ConcurrentLinkedQueue<>(); 218 IoParams params = new IoParams(index, positionUs, null, null, mReadSampleBuffers[index]); 219 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_READ, params)); 220 } 221 222 /** 223 * Closes read from the specified track. 224 * 225 * @param index track index 226 */ 227 public void closeRead(int index) { 228 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_CLOSE_READ, index)); 229 } 230 231 /** 232 * Notifies writes are finished. 233 */ 234 public void closeWrite() { 235 mIoHandler.sendEmptyMessage(MSG_CLOSE_WRITE); 236 } 237 238 /** 239 * Finishes I/O operations and releases all the resources. 240 * @throws IOException 241 */ 242 public void release() throws IOException { 243 if (mIoHandler == null) { 244 return; 245 } 246 // Finishes all I/O operations. 247 ConditionVariable conditionVariable = new ConditionVariable(); 248 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_RELEASE, conditionVariable)); 249 conditionVariable.block(); 250 251 for (int i = 0; i < mTrackCount; ++i) { 252 mBufferManager.unregisterChunkEvictedListener(mIds.get(i)); 253 } 254 try { 255 if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDING && mTrackCount > 0) { 256 // Saves meta information for recording. 257 List<BufferManager.TrackFormat> audios = new LinkedList<>(); 258 List<BufferManager.TrackFormat> videos = new LinkedList<>(); 259 for (int i = 0; i < mTrackCount; ++i) { 260 android.media.MediaFormat format = 261 mMediaFormats.get(i).getFrameworkMediaFormatV16(); 262 format.setLong(android.media.MediaFormat.KEY_DURATION, mBufferDurationUs); 263 if (MimeTypes.isAudio(mMediaFormats.get(i).mimeType)) { 264 audios.add(new BufferManager.TrackFormat(mIds.get(i), format)); 265 } else if (MimeTypes.isVideo(mMediaFormats.get(i).mimeType)) { 266 videos.add(new BufferManager.TrackFormat(mIds.get(i), format)); 267 } 268 } 269 mBufferManager.writeMetaFiles(audios, videos); 270 } 271 } finally { 272 mBufferManager.release(); 273 mIoHandler.getLooper().quitSafely(); 274 } 275 } 276 277 @Override 278 public boolean handleMessage(Message message) { 279 if (mFinished) { 280 return true; 281 } 282 releaseEvictedChunks(); 283 try { 284 switch (message.what) { 285 case MSG_OPEN_READ: 286 doOpenRead((IoParams) message.obj); 287 return true; 288 case MSG_OPEN_WRITE: 289 doOpenWrite((int) message.obj); 290 return true; 291 case MSG_CLOSE_READ: 292 doCloseRead((int) message.obj); 293 return true; 294 case MSG_CLOSE_WRITE: 295 doCloseWrite(); 296 return true; 297 case MSG_READ: 298 doRead((int) message.obj); 299 return true; 300 case MSG_WRITE: 301 doWrite((IoParams) message.obj); 302 // Since only write will increase storage, eviction will be handled here. 303 return true; 304 case MSG_RELEASE: 305 doRelease((ConditionVariable) message.obj); 306 return true; 307 } 308 } catch (IOException e) { 309 mIoCallback.onIoError(); 310 mErrorNotified = true; 311 Log.e(TAG, "IoException happened", e); 312 return true; 313 } 314 return false; 315 } 316 317 private void doOpenRead(IoParams params) throws IOException { 318 int index = params.index; 319 mIoHandler.removeMessages(MSG_READ, index); 320 Pair<SampleChunk, Integer> readPosition = 321 mBufferManager.getReadFile(mIds.get(index), params.positionUs); 322 if (readPosition == null) { 323 String errorMessage = "Chunk ID:" + mIds.get(index) + " pos:" + params.positionUs 324 + "is not found"; 325 SoftPreconditions.checkNotNull(readPosition, TAG, errorMessage); 326 throw new IOException(errorMessage); 327 } 328 mSelectedTracks.add(index); 329 mReadIoStates[index].openRead(readPosition.first, (long) readPosition.second); 330 if (mHandlerReadSampleBuffers[index] != null) { 331 SampleHolder sample; 332 while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) { 333 mSamplePool.releaseSample(sample); 334 } 335 } 336 mHandlerReadSampleBuffers[index] = params.readSampleBuffer; 337 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index)); 338 } 339 340 private void doOpenWrite(int index) throws IOException { 341 SampleChunk chunk = mBufferManager.createNewWriteFileIfNeeded(mIds.get(index), 0, 342 mSamplePool, null, 0); 343 mWriteIoStates[index].openWrite(chunk); 344 } 345 346 private void doCloseRead(int index) { 347 mSelectedTracks.remove(index); 348 if (mHandlerReadSampleBuffers[index] != null) { 349 SampleHolder sample; 350 while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) { 351 mSamplePool.releaseSample(sample); 352 } 353 } 354 mIoHandler.removeMessages(MSG_READ, index); 355 } 356 357 private void doRead(int index) throws IOException { 358 mIoHandler.removeMessages(MSG_READ, index); 359 if (mHandlerReadSampleBuffers[index].size() >= MAX_READ_BUFFER_SAMPLES) { 360 // If enough samples are buffered, try again few moments later hoping that 361 // buffered samples are consumed. 362 mIoHandler.sendMessageDelayed( 363 mIoHandler.obtainMessage(MSG_READ, index), READ_RESCHEDULING_DELAY_MS); 364 } else { 365 if (mReadIoStates[index].isReadFinished()) { 366 for (int i = 0; i < mTrackCount; ++i) { 367 if (!mReadIoStates[i].isReadFinished()) { 368 return; 369 } 370 } 371 mIoCallback.onIoReachedEos(); 372 return; 373 } 374 SampleHolder sample = mReadIoStates[index].read(); 375 if (sample != null) { 376 mHandlerReadSampleBuffers[index].offer(sample); 377 } else { 378 // Read reached write but write is not finished yet --- wait a few moments to 379 // see if another sample is written. 380 mIoHandler.sendMessageDelayed( 381 mIoHandler.obtainMessage(MSG_READ, index), 382 READ_RESCHEDULING_DELAY_MS); 383 } 384 } 385 } 386 387 private void doWrite(IoParams params) throws IOException { 388 try { 389 if (mWriteEnded) { 390 SoftPreconditions.checkState(false); 391 return; 392 } 393 int index = params.index; 394 SampleHolder sample = params.sample; 395 SampleChunk nextChunk = null; 396 if ((sample.flags & MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) { 397 if (sample.timeUs > mBufferDurationUs) { 398 mBufferDurationUs = sample.timeUs; 399 } 400 if (sample.timeUs >= mWriteIndexEndPositionUs[index]) { 401 SampleChunk currentChunk = sample.timeUs >= mWriteChunkEndPositionUs[index] ? 402 null : mWriteIoStates[params.index].getChunk(); 403 int currentOffset = (int) mWriteIoStates[params.index].getOffset(); 404 nextChunk = mBufferManager.createNewWriteFileIfNeeded( 405 mIds.get(index), mWriteIndexEndPositionUs[index], mSamplePool, 406 currentChunk, currentOffset); 407 mWriteIndexEndPositionUs[index] = 408 ((sample.timeUs / RecordingSampleBuffer.MIN_SEEK_DURATION_US) + 1) * 409 RecordingSampleBuffer.MIN_SEEK_DURATION_US; 410 if (nextChunk != null) { 411 mWriteChunkEndPositionUs[index] = 412 ((sample.timeUs / mSampleChunkDurationUs) + 1) 413 * mSampleChunkDurationUs; 414 } 415 } 416 } 417 mWriteIoStates[params.index].write(params.sample, nextChunk); 418 } finally { 419 params.conditionVariable.open(); 420 } 421 } 422 423 private void doCloseWrite() throws IOException { 424 if (mWriteEnded) { 425 return; 426 } 427 mWriteEnded = true; 428 boolean readFinished = true; 429 for (int i = 0; i < mTrackCount; ++i) { 430 readFinished = readFinished && mReadIoStates[i].isReadFinished(); 431 mWriteIoStates[i].closeWrite(); 432 } 433 if (readFinished) { 434 mIoCallback.onIoReachedEos(); 435 } 436 } 437 438 private void doRelease(ConditionVariable conditionVariable) { 439 mIoHandler.removeCallbacksAndMessages(null); 440 mFinished = true; 441 conditionVariable.open(); 442 mSelectedTracks.clear(); 443 } 444 445 private void releaseEvictedChunks() { 446 if (mBufferReason != RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK 447 || mSelectedTracks.isEmpty()) { 448 return; 449 } 450 long currentStartPositionUs = Long.MAX_VALUE; 451 for (int trackIndex : mSelectedTracks) { 452 currentStartPositionUs = Math.min(currentStartPositionUs, 453 mReadIoStates[trackIndex].getStartPositionUs()); 454 } 455 for (int i = 0; i < mTrackCount; ++i) { 456 long evictEndPositionUs = Math.min(mBufferManager.getStartPositionUs(mIds.get(i)), 457 currentStartPositionUs); 458 mBufferManager.evictChunks(mIds.get(i), evictEndPositionUs); 459 } 460 } 461}