domain: Have test driver report fake job_info data

Helps us test more of the UI

Signed-off-by: Cole Robinson <crobinso@redhat.com>
This commit is contained in:
Cole Robinson 2020-08-27 16:08:32 -04:00
parent b764773cfe
commit ade6c34a96

View File

@ -25,6 +25,14 @@ class _SENTINEL(object):
pass
def _fake_job_info():
import random
total = 1024 * 1024 * 1024
fakepcent = random.choice(range(1, 100))
remaining = ((total / 100) * fakepcent)
return [None, None, None, total, None, remaining]
def start_job_progress_thread(vm, meter, progtext):
current_thread = threading.currentThread()
@ -33,7 +41,7 @@ def start_job_progress_thread(vm, meter, progtext):
time.sleep(.5)
if not current_thread.isAlive():
return False
return
try:
jobinfo = vm.job_info()
@ -43,7 +51,7 @@ def start_job_progress_thread(vm, meter, progtext):
# data_total is 0 if the job hasn't started yet
if not data_total:
continue
continue # pragma: no cover
if not meter.started:
meter.start(size=data_total,
@ -51,11 +59,9 @@ def start_job_progress_thread(vm, meter, progtext):
progress = data_total - data_remaining
meter.update(progress)
except Exception:
except Exception: # pragma: no cover
log.exception("Error calling jobinfo")
return False
return True
return
if vm.supports_domain_job_info():
t = threading.Thread(target=jobinfo_cb,
@ -975,6 +981,8 @@ class vmmDomain(vmmLibvirtObject):
self.get_autostart()
def job_info(self):
if self.conn.is_test():
return _fake_job_info()
return self._backend.jobInfo()
def abort_job(self):
self._backend.abortJob()