1/* 2 * Copyright (c) 2006-2011 Christian Plattner. All rights reserved. 3 * Please refer to the LICENSE.txt for licensing details. 4 */ 5package ch.ethz.ssh2; 6 7import java.io.IOException; 8import java.io.InputStream; 9 10/** 11 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker 12 * thread to constantly consume input from another InputStream. It uses a buffer 13 * to store the consumed data. The buffer size is automatically adjusted, if needed. 14 * <p/> 15 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR 16 * InputStreams with instances of this class, then you don't have to bother about 17 * the shared window of STDOUT and STDERR in the low level SSH-2 protocol, 18 * since all arriving data will be immediatelly consumed by the worker threads. 19 * Also, as a side effect, the streams will be buffered (e.g., single byte 20 * read() operations are faster). 21 * <p/> 22 * Other SSH for Java libraries include this functionality by default in 23 * their STDOUT and STDERR InputStream implementations, however, please be aware 24 * that this approach has also a downside: 25 * <p/> 26 * If you do not call the StreamGobbler's <code>read()</code> method often enough 27 * and the peer is constantly sending huge amounts of data, then you will sooner or later 28 * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size). 29 * Joe Average will like this class anyway - a paranoid programmer would never use such an approach. 30 * <p/> 31 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't", 32 * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html. 33 * 34 * @author Christian Plattner 35 * @version 2.50, 03/15/10 36 */ 37 38public class StreamGobbler extends InputStream 39{ 40 class GobblerThread extends Thread 41 { 42 @Override 43 public void run() 44 { 45 byte[] buff = new byte[8192]; 46 47 while (true) 48 { 49 try 50 { 51 int avail = is.read(buff); 52 53 synchronized (synchronizer) 54 { 55 if (avail <= 0) 56 { 57 isEOF = true; 58 synchronizer.notifyAll(); 59 break; 60 } 61 62 int space_available = buffer.length - write_pos; 63 64 if (space_available < avail) 65 { 66 /* compact/resize buffer */ 67 68 int unread_size = write_pos - read_pos; 69 int need_space = unread_size + avail; 70 71 byte[] new_buffer = buffer; 72 73 if (need_space > buffer.length) 74 { 75 int inc = need_space / 3; 76 inc = (inc < 256) ? 256 : inc; 77 inc = (inc > 8192) ? 8192 : inc; 78 new_buffer = new byte[need_space + inc]; 79 } 80 81 if (unread_size > 0) 82 System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size); 83 84 buffer = new_buffer; 85 86 read_pos = 0; 87 write_pos = unread_size; 88 } 89 90 System.arraycopy(buff, 0, buffer, write_pos, avail); 91 write_pos += avail; 92 93 synchronizer.notifyAll(); 94 } 95 } 96 catch (IOException e) 97 { 98 synchronized (synchronizer) 99 { 100 exception = e; 101 synchronizer.notifyAll(); 102 break; 103 } 104 } 105 } 106 } 107 } 108 109 private InputStream is; 110 111 private final Object synchronizer = new Object(); 112 113 private boolean isEOF = false; 114 private boolean isClosed = false; 115 private IOException exception = null; 116 117 private byte[] buffer = new byte[2048]; 118 private int read_pos = 0; 119 private int write_pos = 0; 120 121 public StreamGobbler(InputStream is) 122 { 123 this.is = is; 124 GobblerThread t = new GobblerThread(); 125 t.setDaemon(true); 126 t.start(); 127 } 128 129 @Override 130 public int read() throws IOException 131 { 132 boolean wasInterrupted = false; 133 134 try 135 { 136 synchronized (synchronizer) 137 { 138 if (isClosed) 139 throw new IOException("This StreamGobbler is closed."); 140 141 while (read_pos == write_pos) 142 { 143 if (exception != null) 144 throw exception; 145 146 if (isEOF) 147 return -1; 148 149 try 150 { 151 synchronizer.wait(); 152 } 153 catch (InterruptedException e) 154 { 155 wasInterrupted = true; 156 } 157 } 158 return buffer[read_pos++] & 0xff; 159 } 160 } 161 finally 162 { 163 if (wasInterrupted) 164 Thread.currentThread().interrupt(); 165 } 166 } 167 168 @Override 169 public int available() throws IOException 170 { 171 synchronized (synchronizer) 172 { 173 if (isClosed) 174 throw new IOException("This StreamGobbler is closed."); 175 176 return write_pos - read_pos; 177 } 178 } 179 180 @Override 181 public int read(byte[] b) throws IOException 182 { 183 return read(b, 0, b.length); 184 } 185 186 @Override 187 public void close() throws IOException 188 { 189 synchronized (synchronizer) 190 { 191 if (isClosed) 192 return; 193 isClosed = true; 194 isEOF = true; 195 synchronizer.notifyAll(); 196 is.close(); 197 } 198 } 199 200 @Override 201 public int read(byte[] b, int off, int len) throws IOException 202 { 203 if (b == null) 204 throw new NullPointerException(); 205 206 if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length)) 207 throw new IndexOutOfBoundsException(); 208 209 if (len == 0) 210 return 0; 211 212 boolean wasInterrupted = false; 213 214 try 215 { 216 synchronized (synchronizer) 217 { 218 if (isClosed) 219 throw new IOException("This StreamGobbler is closed."); 220 221 while (read_pos == write_pos) 222 { 223 if (exception != null) 224 throw exception; 225 226 if (isEOF) 227 return -1; 228 229 try 230 { 231 synchronizer.wait(); 232 } 233 catch (InterruptedException e) 234 { 235 wasInterrupted = true; 236 } 237 } 238 239 int avail = write_pos - read_pos; 240 241 avail = (avail > len) ? len : avail; 242 243 System.arraycopy(buffer, read_pos, b, off, avail); 244 245 read_pos += avail; 246 247 return avail; 248 } 249 } 250 finally 251 { 252 if (wasInterrupted) 253 Thread.currentThread().interrupt(); 254 } 255 } 256} 257