2020-03-02 15:20:29 -08:00
""" pexpect_helper provides a wrapper around the pexpect module.
This module exposes a single class SpawnedProc , which wraps pexpect . spawn ( ) .
This exposes a pseudo - tty , which fish or another process may talk to .
The send ( ) function may be used to send data to fish , and the expect_ * family
of functions may be used to match what is output to the tty .
Example usage :
sp = SpawnedProc ( ) # this launches fish
sp . expect_prompt ( ) # wait for a prompt
sp . sendline ( " echo hello world " )
sp . expect_prompt ( " hello world " )
"""
from __future__ import print_function
import inspect
import os
import os . path
import re
import sys
import time
import pexpect
# Default timeout for failing to match.
TIMEOUT_SECS = 5
2021-02-12 18:50:05 +01:00
UNEXPECTED_SUCCESS = object ( )
2020-03-02 15:20:29 -08:00
2021-10-29 20:42:59 -07:00
2020-03-02 15:20:29 -08:00
def get_prompt_re ( counter ) :
2021-10-29 20:42:59 -07:00
""" Return a regular expression for matching a with a given prompt counter. """
2020-03-02 15:20:29 -08:00
return re . compile (
r """ (?: \ r \ n?|^) # beginning of line
2020-06-13 19:18:12 +02:00
( ? : \x1b [ \d \[ KB ( m ] * ) * # optional colors
2020-03-02 15:20:29 -08:00
( ? : \[ . \] \ ) ? # optional vi mode prompt
"""
2020-06-24 20:43:56 +02:00
+ ( r " prompt \ %d > " % counter ) # prompt with counter
2020-06-13 19:18:12 +02:00
+ r """
( ? : \x1b [ \d \[ KB ( m ] * ) * # optional colors
""" ,
2020-03-02 15:20:29 -08:00
re . VERBOSE ,
)
def get_callsite ( ) :
2021-10-29 20:42:59 -07:00
""" Return a triple (filename, line_number, line_text) of the call site location. """
2020-03-02 15:20:29 -08:00
callstack = inspect . getouterframes ( inspect . currentframe ( ) )
for f in callstack :
if inspect . getmodule ( f . frame ) is not Message . MODULE :
return ( os . path . basename ( f . filename ) , f . lineno , f . code_context )
return ( " Unknown " , - 1 , " " )
def escape ( s ) :
2021-10-29 20:42:59 -07:00
""" Escape the string ' s ' to make it human-understandable. """
2020-03-02 15:20:29 -08:00
res = [ ]
for c in s :
if c == " \n " :
res . append ( " \\ n " )
elif c == " \r " :
res . append ( " \\ r " )
elif c == " \t " :
res . append ( " \\ t " )
elif c . isprintable ( ) :
res . append ( c )
else :
res . append ( " \\ x {:02x} " . format ( ord ( c ) ) )
return " " . join ( res )
2020-06-04 18:13:39 -07:00
def pexpect_error_type ( err ) :
2021-10-29 20:42:59 -07:00
""" Return a human-readable description of a pexpect error type. """
2020-06-04 18:13:39 -07:00
if isinstance ( err , pexpect . EOF ) :
return " EOF "
elif isinstance ( err , pexpect . TIMEOUT ) :
return " timeout "
2021-02-12 18:50:05 +01:00
elif err is UNEXPECTED_SUCCESS :
return " unexpected success "
2020-06-04 18:13:39 -07:00
else :
return " unknown error "
2020-03-02 15:20:29 -08:00
class Message ( object ) :
2020-11-22 14:39:48 +01:00
""" Some text either sent-to or received-from the spawned proc.
2020-03-02 15:20:29 -08:00
Attributes :
2020-06-04 18:13:39 -07:00
dir : the message direction , either DIR_INPUT or DIR_OUTPUT
2020-03-02 15:20:29 -08:00
filename : the name of the file from which the message was sent
text : the text of the messages
when : a timestamp of when the message was sent
"""
2020-06-04 18:13:39 -07:00
# Input is input into fish shell ("sent data").
DIR_INPUT = " INPUT "
# Output means output from fish shell ("received data").
DIR_OUTPUT = " OUTPUT "
2020-03-02 15:20:29 -08:00
MODULE = sys . modules [ __name__ ]
def __init__ ( self , dir , text , when ) :
2021-10-29 20:42:59 -07:00
""" Construct from a direction, message text and timestamp. """
2020-03-02 15:20:29 -08:00
self . dir = dir
self . filename , self . lineno , _ = get_callsite ( )
self . text = text
self . when = when
@staticmethod
2020-06-04 18:13:39 -07:00
def sent_input ( text , when ) :
2021-10-29 20:42:59 -07:00
""" Return an input message with the given text. """
2020-06-04 18:13:39 -07:00
return Message ( Message . DIR_INPUT , text , when )
2020-03-02 15:20:29 -08:00
@staticmethod
2020-06-04 18:13:39 -07:00
def received_output ( text , when ) :
2021-10-29 20:42:59 -07:00
""" Return a output message with the given text. """
2020-06-04 18:13:39 -07:00
return Message ( Message . DIR_OUTPUT , text , when )
2020-03-02 15:20:29 -08:00
class SpawnedProc ( object ) :
2020-11-22 14:39:48 +01:00
""" A process, talking to our ptty. This wraps pexpect.spawn.
2020-03-02 15:20:29 -08:00
Attributes :
colorize : whether error messages should have ANSI color escapes
messages : list of Message sent and received , in - order
start_time : the timestamp of the first message , or None if none yet
spawn : the pexpect . spawn value
prompt_counter : the index of the prompt . This cooperates with the fish_prompt
function to ensure that each printed prompt is distinct .
"""
2021-10-29 20:42:59 -07:00
def __init__ (
self , name = " fish " , timeout = TIMEOUT_SECS , env = os . environ . copy ( ) , * * kwargs
) :
2020-11-22 14:39:48 +01:00
""" Construct from a name, timeout, and environment.
Args :
name : the name of the executable to launch , as a key into the
environment dictionary . By default this is ' fish ' but may be
other executables .
timeout : A timeout to pass to pexpect . This indicates how long to wait
before giving up on some expected output .
env : a string - > string dictionary , describing the environment variables .
2020-03-02 15:20:29 -08:00
"""
if name not in env :
2021-03-21 16:05:45 +01:00
raise ValueError ( " ' %s ' variable not found in environment " % name )
2020-03-02 15:20:29 -08:00
exe_path = env . get ( name )
2021-08-30 17:16:19 +02:00
self . colorize = sys . stdout . isatty ( ) or env . get ( " FISH_FORCE_COLOR " , " 0 " ) == " 1 "
2020-03-02 15:20:29 -08:00
self . messages = [ ]
self . start_time = None
2021-10-29 20:42:59 -07:00
self . spawn = pexpect . spawn (
exe_path , env = env , encoding = " utf-8 " , timeout = timeout , * * kwargs
)
2020-03-02 15:20:29 -08:00
self . spawn . delaybeforesend = None
2020-10-06 14:22:35 -07:00
self . prompt_counter = 0
2020-03-02 15:20:29 -08:00
def time_since_first_message ( self ) :
2021-10-29 20:42:59 -07:00
""" Return a delta in seconds since the first message, or 0 if this is the first. """
2020-03-02 15:20:29 -08:00
now = time . monotonic ( )
if not self . start_time :
self . start_time = now
return now - self . start_time
def send ( self , s ) :
2020-11-22 14:39:48 +01:00
""" Cover over pexpect.spawn.send().
Send the given string to the tty , returning the number of bytes written .
2020-03-02 15:20:29 -08:00
"""
res = self . spawn . send ( s )
when = self . time_since_first_message ( )
2020-06-04 18:13:39 -07:00
self . messages . append ( Message . sent_input ( s , when ) )
2020-03-02 15:20:29 -08:00
return res
def sendline ( self , s ) :
2020-11-22 14:39:48 +01:00
""" Cover over pexpect.spawn.sendline().
Send the given string + linesep to the tty , returning the number of bytes written .
2020-03-02 15:20:29 -08:00
"""
return self . send ( s + os . linesep )
2021-02-12 18:50:05 +01:00
def expect_re ( self , pat , pat_desc = None , unmatched = None , shouldfail = False , * * kwargs ) :
2020-11-22 14:39:48 +01:00
""" Cover over pexpect.spawn.expect().
Consume all " new " output of self . spawn until the given pattern is matched , or
the timeout is reached .
Note that output between the current position and the location of the match is
consumed as well .
The pattern is typically a regular expression in string form , but may also be
any of the types accepted by pexpect . spawn . expect ( ) .
If the ' unmatched ' parameter is given , it is printed as part of the error message
of any failure .
On failure , this prints an error and exits .
2020-03-02 15:20:29 -08:00
"""
try :
2020-09-04 17:46:04 +02:00
self . spawn . expect ( pat , * * kwargs )
2020-03-02 15:20:29 -08:00
when = self . time_since_first_message ( )
2020-06-04 18:13:39 -07:00
self . messages . append (
Message . received_output ( self . spawn . match . group ( ) , when )
)
2020-09-04 17:46:04 +02:00
# When a match is found,
# spawn.match is the MatchObject that produced it.
# This can be used to check what exactly was matched.
2021-02-12 18:50:05 +01:00
if shouldfail :
err = UNEXPECTED_SUCCESS
if not pat_desc :
pat_desc = str ( pat )
self . report_exception_and_exit ( pat_desc , unmatched , err )
2020-09-04 17:46:04 +02:00
return self . spawn . match
2020-03-02 15:20:29 -08:00
except pexpect . ExceptionPexpect as err :
2021-02-12 18:50:05 +01:00
if shouldfail :
return True
2020-03-02 15:20:29 -08:00
if not pat_desc :
pat_desc = str ( pat )
self . report_exception_and_exit ( pat_desc , unmatched , err )
def expect_str ( self , s , * * kwargs ) :
2021-10-29 20:42:59 -07:00
""" Cover over expect_re() which accepts a literal string. """
2020-03-02 15:20:29 -08:00
return self . expect_re ( re . escape ( s ) , * * kwargs )
2020-10-06 16:03:06 -07:00
def expect_prompt ( self , * args , increment = True , * * kwargs ) :
2020-11-22 14:39:48 +01:00
""" Convenience function which matches some text and then a prompt.
Match the given positional arguments as expect_re , and then look
for a prompt .
If increment is set , then this should be a new prompt and the prompt counter
should be bumped ; otherwise this is not a new prompt .
Returns None on success , and exits on failure .
Example :
sp . sendline ( " echo hello world " )
sp . expect_prompt ( " hello world " )
2020-03-02 15:20:29 -08:00
"""
if args :
self . expect_re ( * args , * * kwargs )
2020-10-06 14:22:35 -07:00
if increment :
self . prompt_counter + = 1
2020-03-02 15:20:29 -08:00
self . expect_re (
get_prompt_re ( self . prompt_counter ) ,
pat_desc = " prompt %d " % self . prompt_counter ,
)
def report_exception_and_exit ( self , pat , unmatched , err ) :
2020-11-22 14:39:48 +01:00
""" Things have gone badly.
We have an exception ' err ' , some pexpect . ExceptionPexpect .
Report it to stdout , along with the offending call site .
If ' unmatched ' is set , print it to stdout .
2020-03-02 15:20:29 -08:00
"""
colors = self . colors ( )
2020-06-04 18:13:39 -07:00
failtype = pexpect_error_type ( err )
fmtkeys = { " failtype " : failtype , " pat " : escape ( pat ) }
fmtkeys . update ( * * colors )
2020-03-02 15:20:29 -08:00
filename , lineno , code_context = get_callsite ( )
2020-06-04 18:13:39 -07:00
fmtkeys [ " filename " ] = filename
fmtkeys [ " lineno " ] = lineno
fmtkeys [ " code " ] = " \n " . join ( code_context )
if unmatched :
print (
" {RED} Error: {NORMAL} {BOLD} {unmatched} {RESET} " . format (
unmatched = unmatched , * * fmtkeys
)
2020-03-02 15:20:29 -08:00
)
2020-06-04 18:13:39 -07:00
print (
" {RED} Failed to match pattern: {NORMAL} {BOLD} {pat} {RESET} " . format ( * * fmtkeys )
)
print (
" {filename} : {lineno} : {BOLD} {failtype} {RESET} from {code} " . format ( * * fmtkeys )
2020-03-02 15:20:29 -08:00
)
2020-06-04 18:13:39 -07:00
print ( " " )
print ( " {CYAN} Escaped buffer: {RESET} " . format ( * * colors ) )
print ( escape ( self . spawn . before ) )
print ( " " )
2021-10-29 20:45:42 -07:00
print ( " {CYAN} When written to the tty, this looks like: {RESET} " . format ( * * colors ) )
print ( " {CYAN} <------- {RESET} " . format ( * * colors ) )
sys . stdout . write ( self . spawn . before )
sys . stdout . flush ( )
print ( " {RESET} \n {CYAN} -------> {RESET} " . format ( * * colors ) )
2020-06-04 18:13:39 -07:00
print ( " " )
2020-06-13 19:51:37 +02:00
# Show the last 10 messages.
print ( " Last 10 messages: " )
2020-06-04 18:13:39 -07:00
delta = None
2020-06-13 19:51:37 +02:00
for m in self . messages [ - 10 : ] :
2020-06-04 18:13:39 -07:00
etext = escape ( m . text )
timestamp = m . when * 1000.0
# Use relative timestamps and add a sign.
# This assumes a max length of 10^10 milliseconds (115 days) for the initial timestamp,
# and 11.5 days for the delta.
if delta :
timestamp - = delta
timestampstr = " {timestamp:+10.2f} ms " . format ( timestamp = timestamp )
else :
timestampstr = " {timestamp:10.2f} ms " . format ( timestamp = timestamp )
delta = m . when * 1000.0
print (
" {dir} {timestampstr} (Line {lineno} ): {BOLD} {etext} {RESET} " . format (
dir = m . dir ,
timestampstr = timestampstr ,
filename = m . filename ,
lineno = m . lineno ,
etext = etext ,
* * colors
)
)
print ( " " )
2020-03-02 15:20:29 -08:00
sys . exit ( 1 )
def sleep ( self , secs ) :
2021-10-29 20:42:59 -07:00
""" Cover over time.sleep(). """
2020-03-02 15:20:29 -08:00
time . sleep ( secs )
def colors ( self ) :
2021-10-29 20:42:59 -07:00
""" Return a dictionary mapping color names to ANSI escapes """
2020-03-02 15:20:29 -08:00
def ansic ( n ) :
2021-10-29 20:42:59 -07:00
""" Return either an ANSI escape sequence for a color, or empty string. """
2020-03-02 15:20:29 -08:00
return " \033 [ %d m " % n if self . colorize else " "
return {
" RESET " : ansic ( 0 ) ,
" BOLD " : ansic ( 1 ) ,
" NORMAL " : ansic ( 39 ) ,
" BLACK " : ansic ( 30 ) ,
" RED " : ansic ( 31 ) ,
" GREEN " : ansic ( 32 ) ,
" YELLOW " : ansic ( 33 ) ,
" BLUE " : ansic ( 34 ) ,
" MAGENTA " : ansic ( 35 ) ,
" CYAN " : ansic ( 36 ) ,
" LIGHTGRAY " : ansic ( 37 ) ,
" DARKGRAY " : ansic ( 90 ) ,
" LIGHTRED " : ansic ( 91 ) ,
" LIGHTGREEN " : ansic ( 92 ) ,
" LIGHTYELLOW " : ansic ( 93 ) ,
" LIGHTBLUE " : ansic ( 94 ) ,
" LIGHTMAGENTA " : ansic ( 95 ) ,
" LIGHTCYAN " : ansic ( 96 ) ,
" WHITE " : ansic ( 97 ) ,
}