1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.tv.tuner.source; 18 19import android.os.Environment; 20import android.util.Log; 21import android.util.SparseBooleanArray; 22 23import com.google.android.exoplayer.C; 24import com.google.android.exoplayer.upstream.DataSpec; 25import com.android.tv.common.SoftPreconditions; 26import com.android.tv.tuner.ChannelScanFileParser.ScanChannel; 27import com.android.tv.tuner.data.TunerChannel; 28import com.android.tv.tuner.ts.TsParser; 29import com.android.tv.tuner.tvinput.EventDetector; 30import com.android.tv.tuner.tvinput.FileSourceEventDetector; 31 32import java.io.BufferedInputStream; 33import java.io.File; 34import java.io.FileInputStream; 35import java.io.IOException; 36import java.util.List; 37import java.util.concurrent.atomic.AtomicLong; 38 39/** 40 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file 41 * generated by capturing TV signal. 42 */ 43public class FileTsStreamer implements TsStreamer { 44 private static final String TAG = "FileTsStreamer"; 45 46 private static final int TS_PACKET_SIZE = 188; 47 private static final int TS_SYNC_BYTE = 0x47; 48 private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10; 49 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB 50 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB 51 private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB 52 private static final int READ_TIMEOUT_MS = 10000; // 10 secs. 53 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 54 private static final String FILE_DIR = 55 new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath(); 56 57 // Virtual frequency base used for file-based source 58 public static final int FREQ_BASE = 100; 59 60 private final Object mCircularBufferMonitor = new Object(); 61 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 62 private final FileSourceEventDetector mEventDetector; 63 64 private long mBytesFetched; 65 private long mLastReadPosition; 66 private boolean mStreaming; 67 68 private Thread mStreamingThread; 69 private StreamProvider mSource; 70 71 public static class FileDataSource extends TsDataSource { 72 private final FileTsStreamer mTsStreamer; 73 private final AtomicLong mLastReadPosition = new AtomicLong(0); 74 private long mStartBufferedPosition; 75 76 private FileDataSource(FileTsStreamer tsStreamer) { 77 mTsStreamer = tsStreamer; 78 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 79 } 80 81 @Override 82 public long getBufferedPosition() { 83 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 84 } 85 86 @Override 87 public long getLastReadPosition() { 88 return mLastReadPosition.get(); 89 } 90 91 @Override 92 public void shiftStartPosition(long offset) { 93 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 94 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 95 mStartBufferedPosition += offset; 96 } 97 98 @Override 99 public long open(DataSpec dataSpec) throws IOException { 100 mLastReadPosition.set(0); 101 return C.LENGTH_UNBOUNDED; 102 } 103 104 @Override 105 public void close() { 106 } 107 108 @Override 109 public int read(byte[] buffer, int offset, int readLength) throws IOException { 110 int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer, 111 offset, readLength); 112 if (ret > 0) { 113 mLastReadPosition.addAndGet(ret); 114 } 115 return ret; 116 } 117 } 118 119 /** 120 * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file. 121 * @param eventListener the listener for channel & program information 122 */ 123 public FileTsStreamer(EventDetector.EventListener eventListener) { 124 mEventDetector = new FileSourceEventDetector(eventListener); 125 } 126 127 @Override 128 public boolean startStream(ScanChannel channel) { 129 String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath(); 130 mSource = new StreamProvider(filepath); 131 if (!mSource.isReady()) { 132 return false; 133 } 134 mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS); 135 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 136 mSource.addPidFilter(TsParser.PAT_PID); 137 synchronized (mCircularBufferMonitor) { 138 if (mStreaming) { 139 return true; 140 } 141 mStreaming = true; 142 } 143 144 mStreamingThread = new StreamingThread(); 145 mStreamingThread.start(); 146 Log.i(TAG, "Streaming started"); 147 return true; 148 } 149 150 @Override 151 public boolean startStream(TunerChannel channel) { 152 Log.i(TAG, "tuneToChannel with: " + channel.getFilepath()); 153 mSource = new StreamProvider(channel.getFilepath()); 154 if (!mSource.isReady()) { 155 return false; 156 } 157 mEventDetector.start(mSource, channel.getProgramNumber()); 158 mSource.addPidFilter(channel.getVideoPid()); 159 for (Integer i : channel.getAudioPids()) { 160 mSource.addPidFilter(i); 161 } 162 mSource.addPidFilter(channel.getPcrPid()); 163 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 164 mSource.addPidFilter(TsParser.PAT_PID); 165 synchronized (mCircularBufferMonitor) { 166 if (mStreaming) { 167 return true; 168 } 169 mStreaming = true; 170 } 171 172 mStreamingThread = new StreamingThread(); 173 mStreamingThread.start(); 174 Log.i(TAG, "Streaming started"); 175 return true; 176 } 177 178 /** 179 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 180 * device is overloaded this can take a while, but usually it returns pretty quickly. 181 */ 182 @Override 183 public void stopStream() { 184 synchronized (mCircularBufferMonitor) { 185 mStreaming = false; 186 mCircularBufferMonitor.notify(); 187 } 188 189 try { 190 if (mStreamingThread != null) { 191 mStreamingThread.join(); 192 } 193 } catch (InterruptedException e) { 194 Thread.currentThread().interrupt(); 195 } 196 } 197 198 @Override 199 public TsDataSource createDataSource() { 200 return new FileDataSource(this); 201 } 202 203 /** 204 * Returns the current buffered position from the file. 205 * @return the current buffered position 206 */ 207 public long getBufferedPosition() { 208 synchronized (mCircularBufferMonitor) { 209 return mBytesFetched; 210 } 211 } 212 213 /** 214 * Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. 215 */ 216 public static class StreamProvider { 217 private final String mFilepath; 218 private final SparseBooleanArray mPids = new SparseBooleanArray(); 219 private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE]; 220 221 private BufferedInputStream mInputStream; 222 223 private StreamProvider(String filepath) { 224 mFilepath = filepath; 225 open(filepath); 226 } 227 228 private void open(String filepath) { 229 try { 230 mInputStream = new BufferedInputStream(new FileInputStream(filepath)); 231 } catch (IOException e) { 232 Log.e(TAG, "Error opening input stream", e); 233 mInputStream = null; 234 } 235 } 236 237 private boolean isReady() { 238 return mInputStream != null; 239 } 240 241 /** 242 * Returns the file path of the MPEG-2 TS file. 243 */ 244 public String getFilepath() { 245 return mFilepath; 246 } 247 248 /** 249 * Adds a pid for filtering from the MPEG-2 TS file. 250 */ 251 public void addPidFilter(int pid) { 252 mPids.put(pid, true); 253 } 254 255 /** 256 * Returns whether the current pid filter is empty or not. 257 */ 258 public boolean isFilterEmpty() { 259 return mPids.size() > 0; 260 } 261 262 /** 263 * Clears the current pid filter. 264 */ 265 public void clearPidFilter() { 266 mPids.clear(); 267 } 268 269 /** 270 * Returns whether a pid is in the pid filter or not. 271 * @param pid the pid to check 272 */ 273 public boolean isInFilter(int pid) { 274 return mPids.get(pid); 275 } 276 277 /** 278 * Reads from the MPEG-2 TS file to buffer. 279 * 280 * @param inputBuffer to read 281 * @return the number of read bytes 282 */ 283 private int read(byte[] inputBuffer) { 284 int readSize = readInternal(); 285 if (readSize <= 0) { 286 // Reached the end of stream. Restart from the beginning. 287 close(); 288 open(mFilepath); 289 if (mInputStream == null) { 290 return -1; 291 } 292 readSize = readInternal(); 293 } 294 295 if (mPreBuffer[0] != TS_SYNC_BYTE) { 296 Log.e(TAG, "Error reading input stream - no TS sync found"); 297 return -1; 298 } 299 int filteredSize = 0; 300 for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) { 301 if (mPreBuffer[i] == TS_SYNC_BYTE) { 302 int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff); 303 if (mPids.get(pid)) { 304 System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE); 305 destPos += TS_PACKET_SIZE; 306 filteredSize += TS_PACKET_SIZE; 307 } 308 } 309 } 310 return filteredSize; 311 } 312 313 private int readInternal() { 314 int readSize; 315 try { 316 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length); 317 } catch (IOException e) { 318 Log.e(TAG, "Error reading input stream", e); 319 return -1; 320 } 321 return readSize; 322 } 323 324 private void close() { 325 try { 326 mInputStream.close(); 327 } catch (IOException e) { 328 Log.e(TAG, "Error closing input stream:", e); 329 } 330 mInputStream = null; 331 } 332 } 333 334 /** 335 * Reads data from internal buffer. 336 * @param pos the position to read from 337 * @param buffer to read 338 * @param offset start position of the read buffer 339 * @param amount number of bytes to read 340 * @return number of read bytes when successful, {@code -1} otherwise 341 * @throws IOException 342 */ 343 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 344 synchronized (mCircularBufferMonitor) { 345 long initialBytesFetched = mBytesFetched; 346 while (mBytesFetched < pos + amount && mStreaming) { 347 try { 348 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 349 } catch (InterruptedException e) { 350 // Wait again. 351 Thread.currentThread().interrupt(); 352 } 353 if (initialBytesFetched == mBytesFetched) { 354 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 355 356 // Returning -1 will make demux report EOS so that the input service can retry 357 // the playback. 358 return -1; 359 } 360 } 361 if (!mStreaming) { 362 Log.w(TAG, "Stream is already stopped."); 363 return -1; 364 } 365 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 366 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 367 return -1; 368 } 369 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 370 int bytesToCopyInFirstPass = amount; 371 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 372 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 373 } 374 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 375 if (bytesToCopyInFirstPass < amount) { 376 System.arraycopy(mCircularBuffer, 0, buffer, offset + bytesToCopyInFirstPass, 377 amount - bytesToCopyInFirstPass); 378 } 379 mLastReadPosition = pos + amount; 380 mCircularBufferMonitor.notify(); 381 return amount; 382 } 383 } 384 385 /** 386 * Adds {@link ScanChannel} instance for local files. 387 * 388 * @param output a list of channels where the results will be placed in 389 */ 390 public static void addLocalStreamFiles(List<ScanChannel> output) { 391 File dir = new File(FILE_DIR); 392 if (!dir.exists()) return; 393 394 File[] tsFiles = dir.listFiles(); 395 if (tsFiles == null) return; 396 int freq = FileTsStreamer.FREQ_BASE; 397 for (File file : tsFiles) { 398 if (!file.isFile()) continue; 399 output.add(ScanChannel.forFile(freq, file.getName())); 400 freq += 100; 401 } 402 } 403 404 /** 405 * A thread managing a circular buffer that holds stream data to be consumed by player. 406 * Keeps reading data in from a {@link StreamProvider} to hold enough amount for buffering. 407 * Started and stopped by {@link #startStream()} and {@link #stopStream()}, respectively. 408 */ 409 private class StreamingThread extends Thread { 410 @Override 411 public void run() { 412 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 413 414 synchronized (mCircularBufferMonitor) { 415 mBytesFetched = 0; 416 mLastReadPosition = 0; 417 } 418 419 while (true) { 420 synchronized (mCircularBufferMonitor) { 421 while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE 422 && mStreaming) { 423 try { 424 mCircularBufferMonitor.wait(); 425 } catch (InterruptedException e) { 426 // Wait again. 427 Thread.currentThread().interrupt(); 428 } 429 } 430 if (!mStreaming) { 431 break; 432 } 433 } 434 435 int bytesWritten = mSource.read(dataBuffer); 436 if (bytesWritten <= 0) { 437 try { 438 // When buffer is underrun, we sleep for short time to prevent 439 // unnecessary CPU draining. 440 sleep(BUFFER_UNDERRUN_SLEEP_MS); 441 } catch (InterruptedException e) { 442 Thread.currentThread().interrupt(); 443 } 444 continue; 445 } 446 447 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 448 449 synchronized (mCircularBufferMonitor) { 450 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 451 int bytesToCopyInFirstPass = bytesWritten; 452 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 453 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 454 } 455 System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer, 456 bytesToCopyInFirstPass); 457 if (bytesToCopyInFirstPass < bytesWritten) { 458 System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0, 459 bytesWritten - bytesToCopyInFirstPass); 460 } 461 mBytesFetched += bytesWritten; 462 mCircularBufferMonitor.notify(); 463 } 464 } 465 466 Log.i(TAG, "Streaming stopped"); 467 mSource.close(); 468 } 469 } 470} 471