1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-10 01:18:15 +03:00

testtools: Update to latest upstream version.

This commit is contained in:
Jelmer Vernooij 2012-12-26 22:11:04 +01:00
parent 24957527e0
commit 983a3ea437
19 changed files with 504 additions and 64 deletions

View File

@ -16,6 +16,7 @@ The testtools authors are:
* Christian Kampka
* Gavin Panella
* Martin Pool
* Vincent Ladeuil
and are collectively referred to as "testtools developers".

View File

@ -6,6 +6,58 @@ Changes and improvements to testtools_, grouped by release.
NEXT
~~~~
0.9.24
~~~~~~
Changes
-------
* ``testtools.run discover`` will now sort the tests it discovered. This is a
workaround for http://bugs.python.org/issue16709. Non-standard test suites
are preserved, and their ``sort_tests()`` method called (if they have such an
attribute). ``testtools.testsuite.sorted_tests(suite, True)`` can be used by
such suites to do a local sort. (Robert Collins, #1091512)
* ``ThreadsafeForwardingResult`` now defines a stub ``progress`` method, which
fixes ``testr run`` of streams containing progress markers (by discarding the
progress data). (Robert Collins, #1019165)
0.9.23
~~~~~~
Changes
-------
* ``run.TestToolsTestRunner`` now accepts the verbosity, buffer and failfast
arguments the upstream python TestProgram code wants to give it, making it
possible to support them in a compatible fashion. (Robert Collins)
Improvements
------------
* ``testtools.run`` now supports the ``-f`` or ``--failfast`` parameter.
Previously it was advertised in the help but ignored.
(Robert Collins, #1090582)
* ``AnyMatch`` added, a new matcher that matches when any item in a collection
matches the given matcher. (Jonathan Lange)
* Spelling corrections to documentation. (Vincent Ladeuil)
* ``TestProgram`` now has a sane default for its ``testRunner`` argument.
(Vincent Ladeuil)
* The test suite passes on Python 3 again. (Robert Collins)
0.9.22
~~~~~~
Improvements
------------
* ``content_from_file`` and ``content_from_stream`` now accept seek_offset and
seek_whence parameters allowing them to be used to grab less than the full
stream, or to be used with StringIO streams. (Robert Collins, #1088693)
0.9.21
~~~~~~

View File

@ -222,6 +222,17 @@ A test suite that sets up a fixture_ before running any tests, and then tears
it down after all of the tests are run. The fixture is *not* made available to
any of the tests.
sorted_tests
------------
Given the composite structure of TestSuite / TestCase, sorting tests is
problematic - you can't tell what functionality is embedded into custom Suite
implementations. In order to deliver consistent test orders when using test
discovery (see http://bugs.python.org/issue16709), testtools flattens and
sorts tests that have the standard TestSuite, defines a new method sort_tests,
which can be used by non-standard TestSuites to know when they should sort
their tests.
.. _`testtools API docs`: http://mumak.net/testtools/apidocs/
.. _unittest: http://docs.python.org/library/unittest.html
.. _fixture: http://pypi.python.org/pypi/fixtures

View File

@ -432,7 +432,7 @@ example::
def test_keys_equal(self):
x = {'a': 1, 'b': 2}
y = {'a': 2, 'b': 3}
self.assertThat(a, KeysEqual(b))
self.assertThat(x, KeysEqual(y))
MatchesRegex

View File

@ -71,7 +71,9 @@ setup(name='testtools',
'framework'),
long_description=get_long_description(),
version=get_version(),
classifiers=["License :: OSI Approved :: MIT License"],
classifiers=["License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
],
packages=[
'testtools',
'testtools.matchers',

View File

@ -86,4 +86,4 @@ from testtools.distutilscmd import (
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
__version__ = (0, 9, 22, 'dev', 0)
__version__ = (0, 9, 25, 'dev', 0)

View File

@ -33,12 +33,16 @@ STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
def _iter_chunks(stream, chunk_size):
def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
"""Read 'stream' in chunks of 'chunk_size'.
:param stream: A file-like object to read from.
:param chunk_size: The size of each read from 'stream'.
:param seek_offset: If non-None, seek before iterating.
:param seek_whence: Pass through to the seek call, if seeking.
"""
if seek_offset is not None:
stream.seek(seek_offset, seek_whence)
chunk = stream.read(chunk_size)
while chunk:
yield chunk
@ -215,7 +219,7 @@ def maybe_wrap(wrapper, func):
def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
buffer_now=False):
buffer_now=False, seek_offset=None, seek_whence=0):
"""Create a `Content` object from a file on disk.
Note that unless 'read_now' is explicitly passed in as True, the file
@ -228,6 +232,8 @@ def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
Defaults to ``DEFAULT_CHUNK_SIZE``.
:param buffer_now: If True, read the file from disk now and keep it in
memory. Otherwise, only read when the content is serialized.
:param seek_offset: If non-None, seek within the stream before reading it.
:param seek_whence: If supplied, pass to stream.seek() when seeking.
"""
if content_type is None:
content_type = UTF8_TEXT
@ -236,14 +242,15 @@ def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
# We drop older python support we can make this use a context manager
# for maximum simplicity.
stream = open(path, 'rb')
for chunk in _iter_chunks(stream, chunk_size):
for chunk in _iter_chunks(stream, chunk_size, seek_offset, seek_whence):
yield chunk
stream.close()
return content_from_reader(reader, content_type, buffer_now)
def content_from_stream(stream, content_type=None,
chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False):
chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False,
seek_offset=None, seek_whence=0):
"""Create a `Content` object from a file-like stream.
Note that the stream will only be read from when ``iter_bytes`` is
@ -257,10 +264,12 @@ def content_from_stream(stream, content_type=None,
Defaults to ``DEFAULT_CHUNK_SIZE``.
:param buffer_now: If True, reads from the stream right now. Otherwise,
only reads when the content is serialized. Defaults to False.
:param seek_offset: If non-None, seek within the stream before reading it.
:param seek_whence: If supplied, pass to stream.seek() when seeking.
"""
if content_type is None:
content_type = UTF8_TEXT
reader = lambda: _iter_chunks(stream, chunk_size)
reader = lambda: _iter_chunks(stream, chunk_size, seek_offset, seek_whence)
return content_from_reader(reader, content_type, buffer_now)

View File

@ -236,6 +236,26 @@ class AllMatch(object):
return MismatchesAll(mismatches)
class AnyMatch(object):
"""Matches if any of the provided values match the given matcher."""
def __init__(self, matcher):
self.matcher = matcher
def __str__(self):
return 'AnyMatch(%s)' % (self.matcher,)
def match(self, values):
mismatches = []
for value in values:
mismatch = self.matcher.match(value)
if mismatch:
mismatches.append(mismatch)
else:
return None
return MismatchesAll(mismatches)
class MatchesPredicate(Matcher):
"""Match if a given function returns True.

View File

@ -14,7 +14,7 @@ import sys
from testtools import TextTestResult
from testtools.compat import classtypes, istext, unicode_output_stream
from testtools.testsuite import iterate_tests
from testtools.testsuite import iterate_tests, sorted_tests
defaultTestLoader = unittest.defaultTestLoader
@ -35,12 +35,19 @@ else:
class TestToolsTestRunner(object):
""" A thunk object to support unittest.TestProgram."""
def __init__(self, stdout):
self.stdout = stdout
def __init__(self, verbosity=None, failfast=None, buffer=None):
"""Create a TestToolsTestRunner.
:param verbosity: Ignored.
:param failfast: Stop running tests at the first failure.
:param buffer: Ignored.
"""
self.failfast = failfast
def run(self, test):
"Run the given test case or test suite."
result = TextTestResult(unicode_output_stream(self.stdout))
result = TextTestResult(
unicode_output_stream(sys.stdout), failfast=self.failfast)
result.startTestRun()
try:
return test.run(result)
@ -68,6 +75,8 @@ class TestToolsTestRunner(object):
# - --load-list has been added which can reduce the tests used (should be
# upstreamed).
# - The limitation of using getopt is declared to the user.
# - http://bugs.python.org/issue16709 is worked around, by sorting tests when
# discover is used.
FAILFAST = " -f, --failfast Stop on first failure\n"
CATCHBREAK = " -c, --catch Catch control-C and display results\n"
@ -300,14 +309,24 @@ class TestProgram(object):
top_level_dir = options.top
loader = Loader()
self.test = loader.discover(start_dir, pattern, top_level_dir)
# See http://bugs.python.org/issue16709
# While sorting here is intrusive, its better than being random.
# Rules for the sort:
# - standard suites are flattened, and the resulting tests sorted by
# id.
# - non-standard suites are preserved as-is, and sorted into position
# by the first test found by iterating the suite.
# We do this by a DSU process: flatten and grab a key, sort, strip the
# keys.
loaded = loader.discover(start_dir, pattern, top_level_dir)
self.test = sorted_tests(loaded)
def runTests(self):
if (self.catchbreak
and getattr(unittest, 'installHandler', None) is not None):
unittest.installHandler()
if self.testRunner is None:
self.testRunner = runner.TextTestRunner
self.testRunner = TestToolsTestRunner
if isinstance(self.testRunner, classtypes()):
try:
testRunner = self.testRunner(verbosity=self.verbosity,
@ -325,8 +344,8 @@ class TestProgram(object):
################
def main(argv, stdout):
runner = TestToolsTestRunner(stdout)
program = TestProgram(argv=argv, testRunner=runner, stdout=stdout)
program = TestProgram(argv=argv, testRunner=TestToolsTestRunner,
stdout=stdout)
if __name__ == '__main__':
main(sys.argv, sys.stdout)

View File

@ -19,6 +19,7 @@ class LoggingBase(object):
self._events = []
self.shouldStop = False
self._was_successful = True
self.testsRun = 0
class Python26TestResult(LoggingBase):
@ -37,6 +38,7 @@ class Python26TestResult(LoggingBase):
def startTest(self, test):
self._events.append(('startTest', test))
self.testsRun += 1
def stop(self):
self.shouldStop = True
@ -51,6 +53,20 @@ class Python26TestResult(LoggingBase):
class Python27TestResult(Python26TestResult):
"""A precisely python 2.7 like test result, that logs."""
def __init__(self):
super(Python27TestResult, self).__init__()
self.failfast = False
def addError(self, test, err):
super(Python27TestResult, self).addError(test, err)
if self.failfast:
self.stop()
def addFailure(self, test, err):
super(Python27TestResult, self).addFailure(test, err)
if self.failfast:
self.stop()
def addExpectedFailure(self, test, err):
self._events.append(('addExpectedFailure', test, err))
@ -59,6 +75,8 @@ class Python27TestResult(Python26TestResult):
def addUnexpectedSuccess(self, test):
self._events.append(('addUnexpectedSuccess', test))
if self.failfast:
self.stop()
def startTestRun(self):
self._events.append(('startTestRun',))

View File

@ -21,6 +21,7 @@ from testtools.content import (
text_content,
TracebackContent,
)
from testtools.helpers import safe_hasattr
from testtools.tags import TagContext
# From http://docs.python.org/library/datetime.html
@ -60,11 +61,12 @@ class TestResult(unittest.TestResult):
:ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip.
"""
def __init__(self):
def __init__(self, failfast=False):
# startTestRun resets all attributes, and older clients don't know to
# call startTestRun, so it is called once here.
# Because subclasses may reasonably not expect this, we call the
# specific version we want to run.
self.failfast = failfast
TestResult.startTestRun(self)
def addExpectedFailure(self, test, err=None, details=None):
@ -89,6 +91,8 @@ class TestResult(unittest.TestResult):
"""
self.errors.append((test,
self._err_details_to_string(test, err, details)))
if self.failfast:
self.stop()
def addFailure(self, test, err=None, details=None):
"""Called when an error has occurred. 'err' is a tuple of values as
@ -99,6 +103,8 @@ class TestResult(unittest.TestResult):
"""
self.failures.append((test,
self._err_details_to_string(test, err, details)))
if self.failfast:
self.stop()
def addSkip(self, test, reason=None, details=None):
"""Called when a test has been skipped rather than running.
@ -131,6 +137,8 @@ class TestResult(unittest.TestResult):
def addUnexpectedSuccess(self, test, details=None):
"""Called when a test was expected to fail, but succeed."""
self.unexpectedSuccesses.append(test)
if self.failfast:
self.stop()
def wasSuccessful(self):
"""Has this result been successful so far?
@ -174,6 +182,8 @@ class TestResult(unittest.TestResult):
pristine condition ready for use in another test run. Note that this
is different from Python 2.7's startTestRun, which does nothing.
"""
# failfast is reset by the super __init__, so stash it.
failfast = self.failfast
super(TestResult, self).__init__()
self.skip_reasons = {}
self.__now = None
@ -181,6 +191,7 @@ class TestResult(unittest.TestResult):
# -- Start: As per python 2.7 --
self.expectedFailures = []
self.unexpectedSuccesses = []
self.failfast = failfast
# -- End: As per python 2.7 --
def stopTestRun(self):
@ -236,8 +247,9 @@ class MultiTestResult(TestResult):
"""A test result that dispatches to many test results."""
def __init__(self, *results):
super(MultiTestResult, self).__init__()
# Setup _results first, as the base class __init__ assigns to failfast.
self._results = list(map(ExtendedToOriginalDecorator, results))
super(MultiTestResult, self).__init__()
def __repr__(self):
return '<%s (%s)>' % (
@ -248,10 +260,26 @@ class MultiTestResult(TestResult):
getattr(result, message)(*args, **kwargs)
for result in self._results)
def _get_failfast(self):
return getattr(self._results[0], 'failfast', False)
def _set_failfast(self, value):
self._dispatch('__setattr__', 'failfast', value)
failfast = property(_get_failfast, _set_failfast)
def _get_shouldStop(self):
return any(self._dispatch('__getattr__', 'shouldStop'))
def _set_shouldStop(self, value):
# Called because we subclass TestResult. Probably should not do that.
pass
shouldStop = property(_get_shouldStop, _set_shouldStop)
def startTest(self, test):
super(MultiTestResult, self).startTest(test)
return self._dispatch('startTest', test)
def stop(self):
return self._dispatch('stop')
def stopTest(self, test):
super(MultiTestResult, self).stopTest(test)
return self._dispatch('stopTest', test)
@ -303,9 +331,9 @@ class MultiTestResult(TestResult):
class TextTestResult(TestResult):
"""A TestResult which outputs activity to a text stream."""
def __init__(self, stream):
def __init__(self, stream, failfast=False):
"""Construct a TextTestResult writing to stream."""
super(TextTestResult, self).__init__()
super(TextTestResult, self).__init__(failfast=failfast)
self.stream = stream
self.sep1 = '=' * 70 + '\n'
self.sep2 = '-' * 70 + '\n'
@ -443,6 +471,9 @@ class ThreadsafeForwardingResult(TestResult):
self._add_result_with_semaphore(self.result.addUnexpectedSuccess,
test, details=details)
def progress(self, offset, whence):
pass
def startTestRun(self):
super(ThreadsafeForwardingResult, self).startTestRun()
self.semaphore.acquire()
@ -451,6 +482,24 @@ class ThreadsafeForwardingResult(TestResult):
finally:
self.semaphore.release()
def _get_shouldStop(self):
self.semaphore.acquire()
try:
return self.result.shouldStop
finally:
self.semaphore.release()
def _set_shouldStop(self, value):
# Another case where we should not subclass TestResult
pass
shouldStop = property(_get_shouldStop, _set_shouldStop)
def stop(self):
self.semaphore.acquire()
try:
self.result.stop()
finally:
self.semaphore.release()
def stopTestRun(self):
self.semaphore.acquire()
try:
@ -507,6 +556,8 @@ class ExtendedToOriginalDecorator(object):
def __init__(self, decorated):
self.decorated = decorated
self._tags = TagContext()
# Only used for old TestResults that do not have failfast.
self._failfast = False
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.decorated)
@ -515,14 +566,18 @@ class ExtendedToOriginalDecorator(object):
return getattr(self.decorated, name)
def addError(self, test, err=None, details=None):
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addError(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addError(test, err)
try:
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addError(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addError(test, err)
finally:
if self.failfast:
self.stop()
def addExpectedFailure(self, test, err=None, details=None):
self._check_args(err, details)
@ -539,14 +594,18 @@ class ExtendedToOriginalDecorator(object):
return addExpectedFailure(test, err)
def addFailure(self, test, err=None, details=None):
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addFailure(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addFailure(test, err)
try:
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addFailure(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addFailure(test, err)
finally:
if self.failfast:
self.stop()
def addSkip(self, test, reason=None, details=None):
self._check_args(reason, details)
@ -565,18 +624,22 @@ class ExtendedToOriginalDecorator(object):
return addSkip(test, reason)
def addUnexpectedSuccess(self, test, details=None):
outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
if outcome is None:
try:
test.fail("")
except test.failureException:
return self.addFailure(test, sys.exc_info())
if details is not None:
try:
return outcome(test, details=details)
except TypeError:
pass
return outcome(test)
try:
outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
if outcome is None:
try:
test.fail("")
except test.failureException:
return self.addFailure(test, sys.exc_info())
if details is not None:
try:
return outcome(test, details=details)
except TypeError:
pass
return outcome(test)
finally:
if self.failfast:
self.stop()
def addSuccess(self, test, details=None):
if details is not None:
@ -614,6 +677,15 @@ class ExtendedToOriginalDecorator(object):
except AttributeError:
return
def _get_failfast(self):
return getattr(self.decorated, 'failfast', self._failfast)
def _set_failfast(self, value):
if safe_hasattr(self.decorated, 'failfast'):
self.decorated.failfast = value
else:
self._failfast = value
failfast = property(_get_failfast, _set_failfast)
def progress(self, offset, whence):
method = getattr(self.decorated, 'progress', None)
if method is None:

View File

@ -38,6 +38,10 @@ class LoggingResult(TestResult):
self._events.append(('startTest', test))
super(LoggingResult, self).startTest(test)
def stop(self):
self._events.append('stop')
super(LoggingResult, self).stop()
def stopTest(self, test):
self._events.append(('stopTest', test))
super(LoggingResult, self).stopTest(test)

View File

@ -14,6 +14,7 @@ from testtools.matchers._higherorder import (
AllMatch,
Annotate,
AnnotatedMismatch,
AnyMatch,
MatchesAny,
MatchesAll,
MatchesPredicate,
@ -50,6 +51,38 @@ class TestAllMatch(TestCase, TestMatchersInterface):
]
class TestAnyMatch(TestCase, TestMatchersInterface):
matches_matcher = AnyMatch(Equals('elephant'))
matches_matches = [
['grass', 'cow', 'steak', 'milk', 'elephant'],
(13, 'elephant'),
['elephant', 'elephant', 'elephant'],
set(['hippo', 'rhino', 'elephant']),
]
matches_mismatches = [
[],
['grass', 'cow', 'steak', 'milk'],
(13, 12, 10),
['element', 'hephalump', 'pachyderm'],
set(['hippo', 'rhino', 'diplodocus']),
]
str_examples = [
("AnyMatch(Equals('elephant'))", AnyMatch(Equals('elephant'))),
]
describe_examples = [
('Differences: [\n'
'7 != 11\n'
'7 != 9\n'
'7 != 10\n'
']',
[11, 9, 10],
AnyMatch(Equals(7))),
]
class TestAfterPreprocessing(TestCase, TestMatchersInterface):
def parity(x):

View File

@ -9,6 +9,7 @@ from testtools import TestCase
from testtools.compat import (
_b,
_u,
BytesIO,
StringIO,
)
from testtools.content import (
@ -125,6 +126,26 @@ class TestContent(TestCase):
self.assertThat(
''.join(content.iter_text()), Equals('some data'))
def test_from_file_with_simple_seek(self):
f = tempfile.NamedTemporaryFile()
f.write(_b('some data'))
f.flush()
self.addCleanup(f.close)
content = content_from_file(
f.name, UTF8_TEXT, chunk_size=50, seek_offset=5)
self.assertThat(
list(content.iter_bytes()), Equals([_b('data')]))
def test_from_file_with_whence_seek(self):
f = tempfile.NamedTemporaryFile()
f.write(_b('some data'))
f.flush()
self.addCleanup(f.close)
content = content_from_file(
f.name, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
self.assertThat(
list(content.iter_bytes()), Equals([_b('data')]))
def test_from_stream(self):
data = StringIO('some data')
content = content_from_stream(data, UTF8_TEXT, chunk_size=2)
@ -148,6 +169,20 @@ class TestContent(TestCase):
self.assertThat(
''.join(content.iter_text()), Equals('some data'))
def test_from_stream_with_simple_seek(self):
data = BytesIO(_b('some data'))
content = content_from_stream(
data, UTF8_TEXT, chunk_size=50, seek_offset=5)
self.assertThat(
list(content.iter_bytes()), Equals([_b('data')]))
def test_from_stream_with_whence_seek(self):
data = BytesIO(_b('some data'))
content = content_from_stream(
data, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
self.assertThat(
list(content.iter_bytes()), Equals([_b('data')]))
def test_from_text(self):
data = _u("some data")
expected = Content(UTF8_TEXT, lambda: [data.encode('utf8')])

View File

@ -6,6 +6,7 @@ from distutils.dist import Distribution
from testtools.compat import (
_b,
_u,
BytesIO,
)
from testtools.helpers import try_import
@ -52,7 +53,7 @@ class TestCommandTest(TestCase):
def test_test_module(self):
self.useFixture(SampleTestFixture())
stream = BytesIO()
stdout = self.useFixture(fixtures.StringStream('stdout'))
dist = Distribution()
dist.script_name = 'setup.py'
dist.script_args = ['test']
@ -60,11 +61,11 @@ class TestCommandTest(TestCase):
dist.command_options = {
'test': {'test_module': ('command line', 'testtools.runexample')}}
cmd = dist.reinitialize_command('test')
cmd.runner.stdout = stream
dist.run_command('test')
with fixtures.MonkeyPatch('sys.stdout', stdout.stream):
dist.run_command('test')
self.assertThat(
stream.getvalue(),
MatchesRegex(_b("""Tests running...
stdout.getDetails()['stdout'].as_text(),
MatchesRegex(_u("""Tests running...
Ran 2 tests in \\d.\\d\\d\\ds
OK
@ -72,7 +73,7 @@ OK
def test_test_suite(self):
self.useFixture(SampleTestFixture())
stream = BytesIO()
stdout = self.useFixture(fixtures.StringStream('stdout'))
dist = Distribution()
dist.script_name = 'setup.py'
dist.script_args = ['test']
@ -82,11 +83,11 @@ OK
'test_suite': (
'command line', 'testtools.runexample.test_suite')}}
cmd = dist.reinitialize_command('test')
cmd.runner.stdout = stream
dist.run_command('test')
with fixtures.MonkeyPatch('sys.stdout', stdout.stream):
dist.run_command('test')
self.assertThat(
stream.getvalue(),
MatchesRegex(_b("""Tests running...
stdout.getDetails()['stdout'].as_text(),
MatchesRegex(_u("""Tests running...
Ran 2 tests in \\d.\\d\\d\\ds
OK

View File

@ -2,6 +2,8 @@
"""Tests for the test runner logic."""
from unittest import TestSuite
from testtools.compat import (
_b,
StringIO,
@ -11,6 +13,7 @@ fixtures = try_import('fixtures')
import testtools
from testtools import TestCase, run
from testtools.matchers import Contains
if fixtures:
@ -41,9 +44,12 @@ def test_suite():
class TestRun(TestCase):
def test_run_list(self):
def setUp(self):
super(TestRun, self).setUp()
if fixtures is None:
self.skipTest("Need fixtures")
def test_run_list(self):
self.useFixture(SampleTestFixture())
out = StringIO()
run.main(['prog', '-l', 'testtools.runexample.test_suite'], out)
@ -51,9 +57,7 @@ class TestRun(TestCase):
testtools.runexample.TestFoo.test_quux
""", out.getvalue())
def test_run_load_list(self):
if fixtures is None:
self.skipTest("Need fixtures")
def test_run_orders_tests(self):
self.useFixture(SampleTestFixture())
out = StringIO()
# We load two tests - one that exists and one that doesn't, and we
@ -74,6 +78,42 @@ testtools.runexample.missingtest
self.assertEqual("""testtools.runexample.TestFoo.test_bar
""", out.getvalue())
def test_run_load_list(self):
self.useFixture(SampleTestFixture())
out = StringIO()
# We load two tests - one that exists and one that doesn't, and we
# should get the one that exists and neither the one that doesn't nor
# the unmentioned one that does.
tempdir = self.useFixture(fixtures.TempDir())
tempname = tempdir.path + '/tests.list'
f = open(tempname, 'wb')
try:
f.write(_b("""
testtools.runexample.TestFoo.test_bar
testtools.runexample.missingtest
"""))
finally:
f.close()
run.main(['prog', '-l', '--load-list', tempname,
'testtools.runexample.test_suite'], out)
self.assertEqual("""testtools.runexample.TestFoo.test_bar
""", out.getvalue())
def test_run_failfast(self):
stdout = self.useFixture(fixtures.StringStream('stdout'))
class Failing(TestCase):
def test_a(self):
self.fail('a')
def test_b(self):
self.fail('b')
runner = run.TestToolsTestRunner(failfast=True)
with fixtures.MonkeyPatch('sys.stdout', stdout.stream):
runner.run(TestSuite([Failing('test_a'), Failing('test_b')]))
self.assertThat(
stdout.getDetails()['stdout'].as_text(), Contains('Ran 1 test'))
def test_suite():
from unittest import TestLoader

View File

@ -12,6 +12,7 @@ import shutil
import sys
import tempfile
import threading
from unittest import TestSuite
import warnings
from testtools import (
@ -43,6 +44,7 @@ from testtools.content import (
TracebackContent,
)
from testtools.content_type import ContentType, UTF8_TEXT
from testtools.helpers import safe_hasattr
from testtools.matchers import (
Contains,
DocTestMatches,
@ -142,6 +144,11 @@ class Python26Contract(object):
result.stopTest(self)
self.assertTrue(result.wasSuccessful())
def test_stop_sets_shouldStop(self):
result = self.makeResult()
result.stop()
self.assertTrue(result.shouldStop)
class Python27Contract(Python26Contract):
@ -193,6 +200,17 @@ class Python27Contract(Python26Contract):
result.startTestRun()
result.stopTestRun()
def test_failfast(self):
result = self.makeResult()
result.failfast = True
class Failing(TestCase):
def test_a(self):
self.fail('a')
def test_b(self):
self.fail('b')
TestSuite([Failing('test_a'), Failing('test_b')]).run(result)
self.assertEqual(1, result.testsRun)
class TagsContract(Python27Contract):
"""Tests to ensure correct tagging behaviour.
@ -566,12 +584,36 @@ class TestMultiTestResult(TestCase):
# `TestResult`s.
self.assertResultLogsEqual([])
def test_failfast_get(self):
# Reading reads from the first one - arbitrary choice.
self.assertEqual(False, self.multiResult.failfast)
self.result1.failfast = True
self.assertEqual(True, self.multiResult.failfast)
def test_failfast_set(self):
# Writing writes to all.
self.multiResult.failfast = True
self.assertEqual(True, self.result1.failfast)
self.assertEqual(True, self.result2.failfast)
def test_shouldStop(self):
self.assertFalse(self.multiResult.shouldStop)
self.result2.stop()
# NB: result1 is not stopped: MultiTestResult has to combine the
# values.
self.assertTrue(self.multiResult.shouldStop)
def test_startTest(self):
# Calling `startTest` on a `MultiTestResult` calls `startTest` on all
# its `TestResult`s.
self.multiResult.startTest(self)
self.assertResultLogsEqual([('startTest', self)])
def test_stop(self):
self.assertFalse(self.multiResult.shouldStop)
self.multiResult.stop()
self.assertResultLogsEqual(['stop'])
def test_stopTest(self):
# Calling `stopTest` on a `MultiTestResult` calls `stopTest` on all
# its `TestResult`s.
@ -1176,6 +1218,19 @@ class TestExtendedToOriginalResultDecoratorBase(TestCase):
class TestExtendedToOriginalResultDecorator(
TestExtendedToOriginalResultDecoratorBase):
def test_failfast_py26(self):
self.make_26_result()
self.assertEqual(False, self.converter.failfast)
self.converter.failfast = True
self.assertFalse(safe_hasattr(self.converter.decorated, 'failfast'))
def test_failfast_py27(self):
self.make_27_result()
self.assertEqual(False, self.converter.failfast)
# setting it should write it to the backing result
self.converter.failfast = True
self.assertEqual(True, self.converter.decorated.failfast)
def test_progress_py26(self):
self.make_26_result()
self.converter.progress(1, 2)

View File

@ -9,10 +9,11 @@ import unittest
from testtools import (
ConcurrentTestSuite,
iterate_tests,
PlaceHolder,
TestCase,
)
from testtools.helpers import try_import
from testtools.testsuite import FixtureSuite
from testtools.testsuite import FixtureSuite, iterate_tests, sorted_tests
from testtools.tests.helpers import LoggingResult
FunctionFixture = try_import('fixtures.FunctionFixture')
@ -93,6 +94,35 @@ class TestFixtureSuite(TestCase):
self.assertEqual(['setUp', 1, 2, 'tearDown'], log)
class TestSortedTests(TestCase):
def test_sorts_custom_suites(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
class Subclass(unittest.TestSuite):
def sort_tests(self):
self._tests = sorted_tests(self, True)
input_suite = Subclass([b, a])
suite = sorted_tests(input_suite)
self.assertEqual([a, b], list(iterate_tests(suite)))
self.assertEqual([input_suite], list(iter(suite)))
def test_custom_suite_without_sort_tests_works(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
class Subclass(unittest.TestSuite):pass
input_suite = Subclass([b, a])
suite = sorted_tests(input_suite)
self.assertEqual([b, a], list(iterate_tests(suite)))
self.assertEqual([input_suite], list(iter(suite)))
def test_sorts_simple_suites(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
suite = sorted_tests(unittest.TestSuite([b, a]))
self.assertEqual([a, b], list(iterate_tests(suite)))
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -6,9 +6,10 @@ __metaclass__ = type
__all__ = [
'ConcurrentTestSuite',
'iterate_tests',
'sorted_tests',
]
from testtools.helpers import try_imports
from testtools.helpers import safe_hasattr, try_imports
Queue = try_imports(['Queue.Queue', 'queue.Queue'])
@ -114,3 +115,40 @@ class FixtureSuite(unittest.TestSuite):
super(FixtureSuite, self).run(result)
finally:
self._fixture.cleanUp()
def sort_tests(self):
self._tests = sorted_tests(self, True)
def _flatten_tests(suite_or_case, unpack_outer=False):
try:
tests = iter(suite_or_case)
except TypeError:
# Not iterable, assume it's a test case.
return [(suite_or_case.id(), suite_or_case)]
if (type(suite_or_case) in (unittest.TestSuite,) or
unpack_outer):
# Plain old test suite (or any others we may add).
result = []
for test in tests:
# Recurse to flatten.
result.extend(_flatten_tests(test))
return result
else:
# Find any old actual test and grab its id.
suite_id = None
tests = iterate_tests(suite_or_case)
for test in tests:
suite_id = test.id()
break
# If it has a sort_tests method, call that.
if safe_hasattr(suite_or_case, 'sort_tests'):
suite_or_case.sort_tests()
return [(suite_id, suite_or_case)]
def sorted_tests(suite_or_case, unpack_outer=False):
"""Sort suite_or_case while preserving non-vanilla TestSuites."""
tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
tests.sort()
return unittest.TestSuite([test for (sort_key, test) in tests])