mirror of
https://github.com/virt-manager/virt-manager.git
synced 2025-01-05 09:17:57 +03:00
058d8b4ccd
With the libvirt testdriver, we can create+start a directory pool that points to any arbitrary host path. But this doesn't match the behavior of the real storage driver, where pool start will fail if the source directory doesn't exist. Let's mock this by raising an exception if we are inside the test suite and we see a special string in the pool XML Signed-off-by: Cole Robinson <crobinso@redhat.com>
246 lines
8.5 KiB
Python
246 lines
8.5 KiB
Python
# Copyright (C) 2013, 2014 Red Hat, Inc.
|
|
#
|
|
# This work is licensed under the GNU GPLv2 or later.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
import virtinst
|
|
from virtinst import DeviceDisk
|
|
|
|
from tests import utils
|
|
|
|
|
|
def test_disk_numtotarget():
|
|
# Various testing our target generation
|
|
#
|
|
# Note: using single quotes in strings to avoid
|
|
# codespell flagging the 'ba' assert
|
|
assert DeviceDisk.num_to_target(1) == 'a'
|
|
assert DeviceDisk.num_to_target(2) == 'b'
|
|
assert DeviceDisk.num_to_target(26) == 'z'
|
|
assert DeviceDisk.num_to_target(27) == 'aa'
|
|
assert DeviceDisk.num_to_target(28) == 'ab'
|
|
assert DeviceDisk.num_to_target(52) == 'az'
|
|
assert DeviceDisk.num_to_target(53) == 'ba'
|
|
assert DeviceDisk.num_to_target(27 * 26) == 'zz'
|
|
assert DeviceDisk.num_to_target(27 * 26 + 1) == 'aaa'
|
|
|
|
assert DeviceDisk.target_to_num('hda') == 0
|
|
assert DeviceDisk.target_to_num('hdb') == 1
|
|
assert DeviceDisk.target_to_num('sdz') == 25
|
|
assert DeviceDisk.target_to_num('sdaa') == 26
|
|
assert DeviceDisk.target_to_num('vdab') == 27
|
|
assert DeviceDisk.target_to_num('vdaz') == 51
|
|
assert DeviceDisk.target_to_num('xvdba') == 52
|
|
assert DeviceDisk.target_to_num('xvdzz') == 26 * (25 + 1) + 25
|
|
assert DeviceDisk.target_to_num('xvdaaa') == 26 * 26 * 1 + 26 * 1 + 0
|
|
|
|
conn = utils.URIs.open_testdefault_cached()
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.bus = 'ide'
|
|
|
|
assert disk.generate_target([]) == 'hda'
|
|
assert disk.generate_target(['hda']) == 'hdb'
|
|
assert disk.generate_target(['hdb', 'sda']) == 'hdc'
|
|
assert disk.generate_target(['hda', 'hdd']) == 'hdb'
|
|
|
|
|
|
def test_disk_dir_searchable(monkeypatch):
|
|
# Normally the dir searchable test is skipped in the test suite,
|
|
# but let's contrive an example that should trigger all the code
|
|
# to ensure it isn't horribly broken
|
|
conn = utils.URIs.open_kvm()
|
|
|
|
def _set_caps_baselabel_uid(uid):
|
|
secmodel = [s for s in conn.caps.host.secmodels
|
|
if s.model == "dac"][0]
|
|
for baselabel in [b for b in secmodel.baselabels
|
|
if b.type in ["qemu", "kvm"]]:
|
|
baselabel.content = "+%s:+%s" % (uid, uid)
|
|
|
|
tmpobj = tempfile.TemporaryDirectory(prefix="virtinst-test-search")
|
|
tmpdir = tmpobj.name
|
|
try:
|
|
# path="" should trigger early return
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, "")
|
|
assert searchdata.uid is None
|
|
# path=None should trigger early return
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, None)
|
|
assert searchdata.uid is None
|
|
|
|
# Invalid uid
|
|
_set_caps_baselabel_uid(-1)
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
|
assert searchdata.uid is None
|
|
|
|
# Use our uid, verify it shows we have expected access
|
|
_set_caps_baselabel_uid(os.getuid())
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn,
|
|
tmpdir + "/footest")
|
|
assert searchdata.uid == os.getuid()
|
|
# pylint: disable=use-implicit-booleaness-not-comparison
|
|
assert searchdata.fixlist == []
|
|
|
|
# Remove perms on the tmpdir, now it should report failures
|
|
os.chmod(tmpdir, 0o000)
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
|
assert searchdata.fixlist == [tmpdir]
|
|
|
|
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
|
assert not bool(errdict)
|
|
|
|
# Mock setfacl to definitely fail
|
|
with monkeypatch.context() as m:
|
|
m.setattr("virtinst.diskbackend.SETFACL", "getfacl")
|
|
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
|
|
|
finally:
|
|
# Reset changes we made
|
|
conn.invalidate_caps()
|
|
os.chmod(tmpdir, 0o777)
|
|
|
|
|
|
def test_disk_path_in_use_kernel():
|
|
# Extra tests for DeviceDisk.path_in_use
|
|
conn = utils.URIs.open_kvm()
|
|
|
|
# Comparing against kernel
|
|
vms = virtinst.DeviceDisk.path_in_use_by(
|
|
conn, "/pool-dir/test-arm-kernel")
|
|
assert vms == ["test-arm-kernel"]
|
|
|
|
|
|
def test_disk_paths_in_use():
|
|
conn = utils.URIs.open_kvm()
|
|
|
|
vms = virtinst.DeviceDisk.paths_in_use_by(
|
|
conn, ["/pool-dir/test-arm-kernel", "/pool-dir/test-arm-initrd"])
|
|
assert vms == [["test-arm-kernel"], ["test-arm-kernel"]]
|
|
|
|
|
|
def test_disk_diskbackend_misc():
|
|
# Test get_size() with vol_install
|
|
conn = utils.URIs.open_testdefault_cached()
|
|
disk = virtinst.DeviceDisk(conn)
|
|
pool = conn.storagePoolLookupByName("pool-dir")
|
|
vol_install = disk.build_vol_install(conn, "newvol1.img", pool, 1, False)
|
|
disk.set_vol_install(vol_install)
|
|
assert disk.get_size() == 1.0
|
|
|
|
# Test some blockdev inspecting
|
|
conn = utils.URIs.openconn("test:///default")
|
|
if os.path.exists("/dev/loop0"):
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.set_source_path("/dev/loop0")
|
|
assert disk.type == "block"
|
|
disk.get_size()
|
|
|
|
# Test sparse cloning
|
|
tmpinput = tempfile.NamedTemporaryFile()
|
|
open(tmpinput.name, "wb").write(b'\0' * 10000)
|
|
|
|
srcdisk = virtinst.DeviceDisk(conn)
|
|
srcdisk.set_source_path(tmpinput.name)
|
|
|
|
newdisk = virtinst.DeviceDisk(conn)
|
|
tmpoutput = tempfile.NamedTemporaryFile()
|
|
os.unlink(tmpoutput.name)
|
|
newdisk.set_source_path(tmpoutput.name)
|
|
newdisk.set_local_disk_to_clone(srcdisk, True)
|
|
newdisk.build_storage(None)
|
|
|
|
# Test cloning onto existing disk
|
|
newdisk = virtinst.DeviceDisk(conn, parsexml=newdisk.get_xml())
|
|
newdisk.set_source_path(newdisk.get_source_path())
|
|
newdisk.set_local_disk_to_clone(srcdisk, True)
|
|
newdisk.build_storage(None)
|
|
|
|
newdisk = virtinst.DeviceDisk(conn)
|
|
newdisk.type = "block"
|
|
newdisk.set_source_path("/dev/foo/idontexist")
|
|
assert newdisk.get_size() == 0
|
|
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
volpath = "/pool-dir/test-clone-simple.img"
|
|
assert virtinst.DeviceDisk.path_definitely_exists(conn, volpath)
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.set_source_path(volpath)
|
|
assert disk.get_size()
|
|
|
|
disk = virtinst.DeviceDisk(conn)
|
|
try:
|
|
disk.set_source_path(
|
|
"/virtinst-testsuite-fail-pool-install/test.qcow2")
|
|
raise AssertionError("expected failure")
|
|
except RuntimeError as e:
|
|
assert "StoragePool.install testsuite mocked failure" in str(e)
|
|
|
|
|
|
def test_disk_diskbackend_parse():
|
|
# Test that calling validate() on parsed disk XML doesn't attempt
|
|
# to verify the path exists. Assume it's a working config
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
xml = ("<disk type='file' device='disk'>"
|
|
"<source file='/A/B/C/D/NOPE'/>"
|
|
"</disk>")
|
|
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
|
disk.validate()
|
|
disk.is_size_conflict()
|
|
disk.build_storage(None)
|
|
assert getattr(disk, "_storage_backend").is_stub() is True
|
|
|
|
# Stub backend coverage testing
|
|
backend = getattr(disk, "_storage_backend")
|
|
assert disk.get_parent_pool() is None
|
|
assert disk.get_vol_object() is None
|
|
assert disk.get_vol_install() is None
|
|
assert disk.get_size() == 0
|
|
assert backend.get_vol_xml() is None
|
|
assert backend.get_dev_type() == "file"
|
|
assert backend.get_driver_type() is None
|
|
assert backend.get_parent_pool() is None
|
|
|
|
disk.set_backend_for_existing_path()
|
|
assert getattr(disk, "_storage_backend").is_stub() is False
|
|
|
|
with pytest.raises(ValueError):
|
|
disk.validate()
|
|
|
|
# Ensure set_backend_for_existing_path resolves a path
|
|
# to its existing storage volume
|
|
xml = ("<disk type='file' device='disk'>"
|
|
"<source file='/pool-dir/default-vol'/>"
|
|
"</disk>")
|
|
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
|
disk.set_backend_for_existing_path()
|
|
assert disk.get_vol_object()
|
|
|
|
# Verify set_backend_for_existing_path doesn't error
|
|
# for a variety of disks
|
|
dom = conn.lookupByName("test-many-devices")
|
|
guest = virtinst.Guest(conn, parsexml=dom.XMLDesc(0))
|
|
for disk in guest.devices.disk:
|
|
disk.set_backend_for_existing_path()
|
|
|
|
|
|
def test_disk_rbd_path():
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
diskxml1 = """
|
|
<disk type="network" device="disk">
|
|
<source protocol="rbd" name="rbd-sourcename/some-rbd-vol">
|
|
<host name="ceph-mon-1.example.com" port="6789"/>
|
|
<host name="ceph-mon-2.example.com" port="6789"/>
|
|
<host name="ceph-mon-3.example.com" port="6789"/>
|
|
</source>
|
|
<target dev="vdag" bus="virtio"/>
|
|
</disk>
|
|
"""
|
|
|
|
disk1 = virtinst.DeviceDisk(conn, parsexml=diskxml1)
|
|
disk1.set_backend_for_existing_path()
|
|
assert disk1.get_vol_object().name() == "some-rbd-vol"
|