tests: Drop most unittest usage from virtinst tests

Kill usage of the TestCase class, move more to pytest standard
style

Signed-off-by: Cole Robinson <crobinso@redhat.com>
This commit is contained in:
Cole Robinson 2020-09-18 16:26:28 -04:00
parent b79ee9565f
commit fa322588b4
12 changed files with 1770 additions and 1733 deletions

View File

@ -4,7 +4,6 @@
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import os import os
import unittest
import pytest import pytest
@ -17,93 +16,98 @@ from virtinst import DomainCapabilities
DATADIR = utils.DATADIR + "/capabilities" DATADIR = utils.DATADIR + "/capabilities"
class TestCapabilities(unittest.TestCase): def _buildCaps(filename):
def _buildCaps(self, filename): path = os.path.join(DATADIR, filename)
path = os.path.join(DATADIR, filename) conn = utils.URIs.open_testdefault_cached()
conn = utils.URIs.open_testdefault_cached() return Capabilities(conn, open(path).read())
return Capabilities(conn, open(path).read())
def testCapsCPUFeaturesNewSyntax(self):
filename = "test-qemu-with-kvm.xml"
host_feature_list = ['lahf_lm', 'xtpr', 'cx16', 'tm2', 'est', 'vmx',
'ds_cpl', 'pbe', 'tm', 'ht', 'ss', 'acpi', 'ds']
caps = self._buildCaps(filename)
for f in host_feature_list:
assert f in [feat.name for feat in caps.host.cpu.features]
assert caps.host.cpu.model == "core2duo"
assert caps.host.cpu.vendor == "Intel"
assert caps.host.cpu.topology.threads == 3
assert caps.host.cpu.topology.cores == 5
assert caps.host.cpu.topology.sockets == 7
def testCapsUtilFuncs(self):
caps_with_kvm = self._buildCaps("test-qemu-with-kvm.xml")
caps_no_kvm = self._buildCaps("test-qemu-no-kvm.xml")
caps_empty = self._buildCaps("test-empty.xml")
def test_utils(caps, has_guests, is_kvm):
assert caps.has_install_options() == has_guests
if caps.guests:
assert caps.guests[0].is_kvm_available() == is_kvm
test_utils(caps_empty, False, False)
test_utils(caps_with_kvm, True, True)
test_utils(caps_no_kvm, True, False)
# Small test for extra unittest coverage
with pytest.raises(ValueError, match=r".*virtualization type 'xen'.*"):
caps_empty.guest_lookup(os_type="linux")
with pytest.raises(ValueError, match=r".*not support any.*"):
caps_empty.guest_lookup()
def testCapsNuma(self):
cells = self._buildCaps("lxc.xml").host.topology.cells
assert len(cells) == 1
assert len(cells[0].cpus) == 8
assert cells[0].cpus[3].id == '3'
############################## def testCapsCPUFeaturesNewSyntax():
# domcapabilities.py testing # filename = "test-qemu-with-kvm.xml"
############################## host_feature_list = ['lahf_lm', 'xtpr', 'cx16', 'tm2', 'est', 'vmx',
'ds_cpl', 'pbe', 'tm', 'ht', 'ss', 'acpi', 'ds']
def testDomainCapabilities(self): caps = _buildCaps(filename)
xml = open(DATADIR + "/test-domcaps.xml").read() for f in host_feature_list:
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) assert f in [feat.name for feat in caps.host.cpu.features]
assert caps.os.loader.supported is True assert caps.host.cpu.model == "core2duo"
assert caps.os.loader.get_values() == ["/foo/bar", "/tmp/my_path"] assert caps.host.cpu.vendor == "Intel"
assert caps.os.loader.enum_names() == ["type", "readonly"] assert caps.host.cpu.topology.threads == 3
assert caps.os.loader.get_enum("type").get_values() == [ assert caps.host.cpu.topology.cores == 5
"rom", "pflash"] assert caps.host.cpu.topology.sockets == 7
def testDomainCapabilitiesx86(self):
xml = open(DATADIR + "/kvm-x86_64-domcaps.xml").read()
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml)
assert caps.machine == "pc-i440fx-2.1" def testCapsUtilFuncs():
assert caps.arch == "x86_64" caps_with_kvm = _buildCaps("test-qemu-with-kvm.xml")
assert caps.domain == "kvm" caps_no_kvm = _buildCaps("test-qemu-no-kvm.xml")
assert caps.path == "/bin/qemu-system-x86_64" caps_empty = _buildCaps("test-empty.xml")
custom_mode = caps.cpu.get_mode("custom") def test_utils(caps, has_guests, is_kvm):
assert bool(custom_mode) assert caps.has_install_options() == has_guests
cpu_model = custom_mode.get_model("Opteron_G4") if caps.guests:
assert bool(cpu_model) assert caps.guests[0].is_kvm_available() == is_kvm
assert cpu_model.usable
models = caps.get_cpu_models() test_utils(caps_empty, False, False)
assert len(models) > 10 test_utils(caps_with_kvm, True, True)
assert "SandyBridge" in models test_utils(caps_no_kvm, True, False)
assert caps.label_for_firmware_path(None) == "BIOS" # Small test for extra coverage
assert "Custom:" in caps.label_for_firmware_path("/foobar") with pytest.raises(ValueError, match=r".*virtualization type 'xen'.*"):
assert "UEFI" in caps.label_for_firmware_path("OVMF") caps_empty.guest_lookup(os_type="linux")
with pytest.raises(ValueError, match=r".*not support any.*"):
caps_empty.guest_lookup()
def testDomainCapabilitiesAArch64(self):
xml = open(DATADIR + "/kvm-aarch64-domcaps.xml").read()
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml)
assert "None" in caps.label_for_firmware_path(None) def testCapsNuma():
cells = _buildCaps("lxc.xml").host.topology.cells
assert len(cells) == 1
assert len(cells[0].cpus) == 8
assert cells[0].cpus[3].id == '3'
##############################
# domcapabilities.py testing #
##############################
def testDomainCapabilities():
xml = open(DATADIR + "/test-domcaps.xml").read()
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml)
assert caps.os.loader.supported is True
assert caps.os.loader.get_values() == ["/foo/bar", "/tmp/my_path"]
assert caps.os.loader.enum_names() == ["type", "readonly"]
assert caps.os.loader.get_enum("type").get_values() == [
"rom", "pflash"]
def testDomainCapabilitiesx86():
xml = open(DATADIR + "/kvm-x86_64-domcaps.xml").read()
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml)
assert caps.machine == "pc-i440fx-2.1"
assert caps.arch == "x86_64"
assert caps.domain == "kvm"
assert caps.path == "/bin/qemu-system-x86_64"
custom_mode = caps.cpu.get_mode("custom")
assert bool(custom_mode)
cpu_model = custom_mode.get_model("Opteron_G4")
assert bool(cpu_model)
assert cpu_model.usable
models = caps.get_cpu_models()
assert len(models) > 10
assert "SandyBridge" in models
assert caps.label_for_firmware_path(None) == "BIOS"
assert "Custom:" in caps.label_for_firmware_path("/foobar")
assert "UEFI" in caps.label_for_firmware_path("OVMF")
def testDomainCapabilitiesAArch64():
xml = open(DATADIR + "/kvm-aarch64-domcaps.xml").read()
caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml)
assert "None" in caps.label_for_firmware_path(None)

View File

@ -10,7 +10,8 @@ import shlex
import shutil import shutil
import sys import sys
import traceback import traceback
import unittest
import pytest
try: try:
import argcomplete import argcomplete
@ -155,7 +156,7 @@ class SkipChecks:
msg = "Skipping check due to version < %s" % check msg = "Skipping check due to version < %s" % check
if skip: if skip:
raise unittest.case.SkipTest(msg) raise pytest.skip(msg)
def prerun_skip(self, conn): def prerun_skip(self, conn):
self._check(conn, self.prerun_check) self._check(conn, self.prerun_check)
@ -338,15 +339,8 @@ class Command(object):
if self.compare_file: if self.compare_file:
self._check_compare_file(conn, output) self._check_compare_file(conn, output)
def run(self, tests): def run(self):
err = None self._run()
try:
self._run()
except AssertionError as e:
err = self.cmdstr + "\n" + str(e)
if err:
tests.fail(err)
class _CategoryProxy(object): class _CategoryProxy(object):
@ -1453,32 +1447,35 @@ _add_argcomplete_cmd("virt-xml --sound mode", "model")
############## ##############
class CLIMiscTests(unittest.TestCase): @utils.run_without_testsuite_hacks
@utils.run_without_testsuite_hacks def test_virtinstall_no_testsuite():
def test_virtinstall_no_testsuite(self): """
""" Run virt-install stub command without the testsuite hacks, to test
Run virt-install stub command without the testsuite hacks, to test some code paths like proper logging etc.
some code paths like proper logging etc. """
""" cmd = Command(
cmd = Command( "virt-install --connect %s "
"virt-install --connect %s " "--test-stub-command --noautoconsole" %
"--test-stub-command --noautoconsole" % (utils.URIs.test_suite))
(utils.URIs.test_suite)) cmd.run()
cmd.run(self)
######################### #########################
# Test runner functions # # Test runner functions #
######################### #########################
newidx = 0 _CURTEST = 0
curtest = 0
def setup(): def setup():
""" """
Create initial test files/dirs Create initial test files/dirs
""" """
global _CURTEST
_CURTEST += 1
if _CURTEST != 1:
return
for i in EXIST_FILES: for i in EXIST_FILES:
open(i, "a") open(i, "a")
@ -1500,40 +1497,41 @@ def cleanup(clean_all=True):
os.unlink(i) os.unlink(i)
class CLITests(unittest.TestCase): def _create_testfunc(cmd, do_setup):
def setUp(self): def cmdtemplate():
global curtest if do_setup:
curtest += 1
# Only run this for first test
if curtest == 1:
setup() setup()
cmd.run()
def tearDown(self): return cmdtemplate
# Only run this on the last test
if curtest == newidx:
cleanup()
def maketest(cmd): def _make_testcases():
def cmdtemplate(self, _cmdobj): """
_cmdobj.run(self) Turn all the registered cli strings into test functions that
return lambda s: cmdtemplate(s, cmd) the test runner can scoop up
"""
cmdlist = []
cmdlist += vinst.cmds
cmdlist += vclon.cmds
cmdlist += vixml.cmds
cmdlist += ARGCOMPLETE_CMDS
_cmdlist = [] newidx = 0
_cmdlist += vinst.cmds for cmd in cmdlist:
_cmdlist += vclon.cmds newidx += 1
_cmdlist += vixml.cmds # Generate numbered names like testCLI%d
_cmdlist += ARGCOMPLETE_CMDS name = "testCLI%.4d" % newidx
# Generate numbered names like testCLI%d if cmd.compare_file:
for _cmd in _cmdlist: base = os.path.splitext(os.path.basename(cmd.compare_file))[0]
newidx += 1 name += base.replace("-", "_")
_name = "testCLI%.4d" % newidx else:
if _cmd.compare_file: name += os.path.basename(cmd.app.replace("-", "_"))
_base = os.path.splitext(os.path.basename(_cmd.compare_file))[0]
_name += _base.replace("-", "_")
else:
_name += os.path.basename(_cmd.app.replace("-", "_"))
setattr(CLITests, _name, maketest(_cmd))
do_setup = newidx == 1
testfunc = _create_testfunc(cmd, do_setup)
globals()[name] = testfunc
_make_testcases()
atexit.register(cleanup) atexit.register(cleanup)

View File

@ -2,8 +2,6 @@
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import os import os
import unittest
import unittest.mock
import pytest import pytest
@ -13,83 +11,84 @@ from virtinst import StoragePool
from virtinst import URI from virtinst import URI
class TestConn(unittest.TestCase): ############################
""" # VirtinstConnection tests #
VirtinstConnection tests ############################
"""
def test_misc(self):
# Misc API checks
conn = cli.getConnection("test:///default")
conn.invalidate_caps()
assert conn.is_open() is True
assert conn.is_container_only() is False
assert conn.is_openvz() is False
assert not conn.get_uri_hostname()
assert not conn.get_uri_port()
assert not conn.get_uri_username()
assert not conn.get_uri_transport()
assert conn.close() == 0
# Coverage for a daemon_version check def test_misc():
fakeuri = "__virtinst_test__test:///default,libver=123" # Misc API checks
conn = cli.getConnection(fakeuri) conn = cli.getConnection("test:///default")
assert conn.daemon_version() == 123 conn.invalidate_caps()
assert conn.is_open() is True
assert conn.is_container_only() is False
assert conn.is_openvz() is False
assert not conn.get_uri_hostname()
assert not conn.get_uri_port()
assert not conn.get_uri_username()
assert not conn.get_uri_transport()
assert conn.close() == 0
# Hit a special code path that reflects default libvirt transport # Coverage for a daemon_version check
# pylint: disable=protected-access fakeuri = "__virtinst_test__test:///default,libver=123"
conn._uriobj = URI("qemu://example.com/system") conn = cli.getConnection(fakeuri)
assert conn.get_uri_transport() == "tls" assert conn.daemon_version() == 123
# Hit the qemu:///embed case privileged case check # Hit a special code path that reflects default libvirt transport
fakeuri = "__virtinst_test__test:///default,fakeuri=qemu:///embed" # pylint: disable=protected-access
conn = cli.getConnection(fakeuri) conn._uriobj = URI("qemu://example.com/system")
assert conn.is_privileged() == (os.getuid() == 0) assert conn.get_uri_transport() == "tls"
# Hit fakuuri validation error, for old style opts # Hit the qemu:///embed case privileged case check
with pytest.raises(RuntimeError): fakeuri = "__virtinst_test__test:///default,fakeuri=qemu:///embed"
cli.getConnection(fakeuri + ",qemu") conn = cli.getConnection(fakeuri)
assert conn.is_privileged() == (os.getuid() == 0)
@unittest.mock.patch.dict(os.environ, # Hit fakuuri validation error, for old style opts
{"LIBVIRT_DEFAULT_URI": "test:///default"}) with pytest.raises(RuntimeError):
def test_default_uri(self): cli.getConnection(fakeuri + ",qemu")
# Handle connecting to None conn
conn = cli.getConnection(None)
assert conn.getURI() == "test:///default"
conn.close()
def test_poll(self): def test_default_uri(monkeypatch):
# Add coverage for conn fetch_* handling, and pollhelpers monkeypatch.setitem(os.environ, "LIBVIRT_DEFAULT_URI", "test:///default")
conn = cli.getConnection("test:///default")
objmap = {}
def build_cb(obj, name):
return obj
gone, new, master = pollhelpers.fetch_nets(conn, {}, build_cb) # Handle connecting to None conn
assert len(gone) == 0 conn = cli.getConnection(None)
assert len(new) == 1 assert conn.getURI() == "test:///default"
assert len(master) == 1 conn.close()
assert master[0].name() == "default"
objmap = dict((obj.name(), obj) for obj in master)
gone, new, master = pollhelpers.fetch_nets(conn, objmap, build_cb)
assert len(gone) == 0
assert len(new) == 0
assert len(master) == 1
assert master[0].name() == "default"
# coverage for some special cases in cache_new_pool def test_poll():
def makepool(name, create): # Add coverage for conn fetch_* handling, and pollhelpers
poolxml = StoragePool(conn) conn = cli.getConnection("test:///default")
poolxml.type = "dir" objmap = {}
poolxml.name = name def build_cb(obj, name):
poolxml.target_path = "/tmp/foo/bar/baz/%s" % name return obj
return poolxml.install(create=create)
poolobj1 = makepool("conntest1", False) gone, new, master = pollhelpers.fetch_nets(conn, {}, build_cb)
conn.fetch_all_pools() assert len(gone) == 0
poolobj2 = makepool("conntest2", True) assert len(new) == 1
conn.fetch_all_vols() assert len(master) == 1
poolobj1.undefine() assert master[0].name() == "default"
poolobj2.destroy()
poolobj2.undefine() objmap = dict((obj.name(), obj) for obj in master)
gone, new, master = pollhelpers.fetch_nets(conn, objmap, build_cb)
assert len(gone) == 0
assert len(new) == 0
assert len(master) == 1
assert master[0].name() == "default"
# coverage for some special cases in cache_new_pool
def makepool(name, create):
poolxml = StoragePool(conn)
poolxml.type = "dir"
poolxml.name = name
poolxml.target_path = "/tmp/foo/bar/baz/%s" % name
return poolxml.install(create=create)
poolobj1 = makepool("conntest1", False)
conn.fetch_all_pools()
poolobj2 = makepool("conntest2", True)
conn.fetch_all_vols()
poolobj1.undefine()
poolobj2.destroy()
poolobj2.undefine()

View File

@ -5,7 +5,6 @@
import os import os
import tempfile import tempfile
import unittest
import pytest import pytest
@ -50,8 +49,8 @@ def test_disk_numtotarget():
assert disk.generate_target(['hda', 'hdd']) == 'hdb' assert disk.generate_target(['hda', 'hdd']) == 'hdb'
def test_disk_dir_searchable(): def test_disk_dir_searchable(monkeypatch):
# Normally the dir searchable test is skipped in the unittest, # Normally the dir searchable test is skipped in the test suite,
# but let's contrive an example that should trigger all the code # but let's contrive an example that should trigger all the code
# to ensure it isn't horribly broken # to ensure it isn't horribly broken
conn = utils.URIs.open_kvm() conn = utils.URIs.open_kvm()
@ -87,8 +86,8 @@ def test_disk_dir_searchable():
assert not bool(errdict) assert not bool(errdict)
# Mock setfacl to definitely fail # Mock setfacl to definitely fail
with unittest.mock.patch("virtinst.diskbackend.SETFACL", with monkeypatch.context() as m:
"getfacl"): m.setattr("virtinst.diskbackend.SETFACL", "getfacl")
errdict = virtinst.DeviceDisk.fix_path_search(searchdata) errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
finally: finally:

View File

@ -5,7 +5,6 @@
import os import os
import sys import sys
import unittest
_alldistros = {} _alldistros = {}
@ -89,16 +88,8 @@ def _test_distro(distro):
os.system(cmd) os.system(cmd)
_printinitrd = False def _print_intro():
print("""
class InjectTests(unittest.TestCase):
def setUp(self):
global _printinitrd
if _printinitrd:
return
print("""
This is an interactive test suite. This is an interactive test suite.
@ -108,18 +99,26 @@ injections, that will cause installs to quickly fail. Look for the
failure pattern to confirm that initrd injections are working as expected. failure pattern to confirm that initrd injections are working as expected.
""") """)
prompt() prompt()
_printinitrd = True
def _build_testfunc(dobj, do_setup):
def testfunc():
if do_setup:
_print_intro()
_test_distro(dobj)
return testfunc
def _make_tests(): def _make_tests():
def _make_check_cb(_d):
return lambda s: _test_distro(_d)
idx = 0 idx = 0
for dname, dobj in _alldistros.items(): for dname, dobj in _alldistros.items():
idx += 1 idx += 1
setattr(InjectTests, "testInitrd%.3d_%s" % name = "testInitrd%.3d_%s" % (idx, dname.replace("-", "_"))
(idx, dname.replace("-", "_")), _make_check_cb(dobj))
do_setup = idx == 1
testfunc = _build_testfunc(dobj, do_setup)
globals()[name] = testfunc
_make_tests() _make_tests()

View File

@ -6,7 +6,6 @@
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import os.path import os.path
import unittest
import pytest import pytest
@ -27,117 +26,137 @@ funky_chars_xml = """
DATADIR = utils.DATADIR + "/nodedev/" DATADIR = utils.DATADIR + "/nodedev/"
class TestNodeDev(unittest.TestCase): def _nodeDevFromName(conn, devname):
@property node = conn.nodeDeviceLookupByName(devname)
def conn(self): xml = node.XMLDesc(0)
return utils.URIs.open_testdriver_cached() return NodeDevice(conn, xml)
def _nodeDevFromName(self, devname):
node = self.conn.nodeDeviceLookupByName(devname)
xml = node.XMLDesc(0)
return NodeDevice(self.conn, xml)
def _testNode2DeviceCompare(self, nodename, devfile, nodedev=None):
devfile = os.path.join(DATADIR, "devxml", devfile)
if not nodedev:
nodedev = self._nodeDevFromName(nodename)
dev = DeviceHostdev(self.conn)
dev.set_from_nodedev(nodedev)
dev.set_defaults(Guest(self.conn))
utils.diff_compare(dev.get_xml() + "\n", devfile)
def testFunkyChars(self):
# Ensure parsing doesn't fail
dev = NodeDevice(self.conn, funky_chars_xml)
assert dev.name == "L3B2616"
assert dev.device_type == "LENOVO"
def testNetDevice(self):
devname = "net_00_1c_25_10_b1_e4"
dev = self._nodeDevFromName(devname)
assert dev.name == devname
assert dev.parent == "pci_8086_1049"
assert dev.device_type == "net"
assert dev.interface == "eth0"
def testPCIDevice(self):
nodename = "pci_8086_10fb"
obj = self._nodeDevFromName(nodename)
assert obj.is_pci_sriov() is True
nodename = "pci_8086_2448"
obj = self._nodeDevFromName(nodename)
assert obj.is_pci_bridge() is True
def testUSBDevDevice(self): def _testNode2DeviceCompare(conn, nodename, devfile, nodedev=None):
devname = "usb_device_781_5151_2004453082054CA1BEEE" devfile = os.path.join(DATADIR, "devxml", devfile)
dev = self._nodeDevFromName(devname) if not nodedev:
assert dev.vendor_name == "SanDisk Corp." nodedev = _nodeDevFromName(conn, nodename)
assert dev.product_name == "Cruzer Micro 256/512MB Flash Drive"
devname = "usb_device_1d6b_1_0000_00_1a_0" dev = DeviceHostdev(conn)
dev = self._nodeDevFromName(devname) dev.set_from_nodedev(nodedev)
assert dev.is_usb_linux_root_hub() is True dev.set_defaults(Guest(conn))
utils.diff_compare(dev.get_xml() + "\n", devfile)
def testSCSIDevice(self):
devname = "pci_8086_2829_scsi_host_scsi_device_lun0"
dev = self._nodeDevFromName(devname)
assert dev.host == "0"
assert dev.bus == "0"
assert dev.target == "0"
def testStorageDevice(self):
devname = "storage_serial_SATA_WDC_WD1600AAJS__WD_WCAP95119685"
dev = self._nodeDevFromName(devname)
assert dev.block == "/dev/sda"
assert dev.drive_type == "disk"
assert dev.media_available is None
devname = "storage_model_DVDRAM_GSA_U1200N"
dev = self._nodeDevFromName(devname)
assert dev.media_label == "Fedora12_media"
assert dev.media_available == 1
def testSCSIBus(self):
devname = "pci_8086_2829_scsi_host_1"
dev = self._nodeDevFromName(devname)
assert dev.host == "2"
def testDRMDevice(self):
devname = "drm_renderD129"
dev = self._nodeDevFromName(devname)
assert dev.devnodes[0].path == "/dev/dri/renderD129"
assert dev.devnodes[0].node_type == "dev"
assert dev.devnodes[1].path == "/dev/dri/by-path/pci-0000:00:02.0-render"
assert dev.devnodes[1].node_type == "link"
assert dev.is_drm_render() is True
assert dev.get_devnode("frob")
# NodeDevice 2 Device XML tests def testFunkyChars():
def testNodeDev2USB1(self): # Ensure parsing doesn't fail
nodename = "usb_device_781_5151_2004453082054CA1BEEE" conn = utils.URIs.open_testdriver_cached()
devfile = "usbdev1.xml" dev = NodeDevice(conn, funky_chars_xml)
self._testNode2DeviceCompare(nodename, devfile) assert dev.name == "L3B2616"
assert dev.device_type == "LENOVO"
def testNodeDev2USB2(self):
nodename = "usb_device_1d6b_2_0000_00_1d_7"
devfile = "usbdev2.xml"
nodedev = self._nodeDevFromName(nodename)
self._testNode2DeviceCompare(nodename, devfile, nodedev=nodedev) def testNetDevice():
conn = utils.URIs.open_testdriver_cached()
devname = "net_00_1c_25_10_b1_e4"
dev = _nodeDevFromName(conn, devname)
assert dev.name == devname
assert dev.parent == "pci_8086_1049"
assert dev.device_type == "net"
assert dev.interface == "eth0"
def testNodeDev2PCI(self):
nodename = "pci_1180_592"
devfile = "pcidev.xml"
self._testNode2DeviceCompare(nodename, devfile)
def testNodeDevFail(self): def testPCIDevice():
nodename = "usb_device_1d6b_1_0000_00_1d_1_if0" conn = utils.URIs.open_testdriver_cached()
devfile = "" nodename = "pci_8086_10fb"
obj = _nodeDevFromName(conn, nodename)
assert obj.is_pci_sriov() is True
nodename = "pci_8086_2448"
obj = _nodeDevFromName(conn, nodename)
assert obj.is_pci_bridge() is True
# This should exist, since usbbus is not a valid device to
# pass to a guest.
pytest.raises(ValueError, def testUSBDevDevice():
self._testNode2DeviceCompare, nodename, devfile) conn = utils.URIs.open_testdriver_cached()
devname = "usb_device_781_5151_2004453082054CA1BEEE"
dev = _nodeDevFromName(conn, devname)
assert dev.vendor_name == "SanDisk Corp."
assert dev.product_name == "Cruzer Micro 256/512MB Flash Drive"
devname = "usb_device_1d6b_1_0000_00_1a_0"
dev = _nodeDevFromName(conn, devname)
assert dev.is_usb_linux_root_hub() is True
def testSCSIDevice():
conn = utils.URIs.open_testdriver_cached()
devname = "pci_8086_2829_scsi_host_scsi_device_lun0"
dev = _nodeDevFromName(conn, devname)
assert dev.host == "0"
assert dev.bus == "0"
assert dev.target == "0"
def testStorageDevice():
conn = utils.URIs.open_testdriver_cached()
devname = "storage_serial_SATA_WDC_WD1600AAJS__WD_WCAP95119685"
dev = _nodeDevFromName(conn, devname)
assert dev.block == "/dev/sda"
assert dev.drive_type == "disk"
assert dev.media_available is None
devname = "storage_model_DVDRAM_GSA_U1200N"
dev = _nodeDevFromName(conn, devname)
assert dev.media_label == "Fedora12_media"
assert dev.media_available == 1
def testSCSIBus():
conn = utils.URIs.open_testdriver_cached()
devname = "pci_8086_2829_scsi_host_1"
dev = _nodeDevFromName(conn, devname)
assert dev.host == "2"
def testDRMDevice():
conn = utils.URIs.open_testdriver_cached()
devname = "drm_renderD129"
dev = _nodeDevFromName(conn, devname)
assert dev.devnodes[0].path == "/dev/dri/renderD129"
assert dev.devnodes[0].node_type == "dev"
assert dev.devnodes[1].path == "/dev/dri/by-path/pci-0000:00:02.0-render"
assert dev.devnodes[1].node_type == "link"
assert dev.is_drm_render() is True
assert dev.get_devnode("frob")
# NodeDevice 2 Device XML tests
def testNodeDev2USB1():
conn = utils.URIs.open_testdriver_cached()
nodename = "usb_device_781_5151_2004453082054CA1BEEE"
devfile = "usbdev1.xml"
_testNode2DeviceCompare(conn, nodename, devfile)
def testNodeDev2USB2():
conn = utils.URIs.open_testdriver_cached()
nodename = "usb_device_1d6b_2_0000_00_1d_7"
devfile = "usbdev2.xml"
nodedev = _nodeDevFromName(conn, nodename)
_testNode2DeviceCompare(conn, nodename, devfile, nodedev=nodedev)
def testNodeDev2PCI():
conn = utils.URIs.open_testdriver_cached()
nodename = "pci_1180_592"
devfile = "pcidev.xml"
_testNode2DeviceCompare(conn, nodename, devfile)
def testNodeDevFail():
conn = utils.URIs.open_testdriver_cached()
nodename = "usb_device_1d6b_1_0000_00_1d_1_if0"
devfile = ""
# This should exist, since usbbus is not a valid device to
# pass to a guest.
with pytest.raises(ValueError):
_testNode2DeviceCompare(conn, nodename, devfile)

View File

@ -3,7 +3,7 @@
# This work is licensed under the GNU GPLv2 or later. # This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import unittest import pytest
from virtinst import Guest from virtinst import Guest
from virtinst import OSDB from virtinst import OSDB
@ -13,101 +13,109 @@ from virtinst.install import urldetect
from tests import utils from tests import utils
class TestOSDB(unittest.TestCase): ##################
""" # Test osdict.py #
Test osdict/OSDB ##################
"""
def test_osdict_aliases_ro(self):
aliases = getattr(OSDB, "_aliases")
if len(aliases) != 42:
raise AssertionError(_("OSDB._aliases changed size. It "
"should never be extended, since it is only for back "
"compat with pre-libosinfo osdict.py"))
def test_list_os(self): def test_osdict_aliases_ro():
OSDB.list_os() aliases = getattr(OSDB, "_aliases")
def test_recommended_resources(self): if len(aliases) != 42:
conn = utils.URIs.open_testdefault_cached() raise AssertionError(_("OSDB._aliases changed size. It "
guest = Guest(conn) "should never be extended, since it is only for back "
res = OSDB.lookup_os("generic").get_recommended_resources() "compat with pre-libosinfo osdict.py"))
assert res.get_recommended_ram(guest.os.arch) is None
res = OSDB.lookup_os("fedora21").get_recommended_resources()
assert res.get_recommended_ncpus(guest.os.arch) == 2
def test_urldetct_matching_distros(self): def test_list_os():
# pylint: disable=protected-access OSDB.list_os()
allstores = urldetect._build_distro_list(OSDB.lookup_os("generic"))
seen_distro = []
for store in allstores:
for distro in store.matching_distros:
if distro in seen_distro:
raise xmlutil.DevError(
"store=%s has conflicting matching_distro=%s " %
(store.PRETTY_NAME, distro))
seen_distro.append(distro)
def test_tree_url(self): def test_recommended_resources():
f26 = OSDB.lookup_os("fedora26") conn = utils.URIs.open_testdefault_cached()
f29 = OSDB.lookup_os("fedora29") guest = Guest(conn)
winxp = OSDB.lookup_os("winxp") res = OSDB.lookup_os("generic").get_recommended_resources()
assert res.get_recommended_ram(guest.os.arch) is None
# Valid tree URL res = OSDB.lookup_os("fedora21").get_recommended_resources()
assert "fedoraproject.org" in f26.get_location("x86_64") assert res.get_recommended_ncpus(guest.os.arch) == 2
# Most generic tree URL
assert "Everything" in f29.get_location("x86_64")
# Specific tree def test_urldetct_matching_distros():
assert "Server" in f29.get_location("x86_64", "jeos") # pylint: disable=protected-access
assert "Workstation" in f29.get_location("x86_64", "desktop") allstores = urldetect._build_distro_list(OSDB.lookup_os("generic"))
# Has tree URLs, but none for arch seen_distro = []
try: for store in allstores:
f26.get_location("ia64") for distro in store.matching_distros:
raise AssertionError("Expected failure") if distro in seen_distro:
except RuntimeError as e: raise xmlutil.DevError(
assert "ia64" in str(e) "store=%s has conflicting matching_distro=%s " %
(store.PRETTY_NAME, distro))
seen_distro.append(distro)
# Has no tree URLs
try:
winxp.get_location("x86_64")
raise AssertionError("Expected failure")
except RuntimeError as e:
assert str(e).endswith("URL location")
# Trigger an error path for code coverage def test_tree_url():
assert OSDB.guess_os_by_tree(utils.TESTDIR) is None f26 = OSDB.lookup_os("fedora26")
f29 = OSDB.lookup_os("fedora29")
winxp = OSDB.lookup_os("winxp")
def test_kernel_url(self): # Valid tree URL
def _c(name): assert "fedoraproject.org" in f26.get_location("x86_64")
osobj = OSDB.lookup_os(name)
if not osobj:
self.skipTest("osinfo-db doesn't have '%s'" % name)
return osobj.get_kernel_url_arg()
assert _c("rhel7-unknown") == "inst.repo" # Most generic tree URL
assert _c("rhel6-unknown") == "method" assert "Everything" in f29.get_location("x86_64")
assert _c("fedora-rawhide") == "inst.repo"
assert _c("fedora20") == "inst.repo"
assert _c("generic") is None
assert _c("win10") is None
assert _c("sle15") == "install"
def test_related_to(self): # Specific tree
# pylint: disable=protected-access assert "Server" in f29.get_location("x86_64", "jeos")
win10 = OSDB.lookup_os("win10") assert "Workstation" in f29.get_location("x86_64", "desktop")
assert win10._is_related_to("winxp") is True
assert win10._is_related_to("win10") is True
assert win10._is_related_to("fedora26") is False
def test_drivers(self): # Has tree URLs, but none for arch
win7 = OSDB.lookup_os("win7") try:
generic = OSDB.lookup_os("generic") f26.get_location("ia64")
assert generic.supports_unattended_drivers("x86_64") is False raise AssertionError("Expected failure")
assert win7.supports_unattended_drivers("x86_64") is True except RuntimeError as e:
assert win7.supports_unattended_drivers("fakearch") is False assert "ia64" in str(e)
assert win7.get_pre_installable_drivers_location("x86_64")
# Has no tree URLs
try:
winxp.get_location("x86_64")
raise AssertionError("Expected failure")
except RuntimeError as e:
assert str(e).endswith("URL location")
# Trigger an error path for code coverage
assert OSDB.guess_os_by_tree(utils.TESTDIR) is None
def test_kernel_url():
def _c(name):
osobj = OSDB.lookup_os(name)
if not osobj:
pytest.skip("osinfo-db doesn't have '%s'" % name)
return osobj.get_kernel_url_arg()
assert _c("rhel7-unknown") == "inst.repo"
assert _c("rhel6-unknown") == "method"
assert _c("fedora-rawhide") == "inst.repo"
assert _c("fedora20") == "inst.repo"
assert _c("generic") is None
assert _c("win10") is None
assert _c("sle15") == "install"
def test_related_to():
# pylint: disable=protected-access
win10 = OSDB.lookup_os("win10")
assert win10._is_related_to("winxp") is True
assert win10._is_related_to("win10") is True
assert win10._is_related_to("fedora26") is False
def test_drivers():
win7 = OSDB.lookup_os("win7")
generic = OSDB.lookup_os("generic")
assert generic.supports_unattended_drivers("x86_64") is False
assert win7.supports_unattended_drivers("x86_64") is True
assert win7.supports_unattended_drivers("fakearch") is False
assert win7.get_pre_installable_drivers_location("x86_64")

View File

@ -4,7 +4,6 @@
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import os import os
import unittest
from virtinst import StoragePool, StorageVolume from virtinst import StoragePool, StorageVolume
from virtinst import log from virtinst import log
@ -12,7 +11,6 @@ from virtinst import log
from tests import utils from tests import utils
# pylint: disable=protected-access # pylint: disable=protected-access
# Access to protected member, needed to unittest stuff
BASEPATH = os.path.join(utils.DATADIR, "storage") BASEPATH = os.path.join(utils.DATADIR, "storage")
@ -97,126 +95,148 @@ def createVol(conn, poolobj, volname=None, input_vol=None, clone_vol=None):
return vol_inst.install(meter=False) return vol_inst.install(meter=False)
class TestStorage(unittest.TestCase): ##############
@property # Test cases #
def conn(self): ##############
return utils.URIs.open_testdefault_cached()
def testDirPool(self): def testDirPool():
poolobj = createPool(self.conn, conn = utils.URIs.open_testdefault_cached()
StoragePool.TYPE_DIR, "pool-dir") poolobj = createPool(conn,
invol = createVol(self.conn, poolobj) StoragePool.TYPE_DIR, "pool-dir")
createVol(self.conn, poolobj, invol = createVol(conn, poolobj)
volname=invol.name() + "input", input_vol=invol) createVol(conn, poolobj,
createVol(self.conn, poolobj, volname=invol.name() + "input", input_vol=invol)
volname=invol.name() + "clone", clone_vol=invol) createVol(conn, poolobj,
removePool(poolobj) volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
def testFSPool(self):
poolobj = createPool(self.conn,
StoragePool.TYPE_FS, "pool-fs")
invol = createVol(self.conn, poolobj)
createVol(self.conn, poolobj,
volname=invol.name() + "input", input_vol=invol)
createVol(self.conn, poolobj,
volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
def testNetFSPool(self): def testFSPool():
poolobj = createPool(self.conn, conn = utils.URIs.open_testdefault_cached()
StoragePool.TYPE_NETFS, "pool-netfs") poolobj = createPool(conn,
invol = createVol(self.conn, poolobj) StoragePool.TYPE_FS, "pool-fs")
createVol(self.conn, poolobj, invol = createVol(conn, poolobj)
volname=invol.name() + "input", input_vol=invol) createVol(conn, poolobj,
createVol(self.conn, poolobj, volname=invol.name() + "input", input_vol=invol)
volname=invol.name() + "clone", clone_vol=invol) createVol(conn, poolobj,
removePool(poolobj) volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
def testLVPool(self):
poolobj = createPool(self.conn,
StoragePool.TYPE_LOGICAL,
"pool-logical",
source_name="pool-logical")
invol = createVol(self.conn, poolobj)
createVol(self.conn, poolobj,
volname=invol.name() + "input", input_vol=invol)
createVol(self.conn,
poolobj, volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
# Test parsing source name for target path def testNetFSPool():
poolobj = createPool(self.conn, StoragePool.TYPE_LOGICAL, conn = utils.URIs.open_testdefault_cached()
"pool-logical-target-srcname", poolobj = createPool(conn,
target_path="/dev/vgfoobar") StoragePool.TYPE_NETFS, "pool-netfs")
removePool(poolobj) invol = createVol(conn, poolobj)
createVol(conn, poolobj,
volname=invol.name() + "input", input_vol=invol)
createVol(conn, poolobj,
volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
# Test with source name
poolobj = createPool(self.conn,
StoragePool.TYPE_LOGICAL, "pool-logical-srcname",
source_name="vgname")
removePool(poolobj)
def testDiskPool(self): def testLVPool():
poolobj = createPool(self.conn, conn = utils.URIs.open_testdefault_cached()
StoragePool.TYPE_DISK, poolobj = createPool(conn,
"pool-disk", fmt="auto", StoragePool.TYPE_LOGICAL,
target_path="/some/target/path") "pool-logical",
invol = createVol(self.conn, poolobj) source_name="pool-logical")
createVol(self.conn, poolobj, invol = createVol(conn, poolobj)
volname=invol.name() + "input", input_vol=invol) createVol(conn, poolobj,
createVol(self.conn, poolobj, volname=invol.name() + "input", input_vol=invol)
volname=invol.name() + "clone", clone_vol=invol) createVol(conn,
removePool(poolobj) poolobj, volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
def testISCSIPool(self): # Test parsing source name for target path
poolobj = createPool(self.conn, poolobj = createPool(conn, StoragePool.TYPE_LOGICAL,
StoragePool.TYPE_ISCSI, "pool-iscsi", "pool-logical-target-srcname",
iqn="foo.bar.baz.iqn") target_path="/dev/vgfoobar")
removePool(poolobj) removePool(poolobj)
def testSCSIPool(self): # Test with source name
poolobj = createPool(self.conn, StoragePool.TYPE_SCSI, "pool-scsi") poolobj = createPool(conn,
removePool(poolobj) StoragePool.TYPE_LOGICAL, "pool-logical-srcname",
source_name="vgname")
removePool(poolobj)
def testMpathPool(self):
poolobj = createPool(self.conn, StoragePool.TYPE_MPATH, "pool-mpath")
removePool(poolobj)
def testGlusterPool(self): def testDiskPool():
poolobj = createPool(self.conn, conn = utils.URIs.open_testdefault_cached()
StoragePool.TYPE_GLUSTER, "pool-gluster") poolobj = createPool(conn,
removePool(poolobj) StoragePool.TYPE_DISK,
"pool-disk", fmt="auto",
target_path="/some/target/path")
invol = createVol(conn, poolobj)
createVol(conn, poolobj,
volname=invol.name() + "input", input_vol=invol)
createVol(conn, poolobj,
volname=invol.name() + "clone", clone_vol=invol)
removePool(poolobj)
def testRBDPool(self):
poolobj = createPool(self.conn,
StoragePool.TYPE_RBD, "pool-rbd")
removePool(poolobj)
def testMisc(self): def testISCSIPool():
# Misc coverage testing conn = utils.URIs.open_testdefault_cached()
vol = StorageVolume(self.conn) poolobj = createPool(conn,
assert vol.is_size_conflict()[0] is False StoragePool.TYPE_ISCSI, "pool-iscsi",
iqn="foo.bar.baz.iqn")
removePool(poolobj)
fullconn = utils.URIs.open_testdriver_cached()
glusterpool = fullconn.storagePoolLookupByName("gluster-pool")
diskpool = fullconn.storagePoolLookupByName("disk-pool")
glustervol = StorageVolume(fullconn) def testSCSIPool():
glustervol.pool = glusterpool conn = utils.URIs.open_testdefault_cached()
assert glustervol.supports_format() is True poolobj = createPool(conn, StoragePool.TYPE_SCSI, "pool-scsi")
removePool(poolobj)
diskvol = StorageVolume(fullconn)
diskvol.pool = diskpool
assert diskvol.supports_format() is False
glusterpool.destroy() def testMpathPool():
StoragePool.ensure_pool_is_running(glusterpool) conn = utils.URIs.open_testdefault_cached()
poolobj = createPool(conn, StoragePool.TYPE_MPATH, "pool-mpath")
removePool(poolobj)
# Check pool collision detection
name = StoragePool.find_free_name(fullconn, "gluster-pool")
assert name == "gluster-pool-1"
def testEnumerateLogical(self): def testGlusterPool():
lst = StoragePool.pool_list_from_sources(self.conn, conn = utils.URIs.open_testdefault_cached()
StoragePool.TYPE_LOGICAL) poolobj = createPool(conn,
assert lst == ["testvg1", "testvg2"] StoragePool.TYPE_GLUSTER, "pool-gluster")
removePool(poolobj)
def testRBDPool():
conn = utils.URIs.open_testdefault_cached()
poolobj = createPool(conn,
StoragePool.TYPE_RBD, "pool-rbd")
removePool(poolobj)
def testMisc():
conn = utils.URIs.open_testdefault_cached()
# Misc coverage testing
vol = StorageVolume(conn)
assert vol.is_size_conflict()[0] is False
fullconn = utils.URIs.open_testdriver_cached()
glusterpool = fullconn.storagePoolLookupByName("gluster-pool")
diskpool = fullconn.storagePoolLookupByName("disk-pool")
glustervol = StorageVolume(fullconn)
glustervol.pool = glusterpool
assert glustervol.supports_format() is True
diskvol = StorageVolume(fullconn)
diskvol.pool = diskpool
assert diskvol.supports_format() is False
glusterpool.destroy()
StoragePool.ensure_pool_is_running(glusterpool)
# Check pool collision detection
name = StoragePool.find_free_name(fullconn, "gluster-pool")
assert name == "gluster-pool-1"
def testEnumerateLogical():
conn = utils.URIs.open_testdefault_cached()
lst = StoragePool.pool_list_from_sources(conn,
StoragePool.TYPE_LOGICAL)
assert lst == ["testvg1", "testvg2"]

View File

@ -3,8 +3,6 @@
# This work is licensed under the GNU GPLv2 or later. # This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory. # See the COPYING file in the top-level directory.
import unittest
import pytest import pytest
from virtinst import URI from virtinst import URI
@ -12,80 +10,82 @@ from virtinst import URI
import tests import tests
class TestURI(unittest.TestCase): ############################
""" # Test virtinst URI module #
Test virtinst URI module ############################
"""
def _compare(self, uri, scheme='',
transport='', port='', username='', path='',
hostname='', query='', fragment='',
is_ipv6=False, host_is_ipv4_string=False):
uriinfo = URI(uri)
assert scheme == uriinfo.scheme
assert transport == uriinfo.transport
assert port == uriinfo.port
assert username == uriinfo.username
assert path == uriinfo.path
assert hostname == uriinfo.hostname
assert query == uriinfo.query
assert fragment == uriinfo.fragment
assert is_ipv6 == uriinfo.is_ipv6
assert host_is_ipv4_string == uriinfo.host_is_ipv4_string
def testURIs(self): def _compare(uri, scheme='',
self._compare("lxc://", scheme="lxc") transport='', port='', username='', path='',
self._compare("qemu:///session", scheme="qemu", path="/session") hostname='', query='', fragment='',
self._compare("http://foobar.com:5901/my/example.path#my-frag", is_ipv6=False, host_is_ipv4_string=False):
scheme="http", hostname="foobar.com", uriinfo = URI(uri)
port="5901", path='/my/example.path', assert scheme == uriinfo.scheme
fragment="my-frag") assert transport == uriinfo.transport
self._compare( assert port == uriinfo.port
"gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img", assert username == uriinfo.username
scheme="gluster", transport="tcp", assert path == uriinfo.path
hostname="1:2:3:4:5:6:7:8", port="24007", assert hostname == uriinfo.hostname
path="/testvol/dir/a.img", is_ipv6=True) assert query == uriinfo.query
self._compare( assert fragment == uriinfo.fragment
"qemu+ssh://root@192.168.2.3/system?no_verify=1", assert is_ipv6 == uriinfo.is_ipv6
scheme="qemu", transport="ssh", username="root", assert host_is_ipv4_string == uriinfo.host_is_ipv4_string
hostname="192.168.2.3", path="/system",
query="no_verify=1", host_is_ipv4_string=True)
self._compare(
"qemu+ssh://foo%5Cbar@hostname/system",
scheme="qemu", path="/system", transport="ssh",
hostname="hostname", username="foo\\bar")
self._compare(
"qemu+ssh://user%40domain.org@hostname/system",
scheme="qemu", path="/system", transport="ssh",
hostname="hostname", username="user@domain.org")
def test_magicuri_connver(self):
uri = tests.utils.URIs.test_default + ",connver=1,libver=2"
conn = tests.utils.URIs.openconn(uri)
assert conn.conn_version() == 1
assert conn.local_libvirt_version() == 2
conn = tests.utils.URIs.openconn("test:///default") def testURIs():
# Add some support tests with it _compare("lxc://", scheme="lxc")
with pytest.raises(ValueError, _compare("qemu:///session", scheme="qemu", path="/session")
match=".*type <class 'libvirt.virDomain'>.*"): _compare("http://foobar.com:5901/my/example.path#my-frag",
conn.support.domain_xml_inactive("foo") scheme="http", hostname="foobar.com",
port="5901", path='/my/example.path',
fragment="my-frag")
_compare(
"gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img",
scheme="gluster", transport="tcp",
hostname="1:2:3:4:5:6:7:8", port="24007",
path="/testvol/dir/a.img", is_ipv6=True)
_compare(
"qemu+ssh://root@192.168.2.3/system?no_verify=1",
scheme="qemu", transport="ssh", username="root",
hostname="192.168.2.3", path="/system",
query="no_verify=1", host_is_ipv4_string=True)
_compare(
"qemu+ssh://foo%5Cbar@hostname/system",
scheme="qemu", path="/system", transport="ssh",
hostname="hostname", username="foo\\bar")
_compare(
"qemu+ssh://user%40domain.org@hostname/system",
scheme="qemu", path="/system", transport="ssh",
hostname="hostname", username="user@domain.org")
# pylint: disable=protected-access
from virtinst import support
def _run(**kwargs):
check = support._SupportCheck(**kwargs)
return check(conn)
assert _run(function="virNope.Foo") is False def test_magicuri_connver():
assert _run(function="virDomain.IDontExist") is False uri = tests.utils.URIs.test_default + ",connver=1,libver=2"
assert _run(function="virDomain.isActive") is True conn = tests.utils.URIs.openconn(uri)
assert _run(function="virConnect.getVersion", assert conn.conn_version() == 1
flag="SOME_FLAG_DOESNT_EXIST") is False assert conn.local_libvirt_version() == 2
assert _run(version="1000.0.0") is False
assert _run(hv_version={"test": "1000.0.0"}) is False
assert _run(hv_libvirt_version={"test": "1000.0.0"}) is False
assert _run(hv_libvirt_version={"qemu": "1.2.3"}) is False
assert _run(hv_libvirt_version={"qemu": "1.2.3", "all": 0}) is True
dom = conn.lookupByName("test") conn = tests.utils.URIs.openconn("test:///default")
assert conn.support.domain_xml_inactive(dom) is True # Add some support tests with it
with pytest.raises(ValueError,
match=".*type <class 'libvirt.virDomain'>.*"):
conn.support.domain_xml_inactive("foo")
# pylint: disable=protected-access
from virtinst import support
def _run(**kwargs):
check = support._SupportCheck(**kwargs)
return check(conn)
assert _run(function="virNope.Foo") is False
assert _run(function="virDomain.IDontExist") is False
assert _run(function="virDomain.isActive") is True
assert _run(function="virConnect.getVersion",
flag="SOME_FLAG_DOESNT_EXIST") is False
assert _run(version="1000.0.0") is False
assert _run(hv_version={"test": "1000.0.0"}) is False
assert _run(hv_libvirt_version={"test": "1000.0.0"}) is False
assert _run(hv_libvirt_version={"qemu": "1.2.3"}) is False
assert _run(hv_libvirt_version={"qemu": "1.2.3", "all": 0}) is True
dom = conn.lookupByName("test")
assert conn.support.domain_xml_inactive(dom) is True

View File

@ -6,7 +6,6 @@
import os import os
import re import re
import sys import sys
import unittest
import pytest import pytest
@ -126,7 +125,7 @@ def _testGuest(testdata, guest):
msg = _skipmsg(testdata) msg = _skipmsg(testdata)
if msg: if msg:
raise unittest.SkipTest(msg) raise pytest.skip(msg)
installer = Installer(guest.conn, location=url) installer = Installer(guest.conn, location=url)
try: try:
@ -190,24 +189,22 @@ def _testURL(testdata):
_testGuest(testdata, xenguest) _testGuest(testdata, xenguest)
# Register tests to be picked up by unittest def test001BadURL():
class URLTests(unittest.TestCase): badurl = "http://aksdkakskdfa-idontexist.com/foo/tree"
def test001BadURL(self):
badurl = "http://aksdkakskdfa-idontexist.com/foo/tree"
with pytest.raises(ValueError, match=".*maybe you mistyped.*"): with pytest.raises(ValueError, match=".*maybe you mistyped.*"):
installer = Installer(hvmguest.conn, location=badurl) installer = Installer(hvmguest.conn, location=badurl)
installer.detect_distro(hvmguest) installer.detect_distro(hvmguest)
# Non-existent cdrom fails # Non-existent cdrom fails
with pytest.raises(ValueError, match=".*non-existent path.*"): with pytest.raises(ValueError, match=".*non-existent path.*"):
installer = Installer(hvmguest.conn, cdrom="/not/exist/foobar") installer = Installer(hvmguest.conn, cdrom="/not/exist/foobar")
assert installer.detect_distro(hvmguest) is None
# Ensure existing but non-distro file doesn't error
installer = Installer(hvmguest.conn, cdrom="/dev/null")
assert installer.detect_distro(hvmguest) is None assert installer.detect_distro(hvmguest) is None
# Ensure existing but non-distro file doesn't error
installer = Installer(hvmguest.conn, cdrom="/dev/null")
assert installer.detect_distro(hvmguest) is None
def _make_tests(): def _make_tests():
import configparser import configparser
@ -238,7 +235,7 @@ def _make_tests():
for key, testdata in sorted(urls.items()): for key, testdata in sorted(urls.items()):
def _make_wrapper(d): def _make_wrapper(d):
return lambda _self: _testURL(d) return lambda _self: _testURL(d)
methodname = "testURL%s" % key.replace("-", "_") methodname = "test_URL%s" % key.replace("-", "_")
setattr(URLTests, methodname, _make_wrapper(testdata)) globals()[methodname] = _make_wrapper(testdata)
_make_tests() _make_tests()

File diff suppressed because it is too large Load Diff

View File

@ -5,9 +5,9 @@
import os import os
import sys import sys
import unittest
import libvirt import libvirt
import pytest
import virtinst import virtinst
import virtinst.uri import virtinst.uri
@ -16,7 +16,6 @@ from virtinst import xmlutil
# pylint: disable=protected-access # pylint: disable=protected-access
# Access to protected member, needed to unittest stuff
class _TestConfig(object): class _TestConfig(object):
""" """
@ -138,7 +137,7 @@ class _URIs(object):
print(self._testdriver_error, file=sys.stderr) print(self._testdriver_error, file=sys.stderr)
if is_testdriver_xml and self._testdriver_error: if is_testdriver_xml and self._testdriver_error:
raise unittest.SkipTest(self._testdriver_error) pytest.skip(self._testdriver_error)
uri = conn._open_uri uri = conn._open_uri