mirror of
https://github.com/virt-manager/virt-manager.git
synced 2024-12-23 17:34:21 +03:00
f107e39989
Init a shared log instance in virtinst/logger.py, and use that throughout the code base, so we aren't calling directly into 'logging'. This helps protect our logging output from being cluttered with other library output, as happens with some 'requests' usage
212 lines
7.3 KiB
Python
212 lines
7.3 KiB
Python
# Copyright (C) 2013, 2015 Red Hat, Inc.
|
|
#
|
|
# This work is licensed under the GNU GPLv2 or later.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
import unittest
|
|
import os
|
|
|
|
from tests import utils
|
|
|
|
from virtinst import Cloner
|
|
from virtinst import log
|
|
|
|
ORIG_NAME = "clone-orig"
|
|
CLONE_NAME = "clone-new"
|
|
|
|
# Create some files to use as test images
|
|
FILE1 = "/tmp/virtinst-test1.img"
|
|
FILE2 = "/tmp/virtinst-test2.img"
|
|
P1_VOL1 = "/dev/default-pool/testvol1.img"
|
|
P1_VOL2 = "/dev/default-pool/testvol2.img"
|
|
P2_VOL1 = "/dev/cross-pool/testvol1.img"
|
|
P2_VOL2 = "/dev/cross-pool/testvol2.img"
|
|
|
|
POOL1 = "/dev/default-pool"
|
|
POOL2 = "/dev/cross-pool"
|
|
DISKPOOL = "/dev/disk-pool"
|
|
|
|
local_files = [FILE1, FILE2]
|
|
|
|
clonexml_dir = os.path.join(os.getcwd(), "tests/clone-xml")
|
|
|
|
|
|
class TestClone(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
for f in local_files:
|
|
os.system("touch %s" % f)
|
|
|
|
def tearDown(self):
|
|
for f in local_files:
|
|
os.unlink(f)
|
|
|
|
def _clone(self, filebase, disks=None, force_list=None,
|
|
skip_list=None, compare=True, conn=None,
|
|
clone_disks_file=None):
|
|
"""Helper for comparing clone input/output from 2 xml files"""
|
|
infile = os.path.join(clonexml_dir, filebase + "-in.xml")
|
|
in_content = open(infile).read()
|
|
|
|
if not conn:
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
cloneobj = Cloner(conn)
|
|
cloneobj.original_xml = in_content
|
|
|
|
force_list = force_list or []
|
|
for force in force_list:
|
|
cloneobj.force_target = force
|
|
self.assertEqual(cloneobj.force_target, force_list)
|
|
cloneobj.force_target = force_list
|
|
self.assertEqual(cloneobj.force_target, force_list)
|
|
|
|
skip_list = skip_list or []
|
|
for skip in skip_list:
|
|
cloneobj.skip_target = skip
|
|
self.assertEqual(cloneobj.skip_target, skip_list)
|
|
cloneobj.skip_target = skip_list
|
|
self.assertEqual(cloneobj.skip_target, skip_list)
|
|
|
|
cloneobj = self._default_clone_values(cloneobj, disks)
|
|
|
|
if compare:
|
|
self._clone_compare(cloneobj, filebase,
|
|
clone_disks_file=clone_disks_file)
|
|
self._clone_define(filebase)
|
|
else:
|
|
cloneobj.setup_original()
|
|
cloneobj.setup_clone()
|
|
|
|
def _default_clone_values(self, cloneobj, disks=None):
|
|
"""Sets default values for the cloned VM."""
|
|
cloneobj.clone_name = "clone-new"
|
|
|
|
uuid = "12345678-1234-1234-1234-123456789012"
|
|
cloneobj.clone_uuid = uuid
|
|
self.assertEqual(cloneobj.clone_uuid, uuid)
|
|
|
|
macs = ["22:23:45:67:89:00", "22:23:45:67:89:01"]
|
|
cloneobj.clone_macs = macs
|
|
self.assertEqual(cloneobj.clone_macs, macs)
|
|
|
|
if disks is None:
|
|
disks = ["/dev/disk-pool/disk-vol1", "/tmp/clone2.img",
|
|
"/clone3", "/tmp/clone4.img",
|
|
"/tmp/clone5.img", None]
|
|
|
|
cloneobj.clone_paths = disks
|
|
self.assertEqual(cloneobj.clone_paths, disks)
|
|
return cloneobj
|
|
|
|
def _clone_compare(self, cloneobj, outbase, clone_disks_file=None):
|
|
"""Helps compare output from passed clone instance with an xml file"""
|
|
outfile = os.path.join(clonexml_dir, outbase + "-out.xml")
|
|
|
|
cloneobj.setup_original()
|
|
cloneobj.setup_clone()
|
|
|
|
utils.diff_compare(cloneobj.clone_xml, outfile)
|
|
if clone_disks_file:
|
|
xml_clone_disks = ""
|
|
for i in cloneobj.clone_disks:
|
|
xml_clone_disks += i.get_vol_install().get_xml()
|
|
utils.diff_compare(xml_clone_disks, clone_disks_file)
|
|
|
|
def _clone_define(self, filebase):
|
|
"""Take the valid output xml and attempt to define it on the
|
|
connection to ensure we don't get any errors"""
|
|
outfile = os.path.join(clonexml_dir, filebase + "-out.xml")
|
|
outxml = open(outfile).read()
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
utils.test_create(conn, outxml)
|
|
|
|
def testRemoteNoStorage(self):
|
|
"""Test remote clone where VM has no storage that needs cloning"""
|
|
conn = utils.URIs.open_test_remote()
|
|
self._clone("nostorage", conn=conn)
|
|
self._clone("noclone-storage", conn=conn)
|
|
|
|
def testRemoteWithStorage(self):
|
|
"""
|
|
Test remote clone with storage needing cloning. Should fail,
|
|
since libvirt has no storage clone api.
|
|
"""
|
|
conn = utils.URIs.open_test_remote()
|
|
disks = ["%s/1.img" % POOL1, "%s/2.img" % POOL1]
|
|
try:
|
|
self._clone("general-cfg", disks=disks, conn=conn)
|
|
# We shouldn't succeed, so test fails
|
|
raise AssertionError("Remote clone with storage passed "
|
|
"when it shouldn't.")
|
|
except (ValueError, RuntimeError) as e:
|
|
# Exception expected
|
|
log.debug("Received expected exception: %s", str(e))
|
|
|
|
def testCloneStorageManaged(self):
|
|
disks = ["%s/new1.img" % POOL1, "%s/new2.img" % DISKPOOL]
|
|
self._clone("managed-storage", disks=disks)
|
|
|
|
def testCloneStorageCrossPool(self):
|
|
conn = utils.URIs.open_test_remote()
|
|
clone_disks_file = os.path.join(
|
|
clonexml_dir, "cross-pool-disks-out.xml")
|
|
disks = ["%s/new1.img" % POOL2, "%s/new2.img" % POOL1]
|
|
self._clone("cross-pool", disks=disks,
|
|
clone_disks_file=clone_disks_file, conn=conn)
|
|
|
|
def testCloneStorageForce(self):
|
|
disks = ["/dev/default-pool/1234.img", None, "/clone2.img"]
|
|
self._clone("force", disks=disks, force_list=["hda", "fdb", "sdb"])
|
|
|
|
def testCloneStorageSkip(self):
|
|
disks = ["/dev/default-pool/1234.img", None, "/tmp/clone2.img"]
|
|
skip_list = ["hda", "fdb"]
|
|
self._clone("skip", disks=disks, skip_list=skip_list)
|
|
|
|
def testCloneFullPool(self):
|
|
with self.assertRaises(Exception):
|
|
self._clone("fullpool",
|
|
disks=["/full-pool/test.img"], compare=False)
|
|
|
|
def testCloneNvramAuto(self):
|
|
self._clone("nvram-auto")
|
|
|
|
def testCloneNvramNewpool(self):
|
|
self._clone("nvram-newpool")
|
|
|
|
def testCloneNvramMissing(self):
|
|
self._clone("nvram-missing")
|
|
|
|
def testCloneGraphicsPassword(self):
|
|
self._clone("graphics-password")
|
|
|
|
def testCloneChannelSource(self):
|
|
self._clone("channel-source")
|
|
|
|
def testCloneMisc(self):
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
|
|
with self.assertRaises(RuntimeError) as err:
|
|
cloner = Cloner(conn)
|
|
# Add this bit here for coverage testing
|
|
cloner.clone_xml = None
|
|
cloner.setup_original()
|
|
self.assertTrue("Original guest name or xml" in str(err.exception))
|
|
|
|
with self.assertRaises(RuntimeError) as err:
|
|
cloner = Cloner(conn)
|
|
cloner.original_guest = "test-snapshots"
|
|
cloner.setup_original()
|
|
self.assertTrue("paused or shutoff" in str(err.exception))
|
|
|
|
with self.assertRaises(ValueError) as err:
|
|
cloner = Cloner(conn)
|
|
cloner.original_guest = "test-clone-simple"
|
|
cloner.setup_original()
|
|
cloner.setup_clone()
|
|
self.assertTrue("More disks to clone" in str(err.exception))
|
|
|
|
cloner = Cloner(conn)
|
|
self.assertEqual(
|
|
cloner.generate_clone_name("test-clone5"), "test-clone6")
|