"""This class extends pexpect.spawn to specialize setting up SSH connections.
This adds methods for login, logout, and expecting the shell prompt.

$Id: pxssh.py 513 2008-02-09 18:26:13Z noah $
"""

from pexpect import *
import pexpect
import time

__all__ = ['ExceptionPxssh', 'pxssh']

# Exception classes used by this module.
class ExceptionPxssh(ExceptionPexpect):
    """Raised for pxssh exceptions.
    """

class pxssh (spawn):

    """This class extends pexpect.spawn to specialize setting up SSH
    connections. This adds methods for login, logout, and expecting the shell
    prompt. It does various tricky things to handle many situations in the SSH
    login process. For example, if the session is your first login, then pxssh
    automatically accepts the remote certificate; or if you have public key
    authentication setup then pxssh won't wait for the password prompt.

    pxssh uses the shell prompt to synchronize output from the remote host. In
    order to make this more robust it sets the shell prompt to something more
    unique than just $ or #. This should work on most Bourne/Bash or Csh style
    shells.

    Example that runs a few commands on a remote server and prints the result::

        import pxssh
        import getpass
        try:
            s = pxssh.pxssh()
            hostname = raw_input('hostname: ')
            username = raw_input('username: ')
            password = getpass.getpass('password: ')
            s.login (hostname, username, password)
            s.sendline ('uptime')  # run a command
            s.prompt()             # match the prompt
            print s.before         # print everything before the prompt.
            s.sendline ('ls -l')
            s.prompt()
            print s.before
            s.sendline ('df')
            s.prompt()
            print s.before
            s.logout()
        except pxssh.ExceptionPxssh, e:
            print "pxssh failed on login."
            print str(e)

    Note that if you have ssh-agent running while doing development with pxssh
    then this can lead to a lot of confusion. Many X display managers (xdm,
    gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
    dialog box popup asking for a password during development. You should turn
    off any key agents during testing. The 'force_password' attribute will turn
    off public key authentication. This will only work if the remote SSH server
    is configured to allow password logins. Example of using 'force_password'
    attribute::

        s = pxssh.pxssh()
        s.force_password = True
        hostname = raw_input('hostname: ')
        username = raw_input('username: ')
        password = getpass.getpass('password: ')
        s.login (hostname, username, password)
    """

    def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):

        """This creates the object without starting a child process (the
        command passed to spawn is None); the ssh process is started later by
        login(). All arguments are passed straight through to
        spawn.__init__(). """

        spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)

        self.name = '<pxssh>'

        #SUBTLE HACK ALERT! Note that the command to set the prompt uses a
        #slightly different string than the regular expression to match it. This
        #is because when you set the prompt the command will echo back, but we
        #don't want to match the echoed command. So if we make the set command
        #slightly different than the regex we eliminate the problem. To make the
        #set command different we add a backslash in front of $. The $ doesn't
        #need to be escaped, but it doesn't hurt and serves to make the set
        #prompt command different than the regex.

        # NOTE: the three prompt strings below are raw strings so that the
        # backslashes reach the regex engine / remote shell literally without
        # relying on Python passing unknown escape sequences through.

        # used to match the command-line prompt
        self.UNIQUE_PROMPT = r"\[PEXPECT\][\$\#] "
        self.PROMPT = self.UNIQUE_PROMPT

        # used to set shell command-line prompt to UNIQUE_PROMPT.
        self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
        self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
        self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
        # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
        # displaying a GUI password dialog. I have not figured out how to
        # disable only SSH_ASKPASS without also disabling X11 forwarding.
        # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
        #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
        # When True, login() adds SSH_OPTS to the ssh command line.
        self.force_password = False
        # NOTE(review): no method in this class reads this attribute; login()
        # uses its own 'auto_prompt_reset' argument instead.
        self.auto_prompt_reset = True

    def levenshtein_distance(self, a,b):

        """This calculates the Levenshtein distance between a and b, that is
        the minimum number of single-item insertions, deletions and
        substitutions needed to turn one sequence into the other. Only two
        rows of the distance table are kept at any time. Returns an integer;
        two empty sequences give 0. """

        n, m = len(a), len(b)
        if n > m:
            # Make sure 'a' is the shorter sequence so the rows have length n+1.
            a,b = b,a
            n,m = m,n
        current = list(range(n+1))
        for i in range(1,m+1):
            previous, current = current, [i]+[0]*n
            for j in range(1,n+1):
                add, delete = previous[j]+1, current[j-1]+1
                change = previous[j-1]
                if a[j-1] != b[i-1]:
                    change = change + 1
                current[j] = min(add, delete, change)
        return current[n]

    def sync_original_prompt (self):

        """This attempts to find the prompt. Basically, press enter and record
        the response; press enter again and record the response; if the two
        responses are similar then assume we are at the original prompt. This
        is a slow function. It can take over 10 seconds.

        Returns True if the last two responses are similar (Levenshtein
        distance below 40% of the length of the first of them), and False if
        they differ more than that or if the first of them is empty. Only the
        initial buffer-clearing read is guarded against TIMEOUT; a TIMEOUT
        raised by any of the later reads propagates to the caller. """

        # All of these timing pace values are magic.
        # I came up with these based on what seemed reliable for
        # connecting to a heavily loaded machine I have.
        # If latency is worse than these values then this will fail.

        try:
            self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
        except TIMEOUT:
            pass
        time.sleep(0.1)
        self.sendline()
        time.sleep(0.5)
        # The first response is read only to consume it; it is not compared.
        self.read_nonblocking(size=1000,timeout=1)
        time.sleep(0.1)
        self.sendline()
        time.sleep(0.5)
        a = self.read_nonblocking(size=1000,timeout=1)
        time.sleep(0.1)
        self.sendline()
        time.sleep(0.5)
        b = self.read_nonblocking(size=1000,timeout=1)
        ld = self.levenshtein_distance(a,b)
        len_a = len(a)
        if len_a == 0:
            # Nothing came back, so there is nothing to compare (and this
            # also avoids dividing by zero below).
            return False
        if float(ld)/len_a < 0.4:
            return True
        return False

    ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
    ### TODO: I need to draw a flow chart for this.
    def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True):

        """This logs the user into the given server. It uses the
        'original_prompt' to try to find the prompt right after login. When it
        finds the prompt it immediately tries to reset the prompt to something
        more easily matched. The default 'original_prompt' is very optimistic
        and is easily fooled. It's more reliable to try to match the original
        prompt as exactly as possible to prevent false matches by server
        strings such as the "Message Of The Day". On many systems you can
        disable the MOTD on the remote server by creating a zero-length file
        called "~/.hushlogin" on the remote server. If a prompt cannot be found
        then this will not necessarily cause the login to fail. In the case of
        a timeout when looking for the prompt we assume that the original
        prompt was so weird that we could not match it, so we use a few tricks
        to guess when we have reached the prompt. Then we hope for the best and
        blindly try to reset the prompt to something more unique. If that fails
        then login() raises an ExceptionPxssh exception.

        In some situations it is not possible or desirable to reset the
        original prompt. In this case, set 'auto_prompt_reset' to False to
        inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
        uses a unique prompt in the prompt() method. If the original prompt is
        not reset then this will disable the prompt() method unless you
        manually set the PROMPT attribute.

        Arguments:

        server, username -- placed on the ssh command line
            ("ssh <options> -l username server").
        password -- sent in reply to a password or passphrase prompt.
        terminal_type -- sent in reply to a "terminal type" prompt.
        original_prompt -- regular expression for the prompt expected right
            after login.
        login_timeout -- timeout in seconds for the first expect() only; the
            follow-up expect() calls use the object's default timeout.
        port -- if not None, passed to ssh with -p.
        auto_prompt_reset -- if True, call set_unique_prompt() after login.

        Returns True on success. Raises ExceptionPxssh (after closing the
        connection) if the login is refused, gives an unexpected response, the
        original prompt cannot be synchronized, or the prompt cannot be
        reset. """

        ssh_options = '-q'
        if self.force_password:
            ssh_options = ssh_options + ' ' + self.SSH_OPTS
        if port is not None:
            ssh_options = ssh_options + ' -p %s'%(str(port))
        cmd = "ssh %s -l %s %s" % (ssh_options, username, server)

        # Responses looked for at every step of the login conversation. The
        # index of each entry is significant: it is the value tested below.
        login_patterns = [
            "(?i)are you sure you want to continue connecting", # 0
            original_prompt,                                    # 1
            "(?i)(?:password)|(?:passphrase for key)",          # 2
            "(?i)permission denied",                            # 3
            "(?i)terminal type",                                # 4
            TIMEOUT,                                            # 5
        ]

        # This does not distinguish between a remote server 'password' prompt
        # and a local ssh 'passphrase' prompt (for unlocking a private key).
        spawn._spawn(self, cmd)
        # Only this first expect also watches for the connection being closed
        # (index 6) and only this one uses login_timeout.
        i = self.expect(login_patterns + ["(?i)connection closed by remote host"], timeout=login_timeout)

        # First phase
        if i==0:
            # New certificate -- always accept it.
            # This is what you get if SSH does not have the remote host's
            # public key stored in the 'known_hosts' cache.
            self.sendline("yes")
            i = self.expect(login_patterns)
        if i==2: # password or passphrase
            self.sendline(password)
            i = self.expect(login_patterns)
        if i==4:
            self.sendline(terminal_type)
            i = self.expect(login_patterns)

        # Second phase
        if i==0:
            # This is weird. This should not happen twice in a row.
            self.close()
            raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
        elif i==1: # can occur if you have a public key pair set to authenticate.
            ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
            pass
        elif i==2: # password prompt again
            # For incorrect passwords, some ssh servers will
            # ask for the password again, others return 'denied' right away.
            # If we get the password prompt again then this means
            # we didn't get the password right the first time.
            self.close()
            raise ExceptionPxssh ('password refused')
        elif i==3: # permission denied -- password was bad.
            self.close()
            raise ExceptionPxssh ('permission denied')
        elif i==4: # terminal type again? WTF?
            self.close()
            raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
        elif i==5: # Timeout
            #This is tricky... I presume that we are at the command-line prompt.
            #It may be that the shell prompt was so weird that we couldn't match
            #it. Or it may be that we couldn't log in for some other reason. I
            #can't be sure, but it's safe to guess that we did login because if
            #I presume wrong and we are not logged in then this should be caught
            #later when I try to set the shell prompt.
            pass
        elif i==6: # Connection closed by remote host
            self.close()
            raise ExceptionPxssh ('connection closed')
        else: # Unexpected
            self.close()
            raise ExceptionPxssh ('unexpected login response')
        if not self.sync_original_prompt():
            self.close()
            raise ExceptionPxssh ('could not synchronize with original prompt')
        # We appear to be in.
        # set shell prompt to something unique.
        if auto_prompt_reset:
            if not self.set_unique_prompt():
                self.close()
                # %s formatting (rather than '+') so that building the message
                # cannot itself fail if 'before' is not a plain str.
                raise ExceptionPxssh ('could not set shell prompt\n%s' % (self.before,))
        return True

    def logout (self):

        """This sends exit to the remote shell. If there are stopped jobs then
        this automatically sends exit twice. The connection is closed
        afterwards. """

        self.sendline("exit")
        index = self.expect([EOF, "(?i)there are stopped jobs"])
        if index==1:
            self.sendline("exit")
            self.expect(EOF)
        self.close()

    def prompt (self, timeout=20):

        """This matches the shell prompt. This is little more than a short-cut
        to the expect() method. This returns True if the shell prompt was
        matched. This returns False if there was a timeout. Note that if you
        called login() with auto_prompt_reset set to False then you should have
        manually set the PROMPT attribute to a regex pattern for matching the
        prompt. """

        # Index 0 is the prompt pattern, index 1 is TIMEOUT.
        i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
        return i == 0

    def set_unique_prompt (self):

        """This sets the remote prompt to something more unique than # or $.
        This makes it easier for the prompt() method to match the shell prompt
        unambiguously. This method is called automatically by the login()
        method, but you may want to call it manually if you somehow reset the
        shell prompt. For example, if you 'su' to a different user then you
        will need to manually reset the prompt. This sends shell commands to
        the remote host to set the prompt, so this assumes the remote host is
        ready to receive commands.

        Alternatively, you may use your own prompt pattern. Just set the PROMPT
        attribute to a regular expression that matches it. In this case you
        should call login() with auto_prompt_reset=False; then set the PROMPT
        attribute. After that the prompt() method will try to match your prompt
        pattern.

        Returns True if the new prompt was seen after either the sh-style or
        the csh-style command, False if both attempts timed out. """

        self.sendline ("unset PROMPT_COMMAND")
        self.sendline (self.PROMPT_SET_SH) # sh-style
        i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
        if i == 0: # csh-style
            self.sendline (self.PROMPT_SET_CSH)
            i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
            if i == 0:
                return False
        return True

# vi:ts=4:sw=4:expandtab:ft=python: