1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-24 13:57:43 +03:00

subunit: Update to latest upstream snapshot.

This commit is contained in:
Jelmer Vernooij 2011-08-27 16:07:38 +02:00
parent dd56d27d74
commit bd01a8e79f
22 changed files with 1300 additions and 628 deletions

View File

@ -14,7 +14,7 @@ Dependencies
* Python for the filters
* 'testtools' (On Debian and Ubuntu systems the 'python-testtools' package,
the testtools package on pypi, or https://launchpad.net/testtools) for
the extended test API which permits attachments. Version 0.9.8 or newer is
the extended test API which permits attachments. Version 0.9.11 or newer is
required. Of particular note, http://testtools.python-hosting.com/ is not
the testtools you want.
* A C compiler for the C bindings

View File

@ -10,13 +10,74 @@ test ids and also filter via a test id list file thanks to improvements in
``testtools.run``. See the testtools manual, or testrepository - a major
user of such functionality.
Additionally the protocol now has a keyword uxsuccess for Unexpected Success
reporting. Older parsers will report tests with this status code as 'lost
connection'.
IMPROVEMENTS
~~~~~~~~~~~~
* Add ``TimeCollapsingDecorator`` which collapses multiple sequential time()
calls into just the first and last. (Jonathan Lange)
* Add ``TagCollapsingDecorator`` which collapses many tags() calls into one
where possible. (Jonathan Lange, Robert Collins)
* Force flush of writes to stdout in c/tests/test_child.
(Jelmer Vernooij, #687611)
* Provisional Python 3.x support.
(Robert Collins, Tres Seaver, Martin[gz], #666819)
* ``subunit.chunked.Decoder`` Python class takes a new ``strict`` option,
which defaults to ``True``. When ``False``, the ``Decoder`` will accept
incorrect input that is still unambiguous. i.e. subunit will not barf if
a \r is missing from the input. (Martin Pool)
* ``subunit-filter`` preserves the relative ordering of ``time:`` statements,
so you can now use filtered streams to gather data about how long it takes
to run a test. (Jonathan Lange, #716554)
* ``subunit-ls`` now handles a stream with time: instructions that start
partway through the stream (which may lead to strange times) more gracefully.
(Robert Collins, #785954)
* ``subunit-ls`` should handle the new test outcomes in Python2.7 better.
(Robert Collins, #785953)
* ``TestResultFilter`` now collapses sequential calls to time().
(Jonathan Lange, #567150)
* ``TestResultDecorator.tags()`` now actually works, and is no longer a buggy
copy/paste of ``TestResultDecorator.time()``. (Jonathan Lange, #681828)
* ``TestResultFilter`` now supports a ``fixup_expected_failures``
argument. (Jelmer Vernooij, #755241)
* The ``subunit.run`` Python module supports ``-l`` and ``--load-list`` as
per ``testtools.run``. This required a dependency bump due to a small
API change in ``testtools``. (Robert Collins)
* The help for subunit-filter was confusing about the behaviour of ``-f`` /
``--no-failure``. (Robert Collins, #703392)
* The Python2.7 / testtools addUnexpectedSuccess API is now supported. This
required adding a new status code to the protocol. (Robert Collins, #654474)
BUG FIXES
~~~~~~~~~
* Add 'subunit --no-xfail', which will omit expected failures from the subunit
stream. (John Arbash Meinel, #623642)
* Add 'subunit -F/--only-genuine-failures' which sets all of '--no-skips',
'--no-xfail', '--no-passthrough, '--no-success', and gives you just the
failure stream. (John Arbash Meinel)
CHANGES
~~~~~~~
* Newer testtools is needed as part of the Python 3 support. (Robert Collins)
0.0.6
-----

View File

@ -142,23 +142,26 @@ line orientated and consists of either directives and their parameters, or
when outside a DETAILS region unexpected lines which are not interpreted by
the parser - they should be forwarded unaltered.
test|testing|test:|testing: test label
success|success:|successful|successful: test label
success|success:|successful|successful: test label DETAILS
failure: test label
failure: test label DETAILS
error: test label
error: test label DETAILS
skip[:] test label
skip[:] test label DETAILS
xfail[:] test label
xfail[:] test label DETAILS
test|testing|test:|testing: test LABEL
success|success:|successful|successful: test LABEL
success|success:|successful|successful: test LABEL DETAILS
failure: test LABEL
failure: test LABEL DETAILS
error: test LABEL
error: test LABEL DETAILS
skip[:] test LABEL
skip[:] test LABEL DETAILS
xfail[:] test LABEL
xfail[:] test LABEL DETAILS
uxsuccess[:] test LABEL
uxsuccess[:] test LABEL DETAILS
progress: [+|-]X
progress: push
progress: pop
tags: [-]TAG ...
time: YYYY-MM-DD HH:MM:SSZ
LABEL: UTF8*
DETAILS ::= BRACKETED | MULTIPART
BRACKETED ::= '[' CR UTF8-lines ']' CR
MULTIPART ::= '[ multipart' CR PART* ']' CR
@ -200,13 +203,14 @@ directive for the most recently started test).
The time directive acts as a clock event - it sets the time for all future
events. The value should be a valid ISO8601 time.
The skip result is used to indicate a test that was found by the runner but not
fully executed due to some policy or dependency issue. This is represented in
python using the addSkip interface that testtools
(https://edge.launchpad.net/testtools) defines. When communicating with a non
skip aware test result, the test is reported as an error.
The xfail result is used to indicate a test that was expected to fail failing
in the expected manner. As this is a normal condition for such tests it is
represented as a successful test in Python.
In future, skip and xfail results will be represented semantically in Python,
but some discussion is underway on the right way to do this.
The skip, xfail and uxsuccess outcomes are not supported by all testing
environments. In Python the testttools (https://launchpad.net/testtools)
library is used to translate these automatically if an older Python version
that does not support them is in use. See the testtools documentation for the
translation policy.
skip is used to indicate a test was discovered but not executed. xfail is used
to indicate a test that errored in some expected fashion (also know as "TODO"
tests in some frameworks). uxsuccess is used to indicate and unexpected success
where a test though to be failing actually passes. It is complementary to
xfail.

View File

@ -16,6 +16,7 @@
**/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <check.h>
@ -57,6 +58,8 @@ test_stdout_function(char const * expected,
* DEAL.
*/
function();
/* flush writes on FILE object to file descriptor */
fflush(stdout);
/* restore stdout now */
if (dup2(old_stdout, 1) != 1) {
close(old_stdout);

View File

@ -28,13 +28,13 @@ Remember to quote shell metacharacters.
from optparse import OptionParser
import sys
import unittest
import re
from subunit import (
DiscardStream,
ProtocolTestCase,
TestProtocolClient,
read_test_list,
)
from subunit.test_results import TestResultFilter
@ -46,24 +46,43 @@ parser.add_option("-e", "--no-error", action="store_true",
parser.add_option("--failure", action="store_false",
help="include failures", default=False, dest="failure")
parser.add_option("-f", "--no-failure", action="store_true",
help="include failures", dest="failure")
help="exclude failures", dest="failure")
parser.add_option("--passthrough", action="store_false",
help="Show all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
parser.add_option("-s", "--success", action="store_false",
help="include successes", dest="success")
parser.add_option("--no-skip", action="store_true",
help="exclude skips", dest="skip")
parser.add_option("--no-success", action="store_true",
help="exclude successes", default=True, dest="success")
parser.add_option("--no-skip", action="store_true",
help="exclude skips", dest="skip")
parser.add_option("--xfail", action="store_false",
help="include expected falures", default=True, dest="xfail")
parser.add_option("--no-xfail", action="store_true",
help="exclude expected falures", default=True, dest="xfail")
parser.add_option("-m", "--with", type=str,
help="regexp to include (case-sensitive by default)",
action="append", dest="with_regexps")
parser.add_option("--fixup-expected-failures", type=str,
help="File with list of test ids that are expected to fail; on failure "
"their result will be changed to xfail; on success they will be "
"changed to error.", dest="fixup_expected_failures", action="append")
parser.add_option("--without", type=str,
help="regexp to exclude (case-sensitive by default)",
action="append", dest="without_regexps")
(options, args) = parser.parse_args()
def only_genuine_failures_callback(option, opt, value, parser):
parser.rargs.insert(0, '--no-passthrough')
parser.rargs.insert(0, '--no-xfail')
parser.rargs.insert(0, '--no-skip')
parser.rargs.insert(0, '--no-success')
parser.add_option("-F", "--only-genuine-failures", action="callback",
callback=only_genuine_failures_callback,
help="Only pass through failures and exceptions.")
(options, args) = parser.parse_args()
def _compile_re_from_list(l):
return re.compile("|".join(l), re.MULTILINE)
@ -91,11 +110,15 @@ def _make_regexp_filter(with_regexps, without_regexps):
regexp_filter = _make_regexp_filter(options.with_regexps,
options.without_regexps)
fixup_expected_failures = set()
for path in options.fixup_expected_failures or ():
fixup_expected_failures.update(read_test_list(path))
result = TestProtocolClient(sys.stdout)
result = TestResultFilter(result, filter_error=options.error,
filter_failure=options.failure, filter_success=options.success,
filter_skip=options.skip,
filter_predicate=regexp_filter)
filter_skip=options.skip, filter_xfail=options.xfail,
filter_predicate=regexp_filter,
fixup_expected_failures=fixup_expected_failures)
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:

View File

@ -20,7 +20,10 @@ from optparse import OptionParser
import sys
from subunit import DiscardStream, ProtocolTestCase
from subunit.test_results import TestIdPrintingResult
from subunit.test_results import (
AutoTimingTestResultDecorator,
TestIdPrintingResult,
)
parser = OptionParser(description=__doc__)
@ -30,7 +33,8 @@ parser.add_option("--times", action="store_true",
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
(options, args) = parser.parse_args()
result = TestIdPrintingResult(sys.stdout, options.times)
result = AutoTimingTestResultDecorator(
TestIdPrintingResult(sys.stdout, options.times))
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:

View File

@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -49,7 +49,7 @@ details, tags, timestamping and progress markers).
The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
which can be used instead of the usual python unittest parameter.
When used the value of details should be a dict from ``string`` to
When used the value of details should be a dict from ``string`` to
``testtools.content.Content`` objects. This is a draft API being worked on with
the Python Testing In Python mail list, with the goal of permitting a common
way to provide additional data beyond a traceback, such as captured data from
@ -58,13 +58,13 @@ and newer).
The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
remove tags in the test run that is currently executing. If called when no
test is in progress (that is, if called outside of the ``startTest``,
``stopTest`` pair), the the tags apply to all sebsequent tests. If called
test is in progress (that is, if called outside of the ``startTest``,
``stopTest`` pair), the the tags apply to all subsequent tests. If called
when a test is in progress, then the tags only apply to that test.
The ``time(a_datetime)`` method is called (if present) when a ``time:``
directive is encountered in a Subunit stream. This is used to tell a TestResult
about the time that events in the stream occured at, to allow reconstructing
about the time that events in the stream occurred at, to allow reconstructing
test timing from a stream.
The ``progress(offset, whence)`` method controls progress data for a stream.
@ -87,7 +87,7 @@ tests, allowing isolation between the test runner and some tests.
Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
tests that will fork() before that individual test is run.
`ExecTestCase`` is a convenience wrapper for running an external
`ExecTestCase`` is a convenience wrapper for running an external
program to get a Subunit stream and then report that back to an arbitrary
result object::
@ -98,7 +98,7 @@ result object::
def test_script_two(self):
'./bin/script_two'
# Normally your normal test loading would take of this automatically,
# It is only spelt out in detail here for clarity.
suite = unittest.TestSuite([AggregateTests("test_script_one"),
@ -116,26 +116,30 @@ Utility modules
* subunit.test_results contains TestResult helper classes.
"""
import datetime
import os
import re
from StringIO import StringIO
import subprocess
import sys
import unittest
import iso8601
from testtools import content, content_type, ExtendedToOriginalDecorator
from testtools.compat import _b, _u, BytesIO, StringIO
try:
from testtools.testresult.real import _StringException
RemoteException = _StringException
_remote_exception_str = '_StringException' # For testing.
# For testing: different pythons have different str() implementations.
if sys.version_info > (3, 0):
_remote_exception_str = "testtools.testresult.real._StringException"
_remote_exception_str_chunked = "34\r\n" + _remote_exception_str
else:
_remote_exception_str = "_StringException"
_remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
except ImportError:
raise ImportError ("testtools.testresult.real does not contain "
"_StringException, check your version.")
from testtools import testresult
import chunked, details, test_results
from subunit import chunked, details, iso8601, test_results
PROGRESS_SET = 0
@ -187,6 +191,19 @@ class _ParserState(object):
def __init__(self, parser):
self.parser = parser
self._test_sym = (_b('test'), _b('testing'))
self._colon_sym = _b(':')
self._error_sym = (_b('error'),)
self._failure_sym = (_b('failure'),)
self._progress_sym = (_b('progress'),)
self._skip_sym = _b('skip')
self._success_sym = (_b('success'), _b('successful'))
self._tags_sym = (_b('tags'),)
self._time_sym = (_b('time'),)
self._xfail_sym = (_b('xfail'),)
self._uxsuccess_sym = (_b('uxsuccess'),)
self._start_simple = _u(" [")
self._start_multipart = _u(" [ multipart")
def addError(self, offset, line):
"""An 'error:' directive has been read."""
@ -214,27 +231,29 @@ class _ParserState(object):
if len(parts) == 2 and line.startswith(parts[0]):
cmd, rest = parts
offset = len(cmd) + 1
cmd = cmd.rstrip(':')
if cmd in ('test', 'testing'):
cmd = cmd.rstrip(self._colon_sym)
if cmd in self._test_sym:
self.startTest(offset, line)
elif cmd == 'error':
elif cmd in self._error_sym:
self.addError(offset, line)
elif cmd == 'failure':
elif cmd in self._failure_sym:
self.addFailure(offset, line)
elif cmd == 'progress':
elif cmd in self._progress_sym:
self.parser._handleProgress(offset, line)
elif cmd == 'skip':
elif cmd in self._skip_sym:
self.addSkip(offset, line)
elif cmd in ('success', 'successful'):
elif cmd in self._success_sym:
self.addSuccess(offset, line)
elif cmd in ('tags',):
elif cmd in self._tags_sym:
self.parser._handleTags(offset, line)
self.parser.subunitLineReceived(line)
elif cmd in ('time',):
elif cmd in self._time_sym:
self.parser._handleTime(offset, line)
self.parser.subunitLineReceived(line)
elif cmd == 'xfail':
elif cmd in self._xfail_sym:
self.addExpectedFail(offset, line)
elif cmd in self._uxsuccess_sym:
self.addUnexpectedSuccess(offset, line)
else:
self.parser.stdOutLineReceived(line)
else:
@ -242,7 +261,7 @@ class _ParserState(object):
def lostConnection(self):
"""Connection lost."""
self.parser._lostConnectionInTest(u'unknown state of ')
self.parser._lostConnectionInTest(_u('unknown state of '))
def startTest(self, offset, line):
"""A test start command received."""
@ -254,24 +273,26 @@ class _InTest(_ParserState):
def _outcome(self, offset, line, no_details, details_state):
"""An outcome directive has been read.
:param no_details: Callable to call when no details are presented.
:param details_state: The state to switch to for details
processing of this outcome.
"""
if self.parser.current_test_description == line[offset:-1]:
test_name = line[offset:-1].decode('utf8')
if self.parser.current_test_description == test_name:
self.parser._state = self.parser._outside_test
self.parser.current_test_description = None
no_details()
self.parser.client.stopTest(self.parser._current_test)
self.parser._current_test = None
self.parser.subunitLineReceived(line)
elif self.parser.current_test_description + " [" == line[offset:-1]:
elif self.parser.current_test_description + self._start_simple == \
test_name:
self.parser._state = details_state
details_state.set_simple()
self.parser.subunitLineReceived(line)
elif self.parser.current_test_description + " [ multipart" == \
line[offset:-1]:
elif self.parser.current_test_description + self._start_multipart == \
test_name:
self.parser._state = details_state
details_state.set_multipart()
self.parser.subunitLineReceived(line)
@ -296,6 +317,14 @@ class _InTest(_ParserState):
self._outcome(offset, line, self._xfail,
self.parser._reading_xfail_details)
def _uxsuccess(self):
self.parser.client.addUnexpectedSuccess(self.parser._current_test)
def addUnexpectedSuccess(self, offset, line):
"""A 'uxsuccess:' directive has been read."""
self._outcome(offset, line, self._uxsuccess,
self.parser._reading_uxsuccess_details)
def _failure(self):
self.parser.client.addFailure(self.parser._current_test, details={})
@ -322,7 +351,7 @@ class _InTest(_ParserState):
def lostConnection(self):
"""Connection lost."""
self.parser._lostConnectionInTest(u'')
self.parser._lostConnectionInTest(_u(''))
class _OutSideTest(_ParserState):
@ -334,8 +363,9 @@ class _OutSideTest(_ParserState):
def startTest(self, offset, line):
"""A test start command received."""
self.parser._state = self.parser._in_test
self.parser._current_test = RemotedTestCase(line[offset:-1])
self.parser.current_test_description = line[offset:-1]
test_name = line[offset:-1].decode('utf8')
self.parser._current_test = RemotedTestCase(test_name)
self.parser.current_test_description = test_name
self.parser.client.startTest(self.parser._current_test)
self.parser.subunitLineReceived(line)
@ -357,7 +387,7 @@ class _ReadingDetails(_ParserState):
def lostConnection(self):
"""Connection lost."""
self.parser._lostConnectionInTest(u'%s report of ' %
self.parser._lostConnectionInTest(_u('%s report of ') %
self._outcome_label())
def _outcome_label(self):
@ -382,7 +412,7 @@ class _ReadingFailureDetails(_ReadingDetails):
def _outcome_label(self):
return "failure"
class _ReadingErrorDetails(_ReadingDetails):
"""State for the subunit parser when reading error details."""
@ -406,6 +436,17 @@ class _ReadingExpectedFailureDetails(_ReadingDetails):
return "xfail"
class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
"""State for the subunit parser when reading uxsuccess details."""
def _report_outcome(self):
self.parser.client.addUnexpectedSuccess(self.parser._current_test,
details=self.details_parser.get_details())
def _outcome_label(self):
return "uxsuccess"
class _ReadingSkipDetails(_ReadingDetails):
"""State for the subunit parser when reading skip details."""
@ -430,7 +471,7 @@ class _ReadingSuccessDetails(_ReadingDetails):
class TestProtocolServer(object):
"""A parser for subunit.
:ivar tags: The current tags associated with the protocol stream.
"""
@ -441,8 +482,8 @@ class TestProtocolServer(object):
:param stream: The stream that lines received which are not part of the
subunit protocol should be written to. This allows custom handling
of mixed protocols. By default, sys.stdout will be used for
convenience.
:param forward_stream: A stream to forward subunit lines to. This
convenience. It should accept bytes to its write() method.
:param forward_stream: A stream to forward subunit lines to. This
allows a filter to forward the entire stream while still parsing
and acting on it. By default forward_stream is set to
DiscardStream() and no forwarding happens.
@ -450,6 +491,8 @@ class TestProtocolServer(object):
self.client = ExtendedToOriginalDecorator(client)
if stream is None:
stream = sys.stdout
if sys.version_info > (3, 0):
stream = stream.buffer
self._stream = stream
self._forward_stream = forward_stream or DiscardStream()
# state objects we can switch too
@ -460,19 +503,24 @@ class TestProtocolServer(object):
self._reading_skip_details = _ReadingSkipDetails(self)
self._reading_success_details = _ReadingSuccessDetails(self)
self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
# start with outside test.
self._state = self._outside_test
# Avoid casts on every call
self._plusminus = _b('+-')
self._push_sym = _b('push')
self._pop_sym = _b('pop')
def _handleProgress(self, offset, line):
"""Process a progress directive."""
line = line[offset:].strip()
if line[0] in '+-':
if line[0] in self._plusminus:
whence = PROGRESS_CUR
delta = int(line)
elif line == "push":
elif line == self._push_sym:
whence = PROGRESS_PUSH
delta = None
elif line == "pop":
elif line == self._pop_sym:
whence = PROGRESS_POP
delta = None
else:
@ -482,7 +530,7 @@ class TestProtocolServer(object):
def _handleTags(self, offset, line):
"""Process a tags command."""
tags = line[offset:].split()
tags = line[offset:].decode('utf8').split()
new_tags, gone_tags = tags_to_new_gone(tags)
self.client.tags(new_tags, gone_tags)
@ -490,8 +538,9 @@ class TestProtocolServer(object):
# Accept it, but do not do anything with it yet.
try:
event_time = iso8601.parse_date(line[offset:-1])
except TypeError, e:
raise TypeError("Failed to parse %r, got %r" % (line, e))
except TypeError:
raise TypeError(_u("Failed to parse %r, got %r")
% (line, sys.exec_info[1]))
self.client.time(event_time)
def lineReceived(self, line):
@ -499,7 +548,7 @@ class TestProtocolServer(object):
self._state.lineReceived(line)
def _lostConnectionInTest(self, state_string):
error_string = u"lost connection during %stest '%s'" % (
error_string = _u("lost connection during %stest '%s'") % (
state_string, self.current_test_description)
self.client.addError(self._current_test, RemoteError(error_string))
self.client.stopTest(self._current_test)
@ -510,7 +559,7 @@ class TestProtocolServer(object):
def readFrom(self, pipe):
"""Blocking convenience API to parse an entire stream.
:param pipe: A file-like object supporting readlines().
:return: None.
"""
@ -531,10 +580,11 @@ class TestProtocolServer(object):
class TestProtocolClient(testresult.TestResult):
"""A TestResult which generates a subunit stream for a test run.
# Get a TestSuite or TestCase to run
suite = make_suite()
# Create a stream (any object with a 'write' method)
# Create a stream (any object with a 'write' method). This should accept
# bytes not strings: subunit is a byte orientated protocol.
stream = file('tests.log', 'wb')
# Create a subunit result object which will output to the stream
result = subunit.TestProtocolClient(stream)
@ -551,10 +601,18 @@ class TestProtocolClient(testresult.TestResult):
testresult.TestResult.__init__(self)
self._stream = stream
_make_stream_binary(stream)
self._progress_fmt = _b("progress: ")
self._bytes_eol = _b("\n")
self._progress_plus = _b("+")
self._progress_push = _b("push")
self._progress_pop = _b("pop")
self._empty_bytes = _b("")
self._start_simple = _b(" [\n")
self._end_simple = _b("]\n")
def addError(self, test, error=None, details=None):
"""Report an error in test test.
Only one of error and details should be provided: conceptually there
are two separate methods:
addError(self, test, error)
@ -569,7 +627,7 @@ class TestProtocolClient(testresult.TestResult):
def addExpectedFailure(self, test, error=None, details=None):
"""Report an expected failure in test test.
Only one of error and details should be provided: conceptually there
are two separate methods:
addError(self, test, error)
@ -584,7 +642,7 @@ class TestProtocolClient(testresult.TestResult):
def addFailure(self, test, error=None, details=None):
"""Report a failure in test test.
Only one of error and details should be provided: conceptually there
are two separate methods:
addFailure(self, test, error)
@ -597,9 +655,10 @@ class TestProtocolClient(testresult.TestResult):
"""
self._addOutcome("failure", test, error=error, details=details)
def _addOutcome(self, outcome, test, error=None, details=None):
def _addOutcome(self, outcome, test, error=None, details=None,
error_permitted=True):
"""Report a failure in test test.
Only one of error and details should be provided: conceptually there
are two separate methods:
addOutcome(self, test, error)
@ -611,43 +670,60 @@ class TestProtocolClient(testresult.TestResult):
exc_info tuple.
:param details: New Testing-in-python drafted API; a dict from string
to subunit.Content objects.
"""
self._stream.write("%s: %s" % (outcome, test.id()))
if error is None and details is None:
raise ValueError
:param error_permitted: If True then one and only one of error or
details must be supplied. If False then error must not be supplied
and details is still optional. """
self._stream.write(_b("%s: %s" % (outcome, test.id())))
if error_permitted:
if error is None and details is None:
raise ValueError
else:
if error is not None:
raise ValueError
if error is not None:
self._stream.write(" [\n")
self._stream.write(self._start_simple)
# XXX: this needs to be made much stricter, along the lines of
# Martin[gz]'s work in testtools. Perhaps subunit can use that?
for line in self._exc_info_to_unicode(error, test).splitlines():
self._stream.write(("%s\n" % line).encode('utf8'))
else:
elif details is not None:
self._write_details(details)
self._stream.write("]\n")
else:
self._stream.write(_b("\n"))
if details is not None or error is not None:
self._stream.write(self._end_simple)
def addSkip(self, test, reason=None, details=None):
"""Report a skipped test."""
if reason is None:
self._addOutcome("skip", test, error=None, details=details)
else:
self._stream.write("skip: %s [\n" % test.id())
self._stream.write("%s\n" % reason)
self._stream.write("]\n")
self._stream.write(_b("skip: %s [\n" % test.id()))
self._stream.write(_b("%s\n" % reason))
self._stream.write(self._end_simple)
def addSuccess(self, test, details=None):
"""Report a success in a test."""
self._stream.write("successful: %s" % test.id())
if not details:
self._stream.write("\n")
else:
self._write_details(details)
self._stream.write("]\n")
addUnexpectedSuccess = addSuccess
self._addOutcome("successful", test, details=details, error_permitted=False)
def addUnexpectedSuccess(self, test, details=None):
"""Report an unexpected success in test test.
Details can optionally be provided: conceptually there
are two separate methods:
addError(self, test)
addError(self, test, details)
:param details: New Testing-in-python drafted API; a dict from string
to subunit.Content objects.
"""
self._addOutcome("uxsuccess", test, details=details,
error_permitted=False)
def startTest(self, test):
"""Mark a test as starting its test run."""
super(TestProtocolClient, self).startTest(test)
self._stream.write("test: %s\n" % test.id())
self._stream.write(_b("test: %s\n" % test.id()))
self._stream.flush()
def stopTest(self, test):
@ -665,16 +741,19 @@ class TestProtocolClient(testresult.TestResult):
PROGRESS_POP.
"""
if whence == PROGRESS_CUR and offset > -1:
prefix = "+"
prefix = self._progress_plus
offset = _b(str(offset))
elif whence == PROGRESS_PUSH:
prefix = ""
offset = "push"
prefix = self._empty_bytes
offset = self._progress_push
elif whence == PROGRESS_POP:
prefix = ""
offset = "pop"
prefix = self._empty_bytes
offset = self._progress_pop
else:
prefix = ""
self._stream.write("progress: %s%s\n" % (prefix, offset))
prefix = self._empty_bytes
offset = _b(str(offset))
self._stream.write(self._progress_fmt + prefix + offset +
self._bytes_eol)
def time(self, a_datetime):
"""Inform the client of the time.
@ -682,42 +761,42 @@ class TestProtocolClient(testresult.TestResult):
":param datetime: A datetime.datetime object.
"""
time = a_datetime.astimezone(iso8601.Utc())
self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
time.year, time.month, time.day, time.hour, time.minute,
time.second, time.microsecond))
time.second, time.microsecond)))
def _write_details(self, details):
"""Output details to the stream.
:param details: An extended details dict for a test outcome.
"""
self._stream.write(" [ multipart\n")
for name, content in sorted(details.iteritems()):
self._stream.write("Content-Type: %s/%s" %
(content.content_type.type, content.content_type.subtype))
self._stream.write(_b(" [ multipart\n"))
for name, content in sorted(details.items()):
self._stream.write(_b("Content-Type: %s/%s" %
(content.content_type.type, content.content_type.subtype)))
parameters = content.content_type.parameters
if parameters:
self._stream.write(";")
self._stream.write(_b(";"))
param_strs = []
for param, value in parameters.iteritems():
for param, value in parameters.items():
param_strs.append("%s=%s" % (param, value))
self._stream.write(",".join(param_strs))
self._stream.write("\n%s\n" % name)
self._stream.write(_b(",".join(param_strs)))
self._stream.write(_b("\n%s\n" % name))
encoder = chunked.Encoder(self._stream)
map(encoder.write, content.iter_bytes())
list(map(encoder.write, content.iter_bytes()))
encoder.close()
def done(self):
"""Obey the testtools result.done() interface."""
def RemoteError(description=u""):
def RemoteError(description=_u("")):
return (_StringException, _StringException(description), None)
class RemotedTestCase(unittest.TestCase):
"""A class to represent test cases run in child processes.
Instances of this class are used to provide the Python test API a TestCase
that can be printed to the screen, introspected for metadata and so on.
However, as they are a simply a memoisation of a test that was actually
@ -761,7 +840,7 @@ class RemotedTestCase(unittest.TestCase):
def run(self, result=None):
if result is None: result = self.defaultTestResult()
result.startTest(self)
result.addError(self, RemoteError(u"Cannot run RemotedTestCases.\n"))
result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
result.stopTest(self)
def _strclass(self):
@ -795,14 +874,16 @@ class ExecTestCase(unittest.TestCase):
def _run(self, result):
protocol = TestProtocolServer(result)
output = subprocess.Popen(self.script, shell=True,
stdout=subprocess.PIPE).communicate()[0]
protocol.readFrom(StringIO(output))
process = subprocess.Popen(self.script, shell=True,
stdout=subprocess.PIPE)
_make_stream_binary(process.stdout)
output = process.communicate()[0]
protocol.readFrom(BytesIO(output))
class IsolatedTestCase(unittest.TestCase):
"""A TestCase which executes in a forked process.
Each test gets its own process, which has a performance overhead but will
provide excellent isolation from global state (such as django configs,
zope utilities and so on).
@ -815,7 +896,7 @@ class IsolatedTestCase(unittest.TestCase):
class IsolatedTestSuite(unittest.TestSuite):
"""A TestSuite which runs its tests in a forked process.
This decorator that will fork() before running the tests and report the
results from the child process using a Subunit stream. This is useful for
handling tests that mutate global state, or are testing C extensions that
@ -846,10 +927,10 @@ def run_isolated(klass, self, result):
# at this point, sys.stdin is redirected, now we want
# to filter it to escape ]'s.
### XXX: test and write that bit.
result = TestProtocolClient(sys.stdout)
stream = os.fdopen(1, 'wb')
result = TestProtocolClient(stream)
klass.run(self, result)
sys.stdout.flush()
stream.flush()
sys.stderr.flush()
# exit HARD, exit NOW.
os._exit(0)
@ -859,7 +940,8 @@ def run_isolated(klass, self, result):
os.close(c2pwrite)
# hookup a protocol engine
protocol = TestProtocolServer(result)
protocol.readFrom(os.fdopen(c2pread, 'rU'))
fileobj = os.fdopen(c2pread, 'rb')
protocol.readFrom(fileobj)
os.waitpid(pid, 0)
# TODO return code evaluation.
return result
@ -867,7 +949,7 @@ def run_isolated(klass, self, result):
def TAP2SubUnit(tap, subunit):
"""Filter a TAP pipe into a subunit pipe.
:param tap: A tap pipe/stream/file object.
:param subunit: A pipe/stream/file object to write subunit results to.
:return: The exit code to exit with.
@ -875,7 +957,6 @@ def TAP2SubUnit(tap, subunit):
BEFORE_PLAN = 0
AFTER_PLAN = 1
SKIP_STREAM = 2
client = TestProtocolClient(subunit)
state = BEFORE_PLAN
plan_start = 1
plan_stop = 0
@ -1025,11 +1106,11 @@ class ProtocolTestCase(object):
that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
and ``countTestCases`` methods are not supported because there isn't a
sensible mapping for those methods.
# Get a stream (any object with a readline() method), in this case the
# stream output by the example from ``subunit.TestProtocolClient``.
stream = file('tests.log', 'rb')
# Create a parser which will read from the stream and emit
# Create a parser which will read from the stream and emit
# activity to a unittest.TestResult when run() is called.
suite = subunit.ProtocolTestCase(stream)
# Create a result object to accept the contents of that stream.
@ -1055,7 +1136,6 @@ class ProtocolTestCase(object):
_make_stream_binary(stream)
self._passthrough = passthrough
self._forward = forward
_make_stream_binary(forward)
def __call__(self, result=None):
return self.run(result)
@ -1073,7 +1153,7 @@ class ProtocolTestCase(object):
class TestResultStats(testresult.TestResult):
"""A pyunit TestResult interface implementation for making statistics.
:ivar total_tests: The total tests seen.
:ivar passed_tests: The tests that passed.
:ivar failed_tests: The tests that failed.
@ -1124,20 +1204,44 @@ class TestResultStats(testresult.TestResult):
def get_default_formatter():
"""Obtain the default formatter to write to.
:return: A file-like object.
"""
formatter = os.getenv("SUBUNIT_FORMATTER")
if formatter:
return os.popen(formatter, "w")
else:
return sys.stdout
stream = sys.stdout
if sys.version_info > (3, 0):
stream = stream.buffer
return stream
if sys.version_info > (3, 0):
from io import UnsupportedOperation as _NoFilenoError
else:
_NoFilenoError = AttributeError
def read_test_list(path):
"""Read a list of test ids from a file on disk.
:param path: Path to the file
:return: Sequence of test ids
"""
f = open(path, 'rb')
try:
return [l.rstrip("\n") for l in f.readlines()]
finally:
f.close()
def _make_stream_binary(stream):
"""Ensure that a stream will be binary safe. See _make_binary_on_windows."""
if getattr(stream, 'fileno', None) is not None:
_make_binary_on_windows(stream.fileno())
try:
fileno = stream.fileno()
except _NoFilenoError:
return
_make_binary_on_windows(fileno)
def _make_binary_on_windows(fileno):
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""

View File

@ -1,12 +1,13 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -16,24 +17,40 @@
"""Encoder/decoder for http style chunked encoding."""
from testtools.compat import _b
empty = _b('')
class Decoder(object):
"""Decode chunked content to a byte stream."""
def __init__(self, output):
def __init__(self, output, strict=True):
"""Create a decoder decoding to output.
:param output: A file-like object. Bytes written to the Decoder are
decoded to strip off the chunking and written to the output.
Up to a full write worth of data or a single control line may be
Up to a full write worth of data or a single control line may be
buffered (whichever is larger). The close method should be called
when no more data is available, to detect short streams; the
write method will return none-None when the end of a stream is
detected.
detected. The output object must accept bytes objects.
:param strict: If True (the default), the decoder will not knowingly
accept input that is not conformant to the HTTP specification.
(This does not imply that it will catch every nonconformance.)
If False, it will accept incorrect input that is still
unambiguous.
"""
self.output = output
self.buffered_bytes = []
self.state = self._read_length
self.body_length = 0
self.strict = strict
self._match_chars = _b("0123456789abcdefABCDEF\r\n")
self._slash_n = _b('\n')
self._slash_r = _b('\r')
self._slash_rn = _b('\r\n')
self._slash_nr = _b('\n\r')
def close(self):
"""Close the decoder.
@ -48,7 +65,7 @@ class Decoder(object):
if self.buffered_bytes:
buffered_bytes = self.buffered_bytes
self.buffered_bytes = []
return ''.join(buffered_bytes)
return empty.join(buffered_bytes)
else:
raise ValueError("stream is finished")
@ -72,22 +89,26 @@ class Decoder(object):
def _read_length(self):
"""Try to decode a length from the bytes."""
count = -1
match_chars = "0123456789abcdefABCDEF\r\n"
count_chars = []
for bytes in self.buffered_bytes:
for byte in bytes:
if byte not in match_chars:
for pos in range(len(bytes)):
byte = bytes[pos:pos+1]
if byte not in self._match_chars:
break
count_chars.append(byte)
if byte == '\n':
if byte == self._slash_n:
break
if not count_chars:
return
if count_chars[-1][-1] != '\n':
if count_chars[-1] != self._slash_n:
return
count_str = ''.join(count_chars)
self.body_length = int(count_str[:-2], 16)
count_str = empty.join(count_chars)
if self.strict:
if count_str[-2:] != self._slash_rn:
raise ValueError("chunk header invalid: %r" % count_str)
if self._slash_r in count_str[:-2]:
raise ValueError("too many CRs in chunk header %r" % count_str)
self.body_length = int(count_str.rstrip(self._slash_nr), 16)
excess_bytes = len(count_str)
while excess_bytes:
if excess_bytes >= len(self.buffered_bytes[0]):
@ -100,14 +121,14 @@ class Decoder(object):
self.state = self._finished
if not self.buffered_bytes:
# May not call into self._finished with no buffered data.
return ''
return empty
else:
self.state = self._read_body
return self.state()
def write(self, bytes):
"""Decode bytes to the output stream.
:raises ValueError: If the stream has already seen the end of file
marker.
:returns: None, or the excess bytes beyond the end of file marker.
@ -133,7 +154,7 @@ class Encoder(object):
def flush(self, extra_len=0):
"""Flush the encoder to the output stream.
:param extra_len: Increase the size of the chunk by this many bytes
to allow for a subsequent write.
"""
@ -143,9 +164,9 @@ class Encoder(object):
buffer_size = self.buffer_size
self.buffered_bytes = []
self.buffer_size = 0
self.output.write("%X\r\n" % (buffer_size + extra_len))
self.output.write(_b("%X\r\n" % (buffer_size + extra_len)))
if buffer_size:
self.output.write(''.join(buffered_bytes))
self.output.write(empty.join(buffered_bytes))
return True
def write(self, bytes):
@ -161,4 +182,4 @@ class Encoder(object):
def close(self):
"""Finish the stream. This does not close the output stream."""
self.flush()
self.output.write("0\r\n")
self.output.write(_b("0\r\n"))

View File

@ -16,11 +16,14 @@
"""Handlers for outcome details."""
from cStringIO import StringIO
from testtools import content, content_type
from testtools.compat import _b, BytesIO
import chunked
from subunit import chunked
end_marker = _b("]\n")
quoted_marker = _b(" ]")
empty = _b('')
class DetailsParser(object):
@ -31,14 +34,14 @@ class SimpleDetailsParser(DetailsParser):
"""Parser for single-part [] delimited details."""
def __init__(self, state):
self._message = ""
self._message = _b("")
self._state = state
def lineReceived(self, line):
if line == "]\n":
if line == end_marker:
self._state.endDetails()
return
if line[0:2] == " ]":
if line[0:2] == quoted_marker:
# quoted ] start
self._message += line[1:]
else:
@ -77,18 +80,21 @@ class MultipartDetailsParser(DetailsParser):
self._parse_state = self._look_for_content
def _look_for_content(self, line):
if line == "]\n":
if line == end_marker:
self._state.endDetails()
return
# TODO error handling
field, value = line[:-1].split(' ', 1)
main, sub = value.split('/')
field, value = line[:-1].decode('utf8').split(' ', 1)
try:
main, sub = value.split('/')
except ValueError:
raise ValueError("Invalid MIME type %r" % value)
self._content_type = content_type.ContentType(main, sub)
self._parse_state = self._get_name
def _get_name(self, line):
self._name = line[:-1]
self._body = StringIO()
self._name = line[:-1].decode('utf8')
self._body = BytesIO()
self._chunk_parser = chunked.Decoder(self._body)
self._parse_state = self._feed_chunks
@ -96,7 +102,7 @@ class MultipartDetailsParser(DetailsParser):
residue = self._chunk_parser.write(line)
if residue is not None:
# Line based use always ends on no residue.
assert residue == '', 'residue: %r' % (residue,)
assert residue == empty, 'residue: %r' % (residue,)
body = self._body
self._details[self._name] = content.Content(
self._content_type, lambda:[body.getvalue()])

View File

@ -31,15 +31,25 @@ datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
from datetime import datetime, timedelta, tzinfo
import re
import sys
__all__ = ["parse_date", "ParseError"]
# Adapted from http://delete.me.uk/2005/03/iso8601.html
ISO8601_REGEX = re.compile(r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})"
ISO8601_REGEX_PATTERN = (r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})"
r"((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?"
r"(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?"
)
TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})")
TIMEZONE_REGEX_PATTERN = "(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})"
ISO8601_REGEX = re.compile(ISO8601_REGEX_PATTERN.encode('utf8'))
TIMEZONE_REGEX = re.compile(TIMEZONE_REGEX_PATTERN.encode('utf8'))
zulu = "Z".encode('latin-1')
minus = "-".encode('latin-1')
if sys.version_info < (3, 0):
bytes = str
class ParseError(Exception):
"""Raised when there is a problem parsing a date string"""
@ -84,7 +94,7 @@ def parse_timezone(tzstring, default_timezone=UTC):
"""Parses ISO 8601 time zone specs into tzinfo offsets
"""
if tzstring == "Z":
if tzstring == zulu:
return default_timezone
# This isn't strictly correct, but it's common to encounter dates without
# timezones so I'll assume the default (which defaults to UTC).
@ -94,7 +104,7 @@ def parse_timezone(tzstring, default_timezone=UTC):
m = TIMEZONE_REGEX.match(tzstring)
prefix, hours, minutes = m.groups()
hours, minutes = int(hours), int(minutes)
if prefix == "-":
if prefix == minus:
hours = -hours
minutes = -minutes
return FixedOffset(hours, minutes, tzstring)
@ -107,8 +117,8 @@ def parse_date(datestring, default_timezone=UTC):
default timezone specified in default_timezone is used. This is UTC by
default.
"""
if not isinstance(datestring, basestring):
raise ParseError("Expecting a string %r" % datestring)
if not isinstance(datestring, bytes):
raise ParseError("Expecting bytes %r" % datestring)
m = ISO8601_REGEX.match(datestring)
if not m:
raise ParseError("Unable to parse date string %r" % datestring)

View File

@ -49,7 +49,7 @@ class SubunitTestProgram(TestProgram):
def usageExit(self, msg=None):
if msg:
print msg
print (msg)
usage = {'progName': self.progName, 'catchbreak': '', 'failfast': '',
'buffer': ''}
if self.failfast != False:

View File

@ -18,9 +18,10 @@
import datetime
import iso8601
import testtools
from subunit import iso8601
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
@ -81,8 +82,12 @@ class TestResultDecorator(object):
def stop(self):
return self.decorated.stop()
@property
def testsRun(self):
return self.decorated.testsRun
def tags(self, new_tags, gone_tags):
return self.decorated.time(new_tags, gone_tags)
return self.decorated.tags(new_tags, gone_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
class TagCollapsingDecorator(TestResultDecorator):
"""Collapses many 'tags' calls into one where possible."""
def __init__(self, result):
super(TagCollapsingDecorator, self).__init__(result)
# The (new, gone) tags for the current test.
self._current_test_tags = None
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
self.decorated.startTest(test)
self._current_test_tags = set(), set()
def stopTest(self, test):
"""Stop a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
# Tags to output for this test.
if self._current_test_tags[0] or self._current_test_tags[1]:
self.decorated.tags(*self._current_test_tags)
self.decorated.stopTest(test)
self._current_test_tags = None
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
Adds and removes tags as appropriate. If a test is currently running,
tags are not affected for subsequent tests.
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
if self._current_test_tags is not None:
# gather the tags until the test stops.
self._current_test_tags[0].update(new_tags)
self._current_test_tags[0].difference_update(gone_tags)
self._current_test_tags[1].update(gone_tags)
self._current_test_tags[1].difference_update(new_tags)
else:
return self.decorated.tags(new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
"""Only pass on the first and last of a consecutive sequence of times."""
def __init__(self, decorated):
super(TimeCollapsingDecorator, self).__init__(decorated)
self._last_received_time = None
self._last_sent_time = None
def _before_event(self):
if self._last_received_time is None:
return
if self._last_received_time != self._last_sent_time:
self.decorated.time(self._last_received_time)
self._last_sent_time = self._last_received_time
self._last_received_time = None
def time(self, a_time):
# Don't upcall, because we don't want to call _before_event, it's only
# for non-time events.
if self._last_received_time is None:
self.decorated.time(a_time)
self._last_sent_time = a_time
self._last_received_time = a_time
def all_true(bools):
"""Return True if all of 'bools' are True. False otherwise."""
for b in bools:
if not b:
return False
return True
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
@ -208,82 +294,110 @@ class TestResultFilter(TestResultDecorator):
"""
def __init__(self, result, filter_error=False, filter_failure=False,
filter_success=True, filter_skip=False,
filter_predicate=None):
filter_success=True, filter_skip=False, filter_xfail=False,
filter_predicate=None, fixup_expected_failures=None):
"""Create a FilterResult object filtering to result.
:param filter_error: Filter out errors.
:param filter_failure: Filter out failures.
:param filter_success: Filter out successful tests.
:param filter_skip: Filter out skipped tests.
:param filter_xfail: Filter out expected failure tests.
:param filter_predicate: A callable taking (test, outcome, err,
details) and returning True if the result should be passed
through. err and details may be none if no error or extra
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
:param fixup_expected_failures: Set of test ids to consider known
failing.
"""
TestResultDecorator.__init__(self, result)
self._filter_error = filter_error
self._filter_failure = filter_failure
self._filter_success = filter_success
self._filter_skip = filter_skip
if filter_predicate is None:
filter_predicate = lambda test, outcome, err, details: True
self.filter_predicate = filter_predicate
super(TestResultFilter, self).__init__(result)
self.decorated = TimeCollapsingDecorator(
TagCollapsingDecorator(self.decorated))
predicates = []
if filter_error:
predicates.append(lambda t, outcome, e, d: outcome != 'error')
if filter_failure:
predicates.append(lambda t, outcome, e, d: outcome != 'failure')
if filter_success:
predicates.append(lambda t, outcome, e, d: outcome != 'success')
if filter_skip:
predicates.append(lambda t, outcome, e, d: outcome != 'skip')
if filter_xfail:
predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
if filter_predicate is not None:
predicates.append(filter_predicate)
self.filter_predicate = (
lambda test, outcome, err, details:
all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# The (new, gone) tags for the current test.
self._current_test_tags = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
if fixup_expected_failures is None:
self._fixup_expected_failures = frozenset()
else:
self._fixup_expected_failures = fixup_expected_failures
def addError(self, test, err=None, details=None):
if (not self._filter_error and
self.filter_predicate(test, 'error', err, details)):
self.decorated.startTest(test)
self.decorated.addError(test, err, details=details)
if (self.filter_predicate(test, 'error', err, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._buffered_calls.append(
('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
if (not self._filter_failure and
self.filter_predicate(test, 'failure', err, details)):
self.decorated.startTest(test)
self.decorated.addFailure(test, err, details=details)
if (self.filter_predicate(test, 'failure', err, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._buffered_calls.append(
('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
if (not self._filter_skip and
self.filter_predicate(test, 'skip', reason, details)):
self.decorated.startTest(test)
self.decorated.addSkip(test, reason, details=details)
if (self.filter_predicate(test, 'skip', reason, details)):
self._buffered_calls.append(
('addSkip', [test, reason], {'details': details}))
else:
self._filtered()
def addSuccess(self, test, details=None):
if (not self._filter_success and
self.filter_predicate(test, 'success', None, details)):
self.decorated.startTest(test)
self.decorated.addSuccess(test, details=details)
if (self.filter_predicate(test, 'success', None, details)):
if self._failure_expected(test):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
else:
self._buffered_calls.append(
('addSuccess', [test], {'details': details}))
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self.decorated.startTest(test)
return self.decorated.addExpectedFailure(test, err,
details=details)
self._buffered_calls.append(
('addExpectedFailure', [test, err], {'details': details}))
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
self.decorated.startTest(test)
return self.decorated.addUnexpectedSuccess(test, details=details)
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
def _filtered(self):
self._current_test_filtered = True
def _failure_expected(self, test):
return (test.id() in self._fixup_expected_failures)
def startTest(self, test):
"""Start a test.
@ -292,7 +406,7 @@ class TestResultFilter(TestResultDecorator):
"""
self._current_test = test
self._current_test_filtered = False
self._current_test_tags = set(), set()
self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
"""Stop a test.
@ -302,29 +416,18 @@ class TestResultFilter(TestResultDecorator):
"""
if not self._current_test_filtered:
# Tags to output for this test.
if self._current_test_tags[0] or self._current_test_tags[1]:
self.decorated.tags(*self._current_test_tags)
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._current_test_tags = None
self._buffered_calls = []
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
Adds and removes tags as appropriate. If a test is currently running,
tags are not affected for subsequent tests.
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
def time(self, a_time):
if self._current_test is not None:
# gather the tags until the test stops.
self._current_test_tags[0].update(new_tags)
self._current_test_tags[0].difference_update(gone_tags)
self._current_test_tags[1].update(gone_tags)
self._current_test_tags[1].difference_update(new_tags)
return self.decorated.tags(new_tags, gone_tags)
self._buffered_calls.append(('time', [a_time], {}))
else:
return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
@ -336,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
"""Create a FilterResult object outputting to stream."""
testtools.TestResult.__init__(self)
super(TestIdPrintingResult, self).__init__()
self._stream = stream
self.failed_tests = 0
self.__time = 0
self.__time = None
self.show_times = show_times
self._test = None
self._test_duration = 0
@ -355,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult):
def addSuccess(self, test):
self._test = test
def addSkip(self, test, reason=None, details=None):
self._test = test
def addUnexpectedSuccess(self, test, details=None):
self.failed_tests += 1
self._test = test
def addExpectedFailure(self, test, err=None, details=None):
self._test = test
def reportTest(self, test, duration):
if self.show_times:
seconds = duration.seconds

View File

@ -53,7 +53,7 @@ def visitTests(suite, visitor):
visitor.visitSuite(test)
visitTests(test, visitor)
else:
print "unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__)
print ("unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__))
class TestSuite(unittest.TestSuite):

View File

@ -1,5 +1,8 @@
#!/usr/bin/env python
import sys
if sys.platform == "win32":
import msvcrt, os
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to

View File

@ -1,6 +1,7 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@ -14,9 +15,10 @@
# limitations under that license.
#
from cStringIO import StringIO
import unittest
from testtools.compat import _b, BytesIO
import subunit.chunked
@ -30,98 +32,121 @@ class TestDecode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = StringIO()
self.output = BytesIO()
self.decoder = subunit.chunked.Decoder(self.output)
def test_close_read_length_short_errors(self):
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_short_errors(self):
self.assertEqual(None, self.decoder.write('2\r\na'))
self.assertEqual(None, self.decoder.write(_b('2\r\na')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_buffered_data_errors(self):
self.assertEqual(None, self.decoder.write('2\r'))
self.assertEqual(None, self.decoder.write(_b('2\r')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_after_finished_stream_safe(self):
self.assertEqual(None, self.decoder.write('2\r\nab'))
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.decoder.close()
def test_decode_nothing(self):
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('', self.output.getvalue())
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.assertEqual(_b(''), self.output.getvalue())
def test_decode_serialised_form(self):
self.assertEqual(None, self.decoder.write("F\r\n"))
self.assertEqual(None, self.decoder.write("serialised\n"))
self.assertEqual('', self.decoder.write("form0\r\n"))
self.assertEqual(None, self.decoder.write(_b("F\r\n")))
self.assertEqual(None, self.decoder.write(_b("serialised\n")))
self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))
def test_decode_short(self):
self.assertEqual('', self.decoder.write('3\r\nabc0\r\n'))
self.assertEqual('abc', self.output.getvalue())
self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_combines_short(self):
self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n'))
self.assertEqual('abcdef', self.output.getvalue())
self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
self.assertEqual(_b('abcdef'), self.output.getvalue())
def test_decode_excess_bytes_from_write(self):
self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
self.assertEqual('abc', self.output.getvalue())
self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_write_after_finished_errors(self):
self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
self.assertRaises(ValueError, self.decoder.write, '')
self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
self.assertRaises(ValueError, self.decoder.write, _b(''))
def test_decode_hex(self):
self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n'))
self.assertEqual('1234567890', self.output.getvalue())
self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
self.assertEqual(_b('1234567890'), self.output.getvalue())
def test_decode_long_ranges(self):
self.assertEqual(None, self.decoder.write('10000\r\n'))
self.assertEqual(None, self.decoder.write('1' * 65536))
self.assertEqual(None, self.decoder.write('10000\r\n'))
self.assertEqual(None, self.decoder.write('2' * 65536))
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())
def test_decode_newline_nonstrict(self):
"""Tolerate chunk markers with no CR character."""
# From <http://pad.lv/505078>
self.decoder = subunit.chunked.Decoder(self.output, strict=False)
self.assertEqual(None, self.decoder.write(_b('a\n')))
self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
self.assertEqual(_b('abcdeabcde'), self.output.getvalue())
def test_decode_strict_newline_only(self):
"""Reject chunk markers with no CR character in strict mode."""
# From <http://pad.lv/505078>
self.assertRaises(ValueError,
self.decoder.write, _b('a\n'))
def test_decode_strict_multiple_crs(self):
self.assertRaises(ValueError,
self.decoder.write, _b('a\r\r\n'))
def test_decode_short_header(self):
self.assertRaises(ValueError,
self.decoder.write, _b('\n'))
class TestEncode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = StringIO()
self.output = BytesIO()
self.encoder = subunit.chunked.Encoder(self.output)
def test_encode_nothing(self):
self.encoder.close()
self.assertEqual('0\r\n', self.output.getvalue())
self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_empty(self):
self.encoder.write('')
self.encoder.write(_b(''))
self.encoder.close()
self.assertEqual('0\r\n', self.output.getvalue())
self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_short(self):
self.encoder.write('abc')
self.encoder.write(_b('abc'))
self.encoder.close()
self.assertEqual('3\r\nabc0\r\n', self.output.getvalue())
self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())
def test_encode_combines_short(self):
self.encoder.write('abc')
self.encoder.write('def')
self.encoder.write(_b('abc'))
self.encoder.write(_b('def'))
self.encoder.close()
self.assertEqual('6\r\nabcdef0\r\n', self.output.getvalue())
self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())
def test_encode_over_9_is_in_hex(self):
self.encoder.write('1234567890')
self.encoder.write(_b('1234567890'))
self.encoder.close()
self.assertEqual('A\r\n12345678900\r\n', self.output.getvalue())
self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())
def test_encode_long_ranges_not_combined(self):
self.encoder.write('1' * 65536)
self.encoder.write('2' * 65536)
self.encoder.write(_b('1' * 65536))
self.encoder.write(_b('2' * 65536))
self.encoder.close()
self.assertEqual('10000\r\n' + '1' * 65536 + '10000\r\n' +
'2' * 65536 + '0\r\n', self.output.getvalue())
self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
'2' * 65536 + '0\r\n'), self.output.getvalue())

View File

@ -14,9 +14,10 @@
# limitations under that license.
#
from cStringIO import StringIO
import unittest
from testtools.compat import _b, StringIO
import subunit.tests
from subunit import content, content_type, details
@ -31,20 +32,20 @@ class TestSimpleDetails(unittest.TestCase):
def test_lineReceived(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived("foo\n")
parser.lineReceived("bar\n")
self.assertEqual("foo\nbar\n", parser._message)
parser.lineReceived(_b("foo\n"))
parser.lineReceived(_b("bar\n"))
self.assertEqual(_b("foo\nbar\n"), parser._message)
def test_lineReceived_escaped_bracket(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived("foo\n")
parser.lineReceived(" ]are\n")
parser.lineReceived("bar\n")
self.assertEqual("foo\n]are\nbar\n", parser._message)
parser.lineReceived(_b("foo\n"))
parser.lineReceived(_b(" ]are\n"))
parser.lineReceived(_b("bar\n"))
self.assertEqual(_b("foo\n]are\nbar\n"), parser._message)
def test_get_message(self):
parser = details.SimpleDetailsParser(None)
self.assertEqual("", parser.get_message())
self.assertEqual(_b(""), parser.get_message())
def test_get_details(self):
parser = details.SimpleDetailsParser(None)
@ -53,13 +54,13 @@ class TestSimpleDetails(unittest.TestCase):
expected['traceback'] = content.Content(
content_type.ContentType("text", "x-traceback",
{'charset': 'utf8'}),
lambda:[""])
lambda:[_b("")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['traceback'].content_type,
found['traceback'].content_type)
self.assertEqual(''.join(expected['traceback'].iter_bytes()),
''.join(found['traceback'].iter_bytes()))
self.assertEqual(_b('').join(expected['traceback'].iter_bytes()),
_b('').join(found['traceback'].iter_bytes()))
def test_get_details_skip(self):
parser = details.SimpleDetailsParser(None)
@ -67,7 +68,7 @@ class TestSimpleDetails(unittest.TestCase):
expected = {}
expected['reason'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[""])
lambda:[_b("")])
found = parser.get_details("skip")
self.assertEqual(expected, found)
@ -77,7 +78,7 @@ class TestSimpleDetails(unittest.TestCase):
expected = {}
expected['message'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[""])
lambda:[_b("")])
found = parser.get_details("success")
self.assertEqual(expected, found)
@ -94,18 +95,18 @@ class TestMultipartDetails(unittest.TestCase):
def test_parts(self):
parser = details.MultipartDetailsParser(None)
parser.lineReceived("Content-Type: text/plain\n")
parser.lineReceived("something\n")
parser.lineReceived("F\r\n")
parser.lineReceived("serialised\n")
parser.lineReceived("form0\r\n")
parser.lineReceived(_b("Content-Type: text/plain\n"))
parser.lineReceived(_b("something\n"))
parser.lineReceived(_b("F\r\n"))
parser.lineReceived(_b("serialised\n"))
parser.lineReceived(_b("form0\r\n"))
expected = {}
expected['something'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:["serialised\nform"])
lambda:[_b("serialised\nform")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['something'].content_type,
found['something'].content_type)
self.assertEqual(''.join(expected['something'].iter_bytes()),
''.join(found['something'].iter_bytes()))
self.assertEqual(_b('').join(expected['something'].iter_bytes()),
_b('').join(found['something'].iter_bytes()))

View File

@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -16,103 +16,27 @@
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
from subunit import iso8601
import unittest
from StringIO import StringIO
from testtools import TestCase
from testtools.compat import _b, BytesIO, StringIO
from testtools.testresult.doubles import ExtendedTestResult
import subunit
from subunit.test_results import TestResultFilter
class TestTestResultFilter(unittest.TestCase):
class TestTestResultFilter(TestCase):
"""Test for TestResultFilter, a TestResult object which filters tests."""
def _setUp(self):
self.output = StringIO()
def test_default(self):
"""The default is to exclude success and include everything else."""
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result)
self.run_tests()
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(4, self.filtered_result.testsRun)
def test_exclude_errors(self):
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_error=True)
self.run_tests()
# skips are seen as errors by default python TestResult.
self.assertEqual([], self.filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_failure(self):
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_failure=True)
self.run_tests()
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_skips(self):
self.filtered_result = subunit.TestResultStats(None)
self.filter = TestResultFilter(self.filtered_result,
filter_skip=True)
self.run_tests()
self.assertEqual(0, self.filtered_result.skipped_tests)
self.assertEqual(2, self.filtered_result.failed_tests)
self.assertEqual(3, self.filtered_result.testsRun)
def test_include_success(self):
"""Success's can be included if requested."""
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_success=False)
self.run_tests()
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(5, self.filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
self.filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
self.filter = TestResultFilter(self.filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests()
# Only success should pass
self.assertEqual(1, self.filtered_result.testsRun)
def run_tests(self):
self.setUpTestStream()
self.test = subunit.ProtocolTestCase(self.input_stream)
self.test.run(self.filter)
def setUpTestStream(self):
# While TestResultFilter works on python objects, using a subunit
# stream is an easy pithy way of getting a series of test objects to
# call into the TestResult, and as TestResultFilter is intended for
# use with subunit also has the benefit of detecting any interface
# skew issues.
self.input_stream = StringIO()
self.input_stream.write("""tags: global
# While TestResultFilter works on python objects, using a subunit stream
# is an easy pithy way of getting a series of test objects to call into
# the TestResult, and as TestResultFilter is intended for use with subunit
# also has the benefit of detecting any interface skew issues.
example_subunit_stream = _b("""\
tags: global
test passed
success passed
test failed
@ -127,8 +51,156 @@ skip skipped
test todo
xfail todo
""")
self.input_stream.seek(0)
def run_tests(self, result_filter, input_stream=None):
"""Run tests through the given filter.
:param result_filter: A filtering TestResult object.
:param input_stream: Bytes of subunit stream data. If not provided,
uses TestTestResultFilter.example_subunit_stream.
"""
if input_stream is None:
input_stream = self.example_subunit_stream
test = subunit.ProtocolTestCase(BytesIO(input_stream))
test.run(result_filter)
def test_default(self):
"""The default is to exclude success and include everything else."""
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result)
self.run_tests(result_filter)
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
self.run_tests(result_filter)
# skips are seen as errors by default python TestResult.
self.assertEqual([], filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(3, filtered_result.testsRun)
def test_fixup_expected_failures(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
fixup_expected_failures=set(["failed"]))
self.run_tests(result_filter)
self.assertEqual(['failed', 'todo'],
[failure[0].id() for failure in filtered_result.expectedFailures])
self.assertEqual([], filtered_result.failures)
self.assertEqual(4, filtered_result.testsRun)
def test_fixup_expected_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
fixup_expected_failures=set(["error"]))
self.run_tests(result_filter)
self.assertEqual(['error', 'todo'],
[failure[0].id() for failure in filtered_result.expectedFailures])
self.assertEqual([], filtered_result.errors)
self.assertEqual(4, filtered_result.testsRun)
def test_fixup_unexpected_success(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_success=False,
fixup_expected_failures=set(["passed"]))
self.run_tests(result_filter)
self.assertEqual(['passed'],
[passed.id() for passed in filtered_result.unexpectedSuccesses])
self.assertEqual(5, filtered_result.testsRun)
def test_exclude_failure(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_failure=True)
self.run_tests(result_filter)
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(3, filtered_result.testsRun)
def test_exclude_skips(self):
filtered_result = subunit.TestResultStats(None)
result_filter = TestResultFilter(filtered_result, filter_skip=True)
self.run_tests(result_filter)
self.assertEqual(0, filtered_result.skipped_tests)
self.assertEqual(2, filtered_result.failed_tests)
self.assertEqual(3, filtered_result.testsRun)
def test_include_success(self):
"""Successes can be included if requested."""
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result,
filter_success=False)
self.run_tests(result_filter)
self.assertEqual(['error'],
[error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
filtered_result.failures])
self.assertEqual(5, filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests(result_filter)
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
# directives that are still included.
date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
subunit_stream = _b('\n'.join([
"time: %s",
"test: foo",
"time: %s",
"error: foo",
"time: %s",
""]) % (date_a, date_b, date_c))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
[('time', date_a),
('startTest', foo),
('time', date_b),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
def test_skip_preserved(self):
subunit_stream = _b('\n'.join([
"test: foo",
"skip: foo",
""]))
result = ExtendedTestResult()
result_filter = TestResultFilter(result)
self.run_tests(result_filter, subunit_stream)
foo = subunit.RemotedTestCase('foo')
self.assertEquals(
[('startTest', foo),
('addSkip', foo, {}),
('stopTest', foo), ], result._events)
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()

View File

@ -17,7 +17,8 @@
"""Tests for subunit.TestResultStats."""
import unittest
from StringIO import StringIO
from testtools.compat import _b, BytesIO, StringIO
import subunit
@ -28,7 +29,7 @@ class TestTestResultStats(unittest.TestCase):
def setUp(self):
self.output = StringIO()
self.result = subunit.TestResultStats(self.output)
self.input_stream = StringIO()
self.input_stream = BytesIO()
self.test = subunit.ProtocolTestCase(self.input_stream)
def test_stats_empty(self):
@ -39,7 +40,7 @@ class TestTestResultStats(unittest.TestCase):
self.assertEqual(set(), self.result.seen_tags)
def setUpUsedStream(self):
self.input_stream.write("""tags: global
self.input_stream.write(_b("""tags: global
test passed
success passed
test failed
@ -51,7 +52,7 @@ test skipped
skip skipped
test todo
xfail todo
""")
"""))
self.input_stream.seek(0)
self.test.run(self.result)

View File

@ -17,7 +17,8 @@
"""Tests for subunit.tag_stream."""
import unittest
from StringIO import StringIO
from testtools.compat import StringIO
import subunit
import subunit.test_results

View File

@ -17,7 +17,9 @@
"""Tests for TAP2SubUnit."""
import unittest
from StringIO import StringIO
from testtools.compat import StringIO
import subunit

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -16,12 +16,9 @@
import datetime
import unittest
from StringIO import StringIO
import os
import sys
from testtools.content_type import ContentType
from testtools.content import Content
from testtools import TestCase
from testtools.testresult.doubles import ExtendedTestResult
import subunit
import subunit.iso8601 as iso8601
@ -82,22 +79,22 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_startTest(self):
self.result.startTest(self)
def test_startTestRun(self):
self.result.startTestRun()
def test_stopTest(self):
self.result.stopTest(self)
def test_stopTestRun(self):
self.result.stopTestRun()
def test_addError(self):
self.result.addError(self, subunit.RemoteError())
def test_addError_details(self):
self.result.addError(self, details={})
def test_addFailure(self):
self.result.addFailure(self, subunit.RemoteError())
@ -142,7 +139,7 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_time(self):
self.result.time(None)
class TestAutoTimingTestResultDecorator(unittest.TestCase):
@ -193,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
self.assertNotEqual(None, self.decorated._calls[2])
class TestTagCollapsingDecorator(TestCase):
def test_tags_forwarded_outside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
tag_collapser.tags(set(['a', 'b']), set())
self.assertEquals(
[('tags', set(['a', 'b']), set([]))], result._events)
def test_tags_collapsed_inside_of_tests(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(['a']), set())
tag_collapser.tags(set(['b']), set(['a']))
tag_collapser.tags(set(['c']), set())
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['b', 'c']), set(['a'])),
('stopTest', test)],
result._events)
def test_tags_collapsed_inside_of_tests_different_ordering(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
test = subunit.RemotedTestCase('foo')
tag_collapser.startTest(test)
tag_collapser.tags(set(), set(['a']))
tag_collapser.tags(set(['a', 'b']), set())
tag_collapser.tags(set(['c']), set())
tag_collapser.stopTest(test)
self.assertEquals(
[('startTest', test),
('tags', set(['a', 'b', 'c']), set()),
('stopTest', test)],
result._events)
class TestTimeCollapsingDecorator(TestCase):
def make_time(self):
# Heh heh.
return datetime.datetime(
2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
def test_initial_time_forwarded(self):
# We always forward the first time event we see.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
self.assertEquals([('time', a_time)], result._events)
def test_time_collapsed_to_first_and_last(self):
# If there are many consecutive time events, only the first and last
# are sent through.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
times = [self.make_time() for i in range(5)]
for a_time in times:
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals(
[('time', times[0]), ('time', times[-1])], result._events[:-1])
def test_only_one_time_sent(self):
# If we receive a single time event followed by a non-time event, we
# send exactly one time event.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals([('time', a_time)], result._events[:-1])
def test_duplicate_times_not_sent(self):
# Many time events with the exact same time are collapsed into one
# time event.
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
for i in range(5):
tag_collapser.time(a_time)
tag_collapser.startTest(subunit.RemotedTestCase('foo'))
self.assertEquals([('time', a_time)], result._events[:-1])
def test_no_times_inserted(self):
result = ExtendedTestResult()
tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
a_time = self.make_time()
tag_collapser.time(a_time)
foo = subunit.RemotedTestCase('foo')
tag_collapser.startTest(foo)
tag_collapser.addSuccess(foo)
tag_collapser.stopTest(foo)
self.assertEquals(
[('time', a_time),
('startTest', foo),
('addSuccess', foo),
('stopTest', foo)], result._events)
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)