From fa322588b47b5f383e934e2852bea1c1795b970f Mon Sep 17 00:00:00 2001 From: Cole Robinson Date: Fri, 18 Sep 2020 16:26:28 -0400 Subject: [PATCH] tests: Drop most unittest usage from virtinst tests Kill usage of the TestCase class, move more to pytest standard style Signed-off-by: Cole Robinson --- tests/test_capabilities.py | 164 +-- tests/test_cli.py | 108 +- tests/test_conn.py | 141 ++- tests/test_disk.py | 9 +- tests/test_inject.py | 35 +- tests/test_nodedev.py | 229 ++-- tests/test_osdict.py | 172 +-- tests/test_storage.py | 232 ++-- tests/test_uriparse.py | 146 +-- tests/test_urls.py | 33 +- tests/test_xmlparse.py | 2229 ++++++++++++++++++------------------ tests/utils.py | 5 +- 12 files changed, 1770 insertions(+), 1733 deletions(-) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index 1f2244473..b6b870e59 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -4,7 +4,6 @@ # See the COPYING file in the top-level directory. import os -import unittest import pytest @@ -17,93 +16,98 @@ from virtinst import DomainCapabilities DATADIR = utils.DATADIR + "/capabilities" -class TestCapabilities(unittest.TestCase): - def _buildCaps(self, filename): - path = os.path.join(DATADIR, filename) - conn = utils.URIs.open_testdefault_cached() - return Capabilities(conn, open(path).read()) - - def testCapsCPUFeaturesNewSyntax(self): - filename = "test-qemu-with-kvm.xml" - host_feature_list = ['lahf_lm', 'xtpr', 'cx16', 'tm2', 'est', 'vmx', - 'ds_cpl', 'pbe', 'tm', 'ht', 'ss', 'acpi', 'ds'] - - caps = self._buildCaps(filename) - for f in host_feature_list: - assert f in [feat.name for feat in caps.host.cpu.features] - - assert caps.host.cpu.model == "core2duo" - assert caps.host.cpu.vendor == "Intel" - assert caps.host.cpu.topology.threads == 3 - assert caps.host.cpu.topology.cores == 5 - assert caps.host.cpu.topology.sockets == 7 - - def testCapsUtilFuncs(self): - caps_with_kvm = self._buildCaps("test-qemu-with-kvm.xml") - caps_no_kvm = self._buildCaps("test-qemu-no-kvm.xml") - caps_empty = self._buildCaps("test-empty.xml") - - def test_utils(caps, has_guests, is_kvm): - assert caps.has_install_options() == has_guests - if caps.guests: - assert caps.guests[0].is_kvm_available() == is_kvm - - test_utils(caps_empty, False, False) - test_utils(caps_with_kvm, True, True) - test_utils(caps_no_kvm, True, False) - - # Small test for extra unittest coverage - with pytest.raises(ValueError, match=r".*virtualization type 'xen'.*"): - caps_empty.guest_lookup(os_type="linux") - with pytest.raises(ValueError, match=r".*not support any.*"): - caps_empty.guest_lookup() - - def testCapsNuma(self): - cells = self._buildCaps("lxc.xml").host.topology.cells - assert len(cells) == 1 - assert len(cells[0].cpus) == 8 - assert cells[0].cpus[3].id == '3' +def _buildCaps(filename): + path = os.path.join(DATADIR, filename) + conn = utils.URIs.open_testdefault_cached() + return Capabilities(conn, open(path).read()) - ############################## - # domcapabilities.py testing # - ############################## +def testCapsCPUFeaturesNewSyntax(): + filename = "test-qemu-with-kvm.xml" + host_feature_list = ['lahf_lm', 'xtpr', 'cx16', 'tm2', 'est', 'vmx', + 'ds_cpl', 'pbe', 'tm', 'ht', 'ss', 'acpi', 'ds'] - def testDomainCapabilities(self): - xml = open(DATADIR + "/test-domcaps.xml").read() - caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) + caps = _buildCaps(filename) + for f in host_feature_list: + assert f in [feat.name for feat in caps.host.cpu.features] - assert caps.os.loader.supported is True - assert caps.os.loader.get_values() == ["/foo/bar", "/tmp/my_path"] - assert caps.os.loader.enum_names() == ["type", "readonly"] - assert caps.os.loader.get_enum("type").get_values() == [ - "rom", "pflash"] + assert caps.host.cpu.model == "core2duo" + assert caps.host.cpu.vendor == "Intel" + assert caps.host.cpu.topology.threads == 3 + assert caps.host.cpu.topology.cores == 5 + assert caps.host.cpu.topology.sockets == 7 - def testDomainCapabilitiesx86(self): - xml = open(DATADIR + "/kvm-x86_64-domcaps.xml").read() - caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) - assert caps.machine == "pc-i440fx-2.1" - assert caps.arch == "x86_64" - assert caps.domain == "kvm" - assert caps.path == "/bin/qemu-system-x86_64" +def testCapsUtilFuncs(): + caps_with_kvm = _buildCaps("test-qemu-with-kvm.xml") + caps_no_kvm = _buildCaps("test-qemu-no-kvm.xml") + caps_empty = _buildCaps("test-empty.xml") - custom_mode = caps.cpu.get_mode("custom") - assert bool(custom_mode) - cpu_model = custom_mode.get_model("Opteron_G4") - assert bool(cpu_model) - assert cpu_model.usable + def test_utils(caps, has_guests, is_kvm): + assert caps.has_install_options() == has_guests + if caps.guests: + assert caps.guests[0].is_kvm_available() == is_kvm - models = caps.get_cpu_models() - assert len(models) > 10 - assert "SandyBridge" in models + test_utils(caps_empty, False, False) + test_utils(caps_with_kvm, True, True) + test_utils(caps_no_kvm, True, False) - assert caps.label_for_firmware_path(None) == "BIOS" - assert "Custom:" in caps.label_for_firmware_path("/foobar") - assert "UEFI" in caps.label_for_firmware_path("OVMF") + # Small test for extra coverage + with pytest.raises(ValueError, match=r".*virtualization type 'xen'.*"): + caps_empty.guest_lookup(os_type="linux") + with pytest.raises(ValueError, match=r".*not support any.*"): + caps_empty.guest_lookup() - def testDomainCapabilitiesAArch64(self): - xml = open(DATADIR + "/kvm-aarch64-domcaps.xml").read() - caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) - assert "None" in caps.label_for_firmware_path(None) +def testCapsNuma(): + cells = _buildCaps("lxc.xml").host.topology.cells + assert len(cells) == 1 + assert len(cells[0].cpus) == 8 + assert cells[0].cpus[3].id == '3' + + +############################## +# domcapabilities.py testing # +############################## + + +def testDomainCapabilities(): + xml = open(DATADIR + "/test-domcaps.xml").read() + caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) + + assert caps.os.loader.supported is True + assert caps.os.loader.get_values() == ["/foo/bar", "/tmp/my_path"] + assert caps.os.loader.enum_names() == ["type", "readonly"] + assert caps.os.loader.get_enum("type").get_values() == [ + "rom", "pflash"] + + +def testDomainCapabilitiesx86(): + xml = open(DATADIR + "/kvm-x86_64-domcaps.xml").read() + caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) + + assert caps.machine == "pc-i440fx-2.1" + assert caps.arch == "x86_64" + assert caps.domain == "kvm" + assert caps.path == "/bin/qemu-system-x86_64" + + custom_mode = caps.cpu.get_mode("custom") + assert bool(custom_mode) + cpu_model = custom_mode.get_model("Opteron_G4") + assert bool(cpu_model) + assert cpu_model.usable + + models = caps.get_cpu_models() + assert len(models) > 10 + assert "SandyBridge" in models + + assert caps.label_for_firmware_path(None) == "BIOS" + assert "Custom:" in caps.label_for_firmware_path("/foobar") + assert "UEFI" in caps.label_for_firmware_path("OVMF") + + +def testDomainCapabilitiesAArch64(): + xml = open(DATADIR + "/kvm-aarch64-domcaps.xml").read() + caps = DomainCapabilities(utils.URIs.open_testdriver_cached(), xml) + + assert "None" in caps.label_for_firmware_path(None) diff --git a/tests/test_cli.py b/tests/test_cli.py index 51dff97d4..b639d08f1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,7 +10,8 @@ import shlex import shutil import sys import traceback -import unittest + +import pytest try: import argcomplete @@ -155,7 +156,7 @@ class SkipChecks: msg = "Skipping check due to version < %s" % check if skip: - raise unittest.case.SkipTest(msg) + raise pytest.skip(msg) def prerun_skip(self, conn): self._check(conn, self.prerun_check) @@ -338,15 +339,8 @@ class Command(object): if self.compare_file: self._check_compare_file(conn, output) - def run(self, tests): - err = None - - try: - self._run() - except AssertionError as e: - err = self.cmdstr + "\n" + str(e) - if err: - tests.fail(err) + def run(self): + self._run() class _CategoryProxy(object): @@ -1453,32 +1447,35 @@ _add_argcomplete_cmd("virt-xml --sound mode", "model") ############## -class CLIMiscTests(unittest.TestCase): - @utils.run_without_testsuite_hacks - def test_virtinstall_no_testsuite(self): - """ - Run virt-install stub command without the testsuite hacks, to test - some code paths like proper logging etc. - """ - cmd = Command( - "virt-install --connect %s " - "--test-stub-command --noautoconsole" % - (utils.URIs.test_suite)) - cmd.run(self) +@utils.run_without_testsuite_hacks +def test_virtinstall_no_testsuite(): + """ + Run virt-install stub command without the testsuite hacks, to test + some code paths like proper logging etc. + """ + cmd = Command( + "virt-install --connect %s " + "--test-stub-command --noautoconsole" % + (utils.URIs.test_suite)) + cmd.run() ######################### # Test runner functions # ######################### -newidx = 0 -curtest = 0 +_CURTEST = 0 def setup(): """ Create initial test files/dirs """ + global _CURTEST + _CURTEST += 1 + if _CURTEST != 1: + return + for i in EXIST_FILES: open(i, "a") @@ -1500,40 +1497,41 @@ def cleanup(clean_all=True): os.unlink(i) -class CLITests(unittest.TestCase): - def setUp(self): - global curtest - curtest += 1 - # Only run this for first test - if curtest == 1: +def _create_testfunc(cmd, do_setup): + def cmdtemplate(): + if do_setup: setup() - - def tearDown(self): - # Only run this on the last test - if curtest == newidx: - cleanup() + cmd.run() + return cmdtemplate -def maketest(cmd): - def cmdtemplate(self, _cmdobj): - _cmdobj.run(self) - return lambda s: cmdtemplate(s, cmd) +def _make_testcases(): + """ + Turn all the registered cli strings into test functions that + the test runner can scoop up + """ + cmdlist = [] + cmdlist += vinst.cmds + cmdlist += vclon.cmds + cmdlist += vixml.cmds + cmdlist += ARGCOMPLETE_CMDS -_cmdlist = [] -_cmdlist += vinst.cmds -_cmdlist += vclon.cmds -_cmdlist += vixml.cmds -_cmdlist += ARGCOMPLETE_CMDS + newidx = 0 + for cmd in cmdlist: + newidx += 1 + # Generate numbered names like testCLI%d + name = "testCLI%.4d" % newidx -# Generate numbered names like testCLI%d -for _cmd in _cmdlist: - newidx += 1 - _name = "testCLI%.4d" % newidx - if _cmd.compare_file: - _base = os.path.splitext(os.path.basename(_cmd.compare_file))[0] - _name += _base.replace("-", "_") - else: - _name += os.path.basename(_cmd.app.replace("-", "_")) - setattr(CLITests, _name, maketest(_cmd)) + if cmd.compare_file: + base = os.path.splitext(os.path.basename(cmd.compare_file))[0] + name += base.replace("-", "_") + else: + name += os.path.basename(cmd.app.replace("-", "_")) + do_setup = newidx == 1 + testfunc = _create_testfunc(cmd, do_setup) + globals()[name] = testfunc + + +_make_testcases() atexit.register(cleanup) diff --git a/tests/test_conn.py b/tests/test_conn.py index 73a1ccfd1..95987fd5e 100644 --- a/tests/test_conn.py +++ b/tests/test_conn.py @@ -2,8 +2,6 @@ # See the COPYING file in the top-level directory. import os -import unittest -import unittest.mock import pytest @@ -13,83 +11,84 @@ from virtinst import StoragePool from virtinst import URI -class TestConn(unittest.TestCase): - """ - VirtinstConnection tests - """ - def test_misc(self): - # Misc API checks - conn = cli.getConnection("test:///default") - conn.invalidate_caps() - assert conn.is_open() is True - assert conn.is_container_only() is False - assert conn.is_openvz() is False - assert not conn.get_uri_hostname() - assert not conn.get_uri_port() - assert not conn.get_uri_username() - assert not conn.get_uri_transport() - assert conn.close() == 0 +############################ +# VirtinstConnection tests # +############################ - # Coverage for a daemon_version check - fakeuri = "__virtinst_test__test:///default,libver=123" - conn = cli.getConnection(fakeuri) - assert conn.daemon_version() == 123 +def test_misc(): + # Misc API checks + conn = cli.getConnection("test:///default") + conn.invalidate_caps() + assert conn.is_open() is True + assert conn.is_container_only() is False + assert conn.is_openvz() is False + assert not conn.get_uri_hostname() + assert not conn.get_uri_port() + assert not conn.get_uri_username() + assert not conn.get_uri_transport() + assert conn.close() == 0 - # Hit a special code path that reflects default libvirt transport - # pylint: disable=protected-access - conn._uriobj = URI("qemu://example.com/system") - assert conn.get_uri_transport() == "tls" + # Coverage for a daemon_version check + fakeuri = "__virtinst_test__test:///default,libver=123" + conn = cli.getConnection(fakeuri) + assert conn.daemon_version() == 123 - # Hit the qemu:///embed case privileged case check - fakeuri = "__virtinst_test__test:///default,fakeuri=qemu:///embed" - conn = cli.getConnection(fakeuri) - assert conn.is_privileged() == (os.getuid() == 0) + # Hit a special code path that reflects default libvirt transport + # pylint: disable=protected-access + conn._uriobj = URI("qemu://example.com/system") + assert conn.get_uri_transport() == "tls" - # Hit fakuuri validation error, for old style opts - with pytest.raises(RuntimeError): - cli.getConnection(fakeuri + ",qemu") + # Hit the qemu:///embed case privileged case check + fakeuri = "__virtinst_test__test:///default,fakeuri=qemu:///embed" + conn = cli.getConnection(fakeuri) + assert conn.is_privileged() == (os.getuid() == 0) - @unittest.mock.patch.dict(os.environ, - {"LIBVIRT_DEFAULT_URI": "test:///default"}) - def test_default_uri(self): - # Handle connecting to None conn - conn = cli.getConnection(None) - assert conn.getURI() == "test:///default" - conn.close() + # Hit fakuuri validation error, for old style opts + with pytest.raises(RuntimeError): + cli.getConnection(fakeuri + ",qemu") - def test_poll(self): - # Add coverage for conn fetch_* handling, and pollhelpers - conn = cli.getConnection("test:///default") - objmap = {} - def build_cb(obj, name): - return obj +def test_default_uri(monkeypatch): + monkeypatch.setitem(os.environ, "LIBVIRT_DEFAULT_URI", "test:///default") - gone, new, master = pollhelpers.fetch_nets(conn, {}, build_cb) - assert len(gone) == 0 - assert len(new) == 1 - assert len(master) == 1 - assert master[0].name() == "default" + # Handle connecting to None conn + conn = cli.getConnection(None) + assert conn.getURI() == "test:///default" + conn.close() - objmap = dict((obj.name(), obj) for obj in master) - gone, new, master = pollhelpers.fetch_nets(conn, objmap, build_cb) - assert len(gone) == 0 - assert len(new) == 0 - assert len(master) == 1 - assert master[0].name() == "default" - # coverage for some special cases in cache_new_pool - def makepool(name, create): - poolxml = StoragePool(conn) - poolxml.type = "dir" - poolxml.name = name - poolxml.target_path = "/tmp/foo/bar/baz/%s" % name - return poolxml.install(create=create) +def test_poll(): + # Add coverage for conn fetch_* handling, and pollhelpers + conn = cli.getConnection("test:///default") + objmap = {} + def build_cb(obj, name): + return obj - poolobj1 = makepool("conntest1", False) - conn.fetch_all_pools() - poolobj2 = makepool("conntest2", True) - conn.fetch_all_vols() - poolobj1.undefine() - poolobj2.destroy() - poolobj2.undefine() + gone, new, master = pollhelpers.fetch_nets(conn, {}, build_cb) + assert len(gone) == 0 + assert len(new) == 1 + assert len(master) == 1 + assert master[0].name() == "default" + + objmap = dict((obj.name(), obj) for obj in master) + gone, new, master = pollhelpers.fetch_nets(conn, objmap, build_cb) + assert len(gone) == 0 + assert len(new) == 0 + assert len(master) == 1 + assert master[0].name() == "default" + + # coverage for some special cases in cache_new_pool + def makepool(name, create): + poolxml = StoragePool(conn) + poolxml.type = "dir" + poolxml.name = name + poolxml.target_path = "/tmp/foo/bar/baz/%s" % name + return poolxml.install(create=create) + + poolobj1 = makepool("conntest1", False) + conn.fetch_all_pools() + poolobj2 = makepool("conntest2", True) + conn.fetch_all_vols() + poolobj1.undefine() + poolobj2.destroy() + poolobj2.undefine() diff --git a/tests/test_disk.py b/tests/test_disk.py index 3b1495ac0..17e72d19a 100644 --- a/tests/test_disk.py +++ b/tests/test_disk.py @@ -5,7 +5,6 @@ import os import tempfile -import unittest import pytest @@ -50,8 +49,8 @@ def test_disk_numtotarget(): assert disk.generate_target(['hda', 'hdd']) == 'hdb' -def test_disk_dir_searchable(): - # Normally the dir searchable test is skipped in the unittest, +def test_disk_dir_searchable(monkeypatch): + # Normally the dir searchable test is skipped in the test suite, # but let's contrive an example that should trigger all the code # to ensure it isn't horribly broken conn = utils.URIs.open_kvm() @@ -87,8 +86,8 @@ def test_disk_dir_searchable(): assert not bool(errdict) # Mock setfacl to definitely fail - with unittest.mock.patch("virtinst.diskbackend.SETFACL", - "getfacl"): + with monkeypatch.context() as m: + m.setattr("virtinst.diskbackend.SETFACL", "getfacl") errdict = virtinst.DeviceDisk.fix_path_search(searchdata) finally: diff --git a/tests/test_inject.py b/tests/test_inject.py index 1fb0db919..db80063d2 100644 --- a/tests/test_inject.py +++ b/tests/test_inject.py @@ -5,7 +5,6 @@ import os import sys -import unittest _alldistros = {} @@ -89,16 +88,8 @@ def _test_distro(distro): os.system(cmd) -_printinitrd = False - - -class InjectTests(unittest.TestCase): - def setUp(self): - global _printinitrd - if _printinitrd: - return - - print(""" +def _print_intro(): + print(""" This is an interactive test suite. @@ -108,18 +99,26 @@ injections, that will cause installs to quickly fail. Look for the failure pattern to confirm that initrd injections are working as expected. """) - prompt() - _printinitrd = True + prompt() + + +def _build_testfunc(dobj, do_setup): + def testfunc(): + if do_setup: + _print_intro() + _test_distro(dobj) + return testfunc def _make_tests(): - def _make_check_cb(_d): - return lambda s: _test_distro(_d) - idx = 0 for dname, dobj in _alldistros.items(): idx += 1 - setattr(InjectTests, "testInitrd%.3d_%s" % - (idx, dname.replace("-", "_")), _make_check_cb(dobj)) + name = "testInitrd%.3d_%s" % (idx, dname.replace("-", "_")) + + do_setup = idx == 1 + testfunc = _build_testfunc(dobj, do_setup) + globals()[name] = testfunc + _make_tests() diff --git a/tests/test_nodedev.py b/tests/test_nodedev.py index 7db199950..406e321f8 100644 --- a/tests/test_nodedev.py +++ b/tests/test_nodedev.py @@ -6,7 +6,6 @@ # See the COPYING file in the top-level directory. import os.path -import unittest import pytest @@ -27,117 +26,137 @@ funky_chars_xml = """ DATADIR = utils.DATADIR + "/nodedev/" -class TestNodeDev(unittest.TestCase): - @property - def conn(self): - return utils.URIs.open_testdriver_cached() - - def _nodeDevFromName(self, devname): - node = self.conn.nodeDeviceLookupByName(devname) - xml = node.XMLDesc(0) - return NodeDevice(self.conn, xml) - - def _testNode2DeviceCompare(self, nodename, devfile, nodedev=None): - devfile = os.path.join(DATADIR, "devxml", devfile) - if not nodedev: - nodedev = self._nodeDevFromName(nodename) - - dev = DeviceHostdev(self.conn) - dev.set_from_nodedev(nodedev) - dev.set_defaults(Guest(self.conn)) - utils.diff_compare(dev.get_xml() + "\n", devfile) - - def testFunkyChars(self): - # Ensure parsing doesn't fail - dev = NodeDevice(self.conn, funky_chars_xml) - assert dev.name == "L3B2616" - assert dev.device_type == "LENOVO" - - def testNetDevice(self): - devname = "net_00_1c_25_10_b1_e4" - dev = self._nodeDevFromName(devname) - assert dev.name == devname - assert dev.parent == "pci_8086_1049" - assert dev.device_type == "net" - assert dev.interface == "eth0" - - def testPCIDevice(self): - nodename = "pci_8086_10fb" - obj = self._nodeDevFromName(nodename) - assert obj.is_pci_sriov() is True - nodename = "pci_8086_2448" - obj = self._nodeDevFromName(nodename) - assert obj.is_pci_bridge() is True +def _nodeDevFromName(conn, devname): + node = conn.nodeDeviceLookupByName(devname) + xml = node.XMLDesc(0) + return NodeDevice(conn, xml) - def testUSBDevDevice(self): - devname = "usb_device_781_5151_2004453082054CA1BEEE" - dev = self._nodeDevFromName(devname) - assert dev.vendor_name == "SanDisk Corp." - assert dev.product_name == "Cruzer Micro 256/512MB Flash Drive" +def _testNode2DeviceCompare(conn, nodename, devfile, nodedev=None): + devfile = os.path.join(DATADIR, "devxml", devfile) + if not nodedev: + nodedev = _nodeDevFromName(conn, nodename) - devname = "usb_device_1d6b_1_0000_00_1a_0" - dev = self._nodeDevFromName(devname) - assert dev.is_usb_linux_root_hub() is True - - def testSCSIDevice(self): - devname = "pci_8086_2829_scsi_host_scsi_device_lun0" - dev = self._nodeDevFromName(devname) - assert dev.host == "0" - assert dev.bus == "0" - assert dev.target == "0" - - def testStorageDevice(self): - devname = "storage_serial_SATA_WDC_WD1600AAJS__WD_WCAP95119685" - dev = self._nodeDevFromName(devname) - assert dev.block == "/dev/sda" - assert dev.drive_type == "disk" - assert dev.media_available is None - - devname = "storage_model_DVDRAM_GSA_U1200N" - dev = self._nodeDevFromName(devname) - assert dev.media_label == "Fedora12_media" - assert dev.media_available == 1 - - def testSCSIBus(self): - devname = "pci_8086_2829_scsi_host_1" - dev = self._nodeDevFromName(devname) - assert dev.host == "2" - - def testDRMDevice(self): - devname = "drm_renderD129" - dev = self._nodeDevFromName(devname) - assert dev.devnodes[0].path == "/dev/dri/renderD129" - assert dev.devnodes[0].node_type == "dev" - assert dev.devnodes[1].path == "/dev/dri/by-path/pci-0000:00:02.0-render" - assert dev.devnodes[1].node_type == "link" - assert dev.is_drm_render() is True - assert dev.get_devnode("frob") + dev = DeviceHostdev(conn) + dev.set_from_nodedev(nodedev) + dev.set_defaults(Guest(conn)) + utils.diff_compare(dev.get_xml() + "\n", devfile) - # NodeDevice 2 Device XML tests - def testNodeDev2USB1(self): - nodename = "usb_device_781_5151_2004453082054CA1BEEE" - devfile = "usbdev1.xml" - self._testNode2DeviceCompare(nodename, devfile) +def testFunkyChars(): + # Ensure parsing doesn't fail + conn = utils.URIs.open_testdriver_cached() + dev = NodeDevice(conn, funky_chars_xml) + assert dev.name == "L3B2616" + assert dev.device_type == "LENOVO" - def testNodeDev2USB2(self): - nodename = "usb_device_1d6b_2_0000_00_1d_7" - devfile = "usbdev2.xml" - nodedev = self._nodeDevFromName(nodename) - self._testNode2DeviceCompare(nodename, devfile, nodedev=nodedev) +def testNetDevice(): + conn = utils.URIs.open_testdriver_cached() + devname = "net_00_1c_25_10_b1_e4" + dev = _nodeDevFromName(conn, devname) + assert dev.name == devname + assert dev.parent == "pci_8086_1049" + assert dev.device_type == "net" + assert dev.interface == "eth0" - def testNodeDev2PCI(self): - nodename = "pci_1180_592" - devfile = "pcidev.xml" - self._testNode2DeviceCompare(nodename, devfile) - def testNodeDevFail(self): - nodename = "usb_device_1d6b_1_0000_00_1d_1_if0" - devfile = "" +def testPCIDevice(): + conn = utils.URIs.open_testdriver_cached() + nodename = "pci_8086_10fb" + obj = _nodeDevFromName(conn, nodename) + assert obj.is_pci_sriov() is True + nodename = "pci_8086_2448" + obj = _nodeDevFromName(conn, nodename) + assert obj.is_pci_bridge() is True - # This should exist, since usbbus is not a valid device to - # pass to a guest. - pytest.raises(ValueError, - self._testNode2DeviceCompare, nodename, devfile) + + +def testUSBDevDevice(): + conn = utils.URIs.open_testdriver_cached() + devname = "usb_device_781_5151_2004453082054CA1BEEE" + dev = _nodeDevFromName(conn, devname) + assert dev.vendor_name == "SanDisk Corp." + assert dev.product_name == "Cruzer Micro 256/512MB Flash Drive" + + devname = "usb_device_1d6b_1_0000_00_1a_0" + dev = _nodeDevFromName(conn, devname) + assert dev.is_usb_linux_root_hub() is True + + +def testSCSIDevice(): + conn = utils.URIs.open_testdriver_cached() + devname = "pci_8086_2829_scsi_host_scsi_device_lun0" + dev = _nodeDevFromName(conn, devname) + assert dev.host == "0" + assert dev.bus == "0" + assert dev.target == "0" + + +def testStorageDevice(): + conn = utils.URIs.open_testdriver_cached() + devname = "storage_serial_SATA_WDC_WD1600AAJS__WD_WCAP95119685" + dev = _nodeDevFromName(conn, devname) + assert dev.block == "/dev/sda" + assert dev.drive_type == "disk" + assert dev.media_available is None + + devname = "storage_model_DVDRAM_GSA_U1200N" + dev = _nodeDevFromName(conn, devname) + assert dev.media_label == "Fedora12_media" + assert dev.media_available == 1 + + +def testSCSIBus(): + conn = utils.URIs.open_testdriver_cached() + devname = "pci_8086_2829_scsi_host_1" + dev = _nodeDevFromName(conn, devname) + assert dev.host == "2" + + +def testDRMDevice(): + conn = utils.URIs.open_testdriver_cached() + devname = "drm_renderD129" + dev = _nodeDevFromName(conn, devname) + assert dev.devnodes[0].path == "/dev/dri/renderD129" + assert dev.devnodes[0].node_type == "dev" + assert dev.devnodes[1].path == "/dev/dri/by-path/pci-0000:00:02.0-render" + assert dev.devnodes[1].node_type == "link" + assert dev.is_drm_render() is True + assert dev.get_devnode("frob") + + +# NodeDevice 2 Device XML tests + +def testNodeDev2USB1(): + conn = utils.URIs.open_testdriver_cached() + nodename = "usb_device_781_5151_2004453082054CA1BEEE" + devfile = "usbdev1.xml" + _testNode2DeviceCompare(conn, nodename, devfile) + + +def testNodeDev2USB2(): + conn = utils.URIs.open_testdriver_cached() + nodename = "usb_device_1d6b_2_0000_00_1d_7" + devfile = "usbdev2.xml" + nodedev = _nodeDevFromName(conn, nodename) + + _testNode2DeviceCompare(conn, nodename, devfile, nodedev=nodedev) + + +def testNodeDev2PCI(): + conn = utils.URIs.open_testdriver_cached() + nodename = "pci_1180_592" + devfile = "pcidev.xml" + _testNode2DeviceCompare(conn, nodename, devfile) + + +def testNodeDevFail(): + conn = utils.URIs.open_testdriver_cached() + nodename = "usb_device_1d6b_1_0000_00_1d_1_if0" + devfile = "" + + # This should exist, since usbbus is not a valid device to + # pass to a guest. + with pytest.raises(ValueError): + _testNode2DeviceCompare(conn, nodename, devfile) diff --git a/tests/test_osdict.py b/tests/test_osdict.py index f559c061a..aae913e77 100644 --- a/tests/test_osdict.py +++ b/tests/test_osdict.py @@ -3,7 +3,7 @@ # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. -import unittest +import pytest from virtinst import Guest from virtinst import OSDB @@ -13,101 +13,109 @@ from virtinst.install import urldetect from tests import utils -class TestOSDB(unittest.TestCase): - """ - Test osdict/OSDB - """ - def test_osdict_aliases_ro(self): - aliases = getattr(OSDB, "_aliases") +################## +# Test osdict.py # +################## - if len(aliases) != 42: - raise AssertionError(_("OSDB._aliases changed size. It " - "should never be extended, since it is only for back " - "compat with pre-libosinfo osdict.py")) - def test_list_os(self): - OSDB.list_os() +def test_osdict_aliases_ro(): + aliases = getattr(OSDB, "_aliases") - def test_recommended_resources(self): - conn = utils.URIs.open_testdefault_cached() - guest = Guest(conn) - res = OSDB.lookup_os("generic").get_recommended_resources() - assert res.get_recommended_ram(guest.os.arch) is None + if len(aliases) != 42: + raise AssertionError(_("OSDB._aliases changed size. It " + "should never be extended, since it is only for back " + "compat with pre-libosinfo osdict.py")) - res = OSDB.lookup_os("fedora21").get_recommended_resources() - assert res.get_recommended_ncpus(guest.os.arch) == 2 - def test_urldetct_matching_distros(self): - # pylint: disable=protected-access - allstores = urldetect._build_distro_list(OSDB.lookup_os("generic")) +def test_list_os(): + OSDB.list_os() - seen_distro = [] - for store in allstores: - for distro in store.matching_distros: - if distro in seen_distro: - raise xmlutil.DevError( - "store=%s has conflicting matching_distro=%s " % - (store.PRETTY_NAME, distro)) - seen_distro.append(distro) - def test_tree_url(self): - f26 = OSDB.lookup_os("fedora26") - f29 = OSDB.lookup_os("fedora29") - winxp = OSDB.lookup_os("winxp") +def test_recommended_resources(): + conn = utils.URIs.open_testdefault_cached() + guest = Guest(conn) + res = OSDB.lookup_os("generic").get_recommended_resources() + assert res.get_recommended_ram(guest.os.arch) is None - # Valid tree URL - assert "fedoraproject.org" in f26.get_location("x86_64") + res = OSDB.lookup_os("fedora21").get_recommended_resources() + assert res.get_recommended_ncpus(guest.os.arch) == 2 - # Most generic tree URL - assert "Everything" in f29.get_location("x86_64") - # Specific tree - assert "Server" in f29.get_location("x86_64", "jeos") - assert "Workstation" in f29.get_location("x86_64", "desktop") +def test_urldetct_matching_distros(): + # pylint: disable=protected-access + allstores = urldetect._build_distro_list(OSDB.lookup_os("generic")) - # Has tree URLs, but none for arch - try: - f26.get_location("ia64") - raise AssertionError("Expected failure") - except RuntimeError as e: - assert "ia64" in str(e) + seen_distro = [] + for store in allstores: + for distro in store.matching_distros: + if distro in seen_distro: + raise xmlutil.DevError( + "store=%s has conflicting matching_distro=%s " % + (store.PRETTY_NAME, distro)) + seen_distro.append(distro) - # Has no tree URLs - try: - winxp.get_location("x86_64") - raise AssertionError("Expected failure") - except RuntimeError as e: - assert str(e).endswith("URL location") - # Trigger an error path for code coverage - assert OSDB.guess_os_by_tree(utils.TESTDIR) is None +def test_tree_url(): + f26 = OSDB.lookup_os("fedora26") + f29 = OSDB.lookup_os("fedora29") + winxp = OSDB.lookup_os("winxp") - def test_kernel_url(self): - def _c(name): - osobj = OSDB.lookup_os(name) - if not osobj: - self.skipTest("osinfo-db doesn't have '%s'" % name) - return osobj.get_kernel_url_arg() + # Valid tree URL + assert "fedoraproject.org" in f26.get_location("x86_64") - assert _c("rhel7-unknown") == "inst.repo" - assert _c("rhel6-unknown") == "method" - assert _c("fedora-rawhide") == "inst.repo" - assert _c("fedora20") == "inst.repo" - assert _c("generic") is None - assert _c("win10") is None - assert _c("sle15") == "install" + # Most generic tree URL + assert "Everything" in f29.get_location("x86_64") - def test_related_to(self): - # pylint: disable=protected-access - win10 = OSDB.lookup_os("win10") - assert win10._is_related_to("winxp") is True - assert win10._is_related_to("win10") is True - assert win10._is_related_to("fedora26") is False + # Specific tree + assert "Server" in f29.get_location("x86_64", "jeos") + assert "Workstation" in f29.get_location("x86_64", "desktop") - def test_drivers(self): - win7 = OSDB.lookup_os("win7") - generic = OSDB.lookup_os("generic") - assert generic.supports_unattended_drivers("x86_64") is False - assert win7.supports_unattended_drivers("x86_64") is True - assert win7.supports_unattended_drivers("fakearch") is False - assert win7.get_pre_installable_drivers_location("x86_64") + # Has tree URLs, but none for arch + try: + f26.get_location("ia64") + raise AssertionError("Expected failure") + except RuntimeError as e: + assert "ia64" in str(e) + + # Has no tree URLs + try: + winxp.get_location("x86_64") + raise AssertionError("Expected failure") + except RuntimeError as e: + assert str(e).endswith("URL location") + + # Trigger an error path for code coverage + assert OSDB.guess_os_by_tree(utils.TESTDIR) is None + + +def test_kernel_url(): + def _c(name): + osobj = OSDB.lookup_os(name) + if not osobj: + pytest.skip("osinfo-db doesn't have '%s'" % name) + return osobj.get_kernel_url_arg() + + assert _c("rhel7-unknown") == "inst.repo" + assert _c("rhel6-unknown") == "method" + assert _c("fedora-rawhide") == "inst.repo" + assert _c("fedora20") == "inst.repo" + assert _c("generic") is None + assert _c("win10") is None + assert _c("sle15") == "install" + + +def test_related_to(): + # pylint: disable=protected-access + win10 = OSDB.lookup_os("win10") + assert win10._is_related_to("winxp") is True + assert win10._is_related_to("win10") is True + assert win10._is_related_to("fedora26") is False + + +def test_drivers(): + win7 = OSDB.lookup_os("win7") + generic = OSDB.lookup_os("generic") + assert generic.supports_unattended_drivers("x86_64") is False + assert win7.supports_unattended_drivers("x86_64") is True + assert win7.supports_unattended_drivers("fakearch") is False + assert win7.get_pre_installable_drivers_location("x86_64") diff --git a/tests/test_storage.py b/tests/test_storage.py index 1bd3c0ed4..4402a89a9 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -4,7 +4,6 @@ # See the COPYING file in the top-level directory. import os -import unittest from virtinst import StoragePool, StorageVolume from virtinst import log @@ -12,7 +11,6 @@ from virtinst import log from tests import utils # pylint: disable=protected-access -# Access to protected member, needed to unittest stuff BASEPATH = os.path.join(utils.DATADIR, "storage") @@ -97,126 +95,148 @@ def createVol(conn, poolobj, volname=None, input_vol=None, clone_vol=None): return vol_inst.install(meter=False) -class TestStorage(unittest.TestCase): - @property - def conn(self): - return utils.URIs.open_testdefault_cached() +############## +# Test cases # +############## - def testDirPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_DIR, "pool-dir") - invol = createVol(self.conn, poolobj) - createVol(self.conn, poolobj, - volname=invol.name() + "input", input_vol=invol) - createVol(self.conn, poolobj, - volname=invol.name() + "clone", clone_vol=invol) - removePool(poolobj) +def testDirPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_DIR, "pool-dir") + invol = createVol(conn, poolobj) + createVol(conn, poolobj, + volname=invol.name() + "input", input_vol=invol) + createVol(conn, poolobj, + volname=invol.name() + "clone", clone_vol=invol) + removePool(poolobj) - def testFSPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_FS, "pool-fs") - invol = createVol(self.conn, poolobj) - createVol(self.conn, poolobj, - volname=invol.name() + "input", input_vol=invol) - createVol(self.conn, poolobj, - volname=invol.name() + "clone", clone_vol=invol) - removePool(poolobj) - def testNetFSPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_NETFS, "pool-netfs") - invol = createVol(self.conn, poolobj) - createVol(self.conn, poolobj, - volname=invol.name() + "input", input_vol=invol) - createVol(self.conn, poolobj, - volname=invol.name() + "clone", clone_vol=invol) - removePool(poolobj) +def testFSPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_FS, "pool-fs") + invol = createVol(conn, poolobj) + createVol(conn, poolobj, + volname=invol.name() + "input", input_vol=invol) + createVol(conn, poolobj, + volname=invol.name() + "clone", clone_vol=invol) + removePool(poolobj) - def testLVPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_LOGICAL, - "pool-logical", - source_name="pool-logical") - invol = createVol(self.conn, poolobj) - createVol(self.conn, poolobj, - volname=invol.name() + "input", input_vol=invol) - createVol(self.conn, - poolobj, volname=invol.name() + "clone", clone_vol=invol) - removePool(poolobj) - # Test parsing source name for target path - poolobj = createPool(self.conn, StoragePool.TYPE_LOGICAL, - "pool-logical-target-srcname", - target_path="/dev/vgfoobar") - removePool(poolobj) +def testNetFSPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_NETFS, "pool-netfs") + invol = createVol(conn, poolobj) + createVol(conn, poolobj, + volname=invol.name() + "input", input_vol=invol) + createVol(conn, poolobj, + volname=invol.name() + "clone", clone_vol=invol) + removePool(poolobj) - # Test with source name - poolobj = createPool(self.conn, - StoragePool.TYPE_LOGICAL, "pool-logical-srcname", - source_name="vgname") - removePool(poolobj) - def testDiskPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_DISK, - "pool-disk", fmt="auto", - target_path="/some/target/path") - invol = createVol(self.conn, poolobj) - createVol(self.conn, poolobj, - volname=invol.name() + "input", input_vol=invol) - createVol(self.conn, poolobj, - volname=invol.name() + "clone", clone_vol=invol) - removePool(poolobj) +def testLVPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_LOGICAL, + "pool-logical", + source_name="pool-logical") + invol = createVol(conn, poolobj) + createVol(conn, poolobj, + volname=invol.name() + "input", input_vol=invol) + createVol(conn, + poolobj, volname=invol.name() + "clone", clone_vol=invol) + removePool(poolobj) - def testISCSIPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_ISCSI, "pool-iscsi", - iqn="foo.bar.baz.iqn") - removePool(poolobj) + # Test parsing source name for target path + poolobj = createPool(conn, StoragePool.TYPE_LOGICAL, + "pool-logical-target-srcname", + target_path="/dev/vgfoobar") + removePool(poolobj) - def testSCSIPool(self): - poolobj = createPool(self.conn, StoragePool.TYPE_SCSI, "pool-scsi") - removePool(poolobj) + # Test with source name + poolobj = createPool(conn, + StoragePool.TYPE_LOGICAL, "pool-logical-srcname", + source_name="vgname") + removePool(poolobj) - def testMpathPool(self): - poolobj = createPool(self.conn, StoragePool.TYPE_MPATH, "pool-mpath") - removePool(poolobj) - def testGlusterPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_GLUSTER, "pool-gluster") - removePool(poolobj) +def testDiskPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_DISK, + "pool-disk", fmt="auto", + target_path="/some/target/path") + invol = createVol(conn, poolobj) + createVol(conn, poolobj, + volname=invol.name() + "input", input_vol=invol) + createVol(conn, poolobj, + volname=invol.name() + "clone", clone_vol=invol) + removePool(poolobj) - def testRBDPool(self): - poolobj = createPool(self.conn, - StoragePool.TYPE_RBD, "pool-rbd") - removePool(poolobj) - def testMisc(self): - # Misc coverage testing - vol = StorageVolume(self.conn) - assert vol.is_size_conflict()[0] is False +def testISCSIPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_ISCSI, "pool-iscsi", + iqn="foo.bar.baz.iqn") + removePool(poolobj) - fullconn = utils.URIs.open_testdriver_cached() - glusterpool = fullconn.storagePoolLookupByName("gluster-pool") - diskpool = fullconn.storagePoolLookupByName("disk-pool") - glustervol = StorageVolume(fullconn) - glustervol.pool = glusterpool - assert glustervol.supports_format() is True +def testSCSIPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, StoragePool.TYPE_SCSI, "pool-scsi") + removePool(poolobj) - diskvol = StorageVolume(fullconn) - diskvol.pool = diskpool - assert diskvol.supports_format() is False - glusterpool.destroy() - StoragePool.ensure_pool_is_running(glusterpool) +def testMpathPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, StoragePool.TYPE_MPATH, "pool-mpath") + removePool(poolobj) - # Check pool collision detection - name = StoragePool.find_free_name(fullconn, "gluster-pool") - assert name == "gluster-pool-1" - def testEnumerateLogical(self): - lst = StoragePool.pool_list_from_sources(self.conn, - StoragePool.TYPE_LOGICAL) - assert lst == ["testvg1", "testvg2"] +def testGlusterPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_GLUSTER, "pool-gluster") + removePool(poolobj) + + +def testRBDPool(): + conn = utils.URIs.open_testdefault_cached() + poolobj = createPool(conn, + StoragePool.TYPE_RBD, "pool-rbd") + removePool(poolobj) + + +def testMisc(): + conn = utils.URIs.open_testdefault_cached() + # Misc coverage testing + vol = StorageVolume(conn) + assert vol.is_size_conflict()[0] is False + + fullconn = utils.URIs.open_testdriver_cached() + glusterpool = fullconn.storagePoolLookupByName("gluster-pool") + diskpool = fullconn.storagePoolLookupByName("disk-pool") + + glustervol = StorageVolume(fullconn) + glustervol.pool = glusterpool + assert glustervol.supports_format() is True + + diskvol = StorageVolume(fullconn) + diskvol.pool = diskpool + assert diskvol.supports_format() is False + + glusterpool.destroy() + StoragePool.ensure_pool_is_running(glusterpool) + + # Check pool collision detection + name = StoragePool.find_free_name(fullconn, "gluster-pool") + assert name == "gluster-pool-1" + + +def testEnumerateLogical(): + conn = utils.URIs.open_testdefault_cached() + lst = StoragePool.pool_list_from_sources(conn, + StoragePool.TYPE_LOGICAL) + assert lst == ["testvg1", "testvg2"] diff --git a/tests/test_uriparse.py b/tests/test_uriparse.py index d42c70671..bcca3c0da 100644 --- a/tests/test_uriparse.py +++ b/tests/test_uriparse.py @@ -3,8 +3,6 @@ # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. -import unittest - import pytest from virtinst import URI @@ -12,80 +10,82 @@ from virtinst import URI import tests -class TestURI(unittest.TestCase): - """ - Test virtinst URI module - """ - def _compare(self, uri, scheme='', - transport='', port='', username='', path='', - hostname='', query='', fragment='', - is_ipv6=False, host_is_ipv4_string=False): - uriinfo = URI(uri) - assert scheme == uriinfo.scheme - assert transport == uriinfo.transport - assert port == uriinfo.port - assert username == uriinfo.username - assert path == uriinfo.path - assert hostname == uriinfo.hostname - assert query == uriinfo.query - assert fragment == uriinfo.fragment - assert is_ipv6 == uriinfo.is_ipv6 - assert host_is_ipv4_string == uriinfo.host_is_ipv4_string +############################ +# Test virtinst URI module # +############################ - def testURIs(self): - self._compare("lxc://", scheme="lxc") - self._compare("qemu:///session", scheme="qemu", path="/session") - self._compare("http://foobar.com:5901/my/example.path#my-frag", - scheme="http", hostname="foobar.com", - port="5901", path='/my/example.path', - fragment="my-frag") - self._compare( - "gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img", - scheme="gluster", transport="tcp", - hostname="1:2:3:4:5:6:7:8", port="24007", - path="/testvol/dir/a.img", is_ipv6=True) - self._compare( - "qemu+ssh://root@192.168.2.3/system?no_verify=1", - scheme="qemu", transport="ssh", username="root", - hostname="192.168.2.3", path="/system", - query="no_verify=1", host_is_ipv4_string=True) - self._compare( - "qemu+ssh://foo%5Cbar@hostname/system", - scheme="qemu", path="/system", transport="ssh", - hostname="hostname", username="foo\\bar") - self._compare( - "qemu+ssh://user%40domain.org@hostname/system", - scheme="qemu", path="/system", transport="ssh", - hostname="hostname", username="user@domain.org") +def _compare(uri, scheme='', + transport='', port='', username='', path='', + hostname='', query='', fragment='', + is_ipv6=False, host_is_ipv4_string=False): + uriinfo = URI(uri) + assert scheme == uriinfo.scheme + assert transport == uriinfo.transport + assert port == uriinfo.port + assert username == uriinfo.username + assert path == uriinfo.path + assert hostname == uriinfo.hostname + assert query == uriinfo.query + assert fragment == uriinfo.fragment + assert is_ipv6 == uriinfo.is_ipv6 + assert host_is_ipv4_string == uriinfo.host_is_ipv4_string - def test_magicuri_connver(self): - uri = tests.utils.URIs.test_default + ",connver=1,libver=2" - conn = tests.utils.URIs.openconn(uri) - assert conn.conn_version() == 1 - assert conn.local_libvirt_version() == 2 - conn = tests.utils.URIs.openconn("test:///default") - # Add some support tests with it - with pytest.raises(ValueError, - match=".*type .*"): - conn.support.domain_xml_inactive("foo") +def testURIs(): + _compare("lxc://", scheme="lxc") + _compare("qemu:///session", scheme="qemu", path="/session") + _compare("http://foobar.com:5901/my/example.path#my-frag", + scheme="http", hostname="foobar.com", + port="5901", path='/my/example.path', + fragment="my-frag") + _compare( + "gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img", + scheme="gluster", transport="tcp", + hostname="1:2:3:4:5:6:7:8", port="24007", + path="/testvol/dir/a.img", is_ipv6=True) + _compare( + "qemu+ssh://root@192.168.2.3/system?no_verify=1", + scheme="qemu", transport="ssh", username="root", + hostname="192.168.2.3", path="/system", + query="no_verify=1", host_is_ipv4_string=True) + _compare( + "qemu+ssh://foo%5Cbar@hostname/system", + scheme="qemu", path="/system", transport="ssh", + hostname="hostname", username="foo\\bar") + _compare( + "qemu+ssh://user%40domain.org@hostname/system", + scheme="qemu", path="/system", transport="ssh", + hostname="hostname", username="user@domain.org") - # pylint: disable=protected-access - from virtinst import support - def _run(**kwargs): - check = support._SupportCheck(**kwargs) - return check(conn) - assert _run(function="virNope.Foo") is False - assert _run(function="virDomain.IDontExist") is False - assert _run(function="virDomain.isActive") is True - assert _run(function="virConnect.getVersion", - flag="SOME_FLAG_DOESNT_EXIST") is False - assert _run(version="1000.0.0") is False - assert _run(hv_version={"test": "1000.0.0"}) is False - assert _run(hv_libvirt_version={"test": "1000.0.0"}) is False - assert _run(hv_libvirt_version={"qemu": "1.2.3"}) is False - assert _run(hv_libvirt_version={"qemu": "1.2.3", "all": 0}) is True +def test_magicuri_connver(): + uri = tests.utils.URIs.test_default + ",connver=1,libver=2" + conn = tests.utils.URIs.openconn(uri) + assert conn.conn_version() == 1 + assert conn.local_libvirt_version() == 2 - dom = conn.lookupByName("test") - assert conn.support.domain_xml_inactive(dom) is True + conn = tests.utils.URIs.openconn("test:///default") + # Add some support tests with it + with pytest.raises(ValueError, + match=".*type .*"): + conn.support.domain_xml_inactive("foo") + + # pylint: disable=protected-access + from virtinst import support + def _run(**kwargs): + check = support._SupportCheck(**kwargs) + return check(conn) + + assert _run(function="virNope.Foo") is False + assert _run(function="virDomain.IDontExist") is False + assert _run(function="virDomain.isActive") is True + assert _run(function="virConnect.getVersion", + flag="SOME_FLAG_DOESNT_EXIST") is False + assert _run(version="1000.0.0") is False + assert _run(hv_version={"test": "1000.0.0"}) is False + assert _run(hv_libvirt_version={"test": "1000.0.0"}) is False + assert _run(hv_libvirt_version={"qemu": "1.2.3"}) is False + assert _run(hv_libvirt_version={"qemu": "1.2.3", "all": 0}) is True + + dom = conn.lookupByName("test") + assert conn.support.domain_xml_inactive(dom) is True diff --git a/tests/test_urls.py b/tests/test_urls.py index 0ea7e2e98..131aff16c 100644 --- a/tests/test_urls.py +++ b/tests/test_urls.py @@ -6,7 +6,6 @@ import os import re import sys -import unittest import pytest @@ -126,7 +125,7 @@ def _testGuest(testdata, guest): msg = _skipmsg(testdata) if msg: - raise unittest.SkipTest(msg) + raise pytest.skip(msg) installer = Installer(guest.conn, location=url) try: @@ -190,24 +189,22 @@ def _testURL(testdata): _testGuest(testdata, xenguest) -# Register tests to be picked up by unittest -class URLTests(unittest.TestCase): - def test001BadURL(self): - badurl = "http://aksdkakskdfa-idontexist.com/foo/tree" +def test001BadURL(): + badurl = "http://aksdkakskdfa-idontexist.com/foo/tree" - with pytest.raises(ValueError, match=".*maybe you mistyped.*"): - installer = Installer(hvmguest.conn, location=badurl) - installer.detect_distro(hvmguest) + with pytest.raises(ValueError, match=".*maybe you mistyped.*"): + installer = Installer(hvmguest.conn, location=badurl) + installer.detect_distro(hvmguest) - # Non-existent cdrom fails - with pytest.raises(ValueError, match=".*non-existent path.*"): - installer = Installer(hvmguest.conn, cdrom="/not/exist/foobar") - assert installer.detect_distro(hvmguest) is None - - # Ensure existing but non-distro file doesn't error - installer = Installer(hvmguest.conn, cdrom="/dev/null") + # Non-existent cdrom fails + with pytest.raises(ValueError, match=".*non-existent path.*"): + installer = Installer(hvmguest.conn, cdrom="/not/exist/foobar") assert installer.detect_distro(hvmguest) is None + # Ensure existing but non-distro file doesn't error + installer = Installer(hvmguest.conn, cdrom="/dev/null") + assert installer.detect_distro(hvmguest) is None + def _make_tests(): import configparser @@ -238,7 +235,7 @@ def _make_tests(): for key, testdata in sorted(urls.items()): def _make_wrapper(d): return lambda _self: _testURL(d) - methodname = "testURL%s" % key.replace("-", "_") - setattr(URLTests, methodname, _make_wrapper(testdata)) + methodname = "test_URL%s" % key.replace("-", "_") + globals()[methodname] = _make_wrapper(testdata) _make_tests() diff --git a/tests/test_xmlparse.py b/tests/test_xmlparse.py index aabc50f58..0b2303947 100644 --- a/tests/test_xmlparse.py +++ b/tests/test_xmlparse.py @@ -3,10 +3,6 @@ # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. -import glob -import traceback -import unittest - import pytest import virtinst @@ -17,1125 +13,1124 @@ from tests import utils DATADIR = utils.DATADIR + "/xmlparse/" -def sanitize_file_xml(xml): +#################### +# Helper functions # +#################### + +def _sanitize_file_xml(xml): # s/"/'/g from generated XML, matches what libxml dumps out # This won't work all the time, but should be good enough for testing return xml.replace("'", "\"") -class XMLParseTest(unittest.TestCase): - _kvmconn = None +def _alter_compare(conn, actualXML, outfile): + utils.diff_compare(actualXML, outfile) + utils.test_create(conn, actualXML) - @property - def conn(self): - return utils.URIs.open_testdefault_cached() - @property - def kvmconn(self): - if not self._kvmconn: - self._kvmconn = utils.URIs.open_kvm() - return self._kvmconn +def _set_and_check(obj, param, initval, *args): + """ + Check expected initial value obj.param == initval, then + set newval, and make sure it is returned properly + """ + curval = virtinst.xmlutil.get_prop_path(obj, param) + assert initval == curval - def _roundtrip_compare(self, filename): - expectXML = sanitize_file_xml(open(filename).read()) - guest = virtinst.Guest(self.conn, parsexml=expectXML) - actualXML = guest.get_xml() - utils.diff_compare(actualXML, expect_out=expectXML) - - def _alter_compare(self, actualXML, outfile): - utils.diff_compare(actualXML, outfile) - utils.test_create(self.conn, actualXML) - - def testRoundTrip(self): - """ - Make sure parsing doesn't output different XML - """ - exclude = ["misc-xml-escaping.xml"] - failed = False - error = "" - for f in glob.glob("tests/xmlconfig-xml/*.xml"): - if [e for e in exclude if f.endswith(e)]: - continue - - try: - self._roundtrip_compare(f) - except Exception: - failed = True - error += "%s:\n%s\n" % (f, "".join(traceback.format_exc())) - - if failed: - raise AssertionError("Roundtrip parse tests failed:\n%s" % error) - - def _set_and_check(self, obj, param, initval, *args): - """ - Check expected initial value obj.param == initval, then - set newval, and make sure it is returned properly - """ + for newval in args: + virtinst.xmlutil.set_prop_path(obj, param, newval) curval = virtinst.xmlutil.get_prop_path(obj, param) - assert initval == curval - - for newval in args: - virtinst.xmlutil.set_prop_path(obj, param, newval) - curval = virtinst.xmlutil.get_prop_path(obj, param) - assert newval == curval - - def _make_checker(self, obj): - def check(name, initval, *args): - return self._set_and_check(obj, name, initval, *args) - return check - - def _gen_outfile_path(self, basename): - """ - Returns relative path to the file containing the expected XML - output - - """ - return DATADIR + "{!s}-out.xml".format(basename) - - def _get_test_content(self, basename, kvm=False): - infile = DATADIR + "%s-in.xml" % basename - outfile = self._gen_outfile_path(basename) - guest = virtinst.Guest(kvm and self.kvmconn or self.conn, - parsexml=open(infile).read()) - return guest, outfile - - def testAlterGuest(self): - """ - Test changing Guest() parameters after parsing - """ - guest, outfile = self._get_test_content("change-guest") - - check = self._make_checker(guest) - - # Check specific vcpu_current behaviro - check("vcpus", 5, 10) - assert guest.vcpu_current is None - check("vcpu_current", None, 15) - guest.vcpus = 12 - assert guest.vcpu_current == 12 - guest.vcpu_current = 10 - - check("name", "TestGuest", "change_name") - check("id", None, 1234) - check("description", None, "Hey desc changed&") - check("title", None, "Hey title changed!") - check("vcpu_cpuset", "1-3", "1-8,^6", "1-5,15") - check("memory", 409600, 512000) - check("currentMemory", 204800, 1024000) - check("memory", 1024000, 2048000) - check("uuid", "12345678-1234-1234-1234-123456789012", - "11111111-2222-3333-4444-555555555555") - check("emulator", "/usr/lib/xen/bin/qemu-dm", "/usr/binnnn/fooemu") - check("type", "kvm", "test") - check("bootloader", None, "pygrub") - check("on_poweroff", "destroy", "restart") - check("on_reboot", "restart", "destroy") - check("on_crash", "destroy", "restart") - check("on_lockfailure", "poweroff", "restart") - - check = self._make_checker(guest._metadata.libosinfo) # pylint: disable=protected-access - check("os_id", "http://fedoraproject.org/fedora/17") - guest.set_os_name("fedora10") - check("os_id", "http://fedoraproject.org/fedora/10") - assert guest.osinfo.name == "fedora10" - guest.set_os_name("generic") - check("os_id", None, "frib") - assert guest.osinfo.name == "generic" - - check = self._make_checker(guest.clock) - check("offset", "utc", "localtime") - guest.clock.remove_child(guest.clock.timers[0]) - check = self._make_checker(guest.clock.timers[0]) - check("name", "pit", "rtc") - check("tickpolicy", "delay", "merge") - timer = guest.clock.timers.add_new() - check = self._make_checker(timer) - check("name", None, "hpet") - check("present", None, False) - - check = self._make_checker(guest.pm) - check("suspend_to_mem", False, True) - check("suspend_to_disk", None, False) - - check = self._make_checker(guest.os) - check("os_type", "hvm", "xen") - check("arch", "i686", None) - check("machine", "foobar", "pc-0.11") - check("loader", None, "/foo/loader") - check("init", None, "/sbin/init") - check("bootorder", ["hd"], ["fd"]) - check("enable_bootmenu", None, False) - check("useserial", None, True) - check("kernel", None) - check("initrd", None) - check("kernel_args", None) - - guest.os.set_initargs_string("foo 'bar baz' frib") - assert [i.val for i in guest.os.initargs] == ["foo", "bar baz", "frib"] - - check = self._make_checker(guest.features) - check("acpi", True, False) - check("apic", True, True) - check("eoi", None, True) - check("pae", False, False) - check("viridian", False, True) - check("hap", False, False) - check("privnet", False, False) - check("hyperv_relaxed", None, True) - check("hyperv_vapic", False, None) - check("hyperv_spinlocks", True, True) - check("hyperv_spinlocks_retries", 12287, 54321) - check("vmport", False, True) - check("vmcoreinfo", None, True) - check("kvm_hidden", None, True) - check("pvspinlock", None, True) - check("gic_version", None, False) - - check = self._make_checker(guest.cpu) - check("match", "exact", "strict") - guest.cpu.set_model(guest, "qemu64") - check("model", "qemu64") - check("vendor", "Intel", "qemuvendor") - check("topology.threads", 2, 1) - check("topology.cores", 5, 3) - guest.cpu.topology.sockets = 4.0 - check("topology.sockets", 4) - - check = self._make_checker(guest.cpu.features[0]) - check("name", "x2apic") - check("policy", "force", "disable") - rmfeat = guest.cpu.features[3] - guest.cpu.remove_child(rmfeat) - assert rmfeat.get_xml() == """\n""" - guest.cpu.add_feature("addfeature") - - check = self._make_checker(guest.numatune) - check("memory_mode", "interleave", "strict", None) - check("memory_nodeset", "1-5,^3,7", "2,4,6") - - check = self._make_checker(guest.memtune) - check("hard_limit", None, 1024, 2048) - check("soft_limit", None, 100, 200) - check("swap_hard_limit", None, 300, 400) - check("min_guarantee", None, 400, 500) - - check = self._make_checker(guest.blkiotune) - check("weight", None, 100, 200) - check = self._make_checker(guest.blkiotune.devices.add_new()) - check("weight", None, 300) - check("path", None, "/home/1.img") - - check = self._make_checker(guest.idmap) - check("uid_start", None, 0) - check("uid_target", None, 1000) - check("uid_count", None, 10) - check("gid_start", None, 0) - check("gid_target", None, 1000) - check("gid_count", None, 10) - - check = self._make_checker(guest.resource) - check("partition", None, "/virtualmachines/production") - - check = self._make_checker(guest.devices.memballoon[0]) - check("model", "virtio", "none") - - check = self._make_checker(guest.memoryBacking) - check("hugepages", False, True) - check("nosharepages", False, True) - check("locked", False, True) - - page = guest.memoryBacking.pages.add_new() - check = self._make_checker(page) - check("size", None, 1) - check("unit", None, "G") - - assert guest.is_full_os_container() is False - self._alter_compare(guest.get_xml(), outfile) - - def testAlterCpuMode(self): - xml = open(DATADIR + "change-cpumode-in.xml").read() - outfile = DATADIR + "change-cpumode-out.xml" - conn = utils.URIs.openconn(utils.URIs.kvm_q35) - guest = virtinst.Guest(conn, xml) - check = self._make_checker(guest.cpu) - - guest.cpu.model = "foo" - check("mode", "host-passthrough") - guest.cpu.check_security_features(guest) - check("secure", False) - guest.cpu.set_special_mode(guest, "host-model") - check("mode", "host-model") - guest.cpu.check_security_features(guest) - check("secure", False) - guest.cpu.set_model(guest, "qemu64") - check("model", "qemu64") - guest.cpu.check_security_features(guest) - check("secure", False) - - # Test actually filling in security values, and removing them - guest.cpu.secure = True - guest.cpu.set_model(guest, "Skylake-Client-IBRS") - guest.cpu.check_security_features(guest) - check("secure", True) - guest.cpu.set_model(guest, "EPYC-IBPB") - guest.cpu.check_security_features(guest) - check("secure", True) - guest.cpu.secure = False - guest.cpu.set_model(guest, "Skylake-Client-IBRS") - guest.cpu.check_security_features(guest) - check("secure", False) - self._alter_compare(guest.get_xml(), outfile) - - # Hits a codepath when domcaps don't provide the needed info - emptyconn = utils.URIs.open_testdefault_cached() - guest = virtinst.Guest(emptyconn, xml) - guest.cpu.check_security_features(guest) - assert guest.cpu.secure is False - - def testAlterDisk(self): - """ - Test changing DeviceDisk() parameters after parsing - """ - guest, outfile = self._get_test_content("change-disk") - - def _get_disk(target): - for disk in guest.devices.disk: - if disk.target == target: - return disk - - disk = _get_disk("hda") - check = self._make_checker(disk) - check("path", "/tmp/test.img", "/dev/foo/null") - disk.sync_path_props() - check("driver_name", None, "test") - check("driver_type", None, "raw") - check("serial", "WD-WMAP9A966149", "frob") - check("wwn", None, "123456789abcdefa") - check("bus", "ide", "usb") - check("removable", None, False, True) - - disk = guest.devices.disk[1] - check = self._make_checker(disk.seclabels[1]) - check("model", "dac") - check("relabel", None, True) - check("label", None, "foo-my-label") - - disk = _get_disk("hdc") - check = self._make_checker(disk) - check("type", "block", "dir", "file", "block") - check("path", "/dev/null", None) - disk.sync_path_props() - check("device", "cdrom", "floppy") - check("read_only", True, False) - check("target", "hdc", "fde") - check("bus", "ide", "fdc") - check("error_policy", "stop", None) - - disk = _get_disk("hdd") - check = self._make_checker(disk) - check("type", "block") - check("device", "lun") - check("sgio", None, "unfiltered") - check("rawio", None, "yes") - - disk = _get_disk("sda") - check = self._make_checker(disk) - check("path", None, "http://[1:2:3:4:5:6:7:8]:1122/my/file") - disk.sync_path_props() - - disk = _get_disk("fda") - check = self._make_checker(disk) - check("path", None, "/dev/default-pool/default-vol") - disk.sync_path_props() - check("startup_policy", None, "optional") - check("shareable", False, True) - check("driver_cache", None, "writeback") - check("driver_io", None, "threads") - check("driver_io", "threads", "native") - check("driver_discard", None, "unmap") - check("driver_detect_zeroes", None, "unmap") - check("iotune_ris", 1, 0) - check("iotune_rbs", 2, 0) - check("iotune_wis", 3, 0) - check("iotune_wbs", 4, 0) - check("iotune_tis", None, 5) - check("iotune_tbs", None, 6) - check = self._make_checker(disk.boot) - check("order", None, 7, None) - - disk = _get_disk("vdb") - check = self._make_checker(disk) - check("source_pool", "defaultPool", "anotherPool") - check("source_volume", "foobar", "newvol") - - disk = _get_disk("vdc") - check = self._make_checker(disk) - check("source_protocol", "rbd", "gluster") - check("source_name", "pool/image", "new-val/vol") - check("source_host_name", "mon1.example.org", "diff.example.org") - check("source_host_port", 6321, 1234) - check("path", "gluster://diff.example.org:1234/new-val/vol") - - disk = _get_disk("vdd") - check = self._make_checker(disk) - check("source_protocol", "nbd") - check("source_host_transport", "unix") - check("source_host_socket", "/var/run/nbdsock") - check("path", "nbd+unix:///var/run/nbdsock") - - self._alter_compare(guest.get_xml(), outfile) - - def testAlterDevicesBootorder(self): - basename = "change-devices-bootorder" - guest, outfile = self._get_test_content(basename) - disk_1 = guest.devices.disk[0] - disk_2 = guest.devices.disk[1] - disk_3 = guest.devices.disk[2] - disk_4 = guest.devices.disk[3] - iface_1 = guest.devices.interface[0] - iface_2 = guest.devices.interface[1] - redirdev_1 = guest.devices.redirdev[0] - - assert guest.os.bootorder == ['hd'] - assert disk_1.boot.order is None - assert disk_2.boot.order == 10 - assert disk_3.boot.order == 10 - assert disk_4.boot.order == 1 - assert iface_1.boot.order == 2 - assert iface_2.boot.order is None - assert redirdev_1.boot.order == 3 - - guest.reorder_boot_order(disk_1, 1) - - assert guest.os.bootorder == [] - assert disk_1.boot.order == 1 - assert disk_2.boot.order == 10 - assert disk_3.boot.order == 10 - # verify that the used algorithm preserves the order of - # records with equal boot indices - assert disk_2 is guest.devices.disk[1] - assert disk_3 is guest.devices.disk[2] - assert disk_4.boot.order == 2 - assert iface_1.boot.order == 3 - assert iface_2.boot.order is None - assert redirdev_1.boot.order == 4 - - try: - self._alter_compare(guest.get_xml(), outfile) - except RuntimeError as error: - assert "unsupported configuration" in str(error) - - guest.reorder_boot_order(disk_2, 10) - assert disk_2.boot.order == 10 - assert disk_3.boot.order == 11 - assert disk_2 is guest.devices.disk[1] - assert disk_3 is guest.devices.disk[2] - - outfile = self._gen_outfile_path("change-devices-bootorder-fixed") - self._alter_compare(guest.get_xml(), outfile) - - def testSingleDisk(self): - xml = ("""\n""" - """\n""") - d = virtinst.DeviceDisk(self.conn, parsexml=xml) - self._set_and_check(d, "target", "hda", "hdb") - assert xml.replace("hda", "hdb") == d.get_xml() - - def testAlterChars(self): - guest, outfile = self._get_test_content("change-chars") - - serial1 = guest.devices.serial[0] - serial2 = guest.devices.serial[1] - parallel1 = guest.devices.parallel[0] - parallel2 = guest.devices.parallel[1] - console1 = guest.devices.console[0] - console2 = guest.devices.console[1] - channel1 = guest.devices.channel[0] - channel2 = guest.devices.channel[1] - channel3 = guest.devices.channel[2] - - check = self._make_checker(serial1) - check("type", "null", "udp") - check("source.bind_host", None, "example.com") - check("source.bind_service", None, 66) - check("source.connect_host", None, "example.com.uk") - check("source.connect_service", None, 77) - - check = self._make_checker(serial2) - check("type", "tcp") - check("source.protocol", "telnet", "raw") - check("source.mode", "bind", "connect") - - check = self._make_checker(parallel1) - check("source.mode", "bind") - check("source.path", "/tmp/foobar", None) - check("type", "unix", "pty") - - check = self._make_checker(parallel2) - check("type", "udp") - check("source.bind_service", 1111, 1357) - check("source.bind_host", "my.bind.host", "my.foo.host") - check("source.connect_service", 2222, 7777) - check("source.connect_host", "my.source.host", "source.foo.host") - - check = self._make_checker(console1) - check("type", "pty") - check("target_type", None) - - check = self._make_checker(console2) - check("type", "file") - check("source.path", "/tmp/foo.img", None) - check("source.path", None, "/root/foo") - check("target_type", "virtio") - check("target_state", None, "connected") - - check = self._make_checker(channel1) - check("type", "pty") - check("target_type", "virtio", "bar", "virtio") - check("target_name", "foo.bar.frob", "test.changed") - - check = self._make_checker(channel2) - check("type", "unix", "foo", "unix") - check("target_type", "guestfwd") - check("target_address", "1.2.3.4", "5.6.7.8") - check("target_port", 4567, 1199) - - check = self._make_checker(channel3) - check("type", "spiceport") - check("source.channel", "org.spice-space.webdav.0", "test.1") - check("target_type", "virtio") - check("target_name", "org.spice-space.webdav.0", "test.2") - assert channel3.get_xml_id() == "./devices/channel[3]" - assert channel3.get_xml_idx() == 2 - - self._alter_compare(guest.get_xml(), outfile) - - def _testAlterControllers(self): - guest, outfile = self._get_test_content("change-controllers") - - dev1 = guest.devices.controller[0] - dev2 = guest.devices.controller[1] - dev3 = guest.devices.controller[2] - dev4 = guest.devices.controller[3] - - check = self._make_checker(dev1) - check("type", "ide") - check("index", 3, 1) - - check = self._make_checker(dev2) - check("type", "virtio-serial") - check("index", 0, 7) - check("ports", 32, 5) - check("vectors", 17, None) - - check = self._make_checker(dev3) - check("type", "scsi") - check("index", 1, 2) - - check = self._make_checker(dev4) - check("type", "usb", "foo", "usb") - check("index", 3, 9) - check("model", "ich9-ehci1", "ich9-uhci1") - check("master_startport", 4, 2) - - self._alter_compare(guest.get_xml(), outfile) - - def testAlterNics(self): - guest, outfile = self._get_test_content("change-nics") - - dev1 = guest.devices.interface[0] - dev2 = guest.devices.interface[1] - dev3 = guest.devices.interface[2] - dev4 = guest.devices.interface[3] - dev5 = guest.devices.interface[4] - - check = self._make_checker(dev1) - check("type", "user") - check("model", None, "vmxnet3") - check("source", None, None,) - check("macaddr", "22:11:11:11:11:11", "AA:AA:AA:AA:AA:AA") - check("filterref", None, "foo") - - check = self._make_checker(dev2) - check("source", "default", None) - check("type", "network", "bridge") - check("source", None, "newbr0") - check("model", "e1000", "virtio") - - check = self._make_checker(dev3) - check("type", "bridge") - check("source", "foobr0", "newfoo0") - check("macaddr", "22:22:22:22:22:22") - check("target_dev", None, "test1") - - check = self._make_checker(dev4) - check("type", "ethernet") - check("target_dev", "nic02", "nic03") - check("target_dev", "nic03", None) - - check = self._make_checker(dev5) - check("type", "direct") - check("source", "eth0.1") - check("source_mode", "vepa", "bridge") - check("portgroup", None, "sales") - check("driver_name", None, "vhost") - check("driver_queues", None, 5) - - virtualport = dev5.virtualport - check = self._make_checker(virtualport) - check("type", "802.1Qbg", "foo", "802.1Qbg") - check("managerid", 12, 11) - check("typeid", 1193046, 1193047) - check("typeidversion", 1, 2) - check("instanceid", "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa3b", - "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa4f") - - self._alter_compare(guest.get_xml(), outfile) - - def testQEMUXMLNS(self): - basename = "change-xmlns-qemu" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - guest = virtinst.Guest(self.kvmconn, parsexml=open(infile).read()) - - check = self._make_checker(guest.xmlns_qemu.args[0]) - check("value", "-somearg", "-somenewarg") - check = self._make_checker(guest.xmlns_qemu.args[1]) - check("value", "bar,baz=wib wob", "testval") - guest.xmlns_qemu.args.add_new().value = "additional-arg" - arg0 = guest.xmlns_qemu.args[0] - guest.xmlns_qemu.remove_child(guest.xmlns_qemu.args[0]) - x = "\n" - assert arg0.get_xml() == x - - check = self._make_checker(guest.xmlns_qemu.envs[0]) - check("name", "SOMEENV") - check("value", "foo=bar baz,foo") - env = guest.xmlns_qemu.envs.add_new() - env.name = "DISPLAY" - env.value = "1:2" - - self._alter_compare(guest.get_xml(), outfile) - - def testAddRemoveDevices(self): - guest, outfile = self._get_test_content("add-devices") - - # Basic removal of existing device - rmdev = guest.devices.disk[2] - guest.remove_device(rmdev) - - # Basic device add - d = virtinst.DeviceWatchdog(self.conn) - d.set_defaults(guest) - guest.add_device(d) - - # Test adding device with child properties (address value) - adddev = virtinst.DeviceInterface(self.conn) - adddev.type = "network" - adddev.source = "default" - adddev.macaddr = "1A:2A:3A:4A:5A:6A" - adddev.address.type = "spapr-vio" - adddev.set_defaults(guest) - - # Test adding and removing the same device - guest.add_device(adddev) - guest.remove_device(adddev) - guest.add_device(adddev) - - # Test adding device built from parsed XML - guest.add_device(virtinst.DeviceSound(self.conn, - parsexml="""""")) - - self._alter_compare(guest.get_xml(), outfile) - - def testChangeKVMMedia(self): - guest, outfile = self._get_test_content("change-media", kvm=True) - - disk = guest.devices.disk[0] - check = self._make_checker(disk) - check("path", None, "/dev/default-pool/default-vol") - disk.sync_path_props() - - disk = guest.devices.disk[1] - check = self._make_checker(disk) - check("path", None, "/dev/default-pool/default-vol") - check("path", "/dev/default-pool/default-vol", "/dev/disk-pool/diskvol1") - disk.sync_path_props() - - disk = guest.devices.disk[2] - check = self._make_checker(disk) - check("path", None, "/dev/disk-pool/diskvol1") - disk.sync_path_props() - - disk = guest.devices.disk[3] - check = self._make_checker(disk) - check("path", None, "/dev/default-pool/default-vol") - disk.sync_path_props() - - disk = guest.devices.disk[4] - check = self._make_checker(disk) - check("path", None, "/dev/disk-pool/diskvol1") - disk.sync_path_props() - - self._alter_compare(guest.get_xml(), outfile) - - def testGuestBootorder(self): - guest, outfile = self._get_test_content("bootorder", kvm=True) - - assert guest.get_boot_order() == ['./devices/disk[1]'] - assert guest.get_boot_order(legacy=True) == ['hd'] - - legacy_order = ['hd', 'fd', 'cdrom', 'network'] - dev_order = ['./devices/disk[1]', - './devices/disk[3]', - './devices/disk[2]', - './devices/interface[1]'] - guest.set_boot_order(legacy_order, legacy=True) - assert guest.get_boot_order() == dev_order - assert guest.get_boot_order(legacy=True) == legacy_order - - guest.set_boot_order(dev_order) - assert guest.get_boot_order() == dev_order - assert guest.get_boot_order(legacy=True) == [] - - self._alter_compare(guest.get_xml(), outfile) - - def testDiskChangeBus(self): - guest, outfile = self._get_test_content("disk-change-bus") - - disk = guest.devices.disk[0] - - # Same bus is a no-op - origxml = disk.get_xml() - disk.change_bus(guest, "virtio") - assert origxml == disk.get_xml() - disk.change_bus(guest, "ide") - - disk = guest.devices.disk[2] - disk.change_bus(guest, "scsi") - - self._alter_compare(guest.get_xml(), outfile) - - - ################## - # Snapshot tests # - ################## - - def testChangeSnapshot(self): - basename = "change-snapshot" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - snap = virtinst.DomainSnapshot(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(snap) - check("name", "offline-root-child1", "name-foo") - check("state", "shutoff", "somestate") - check("description", "offline desk", "foo\nnewline\n indent") - check("parent", "offline-root", "newparent") - check("creationTime", 1375905916, 1234) - check("memory_type", "no", "internal") - - check = self._make_checker(snap.disks[0]) - check("name", "hda", "hdb") - check("snapshot", "internal", "no") - - utils.diff_compare(snap.get_xml(), outfile) - - - ################# - # Storage tests # - ################# - - def testFSPool(self): - basename = "pool-fs" - infile = DATADIR + "%s.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - pool = virtinst.StoragePool(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(pool) - check("type", "fs", "dir") - check("name", "pool-fs", "foo-new") - check("uuid", "10211510-2115-1021-1510-211510211510", - "10211510-2115-1021-1510-211510211999") - check("capacity", 984373075968, 200000) - check("allocation", 756681687040, 150000) - check("available", 227691388928, 50000) - - check("format", "auto", "ext3") - check("source_path", "/some/source/path", "/dev/foo/bar") - check("target_path", "/some/target/path", "/mnt/my/foo") - check("source_name", None, "fooname") - - utils.diff_compare(pool.get_xml(), outfile) - utils.test_create(self.conn, pool.get_xml(), "storagePoolDefineXML") - - def testISCSIPool(self): - basename = "pool-iscsi" - infile = utils.DATADIR + "/storage/%s.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - pool = virtinst.StoragePool(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(pool) - check("iqn", "foo.bar.baz.iqn", "my.iqn") - check = self._make_checker(pool.hosts[0]) - check("name", "some.random.hostname", "my.host") - - utils.diff_compare(pool.get_xml(), outfile) - utils.test_create(self.conn, pool.get_xml(), "storagePoolDefineXML") - - def testGlusterPool(self): - basename = "pool-gluster" - infile = utils.DATADIR + "/storage/%s.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - pool = virtinst.StoragePool(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(pool) - check("source_path", "/some/source/path", "/foo") - check = self._make_checker(pool.hosts[0]) - check("name", "some.random.hostname", "my.host") - - utils.diff_compare(pool.get_xml(), outfile) - utils.test_create(self.conn, pool.get_xml(), "storagePoolDefineXML") - - def testRBDPool(self): - basename = "pool-rbd" - infile = DATADIR + "%s.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - pool = virtinst.StoragePool(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(pool.hosts[0]) - check("name", "ceph-mon-1.example.com") - check("port", 6789, 1234) - check = self._make_checker(pool.hosts[1]) - check("name", "ceph-mon-2.example.com", "foo.bar") - check("port", 6789) - check = self._make_checker(pool.hosts[2]) - check("name", "ceph-mon-3.example.com") - check("port", 6789, 1000) - hostobj = pool.hosts.add_new() - hostobj.name = "frobber" - hostobj.port = "5555" - - utils.diff_compare(pool.get_xml(), outfile) - utils.test_create(self.conn, pool.get_xml(), "storagePoolDefineXML") - - def testVol(self): - basename = "pool-dir-vol" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - vol = virtinst.StorageVolume(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(vol) - check("type", None, "file") - check("key", None, "fookey") - check("capacity", 10737418240, 2000) - check("allocation", 5368709120, 1000) - check("format", "raw", "qcow2") - check("target_path", None, "/foo/bar") - check("backing_store", "/foo/bar/baz", "/my/backing") - check("lazy_refcounts", False, True) - - check = self._make_checker(vol.permissions) - check("mode", "0700", "0744") - check("owner", "10736", "10000") - check("group", "10736", "10000") - check("label", None, "foo.label") - - utils.diff_compare(vol.get_xml(), outfile) - - - ################### - # tests # - ################### - - def testNetMulti(self): - basename = "network-multi" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - net = virtinst.Network(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(net) - check("name", "ipv6_multirange", "new-foo") - check("uuid", "41b4afe4-87bb-8087-6724-5e208a2d483a", - "41b4afe4-87bb-8087-6724-5e208a2d1111") - check("bridge", "virbr3", "virbr3new") - check("stp", True, False) - check("delay", 0, 2) - check("domain_name", "net7", "newdom") - check("ipv6", None, True) - check("macaddr", None, "52:54:00:69:eb:FF") - check("virtualport_type", None, "openvswitch") - - assert len(net.portgroups) == 2 - check = self._make_checker(net.portgroups[0]) - check("name", "engineering", "foo") - check("default", True, False) - - assert len(net.ips) == 4 - check = self._make_checker(net.ips[0]) - check("address", "192.168.7.1", "192.168.8.1") - check("netmask", "255.255.255.0", "255.255.254.0") - assert net.can_pxe() is False - check("tftp", None, "/var/lib/tftproot") - check("bootp_file", None, "pxeboot.img") - check("bootp_server", None, "1.2.3.4") - assert net.can_pxe() is True - - check = self._make_checker(net.forward) - check("mode", "nat", "route") - check("dev", None, "eth22") - assert net.can_pxe() is True - - check = self._make_checker(net.ips[0].ranges[0]) - check("start", "192.168.7.128", "192.168.8.128") - check("end", "192.168.7.254", "192.168.8.254") - - check = self._make_checker(net.ips[0].hosts[1]) - check("macaddr", "52:54:00:69:eb:91", "52:54:00:69:eb:92") - check("name", "badbob", "newname") - check("ip", "192.168.7.3", "192.168.8.3") - - check = self._make_checker(net.ips[1]) - check("family", "ipv6", "ipv6") - check("prefix", 64, 63) - - r = net.routes.add_new() - r.family = "ipv4" - r.address = "192.168.8.0" - r.prefix = "24" - r.gateway = "192.168.8.10" - check = self._make_checker(r) - check("netmask", None, "foo", None) - - utils.diff_compare(net.get_xml(), outfile) - utils.test_create(self.conn, net.get_xml(), "networkDefineXML") - - def testNetOpen(self): - basename = "network-open" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - net = virtinst.Network(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(net) - check("name", "open", "new-foo") - check("domain_name", "open", "newdom") - - check = self._make_checker(net.forward) - check("mode", "open") - check("dev", None) - - assert len(net.ips) == 1 - check = self._make_checker(net.ips[0]) - check("address", "192.168.100.1", "192.168.101.1") - check("netmask", "255.255.255.0", "255.255.254.0") - - check = self._make_checker(net.ips[0].ranges[0]) - check("start", "192.168.100.128", "192.168.101.128") - check("end", "192.168.100.254", "192.168.101.254") - - utils.diff_compare(net.get_xml(), outfile) - utils.test_create(self.conn, net.get_xml(), "networkDefineXML") - - def testNetVfPool(self): - basename = "network-vf-pool" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - net = virtinst.Network(self.conn, parsexml=open(infile).read()) - - check = self._make_checker(net) - check("name", "passthrough", "new-foo") - - check = self._make_checker(net.forward) - check("mode", "hostdev") - check("managed", "yes") - - check = self._make_checker(net.forward.pf[0]) - check("dev", "eth3") - - utils.diff_compare(net.get_xml(), outfile) - utils.test_create(self.conn, net.get_xml(), "networkDefineXML") - - - ############## - # Misc tests # - ############## - - def testCPUUnknownClear(self): - # Make sure .clear() even removes XML elements we don't know about - basename = "clear-cpu-unknown-vals" - infile = DATADIR + "%s-in.xml" % basename - outfile = DATADIR + "%s-out.xml" % basename - guest = virtinst.Guest(self.kvmconn, parsexml=open(infile).read()) - - guest.cpu.copy_host_cpu(guest) - guest.cpu.clear() - utils.diff_compare(guest.get_xml(), outfile) - - def testDomainRoundtrip(self): - # Make sure our XML engine doesn't mangle non-libvirt XML bits - infile = DATADIR + "domain-roundtrip.xml" - outfile = DATADIR + "domain-roundtrip.xml" - guest = virtinst.Guest(self.conn, parsexml=open(infile).read()) - - utils.diff_compare(guest.get_xml(), outfile) - - def testYesNoUnexpectedParse(self): - # Make sure that if we see an unexpected yes/no or on/off value, - # we just return it to the user and don't error. Libvirt could - # change our assumptions and we shouldn't be too restrictive - xml = ("\n \n" - "
\n") - dev = virtinst.DeviceHostdev(self.conn, parsexml=xml) - - assert dev.managed == "foo" - assert dev.rom_bar == "wibble" - assert dev.scsi_bus == "hello" - - dev.managed = "test1" - dev.rom_bar = "test2" - assert dev.managed == "test1" - assert dev.rom_bar == "test2" - - with pytest.raises(ValueError): - dev.scsi_bus = "goodbye" - - def testXMLBuilderCoverage(self): - """ - Test XMLBuilder corner cases - """ - with pytest.raises(RuntimeError, match=".*'foo'.*"): - # Ensure we validate root element - virtinst.DeviceDisk(self.conn, parsexml="") - - with pytest.raises(Exception, match=".*xmlParseDoc.*"): - # Ensure we validate root element - virtinst.DeviceDisk(self.conn, parsexml=-1) - - with pytest.raises(virtinst.xmlutil.DevError): - raise virtinst.xmlutil.DevError("for coverage") - - with pytest.raises(ValueError): - virtinst.DeviceDisk.validate_generic_name("objtype", None) - - with pytest.raises(ValueError): - virtinst.DeviceDisk.validate_generic_name("objtype", "foo bar") - - # Test property __repr__ for code coverage - assert "DeviceAddress" in str(virtinst.DeviceDisk.address) - assert "./driver/@cache" in str(virtinst.DeviceDisk.driver_cache) - - # Conversion of 0x value into int - xml = """ - -
- - """ - dev = virtinst.DeviceController(self.conn, parsexml=xml) - assert dev.address.slot == 4 - - # Some XML formatting and get_xml_* corner cases - conn = utils.URIs.openconn(utils.URIs.test_suite) - xml = conn.lookupByName("test-for-virtxml").XMLDesc(0) - guest = virtinst.Guest(conn, parsexml=xml) - - assert guest.features.get_xml().startswith("\n""" + guest.cpu.add_feature("addfeature") + + check = _make_checker(guest.numatune) + check("memory_mode", "interleave", "strict", None) + check("memory_nodeset", "1-5,^3,7", "2,4,6") + + check = _make_checker(guest.memtune) + check("hard_limit", None, 1024, 2048) + check("soft_limit", None, 100, 200) + check("swap_hard_limit", None, 300, 400) + check("min_guarantee", None, 400, 500) + + check = _make_checker(guest.blkiotune) + check("weight", None, 100, 200) + check = _make_checker(guest.blkiotune.devices.add_new()) + check("weight", None, 300) + check("path", None, "/home/1.img") + + check = _make_checker(guest.idmap) + check("uid_start", None, 0) + check("uid_target", None, 1000) + check("uid_count", None, 10) + check("gid_start", None, 0) + check("gid_target", None, 1000) + check("gid_count", None, 10) + + check = _make_checker(guest.resource) + check("partition", None, "/virtualmachines/production") + + check = _make_checker(guest.devices.memballoon[0]) + check("model", "virtio", "none") + + check = _make_checker(guest.memoryBacking) + check("hugepages", False, True) + check("nosharepages", False, True) + check("locked", False, True) + + page = guest.memoryBacking.pages.add_new() + check = _make_checker(page) + check("size", None, 1) + check("unit", None, "G") + + assert guest.is_full_os_container() is False + _alter_compare(conn, guest.get_xml(), outfile) + + +def testAlterCpuMode(): + conn = utils.URIs.open_testdefault_cached() + xml = open(DATADIR + "change-cpumode-in.xml").read() + outfile = DATADIR + "change-cpumode-out.xml" + conn = utils.URIs.openconn(utils.URIs.kvm_q35) + guest = virtinst.Guest(conn, xml) + check = _make_checker(guest.cpu) + + guest.cpu.model = "foo" + check("mode", "host-passthrough") + guest.cpu.check_security_features(guest) + check("secure", False) + guest.cpu.set_special_mode(guest, "host-model") + check("mode", "host-model") + guest.cpu.check_security_features(guest) + check("secure", False) + guest.cpu.set_model(guest, "qemu64") + check("model", "qemu64") + guest.cpu.check_security_features(guest) + check("secure", False) + + # Test actually filling in security values, and removing them + guest.cpu.secure = True + guest.cpu.set_model(guest, "Skylake-Client-IBRS") + guest.cpu.check_security_features(guest) + check("secure", True) + guest.cpu.set_model(guest, "EPYC-IBPB") + guest.cpu.check_security_features(guest) + check("secure", True) + guest.cpu.secure = False + guest.cpu.set_model(guest, "Skylake-Client-IBRS") + guest.cpu.check_security_features(guest) + check("secure", False) + _alter_compare(conn, guest.get_xml(), outfile) + + # Hits a codepath when domcaps don't provide the needed info + emptyconn = utils.URIs.open_testdefault_cached() + guest = virtinst.Guest(emptyconn, xml) + guest.cpu.check_security_features(guest) + assert guest.cpu.secure is False + + +def testAlterDisk(): + """ + Test changing DeviceDisk() parameters after parsing + """ + conn = utils.URIs.open_testdefault_cached() + guest, outfile = _get_test_content(conn, "change-disk") + + def _get_disk(target): + for disk in guest.devices.disk: + if disk.target == target: + return disk + + disk = _get_disk("hda") + check = _make_checker(disk) + check("path", "/tmp/test.img", "/dev/foo/null") + disk.sync_path_props() + check("driver_name", None, "test") + check("driver_type", None, "raw") + check("serial", "WD-WMAP9A966149", "frob") + check("wwn", None, "123456789abcdefa") + check("bus", "ide", "usb") + check("removable", None, False, True) + + disk = guest.devices.disk[1] + check = _make_checker(disk.seclabels[1]) + check("model", "dac") + check("relabel", None, True) + check("label", None, "foo-my-label") + + disk = _get_disk("hdc") + check = _make_checker(disk) + check("type", "block", "dir", "file", "block") + check("path", "/dev/null", None) + disk.sync_path_props() + check("device", "cdrom", "floppy") + check("read_only", True, False) + check("target", "hdc", "fde") + check("bus", "ide", "fdc") + check("error_policy", "stop", None) + + disk = _get_disk("hdd") + check = _make_checker(disk) + check("type", "block") + check("device", "lun") + check("sgio", None, "unfiltered") + check("rawio", None, "yes") + + disk = _get_disk("sda") + check = _make_checker(disk) + check("path", None, "http://[1:2:3:4:5:6:7:8]:1122/my/file") + disk.sync_path_props() + + disk = _get_disk("fda") + check = _make_checker(disk) + check("path", None, "/dev/default-pool/default-vol") + disk.sync_path_props() + check("startup_policy", None, "optional") + check("shareable", False, True) + check("driver_cache", None, "writeback") + check("driver_io", None, "threads") + check("driver_io", "threads", "native") + check("driver_discard", None, "unmap") + check("driver_detect_zeroes", None, "unmap") + check("iotune_ris", 1, 0) + check("iotune_rbs", 2, 0) + check("iotune_wis", 3, 0) + check("iotune_wbs", 4, 0) + check("iotune_tis", None, 5) + check("iotune_tbs", None, 6) + check = _make_checker(disk.boot) + check("order", None, 7, None) + + disk = _get_disk("vdb") + check = _make_checker(disk) + check("source_pool", "defaultPool", "anotherPool") + check("source_volume", "foobar", "newvol") + + disk = _get_disk("vdc") + check = _make_checker(disk) + check("source_protocol", "rbd", "gluster") + check("source_name", "pool/image", "new-val/vol") + check("source_host_name", "mon1.example.org", "diff.example.org") + check("source_host_port", 6321, 1234) + check("path", "gluster://diff.example.org:1234/new-val/vol") + + disk = _get_disk("vdd") + check = _make_checker(disk) + check("source_protocol", "nbd") + check("source_host_transport", "unix") + check("source_host_socket", "/var/run/nbdsock") + check("path", "nbd+unix:///var/run/nbdsock") + + _alter_compare(conn, guest.get_xml(), outfile) + + +def testAlterDevicesBootorder(): + conn = utils.URIs.open_testdefault_cached() + basename = "change-devices-bootorder" + guest, outfile = _get_test_content(conn, basename) + disk_1 = guest.devices.disk[0] + disk_2 = guest.devices.disk[1] + disk_3 = guest.devices.disk[2] + disk_4 = guest.devices.disk[3] + iface_1 = guest.devices.interface[0] + iface_2 = guest.devices.interface[1] + redirdev_1 = guest.devices.redirdev[0] + + assert guest.os.bootorder == ['hd'] + assert disk_1.boot.order is None + assert disk_2.boot.order == 10 + assert disk_3.boot.order == 10 + assert disk_4.boot.order == 1 + assert iface_1.boot.order == 2 + assert iface_2.boot.order is None + assert redirdev_1.boot.order == 3 + + guest.reorder_boot_order(disk_1, 1) + + assert guest.os.bootorder == [] + assert disk_1.boot.order == 1 + assert disk_2.boot.order == 10 + assert disk_3.boot.order == 10 + # verify that the used algorithm preserves the order of + # records with equal boot indices + assert disk_2 is guest.devices.disk[1] + assert disk_3 is guest.devices.disk[2] + assert disk_4.boot.order == 2 + assert iface_1.boot.order == 3 + assert iface_2.boot.order is None + assert redirdev_1.boot.order == 4 + + try: + _alter_compare(conn, guest.get_xml(), outfile) + except RuntimeError as error: + assert "unsupported configuration" in str(error) + + guest.reorder_boot_order(disk_2, 10) + assert disk_2.boot.order == 10 + assert disk_3.boot.order == 11 + assert disk_2 is guest.devices.disk[1] + assert disk_3 is guest.devices.disk[2] + + outfile = _gen_outfile_path("change-devices-bootorder-fixed") + _alter_compare(conn, guest.get_xml(), outfile) + + +def testSingleDisk(): + conn = utils.URIs.open_testdefault_cached() + xml = ("""\n""" + """\n""") + conn = utils.URIs.open_testdefault_cached() + d = virtinst.DeviceDisk(conn, parsexml=xml) + _set_and_check(d, "target", "hda", "hdb") + assert xml.replace("hda", "hdb") == d.get_xml() + + +def testAlterChars(): + conn = utils.URIs.open_testdefault_cached() + guest, outfile = _get_test_content(conn, "change-chars") + + serial1 = guest.devices.serial[0] + serial2 = guest.devices.serial[1] + parallel1 = guest.devices.parallel[0] + parallel2 = guest.devices.parallel[1] + console1 = guest.devices.console[0] + console2 = guest.devices.console[1] + channel1 = guest.devices.channel[0] + channel2 = guest.devices.channel[1] + channel3 = guest.devices.channel[2] + + check = _make_checker(serial1) + check("type", "null", "udp") + check("source.bind_host", None, "example.com") + check("source.bind_service", None, 66) + check("source.connect_host", None, "example.com.uk") + check("source.connect_service", None, 77) + + check = _make_checker(serial2) + check("type", "tcp") + check("source.protocol", "telnet", "raw") + check("source.mode", "bind", "connect") + + check = _make_checker(parallel1) + check("source.mode", "bind") + check("source.path", "/tmp/foobar", None) + check("type", "unix", "pty") + + check = _make_checker(parallel2) + check("type", "udp") + check("source.bind_service", 1111, 1357) + check("source.bind_host", "my.bind.host", "my.foo.host") + check("source.connect_service", 2222, 7777) + check("source.connect_host", "my.source.host", "source.foo.host") + + check = _make_checker(console1) + check("type", "pty") + check("target_type", None) + + check = _make_checker(console2) + check("type", "file") + check("source.path", "/tmp/foo.img", None) + check("source.path", None, "/root/foo") + check("target_type", "virtio") + check("target_state", None, "connected") + + check = _make_checker(channel1) + check("type", "pty") + check("target_type", "virtio", "bar", "virtio") + check("target_name", "foo.bar.frob", "test.changed") + + check = _make_checker(channel2) + check("type", "unix", "foo", "unix") + check("target_type", "guestfwd") + check("target_address", "1.2.3.4", "5.6.7.8") + check("target_port", 4567, 1199) + + check = _make_checker(channel3) + check("type", "spiceport") + check("source.channel", "org.spice-space.webdav.0", "test.1") + check("target_type", "virtio") + check("target_name", "org.spice-space.webdav.0", "test.2") + assert channel3.get_xml_id() == "./devices/channel[3]" + assert channel3.get_xml_idx() == 2 + + _alter_compare(conn, guest.get_xml(), outfile) + + +def testAlterNics(): + conn = utils.URIs.open_testdefault_cached() + guest, outfile = _get_test_content(conn, "change-nics") + + dev1 = guest.devices.interface[0] + dev2 = guest.devices.interface[1] + dev3 = guest.devices.interface[2] + dev4 = guest.devices.interface[3] + dev5 = guest.devices.interface[4] + + check = _make_checker(dev1) + check("type", "user") + check("model", None, "vmxnet3") + check("source", None, None,) + check("macaddr", "22:11:11:11:11:11", "AA:AA:AA:AA:AA:AA") + check("filterref", None, "foo") + + check = _make_checker(dev2) + check("source", "default", None) + check("type", "network", "bridge") + check("source", None, "newbr0") + check("model", "e1000", "virtio") + + check = _make_checker(dev3) + check("type", "bridge") + check("source", "foobr0", "newfoo0") + check("macaddr", "22:22:22:22:22:22") + check("target_dev", None, "test1") + + check = _make_checker(dev4) + check("type", "ethernet") + check("target_dev", "nic02", "nic03") + check("target_dev", "nic03", None) + + check = _make_checker(dev5) + check("type", "direct") + check("source", "eth0.1") + check("source_mode", "vepa", "bridge") + check("portgroup", None, "sales") + check("driver_name", None, "vhost") + check("driver_queues", None, 5) + + virtualport = dev5.virtualport + check = _make_checker(virtualport) + check("type", "802.1Qbg", "foo", "802.1Qbg") + check("managerid", 12, 11) + check("typeid", 1193046, 1193047) + check("typeidversion", 1, 2) + check("instanceid", "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa3b", + "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa4f") + + _alter_compare(conn, guest.get_xml(), outfile) + + +def testQEMUXMLNS(): + kvmconn = utils.URIs.open_kvm() + basename = "change-xmlns-qemu" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + guest = virtinst.Guest(kvmconn, parsexml=open(infile).read()) + + check = _make_checker(guest.xmlns_qemu.args[0]) + check("value", "-somearg", "-somenewarg") + check = _make_checker(guest.xmlns_qemu.args[1]) + check("value", "bar,baz=wib wob", "testval") + guest.xmlns_qemu.args.add_new().value = "additional-arg" + arg0 = guest.xmlns_qemu.args[0] + guest.xmlns_qemu.remove_child(guest.xmlns_qemu.args[0]) + x = "\n" + assert arg0.get_xml() == x + + check = _make_checker(guest.xmlns_qemu.envs[0]) + check("name", "SOMEENV") + check("value", "foo=bar baz,foo") + env = guest.xmlns_qemu.envs.add_new() + env.name = "DISPLAY" + env.value = "1:2" + + _alter_compare(kvmconn, guest.get_xml(), outfile) + + +def testAddRemoveDevices(): + conn = utils.URIs.open_testdefault_cached() + guest, outfile = _get_test_content(conn, "add-devices") + + # Basic removal of existing device + rmdev = guest.devices.disk[2] + guest.remove_device(rmdev) + + # Basic device add + d = virtinst.DeviceWatchdog(conn) + d.set_defaults(guest) + guest.add_device(d) + + # Test adding device with child properties (address value) + adddev = virtinst.DeviceInterface(conn) + adddev.type = "network" + adddev.source = "default" + adddev.macaddr = "1A:2A:3A:4A:5A:6A" + adddev.address.type = "spapr-vio" + adddev.set_defaults(guest) + + # Test adding and removing the same device + guest.add_device(adddev) + guest.remove_device(adddev) + guest.add_device(adddev) + + # Test adding device built from parsed XML + guest.add_device(virtinst.DeviceSound(conn, + parsexml="""""")) + + _alter_compare(conn, guest.get_xml(), outfile) + + +def testChangeKVMMedia(): + kvmconn = utils.URIs.open_kvm() + guest, outfile = _get_test_content(kvmconn, "change-media") + + disk = guest.devices.disk[0] + check = _make_checker(disk) + check("path", None, "/dev/default-pool/default-vol") + disk.sync_path_props() + + disk = guest.devices.disk[1] + check = _make_checker(disk) + check("path", None, "/dev/default-pool/default-vol") + check("path", "/dev/default-pool/default-vol", "/dev/disk-pool/diskvol1") + disk.sync_path_props() + + disk = guest.devices.disk[2] + check = _make_checker(disk) + check("path", None, "/dev/disk-pool/diskvol1") + disk.sync_path_props() + + disk = guest.devices.disk[3] + check = _make_checker(disk) + check("path", None, "/dev/default-pool/default-vol") + disk.sync_path_props() + + disk = guest.devices.disk[4] + check = _make_checker(disk) + check("path", None, "/dev/disk-pool/diskvol1") + disk.sync_path_props() + + _alter_compare(kvmconn, guest.get_xml(), outfile) + + +def testGuestBootorder(): + kvmconn = utils.URIs.open_kvm() + guest, outfile = _get_test_content(kvmconn, "bootorder") + + assert guest.get_boot_order() == ['./devices/disk[1]'] + assert guest.get_boot_order(legacy=True) == ['hd'] + + legacy_order = ['hd', 'fd', 'cdrom', 'network'] + dev_order = ['./devices/disk[1]', + './devices/disk[3]', + './devices/disk[2]', + './devices/interface[1]'] + guest.set_boot_order(legacy_order, legacy=True) + assert guest.get_boot_order() == dev_order + assert guest.get_boot_order(legacy=True) == legacy_order + + guest.set_boot_order(dev_order) + assert guest.get_boot_order() == dev_order + assert guest.get_boot_order(legacy=True) == [] + + _alter_compare(kvmconn, guest.get_xml(), outfile) + + +def testDiskChangeBus(): + conn = utils.URIs.open_testdefault_cached() + guest, outfile = _get_test_content(conn, "disk-change-bus") + + disk = guest.devices.disk[0] + + # Same bus is a no-op + origxml = disk.get_xml() + disk.change_bus(guest, "virtio") + assert origxml == disk.get_xml() + disk.change_bus(guest, "ide") + + disk = guest.devices.disk[2] + disk.change_bus(guest, "scsi") + + _alter_compare(conn, guest.get_xml(), outfile) + + +################## +# Snapshot tests # +################## + + +def testChangeSnapshot(): + conn = utils.URIs.open_testdefault_cached() + basename = "change-snapshot" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + snap = virtinst.DomainSnapshot(conn, parsexml=open(infile).read()) + + check = _make_checker(snap) + check("name", "offline-root-child1", "name-foo") + check("state", "shutoff", "somestate") + check("description", "offline desk", "foo\nnewline\n indent") + check("parent", "offline-root", "newparent") + check("creationTime", 1375905916, 1234) + check("memory_type", "no", "internal") + + check = _make_checker(snap.disks[0]) + check("name", "hda", "hdb") + check("snapshot", "internal", "no") + + utils.diff_compare(snap.get_xml(), outfile) + + +################# +# Storage tests # +################# + + +def testFSPool(): + conn = utils.URIs.open_testdefault_cached() + basename = "pool-fs" + infile = DATADIR + "%s.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + pool = virtinst.StoragePool(conn, parsexml=open(infile).read()) + + check = _make_checker(pool) + check("type", "fs", "dir") + check("name", "pool-fs", "foo-new") + check("uuid", "10211510-2115-1021-1510-211510211510", + "10211510-2115-1021-1510-211510211999") + check("capacity", 984373075968, 200000) + check("allocation", 756681687040, 150000) + check("available", 227691388928, 50000) + + check("format", "auto", "ext3") + check("source_path", "/some/source/path", "/dev/foo/bar") + check("target_path", "/some/target/path", "/mnt/my/foo") + check("source_name", None, "fooname") + + utils.diff_compare(pool.get_xml(), outfile) + utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML") + + +def testISCSIPool(): + conn = utils.URIs.open_testdefault_cached() + basename = "pool-iscsi" + infile = utils.DATADIR + "/storage/%s.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + pool = virtinst.StoragePool(conn, parsexml=open(infile).read()) + + check = _make_checker(pool) + check("iqn", "foo.bar.baz.iqn", "my.iqn") + check = _make_checker(pool.hosts[0]) + check("name", "some.random.hostname", "my.host") + + utils.diff_compare(pool.get_xml(), outfile) + utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML") + + +def testGlusterPool(): + conn = utils.URIs.open_testdefault_cached() + basename = "pool-gluster" + infile = utils.DATADIR + "/storage/%s.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + pool = virtinst.StoragePool(conn, parsexml=open(infile).read()) + + check = _make_checker(pool) + check("source_path", "/some/source/path", "/foo") + check = _make_checker(pool.hosts[0]) + check("name", "some.random.hostname", "my.host") + + utils.diff_compare(pool.get_xml(), outfile) + utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML") + + +def testRBDPool(): + conn = utils.URIs.open_testdefault_cached() + basename = "pool-rbd" + infile = DATADIR + "%s.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + pool = virtinst.StoragePool(conn, parsexml=open(infile).read()) + + check = _make_checker(pool.hosts[0]) + check("name", "ceph-mon-1.example.com") + check("port", 6789, 1234) + check = _make_checker(pool.hosts[1]) + check("name", "ceph-mon-2.example.com", "foo.bar") + check("port", 6789) + check = _make_checker(pool.hosts[2]) + check("name", "ceph-mon-3.example.com") + check("port", 6789, 1000) + hostobj = pool.hosts.add_new() + hostobj.name = "frobber" + hostobj.port = "5555" + + utils.diff_compare(pool.get_xml(), outfile) + utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML") + + +def testVol(): + conn = utils.URIs.open_testdefault_cached() + basename = "pool-dir-vol" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + vol = virtinst.StorageVolume(conn, parsexml=open(infile).read()) + + check = _make_checker(vol) + check("type", None, "file") + check("key", None, "fookey") + check("capacity", 10737418240, 2000) + check("allocation", 5368709120, 1000) + check("format", "raw", "qcow2") + check("target_path", None, "/foo/bar") + check("backing_store", "/foo/bar/baz", "/my/backing") + check("lazy_refcounts", False, True) + + check = _make_checker(vol.permissions) + check("mode", "0700", "0744") + check("owner", "10736", "10000") + check("group", "10736", "10000") + check("label", None, "foo.label") + + utils.diff_compare(vol.get_xml(), outfile) + + +################### +# tests # +################### + + +def testNetMulti(): + conn = utils.URIs.open_testdefault_cached() + basename = "network-multi" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + net = virtinst.Network(conn, parsexml=open(infile).read()) + + check = _make_checker(net) + check("name", "ipv6_multirange", "new-foo") + check("uuid", "41b4afe4-87bb-8087-6724-5e208a2d483a", + "41b4afe4-87bb-8087-6724-5e208a2d1111") + check("bridge", "virbr3", "virbr3new") + check("stp", True, False) + check("delay", 0, 2) + check("domain_name", "net7", "newdom") + check("ipv6", None, True) + check("macaddr", None, "52:54:00:69:eb:FF") + check("virtualport_type", None, "openvswitch") + + assert len(net.portgroups) == 2 + check = _make_checker(net.portgroups[0]) + check("name", "engineering", "foo") + check("default", True, False) + + assert len(net.ips) == 4 + check = _make_checker(net.ips[0]) + check("address", "192.168.7.1", "192.168.8.1") + check("netmask", "255.255.255.0", "255.255.254.0") + assert net.can_pxe() is False + check("tftp", None, "/var/lib/tftproot") + check("bootp_file", None, "pxeboot.img") + check("bootp_server", None, "1.2.3.4") + assert net.can_pxe() is True + + check = _make_checker(net.forward) + check("mode", "nat", "route") + check("dev", None, "eth22") + assert net.can_pxe() is True + + check = _make_checker(net.ips[0].ranges[0]) + check("start", "192.168.7.128", "192.168.8.128") + check("end", "192.168.7.254", "192.168.8.254") + + check = _make_checker(net.ips[0].hosts[1]) + check("macaddr", "52:54:00:69:eb:91", "52:54:00:69:eb:92") + check("name", "badbob", "newname") + check("ip", "192.168.7.3", "192.168.8.3") + + check = _make_checker(net.ips[1]) + check("family", "ipv6", "ipv6") + check("prefix", 64, 63) + + r = net.routes.add_new() + r.family = "ipv4" + r.address = "192.168.8.0" + r.prefix = "24" + r.gateway = "192.168.8.10" + check = _make_checker(r) + check("netmask", None, "foo", None) + + utils.diff_compare(net.get_xml(), outfile) + utils.test_create(conn, net.get_xml(), "networkDefineXML") + + +def testNetOpen(): + conn = utils.URIs.open_testdefault_cached() + basename = "network-open" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + net = virtinst.Network(conn, parsexml=open(infile).read()) + + check = _make_checker(net) + check("name", "open", "new-foo") + check("domain_name", "open", "newdom") + + check = _make_checker(net.forward) + check("mode", "open") + check("dev", None) + + assert len(net.ips) == 1 + check = _make_checker(net.ips[0]) + check("address", "192.168.100.1", "192.168.101.1") + check("netmask", "255.255.255.0", "255.255.254.0") + + check = _make_checker(net.ips[0].ranges[0]) + check("start", "192.168.100.128", "192.168.101.128") + check("end", "192.168.100.254", "192.168.101.254") + + utils.diff_compare(net.get_xml(), outfile) + utils.test_create(conn, net.get_xml(), "networkDefineXML") + + +def testNetVfPool(): + conn = utils.URIs.open_testdefault_cached() + basename = "network-vf-pool" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + net = virtinst.Network(conn, parsexml=open(infile).read()) + + check = _make_checker(net) + check("name", "passthrough", "new-foo") + + check = _make_checker(net.forward) + check("mode", "hostdev") + check("managed", "yes") + + check = _make_checker(net.forward.pf[0]) + check("dev", "eth3") + + utils.diff_compare(net.get_xml(), outfile) + utils.test_create(conn, net.get_xml(), "networkDefineXML") + + +############## +# Misc tests # +############## + + +def testCPUUnknownClear(): + # Make sure .clear() even removes XML elements we don't know about + kvmconn = utils.URIs.open_kvm() + basename = "clear-cpu-unknown-vals" + infile = DATADIR + "%s-in.xml" % basename + outfile = DATADIR + "%s-out.xml" % basename + guest = virtinst.Guest(kvmconn, parsexml=open(infile).read()) + + guest.cpu.copy_host_cpu(guest) + guest.cpu.clear() + utils.diff_compare(guest.get_xml(), outfile) + + +def testDomainRoundtrip(): + conn = utils.URIs.open_testdefault_cached() + # Make sure our XML engine doesn't mangle non-libvirt XML bits + infile = DATADIR + "domain-roundtrip.xml" + outfile = DATADIR + "domain-roundtrip.xml" + guest = virtinst.Guest(conn, parsexml=open(infile).read()) + + utils.diff_compare(guest.get_xml(), outfile) + + +def testYesNoUnexpectedParse(): + conn = utils.URIs.open_testdefault_cached() + # Make sure that if we see an unexpected yes/no or on/off value, + # we just return it to the user and don't error. Libvirt could + # change our assumptions and we shouldn't be too restrictive + xml = ("\n \n" + "
\n") + dev = virtinst.DeviceHostdev(conn, parsexml=xml) + + assert dev.managed == "foo" + assert dev.rom_bar == "wibble" + assert dev.scsi_bus == "hello" + + dev.managed = "test1" + dev.rom_bar = "test2" + assert dev.managed == "test1" + assert dev.rom_bar == "test2" + + with pytest.raises(ValueError): + dev.scsi_bus = "goodbye" + + +def testXMLBuilderCoverage(): + """ + Test XMLBuilder corner cases + """ + conn = utils.URIs.open_testdefault_cached() + + with pytest.raises(RuntimeError, match=".*'foo'.*"): + # Ensure we validate root element + virtinst.DeviceDisk(conn, parsexml="") + + with pytest.raises(Exception, match=".*xmlParseDoc.*"): + # Ensure we validate root element + virtinst.DeviceDisk(conn, parsexml=-1) + + with pytest.raises(virtinst.xmlutil.DevError): + raise virtinst.xmlutil.DevError("for coverage") + + with pytest.raises(ValueError): + virtinst.DeviceDisk.validate_generic_name("objtype", None) + + with pytest.raises(ValueError): + virtinst.DeviceDisk.validate_generic_name("objtype", "foo bar") + + # Test property __repr__ for code coverage + assert "DeviceAddress" in str(virtinst.DeviceDisk.address) + assert "./driver/@cache" in str(virtinst.DeviceDisk.driver_cache) + + # Conversion of 0x value into int + xml = """ + +
+ + """ + dev = virtinst.DeviceController(conn, parsexml=xml) + assert dev.address.slot == 4 + + # Some XML formatting and get_xml_* corner cases + conn = utils.URIs.openconn(utils.URIs.test_suite) + xml = conn.lookupByName("test-for-virtxml").XMLDesc(0) + guest = virtinst.Guest(conn, parsexml=xml) + + assert guest.features.get_xml().startswith("