mirror of
https://github.com/virt-manager/virt-manager.git
synced 2025-01-21 18:03:58 +03:00
fa322588b4
Kill usage of the TestCase class, move more to pytest standard style Signed-off-by: Cole Robinson <crobinso@redhat.com>
1137 lines
36 KiB
Python
1137 lines
36 KiB
Python
# Copyright (C) 2013, 2014 Red Hat, Inc.
|
|
#
|
|
# This work is licensed under the GNU GPLv2 or later.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
import pytest
|
|
|
|
import virtinst
|
|
|
|
from tests import utils
|
|
|
|
|
|
# Directory prefix for the XML fixture files read and compared by these tests
DATADIR = utils.DATADIR + "/xmlparse/"
|
|
|
|
|
|
####################
|
|
# Helper functions #
|
|
####################
|
|
|
|
def _sanitize_file_xml(xml):
|
|
# s/"/'/g from generated XML, matches what libxml dumps out
|
|
# This won't work all the time, but should be good enough for testing
|
|
return xml.replace("'", "\"")
|
|
|
|
|
|
def _alter_compare(conn, actualXML, outfile):
    """
    Diff ``actualXML`` against the expected content in ``outfile``, then
    pass the same XML to utils.test_create() on ``conn``
    """
    utils.diff_compare(actualXML, outfile)
    utils.test_create(conn, actualXML)
|
|
|
|
|
|
def _set_and_check(obj, param, initval, *args):
    """
    Verify a property's starting value, then round-trip new values.

    The property named by ``param`` on ``obj`` must initially equal
    ``initval``. Each value in ``args`` is then assigned in turn and read
    back, asserting that the value returned matches the value just set.
    """
    read_prop = virtinst.xmlutil.get_prop_path
    write_prop = virtinst.xmlutil.set_prop_path

    assert initval == read_prop(obj, param)

    for newval in args:
        write_prop(obj, param, newval)
        assert newval == read_prop(obj, param)
|
|
|
|
|
|
def _make_checker(obj):
    """
    Return a callable check(name, initval, *args) that runs
    _set_and_check() against ``obj``
    """
    return lambda name, initval, *args: _set_and_check(
        obj, name, initval, *args)
|
|
|
|
|
|
def _gen_outfile_path(basename):
    """
    Build the path of the "<basename>-out.xml" file under DATADIR, which
    holds the expected XML output
    """
    filename = "%s-out.xml" % basename
    return DATADIR + filename
|
|
|
|
|
|
def _get_test_content(conn, basename):
    """
    Parse the "<basename>-in.xml" fixture under DATADIR into a Guest.

    Returns a (guest, outfile) tuple, where outfile is the path of the
    matching "<basename>-out.xml" expected-output file.
    """
    infile = DATADIR + "%s-in.xml" % basename
    outfile = _gen_outfile_path(basename)
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    guest = virtinst.Guest(conn, parsexml=inxml)
    return guest, outfile
|
|
|
|
|
|
##############
|
|
# Test cases #
|
|
##############
|
|
|
|
def testAlterGuest():
    """
    Test changing Guest() parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "change-guest")

    check = _make_checker(guest)

    # Check specific vcpu_current behavior: after vcpu_current is set to 15,
    # lowering vcpus to 12 is expected to bring vcpu_current down to 12
    check("vcpus", 5, 10)
    assert guest.vcpu_current is None
    check("vcpu_current", None, 15)
    guest.vcpus = 12
    assert guest.vcpu_current == 12
    guest.vcpu_current = 10

    # Top level guest properties
    check("name", "TestGuest", "change_name")
    check("id", None, 1234)
    check("description", None, "Hey desc changed&")
    check("title", None, "Hey title changed!")
    check("vcpu_cpuset", "1-3", "1-8,^6", "1-5,15")
    check("memory", 409600, 512000)
    check("currentMemory", 204800, 1024000)
    # memory is expected to read back as 1024000 once currentMemory
    # has been raised above the previous memory value
    check("memory", 1024000, 2048000)
    check("uuid", "12345678-1234-1234-1234-123456789012",
                  "11111111-2222-3333-4444-555555555555")
    check("emulator", "/usr/lib/xen/bin/qemu-dm", "/usr/binnnn/fooemu")
    check("type", "kvm", "test")
    check("bootloader", None, "pygrub")
    check("on_poweroff", "destroy", "restart")
    check("on_reboot", "restart", "destroy")
    check("on_crash", "destroy", "restart")
    check("on_lockfailure", "poweroff", "restart")

    # libosinfo metadata, and how set_os_name() is reflected in it
    check = _make_checker(guest._metadata.libosinfo)  # pylint: disable=protected-access
    check("os_id", "http://fedoraproject.org/fedora/17")
    guest.set_os_name("fedora10")
    check("os_id", "http://fedoraproject.org/fedora/10")
    assert guest.osinfo.name == "fedora10"
    guest.set_os_name("generic")
    check("os_id", None, "frib")
    assert guest.osinfo.name == "generic"

    # Clock and timers: remove the first timer, alter the one that
    # becomes timers[0], then add a brand new timer
    check = _make_checker(guest.clock)
    check("offset", "utc", "localtime")
    guest.clock.remove_child(guest.clock.timers[0])
    check = _make_checker(guest.clock.timers[0])
    check("name", "pit", "rtc")
    check("tickpolicy", "delay", "merge")
    timer = guest.clock.timers.add_new()
    check = _make_checker(timer)
    check("name", None, "hpet")
    check("present", None, False)

    # Power management
    check = _make_checker(guest.pm)
    check("suspend_to_mem", False, True)
    check("suspend_to_disk", None, False)

    # OS / boot properties
    check = _make_checker(guest.os)
    check("os_type", "hvm", "xen")
    check("arch", "i686", None)
    check("machine", "foobar", "pc-0.11")
    check("loader", None, "/foo/loader")
    check("init", None, "/sbin/init")
    check("bootorder", ["hd"], ["fd"])
    check("enable_bootmenu", None, False)
    check("useserial", None, True)
    check("kernel", None)
    check("initrd", None)
    check("kernel_args", None)

    # The quoted section must stay a single init argument
    guest.os.set_initargs_string("foo 'bar baz' frib")
    assert [i.val for i in guest.os.initargs] == ["foo", "bar baz", "frib"]

    # Feature flags
    check = _make_checker(guest.features)
    check("acpi", True, False)
    check("apic", True, True)
    check("eoi", None, True)
    check("pae", False, False)
    check("viridian", False, True)
    check("hap", False, False)
    check("privnet", False, False)
    check("hyperv_relaxed", None, True)
    check("hyperv_vapic", False, None)
    check("hyperv_spinlocks", True, True)
    check("hyperv_spinlocks_retries", 12287, 54321)
    check("vmport", False, True)
    check("vmcoreinfo", None, True)
    check("kvm_hidden", None, True)
    check("pvspinlock", None, True)
    check("gic_version", None, False)

    # CPU model, vendor and topology
    check = _make_checker(guest.cpu)
    check("match", "exact", "strict")
    guest.cpu.set_model(guest, "qemu64")
    check("model", "qemu64")
    check("vendor", "Intel", "qemuvendor")
    check("topology.threads", 2, 1)
    check("topology.cores", 5, 3)
    # Assign a float and make sure the value reads back equal to 4
    guest.cpu.topology.sockets = 4.0
    check("topology.sockets", 4)

    # CPU features: alter one, remove one, add one
    check = _make_checker(guest.cpu.features[0])
    check("name", "x2apic")
    check("policy", "force", "disable")
    rmfeat = guest.cpu.features[3]
    guest.cpu.remove_child(rmfeat)
    # The removed feature object must still be able to report its own XML
    assert rmfeat.get_xml() == """<feature name="foo" policy="bar"/>\n"""
    guest.cpu.add_feature("addfeature")

    # NUMA tuning
    check = _make_checker(guest.numatune)
    check("memory_mode", "interleave", "strict", None)
    check("memory_nodeset", "1-5,^3,7", "2,4,6")

    # Memory tuning
    check = _make_checker(guest.memtune)
    check("hard_limit", None, 1024, 2048)
    check("soft_limit", None, 100, 200)
    check("swap_hard_limit", None, 300, 400)
    check("min_guarantee", None, 400, 500)

    # Block I/O tuning, including a newly added device entry
    check = _make_checker(guest.blkiotune)
    check("weight", None, 100, 200)
    check = _make_checker(guest.blkiotune.devices.add_new())
    check("weight", None, 300)
    check("path", None, "/home/1.img")

    # uid/gid id mapping
    check = _make_checker(guest.idmap)
    check("uid_start", None, 0)
    check("uid_target", None, 1000)
    check("uid_count", None, 10)
    check("gid_start", None, 0)
    check("gid_target", None, 1000)
    check("gid_count", None, 10)

    # Resource partition
    check = _make_checker(guest.resource)
    check("partition", None, "/virtualmachines/production")

    # Memory balloon device
    check = _make_checker(guest.devices.memballoon[0])
    check("model", "virtio", "none")

    # Memory backing, including a newly added page entry
    check = _make_checker(guest.memoryBacking)
    check("hugepages", False, True)
    check("nosharepages", False, True)
    check("locked", False, True)

    page = guest.memoryBacking.pages.add_new()
    check = _make_checker(page)
    check("size", None, 1)
    check("unit", None, "G")

    assert guest.is_full_os_container() is False
    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testAlterCpuMode():
    """
    Test changing the CPU mode/model and the handling of the CPU
    'secure' flag by check_security_features()
    """
    # Close the input file deterministically rather than leaking the handle
    with open(DATADIR + "change-cpumode-in.xml") as xmlfile:
        xml = xmlfile.read()
    outfile = DATADIR + "change-cpumode-out.xml"
    # The previous version first assigned the default test connection to
    # 'conn' and immediately overwrote it; only this connection is used
    conn = utils.URIs.openconn(utils.URIs.kvm_q35)
    guest = virtinst.Guest(conn, xml)
    check = _make_checker(guest.cpu)

    guest.cpu.model = "foo"
    check("mode", "host-passthrough")
    guest.cpu.check_security_features(guest)
    check("secure", False)
    guest.cpu.set_special_mode(guest, "host-model")
    check("mode", "host-model")
    guest.cpu.check_security_features(guest)
    check("secure", False)
    guest.cpu.set_model(guest, "qemu64")
    check("model", "qemu64")
    guest.cpu.check_security_features(guest)
    check("secure", False)

    # Test actually filling in security values, and removing them
    guest.cpu.secure = True
    guest.cpu.set_model(guest, "Skylake-Client-IBRS")
    guest.cpu.check_security_features(guest)
    check("secure", True)
    guest.cpu.set_model(guest, "EPYC-IBPB")
    guest.cpu.check_security_features(guest)
    check("secure", True)
    guest.cpu.secure = False
    guest.cpu.set_model(guest, "Skylake-Client-IBRS")
    guest.cpu.check_security_features(guest)
    check("secure", False)
    _alter_compare(conn, guest.get_xml(), outfile)

    # Hits a codepath when domcaps don't provide the needed info
    emptyconn = utils.URIs.open_testdefault_cached()
    guest = virtinst.Guest(emptyconn, xml)
    guest.cpu.check_security_features(guest)
    assert guest.cpu.secure is False
|
|
|
|
|
|
def testAlterDisk():
    """
    Test changing DeviceDisk() parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "change-disk")

    def _get_disk(target):
        # Look up a guest disk by its target name. Falls through and
        # implicitly returns None when no disk matches.
        for disk in guest.devices.disk:
            if disk.target == target:
                return disk

    disk = _get_disk("hda")
    check = _make_checker(disk)
    check("path", "/tmp/test.img", "/dev/foo/null")
    disk.sync_path_props()
    check("driver_name", None, "test")
    check("driver_type", None, "raw")
    check("serial", "WD-WMAP9A966149", "frob")
    check("wwn", None, "123456789abcdefa")
    check("bus", "ide", "usb")
    check("removable", None, False, True)

    # Second seclabel of the second disk in the list
    disk = guest.devices.disk[1]
    check = _make_checker(disk.seclabels[1])
    check("model", "dac")
    check("relabel", None, True)
    check("label", None, "foo-my-label")

    disk = _get_disk("hdc")
    check = _make_checker(disk)
    check("type", "block", "dir", "file", "block")
    check("path", "/dev/null", None)
    disk.sync_path_props()
    check("device", "cdrom", "floppy")
    check("read_only", True, False)
    check("target", "hdc", "fde")
    check("bus", "ide", "fdc")
    check("error_policy", "stop", None)

    disk = _get_disk("hdd")
    check = _make_checker(disk)
    check("type", "block")
    check("device", "lun")
    check("sgio", None, "unfiltered")
    check("rawio", None, "yes")

    # Assign a network URL with an IPv6 host as the disk path
    disk = _get_disk("sda")
    check = _make_checker(disk)
    check("path", None, "http://[1:2:3:4:5:6:7:8]:1122/my/file")
    disk.sync_path_props()

    disk = _get_disk("fda")
    check = _make_checker(disk)
    check("path", None, "/dev/default-pool/default-vol")
    disk.sync_path_props()
    check("startup_policy", None, "optional")
    check("shareable", False, True)
    check("driver_cache", None, "writeback")
    check("driver_io", None, "threads")
    check("driver_io", "threads", "native")
    check("driver_discard", None, "unmap")
    check("driver_detect_zeroes", None, "unmap")
    check("iotune_ris", 1, 0)
    check("iotune_rbs", 2, 0)
    check("iotune_wis", 3, 0)
    check("iotune_wbs", 4, 0)
    check("iotune_tis", None, 5)
    check("iotune_tbs", None, 6)
    check = _make_checker(disk.boot)
    check("order", None, 7, None)

    # Disk backed by a storage pool volume
    disk = _get_disk("vdb")
    check = _make_checker(disk)
    check("source_pool", "defaultPool", "anotherPool")
    check("source_volume", "foobar", "newvol")

    # Network disk: after the source_* changes, path is expected to
    # read back as the matching gluster URL
    disk = _get_disk("vdc")
    check = _make_checker(disk)
    check("source_protocol", "rbd", "gluster")
    check("source_name", "pool/image", "new-val/vol")
    check("source_host_name", "mon1.example.org", "diff.example.org")
    check("source_host_port", 6321, 1234)
    check("path", "gluster://diff.example.org:1234/new-val/vol")

    # nbd disk over a unix socket, values are only read back
    disk = _get_disk("vdd")
    check = _make_checker(disk)
    check("source_protocol", "nbd")
    check("source_host_transport", "unix")
    check("source_host_socket", "/var/run/nbdsock")
    check("path", "nbd+unix:///var/run/nbdsock")

    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testAlterDevicesBootorder():
    """
    Test Guest.reorder_boot_order() against devices carrying per-device
    boot order values
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "change-devices-bootorder"
    guest, outfile = _get_test_content(conn, basename)
    disk_1 = guest.devices.disk[0]
    disk_2 = guest.devices.disk[1]
    disk_3 = guest.devices.disk[2]
    disk_4 = guest.devices.disk[3]
    iface_1 = guest.devices.interface[0]
    iface_2 = guest.devices.interface[1]
    redirdev_1 = guest.devices.redirdev[0]

    # Starting state as parsed; disk_2 and disk_3 share boot order 10
    assert guest.os.bootorder == ['hd']
    assert disk_1.boot.order is None
    assert disk_2.boot.order == 10
    assert disk_3.boot.order == 10
    assert disk_4.boot.order == 1
    assert iface_1.boot.order == 2
    assert iface_2.boot.order is None
    assert redirdev_1.boot.order == 3

    guest.reorder_boot_order(disk_1, 1)

    # disk_1 takes order 1, the devices that had orders 1-3 shift up by
    # one, and the legacy os.bootorder list is emptied
    assert guest.os.bootorder == []
    assert disk_1.boot.order == 1
    assert disk_2.boot.order == 10
    assert disk_3.boot.order == 10
    # verify that the used algorithm preserves the order of
    # records with equal boot indices
    assert disk_2 is guest.devices.disk[1]
    assert disk_3 is guest.devices.disk[2]
    assert disk_4.boot.order == 2
    assert iface_1.boot.order == 3
    assert iface_2.boot.order is None
    assert redirdev_1.boot.order == 4

    # NOTE(review): a RuntimeError here must mention "unsupported
    # configuration", but the block also passes when no exception is
    # raised at all -- confirm that is intended
    try:
        _alter_compare(conn, guest.get_xml(), outfile)
    except RuntimeError as error:
        assert "unsupported configuration" in str(error)

    # Reordering disk_2 to 10 is expected to bump disk_3 to 11, leaving
    # no two devices with the same boot order
    guest.reorder_boot_order(disk_2, 10)
    assert disk_2.boot.order == 10
    assert disk_3.boot.order == 11
    assert disk_2 is guest.devices.disk[1]
    assert disk_3 is guest.devices.disk[2]

    outfile = _gen_outfile_path("change-devices-bootorder-fixed")
    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testSingleDisk():
    """
    Test parsing and altering a standalone <disk> XML snippet
    """
    # The connection used to be fetched twice in a row; once is enough
    conn = utils.URIs.open_testdefault_cached()
    xml = ("""<disk type="file" device="disk"><source file="/a.img"/>\n"""
           """<target dev="hda" bus="ide"/></disk>\n""")
    d = virtinst.DeviceDisk(conn, parsexml=xml)
    _set_and_check(d, "target", "hda", "hdb")
    # Only the target name should differ from the input XML
    assert xml.replace("hda", "hdb") == d.get_xml()
|
|
|
|
|
|
def testAlterChars():
    """
    Test changing serial, parallel, console and channel device
    parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "change-chars")

    serial1 = guest.devices.serial[0]
    serial2 = guest.devices.serial[1]
    parallel1 = guest.devices.parallel[0]
    parallel2 = guest.devices.parallel[1]
    console1 = guest.devices.console[0]
    console2 = guest.devices.console[1]
    channel1 = guest.devices.channel[0]
    channel2 = guest.devices.channel[1]
    channel3 = guest.devices.channel[2]

    # Switch a null serial device to udp and fill in the source values
    check = _make_checker(serial1)
    check("type", "null", "udp")
    check("source.bind_host", None, "example.com")
    check("source.bind_service", None, 66)
    check("source.connect_host", None, "example.com.uk")
    check("source.connect_service", None, 77)

    check = _make_checker(serial2)
    check("type", "tcp")
    check("source.protocol", "telnet", "raw")
    check("source.mode", "bind", "connect")

    # Clear the source path, then switch the type from unix to pty
    check = _make_checker(parallel1)
    check("source.mode", "bind")
    check("source.path", "/tmp/foobar", None)
    check("type", "unix", "pty")

    check = _make_checker(parallel2)
    check("type", "udp")
    check("source.bind_service", 1111, 1357)
    check("source.bind_host", "my.bind.host", "my.foo.host")
    check("source.connect_service", 2222, 7777)
    check("source.connect_host", "my.source.host", "source.foo.host")

    check = _make_checker(console1)
    check("type", "pty")
    check("target_type", None)

    # Clear the source path and then set a new one
    check = _make_checker(console2)
    check("type", "file")
    check("source.path", "/tmp/foo.img", None)
    check("source.path", None, "/root/foo")
    check("target_type", "virtio")
    check("target_state", None, "connected")

    check = _make_checker(channel1)
    check("type", "pty")
    check("target_type", "virtio", "bar", "virtio")
    check("target_name", "foo.bar.frob", "test.changed")

    check = _make_checker(channel2)
    check("type", "unix", "foo", "unix")
    check("target_type", "guestfwd")
    check("target_address", "1.2.3.4", "5.6.7.8")
    check("target_port", 4567, 1199)

    check = _make_checker(channel3)
    check("type", "spiceport")
    check("source.channel", "org.spice-space.webdav.0", "test.1")
    check("target_type", "virtio")
    check("target_name", "org.spice-space.webdav.0", "test.2")
    # For the third channel the XML id uses index 3 while
    # get_xml_idx() reports 2
    assert channel3.get_xml_id() == "./devices/channel[3]"
    assert channel3.get_xml_idx() == 2

    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testAlterNics():
    """
    Test changing interface device parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "change-nics")

    dev1 = guest.devices.interface[0]
    dev2 = guest.devices.interface[1]
    dev3 = guest.devices.interface[2]
    dev4 = guest.devices.interface[3]
    dev5 = guest.devices.interface[4]

    check = _make_checker(dev1)
    check("type", "user")
    check("model", None, "vmxnet3")
    check("source", None, None,)
    check("macaddr", "22:11:11:11:11:11", "AA:AA:AA:AA:AA:AA")
    check("filterref", None, "foo")

    # Clear the source, switch the type from network to bridge, then
    # set a new source
    check = _make_checker(dev2)
    check("source", "default", None)
    check("type", "network", "bridge")
    check("source", None, "newbr0")
    check("model", "e1000", "virtio")

    check = _make_checker(dev3)
    check("type", "bridge")
    check("source", "foobr0", "newfoo0")
    check("macaddr", "22:22:22:22:22:22")
    check("target_dev", None, "test1")

    # Change target_dev and then clear it
    check = _make_checker(dev4)
    check("type", "ethernet")
    check("target_dev", "nic02", "nic03")
    check("target_dev", "nic03", None)

    check = _make_checker(dev5)
    check("type", "direct")
    check("source", "eth0.1")
    check("source_mode", "vepa", "bridge")
    check("portgroup", None, "sales")
    check("driver_name", None, "vhost")
    check("driver_queues", None, 5)

    # virtualport child object of the last interface
    virtualport = dev5.virtualport
    check = _make_checker(virtualport)
    check("type", "802.1Qbg", "foo", "802.1Qbg")
    check("managerid", 12, 11)
    check("typeid", 1193046, 1193047)
    check("typeidversion", 1, 2)
    check("instanceid", "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa3b",
                        "09b11c53-8b5c-4eeb-8f00-d84eaa0aaa4f")

    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testQEMUXMLNS():
    """
    Test altering, adding and removing qemu XML namespace args and envs
    """
    kvmconn = utils.URIs.open_kvm()
    basename = "change-xmlns-qemu"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    guest = virtinst.Guest(kvmconn, parsexml=inxml)

    check = _make_checker(guest.xmlns_qemu.args[0])
    check("value", "-somearg", "-somenewarg")
    check = _make_checker(guest.xmlns_qemu.args[1])
    check("value", "bar,baz=wib wob", "testval")
    guest.xmlns_qemu.args.add_new().value = "additional-arg"
    # Remove the first arg; the detached object must still be able to
    # report its own XML, including the namespace declaration
    arg0 = guest.xmlns_qemu.args[0]
    guest.xmlns_qemu.remove_child(guest.xmlns_qemu.args[0])
    x = "<qemu:arg xmlns:qemu=\"http://libvirt.org/schemas/domain/qemu/1.0\" value=\"-somenewarg\"/>\n"
    assert arg0.get_xml() == x

    check = _make_checker(guest.xmlns_qemu.envs[0])
    check("name", "SOMEENV")
    check("value", "foo=bar baz,foo")
    env = guest.xmlns_qemu.envs.add_new()
    env.name = "DISPLAY"
    env.value = "1:2"

    _alter_compare(kvmconn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testAddRemoveDevices():
    """
    Test removing an existing device and adding several new ones
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "add-devices")

    # Drop a device that came from the parsed XML
    existing_disk = guest.devices.disk[2]
    guest.remove_device(existing_disk)

    # Add a freshly built device with its defaults filled in
    watchdog = virtinst.DeviceWatchdog(conn)
    watchdog.set_defaults(guest)
    guest.add_device(watchdog)

    # Build a device that carries a child property (address value)
    nic = virtinst.DeviceInterface(conn)
    nic.type = "network"
    nic.source = "default"
    nic.macaddr = "1A:2A:3A:4A:5A:6A"
    nic.address.type = "spapr-vio"
    nic.set_defaults(guest)

    # Add, remove and re-add the very same device object
    guest.add_device(nic)
    guest.remove_device(nic)
    guest.add_device(nic)

    # Add a device constructed from an XML snippet
    sound = virtinst.DeviceSound(conn, parsexml="""<sound model='pcspk'/>""")
    guest.add_device(sound)

    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testChangeKVMMedia():
    """
    Test assigning paths to the first five disks of a KVM guest that
    start out without a path
    """
    kvmconn = utils.URIs.open_kvm()
    guest, outfile = _get_test_content(kvmconn, "change-media")

    default_vol = "/dev/default-pool/default-vol"
    disk_vol = "/dev/disk-pool/diskvol1"

    # One entry per disk index: the (initial path, new path) checks to
    # run, in order, before syncing that disk's path properties
    path_steps = [
        [(None, default_vol)],
        [(None, default_vol), (default_vol, disk_vol)],
        [(None, disk_vol)],
        [(None, default_vol)],
        [(None, disk_vol)],
    ]

    for idx, steps in enumerate(path_steps):
        disk = guest.devices.disk[idx]
        check = _make_checker(disk)
        for initial, updated in steps:
            check("path", initial, updated)
        disk.sync_path_props()

    _alter_compare(kvmconn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testGuestBootorder():
    """
    Test Guest get_boot_order()/set_boot_order() in both the legacy
    and the per-device form
    """
    kvmconn = utils.URIs.open_kvm()
    guest, outfile = _get_test_content(kvmconn, "bootorder")

    # Starting state as parsed from the input XML
    assert guest.get_boot_order() == ['./devices/disk[1]']
    assert guest.get_boot_order(legacy=True) == ['hd']

    legacy_names = ['hd', 'fd', 'cdrom', 'network']
    device_ids = [
        './devices/disk[1]',
        './devices/disk[3]',
        './devices/disk[2]',
        './devices/interface[1]',
    ]

    # A legacy order must be readable back in both forms
    guest.set_boot_order(legacy_names, legacy=True)
    assert guest.get_boot_order() == device_ids
    assert guest.get_boot_order(legacy=True) == legacy_names

    # A per-device order leaves the legacy list empty
    guest.set_boot_order(device_ids)
    assert guest.get_boot_order() == device_ids
    assert guest.get_boot_order(legacy=True) == []

    _alter_compare(kvmconn, guest.get_xml(), outfile)
|
|
|
|
|
|
def testDiskChangeBus():
    """
    Test DeviceDisk.change_bus() on disks of a parsed guest
    """
    conn = utils.URIs.open_testdefault_cached()
    guest, outfile = _get_test_content(conn, "disk-change-bus")

    first_disk = guest.devices.disk[0]

    # Changing to the bus the disk already uses must leave its XML alone
    xml_before = first_disk.get_xml()
    first_disk.change_bus(guest, "virtio")
    assert xml_before == first_disk.get_xml()
    first_disk.change_bus(guest, "ide")

    third_disk = guest.devices.disk[2]
    third_disk.change_bus(guest, "scsi")

    _alter_compare(conn, guest.get_xml(), outfile)
|
|
|
|
|
|
##################
|
|
# Snapshot tests #
|
|
##################
|
|
|
|
|
|
def testChangeSnapshot():
    """
    Test changing DomainSnapshot() parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "change-snapshot"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    snap = virtinst.DomainSnapshot(conn, parsexml=inxml)

    check = _make_checker(snap)
    check("name", "offline-root-child1", "name-foo")
    check("state", "shutoff", "somestate")
    check("description", "offline desk", "foo\nnewline\n indent")
    check("parent", "offline-root", "newparent")
    check("creationTime", 1375905916, 1234)
    check("memory_type", "no", "internal")

    check = _make_checker(snap.disks[0])
    check("name", "hda", "hdb")
    check("snapshot", "internal", "no")

    utils.diff_compare(snap.get_xml(), outfile)
|
|
|
|
|
|
#################
|
|
# Storage tests #
|
|
#################
|
|
|
|
|
|
def testFSPool():
    """
    Test changing parameters of a parsed 'fs' StoragePool()
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "pool-fs"
    infile = DATADIR + "%s.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    pool = virtinst.StoragePool(conn, parsexml=inxml)

    check = _make_checker(pool)
    check("type", "fs", "dir")
    check("name", "pool-fs", "foo-new")
    check("uuid", "10211510-2115-1021-1510-211510211510",
                  "10211510-2115-1021-1510-211510211999")
    check("capacity", 984373075968, 200000)
    check("allocation", 756681687040, 150000)
    check("available", 227691388928, 50000)

    check("format", "auto", "ext3")
    check("source_path", "/some/source/path", "/dev/foo/bar")
    check("target_path", "/some/target/path", "/mnt/my/foo")
    check("source_name", None, "fooname")

    utils.diff_compare(pool.get_xml(), outfile)
    utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML")
|
|
|
|
|
|
def testISCSIPool():
    """
    Test changing parameters of a parsed iscsi StoragePool()
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "pool-iscsi"
    # Input comes from the shared storage fixture directory, while the
    # expected output lives under DATADIR
    infile = utils.DATADIR + "/storage/%s.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    pool = virtinst.StoragePool(conn, parsexml=inxml)

    check = _make_checker(pool)
    check("iqn", "foo.bar.baz.iqn", "my.iqn")
    check = _make_checker(pool.hosts[0])
    check("name", "some.random.hostname", "my.host")

    utils.diff_compare(pool.get_xml(), outfile)
    utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML")
|
|
|
|
|
|
def testGlusterPool():
    """
    Test changing parameters of a parsed gluster StoragePool()
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "pool-gluster"
    # Input comes from the shared storage fixture directory, while the
    # expected output lives under DATADIR
    infile = utils.DATADIR + "/storage/%s.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    pool = virtinst.StoragePool(conn, parsexml=inxml)

    check = _make_checker(pool)
    check("source_path", "/some/source/path", "/foo")
    check = _make_checker(pool.hosts[0])
    check("name", "some.random.hostname", "my.host")

    utils.diff_compare(pool.get_xml(), outfile)
    utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML")
|
|
|
|
|
|
def testRBDPool():
    """
    Test changing and adding host entries of a parsed rbd StoragePool()
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "pool-rbd"
    infile = DATADIR + "%s.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    pool = virtinst.StoragePool(conn, parsexml=inxml)

    check = _make_checker(pool.hosts[0])
    check("name", "ceph-mon-1.example.com")
    check("port", 6789, 1234)
    check = _make_checker(pool.hosts[1])
    check("name", "ceph-mon-2.example.com", "foo.bar")
    check("port", 6789)
    check = _make_checker(pool.hosts[2])
    check("name", "ceph-mon-3.example.com")
    check("port", 6789, 1000)
    # Append a brand new host entry
    hostobj = pool.hosts.add_new()
    hostobj.name = "frobber"
    hostobj.port = "5555"

    utils.diff_compare(pool.get_xml(), outfile)
    utils.test_create(conn, pool.get_xml(), "storagePoolDefineXML")
|
|
|
|
|
|
def testVol():
    """
    Test changing StorageVolume() parameters after parsing
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "pool-dir-vol"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    vol = virtinst.StorageVolume(conn, parsexml=inxml)

    check = _make_checker(vol)
    check("type", None, "file")
    check("key", None, "fookey")
    check("capacity", 10737418240, 2000)
    check("allocation", 5368709120, 1000)
    check("format", "raw", "qcow2")
    check("target_path", None, "/foo/bar")
    check("backing_store", "/foo/bar/baz", "/my/backing")
    check("lazy_refcounts", False, True)

    check = _make_checker(vol.permissions)
    check("mode", "0700", "0744")
    check("owner", "10736", "10000")
    check("group", "10736", "10000")
    check("label", None, "foo.label")

    utils.diff_compare(vol.get_xml(), outfile)
|
|
|
|
|
|
###################
|
|
# <network> tests #
|
|
###################
|
|
|
|
|
|
def testNetMulti():
    """
    Test changing parameters of a parsed Network() that has multiple
    ip and portgroup entries
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "network-multi"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    net = virtinst.Network(conn, parsexml=inxml)

    check = _make_checker(net)
    check("name", "ipv6_multirange", "new-foo")
    check("uuid", "41b4afe4-87bb-8087-6724-5e208a2d483a",
                  "41b4afe4-87bb-8087-6724-5e208a2d1111")
    check("bridge", "virbr3", "virbr3new")
    check("stp", True, False)
    check("delay", 0, 2)
    check("domain_name", "net7", "newdom")
    check("ipv6", None, True)
    check("macaddr", None, "52:54:00:69:eb:FF")
    check("virtualport_type", None, "openvswitch")

    assert len(net.portgroups) == 2
    check = _make_checker(net.portgroups[0])
    check("name", "engineering", "foo")
    check("default", True, False)

    assert len(net.ips) == 4
    check = _make_checker(net.ips[0])
    check("address", "192.168.7.1", "192.168.8.1")
    check("netmask", "255.255.255.0", "255.255.254.0")
    # can_pxe() is expected to flip to True once the tftp/bootp
    # values below have been set
    assert net.can_pxe() is False
    check("tftp", None, "/var/lib/tftproot")
    check("bootp_file", None, "pxeboot.img")
    check("bootp_server", None, "1.2.3.4")
    assert net.can_pxe() is True

    check = _make_checker(net.forward)
    check("mode", "nat", "route")
    check("dev", None, "eth22")
    assert net.can_pxe() is True

    check = _make_checker(net.ips[0].ranges[0])
    check("start", "192.168.7.128", "192.168.8.128")
    check("end", "192.168.7.254", "192.168.8.254")

    check = _make_checker(net.ips[0].hosts[1])
    check("macaddr", "52:54:00:69:eb:91", "52:54:00:69:eb:92")
    check("name", "badbob", "newname")
    check("ip", "192.168.7.3", "192.168.8.3")

    check = _make_checker(net.ips[1])
    check("family", "ipv6", "ipv6")
    check("prefix", 64, 63)

    # Add a new route entry, then set and clear its netmask
    r = net.routes.add_new()
    r.family = "ipv4"
    r.address = "192.168.8.0"
    r.prefix = "24"
    r.gateway = "192.168.8.10"
    check = _make_checker(r)
    check("netmask", None, "foo", None)

    utils.diff_compare(net.get_xml(), outfile)
    utils.test_create(conn, net.get_xml(), "networkDefineXML")
|
|
|
|
|
|
def testNetOpen():
    """
    Test changing parameters of a parsed Network() with forward
    mode 'open'
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "network-open"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    net = virtinst.Network(conn, parsexml=inxml)

    check = _make_checker(net)
    check("name", "open", "new-foo")
    check("domain_name", "open", "newdom")

    check = _make_checker(net.forward)
    check("mode", "open")
    check("dev", None)

    assert len(net.ips) == 1
    check = _make_checker(net.ips[0])
    check("address", "192.168.100.1", "192.168.101.1")
    check("netmask", "255.255.255.0", "255.255.254.0")

    check = _make_checker(net.ips[0].ranges[0])
    check("start", "192.168.100.128", "192.168.101.128")
    check("end", "192.168.100.254", "192.168.101.254")

    utils.diff_compare(net.get_xml(), outfile)
    utils.test_create(conn, net.get_xml(), "networkDefineXML")
|
|
|
|
|
|
def testNetVfPool():
    """
    Test a parsed Network() with forward mode 'hostdev' and a pf entry
    """
    conn = utils.URIs.open_testdefault_cached()
    basename = "network-vf-pool"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    net = virtinst.Network(conn, parsexml=inxml)

    check = _make_checker(net)
    check("name", "passthrough", "new-foo")

    check = _make_checker(net.forward)
    check("mode", "hostdev")
    check("managed", "yes")

    check = _make_checker(net.forward.pf[0])
    check("dev", "eth3")

    utils.diff_compare(net.get_xml(), outfile)
    utils.test_create(conn, net.get_xml(), "networkDefineXML")
|
|
|
|
|
|
##############
|
|
# Misc tests #
|
|
##############
|
|
|
|
|
|
def testCPUUnknownClear():
    """
    Test that clearing the guest CPU drops unrecognized XML as well
    """
    # Make sure .clear() even removes XML elements we don't know about
    kvmconn = utils.URIs.open_kvm()
    basename = "clear-cpu-unknown-vals"
    infile = DATADIR + "%s-in.xml" % basename
    outfile = DATADIR + "%s-out.xml" % basename
    # Close the input file deterministically rather than leaking the handle
    with open(infile) as xmlfile:
        inxml = xmlfile.read()
    guest = virtinst.Guest(kvmconn, parsexml=inxml)

    guest.cpu.copy_host_cpu(guest)
    guest.cpu.clear()
    utils.diff_compare(guest.get_xml(), outfile)
|
|
|
|
|
|
def testDomainRoundtrip():
    """
    Make sure our XML engine doesn't mangle non-libvirt XML bits
    """
    conn = utils.URIs.open_testdefault_cached()
    # The same file is both the parse input and the expected output
    infile = DATADIR + "domain-roundtrip.xml"
    outfile = DATADIR + "domain-roundtrip.xml"

    # Read through a context manager so the file handle is closed right
    # away, rather than whenever the garbage collector gets to it
    with open(infile) as fobj:
        inxml = fobj.read()
    guest = virtinst.Guest(conn, parsexml=inxml)

    utils.diff_compare(guest.get_xml(), outfile)
|
|
|
|
|
|
def testYesNoUnexpectedParse():
    """
    Make sure that if we see an unexpected yes/no or on/off value,
    we just return it to the user and don't error. Libvirt could
    change our assumptions and we shouldn't be too restrictive
    """
    testconn = utils.URIs.open_testdefault_cached()
    hostdev_xml = (
        "<hostdev managed='foo'>\n <rom bar='wibble'/>\n"
        " <source><address bus='hello'/></source>\n</hostdev>"
    )
    hostdev = virtinst.DeviceHostdev(testconn, parsexml=hostdev_xml)

    # The unexpected values from the XML are handed back unmodified
    parsed_values = (
        ("managed", "foo"),
        ("rom_bar", "wibble"),
        ("scsi_bus", "hello"),
    )
    for propname, expected in parsed_values:
        assert getattr(hostdev, propname) == expected

    # Arbitrary strings can be assigned and are read back as-is
    new_values = (
        ("managed", "test1"),
        ("rom_bar", "test2"),
    )
    for propname, newval in new_values:
        setattr(hostdev, propname, newval)
    for propname, newval in new_values:
        assert getattr(hostdev, propname) == newval

    # Assigning this string to scsi_bus is expected to raise ValueError
    with pytest.raises(ValueError):
        hostdev.scsi_bus = "goodbye"
|
|
|
|
|
|
def testXMLBuilderCoverage():
    """
    Test XMLBuilder corner cases
    """
    testconn = utils.URIs.open_testdefault_cached()

    # Ensure we validate the root element: parsing <foo/> as a disk must
    # fail with an error that mentions 'foo'
    with pytest.raises(RuntimeError, match=".*'foo'.*"):
        virtinst.DeviceDisk(testconn, parsexml="<foo/>")

    # Passing a non-string as parsexml must fail with an error that
    # mentions xmlParseDoc
    with pytest.raises(Exception, match=".*xmlParseDoc.*"):
        virtinst.DeviceDisk(testconn, parsexml=-1)

    # Raise DevError ourselves, purely for code coverage
    with pytest.raises(virtinst.xmlutil.DevError):
        raise virtinst.xmlutil.DevError("for coverage")

    # validate_generic_name must reject both None and "foo bar"
    for badname in (None, "foo bar"):
        with pytest.raises(ValueError):
            virtinst.DeviceDisk.validate_generic_name("objtype", badname)

    # Test property __repr__ for code coverage
    diskclass = virtinst.DeviceDisk
    assert "DeviceAddress" in str(diskclass.address)
    assert "./driver/@cache" in str(diskclass.driver_cache)

    # Conversion of 0x value into int
    controller_xml = (
        "\n"
        "<controller type='scsi'>\n"
        "<address type='pci' bus='0x00' slot='0x04' function='0x7'/>\n"
        "</controller>\n"
    )
    controller = virtinst.DeviceController(testconn, parsexml=controller_xml)
    assert controller.address.slot == 4

    # Some XML formatting and get_xml_* corner cases
    suiteconn = utils.URIs.openconn(utils.URIs.test_suite)
    domxml = suiteconn.lookupByName("test-for-virtxml").XMLDesc(0)
    guest = virtinst.Guest(suiteconn, parsexml=domxml)

    # Each child object serializes starting with its own element
    assert guest.features.get_xml().startswith("<features")
    assert guest.clock.get_xml().startswith("<clock")
    assert guest.seclabels[0].get_xml().startswith("<seclabel")
    cpu = guest.cpu
    assert cpu.get_xml().startswith("<cpu")
    assert guest.os.get_xml().startswith("<os")

    # XML id and index for a child object and for the guest itself
    assert cpu.get_xml_id() == "./cpu"
    assert cpu.get_xml_idx() == 0
    assert guest.get_xml_id() == "."
    assert guest.get_xml_idx() == 0

    # The second disk has list index 1 but XML id disk[2]
    second_disk = guest.devices.disk[1]
    assert second_disk.get_xml_id() == "./devices/disk[2]"
    assert second_disk.get_xml_idx() == 1
|
|
|
|
|
|
def testReplaceChildParse():
    """
    Test devices.replace_child, first on a guest built from scratch and
    then on a guest parsed from that guest's XML
    """
    conn = utils.URIs.open_testdefault_cached()
    buildfile = DATADIR + "replace-child-build.xml"
    parsefile = DATADIR + "replace-child-parse.xml"

    def _make_cdrom(target):
        # Build a new scsi cdrom disk device with the passed target
        cdrom = virtinst.DeviceDisk(conn)
        cdrom.device = "cdrom"
        cdrom.bus = "scsi"
        cdrom.target = target
        return cdrom

    # Build a guest with six disks, then swap out the third one
    built = virtinst.Guest(conn)
    for target in ("sda", "sdb", "sdc", "sdd", "sde", "sdf"):
        built.add_device(_make_cdrom(target))
    olddisk = built.devices.disk[2]
    built.devices.replace_child(olddisk, _make_cdrom("sdz"))
    built.set_defaults(built)
    utils.diff_compare(built.get_xml(), buildfile)

    # Reparse the built XML, then swap the fifth disk for one that was
    # itself created from parsed XML
    parsed = virtinst.Guest(conn, parsexml=built.get_xml())
    replacement_xml = _make_cdrom("sdw").get_xml()
    replacement = virtinst.DeviceDisk(conn, parsexml=replacement_xml)
    parsed.devices.replace_child(parsed.devices.disk[4], replacement)
    utils.diff_compare(parsed.get_xml(), parsefile)
|
|
|
|
|
|
def testGuestXMLDeviceMatch():
    """
    Test Guest.find_device and Device.compare_device
    """
    conn = utils.URIs.openconn(utils.URIs.test_suite)
    domxml = conn.lookupByName("test-for-virtxml").XMLDesc(0)
    # Two separate Guest objects parsed from the same domain XML
    guest = virtinst.Guest(conn, domxml)
    guest2 = virtinst.Guest(conn, domxml)

    # Assert id matching works
    diskdev = guest.devices.disk[0]
    assert guest.find_device(diskdev) == diskdev

    # Comparing devices of different types must return False
    ifacedev = guest.devices.interface[0]
    assert ifacedev.compare_device(diskdev, 0) is False

    # find_device should fail here
    nodev = virtinst.DeviceWatchdog(conn)
    assert guest.find_device(nodev) is None

    # Ensure parsed XML devices match correctly
    for srcdev in guest.devices.get_all():
        devxml = srcdev.get_xml()
        reparsed = srcdev.__class__(conn, devxml)
        found = guest.find_device(reparsed)
        if srcdev != found:
            raise AssertionError(
                "guest.find_device failed for dev=%s" % reparsed)

    # Ensure devices from another parsed XML doc compare correctly
    for srcdev in guest.devices.get_all():
        if guest2.find_device(srcdev):
            continue
        raise AssertionError(
            "guest.find_device failed for dev=%s" % srcdev)
|
|
|
|
|
|
def testControllerAttachedDevices():
    """
    Test DeviceController.get_attached_devices
    """
    conn = utils.URIs.open_testdefault_cached()

    # Read through a context manager so the file handle is closed right
    # away, rather than whenever the garbage collector gets to it
    with open(DATADIR + "controller-attached-devices.xml") as fobj:
        xml = fobj.read()
    guest = virtinst.Guest(conn, xml)

    # virtio-serial path
    controller = [c for c in guest.devices.controller if
                  c.type == "virtio-serial"][0]
    devs = controller.get_attached_devices(guest)
    assert len(devs) == 4
    assert devs[-1].DEVICE_TYPE == "console"

    # disk path
    controller = [c for c in guest.devices.controller if
                  c.type == "sata"][0]
    devs = controller.get_attached_devices(guest)
    assert len(devs) == 1
    assert devs[-1].device == "cdrom"

    # Little test for DeviceAddress.pretty_desc
    assert devs[-1].address.pretty_desc() == "0:0:0:3"
|