1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-05 09:18:06 +03:00
samba-mirror/selftest/subunithelper.py
Joseph Sutton 421dc7fc4d python:subunit: Avoid misleading "Test was never started" error message
subunithelper.py keeps track of tests that have been started, and
displays an error message if a test reports an outcome without having
previously been started. However, it makes the assumption that a test
has finished once it has reported a single outcome. This means that a
misleading error message will be displayed if it receives multiple
outcomes from the same test (which can happen if a test using the Python
unittest framework does not complete successfully, and the cleanup
subsequently fails), and any actual errors from the cleanup remain
undisplayed.

This commit ensures that only a single outcome is reported for each
test, and only after the test has finished. Outcomes are buffered up
until the stopTest() function is called, when a single outcome is
determined and all errors received for that test are output.

FilterOps still needs to output test outcomes immediately rather than
buffering them, otherwise they are never picked up and passed on to the
remote test case by subunithelper.parse_results(). This would result in
an error as the test would be considered to have never finished.

    Example subunitrun output before the change:

time: 2021-04-28 01:28:49.862123Z
test: samba.tests.example.ExampleTests.test
time: 2021-04-28 01:28:49.862215Z
failure: samba.tests.example.ExampleTests.test [
Traceback (most recent call last):
  File "bin/python/samba/tests/example.py", line 28, in test
    self.fail()
AssertionError: None
]
time: 2021-04-28 01:28:49.862407Z
failure: samba.tests.example.ExampleTests.test [
Traceback (most recent call last):
  File "bin/python/samba/tests/example.py", line 31, in tearDown
    self.fail()
AssertionError: None
]
time: 2021-04-28 01:28:49.862467Z
time: 2021-04-28 01:28:49.862510Z

    and after:

time: 2021-04-28 01:29:19.949347Z
test: samba.tests.example.ExampleTests.test
time: 2021-04-28 01:29:19.949440Z
time: 2021-04-28 01:29:19.949590Z
time: 2021-04-28 01:29:19.949640Z
failure: samba.tests.example.ExampleTests.test [
Traceback (most recent call last):
  File "bin/python/samba/tests/example.py", line 28, in test
    self.fail()
AssertionError: None
Traceback (most recent call last):
  File "bin/python/samba/tests/example.py", line 31, in tearDown
    self.fail()
AssertionError: None
]
time: 2021-04-28 01:29:19.949702Z

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2021-06-11 08:38:34 +00:00

732 lines
25 KiB
Python

# Python module for parsing and generating the Subunit protocol
# (Samba-specific)
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__all__ = ['parse_results']
import datetime
import re
import sys
import os
from samba import subunit
from samba.subunit.run import TestProtocolClient
import unittest
try:
from dateutil.parser import isoparse as iso_parse_date
except ImportError:
try:
from iso8601 import parse_date as iso_parse_date;
except ImportError:
print('Install either python-dateutil >= 2.7.1 or python-iso8601')
VALID_RESULTS = set(['success', 'successful', 'failure', 'fail', 'skip',
'knownfail', 'error', 'xfail', 'skip-testsuite',
'testsuite-failure', 'testsuite-xfail',
'testsuite-success', 'testsuite-error',
'uxsuccess', 'testsuite-uxsuccess'])
class TestsuiteEnabledTestResult(unittest.TestResult):
def start_testsuite(self, name):
raise NotImplementedError(self.start_testsuite)
def parse_results(msg_ops, statistics, fh):
exitcode = 0
open_tests = {}
for l in fh:
parts = l.split(None, 1)
if not len(parts) == 2 or not l.startswith(parts[0]):
msg_ops.output_msg(l)
continue
command = parts[0].rstrip(":")
arg = parts[1]
if command in ("test", "testing"):
msg_ops.control_msg(l)
name = arg.rstrip()
test = subunit.RemotedTestCase(name)
if name in open_tests:
msg_ops.addError(open_tests.pop(name), subunit.RemoteError(u"Test already running"))
msg_ops.startTest(test)
open_tests[name] = test
elif command == "time":
msg_ops.control_msg(l)
try:
dt = iso_parse_date(arg.rstrip("\n"))
except TypeError as e:
print("Unable to parse time line: %s" % arg.rstrip("\n"))
else:
msg_ops.time(dt)
elif command in VALID_RESULTS:
msg_ops.control_msg(l)
result = command
grp = re.match("(.*?)( \[)?([ \t]*)( multipart)?\n", arg)
(testname, hasreason) = (grp.group(1), grp.group(2))
if hasreason:
reason = ""
# reason may be specified in next lines
terminated = False
for l in fh:
msg_ops.control_msg(l)
if l == "]\n":
terminated = True
break
else:
reason += l
if isinstance(reason, bytes):
remote_error = subunit.RemoteError(reason.decode("utf-8"))
else:
remote_error = subunit.RemoteError(reason)
if not terminated:
statistics['TESTS_ERROR'] += 1
msg_ops.addError(subunit.RemotedTestCase(testname),
subunit.RemoteError(u"result (%s) reason (%s) interrupted" % (result, reason)))
return 1
else:
reason = None
remote_error = subunit.RemoteError(u"No reason specified")
if result in ("success", "successful"):
try:
test = open_tests.pop(testname)
except KeyError:
statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
statistics['TESTS_EXPECTED_OK'] += 1
msg_ops.addSuccess(test)
elif result in ("xfail", "knownfail"):
try:
test = open_tests.pop(testname)
except KeyError:
statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
statistics['TESTS_EXPECTED_FAIL'] += 1
msg_ops.addExpectedFailure(test, remote_error)
elif result in ("uxsuccess", ):
try:
test = open_tests.pop(testname)
except KeyError:
statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
statistics['TESTS_UNEXPECTED_OK'] += 1
msg_ops.addUnexpectedSuccess(test)
exitcode = 1
elif result in ("failure", "fail"):
try:
test = open_tests.pop(testname)
except KeyError:
statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
statistics['TESTS_UNEXPECTED_FAIL'] += 1
exitcode = 1
msg_ops.addFailure(test, remote_error)
elif result == "skip":
statistics['TESTS_SKIP'] += 1
# Allow tests to be skipped without prior announcement of test
try:
test = open_tests.pop(testname)
except KeyError:
test = subunit.RemotedTestCase(testname)
msg_ops.addSkip(test, reason)
elif result == "error":
statistics['TESTS_ERROR'] += 1
exitcode = 1
try:
test = open_tests.pop(testname)
except KeyError:
test = subunit.RemotedTestCase(testname)
msg_ops.addError(test, remote_error)
elif result == "skip-testsuite":
msg_ops.skip_testsuite(testname)
elif result == "testsuite-success":
msg_ops.end_testsuite(testname, "success", reason)
elif result == "testsuite-failure":
msg_ops.end_testsuite(testname, "failure", reason)
exitcode = 1
elif result == "testsuite-xfail":
msg_ops.end_testsuite(testname, "xfail", reason)
elif result == "testsuite-uxsuccess":
msg_ops.end_testsuite(testname, "uxsuccess", reason)
exitcode = 1
elif result == "testsuite-error":
msg_ops.end_testsuite(testname, "error", reason)
exitcode = 1
else:
raise AssertionError("Recognized but unhandled result %r" %
result)
elif command == "testsuite":
msg_ops.start_testsuite(arg.strip())
elif command == "progress":
arg = arg.strip()
if arg == "pop":
msg_ops.progress(None, subunit.PROGRESS_POP)
elif arg == "push":
msg_ops.progress(None, subunit.PROGRESS_PUSH)
elif arg[0] in '+-':
msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
else:
msg_ops.progress(int(arg), subunit.PROGRESS_SET)
else:
msg_ops.output_msg(l)
while open_tests:
test = subunit.RemotedTestCase(open_tests.popitem()[1])
msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
statistics['TESTS_ERROR'] += 1
exitcode = 1
return exitcode
class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
def progress(self, count, whence):
if whence == subunit.PROGRESS_POP:
self._stream.write("progress: pop\n")
elif whence == subunit.PROGRESS_PUSH:
self._stream.write("progress: push\n")
elif whence == subunit.PROGRESS_SET:
self._stream.write("progress: %d\n" % count)
elif whence == subunit.PROGRESS_CUR:
raise NotImplementedError
# The following are Samba extensions:
def start_testsuite(self, name):
self._stream.write("testsuite: %s\n" % name)
def skip_testsuite(self, name, reason=None):
if reason:
self._stream.write("skip-testsuite: %s [\n%s\n]\n" % (name, reason))
else:
self._stream.write("skip-testsuite: %s\n" % name)
def end_testsuite(self, name, result, reason=None):
if reason:
self._stream.write("testsuite-%s: %s [\n%s\n]\n" % (result, name, reason))
else:
self._stream.write("testsuite-%s: %s\n" % (result, name))
def output_msg(self, msg):
self._stream.write(msg)
def read_test_regexes(*names):
ret = {}
files = []
for name in names:
# if we are given a directory, we read all the files it contains
# (except the ones that end with "~").
if os.path.isdir(name):
files.extend([os.path.join(name, x)
for x in os.listdir(name)
if x[-1] != '~'])
else:
files.append(name)
for filename in files:
with open(filename, 'r') as f:
for l in f:
l = l.strip()
if l == "" or l[0] == "#":
continue
if "#" in l:
(regex, reason) = l.split("#", 1)
ret[regex.strip()] = reason.strip()
else:
ret[l] = None
return ret
def find_in_list(regexes, fullname):
for regex, reason in regexes.items():
if re.match(regex, fullname):
if reason is None:
return ""
return reason
return None
class ImmediateFail(Exception):
"""Raised to abort immediately."""
def __init__(self):
super(ImmediateFail, self).__init__("test failed and fail_immediately set")
class FilterOps(unittest.TestResult):
def control_msg(self, msg):
pass # We regenerate control messages, so ignore this
def time(self, time):
self._ops.time(time)
def progress(self, delta, whence):
self._ops.progress(delta, whence)
def output_msg(self, msg):
if self.output is None:
sys.stdout.write(msg)
else:
self.output += msg
def startTest(self, test):
self.seen_output = True
test = self._add_prefix(test)
if self.strip_ok_output:
self.output = ""
self._ops.startTest(test)
def _add_prefix(self, test):
return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
def addError(self, test, err=None):
test = self._add_prefix(test)
self.error_added += 1
self.total_error += 1
self._ops.addError(test, err)
self._ops.writeOutcome(test)
self.output = None
if self.fail_immediately:
raise ImmediateFail()
def addSkip(self, test, reason=None):
self.seen_output = True
test = self._add_prefix(test)
self._ops.addSkip(test, reason)
self._ops.writeOutcome(test)
self.output = None
def addExpectedFailure(self, test, err=None):
test = self._add_prefix(test)
self._ops.addExpectedFailure(test, err)
self._ops.writeOutcome(test)
self.output = None
def addUnexpectedSuccess(self, test):
test = self._add_prefix(test)
self.uxsuccess_added += 1
self.total_uxsuccess += 1
self._ops.addUnexpectedSuccess(test)
self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
self.output = None
if self.fail_immediately:
raise ImmediateFail()
def addFailure(self, test, err=None):
test = self._add_prefix(test)
xfail_reason = find_in_list(self.expected_failures, test.id())
if xfail_reason is None:
xfail_reason = find_in_list(self.flapping, test.id())
if xfail_reason is not None:
self.xfail_added += 1
self.total_xfail += 1
self._ops.addExpectedFailure(test, err)
self._ops.writeOutcome(test)
else:
self.fail_added += 1
self.total_fail += 1
self._ops.addFailure(test, err)
self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
raise ImmediateFail()
self.output = None
def addSuccess(self, test):
test = self._add_prefix(test)
xfail_reason = find_in_list(self.expected_failures, test.id())
if xfail_reason is not None:
self.uxsuccess_added += 1
self.total_uxsuccess += 1
self._ops.addUnexpectedSuccess(test)
self._ops.writeOutcome(test)
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
raise ImmediateFail()
else:
self._ops.addSuccess(test)
self._ops.writeOutcome(test)
self.output = None
def skip_testsuite(self, name, reason=None):
self._ops.skip_testsuite(name, reason)
def start_testsuite(self, name):
self._ops.start_testsuite(name)
self.error_added = 0
self.fail_added = 0
self.xfail_added = 0
self.uxsuccess_added = 0
def end_testsuite(self, name, result, reason=None):
xfail = False
if self.xfail_added > 0:
xfail = True
if self.fail_added > 0 or self.error_added > 0 or self.uxsuccess_added > 0:
xfail = False
if xfail and result in ("fail", "failure"):
result = "xfail"
if self.uxsuccess_added > 0 and result != "uxsuccess":
result = "uxsuccess"
if reason is None:
reason = "Subunit/Filter Reason"
reason += "\n uxsuccess[%d]" % self.uxsuccess_added
if self.fail_added > 0 and result != "failure":
result = "failure"
if reason is None:
reason = "Subunit/Filter Reason"
reason += "\n failures[%d]" % self.fail_added
if self.error_added > 0 and result != "error":
result = "error"
if reason is None:
reason = "Subunit/Filter Reason"
reason += "\n errors[%d]" % self.error_added
self._ops.end_testsuite(name, result, reason)
if result not in ("success", "xfail"):
if self.output:
self._ops.output_msg(self.output)
if self.fail_immediately:
raise ImmediateFail()
self.output = None
def __init__(self, out, prefix=None, suffix=None, expected_failures=None,
strip_ok_output=False, fail_immediately=False,
flapping=None):
self._ops = out
self.seen_output = False
self.output = None
self.prefix = prefix
self.suffix = suffix
if expected_failures is not None:
self.expected_failures = expected_failures
else:
self.expected_failures = {}
if flapping is not None:
self.flapping = flapping
else:
self.flapping = {}
self.strip_ok_output = strip_ok_output
self.xfail_added = 0
self.fail_added = 0
self.uxsuccess_added = 0
self.total_xfail = 0
self.total_error = 0
self.total_fail = 0
self.total_uxsuccess = 0
self.error_added = 0
self.fail_immediately = fail_immediately
class PerfFilterOps(unittest.TestResult):
def progress(self, delta, whence):
pass
def output_msg(self, msg):
pass
def control_msg(self, msg):
pass
def skip_testsuite(self, name, reason=None):
self._ops.skip_testsuite(name, reason)
def start_testsuite(self, name):
self.suite_has_time = False
def end_testsuite(self, name, result, reason=None):
pass
def _add_prefix(self, test):
return subunit.RemotedTestCase(self.prefix + test.id() + self.suffix)
def time(self, time):
self.latest_time = time
#self._ops.output_msg("found time %s\n" % time)
self.suite_has_time = True
def get_time(self):
if self.suite_has_time:
return self.latest_time
return datetime.datetime.utcnow()
def startTest(self, test):
self.seen_output = True
test = self._add_prefix(test)
self.starts[test.id()] = self.get_time()
def addSuccess(self, test):
test = self._add_prefix(test)
tid = test.id()
if tid not in self.starts:
self._ops.addError(test, "%s succeeded without ever starting!" % tid)
delta = self.get_time() - self.starts[tid]
self._ops.output_msg("elapsed-time: %s: %f\n" % (tid, delta.total_seconds()))
def addFailure(self, test, err=''):
tid = test.id()
delta = self.get_time() - self.starts[tid]
self._ops.output_msg("failure: %s failed after %f seconds (%s)\n" %
(tid, delta.total_seconds(), err))
def addError(self, test, err=''):
tid = test.id()
delta = self.get_time() - self.starts[tid]
self._ops.output_msg("error: %s failed after %f seconds (%s)\n" %
(tid, delta.total_seconds(), err))
def __init__(self, out, prefix='', suffix=''):
self._ops = out
self.prefix = prefix or ''
self.suffix = suffix or ''
self.starts = {}
self.seen_output = False
self.suite_has_time = False
class PlainFormatter(TestsuiteEnabledTestResult):
def __init__(self, verbose, immediate, statistics,
totaltests=None):
super(PlainFormatter, self).__init__()
self.verbose = verbose
self.immediate = immediate
self.statistics = statistics
self.start_time = None
self.test_output = {}
self.suitesfailed = []
self.suites_ok = 0
self.skips = {}
self.index = 0
self.name = None
self._progress_level = 0
self.totalsuites = totaltests
self.last_time = None
@staticmethod
def _format_time(delta):
minutes, seconds = divmod(delta.seconds, 60)
hours, minutes = divmod(minutes, 60)
ret = ""
if hours:
ret += "%dh" % hours
if minutes:
ret += "%dm" % minutes
ret += "%ds" % seconds
return ret
def progress(self, offset, whence):
if whence == subunit.PROGRESS_POP:
self._progress_level -= 1
elif whence == subunit.PROGRESS_PUSH:
self._progress_level += 1
elif whence == subunit.PROGRESS_SET:
if self._progress_level == 0:
self.totalsuites = offset
elif whence == subunit.PROGRESS_CUR:
raise NotImplementedError
def time(self, dt):
if self.start_time is None:
self.start_time = dt
self.last_time = dt
def start_testsuite(self, name):
self.index += 1
self.name = name
if not self.verbose:
self.test_output[name] = ""
total_tests = (self.statistics['TESTS_EXPECTED_OK'] +
self.statistics['TESTS_EXPECTED_FAIL'] +
self.statistics['TESTS_ERROR'] +
self.statistics['TESTS_UNEXPECTED_FAIL'] +
self.statistics['TESTS_UNEXPECTED_OK'])
out = "[%d(%d)" % (self.index, total_tests)
if self.totalsuites is not None:
out += "/%d" % self.totalsuites
if self.start_time is not None:
out += " at " + self._format_time(self.last_time - self.start_time)
if self.suitesfailed:
out += ", %d errors" % (len(self.suitesfailed),)
out += "] %s" % name
if self.immediate:
sys.stdout.write(out + "\n")
else:
sys.stdout.write(out + ": ")
def output_msg(self, output):
if self.verbose:
sys.stdout.write(output)
elif self.name is not None:
self.test_output[self.name] += output
else:
sys.stdout.write(output)
def control_msg(self, output):
pass
def end_testsuite(self, name, result, reason):
out = ""
unexpected = False
if name not in self.test_output:
print("no output for name[%s]" % name)
if result in ("success", "xfail"):
self.suites_ok += 1
else:
self.output_msg("ERROR: Testsuite[%s]\n" % name)
if reason is not None:
self.output_msg("REASON: %s\n" % (reason,))
self.suitesfailed.append(name)
if self.immediate and not self.verbose and name in self.test_output:
out += self.test_output[name]
unexpected = True
if not self.immediate:
if not unexpected:
out += " ok\n"
else:
out += " " + result.upper() + "\n"
sys.stdout.write(out)
def startTest(self, test):
pass
def addSuccess(self, test):
self.end_test(test.id(), "success", False)
def addError(self, test, err=None):
self.end_test(test.id(), "error", True, err)
def addFailure(self, test, err=None):
self.end_test(test.id(), "failure", True, err)
def addSkip(self, test, reason=None):
self.end_test(test.id(), "skip", False, reason)
def addExpectedFailure(self, test, err=None):
self.end_test(test.id(), "xfail", False, err)
def addUnexpectedSuccess(self, test):
self.end_test(test.id(), "uxsuccess", True)
def end_test(self, testname, result, unexpected, err=None):
if not unexpected:
self.test_output[self.name] = ""
if not self.immediate:
sys.stdout.write({
'failure': 'f',
'xfail': 'X',
'skip': 's',
'success': '.'}.get(result, "?(%s)" % result))
return
if self.name not in self.test_output:
self.test_output[self.name] = ""
self.test_output[self.name] += "UNEXPECTED(%s): %s\n" % (result, testname)
if err is not None:
self.test_output[self.name] += "REASON: %s\n" % str(err[1]).strip()
if self.immediate and not self.verbose:
sys.stdout.write(self.test_output[self.name])
self.test_output[self.name] = ""
if not self.immediate:
sys.stdout.write({
'error': 'E',
'failure': 'F',
'uxsuccess': 'U',
'success': 'S'}.get(result, "?"))
def write_summary(self, path):
f = open(path, 'w+')
if self.suitesfailed:
f.write("= Failed tests =\n")
for suite in self.suitesfailed:
f.write("== %s ==\n" % suite)
if suite in self.test_output:
f.write(self.test_output[suite] + "\n\n")
f.write("\n")
if not self.immediate and not self.verbose:
for suite in self.suitesfailed:
print("=" * 78)
print("FAIL: %s" % suite)
if suite in self.test_output:
print(self.test_output[suite])
print("")
f.write("= Skipped tests =\n")
for reason in self.skips.keys():
f.write(reason + "\n")
for name in self.skips[reason]:
f.write("\t%s\n" % name)
f.write("\n")
f.close()
if (not self.suitesfailed and
not self.statistics['TESTS_UNEXPECTED_FAIL'] and
not self.statistics['TESTS_UNEXPECTED_OK'] and
not self.statistics['TESTS_ERROR']):
ok = (self.statistics['TESTS_EXPECTED_OK'] +
self.statistics['TESTS_EXPECTED_FAIL'])
print("\nALL OK (%d tests in %d testsuites)" % (ok, self.suites_ok))
else:
print("\nFAILED (%d failures, %d errors and %d unexpected successes in %d testsuites)" % (
self.statistics['TESTS_UNEXPECTED_FAIL'],
self.statistics['TESTS_ERROR'],
self.statistics['TESTS_UNEXPECTED_OK'],
len(self.suitesfailed)))
def skip_testsuite(self, name, reason="UNKNOWN"):
self.skips.setdefault(reason, []).append(name)
if self.totalsuites:
self.totalsuites -= 1