2020-09-18 22:45:47 +03:00
# Copyright (C) 2013, 2014 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
import os
import tempfile
import pytest
import virtinst
from virtinst import DeviceDisk
from tests import utils
def test_disk_numtotarget ( ) :
# Various testing our target generation
#
# Note: using single quotes in strings to avoid
# codespell flagging the 'ba' assert
assert DeviceDisk . num_to_target ( 1 ) == ' a '
assert DeviceDisk . num_to_target ( 2 ) == ' b '
assert DeviceDisk . num_to_target ( 26 ) == ' z '
assert DeviceDisk . num_to_target ( 27 ) == ' aa '
assert DeviceDisk . num_to_target ( 28 ) == ' ab '
assert DeviceDisk . num_to_target ( 52 ) == ' az '
assert DeviceDisk . num_to_target ( 53 ) == ' ba '
assert DeviceDisk . num_to_target ( 27 * 26 ) == ' zz '
assert DeviceDisk . num_to_target ( 27 * 26 + 1 ) == ' aaa '
assert DeviceDisk . target_to_num ( ' hda ' ) == 0
assert DeviceDisk . target_to_num ( ' hdb ' ) == 1
assert DeviceDisk . target_to_num ( ' sdz ' ) == 25
assert DeviceDisk . target_to_num ( ' sdaa ' ) == 26
assert DeviceDisk . target_to_num ( ' vdab ' ) == 27
assert DeviceDisk . target_to_num ( ' vdaz ' ) == 51
assert DeviceDisk . target_to_num ( ' xvdba ' ) == 52
assert DeviceDisk . target_to_num ( ' xvdzz ' ) == 26 * ( 25 + 1 ) + 25
assert DeviceDisk . target_to_num ( ' xvdaaa ' ) == 26 * 26 * 1 + 26 * 1 + 0
conn = utils . URIs . open_testdefault_cached ( )
disk = virtinst . DeviceDisk ( conn )
disk . bus = ' ide '
assert disk . generate_target ( [ ] ) == ' hda '
assert disk . generate_target ( [ ' hda ' ] ) == ' hdb '
assert disk . generate_target ( [ ' hdb ' , ' sda ' ] ) == ' hdc '
assert disk . generate_target ( [ ' hda ' , ' hdd ' ] ) == ' hdb '
2020-09-18 23:26:28 +03:00
def test_disk_dir_searchable ( monkeypatch ) :
# Normally the dir searchable test is skipped in the test suite,
2020-09-18 22:45:47 +03:00
# but let's contrive an example that should trigger all the code
# to ensure it isn't horribly broken
conn = utils . URIs . open_kvm ( )
def _set_caps_baselabel_uid ( uid ) :
secmodel = [ s for s in conn . caps . host . secmodels
if s . model == " dac " ] [ 0 ]
for baselabel in [ b for b in secmodel . baselabels
if b . type in [ " qemu " , " kvm " ] ] :
baselabel . content = " + %s :+ %s " % ( uid , uid )
tmpobj = tempfile . TemporaryDirectory ( prefix = " virtinst-test-search " )
tmpdir = tmpobj . name
try :
# Invalid uid
_set_caps_baselabel_uid ( - 1 )
searchdata = virtinst . DeviceDisk . check_path_search ( conn , tmpdir )
assert searchdata . uid is None
# Use our uid, verify it shows we have expected access
_set_caps_baselabel_uid ( os . getuid ( ) )
searchdata = virtinst . DeviceDisk . check_path_search ( conn ,
tmpdir + " /footest " )
assert searchdata . uid == os . getuid ( )
assert searchdata . fixlist == [ ]
# Remove perms on the tmpdir, now it should report failures
os . chmod ( tmpdir , 0o000 )
searchdata = virtinst . DeviceDisk . check_path_search ( conn , tmpdir )
assert searchdata . fixlist == [ tmpdir ]
errdict = virtinst . DeviceDisk . fix_path_search ( searchdata )
assert not bool ( errdict )
# Mock setfacl to definitely fail
2020-09-18 23:26:28 +03:00
with monkeypatch . context ( ) as m :
m . setattr ( " virtinst.diskbackend.SETFACL " , " getfacl " )
2020-09-18 22:45:47 +03:00
errdict = virtinst . DeviceDisk . fix_path_search ( searchdata )
finally :
# Reset changes we made
conn . invalidate_caps ( )
os . chmod ( tmpdir , 0o777 )
def test_disk_path_in_use_kernel ( ) :
# Extra tests for DeviceDisk.path_in_use
conn = utils . URIs . open_kvm ( )
# Comparing against kernel
vms = virtinst . DeviceDisk . path_in_use_by (
conn , " /dev/default-pool/test-arm-kernel " )
assert vms == [ " test-arm-kernel " ]
def test_disk_diskbackend_misc ( ) :
# Test get_size() with vol_install
conn = utils . URIs . open_testdefault_cached ( )
disk = virtinst . DeviceDisk ( conn )
pool = conn . storagePoolLookupByName ( " default-pool " )
vol_install = disk . build_vol_install ( conn , " newvol1.img " , pool , 1 , False )
disk . set_vol_install ( vol_install )
assert disk . get_size ( ) == 1.0
# Test some blockdev inspecting
conn = utils . URIs . openconn ( " test:///default " )
if os . path . exists ( " /dev/loop0 " ) :
disk = virtinst . DeviceDisk ( conn )
disk . path = " /dev/loop0 "
assert disk . type == " block "
disk . get_size ( )
# Test sparse cloning
tmpinput = tempfile . NamedTemporaryFile ( )
open ( tmpinput . name , " wb " ) . write ( b ' \0 ' * 10000 )
srcdisk = virtinst . DeviceDisk ( conn )
srcdisk . path = tmpinput . name
newdisk = virtinst . DeviceDisk ( conn )
tmpoutput = tempfile . NamedTemporaryFile ( )
os . unlink ( tmpoutput . name )
newdisk . path = tmpoutput . name
newdisk . set_local_disk_to_clone ( srcdisk , True )
newdisk . build_storage ( None )
# Test cloning onto existing disk
newdisk = virtinst . DeviceDisk ( conn , parsexml = newdisk . get_xml ( ) )
newdisk . path = newdisk . path
newdisk . set_local_disk_to_clone ( srcdisk , True )
newdisk . build_storage ( None )
newdisk = virtinst . DeviceDisk ( conn )
newdisk . type = " block "
newdisk . path = " /dev/foo/idontexist "
assert newdisk . get_size ( ) == 0
conn = utils . URIs . open_testdriver_cached ( )
volpath = " /dev/default-pool/test-clone-simple.img "
assert virtinst . DeviceDisk . path_definitely_exists ( conn , volpath )
disk = virtinst . DeviceDisk ( conn )
disk . path = volpath
assert disk . get_size ( )
def test_disk_diskbackend_parse ( ) :
# Test that calling validate() on parsed disk XML doesn't attempt
# to verify the path exists. Assume it's a working config
conn = utils . URIs . open_testdriver_cached ( )
xml = ( " <disk type= ' file ' device= ' disk ' > "
" <source file= ' /A/B/C/D/NOPE ' /> "
" </disk> " )
disk = virtinst . DeviceDisk ( conn , parsexml = xml )
disk . validate ( )
disk . is_size_conflict ( )
disk . build_storage ( None )
assert getattr ( disk , " _storage_backend " ) . is_stub ( ) is True
# Stub backend coverage testing
backend = getattr ( disk , " _storage_backend " )
assert disk . get_parent_pool ( ) is None
assert disk . get_vol_object ( ) is None
assert disk . get_vol_install ( ) is None
assert disk . get_size ( ) == 0
assert backend . get_vol_xml ( ) is None
assert backend . get_dev_type ( ) == " file "
assert backend . get_driver_type ( ) is None
assert backend . get_parent_pool ( ) is None
disk . set_backend_for_existing_path ( )
assert getattr ( disk , " _storage_backend " ) . is_stub ( ) is False
with pytest . raises ( ValueError ) :
disk . validate ( )
# Ensure set_backend_for_existing_path resolves a path
# to its existing storage volume
xml = ( " <disk type= ' file ' device= ' disk ' > "
" <source file= ' /dev/default-pool/default-vol ' /> "
" </disk> " )
disk = virtinst . DeviceDisk ( conn , parsexml = xml )
disk . set_backend_for_existing_path ( )
assert disk . get_vol_object ( )
# Verify set_backend_for_existing_path doesn't error
# for a variety of disks
dom = conn . lookupByName ( " test-many-devices " )
guest = virtinst . Guest ( conn , parsexml = dom . XMLDesc ( 0 ) )
for disk in guest . devices . disk :
disk . set_backend_for_existing_path ( )