1"""This class extends pexpect.spawn to specialize setting up SSH connections.
2This adds methods for login, logout, and expecting the shell prompt.
3
4PEXPECT LICENSE
5
6    This license is approved by the OSI and FSF as GPL-compatible.
7        http://opensource.org/licenses/isc-license.txt
8
9    Copyright (c) 2012, Noah Spurrier <noah@noah.org>
10    PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
11    PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
12    COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
13    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20
21"""
22
23from pexpect import *
24import pexpect
25import time
26import os
27
28__all__ = ['ExceptionPxssh', 'pxssh']
29
30# Exception classes used by this module.
class ExceptionPxssh(ExceptionPexpect):
    """Raised for pxssh exceptions.

    pxssh.login() raises this when the SSH login cannot be completed: the
    private key file is rejected, the password is refused, permission is
    denied, the connection is closed by the remote host, an unexpected
    prompt sequence is seen, or the shell prompt cannot be synchronized or
    reset. It derives from ExceptionPexpect, so handlers written for that
    base class also catch it.
    """
34
class pxssh(spawn):

    """This class extends pexpect.spawn to specialize setting up SSH
    connections. This adds methods for login, logout, and expecting the shell
    prompt. It does various tricky things to handle many situations in the SSH
    login process. For example, if the session is your first login, then pxssh
    automatically accepts the remote certificate; or if you have public key
    authentication setup then pxssh won't wait for the password prompt.

    pxssh uses the shell prompt to synchronize output from the remote host. In
    order to make this more robust it sets the shell prompt to something more
    unique than just $ or #. This should work on most Bourne/Bash or Csh style
    shells.

    Example that runs a few commands on a remote server and prints the result::

        import pxssh
        import getpass
        try:
            s = pxssh.pxssh()
            hostname = raw_input('hostname: ')
            username = raw_input('username: ')
            password = getpass.getpass('password: ')
            s.login(hostname, username, password)
            s.sendline('uptime')   # run a command
            s.prompt()             # match the prompt
            print(s.before)        # print everything before the prompt.
            s.sendline('ls -l')
            s.prompt()
            print(s.before)
            s.sendline('df')
            s.prompt()
            print(s.before)
            s.logout()
        except pxssh.ExceptionPxssh as e:
            print("pxssh failed on login.")
            print(str(e))

    Note that if you have ssh-agent running while doing development with pxssh
    then this can lead to a lot of confusion. Many X display managers (xdm,
    gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
    dialog box popup asking for a password during development. You should turn
    off any key agents during testing. The 'force_password' attribute will turn
    off public key authentication. This will only work if the remote SSH server
    is configured to allow password logins. Example of using 'force_password'
    attribute::

            s = pxssh.pxssh()
            s.force_password = True
            hostname = raw_input('hostname: ')
            username = raw_input('username: ')
            password = getpass.getpass('password: ')
            s.login(hostname, username, password)
    """

    def __init__(self, timeout=30, maxread=2000, searchwindowsize=None,
                 logfile=None, cwd=None, env=None):

        """Create a pxssh object that is not yet connected to anything.

        Every argument is forwarded unchanged to spawn.__init__(). The
        command argument is passed as None; login() later builds the ssh
        command line and hands it to spawn._spawn().
        """

        spawn.__init__(self, None, timeout=timeout, maxread=maxread,
                       searchwindowsize=searchwindowsize, logfile=logfile,
                       cwd=cwd, env=env)

        self.name = '<pxssh>'

        # SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
        # slightly different string than the regular expression to match it.
        # This is because when you set the prompt the command will echo back,
        # but we don't want to match the echoed command. So if we make the set
        # command slightly different than the regex we eliminate the problem.
        # To make the set command different we add a backslash in front of $.
        # The $ doesn't need to be escaped, but it doesn't hurt and serves to
        # make the set prompt command different than the regex.
        #
        # These are raw strings so the backslashes are kept literally without
        # relying on Python passing unknown escape sequences through.

        # used to match the command-line prompt
        self.UNIQUE_PROMPT = r"\[PEXPECT\][\$\#] "
        self.PROMPT = self.UNIQUE_PROMPT

        # used to set shell command-line prompt to UNIQUE_PROMPT.
        self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
        self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"

        # Options appended to the ssh command line by login() when
        # force_password is true.
        self.SSH_OPTS = ("-o'RSAAuthentication=no'"
                + " -o 'PubkeyAuthentication=no'")
        # Disabling host key checking, makes you vulnerable to MITM attacks.
        #                + " -o 'StrictHostKeyChecking=no'"
        #                + " -o 'UserKnownHostsFile /dev/null' ")
        # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
        # displaying a GUI password dialog. I have not figured out how to
        # disable only SSH_ASKPASS without also disabling X11 forwarding.
        # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
        #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
        self.force_password = False
        # NOTE: login() decides whether to reset the prompt from its own
        # 'auto_prompt_reset' argument; no method in this class reads this
        # attribute.
        self.auto_prompt_reset = True

    def levenshtein_distance(self, a, b):

        """This calculates the Levenshtein distance between a and b.

        'a' and 'b' are sequences that support len() and indexing (for
        example two strings). The return value is the minimum number of
        single-element insertions, deletions and substitutions needed to
        turn one into the other. Only two rows of the edit matrix are kept
        at any time.
        """

        n, m = len(a), len(b)
        if n > m:
            # Make 'a' the shorter sequence so each row has min(n, m)+1 cells.
            a, b = b, a
            n, m = m, n
        # Row 0: distance from the empty prefix of 'b' to each prefix of 'a'.
        current = list(range(n + 1))
        for i in range(1, m + 1):
            previous, current = current, [i] + [0] * n
            for j in range(1, n + 1):
                add, delete = previous[j] + 1, current[j - 1] + 1
                change = previous[j - 1]
                if a[j - 1] != b[i - 1]:
                    change = change + 1
                current[j] = min(add, delete, change)
        return current[n]

    def sync_original_prompt(self):

        """This attempts to find the prompt. Basically, press enter and record
        the response; press enter again and record the response; if the two
        responses are similar then assume we are at the original prompt. This
        is a slow function. It can take over 10 seconds.

        Returns True when the Levenshtein distance between the last two
        responses is less than 40% of the length of the first of them, and
        False otherwise (including when that response is empty). Only the
        initial buffer-clearing read is guarded against TIMEOUT; a TIMEOUT
        raised by any of the later reads is not caught here. """

        # All of these timing pace values are magic.
        # I came up with these based on what seemed reliable for
        # connecting to a heavily loaded machine I have.
        # If latency is worse than these values then this will fail.
        self.sendline()
        time.sleep(0.1)

        try:
            # Clear the buffer before getting the prompt.
            self.read_nonblocking(size=10000, timeout=1)
        except TIMEOUT:
            pass
        time.sleep(0.1)

        # The response to this first enter is read but not compared; only
        # the next two responses take part in the similarity check.
        self.sendline()
        time.sleep(0.5)
        self.read_nonblocking(size=1000, timeout=1)
        time.sleep(0.1)

        self.sendline()
        time.sleep(0.5)
        a = self.read_nonblocking(size=1000, timeout=1)
        time.sleep(0.1)

        self.sendline()
        time.sleep(0.5)
        b = self.read_nonblocking(size=1000, timeout=1)

        ld = self.levenshtein_distance(a, b)
        len_a = len(a)
        if len_a == 0:
            # Nothing came back, so there is nothing to compare (and the
            # ratio below would divide by zero).
            return False
        return float(ld) / len_a < 0.4

    ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
    ### TODO: I need to draw a flow chart for this.
    def login(self, server, username, password='', terminal_type='ansi',
              original_prompt=r"[#$]", login_timeout=10, port=None,
              auto_prompt_reset=True, ssh_key=None):

        """This logs the user into the given server. It uses the
        'original_prompt' to try to find the prompt right after login. When it
        finds the prompt it immediately tries to reset the prompt to something
        more easily matched. The default 'original_prompt' is very optimistic
        and is easily fooled. It's more reliable to try to match the original
        prompt as exactly as possible to prevent false matches by server
        strings such as the "Message Of The Day". On many systems you can
        disable the MOTD on the remote server by creating a zero-length file
        called "~/.hushlogin" on the remote server. If a prompt cannot be found
        then this will not necessarily cause the login to fail. In the case of
        a timeout when looking for the prompt we assume that the original
        prompt was so weird that we could not match it, so we use a few tricks
        to guess when we have reached the prompt. Then we hope for the best and
        blindly try to reset the prompt to something more unique. If that fails
        then login() raises an ExceptionPxssh exception.

        In some situations it is not possible or desirable to reset the
        original prompt. In this case, set 'auto_prompt_reset' to False to
        inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
        uses a unique prompt in the prompt() method. If the original prompt is
        not reset then this will disable the prompt() method unless you
        manually set the PROMPT attribute.

        Arguments:

        server -- host name or address placed last on the ssh command line.
        username -- passed to ssh with '-l'.
        password -- sent in reply to a password or key passphrase prompt.
        terminal_type -- sent in reply to a "terminal type" prompt.
        original_prompt -- regular expression for the shell prompt as it
            looks before pxssh changes it.
        login_timeout -- timeout passed to the first expect() call only; the
            later expect() calls are made without an explicit timeout.
        port -- if not None, passed to ssh with '-p'.
        auto_prompt_reset -- if true, call set_unique_prompt() after login.
        ssh_key -- if not None, path of a private key file passed to ssh
            with '-i'. The file must exist.

        Returns True on success. Raises ExceptionPxssh if the key file does
        not exist, if the login is refused or answers unexpectedly, or if the
        prompt cannot be synchronized or reset. """

        ssh_options = '-q'
        if self.force_password:
            ssh_options = ssh_options + ' ' + self.SSH_OPTS
        if port is not None:
            ssh_options = ssh_options + ' -p %s' % (str(port))
        if ssh_key is not None:
            # os.path.isfile() reports a missing file by returning False, not
            # by raising, so its result has to be tested. An argument it
            # cannot handle at all is treated as a missing key as well.
            try:
                key_exists = os.path.isfile(ssh_key)
            except (TypeError, ValueError):
                key_exists = False
            if not key_exists:
                raise ExceptionPxssh('private ssh key does not exist')
            ssh_options = ssh_options + ' -i %s' % (ssh_key)
        cmd = "ssh %s -l %s %s" % (ssh_options, username, server)

        # Patterns shared by every expect() call below. The position of each
        # entry is significant: the code that follows tests the index that
        # expect() returns.
        login_patterns = [
            "(?i)are you sure you want to continue connecting",  # 0
            original_prompt,                                     # 1
            "(?i)(?:password)|(?:passphrase for key)",           # 2
            "(?i)permission denied",                             # 3
            "(?i)terminal type",                                 # 4
            TIMEOUT,                                             # 5
        ]

        # This does not distinguish between a remote server 'password' prompt
        # and a local ssh 'passphrase' prompt (for unlocking a private key).
        spawn._spawn(self, cmd)
        # Only this first wait also watches for the connection being closed
        # (index 6) and uses login_timeout.
        i = self.expect(
            login_patterns + ["(?i)connection closed by remote host"],
            timeout=login_timeout)

        # First phase
        if i == 0:
            # New certificate -- always accept it.
            # This is what you get if SSH does not have the remote host's
            # public key stored in the 'known_hosts' cache.
            self.sendline("yes")
            i = self.expect(login_patterns)
        if i == 2:  # password or passphrase
            self.sendline(password)
            i = self.expect(login_patterns)
        if i == 4:
            self.sendline(terminal_type)
            i = self.expect(login_patterns)

        # Second phase
        #
        # i == 1: the prompt was matched; can occur if you have a public key
        #     pair set to authenticate.
        #     TODO: May NOT be OK if expect() got tricked and matched a false
        #     prompt.
        # i == 5: timeout. This is tricky... I presume that we are at the
        #     command-line prompt. It may be that the shell prompt was so
        #     weird that we couldn't match it. Or it may be that we couldn't
        #     log in for some other reason. I can't be sure, but it's safe to
        #     guess that we did login because if I presume wrong and we are
        #     not logged in then this should be caught later when I try to set
        #     the shell prompt.
        # Anything else is a failure.
        if i not in (1, 5):
            failure_messages = {
                # This is weird. This should not happen twice in a row.
                0: 'Weird error. Got "are you sure" prompt twice.',
                # For incorrect passwords, some ssh servers will ask for the
                # password again, others return 'denied' right away. If we get
                # the password prompt again then this means we didn't get the
                # password right the first time.
                2: 'password refused',
                # permission denied -- password was bad.
                3: 'permission denied',
                # terminal type again?
                4: 'Weird error. Got "terminal type" prompt twice.',
                # Connection closed by remote host
                6: 'connection closed',
            }
            self.close()
            raise ExceptionPxssh(
                failure_messages.get(i, 'unexpected login response'))

        if not self.sync_original_prompt():
            self.close()
            raise ExceptionPxssh('could not synchronize with original prompt')
        # We appear to be in.
        # set shell prompt to something unique.
        if auto_prompt_reset:
            if not self.set_unique_prompt():
                self.close()
                # Formatted with %s rather than '+' so that a 'before' value
                # that is not a str cannot raise TypeError here and hide the
                # real failure.
                raise ExceptionPxssh(
                    'could not set shell prompt\n%s' % (self.before,))
        return True

    def logout(self):

        """This sends exit to the remote shell. If there are stopped jobs then
        this automatically sends exit twice. The connection is closed
        afterwards. """

        self.sendline("exit")
        index = self.expect([EOF, "(?i)there are stopped jobs"])
        if index == 1:
            # The shell refused the first exit; ask again.
            self.sendline("exit")
            self.expect(EOF)
        self.close()

    def prompt(self, timeout=-1):

        """This matches the shell prompt. This is little more than a short-cut
        to the expect() method. This returns True if the shell prompt was
        matched. This returns False if a timeout was raised. Note that if you
        called login() with auto_prompt_reset set to False then before calling
        prompt() you must set the PROMPT attribute to a regex that prompt()
        will use for matching the prompt. Calling prompt() will erase the
        contents of the 'before' attribute even if no prompt is ever matched.
        If timeout is not given or it is set to -1 then self.timeout is used.
        """

        if timeout == -1:
            timeout = self.timeout
        # Index 0 is the prompt, index 1 is TIMEOUT.
        i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
        return i != 1

    def set_unique_prompt(self):

        """This sets the remote prompt to something more unique than # or $.
        This makes it easier for the prompt() method to match the shell prompt
        unambiguously. This method is called automatically by the login()
        method, but you may want to call it manually if you somehow reset the
        shell prompt. For example, if you 'su' to a different user then you
        will need to manually reset the prompt. This sends shell commands to
        the remote host to set the prompt, so this assumes the remote host is
        ready to receive commands.

        Alternatively, you may use your own prompt pattern. Just set the PROMPT
        attribute to a regular expression that matches it. In this case you
        should call login() with auto_prompt_reset=False; then set the PROMPT
        attribute. After that the prompt() method will try to match your prompt
        pattern.

        Returns True if the PROMPT pattern was matched after either the
        sh-style or the csh-style command, and False if both attempts timed
        out (each waits up to 10 seconds)."""

        self.sendline("unset PROMPT_COMMAND")
        self.sendline(self.PROMPT_SET_SH)  # sh-style
        i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
        if i == 0:  # csh-style
            self.sendline(self.PROMPT_SET_CSH)
            i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
            if i == 0:
                return False
        return True
346
347# vi:ts=4:sw=4:expandtab:ft=python:
348