1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-24 21:34:56 +03:00

subunit: Import new version.

This commit is contained in:
Jelmer Vernooij 2009-12-31 18:48:41 +01:00
parent 2e38cb2cbb
commit b6b46b4978
27 changed files with 4060 additions and 503 deletions

26
lib/subunit/filters/tap2subunit Executable file
View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""A filter that reads a TAP stream and outputs a subunit stream.
More information on TAP is available at
http://testanything.org/wiki/index.php/Main_Page.
"""
import sys
from subunit import TAP2SubUnit
sys.exit(TAP2SubUnit(sys.stdin, sys.stdout))

View File

@ -0,0 +1,20 @@
Copyright (c) 2007 Michael Twomey
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,26 @@
A simple package to deal with ISO 8601 date time formats.
ISO 8601 defines a neutral, unambiguous date string format, which also
has the property of sorting naturally.
e.g. YYYY-MM-DDTHH:MM:SSZ or 2007-01-25T12:00:00Z
Currently this covers only the most common date formats encountered, not
all of ISO 8601 is handled.
Currently the following formats are handled:
* 2006-01-01T00:00:00Z
* 2006-01-01T00:00:00[+-]00:00
I'll add more as I encounter them in my day to day life. Patches with
new formats and tests will be gratefully accepted of course :)
References:
* http://www.cl.cam.ac.uk/~mgk25/iso-time.html - simple overview
* http://hydracen.com/dx/iso8601.htm - more detailed enumeration of
valid formats.
See the LICENSE file for the license this package is released under.

View File

@ -0,0 +1,5 @@
This is a [slightly rearranged] import of http://pypi.python.org/pypi/iso8601/
version 0.1.4. The OS X hidden files have been stripped, and the package
turned into a single module, to simplify installation. The remainder of the
source distribution is included in the subunit source tree at python/iso8601
for reference.

View File

@ -0,0 +1,58 @@
try:
from setuptools import setup
except ImportError:
from distutils import setup
long_description="""Simple module to parse ISO 8601 dates
This module parses the most common forms of ISO 8601 date strings (e.g.
2007-01-14T20:34:22+00:00) into datetime objects.
>>> import iso8601
>>> iso8601.parse_date("2007-01-25T12:00:00Z")
datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
>>>
Changes
=======
0.1.4
-----
* The default_timezone argument wasn't being passed through correctly,
UTC was being used in every case. Fixes issue 10.
0.1.3
-----
* Fixed the microsecond handling, the generated microsecond values were
way too small. Fixes issue 9.
0.1.2
-----
* Adding ParseError to __all__ in iso8601 module, allows people to import it.
Addresses issue 7.
* Be a little more flexible when dealing with dates without leading zeroes.
This violates the spec a little, but handles more dates as seen in the
field. Addresses issue 6.
* Allow date/time separators other than T.
0.1.1
-----
* When parsing dates without a timezone the specified default is used. If no
default is specified then UTC is used. Addresses issue 4.
"""
setup(
name="iso8601",
version="0.1.4",
description=long_description.split("\n")[0],
long_description=long_description,
author="Michael Twomey",
author_email="micktwomey+iso8601@gmail.com",
url="http://code.google.com/p/pyiso8601/",
packages=["iso8601"],
license="MIT",
)

View File

@ -0,0 +1,111 @@
import iso8601
def test_iso8601_regex():
assert iso8601.ISO8601_REGEX.match("2006-10-11T00:14:33Z")
def test_timezone_regex():
assert iso8601.TIMEZONE_REGEX.match("+01:00")
assert iso8601.TIMEZONE_REGEX.match("+00:00")
assert iso8601.TIMEZONE_REGEX.match("+01:20")
assert iso8601.TIMEZONE_REGEX.match("-01:00")
def test_parse_date():
d = iso8601.parse_date("2006-10-20T15:34:56Z")
assert d.year == 2006
assert d.month == 10
assert d.day == 20
assert d.hour == 15
assert d.minute == 34
assert d.second == 56
assert d.tzinfo == iso8601.UTC
def test_parse_date_fraction():
d = iso8601.parse_date("2006-10-20T15:34:56.123Z")
assert d.year == 2006
assert d.month == 10
assert d.day == 20
assert d.hour == 15
assert d.minute == 34
assert d.second == 56
assert d.microsecond == 123000
assert d.tzinfo == iso8601.UTC
def test_parse_date_fraction_2():
"""From bug 6
"""
d = iso8601.parse_date("2007-5-7T11:43:55.328Z'")
assert d.year == 2007
assert d.month == 5
assert d.day == 7
assert d.hour == 11
assert d.minute == 43
assert d.second == 55
assert d.microsecond == 328000
assert d.tzinfo == iso8601.UTC
def test_parse_date_tz():
d = iso8601.parse_date("2006-10-20T15:34:56.123+02:30")
assert d.year == 2006
assert d.month == 10
assert d.day == 20
assert d.hour == 15
assert d.minute == 34
assert d.second == 56
assert d.microsecond == 123000
assert d.tzinfo.tzname(None) == "+02:30"
offset = d.tzinfo.utcoffset(None)
assert offset.days == 0
assert offset.seconds == 60 * 60 * 2.5
def test_parse_invalid_date():
try:
iso8601.parse_date(None)
except iso8601.ParseError:
pass
else:
assert 1 == 2
def test_parse_invalid_date2():
try:
iso8601.parse_date("23")
except iso8601.ParseError:
pass
else:
assert 1 == 2
def test_parse_no_timezone():
"""issue 4 - Handle datetime string without timezone
This tests what happens when you parse a date with no timezone. While not
strictly correct this is quite common. I'll assume UTC for the time zone
in this case.
"""
d = iso8601.parse_date("2007-01-01T08:00:00")
assert d.year == 2007
assert d.month == 1
assert d.day == 1
assert d.hour == 8
assert d.minute == 0
assert d.second == 0
assert d.microsecond == 0
assert d.tzinfo == iso8601.UTC
def test_parse_no_timezone_different_default():
tz = iso8601.FixedOffset(2, 0, "test offset")
d = iso8601.parse_date("2007-01-01T08:00:00", default_timezone=tz)
assert d.tzinfo == tz
def test_space_separator():
"""Handle a separator other than T
"""
d = iso8601.parse_date("2007-06-23 06:40:34.00Z")
assert d.year == 2007
assert d.month == 6
assert d.day == 23
assert d.hour == 6
assert d.minute == 40
assert d.second == 34
assert d.microsecond == 0
assert d.tzinfo == iso8601.UTC

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,164 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Encoder/decoder for http style chunked encoding."""
class Decoder(object):
"""Decode chunked content to a byte stream."""
def __init__(self, output):
"""Create a decoder decoding to output.
:param output: A file-like object. Bytes written to the Decoder are
decoded to strip off the chunking and written to the output.
Up to a full write worth of data or a single control line may be
buffered (whichever is larger). The close method should be called
when no more data is available, to detect short streams; the
write method will return none-None when the end of a stream is
detected.
"""
self.output = output
self.buffered_bytes = []
self.state = self._read_length
self.body_length = 0
def close(self):
"""Close the decoder.
:raises ValueError: If the stream is incomplete ValueError is raised.
"""
if self.state != self._finished:
raise ValueError("incomplete stream")
def _finished(self):
"""Finished reading, return any remaining bytes."""
if self.buffered_bytes:
buffered_bytes = self.buffered_bytes
self.buffered_bytes = []
return ''.join(buffered_bytes)
else:
raise ValueError("stream is finished")
def _read_body(self):
"""Pass body bytes to the output."""
while self.body_length and self.buffered_bytes:
if self.body_length >= len(self.buffered_bytes[0]):
self.output.write(self.buffered_bytes[0])
self.body_length -= len(self.buffered_bytes[0])
del self.buffered_bytes[0]
# No more data available.
if not self.body_length:
self.state = self._read_length
else:
self.output.write(self.buffered_bytes[0][:self.body_length])
self.buffered_bytes[0] = \
self.buffered_bytes[0][self.body_length:]
self.body_length = 0
self.state = self._read_length
return self.state()
def _read_length(self):
"""Try to decode a length from the bytes."""
count = -1
match_chars = "0123456789abcdefABCDEF\r\n"
count_chars = []
for bytes in self.buffered_bytes:
for byte in bytes:
if byte not in match_chars:
break
count_chars.append(byte)
if byte == '\n':
break
if not count_chars:
return
if count_chars[-1][-1] != '\n':
return
count_str = ''.join(count_chars)
self.body_length = int(count_str[:-2], 16)
excess_bytes = len(count_str)
while excess_bytes:
if excess_bytes >= len(self.buffered_bytes[0]):
excess_bytes -= len(self.buffered_bytes[0])
del self.buffered_bytes[0]
else:
self.buffered_bytes[0] = self.buffered_bytes[0][excess_bytes:]
excess_bytes = 0
if not self.body_length:
self.state = self._finished
if not self.buffered_bytes:
# May not call into self._finished with no buffered data.
return ''
else:
self.state = self._read_body
return self.state()
def write(self, bytes):
"""Decode bytes to the output stream.
:raises ValueError: If the stream has already seen the end of file
marker.
:returns: None, or the excess bytes beyond the end of file marker.
"""
if bytes:
self.buffered_bytes.append(bytes)
return self.state()
class Encoder(object):
"""Encode content to a stream using HTTP Chunked coding."""
def __init__(self, output):
"""Create an encoder encoding to output.
:param output: A file-like object. Bytes written to the Encoder
will be encoded using HTTP chunking. Small writes may be buffered
and the ``close`` method must be called to finish the stream.
"""
self.output = output
self.buffered_bytes = []
self.buffer_size = 0
def flush(self, extra_len=0):
"""Flush the encoder to the output stream.
:param extra_len: Increase the size of the chunk by this many bytes
to allow for a subsequent write.
"""
if not self.buffer_size and not extra_len:
return
buffered_bytes = self.buffered_bytes
buffer_size = self.buffer_size
self.buffered_bytes = []
self.buffer_size = 0
self.output.write("%X\r\n" % (buffer_size + extra_len))
if buffer_size:
self.output.write(''.join(buffered_bytes))
return True
def write(self, bytes):
"""Encode bytes to the output stream."""
bytes_len = len(bytes)
if self.buffer_size + bytes_len >= 65536:
self.flush(bytes_len)
self.output.write(bytes)
else:
self.buffered_bytes.append(bytes)
self.buffer_size += bytes_len
def close(self):
"""Finish the stream. This does not close the output stream."""
self.flush()
self.output.write("0\r\n")

View File

@ -0,0 +1,109 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Handlers for outcome details."""
from cStringIO import StringIO
from testtools import content, content_type
import chunked
class DetailsParser(object):
"""Base class/API reference for details parsing."""
class SimpleDetailsParser(DetailsParser):
"""Parser for single-part [] delimited details."""
def __init__(self, state):
self._message = ""
self._state = state
def lineReceived(self, line):
if line == "]\n":
self._state.endDetails()
return
if line[0:2] == " ]":
# quoted ] start
self._message += line[1:]
else:
self._message += line
def get_details(self, style=None):
result = {}
if not style:
result['traceback'] = content.Content(
content_type.ContentType("text", "x-traceback"),
lambda:[self._message])
else:
if style == 'skip':
name = 'reason'
else:
name = 'message'
result[name] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[self._message])
return result
def get_message(self):
return self._message
class MultipartDetailsParser(DetailsParser):
"""Parser for multi-part [] surrounded MIME typed chunked details."""
def __init__(self, state):
self._state = state
self._details = {}
self._parse_state = self._look_for_content
def _look_for_content(self, line):
if line == "]\n":
self._state.endDetails()
return
# TODO error handling
field, value = line[:-1].split(' ', 1)
main, sub = value.split('/')
self._content_type = content_type.ContentType(main, sub)
self._parse_state = self._get_name
def _get_name(self, line):
self._name = line[:-1]
self._body = StringIO()
self._chunk_parser = chunked.Decoder(self._body)
self._parse_state = self._feed_chunks
def _feed_chunks(self, line):
residue = self._chunk_parser.write(line)
if residue is not None:
# Line based use always ends on no residue.
assert residue == ''
body = self._body
self._details[self._name] = content.Content(
self._content_type, lambda:[body.getvalue()])
self._chunk_parser.close()
self._parse_state = self._look_for_content
def get_details(self, for_skip=False):
return self._details
def get_message(self):
return None
def lineReceived(self, line):
self._parse_state(line)

View File

@ -0,0 +1,123 @@
# Copyright (c) 2007 Michael Twomey
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""ISO 8601 date time string parsing
Basic usage:
>>> import iso8601
>>> iso8601.parse_date("2007-01-25T12:00:00Z")
datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
>>>
"""
from datetime import datetime, timedelta, tzinfo
import re
__all__ = ["parse_date", "ParseError"]
# Adapted from http://delete.me.uk/2005/03/iso8601.html
ISO8601_REGEX = re.compile(r"(?P<year>[0-9]{4})(-(?P<month>[0-9]{1,2})(-(?P<day>[0-9]{1,2})"
r"((?P<separator>.)(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?"
r"(?P<timezone>Z|(([-+])([0-9]{2}):([0-9]{2})))?)?)?)?"
)
TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}).(?P<minutes>[0-9]{2})")
class ParseError(Exception):
"""Raised when there is a problem parsing a date string"""
# Yoinked from python docs
ZERO = timedelta(0)
class Utc(tzinfo):
"""UTC
"""
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
UTC = Utc()
class FixedOffset(tzinfo):
"""Fixed offset in hours and minutes from UTC
"""
def __init__(self, offset_hours, offset_minutes, name):
self.__offset = timedelta(hours=offset_hours, minutes=offset_minutes)
self.__name = name
def utcoffset(self, dt):
return self.__offset
def tzname(self, dt):
return self.__name
def dst(self, dt):
return ZERO
def __repr__(self):
return "<FixedOffset %r>" % self.__name
def parse_timezone(tzstring, default_timezone=UTC):
"""Parses ISO 8601 time zone specs into tzinfo offsets
"""
if tzstring == "Z":
return default_timezone
# This isn't strictly correct, but it's common to encounter dates without
# timezones so I'll assume the default (which defaults to UTC).
# Addresses issue 4.
if tzstring is None:
return default_timezone
m = TIMEZONE_REGEX.match(tzstring)
prefix, hours, minutes = m.groups()
hours, minutes = int(hours), int(minutes)
if prefix == "-":
hours = -hours
minutes = -minutes
return FixedOffset(hours, minutes, tzstring)
def parse_date(datestring, default_timezone=UTC):
"""Parses ISO 8601 dates into datetime objects
The timezone is parsed from the date string. However it is quite common to
have dates without a timezone (not strictly correct). In this case the
default timezone specified in default_timezone is used. This is UTC by
default.
"""
if not isinstance(datestring, basestring):
raise ParseError("Expecting a string %r" % datestring)
m = ISO8601_REGEX.match(datestring)
if not m:
raise ParseError("Unable to parse date string %r" % datestring)
groups = m.groupdict()
tz = parse_timezone(groups["timezone"], default_timezone=default_timezone)
if groups["fraction"] is None:
groups["fraction"] = 0
else:
groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6)
return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]),
int(groups["hour"]), int(groups["minute"]), int(groups["second"]),
int(groups["fraction"]), tz)

View File

@ -0,0 +1,106 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Support for dealing with progress state."""
class ProgressModel(object):
"""A model of progress indicators as subunit defines it.
Instances of this class represent a single logical operation that is
progressing. The operation may have many steps, and some of those steps may
supply their own progress information. ProgressModel uses a nested concept
where the overall state can be pushed, creating new starting state, and
later pushed to return to the prior state. Many user interfaces will want
to display an overall summary though, and accordingly the pos() and width()
methods return overall summary information rather than information on the
current subtask.
The default state is 0/0 - indicating that the overall progress is unknown.
Anytime the denominator of pos/width is 0, rendering of a ProgressModel
should should take this into consideration.
:ivar: _tasks. This private attribute stores the subtasks. Each is a tuple:
pos, width, overall_numerator, overall_denominator. The overall fields
store the calculated overall numerator and denominator for the state
that was pushed.
"""
def __init__(self):
"""Create a ProgressModel.
The new model has no progress data at all - it will claim a summary
width of zero and position of 0.
"""
self._tasks = []
self.push()
def adjust_width(self, offset):
"""Adjust the with of the current subtask."""
self._tasks[-1][1] += offset
def advance(self):
"""Advance the current subtask."""
self._tasks[-1][0] += 1
def pop(self):
"""Pop a subtask off the ProgressModel.
See push for a description of how push and pop work.
"""
self._tasks.pop()
def pos(self):
"""Return how far through the operation has progressed."""
if not self._tasks:
return 0
task = self._tasks[-1]
if len(self._tasks) > 1:
# scale up the overall pos by the current task or preserve it if
# no current width is known.
offset = task[2] * (task[1] or 1)
else:
offset = 0
return offset + task[0]
def push(self):
"""Push a new subtask.
After pushing a new subtask, the overall progress hasn't changed. Calls
to adjust_width, advance, set_width will only after the progress within
the range that calling 'advance' would have before - the subtask
represents progressing one step in the earlier task.
Call pop() to restore the progress model to the state before push was
called.
"""
self._tasks.append([0, 0, self.pos(), self.width()])
def set_width(self, width):
"""Set the width of the current subtask."""
self._tasks[-1][1] = width
def width(self):
"""Return the total width of the operation."""
if not self._tasks:
return 0
task = self._tasks[-1]
if len(self._tasks) > 1:
# scale up the overall width by the current task or preserve it if
# no current width is known.
return task[3] * (task[1] or 1)
else:
return task[1]

View File

@ -0,0 +1,46 @@
#!/usr/bin/python
#
# Simple subunit testrunner for python
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Run a unittest testcase reporting results as Subunit.
$ python -m subunit.run mylib.tests.test_suite
"""
import sys
from subunit import TestProtocolClient
class SubunitTestRunner(object):
def __init__(self, stream=sys.stdout):
self.stream = stream
def run(self, test):
"Run the given test case or test suite."
result = TestProtocolClient(self.stream)
test(result)
return result
if __name__ == '__main__':
import optparse
from unittest import TestProgram
parser = optparse.OptionParser(__doc__)
args = parser.parse_args()[1]
runner = SubunitTestRunner()
program = TestProgram(module=None, argv=[sys.argv[0]] + args,
testRunner=runner)

View File

@ -0,0 +1,334 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""TestResult helper classes used to by subunit."""
import datetime
import iso8601
import testtools
import subunit
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
class TestResultDecorator(object):
"""General pass-through decorator.
This provides a base that other TestResults can inherit from to
gain basic forwarding functionality. It also takes care of
handling the case where the target doesn't support newer methods
or features by degrading them.
"""
def __init__(self, decorated):
"""Create a TestResultDecorator forwarding to decorated."""
# Make every decorator degrade gracefully.
self.decorated = testtools.ExtendedToOriginalDecorator(decorated)
def startTest(self, test):
return self.decorated.startTest(test)
def startTestRun(self):
return self.decorated.startTestRun()
def stopTest(self, test):
return self.decorated.stopTest(test)
def stopTestRun(self):
return self.decorated.stopTestRun()
def addError(self, test, err=None, details=None):
return self.decorated.addError(test, err, details=details)
def addFailure(self, test, err=None, details=None):
return self.decorated.addFailure(test, err, details=details)
def addSuccess(self, test, details=None):
return self.decorated.addSuccess(test, details=details)
def addSkip(self, test, reason=None, details=None):
return self.decorated.addSkip(test, reason, details=details)
def addExpectedFailure(self, test, err=None, details=None):
return self.decorated.addExpectedFailure(test, err, details=details)
def addUnexpectedSuccess(self, test, details=None):
return self.decorated.addUnexpectedSuccess(test, details=details)
def progress(self, offset, whence):
return self.decorated.progress(offset, whence)
def wasSuccessful(self):
return self.decorated.wasSuccessful()
@property
def shouldStop(self):
return self.decorated.shouldStop
def stop(self):
return self.decorated.stop()
def tags(self, gone_tags, new_tags):
return self.decorated.time(gone_tags, new_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
"""A TestResult which calls a hook on every event."""
def __init__(self, decorated):
self.super = super(HookedTestResultDecorator, self)
self.super.__init__(decorated)
def startTest(self, test):
self._before_event()
return self.super.startTest(test)
def startTestRun(self):
self._before_event()
return self.super.startTestRun()
def stopTest(self, test):
self._before_event()
return self.super.stopTest(test)
def stopTestRun(self):
self._before_event()
return self.super.stopTestRun()
def addError(self, test, err=None, details=None):
self._before_event()
return self.super.addError(test, err, details=details)
def addFailure(self, test, err=None, details=None):
self._before_event()
return self.super.addFailure(test, err, details=details)
def addSuccess(self, test, details=None):
self._before_event()
return self.super.addSuccess(test, details=details)
def addSkip(self, test, reason=None, details=None):
self._before_event()
return self.super.addSkip(test, reason, details=details)
def addExpectedFailure(self, test, err=None, details=None):
self._before_event()
return self.super.addExpectedFailure(test, err, details=details)
def addUnexpectedSuccess(self, test, details=None):
self._before_event()
return self.super.addUnexpectedSuccess(test, details=details)
def progress(self, offset, whence):
self._before_event()
return self.super.progress(offset, whence)
def wasSuccessful(self):
self._before_event()
return self.super.wasSuccessful()
@property
def shouldStop(self):
self._before_event()
return self.super.shouldStop
def stop(self):
self._before_event()
return self.super.stop()
def time(self, a_datetime):
self._before_event()
return self.super.time(a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
"""Decorate a TestResult to add time events to a test run.
By default this will cause a time event before every test event,
but if explicit time data is being provided by the test run, then
this decorator will turn itself off to prevent causing confusion.
"""
def __init__(self, decorated):
self._time = None
super(AutoTimingTestResultDecorator, self).__init__(decorated)
def _before_event(self):
time = self._time
if time is not None:
return
time = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
self.decorated.time(time)
def progress(self, offset, whence):
return self.decorated.progress(offset, whence)
@property
def shouldStop(self):
return self.decorated.shouldStop
def time(self, a_datetime):
"""Provide a timestamp for the current test activity.
:param a_datetime: If None, automatically add timestamps before every
event (this is the default behaviour if time() is not called at
all). If not None, pass the provided time onto the decorated
result object and disable automatic timestamps.
"""
self._time = a_datetime
return self.decorated.time(a_datetime)
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
Tests that pass the filter are handed on to another TestResult instance
for further processing/reporting. To obtain the filtered results,
the other instance must be interrogated.
:ivar result: The result that tests are passed to after filtering.
:ivar filter_predicate: The callback run to decide whether to pass
a result.
"""
def __init__(self, result, filter_error=False, filter_failure=False,
filter_success=True, filter_skip=False,
filter_predicate=None):
"""Create a FilterResult object filtering to result.
:param filter_error: Filter out errors.
:param filter_failure: Filter out failures.
:param filter_success: Filter out successful tests.
:param filter_skip: Filter out skipped tests.
:param filter_predicate: A callable taking (test, outcome, err,
details) and returning True if the result should be passed
through. err and details may be none if no error or extra
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
"""
TestResultDecorator.__init__(self, result)
self._filter_error = filter_error
self._filter_failure = filter_failure
self._filter_success = filter_success
self._filter_skip = filter_skip
if filter_predicate is None:
filter_predicate = lambda test, outcome, err, details: True
self.filter_predicate = filter_predicate
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# The (new, gone) tags for the current test.
self._current_test_tags = None
def addError(self, test, err=None, details=None):
if (not self._filter_error and
self.filter_predicate(test, 'error', err, details)):
self.decorated.startTest(test)
self.decorated.addError(test, err, details=details)
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
if (not self._filter_failure and
self.filter_predicate(test, 'failure', err, details)):
self.decorated.startTest(test)
self.decorated.addFailure(test, err, details=details)
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
if (not self._filter_skip and
self.filter_predicate(test, 'skip', reason, details)):
self.decorated.startTest(test)
self.decorated.addSkip(test, reason, details=details)
else:
self._filtered()
def addSuccess(self, test, details=None):
if (not self._filter_success and
self.filter_predicate(test, 'success', None, details)):
self.decorated.startTest(test)
self.decorated.addSuccess(test, details=details)
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self.decorated.startTest(test)
return self.decorated.addExpectedFailure(test, err,
details=details)
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
self.decorated.startTest(test)
return self.decorated.addUnexpectedSuccess(test, details=details)
def _filtered(self):
self._current_test_filtered = True
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
self._current_test = test
self._current_test_filtered = False
self._current_test_tags = set(), set()
def stopTest(self, test):
"""Stop a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
if not self._current_test_filtered:
# Tags to output for this test.
if self._current_test_tags[0] or self._current_test_tags[1]:
self.decorated.tags(*self._current_test_tags)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._current_test_tags = None
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
Adds and removes tags as appropriate. If a test is currently running,
tags are not affected for subsequent tests.
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
if self._current_test is not None:
# gather the tags until the test stops.
self._current_test_tags[0].update(new_tags)
self._current_test_tags[0].difference_update(gone_tags)
self._current_test_tags[1].update(gone_tags)
self._current_test_tags[1].difference_update(new_tags)
return self.decorated.tags(new_tags, gone_tags)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
return id[len("subunit.RemotedTestCase."):]
return id

View File

@ -2,24 +2,40 @@
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from subunit.tests import TestUtil, test_test_protocol
from subunit.tests import (
TestUtil,
test_chunked,
test_details,
test_progress_model,
test_subunit_filter,
test_subunit_stats,
test_subunit_tags,
test_tap2subunit,
test_test_protocol,
test_test_results,
)
def test_suite():
result = TestUtil.TestSuite()
result.addTest(test_chunked.test_suite())
result.addTest(test_details.test_suite())
result.addTest(test_progress_model.test_suite())
result.addTest(test_test_results.test_suite())
result.addTest(test_test_protocol.test_suite())
result.addTest(test_tap2subunit.test_suite())
result.addTest(test_subunit_filter.test_suite())
result.addTest(test_subunit_tags.test_suite())
result.addTest(test_subunit_stats.test_suite())
return result

View File

@ -1,5 +1,12 @@
#!/usr/bin/env python
import sys
if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to
# sample-script.py
print "test fail"
print "error fail"
sys.exit(0)
print "test old mcdonald"
print "success old mcdonald"
print "test bing crosby"

View File

@ -0,0 +1,127 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from cStringIO import StringIO
import unittest
import subunit.chunked
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result
class TestDecode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = StringIO()
self.decoder = subunit.chunked.Decoder(self.output)
def test_close_read_length_short_errors(self):
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_short_errors(self):
self.assertEqual(None, self.decoder.write('2\r\na'))
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_buffered_data_errors(self):
self.assertEqual(None, self.decoder.write('2\r'))
self.assertRaises(ValueError, self.decoder.close)
def test_close_after_finished_stream_safe(self):
self.assertEqual(None, self.decoder.write('2\r\nab'))
self.assertEqual('', self.decoder.write('0\r\n'))
self.decoder.close()
def test_decode_nothing(self):
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('', self.output.getvalue())
def test_decode_serialised_form(self):
self.assertEqual(None, self.decoder.write("F\r\n"))
self.assertEqual(None, self.decoder.write("serialised\n"))
self.assertEqual('', self.decoder.write("form0\r\n"))
def test_decode_short(self):
self.assertEqual('', self.decoder.write('3\r\nabc0\r\n'))
self.assertEqual('abc', self.output.getvalue())
def test_decode_combines_short(self):
self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n'))
self.assertEqual('abcdef', self.output.getvalue())
def test_decode_excess_bytes_from_write(self):
self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
self.assertEqual('abc', self.output.getvalue())
def test_decode_write_after_finished_errors(self):
self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
self.assertRaises(ValueError, self.decoder.write, '')
def test_decode_hex(self):
self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n'))
self.assertEqual('1234567890', self.output.getvalue())
def test_decode_long_ranges(self):
self.assertEqual(None, self.decoder.write('10000\r\n'))
self.assertEqual(None, self.decoder.write('1' * 65536))
self.assertEqual(None, self.decoder.write('10000\r\n'))
self.assertEqual(None, self.decoder.write('2' * 65536))
self.assertEqual('', self.decoder.write('0\r\n'))
self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
class TestEncode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.output = StringIO()
self.encoder = subunit.chunked.Encoder(self.output)
def test_encode_nothing(self):
self.encoder.close()
self.assertEqual('0\r\n', self.output.getvalue())
def test_encode_empty(self):
self.encoder.write('')
self.encoder.close()
self.assertEqual('0\r\n', self.output.getvalue())
def test_encode_short(self):
self.encoder.write('abc')
self.encoder.close()
self.assertEqual('3\r\nabc0\r\n', self.output.getvalue())
def test_encode_combines_short(self):
self.encoder.write('abc')
self.encoder.write('def')
self.encoder.close()
self.assertEqual('6\r\nabcdef0\r\n', self.output.getvalue())
def test_encode_over_9_is_in_hex(self):
self.encoder.write('1234567890')
self.encoder.close()
self.assertEqual('A\r\n12345678900\r\n', self.output.getvalue())
def test_encode_long_ranges_not_combined(self):
self.encoder.write('1' * 65536)
self.encoder.write('2' * 65536)
self.encoder.close()
self.assertEqual('10000\r\n' + '1' * 65536 + '10000\r\n' +
'2' * 65536 + '0\r\n', self.output.getvalue())

View File

@ -0,0 +1,110 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
from cStringIO import StringIO
import unittest
import subunit.tests
from subunit import content, content_type, details
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result
class TestSimpleDetails(unittest.TestCase):
def test_lineReceived(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived("foo\n")
parser.lineReceived("bar\n")
self.assertEqual("foo\nbar\n", parser._message)
def test_lineReceived_escaped_bracket(self):
parser = details.SimpleDetailsParser(None)
parser.lineReceived("foo\n")
parser.lineReceived(" ]are\n")
parser.lineReceived("bar\n")
self.assertEqual("foo\n]are\nbar\n", parser._message)
def test_get_message(self):
parser = details.SimpleDetailsParser(None)
self.assertEqual("", parser.get_message())
def test_get_details(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['traceback'] = content.Content(
content_type.ContentType("text", "x-traceback"),
lambda:[""])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['traceback'].content_type,
found['traceback'].content_type)
self.assertEqual(''.join(expected['traceback'].iter_bytes()),
''.join(found['traceback'].iter_bytes()))
def test_get_details_skip(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['reason'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[""])
found = parser.get_details("skip")
self.assertEqual(expected, found)
def test_get_details_success(self):
parser = details.SimpleDetailsParser(None)
traceback = ""
expected = {}
expected['message'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:[""])
found = parser.get_details("success")
self.assertEqual(expected, found)
class TestMultipartDetails(unittest.TestCase):
def test_get_message_is_None(self):
parser = details.MultipartDetailsParser(None)
self.assertEqual(None, parser.get_message())
def test_get_details(self):
parser = details.MultipartDetailsParser(None)
self.assertEqual({}, parser.get_details())
def test_parts(self):
parser = details.MultipartDetailsParser(None)
parser.lineReceived("Content-Type: text/plain\n")
parser.lineReceived("something\n")
parser.lineReceived("F\r\n")
parser.lineReceived("serialised\n")
parser.lineReceived("form0\r\n")
expected = {}
expected['something'] = content.Content(
content_type.ContentType("text", "plain"),
lambda:["serialised\nform"])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['something'].content_type,
found['something'].content_type)
self.assertEqual(''.join(expected['something'].iter_bytes()),
''.join(found['something'].iter_bytes()))

View File

@ -0,0 +1,118 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import unittest
import subunit
from subunit.progress_model import ProgressModel
class TestProgressModel(unittest.TestCase):
def assertProgressSummary(self, pos, total, progress):
"""Assert that a progress model has reached a particular point."""
self.assertEqual(pos, progress.pos())
self.assertEqual(total, progress.width())
def test_new_progress_0_0(self):
progress = ProgressModel()
self.assertProgressSummary(0, 0, progress)
def test_advance_0_0(self):
progress = ProgressModel()
progress.advance()
self.assertProgressSummary(1, 0, progress)
def test_advance_1_0(self):
progress = ProgressModel()
progress.advance()
self.assertProgressSummary(1, 0, progress)
def test_set_width_absolute(self):
progress = ProgressModel()
progress.set_width(10)
self.assertProgressSummary(0, 10, progress)
def test_set_width_absolute_preserves_pos(self):
progress = ProgressModel()
progress.advance()
progress.set_width(2)
self.assertProgressSummary(1, 2, progress)
def test_adjust_width(self):
progress = ProgressModel()
progress.adjust_width(10)
self.assertProgressSummary(0, 10, progress)
progress.adjust_width(-10)
self.assertProgressSummary(0, 0, progress)
def test_adjust_width_preserves_pos(self):
progress = ProgressModel()
progress.advance()
progress.adjust_width(10)
self.assertProgressSummary(1, 10, progress)
progress.adjust_width(-10)
self.assertProgressSummary(1, 0, progress)
def test_push_preserves_progress(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
self.assertProgressSummary(1, 3, progress)
def test_advance_advances_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(1)
progress.advance()
self.assertProgressSummary(2, 3, progress)
def test_adjust_width_adjusts_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(2)
progress.advance()
self.assertProgressSummary(3, 6, progress)
def test_set_width_adjusts_substack(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.set_width(2)
progress.advance()
self.assertProgressSummary(3, 6, progress)
def test_pop_restores_progress(self):
progress = ProgressModel()
progress.adjust_width(3)
progress.advance()
progress.push()
progress.adjust_width(1)
progress.advance()
progress.pop()
self.assertProgressSummary(1, 3, progress)
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

View File

@ -0,0 +1,136 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.TestResultFilter."""
import unittest
from StringIO import StringIO
import subunit
from subunit.test_results import TestResultFilter
class TestTestResultFilter(unittest.TestCase):
"""Test for TestResultFilter, a TestResult object which filters tests."""
def _setUp(self):
self.output = StringIO()
def test_default(self):
"""The default is to exclude success and include everything else."""
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result)
self.run_tests()
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(4, self.filtered_result.testsRun)
def test_exclude_errors(self):
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_error=True)
self.run_tests()
# skips are seen as errors by default python TestResult.
self.assertEqual([], self.filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_failure(self):
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_failure=True)
self.run_tests()
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_skips(self):
self.filtered_result = subunit.TestResultStats(None)
self.filter = TestResultFilter(self.filtered_result,
filter_skip=True)
self.run_tests()
self.assertEqual(0, self.filtered_result.skipped_tests)
self.assertEqual(2, self.filtered_result.failed_tests)
self.assertEqual(3, self.filtered_result.testsRun)
def test_include_success(self):
"""Success's can be included if requested."""
self.filtered_result = unittest.TestResult()
self.filter = TestResultFilter(self.filtered_result,
filter_success=False)
self.run_tests()
self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
self.assertEqual(5, self.filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
self.filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
self.filter = TestResultFilter(self.filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests()
# Only success should pass
self.assertEqual(1, self.filtered_result.testsRun)
def run_tests(self):
self.setUpTestStream()
self.test = subunit.ProtocolTestCase(self.input_stream)
self.test.run(self.filter)
def setUpTestStream(self):
# While TestResultFilter works on python objects, using a subunit
# stream is an easy pithy way of getting a series of test objects to
# call into the TestResult, and as TestResultFilter is intended for
# use with subunit also has the benefit of detecting any interface
# skew issues.
self.input_stream = StringIO()
self.input_stream.write("""tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error [
error details
]
test skipped
skip skipped
test todo
xfail todo
""")
self.input_stream.seek(0)
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

View File

@ -0,0 +1,83 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.TestResultStats."""
import unittest
from StringIO import StringIO
import subunit
class TestTestResultStats(unittest.TestCase):
"""Test for TestResultStats, a TestResult object that generates stats."""
def setUp(self):
self.output = StringIO()
self.result = subunit.TestResultStats(self.output)
self.input_stream = StringIO()
self.test = subunit.ProtocolTestCase(self.input_stream)
def test_stats_empty(self):
self.test.run(self.result)
self.assertEqual(0, self.result.total_tests)
self.assertEqual(0, self.result.passed_tests)
self.assertEqual(0, self.result.failed_tests)
self.assertEqual(set(), self.result.seen_tags)
def setUpUsedStream(self):
self.input_stream.write("""tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error
test skipped
skip skipped
test todo
xfail todo
""")
self.input_stream.seek(0)
self.test.run(self.result)
def test_stats_smoke_everything(self):
# Statistics are calculated usefully.
self.setUpUsedStream()
self.assertEqual(5, self.result.total_tests)
self.assertEqual(2, self.result.passed_tests)
self.assertEqual(2, self.result.failed_tests)
self.assertEqual(1, self.result.skipped_tests)
self.assertEqual(set(["global", "local"]), self.result.seen_tags)
def test_stat_formatting(self):
expected = ("""
Total tests: 5
Passed tests: 2
Failed tests: 2
Skipped tests: 1
Seen tags: global, local
""")[1:]
self.setUpUsedStream()
self.result.formatStats()
self.assertEqual(expected, self.output.getvalue())
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

View File

@ -0,0 +1,68 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for subunit.tag_stream."""
import unittest
from StringIO import StringIO
import subunit
import subunit.test_results
class TestSubUnitTags(unittest.TestCase):
def setUp(self):
self.original = StringIO()
self.filtered = StringIO()
def test_add_tag(self):
self.original.write("tags: foo\n")
self.original.write("test: test\n")
self.original.write("tags: bar -quux\n")
self.original.write("success: test\n")
self.original.seek(0)
result = subunit.tag_stream(self.original, self.filtered, ["quux"])
self.assertEqual([
"tags: quux",
"tags: foo",
"test: test",
"tags: bar",
"success: test",
],
self.filtered.getvalue().splitlines())
def test_remove_tag(self):
self.original.write("tags: foo\n")
self.original.write("test: test\n")
self.original.write("tags: bar -quux\n")
self.original.write("success: test\n")
self.original.seek(0)
result = subunit.tag_stream(self.original, self.filtered, ["-bar"])
self.assertEqual([
"tags: -bar",
"tags: foo",
"test: test",
"tags: -quux",
"success: test",
],
self.filtered.getvalue().splitlines())
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

View File

@ -0,0 +1,432 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
"""Tests for TAP2SubUnit."""
import unittest
from StringIO import StringIO
import os
import subunit
import sys
class TestTAP2SubUnit(unittest.TestCase):
"""Tests for TAP2SubUnit.
These tests test TAP string data in, and subunit string data out.
This is ok because the subunit protocol is intended to be stable,
but it might be easier/pithier to write tests against TAP string in,
parsed subunit objects out (by hooking the subunit stream to a subunit
protocol server.
"""
def setUp(self):
self.tap = StringIO()
self.subunit = StringIO()
def test_skip_entire_file(self):
# A file
# 1..- # Skipped: comment
# results in a single skipped test.
self.tap.write("1..0 # Skipped: entire file skipped\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test file skip",
"skip file skip [",
"Skipped: entire file skipped",
"]",
],
self.subunit.getvalue().splitlines())
def test_ok_test_pass(self):
# A file
# ok
# results in a passed test with name 'test 1' (a synthetic name as tap
# does not require named fixtures - it is the first test in the tap
# stream).
self.tap.write("ok\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"success test 1",
],
self.subunit.getvalue().splitlines())
def test_ok_test_number_pass(self):
# A file
# ok 1
# results in a passed test with name 'test 1'
self.tap.write("ok 1\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"success test 1",
],
self.subunit.getvalue().splitlines())
def test_ok_test_number_description_pass(self):
# A file
# ok 1 - There is a description
# results in a passed test with name 'test 1 - There is a description'
self.tap.write("ok 1 - There is a description\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1 - There is a description",
"success test 1 - There is a description",
],
self.subunit.getvalue().splitlines())
def test_ok_test_description_pass(self):
# A file
# ok There is a description
# results in a passed test with name 'test 1 There is a description'
self.tap.write("ok There is a description\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1 There is a description",
"success test 1 There is a description",
],
self.subunit.getvalue().splitlines())
def test_ok_SKIP_skip(self):
# A file
# ok # SKIP
# results in a skkip test with name 'test 1'
self.tap.write("ok # SKIP\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"skip test 1",
],
self.subunit.getvalue().splitlines())
def test_ok_number_description_SKIP_skip_comment(self):
# A file
# ok 1 foo # SKIP Not done yet
# results in a skip test with name 'test 1 foo' and a log of
# Not done yet
self.tap.write("ok 1 foo # SKIP Not done yet\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1 foo",
"skip test 1 foo [",
"Not done yet",
"]",
],
self.subunit.getvalue().splitlines())
def test_ok_SKIP_skip_comment(self):
# A file
# ok # SKIP Not done yet
# results in a skip test with name 'test 1' and a log of Not done yet
self.tap.write("ok # SKIP Not done yet\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"skip test 1 [",
"Not done yet",
"]",
],
self.subunit.getvalue().splitlines())
def test_ok_TODO_xfail(self):
# A file
# ok # TODO
# results in a xfail test with name 'test 1'
self.tap.write("ok # TODO\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"xfail test 1",
],
self.subunit.getvalue().splitlines())
def test_ok_TODO_xfail_comment(self):
# A file
# ok # TODO Not done yet
# results in a xfail test with name 'test 1' and a log of Not done yet
self.tap.write("ok # TODO Not done yet\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1",
"xfail test 1 [",
"Not done yet",
"]",
],
self.subunit.getvalue().splitlines())
def test_bail_out_errors(self):
# A file with line in it
# Bail out! COMMENT
# is treated as an error
self.tap.write("ok 1 foo\n")
self.tap.write("Bail out! Lifejacket engaged\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
"test test 1 foo",
"success test 1 foo",
"test Bail out! Lifejacket engaged",
"error Bail out! Lifejacket engaged",
],
self.subunit.getvalue().splitlines())
def test_missing_test_at_end_with_plan_adds_error(self):
# A file
# 1..3
# ok first test
# not ok third test
# results in three tests, with the third being created
self.tap.write('1..3\n')
self.tap.write('ok first test\n')
self.tap.write('not ok second test\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 first test',
'success test 1 first test',
'test test 2 second test',
'failure test 2 second test',
'test test 3',
'error test 3 [',
'test missing from TAP output',
']',
],
self.subunit.getvalue().splitlines())
def test_missing_test_with_plan_adds_error(self):
# A file
# 1..3
# ok first test
# not ok 3 third test
# results in three tests, with the second being created
self.tap.write('1..3\n')
self.tap.write('ok first test\n')
self.tap.write('not ok 3 third test\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 first test',
'success test 1 first test',
'test test 2',
'error test 2 [',
'test missing from TAP output',
']',
'test test 3 third test',
'failure test 3 third test',
],
self.subunit.getvalue().splitlines())
def test_missing_test_no_plan_adds_error(self):
# A file
# ok first test
# not ok 3 third test
# results in three tests, with the second being created
self.tap.write('ok first test\n')
self.tap.write('not ok 3 third test\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 first test',
'success test 1 first test',
'test test 2',
'error test 2 [',
'test missing from TAP output',
']',
'test test 3 third test',
'failure test 3 third test',
],
self.subunit.getvalue().splitlines())
def test_four_tests_in_a_row_trailing_plan(self):
# A file
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# 1..4
# results in four tests numbered and named
self.tap.write('ok 1 - first test in a script with trailing plan\n')
self.tap.write('not ok 2 - second\n')
self.tap.write('ok 3 - third\n')
self.tap.write('not ok 4 - fourth\n')
self.tap.write('1..4\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 - first test in a script with trailing plan',
'success test 1 - first test in a script with trailing plan',
'test test 2 - second',
'failure test 2 - second',
'test test 3 - third',
'success test 3 - third',
'test test 4 - fourth',
'failure test 4 - fourth'
],
self.subunit.getvalue().splitlines())
def test_four_tests_in_a_row_with_plan(self):
# A file
# 1..4
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# results in four tests numbered and named
self.tap.write('1..4\n')
self.tap.write('ok 1 - first test in a script with a plan\n')
self.tap.write('not ok 2 - second\n')
self.tap.write('ok 3 - third\n')
self.tap.write('not ok 4 - fourth\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 - first test in a script with a plan',
'success test 1 - first test in a script with a plan',
'test test 2 - second',
'failure test 2 - second',
'test test 3 - third',
'success test 3 - third',
'test test 4 - fourth',
'failure test 4 - fourth'
],
self.subunit.getvalue().splitlines())
def test_four_tests_in_a_row_no_plan(self):
# A file
# ok 1 - first test in a script with no plan at all
# not ok 2 - second
# ok 3 - third
# not ok 4 - fourth
# results in four tests numbered and named
self.tap.write('ok 1 - first test in a script with no plan at all\n')
self.tap.write('not ok 2 - second\n')
self.tap.write('ok 3 - third\n')
self.tap.write('not ok 4 - fourth\n')
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 - first test in a script with no plan at all',
'success test 1 - first test in a script with no plan at all',
'test test 2 - second',
'failure test 2 - second',
'test test 3 - third',
'success test 3 - third',
'test test 4 - fourth',
'failure test 4 - fourth'
],
self.subunit.getvalue().splitlines())
def test_todo_and_skip(self):
# A file
# not ok 1 - a fail but # TODO but is TODO
# not ok 2 - another fail # SKIP instead
# results in two tests, numbered and commented.
self.tap.write("not ok 1 - a fail but # TODO but is TODO\n")
self.tap.write("not ok 2 - another fail # SKIP instead\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1 - a fail but',
'xfail test 1 - a fail but [',
'but is TODO',
']',
'test test 2 - another fail',
'skip test 2 - another fail [',
'instead',
']',
],
self.subunit.getvalue().splitlines())
def test_leading_comments_add_to_next_test_log(self):
# A file
# # comment
# ok
# ok
# results in a single test with the comment included
# in the first test and not the second.
self.tap.write("# comment\n")
self.tap.write("ok\n")
self.tap.write("ok\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1',
'success test 1 [',
'# comment',
']',
'test test 2',
'success test 2',
],
self.subunit.getvalue().splitlines())
def test_trailing_comments_are_included_in_last_test_log(self):
# A file
# ok foo
# ok foo
# # comment
# results in a two tests, with the second having the comment
# attached to its log.
self.tap.write("ok\n")
self.tap.write("ok\n")
self.tap.write("# comment\n")
self.tap.seek(0)
result = subunit.TAP2SubUnit(self.tap, self.subunit)
self.assertEqual(0, result)
self.assertEqual([
'test test 1',
'success test 1',
'test test 2',
'success test 2 [',
'# comment',
']',
],
self.subunit.getvalue().splitlines())
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,199 @@
#
# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import datetime
import unittest
from StringIO import StringIO
import os
import sys
from testtools.content_type import ContentType
from testtools.content import Content
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
def __init__(self, decorated):
self._calls = 0
super(LoggingDecorator, self).__init__(decorated)
def _before_event(self):
self._calls += 1
class AssertBeforeTestResult(LoggingDecorator):
"""A TestResult for checking preconditions."""
def __init__(self, decorated, test):
self.test = test
super(AssertBeforeTestResult, self).__init__(decorated)
def _before_event(self):
self.test.assertEqual(1, self.earlier._calls)
super(AssertBeforeTestResult, self)._before_event()
class TimeCapturingResult(unittest.TestResult):
def __init__(self):
super(TimeCapturingResult, self).__init__()
self._calls = []
def time(self, a_datetime):
self._calls.append(a_datetime)
class TestHookedTestResultDecorator(unittest.TestCase):
def setUp(self):
# An end to the chain
terminal = unittest.TestResult()
# Asserts that the call was made to self.result before asserter was
# called.
asserter = AssertBeforeTestResult(terminal, self)
# The result object we call, which much increase its call count.
self.result = LoggingDecorator(asserter)
asserter.earlier = self.result
self.decorated = asserter
def tearDown(self):
# The hook in self.result must have been called
self.assertEqual(1, self.result._calls)
# The hook in asserter must have been called too, otherwise the
# assertion about ordering won't have completed.
self.assertEqual(1, self.decorated._calls)
def test_startTest(self):
self.result.startTest(self)
def test_startTestRun(self):
self.result.startTestRun()
def test_stopTest(self):
self.result.stopTest(self)
def test_stopTestRun(self):
self.result.stopTestRun()
def test_addError(self):
self.result.addError(self, subunit.RemoteError())
def test_addError_details(self):
self.result.addError(self, details={})
def test_addFailure(self):
self.result.addFailure(self, subunit.RemoteError())
def test_addFailure_details(self):
self.result.addFailure(self, details={})
def test_addSuccess(self):
self.result.addSuccess(self)
def test_addSuccess_details(self):
self.result.addSuccess(self, details={})
def test_addSkip(self):
self.result.addSkip(self, "foo")
def test_addSkip_details(self):
self.result.addSkip(self, details={})
def test_addExpectedFailure(self):
self.result.addExpectedFailure(self, subunit.RemoteError())
def test_addExpectedFailure_details(self):
self.result.addExpectedFailure(self, details={})
def test_addUnexpectedSuccess(self):
self.result.addUnexpectedSuccess(self)
def test_addUnexpectedSuccess_details(self):
self.result.addUnexpectedSuccess(self, details={})
def test_progress(self):
self.result.progress(1, subunit.PROGRESS_SET)
def test_wasSuccessful(self):
self.result.wasSuccessful()
def test_shouldStop(self):
self.result.shouldStop
def test_stop(self):
self.result.stop()
def test_time(self):
self.result.time(None)
class TestAutoTimingTestResultDecorator(unittest.TestCase):
def setUp(self):
# And end to the chain which captures time events.
terminal = TimeCapturingResult()
# The result object under test.
self.result = subunit.test_results.AutoTimingTestResultDecorator(
terminal)
self.decorated = terminal
def test_without_time_calls_time_is_called_and_not_None(self):
self.result.startTest(self)
self.assertEqual(1, len(self.decorated._calls))
self.assertNotEqual(None, self.decorated._calls[0])
def test_no_time_from_progress(self):
self.result.progress(1, subunit.PROGRESS_CUR)
self.assertEqual(0, len(self.decorated._calls))
def test_no_time_from_shouldStop(self):
self.decorated.stop()
self.result.shouldStop
self.assertEqual(0, len(self.decorated._calls))
def test_calling_time_inhibits_automatic_time(self):
# Calling time() outputs a time signal immediately and prevents
# automatically adding one when other methods are called.
time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
self.result.time(time)
self.result.startTest(self)
self.result.stopTest(self)
self.assertEqual(1, len(self.decorated._calls))
self.assertEqual(time, self.decorated._calls[0])
def test_calling_time_None_enables_automatic_time(self):
time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
self.result.time(time)
self.assertEqual(1, len(self.decorated._calls))
self.assertEqual(time, self.decorated._calls[0])
# Calling None passes the None through, in case other results care.
self.result.time(None)
self.assertEqual(2, len(self.decorated._calls))
self.assertEqual(None, self.decorated._calls[1])
# Calling other methods doesn't generate an automatic time event.
self.result.startTest(self)
self.assertEqual(3, len(self.decorated._calls))
self.assertNotEqual(None, self.decorated._calls[2])
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)
return result

View File

@ -1,35 +0,0 @@
#!/usr/bin/perl
# Simple script that converts Perl test harness output to
# Subunit
# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
# Published under the GNU GPL, v3 or later
my $firstline = 1;
my $error = 0;
while(<STDIN>) {
if ($firstline) {
$firstline = 0;
next;
}
if (/^not ok (\d+) - (.*)$/) {
print "test: $2\n";
print "failure: $2\n";
$error = 1;
} elsif (/^ok (\d+) - (.*)$/) {
print "test: $2\n";
print "success: $2\n";
} elsif (/^ok (\d+)$/) {
print "test: $1\n";
print "success: $1\n";
} elsif (/^ok (\d+) # skip (.*)$/) {
print "test: $1\n";
print "skip: $1 [\n$2\n]\n";
} elsif (/^not ok (\d+)$/) {
print "test: $1\n";
print "failure: $1\n";
$error = 1;
} else {
print;
}
}
exit $error;

View File

@ -5,9 +5,9 @@ TARGETDIR="`dirname $0`"
WORKDIR="`mktemp -d`"
bzr branch lp:subunit "$WORKDIR/subunit"
for p in python filters;
for p in python/ filters/tap2subunit;
do
rsync -avz --delete "$WORKDIR/subunit/$p/" "$TARGETDIR/$p/"
rsync -avz --delete "$WORKDIR/subunit/$p" "$TARGETDIR/$p"
done
rm -rf "$WORKDIR"

View File

@ -85,7 +85,7 @@ smb4torture="$samba4bindir/smbtorture${EXEEXT}"
if which tap2subunit 2>/dev/null; then
TAP2SUBUNIT=tap2subunit
else
TAP2SUBUNIT="$PERL $samba4srcdir/../lib/subunit/tap2subunit"
TAP2SUBUNIT="PYTHONPATH=$samba4srcdir/../lib/subunit/python $PYTHON $samba4srcdir/../lib/subunit/filters/tap2subunit"
fi
$smb4torture -V