1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-05 12:22:11 +03:00

Merge from Subversion r50.

(This used to be commit 4804873559)
This commit is contained in:
Martin Pool
2003-04-04 03:16:27 +00:00
parent 03ccc4ac13
commit 94e5719dac

View File

@ -31,14 +31,9 @@ For more information, see the file README.comfychair.
To run a test suite based on ComfyChair, just run it as a program. To run a test suite based on ComfyChair, just run it as a program.
""" """
# TODO: Put everything into a temporary directory?
# TODO: Have a means for tests to customize the display of their
# failure messages. In particular, if a shell command failed, then
# give its stderr.
import sys, re import sys, re
class TestCase: class TestCase:
"""A base class for tests. This class defines required functions which """A base class for tests. This class defines required functions which
can optionally be overridden by subclasses. It also provides some can optionally be overridden by subclasses. It also provides some
@ -47,6 +42,43 @@ class TestCase:
def __init__(self): def __init__(self):
self.test_log = "" self.test_log = ""
self.background_pids = [] self.background_pids = []
self._cleanups = []
self._enter_rundir()
self._save_environment()
self.add_cleanup(self.teardown)
# --------------------------------------------------
# Save and restore directory
def _enter_rundir(self):
import os
self.basedir = os.getcwd()
self.add_cleanup(self._restore_directory)
self.rundir = os.path.join(self.basedir,
'testtmp',
self.__class__.__name__)
self.tmpdir = os.path.join(self.rundir, 'tmp')
os.system("rm -fr %s" % self.rundir)
os.makedirs(self.tmpdir)
os.system("mkdir -p %s" % self.rundir)
os.chdir(self.rundir)
def _restore_directory(self):
import os
os.chdir(self.basedir)
# --------------------------------------------------
# Save and restore environment
def _save_environment(self):
import os
self._saved_environ = os.environ.copy()
self.add_cleanup(self._restore_environment)
def _restore_environment(self):
import os
os.environ.clear()
os.environ.update(self._saved_environ)
def setup(self): def setup(self):
"""Set up test fixture.""" """Set up test fixture."""
@ -60,6 +92,12 @@ class TestCase:
"""Run the test.""" """Run the test."""
pass pass
def add_cleanup(self, c):
"""Queue a cleanup to be run when the test is complete."""
self._cleanups.append(c)
def fail(self, reason = ""): def fail(self, reason = ""):
"""Say the test failed.""" """Say the test failed."""
raise AssertionError(reason) raise AssertionError(reason)
@ -138,9 +176,14 @@ why."""
def runcmd_background(self, cmd): def runcmd_background(self, cmd):
import os import os
name = cmd[0]
self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n" self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
pid = os.spawnvp(os.P_NOWAIT, name, cmd) pid = os.fork()
if pid == 0:
# child
try:
os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
finally:
os._exit(127)
self.test_log = self.test_log + "pid: %d\n" % pid self.test_log = self.test_log + "pid: %d\n" % pid
return pid return pid
@ -148,44 +191,78 @@ why."""
def runcmd(self, cmd, expectedResult = 0): def runcmd(self, cmd, expectedResult = 0):
"""Run a command, fail if the command returns an unexpected exit """Run a command, fail if the command returns an unexpected exit
code. Return the output produced.""" code. Return the output produced."""
rc, output = self.runcmd_unchecked(cmd) rc, output, stderr = self.runcmd_unchecked(cmd)
if rc != expectedResult: if rc != expectedResult:
raise AssertionError("command returned %d; expected %s: \"%s\"" % raise AssertionError("""command returned %d; expected %s: \"%s\"
(rc, expectedResult, cmd)) stdout:
%s
stderr:
%s""" % (rc, expectedResult, cmd, output, stderr))
return output, stderr
def run_captured(self, cmd):
"""Run a command, capturing stdout and stderr.
Based in part on popen2.py
Returns (waitstatus, stdout, stderr)."""
import os, types
pid = os.fork()
if pid == 0:
# child
try:
pid = os.getpid()
openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
outfd = os.open('%d.out' % pid, openmode, 0666)
os.dup2(outfd, 1)
os.close(outfd)
errfd = os.open('%d.err' % pid, openmode, 0666)
os.dup2(errfd, 2)
os.close(errfd)
if isinstance(cmd, types.StringType):
cmd = ['/bin/sh', '-c', cmd]
os.execvp(cmd[0], cmd)
finally:
os._exit(127)
else:
# parent
exited_pid, waitstatus = os.waitpid(pid, 0)
stdout = open('%d.out' % pid).read()
stderr = open('%d.err' % pid).read()
return waitstatus, stdout, stderr
return output
def runcmd_unchecked(self, cmd, skip_on_noexec = 0): def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
"""Invoke a command; return (exitcode, stdout)""" """Invoke a command; return (exitcode, stdout, stderr)"""
import os, popen2 import os
pobj = popen2.Popen4(cmd) waitstatus, stdout, stderr = self.run_captured(cmd)
output = pobj.fromchild.read()
waitstatus = pobj.wait()
assert not os.WIFSIGNALED(waitstatus), \ assert not os.WIFSIGNALED(waitstatus), \
("%s terminated with signal %d", cmd, os.WTERMSIG(waitstatus)) ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
rc = os.WEXITSTATUS(waitstatus) rc = os.WEXITSTATUS(waitstatus)
self.test_log = self.test_log + ("""Run command: %s self.test_log = self.test_log + ("""Run command: %s
Wait status: %#x (exit code %d, signal %d) Wait status: %#x (exit code %d, signal %d)
Output: stdout:
%s
stderr:
%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus), %s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
output)) stdout, stderr))
if skip_on_noexec and rc == 127: if skip_on_noexec and rc == 127:
# Either we could not execute the command or the command # Either we could not execute the command or the command
# returned exit code 127. According to system(3) we can't # returned exit code 127. According to system(3) we can't
# tell the difference. # tell the difference.
raise NotRunError, "could not execute %s" % `cmd` raise NotRunError, "could not execute %s" % `cmd`
return rc, output return rc, stdout, stderr
def explain_failure(self, exc_info = None): def explain_failure(self, exc_info = None):
import traceback print "test_log:"
# Move along, nothing to see here
if not exc_info and self.test_log == "":
return
print "-----------------------------------------------------------------"
if exc_info:
traceback.print_exc(file=sys.stdout)
print self.test_log print self.test_log
print "-----------------------------------------------------------------"
def log(self, msg): def log(self, msg):
@ -201,14 +278,34 @@ class NotRunError(Exception):
self.value = value self.value = value
def runtests(test_list, verbose = 0): def _report_error(case, debugger):
"""Ask the test case to explain failure, and optionally run a debugger
Input:
case TestCase instance
debugger if true, a debugger function to be applied to the traceback
"""
import sys
ex = sys.exc_info()
print "-----------------------------------------------------------------"
if ex:
import traceback
traceback.print_exc(file=sys.stdout)
case.explain_failure()
print "-----------------------------------------------------------------"
if debugger:
tb = ex[2]
debugger(tb)
def runtests(test_list, verbose = 0, debugger = None):
"""Run a series of tests. """Run a series of tests.
Eventually, this routine will also examine sys.argv[] to handle
extra options.
Inputs: Inputs:
test_list sequence of callable test objects test_list sequence of TestCase classes
verbose print more information as testing proceeds
debugger debugger object to be applied to errors
Returns: Returns:
unix return code: 0 for success, 1 for failures, 2 for test failure unix return code: 0 for success, 1 for failures, 2 for test failure
@ -220,36 +317,36 @@ def runtests(test_list, verbose = 0):
# flush now so that long running tests are easier to follow # flush now so that long running tests are easier to follow
sys.stdout.flush() sys.stdout.flush()
obj = None
try: try:
try: # run test and show result try: # run test and show result
obj = test_class() obj = test_class()
if hasattr(obj, "setup"):
obj.setup() obj.setup()
obj.runtest() obj.runtest()
print "OK" print "OK"
except KeyboardInterrupt: except KeyboardInterrupt:
print "INTERRUPT" print "INTERRUPT"
obj.explain_failure(sys.exc_info()) _report_error(obj, debugger)
ret = 2 ret = 2
break break
except NotRunError, msg: except NotRunError, msg:
print "NOTRUN, %s" % msg.value print "NOTRUN, %s" % msg.value
except: except:
print "FAIL" print "FAIL"
obj.explain_failure(sys.exc_info()) _report_error(obj, debugger)
ret = 1 ret = 1
finally: finally:
while obj and obj._cleanups:
try: try:
if hasattr(obj, "teardown"): apply(obj._cleanups.pop())
obj.teardown()
except KeyboardInterrupt: except KeyboardInterrupt:
print "interrupted during teardown" print "interrupted during teardown"
obj.explain_failure(sys.exc_info()) _report_error(obj, debugger)
ret = 2 ret = 2
break break
except: except:
print "error during teardown" print "error during teardown"
obj.explain_failure(sys.exc_info()) _report_error(obj, debugger)
ret = 1 ret = 1
# Display log file if we're verbose # Display log file if we're verbose
if ret == 0 and verbose: if ret == 0 and verbose:
@ -279,7 +376,8 @@ usage:
options: options:
--help show usage message --help show usage message
--list list available tests --list list available tests
--verbose show more information while running tests --verbose, -v show more information while running tests
--post-mortem, -p enter Python debugger on error
""" % sys.argv[0] """ % sys.argv[0]
@ -289,9 +387,14 @@ def print_list(test_list):
print " %s" % _test_name(test_class) print " %s" % _test_name(test_class)
def main(tests): def main(tests, extra_tests=[]):
"""Main entry point for test suites based on ComfyChair. """Main entry point for test suites based on ComfyChair.
inputs:
tests Sequence of TestCase subclasses to be run by default.
extra_tests Sequence of TestCase subclasses that are available but
not run by default.
Test suites should contain this boilerplate: Test suites should contain this boilerplate:
if __name__ == '__main__': if __name__ == '__main__':
@ -305,28 +408,37 @@ Calls sys.exit() on completion.
from sys import argv from sys import argv
import getopt, sys import getopt, sys
verbose = 0 opt_verbose = 0
debugger = None
opts, args = getopt.getopt(argv[1:], '', ['help', 'list', 'verbose']) opts, args = getopt.getopt(argv[1:], 'pv',
if ('--help', '') in opts: ['help', 'list', 'verbose', 'post-mortem'])
for opt, opt_arg in opts:
if opt == '--help':
print_help() print_help()
return return
elif ('--list', '') in opts: elif opt == '--list':
print_list(tests) print_list(tests + extra_tests)
return return
elif opt == '--verbose' or opt == '-v':
if ('--verbose', '') in opts: opt_verbose = 1
verbose = 1 elif opt == '--post-mortem' or opt == '-p':
import pdb
debugger = pdb.post_mortem
if args: if args:
all_tests = tests + extra_tests
by_name = {} by_name = {}
for t in tests: for t in all_tests:
by_name[_test_name(t)] = t by_name[_test_name(t)] = t
which_tests = [by_name[name] for name in args] which_tests = []
for name in args:
which_tests.append(by_name[name])
else: else:
which_tests = tests which_tests = tests
sys.exit(runtests(which_tests, verbose)) sys.exit(runtests(which_tests, verbose=opt_verbose,
debugger=debugger))
if __name__ == '__main__': if __name__ == '__main__':