connection: Store all object lists in an external class

Simplifies list management and allows us to use real locking to prevent
list collisions.
Cole Robinson 2015-04-10 10:33:04 -04:00
parent 2d25c920ea
commit 5357b91402


@@ -46,6 +46,97 @@ from .storagepool import vmmStoragePool
_disable_libvirt_events = False
class _ObjectList(vmmGObject):
"""
Class that wraps our internal list of libvirt objects
"""
def __init__(self):
vmmGObject.__init__(self)
self._objects = []
self._lock = threading.Lock()
def _cleanup(self):
try:
self._lock.acquire()
for obj in self._objects:
try:
obj.cleanup()
except:
logging.debug("Failed to cleanup %s", exc_info=True)
self._objects = []
finally:
self._lock.release()
def remove(self, obj):
"""
Remove an object from the list.
:param obj: vmmLibvirtObject to remove
:returns: True if object removed, False if object was not found
"""
try:
self._lock.acquire()
# Identity check is sufficient here, since we should never be
# asked to remove an object that wasn't at one point in the list.
if obj not in self._objects:
return False
self._objects.remove(obj)
return True
finally:
self._lock.release()
def add(self, obj):
"""
Add an object to the list.
:param obj: vmmLibvirtObject to add
:returns: True if object added, False if object already in the list
"""
try:
self._lock.acquire()
# We don't look up based on identity here, to prevent tick()
# races from adding the same domain twice
#
# We don't use lookup_object here since we need to hold the
# lock the whole time to prevent a 'time of check' issue
for checkobj in self._objects:
if (checkobj.__class__ == obj.__class__ and
checkobj.get_connkey() == obj.get_connkey()):
return False
if obj in self._objects:
return False
self._objects.append(obj)
return True
finally:
self._lock.release()
def get_objects_for_class(self, classobj):
"""
Return all objects for the passed vmmLibvirtObject class
"""
try:
self._lock.acquire()
return [o for o in self._objects if o.__class__ is classobj]
finally:
self._lock.release()
def lookup_object(self, classobj, connkey):
"""
Lookup an object with the passed classobj + connkey
"""
# Doesn't require locking, since get_objects_for_class covers us
for obj in self.get_objects_for_class(classobj):
if obj.get_connkey() == connkey:
return obj
return None
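
The behaviour add() guarantees is worth spelling out, since the comment above leans on it: the duplicate check and the append happen inside a single lock hold, so two tick() threads racing to register wrappers for the same libvirt object cannot both win. Below is a minimal standalone sketch of that property; the _SketchObjectList and Stub classes and the two-thread setup are invented for illustration and are not virt-manager code.

# Standalone sketch (not virt-manager code) of the atomic check-and-insert
# that _ObjectList.add() performs: the membership test and the append share
# one lock hold, so concurrent callers cannot both register the same
# (class, connkey) pair.
import threading

class _SketchObjectList(object):
    def __init__(self):
        self._objects = []
        self._lock = threading.Lock()

    def add(self, obj):
        with self._lock:
            for checkobj in self._objects:
                if (checkobj.__class__ == obj.__class__ and
                    checkobj.get_connkey() == obj.get_connkey()):
                    return False
            self._objects.append(obj)
            return True

class Stub(object):
    def __init__(self, connkey):
        self._connkey = connkey
    def get_connkey(self):
        return self._connkey

objlist = _SketchObjectList()
results = []
threads = [threading.Thread(
               target=lambda: results.append(objlist.add(Stub("vm1"))))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Exactly one add() wins; the duplicate is rejected inside the same lock hold.
assert sorted(results) == [False, True]

Checking membership with lookup_object() first and calling add() afterwards would reopen exactly the time-of-check window the comment in add() warns about, which is why the loop lives inside the lock.
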
class vmmConnection(vmmGObject):
__gsignals__ = {
"vm-added": (GObject.SignalFlags.RUN_FIRST, None, [str]),
@@ -92,22 +183,11 @@ class vmmConnection(vmmGObject):
self._xml_flags = {}
# Physical network interfaces: name -> virtinst.NodeDevice
self._nodedevs = {}
# Connection Storage pools: name -> vmmInterface
self._interfaces = {}
# Connection Storage pools: name -> vmmStoragePool
self._pools = {}
# Virtual networks: name -> vmmNetwork object
self._nets = {}
# Virtual machines: name -> vmmDomain object
self._vms = {}
# Resource utilization statistics
self._objects = _ObjectList()
self.record = []
self.hostinfo = None
self._tick_lock = threading.Lock()
self._init_virtconn()
@@ -142,17 +222,17 @@ class vmmConnection(vmmGObject):
def _init_virtconn(self):
self._backend.cb_fetch_all_guests = (
lambda: [obj.get_xmlobj(refresh_if_nec=False)
for obj in self._vms.values()])
for obj in self.list_vms()])
self._backend.cb_fetch_all_pools = (
lambda: [obj.get_xmlobj(refresh_if_nec=False)
for obj in self._pools.values()])
for obj in self.list_pools()])
self._backend.cb_fetch_all_nodedevs = (
lambda: [obj.get_xmlobj(refresh_if_nec=False)
for obj in self._nodedevs.values()])
for obj in self.list_nodedevs()])
def fetch_all_vols():
ret = []
for pool in self._pools.values():
for pool in self.list_pools():
for vol in pool.get_volumes().values():
try:
ret.append(vol.get_xmlobj(refresh_if_nec=False))
@@ -217,8 +297,8 @@ class vmmConnection(vmmGObject):
handle_id = vmmGObject.connect(self, name, callback, *args)
if name == "vm-added":
for connkey in self._vms.keys():
self.emit("vm-added", connkey)
for vm in self.list_vms():
self.emit("vm-added", vm.get_connkey())
return handle_id
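
For context on the connect() override just above: it replays "vm-added" so a UI page that attaches its handler after the connection has already polled still hears about every existing VM. Here is a minimal GObject sketch of that replay-on-connect pattern; the Registry class, the "item-added" signal and the on_item_added handler are invented names for illustration, not part of virt-manager.

from gi.repository import GObject

class Registry(GObject.GObject):
    __gsignals__ = {
        "item-added": (GObject.SignalFlags.RUN_FIRST, None, [str]),
    }

    def __init__(self):
        GObject.GObject.__init__(self)
        self._keys = []

    def add(self, key):
        self._keys.append(key)
        self.emit("item-added", key)

    def connect(self, name, callback, *args):
        handle_id = GObject.GObject.connect(self, name, callback, *args)
        if name == "item-added":
            # Replay the current contents for the late subscriber.
            for key in self._keys:
                self.emit("item-added", key)
        return handle_id

def on_item_added(src, key):
    print("saw %s" % key)

registry = Registry()
registry.add("vm1")
registry.connect("item-added", on_item_added)
# "saw vm1" is printed even though the handler attached after the add.

Note that emit() re-notifies every attached handler, not just the newly connected one; the override above has the same behaviour.
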
@@ -473,13 +553,13 @@ class vmmConnection(vmmGObject):
return self._get_flags_helper(iface, key, check_func)
def get_default_pool(self):
for p in self._pools.values():
for p in self.list_pools():
if p.get_name() == "default":
return p
return None
def get_vol_by_path(self, path):
for pool in self._pools.values():
for pool in self.list_pools():
for vol in pool.get_volumes().values():
try:
if vol.get_target_path() == path:
@@ -525,29 +605,29 @@ class vmmConnection(vmmGObject):
#################################
def get_vm(self, connkey):
return self._vms[connkey]
return self._objects.lookup_object(vmmDomain, connkey)
def list_vms(self):
return self._vms.values()
return self._objects.get_objects_for_class(vmmDomain)
def get_net(self, connkey):
return self._nets[connkey]
return self._objects.lookup_object(vmmNetwork, connkey)
def list_nets(self):
return self._nets.values()
return self._objects.get_objects_for_class(vmmNetwork)
def get_pool(self, connkey):
return self._pools[connkey]
return self._objects.lookup_object(vmmStoragePool, connkey)
def list_pools(self):
return self._pools.values()
return self._objects.get_objects_for_class(vmmStoragePool)
def get_interface(self, connkey):
return self._interfaces[connkey]
return self._objects.lookup_object(vmmInterface, connkey)
def list_interfaces(self):
return self._interfaces.values()
return self._objects.get_objects_for_class(vmmInterface)
def get_nodedev(self, connkey):
return self._nodedevs[connkey]
def get_nodedevs(self):
return self._nodedevs.values()
return self._objects.lookup_object(vmmNodeDevice, connkey)
def list_nodedevs(self):
return self._objects.get_objects_for_class(vmmNodeDevice)
############################
@@ -556,7 +636,7 @@ class vmmConnection(vmmGObject):
def filter_nodedevs(self, devtype=None, devcap=None):
retdevs = []
for dev in self.get_nodedevs():
for dev in self.list_nodedevs():
xmlobj = dev.get_xmlobj()
if devtype and xmlobj.device_type != devtype:
continue
@@ -616,15 +696,15 @@ class vmmConnection(vmmGObject):
return self._backend.interfaceDefineXML(xml, 0)
def rename_object(self, obj, origxml, newxml, oldname, newname):
ignore = oldname
ignore = newname
if obj.class_name() == "domain":
define_cb = self.define_domain
objlist = self._vms
elif obj.class_name() == "pool":
define_cb = self.define_pool
objlist = self._pools
elif obj.class_name() == "network":
define_cb = self.define_network
objlist = self._nets
else:
raise RuntimeError("programming error: rename_object "
"helper doesn't support object class %s" % obj.__class__)
@@ -633,11 +713,9 @@ class vmmConnection(vmmGObject):
obj.delete(force=False)
newobj = None
success = False
try:
# Redefine new domain
newobj = define_cb(newxml)
success = True
except Exception, renameerr:
try:
logging.debug("Error defining new name %s XML",
@@ -658,10 +736,6 @@ class vmmConnection(vmmGObject):
# Reinsert handle into new obj
obj.change_name_backend(newobj)
if success:
objlist.pop(oldname)
objlist[newname] = obj
#########################
# Domain event handling #
@@ -677,7 +751,7 @@ class vmmConnection(vmmGObject):
ignore = conn
ignore = args
obj = self._vms.get(domain.name(), None)
obj = self.get_vm(domain.name())
if not obj:
return
@@ -688,7 +762,7 @@ class vmmConnection(vmmGObject):
ignore = conn
ignore = reason
ignore = userdata
obj = self._vms.get(domain.name(), None)
obj = self.get_vm(domain.name())
if obj:
# If the domain disappeared, this will catch it and trigger
@@ -705,7 +779,7 @@ class vmmConnection(vmmGObject):
ignore = conn
ignore = reason
ignore = userdata
obj = self._nets.get(network.name(), None)
obj = self.get_net(network.name())
if obj:
self.idle_add(obj.refresh_status_from_event_loop)
@@ -787,13 +861,6 @@ class vmmConnection(vmmGObject):
logging.debug("conn.close() uri=%s", self.get_uri())
self._closing = True
def cleanup(devs):
for dev in devs.values():
try:
dev.cleanup()
except:
logging.debug("Failed to cleanup %s", exc_info=True)
try:
if not self._backend.is_closed():
for eid in self._domain_cb_ids:
@@ -810,20 +877,8 @@ class vmmConnection(vmmGObject):
self._backend.close()
self.record = []
cleanup(self._nodedevs)
self._nodedevs = {}
cleanup(self._interfaces)
self._interfaces = {}
cleanup(self._pools)
self._pools = {}
cleanup(self._nets)
self._nets = {}
cleanup(self._vms)
self._vms = {}
self._objects.cleanup()
self._objects = _ObjectList()
self._change_state(self._STATE_DISCONNECTED)
self._closing = False
@@ -942,39 +997,55 @@ class vmmConnection(vmmGObject):
#######################
def _update_nets(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nets())
if not dopoll or not self.is_network_capable():
return {}, {}, self._nets
return pollhelpers.fetch_nets(self._backend, self._nets.copy(),
return {}, {}, keymap
return pollhelpers.fetch_nets(self._backend, keymap,
(lambda obj, key: vmmNetwork(self, obj, key)))
def _update_pools(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_pools())
if not dopoll or not self.is_storage_capable():
return {}, {}, self._pools
return pollhelpers.fetch_pools(self._backend, self._pools.copy(),
return {}, {}, keymap
return pollhelpers.fetch_pools(self._backend, keymap,
(lambda obj, key: vmmStoragePool(self, obj, key)))
def _update_interfaces(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_interfaces())
if not dopoll or not self.is_interface_capable():
return {}, {}, self._interfaces
return pollhelpers.fetch_interfaces(self._backend,
self._interfaces.copy(),
return {}, {}, keymap
return pollhelpers.fetch_interfaces(self._backend, keymap,
(lambda obj, key: vmmInterface(self, obj, key)))
def _update_nodedevs(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nodedevs())
if not dopoll or not self.is_nodedev_capable():
return {}, {}, self._nodedevs
return pollhelpers.fetch_nodedevs(self._backend, self._nodedevs.copy(),
return {}, {}, keymap
return pollhelpers.fetch_nodedevs(self._backend, keymap,
(lambda obj, key: vmmNodeDevice(self, obj, key)))
def _update_vms(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_vms())
if not dopoll:
return {}, {}, self._vms
return pollhelpers.fetch_vms(self._backend, self._vms.copy(),
return {}, {}, keymap
return pollhelpers.fetch_vms(self._backend, keymap,
(lambda obj, key: vmmDomain(self, obj, key)))
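
All five _update_* helpers now rebuild their connkey-to-object map from the shared _ObjectList on each call instead of carrying a per-type dict between ticks. The pollhelpers.fetch_* functions themselves are not part of this diff; judging from how tick() unpacks their result below, they return a (gone, new, current) split of that map against a fresh poll. The following standalone sketch only illustrates that split; diff_poll_results, polled_keys and build_obj are invented stand-ins, not the helpers' real interface.

# Illustrative only: split a previously-known keymap against a freshly
# polled set of connkeys into (gone, new, current) maps, roughly the shape
# tick() unpacks from pollhelpers.fetch_*().  The real helpers talk to the
# libvirt backend instead of taking a polled_keys list.
def diff_poll_results(keymap, polled_keys, build_obj):
    current = {}
    new = {}
    for key in polled_keys:
        if key in keymap:
            # Still present: keep the existing wrapper object.
            current[key] = keymap[key]
        else:
            # Appeared since the last poll: build a new wrapper.
            obj = build_obj(key)
            new[key] = obj
            current[key] = obj
    # Anything we knew about that the poll no longer reports is gone.
    gone = dict((key, obj) for key, obj in keymap.items()
                if key not in current)
    return gone, new, current

# Example usage with plain strings standing in for vmmDomain wrappers:
gone, new, current = diff_poll_results(
    {"vm1": "old-vm1-wrapper", "vm2": "old-vm2-wrapper"},
    ["vm2", "vm3"],
    lambda key: "wrapper-for-%s" % key)
assert gone == {"vm1": "old-vm1-wrapper"}
assert new == {"vm3": "wrapper-for-vm3"}
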
def _send_object_signals(self, new_objects, gone_objects,
finish_connecting):
"""
Responsible for signaling the UI for any updates. All possible UI
updates need to go here to enable threading that doesn't block the
app with long tick operations.
"""
# Connection closed out from under us
if not self._backend.is_open():
return
for obj in gone_objects:
if not self._objects.remove(obj):
continue
class_name = obj.class_name()
logging.debug("%s=%s removed", class_name, obj.get_name())
if class_name == "domain":
@@ -990,6 +1061,9 @@ class vmmConnection(vmmGObject):
obj.cleanup()
for obj in new_objects:
if not self._objects.add(obj):
continue
class_name = obj.class_name()
if class_name != "nodedev":
# Skip nodedev logging since it's noisy and not interesting
@@ -1108,69 +1182,30 @@ class vmmConnection(vmmGObject):
self.hostinfo = self._backend.getInfo()
try:
# We take the ticklock before using the conn object lists
# like self._vms. This ensures that those lists were updated
# by tick_send_signals since the last time we ran tick()
# https://www.redhat.com/archives/virt-tools-list/2015-April/msg00009.html
self._tick_lock.acquire()
(goneNets, newNets, nets) = self._update_nets(pollnet)
self._refresh_new_objects(newNets.values())
(gonePools, newPools, pools) = self._update_pools(pollpool)
self._refresh_new_objects(newPools.values())
(goneInterfaces,
newInterfaces, interfaces) = self._update_interfaces(polliface)
self._refresh_new_objects(newInterfaces.values())
(goneNodedevs,
newNodedevs, nodedevs) = self._update_nodedevs(pollnodedev)
self._refresh_new_objects(newNodedevs.values())
(goneNets, newNets, nets) = self._update_nets(pollnet)
self._refresh_new_objects(newNets.values())
(gonePools, newPools, pools) = self._update_pools(pollpool)
self._refresh_new_objects(newPools.values())
(goneInterfaces,
newInterfaces, interfaces) = self._update_interfaces(polliface)
self._refresh_new_objects(newInterfaces.values())
(goneNodedevs,
newNodedevs, nodedevs) = self._update_nodedevs(pollnodedev)
self._refresh_new_objects(newNodedevs.values())
# These are refreshing in their __init__ method, because the
# data is wanted immediately
(goneVMs, newVMs, vms) = self._update_vms(pollvm)
# These are refreshing in their __init__ method, because the
# data is wanted immediately
(goneVMs, newVMs, vms) = self._update_vms(pollvm)
gone_objects = (goneVMs.values() + goneNets.values() +
gonePools.values() + goneInterfaces.values() +
goneNodedevs.values())
new_objects = (newVMs.values() + newNets.values() +
newPools.values() + newInterfaces.values() +
newNodedevs.values())
gone_objects = (goneVMs.values() + goneNets.values() +
gonePools.values() + goneInterfaces.values() +
goneNodedevs.values())
new_objects = (newVMs.values() + newNets.values() +
newPools.values() + newInterfaces.values() +
newNodedevs.values())
except:
self._tick_lock.release()
raise
def tick_update_lists():
"""
Responsible for signaling the UI for any updates. All possible UI
updates need to go here to enable threading that doesn't block the
app with long tick operations.
"""
try:
# Connection closed out from under us
if not self._backend.is_open():
return
if pollvm:
self._vms = vms
if pollnet:
self._nets = nets
if polliface:
self._interfaces = interfaces
if pollpool:
self._pools = pools
if pollnodedev:
self._nodedevs = nodedevs
self.idle_add(self._send_object_signals,
new_objects, gone_objects, finish_connecting)
finally:
self._tick_lock.release()
# Anything that could possibly fail before this call needs to go
# in the try/except that handles the tick lock
self.idle_add(tick_update_lists)
self.idle_add(self._send_object_signals,
new_objects, gone_objects, finish_connecting)
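
With the tick lock gone, tick() hands its results straight to idle_add(self._send_object_signals, ...). idle_add is presumably a thin wrapper around GLib.idle_add on vmmGObject (an assumption based on the name; the wrapper is not shown in this diff), and the point of the pattern is that the expensive polling stays on the tick thread while the UI signalling runs on the GTK main loop. A minimal standalone sketch of that handoff, with an invented worker() and report() pair:

# Sketch of pushing worker-thread results onto the GLib main loop.
# worker() and report() are invented for illustration; GLib.idle_add() is
# the real PyGObject API used for this kind of thread-to-main-loop handoff.
import threading
from gi.repository import GLib

def report(new_objects, gone_objects):
    # Runs on the main loop, so it is safe to touch GTK widgets here.
    print("new=%s gone=%s" % (new_objects, gone_objects))
    loop.quit()
    return False  # one-shot: do not reschedule this idle callback

def worker():
    # Stand-in for the expensive polling work tick() performs.
    new_objects = ["vm3"]
    gone_objects = ["vm1"]
    GLib.idle_add(report, new_objects, gone_objects)

loop = GLib.MainLoop()
threading.Thread(target=worker).start()
loop.run()
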
ticklist = []
if pollvm or stats_update: