1/*
2 * Copyright (c) 2006-2011 Christian Plattner. All rights reserved.
3 * Please refer to the LICENSE.txt for licensing details.
4 */
5
6package ch.ethz.ssh2.transport;
7
8import java.io.IOException;
9import java.io.InputStream;
10import java.io.OutputStream;
11import java.net.InetAddress;
12import java.net.InetSocketAddress;
13import java.net.Socket;
14import java.net.SocketTimeoutException;
15import java.net.UnknownHostException;
16import java.security.SecureRandom;
17import java.util.List;
18import java.util.Vector;
19
20import ch.ethz.ssh2.ConnectionInfo;
21import ch.ethz.ssh2.ConnectionMonitor;
22import ch.ethz.ssh2.DHGexParameters;
23import ch.ethz.ssh2.HTTPProxyData;
24import ch.ethz.ssh2.HTTPProxyException;
25import ch.ethz.ssh2.ProxyData;
26import ch.ethz.ssh2.ServerHostKeyVerifier;
27import ch.ethz.ssh2.crypto.Base64;
28import ch.ethz.ssh2.crypto.CryptoWishList;
29import ch.ethz.ssh2.crypto.cipher.BlockCipher;
30import ch.ethz.ssh2.crypto.digest.MAC;
31import ch.ethz.ssh2.log.Logger;
32import ch.ethz.ssh2.packets.PacketDisconnect;
33import ch.ethz.ssh2.packets.Packets;
34import ch.ethz.ssh2.packets.TypesReader;
35import ch.ethz.ssh2.util.StringEncoder;
36import ch.ethz.ssh2.util.Tokenizer;
37
/*
 * Yes, the "standard" is a big mess. On one side, they say that arbitrary channel
 * packets are allowed during key exchange; on the other side, we need to blindly
 * ignore the next _packet_ if the KEX guess was wrong. How do we know that
 * the next packet is not a channel data packet? Yes, we could check if it is in
 * the KEX range. But the standard says nothing about this. The OpenSSH guys
 * block local "normal" traffic during KEX. That's fine - however, they assume
 * that the other side is doing the same. During re-key, if they receive traffic
 * other than KEX, they become horribly irritated and kill the connection. Since
 * we are very likely going to communicate with OpenSSH servers, we have to play
 * the same game - even though we could do better.
 *
 * btw: having stdout and stderr on the same channel, with a shared window, is
 * also a VERY good idea... =(
 */
53
54/**
55 * TransportManager.
56 *
57 * @author Christian Plattner
58 * @version $Id: TransportManager.java 41 2011-06-02 10:36:41Z dkocher@sudo.ch $
59 */
60public class TransportManager
61{
62	private static final Logger log = Logger.getLogger(TransportManager.class);
63
64	private static class HandlerEntry
65	{
66		MessageHandler mh;
67		int low;
68		int high;
69	}
70
	/*
	 * Messages queued by sendAsynchronousMessage() and drained by the
	 * AsynchronousWorker. The list object is also the monitor that guards
	 * "asynchronousThread" and on which the worker waits while idle.
	 */
	private final List<byte[]> asynchronousQueue = new Vector<byte[]>();
	/*
	 * The worker started by sendAsynchronousMessage(), or null if none was
	 * started yet or the last one stopped because the queue stayed empty.
	 * A worker that exits because of an IOException does not reset this field.
	 */
	private Thread asynchronousThread = null;
73
74	class AsynchronousWorker extends Thread
75	{
76		@Override
77		public void run()
78		{
79			while (true)
80			{
81				byte[] msg = null;
82
83				synchronized (asynchronousQueue)
84				{
85					if (asynchronousQueue.size() == 0)
86					{
87						/* After the queue is empty for about 2 seconds, stop this thread */
88
89						try
90						{
91							asynchronousQueue.wait(2000);
92						}
93						catch (InterruptedException ignore)
94						{
95						}
96
97						if (asynchronousQueue.size() == 0)
98						{
99							asynchronousThread = null;
100							return;
101						}
102					}
103
104					msg = asynchronousQueue.remove(0);
105				}
106
107				/* The following invocation may throw an IOException.
108				 * There is no point in handling it - it simply means
109				 * that the connection has a problem and we should stop
110				 * sending asynchronously messages. We do not need to signal that
111				 * we have exited (asynchronousThread = null): further
112				 * messages in the queue cannot be sent by this or any
113				 * other thread.
114				 * Other threads will sooner or later (when receiving or
115				 * sending the next message) get the same IOException and
116				 * get to the same conclusion.
117				 */
118
119				try
120				{
121					sendMessage(msg);
122				}
123				catch (IOException e)
124				{
125					return;
126				}
127			}
128		}
129	}
130
	/* Target host and port as passed to the constructor */
	private String hostname;
	private int port;
	/* The single socket of this transport; it is connected in establishConnection() */
	private final Socket sock = new Socket();

	/*
	 * Monitor guarding "flagKexOngoing", "connectionClosed" and
	 * "reasonClosedCause". Senders wait on it while a key exchange is ongoing.
	 */
	private final Object connectionSemaphore = new Object();

	/* Set by sendKexMessage(), cleared by kexFinished(); sendMessage() blocks while it is true */
	private boolean flagKexOngoing = false;
	/* Set once by close(); afterwards every send attempt fails with an IOException */
	private boolean connectionClosed = false;

	/* The cause handed to the close() call that closed the connection (may be null) */
	private Throwable reasonClosedCause = null;

	/* Both are created in initialize() and are null before that */
	private TransportConnection tc;
	private KexManager km;

	/* Handlers registered for ranges of message types; consulted by receiveLoop() */
	private final List<HandlerEntry> messageHandlers = new Vector<HandlerEntry>();

	/* The thread running receiveLoop(); started in initialize() */
	private Thread receiveThread;

	/* Both are protected by synchronized (this), see close() and setConnectionMonitors() */
	private List<ConnectionMonitor> connectionMonitors = new Vector<ConnectionMonitor>();
	private boolean monitorsWereInformed = false;
151
152	/**
153	 * There were reports that there are JDKs which use
154	 * the resolver even though one supplies a dotted IP
155	 * address in the Socket constructor. That is why we
156	 * try to generate the InetAdress "by hand".
157	 *
158	 * @param host
159	 * @return the InetAddress
160	 * @throws UnknownHostException
161	 */
162	private InetAddress createInetAddress(String host) throws UnknownHostException
163	{
164		/* Check if it is a dotted IP4 address */
165
166		InetAddress addr = parseIPv4Address(host);
167
168		if (addr != null)
169		{
170			return addr;
171		}
172
173		return InetAddress.getByName(host);
174	}
175
176	private InetAddress parseIPv4Address(String host) throws UnknownHostException
177	{
178		if (host == null)
179		{
180			return null;
181		}
182
183		String[] quad = Tokenizer.parseTokens(host, '.');
184
185		if ((quad == null) || (quad.length != 4))
186		{
187			return null;
188		}
189
190		byte[] addr = new byte[4];
191
192		for (int i = 0; i < 4; i++)
193		{
194			int part = 0;
195
196			if ((quad[i].length() == 0) || (quad[i].length() > 3))
197			{
198				return null;
199			}
200
201			for (int k = 0; k < quad[i].length(); k++)
202			{
203				char c = quad[i].charAt(k);
204
205				/* No, Character.isDigit is not the same */
206				if ((c < '0') || (c > '9'))
207				{
208					return null;
209				}
210
211				part = part * 10 + (c - '0');
212			}
213
214			if (part > 255) /* 300.1.2.3 is invalid =) */
215			{
216				return null;
217			}
218
219			addr[i] = (byte) part;
220		}
221
222		return InetAddress.getByAddress(host, addr);
223	}
224
225	public TransportManager(String host, int port) throws IOException
226	{
227		this.hostname = host;
228		this.port = port;
229	}
230
231	public int getPacketOverheadEstimate()
232	{
233		return tc.getPacketOverheadEstimate();
234	}
235
236	public void setTcpNoDelay(boolean state) throws IOException
237	{
238		sock.setTcpNoDelay(state);
239	}
240
241	public void setSoTimeout(int timeout) throws IOException
242	{
243		sock.setSoTimeout(timeout);
244	}
245
246	public ConnectionInfo getConnectionInfo(int kexNumber) throws IOException
247	{
248		return km.getOrWaitForConnectionInfo(kexNumber);
249	}
250
251	public Throwable getReasonClosedCause()
252	{
253		synchronized (connectionSemaphore)
254		{
255			return reasonClosedCause;
256		}
257	}
258
259	public byte[] getSessionIdentifier()
260	{
261		return km.sessionId;
262	}
263
264	public void close(Throwable cause, boolean useDisconnectPacket)
265	{
266		if (useDisconnectPacket == false)
267		{
268			/* OK, hard shutdown - do not aquire the semaphore,
269			 * perhaps somebody is inside (and waits until the remote
270			 * side is ready to accept new data). */
271
272			try
273			{
274				sock.close();
275			}
276			catch (IOException ignore)
277			{
278			}
279
280			/* OK, whoever tried to send data, should now agree that
281			 * there is no point in further waiting =)
282			 * It is safe now to aquire the semaphore.
283			 */
284		}
285
286		synchronized (connectionSemaphore)
287		{
288			if (connectionClosed == false)
289			{
290				if (useDisconnectPacket == true)
291				{
292					try
293					{
294						byte[] msg = new PacketDisconnect(Packets.SSH_DISCONNECT_BY_APPLICATION, cause.getMessage(), "")
295								.getPayload();
296						if (tc != null)
297						{
298							tc.sendMessage(msg);
299						}
300					}
301					catch (IOException ignore)
302					{
303					}
304
305					try
306					{
307						sock.close();
308					}
309					catch (IOException ignore)
310					{
311					}
312				}
313
314				connectionClosed = true;
315				reasonClosedCause = cause; /* may be null */
316			}
317			connectionSemaphore.notifyAll();
318		}
319
320		/* No check if we need to inform the monitors */
321
322		List<ConnectionMonitor> monitors = new Vector<ConnectionMonitor>();
323
324		synchronized (this)
325		{
326			/* Short term lock to protect "connectionMonitors"
327			 * and "monitorsWereInformed"
328			 * (they may be modified concurrently)
329			 */
330
331			if (monitorsWereInformed == false)
332			{
333				monitorsWereInformed = true;
334				monitors.addAll(connectionMonitors);
335			}
336		}
337
338		for (ConnectionMonitor cmon : monitors)
339		{
340			try
341			{
342				cmon.connectionLost(reasonClosedCause);
343			}
344			catch (Exception ignore)
345			{
346			}
347		}
348	}
349
350	private void establishConnection(ProxyData proxyData, int connectTimeout) throws IOException
351	{
352		/* See the comment for createInetAddress() */
353
354		if (proxyData == null)
355		{
356			InetAddress addr = createInetAddress(hostname);
357			sock.connect(new InetSocketAddress(addr, port), connectTimeout);
358			return;
359		}
360
361		if (proxyData instanceof HTTPProxyData)
362		{
363			HTTPProxyData pd = (HTTPProxyData) proxyData;
364
365			/* At the moment, we only support HTTP proxies */
366
367			InetAddress addr = createInetAddress(pd.proxyHost);
368			sock.connect(new InetSocketAddress(addr, pd.proxyPort), connectTimeout);
369
370			/* OK, now tell the proxy where we actually want to connect to */
371
372			StringBuilder sb = new StringBuilder();
373
374			sb.append("CONNECT ");
375			sb.append(hostname);
376			sb.append(':');
377			sb.append(port);
378			sb.append(" HTTP/1.0\r\n");
379
380			if ((pd.proxyUser != null) && (pd.proxyPass != null))
381			{
382				String credentials = pd.proxyUser + ":" + pd.proxyPass;
383				char[] encoded = Base64.encode(StringEncoder.GetBytes(credentials));
384				sb.append("Proxy-Authorization: Basic ");
385				sb.append(encoded);
386				sb.append("\r\n");
387			}
388
389			if (pd.requestHeaderLines != null)
390			{
391				for (int i = 0; i < pd.requestHeaderLines.length; i++)
392				{
393					if (pd.requestHeaderLines[i] != null)
394					{
395						sb.append(pd.requestHeaderLines[i]);
396						sb.append("\r\n");
397					}
398				}
399			}
400
401			sb.append("\r\n");
402
403			OutputStream out = sock.getOutputStream();
404
405			out.write(StringEncoder.GetBytes(sb.toString()));
406			out.flush();
407
408			/* Now parse the HTTP response */
409
410			byte[] buffer = new byte[1024];
411			InputStream in = sock.getInputStream();
412
413			int len = ClientServerHello.readLineRN(in, buffer);
414
415			String httpReponse = StringEncoder.GetString(buffer, 0, len);
416
417			if (httpReponse.startsWith("HTTP/") == false)
418			{
419				throw new IOException("The proxy did not send back a valid HTTP response.");
420			}
421
422			/* "HTTP/1.X XYZ X" => 14 characters minimum */
423
424			if ((httpReponse.length() < 14) || (httpReponse.charAt(8) != ' ') || (httpReponse.charAt(12) != ' '))
425			{
426				throw new IOException("The proxy did not send back a valid HTTP response.");
427			}
428
429			int errorCode = 0;
430
431			try
432			{
433				errorCode = Integer.parseInt(httpReponse.substring(9, 12));
434			}
435			catch (NumberFormatException ignore)
436			{
437				throw new IOException("The proxy did not send back a valid HTTP response.");
438			}
439
440			if ((errorCode < 0) || (errorCode > 999))
441			{
442				throw new IOException("The proxy did not send back a valid HTTP response.");
443			}
444
445			if (errorCode != 200)
446			{
447				throw new HTTPProxyException(httpReponse.substring(13), errorCode);
448			}
449
450			/* OK, read until empty line */
451
452			while (true)
453			{
454				len = ClientServerHello.readLineRN(in, buffer);
455				if (len == 0)
456				{
457					break;
458				}
459			}
460			return;
461		}
462
463		throw new IOException("Unsupported ProxyData");
464	}
465
466	public void initialize(String identification, CryptoWishList cwl, ServerHostKeyVerifier verifier,
467						   DHGexParameters dhgex, int connectTimeout, SecureRandom rnd, ProxyData proxyData)
468			throws IOException
469	{
470		/* First, establish the TCP connection to the SSH-2 server */
471
472		establishConnection(proxyData, connectTimeout);
473
474		/* Parse the server line and say hello - important: this information is later needed for the
475		 * key exchange (to stop man-in-the-middle attacks) - that is why we wrap it into an object
476		 * for later use.
477		 */
478
479		ClientServerHello csh = new ClientServerHello(identification, sock.getInputStream(), sock.getOutputStream());
480
481		tc = new TransportConnection(sock.getInputStream(), sock.getOutputStream(), rnd);
482
483		km = new KexManager(this, csh, cwl, hostname, port, verifier, rnd);
484		km.initiateKEX(cwl, dhgex);
485
486		receiveThread = new Thread(new Runnable()
487		{
488			public void run()
489			{
490				try
491				{
492					receiveLoop();
493				}
494				catch (IOException e)
495				{
496					close(e, false);
497
498					log.warning("Receive thread: error in receiveLoop: " + e.getMessage());
499				}
500
501				if (log.isDebugEnabled())
502				{
503					log.debug("Receive thread: back from receiveLoop");
504				}
505
506				/* Tell all handlers that it is time to say goodbye */
507
508				if (km != null)
509				{
510					try
511					{
512						km.handleMessage(null, 0);
513					}
514					catch (IOException ignored)
515					{
516					}
517				}
518
519				for (HandlerEntry he : messageHandlers)
520				{
521					try
522					{
523						he.mh.handleMessage(null, 0);
524					}
525					catch (Exception ignore)
526					{
527					}
528				}
529			}
530		});
531
532		receiveThread.setDaemon(true);
533		receiveThread.start();
534	}
535
536	public void registerMessageHandler(MessageHandler mh, int low, int high)
537	{
538		HandlerEntry he = new HandlerEntry();
539		he.mh = mh;
540		he.low = low;
541		he.high = high;
542
543		synchronized (messageHandlers)
544		{
545			messageHandlers.add(he);
546		}
547	}
548
549	public void removeMessageHandler(MessageHandler mh, int low, int high)
550	{
551		synchronized (messageHandlers)
552		{
553			for (int i = 0; i < messageHandlers.size(); i++)
554			{
555				HandlerEntry he = messageHandlers.get(i);
556				if ((he.mh == mh) && (he.low == low) && (he.high == high))
557				{
558					messageHandlers.remove(i);
559					break;
560				}
561			}
562		}
563	}
564
565	public void sendKexMessage(byte[] msg) throws IOException
566	{
567		synchronized (connectionSemaphore)
568		{
569			if (connectionClosed)
570			{
571				throw (IOException) new IOException("Sorry, this connection is closed.").initCause(reasonClosedCause);
572			}
573
574			flagKexOngoing = true;
575
576			try
577			{
578				tc.sendMessage(msg);
579			}
580			catch (IOException e)
581			{
582				close(e, false);
583				throw e;
584			}
585		}
586	}
587
588	public void kexFinished() throws IOException
589	{
590		synchronized (connectionSemaphore)
591		{
592			flagKexOngoing = false;
593			connectionSemaphore.notifyAll();
594		}
595	}
596
597	public void forceKeyExchange(CryptoWishList cwl, DHGexParameters dhgex) throws IOException
598	{
599		km.initiateKEX(cwl, dhgex);
600	}
601
602	public void changeRecvCipher(BlockCipher bc, MAC mac)
603	{
604		tc.changeRecvCipher(bc, mac);
605	}
606
607	public void changeSendCipher(BlockCipher bc, MAC mac)
608	{
609		tc.changeSendCipher(bc, mac);
610	}
611
612	public void sendAsynchronousMessage(byte[] msg) throws IOException
613	{
614		synchronized (asynchronousQueue)
615		{
616			asynchronousQueue.add(msg);
617
618			/* This limit should be flexible enough. We need this, otherwise the peer
619			 * can flood us with global requests (and other stuff where we have to reply
620			 * with an asynchronous message) and (if the server just sends data and does not
621			 * read what we send) this will probably put us in a low memory situation
622			 * (our send queue would grow and grow and...) */
623
624			if (asynchronousQueue.size() > 100)
625			{
626				throw new IOException("Error: the peer is not consuming our asynchronous replies.");
627			}
628
629			/* Check if we have an asynchronous sending thread */
630
631			if (asynchronousThread == null)
632			{
633				asynchronousThread = new AsynchronousWorker();
634				asynchronousThread.setDaemon(true);
635				asynchronousThread.start();
636
637				/* The thread will stop after 2 seconds of inactivity (i.e., empty queue) */
638			}
639		}
640	}
641
642	public void setConnectionMonitors(List<ConnectionMonitor> monitors)
643	{
644		synchronized (this)
645		{
646			connectionMonitors = new Vector<ConnectionMonitor>();
647			connectionMonitors.addAll(monitors);
648		}
649	}
650
651	/**
652	 * True if no response message expected.
653	 */
654	private boolean idle;
655
656	public void sendMessage(byte[] msg) throws IOException
657	{
658		if (Thread.currentThread() == receiveThread)
659		{
660			throw new IOException("Assertion error: sendMessage may never be invoked by the receiver thread!");
661		}
662
663		boolean wasInterrupted = false;
664
665		try
666		{
667			synchronized (connectionSemaphore)
668			{
669				while (true)
670				{
671					if (connectionClosed)
672					{
673						throw (IOException) new IOException("Sorry, this connection is closed.")
674								.initCause(reasonClosedCause);
675					}
676
677					if (flagKexOngoing == false)
678					{
679						break;
680					}
681
682					try
683					{
684						connectionSemaphore.wait();
685					}
686					catch (InterruptedException e)
687					{
688						wasInterrupted = true;
689					}
690				}
691
692				try
693				{
694					tc.sendMessage(msg);
695					idle = false;
696				}
697				catch (IOException e)
698				{
699					close(e, false);
700					throw e;
701				}
702			}
703		}
704		finally
705		{
706			if (wasInterrupted)
707				Thread.currentThread().interrupt();
708		}
709	}
710
711	public void receiveLoop() throws IOException
712	{
713		byte[] msg = new byte[35000];
714
715		while (true)
716		{
717			int msglen;
718			try
719			{
720				msglen = tc.receiveMessage(msg, 0, msg.length);
721			}
722			catch (SocketTimeoutException e)
723			{
724				// Timeout in read
725				if (idle)
726				{
727					log.debug("Ignoring socket timeout");
728					continue;
729				}
730				throw e;
731			}
732			idle = true;
733
734			int type = msg[0] & 0xff;
735
736			if (type == Packets.SSH_MSG_IGNORE)
737			{
738				continue;
739			}
740
741			if (type == Packets.SSH_MSG_DEBUG)
742			{
743				if (log.isDebugEnabled())
744				{
745					TypesReader tr = new TypesReader(msg, 0, msglen);
746					tr.readByte();
747					tr.readBoolean();
748					StringBuilder debugMessageBuffer = new StringBuilder();
749					debugMessageBuffer.append(tr.readString("UTF-8"));
750
751					for (int i = 0; i < debugMessageBuffer.length(); i++)
752					{
753						char c = debugMessageBuffer.charAt(i);
754
755						if ((c >= 32) && (c <= 126))
756						{
757							continue;
758						}
759						debugMessageBuffer.setCharAt(i, '\uFFFD');
760					}
761
762					log.debug("DEBUG Message from remote: '" + debugMessageBuffer.toString() + "'");
763				}
764				continue;
765			}
766
767			if (type == Packets.SSH_MSG_UNIMPLEMENTED)
768			{
769				throw new IOException("Peer sent UNIMPLEMENTED message, that should not happen.");
770			}
771
772			if (type == Packets.SSH_MSG_DISCONNECT)
773			{
774				TypesReader tr = new TypesReader(msg, 0, msglen);
775				tr.readByte();
776				int reason_code = tr.readUINT32();
777				StringBuilder reasonBuffer = new StringBuilder();
778				reasonBuffer.append(tr.readString("UTF-8"));
779
780				/*
781				 * Do not get fooled by servers that send abnormal long error
782				 * messages
783				 */
784
785				if (reasonBuffer.length() > 255)
786				{
787					reasonBuffer.setLength(255);
788					reasonBuffer.setCharAt(254, '.');
789					reasonBuffer.setCharAt(253, '.');
790					reasonBuffer.setCharAt(252, '.');
791				}
792
793				/*
794				 * Also, check that the server did not send characters that may
795				 * screw up the receiver -> restrict to reasonable US-ASCII
796				 * subset -> "printable characters" (ASCII 32 - 126). Replace
797				 * all others with 0xFFFD (UNICODE replacement character).
798				 */
799
800				for (int i = 0; i < reasonBuffer.length(); i++)
801				{
802					char c = reasonBuffer.charAt(i);
803
804					if ((c >= 32) && (c <= 126))
805					{
806						continue;
807					}
808					reasonBuffer.setCharAt(i, '\uFFFD');
809				}
810
811				throw new IOException("Peer sent DISCONNECT message (reason code " + reason_code + "): "
812						+ reasonBuffer.toString());
813			}
814
815			/*
816			 * Is it a KEX Packet?
817			 */
818
819			if ((type == Packets.SSH_MSG_KEXINIT) || (type == Packets.SSH_MSG_NEWKEYS)
820					|| ((type >= 30) && (type <= 49)))
821			{
822				km.handleMessage(msg, msglen);
823				continue;
824			}
825
826			MessageHandler mh = null;
827
828			for (int i = 0; i < messageHandlers.size(); i++)
829			{
830				HandlerEntry he = messageHandlers.get(i);
831				if ((he.low <= type) && (type <= he.high))
832				{
833					mh = he.mh;
834					break;
835				}
836			}
837
838			if (mh == null)
839			{
840				throw new IOException("Unexpected SSH message (type " + type + ")");
841			}
842
843			mh.handleMessage(msg, msglen);
844		}
845	}
846}
847