diskbackend: Add full local clone test coverage

Signed-off-by: Cole Robinson <crobinso@redhat.com>
This commit is contained in:
Cole Robinson 2020-01-29 08:57:10 -05:00
parent e7bb021c4c
commit 7313f9ff6a
2 changed files with 28 additions and 9 deletions

View File

@ -260,9 +260,28 @@ class TestXMLMisc(unittest.TestCase):
assert disk.get_size() == 1.0
# Test some blockdev inspecting
conn = utils.URIs.openconn("test:///default")
if os.path.exists("/dev/loop0"):
conn = utils.URIs.openconn("test:///default")
disk = virtinst.DeviceDisk(conn)
disk.path = "/dev/loop0"
assert disk.type == "block"
disk.get_size()
# Test sparse cloning
tmpinput = tempfile.NamedTemporaryFile()
open(tmpinput.name, "wb").write(b'\0' * 10000)
srcdisk = virtinst.DeviceDisk(conn)
srcdisk.path = tmpinput.name
newdisk = virtinst.DeviceDisk(conn)
tmpoutput = tempfile.NamedTemporaryFile()
os.unlink(tmpoutput.name)
newdisk.path = tmpoutput.name
newdisk.set_local_disk_to_clone(srcdisk, True)
newdisk.build_storage(None)
# Test cloning onto existing disk
newdisk.path = newdisk.path
newdisk.set_local_disk_to_clone(srcdisk, True)
newdisk.build_storage(None)

View File

@ -514,12 +514,12 @@ class CloneStorageCreator(_StorageCreator):
ret = False
msg = None
if self.get_dev_type() == "block":
avail = _get_size(self._path)
avail = _get_size(self._path) # pragma: no cover
else:
vfs = os.statvfs(os.path.dirname(self._path))
avail = vfs.f_frsize * vfs.f_bavail
need = int(self._size) * 1024 * 1024 * 1024
if need > avail:
if need > avail: # pragma: no cover
if self._sparse:
msg = _("The filesystem will not have enough free space"
" to fully allocate the sparse file when the guest"
@ -535,21 +535,21 @@ class CloneStorageCreator(_StorageCreator):
return (ret, msg)
def validate(self):
if self._size is None:
if self._size is None: # pragma: no cover
raise ValueError(_("size is required for non-existent disk "
"'%s'" % self.get_path()))
err, msg = self.is_size_conflict()
if err:
raise ValueError(msg)
raise ValueError(msg) # pragma: no cover
if msg:
log.warning(msg)
log.warning(msg) # pragma: no cover
def create(self, progresscb):
text = (_("Cloning %(srcfile)s") %
{'srcfile': os.path.basename(self._input_path)})
size_bytes = int(self.get_size()) * 1024 * 1024 * 1024
size_bytes = int(self.get_size() * 1024 * 1024 * 1024)
progresscb.start(filename=self._output_path, size=size_bytes,
text=text)
@ -588,7 +588,7 @@ class CloneStorageCreator(_StorageCreator):
self._input_path, self._output_path,
sparse, clone_block_size)
zeros = '\0' * 4096
zeros = b'\0' * 4096
src_fd, dst_fd = None, None
try:
@ -609,7 +609,7 @@ class CloneStorageCreator(_StorageCreator):
os.lseek(dst_fd, s, 1)
else:
b = os.write(dst_fd, l)
if s != b:
if s != b: # pragma: no cover
meter.end(i)
break
i += s