2014-02-06 15:12:12 +04:00
# Copyright (C) 2013, 2014 Red Hat, Inc.
2013-03-18 01:06:52 +04:00
#
2018-04-04 16:35:41 +03:00
# This work is licensed under the GNU GPLv2 or later.
2018-03-20 22:00:02 +03:00
# See the COPYING file in the top-level directory.
2013-03-18 01:06:52 +04:00
2020-09-18 21:56:36 +03:00
import pytest
2013-03-18 01:06:52 +04:00
import virtinst
2013-04-11 03:48:07 +04:00
from tests import utils
2013-03-18 01:06:52 +04:00
2013-04-13 22:34:52 +04:00
2020-08-30 15:54:41 +03:00
DATADIR = utils . DATADIR + " /xmlparse/ "
2020-01-27 02:11:43 +03:00
2020-09-18 23:26:28 +03:00
####################
# Helper functions #
####################
def _sanitize_file_xml ( xml ) :
2013-03-18 01:06:52 +04:00
# s/"/'/g from generated XML, matches what libxml dumps out
# This won't work all the time, but should be good enough for testing
return xml . replace ( " ' " , " \" " )
2013-04-13 22:34:52 +04:00
2020-09-18 23:26:28 +03:00
def _alter_compare ( conn , actualXML , outfile ) :
utils . diff_compare ( actualXML , outfile )
utils . test_create ( conn , actualXML )
def _set_and_check ( obj , param , initval , * args ) :
"""
Check expected initial value obj . param == initval , then
set newval , and make sure it is returned properly
"""
curval = virtinst . xmlutil . get_prop_path ( obj , param )
assert initval == curval
for newval in args :
virtinst . xmlutil . set_prop_path ( obj , param , newval )
2019-06-08 01:21:24 +03:00
curval = virtinst . xmlutil . get_prop_path ( obj , param )
2020-09-18 23:26:28 +03:00
assert newval == curval
def _make_checker ( obj ) :
def check ( name , initval , * args ) :
return _set_and_check ( obj , name , initval , * args )
return check
def _gen_outfile_path ( basename ) :
"""
Returns relative path to the file containing the expected XML
output
"""
return DATADIR + " {!s} -out.xml " . format ( basename )
def _get_test_content ( conn , basename ) :
infile = DATADIR + " %s -in.xml " % basename
outfile = _gen_outfile_path ( basename )
guest = virtinst . Guest ( conn , parsexml = open ( infile ) . read ( ) )
return guest , outfile
##############
# Test cases #
##############
def testAlterGuest ( ) :
"""
Test changing Guest ( ) parameters after parsing
"""
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " change-guest " )
check = _make_checker ( guest )
# Check specific vcpu_current behaviro
check ( " vcpus " , 5 , 10 )
assert guest . vcpu_current is None
check ( " vcpu_current " , None , 15 )
guest . vcpus = 12
assert guest . vcpu_current == 12
guest . vcpu_current = 10
check ( " name " , " TestGuest " , " change_name " )
check ( " id " , None , 1234 )
check ( " description " , None , " Hey desc changed& " )
check ( " title " , None , " Hey title changed! " )
check ( " vcpu_cpuset " , " 1-3 " , " 1-8,^6 " , " 1-5,15 " )
check ( " memory " , 409600 , 512000 )
check ( " currentMemory " , 204800 , 1024000 )
check ( " memory " , 1024000 , 2048000 )
check ( " uuid " , " 12345678-1234-1234-1234-123456789012 " ,
" 11111111-2222-3333-4444-555555555555 " )
check ( " emulator " , " /usr/lib/xen/bin/qemu-dm " , " /usr/binnnn/fooemu " )
check ( " type " , " kvm " , " test " )
check ( " bootloader " , None , " pygrub " )
check ( " on_poweroff " , " destroy " , " restart " )
check ( " on_reboot " , " restart " , " destroy " )
check ( " on_crash " , " destroy " , " restart " )
check ( " on_lockfailure " , " poweroff " , " restart " )
check = _make_checker ( guest . _metadata . libosinfo ) # pylint: disable=protected-access
check ( " os_id " , " http://fedoraproject.org/fedora/17 " )
guest . set_os_name ( " fedora10 " )
check ( " os_id " , " http://fedoraproject.org/fedora/10 " )
assert guest . osinfo . name == " fedora10 "
guest . set_os_name ( " generic " )
check ( " os_id " , None , " frib " )
assert guest . osinfo . name == " generic "
check = _make_checker ( guest . clock )
check ( " offset " , " utc " , " localtime " )
guest . clock . remove_child ( guest . clock . timers [ 0 ] )
check = _make_checker ( guest . clock . timers [ 0 ] )
check ( " name " , " pit " , " rtc " )
check ( " tickpolicy " , " delay " , " merge " )
timer = guest . clock . timers . add_new ( )
check = _make_checker ( timer )
check ( " name " , None , " hpet " )
check ( " present " , None , False )
check = _make_checker ( guest . pm )
check ( " suspend_to_mem " , False , True )
check ( " suspend_to_disk " , None , False )
check = _make_checker ( guest . os )
check ( " os_type " , " hvm " , " xen " )
check ( " arch " , " i686 " , None )
check ( " machine " , " foobar " , " pc-0.11 " )
check ( " loader " , None , " /foo/loader " )
check ( " init " , None , " /sbin/init " )
check ( " bootorder " , [ " hd " ] , [ " fd " ] )
check ( " enable_bootmenu " , None , False )
check ( " useserial " , None , True )
check ( " kernel " , None )
check ( " initrd " , None )
check ( " kernel_args " , None )
guest . os . set_initargs_string ( " foo ' bar baz ' frib " )
assert [ i . val for i in guest . os . initargs ] == [ " foo " , " bar baz " , " frib " ]
check = _make_checker ( guest . features )
check ( " acpi " , True , False )
check ( " apic " , True , True )
check ( " eoi " , None , True )
check ( " pae " , False , False )
check ( " viridian " , False , True )
check ( " hap " , False , False )
check ( " privnet " , False , False )
check ( " hyperv_relaxed " , None , True )
check ( " hyperv_vapic " , False , None )
check ( " hyperv_spinlocks " , True , True )
check ( " hyperv_spinlocks_retries " , 12287 , 54321 )
check ( " vmport " , False , True )
check ( " vmcoreinfo " , None , True )
check ( " kvm_hidden " , None , True )
check ( " pvspinlock " , None , True )
check ( " gic_version " , None , False )
check = _make_checker ( guest . cpu )
check ( " match " , " exact " , " strict " )
guest . cpu . set_model ( guest , " qemu64 " )
check ( " model " , " qemu64 " )
check ( " vendor " , " Intel " , " qemuvendor " )
check ( " topology.threads " , 2 , 1 )
check ( " topology.cores " , 5 , 3 )
guest . cpu . topology . sockets = 4.0
check ( " topology.sockets " , 4 )
check = _make_checker ( guest . cpu . features [ 0 ] )
check ( " name " , " x2apic " )
check ( " policy " , " force " , " disable " )
rmfeat = guest . cpu . features [ 3 ]
guest . cpu . remove_child ( rmfeat )
assert rmfeat . get_xml ( ) == """ <feature name= " foo " policy= " bar " /> \n """
guest . cpu . add_feature ( " addfeature " )
check = _make_checker ( guest . numatune )
check ( " memory_mode " , " interleave " , " strict " , None )
check ( " memory_nodeset " , " 1-5,^3,7 " , " 2,4,6 " )
check = _make_checker ( guest . memtune )
check ( " hard_limit " , None , 1024 , 2048 )
check ( " soft_limit " , None , 100 , 200 )
check ( " swap_hard_limit " , None , 300 , 400 )
check ( " min_guarantee " , None , 400 , 500 )
check = _make_checker ( guest . blkiotune )
check ( " weight " , None , 100 , 200 )
check = _make_checker ( guest . blkiotune . devices . add_new ( ) )
check ( " weight " , None , 300 )
check ( " path " , None , " /home/1.img " )
check = _make_checker ( guest . idmap )
check ( " uid_start " , None , 0 )
check ( " uid_target " , None , 1000 )
check ( " uid_count " , None , 10 )
check ( " gid_start " , None , 0 )
check ( " gid_target " , None , 1000 )
check ( " gid_count " , None , 10 )
check = _make_checker ( guest . resource )
check ( " partition " , None , " /virtualmachines/production " )
check = _make_checker ( guest . devices . memballoon [ 0 ] )
check ( " model " , " virtio " , " none " )
check = _make_checker ( guest . memoryBacking )
check ( " hugepages " , False , True )
check ( " nosharepages " , False , True )
check ( " locked " , False , True )
page = guest . memoryBacking . pages . add_new ( )
check = _make_checker ( page )
check ( " size " , None , 1 )
check ( " unit " , None , " G " )
assert guest . is_full_os_container ( ) is False
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testAlterCpuMode ( ) :
conn = utils . URIs . open_testdefault_cached ( )
xml = open ( DATADIR + " change-cpumode-in.xml " ) . read ( )
outfile = DATADIR + " change-cpumode-out.xml "
conn = utils . URIs . openconn ( utils . URIs . kvm_q35 )
guest = virtinst . Guest ( conn , xml )
check = _make_checker ( guest . cpu )
guest . cpu . model = " foo "
check ( " mode " , " host-passthrough " )
guest . cpu . check_security_features ( guest )
check ( " secure " , False )
guest . cpu . set_special_mode ( guest , " host-model " )
check ( " mode " , " host-model " )
guest . cpu . check_security_features ( guest )
check ( " secure " , False )
guest . cpu . set_model ( guest , " qemu64 " )
check ( " model " , " qemu64 " )
guest . cpu . check_security_features ( guest )
check ( " secure " , False )
# Test actually filling in security values, and removing them
guest . cpu . secure = True
guest . cpu . set_model ( guest , " Skylake-Client-IBRS " )
guest . cpu . check_security_features ( guest )
check ( " secure " , True )
guest . cpu . set_model ( guest , " EPYC-IBPB " )
guest . cpu . check_security_features ( guest )
check ( " secure " , True )
guest . cpu . secure = False
guest . cpu . set_model ( guest , " Skylake-Client-IBRS " )
guest . cpu . check_security_features ( guest )
check ( " secure " , False )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
# Hits a codepath when domcaps don't provide the needed info
emptyconn = utils . URIs . open_testdefault_cached ( )
guest = virtinst . Guest ( emptyconn , xml )
guest . cpu . check_security_features ( guest )
assert guest . cpu . secure is False
def testAlterDisk ( ) :
"""
Test changing DeviceDisk ( ) parameters after parsing
"""
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " change-disk " )
def _get_disk ( target ) :
for disk in guest . devices . disk :
if disk . target == target :
return disk
disk = _get_disk ( " hda " )
check = _make_checker ( disk )
check ( " path " , " /tmp/test.img " , " /dev/foo/null " )
disk . sync_path_props ( )
check ( " driver_name " , None , " test " )
check ( " driver_type " , None , " raw " )
check ( " serial " , " WD-WMAP9A966149 " , " frob " )
check ( " wwn " , None , " 123456789abcdefa " )
check ( " bus " , " ide " , " usb " )
check ( " removable " , None , False , True )
disk = guest . devices . disk [ 1 ]
check = _make_checker ( disk . seclabels [ 1 ] )
check ( " model " , " dac " )
check ( " relabel " , None , True )
check ( " label " , None , " foo-my-label " )
disk = _get_disk ( " hdc " )
check = _make_checker ( disk )
check ( " type " , " block " , " dir " , " file " , " block " )
check ( " path " , " /dev/null " , None )
disk . sync_path_props ( )
check ( " device " , " cdrom " , " floppy " )
check ( " read_only " , True , False )
check ( " target " , " hdc " , " fde " )
check ( " bus " , " ide " , " fdc " )
check ( " error_policy " , " stop " , None )
disk = _get_disk ( " hdd " )
check = _make_checker ( disk )
check ( " type " , " block " )
check ( " device " , " lun " )
check ( " sgio " , None , " unfiltered " )
check ( " rawio " , None , " yes " )
disk = _get_disk ( " sda " )
check = _make_checker ( disk )
check ( " path " , None , " http://[1:2:3:4:5:6:7:8]:1122/my/file " )
disk . sync_path_props ( )
disk = _get_disk ( " fda " )
check = _make_checker ( disk )
check ( " path " , None , " /dev/default-pool/default-vol " )
disk . sync_path_props ( )
check ( " startup_policy " , None , " optional " )
check ( " shareable " , False , True )
check ( " driver_cache " , None , " writeback " )
check ( " driver_io " , None , " threads " )
check ( " driver_io " , " threads " , " native " )
check ( " driver_discard " , None , " unmap " )
check ( " driver_detect_zeroes " , None , " unmap " )
check ( " iotune_ris " , 1 , 0 )
check ( " iotune_rbs " , 2 , 0 )
check ( " iotune_wis " , 3 , 0 )
check ( " iotune_wbs " , 4 , 0 )
check ( " iotune_tis " , None , 5 )
check ( " iotune_tbs " , None , 6 )
check = _make_checker ( disk . boot )
check ( " order " , None , 7 , None )
disk = _get_disk ( " vdb " )
check = _make_checker ( disk )
check ( " source_pool " , " defaultPool " , " anotherPool " )
check ( " source_volume " , " foobar " , " newvol " )
disk = _get_disk ( " vdc " )
check = _make_checker ( disk )
check ( " source_protocol " , " rbd " , " gluster " )
check ( " source_name " , " pool/image " , " new-val/vol " )
check ( " source_host_name " , " mon1.example.org " , " diff.example.org " )
check ( " source_host_port " , 6321 , 1234 )
check ( " path " , " gluster://diff.example.org:1234/new-val/vol " )
disk = _get_disk ( " vdd " )
check = _make_checker ( disk )
check ( " source_protocol " , " nbd " )
check ( " source_host_transport " , " unix " )
check ( " source_host_socket " , " /var/run/nbdsock " )
check ( " path " , " nbd+unix:///var/run/nbdsock " )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testAlterDevicesBootorder ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " change-devices-bootorder "
guest , outfile = _get_test_content ( conn , basename )
disk_1 = guest . devices . disk [ 0 ]
disk_2 = guest . devices . disk [ 1 ]
disk_3 = guest . devices . disk [ 2 ]
disk_4 = guest . devices . disk [ 3 ]
iface_1 = guest . devices . interface [ 0 ]
iface_2 = guest . devices . interface [ 1 ]
redirdev_1 = guest . devices . redirdev [ 0 ]
assert guest . os . bootorder == [ ' hd ' ]
assert disk_1 . boot . order is None
assert disk_2 . boot . order == 10
assert disk_3 . boot . order == 10
assert disk_4 . boot . order == 1
assert iface_1 . boot . order == 2
assert iface_2 . boot . order is None
assert redirdev_1 . boot . order == 3
guest . reorder_boot_order ( disk_1 , 1 )
assert guest . os . bootorder == [ ]
assert disk_1 . boot . order == 1
assert disk_2 . boot . order == 10
assert disk_3 . boot . order == 10
# verify that the used algorithm preserves the order of
# records with equal boot indices
assert disk_2 is guest . devices . disk [ 1 ]
assert disk_3 is guest . devices . disk [ 2 ]
assert disk_4 . boot . order == 2
assert iface_1 . boot . order == 3
assert iface_2 . boot . order is None
assert redirdev_1 . boot . order == 4
try :
_alter_compare ( conn , guest . get_xml ( ) , outfile )
except RuntimeError as error :
assert " unsupported configuration " in str ( error )
guest . reorder_boot_order ( disk_2 , 10 )
assert disk_2 . boot . order == 10
assert disk_3 . boot . order == 11
assert disk_2 is guest . devices . disk [ 1 ]
assert disk_3 is guest . devices . disk [ 2 ]
outfile = _gen_outfile_path ( " change-devices-bootorder-fixed " )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testSingleDisk ( ) :
conn = utils . URIs . open_testdefault_cached ( )
xml = ( """ <disk type= " file " device= " disk " ><source file= " /a.img " /> \n """
""" <target dev= " hda " bus= " ide " /></disk> \n """ )
conn = utils . URIs . open_testdefault_cached ( )
d = virtinst . DeviceDisk ( conn , parsexml = xml )
_set_and_check ( d , " target " , " hda " , " hdb " )
assert xml . replace ( " hda " , " hdb " ) == d . get_xml ( )
def testAlterChars ( ) :
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " change-chars " )
serial1 = guest . devices . serial [ 0 ]
serial2 = guest . devices . serial [ 1 ]
parallel1 = guest . devices . parallel [ 0 ]
parallel2 = guest . devices . parallel [ 1 ]
console1 = guest . devices . console [ 0 ]
console2 = guest . devices . console [ 1 ]
channel1 = guest . devices . channel [ 0 ]
channel2 = guest . devices . channel [ 1 ]
channel3 = guest . devices . channel [ 2 ]
check = _make_checker ( serial1 )
check ( " type " , " null " , " udp " )
check ( " source.bind_host " , None , " example.com " )
check ( " source.bind_service " , None , 66 )
check ( " source.connect_host " , None , " example.com.uk " )
check ( " source.connect_service " , None , 77 )
check = _make_checker ( serial2 )
check ( " type " , " tcp " )
check ( " source.protocol " , " telnet " , " raw " )
check ( " source.mode " , " bind " , " connect " )
check = _make_checker ( parallel1 )
check ( " source.mode " , " bind " )
check ( " source.path " , " /tmp/foobar " , None )
check ( " type " , " unix " , " pty " )
check = _make_checker ( parallel2 )
check ( " type " , " udp " )
check ( " source.bind_service " , 1111 , 1357 )
check ( " source.bind_host " , " my.bind.host " , " my.foo.host " )
check ( " source.connect_service " , 2222 , 7777 )
check ( " source.connect_host " , " my.source.host " , " source.foo.host " )
check = _make_checker ( console1 )
check ( " type " , " pty " )
check ( " target_type " , None )
check = _make_checker ( console2 )
check ( " type " , " file " )
check ( " source.path " , " /tmp/foo.img " , None )
check ( " source.path " , None , " /root/foo " )
check ( " target_type " , " virtio " )
check ( " target_state " , None , " connected " )
check = _make_checker ( channel1 )
check ( " type " , " pty " )
check ( " target_type " , " virtio " , " bar " , " virtio " )
check ( " target_name " , " foo.bar.frob " , " test.changed " )
check = _make_checker ( channel2 )
check ( " type " , " unix " , " foo " , " unix " )
check ( " target_type " , " guestfwd " )
check ( " target_address " , " 1.2.3.4 " , " 5.6.7.8 " )
check ( " target_port " , 4567 , 1199 )
check = _make_checker ( channel3 )
check ( " type " , " spiceport " )
check ( " source.channel " , " org.spice-space.webdav.0 " , " test.1 " )
check ( " target_type " , " virtio " )
check ( " target_name " , " org.spice-space.webdav.0 " , " test.2 " )
assert channel3 . get_xml_id ( ) == " ./devices/channel[3] "
assert channel3 . get_xml_idx ( ) == 2
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testAlterNics ( ) :
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " change-nics " )
dev1 = guest . devices . interface [ 0 ]
dev2 = guest . devices . interface [ 1 ]
dev3 = guest . devices . interface [ 2 ]
dev4 = guest . devices . interface [ 3 ]
dev5 = guest . devices . interface [ 4 ]
check = _make_checker ( dev1 )
check ( " type " , " user " )
check ( " model " , None , " vmxnet3 " )
check ( " source " , None , None , )
check ( " macaddr " , " 22:11:11:11:11:11 " , " AA:AA:AA:AA:AA:AA " )
check ( " filterref " , None , " foo " )
check = _make_checker ( dev2 )
check ( " source " , " default " , None )
check ( " type " , " network " , " bridge " )
check ( " source " , None , " newbr0 " )
check ( " model " , " e1000 " , " virtio " )
check = _make_checker ( dev3 )
check ( " type " , " bridge " )
check ( " source " , " foobr0 " , " newfoo0 " )
check ( " macaddr " , " 22:22:22:22:22:22 " )
check ( " target_dev " , None , " test1 " )
check = _make_checker ( dev4 )
check ( " type " , " ethernet " )
check ( " target_dev " , " nic02 " , " nic03 " )
check ( " target_dev " , " nic03 " , None )
check = _make_checker ( dev5 )
check ( " type " , " direct " )
check ( " source " , " eth0.1 " )
check ( " source_mode " , " vepa " , " bridge " )
check ( " portgroup " , None , " sales " )
check ( " driver_name " , None , " vhost " )
check ( " driver_queues " , None , 5 )
virtualport = dev5 . virtualport
check = _make_checker ( virtualport )
check ( " type " , " 802.1Qbg " , " foo " , " 802.1Qbg " )
check ( " managerid " , 12 , 11 )
check ( " typeid " , 1193046 , 1193047 )
check ( " typeidversion " , 1 , 2 )
check ( " instanceid " , " 09b11c53-8b5c-4eeb-8f00-d84eaa0aaa3b " ,
" 09b11c53-8b5c-4eeb-8f00-d84eaa0aaa4f " )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testQEMUXMLNS ( ) :
kvmconn = utils . URIs . open_kvm ( )
basename = " change-xmlns-qemu "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
guest = virtinst . Guest ( kvmconn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( guest . xmlns_qemu . args [ 0 ] )
check ( " value " , " -somearg " , " -somenewarg " )
check = _make_checker ( guest . xmlns_qemu . args [ 1 ] )
check ( " value " , " bar,baz=wib wob " , " testval " )
guest . xmlns_qemu . args . add_new ( ) . value = " additional-arg "
arg0 = guest . xmlns_qemu . args [ 0 ]
guest . xmlns_qemu . remove_child ( guest . xmlns_qemu . args [ 0 ] )
x = " <qemu:arg xmlns:qemu= \" http://libvirt.org/schemas/domain/qemu/1.0 \" value= \" -somenewarg \" /> \n "
assert arg0 . get_xml ( ) == x
check = _make_checker ( guest . xmlns_qemu . envs [ 0 ] )
check ( " name " , " SOMEENV " )
check ( " value " , " foo=bar baz,foo " )
env = guest . xmlns_qemu . envs . add_new ( )
env . name = " DISPLAY "
env . value = " 1:2 "
_alter_compare ( kvmconn , guest . get_xml ( ) , outfile )
def testAddRemoveDevices ( ) :
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " add-devices " )
# Basic removal of existing device
rmdev = guest . devices . disk [ 2 ]
guest . remove_device ( rmdev )
# Basic device add
d = virtinst . DeviceWatchdog ( conn )
d . set_defaults ( guest )
guest . add_device ( d )
# Test adding device with child properties (address value)
adddev = virtinst . DeviceInterface ( conn )
adddev . type = " network "
adddev . source = " default "
adddev . macaddr = " 1A:2A:3A:4A:5A:6A "
adddev . address . type = " spapr-vio "
adddev . set_defaults ( guest )
# Test adding and removing the same device
guest . add_device ( adddev )
guest . remove_device ( adddev )
guest . add_device ( adddev )
# Test adding device built from parsed XML
guest . add_device ( virtinst . DeviceSound ( conn ,
parsexml = """ <sound model= ' pcspk ' /> """ ) )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
def testChangeKVMMedia ( ) :
kvmconn = utils . URIs . open_kvm ( )
guest , outfile = _get_test_content ( kvmconn , " change-media " )
disk = guest . devices . disk [ 0 ]
check = _make_checker ( disk )
check ( " path " , None , " /dev/default-pool/default-vol " )
disk . sync_path_props ( )
disk = guest . devices . disk [ 1 ]
check = _make_checker ( disk )
check ( " path " , None , " /dev/default-pool/default-vol " )
check ( " path " , " /dev/default-pool/default-vol " , " /dev/disk-pool/diskvol1 " )
disk . sync_path_props ( )
disk = guest . devices . disk [ 2 ]
check = _make_checker ( disk )
check ( " path " , None , " /dev/disk-pool/diskvol1 " )
disk . sync_path_props ( )
disk = guest . devices . disk [ 3 ]
check = _make_checker ( disk )
check ( " path " , None , " /dev/default-pool/default-vol " )
disk . sync_path_props ( )
disk = guest . devices . disk [ 4 ]
check = _make_checker ( disk )
check ( " path " , None , " /dev/disk-pool/diskvol1 " )
disk . sync_path_props ( )
_alter_compare ( kvmconn , guest . get_xml ( ) , outfile )
def testGuestBootorder ( ) :
kvmconn = utils . URIs . open_kvm ( )
guest , outfile = _get_test_content ( kvmconn , " bootorder " )
assert guest . get_boot_order ( ) == [ ' ./devices/disk[1] ' ]
assert guest . get_boot_order ( legacy = True ) == [ ' hd ' ]
legacy_order = [ ' hd ' , ' fd ' , ' cdrom ' , ' network ' ]
dev_order = [ ' ./devices/disk[1] ' ,
' ./devices/disk[3] ' ,
' ./devices/disk[2] ' ,
' ./devices/interface[1] ' ]
guest . set_boot_order ( legacy_order , legacy = True )
assert guest . get_boot_order ( ) == dev_order
assert guest . get_boot_order ( legacy = True ) == legacy_order
guest . set_boot_order ( dev_order )
assert guest . get_boot_order ( ) == dev_order
assert guest . get_boot_order ( legacy = True ) == [ ]
_alter_compare ( kvmconn , guest . get_xml ( ) , outfile )
def testDiskChangeBus ( ) :
conn = utils . URIs . open_testdefault_cached ( )
guest , outfile = _get_test_content ( conn , " disk-change-bus " )
disk = guest . devices . disk [ 0 ]
# Same bus is a no-op
origxml = disk . get_xml ( )
disk . change_bus ( guest , " virtio " )
assert origxml == disk . get_xml ( )
disk . change_bus ( guest , " ide " )
disk = guest . devices . disk [ 2 ]
disk . change_bus ( guest , " scsi " )
_alter_compare ( conn , guest . get_xml ( ) , outfile )
##################
# Snapshot tests #
##################
def testChangeSnapshot ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " change-snapshot "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
snap = virtinst . DomainSnapshot ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( snap )
check ( " name " , " offline-root-child1 " , " name-foo " )
check ( " state " , " shutoff " , " somestate " )
check ( " description " , " offline desk " , " foo \n newline \n indent " )
check ( " parent " , " offline-root " , " newparent " )
check ( " creationTime " , 1375905916 , 1234 )
check ( " memory_type " , " no " , " internal " )
check = _make_checker ( snap . disks [ 0 ] )
check ( " name " , " hda " , " hdb " )
check ( " snapshot " , " internal " , " no " )
utils . diff_compare ( snap . get_xml ( ) , outfile )
#################
# Storage tests #
#################
def testFSPool ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " pool-fs "
infile = DATADIR + " %s .xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
pool = virtinst . StoragePool ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( pool )
check ( " type " , " fs " , " dir " )
check ( " name " , " pool-fs " , " foo-new " )
check ( " uuid " , " 10211510-2115-1021-1510-211510211510 " ,
" 10211510-2115-1021-1510-211510211999 " )
check ( " capacity " , 984373075968 , 200000 )
check ( " allocation " , 756681687040 , 150000 )
check ( " available " , 227691388928 , 50000 )
check ( " format " , " auto " , " ext3 " )
check ( " source_path " , " /some/source/path " , " /dev/foo/bar " )
check ( " target_path " , " /some/target/path " , " /mnt/my/foo " )
check ( " source_name " , None , " fooname " )
utils . diff_compare ( pool . get_xml ( ) , outfile )
utils . test_create ( conn , pool . get_xml ( ) , " storagePoolDefineXML " )
def testISCSIPool ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " pool-iscsi "
infile = utils . DATADIR + " /storage/ %s .xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
pool = virtinst . StoragePool ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( pool )
check ( " iqn " , " foo.bar.baz.iqn " , " my.iqn " )
check = _make_checker ( pool . hosts [ 0 ] )
check ( " name " , " some.random.hostname " , " my.host " )
utils . diff_compare ( pool . get_xml ( ) , outfile )
utils . test_create ( conn , pool . get_xml ( ) , " storagePoolDefineXML " )
def testGlusterPool ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " pool-gluster "
infile = utils . DATADIR + " /storage/ %s .xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
pool = virtinst . StoragePool ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( pool )
check ( " source_path " , " /some/source/path " , " /foo " )
check = _make_checker ( pool . hosts [ 0 ] )
check ( " name " , " some.random.hostname " , " my.host " )
utils . diff_compare ( pool . get_xml ( ) , outfile )
utils . test_create ( conn , pool . get_xml ( ) , " storagePoolDefineXML " )
def testRBDPool ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " pool-rbd "
infile = DATADIR + " %s .xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
pool = virtinst . StoragePool ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( pool . hosts [ 0 ] )
check ( " name " , " ceph-mon-1.example.com " )
check ( " port " , 6789 , 1234 )
check = _make_checker ( pool . hosts [ 1 ] )
check ( " name " , " ceph-mon-2.example.com " , " foo.bar " )
check ( " port " , 6789 )
check = _make_checker ( pool . hosts [ 2 ] )
check ( " name " , " ceph-mon-3.example.com " )
check ( " port " , 6789 , 1000 )
hostobj = pool . hosts . add_new ( )
hostobj . name = " frobber "
hostobj . port = " 5555 "
utils . diff_compare ( pool . get_xml ( ) , outfile )
utils . test_create ( conn , pool . get_xml ( ) , " storagePoolDefineXML " )
def testVol ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " pool-dir-vol "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
vol = virtinst . StorageVolume ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( vol )
check ( " type " , None , " file " )
check ( " key " , None , " fookey " )
check ( " capacity " , 10737418240 , 2000 )
check ( " allocation " , 5368709120 , 1000 )
check ( " format " , " raw " , " qcow2 " )
check ( " target_path " , None , " /foo/bar " )
check ( " backing_store " , " /foo/bar/baz " , " /my/backing " )
check ( " lazy_refcounts " , False , True )
check = _make_checker ( vol . permissions )
check ( " mode " , " 0700 " , " 0744 " )
check ( " owner " , " 10736 " , " 10000 " )
check ( " group " , " 10736 " , " 10000 " )
check ( " label " , None , " foo.label " )
utils . diff_compare ( vol . get_xml ( ) , outfile )
###################
# <network> tests #
###################
def testNetMulti ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " network-multi "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
net = virtinst . Network ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( net )
check ( " name " , " ipv6_multirange " , " new-foo " )
check ( " uuid " , " 41b4afe4-87bb-8087-6724-5e208a2d483a " ,
" 41b4afe4-87bb-8087-6724-5e208a2d1111 " )
check ( " bridge " , " virbr3 " , " virbr3new " )
check ( " stp " , True , False )
check ( " delay " , 0 , 2 )
check ( " domain_name " , " net7 " , " newdom " )
check ( " ipv6 " , None , True )
check ( " macaddr " , None , " 52:54:00:69:eb:FF " )
check ( " virtualport_type " , None , " openvswitch " )
assert len ( net . portgroups ) == 2
check = _make_checker ( net . portgroups [ 0 ] )
check ( " name " , " engineering " , " foo " )
check ( " default " , True , False )
assert len ( net . ips ) == 4
check = _make_checker ( net . ips [ 0 ] )
check ( " address " , " 192.168.7.1 " , " 192.168.8.1 " )
check ( " netmask " , " 255.255.255.0 " , " 255.255.254.0 " )
assert net . can_pxe ( ) is False
check ( " tftp " , None , " /var/lib/tftproot " )
check ( " bootp_file " , None , " pxeboot.img " )
check ( " bootp_server " , None , " 1.2.3.4 " )
assert net . can_pxe ( ) is True
check = _make_checker ( net . forward )
check ( " mode " , " nat " , " route " )
check ( " dev " , None , " eth22 " )
assert net . can_pxe ( ) is True
check = _make_checker ( net . ips [ 0 ] . ranges [ 0 ] )
check ( " start " , " 192.168.7.128 " , " 192.168.8.128 " )
check ( " end " , " 192.168.7.254 " , " 192.168.8.254 " )
check = _make_checker ( net . ips [ 0 ] . hosts [ 1 ] )
check ( " macaddr " , " 52:54:00:69:eb:91 " , " 52:54:00:69:eb:92 " )
check ( " name " , " badbob " , " newname " )
check ( " ip " , " 192.168.7.3 " , " 192.168.8.3 " )
check = _make_checker ( net . ips [ 1 ] )
check ( " family " , " ipv6 " , " ipv6 " )
check ( " prefix " , 64 , 63 )
r = net . routes . add_new ( )
r . family = " ipv4 "
r . address = " 192.168.8.0 "
r . prefix = " 24 "
r . gateway = " 192.168.8.10 "
check = _make_checker ( r )
check ( " netmask " , None , " foo " , None )
utils . diff_compare ( net . get_xml ( ) , outfile )
utils . test_create ( conn , net . get_xml ( ) , " networkDefineXML " )
def testNetOpen ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " network-open "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
net = virtinst . Network ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( net )
check ( " name " , " open " , " new-foo " )
check ( " domain_name " , " open " , " newdom " )
check = _make_checker ( net . forward )
check ( " mode " , " open " )
check ( " dev " , None )
assert len ( net . ips ) == 1
check = _make_checker ( net . ips [ 0 ] )
check ( " address " , " 192.168.100.1 " , " 192.168.101.1 " )
check ( " netmask " , " 255.255.255.0 " , " 255.255.254.0 " )
check = _make_checker ( net . ips [ 0 ] . ranges [ 0 ] )
check ( " start " , " 192.168.100.128 " , " 192.168.101.128 " )
check ( " end " , " 192.168.100.254 " , " 192.168.101.254 " )
utils . diff_compare ( net . get_xml ( ) , outfile )
utils . test_create ( conn , net . get_xml ( ) , " networkDefineXML " )
def testNetVfPool ( ) :
conn = utils . URIs . open_testdefault_cached ( )
basename = " network-vf-pool "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
net = virtinst . Network ( conn , parsexml = open ( infile ) . read ( ) )
check = _make_checker ( net )
check ( " name " , " passthrough " , " new-foo " )
check = _make_checker ( net . forward )
check ( " mode " , " hostdev " )
check ( " managed " , " yes " )
check = _make_checker ( net . forward . pf [ 0 ] )
check ( " dev " , " eth3 " )
utils . diff_compare ( net . get_xml ( ) , outfile )
utils . test_create ( conn , net . get_xml ( ) , " networkDefineXML " )
##############
# Misc tests #
##############
def testCPUUnknownClear ( ) :
# Make sure .clear() even removes XML elements we don't know about
kvmconn = utils . URIs . open_kvm ( )
basename = " clear-cpu-unknown-vals "
infile = DATADIR + " %s -in.xml " % basename
outfile = DATADIR + " %s -out.xml " % basename
guest = virtinst . Guest ( kvmconn , parsexml = open ( infile ) . read ( ) )
guest . cpu . copy_host_cpu ( guest )
guest . cpu . clear ( )
utils . diff_compare ( guest . get_xml ( ) , outfile )
def testDomainRoundtrip ( ) :
conn = utils . URIs . open_testdefault_cached ( )
# Make sure our XML engine doesn't mangle non-libvirt XML bits
infile = DATADIR + " domain-roundtrip.xml "
outfile = DATADIR + " domain-roundtrip.xml "
guest = virtinst . Guest ( conn , parsexml = open ( infile ) . read ( ) )
utils . diff_compare ( guest . get_xml ( ) , outfile )
def testYesNoUnexpectedParse ( ) :
conn = utils . URIs . open_testdefault_cached ( )
# Make sure that if we see an unexpected yes/no or on/off value,
# we just return it to the user and don't error. Libvirt could
# change our assumptions and we shouldn't be too restrictive
xml = ( " <hostdev managed= ' foo ' > \n <rom bar= ' wibble ' /> \n "
" <source><address bus= ' hello ' /></source> \n </hostdev> " )
dev = virtinst . DeviceHostdev ( conn , parsexml = xml )
assert dev . managed == " foo "
assert dev . rom_bar == " wibble "
assert dev . scsi_bus == " hello "
dev . managed = " test1 "
dev . rom_bar = " test2 "
assert dev . managed == " test1 "
assert dev . rom_bar == " test2 "
with pytest . raises ( ValueError ) :
dev . scsi_bus = " goodbye "
def testXMLBuilderCoverage ( ) :
"""
Test XMLBuilder corner cases
"""
conn = utils . URIs . open_testdefault_cached ( )
with pytest . raises ( RuntimeError , match = " .* ' foo ' .* " ) :
# Ensure we validate root element
virtinst . DeviceDisk ( conn , parsexml = " <foo/> " )
with pytest . raises ( Exception , match = " .*xmlParseDoc.* " ) :
# Ensure we validate root element
virtinst . DeviceDisk ( conn , parsexml = - 1 )
with pytest . raises ( virtinst . xmlutil . DevError ) :
raise virtinst . xmlutil . DevError ( " for coverage " )
with pytest . raises ( ValueError ) :
virtinst . DeviceDisk . validate_generic_name ( " objtype " , None )
with pytest . raises ( ValueError ) :
virtinst . DeviceDisk . validate_generic_name ( " objtype " , " foo bar " )
# Test property __repr__ for code coverage
assert " DeviceAddress " in str ( virtinst . DeviceDisk . address )
assert " ./driver/@cache " in str ( virtinst . DeviceDisk . driver_cache )
# Conversion of 0x value into int
xml = """
< controller type = ' scsi ' >
< address type = ' pci ' bus = ' 0x00 ' slot = ' 0x04 ' function = ' 0x7 ' / >
< / controller >
"""
dev = virtinst . DeviceController ( conn , parsexml = xml )
assert dev . address . slot == 4
# Some XML formatting and get_xml_* corner cases
conn = utils . URIs . openconn ( utils . URIs . test_suite )
xml = conn . lookupByName ( " test-for-virtxml " ) . XMLDesc ( 0 )
guest = virtinst . Guest ( conn , parsexml = xml )
assert guest . features . get_xml ( ) . startswith ( " <features " )
assert guest . clock . get_xml ( ) . startswith ( " <clock " )
assert guest . seclabels [ 0 ] . get_xml ( ) . startswith ( " <seclabel " )
assert guest . cpu . get_xml ( ) . startswith ( " <cpu " )
assert guest . os . get_xml ( ) . startswith ( " <os " )
assert guest . cpu . get_xml_id ( ) == " ./cpu "
assert guest . cpu . get_xml_idx ( ) == 0
assert guest . get_xml_id ( ) == " . "
assert guest . get_xml_idx ( ) == 0
assert guest . devices . disk [ 1 ] . get_xml_id ( ) == " ./devices/disk[2] "
assert guest . devices . disk [ 1 ] . get_xml_idx ( ) == 1
def testReplaceChildParse ( ) :
conn = utils . URIs . open_testdefault_cached ( )
buildfile = DATADIR + " replace-child-build.xml "
parsefile = DATADIR + " replace-child-parse.xml "
def mkdisk ( target ) :
disk = virtinst . DeviceDisk ( conn )
disk . device = " cdrom "
disk . bus = " scsi "
disk . target = target
return disk
guest = virtinst . Guest ( conn )
guest . add_device ( mkdisk ( " sda " ) )
guest . add_device ( mkdisk ( " sdb " ) )
guest . add_device ( mkdisk ( " sdc " ) )
guest . add_device ( mkdisk ( " sdd " ) )
guest . add_device ( mkdisk ( " sde " ) )
guest . add_device ( mkdisk ( " sdf " ) )
guest . devices . replace_child ( guest . devices . disk [ 2 ] , mkdisk ( " sdz " ) )
guest . set_defaults ( guest )
utils . diff_compare ( guest . get_xml ( ) , buildfile )
guest = virtinst . Guest ( conn , parsexml = guest . get_xml ( ) )
newdisk = virtinst . DeviceDisk ( conn ,
parsexml = mkdisk ( " sdw " ) . get_xml ( ) )
guest . devices . replace_child ( guest . devices . disk [ 4 ] , newdisk )
utils . diff_compare ( guest . get_xml ( ) , parsefile )
def testGuestXMLDeviceMatch ( ) :
"""
Test Guest . find_device and Device . compare_device
"""
uri = utils . URIs . test_suite
conn = utils . URIs . openconn ( uri )
dom = conn . lookupByName ( " test-for-virtxml " )
xml = dom . XMLDesc ( 0 )
guest = virtinst . Guest ( conn , xml )
guest2 = virtinst . Guest ( conn , xml )
# Assert id matching works
diskdev = guest . devices . disk [ 0 ]
assert guest . find_device ( diskdev ) == diskdev
# Assert type checking correct returns False
ifacedev = guest . devices . interface [ 0 ]
assert ifacedev . compare_device ( diskdev , 0 ) is False
# find_device should fail here
nodev = virtinst . DeviceWatchdog ( conn )
assert guest . find_device ( nodev ) is None
# Ensure parsed XML devices match correctly
for srcdev in guest . devices . get_all ( ) :
devxml = srcdev . get_xml ( )
newdev = srcdev . __class__ ( conn , devxml )
if srcdev != guest . find_device ( newdev ) :
raise AssertionError ( " guest.find_device failed for dev= %s " %
newdev )
# Ensure devices from another parsed XML doc compare correctly
for srcdev in guest . devices . get_all ( ) :
if not guest2 . find_device ( srcdev ) :
raise AssertionError ( " guest.find_device failed for dev= %s " %
srcdev )
def testControllerAttachedDevices ( ) :
"""
Test DeviceController . get_attached_devices
"""
conn = utils . URIs . open_testdefault_cached ( )
xml = open ( DATADIR + " controller-attached-devices.xml " ) . read ( )
guest = virtinst . Guest ( conn , xml )
# virtio-serial path
controller = [ c for c in guest . devices . controller if
c . type == " virtio-serial " ] [ 0 ]
devs = controller . get_attached_devices ( guest )
assert len ( devs ) == 4
assert devs [ - 1 ] . DEVICE_TYPE == " console "
# disk path
controller = [ c for c in guest . devices . controller if
c . type == " sata " ] [ 0 ]
devs = controller . get_attached_devices ( guest )
assert len ( devs ) == 1
assert devs [ - 1 ] . device == " cdrom "
# Little test for DeviceAddress.pretty_desc
assert devs [ - 1 ] . address . pretty_desc ( ) == " 0:0:0:3 "