1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-02 09:47:23 +03:00

Remove unused python selftest

It doesn't work, isn't changing, and causes a little bit of extra
confusion.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>

Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Thu Sep  1 13:29:46 CEST 2016 on sn-devel-144
This commit is contained in:
Douglas Bagnall 2016-08-23 12:11:59 +12:00 committed by Andrew Bartlett
parent f479b1b3fd
commit c433479312
14 changed files with 0 additions and 1859 deletions

View File

View File

@ -1,134 +0,0 @@
#!/usr/bin/python -u
# Bootstrap Samba and run a number of tests against it.
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test command running."""
import datetime
import os
import subprocess
from samba import subunit
from iso8601 import iso8601
import sys
import tempfile
import warnings
# expand strings from %ENV
def expand_environment_strings(s, vars):
# we use a reverse sort so we do the longer ones first
for k in sorted(vars.keys(), reverse=True):
v = vars[k]
s = s.replace("$%s" % k, v)
return s
def expand_command_list(cmd):
if not "$LISTOPT" in cmd:
return None
return cmd.replace("$LISTOPT", "--list")
def expand_command_run(cmd, supports_loadfile, supports_idlist, subtests=None):
"""Expand a test command.
:param cmd: Command to expand
:param supports_loadfile: Whether command supports loadfile
:param supports_idlist: Whether the command supports running specific
subtests
:param subtests: List of subtests to run - None for all subtests
:return: Tuple with command to run and temporary file to remove after
running (or None)
"""
# Generate a file with the individual tests to run, if the
# test runner for this test suite supports it.
if subtests is None:
return (cmd.replace("$LOADLIST", ""), None)
if supports_loadfile:
(fd, listid_file) = tempfile.mkstemp()
f = os.fdopen(fd, 'w')
try:
for test in subtests:
f.write(test+"\n")
finally:
f.close()
return (
cmd.replace("$LOADLIST", "--load-list=%s" % listid_file),
listid_file)
elif supports_idlist:
cmd += " " + " ".join(subtests)
return (cmd, None)
else:
warnings.warn(
"Running subtests requested, but command does not support "
"this.")
return (cmd, None)
def exported_envvars_str(vars, names):
out = ""
for n in names:
if not n in vars:
continue
out += "%s=%s\n" % (n, vars[n])
return out
def now():
"""Return datetime instance for current time in UTC.
"""
return datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
def run_testsuite_command(name, cmd, subunit_ops, env=None, outf=None):
"""Run a testsuite command.
:param name: Name of the testsuite
:param cmd: Command to run
:param subunit_ops: Subunit ops to use for reporting results
:param env: Environment the test is run in
:param outf: File-like object to write standard out to (defaults to sys.stdout)
:return: Exit code or None if the test failed to run completely
"""
if outf is None:
outf = sys.stdout
subunit_ops.start_testsuite(name)
subunit_ops.progress(None, subunit.PROGRESS_PUSH)
subunit_ops.time(now())
try:
exitcode = subprocess.call(cmd, shell=True, stdout=outf)
except Exception, e:
subunit_ops.time(now())
subunit_ops.progress(None, subunit.PROGRESS_POP)
subunit_ops.end_testsuite(name, "error", "Unable to run %r: %s" % (cmd, e))
return None
subunit_ops.time(now())
subunit_ops.progress(None, subunit.PROGRESS_POP)
if env is not None:
envlog = env.get_log()
if envlog != "":
outf.write("envlog: %s\n" % envlog)
outf.write("command: %s\n" % cmd)
outf.write("expanded command: %s\n" % expand_environment_strings(cmd, os.environ))
if exitcode == 0:
subunit_ops.end_testsuite(name, "success")
else:
subunit_ops.end_testsuite(name, "failure", "Exit code was %d" % exitcode)
return exitcode

View File

@ -1,525 +0,0 @@
#!/usr/bin/python -u
# Bootstrap Samba and run a number of tests against it.
# Copyright (C) 2005-2012 Jelmer Vernooij <jelmer@samba.org>
# Copyright (C) 2007-2009 Stefan Metzmacher <metze@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import atexit
from cStringIO import StringIO
import os
import sys
import signal
import subprocess
from samba import subunit
import traceback
import warnings
import optparse
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from selftest import (
socket_wrapper,
subunithelper,
testlist,
)
from selftest.client import write_clientconf
from selftest.run import (
expand_command_list,
expand_command_run,
exported_envvars_str,
now,
run_testsuite_command,
)
from selftest.target import (
EnvironmentManager,
NoneTarget,
UnsupportedEnvironment,
)
includes = ()
excludes = ()
def read_excludes(fn):
excludes.extend(testlist.read_test_regexes(fn))
def read_includes(fn):
includes.extend(testlist.read_test_regexes(fn))
parser = optparse.OptionParser("TEST-REGEXES")
parser.add_option("--target", type="choice", choices=["samba", "samba3", "none"], default="samba", help="Samba version to target")
parser.add_option("--quick", help="run quick overall test", action="store_true", default=False)
parser.add_option("--list", help="list available tests", action="store_true", default=False)
parser.add_option("--socket-wrapper", help="enable socket wrapper", action="store_true", default=False)
parser.add_option("--socket-wrapper-pcap", help="save traffic to pcap directories", type="str")
parser.add_option("--socket-wrapper-keep-pcap", help="keep all pcap files, not just those for tests that failed", action="store_true", default=False)
parser.add_option("--one", help="abort when the first test fails", action="store_true", default=False)
parser.add_option("--exclude", action="callback", help="Add file to exclude files", callback=read_excludes)
parser.add_option("--include", action="callback", help="Add file to include files", callback=read_includes)
parser.add_option("--testenv", help="run a shell in the requested test environment", action="store_true", default=False)
parser.add_option("--resetup-environment", help="Re-setup environment", action="store_true", default=False)
parser.add_option("--load-list", help="Load list of tests to load from a file", type=str)
parser.add_option("--prefix", help="prefix to run tests in", type=str, default="./st")
parser.add_option("--srcdir", type=str, default=".", help="source directory")
parser.add_option("--bindir", type=str, default="./bin", help="binaries directory")
parser.add_option("--testlist", type=str, action="append", help="file to read available tests from")
parser.add_option("--ldap", help="back samba onto specified ldap server", choices=["openldap", "fedora-ds"], type="choice")
opts, args = parser.parse_args()
subunit_ops = subunithelper.SubunitOps(sys.stdout)
def handle_signal(sig, frame):
sys.stderr.write("Exiting early because of signal %s.\n" % sig)
sys.exit(1)
for sig in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM, signal.SIGPIPE):
signal.signal(sig, handle_signal)
def skip(name):
return testlist.find_in_list(excludes, name)
def setup_pcap(name):
if (not opts.socket_wrapper_pcap or
not os.environ.get("SOCKET_WRAPPER_PCAP_DIR")):
return
fname = "".join([x for x in name if x.isalnum() or x == '-'])
pcap_file = os.path.join(
os.environ["SOCKET_WRAPPER_PCAP_DIR"], "%s.pcap" % fname)
socket_wrapper.setup_pcap(pcap_file)
return pcap_file
def cleanup_pcap(pcap_file, exit_code):
if not opts.socket_wrapper_pcap:
return
if opts.socket_wrapper_keep_pcap:
return
if exitcode == 0:
return
if pcap_file is None:
return
os.unlink(pcap_file)
def run_testsuite(name, cmd, subunit_ops, env=None):
"""Run a single testsuite.
:param env: Environment to run in
:param name: Name of the testsuite
:param cmd: Name of the (fully expanded) command to run
:return: exitcode of the command
"""
pcap_file = setup_pcap(name)
exitcode = run_testsuite_command(name, cmd, subunit_ops, env)
if exitcode is None:
sys.exit(1)
cleanup_pcap(pcap_file, exitcode)
if not opts.socket_wrapper_keep_pcap and pcap_file is not None:
sys.stdout.write("PCAP FILE: %s\n" % pcap_file)
if exitcode != 0 and opts.one:
sys.exit(1)
return exitcode
if opts.list and opts.testenv:
sys.stderr.write("--list and --testenv are mutually exclusive\n")
sys.exit(1)
tests = args
# quick hack to disable rpc validation when using valgrind - it is way too slow
if not os.environ.get("VALGRIND"):
os.environ["VALIDATE"] = "validate"
os.environ["MALLOC_CHECK_"] = "3"
# make all our python scripts unbuffered
os.environ["PYTHONUNBUFFERED"] = "1"
bindir_abs = os.path.abspath(opts.bindir)
# Backwards compatibility:
if os.environ.get("TEST_LDAP") == "yes":
if os.environ.get("FEDORA_DS_ROOT"):
ldap = "fedora-ds"
else:
ldap = "openldap"
torture_maxtime = int(os.getenv("TORTURE_MAXTIME", "1200"))
if opts.ldap:
# LDAP is slow
torture_maxtime *= 2
prefix = os.path.normpath(opts.prefix)
# Ensure we have the test prefix around.
#
# We need restrictive permissions on this as some subdirectories in this tree
# will have wider permissions (ie 0777) and this would allow other users on the
# host to subvert the test process.
if not os.path.isdir(prefix):
os.mkdir(prefix, 0700)
else:
os.chmod(prefix, 0700)
prefix_abs = os.path.abspath(prefix)
tmpdir_abs = os.path.abspath(os.path.join(prefix_abs, "tmp"))
if not os.path.isdir(tmpdir_abs):
os.mkdir(tmpdir_abs, 0777)
srcdir_abs = os.path.abspath(opts.srcdir)
if prefix_abs == "/":
raise Exception("using '/' as absolute prefix is a bad idea")
os.environ["PREFIX"] = prefix
os.environ["KRB5CCNAME"] = os.path.join(prefix, "krb5ticket")
os.environ["PREFIX_ABS"] = prefix_abs
os.environ["SRCDIR"] = opts.srcdir
os.environ["SRCDIR_ABS"] = srcdir_abs
os.environ["BINDIR"] = bindir_abs
tls_enabled = not opts.quick
if tls_enabled:
os.environ["TLS_ENABLED"] = "yes"
else:
os.environ["TLS_ENABLED"] = "no"
def prefix_pathvar(name, newpath):
if name in os.environ:
os.environ[name] = "%s:%s" % (newpath, os.environ[name])
else:
os.environ[name] = newpath
prefix_pathvar("PKG_CONFIG_PATH", os.path.join(bindir_abs, "pkgconfig"))
prefix_pathvar("PYTHONPATH", os.path.join(bindir_abs, "python"))
if opts.socket_wrapper_keep_pcap:
# Socket wrapper keep pcap implies socket wrapper pcap
opts.socket_wrapper_pcap = True
if opts.socket_wrapper_pcap:
# Socket wrapper pcap implies socket wrapper
opts.socket_wrapper = True
if opts.socket_wrapper:
socket_wrapper_dir = socket_wrapper.setup_dir(os.path.join(prefix_abs, "w"), opts.socket_wrapper_pcap)
sys.stdout.write("SOCKET_WRAPPER_DIR=%s\n" % socket_wrapper_dir)
elif not opts.list:
if os.getuid() != 0:
warnings.warn("not using socket wrapper, but also not running as root. Will not be able to listen on proper ports")
testenv_default = "none"
# After this many seconds, the server will self-terminate. All tests
# must terminate in this time, and testenv will only stay alive this
# long
if os.environ.get("SMBD_MAXTIME", ""):
server_maxtime = int(os.environ["SMBD_MAXTIME"])
else:
server_maxtime = 7500
def has_socket_wrapper(bindir):
"""Check if Samba has been built with socket wrapper support.
"""
f = StringIO()
subprocess.check_call([os.path.join(bindir, "smbd"), "-b"], stdout=f)
for l in f.readlines():
if "SOCKET_WRAPPER" in l:
return True
return False
if not opts.list:
if opts.target == "samba":
if opts.socket_wrapper and not has_socket_wrapper(opts.bindir):
sys.stderr.write("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....\n")
sys.exit(1)
testenv_default = "ad_dc_ntvfs"
from selftest.target.samba import Samba
target = Samba(opts.bindir, ldap, opts.srcdir, server_maxtime)
elif opts.target == "samba3":
if opts.socket_wrapper and not has_socket_wrapper(opts.bindir):
sys.stderr.write("You must include --enable-socket-wrapper when compiling Samba in order to execute 'make test'. Exiting....\n")
sys.exit(1)
testenv_default = "member"
from selftest.target.samba3 import Samba3
target = Samba3(opts.bindir, srcdir_abs, server_maxtime)
elif opts.target == "none":
testenv_default = "none"
target = NoneTarget()
env_manager = EnvironmentManager(target)
atexit.register(env_manager.teardown_all)
interfaces = ",".join([
"127.0.0.11/8",
"127.0.0.12/8",
"127.0.0.13/8",
"127.0.0.14/8",
"127.0.0.15/8",
"127.0.0.16/8"])
clientdir = os.path.join(prefix_abs, "client")
conffile = os.path.join(clientdir, "client.conf")
os.environ["SMB_CONF_PATH"] = conffile
todo = []
if not opts.testlist:
sys.stderr.write("No testlists specified\n")
sys.exit(1)
os.environ["SELFTEST_PREFIX"] = prefix_abs
os.environ["SELFTEST_TMPDIR"] = tmpdir_abs
os.environ["TEST_DATA_PREFIX"] = tmpdir_abs
if opts.socket_wrapper:
os.environ["SELFTEST_INTERFACES"] = interfaces
else:
os.environ["SELFTEST_INTERFACES"] = ""
if opts.quick:
os.environ["SELFTEST_QUICK"] = "1"
else:
os.environ["SELFTEST_QUICK"] = ""
os.environ["SELFTEST_MAXTIME"] = str(torture_maxtime)
available = []
for fn in opts.testlist:
for testsuite in testlist.read_testlist_file(fn):
if not testlist.should_run_test(tests, testsuite):
continue
name = testsuite[0]
if (includes is not None and
testlist.find_in_list(includes, name) is not None):
continue
available.append(testsuite)
if opts.load_list:
restricted_mgr = testlist.RestrictedTestManager.from_path(opts.load_list)
else:
restricted_mgr = None
for testsuite in available:
name = testsuite[0]
skipreason = skip(name)
if restricted_mgr is not None:
match = restricted_mgr.should_run_testsuite(name)
if match == []:
continue
else:
match = None
if skipreason is not None:
if not opts.list:
subunit_ops.skip_testsuite(name, skipreason)
else:
todo.append(testsuite + (match,))
if restricted_mgr is not None:
for name in restricted_mgr.iter_unused():
sys.stdout.write("No test or testsuite found matching %s\n" % name)
if todo == []:
sys.stderr.write("No tests to run\n")
sys.exit(1)
suitestotal = len(todo)
if not opts.list:
subunit_ops.progress(suitestotal, subunit.PROGRESS_SET)
subunit_ops.time(now())
exported_envvars = [
# domain stuff
"DOMAIN",
"REALM",
# domain controller stuff
"DC_SERVER",
"DC_SERVER_IP",
"DC_NETBIOSNAME",
"DC_NETBIOSALIAS",
# domain member
"MEMBER_SERVER",
"MEMBER_SERVER_IP",
"MEMBER_NETBIOSNAME",
"MEMBER_NETBIOSALIAS",
# rpc proxy controller stuff
"RPC_PROXY_SERVER",
"RPC_PROXY_SERVER_IP",
"RPC_PROXY_NETBIOSNAME",
"RPC_PROXY_NETBIOSALIAS",
# domain controller stuff for Vampired DC
"VAMPIRE_DC_SERVER",
"VAMPIRE_DC_SERVER_IP",
"VAMPIRE_DC_NETBIOSNAME",
"VAMPIRE_DC_NETBIOSALIAS",
# domain controller stuff for Vampired DC
"PROMOTED_DC_SERVER",
"PROMOTED_DC_SERVER_IP",
"PROMOTED_DC_NETBIOSNAME",
"PROMOTED_DC_NETBIOSALIAS",
# server stuff
"SERVER",
"SERVER_IP",
"NETBIOSNAME",
"NETBIOSALIAS",
# user stuff
"USERNAME",
"USERID",
"PASSWORD",
"DC_USERNAME",
"DC_PASSWORD",
# misc stuff
"KRB5_CONFIG",
"WINBINDD_SOCKET_DIR",
"WINBINDD_PRIV_PIPE_DIR",
"NMBD_SOCKET_DIR",
"LOCAL_PATH"
]
def switch_env(name, prefix):
if ":" in name:
(envname, option) = name.split(":", 1)
else:
envname = name
option = "client"
env = env_manager.setup_env(envname, prefix)
testenv_vars = env.get_vars()
if option == "local":
socket_wrapper.set_default_iface(testenv_vars["SOCKET_WRAPPER_DEFAULT_IFACE"])
os.environ["SMB_CONF_PATH"] = testenv_vars["SERVERCONFFILE"]
elif option == "client":
socket_wrapper.set_default_iface(11)
write_clientconf(conffile, clientdir, testenv_vars)
os.environ["SMB_CONF_PATH"] = conffile
else:
raise Exception("Unknown option[%s] for envname[%s]" % (option,
envname))
for name in exported_envvars:
if name in testenv_vars:
os.environ[name] = testenv_vars[name]
elif name in os.environ:
del os.environ[name]
return env
# This 'global' file needs to be empty when we start
dns_host_file_path = os.path.join(prefix_abs, "dns_host_file")
if os.path.exists(dns_host_file_path):
os.unlink(dns_host_file_path)
if opts.testenv:
testenv_name = os.environ.get("SELFTEST_TESTENV", testenv_default)
env = switch_env(testenv_name, prefix)
testenv_vars = env.get_vars()
os.environ["PIDDIR"] = testenv_vars["PIDDIR"]
os.environ["ENVNAME"] = testenv_name
envvarstr = exported_envvars_str(testenv_vars, exported_envvars)
term = os.environ.get("TERMINAL", "xterm -e")
cmd = """'echo -e "
Welcome to the Samba4 Test environment '%(testenv_name)'
This matches the client environment used in make test
server is pid `cat \$PIDDIR/samba.pid`
Some useful environment variables:
TORTURE_OPTIONS=\$TORTURE_OPTIONS
SMB_CONF_PATH=\$SMB_CONF_PATH
$envvarstr
\" && LD_LIBRARY_PATH=%(LD_LIBRARY_PATH)s $(SHELL)'""" % {
"testenv_name": testenv_name,
"LD_LIBRARY_PATH": os.environ["LD_LIBRARY_PATH"]}
subprocess.call(term + ' ' + cmd, shell=True)
env_manager.teardown_env(testenv_name)
elif opts.list:
for (name, envname, cmd, supports_loadfile, supports_idlist, subtests) in todo:
cmd = expand_command_list(cmd)
if cmd is None:
warnings.warn("Unable to list tests in %s" % name)
continue
exitcode = subprocess.call(cmd, shell=True)
if exitcode != 0:
sys.stderr.write("%s exited with exit code %s\n" % (cmd, exitcode))
sys.exit(1)
else:
for (name, envname, cmd, supports_loadfile, supports_idlist, subtests) in todo:
try:
env = switch_env(envname, prefix)
except UnsupportedEnvironment:
subunit_ops.start_testsuite(name)
subunit_ops.end_testsuite(name, "skip",
"environment %s is unknown in this test backend - skipping" % envname)
continue
except Exception, e:
subunit_ops.start_testsuite(name)
traceback.print_exc()
subunit_ops.end_testsuite(name, "error",
"unable to set up environment %s: %s" % (envname, e))
continue
cmd, tmpf = expand_command_run(cmd, supports_loadfile, supports_idlist,
subtests)
run_testsuite(name, cmd, subunit_ops, env=env)
if tmpf is not None:
os.remove(tmpf)
if opts.resetup_environment:
env_manager.teardown_env(envname)
env_manager.teardown_all()
sys.stdout.write("\n")
# if there were any valgrind failures, show them
for fn in os.listdir(prefix):
if fn.startswith("valgrind.log"):
sys.stdout.write("VALGRIND FAILURE\n")
f = open(os.path.join(prefix, fn), 'r')
try:
sys.stdout.write(f.read())
finally:
f.close()
sys.exit(0)

View File

@ -1,61 +0,0 @@
#!/usr/bin/python
# Bootstrap Samba and run a number of tests against it.
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__all__ = ['setup_dir', 'setup_pcap', 'set_default_iface']
import os
import shutil
def setup_dir(dir, pcap):
"""Setup a socket wrapper directory.
:param dir: Socket wrapper directory (None if socket wrapper should be
disabled)
:param pcap: Whether to generate pcap files
:return: The socket wrapper directory
"""
pcap_dir = None
if dir is not None:
if os.path.isdir(dir):
shutil.rmtree(dir)
os.mkdir(dir, 0777)
if pcap:
pcap_dir = os.path.join(dir, "pcap")
if os.path.isdir(pcap_dir):
shutil.rmtree(pcap_dir)
os.mkdir(pcap_dir, 0777)
if pcap_dir is not None:
os.environ["SOCKET_WRAPPER_PCAP_DIR"] = pcap_dir
else:
del os.environ["SOCKET_WRAPPER_PCAP_DIR"]
if dir is not None:
os.environ["SOCKET_WRAPPER_DIR"] = dir
else:
del os.environ["SOCKET_WRAPPER_DIR"]
return dir
def setup_pcap(pcap_file):
os.environ["SOCKET_WRAPPER_PCAP_FILE"] = pcap_file
def set_default_iface(i):
os.environ["SOCKET_WRAPPER_DEFAULT_IFACE"] = str(i)

View File

@ -1,165 +0,0 @@
# target.py -- Targets
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Selftest target management."""
__all__ = ['Target', 'Environment', 'EnvironmentManager']
class EnvironmentDown(Exception):
"""Indicates an environment has gone down."""
def __init__(self, msg):
super(EnvironmentDown, self).__init__("environment went down: %s" % msg)
class UnsupportedEnvironment(Exception):
"""Indicates a particular environment is not supported."""
def __init__(self, target, envname):
super(UnsupportedEnvironment, self).__init__(
"Target %s does not support environment %s" % (target, envname))
class Target(object):
"""A target for Samba tests."""
def setup_env(self, name, prefix):
"""Setup an environment.
:param name: name of the environment
:param prefix: directory to create it in
"""
raise NotImplementedError(self.setup_env)
class Environment(object):
"""An environment for Samba tests.
Tests often need to run against a server with particular things set up,
a "environment". This environment is provided by the test target.
"""
def check(self):
"""Check if this environment is still up and running.
:return: Boolean indicating whether environment is still running
"""
raise NotImplementedError(self.check)
def get_log(self):
"""Retrieve the last log for this environment.
:return: String with log
"""
raise NotImplementedError(self.get_log)
def teardown(self):
"""Tear down an environment.
"""
raise NotImplementedError(self.teardown)
def get_vars(self):
"""Retrieve the environment variables for this environment.
:return: Dictionary with string -> string values
"""
raise NotImplementedError(self.get_vars)
class NoneEnvironment(Environment):
"""Empty environment.
"""
def check(self):
return True
def get_log(self):
return ""
def teardown(self):
return
def get_vars(self):
return {}
class NoneTarget(Target):
"""Target that can only provide the 'none' environment."""
name = "none"
def setup_env(self, envname, prefix):
raise UnsupportedEnvironment(self.name, envname)
class EnvironmentManager(object):
"""Manager of environments."""
def __init__(self, target):
self.target = target
self.running_envs = {}
def get_running_env(self, name):
envname = name.split(":")[0]
if envname == "none":
return NoneEnvironment()
return self.running_envs.get(envname)
def getlog_env(self, envname):
env = self.get_running_env(envname)
return env.get_log()
def check_env(self, envname):
"""Check if an environment is still up.
:param envname: Environment to check
"""
env = self.get_running_env(envname)
return env.check()
def teardown_env(self, envname):
"""Tear down an environment.
:param envname: Name of the environment
"""
env = self.get_running_env(envname)
env.teardown()
del self.running_envs[envname]
def teardown_all(self):
"""Teardown all environments."""
for env in self.running_envs.iterkeys():
self.teardown_env(env)
def setup_env(self, envname, prefix):
running_env = self.get_running_env(envname)
if running_env is not None:
if not running_env.check():
raise EnvironmentDown(running_env.get_log())
return running_env
env = self.target.setup_env(envname, prefix)
if env is None:
return None
self.running_envs[envname] = env
return env

View File

@ -1,153 +0,0 @@
#!/usr/bin/perl
# Bootstrap Samba and run a number of tests against it.
# Copyright (C) 2005-2012 Jelmer Vernooij <jelmer@samba.org>
# Published under the GNU GPL, v3 or later.
import os
import sys
def bindir_path(bindir, path):
"""Find the executable to use.
:param bindir: Directory with binaries
:param path: Name of the executable to run
:return: Full path to the executable to run
"""
valpath = os.path.join(bindir, path)
if os.path.isfile(valpath):
return valpath
return path
def mk_realms_stanza(realm, dnsname, domain, kdc_ipv4):
"""Create a realms stanza for use in a krb5.conf file.
:param realm: Real name
:param dnsname: DNS name matching the realm
:param domain: Domain name
:param kdc_ipv4: IPv4 address of the KDC
:return: String with stanza
"""
return """\
%(realm)s = {
kdc = %(kdc_ipv4)s:88
admin_server = %(kdc_ipv4)s:88
default_domain = %(dnsname)s
}
%(dnsname)s = {
kdc = %(kdc_ipv4)s:88
admin_server = %(kdc_ipv4)s:88
default_domain = %(dnsname)s
}
%(domain)s = {
kdc = %(kdc_ipv4)s:88
admin_server = %(kdc_ipv4)s:88
default_domain = %(dnsname)s
}
""" % {
"kdc_ipv4": kdc_ipv4, "dnsname": dnsname, "realm": realm, "domain": domain}
def write_krb5_conf(f, realm, dnsname, domain, kdc_ipv4, tlsdir=None,
other_realms_stanza=None):
"""Write a krb5.conf file.
:param f: File-like object to write to
:param realm: Realm
:param dnsname: DNS domain name
:param domain: Domain name
:param kdc_ipv4: IPv4 address of KDC
:param tlsdir: Optional TLS directory
:param other_realms_stanza: Optional extra raw text for [realms] section
"""
f.write("""\
#Generated krb5.conf for %(realm)s
[libdefaults]
\tdefault_realm = %(realm)s
\tdns_lookup_realm = false
\tdns_lookup_kdc = false
\tticket_lifetime = 24h
\tforwardable = yes
\tallow_weak_crypto = yes
""" % {"realm": realm})
f.write("\n[realms]\n")
f.write(mk_realms_stanza(realm, dnsname, domain, kdc_ipv4))
if other_realms_stanza:
f.write(other_realms_stanza)
if tlsdir:
f.write("""
[appdefaults]
pkinit_anchors = FILE:%(tlsdir)s/ca.pem
[kdc]
enable-pkinit = true
pkinit_identity = FILE:%(tlsdir)s/kdc.pem,%(tlsdir)s/key.pem
pkinit_anchors = FILE:%(tlsdir)s/ca.pem
""" % {"tlsdir": tlsdir})
def cleanup_child(pid, name, outf=None):
"""Cleanup a child process.
:param pid: Parent pid process to be passed to waitpid()
:param name: Name to use when referring to process
:param outf: File-like object to write to (defaults to stderr)
:return: Child pid
"""
if outf is None:
outf = sys.stderr
(childpid, status) = os.waitpid(pid, os.WNOHANG)
if childpid == 0:
pass
elif childpid < 0:
outf.write("%s child process %d isn't here any more.\n" % (name, pid))
return childpid
elif status & 127:
if status & 128:
core_status = 'with'
else:
core_status = 'without'
outf.write("%s child process %d, died with signal %d, %s coredump.\n" % (name, childpid, (status & 127), core_status))
else:
outf.write("%s child process %d exited with value %d.\n" % (name, childpid, status >> 8))
return childpid
def get_interface(netbiosname):
"""Return interface id for a particular server.
"""
netbiosname = netbiosname.lower()
interfaces = {
"localnt4dc2": 2,
"localnt4member3": 3,
"localshare4": 4,
"localserver5": 5,
"localktest6": 6,
"maptoguest": 7,
# 11-16 used by selftest.pl for client interfaces
"localdc": 21,
"localvampiredc": 22,
"s4member": 23,
"localrpcproxy": 24,
"dc5": 25,
"dc6": 26,
"dc7": 27,
"rodc": 28,
"localadmember": 29,
"addc": 30,
"localsubdc": 31,
"chgdcpass": 32,
}
# update lib/socket_wrapper/socket_wrapper.c
# #define MAX_WRAPPED_INTERFACES 32
# if you wish to have more than 32 interfaces
return interfaces[netbiosname]

View File

@ -1,171 +0,0 @@
# testlist.py -- Test list
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Selftest test list management."""
__all__ = ['find_in_list', 'read_test_regexes', 'read_testlist']
import os
import re
import sys
def find_in_list(list, fullname):
"""Find test in list.
:param list: List with 2-tuples with regex and reason
"""
for (regex, reason) in list:
if re.match(regex, fullname):
if reason is not None:
return reason
else:
return ""
return None
def read_test_regexes(f):
"""Read tuples with regular expression and optional string from a file.
:param f: File-like object to read from
:return: Iterator over tuples with regular expression and test name
"""
for l in f.readlines():
l = l.strip()
if l[0] == "#":
continue
try:
(test, reason) = l.split("#", 1)
except ValueError:
yield l, None
else:
yield test.strip(), reason.strip()
def should_run_test(tests, name):
if tests == []:
return True
for test in tests:
if re.match(test, name):
return True
return False
def read_testlist(inf, outf):
"""Read a list of tests from a file.
:param inf: File-like object to read from.
:param outf: File-like object to write to.
:return: Iterator over tuples describing tests
"""
while True:
l = inf.readline()
if l == '':
return
if l.startswith("-- TEST") and l.endswith(" --\n"):
supports_loadlist = l.startswith("-- TEST-LOADLIST")
name = inf.readline().rstrip("\n")
env = inf.readline().rstrip("\n")
if supports_loadlist:
loadlist = inf.readline().rstrip("\n")
else:
loadlist = None
cmdline = inf.readline().rstrip("\n")
yield (name, env, cmdline, loadlist)
else:
outf.write(l)
def read_restricted_test_list(f):
for l in f.readlines():
yield l.strip()
class RestrictedTestManager(object):
"""Test manager which can filter individual tests that should be run."""
def __init__(self, test_list):
self.test_list = test_list
self.unused = set(self.test_list)
@classmethod
def from_path(cls, path):
f = open(path, 'r')
try:
return cls(read_restricted_test_list(f))
finally:
f.close()
def should_run_testsuite(self, name):
"""Determine whether a testsuite should be run.
:param name: Name of the testsuite
:return: None if full testsuite should be run,
a list of subtests to run or [] if it should
not be run.
"""
match = []
for r in self.test_list:
if r == name:
match = None
if r in self.unused:
self.unused.remove(r)
elif r.startswith(name + "."):
if match is not None:
match.append(r[len(name+"."):])
if r in self.unused:
self.unused.remove(r)
return match
def iter_unused(self):
"""Iterate over entry entries that were unused.
:return: Iterator over test list entries that were not used.
"""
return iter(self.unused)
def open_file_or_pipe(path, mode):
"""Open a file or pipe.
:param path: Path to open; if it ends with | it is assumed to be a
command to run
:param mode: Mode with which to open it
:return: File-like object
"""
if path.endswith("|"):
return os.popen(path[:-1], mode)
return open(path, mode)
def read_testlist_file(fn, outf=None):
"""Read testlist file.
:param fn: Path to read (assumed to be a command to run if it ends with |)
:param outf: File-like object to pass non-test data through to
(defaults to stdout)
:return: Iterator over test suites (see read_testlist)
"""
if outf is None:
outf = sys.stdout
inf = open_file_or_pipe(fn, 'r')
try:
for testsuite in read_testlist(inf, outf):
yield testsuite
finally:
inf.close()

View File

@ -37,7 +37,6 @@ planpythontestsuite("none", "samba.tests.source")
if have_man_pages_support:
planpythontestsuite("none", "samba.tests.docs")
planpythontestsuite("none", "selftest.tests.test_suite", extra_path=[srcdir()])
try:
import testscenarios
except ImportError:

View File

@ -1,30 +0,0 @@
# __init__.py -- The tests for selftest
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest."""
import unittest
def test_suite():
result = unittest.TestSuite()
names = ['socket_wrapper', 'target', 'testlist', 'run', 'samba']
module_names = ['selftest.tests.test_' + name for name in names]
loader = unittest.TestLoader()
result.addTests(loader.loadTestsFromNames(module_names))
return result

View File

@ -1,190 +0,0 @@
# test_run.py -- Tests for selftest.run
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest.run."""
import datetime
import os
from samba.subunit import (
PROGRESS_PUSH,
PROGRESS_POP,
)
from samba.tests import TestCase
import tempfile
from selftest.run import (
expand_command_list,
expand_environment_strings,
expand_command_run,
exported_envvars_str,
now,
run_testsuite_command,
)
class ExpandEnvironmentStringsTests(TestCase):
def test_no_vars(self):
self.assertEquals("foo bar", expand_environment_strings("foo bar", {}))
def test_simple(self):
self.assertEquals("foo bar",
expand_environment_strings("foo $BLA", {"BLA": "bar"}))
def test_unknown(self):
self.assertEquals("foo $BLA",
expand_environment_strings("foo $BLA", {}))
class ExpandCommandListTests(TestCase):
def test_no_list(self):
self.assertIs(None, expand_command_list("test bla"))
def test_list(self):
self.assertEquals("test --list", expand_command_list("test $LISTOPT"))
class ExpandCommandRunTests(TestCase):
def test_idlist(self):
self.assertEquals(("test foo bar", None),
expand_command_run("test", False, True, subtests=["foo", "bar"]))
def test_idlist_all(self):
self.assertEquals(("test", None),
expand_command_run("test", False, True))
def test_loadlist(self):
(cmd, tmpf) = expand_command_run("test $LOADLIST", True, False,
subtests=["foo", "bar"])
self.addCleanup(os.remove, tmpf)
f = open(tmpf, 'r')
try:
self.assertEquals(f.read(), "foo\nbar\n")
finally:
f.close()
self.assertEquals("test --load-list=%s" % tmpf, cmd)
def test_loadlist_all(self):
self.assertEquals(("test ", None),
expand_command_run("test $LOADLIST", True, False))
class ExportedEnvvarsStrTests(TestCase):
def test_no_vars(self):
self.assertEquals("", exported_envvars_str({}, ["foo", "bar"]))
def test_vars(self):
self.assertEquals("foo=1\n",
exported_envvars_str({"foo": "1"}, ["foo", "bar"]))
def test_vars_unknown(self):
self.assertEquals("foo=1\n",
exported_envvars_str({"foo": "1", "bla": "2"}, ["foo", "bar"]))
class NowTests(TestCase):
def test_basic(self):
self.assertIsInstance(now(), datetime.datetime)
self.assertIsNot(now().tzinfo, None)
class MockSubunitOps(object):
def __init__(self):
self.calls = []
def start_testsuite(self, name):
self.calls.append(("start-testsuite", name))
def progress(self, count, whence):
self.calls.append(("progress", count, whence))
def time(self, t):
self.calls.append(("time", ))
def end_testsuite(self, name, result, details=None):
self.calls.append(("end-testsuite", name, result, details))
class RunTestsuiteCommandTests(TestCase):
def test_success_no_env(self):
outf = tempfile.TemporaryFile()
subunit_ops = MockSubunitOps()
exit_code = run_testsuite_command("thetestsuitename", "echo doing something", subunit_ops, outf=outf)
self.assertEquals([
("start-testsuite", "thetestsuitename"),
("progress", None, PROGRESS_PUSH),
("time", ),
("time", ),
("progress", None, PROGRESS_POP),
("end-testsuite", "thetestsuitename", "success", None),
], subunit_ops.calls)
self.assertEquals(0, exit_code)
outf.seek(0)
self.assertEquals("""\
doing something
command: echo doing something
expanded command: echo doing something
""", outf.read())
def test_failure(self):
outf = tempfile.TemporaryFile()
subunit_ops = MockSubunitOps()
exit_code = run_testsuite_command("thetestsuitename", "exit 3", subunit_ops, outf=outf)
self.assertEquals([
("start-testsuite", "thetestsuitename"),
("progress", None, PROGRESS_PUSH),
("time", ),
("time", ),
("progress", None, PROGRESS_POP),
("end-testsuite", "thetestsuitename", "failure", "Exit code was 3"),
], subunit_ops.calls)
self.assertEquals(3, exit_code)
outf.seek(0)
self.assertEquals("""\
command: exit 3
expanded command: exit 3
""", outf.read())
def test_error(self):
outf = tempfile.TemporaryFile()
subunit_ops = MockSubunitOps()
exit_code = run_testsuite_command("thetestsuitename",
"thisisacommandthatdoesnotexist 2>/dev/null", subunit_ops, outf=outf)
self.assertEquals([
("start-testsuite", "thetestsuitename"),
("progress", None, PROGRESS_PUSH),
("time", ),
("time", ),
("progress", None, PROGRESS_POP),
("end-testsuite", "thetestsuitename", "failure", "Exit code was 127"),
], subunit_ops.calls)
self.assertEquals(127, exit_code)
outf.seek(0)
self.assertEquals("""\
command: thisisacommandthatdoesnotexist 2>/dev/null
expanded command: thisisacommandthatdoesnotexist 2>/dev/null
""", outf.read())

View File

@ -1,116 +0,0 @@
# test_run.py -- Tests for selftest.target.samba
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest.target.samba."""
from cStringIO import StringIO
from samba.tests import TestCase
from selftest.target.samba import (
bindir_path,
get_interface,
mk_realms_stanza,
write_krb5_conf,
)
class BinDirPathTests(TestCase):
def test_mapping(self):
self.assertEquals("exe",
bindir_path("/some/path", "exe"))
self.assertEquals("/bin/exe",
bindir_path("/bin", "/bin/exe"))
def test_no_mapping(self):
self.assertEqual("exe", bindir_path("/some/path", "exe"))
self.assertEqual("/bin/ls",
bindir_path("/bin", "ls"))
class MkRealmsStanzaTests(TestCase):
def test_basic(self):
self.assertEqual(
mk_realms_stanza("rijk", "dnsnaam", "domein", "ipv4_kdc"),
'''\
rijk = {
kdc = ipv4_kdc:88
admin_server = ipv4_kdc:88
default_domain = dnsnaam
}
dnsnaam = {
kdc = ipv4_kdc:88
admin_server = ipv4_kdc:88
default_domain = dnsnaam
}
domein = {
kdc = ipv4_kdc:88
admin_server = ipv4_kdc:88
default_domain = dnsnaam
}
''')
class WriteKrb5ConfTests(TestCase):
def test_simple(self):
f = StringIO()
write_krb5_conf(f, "rijk", "dnsnaam", "domein", "kdc_ipv4")
self.assertEquals('''\
#Generated krb5.conf for rijk
[libdefaults]
\tdefault_realm = rijk
\tdns_lookup_realm = false
\tdns_lookup_kdc = false
\tticket_lifetime = 24h
\tforwardable = yes
\tallow_weak_crypto = yes
[realms]
rijk = {
kdc = kdc_ipv4:88
admin_server = kdc_ipv4:88
default_domain = dnsnaam
}
dnsnaam = {
kdc = kdc_ipv4:88
admin_server = kdc_ipv4:88
default_domain = dnsnaam
}
domein = {
kdc = kdc_ipv4:88
admin_server = kdc_ipv4:88
default_domain = dnsnaam
}
''', f.getvalue())
class GetInterfaceTests(TestCase):
def test_get_interface(self):
self.assertEquals(21, get_interface("localdc"))
self.assertEquals(4, get_interface("localshare4"))
def test_unknown(self):
self.assertRaises(KeyError, get_interface, "unknown")

View File

@ -1,36 +0,0 @@
# test_socket_wraper.py -- The tests for selftest socket wrapper routines
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest/socket_wrapper."""
from samba.tests import TestCase
from selftest import socket_wrapper
import os
class SocketWrapperTests(TestCase):
def test_setup_pcap(self):
socket_wrapper.setup_pcap("somefile")
self.assertEquals("somefile", os.environ["SOCKET_WRAPPER_PCAP_FILE"])
def test_set_default_iface(self):
socket_wrapper.set_default_iface(4)
self.assertEquals("4", os.environ["SOCKET_WRAPPER_DEFAULT_IFACE"])

View File

@ -1,129 +0,0 @@
# test_target.py -- The tests for selftest target code
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest.target."""
from selftest.target import (
Environment,
EnvironmentDown,
EnvironmentManager,
NoneEnvironment,
NoneTarget,
Target,
UnsupportedEnvironment,
)
from samba.tests import TestCase
class DummyEnvironment(Environment):
def __init__(self, name, prefix):
self.name = name
self.prefix = prefix
self.check_ret = True
self.log_ret = ""
def check(self):
return self.check_ret
def get_log(self):
return self.log_ret
class DummyTarget(Target):
def setup_env(self, name, prefix):
return DummyEnvironment(name, prefix)
class NoneEnvironmentTests(TestCase):
def setUp(self):
super(NoneEnvironmentTests, self).setUp()
self.env = NoneEnvironment()
def test_get_vars(self):
self.assertEquals({}, self.env.get_vars())
def test_check(self):
self.assertEquals(True, self.env.check())
def test_get_log(self):
self.assertEquals("", self.env.get_log())
class NoneTargetTests(TestCase):
def setUp(self):
super(NoneTargetTests, self).setUp()
self.target = NoneTarget()
def test_setup_env(self):
self.assertRaises(UnsupportedEnvironment, self.target.setup_env,
"something", "prefx")
class EnvironmentManagerTests(TestCase):
def setUp(self):
super(EnvironmentManagerTests, self).setUp()
self.mgr = EnvironmentManager(DummyTarget())
def test_none(self):
self.assertIs(
NoneEnvironment, type(self.mgr.setup_env("none", "prefix")))
def test_setup(self):
env = self.mgr.setup_env("something", "prefix")
self.assertEquals(env.name, "something")
self.assertEquals(env.prefix, "prefix")
def test_setup_reuses(self):
env1 = self.mgr.setup_env("something", "prefix")
env2 = self.mgr.setup_env("something", "prefix")
self.assertIs(env1, env2)
def test_setup_down(self):
env1 = self.mgr.setup_env("something", "prefix")
env1.check_ret = False
self.assertRaises(EnvironmentDown, self.mgr.setup_env, "something", "")
def test_check(self):
env = self.mgr.setup_env("something", "prefix")
self.assertTrue(env.check())
self.assertTrue(self.mgr.check_env("something"))
env.check_ret = False
self.assertFalse(env.check())
self.assertFalse(self.mgr.check_env("something"))
def test_get_log(self):
env = self.mgr.setup_env("something", "prefix")
self.assertEquals("", env.get_log())
self.assertEquals("", self.mgr.getlog_env("something"))
env.log_ret = 'bla'
self.assertEquals('bla', env.get_log())
self.assertEquals('bla', self.mgr.getlog_env("something"))
def test_get_running_env(self):
env = self.mgr.setup_env("something", "prefix")
self.assertIs(env, self.mgr.get_running_env("something"))
def test_get_running_env_nonexistent(self):
self.assertIs(None, self.mgr.get_running_env("something"))

View File

@ -1,148 +0,0 @@
# test_testlist.py -- The tests for selftest testlist code
# Copyright (C) 2012 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 3
# of the License or (at your option) any later version of
# the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""Tests for selftest.testlist."""
import os
import tempfile
from samba.tests import TestCase
from selftest.testlist import (
RestrictedTestManager,
find_in_list,
open_file_or_pipe,
read_test_regexes,
read_testlist,
read_testlist_file,
)
from cStringIO import StringIO
class FindInListTests(TestCase):
def test_empty(self):
self.assertIs(None, find_in_list([], "foo.test"))
def test_no_reason(self):
self.assertEquals("because",
find_in_list([("foo.*bar", "because")], "foo.bla.bar"))
class ReadTestRegexesTests(TestCase):
def test_comment(self):
f = StringIO("# I am a comment\n # I am also a comment\n")
self.assertEquals([], list(read_test_regexes(f)))
def test_no_reason(self):
f = StringIO(" foo\n")
self.assertEquals([("foo", None)], list(read_test_regexes(f)))
def test_reason(self):
f = StringIO(" foo # because\nbar\n")
self.assertEquals([("foo", "because"), ("bar", None)],
list(read_test_regexes(f)))
class ReadTestlistTests(TestCase):
def test_read_list(self):
inf = StringIO("-- TEST --\nfoo\nbar\nbla\n")
outf = StringIO()
self.assertEquals([('foo', 'bar', 'bla', None)],
list(read_testlist(inf, outf)))
self.assertEquals("", outf.getvalue())
def test_read_list_passes_through(self):
inf = StringIO("MORENOISE\n-- TEST --\nfoo\nbar\nbla\nNOISE\n")
outf = StringIO()
self.assertEquals([('foo', 'bar', 'bla', None)],
list(read_testlist(inf, outf)))
self.assertEquals("MORENOISE\nNOISE\n", outf.getvalue())
class RestrictedTestManagerTests(TestCase):
def test_unused(self):
mgr = RestrictedTestManager(["foo.bar"])
self.assertEquals(["foo.bar"], list(mgr.iter_unused()))
def test_run_testsuite(self):
mgr = RestrictedTestManager(["foo.bar"])
self.assertEquals(None, mgr.should_run_testsuite("foo.bar"))
def test_run_subtest(self):
mgr = RestrictedTestManager(["foo.bar.bla"])
self.assertEquals(["bla"], mgr.should_run_testsuite("foo.bar"))
def test_run_subtest_after_testsuite(self):
mgr = RestrictedTestManager(["foo.bar", "foo.bar.bla"])
self.assertEquals(None, mgr.should_run_testsuite("foo.bar"))
def test_run_multiple_subtests(self):
mgr = RestrictedTestManager(["foo.bar.blie", "foo.bar.bla"])
self.assertEquals(["blie", "bla"], mgr.should_run_testsuite("foo.bar"))
def test_run_nomatch(self):
mgr = RestrictedTestManager(["foo.bar"])
self.assertEquals([], mgr.should_run_testsuite("foo.blie.bla"))
class OpenFileOrPipeTests(TestCase):
def test_regular_file(self):
(fd, p) = tempfile.mkstemp()
self.addCleanup(os.remove, p)
f = os.fdopen(fd, 'w')
try:
f.write('data\nbla\n')
finally:
f.close()
f = open_file_or_pipe(p, 'r')
try:
self.assertEquals("data\nbla\n", f.read())
finally:
f.close()
def test_pipe(self):
f = open_file_or_pipe('echo foo|', 'r')
try:
self.assertEquals("foo\n", f.read())
finally:
f.close()
class ReadTestListFileTests(TestCase):
def test_regular_file(self):
(fd, p) = tempfile.mkstemp()
self.addCleanup(os.remove, p)
f = os.fdopen(fd, 'w')
try:
f.write('noise\n-- TEST --\ndata\nenv\ncmd\n')
finally:
f.close()
outf = StringIO()
self.assertEquals(
[('data', 'env', 'cmd', None)],
list(read_testlist_file(p, outf)))
self.assertEquals("noise\n", outf.getvalue())