1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.tv.tuner.exoplayer.buffer;
18
19import android.media.MediaCodec;
20import android.os.ConditionVariable;
21import android.os.Handler;
22import android.os.HandlerThread;
23import android.os.Message;
24import android.util.ArraySet;
25import android.util.Log;
26import android.util.Pair;
27
28import com.google.android.exoplayer.MediaFormat;
29import com.google.android.exoplayer.SampleHolder;
30import com.google.android.exoplayer.util.MimeTypes;
31import com.android.tv.common.SoftPreconditions;
32import com.android.tv.tuner.exoplayer.buffer.RecordingSampleBuffer.BufferReason;
33
34import java.io.IOException;
35import java.util.LinkedList;
36import java.util.List;
37import java.util.Set;
38import java.util.concurrent.ConcurrentLinkedQueue;
39
40/**
41 * Handles all {@link SampleChunk} I/O operations.
42 * An I/O dedicated thread handles all I/O operations for synchronization.
43 */
44public class SampleChunkIoHelper implements Handler.Callback {
45    private static final String TAG = "SampleChunkIoHelper";
46
47    private static final int MAX_READ_BUFFER_SAMPLES = 3;
48    private static final int READ_RESCHEDULING_DELAY_MS = 10;
49
50    private static final int MSG_OPEN_READ = 1;
51    private static final int MSG_OPEN_WRITE = 2;
52    private static final int MSG_CLOSE_READ = 3;
53    private static final int MSG_CLOSE_WRITE = 4;
54    private static final int MSG_READ = 5;
55    private static final int MSG_WRITE = 6;
56    private static final int MSG_RELEASE = 7;
57
58    private final long mSampleChunkDurationUs;
59    private final int mTrackCount;
60    private final List<String> mIds;
61    private final List<MediaFormat> mMediaFormats;
62    private final @BufferReason int mBufferReason;
63    private final BufferManager mBufferManager;
64    private final SamplePool mSamplePool;
65    private final IoCallback mIoCallback;
66
67    private Handler mIoHandler;
68    private final ConcurrentLinkedQueue<SampleHolder> mReadSampleBuffers[];
69    private final ConcurrentLinkedQueue<SampleHolder> mHandlerReadSampleBuffers[];
70    private final long[] mWriteIndexEndPositionUs;
71    private final long[] mWriteChunkEndPositionUs;
72    private final SampleChunk.IoState[] mReadIoStates;
73    private final SampleChunk.IoState[] mWriteIoStates;
74    private final Set<Integer> mSelectedTracks = new ArraySet<>();
75    private long mBufferDurationUs = 0;
76    private boolean mWriteEnded;
77    private boolean mErrorNotified;
78    private boolean mFinished;
79
80    /**
81     * A Callback for I/O events.
82     */
83    public static abstract class IoCallback {
84
85        /**
86         * Called when there is no sample to read.
87         */
88        public void onIoReachedEos() {
89        }
90
91        /**
92         * Called when there is an irrecoverable error during I/O.
93         */
94        public void onIoError() {
95        }
96    }
97
98    private class IoParams {
99        private final int index;
100        private final long positionUs;
101        private final SampleHolder sample;
102        private final ConditionVariable conditionVariable;
103        private final ConcurrentLinkedQueue<SampleHolder> readSampleBuffer;
104
105        private IoParams(int index, long positionUs, SampleHolder sample,
106                ConditionVariable conditionVariable,
107                ConcurrentLinkedQueue<SampleHolder> readSampleBuffer) {
108            this.index = index;
109            this.positionUs = positionUs;
110            this.sample = sample;
111            this.conditionVariable = conditionVariable;
112            this.readSampleBuffer = readSampleBuffer;
113        }
114    }
115
116    /**
117     * Creates {@link SampleChunk} I/O handler.
118     *
119     * @param ids track names
120     * @param mediaFormats {@link android.media.MediaFormat} for each track
121     * @param bufferReason reason to be buffered
122     * @param bufferManager manager of {@link SampleChunk} collections
123     * @param samplePool allocator for a sample
124     * @param ioCallback listeners for I/O events
125     */
126    public SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats,
127            @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool,
128            IoCallback ioCallback) {
129        mTrackCount = ids.size();
130        mIds = ids;
131        mMediaFormats = mediaFormats;
132        mBufferReason = bufferReason;
133        mBufferManager = bufferManager;
134        mSamplePool = samplePool;
135        mIoCallback = ioCallback;
136
137        mReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
138        mHandlerReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
139        mWriteIndexEndPositionUs = new long[mTrackCount];
140        mWriteChunkEndPositionUs = new long[mTrackCount];
141        mReadIoStates = new SampleChunk.IoState[mTrackCount];
142        mWriteIoStates = new SampleChunk.IoState[mTrackCount];
143
144        // Small chunk duration for live playback will give more fine grained storage usage
145        // and eviction handling for trickplay.
146        mSampleChunkDurationUs =
147                bufferReason == RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK ?
148                        RecordingSampleBuffer.MIN_SEEK_DURATION_US :
149                        RecordingSampleBuffer.RECORDING_CHUNK_DURATION_US;
150        for (int i = 0; i < mTrackCount; ++i) {
151            mWriteIndexEndPositionUs[i] = RecordingSampleBuffer.MIN_SEEK_DURATION_US;
152            mWriteChunkEndPositionUs[i] = mSampleChunkDurationUs;
153            mReadIoStates[i] = new SampleChunk.IoState();
154            mWriteIoStates[i] = new SampleChunk.IoState();
155        }
156    }
157
158    /**
159     * Prepares and initializes for I/O operations.
160     *
161     * @throws IOException
162     */
163    public void init() throws IOException {
164        HandlerThread handlerThread = new HandlerThread(TAG);
165        handlerThread.start();
166        mIoHandler = new Handler(handlerThread.getLooper(), this);
167        if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDED_PLAYBACK) {
168            for (int i = 0; i < mTrackCount; ++i) {
169                mBufferManager.loadTrackFromStorage(mIds.get(i), mSamplePool);
170            }
171            mWriteEnded = true;
172        } else {
173            for (int i = 0; i < mTrackCount; ++i) {
174                mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_WRITE, i));
175            }
176        }
177    }
178
179    /**
180     * Reads a sample if it is available.
181     *
182     * @param index track index
183     * @return {@code null} if a sample is not available, otherwise returns a sample
184     */
185    public SampleHolder readSample(int index) {
186        SampleHolder sample = mReadSampleBuffers[index].poll();
187        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
188        return sample;
189    }
190
191    /**
192     * Writes a sample.
193     *
194     * @param index track index
195     * @param sample to write
196     * @param conditionVariable which will be wait until the write is finished
197     * @throws IOException
198     */
199    public void writeSample(int index, SampleHolder sample,
200            ConditionVariable conditionVariable) throws IOException {
201        if (mErrorNotified) {
202            throw new IOException("Storage I/O error happened");
203        }
204        conditionVariable.close();
205        IoParams params = new IoParams(index, 0, sample, conditionVariable, null);
206        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_WRITE, params));
207    }
208
209    /**
210     * Starts read from the specified position.
211     *
212     * @param index track index
213     * @param positionUs the specified position
214     */
215    public void openRead(int index, long positionUs) {
216        // Old mReadSampleBuffers may have a pending read.
217        mReadSampleBuffers[index] = new ConcurrentLinkedQueue<>();
218        IoParams params = new IoParams(index, positionUs, null, null, mReadSampleBuffers[index]);
219        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_READ, params));
220    }
221
222    /**
223     * Closes read from the specified track.
224     *
225     * @param index track index
226     */
227    public void closeRead(int index) {
228        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_CLOSE_READ, index));
229    }
230
231    /**
232     * Notifies writes are finished.
233     */
234    public void closeWrite() {
235        mIoHandler.sendEmptyMessage(MSG_CLOSE_WRITE);
236    }
237
238    /**
239     * Finishes I/O operations and releases all the resources.
240     * @throws IOException
241     */
242    public void release() throws IOException {
243        if (mIoHandler == null) {
244            return;
245        }
246        // Finishes all I/O operations.
247        ConditionVariable conditionVariable = new ConditionVariable();
248        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_RELEASE, conditionVariable));
249        conditionVariable.block();
250
251        for (int i = 0; i < mTrackCount; ++i) {
252            mBufferManager.unregisterChunkEvictedListener(mIds.get(i));
253        }
254        try {
255            if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDING && mTrackCount > 0) {
256                // Saves meta information for recording.
257                List<BufferManager.TrackFormat> audios = new LinkedList<>();
258                List<BufferManager.TrackFormat> videos = new LinkedList<>();
259                for (int i = 0; i < mTrackCount; ++i) {
260                    android.media.MediaFormat format =
261                            mMediaFormats.get(i).getFrameworkMediaFormatV16();
262                    format.setLong(android.media.MediaFormat.KEY_DURATION, mBufferDurationUs);
263                    if (MimeTypes.isAudio(mMediaFormats.get(i).mimeType)) {
264                        audios.add(new BufferManager.TrackFormat(mIds.get(i), format));
265                    } else if (MimeTypes.isVideo(mMediaFormats.get(i).mimeType)) {
266                        videos.add(new BufferManager.TrackFormat(mIds.get(i), format));
267                    }
268                }
269                mBufferManager.writeMetaFiles(audios, videos);
270            }
271        } finally {
272            mBufferManager.release();
273            mIoHandler.getLooper().quitSafely();
274        }
275    }
276
277    @Override
278    public boolean handleMessage(Message message) {
279        if (mFinished) {
280            return true;
281        }
282        releaseEvictedChunks();
283        try {
284            switch (message.what) {
285                case MSG_OPEN_READ:
286                    doOpenRead((IoParams) message.obj);
287                    return true;
288                case MSG_OPEN_WRITE:
289                    doOpenWrite((int) message.obj);
290                    return true;
291                case MSG_CLOSE_READ:
292                    doCloseRead((int) message.obj);
293                    return true;
294                case MSG_CLOSE_WRITE:
295                    doCloseWrite();
296                    return true;
297                case MSG_READ:
298                    doRead((int) message.obj);
299                    return true;
300                case MSG_WRITE:
301                    doWrite((IoParams) message.obj);
302                    // Since only write will increase storage, eviction will be handled here.
303                    return true;
304                case MSG_RELEASE:
305                    doRelease((ConditionVariable) message.obj);
306                    return true;
307            }
308        } catch (IOException e) {
309            mIoCallback.onIoError();
310            mErrorNotified = true;
311            Log.e(TAG, "IoException happened", e);
312            return true;
313        }
314        return false;
315    }
316
317    private void doOpenRead(IoParams params) throws IOException {
318        int index = params.index;
319        mIoHandler.removeMessages(MSG_READ, index);
320        Pair<SampleChunk, Integer> readPosition =
321                mBufferManager.getReadFile(mIds.get(index), params.positionUs);
322        if (readPosition == null) {
323            String errorMessage = "Chunk ID:" + mIds.get(index) + " pos:" + params.positionUs
324                    + "is not found";
325            SoftPreconditions.checkNotNull(readPosition, TAG, errorMessage);
326            throw new IOException(errorMessage);
327        }
328        mSelectedTracks.add(index);
329        mReadIoStates[index].openRead(readPosition.first, (long) readPosition.second);
330        if (mHandlerReadSampleBuffers[index] != null) {
331            SampleHolder sample;
332            while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) {
333                mSamplePool.releaseSample(sample);
334            }
335        }
336        mHandlerReadSampleBuffers[index] = params.readSampleBuffer;
337        mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
338    }
339
340    private void doOpenWrite(int index) throws IOException {
341        SampleChunk chunk = mBufferManager.createNewWriteFileIfNeeded(mIds.get(index), 0,
342                mSamplePool, null, 0);
343        mWriteIoStates[index].openWrite(chunk);
344    }
345
346    private void doCloseRead(int index) {
347        mSelectedTracks.remove(index);
348        if (mHandlerReadSampleBuffers[index] != null) {
349            SampleHolder sample;
350            while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) {
351                mSamplePool.releaseSample(sample);
352            }
353        }
354        mIoHandler.removeMessages(MSG_READ, index);
355    }
356
357    private void doRead(int index) throws IOException {
358        mIoHandler.removeMessages(MSG_READ, index);
359        if (mHandlerReadSampleBuffers[index].size() >= MAX_READ_BUFFER_SAMPLES) {
360            // If enough samples are buffered, try again few moments later hoping that
361            // buffered samples are consumed.
362            mIoHandler.sendMessageDelayed(
363                    mIoHandler.obtainMessage(MSG_READ, index), READ_RESCHEDULING_DELAY_MS);
364        } else {
365            if (mReadIoStates[index].isReadFinished()) {
366                for (int i = 0; i < mTrackCount; ++i) {
367                    if (!mReadIoStates[i].isReadFinished()) {
368                        return;
369                    }
370                }
371                mIoCallback.onIoReachedEos();
372                return;
373            }
374            SampleHolder sample = mReadIoStates[index].read();
375            if (sample != null) {
376                mHandlerReadSampleBuffers[index].offer(sample);
377            } else {
378                // Read reached write but write is not finished yet --- wait a few moments to
379                // see if another sample is written.
380                mIoHandler.sendMessageDelayed(
381                        mIoHandler.obtainMessage(MSG_READ, index),
382                        READ_RESCHEDULING_DELAY_MS);
383            }
384        }
385    }
386
387    private void doWrite(IoParams params) throws IOException {
388        try {
389            if (mWriteEnded) {
390                SoftPreconditions.checkState(false);
391                return;
392            }
393            int index = params.index;
394            SampleHolder sample = params.sample;
395            SampleChunk nextChunk = null;
396            if ((sample.flags & MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) {
397                if (sample.timeUs > mBufferDurationUs) {
398                    mBufferDurationUs = sample.timeUs;
399                }
400                if (sample.timeUs >= mWriteIndexEndPositionUs[index]) {
401                    SampleChunk currentChunk = sample.timeUs >= mWriteChunkEndPositionUs[index] ?
402                            null : mWriteIoStates[params.index].getChunk();
403                    int currentOffset = (int) mWriteIoStates[params.index].getOffset();
404                    nextChunk = mBufferManager.createNewWriteFileIfNeeded(
405                            mIds.get(index), mWriteIndexEndPositionUs[index], mSamplePool,
406                            currentChunk, currentOffset);
407                    mWriteIndexEndPositionUs[index] =
408                            ((sample.timeUs / RecordingSampleBuffer.MIN_SEEK_DURATION_US) + 1) *
409                                    RecordingSampleBuffer.MIN_SEEK_DURATION_US;
410                    if (nextChunk != null) {
411                        mWriteChunkEndPositionUs[index] =
412                                ((sample.timeUs / mSampleChunkDurationUs) + 1)
413                                        * mSampleChunkDurationUs;
414                    }
415                }
416            }
417            mWriteIoStates[params.index].write(params.sample, nextChunk);
418        } finally {
419            params.conditionVariable.open();
420        }
421    }
422
423    private void doCloseWrite() throws IOException {
424        if (mWriteEnded) {
425            return;
426        }
427        mWriteEnded = true;
428        boolean readFinished = true;
429        for (int i = 0; i < mTrackCount; ++i) {
430            readFinished = readFinished && mReadIoStates[i].isReadFinished();
431            mWriteIoStates[i].closeWrite();
432        }
433        if (readFinished) {
434            mIoCallback.onIoReachedEos();
435        }
436    }
437
438    private void doRelease(ConditionVariable conditionVariable) {
439        mIoHandler.removeCallbacksAndMessages(null);
440        mFinished = true;
441        conditionVariable.open();
442        mSelectedTracks.clear();
443    }
444
445    private void releaseEvictedChunks() {
446        if (mBufferReason != RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK
447                || mSelectedTracks.isEmpty()) {
448            return;
449        }
450        long currentStartPositionUs = Long.MAX_VALUE;
451        for (int trackIndex : mSelectedTracks) {
452            currentStartPositionUs = Math.min(currentStartPositionUs,
453                    mReadIoStates[trackIndex].getStartPositionUs());
454        }
455        for (int i = 0; i < mTrackCount; ++i) {
456            long evictEndPositionUs = Math.min(mBufferManager.getStartPositionUs(mIds.get(i)),
457                    currentStartPositionUs);
458            mBufferManager.evictChunks(mIds.get(i), evictEndPositionUs);
459        }
460    }
461}