mirror of
https://github.com/virt-manager/virt-manager.git
synced 2025-01-22 22:03:58 +03:00
tests: break out test_disk.py
Move a bunch of misc disk/diskbackend tests to this file Signed-off-by: Cole Robinson <crobinso@redhat.com>
This commit is contained in:
parent
0ac9ff3488
commit
4c4753d910
204
tests/test_disk.py
Normal file
204
tests/test_disk.py
Normal file
@ -0,0 +1,204 @@
|
||||
# Copyright (C) 2013, 2014 Red Hat, Inc.
|
||||
#
|
||||
# This work is licensed under the GNU GPLv2 or later.
|
||||
# See the COPYING file in the top-level directory.
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
import virtinst
|
||||
from virtinst import DeviceDisk
|
||||
|
||||
from tests import utils
|
||||
|
||||
|
||||
def test_disk_numtotarget():
|
||||
# Various testing our target generation
|
||||
#
|
||||
# Note: using single quotes in strings to avoid
|
||||
# codespell flagging the 'ba' assert
|
||||
assert DeviceDisk.num_to_target(1) == 'a'
|
||||
assert DeviceDisk.num_to_target(2) == 'b'
|
||||
assert DeviceDisk.num_to_target(26) == 'z'
|
||||
assert DeviceDisk.num_to_target(27) == 'aa'
|
||||
assert DeviceDisk.num_to_target(28) == 'ab'
|
||||
assert DeviceDisk.num_to_target(52) == 'az'
|
||||
assert DeviceDisk.num_to_target(53) == 'ba'
|
||||
assert DeviceDisk.num_to_target(27 * 26) == 'zz'
|
||||
assert DeviceDisk.num_to_target(27 * 26 + 1) == 'aaa'
|
||||
|
||||
assert DeviceDisk.target_to_num('hda') == 0
|
||||
assert DeviceDisk.target_to_num('hdb') == 1
|
||||
assert DeviceDisk.target_to_num('sdz') == 25
|
||||
assert DeviceDisk.target_to_num('sdaa') == 26
|
||||
assert DeviceDisk.target_to_num('vdab') == 27
|
||||
assert DeviceDisk.target_to_num('vdaz') == 51
|
||||
assert DeviceDisk.target_to_num('xvdba') == 52
|
||||
assert DeviceDisk.target_to_num('xvdzz') == 26 * (25 + 1) + 25
|
||||
assert DeviceDisk.target_to_num('xvdaaa') == 26 * 26 * 1 + 26 * 1 + 0
|
||||
|
||||
conn = utils.URIs.open_testdefault_cached()
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
disk.bus = 'ide'
|
||||
|
||||
assert disk.generate_target([]) == 'hda'
|
||||
assert disk.generate_target(['hda']) == 'hdb'
|
||||
assert disk.generate_target(['hdb', 'sda']) == 'hdc'
|
||||
assert disk.generate_target(['hda', 'hdd']) == 'hdb'
|
||||
|
||||
|
||||
def test_disk_dir_searchable():
|
||||
# Normally the dir searchable test is skipped in the unittest,
|
||||
# but let's contrive an example that should trigger all the code
|
||||
# to ensure it isn't horribly broken
|
||||
conn = utils.URIs.open_kvm()
|
||||
|
||||
def _set_caps_baselabel_uid(uid):
|
||||
secmodel = [s for s in conn.caps.host.secmodels
|
||||
if s.model == "dac"][0]
|
||||
for baselabel in [b for b in secmodel.baselabels
|
||||
if b.type in ["qemu", "kvm"]]:
|
||||
baselabel.content = "+%s:+%s" % (uid, uid)
|
||||
|
||||
tmpobj = tempfile.TemporaryDirectory(prefix="virtinst-test-search")
|
||||
tmpdir = tmpobj.name
|
||||
try:
|
||||
# Invalid uid
|
||||
_set_caps_baselabel_uid(-1)
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
||||
assert searchdata.uid is None
|
||||
|
||||
# Use our uid, verify it shows we have expected access
|
||||
_set_caps_baselabel_uid(os.getuid())
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn,
|
||||
tmpdir + "/footest")
|
||||
assert searchdata.uid == os.getuid()
|
||||
assert searchdata.fixlist == []
|
||||
|
||||
# Remove perms on the tmpdir, now it should report failures
|
||||
os.chmod(tmpdir, 0o000)
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
||||
assert searchdata.fixlist == [tmpdir]
|
||||
|
||||
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
||||
assert not bool(errdict)
|
||||
|
||||
# Mock setfacl to definitely fail
|
||||
with unittest.mock.patch("virtinst.diskbackend.SETFACL",
|
||||
"getfacl"):
|
||||
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
||||
|
||||
finally:
|
||||
# Reset changes we made
|
||||
conn.invalidate_caps()
|
||||
os.chmod(tmpdir, 0o777)
|
||||
|
||||
|
||||
def test_disk_path_in_use_kernel():
|
||||
# Extra tests for DeviceDisk.path_in_use
|
||||
conn = utils.URIs.open_kvm()
|
||||
|
||||
# Comparing against kernel
|
||||
vms = virtinst.DeviceDisk.path_in_use_by(
|
||||
conn, "/dev/default-pool/test-arm-kernel")
|
||||
assert vms == ["test-arm-kernel"]
|
||||
|
||||
|
||||
def test_disk_diskbackend_misc():
|
||||
# Test get_size() with vol_install
|
||||
conn = utils.URIs.open_testdefault_cached()
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
pool = conn.storagePoolLookupByName("default-pool")
|
||||
vol_install = disk.build_vol_install(conn, "newvol1.img", pool, 1, False)
|
||||
disk.set_vol_install(vol_install)
|
||||
assert disk.get_size() == 1.0
|
||||
|
||||
# Test some blockdev inspecting
|
||||
conn = utils.URIs.openconn("test:///default")
|
||||
if os.path.exists("/dev/loop0"):
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
disk.path = "/dev/loop0"
|
||||
assert disk.type == "block"
|
||||
disk.get_size()
|
||||
|
||||
# Test sparse cloning
|
||||
tmpinput = tempfile.NamedTemporaryFile()
|
||||
open(tmpinput.name, "wb").write(b'\0' * 10000)
|
||||
|
||||
srcdisk = virtinst.DeviceDisk(conn)
|
||||
srcdisk.path = tmpinput.name
|
||||
|
||||
newdisk = virtinst.DeviceDisk(conn)
|
||||
tmpoutput = tempfile.NamedTemporaryFile()
|
||||
os.unlink(tmpoutput.name)
|
||||
newdisk.path = tmpoutput.name
|
||||
newdisk.set_local_disk_to_clone(srcdisk, True)
|
||||
newdisk.build_storage(None)
|
||||
|
||||
# Test cloning onto existing disk
|
||||
newdisk = virtinst.DeviceDisk(conn, parsexml=newdisk.get_xml())
|
||||
newdisk.path = newdisk.path
|
||||
newdisk.set_local_disk_to_clone(srcdisk, True)
|
||||
newdisk.build_storage(None)
|
||||
|
||||
newdisk = virtinst.DeviceDisk(conn)
|
||||
newdisk.type = "block"
|
||||
newdisk.path = "/dev/foo/idontexist"
|
||||
assert newdisk.get_size() == 0
|
||||
|
||||
conn = utils.URIs.open_testdriver_cached()
|
||||
volpath = "/dev/default-pool/test-clone-simple.img"
|
||||
assert virtinst.DeviceDisk.path_definitely_exists(conn, volpath)
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
disk.path = volpath
|
||||
assert disk.get_size()
|
||||
|
||||
|
||||
def test_disk_diskbackend_parse():
|
||||
# Test that calling validate() on parsed disk XML doesn't attempt
|
||||
# to verify the path exists. Assume it's a working config
|
||||
conn = utils.URIs.open_testdriver_cached()
|
||||
xml = ("<disk type='file' device='disk'>"
|
||||
"<source file='/A/B/C/D/NOPE'/>"
|
||||
"</disk>")
|
||||
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
||||
disk.validate()
|
||||
disk.is_size_conflict()
|
||||
disk.build_storage(None)
|
||||
assert getattr(disk, "_storage_backend").is_stub() is True
|
||||
|
||||
# Stub backend coverage testing
|
||||
backend = getattr(disk, "_storage_backend")
|
||||
assert disk.get_parent_pool() is None
|
||||
assert disk.get_vol_object() is None
|
||||
assert disk.get_vol_install() is None
|
||||
assert disk.get_size() == 0
|
||||
assert backend.get_vol_xml() is None
|
||||
assert backend.get_dev_type() == "file"
|
||||
assert backend.get_driver_type() is None
|
||||
assert backend.get_parent_pool() is None
|
||||
|
||||
disk.set_backend_for_existing_path()
|
||||
assert getattr(disk, "_storage_backend").is_stub() is False
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
disk.validate()
|
||||
|
||||
# Ensure set_backend_for_existing_path resolves a path
|
||||
# to its existing storage volume
|
||||
xml = ("<disk type='file' device='disk'>"
|
||||
"<source file='/dev/default-pool/default-vol'/>"
|
||||
"</disk>")
|
||||
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
||||
disk.set_backend_for_existing_path()
|
||||
assert disk.get_vol_object()
|
||||
|
||||
# Verify set_backend_for_existing_path doesn't error
|
||||
# for a variety of disks
|
||||
dom = conn.lookupByName("test-many-devices")
|
||||
guest = virtinst.Guest(conn, parsexml=dom.XMLDesc(0))
|
||||
for disk in guest.devices.disk:
|
||||
disk.set_backend_for_existing_path()
|
@ -4,12 +4,9 @@
|
||||
# See the COPYING file in the top-level directory.
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import virtinst
|
||||
from virtinst import cli
|
||||
from virtinst import DeviceDisk
|
||||
|
||||
from tests import utils
|
||||
|
||||
@ -60,39 +57,6 @@ class TestXMLMisc(unittest.TestCase):
|
||||
def conn(self):
|
||||
return utils.URIs.open_testdefault_cached()
|
||||
|
||||
def testDiskNumbers(self):
|
||||
# Various testing our target generation
|
||||
#
|
||||
# Note: using single quotes in strings to avoid
|
||||
# codespell flagging the 'ba' assert
|
||||
assert DeviceDisk.num_to_target(1) == 'a'
|
||||
assert DeviceDisk.num_to_target(2) == 'b'
|
||||
assert DeviceDisk.num_to_target(26) == 'z'
|
||||
assert DeviceDisk.num_to_target(27) == 'aa'
|
||||
assert DeviceDisk.num_to_target(28) == 'ab'
|
||||
assert DeviceDisk.num_to_target(52) == 'az'
|
||||
assert DeviceDisk.num_to_target(53) == 'ba'
|
||||
assert DeviceDisk.num_to_target(27 * 26) == 'zz'
|
||||
assert DeviceDisk.num_to_target(27 * 26 + 1) == 'aaa'
|
||||
|
||||
assert DeviceDisk.target_to_num('hda') == 0
|
||||
assert DeviceDisk.target_to_num('hdb') == 1
|
||||
assert DeviceDisk.target_to_num('sdz') == 25
|
||||
assert DeviceDisk.target_to_num('sdaa') == 26
|
||||
assert DeviceDisk.target_to_num('vdab') == 27
|
||||
assert DeviceDisk.target_to_num('vdaz') == 51
|
||||
assert DeviceDisk.target_to_num('xvdba') == 52
|
||||
assert DeviceDisk.target_to_num('xvdzz') == 26 * (25 + 1) + 25
|
||||
assert DeviceDisk.target_to_num('xvdaaa') == 26 * 26 * 1 + 26 * 1 + 0
|
||||
|
||||
disk = virtinst.DeviceDisk(self.conn)
|
||||
disk.bus = 'ide'
|
||||
|
||||
assert disk.generate_target([]) == 'hda'
|
||||
assert disk.generate_target(['hda']) == 'hdb'
|
||||
assert disk.generate_target(['hdb', 'sda']) == 'hdc'
|
||||
assert disk.generate_target(['hda', 'hdd']) == 'hdb'
|
||||
|
||||
def testCPUTopology(self):
|
||||
# Test CPU topology determining
|
||||
cpu = virtinst.DomainCpu(self.conn)
|
||||
@ -148,62 +112,8 @@ class TestXMLMisc(unittest.TestCase):
|
||||
g._metadata.libosinfo.os_id = "http://example.com/idontexit"
|
||||
assert g.osinfo.name == "generic"
|
||||
|
||||
def test_dir_searchable(self):
|
||||
# Normally the dir searchable test is skipped in the unittest,
|
||||
# but let's contrive an example that should trigger all the code
|
||||
# to ensure it isn't horribly broken
|
||||
conn = utils.URIs.open_kvm()
|
||||
|
||||
def _set_caps_baselabel_uid(uid):
|
||||
secmodel = [s for s in conn.caps.host.secmodels
|
||||
if s.model == "dac"][0]
|
||||
for baselabel in [b for b in secmodel.baselabels
|
||||
if b.type in ["qemu", "kvm"]]:
|
||||
baselabel.content = "+%s:+%s" % (uid, uid)
|
||||
|
||||
tmpobj = tempfile.TemporaryDirectory(prefix="virtinst-test-search")
|
||||
tmpdir = tmpobj.name
|
||||
try:
|
||||
# Invalid uid
|
||||
_set_caps_baselabel_uid(-1)
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
||||
assert searchdata.uid is None
|
||||
|
||||
# Use our uid, verify it shows we have expected access
|
||||
_set_caps_baselabel_uid(os.getuid())
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn,
|
||||
tmpdir + "/footest")
|
||||
assert searchdata.uid == os.getuid()
|
||||
assert searchdata.fixlist == []
|
||||
|
||||
# Remove perms on the tmpdir, now it should report failures
|
||||
os.chmod(tmpdir, 0o000)
|
||||
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
||||
assert searchdata.fixlist == [tmpdir]
|
||||
|
||||
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
||||
assert not bool(errdict)
|
||||
|
||||
# Mock setfacl to definitely fail
|
||||
with unittest.mock.patch("virtinst.diskbackend.SETFACL",
|
||||
"getfacl"):
|
||||
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
||||
|
||||
finally:
|
||||
# Reset changes we made
|
||||
conn.invalidate_caps()
|
||||
os.chmod(tmpdir, 0o777)
|
||||
|
||||
def test_path_in_use(self):
|
||||
# Extra tests for DeviceDisk.path_in_use
|
||||
conn = utils.URIs.open_kvm()
|
||||
|
||||
# Comparing against kernel
|
||||
vms = virtinst.DeviceDisk.path_in_use_by(
|
||||
conn, "/dev/default-pool/test-arm-kernel")
|
||||
assert vms == ["test-arm-kernel"]
|
||||
|
||||
def test_nonpredicatble_generate(self):
|
||||
from virtinst import cli
|
||||
kvm_uri = utils.URIs.kvm.replace(",predictable", "")
|
||||
kvmconn = cli.getConnection(kvm_uri)
|
||||
testconn = cli.getConnection("test:///default")
|
||||
@ -229,52 +139,3 @@ class TestXMLMisc(unittest.TestCase):
|
||||
except Exception as e:
|
||||
if not self.conn.support.is_libvirt_error_no_domain(e):
|
||||
raise
|
||||
|
||||
def test_disk_backend(self):
|
||||
# Test get_size() with vol_install
|
||||
disk = virtinst.DeviceDisk(self.conn)
|
||||
pool = self.conn.storagePoolLookupByName("default-pool")
|
||||
vol_install = disk.build_vol_install(self.conn, "newvol1.img",
|
||||
pool, 1, False)
|
||||
disk.set_vol_install(vol_install)
|
||||
assert disk.get_size() == 1.0
|
||||
|
||||
# Test some blockdev inspecting
|
||||
conn = utils.URIs.openconn("test:///default")
|
||||
if os.path.exists("/dev/loop0"):
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
disk.path = "/dev/loop0"
|
||||
assert disk.type == "block"
|
||||
disk.get_size()
|
||||
|
||||
# Test sparse cloning
|
||||
tmpinput = tempfile.NamedTemporaryFile()
|
||||
open(tmpinput.name, "wb").write(b'\0' * 10000)
|
||||
|
||||
srcdisk = virtinst.DeviceDisk(conn)
|
||||
srcdisk.path = tmpinput.name
|
||||
|
||||
newdisk = virtinst.DeviceDisk(conn)
|
||||
tmpoutput = tempfile.NamedTemporaryFile()
|
||||
os.unlink(tmpoutput.name)
|
||||
newdisk.path = tmpoutput.name
|
||||
newdisk.set_local_disk_to_clone(srcdisk, True)
|
||||
newdisk.build_storage(None)
|
||||
|
||||
# Test cloning onto existing disk
|
||||
newdisk = virtinst.DeviceDisk(conn, parsexml=newdisk.get_xml())
|
||||
newdisk.path = newdisk.path
|
||||
newdisk.set_local_disk_to_clone(srcdisk, True)
|
||||
newdisk.build_storage(None)
|
||||
|
||||
newdisk = virtinst.DeviceDisk(conn)
|
||||
newdisk.type = "block"
|
||||
newdisk.path = "/dev/foo/idontexist"
|
||||
assert newdisk.get_size() == 0
|
||||
|
||||
conn = utils.URIs.open_testdriver_cached()
|
||||
volpath = "/dev/default-pool/test-clone-simple.img"
|
||||
assert virtinst.DeviceDisk.path_definitely_exists(conn, volpath)
|
||||
disk = virtinst.DeviceDisk(conn)
|
||||
disk.path = volpath
|
||||
assert disk.get_size()
|
||||
|
@ -1050,7 +1050,6 @@ class XMLParseTest(unittest.TestCase):
|
||||
assert guest.devices.disk[1].get_xml_id() == "./devices/disk[2]"
|
||||
assert guest.devices.disk[1].get_xml_idx() == 1
|
||||
|
||||
|
||||
def testReplaceChildParse(self):
|
||||
buildfile = DATADIR + "replace-child-build.xml"
|
||||
parsefile = DATADIR + "replace-child-parse.xml"
|
||||
@ -1079,52 +1078,6 @@ class XMLParseTest(unittest.TestCase):
|
||||
guest.devices.replace_child(guest.devices.disk[4], newdisk)
|
||||
utils.diff_compare(guest.get_xml(), parsefile)
|
||||
|
||||
def testDiskBackend(self):
|
||||
# Test that calling validate() on parsed disk XML doesn't attempt
|
||||
# to verify the path exists. Assume it's a working config
|
||||
xml = ("<disk type='file' device='disk'>"
|
||||
"<source file='/A/B/C/D/NOPE'/>"
|
||||
"</disk>")
|
||||
disk = virtinst.DeviceDisk(self.conn, parsexml=xml)
|
||||
disk.validate()
|
||||
disk.is_size_conflict()
|
||||
disk.build_storage(None)
|
||||
assert getattr(disk, "_storage_backend").is_stub() is True
|
||||
|
||||
# Stub backend coverage testing
|
||||
backend = getattr(disk, "_storage_backend")
|
||||
assert disk.get_parent_pool() is None
|
||||
assert disk.get_vol_object() is None
|
||||
assert disk.get_vol_install() is None
|
||||
assert disk.get_size() == 0
|
||||
assert backend.get_vol_xml() is None
|
||||
assert backend.get_dev_type() == "file"
|
||||
assert backend.get_driver_type() is None
|
||||
assert backend.get_parent_pool() is None
|
||||
|
||||
disk.set_backend_for_existing_path()
|
||||
assert getattr(disk, "_storage_backend").is_stub() is False
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
disk.validate()
|
||||
|
||||
# Ensure set_backend_for_existing_path resolves a path
|
||||
# to its existing storage volume
|
||||
xml = ("<disk type='file' device='disk'>"
|
||||
"<source file='/dev/default-pool/default-vol'/>"
|
||||
"</disk>")
|
||||
conn = utils.URIs.open_testdriver_cached()
|
||||
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
||||
disk.set_backend_for_existing_path()
|
||||
assert disk.get_vol_object()
|
||||
|
||||
# Verify set_backend_for_existing_path doesn't error
|
||||
# for a variety of disks
|
||||
dom = conn.lookupByName("test-many-devices")
|
||||
guest = virtinst.Guest(conn, parsexml=dom.XMLDesc(0))
|
||||
for disk in guest.devices.disk:
|
||||
disk.set_backend_for_existing_path()
|
||||
|
||||
def testGuestXMLDeviceMatch(self):
|
||||
"""
|
||||
Test Guest.find_device and Device.compare_device
|
||||
|
Loading…
x
Reference in New Issue
Block a user