1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Upgrade pexpect to 3.3

This commit is contained in:
Matthew Jones 2014-08-06 14:00:43 -04:00
parent b18b075bc9
commit 92449d3f23
5 changed files with 373 additions and 192 deletions

View File

@ -39,7 +39,7 @@ os-diskconfig-python-novaclient-ext==0.1.2 (os_diskconfig_python_novaclient_ext/
os-networksv2-python-novaclient-ext==0.21 (os_networksv2_python_novaclient_ext.py)
os-virtual-interfacesv2-python-novaclient-ext==0.15 (os_virtual_interfacesv2_python_novaclient_ext.py)
pbr==0.8.0 (pbr/*)
pexpect==3.1 (pexpect/*, excluded pxssh.py, fdpexpect.py, FSM.py, screen.py,
pexpect==3.3 (pexpect/*, excluded pxssh.py, fdpexpect.py, FSM.py, screen.py,
ANSI.py)
pip==1.5.4 (pip/*, excluded bin/pip*)
prettytable==0.7.2 (prettytable.py)

View File

@ -80,6 +80,7 @@ try:
import traceback
import signal
import codecs
import stat
except ImportError: # pragma: no cover
err = sys.exc_info()[1]
raise ImportError(str(err) + '''
@ -87,7 +88,7 @@ except ImportError: # pragma: no cover
A critical module was not found. Probably this operating system does not
support it. Pexpect is intended for UNIX-like operating systems.''')
__version__ = '3.1'
__version__ = '3.3'
__revision__ = ''
__all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'spawnu', 'run', 'runu',
'which', 'split_command_line', '__version__', '__revision__']
@ -284,6 +285,7 @@ class spawn(object):
def _chr(c):
return bytes([c])
linesep = os.linesep.encode('ascii')
crlf = '\r\n'.encode('ascii')
@staticmethod
def write_to_stdout(b):
@ -296,13 +298,14 @@ class spawn(object):
allowed_string_types = (basestring,) # analysis:ignore
_chr = staticmethod(chr)
linesep = os.linesep
crlf = '\r\n'
write_to_stdout = sys.stdout.write
encoding = None
def __init__(self, command, args=[], timeout=30, maxread=2000,
searchwindowsize=None, logfile=None, cwd=None, env=None,
ignore_sighup=True):
ignore_sighup=True, echo=True):
'''This is the constructor. The command parameter may be a string that
includes a command and any arguments to the command. For example::
@ -415,7 +418,16 @@ class spawn(object):
signalstatus will store the signal value and exitstatus will be None.
If you need more detail you can also read the self.status member which
stores the status returned by os.waitpid. You can interpret this using
os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. '''
os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
The echo attribute may be set to False to disable echoing of input.
As a pseudo-terminal, all input echoed by the "keyboard" (send()
or sendline()) will be repeated to output. For many cases, it is
not desirable to have echo enabled, and it may be later disabled
using setecho(False) followed by waitnoecho(). However, for some
platforms such as Solaris, this is not possible, and should be
disabled immediately on spawn.
'''
self.STDIN_FILENO = pty.STDIN_FILENO
self.STDOUT_FILENO = pty.STDOUT_FILENO
@ -437,7 +449,7 @@ class spawn(object):
self.status = None
self.flag_eof = False
self.pid = None
# the chile filedescriptor is initially closed
# the child file descriptor is initially closed
self.child_fd = -1
self.timeout = timeout
self.delimiter = EOF
@ -466,16 +478,30 @@ class spawn(object):
self.closed = True
self.cwd = cwd
self.env = env
self.echo = echo
self.ignore_sighup = ignore_sighup
_platform = sys.platform.lower()
# This flags if we are running on irix
self.__irix_hack = (sys.platform.lower().find('irix') >= 0)
self.__irix_hack = _platform.startswith('irix')
# Solaris uses internal __fork_pty(). All others use pty.fork().
if ((sys.platform.lower().find('solaris') >= 0)
or (sys.platform.lower().find('sunos5') >= 0)):
self.use_native_pty_fork = False
else:
self.use_native_pty_fork = True
self.use_native_pty_fork = not (
_platform.startswith('solaris') or
_platform.startswith('sunos'))
# inherit EOF and INTR definitions from controlling process.
try:
from termios import VEOF, VINTR
fd = sys.__stdin__.fileno()
self._INTR = ord(termios.tcgetattr(fd)[6][VINTR])
self._EOF = ord(termios.tcgetattr(fd)[6][VEOF])
except (ImportError, OSError, IOError, termios.error):
# unless the controlling process is also not a terminal,
# such as cron(1). Fall-back to using CEOF and CINTR.
try:
from termios import CEOF, CINTR
(self._INTR, self._EOF) = (CINTR, CEOF)
except ImportError:
# ^C, ^D
(self._INTR, self._EOF) = (3, 4)
# Support subclasses that do not use command or args.
if command is None:
self.command = None
@ -599,33 +625,39 @@ class spawn(object):
if self.use_native_pty_fork:
try:
self.pid, self.child_fd = pty.fork()
except OSError:
except OSError: # pragma: no cover
err = sys.exc_info()[1]
raise ExceptionPexpect('pty.fork() failed: ' + str(err))
else:
# Use internal __fork_pty
self.pid, self.child_fd = self.__fork_pty()
if self.pid == 0:
# Some platforms must call setwinsize() and setecho() from the
# child process, and others from the master process. We do both,
# allowing IOError for either.
if self.pid == pty.CHILD:
# Child
self.child_fd = self.STDIN_FILENO
# set default window size of 24 rows by 80 columns
try:
# used by setwinsize()
self.child_fd = sys.stdout.fileno()
self.setwinsize(24, 80)
# which exception, shouldnt' we catch explicitly .. ?
except:
# Some platforms do not like setwinsize (Cygwin).
# This will cause problem when running applications that
# are very picky about window size.
# This is a serious limitation, but not a show stopper.
pass
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# disable echo if spawn argument echo was unset
if not self.echo:
try:
self.setecho(self.echo)
except (IOError, termios.error) as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# Do not allow child to inherit open file descriptors from parent.
max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
for i in range(3, max_fd):
try:
os.close(i)
except OSError:
pass
os.closerange(3, max_fd)
if self.ignore_sighup:
signal.signal(signal.SIGHUP, signal.SIG_IGN)
@ -638,6 +670,13 @@ class spawn(object):
os.execvpe(self.command, self.args, self.env)
# Parent
try:
self.setwinsize(24, 80)
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
self.terminated = False
self.closed = False
@ -660,19 +699,15 @@ class spawn(object):
raise ExceptionPexpect("Could not open with os.openpty().")
pid = os.fork()
if pid < 0:
raise ExceptionPexpect("Failed os.fork().")
elif pid == 0:
if pid == pty.CHILD:
# Child.
os.close(parent_fd)
self.__pty_make_controlling_tty(child_fd)
os.dup2(child_fd, 0)
os.dup2(child_fd, 1)
os.dup2(child_fd, 2)
os.dup2(child_fd, self.STDIN_FILENO)
os.dup2(child_fd, self.STDOUT_FILENO)
os.dup2(child_fd, self.STDERR_FILENO)
if child_fd > 2:
os.close(child_fd)
else:
# Parent.
os.close(child_fd)
@ -686,45 +721,37 @@ class spawn(object):
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty. Harmless if not already connected.
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
if fd >= 0:
os.close(fd)
# which exception, shouldnt' we catch explicitly .. ?
except:
# Already disconnected. This happens if running inside cron.
pass
except OSError as err:
if err.errno != errno.ENXIO:
raise
os.setsid()
# Verify we are disconnected from controlling tty
# by attempting to open it again.
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
if fd >= 0:
os.close(fd)
raise ExceptionPexpect('Failed to disconnect from ' +
'controlling tty. It is still possible to open /dev/tty.')
# which exception, shouldnt' we catch explicitly .. ?
except:
# Good! We are disconnected from a controlling tty.
pass
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
if fd < 0:
raise ExceptionPexpect("Could not open child pty, " + child_name)
else:
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
if fd < 0:
raise ExceptionPexpect("Could not open controlling tty, /dev/tty")
else:
os.close(fd)
def fileno(self):
'''This returns the file descriptor of the pty for the child.
'''
@ -757,7 +784,12 @@ class spawn(object):
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
tty(-like) device, else False. '''
tty(-like) device, else False.
On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
the child pty may not appear as a terminal device. This means
methods such as setecho(), setwinsize(), getwinsize() may raise an
IOError. '''
return os.isatty(self.child_fd)
@ -794,12 +826,20 @@ class spawn(object):
def getecho(self):
'''This returns the terminal echo mode. This returns True if echo is
on or False if echo is off. Child applications that are expecting you
to enter a password often set ECHO False. See waitnoecho(). '''
to enter a password often set ECHO False. See waitnoecho().
Not supported on platforms where ``isatty()`` returns False. '''
try:
attr = termios.tcgetattr(self.child_fd)
if attr[3] & termios.ECHO:
return True
return False
except termios.error as err:
errmsg = 'getecho() may not be called on this platform'
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
self.echo = bool(attr[3] & termios.ECHO)
return self.echo
def setecho(self, state):
'''This sets the terminal echo mode on or off. Note that anything the
@ -829,18 +869,35 @@ class spawn(object):
p.expect(['1234'])
p.expect(['abcd'])
p.expect(['wxyz'])
Not supported on platforms where ``isatty()`` returns False.
'''
self.child_fd
errmsg = 'setecho() may not be called on this platform'
try:
attr = termios.tcgetattr(self.child_fd)
except termios.error as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
if state:
attr[3] = attr[3] | termios.ECHO
else:
attr[3] = attr[3] & ~termios.ECHO
# I tried TCSADRAIN and TCSAFLUSH, but
# these were inconsistent and blocked on some platforms.
# TCSADRAIN would probably be ideal if it worked.
try:
# I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and
# blocked on some platforms. TCSADRAIN would probably be ideal.
termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
except IOError as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
self.echo = state
def _log(self, s, direction):
if self.logfile is not None:
@ -913,12 +970,14 @@ class spawn(object):
if self.child_fd in r:
try:
s = os.read(self.child_fd, size)
except OSError:
# Linux does this
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
self.flag_eof = True
raise EOF('End Of File (EOF). Exception style platform.')
raise
if s == b'':
# BSD style
# BSD-style EOF
self.flag_eof = True
raise EOF('End Of File (EOF). Empty string style platform.')
@ -926,7 +985,7 @@ class spawn(object):
self._log(s, 'read')
return s
raise ExceptionPexpect('Reached an unexpected state.')
raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
def read(self, size=-1):
'''This reads at most "size" bytes from the file (less if the read hits
@ -972,9 +1031,9 @@ class spawn(object):
if size == 0:
return self.string_type()
# delimiter default is EOF
index = self.expect([b'\r\n', self.delimiter])
index = self.expect([self.crlf, self.delimiter])
if index == 0:
return self.before + b'\r\n'
return self.before + self.crlf
else:
return self.before
@ -1075,40 +1134,14 @@ class spawn(object):
It is the responsibility of the caller to ensure the eof is sent at the
beginning of a line. '''
### Hmmm... how do I send an EOF?
###C if ((m = write(pty, *buf, p - *buf)) < 0)
###C return (errno == EWOULDBLOCK) ? n : -1;
#fd = sys.stdin.fileno()
#old = termios.tcgetattr(fd) # remember current state
#attr = termios.tcgetattr(fd)
#attr[3] = attr[3] | termios.ICANON # ICANON must be set to see EOF
#try: # use try/finally to ensure state gets restored
# termios.tcsetattr(fd, termios.TCSADRAIN, attr)
# if hasattr(termios, 'CEOF'):
# os.write(self.child_fd, '%c' % termios.CEOF)
# else:
# # Silly platform does not define CEOF so assume CTRL-D
# os.write(self.child_fd, '%c' % 4)
#finally: # restore state
# termios.tcsetattr(fd, termios.TCSADRAIN, old)
if hasattr(termios, 'VEOF'):
char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF])
else:
# platform does not define VEOF so assume CTRL-D
char = 4
self.send(self._chr(char))
self.send(self._chr(self._EOF))
def sendintr(self):
'''This sends a SIGINT to the child. It does not require
the SIGINT to be the first character on a line. '''
if hasattr(termios, 'VINTR'):
char = ord(termios.tcgetattr(self.child_fd)[6][termios.VINTR])
else:
# platform does not define VINTR so assume CTRL-C
char = 3
self.send(self._chr(char))
self.send(self._chr(self._INTR))
def eof(self):
@ -1181,7 +1214,7 @@ class spawn(object):
self.exitstatus = None
self.signalstatus = os.WTERMSIG(status)
self.terminated = True
elif os.WIFSTOPPED(status):
elif os.WIFSTOPPED(status): # pragma: no cover
# You can't call wait() on a child process in the stopped state.
raise ExceptionPexpect('Called wait() on a stopped child ' +
'process. This is not supported. Is some other ' +
@ -1201,7 +1234,7 @@ class spawn(object):
if self.flag_eof:
# This is for Linux, which requires the blocking form
# of waitpid to # get status of a defunct process.
# of waitpid to get the status of a defunct process.
# This is super-lame. The flag_eof would have been set
# in read_nonblocking(), so this should be safe.
waitpid_options = 0
@ -1229,7 +1262,7 @@ class spawn(object):
try:
### os.WNOHANG) # Solaris!
pid, status = os.waitpid(self.pid, waitpid_options)
except OSError as e:
except OSError as e: # pragma: no cover
# This should never happen...
if e.errno == errno.ECHILD:
raise ExceptionPexpect('isalive() encountered condition ' +
@ -1558,18 +1591,11 @@ class spawn(object):
applications like vi or curses -- applications that respond to the
SIGWINCH signal. '''
# Check for buggy platforms. Some Python versions on some platforms
# (notably OSF1 Alpha and RedHat 7.1) truncate the value for
# termios.TIOCSWINSZ. It is not clear why this happens.
# These platforms don't seem to handle the signed int very well;
# yet other platforms like OpenBSD have a large negative value for
# TIOCSWINSZ and they don't have a truncate problem.
# Newer versions of Linux have totally different values for TIOCSWINSZ.
# Note that this fix is a hack.
# Some very old platforms have a bug that causes the value for
# termios.TIOCSWINSZ to be truncated. There was a hack here to work
# around this, but it caused problems with newer platforms so has been
# removed. For details see https://github.com/pexpect/pexpect/issues/39
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
if TIOCSWINSZ == 2148037735:
# Same bits, but with sign.
TIOCSWINSZ = -2146929561
# Note, assume ws_xpixel and ws_ypixel are zero.
s = struct.pack('HHHH', rows, cols, 0, 0)
fcntl.ioctl(self.fileno(), TIOCSWINSZ, s)
@ -1650,10 +1676,14 @@ class spawn(object):
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as e:
# The subprocess may have closed before we get to reading it
if e.errno != errno.EIO:
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
if self.logfile is not None:
@ -1687,7 +1717,7 @@ class spawn(object):
return select.select(iwtd, owtd, ewtd, timeout)
except select.error:
err = sys.exc_info()[1]
if err.errno == errno.EINTR:
if err.args[0] == errno.EINTR:
# if we loop back we have to subtract the
# amount of time we already waited.
if timeout is not None:
@ -1702,7 +1732,7 @@ class spawn(object):
##############################################################################
# The following methods are no longer supported or allowed.
def setmaxread(self, maxread):
def setmaxread(self, maxread): # pragma: no cover
'''This method is no longer supported or allowed. I don't like getters
and setters without a good reason. '''
@ -1711,7 +1741,7 @@ class spawn(object):
'or allowed. Just assign a value to the ' +
'maxread member variable.')
def setlog(self, fileobject):
def setlog(self, fileobject): # pragma: no cover
'''This method is no longer supported or allowed.
'''
@ -1739,11 +1769,13 @@ class spawnu(spawn):
allowed_string_types = (str, )
_chr = staticmethod(chr)
linesep = os.linesep
crlf = '\r\n'
else:
string_type = unicode
allowed_string_types = (unicode, )
_chr = staticmethod(unichr)
linesep = os.linesep.decode('ascii')
crlf = '\r\n'.decode('ascii')
# This can handle unicode in both Python 2 and 3
write_to_stdout = sys.stdout.write
@ -1966,15 +1998,55 @@ class searcher_re(object):
return best_index
def which(filename):
def is_executable_file(path):
"""Checks that path is an executable regular file (or a symlink to a file).
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
on some platforms :func:`os.access` gives us the wrong answer, so this
checks permission bits directly.
"""
# follow symlinks,
fpath = os.path.realpath(path)
# return False for non-files (directories, fifo, etc.)
if not os.path.isfile(fpath):
return False
# On Solaris, etc., "If the process has appropriate privileges, an
# implementation may indicate success for X_OK even if none of the
# execute file permission bits are set."
#
# For this reason, it is necessary to explicitly check st_mode
# get file mode using os.stat, and check if `other',
# that is anybody, may read and execute.
mode = os.stat(fpath).st_mode
if mode & stat.S_IROTH and mode & stat.S_IXOTH:
return True
# get current user's group ids, and check if `group',
# when matching ours, may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_gid in user_gids and
mode & stat.S_IRGRP and mode & stat.S_IXGRP):
return True
# finally, if file owner matches our effective userid,
# check if `user', may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_uid == os.geteuid() and
mode & stat.S_IRUSR and mode & stat.S_IXUSR):
return True
return False
def which(filename):
'''This takes a given filename; tries to find it in the environment path;
then checks if it is executable. This returns the full path to the filename
if found and executable. Otherwise this returns None.'''
# Special case where filename contains an explicit path.
if os.path.dirname(filename) != '':
if os.access(filename, os.X_OK):
if os.path.dirname(filename) != '' and is_executable_file(filename):
return filename
if 'PATH' not in os.environ or os.environ['PATH'] == '':
p = os.defpath
@ -1983,7 +2055,7 @@ def which(filename):
pathlist = p.split(os.pathsep)
for path in pathlist:
ff = os.path.join(path, filename)
if os.access(ff, os.X_OK):
if is_executable_file(ff):
return ff
return None
@ -2048,4 +2120,4 @@ def split_command_line(command_line):
arg_list.append(arg)
return arg_list
# vi:set sr et ts=4 sw=4 ft=python :
# vim: set shiftround expandtab tabstop=4 shiftwidth=4 ft=python autoindent :

View File

@ -89,9 +89,9 @@ class fdspawn (spawn):
except:
return False
def terminate (self, force=False):
def terminate (self, force=False): # pragma: no cover
raise ExceptionPexpect('This method is not valid for file descriptors.')
def kill (self, sig):
def kill (self, sig): # pragma: no cover
"""No-op - no process to kill."""
return

View File

@ -53,20 +53,20 @@ class pxssh (spawn):
hostname = raw_input('hostname: ')
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login (hostname, username, password)
s.sendline ('uptime') # run a command
s.login(hostname, username, password)
s.sendline('uptime') # run a command
s.prompt() # match the prompt
print s.before # print everything before the prompt.
s.sendline ('ls -l')
print(s.before) # print everything before the prompt.
s.sendline('ls -l')
s.prompt()
print s.before
s.sendline ('df')
print(s.before)
s.sendline('df')
s.prompt()
print s.before
print(s.before)
s.logout()
except pxssh.ExceptionPxssh, e:
print "pxssh failed on login."
print str(e)
except pxssh.ExceptionPxssh as e:
print("pxssh failed on login.")
print(e)
Note that if you have ssh-agent running while doing development with pxssh
then this can lead to a lot of confusion. Many X display managers (xdm,
@ -85,7 +85,8 @@ class pxssh (spawn):
s.login (hostname, username, password)
'''
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None):
spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
@ -118,10 +119,8 @@ class pxssh (spawn):
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
self.force_password = False
self.auto_prompt_reset = True
def levenshtein_distance(self, a,b):
def levenshtein_distance(self, a, b):
'''This calculates the Levenshtein distance between a and b.
'''
@ -141,7 +140,6 @@ class pxssh (spawn):
return current[n]
def try_read_prompt(self, timeout_multiplier):
'''This facilitates using communication timeouts to perform
synchronization as quickly as possible, while supporting high latency
connections with a tunable worst case performance. Fast connections
@ -174,7 +172,6 @@ class pxssh (spawn):
return prompt
def sync_original_prompt (self, sync_multiplier=1.0):
'''This attempts to find the prompt. Basically, press enter and record
the response; press enter again and record the response; if the two
responses are similar then assume we are at the original prompt.
@ -214,9 +211,13 @@ class pxssh (spawn):
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None,quiet=True,sync_multiplier=1,check_local_ip=True):
def login (self, server, username, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True):
'''This logs the user into the given server.
'''This logs the user into the given server. It uses the
It uses
'original_prompt' to try to find the prompt right after login. When it
finds the prompt it immediately tries to reset the prompt to something
more easily matched. The default 'original_prompt' is very optimistic
@ -224,7 +225,7 @@ class pxssh (spawn):
prompt as exactly as possible to prevent false matches by server
strings such as the "Message Of The Day". On many systems you can
disable the MOTD on the remote server by creating a zero-length file
called "~/.hushlogin" on the remote server. If a prompt cannot be found
called :file:`~/.hushlogin` on the remote server. If a prompt cannot be found
then this will not necessarily cause the login to fail. In the case of
a timeout when looking for the prompt we assume that the original
prompt was so weird that we could not match it, so we use a few tricks
@ -233,11 +234,12 @@ class pxssh (spawn):
then login() raises an :class:`ExceptionPxssh` exception.
In some situations it is not possible or desirable to reset the
original prompt. In this case, set :attr:`auto_prompt_reset` to False to
original prompt. In this case, pass ``auto_prompt_reset=False`` to
inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
uses a unique prompt in the prompt() method. If the original prompt is
not reset then this will disable the prompt() method unless you
manually set the PROMPT attribute. '''
uses a unique prompt in the :meth:`prompt` method. If the original prompt is
not reset then this will disable the :meth:`prompt` method unless you
manually set the :attr:`PROMPT` attribute.
'''
ssh_options = ''
if quiet:
@ -252,7 +254,7 @@ class pxssh (spawn):
try:
os.path.isfile(ssh_key)
except:
raise ExceptionPxssh ('private ssh key does not exist')
raise ExceptionPxssh('private ssh key does not exist')
ssh_options = ssh_options + ' -i %s' % (ssh_key)
cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
@ -279,7 +281,7 @@ class pxssh (spawn):
if i==0:
# This is weird. This should not happen twice in a row.
self.close()
raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.')
elif i==1: # can occur if you have a public key pair set to authenticate.
### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
pass
@ -289,13 +291,13 @@ class pxssh (spawn):
# If we get the password prompt again then this means
# we didn't get the password right the first time.
self.close()
raise ExceptionPxssh ('password refused')
raise ExceptionPxssh('password refused')
elif i==3: # permission denied -- password was bad.
self.close()
raise ExceptionPxssh ('permission denied')
raise ExceptionPxssh('permission denied')
elif i==4: # terminal type again? WTF?
self.close()
raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.')
elif i==5: # Timeout
#This is tricky... I presume that we are at the command-line prompt.
#It may be that the shell prompt was so weird that we couldn't match
@ -306,26 +308,28 @@ class pxssh (spawn):
pass
elif i==6: # Connection closed by remote host
self.close()
raise ExceptionPxssh ('connection closed')
raise ExceptionPxssh('connection closed')
else: # Unexpected
self.close()
raise ExceptionPxssh ('unexpected login response')
raise ExceptionPxssh('unexpected login response')
if not self.sync_original_prompt(sync_multiplier):
self.close()
raise ExceptionPxssh ('could not synchronize with original prompt')
raise ExceptionPxssh('could not synchronize with original prompt')
# We appear to be in.
# set shell prompt to something unique.
if auto_prompt_reset:
if not self.set_unique_prompt():
self.close()
raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
raise ExceptionPxssh('could not set shell prompt '
'(recieved: %r, expected: %r).' % (
self.before, self.PROMPT,))
return True
def logout (self):
'''Sends exit to the remote shell.
'''This sends exit to the remote shell. If there are stopped jobs then
this automatically sends exit twice. '''
If there are stopped jobs then this automatically sends exit twice.
'''
self.sendline("exit")
index = self.expect([EOF, "(?i)there are stopped jobs"])
if index==1:
@ -333,18 +337,21 @@ class pxssh (spawn):
self.expect(EOF)
self.close()
def prompt (self, timeout=-1):
def prompt(self, timeout=-1):
'''Match the next shell prompt.
'''This matches the shell prompt. This is little more than a short-cut
to the expect() method. This returns True if the shell prompt was
matched. This returns False if a timeout was raised. Note that if you
called :meth:`login` with :attr:`auto_prompt_reset` set to False then
before calling :meth:`prompt` you must set the :attr:`PROMPT` attribute
to a regex that it will use for matching the prompt.
This is little more than a short-cut to the :meth:`~pexpect.spawn.expect`
method. Note that if you called :meth:`login` with
``auto_prompt_reset=False``, then before calling :meth:`prompt` you must
set the :attr:`PROMPT` attribute to a regex that it will use for
matching the prompt.
Calling :meth:`prompt` will erase the contents of the :attr:`before`
attribute even if no prompt is ever matched. If timeout is not given or
it is set to -1 then self.timeout is used.
:return: True if the shell prompt was matched, False if the timeout was
reached.
'''
if timeout == -1:
@ -354,9 +361,8 @@ class pxssh (spawn):
return False
return True
def set_unique_prompt (self):
'''This sets the remote prompt to something more unique than # or $.
def set_unique_prompt(self):
'''This sets the remote prompt to something more unique than ``#`` or ``$``.
This makes it easier for the :meth:`prompt` method to match the shell prompt
unambiguously. This method is called automatically by the :meth:`login`
method, but you may want to call it manually if you somehow reset the
@ -365,18 +371,18 @@ class pxssh (spawn):
the remote host to set the prompt, so this assumes the remote host is
ready to receive commands.
Alternatively, you may use your own prompt pattern. Just set the PROMPT
attribute to a regular expression that matches it. In this case you
should call login() with auto_prompt_reset=False; then set the PROMPT
attribute. After that the prompt() method will try to match your prompt
pattern.'''
Alternatively, you may use your own prompt pattern. In this case you
should call :meth:`login` with ``auto_prompt_reset=False``; then set the
:attr:`PROMPT` attribute to a regular expression. After that, the
:meth:`prompt` method will try to match your prompt pattern.
'''
self.sendline ("unset PROMPT_COMMAND")
self.sendline (self.PROMPT_SET_SH) # sh-style
self.sendline("unset PROMPT_COMMAND")
self.sendline(self.PROMPT_SET_SH) # sh-style
i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
if i == 0: # csh-style
self.sendline (self.PROMPT_SET_CSH)
i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
self.sendline(self.PROMPT_SET_CSH)
i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
if i == 0:
return False
return True

View File

@ -0,0 +1,103 @@
"""Generic wrapper for read-eval-print-loops, a.k.a. interactive shells
"""
import signal
import sys
import re
import pexpect
PY3 = (sys.version_info[0] >= 3)
if PY3:
def u(s): return s
else:
def u(s): return s.decode('utf-8')
PEXPECT_PROMPT = u('[PEXPECT_PROMPT>')
PEXPECT_CONTINUATION_PROMPT = u('[PEXPECT_PROMPT+')
class REPLWrapper(object):
"""Wrapper for a REPL.
:param cmd_or_spawn: This can either be an instance of :class:`pexpect.spawn`
in which a REPL has already been started, or a str command to start a new
REPL process.
:param str orig_prompt: The prompt to expect at first.
:param str prompt_change: A command to change the prompt to something more
unique. If this is ``None``, the prompt will not be changed. This will
be formatted with the new and continuation prompts as positional
parameters, so you can use ``{}`` style formatting to insert them into
the command.
:param str new_prompt: The more unique prompt to expect after the change.
"""
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
new_prompt=PEXPECT_PROMPT,
continuation_prompt=PEXPECT_CONTINUATION_PROMPT):
if isinstance(cmd_or_spawn, str):
self.child = pexpect.spawnu(cmd_or_spawn, echo=False)
else:
self.child = cmd_or_spawn
if self.child.echo:
# Existing spawn instance has echo enabled, disable it
# to prevent our input from being repeated to output.
self.child.setecho(False)
self.child.waitnoecho()
if prompt_change is None:
self.prompt = orig_prompt
else:
self.set_prompt(orig_prompt,
prompt_change.format(new_prompt, continuation_prompt))
self.prompt = new_prompt
self.continuation_prompt = continuation_prompt
self._expect_prompt()
def set_prompt(self, orig_prompt, prompt_change):
self.child.expect(orig_prompt)
self.child.sendline(prompt_change)
def _expect_prompt(self, timeout=-1):
return self.child.expect_exact([self.prompt, self.continuation_prompt],
timeout=timeout)
def run_command(self, command, timeout=-1):
"""Send a command to the REPL, wait for and return output.
:param str command: The command to send. Trailing newlines are not needed.
This should be a complete block of input that will trigger execution;
if a continuation prompt is found after sending input, :exc:`ValueError`
will be raised.
:param int timeout: How long to wait for the next prompt. -1 means the
default from the :class:`pexpect.spawn` object (default 30 seconds).
None means to wait indefinitely.
"""
# Split up multiline commands and feed them in bit-by-bit
cmdlines = command.splitlines()
# splitlines ignores trailing newlines - add it back in manually
if command.endswith('\n'):
cmdlines.append('')
if not cmdlines:
raise ValueError("No command was given")
self.child.sendline(cmdlines[0])
for line in cmdlines[1:]:
self._expect_prompt(timeout=1)
self.child.sendline(line)
# Command was fully submitted, now wait for the next prompt
if self._expect_prompt(timeout=timeout) == 1:
# We got the continuation prompt - command was incomplete
self.child.kill(signal.SIGINT)
self._expect_prompt(timeout=1)
raise ValueError("Continuation prompt found - input was incomplete:\n"
+ command)
return self.child.before
def python(command="python"):
"""Start a Python shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, u(">>> "), u("import sys; sys.ps1={0!r}; sys.ps2={1!r}"))
def bash(command="bash", orig_prompt=re.compile('[$#]')):
"""Start a bash shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, orig_prompt, u("PS1='{0}' PS2='{1}' PROMPT_COMMAND=''"))