1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.tv.tuner.source;
18
import android.content.Context;
import android.util.Log;
import android.util.Pair;

import com.android.tv.common.SoftPreconditions;
import com.android.tv.tuner.ChannelScanFileParser;
import com.android.tv.tuner.TunerHal;
import com.android.tv.tuner.TunerPreferences;
import com.android.tv.tuner.data.TunerChannel;
import com.android.tv.tuner.tvinput.EventDetector;
import com.android.tv.tuner.tvinput.EventDetector.EventListener;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.upstream.DataSpec;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
37
38/**
39 * Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device.
40 */
41public class TunerTsStreamer implements TsStreamer {
42    private static final String TAG = "TunerTsStreamer";
43
44    private static final int MIN_READ_UNIT = 1500;
45    private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
46    private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000;  // ~ 30MB
47    private static final int TS_PACKET_SIZE = 188;
48
49    private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
50    private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
51    private static final int READ_ERROR_STREAMING_ENDED = -1;
52    private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2;
53
54    private final Object mCircularBufferMonitor = new Object();
55    private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
56    private long mBytesFetched;
57    private final AtomicLong mLastReadPosition = new AtomicLong();
58    private boolean mStreaming;
59
60    private final TunerHal mTunerHal;
61    private TunerChannel mChannel;
62    private Thread mStreamingThread;
63    private final EventDetector mEventDetector;
64    private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>();
65
66    private final TsStreamWriter mTsStreamWriter;
67    private String mChannelNumber;
68
69    public static class TunerDataSource extends TsDataSource {
70        private final TunerTsStreamer mTsStreamer;
71        private final AtomicLong mLastReadPosition = new AtomicLong(0);
72        private long mStartBufferedPosition;
73
74        private TunerDataSource(TunerTsStreamer tsStreamer) {
75            mTsStreamer = tsStreamer;
76            mStartBufferedPosition = tsStreamer.getBufferedPosition();
77        }
78
79        @Override
80        public long getBufferedPosition() {
81            return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
82        }
83
84        @Override
85        public long getLastReadPosition() {
86            return mLastReadPosition.get();
87        }
88
89        @Override
90        public void shiftStartPosition(long offset) {
91            SoftPreconditions.checkState(mLastReadPosition.get() == 0);
92            SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
93            mStartBufferedPosition += offset;
94        }
95
96        @Override
97        public long open(DataSpec dataSpec) throws IOException {
98            mLastReadPosition.set(0);
99            return C.LENGTH_UNBOUNDED;
100        }
101
102        @Override
103        public void close() {
104        }
105
106        @Override
107        public int read(byte[] buffer, int offset, int readLength) throws IOException {
108            int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
109                    offset, readLength);
110            if (ret > 0) {
111                mLastReadPosition.addAndGet(ret);
112            } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) {
113                long currentPosition = mStartBufferedPosition + mLastReadPosition.get();
114                long endPosition = mTsStreamer.getBufferedPosition();
115                long diff = ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE)
116                        * TS_PACKET_SIZE;
117                Log.w(TAG, "Demux position jump by overwritten buffer: " + diff);
118                mStartBufferedPosition = currentPosition + diff;
119                mLastReadPosition.set(0);
120                return 0;
121            }
122            return ret;
123        }
124    }
125    /**
126     * Creates {@link TsStreamer} for playing or recording the specified channel.
127     * @param tunerHal the HAL for tuner device
128     * @param eventListener the listener for channel & program information
129     */
130    public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) {
131        mTunerHal = tunerHal;
132        mEventDetector = new EventDetector(mTunerHal);
133        if (eventListener != null) {
134            mEventDetector.registerListener(eventListener);
135        }
136        mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ?
137                new TsStreamWriter(context) : null;
138    }
139
140    public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) {
141        this(tunerHal, eventListener, null);
142    }
143
144    @Override
145    public boolean startStream(TunerChannel channel) {
146        if (mTunerHal.tune(channel.getFrequency(), channel.getModulation(),
147                channel.getDisplayNumber(false))) {
148            if (channel.hasVideo()) {
149                mTunerHal.addPidFilter(channel.getVideoPid(),
150                        TunerHal.FILTER_TYPE_VIDEO);
151            }
152            boolean audioFilterSet = false;
153            for (Integer audioPid : channel.getAudioPids()) {
154                if (!audioFilterSet) {
155                    mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO);
156                    audioFilterSet = true;
157                } else {
158                    // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
159                    // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
160                    mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER);
161                }
162            }
163            mTunerHal.addPidFilter(channel.getPcrPid(),
164                    TunerHal.FILTER_TYPE_PCR);
165            if (mEventDetector != null) {
166                mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(),
167                        channel.getProgramNumber());
168            }
169            mChannel = channel;
170            mChannelNumber = channel.getDisplayNumber();
171            synchronized (mCircularBufferMonitor) {
172                if (mStreaming) {
173                    Log.w(TAG, "Streaming should be stopped before start streaming");
174                    return true;
175                }
176                mStreaming = true;
177                mBytesFetched = 0;
178                mLastReadPosition.set(0L);
179            }
180            if (mTsStreamWriter != null) {
181                mTsStreamWriter.setChannel(mChannel);
182                mTsStreamWriter.openFile();
183            }
184            mStreamingThread = new StreamingThread();
185            mStreamingThread.start();
186            Log.i(TAG, "Streaming started");
187            return true;
188        }
189        return false;
190    }
191
192    @Override
193    public boolean startStream(ChannelScanFileParser.ScanChannel channel) {
194        if (mTunerHal.tune(channel.frequency, channel.modulation, null)) {
195            mEventDetector.startDetecting(
196                    channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS);
197            synchronized (mCircularBufferMonitor) {
198                if (mStreaming) {
199                    Log.w(TAG, "Streaming should be stopped before start streaming");
200                    return true;
201                }
202                mStreaming = true;
203                mBytesFetched = 0;
204                mLastReadPosition.set(0L);
205            }
206            mStreamingThread = new StreamingThread();
207            mStreamingThread.start();
208            Log.i(TAG, "Streaming started");
209            return true;
210        }
211        return false;
212    }
213
214    /**
215     * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
216     * device is overloaded this can take a while, but usually it returns pretty quickly.
217     */
218    @Override
219    public void stopStream() {
220        mChannel = null;
221        synchronized (mCircularBufferMonitor) {
222            mStreaming = false;
223            mCircularBufferMonitor.notifyAll();
224        }
225
226        try {
227            if (mStreamingThread != null) {
228                mStreamingThread.join();
229            }
230        } catch (InterruptedException e) {
231            Thread.currentThread().interrupt();
232        }
233        if (mTsStreamWriter != null) {
234            mTsStreamWriter.closeFile(true);
235            mTsStreamWriter.setChannel(null);
236        }
237    }
238
239    @Override
240    public TsDataSource createDataSource() {
241        return new TunerDataSource(this);
242    }
243
244    /**
245     * Returns incomplete channel lists which was scanned so far. Incomplete channel means
246     * the channel whose channel information is not complete or is not well-formed.
247     * @return {@link List} of {@link TunerChannel}
248     */
249    public List<TunerChannel> getMalFormedChannels() {
250        return mEventDetector.getMalFormedChannels();
251    }
252
253    /**
254     * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer.
255     * @return {@link TunerHal}
256     */
257    public TunerHal getTunerHal() {
258        return mTunerHal;
259    }
260
261    /**
262     * Returns the current tuned channel for TunerTsStreamer.
263     * @return {@link TunerChannel}
264     */
265    public TunerChannel getChannel() {
266        return mChannel;
267    }
268
269    /**
270     * Returns the current buffered position from tuner.
271     * @return the current buffered position
272     */
273    public long getBufferedPosition() {
274        synchronized (mCircularBufferMonitor) {
275            return mBytesFetched;
276        }
277    }
278
279    public String getStreamerInfo() {
280        return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming;
281    }
282
283    public void registerListener(EventListener listener) {
284        if (mEventDetector != null && listener != null) {
285            synchronized (mEventListenerActions) {
286                mEventListenerActions.add(new Pair<>(listener, true));
287            }
288        }
289    }
290
291    public void unregisterListener(EventListener listener) {
292        if (mEventDetector != null) {
293            synchronized (mEventListenerActions) {
294                mEventListenerActions.add(new Pair(listener, false));
295            }
296        }
297    }
298
299    private class StreamingThread extends Thread {
300        @Override
301        public void run() {
302            // Buffers for streaming data from the tuner and the internal buffer.
303            byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
304
305            while (true) {
306                synchronized (mCircularBufferMonitor) {
307                    if (!mStreaming) {
308                        break;
309                    }
310                }
311
312                if (mEventDetector != null) {
313                    synchronized (mEventListenerActions) {
314                        for (Pair listenerAction : mEventListenerActions) {
315                            EventListener listener = (EventListener) listenerAction.first;
316                            if ((boolean) listenerAction.second) {
317                                mEventDetector.registerListener(listener);
318                            } else {
319                                mEventDetector.unregisterListener(listener);
320                            }
321                        }
322                        mEventListenerActions.clear();
323                    }
324                }
325
326                int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
327                if (bytesWritten <= 0) {
328                    try {
329                        // When buffer is underrun, we sleep for short time to prevent
330                        // unnecessary CPU draining.
331                        sleep(BUFFER_UNDERRUN_SLEEP_MS);
332                    } catch (InterruptedException e) {
333                        Thread.currentThread().interrupt();
334                    }
335                    continue;
336                }
337
338                if (mTsStreamWriter != null) {
339                    mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
340                }
341
342                if (mEventDetector != null) {
343                    mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
344                }
345                synchronized (mCircularBufferMonitor) {
346                    int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
347                    int bytesToCopyInFirstPass = bytesWritten;
348                    if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
349                        bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
350                    }
351                    System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
352                            bytesToCopyInFirstPass);
353                    if (bytesToCopyInFirstPass < bytesWritten) {
354                        System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
355                                bytesWritten - bytesToCopyInFirstPass);
356                    }
357                    mBytesFetched += bytesWritten;
358                    mCircularBufferMonitor.notifyAll();
359                }
360            }
361
362            Log.i(TAG, "Streaming stopped");
363        }
364    }
365
366    /**
367     * Reads data from internal buffer.
368     * @param pos the position to read from
369     * @param buffer to read
370     * @param offset start position of the read buffer
371     * @param amount number of bytes to read
372     * @return number of read bytes when successful, {@code -1} otherwise
373     * @throws IOException
374     */
375    public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
376        while (true) {
377            synchronized (mCircularBufferMonitor) {
378                if (!mStreaming) {
379                    return READ_ERROR_STREAMING_ENDED;
380                }
381                if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
382                    Log.w(TAG, "Demux is requesting the data which is already overwritten.");
383                    return READ_ERROR_BUFFER_OVERWRITTEN;
384                }
385                if (mBytesFetched < pos + amount) {
386                    try {
387                        mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
388                    } catch (InterruptedException e) {
389                        Thread.currentThread().interrupt();
390                    }
391                    // Try again to prevent starvation.
392                    // Give chances to read from other threads.
393                    continue;
394                }
395                int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
396                int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
397                int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
398                System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
399                if (firstLength < amount) {
400                    System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength,
401                            amount - firstLength);
402                }
403                mCircularBufferMonitor.notifyAll();
404                return amount;
405            }
406        }
407    }
408}
409