1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.tv.tuner.source; 18 19import android.content.Context; 20import android.util.Log; 21import android.util.Pair; 22 23import com.google.android.exoplayer.C; 24import com.google.android.exoplayer.upstream.DataSpec; 25import com.android.tv.common.SoftPreconditions; 26import com.android.tv.tuner.ChannelScanFileParser; 27import com.android.tv.tuner.TunerHal; 28import com.android.tv.tuner.TunerPreferences; 29import com.android.tv.tuner.data.TunerChannel; 30import com.android.tv.tuner.tvinput.EventDetector; 31import com.android.tv.tuner.tvinput.EventDetector.EventListener; 32 33import java.io.IOException; 34import java.util.ArrayList; 35import java.util.List; 36import java.util.concurrent.atomic.AtomicLong; 37 38/** 39 * Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device. 40 */ 41public class TunerTsStreamer implements TsStreamer { 42 private static final String TAG = "TunerTsStreamer"; 43 44 private static final int MIN_READ_UNIT = 1500; 45 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB 46 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000; // ~ 30MB 47 private static final int TS_PACKET_SIZE = 188; 48 49 private static final int READ_TIMEOUT_MS = 5000; // 5 secs. 50 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 51 private static final int READ_ERROR_STREAMING_ENDED = -1; 52 private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2; 53 54 private final Object mCircularBufferMonitor = new Object(); 55 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 56 private long mBytesFetched; 57 private final AtomicLong mLastReadPosition = new AtomicLong(); 58 private boolean mStreaming; 59 60 private final TunerHal mTunerHal; 61 private TunerChannel mChannel; 62 private Thread mStreamingThread; 63 private final EventDetector mEventDetector; 64 private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>(); 65 66 private final TsStreamWriter mTsStreamWriter; 67 private String mChannelNumber; 68 69 public static class TunerDataSource extends TsDataSource { 70 private final TunerTsStreamer mTsStreamer; 71 private final AtomicLong mLastReadPosition = new AtomicLong(0); 72 private long mStartBufferedPosition; 73 74 private TunerDataSource(TunerTsStreamer tsStreamer) { 75 mTsStreamer = tsStreamer; 76 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 77 } 78 79 @Override 80 public long getBufferedPosition() { 81 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 82 } 83 84 @Override 85 public long getLastReadPosition() { 86 return mLastReadPosition.get(); 87 } 88 89 @Override 90 public void shiftStartPosition(long offset) { 91 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 92 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 93 mStartBufferedPosition += offset; 94 } 95 96 @Override 97 public long open(DataSpec dataSpec) throws IOException { 98 mLastReadPosition.set(0); 99 return C.LENGTH_UNBOUNDED; 100 } 101 102 @Override 103 public void close() { 104 } 105 106 @Override 107 public int read(byte[] buffer, int offset, int readLength) throws IOException { 108 int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer, 109 offset, readLength); 110 if (ret > 0) { 111 mLastReadPosition.addAndGet(ret); 112 } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) { 113 long currentPosition = mStartBufferedPosition + mLastReadPosition.get(); 114 long endPosition = mTsStreamer.getBufferedPosition(); 115 long diff = ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE) 116 * TS_PACKET_SIZE; 117 Log.w(TAG, "Demux position jump by overwritten buffer: " + diff); 118 mStartBufferedPosition = currentPosition + diff; 119 mLastReadPosition.set(0); 120 return 0; 121 } 122 return ret; 123 } 124 } 125 /** 126 * Creates {@link TsStreamer} for playing or recording the specified channel. 127 * @param tunerHal the HAL for tuner device 128 * @param eventListener the listener for channel & program information 129 */ 130 public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) { 131 mTunerHal = tunerHal; 132 mEventDetector = new EventDetector(mTunerHal); 133 if (eventListener != null) { 134 mEventDetector.registerListener(eventListener); 135 } 136 mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ? 137 new TsStreamWriter(context) : null; 138 } 139 140 public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) { 141 this(tunerHal, eventListener, null); 142 } 143 144 @Override 145 public boolean startStream(TunerChannel channel) { 146 if (mTunerHal.tune(channel.getFrequency(), channel.getModulation(), 147 channel.getDisplayNumber(false))) { 148 if (channel.hasVideo()) { 149 mTunerHal.addPidFilter(channel.getVideoPid(), 150 TunerHal.FILTER_TYPE_VIDEO); 151 } 152 boolean audioFilterSet = false; 153 for (Integer audioPid : channel.getAudioPids()) { 154 if (!audioFilterSet) { 155 mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO); 156 audioFilterSet = true; 157 } else { 158 // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use 159 // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks. 160 mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER); 161 } 162 } 163 mTunerHal.addPidFilter(channel.getPcrPid(), 164 TunerHal.FILTER_TYPE_PCR); 165 if (mEventDetector != null) { 166 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(), 167 channel.getProgramNumber()); 168 } 169 mChannel = channel; 170 mChannelNumber = channel.getDisplayNumber(); 171 synchronized (mCircularBufferMonitor) { 172 if (mStreaming) { 173 Log.w(TAG, "Streaming should be stopped before start streaming"); 174 return true; 175 } 176 mStreaming = true; 177 mBytesFetched = 0; 178 mLastReadPosition.set(0L); 179 } 180 if (mTsStreamWriter != null) { 181 mTsStreamWriter.setChannel(mChannel); 182 mTsStreamWriter.openFile(); 183 } 184 mStreamingThread = new StreamingThread(); 185 mStreamingThread.start(); 186 Log.i(TAG, "Streaming started"); 187 return true; 188 } 189 return false; 190 } 191 192 @Override 193 public boolean startStream(ChannelScanFileParser.ScanChannel channel) { 194 if (mTunerHal.tune(channel.frequency, channel.modulation, null)) { 195 mEventDetector.startDetecting( 196 channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS); 197 synchronized (mCircularBufferMonitor) { 198 if (mStreaming) { 199 Log.w(TAG, "Streaming should be stopped before start streaming"); 200 return true; 201 } 202 mStreaming = true; 203 mBytesFetched = 0; 204 mLastReadPosition.set(0L); 205 } 206 mStreamingThread = new StreamingThread(); 207 mStreamingThread.start(); 208 Log.i(TAG, "Streaming started"); 209 return true; 210 } 211 return false; 212 } 213 214 /** 215 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 216 * device is overloaded this can take a while, but usually it returns pretty quickly. 217 */ 218 @Override 219 public void stopStream() { 220 mChannel = null; 221 synchronized (mCircularBufferMonitor) { 222 mStreaming = false; 223 mCircularBufferMonitor.notifyAll(); 224 } 225 226 try { 227 if (mStreamingThread != null) { 228 mStreamingThread.join(); 229 } 230 } catch (InterruptedException e) { 231 Thread.currentThread().interrupt(); 232 } 233 if (mTsStreamWriter != null) { 234 mTsStreamWriter.closeFile(true); 235 mTsStreamWriter.setChannel(null); 236 } 237 } 238 239 @Override 240 public TsDataSource createDataSource() { 241 return new TunerDataSource(this); 242 } 243 244 /** 245 * Returns incomplete channel lists which was scanned so far. Incomplete channel means 246 * the channel whose channel information is not complete or is not well-formed. 247 * @return {@link List} of {@link TunerChannel} 248 */ 249 public List<TunerChannel> getMalFormedChannels() { 250 return mEventDetector.getMalFormedChannels(); 251 } 252 253 /** 254 * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer. 255 * @return {@link TunerHal} 256 */ 257 public TunerHal getTunerHal() { 258 return mTunerHal; 259 } 260 261 /** 262 * Returns the current tuned channel for TunerTsStreamer. 263 * @return {@link TunerChannel} 264 */ 265 public TunerChannel getChannel() { 266 return mChannel; 267 } 268 269 /** 270 * Returns the current buffered position from tuner. 271 * @return the current buffered position 272 */ 273 public long getBufferedPosition() { 274 synchronized (mCircularBufferMonitor) { 275 return mBytesFetched; 276 } 277 } 278 279 public String getStreamerInfo() { 280 return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming; 281 } 282 283 public void registerListener(EventListener listener) { 284 if (mEventDetector != null && listener != null) { 285 synchronized (mEventListenerActions) { 286 mEventListenerActions.add(new Pair<>(listener, true)); 287 } 288 } 289 } 290 291 public void unregisterListener(EventListener listener) { 292 if (mEventDetector != null) { 293 synchronized (mEventListenerActions) { 294 mEventListenerActions.add(new Pair(listener, false)); 295 } 296 } 297 } 298 299 private class StreamingThread extends Thread { 300 @Override 301 public void run() { 302 // Buffers for streaming data from the tuner and the internal buffer. 303 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 304 305 while (true) { 306 synchronized (mCircularBufferMonitor) { 307 if (!mStreaming) { 308 break; 309 } 310 } 311 312 if (mEventDetector != null) { 313 synchronized (mEventListenerActions) { 314 for (Pair listenerAction : mEventListenerActions) { 315 EventListener listener = (EventListener) listenerAction.first; 316 if ((boolean) listenerAction.second) { 317 mEventDetector.registerListener(listener); 318 } else { 319 mEventDetector.unregisterListener(listener); 320 } 321 } 322 mEventListenerActions.clear(); 323 } 324 } 325 326 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length); 327 if (bytesWritten <= 0) { 328 try { 329 // When buffer is underrun, we sleep for short time to prevent 330 // unnecessary CPU draining. 331 sleep(BUFFER_UNDERRUN_SLEEP_MS); 332 } catch (InterruptedException e) { 333 Thread.currentThread().interrupt(); 334 } 335 continue; 336 } 337 338 if (mTsStreamWriter != null) { 339 mTsStreamWriter.writeToFile(dataBuffer, bytesWritten); 340 } 341 342 if (mEventDetector != null) { 343 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 344 } 345 synchronized (mCircularBufferMonitor) { 346 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 347 int bytesToCopyInFirstPass = bytesWritten; 348 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 349 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 350 } 351 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 352 bytesToCopyInFirstPass); 353 if (bytesToCopyInFirstPass < bytesWritten) { 354 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 355 bytesWritten - bytesToCopyInFirstPass); 356 } 357 mBytesFetched += bytesWritten; 358 mCircularBufferMonitor.notifyAll(); 359 } 360 } 361 362 Log.i(TAG, "Streaming stopped"); 363 } 364 } 365 366 /** 367 * Reads data from internal buffer. 368 * @param pos the position to read from 369 * @param buffer to read 370 * @param offset start position of the read buffer 371 * @param amount number of bytes to read 372 * @return number of read bytes when successful, {@code -1} otherwise 373 * @throws IOException 374 */ 375 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 376 while (true) { 377 synchronized (mCircularBufferMonitor) { 378 if (!mStreaming) { 379 return READ_ERROR_STREAMING_ENDED; 380 } 381 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 382 Log.w(TAG, "Demux is requesting the data which is already overwritten."); 383 return READ_ERROR_BUFFER_OVERWRITTEN; 384 } 385 if (mBytesFetched < pos + amount) { 386 try { 387 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 388 } catch (InterruptedException e) { 389 Thread.currentThread().interrupt(); 390 } 391 // Try again to prevent starvation. 392 // Give chances to read from other threads. 393 continue; 394 } 395 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE); 396 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE); 397 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos; 398 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength); 399 if (firstLength < amount) { 400 System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength, 401 amount - firstLength); 402 } 403 mCircularBufferMonitor.notifyAll(); 404 return amount; 405 } 406 } 407 } 408} 409