1
0
mirror of https://github.com/samba-team/samba.git synced 2025-04-30 18:53:31 +03:00
Jelmer Vernooij c619f86efc r26516: Fix line splitting in subunitrun.
(This used to be commit 623b7b31144f9da5e256e5541d04db8698dd39d3)
2007-12-21 05:51:20 +01:00

385 lines
13 KiB
Python

#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import os
from StringIO import StringIO
import sys
import unittest
def test_suite():
import subunit.tests
return subunit.tests.test_suite()
def join_dir(base_path, path):
"""
Returns an absolute path to C{path}, calculated relative to the parent
of C{base_path}.
@param base_path: A path to a file or directory.
@param path: An absolute path, or a path relative to the containing
directory of C{base_path}.
@return: An absolute path to C{path}.
"""
return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
class TestProtocolServer(object):
"""A class for receiving results from a TestProtocol client."""
OUTSIDE_TEST = 0
TEST_STARTED = 1
READING_FAILURE = 2
READING_ERROR = 3
def __init__(self, client, stream=sys.stdout):
"""Create a TestProtocol server instance.
client should be an object that provides
- startTest
- addSuccess
- addFailure
- addError
- stopTest
methods, i.e. a TestResult.
"""
self.state = TestProtocolServer.OUTSIDE_TEST
self.client = client
self._stream = stream
def _addError(self, offset, line):
if (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description == line[offset:-1]):
self.state = TestProtocolServer.OUTSIDE_TEST
self.current_test_description = None
self.client.addError(self._current_test, RemoteError(""))
self.client.stopTest(self._current_test)
self._current_test = None
elif (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description + " [" == line[offset:-1]):
self.state = TestProtocolServer.READING_ERROR
self._message = ""
else:
self.stdOutLineReceived(line)
def _addFailure(self, offset, line):
if (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description == line[offset:-1]):
self.state = TestProtocolServer.OUTSIDE_TEST
self.current_test_description = None
self.client.addFailure(self._current_test, RemoteError())
self.client.stopTest(self._current_test)
elif (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description + " [" == line[offset:-1]):
self.state = TestProtocolServer.READING_FAILURE
self._message = ""
else:
self.stdOutLineReceived(line)
def _addSuccess(self, offset, line):
if (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description == line[offset:-1]):
self.client.addSuccess(self._current_test)
self.client.stopTest(self._current_test)
self.current_test_description = None
self._current_test = None
self.state = TestProtocolServer.OUTSIDE_TEST
else:
self.stdOutLineReceived(line)
def _appendMessage(self, line):
if line[0:2] == " ]":
# quoted ] start
self._message += line[1:]
else:
self._message += line
def endQuote(self, line):
if self.state == TestProtocolServer.READING_FAILURE:
self.state = TestProtocolServer.OUTSIDE_TEST
self.current_test_description = None
self.client.addFailure(self._current_test,
RemoteError(self._message))
self.client.stopTest(self._current_test)
elif self.state == TestProtocolServer.READING_ERROR:
self.state = TestProtocolServer.OUTSIDE_TEST
self.current_test_description = None
self.client.addError(self._current_test,
RemoteError(self._message))
self.client.stopTest(self._current_test)
else:
self.stdOutLineReceived(line)
def lineReceived(self, line):
"""Call the appropriate local method for the received line."""
if line == "]\n":
self.endQuote(line)
elif (self.state == TestProtocolServer.READING_FAILURE or
self.state == TestProtocolServer.READING_ERROR):
self._appendMessage(line)
else:
parts = line.split(None, 1)
if len(parts) == 2:
cmd, rest = parts
offset = len(cmd) + 1
cmd = cmd.strip(':')
if cmd in ('test', 'testing'):
self._startTest(offset, line)
elif cmd == 'error':
self._addError(offset, line)
elif cmd == 'failure':
self._addFailure(offset, line)
elif cmd in ('success', 'successful'):
self._addSuccess(offset, line)
else:
self.stdOutLineReceived(line)
else:
self.stdOutLineReceived(line)
def lostConnection(self):
"""The input connection has finished."""
if self.state == TestProtocolServer.TEST_STARTED:
self.client.addError(self._current_test,
RemoteError("lost connection during test '%s'"
% self.current_test_description))
self.client.stopTest(self._current_test)
elif self.state == TestProtocolServer.READING_ERROR:
self.client.addError(self._current_test,
RemoteError("lost connection during "
"error report of test "
"'%s'" %
self.current_test_description))
self.client.stopTest(self._current_test)
elif self.state == TestProtocolServer.READING_FAILURE:
self.client.addError(self._current_test,
RemoteError("lost connection during "
"failure report of test "
"'%s'" %
self.current_test_description))
self.client.stopTest(self._current_test)
def readFrom(self, pipe):
for line in pipe.readlines():
self.lineReceived(line)
self.lostConnection()
def _startTest(self, offset, line):
"""Internal call to change state machine. Override startTest()."""
if self.state == TestProtocolServer.OUTSIDE_TEST:
self.state = TestProtocolServer.TEST_STARTED
self._current_test = RemotedTestCase(line[offset:-1])
self.current_test_description = line[offset:-1]
self.client.startTest(self._current_test)
else:
self.stdOutLineReceived(line)
def stdOutLineReceived(self, line):
self._stream.write(line)
class RemoteException(Exception):
"""An exception that occured remotely to python."""
def __eq__(self, other):
try:
return self.args == other.args
except AttributeError:
return False
class TestProtocolClient(unittest.TestResult):
"""A class that looks like a TestResult and informs a TestProtocolServer."""
def __init__(self, stream):
unittest.TestResult.__init__(self)
self._stream = stream
def addError(self, test, error):
"""Report an error in test test."""
self._stream.write("error: %s [\n" % (test.shortDescription() or str(test)))
for line in self._exc_info_to_string(error, test).splitlines():
self._stream.write("%s\n" % line)
self._stream.write("]\n")
def addFailure(self, test, error):
"""Report a failure in test test."""
self._stream.write("failure: %s [\n" % (test.shortDescription() or str(test)))
for line in self._exc_info_to_string(error, test).splitlines():
self._stream.write("%s\n" % line)
self._stream.write("]\n")
def addSuccess(self, test):
"""Report a success in a test."""
self._stream.write("successful: %s\n" % (test.shortDescription() or str(test)))
def startTest(self, test):
"""Mark a test as starting its test run."""
self._stream.write("test: %s\n" % (test.shortDescription() or str(test)))
def RemoteError(description=""):
if description == "":
description = "\n"
return (RemoteException, RemoteException(description), None)
class RemotedTestCase(unittest.TestCase):
"""A class to represent test cases run in child processes."""
def __eq__ (self, other):
try:
return self.__description == other.__description
except AttributeError:
return False
def __init__(self, description):
"""Create a psuedo test case with description description."""
self.__description = description
def error(self, label):
raise NotImplementedError("%s on RemotedTestCases is not permitted." %
label)
def setUp(self):
self.error("setUp")
def tearDown(self):
self.error("tearDown")
def shortDescription(self):
return self.__description
def id(self):
return "%s.%s" % (self._strclass(), self.__description)
def __str__(self):
return "%s (%s)" % (self.__description, self._strclass())
def __repr__(self):
return "<%s description='%s'>" % \
(self._strclass(), self.__description)
def run(self, result=None):
if result is None: result = self.defaultTestResult()
result.startTest(self)
result.addError(self, RemoteError("Cannot run RemotedTestCases.\n"))
result.stopTest(self)
def _strclass(self):
cls = self.__class__
return "%s.%s" % (cls.__module__, cls.__name__)
class ExecTestCase(unittest.TestCase):
"""A test case which runs external scripts for test fixtures."""
def __init__(self, methodName='runTest'):
"""Create an instance of the class that will use the named test
method when executed. Raises a ValueError if the instance does
not have a method with the specified name.
"""
unittest.TestCase.__init__(self, methodName)
testMethod = getattr(self, methodName)
self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
testMethod.__doc__)
def countTestCases(self):
return 1
def run(self, result=None):
if result is None: result = self.defaultTestResult()
self._run(result)
def debug(self):
"""Run the test without collecting errors in a TestResult"""
self._run(unittest.TestResult())
def _run(self, result):
protocol = TestProtocolServer(result)
output = os.popen(self.script, mode='r')
protocol.readFrom(output)
class IsolatedTestCase(unittest.TestCase):
"""A TestCase which runs its tests in a forked process."""
def run(self, result=None):
if result is None: result = self.defaultTestResult()
run_isolated(unittest.TestCase, self, result)
class IsolatedTestSuite(unittest.TestSuite):
"""A TestCase which runs its tests in a forked process."""
def run(self, result=None):
if result is None: result = unittest.TestResult()
run_isolated(unittest.TestSuite, self, result)
def run_isolated(klass, self, result):
"""Run a test suite or case in a subprocess, using the run method on klass.
"""
c2pread, c2pwrite = os.pipe()
# fixme - error -> result
# now fork
pid = os.fork()
if pid == 0:
# Child
# Close parent's pipe ends
os.close(c2pread)
# Dup fds for child
os.dup2(c2pwrite, 1)
# Close pipe fds.
os.close(c2pwrite)
# at this point, sys.stdin is redirected, now we want
# to filter it to escape ]'s.
### XXX: test and write that bit.
result = TestProtocolClient(sys.stdout)
klass.run(self, result)
sys.stdout.flush()
sys.stderr.flush()
# exit HARD, exit NOW.
os._exit(0)
else:
# Parent
# Close child pipe ends
os.close(c2pwrite)
# hookup a protocol engine
protocol = TestProtocolServer(result)
protocol.readFrom(os.fdopen(c2pread, 'rU'))
os.waitpid(pid, 0)
# TODO return code evaluation.
return result
class SubunitTestRunner:
def __init__(self, stream=sys.stdout):
self.stream = stream
def run(self, test):
"Run the given test case or test suite."
result = TestProtocolClient(self.stream)
test(result)
return result