mirror of https://github.com/samba-team/samba.git synced 2025-01-26 10:04:02 +03:00

subunit: Update to latest upstream version.

Autobuild-User(master): Jelmer Vernooij <jelmer@samba.org>
Autobuild-Date(master): Wed Nov 14 12:11:58 CET 2012 on sn-devel-104
Jelmer Vernooij 2012-11-14 09:47:16 +01:00
parent 7b654a8c18
commit a53caea7a2
21 changed files with 1167 additions and 342 deletions

View File

@@ -28,6 +28,7 @@ EXTRA_DIST = \
python/subunit/tests/test_details.py \
python/subunit/tests/test_progress_model.py \
python/subunit/tests/test_subunit_filter.py \
python/subunit/tests/test_run.py \
python/subunit/tests/test_subunit_stats.py \
python/subunit/tests/test_subunit_tags.py \
python/subunit/tests/test_tap2subunit.py \
@@ -67,6 +68,7 @@ pkgpython_PYTHON = \
python/subunit/__init__.py \
python/subunit/chunked.py \
python/subunit/details.py \
python/subunit/filters.py \
python/subunit/iso8601.py \
python/subunit/progress_model.py \
python/subunit/run.py \

View File

@@ -5,12 +5,30 @@ subunit release notes
NEXT (In development)
---------------------
BUG FIXES
~~~~~~~~~
* ``python/subunit/tests/test_run.py`` and ``python/subunit/filters.py`` were
not included in the 0.0.8 tarball. (Robert Collins)
0.0.8
-----
IMPROVEMENTS
~~~~~~~~~~~~
* Perl module now correctly outputs "failure" instead of "fail". (Stewart Smith)
* Shell functions now output timestamps. (Stewart Smith)
* Shell functions now output timestamps. (Stewart Smith, Robert Collins)
* New ``subunit2csv`` script that converts subunit output to CSV format.
(Jonathan Lange)
* ``TagCollapsingDecorator`` now correctly distinguishes between local and
global tags. (Jonathan Lange)
* ``TestResultFilter`` always forwards ``time:`` events.
(Benji York, Brad Crittenden)
BUG FIXES
~~~~~~~~~
@@ -22,6 +40,23 @@ BUG FIXES
'--no-xfail', '--no-passthrough', '--no-success', and gives you just the
failure stream. (John Arbash Meinel)
* Python2.6 support was broken by the fixup feature.
(Arfrever Frehtes Taifersar Arahesis, #987490)
* Python3 support regressed in trunk.
(Arfrever Frehtes Taifersar Arahesis, #987514)
* Python3 support was insufficiently robust in detecting unicode streams.
(Robert Collins, Arfrever Frehtes Taifersar Arahesis)
* Tag support has been implemented for TestProtocolClient.
(Robert Collins, #518016)
* Tags can now be filtered. (Jonathan Lange, #664171)
* Test suite works with latest testtools (but not older ones - formatting
changes only). (Robert Collins)
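The tag-related entries above map onto the new ``make_tag_filter`` helper added to ``subunit.test_results`` later in this commit. A minimal sketch of the feature, assuming a subunit 0.0.8 install (the stream content is invented for illustration):

    import unittest
    from io import BytesIO
    from subunit import ProtocolTestCase
    from subunit.test_results import TestResultFilter, make_tag_filter

    # Keep tests tagged 'global', drop tests tagged 'local'.
    predicate = make_tag_filter(['global'], ['local'])
    result = unittest.TestResult()
    filtered = TestResultFilter(result, filter_success=False,
                                filter_predicate=predicate)
    # A hand-written subunit v1 stream; 'tags:' inside a test is test-local.
    stream = BytesIO(b"tags: global\n"
                     b"test: kept\n"
                     b"success: kept\n"
                     b"test: dropped\n"
                     b"tags: local\n"
                     b"success: dropped\n")
    ProtocolTestCase(stream).run(filtered)
    print(result.testsRun)  # 1 -- only 'kept' survives the filter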
0.0.7
-----

View File

@@ -1,6 +1,6 @@
m4_define([SUBUNIT_MAJOR_VERSION], [0])
m4_define([SUBUNIT_MINOR_VERSION], [0])
m4_define([SUBUNIT_MICRO_VERSION], [7])
m4_define([SUBUNIT_MICRO_VERSION], [8])
m4_define([SUBUNIT_VERSION],
m4_defn([SUBUNIT_MAJOR_VERSION]).m4_defn([SUBUNIT_MINOR_VERSION]).m4_defn([SUBUNIT_MICRO_VERSION]))
AC_PREREQ([2.59])

View File

@@ -36,41 +36,59 @@ from subunit import (
TestProtocolClient,
read_test_list,
)
from subunit.test_results import TestResultFilter
from subunit.filters import filter_by_result
from subunit.test_results import (
and_predicates,
make_tag_filter,
TestResultFilter,
)
def make_options(description):
parser = OptionParser(description=__doc__)
parser.add_option("--error", action="store_false",
help="include errors", default=False, dest="error")
parser.add_option("-e", "--no-error", action="store_true",
help="exclude errors", dest="error")
parser.add_option("--failure", action="store_false",
help="include failures", default=False, dest="failure")
parser.add_option("-f", "--no-failure", action="store_true",
help="exclude failures", dest="failure")
parser.add_option("--passthrough", action="store_false",
help="Show all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("-s", "--success", action="store_false",
help="include successes", dest="success")
parser.add_option("--no-success", action="store_true",
help="exclude successes", default=True, dest="success")
parser.add_option("--no-skip", action="store_true",
help="exclude skips", dest="skip")
parser.add_option("--xfail", action="store_false",
help="include expected falures", default=True, dest="xfail")
parser.add_option("--no-xfail", action="store_true",
help="exclude expected falures", default=True, dest="xfail")
parser.add_option(
"--with-tag", type=str,
help="include tests with these tags", action="append", dest="with_tags")
parser.add_option(
"--without-tag", type=str,
help="exclude tests with these tags", action="append", dest="without_tags")
parser.add_option("-m", "--with", type=str,
help="regexp to include (case-sensitive by default)",
action="append", dest="with_regexps")
parser.add_option("--fixup-expected-failures", type=str,
help="File with list of test ids that are expected to fail; on failure "
"their result will be changed to xfail; on success they will be "
"changed to error.", dest="fixup_expected_failures", action="append")
parser.add_option("--without", type=str,
help="regexp to exclude (case-sensitive by default)",
action="append", dest="without_regexps")
parser.add_option("-F", "--only-genuine-failures", action="callback",
callback=only_genuine_failures_callback,
help="Only pass through failures and exceptions.")
return parser
parser = OptionParser(description=__doc__)
parser.add_option("--error", action="store_false",
help="include errors", default=False, dest="error")
parser.add_option("-e", "--no-error", action="store_true",
help="exclude errors", dest="error")
parser.add_option("--failure", action="store_false",
help="include failures", default=False, dest="failure")
parser.add_option("-f", "--no-failure", action="store_true",
help="exclude failures", dest="failure")
parser.add_option("--passthrough", action="store_false",
help="Show all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("-s", "--success", action="store_false",
help="include successes", dest="success")
parser.add_option("--no-success", action="store_true",
help="exclude successes", default=True, dest="success")
parser.add_option("--no-skip", action="store_true",
help="exclude skips", dest="skip")
parser.add_option("--xfail", action="store_false",
help="include expected falures", default=True, dest="xfail")
parser.add_option("--no-xfail", action="store_true",
help="exclude expected falures", default=True, dest="xfail")
parser.add_option("-m", "--with", type=str,
help="regexp to include (case-sensitive by default)",
action="append", dest="with_regexps")
parser.add_option("--fixup-expected-failures", type=str,
help="File with list of test ids that are expected to fail; on failure "
"their result will be changed to xfail; on success they will be "
"changed to error.", dest="fixup_expected_failures", action="append")
parser.add_option("--without", type=str,
help="regexp to exclude (case-sensitive by default)",
action="append", dest="without_regexps")
def only_genuine_failures_callback(option, opt, value, parser):
parser.rargs.insert(0, '--no-passthrough')
@@ -78,11 +96,6 @@ def only_genuine_failures_callback(option, opt, value, parser):
parser.rargs.insert(0, '--no-skip')
parser.rargs.insert(0, '--no-success')
parser.add_option("-F", "--only-genuine-failures", action="callback",
callback=only_genuine_failures_callback,
help="Only pass through failures and exceptions.")
(options, args) = parser.parse_args()
def _compile_re_from_list(l):
return re.compile("|".join(l), re.MULTILINE)
@@ -97,7 +110,7 @@ def _make_regexp_filter(with_regexps, without_regexps):
with_re = with_regexps and _compile_re_from_list(with_regexps)
without_re = without_regexps and _compile_re_from_list(without_regexps)
def check_regexps(test, outcome, err, details):
def check_regexps(test, outcome, err, details, tags):
"""Check if this test and error match the regexp filters."""
test_str = str(test) + outcome + str(err) + str(details)
if with_re and not with_re.search(test_str):
@@ -108,21 +121,38 @@ def _make_regexp_filter(with_regexps, without_regexps):
return check_regexps
regexp_filter = _make_regexp_filter(options.with_regexps,
options.without_regexps)
fixup_expected_failures = set()
for path in options.fixup_expected_failures or ():
fixup_expected_failures.update(read_test_list(path))
result = TestProtocolClient(sys.stdout)
result = TestResultFilter(result, filter_error=options.error,
filter_failure=options.failure, filter_success=options.success,
filter_skip=options.skip, filter_xfail=options.xfail,
filter_predicate=regexp_filter,
fixup_expected_failures=fixup_expected_failures)
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:
passthrough_stream = None
test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream)
test.run(result)
sys.exit(0)
def _make_result(output, options, predicate):
"""Make the result that we'll send the test outcomes to."""
fixup_expected_failures = set()
for path in options.fixup_expected_failures or ():
fixup_expected_failures.update(read_test_list(path))
return TestResultFilter(
TestProtocolClient(output),
filter_error=options.error,
filter_failure=options.failure,
filter_success=options.success,
filter_skip=options.skip,
filter_xfail=options.xfail,
filter_predicate=predicate,
fixup_expected_failures=fixup_expected_failures)
def main():
parser = make_options(__doc__)
(options, args) = parser.parse_args()
regexp_filter = _make_regexp_filter(
options.with_regexps, options.without_regexps)
tag_filter = make_tag_filter(options.with_tags, options.without_tags)
filter_predicate = and_predicates([regexp_filter, tag_filter])
filter_by_result(
lambda output_to: _make_result(sys.stdout, options, filter_predicate),
output_path=None,
passthrough=(not options.no_passthrough),
forward=False)
sys.exit(0)
if __name__ == '__main__':
main()
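main() composes the regexp filter and the new tag filter with ``and_predicates``; 0.0.7-era predicates that lack the ``tags`` argument keep working through the compat shim in ``TestResultFilter``. A rough sketch of the composition on its own (``no_slow`` is a hypothetical extra predicate):

    from subunit.test_results import and_predicates, make_tag_filter

    tag_filter = make_tag_filter(with_tags=['smoke'], without_tags=['flaky'])

    def no_slow(test, outcome, err, details, tags):
        # Hypothetical predicate: drop anything tagged 'slow'.
        return 'slow' not in tags

    combined = and_predicates([tag_filter, no_slow])
    # True only when every predicate in the list returns True.
    print(combined(None, 'success', None, None, set(['smoke'])))          # True
    print(combined(None, 'success', None, None, set(['smoke', 'slow'])))  # False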

View File

@@ -16,50 +16,29 @@
"""Notify the user of a finished test run."""
from optparse import OptionParser
import sys
import pygtk
pygtk.require('2.0')
import pynotify
from subunit import DiscardStream, ProtocolTestCase, TestResultStats
from subunit import TestResultStats
from subunit.filters import run_filter_script
if not pynotify.init("Subunit-notify"):
sys.exit(1)
parser = OptionParser(description=__doc__)
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("-f", "--forward", action="store_true", default=False,
help="Forward subunit stream on stdout.")
(options, args) = parser.parse_args()
result = TestResultStats(sys.stdout)
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:
passthrough_stream = None
if options.forward:
forward_stream = sys.stdout
else:
forward_stream = None
test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream,
forward=forward_stream)
test.run(result)
if result.failed_tests > 0:
summary = "Test run failed"
else:
summary = "Test run successful"
body = "Total tests: %d; Passed: %d; Failed: %d" % (
result.total_tests,
result.passed_tests,
result.failed_tests,
)
nw = pynotify.Notification(summary, body)
nw.show()
if result.wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)
def notify_of_result(result):
if result.failed_tests > 0:
summary = "Test run failed"
else:
summary = "Test run successful"
body = "Total tests: %d; Passed: %d; Failed: %d" % (
result.total_tests,
result.passed_tests,
result.failed_tests,
)
nw = pynotify.Notification(summary, body)
nw.show()
run_filter_script(TestResultStats, __doc__, notify_of_result)

lib/subunit/filters/subunit2csv Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Turn a subunit stream into a CSV"""
from subunit.filters import run_filter_script
from subunit.test_results import CsvResult
run_filter_script(CsvResult, __doc__)
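Those three lines are the whole script; ``CsvResult`` (added to ``subunit.test_results`` below) does the work. A small sketch of the rows it produces, assuming testtools is installed (``FakeTest`` is a stand-in; only ``id()`` is used):

    from io import StringIO
    from subunit.test_results import CsvResult

    class FakeTest(object):
        def id(self):
            return 'example.test_something'

    out = StringIO()
    result = CsvResult(out)
    result.startTestRun()   # writes the header row
    test = FakeTest()
    result.startTest(test)
    result.addSuccess(test)
    result.stopTest(test)   # writes: id, status, start time, stop time
    result.stopTestRun()
    print(out.getvalue())
    # test,status,start_time,stop_time
    # example.test_something,success,<start datetime>,<stop datetime>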

View File

@@ -16,11 +16,10 @@
"""Filter a subunit stream to get aggregate statistics."""
from optparse import OptionParser
import sys
import unittest
from subunit import DiscardStream, ProtocolTestCase
import sys
from subunit.filters import run_filter_script
try:
from junitxml import JUnitXmlResult
except ImportError:
@@ -28,38 +27,5 @@ except ImportError:
"http://pypi.python.org/pypi/junitxml) is required for this filter.")
raise
parser = OptionParser(description=__doc__)
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("-o", "--output-to",
help="Output the XML to this path rather than stdout.")
parser.add_option("-f", "--forward", action="store_true", default=False,
help="Forward subunit stream on stdout.")
(options, args) = parser.parse_args()
if options.output_to is None:
output_to = sys.stdout
else:
output_to = file(options.output_to, 'wb')
try:
result = JUnitXmlResult(output_to)
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:
passthrough_stream = None
if options.forward:
forward_stream = sys.stdout
else:
forward_stream = None
test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream,
forward=forward_stream)
result.startTestRun()
test.run(result)
result.stopTestRun()
finally:
if options.output_to is not None:
output_to.close()
if result.wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)
run_filter_script(JUnitXmlResult, __doc__)

View File

@@ -13,6 +13,7 @@ check: # test
uninstall_distcheck:
rm -fr $(DESTINSTALLARCHLIB)
rm MYMETA.yml
VPATH = @srcdir@
.PHONY: uninstall_distcheck

View File

@@ -121,8 +121,14 @@ import re
import subprocess
import sys
import unittest
if sys.version_info > (3, 0):
from io import UnsupportedOperation as _UnsupportedOperation
else:
_UnsupportedOperation = AttributeError
from testtools import content, content_type, ExtendedToOriginalDecorator
from testtools.content import TracebackContent
from testtools.compat import _b, _u, BytesIO, StringIO
try:
from testtools.testresult.real import _StringException
@@ -182,9 +188,15 @@ def tags_to_new_gone(tags):
class DiscardStream(object):
"""A filelike object which discards what is written to it."""
def fileno(self):
raise _UnsupportedOperation()
def write(self, bytes):
pass
def read(self, len=0):
return _b('')
class _ParserState(object):
"""State for the subunit parser."""
@@ -599,8 +611,8 @@ class TestProtocolClient(testresult.TestResult):
def __init__(self, stream):
testresult.TestResult.__init__(self)
stream = _make_stream_binary(stream)
self._stream = stream
_make_stream_binary(stream)
self._progress_fmt = _b("progress: ")
self._bytes_eol = _b("\n")
self._progress_plus = _b("+")
@@ -682,10 +694,9 @@ class TestProtocolClient(testresult.TestResult):
raise ValueError
if error is not None:
self._stream.write(self._start_simple)
# XXX: this needs to be made much stricter, along the lines of
# Martin[gz]'s work in testtools. Perhaps subunit can use that?
for line in self._exc_info_to_unicode(error, test).splitlines():
self._stream.write(("%s\n" % line).encode('utf8'))
tb_content = TracebackContent(error, test)
for bytes in tb_content.iter_bytes():
self._stream.write(bytes)
elif details is not None:
self._write_details(details)
else:
@@ -755,6 +766,15 @@ class TestProtocolClient(testresult.TestResult):
self._stream.write(self._progress_fmt + prefix + offset +
self._bytes_eol)
def tags(self, new_tags, gone_tags):
"""Inform the client about tags added/removed from the stream."""
if not new_tags and not gone_tags:
return
tags = set([tag.encode('utf8') for tag in new_tags])
tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
self._stream.write(tag_line)
def time(self, a_datetime):
"""Inform the client of the time.
@@ -1122,7 +1142,7 @@ class ProtocolTestCase(object):
:seealso: TestProtocolServer (the subunit wire protocol parser).
"""
def __init__(self, stream, passthrough=None, forward=False):
def __init__(self, stream, passthrough=None, forward=None):
"""Create a ProtocolTestCase reading from stream.
:param stream: A filelike object which a subunit stream can be read
@@ -1132,9 +1152,11 @@ class ProtocolTestCase(object):
:param forward: A stream to pass subunit input on to. If not supplied
subunit input is not forwarded.
"""
stream = _make_stream_binary(stream)
self._stream = stream
_make_stream_binary(stream)
self._passthrough = passthrough
if forward is not None:
forward = _make_stream_binary(forward)
self._forward = forward
def __call__(self, result=None):
@@ -1217,11 +1239,6 @@ def get_default_formatter():
return stream
if sys.version_info > (3, 0):
from io import UnsupportedOperation as _NoFilenoError
else:
_NoFilenoError = AttributeError
def read_test_list(path):
"""Read a list of test ids from a file on disk.
@@ -1236,15 +1253,37 @@ def read_test_list(path):
def _make_stream_binary(stream):
"""Ensure that a stream will be binary safe. See _make_binary_on_windows."""
"""Ensure that a stream will be binary safe. See _make_binary_on_windows.
:return: A binary version of the same stream (some streams cannot be
'fixed' but can be unwrapped).
"""
try:
fileno = stream.fileno()
except _NoFilenoError:
return
_make_binary_on_windows(fileno)
except _UnsupportedOperation:
pass
else:
_make_binary_on_windows(fileno)
return _unwrap_text(stream)
def _make_binary_on_windows(fileno):
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
if sys.platform == "win32":
import msvcrt
msvcrt.setmode(fileno, os.O_BINARY)
def _unwrap_text(stream):
"""Unwrap stream if it is a text stream to get the original buffer."""
if sys.version_info > (3, 0):
try:
# Read streams
if type(stream.read(0)) is str:
return stream.buffer
except (_UnsupportedOperation, IOError):
# Cannot read from the stream: try via writes
try:
stream.write(_b(''))
except TypeError:
return stream.buffer
return stream
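With ``_make_stream_binary`` now returning a usable binary stream, ``TestProtocolClient`` can be pointed at any bytes file-like object. A quick sketch of the new ``tags`` wire format, mirroring the tests added near the end of this commit:

    from io import BytesIO
    from subunit import TestProtocolClient

    out = BytesIO()
    client = TestProtocolClient(out)
    client.tags(set(['quux']), set(['bar']))  # new tags, gone tags
    print(out.getvalue())  # b'tags: quux -bar\n' (set ordering may vary)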

View File

@@ -0,0 +1,125 @@
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from optparse import OptionParser
import sys
from subunit import DiscardStream, ProtocolTestCase
def make_options(description):
parser = OptionParser(description=description)
parser.add_option(
"--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False,
dest="no_passthrough")
parser.add_option(
"-o", "--output-to",
help="Send the output to this path rather than stdout.")
parser.add_option(
"-f", "--forward", action="store_true", default=False,
help="Forward subunit stream on stdout.")
return parser
def run_tests_from_stream(input_stream, result, passthrough_stream=None,
forward_stream=None):
"""Run tests from a subunit input stream through 'result'.
:param input_stream: A stream containing subunit input.
:param result: A TestResult that will receive the test events.
:param passthrough_stream: All non-subunit input received will be
sent to this stream. If not provided, uses the ``TestProtocolServer``
default, which is ``sys.stdout``.
:param forward_stream: All subunit input received will be forwarded
to this stream. If not provided, uses the ``TestProtocolServer``
default, which is to not forward any input.
"""
test = ProtocolTestCase(
input_stream, passthrough=passthrough_stream,
forward=forward_stream)
result.startTestRun()
test.run(result)
result.stopTestRun()
def filter_by_result(result_factory, output_path, passthrough, forward,
input_stream=sys.stdin):
"""Filter an input stream using a test result.
:param result_factory: A callable that when passed an output stream
returns a TestResult. It is expected that this result will output
to the given stream.
:param output_path: A path to send output to. If None, output will go
to ``sys.stdout``.
:param passthrough: If True, all non-subunit input will be sent to
``sys.stdout``. If False, that input will be discarded.
:param forward: If True, all subunit input will be forwarded directly to
``sys.stdout`` as well as to the ``TestResult``.
:param input_stream: The source of subunit input. Defaults to
``sys.stdin``.
:return: A test result with the results of the run.
"""
if passthrough:
passthrough_stream = sys.stdout
else:
passthrough_stream = DiscardStream()
if forward:
forward_stream = sys.stdout
else:
forward_stream = DiscardStream()
if output_path is None:
output_to = sys.stdout
else:
output_to = file(output_path, 'wb')
try:
result = result_factory(output_to)
run_tests_from_stream(
input_stream, result, passthrough_stream, forward_stream)
finally:
if output_path:
output_to.close()
return result
def run_filter_script(result_factory, description, post_run_hook=None):
"""Main function for simple subunit filter scripts.
Many subunit filter scripts take a stream of subunit input and use a
TestResult to handle the events generated by that stream. This function
wraps a lot of the boiler-plate around that by making a script with
options for handling passthrough information and stream forwarding, and
that will exit with a successful return code (i.e. 0) if the input stream
represents a successful test run.
:param result_factory: A callable that takes an output stream and returns
a test result that outputs to that stream.
:param description: A description of the filter script.
"""
parser = make_options(description)
(options, args) = parser.parse_args()
result = filter_by_result(
result_factory, options.output_to, not options.no_passthrough,
options.forward)
if post_run_hook:
post_run_hook(result)
if result.wasSuccessful():
sys.exit(0)
else:
sys.exit(1)
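For comparison, this is all a custom filter script now needs; a hedged sketch modelled on the rewritten ``subunit-notify`` above (``print_summary`` is a hypothetical hook, while ``TestResultStats`` is the real class from ``subunit``):

    #!/usr/bin/env python
    """Print a one-line summary of a subunit stream."""
    from subunit import TestResultStats
    from subunit.filters import run_filter_script

    def print_summary(result):
        # post_run_hook: called with the result after the stream is consumed.
        print("%d tests, %d failed" % (result.total_tests, result.failed_tests))

    # Parses --no-passthrough/-o/-f, runs stdin through the result, then
    # exits 0 iff the run was successful.
    run_filter_script(TestResultStats, __doc__, post_run_hook=print_summary)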

View File

@@ -127,7 +127,7 @@ def parse_date(datestring, default_timezone=UTC):
if groups["fraction"] is None:
groups["fraction"] = 0
else:
groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6)
groups["fraction"] = int(float("0.%s" % groups["fraction"].decode()) * 1e6)
return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]),
int(groups["hour"]), int(groups["minute"]), int(groups["second"]),
int(groups["fraction"]), tz)

View File

@@ -16,9 +16,15 @@
"""TestResult helper classes used to by subunit."""
import csv
import datetime
import testtools
from testtools.compat import all
from testtools.content import (
text_content,
TracebackContent,
)
from subunit import iso8601
@@ -34,6 +40,9 @@ class TestResultDecorator(object):
or features by degrading them.
"""
# XXX: Since lp:testtools r250, this is in testtools. Once it's released,
# we should gut this and just use that.
def __init__(self, decorated):
"""Create a TestResultDecorator forwarding to decorated."""
# Make every decorator degrade gracefully.
@@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
class TagCollapsingDecorator(TestResultDecorator):
"""Collapses many 'tags' calls into one where possible."""
class TagsMixin(object):
def __init__(self, result):
super(TagCollapsingDecorator, self).__init__(result)
# The (new, gone) tags for the current test.
self._current_test_tags = None
def __init__(self):
self._clear_tags()
def _clear_tags(self):
self._global_tags = set(), set()
self._test_tags = None
def _get_active_tags(self):
global_new, global_gone = self._global_tags
if self._test_tags is None:
return set(global_new)
test_new, test_gone = self._test_tags
return global_new.difference(test_gone).union(test_new)
def _get_current_scope(self):
if self._test_tags:
return self._test_tags
return self._global_tags
def _flush_current_scope(self, tag_receiver):
new_tags, gone_tags = self._get_current_scope()
if new_tags or gone_tags:
tag_receiver.tags(new_tags, gone_tags)
if self._test_tags:
self._test_tags = set(), set()
else:
self._global_tags = set(), set()
def startTestRun(self):
self._clear_tags()
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
self.decorated.startTest(test)
self._current_test_tags = set(), set()
self._test_tags = set(), set()
def stopTest(self, test):
"""Stop a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
# Tags to output for this test.
if self._current_test_tags[0] or self._current_test_tags[1]:
self.decorated.tags(*self._current_test_tags)
self.decorated.stopTest(test)
self._current_test_tags = None
self._test_tags = None
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
@@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator):
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
if self._current_test_tags is not None:
# gather the tags until the test stops.
self._current_test_tags[0].update(new_tags)
self._current_test_tags[0].difference_update(gone_tags)
self._current_test_tags[1].update(gone_tags)
self._current_test_tags[1].difference_update(new_tags)
else:
return self.decorated.tags(new_tags, gone_tags)
current_new_tags, current_gone_tags = self._get_current_scope()
current_new_tags.update(new_tags)
current_new_tags.difference_update(gone_tags)
current_gone_tags.update(gone_tags)
current_gone_tags.difference_update(new_tags)
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
"""Collapses many 'tags' calls into one where possible."""
def __init__(self, result):
super(TagCollapsingDecorator, self).__init__(result)
self._clear_tags()
def _before_event(self):
self._flush_current_scope(self.decorated)
def tags(self, new_tags, gone_tags):
TagsMixin.tags(self, new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
@@ -273,12 +303,129 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
self._last_received_time = a_time
def all_true(bools):
    """Return True if all of 'bools' are True. False otherwise."""
    for b in bools:
        if not b:
            return False
    return True
def and_predicates(predicates):
    """Return a predicate that is true iff all predicates are true."""
    # XXX: Should probably be in testtools to be better used by matchers. jml
    return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
def make_tag_filter(with_tags, without_tags):
    """Make a callback that checks tests against tags."""
    with_tags = with_tags and set(with_tags) or None
    without_tags = without_tags and set(without_tags) or None
    def check_tags(test, outcome, err, details, tags):
        if with_tags and not with_tags <= tags:
            return False
        if without_tags and bool(without_tags & tags):
            return False
        return True
    return check_tags
class _PredicateFilter(TestResultDecorator, TagsMixin):
def __init__(self, result, predicate):
super(_PredicateFilter, self).__init__(result)
self._clear_tags()
self.decorated = TimeCollapsingDecorator(
TagCollapsingDecorator(self.decorated))
self._predicate = predicate
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
def filter_predicate(self, test, outcome, error, details):
return self._predicate(
test, outcome, error, details, self._get_active_tags())
def addError(self, test, err=None, details=None):
if (self.filter_predicate(test, 'error', err, details)):
self._buffered_calls.append(
('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
if (self.filter_predicate(test, 'failure', err, details)):
self._buffered_calls.append(
('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
if (self.filter_predicate(test, 'skip', reason, details)):
self._buffered_calls.append(
('addSkip', [test, reason], {'details': details}))
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
def addSuccess(self, test, details=None):
if (self.filter_predicate(test, 'success', None, details)):
self._buffered_calls.append(
('addSuccess', [test], {'details': details}))
else:
self._filtered()
def _filtered(self):
self._current_test_filtered = True
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
TagsMixin.startTest(self, test)
self._current_test = test
self._current_test_filtered = False
self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
"""Stop a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
if not self._current_test_filtered:
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._buffered_calls = []
TagsMixin.stopTest(self, test)
def tags(self, new_tags, gone_tags):
TagsMixin.tags(self, new_tags, gone_tags)
if self._current_test is not None:
self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
else:
return super(_PredicateFilter, self).tags(new_tags, gone_tags)
def time(self, a_time):
return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
return id[len("subunit.RemotedTestCase."):]
return id
class TestResultFilter(TestResultDecorator):
@@ -304,136 +451,71 @@ class TestResultFilter(TestResultDecorator):
:param filter_skip: Filter out skipped tests.
:param filter_xfail: Filter out expected failure tests.
:param filter_predicate: A callable taking (test, outcome, err,
details) and returning True if the result should be passed
details, tags) and returning True if the result should be passed
through. err and details may be none if no error or extra
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
are still supported but should be updated to accept the tags
parameter for efficiency.
:param fixup_expected_failures: Set of test ids to consider known
failing.
"""
super(TestResultFilter, self).__init__(result)
self.decorated = TimeCollapsingDecorator(
TagCollapsingDecorator(self.decorated))
predicates = []
if filter_error:
predicates.append(lambda t, outcome, e, d: outcome != 'error')
predicates.append(
lambda t, outcome, e, d, tags: outcome != 'error')
if filter_failure:
predicates.append(lambda t, outcome, e, d: outcome != 'failure')
predicates.append(
lambda t, outcome, e, d, tags: outcome != 'failure')
if filter_success:
predicates.append(lambda t, outcome, e, d: outcome != 'success')
predicates.append(
lambda t, outcome, e, d, tags: outcome != 'success')
if filter_skip:
predicates.append(lambda t, outcome, e, d: outcome != 'skip')
predicates.append(
lambda t, outcome, e, d, tags: outcome != 'skip')
if filter_xfail:
predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
predicates.append(
lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
if filter_predicate is not None:
predicates.append(filter_predicate)
self.filter_predicate = (
lambda test, outcome, err, details:
all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
def compat(test, outcome, error, details, tags):
# 0.0.7 and earlier did not support the 'tags' parameter.
try:
return filter_predicate(
test, outcome, error, details, tags)
except TypeError:
return filter_predicate(test, outcome, error, details)
predicates.append(compat)
predicate = and_predicates(predicates)
super(TestResultFilter, self).__init__(
_PredicateFilter(result, predicate))
if fixup_expected_failures is None:
self._fixup_expected_failures = frozenset()
else:
self._fixup_expected_failures = fixup_expected_failures
def addError(self, test, err=None, details=None):
if (self.filter_predicate(test, 'error', err, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._buffered_calls.append(
('addError', [test, err], {'details': details}))
if self._failure_expected(test):
self.addExpectedFailure(test, err=err, details=details)
else:
self._filtered()
super(TestResultFilter, self).addError(
test, err=err, details=details)
def addFailure(self, test, err=None, details=None):
if (self.filter_predicate(test, 'failure', err, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._buffered_calls.append(
('addFailure', [test, err], {'details': details}))
if self._failure_expected(test):
self.addExpectedFailure(test, err=err, details=details)
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
if (self.filter_predicate(test, 'skip', reason, details)):
self._buffered_calls.append(
('addSkip', [test, reason], {'details': details}))
else:
self._filtered()
super(TestResultFilter, self).addFailure(
test, err=err, details=details)
def addSuccess(self, test, details=None):
if (self.filter_predicate(test, 'success', None, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
else:
self._buffered_calls.append(
('addSuccess', [test], {'details': details}))
if self._failure_expected(test):
self.addUnexpectedSuccess(test, details=details)
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
def _filtered(self):
self._current_test_filtered = True
super(TestResultFilter, self).addSuccess(test, details=details)
def _failure_expected(self, test):
return (test.id() in self._fixup_expected_failures)
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
self._current_test = test
self._current_test_filtered = False
self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
"""Stop a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
if not self._current_test_filtered:
# Tags to output for this test.
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._buffered_calls = []
def time(self, a_time):
if self._current_test is not None:
self._buffered_calls.append(('time', [a_time], {}))
else:
return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
return id[len("subunit.RemotedTestCase."):]
return id
class TestIdPrintingResult(testtools.TestResult):
@@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult):
def wasSuccessful(self):
"Tells whether or not this result was a success"
return self.failed_tests == 0
class TestByTestResult(testtools.TestResult):
"""Call something every time a test completes."""
# XXX: In testtools since lp:testtools r249. Once that's released, just
# import that.
def __init__(self, on_test):
"""Construct a ``TestByTestResult``.
:param on_test: A callable that take a test case, a status (one of
"success", "failure", "error", "skip", or "xfail"), a start time
(a ``datetime`` with timezone), a stop time, an iterable of tags,
and a details dict. Is called at the end of each test (i.e. on
``stopTest``) with the accumulated values for that test.
"""
super(TestByTestResult, self).__init__()
self._on_test = on_test
def startTest(self, test):
super(TestByTestResult, self).startTest(test)
self._start_time = self._now()
# There's no supported (i.e. tested) behaviour that relies on these
# being set, but it makes me more comfortable all the same. -- jml
self._status = None
self._details = None
self._stop_time = None
def stopTest(self, test):
self._stop_time = self._now()
super(TestByTestResult, self).stopTest(test)
self._on_test(
test=test,
status=self._status,
start_time=self._start_time,
stop_time=self._stop_time,
# current_tags is new in testtools 0.9.13.
tags=getattr(self, 'current_tags', None),
details=self._details)
def _err_to_details(self, test, err, details):
if details:
return details
return {'traceback': TracebackContent(err, test)}
def addSuccess(self, test, details=None):
super(TestByTestResult, self).addSuccess(test)
self._status = 'success'
self._details = details
def addFailure(self, test, err=None, details=None):
super(TestByTestResult, self).addFailure(test, err, details)
self._status = 'failure'
self._details = self._err_to_details(test, err, details)
def addError(self, test, err=None, details=None):
super(TestByTestResult, self).addError(test, err, details)
self._status = 'error'
self._details = self._err_to_details(test, err, details)
def addSkip(self, test, reason=None, details=None):
super(TestByTestResult, self).addSkip(test, reason, details)
self._status = 'skip'
if details is None:
details = {'reason': text_content(reason)}
elif reason:
# XXX: What if details already has 'reason' key?
details['reason'] = text_content(reason)
self._details = details
def addExpectedFailure(self, test, err=None, details=None):
super(TestByTestResult, self).addExpectedFailure(test, err, details)
self._status = 'xfail'
self._details = self._err_to_details(test, err, details)
def addUnexpectedSuccess(self, test, details=None):
super(TestByTestResult, self).addUnexpectedSuccess(test, details)
self._status = 'success'
self._details = details
class CsvResult(TestByTestResult):
def __init__(self, stream):
super(CsvResult, self).__init__(self._on_test)
self._write_row = csv.writer(stream).writerow
def _on_test(self, test, status, start_time, stop_time, tags, details):
self._write_row([test.id(), status, start_time, stop_time])
def startTestRun(self):
super(CsvResult, self).startTestRun()
self._write_row(['test', 'status', 'start_time', 'stop_time'])
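``CsvResult`` is one concrete use of ``TestByTestResult``; anything callable with the signature documented above can go in its place. A sketch, assuming testtools is installed:

    import unittest
    from subunit.test_results import TestByTestResult

    def report(test, status, start_time, stop_time, tags, details):
        # Called once per test, on stopTest, with the accumulated values.
        print(test.id(), status, stop_time - start_time, sorted(tags or []))

    class Sample(unittest.TestCase):
        def test_ok(self):
            pass

    result = TestByTestResult(report)
    result.startTestRun()
    unittest.TestLoader().loadTestsFromTestCase(Sample).run(result)
    result.stopTestRun()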

View File

@@ -7,15 +7,15 @@ if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to
# sample-script.py
print "test fail"
print "error fail"
print("test fail")
print("error fail")
sys.exit(0)
print "test old mcdonald"
print "success old mcdonald"
print "test bing crosby"
print "failure bing crosby ["
print "foo.c:53:ERROR invalid state"
print "]"
print "test an error"
print "error an error"
print("test old mcdonald")
print("success old mcdonald")
print("test bing crosby")
print("failure bing crosby [")
print("foo.c:53:ERROR invalid state")
print("]")
print("test an error")
print("error an error")
sys.exit(0)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python
import sys
print "test old mcdonald"
print "success old mcdonald"
print "test bing crosby"
print "success bing crosby"
print("test old mcdonald")
print("success old mcdonald")
print("test bing crosby")
print("success bing crosby")
sys.exit(0)

View File

@@ -14,7 +14,7 @@
# limitations under that license.
#
from cStringIO import StringIO
from testtools.compat import BytesIO
import unittest
from testtools import PlaceHolder
@@ -42,7 +42,7 @@ class TimeCollectingTestResult(unittest.TestResult):
class TestSubunitTestRunner(unittest.TestCase):
def test_includes_timing_output(self):
io = StringIO()
io = BytesIO()
runner = SubunitTestRunner(stream=io)
test = PlaceHolder('name')
runner.run(test)

View File

@@ -17,15 +17,18 @@
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
import os
import subprocess
import sys
from subunit import iso8601
import unittest
from testtools import TestCase
from testtools.compat import _b, BytesIO, StringIO
from testtools.compat import _b, BytesIO
from testtools.testresult.doubles import ExtendedTestResult
import subunit
from subunit.test_results import TestResultFilter
from subunit.test_results import make_tag_filter, TestResultFilter
class TestTestResultFilter(TestCase):
@@ -77,6 +80,40 @@ xfail todo
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
def test_tag_filter(self):
tag_filter = make_tag_filter(['global'], ['local'])
result = ExtendedTestResult()
result_filter = TestResultFilter(
result, filter_success=False, filter_predicate=tag_filter)
self.run_tests(result_filter)
tests_included = [
event[1] for event in result._events if event[0] == 'startTest']
tests_expected = list(map(
subunit.RemotedTestCase,
['passed', 'error', 'skipped', 'todo']))
self.assertEquals(tests_expected, tests_included)
def test_tags_tracked_correctly(self):
tag_filter = make_tag_filter(['a'], [])
result = ExtendedTestResult()
result_filter = TestResultFilter(
result, filter_success=False, filter_predicate=tag_filter)
input_stream = _b(
"test: foo\n"
"tags: a\n"
"successful: foo\n"
"test: bar\n"
"successful: bar\n")
self.run_tests(result_filter, input_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
[('startTest', foo),
('tags', set(['a']), set()),
('addSuccess', foo),
('stopTest', foo),
],
result._events)
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +188,8 @@ xfail todo
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
# 0.0.7 and earlier did not support the 'tags' parameter, so we need
# to test that we still support behaviour without it.
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
@@ -161,6 +200,18 @@ xfail todo
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
def test_filter_predicate_with_tags(self):
"""You can filter by predicate callbacks that accept tags"""
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details, tags):
return outcome == 'success'
result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests(result_filter)
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
@@ -179,14 +230,41 @@ xfail todo
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
self.maxDiff = None
self.assertEqual(
[('time', date_a),
('startTest', foo),
('time', date_b),
('startTest', foo),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
def test_time_passes_through_filtered_tests(self):
# Passing a subunit stream through TestResultFilter preserves 'time'
# directives even if a specific test is filtered out.
date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
subunit_stream = _b('\n'.join([
"time: %s",
"test: foo",
"time: %s",
"success: foo",
"time: %s",
""]) % (date_a, date_b, date_c))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
result_filter.startTestRun()
self.run_tests(result_filter, subunit_stream)
result_filter.stopTestRun()
foo = subunit.RemotedTestCase('foo')
self.maxDiff = None
self.assertEqual(
[('startTestRun',),
('time', date_a),
('time', date_c),
('stopTestRun',),], result._events)
def test_skip_preserved(self):
subunit_stream = _b('\n'.join([
"test: foo",
@@ -201,6 +279,90 @@ xfail todo
('addSkip', foo, {}),
('stopTest', foo), ], result._events)
if sys.version_info < (2, 7):
# These tests require Python >=2.7.
del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
class TestFilterCommand(TestCase):
example_subunit_stream = _b("""\
tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error [
error details
]
test skipped
skip skipped
test todo
xfail todo
""")
def run_command(self, args, stream):
root = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
script_path = os.path.join(root, 'filters', 'subunit-filter')
command = [sys.executable, script_path] + list(args)
ps = subprocess.Popen(
command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = ps.communicate(stream)
if ps.returncode != 0:
raise RuntimeError("%s failed: %s" % (command, err))
return out
def to_events(self, stream):
test = subunit.ProtocolTestCase(BytesIO(stream))
result = ExtendedTestResult()
test.run(result)
return result._events
def test_default(self):
output = self.run_command([], _b(
"test: foo\n"
"skip: foo\n"
))
events = self.to_events(output)
foo = subunit.RemotedTestCase('foo')
self.assertEqual(
[('startTest', foo),
('addSkip', foo, {}),
('stopTest', foo)],
events)
def test_tags(self):
output = self.run_command(['-s', '--with-tag', 'a'], _b(
"tags: a\n"
"test: foo\n"
"success: foo\n"
"tags: -a\n"
"test: bar\n"
"success: bar\n"
"test: baz\n"
"tags: a\n"
"success: baz\n"
))
events = self.to_events(output)
foo = subunit.RemotedTestCase('foo')
baz = subunit.RemotedTestCase('baz')
self.assertEqual(
[('tags', set(['a']), set()),
('startTest', foo),
('addSuccess', foo),
('stopTest', foo),
('tags', set(), set(['a'])),
('startTest', baz),
('tags', set(['a']), set()),
('addSuccess', baz),
('stopTest', baz),
],
events)
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()

View File

@@ -18,9 +18,9 @@ import datetime
import unittest
import os
from testtools import skipIf, TestCase
from testtools.compat import _b, _u, BytesIO, StringIO
from testtools.content import Content, TracebackContent
from testtools import skipIf, TestCase, TestResult
from testtools.compat import _b, _u, BytesIO
from testtools.content import Content, TracebackContent, text_content
from testtools.content_type import ContentType
try:
from testtools.testresult.doubles import (
@@ -40,6 +40,10 @@ from subunit import _remote_exception_str, _remote_exception_str_chunked
import subunit.iso8601 as iso8601
def details_to_str(details):
return TestResult()._err_details_to_string(None, details=details)
class TestTestImports(unittest.TestCase):
def test_imports(self):
@@ -87,11 +91,12 @@ class TestTestProtocolServerPipe(unittest.TestCase):
def test_story(self):
client = unittest.TestResult()
protocol = subunit.TestProtocolServer(client)
traceback = "foo.c:53:ERROR invalid state\n"
pipe = BytesIO(_b("test old mcdonald\n"
"success old mcdonald\n"
"test bing crosby\n"
"failure bing crosby [\n"
"foo.c:53:ERROR invalid state\n"
+ traceback +
"]\n"
"test an error\n"
"error an error\n"))
@@ -102,9 +107,8 @@ class TestTestProtocolServerPipe(unittest.TestCase):
[(an_error, _remote_exception_str + '\n')])
self.assertEqual(
client.failures,
[(bing, _remote_exception_str + ": Text attachment: traceback\n"
"------------\nfoo.c:53:ERROR invalid state\n"
"------------\n\n")])
[(bing, _remote_exception_str + ": "
+ details_to_str({'traceback': text_content(traceback)}) + "\n")])
self.assertEqual(client.testsRun, 3)
def test_non_test_characters_forwarded_immediately(self):
@@ -559,9 +563,7 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
value = details
else:
if error_message is not None:
value = subunit.RemoteError(_u("Text attachment: traceback\n"
"------------\n") + _u(error_message) +
_u("------------\n"))
value = subunit.RemoteError(details_to_str(details))
else:
value = subunit.RemoteError()
self.assertEqual([
@@ -1299,6 +1301,22 @@ class TestTestProtocolClient(unittest.TestCase):
"something\n"
"F\r\nserialised\nform0\r\n]\n" % self.test.id()))
def test_tags_empty(self):
self.protocol.tags(set(), set())
self.assertEqual(_b(""), self.io.getvalue())
def test_tags_add(self):
self.protocol.tags(set(['foo']), set())
self.assertEqual(_b("tags: foo\n"), self.io.getvalue())
def test_tags_both(self):
self.protocol.tags(set(['quux']), set(['bar']))
self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue())
def test_tags_gone(self):
self.protocol.tags(set(), set(['bar']))
self.assertEqual(_b("tags: -bar\n"), self.io.getvalue())
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()

View File

@@ -14,16 +14,25 @@
# limitations under that license.
#
import csv
import datetime
import sys
import unittest
from testtools import TestCase
from testtools.compat import StringIO
from testtools.content import (
text_content,
TracebackContent,
)
from testtools.testresult.doubles import ExtendedTestResult
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
import testtools
class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
@@ -192,12 +201,55 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
class TestTagCollapsingDecorator(TestCase):
def test_tags_forwarded_outside_of_tests(self):
def test_tags_collapsed_outside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.tags(set(['a', 'b']), set())
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set())
tag_collapser.startTest(self)
self.assertEquals(
[('tags', set(['a', 'b']), set([]))], result._events)
[('tags', set(['a', 'b']), set([])),
('startTest', self),
], result._events)
def test_tags_collapsed_outside_of_tests_are_flushed(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.startTestRun()
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set())
tag_collapser.startTest(self)
tag_collapser.addSuccess(self)
tag_collapser.stopTest(self)
tag_collapser.stopTestRun()
self.assertEquals(
[('startTestRun',),
('tags', set(['a', 'b']), set([])),
('startTest', self),
('addSuccess', self),
('stopTest', self),
('stopTestRun',),
], result._events)
def test_tags_forwarded_after_tests(self):
test = subunit.RemotedTestCase('foo')
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.startTestRun()
tag_collapser.startTest(test)
tag_collapser.addSuccess(test)
tag_collapser.stopTest(test)
tag_collapser.tags(set(['a']), set(['b']))
tag_collapser.stopTestRun()
self.assertEqual(
[('startTestRun',),
('startTest', test),
('addSuccess', test),
('stopTest', test),
('tags', set(['a']), set(['b'])),
('stopTestRun',),
],
result._events)
def test_tags_collapsed_inside_of_tests(self):
result = ExtendedTestResult()
@@ -229,6 +281,25 @@ class TestTagCollapsingDecorator(TestCase):
('stopTest', test)],
result._events)
def test_tags_sent_before_result(self):
# Because addSuccess and friends tend to send subunit output
# immediately, and because 'tags:' before a result line means
# something different to 'tags:' after a result line, we need to be
# sure that tags are emitted before 'addSuccess' (or whatever).
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(['a']), set())
tag_collapser.addSuccess(test)
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['a']), set()),
('addSuccess', test),
('stopTest', test)],
result._events)
class TestTimeCollapsingDecorator(TestCase):
@@ -294,6 +365,201 @@ class TestTimeCollapsingDecorator(TestCase):
('stopTest', foo)], result._events)
class TestByTestResultTests(testtools.TestCase):
def setUp(self):
super(TestByTestResultTests, self).setUp()
self.log = []
self.result = subunit.test_results.TestByTestResult(self.on_test)
if sys.version_info >= (3, 0):
self.result._now = iter(range(5)).__next__
else:
self.result._now = iter(range(5)).next
def assertCalled(self, **kwargs):
defaults = {
'test': self,
'tags': set(),
'details': None,
'start_time': 0,
'stop_time': 1,
}
defaults.update(kwargs)
self.assertEqual([defaults], self.log)
def on_test(self, **kwargs):
self.log.append(kwargs)
def test_no_tests_nothing_reported(self):
self.result.startTestRun()
self.result.stopTestRun()
self.assertEqual([], self.log)
def test_add_success(self):
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertCalled(status='success')
def test_add_success_details(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addSuccess(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='success', details=details)
def test_tags(self):
if not getattr(self.result, 'tags', None):
self.skipTest("No tags in testtools")
self.result.tags(['foo'], [])
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertCalled(status='success', tags=set(['foo']))
def test_add_error(self):
self.result.startTest(self)
try:
1/0
except ZeroDivisionError:
error = sys.exc_info()
self.result.addError(self, error)
self.result.stopTest(self)
self.assertCalled(
status='error',
details={'traceback': TracebackContent(error, self)})
def test_add_error_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addError(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='error', details=details)
def test_add_failure(self):
self.result.startTest(self)
try:
self.fail("intentional failure")
except self.failureException:
failure = sys.exc_info()
self.result.addFailure(self, failure)
self.result.stopTest(self)
self.assertCalled(
status='failure',
details={'traceback': TracebackContent(failure, self)})
def test_add_failure_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addFailure(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='failure', details=details)
def test_add_xfail(self):
self.result.startTest(self)
try:
1/0
except ZeroDivisionError:
error = sys.exc_info()
self.result.addExpectedFailure(self, error)
self.result.stopTest(self)
self.assertCalled(
status='xfail',
details={'traceback': TracebackContent(error, self)})
def test_add_xfail_details(self):
self.result.startTest(self)
details = {"foo": text_content("bar")}
self.result.addExpectedFailure(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='xfail', details=details)
def test_add_unexpected_success(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addUnexpectedSuccess(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='success', details=details)
def test_add_skip_reason(self):
self.result.startTest(self)
reason = self.getUniqueString()
self.result.addSkip(self, reason)
self.result.stopTest(self)
self.assertCalled(
status='skip', details={'reason': text_content(reason)})
def test_add_skip_details(self):
self.result.startTest(self)
details = {'foo': 'bar'}
self.result.addSkip(self, details=details)
self.result.stopTest(self)
self.assertCalled(status='skip', details=details)
def test_twice(self):
self.result.startTest(self)
self.result.addSuccess(self, details={'foo': 'bar'})
self.result.stopTest(self)
self.result.startTest(self)
self.result.addSuccess(self)
self.result.stopTest(self)
self.assertEqual(
[{'test': self,
'status': 'success',
'start_time': 0,
'stop_time': 1,
'tags': set(),
'details': {'foo': 'bar'}},
{'test': self,
'status': 'success',
'start_time': 2,
'stop_time': 3,
'tags': set(),
'details': None},
],
self.log)
class TestCsvResult(testtools.TestCase):
def parse_stream(self, stream):
stream.seek(0)
reader = csv.reader(stream)
return list(reader)
def test_csv_output(self):
stream = StringIO()
result = subunit.test_results.CsvResult(stream)
if sys.version_info >= (3, 0):
result._now = iter(range(5)).__next__
else:
result._now = iter(range(5)).next
result.startTestRun()
result.startTest(self)
result.addSuccess(self)
result.stopTest(self)
result.stopTestRun()
self.assertEqual(
[['test', 'status', 'start_time', 'stop_time'],
[self.id(), 'success', '0', '1'],
],
self.parse_stream(stream))
def test_just_header_when_no_tests(self):
stream = StringIO()
result = subunit.test_results.CsvResult(stream)
result.startTestRun()
result.stopTestRun()
self.assertEqual(
[['test', 'status', 'start_time', 'stop_time']],
self.parse_stream(stream))
def test_no_output_before_events(self):
stream = StringIO()
subunit.test_results.CsvResult(stream)
self.assertEqual([], self.parse_stream(stream))
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- Mode: python -*-
#
# Copyright (C) 2004 Canonical.com

View File

@@ -13,19 +13,22 @@ else:
]
}
try:
    # Assume we are in a distribution, which has PKG-INFO
    version_lines = [x for x in open('PKG-INFO').readlines()
        if x.startswith('Version:')]
    version_line = version_lines and version_lines[-1] or 'VERSION = 0.0'
    VERSION = version_line.split(':')[1].strip()
except IOError:
    # Must be a development checkout, so use the Makefile
    version_lines = [x for x in open('Makefile').readlines()
        if x.startswith('VERSION')]
    version_line = version_lines and version_lines[-1] or 'VERSION = 0.0'
    VERSION = version_line.split('=')[1].strip()
def _get_version_from_file(filename, start_of_line, split_marker):
    """Extract version from file, giving last matching value or None"""
    try:
        return [x for x in open(filename)
            if x.startswith(start_of_line)][-1].split(split_marker)[1].strip()
    except (IOError, IndexError):
        return None
VERSION = (
    _get_version_from_file('PKG-INFO', 'Version:', ':')
    # Must be a development checkout, so use the Makefile
    or _get_version_from_file('Makefile', 'VERSION', '=')
    or "0.0")
setup(

View File

@@ -27,7 +27,7 @@
. ${SHELL_SHARE}subunit.sh
echo 'test: subunit_start_test output'
func_output=$(subunit_start_test "foo bar")
func_output=$(subunit_start_test "foo bar"|grep -v 'time:')
func_status=$?
if [ $func_status == 0 -a "x$func_output" = "xtest: foo bar" ]; then
echo 'success: subunit_start_test output'
@@ -40,7 +40,7 @@ else
fi
subunit_start_test "subunit_pass_test output"
func_output=$(subunit_pass_test "foo bar")
func_output=$(subunit_pass_test "foo bar"|grep -v 'time:')
func_status=$?
if [ $func_status == 0 -a "x$func_output" = "xsuccess: foo bar" ]; then
subunit_pass_test "subunit_pass_test output"
@@ -53,12 +53,12 @@ else
fi
subunit_start_test "subunit_fail_test output"
func_output=$(subunit_fail_test "foo bar" <<END
func_output=$((subunit_fail_test "foo bar" <<END
something
wrong
here
END
)
)|grep -v 'time:')
func_status=$?
if [ $func_status == 0 -a "x$func_output" = "xfailure: foo bar [
something
@@ -75,12 +75,12 @@ else
fi
subunit_start_test "subunit_error_test output"
func_output=$(subunit_error_test "foo bar" <<END
func_output=$((subunit_error_test "foo bar" <<END
something
died
here
END
)
)| grep -v 'time:')
func_status=$?
if [ $func_status == 0 -a "x$func_output" = "xerror: foo bar [
something