1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Revert "Upgrade pexpect to 3.3"

This reverts commit 46d2d46c20dc8b2379f3dee44ae58c45e0997bff.

Conflicts:
	awx/lib/site-packages/README
This commit is contained in:
Matthew Jones 2014-08-07 09:19:30 -04:00
parent e2eff08088
commit 2abf9ee653
5 changed files with 192 additions and 373 deletions

View File

@ -39,7 +39,7 @@ os-diskconfig-python-novaclient-ext==0.1.2 (os_diskconfig_python_novaclient_ext/
os-networksv2-python-novaclient-ext==0.21 (os_networksv2_python_novaclient_ext.py)
os-virtual-interfacesv2-python-novaclient-ext==0.15 (os_virtual_interfacesv2_python_novaclient_ext.py)
pbr==0.10.0 (pbr/*)
pexpect==3.3 (pexpect/*, excluded pxssh.py, fdpexpect.py, FSM.py, screen.py,
pexpect==3.1 (pexpect/*, excluded pxssh.py, fdpexpect.py, FSM.py, screen.py,
ANSI.py)
pip==1.5.4 (pip/*, excluded bin/pip*)
prettytable==0.7.2 (prettytable.py)

View File

@ -80,7 +80,6 @@ try:
import traceback
import signal
import codecs
import stat
except ImportError: # pragma: no cover
err = sys.exc_info()[1]
raise ImportError(str(err) + '''
@ -88,7 +87,7 @@ except ImportError: # pragma: no cover
A critical module was not found. Probably this operating system does not
support it. Pexpect is intended for UNIX-like operating systems.''')
__version__ = '3.3'
__version__ = '3.1'
__revision__ = ''
__all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'spawnu', 'run', 'runu',
'which', 'split_command_line', '__version__', '__revision__']
@ -285,7 +284,6 @@ class spawn(object):
def _chr(c):
return bytes([c])
linesep = os.linesep.encode('ascii')
crlf = '\r\n'.encode('ascii')
@staticmethod
def write_to_stdout(b):
@ -298,14 +296,13 @@ class spawn(object):
allowed_string_types = (basestring,) # analysis:ignore
_chr = staticmethod(chr)
linesep = os.linesep
crlf = '\r\n'
write_to_stdout = sys.stdout.write
encoding = None
def __init__(self, command, args=[], timeout=30, maxread=2000,
searchwindowsize=None, logfile=None, cwd=None, env=None,
ignore_sighup=True, echo=True):
ignore_sighup=True):
'''This is the constructor. The command parameter may be a string that
includes a command and any arguments to the command. For example::
@ -418,16 +415,7 @@ class spawn(object):
signalstatus will store the signal value and exitstatus will be None.
If you need more detail you can also read the self.status member which
stores the status returned by os.waitpid. You can interpret this using
os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
The echo attribute may be set to False to disable echoing of input.
As a pseudo-terminal, all input echoed by the "keyboard" (send()
or sendline()) will be repeated to output. For many cases, it is
not desirable to have echo enabled, and it may be later disabled
using setecho(False) followed by waitnoecho(). However, for some
platforms such as Solaris, this is not possible, and should be
disabled immediately on spawn.
'''
os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG. '''
self.STDIN_FILENO = pty.STDIN_FILENO
self.STDOUT_FILENO = pty.STDOUT_FILENO
@ -449,7 +437,7 @@ class spawn(object):
self.status = None
self.flag_eof = False
self.pid = None
# the child file descriptor is initially closed
# the chile filedescriptor is initially closed
self.child_fd = -1
self.timeout = timeout
self.delimiter = EOF
@ -478,30 +466,16 @@ class spawn(object):
self.closed = True
self.cwd = cwd
self.env = env
self.echo = echo
self.ignore_sighup = ignore_sighup
_platform = sys.platform.lower()
# This flags if we are running on irix
self.__irix_hack = _platform.startswith('irix')
self.__irix_hack = (sys.platform.lower().find('irix') >= 0)
# Solaris uses internal __fork_pty(). All others use pty.fork().
self.use_native_pty_fork = not (
_platform.startswith('solaris') or
_platform.startswith('sunos'))
# inherit EOF and INTR definitions from controlling process.
try:
from termios import VEOF, VINTR
fd = sys.__stdin__.fileno()
self._INTR = ord(termios.tcgetattr(fd)[6][VINTR])
self._EOF = ord(termios.tcgetattr(fd)[6][VEOF])
except (ImportError, OSError, IOError, termios.error):
# unless the controlling process is also not a terminal,
# such as cron(1). Fall-back to using CEOF and CINTR.
try:
from termios import CEOF, CINTR
(self._INTR, self._EOF) = (CINTR, CEOF)
except ImportError:
# ^C, ^D
(self._INTR, self._EOF) = (3, 4)
if ((sys.platform.lower().find('solaris') >= 0)
or (sys.platform.lower().find('sunos5') >= 0)):
self.use_native_pty_fork = False
else:
self.use_native_pty_fork = True
# Support subclasses that do not use command or args.
if command is None:
self.command = None
@ -625,39 +599,33 @@ class spawn(object):
if self.use_native_pty_fork:
try:
self.pid, self.child_fd = pty.fork()
except OSError: # pragma: no cover
except OSError:
err = sys.exc_info()[1]
raise ExceptionPexpect('pty.fork() failed: ' + str(err))
else:
# Use internal __fork_pty
self.pid, self.child_fd = self.__fork_pty()
# Some platforms must call setwinsize() and setecho() from the
# child process, and others from the master process. We do both,
# allowing IOError for either.
if self.pid == pty.CHILD:
if self.pid == 0:
# Child
self.child_fd = self.STDIN_FILENO
# set default window size of 24 rows by 80 columns
try:
# used by setwinsize()
self.child_fd = sys.stdout.fileno()
self.setwinsize(24, 80)
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# disable echo if spawn argument echo was unset
if not self.echo:
try:
self.setecho(self.echo)
except (IOError, termios.error) as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# which exception, shouldnt' we catch explicitly .. ?
except:
# Some platforms do not like setwinsize (Cygwin).
# This will cause problem when running applications that
# are very picky about window size.
# This is a serious limitation, but not a show stopper.
pass
# Do not allow child to inherit open file descriptors from parent.
max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
os.closerange(3, max_fd)
for i in range(3, max_fd):
try:
os.close(i)
except OSError:
pass
if self.ignore_sighup:
signal.signal(signal.SIGHUP, signal.SIG_IGN)
@ -670,13 +638,6 @@ class spawn(object):
os.execvpe(self.command, self.args, self.env)
# Parent
try:
self.setwinsize(24, 80)
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
self.terminated = False
self.closed = False
@ -699,15 +660,19 @@ class spawn(object):
raise ExceptionPexpect("Could not open with os.openpty().")
pid = os.fork()
if pid == pty.CHILD:
if pid < 0:
raise ExceptionPexpect("Failed os.fork().")
elif pid == 0:
# Child.
os.close(parent_fd)
self.__pty_make_controlling_tty(child_fd)
os.dup2(child_fd, self.STDIN_FILENO)
os.dup2(child_fd, self.STDOUT_FILENO)
os.dup2(child_fd, self.STDERR_FILENO)
os.dup2(child_fd, 0)
os.dup2(child_fd, 1)
os.dup2(child_fd, 2)
if child_fd > 2:
os.close(child_fd)
else:
# Parent.
os.close(child_fd)
@ -721,37 +686,45 @@ class spawn(object):
child_name = os.ttyname(tty_fd)
# Disconnect from controlling tty, if any. Raises OSError of ENXIO
# if there was no controlling tty to begin with, such as when
# executed by a cron(1) job.
# Disconnect from controlling tty. Harmless if not already connected.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
if fd >= 0:
os.close(fd)
except OSError as err:
if err.errno != errno.ENXIO:
raise
# which exception, shouldnt' we catch explicitly .. ?
except:
# Already disconnected. This happens if running inside cron.
pass
os.setsid()
# Verify we are disconnected from controlling tty by attempting to open
# it again. We expect that OSError of ENXIO should always be raised.
# Verify we are disconnected from controlling tty
# by attempting to open it again.
try:
fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
if fd >= 0:
os.close(fd)
raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
except OSError as err:
if err.errno != errno.ENXIO:
raise
raise ExceptionPexpect('Failed to disconnect from ' +
'controlling tty. It is still possible to open /dev/tty.')
# which exception, shouldnt' we catch explicitly .. ?
except:
# Good! We are disconnected from a controlling tty.
pass
# Verify we can open child pty.
fd = os.open(child_name, os.O_RDWR)
if fd < 0:
raise ExceptionPexpect("Could not open child pty, " + child_name)
else:
os.close(fd)
# Verify we now have a controlling tty.
fd = os.open("/dev/tty", os.O_WRONLY)
if fd < 0:
raise ExceptionPexpect("Could not open controlling tty, /dev/tty")
else:
os.close(fd)
def fileno(self):
'''This returns the file descriptor of the pty for the child.
'''
@ -784,12 +757,7 @@ class spawn(object):
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
tty(-like) device, else False.
On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
the child pty may not appear as a terminal device. This means
methods such as setecho(), setwinsize(), getwinsize() may raise an
IOError. '''
tty(-like) device, else False. '''
return os.isatty(self.child_fd)
@ -826,20 +794,12 @@ class spawn(object):
def getecho(self):
'''This returns the terminal echo mode. This returns True if echo is
on or False if echo is off. Child applications that are expecting you
to enter a password often set ECHO False. See waitnoecho().
to enter a password often set ECHO False. See waitnoecho(). '''
Not supported on platforms where ``isatty()`` returns False. '''
try:
attr = termios.tcgetattr(self.child_fd)
except termios.error as err:
errmsg = 'getecho() may not be called on this platform'
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
self.echo = bool(attr[3] & termios.ECHO)
return self.echo
if attr[3] & termios.ECHO:
return True
return False
def setecho(self, state):
'''This sets the terminal echo mode on or off. Note that anything the
@ -869,35 +829,18 @@ class spawn(object):
p.expect(['1234'])
p.expect(['abcd'])
p.expect(['wxyz'])
Not supported on platforms where ``isatty()`` returns False.
'''
errmsg = 'setecho() may not be called on this platform'
try:
self.child_fd
attr = termios.tcgetattr(self.child_fd)
except termios.error as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
if state:
attr[3] = attr[3] | termios.ECHO
else:
attr[3] = attr[3] & ~termios.ECHO
try:
# I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and
# blocked on some platforms. TCSADRAIN would probably be ideal.
# I tried TCSADRAIN and TCSAFLUSH, but
# these were inconsistent and blocked on some platforms.
# TCSADRAIN would probably be ideal if it worked.
termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
except IOError as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
self.echo = state
def _log(self, s, direction):
if self.logfile is not None:
@ -970,14 +913,12 @@ class spawn(object):
if self.child_fd in r:
try:
s = os.read(self.child_fd, size)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
except OSError:
# Linux does this
self.flag_eof = True
raise EOF('End Of File (EOF). Exception style platform.')
raise
if s == b'':
# BSD-style EOF
# BSD style
self.flag_eof = True
raise EOF('End Of File (EOF). Empty string style platform.')
@ -985,7 +926,7 @@ class spawn(object):
self._log(s, 'read')
return s
raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
raise ExceptionPexpect('Reached an unexpected state.')
def read(self, size=-1):
'''This reads at most "size" bytes from the file (less if the read hits
@ -1031,9 +972,9 @@ class spawn(object):
if size == 0:
return self.string_type()
# delimiter default is EOF
index = self.expect([self.crlf, self.delimiter])
index = self.expect([b'\r\n', self.delimiter])
if index == 0:
return self.before + self.crlf
return self.before + b'\r\n'
else:
return self.before
@ -1134,14 +1075,40 @@ class spawn(object):
It is the responsibility of the caller to ensure the eof is sent at the
beginning of a line. '''
self.send(self._chr(self._EOF))
### Hmmm... how do I send an EOF?
###C if ((m = write(pty, *buf, p - *buf)) < 0)
###C return (errno == EWOULDBLOCK) ? n : -1;
#fd = sys.stdin.fileno()
#old = termios.tcgetattr(fd) # remember current state
#attr = termios.tcgetattr(fd)
#attr[3] = attr[3] | termios.ICANON # ICANON must be set to see EOF
#try: # use try/finally to ensure state gets restored
# termios.tcsetattr(fd, termios.TCSADRAIN, attr)
# if hasattr(termios, 'CEOF'):
# os.write(self.child_fd, '%c' % termios.CEOF)
# else:
# # Silly platform does not define CEOF so assume CTRL-D
# os.write(self.child_fd, '%c' % 4)
#finally: # restore state
# termios.tcsetattr(fd, termios.TCSADRAIN, old)
if hasattr(termios, 'VEOF'):
char = ord(termios.tcgetattr(self.child_fd)[6][termios.VEOF])
else:
# platform does not define VEOF so assume CTRL-D
char = 4
self.send(self._chr(char))
def sendintr(self):
'''This sends a SIGINT to the child. It does not require
the SIGINT to be the first character on a line. '''
self.send(self._chr(self._INTR))
if hasattr(termios, 'VINTR'):
char = ord(termios.tcgetattr(self.child_fd)[6][termios.VINTR])
else:
# platform does not define VINTR so assume CTRL-C
char = 3
self.send(self._chr(char))
def eof(self):
@ -1214,7 +1181,7 @@ class spawn(object):
self.exitstatus = None
self.signalstatus = os.WTERMSIG(status)
self.terminated = True
elif os.WIFSTOPPED(status): # pragma: no cover
elif os.WIFSTOPPED(status):
# You can't call wait() on a child process in the stopped state.
raise ExceptionPexpect('Called wait() on a stopped child ' +
'process. This is not supported. Is some other ' +
@ -1234,7 +1201,7 @@ class spawn(object):
if self.flag_eof:
# This is for Linux, which requires the blocking form
# of waitpid to get the status of a defunct process.
# of waitpid to # get status of a defunct process.
# This is super-lame. The flag_eof would have been set
# in read_nonblocking(), so this should be safe.
waitpid_options = 0
@ -1262,7 +1229,7 @@ class spawn(object):
try:
### os.WNOHANG) # Solaris!
pid, status = os.waitpid(self.pid, waitpid_options)
except OSError as e: # pragma: no cover
except OSError as e:
# This should never happen...
if e.errno == errno.ECHILD:
raise ExceptionPexpect('isalive() encountered condition ' +
@ -1591,11 +1558,18 @@ class spawn(object):
applications like vi or curses -- applications that respond to the
SIGWINCH signal. '''
# Some very old platforms have a bug that causes the value for
# termios.TIOCSWINSZ to be truncated. There was a hack here to work
# around this, but it caused problems with newer platforms so has been
# removed. For details see https://github.com/pexpect/pexpect/issues/39
# Check for buggy platforms. Some Python versions on some platforms
# (notably OSF1 Alpha and RedHat 7.1) truncate the value for
# termios.TIOCSWINSZ. It is not clear why this happens.
# These platforms don't seem to handle the signed int very well;
# yet other platforms like OpenBSD have a large negative value for
# TIOCSWINSZ and they don't have a truncate problem.
# Newer versions of Linux have totally different values for TIOCSWINSZ.
# Note that this fix is a hack.
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
if TIOCSWINSZ == 2148037735:
# Same bits, but with sign.
TIOCSWINSZ = -2146929561
# Note, assume ws_xpixel and ws_ypixel are zero.
s = struct.pack('HHHH', rows, cols, 0, 0)
fcntl.ioctl(self.fileno(), TIOCSWINSZ, s)
@ -1676,14 +1650,10 @@ class spawn(object):
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
except OSError as e:
# The subprocess may have closed before we get to reading it
if e.errno != errno.EIO:
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
if self.logfile is not None:
@ -1717,7 +1687,7 @@ class spawn(object):
return select.select(iwtd, owtd, ewtd, timeout)
except select.error:
err = sys.exc_info()[1]
if err.args[0] == errno.EINTR:
if err.errno == errno.EINTR:
# if we loop back we have to subtract the
# amount of time we already waited.
if timeout is not None:
@ -1732,7 +1702,7 @@ class spawn(object):
##############################################################################
# The following methods are no longer supported or allowed.
def setmaxread(self, maxread): # pragma: no cover
def setmaxread(self, maxread):
'''This method is no longer supported or allowed. I don't like getters
and setters without a good reason. '''
@ -1741,7 +1711,7 @@ class spawn(object):
'or allowed. Just assign a value to the ' +
'maxread member variable.')
def setlog(self, fileobject): # pragma: no cover
def setlog(self, fileobject):
'''This method is no longer supported or allowed.
'''
@ -1769,13 +1739,11 @@ class spawnu(spawn):
allowed_string_types = (str, )
_chr = staticmethod(chr)
linesep = os.linesep
crlf = '\r\n'
else:
string_type = unicode
allowed_string_types = (unicode, )
_chr = staticmethod(unichr)
linesep = os.linesep.decode('ascii')
crlf = '\r\n'.decode('ascii')
# This can handle unicode in both Python 2 and 3
write_to_stdout = sys.stdout.write
@ -1998,55 +1966,15 @@ class searcher_re(object):
return best_index
def is_executable_file(path):
"""Checks that path is an executable regular file (or a symlink to a file).
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``, but
on some platforms :func:`os.access` gives us the wrong answer, so this
checks permission bits directly.
"""
# follow symlinks,
fpath = os.path.realpath(path)
# return False for non-files (directories, fifo, etc.)
if not os.path.isfile(fpath):
return False
# On Solaris, etc., "If the process has appropriate privileges, an
# implementation may indicate success for X_OK even if none of the
# execute file permission bits are set."
#
# For this reason, it is necessary to explicitly check st_mode
# get file mode using os.stat, and check if `other',
# that is anybody, may read and execute.
mode = os.stat(fpath).st_mode
if mode & stat.S_IROTH and mode & stat.S_IXOTH:
return True
# get current user's group ids, and check if `group',
# when matching ours, may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_gid in user_gids and
mode & stat.S_IRGRP and mode & stat.S_IXGRP):
return True
# finally, if file owner matches our effective userid,
# check if `user', may read and execute.
user_gids = os.getgroups() + [os.getgid()]
if (os.stat(fpath).st_uid == os.geteuid() and
mode & stat.S_IRUSR and mode & stat.S_IXUSR):
return True
return False
def which(filename):
'''This takes a given filename; tries to find it in the environment path;
then checks if it is executable. This returns the full path to the filename
if found and executable. Otherwise this returns None.'''
# Special case where filename contains an explicit path.
if os.path.dirname(filename) != '' and is_executable_file(filename):
if os.path.dirname(filename) != '':
if os.access(filename, os.X_OK):
return filename
if 'PATH' not in os.environ or os.environ['PATH'] == '':
p = os.defpath
@ -2055,7 +1983,7 @@ def which(filename):
pathlist = p.split(os.pathsep)
for path in pathlist:
ff = os.path.join(path, filename)
if is_executable_file(ff):
if os.access(ff, os.X_OK):
return ff
return None
@ -2120,4 +2048,4 @@ def split_command_line(command_line):
arg_list.append(arg)
return arg_list
# vim: set shiftround expandtab tabstop=4 shiftwidth=4 ft=python autoindent :
# vi:set sr et ts=4 sw=4 ft=python :

View File

@ -89,9 +89,9 @@ class fdspawn (spawn):
except:
return False
def terminate (self, force=False): # pragma: no cover
def terminate (self, force=False):
raise ExceptionPexpect('This method is not valid for file descriptors.')
def kill (self, sig): # pragma: no cover
def kill (self, sig):
"""No-op - no process to kill."""
return

View File

@ -56,17 +56,17 @@ class pxssh (spawn):
s.login (hostname, username, password)
s.sendline ('uptime') # run a command
s.prompt() # match the prompt
print(s.before) # print everything before the prompt.
print s.before # print everything before the prompt.
s.sendline ('ls -l')
s.prompt()
print(s.before)
print s.before
s.sendline ('df')
s.prompt()
print(s.before)
print s.before
s.logout()
except pxssh.ExceptionPxssh as e:
print("pxssh failed on login.")
print(e)
except pxssh.ExceptionPxssh, e:
print "pxssh failed on login."
print str(e)
Note that if you have ssh-agent running while doing development with pxssh
then this can lead to a lot of confusion. Many X display managers (xdm,
@ -85,8 +85,7 @@ class pxssh (spawn):
s.login (hostname, username, password)
'''
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None):
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
@ -119,8 +118,10 @@ class pxssh (spawn):
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
self.force_password = False
self.auto_prompt_reset = True
def levenshtein_distance(self, a,b):
'''This calculates the Levenshtein distance between a and b.
'''
@ -140,6 +141,7 @@ class pxssh (spawn):
return current[n]
def try_read_prompt(self, timeout_multiplier):
'''This facilitates using communication timeouts to perform
synchronization as quickly as possible, while supporting high latency
connections with a tunable worst case performance. Fast connections
@ -172,6 +174,7 @@ class pxssh (spawn):
return prompt
def sync_original_prompt (self, sync_multiplier=1.0):
'''This attempts to find the prompt. Basically, press enter and record
the response; press enter again and record the response; if the two
responses are similar then assume we are at the original prompt.
@ -211,13 +214,9 @@ class pxssh (spawn):
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
def login (self, server, username, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True):
'''This logs the user into the given server.
def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None,quiet=True,sync_multiplier=1,check_local_ip=True):
It uses
'''This logs the user into the given server. It uses the
'original_prompt' to try to find the prompt right after login. When it
finds the prompt it immediately tries to reset the prompt to something
more easily matched. The default 'original_prompt' is very optimistic
@ -225,7 +224,7 @@ class pxssh (spawn):
prompt as exactly as possible to prevent false matches by server
strings such as the "Message Of The Day". On many systems you can
disable the MOTD on the remote server by creating a zero-length file
called :file:`~/.hushlogin` on the remote server. If a prompt cannot be found
called "~/.hushlogin" on the remote server. If a prompt cannot be found
then this will not necessarily cause the login to fail. In the case of
a timeout when looking for the prompt we assume that the original
prompt was so weird that we could not match it, so we use a few tricks
@ -234,12 +233,11 @@ class pxssh (spawn):
then login() raises an :class:`ExceptionPxssh` exception.
In some situations it is not possible or desirable to reset the
original prompt. In this case, pass ``auto_prompt_reset=False`` to
original prompt. In this case, set :attr:`auto_prompt_reset` to False to
inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
uses a unique prompt in the :meth:`prompt` method. If the original prompt is
not reset then this will disable the :meth:`prompt` method unless you
manually set the :attr:`PROMPT` attribute.
'''
uses a unique prompt in the prompt() method. If the original prompt is
not reset then this will disable the prompt() method unless you
manually set the PROMPT attribute. '''
ssh_options = ''
if quiet:
@ -320,16 +318,14 @@ class pxssh (spawn):
if auto_prompt_reset:
if not self.set_unique_prompt():
self.close()
raise ExceptionPxssh('could not set shell prompt '
'(recieved: %r, expected: %r).' % (
self.before, self.PROMPT,))
raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
return True
def logout (self):
'''Sends exit to the remote shell.
If there are stopped jobs then this automatically sends exit twice.
'''
'''This sends exit to the remote shell. If there are stopped jobs then
this automatically sends exit twice. '''
self.sendline("exit")
index = self.expect([EOF, "(?i)there are stopped jobs"])
if index==1:
@ -338,20 +334,17 @@ class pxssh (spawn):
self.close()
def prompt (self, timeout=-1):
'''Match the next shell prompt.
This is little more than a short-cut to the :meth:`~pexpect.spawn.expect`
method. Note that if you called :meth:`login` with
``auto_prompt_reset=False``, then before calling :meth:`prompt` you must
set the :attr:`PROMPT` attribute to a regex that it will use for
matching the prompt.
'''This matches the shell prompt. This is little more than a short-cut
to the expect() method. This returns True if the shell prompt was
matched. This returns False if a timeout was raised. Note that if you
called :meth:`login` with :attr:`auto_prompt_reset` set to False then
before calling :meth:`prompt` you must set the :attr:`PROMPT` attribute
to a regex that it will use for matching the prompt.
Calling :meth:`prompt` will erase the contents of the :attr:`before`
attribute even if no prompt is ever matched. If timeout is not given or
it is set to -1 then self.timeout is used.
:return: True if the shell prompt was matched, False if the timeout was
reached.
'''
if timeout == -1:
@ -362,7 +355,8 @@ class pxssh (spawn):
return True
def set_unique_prompt (self):
'''This sets the remote prompt to something more unique than ``#`` or ``$``.
'''This sets the remote prompt to something more unique than # or $.
This makes it easier for the :meth:`prompt` method to match the shell prompt
unambiguously. This method is called automatically by the :meth:`login`
method, but you may want to call it manually if you somehow reset the
@ -371,11 +365,11 @@ class pxssh (spawn):
the remote host to set the prompt, so this assumes the remote host is
ready to receive commands.
Alternatively, you may use your own prompt pattern. In this case you
should call :meth:`login` with ``auto_prompt_reset=False``; then set the
:attr:`PROMPT` attribute to a regular expression. After that, the
:meth:`prompt` method will try to match your prompt pattern.
'''
Alternatively, you may use your own prompt pattern. Just set the PROMPT
attribute to a regular expression that matches it. In this case you
should call login() with auto_prompt_reset=False; then set the PROMPT
attribute. After that the prompt() method will try to match your prompt
pattern.'''
self.sendline ("unset PROMPT_COMMAND")
self.sendline (self.PROMPT_SET_SH) # sh-style

View File

@ -1,103 +0,0 @@
"""Generic wrapper for read-eval-print-loops, a.k.a. interactive shells
"""
import signal
import sys
import re
import pexpect
PY3 = (sys.version_info[0] >= 3)
if PY3:
def u(s): return s
else:
def u(s): return s.decode('utf-8')
PEXPECT_PROMPT = u('[PEXPECT_PROMPT>')
PEXPECT_CONTINUATION_PROMPT = u('[PEXPECT_PROMPT+')
class REPLWrapper(object):
"""Wrapper for a REPL.
:param cmd_or_spawn: This can either be an instance of :class:`pexpect.spawn`
in which a REPL has already been started, or a str command to start a new
REPL process.
:param str orig_prompt: The prompt to expect at first.
:param str prompt_change: A command to change the prompt to something more
unique. If this is ``None``, the prompt will not be changed. This will
be formatted with the new and continuation prompts as positional
parameters, so you can use ``{}`` style formatting to insert them into
the command.
:param str new_prompt: The more unique prompt to expect after the change.
"""
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
new_prompt=PEXPECT_PROMPT,
continuation_prompt=PEXPECT_CONTINUATION_PROMPT):
if isinstance(cmd_or_spawn, str):
self.child = pexpect.spawnu(cmd_or_spawn, echo=False)
else:
self.child = cmd_or_spawn
if self.child.echo:
# Existing spawn instance has echo enabled, disable it
# to prevent our input from being repeated to output.
self.child.setecho(False)
self.child.waitnoecho()
if prompt_change is None:
self.prompt = orig_prompt
else:
self.set_prompt(orig_prompt,
prompt_change.format(new_prompt, continuation_prompt))
self.prompt = new_prompt
self.continuation_prompt = continuation_prompt
self._expect_prompt()
def set_prompt(self, orig_prompt, prompt_change):
self.child.expect(orig_prompt)
self.child.sendline(prompt_change)
def _expect_prompt(self, timeout=-1):
return self.child.expect_exact([self.prompt, self.continuation_prompt],
timeout=timeout)
def run_command(self, command, timeout=-1):
"""Send a command to the REPL, wait for and return output.
:param str command: The command to send. Trailing newlines are not needed.
This should be a complete block of input that will trigger execution;
if a continuation prompt is found after sending input, :exc:`ValueError`
will be raised.
:param int timeout: How long to wait for the next prompt. -1 means the
default from the :class:`pexpect.spawn` object (default 30 seconds).
None means to wait indefinitely.
"""
# Split up multiline commands and feed them in bit-by-bit
cmdlines = command.splitlines()
# splitlines ignores trailing newlines - add it back in manually
if command.endswith('\n'):
cmdlines.append('')
if not cmdlines:
raise ValueError("No command was given")
self.child.sendline(cmdlines[0])
for line in cmdlines[1:]:
self._expect_prompt(timeout=1)
self.child.sendline(line)
# Command was fully submitted, now wait for the next prompt
if self._expect_prompt(timeout=timeout) == 1:
# We got the continuation prompt - command was incomplete
self.child.kill(signal.SIGINT)
self._expect_prompt(timeout=1)
raise ValueError("Continuation prompt found - input was incomplete:\n"
+ command)
return self.child.before
def python(command="python"):
"""Start a Python shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, u(">>> "), u("import sys; sys.ps1={0!r}; sys.ps2={1!r}"))
def bash(command="bash", orig_prompt=re.compile('[$#]')):
"""Start a bash shell and return a :class:`REPLWrapper` object."""
return REPLWrapper(command, orig_prompt, u("PS1='{0}' PS2='{1}' PROMPT_COMMAND=''"))