mirror of
https://github.com/virt-manager/virt-manager.git
synced 2024-12-22 13:34:07 +03:00
virtinst: support: add full code coverage testing
This commit is contained in:
parent
54a28485df
commit
ef972cf2ea
@ -61,3 +61,30 @@ class TestURI(unittest.TestCase):
|
||||
conn = tests.utils.URIs.openconn(uri)
|
||||
self.assertEqual(conn.conn_version(), 1)
|
||||
self.assertEqual(conn.local_libvirt_version(), 2)
|
||||
|
||||
conn = tests.utils.URIs.open_testdefault_cached()
|
||||
# Add some support tests with it
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
conn.support.domain_xml_inactive("foo")
|
||||
self.assertTrue("must be of type <class 'libvirt.virDomain'>" in
|
||||
str(cm.exception))
|
||||
|
||||
# pylint: disable=protected-access
|
||||
from virtinst import support
|
||||
def _run(**kwargs):
|
||||
check = support._SupportCheck(**kwargs)
|
||||
return check(conn)
|
||||
|
||||
self.assertFalse(_run(function="virNope.Foo"))
|
||||
self.assertFalse(_run(function="virDomain.IDontExist"))
|
||||
self.assertTrue(_run(function="virDomain.isActive"))
|
||||
self.assertFalse(_run(function="virConnect.getVersion",
|
||||
flag="SOME_FLAG_DOESNT_EXIST"))
|
||||
self.assertFalse(_run(version="1000.0.0"))
|
||||
self.assertFalse(_run(hv_version={"test": "1000.0.0"}))
|
||||
self.assertFalse(_run(hv_libvirt_version={"test": "1000.0.0"}))
|
||||
self.assertFalse(_run(hv_libvirt_version={"qemu": "1.2.3"}))
|
||||
self.assertTrue(_run(hv_libvirt_version={"qemu": "1.2.3", "all": 0}))
|
||||
|
||||
dom = conn.lookupByName("test")
|
||||
self.assertTrue(conn.support.domain_xml_inactive(dom))
|
||||
|
@ -9,77 +9,23 @@
|
||||
import libvirt
|
||||
|
||||
|
||||
# Check that command is present in the python bindings, and return the
|
||||
# the requested function
|
||||
def _get_command(funcname, objname=None, obj=None):
|
||||
if not obj:
|
||||
obj = libvirt
|
||||
|
||||
if objname:
|
||||
if not hasattr(libvirt, objname):
|
||||
return None
|
||||
obj = getattr(libvirt, objname)
|
||||
|
||||
if not hasattr(obj, funcname):
|
||||
return None
|
||||
|
||||
return getattr(obj, funcname)
|
||||
|
||||
|
||||
# Make sure libvirt object 'objname' has function 'funcname'
|
||||
def _has_command(funcname, objname=None, obj=None):
|
||||
return bool(_get_command(funcname, objname, obj))
|
||||
|
||||
|
||||
# Make sure libvirt object has flag 'flag_name'
|
||||
def _get_flag(flag_name):
|
||||
return _get_command(flag_name)
|
||||
|
||||
|
||||
# Try to call the passed function, and look for signs that libvirt or driver
|
||||
# doesn't support it
|
||||
def _try_command(func, run_args, check_all_error=False):
|
||||
try:
|
||||
func(*run_args)
|
||||
except libvirt.libvirtError as e:
|
||||
if SupportCache.is_error_nosupport(e):
|
||||
return False
|
||||
|
||||
if check_all_error:
|
||||
return False
|
||||
except Exception as e:
|
||||
# Other python exceptions likely mean the bindings are horked
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# Return the hypervisor version
|
||||
def _split_function_name(function):
|
||||
if not function:
|
||||
return (None, None)
|
||||
|
||||
output = function.split(".")
|
||||
if len(output) == 1:
|
||||
return (None, output[0])
|
||||
else:
|
||||
return (output[0], output[1])
|
||||
|
||||
|
||||
def _check_function(function, flag, run_args, data):
|
||||
# Make sure function is present in either libvirt module or
|
||||
# object_name class
|
||||
object_name, function_name = _split_function_name(function)
|
||||
if not function_name:
|
||||
return None
|
||||
|
||||
flag_tuple = ()
|
||||
|
||||
if not _has_command(function_name, objname=object_name):
|
||||
"""
|
||||
Make sure function and option flag is present in the libvirt module.
|
||||
If run_args specified, try actually running the function against
|
||||
the passed 'data' object
|
||||
"""
|
||||
object_name, function_name = function.split(".")
|
||||
classobj = getattr(libvirt, object_name, None)
|
||||
if not classobj:
|
||||
return False
|
||||
if not getattr(classobj, function_name, None):
|
||||
return False
|
||||
|
||||
flag_tuple = None
|
||||
if flag:
|
||||
found_flag = _get_flag(flag)
|
||||
if not bool(found_flag):
|
||||
found_flag = getattr(libvirt, flag, None)
|
||||
if found_flag is None:
|
||||
return False
|
||||
flag_tuple = (found_flag,)
|
||||
|
||||
@ -88,18 +34,26 @@ def _check_function(function, flag, run_args, data):
|
||||
|
||||
# If function requires an object, make sure the passed obj
|
||||
# is of the correct type
|
||||
if object_name:
|
||||
classobj = _get_command(object_name)
|
||||
if not isinstance(data, classobj):
|
||||
raise ValueError(
|
||||
"Passed obj %s with args must be of type %s, was %s" %
|
||||
(data, str(classobj), type(data)))
|
||||
if not isinstance(data, classobj):
|
||||
raise ValueError(
|
||||
"Passed obj %s with args must be of type %s, was %s" %
|
||||
(data, str(classobj), type(data)))
|
||||
|
||||
cmd = _get_command(function_name, obj=data)
|
||||
use_args = run_args
|
||||
if flag_tuple:
|
||||
use_args += flag_tuple
|
||||
|
||||
# Function with args specified is all the proof we need
|
||||
return _try_command(cmd, run_args + flag_tuple,
|
||||
check_all_error=bool(flag_tuple))
|
||||
try:
|
||||
getattr(data, function_name)(*run_args)
|
||||
except libvirt.libvirtError as e:
|
||||
if SupportCache.is_error_nosupport(e):
|
||||
return False
|
||||
if bool(flag_tuple): # pragma: no cover
|
||||
return False
|
||||
except Exception as e: # pragma: no cover
|
||||
# Other python exceptions likely mean the bindings are horked
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _version_str_to_int(verstr):
|
||||
@ -122,9 +76,8 @@ class _SupportCheck(object):
|
||||
@version: Minimum libvirt version required for this feature. Not used
|
||||
if 'args' provided.
|
||||
|
||||
@function: Function name to check exists. If object not specified,
|
||||
function is checked against libvirt module. If run_args is specified,
|
||||
this function will actually be called, so beware.
|
||||
@function: Function name to check exists. Expected to be of the
|
||||
format $obj.$func. Like virDomain.isActive
|
||||
|
||||
@run_args: Argument tuple to actually test 'function' with, and check
|
||||
for an 'unsupported' error from libvirt.
|
||||
@ -153,6 +106,9 @@ class _SupportCheck(object):
|
||||
self.hv_version = hv_version or {}
|
||||
self.hv_libvirt_version = hv_libvirt_version or {}
|
||||
|
||||
if self.function:
|
||||
assert len(function.split(".")) == 2
|
||||
|
||||
versions = ([self.version] + list(self.hv_libvirt_version.values()))
|
||||
for vstr in versions:
|
||||
v = _version_str_to_int(vstr)
|
||||
@ -178,9 +134,11 @@ class _SupportCheck(object):
|
||||
if "VirtinstConnection" in repr(data):
|
||||
data = data.get_conn_for_api_arg()
|
||||
|
||||
ret = _check_function(self.function, self.flag, self.run_args, data)
|
||||
if ret is not None:
|
||||
return ret
|
||||
if self.function:
|
||||
ret = _check_function(
|
||||
self.function, self.flag, self.run_args, data)
|
||||
if ret is not None:
|
||||
return ret
|
||||
|
||||
# Do this after the function check, since there's an ordering issue
|
||||
# with VirtinstConnection
|
||||
@ -243,7 +201,7 @@ class SupportCache:
|
||||
with code VIR_ERR_NO_DOMAIN
|
||||
"""
|
||||
if not isinstance(err, libvirt.libvirtError):
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
return err.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN
|
||||
|
||||
@staticmethod
|
||||
@ -256,13 +214,13 @@ class SupportCache:
|
||||
:returns: True if command isn't supported, False if we can't determine
|
||||
"""
|
||||
if not isinstance(err, libvirt.libvirtError):
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
|
||||
if (err.get_error_code() == libvirt.VIR_ERR_RPC or
|
||||
err.get_error_code() == libvirt.VIR_ERR_NO_SUPPORT):
|
||||
return True
|
||||
|
||||
return False
|
||||
return False # pragma: no cover
|
||||
|
||||
|
||||
def __init__(self, virtconn):
|
||||
|
Loading…
Reference in New Issue
Block a user