1/*
2 * Copyright (c) 2006-2011 Christian Plattner. All rights reserved.
3 * Please refer to the LICENSE.txt for licensing details.
4 */
5package ch.ethz.ssh2;
6
7import java.io.IOException;
8import java.io.InputStream;
9
10/**
11 * A <code>StreamGobbler</code> is an InputStream that uses an internal worker
12 * thread to constantly consume input from another InputStream. It uses a buffer
13 * to store the consumed data. The buffer size is automatically adjusted, if needed.
14 * <p/>
15 * This class is sometimes very convenient - if you wrap a session's STDOUT and STDERR
16 * InputStreams with instances of this class, then you don't have to bother about
17 * the shared window of STDOUT and STDERR in the low level SSH-2 protocol,
 * since all arriving data will be immediately consumed by the worker threads.
19 * Also, as a side effect, the streams will be buffered (e.g., single byte
20 * read() operations are faster).
21 * <p/>
22 * Other SSH for Java libraries include this functionality by default in
23 * their STDOUT and STDERR InputStream implementations, however, please be aware
24 * that this approach has also a downside:
25 * <p/>
26 * If you do not call the StreamGobbler's <code>read()</code> method often enough
27 * and the peer is constantly sending huge amounts of data, then you will sooner or later
28 * encounter a low memory situation due to the aggregated data (well, it also depends on the Java heap size).
29 * Joe Average will like this class anyway - a paranoid programmer would never use such an approach.
30 * <p/>
31 * The term "StreamGobbler" was taken from an article called "When Runtime.exec() won't",
32 * see http://www.javaworld.com/javaworld/jw-12-2000/jw-1229-traps.html.
33 *
34 * @author Christian Plattner
35 * @version 2.50, 03/15/10
36 */
37
public class StreamGobbler extends InputStream
{
	/**
	 * Worker that drains the wrapped stream into the enclosing gobbler's buffer
	 * until the stream ends, fails, or the gobbler is closed.
	 */
	class GobblerThread extends Thread
	{
		@Override
		public void run()
		{
			byte[] buff = new byte[8192];

			while (true)
			{
				try
				{
					/* Blocking read happens outside the lock so readers are never stalled by I/O. */
					int avail = is.read(buff);

					synchronized (synchronizer)
					{
						/*
						 * Stop on end of stream. Also stop once the gobbler has been closed:
						 * after close() every read()/available() call throws, so anything
						 * buffered from here on could never be consumed and would only
						 * waste memory.
						 */
						if (avail <= 0 || isClosed)
						{
							isEOF = true;
							synchronizer.notifyAll();
							break;
						}

						makeRoom(avail);

						System.arraycopy(buff, 0, buffer, write_pos, avail);
						write_pos += avail;

						synchronizer.notifyAll();
					}
				}
				catch (IOException e)
				{
					fail(e);
					break;
				}
				catch (RuntimeException e)
				{
					/*
					 * An unchecked failure (misbehaving wrapped stream, failed buffer growth)
					 * would otherwise terminate this thread silently and leave readers
					 * blocked in wait() forever. Hand it to the readers as an IOException.
					 */
					IOException wrapped = new IOException("StreamGobbler worker thread failed: " + e);
					wrapped.initCause(e);
					fail(wrapped);
					break;
				}
			}
		}

		/**
		 * Records the failure for the readers and wakes up all of them.
		 *
		 * @param e the exception that readers will see once the buffer is drained
		 */
		private void fail(IOException e)
		{
			synchronized (synchronizer)
			{
				exception = e;
				synchronizer.notifyAll();
			}
		}
	}

	/** The wrapped stream that the worker thread consumes. */
	private final InputStream is;

	/** Guards all mutable state below; readers wait on it, the worker notifies on it. */
	private final Object synchronizer = new Object();

	private boolean isEOF = false;
	private boolean isClosed = false;
	private IOException exception = null;

	/** Unread data lives in <code>buffer[read_pos .. write_pos)</code>. */
	private byte[] buffer = new byte[2048];
	private int read_pos = 0;
	private int write_pos = 0;

	/**
	 * Wraps the given stream and immediately starts a daemon thread that consumes it.
	 *
	 * @param is the stream to consume
	 */
	public StreamGobbler(InputStream is)
	{
		this.is = is;
		GobblerThread t = new GobblerThread();
		t.setDaemon(true);
		t.start();
	}

	/**
	 * Makes sure that <code>incoming</code> bytes fit behind <code>write_pos</code>,
	 * first by moving the unread data to the front of the buffer and, if that is
	 * not enough, by switching to a larger buffer. The caller must hold
	 * <code>synchronizer</code>.
	 *
	 * @param incoming number of bytes about to be appended
	 */
	private void makeRoom(int incoming)
	{
		if ((buffer.length - write_pos) >= incoming)
			return;

		int unread_size = write_pos - read_pos;
		int need_space = unread_size + incoming;

		byte[] new_buffer = buffer;

		if (need_space > buffer.length)
		{
			/* Grow by a third of what is needed, clamped to [256, 8192] extra bytes. */
			int inc = need_space / 3;
			inc = (inc < 256) ? 256 : inc;
			inc = (inc > 8192) ? 8192 : inc;
			new_buffer = new byte[need_space + inc];
		}

		/* System.arraycopy copes with overlapping regions when compacting in place. */
		if (unread_size > 0)
			System.arraycopy(buffer, read_pos, new_buffer, 0, unread_size);

		buffer = new_buffer;

		read_pos = 0;
		write_pos = unread_size;
	}

	/**
	 * Blocks until unread data is buffered or the stream has ended. The caller
	 * must hold <code>synchronizer</code>. Buffered data always wins: a recorded
	 * exception or EOF is only reported once the buffer is empty.
	 * <p/>
	 * An interrupt does not abort the wait; it is remembered and the thread's
	 * interrupt flag is restored before this method returns or throws.
	 *
	 * @return <code>true</code> if at least one unread byte is available,
	 *         <code>false</code> if the end of the stream has been reached
	 * @throws IOException the exception recorded by the worker thread, if any
	 */
	private boolean waitForData() throws IOException
	{
		boolean wasInterrupted = false;

		try
		{
			while (read_pos == write_pos)
			{
				if (exception != null)
					throw exception;

				if (isEOF)
					return false;

				try
				{
					synchronizer.wait();
				}
				catch (InterruptedException e)
				{
					wasInterrupted = true;
				}
			}

			return true;
		}
		finally
		{
			if (wasInterrupted)
				Thread.currentThread().interrupt();
		}
	}

	/**
	 * Reads a single byte, blocking until one is available.
	 *
	 * @return the byte as a value in the range 0..255, or -1 at the end of the stream
	 * @throws IOException if this gobbler is closed or the worker thread recorded a failure
	 */
	@Override
	public int read() throws IOException
	{
		synchronized (synchronizer)
		{
			if (isClosed)
				throw new IOException("This StreamGobbler is closed.");

			if (!waitForData())
				return -1;

			return buffer[read_pos++] & 0xff;
		}
	}

	/**
	 * Returns the number of buffered bytes that can be read without blocking.
	 *
	 * @return the number of unread bytes currently in the buffer
	 * @throws IOException if this gobbler is closed
	 */
	@Override
	public int available() throws IOException
	{
		synchronized (synchronizer)
		{
			if (isClosed)
				throw new IOException("This StreamGobbler is closed.");

			return write_pos - read_pos;
		}
	}

	/**
	 * Equivalent to <code>read(b, 0, b.length)</code>.
	 *
	 * @param b destination array
	 * @return the number of bytes read, or -1 at the end of the stream
	 * @throws IOException if this gobbler is closed or the worker thread recorded a failure
	 */
	@Override
	public int read(byte[] b) throws IOException
	{
		return read(b, 0, b.length);
	}

	/**
	 * Closes this gobbler and the wrapped stream. Threads blocked in a read call
	 * are woken up. Calling this method more than once has no further effect.
	 *
	 * @throws IOException if closing the wrapped stream fails
	 */
	@Override
	public void close() throws IOException
	{
		synchronized (synchronizer)
		{
			if (isClosed)
				return;
			isClosed = true;
			isEOF = true;
			synchronizer.notifyAll();
			is.close();
		}
	}

	/**
	 * Reads up to <code>len</code> bytes into <code>b</code>, blocking until at
	 * least one byte is available. Returns as soon as some data can be delivered;
	 * it does not wait for the full <code>len</code> bytes.
	 *
	 * @param b   destination array
	 * @param off index in <code>b</code> at which the first byte is stored
	 * @param len maximum number of bytes to read
	 * @return the number of bytes read, 0 if <code>len</code> is 0,
	 *         or -1 at the end of the stream
	 * @throws NullPointerException      if <code>b</code> is <code>null</code>
	 * @throws IndexOutOfBoundsException if <code>off</code> and <code>len</code> do not
	 *                                   describe a valid range of <code>b</code>
	 * @throws IOException               if this gobbler is closed or the worker
	 *                                   thread recorded a failure
	 */
	@Override
	public int read(byte[] b, int off, int len) throws IOException
	{
		if (b == null)
			throw new NullPointerException();

		/* (off + len) < 0 catches int overflow of the sum. */
		if ((off < 0) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0) || (off > b.length))
			throw new IndexOutOfBoundsException();

		if (len == 0)
			return 0;

		synchronized (synchronizer)
		{
			if (isClosed)
				throw new IOException("This StreamGobbler is closed.");

			if (!waitForData())
				return -1;

			int avail = write_pos - read_pos;

			avail = (avail > len) ? len : avail;

			System.arraycopy(buffer, read_pos, b, off, avail);

			read_pos += avail;

			return avail;
		}
	}
}
257