1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

Import testtools as well, required for subunit.

This commit is contained in:
Jelmer Vernooij 2010-01-08 02:09:20 +01:00
parent b6b46b4978
commit 28577aae92
28 changed files with 3855 additions and 12 deletions

View File

@ -0,0 +1,58 @@
# Copyright (c) 2008, 2009 Jonathan M. Lange. See LICENSE for details.
"""Extensions to the standard Python unittest library."""
__all__ = [
'clone_test_with_new_id',
'ConcurrentTestSuite',
'ExtendedToOriginalDecorator',
'iterate_tests',
'MultiTestResult',
'TestCase',
'TestResult',
'TextTestResult',
'RunTest',
'skip',
'skipIf',
'skipUnless',
'ThreadsafeForwardingResult',
]
from testtools.matchers import (
Matcher,
)
from testtools.runtest import (
RunTest,
)
from testtools.testcase import (
TestCase,
clone_test_with_new_id,
skip,
skipIf,
skipUnless,
)
from testtools.testresult import (
ExtendedToOriginalDecorator,
MultiTestResult,
TestResult,
TextTestResult,
ThreadsafeForwardingResult,
)
from testtools.testsuite import (
ConcurrentTestSuite,
)
from testtools.utils import iterate_tests
# same format as sys.version_info: "A tuple containing the five components of
# the version number: major, minor, micro, releaselevel, and serial. All
# values except releaselevel are integers; the release level is 'alpha',
# 'beta', 'candidate', or 'final'. The version_info value corresponding to the
# Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a
# releaselevel of 'dev' for unreleased under-development code.
#
# If the releaselevel is 'alpha' then the major/minor/micro components are not
# established at this point, and setup.py will use a version of next-$(revno).
# If the releaselevel is 'final', then the tarball will be major.minor.micro.
# Otherwise it is major.minor.micro~$(revno).
__version__ = (0, 9, 2, 'final', 0)

View File

@ -0,0 +1,91 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Content - a MIME-like Content object."""
import codecs
from unittest import TestResult
from testtools.content_type import ContentType
from testtools.utils import _b
class Content(object):
"""A MIME-like Content object.
Content objects can be serialised to bytes using the iter_bytes method.
If the Content-Type is recognised by other code, they are welcome to
look for richer contents that mere byte serialisation - for example in
memory object graphs etc. However, such code MUST be prepared to receive
a generic Content object that has been reconstructed from a byte stream.
:ivar content_type: The content type of this Content.
"""
def __init__(self, content_type, get_bytes):
"""Create a ContentType."""
if None in (content_type, get_bytes):
raise ValueError("None not permitted in %r, %r" % (
content_type, get_bytes))
self.content_type = content_type
self._get_bytes = get_bytes
def __eq__(self, other):
return (self.content_type == other.content_type and
''.join(self.iter_bytes()) == ''.join(other.iter_bytes()))
def iter_bytes(self):
"""Iterate over bytestrings of the serialised content."""
return self._get_bytes()
def iter_text(self):
"""Iterate over the text of the serialised content.
This is only valid for text MIME types, and will use ISO-8859-1 if
no charset parameter is present in the MIME type. (This is somewhat
arbitrary, but consistent with RFC2617 3.7.1).
:raises: ValueError If the content type is not text/*.
"""
if self.content_type.type != "text":
raise ValueError("Not a text type %r" % self.content_type)
return self._iter_text()
def _iter_text(self):
"""Worker for iter_text - does the decoding."""
encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
try:
# 2.5+
decoder = codecs.getincrementaldecoder(encoding)()
for bytes in self.iter_bytes():
yield decoder.decode(bytes)
final = decoder.decode(_b(''), True)
if final:
yield final
except AttributeError:
# < 2.5
bytes = ''.join(self.iter_bytes())
yield bytes.decode(encoding)
def __repr__(self):
return "<Content type=%r, value=%r>" % (
self.content_type, ''.join(self.iter_bytes()))
class TracebackContent(Content):
"""Content object for tracebacks.
This adapts an exc_info tuple to the Content interface.
text/x-traceback;language=python is used for the mime type, in order to
provide room for other languages to format their tracebacks differently.
"""
def __init__(self, err, test):
"""Create a TracebackContent for err."""
if err is None:
raise ValueError("err may not be None")
content_type = ContentType('text', 'x-traceback',
{"language": "python", "charset": "utf8"})
self._result = TestResult()
value = self._result._exc_info_to_string(err, test)
super(TracebackContent, self).__init__(
content_type, lambda: [value.encode("utf8")])

View File

@ -0,0 +1,30 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""ContentType - a MIME Content Type."""
class ContentType(object):
"""A content type from http://www.iana.org/assignments/media-types/
:ivar type: The primary type, e.g. "text" or "application"
:ivar subtype: The subtype, e.g. "plain" or "octet-stream"
:ivar parameters: A dict of additional parameters specific to the
content type.
"""
def __init__(self, primary_type, sub_type, parameters=None):
"""Create a ContentType."""
if None in (primary_type, sub_type):
raise ValueError("None not permitted in %r, %r" % (
primary_type, sub_type))
self.type = primary_type
self.subtype = sub_type
self.parameters = parameters or {}
def __eq__(self, other):
if type(other) != ContentType:
return False
return self.__dict__ == other.__dict__
def __repr__(self):
return "%s/%s params=%s" % (self.type, self.subtype, self.parameters)

View File

@ -0,0 +1,169 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Matchers, a way to express complex assertions outside the testcase.
Inspired by 'hamcrest'.
Matcher provides the abstract API that all matchers need to implement.
Bundled matchers are listed in __all__: a list can be obtained by running
$ python -c 'import testtools.matchers; print testtools.matchers.__all__'
"""
__metaclass__ = type
__all__ = [
'DocTestMatches',
'Equals',
'MatchesAny',
]
import doctest
class Matcher:
"""A pattern matcher.
A Matcher must implement match and __str__ to be used by
testtools.TestCase.assertThat. Matcher.match(thing) returns None when
thing is completely matched, and a Mismatch object otherwise.
Matchers can be useful outside of test cases, as they are simply a
pattern matching language expressed as objects.
testtools.matchers is inspired by hamcrest, but is pythonic rather than
a Java transcription.
"""
def match(self, something):
"""Return None if this matcher matches something, a Mismatch otherwise.
"""
raise NotImplementedError(self.match)
def __str__(self):
"""Get a sensible human representation of the matcher.
This should include the parameters given to the matcher and any
state that would affect the matches operation.
"""
raise NotImplementedError(self.__str__)
class Mismatch:
"""An object describing a mismatch detected by a Matcher."""
def describe(self):
"""Describe the mismatch.
This should be either a human-readable string or castable to a string.
"""
raise NotImplementedError(self.describe_difference)
class DocTestMatches:
"""See if a string matches a doctest example."""
def __init__(self, example, flags=0):
"""Create a DocTestMatches to match example.
:param example: The example to match e.g. 'foo bar baz'
:param flags: doctest comparison flags to match on. e.g.
doctest.ELLIPSIS.
"""
if not example.endswith('\n'):
example += '\n'
self.want = example # required variable name by doctest.
self.flags = flags
self._checker = doctest.OutputChecker()
def __str__(self):
if self.flags:
flagstr = ", flags=%d" % self.flags
else:
flagstr = ""
return 'DocTestMatches(%r%s)' % (self.want, flagstr)
def _with_nl(self, actual):
result = str(actual)
if not result.endswith('\n'):
result += '\n'
return result
def match(self, actual):
with_nl = self._with_nl(actual)
if self._checker.check_output(self.want, with_nl, self.flags):
return None
return DocTestMismatch(self, with_nl)
def _describe_difference(self, with_nl):
return self._checker.output_difference(self, with_nl, self.flags)
class DocTestMismatch:
"""Mismatch object for DocTestMatches."""
def __init__(self, matcher, with_nl):
self.matcher = matcher
self.with_nl = with_nl
def describe(self):
return self.matcher._describe_difference(self.with_nl)
class Equals:
"""Matches if the items are equal."""
def __init__(self, expected):
self.expected = expected
def match(self, other):
if self.expected == other:
return None
return EqualsMismatch(self.expected, other)
def __str__(self):
return "Equals(%r)" % self.expected
class EqualsMismatch:
"""Two things differed."""
def __init__(self, expected, other):
self.expected = expected
self.other = other
def describe(self):
return "%r != %r" % (self.expected, self.other)
class MatchesAny:
"""Matches if any of the matchers it is created with match."""
def __init__(self, *matchers):
self.matchers = matchers
def match(self, matchee):
results = []
for matcher in self.matchers:
mismatch = matcher.match(matchee)
if mismatch is None:
return None
results.append(mismatch)
return MismatchesAll(results)
def __str__(self):
return "MatchesAny(%s)" % ', '.join([
str(matcher) for matcher in self.matchers])
class MismatchesAll:
"""A mismatch with many child mismatches."""
def __init__(self, mismatches):
self.mismatches = mismatches
def describe(self):
descriptions = ["Differences: ["]
for mismatch in self.mismatches:
descriptions.append(mismatch.describe())
descriptions.append("]\n")
return '\n'.join(descriptions)

View File

@ -0,0 +1,39 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""python -m testtools.run testspec [testspec...]
Run some tests with the testtools extended API.
For instance, to run the testtools test suite.
$ python -m testtools.run testtools.tests.test_suite
"""
import sys
from testtools.tests import test_suite
from testtools import TextTestResult
class TestToolsTestRunner(object):
""" A thunk object to support unittest.TestProgram."""
def run(self, test):
"Run the given test case or test suite."
result = TextTestResult(sys.stdout)
result.startTestRun()
try:
return test.run(result)
finally:
result.stopTestRun()
if __name__ == '__main__':
import optparse
from unittest import TestProgram
parser = optparse.OptionParser(__doc__)
args = parser.parse_args()[1]
if not args:
parser.error("No testspecs given.")
runner = TestToolsTestRunner()
program = TestProgram(module=None, argv=[sys.argv[0]] + args,
testRunner=runner)

View File

@ -0,0 +1,142 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Individual test case execution."""
__metaclass__ = type
__all__ = [
'RunTest',
]
import sys
from testtools.testresult import ExtendedToOriginalDecorator
class RunTest:
"""An object to run a test.
RunTest objects are used to implement the internal logic involved in
running a test. TestCase.__init__ stores _RunTest as the class of RunTest
to execute. Passing the runTest= parameter to TestCase.__init__ allows a
different RunTest class to be used to execute the test.
Subclassing or replacing RunTest can be useful to add functionality to the
way that tests are run in a given project.
:ivar case: The test case that is to be run.
:ivar result: The result object a case is reporting to.
:ivar handlers: A list of (ExceptionClass->handler code) for exceptions
that should be caught if raised from the user code. Exceptions that
are caught are checked against this list in first to last order.
There is a catchall of Exception at the end of the list, so to add
a new exception to the list, insert it at the front (which ensures that
it will be checked before any existing base classes in the list. If you
add multiple exceptions some of which are subclasses of each other, add
the most specific exceptions last (so they come before their parent
classes in the list).
:ivar exception_caught: An object returned when _run_user catches an
exception.
"""
def __init__(self, case, handlers=None):
"""Create a RunTest to run a case.
:param case: A testtools.TestCase test case object.
:param handlers: Exception handlers for this RunTest. These are stored
in self.handlers and can be modified later if needed.
"""
self.case = case
self.handlers = handlers or []
self.exception_caught = object()
def run(self, result=None):
"""Run self.case reporting activity to result.
:param result: Optional testtools.TestResult to report activity to.
:return: The result object the test was run against.
"""
if result is None:
actual_result = self.case.defaultTestResult()
actual_result.startTestRun()
else:
actual_result = result
try:
return self._run_one(actual_result)
finally:
if result is None:
actual_result.stopTestRun()
def _run_one(self, result):
"""Run one test reporting to result.
:param result: A testtools.TestResult to report activity to.
This result object is decorated with an ExtendedToOriginalDecorator
to ensure that the latest TestResult API can be used with
confidence by client code.
:return: The result object the test was run against.
"""
return self._run_prepared_result(ExtendedToOriginalDecorator(result))
def _run_prepared_result(self, result):
"""Run one test reporting to result.
:param result: A testtools.TestResult to report activity to.
:return: The result object the test was run against.
"""
result.startTest(self.case)
self.result = result
try:
self._run_core()
finally:
result.stopTest(self.case)
return result
def _run_core(self):
"""Run the user supplied test code."""
if self.exception_caught == self._run_user(self.case._run_setup,
self.result):
# Don't run the test method if we failed getting here.
self.case._runCleanups(self.result)
return
# Run everything from here on in. If any of the methods raise an
# exception we'll have failed.
failed = False
try:
if self.exception_caught == self._run_user(
self.case._run_test_method, self.result):
failed = True
finally:
try:
if self.exception_caught == self._run_user(
self.case._run_teardown, self.result):
failed = True
finally:
try:
if not self._run_user(
self.case._runCleanups, self.result):
failed = True
finally:
if not failed:
self.result.addSuccess(self.case,
details=self.case.getDetails())
def _run_user(self, fn, *args):
"""Run a user supplied function.
Exceptions are processed by self.handlers.
"""
try:
return fn(*args)
except KeyboardInterrupt:
raise
except Exception:
# Note that bare exceptions are not caught, so raised strings will
# escape: but they are deprecated anyway.
exc_info = sys.exc_info()
e = exc_info[1]
for exc_class, handler in self.handlers:
self.case.onException(exc_info)
if isinstance(e, exc_class):
handler(self.case, self.result, e)
return self.exception_caught
raise e

View File

@ -0,0 +1,444 @@
# Copyright (c) 2008, 2009 Jonathan M. Lange. See LICENSE for details.
"""Test case related stuff."""
__metaclass__ = type
__all__ = [
'clone_test_with_new_id',
'TestCase',
'skip',
'skipIf',
'skipUnless',
]
import copy
try:
from functools import wraps
except ImportError:
wraps = None
import itertools
import sys
import types
import unittest
from testtools import content
from testtools.runtest import RunTest
from testtools.testresult import TestResult
from testtools.utils import advance_iterator
try:
# Try to use the python2.7 SkipTest exception for signalling skips.
from unittest.case import SkipTest as TestSkipped
except ImportError:
class TestSkipped(Exception):
"""Raised within TestCase.run() when a test is skipped."""
try:
# Try to use the same exceptions python 2.7 does.
from unittest.case import _ExpectedFailure, _UnexpectedSuccess
except ImportError:
# Oops, not available, make our own.
class _UnexpectedSuccess(Exception):
"""An unexpected success was raised.
Note that this exception is private plumbing in testtools' testcase
module.
"""
class _ExpectedFailure(Exception):
"""An expected failure occured.
Note that this exception is private plumbing in testtools' testcase
module.
"""
class TestCase(unittest.TestCase):
"""Extensions to the basic TestCase.
:ivar exception_handlers: Exceptions to catch from setUp, runTest and
tearDown. This list is able to be modified at any time and consists of
(exception_class, handler(case, result, exception_value)) pairs.
"""
skipException = TestSkipped
def __init__(self, *args, **kwargs):
"""Construct a TestCase.
:param testMethod: The name of the method to run.
:param runTest: Optional class to use to execute the test. If not
supplied testtools.runtest.RunTest is used. The instance to be
used is created when run() is invoked, so will be fresh each time.
"""
unittest.TestCase.__init__(self, *args, **kwargs)
self._cleanups = []
self._unique_id_gen = itertools.count(1)
self.__setup_called = False
self.__teardown_called = False
self.__details = {}
self.__RunTest = kwargs.get('runTest', RunTest)
self.__exception_handlers = []
self.exception_handlers = [
(self.skipException, self._report_skip),
(self.failureException, self._report_failure),
(_ExpectedFailure, self._report_expected_failure),
(_UnexpectedSuccess, self._report_unexpected_success),
(Exception, self._report_error),
]
def __eq__(self, other):
eq = getattr(unittest.TestCase, '__eq__', None)
if eq is not None and not unittest.TestCase.__eq__(self, other):
return False
return self.__dict__ == other.__dict__
def __repr__(self):
# We add id to the repr because it makes testing testtools easier.
return "<%s id=0x%0x>" % (self.id(), id(self))
def addDetail(self, name, content_object):
"""Add a detail to be reported with this test's outcome.
For more details see pydoc testtools.TestResult.
:param name: The name to give this detail.
:param content_object: The content object for this detail. See
testtools.content for more detail.
"""
self.__details[name] = content_object
def getDetails(self):
"""Get the details dict that will be reported with this test's outcome.
For more details see pydoc testtools.TestResult.
"""
return self.__details
def shortDescription(self):
return self.id()
def skip(self, reason):
"""Cause this test to be skipped.
This raises self.skipException(reason). skipException is raised
to permit a skip to be triggered at any point (during setUp or the
testMethod itself). The run() method catches skipException and
translates that into a call to the result objects addSkip method.
:param reason: The reason why the test is being skipped. This must
support being cast into a unicode string for reporting.
"""
raise self.skipException(reason)
def _formatTypes(self, classOrIterable):
"""Format a class or a bunch of classes for display in an error."""
className = getattr(classOrIterable, '__name__', None)
if className is None:
className = ', '.join(klass.__name__ for klass in classOrIterable)
return className
def _runCleanups(self, result):
"""Run the cleanups that have been added with addCleanup.
See the docstring for addCleanup for more information.
Returns True if all cleanups ran without error, False otherwise.
"""
ok = True
while self._cleanups:
function, arguments, keywordArguments = self._cleanups.pop()
try:
function(*arguments, **keywordArguments)
except KeyboardInterrupt:
raise
except:
self._report_error(self, result, None)
ok = False
return ok
def addCleanup(self, function, *arguments, **keywordArguments):
"""Add a cleanup function to be called after tearDown.
Functions added with addCleanup will be called in reverse order of
adding after the test method and before tearDown.
If a function added with addCleanup raises an exception, the error
will be recorded as a test error, and the next cleanup will then be
run.
Cleanup functions are always called before a test finishes running,
even if setUp is aborted by an exception.
"""
self._cleanups.append((function, arguments, keywordArguments))
def addOnException(self, handler):
"""Add a handler to be called when an exception occurs in test code.
This handler cannot affect what result methods are called, and is
called before any outcome is called on the result object. An example
use for it is to add some diagnostic state to the test details dict
which is expensive to calculate and not interesting for reporting in
the success case.
Handlers are called before the outcome (such as addFailure) that
the exception has caused.
Handlers are called in first-added, first-called order, and if they
raise an exception, that will propogate out of the test running
machinery, halting test processing. As a result, do not call code that
may unreasonably fail.
"""
self.__exception_handlers.append(handler)
def _add_reason(self, reason):
self.addDetail('reason', content.Content(
content.ContentType('text', 'plain'),
lambda: [reason.encode('utf8')]))
def assertIn(self, needle, haystack):
"""Assert that needle is in haystack."""
self.assertTrue(
needle in haystack, '%r not in %r' % (needle, haystack))
def assertIs(self, expected, observed):
"""Assert that `expected` is `observed`."""
self.assertTrue(
expected is observed, '%r is not %r' % (expected, observed))
def assertIsNot(self, expected, observed):
"""Assert that `expected` is not `observed`."""
self.assertTrue(
expected is not observed, '%r is %r' % (expected, observed))
def assertNotIn(self, needle, haystack):
"""Assert that needle is not in haystack."""
self.assertTrue(
needle not in haystack, '%r in %r' % (needle, haystack))
def assertIsInstance(self, obj, klass):
self.assertTrue(
isinstance(obj, klass),
'%r is not an instance of %s' % (obj, self._formatTypes(klass)))
def assertRaises(self, excClass, callableObj, *args, **kwargs):
"""Fail unless an exception of class excClass is thrown
by callableObj when invoked with arguments args and keyword
arguments kwargs. If a different type of exception is
thrown, it will not be caught, and the test case will be
deemed to have suffered an error, exactly as for an
unexpected exception.
"""
try:
ret = callableObj(*args, **kwargs)
except excClass:
return sys.exc_info()[1]
else:
excName = self._formatTypes(excClass)
self.fail("%s not raised, %r returned instead." % (excName, ret))
failUnlessRaises = assertRaises
def assertThat(self, matchee, matcher):
"""Assert that matchee is matched by matcher.
:param matchee: An object to match with matcher.
:param matcher: An object meeting the testtools.Matcher protocol.
:raises self.failureException: When matcher does not match thing.
"""
mismatch = matcher.match(matchee)
if not mismatch:
return
self.fail('Match failed. Matchee: "%s"\nMatcher: %s\nDifference: %s\n'
% (matchee, matcher, mismatch.describe()))
def defaultTestResult(self):
return TestResult()
def expectFailure(self, reason, predicate, *args, **kwargs):
"""Check that a test fails in a particular way.
If the test fails in the expected way, a KnownFailure is caused. If it
succeeds an UnexpectedSuccess is caused.
The expected use of expectFailure is as a barrier at the point in a
test where the test would fail. For example:
>>> def test_foo(self):
>>> self.expectFailure("1 should be 0", self.assertNotEqual, 1, 0)
>>> self.assertEqual(1, 0)
If in the future 1 were to equal 0, the expectFailure call can simply
be removed. This separation preserves the original intent of the test
while it is in the expectFailure mode.
"""
self._add_reason(reason)
try:
predicate(*args, **kwargs)
except self.failureException:
exc_info = sys.exc_info()
self.addDetail('traceback',
content.TracebackContent(exc_info, self))
raise _ExpectedFailure(exc_info)
else:
raise _UnexpectedSuccess(reason)
def getUniqueInteger(self):
"""Get an integer unique to this test.
Returns an integer that is guaranteed to be unique to this instance.
Use this when you need an arbitrary integer in your test, or as a
helper for custom anonymous factory methods.
"""
return advance_iterator(self._unique_id_gen)
def getUniqueString(self, prefix=None):
"""Get a string unique to this test.
Returns a string that is guaranteed to be unique to this instance. Use
this when you need an arbitrary string in your test, or as a helper
for custom anonymous factory methods.
:param prefix: The prefix of the string. If not provided, defaults
to the id of the tests.
:return: A bytestring of '<prefix>-<unique_int>'.
"""
if prefix is None:
prefix = self.id()
return '%s-%d' % (prefix, self.getUniqueInteger())
def onException(self, exc_info):
"""Called when an exception propogates from test code.
:seealso addOnException:
"""
for handler in self.__exception_handlers:
handler(exc_info)
@staticmethod
def _report_error(self, result, err):
self._report_traceback()
result.addError(self, details=self.getDetails())
@staticmethod
def _report_expected_failure(self, result, err):
result.addExpectedFailure(self, details=self.getDetails())
@staticmethod
def _report_failure(self, result, err):
self._report_traceback()
result.addFailure(self, details=self.getDetails())
@staticmethod
def _report_skip(self, result, err):
if err.args:
reason = err.args[0]
else:
reason = "no reason given."
self._add_reason(reason)
result.addSkip(self, details=self.getDetails())
def _report_traceback(self):
self.addDetail('traceback',
content.TracebackContent(sys.exc_info(), self))
@staticmethod
def _report_unexpected_success(self, result, err):
result.addUnexpectedSuccess(self, details=self.getDetails())
def run(self, result=None):
return self.__RunTest(self, self.exception_handlers).run(result)
def _run_setup(self, result):
"""Run the setUp function for this test.
:param result: A testtools.TestResult to report activity to.
:raises ValueError: If the base class setUp is not called, a
ValueError is raised.
"""
self.setUp()
if not self.__setup_called:
raise ValueError("setUp was not called")
def _run_teardown(self, result):
"""Run the tearDown function for this test.
:param result: A testtools.TestResult to report activity to.
:raises ValueError: If the base class tearDown is not called, a
ValueError is raised.
"""
self.tearDown()
if not self.__teardown_called:
raise ValueError("teardown was not called")
def _run_test_method(self, result):
"""Run the test method for this test.
:param result: A testtools.TestResult to report activity to.
:return: None.
"""
absent_attr = object()
# Python 2.5+
method_name = getattr(self, '_testMethodName', absent_attr)
if method_name is absent_attr:
# Python 2.4
method_name = getattr(self, '_TestCase__testMethodName')
testMethod = getattr(self, method_name)
testMethod()
def setUp(self):
unittest.TestCase.setUp(self)
self.__setup_called = True
def tearDown(self):
unittest.TestCase.tearDown(self)
self.__teardown_called = True
# Python 2.4 did not know how to deep copy functions.
if types.FunctionType not in copy._deepcopy_dispatch:
copy._deepcopy_dispatch[types.FunctionType] = copy._deepcopy_atomic
def clone_test_with_new_id(test, new_id):
"""Copy a TestCase, and give the copied test a new id."""
newTest = copy.deepcopy(test)
newTest.id = lambda: new_id
return newTest
def skip(reason):
"""A decorator to skip unit tests.
This is just syntactic sugar so users don't have to change any of their
unit tests in order to migrate to python 2.7, which provides the
@unittest.skip decorator.
"""
def decorator(test_item):
if wraps is not None:
@wraps(test_item)
def skip_wrapper(*args, **kwargs):
raise TestCase.skipException(reason)
else:
def skip_wrapper(test_item):
test_item.skip(reason)
return skip_wrapper
return decorator
def skipIf(condition, reason):
"""Skip a test if the condition is true."""
if condition:
return skip(reason)
def _id(obj):
return obj
return _id
def skipUnless(condition, reason):
"""Skip a test unless the condition is true."""
if not condition:
return skip(reason)
def _id(obj):
return obj
return _id

View File

@ -0,0 +1,19 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Test result objects."""
__all__ = [
'ExtendedToOriginalDecorator',
'MultiTestResult',
'TestResult',
'TextTestResult',
'ThreadsafeForwardingResult',
]
from real import (
ExtendedToOriginalDecorator,
MultiTestResult,
TestResult,
TextTestResult,
ThreadsafeForwardingResult,
)

View File

@ -0,0 +1,95 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Doubles of test result objects, useful for testing unittest code."""
__all__ = [
'Python26TestResult',
'Python27TestResult',
'ExtendedTestResult',
]
class LoggingBase(object):
"""Basic support for logging of results."""
def __init__(self):
self._events = []
self.shouldStop = False
class Python26TestResult(LoggingBase):
"""A precisely python 2.6 like test result, that logs."""
def addError(self, test, err):
self._events.append(('addError', test, err))
def addFailure(self, test, err):
self._events.append(('addFailure', test, err))
def addSuccess(self, test):
self._events.append(('addSuccess', test))
def startTest(self, test):
self._events.append(('startTest', test))
def stop(self):
self.shouldStop = True
def stopTest(self, test):
self._events.append(('stopTest', test))
class Python27TestResult(Python26TestResult):
"""A precisely python 2.7 like test result, that logs."""
def addExpectedFailure(self, test, err):
self._events.append(('addExpectedFailure', test, err))
def addSkip(self, test, reason):
self._events.append(('addSkip', test, reason))
def addUnexpectedSuccess(self, test):
self._events.append(('addUnexpectedSuccess', test))
def startTestRun(self):
self._events.append(('startTestRun',))
def stopTestRun(self):
self._events.append(('stopTestRun',))
class ExtendedTestResult(Python27TestResult):
"""A test result like the proposed extended unittest result API."""
def addError(self, test, err=None, details=None):
self._events.append(('addError', test, err or details))
def addFailure(self, test, err=None, details=None):
self._events.append(('addFailure', test, err or details))
def addExpectedFailure(self, test, err=None, details=None):
self._events.append(('addExpectedFailure', test, err or details))
def addSkip(self, test, reason=None, details=None):
self._events.append(('addSkip', test, reason or details))
def addSuccess(self, test, details=None):
if details:
self._events.append(('addSuccess', test, details))
else:
self._events.append(('addSuccess', test))
def addUnexpectedSuccess(self, test, details=None):
if details is not None:
self._events.append(('addUnexpectedSuccess', test, details))
else:
self._events.append(('addUnexpectedSuccess', test))
def progress(self, offset, whence):
self._events.append(('progress', offset, whence))
def tags(self, new_tags, gone_tags):
self._events.append(('tags', new_tags, gone_tags))
def time(self, time):
self._events.append(('time', time))

View File

@ -0,0 +1,540 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Test results and related things."""
__metaclass__ = type
__all__ = [
'ExtendedToOriginalDecorator',
'MultiTestResult',
'TestResult',
'ThreadsafeForwardingResult',
]
import datetime
import unittest
class TestResult(unittest.TestResult):
"""Subclass of unittest.TestResult extending the protocol for flexability.
This test result supports an experimental protocol for providing additional
data to in test outcomes. All the outcome methods take an optional dict
'details'. If supplied any other detail parameters like 'err' or 'reason'
should not be provided. The details dict is a mapping from names to
MIME content objects (see testtools.content). This permits attaching
tracebacks, log files, or even large objects like databases that were
part of the test fixture. Until this API is accepted into upstream
Python it is considered experimental: it may be replaced at any point
by a newer version more in line with upstream Python. Compatibility would
be aimed for in this case, but may not be possible.
:ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip.
"""
def __init__(self):
super(TestResult, self).__init__()
self.skip_reasons = {}
self.__now = None
# -- Start: As per python 2.7 --
self.expectedFailures = []
self.unexpectedSuccesses = []
# -- End: As per python 2.7 --
def addExpectedFailure(self, test, err=None, details=None):
"""Called when a test has failed in an expected manner.
Like with addSuccess and addError, testStopped should still be called.
:param test: The test that has been skipped.
:param err: The exc_info of the error that was raised.
:return: None
"""
# This is the python 2.7 implementation
self.expectedFailures.append(
(test, self._err_details_to_string(test, err, details)))
def addError(self, test, err=None, details=None):
"""Called when an error has occurred. 'err' is a tuple of values as
returned by sys.exc_info().
:param details: Alternative way to supply details about the outcome.
see the class docstring for more information.
"""
self.errors.append((test,
self._err_details_to_string(test, err, details)))
def addFailure(self, test, err=None, details=None):
"""Called when an error has occurred. 'err' is a tuple of values as
returned by sys.exc_info().
:param details: Alternative way to supply details about the outcome.
see the class docstring for more information.
"""
self.failures.append((test,
self._err_details_to_string(test, err, details)))
def addSkip(self, test, reason=None, details=None):
"""Called when a test has been skipped rather than running.
Like with addSuccess and addError, testStopped should still be called.
This must be called by the TestCase. 'addError' and 'addFailure' will
not call addSkip, since they have no assumptions about the kind of
errors that a test can raise.
:param test: The test that has been skipped.
:param reason: The reason for the test being skipped. For instance,
u"pyGL is not available".
:param details: Alternative way to supply details about the outcome.
see the class docstring for more information.
:return: None
"""
if reason is None:
reason = details.get('reason')
if reason is None:
reason = 'No reason given'
else:
reason = ''.join(reason.iter_text())
skip_list = self.skip_reasons.setdefault(reason, [])
skip_list.append(test)
def addSuccess(self, test, details=None):
"""Called when a test succeeded."""
def addUnexpectedSuccess(self, test, details=None):
"""Called when a test was expected to fail, but succeed."""
self.unexpectedSuccesses.append(test)
def _err_details_to_string(self, test, err=None, details=None):
"""Convert an error in exc_info form or a contents dict to a string."""
if err is not None:
return self._exc_info_to_string(err, test)
return _details_to_str(details)
def _now(self):
"""Return the current 'test time'.
If the time() method has not been called, this is equivalent to
datetime.now(), otherwise its the last supplied datestamp given to the
time() method.
"""
if self.__now is None:
return datetime.datetime.now()
else:
return self.__now
def startTestRun(self):
"""Called before a test run starts.
New in python 2.7
"""
def stopTestRun(self):
"""Called after a test run completes
New in python 2.7
"""
def time(self, a_datetime):
"""Provide a timestamp to represent the current time.
This is useful when test activity is time delayed, or happening
concurrently and getting the system time between API calls will not
accurately represent the duration of tests (or the whole run).
Calling time() sets the datetime used by the TestResult object.
Time is permitted to go backwards when using this call.
:param a_datetime: A datetime.datetime object with TZ information or
None to reset the TestResult to gathering time from the system.
"""
self.__now = a_datetime
def done(self):
"""Called when the test runner is done.
deprecated in favour of stopTestRun.
"""
class MultiTestResult(TestResult):
"""A test result that dispatches to many test results."""
def __init__(self, *results):
TestResult.__init__(self)
self._results = map(ExtendedToOriginalDecorator, results)
def _dispatch(self, message, *args, **kwargs):
for result in self._results:
getattr(result, message)(*args, **kwargs)
def startTest(self, test):
self._dispatch('startTest', test)
def stopTest(self, test):
self._dispatch('stopTest', test)
def addError(self, test, error=None, details=None):
self._dispatch('addError', test, error, details=details)
def addExpectedFailure(self, test, err=None, details=None):
self._dispatch('addExpectedFailure', test, err, details=details)
def addFailure(self, test, err=None, details=None):
self._dispatch('addFailure', test, err, details=details)
def addSkip(self, test, reason=None, details=None):
self._dispatch('addSkip', test, reason, details=details)
def addSuccess(self, test, details=None):
self._dispatch('addSuccess', test, details=details)
def addUnexpectedSuccess(self, test, details=None):
self._dispatch('addUnexpectedSuccess', test, details=details)
def startTestRun(self):
self._dispatch('startTestRun')
def stopTestRun(self):
self._dispatch('stopTestRun')
def done(self):
self._dispatch('done')
class TextTestResult(TestResult):
"""A TestResult which outputs activity to a text stream."""
def __init__(self, stream):
"""Construct a TextTestResult writing to stream."""
super(TextTestResult, self).__init__()
self.stream = stream
self.sep1 = '=' * 70 + '\n'
self.sep2 = '-' * 70 + '\n'
def _delta_to_float(self, a_timedelta):
return (a_timedelta.days * 86400.0 + a_timedelta.seconds +
a_timedelta.microseconds / 1000000.0)
def _show_list(self, label, error_list):
for test, output in error_list:
self.stream.write(self.sep1)
self.stream.write("%s: %s\n" % (label, test.id()))
self.stream.write(self.sep2)
self.stream.write(output)
def startTestRun(self):
super(TextTestResult, self).startTestRun()
self.__start = self._now()
self.stream.write("Tests running...\n")
def stopTestRun(self):
if self.testsRun != 1:
plural = 's'
else:
plural = ''
stop = self._now()
self._show_list('ERROR', self.errors)
self._show_list('FAIL', self.failures)
self.stream.write("Ran %d test%s in %.3fs\n\n" %
(self.testsRun, plural,
self._delta_to_float(stop - self.__start)))
if self.wasSuccessful():
self.stream.write("OK\n")
else:
self.stream.write("FAILED (")
details = []
details.append("failures=%d" % (
len(self.failures) + len(self.errors)))
self.stream.write(", ".join(details))
self.stream.write(")\n")
super(TextTestResult, self).stopTestRun()
class ThreadsafeForwardingResult(TestResult):
"""A TestResult which ensures the target does not receive mixed up calls.
This is used when receiving test results from multiple sources, and batches
up all the activity for a single test into a thread-safe batch where all
other ThreadsafeForwardingResult objects sharing the same semaphore will be
locked out.
Typical use of ThreadsafeForwardingResult involves creating one
ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These
forward to the TestResult that the ConcurrentTestSuite run method was
called with.
target.done() is called once for each ThreadsafeForwardingResult that
forwards to the same target. If the target's done() takes special action,
care should be taken to accommodate this.
"""
def __init__(self, target, semaphore):
"""Create a ThreadsafeForwardingResult forwarding to target.
:param target: A TestResult.
:param semaphore: A threading.Semaphore with limit 1.
"""
TestResult.__init__(self)
self.result = ExtendedToOriginalDecorator(target)
self.semaphore = semaphore
def addError(self, test, err=None, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addError(test, err, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def addExpectedFailure(self, test, err=None, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addExpectedFailure(test, err, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def addFailure(self, test, err=None, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addFailure(test, err, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def addSkip(self, test, reason=None, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addSkip(test, reason, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def addSuccess(self, test, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addSuccess(test, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def addUnexpectedSuccess(self, test, details=None):
self.semaphore.acquire()
try:
self.result.startTest(test)
self.result.addUnexpectedSuccess(test, details=details)
self.result.stopTest(test)
finally:
self.semaphore.release()
def startTestRun(self):
self.semaphore.acquire()
try:
self.result.startTestRun()
finally:
self.semaphore.release()
def stopTestRun(self):
self.semaphore.acquire()
try:
self.result.stopTestRun()
finally:
self.semaphore.release()
def done(self):
self.semaphore.acquire()
try:
self.result.done()
finally:
self.semaphore.release()
class ExtendedToOriginalDecorator(object):
"""Permit new TestResult API code to degrade gracefully with old results.
This decorates an existing TestResult and converts missing outcomes
such as addSkip to older outcomes such as addSuccess. It also supports
the extended details protocol. In all cases the most recent protocol
is attempted first, and fallbacks only occur when the decorated result
does not support the newer style of calling.
"""
def __init__(self, decorated):
self.decorated = decorated
def __getattr__(self, name):
return getattr(self.decorated, name)
def addError(self, test, err=None, details=None):
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addError(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addError(test, err)
def addExpectedFailure(self, test, err=None, details=None):
self._check_args(err, details)
addExpectedFailure = getattr(
self.decorated, 'addExpectedFailure', None)
if addExpectedFailure is None:
return self.addSuccess(test)
if details is not None:
try:
return addExpectedFailure(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return addExpectedFailure(test, err)
def addFailure(self, test, err=None, details=None):
self._check_args(err, details)
if details is not None:
try:
return self.decorated.addFailure(test, details=details)
except TypeError:
# have to convert
err = self._details_to_exc_info(details)
return self.decorated.addFailure(test, err)
def addSkip(self, test, reason=None, details=None):
self._check_args(reason, details)
addSkip = getattr(self.decorated, 'addSkip', None)
if addSkip is None:
return self.decorated.addSuccess(test)
if details is not None:
try:
return addSkip(test, details=details)
except TypeError:
# have to convert
reason = _details_to_str(details)
return addSkip(test, reason)
def addUnexpectedSuccess(self, test, details=None):
outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
if outcome is None:
return self.decorated.addSuccess(test)
if details is not None:
try:
return outcome(test, details=details)
except TypeError:
pass
return outcome(test)
def addSuccess(self, test, details=None):
if details is not None:
try:
return self.decorated.addSuccess(test, details=details)
except TypeError:
pass
return self.decorated.addSuccess(test)
def _check_args(self, err, details):
param_count = 0
if err is not None:
param_count += 1
if details is not None:
param_count += 1
if param_count != 1:
raise ValueError("Must pass only one of err '%s' and details '%s"
% (err, details))
def _details_to_exc_info(self, details):
"""Convert a details dict to an exc_info tuple."""
return (_StringException,
_StringException(_details_to_str(details)), None)
def done(self):
try:
return self.decorated.done()
except AttributeError:
return
def progress(self, offset, whence):
method = getattr(self.decorated, 'progress', None)
if method is None:
return
return method(offset, whence)
@property
def shouldStop(self):
return self.decorated.shouldStop
def startTest(self, test):
return self.decorated.startTest(test)
def startTestRun(self):
try:
return self.decorated.startTestRun()
except AttributeError:
return
def stop(self):
return self.decorated.stop()
def stopTest(self, test):
return self.decorated.stopTest(test)
def stopTestRun(self):
try:
return self.decorated.stopTestRun()
except AttributeError:
return
def tags(self, new_tags, gone_tags):
method = getattr(self.decorated, 'tags', None)
if method is None:
return
return method(new_tags, gone_tags)
def time(self, a_datetime):
method = getattr(self.decorated, 'time', None)
if method is None:
return
return method(a_datetime)
def wasSuccessful(self):
return self.decorated.wasSuccessful()
class _StringException(Exception):
"""An exception made from an arbitrary string."""
def __hash__(self):
return id(self)
def __str__(self):
"""Stringify better than 2.x's default behaviour of ascii encoding."""
return self.args[0]
def __eq__(self, other):
try:
return self.args == other.args
except AttributeError:
return False
def _details_to_str(details):
"""Convert a details dict to a string."""
chars = []
# sorted is for testing, may want to remove that and use a dict
# subclass with defined order for items instead.
for key, content in sorted(details.items()):
if content.content_type.type != 'text':
chars.append('Binary content: %s\n' % key)
continue
chars.append('Text attachment: %s\n' % key)
chars.append('------------\n')
chars.extend(content.iter_text())
if not chars[-1].endswith('\n'):
chars.append('\n')
chars.append('------------\n')
return ''.join(chars)

View File

@ -0,0 +1,28 @@
# See README for copyright and licensing details.
import unittest
from testtools.tests import (
test_content,
test_content_type,
test_matchers,
test_runtest,
test_testtools,
test_testresult,
test_testsuite,
)
def test_suite():
suites = []
modules = [
test_content,
test_content_type,
test_matchers,
test_runtest,
test_testresult,
test_testsuite,
test_testtools,
]
for module in modules:
suites.append(getattr(module, 'test_suite')())
return unittest.TestSuite(suites)

View File

@ -0,0 +1,67 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Helpers for tests."""
import sys
__metaclass__ = type
__all__ = [
'LoggingResult',
]
from testtools import TestResult
try:
raise Exception
except Exception:
an_exc_info = sys.exc_info()
# Deprecated: This classes attributes are somewhat non deterministic which
# leads to hard to predict tests (because Python upstream are changing things.
class LoggingResult(TestResult):
"""TestResult that logs its event to a list."""
def __init__(self, log):
self._events = log
super(LoggingResult, self).__init__()
def startTest(self, test):
self._events.append(('startTest', test))
super(LoggingResult, self).startTest(test)
def stopTest(self, test):
self._events.append(('stopTest', test))
super(LoggingResult, self).stopTest(test)
def addFailure(self, test, error):
self._events.append(('addFailure', test, error))
super(LoggingResult, self).addFailure(test, error)
def addError(self, test, error):
self._events.append(('addError', test, error))
super(LoggingResult, self).addError(test, error)
def addSkip(self, test, reason):
self._events.append(('addSkip', test, reason))
super(LoggingResult, self).addSkip(test, reason)
def addSuccess(self, test):
self._events.append(('addSuccess', test))
super(LoggingResult, self).addSuccess(test)
def startTestRun(self):
self._events.append('startTestRun')
super(LoggingResult, self).startTestRun()
def stopTestRun(self):
self._events.append('stopTestRun')
super(LoggingResult, self).stopTestRun()
def done(self):
self._events.append('done')
super(LoggingResult, self).done()
# Note, the following three classes are different to LoggingResult by
# being fully defined exact matches rather than supersets.
from testtools.testresult.doubles import *

View File

@ -0,0 +1,72 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
import unittest
from testtools.content import Content, TracebackContent
from testtools.content_type import ContentType
from testtools.utils import _u
from testtools.tests.helpers import an_exc_info
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
class TestContent(unittest.TestCase):
def test___init___None_errors(self):
self.assertRaises(ValueError, Content, None, None)
self.assertRaises(ValueError, Content, None, lambda: ["traceback"])
self.assertRaises(ValueError, Content,
ContentType("text", "traceback"), None)
def test___init___sets_ivars(self):
content_type = ContentType("foo", "bar")
content = Content(content_type, lambda: ["bytes"])
self.assertEqual(content_type, content.content_type)
self.assertEqual(["bytes"], list(content.iter_bytes()))
def test___eq__(self):
content_type = ContentType("foo", "bar")
content1 = Content(content_type, lambda: ["bytes"])
content2 = Content(content_type, lambda: ["bytes"])
content3 = Content(content_type, lambda: ["by", "tes"])
content4 = Content(content_type, lambda: ["by", "te"])
content5 = Content(ContentType("f", "b"), lambda: ["by", "tes"])
self.assertEqual(content1, content2)
self.assertEqual(content1, content3)
self.assertNotEqual(content1, content4)
self.assertNotEqual(content1, content5)
def test_iter_text_not_text_errors(self):
content_type = ContentType("foo", "bar")
content = Content(content_type, lambda: ["bytes"])
self.assertRaises(ValueError, content.iter_text)
def test_iter_text_decodes(self):
content_type = ContentType("text", "strange", {"charset": "utf8"})
content = Content(
content_type, lambda: [_u("bytes\xea").encode("utf8")])
self.assertEqual([_u("bytes\xea")], list(content.iter_text()))
def test_iter_text_default_charset_iso_8859_1(self):
content_type = ContentType("text", "strange")
text = _u("bytes\xea")
iso_version = text.encode("ISO-8859-1")
content = Content(content_type, lambda: [iso_version])
self.assertEqual([text], list(content.iter_text()))
class TestTracebackContent(unittest.TestCase):
def test___init___None_errors(self):
self.assertRaises(ValueError, TracebackContent, None, None)
def test___init___sets_ivars(self):
content = TracebackContent(an_exc_info, self)
content_type = ContentType("text", "x-traceback",
{"language": "python", "charset": "utf8"})
self.assertEqual(content_type, content.content_type)
result = unittest.TestResult()
expected = result._exc_info_to_string(an_exc_info, self)
self.assertEqual(expected, ''.join(list(content.iter_text())))

View File

@ -0,0 +1,34 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
import unittest
from testtools.content_type import ContentType
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
class TestContentType(unittest.TestCase):
def test___init___None_errors(self):
self.assertRaises(ValueError, ContentType, None, None)
self.assertRaises(ValueError, ContentType, None, "traceback")
self.assertRaises(ValueError, ContentType, "text", None)
def test___init___sets_ivars(self):
content_type = ContentType("foo", "bar")
self.assertEqual("foo", content_type.type)
self.assertEqual("bar", content_type.subtype)
self.assertEqual({}, content_type.parameters)
def test___init___with_parameters(self):
content_type = ContentType("foo", "bar", {"quux":"thing"})
self.assertEqual({"quux":"thing"}, content_type.parameters)
def test___eq__(self):
content_type1 = ContentType("foo", "bar", {"quux":"thing"})
content_type2 = ContentType("foo", "bar", {"quux":"thing"})
content_type3 = ContentType("foo", "bar", {"quux":"thing2"})
self.assertTrue(content_type1.__eq__(content_type2))
self.assertFalse(content_type1.__eq__(content_type3))

View File

@ -0,0 +1,113 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Tests for matchers."""
import doctest
from testtools import (
Matcher, # check that Matcher is exposed at the top level for docs.
TestCase,
)
from testtools.matchers import (
Equals,
DocTestMatches,
MatchesAny,
)
class TestMatchersInterface:
def test_matches_match(self):
matcher = self.matches_matcher
matches = self.matches_matches
mismatches = self.matches_mismatches
for candidate in matches:
self.assertEqual(None, matcher.match(candidate))
for candidate in mismatches:
mismatch = matcher.match(candidate)
self.assertNotEqual(None, mismatch)
self.assertNotEqual(None, getattr(mismatch, 'describe', None))
def test__str__(self):
# [(expected, object to __str__)].
examples = self.str_examples
for expected, matcher in examples:
self.assertThat(matcher, DocTestMatches(expected))
def test_describe_difference(self):
# [(expected, matchee, matcher), ...]
examples = self.describe_examples
for difference, matchee, matcher in examples:
mismatch = matcher.match(matchee)
self.assertEqual(difference, mismatch.describe())
class TestDocTestMatchesInterface(TestCase, TestMatchersInterface):
matches_matcher = DocTestMatches("Ran 1 test in ...s", doctest.ELLIPSIS)
matches_matches = ["Ran 1 test in 0.000s", "Ran 1 test in 1.234s"]
matches_mismatches = ["Ran 1 tests in 0.000s", "Ran 2 test in 0.000s"]
str_examples = [("DocTestMatches('Ran 1 test in ...s\\n')",
DocTestMatches("Ran 1 test in ...s")),
("DocTestMatches('foo\\n', flags=8)", DocTestMatches("foo", flags=8)),
]
describe_examples = [('Expected:\n Ran 1 tests in ...s\nGot:\n'
' Ran 1 test in 0.123s\n', "Ran 1 test in 0.123s",
DocTestMatches("Ran 1 tests in ...s", doctest.ELLIPSIS))]
class TestDocTestMatchesSpecific(TestCase):
def test___init__simple(self):
matcher = DocTestMatches("foo")
self.assertEqual("foo\n", matcher.want)
def test___init__flags(self):
matcher = DocTestMatches("bar\n", doctest.ELLIPSIS)
self.assertEqual("bar\n", matcher.want)
self.assertEqual(doctest.ELLIPSIS, matcher.flags)
class TestEqualsInterface(TestCase, TestMatchersInterface):
matches_matcher = Equals(1)
matches_matches = [1]
matches_mismatches = [2]
str_examples = [("Equals(1)", Equals(1)), ("Equals('1')", Equals('1'))]
describe_examples = [("1 != 2", 2, Equals(1))]
class TestMatchersAnyInterface(TestCase, TestMatchersInterface):
matches_matcher = MatchesAny(DocTestMatches("1"), DocTestMatches("2"))
matches_matches = ["1", "2"]
matches_mismatches = ["3"]
str_examples = [(
"MatchesAny(DocTestMatches('1\\n'), DocTestMatches('2\\n'))",
MatchesAny(DocTestMatches("1"), DocTestMatches("2"))),
]
describe_examples = [("""Differences: [
Expected:
1
Got:
3
Expected:
2
Got:
3
]
""",
"3", MatchesAny(DocTestMatches("1"), DocTestMatches("2")))]
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -0,0 +1,185 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Tests for the RunTest single test execution logic."""
from testtools import (
ExtendedToOriginalDecorator,
RunTest,
TestCase,
TestResult,
)
from testtools.tests.helpers import ExtendedTestResult
class TestRunTest(TestCase):
def make_case(self):
class Case(TestCase):
def test(self):
pass
return Case('test')
def test___init___short(self):
run = RunTest("bar")
self.assertEqual("bar", run.case)
self.assertEqual([], run.handlers)
def test__init____handlers(self):
handlers = [("quux", "baz")]
run = RunTest("bar", handlers)
self.assertEqual(handlers, run.handlers)
def test_run_with_result(self):
# test.run passes result down to _run_test_method.
log = []
class Case(TestCase):
def _run_test_method(self, result):
log.append(result)
case = Case('_run_test_method')
run = RunTest(case, lambda x: log.append(x))
result = TestResult()
run.run(result)
self.assertEqual(1, len(log))
self.assertEqual(result, log[0].decorated)
def test_run_no_result_manages_new_result(self):
log = []
run = RunTest(self.make_case(), lambda x: log.append(x) or x)
result = run.run()
self.assertIsInstance(result.decorated, TestResult)
def test__run_core_called(self):
case = self.make_case()
log = []
run = RunTest(case, lambda x: x)
run._run_core = lambda: log.append('foo')
run.run()
self.assertEqual(['foo'], log)
def test__run_user_does_not_catch_keyboard(self):
case = self.make_case()
def raises():
raise KeyboardInterrupt("yo")
run = RunTest(case, None)
run.result = ExtendedTestResult()
self.assertRaises(KeyboardInterrupt, run._run_user, raises)
self.assertEqual([], run.result._events)
def test__run_user_calls_onException(self):
case = self.make_case()
log = []
def handler(exc_info):
log.append("got it")
self.assertEqual(3, len(exc_info))
self.assertIsInstance(exc_info[1], KeyError)
self.assertIs(KeyError, exc_info[0])
case.addOnException(handler)
e = KeyError('Yo')
def raises():
raise e
def log_exc(self, result, err):
log.append((result, err))
run = RunTest(case, [(KeyError, log_exc)])
run.result = ExtendedTestResult()
status = run._run_user(raises)
self.assertEqual(run.exception_caught, status)
self.assertEqual([], run.result._events)
self.assertEqual(["got it", (run.result, e)], log)
def test__run_user_can_catch_Exception(self):
case = self.make_case()
e = Exception('Yo')
def raises():
raise e
log = []
def log_exc(self, result, err):
log.append((result, err))
run = RunTest(case, [(Exception, log_exc)])
run.result = ExtendedTestResult()
status = run._run_user(raises)
self.assertEqual(run.exception_caught, status)
self.assertEqual([], run.result._events)
self.assertEqual([(run.result, e)], log)
def test__run_user_uncaught_Exception_raised(self):
case = self.make_case()
e = KeyError('Yo')
def raises():
raise e
log = []
def log_exc(self, result, err):
log.append((result, err))
run = RunTest(case, [(ValueError, log_exc)])
run.result = ExtendedTestResult()
self.assertRaises(KeyError, run._run_user, raises)
self.assertEqual([], run.result._events)
self.assertEqual([], log)
def test__run_user_uncaught_Exception_from_exception_handler_raised(self):
case = self.make_case()
def broken_handler(exc_info):
# ValueError because thats what we know how to catch - and must
# not.
raise ValueError('boo')
case.addOnException(broken_handler)
e = KeyError('Yo')
def raises():
raise e
log = []
def log_exc(self, result, err):
log.append((result, err))
run = RunTest(case, [(ValueError, log_exc)])
run.result = ExtendedTestResult()
self.assertRaises(ValueError, run._run_user, raises)
self.assertEqual([], run.result._events)
self.assertEqual([], log)
def test__run_user_returns_result(self):
case = self.make_case()
def returns():
return 1
run = RunTest(case)
run.result = ExtendedTestResult()
self.assertEqual(1, run._run_user(returns))
self.assertEqual([], run.result._events)
def test__run_one_decorates_result(self):
log = []
class Run(RunTest):
def _run_prepared_result(self, result):
log.append(result)
return result
run = Run(self.make_case(), lambda x: x)
result = run._run_one('foo')
self.assertEqual([result], log)
self.assertIsInstance(log[0], ExtendedToOriginalDecorator)
self.assertEqual('foo', result.decorated)
def test__run_prepared_result_calls_start_and_stop_test(self):
result = ExtendedTestResult()
case = self.make_case()
run = RunTest(case, lambda x: x)
run.run(result)
self.assertEqual([
('startTest', case),
('addSuccess', case),
('stopTest', case),
], result._events)
def test__run_prepared_result_calls_stop_test_always(self):
result = ExtendedTestResult()
case = self.make_case()
def inner():
raise Exception("foo")
run = RunTest(case, lambda x: x)
run._run_core = inner
self.assertRaises(Exception, run.run, result)
self.assertEqual([
('startTest', case),
('stopTest', case),
], result._events)
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -0,0 +1,807 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Test TestResults and related things."""
__metaclass__ = type
import datetime
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
import doctest
import sys
import threading
from testtools import (
ExtendedToOriginalDecorator,
MultiTestResult,
TestCase,
TestResult,
TextTestResult,
ThreadsafeForwardingResult,
testresult,
)
from testtools.content import Content, ContentType
from testtools.matchers import DocTestMatches
from testtools.utils import _u, _b
from testtools.tests.helpers import (
LoggingResult,
Python26TestResult,
Python27TestResult,
ExtendedTestResult,
an_exc_info
)
class TestTestResultContract(TestCase):
"""Tests for the contract of TestResults."""
def test_addExpectedFailure(self):
# Calling addExpectedFailure(test, exc_info) completes ok.
result = self.makeResult()
result.addExpectedFailure(self, an_exc_info)
def test_addExpectedFailure_details(self):
# Calling addExpectedFailure(test, details=xxx) completes ok.
result = self.makeResult()
result.addExpectedFailure(self, details={})
def test_addError_details(self):
# Calling addError(test, details=xxx) completes ok.
result = self.makeResult()
result.addError(self, details={})
def test_addFailure_details(self):
# Calling addFailure(test, details=xxx) completes ok.
result = self.makeResult()
result.addFailure(self, details={})
def test_addSkipped(self):
# Calling addSkip(test, reason) completes ok.
result = self.makeResult()
result.addSkip(self, _u("Skipped for some reason"))
def test_addSkipped_details(self):
# Calling addSkip(test, reason) completes ok.
result = self.makeResult()
result.addSkip(self, details={})
def test_addUnexpectedSuccess(self):
# Calling addUnexpectedSuccess(test) completes ok.
result = self.makeResult()
result.addUnexpectedSuccess(self)
def test_addUnexpectedSuccess_details(self):
# Calling addUnexpectedSuccess(test) completes ok.
result = self.makeResult()
result.addUnexpectedSuccess(self, details={})
def test_addSuccess_details(self):
# Calling addSuccess(test) completes ok.
result = self.makeResult()
result.addSuccess(self, details={})
def test_startStopTestRun(self):
# Calling startTestRun completes ok.
result = self.makeResult()
result.startTestRun()
result.stopTestRun()
class TestTestResultContract(TestTestResultContract):
def makeResult(self):
return TestResult()
class TestMultiTestresultContract(TestTestResultContract):
def makeResult(self):
return MultiTestResult(TestResult(), TestResult())
class TestTextTestResultContract(TestTestResultContract):
def makeResult(self):
return TextTestResult(StringIO())
class TestThreadSafeForwardingResultContract(TestTestResultContract):
def makeResult(self):
result_semaphore = threading.Semaphore(1)
target = TestResult()
return ThreadsafeForwardingResult(target, result_semaphore)
class TestTestResult(TestCase):
"""Tests for `TestResult`."""
def makeResult(self):
"""Make an arbitrary result for testing."""
return TestResult()
def test_addSkipped(self):
# Calling addSkip on a TestResult records the test that was skipped in
# its skip_reasons dict.
result = self.makeResult()
result.addSkip(self, _u("Skipped for some reason"))
self.assertEqual({_u("Skipped for some reason"):[self]},
result.skip_reasons)
result.addSkip(self, _u("Skipped for some reason"))
self.assertEqual({_u("Skipped for some reason"):[self, self]},
result.skip_reasons)
result.addSkip(self, _u("Skipped for another reason"))
self.assertEqual({_u("Skipped for some reason"):[self, self],
_u("Skipped for another reason"):[self]},
result.skip_reasons)
def test_now_datetime_now(self):
result = self.makeResult()
olddatetime = testresult.real.datetime
def restore():
testresult.real.datetime = olddatetime
self.addCleanup(restore)
class Module:
pass
now = datetime.datetime.now()
stubdatetime = Module()
stubdatetime.datetime = Module()
stubdatetime.datetime.now = lambda: now
testresult.real.datetime = stubdatetime
# Calling _now() looks up the time.
self.assertEqual(now, result._now())
then = now + datetime.timedelta(0, 1)
# Set an explicit datetime, which gets returned from then on.
result.time(then)
self.assertNotEqual(now, result._now())
self.assertEqual(then, result._now())
# go back to looking it up.
result.time(None)
self.assertEqual(now, result._now())
def test_now_datetime_time(self):
result = self.makeResult()
now = datetime.datetime.now()
result.time(now)
self.assertEqual(now, result._now())
class TestWithFakeExceptions(TestCase):
def makeExceptionInfo(self, exceptionFactory, *args, **kwargs):
try:
raise exceptionFactory(*args, **kwargs)
except:
return sys.exc_info()
class TestMultiTestResult(TestWithFakeExceptions):
"""Tests for `MultiTestResult`."""
def setUp(self):
TestWithFakeExceptions.setUp(self)
self.result1 = LoggingResult([])
self.result2 = LoggingResult([])
self.multiResult = MultiTestResult(self.result1, self.result2)
def assertResultLogsEqual(self, expectedEvents):
"""Assert that our test results have received the expected events."""
self.assertEqual(expectedEvents, self.result1._events)
self.assertEqual(expectedEvents, self.result2._events)
def test_empty(self):
# Initializing a `MultiTestResult` doesn't do anything to its
# `TestResult`s.
self.assertResultLogsEqual([])
def test_startTest(self):
# Calling `startTest` on a `MultiTestResult` calls `startTest` on all
# its `TestResult`s.
self.multiResult.startTest(self)
self.assertResultLogsEqual([('startTest', self)])
def test_stopTest(self):
# Calling `stopTest` on a `MultiTestResult` calls `stopTest` on all
# its `TestResult`s.
self.multiResult.stopTest(self)
self.assertResultLogsEqual([('stopTest', self)])
def test_addSkipped(self):
# Calling `addSkip` on a `MultiTestResult` calls addSkip on its
# results.
reason = _u("Skipped for some reason")
self.multiResult.addSkip(self, reason)
self.assertResultLogsEqual([('addSkip', self, reason)])
def test_addSuccess(self):
# Calling `addSuccess` on a `MultiTestResult` calls `addSuccess` on
# all its `TestResult`s.
self.multiResult.addSuccess(self)
self.assertResultLogsEqual([('addSuccess', self)])
def test_done(self):
# Calling `done` on a `MultiTestResult` calls `done` on all its
# `TestResult`s.
self.multiResult.done()
self.assertResultLogsEqual([('done')])
def test_addFailure(self):
# Calling `addFailure` on a `MultiTestResult` calls `addFailure` on
# all its `TestResult`s.
exc_info = self.makeExceptionInfo(AssertionError, 'failure')
self.multiResult.addFailure(self, exc_info)
self.assertResultLogsEqual([('addFailure', self, exc_info)])
def test_addError(self):
# Calling `addError` on a `MultiTestResult` calls `addError` on all
# its `TestResult`s.
exc_info = self.makeExceptionInfo(RuntimeError, 'error')
self.multiResult.addError(self, exc_info)
self.assertResultLogsEqual([('addError', self, exc_info)])
def test_startTestRun(self):
# Calling `startTestRun` on a `MultiTestResult` forwards to all its
# `TestResult`s.
self.multiResult.startTestRun()
self.assertResultLogsEqual([('startTestRun')])
def test_stopTestRun(self):
# Calling `stopTestRun` on a `MultiTestResult` forwards to all its
# `TestResult`s.
self.multiResult.stopTestRun()
self.assertResultLogsEqual([('stopTestRun')])
class TestTextTestResult(TestWithFakeExceptions):
"""Tests for `TextTestResult`."""
def setUp(self):
super(TestTextTestResult, self).setUp()
self.result = TextTestResult(StringIO())
def make_erroring_test(self):
class Test(TestCase):
def error(self):
1/0
return Test("error")
def make_failing_test(self):
class Test(TestCase):
def failed(self):
self.fail("yo!")
return Test("failed")
def make_test(self):
class Test(TestCase):
def test(self):
pass
return Test("test")
def getvalue(self):
return self.result.stream.getvalue()
def test__init_sets_stream(self):
result = TextTestResult("fp")
self.assertEqual("fp", result.stream)
def reset_output(self):
self.result.stream = StringIO()
def test_startTestRun(self):
self.result.startTestRun()
self.assertEqual("Tests running...\n", self.getvalue())
def test_stopTestRun_count_many(self):
test = self.make_test()
self.result.startTestRun()
self.result.startTest(test)
self.result.stopTest(test)
self.result.startTest(test)
self.result.stopTest(test)
self.result.stream = StringIO()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("Ran 2 tests in ...s\n...", doctest.ELLIPSIS))
def test_stopTestRun_count_single(self):
test = self.make_test()
self.result.startTestRun()
self.result.startTest(test)
self.result.stopTest(test)
self.reset_output()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("Ran 1 test in ...s\n\nOK\n", doctest.ELLIPSIS))
def test_stopTestRun_count_zero(self):
self.result.startTestRun()
self.reset_output()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("Ran 0 tests in ...s\n\nOK\n", doctest.ELLIPSIS))
def test_stopTestRun_current_time(self):
test = self.make_test()
now = datetime.datetime.now()
self.result.time(now)
self.result.startTestRun()
self.result.startTest(test)
now = now + datetime.timedelta(0, 0, 0, 1)
self.result.time(now)
self.result.stopTest(test)
self.reset_output()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("... in 0.001s\n...", doctest.ELLIPSIS))
def test_stopTestRun_successful(self):
self.result.startTestRun()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("...\n\nOK\n", doctest.ELLIPSIS))
def test_stopTestRun_not_successful_failure(self):
test = self.make_failing_test()
self.result.startTestRun()
test.run(self.result)
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("...\n\nFAILED (failures=1)\n", doctest.ELLIPSIS))
def test_stopTestRun_not_successful_error(self):
test = self.make_erroring_test()
self.result.startTestRun()
test.run(self.result)
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("...\n\nFAILED (failures=1)\n", doctest.ELLIPSIS))
def test_stopTestRun_shows_details(self):
self.result.startTestRun()
self.make_erroring_test().run(self.result)
self.make_failing_test().run(self.result)
self.reset_output()
self.result.stopTestRun()
self.assertThat(self.getvalue(),
DocTestMatches("""...======================================================================
ERROR: testtools.tests.test_testresult.Test.error
----------------------------------------------------------------------
Text attachment: traceback
------------
Traceback (most recent call last):
File "...testtools...runtest.py", line ..., in _run_user...
return fn(*args)
File "...testtools...testcase.py", line ..., in _run_test_method
testMethod()
File "...testtools...tests...test_testresult.py", line ..., in error
1/0
ZeroDivisionError: int... division or modulo by zero
------------
======================================================================
FAIL: testtools.tests.test_testresult.Test.failed
----------------------------------------------------------------------
Text attachment: traceback
------------
Traceback (most recent call last):
File "...testtools...runtest.py", line ..., in _run_user...
return fn(*args)
File "...testtools...testcase.py", line ..., in _run_test_method
testMethod()
File "...testtools...tests...test_testresult.py", line ..., in failed
self.fail("yo!")
AssertionError: yo!
------------
...""", doctest.ELLIPSIS))
class TestThreadSafeForwardingResult(TestWithFakeExceptions):
"""Tests for `MultiTestResult`."""
def setUp(self):
TestWithFakeExceptions.setUp(self)
self.result_semaphore = threading.Semaphore(1)
self.target = LoggingResult([])
self.result1 = ThreadsafeForwardingResult(self.target,
self.result_semaphore)
def test_nonforwarding_methods(self):
# startTest and stopTest are not forwarded because they need to be
# batched.
self.result1.startTest(self)
self.result1.stopTest(self)
self.assertEqual([], self.target._events)
def test_startTestRun(self):
self.result1.startTestRun()
self.result2 = ThreadsafeForwardingResult(self.target,
self.result_semaphore)
self.result2.startTestRun()
self.assertEqual(["startTestRun", "startTestRun"], self.target._events)
def test_stopTestRun(self):
self.result1.stopTestRun()
self.result2 = ThreadsafeForwardingResult(self.target,
self.result_semaphore)
self.result2.stopTestRun()
self.assertEqual(["stopTestRun", "stopTestRun"], self.target._events)
def test_forwarding_methods(self):
# error, failure, skip and success are forwarded in batches.
exc_info1 = self.makeExceptionInfo(RuntimeError, 'error')
self.result1.addError(self, exc_info1)
exc_info2 = self.makeExceptionInfo(AssertionError, 'failure')
self.result1.addFailure(self, exc_info2)
reason = _u("Skipped for some reason")
self.result1.addSkip(self, reason)
self.result1.addSuccess(self)
self.assertEqual([('startTest', self),
('addError', self, exc_info1),
('stopTest', self),
('startTest', self),
('addFailure', self, exc_info2),
('stopTest', self),
('startTest', self),
('addSkip', self, reason),
('stopTest', self),
('startTest', self),
('addSuccess', self),
('stopTest', self),
], self.target._events)
class TestExtendedToOriginalResultDecoratorBase(TestCase):
def make_26_result(self):
self.result = Python26TestResult()
self.make_converter()
def make_27_result(self):
self.result = Python27TestResult()
self.make_converter()
def make_converter(self):
self.converter = ExtendedToOriginalDecorator(self.result)
def make_extended_result(self):
self.result = ExtendedTestResult()
self.make_converter()
def check_outcome_details(self, outcome):
"""Call an outcome with a details dict to be passed through."""
# This dict is /not/ convertible - thats deliberate, as it should
# not hit the conversion code path.
details = {'foo': 'bar'}
getattr(self.converter, outcome)(self, details=details)
self.assertEqual([(outcome, self, details)], self.result._events)
def get_details_and_string(self):
"""Get a details dict and expected string."""
text1 = lambda: [_b("1\n2\n")]
text2 = lambda: [_b("3\n4\n")]
bin1 = lambda: [_b("5\n")]
details = {'text 1': Content(ContentType('text', 'plain'), text1),
'text 2': Content(ContentType('text', 'strange'), text2),
'bin 1': Content(ContentType('application', 'binary'), bin1)}
return (details, "Binary content: bin 1\n"
"Text attachment: text 1\n------------\n1\n2\n"
"------------\nText attachment: text 2\n------------\n"
"3\n4\n------------\n")
def check_outcome_details_to_exec_info(self, outcome, expected=None):
"""Call an outcome with a details dict to be made into exc_info."""
# The conversion is a done using RemoteError and the string contents
# of the text types in the details dict.
if not expected:
expected = outcome
details, err_str = self.get_details_and_string()
getattr(self.converter, outcome)(self, details=details)
err = self.converter._details_to_exc_info(details)
self.assertEqual([(expected, self, err)], self.result._events)
def check_outcome_details_to_nothing(self, outcome, expected=None):
"""Call an outcome with a details dict to be swallowed."""
if not expected:
expected = outcome
details = {'foo': 'bar'}
getattr(self.converter, outcome)(self, details=details)
self.assertEqual([(expected, self)], self.result._events)
def check_outcome_details_to_string(self, outcome):
"""Call an outcome with a details dict to be stringified."""
details, err_str = self.get_details_and_string()
getattr(self.converter, outcome)(self, details=details)
self.assertEqual([(outcome, self, err_str)], self.result._events)
def check_outcome_exc_info(self, outcome, expected=None):
"""Check that calling a legacy outcome still works."""
# calling some outcome with the legacy exc_info style api (no keyword
# parameters) gets passed through.
if not expected:
expected = outcome
err = sys.exc_info()
getattr(self.converter, outcome)(self, err)
self.assertEqual([(expected, self, err)], self.result._events)
def check_outcome_exc_info_to_nothing(self, outcome, expected=None):
"""Check that calling a legacy outcome on a fallback works."""
# calling some outcome with the legacy exc_info style api (no keyword
# parameters) gets passed through.
if not expected:
expected = outcome
err = sys.exc_info()
getattr(self.converter, outcome)(self, err)
self.assertEqual([(expected, self)], self.result._events)
def check_outcome_nothing(self, outcome, expected=None):
"""Check that calling a legacy outcome still works."""
if not expected:
expected = outcome
getattr(self.converter, outcome)(self)
self.assertEqual([(expected, self)], self.result._events)
def check_outcome_string_nothing(self, outcome, expected):
"""Check that calling outcome with a string calls expected."""
getattr(self.converter, outcome)(self, "foo")
self.assertEqual([(expected, self)], self.result._events)
def check_outcome_string(self, outcome):
"""Check that calling outcome with a string works."""
getattr(self.converter, outcome)(self, "foo")
self.assertEqual([(outcome, self, "foo")], self.result._events)
class TestExtendedToOriginalResultDecorator(
TestExtendedToOriginalResultDecoratorBase):
def test_progress_py26(self):
self.make_26_result()
self.converter.progress(1, 2)
def test_progress_py27(self):
self.make_27_result()
self.converter.progress(1, 2)
def test_progress_pyextended(self):
self.make_extended_result()
self.converter.progress(1, 2)
self.assertEqual([('progress', 1, 2)], self.result._events)
def test_shouldStop(self):
self.make_26_result()
self.assertEqual(False, self.converter.shouldStop)
self.converter.decorated.stop()
self.assertEqual(True, self.converter.shouldStop)
def test_startTest_py26(self):
self.make_26_result()
self.converter.startTest(self)
self.assertEqual([('startTest', self)], self.result._events)
def test_startTest_py27(self):
self.make_27_result()
self.converter.startTest(self)
self.assertEqual([('startTest', self)], self.result._events)
def test_startTest_pyextended(self):
self.make_extended_result()
self.converter.startTest(self)
self.assertEqual([('startTest', self)], self.result._events)
def test_startTestRun_py26(self):
self.make_26_result()
self.converter.startTestRun()
self.assertEqual([], self.result._events)
def test_startTestRun_py27(self):
self.make_27_result()
self.converter.startTestRun()
self.assertEqual([('startTestRun',)], self.result._events)
def test_startTestRun_pyextended(self):
self.make_extended_result()
self.converter.startTestRun()
self.assertEqual([('startTestRun',)], self.result._events)
def test_stopTest_py26(self):
self.make_26_result()
self.converter.stopTest(self)
self.assertEqual([('stopTest', self)], self.result._events)
def test_stopTest_py27(self):
self.make_27_result()
self.converter.stopTest(self)
self.assertEqual([('stopTest', self)], self.result._events)
def test_stopTest_pyextended(self):
self.make_extended_result()
self.converter.stopTest(self)
self.assertEqual([('stopTest', self)], self.result._events)
def test_stopTestRun_py26(self):
self.make_26_result()
self.converter.stopTestRun()
self.assertEqual([], self.result._events)
def test_stopTestRun_py27(self):
self.make_27_result()
self.converter.stopTestRun()
self.assertEqual([('stopTestRun',)], self.result._events)
def test_stopTestRun_pyextended(self):
self.make_extended_result()
self.converter.stopTestRun()
self.assertEqual([('stopTestRun',)], self.result._events)
def test_tags_py26(self):
self.make_26_result()
self.converter.tags(1, 2)
def test_tags_py27(self):
self.make_27_result()
self.converter.tags(1, 2)
def test_tags_pyextended(self):
self.make_extended_result()
self.converter.tags(1, 2)
self.assertEqual([('tags', 1, 2)], self.result._events)
def test_time_py26(self):
self.make_26_result()
self.converter.time(1)
def test_time_py27(self):
self.make_27_result()
self.converter.time(1)
def test_time_pyextended(self):
self.make_extended_result()
self.converter.time(1)
self.assertEqual([('time', 1)], self.result._events)
class TestExtendedToOriginalAddError(TestExtendedToOriginalResultDecoratorBase):
outcome = 'addError'
def test_outcome_Original_py26(self):
self.make_26_result()
self.check_outcome_exc_info(self.outcome)
def test_outcome_Original_py27(self):
self.make_27_result()
self.check_outcome_exc_info(self.outcome)
def test_outcome_Original_pyextended(self):
self.make_extended_result()
self.check_outcome_exc_info(self.outcome)
def test_outcome_Extended_py26(self):
self.make_26_result()
self.check_outcome_details_to_exec_info(self.outcome)
def test_outcome_Extended_py27(self):
self.make_27_result()
self.check_outcome_details_to_exec_info(self.outcome)
def test_outcome_Extended_pyextended(self):
self.make_extended_result()
self.check_outcome_details(self.outcome)
def test_outcome__no_details(self):
self.make_extended_result()
self.assertRaises(ValueError,
getattr(self.converter, self.outcome), self)
class TestExtendedToOriginalAddFailure(
TestExtendedToOriginalAddError):
outcome = 'addFailure'
class TestExtendedToOriginalAddExpectedFailure(
TestExtendedToOriginalAddError):
outcome = 'addExpectedFailure'
def test_outcome_Original_py26(self):
self.make_26_result()
self.check_outcome_exc_info_to_nothing(self.outcome, 'addSuccess')
def test_outcome_Extended_py26(self):
self.make_26_result()
self.check_outcome_details_to_nothing(self.outcome, 'addSuccess')
class TestExtendedToOriginalAddSkip(
TestExtendedToOriginalResultDecoratorBase):
outcome = 'addSkip'
def test_outcome_Original_py26(self):
self.make_26_result()
self.check_outcome_string_nothing(self.outcome, 'addSuccess')
def test_outcome_Original_py27(self):
self.make_27_result()
self.check_outcome_string(self.outcome)
def test_outcome_Original_pyextended(self):
self.make_extended_result()
self.check_outcome_string(self.outcome)
def test_outcome_Extended_py26(self):
self.make_26_result()
self.check_outcome_string_nothing(self.outcome, 'addSuccess')
def test_outcome_Extended_py27(self):
self.make_27_result()
self.check_outcome_details_to_string(self.outcome)
def test_outcome_Extended_pyextended(self):
self.make_extended_result()
self.check_outcome_details(self.outcome)
def test_outcome__no_details(self):
self.make_extended_result()
self.assertRaises(ValueError,
getattr(self.converter, self.outcome), self)
class TestExtendedToOriginalAddSuccess(
TestExtendedToOriginalResultDecoratorBase):
outcome = 'addSuccess'
expected = 'addSuccess'
def test_outcome_Original_py26(self):
self.make_26_result()
self.check_outcome_nothing(self.outcome, self.expected)
def test_outcome_Original_py27(self):
self.make_27_result()
self.check_outcome_nothing(self.outcome)
def test_outcome_Original_pyextended(self):
self.make_extended_result()
self.check_outcome_nothing(self.outcome)
def test_outcome_Extended_py26(self):
self.make_26_result()
self.check_outcome_details_to_nothing(self.outcome, self.expected)
def test_outcome_Extended_py27(self):
self.make_27_result()
self.check_outcome_details_to_nothing(self.outcome)
def test_outcome_Extended_pyextended(self):
self.make_extended_result()
self.check_outcome_details(self.outcome)
class TestExtendedToOriginalAddUnexpectedSuccess(
TestExtendedToOriginalAddSuccess):
outcome = 'addUnexpectedSuccess'
class TestExtendedToOriginalResultOtherAttributes(
TestExtendedToOriginalResultDecoratorBase):
def test_other_attribute(self):
class OtherExtendedResult:
def foo(self):
return 2
bar = 1
self.result = OtherExtendedResult()
self.make_converter()
self.assertEqual(1, self.converter.bar)
self.assertEqual(2, self.converter.foo())
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -0,0 +1,56 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Test ConcurrentTestSuite and related things."""
__metaclass__ = type
import unittest
from testtools import (
ConcurrentTestSuite,
iterate_tests,
TestCase,
)
from testtools.matchers import (
Equals,
)
from testtools.tests.helpers import LoggingResult
class TestConcurrentTestSuiteRun(TestCase):
def test_trivial(self):
log = []
result = LoggingResult(log)
class Sample(TestCase):
def __hash__(self):
return id(self)
def test_method1(self):
pass
def test_method2(self):
pass
test1 = Sample('test_method1')
test2 = Sample('test_method2')
original_suite = unittest.TestSuite([test1, test2])
suite = ConcurrentTestSuite(original_suite, self.split_suite)
suite.run(result)
test1 = log[0][1]
test2 = log[-1][1]
self.assertIsInstance(test1, Sample)
self.assertIsInstance(test2, Sample)
self.assertNotEqual(test1.id(), test2.id())
# We expect the start/outcome/stop to be grouped
expected = [('startTest', test1), ('addSuccess', test1),
('stopTest', test1), ('startTest', test2), ('addSuccess', test2),
('stopTest', test2)]
self.assertThat(log, Equals(expected))
def split_suite(self, suite):
tests = list(iterate_tests(suite))
return tests[0], tests[1]
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -0,0 +1,743 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Tests for extensions to the base test library."""
import sys
import unittest
from testtools import (
TestCase,
clone_test_with_new_id,
content,
skip,
skipIf,
skipUnless,
testcase,
)
from testtools.matchers import (
Equals,
)
from testtools.tests.helpers import (
an_exc_info,
LoggingResult,
Python26TestResult,
Python27TestResult,
ExtendedTestResult,
)
class TestEquality(TestCase):
"""Test `TestCase`'s equality implementation."""
def test_identicalIsEqual(self):
# TestCase's are equal if they are identical.
self.assertEqual(self, self)
def test_nonIdenticalInUnequal(self):
# TestCase's are not equal if they are not identical.
self.assertNotEqual(TestCase(methodName='run'),
TestCase(methodName='skip'))
class TestAssertions(TestCase):
"""Test assertions in TestCase."""
def raiseError(self, exceptionFactory, *args, **kwargs):
raise exceptionFactory(*args, **kwargs)
def test_formatTypes_single(self):
# Given a single class, _formatTypes returns the name.
class Foo:
pass
self.assertEqual('Foo', self._formatTypes(Foo))
def test_formatTypes_multiple(self):
# Given multiple types, _formatTypes returns the names joined by
# commas.
class Foo:
pass
class Bar:
pass
self.assertEqual('Foo, Bar', self._formatTypes([Foo, Bar]))
def test_assertRaises(self):
# assertRaises asserts that a callable raises a particular exception.
self.assertRaises(RuntimeError, self.raiseError, RuntimeError)
def test_assertRaises_fails_when_no_error_raised(self):
# assertRaises raises self.failureException when it's passed a
# callable that raises no error.
ret = ('orange', 42)
try:
self.assertRaises(RuntimeError, lambda: ret)
except self.failureException:
# We expected assertRaises to raise this exception.
e = sys.exc_info()[1]
self.assertEqual(
'%s not raised, %r returned instead.'
% (self._formatTypes(RuntimeError), ret), str(e))
else:
self.fail('Expected assertRaises to fail, but it did not.')
def test_assertRaises_fails_when_different_error_raised(self):
# assertRaises re-raises an exception that it didn't expect.
self.assertRaises(
ZeroDivisionError,
self.assertRaises,
RuntimeError, self.raiseError, ZeroDivisionError)
def test_assertRaises_returns_the_raised_exception(self):
# assertRaises returns the exception object that was raised. This is
# useful for testing that exceptions have the right message.
# This contraption stores the raised exception, so we can compare it
# to the return value of assertRaises.
raisedExceptions = []
def raiseError():
try:
raise RuntimeError('Deliberate error')
except RuntimeError:
raisedExceptions.append(sys.exc_info()[1])
raise
exception = self.assertRaises(RuntimeError, raiseError)
self.assertEqual(1, len(raisedExceptions))
self.assertTrue(
exception is raisedExceptions[0],
"%r is not %r" % (exception, raisedExceptions[0]))
def test_assertRaises_with_multiple_exceptions(self):
# assertRaises((ExceptionOne, ExceptionTwo), function) asserts that
# function raises one of ExceptionTwo or ExceptionOne.
expectedExceptions = (RuntimeError, ZeroDivisionError)
self.assertRaises(
expectedExceptions, self.raiseError, expectedExceptions[0])
self.assertRaises(
expectedExceptions, self.raiseError, expectedExceptions[1])
def test_assertRaises_with_multiple_exceptions_failure_mode(self):
# If assertRaises is called expecting one of a group of exceptions and
# a callable that doesn't raise an exception, then fail with an
# appropriate error message.
expectedExceptions = (RuntimeError, ZeroDivisionError)
failure = self.assertRaises(
self.failureException,
self.assertRaises, expectedExceptions, lambda: None)
self.assertEqual(
'%s not raised, None returned instead.'
% self._formatTypes(expectedExceptions), str(failure))
def assertFails(self, message, function, *args, **kwargs):
"""Assert that function raises a failure with the given message."""
failure = self.assertRaises(
self.failureException, function, *args, **kwargs)
self.assertEqual(message, str(failure))
def test_assertIn_success(self):
# assertIn(needle, haystack) asserts that 'needle' is in 'haystack'.
self.assertIn(3, range(10))
self.assertIn('foo', 'foo bar baz')
self.assertIn('foo', 'foo bar baz'.split())
def test_assertIn_failure(self):
# assertIn(needle, haystack) fails the test when 'needle' is not in
# 'haystack'.
self.assertFails('3 not in [0, 1, 2]', self.assertIn, 3, [0, 1, 2])
self.assertFails(
'%r not in %r' % ('qux', 'foo bar baz'),
self.assertIn, 'qux', 'foo bar baz')
def test_assertNotIn_success(self):
# assertNotIn(needle, haystack) asserts that 'needle' is not in
# 'haystack'.
self.assertNotIn(3, [0, 1, 2])
self.assertNotIn('qux', 'foo bar baz')
def test_assertNotIn_failure(self):
# assertNotIn(needle, haystack) fails the test when 'needle' is in
# 'haystack'.
self.assertFails('3 in [1, 2, 3]', self.assertNotIn, 3, [1, 2, 3])
self.assertFails(
'%r in %r' % ('foo', 'foo bar baz'),
self.assertNotIn, 'foo', 'foo bar baz')
def test_assertIsInstance(self):
# assertIsInstance asserts that an object is an instance of a class.
class Foo:
"""Simple class for testing assertIsInstance."""
foo = Foo()
self.assertIsInstance(foo, Foo)
def test_assertIsInstance_multiple_classes(self):
# assertIsInstance asserts that an object is an instance of one of a
# group of classes.
class Foo:
"""Simple class for testing assertIsInstance."""
class Bar:
"""Another simple class for testing assertIsInstance."""
foo = Foo()
self.assertIsInstance(foo, (Foo, Bar))
self.assertIsInstance(Bar(), (Foo, Bar))
def test_assertIsInstance_failure(self):
# assertIsInstance(obj, klass) fails the test when obj is not an
# instance of klass.
class Foo:
"""Simple class for testing assertIsInstance."""
self.assertFails(
'42 is not an instance of %s' % self._formatTypes(Foo),
self.assertIsInstance, 42, Foo)
def test_assertIsInstance_failure_multiple_classes(self):
# assertIsInstance(obj, (klass1, klass2)) fails the test when obj is
# not an instance of klass1 or klass2.
class Foo:
"""Simple class for testing assertIsInstance."""
class Bar:
"""Another simple class for testing assertIsInstance."""
self.assertFails(
'42 is not an instance of %s' % self._formatTypes([Foo, Bar]),
self.assertIsInstance, 42, (Foo, Bar))
def test_assertIs(self):
# assertIs asserts that an object is identical to another object.
self.assertIs(None, None)
some_list = [42]
self.assertIs(some_list, some_list)
some_object = object()
self.assertIs(some_object, some_object)
def test_assertIs_fails(self):
# assertIs raises assertion errors if one object is not identical to
# another.
self.assertFails('None is not 42', self.assertIs, None, 42)
self.assertFails('[42] is not [42]', self.assertIs, [42], [42])
def test_assertIsNot(self):
# assertIsNot asserts that an object is not identical to another
# object.
self.assertIsNot(None, 42)
self.assertIsNot([42], [42])
self.assertIsNot(object(), object())
def test_assertIsNot_fails(self):
# assertIsNot raises assertion errors if one object is identical to
# another.
self.assertFails('None is None', self.assertIsNot, None, None)
some_list = [42]
self.assertFails(
'[42] is [42]', self.assertIsNot, some_list, some_list)
def test_assertThat_matches_clean(self):
class Matcher:
def match(self, foo):
return None
self.assertThat("foo", Matcher())
def test_assertThat_mismatch_raises_description(self):
calls = []
class Mismatch:
def __init__(self, thing):
self.thing = thing
def describe(self):
calls.append(('describe_diff', self.thing))
return "object is not a thing"
class Matcher:
def match(self, thing):
calls.append(('match', thing))
return Mismatch(thing)
def __str__(self):
calls.append(('__str__',))
return "a description"
class Test(TestCase):
def test(self):
self.assertThat("foo", Matcher())
result = Test("test").run()
self.assertEqual([
('match', "foo"),
('describe_diff', "foo"),
('__str__',),
], calls)
self.assertFalse(result.wasSuccessful())
class TestAddCleanup(TestCase):
"""Tests for TestCase.addCleanup."""
class LoggingTest(TestCase):
"""A test that logs calls to setUp, runTest and tearDown."""
def setUp(self):
TestCase.setUp(self)
self._calls = ['setUp']
def brokenSetUp(self):
# A tearDown that deliberately fails.
self._calls = ['brokenSetUp']
raise RuntimeError('Deliberate Failure')
def runTest(self):
self._calls.append('runTest')
def tearDown(self):
self._calls.append('tearDown')
TestCase.tearDown(self)
def setUp(self):
TestCase.setUp(self)
self._result_calls = []
self.test = TestAddCleanup.LoggingTest('runTest')
self.logging_result = LoggingResult(self._result_calls)
def assertErrorLogEqual(self, messages):
self.assertEqual(messages, [call[0] for call in self._result_calls])
def assertTestLogEqual(self, messages):
"""Assert that the call log equals `messages`."""
case = self._result_calls[0][1]
self.assertEqual(messages, case._calls)
def logAppender(self, message):
"""A cleanup that appends `message` to the tests log.
Cleanups are callables that are added to a test by addCleanup. To
verify that our cleanups run in the right order, we add strings to a
list that acts as a log. This method returns a cleanup that will add
the given message to that log when run.
"""
self.test._calls.append(message)
def test_fixture(self):
# A normal run of self.test logs 'setUp', 'runTest' and 'tearDown'.
# This test doesn't test addCleanup itself, it just sanity checks the
# fixture.
self.test.run(self.logging_result)
self.assertTestLogEqual(['setUp', 'runTest', 'tearDown'])
def test_cleanup_run_before_tearDown(self):
# Cleanup functions added with 'addCleanup' are called before tearDown
# runs.
self.test.addCleanup(self.logAppender, 'cleanup')
self.test.run(self.logging_result)
self.assertTestLogEqual(['setUp', 'runTest', 'tearDown', 'cleanup'])
def test_add_cleanup_called_if_setUp_fails(self):
# Cleanup functions added with 'addCleanup' are called even if setUp
# fails. Note that tearDown has a different behavior: it is only
# called when setUp succeeds.
self.test.setUp = self.test.brokenSetUp
self.test.addCleanup(self.logAppender, 'cleanup')
self.test.run(self.logging_result)
self.assertTestLogEqual(['brokenSetUp', 'cleanup'])
def test_addCleanup_called_in_reverse_order(self):
# Cleanup functions added with 'addCleanup' are called in reverse
# order.
#
# One of the main uses of addCleanup is to dynamically create
# resources that need some sort of explicit tearDown. Often one
# resource will be created in terms of another, e.g.,
# self.first = self.makeFirst()
# self.second = self.makeSecond(self.first)
#
# When this happens, we generally want to clean up the second resource
# before the first one, since the second depends on the first.
self.test.addCleanup(self.logAppender, 'first')
self.test.addCleanup(self.logAppender, 'second')
self.test.run(self.logging_result)
self.assertTestLogEqual(
['setUp', 'runTest', 'tearDown', 'second', 'first'])
def test_tearDown_runs_after_cleanup_failure(self):
# tearDown runs even if a cleanup function fails.
self.test.addCleanup(lambda: 1/0)
self.test.run(self.logging_result)
self.assertTestLogEqual(['setUp', 'runTest', 'tearDown'])
def test_cleanups_continue_running_after_error(self):
# All cleanups are always run, even if one or two of them fail.
self.test.addCleanup(self.logAppender, 'first')
self.test.addCleanup(lambda: 1/0)
self.test.addCleanup(self.logAppender, 'second')
self.test.run(self.logging_result)
self.assertTestLogEqual(
['setUp', 'runTest', 'tearDown', 'second', 'first'])
def test_error_in_cleanups_are_captured(self):
# If a cleanup raises an error, we want to record it and fail the the
# test, even though we go on to run other cleanups.
self.test.addCleanup(lambda: 1/0)
self.test.run(self.logging_result)
self.assertErrorLogEqual(['startTest', 'addError', 'stopTest'])
def test_keyboard_interrupt_not_caught(self):
# If a cleanup raises KeyboardInterrupt, it gets reraised.
def raiseKeyboardInterrupt():
raise KeyboardInterrupt()
self.test.addCleanup(raiseKeyboardInterrupt)
self.assertRaises(
KeyboardInterrupt, self.test.run, self.logging_result)
def test_multipleErrorsReported(self):
# Errors from all failing cleanups are reported.
self.test.addCleanup(lambda: 1/0)
self.test.addCleanup(lambda: 1/0)
self.test.run(self.logging_result)
self.assertErrorLogEqual(
['startTest', 'addError', 'addError', 'stopTest'])
class TestWithDetails(TestCase):
def assertDetailsProvided(self, case, expected_outcome, expected_keys):
"""Assert that when case is run, details are provided to the result.
:param case: A TestCase to run.
:param expected_outcome: The call that should be made.
:param expected_keys: The keys to look for.
"""
result = ExtendedTestResult()
case.run(result)
case = result._events[0][1]
expected = [
('startTest', case),
(expected_outcome, case),
('stopTest', case),
]
self.assertEqual(3, len(result._events))
self.assertEqual(expected[0], result._events[0])
self.assertEqual(expected[1], result._events[1][0:2])
# Checking the TB is right is rather tricky. doctest line matching
# would help, but 'meh'.
self.assertEqual(sorted(expected_keys),
sorted(result._events[1][2].keys()))
self.assertEqual(expected[-1], result._events[-1])
def get_content(self):
return content.Content(
content.ContentType("text", "foo"), lambda: ['foo'])
class TestExpectedFailure(TestWithDetails):
"""Tests for expected failures and unexpected successess."""
def make_unexpected_case(self):
class Case(TestCase):
def test(self):
raise testcase._UnexpectedSuccess
case = Case('test')
return case
def test_raising__UnexpectedSuccess_py27(self):
case = self.make_unexpected_case()
result = Python27TestResult()
case.run(result)
case = result._events[0][1]
self.assertEqual([
('startTest', case),
('addUnexpectedSuccess', case),
('stopTest', case),
], result._events)
def test_raising__UnexpectedSuccess_extended(self):
case = self.make_unexpected_case()
result = ExtendedTestResult()
case.run(result)
case = result._events[0][1]
self.assertEqual([
('startTest', case),
('addUnexpectedSuccess', case, {}),
('stopTest', case),
], result._events)
def make_xfail_case_xfails(self):
content = self.get_content()
class Case(TestCase):
def test(self):
self.addDetail("foo", content)
self.expectFailure("we are sad", self.assertEqual,
1, 0)
case = Case('test')
return case
def make_xfail_case_succeeds(self):
content = self.get_content()
class Case(TestCase):
def test(self):
self.addDetail("foo", content)
self.expectFailure("we are sad", self.assertEqual,
1, 1)
case = Case('test')
return case
def test_expectFailure_KnownFailure_extended(self):
case = self.make_xfail_case_xfails()
self.assertDetailsProvided(case, "addExpectedFailure",
["foo", "traceback", "reason"])
def test_expectFailure_KnownFailure_unexpected_success(self):
case = self.make_xfail_case_succeeds()
self.assertDetailsProvided(case, "addUnexpectedSuccess",
["foo", "reason"])
class TestUniqueFactories(TestCase):
"""Tests for getUniqueString and getUniqueInteger."""
def test_getUniqueInteger(self):
# getUniqueInteger returns an integer that increments each time you
# call it.
one = self.getUniqueInteger()
self.assertEqual(1, one)
two = self.getUniqueInteger()
self.assertEqual(2, two)
def test_getUniqueString(self):
# getUniqueString returns the current test id followed by a unique
# integer.
name_one = self.getUniqueString()
self.assertEqual('%s-%d' % (self.id(), 1), name_one)
name_two = self.getUniqueString()
self.assertEqual('%s-%d' % (self.id(), 2), name_two)
def test_getUniqueString_prefix(self):
# If getUniqueString is given an argument, it uses that argument as
# the prefix of the unique string, rather than the test id.
name_one = self.getUniqueString('foo')
self.assertThat(name_one, Equals('foo-1'))
name_two = self.getUniqueString('bar')
self.assertThat(name_two, Equals('bar-2'))
class TestCloneTestWithNewId(TestCase):
"""Tests for clone_test_with_new_id."""
def test_clone_test_with_new_id(self):
class FooTestCase(TestCase):
def test_foo(self):
pass
test = FooTestCase('test_foo')
oldName = test.id()
newName = self.getUniqueString()
newTest = clone_test_with_new_id(test, newName)
self.assertEqual(newName, newTest.id())
self.assertEqual(oldName, test.id(),
"the original test instance should be unchanged.")
class TestDetailsProvided(TestWithDetails):
def test_addDetail(self):
mycontent = self.get_content()
self.addDetail("foo", mycontent)
details = self.getDetails()
self.assertEqual({"foo": mycontent}, details)
def test_addError(self):
class Case(TestCase):
def test(this):
this.addDetail("foo", self.get_content())
1/0
self.assertDetailsProvided(Case("test"), "addError",
["foo", "traceback"])
def test_addFailure(self):
class Case(TestCase):
def test(this):
this.addDetail("foo", self.get_content())
self.fail('yo')
self.assertDetailsProvided(Case("test"), "addFailure",
["foo", "traceback"])
def test_addSkip(self):
class Case(TestCase):
def test(this):
this.addDetail("foo", self.get_content())
self.skip('yo')
self.assertDetailsProvided(Case("test"), "addSkip",
["foo", "reason"])
def test_addSucccess(self):
class Case(TestCase):
def test(this):
this.addDetail("foo", self.get_content())
self.assertDetailsProvided(Case("test"), "addSuccess",
["foo"])
def test_addUnexpectedSuccess(self):
class Case(TestCase):
def test(this):
this.addDetail("foo", self.get_content())
raise testcase._UnexpectedSuccess()
self.assertDetailsProvided(Case("test"), "addUnexpectedSuccess",
["foo"])
class TestSetupTearDown(TestCase):
def test_setUpNotCalled(self):
class DoesnotcallsetUp(TestCase):
def setUp(self):
pass
def test_method(self):
pass
result = unittest.TestResult()
DoesnotcallsetUp('test_method').run(result)
self.assertEqual(1, len(result.errors))
def test_tearDownNotCalled(self):
class DoesnotcalltearDown(TestCase):
def test_method(self):
pass
def tearDown(self):
pass
result = unittest.TestResult()
DoesnotcalltearDown('test_method').run(result)
self.assertEqual(1, len(result.errors))
class TestSkipping(TestCase):
"""Tests for skipping of tests functionality."""
def test_skip_causes_skipException(self):
self.assertRaises(self.skipException, self.skip, "Skip this test")
def test_skip_without_reason_works(self):
class Test(TestCase):
def test(self):
raise self.skipException()
case = Test("test")
result = ExtendedTestResult()
case.run(result)
self.assertEqual('addSkip', result._events[1][0])
self.assertEqual('no reason given.',
''.join(result._events[1][2]['reason'].iter_text()))
def test_skipException_in_setup_calls_result_addSkip(self):
class TestThatRaisesInSetUp(TestCase):
def setUp(self):
TestCase.setUp(self)
self.skip("skipping this test")
def test_that_passes(self):
pass
calls = []
result = LoggingResult(calls)
test = TestThatRaisesInSetUp("test_that_passes")
test.run(result)
case = result._events[0][1]
self.assertEqual([('startTest', case),
('addSkip', case, "Text attachment: reason\n------------\n"
"skipping this test\n------------\n"), ('stopTest', case)],
calls)
def test_skipException_in_test_method_calls_result_addSkip(self):
class SkippingTest(TestCase):
def test_that_raises_skipException(self):
self.skip("skipping this test")
result = Python27TestResult()
test = SkippingTest("test_that_raises_skipException")
test.run(result)
case = result._events[0][1]
self.assertEqual([('startTest', case),
('addSkip', case, "Text attachment: reason\n------------\n"
"skipping this test\n------------\n"), ('stopTest', case)],
result._events)
def test_skip__in_setup_with_old_result_object_calls_addSuccess(self):
class SkippingTest(TestCase):
def setUp(self):
TestCase.setUp(self)
raise self.skipException("skipping this test")
def test_that_raises_skipException(self):
pass
result = Python26TestResult()
test = SkippingTest("test_that_raises_skipException")
test.run(result)
self.assertEqual('addSuccess', result._events[1][0])
def test_skip_with_old_result_object_calls_addError(self):
class SkippingTest(TestCase):
def test_that_raises_skipException(self):
raise self.skipException("skipping this test")
result = Python26TestResult()
test = SkippingTest("test_that_raises_skipException")
test.run(result)
self.assertEqual('addSuccess', result._events[1][0])
def test_skip_decorator(self):
class SkippingTest(TestCase):
@skip("skipping this test")
def test_that_is_decorated_with_skip(self):
self.fail()
result = Python26TestResult()
test = SkippingTest("test_that_is_decorated_with_skip")
test.run(result)
self.assertEqual('addSuccess', result._events[1][0])
def test_skipIf_decorator(self):
class SkippingTest(TestCase):
@skipIf(True, "skipping this test")
def test_that_is_decorated_with_skipIf(self):
self.fail()
result = Python26TestResult()
test = SkippingTest("test_that_is_decorated_with_skipIf")
test.run(result)
self.assertEqual('addSuccess', result._events[1][0])
def test_skipUnless_decorator(self):
class SkippingTest(TestCase):
@skipUnless(False, "skipping this test")
def test_that_is_decorated_with_skipUnless(self):
self.fail()
result = Python26TestResult()
test = SkippingTest("test_that_is_decorated_with_skipUnless")
test.run(result)
self.assertEqual('addSuccess', result._events[1][0])
class TestOnException(TestCase):
def test_default_works(self):
events = []
class Case(TestCase):
def method(self):
self.onException(an_exc_info)
events.append(True)
case = Case("method")
case.run()
self.assertThat(events, Equals([True]))
def test_added_handler_works(self):
events = []
class Case(TestCase):
def method(self):
self.addOnException(events.append)
self.onException(an_exc_info)
case = Case("method")
case.run()
self.assertThat(events, Equals([an_exc_info]))
def test_handler_that_raises_is_not_caught(self):
events = []
class Case(TestCase):
def method(self):
self.addOnException(events.index)
self.assertRaises(ValueError, self.onException, an_exc_info)
case = Case("method")
case.run()
self.assertThat(events, Equals([]))
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@ -0,0 +1,74 @@
# Copyright (c) 2009 Jonathan M. Lange. See LICENSE for details.
"""Test suites and related things."""
__metaclass__ = type
__all__ = [
'ConcurrentTestSuite',
]
try:
import Queue
except ImportError:
import queue as Queue
import threading
import unittest
import testtools
class ConcurrentTestSuite(unittest.TestSuite):
"""A TestSuite whose run() calls out to a concurrency strategy."""
def __init__(self, suite, make_tests):
"""Create a ConcurrentTestSuite to execute suite.
:param suite: A suite to run concurrently.
:param make_tests: A helper function to split the tests in the
ConcurrentTestSuite into some number of concurrently executing
sub-suites. make_tests must take a suite, and return an iterable
of TestCase-like object, each of which must have a run(result)
method.
"""
super(ConcurrentTestSuite, self).__init__([suite])
self.make_tests = make_tests
def run(self, result):
"""Run the tests concurrently.
This calls out to the provided make_tests helper, and then serialises
the results so that result only sees activity from one TestCase at
a time.
ConcurrentTestSuite provides no special mechanism to stop the tests
returned by make_tests, it is up to the make_tests to honour the
shouldStop attribute on the result object they are run with, which will
be set if an exception is raised in the thread which
ConcurrentTestSuite.run is called in.
"""
tests = self.make_tests(self)
try:
threads = {}
queue = Queue.Queue()
result_semaphore = threading.Semaphore(1)
for test in tests:
process_result = testtools.ThreadsafeForwardingResult(result,
result_semaphore)
reader_thread = threading.Thread(
target=self._run_test, args=(test, process_result, queue))
threads[test] = reader_thread, process_result
reader_thread.start()
while threads:
finished_test = queue.get()
threads[finished_test][0].join()
del threads[finished_test]
except:
for thread, process_result in threads.values():
process_result.stop()
raise
def _run_test(self, test, process_result, queue):
try:
test.run(process_result)
finally:
queue.put(test)

View File

@ -0,0 +1,39 @@
# Copyright (c) 2008 Jonathan M. Lange. See LICENSE for details.
"""Utilities for dealing with stuff in unittest."""
import sys
__metaclass__ = type
__all__ = [
'iterate_tests',
]
if sys.version_info > (3, 0):
def _u(s):
"""Replacement for u'some string' in Python 3."""
return s
def _b(s):
"""A byte literal."""
return s.encode("latin-1")
advance_iterator = next
else:
def _u(s):
return unicode(s, "latin-1")
def _b(s):
return s
advance_iterator = lambda it: it.next()
def iterate_tests(test_suite_or_case):
"""Iterate through all of the test cases in `test_suite_or_case`."""
try:
suite = iter(test_suite_or_case)
except TypeError:
yield test_suite_or_case
else:
for test in suite:
for subtest in iterate_tests(test):
yield subtest

View File

@ -3,11 +3,14 @@
TARGETDIR="`dirname $0`"
WORKDIR="`mktemp -d`"
bzr branch lp:subunit "$WORKDIR/subunit"
bzr export "$WORKDIR/subunit" lp:subunit
bzr export "$WORKDIR/testtools" lp:testtools
for p in python/ filters/tap2subunit;
do
rsync -avz --delete "$WORKDIR/subunit/$p" "$TARGETDIR/$p"
done
rsync -avz --delete "$WORKDIR/testtools/testtools/" "$TARGETDIR/python/testtools/"
rm -rf "$WORKDIR"

View File

@ -10,7 +10,6 @@ import base64
import re
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")
import samba.getopt as options
@ -25,7 +24,7 @@ from samba.dcerpc import security
from samba.auth import system_session
from samba import Ldb
from subunit import SubunitTestRunner
from subunit.run import SubunitTestRunner
import unittest
parser = optparse.OptionParser("ldap [options] <host>")

View File

@ -11,7 +11,6 @@ import base64
import os
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")
import samba.getopt as options
@ -44,7 +43,7 @@ from samba import ATYPE_DISTRIBUTION_GLOBAL_GROUP
from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
from subunit import SubunitTestRunner
from subunit.run import SubunitTestRunner
import unittest
from samba.ndr import ndr_pack, ndr_unpack

View File

@ -11,7 +11,6 @@ import base64
import os
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")
import samba.getopt as options
@ -45,7 +44,7 @@ from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
from samba import DS_DC_FUNCTION_2003
from subunit import SubunitTestRunner
from subunit.run import SubunitTestRunner
import unittest
from samba.ndr import ndr_pack, ndr_unpack

View File

@ -11,7 +11,6 @@ import random
import time
sys.path.append("bin/python")
sys.path.append("../lib/subunit/python")
import samba.getopt as options
@ -27,7 +26,7 @@ from samba.dcerpc import security
from samba.auth import system_session
from samba import Ldb, DS_DOMAIN_FUNCTION_2008, SECINFO_OWNER, \
SECINFO_GROUP, SECINFO_DACL, SECINFO_SACL
from subunit import SubunitTestRunner
from subunit.run import SubunitTestRunner
import unittest
parser = optparse.OptionParser("sec_descriptor [options] <host>")

View File

@ -21,9 +21,8 @@ import sys
# Find right directory when running from source tree
sys.path.insert(0, "bin/python")
sys.path.insert(1, "../lib/subunit/python")
from subunit import SubunitTestRunner
from subunit.run import SubunitTestRunner
from unittest import TestProgram
import optparse
import os

View File

@ -30,6 +30,6 @@ $(eval $(foreach pyfile, $(_PY_FILES),$(call python_py_module_template,$(patsubs
EPYDOC_OPTIONS = --no-private --url http://www.samba.org/ --no-sourcecode
epydoc:: pythonmods
PYTHONPATH=$(pythonbuilddir):../lib/subunit/python epydoc $(EPYDOC_OPTIONS) samba tdb ldb subunit
PYTHONPATH=$(pythonbuilddir):../lib/subunit/python epydoc $(EPYDOC_OPTIONS) samba tdb ldb subunit testtools
install:: installpython