1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-25 06:04:04 +03:00
samba-mirror/source3/stf/comfychair.py
Martin Pool 94e5719dac Merge from Subversion r50.
(This used to be commit 480487355929ec5ec066b33630c06c3a30bcca49)
2003-04-04 03:16:27 +00:00

446 lines
13 KiB
Python

#! /usr/bin/env python
# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
"""comfychair: a Python-based instrument of software torture.
Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
Copyright (C) 2003 by Tim Potter <tpot@samba.org>
This is a test framework designed for testing programs written in
Python, or (through a fork/exec interface) any other language.
For more information, see the file README.comfychair.
To run a test suite based on ComfyChair, just run it as a program.
"""
import sys, re
class TestCase:
"""A base class for tests. This class defines required functions which
can optionally be overridden by subclasses. It also provides some
utility functions for"""
def __init__(self):
self.test_log = ""
self.background_pids = []
self._cleanups = []
self._enter_rundir()
self._save_environment()
self.add_cleanup(self.teardown)
# --------------------------------------------------
# Save and restore directory
def _enter_rundir(self):
import os
self.basedir = os.getcwd()
self.add_cleanup(self._restore_directory)
self.rundir = os.path.join(self.basedir,
'testtmp',
self.__class__.__name__)
self.tmpdir = os.path.join(self.rundir, 'tmp')
os.system("rm -fr %s" % self.rundir)
os.makedirs(self.tmpdir)
os.system("mkdir -p %s" % self.rundir)
os.chdir(self.rundir)
def _restore_directory(self):
import os
os.chdir(self.basedir)
# --------------------------------------------------
# Save and restore environment
def _save_environment(self):
import os
self._saved_environ = os.environ.copy()
self.add_cleanup(self._restore_environment)
def _restore_environment(self):
import os
os.environ.clear()
os.environ.update(self._saved_environ)
def setup(self):
"""Set up test fixture."""
pass
def teardown(self):
"""Tear down test fixture."""
pass
def runtest(self):
"""Run the test."""
pass
def add_cleanup(self, c):
"""Queue a cleanup to be run when the test is complete."""
self._cleanups.append(c)
def fail(self, reason = ""):
"""Say the test failed."""
raise AssertionError(reason)
#############################################################
# Requisition methods
def require(self, predicate, message):
"""Check a predicate for running this test.
If the predicate value is not true, the test is skipped with a message explaining
why."""
if not predicate:
raise NotRunError, message
def require_root(self):
"""Skip this test unless run by root."""
import os
self.require(os.getuid() == 0,
"must be root to run this test")
#############################################################
# Assertion methods
def assert_(self, expr, reason = ""):
if not expr:
raise AssertionError(reason)
def assert_equal(self, a, b):
if not a == b:
raise AssertionError("assertEquals failed: %s" % `(a, b)`)
def assert_notequal(self, a, b):
if a == b:
raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
def assert_re_match(self, pattern, s):
"""Assert that a string matches a particular pattern
Inputs:
pattern string: regular expression
s string: to be matched
Raises:
AssertionError if not matched
"""
if not re.match(pattern, s):
raise AssertionError("string does not match regexp\n"
" string: %s\n"
" re: %s" % (`s`, `pattern`))
def assert_re_search(self, pattern, s):
"""Assert that a string *contains* a particular pattern
Inputs:
pattern string: regular expression
s string: to be searched
Raises:
AssertionError if not matched
"""
if not re.search(pattern, s):
raise AssertionError("string does not contain regexp\n"
" string: %s\n"
" re: %s" % (`s`, `pattern`))
def assert_no_file(self, filename):
import os.path
assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
#############################################################
# Methods for running programs
def runcmd_background(self, cmd):
import os
self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
pid = os.fork()
if pid == 0:
# child
try:
os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
finally:
os._exit(127)
self.test_log = self.test_log + "pid: %d\n" % pid
return pid
def runcmd(self, cmd, expectedResult = 0):
"""Run a command, fail if the command returns an unexpected exit
code. Return the output produced."""
rc, output, stderr = self.runcmd_unchecked(cmd)
if rc != expectedResult:
raise AssertionError("""command returned %d; expected %s: \"%s\"
stdout:
%s
stderr:
%s""" % (rc, expectedResult, cmd, output, stderr))
return output, stderr
def run_captured(self, cmd):
"""Run a command, capturing stdout and stderr.
Based in part on popen2.py
Returns (waitstatus, stdout, stderr)."""
import os, types
pid = os.fork()
if pid == 0:
# child
try:
pid = os.getpid()
openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
outfd = os.open('%d.out' % pid, openmode, 0666)
os.dup2(outfd, 1)
os.close(outfd)
errfd = os.open('%d.err' % pid, openmode, 0666)
os.dup2(errfd, 2)
os.close(errfd)
if isinstance(cmd, types.StringType):
cmd = ['/bin/sh', '-c', cmd]
os.execvp(cmd[0], cmd)
finally:
os._exit(127)
else:
# parent
exited_pid, waitstatus = os.waitpid(pid, 0)
stdout = open('%d.out' % pid).read()
stderr = open('%d.err' % pid).read()
return waitstatus, stdout, stderr
def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
"""Invoke a command; return (exitcode, stdout, stderr)"""
import os
waitstatus, stdout, stderr = self.run_captured(cmd)
assert not os.WIFSIGNALED(waitstatus), \
("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
rc = os.WEXITSTATUS(waitstatus)
self.test_log = self.test_log + ("""Run command: %s
Wait status: %#x (exit code %d, signal %d)
stdout:
%s
stderr:
%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
stdout, stderr))
if skip_on_noexec and rc == 127:
# Either we could not execute the command or the command
# returned exit code 127. According to system(3) we can't
# tell the difference.
raise NotRunError, "could not execute %s" % `cmd`
return rc, stdout, stderr
def explain_failure(self, exc_info = None):
print "test_log:"
print self.test_log
def log(self, msg):
"""Log a message to the test log. This message is displayed if
the test fails, or when the runtests function is invoked with
the verbose option."""
self.test_log = self.test_log + msg + "\n"
class NotRunError(Exception):
"""Raised if a test must be skipped because of missing resources"""
def __init__(self, value = None):
self.value = value
def _report_error(case, debugger):
"""Ask the test case to explain failure, and optionally run a debugger
Input:
case TestCase instance
debugger if true, a debugger function to be applied to the traceback
"""
import sys
ex = sys.exc_info()
print "-----------------------------------------------------------------"
if ex:
import traceback
traceback.print_exc(file=sys.stdout)
case.explain_failure()
print "-----------------------------------------------------------------"
if debugger:
tb = ex[2]
debugger(tb)
def runtests(test_list, verbose = 0, debugger = None):
"""Run a series of tests.
Inputs:
test_list sequence of TestCase classes
verbose print more information as testing proceeds
debugger debugger object to be applied to errors
Returns:
unix return code: 0 for success, 1 for failures, 2 for test failure
"""
import traceback
ret = 0
for test_class in test_list:
print "%-30s" % _test_name(test_class),
# flush now so that long running tests are easier to follow
sys.stdout.flush()
obj = None
try:
try: # run test and show result
obj = test_class()
obj.setup()
obj.runtest()
print "OK"
except KeyboardInterrupt:
print "INTERRUPT"
_report_error(obj, debugger)
ret = 2
break
except NotRunError, msg:
print "NOTRUN, %s" % msg.value
except:
print "FAIL"
_report_error(obj, debugger)
ret = 1
finally:
while obj and obj._cleanups:
try:
apply(obj._cleanups.pop())
except KeyboardInterrupt:
print "interrupted during teardown"
_report_error(obj, debugger)
ret = 2
break
except:
print "error during teardown"
_report_error(obj, debugger)
ret = 1
# Display log file if we're verbose
if ret == 0 and verbose:
obj.explain_failure()
return ret
def _test_name(test_class):
"""Return a human-readable name for a test class.
"""
try:
return test_class.__name__
except:
return `test_class`
def print_help():
"""Help for people running tests"""
import sys
print """%s: software test suite based on ComfyChair
usage:
To run all tests, just run this program. To run particular tests,
list them on the command line.
options:
--help show usage message
--list list available tests
--verbose, -v show more information while running tests
--post-mortem, -p enter Python debugger on error
""" % sys.argv[0]
def print_list(test_list):
"""Show list of available tests"""
for test_class in test_list:
print " %s" % _test_name(test_class)
def main(tests, extra_tests=[]):
"""Main entry point for test suites based on ComfyChair.
inputs:
tests Sequence of TestCase subclasses to be run by default.
extra_tests Sequence of TestCase subclasses that are available but
not run by default.
Test suites should contain this boilerplate:
if __name__ == '__main__':
comfychair.main(tests)
This function handles standard options such as --help and --list, and
by default runs all tests in the suggested order.
Calls sys.exit() on completion.
"""
from sys import argv
import getopt, sys
opt_verbose = 0
debugger = None
opts, args = getopt.getopt(argv[1:], 'pv',
['help', 'list', 'verbose', 'post-mortem'])
for opt, opt_arg in opts:
if opt == '--help':
print_help()
return
elif opt == '--list':
print_list(tests + extra_tests)
return
elif opt == '--verbose' or opt == '-v':
opt_verbose = 1
elif opt == '--post-mortem' or opt == '-p':
import pdb
debugger = pdb.post_mortem
if args:
all_tests = tests + extra_tests
by_name = {}
for t in all_tests:
by_name[_test_name(t)] = t
which_tests = []
for name in args:
which_tests.append(by_name[name])
else:
which_tests = tests
sys.exit(runtests(which_tests, verbose=opt_verbose,
debugger=debugger))
if __name__ == '__main__':
print __doc__