1
0
mirror of https://gitlab.com/libvirt/libvirt.git synced 2025-01-11 09:17:52 +03:00
libvirt/python/libvirt-override-virConnect.py
Guannan Ren a515aaa9e4 python: treat flags as default argument with value 0
The following four functions have not changed because default arguments
have to come after positional arguments. Changing them will break the
the binding APIs.

migrate(self, dconn, flags, dname, uri, bandwidth):
migrate2(self, dconn, dxml, flags, dname, uri, bandwidth):
migrateToURI(self, duri, flags, dname, bandwidth):
migrateToURI2(self, dconnuri, miguri, dxml, flags, dname, bandwidth):
2013-03-22 11:50:09 +08:00

313 lines
11 KiB
Python

def __del__(self):
try:
for cb,opaque in self.domainEventCallbacks.items():
del self.domainEventCallbacks[cb]
del self.domainEventCallbacks
libvirtmod.virConnectDomainEventDeregister(self._o, self)
except AttributeError:
pass
if self._o != None:
libvirtmod.virConnectClose(self._o)
self._o = None
def domainEventDeregister(self, cb):
"""Removes a Domain Event Callback. De-registering for a
domain callback will disable delivery of this event type """
try:
del self.domainEventCallbacks[cb]
if len(self.domainEventCallbacks) == 0:
del self.domainEventCallbacks
ret = libvirtmod.virConnectDomainEventDeregister(self._o, self)
if ret == -1: raise libvirtError ('virConnectDomainEventDeregister() failed', conn=self)
except AttributeError:
pass
def domainEventRegister(self, cb, opaque):
"""Adds a Domain Event Callback. Registering for a domain
callback will enable delivery of the events """
try:
self.domainEventCallbacks[cb] = opaque
except AttributeError:
self.domainEventCallbacks = {cb:opaque}
ret = libvirtmod.virConnectDomainEventRegister(self._o, self)
if ret == -1: raise libvirtError ('virConnectDomainEventRegister() failed', conn=self)
def _dispatchDomainEventCallbacks(self, dom, event, detail):
"""Dispatches events to python user domain event callbacks
"""
try:
for cb,opaque in self.domainEventCallbacks.items():
cb(self,dom,event,detail,opaque)
return 0
except AttributeError:
pass
def _dispatchDomainEventLifecycleCallback(self, dom, event, detail, cbData):
"""Dispatches events to python user domain lifecycle event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), event, detail, opaque)
return 0
def _dispatchDomainEventGenericCallback(self, dom, cbData):
"""Dispatches events to python user domain generic event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), opaque)
return 0
def _dispatchDomainEventRTCChangeCallback(self, dom, offset, cbData):
"""Dispatches events to python user domain RTC change event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), offset ,opaque)
return 0
def _dispatchDomainEventWatchdogCallback(self, dom, action, cbData):
"""Dispatches events to python user domain watchdog event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), action, opaque)
return 0
def _dispatchDomainEventIOErrorCallback(self, dom, srcPath, devAlias,
action, cbData):
"""Dispatches events to python user domain IO error event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), srcPath, devAlias, action, opaque)
return 0
def _dispatchDomainEventIOErrorReasonCallback(self, dom, srcPath,
devAlias, action, reason,
cbData):
"""Dispatches events to python user domain IO error event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), srcPath, devAlias, action,
reason, opaque)
return 0
def _dispatchDomainEventGraphicsCallback(self, dom, phase, localAddr,
remoteAddr, authScheme, subject,
cbData):
"""Dispatches events to python user domain graphics event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), phase, localAddr, remoteAddr,
authScheme, subject, opaque)
return 0
def dispatchDomainEventBlockPullCallback(self, dom, path, type, status, cbData):
"""Dispatches events to python user domain blockJob event callbacks
"""
try:
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), path, type, status, opaque)
return 0
except AttributeError:
pass
def _dispatchDomainEventDiskChangeCallback(self, dom, oldSrcPath, newSrcPath, devAlias, reason, cbData):
"""Dispatches event to python user domain diskChange event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), oldSrcPath, newSrcPath, devAlias, reason, opaque)
return 0
def _dispatchDomainEventTrayChangeCallback(self, dom, devAlias, reason, cbData):
"""Dispatches event to python user domain trayChange event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), devAlias, reason, opaque)
return 0
def _dispatchDomainEventPMWakeupCallback(self, dom, reason, cbData):
"""Dispatches event to python user domain pmwakeup event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), reason, opaque)
return 0
def _dispatchDomainEventPMSuspendCallback(self, dom, reason, cbData):
"""Dispatches event to python user domain pmsuspend event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), reason, opaque)
return 0
def _dispatchDomainEventBalloonChangeCallback(self, dom, actual, cbData):
"""Dispatches events to python user domain balloon change event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), actual, opaque)
return 0
def _dispatchDomainEventPMSuspendDiskCallback(self, dom, reason, cbData):
"""Dispatches event to python user domain pmsuspend-disk event callbacks
"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, virDomain(self, _obj=dom), reason, opaque)
return 0
def domainEventDeregisterAny(self, callbackID):
"""Removes a Domain Event Callback. De-registering for a
domain callback will disable delivery of this event type """
try:
ret = libvirtmod.virConnectDomainEventDeregisterAny(self._o, callbackID)
if ret == -1: raise libvirtError ('virConnectDomainEventDeregisterAny() failed', conn=self)
del self.domainEventCallbackID[callbackID]
except AttributeError:
pass
def domainEventRegisterAny(self, dom, eventID, cb, opaque):
"""Adds a Domain Event Callback. Registering for a domain
callback will enable delivery of the events """
if not hasattr(self, 'domainEventCallbackID'):
self.domainEventCallbackID = {}
cbData = { "cb": cb, "conn": self, "opaque": opaque }
if dom is None:
ret = libvirtmod.virConnectDomainEventRegisterAny(self._o, None, eventID, cbData)
else:
ret = libvirtmod.virConnectDomainEventRegisterAny(self._o, dom._o, eventID, cbData)
if ret == -1:
raise libvirtError ('virConnectDomainEventRegisterAny() failed', conn=self)
self.domainEventCallbackID[ret] = opaque
return ret
def listAllDomains(self, flags=0):
"""List all domains and returns a list of domain objects"""
ret = libvirtmod.virConnectListAllDomains(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllDomains() failed", conn=self)
retlist = list()
for domptr in ret:
retlist.append(virDomain(self, _obj=domptr))
return retlist
def listAllStoragePools(self, flags=0):
"""Returns a list of storage pool objects"""
ret = libvirtmod.virConnectListAllStoragePools(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllStoragePools() failed", conn=self)
retlist = list()
for poolptr in ret:
retlist.append(virStoragePool(self, _obj=poolptr))
return retlist
def listAllNetworks(self, flags=0):
"""Returns a list of network objects"""
ret = libvirtmod.virConnectListAllNetworks(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllNetworks() failed", conn=self)
retlist = list()
for netptr in ret:
retlist.append(virNetwork(self, _obj=netptr))
return retlist
def listAllInterfaces(self, flags=0):
"""Returns a list of interface objects"""
ret = libvirtmod.virConnectListAllInterfaces(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllInterfaces() failed", conn=self)
retlist = list()
for ifaceptr in ret:
retlist.append(virInterface(self, _obj=ifaceptr))
return retlist
def listAllDevices(self, flags=0):
"""Returns a list of host node device objects"""
ret = libvirtmod.virConnectListAllNodeDevices(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllNodeDevices() failed", conn=self)
retlist = list()
for devptr in ret:
retlist.append(virNodeDevice(self, _obj=devptr))
return retlist
def listAllNWFilters(self, flags=0):
"""Returns a list of network filter objects"""
ret = libvirtmod.virConnectListAllNWFilters(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllNWFilters() failed", conn=self)
retlist = list()
for filter_ptr in ret:
retlist.append(virNWFilter(self, _obj=filter_ptr))
return retlist
def listAllSecrets(self, flags=0):
"""Returns a list of secret objects"""
ret = libvirtmod.virConnectListAllSecrets(self._o, flags)
if ret is None:
raise libvirtError("virConnectListAllSecrets() failed", conn=self)
retlist = list()
for secret_ptr in ret:
retlist.append(virSecret(self, _obj=secret_ptr))
return retlist
def _dispatchCloseCallback(self, reason, cbData):
"""Dispatches events to python user close callback"""
cb = cbData["cb"]
opaque = cbData["opaque"]
cb(self, reason, opaque)
return 0
def unregisterCloseCallback(self):
"""Removes a close event callback"""
ret = libvirtmod.virConnectUnregisterCloseCallback(self._o)
if ret == -1: raise libvirtError ('virConnectUnregisterCloseCallback() failed', conn=self)
def registerCloseCallback(self, cb, opaque):
"""Adds a close event callback, providing a notification
when a connection fails / closes"""
cbData = { "cb": cb, "conn": self, "opaque": opaque }
ret = libvirtmod.virConnectRegisterCloseCallback(self._o, cbData)
if ret == -1:
raise libvirtError ('virConnectRegisterCloseCallback() failed', conn=self)
return ret