From daf63fef10835a5c5a7b51a78fbd45911a48e58e Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 11 Dec 2014 01:07:36 +0000 Subject: [PATCH] Add simple subunit runner outputting subunit v1. This is a short module (< 1k lines) that removes the need to depend on subunit, testtools, extras and mimeparse. It is based on an extract from testtools and subunit. Change-Id: I0a4f3060b25f7bde602a07ed6bef71c8196fca64 Signed-Off-By: Jelmer Vernooij Reviewed-by: Andrew Bartlett --- python/samba/subunit/__init__.py | 0 python/samba/subunit/run.py | 750 +++++++++++++++++++++++++++++++ 2 files changed, 750 insertions(+) create mode 100644 python/samba/subunit/__init__.py create mode 100755 python/samba/subunit/run.py diff --git a/python/samba/subunit/__init__.py b/python/samba/subunit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/samba/subunit/run.py b/python/samba/subunit/run.py new file mode 100755 index 00000000000..33efd4f6f76 --- /dev/null +++ b/python/samba/subunit/run.py @@ -0,0 +1,750 @@ +#!/usr/bin/python +# +# Simple subunit testrunner for python +# Copyright (C) Jelmer Vernooij 2014 + +# Cobbled together from testtools and subunit: +# Copyright (C) 2005-2011 Robert Collins +# Copyright (c) 2008-2011 testtools developers. +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +"""Run a unittest testcase reporting results as Subunit. + + $ python -m samba.subunit.run mylib.tests.test_suite +""" + +import datetime +import os +import sys +import traceback +import unittest + + +# From http://docs.python.org/library/datetime.html +_ZERO = datetime.timedelta(0) + +# A UTC class. + +class UTC(datetime.tzinfo): + """UTC""" + + def utcoffset(self, dt): + return _ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return _ZERO + +utc = UTC() + +# Whether or not to hide layers of the stack trace that are +# unittest/testtools internal code. Defaults to True since the +# system-under-test is rarely unittest or testtools. +HIDE_INTERNAL_STACK = True + + +def write_traceback(stream, err, test): + """Converts a sys.exc_info()-style tuple of values into a string. + + Copied from Python 2.7's unittest.TestResult._exc_info_to_string. + """ + def _is_relevant_tb_level(tb): + return '__unittest' in tb.tb_frame.f_globals + + def _count_relevant_tb_levels(tb): + length = 0 + while tb and not _is_relevant_tb_level(tb): + length += 1 + tb = tb.tb_next + return length + + exctype, value, tb = err + # Skip test runner traceback levels + if HIDE_INTERNAL_STACK: + while tb and _is_relevant_tb_level(tb): + tb = tb.tb_next + + format_exception = traceback.format_exception + + if (HIDE_INTERNAL_STACK and test.failureException + and isinstance(value, test.failureException)): + # Skip assert*() traceback levels + length = _count_relevant_tb_levels(tb) + msgLines = format_exception(exctype, value, tb, length) + else: + msgLines = format_exception(exctype, value, tb) + stream.writelines(msgLines) + + +class TestProtocolClient(unittest.TestResult): + """A TestResult which generates a subunit stream for a test run. + + # Get a TestSuite or TestCase to run + suite = make_suite() + # Create a stream (any object with a 'write' method). This should accept + # bytes not strings: subunit is a byte orientated protocol. + stream = file('tests.log', 'wb') + # Create a subunit result object which will output to the stream + result = subunit.TestProtocolClient(stream) + # Optionally, to get timing data for performance analysis, wrap the + # serialiser with a timing decorator + result = subunit.test_results.AutoTimingTestResultDecorator(result) + # Run the test suite reporting to the subunit result object + suite.run(result) + # Close the stream. + stream.close() + """ + + def __init__(self, stream): + unittest.TestResult.__init__(self) + self._stream = stream + + def addError(self, test, error=None): + """Report an error in test test. + + :param error: Standard unittest positional argument form - an + exc_info tuple. + """ + self._addOutcome("error", test, error=error) + if self.failfast: + self.stop() + + def addExpectedFailure(self, test, error=None): + """Report an expected failure in test test. + + :param error: Standard unittest positional argument form - an + exc_info tuple. + """ + self._addOutcome("xfail", test, error=error) + + def addFailure(self, test, error=None): + """Report a failure in test test. + + :param error: Standard unittest positional argument form - an + exc_info tuple. + """ + self._addOutcome("failure", test, error=error) + if self.failfast: + self.stop() + + def _addOutcome(self, outcome, test, error=None, error_permitted=True): + """Report a failure in test test. + + :param outcome: A string describing the outcome - used as the + event name in the subunit stream. + :param error: Standard unittest positional argument form - an + exc_info tuple. + :param error_permitted: If True then error must be supplied. + If False then error must not be supplied. + """ + self._stream.write(("%s: " % outcome) + self._test_id(test)) + if error_permitted: + if error is None: + raise ValueError + else: + if error is not None: + raise ValueError + if error is not None: + self._stream.write(" [\n") + write_traceback(self._stream, error, test) + else: + self._stream.write("\n") + if error is not None: + self._stream.write("]\n") + + def addSkip(self, test, reason=None): + """Report a skipped test.""" + if reason is None: + self._addOutcome("skip", test, error=None) + else: + self._stream.write("skip: %s [\n" % test.id()) + self._stream.write("%s\n" % reason) + self._stream.write("]\n") + + def addSuccess(self, test): + """Report a success in a test.""" + self._addOutcome("successful", test, error_permitted=False) + + def addUnexpectedSuccess(self, test): + """Report an unexpected success in test test. + """ + self._addOutcome("uxsuccess", test, error_permitted=False) + if self.failfast: + self.stop() + + def _test_id(self, test): + result = test.id() + if type(result) is not bytes: + result = result.encode('utf8') + return result + + def startTest(self, test): + """Mark a test as starting its test run.""" + super(TestProtocolClient, self).startTest(test) + self._stream.write("test: " + self._test_id(test) + "\n") + self._stream.flush() + + def stopTest(self, test): + super(TestProtocolClient, self).stopTest(test) + self._stream.flush() + + def time(self, a_datetime): + """Inform the client of the time. + + ":param datetime: A datetime.datetime object. + """ + time = a_datetime.astimezone(UTC()) + self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( + time.year, time.month, time.day, time.hour, time.minute, + time.second, time.microsecond)) + + +def _flatten_tests(suite_or_case, unpack_outer=False): + try: + tests = iter(suite_or_case) + except TypeError: + # Not iterable, assume it's a test case. + return [(suite_or_case.id(), suite_or_case)] + if (type(suite_or_case) in (unittest.TestSuite,) or + unpack_outer): + # Plain old test suite (or any others we may add). + result = [] + for test in tests: + # Recurse to flatten. + result.extend(_flatten_tests(test)) + return result + else: + # Find any old actual test and grab its id. + suite_id = None + tests = iterate_tests(suite_or_case) + for test in tests: + suite_id = test.id() + break + # If it has a sort_tests method, call that. + if getattr(suite_or_case, 'sort_tests', None) is not None: + suite_or_case.sort_tests() + return [(suite_id, suite_or_case)] + + +def sorted_tests(suite_or_case, unpack_outer=False): + """Sort suite_or_case while preserving non-vanilla TestSuites.""" + tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer) + tests.sort() + return unittest.TestSuite([test for (sort_key, test) in tests]) + + +def iterate_tests(test_suite_or_case): + """Iterate through all of the test cases in 'test_suite_or_case'.""" + try: + suite = iter(test_suite_or_case) + except TypeError: + yield test_suite_or_case + else: + for test in suite: + for subtest in iterate_tests(test): + yield subtest + + +defaultTestLoader = unittest.defaultTestLoader +defaultTestLoaderCls = unittest.TestLoader + +if getattr(defaultTestLoader, 'discover', None) is None: + try: + import discover + defaultTestLoader = discover.DiscoveringTestLoader() + defaultTestLoaderCls = discover.DiscoveringTestLoader + have_discover = True + except ImportError: + have_discover = False +else: + have_discover = True + + +#################### +# Taken from python 2.7 and slightly modified for compatibility with +# older versions. Delete when 2.7 is the oldest supported version. +# Modifications: +# - Use have_discover to raise an error if the user tries to use +# discovery on an old version and doesn't have discover installed. +# - If --catch is given check that installHandler is available, as +# it won't be on old python versions. +# - print calls have been been made single-source python3 compatibile. +# - exception handling likewise. +# - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE +# removed. +# - A tweak has been added to detect 'python -m *.run' and use a +# better progName in that case. +# - self.module is more comprehensively set to None when being invoked from +# the commandline - __name__ is used as a sentinel value. +# - --list has been added which can list tests (should be upstreamed). +# - --load-list has been added which can reduce the tests used (should be +# upstreamed). +# - The limitation of using getopt is declared to the user. +# - http://bugs.python.org/issue16709 is worked around, by sorting tests when +# discover is used. + +FAILFAST = " -f, --failfast Stop on first failure\n" +CATCHBREAK = " -c, --catch Catch control-C and display results\n" +BUFFEROUTPUT = " -b, --buffer Buffer stdout and stderr during test runs\n" + +USAGE_AS_MAIN = """\ +Usage: %(progName)s [options] [tests] + +Options: + -h, --help Show this message + -v, --verbose Verbose output + -q, --quiet Minimal output + -l, --list List tests rather than executing them. + --load-list Specifies a file containing test ids, only tests matching + those ids are executed. +%(failfast)s%(catchbreak)s%(buffer)s +Examples: + %(progName)s test_module - run tests from test_module + %(progName)s module.TestClass - run tests from module.TestClass + %(progName)s module.Class.test_method - run specified test method + +All options must come before [tests]. [tests] can be a list of any number of +test modules, classes and test methods. + +Alternative Usage: %(progName)s discover [options] + +Options: + -v, --verbose Verbose output +%(failfast)s%(catchbreak)s%(buffer)s -s directory Directory to start discovery ('.' default) + -p pattern Pattern to match test files ('test*.py' default) + -t directory Top level directory of project (default to + start directory) + -l, --list List tests rather than executing them. + --load-list Specifies a file containing test ids, only tests matching + those ids are executed. + +For test discovery all test modules must be importable from the top +level directory of the project. +""" + + +# NOT a TestResult, because we are implementing the interface, not inheriting +# it. +class TestResultDecorator(object): + """General pass-through decorator. + + This provides a base that other TestResults can inherit from to + gain basic forwarding functionality. It also takes care of + handling the case where the target doesn't support newer methods + or features by degrading them. + """ + + def __init__(self, decorated): + """Create a TestResultDecorator forwarding to decorated.""" + # Make every decorator degrade gracefully. + self.decorated = decorated + + def startTest(self, test): + return self.decorated.startTest(test) + + def startTestRun(self): + return self.decorated.startTestRun() + + def stopTest(self, test): + return self.decorated.stopTest(test) + + def stopTestRun(self): + return self.decorated.stopTestRun() + + def addError(self, test, err=None): + return self.decorated.addError(test, err) + + def addFailure(self, test, err=None): + return self.decorated.addFailure(test, err) + + def addSuccess(self, test): + return self.decorated.addSuccess(test) + + def addSkip(self, test, reason=None): + return self.decorated.addSkip(test, reason) + + def addExpectedFailure(self, test, err=None): + return self.decorated.addExpectedFailure(test, err) + + def addUnexpectedSuccess(self, test): + return self.decorated.addUnexpectedSuccess(test) + + def _get_failfast(self): + return getattr(self.decorated, 'failfast', False) + + def _set_failfast(self, value): + self.decorated.failfast = value + failfast = property(_get_failfast, _set_failfast) + + def wasSuccessful(self): + return self.decorated.wasSuccessful() + + @property + def shouldStop(self): + return self.decorated.shouldStop + + def stop(self): + return self.decorated.stop() + + @property + def testsRun(self): + return self.decorated.testsRun + + def time(self, a_datetime): + return self.decorated.time(a_datetime) + + +class HookedTestResultDecorator(TestResultDecorator): + """A TestResult which calls a hook on every event.""" + + def __init__(self, decorated): + self.super = super(HookedTestResultDecorator, self) + self.super.__init__(decorated) + + def startTest(self, test): + self._before_event() + return self.super.startTest(test) + + def startTestRun(self): + self._before_event() + return self.super.startTestRun() + + def stopTest(self, test): + self._before_event() + return self.super.stopTest(test) + + def stopTestRun(self): + self._before_event() + return self.super.stopTestRun() + + def addError(self, test, err=None): + self._before_event() + return self.super.addError(test, err) + + def addFailure(self, test, err=None): + self._before_event() + return self.super.addFailure(test, err) + + def addSuccess(self, test): + self._before_event() + return self.super.addSuccess(test) + + def addSkip(self, test, reason=None): + self._before_event() + return self.super.addSkip(test, reason) + + def addExpectedFailure(self, test, err=None): + self._before_event() + return self.super.addExpectedFailure(test, err) + + def addUnexpectedSuccess(self, test): + self._before_event() + return self.super.addUnexpectedSuccess(test) + + def wasSuccessful(self): + self._before_event() + return self.super.wasSuccessful() + + @property + def shouldStop(self): + self._before_event() + return self.super.shouldStop + + def stop(self): + self._before_event() + return self.super.stop() + + def time(self, a_datetime): + self._before_event() + return self.super.time(a_datetime) + + +class AutoTimingTestResultDecorator(HookedTestResultDecorator): + """Decorate a TestResult to add time events to a test run. + + By default this will cause a time event before every test event, + but if explicit time data is being provided by the test run, then + this decorator will turn itself off to prevent causing confusion. + """ + + def __init__(self, decorated): + self._time = None + super(AutoTimingTestResultDecorator, self).__init__(decorated) + + def _before_event(self): + time = self._time + if time is not None: + return + time = datetime.datetime.utcnow().replace(tzinfo=UTC()) + self.decorated.time(time) + + @property + def shouldStop(self): + return self.decorated.shouldStop + + def time(self, a_datetime): + """Provide a timestamp for the current test activity. + + :param a_datetime: If None, automatically add timestamps before every + event (this is the default behaviour if time() is not called at + all). If not None, pass the provided time onto the decorated + result object and disable automatic timestamps. + """ + self._time = a_datetime + return self.decorated.time(a_datetime) + + +class SubunitTestRunner(object): + + def __init__(self, verbosity=None, failfast=None, buffer=None, stream=None): + """Create a SubunitTestRunner. + + :param verbosity: Ignored. + :param failfast: Stop running tests at the first failure. + :param buffer: Ignored. + """ + self.failfast = failfast + self.stream = stream or sys.stdout + + def run(self, test): + "Run the given test case or test suite." + result = TestProtocolClient(self.stream) + result = AutoTimingTestResultDecorator(result) + if self.failfast is not None: + result.failfast = self.failfast + test(result) + return result + + +class TestProgram(object): + """A command-line program that runs a set of tests; this is primarily + for making test modules conveniently executable. + """ + USAGE = USAGE_AS_MAIN + + # defaults for testing + failfast = catchbreak = buffer = progName = None + + def __init__(self, module=__name__, defaultTest=None, argv=None, + testRunner=None, testLoader=defaultTestLoader, + exit=True, verbosity=1, failfast=None, catchbreak=None, + buffer=None, stdout=None): + if module == __name__: + self.module = None + elif isinstance(module, str): + self.module = __import__(module) + for part in module.split('.')[1:]: + self.module = getattr(self.module, part) + else: + self.module = module + if argv is None: + argv = sys.argv + if stdout is None: + stdout = sys.stdout + + self.exit = exit + self.failfast = failfast + self.catchbreak = catchbreak + self.verbosity = verbosity + self.buffer = buffer + self.defaultTest = defaultTest + self.listtests = False + self.load_list = None + self.testRunner = testRunner + self.testLoader = testLoader + progName = argv[0] + if progName.endswith('%srun.py' % os.path.sep): + elements = progName.split(os.path.sep) + progName = '%s.run' % elements[-2] + else: + progName = os.path.basename(argv[0]) + self.progName = progName + self.parseArgs(argv) + if self.load_list: + # TODO: preserve existing suites (like testresources does in + # OptimisingTestSuite.add, but with a standard protocol). + # This is needed because the load_tests hook allows arbitrary + # suites, even if that is rarely used. + source = open(self.load_list, 'rb') + try: + lines = source.readlines() + finally: + source.close() + test_ids = set(line.strip().decode('utf-8') for line in lines) + filtered = unittest.TestSuite() + for test in iterate_tests(self.test): + if test.id() in test_ids: + filtered.addTest(test) + self.test = filtered + if not self.listtests: + self.runTests() + else: + for test in iterate_tests(self.test): + stdout.write('%s\n' % test.id()) + + def parseArgs(self, argv): + if len(argv) > 1 and argv[1].lower() == 'discover': + self._do_discovery(argv[2:]) + return + + import getopt + long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch', 'buffer', + 'list', 'load-list='] + try: + options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts) + for opt, value in options: + if opt in ('-h','-H','--help'): + self.usageExit() + if opt in ('-q','--quiet'): + self.verbosity = 0 + if opt in ('-v','--verbose'): + self.verbosity = 2 + if opt in ('-f','--failfast'): + if self.failfast is None: + self.failfast = True + # Should this raise an exception if -f is not valid? + if opt in ('-c','--catch'): + if self.catchbreak is None: + self.catchbreak = True + # Should this raise an exception if -c is not valid? + if opt in ('-b','--buffer'): + if self.buffer is None: + self.buffer = True + # Should this raise an exception if -b is not valid? + if opt in ('-l', '--list'): + self.listtests = True + if opt == '--load-list': + self.load_list = value + if len(args) == 0 and self.defaultTest is None: + # createTests will load tests from self.module + self.testNames = None + elif len(args) > 0: + self.testNames = args + else: + self.testNames = (self.defaultTest,) + self.createTests() + except getopt.error: + self.usageExit(sys.exc_info()[1]) + + def createTests(self): + if self.testNames is None: + self.test = self.testLoader.loadTestsFromModule(self.module) + else: + self.test = self.testLoader.loadTestsFromNames(self.testNames, + self.module) + + def _do_discovery(self, argv, Loader=defaultTestLoaderCls): + # handle command line args for test discovery + if not have_discover: + raise AssertionError("Unable to use discovery, must use python 2.7 " + "or greater, or install the discover package.") + self.progName = '%s discover' % self.progName + import optparse + parser = optparse.OptionParser() + parser.prog = self.progName + parser.add_option('-v', '--verbose', dest='verbose', default=False, + help='Verbose output', action='store_true') + if self.failfast != False: + parser.add_option('-f', '--failfast', dest='failfast', default=False, + help='Stop on first fail or error', + action='store_true') + if self.catchbreak != False: + parser.add_option('-c', '--catch', dest='catchbreak', default=False, + help='Catch ctrl-C and display results so far', + action='store_true') + if self.buffer != False: + parser.add_option('-b', '--buffer', dest='buffer', default=False, + help='Buffer stdout and stderr during tests', + action='store_true') + parser.add_option('-s', '--start-directory', dest='start', default='.', + help="Directory to start discovery ('.' default)") + parser.add_option('-p', '--pattern', dest='pattern', default='test*.py', + help="Pattern to match tests ('test*.py' default)") + parser.add_option('-t', '--top-level-directory', dest='top', default=None, + help='Top level directory of project (defaults to start directory)') + parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true", + help='List tests rather than running them.') + parser.add_option('--load-list', dest='load_list', default=None, + help='Specify a filename containing the test ids to use.') + + options, args = parser.parse_args(argv) + if len(args) > 3: + self.usageExit() + + for name, value in zip(('start', 'pattern', 'top'), args): + setattr(options, name, value) + + # only set options from the parsing here + # if they weren't set explicitly in the constructor + if self.failfast is None: + self.failfast = options.failfast + if self.catchbreak is None: + self.catchbreak = options.catchbreak + if self.buffer is None: + self.buffer = options.buffer + self.listtests = options.listtests + self.load_list = options.load_list + + if options.verbose: + self.verbosity = 2 + + start_dir = options.start + pattern = options.pattern + top_level_dir = options.top + + loader = Loader() + # See http://bugs.python.org/issue16709 + # While sorting here is intrusive, its better than being random. + # Rules for the sort: + # - standard suites are flattened, and the resulting tests sorted by + # id. + # - non-standard suites are preserved as-is, and sorted into position + # by the first test found by iterating the suite. + # We do this by a DSU process: flatten and grab a key, sort, strip the + # keys. + loaded = loader.discover(start_dir, pattern, top_level_dir) + self.test = sorted_tests(loaded) + + def runTests(self): + if (self.catchbreak + and getattr(unittest, 'installHandler', None) is not None): + unittest.installHandler() + self.result = self.testRunner.run(self.test) + if self.exit: + sys.exit(not self.result.wasSuccessful()) + + def usageExit(self, msg=None): + if msg: + print (msg) + usage = {'progName': self.progName, 'catchbreak': '', 'failfast': '', + 'buffer': ''} + if self.failfast != False: + usage['failfast'] = FAILFAST + if self.catchbreak != False: + usage['catchbreak'] = CATCHBREAK + if self.buffer != False: + usage['buffer'] = BUFFEROUTPUT + usage_text = self.USAGE % usage + usage_lines = usage_text.split('\n') + usage_lines.insert(2, "Run a test suite with a subunit reporter.") + usage_lines.insert(3, "") + print('\n'.join(usage_lines)) + sys.exit(2) + + +if __name__ == '__main__': + TestProgram(module=None, argv=sys.argv, testRunner=SubunitTestRunner(), + stdout=sys.stdout)