1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.tv.tuner.source;
18
19import android.os.Environment;
20import android.util.Log;
21import android.util.SparseBooleanArray;
22
23import com.google.android.exoplayer.C;
24import com.google.android.exoplayer.upstream.DataSpec;
25import com.android.tv.common.SoftPreconditions;
26import com.android.tv.tuner.ChannelScanFileParser.ScanChannel;
27import com.android.tv.tuner.data.TunerChannel;
28import com.android.tv.tuner.ts.TsParser;
29import com.android.tv.tuner.tvinput.EventDetector;
30import com.android.tv.tuner.tvinput.FileSourceEventDetector;
31
32import java.io.BufferedInputStream;
33import java.io.File;
34import java.io.FileInputStream;
35import java.io.IOException;
36import java.util.List;
37import java.util.concurrent.atomic.AtomicLong;
38
39/**
40 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
41 * generated by capturing TV signal.
42 */
43public class FileTsStreamer implements TsStreamer {
    // Tag passed to android.util.Log for every message emitted by this class.
    private static final String TAG = "FileTsStreamer";

    // MPEG-2 TS framing used by StreamProvider: the file is processed in fixed-size packets and
    // a packet is only accepted when its first byte equals the sync byte.
    private static final int TS_PACKET_SIZE = 188;
    private static final int TS_SYNC_BYTE = 0x47;
    private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
    // Size of one read from the file: 18,800 bytes, i.e. 100 TS packets.
    private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
    // Capacity of the circular buffer shared by the streaming thread and readers: 7,520,000 bytes.
    private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
    // Head room (1,880,000 bytes): the streaming thread stops fetching once the amount of unread
    // data plus this padding would exceed CIRCULAR_BUFFER_SIZE.
    private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
    // Maximum time a single wait for new data in readAt() may last.
    private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
    // Pause of the streaming thread after a read that produced no data.
    private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
    // Directory ("Streams" under the external storage root) that holds the local stream files.
    private static final String FILE_DIR =
            new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();

    // Virtual frequency base used for file-based source
    public static final int FREQ_BASE = 100;

    // Monitor guarding mCircularBuffer, mBytesFetched, mLastReadPosition and mStreaming; it is
    // also the wait/notify channel between the streaming thread and readAt() callers.
    private final Object mCircularBufferMonitor = new Object();
    private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
    private final FileSourceEventDetector mEventDetector;

    // Total number of filtered bytes written into the circular buffer since streaming started.
    private long mBytesFetched;
    // End position (pos + amount) of the most recent successful readAt() call.
    private long mLastReadPosition;
    // True between a successful startStream() and the following stopStream().
    private boolean mStreaming;

    // Not guarded by the monitor; assigned in startStream().
    private Thread mStreamingThread;
    private StreamProvider mSource;
70
71    public static class FileDataSource extends TsDataSource {
72        private final FileTsStreamer mTsStreamer;
73        private final AtomicLong mLastReadPosition = new AtomicLong(0);
74        private long mStartBufferedPosition;
75
76        private FileDataSource(FileTsStreamer tsStreamer) {
77            mTsStreamer = tsStreamer;
78            mStartBufferedPosition = tsStreamer.getBufferedPosition();
79        }
80
81        @Override
82        public long getBufferedPosition() {
83            return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
84        }
85
86        @Override
87        public long getLastReadPosition() {
88            return mLastReadPosition.get();
89        }
90
91        @Override
92        public void shiftStartPosition(long offset) {
93            SoftPreconditions.checkState(mLastReadPosition.get() == 0);
94            SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
95            mStartBufferedPosition += offset;
96        }
97
98        @Override
99        public long open(DataSpec dataSpec) throws IOException {
100            mLastReadPosition.set(0);
101            return C.LENGTH_UNBOUNDED;
102        }
103
104        @Override
105        public void close() {
106        }
107
108        @Override
109        public int read(byte[] buffer, int offset, int readLength) throws IOException {
110            int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
111                    offset, readLength);
112            if (ret > 0) {
113                mLastReadPosition.addAndGet(ret);
114            }
115            return ret;
116        }
117    }
118
119    /**
120     * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
121     * @param eventListener the listener for channel & program information
122     */
123    public FileTsStreamer(EventDetector.EventListener eventListener) {
124        mEventDetector = new FileSourceEventDetector(eventListener);
125    }
126
127    @Override
128    public boolean startStream(ScanChannel channel) {
129        String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
130        mSource = new StreamProvider(filepath);
131        if (!mSource.isReady()) {
132            return false;
133        }
134        mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
135        mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
136        mSource.addPidFilter(TsParser.PAT_PID);
137        synchronized (mCircularBufferMonitor) {
138            if (mStreaming) {
139                return true;
140            }
141            mStreaming = true;
142        }
143
144        mStreamingThread = new StreamingThread();
145        mStreamingThread.start();
146        Log.i(TAG, "Streaming started");
147        return true;
148    }
149
150    @Override
151    public boolean startStream(TunerChannel channel) {
152        Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
153        mSource = new StreamProvider(channel.getFilepath());
154        if (!mSource.isReady()) {
155            return false;
156        }
157        mEventDetector.start(mSource, channel.getProgramNumber());
158        mSource.addPidFilter(channel.getVideoPid());
159        for (Integer i : channel.getAudioPids()) {
160            mSource.addPidFilter(i);
161        }
162        mSource.addPidFilter(channel.getPcrPid());
163        mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
164        mSource.addPidFilter(TsParser.PAT_PID);
165        synchronized (mCircularBufferMonitor) {
166            if (mStreaming) {
167                return true;
168            }
169            mStreaming = true;
170        }
171
172        mStreamingThread = new StreamingThread();
173        mStreamingThread.start();
174        Log.i(TAG, "Streaming started");
175        return true;
176    }
177
178    /**
179     * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
180     * device is overloaded this can take a while, but usually it returns pretty quickly.
181     */
182    @Override
183    public void stopStream() {
184        synchronized (mCircularBufferMonitor) {
185            mStreaming = false;
186            mCircularBufferMonitor.notify();
187        }
188
189        try {
190            if (mStreamingThread != null) {
191                mStreamingThread.join();
192            }
193        } catch (InterruptedException e) {
194            Thread.currentThread().interrupt();
195        }
196    }
197
198    @Override
199    public TsDataSource createDataSource() {
200        return new FileDataSource(this);
201    }
202
203    /**
204     * Returns the current buffered position from the file.
205     * @return the current buffered position
206     */
207    public long getBufferedPosition() {
208        synchronized (mCircularBufferMonitor) {
209            return mBytesFetched;
210        }
211    }
212
213    /**
214     * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID.
215     */
216    public static class StreamProvider {
217        private final String mFilepath;
218        private final SparseBooleanArray mPids = new SparseBooleanArray();
219        private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
220
221        private BufferedInputStream mInputStream;
222
223        private StreamProvider(String filepath) {
224            mFilepath = filepath;
225            open(filepath);
226        }
227
228        private void open(String filepath) {
229            try {
230                mInputStream = new BufferedInputStream(new FileInputStream(filepath));
231            } catch (IOException e) {
232                Log.e(TAG, "Error opening input stream", e);
233                mInputStream = null;
234            }
235        }
236
237        private boolean isReady() {
238            return mInputStream != null;
239        }
240
241        /**
242         * Returns the file path of the MPEG-2 TS file.
243         */
244        public String getFilepath() {
245            return mFilepath;
246        }
247
248        /**
249         * Adds a pid for filtering from the MPEG-2 TS file.
250         */
251        public void addPidFilter(int pid) {
252            mPids.put(pid, true);
253        }
254
255        /**
256         * Returns whether the current pid filter is empty or not.
257         */
258        public boolean isFilterEmpty() {
259            return mPids.size() > 0;
260        }
261
262        /**
263         * Clears the current pid filter.
264         */
265        public void clearPidFilter() {
266            mPids.clear();
267        }
268
269        /**
270         * Returns whether a pid is in the pid filter or not.
271         * @param pid the pid to check
272         */
273        public boolean isInFilter(int pid) {
274            return mPids.get(pid);
275        }
276
277        /**
278         * Reads from the MPEG-2 TS file to buffer.
279         *
280         * @param inputBuffer to read
281         * @return the number of read bytes
282         */
283        private int read(byte[] inputBuffer) {
284            int readSize = readInternal();
285            if (readSize <= 0) {
286                // Reached the end of stream. Restart from the beginning.
287                close();
288                open(mFilepath);
289                if (mInputStream == null) {
290                    return -1;
291                }
292                readSize = readInternal();
293            }
294
295            if (mPreBuffer[0] != TS_SYNC_BYTE) {
296                Log.e(TAG, "Error reading input stream - no TS sync found");
297                return -1;
298            }
299            int filteredSize = 0;
300            for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
301                if (mPreBuffer[i] == TS_SYNC_BYTE) {
302                    int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
303                    if (mPids.get(pid)) {
304                        System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
305                        destPos += TS_PACKET_SIZE;
306                        filteredSize += TS_PACKET_SIZE;
307                    }
308                }
309            }
310            return filteredSize;
311        }
312
313        private int readInternal() {
314            int readSize;
315            try {
316                readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
317            } catch (IOException e) {
318                Log.e(TAG, "Error reading input stream", e);
319                return -1;
320            }
321            return readSize;
322        }
323
324        private void close() {
325            try {
326                mInputStream.close();
327            } catch (IOException e) {
328                Log.e(TAG, "Error closing input stream:", e);
329            }
330            mInputStream = null;
331        }
332    }
333
334    /**
335     * Reads data from internal buffer.
336     * @param pos the position to read from
337     * @param buffer to read
338     * @param offset start position of the read buffer
339     * @param amount number of bytes to read
340     * @return number of read bytes when successful, {@code -1} otherwise
341     * @throws IOException
342     */
343    public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
344        synchronized (mCircularBufferMonitor) {
345            long initialBytesFetched = mBytesFetched;
346            while (mBytesFetched < pos + amount && mStreaming) {
347                try {
348                    mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
349                } catch (InterruptedException e) {
350                    // Wait again.
351                    Thread.currentThread().interrupt();
352                }
353                if (initialBytesFetched == mBytesFetched) {
354                    Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
355
356                    // Returning -1 will make demux report EOS so that the input service can retry
357                    // the playback.
358                    return -1;
359                }
360            }
361            if (!mStreaming) {
362                Log.w(TAG, "Stream is already stopped.");
363                return -1;
364            }
365            if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
366                Log.e(TAG, "Demux is requesting the data which is already overwritten.");
367                return -1;
368            }
369            int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
370            int bytesToCopyInFirstPass = amount;
371            if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
372                bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
373            }
374            System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
375            if (bytesToCopyInFirstPass < amount) {
376                System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass,
377                        amount - bytesToCopyInFirstPass);
378            }
379            mLastReadPosition = pos + amount;
380            mCircularBufferMonitor.notify();
381            return amount;
382        }
383    }
384
385    /**
386     * Adds {@link ScanChannel} instance for local files.
387     *
388     * @param output a list of channels where the results will be placed in
389     */
390    public static void addLocalStreamFiles(List<ScanChannel> output) {
391        File dir = new File(FILE_DIR);
392        if (!dir.exists()) return;
393
394        File[] tsFiles = dir.listFiles();
395        if (tsFiles == null) return;
396        int freq = FileTsStreamer.FREQ_BASE;
397        for (File file : tsFiles) {
398            if (!file.isFile()) continue;
399            output.add(ScanChannel.forFile(freq, file.getName()));
400            freq += 100;
401        }
402    }
403
404    /**
405     * A thread managing a circular buffer that holds stream data to be consumed by player.
406     * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering.
407     * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
408     */
409    private class StreamingThread extends Thread {
410        @Override
411        public void run() {
412            byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
413
414            synchronized (mCircularBufferMonitor) {
415                mBytesFetched = 0;
416                mLastReadPosition = 0;
417            }
418
419            while (true) {
420                synchronized (mCircularBufferMonitor) {
421                    while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
422                            && mStreaming) {
423                        try {
424                            mCircularBufferMonitor.wait();
425                        } catch (InterruptedException e) {
426                            // Wait again.
427                            Thread.currentThread().interrupt();
428                        }
429                    }
430                    if (!mStreaming) {
431                        break;
432                    }
433                }
434
435                int bytesWritten = mSource.read(dataBuffer);
436                if (bytesWritten <= 0) {
437                    try {
438                        // When buffer is underrun, we sleep for short time to prevent
439                        // unnecessary CPU draining.
440                        sleep(BUFFER_UNDERRUN_SLEEP_MS);
441                    } catch (InterruptedException e) {
442                        Thread.currentThread().interrupt();
443                    }
444                    continue;
445                }
446
447                mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
448
449                synchronized (mCircularBufferMonitor) {
450                    int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
451                    int bytesToCopyInFirstPass = bytesWritten;
452                    if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
453                        bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
454                    }
455                    System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
456                            bytesToCopyInFirstPass);
457                    if (bytesToCopyInFirstPass < bytesWritten) {
458                        System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
459                                bytesWritten - bytesToCopyInFirstPass);
460                    }
461                    mBytesFetched += bytesWritten;
462                    mCircularBufferMonitor.notify();
463                }
464            }
465
466            Log.i(TAG, "Streaming stopped");
467            mSource.close();
468        }
469    }
470}
471